From 016aeaf84cb1be236ec53229f084d65d547cb1ae Mon Sep 17 00:00:00 2001 From: Indigo-10 Date: Wed, 16 Apr 2025 10:37:46 -0400 Subject: [PATCH] add stuff --- requirements.txt | 3 ++- utils.py | 66 +++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 62 insertions(+), 7 deletions(-) diff --git a/requirements.txt b/requirements.txt index ada8285..0301e48 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,4 +10,5 @@ pydantic pyyaml python-whois concurrent -duckduckgo_search==8.0.0 \ No newline at end of file +duckduckgo_search==8.0.0 +bandit \ No newline at end of file diff --git a/utils.py b/utils.py index 01907f3..affb244 100644 --- a/utils.py +++ b/utils.py @@ -81,8 +81,38 @@ def scan_repo_with_trufflehog(url: str, os: str) -> list[str]: return findings +def scan_repo_with_bandit(repo_path: str) -> dict: + """ + Uses Bandit to scan a local directory for security issues + Takes a path string of the repo to scan, returns a dict of findings in JSON format + Relies on bandit being installed + """ + # Run the command and capture the output + result = subprocess.run( + [ + "bandit", "-f", "json", "-r", repo_path, + ], + capture_output=True, + text=True, + encoding="utf-8", + ) + + if result.returncode != 0: + print("Error running Bandit:", result.stderr) + return {} + + # Parse the JSON output + try: + findings = json.loads(result.stdout) + except json.JSONDecodeError as e: + print("Failed to parse Bandit output:", e) + return {} + + return findings + -def scan_repos(repos: list[str], max_workers: int, os: str) -> dict: + +def scan_repos(repos: list[str], max_workers: int, os_name: str) -> dict: """ Concurrently loops through a list of provided GitHub repos and scans them with trufflehog Takes a list of repos, and a max number of workers/threads to use @@ -90,36 +120,60 @@ def scan_repos(repos: list[str], max_workers: int, os: str) -> dict: { "": { "trufflehog_findings": [{finding1}, {finding2}] + "bandit_findings": [{finding1}, {finding2}] }, "": { "trufflehog_findings": [{finding1}, {finding2}] + "bandit_findings": [{finding1}, {finding2}] } } """ results = {} + def scan_repo(repo: str) -> dict: + """ + Helper function to scan a single repo with both tools + """ + # Clone the repo to a temporary directory + repo_name = repo.rstrip("/").split("/")[-1] + temp_dir = f"./temp/{repo_name}" + os.makedirs("./temp", exist_ok=True) + try: + subprocess.run(["git", "clone", repo, temp_dir], check=True) + except subprocess.CalledProcessError as e: + print(f"Error cloning repo {repo}: {e}") + return {} + + # Scan the repo with both tools + trufflehog_findings = scan_repo_with_trufflehog(repo, os_name) + bandit_findings = scan_repo_with_bandit(temp_dir) + subprocess.run(["rm", "-rf", temp_dir], check=True) + return { + "trufflehog_findings": trufflehog_findings, + "bandit_findings": bandit_findings, + } + # Using a ThreadPoolExecutor for concurrent execution with ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_repo = { - executor.submit(scan_repo_with_trufflehog, repo, os): repo for repo in repos + executor.submit(scan_repo, repo): repo for repo in repos } + for future in tqdm( as_completed(future_to_repo), total=len(repos), desc="Scanning Repos" ): repo = future_to_repo[future] try: findings = future.result() - if findings: - results[repo] = {"trufflehog_findings": findings} - else: - results[repo] = {} + results[repo] = findings except Exception as exc: print(f"Error processing repo {repo}: {exc}") results[repo] = {} return results + class CustomJSONEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime):