From e0bea6d99edf46df7cdb18fa764a3b4805db2fe2 Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Mon, 26 Jan 2026 22:14:26 +0100 Subject: [PATCH] Hack dispatch properties --- .../RoutingToDispatchConnectorTests.cs | 14 +++++----- .../Outgoing/ImmediateDispatchTerminator.cs | 12 ++------ .../Outgoing/RoutingToDispatchConnector.cs | 28 +++++++++++++++++-- .../Pipeline/Outgoing/SendComponent.cs | 3 +- .../Transports/ErrorContext.cs | 7 +++++ .../Transports/MessageContext.cs | 7 +++++ .../Transports/TransportDefinition.cs | 6 ++++ 7 files changed, 57 insertions(+), 20 deletions(-) diff --git a/src/NServiceBus.Core.Tests/Routing/RoutingToDispatchConnectorTests.cs b/src/NServiceBus.Core.Tests/Routing/RoutingToDispatchConnectorTests.cs index fa88ed971f3..9b58a6a5c54 100644 --- a/src/NServiceBus.Core.Tests/Routing/RoutingToDispatchConnectorTests.cs +++ b/src/NServiceBus.Core.Tests/Routing/RoutingToDispatchConnectorTests.cs @@ -17,7 +17,7 @@ public class RoutingToDispatchConnectorTests [Test] public async Task Should_preserve_message_state_for_one_routing_strategy_for_allocation_reasons() { - var behavior = new RoutingToDispatchConnector(); + var behavior = new RoutingToDispatchConnector([]); IEnumerable operations = null; var testableRoutingContext = new TestableRoutingContext { @@ -59,7 +59,7 @@ await behavior.Invoke(testableRoutingContext, context => [Test] public async Task Should_copy_message_state_for_multiple_routing_strategies() { - var behavior = new RoutingToDispatchConnector(); + var behavior = new RoutingToDispatchConnector([]); IEnumerable operations = null; var testableRoutingContext = new TestableRoutingContext { @@ -124,7 +124,7 @@ await behavior.Invoke(testableRoutingContext, context => [Test] public async Task Should_preserve_headers_generated_by_custom_routing_strategy() { - var behavior = new RoutingToDispatchConnector(); + var behavior = new RoutingToDispatchConnector([]); Dictionary headers = null; await behavior.Invoke(new TestableRoutingContext { RoutingStrategies = [new HeaderModifyingRoutingStrategy()] }, context => { @@ -142,7 +142,7 @@ public async Task Should_dispatch_immediately_if_user_requested() options.RequireImmediateDispatch(); var dispatched = false; - var behavior = new RoutingToDispatchConnector(); + var behavior = new RoutingToDispatchConnector([]); var message = new OutgoingMessage("ID", [], Array.Empty()); await behavior.Invoke(new RoutingContext(message, @@ -159,7 +159,7 @@ await behavior.Invoke(new RoutingContext(message, public async Task Should_dispatch_immediately_if_not_sending_from_a_handler() { var dispatched = false; - var behavior = new RoutingToDispatchConnector(); + var behavior = new RoutingToDispatchConnector([]); var message = new OutgoingMessage("ID", [], Array.Empty()); await behavior.Invoke(new RoutingContext(message, @@ -176,7 +176,7 @@ await behavior.Invoke(new RoutingContext(message, public async Task Should_not_dispatch_by_default() { var dispatched = false; - var behavior = new RoutingToDispatchConnector(); + var behavior = new RoutingToDispatchConnector([]); var message = new OutgoingMessage("ID", [], Array.Empty()); await behavior.Invoke(new RoutingContext(message, @@ -192,7 +192,7 @@ await behavior.Invoke(new RoutingContext(message, [Test] public async Task Should_promote_message_headers_to_pipeline_activity() { - var behavior = new RoutingToDispatchConnector(); + var behavior = new RoutingToDispatchConnector([]); var routingContext = new TestableRoutingContext(); routingContext.Message.Headers[Headers.ContentType] = "test content type"; // one of the headers that will be mapped to tags diff --git a/src/NServiceBus.Core/Pipeline/Outgoing/ImmediateDispatchTerminator.cs b/src/NServiceBus.Core/Pipeline/Outgoing/ImmediateDispatchTerminator.cs index c6ac3a760bd..7e57b38b23a 100644 --- a/src/NServiceBus.Core/Pipeline/Outgoing/ImmediateDispatchTerminator.cs +++ b/src/NServiceBus.Core/Pipeline/Outgoing/ImmediateDispatchTerminator.cs @@ -2,24 +2,16 @@ namespace NServiceBus; -using System.Linq; using System.Threading.Tasks; using Pipeline; using Transport; -class ImmediateDispatchTerminator : PipelineTerminator +class ImmediateDispatchTerminator(IMessageDispatcher dispatcher) : PipelineTerminator { - public ImmediateDispatchTerminator(IMessageDispatcher dispatcher) - { - this.dispatcher = dispatcher; - } - protected override Task Terminate(IDispatchContext context) { var transaction = context.Extensions.GetOrCreate(); - var operations = context.Operations as TransportOperation[] ?? context.Operations.ToArray(); + var operations = context.Operations as TransportOperation[] ?? [.. context.Operations]; return dispatcher.Dispatch(new TransportOperations(operations), transaction, context.CancellationToken); } - - readonly IMessageDispatcher dispatcher; } \ No newline at end of file diff --git a/src/NServiceBus.Core/Pipeline/Outgoing/RoutingToDispatchConnector.cs b/src/NServiceBus.Core/Pipeline/Outgoing/RoutingToDispatchConnector.cs index c8181545073..7bda3cb3dfa 100644 --- a/src/NServiceBus.Core/Pipeline/Outgoing/RoutingToDispatchConnector.cs +++ b/src/NServiceBus.Core/Pipeline/Outgoing/RoutingToDispatchConnector.cs @@ -3,6 +3,7 @@ namespace NServiceBus; using System; +using System.Collections.Frozen; using System.Diagnostics; using System.Text; using System.Threading.Tasks; @@ -12,7 +13,7 @@ namespace NServiceBus; using Routing; using Transport; -class RoutingToDispatchConnector : StageConnector +class RoutingToDispatchConnector(FrozenSet dispatchPropertyNamesToPropagate) : StageConnector { public override Task Invoke(IRoutingContext context, Func stage) { @@ -26,13 +27,36 @@ public override Task Invoke(IRoutingContext context, Func 0) + { + shouldPropagate = context.Extensions.TryGet("IncomingMessage.DispatchProperties", out dispatchProperties); + } + else + { + dispatchProperties = null; + shouldPropagate = false; + } + var operations = new TransportOperation[context.RoutingStrategies.Count]; var index = 0; // when there are more than one routing strategy we want to make sure each transport operation is independent var copySharedMutableMessageState = context.RoutingStrategies.Count > 1; foreach (var strategy in context.RoutingStrategies) { - operations[index] = context.ToTransportOperation(strategy, dispatchConsistency, copySharedMutableMessageState); + var transportOperation = context.ToTransportOperation(strategy, dispatchConsistency, copySharedMutableMessageState); + if (shouldPropagate) + { + foreach (var propertyName in dispatchPropertyNamesToPropagate) + { + if (dispatchProperties?.TryGetValue(propertyName, out var propertyValue) is true && !transportOperation.Properties.TryAdd(propertyName, propertyValue)) + { + // explicitly set on the outgoing operation, do not override + } + } + } + operations[index] = transportOperation; index++; } diff --git a/src/NServiceBus.Core/Pipeline/Outgoing/SendComponent.cs b/src/NServiceBus.Core/Pipeline/Outgoing/SendComponent.cs index 8228a648012..f5ef42b7bb9 100644 --- a/src/NServiceBus.Core/Pipeline/Outgoing/SendComponent.cs +++ b/src/NServiceBus.Core/Pipeline/Outgoing/SendComponent.cs @@ -6,6 +6,7 @@ namespace NServiceBus; using MessageInterfaces; using Microsoft.Extensions.DependencyInjection; using Pipeline; +using Settings; using Transport; class SendComponent @@ -27,7 +28,7 @@ public static SendComponent Initialize(PipelineSettings pipelineSettings, Hostin pipelineSettings.Register(new OutgoingPhysicalToRoutingConnector(), "Starts the message dispatch pipeline"); - pipelineSettings.Register(new RoutingToDispatchConnector(), "Decides if the current message should be batched or immediately be dispatched to the transport"); + pipelineSettings.Register(b => new RoutingToDispatchConnector(b.GetRequiredService().Get().DispatchPropertyNamesToPreserve), "Decides if the current message should be batched or immediately be dispatched to the transport"); pipelineSettings.Register(new BatchToDispatchConnector(), "Passes batched messages over to the immediate dispatch part of the pipeline"); pipelineSettings.Register(b => new ImmediateDispatchTerminator(b.GetRequiredService()), "Hands the outgoing messages over to the transport for immediate delivery"); diff --git a/src/NServiceBus.Core/Transports/ErrorContext.cs b/src/NServiceBus.Core/Transports/ErrorContext.cs index 52829dd0970..10442328806 100644 --- a/src/NServiceBus.Core/Transports/ErrorContext.cs +++ b/src/NServiceBus.Core/Transports/ErrorContext.cs @@ -38,6 +38,13 @@ public ErrorContext(Exception exception, Dictionary headers, str DelayedDeliveriesPerformed = Message.GetDelayedDeliveriesPerformed(); Extensions = context; + + if (context.TryGet(out var dispatchProperties)) + { + context.Remove(); + // Hack hardcoded string for now + context.Set("IncomingMessage.DispatchProperties", dispatchProperties); + } } /// diff --git a/src/NServiceBus.Core/Transports/MessageContext.cs b/src/NServiceBus.Core/Transports/MessageContext.cs index 3c1fed2cd6f..fac26aef7d3 100644 --- a/src/NServiceBus.Core/Transports/MessageContext.cs +++ b/src/NServiceBus.Core/Transports/MessageContext.cs @@ -33,6 +33,13 @@ public MessageContext(string nativeMessageId, Dictionary headers ReceiveAddress = receiveAddress; TransportTransaction = transportTransaction; + if (context.TryGet(out var dispatchProperties)) + { + context.Remove(); + // Hack hardcoded string for now + context.Set("IncomingMessage.DispatchProperties", dispatchProperties); + } + context.GetOrCreate(); } diff --git a/src/NServiceBus.Core/Transports/TransportDefinition.cs b/src/NServiceBus.Core/Transports/TransportDefinition.cs index 8b7e4f6a350..2b1b3503bc8 100644 --- a/src/NServiceBus.Core/Transports/TransportDefinition.cs +++ b/src/NServiceBus.Core/Transports/TransportDefinition.cs @@ -3,6 +3,7 @@ namespace NServiceBus.Transport; using System; +using System.Collections.Frozen; using System.Collections.Generic; using System.Linq; using System.Threading; @@ -84,6 +85,11 @@ public virtual TransportTransactionMode TransportTransactionMode /// public bool SupportsTTBR { get; } + /// + /// TBD + /// + protected internal virtual FrozenSet DispatchPropertyNamesToPreserve { get; } = []; + /// /// Allows the transport to register required services into the service collection. ///