Replace expression-tree pipeline execution with trampoline parts#7625
Replace expression-tree pipeline execution with trampoline parts#7625danielmarbach wants to merge 13 commits intomasterfrom
Conversation
4720d88 to
ab25ceb
Compare
ab25ceb to
cd446c9
Compare
|
Open question / assumption to validate:
https://docs.particular.net/nservicebus/pipeline/steps-stages-connectors @mikeminutillo maybe knows more. In theory I could add a fallback invocation, but that would require me to re-introduce generic method creation and currently I think that would be unnecessary. |
|
After analyzing the codebase, I can confirm that the core pipeline cannot be extended with custom stages or stage forks via public API, even ignoring the PipelineInvokers optimization.
While |
…roved clarity and reuse.
|
Any approvals? |
| [MethodImpl(MethodImplOptions.AggressiveInlining)] | ||
| public static Task Next(IBehaviorContext ctx) | ||
| { | ||
| var nextIndex = ctx.Extensions.AdvanceFrame(out var reachedEnd); |
There was a problem hiding this comment.
Does this mean that in order to call next multiple times in a single behavior you would need to create a new context first? For instance, what happens if someone has something like this in their environment?
class RetryDispatchesBehavior : Behavior<IDispatchContext>
{
public override async Task Invoke(IDispatchContext context, Func<Task> next)
{
var attempts = 0;
while(true)
{
try
{
attempts++;
await next();
break;
}
catch
{
if(attempts > 2)
throw;
}
}
}
}I believe that will work with the current pipeline implementation but here each call to next() will advance the frame, won't it?
I think the only place we do something like this is in the LoadHandlersConnector, where we do create a new context for each invocation. In any stage connector we'd have to be creating a new context (for the next stage).
There was a problem hiding this comment.
That is a pretty common pattern when I created custom behaviors that introduce duplicate message processing anomalies
There was a problem hiding this comment.
@SzymonPobiega @mikeminutillo excellent catch. There were no tests that explicitly tested that, so I completely forgot this valid scenario, which would break with this approach. I will mark this one as draft for now to rethink the design and submit a PR to add a test to Core about this expected behavior
There was a problem hiding this comment.
I mean the commit I pushed would fix it and have similar perf characteristics on the execution path but significantly slow down the exception path. I think it could be an OK trade-off, but I want to chew on this a bit further.
There was a problem hiding this comment.
Well, the credit goes to @mikeminutillo here. I would probably not notice it if not for his code snippet.
SzymonPobiega
left a comment
There was a problem hiding this comment.
I don't think I follow how this behavior changes the stack impact. My understanding is that when a behavior calls next(), it moves the control back to the PipelineRunner but the previous behavior needs to still be on the stack because we resume its execution after returning from next(). Or am I missing something?
I was also wondering how much of the performance is gained via "magic" unsafe optimizations vs using a different approach to the pipeline execution.
| [MethodImpl(MethodImplOptions.AggressiveInlining)] | ||
| public static Task Next(IBehaviorContext ctx) | ||
| { | ||
| var nextIndex = ctx.Extensions.AdvanceFrame(out var reachedEnd); |
There was a problem hiding this comment.
That is a pretty common pattern when I created custom behaviors that introduce duplicate message processing anomalies
|
Closed in favour of #7631 |
This PR aligns pipeline execution with a simplified trampoline model, keeps state on the context bag, and improves AOT/trimming friendliness while minimizing broader architectural change.
The trampoline model executes the pipeline as a flat loop-like progression over prebuilt parts, instead of recursively composing many nested delegates at runtime.
PipelinePart[]represents the execution plan.Index,RangeEnd) tracks current position/range.StageRunners.Next, increments index, and dispatches the next part.Nextentrypoint).This gives predictable control flow, avoids runtime code generation, and keeps hot-path dispatch compact.
Benefits
coupling.
more static.
Additional Notes
Pipeline Frame Safety
Cost of Adding a New Pipeline
This change does add a small amount of explicit wiring when introducing a brand-new pipeline. That should be called out, but in context:
So the tradeoff is intentional:
Benchmarks
https://github.com/danielmarbach/MicroBenchmarks and branches starting with
bare-metalExecution
Throwing
Warmup
Alternatives Considered
See https://github.com/danielmarbach/PipelinePlayground and and branches starting with
bare-metaland invokersPipeline Architecture
The Pipeline is a Trampoline in Disguise
The pipeline replaces recursive behavior invocation with an explicit iterator (
ContextBag.frame) that gets resumed viaPipelineRunner.Next.Each behavior receives
nextwhich isn't actually the next behavior; it's a callback into the trampoline:When you call
next(context), you're yielding control back to the trampoline viaPipelineRunner.Next.Mechanism
The frame on the context (
ContextBag.framewithIndexandRangeEnd) is the iterator. Callingnext()invokesPipelineRunner.Next()which advances the frame and loops.Stage Connectors: Frame Manipulation
Visual Flow
Why this matters
Without the trampoline, deep pipelines would blow the stack:
Sequence Diagram
sequenceDiagram participant Start as PipelineRunner.Start() participant Init as ctx.Extensions.InitFrame() participant Dispatch as PipelineRunner.Dispatch() participant Invokers as PipelineInvokers.Invoke() participant Behavior as behavior.Invoke() participant Next as PipelineRunner.Next() participant Advance as ctx.Extensions.AdvanceFrame() Note over Start: Frame: {Index: 0, RangeEnd: N} Start->>Init: Initialize iterator Start->>Dispatch: Dispatch(ctx, 0) Dispatch->>Invokers: Invoke(ctx, part) Invokers->>Behavior: Invoke(context, BehaviorNextCache.Next) Note right of Behavior: Behavior does work<br/>then calls await next(context) Behavior->>Next: PipelineRunner.Next(ctx) Note over Next: "next" IS PipelineRunner.Next! Next->>Advance: ctx.Extensions.AdvanceFrame() Note right of Advance: frame.Index++ Advance-->>Next: reachedEnd? alt Not reached end Next->>Dispatch: Dispatch(ctx, nextIndex) Note over Dispatch: Loop continues (trampoline) else End of pipeline Next-->>Behavior: Task.CompletedTask endThe frame in ContextBag is the iterator. Calling
next()yields toPipelineRunner.Next(). The trampoline loop (Dispatch→Invoke→ behavior →Next→Dispatch) replaces recursion.