Handling Notifications with MediatR and Implementing a Dead Letter Queue (DLQ)
Overview
This guide covers how to handle notifications with MediatR, implement a Dead Letter Queue (DLQ) in PostgreSQL, and make notification processing robust with retry mechanisms.
1. Define MediatR Notifications and Handlers
First, define the MediatR notifications and their handlers.
AuditCreatedNotification.cs
/// <summary>
/// MediatR notification published when an audit has been created.
/// </summary>
public class AuditCreatedNotification : INotification
{
    /// <summary>
    /// Identifier of the audit this notification refers to.
    /// </summary>
    public Guid AuditId { get; set; }
}
/// <summary>
/// Processes <see cref="AuditCreatedNotification"/> with exponential-backoff retries and,
/// when every attempt fails, records the notification in the <c>dead_letter_queue</c> table.
/// </summary>
public class AuditCreatedNotificationHandler : INotificationHandler<AuditCreatedNotification>
{
    /// <summary>Number of retries performed after the initial attempt.</summary>
    private const int MaxRetryCount = 3;

    private readonly ILogger<AuditCreatedNotificationHandler> _logger;
    private readonly NotificationService _notificationService;

    /// <summary>
    /// Creates the handler.
    /// </summary>
    /// <param name="logger">Logger used for retry warnings and final failure reports.</param>
    /// <param name="notificationService">Service whose connection is used to write to the DLQ.</param>
    public AuditCreatedNotificationHandler(ILogger<AuditCreatedNotificationHandler> logger, NotificationService notificationService)
    {
        _logger = logger;
        _notificationService = notificationService;
    }

    /// <summary>
    /// Processes the notification, retrying up to <see cref="MaxRetryCount"/> times with waits of
    /// 2, 4 and 8 seconds. If processing still fails, the notification is written to the DLQ and
    /// the failure is not rethrown to the publisher.
    /// </summary>
    /// <param name="notification">The notification to process.</param>
    /// <param name="cancellationToken">Token that aborts the retry loop and its waits.</param>
    public async Task Handle(AuditCreatedNotification notification, CancellationToken cancellationToken)
    {
        var retryPolicy = Policy
            .Handle<Exception>()
            .WaitAndRetryAsync(
                MaxRetryCount,
                retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                onRetry: (exception, timeSpan, retryCount, context) =>
                {
                    // Message template instead of interpolation: keeps the rendered text, adds
                    // structured fields, and attaches the exception (with stack trace) to the entry.
                    _logger.LogWarning(
                        exception,
                        "Retry {RetryCount} encountered an error: {ErrorMessage}. Waiting {Delay} before next retry.",
                        retryCount,
                        exception.Message,
                        timeSpan);
                });

        try
        {
            // The token is handed to the policy so a shutdown does not have to sit through
            // the remaining back-off delays.
            await retryPolicy.ExecuteAsync(async ct =>
            {
                // Process the notification
                // Placeholder: the random failure below only simulates real processing work.
                if (new Random().Next(0, 2) == 0) // Simulating a failure
                {
                    throw new Exception("Simulated processing failure.");
                }

                _logger.LogInformation("Successfully processed audit: {AuditId}", notification.AuditId);
            }, cancellationToken);
        }
        catch (Exception ex)
        {
            _logger.LogError(
                ex,
                "Failed to process audit: {AuditId} after retries. Moving to DLQ. Error: {ErrorMessage}",
                notification.AuditId,
                ex.Message);

            // Serialize and store in DLQ. This also runs when the failure is a cancellation,
            // so an interrupted notification is parked for reprocessing rather than lost.
            await MoveToDeadLetterQueueAsync(notification, ex.Message);
        }
    }

