Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fedcode/activitypub.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
"purl-ap-profile": "purl_string",
"review-page": "review_id",
"repository-page": "repository_id",
"note-page": "note_id",
"note-page": "uuid",
"vulnerability-page": "vulnerability_id",
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,16 @@
# See https://github.com/nexB/federatedcode for support or download.
# See https://aboutcode.org for more information about AboutCode.org OSS projects.
#

from traceback import format_exc as traceback_format_exc

from django.core.management.base import BaseCommand
from django.core.management.base import CommandError

from fedcode.importer import Importer
from fedcode.models import FederateRequest
from fedcode.models import SyncRequest
from fedcode.signatures import FEDERATEDCODE_PRIVATE_KEY
from fedcode.signatures import HttpSignature


def sync_task():
"""
sync_task is a task to run the Importer and save the status
"""
for sync_r in SyncRequest.objects.all().order_by("created_at"):
if not sync_r.done:
try:
repo = sync_r.repo
repo.git_repo_obj.remotes.origin.pull()
importer = Importer(repo, repo.admin)
importer.run()
sync_r.done = True
except Exception as e:
sync_r.error_message = e
finally:
sync_r.save()


def send_fed_req_task():
"""
send_fed_req_task is a task to send the http signed request to the target and save the status of the request
Expand All @@ -53,11 +35,5 @@ def send_fed_req_task():


class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument("task", choices=["sync", "federate"])

def handle(self, *args, **options):
if options["task"] == "sync":
sync_task()
elif options["task"] == "federate":
send_fed_req_task()
send_fed_req_task()
83 changes: 83 additions & 0 deletions fedcode/management/commands/sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# FederatedCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/nexB/federatedcode for support or download.
# See https://aboutcode.org for more information about AboutCode.org OSS projects.
#

from django.core.management.base import BaseCommand
from django.core.management.base import CommandError

from fedcode.pipelines import sync_scancode_scans
from fedcode.pipelines import sync_vulnerablecode

# Registry of the available sync pipelines, keyed by their unique pipeline_id.
SYNC_REGISTRY = {
    pipeline.pipeline_id: pipeline
    for pipeline in (
        sync_scancode_scans.SyncScanCodeScans,
        sync_vulnerablecode.SyncVulnerableCode,
    )
}


class Command(BaseCommand):
    help = "Sync metadata from git repository"

    def add_arguments(self, parser):
        """Register the --list/--all flags and the positional pipeline IDs."""
        parser.add_argument(
            "--list",
            action="store_true",
            help="List available pipelines",
        )
        parser.add_argument("--all", action="store_true", help="Sync all repo data.")

        parser.add_argument("pipelines", nargs="*", help="Pipeline ID")

    def handle(self, *args, **options):
        """Dispatch to listing, syncing everything, or syncing selected pipelines."""
        try:
            if options["list"]:
                self.list_pipelines()
                return
            if options["all"]:
                self.import_data(pipelines=SYNC_REGISTRY.values())
                return
            requested = options["pipelines"]
            if not requested:
                raise CommandError(
                    'Please provide at least one pipeline to execute or use "--all".'
                )
            self.import_data(validate_pipelines(requested))
        except KeyboardInterrupt:
            raise CommandError("Keyboard interrupt received. Stopping...")

    def list_pipelines(self):
        """Print the IDs of every registered sync pipeline."""
        self.stdout.write("Metadata can be synced from the following pipelines:")
        self.stdout.write("\n".join(SYNC_REGISTRY))

    def import_data(self, pipelines):
        """Execute the given ``pipeline``."""
        failures = []

        for pipeline_class in pipelines:
            self.stdout.write(f"Syncing data using {pipeline_class.pipeline_id}")
            exit_code, error = pipeline_class().execute()
            if exit_code != 0:
                self.stdout.write(error)
                failures.append(pipeline_class.pipeline_id)

        if failures:
            raise CommandError(f"{len(failures)} failed!: {','.join(failures)}")


def validate_pipelines(pipelines):
    """
    Map each pipeline ID in ``pipelines`` to its registered class.

    Raise CommandError listing every unknown ID if any is not registered.
    """
    known = []
    unknown = []
    for pipeline_id in pipelines:
        # Registry values are classes, never None, so .get(None) means "unknown".
        pipeline_class = SYNC_REGISTRY.get(pipeline_id)
        if pipeline_class is None:
            unknown.append(pipeline_id)
        else:
            known.append(pipeline_class)
    if unknown:
        raise CommandError(f"Unknown pipelines: {unknown}")

    return known
97 changes: 97 additions & 0 deletions fedcode/pipelines/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# FederatedCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/nexB/federatedcode for support or download.
# See https://aboutcode.org for more information about AboutCode.org OSS projects.
#

import logging
from datetime import datetime
from datetime import timezone
from timeit import default_timer as timer

from aboutcode.pipeline import BasePipeline
from aboutcode.pipeline import humanize_time

module_logger = logging.getLogger(__name__)


class classproperty:
    """Descriptor exposing a method as a read-only property on the class itself."""

    def __init__(self, fget):
        self.fget = fget

    def __get__(self, obj, objtype=None):
        # Always invoke the wrapped function with the class, whether the
        # attribute is read on the class or on an instance.
        return self.fget(objtype)


class FederatedCodePipeline(BasePipeline):
    """
    Base class for FederatedCode sync pipelines.

    Subclasses must declare a unique, non-empty ``pipeline_id`` string;
    reading ``pipeline_id`` on a class that never declares one raises
    NotImplementedError.
    """

    def on_failure(self):
        """
        Tasks to run in the event that pipeline execution fails.

        Implement cleanup or other tasks that need to be performed
        on pipeline failure, such as:
        - Removing cloned repositories.
        - Deleting downloaded archives.
        """
        pass

    def execute(self):
        """
        Execute each step in the order defined on this pipeline class.

        Return a ``(status, error)`` tuple: ``(0, "")`` on success, or
        ``(1, traceback_text)`` when a step raised.
        """
        self.log(f"Pipeline [{self.pipeline_name}] starting")

        steps = self.pipeline_class.get_steps(groups=self.selected_groups)
        steps_count = len(steps)
        pipeline_start_time = timer()

        for current_index, step in enumerate(steps, start=1):
            step_name = step.__name__

            if self.selected_steps and step_name not in self.selected_steps:
                self.log(f"Step [{step_name}] skipped")
                continue

            self.set_current_step(f"{current_index}/{steps_count} {step_name}")
            self.log(f"Step [{step_name}] starting")
            step_start_time = timer()

            try:
                step(self)
            except Exception as exception:
                self.log("Pipeline failed")
                # Run the cleanup hooks (and time them) before reporting failure.
                on_failure_start_time = timer()
                self.log("Running [on_failure] tasks")
                self.on_failure()
                on_failure_run_time = timer() - on_failure_start_time
                self.log(f"Completed [on_failure] tasks in {humanize_time(on_failure_run_time)}")

                return 1, self.output_from_exception(exception)

            step_run_time = timer() - step_start_time
            self.log(f"Step [{step_name}] completed in {humanize_time(step_run_time)}")

        self.set_current_step("")  # Reset the `current_step` field on completion
        pipeline_run_time = timer() - pipeline_start_time
        self.log(f"Pipeline completed in {humanize_time(pipeline_run_time)}")

        return 0, ""

    def log(self, message, level=logging.INFO):
        """Log the given `message` to the current module logger and execution_log."""
        now_local = datetime.now(timezone.utc).astimezone()
        timestamp = now_local.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
        message = f"{timestamp} {message}"
        module_logger.log(level, message)
        self.append_to_log(message)

    @classproperty
    def pipeline_id(cls):
        """Return the unique pipeline_id declared on the (sub)class."""
        # Read the class dicts directly instead of ``cls.pipeline_id``:
        # that attribute access would re-enter this very descriptor and
        # recurse forever (RecursionError) on any class that does not
        # shadow the attribute with a plain string.
        for klass in cls.__mro__:
            value = klass.__dict__.get("pipeline_id")
            if isinstance(value, str) and value:
                return value
        raise NotImplementedError("pipeline_id is not defined or is empty")
112 changes: 112 additions & 0 deletions fedcode/pipelines/sync_scancode_scans.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# FederatedCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/nexB/federatedcode for support or download.
# See https://aboutcode.org for more information about AboutCode.org OSS projects.
#

from pathlib import Path
from traceback import format_exc as traceback_format_exc

from aboutcode.pipeline import LoopProgress

from fedcode.models import Package
from fedcode.models import Repository
from fedcode.pipelines import FederatedCodePipeline
from fedcode.pipes import utils


class SyncScanCodeScans(FederatedCodePipeline):
    """Sync Package scans from FederatedCode git repositories."""

    pipeline_id = "sync_scancode_scans"

    @classmethod
    def steps(cls):
        """Declare the ordered pipeline steps."""
        return (
            cls.get_git_repos,
            cls.sync_scan_repositories,
        )

    def get_git_repos(self):
        # Collect every known git repository queryset for the sync step.
        self.git_repos = Repository.objects.all()

    def sync_scan_repositories(self):
        """Pull each repository and sync the package scans it contains."""
        total_repositories = self.git_repos.count()
        self.log(f"Syncing package scans from {total_repositories:,d} repositories")

        synced_scans = 0
        progress = LoopProgress(total_iterations=total_repositories, logger=self.log)
        repositories = self.git_repos.iterator(chunk_size=2000)
        for repository in progress.iter(repositories):
            # Refresh the local checkout before reading scans from it.
            repository.git_repo_obj.remotes.origin.pull()
            synced_scans += sync_scancodeio_scan(repository=repository, logger=self.log)

        self.log(f"Successfully synced {synced_scans:,d} package scans")


def sync_scancodeio_scan(repository, logger):
    """
    Sync the package scans of ``repository`` and return the number synced.

    On the first run (no ``last_imported_commit``) every scan in the working
    tree is synced; afterwards only the scans touched by the diff between the
    last imported commit and HEAD are processed. The ``last_imported_commit``
    marker is advanced to HEAD in both cases.
    """
    repo = repository.git_repo_obj
    latest_commit_hash = repo.head.commit.hexsha
    latest_commit = repo.commit(latest_commit_hash)

    if last_commit_hash := repository.last_imported_commit:
        last_imported_commit = repo.commit(last_commit_hash)
        diffs = last_imported_commit.diff(latest_commit)
        # Pass the raw diffs through: sync_scan_from_diff matches on both
        # a_path and b_path, so pre-filtering on a_path alone would drop
        # scans renamed into place and can crash when a_path is None.
        scan_count = sync_scan_from_diff(diffs=diffs, repository=repository, logger=logger)
    else:
        scan_count = sync_all_scan(repository=repository, logger=logger)

    repository.last_imported_commit = latest_commit_hash
    repository.save()

    return scan_count


def sync_scan_from_diff(diffs, repository, logger):
    """
    Sync the ``scancodeio.json`` scans touched by the git ``diffs`` of
    ``repository`` and return the number of matched scan files.

    Added/modified/renamed scans create or update a Note; deleted scans
    remove the corresponding Note. Other change types are skipped.
    """
    scans = [
        item
        for item in diffs
        # a_path/b_path can be None depending on the change type; guard
        # with "" before matching the file name.
        if (item.a_path or "").endswith("scancodeio.json")
        or (item.b_path or "").endswith("scancodeio.json")
    ]
    scan_count = len(scans)

    logger(f"Syncing {scan_count:,d} package scan from {repository.url}")
    progress = LoopProgress(total_iterations=scan_count, logger=logger)
    for scan in progress.iter(scans):
        change_type = scan.change_type
        if change_type in ("A", "M", "R"):
            scan_path = scan.b_path
            action = utils.create_note
        elif change_type == "D":
            scan_path = scan.a_path
            action = utils.delete_note
        else:
            # GitPython can also report "C" (copied) and "T" (type change);
            # skip those instead of crashing on unbound scan_path/action.
            logger(f"Skipping unsupported change type {change_type!r}")
            continue

        purl = utils.package_metadata_path_to_purl(path=Path(scan_path), version=False)
        package, _ = Package.objects.get_or_create(purl=str(purl), service=repository.admin)
        note = utils.get_scan_note(path=Path(scan_path))
        action(pkg=package, note_dict=note)
    return scan_count


def sync_all_scan(repository, logger):
    """Sync every ``scancodeio.json`` scan in ``repository`` and return the count."""
    git_repo = repository.git_repo_obj
    checkout_root = Path(git_repo.working_dir)

    # First pass only counts the scan files so progress can be reported.
    total_scans = sum(1 for _ in checkout_root.rglob("scancodeio.json"))
    logger(f"Syncing {total_scans:,d} package scan from {git_repo.remotes.origin.url}")

    progress = LoopProgress(total_iterations=total_scans, logger=logger)
    for scan_file in progress.iter(checkout_root.rglob("scancodeio.json")):
        rel_path = scan_file.relative_to(checkout_root)
        purl = utils.package_metadata_path_to_purl(rel_path, version=False)
        package, _ = Package.objects.get_or_create(purl=str(purl), service=repository.admin)
        note = utils.get_scan_note(path=rel_path)
        utils.create_note(pkg=package, note_dict=note)
    return total_scans
Loading