diff --git a/Cargo.lock b/Cargo.lock index 675e212f..5ea25d7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -168,6 +168,17 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "async-trait" +version = "0.1.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "async_zip" version = "0.0.16" @@ -2789,18 +2800,28 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", @@ -3605,7 +3626,6 @@ dependencies = [ "tokio", "tokio-stream", "tokio-tar", - "tokio-util", "tower-telemetry", ] @@ -3613,10 +3633,12 @@ dependencies = [ name = "tower-runtime" version = "0.3.43" dependencies = [ + "async-trait", "chrono", "config", "nix 0.30.1", "snafu", + "tmpdir", "tokio", "tokio-util", "tower-package", @@ -3862,9 +3884,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.18.0" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f33196643e165781c20a5ead5582283a7dacbb87855d867fbc2df3f81eddc1be" +checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a" dependencies = [ "getrandom 0.3.3", "js-sys", diff --git a/Cargo.toml b/Cargo.toml index da4f236a..a6fc1635 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ repository = "https://github.com/tower/tower-cli" aes-gcm = "0.10" anyhow = "1.0.95" async-compression = { version = "0.4", features = ["tokio", "gzip"] } +async-trait = "0.1.89" async_zip = { version = "0.0.16", features = ["tokio", "tokio-fs", "deflate"] } axum = "0.8.4" base64 = "0.22" diff --git a/crates/config/src/session.rs b/crates/config/src/session.rs index 53830ccf..f7a22b07 100644 --- a/crates/config/src/session.rs +++ b/crates/config/src/session.rs @@ -21,7 +21,9 @@ fn extract_aid_from_jwt(jwt: &str) -> Option { let payload = parts[1]; let decoded = URL_SAFE_NO_PAD.decode(payload).ok()?; let json: serde_json::Value = serde_json::from_slice(&decoded).ok()?; - json.get("https://tower.dev/aid")?.as_str().map(String::from) + json.get("https://tower.dev/aid")? + .as_str() + .map(String::from) } const DEFAULT_TOWER_URL: &str = "https://api.tower.dev"; diff --git a/crates/tower-cmd/src/apps.rs b/crates/tower-cmd/src/apps.rs index a95a6251..31652eea 100644 --- a/crates/tower-cmd/src/apps.rs +++ b/crates/tower-cmd/src/apps.rs @@ -519,11 +519,11 @@ fn should_notify_run_wait(already_notified: bool, elapsed: Duration) -> bool { #[cfg(test)] mod tests { + use super::is_run_finished; use super::{ apps_cmd, is_run_started, next_backoff, should_emit_line, should_notify_run_wait, stream_logs_until_complete, LogFollowOutcome, FOLLOW_BACKOFF_INITIAL, FOLLOW_BACKOFF_MAX, }; - use super::is_run_finished; use tokio::sync::{mpsc, oneshot}; use tokio::time::Duration; use tower_api::models::run::Status; @@ -613,12 +613,18 @@ mod tests { #[test] fn test_run_wait_notification_logic() { - assert!(!should_notify_run_wait(true, super::RUN_START_MESSAGE_DELAY)); + assert!(!should_notify_run_wait( + true, + super::RUN_START_MESSAGE_DELAY + )); assert!(!should_notify_run_wait( false, super::RUN_START_MESSAGE_DELAY - Duration::from_millis(1) )); - assert!(should_notify_run_wait(false, super::RUN_START_MESSAGE_DELAY)); + assert!(should_notify_run_wait( + false, + super::RUN_START_MESSAGE_DELAY + )); } #[tokio::test] diff --git a/crates/tower-cmd/src/mcp.rs b/crates/tower-cmd/src/mcp.rs index eeabc1d9..af0a5b27 100644 --- a/crates/tower-cmd/src/mcp.rs +++ b/crates/tower-cmd/src/mcp.rs @@ -702,9 +702,7 @@ impl TowerService { } } - #[tool( - description = "Read and parse Towerfile configuration. Optional: working_directory." - )] + #[tool(description = "Read and parse Towerfile configuration. Optional: working_directory.")] async fn tower_file_read( &self, Parameters(request): Parameters, @@ -778,9 +776,7 @@ impl TowerService { } } - #[tool( - description = "Validate Towerfile configuration. Optional: working_directory." - )] + #[tool(description = "Validate Towerfile configuration. Optional: working_directory.")] async fn tower_file_validate( &self, Parameters(request): Parameters, diff --git a/crates/tower-cmd/src/run.rs b/crates/tower-cmd/src/run.rs index 50a3de07..b96f2ad0 100644 --- a/crates/tower-cmd/src/run.rs +++ b/crates/tower-cmd/src/run.rs @@ -3,20 +3,27 @@ use clap::{Arg, ArgMatches, Command}; use config::{Config, Towerfile}; use std::collections::HashMap; use std::path::PathBuf; +use tokio::fs::File; use tower_api::models::Run; use tower_package::{Package, PackageSpec}; -use tower_runtime::{local::LocalApp, App, AppLauncher, OutputReceiver, Status}; +use tower_runtime::{OutputReceiver, Status}; use tower_telemetry::{debug, Context}; +use crate::{api, output, util::dates}; use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; use tokio::sync::{ - mpsc::{unbounded_channel, Receiver as MpscReceiver}, + mpsc::Receiver as MpscReceiver, oneshot::{self, Receiver as OneshotReceiver}, Mutex, }; use tokio::time::{sleep, timeout, Duration}; - -use crate::{api, output, util::dates}; +use tower_runtime::execution::ExecutionHandle; +use tower_runtime::execution::{ + CacheBackend, CacheConfig, CacheIsolation, ExecutionBackend, ExecutionSpec, ResourceLimits, + RuntimeConfig as ExecRuntimeConfig, +}; +use tower_runtime::subprocess::SubprocessBackend; pub fn run_cmd() -> Command { Command::new("run") @@ -148,7 +155,7 @@ where env_vars.insert("TOWER_URL".to_string(), config.tower_url.to_string()); // There should always be a session, if there isn't one then I'm not sure how we got here? - let session = config.session.ok_or(Error::NoSession)?; + let session = config.session.as_ref().ok_or(Error::NoSession)?; env_vars.insert("TOWER_JWT".to_string(), session.token.jwt.to_string()); @@ -162,34 +169,42 @@ where } } - // Build the package - let mut package = build_package(&towerfile).await?; - - // Unpack the package - package.unpack().await?; - - let (sender, receiver) = unbounded_channel(); - + // Build the package (creates tar.gz) + let package = build_package(&towerfile).await?; output::success(&format!("Launching app `{}`", towerfile.app.name)); - let output_task = tokio::spawn(output_handler(receiver)); - let mut launcher: AppLauncher = AppLauncher::default(); - launcher - .launch( - Context::new(), - sender, - package, - env.to_string(), - secrets, + // Open the tar.gz file as a stream + let package_path = package + .package_file_path + .as_ref() + .expect("Package must have a file path"); + let package_file = File::open(package_path).await?; + + let backend = SubprocessBackend::new(config.cache_dir.clone()); + let run_id = format!( + "cli-run-{}", + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos() + ); + let handle = backend + .create(build_cli_execution_spec( + config, + env, params, + secrets, env_vars, - config.cache_dir, - ) + package_file, + run_id, + )) .await?; + let receiver = handle.logs().await?; + let output_task = tokio::spawn(output_handler(receiver)); - // Monitor app output and status concurrently - let app = Arc::new(Mutex::new(launcher.app.unwrap())); - let status_task = tokio::spawn(monitor_local_status(Arc::clone(&app))); + // Monitor app status concurrently + let handle = Arc::new(Mutex::new(handle)); + let status_task = tokio::spawn(monitor_cli_status(Arc::clone(&handle))); // Wait for app to complete or SIGTERM let status_result = tokio::select! { @@ -199,7 +214,7 @@ where }, _ = tokio::signal::ctrl_c(), if !output::get_output_mode().is_mcp() => { output::write("\nReceived Ctrl+C, stopping local run...\n"); - app.lock().await.terminate().await.ok(); + handle.lock().await.terminate().await.ok(); return Ok(output_task.await.unwrap()); } }; @@ -222,6 +237,52 @@ where Ok(final_result) } +fn build_cli_execution_spec( + config: Config, + env: &str, + params: HashMap, + secrets: HashMap, + env_vars: HashMap, + package_stream: File, + run_id: String, +) -> ExecutionSpec { + let spec = ExecutionSpec { + id: run_id, + package_stream: Box::new(package_stream), + runtime: ExecRuntimeConfig { + image: "local".to_string(), + version: None, + cache: CacheConfig { + enable_bundle_cache: true, + enable_runtime_cache: true, + enable_dependency_cache: true, + backend: match config.cache_dir.clone() { + Some(dir) => CacheBackend::Local { cache_dir: dir }, + None => CacheBackend::None, + }, + isolation: CacheIsolation::None, + }, + entrypoint: None, + command: None, + }, + environment: env.to_string(), + secrets, + parameters: params, + env_vars, + resources: ResourceLimits { + cpu_millicores: None, + memory_mb: None, + storage_mb: None, + max_pids: None, + gpu_count: 0, + timeout_seconds: 3600, + }, + networking: None, + telemetry_ctx: Context::new(), + }; + spec +} + /// do_run_local is the entrypoint for running an app locally. It will load the Towerfile, build /// the package, and launch the app. The relevant package is cleaned up after execution is /// complete. @@ -595,8 +656,12 @@ async fn monitor_output(mut output: OutputReceiver) { /// monitor_local_status is a helper function that will monitor the status of a given app and waits for /// it to progress to a terminal state. -async fn monitor_local_status(app: Arc>) -> Status { - debug!("Starting status monitoring for LocalApp"); +async fn monitor_cli_status( + handle: Arc>, +) -> Status { + use tower_runtime::execution::ExecutionHandle as _; + + debug!("Starting status monitoring for CLI execution"); let mut check_count = 0; let mut err_count = 0; @@ -604,11 +669,11 @@ async fn monitor_local_status(app: Arc>) -> Status { check_count += 1; debug!( - "Status check #{}, attempting to get app status", + "Status check #{}, attempting to get CLI handle status", check_count ); - match app.lock().await.status().await { + match handle.lock().await.status().await { Ok(status) => { // We reset the error count to indicate that we can intermittently get statuses. err_count = 0; @@ -616,30 +681,27 @@ async fn monitor_local_status(app: Arc>) -> Status { match status { Status::Exited => { debug!("Run exited cleanly, stopping status monitoring"); - - // We're done. Exit this loop and function. return status; } Status::Crashed { .. } => { debug!("Run crashed, stopping status monitoring"); - - // We're done. Exit this loop and function. return status; } _ => { + debug!("Handle status: other, continuing to monitor"); sleep(Duration::from_millis(100)).await; } } } Err(e) => { - debug!("Failed to get app status: {:?}", e); + debug!("Failed to get handle status: {:?}", e); err_count += 1; // If we get five errors in a row, we abandon monitoring. if err_count >= 5 { - debug!("Failed to get app status after 5 attempts, giving up"); + debug!("Failed to get handle status after 5 attempts, giving up"); output::error("An error occured while monitoring your local run status!"); - return tower_runtime::Status::Crashed { code: -1 }; + return Status::Crashed { code: -1 }; } // Otherwise, keep on keepin' on. diff --git a/crates/tower-package/Cargo.toml b/crates/tower-package/Cargo.toml index 98efbaa4..1e302058 100644 --- a/crates/tower-package/Cargo.toml +++ b/crates/tower-package/Cargo.toml @@ -10,13 +10,12 @@ license = { workspace = true } async-compression = { workspace = true } config = { workspace = true } glob = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } sha2 = { workspace = true } -snafu = { workspace = true } +snafu = { workspace = true } tmpdir = { workspace = true } -tokio = { workspace = true } +tokio = { workspace = true } tokio-stream = { workspace = true } tokio-tar = { workspace = true } -tokio-util = { workspace = true } tower-telemetry = { workspace = true } diff --git a/crates/tower-runtime/Cargo.toml b/crates/tower-runtime/Cargo.toml index 8ab42fb5..37b47f62 100644 --- a/crates/tower-runtime/Cargo.toml +++ b/crates/tower-runtime/Cargo.toml @@ -7,14 +7,16 @@ rust-version = { workspace = true } license = { workspace = true } [dependencies] +async-trait = { workspace = true } chrono = { workspace = true } nix = { workspace = true } -snafu = { workspace = true } +snafu = { workspace = true } +tmpdir = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } -tower-package = { workspace = true } -tower-telemetry = { workspace = true } -tower-uv = { workspace = true } +tower-package = { workspace = true } +tower-telemetry = { workspace = true } +tower-uv = { workspace = true } [dev-dependencies] config = { workspace = true } diff --git a/crates/tower-runtime/src/errors.rs b/crates/tower-runtime/src/errors.rs index eae04338..0326cd27 100644 --- a/crates/tower-runtime/src/errors.rs +++ b/crates/tower-runtime/src/errors.rs @@ -17,14 +17,14 @@ pub enum Error { #[snafu(display("not implemented"))] NotImplemented, - #[snafu(display("bundle download failed"))] - BundleDownloadFailed, + #[snafu(display("package download failed"))] + PackageDownloadFailed, - #[snafu(display("bundle create failed"))] - BundleCreateFailed, + #[snafu(display("package create failed"))] + PackageCreateFailed, - #[snafu(display("bundle unpack failed"))] - BundleUnpackFailed, + #[snafu(display("package unpack failed"))] + PackageUnpackFailed, #[snafu(display("container already initialized"))] AlreadyInitialized, @@ -65,6 +65,15 @@ pub enum Error { #[snafu(display("cancelled"))] Cancelled, + #[snafu(display("app not started"))] + AppNotStarted, + + #[snafu(display("no execution handle"))] + NoHandle, + + #[snafu(display("invalid package"))] + InvalidPackage, + #[snafu(display("dependency installation failed"))] DependencyInstallationFailed, } @@ -94,3 +103,9 @@ impl From for Error { } } } + +impl From for Error { + fn from(_: tower_package::Error) -> Self { + Error::PackageUnpackFailed + } +} diff --git a/crates/tower-runtime/src/execution.rs b/crates/tower-runtime/src/execution.rs new file mode 100644 index 00000000..47d6dd26 --- /dev/null +++ b/crates/tower-runtime/src/execution.rs @@ -0,0 +1,243 @@ +//! Execution backend abstraction for Tower +//! +//! This module provides traits and types for abstracting execution backends, +//! allowing Tower to support multiple compute substrates (local processes, +//! Kubernetes pods, etc.) through a uniform interface. + +use async_trait::async_trait; +use std::collections::HashMap; +use std::path::PathBuf; +use tokio::io::AsyncRead; + +use crate::errors::Error; +use crate::{OutputReceiver, Status}; + +// ============================================================================ +// Core Execution Types +// ============================================================================ + +/// ExecutionSpec describes what to execute and how +pub struct ExecutionSpec { + /// Unique identifier for this execution (e.g., run_id) + pub id: String, + + /// Package as a stream of tar.gz data + pub package_stream: Box, + + /// Runtime configuration (image, version, etc.) + pub runtime: RuntimeConfig, + + /// Environment name (e.g., "production", "staging", "default") + pub environment: String, + + /// Secret key-value pairs to inject + pub secrets: HashMap, + + /// Parameter key-value pairs to inject + pub parameters: HashMap, + + /// Additional environment variables + pub env_vars: HashMap, + + /// Resource limits for execution + pub resources: ResourceLimits, + + /// Networking configuration (for service workloads) + pub networking: Option, + + /// Telemetry context for tracing + pub telemetry_ctx: tower_telemetry::Context, +} + +/// RuntimeConfig specifies the execution runtime environment +#[derive(Debug, Clone)] +pub struct RuntimeConfig { + /// Runtime image to use (e.g., "towerhq/tower-runtime:python-3.11") + pub image: String, + + /// Specific version/tag if applicable + pub version: Option, + + /// Cache configuration + pub cache: CacheConfig, + + /// Entrypoint override (if not using bundle's default) + pub entrypoint: Option>, + + /// Command override (if not using bundle's default) + pub command: Option>, +} + +/// CacheConfig describes what should be cached +#[derive(Debug, Clone)] +pub struct CacheConfig { + /// Enable bundle caching (content-addressable by checksum) + pub enable_bundle_cache: bool, + + /// Enable runtime layer caching (container image layers) + pub enable_runtime_cache: bool, + + /// Enable dependency caching (language-specific, e.g., pip cache, node_modules) + pub enable_dependency_cache: bool, + + /// Cache backend to use + pub backend: CacheBackend, + + /// Cache isolation strategy + pub isolation: CacheIsolation, +} + +/// CacheIsolation defines security boundaries for caches +#[derive(Debug, Clone)] +pub enum CacheIsolation { + /// Global sharing (safe for immutable content-addressable caches) + Global, + + /// Per-account isolation + PerAccount { account_id: String }, + + /// Per-app isolation + PerApp { app_id: String }, + + /// No isolation + None, +} + +/// CacheBackend describes where caches are stored +#[derive(Debug, Clone)] +pub enum CacheBackend { + /// Local filesystem cache + Local { cache_dir: PathBuf }, + + /// No caching + None, +} + +/// ResourceLimits defines compute resource constraints +#[derive(Debug, Clone)] +pub struct ResourceLimits { + /// CPU limit in millicores (e.g., 1000 = 1 CPU) + pub cpu_millicores: Option, + + /// Memory limit in megabytes + pub memory_mb: Option, + + /// Ephemeral storage limit in megabytes + pub storage_mb: Option, + + /// Maximum number of processes + pub max_pids: Option, + + /// GPU count + pub gpu_count: u32, + + /// Execution timeout in seconds + pub timeout_seconds: u32, +} + +/// NetworkingSpec defines networking requirements +#[derive(Debug, Clone)] +pub struct NetworkingSpec { + /// Port the app listens on + pub port: u16, + + /// Whether this app needs a stable service endpoint + pub expose_service: bool, + + /// Service name (for DNS) + pub service_name: Option, +} + +// ============================================================================ +// Execution Backend Trait +// ============================================================================ + +/// ExecutionBackend abstracts the compute substrate +#[async_trait] +pub trait ExecutionBackend: Send + Sync { + /// The handle type this backend returns + type Handle: ExecutionHandle; + + /// Create a new execution environment + async fn create(&self, spec: ExecutionSpec) -> Result; + + /// Get backend capabilities + fn capabilities(&self) -> BackendCapabilities; + + /// Cleanup backend resources + async fn cleanup(&self) -> Result<(), Error>; +} + +/// BackendCapabilities describes what a backend supports +#[derive(Debug, Clone)] +pub struct BackendCapabilities { + /// Backend name + pub name: String, + + /// Supports persistent volumes for caching + pub supports_persistent_cache: bool, + + /// Supports pre-warmed environments + pub supports_prewarming: bool, + + /// Supports network isolation + pub supports_network_isolation: bool, + + /// Supports service endpoints + pub supports_service_endpoints: bool, + + /// Typical startup latency in milliseconds + pub typical_cold_start_ms: u64, + pub typical_warm_start_ms: u64, + + /// Maximum concurrent executions + pub max_concurrent_executions: Option, +} + +// ============================================================================ +// Execution Handle Trait +// ============================================================================ + +/// ExecutionHandle represents a running execution +#[async_trait] +pub trait ExecutionHandle: Send + Sync { + /// Get a unique identifier for this execution + fn id(&self) -> &str; + + /// Get current execution status + async fn status(&self) -> Result; + + /// Subscribe to log stream + async fn logs(&self) -> Result; + + /// Terminate execution gracefully + async fn terminate(&mut self) -> Result<(), Error>; + + /// Force kill execution + async fn kill(&mut self) -> Result<(), Error>; + + /// Wait for execution to complete + async fn wait_for_completion(&self) -> Result; + + /// Get service endpoint + async fn service_endpoint(&self) -> Result, Error>; + + /// Cleanup resources + async fn cleanup(&mut self) -> Result<(), Error>; +} + +/// ServiceEndpoint describes how to reach a running service +#[derive(Debug, Clone)] +pub struct ServiceEndpoint { + /// Host/IP to connect to + pub host: String, + + /// Port to connect to + pub port: u16, + + /// Protocol (http, https, tcp, etc.) + pub protocol: String, + + /// Full URL if applicable (e.g., "http://app-run-123.default.svc.cluster.local:8080") + pub url: Option, +} diff --git a/crates/tower-runtime/src/lib.rs b/crates/tower-runtime/src/lib.rs index 195c3e26..74091edc 100644 --- a/crates/tower-runtime/src/lib.rs +++ b/crates/tower-runtime/src/lib.rs @@ -5,10 +5,11 @@ use std::path::PathBuf; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tower_package::Package; -use tower_telemetry::debug; pub mod errors; +pub mod execution; pub mod local; +pub mod subprocess; use errors::Error; @@ -47,7 +48,7 @@ pub type OutputReceiver = UnboundedReceiver; pub type OutputSender = UnboundedSender; -pub trait App { +pub trait App: Send + Sync { // start will start the process fn start(opts: StartOptions) -> impl Future> + Send where @@ -60,73 +61,6 @@ pub trait App { fn status(&self) -> impl Future> + Send; } -pub struct AppLauncher { - pub app: Option, -} - -impl std::default::Default for AppLauncher { - fn default() -> Self { - Self { app: None } - } -} - -impl AppLauncher { - pub async fn launch( - &mut self, - ctx: tower_telemetry::Context, - output_sender: OutputSender, - package: Package, - environment: String, - secrets: HashMap, - parameters: HashMap, - env_vars: HashMap, - cache_dir: Option, - ) -> Result<(), Error> { - let cwd = package.unpacked_path.clone().unwrap().to_path_buf(); - - let opts = StartOptions { - ctx, - output_sender, - cwd: Some(cwd), - environment, - secrets, - parameters, - package, - env_vars, - cache_dir, - }; - - // NOTE: This is a really awful hack to force any existing app to drop itself. Not certain - // this is exactly what we want to do... - self.app = None; - - let res = A::start(opts).await; - - if let Ok(app) = res { - self.app = Some(app); - Ok(()) - } else { - self.app = None; - Err(res.err().unwrap()) - } - } - - pub async fn terminate(&mut self) -> Result<(), Error> { - if let Some(app) = &mut self.app { - if let Err(err) = app.terminate().await { - debug!("failed to terminate app: {}", err); - Err(err) - } else { - self.app = None; - Ok(()) - } - } else { - // There's no app, so nothing to terminate. - Ok(()) - } - } -} - pub struct StartOptions { pub ctx: tower_telemetry::Context, pub package: Package, diff --git a/crates/tower-runtime/src/local.rs b/crates/tower-runtime/src/local.rs index a6296517..ec2ea64f 100644 --- a/crates/tower-runtime/src/local.rs +++ b/crates/tower-runtime/src/local.rs @@ -353,6 +353,18 @@ impl App for LocalApp { }) } + async fn terminate(&mut self) -> Result<(), Error> { + self.terminator.cancel(); + + // Now we should wait for the join handle to finish. + if let Some(execute_handle) = self.execute_handle.take() { + let _ = execute_handle.await; + self.execute_handle = None; + } + + Ok(()) + } + async fn status(&self) -> Result { let mut status = self.status.lock().await; @@ -379,18 +391,6 @@ impl App for LocalApp { } } } - - async fn terminate(&mut self) -> Result<(), Error> { - self.terminator.cancel(); - - // Now we should wait for the join handle to finish. - if let Some(execute_handle) = self.execute_handle.take() { - let _ = execute_handle.await; - self.execute_handle = None; - } - - Ok(()) - } } async fn execute_bash_program( diff --git a/crates/tower-runtime/src/subprocess.rs b/crates/tower-runtime/src/subprocess.rs new file mode 100644 index 00000000..713722f2 --- /dev/null +++ b/crates/tower-runtime/src/subprocess.rs @@ -0,0 +1,200 @@ +//! Subprocess execution backend + +use crate::errors::Error; +use crate::execution::{ + BackendCapabilities, CacheBackend, ExecutionBackend, ExecutionHandle, ExecutionSpec, + ServiceEndpoint, +}; +use crate::local::LocalApp; +use crate::{App, OutputReceiver, StartOptions, Status}; + +use async_trait::async_trait; +use std::path::PathBuf; +use std::sync::Arc; +use tokio::fs::File; +use tokio::io::AsyncWriteExt; +use tokio::sync::Mutex; +use tokio::time::Duration; +use tower_package::Package; + +/// SubprocessBackend executes apps as a subprocess +pub struct SubprocessBackend { + /// Optional default cache directory to use + cache_dir: Option, +} + +impl SubprocessBackend { + pub fn new(cache_dir: Option) -> Self { + Self { cache_dir } + } + + /// Receive package stream and unpack it + /// + /// Takes a stream of tar.gz data, saves it to a temp file, and unpacks it + /// Returns the Package (which keeps the temp directory alive) + async fn receive_and_unpack_package( + &self, + mut package_stream: Box, + ) -> Result { + // Create temp directory for this package + let temp_dir = tmpdir::TmpDir::new("tower-package") + .await + .map_err(|_| Error::PackageCreateFailed)?; + + // Save stream to tar.gz file + let tar_gz_path = temp_dir.to_path_buf().join("package.tar.gz"); + let mut file = File::create(&tar_gz_path) + .await + .map_err(|_| Error::PackageCreateFailed)?; + + tokio::io::copy(&mut package_stream, &mut file) + .await + .map_err(|_| Error::PackageCreateFailed)?; + + file.flush().await.map_err(|_| Error::PackageCreateFailed)?; + drop(file); + + // Unpack the package + let mut package = Package::default(); + package.package_file_path = Some(tar_gz_path); + package.tmp_dir = Some(temp_dir); + package.unpack().await?; + + Ok(package) + } +} + +#[async_trait] +impl ExecutionBackend for SubprocessBackend { + type Handle = SubprocessHandle; + + async fn create(&self, spec: ExecutionSpec) -> Result { + // Convert ExecutionSpec to StartOptions for LocalApp + let (output_sender, output_receiver) = tokio::sync::mpsc::unbounded_channel(); + + // Get cache_dir from spec or use backend default + let cache_dir = match &spec.runtime.cache.backend { + CacheBackend::Local { cache_dir } => Some(cache_dir.clone()), + _ => self.cache_dir.clone(), + }; + + // Receive package stream and unpack it + let package = self.receive_and_unpack_package(spec.package_stream).await?; + + let unpacked_path = package + .unpacked_path + .clone() + .ok_or(Error::PackageUnpackFailed)?; + + let opts = StartOptions { + ctx: spec.telemetry_ctx, + package: Package::from_unpacked_path(unpacked_path).await?, + cwd: None, // LocalApp determines cwd from package + environment: spec.environment, + secrets: spec.secrets, + parameters: spec.parameters, + env_vars: spec.env_vars, + output_sender: output_sender.clone(), + cache_dir, + }; + + // Start the LocalApp + let app = LocalApp::start(opts).await?; + + Ok(SubprocessHandle { + id: spec.id, + app: Arc::new(Mutex::new(app)), + output_receiver: Arc::new(Mutex::new(output_receiver)), + _package: package, // Keep package alive so temp dir doesn't get cleaned up + }) + } + + fn capabilities(&self) -> BackendCapabilities { + BackendCapabilities { + name: "local".to_string(), + supports_persistent_cache: true, + supports_prewarming: false, + supports_network_isolation: false, + supports_service_endpoints: false, + typical_cold_start_ms: 1000, // ~1s for venv + sync + typical_warm_start_ms: 100, // ~100ms with warm cache + max_concurrent_executions: None, // Limited by system resources + } + } + + async fn cleanup(&self) -> Result<(), Error> { + // Nothing to cleanup for local backend + Ok(()) + } +} + +/// SubprocessHandle provides lifecycle management for a subprocess execution +pub struct SubprocessHandle { + id: String, + app: Arc>, + output_receiver: Arc>, + _package: Package, // Keep package alive to prevent temp dir cleanup +} + +#[async_trait] +impl ExecutionHandle for SubprocessHandle { + fn id(&self) -> &str { + &self.id + } + + async fn status(&self) -> Result { + let app = self.app.lock().await; + app.status().await + } + + async fn logs(&self) -> Result { + // Create a new channel for log streaming + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + + // Spawn a task to forward Output from the internal receiver + let output_receiver = self.output_receiver.clone(); + tokio::spawn(async move { + let mut receiver = output_receiver.lock().await; + while let Some(output) = receiver.recv().await { + if tx.send(output).is_err() { + break; // Receiver dropped + } + } + }); + + Ok(rx) + } + + async fn terminate(&mut self) -> Result<(), Error> { + let mut app = self.app.lock().await; + app.terminate().await + } + + async fn kill(&mut self) -> Result<(), Error> { + // For local processes, kill is the same as terminate + self.terminate().await + } + + async fn wait_for_completion(&self) -> Result { + loop { + let status = self.status().await?; + match status { + Status::None | Status::Running => { + tokio::time::sleep(Duration::from_millis(100)).await; + } + _ => return Ok(status), + } + } + } + + async fn service_endpoint(&self) -> Result, Error> { + // Local backend doesn't support service endpoints + Ok(None) + } + + async fn cleanup(&mut self) -> Result<(), Error> { + // Ensure the app is terminated + self.terminate().await?; + Ok(()) + } +}