OElite.Restme.RabbitMQ 2.1.0-develop.449

OElite.Restme.RabbitMQ

NuGet Version Target Framework

RabbitMQ integration package for the Restme framework, providing enterprise-grade message queuing and asynchronous communication capabilities.

Overview

OElite.Restme.RabbitMQ provides robust RabbitMQ integration for the OElite platform, enabling reliable message queuing, publish-subscribe patterns, and distributed communication. Built on the official RabbitMQ .NET client, it offers enterprise-grade reliability with automatic reconnection, message persistence, and flexible routing capabilities.

Features

  • Reliable Message Queuing: Enterprise-grade message queuing with delivery guarantees
  • Publish-Subscribe Pattern: Support for topic-based messaging and event broadcasting
  • Automatic Serialization: JSON serialization/deserialization for complex message types
  • Connection Management: Robust connection handling with automatic reconnection
  • Virtual Host Support: Multi-tenancy through virtual host isolation
  • Flexible Routing: Exchange-based routing with custom routing keys
  • Error Handling: Comprehensive error handling with dead letter queues
  • Async/Await Support: Full asynchronous operations for optimal performance
  • Message Persistence: Durable queues and persistent messages for reliability

Installation

dotnet add package OElite.Restme.RabbitMQ

Quick Start

Basic Configuration

using OElite.Providers;

// Basic RabbitMQ connection
var connectionString = "localhost:5672";
var config = new RestConfig();
var queueProvider = new RabbitMQProvider(connectionString, config);

// With authentication
var authConnection = "amqp://username:password@localhost:5672";
var authProvider = new RabbitMQProvider(authConnection, config);

// With virtual host
var vhostConnection = "amqp://user:pass@host:5672|vhost=production";
var vhostProvider = new RabbitMQProvider(vhostConnection, config);

Basic Queue Operations

// Send message to queue
await queueProvider.SendAsync("order.processing", orderData);

// Receive message from queue
var message = await queueProvider.ReceiveAsync<OrderData>("order.processing");

// Send with routing key
await queueProvider.SendAsync("notifications", emailData, "email.urgent");

// Subscribe to queue with message handler
await queueProvider.SubscribeAsync<NotificationData>("notifications", HandleNotification);

private async Task HandleNotification(NotificationData notification)
{
    // Process notification
    await ProcessNotificationAsync(notification);
}

Core Features

Message Queuing

Reliable point-to-point messaging:

// Define message types
public class OrderProcessingMessage
{
    public string OrderId { get; set; }
    public string CustomerId { get; set; }
    public decimal Amount { get; set; }
    public DateTime OrderDate { get; set; }
}

// Send order processing message
var message = new OrderProcessingMessage
{
    OrderId = "ORD-123",
    CustomerId = "CUST-456",
    Amount = 99.99m,
    OrderDate = DateTime.UtcNow
};

await queueProvider.SendAsync("order.processing", message);

// Process orders
await queueProvider.SubscribeAsync<OrderProcessingMessage>("order.processing", async (order) =>
{
    await ProcessOrderAsync(order);
});

Publish-Subscribe Pattern

Event-driven messaging:

// Publisher: Send events to multiple subscribers
public class EventPublisher
{
    private readonly IQueueProvider _queue;

    public EventPublisher(IQueueProvider queue)
    {
        _queue = queue;
    }

    public async Task PublishUserRegisteredAsync(UserRegisteredEvent evt)
    {
        await _queue.SendAsync("user.events", evt, "user.registered");
    }

    public async Task PublishOrderPlacedAsync(OrderPlacedEvent evt)
    {
        await _queue.SendAsync("order.events", evt, "order.placed");
    }
}

// Subscribers: Handle specific events
public class EmailNotificationService
{
    public async Task StartAsync(IQueueProvider queue)
    {
        await queue.SubscribeAsync<UserRegisteredEvent>("user.events.email",
            HandleUserRegistered, "user.registered");
    }

    private async Task HandleUserRegistered(UserRegisteredEvent evt)
    {
        await SendWelcomeEmailAsync(evt.UserId, evt.Email);
    }
}

