From 5a7a9f5b1fd9e536898ed05d26dc62ccf69b5a97 Mon Sep 17 00:00:00 2001 From: Ben Lovell Date: Wed, 21 Jan 2026 22:21:24 +0100 Subject: [PATCH 1/3] refactor(runtime): move to single unified execution::App interface This marks the previous lib::App one as deprecated, moves everything over to the new interface from Vim as the default, renames that to be more similar to what we had before + a couple simplifications, and adds an alias to make sure that we don't break the tower-runner --- crates/tower-cmd/src/run.rs | 18 +- crates/tower-runtime/src/errors.rs | 3 + crates/tower-runtime/src/execution.rs | 54 ++++-- crates/tower-runtime/src/lib.rs | 8 +- crates/tower-runtime/src/local.rs | 211 +++++++++++++---------- crates/tower-runtime/src/subprocess.rs | 105 ++--------- crates/tower-runtime/tests/local_test.rs | 6 +- 7 files changed, 195 insertions(+), 210 deletions(-) diff --git a/crates/tower-cmd/src/run.rs b/crates/tower-cmd/src/run.rs index b96f2ad0..a95f0039 100644 --- a/crates/tower-cmd/src/run.rs +++ b/crates/tower-cmd/src/run.rs @@ -18,9 +18,8 @@ use tokio::sync::{ Mutex, }; use tokio::time::{sleep, timeout, Duration}; -use tower_runtime::execution::ExecutionHandle; use tower_runtime::execution::{ - CacheBackend, CacheConfig, CacheIsolation, ExecutionBackend, ExecutionSpec, ResourceLimits, + App as _, Backend, CacheBackend, CacheConfig, CacheIsolation, ExecutionSpec, ResourceLimits, RuntimeConfig as ExecRuntimeConfig, }; use tower_runtime::subprocess::SubprocessBackend; @@ -204,7 +203,7 @@ where // Monitor app status concurrently let handle = Arc::new(Mutex::new(handle)); - let status_task = tokio::spawn(monitor_cli_status(Arc::clone(&handle))); + let status_task = tokio::spawn(monitor_app_status(Arc::clone(&handle))); // Wait for app to complete or SIGTERM let status_result = tokio::select! { @@ -223,6 +222,7 @@ where // And if we crashed, err out match status_result { Status::Exited => output::success("Your local run exited cleanly."), + Status::Cancelled => output::success("Your local run was cancelled."), Status::Crashed { code } => { output::error(&format!("Your local run crashed with exit code: {}", code)); return Err(Error::AppCrashed); @@ -654,12 +654,10 @@ async fn monitor_output(mut output: OutputReceiver) { } } -/// monitor_local_status is a helper function that will monitor the status of a given app and waits for +/// monitor_app_status is a helper function that will monitor the status of a given app and waits for /// it to progress to a terminal state. -async fn monitor_cli_status( - handle: Arc>, -) -> Status { - use tower_runtime::execution::ExecutionHandle as _; +async fn monitor_app_status(handle: Arc>) -> Status { + use tower_runtime::execution::App as _; debug!("Starting status monitoring for CLI execution"); let mut check_count = 0; @@ -683,6 +681,10 @@ async fn monitor_cli_status( debug!("Run exited cleanly, stopping status monitoring"); return status; } + Status::Cancelled => { + debug!("Run was cancelled, stopping status monitoring"); + return status; + } Status::Crashed { .. 
} => { debug!("Run crashed, stopping status monitoring"); return status; diff --git a/crates/tower-runtime/src/errors.rs b/crates/tower-runtime/src/errors.rs index 0326cd27..e7c7eacd 100644 --- a/crates/tower-runtime/src/errors.rs +++ b/crates/tower-runtime/src/errors.rs @@ -76,6 +76,9 @@ pub enum Error { #[snafu(display("dependency installation failed"))] DependencyInstallationFailed, + + #[snafu(display("failed to wait for process: {message}"))] + ProcessWaitFailed { message: String }, } impl From for Error { diff --git a/crates/tower-runtime/src/execution.rs b/crates/tower-runtime/src/execution.rs index 47d6dd26..7fb0dabf 100644 --- a/crates/tower-runtime/src/execution.rs +++ b/crates/tower-runtime/src/execution.rs @@ -149,17 +149,19 @@ pub struct NetworkingSpec { } // ============================================================================ -// Execution Backend Trait +// Backend Trait // ============================================================================ -/// ExecutionBackend abstracts the compute substrate +/// Backend creates App instances for a specific compute substrate. +/// +/// Implementations: SubprocessBackend (subprocess), K8sBackend (Kubernetes) #[async_trait] -pub trait ExecutionBackend: Send + Sync { - /// The handle type this backend returns - type Handle: ExecutionHandle; +pub trait Backend: Send + Sync { + /// The App type this backend creates + type App: App; - /// Create a new execution environment - async fn create(&self, spec: ExecutionSpec) -> Result; + /// Create a new app execution + async fn create(&self, spec: ExecutionSpec) -> Result; /// Get backend capabilities fn capabilities(&self) -> BackendCapabilities; @@ -195,13 +197,15 @@ pub struct BackendCapabilities { } // ============================================================================ -// Execution Handle Trait +// App Trait // ============================================================================ -/// ExecutionHandle represents a running execution +/// App represents a running Tower application instance. 
+/// +/// Implementations: LocalApp (subprocess), K8sApp (Kubernetes pod) #[async_trait] -pub trait ExecutionHandle: Send + Sync { - /// Get a unique identifier for this execution +pub trait App: Send + Sync { + /// Unique identifier for this execution fn id(&self) -> &str; /// Get current execution status @@ -210,20 +214,26 @@ pub trait ExecutionHandle: Send + Sync { /// Subscribe to log stream async fn logs(&self) -> Result; - /// Terminate execution gracefully + /// Terminate execution gracefully (SIGTERM equivalent) async fn terminate(&mut self) -> Result<(), Error>; - /// Force kill execution - async fn kill(&mut self) -> Result<(), Error>; + /// Force kill execution (SIGKILL equivalent) + async fn kill(&mut self) -> Result<(), Error> { + self.terminate().await // default: same as terminate + } /// Wait for execution to complete async fn wait_for_completion(&self) -> Result; - /// Get service endpoint - async fn service_endpoint(&self) -> Result, Error>; + /// Get service endpoint (for long-running apps) + async fn service_endpoint(&self) -> Result, Error> { + Ok(None) // default: no endpoint + } /// Cleanup resources - async fn cleanup(&mut self) -> Result<(), Error>; + async fn cleanup(&mut self) -> Result<(), Error> { + self.terminate().await // default: just terminate + } } /// ServiceEndpoint describes how to reach a running service @@ -241,3 +251,13 @@ pub struct ServiceEndpoint { /// Full URL if applicable (e.g., "http://app-run-123.default.svc.cluster.local:8080") pub url: Option, } + +// ============================================================================ +// Deprecated Aliases (for backwards compatibility with tower-runner) +// ============================================================================ + +#[deprecated(note = "use `Backend` instead")] +pub use self::Backend as ExecutionBackend; + +#[deprecated(note = "use `App` instead")] +pub use self::App as ExecutionHandle; diff --git a/crates/tower-runtime/src/lib.rs b/crates/tower-runtime/src/lib.rs index 74091edc..f5f1bd97 100644 --- a/crates/tower-runtime/src/lib.rs +++ b/crates/tower-runtime/src/lib.rs @@ -11,6 +11,9 @@ pub mod execution; pub mod local; pub mod subprocess; +// Re-export SubprocessBackend from subprocess module +pub use subprocess::SubprocessBackend; + use errors::Error; #[derive(Copy, Clone)] @@ -41,6 +44,7 @@ pub enum Status { None, Running, Exited, + Cancelled, Crashed { code: i32 }, } @@ -48,16 +52,14 @@ pub type OutputReceiver = UnboundedReceiver; pub type OutputSender = UnboundedSender; +#[deprecated(note = "use `execution::App` instead")] pub trait App: Send + Sync { - // start will start the process fn start(opts: StartOptions) -> impl Future> + Send where Self: Sized; - // terminate will terminate the subprocess fn terminate(&mut self) -> impl Future> + Send; - // status checks the status of an app fn status(&self) -> impl Future> + Send; } diff --git a/crates/tower-runtime/src/local.rs b/crates/tower-runtime/src/local.rs index ec2ea64f..97faaf15 100644 --- a/crates/tower-runtime/src/local.rs +++ b/crates/tower-runtime/src/local.rs @@ -1,3 +1,4 @@ +use async_trait::async_trait; use std::collections::HashMap; use std::env; use std::path::PathBuf; @@ -33,20 +34,19 @@ use tower_package::{Manifest, Package}; use tower_telemetry::debug; use tower_uv::Uv; -use crate::{App, Channel, Output, FD}; +use crate::execution::App; +use crate::{Channel, Output, OutputReceiver, FD}; + +type Completion = Result; pub struct LocalApp { + id: String, status: Mutex>, - - // waiter is what we use to 
communicate that the overall process is finished by the execution - // handle. - waiter: Mutex>, - - // terminator is what we use to flag that we want to terminate the child process. + completion_receiver: Mutex>, terminator: CancellationToken, - - // execute_handle keeps track of the current state of the execution lifecycle. - execute_handle: Option>>, + task: Option>>, + output_receiver: Mutex>, + _package: Option, } // Helper function to check if a file is executable @@ -101,7 +101,7 @@ async fn find_bash() -> Result { async fn execute_local_app( opts: StartOptions, - sx: oneshot::Sender, + tx: oneshot::Sender, cancel_token: CancellationToken, ) -> Result<(), Error> { let ctx = opts.ctx.clone(); @@ -159,7 +159,7 @@ async fn execute_local_app( if cancel_token.is_cancelled() { // if there's a waiter, we want them to know that the process was cancelled so we have // to return something on the relevant channel. - let _ = sx.send(-1); + let _ = tx.send(Ok(Status::Cancelled)); return Err(Error::Cancelled); } @@ -176,7 +176,7 @@ async fn execute_local_app( ) .await?; - let _ = sx.send(wait_for_process(ctx.clone(), &cancel_token, child).await); + let _ = tx.send(wait_for_process(ctx.clone(), &cancel_token, child).await); } else { // we put Uv in to protected mode when there's no caching configured/enabled. let protected_mode = opts.cache_dir.is_none(); @@ -198,7 +198,7 @@ async fn execute_local_app( // ensure everything is in place. if cancel_token.is_cancelled() { // again tell any waiters that we cancelled. - let _ = sx.send(-1); + let _ = tx.send(Ok(Status::Cancelled)); return Err(Error::Cancelled); } @@ -222,19 +222,19 @@ async fn execute_local_app( )); // Wait for venv to finish up. - let res = wait_for_process(ctx.clone(), &cancel_token, child).await; - - if res != 0 { - // If the venv process failed, we want to return an error. - let _ = sx.send(res); - return Err(Error::VirtualEnvCreationFailed); + match wait_for_process(ctx.clone(), &cancel_token, child).await { + Ok(Status::Exited) => {} + res => { + let _ = tx.send(res); + return Err(Error::VirtualEnvCreationFailed); + } } // Check once more if the process was cancelled before we do a uv sync. The sync itself, // once started, will take a while and we have logic for checking for cancellation. if cancel_token.is_cancelled() { // again tell any waiters that we cancelled. - let _ = sx.send(-1); + let _ = tx.send(Ok(Status::Cancelled)); return Err(Error::Cancelled); } @@ -275,12 +275,13 @@ async fn execute_local_app( )); // Let's wait for the setup to finish. We don't care about the results. - let res = wait_for_process(ctx.clone(), &cancel_token, child).await; - - if res != 0 { + match wait_for_process(ctx.clone(), &cancel_token, child).await { + Ok(Status::Exited) => {} // If the sync process failed, we want to return an error. - let _ = sx.send(res); - return Err(Error::DependencyInstallationFailed); + res => { + let _ = tx.send(res); + return Err(Error::DependencyInstallationFailed); + } } } } @@ -289,7 +290,7 @@ async fn execute_local_app( if cancel_token.is_cancelled() { // if there's a waiter, we want them to know that the process was cancelled so we have // to return something on the relevant channel. 
- let _ = sx.send(-1); + let _ = tx.send(Ok(Status::Cancelled)); return Err(Error::Cancelled); } @@ -312,7 +313,7 @@ async fn execute_local_app( BufReader::new(stderr), )); - let _ = sx.send(wait_for_process(ctx.clone(), &cancel_token, child).await); + let _ = tx.send(wait_for_process(ctx.clone(), &cancel_token, child).await); } // Everything was properly executed I suppose. @@ -324,70 +325,105 @@ impl Drop for LocalApp { // CancellationToken::cancel() is not async self.terminator.cancel(); - // Optionally spawn a task to wait for the handle - if let Some(execute_handle) = self.execute_handle.take() { - if let Ok(handle) = Handle::try_current() { - handle.spawn(async move { - let _ = execute_handle.await; + // Optionally spawn a task to wait for execution to complete + if let Some(task) = self.task.take() { + if let Ok(rt) = Handle::try_current() { + rt.spawn(async move { + let _ = task.await; }); } } } } -impl App for LocalApp { - async fn start(opts: StartOptions) -> Result { +impl LocalApp { + /// Create a new LocalApp with the given ID and StartOptions. + /// + /// The `output_receiver` parameter is optional - when provided (via Backend interface), + /// the `logs()` method will return this receiver. When None (legacy interface), + /// logs are sent to the output_sender in StartOptions and `logs()` returns an empty stream. + /// + /// The `package` parameter keeps the package (and its temp directory) alive for the + /// duration of the execution. + pub async fn new( + id: String, + opts: StartOptions, + output_receiver: Option, + package: Option, + ) -> Result { let terminator = CancellationToken::new(); - - let (sx, rx) = oneshot::channel::(); - let waiter = Mutex::new(rx); - - let handle = tokio::spawn(execute_local_app(opts, sx, terminator.clone())); - let execute_handle = Some(handle); + let (tx, rx) = oneshot::channel::(); + let task = tokio::spawn(execute_local_app(opts, tx, terminator.clone())); Ok(Self { - execute_handle, + id, + task: Some(task), terminator, - waiter, + completion_receiver: Mutex::new(rx), status: Mutex::new(None), + output_receiver: Mutex::new(output_receiver), + _package: package, }) } + /// Create a LocalApp using the legacy start() interface (for backward compatibility). + /// + /// Output is sent to the output_sender in StartOptions. The `logs()` method + /// will return an empty stream (use the output_sender's receiver directly). + pub async fn start(opts: StartOptions) -> Result { + Self::new("local".to_string(), opts, None, None).await + } +} + +#[async_trait] +impl App for LocalApp { + fn id(&self) -> &str { + &self.id + } + + async fn status(&self) -> Result { + let mut status = self.status.lock().await; + + if let Some(status) = *status { + return Ok(status); + } + + match self.completion_receiver.lock().await.try_recv() { + Err(TryRecvError::Empty) => Ok(Status::Running), + Err(TryRecvError::Closed) => Err(Error::WaiterClosed), + Ok(completion) => { + let next_status = completion?; + *status = Some(next_status); + Ok(next_status) + } + } + } + + async fn logs(&self) -> Result { + // Take the receiver (can only be called once meaningfully) + // Returns empty channel if already taken or using legacy interface + let (_, empty) = tokio::sync::mpsc::unbounded_channel(); + Ok(self.output_receiver.lock().await.take().unwrap_or(empty)) + } + async fn terminate(&mut self) -> Result<(), Error> { self.terminator.cancel(); - // Now we should wait for the join handle to finish. 
- if let Some(execute_handle) = self.execute_handle.take() { - let _ = execute_handle.await; - self.execute_handle = None; + if let Some(task) = self.task.take() { + let _ = task.await; } Ok(()) } - async fn status(&self) -> Result { - let mut status = self.status.lock().await; - - if let Some(status) = *status { - Ok(status) - } else { - let mut waiter = self.waiter.lock().await; - let res = waiter.try_recv(); - - match res { - Err(TryRecvError::Empty) => Ok(Status::Running), - Err(TryRecvError::Closed) => Err(Error::WaiterClosed), - Ok(t) => { - // We save this for the next time this gets called. - if t == 0 { - *status = Some(Status::Exited); - Ok(Status::Exited) - } else { - let next_status = Status::Crashed { code: t }; - *status = Some(next_status); - Ok(next_status) - } + async fn wait_for_completion(&self) -> Result { + loop { + let status = self.status().await?; + match status { + Status::None | Status::Running => { + tokio::time::sleep(Duration::from_millis(100)).await; } + _ => return Ok(status), } } } @@ -536,31 +572,32 @@ async fn wait_for_process( ctx: tower_telemetry::Context, cancel_token: &CancellationToken, mut child: Child, -) -> i32 { - let code = loop { +) -> Completion { + loop { if cancel_token.is_cancelled() { debug!(ctx: &ctx, "process cancelled, terminating child process"); kill_child_process(&ctx, child).await; - break -1; // return -1 to indicate that the process was cancelled. + return Ok(Status::Cancelled); } - let timeout = timeout(Duration::from_millis(25), child.wait()).await; - - if let Ok(res) = timeout { - if let Ok(status) = res { - break status.code().expect("no status code"); - } else { - // something went wrong. - debug!(ctx: &ctx, "failed to get status due to some kind of IO error: {}" , res.err().expect("no error somehow")); - break -1; + match timeout(Duration::from_millis(25), child.wait()).await { + Err(_) => continue, // timeout, check cancellation again + Ok(Err(e)) => { + debug!(ctx: &ctx, "IO error waiting on child process: {}", e); + return Err(Error::ProcessWaitFailed { + message: e.to_string(), + }); + } + Ok(Ok(status)) => { + let code = status.code().expect("process should have exit code"); + debug!(ctx: &ctx, "process exited with code {}", code); + return Ok(match code { + 0 => Status::Exited, + _ => Status::Crashed { code }, + }); } } - }; - - debug!(ctx: &ctx, "process exited with code {}", code); - - // this just shuts up the compiler about ignoring the results. - code + } } async fn drain_output( diff --git a/crates/tower-runtime/src/subprocess.rs b/crates/tower-runtime/src/subprocess.rs index 713722f2..32ebfa19 100644 --- a/crates/tower-runtime/src/subprocess.rs +++ b/crates/tower-runtime/src/subprocess.rs @@ -1,23 +1,17 @@ -//! Subprocess execution backend +//! 
Local subprocess execution backend use crate::errors::Error; -use crate::execution::{ - BackendCapabilities, CacheBackend, ExecutionBackend, ExecutionHandle, ExecutionSpec, - ServiceEndpoint, -}; +use crate::execution::{Backend, BackendCapabilities, CacheBackend, ExecutionSpec}; use crate::local::LocalApp; -use crate::{App, OutputReceiver, StartOptions, Status}; +use crate::StartOptions; use async_trait::async_trait; use std::path::PathBuf; -use std::sync::Arc; use tokio::fs::File; use tokio::io::AsyncWriteExt; -use tokio::sync::Mutex; -use tokio::time::Duration; use tower_package::Package; -/// SubprocessBackend executes apps as a subprocess +/// SubprocessBackend executes apps as local subprocesses pub struct SubprocessBackend { /// Optional default cache directory to use cache_dir: Option, @@ -65,10 +59,10 @@ impl SubprocessBackend { } #[async_trait] -impl ExecutionBackend for SubprocessBackend { - type Handle = SubprocessHandle; +impl Backend for SubprocessBackend { + type App = LocalApp; - async fn create(&self, spec: ExecutionSpec) -> Result { + async fn create(&self, spec: ExecutionSpec) -> Result { // Convert ExecutionSpec to StartOptions for LocalApp let (output_sender, output_receiver) = tokio::sync::mpsc::unbounded_channel(); @@ -94,19 +88,13 @@ impl ExecutionBackend for SubprocessBackend { secrets: spec.secrets, parameters: spec.parameters, env_vars: spec.env_vars, - output_sender: output_sender.clone(), + output_sender, cache_dir, }; - // Start the LocalApp - let app = LocalApp::start(opts).await?; - - Ok(SubprocessHandle { - id: spec.id, - app: Arc::new(Mutex::new(app)), - output_receiver: Arc::new(Mutex::new(output_receiver)), - _package: package, // Keep package alive so temp dir doesn't get cleaned up - }) + // Start the LocalApp with the execution ID, output receiver, and package + // The package is kept alive so the temp dir doesn't get cleaned up + LocalApp::new(spec.id, opts, Some(output_receiver), Some(package)).await } fn capabilities(&self) -> BackendCapabilities { @@ -127,74 +115,3 @@ impl ExecutionBackend for SubprocessBackend { Ok(()) } } - -/// SubprocessHandle provides lifecycle management for a subprocess execution -pub struct SubprocessHandle { - id: String, - app: Arc>, - output_receiver: Arc>, - _package: Package, // Keep package alive to prevent temp dir cleanup -} - -#[async_trait] -impl ExecutionHandle for SubprocessHandle { - fn id(&self) -> &str { - &self.id - } - - async fn status(&self) -> Result { - let app = self.app.lock().await; - app.status().await - } - - async fn logs(&self) -> Result { - // Create a new channel for log streaming - let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - - // Spawn a task to forward Output from the internal receiver - let output_receiver = self.output_receiver.clone(); - tokio::spawn(async move { - let mut receiver = output_receiver.lock().await; - while let Some(output) = receiver.recv().await { - if tx.send(output).is_err() { - break; // Receiver dropped - } - } - }); - - Ok(rx) - } - - async fn terminate(&mut self) -> Result<(), Error> { - let mut app = self.app.lock().await; - app.terminate().await - } - - async fn kill(&mut self) -> Result<(), Error> { - // For local processes, kill is the same as terminate - self.terminate().await - } - - async fn wait_for_completion(&self) -> Result { - loop { - let status = self.status().await?; - match status { - Status::None | Status::Running => { - tokio::time::sleep(Duration::from_millis(100)).await; - } - _ => return Ok(status), - } - } - } - - async fn 
service_endpoint(&self) -> Result, Error> { - // Local backend doesn't support service endpoints - Ok(None) - } - - async fn cleanup(&mut self) -> Result<(), Error> { - // Ensure the app is terminated - self.terminate().await?; - Ok(()) - } -} diff --git a/crates/tower-runtime/tests/local_test.rs b/crates/tower-runtime/tests/local_test.rs index 02069308..2ce0e1f1 100644 --- a/crates/tower-runtime/tests/local_test.rs +++ b/crates/tower-runtime/tests/local_test.rs @@ -1,7 +1,8 @@ use std::collections::HashMap; use std::path::PathBuf; -use tower_runtime::{local::LocalApp, App, StartOptions, Status}; +use tower_runtime::execution::App as _; +use tower_runtime::{local::LocalApp, StartOptions, Status}; use config::Towerfile; use tower_package::{Package, PackageSpec}; @@ -373,5 +374,8 @@ async fn test_abort_on_dependency_installation_failure() { Status::None => { panic!("App should have a status"); } + Status::Cancelled => { + panic!("App should not have been cancelled"); + } } } From ab715718ca70b0ddeb5ecf8189b6f346d1b7b474 Mon Sep 17 00:00:00 2001 From: Ben Lovell Date: Wed, 21 Jan 2026 22:23:14 +0100 Subject: [PATCH 2/3] chore: be more anal about .expect grammar --- crates/tower-runtime/src/local.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/crates/tower-runtime/src/local.rs b/crates/tower-runtime/src/local.rs index 97faaf15..19b9a51c 100644 --- a/crates/tower-runtime/src/local.rs +++ b/crates/tower-runtime/src/local.rs @@ -205,7 +205,7 @@ async fn execute_local_app( let mut child = uv.venv(&working_dir, &env_vars).await?; // Drain the logs to the output channel. - let stdout = child.stdout.take().expect("no stdout"); + let stdout = child.stdout.take().expect("stdout should be available"); tokio::spawn(drain_output( FD::Stdout, Channel::Setup, @@ -213,7 +213,7 @@ async fn execute_local_app( BufReader::new(stdout), )); - let stderr = child.stderr.take().expect("no stderr"); + let stderr = child.stderr.take().expect("stderr should be available"); tokio::spawn(drain_output( FD::Stderr, Channel::Setup, @@ -258,7 +258,7 @@ async fn execute_local_app( } Ok(mut child) => { // Drain the logs to the output channel. - let stdout = child.stdout.take().expect("no stdout"); + let stdout = child.stdout.take().expect("stdout should be available"); tokio::spawn(drain_output( FD::Stdout, Channel::Setup, @@ -266,7 +266,7 @@ async fn execute_local_app( BufReader::new(stdout), )); - let stderr = child.stderr.take().expect("no stderr"); + let stderr = child.stderr.take().expect("stderr should be available"); tokio::spawn(drain_output( FD::Stderr, Channel::Setup, @@ -297,7 +297,7 @@ async fn execute_local_app( let mut child = uv.run(&working_dir, &program_path, &env_vars).await?; // Drain the logs to the output channel. 
- let stdout = child.stdout.take().expect("no stdout"); + let stdout = child.stdout.take().expect("stdout should be available"); tokio::spawn(drain_output( FD::Stdout, Channel::Program, @@ -305,7 +305,7 @@ async fn execute_local_app( BufReader::new(stdout), )); - let stderr = child.stderr.take().expect("no stderr"); + let stderr = child.stderr.take().expect("stderr should be available"); tokio::spawn(drain_output( FD::Stderr, Channel::Program, @@ -608,7 +608,7 @@ async fn drain_output( ) { let mut lines = input.lines(); - while let Some(line) = lines.next_line().await.expect("line iteration fialed") { + while let Some(line) = lines.next_line().await.expect("line iteration should succeed") { let _ = output.send(Output { channel, fd, From 290029a5847d63c35efe9c0025afa03f7497281b Mon Sep 17 00:00:00 2001 From: Ben Lovell Date: Wed, 21 Jan 2026 22:48:52 +0100 Subject: [PATCH 3/3] fix: SIGKILL on process shouldn't panic the handle --- crates/tower-runtime/src/local.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/tower-runtime/src/local.rs b/crates/tower-runtime/src/local.rs index 19b9a51c..00d572b2 100644 --- a/crates/tower-runtime/src/local.rs +++ b/crates/tower-runtime/src/local.rs @@ -589,11 +589,11 @@ async fn wait_for_process( }); } Ok(Ok(status)) => { - let code = status.code().expect("process should have exit code"); - debug!(ctx: &ctx, "process exited with code {}", code); - return Ok(match code { - 0 => Status::Exited, - _ => Status::Crashed { code }, + debug!(ctx: &ctx, "process exited: {:?}", status); + return Ok(match status.code() { + Some(0) => Status::Exited, + Some(code) => Status::Crashed { code }, + None => Status::Cancelled, // killed by signal }); } }