    /// <summary>
    /// Inserts the serialized notification and the error text into <c>dead_letter_queue</c>.
    /// </summary>
    /// <param name="notification">The notification that could not be processed.</param>
    /// <param name="errorMessage">Message of the exception that ended processing.</param>
    private async Task MoveToDeadLetterQueueAsync(AuditCreatedNotification notification, string errorMessage)
    {
        var connection = _notificationService.GetConnection();

        // original_notification_id is intentionally not written: that column is an INT foreign key
        // to notifications(id), whereas AuditId is a Guid and does not identify a notifications row.
        // Binding the Guid made the INSERT fail. The AuditId is still preserved inside `content`.
        await using var command = new NpgsqlCommand(
            "INSERT INTO dead_letter_queue (type, content, error_message) VALUES (@type, @content, @error_message)",
            connection);
        command.Parameters.AddWithValue("type", notification.GetType().FullName);
        command.Parameters.AddWithValue("content", JsonConvert.SerializeObject(notification));
        command.Parameters.AddWithValue("error_message", errorMessage);

        await connection.OpenAsync();
        try
        {
            await command.ExecuteNonQueryAsync();
        }
        finally
        {
            // Always release the shared connection; leaving it open would make the next
            // OpenAsync on the same instance fail.
            await connection.CloseAsync();
        }
    }
}
2. Create Tables in PostgreSQL
Create tables for storing notifications and dead-lettered notifications.
messages.sql
-- Stored notifications: one row per serialized notification.
CREATE TABLE notifications (
    id          SERIAL    PRIMARY KEY,
    type        TEXT      NOT NULL,                  -- type name of the serialized notification
    content     TEXT      NOT NULL,                  -- serialized notification payload
    retry_count INTEGER   DEFAULT 0,                 -- starts at 0 for new rows
    status      TEXT      DEFAULT 'Pending',         -- new rows start as 'Pending'
    created_at  TIMESTAMP DEFAULT CURRENT_TIMESTAMP  -- insertion time
);
-- Dead Letter Queue: notifications whose processing failed.
CREATE TABLE dead_letter_queue (
    id                       SERIAL    PRIMARY KEY,
    original_notification_id INTEGER   REFERENCES notifications (id),  -- optional link to the source row; may be NULL
    type                     TEXT      NOT NULL,                       -- type name of the serialized notification
    content                  TEXT      NOT NULL,                       -- serialized notification payload
    error_message            TEXT,                                     -- failure description; may be NULL
    created_at               TIMESTAMP DEFAULT CURRENT_TIMESTAMP       -- time the row was dead-lettered
);
3. Implement Logic for Processing Notifications and Handling Failures
Create a service that serializes and stores notifications, and a processor that re-publishes dead-lettered notifications and removes them from the DLQ once they succeed.
NotificationService.cs
/// <summary>
/// Persists MediatR notifications to the <c>notifications</c> table and exposes the
/// underlying PostgreSQL connection to collaborators.
/// </summary>
public class NotificationService
{
    private readonly NpgsqlConnection _connection;

    /// <summary>
    /// Creates the service with a connection built from <paramref name="connectionString"/>.
    /// The connection is not opened until it is needed.
    /// </summary>
    /// <param name="connectionString">PostgreSQL connection string.</param>
    public NotificationService(string connectionString)
    {
        _connection = new NpgsqlConnection(connectionString);
    }

    /// <summary>
    /// Serializes <paramref name="notification"/> to JSON and inserts it, together with its
    /// full type name, into the <c>notifications</c> table.
    /// </summary>
    /// <param name="notification">The notification to store.</param>
    public async Task StoreNotificationAsync(INotification notification)
    {
        var type = notification.GetType().FullName;
        var content = JsonConvert.SerializeObject(notification);

        await using var command = new NpgsqlCommand(
            "INSERT INTO notifications (type, content) VALUES (@type, @content)",
            _connection);
        command.Parameters.AddWithValue("type", type);
        command.Parameters.AddWithValue("content", content);

        await _connection.OpenAsync();
        try
        {
            await command.ExecuteNonQueryAsync();
        }
        finally
        {
            // Close even when the insert throws; otherwise the shared connection stays open
            // and the next OpenAsync on it fails.
            await _connection.CloseAsync();
        }
    }

