Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ pydantic
pyyaml
python-whois
concurrent
duckduckgo_search==8.0.0
duckduckgo_search==8.0.0
bandit
66 changes: 60 additions & 6 deletions utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,45 +81,99 @@ def scan_repo_with_trufflehog(url: str, os: str) -> list[str]:

return findings

def scan_repo_with_bandit(repo_path: str) -> dict:
"""
Uses Bandit to scan a local directory for security issues
Takes a path string of the repo to scan, returns a dict of findings in JSON format
Relies on bandit being installed
"""
# Run the command and capture the output
result = subprocess.run(
[
"bandit", "-f", "json", "-r", repo_path,
],
capture_output=True,
text=True,
encoding="utf-8",
)

if result.returncode != 0:
print("Error running Bandit:", result.stderr)
return {}

# Parse the JSON output
try:
findings = json.loads(result.stdout)
except json.JSONDecodeError as e:
print("Failed to parse Bandit output:", e)
return {}

return findings


def scan_repos(repos: list[str], max_workers: int, os: str) -> dict:

def scan_repos(repos: list[str], max_workers: int, os_name: str) -> dict:
"""
Concurrently loops through a list of provided GitHub repos and scans them with trufflehog
Takes a list of repos, and a max number of workers/threads to use
Outputs a dict in the following format with any potential findings:
{
"<REPO URL1>": {
"trufflehog_findings": [{finding1}, {finding2}]
"bandit_findings": [{finding1}, {finding2}]
},
"<REPO URL2>": {
"trufflehog_findings": [{finding1}, {finding2}]
"bandit_findings": [{finding1}, {finding2}]
}
}
"""

results = {}

def scan_repo(repo: str) -> dict:
"""
Helper function to scan a single repo with both tools
"""
# Clone the repo to a temporary directory
repo_name = repo.rstrip("/").split("/")[-1]
temp_dir = f"./temp/{repo_name}"
os.makedirs("./temp", exist_ok=True)
try:
subprocess.run(["git", "clone", repo, temp_dir], check=True)
except subprocess.CalledProcessError as e:
print(f"Error cloning repo {repo}: {e}")
return {}

# Scan the repo with both tools
trufflehog_findings = scan_repo_with_trufflehog(repo, os_name)
bandit_findings = scan_repo_with_bandit(temp_dir)
subprocess.run(["rm", "-rf", temp_dir], check=True)
return {
"trufflehog_findings": trufflehog_findings,
"bandit_findings": bandit_findings,
}

# Using a ThreadPoolExecutor for concurrent execution
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_repo = {
executor.submit(scan_repo_with_trufflehog, repo, os): repo for repo in repos
executor.submit(scan_repo, repo): repo for repo in repos
}

for future in tqdm(
as_completed(future_to_repo), total=len(repos), desc="Scanning Repos"
):
repo = future_to_repo[future]
try:
findings = future.result()
if findings:
results[repo] = {"trufflehog_findings": findings}
else:
results[repo] = {}
results[repo] = findings
except Exception as exc:
print(f"Error processing repo {repo}: {exc}")
results[repo] = {}
return results



class CustomJSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime):
Expand Down