
Async Processing & Job Management

Background job processing, message queuing, batch operations, and scheduled tasks for ProKure+ at production scale.


Overview

Async processing handles time-consuming operations without blocking user requests. This includes bulk imports, report generation, notification delivery, and scheduled maintenance tasks.

  • Queue priorities: 5
  • Batch size: 1,000 rows/chunk
  • Email aggregation window: 5 min
  • Webhook retries: 5

Async Operations

  • Bulk Excel imports (vendors, items)
  • Report generation (PDF, Excel)
  • Email/SMS notifications
  • Webhook delivery
  • Data archival & cleanup

Key Technologies

  • Hangfire (Job Scheduling)
  • Azure Service Bus (Message Queue)
  • Redis (Distributed Locking)
  • SignalR (Progress Updates)
  • Azure Blob Storage (File Output)

Hangfire Job Processing

Hangfire manages all background jobs with PostgreSQL persistence for reliability and distributed processing.

Queue    | Priority | Use Cases                              | Timeout
---------|----------|----------------------------------------|-----------
critical | Highest  | Reverse auction bids, real-time alerts | 30 seconds
high     | High     | PO approvals, email notifications      | 2 minutes
default  | Normal   | Standard background tasks              | 10 minutes
batch    | Low      | Bulk imports, report generation        | 1 hour
low      | Lowest   | Data archival, cleanup tasks           | 2 hours
// Program.cs - Hangfire Configuration (.NET 10)
builder.Services.AddHangfire(config => config
    .SetDataCompatibilityLevel(CompatibilityLevel.Version_180)
    .UseSimpleAssemblyNameTypeSerializer()
    .UseRecommendedSerializerSettings()
    .UsePostgreSqlStorage(
        builder.Configuration.GetConnectionString("HangfireConnection"),
        new PostgreSqlStorageOptions
        {
            SchemaName = "hangfire",
            QueuePollInterval = TimeSpan.FromSeconds(5),
            JobExpirationCheckInterval = TimeSpan.FromHours(1),
            PrepareSchemaIfNecessary = true,
            InvisibilityTimeout = TimeSpan.FromMinutes(30)
        }));

// Configure Hangfire Server with multiple queues
builder.Services.AddHangfireServer(options =>
{
    options.ServerName = $"ProKure-{Environment.MachineName}";
    options.WorkerCount = Environment.ProcessorCount * 2;
    options.Queues = new[]
    {
        "critical",      // Real-time auctions, alerts
        "high",          // PO approvals, notifications
        "default",       // Standard processing
        "batch",         // Batch imports, reports
        "low"            // Data archival, cleanup
    };
    options.StopTimeout = TimeSpan.FromSeconds(30);
});
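Jobs land on a specific queue either through the `[Queue]` attribute on the job method or explicitly at enqueue time. A minimal sketch of both routes (the `AuctionBidJob` class and its method are illustrative, not part of the codebase above):

```csharp
using System;
using System.Threading.Tasks;
using Hangfire;
using Hangfire.States;

public class AuctionBidJob
{
    // [Queue] pins every execution of this method to the critical queue
    [Queue("critical")]
    public Task ProcessBidAsync(Guid auctionId, Guid bidId) => Task.CompletedTask;
}

public static class JobEnqueueExamples
{
    public static void Enqueue()
    {
        // Attribute-based routing: this job lands on "critical"
        BackgroundJob.Enqueue<AuctionBidJob>(
            x => x.ProcessBidAsync(Guid.NewGuid(), Guid.NewGuid()));

        // Explicit routing: override the target queue for a single call
        var client = new BackgroundJobClient();
        client.Create<AuctionBidJob>(
            x => x.ProcessBidAsync(Guid.NewGuid(), Guid.NewGuid()),
            new EnqueuedState("batch"));
    }
}
```

Note that workers drain queues in the order listed in `options.Queues`, so the array order above doubles as the priority order.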

Azure Service Bus Patterns

Azure Service Bus handles inter-service communication with guaranteed delivery and dead-letter handling.

Producer (API/Service) → Service Bus (Queue/Topic) → Consumer (Hangfire Worker)
On failure: Retry (5×, exponential backoff) → Dead Letter (manual review)
// Message Producer Service
public class MessageQueueService : IMessageQueueService
{
    private readonly ServiceBusSender _sender;

    public async Task PublishAsync<T>(T message, string messageType,
        CancellationToken ct = default) where T : class
    {
        var serviceBusMessage = new ServiceBusMessage(
            JsonSerializer.SerializeToUtf8Bytes(message))
        {
            MessageId = Guid.NewGuid().ToString(),
            ContentType = "application/json",
            Subject = messageType,
            ApplicationProperties =
            {
                ["TenantId"] = TenantContext.Current.TenantId,
                ["CorrelationId"] = Activity.Current?.Id,
                ["MessageType"] = typeof(T).FullName
            }
        };

        // Set TTL based on message type
        serviceBusMessage.TimeToLive = messageType switch
        {
            "auction-bid" => TimeSpan.FromMinutes(5),
            "po-approval" => TimeSpan.FromHours(24),
            "notification" => TimeSpan.FromHours(72),
            _ => TimeSpan.FromDays(7)
        };

        await _sender.SendMessageAsync(serviceBusMessage, ct);
    }
}
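On the consuming side, a `ServiceBusProcessor` pumps messages off the queue and hands them to Hangfire so processing survives worker restarts. A sketch under assumptions: the queue name `prokure-events` and the `MessageDispatchJob` class are illustrative.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Hangfire;

public class MessageDispatchJob
{
    // Illustrative: route the payload by message type
    public Task DispatchAsync(string messageType, string json) => Task.CompletedTask;
}

public class MessageQueueConsumer
{
    private readonly ServiceBusProcessor _processor;

    public MessageQueueConsumer(ServiceBusClient client)
    {
        _processor = client.CreateProcessor("prokure-events", new ServiceBusProcessorOptions
        {
            MaxConcurrentCalls = 8,
            AutoCompleteMessages = false   // complete only after a successful handoff
        });
        _processor.ProcessMessageAsync += OnMessageAsync;
        _processor.ProcessErrorAsync += args =>
        {
            // Transport-level failures are logged; Service Bus redelivers the message
            Console.Error.WriteLine(args.Exception);
            return Task.CompletedTask;
        };
    }

    public Task StartAsync(CancellationToken ct) => _processor.StartProcessingAsync(ct);

    private async Task OnMessageAsync(ProcessMessageEventArgs args)
    {
        // Hand off to Hangfire for durable processing, then complete the message
        BackgroundJob.Enqueue<MessageDispatchJob>(
            x => x.DispatchAsync(args.Message.Subject, args.Message.Body.ToString()));
        await args.CompleteMessageAsync(args.Message);
    }
}
```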

Dead Letter Handling

Messages that fail after 5 retries are moved to the dead-letter queue. An admin dashboard monitors dead-letter queues and allows manual reprocessing or deletion.
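The dead-letter queue is an ordinary sub-queue that can be drained with a standard receiver. A sketch of the reprocessing path such a dashboard might call, assuming a sender bound to the original queue (class and parameter names are illustrative):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public class DeadLetterReviewService
{
    private readonly ServiceBusClient _client;
    private readonly ServiceBusSender _sender;   // sender for the main queue

    public DeadLetterReviewService(ServiceBusClient client, ServiceBusSender sender)
        => (_client, _sender) = (client, sender);

    public async Task<int> ReprocessAsync(string queueName, int maxMessages, CancellationToken ct)
    {
        // Dead-lettered messages live in a sub-queue of the original queue
        await using var receiver = _client.CreateReceiver(queueName,
            new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter });

        var messages = await receiver.ReceiveMessagesAsync(maxMessages, TimeSpan.FromSeconds(5), ct);
        foreach (var message in messages)
        {
            // Re-publish the body to the main queue, then remove it from the DLQ
            await _sender.SendMessageAsync(new ServiceBusMessage(message.Body)
            {
                Subject = message.Subject,
                ContentType = message.ContentType
            }, ct);
            await receiver.CompleteMessageAsync(message, ct);
        }
        return messages.Count;
    }
}
```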

Batch Processing

Efficient batch processing for bulk imports, data migrations, and large-scale operations with memory management.

Operation          | Chunk Size | Parallelism | Memory Strategy
-------------------|------------|-------------|---------------------------
Vendor Import      | 1,000 rows | 4 threads   | Stream + GC per 10K rows
Item Master Import | 500 rows   | 2 threads   | Stream + validation batch
PO Bulk Create     | 100 items  | Sequential  | Transaction batching
Report Export      | 5,000 rows | Single      | Stream to blob storage
// Excel Bulk Import with Chunking
public class BulkImportJob
{
    private const int ChunkSize = 1000;
    private const int MaxDegreeOfParallelism = 4;

    [Queue("batch")]
    [AutomaticRetry(Attempts = 3, DelaysInSeconds = new[] { 60, 300, 900 })]
    public async Task ImportVendorsAsync(Guid importJobId, string blobPath)
    {
        var job = await _context.ImportJobs.FindAsync(importJobId)
            ?? throw new InvalidOperationException($"Import job {importJobId} not found");
        job.Status = ImportStatus.Processing;
        job.StartedAt = DateTime.UtcNow;
        await _context.SaveChangesAsync(); // persist status before the long-running import

        // Stream Excel file to avoid memory issues
        await using var stream = await _blobService.OpenReadAsync(blobPath);
        using var reader = ExcelReaderFactory.CreateOpenXmlReader(stream);

        var records = new List<VendorImportDto>();
        var rowNumber = 0;

        while (reader.Read())
        {
            rowNumber++;
            if (rowNumber == 1) continue; // Skip header

            var dto = ParseRow(reader, rowNumber);
            records.Add(dto);

            // Process in chunks to manage memory
            if (records.Count >= ChunkSize)
            {
                await ProcessChunkAsync(records);
                records.Clear();

                // Report progress via SignalR
                await _progress.ReportAsync(importJobId, rowNumber);

                // Allow GC to collect every 10K data rows (rowNumber includes
                // the header row, so chunk boundaries fall at 1001, 2001, ...)
                if ((rowNumber - 1) % 10000 == 0)
                    GC.Collect(GC.MaxGeneration, GCCollectionMode.Optimized);
            }
        }

        // Process remaining records
        if (records.Any())
            await ProcessChunkAsync(records);

        job.Status = ImportStatus.Completed;
        job.CompletedAt = DateTime.UtcNow;
        await _context.SaveChangesAsync();
    }
}

Memory Management

Bulk imports use streaming + chunking to handle files with 100K+ rows without memory issues. Progress is reported every chunk for UI updates via SignalR.
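The `_progress.ReportAsync` call in the import job can be a thin wrapper over SignalR's `IHubContext`. A sketch, assuming a hypothetical `ImportHub` and clients that join a per-job group and listen for `ImportProgress` events:

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;

public class ImportHub : Hub { }

public class ImportProgressReporter
{
    private readonly IHubContext<ImportHub> _hub;

    public ImportProgressReporter(IHubContext<ImportHub> hub) => _hub = hub;

    public Task ReportAsync(Guid importJobId, int rowsProcessed) =>
        // Push incremental progress to clients watching this import job
        _hub.Clients.Group($"import:{importJobId}")
            .SendAsync("ImportProgress", new { importJobId, rowsProcessed });
}
```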

Email Notification Batching

Intelligent email batching to reduce SendGrid/Brevo API calls and prevent notification fatigue.

Batching Configuration

  • 5-minute aggregation window
  • Max 50 notifications per batch
  • Immediate send for critical alerts
  • User preference for instant vs digest

Benefits

  • Reduces API calls by 80%
  • Prevents notification fatigue
  • Better email deliverability
  • Cost savings on email service
// Email Aggregation Service
public class EmailBatchingService : IEmailService
{
    private readonly IDistributedCache _cache;
    private readonly TimeSpan _batchWindow = TimeSpan.FromMinutes(5);

    public async Task QueueNotificationAsync(EmailNotification notification)
    {
        var batchKey = $"email:batch:{notification.RecipientId}:{notification.Category}";

        // Add to batch in Redis (GetAsync<T>/SetAsync here are assumed to be
        // JSON-typed wrappers over IDistributedCache; the read-modify-write is
        // not atomic, so a distributed lock per batchKey is advisable)
        var batch = await _cache.GetAsync<EmailBatch>(batchKey)
            ?? new EmailBatch { RecipientId = notification.RecipientId };

        batch.Notifications.Add(notification);
        await _cache.SetAsync(batchKey, batch, _batchWindow);

        // Schedule batch send if this is the first notification
        if (batch.Notifications.Count == 1)
        {
            BackgroundJob.Schedule<EmailBatchSender>(
                x => x.SendBatchAsync(batchKey, CancellationToken.None),
                _batchWindow);
        }
    }
}

// Email Batch Sender
public class EmailBatchSender
{
    [Queue("high")]
    public async Task SendBatchAsync(string batchKey, CancellationToken ct)
    {
        var batch = await _cache.GetAsync<EmailBatch>(batchKey);
        if (batch == null) return;

        // Group notifications by type for digest
        var grouped = batch.Notifications.GroupBy(n => n.Type);

        // Build and send digest email
        var email = new DigestEmail
        {
            To = batch.RecipientEmail,
            Subject = BuildDigestSubject(grouped),
            TemplateName = "notification-digest",
            TemplateData = new { ... }
        };

        await _brevoClient.SendTemplatedEmailAsync(email, ct);
        await _cache.RemoveAsync(batchKey);
    }
}

Async Report Generation

Background report generation with streaming output for large datasets and SignalR notifications when complete.

Request Report → Queue Job → Return Job ID → Process Data → Stream to Blob → Notify User (SignalR + SAS URL)
// Report Generation Service
public class ReportGenerationService
{
    [Queue("batch")]
    [AutomaticRetry(Attempts = 2)]
    public async Task GenerateReportAsync(ReportRequest request)
    {
        var blobName = $"reports/{request.TenantId}/{request.JobId}.xlsx";

        // Stream directly to blob storage
        await using var blobStream = await _blobService.OpenWriteAsync(blobName);
        using var package = new ExcelPackage(blobStream);
        var worksheet = package.Workbook.Worksheets.Add(request.ReportName);

        // Stream data in batches to avoid memory issues
        var rowIndex = 2;
        await foreach (var batch in GetDataBatchesAsync(request))
        {
            foreach (var row in batch)
                WriteRow(worksheet, rowIndex++, row);
        }

        await package.SaveAsync();

        // Generate download URL with SAS token (24-hour expiry)
        var downloadUrl = _blobService.GenerateSasUrl(blobName, TimeSpan.FromHours(24));

        // Notify user via SignalR
        await _hubContext.Clients
            .User(request.UserId.ToString())
            .SendAsync("ReportReady", new
            {
                JobId = request.JobId,
                ReportName = request.ReportName,
                DownloadUrl = downloadUrl
            });
    }
}

Webhook Delivery with Retry

Reliable webhook delivery with exponential backoff and HMAC signature verification.

Attempt | Delay      | Cumulative Time
--------|------------|--------------------
1       | Immediate  | 0
2       | 30 seconds | 30 seconds
3       | 2 minutes  | 2.5 minutes
4       | 5 minutes  | 7.5 minutes
5       | 15 minutes | 22.5 minutes
6       | 1 hour     | 1 hour 22.5 minutes
// Webhook Delivery Service
public class WebhookDeliveryService
{
    private readonly int[] _retryDelaysSeconds = { 30, 120, 300, 900, 3600 };

    [Queue("high")]
    public async Task DeliverWebhookAsync(WebhookDelivery delivery)
    {
        var webhook = await _context.Webhooks.FindAsync(delivery.WebhookId);

        // Build request with HMAC signature
        var payload = JsonSerializer.Serialize(delivery.Payload);
        var signature = ComputeHmacSignature(payload, webhook.Secret);

        var request = new HttpRequestMessage(HttpMethod.Post, webhook.Url)
        {
            Content = new StringContent(payload, Encoding.UTF8, "application/json")
        };
        request.Headers.Add("X-ProKure-Signature", signature);
        request.Headers.Add("X-ProKure-Delivery-Id", delivery.Id.ToString());
        request.Headers.Add("X-ProKure-Event", delivery.EventType);

        try
        {
            var response = await _httpClient.SendAsync(request);
            if (response.IsSuccessStatusCode)
            {
                delivery.Status = DeliveryStatus.Delivered;
                delivery.DeliveredAt = DateTime.UtcNow;
            }
            else
                throw new WebhookDeliveryException($"HTTP {response.StatusCode}");
        }
        catch (Exception ex)
        {
            delivery.AttemptCount++;

            // Retry until all five backoff delays are used (attempts 2-6)
            if (delivery.AttemptCount <= _retryDelaysSeconds.Length)
            {
                // AttemptCount was just incremented, so the delay for the
                // attempt that just failed is at index AttemptCount - 1
                BackgroundJob.Schedule<WebhookDeliveryService>(
                    x => x.DeliverWebhookAsync(delivery),
                    TimeSpan.FromSeconds(_retryDelaysSeconds[delivery.AttemptCount - 1]));
            }
            else
            {
                // Max retries exceeded - move to dead letter
                delivery.Status = DeliveryStatus.Failed;
                await _notificationService.NotifyWebhookFailureAsync(delivery);
            }
        }

        await _context.SaveChangesAsync();
    }

    private string ComputeHmacSignature(string payload, string secret)
    {
        using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(secret));
        var hash = hmac.ComputeHash(Encoding.UTF8.GetBytes(payload));
        return $"sha256={Convert.ToHexString(hash).ToLowerInvariant()}";
    }
}
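Webhook consumers should verify `X-ProKure-Signature` by recomputing the HMAC over the raw request body and comparing in constant time. A sketch of the receiving side (not part of the service above):

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

public static class WebhookVerifier
{
    public static bool IsValid(string payload, string signatureHeader, string secret)
    {
        using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(secret));
        var hash = hmac.ComputeHash(Encoding.UTF8.GetBytes(payload));
        var expected = $"sha256={Convert.ToHexString(hash).ToLowerInvariant()}";

        // Constant-time comparison avoids leaking signature prefixes via timing
        return CryptographicOperations.FixedTimeEquals(
            Encoding.UTF8.GetBytes(expected),
            Encoding.UTF8.GetBytes(signatureHeader));
    }
}
```

Verification must run over the raw bytes as received; re-serializing the parsed JSON can change whitespace or key order and break the signature.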

Scheduled Job Configuration

Recurring jobs for maintenance, cleanup, and scheduled operations using Hangfire cron expressions.

Job                       | Schedule         | Purpose                       | Queue
--------------------------|------------------|-------------------------------|--------
Data Archival             | 2 AM daily       | Archive records > 2 years old | low
Database Maintenance      | 3 AM Sunday      | VACUUM, REINDEX operations    | low
Auction Expiry Check      | Every hour       | Close expired auctions        | high
Contract Expiry Alert     | 9 AM daily (IST) | Send expiry reminders         | high
Approval Reminder         | Every 5 minutes  | Remind pending approvers      | default
Temp File Cleanup         | 4 AM daily       | Delete expired temp files     | low
Session Cleanup           | Every 15 minutes | Remove expired sessions       | default
Materialized View Refresh | Every 2 hours    | Refresh analytics views       | batch
// Scheduled Jobs Configuration
public static class HangfireJobScheduler
{
    public static void ConfigureRecurringJobs()
    {
        // Daily maintenance - off-peak hours
        RecurringJob.AddOrUpdate<DataArchivalJob>(
            "data-archival",
            x => x.ArchiveOldDataAsync(CancellationToken.None),
            "0 2 * * *",  // 2 AM daily UTC
            new RecurringJobOptions { TimeZone = TimeZoneInfo.Utc });

        // Weekly database maintenance
        RecurringJob.AddOrUpdate<DatabaseMaintenanceJob>(
            "db-maintenance",
            x => x.RunMaintenanceAsync(CancellationToken.None),
            "0 3 * * 0",  // 3 AM every Sunday
            new RecurringJobOptions { TimeZone = TimeZoneInfo.Utc });

        // Contract expiry alerts - business hours IST
        RecurringJob.AddOrUpdate<ContractExpiryJob>(
            "contract-expiry-alert",
            x => x.SendExpiryAlertsAsync(CancellationToken.None),
            "0 9 * * *",  // 9 AM daily IST
            new RecurringJobOptions {
                TimeZone = TimeZoneInfo.FindSystemTimeZoneById("India Standard Time")
            });

        // Materialized view refresh
        RecurringJob.AddOrUpdate<MaterializedViewRefreshJob>(
            "mv-refresh",
            x => x.RefreshViewsAsync(CancellationToken.None),
            "0 */2 * * *",  // Every 2 hours
            new RecurringJobOptions { TimeZone = TimeZoneInfo.Utc });
    }
}
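Recurring jobs that must not overlap (archival and database maintenance in particular) can be guarded with Hangfire's `DisableConcurrentExecution` filter, which takes a distributed lock in the job storage for the duration of the run. A sketch, annotating the archival job shown above (the method body is a placeholder):

```csharp
using System.Threading;
using System.Threading.Tasks;
using Hangfire;

public class DataArchivalJob
{
    // If a previous run still holds the lock, this run waits up to
    // 10 minutes for it and then fails rather than archiving concurrently
    [Queue("low")]
    [DisableConcurrentExecution(timeoutInSeconds: 600)]
    public Task ArchiveOldDataAsync(CancellationToken ct) => Task.CompletedTask;
}
```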

Job Monitoring

Hangfire Dashboard provides real-time monitoring of all jobs. Configure alerts for failed jobs and long-running operations through Application Insights integration.
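By default the dashboard only accepts local requests, so exposing it in production requires an authorization filter. A minimal sketch restricting access to an admin role (the role name is illustrative):

```csharp
using Hangfire;
using Hangfire.Dashboard;

public class AdminOnlyDashboardFilter : IDashboardAuthorizationFilter
{
    public bool Authorize(DashboardContext context)
    {
        // Allow only authenticated users in the Admin role
        var httpContext = context.GetHttpContext();
        return httpContext.User.Identity?.IsAuthenticated == true
            && httpContext.User.IsInRole("Admin");
    }
}

// Program.cs
// app.UseHangfireDashboard("/hangfire", new DashboardOptions
// {
//     Authorization = new[] { new AdminOnlyDashboardFilter() }
// });
```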

Business Rules Summary

Rule ID      | Category          | Description
-------------|-------------------|-------------------------------------------------------
BR-ASYNC-001 | Batch Processing  | Bulk imports MUST process in chunks of 1,000 rows max
BR-ASYNC-002 | Email Batching    | Non-critical emails MUST be aggregated for 5 minutes
BR-ASYNC-003 | Report Generation | Reports MUST stream to blob storage, not memory
BR-ASYNC-004 | Webhook Delivery  | Webhooks MUST retry 5 times before dead-lettering
BR-ASYNC-005 | Queue Priority    | Real-time auctions MUST use critical queue
BR-ASYNC-006 | Job Monitoring    | Failed jobs MUST trigger alerts via Application Insights