    /// <summary>
    /// Returns the single connection instance owned by this service.
    /// </summary>
    // NOTE(review): the same instance is handed to every caller, so concurrent use by
    // several callers is not safe — confirm callers are serialized.
    public NpgsqlConnection GetConnection() => _connection;
}
DLQProcessorService.cs
/// <summary>
/// Reads rows from the <c>dead_letter_queue</c> table, re-publishes each one through MediatR
/// and deletes the row once publishing completes without an exception.
/// </summary>
public class DLQProcessorService
{
    private readonly NpgsqlConnection _connection;
    private readonly IMediator _mediator;

    /// <summary>
    /// Creates the processor.
    /// </summary>
    /// <param name="connectionString">PostgreSQL connection string for the DLQ database.</param>
    /// <param name="mediator">Mediator used to re-publish dead-lettered notifications.</param>
    public DLQProcessorService(string connectionString, IMediator mediator)
    {
        _connection = new NpgsqlConnection(connectionString);
        _mediator = mediator;
    }

    /// <summary>
    /// Performs one pass over the DLQ. A failure on one row is reported and does not stop
    /// the remaining rows from being processed; the failed row stays in the queue.
    /// </summary>
    public async Task ProcessDeadLetterQueueAsync()
    {
        var failedNotifications = await GetFailedNotificationsAsync();
        foreach (var deadLetter in failedNotifications)
        {
            try
            {
                var notificationType = ResolveNotificationType(deadLetter.Type);
                if (notificationType == null)
                {
                    throw new InvalidOperationException(
                        $"Type '{deadLetter.Type}' could not be resolved to an INotification implementation.");
                }

                var notification = JsonConvert.DeserializeObject(deadLetter.Content, notificationType) as INotification;
                if (notification == null)
                {
                    // Covers empty/"null" content, which would otherwise reach Publish as null.
                    throw new InvalidOperationException(
                        $"Content could not be deserialized as '{deadLetter.Type}'.");
                }

                await _mediator.Publish(notification);

                // Remove from DLQ after successful processing
                await RemoveFromDeadLetterQueueAsync(deadLetter.Id);
            }
            catch (Exception ex)
            {
                // Log the error and update the retry count or status if needed
                Console.WriteLine($"Failed to reprocess notification {deadLetter.Id}: {ex.Message}");
            }
        }
    }

    /// <summary>
    /// Resolves a stored type name to a notification type.
    /// </summary>
    /// <param name="typeName">Type name as stored in the <c>type</c> column.</param>
    /// <returns>The resolved type, or <c>null</c> when it is unknown or not an <c>INotification</c>.</returns>
    private static Type ResolveNotificationType(string typeName)
    {
        if (string.IsNullOrWhiteSpace(typeName))
        {
            return null;
        }

        // Type.GetType only finds non-assembly-qualified names in the calling assembly and the
        // core library, so fall back to every loaded assembly.
        var resolved = Type.GetType(typeName);
        if (resolved == null)
        {
            foreach (var assembly in AppDomain.CurrentDomain.GetAssemblies())
            {
                resolved = assembly.GetType(typeName);
                if (resolved != null)
                {
                    break;
                }
            }
        }

        // The name comes from the database; refuse to deserialize into anything that is not
        // a notification.
        if (resolved == null || !typeof(INotification).IsAssignableFrom(resolved))
        {
            return null;
        }

        return resolved;
    }

