diff --git a/scanner_py/README.md b/scanner_py/README.md index 6a8db18..943aeac 100644 --- a/scanner_py/README.md +++ b/scanner_py/README.md @@ -222,7 +222,7 @@ Scanning images [████████████████░░░░] 8 ``` - `✓` = Successful scans -- `✗` = Failed scans +- `✗` = Failed scans - `🚫` = Skipped scans (unsigned images) ## Project Structure @@ -275,4 +275,3 @@ This Python package is a port of the original bash scripts: ## License MIT License - see LICENSE file for details. - diff --git a/scanner_py/__init__.py b/scanner_py/__init__.py index b91200e..7767208 100644 --- a/scanner_py/__init__.py +++ b/scanner_py/__init__.py @@ -23,4 +23,3 @@ "AttestationExtractor", "ChainguardVerifier", ] - diff --git a/scanner_py/__main__.py b/scanner_py/__main__.py index c7b06fa..674c291 100644 --- a/scanner_py/__main__.py +++ b/scanner_py/__main__.py @@ -20,4 +20,3 @@ def main() -> int: if __name__ == "__main__": sys.exit(main()) - diff --git a/scanner_py/cli/__init__.py b/scanner_py/cli/__init__.py index ec99d5e..e563ea0 100644 --- a/scanner_py/cli/__init__.py +++ b/scanner_py/cli/__init__.py @@ -111,4 +111,3 @@ def main(argv: Optional[List[str]] = None) -> int: __all__ = ["main", "create_main_parser"] - diff --git a/scanner_py/cli/cosign_scan.py b/scanner_py/cli/cosign_scan.py index 14e7ad9..94a605a 100644 --- a/scanner_py/cli/cosign_scan.py +++ b/scanner_py/cli/cosign_scan.py @@ -186,9 +186,9 @@ def run_scan_image(args: argparse.Namespace) -> int: if not args.dry_run: spinner = Spinner("Verifying image signature...") spinner.spin() - + result = scanner.scan(args.image, dry_run=args.dry_run) - + if result.success: spinner.finish("Scan completed", success=True) elif result.skipped: diff --git a/scanner_py/cli/extract.py b/scanner_py/cli/extract.py index 5f33bdd..33f38bc 100644 --- a/scanner_py/cli/extract.py +++ b/scanner_py/cli/extract.py @@ -153,13 +153,13 @@ def run_extract(args: argparse.Namespace) -> int: if args.list: spinner = Spinner("Discovering attestations...") spinner.spin() - + attestations = extractor.list_attestations(args.image) - + if not attestations.attestations: spinner.finish("No attestations found", success=False) return 1 - + spinner.finish(f"Found {len(attestations.attestations)} attestation type(s)") print() print("━" * 50) @@ -168,7 +168,7 @@ def run_extract(args: argparse.Namespace) -> int: print() print(f" {'Count':>5} {'Type':<12} URI") print(" " + "─" * 45) - + for pred_type, count in sorted(attestations.attestations.items()): # Get friendly name friendly_name = "unknown" @@ -212,7 +212,7 @@ def run_extract(args: argparse.Namespace) -> int: if content is None: spinner.finish("Extraction failed", success=False) - + if is_verbose(): print() print("ℹ️ Available attestations for this image:") diff --git a/scanner_py/cli/generate_report.py b/scanner_py/cli/generate_report.py index 7134aa2..ec19e31 100644 --- a/scanner_py/cli/generate_report.py +++ b/scanner_py/cli/generate_report.py @@ -87,7 +87,7 @@ def create_generate_report_parser(subparsers: Any) -> argparse.ArgumentParser: default="HIGH", help="Minimum CVE level to consider relevant (default: HIGH)", ) - + # Filter options parser.add_argument( "--filter-unaddressed", "-u", @@ -115,7 +115,7 @@ def load_summary_from_dir(input_dir: str) -> Optional[Dict[str, Any]]: if not summary_path.exists(): print(f"❌ Summary file not found: {summary_path}", file=sys.stderr) return None - + try: with open(summary_path) as f: return json.load(f) @@ -130,7 +130,7 @@ def load_summary_from_file(filepath: str) -> Optional[Dict[str, Any]]: if not path.exists(): print(f"❌ File not found: {filepath}", file=sys.stderr) return None - + try: with open(path) as f: return json.load(f) @@ -143,7 +143,7 @@ def merge_summaries(summaries: List[Dict[str, Any]]) -> Dict[str, Any]: """Merge multiple scan summaries into one combined summary.""" if len(summaries) == 1: return summaries[0] - + # Combine the summaries combined = { "scan_summary": { @@ -163,7 +163,7 @@ def merge_summaries(summaries: List[Dict[str, Any]]) -> Dict[str, Any]: "skipped_scans": [], "cve_analysis": [], } - + namespaces = [] for summary in summaries: ss = summary.get("scan_summary", {}) @@ -174,14 +174,14 @@ def merge_summaries(summaries: List[Dict[str, Any]]) -> Dict[str, Any]: combined["scan_summary"]["failed_scans"] += ss.get("failed_scans", 0) combined["scan_summary"]["skipped_scans"] += ss.get("skipped_scans", 0) namespaces.append(ss.get("namespace", "unknown")) - + combined["successful_scans"].extend(summary.get("successful_scans", [])) combined["failed_scans"].extend(summary.get("failed_scans", [])) combined["skipped_scans"].extend(summary.get("skipped_scans", [])) combined["cve_analysis"].extend(summary.get("cve_analysis", [])) - + combined["scan_summary"]["namespace"] = ", ".join(set(namespaces)) - + return combined @@ -194,17 +194,17 @@ def generate_markdown_report( ) -> str: """ Generate CVE analysis summary in Markdown format. - + This produces output compatible with oras-scan's format for easy combination in CI pipelines. - + Args: summary_data: Scan summary dictionary min_cve_level: Minimum CVE level considered relevant title: Custom report title filter_unaddressed: Only show images with unaddressed CVEs filter_missing_triage: Only show images with missing triage files - + Returns: Markdown formatted string """ @@ -213,14 +213,14 @@ def generate_markdown_report( cve_analysis = summary_data.get("cve_analysis", []) failed_scans = summary_data.get("failed_scans", []) skipped_scans = summary_data.get("skipped_scans", []) - + # Header report_title = title or "🔍 CVE Analysis Summary" lines.append(f"# {report_title}") lines.append("") lines.append(f"Generated on: {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')} UTC") lines.append("") - + # Scan info lines.append("## 📊 Scan Information") lines.append("") @@ -234,25 +234,25 @@ def generate_markdown_report( lines.append(f"| **Skipped (unsigned)** | 🚫 {ss.get('skipped_scans', 0)} |") lines.append(f"| **Minimum CVE Level** | `{min_cve_level}` |") lines.append("") - + # CVE Analysis table if cve_analysis: lines.append("## 🛡️ CVE Analysis by Image") lines.append("") lines.append(f"> Minimum CVE Level: **{min_cve_level}** (levels below this are considered irrelevant)") lines.append("") - + # Table header lines.append("| Image | Unaddressed CVEs | Addressed CVEs | Irrelevant CVEs | Triage | Triage File | Chainguard |") lines.append("|-------|:----------------:|:--------------:|:---------------:|:-----------:|:-----------:|:----------:|") - + total_unaddressed = 0 total_addressed = 0 total_irrelevant = 0 images_with_triage = 0 images_with_chainguard = 0 displayed_count = 0 - + for analysis in cve_analysis: # Get CVE counts critical = analysis.get("critical", 0) @@ -260,17 +260,17 @@ def generate_markdown_report( medium = analysis.get("medium", 0) low = analysis.get("low", 0) triaged = analysis.get("triaged", 0) - + # Calculate categories based on min_cve_level=HIGH unaddressed = critical + high addressed = triaged irrelevant = medium + low - + # Get status has_triage = triaged > 0 # CVEs were actually triaged/addressed has_triage_file = analysis.get("triage_file") is not None # Triage file exists is_chainguard = analysis.get("is_chainguard", False) - + # Accumulate totals total_unaddressed += unaddressed total_addressed += addressed @@ -279,34 +279,34 @@ def generate_markdown_report( images_with_triage += 1 if is_chainguard: images_with_chainguard += 1 - + # Apply filters show_entry = True if filter_unaddressed and unaddressed == 0: show_entry = False if filter_missing_triage and has_triage_file: show_entry = False - + if not show_entry and (filter_unaddressed or filter_missing_triage): continue - + displayed_count += 1 - + # Format image name image_short = analysis["image"].split("/")[-1] if len(image_short) > 40: image_short = image_short[:37] + "..." - + # Format cells unaddr_str = f"✅ {unaddressed}" if unaddressed == 0 else f"🔴 **{unaddressed}**" triage_str = "✅ Present" if has_triage else "❌ Missing" triage_file_str = "✅" if has_triage_file else "❌" chainguard_str = "✅" if is_chainguard else "❌" - + lines.append(f"| `{image_short}` | {unaddr_str} | {addressed} | {irrelevant} | {triage_str} | {triage_file_str} | {chainguard_str} |") - + lines.append("") - + # Statistics lines.append("## 📈 Statistics") lines.append("") @@ -320,7 +320,7 @@ def generate_markdown_report( lines.append(f"| **Images with Triage** | {images_with_triage}/{len(cve_analysis)} |") lines.append(f"| **Images with Chainguard Base** | {images_with_chainguard}/{len(cve_analysis)} |") lines.append("") - + # Filter information if filter_unaddressed and filter_missing_triage: lines.append("**Filter applied:** Showing images with unaddressed CVEs OR missing triage files") @@ -331,14 +331,14 @@ def generate_markdown_report( else: lines.append("**Filter applied:** Showing all images") lines.append("") - + # Result badge if total_unaddressed == 0: lines.append("> 🎉 **All relevant CVEs have been addressed!**") else: lines.append(f"> ⚠️ **{total_unaddressed} unaddressed CVEs need attention**") lines.append("") - + # Failed scans section if failed_scans: lines.append("## ❌ Failed Scans") @@ -355,7 +355,7 @@ def generate_markdown_report( lines.append("") lines.append("") lines.append("") - + # Skipped scans section if skipped_scans: lines.append("## 🚫 Skipped Scans (Unsigned Images)") @@ -368,11 +368,11 @@ def generate_markdown_report( lines.append("") lines.append("") lines.append("") - + # Footer lines.append("---") lines.append(f"*Generated by scanner-py*") - + return "\n".join(lines) @@ -385,7 +385,7 @@ def print_cli_summary( """Print scan summary to CLI.""" ss = summary_data.get("scan_summary", {}) cve_analysis = summary_data.get("cve_analysis", []) - + print("━" * 120) print("📊 SCAN SUMMARY") print("━" * 120) @@ -398,14 +398,14 @@ def print_cli_summary( print(f" ❌ Failed: {ss.get('failed_scans', 0)}") print(f" 🚫 Skipped: {ss.get('skipped_scans', 0)} (unsigned)") print() - + if cve_analysis: print("━" * 120) print("🔍 CVE ANALYSIS SUMMARY") print("=" * 120) print(f"Minimum CVE Level: {min_cve_level} (levels below this are considered irrelevant)") print() - + # Print table header header = ( f"{'Image':<35} " @@ -417,34 +417,34 @@ def print_cli_summary( f"{'Chainguard Base':>16}" ) print(header) - + total_unaddressed = 0 total_addressed = 0 total_irrelevant = 0 images_with_triage = 0 images_with_chainguard = 0 - + for analysis in cve_analysis: critical = analysis.get("critical", 0) high = analysis.get("high", 0) medium = analysis.get("medium", 0) low = analysis.get("low", 0) triaged = analysis.get("triaged", 0) - + unaddressed = critical + high addressed = triaged irrelevant = medium + low - + has_triage = triaged > 0 # CVEs were actually triaged/addressed has_triage_file = analysis.get("triage_file") is not None # Triage file exists is_chainguard = analysis.get("is_chainguard", False) - + # Apply filters if filter_unaddressed and unaddressed == 0: continue if filter_missing_triage and has_triage_file: continue - + total_unaddressed += unaddressed total_addressed += addressed total_irrelevant += irrelevant @@ -452,13 +452,13 @@ def print_cli_summary( images_with_triage += 1 if is_chainguard: images_with_chainguard += 1 - + image_short = analysis["image"].split("/")[-1][:33] unaddr_icon = "✅" if unaddressed == 0 else "🔴" triage_str = "✅ Yes" if has_triage else "❌ No" triage_file_str = "✅ Yes" if has_triage_file else "❌ No" chainguard_str = "✅ Yes" if is_chainguard else "❌ No" - + row = ( f"{image_short:<35} " f"{unaddr_icon} {unaddressed:<14} " @@ -469,7 +469,7 @@ def print_cli_summary( f"{chainguard_str:>16}" ) print(row) - + print() print("━" * 120) print("📈 STATISTICS") @@ -480,7 +480,7 @@ def print_cli_summary( print(f"Images with triage: {images_with_triage}/{len(cve_analysis)}") print(f"Images with Chainguard base: {images_with_chainguard}/{len(cve_analysis)}") print() - + if total_unaddressed == 0: print("🎉 All relevant CVEs have been addressed!") else: @@ -490,20 +490,20 @@ def print_cli_summary( def run_generate_report(args: argparse.Namespace) -> int: """ Run report generation from existing scan results. - + Args: args: Parsed command line arguments - + Returns: Exit code """ # Setup logging log_level = LogLevel.VERBOSE if args.verbose else LogLevel.INFO setup_logging(log_level) - + # Load summaries summaries = [] - + if args.input_dir: summary = load_summary_from_dir(args.input_dir) if summary is None: @@ -515,14 +515,14 @@ def run_generate_report(args: argparse.Namespace) -> int: if summary is None: return 1 summaries.append(summary) - + if not summaries: print("❌ No scan summaries loaded", file=sys.stderr) return 1 - + # Merge if multiple combined = merge_summaries(summaries) - + # Generate output if args.output_format == "json": output_content = json.dumps(combined, indent=2) @@ -534,7 +534,7 @@ def run_generate_report(args: argparse.Namespace) -> int: filter_unaddressed=args.filter_unaddressed, filter_missing_triage=args.filter_missing_triage, ) - + # Print CLI summary print() print_cli_summary( @@ -544,12 +544,12 @@ def run_generate_report(args: argparse.Namespace) -> int: filter_missing_triage=args.filter_missing_triage, ) print() - + # Write output if args.output: output_path = Path(args.output) output_path.parent.mkdir(parents=True, exist_ok=True) - + if args.append and output_path.exists(): with open(output_path, "a") as f: f.write("\n\n---\n\n") @@ -566,6 +566,5 @@ def run_generate_report(args: argparse.Namespace) -> int: print("MARKDOWN OUTPUT") print("=" * 80) print(output_content) - - return 0 + return 0 diff --git a/scanner_py/cli/k8s_scanner.py b/scanner_py/cli/k8s_scanner.py index baab3ff..12ba323 100644 --- a/scanner_py/cli/k8s_scanner.py +++ b/scanner_py/cli/k8s_scanner.py @@ -25,51 +25,51 @@ def prepare_trivy_db(verbose: bool = False) -> bool: """ Prepare Trivy database by cleaning and downloading fresh DB. - + This should be run once before parallel scans to avoid race conditions and ensure all scans use the same database version. - + Steps: 1. trivy clean --all (clean existing db) 2. trivy image --download-db-only (download fresh db) - + Returns: True if successful """ # Step 1: Clean existing database if verbose: logger.info("Cleaning existing Trivy database...") - + clean_args = ["trivy", "clean", "--all"] if not verbose: clean_args.append("--quiet") - + clean_result = run_command(clean_args, timeout=60) if not clean_result.success: if verbose: logger.warning(f"Failed to clean Trivy cache (may not exist): {clean_result.stderr}") # Continue anyway - might be first run - + # Step 2: Download fresh database if verbose: logger.info("Downloading Trivy vulnerability database...") - + download_args = ["trivy", "image", "--download-db-only"] if not verbose: download_args.append("--quiet") - + download_result = run_command( download_args, timeout=300, # DB download can take a while ) - + if not download_result.success: logger.error(f"Failed to download Trivy database: {download_result.stderr}") return False - + if verbose: logger.info("Trivy database ready") - + return True @@ -248,7 +248,7 @@ def run_cosign_scanner(args: argparse.Namespace) -> int: """ import time start_time = time.time() - + # Setup logging - errors only shown in verbose mode log_level = LogLevel.VERBOSE if args.verbose else LogLevel.INFO setup_logging(log_level, show_errors=args.verbose) @@ -405,7 +405,7 @@ def run_cosign_scanner(args: argparse.Namespace) -> int: print("📋 TEST FLOW: Processing only the first valid image") if not args.verbose: prev_log_level = suppress_logging() - + progress = ProgressBar( len(images_to_scan), "Test scan", @@ -414,13 +414,13 @@ def run_cosign_scanner(args: argparse.Namespace) -> int: for image in images_to_scan: result = scanner.scan(image) results.append(result) - + status = "success" if result.success else ("skipped" if result.skipped else "failed") progress.update(status=status, current_item=image.split("/")[-1]) - + if result.success and not result.skipped: break - + progress.finish() else: # Parallel scanning with progress bar @@ -441,16 +441,16 @@ def run_cosign_scanner(args: argparse.Namespace) -> int: try: result = future.result() results.append(result) - + if result.success: status = "success" elif result.skipped: status = "skipped" else: status = "failed" - + progress.update(status=status, current_item=image.split("/")[-1]) - + except Exception as e: results.append(ScanResult( image=image, @@ -472,12 +472,12 @@ def run_cosign_scanner(args: argparse.Namespace) -> int: prev_log_level = None if not args.verbose: prev_log_level = suppress_logging() - + try: spinner = Spinner("Checking Chainguard base images...") spinner.spin() chainguard_verifier = ChainguardVerifier() - + def check_chainguard(result: ScanResult) -> Tuple[ScanResult, Any]: """Check Chainguard for a single result.""" try: @@ -485,19 +485,19 @@ def check_chainguard(result: ScanResult) -> Tuple[ScanResult, Any]: return result, cg_result except Exception: return result, None - + # Run Chainguard checks in parallel with ThreadPoolExecutor(max_workers=min(len(successful_results), 5)) as executor: futures = { executor.submit(check_chainguard, result): result for result in successful_results } - + completed = 0 for future in as_completed(futures): result, cg_result = future.result() completed += 1 - + if cg_result: result.is_chainguard = cg_result.is_chainguard result.base_image = cg_result.base_image @@ -506,9 +506,9 @@ def check_chainguard(result: ScanResult) -> Tuple[ScanResult, Any]: result.is_chainguard = False result.base_image = "unknown" result.signature_verified = False - + spinner.update(f"Checked {completed}/{len(successful_results)} images...") - + spinner.finish(f"Chainguard verification complete ({len(successful_results)} images)") finally: if prev_log_level is not None: @@ -516,20 +516,20 @@ def check_chainguard(result: ScanResult) -> Tuple[ScanResult, Any]: # Generate summary AFTER Chainguard check summary = generate_summary(args, results, extraction_result) - + # Determine output paths json_path = Path(args.summary_json) if args.summary_json else output_dir / "scan-summary.json" markdown_path = Path(args.markdown_output) if args.markdown_output else output_dir / "cosign-cve-summary.md" - + # Ensure parent directories exist json_path.parent.mkdir(parents=True, exist_ok=True) markdown_path.parent.mkdir(parents=True, exist_ok=True) - + summary.save(str(json_path)) # Generate markdown summary for GitHub Actions markdown_content = generate_markdown_summary(summary, args.min_cve_level) - + # Handle append mode for combining CI outputs if args.append and markdown_path.exists(): with open(markdown_path, "a") as f: @@ -594,22 +594,22 @@ def generate_summary( def generate_markdown_summary(summary: ScanSummary, min_cve_level: str) -> str: """ Generate CVE analysis summary in Markdown format for GitHub Actions. - + This can be used with GITHUB_STEP_SUMMARY to display results in PR/Action summaries. - + Args: summary: Scan summary data min_cve_level: Minimum CVE level considered relevant - + Returns: Markdown formatted string """ lines = [] - + # Header lines.append("# 🔍 COSIGN SCAN CVE Analysis Summary") lines.append("") - + # Scan info lines.append("## 📊 Scan Information") lines.append("") @@ -623,47 +623,47 @@ def generate_markdown_summary(summary: ScanSummary, min_cve_level: str) -> str: lines.append(f"| **Skipped (unsigned)** | 🚫 {summary.skipped_scans} |") lines.append(f"| **Minimum CVE Level** | `{min_cve_level}` |") lines.append("") - + # CVE Analysis table if summary.cve_analysis: lines.append("## 🛡️ CVE Analysis by Image") lines.append("") lines.append(f"> Minimum CVE Level: **{min_cve_level}** (levels below this are considered irrelevant)") lines.append("") - + # Table header lines.append("| Image | Unaddressed | Addressed | Irrelevant | Triage | Triage File | Chainguard |") lines.append("|-------|:-----------:|:---------:|:----------:|:------:|:-----------:|:----------:|") - + total_unaddressed = 0 total_addressed = 0 total_irrelevant = 0 images_with_triage = 0 images_with_chainguard = 0 - + for analysis in summary.cve_analysis: image_short = analysis["image"].split("/")[-1] # Truncate if too long if len(image_short) > 40: image_short = image_short[:37] + "..." - + # Get CVE counts critical = analysis.get("critical", 0) high = analysis.get("high", 0) medium = analysis.get("medium", 0) low = analysis.get("low", 0) triaged = analysis.get("triaged", 0) - + # Calculate categories unaddressed = critical + high addressed = triaged irrelevant = medium + low - + # Get status has_triage = triaged > 0 # CVEs were actually triaged/addressed has_triage_file = analysis.get("triage_file") is not None # Triage file exists is_chainguard = analysis.get("is_chainguard", False) - + total_unaddressed += unaddressed total_addressed += addressed total_irrelevant += irrelevant @@ -671,17 +671,17 @@ def generate_markdown_summary(summary: ScanSummary, min_cve_level: str) -> str: images_with_triage += 1 if is_chainguard: images_with_chainguard += 1 - + # Format cells unaddr_str = f"✅ {unaddressed}" if unaddressed == 0 else f"🔴 **{unaddressed}**" triage_str = "✅" if has_triage else "❌" triage_file_str = "✅" if has_triage_file else "❌" chainguard_str = "✅" if is_chainguard else "❌" - + lines.append(f"| `{image_short}` | {unaddr_str} | {addressed} | {irrelevant} | {triage_str} | {triage_file_str} | {chainguard_str} |") - + lines.append("") - + # Statistics lines.append("## 📈 Statistics") lines.append("") @@ -693,14 +693,14 @@ def generate_markdown_summary(summary: ScanSummary, min_cve_level: str) -> str: lines.append(f"| **Images with Triage** | {images_with_triage}/{len(summary.cve_analysis)} |") lines.append(f"| **Images with Chainguard Base** | {images_with_chainguard}/{len(summary.cve_analysis)} |") lines.append("") - + # Result badge if total_unaddressed == 0: lines.append("> 🎉 **All relevant CVEs have been addressed!**") else: lines.append(f"> ⚠️ **{total_unaddressed} unaddressed CVEs need attention**") lines.append("") - + # Failed scans section if summary.failed_scans > 0: lines.append("## ❌ Failed Scans") @@ -714,7 +714,7 @@ def generate_markdown_summary(summary: ScanSummary, min_cve_level: str) -> str: lines.append("") lines.append("") lines.append("") - + # Skipped scans section if summary.skipped_scans > 0: lines.append("## 🚫 Skipped Scans (Unsigned Images)") @@ -727,11 +727,11 @@ def generate_markdown_summary(summary: ScanSummary, min_cve_level: str) -> str: lines.append("") lines.append("") lines.append("") - + # Footer lines.append("---") lines.append(f"*Generated by scanner-py on {summary.timestamp.strftime('%Y-%m-%d %H:%M:%S')} UTC*") - + return "\n".join(lines) @@ -790,21 +790,21 @@ def print_summary(summary: ScanSummary, min_cve_level: str, verbose: bool = Fals for analysis in summary.cve_analysis: image_short = analysis["image"].split("/")[-1][:33] - + # Get CVE counts critical = analysis.get("critical", 0) high = analysis.get("high", 0) medium = analysis.get("medium", 0) low = analysis.get("low", 0) triaged = analysis.get("triaged", 0) - + # Unaddressed = Critical + High (above min_cve_level) unaddressed = critical + high # Addressed = triaged CVEs addressed = triaged # Irrelevant = Medium + Low (below min_cve_level HIGH) irrelevant = medium + low - + # Get triage and chainguard status has_triage = triaged > 0 # CVEs were actually triaged/addressed has_triage_file = analysis.get("triage_file") is not None # Triage file exists diff --git a/scanner_py/cli/oras_scan.py b/scanner_py/cli/oras_scan.py index 0cd023d..b7e2cab 100644 --- a/scanner_py/cli/oras_scan.py +++ b/scanner_py/cli/oras_scan.py @@ -38,28 +38,28 @@ class ImageScanResult: image_ref: str # Image without registry prefix success: bool = False error: Optional[str] = None - + # CVE data high_critical_cves: Set[str] = field(default_factory=set) triaged_cves: Set[str] = field(default_factory=set) - + # Triage file status has_triage: bool = False triage_file: Optional[str] = None - + # Output directory output_dir: Optional[str] = None - + @property def unaddressed_cves(self) -> Set[str]: """CVEs found but not triaged.""" return self.high_critical_cves - self.triaged_cves - + @property def addressed_cves(self) -> Set[str]: """CVEs that are both found and triaged.""" return self.high_critical_cves & self.triaged_cves - + @property def irrelevant_cves(self) -> Set[str]: """Triaged CVEs not found in scan (may have been fixed).""" @@ -189,42 +189,42 @@ def get_images_from_kubernetes( ) -> List[str]: """ Get unique container images from Kubernetes pods. - + Equivalent to: kubectl get pods -o json | jq -r '.items[] | (.spec.containers[].image)' """ args = ["kubectl", "get", "pods", "-o", "json"] - + if namespace: args.extend(["-n", namespace]) if kubeconfig: args.extend(["--kubeconfig", kubeconfig]) if context: args.extend(["--context", context]) - + result = run_command(args, timeout=60) if not result.success: logger.error(f"Failed to get pods: {result.stderr}") return [] - + try: data = json.loads(result.stdout) images = set() - + for item in data.get("items", []): spec = item.get("spec", {}) - + # Get images from containers for container in spec.get("containers", []): if image := container.get("image"): images.add(image) - + # Get images from init containers for container in spec.get("initContainers", []): if image := container.get("image"): images.add(image) - + return sorted(images) - + except json.JSONDecodeError as e: logger.error(f"Failed to parse kubectl output: {e}") return [] @@ -282,51 +282,51 @@ def sanitize_filename(image: str) -> str: def prepare_trivy_db(verbose: bool = False) -> bool: """ Prepare Trivy database by cleaning and downloading fresh DB. - + This should be run once before parallel scans to avoid race conditions and ensure all scans use the same database version. - + Steps: 1. trivy clean --all (clean existing db) 2. trivy image --download-db-only (download fresh db) - + Returns: True if successful """ # Step 1: Clean existing database if verbose: logger.info("Cleaning existing Trivy database...") - + clean_args = ["trivy", "clean", "--all"] if not verbose: clean_args.append("--quiet") - + clean_result = run_command(clean_args, timeout=60) if not clean_result.success: if verbose: logger.warning(f"Failed to clean Trivy cache (may not exist): {clean_result.stderr}") # Continue anyway - might be first run - + # Step 2: Download fresh database if verbose: logger.info("Downloading Trivy vulnerability database...") - + download_args = ["trivy", "image", "--download-db-only"] if not verbose: download_args.append("--quiet") - + download_result = run_command( download_args, timeout=300, # DB download can take a while ) - + if not download_result.success: logger.error(f"Failed to download Trivy database: {download_result.stderr}") return False - + if verbose: logger.info("Trivy database ready") - + return True @@ -338,11 +338,11 @@ def run_trivy_scan( ) -> tuple[bool, Optional[str]]: """ Run Trivy vulnerability scan on an image. - + Equivalent to: trivy image --scanners vuln --format json "$image" - + Note: Uses --skip-db-update since DB is pre-downloaded by prepare_trivy_db() - + Returns: Tuple of (success, error_message) """ @@ -355,13 +355,13 @@ def run_trivy_scan( "--timeout", f"{timeout}s", image, ] - + # Only suppress output when not in verbose mode if not verbose: args.insert(2, "--quiet") - + result = run_command(args, timeout=timeout + 30) - + if not result.success: error_msg = result.stderr.strip() if result.stderr else "Unknown error" if verbose: @@ -372,24 +372,24 @@ def run_trivy_scan( for line in result.stderr.strip().split('\n')[:10]: logger.error(f" {line}") return False, error_msg - + return True, None def extract_high_critical_cves(trivy_json_file: str) -> Set[str]: """ Extract HIGH and CRITICAL CVE IDs from Trivy JSON output. - - Equivalent to: jq -r '.Results[] | select(.Vulnerabilities != null) | - .Vulnerabilities[] | select(.Severity == "HIGH" or .Severity == "CRITICAL") | + + Equivalent to: jq -r '.Results[] | select(.Vulnerabilities != null) | + .Vulnerabilities[] | select(.Severity == "HIGH" or .Severity == "CRITICAL") | .VulnerabilityID' """ cves = set() - + try: with open(trivy_json_file) as f: data = json.load(f) - + for result in data.get("Results", []): vulns = result.get("Vulnerabilities") or [] for vuln in vulns: @@ -398,10 +398,10 @@ def extract_high_critical_cves(trivy_json_file: str) -> Set[str]: cve_id = vuln.get("VulnerabilityID") if cve_id: cves.add(cve_id) - + except (json.JSONDecodeError, FileNotFoundError) as e: logger.debug(f"Failed to parse Trivy output: {e}") - + return cves @@ -412,7 +412,7 @@ def find_triage_reference( ) -> Optional[str]: """ Find ORAS referrer containing triage.toml. - + Equivalent to bash: jq -r '.referrers[] | [.reference, (.annotations.content // "")] | @tsv' then checking if content contains "triage.toml" """ @@ -421,7 +421,7 @@ def find_triage_reference( ["oras", "discover", image, "--format", "json"], timeout=timeout, ) - + if not result.success: # Fallback with --plain-http result = run_command( @@ -432,22 +432,22 @@ def find_triage_reference( if verbose: logger.debug(f"No ORAS referrers found for {image}: {result.stderr}") return None - + try: data = json.loads(result.stdout) referrers = data.get("referrers", []) - + for ref in referrers: reference = ref.get("reference") annotations = ref.get("annotations", {}) content = annotations.get("content", "") - + if "triage.toml" in content: return reference - + except json.JSONDecodeError: pass - + return None @@ -460,7 +460,7 @@ def fetch_triage_toml( ) -> bool: """ Fetch triage.toml blob from ORAS. - + Equivalent to: oras manifest fetch $triage_reference > manifest.json manifest_digest=$(jq -r '.layers[0].digest' manifest.json) @@ -471,28 +471,28 @@ def fetch_triage_toml( ["oras", "manifest", "fetch", triage_reference], timeout=timeout, ) - + if not result.success: return False - + # Save manifest try: manifest = json.loads(result.stdout) with open(manifest_file, "w") as f: json.dump(manifest, f, indent=2) - + # Get layer digest layers = manifest.get("layers", []) if not layers: return False - + layer_digest = layers[0].get("digest") if not layer_digest: return False - + except (json.JSONDecodeError, KeyError, IndexError): return False - + # Fetch blob image_name = get_image_name(image) result = run_command( @@ -500,48 +500,48 @@ def fetch_triage_toml( "--output", output_file], timeout=timeout, ) - + if not result.success: return False - + # Verify file is not empty if not Path(output_file).exists() or Path(output_file).stat().st_size == 0: Path(output_file).unlink(missing_ok=True) return False - + return True def parse_triage_toml(triage_file: str) -> Set[str]: """ Parse CVE IDs from triage.toml file. - + Equivalent to bash: - grep -v '^[[:space:]]*#' "$triage_file" | - grep -o '\\(only: \\)?\\[trivy\\.[A-Z0-9\\-]*\\]' | - grep -o 'trivy\\.[A-Z0-9\\-]*' | + grep -v '^[[:space:]]*#' "$triage_file" | + grep -o '\\(only: \\)?\\[trivy\\.[A-Z0-9\\-]*\\]' | + grep -o 'trivy\\.[A-Z0-9\\-]*' | sed 's/trivy\\.//' """ cves = set() - + try: with open(triage_file) as f: content = f.read() - + # Remove comment lines - lines = [line for line in content.split("\n") + lines = [line for line in content.split("\n") if not line.strip().startswith("#")] content = "\n".join(lines) - + # Pattern: [trivy.CVE-XXXX-XXXX] or only: [trivy.CVE-XXXX-XXXX] pattern = r'\[trivy\.(CVE-[A-Z0-9\-]+)\]' - + for match in re.finditer(pattern, content): cves.add(match.group(1)) - + except FileNotFoundError: pass - + return cves @@ -558,16 +558,16 @@ def scan_image( image=image, image_ref=get_image_ref(image), ) - + # Create output directory image_filename = sanitize_filename(image) image_dir = output_dir / image_filename image_dir.mkdir(parents=True, exist_ok=True) result.output_dir = str(image_dir) - + # Save image reference (image_dir / "image.txt").write_text(result.image_ref) - + # Run Trivy scan trivy_output = image_dir / "cosign-scan.json" success, error_msg = run_trivy_scan(image, str(trivy_output), timeout, verbose) @@ -579,20 +579,20 @@ def scan_image( # Save error to file for debugging (image_dir / "error.txt").write_text(result.error) return result - + # Extract HIGH/CRITICAL CVEs result.high_critical_cves = extract_high_critical_cves(str(trivy_output)) (image_dir / "high-critical-cves.txt").write_text( "\n".join(sorted(result.high_critical_cves)) ) - + # Find and fetch triage.toml triage_ref = find_triage_reference(image, verbose=verbose) - + if triage_ref: triage_file = image_dir / "triage.toml" manifest_file = image_dir / "manifest.json" - + if fetch_triage_toml( image, triage_ref, str(triage_file), str(manifest_file) ): @@ -602,7 +602,7 @@ def scan_image( (image_dir / "triaged-cves.txt").write_text( "\n".join(sorted(result.triaged_cves)) ) - + result.success = True return result @@ -615,13 +615,13 @@ def generate_markdown_report( ) -> str: """ Generate beautiful Markdown vulnerability report. - + Equivalent to 3-gen-report.sh but with enhanced formatting. """ lines = [] successful = [r for r in results if r.success] failed = [r for r in results if not r.success] - + # Calculate totals first for summary total_unaddressed_cves = sum(len(r.unaddressed_cves) for r in successful) total_addressed_cves = sum(len(r.addressed_cves) for r in successful) @@ -629,17 +629,17 @@ def generate_markdown_report( images_with_unaddressed = sum(1 for r in successful if r.unaddressed_cves) images_with_missing_triage = sum(1 for r in successful if not r.has_triage) images_with_triage = sum(1 for r in successful if r.has_triage) - + # Header with status badge lines.append("# 🔍 ORAS SCAN CVE Analysis Summary") lines.append("") - + if total_unaddressed_cves == 0: lines.append("> 🎉 **All HIGH/CRITICAL CVEs have been addressed!**") else: lines.append(f"> ⚠️ **{total_unaddressed_cves} unaddressed CVEs across {images_with_unaddressed} images need attention**") lines.append("") - + # Scan info table lines.append("## 📊 Scan Overview") lines.append("") @@ -652,7 +652,7 @@ def generate_markdown_report( lines.append(f"| **Images with Triage** | {images_with_triage} |") lines.append(f"| **Images Missing Triage** | {images_with_missing_triage} |") lines.append("") - + # CVE Statistics lines.append("## 📈 CVE Statistics") lines.append("") @@ -662,11 +662,11 @@ def generate_markdown_report( lines.append(f"| ✅ **Addressed** | {total_addressed_cves} | HIGH/CRITICAL CVEs covered by triage |") lines.append(f"| ⚪ **Irrelevant** | {total_irrelevant_cves} | Triaged CVEs no longer detected |") lines.append("") - + # Main vulnerability table lines.append("## 🛡️ Image Analysis") lines.append("") - + # Filter info if filter_unaddressed and filter_missing_triage: lines.append("> **Filter:** Showing images with unaddressed CVEs OR missing triage files") @@ -675,23 +675,23 @@ def generate_markdown_report( elif filter_missing_triage: lines.append("> **Filter:** Showing only images with missing triage files") lines.append("") - + # Table header lines.append("| Image | Status | Unaddressed | Addressed | Triage |") lines.append("|-------|:------:|:-----------:|:---------:|:------:|") - + displayed_count = 0 hidden_clean_count = 0 # Images with no triage AND no CVEs - + for result in successful: has_unaddressed = len(result.unaddressed_cves) > 0 has_any_cves = len(result.high_critical_cves) > 0 - + # Hide images with no triage AND no vulnerabilities (nothing to report) if not result.has_triage and not has_any_cves: hidden_clean_count += 1 continue - + # Apply user filters show_entry = True if filter_unaddressed or filter_missing_triage: @@ -700,40 +700,40 @@ def generate_markdown_report( show_entry = True if filter_missing_triage and not result.has_triage: show_entry = True - + if not show_entry: continue - + displayed_count += 1 - + # Format image name (truncate if too long) image_name = result.image_ref if len(image_name) > 45: image_name = image_name[:42] + "..." - + # Status icon: 🔴 = has unaddressed CVEs, ✅ = all clear status = "🔴" if has_unaddressed else "✅" - + # CVE counts with formatting unaddressed_count = len(result.unaddressed_cves) addressed_count = len(result.addressed_cves) - + if unaddressed_count > 0: unaddressed_str = f"**{unaddressed_count}**" else: unaddressed_str = "0" - + triage_str = "✅" if result.has_triage else "❌" - + lines.append(f"| `{image_name}` | {status} | {unaddressed_str} | {addressed_count} | {triage_str} |") - + lines.append("") if hidden_clean_count > 0: lines.append(f"*Showing {displayed_count} of {len(successful)} images ({hidden_clean_count} clean images without triage hidden)*") else: lines.append(f"*Showing {displayed_count} of {len(successful)} images*") lines.append("") - + # Detailed CVE breakdown (if there are unaddressed CVEs) if total_unaddressed_cves > 0: lines.append("## 🔴 Unaddressed CVEs Detail") @@ -741,7 +741,7 @@ def generate_markdown_report( lines.append("
") lines.append("Click to expand CVE details") lines.append("") - + for result in successful: if result.unaddressed_cves: lines.append(f"### `{result.image_ref}`") @@ -749,10 +749,10 @@ def generate_markdown_report( for cve in sorted(result.unaddressed_cves): lines.append(f"- {cve}") lines.append("") - + lines.append("
") lines.append("") - + # Failed scans section if failed: lines.append("## ❌ Failed Scans") @@ -765,11 +765,11 @@ def generate_markdown_report( lines.append("") lines.append("") lines.append("") - + # Footer lines.append("---") lines.append("*Generated by `scanner-py oras-scan`*") - + return "\n".join(lines) @@ -782,27 +782,27 @@ def print_cli_summary( """Print beautiful summary to CLI matching the markdown report format.""" successful = [r for r in results if r.success] failed = [r for r in results if not r.success] - + total_unaddressed = sum(len(r.unaddressed_cves) for r in successful) total_addressed = sum(len(r.addressed_cves) for r in successful) total_irrelevant = sum(len(r.irrelevant_cves) for r in successful) images_with_triage = sum(1 for r in successful if r.has_triage) images_with_missing_triage = sum(1 for r in successful if not r.has_triage) images_with_unaddressed = sum(1 for r in successful if r.unaddressed_cves) - + print() print("━" * 100) print("🔍 VULNERABILITY TRIAGE REPORT") print("━" * 100) print() - + # Status banner if total_unaddressed == 0: print(" 🎉 All HIGH/CRITICAL CVEs have been addressed!") else: print(f" ⚠️ {total_unaddressed} unaddressed CVEs across {images_with_unaddressed} images need attention") print() - + # Scan Overview print("┌─────────────────────────────────────────────────────────────────────────────┐") print("│ 📊 SCAN OVERVIEW │") @@ -814,7 +814,7 @@ def print_cli_summary( print(f"│ Images Missing Triage: {images_with_missing_triage:<50} │") print("└─────────────────────────────────────────────────────────────────────────────┘") print() - + # CVE Statistics print("┌─────────────────────────────────────────────────────────────────────────────┐") print("│ 📈 CVE STATISTICS │") @@ -824,12 +824,12 @@ def print_cli_summary( print(f"│ ⚪ Irrelevant: {total_irrelevant:<8} Triaged CVEs no longer detected │") print("└─────────────────────────────────────────────────────────────────────────────┘") print() - + # Image Analysis Table print("┌─────────────────────────────────────────────────────────────────────────────────────────────────┐") print("│ 🛡️ IMAGE ANALYSIS │") print("├─────────────────────────────────────────────────────────────────────────────────────────────────┤") - + # Filter info if filter_unaddressed and filter_missing_triage: print("│ Filter: Showing images with unaddressed CVEs OR missing triage files │") @@ -837,24 +837,24 @@ def print_cli_summary( print("│ Filter: Showing only images with unaddressed CVEs │") elif filter_missing_triage: print("│ Filter: Showing only images with missing triage files │") - + print("├─────────────────────────────────────────────────────────────────────────────────────────────────┤") - + # Table header print(f"│ {'Image':<45} {'Status':^8} {'Unaddr':^8} {'Addr':^8} {'Triage':^8} │") print("├─────────────────────────────────────────────────────────────────────────────────────────────────┤") - + displayed = 0 hidden_clean = 0 for result in successful: has_unaddressed = len(result.unaddressed_cves) > 0 has_any_cves = len(result.high_critical_cves) > 0 - + # Hide images with no triage AND no vulnerabilities (nothing to report) if not result.has_triage and not has_any_cves: hidden_clean += 1 continue - + # Apply user filters show_entry = True if filter_unaddressed or filter_missing_triage: @@ -863,33 +863,33 @@ def print_cli_summary( show_entry = True if filter_missing_triage and not result.has_triage: show_entry = True - + if not show_entry: continue - + displayed += 1 - + # Truncate image name image_name = result.image_ref if len(image_name) > 43: image_name = image_name[:40] + "..." - + # Status icon: 🔴 = has unaddressed CVEs, ✅ = all clear status = "🔴" if has_unaddressed else "✅" - + unaddressed_count = len(result.unaddressed_cves) addressed_count = len(result.addressed_cves) triage_str = "✅" if result.has_triage else "❌" - + print(f"│ {image_name:<45} {status:^8} {unaddressed_count:^8} {addressed_count:^8} {triage_str:^8} │") - + print("└─────────────────────────────────────────────────────────────────────────────────────────────────┘") if hidden_clean > 0: print(f" Showing {displayed} of {len(successful)} images ({hidden_clean} clean images without triage hidden)") else: print(f" Showing {displayed} of {len(successful)} images") print() - + # Failed scans details if failed: print("┌─────────────────────────────────────────────────────────────────────────────┐") @@ -903,7 +903,7 @@ def print_cli_summary( print(f"│ ... and {len(failed) - 5} more │") print("└─────────────────────────────────────────────────────────────────────────────┘") print() - + # Unaddressed CVEs detail (if any) if total_unaddressed > 0 and total_unaddressed <= 20: print("┌─────────────────────────────────────────────────────────────────────────────┐") @@ -923,101 +923,101 @@ def print_cli_summary( def load_results_from_dir(output_dir: Path) -> List[ImageScanResult]: """Load scan results from existing output directory.""" results = [] - + if not output_dir.exists(): return results - + for image_dir in output_dir.iterdir(): if not image_dir.is_dir(): continue - + # Read image name image_file = image_dir / "image.txt" if not image_file.exists(): continue - + image_ref = image_file.read_text().strip() - + result = ImageScanResult( image=image_dir.name, # sanitized name image_ref=image_ref, output_dir=str(image_dir), ) - + # Read HIGH/CRITICAL CVEs cve_file = image_dir / "high-critical-cves.txt" if cve_file.exists(): content = cve_file.read_text().strip() if content: result.high_critical_cves = set(content.split("\n")) - + # Read triaged CVEs triaged_file = image_dir / "triaged-cves.txt" if triaged_file.exists(): content = triaged_file.read_text().strip() if content: result.triaged_cves = set(content.split("\n")) - + # Check triage file triage_file = image_dir / "triage.toml" if triage_file.exists() and triage_file.stat().st_size > 0: result.has_triage = True result.triage_file = str(triage_file) - + result.success = True results.append(result) - + return results def run_oras_scan(args: argparse.Namespace) -> int: """ Run triage scan - main entry point. - + Equivalent to running all three oras-scan bash scripts. """ # Setup logging log_level = LogLevel.VERBOSE if args.verbose else LogLevel.INFO setup_logging(log_level) - + output_dir = Path(args.output_dir) - + # Report-only mode: just generate report from existing results if args.report_only: print("📊 Generating report from existing scan results...") results = load_results_from_dir(output_dir) - + if not results: print(f"❌ No scan results found in {output_dir}", file=sys.stderr) return 1 - + print_cli_summary( results, namespace=args.namespace, filter_unaddressed=args.filter_unaddressed, filter_missing_triage=args.filter_missing_triage, ) - + report = generate_markdown_report( results, filter_unaddressed=args.filter_unaddressed, filter_missing_triage=args.filter_missing_triage, namespace=args.namespace, ) - + if args.output: Path(args.output).write_text(report) print(f"📄 Report saved to: {args.output}") - + return 0 - + # Check prerequisites for tool in ["kubectl", "trivy", "oras", "jq"]: result = run_command(["which", tool], timeout=5) if not result.success: print(f"❌ Missing required tool: {tool}", file=sys.stderr) return 1 - + # Print header print("🔍 ORAS Scan - Vulnerability Scanner") print() @@ -1026,23 +1026,23 @@ def run_oras_scan(args: argparse.Namespace) -> int: print(f" • Output: {args.output_dir}") print(f" • Parallel: {args.parallel}") print() - + # Step 1: Get images from Kubernetes spinner = Spinner("Getting images from Kubernetes...") spinner.spin() - + images = get_images_from_kubernetes( args.namespace, kubeconfig=args.kubeconfig, context=args.context, ) - + if not images: spinner.finish("No images found", success=False) return 1 - + spinner.finish(f"Found {len(images)} unique images") - + # Apply ignore patterns if args.ignore_file: ignore_patterns = load_ignore_patterns(args.ignore_file) @@ -1050,16 +1050,16 @@ def run_oras_scan(args: argparse.Namespace) -> int: images = filter_images(images, ignore_patterns) ignored = original_count - len(images) print(f"📂 Loaded {len(ignore_patterns)} ignore patterns ({ignored} images filtered)") - + if not images: print("⚠️ No images to scan after filtering") return 0 - + # Prepare output directory if output_dir.exists(): shutil.rmtree(output_dir) output_dir.mkdir(parents=True, exist_ok=True) - + # Prepare Trivy database (clean + download fresh) spinner = Spinner("Preparing Trivy vulnerability database...") spinner.spin() @@ -1067,32 +1067,32 @@ def run_oras_scan(args: argparse.Namespace) -> int: spinner.finish("Failed to prepare Trivy database", success=False) return 1 spinner.finish("Trivy database ready") - + # Step 2: Scan images print() results: List[ImageScanResult] = [] - + progress = ProgressBar( len(images), "Scanning images", ProgressStyle(width=30), ) - + with ThreadPoolExecutor(max_workers=args.parallel) as executor: futures = { executor.submit(scan_image, image, output_dir, args.timeout, args.verbose): image for image in images } - + for future in as_completed(futures): image = futures[future] try: result = future.result() results.append(result) - + status = "success" if result.success else "failed" progress.update(status=status, current_item=image.split("/")[-1]) - + except Exception as e: logger.error(f"Error scanning {image}: {e}") results.append(ImageScanResult( @@ -1101,9 +1101,9 @@ def run_oras_scan(args: argparse.Namespace) -> int: error=str(e), )) progress.update(status="failed", current_item=image.split("/")[-1]) - + progress.finish() - + # Print CLI summary print_cli_summary( results, @@ -1111,7 +1111,7 @@ def run_oras_scan(args: argparse.Namespace) -> int: filter_unaddressed=args.filter_unaddressed, filter_missing_triage=args.filter_missing_triage, ) - + # Step 3: Generate report (unless scan-only mode) if not args.scan_only: report = generate_markdown_report( @@ -1120,16 +1120,15 @@ def run_oras_scan(args: argparse.Namespace) -> int: filter_missing_triage=args.filter_missing_triage, namespace=args.namespace, ) - + if args.output: Path(args.output).write_text(report) print(f"📄 Report saved to: {args.output}") else: print() print(report) - + print() print("✅ ORAS scan completed!") - - return 0 + return 0 diff --git a/scanner_py/cli/verify_chainguard.py b/scanner_py/cli/verify_chainguard.py index 78cbdc0..8697c3a 100644 --- a/scanner_py/cli/verify_chainguard.py +++ b/scanner_py/cli/verify_chainguard.py @@ -145,20 +145,20 @@ def run_chainguard(args: argparse.Namespace) -> int: print("📋 RESULTS") print("━" * 50) print() - + base_display = result.base_image or "unknown" if len(base_display) > 45: base_display = "..." + base_display[-42:] - + print(f" Base Image: {base_display}") - + cg_status = "✅ Yes" if result.is_chainguard else "❌ No" print(f" Is Chainguard: {cg_status}") - + sig_status = "✅ Verified" if result.signature_verified else "❌ Not verified" print(f" Signature: {sig_status}") print() - + if result.is_chainguard and result.signature_verified: print("🎉 Image is built on a verified Chainguard base!") elif result.is_chainguard: @@ -169,7 +169,7 @@ def run_chainguard(args: argparse.Namespace) -> int: print(" Use --output-level verbose for details") else: print("ℹ️ Image does not use a Chainguard base") - + elif args.output_level == "none": # Machine-readable output for automation print(f"IS_CHAINGUARD={str(result.is_chainguard).lower()}") diff --git a/scanner_py/config/cosign-ignore.yaml b/scanner_py/config/cosign-ignore.yaml index 2623193..6bf1472 100644 --- a/scanner_py/config/cosign-ignore.yaml +++ b/scanner_py/config/cosign-ignore.yaml @@ -7,4 +7,4 @@ proxy-ghcr/cloudnative-pg/postgresql brancz/kube-rbac-proxy proxy-ghcr/zitadel/zitadel prometheuscommunity/postgres-exporter -proxy-ghcr/zitadel/zitadel \ No newline at end of file +proxy-ghcr/zitadel/zitadel diff --git a/scanner_py/config/oras-ignore.yaml b/scanner_py/config/oras-ignore.yaml index 06c53ea..5cf92ba 100644 --- a/scanner_py/config/oras-ignore.yaml +++ b/scanner_py/config/oras-ignore.yaml @@ -19,7 +19,6 @@ temporalio/ui cloudnative-pg/postgresql cloudnative-pg/cloudnative-pg bitnamilegacy/mlflow -# Using the new build/push action job inference-images/model-downloader document-index-mcp/document-index-mcp container-images/finetuning_api @@ -28,4 +27,4 @@ pharia-numinous-images/mcp-code-sandbox assistant-applications/generate-template assistant-applications/summary-template pharia-os-images/pharia-os-manager -pharia-os-images/phariaos-applications-proxy \ No newline at end of file +pharia-os-images/phariaos-applications-proxy diff --git a/scanner_py/core/__init__.py b/scanner_py/core/__init__.py index 13bdc59..7dc4b0f 100644 --- a/scanner_py/core/__init__.py +++ b/scanner_py/core/__init__.py @@ -16,4 +16,3 @@ "KubernetesImageExtractor", "ScanCache", ] - diff --git a/scanner_py/core/attestation.py b/scanner_py/core/attestation.py index a5af8c2..238ee17 100644 --- a/scanner_py/core/attestation.py +++ b/scanner_py/core/attestation.py @@ -172,13 +172,13 @@ def list_attestations( # Extract predicate types from bundles IN PARALLEL # This is a major performance improvement - each bundle fetch takes ~1-2 sec predicate_types: Dict[str, int] = {} - + def fetch_predicate_type(ref: dict) -> Optional[str]: ref_digest = ref.get("digest") if not ref_digest: return None return self._get_predicate_type_from_bundle(image, ref_digest) - + # Use ThreadPoolExecutor for parallel fetching max_workers = min(len(bundle_refs), 5) # Cap at 5 parallel requests with ThreadPoolExecutor(max_workers=max_workers) as executor: @@ -186,7 +186,7 @@ def fetch_predicate_type(ref: dict) -> Optional[str]: executor.submit(fetch_predicate_type, ref): ref for ref in bundle_refs } - + for future in as_completed(futures): try: pred_type = future.result() @@ -504,41 +504,41 @@ def extract_triage( class LegacyTriageExtractor: """ Extract legacy TOML triage files from container images using ORAS. - + This is equivalent to the oras-scan bash scripts that look for triage.toml files attached as ORAS referrers with "triage.toml" in the annotation content. - + The TOML format contains sections like: [trivy.CVE-2024-1234] reason = "Not applicable" """ - + def __init__(self, timeout: int = 60): """ Initialize the extractor. - + Args: timeout: Timeout for operations """ self.timeout = timeout - + def _get_image_name(self, full_image: str) -> str: """Get image name without version/digest.""" if "@" in full_image: return full_image.split("@")[0] return full_image.split(":")[0] - + def find_triage_reference(self, image: str) -> Optional[str]: """ Find ORAS referrer containing triage.toml. - + Equivalent to the bash script logic that searches for "triage.toml" in annotation content. - + Args: image: Image reference - + Returns: Reference digest or None """ @@ -547,7 +547,7 @@ def find_triage_reference(self, image: str) -> Optional[str]: ["oras", "discover", image, "--format", "json"], timeout=self.timeout, ) - + if not result.success: # Try with --plain-http as fallback result = run_command( @@ -557,36 +557,36 @@ def find_triage_reference(self, image: str) -> Optional[str]: if not result.success: logger.debug(f"Failed to discover referrers for {image}") return None - + try: data = json.loads(result.stdout) referrers = data.get("referrers", []) except json.JSONDecodeError: logger.debug("Failed to parse oras discover output") return None - + # Find referrer with triage.toml in annotation content for ref in referrers: annotations = ref.get("annotations", {}) content = annotations.get("content", "") - + if "triage.toml" in content: return ref.get("reference") - + return None - + def extract_triage_toml( self, image: str, output_file: str ) -> Optional[str]: """ Extract triage.toml file from an image. - + Equivalent to oras-scan/2-oras-scan.sh triage extraction. - + Args: image: Image reference output_file: Path to save triage.toml - + Returns: Path to extracted file or None """ @@ -594,24 +594,24 @@ def extract_triage_toml( if not triage_ref: logger.debug(f"No triage.toml found for {image}") return None - + # Fetch manifest to get layer digest result = run_command( ["oras", "manifest", "fetch", triage_ref], timeout=30, ) - + if not result.success: logger.debug(f"Failed to fetch manifest for {triage_ref}") return None - + try: manifest = json.loads(result.stdout) layer_digest = manifest["layers"][0]["digest"] except (json.JSONDecodeError, KeyError, IndexError): logger.debug("Failed to parse manifest") return None - + # Fetch the blob (triage.toml content) image_name = self._get_image_name(image) result = run_command( @@ -619,56 +619,56 @@ def extract_triage_toml( "--output", output_file], timeout=30, ) - + if not result.success: logger.debug(f"Failed to fetch triage blob") return None - + # Verify file is not empty if not Path(output_file).exists() or Path(output_file).stat().st_size == 0: Path(output_file).unlink(missing_ok=True) return None - + logger.debug(f"Successfully extracted triage.toml to {output_file}") return output_file - + def parse_triage_toml(self, triage_file: str) -> Set[str]: """ Parse CVE IDs from a triage.toml file. - + Equivalent to the bash grep patterns: grep -o '\\(only: \\)?\\[trivy\\.[A-Z0-9\\-]*\\]' | sed 's/trivy\\.//' - + Args: triage_file: Path to triage.toml file - + Returns: Set of CVE IDs (e.g., {"CVE-2024-1234", "CVE-2023-5678"}) """ cves: Set[str] = set() - + try: with open(triage_file, "r") as f: content = f.read() except FileNotFoundError: return cves - + # Pattern to match [trivy.CVE-XXXX-XXXX] sections # This matches: [trivy.CVE-2024-1234] or only: [trivy.CVE-2024-1234] pattern = r'\[trivy\.(CVE-[A-Z0-9\-]+)\]' - + for match in re.finditer(pattern, content): cves.add(match.group(1)) - + return cves - + def has_triage(self, image: str) -> bool: """ Check if image has a legacy triage.toml file. - + Args: image: Image reference - + Returns: True if triage.toml exists """ @@ -680,11 +680,11 @@ class TriageExtractor: Unified triage extractor supporting both formats: - Cosign attestation (JSON format) - Legacy TOML format (triage.toml via ORAS) - + This combines functionality from both cosign-based attestation extraction and the legacy oras-scan bash scripts. """ - + def __init__( self, certificate_oidc_issuer: str = AttestationExtractor.DEFAULT_OIDC_ISSUER, @@ -693,10 +693,10 @@ def __init__( ): """ Initialize the unified triage extractor. - + Args: certificate_oidc_issuer: OIDC issuer for cosign verification - certificate_identity_regexp: Identity regexp for cosign verification + certificate_identity_regexp: Identity regexp for cosign verification timeout: Timeout for operations """ self.cosign_extractor = AttestationExtractor( @@ -706,19 +706,19 @@ def __init__( ) self.legacy_extractor = LegacyTriageExtractor(timeout=timeout) self.timeout = timeout - + def extract_triage( self, image: str, output_dir: str ) -> Optional[Dict[str, Any]]: """ Extract triage data from an image, trying both formats. - + Tries cosign attestation first, then falls back to legacy TOML. - + Args: image: Image reference output_dir: Directory to save extracted files - + Returns: Dictionary with triage info: { @@ -730,19 +730,19 @@ def extract_triage( """ output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) - + # Try cosign attestation first (JSON format) cosign_file = output_path / "triage.json" if self.cosign_extractor.extract_triage(image, str(cosign_file)): try: with open(cosign_file) as f: triage_data = json.load(f) - + # Extract CVE IDs from cosign triage format # Format: {"predicate": {"trivy": {"CVE-ID": {...}, ...}}} trivy_data = triage_data.get("predicate", {}).get("trivy", {}) cve_ids = set(trivy_data.keys()) - + return { "format": "cosign", "file": str(cosign_file), @@ -750,27 +750,27 @@ def extract_triage( } except (json.JSONDecodeError, KeyError): logger.debug("Failed to parse cosign triage") - + # Fall back to legacy TOML format toml_file = output_path / "triage.toml" if self.legacy_extractor.extract_triage_toml(image, str(toml_file)): cve_ids = self.legacy_extractor.parse_triage_toml(str(toml_file)) - + return { "format": "toml", "file": str(toml_file), "cve_ids": cve_ids, } - + return None - + def has_triage(self, image: str) -> bool: """ Check if image has any triage (cosign or legacy). - + Args: image: Image reference - + Returns: True if triage exists in any format """ @@ -778,7 +778,6 @@ def has_triage(self, image: str) -> bool: attestations = self.cosign_extractor.list_attestations(image) if attestations.has_triage(): return True - + # Check legacy TOML return self.legacy_extractor.has_triage(image) - diff --git a/scanner_py/core/cache.py b/scanner_py/core/cache.py index 14aa945..5bd8d62 100644 --- a/scanner_py/core/cache.py +++ b/scanner_py/core/cache.py @@ -18,38 +18,38 @@ class DigestCache: """ Thread-safe in-memory cache for image digests. - + Eliminates redundant `crane digest` calls during a single scan run. Each call takes ~1-2 seconds, and we often call it 4-6 times per image. """ - + def __init__(self): self._cache: Dict[str, Optional[str]] = {} self._lock = threading.Lock() - + def get(self, image: str) -> Optional[str]: """Get cached digest for an image.""" with self._lock: return self._cache.get(image) - + def has(self, image: str) -> bool: """Check if digest is cached (including None values).""" with self._lock: return image in self._cache - + def set(self, image: str, digest: Optional[str]) -> None: """Cache a digest for an image.""" with self._lock: self._cache[image] = digest - + def get_or_fetch(self, image: str, timeout: int = 30) -> Optional[str]: """ Get cached digest or fetch and cache it. - + Args: image: Image reference timeout: Timeout for crane digest call - + Returns: Digest string or None """ @@ -58,19 +58,19 @@ def get_or_fetch(self, image: str, timeout: int = 30) -> Optional[str]: if cached: logger.debug(f"Digest cache hit for {image}") return cached - + # Fetch digest result = run_command(["crane", "digest", image], timeout=timeout) digest = result.stdout.strip() if result.success else None - + self.set(image, digest) return digest - + def clear(self) -> None: """Clear the cache.""" with self._lock: self._cache.clear() - + def stats(self) -> Dict[str, int]: """Get cache statistics.""" with self._lock: @@ -118,7 +118,7 @@ class CacheStats: class ScanCache: """ Cache for scan results, SBOMs, and attestations. - + Uses image digests as cache keys for accuracy. """ @@ -138,7 +138,7 @@ def __init__( """ if cache_dir is None: cache_dir = os.path.expanduser("~/.cache/k8s-image-scanner") - + self.cache_dir = Path(cache_dir) self.ttl_hours = ttl_hours self.ttl_seconds = ttl_hours * 3600 @@ -185,7 +185,7 @@ def _is_file_valid(self, filepath: Path) -> bool: """Check if a cached file is still valid (within TTL).""" if not filepath.exists(): return False - + file_age = time.time() - filepath.stat().st_mtime return file_age < self.ttl_seconds @@ -203,7 +203,7 @@ def get_attestation_type(self, image: str) -> Optional[str]: return None cache_file = self._get_image_cache_dir(image) / "attestation-type.json" - + if self._is_file_valid(cache_file): try: with open(cache_file) as f: @@ -214,7 +214,7 @@ def get_attestation_type(self, image: str) -> Optional[str]: return atype except (json.JSONDecodeError, FileNotFoundError): pass - + return None def set_attestation_type(self, image: str, attestation_type: str) -> None: @@ -263,11 +263,11 @@ def get_sbom(self, image: str, sbom_type: str = "cyclonedx") -> Optional[Path]: return None cache_file = self._get_image_cache_dir(image) / f"sbom-{sbom_type}.json" - + if self._is_file_valid(cache_file): logger.debug(f"Cache hit: SBOM ({sbom_type}) for {image}") return cache_file - + return None def set_sbom(self, image: str, sbom_file: str, sbom_type: str = "cyclonedx") -> None: @@ -303,11 +303,11 @@ def get_triage(self, image: str) -> Optional[Path]: return None cache_file = self._get_image_cache_dir(image) / "triage.json" - + if self._is_file_valid(cache_file): logger.debug(f"Cache hit: triage for {image}") return cache_file - + return None def set_triage(self, image: str, triage_file: str) -> None: @@ -358,7 +358,7 @@ def get_stats(self) -> CacheStats: for entry in self.cache_dir.iterdir(): if not entry.is_dir(): continue - + stats.total_images += 1 # Check attestation type @@ -430,4 +430,3 @@ def format_bytes(b: int) -> str: print() print("💡 Tip: Use --clear-cache to clear expired entries, " "or --no-cache to disable caching") - diff --git a/scanner_py/core/chainguard.py b/scanner_py/core/chainguard.py index 352bbcb..a34e07e 100644 --- a/scanner_py/core/chainguard.py +++ b/scanner_py/core/chainguard.py @@ -33,7 +33,7 @@ def to_dict(self) -> dict: class ChainguardVerifier: """ Verify if a Docker image is built using a Chainguard base image. - + Supports both public Chainguard images (cgr.dev/chainguard/*) and Aleph Alpha production images (cgr.dev/aleph-alpha.com/*). """ @@ -329,4 +329,3 @@ def print_result(self, result: ChainguardVerificationResult) -> None: print(f" - Signature Verified: {result.signature_verified}") if result.error: print(f" - Error: {result.error}") - diff --git a/scanner_py/core/kubernetes.py b/scanner_py/core/kubernetes.py index 4e60f83..5ccae6d 100644 --- a/scanner_py/core/kubernetes.py +++ b/scanner_py/core/kubernetes.py @@ -43,7 +43,7 @@ class ImageExtractionResult: class KubernetesImageExtractor: """ Extract container images from Kubernetes namespace. - + Discovers images from pods, deployments, daemonsets, statefulsets, jobs, and cronjobs. """ @@ -235,11 +235,11 @@ def _extract_registry(image: str) -> str: parts = image.split("/") if len(parts) == 1: return "docker.io" - + first = parts[0] if "." in first or ":" in first or first == "localhost": return first - + return "docker.io" def load_ignore_patterns(self, filepath: str) -> List[str]: @@ -265,4 +265,3 @@ def load_ignore_patterns(self, filepath: str) -> List[str]: if is_verbose(): logger.warning(f"Ignore file not found: {filepath}") return patterns - diff --git a/scanner_py/core/scanner.py b/scanner_py/core/scanner.py index aeb64aa..14c4501 100644 --- a/scanner_py/core/scanner.py +++ b/scanner_py/core/scanner.py @@ -218,7 +218,7 @@ def extract_cve_details(self, report_file: str) -> List[CVEDetails]: class ImageScanner: """ Complete image scanner combining attestation extraction and Trivy scanning. - + Main scanning logic equivalent to cosign-scan-image.sh """ diff --git a/scanner_py/core/verification.py b/scanner_py/core/verification.py index 865f4ac..455c9e0 100644 --- a/scanner_py/core/verification.py +++ b/scanner_py/core/verification.py @@ -26,7 +26,7 @@ class VerificationResult: class CosignVerifier: """ Verify container image signatures using cosign. - + Supports both keyless and key-based verification modes. """ @@ -172,4 +172,3 @@ def is_signed(self, image: str) -> bool: """ result = self.verify(image) return result.success - diff --git a/scanner_py/models/__init__.py b/scanner_py/models/__init__.py index 73eca26..9f8354a 100644 --- a/scanner_py/models/__init__.py +++ b/scanner_py/models/__init__.py @@ -17,4 +17,3 @@ "ScanSummary", "CVESeverity", ] - diff --git a/scanner_py/models/scan_result.py b/scanner_py/models/scan_result.py index e917278..93fb8d2 100644 --- a/scanner_py/models/scan_result.py +++ b/scanner_py/models/scan_result.py @@ -108,25 +108,25 @@ class ScanResult: skip_reason: Optional[str] = None error: Optional[str] = None metadata: Optional[ImageMetadata] = None - + # CVE counts critical_count: int = 0 high_count: int = 0 medium_count: int = 0 low_count: int = 0 triaged_count: int = 0 - + # CVE lists unaddressed_cves: List[str] = field(default_factory=list) addressed_cves: List[str] = field(default_factory=list) irrelevant_cves: List[str] = field(default_factory=list) cve_details: List[CVEDetails] = field(default_factory=list) - + # Chainguard info is_chainguard: bool = False base_image: str = "unknown" signature_verified: bool = False - + # File paths output_dir: Optional[str] = None sbom_file: Optional[str] = None @@ -178,7 +178,7 @@ class ScanSummary: skipped_scans: int = 0 format: str = "table" severity_filter: str = "HIGH,CRITICAL" - + # Results successful_images: List[str] = field(default_factory=list) failed_images: List[Dict[str, str]] = field(default_factory=list) @@ -214,4 +214,3 @@ def save(self, filepath: str) -> None: """Save summary to file.""" with open(filepath, "w") as f: f.write(self.to_json()) - diff --git a/scanner_py/pyproject.toml b/scanner_py/pyproject.toml index 01e09c5..dfffc59 100644 --- a/scanner_py/pyproject.toml +++ b/scanner_py/pyproject.toml @@ -84,4 +84,3 @@ disallow_untyped_defs = true testpaths = ["tests"] python_files = ["test_*.py"] addopts = "-v --tb=short" - diff --git a/scanner_py/requirements.txt b/scanner_py/requirements.txt index 56d7b27..72b4c22 100644 --- a/scanner_py/requirements.txt +++ b/scanner_py/requirements.txt @@ -35,4 +35,3 @@ # black>=23.0.0 # mypy>=1.0.0 # ruff>=0.1.0 - diff --git a/scanner_py/utils/logging.py b/scanner_py/utils/logging.py index a41e6ec..f8e1ca0 100644 --- a/scanner_py/utils/logging.py +++ b/scanner_py/utils/logging.py @@ -85,11 +85,11 @@ def filter(self, record: logging.LogRecord) -> bool: # Always show CRITICAL if record.levelno >= logging.CRITICAL: return True - + # In non-verbose mode, suppress ERROR and WARNING if not _verbose_mode and record.levelno in (logging.ERROR, logging.WARNING): return False - + return True @@ -107,7 +107,7 @@ def setup_logging( show_errors: Whether to show errors (when False, errors only in verbose) """ global _verbose_mode - + # Register custom levels logging.addLevelName(STEP, "STEP") logging.addLevelName(RESULT, "RESULT") @@ -167,9 +167,9 @@ def get_logger(name: str = "scanner_py") -> logging.Logger: def suppress_logging() -> int: """ Temporarily suppress all logging output. - + Returns the previous log level so it can be restored. - + Returns: Previous log level """ @@ -182,7 +182,7 @@ def suppress_logging() -> int: def restore_logging(previous_level: int) -> None: """ Restore logging to a previous level. - + Args: previous_level: Log level to restore """ diff --git a/scanner_py/utils/progress.py b/scanner_py/utils/progress.py index 9eb4e00..a1abdd6 100644 --- a/scanner_py/utils/progress.py +++ b/scanner_py/utils/progress.py @@ -26,7 +26,7 @@ class ProgressStyle: class ProgressBar: """ A beautiful progress bar for terminal output. - + Supports: - Percentage display - Item counts @@ -55,7 +55,7 @@ def __init__( self.description = description self.style = style or ProgressStyle() self.file = file or sys.stderr - + self.current = 0 self.start_time = time.time() self.success_count = 0 @@ -80,7 +80,7 @@ def _get_eta(self) -> str: """Calculate estimated time remaining.""" if self.current == 0: return "calculating..." - + elapsed = time.time() - self.start_time rate = self.current / elapsed remaining = (self.total - self.current) / rate if rate > 0 else 0 @@ -110,7 +110,7 @@ def update( current_item: Current item being processed """ self.current += n - + if status == "success": self.success_count += 1 elif status == "failed": @@ -153,22 +153,22 @@ def _render(self, current_item: str = "") -> None: counts.append(f"\033[93m🚫{self.skipped_count}\033[0m") else: counts.append(f"🚫{self.skipped_count}") - + status_str = " ".join(counts) # Build line parts = [] if self.description: parts.append(self.description) - + parts.append(f"[{bar}]") - + if self.style.show_percentage: parts.append(f"{progress * 100:5.1f}%") - + if self.style.show_count: parts.append(f"({self.current}/{self.total})") - + if status_str: parts.append(status_str) @@ -180,16 +180,16 @@ def _render(self, current_item: str = "") -> None: parts.append(current_item) line = " ".join(parts) - + # Clear and print using ANSI escape codes self._clear_line() - + if self._is_tty(): # \r moves to beginning, line already cleared by _clear_line self.file.write(f"\r{line}") else: self.file.write(f"{line}\n") - + self.file.flush() self._last_line_length = len(line) + 10 # Extra padding for ANSI codes @@ -201,23 +201,23 @@ def finish(self, message: str = "") -> None: message: Final message to display """ self._clear_line() - + # Final status elapsed = time.time() - self.start_time elapsed_str = self._format_time(elapsed) - + parts = [] if self.description: parts.append(self.description) - + # Final icon if self.failed_count == 0: parts.append("✅") else: parts.append("⚠️") - + parts.append(f"Completed {self.current}/{self.total}") - + if self.success_count > 0: parts.append(f"({self.success_count} ✓") if self.failed_count > 0: @@ -225,12 +225,12 @@ def finish(self, message: str = "") -> None: if self.skipped_count > 0: parts.append(f", {self.skipped_count} 🚫") parts[-1] += ")" - + parts.append(f"in {elapsed_str}") - + if message: parts.append(f"- {message}") - + line = " ".join(parts) self.file.write(f"{line}\n") self.file.flush() @@ -267,7 +267,7 @@ def spin(self) -> None: """Advance spinner by one frame.""" if not self._is_tty(): return - + char = self.style.spinner_chars[self._frame % len(self.style.spinner_chars)] # Use ANSI escape codes to clear line and write self.file.write(f"\r\033[K{char} {self.message}") @@ -284,7 +284,7 @@ def finish(self, message: str = "", success: bool = True) -> None: if self._is_tty(): # Use ANSI escape codes to clear the line self.file.write("\r\033[K") - + icon = "✅" if success else "❌" final_msg = message or self.message self.file.write(f"{icon} {final_msg}\n") @@ -316,7 +316,7 @@ def progress_context( class MultiProgress: """ Track progress for multiple parallel tasks. - + Each task shows individual progress while maintaining an overall view. """ @@ -341,7 +341,7 @@ def __init__( self.description = description self.style = style or ProgressStyle() self.file = file or sys.stderr - + self.total = len(tasks) self.completed = 0 self.results: dict = {} @@ -366,47 +366,47 @@ def complete_task( def _render(self) -> None: """Render progress.""" progress = self.completed / self.total if self.total > 0 else 0 - + success = sum(1 for s in self.results.values() if s == "success") failed = sum(1 for s in self.results.values() if s == "failed") skipped = sum(1 for s in self.results.values() if s == "skipped") - + # Build status string parts = [] if self.description: parts.append(self.description) - + parts.append(f"[{self.completed}/{self.total}]") parts.append(f"{progress * 100:.0f}%") - + if success > 0: parts.append(f"✓{success}") if failed > 0: parts.append(f"✗{failed}") if skipped > 0: parts.append(f"🚫{skipped}") - + line = " ".join(parts) - + if hasattr(self.file, 'isatty') and self.file.isatty(): self.file.write(f"\r{line}" + " " * 20) else: self.file.write(f"{line}\n") - + self.file.flush() def finish(self) -> None: """Finish and show summary.""" elapsed = time.time() - self.start_time - + success = sum(1 for s in self.results.values() if s == "success") failed = sum(1 for s in self.results.values() if s == "failed") skipped = sum(1 for s in self.results.values() if s == "skipped") - + # Clear line if hasattr(self.file, 'isatty') and self.file.isatty(): self.file.write("\r" + " " * 80 + "\r") - + icon = "✅" if failed == 0 else "⚠️" self.file.write( f"{icon} {self.description} completed: " @@ -414,4 +414,3 @@ def finish(self) -> None: f"({elapsed:.1f}s)\n" ) self.file.flush() - diff --git a/scanner_py/utils/registry.py b/scanner_py/utils/registry.py index 5348ce4..765068e 100644 --- a/scanner_py/utils/registry.py +++ b/scanner_py/utils/registry.py @@ -29,12 +29,12 @@ def extract_registry(image: str) -> str: parts = image.split("/") if len(parts) == 1: return "docker.io" - + # Check if first part looks like a registry first = parts[0] if "." in first or ":" in first or first == "localhost": return first - + # Default to docker.io return "docker.io" @@ -54,14 +54,14 @@ def is_registry_accessible(self, image: str) -> bool: if registry in self._accessible_registries: logger.debug(f"Registry already known to be accessible: {registry}") return True - + if registry in self._inaccessible_registries: logger.debug(f"Registry already known to be inaccessible: {registry}") return False # Test accessibility logger.debug(f"Checking registry accessibility: {registry} using image: {image}") - + result = run_command( ["docker", "manifest", "inspect", image], timeout=30, @@ -100,4 +100,3 @@ def inaccessible_registries(self) -> Set[str]: def accessible_registries(self) -> Set[str]: """Get set of accessible registries.""" return self._accessible_registries.copy() - diff --git a/scanner_py/utils/subprocess.py b/scanner_py/utils/subprocess.py index 5db5810..511bdde 100644 --- a/scanner_py/utils/subprocess.py +++ b/scanner_py/utils/subprocess.py @@ -140,4 +140,3 @@ def check_prerequisites(tools: List[str]) -> List[str]: if not check_tool_available(tool): missing.append(tool) return missing -