public class AnalyticsService
{
    public async Task StartAsync(IQueueProvider queue)
    {
        await queue.SubscribeAsync<OrderPlacedEvent>("order.events.analytics",
            HandleOrderPlaced, "order.placed");
    }

    private async Task HandleOrderPlaced(OrderPlacedEvent evt)
    {
        await RecordOrderMetricsAsync(evt);
    }
}

Request-Response Pattern

Synchronous-style communication over async messaging:

// Request processing service
public class OrderService
{
    private readonly IQueueProvider _queue;

    public OrderService(IQueueProvider queue)
    {
        _queue = queue;
    }

    public async Task StartRequestHandlerAsync()
    {
        await _queue.SubscribeAsync<OrderValidationRequest>("order.validation.requests",
            HandleValidationRequest);
    }

    private async Task HandleValidationRequest(OrderValidationRequest request)
    {
        var response = new OrderValidationResponse
        {
            RequestId = request.RequestId,
            IsValid = await ValidateOrderAsync(request.Order),
            ValidationErrors = GetValidationErrors(request.Order)
        };

        await _queue.SendAsync("order.validation.responses", response, request.RequestId);
    }
}

// Client making requests
public class OrderValidator
{
    private readonly IQueueProvider _queue;
    private readonly Dictionary<string, TaskCompletionSource<OrderValidationResponse>> _pendingRequests;

    public OrderValidator(IQueueProvider queue)
    {
        _queue = queue;
        _pendingRequests = new Dictionary<string, TaskCompletionSource<OrderValidationResponse>>();

        // Start listening for responses
        StartResponseListenerAsync();
    }

    public async Task<OrderValidationResponse> ValidateOrderAsync(Order order)
    {
        var requestId = Guid.NewGuid().ToString();
        var request = new OrderValidationRequest
        {
            RequestId = requestId,
            Order = order
        };

        var tcs = new TaskCompletionSource<OrderValidationResponse>();
        _pendingRequests[requestId] = tcs;

        await _queue.SendAsync("order.validation.requests", request);

        // Wait for response with timeout
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        cts.Token.Register(() => tcs.TrySetCanceled());

        return await tcs.Task;
    }

    private async Task StartResponseListenerAsync()
    {
        await _queue.SubscribeAsync<OrderValidationResponse>("order.validation.responses",
            HandleValidationResponse);
    }

    private Task HandleValidationResponse(OrderValidationResponse response)
    {
        if (_pendingRequests.TryGetValue(response.RequestId, out var tcs))
        {
            _pendingRequests.Remove(response.RequestId);
            tcs.SetResult(response);
        }
        return Task.CompletedTask;
    }
}

Advanced Configuration

Connection String Options

// Basic format
"localhost:5672"
"rabbitmq.example.com:5672"

// With AMQP URI
"amqp://guest:guest@localhost:5672"
"amqps://user:pass@secure-rabbit.com:5671"

// With virtual host
"amqp://user:pass@host:5672|vhost=production"
"localhost:5672|vhost=development"

// SSL connection
"amqps://user:pass@secure-rabbit.com:5671|vhost=prod"

Queue Configuration

// Configure queue properties
public class QueueConfig
{
    public bool Durable { get; set; } = true;          // Survive server restart
    public bool Exclusive { get; set; } = false;       // Queue exclusive to connection
    public bool AutoDelete { get; set; } = false;      // Delete when unused
    public Dictionary<string, object> Arguments { get; set; } = new();
}

// Create queue with configuration
var config = new QueueConfig
{
    Durable = true,
    Arguments = new Dictionary<string, object>
    {
        ["x-message-ttl"] = 3600000,  // 1 hour TTL
        ["x-max-length"] = 10000,     // Maximum queue length
        ["x-dead-letter-exchange"] = "dlx.exchange"
    }
};

Exchange Configuration

// Configure exchanges for routing
public class ExchangeConfig
{
    public string Name { get; set; }
    public string Type { get; set; } = "topic";  // direct, fanout, topic, headers
    public bool Durable { get; set; } = true;
    public bool AutoDelete { get; set; } = false;
    public Dictionary<string, object> Arguments { get; set; } = new();
}

