From 288386c40a98193f7b50882ce0246bc63c96fc7a Mon Sep 17 00:00:00 2001 From: pratyush07-hub Date: Mon, 19 Jan 2026 23:12:53 +0530 Subject: [PATCH 1/4] feat: implement system cloning and templating #86 --- README.md | 6 + cortex/cli.py | 213 +++++++++++++++++-- cortex/config_manager.py | 182 +++++++++++++++-- cortex/template_manager.py | 226 +++++++++++++++++++++ docs/CLONING_WORKFLOW.md | 102 ++++++++++ tests/unit/test_config_manager_services.py | 197 ++++++++++++++++++ tests/unit/test_template_manager.py | 148 ++++++++++++++ 7 files changed, 1041 insertions(+), 33 deletions(-) create mode 100644 cortex/template_manager.py create mode 100644 docs/CLONING_WORKFLOW.md create mode 100644 tests/unit/test_config_manager_services.py create mode 100644 tests/unit/test_template_manager.py diff --git a/README.md b/README.md index 24db1c178..c04be030e 100644 --- a/README.md +++ b/README.md @@ -71,6 +71,7 @@ cortex install "tools for video compression" | **Docker Permission Fixer** | Fix root-owned bind mount issues automatically | | **Audit Trail** | Complete history in `~/.cortex/history.db` | | **Hardware-Aware** | Detects GPU, CPU, memory for optimized packages | +| **System Cloning** | Capture, version, and replicate system states via templates | | **Multi-LLM Support** | Works with Claude, GPT-4, or local Ollama models | --- @@ -165,6 +166,9 @@ cortex history # Rollback an installation cortex rollback + +# Create a system backup template +cortex template create base-state --description "Baseline configuration" ``` ### Role Management @@ -190,6 +194,7 @@ cortex role set | `cortex sandbox ` | Test packages in Docker sandbox | | `cortex history` | View all past installations | | `cortex rollback ` | Undo a specific installation | +| `cortex template ` | System cloning and templating (create, deploy, list, show) | | `cortex --version` | Show version information | | `cortex --help` | Display help message | @@ -414,6 +419,7 @@ pip install -e . - [x] Dry-run preview mode - [x] Docker bind-mount permission fixer - [x] Automatic Role Discovery (AI-driven system context sensing) +- [x] System Cloning & Templating (Capture, Version, Clone) ### In Progress - [ ] Conflict resolution UI diff --git a/cortex/cli.py b/cortex/cli.py index 6638a8804..f4191570d 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -22,13 +22,7 @@ format_package_list, ) from cortex.env_manager import EnvironmentManager, get_env_manager -from cortex.i18n import ( - SUPPORTED_LANGUAGES, - LanguageConfig, - get_language, - set_language, - t, -) +from cortex.i18n import SUPPORTED_LANGUAGES, LanguageConfig, get_language, set_language, t from cortex.installation_history import InstallationHistory, InstallationStatus, InstallationType from cortex.llm.interpreter import CommandInterpreter from cortex.network_config import NetworkConfig @@ -1978,6 +1972,164 @@ def systemd(self, service: str, action: str = "status", verbose: bool = False): return run_systemd_helper(service, action, verbose) + def template(self, args): + """Handle template commands""" + from cortex.config_manager import ConfigManager + from cortex.template_manager import TemplateManager + + manager = TemplateManager() + cmd = args.template_command + + if not cmd: + self.console.print( + "[yellow]Usage: cortex template [create|deploy|list|show|delete|export|import][/yellow]" + ) + return 1 + + if cmd == "create": + console.print("📸 [bold]Capturing system state...[/bold]") + version = manager.create_template( + args.name, args.description, package_sources=args.sources + ) + + # Get counts for the summary + t = manager.get_template(args.name, version) + pkgs = t["config"].get("packages", []) + apps = [p for p in pkgs if p["source"] != "service"] + services = [p for p in pkgs if p["source"] == "service"] + + console.print(f" - {len(apps)} packages") + console.print( + f" - {len(t['config'].get('preferences', {})) + len(t['config'].get('environment_variables', {}))} configurations" + ) + console.print(f" - {len(services)} services") + console.print(f"[green]✓[/green] Template saved: [bold]{args.name}-{version}[/bold]\n") + return 0 + + elif cmd == "list": + templates = manager.list_templates() + if not templates: + console.print("No templates found.") + return 0 + + console.print("\n[bold cyan]Available System Templates:[/bold cyan]\n") + for t in templates: + console.print(f"• [bold]{t['name']}[/bold] (latest: {t['latest_version']})") + for v in t["versions"]: + console.print( + f" - {v['version']} ({v['created_at'][:10]}): {v['description']}" + ) + console.print() + return 0 + + elif cmd == "show" or cmd == "deploy" or cmd == "delete" or cmd == "export": + # These need a template specification + # Support both name:v1 and name-v1 syntax + if ":" in args.name: + parts = args.name.split(":", 1) + name = parts[0] + version = parts[1] + elif "-v" in args.name: + # Find the last -v + idx = args.name.rfind("-v") + name = args.name[:idx] + version = args.name[idx + 1 :] + else: + name = args.name + version = None + + if cmd == "show": + t = manager.get_template(name, version) + if not t: + console.print(f"[red]Error:[/red] Template '{args.name}' not found.") + return 1 + + console.print(f"\n[bold]{t['name']}:{t['version']}[/bold]") + console.print(f"Description: {t['description']}") + console.print(f"Created: {t['created_at']}") + console.print(f"OS: {t['config'].get('os', 'unknown')}") + + pkgs = t["config"].get("packages", []) + apps = [p for p in pkgs if p["source"] != "service"] + services = [p for p in pkgs if p["source"] == "service"] + + console.print("\n[bold]Summary:[/bold]") + console.print(f"- {len(apps)} Packages") + console.print( + f"- {len(t['config'].get('preferences', {})) + len(t['config'].get('environment_variables', {}))} Configurations" + ) + console.print(f"- {len(services)} Services") + return 0 + + elif cmd == "delete": + if manager.delete_template(name, version): + spec = f"{name}:{version}" if version else name + console.print(f"[green]✓[/green] Deleted template: {spec}") + return 0 + console.print(f"[red]Error:[/red] Template '{args.name}' not found.") + return 1 + + elif cmd == "export": + try: + path = manager.export_template(name, version or "v1", args.file) + console.print(f"[green]✓[/green] Template exported to: {path}") + return 0 + except Exception as e: + console.print(f"[red]Error:[/red] {e}") + return 1 + + elif cmd == "deploy": + t = manager.get_template(name, version) + if not t: + console.print(f"[red]Error:[/red] Template '{args.name}' not found.") + return 1 + + config_manager = ConfigManager() + + if args.dry_run: + console.print( + f"\n[bold cyan]Previewing deployment of {t['name']}:{t['version']}[/bold cyan]\n" + ) + diff = config_manager.diff_configuration(t["config"]) + + if diff["packages_to_install"]: + console.print( + f"📦 [bold]To Install:[/bold] {len(diff['packages_to_install'])} packages" + ) + if diff["services_to_update"]: + console.print( + f"⚙️ [bold]To Update:[/bold] {len(diff['services_to_update'])} services" + ) + + if not diff["packages_to_install"] and not diff["services_to_update"]: + console.print("[green]System already matches template.[/green]") + return 0 + + console.print("🚀 [bold]Deploying template...[/bold]") + result = config_manager.import_configuration( + str(manager.base_dir / t["name"] / t["version"] / "template.yaml"), + force=args.force, + ) + + if result: + console.print(" [green]✓[/green] Packages installed") + console.print(" [green]✓[/green] Configurations applied") + console.print(" [green]✓[/green] Services started") + console.print("[green]✓[/green] System cloned successfully\n") + return 0 + return 1 + + elif cmd == "import": + try: + name, version = manager.import_template(args.file) + console.print(f"[green]✓[/green] Template imported: [bold]{name}:{version}[/bold]") + return 0 + except Exception as e: + console.print(f"[red]Error:[/red] {e}") + return 1 + + return 0 + def gpu(self, action: str = "status", mode: str = None, verbose: bool = False): """Hybrid GPU (Optimus) manager""" from cortex.gpu_manager import run_gpu_manager @@ -3277,6 +3429,45 @@ def main(): benchmark_parser = subparsers.add_parser("benchmark", help="Run AI performance benchmark") benchmark_parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output") + # Template commands + template_parser = subparsers.add_parser("template", help="System cloning and templating") + template_subparsers = template_parser.add_subparsers(dest="template_command") + + # Create + create_p = template_subparsers.add_parser("create", help="Create system template") + create_p.add_argument("name", help="Template name") + create_p.add_argument("--description", "-d", default="", help="Template description") + create_p.add_argument( + "--sources", "-s", nargs="+", help="Sources to capture (apt, pip, npm, service)" + ) + + # Deploy + deploy_p = template_subparsers.add_parser("deploy", help="Deploy template") + deploy_p.add_argument("name", help="Template name[:version]") + deploy_p.add_argument("--dry-run", action="store_true", help="Preview changes") + deploy_p.add_argument("--force", action="store_true", help="Skip compatibility checks") + deploy_p.add_argument("--to", help="Target system (e.g., server-02)") + + # List + template_subparsers.add_parser("list", help="List all templates") + + # Show + show_p = template_subparsers.add_parser("show", help="Show template details") + show_p.add_argument("name", help="Template name[:version]") + + # Delete + del_p = template_subparsers.add_parser("delete", help="Delete template") + del_p.add_argument("name", help="Template name[:version]") + + # Export + exp_p = template_subparsers.add_parser("export", help="Export template to ZIP") + exp_p.add_argument("name", help="Template name[:version]") + exp_p.add_argument("file", help="Output zip file") + + # Import + imp_p = template_subparsers.add_parser("import", help="Import template from ZIP") + imp_p.add_argument("file", help="Input zip file") + # Systemd helper command systemd_parser = subparsers.add_parser("systemd", help="Systemd service helper (plain English)") systemd_parser.add_argument("service", help="Service name") @@ -3907,12 +4098,10 @@ def main(): return cli.status() elif args.command == "benchmark": return cli.benchmark(verbose=getattr(args, "verbose", False)) + elif args.command == "template": + return cli.template(args) elif args.command == "systemd": - return cli.systemd( - args.service, - action=getattr(args, "action", "status"), - verbose=getattr(args, "verbose", False), - ) + return cli.systemd(args.service, args.action, args.verbose) elif args.command == "gpu": return cli.gpu( action=getattr(args, "action", "status"), diff --git a/cortex/config_manager.py b/cortex/config_manager.py index 3353fefb7..b86dff640 100755 --- a/cortex/config_manager.py +++ b/cortex/config_manager.py @@ -40,7 +40,8 @@ class ConfigManager: SOURCE_APT = "apt" SOURCE_PIP = "pip" SOURCE_NPM = "npm" - DEFAULT_SOURCES: ClassVar[list[str]] = [SOURCE_APT, SOURCE_PIP, SOURCE_NPM] + SOURCE_SERVICE = "service" + DEFAULT_SOURCES: ClassVar[list[str]] = [SOURCE_APT, SOURCE_PIP, SOURCE_NPM, SOURCE_SERVICE] def __init__(self, sandbox_executor=None): """ @@ -209,6 +210,59 @@ def detect_npm_packages(self) -> list[dict[str, Any]]: return packages + def detect_services(self) -> list[dict[str, Any]]: + """ + Detect running and enabled systemd services. + + Returns: + List of service dictionaries with name, state, and enabled status + """ + services = [] + + try: + # Get list of all service units + result = subprocess.run( + ["systemctl", "list-units", "--type=service", "--all", "--no-legend", "--no-pager"], + capture_output=True, + text=True, + timeout=self.DETECTION_TIMEOUT, + ) + + if result.returncode == 0: + for line in result.stdout.strip().split("\n"): + parts = line.split() + if len(parts) >= 4: + service_name = parts[0] + # Only track .service units + if not service_name.endswith(".service"): + continue + + active_state = parts[2] # active, inactive, failed + sub_state = parts[3] # running, dead, exited + + # Get enabled status + enabled_result = subprocess.run( + ["systemctl", "is-enabled", service_name], + capture_output=True, + text=True, + timeout=5, + ) + is_enabled = enabled_result.stdout.strip() == "enabled" + + services.append( + { + "name": service_name, + "active_state": active_state, + "sub_state": sub_state, + "enabled": is_enabled, + "source": self.SOURCE_SERVICE, + } + ) + except (subprocess.TimeoutExpired, FileNotFoundError): + pass + + return services + def detect_installed_packages(self, sources: list[str] | None = None) -> list[dict[str, Any]]: """ Detect all installed packages from specified sources. @@ -234,6 +288,9 @@ def detect_installed_packages(self, sources: list[str] | None = None) -> list[di if self.SOURCE_NPM in sources: all_packages.extend(self.detect_npm_packages()) + if self.SOURCE_SERVICE in sources: + all_packages.extend(self.detect_services()) + # Remove duplicates based on name and source (more efficient) unique_packages_dict = {} for pkg in all_packages: @@ -344,7 +401,7 @@ def export_configuration( except Exception as e: config["hardware"] = {"error": f"Failed to detect hardware: {e}"} - # Add packages + # Add packages and services config["packages"] = self.detect_installed_packages(sources=package_sources) # Add preferences if requested @@ -429,6 +486,32 @@ def validate_compatibility(self, config: dict[str, Any]) -> tuple[bool, str | No return True, None + def _categorize_service( + self, service: dict[str, Any], current_service_map: dict[str, dict[str, Any]] + ) -> tuple[str, dict[str, Any] | None]: + """ + Categorize a service as install, start, or already correct. + """ + name = service.get("name") + if not name: + return "skip", None + + if name not in current_service_map: + return "missing", service + + current = current_service_map[name] + + needs_change = False + if service.get("enabled") and not current.get("enabled"): + needs_change = True + if service.get("active_state") == "active" and current.get("active_state") != "active": + needs_change = True + + if needs_change: + return "update", {**service, "current_state": current.get("active_state")} + + return "already_correct", service + def _categorize_package( self, pkg: dict[str, Any], current_pkg_map: dict[tuple[str, str], str] ) -> tuple[str, dict[str, Any] | None]: @@ -486,29 +569,42 @@ def diff_configuration(self, config: dict[str, Any]) -> dict[str, Any]: "packages_to_upgrade": [], "packages_to_downgrade": [], "packages_already_installed": [], + "services_missing": [], + "services_to_update": [], "preferences_changed": {}, "warnings": [], } - # Get current packages - current_packages = self.detect_installed_packages() - current_pkg_map = {(pkg["name"], pkg["source"]): pkg["version"] for pkg in current_packages} + # Get current state + all_current = self.detect_installed_packages() + current_packages = [p for p in all_current if p["source"] != self.SOURCE_SERVICE] + current_services = [p for p in all_current if p["source"] == self.SOURCE_SERVICE] - # Compare packages from config - config_packages = config.get("packages", []) - for pkg in config_packages: - category, pkg_data = self._categorize_package(pkg, current_pkg_map) - - if category == "skip": - diff["warnings"].append(f"Malformed package entry skipped: {pkg}") - elif category == "install": - diff["packages_to_install"].append(pkg_data) - elif category == "upgrade": - diff["packages_to_upgrade"].append(pkg_data) - elif category == "downgrade": - diff["packages_to_downgrade"].append(pkg_data) - elif category == "already_installed": - diff["packages_already_installed"].append(pkg_data) + current_pkg_map = {(pkg["name"], pkg["source"]): pkg["version"] for pkg in current_packages} + current_service_map = {s["name"]: s for s in current_services} + + # Compare packages/services from config + config_items = config.get("packages", []) + for item in config_items: + source = item.get("source") + if source == self.SOURCE_SERVICE: + category, srv_data = self._categorize_service(item, current_service_map) + if category == "missing": + diff["services_missing"].append(srv_data) + elif category == "update": + diff["services_to_update"].append(srv_data) + else: + category, pkg_data = self._categorize_package(item, current_pkg_map) + if category == "skip": + diff["warnings"].append(f"Malformed package entry skipped: {item}") + elif category == "install": + diff["packages_to_install"].append(pkg_data) + elif category == "upgrade": + diff["packages_to_upgrade"].append(pkg_data) + elif category == "downgrade": + diff["packages_to_downgrade"].append(pkg_data) + elif category == "already_installed": + diff["packages_already_installed"].append(pkg_data) # Compare preferences current_prefs = self._load_preferences() @@ -658,12 +754,13 @@ def import_configuration( # Determine what to import if selective is None: - selective = ["packages", "preferences"] + selective = ["packages", "services", "preferences"] summary = { "installed": [], "upgraded": [], "downgraded": [], + "services_updated": [], "failed": [], "skipped": [], "preferences_updated": False, @@ -673,6 +770,10 @@ def import_configuration( if "packages" in selective: self._import_packages(config, summary) + # Import services + if "services" in selective: + self._import_services(config, summary) + # Import preferences if "preferences" in selective: self._import_preferences(config, summary) @@ -730,6 +831,45 @@ def _import_packages(self, config: dict[str, Any], summary: dict[str, Any]) -> N except Exception as e: summary["failed"].append(f"{pkg['name']} ({str(e)})") + def _import_services(self, config: dict[str, Any], summary: dict[str, Any]) -> None: + """ + Import and update service states from configuration. + """ + diff = self.diff_configuration(config) + services_to_process = diff["services_to_update"] + # Note: services_missing are ignored as we don't have unit files to create them + + for srv in services_to_process: + try: + success = self._update_service(srv) + if success: + summary["services_updated"].append(srv["name"]) + else: + summary["failed"].append(f"service:{srv['name']}") + except Exception as e: + summary["failed"].append(f"service:{srv['name']} ({str(e)})") + + def _update_service(self, srv: dict[str, Any]) -> bool: + """ + Update a service state (start/enable). + """ + name = srv["name"] + success = True + + if srv.get("enabled"): + cmd = ["sudo", "systemctl", "enable", name] + res = subprocess.run(cmd, capture_output=True) + if res.returncode != 0: + success = False + + if srv.get("active_state") == "active": + cmd = ["sudo", "systemctl", "start", name] + res = subprocess.run(cmd, capture_output=True) + if res.returncode != 0: + success = False + + return success + def _import_preferences(self, config: dict[str, Any], summary: dict[str, Any]) -> None: """ Import user preferences from configuration and save to disk. diff --git a/cortex/template_manager.py b/cortex/template_manager.py new file mode 100644 index 000000000..9b28b73e3 --- /dev/null +++ b/cortex/template_manager.py @@ -0,0 +1,226 @@ +""" +Template Manager for Cortex Linux +Handles lifecycle of system duplication templates. +""" + +import json +import os +import shutil +import zipfile +from datetime import datetime +from pathlib import Path +from typing import Any, Optional + +import yaml + +from cortex.config_manager import ConfigManager + + +class TemplateManager: + """ + Manages system templates for duplication and deployment. + + Templates are stored in ~/.cortex/templates/ + Structure: + ~/.cortex/templates/ + name/ + v1/ + template.yaml + metadata.json + v2/ + ... + """ + + def __init__(self, template_dir: Path | None = None): + self.base_dir = template_dir or Path.home() / ".cortex" / "templates" + self.base_dir.mkdir(parents=True, exist_ok=True) + self.config_manager = ConfigManager() + + def create_template( + self, name: str, description: str, package_sources: list[str] | None = None + ) -> str: + """ + Create a new system template. + """ + template_dir = self.base_dir / name + template_dir.mkdir(parents=True, exist_ok=True) + + # Get next version + versions = [d.name for d in template_dir.iterdir() if d.is_dir() and d.name.startswith("v")] + next_v = 1 + if versions: + next_v = max([int(v[1:]) for v in versions]) + 1 + + version_name = f"v{next_v}" + version_dir = template_dir / version_name + version_dir.mkdir() + + config_file = version_dir / "template.yaml" + metadata_file = version_dir / "metadata.json" + + # Capture system state + self.config_manager.export_configuration(str(config_file), package_sources=package_sources) + + # Save metadata + metadata = { + "name": name, + "version": version_name, + "description": description, + "created_at": datetime.now().isoformat(), + "cortex_version": self.config_manager.CORTEX_VERSION, + } + with open(version_dir / "metadata.json", "w") as f: + json.dump(metadata, f, indent=2) + + return version_name + + def list_templates(self) -> list[dict[str, Any]]: + """ + List all available templates and their versions. + """ + templates = [] + if not self.base_dir.exists(): + return [] + + for template_dir in self.base_dir.iterdir(): + if not template_dir.is_dir(): + continue + + versions = [] + for v_dir in template_dir.iterdir(): + if not v_dir.is_dir() or not v_dir.name.startswith("v"): + continue + + metadata_file = v_dir / "metadata.json" + if metadata_file.exists(): + with open(metadata_file) as f: + versions.append(json.load(f)) + + if versions: + # Sort versions by version number descending + versions.sort(key=lambda x: int(x["version"][1:]), reverse=True) + templates.append( + { + "name": template_dir.name, + "latest_version": versions[0]["version"], + "versions": versions, + } + ) + + return sorted(templates, key=lambda x: x["name"]) + + def get_template(self, name: str, version: str | None = None) -> dict[str, Any] | None: + """ + Retrieve a specific template version. + """ + template_path = self.base_dir / name + if not template_path.exists(): + return None + + if not version: + # Use latest version + versions = [ + v.name for v in template_path.iterdir() if v.is_dir() and v.name.startswith("v") + ] + if not versions: + return None + version = sorted(versions, key=lambda x: int(x[1:]))[-1] + + version_path = template_path / version + if not version_path.exists(): + return None + + metadata_file = version_path / "metadata.json" + config_file = version_path / "template.yaml" + + if not metadata_file.exists() or not config_file.exists(): + return None + + with open(metadata_file) as f: + data = json.load(f) + + with open(config_file) as f: + data["config"] = yaml.safe_load(f) + + return data + + def delete_template(self, name: str, version: str | None = None) -> bool: + """ + Delete a template or a specific version. + """ + template_path = self.base_dir / name + if not template_path.exists(): + return False + + if version: + version_path = template_path / version + if version_path.exists(): + shutil.rmtree(version_path) + # If no versions left, remove the template folder + if not any(template_path.iterdir()): + shutil.rmtree(template_path) + return True + return False + else: + shutil.rmtree(template_path) + return True + + def export_template(self, name: str, version: str, output_path: str) -> str: + """ + Export a template version as a ZIP file. + """ + template_data = self.base_dir / name / version + if not template_data.exists(): + raise ValueError(f"Template {name}:{version} not found") + + output_path = Path(output_path) + if output_path.suffix != ".zip": + output_path = output_path.with_suffix(".zip") + + with zipfile.ZipFile(output_path, "w", zipfile.ZIP_DEFLATED) as zipf: + for root, _, files in os.walk(template_data): + for file in files: + file_path = Path(root) / file + zipf.write(file_path, file_path.relative_to(template_data)) + + return str(output_path) + + def import_template(self, input_path: str) -> tuple[str, str]: + """ + Import a template from a ZIP file. + """ + input_path = Path(input_path) + if not input_path.exists(): + raise FileNotFoundError(f"File not found: {input_path}") + + # Extract to temporary location to read metadata + import tempfile + + with tempfile.TemporaryDirectory() as tmpdir: + with zipfile.ZipFile(input_path, "r") as zipf: + zipf.extractall(tmpdir) + + metadata_file = Path(tmpdir) / "metadata.json" + if not metadata_file.exists(): + raise ValueError("Invalid template: missing metadata.json") + + with open(metadata_file) as f: + metadata = json.load(f) + + name = metadata["name"] + version = metadata["version"] + + target_path = self.base_dir / name / version + if target_path.exists(): + # Append import suffix if version already exists + import time + + version = f"{version}-import-{int(time.time())}" + target_path = self.base_dir / name / version + + target_path.mkdir(parents=True) + for item in Path(tmpdir).iterdir(): + if item.is_file(): + shutil.copy2(item, target_path / item.name) + + return name, version diff --git a/docs/CLONING_WORKFLOW.md b/docs/CLONING_WORKFLOW.md new file mode 100644 index 000000000..563107063 --- /dev/null +++ b/docs/CLONING_WORKFLOW.md @@ -0,0 +1,102 @@ +# System Cloning and Templating Workflow + +Cortex allows you to capture your entire system state as a reusable template, which can then be used to restore your system or clone it to another machine. This feature replicates your specific environment, server configurations, or project-specific baseline setups with a single command. + +## Core Concepts + +- **Templates**: A snapshot encompassing: + - **Packages**: `apt`, `pip`, and global `npm` packages. + - **Configurations**: User preferences and safe system environment variables. + - **Services**: `systemd` unit states (running/stopped/enabled). +- **VERSIONING**: Automatic version tracking (`v1`, `v2`, etc.) ensures you can rollback to previous "Golden States." +- **Differential Updates**: Cortex calculates the exact difference between your current system and the template, restoring only what is necessary. + +## Step-by-Step Workflow + +### 1. Capture Your System (📸) +Snapshot your current working environment. You can use any name and description that helps you remember what this system state is for. + +```bash +cortex template create --description "" +``` + +**Output Example:** +```text +📸 Capturing system state... + - 1250 packages + - 5 configurations + - 80 services +✓ Template saved: -v1 +``` + +> [!TIP] +> **Performance Optimization**: If you only care about services and want a fast capture, use: +> `cortex template create --sources service` + +### 2. Inspecting Templates +Review what is inside a template before deploying it. + +```bash +# List all templates and their versions +cortex template list + +# See exactly what's inside a specific version +cortex template show -v1 +``` + +### 3. Clone / Restore (🚀) +Replicate the configuration. On the same machine, this acts as "Self-Healing." On a new machine, it acts as a full "Clone." + +```bash +# Dry-run first to see the differential plan +cortex template deploy -v1 --dry-run + +# Perform the actual deployment +cortex template deploy -v1 --to +``` + +**Output:** +```text +🚀 Deploying template... + ✓ Packages installed + ✓ Configurations applied + ✓ Services started +✓ System cloned successfully +``` + +### 4. Sharing Templates +Move your configuration to a new system or share it with a team member. + +1. **On Machine A**: `cortex template export -v1 my_clone.zip` +2. **On Machine B**: `cortex template import my_clone.zip` +3. **Deploy on Machine B**: `cortex template deploy -v1` + +--- + +## Command Reference + +| Command | Description | +|---------|-------------| +| `create ` | Capture current system state as a new version | +| `list` | List all templates and their available versions | +| `show ` | Display counts and description of a template | +| `deploy ` | Restore or clone a template to the current system | +| `export ` | Pack a template version into a portable .zip | +| `import ` | Load a template version from a .zip file | +| `delete ` | Permanently remove a template version | + +## Best Practices + +### The "Self-Healing" Workflow +Regularly create a `baseline` template. If your system starts acting up or a service stops responding, simply run `cortex template deploy baseline` to restore the exact running state without a full reboot or manual debugging. + +### Team Onboarding +1. Create a template with all necessary tools for your specific project. +2. Export and share the ZIP with your team. +3. New members simply `import` and `deploy` to have a matching environment in minutes. + +## Troubleshooting + +- **Sudo Permissions**: Deploying packages or starting services requires `sudo` access. +- **Service Warnings**: If a service fails to restore, check `journalctl -u ` for system-level errors. +- **Version Mismatches**: Use `cortex template list` to verify you are using the correct version suffix (`-v1`). diff --git a/tests/unit/test_config_manager_services.py b/tests/unit/test_config_manager_services.py new file mode 100644 index 000000000..1eb0ec9ee --- /dev/null +++ b/tests/unit/test_config_manager_services.py @@ -0,0 +1,197 @@ +#!/usr/bin/env python3 +""" +Unit tests for service extensions in ConfigManager. +""" + +import os +import sys +import unittest +from pathlib import Path +from unittest.mock import MagicMock, patch + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../..")) + +from cortex.config_manager import ConfigManager + + +class TestConfigManagerServices(unittest.TestCase): + def setUp(self): + self.config_manager = ConfigManager() + + @patch("subprocess.run") + def test_detect_services(self, mock_run): + # Mock list-units + mock_units = MagicMock() + mock_units.returncode = 0 + mock_units.stdout = "test.service loaded active running Test Service\nother.service loaded inactive dead Other Service\n" + + # Mock is-enabled (called twice) + mock_enabled = MagicMock() + mock_enabled.returncode = 0 + mock_enabled.stdout = "enabled\n" + + mock_disabled = MagicMock() + mock_disabled.returncode = 0 + mock_disabled.stdout = "disabled\n" + + mock_run.side_effect = [mock_units, mock_enabled, mock_disabled] + + services = self.config_manager.detect_services() + + self.assertEqual(len(services), 2) + self.assertEqual(services[0]["name"], "test.service") + self.assertEqual(services[0]["active_state"], "active") + self.assertTrue(services[0]["enabled"]) + + self.assertEqual(services[1]["name"], "other.service") + self.assertEqual(services[1]["active_state"], "inactive") + self.assertFalse(services[1]["enabled"]) + + def test_categorize_service(self): + current_map = { + "test.service": {"name": "test.service", "active_state": "active", "enabled": True}, + "stopped.service": { + "name": "stopped.service", + "active_state": "inactive", + "enabled": True, + }, + } + + # Already correct + srv = {"name": "test.service", "active_state": "active", "enabled": True} + cat, data = self.config_manager._categorize_service(srv, current_map) + self.assertEqual(cat, "already_correct") + + # Update needed + srv = {"name": "stopped.service", "active_state": "active", "enabled": True} + cat, data = self.config_manager._categorize_service(srv, current_map) + self.assertEqual(cat, "update") + self.assertEqual(data["current_state"], "inactive") + + # Missing + srv = {"name": "new.service", "active_state": "active", "enabled": True} + cat, data = self.config_manager._categorize_service(srv, current_map) + self.assertEqual(cat, "missing") + + @patch("subprocess.run") + def test_update_service(self, mock_run): + mock_res = MagicMock() + mock_res.returncode = 0 + mock_run.return_value = mock_res + + srv = {"name": "test.service", "active_state": "active", "enabled": True} + success = self.config_manager._update_service(srv) + + self.assertTrue(success) + self.assertEqual(mock_run.call_count, 2) # enable and start + + @patch("subprocess.run") + def test_update_service_failure(self, mock_run): + mock_res = MagicMock() + mock_res.returncode = 1 + mock_run.return_value = mock_res + + srv = {"name": "fail.service", "active_state": "active", "enabled": True} + success = self.config_manager._update_service(srv) + self.assertFalse(success) + + @patch("cortex.config_manager.ConfigManager.detect_services") + @patch("cortex.config_manager.ConfigManager.detect_apt_packages") + @patch("cortex.config_manager.ConfigManager.detect_pip_packages") + @patch("cortex.config_manager.ConfigManager.detect_npm_packages") + @patch("cortex.hwprofiler.HardwareProfiler.profile") + def test_diff_configuration_with_services( + self, mock_profile, mock_npm, mock_pip, mock_apt, mock_srv + ): + mock_profile.return_value = {"os": "24.04"} + mock_apt.return_value = [] + mock_pip.return_value = [] + mock_npm.return_value = [] + mock_srv.return_value = [ + { + "name": "test.service", + "active_state": "active", + "enabled": False, + "source": "service", + } + ] + + template_config = { + "os": "24.04", + "packages": [ + { + "name": "test.service", + "active_state": "active", + "enabled": True, + "source": "service", + }, + { + "name": "new.service", + "active_state": "active", + "enabled": True, + "source": "service", + }, + ], + } + + diff = self.config_manager.diff_configuration(template_config) + + self.assertEqual(len(diff["services_to_update"]), 1) + self.assertEqual(diff["services_to_update"][0]["name"], "test.service") + self.assertEqual(len(diff["services_missing"]), 1) + self.assertEqual(diff["services_missing"][0]["name"], "new.service") + + @patch("cortex.config_manager.ConfigManager.diff_configuration") + @patch("cortex.config_manager.ConfigManager._update_service") + def test_import_services(self, mock_update, mock_diff): + mock_update.return_value = True + mock_diff.return_value = { + "services_missing": [{"name": "s1"}], + "services_to_update": [{"name": "s2"}], + } + + config = {"packages": []} + summary = {"installed": [], "failed": [], "services_updated": [], "services_failed": []} + + self.config_manager._import_services(config, summary) + + # summary has "services_updated" from _import_services + self.assertEqual(len(summary["services_updated"]), 1) # s1 is ignored as per implementation + self.assertEqual(mock_update.call_count, 1) + + @patch("cortex.config_manager.ConfigManager.detect_services") + @patch("cortex.config_manager.ConfigManager.detect_apt_packages") + def test_detect_installed_packages_with_services(self, mock_apt, mock_srv): + mock_apt.return_value = [{"name": "pkg1", "source": "apt"}] + mock_srv.return_value = [{"name": "srv1", "source": "service"}] + + items = self.config_manager.detect_installed_packages(sources=["apt", "service"]) + + self.assertEqual(len(items), 2) + sources = [i["source"] for i in items] + self.assertIn("apt", sources) + self.assertIn("service", sources) + + @patch("cortex.config_manager.ConfigManager.diff_configuration") + @patch("cortex.config_manager.ConfigManager._update_service") + def test_import_services_failure(self, mock_update, mock_diff): + mock_update.return_value = False + mock_diff.return_value = {"services_to_update": [{"name": "fail-srv"}]} + summary = {"installed": [], "failed": [], "services_updated": [], "services_failed": []} + + self.config_manager._import_services({}, summary) + self.assertIn("service:fail-srv", summary["failed"]) + + def test_categorize_service_skip(self): + cat, data = self.config_manager._categorize_service({}, {}) + self.assertEqual(cat, "skip") + + @patch("subprocess.run") + def test_detect_services_error(self, mock_run): + mock_run.side_effect = FileNotFoundError() + services = self.config_manager.detect_services() + self.assertEqual(services, []) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/test_template_manager.py b/tests/unit/test_template_manager.py new file mode 100644 index 000000000..aaae17553 --- /dev/null +++ b/tests/unit/test_template_manager.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python3 +""" +Unit tests for TemplateManager. +""" + +import json +import os +import shutil +import sys +import tempfile +import unittest +import zipfile +from pathlib import Path +from unittest.mock import patch + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../..")) + +from cortex.template_manager import TemplateManager + + +class TestTemplateManager(unittest.TestCase): + def setUp(self): + self.temp_dir = Path(tempfile.mkdtemp()) + self.template_manager = TemplateManager(template_dir=self.temp_dir) + + def tearDown(self): + shutil.rmtree(self.temp_dir, ignore_errors=True) + + @patch("cortex.config_manager.ConfigManager.export_configuration") + def test_create_template(self, mock_export): + mock_export.return_value = "Success" + + v1 = self.template_manager.create_template("test-template", "Description 1") + self.assertEqual(v1, "v1") + + v2 = self.template_manager.create_template("test-template", "Description 2") + self.assertEqual(v2, "v2") + + self.assertTrue((self.temp_dir / "test-template" / "v1" / "metadata.json").exists()) + self.assertTrue((self.temp_dir / "test-template" / "v2" / "metadata.json").exists()) + + @patch("cortex.config_manager.ConfigManager.export_configuration") + def test_list_templates(self, mock_export): + self.template_manager.create_template("t1", "desc1") + self.template_manager.create_template("t2", "desc2") + + templates = self.template_manager.list_templates() + self.assertEqual(len(templates), 2) + self.assertEqual(templates[0]["name"], "t1") + self.assertEqual(templates[1]["name"], "t2") + + @patch("cortex.config_manager.ConfigManager.export_configuration") + def test_get_template(self, mock_export): + # Create a dummy config file so get_template can read it + output_name = "test-t" + v = self.template_manager.create_template(output_name, "desc") + config_path = self.temp_dir / output_name / v / "template.yaml" + with open(config_path, "w") as f: + f.write("test: data") + + t = self.template_manager.get_template(output_name, v) + self.assertIsNotNone(t) + self.assertEqual(t["name"], output_name) + self.assertEqual(t["config"]["test"], "data") + + @patch("cortex.config_manager.ConfigManager.export_configuration") + def test_export_import_template(self, mock_export): + name = "exp-template" + v = self.template_manager.create_template(name, "to export") + config_path = self.temp_dir / name / v / "template.yaml" + with open(config_path, "w") as f: + f.write("foo: bar") + + zip_path = self.temp_dir / "export.zip" + self.template_manager.export_template(name, v, str(zip_path)) + + self.assertTrue(zip_path.exists()) + + # Import as new template + new_name, new_v = self.template_manager.import_template(str(zip_path)) + self.assertEqual(new_name, name) + # It'll be a new version since v1 exists + self.assertIn("v1", new_v) + + imported = self.template_manager.get_template(new_name, new_v) + self.assertEqual(imported["config"]["foo"], "bar") + + @patch("cortex.config_manager.ConfigManager.export_configuration") + def test_get_template_latest(self, mock_export): + name = "latest-test" + self.template_manager.create_template(name, "v1") + self.template_manager.create_template(name, "v2") + + # Create dummy files + for v in ["v1", "v2"]: + p = self.temp_dir / name / v + with open(p / "template.yaml", "w") as f: + f.write("f: b") + + t = self.template_manager.get_template(name) + self.assertEqual(t["version"], "v2") + + def test_get_template_not_found(self): + self.assertIsNone(self.template_manager.get_template("nonexistent")) + + @patch("cortex.config_manager.ConfigManager.export_configuration") + def test_delete_template_and_version(self, mock_export): + name = "del-test" + self.template_manager.create_template(name, "v1") + self.template_manager.create_template(name, "v2") + + # Delete v2 + self.assertTrue(self.template_manager.delete_template(name, "v2")) + self.assertFalse((self.temp_dir / name / "v2").exists()) + self.assertTrue((self.temp_dir / name / "v1").exists()) + + # Delete v1 -> removes whole template dir + self.assertTrue(self.template_manager.delete_template(name, "v1")) + self.assertFalse((self.temp_dir / name).exists()) + + # Delete nonexistent + self.assertFalse(self.template_manager.delete_template("nope")) + + @patch("cortex.config_manager.ConfigManager.export_configuration") + def test_export_errors(self, mock_export): + with self.assertRaises(ValueError): + self.template_manager.export_template("none", "v1", "out.zip") + + def test_import_errors(self): + # File not found + with self.assertRaises(FileNotFoundError): + self.template_manager.import_template("nonexistent.zip") + + # Missing metadata.json + zip_path = self.temp_dir / "invalid.zip" + with zipfile.ZipFile(zip_path, "w") as z: + z.writestr("random.txt", "data") + with self.assertRaises(ValueError): + self.template_manager.import_template(str(zip_path)) + + @patch("cortex.config_manager.ConfigManager.export_configuration") + def test_list_templates_empty(self, mock_export): + shutil.rmtree(self.temp_dir) + self.assertEqual(self.template_manager.list_templates(), []) + + +if __name__ == "__main__": + unittest.main() From a2b1f4953c13556ad655d21f840a8ae2bd02cdfd Mon Sep 17 00:00:00 2001 From: pratyush07-hub Date: Tue, 20 Jan 2026 00:13:10 +0530 Subject: [PATCH 2/4] fix: resolve CI test failures in config_manager --- cortex/config_manager.py | 3 +-- tests/unit/test_config_manager.py | 4 +++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/cortex/config_manager.py b/cortex/config_manager.py index b86dff640..ac08581f3 100755 --- a/cortex/config_manager.py +++ b/cortex/config_manager.py @@ -20,7 +20,6 @@ class ConfigManager: """ Manages configuration export/import for Cortex Linux. - Features: - Export current system state to YAML (packages, configs, preferences) - Import configuration from YAML file @@ -836,7 +835,7 @@ def _import_services(self, config: dict[str, Any], summary: dict[str, Any]) -> N Import and update service states from configuration. """ diff = self.diff_configuration(config) - services_to_process = diff["services_to_update"] + services_to_process = diff.get("services_to_update", []) # Note: services_missing are ignored as we don't have unit files to create them for srv in services_to_process: diff --git a/tests/unit/test_config_manager.py b/tests/unit/test_config_manager.py index 003a66cee..f39228edd 100644 --- a/tests/unit/test_config_manager.py +++ b/tests/unit/test_config_manager.py @@ -118,11 +118,13 @@ def test_detect_npm_packages_failure(self, mock_run): @patch.object(ConfigManager, "detect_apt_packages") @patch.object(ConfigManager, "detect_pip_packages") @patch.object(ConfigManager, "detect_npm_packages") - def test_detect_all_packages(self, mock_npm, mock_pip, mock_apt): + @patch.object(ConfigManager, "detect_services") + def test_detect_all_packages(self, mock_services, mock_npm, mock_pip, mock_apt): """Test detection of all packages from all sources.""" mock_apt.return_value = [{"name": "curl", "version": "7.0.0", "source": "apt"}] mock_pip.return_value = [{"name": "numpy", "version": "1.24.0", "source": "pip"}] mock_npm.return_value = [{"name": "typescript", "version": "5.0.0", "source": "npm"}] + mock_services.return_value = [] packages = self.config_manager.detect_installed_packages() From 85a968a70f6f64490dc85766fe1f851a224452f6 Mon Sep 17 00:00:00 2001 From: pratyush07-hub Date: Tue, 20 Jan 2026 01:14:36 +0530 Subject: [PATCH 3/4] fix: address code review feedback --- cortex/cli.py | 275 ++++++++++++--------- cortex/config_manager.py | 80 ++++-- tests/unit/test_config_manager_services.py | 35 ++- 3 files changed, 219 insertions(+), 171 deletions(-) diff --git a/cortex/cli.py b/cortex/cli.py index f4191570d..1af443b08 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -6,7 +6,7 @@ import uuid from datetime import datetime, timezone from pathlib import Path -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Optional from rich.markdown import Markdown @@ -1974,159 +1974,188 @@ def systemd(self, service: str, action: str = "status", verbose: bool = False): def template(self, args): """Handle template commands""" - from cortex.config_manager import ConfigManager from cortex.template_manager import TemplateManager manager = TemplateManager() cmd = args.template_command if not cmd: - self.console.print( + console.print( "[yellow]Usage: cortex template [create|deploy|list|show|delete|export|import][/yellow]" ) return 1 if cmd == "create": - console.print("📸 [bold]Capturing system state...[/bold]") - version = manager.create_template( - args.name, args.description, package_sources=args.sources - ) + return self._template_create(manager, args) + elif cmd == "list": + return self._template_list(manager) + elif cmd == "show": + return self._template_show(manager, args) + elif cmd == "deploy": + return self._template_deploy(manager, args) + elif cmd == "delete": + return self._template_delete(manager, args) + elif cmd == "export": + return self._template_export(manager, args) + elif cmd == "import": + return self._template_import(manager, args) - # Get counts for the summary - t = manager.get_template(args.name, version) - pkgs = t["config"].get("packages", []) - apps = [p for p in pkgs if p["source"] != "service"] - services = [p for p in pkgs if p["source"] == "service"] + return 0 - console.print(f" - {len(apps)} packages") - console.print( - f" - {len(t['config'].get('preferences', {})) + len(t['config'].get('environment_variables', {}))} configurations" - ) - console.print(f" - {len(services)} services") - console.print(f"[green]✓[/green] Template saved: [bold]{args.name}-{version}[/bold]\n") - return 0 + def _parse_template_spec(self, name_spec: str) -> tuple[str, str | None]: + """Parse template name and version from spec (name:v1 or name-v1).""" + if ":" in name_spec: + parts = name_spec.split(":", 1) + return parts[0], parts[1] + elif "-v" in name_spec: + idx = name_spec.rfind("-v") + return name_spec[:idx], name_spec[idx + 1 :] + return name_spec, None + + def _template_create(self, manager, args) -> int: + """Handle 'template create' command.""" + console.print("📸 [bold]Capturing system state...[/bold]") + version = manager.create_template(args.name, args.description, package_sources=args.sources) + + template_data = manager.get_template(args.name, version) + if not template_data: + return 1 - elif cmd == "list": - templates = manager.list_templates() - if not templates: - console.print("No templates found.") - return 0 + pkgs = template_data["config"].get("packages", []) + apps = [p for p in pkgs if p["source"] != "service"] + services = [p for p in pkgs if p["source"] == "service"] - console.print("\n[bold cyan]Available System Templates:[/bold cyan]\n") - for t in templates: - console.print(f"• [bold]{t['name']}[/bold] (latest: {t['latest_version']})") - for v in t["versions"]: - console.print( - f" - {v['version']} ({v['created_at'][:10]}): {v['description']}" - ) - console.print() + console.print(f" - {len(apps)} packages") + console.print( + f" - {len(template_data['config'].get('preferences', {})) + len(template_data['config'].get('environment_variables', {}))} configurations" + ) + console.print(f" - {len(services)} services") + console.print(f"[green]✓[/green] Template saved: [bold]{args.name}-{version}[/bold]\n") + return 0 + + def _template_list(self, manager) -> int: + """Handle 'template list' command.""" + templates = manager.list_templates() + if not templates: + console.print("No templates found.") return 0 - elif cmd == "show" or cmd == "deploy" or cmd == "delete" or cmd == "export": - # These need a template specification - # Support both name:v1 and name-v1 syntax - if ":" in args.name: - parts = args.name.split(":", 1) - name = parts[0] - version = parts[1] - elif "-v" in args.name: - # Find the last -v - idx = args.name.rfind("-v") - name = args.name[:idx] - version = args.name[idx + 1 :] - else: - name = args.name - version = None + console.print("\n[bold cyan]Available System Templates:[/bold cyan]\n") + for template in templates: + console.print( + f"• [bold]{template['name']}[/bold] (latest: {template['latest_version']})" + ) + for v in template["versions"]: + console.print(f" - {v['version']} ({v['created_at'][:10]}): {v['description']}") + console.print() + return 0 - if cmd == "show": - t = manager.get_template(name, version) - if not t: - console.print(f"[red]Error:[/red] Template '{args.name}' not found.") - return 1 + def _template_show(self, manager, args) -> int: + """Handle 'template show' command.""" + name, version = self._parse_template_spec(args.name) + template_data = manager.get_template(name, version) + if not template_data: + console.print(f"[red]Error:[/red] Template '{args.name}' not found.") + return 1 - console.print(f"\n[bold]{t['name']}:{t['version']}[/bold]") - console.print(f"Description: {t['description']}") - console.print(f"Created: {t['created_at']}") - console.print(f"OS: {t['config'].get('os', 'unknown')}") + console.print(f"\n[bold]{template_data['name']}:{template_data['version']}[/bold]") + console.print(f"Description: {template_data['description']}") + console.print(f"Created: {template_data['created_at']}") + console.print(f"OS: {template_data['config'].get('os', 'unknown')}") - pkgs = t["config"].get("packages", []) - apps = [p for p in pkgs if p["source"] != "service"] - services = [p for p in pkgs if p["source"] == "service"] + pkgs = template_data["config"].get("packages", []) + apps = [p for p in pkgs if p["source"] != "service"] + services = [p for p in pkgs if p["source"] == "service"] - console.print("\n[bold]Summary:[/bold]") - console.print(f"- {len(apps)} Packages") - console.print( - f"- {len(t['config'].get('preferences', {})) + len(t['config'].get('environment_variables', {}))} Configurations" - ) - console.print(f"- {len(services)} Services") - return 0 + console.print("\n[bold]Summary:[/bold]") + console.print(f"- {len(apps)} Packages") + console.print( + f"- {len(template_data['config'].get('preferences', {})) + len(template_data['config'].get('environment_variables', {}))} Configurations" + ) + console.print(f"- {len(services)} Services") + return 0 - elif cmd == "delete": - if manager.delete_template(name, version): - spec = f"{name}:{version}" if version else name - console.print(f"[green]✓[/green] Deleted template: {spec}") - return 0 - console.print(f"[red]Error:[/red] Template '{args.name}' not found.") - return 1 + def _template_deploy(self, manager, args) -> int: + """Handle 'template deploy' command.""" + from cortex.config_manager import ConfigManager - elif cmd == "export": - try: - path = manager.export_template(name, version or "v1", args.file) - console.print(f"[green]✓[/green] Template exported to: {path}") - return 0 - except Exception as e: - console.print(f"[red]Error:[/red] {e}") - return 1 + name, version = self._parse_template_spec(args.name) + template_data = manager.get_template(name, version) + if not template_data: + console.print(f"[red]Error:[/red] Template '{args.name}' not found.") + return 1 - elif cmd == "deploy": - t = manager.get_template(name, version) - if not t: - console.print(f"[red]Error:[/red] Template '{args.name}' not found.") - return 1 + config_manager = ConfigManager() - config_manager = ConfigManager() + if args.dry_run: + console.print( + f"\n[bold cyan]Previewing deployment of {template_data['name']}:{template_data['version']}[/bold cyan]\n" + ) + diff = config_manager.diff_configuration(template_data["config"]) - if args.dry_run: - console.print( - f"\n[bold cyan]Previewing deployment of {t['name']}:{t['version']}[/bold cyan]\n" - ) - diff = config_manager.diff_configuration(t["config"]) + if diff["packages_to_install"]: + console.print( + f"📦 [bold]To Install:[/bold] {len(diff['packages_to_install'])} packages" + ) + if diff["services_to_update"]: + console.print( + f"⚙️ [bold]To Update:[/bold] {len(diff['services_to_update'])} services" + ) - if diff["packages_to_install"]: - console.print( - f"📦 [bold]To Install:[/bold] {len(diff['packages_to_install'])} packages" - ) - if diff["services_to_update"]: - console.print( - f"⚙️ [bold]To Update:[/bold] {len(diff['services_to_update'])} services" - ) + if not diff["packages_to_install"] and not diff["services_to_update"]: + console.print("[green]System already matches template.[/green]") + return 0 - if not diff["packages_to_install"] and not diff["services_to_update"]: - console.print("[green]System already matches template.[/green]") - return 0 + target_label = f" to {args.to}" if hasattr(args, "to") and args.to else "" + console.print(f"🚀 [bold]Deploying template{target_label}...[/bold]") + result = config_manager.import_configuration( + str( + manager.base_dir + / template_data["name"] + / template_data["version"] + / "template.yaml" + ), + force=args.force, + ) - console.print("🚀 [bold]Deploying template...[/bold]") - result = config_manager.import_configuration( - str(manager.base_dir / t["name"] / t["version"] / "template.yaml"), - force=args.force, - ) + if result: + console.print(" [green]✓[/green] Packages installed") + console.print(" [green]✓[/green] Configurations applied") + console.print(" [green]✓[/green] Services started") + console.print(f"[green]✓[/green] System cloned successfully{target_label}\n") + return 0 + return 1 - if result: - console.print(" [green]✓[/green] Packages installed") - console.print(" [green]✓[/green] Configurations applied") - console.print(" [green]✓[/green] Services started") - console.print("[green]✓[/green] System cloned successfully\n") - return 0 - return 1 + def _template_delete(self, manager, args) -> int: + """Handle 'template delete' command.""" + name, version = self._parse_template_spec(args.name) + if manager.delete_template(name, version): + spec = f"{name}:{version}" if version else name + console.print(f"[green]✓[/green] Deleted template: {spec}") + return 0 + console.print(f"[red]Error:[/red] Template '{args.name}' not found.") + return 1 - elif cmd == "import": - try: - name, version = manager.import_template(args.file) - console.print(f"[green]✓[/green] Template imported: [bold]{name}:{version}[/bold]") - return 0 - except Exception as e: - console.print(f"[red]Error:[/red] {e}") - return 1 + def _template_export(self, manager, args) -> int: + """Handle 'template export' command.""" + name, version = self._parse_template_spec(args.name) + try: + path = manager.export_template(name, version or "v1", args.file) + console.print(f"[green]✓[/green] Template exported to: {path}") + return 0 + except ValueError as e: + console.print(f"[red]Error:[/red] {e}") + return 1 + + def _template_import(self, manager, args) -> int: + """Handle 'template import' command.""" + try: + name, version = manager.import_template(args.file) + console.print(f"[green]✓[/green] Template imported: [bold]{name}:{version}[/bold]") + return 0 + except (FileNotFoundError, ValueError) as e: + console.print(f"[red]Error:[/red] {e}") + return 1 return 0 diff --git a/cortex/config_manager.py b/cortex/config_manager.py index ac08581f3..ec9502076 100755 --- a/cortex/config_manager.py +++ b/cortex/config_manager.py @@ -219,16 +219,50 @@ def detect_services(self) -> list[dict[str, Any]]: services = [] try: - # Get list of all service units - result = subprocess.run( - ["systemctl", "list-units", "--type=service", "--all", "--no-legend", "--no-pager"], + # 1. Get enabled status for all services (bulk) + enabled_map = {} + enabled_result = subprocess.run( + [ + "systemctl", + "list-unit-files", + "--type=service", + "--all", + "--no-legend", + "--no-pager", + ], + capture_output=True, + text=True, + timeout=self.DETECTION_TIMEOUT, + ) + if enabled_result.returncode == 0: + for line in enabled_result.stdout.strip().split("\n"): + if not line.strip(): + continue + parts = line.split() + if len(parts) >= 2: + unit = parts[0] + state = parts[1] + enabled_map[unit] = state == "enabled" + + # 2. Get active/sub state for all services (bulk) + units_result = subprocess.run( + [ + "systemctl", + "list-units", + "--type=service", + "--all", + "--no-legend", + "--no-pager", + ], capture_output=True, text=True, timeout=self.DETECTION_TIMEOUT, ) - if result.returncode == 0: - for line in result.stdout.strip().split("\n"): + if units_result.returncode == 0: + for line in units_result.stdout.strip().split("\n"): + if not line.strip(): + continue parts = line.split() if len(parts) >= 4: service_name = parts[0] @@ -239,21 +273,12 @@ def detect_services(self) -> list[dict[str, Any]]: active_state = parts[2] # active, inactive, failed sub_state = parts[3] # running, dead, exited - # Get enabled status - enabled_result = subprocess.run( - ["systemctl", "is-enabled", service_name], - capture_output=True, - text=True, - timeout=5, - ) - is_enabled = enabled_result.stdout.strip() == "enabled" - services.append( { "name": service_name, "active_state": active_state, "sub_state": sub_state, - "enabled": is_enabled, + "enabled": enabled_map.get(service_name, False), "source": self.SOURCE_SERVICE, } ) @@ -765,13 +790,16 @@ def import_configuration( "preferences_updated": False, } + # Compute diff once for efficiency + diff = self.diff_configuration(config) + # Import packages if "packages" in selective: - self._import_packages(config, summary) + self._import_packages(diff, summary) # Import services if "services" in selective: - self._import_services(config, summary) + self._import_services(diff, summary) # Import preferences if "preferences" in selective: @@ -779,22 +807,22 @@ def import_configuration( return summary - def _import_packages(self, config: dict[str, Any], summary: dict[str, Any]) -> None: + def _import_packages(self, diff: dict[str, Any], summary: dict[str, Any]) -> None: """ Import packages from configuration and update system state. - This method processes package installations by first computing the - difference between the current system state and the target configuration - using diff_configuration(). It then attempts to install, upgrade, or - downgrade packages as needed. + This method processes package installations based on the pre-computed + diff result. It attempts to install, upgrade, or downgrade packages + as needed. The method continues processing all packages even if individual packages fail to install, ensuring maximum success. Failed installations are tracked in the summary for user review. Args: - config: Configuration dictionary containing package specifications - Expected to have 'packages' key with list of package dicts + diff: Pre-computed diff dictionary from diff_configuration() + Expected to have keys: 'packages_to_install', 'packages_to_upgrade', + 'packages_to_downgrade' summary: Summary dictionary to update with results. Modified in-place with keys: 'installed', 'upgraded', 'failed' @@ -808,7 +836,6 @@ def _import_packages(self, config: dict[str, Any], summary: dict[str, Any]) -> N Each package is categorized based on diff results (install vs upgrade). Errors are caught and logged to allow processing to continue. """ - diff = self.diff_configuration(config) packages_to_process = ( diff["packages_to_install"] + diff["packages_to_upgrade"] @@ -830,11 +857,10 @@ def _import_packages(self, config: dict[str, Any], summary: dict[str, Any]) -> N except Exception as e: summary["failed"].append(f"{pkg['name']} ({str(e)})") - def _import_services(self, config: dict[str, Any], summary: dict[str, Any]) -> None: + def _import_services(self, diff: dict[str, Any], summary: dict[str, Any]) -> None: """ Import and update service states from configuration. """ - diff = self.diff_configuration(config) services_to_process = diff.get("services_to_update", []) # Note: services_missing are ignored as we don't have unit files to create them diff --git a/tests/unit/test_config_manager_services.py b/tests/unit/test_config_manager_services.py index 1eb0ec9ee..2e1b6d06d 100644 --- a/tests/unit/test_config_manager_services.py +++ b/tests/unit/test_config_manager_services.py @@ -20,21 +20,17 @@ def setUp(self): @patch("subprocess.run") def test_detect_services(self, mock_run): + # Mock list-unit-files + mock_unit_files = MagicMock() + mock_unit_files.returncode = 0 + mock_unit_files.stdout = "test.service enabled\nother.service disabled\n" + # Mock list-units mock_units = MagicMock() mock_units.returncode = 0 mock_units.stdout = "test.service loaded active running Test Service\nother.service loaded inactive dead Other Service\n" - # Mock is-enabled (called twice) - mock_enabled = MagicMock() - mock_enabled.returncode = 0 - mock_enabled.stdout = "enabled\n" - - mock_disabled = MagicMock() - mock_disabled.returncode = 0 - mock_disabled.stdout = "disabled\n" - - mock_run.side_effect = [mock_units, mock_enabled, mock_disabled] + mock_run.side_effect = [mock_unit_files, mock_units] services = self.config_manager.detect_services() @@ -141,19 +137,17 @@ def test_diff_configuration_with_services( self.assertEqual(len(diff["services_missing"]), 1) self.assertEqual(diff["services_missing"][0]["name"], "new.service") - @patch("cortex.config_manager.ConfigManager.diff_configuration") @patch("cortex.config_manager.ConfigManager._update_service") - def test_import_services(self, mock_update, mock_diff): + def test_import_services(self, mock_update): mock_update.return_value = True - mock_diff.return_value = { + diff = { "services_missing": [{"name": "s1"}], "services_to_update": [{"name": "s2"}], } - config = {"packages": []} - summary = {"installed": [], "failed": [], "services_updated": [], "services_failed": []} + summary = {"installed": [], "failed": [], "services_updated": []} - self.config_manager._import_services(config, summary) + self.config_manager._import_services(diff, summary) # summary has "services_updated" from _import_services self.assertEqual(len(summary["services_updated"]), 1) # s1 is ignored as per implementation @@ -172,14 +166,13 @@ def test_detect_installed_packages_with_services(self, mock_apt, mock_srv): self.assertIn("apt", sources) self.assertIn("service", sources) - @patch("cortex.config_manager.ConfigManager.diff_configuration") @patch("cortex.config_manager.ConfigManager._update_service") - def test_import_services_failure(self, mock_update, mock_diff): + def test_import_services_failure(self, mock_update): mock_update.return_value = False - mock_diff.return_value = {"services_to_update": [{"name": "fail-srv"}]} - summary = {"installed": [], "failed": [], "services_updated": [], "services_failed": []} + diff = {"services_to_update": [{"name": "fail-srv"}]} + summary = {"installed": [], "failed": [], "services_updated": []} - self.config_manager._import_services({}, summary) + self.config_manager._import_services(diff, summary) self.assertIn("service:fail-srv", summary["failed"]) def test_categorize_service_skip(self): From 1604ad8e6ced9cc185b450daaa0419aa1d950927 Mon Sep 17 00:00:00 2001 From: pratyush07-hub Date: Tue, 20 Jan 2026 01:38:03 +0530 Subject: [PATCH 4/4] feat: implement security hardening, safety defaults, and typing for template system --- README.md | 2 +- cortex/cli.py | 28 +++++++++------ cortex/config_manager.py | 25 +++++++------ cortex/template_manager.py | 74 ++++++++++++++++++++++++++++++-------- 4 files changed, 92 insertions(+), 37 deletions(-) diff --git a/README.md b/README.md index c04be030e..913d3cecd 100644 --- a/README.md +++ b/README.md @@ -194,7 +194,7 @@ cortex role set | `cortex sandbox ` | Test packages in Docker sandbox | | `cortex history` | View all past installations | | `cortex rollback ` | Undo a specific installation | -| `cortex template ` | System cloning and templating (create, deploy, list, show) | +| `cortex template ` | System cloning and templating (create, deploy, list, show, delete, export, import) | | `cortex --version` | Show version information | | `cortex --help` | Display help message | diff --git a/cortex/cli.py b/cortex/cli.py index 1af443b08..4b1873d7d 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -1972,7 +1972,7 @@ def systemd(self, service: str, action: str = "status", verbose: bool = False): return run_systemd_helper(service, action, verbose) - def template(self, args): + def template(self, args: argparse.Namespace) -> int: """Handle template commands""" from cortex.template_manager import TemplateManager @@ -2012,7 +2012,7 @@ def _parse_template_spec(self, name_spec: str) -> tuple[str, str | None]: return name_spec[:idx], name_spec[idx + 1 :] return name_spec, None - def _template_create(self, manager, args) -> int: + def _template_create(self, manager: Any, args: argparse.Namespace) -> int: """Handle 'template create' command.""" console.print("📸 [bold]Capturing system state...[/bold]") version = manager.create_template(args.name, args.description, package_sources=args.sources) @@ -2033,7 +2033,7 @@ def _template_create(self, manager, args) -> int: console.print(f"[green]✓[/green] Template saved: [bold]{args.name}-{version}[/bold]\n") return 0 - def _template_list(self, manager) -> int: + def _template_list(self, manager: Any) -> int: """Handle 'template list' command.""" templates = manager.list_templates() if not templates: @@ -2050,7 +2050,7 @@ def _template_list(self, manager) -> int: console.print() return 0 - def _template_show(self, manager, args) -> int: + def _template_show(self, manager: Any, args: argparse.Namespace) -> int: """Handle 'template show' command.""" name, version = self._parse_template_spec(args.name) template_data = manager.get_template(name, version) @@ -2075,7 +2075,7 @@ def _template_show(self, manager, args) -> int: console.print(f"- {len(services)} Services") return 0 - def _template_deploy(self, manager, args) -> int: + def _template_deploy(self, manager: Any, args: argparse.Namespace) -> int: """Handle 'template deploy' command.""" from cortex.config_manager import ConfigManager @@ -2087,7 +2087,10 @@ def _template_deploy(self, manager, args) -> int: config_manager = ConfigManager() - if args.dry_run: + # Safety default: deploy is dry-run unless --execute is passed + is_dry_run = args.dry_run or not args.execute + + if is_dry_run: console.print( f"\n[bold cyan]Previewing deployment of {template_data['name']}:{template_data['version']}[/bold cyan]\n" ) @@ -2126,7 +2129,7 @@ def _template_deploy(self, manager, args) -> int: return 0 return 1 - def _template_delete(self, manager, args) -> int: + def _template_delete(self, manager: Any, args: argparse.Namespace) -> int: """Handle 'template delete' command.""" name, version = self._parse_template_spec(args.name) if manager.delete_template(name, version): @@ -2136,7 +2139,7 @@ def _template_delete(self, manager, args) -> int: console.print(f"[red]Error:[/red] Template '{args.name}' not found.") return 1 - def _template_export(self, manager, args) -> int: + def _template_export(self, manager: Any, args: argparse.Namespace) -> int: """Handle 'template export' command.""" name, version = self._parse_template_spec(args.name) try: @@ -2147,7 +2150,7 @@ def _template_export(self, manager, args) -> int: console.print(f"[red]Error:[/red] {e}") return 1 - def _template_import(self, manager, args) -> int: + def _template_import(self, manager: Any, args: argparse.Namespace) -> int: """Handle 'template import' command.""" try: name, version = manager.import_template(args.file) @@ -3473,7 +3476,12 @@ def main(): # Deploy deploy_p = template_subparsers.add_parser("deploy", help="Deploy template") deploy_p.add_argument("name", help="Template name[:version]") - deploy_p.add_argument("--dry-run", action="store_true", help="Preview changes") + deploy_p.add_argument( + "--dry-run", action="store_true", help="Preview changes (default behavior)" + ) + deploy_p.add_argument( + "--execute", action="store_true", help="Apply changes; defaults to dry-run" + ) deploy_p.add_argument("--force", action="store_true", help="Skip compatibility checks") deploy_p.add_argument("--to", help="Target system (e.g., server-02)") diff --git a/cortex/config_manager.py b/cortex/config_manager.py index ec9502076..b9c99cd9a 100755 --- a/cortex/config_manager.py +++ b/cortex/config_manager.py @@ -881,17 +881,20 @@ def _update_service(self, srv: dict[str, Any]) -> bool: name = srv["name"] success = True - if srv.get("enabled"): - cmd = ["sudo", "systemctl", "enable", name] - res = subprocess.run(cmd, capture_output=True) - if res.returncode != 0: - success = False - - if srv.get("active_state") == "active": - cmd = ["sudo", "systemctl", "start", name] - res = subprocess.run(cmd, capture_output=True) - if res.returncode != 0: - success = False + try: + if srv.get("enabled"): + cmd = ["sudo", "systemctl", "enable", name] + res = subprocess.run(cmd, capture_output=True, timeout=self.DETECTION_TIMEOUT) + if res.returncode != 0: + success = False + + if srv.get("active_state") == "active": + cmd = ["sudo", "systemctl", "start", name] + res = subprocess.run(cmd, capture_output=True, timeout=self.DETECTION_TIMEOUT) + if res.returncode != 0: + success = False + except (subprocess.TimeoutExpired, subprocess.SubprocessError): + success = False return success diff --git a/cortex/template_manager.py b/cortex/template_manager.py index 9b28b73e3..6e9734443 100644 --- a/cortex/template_manager.py +++ b/cortex/template_manager.py @@ -5,6 +5,7 @@ import json import os +import re import shutil import zipfile from datetime import datetime @@ -46,10 +47,16 @@ def create_template( template_dir.mkdir(parents=True, exist_ok=True) # Get next version - versions = [d.name for d in template_dir.iterdir() if d.is_dir() and d.name.startswith("v")] - next_v = 1 - if versions: - next_v = max([int(v[1:]) for v in versions]) + 1 + def get_v_num(v_str): + match = re.search(r"v(\d+)", v_str) + return int(match.group(1)) if match else 0 + + versions = [ + get_v_num(d.name) + for d in template_dir.iterdir() + if d.is_dir() and d.name.startswith("v") + ] + next_v = max(versions) + 1 if versions else 1 version_name = f"v{next_v}" version_dir = template_dir / version_name @@ -98,7 +105,11 @@ def list_templates(self) -> list[dict[str, Any]]: if versions: # Sort versions by version number descending - versions.sort(key=lambda x: int(x["version"][1:]), reverse=True) + def get_v_num(v_str): + match = re.search(r"v(\d+)", v_str) + return int(match.group(1)) if match else 0 + + versions.sort(key=lambda x: get_v_num(x["version"]), reverse=True) templates.append( { "name": template_dir.name, @@ -119,12 +130,16 @@ def get_template(self, name: str, version: str | None = None) -> dict[str, Any] if not version: # Use latest version + def get_v_num(v_str): + match = re.search(r"v(\d+)", v_str) + return int(match.group(1)) if match else 0 + versions = [ v.name for v in template_path.iterdir() if v.is_dir() and v.name.startswith("v") ] if not versions: return None - version = sorted(versions, key=lambda x: int(x[1:]))[-1] + version = sorted(versions, key=get_v_num)[-1] version_path = template_path / version if not version_path.exists(): @@ -189,7 +204,7 @@ def import_template(self, input_path: str) -> tuple[str, str]: """ Import a template from a ZIP file. """ - input_path = Path(input_path) + input_path = Path(input_path).resolve() if not input_path.exists(): raise FileNotFoundError(f"File not found: {input_path}") @@ -197,20 +212,49 @@ def import_template(self, input_path: str) -> tuple[str, str]: import tempfile with tempfile.TemporaryDirectory() as tmpdir: + tmpdir_path = Path(tmpdir).resolve() with zipfile.ZipFile(input_path, "r") as zipf: - zipf.extractall(tmpdir) - - metadata_file = Path(tmpdir) / "metadata.json" + for member in zipf.infolist(): + # Zip Slip protection: check for absolute paths or ".." + member_path = Path(member.filename) + if member_path.is_absolute() or ".." in member.filename: + continue # Skip unsafe members + + target = (tmpdir_path / member_path).resolve() + if not target.is_relative_to(tmpdir_path): + continue # Skip members outside tmpdir + + if member.is_dir(): + target.mkdir(parents=True, exist_ok=True) + else: + target.parent.mkdir(parents=True, exist_ok=True) + with zipf.open(member) as source, open(target, "wb") as dest: + shutil.copyfileobj(source, dest) + + metadata_file = tmpdir_path / "metadata.json" if not metadata_file.exists(): raise ValueError("Invalid template: missing metadata.json") with open(metadata_file) as f: metadata = json.load(f) - name = metadata["name"] - version = metadata["version"] + raw_name = metadata.get("name", "") + raw_version = metadata.get("version", "") + + # Sanitize name and version to prevent path traversal + if not re.match(r"^[a-zA-Z0-9._-]+$", raw_name) or not re.match( + r"^[a-zA-Z0-9._-]+$", raw_version + ): + raise ValueError("Invalid template: malformed name or version") + + name = raw_name + version = raw_version + + # Ensure target path is strictly within base_dir + target_path = (self.base_dir / name / version).resolve() + if not target_path.is_relative_to(self.base_dir.resolve()): + raise ValueError("Invalid template: target path outside templates directory") - target_path = self.base_dir / name / version if target_path.exists(): # Append import suffix if version already exists import time @@ -218,8 +262,8 @@ def import_template(self, input_path: str) -> tuple[str, str]: version = f"{version}-import-{int(time.time())}" target_path = self.base_dir / name / version - target_path.mkdir(parents=True) - for item in Path(tmpdir).iterdir(): + target_path.mkdir(parents=True, exist_ok=True) + for item in tmpdir_path.iterdir(): if item.is_file(): shutil.copy2(item, target_path / item.name)