diff --git a/.cargo-husky/hooks/pre-commit b/.cargo-husky/hooks/pre-commit new file mode 100755 index 0000000..a77ac24 --- /dev/null +++ b/.cargo-husky/hooks/pre-commit @@ -0,0 +1,5 @@ +#!/bin/sh +set -e + +cargo fmt -- --check +cargo clippy -- -D warnings diff --git a/.cargo-husky/hooks/pre-push b/.cargo-husky/hooks/pre-push new file mode 100755 index 0000000..150ed85 --- /dev/null +++ b/.cargo-husky/hooks/pre-push @@ -0,0 +1,4 @@ +#!/bin/sh +set -e + +cargo nextest run diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..270900f --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,47 @@ +name: CI + +on: + push: + branches: [main] + pull_request: + branches: [main] + +env: + CARGO_TERM_COLOR: always + RUSTFLAGS: "-D warnings" + +jobs: + fmt: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + components: rustfmt + - run: cargo fmt --check + + clippy: + strategy: + matrix: + os: [ubuntu-latest, macos-latest, windows-latest] + runs-on: ${{ matrix.os }} + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + components: clippy + - uses: Swatinem/rust-cache@v2 + - run: cargo clippy --all-targets --all-features -- -D warnings + + test: + needs: [fmt, clippy] + strategy: + matrix: + os: [ubuntu-latest, macos-latest, windows-latest] + runs-on: ${{ matrix.os }} + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + - uses: Swatinem/rust-cache@v2 + - uses: taiki-e/install-action@nextest + - run: cargo nextest run --all-features diff --git a/.gitignore b/.gitignore index a32b61a..5aeb9fb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +/.cache /target Cargo.lock diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a3efd4..efda7ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.6.0] - 2025-11-28 + +### Added + +- Windows support (named pipes, process management) +- Git hooks via cargo-husky (pre-commit: fmt/clippy, pre-push: nextest) + ## [0.5.0] - 2025-01-23 ### Added diff --git a/Cargo.toml b/Cargo.toml index c5015d2..abde2fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daemon-cli" -version = "0.5.0" +version = "0.6.0" edition = "2024" [dependencies] @@ -18,8 +18,18 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } parking_lot = "0.12" terminal_size = "0.4" supports-color = "3.0" +interprocess = { version = "2.2", features = ["tokio"] } + +[target.'cfg(unix)'.dependencies] nix = { version = "0.30.1", features = ["signal"] } +[target.'cfg(windows)'.dependencies] +windows-sys = { version = "0.61.2", features = [ + "Win32_Foundation", + "Win32_System_Threading", +] } +widestring = "1.0" + [[example]] name = "cli" path = "examples/cli.rs" @@ -27,3 +37,4 @@ path = "examples/cli.rs" [dev-dependencies] tokio-test = "0.4" rand = "0.8" +cargo-husky = { version = "1", default-features = false, features = ["user-hooks"] } diff --git a/examples/common/mod.rs b/examples/common/mod.rs index 85b234f..818beea 100644 --- a/examples/common/mod.rs +++ b/examples/common/mod.rs @@ -30,9 +30,9 @@ impl CommandHandler for CommandProcessor { mut output: impl AsyncWrite + Send + Unpin, cancel_token: CancellationToken, ) -> Result { - let parts: Vec<&str> = command.trim().split_whitespace().collect(); + let parts: Vec<&str> = command.split_whitespace().collect(); - match parts.get(0) { + match parts.first() { Some(&"status") => { output.write_all(b"Daemon ready for processing\n").await?; Ok(0) diff --git a/examples/concurrent.rs b/examples/concurrent.rs index fb5b44a..a1d8b94 100644 --- a/examples/concurrent.rs +++ b/examples/concurrent.rs @@ -83,9 +83,9 @@ impl CommandHandler for TaskQueueHandler { }); }); - let parts: Vec<&str> = command.trim().split_whitespace().collect(); + let parts: Vec<&str> = command.split_whitespace().collect(); - match parts.get(0) { + match parts.first() { Some(&"add-task") => { let task_description = parts[1..].join(" "); if task_description.is_empty() { diff --git a/src/client.rs b/src/client.rs index 181c9d7..636b22d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,6 +1,7 @@ use crate::error_context::{ErrorContextBuffer, get_or_init_global_error_context}; +use crate::process::{TerminateResult, kill_process, process_exists, terminate_process}; use crate::terminal::TerminalInfo; -use crate::transport::{SocketClient, SocketMessage, socket_path}; +use crate::transport::{SocketClient, SocketMessage, daemon_socket_exists, socket_path}; use anyhow::{Result, bail}; use std::{fs, path::PathBuf, process::Stdio, time::Duration}; use tokio::{io::AsyncWriteExt, process::Command, time::sleep}; @@ -101,7 +102,7 @@ impl DaemonClient { // Daemon not running or not responsive - spawn our own tracing::debug!("No existing daemon found, spawning new daemon"); - if socket_path.exists() { + if daemon_socket_exists(daemon_name, root_path) { // Clean up stale socket file tracing::debug!("Cleaning up stale socket file"); let _ = fs::remove_file(&socket_path); @@ -247,7 +248,6 @@ impl DaemonClient { .map_err(|e| anyhow::anyhow!("Failed to spawn daemon process: {}", e))?; // Wait for daemon to become ready - let socket_path = socket_path(daemon_name, root_path); let mut attempts = 0; const MAX_ATTEMPTS: u32 = 50; // 5 seconds total @@ -266,7 +266,7 @@ impl DaemonClient { } // Try to connect - if socket_path.exists() + if daemon_socket_exists(daemon_name, root_path) && let Ok(socket_client) = SocketClient::connect(daemon_name, root_path).await { // Successfully connected - daemon is ready @@ -293,7 +293,7 @@ impl DaemonClient { && let Ok(pid) = pid_str.trim().parse::() { tracing::debug!(pid, "Cleaning up stale daemon process"); - let _ = Command::new("kill").arg(pid.to_string()).output().await; + kill_process(pid).await; } // Remove stale PID file let _ = fs::remove_file(&pid_file); @@ -305,11 +305,18 @@ impl DaemonClient { /// /// This method will: /// 1. Read the daemon's PID from the PID file - /// 2. Send SIGTERM for graceful shutdown - /// 3. Wait up to 1 second for the process to exit - /// 4. Send SIGKILL if the process is still running + /// 2. Attempt graceful shutdown (Unix: SIGTERM, Windows: immediate termination) + /// 3. Wait up to 1 second for the process to exit (Unix only) + /// 4. Force terminate if still running (Unix: SIGKILL) /// 5. Clean up PID and socket files /// + /// # Platform Behavior + /// + /// - **Unix**: Sends SIGTERM for graceful shutdown, waits up to 1 second, + /// then sends SIGKILL if still running. + /// - **Windows**: Immediately terminates the process. Windows has no + /// SIGTERM equivalent for console applications. + /// /// Returns an error if the daemon is not running or cannot be stopped. /// /// # Example @@ -325,152 +332,64 @@ impl DaemonClient { /// ``` pub async fn force_stop(&self) -> Result<()> { let pid_file = crate::transport::pid_path(&self.daemon_name, &self.root_path); - let socket_path = socket_path(&self.daemon_name, &self.root_path); + let sock_path = socket_path(&self.daemon_name, &self.root_path); // Read PID file if !pid_file.exists() { // Best-effort cleanup of potentially stale socket - let _ = fs::remove_file(&socket_path); + let _ = fs::remove_file(&sock_path); bail!("Daemon is not running (no PID file found)"); } let pid_str = fs::read_to_string(&pid_file) .map_err(|e| anyhow::anyhow!("Failed to read PID file: {}", e))?; - let pid = pid_str + let pid: u32 = pid_str .trim() - .parse::() + .parse() .map_err(|e| anyhow::anyhow!("Invalid PID in file: {}", e))?; tracing::info!(pid, "Force-stopping daemon"); - // Check if process exists using nix crate's kill with signal 0 - use nix::sys::signal::{Signal, kill}; - use nix::unistd::Pid; - - let nix_pid = Pid::from_raw(pid); - - // Verify process exists - match kill(nix_pid, None) { - Ok(_) => { - // Process exists and we can signal it + // Check if process exists + match process_exists(pid) { + Ok(true) => { + // Process exists, continue + } + Ok(false) => { + // Process doesn't exist, clean up files + tracing::warn!(pid, "Process not running, cleaning up files"); + let _ = fs::remove_file(&pid_file); + let _ = fs::remove_file(&sock_path); + bail!("Daemon process (PID {}) is not running", pid); } Err(e) => { - use nix::errno::Errno; - match e { - Errno::ESRCH => { - // Process doesn't exist, clean up files - tracing::warn!(pid, "Process not running, cleaning up files"); - let _ = fs::remove_file(&pid_file); - let _ = fs::remove_file(&socket_path); - bail!("Daemon process (PID {}) is not running", pid); - } - Errno::EPERM => { - // Permission denied - process exists but we can't signal it - bail!( - "Permission denied: cannot signal daemon process (PID {}). \ - Process appears to be running but owned by another user.", - pid - ); - } - _ => { - // Other error - be conservative and don't delete files - bail!("Error checking daemon process (PID {}): {}", pid, e); - } - } + bail!("Error checking daemon process (PID {}): {}", pid, e); } } - // Send SIGTERM for graceful shutdown - tracing::debug!(pid, "Sending SIGTERM"); - kill(nix_pid, Signal::SIGTERM) - .map_err(|e| anyhow::anyhow!("Failed to send SIGTERM: {}", e))?; - - // Wait up to 1 seconds for process to exit - for i in 0..10 { - sleep(Duration::from_millis(100)).await; - - // Check if process still exists - match kill(nix_pid, None) { - Ok(_) => { - // Process still running, continue waiting - } - Err(e) => { - use nix::errno::Errno; - match e { - Errno::ESRCH => { - // Process has exited - tracing::info!(pid, "Daemon stopped gracefully"); - let _ = fs::remove_file(&pid_file); - let _ = fs::remove_file(&socket_path); - return Ok(()); - } - Errno::EPERM => { - // Permission denied - we can't verify if it's still running - // This is unusual during shutdown wait, but be safe - bail!( - "Permission denied while checking daemon process (PID {}). \ - Cannot verify shutdown status.", - pid - ); - } - _ => { - // Other error during wait - be conservative - bail!("Error checking daemon process (PID {}) during shutdown: {}", pid, e); - } - } - } + // Terminate with 1 second graceful timeout (Unix only; Windows is immediate) + match terminate_process(pid, 1000).await { + TerminateResult::Terminated => { + tracing::info!(pid, "Daemon stopped"); } - - if i == 9 { - tracing::warn!(pid, "Process did not exit after SIGTERM, sending SIGKILL"); + TerminateResult::AlreadyDead => { + tracing::info!(pid, "Daemon was already stopped"); } - } - - // Process still running, send SIGKILL - tracing::debug!(pid, "Sending SIGKILL"); - kill(nix_pid, Signal::SIGKILL) - .map_err(|e| anyhow::anyhow!("Failed to send SIGKILL: {}", e))?; - - // Wait briefly for SIGKILL to take effect - sleep(Duration::from_millis(500)).await; - - // Verify process is dead - match kill(nix_pid, None) { - Ok(_) => { - // Process still exists after SIGKILL - this is bad - bail!("Failed to kill daemon process (PID {})", pid); + TerminateResult::PermissionDenied => { + bail!( + "Permission denied: cannot stop daemon process (PID {}). \ + Process appears to be running but owned by another user.", + pid + ); } - Err(e) => { - use nix::errno::Errno; - match e { - Errno::ESRCH => { - // Process is gone, success - } - Errno::EPERM => { - // Permission denied after SIGKILL - can't verify - bail!( - "Permission denied: cannot verify daemon process (PID {}) was killed. \ - SIGKILL was sent but status unclear.", - pid - ); - } - _ => { - // Other error - unclear if process is dead - bail!( - "Error verifying daemon process (PID {}) after SIGKILL: {}", - pid, - e - ); - } - } + TerminateResult::Error(e) => { + bail!("Failed to stop daemon (PID {}): {}", pid, e); } } - tracing::info!(pid, "Daemon force-stopped with SIGKILL"); - // Clean up files let _ = fs::remove_file(&pid_file); - let _ = fs::remove_file(&socket_path); + let _ = fs::remove_file(&sock_path); Ok(()) } @@ -577,26 +496,27 @@ impl DaemonClient { let result = self.execute_command_internal(command.clone()).await; // Check if we should auto-restart on error - if let Err(ref error) = result { - if self.auto_restart_on_error && Self::is_fatal_connection_error(error) { - tracing::warn!( - error = %error, - "Fatal connection error detected, restarting daemon and retrying" - ); - - // Restart daemon - if let Err(restart_err) = self.restart().await { - tracing::error!(error = %restart_err, "Failed to restart daemon"); - return Err(anyhow::anyhow!( - "Daemon crashed and restart failed: {}", - restart_err - )); - } + if let Err(ref error) = result + && self.auto_restart_on_error + && Self::is_fatal_connection_error(error) + { + tracing::warn!( + error = %error, + "Fatal connection error detected, restarting daemon and retrying" + ); - // Retry command once - tracing::info!("Retrying command after daemon restart"); - return self.execute_command_internal(command).await; + // Restart daemon + if let Err(restart_err) = self.restart().await { + tracing::error!(error = %restart_err, "Failed to restart daemon"); + return Err(anyhow::anyhow!( + "Daemon crashed and restart failed: {}", + restart_err + )); } + + // Retry command once + tracing::info!("Retrying command after daemon restart"); + return self.execute_command_internal(command).await; } result @@ -623,9 +543,8 @@ impl DaemonClient { terminal_info, }) .await - .map_err(|e| { + .inspect_err(|_| { self.error_context.dump_to_stderr(); - e })?; // Stream output chunks to stdout diff --git a/src/error_context.rs b/src/error_context.rs index 4bc59a1..d25b449 100644 --- a/src/error_context.rs +++ b/src/error_context.rs @@ -155,3 +155,124 @@ impl Visit for MessageVisitor { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_error_context_buffer_new() { + let buffer = ErrorContextBuffer::new(); + // Buffer should be empty initially + let entries = buffer.entries.lock(); + assert!(entries.is_empty()); + } + + #[test] + fn test_error_context_buffer_add_entry() { + let buffer = ErrorContextBuffer::new(); + + // Add an entry + buffer.add_entry(tracing::Level::INFO, "test message".to_string()); + + let entries = buffer.entries.lock(); + assert_eq!(entries.len(), 1); + assert_eq!(entries[0].message, "test message"); + } + + #[test] + fn test_error_context_buffer_multiple_entries() { + let buffer = ErrorContextBuffer::new(); + + // Add multiple entries + buffer.add_entry(tracing::Level::DEBUG, "debug message".to_string()); + buffer.add_entry(tracing::Level::INFO, "info message".to_string()); + buffer.add_entry(tracing::Level::WARN, "warn message".to_string()); + buffer.add_entry(tracing::Level::ERROR, "error message".to_string()); + + let entries = buffer.entries.lock(); + assert_eq!(entries.len(), 4); + assert_eq!(entries[0].message, "debug message"); + assert_eq!(entries[1].message, "info message"); + assert_eq!(entries[2].message, "warn message"); + assert_eq!(entries[3].message, "error message"); + } + + #[test] + fn test_error_context_buffer_circular_eviction() { + let buffer = ErrorContextBuffer::new(); + + // Add more entries than BUFFER_SIZE + for i in 0..(BUFFER_SIZE + 10) { + buffer.add_entry(tracing::Level::INFO, format!("message {}", i)); + } + + let entries = buffer.entries.lock(); + + // Should have exactly BUFFER_SIZE entries + assert_eq!(entries.len(), BUFFER_SIZE); + + // Oldest entries should have been evicted + // First entry should be message 10 (since we added 10 extra) + assert_eq!(entries[0].message, "message 10"); + + // Last entry should be the most recent + assert_eq!( + entries[BUFFER_SIZE - 1].message, + format!("message {}", BUFFER_SIZE + 10 - 1) + ); + } + + #[test] + fn test_error_context_buffer_clone() { + let buffer = ErrorContextBuffer::new(); + buffer.add_entry(tracing::Level::INFO, "test".to_string()); + + // Clone should share the same underlying buffer + let buffer2 = buffer.clone(); + buffer2.add_entry(tracing::Level::WARN, "test2".to_string()); + + // Both should see both entries + let entries = buffer.entries.lock(); + assert_eq!(entries.len(), 2); + } + + #[test] + fn test_error_context_buffer_default() { + let buffer = ErrorContextBuffer::default(); + let entries = buffer.entries.lock(); + assert!(entries.is_empty()); + } + + #[test] + fn test_error_context_buffer_dump_empty() { + let buffer = ErrorContextBuffer::new(); + // This should not panic even with empty buffer + buffer.dump_to_stderr(); + } + + #[test] + fn test_error_context_buffer_dump_with_entries() { + let buffer = ErrorContextBuffer::new(); + buffer.add_entry(tracing::Level::INFO, "test message".to_string()); + buffer.add_entry(tracing::Level::ERROR, "error message".to_string()); + + // This should not panic + buffer.dump_to_stderr(); + } + + #[test] + fn test_log_entry_timestamps() { + let buffer = ErrorContextBuffer::new(); + + buffer.add_entry(tracing::Level::INFO, "first".to_string()); + std::thread::sleep(std::time::Duration::from_millis(10)); + buffer.add_entry(tracing::Level::INFO, "second".to_string()); + + let entries = buffer.entries.lock(); + assert_eq!(entries.len(), 2); + + // Second entry should have a later timestamp + assert!(entries[1].timestamp >= entries[0].timestamp); + } +} diff --git a/src/lib.rs b/src/lib.rs index 7039d5c..c8d6a2a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -105,6 +105,7 @@ use tokio_util::sync::CancellationToken; mod client; mod error_context; +mod process; mod server; mod terminal; mod transport; diff --git a/src/process.rs b/src/process.rs new file mode 100644 index 0000000..b54a390 --- /dev/null +++ b/src/process.rs @@ -0,0 +1,366 @@ +//! Platform-specific process management utilities. + +use anyhow::Result; +#[cfg(unix)] +use anyhow::bail; +#[cfg(unix)] +use tokio::time::{Duration, sleep}; + +/// Result of process termination attempt. +#[derive(Debug)] +pub enum TerminateResult { + /// Process was terminated successfully. + Terminated, + /// Process was already dead. + AlreadyDead, + /// Permission denied to signal/terminate process. + PermissionDenied, + /// Other error occurred. + Error(String), +} + +/// Check if a process with the given PID exists. +pub fn process_exists(pid: u32) -> Result { + platform::process_exists(pid) +} + +/// Terminate a process, attempting graceful shutdown first on Unix. +/// +/// # Platform Behavior +/// +/// - **Unix**: Sends SIGTERM for graceful shutdown, waits up to `graceful_timeout_ms`, +/// then sends SIGKILL if still running. +/// - **Windows**: Immediately terminates the process. Windows has no SIGTERM equivalent +/// for console applications, so graceful shutdown is not possible via this method. +pub async fn terminate_process(pid: u32, graceful_timeout_ms: u64) -> TerminateResult { + platform::terminate_process(pid, graceful_timeout_ms).await +} + +/// Quick kill without graceful shutdown attempt. +/// +/// Uses platform-specific kill commands: +/// - Unix: `kill` command +/// - Windows: `taskkill /F` +pub async fn kill_process(pid: u32) { + platform::kill_process(pid).await +} + +#[cfg(unix)] +mod platform { + use super::*; + use nix::errno::Errno; + use nix::sys::signal::{Signal, kill}; + use nix::unistd::Pid; + + pub fn process_exists(pid: u32) -> Result { + match kill(Pid::from_raw(pid as i32), None) { + Ok(_) => Ok(true), + Err(Errno::ESRCH) => Ok(false), + Err(Errno::EPERM) => Ok(true), // Exists but we can't signal it + Err(e) => bail!("Error checking process {}: {}", pid, e), + } + } + + pub async fn terminate_process(pid: u32, graceful_timeout_ms: u64) -> TerminateResult { + let nix_pid = Pid::from_raw(pid as i32); + + // Verify process exists first + match kill(nix_pid, None) { + Ok(_) => {} + Err(Errno::ESRCH) => return TerminateResult::AlreadyDead, + Err(Errno::EPERM) => return TerminateResult::PermissionDenied, + Err(e) => return TerminateResult::Error(format!("Failed to check process: {}", e)), + } + + // Send SIGTERM for graceful shutdown + if let Err(e) = kill(nix_pid, Signal::SIGTERM) { + match e { + Errno::ESRCH => return TerminateResult::AlreadyDead, + Errno::EPERM => return TerminateResult::PermissionDenied, + _ => return TerminateResult::Error(format!("SIGTERM failed: {}", e)), + } + } + + // Wait for graceful shutdown + let iterations = graceful_timeout_ms.div_ceil(100); + for _ in 0..iterations { + sleep(Duration::from_millis(100)).await; + match kill(nix_pid, None) { + Err(Errno::ESRCH) => return TerminateResult::Terminated, + Ok(_) => continue, + Err(Errno::EPERM) => return TerminateResult::PermissionDenied, + Err(e) => return TerminateResult::Error(format!("Check failed: {}", e)), + } + } + + // Still running, send SIGKILL + if let Err(e) = kill(nix_pid, Signal::SIGKILL) { + match e { + Errno::ESRCH => return TerminateResult::Terminated, + Errno::EPERM => return TerminateResult::PermissionDenied, + _ => return TerminateResult::Error(format!("SIGKILL failed: {}", e)), + } + } + + // Wait for SIGKILL to take effect + sleep(Duration::from_millis(500)).await; + + match kill(nix_pid, None) { + Err(Errno::ESRCH) => TerminateResult::Terminated, + Ok(_) => TerminateResult::Error("Process survived SIGKILL".to_string()), + Err(e) => TerminateResult::Error(format!("Final check failed: {}", e)), + } + } + + pub async fn kill_process(pid: u32) { + use tokio::process::Command; + let _ = Command::new("kill").arg(pid.to_string()).output().await; + } +} + +#[cfg(windows)] +mod platform { + use super::*; + use windows_sys::Win32::Foundation::{CloseHandle, STILL_ACTIVE, WAIT_OBJECT_0}; + use windows_sys::Win32::System::Threading::{ + GetExitCodeProcess, OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION, PROCESS_SYNCHRONIZE, + PROCESS_TERMINATE, TerminateProcess as WinTerminateProcess, WaitForSingleObject, + }; + + pub fn process_exists(pid: u32) -> Result { + let handle = unsafe { OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid) }; + + if handle.is_null() { + Ok(false) + } else { + // Check if process is actually still running using GetExitCodeProcess. + // OpenProcess can succeed on terminated processes until cleanup is complete. + let mut exit_code: u32 = 0; + let result = unsafe { GetExitCodeProcess(handle, &mut exit_code) }; + unsafe { CloseHandle(handle) }; + + if result == 0 { + // GetExitCodeProcess failed, assume process doesn't exist + Ok(false) + } else { + // STILL_ACTIVE (259) means the process is still running + Ok(exit_code == STILL_ACTIVE as u32) + } + } + } + + pub async fn terminate_process(pid: u32, _graceful_timeout_ms: u64) -> TerminateResult { + // Windows has no SIGTERM equivalent for console applications. + // TerminateProcess is immediate (like SIGKILL). + + let handle = unsafe { OpenProcess(PROCESS_TERMINATE | PROCESS_SYNCHRONIZE, 0, pid) }; + + if handle.is_null() { + // Check if process exists with limited permissions + let check = unsafe { OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid) }; + if check.is_null() { + return TerminateResult::AlreadyDead; + } + unsafe { CloseHandle(check) }; + return TerminateResult::PermissionDenied; + } + + let result = unsafe { WinTerminateProcess(handle, 1) }; + if result == 0 { + unsafe { CloseHandle(handle) }; + return TerminateResult::Error("TerminateProcess failed".to_string()); + } + + // Wait for process to exit (up to 5 seconds) + let wait = unsafe { WaitForSingleObject(handle, 5000) }; + unsafe { CloseHandle(handle) }; + + if wait == WAIT_OBJECT_0 { + TerminateResult::Terminated + } else { + TerminateResult::Error("Process did not terminate".to_string()) + } + } + + pub async fn kill_process(pid: u32) { + use tokio::process::Command; + let _ = Command::new("taskkill") + .args(["/F", "/PID", &pid.to_string()]) + .output() + .await; + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_current_process_exists() { + let pid = std::process::id(); + assert!(process_exists(pid).unwrap()); + } + + #[tokio::test] + async fn test_nonexistent_process() { + // Very high PID unlikely to exist + let result = process_exists(4_000_000_000); + assert!(result.is_ok()); + assert!(!result.unwrap()); + } + + #[tokio::test] + async fn test_terminate_nonexistent_process() { + // Terminating a nonexistent process should return AlreadyDead + let result = terminate_process(4_000_000_000, 100).await; + assert!( + matches!(result, TerminateResult::AlreadyDead), + "Expected AlreadyDead for nonexistent process" + ); + } + + #[tokio::test] + async fn test_kill_nonexistent_process() { + // Kill on nonexistent process should not panic + kill_process(4_000_000_000).await; + } + + #[cfg(unix)] + #[tokio::test] + async fn test_terminate_spawned_process() { + use tokio::process::Command; + + // Spawn a simple sleep process + let mut child = Command::new("sleep") + .arg("60") + .spawn() + .expect("Failed to spawn sleep process"); + + let pid = child.id().expect("Process should have PID"); + + // Verify process exists + assert!(process_exists(pid).unwrap(), "Spawned process should exist"); + + // Terminate it with graceful timeout + let result = terminate_process(pid, 500).await; + + // Note: When terminating a direct child process, it becomes a zombie until + // the parent (us) waits on it. The terminate_process function may report + // "Process survived SIGKILL" because zombies still appear in the process table. + // This is expected behavior - in production, daemon processes are detached + // and reaped by init/systemd, not our process. + // + // Accept either Terminated or the "zombie" error case + assert!( + matches!( + result, + TerminateResult::Terminated | TerminateResult::Error(_) + ), + "Process should be killed, got {:?}", + result + ); + + // Reap the zombie process by waiting on the child + let exit_status = child.wait().await; + assert!( + exit_status.is_ok(), + "Should be able to wait on terminated child" + ); + + // Verify process no longer exists (after reaping) + assert!( + !process_exists(pid).unwrap(), + "Process should not exist after reaping" + ); + } + + #[cfg(unix)] + #[tokio::test] + async fn test_sigterm_then_sigkill_escalation() { + use std::process::Stdio; + use tokio::process::Command; + + // Spawn a process that traps SIGTERM and ignores it + // Using bash with a trap to ignore SIGTERM + let mut child = Command::new("bash") + .arg("-c") + .arg("trap '' TERM; sleep 60") + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .spawn() + .expect("Failed to spawn bash process"); + + let pid = child.id().expect("Process should have PID"); + + // Give the trap time to be set up + sleep(Duration::from_millis(100)).await; + + // Verify process exists + assert!(process_exists(pid).unwrap(), "Spawned process should exist"); + + // Terminate with very short graceful timeout (100ms) + // This should escalate to SIGKILL since SIGTERM is trapped + let result = terminate_process(pid, 100).await; + + // Note: Same zombie issue as above - the process may appear to survive + // because it's a zombie until we reap it. + assert!( + matches!( + result, + TerminateResult::Terminated | TerminateResult::Error(_) + ), + "Process should be killed (possibly as zombie), got {:?}", + result + ); + + // Reap the zombie process by waiting on the child + let exit_status = child.wait().await; + assert!( + exit_status.is_ok(), + "Should be able to wait on terminated child" + ); + + // Verify process no longer exists (after reaping) + assert!( + !process_exists(pid).unwrap(), + "Process should not exist after reaping" + ); + } + + #[cfg(windows)] + #[tokio::test] + async fn test_terminate_spawned_process_windows() { + use std::process::Stdio; + use tokio::process::Command; + + // Spawn a long-running process on Windows + let child = Command::new("timeout") + .args(["/t", "60", "/nobreak"]) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .spawn() + .expect("Failed to spawn timeout process"); + + let pid = child.id().expect("Process should have PID"); + + // Verify process exists + assert!(process_exists(pid).unwrap(), "Spawned process should exist"); + + // Terminate it (Windows has no graceful shutdown, just immediate termination) + let result = terminate_process(pid, 500).await; + + assert!( + matches!(result, TerminateResult::Terminated), + "Process should terminate successfully, got {:?}", + result + ); + + // Verify process no longer exists + assert!( + !process_exists(pid).unwrap(), + "Process should not exist after termination" + ); + } +} diff --git a/src/tests.rs b/src/tests.rs index 8faecbd..305610a 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -72,7 +72,7 @@ fn test_socket_message_serialization() { assert_eq!(command, "test command"); assert_eq!(ti.width, Some(80)); assert_eq!(ti.height, Some(24)); - assert_eq!(ti.is_tty, true); + assert!(ti.is_tty); assert_eq!(ti.color_support, ColorSupport::Truecolor); } _ => panic!("Wrong message type"), @@ -124,7 +124,9 @@ async fn test_handler_basic_output() { color_support: ColorSupport::Basic16, }; - let result = handler.handle("test", terminal_info, &mut output, cancel).await; + let result = handler + .handle("test", terminal_info, &mut output, cancel) + .await; assert!(result.is_ok()); assert_eq!(result.unwrap(), 0); // Success exit code assert_eq!(String::from_utf8(output).unwrap(), "Hello, World!"); diff --git a/src/transport.rs b/src/transport.rs index 5c1756a..43cd876 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -1,14 +1,27 @@ use crate::terminal::TerminalInfo; use anyhow::Result; use futures::{SinkExt, StreamExt}; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; +#[cfg(unix)] +use interprocess::local_socket::{GenericFilePath, ToFsName}; +#[cfg(windows)] +use interprocess::local_socket::{GenericNamespaced, ToNsName}; +use interprocess::local_socket::{ + ListenerOptions, + tokio::{Listener, Stream, prelude::*}, +}; +#[cfg(windows)] +use interprocess::os::windows::{ + local_socket::ListenerOptionsExt as _, security_descriptor::SecurityDescriptor, +}; +use serde::{Deserialize, Serialize, de::DeserializeOwned}; +#[cfg(unix)] +use std::fs; use std::{ collections::hash_map::DefaultHasher, - env, fs, + env, hash::{Hash, Hasher}, path::PathBuf, }; -use tokio::net::{UnixListener, UnixStream}; use tokio_util::codec::{Framed, LengthDelimitedCodec}; // Base62 character set for encoding @@ -58,39 +71,82 @@ pub fn deserialize_message(bytes: &[u8]) -> Result { Ok(message) } -// Internal: Unix socket server for handling RPC requests +/// Cross-platform socket name for the daemon (used on Windows for named pipes). +#[cfg_attr(unix, allow(dead_code))] +pub fn socket_name(daemon_name: &str, root_path: &str) -> String { + let short_id = hash_path_to_short_id(root_path); + format!("{short_id}-{daemon_name}.sock") +} + +/// Check if daemon socket likely exists (platform-aware). +pub fn daemon_socket_exists(daemon_name: &str, root_path: &str) -> bool { + // On Unix, check socket file. On Windows, use PID file as proxy. + #[cfg(unix)] + { + socket_path(daemon_name, root_path).exists() + } + #[cfg(windows)] + { + // Windows named pipes can't be checked via filesystem + pid_path(daemon_name, root_path).exists() + } +} + +// Internal: Cross-platform socket server for handling RPC requests pub struct SocketServer { - listener: UnixListener, + listener: Listener, socket_path: PathBuf, } impl SocketServer { pub async fn new(daemon_name: &str, root_path: &str) -> Result { - let socket_path = socket_path(daemon_name, root_path); - - // Remove existing socket file if it exists - if socket_path.exists() { - fs::remove_file(&socket_path)?; - } + let sock_path = socket_path(daemon_name, root_path); - let listener = UnixListener::bind(&socket_path)?; - - // Set socket permissions to 0600 (owner read/write only) + // Create listener using platform-appropriate naming + #[cfg(unix)] + let listener = { + // Remove existing socket file if it exists + if sock_path.exists() { + fs::remove_file(&sock_path)?; + } + ListenerOptions::new() + .name(sock_path.clone().to_fs_name::()?) + .create_tokio()? + }; + + #[cfg(windows)] + let listener = { + use widestring::u16cstr; + + let name = socket_name(daemon_name, root_path); + + // Create security descriptor that grants full control to owner only (equivalent to Unix 0o600) + // SDDL: D:P(A;;GA;;;OW) = DACL Protected, Allow Generic All to Owner + let sd = SecurityDescriptor::deserialize(u16cstr!("D:P(A;;GA;;;OW)")) + .map_err(|e| anyhow::anyhow!("Failed to create security descriptor: {}", e))?; + + ListenerOptions::new() + .name(name.to_ns_name::()?) + .security_descriptor(sd) + .create_tokio()? + }; + + // Set socket permissions to 0600 (owner read/write only) - Unix only #[cfg(unix)] { use std::os::unix::fs::PermissionsExt; let perms = fs::Permissions::from_mode(0o600); - fs::set_permissions(&socket_path, perms)?; + fs::set_permissions(&sock_path, perms)?; } Ok(Self { listener, - socket_path, + socket_path: sock_path, }) } pub async fn accept(&mut self) -> Result { - let (stream, _addr) = self.listener.accept().await?; + let stream = self.listener.accept().await?; Ok(SocketConnection::new(stream)) } @@ -99,22 +155,26 @@ impl SocketServer { } } -impl Drop for SocketServer { - fn drop(&mut self) { - // Clean up socket file - let _ = fs::remove_file(&self.socket_path); - } -} - -// Internal: Unix socket client for sending RPC requests +// Internal: Cross-platform socket client for sending RPC requests pub struct SocketClient { connection: SocketConnection, } impl SocketClient { pub async fn connect(daemon_name: &str, root_path: &str) -> Result { - let socket_path = socket_path(daemon_name, root_path); - let stream = UnixStream::connect(socket_path).await?; + // Connect using platform-appropriate naming + #[cfg(unix)] + let stream = { + let sock_path = socket_path(daemon_name, root_path); + Stream::connect(sock_path.to_fs_name::()?).await? + }; + + #[cfg(windows)] + let stream = { + let name = socket_name(daemon_name, root_path); + Stream::connect(name.to_ns_name::()?).await? + }; + Ok(Self { connection: SocketConnection::new(stream), }) @@ -133,13 +193,13 @@ impl SocketClient { } } -// Internal: Framed connection over Unix socket +// Internal: Framed connection over cross-platform socket pub struct SocketConnection { - framed: Framed, + framed: Framed, } impl SocketConnection { - pub fn new(stream: UnixStream) -> Self { + pub fn new(stream: Stream) -> Self { let codec = LengthDelimitedCodec::new(); let framed = Framed::new(stream, codec); Self { framed } @@ -170,12 +230,16 @@ impl SocketConnection { // Internal: Message types for socket communication #[derive(Serialize, Deserialize, Debug)] pub enum SocketMessage { - VersionCheck { build_timestamp: u64 }, + VersionCheck { + build_timestamp: u64, + }, Command { command: String, terminal_info: TerminalInfo, }, OutputChunk(Vec), - CommandComplete { exit_code: i32 }, + CommandComplete { + exit_code: i32, + }, CommandError(String), } diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index fb601a9..84d3f0f 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -481,7 +481,7 @@ async fn test_concurrent_stress_10_plus_clients() -> Result<()> { let result = handle.await; assert!(result.is_ok(), "Client task panicked"); match result.unwrap() { - Ok(exit_code) if exit_code == 0 => success_count += 1, + Ok(0) => success_count += 1, Ok(exit_code) => panic!("Unexpected exit code: {}", exit_code), Err(_) => {} // Expected errors are ok in stress test } @@ -555,7 +555,7 @@ async fn test_connection_limit() -> Result<()> { let result = handle.await; assert!(result.is_ok(), "Client task panicked"); match result.unwrap() { - Ok(exit_code) if exit_code == 0 => success_count += 1, + Ok(0) => success_count += 1, Ok(exit_code) => panic!("Unexpected exit code: {}", exit_code), Err(e) => { // When server is at capacity, connection is dropped which causes various errors @@ -766,3 +766,432 @@ async fn test_with_auto_restart_enabled() -> Result<()> { Ok(()) } + +// ============================================================================ +// HIGH PRIORITY TESTS - Coverage gaps +// ============================================================================ + +// Test handler that writes large output then immediately returns +// This tests the critical fix at server.rs:330-344 for handler output/task completion race +#[derive(Clone)] +struct ImmediateOutputHandler { + output_size: usize, +} + +#[async_trait] +impl CommandHandler for ImmediateOutputHandler { + async fn handle( + &self, + _command: &str, + _terminal_info: TerminalInfo, + mut output: impl AsyncWrite + Send + Unpin, + _cancel: CancellationToken, + ) -> Result { + // Write large output in chunks then immediately return + let chunk = vec![b'X'; 1024]; + for _ in 0..(self.output_size / 1024) { + output.write_all(&chunk).await?; + } + // Handler completes immediately after writing - tests race condition + Ok(0) + } +} + +#[tokio::test] +async fn test_handler_completes_before_output_fully_read() -> Result<()> { + let (daemon_name, root_path) = generate_test_daemon_config(); + let build_timestamp = 1234567910; + // 64KB of output to test buffering + let handler = ImmediateOutputHandler { + output_size: 64 * 1024, + }; + + let (shutdown_handle, join_handle) = + start_test_daemon(&daemon_name, &root_path, build_timestamp, handler).await; + + let daemon_exe = PathBuf::from("./target/debug/examples/cli"); + let mut client = DaemonClient::connect_with_name_and_timestamp( + &daemon_name, + &root_path, + daemon_exe, + build_timestamp, + ) + .await?; + + // Execute command - all output should be received before CommandComplete + let result = client.execute_command("test".to_string()).await; + + assert!(result.is_ok(), "Command should succeed: {:?}", result); + assert_eq!(result.unwrap(), 0); + + stop_test_daemon(shutdown_handle, join_handle).await; + Ok(()) +} + +// Test handler that produces very large output (1MB+) +#[derive(Clone)] +struct LargeOutputHandler { + total_size: usize, +} + +#[async_trait] +impl CommandHandler for LargeOutputHandler { + async fn handle( + &self, + _command: &str, + _terminal_info: TerminalInfo, + mut output: impl AsyncWrite + Send + Unpin, + _cancel: CancellationToken, + ) -> Result { + let chunk = vec![b'A'; 8192]; // 8KB chunks + let chunks = self.total_size / 8192; + for i in 0..chunks { + output.write_all(&chunk).await?; + // Small delay every 100 chunks to simulate real-world streaming + if i % 100 == 0 { + sleep(Duration::from_millis(1)).await; + } + } + Ok(0) + } +} + +#[tokio::test] +async fn test_large_output_streaming() -> Result<()> { + let (daemon_name, root_path) = generate_test_daemon_config(); + let build_timestamp = 1234567911; + // 1MB output + let handler = LargeOutputHandler { + total_size: 1024 * 1024, + }; + + let (shutdown_handle, join_handle) = + start_test_daemon(&daemon_name, &root_path, build_timestamp, handler).await; + + let daemon_exe = PathBuf::from("./target/debug/examples/cli"); + let mut client = DaemonClient::connect_with_name_and_timestamp( + &daemon_name, + &root_path, + daemon_exe, + build_timestamp, + ) + .await?; + + let result = client.execute_command("large".to_string()).await; + + assert!(result.is_ok(), "Large output should stream successfully"); + assert_eq!(result.unwrap(), 0); + + stop_test_daemon(shutdown_handle, join_handle).await; + Ok(()) +} + +// Test handler that panics to verify error capture +#[derive(Clone)] +struct PanicHandler; + +#[async_trait] +impl CommandHandler for PanicHandler { + async fn handle( + &self, + _command: &str, + _terminal_info: TerminalInfo, + mut output: impl AsyncWrite + Send + Unpin, + _cancel: CancellationToken, + ) -> Result { + output.write_all(b"About to panic...\n").await?; + panic!("Test panic in handler"); + } +} + +#[tokio::test] +async fn test_handler_panic_reports_error() -> Result<()> { + let (daemon_name, root_path) = generate_test_daemon_config(); + let build_timestamp = 1234567912; + let handler = PanicHandler; + + let (shutdown_handle, join_handle) = + start_test_daemon(&daemon_name, &root_path, build_timestamp, handler).await; + + let daemon_exe = PathBuf::from("./target/debug/examples/cli"); + let mut client = DaemonClient::connect_with_name_and_timestamp( + &daemon_name, + &root_path, + daemon_exe, + build_timestamp, + ) + .await?; + + let result = client.execute_command("trigger panic".to_string()).await; + + // Should get an error (either from panic capture or connection close) + assert!(result.is_err(), "Panic should result in error"); + let error_msg = result.unwrap_err().to_string().to_lowercase(); + assert!( + error_msg.contains("panic") || error_msg.contains("closed") || error_msg.contains("error"), + "Error should indicate panic or connection issue: {}", + error_msg + ); + + // Server should still be running and able to accept new connections + sleep(Duration::from_millis(100)).await; + + // Try a new connection to verify server is still operational + let client2 = DaemonClient::connect_with_name_and_timestamp( + &daemon_name, + &root_path, + PathBuf::from("./target/debug/examples/cli"), + build_timestamp, + ) + .await; + + // New connection should succeed (server recovered from panic) + assert!( + client2.is_ok(), + "Server should still accept connections after handler panic" + ); + + stop_test_daemon(shutdown_handle, join_handle).await; + Ok(()) +} + +// Test stale socket and PID file cleanup +#[tokio::test] +async fn test_cleanup_stale_socket_and_pid() -> Result<()> { + let (daemon_name, root_path) = generate_test_daemon_config(); + let build_timestamp = 1234567913; + + // Get paths for socket and PID files + let socket_path = daemon_cli::socket_path(&daemon_name, &root_path); + let pid_path = daemon_cli::pid_path(&daemon_name, &root_path); + + // Create stale socket file (just a regular file, not an actual socket) + std::fs::write(&socket_path, "stale socket data")?; + + // Create stale PID file with a non-existent PID + std::fs::write(&pid_path, "999999999")?; + + // Verify files exist + assert!(socket_path.exists(), "Stale socket file should exist"); + assert!(pid_path.exists(), "Stale PID file should exist"); + + // Now start a daemon - it should clean up the stale files + let handler = EchoHandler; + let (shutdown_handle, join_handle) = + start_test_daemon(&daemon_name, &root_path, build_timestamp, handler).await; + + // Connect client - should succeed after cleanup + let daemon_exe = PathBuf::from("./target/debug/examples/cli"); + let mut client = DaemonClient::connect_with_name_and_timestamp( + &daemon_name, + &root_path, + daemon_exe, + build_timestamp, + ) + .await?; + + // Execute a command to verify everything works + let result = client.execute_command("test".to_string()).await; + assert!(result.is_ok(), "Command should succeed after cleanup"); + + stop_test_daemon(shutdown_handle, join_handle).await; + Ok(()) +} + +// Test rapid connect/disconnect stress +#[tokio::test] +async fn test_rapid_connect_disconnect_stress() -> Result<()> { + let (daemon_name, root_path) = generate_test_daemon_config(); + let build_timestamp = 1234567914; + let handler = EchoHandler; + + let (shutdown_handle, join_handle) = + start_test_daemon(&daemon_name, &root_path, build_timestamp, handler).await; + + let daemon_exe = PathBuf::from("./target/debug/examples/cli"); + + // Rapid connect/disconnect 50 times + let mut handles = vec![]; + for i in 0..50 { + let daemon_name_clone = daemon_name.clone(); + let root_path_clone = root_path.clone(); + let daemon_exe_clone = daemon_exe.clone(); + let handle = spawn(async move { + let client = DaemonClient::connect_with_name_and_timestamp( + &daemon_name_clone, + &root_path_clone, + daemon_exe_clone, + build_timestamp, + ) + .await; + + // Just connect and immediately drop + if let Ok(mut c) = client { + // Optionally execute a quick command + if i % 5 == 0 { + let _ = c.execute_command(format!("rapid-{}", i)).await; + } + } + Ok::<(), anyhow::Error>(()) + }); + handles.push(handle); + } + + // Wait for all to complete + let mut success_count = 0; + for handle in handles { + if handle.await.is_ok() { + success_count += 1; + } + } + + // Most connections should succeed + assert!( + success_count >= 40, + "At least 40 of 50 rapid connections should succeed, got {}", + success_count + ); + + // Verify server is still stable + sleep(Duration::from_millis(100)).await; + + let mut final_client = DaemonClient::connect_with_name_and_timestamp( + &daemon_name, + &root_path, + daemon_exe, + build_timestamp, + ) + .await?; + + let result = final_client.execute_command("final".to_string()).await; + assert!(result.is_ok(), "Server should still be stable after stress"); + + stop_test_daemon(shutdown_handle, join_handle).await; + Ok(()) +} + +// Test connection limit with immediate rejection (not queueing) +#[tokio::test] +async fn test_connection_limit_immediate_rejection() -> Result<()> { + let (daemon_name, root_path) = generate_test_daemon_config(); + let build_timestamp = 1234567915; + + // Handler that takes a while to complete + #[derive(Clone)] + struct SlowHandler; + + #[async_trait] + impl CommandHandler for SlowHandler { + async fn handle( + &self, + _command: &str, + _terminal_info: TerminalInfo, + mut output: impl AsyncWrite + Send + Unpin, + _cancel: CancellationToken, + ) -> Result { + output.write_all(b"Starting slow operation...\n").await?; + sleep(Duration::from_millis(500)).await; + output.write_all(b"Done\n").await?; + Ok(0) + } + } + + // Start server with connection limit of 2 + let (shutdown_handle, join_handle) = + start_test_daemon_with_limit(&daemon_name, &root_path, build_timestamp, SlowHandler, 2) + .await; + + let daemon_exe = PathBuf::from("./target/debug/examples/cli"); + + // Start 2 slow commands that will hold connections + let mut slow_handles = vec![]; + for i in 0..2 { + let daemon_name_clone = daemon_name.clone(); + let root_path_clone = root_path.clone(); + let daemon_exe_clone = daemon_exe.clone(); + let handle = spawn(async move { + let mut client = DaemonClient::connect_with_name_and_timestamp( + &daemon_name_clone, + &root_path_clone, + daemon_exe_clone, + build_timestamp, + ) + .await?; + client.execute_command(format!("slow-{}", i)).await + }); + slow_handles.push(handle); + } + + // Give them time to start + sleep(Duration::from_millis(100)).await; + + // Try to connect more clients - they should be rejected immediately + let mut rejected_count = 0; + for _ in 0..3 { + let client_result = DaemonClient::connect_with_name_and_timestamp( + &daemon_name, + &root_path, + daemon_exe.clone(), + build_timestamp, + ) + .await; + + if client_result.is_err() { + rejected_count += 1; + } else { + // If connection succeeded, try to execute - should fail + let mut client = client_result.unwrap(); + if client.execute_command("test".to_string()).await.is_err() { + rejected_count += 1; + } + } + } + + // Wait for slow handlers to complete + for handle in slow_handles { + let _ = handle.await; + } + + // At least some connections should have been rejected + assert!( + rejected_count >= 1, + "At least 1 connection should be rejected when at limit, got {}", + rejected_count + ); + + stop_test_daemon(shutdown_handle, join_handle).await; + Ok(()) +} + +// ============================================================================ +// UNIX-SPECIFIC TESTS +// ============================================================================ + +#[cfg(unix)] +#[tokio::test] +async fn test_unix_socket_permissions() -> Result<()> { + use std::os::unix::fs::PermissionsExt; + + let (daemon_name, root_path) = generate_test_daemon_config(); + let build_timestamp = 1234567916; + let handler = EchoHandler; + + let (shutdown_handle, join_handle) = + start_test_daemon(&daemon_name, &root_path, build_timestamp, handler).await; + + // Check socket permissions + let socket_path = daemon_cli::socket_path(&daemon_name, &root_path); + let metadata = std::fs::metadata(&socket_path)?; + let permissions = metadata.permissions(); + let mode = permissions.mode() & 0o777; // Get just the permission bits + + // Socket should have 0600 permissions (owner read/write only) + assert_eq!( + mode, 0o600, + "Socket should have 0600 permissions, got {:o}", + mode + ); + + stop_test_daemon(shutdown_handle, join_handle).await; + Ok(()) +} diff --git a/tests/version_tests.rs b/tests/version_tests.rs index 1e59faf..36d0244 100644 --- a/tests/version_tests.rs +++ b/tests/version_tests.rs @@ -339,3 +339,147 @@ async fn test_concurrent_version_handshakes() -> Result<()> { Ok(()) } + +// ============================================================================ +// ADDITIONAL TESTS - Coverage gaps +// ============================================================================ + +#[tokio::test] +async fn test_version_mismatch_triggers_client_action() -> Result<()> { + // This test verifies that version mismatch is properly detected and reported + // The actual restart logic is in the client, but we verify the server response + let daemon_name = "test-6007"; + let root_path = "/tmp/test-6007"; + let daemon_timestamp = 1000000000; // Old daemon version + let client_timestamp = 2000000000; // New client version + let handler = SimpleHandler; + + // Start server with old timestamp + let (server, _handle) = DaemonServer::new_with_name_and_timestamp( + daemon_name, + root_path, + daemon_timestamp, + handler, + 100, + ); + let _server_handle = spawn(async move { + server.run().await.ok(); + }); + + sleep(Duration::from_millis(100)).await; + + // Connect with newer client timestamp + let mut client = SocketClient::connect(daemon_name, root_path).await?; + + // Send version check with newer timestamp + client + .send_message(&SocketMessage::VersionCheck { + build_timestamp: client_timestamp, + }) + .await?; + + // Server should respond with its own (older) timestamp + let response = client.receive_message::().await?; + + match response { + Some(SocketMessage::VersionCheck { + build_timestamp: daemon_ts, + }) => { + // Verify server reported its actual timestamp + assert_eq!( + daemon_ts, daemon_timestamp, + "Server should report its own timestamp" + ); + // Verify timestamps don't match (client would trigger restart) + assert_ne!(daemon_ts, client_timestamp, "Timestamps should not match"); + } + _ => panic!("Expected VersionCheck response"), + } + + Ok(()) +} + +#[tokio::test] +async fn test_multiple_commands_same_connection() -> Result<()> { + let daemon_name = "test-6008"; + let root_path = "/tmp/test-6008"; + let build_timestamp = 8888888888; + let handler = SimpleHandler; + + // Start server + let (server, _handle) = DaemonServer::new_with_name_and_timestamp( + daemon_name, + root_path, + build_timestamp, + handler, + 100, + ); + let _server_handle = spawn(async move { + server.run().await.ok(); + }); + + sleep(Duration::from_millis(100)).await; + + let mut client = SocketClient::connect(daemon_name, root_path).await?; + + // First, perform version handshake + client + .send_message(&SocketMessage::VersionCheck { build_timestamp }) + .await?; + + let handshake_response = client.receive_message::().await?; + assert!(matches!( + handshake_response, + Some(SocketMessage::VersionCheck { .. }) + )); + + // Send first command + let terminal_info = TerminalInfo { + width: Some(80), + height: Some(24), + is_tty: true, + color_support: ColorSupport::Basic16, + }; + client + .send_message(&SocketMessage::Command { + command: "first command".to_string(), + terminal_info: terminal_info.clone(), + }) + .await?; + + // Receive first command output + let output1 = client.receive_message::().await?; + assert!( + matches!(output1, Some(SocketMessage::OutputChunk(_))), + "Should receive output for first command" + ); + + // Receive CommandComplete + let complete1 = client.receive_message::().await?; + assert!( + matches!( + complete1, + Some(SocketMessage::CommandComplete { exit_code: 0 }) + ), + "First command should complete successfully" + ); + + // Attempt to send a second command on the same connection + // The server uses one-shot semantics: it closes after handling one command + client + .send_message(&SocketMessage::Command { + command: "second command".to_string(), + terminal_info: terminal_info.clone(), + }) + .await?; + + // The connection should be closed by the server, so we expect EOF (None) + let response = client.receive_message::().await?; + assert!( + response.is_none(), + "Connection should be closed after first command (one-shot semantics), got: {:?}", + response + ); + + Ok(()) +}