// Topic exchange for flexible routing
var exchangeConfig = new ExchangeConfig
{
    Name = "application.events",
    Type = "topic",
    Durable = true
};

Integration Patterns

Dependency Injection

// In Startup.cs or Program.cs
services.AddSingleton<IQueueProvider>(provider =>
{
    var connectionString = configuration.GetConnectionString("RabbitMQ");
    var config = new RestConfig();
    return new RabbitMQProvider(connectionString, config);
});

// Background service for message processing
public class MessageProcessorService : BackgroundService
{
    private readonly IQueueProvider _queue;
    private readonly ILogger<MessageProcessorService> _logger;

    public MessageProcessorService(IQueueProvider queue, ILogger<MessageProcessorService> logger)
    {
        _queue = queue;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await _queue.SubscribeAsync<OrderMessage>("order.processing", ProcessOrder);

        // Keep service running
        await Task.Delay(Timeout.Infinite, stoppingToken);
    }

    private async Task ProcessOrder(OrderMessage message)
    {
        try
        {
            await ProcessOrderAsync(message);
            _logger.LogInformation("Processed order {OrderId}", message.OrderId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to process order {OrderId}", message.OrderId);
            throw;
        }
    }
}

Event-Driven Architecture

// Domain events
public class DomainEventPublisher
{
    private readonly IQueueProvider _queue;

    public DomainEventPublisher(IQueueProvider queue)
    {
        _queue = queue;
    }

    public async Task PublishAsync<T>(T domainEvent, string routingKey = "") where T : IDomainEvent
    {
        await _queue.SendAsync("domain.events", domainEvent, routingKey);
    }
}

// Event handlers
public class UserEventHandler
{
    private readonly IQueueProvider _queue;
    private readonly IUserService _userService;

    public UserEventHandler(IQueueProvider queue, IUserService userService)
    {
        _queue = queue;
        _userService = userService;
    }

    public async Task StartAsync()
    {
        await _queue.SubscribeAsync<UserCreatedEvent>("user.events.handler",
            HandleUserCreated, "user.created");

        await _queue.SubscribeAsync<UserUpdatedEvent>("user.events.handler",
            HandleUserUpdated, "user.updated");
    }

    private async Task HandleUserCreated(UserCreatedEvent evt)
    {
        await _userService.InitializeUserProfileAsync(evt.UserId);
        await _userService.SendWelcomeEmailAsync(evt.UserId);
    }

    private async Task HandleUserUpdated(UserUpdatedEvent evt)
    {
        await _userService.UpdateUserCacheAsync(evt.UserId);
    }
}

Microservices Communication

// Service-to-service messaging
public class PaymentService
{
    private readonly IQueueProvider _queue;

    public PaymentService(IQueueProvider queue)
    {
        _queue = queue;
    }

    public async Task ProcessPaymentAsync(PaymentRequest request)
    {
        try
        {
            // Process payment
            var result = await ProcessPaymentInternalAsync(request);

            if (result.Success)
            {
                await _queue.SendAsync("payment.events", new PaymentCompletedEvent
                {
                    PaymentId = request.PaymentId,
                    OrderId = request.OrderId,
                    Amount = request.Amount,
                    ProcessedAt = DateTime.UtcNow
                }, "payment.completed");
            }
            else
            {
                await _queue.SendAsync("payment.events", new PaymentFailedEvent
                {
                    PaymentId = request.PaymentId,
                    OrderId = request.OrderId,
                    Reason = result.FailureReason,
                    FailedAt = DateTime.UtcNow
                }, "payment.failed");
            }
        }
        catch (Exception ex)
        {
            await _queue.SendAsync("payment.events", new PaymentErrorEvent
            {
                PaymentId = request.PaymentId,
                ErrorMessage = ex.Message,
                ErrorAt = DateTime.UtcNow
            }, "payment.error");

            throw;
        }
    }
}

// Order service listening for payment events
public class OrderService
{
    public async Task StartAsync(IQueueProvider queue)
    {
        await queue.SubscribeAsync<PaymentCompletedEvent>("order.payment.events",
            HandlePaymentCompleted, "payment.completed");

        await queue.SubscribeAsync<PaymentFailedEvent>("order.payment.events",
            HandlePaymentFailed, "payment.failed");
    }

