Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,25 @@ public async Task Should_not_add_batch_dispatch_events_when_no_batched_messages(
}
}

[Test]
public async Task Should_use_default_dispatch_consistency_when_dispatching_from_outbox()
{
var messageId = "id";
var properties = new DispatchProperties { ["Destination"] = "myEndpoint" };

fakeOutbox.ExistingMessage = new OutboxMessage(messageId, new[]
{
new NServiceBus.Outbox.TransportOperation("x", properties, Array.Empty<byte>(), [])
});

var context = CreateContext(fakeBatchPipeline, messageId);

await Invoke(context);

var operation = fakeBatchPipeline.TransportOperations.First();
Assert.That(operation.RequiredDispatchConsistency, Is.EqualTo(DispatchConsistency.Default));
}

static TestableTransportReceiveContext CreateContext(FakeBatchPipeline pipeline, string messageId)
{
var context = new TestableTransportReceiveContext
Expand Down
2 changes: 1 addition & 1 deletion src/NServiceBus.Core/NServiceBus.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
<TargetsForTfmSpecificContentInPackage>$(TargetsForTfmSpecificContentInPackage);AddPropsFileToPackage</TargetsForTfmSpecificContentInPackage>
</PropertyGroup>

<ItemGroup Label="Remove this ItemGroup when FastExpressionCompiler stops included test code in the package">
<ItemGroup Label="Remove this ItemGroup when FastExpressionCompiler stops included test code in the package" Condition="'$(PkgFastExpressionCompiler_Internal_src)' != ''">
<Compile Remove="$(PkgFastExpressionCompiler_Internal_src)\**\TestTools.cs" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ static void ConvertToPendingOperations(OutboxMessage deduplicationEntry, Pending
message,
DeserializeRoutingStrategy(operation.Options),
operation.Options,
DispatchConsistency.Isolated
DispatchConsistency.Default
));
}
}
Expand Down

This file was deleted.

8 changes: 3 additions & 5 deletions src/NServiceBus.Core/Reliability/Outbox/Outbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,20 @@ protected override void Setup(FeatureConfigurationContext context)
return;
}

// ForceBatchDispatchToBeIsolatedBehavior set the dispatch consistency to isolated which instructs
// the transport to not enlist the outgoing operation in the incoming message transaction. Unfortunately
// this is not enough. We cannot allow the transport to operate in SendsWithAtomicReceive because a transport
// We cannot allow the transport to operate in SendsWithAtomicReceive because a transport
// might then only release the outgoing operations when the incoming transport transaction is committed meaning
// the actual sends would happen after we have set the outbox record as dispatched and not as part of
// TransportReceiveToPhysicalMessageConnector fork into the batched dispatched phase. Should acknowledging
// the incoming operation fail and the message be retried we would already have cleared the outbox record's
// transport operations leading to outgoing message loss.
// transport operations leading to outgoing message loss. ReceiveOnly mode prevents outgoing messages from
// being enlisted in the receive transaction, so there's no need to force DispatchConsistency.Isolated.
if (context.Settings.GetRequiredTransactionModeForReceives() != TransportTransactionMode.ReceiveOnly)
{
throw new Exception(
$"Outbox requires transport to be running in `{nameof(TransportTransactionMode.ReceiveOnly)}` mode. Use the `{nameof(TransportDefinition.TransportTransactionMode)}` property on the transport definition to specify the transaction mode.");
}

//note: in the future we should change the persister api to give us a "outbox factory" so that we can register it in DI here instead of relying on the persister to do it
context.Pipeline.Register("ForceBatchDispatchToBeIsolated", new ForceBatchDispatchToBeIsolatedBehavior(), "Makes sure that we dispatch straight to the transport so that we can safely set the outbox record to dispatched once the dispatch pipeline returns.");
}

internal const string TimeToKeepDeduplicationEntries = "Outbox.TimeToKeepDeduplicationEntries";
Expand Down
Loading