Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 115 additions & 6 deletions docs/provider_rabbitmq.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat
- [Routing Keys and Wildcard Support](#routing-keys-and-wildcard-support)
- [Basic Routing Keys](#basic-routing-keys)
- [Wildcard Routing Keys](#wildcard-routing-keys)
- [Unrecognized Routing Key Handler](#unrecognized-routing-key-handler)
- [Acknowledgment Mode](#acknowledgment-mode)
- [Consumer Error Handling](#consumer-error-handling)
- [Dead Letter Exchange](#dead-letter-exchange)
Expand All @@ -20,6 +21,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat
- [Default Exchange](#default-exchange)
- [Why it exists](#why-it-exists)
- [Connection Resiliency](#connection-resiliency)
- [Consumer Recovery](#consumer-recovery)
- [Recipes](#recipes)
- [01 Multiple consumers on the same queue with different concurrency](#01-multiple-consumers-on-the-same-queue-with-different-concurrency)
- [Feedback](#feedback)
Expand Down Expand Up @@ -212,18 +214,124 @@ services.AddSlimMessageBus(mbb =>

**Routing Key Pattern Examples:**

| Pattern | Matches | Doesn't Match |
|---------|---------|---------------|
| `regions.na.cities.*` | `regions.na.cities.toronto`<br/>`regions.na.cities.newyork` | `regions.na.cities` (missing segment)<br/>`regions.na.cities.toronto.downtown` (extra segment) |
| `audit.events.#` | `audit.events.users.signup`<br/>`audit.events.orders.placed`<br/>`audit.events` | `audit.users` (wrong prefix) |
| `orders.#.region.*` | `orders.processed.region.na`<br/>`orders.created.cancelled.region.eu`<br/>`orders.region.na` | `orders.processed.state.california` (wrong pattern)<br/>`orders.processed.region` (missing final segment) |
| `#` | Any routing key | None (matches everything) |
| Pattern | Matches | Doesn't Match |
| --------------------- | -------------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------- |
| `regions.na.cities.*` | `regions.na.cities.toronto`<br/>`regions.na.cities.newyork` | `regions.na.cities` (missing segment)<br/>`regions.na.cities.toronto.downtown` (extra segment) |
| `audit.events.#` | `audit.events.users.signup`<br/>`audit.events.orders.placed`<br/>`audit.events` | `audit.users` (wrong prefix) |
| `orders.#.region.*` | `orders.processed.region.na`<br/>`orders.created.cancelled.region.eu`<br/>`orders.region.na` | `orders.processed.state.california` (wrong pattern)<br/>`orders.processed.region` (missing final segment) |
| `#` | Any routing key | None (matches everything) |

**Performance Note:** SlimMessageBus optimizes routing key matching by:

- Using exact matches first for better performance
- Only applying wildcard pattern matching when no exact match is found
- Caching routing key patterns for efficient lookup

##### Unrecognized Routing Key Handler

When a message arrives on a queue with a routing key that doesn't match any of the configured consumer routing key patterns, the `MessageUnrecognizedRoutingKeyHandler` is invoked to determine how the message should be handled.

**Default Behavior:**

By default, unrecognized messages are **acknowledged (Ack)** and removed from the queue:

```cs
// Default behavior (built-in)
settings.MessageUnrecognizedRoutingKeyHandler = (transportMessage) => RabbitMqMessageConfirmOptions.Ack;
```

This default is appropriate when:

- Routing key mismatches are expected and acceptable
- You want to silently discard messages that don't match any consumer
- Your application follows a "fail-open" approach for unknown messages

**Customizing the Handler:**

You can customize this behavior at the bus level to handle unrecognized messages differently:

```cs
services.AddSlimMessageBus(mbb =>
{
mbb.WithProviderRabbitMQ(cfg =>
{
// Option 1: Reject unrecognized messages without requeue (send to DLX if configured)
cfg.MessageUnrecognizedRoutingKeyHandler = (transportMessage) =>
RabbitMqMessageConfirmOptions.Nack;

// Option 2: Reject and requeue for later processing
cfg.MessageUnrecognizedRoutingKeyHandler = (transportMessage) =>
RabbitMqMessageConfirmOptions.Nack | RabbitMqMessageConfirmOptions.Requeue;

// Option 3: Custom logic based on routing key or message content
cfg.MessageUnrecognizedRoutingKeyHandler = (transportMessage) =>
{
var routingKey = transportMessage.RoutingKey;

// Log for monitoring and alerting
logger.LogWarning("Unrecognized routing key: {RoutingKey} on exchange: {Exchange}",
routingKey, transportMessage.Exchange);

// Route to DLX if it looks like a potential typo or misconfiguration
if (routingKey.StartsWith("orders."))
{
return RabbitMqMessageConfirmOptions.Nack; // Send to DLX for investigation
}

// Ack and discard messages from other exchanges
return RabbitMqMessageConfirmOptions.Ack;
};
});
});
```

**Available Options:**

- **`Ack` (default)**: Acknowledge and remove the message from the queue - use when unrecognized messages should be silently discarded
- **`Nack`**: Reject the message and route to Dead Letter Exchange (if configured) - use for debugging routing issues or when messages shouldn't be lost
- **`Nack | Requeue`**: Reject and requeue the message for retry - use when routing keys might be registered dynamically or during rolling deployments

**Example with Dead Letter Exchange:**

```cs
services.AddSlimMessageBus(mbb =>
{
mbb.WithProviderRabbitMQ(cfg =>
{
// Send unrecognized messages to DLX for investigation
cfg.MessageUnrecognizedRoutingKeyHandler = (transportMessage) =>
RabbitMqMessageConfirmOptions.Nack;
});

mbb.Consume<OrderEvent>(x => x
.Queue("orders-queue")
.ExchangeBinding("orders", routingKey: "orders.created")
// Unrecognized messages will be routed to this DLX
.DeadLetterExchange("orders-dlq", exchangeType: ExchangeType.Direct)
.WithConsumer<OrderCreatedConsumer>());
});
```

**Handler Parameters:**

The handler receives `BasicDeliverEventArgs` which provides access to:

- `RoutingKey`: The routing key of the unrecognized message
- `Exchange`: The exchange the message was published to
- `Body`: The message payload (ReadOnlyMemory<byte>)
- `BasicProperties`: Message properties including headers, MessageId, ContentType, etc.

This allows for sophisticated routing decisions based on message metadata.

**Common Use Cases:**

1. **Development/Debugging**: Use `Nack` to route unrecognized messages to a DLX where they can be inspected
2. **Production Monitoring**: Log unrecognized routing keys and send metrics to your monitoring system
3. **Graceful Degradation**: Use `Ack` in production to prevent queue buildup from deprecated message types
4. **Rolling Deployments**: Use `Nack | Requeue` to temporarily requeue messages during deployments when consumers might not yet be ready

**Note:** This handler only applies to messages where the routing key doesn't match any configured consumer patterns. Messages that match a routing key pattern but fail during processing are handled by the [Consumer Error Handling](#consumer-error-handling) mechanism instead.

#### Acknowledgment Mode

When a consumer processes a message from a RabbitMQ queue, it needs to acknowledge that the message was processed. RabbitMQ supports three types of acknowledgments out which two are available in SMB:
Expand Down Expand Up @@ -487,6 +595,7 @@ When a RabbitMQ server restarts or the connection is lost, SlimMessageBus automa
5. Resumes message processing automatically

This ensures that:

- Messages don't pile up in queues during temporary outages
- Consumers are visible in the RabbitMQ management UI after recovery
- No manual intervention is required to restore message processing
Expand Down
10 changes: 10 additions & 0 deletions src/SlimMessageBus.Host.RabbitMQ/Config/Delegates.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,13 @@
/// </summary>
/// <param name="option"></param>
public delegate void RabbitMqMessageConfirmAction(RabbitMqMessageConfirmOptions option);

/// <summary>
/// Represents the method that handles a RabbitMQ message that includes an routing key that is obsolete or non relevent from the applicatin perspective
/// Provides access to the message, its properties, and a confirmation action.
/// </summary>
/// <remarks>Use this delegate to process messages received from RabbitMQ queues where the routing key is not
/// relevant or not provided. The handler is responsible for invoking the confirmation action to ensure proper message
/// acknowledgment.</remarks>
/// <param name="transportMessage">The event arguments containing the delivered RabbitMQ message and related metadata.</param>
public delegate RabbitMqMessageConfirmOptions RabbitMqMessageUnrecognizedRoutingKeyHandler(BasicDeliverEventArgs transportMessage);
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ public class RabbitMqConsumer : AbstractRabbitMqConsumer, IRabbitMqConsumer
private readonly IMessageProcessor<BasicDeliverEventArgs> _messageProcessor;
private readonly RoutingKeyMatcherService<IMessageProcessor<BasicDeliverEventArgs>> _routingKeyMatcher;

private readonly RabbitMqMessageUnrecognizedRoutingKeyHandler _messageUnrecognizedRoutingKeyHandler;

protected override RabbitMqMessageAcknowledgementMode AcknowledgementMode => _acknowledgementMode;

public RabbitMqConsumer(
ILoggerFactory loggerFactory,
IRabbitMqChannel channel,
string queueName,
IList<ConsumerSettings> consumers,
MessageBusBase messageBus,
MessageBusBase<RabbitMqMessageBusSettings> messageBus,
MessageProvider<BasicDeliverEventArgs> messageProvider,
IHeaderValueConverter headerValueConverter)
: base(loggerFactory.CreateLogger<RabbitMqConsumer>(),
Expand All @@ -33,6 +35,7 @@ public RabbitMqConsumer(
queueName,
headerValueConverter)
{
_messageUnrecognizedRoutingKeyHandler = messageBus.ProviderSettings.MessageUnrecognizedRoutingKeyHandler;
_acknowledgementMode = consumers.Select(x => x.GetOrDefault(RabbitMqProperties.MessageAcknowledgementMode, messageBus.Settings)).FirstOrDefault(x => x != null)
?? RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade; // be default choose the safer acknowledgement mode

Expand Down Expand Up @@ -102,7 +105,7 @@ protected override async Task OnStop()
await base.OnStop();
}

private void InitializeConsumerContext(BasicDeliverEventArgs transportMessage, ConsumerContext consumerContext)
internal void InitializeConsumerContext(BasicDeliverEventArgs transportMessage, ConsumerContext consumerContext)
{
if (_acknowledgementMode == RabbitMqMessageAcknowledgementMode.AckAutomaticByRabbit)
{
Expand Down Expand Up @@ -164,6 +167,8 @@ protected override async Task<Exception> OnMessageReceived(Dictionary<string, ob
else
{
Logger.LogDebug("Exchange {Exchange} - Queue {Queue}: No message processor found for routing key {RoutingKey}", transportMessage.Exchange, Path, transportMessage.RoutingKey);
var confirmAction = _messageUnrecognizedRoutingKeyHandler(transportMessage);
ConfirmMessage(transportMessage, confirmAction, messageHeaders);
}

// error handling happens in the message processor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,11 @@ public string ConnectionString
/// See the <see cref="DefaultHeaderValueConverter"/>.
/// </summary>
public IHeaderValueConverter HeaderValueConverter { get; set; } = new DefaultHeaderValueConverter();

/// <summary>
/// Allows to handle messages that arrive with an unrecognized routing key and decide what to do with them.
/// By default the message is Acknowledged.
/// </summary>
public RabbitMqMessageUnrecognizedRoutingKeyHandler MessageUnrecognizedRoutingKeyHandler { get; set; } = (_) => RabbitMqMessageConfirmOptions.Ack;
}

Loading
Loading