    private async Task HandlePaymentCompleted(PaymentCompletedEvent evt)
    {
        await UpdateOrderStatusAsync(evt.OrderId, OrderStatus.Paid);
        await TriggerOrderFulfillmentAsync(evt.OrderId);
    }

    private async Task HandlePaymentFailed(PaymentFailedEvent evt)
    {
        await UpdateOrderStatusAsync(evt.OrderId, OrderStatus.PaymentFailed);
        await NotifyCustomerAsync(evt.OrderId, evt.Reason);
    }
}

Error Handling and Reliability

Dead Letter Queues

// Configure dead letter exchange
var queueConfig = new QueueConfig
{
    Arguments = new Dictionary<string, object>
    {
        ["x-dead-letter-exchange"] = "dlx.failed.messages",
        ["x-dead-letter-routing-key"] = "failed",
        ["x-message-ttl"] = 300000  // 5 minutes before DLQ
    }
};

// Handle failed messages
await queueProvider.SubscribeAsync<FailedMessage>("dlx.failed.messages", async (failedMsg) =>
{
    _logger.LogError("Message failed processing: {Message}", failedMsg);
    await StoreFailedMessageAsync(failedMsg);
    await AlertOperationsTeamAsync(failedMsg);
});

Retry Logic

public class ReliableMessageHandler
{
    private readonly IQueueProvider _queue;
    private readonly ILogger _logger;

    public async Task HandleMessageWithRetryAsync<T>(T message, Func<T, Task> handler, int maxRetries = 3)
    {
        for (int attempt = 1; attempt <= maxRetries; attempt++)
        {
            try
            {
                await handler(message);
                return; // Success
            }
            catch (Exception ex) when (attempt < maxRetries)
            {
                _logger.LogWarning(ex, "Message processing failed, attempt {Attempt}/{MaxRetries}",
                    attempt, maxRetries);

                var delay = TimeSpan.FromSeconds(Math.Pow(2, attempt)); // Exponential backoff
                await Task.Delay(delay);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Message processing failed after {MaxRetries} attempts", maxRetries);
                throw;
            }
        }
    }
}

Performance Considerations

Connection Pooling

// Single provider instance for connection reuse
var provider = new RabbitMQProvider(connectionString, config);

// All operations use the same connection
await provider.SendAsync("queue1", message1);
await provider.SendAsync("queue2", message2);
await provider.ReceiveAsync<Message>("queue3");

Batch Processing

// Process messages in batches for efficiency
public async Task ProcessMessagesBatchAsync(string queueName, int batchSize = 10)
{
    var messages = new List<ProcessingMessage>();

    await _queue.SubscribeAsync<ProcessingMessage>(queueName, async (message) =>
    {
        messages.Add(message);

        if (messages.Count >= batchSize)
        {
            await ProcessBatchAsync(messages.ToList());
            messages.Clear();
        }
    });

    // Process remaining messages periodically
    using var timer = new Timer(async _ =>
    {
        if (messages.Count > 0)
        {
            await ProcessBatchAsync(messages.ToList());
            messages.Clear();
        }
    }, null, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5));
}

Requirements

  • .NET 8.0, 9.0, or 10.0
  • RabbitMQ.Client 7.1.2+
  • OElite.Restme (dependency for base abstractions)
  • RabbitMQ Server (3.8+ recommended)

Thread Safety

RabbitMQProvider is thread-safe for most operations:

  • Connection and channel management is thread-safe
  • Send operations are thread-safe
  • Subscribe operations should be set up once per queue
  • Can be used as a singleton in DI containers

License

Copyright © OElite Limited. All rights reserved.

Showing the top 20 packages that depend on OElite.Restme.RabbitMQ.

