feat: support Lambda Managed Instances (draft) #1067
Conversation
Force-pushed from 8a991a7 to db82e4b
Thanks @bnusunny for the approval! Patch: alessandrobologna@a8af68a If you’re OK with that approach, I can push it directly to this PR branch so the checks go green.
- Add Config.max_concurrency and size runtime HTTP client pool from AWS_LAMBDA_MAX_CONCURRENCY.
- Introduce windowed concurrent /next polling with semaphore-limited handler tasks and shutdown coordination.
- Require Clone + Send + 'static handlers in lambda-runtime and lambda-http, and make internal layers/HTTP adapters cloneable.
- Adjust streaming HTTP to use BoxCloneService and align bounds for concurrent execution.
- Add RIE LMI helper (Makefile + test-rie.sh) and minor robustness improvements (Context parsing, basic example error logging).

Tests: cargo +stable fmt --all; cargo +stable clippy --all-targets --all-features; cargo +stable test --all (integration test requiring TEST_ENDPOINT not configured); ./scripts/test-rie.sh basic-lambda
- Re-export run_with_streaming_response_concurrent from lambda_http
- Remove now-unneeded dead_code allows in streaming helpers
- Cfg-gate Duration import behind graceful-shutdown
- Replace windowed polling + per-invocation tasks with N worker tasks, each running its own /next loop (AWS_LAMBDA_MAX_CONCURRENCY).
- Share invocation processing between sequential and concurrent paths.
- Align graceful shutdown behavior with upstream (run hook then exit).
- Add basic-lambda-concurrent example and update RIE helper script.
- Clarify run_concurrent docs to describe the worker model.
- remove shutdown signaling so worker failures stay isolated
- track remaining workers and return first infra error only when none remain
- treat unexpected clean worker exits as infra errors
- add deterministic worker-isolation regression test
- align concurrent HTTP doc wording with worker-loop model
- update concurrent worker exit warning per review nit
- drop redundant .cloned() in ApiGatewayV2 cookies mapping (clippy lint was pre-existing and unrelated to this PR)
Force-pushed from d54058c to f4a7612
Yes, please.
- drop Config.max_concurrency to avoid semver breakage
- add Runtime.concurrency_limit derived from AWS_LAMBDA_MAX_CONCURRENCY
- size the API client pool to match the up-front worker allocation
- update tests and Runtime literals accordingly
Ok, I've pushed the update to keep concurrency settings internal and to avoid the semver bump (so no Config public field needed). I also rebased the branch on current main.
- Replace env-var based X-Ray header injection with Context.xray_trace_id
- Set x-amzn-trace-id in HTTP and streaming adapters before handler call
- Remove legacy update_xray_trace_id_header helper from request parsing

Tests:
- cargo +nightly fmt --all
- cargo clippy --workspace --all-features -- -D warnings
- cargo +stable test --all-features -p lambda_runtime_api_client -p lambda_runtime -p lambda_http
jlizen left a comment:
Overall implementation looks great!
To summarize my feedback:
- I think naming things based on `_concurrent` is overly limiting if we expect future features to require `Clone`; I'd suggest `_extended` instead maybe? And we should probably nudge callers to use `_extended`/`_concurrent`/`_maybe_concurrent`, etc., unless they specifically need `!Clone` bounds.
- The non-`*_concurrent` entrypoints should have doc comments warning that they ignore concurrency variables, and probably an error log or panic if the caller tries to invoke them with the concurrency ENV set.
- I'd like to make it easier to trace per-worker issues by injecting task IDs into both worker loop spans and worker loop results.
- I'd prefer to preserve all errors across workers rather than only the first. Also it seems like we don't really need the full-batteries `FuturesUnordered` instead of `join_all`?
- I don't like the `OnceLock` that allows showing a single trace message; it seems like it could be simplified?
/// `/next` polling loop. When the environment variable is unset or `<= 1`,
/// it falls back to the same sequential behavior as [`run`], so the same
/// handler can run on both classic Lambda and Lambda Managed Instances.
pub async fn run_concurrent<R, S, E>(handler: S) -> Result<(), Error>
(Question, not sure): If the fallback case when AWS_LAMBDA_MAX_CONCURRENCY is unset is just lambda_runtime::run(), should this be our new suggested entrypoint, with fn run() marked as deprecated? I guess the main issue would be that the Clone bounds are more restrictive, so maybe deprecation is a bit aggressive, but we could add that in the run() doc comments?
The regular run() should definitely also add a warning that it WON'T respect AWS_LAMBDA_MAX_CONCURRENCY.
Context: My suspicion is, most consumers don't mind the Clone bound, and the 'principle of least surprise' would be for it to respect the ENV. So we probably should be pointing people to prefer this method where possible.
I would suggest at minimum the following:
- Probably this should be named `run_maybe_concurrent`? Or maybe `run_extended`, to avoid a combinatorial explosion of new entrypoint names as the featureset that needs `Clone` grows?
- We should have a guard clause in `lambda_runtime::run()` that either crashes the lambda runtime (preferred) or emits error logs (more conservative) if `AWS_LAMBDA_MAX_CONCURRENCY` is set, since that is clearly a misconfiguration that would prove confusing to debug.
Thanks! I added a doc warning on run() and a fail‑fast guard when AWS_LAMBDA_MAX_CONCURRENCY is set. I kept the *_concurrent naming for now to limit "code churn", and also because _extended seems to me a little vague. But I am open to change it if there's consensus on renaming all of them to what you suggested.
I have also added a note to the run() docstring to make it clearer that run_concurrent is preferred if the handler can satisfy Clone.
_concurrent is probably ok... ultimately future features that require Clone will likely involve concurrency, so the naming probably makes sense...
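For illustration, the fail-fast guard described above could look roughly like this; it's a sketch, and the helper name (`check_not_concurrent`) and exact error text are illustrative, not necessarily what the PR branch uses:

```rust
use std::env;

// Illustrative error alias for the sketch; the runtime has its own BoxError.
type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Refuse to start the sequential loop when the concurrency env var requests
/// more than one worker, since that is almost certainly a misconfiguration.
fn check_not_concurrent() -> Result<(), BoxError> {
    if let Ok(raw) = env::var("AWS_LAMBDA_MAX_CONCURRENCY") {
        if raw.trim().parse::<usize>().map(|n| n > 1).unwrap_or(false) {
            return Err(format!(
                "AWS_LAMBDA_MAX_CONCURRENCY={raw} is set, but `run` only polls sequentially; use `run_concurrent` instead"
            )
            .into());
        }
    }
    Ok(())
}

fn main() {
    // With the variable unset (or set to 1) this prints Ok(()).
    println!("{:?}", check_not_concurrent());
}
```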
lambda-runtime/src/runtime.rs (outdated)
    }
}
match first_error {
In the current impl, None should essentially never be reachable, right? Since even if we have no task-specific errors, we instead create an error out of a clean exit?
If the desired API is to return Ok(()) on all workers cleanly exiting, then we could use the enum mentioned above to gather that info, and emit an error log line if all exited cleanly, but then drop the error.
Yes, I am now using WorkerError and aggregating everything.
For the other point, I currently treat all workers exiting (clean or not) as an error because then the runtime has no workers left and can't serve any more invocations. Returning Ok(()) here would make the runtime exit quietly and hide the failure. But I can change it if you prefer.
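A rough, self-contained sketch of that aggregation idea; the type names and fields here are illustrative stand-ins, not the PR's exact code:

```rust
use std::fmt;

// Stand-in for how each worker's exit could be recorded.
#[derive(Debug)]
enum WorkerExit {
    Clean { task_id: u64 },
    Failed { task_id: u64, error: String },
}

// Returned once no workers remain: even all-clean exits are surfaced as an
// error, because a runtime with zero workers can no longer serve invocations.
#[derive(Debug)]
struct AllWorkersExited(Vec<WorkerExit>);

impl fmt::Display for AllWorkersExited {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "all concurrent workers exited; exits: {:?}", self.0)
    }
}

impl std::error::Error for AllWorkersExited {}

fn main() {
    let err = AllWorkersExited(vec![
        WorkerExit::Clean { task_id: 1 },
        WorkerExit::Failed { task_id: 2, error: "connection reset".into() },
    ]);
    eprintln!("{err}");
}
```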
lambda-runtime/src/runtime.rs (outdated)
let mut workers = FuturesUnordered::new();
for _ in 1..limit {
    workers.push(tokio::spawn(concurrent_worker_loop(
Would it be worth instrumenting this future with a span containing the taskId? Seems pretty useful for debug
Yes, I have added a span on the worker loop with the Tokio task id and instrumented the per-invocation processing with it.
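For reference, the general pattern looks something like the standalone sketch below (it assumes a Tokio version where `tokio::task::id()` is available without unstable flags, and uses a dummy `process_invocation` in place of the runtime's processing helper):

```rust
use tracing::{info, info_span, Instrument};

// Dummy per-invocation work, standing in for the runtime's processing helper.
async fn process_invocation(n: u32) {
    info!(invocation = n, "processing");
}

// Each worker creates one span carrying its Tokio task id, so every log line
// emitted while processing an invocation is tagged with that id.
async fn worker_loop() {
    let task_id = tokio::task::id();
    let span = info_span!("worker", task_id = %task_id);
    for n in 0..3 {
        process_invocation(n).instrument(span.clone()).await;
    }
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();
    let handles: Vec<_> = (0..2).map(|_| tokio::spawn(worker_loop())).collect();
    for handle in handles {
        handle.await.unwrap();
    }
}
```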
lambda-runtime/src/runtime.rs (outdated)
    amzn_trace_env(&invocation.context);
} else {
    // Inform users that X-Ray is available via context, not env var, in concurrent mode.
    XRAY_LOGGED.get_or_init(|| {
The OnceLock + get_or_init every invocation seems like unnecessary overhead / complexity. Why can't we just display this message once, at the start of run_concurrent?
Agree.
- aggregate worker exits with task IDs and surface clean exits in errors
- add per-worker spans and log unexpected exits at error level
- guard run() when AWS_LAMBDA_MAX_CONCURRENCY is set; docs prefer run_concurrent
- make streaming adapter cloneable for concurrent HTTP path
- document pool size guidance and LMI test target; log example via tracing
jlizen left a comment:
Thanks for all the tweaks! New error handling, removal of XRAY_LOGGED, into_stream_service_cloneable(), etc, look great!
There are a few nits that are totally up to you.
Main load-bearing feedback has to do with the usage of Box::pin() for the worker future. If it turns out to be tricky to remove, that's fine / just say so; we can punt that as it is something we can change later.
I also think having a debug log indicating when the sequential fallback is used in run_concurrent would be pretty handy (ref).
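Something along these lines would cover it; this is an illustrative sketch with a made-up `log_mode` helper, not the PR's code:

```rust
use tracing::debug;

// Make the chosen mode visible in logs so operators can tell at a glance
// whether the runtime is polling sequentially or with N workers.
fn log_mode(max_concurrency: Option<usize>) {
    match max_concurrency {
        Some(n) if n > 1 => debug!(workers = n, "running in concurrent mode"),
        _ => debug!("AWS_LAMBDA_MAX_CONCURRENCY unset or <= 1; falling back to sequential polling"),
    }
}

fn main() {
    tracing_subscriber::fmt().with_max_level(tracing::Level::DEBUG).init();
    log_mode(None);
    log_mode(Some(4));
}
```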
lambda-runtime/src/runtime.rs (outdated)
let spawn_worker = |service: S, config: Arc<Config>, client: Arc<ApiClient>| -> WorkerJoinFuture {
    let handle = tokio::spawn(concurrent_worker_loop(service, config, client));
    let task_id = handle.id();
    Box::pin(async move { (task_id, handle.await) })
Why the extra Box::pin() here? The tokio::task::Id is Unpin? Is there a way to avoid this extra allocation / indirection?
Can we just get away with using an unnameable type like we did previously?
Good point. I used Box::pin originally just to erase the unnameable async move type for FuturesUnordered, but, as you pointed out, that adds an allocation. I switched to spawning a task that returns (task_id, Result<(), BoxError>), so the set holds a concrete JoinHandle and there’s no Box::pin. For panics I use JoinError::id() a little below.
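A standalone sketch of that shape; the worker body, error types, and limits here are placeholders rather than the PR's real worker loop:

```rust
use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    // Each spawned task reports its own task id alongside its result, so the
    // FuturesUnordered can hold plain JoinHandles and no Box::pin is needed.
    let mut workers = FuturesUnordered::new();
    for n in 0..3u32 {
        workers.push(tokio::spawn(async move {
            // Placeholder for a worker loop.
            (tokio::task::id(), Ok::<u32, String>(n))
        }));
    }

    while let Some(joined) = workers.next().await {
        match joined {
            Ok((task_id, result)) => println!("task {task_id} finished with {result:?}"),
            // A panicked worker lands here; JoinError::id() still identifies the task.
            Err(join_err) => eprintln!("task {} panicked: {join_err}", join_err.id()),
        }
    }
}
```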
// Track the first infrastructure error. A single worker failing should
// not terminate the whole runtime (LMI keeps running with the remaining
// Track worker exits across tasks. A single worker failing should not
Thanks for the detailed comment on this passage
lambda-runtime/src/runtime.rs (outdated)
remaining_workers,
"Concurrent worker exited cleanly (unexpected - loop should run forever)"
"{}",
clean_exit_msg
nit: i would probably just put this string slice inline since we only use it once
lambda-runtime/src/runtime.rs (outdated)
| "all concurrent workers exited cleanly (unexpected - loop should run forever)" | ||
| )?; | ||
| for task_id in clean { | ||
| write!(f, " [task {task_id}]")?; |
nit: this is a bit odd, i would have expected something like [task_1, task_2, ..]
Might be easier with write!(f, " {}", clean.join(", ")) (and more efficient anyway, fewer write calls, though probably we aren't perf sensitive in this code path)
Agreed. I switched the aggregate error formatting to just be valid JSON so it's easy to query in Log Insights/analytics tools (if the caller logs/prints it). The per‑worker logs are unchanged.
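As a rough illustration of a JSON-style `Display` for the aggregate error (the field names and the hand-rolled formatting are illustrative; real code would likely lean on `serde_json` for escaping):

```rust
use std::fmt;

// Aggregate of (task id, error message) pairs collected from failed workers.
struct WorkerFailures(Vec<(u64, String)>);

impl fmt::Display for WorkerFailures {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, r#"{{"error":"worker_failures","failures":["#)?;
        for (i, (task_id, err)) in self.0.iter().enumerate() {
            if i > 0 {
                write!(f, ",")?;
            }
            // Note: `err` is not escaped here; serde_json would handle that properly.
            write!(f, r#"{{"task_id":{task_id},"error":"{err}"}}"#)?;
        }
        write!(f, "]}}")
    }
}

fn main() {
    let failures = WorkerFailures(vec![(1, "connection reset".into()), (4, "timeout".into())]);
    // Prints a single JSON object that Logs Insights can parse/filter on.
    println!("{failures}");
}
```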
lambda-runtime/src/runtime.rs (outdated)
if !failures.is_empty() {
    write!(f, "; failures:")?;
    for (task_id, err) in failures {
        write!(f, " [task {task_id}] {err}")?;
Same nit.
Also, I wonder, would it be worth something like: [{"id": "foo", "err": "foo" },..] to make it easier to query against in log insights or similar? Even if you need to regex match only the [..] part of the message, having some signpost that there is an error message is pretty nice.
Yes, same reasoning here. I ended up switching the aggregate error formatting to valid JSON to keep it simple and queryable, rather than bespoke formatting
/// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, this returns an error because it does not enable
/// concurrent polling. Use [`Runtime::run_concurrent`] instead.
pub async fn run(self) -> Result<(), BoxError> {
    if let Some(raw) = concurrency_env_value() {
So the perf impact here for the non-concurrent path is exactly one environment variable check? That seems fine, good tradeoff for footgun avoidance.
lambda-runtime/src/runtime.rs (outdated)
S::Future: Send,
{
    let task_id = tokio::task::id();
    let span = info_span!("concurrent_worker_loop", task_id = %task_id);
nit: concurrent_worker_loop is a bit verbose to see in all inner log lines, i'd suggest worker probably
I have changed it to just "worker".
lambda-runtime/src/runtime.rs (outdated)
let task_id = tokio::task::id();
let span = info_span!("concurrent_worker_loop", task_id = %task_id);
loop {
    let event = match next_event_future(client.clone()).await {
Any reason not to instrument the hyper future as well? I seem to recall hyper has some sort of tracing feature support that might benefit from task id context?
Side question - do we really need to clone the client here? Could we just pass it by reference and save the atomic increment? call() just requires &self right?
Agreed, I wrapped the /next request future in the worker span so it inherits the task_id context. I didn't enable hyper's tracing feature (to keep deps minimal, and it seems that tracing in 1.0 is still unstable), but this gives us span context for the request.
Also agreed; I have removed the cloning and now pass &ApiClient into next_event_future.
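The borrow-instead-of-clone part, as a self-contained sketch; `ApiClient` here is a toy stand-in for the real client, whose calls also only need `&self`:

```rust
use std::sync::Arc;

// Toy client: the real one's call path also takes &self, so borrowing suffices.
struct ApiClient;

impl ApiClient {
    async fn next_event(&self) -> &'static str {
        "event"
    }
}

async fn next_event_future(client: &ApiClient) -> &'static str {
    client.next_event().await
}

#[tokio::main]
async fn main() {
    let client = Arc::new(ApiClient);
    // One Arc clone per worker task (needed for the 'static spawn bound),
    // then plain borrows inside the loop instead of a clone per poll.
    let worker_client = Arc::clone(&client);
    let worker = tokio::spawn(async move {
        for _ in 0..3 {
            let event = next_event_future(&worker_client).await;
            println!("got {event}");
        }
    });
    worker.await.unwrap();
}
```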
The CI failure related to MSRV seems unrelated... I'll fix separately. Ref: #1077
- remove Box::pin by returning task results from spawned workers
- include JoinError task ids and wrap /next future in worker span
- avoid cloning ApiClient in the worker loop
- emit aggregate worker errors as valid JSON payloads
- add debug log when run_concurrent falls back to sequential polling
- shorten worker span name to "worker" for log readability
📬 Issue #: #1065
✍️ Description of changes:
Support Lambda Managed Instances / concurrent runtime (refs #1065)
Note
I originally built this to experiment with Lambda Managed Instances in a test workload and thought it might be a useful starting point for upstream support. Parts of the implementation were drafted with the help of an AI assistant and have only been exercised in my own workloads and the existing test suite so far, so please treat this as a starting point for discussion rather than a final design. A super simple demo/benchmark of this implementation is at https://github.com/alessandrobologna/rust-lmi-demo
This draft PR implements a concurrent runtime mode for Lambda Managed Instances, plus the API changes required in `lambda-runtime` and `lambda-http`. It preserves existing single-invocation behavior for classic Lambda. The implementation is based on the AWS guidance for building runtimes that support Lambda Managed Instances, as described in Building custom runtimes for Lambda Managed Instances.

Architecture (high level)

The runtime chooses between a classic "one invocation at a time" loop and a concurrent mode controlled by `AWS_LAMBDA_MAX_CONCURRENCY`:

- Sequential mode: a `/next` long-poll loop fetches one event at a time and invokes the handler synchronously, preserving existing behavior.
- Concurrent mode (`AWS_LAMBDA_MAX_CONCURRENCY >= 2`): the runtime spawns `AWS_LAMBDA_MAX_CONCURRENCY` worker tasks. Each worker runs its own `/next` long-poll loop and processes invocations sequentially within that task, so overall concurrency is bounded by the number of workers. If a worker crashes, the runtime continues operating with the remaining workers and only terminates if it can't keep at least 1 worker alive.

Sequence of a concurrent run:

    sequenceDiagram
        participant Lambda as Lambda orchestrator
        participant Runtime as lambda-runtime
        participant Worker as Runtime worker task
        participant API as Runtime API
        participant Handler as User handler
        Lambda->>Runtime: Start process (env AWS_LAMBDA_MAX_CONCURRENCY = N)
        Runtime->>Worker: Spawn N worker tasks
        loop each worker (identical)
            loop each invocation
                Worker->>API: GET /runtime/invocation/next
                API-->>Worker: Invocation event + context headers
                Worker->>Handler: Call handler(event, context)
                Handler-->>Worker: Result or error
                Worker->>API: POST /response or /error
            end
        end

Breaking changes & compatibility

- Existing entrypoints (`lambda_runtime::run`, `lambda_http::run`, and `lambda_http::run_with_streaming_response`) keep their original signatures and sequential behavior. Handlers that compiled against the current release should continue to compile unchanged.
- New entrypoints: `lambda_runtime::run_concurrent`, `lambda_http::run_concurrent`, and `lambda_http::run_with_streaming_response_concurrent`. These require handler services to implement `Clone + Send + 'static` (with responses/stream bodies `Send`/`Sync + 'static`) so they can be safely cloned and driven by the concurrent runtime.
- `AWS_LAMBDA_MAX_CONCURRENCY` is read directly by the runtime to decide concurrency and size the HTTP pool, without adding new public fields to `Config` (avoids semver breakage for struct literals).
- In concurrent mode (`AWS_LAMBDA_MAX_CONCURRENCY > 1`), the runtime no longer sets `_X_AMZN_TRACE_ID` in the process environment. The per-invocation X-Ray trace ID is available via `Context::xray_trace_id` and tracing spans instead. Sequential mode behavior is unchanged.
- The behavior when `AWS_LAMBDA_MAX_CONCURRENCY` is set changes from "always sequential per environment" to "per-environment concurrency up to that value". Code that continues to call the existing `run` functions will remain strictly sequential even if the env var is set.

In other words: earlier versions of this branch tightened the bounds on the existing `run` functions, but after maintainer feedback those entrypoints are left as-is and concurrency is opt-in via the new `*_concurrent` APIs.
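To make the opt-in concrete, here is a minimal sketch of a handler wired through the new entrypoint (mirroring the basic-lambda-concurrent example; a plain async fn wrapped in `service_fn` is already `Clone`, so it satisfies the extra bounds):

```rust
use lambda_runtime::{service_fn, Error, LambdaEvent};
use serde_json::Value;

// Echo handler; any handler whose service is Clone + Send + 'static works.
async fn handler(event: LambdaEvent<Value>) -> Result<Value, Error> {
    Ok(event.payload)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    // With AWS_LAMBDA_MAX_CONCURRENCY unset this behaves like `run`;
    // with it set to N >= 2 the runtime spawns N worker loops.
    lambda_runtime::run_concurrent(service_fn(handler)).await
}
```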
Runtime & Config (
lambda-runtime)AWS_LAMBDA_MAX_CONCURRENCYdirectly inside the runtime to decide whether concurrent mode should be enabled and to size the HTTP client pool, without extendingConfig.Runtime::newnow sizes thelambda_runtime_api_clientHTTP pool frommax_concurrencyso the number of idle connections matches expected concurrency.Runtime::runremains the original sequential/nextloop viarun_with_incoming, preserving existing behavior.Runtime::run_concurrentspawnsAWS_LAMBDA_MAX_CONCURRENCYworker tasks, each running its own/nextloop. Sequential and concurrent modes share the same invocation processing helper, so there are not two separate event loop implementations to maintain. WhenConfig::is_concurrent()is false, it falls back to the same sequential loop asRuntime::run.Err) are reported to Lambda via/invocation/{id}/errorand do not terminate the runtime.api_client,api_response,trace) now implement/deriveCloneso they can be composed into a cloneable service stack for the concurrent entrypoints.Context::newis more robust whenlambda-runtime-client-context/lambda-runtime-cognito-identityheaders are present but empty (treated asNoneinstead of failing JSON parse).concurrent_worker_crash_does_not_stop_other_workers) to verify worker isolation behavior.HTTP & streaming (
lambda-http,lambda-runtime-api-client)lambda_runtime_api_client::Client:with_pool_size(usize)on the builder and threads apool_size: Option<usize>into the Hyper client to setpool_max_idle_per_host.pool_sizeis not provided.lambda_http::runandrun_with_streaming_responsekeep their existing signatures and sequential behavior, delegating tolambda_runtime::run.lambda_http::run_concurrentandlambda_http::run_with_streaming_response_concurrentwrap the same handler types but require them to beClone + Send + 'static(with response/stream bounds aligned tolambda_runtime::run_concurrent) so they can be driven by the concurrent runtime.Adapter,StreamAdapter) are nowClonewhen the inner service isClone, and the streaming path usesBoxCloneServiceinternally for the concurrent entrypoint so the composed service stack can be cloned.run_with_streaming_response_concurrentis exported fromlambda_http, similarly torun_with_streaming_response.Tooling & examples
Makefileandscripts/test-rie.sh:RIE_MAX_CONCURRENCYand atest-rie-lmitarget that runs RIE withAWS_LAMBDA_MAX_CONCURRENCYset, making it easy to exercise managed-instances behavior locally.examples/basic-lambda-concurrent:lambda_runtime::run_concurrentso the concurrent code path can be exercised under RIE/LMI.examples/basic-lambda/src/main.rs:lambda_runtime::run(func).awaitin anif let Err(err)block to log and propagate runtime errors when testing under RIE.Validation
On this branch, I ran:
- `cargo +nightly fmt --all -- --check`
- `cargo clippy --workspace --all-features -- -D warnings`
- `cargo +stable test --all-features -p lambda_runtime_api_client -p lambda_runtime -p lambda_http`
- `make test-rie EXAMPLE=basic-lambda`
- `make test-rie-lmi EXAMPLE=basic-lambda-concurrent` (sets `AWS_LAMBDA_MAX_CONCURRENCY` inside the RIE container)

If maintainers prefer, this could be split into smaller PRs (e.g., builder/`Config` prep, handler `Clone` changes, and finally the concurrent runtime), but this branch shows the full "end-to-end" implementation so that it can be tested with Lambda Managed Instances.

🔏 By submitting this pull request

- `cargo +nightly fmt --all -- --check`
- `cargo clippy --workspace --all-features -- -D warnings`