diff --git a/docs/provider_rabbitmq.md b/docs/provider_rabbitmq.md index f60f5fee..16deff14 100644 --- a/docs/provider_rabbitmq.md +++ b/docs/provider_rabbitmq.md @@ -10,6 +10,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat - [Routing Keys and Wildcard Support](#routing-keys-and-wildcard-support) - [Basic Routing Keys](#basic-routing-keys) - [Wildcard Routing Keys](#wildcard-routing-keys) + - [Unrecognized Routing Key Handler](#unrecognized-routing-key-handler) - [Acknowledgment Mode](#acknowledgment-mode) - [Consumer Error Handling](#consumer-error-handling) - [Dead Letter Exchange](#dead-letter-exchange) @@ -20,6 +21,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat - [Default Exchange](#default-exchange) - [Why it exists](#why-it-exists) - [Connection Resiliency](#connection-resiliency) + - [Consumer Recovery](#consumer-recovery) - [Recipes](#recipes) - [01 Multiple consumers on the same queue with different concurrency](#01-multiple-consumers-on-the-same-queue-with-different-concurrency) - [Feedback](#feedback) @@ -212,18 +214,124 @@ services.AddSlimMessageBus(mbb => **Routing Key Pattern Examples:** -| Pattern | Matches | Doesn't Match | -|---------|---------|---------------| -| `regions.na.cities.*` | `regions.na.cities.toronto`
`regions.na.cities.newyork` | `regions.na.cities` (missing segment)
`regions.na.cities.toronto.downtown` (extra segment) | -| `audit.events.#` | `audit.events.users.signup`
`audit.events.orders.placed`
`audit.events` | `audit.users` (wrong prefix) | -| `orders.#.region.*` | `orders.processed.region.na`
`orders.created.cancelled.region.eu`
`orders.region.na` | `orders.processed.state.california` (wrong pattern)
`orders.processed.region` (missing final segment) | -| `#` | Any routing key | None (matches everything) | +| Pattern | Matches | Doesn't Match | +| --------------------- | -------------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------- | +| `regions.na.cities.*` | `regions.na.cities.toronto`
`regions.na.cities.newyork` | `regions.na.cities` (missing segment)
`regions.na.cities.toronto.downtown` (extra segment) | +| `audit.events.#` | `audit.events.users.signup`
`audit.events.orders.placed`
`audit.events` | `audit.users` (wrong prefix) | +| `orders.#.region.*` | `orders.processed.region.na`
`orders.created.cancelled.region.eu`
`orders.region.na` | `orders.processed.state.california` (wrong pattern)
`orders.processed.region` (missing final segment) | +| `#` | Any routing key | None (matches everything) | **Performance Note:** SlimMessageBus optimizes routing key matching by: + - Using exact matches first for better performance - Only applying wildcard pattern matching when no exact match is found - Caching routing key patterns for efficient lookup +##### Unrecognized Routing Key Handler + +When a message arrives on a queue with a routing key that doesn't match any of the configured consumer routing key patterns, the `MessageUnrecognizedRoutingKeyHandler` is invoked to determine how the message should be handled. + +**Default Behavior:** + +By default, unrecognized messages are **acknowledged (Ack)** and removed from the queue: + +```cs +// Default behavior (built-in) +settings.MessageUnrecognizedRoutingKeyHandler = (transportMessage) => RabbitMqMessageConfirmOptions.Ack; +``` + +This default is appropriate when: + +- Routing key mismatches are expected and acceptable +- You want to silently discard messages that don't match any consumer +- Your application follows a "fail-open" approach for unknown messages + +**Customizing the Handler:** + +You can customize this behavior at the bus level to handle unrecognized messages differently: + +```cs +services.AddSlimMessageBus(mbb => +{ + mbb.WithProviderRabbitMQ(cfg => + { + // Option 1: Reject unrecognized messages without requeue (send to DLX if configured) + cfg.MessageUnrecognizedRoutingKeyHandler = (transportMessage) => + RabbitMqMessageConfirmOptions.Nack; + + // Option 2: Reject and requeue for later processing + cfg.MessageUnrecognizedRoutingKeyHandler = (transportMessage) => + RabbitMqMessageConfirmOptions.Nack | RabbitMqMessageConfirmOptions.Requeue; + + // Option 3: Custom logic based on routing key or message content + cfg.MessageUnrecognizedRoutingKeyHandler = (transportMessage) => + { + var routingKey = transportMessage.RoutingKey; + + // Log for monitoring and alerting + logger.LogWarning("Unrecognized routing key: {RoutingKey} on exchange: {Exchange}", + routingKey, transportMessage.Exchange); + + // Route to DLX if it looks like a potential typo or misconfiguration + if (routingKey.StartsWith("orders.")) + { + return RabbitMqMessageConfirmOptions.Nack; // Send to DLX for investigation + } + + // Ack and discard messages from other exchanges + return RabbitMqMessageConfirmOptions.Ack; + }; + }); +}); +``` + +**Available Options:** + +- **`Ack` (default)**: Acknowledge and remove the message from the queue - use when unrecognized messages should be silently discarded +- **`Nack`**: Reject the message and route to Dead Letter Exchange (if configured) - use for debugging routing issues or when messages shouldn't be lost +- **`Nack | Requeue`**: Reject and requeue the message for retry - use when routing keys might be registered dynamically or during rolling deployments + +**Example with Dead Letter Exchange:** + +```cs +services.AddSlimMessageBus(mbb => +{ + mbb.WithProviderRabbitMQ(cfg => + { + // Send unrecognized messages to DLX for investigation + cfg.MessageUnrecognizedRoutingKeyHandler = (transportMessage) => + RabbitMqMessageConfirmOptions.Nack; + }); + + mbb.Consume(x => x + .Queue("orders-queue") + .ExchangeBinding("orders", routingKey: "orders.created") + // Unrecognized messages will be routed to this DLX + .DeadLetterExchange("orders-dlq", exchangeType: ExchangeType.Direct) + .WithConsumer()); +}); +``` + +**Handler Parameters:** + +The handler receives `BasicDeliverEventArgs` which provides access to: + +- `RoutingKey`: The routing key of the unrecognized message +- `Exchange`: The exchange the message was published to +- `Body`: The message payload (ReadOnlyMemory) +- `BasicProperties`: Message properties including headers, MessageId, ContentType, etc. + +This allows for sophisticated routing decisions based on message metadata. + +**Common Use Cases:** + +1. **Development/Debugging**: Use `Nack` to route unrecognized messages to a DLX where they can be inspected +2. **Production Monitoring**: Log unrecognized routing keys and send metrics to your monitoring system +3. **Graceful Degradation**: Use `Ack` in production to prevent queue buildup from deprecated message types +4. **Rolling Deployments**: Use `Nack | Requeue` to temporarily requeue messages during deployments when consumers might not yet be ready + +**Note:** This handler only applies to messages where the routing key doesn't match any configured consumer patterns. Messages that match a routing key pattern but fail during processing are handled by the [Consumer Error Handling](#consumer-error-handling) mechanism instead. + #### Acknowledgment Mode When a consumer processes a message from a RabbitMQ queue, it needs to acknowledge that the message was processed. RabbitMQ supports three types of acknowledgments out which two are available in SMB: @@ -487,6 +595,7 @@ When a RabbitMQ server restarts or the connection is lost, SlimMessageBus automa 5. Resumes message processing automatically This ensures that: + - Messages don't pile up in queues during temporary outages - Consumers are visible in the RabbitMQ management UI after recovery - No manual intervention is required to restore message processing diff --git a/src/SlimMessageBus.Host.RabbitMQ/Config/Delegates.cs b/src/SlimMessageBus.Host.RabbitMQ/Config/Delegates.cs index c2846365..84501b43 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Config/Delegates.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Config/Delegates.cs @@ -30,3 +30,13 @@ /// /// public delegate void RabbitMqMessageConfirmAction(RabbitMqMessageConfirmOptions option); + +/// +/// Represents the method that handles a RabbitMQ message that includes an routing key that is obsolete or non relevent from the applicatin perspective +/// Provides access to the message, its properties, and a confirmation action. +/// +/// Use this delegate to process messages received from RabbitMQ queues where the routing key is not +/// relevant or not provided. The handler is responsible for invoking the confirmation action to ensure proper message +/// acknowledgment. +/// The event arguments containing the delivered RabbitMQ message and related metadata. +public delegate RabbitMqMessageConfirmOptions RabbitMqMessageUnrecognizedRoutingKeyHandler(BasicDeliverEventArgs transportMessage); \ No newline at end of file diff --git a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs index 55ef8a03..4058d47f 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs @@ -16,6 +16,8 @@ public class RabbitMqConsumer : AbstractRabbitMqConsumer, IRabbitMqConsumer private readonly IMessageProcessor _messageProcessor; private readonly RoutingKeyMatcherService> _routingKeyMatcher; + private readonly RabbitMqMessageUnrecognizedRoutingKeyHandler _messageUnrecognizedRoutingKeyHandler; + protected override RabbitMqMessageAcknowledgementMode AcknowledgementMode => _acknowledgementMode; public RabbitMqConsumer( @@ -23,7 +25,7 @@ public RabbitMqConsumer( IRabbitMqChannel channel, string queueName, IList consumers, - MessageBusBase messageBus, + MessageBusBase messageBus, MessageProvider messageProvider, IHeaderValueConverter headerValueConverter) : base(loggerFactory.CreateLogger(), @@ -33,6 +35,7 @@ public RabbitMqConsumer( queueName, headerValueConverter) { + _messageUnrecognizedRoutingKeyHandler = messageBus.ProviderSettings.MessageUnrecognizedRoutingKeyHandler; _acknowledgementMode = consumers.Select(x => x.GetOrDefault(RabbitMqProperties.MessageAcknowledgementMode, messageBus.Settings)).FirstOrDefault(x => x != null) ?? RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade; // be default choose the safer acknowledgement mode @@ -102,7 +105,7 @@ protected override async Task OnStop() await base.OnStop(); } - private void InitializeConsumerContext(BasicDeliverEventArgs transportMessage, ConsumerContext consumerContext) + internal void InitializeConsumerContext(BasicDeliverEventArgs transportMessage, ConsumerContext consumerContext) { if (_acknowledgementMode == RabbitMqMessageAcknowledgementMode.AckAutomaticByRabbit) { @@ -164,6 +167,8 @@ protected override async Task OnMessageReceived(Dictionary. /// public IHeaderValueConverter HeaderValueConverter { get; set; } = new DefaultHeaderValueConverter(); + + /// + /// Allows to handle messages that arrive with an unrecognized routing key and decide what to do with them. + /// By default the message is Acknowledged. + /// + public RabbitMqMessageUnrecognizedRoutingKeyHandler MessageUnrecognizedRoutingKeyHandler { get; set; } = (_) => RabbitMqMessageConfirmOptions.Ack; } diff --git a/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/Consumers/RabbitMqConsumerTests.cs b/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/Consumers/RabbitMqConsumerTests.cs new file mode 100644 index 00000000..8fadf365 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/Consumers/RabbitMqConsumerTests.cs @@ -0,0 +1,369 @@ +namespace SlimMessageBus.Host.RabbitMQ.Test.Consumers; + +using global::RabbitMQ.Client; +using global::RabbitMQ.Client.Events; + +using SlimMessageBus.Host.Collections; + +public class RabbitMqConsumerTests : IAsyncLifetime +{ + private readonly Mock _channelMock; + private readonly Mock _modelMock; + private readonly Mock _headerValueConverterMock; + private readonly Mock _loggerFactoryMock; + private readonly Mock> _consumerLoggerMock; + private readonly TestableMessageBus _messageBus; + private readonly Mock> _messageProviderMock; + private readonly ServiceProvider _serviceProvider; + private readonly List _consumersToDispose; + + public RabbitMqConsumerTests() + { + _channelMock = new Mock(); + _modelMock = new Mock(); + _headerValueConverterMock = new Mock(); + _loggerFactoryMock = new Mock(); + _consumerLoggerMock = new Mock>(); + _messageProviderMock = new Mock>(); + _consumersToDispose = []; + + // Setup default mock behavior + _channelMock.Setup(x => x.Channel).Returns(_modelMock.Object); + _channelMock.Setup(x => x.ChannelLock).Returns(new object()); + _modelMock.Setup(x => x.IsOpen).Returns(true); + + // Setup logger factory to return the consumer logger mock for any string parameter + _loggerFactoryMock.Setup(x => x.CreateLogger(It.IsAny())).Returns(_consumerLoggerMock.Object); + + // Setup service provider + var services = new ServiceCollection(); + services.AddSingleton(sp => Mock.Of()); + services.AddSingleton(sp => new AssemblyQualifiedNameMessageTypeResolver()); + services.AddSingleton(); + services.AddSingleton(TimeProvider.System); + services.AddSingleton(sp => new PendingRequestManager( + new InMemoryPendingRequestStore(), + sp.GetRequiredService(), + NullLoggerFactory.Instance)); + _serviceProvider = services.BuildServiceProvider(); + + // Create actual MessageBus instance instead of mocking it + var messageBusSettings = new MessageBusSettings + { + ServiceProvider = _serviceProvider + }; + var providerSettings = new RabbitMqMessageBusSettings(); + + _messageBus = new TestableMessageBus(messageBusSettings, providerSettings); + + _headerValueConverterMock.Setup(x => x.ConvertFrom(It.IsAny())) + .Returns((object o) => o); + } + + public Task InitializeAsync() => Task.CompletedTask; + + public async Task DisposeAsync() + { + foreach (var consumer in _consumersToDispose) + { + if (consumer != null) + { + await consumer.DisposeAsync(); + } + } + _consumersToDispose.Clear(); + + _serviceProvider?.Dispose(); + } + + [Fact] + public void When_ConstructorCalled_Given_SingleConsumer_Then_ShouldInitializeWithSingleProcessor() + { + // Arrange + var consumers = CreateConsumerSettings("test-queue", ""); + + // Act + var consumer = CreateRabbitMqConsumer("test-queue", consumers); + + // Assert + consumer.Should().NotBeNull(); + consumer.Path.Should().Be("test-queue"); + } + + [Fact] + public void When_ConstructorCalled_Given_MultipleConsumersWithDifferentRoutingKeys_Then_ShouldInitializeRoutingKeyMatcher() + { + // Arrange + var consumers = new List + { + CreateConsumerSettings("test-queue", "routing.key.1")[0], + CreateConsumerSettings("test-queue", "routing.key.2")[0], + CreateConsumerSettings("test-queue", "routing.*.wildcard")[0] + }; + + // Act + var consumer = CreateRabbitMqConsumer("test-queue", consumers); + + // Assert + consumer.Should().NotBeNull(); + consumer.Path.Should().Be("test-queue"); + } + + [Fact] + public async Task When_OnMessageReceived_Given_MessageWithUnrecognizedRoutingKey_Then_ShouldCallUnrecognizedHandler() + { + // Arrange + var consumers = CreateConsumerSettings("test-queue", "known.routing.key"); + var consumer = CreateRabbitMqConsumer("test-queue", consumers); + + var deliverEventArgs = CreateBasicDeliverEventArgs(routingKey: "unknown.routing.key"); + var messageHeaders = new Dictionary(); + + // Act + var exception = await consumer.OnMessageReceivedPublic(messageHeaders, deliverEventArgs); + + // Assert + exception.Should().BeNull(); + + // Verify message was acknowledged (default behavior for unrecognized routing key) + _modelMock.Verify(x => x.BasicAck(deliverEventArgs.DeliveryTag, false), Times.Once); + } + + [Fact] + public async Task When_OnMessageReceived_Given_AckMessageBeforeProcessingMode_Then_ShouldAckBeforeProcessing() + { + // Arrange + var consumers = CreateConsumerSettings("test-queue", ""); + RabbitMqProperties.MessageAcknowledgementMode.Set(consumers[0], RabbitMqMessageAcknowledgementMode.AckMessageBeforeProcessing); + + var consumer = CreateRabbitMqConsumer("test-queue", consumers); + + var deliverEventArgs = CreateBasicDeliverEventArgs(); + var messageHeaders = new Dictionary(); + + // Act + await consumer.OnMessageReceivedPublic(messageHeaders, deliverEventArgs); + + // Assert - Should be called once (before processing) + _modelMock.Verify(x => x.BasicAck(deliverEventArgs.DeliveryTag, false), Times.Once); + } + + [Theory] + [InlineData(RabbitMqMessageConfirmOptions.Ack, 1, 0, false)] + [InlineData(RabbitMqMessageConfirmOptions.Nack, 0, 1, false)] + [InlineData(RabbitMqMessageConfirmOptions.Nack | RabbitMqMessageConfirmOptions.Requeue, 0, 1, true)] + public void When_ConfirmMessage_Given_ConfirmOptions_Then_ShouldCallAppropriateMethod( + RabbitMqMessageConfirmOptions option, + int expectedAckCalls, + int expectedNackCalls, + bool expectedRequeue) + { + // Arrange + var consumers = CreateConsumerSettings("test-queue", ""); + var consumer = CreateRabbitMqConsumer("test-queue", consumers); + + var deliverEventArgs = CreateBasicDeliverEventArgs(); + var properties = new Dictionary(); + + // Act + consumer.ConfirmMessage(deliverEventArgs, option, properties); + + // Assert + _modelMock.Verify(x => x.BasicAck(deliverEventArgs.DeliveryTag, false), Times.Exactly(expectedAckCalls)); + _modelMock.Verify(x => x.BasicNack(deliverEventArgs.DeliveryTag, false, expectedRequeue), Times.Exactly(expectedNackCalls)); + properties.Should().ContainKey(RabbitMqConsumer.ContextProperty_MessageConfirmed); + } + + [Fact] + public void When_ConfirmMessage_Given_MessageAlreadyConfirmed_Then_ShouldNotConfirmAgain() + { + // Arrange + var consumers = CreateConsumerSettings("test-queue", ""); + var consumer = CreateRabbitMqConsumer("test-queue", consumers); + + var deliverEventArgs = CreateBasicDeliverEventArgs(); + var properties = new Dictionary + { + { RabbitMqConsumer.ContextProperty_MessageConfirmed, true } + }; + + // Act + consumer.ConfirmMessage(deliverEventArgs, RabbitMqMessageConfirmOptions.Ack, properties, warnIfAlreadyConfirmed: true); + + // Assert + _modelMock.Verify(x => x.BasicAck(It.IsAny(), It.IsAny()), Times.Never); + _modelMock.Verify(x => x.BasicNack(It.IsAny(), It.IsAny(), It.IsAny()), Times.Never); + } + + [Fact] + public void When_InitializeConsumerContext_Given_AckAutomaticByRabbitMode_Then_ShouldMarkAsConfirmed() + { + // Arrange + var consumers = CreateConsumerSettings("test-queue", ""); + RabbitMqProperties.MessageAcknowledgementMode.Set(consumers[0], RabbitMqMessageAcknowledgementMode.AckAutomaticByRabbit); + + var consumer = CreateRabbitMqConsumer("test-queue", consumers); + + var transportMessage = CreateBasicDeliverEventArgs(); + var consumerContext = new ConsumerContext(); + + // Act + consumer.InitializeConsumerContext(transportMessage, consumerContext); + + // Assert + consumerContext.Properties.Should().ContainKey(RabbitMqConsumer.ContextProperty_MessageConfirmed); + consumerContext.Properties[RabbitMqConsumer.ContextProperty_MessageConfirmed].Should().Be(true); + } + + [Fact] + public void When_ConstructorCalled_Given_MultipleConsumersWithDifferentInstances_Then_ShouldUseMaxInstances() + { + // Arrange + var consumers = new List + { + CreateConsumerSettings("test-queue", "routing.key.1", instances: 5)[0], + CreateConsumerSettings("test-queue", "routing.key.2", instances: 10)[0] + }; + + // Act + var consumer = CreateRabbitMqConsumer("test-queue", consumers); + + // Assert - The consumer should be created and use max instances internally + consumer.Should().NotBeNull(); + } + + [Fact] + public async Task When_OnStop_Given_ActiveConsumer_Then_ShouldWaitForBackgroundTasks() + { + // Arrange + var consumers = CreateConsumerSettings("test-queue", ""); + var consumer = CreateRabbitMqConsumer("test-queue", consumers); + + // Act + var stopTask = consumer.Stop(); + await stopTask; + + // Assert + stopTask.IsCompletedSuccessfully.Should().BeTrue(); + } + + [Fact] + public async Task When_DisposeAsync_Given_ActiveConsumer_Then_ShouldDisposeMessageProcessors() + { + // Arrange + var consumers = CreateConsumerSettings("test-queue", ""); + var consumer = CreateRabbitMqConsumer("test-queue", consumers); + + // Act + var act = async () => await consumer.DisposeAsync(); + + // Assert + await act.Should().NotThrowAsync(); + } + + [Fact] + public void When_InitializeConsumerContext_Given_ValidMessage_Then_ShouldSetTransportMessageAndConfirmAction() + { + // Arrange + var consumers = CreateConsumerSettings("test-queue", ""); + var consumer = CreateRabbitMqConsumer("test-queue", consumers); + + var transportMessage = CreateBasicDeliverEventArgs(); + var consumerContext = new ConsumerContext(); + + // Act + consumer.InitializeConsumerContext(transportMessage, consumerContext); + + // Assert + consumerContext.GetTransportMessage().Should().BeSameAs(transportMessage); + consumerContext.Properties.Should().ContainKey("RabbitMq_MessageConfirmAction"); + } + + private TestableRabbitMqConsumer CreateRabbitMqConsumer(string queueName, IList consumers) + { + var consumer = new TestableRabbitMqConsumer( + _loggerFactoryMock.Object, + _channelMock.Object, + queueName, + consumers, + _messageBus, + _messageProviderMock.Object, + _headerValueConverterMock.Object); + + _consumersToDispose.Add(consumer); + + // Start the consumer to initialize the CancellationToken + consumer.Start().Wait(); + + return consumer; + } + + private static IList CreateConsumerSettings(string queueName, string routingKey, int instances = 1) + { + var consumerSettings = new ConsumerSettings + { + Path = "test-exchange", + MessageType = typeof(TestMessage), + ConsumerType = typeof(TestConsumer), + ConsumerMethod = (consumer, message, ctx, ct) => Task.CompletedTask, + Instances = instances + }; + + RabbitMqProperties.QueueName.Set(consumerSettings, queueName); + RabbitMqProperties.BindingRoutingKey.Set(consumerSettings, routingKey); + + return [consumerSettings]; + } + + private static BasicDeliverEventArgs CreateBasicDeliverEventArgs( + Dictionary headers = null, + ulong deliveryTag = 1, + string routingKey = "test.routing.key") + { + var properties = new Mock(); + properties.Setup(x => x.Headers).Returns(headers); + + return new BasicDeliverEventArgs + { + DeliveryTag = deliveryTag, + Exchange = "test-exchange", + RoutingKey = routingKey, + BasicProperties = properties.Object, + Body = new ReadOnlyMemory(Array.Empty()) + }; + } + + // Test message and consumer types + private class TestMessage + { + public string Content { get; set; } + } + + private class TestConsumer + { + [System.Diagnostics.CodeAnalysis.SuppressMessage("Performance", "CA1822:Mark members as static", Justification = "For the test we need a non static member")] + public Task Consume(TestMessage message) => Task.CompletedTask; + } + + // Testable subclass that exposes protected methods + private class TestableRabbitMqConsumer( + ILoggerFactory loggerFactory, + IRabbitMqChannel channel, + string queueName, + IList consumers, + MessageBusBase messageBus, + MessageProvider messageProvider, + IHeaderValueConverter headerValueConverter) : RabbitMqConsumer(loggerFactory, channel, queueName, consumers, messageBus, messageProvider, headerValueConverter) + { + public Task OnMessageReceivedPublic(Dictionary messageHeaders, BasicDeliverEventArgs transportMessage) + => OnMessageReceived(messageHeaders, transportMessage); + } + + // Testable MessageBus for testing + private class TestableMessageBus(MessageBusSettings settings, RabbitMqMessageBusSettings providerSettings) + : MessageBusBase(settings, providerSettings) + { + public override Task ProduceToTransport(object message, Type messageType, string path, IDictionary messageHeaders, IMessageBusTarget targetBus, CancellationToken cancellationToken) + => Task.CompletedTask; + } +}