diff --git a/src/NServiceBus.Core.Tests/Reliability/Outbox/TransportReceiveToPhysicalMessageConnectorTests.cs b/src/NServiceBus.Core.Tests/Reliability/Outbox/TransportReceiveToPhysicalMessageConnectorTests.cs index 96e3f57b8e8..067a8b10c2c 100644 --- a/src/NServiceBus.Core.Tests/Reliability/Outbox/TransportReceiveToPhysicalMessageConnectorTests.cs +++ b/src/NServiceBus.Core.Tests/Reliability/Outbox/TransportReceiveToPhysicalMessageConnectorTests.cs @@ -174,6 +174,25 @@ public async Task Should_not_add_batch_dispatch_events_when_no_batched_messages( } } + [Test] + public async Task Should_use_default_dispatch_consistency_when_dispatching_from_outbox() + { + var messageId = "id"; + var properties = new DispatchProperties { ["Destination"] = "myEndpoint" }; + + fakeOutbox.ExistingMessage = new OutboxMessage(messageId, new[] + { + new NServiceBus.Outbox.TransportOperation("x", properties, Array.Empty(), []) + }); + + var context = CreateContext(fakeBatchPipeline, messageId); + + await Invoke(context); + + var operation = fakeBatchPipeline.TransportOperations.First(); + Assert.That(operation.RequiredDispatchConsistency, Is.EqualTo(DispatchConsistency.Default)); + } + static TestableTransportReceiveContext CreateContext(FakeBatchPipeline pipeline, string messageId) { var context = new TestableTransportReceiveContext diff --git a/src/NServiceBus.Core/NServiceBus.Core.csproj b/src/NServiceBus.Core/NServiceBus.Core.csproj index 89a27666363..35094497a66 100644 --- a/src/NServiceBus.Core/NServiceBus.Core.csproj +++ b/src/NServiceBus.Core/NServiceBus.Core.csproj @@ -32,7 +32,7 @@ $(TargetsForTfmSpecificContentInPackage);AddPropsFileToPackage - + diff --git a/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs b/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs index 2bb3eb0062e..e46f08a4b78 100644 --- a/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs +++ b/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs @@ -92,7 +92,7 @@ static void ConvertToPendingOperations(OutboxMessage deduplicationEntry, Pending message, DeserializeRoutingStrategy(operation.Options), operation.Options, - DispatchConsistency.Isolated + DispatchConsistency.Default )); } } diff --git a/src/NServiceBus.Core/Reliability/Outbox/ForceBatchDispatchToBeIsolatedBehavior.cs b/src/NServiceBus.Core/Reliability/Outbox/ForceBatchDispatchToBeIsolatedBehavior.cs deleted file mode 100644 index 195e34d3212..00000000000 --- a/src/NServiceBus.Core/Reliability/Outbox/ForceBatchDispatchToBeIsolatedBehavior.cs +++ /dev/null @@ -1,23 +0,0 @@ -namespace NServiceBus; - -using System; -using System.Threading.Tasks; -using Pipeline; -using Transport; - -class ForceBatchDispatchToBeIsolatedBehavior : IBehavior -{ - public Task Invoke(IBatchDispatchContext context, Func next) - { - foreach (var operation in context.Operations) - { - // Changing the dispatch consistency to be isolated to make sure the transport doesn't - // enlist the operations in the receive transaction. The transport might still want to batch - // operations for efficiency reasons but should never enlist in the incoming transport transaction. - // Otherwise a failure to ACK the incoming message after Outbox storage has been set to Dispatched - // would result in outgoing message loss. - operation.RequiredDispatchConsistency = DispatchConsistency.Isolated; - } - return next(context); - } -} \ No newline at end of file diff --git a/src/NServiceBus.Core/Reliability/Outbox/Outbox.cs b/src/NServiceBus.Core/Reliability/Outbox/Outbox.cs index e5e1f3ef5a0..135fdcbb452 100644 --- a/src/NServiceBus.Core/Reliability/Outbox/Outbox.cs +++ b/src/NServiceBus.Core/Reliability/Outbox/Outbox.cs @@ -49,14 +49,13 @@ protected override void Setup(FeatureConfigurationContext context) return; } - // ForceBatchDispatchToBeIsolatedBehavior set the dispatch consistency to isolated which instructs - // the transport to not enlist the outgoing operation in the incoming message transaction. Unfortunately - // this is not enough. We cannot allow the transport to operate in SendsWithAtomicReceive because a transport + // We cannot allow the transport to operate in SendsWithAtomicReceive because a transport // might then only release the outgoing operations when the incoming transport transaction is committed meaning // the actual sends would happen after we have set the outbox record as dispatched and not as part of // TransportReceiveToPhysicalMessageConnector fork into the batched dispatched phase. Should acknowledging // the incoming operation fail and the message be retried we would already have cleared the outbox record's - // transport operations leading to outgoing message loss. + // transport operations leading to outgoing message loss. ReceiveOnly mode prevents outgoing messages from + // being enlisted in the receive transaction, so there's no need to force DispatchConsistency.Isolated. if (context.Settings.GetRequiredTransactionModeForReceives() != TransportTransactionMode.ReceiveOnly) { throw new Exception( @@ -64,7 +63,6 @@ protected override void Setup(FeatureConfigurationContext context) } //note: in the future we should change the persister api to give us a "outbox factory" so that we can register it in DI here instead of relying on the persister to do it - context.Pipeline.Register("ForceBatchDispatchToBeIsolated", new ForceBatchDispatchToBeIsolatedBehavior(), "Makes sure that we dispatch straight to the transport so that we can safely set the outbox record to dispatched once the dispatch pipeline returns."); } internal const string TimeToKeepDeduplicationEntries = "Outbox.TimeToKeepDeduplicationEntries";