    /// <summary>
    /// Loads every row currently in the DLQ, oldest first.
    /// </summary>
    private async Task<IEnumerable<DeadLetterMessage>> GetFailedNotificationsAsync()
    {
        var deadLetters = new List<DeadLetterMessage>();

        // Explicit column list: the ordinals used below no longer depend on table column order.
        await using var command = new NpgsqlCommand(
            "SELECT id, original_notification_id, type, content, error_message, created_at FROM dead_letter_queue ORDER BY id",
            _connection);

        await _connection.OpenAsync();
        try
        {
            await using var reader = await command.ExecuteReaderAsync();
            while (await reader.ReadAsync())
            {
                deadLetters.Add(new DeadLetterMessage
                {
                    Id = reader.GetInt32(0),
                    // original_notification_id and error_message are nullable in the schema;
                    // reading NULL with GetInt32/GetString throws.
                    OriginalNotificationId = reader.IsDBNull(1) ? 0 : reader.GetInt32(1),
                    Type = reader.GetString(2),
                    Content = reader.GetString(3),
                    ErrorMessage = reader.IsDBNull(4) ? null : reader.GetString(4),
                    CreatedAt = reader.GetDateTime(5)
                });
            }
        }
        finally
        {
            // Release the shared connection even if reading fails, so later calls can reopen it.
            await _connection.CloseAsync();
        }

        return deadLetters;
    }

    /// <summary>
    /// Deletes the DLQ row with the given primary key.
    /// </summary>
    /// <param name="id">Value of <c>dead_letter_queue.id</c>.</param>
    private async Task RemoveFromDeadLetterQueueAsync(int id)
    {
        await using var command = new NpgsqlCommand("DELETE FROM dead_letter_queue WHERE id = @id", _connection);
        command.Parameters.AddWithValue("id", id);

        await _connection.OpenAsync();
        try
        {
            await command.ExecuteNonQueryAsync();
        }
        finally
        {
            await _connection.CloseAsync();
        }
    }
}
4. Schedule DLQ Processing
Use a background service in ASP.NET Core to periodically process the DLQ.
Startup.cs
/// <summary>
/// Registers the DLQ processor and the hosted service that runs it periodically.
/// </summary>
/// <param name="services">The application's service collection.</param>
// NOTE(review): MediatR and NotificationService registrations are not shown here — confirm they
// are registered elsewhere.
public void ConfigureServices(IServiceCollection services)
{
    services.AddHostedService<DLQBackgroundService>();

    // Factory registration: IMediator is resolved from the application's own container when the
    // singleton is first requested. Calling services.BuildServiceProvider() here would build a
    // second, separate container with its own copies of every singleton.
    services.AddSingleton(provider => new DLQProcessorService(
        Configuration.GetConnectionString("DefaultConnection"),
        provider.GetRequiredService<IMediator>()));
}
DLQBackgroundService.cs
/// <summary>
/// Hosted service that runs a DLQ processing pass every five minutes until the host stops.
/// </summary>
public class DLQBackgroundService : BackgroundService
{
    private readonly DLQProcessorService _dlqProcessorService;

    /// <summary>
    /// Creates the background service.
    /// </summary>
    /// <param name="dlqProcessorService">Processor invoked on every pass.</param>
    public DLQBackgroundService(DLQProcessorService dlqProcessorService)
    {
        _dlqProcessorService = dlqProcessorService;
    }

    /// <summary>
    /// Loops until <paramref name="stoppingToken"/> is cancelled, processing the DLQ and then
    /// waiting five minutes.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await _dlqProcessorService.ProcessDeadLetterQueueAsync();
            }
            catch (Exception ex)
            {
                // A failed pass (for example, the database being unreachable) must not end the
                // loop: nothing restarts it, so DLQ processing would stop until the next deploy.
                Console.WriteLine($"DLQ processing pass failed: {ex.Message}");
            }

            // Cancellation during the wait is left to propagate so shutdown is not delayed.
            await Task.Delay(TimeSpan.FromMinutes(5), stoppingToken); // Process every 5 minutes
        }
    }
}
Summary
By following these steps, you make notification processing robust: transient failures are retried, and notifications that still fail are captured in a Dead Letter Queue (DLQ) for later reprocessing.
Let me know if you need further details or have any other questions!