Packages Downloads
OElite.Services.Kortex
Package Description
7
OElite.Services
Package Description
4
OElite.Services.Kortex
Package Description
4
OElite.Services.Platform
Package Description
4

.NET 8.0

.NET 9.0

.NET 10.0

Version Downloads Last updated
2.1.1-develop.508 13 11/22/2025
2.1.0-develop.449 3 11/11/2025
2.1.0-develop.445 4 11/11/2025
2.0.9-develop.442 37 11/10/2025
2.0.9-develop.434 7 11/06/2025
2.0.9-develop.433 3 11/06/2025
2.0.9-develop.432 3 11/06/2025
2.0.9-develop.430 4 11/04/2025
2.0.9-develop.420 7 10/30/2025
2.0.9-develop.419 5 10/30/2025
2.0.9-develop.418 5 10/30/2025
2.0.9-develop.414 14 10/28/2025
2.0.9-develop.413 3 10/28/2025
2.0.9-develop.398 40 10/26/2025
2.0.9-develop.393 14 10/25/2025
2.0.9-develop.390 11 10/25/2025
2.0.9-develop.388 7 10/25/2025
2.0.9-develop.386 9 10/25/2025
2.0.9-develop.257 137 10/20/2025
2.0.9-develop.256 5 10/20/2025
2.0.9-develop.255 5 10/20/2025
2.0.9-develop.254 5 10/20/2025
2.0.9-develop.253 4 10/20/2025
2.0.9-develop.252 9 10/19/2025
2.0.9-develop.251 5 10/19/2025
2.0.9-develop.250 5 10/19/2025
2.0.9-develop.249 4 10/19/2025
2.0.9-develop.248 5 10/19/2025
2.0.9-develop.247 5 10/19/2025
2.0.9-develop.246 4 10/19/2025
2.0.9-develop.245 5 10/19/2025
2.0.9-develop.243 4 10/19/2025
2.0.9-develop.242 8 10/19/2025
2.0.9-develop.241 5 10/19/2025
2.0.9-develop.233 23 10/17/2025
2.0.9-develop.232 5 10/17/2025
2.0.9-develop.230 5 10/17/2025
2.0.9-develop.229 5 10/17/2025
2.0.9-develop.228 5 10/17/2025
2.0.9-develop.223 11 10/16/2025
2.0.9-develop.221 15 10/16/2025
2.0.9-develop.220 4 10/16/2025
2.0.9-develop.214 18 10/15/2025
2.0.9-develop.213 5 10/15/2025
2.0.9-develop.208 20 10/14/2025
2.0.9-develop.207 10 10/14/2025
2.0.9-develop.204 15 10/13/2025
2.0.9-develop.192 9 10/13/2025
2.0.9-develop.191 9 10/13/2025
2.0.9-develop.190 35 10/12/2025
2.0.9-develop.143 11 10/12/2025
2.0.9-develop.142 6 10/10/2025
2.0.9-develop.135 4 10/09/2025
2.0.9-develop.90 5 10/05/2025
2.0.9-develop.79 6 10/02/2025
2.0.9-develop.74 4 10/01/2025
2.0.9-develop.73 6 10/01/2025
2.0.9-develop.71 5 09/26/2025
2.0.9-develop.70 5 09/22/2025
2.0.9-develop.69 5 09/22/2025
2.0.9-develop.68 4 09/22/2025
2.0.9-develop.67 5 09/22/2025
2.0.9-develop.66 5 09/21/2025
2.0.9-develop.65 5 09/21/2025
2.0.9-develop.64 5 09/21/2025
2.0.9-develop.50 4 09/16/2025
2.0.9-develop.49 4 09/16/2025
2.0.9-develop.48 4 09/15/2025
2.0.8-develop.31 4 09/15/2025
2.0.7-develop.30 4 09/15/2025
2.0.6-develop.29 4 09/15/2025
2.0.6-develop.27 4 09/15/2025
2.0.5-develop.26 4 09/15/2025
2.0.5-develop.25 4 09/15/2025