diff --git a/cortex/cli.py b/cortex/cli.py index 4a997f73..020197cd 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -5,6 +5,7 @@ import sys import time import uuid +from collections.abc import Callable from datetime import datetime, timezone from pathlib import Path from typing import TYPE_CHECKING, Any @@ -51,6 +52,7 @@ HELP_SKIP_CONFIRM = "Skip confirmation prompt" if TYPE_CHECKING: + from cortex.daemon_client import DaemonClient, DaemonResponse from cortex.shell_env_analyzer import ShellEnvironmentAnalyzer # Suppress noisy log messages in normal operation @@ -2016,6 +2018,723 @@ def progress_callback(message: str, percent: float) -> None: return 0 + # Daemon Commands + # -------------------------- + + def daemon(self, args: argparse.Namespace) -> int: + """Handle daemon commands: install, uninstall, config, reload-config, version, ping, shutdown. + + Available commands: + - install/uninstall: Manage systemd service files (Python-side) + - config: Get daemon configuration via IPC + - reload-config: Reload daemon configuration via IPC + - version: Get daemon version via IPC + - ping: Test daemon connectivity via IPC + - shutdown: Request daemon shutdown via IPC + - run-tests: Run daemon test suite + """ + action = getattr(args, "daemon_action", None) + + if action == "install": + return self._daemon_install(args) + elif action == "uninstall": + return self._daemon_uninstall(args) + elif action == "config": + return self._daemon_config() + elif action == "reload-config": + return self._daemon_reload_config() + elif action == "version": + return self._daemon_version() + elif action == "ping": + return self._daemon_ping() + elif action == "shutdown": + return self._daemon_shutdown() + elif action == "run-tests": + return self._daemon_run_tests(args) + else: + cx_print("Usage: cortex daemon ", "info") + cx_print("", "info") + cx_print("Available commands:", "info") + cx_print(" install Install and enable the daemon service", "info") + cx_print(" uninstall Remove the daemon service", "info") + cx_print(" config Show daemon configuration", "info") + cx_print(" reload-config Reload daemon configuration", "info") + cx_print(" version Show daemon version", "info") + cx_print(" ping Test daemon connectivity", "info") + cx_print(" shutdown Request daemon shutdown", "info") + cx_print(" run-tests Run daemon test suite", "info") + return 0 + + def _update_history_on_failure( + self, history: InstallationHistory, install_id: str | None, error_msg: str + ) -> None: + """ + Helper method to update installation history on failure. + + Args: + history: InstallationHistory instance. + install_id: Installation ID to update, or None if not available. + error_msg: Error message to record. + """ + if install_id: + try: + history.update_installation(install_id, InstallationStatus.FAILED, error_msg) + except Exception: + # Continue even if audit logging fails - don't break the main flow + pass + + def _daemon_ipc_call( + self, + operation_name: str, + ipc_func: "Callable[[DaemonClient], DaemonResponse]", + ) -> tuple[bool, "DaemonResponse | None"]: + """ + Helper method for daemon IPC calls with centralized error handling. + + Args: + operation_name: Human-readable name of the operation for error messages. + ipc_func: A callable that takes a DaemonClient and returns a DaemonResponse. + + Returns: + Tuple of (success: bool, response: DaemonResponse | None) + On error, response is None and an error message is printed. + """ + # Initialize audit logging + history = InstallationHistory() + start_time = datetime.now(timezone.utc) + install_id = None + + try: + # Record operation start + install_id = history.record_installation( + InstallationType.CONFIG, + ["cortexd"], + [f"daemon.{operation_name}"], + start_time, + ) + except Exception: + # Continue even if audit logging fails - don't break the main flow + pass + + try: + from cortex.daemon_client import ( + DaemonClient, + DaemonConnectionError, + DaemonNotInstalledError, + DaemonResponse, + ) + + client = DaemonClient() + response = ipc_func(client) + + # Update history with success/failure + if install_id: + try: + if response and response.success: + history.update_installation(install_id, InstallationStatus.SUCCESS) + else: + error_msg = ( + response.error if response and response.error else "IPC call failed" + ) + history.update_installation( + install_id, InstallationStatus.FAILED, error_msg + ) + except Exception: + # Continue even if audit logging fails - don't break the main flow + pass + + return True, response + + except DaemonNotInstalledError as e: + error_msg = str(e) + cx_print(f"{error_msg}", "error") + self._update_history_on_failure(history, install_id, error_msg) + return False, None + except DaemonConnectionError as e: + error_msg = str(e) + cx_print(f"{error_msg}", "error") + self._update_history_on_failure(history, install_id, error_msg) + return False, None + except ImportError: + error_msg = "Daemon client not available." + cx_print(error_msg, "error") + self._update_history_on_failure(history, install_id, error_msg) + return False, None + except Exception as e: + error_msg = f"Unexpected error during {operation_name}: {e}" + cx_print(error_msg, "error") + self._update_history_on_failure(history, install_id, error_msg) + return False, None + + def _daemon_install(self, args: argparse.Namespace) -> int: + """Install the cortexd daemon using setup_daemon.py.""" + import subprocess + from pathlib import Path + + cx_header("Installing Cortex Daemon") + + # Find setup_daemon.py + daemon_dir = Path(__file__).parent.parent / "daemon" + setup_script = daemon_dir / "scripts" / "setup_daemon.py" + + if not setup_script.exists(): + error_msg = f"Setup script not found at {setup_script}" + cx_print(error_msg, "error") + cx_print("Please ensure the daemon directory is present.", "error") + return 1 + + execute = getattr(args, "execute", False) + + if not execute: + cx_print("This will build and install the cortexd daemon.", "info") + cx_print("", "info") + cx_print("The setup wizard will:", "info") + cx_print(" 1. Check and install build dependencies", "info") + cx_print(" 2. Build the daemon from source", "info") + cx_print(" 3. Install systemd service files", "info") + cx_print(" 4. Enable and start the service", "info") + cx_print("", "info") + cx_print("Run with --execute to proceed:", "info") + cx_print(" cortex daemon install --execute", "dim") + # Don't record dry-runs in audit history + return 0 + + # Initialize audit logging only when execution will actually run + history = InstallationHistory() + start_time = datetime.now(timezone.utc) + install_id = None + + try: + # Record operation start + install_id = history.record_installation( + InstallationType.CONFIG, + ["cortexd"], + ["cortex daemon install"], + start_time, + ) + except Exception as e: + cx_print(f"Warning: Could not initialize audit logging: {e}", "warning") + + # Run setup_daemon.py + cx_print("Running daemon setup wizard...", "info") + try: + result = subprocess.run( + [sys.executable, str(setup_script)], + check=False, + ) + + # Record completion + if install_id: + try: + if result.returncode == 0: + history.update_installation(install_id, InstallationStatus.SUCCESS) + else: + error_msg = f"Setup script returned exit code {result.returncode}" + history.update_installation( + install_id, InstallationStatus.FAILED, error_msg + ) + except Exception: + # Continue even if audit logging fails - don't break the main flow + pass + + return result.returncode + except subprocess.SubprocessError as e: + error_msg = f"Subprocess error during daemon install: {str(e)}" + cx_print(error_msg, "error") + if install_id: + try: + history.update_installation(install_id, InstallationStatus.FAILED, error_msg) + except Exception: + # Continue even if audit logging fails - don't break the main flow + pass + return 1 + except Exception as e: + error_msg = f"Unexpected error during daemon install: {str(e)}" + cx_print(error_msg, "error") + if install_id: + try: + history.update_installation(install_id, InstallationStatus.FAILED, error_msg) + except Exception: + # Continue even if audit logging fails - don't break the main flow + pass + return 1 + + def _daemon_uninstall(self, args: argparse.Namespace) -> int: + """Uninstall the cortexd daemon.""" + import subprocess + from pathlib import Path + + cx_header("Uninstalling Cortex Daemon") + + execute = getattr(args, "execute", False) + + if not execute: + cx_print("This will stop and remove the cortexd daemon.", "warning") + cx_print("", "info") + cx_print("This will:", "info") + cx_print(" 1. Stop the cortexd service", "info") + cx_print(" 2. Disable the service", "info") + cx_print(" 3. Remove systemd unit files", "info") + cx_print(" 4. Remove the daemon binary", "info") + cx_print("", "info") + cx_print("Run with --execute to proceed:", "info") + cx_print(" cortex daemon uninstall --execute", "dim") + # Don't record dry-runs in audit history + return 0 + + # Initialize audit logging only when execution will actually run + history = InstallationHistory() + start_time = datetime.now(timezone.utc) + install_id = None + + try: + # Record operation start + install_id = history.record_installation( + InstallationType.CONFIG, + ["cortexd"], + ["cortex daemon uninstall"], + start_time, + ) + except Exception as e: + cx_print(f"Warning: Could not initialize audit logging: {e}", "warning") + + # Find uninstall script + daemon_dir = Path(__file__).parent.parent / "daemon" + uninstall_script = daemon_dir / "scripts" / "uninstall.sh" + + if uninstall_script.exists(): + cx_print("Running uninstall script...", "info") + try: + # Security: Lock down script permissions before execution + # Set read-only permissions for non-root users to prevent tampering + import stat + + script_stat = uninstall_script.stat() + # Remove write permissions for group and others, keep owner read/execute + uninstall_script.chmod(stat.S_IRUSR | stat.S_IXUSR) + + result = subprocess.run( + ["sudo", "bash", str(uninstall_script)], + check=False, + capture_output=True, + text=True, + ) + + # Record completion + if install_id: + try: + if result.returncode == 0: + history.update_installation(install_id, InstallationStatus.SUCCESS) + else: + error_msg = f"Uninstall script returned exit code {result.returncode}" + if result.stderr: + error_msg += f": {result.stderr[:500]}" + history.update_installation( + install_id, InstallationStatus.FAILED, error_msg + ) + except Exception: + # Continue even if audit logging fails - don't break the main flow + pass + + return result.returncode + except subprocess.SubprocessError as e: + error_msg = f"Subprocess error during daemon uninstall: {str(e)}" + cx_print(error_msg, "error") + if install_id: + try: + history.update_installation( + install_id, InstallationStatus.FAILED, error_msg + ) + except Exception: + # Continue even if audit logging fails - don't break the main flow + pass + return 1 + except Exception as e: + error_msg = f"Unexpected error during daemon uninstall: {str(e)}" + cx_print(error_msg, "error") + if install_id: + try: + history.update_installation( + install_id, InstallationStatus.FAILED, error_msg + ) + except Exception: + # Continue even if audit logging fails - don't break the main flow + pass + return 1 + else: + # Manual uninstall + cx_print("Running manual uninstall...", "info") + commands = [ + ["sudo", "systemctl", "stop", "cortexd"], + ["sudo", "systemctl", "disable", "cortexd"], + ["sudo", "rm", "-f", "/etc/systemd/system/cortexd.service"], + ["sudo", "rm", "-f", "/etc/systemd/system/cortexd.socket"], + ["sudo", "rm", "-f", "/usr/local/bin/cortexd"], + ["sudo", "systemctl", "daemon-reload"], + ] + + try: + any_failed = False + error_messages = [] + + for cmd in commands: + cmd_str = " ".join(cmd) + cx_print(f" Running: {cmd_str}", "dim") + + # Update installation history with command info (append to existing record) + if install_id: + try: + # Append command info to existing installation record + # instead of creating orphan records + history.update_installation( + install_id, + InstallationStatus.IN_PROGRESS, + f"Executing: {cmd_str}", + ) + except Exception: + # Continue even if audit logging fails - don't break the main flow + pass + + result = subprocess.run(cmd, check=False, capture_output=True, text=True) + + # Track failures + if result.returncode != 0: + any_failed = True + error_msg = ( + f"Command '{cmd_str}' failed with return code {result.returncode}" + ) + if result.stderr: + error_msg += f": {result.stderr[:500]}" + error_messages.append(error_msg) + cx_print(f" Failed: {error_msg}", "error") + + # Update history and return based on overall success + if any_failed: + combined_error = "; ".join(error_messages) + cx_print("Daemon uninstall failed.", "error") + if install_id: + try: + history.update_installation( + install_id, InstallationStatus.FAILED, combined_error + ) + except Exception: + # Continue even if audit logging fails - don't break the main flow + pass + return 1 + else: + cx_print("Daemon uninstalled.", "success") + # Record success + if install_id: + try: + history.update_installation(install_id, InstallationStatus.SUCCESS) + except Exception: + # Continue even if audit logging fails - don't break the main flow + pass + return 0 + except subprocess.SubprocessError as e: + error_msg = f"Subprocess error during manual uninstall: {str(e)}" + cx_print(error_msg, "error") + if install_id: + try: + history.update_installation( + install_id, InstallationStatus.FAILED, error_msg + ) + except Exception: + # Continue even if audit logging fails - don't break the main flow + pass + return 1 + except Exception as e: + error_msg = f"Unexpected error during manual uninstall: {str(e)}" + cx_print(error_msg, "error") + if install_id: + try: + history.update_installation( + install_id, InstallationStatus.FAILED, error_msg + ) + except Exception: + # Continue even if audit logging fails - don't break the main flow + pass + return 1 + + def _daemon_config(self) -> int: + """Get daemon configuration via IPC.""" + from rich.table import Table + + cx_header("Daemon Configuration") + + success, response = self._daemon_ipc_call("config.get", lambda c: c.config_get()) + if not success: + return 1 + + if response.success and response.result: + table = Table(title="Current Configuration", show_header=True) + table.add_column("Setting", style="cyan") + table.add_column("Value", style="green") + + for key, value in response.result.items(): + table.add_row(key, str(value)) + + console.print(table) + return 0 + else: + cx_print(f"Failed to get config: {response.error}", "error") + return 1 + + def _daemon_reload_config(self) -> int: + """Reload daemon configuration via IPC.""" + cx_header("Reloading Daemon Configuration") + + success, response = self._daemon_ipc_call("config.reload", lambda c: c.config_reload()) + if not success: + return 1 + + if response.success: + cx_print("Configuration reloaded successfully!", "success") + return 0 + else: + cx_print(f"Failed to reload config: {response.error}", "error") + return 1 + + def _daemon_version(self) -> int: + """Get daemon version via IPC.""" + cx_header("Daemon Version") + + success, response = self._daemon_ipc_call("version", lambda c: c.version()) + if not success: + return 1 + + if response.success and response.result: + name = response.result.get("name", "cortexd") + version = response.result.get("version", "unknown") + cx_print(f"{name} version {version}", "success") + return 0 + else: + cx_print(f"Failed to get version: {response.error}", "error") + return 1 + + def _daemon_ping(self) -> int: + """Test daemon connectivity via IPC.""" + import time + + cx_header("Daemon Ping") + + start = time.time() + success, response = self._daemon_ipc_call("ping", lambda c: c.ping()) + elapsed = (time.time() - start) * 1000 # ms + + if not success: + return 1 + + if response.success: + cx_print(f"Pong! Response time: {elapsed:.1f}ms", "success") + return 0 + else: + cx_print(f"Ping failed: {response.error}", "error") + return 1 + + def _daemon_shutdown(self) -> int: + """Request daemon shutdown via IPC.""" + cx_header("Requesting Daemon Shutdown") + + success, response = self._daemon_ipc_call("shutdown", lambda c: c.shutdown()) + if not success: + return 1 + + if response.success: + cx_print("Daemon shutdown requested successfully!", "success") + return 0 + cx_print(f"Failed to request shutdown: {response.error}", "error") + return 1 + + def _daemon_run_tests(self, args: argparse.Namespace) -> int: + """Run the daemon test suite.""" + import subprocess + + cx_header("Daemon Tests") + + # Initialize audit logging + history = InstallationHistory() + start_time = datetime.now(timezone.utc) + install_id = None + + try: + # Record operation start + install_id = history.record_installation( + InstallationType.CONFIG, + ["cortexd"], + ["daemon.run-tests"], + start_time, + ) + except Exception: + # Continue even if audit logging fails + pass + + # Find daemon directory + daemon_dir = Path(__file__).parent.parent / "daemon" + build_dir = daemon_dir / "build" + tests_dir = build_dir / "tests" # Test binaries are in build/tests/ + + # Define test binaries + unit_tests = [ + "test_config", + "test_protocol", + "test_rate_limiter", + "test_logger", + "test_common", + ] + integration_tests = ["test_ipc_server", "test_handlers", "test_daemon"] + all_tests = unit_tests + integration_tests + + # Check if tests are built + def check_tests_built() -> tuple[bool, list[str]]: + """Check which test binaries exist.""" + existing = [] + for test in all_tests: + if (tests_dir / test).exists(): + existing.append(test) + return len(existing) > 0, existing + + tests_built, existing_tests = check_tests_built() + + if not tests_built: + error_msg = "Tests are not built." + cx_print(error_msg, "warning") + cx_print("", "info") + cx_print("To build tests, run the setup wizard with test building enabled:", "info") + cx_print("", "info") + cx_print(" [bold]python daemon/scripts/setup_daemon.py[/bold]", "info") + cx_print("", "info") + cx_print("When prompted, answer 'yes' to build the test suite.", "info") + cx_print("", "info") + cx_print("Or build manually:", "info") + cx_print(" cd daemon && ./scripts/build.sh Release --with-tests", "dim") + if install_id: + try: + history.update_installation(install_id, InstallationStatus.FAILED, error_msg) + except Exception: + # Continue even if audit logging fails - don't break the main flow + pass + return 1 + + # Determine which tests to run + test_filter = getattr(args, "test", None) + run_unit = getattr(args, "unit", False) + run_integration = getattr(args, "integration", False) + verbose = getattr(args, "verbose", False) + + tests_to_run = [] + + if test_filter: + # Run a specific test + # Allow partial matching (e.g., "config" matches "test_config") + test_name = test_filter if test_filter.startswith("test_") else f"test_{test_filter}" + if test_name in existing_tests: + tests_to_run = [test_name] + else: + error_msg = f"Test '{test_filter}' not found or not built." + cx_print(error_msg, "error") + cx_print("", "info") + cx_print("Available tests:", "info") + for t in existing_tests: + cx_print(f" • {t}", "info") + if install_id: + try: + history.update_installation( + install_id, InstallationStatus.FAILED, error_msg + ) + except Exception: + # Continue even if audit logging fails - don't break the main flow + pass + return 1 + elif run_unit and not run_integration: + tests_to_run = [t for t in unit_tests if t in existing_tests] + if not tests_to_run: + error_msg = "No unit tests built." + cx_print(error_msg, "warning") + if install_id: + try: + history.update_installation( + install_id, InstallationStatus.FAILED, error_msg + ) + except Exception: + # Continue even if audit logging fails - don't break the main flow + pass + return 1 + elif run_integration and not run_unit: + tests_to_run = [t for t in integration_tests if t in existing_tests] + if not tests_to_run: + error_msg = "No integration tests built." + cx_print(error_msg, "warning") + if install_id: + try: + history.update_installation( + install_id, InstallationStatus.FAILED, error_msg + ) + except Exception: + # Continue even if audit logging fails - don't break the main flow + pass + return 1 + else: + # Run all available tests + tests_to_run = existing_tests + + # Show what we're running + cx_print(f"Running {len(tests_to_run)} test(s)...", "info") + cx_print("", "info") + + # Use ctest for running tests + ctest_args = ["ctest", "--output-on-failure"] + + if verbose: + ctest_args.append("-V") + + # Filter specific tests if not running all + if test_filter or run_unit or run_integration: + # ctest uses -R for regex filtering + test_regex = "|".join(tests_to_run) + ctest_args.extend(["-R", test_regex]) + + try: + result = subprocess.run( + ctest_args, + cwd=str(build_dir), + check=False, + ) + + if result.returncode == 0: + cx_print("", "info") + cx_print("All tests passed!", "success") + if install_id: + try: + history.update_installation(install_id, InstallationStatus.SUCCESS) + except Exception: + # Continue even if audit logging fails - don't break the main flow + pass + return 0 + else: + error_msg = f"Test execution failed with return code {result.returncode}" + cx_print("", "info") + cx_print("Some tests failed.", "error") + if install_id: + try: + history.update_installation( + install_id, InstallationStatus.FAILED, error_msg + ) + except Exception: + # Continue even if audit logging fails - don't break the main flow + pass + return 1 + except subprocess.SubprocessError as e: + error_msg = f"Subprocess error during test execution: {str(e)}" + cx_print(error_msg, "error") + self._update_history_on_failure(history, install_id, error_msg) + return 1 + except Exception as e: + error_msg = f"Unexpected error during test execution: {str(e)}" + cx_print(error_msg, "error") + self._update_history_on_failure(history, install_id, error_msg) + return 1 + def benchmark(self, verbose: bool = False): """Run AI performance benchmark and display scores""" from cortex.benchmark import run_benchmark @@ -3246,6 +3965,7 @@ def show_rich_help(): table.add_row("docker permissions", "Fix Docker bind-mount permissions") table.add_row("sandbox ", "Test packages in Docker sandbox") table.add_row("update", "Check for and install updates") + table.add_row("daemon ", "Manage the cortexd background daemon") table.add_row("doctor", "System health check") table.add_row("troubleshoot", "Interactive system troubleshooter") @@ -3580,6 +4300,62 @@ def main(): # config show - show all configuration config_subs.add_parser("show", help="Show all current configuration") + # --- Daemon Commands --- + daemon_parser = subparsers.add_parser("daemon", help="Manage the cortexd background daemon") + daemon_subs = daemon_parser.add_subparsers(dest="daemon_action", help="Daemon actions") + + # daemon install [--execute] + daemon_install_parser = daemon_subs.add_parser( + "install", help="Install and enable the daemon service" + ) + daemon_install_parser.add_argument( + "--execute", action="store_true", help="Actually run the installation" + ) + + # daemon uninstall [--execute] + daemon_uninstall_parser = daemon_subs.add_parser( + "uninstall", help="Stop and remove the daemon service" + ) + daemon_uninstall_parser.add_argument( + "--execute", action="store_true", help="Actually run the uninstallation" + ) + + # daemon config - uses config.get IPC handler + daemon_subs.add_parser("config", help="Show current daemon configuration") + + # daemon reload-config - uses config.reload IPC handler + daemon_subs.add_parser("reload-config", help="Reload daemon configuration from disk") + + # daemon version - uses version IPC handler + daemon_subs.add_parser("version", help="Show daemon version") + + # daemon ping - uses ping IPC handler + daemon_subs.add_parser("ping", help="Test daemon connectivity") + + # daemon shutdown - uses shutdown IPC handler + daemon_subs.add_parser("shutdown", help="Request daemon shutdown") + + # daemon run-tests - run daemon test suite + daemon_run_tests_parser = daemon_subs.add_parser( + "run-tests", + help="Run daemon test suite (runs all tests by default when no filters are provided)", + ) + daemon_run_tests_parser.add_argument("--unit", action="store_true", help="Run only unit tests") + daemon_run_tests_parser.add_argument( + "--integration", action="store_true", help="Run only integration tests" + ) + daemon_run_tests_parser.add_argument( + "--test", + "-t", + type=str, + metavar="NAME", + help="Run a specific test (e.g., test_config, test_daemon)", + ) + daemon_run_tests_parser.add_argument( + "--verbose", "-v", action="store_true", help="Show verbose test output" + ) + # -------------------------- + # --- Sandbox Commands (Docker-based package testing) --- sandbox_parser = subparsers.add_parser( "sandbox", help="Test packages in isolated Docker sandbox" @@ -4087,6 +4863,8 @@ def main(): return 0 if activate_license(args.license_key) else 1 elif args.command == "update": return cli.update(args) + elif args.command == "daemon": + return cli.daemon(args) elif args.command == "wifi": from cortex.wifi_driver import run_wifi_driver diff --git a/cortex/installation_history.py b/cortex/installation_history.py index 38716f85..fdcf223d 100644 --- a/cortex/installation_history.py +++ b/cortex/installation_history.py @@ -15,6 +15,7 @@ import subprocess import sys from dataclasses import asdict, dataclass +from datetime import timezone from enum import Enum from pathlib import Path @@ -150,7 +151,12 @@ def _get_package_info(self, package_name: str) -> PackageSnapshot | None: """Get current state of a package""" # Check if package is installed success, stdout, _ = self._run_command( - ["dpkg-query", "-W", "-f=${Status}|${Version}", package_name] + [ + "dpkg-query", + "-W", + "-f=${Status}|${Version}", + package_name, + ] ) if not success: @@ -288,7 +294,7 @@ def record_installation( cursor.execute( """ INSERT INTO installations VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, + """, ( install_id, timestamp, @@ -304,7 +310,7 @@ def record_installation( ), ) - conn.commit() + conn.commit() logger.info(f"Installation {install_id} recorded") return install_id @@ -331,32 +337,35 @@ def update_installation( return packages = json.loads(result[0]) - start_time = datetime.datetime.fromisoformat(result[1]) - duration = (datetime.datetime.now() - start_time).total_seconds() - - # Create after snapshot - after_snapshot = self._create_snapshot(packages) - - # Update record - cursor.execute( - """ - UPDATE installations - SET status = ?, - after_snapshot = ?, - error_message = ?, - duration_seconds = ? - WHERE id = ? - """, - ( - status.value, - json.dumps([asdict(s) for s in after_snapshot]), - error_message, - duration, - install_id, - ), - ) + start_time = datetime.datetime.fromisoformat(result[1]) + # Normalize start_time to UTC if it's naive + if start_time.tzinfo is None: + start_time = start_time.replace(tzinfo=timezone.utc) + duration = (datetime.datetime.now(timezone.utc) - start_time).total_seconds() + + # Create after snapshot + after_snapshot = self._create_snapshot(packages) + + # Update record + cursor.execute( + """ + UPDATE installations + SET status = ?, + after_snapshot = ?, + error_message = ?, + duration_seconds = ? + WHERE id = ? + """, + ( + status.value, + json.dumps([asdict(s) for s in after_snapshot]), + error_message, + duration, + install_id, + ), + ) - conn.commit() + conn.commit() logger.info(f"Installation {install_id} updated: {status.value}") except Exception as e: @@ -590,7 +599,15 @@ def export_history(self, filepath: str, format: str = "json"): with open(filepath, "w", newline="") as f: writer = csv.writer(f) writer.writerow( - ["ID", "Timestamp", "Operation", "Packages", "Status", "Duration", "Error"] + [ + "ID", + "Timestamp", + "Operation", + "Packages", + "Status", + "Duration", + "Error", + ] ) for r in history: diff --git a/tests/test_daemon_commands.py b/tests/test_daemon_commands.py new file mode 100644 index 00000000..621b4eef --- /dev/null +++ b/tests/test_daemon_commands.py @@ -0,0 +1,392 @@ +""" +Tests for daemon command functionality in CortexCLI. + +Tests cover: +- install, uninstall, config, reload-config, version, ping, shutdown, run-tests +""" + +import os +import sys +import tempfile +import unittest +from pathlib import Path +from unittest.mock import ANY, MagicMock, Mock, patch + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from cortex.cli import CortexCLI + + +class TestDaemonCommands(unittest.TestCase): + def setUp(self) -> None: + self.cli = CortexCLI() + self._temp_dir = tempfile.TemporaryDirectory() + self._temp_home = Path(self._temp_dir.name) + + def tearDown(self) -> None: + self._temp_dir.cleanup() + + def _create_mock_uninstall_script(self, exists=True): + """Helper to create a mock uninstall script Path object.""" + mock_uninstall_script = Mock() + mock_uninstall_script.exists.return_value = exists + mock_uninstall_script.chmod = Mock() + mock_uninstall_script.stat.return_value = Mock() + mock_uninstall_script.__str__ = lambda x: "/path/to/uninstall.sh" + mock_uninstall_script.__fspath__ = lambda x: "/path/to/uninstall.sh" + return mock_uninstall_script + + def _setup_path_side_effect(self, mock_path_class, mock_uninstall_script): + """Helper to set up Path class side effect.""" + + def path_side_effect(*args, **kwargs): + path_str = str(args[0]) if args else "" + if "uninstall.sh" in path_str: + return mock_uninstall_script + # Return a regular mock for other paths + mock_path = Mock() + mock_path.exists.return_value = False + return mock_path + + mock_path_class.side_effect = path_side_effect + + def _setup_subprocess_side_effect(self, mock_subprocess, handle_script=False): + """Helper to set up subprocess side effect.""" + + def subprocess_side_effect(*args, **kwargs): + mock_result = Mock() + if "dpkg-query" in str(args[0]): + # InstallationHistory call + mock_result.returncode = 0 + mock_result.stdout = "install ok installed|1.0.0" + mock_result.stderr = "" + elif handle_script and "bash" in str(args[0]) and "uninstall" in str(args[0]): + # Uninstall script execution + mock_result.returncode = 0 + mock_result.stderr = "" + else: + # Manual uninstall commands or other calls + mock_result.returncode = 0 + mock_result.stderr = "" + return mock_result + + mock_subprocess.side_effect = subprocess_side_effect + + def test_daemon_no_action(self): + """Test daemon command with no action shows help.""" + args = Mock() + args.daemon_action = None + result = self.cli.daemon(args) + self.assertEqual(result, 0) + + @patch("cortex.cli.cx_header") + @patch("cortex.cli.cx_print") + @patch("cortex.cli.Path.exists") + @patch("subprocess.run") + def test_daemon_install_dry_run(self, mock_subprocess, mock_exists, mock_print, mock_header): + """Test daemon install without --execute flag (dry run).""" + args = Mock() + args.execute = False + args.daemon_action = "install" + mock_exists.return_value = True + + # Mock subprocess for InstallationHistory calls + mock_result = Mock() + mock_result.returncode = 0 + mock_result.stdout = "install ok installed|1.0.0" + mock_result.stderr = "" + mock_subprocess.return_value = mock_result + + result = self.cli._daemon_install(args) + self.assertEqual(result, 0) + # Should not call subprocess for setup script (only for package checks) + # Allow for InstallationHistory calls but verify no setup script execution + setup_calls = [ + call for call in mock_subprocess.call_args_list if "setup_daemon.py" in str(call) + ] + self.assertEqual(len(setup_calls), 0) + + @patch("cortex.cli.cx_header") + @patch("cortex.cli.cx_print") + @patch("cortex.cli.Path.exists") + def test_daemon_install_script_not_found(self, mock_exists, mock_print, mock_header): + """Test daemon install when setup script is missing.""" + args = Mock() + args.execute = True + args.daemon_action = "install" + mock_exists.return_value = False + + result = self.cli._daemon_install(args) + self.assertEqual(result, 1) + + @patch("cortex.cli.cx_header") + @patch("cortex.cli.cx_print") + @patch("cortex.cli.Path.exists") + @patch("subprocess.run") + def test_daemon_uninstall_dry_run(self, mock_subprocess, mock_exists, mock_print, mock_header): + """Test daemon uninstall without --execute flag (dry run).""" + args = Mock() + args.execute = False + args.daemon_action = "uninstall" + mock_exists.return_value = True + + # Mock subprocess for InstallationHistory calls + mock_result = Mock() + mock_result.returncode = 0 + mock_result.stdout = "install ok installed|1.0.0" + mock_result.stderr = "" + mock_subprocess.return_value = mock_result + + result = self.cli._daemon_uninstall(args) + self.assertEqual(result, 0) + # Should not call subprocess for uninstall (only for package checks) + # Allow for InstallationHistory calls but verify no uninstall execution + uninstall_calls = [ + call + for call in mock_subprocess.call_args_list + if "uninstall" in str(call) or "systemctl" in str(call) + ] + self.assertEqual(len(uninstall_calls), 0) + + @patch("cortex.cli.cx_header") + @patch("cortex.cli.cx_print") + @patch("cortex.cli.Path") + @patch("subprocess.run") + def test_daemon_uninstall_with_script( + self, mock_subprocess, mock_path_class, mock_print, mock_header + ): + """Test daemon uninstall with uninstall script.""" + args = Mock() + args.execute = True + args.daemon_action = "uninstall" + + # Create a mock Path object + mock_uninstall_script = self._create_mock_uninstall_script(exists=True) + self._setup_path_side_effect(mock_path_class, mock_uninstall_script) + self._setup_subprocess_side_effect(mock_subprocess, handle_script=True) + + result = self.cli._daemon_uninstall(args) + self.assertEqual(result, 0) + # Should have called subprocess for both package check and script execution + self.assertGreater(mock_subprocess.call_count, 0) + + @patch("cortex.cli.cx_header") + @patch("cortex.cli.cx_print") + @patch("cortex.cli.Path") + @patch("subprocess.run") + def test_daemon_uninstall_manual( + self, mock_subprocess, mock_path_class, mock_print, mock_header + ): + """Test daemon uninstall with manual commands (no script).""" + args = Mock() + args.execute = True + args.daemon_action = "uninstall" + + # Create a mock Path object for uninstall script that doesn't exist + mock_uninstall_script = self._create_mock_uninstall_script(exists=False) + self._setup_path_side_effect(mock_path_class, mock_uninstall_script) + self._setup_subprocess_side_effect(mock_subprocess, handle_script=False) + + result = self.cli._daemon_uninstall(args) + self.assertEqual(result, 0) + # Should have called subprocess multiple times for manual commands + self.assertGreater(mock_subprocess.call_count, 1) + + @patch("cortex.cli.CortexCLI._daemon_ipc_call") + def test_daemon_config_success(self, mock_ipc_call): + """Test daemon config command with successful response.""" + from cortex.daemon_client import DaemonResponse + + mock_response = DaemonResponse( + success=True, + result={"socket_path": "/run/cortex/cortex.sock", "log_level": "info"}, + ) + mock_ipc_call.return_value = (True, mock_response) + + result = self.cli._daemon_config() + self.assertEqual(result, 0) + mock_ipc_call.assert_called_once() + + @patch("cortex.cli.CortexCLI._daemon_ipc_call") + def test_daemon_config_failure(self, mock_ipc_call): + """Test daemon config command with failed response.""" + from cortex.daemon_client import DaemonResponse + + mock_response = DaemonResponse(success=False, error="Connection failed") + mock_ipc_call.return_value = (True, mock_response) + + result = self.cli._daemon_config() + self.assertEqual(result, 1) + + @patch("cortex.cli.CortexCLI._daemon_ipc_call") + def test_daemon_config_connection_error(self, mock_ipc_call): + """Test daemon config command when connection fails.""" + mock_ipc_call.return_value = (False, None) + + result = self.cli._daemon_config() + self.assertEqual(result, 1) + + @patch("cortex.cli.CortexCLI._daemon_ipc_call") + def test_daemon_reload_config_success(self, mock_ipc_call): + """Test daemon reload-config command with successful response.""" + from cortex.daemon_client import DaemonResponse + + mock_response = DaemonResponse(success=True, result={"reloaded": True}) + mock_ipc_call.return_value = (True, mock_response) + + result = self.cli._daemon_reload_config() + self.assertEqual(result, 0) + + @patch("cortex.cli.CortexCLI._daemon_ipc_call") + def test_daemon_reload_config_failure(self, mock_ipc_call): + """Test daemon reload-config command with failed response.""" + from cortex.daemon_client import DaemonResponse + + mock_response = DaemonResponse(success=False, error="Config reload failed") + mock_ipc_call.return_value = (True, mock_response) + + result = self.cli._daemon_reload_config() + self.assertEqual(result, 1) + + @patch("cortex.cli.CortexCLI._daemon_ipc_call") + def test_daemon_version_success(self, mock_ipc_call): + """Test daemon version command with successful response.""" + from cortex.daemon_client import DaemonResponse + + mock_response = DaemonResponse(success=True, result={"version": "1.0.0", "name": "cortexd"}) + mock_ipc_call.return_value = (True, mock_response) + + result = self.cli._daemon_version() + self.assertEqual(result, 0) + + @patch("cortex.cli.CortexCLI._daemon_ipc_call") + def test_daemon_ping_success(self, mock_ipc_call): + """Test daemon ping command with successful response.""" + from cortex.daemon_client import DaemonResponse + + mock_response = DaemonResponse(success=True, result={"pong": True}) + mock_ipc_call.return_value = (True, mock_response) + + result = self.cli._daemon_ping() + self.assertEqual(result, 0) + + @patch("cortex.cli.CortexCLI._daemon_ipc_call") + def test_daemon_shutdown_success(self, mock_ipc_call): + """Test daemon shutdown command with successful response.""" + from cortex.daemon_client import DaemonResponse + + mock_response = DaemonResponse(success=True, result={"shutdown": "initiated"}) + mock_ipc_call.return_value = (True, mock_response) + + result = self.cli._daemon_shutdown() + self.assertEqual(result, 0) + + @patch("cortex.cli.cx_header") + @patch("cortex.cli.cx_print") + @patch("cortex.cli.Path.exists") + def test_daemon_run_tests_not_built(self, mock_exists, mock_print, mock_header): + """Test daemon run-tests when tests are not built.""" + args = Mock() + args.test = None + args.unit = False + args.integration = False + args.verbose = False + + # Mock tests directory doesn't exist + mock_exists.return_value = False + + result = self.cli._daemon_run_tests(args) + self.assertEqual(result, 1) + + @patch("cortex.cli.cx_header") + @patch("cortex.cli.cx_print") + @patch("cortex.cli.Path.exists") + @patch("subprocess.run") + def test_daemon_run_tests_success(self, mock_subprocess, mock_exists, mock_print, mock_header): + """Test daemon run-tests with successful execution.""" + args = Mock() + args.test = None + args.unit = False + args.integration = False + args.verbose = False + + # Mock Path.exists to return True for test files + # _daemon_run_tests checks (tests_dir / test).exists() for each test + # We need to return True when checking for test file existence + mock_exists.return_value = True + + # Mock subprocess result + mock_result = Mock() + mock_result.returncode = 0 + mock_subprocess.return_value = mock_result + + result = self.cli._daemon_run_tests(args) + self.assertEqual(result, 0) + mock_subprocess.assert_called() + + @patch("cortex.cli.cx_print") + @patch("cortex.cli.InstallationHistory") + @patch("cortex.daemon_client.DaemonClient") + def test_daemon_ipc_call_success( + self, mock_daemon_client_class, mock_history_class, mock_print + ): + """Test _daemon_ipc_call helper with successful IPC call.""" + from cortex.daemon_client import DaemonResponse + + # Setup mocks + mock_history = Mock() + mock_history_class.return_value = mock_history + mock_history.record_installation.return_value = "test-install-id" + + mock_client = Mock() + mock_daemon_client_class.return_value = mock_client + + mock_response = DaemonResponse(success=True, result={"test": "data"}) + + # Create a mock IPC function that uses the client and returns the response + def mock_ipc_func(client): + # Verify the client is passed correctly + self.assertIs(client, mock_client) + return mock_response + + # Test _daemon_ipc_call directly + success, response = self.cli._daemon_ipc_call("test_operation", mock_ipc_func) + + # Verify results + self.assertTrue(success) + self.assertIsNotNone(response) + self.assertEqual(response, mock_response) + mock_daemon_client_class.assert_called_once() + mock_history.record_installation.assert_called_once() + mock_history.update_installation.assert_called_once_with("test-install-id", ANY) + + @patch("cortex.cli.InstallationHistory") + def test_update_history_on_failure(self, mock_history_class): + """Test _update_history_on_failure helper method.""" + mock_history = Mock() + mock_history_class.return_value = mock_history + + history = mock_history + install_id = "123" + error_msg = "Test error" + + self.cli._update_history_on_failure(history, install_id, error_msg) + mock_history.update_installation.assert_called_once_with(install_id, ANY, error_msg) + + @patch("cortex.cli.InstallationHistory") + def test_update_history_on_failure_no_id(self, mock_history_class): + """Test _update_history_on_failure with no install_id.""" + mock_history = Mock() + mock_history_class.return_value = mock_history + + history = mock_history + install_id = None + error_msg = "Test error" + + self.cli._update_history_on_failure(history, install_id, error_msg) + # Should not call update_installation when install_id is None + mock_history.update_installation.assert_not_called() + + +if __name__ == "__main__": + unittest.main()