diff --git a/README.md b/README.md index 24db1c178..913d3cecd 100644 --- a/README.md +++ b/README.md @@ -71,6 +71,7 @@ cortex install "tools for video compression" | **Docker Permission Fixer** | Fix root-owned bind mount issues automatically | | **Audit Trail** | Complete history in `~/.cortex/history.db` | | **Hardware-Aware** | Detects GPU, CPU, memory for optimized packages | +| **System Cloning** | Capture, version, and replicate system states via templates | | **Multi-LLM Support** | Works with Claude, GPT-4, or local Ollama models | --- @@ -165,6 +166,9 @@ cortex history # Rollback an installation cortex rollback + +# Create a system backup template +cortex template create base-state --description "Baseline configuration" ``` ### Role Management @@ -190,6 +194,7 @@ cortex role set | `cortex sandbox ` | Test packages in Docker sandbox | | `cortex history` | View all past installations | | `cortex rollback ` | Undo a specific installation | +| `cortex template ` | System cloning and templating (create, deploy, list, show, delete, export, import) | | `cortex --version` | Show version information | | `cortex --help` | Display help message | @@ -414,6 +419,7 @@ pip install -e . - [x] Dry-run preview mode - [x] Docker bind-mount permission fixer - [x] Automatic Role Discovery (AI-driven system context sensing) +- [x] System Cloning & Templating (Capture, Version, Clone) ### In Progress - [ ] Conflict resolution UI diff --git a/cortex/cli.py b/cortex/cli.py index 020197cdb..6afc97791 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -8,7 +8,7 @@ from collections.abc import Callable from datetime import datetime, timezone from pathlib import Path -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Optional from rich.markdown import Markdown @@ -24,13 +24,7 @@ format_package_list, ) from cortex.env_manager import EnvironmentManager, get_env_manager -from cortex.i18n import ( - SUPPORTED_LANGUAGES, - LanguageConfig, - get_language, - set_language, - t, -) +from cortex.i18n import SUPPORTED_LANGUAGES, LanguageConfig, get_language, set_language, t from cortex.installation_history import InstallationHistory, InstallationStatus, InstallationType from cortex.llm.interpreter import CommandInterpreter from cortex.network_config import NetworkConfig @@ -2747,6 +2741,196 @@ def systemd(self, service: str, action: str = "status", verbose: bool = False): return run_systemd_helper(service, action, verbose) + def template(self, args: argparse.Namespace) -> int: + """Handle template commands""" + from cortex.template_manager import TemplateManager + + manager = TemplateManager() + cmd = args.template_command + + if not cmd: + console.print( + "[yellow]Usage: cortex template [create|deploy|list|show|delete|export|import][/yellow]" + ) + return 1 + + if cmd == "create": + return self._template_create(manager, args) + elif cmd == "list": + return self._template_list(manager) + elif cmd == "show": + return self._template_show(manager, args) + elif cmd == "deploy": + return self._template_deploy(manager, args) + elif cmd == "delete": + return self._template_delete(manager, args) + elif cmd == "export": + return self._template_export(manager, args) + elif cmd == "import": + return self._template_import(manager, args) + + return 0 + + def _parse_template_spec(self, name_spec: str) -> tuple[str, str | None]: + """Parse template name and version from spec (name:v1 or name-v1).""" + if ":" in name_spec: + parts = name_spec.split(":", 1) + return parts[0], parts[1] + elif "-v" in name_spec: + idx = name_spec.rfind("-v") + return name_spec[:idx], name_spec[idx + 1 :] + return name_spec, None + + def _template_create(self, manager: Any, args: argparse.Namespace) -> int: + """Handle 'template create' command.""" + console.print("📸 [bold]Capturing system state...[/bold]") + version = manager.create_template(args.name, args.description, package_sources=args.sources) + + template_data = manager.get_template(args.name, version) + if not template_data: + return 1 + + pkgs = template_data["config"].get("packages", []) + apps = [p for p in pkgs if p["source"] != "service"] + services = [p for p in pkgs if p["source"] == "service"] + + console.print(f" - {len(apps)} packages") + console.print( + f" - {len(template_data['config'].get('preferences', {})) + len(template_data['config'].get('environment_variables', {}))} configurations" + ) + console.print(f" - {len(services)} services") + console.print(f"[green]✓[/green] Template saved: [bold]{args.name}-{version}[/bold]\n") + return 0 + + def _template_list(self, manager: Any) -> int: + """Handle 'template list' command.""" + templates = manager.list_templates() + if not templates: + console.print("No templates found.") + return 0 + + console.print("\n[bold cyan]Available System Templates:[/bold cyan]\n") + for template in templates: + console.print( + f"• [bold]{template['name']}[/bold] (latest: {template['latest_version']})" + ) + for v in template["versions"]: + console.print(f" - {v['version']} ({v['created_at'][:10]}): {v['description']}") + console.print() + return 0 + + def _template_show(self, manager: Any, args: argparse.Namespace) -> int: + """Handle 'template show' command.""" + name, version = self._parse_template_spec(args.name) + template_data = manager.get_template(name, version) + if not template_data: + console.print(f"[red]Error:[/red] Template '{args.name}' not found.") + return 1 + + console.print(f"\n[bold]{template_data['name']}:{template_data['version']}[/bold]") + console.print(f"Description: {template_data['description']}") + console.print(f"Created: {template_data['created_at']}") + console.print(f"OS: {template_data['config'].get('os', 'unknown')}") + + pkgs = template_data["config"].get("packages", []) + apps = [p for p in pkgs if p["source"] != "service"] + services = [p for p in pkgs if p["source"] == "service"] + + console.print("\n[bold]Summary:[/bold]") + console.print(f"- {len(apps)} Packages") + console.print( + f"- {len(template_data['config'].get('preferences', {})) + len(template_data['config'].get('environment_variables', {}))} Configurations" + ) + console.print(f"- {len(services)} Services") + return 0 + + def _template_deploy(self, manager: Any, args: argparse.Namespace) -> int: + """Handle 'template deploy' command.""" + from cortex.config_manager import ConfigManager + + name, version = self._parse_template_spec(args.name) + template_data = manager.get_template(name, version) + if not template_data: + console.print(f"[red]Error:[/red] Template '{args.name}' not found.") + return 1 + + config_manager = ConfigManager() + + # Safety default: deploy is dry-run unless --execute is passed + is_dry_run = args.dry_run or not args.execute + + if is_dry_run: + console.print( + f"\n[bold cyan]Previewing deployment of {template_data['name']}:{template_data['version']}[/bold cyan]\n" + ) + diff = config_manager.diff_configuration(template_data["config"]) + + if diff["packages_to_install"]: + console.print( + f"📦 [bold]To Install:[/bold] {len(diff['packages_to_install'])} packages" + ) + if diff["services_to_update"]: + console.print( + f"⚙️ [bold]To Update:[/bold] {len(diff['services_to_update'])} services" + ) + + if not diff["packages_to_install"] and not diff["services_to_update"]: + console.print("[green]System already matches template.[/green]") + return 0 + + target_label = f" to {args.to}" if hasattr(args, "to") and args.to else "" + console.print(f"🚀 [bold]Deploying template{target_label}...[/bold]") + result = config_manager.import_configuration( + str( + manager.base_dir + / template_data["name"] + / template_data["version"] + / "template.yaml" + ), + force=args.force, + ) + + if result: + console.print(" [green]✓[/green] Packages installed") + console.print(" [green]✓[/green] Configurations applied") + console.print(" [green]✓[/green] Services started") + console.print(f"[green]✓[/green] System cloned successfully{target_label}\n") + return 0 + return 1 + + def _template_delete(self, manager: Any, args: argparse.Namespace) -> int: + """Handle 'template delete' command.""" + name, version = self._parse_template_spec(args.name) + if manager.delete_template(name, version): + spec = f"{name}:{version}" if version else name + console.print(f"[green]✓[/green] Deleted template: {spec}") + return 0 + console.print(f"[red]Error:[/red] Template '{args.name}' not found.") + return 1 + + def _template_export(self, manager: Any, args: argparse.Namespace) -> int: + """Handle 'template export' command.""" + name, version = self._parse_template_spec(args.name) + try: + path = manager.export_template(name, version or "v1", args.file) + console.print(f"[green]✓[/green] Template exported to: {path}") + return 0 + except ValueError as e: + console.print(f"[red]Error:[/red] {e}") + return 1 + + def _template_import(self, manager: Any, args: argparse.Namespace) -> int: + """Handle 'template import' command.""" + try: + name, version = manager.import_template(args.file) + console.print(f"[green]✓[/green] Template imported: [bold]{name}:{version}[/bold]") + return 0 + except (FileNotFoundError, ValueError) as e: + console.print(f"[red]Error:[/red] {e}") + return 1 + + return 0 + def gpu(self, action: str = "status", mode: str = None, verbose: bool = False): """Hybrid GPU (Optimus) manager""" from cortex.gpu_manager import run_gpu_manager @@ -4088,6 +4272,50 @@ def main(): benchmark_parser = subparsers.add_parser("benchmark", help="Run AI performance benchmark") benchmark_parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output") + # Template commands + template_parser = subparsers.add_parser("template", help="System cloning and templating") + template_subparsers = template_parser.add_subparsers(dest="template_command") + + # Create + create_p = template_subparsers.add_parser("create", help="Create system template") + create_p.add_argument("name", help="Template name") + create_p.add_argument("--description", "-d", default="", help="Template description") + create_p.add_argument( + "--sources", "-s", nargs="+", help="Sources to capture (apt, pip, npm, service)" + ) + + # Deploy + deploy_p = template_subparsers.add_parser("deploy", help="Deploy template") + deploy_p.add_argument("name", help="Template name[:version]") + deploy_p.add_argument( + "--dry-run", action="store_true", help="Preview changes (default behavior)" + ) + deploy_p.add_argument( + "--execute", action="store_true", help="Apply changes; defaults to dry-run" + ) + deploy_p.add_argument("--force", action="store_true", help="Skip compatibility checks") + deploy_p.add_argument("--to", help="Target system (e.g., server-02)") + + # List + template_subparsers.add_parser("list", help="List all templates") + + # Show + show_p = template_subparsers.add_parser("show", help="Show template details") + show_p.add_argument("name", help="Template name[:version]") + + # Delete + del_p = template_subparsers.add_parser("delete", help="Delete template") + del_p.add_argument("name", help="Template name[:version]") + + # Export + exp_p = template_subparsers.add_parser("export", help="Export template to ZIP") + exp_p.add_argument("name", help="Template name[:version]") + exp_p.add_argument("file", help="Output zip file") + + # Import + imp_p = template_subparsers.add_parser("import", help="Import template from ZIP") + imp_p.add_argument("file", help="Input zip file") + # Systemd helper command systemd_parser = subparsers.add_parser("systemd", help="Systemd service helper (plain English)") systemd_parser.add_argument("service", help="Service name") @@ -4788,12 +5016,10 @@ def main(): return cli.status() elif args.command == "benchmark": return cli.benchmark(verbose=getattr(args, "verbose", False)) + elif args.command == "template": + return cli.template(args) elif args.command == "systemd": - return cli.systemd( - args.service, - action=getattr(args, "action", "status"), - verbose=getattr(args, "verbose", False), - ) + return cli.systemd(args.service, args.action, args.verbose) elif args.command == "gpu": return cli.gpu( action=getattr(args, "action", "status"), diff --git a/cortex/config_manager.py b/cortex/config_manager.py index 262ab3dd6..62d1722fa 100755 --- a/cortex/config_manager.py +++ b/cortex/config_manager.py @@ -23,7 +23,6 @@ class ConfigManager: """ Manages configuration export/import for Cortex Linux. - Features: - Export current system state to YAML (packages, configs, preferences) - Import configuration from YAML file @@ -43,7 +42,8 @@ class ConfigManager: SOURCE_APT = "apt" SOURCE_PIP = "pip" SOURCE_NPM = "npm" - DEFAULT_SOURCES: ClassVar[list[str]] = [SOURCE_APT, SOURCE_PIP, SOURCE_NPM] + SOURCE_SERVICE = "service" + DEFAULT_SOURCES: ClassVar[list[str]] = [SOURCE_APT, SOURCE_PIP, SOURCE_NPM, SOURCE_SERVICE] def __init__(self, sandbox_executor=None): """ @@ -213,6 +213,84 @@ def detect_npm_packages(self) -> list[dict[str, Any]]: return packages + def detect_services(self) -> list[dict[str, Any]]: + """ + Detect running and enabled systemd services. + + Returns: + List of service dictionaries with name, state, and enabled status + """ + services = [] + + try: + # 1. Get enabled status for all services (bulk) + enabled_map = {} + enabled_result = subprocess.run( + [ + "systemctl", + "list-unit-files", + "--type=service", + "--all", + "--no-legend", + "--no-pager", + ], + capture_output=True, + text=True, + timeout=self.DETECTION_TIMEOUT, + ) + if enabled_result.returncode == 0: + for line in enabled_result.stdout.strip().split("\n"): + if not line.strip(): + continue + parts = line.split() + if len(parts) >= 2: + unit = parts[0] + state = parts[1] + enabled_map[unit] = state == "enabled" + + # 2. Get active/sub state for all services (bulk) + units_result = subprocess.run( + [ + "systemctl", + "list-units", + "--type=service", + "--all", + "--no-legend", + "--no-pager", + ], + capture_output=True, + text=True, + timeout=self.DETECTION_TIMEOUT, + ) + + if units_result.returncode == 0: + for line in units_result.stdout.strip().split("\n"): + if not line.strip(): + continue + parts = line.split() + if len(parts) >= 4: + service_name = parts[0] + # Only track .service units + if not service_name.endswith(".service"): + continue + + active_state = parts[2] # active, inactive, failed + sub_state = parts[3] # running, dead, exited + + services.append( + { + "name": service_name, + "active_state": active_state, + "sub_state": sub_state, + "enabled": enabled_map.get(service_name, False), + "source": self.SOURCE_SERVICE, + } + ) + except (subprocess.TimeoutExpired, FileNotFoundError): + pass + + return services + def detect_installed_packages(self, sources: list[str] | None = None) -> list[dict[str, Any]]: """ Detect all installed packages from specified sources. @@ -238,6 +316,9 @@ def detect_installed_packages(self, sources: list[str] | None = None) -> list[di if self.SOURCE_NPM in sources: all_packages.extend(self.detect_npm_packages()) + if self.SOURCE_SERVICE in sources: + all_packages.extend(self.detect_services()) + # Remove duplicates based on name and source (more efficient) unique_packages_dict = {} for pkg in all_packages: @@ -349,7 +430,7 @@ def export_configuration( except Exception as e: config["hardware"] = {"error": f"Failed to detect hardware: {e}"} - # Add packages + # Add packages and services config["packages"] = self.detect_installed_packages(sources=package_sources) # Add preferences if requested @@ -434,6 +515,32 @@ def validate_compatibility(self, config: dict[str, Any]) -> tuple[bool, str | No return True, None + def _categorize_service( + self, service: dict[str, Any], current_service_map: dict[str, dict[str, Any]] + ) -> tuple[str, dict[str, Any] | None]: + """ + Categorize a service as install, start, or already correct. + """ + name = service.get("name") + if not name: + return "skip", None + + if name not in current_service_map: + return "missing", service + + current = current_service_map[name] + + needs_change = False + if service.get("enabled") and not current.get("enabled"): + needs_change = True + if service.get("active_state") == "active" and current.get("active_state") != "active": + needs_change = True + + if needs_change: + return "update", {**service, "current_state": current.get("active_state")} + + return "already_correct", service + def _categorize_package( self, pkg: dict[str, Any], current_pkg_map: dict[tuple[str, str], str] ) -> tuple[str, dict[str, Any] | None]: @@ -496,29 +603,42 @@ def diff_configuration(self, config: dict[str, Any]) -> dict[str, Any]: "packages_to_upgrade": [], "packages_to_downgrade": [], "packages_already_installed": [], + "services_missing": [], + "services_to_update": [], "preferences_changed": {}, "warnings": [], } - # Get current packages - current_packages = self.detect_installed_packages() - current_pkg_map = {(pkg["name"], pkg["source"]): pkg["version"] for pkg in current_packages} + # Get current state + all_current = self.detect_installed_packages() + current_packages = [p for p in all_current if p["source"] != self.SOURCE_SERVICE] + current_services = [p for p in all_current if p["source"] == self.SOURCE_SERVICE] - # Compare packages from config - config_packages = config.get("packages", []) - for pkg in config_packages: - category, pkg_data = self._categorize_package(pkg, current_pkg_map) - - if category == "skip": - diff["warnings"].append(f"Malformed package entry skipped: {pkg}") - elif category == "install": - diff["packages_to_install"].append(pkg_data) - elif category == "upgrade": - diff["packages_to_upgrade"].append(pkg_data) - elif category == "downgrade": - diff["packages_to_downgrade"].append(pkg_data) - elif category == "already_installed": - diff["packages_already_installed"].append(pkg_data) + current_pkg_map = {(pkg["name"], pkg["source"]): pkg["version"] for pkg in current_packages} + current_service_map = {s["name"]: s for s in current_services} + + # Compare packages/services from config + config_items = config.get("packages", []) + for item in config_items: + source = item.get("source") + if source == self.SOURCE_SERVICE: + category, srv_data = self._categorize_service(item, current_service_map) + if category == "missing": + diff["services_missing"].append(srv_data) + elif category == "update": + diff["services_to_update"].append(srv_data) + else: + category, pkg_data = self._categorize_package(item, current_pkg_map) + if category == "skip": + diff["warnings"].append(f"Malformed package entry skipped: {item}") + elif category == "install": + diff["packages_to_install"].append(pkg_data) + elif category == "upgrade": + diff["packages_to_upgrade"].append(pkg_data) + elif category == "downgrade": + diff["packages_to_downgrade"].append(pkg_data) + elif category == "already_installed": + diff["packages_already_installed"].append(pkg_data) # Compare preferences current_prefs = self._load_preferences() @@ -669,20 +789,28 @@ def import_configuration( # Determine what to import if selective is None: - selective = ["packages", "preferences"] + selective = ["packages", "services", "preferences"] summary = { "installed": [], "upgraded": [], "downgraded": [], + "services_updated": [], "failed": [], "skipped": [], "preferences_updated": False, } + # Compute diff once for efficiency + diff = self.diff_configuration(config) + # Import packages if "packages" in selective: - self._import_packages(config, summary) + self._import_packages(diff, summary) + + # Import services + if "services" in selective: + self._import_services(diff, summary) # Import preferences if "preferences" in selective: @@ -690,22 +818,22 @@ def import_configuration( return summary - def _import_packages(self, config: dict[str, Any], summary: dict[str, Any]) -> None: + def _import_packages(self, diff: dict[str, Any], summary: dict[str, Any]) -> None: """ Import packages from configuration and update system state. - This method processes package installations by first computing the - difference between the current system state and the target configuration - using diff_configuration(). It then attempts to install, upgrade, or - downgrade packages as needed. + This method processes package installations based on the pre-computed + diff result. It attempts to install, upgrade, or downgrade packages + as needed. The method continues processing all packages even if individual packages fail to install, ensuring maximum success. Failed installations are tracked in the summary for user review. Args: - config: Configuration dictionary containing package specifications - Expected to have 'packages' key with list of package dicts + diff: Pre-computed diff dictionary from diff_configuration() + Expected to have keys: 'packages_to_install', 'packages_to_upgrade', + 'packages_to_downgrade' summary: Summary dictionary to update with results. Modified in-place with keys: 'installed', 'upgraded', 'failed' @@ -719,7 +847,6 @@ def _import_packages(self, config: dict[str, Any], summary: dict[str, Any]) -> N Each package is categorized based on diff results (install vs upgrade). Errors are caught and logged to allow processing to continue. """ - diff = self.diff_configuration(config) packages_to_process = ( diff["packages_to_install"] + diff["packages_to_upgrade"] @@ -741,6 +868,47 @@ def _import_packages(self, config: dict[str, Any], summary: dict[str, Any]) -> N except Exception as e: summary["failed"].append(f"{pkg['name']} ({str(e)})") + def _import_services(self, diff: dict[str, Any], summary: dict[str, Any]) -> None: + """ + Import and update service states from configuration. + """ + services_to_process = diff.get("services_to_update", []) + # Note: services_missing are ignored as we don't have unit files to create them + + for srv in services_to_process: + try: + success = self._update_service(srv) + if success: + summary["services_updated"].append(srv["name"]) + else: + summary["failed"].append(f"service:{srv['name']}") + except Exception as e: + summary["failed"].append(f"service:{srv['name']} ({str(e)})") + + def _update_service(self, srv: dict[str, Any]) -> bool: + """ + Update a service state (start/enable). + """ + name = srv["name"] + success = True + + try: + if srv.get("enabled"): + cmd = ["sudo", "systemctl", "enable", name] + res = subprocess.run(cmd, capture_output=True, timeout=self.DETECTION_TIMEOUT) + if res.returncode != 0: + success = False + + if srv.get("active_state") == "active": + cmd = ["sudo", "systemctl", "start", name] + res = subprocess.run(cmd, capture_output=True, timeout=self.DETECTION_TIMEOUT) + if res.returncode != 0: + success = False + except (subprocess.TimeoutExpired, subprocess.SubprocessError): + success = False + + return success + def _import_preferences(self, config: dict[str, Any], summary: dict[str, Any]) -> None: """ Import user preferences from configuration and save to disk. diff --git a/cortex/template_manager.py b/cortex/template_manager.py new file mode 100644 index 000000000..6e9734443 --- /dev/null +++ b/cortex/template_manager.py @@ -0,0 +1,270 @@ +""" +Template Manager for Cortex Linux +Handles lifecycle of system duplication templates. +""" + +import json +import os +import re +import shutil +import zipfile +from datetime import datetime +from pathlib import Path +from typing import Any, Optional + +import yaml + +from cortex.config_manager import ConfigManager + + +class TemplateManager: + """ + Manages system templates for duplication and deployment. + + Templates are stored in ~/.cortex/templates/ + Structure: + ~/.cortex/templates/ + name/ + v1/ + template.yaml + metadata.json + v2/ + ... + """ + + def __init__(self, template_dir: Path | None = None): + self.base_dir = template_dir or Path.home() / ".cortex" / "templates" + self.base_dir.mkdir(parents=True, exist_ok=True) + self.config_manager = ConfigManager() + + def create_template( + self, name: str, description: str, package_sources: list[str] | None = None + ) -> str: + """ + Create a new system template. + """ + template_dir = self.base_dir / name + template_dir.mkdir(parents=True, exist_ok=True) + + # Get next version + def get_v_num(v_str): + match = re.search(r"v(\d+)", v_str) + return int(match.group(1)) if match else 0 + + versions = [ + get_v_num(d.name) + for d in template_dir.iterdir() + if d.is_dir() and d.name.startswith("v") + ] + next_v = max(versions) + 1 if versions else 1 + + version_name = f"v{next_v}" + version_dir = template_dir / version_name + version_dir.mkdir() + + config_file = version_dir / "template.yaml" + metadata_file = version_dir / "metadata.json" + + # Capture system state + self.config_manager.export_configuration(str(config_file), package_sources=package_sources) + + # Save metadata + metadata = { + "name": name, + "version": version_name, + "description": description, + "created_at": datetime.now().isoformat(), + "cortex_version": self.config_manager.CORTEX_VERSION, + } + with open(version_dir / "metadata.json", "w") as f: + json.dump(metadata, f, indent=2) + + return version_name + + def list_templates(self) -> list[dict[str, Any]]: + """ + List all available templates and their versions. + """ + templates = [] + if not self.base_dir.exists(): + return [] + + for template_dir in self.base_dir.iterdir(): + if not template_dir.is_dir(): + continue + + versions = [] + for v_dir in template_dir.iterdir(): + if not v_dir.is_dir() or not v_dir.name.startswith("v"): + continue + + metadata_file = v_dir / "metadata.json" + if metadata_file.exists(): + with open(metadata_file) as f: + versions.append(json.load(f)) + + if versions: + # Sort versions by version number descending + def get_v_num(v_str): + match = re.search(r"v(\d+)", v_str) + return int(match.group(1)) if match else 0 + + versions.sort(key=lambda x: get_v_num(x["version"]), reverse=True) + templates.append( + { + "name": template_dir.name, + "latest_version": versions[0]["version"], + "versions": versions, + } + ) + + return sorted(templates, key=lambda x: x["name"]) + + def get_template(self, name: str, version: str | None = None) -> dict[str, Any] | None: + """ + Retrieve a specific template version. + """ + template_path = self.base_dir / name + if not template_path.exists(): + return None + + if not version: + # Use latest version + def get_v_num(v_str): + match = re.search(r"v(\d+)", v_str) + return int(match.group(1)) if match else 0 + + versions = [ + v.name for v in template_path.iterdir() if v.is_dir() and v.name.startswith("v") + ] + if not versions: + return None + version = sorted(versions, key=get_v_num)[-1] + + version_path = template_path / version + if not version_path.exists(): + return None + + metadata_file = version_path / "metadata.json" + config_file = version_path / "template.yaml" + + if not metadata_file.exists() or not config_file.exists(): + return None + + with open(metadata_file) as f: + data = json.load(f) + + with open(config_file) as f: + data["config"] = yaml.safe_load(f) + + return data + + def delete_template(self, name: str, version: str | None = None) -> bool: + """ + Delete a template or a specific version. + """ + template_path = self.base_dir / name + if not template_path.exists(): + return False + + if version: + version_path = template_path / version + if version_path.exists(): + shutil.rmtree(version_path) + # If no versions left, remove the template folder + if not any(template_path.iterdir()): + shutil.rmtree(template_path) + return True + return False + else: + shutil.rmtree(template_path) + return True + + def export_template(self, name: str, version: str, output_path: str) -> str: + """ + Export a template version as a ZIP file. + """ + template_data = self.base_dir / name / version + if not template_data.exists(): + raise ValueError(f"Template {name}:{version} not found") + + output_path = Path(output_path) + if output_path.suffix != ".zip": + output_path = output_path.with_suffix(".zip") + + with zipfile.ZipFile(output_path, "w", zipfile.ZIP_DEFLATED) as zipf: + for root, _, files in os.walk(template_data): + for file in files: + file_path = Path(root) / file + zipf.write(file_path, file_path.relative_to(template_data)) + + return str(output_path) + + def import_template(self, input_path: str) -> tuple[str, str]: + """ + Import a template from a ZIP file. + """ + input_path = Path(input_path).resolve() + if not input_path.exists(): + raise FileNotFoundError(f"File not found: {input_path}") + + # Extract to temporary location to read metadata + import tempfile + + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir_path = Path(tmpdir).resolve() + with zipfile.ZipFile(input_path, "r") as zipf: + for member in zipf.infolist(): + # Zip Slip protection: check for absolute paths or ".." + member_path = Path(member.filename) + if member_path.is_absolute() or ".." in member.filename: + continue # Skip unsafe members + + target = (tmpdir_path / member_path).resolve() + if not target.is_relative_to(tmpdir_path): + continue # Skip members outside tmpdir + + if member.is_dir(): + target.mkdir(parents=True, exist_ok=True) + else: + target.parent.mkdir(parents=True, exist_ok=True) + with zipf.open(member) as source, open(target, "wb") as dest: + shutil.copyfileobj(source, dest) + + metadata_file = tmpdir_path / "metadata.json" + if not metadata_file.exists(): + raise ValueError("Invalid template: missing metadata.json") + + with open(metadata_file) as f: + metadata = json.load(f) + + raw_name = metadata.get("name", "") + raw_version = metadata.get("version", "") + + # Sanitize name and version to prevent path traversal + if not re.match(r"^[a-zA-Z0-9._-]+$", raw_name) or not re.match( + r"^[a-zA-Z0-9._-]+$", raw_version + ): + raise ValueError("Invalid template: malformed name or version") + + name = raw_name + version = raw_version + + # Ensure target path is strictly within base_dir + target_path = (self.base_dir / name / version).resolve() + if not target_path.is_relative_to(self.base_dir.resolve()): + raise ValueError("Invalid template: target path outside templates directory") + + if target_path.exists(): + # Append import suffix if version already exists + import time + + version = f"{version}-import-{int(time.time())}" + target_path = self.base_dir / name / version + + target_path.mkdir(parents=True, exist_ok=True) + for item in tmpdir_path.iterdir(): + if item.is_file(): + shutil.copy2(item, target_path / item.name) + + return name, version diff --git a/docs/CLONING_WORKFLOW.md b/docs/CLONING_WORKFLOW.md new file mode 100644 index 000000000..563107063 --- /dev/null +++ b/docs/CLONING_WORKFLOW.md @@ -0,0 +1,102 @@ +# System Cloning and Templating Workflow + +Cortex allows you to capture your entire system state as a reusable template, which can then be used to restore your system or clone it to another machine. This feature replicates your specific environment, server configurations, or project-specific baseline setups with a single command. + +## Core Concepts + +- **Templates**: A snapshot encompassing: + - **Packages**: `apt`, `pip`, and global `npm` packages. + - **Configurations**: User preferences and safe system environment variables. + - **Services**: `systemd` unit states (running/stopped/enabled). +- **VERSIONING**: Automatic version tracking (`v1`, `v2`, etc.) ensures you can rollback to previous "Golden States." +- **Differential Updates**: Cortex calculates the exact difference between your current system and the template, restoring only what is necessary. + +## Step-by-Step Workflow + +### 1. Capture Your System (📸) +Snapshot your current working environment. You can use any name and description that helps you remember what this system state is for. + +```bash +cortex template create --description "" +``` + +**Output Example:** +```text +📸 Capturing system state... + - 1250 packages + - 5 configurations + - 80 services +✓ Template saved: -v1 +``` + +> [!TIP] +> **Performance Optimization**: If you only care about services and want a fast capture, use: +> `cortex template create --sources service` + +### 2. Inspecting Templates +Review what is inside a template before deploying it. + +```bash +# List all templates and their versions +cortex template list + +# See exactly what's inside a specific version +cortex template show -v1 +``` + +### 3. Clone / Restore (🚀) +Replicate the configuration. On the same machine, this acts as "Self-Healing." On a new machine, it acts as a full "Clone." + +```bash +# Dry-run first to see the differential plan +cortex template deploy -v1 --dry-run + +# Perform the actual deployment +cortex template deploy -v1 --to +``` + +**Output:** +```text +🚀 Deploying template... + ✓ Packages installed + ✓ Configurations applied + ✓ Services started +✓ System cloned successfully +``` + +### 4. Sharing Templates +Move your configuration to a new system or share it with a team member. + +1. **On Machine A**: `cortex template export -v1 my_clone.zip` +2. **On Machine B**: `cortex template import my_clone.zip` +3. **Deploy on Machine B**: `cortex template deploy -v1` + +--- + +## Command Reference + +| Command | Description | +|---------|-------------| +| `create ` | Capture current system state as a new version | +| `list` | List all templates and their available versions | +| `show ` | Display counts and description of a template | +| `deploy ` | Restore or clone a template to the current system | +| `export ` | Pack a template version into a portable .zip | +| `import ` | Load a template version from a .zip file | +| `delete ` | Permanently remove a template version | + +## Best Practices + +### The "Self-Healing" Workflow +Regularly create a `baseline` template. If your system starts acting up or a service stops responding, simply run `cortex template deploy baseline` to restore the exact running state without a full reboot or manual debugging. + +### Team Onboarding +1. Create a template with all necessary tools for your specific project. +2. Export and share the ZIP with your team. +3. New members simply `import` and `deploy` to have a matching environment in minutes. + +## Troubleshooting + +- **Sudo Permissions**: Deploying packages or starting services requires `sudo` access. +- **Service Warnings**: If a service fails to restore, check `journalctl -u ` for system-level errors. +- **Version Mismatches**: Use `cortex template list` to verify you are using the correct version suffix (`-v1`). diff --git a/tests/unit/test_config_manager.py b/tests/unit/test_config_manager.py index 003a66cee..f39228edd 100644 --- a/tests/unit/test_config_manager.py +++ b/tests/unit/test_config_manager.py @@ -118,11 +118,13 @@ def test_detect_npm_packages_failure(self, mock_run): @patch.object(ConfigManager, "detect_apt_packages") @patch.object(ConfigManager, "detect_pip_packages") @patch.object(ConfigManager, "detect_npm_packages") - def test_detect_all_packages(self, mock_npm, mock_pip, mock_apt): + @patch.object(ConfigManager, "detect_services") + def test_detect_all_packages(self, mock_services, mock_npm, mock_pip, mock_apt): """Test detection of all packages from all sources.""" mock_apt.return_value = [{"name": "curl", "version": "7.0.0", "source": "apt"}] mock_pip.return_value = [{"name": "numpy", "version": "1.24.0", "source": "pip"}] mock_npm.return_value = [{"name": "typescript", "version": "5.0.0", "source": "npm"}] + mock_services.return_value = [] packages = self.config_manager.detect_installed_packages() diff --git a/tests/unit/test_config_manager_services.py b/tests/unit/test_config_manager_services.py new file mode 100644 index 000000000..2e1b6d06d --- /dev/null +++ b/tests/unit/test_config_manager_services.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python3 +""" +Unit tests for service extensions in ConfigManager. +""" + +import os +import sys +import unittest +from pathlib import Path +from unittest.mock import MagicMock, patch + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../..")) + +from cortex.config_manager import ConfigManager + + +class TestConfigManagerServices(unittest.TestCase): + def setUp(self): + self.config_manager = ConfigManager() + + @patch("subprocess.run") + def test_detect_services(self, mock_run): + # Mock list-unit-files + mock_unit_files = MagicMock() + mock_unit_files.returncode = 0 + mock_unit_files.stdout = "test.service enabled\nother.service disabled\n" + + # Mock list-units + mock_units = MagicMock() + mock_units.returncode = 0 + mock_units.stdout = "test.service loaded active running Test Service\nother.service loaded inactive dead Other Service\n" + + mock_run.side_effect = [mock_unit_files, mock_units] + + services = self.config_manager.detect_services() + + self.assertEqual(len(services), 2) + self.assertEqual(services[0]["name"], "test.service") + self.assertEqual(services[0]["active_state"], "active") + self.assertTrue(services[0]["enabled"]) + + self.assertEqual(services[1]["name"], "other.service") + self.assertEqual(services[1]["active_state"], "inactive") + self.assertFalse(services[1]["enabled"]) + + def test_categorize_service(self): + current_map = { + "test.service": {"name": "test.service", "active_state": "active", "enabled": True}, + "stopped.service": { + "name": "stopped.service", + "active_state": "inactive", + "enabled": True, + }, + } + + # Already correct + srv = {"name": "test.service", "active_state": "active", "enabled": True} + cat, data = self.config_manager._categorize_service(srv, current_map) + self.assertEqual(cat, "already_correct") + + # Update needed + srv = {"name": "stopped.service", "active_state": "active", "enabled": True} + cat, data = self.config_manager._categorize_service(srv, current_map) + self.assertEqual(cat, "update") + self.assertEqual(data["current_state"], "inactive") + + # Missing + srv = {"name": "new.service", "active_state": "active", "enabled": True} + cat, data = self.config_manager._categorize_service(srv, current_map) + self.assertEqual(cat, "missing") + + @patch("subprocess.run") + def test_update_service(self, mock_run): + mock_res = MagicMock() + mock_res.returncode = 0 + mock_run.return_value = mock_res + + srv = {"name": "test.service", "active_state": "active", "enabled": True} + success = self.config_manager._update_service(srv) + + self.assertTrue(success) + self.assertEqual(mock_run.call_count, 2) # enable and start + + @patch("subprocess.run") + def test_update_service_failure(self, mock_run): + mock_res = MagicMock() + mock_res.returncode = 1 + mock_run.return_value = mock_res + + srv = {"name": "fail.service", "active_state": "active", "enabled": True} + success = self.config_manager._update_service(srv) + self.assertFalse(success) + + @patch("cortex.config_manager.ConfigManager.detect_services") + @patch("cortex.config_manager.ConfigManager.detect_apt_packages") + @patch("cortex.config_manager.ConfigManager.detect_pip_packages") + @patch("cortex.config_manager.ConfigManager.detect_npm_packages") + @patch("cortex.hwprofiler.HardwareProfiler.profile") + def test_diff_configuration_with_services( + self, mock_profile, mock_npm, mock_pip, mock_apt, mock_srv + ): + mock_profile.return_value = {"os": "24.04"} + mock_apt.return_value = [] + mock_pip.return_value = [] + mock_npm.return_value = [] + mock_srv.return_value = [ + { + "name": "test.service", + "active_state": "active", + "enabled": False, + "source": "service", + } + ] + + template_config = { + "os": "24.04", + "packages": [ + { + "name": "test.service", + "active_state": "active", + "enabled": True, + "source": "service", + }, + { + "name": "new.service", + "active_state": "active", + "enabled": True, + "source": "service", + }, + ], + } + + diff = self.config_manager.diff_configuration(template_config) + + self.assertEqual(len(diff["services_to_update"]), 1) + self.assertEqual(diff["services_to_update"][0]["name"], "test.service") + self.assertEqual(len(diff["services_missing"]), 1) + self.assertEqual(diff["services_missing"][0]["name"], "new.service") + + @patch("cortex.config_manager.ConfigManager._update_service") + def test_import_services(self, mock_update): + mock_update.return_value = True + diff = { + "services_missing": [{"name": "s1"}], + "services_to_update": [{"name": "s2"}], + } + + summary = {"installed": [], "failed": [], "services_updated": []} + + self.config_manager._import_services(diff, summary) + + # summary has "services_updated" from _import_services + self.assertEqual(len(summary["services_updated"]), 1) # s1 is ignored as per implementation + self.assertEqual(mock_update.call_count, 1) + + @patch("cortex.config_manager.ConfigManager.detect_services") + @patch("cortex.config_manager.ConfigManager.detect_apt_packages") + def test_detect_installed_packages_with_services(self, mock_apt, mock_srv): + mock_apt.return_value = [{"name": "pkg1", "source": "apt"}] + mock_srv.return_value = [{"name": "srv1", "source": "service"}] + + items = self.config_manager.detect_installed_packages(sources=["apt", "service"]) + + self.assertEqual(len(items), 2) + sources = [i["source"] for i in items] + self.assertIn("apt", sources) + self.assertIn("service", sources) + + @patch("cortex.config_manager.ConfigManager._update_service") + def test_import_services_failure(self, mock_update): + mock_update.return_value = False + diff = {"services_to_update": [{"name": "fail-srv"}]} + summary = {"installed": [], "failed": [], "services_updated": []} + + self.config_manager._import_services(diff, summary) + self.assertIn("service:fail-srv", summary["failed"]) + + def test_categorize_service_skip(self): + cat, data = self.config_manager._categorize_service({}, {}) + self.assertEqual(cat, "skip") + + @patch("subprocess.run") + def test_detect_services_error(self, mock_run): + mock_run.side_effect = FileNotFoundError() + services = self.config_manager.detect_services() + self.assertEqual(services, []) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/test_template_manager.py b/tests/unit/test_template_manager.py new file mode 100644 index 000000000..aaae17553 --- /dev/null +++ b/tests/unit/test_template_manager.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python3 +""" +Unit tests for TemplateManager. +""" + +import json +import os +import shutil +import sys +import tempfile +import unittest +import zipfile +from pathlib import Path +from unittest.mock import patch + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../..")) + +from cortex.template_manager import TemplateManager + + +class TestTemplateManager(unittest.TestCase): + def setUp(self): + self.temp_dir = Path(tempfile.mkdtemp()) + self.template_manager = TemplateManager(template_dir=self.temp_dir) + + def tearDown(self): + shutil.rmtree(self.temp_dir, ignore_errors=True) + + @patch("cortex.config_manager.ConfigManager.export_configuration") + def test_create_template(self, mock_export): + mock_export.return_value = "Success" + + v1 = self.template_manager.create_template("test-template", "Description 1") + self.assertEqual(v1, "v1") + + v2 = self.template_manager.create_template("test-template", "Description 2") + self.assertEqual(v2, "v2") + + self.assertTrue((self.temp_dir / "test-template" / "v1" / "metadata.json").exists()) + self.assertTrue((self.temp_dir / "test-template" / "v2" / "metadata.json").exists()) + + @patch("cortex.config_manager.ConfigManager.export_configuration") + def test_list_templates(self, mock_export): + self.template_manager.create_template("t1", "desc1") + self.template_manager.create_template("t2", "desc2") + + templates = self.template_manager.list_templates() + self.assertEqual(len(templates), 2) + self.assertEqual(templates[0]["name"], "t1") + self.assertEqual(templates[1]["name"], "t2") + + @patch("cortex.config_manager.ConfigManager.export_configuration") + def test_get_template(self, mock_export): + # Create a dummy config file so get_template can read it + output_name = "test-t" + v = self.template_manager.create_template(output_name, "desc") + config_path = self.temp_dir / output_name / v / "template.yaml" + with open(config_path, "w") as f: + f.write("test: data") + + t = self.template_manager.get_template(output_name, v) + self.assertIsNotNone(t) + self.assertEqual(t["name"], output_name) + self.assertEqual(t["config"]["test"], "data") + + @patch("cortex.config_manager.ConfigManager.export_configuration") + def test_export_import_template(self, mock_export): + name = "exp-template" + v = self.template_manager.create_template(name, "to export") + config_path = self.temp_dir / name / v / "template.yaml" + with open(config_path, "w") as f: + f.write("foo: bar") + + zip_path = self.temp_dir / "export.zip" + self.template_manager.export_template(name, v, str(zip_path)) + + self.assertTrue(zip_path.exists()) + + # Import as new template + new_name, new_v = self.template_manager.import_template(str(zip_path)) + self.assertEqual(new_name, name) + # It'll be a new version since v1 exists + self.assertIn("v1", new_v) + + imported = self.template_manager.get_template(new_name, new_v) + self.assertEqual(imported["config"]["foo"], "bar") + + @patch("cortex.config_manager.ConfigManager.export_configuration") + def test_get_template_latest(self, mock_export): + name = "latest-test" + self.template_manager.create_template(name, "v1") + self.template_manager.create_template(name, "v2") + + # Create dummy files + for v in ["v1", "v2"]: + p = self.temp_dir / name / v + with open(p / "template.yaml", "w") as f: + f.write("f: b") + + t = self.template_manager.get_template(name) + self.assertEqual(t["version"], "v2") + + def test_get_template_not_found(self): + self.assertIsNone(self.template_manager.get_template("nonexistent")) + + @patch("cortex.config_manager.ConfigManager.export_configuration") + def test_delete_template_and_version(self, mock_export): + name = "del-test" + self.template_manager.create_template(name, "v1") + self.template_manager.create_template(name, "v2") + + # Delete v2 + self.assertTrue(self.template_manager.delete_template(name, "v2")) + self.assertFalse((self.temp_dir / name / "v2").exists()) + self.assertTrue((self.temp_dir / name / "v1").exists()) + + # Delete v1 -> removes whole template dir + self.assertTrue(self.template_manager.delete_template(name, "v1")) + self.assertFalse((self.temp_dir / name).exists()) + + # Delete nonexistent + self.assertFalse(self.template_manager.delete_template("nope")) + + @patch("cortex.config_manager.ConfigManager.export_configuration") + def test_export_errors(self, mock_export): + with self.assertRaises(ValueError): + self.template_manager.export_template("none", "v1", "out.zip") + + def test_import_errors(self): + # File not found + with self.assertRaises(FileNotFoundError): + self.template_manager.import_template("nonexistent.zip") + + # Missing metadata.json + zip_path = self.temp_dir / "invalid.zip" + with zipfile.ZipFile(zip_path, "w") as z: + z.writestr("random.txt", "data") + with self.assertRaises(ValueError): + self.template_manager.import_template(str(zip_path)) + + @patch("cortex.config_manager.ConfigManager.export_configuration") + def test_list_templates_empty(self, mock_export): + shutil.rmtree(self.temp_dir) + self.assertEqual(self.template_manager.list_templates(), []) + + +if __name__ == "__main__": + unittest.main()