Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 33 additions & 6 deletions cycode/cli/files_collector/commit_range_documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def collect_commit_range_diff_documents(
def calculate_pre_receive_commit_range(branch_update_details: str) -> Optional[str]:
end_commit = _get_end_commit_from_branch_update_details(branch_update_details)

# branch is deleted, no need to perform scan
# the branch is deleted, no need to perform a scan
if end_commit == consts.EMPTY_COMMIT_SHA:
return None

Expand All @@ -127,10 +127,21 @@ def _get_end_commit_from_branch_update_details(update_details: str) -> str:


def _get_oldest_unupdated_commit_for_branch(commit: str) -> Optional[str]:
# get a list of commits by chronological order that are not in the remote repository yet
# get a list of commits by chronological order that is not in the remote repository yet
# more info about rev-list command: https://git-scm.com/docs/git-rev-list
repo = git_proxy.get_repo(os.getcwd())
not_updated_commits = repo.git.rev_list(commit, '--topo-order', '--reverse', '--not', '--all')

try:
not_updated_commits = repo.git.rev_list(commit, '--topo-order', '--reverse', '--not', '--all')
except git_proxy.get_git_command_error() as e:
# Handle case where the commit doesn't exist in the repository yet (e.g., first push to bare repo)
if 'bad object' in str(e) or 'unknown revision' in str(e):
logger.debug(
'Commit does not exist in repository yet (likely first push), %s', {'commit': commit, 'error': str(e)}
)
return None
# Re-raise other git errors
raise

commits = not_updated_commits.splitlines()
if not commits:
Expand Down Expand Up @@ -311,9 +322,25 @@ def parse_commit_range_sast(commit_range: str, path: str) -> tuple[Optional[str]
to_spec = consts.GIT_HEAD_COMMIT_REV

try:
# Use rev_parse to resolve each specifier to its full commit SHA
from_commit_rev = repo.rev_parse(from_spec).hexsha
to_commit_rev = repo.rev_parse(to_spec).hexsha
# Use safe head reference resolution for HEAD references in bare repositories
if from_spec == consts.GIT_HEAD_COMMIT_REV:
safe_from_spec = get_safe_head_reference_for_diff(repo)
if safe_from_spec == consts.GIT_EMPTY_TREE_OBJECT:
logger.debug("Cannot resolve HEAD reference in bare repository for commit range '%s'", commit_range)
return None, None
from_commit_rev = repo.rev_parse(safe_from_spec).hexsha
else:
from_commit_rev = repo.rev_parse(from_spec).hexsha

if to_spec == consts.GIT_HEAD_COMMIT_REV:
safe_to_spec = get_safe_head_reference_for_diff(repo)
if safe_to_spec == consts.GIT_EMPTY_TREE_OBJECT:
logger.debug("Cannot resolve HEAD reference in bare repository for commit range '%s'", commit_range)
return None, None
to_commit_rev = repo.rev_parse(safe_to_spec).hexsha
else:
to_commit_rev = repo.rev_parse(to_spec).hexsha

return from_commit_rev, to_commit_rev
except git_proxy.get_git_command_error() as e:
logger.warning("Failed to parse commit range '%s'", commit_range, exc_info=e)
Expand Down
38 changes: 38 additions & 0 deletions tests/cli/files_collector/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import os
import tempfile
from collections.abc import Generator
from contextlib import contextmanager

from git import Repo


@contextmanager
def git_repository(path: str) -> Generator[Repo, None, None]:
"""Context manager for Git repositories that ensures proper cleanup on Windows."""
repo = Repo.init(path)
try:
yield repo
finally:
# Properly close the repository to release file handles
repo.close()


@contextmanager
def temporary_git_repository() -> Generator[tuple[str, Repo], None, None]:
"""Combined context manager for temporary directory with Git repository."""
with tempfile.TemporaryDirectory() as temp_dir, git_repository(temp_dir) as repo:
yield temp_dir, repo


def create_multiple_commits(repo: Repo, temp_dir: str, num_commits: int = 3) -> list:
"""Helper function to create multiple commits in the repository."""
commits = []
for i in range(num_commits):
test_file = os.path.join(temp_dir, f'file{i}.py')
with open(test_file, 'w') as f:
f.write(f"print('file {i}')")

repo.index.add([f'file{i}.py'])
commit = repo.index.commit(f'Commit {i + 1}')
commits.append(commit)
return commits
24 changes: 1 addition & 23 deletions tests/cli/files_collector/test_commit_range_documents.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,8 @@
import os
import tempfile
from collections.abc import Generator
from contextlib import contextmanager

from git import Repo

from cycode.cli import consts
from cycode.cli.files_collector.commit_range_documents import get_safe_head_reference_for_diff


@contextmanager
def git_repository(path: str) -> Generator[Repo, None, None]:
"""Context manager for Git repositories that ensures proper cleanup on Windows."""
repo = Repo.init(path)
try:
yield repo
finally:
# Properly close the repository to release file handles
repo.close()


@contextmanager
def temporary_git_repository() -> Generator[tuple[str, Repo], None, None]:
"""Combined context manager for temporary directory with Git repository."""
with tempfile.TemporaryDirectory() as temp_dir, git_repository(temp_dir) as repo:
yield temp_dir, repo
from tests.cli.files_collector.common import temporary_git_repository


class TestGetSafeHeadReferenceForDiff:
Expand Down
173 changes: 173 additions & 0 deletions tests/cli/files_collector/test_pre_receive_commit_range.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
import os

import pytest

from cycode.cli import consts
from cycode.cli.files_collector.commit_range_documents import (
_get_oldest_unupdated_commit_for_branch,
calculate_pre_receive_commit_range,
parse_commit_range_sast,
)
from tests.cli.files_collector.common import create_multiple_commits, temporary_git_repository


class TestParseCommitRangeSast:
"""Test the SAST commit range parsing with bare repository support."""

@pytest.mark.parametrize(
('commit_range', 'description'),
[
('HEAD', 'single HEAD reference'),
('..HEAD', 'range ending with HEAD'),
('HEAD..', 'range starting with HEAD'),
('HEAD..HEAD', 'range with both HEAD references'),
],
)
def test_returns_none_for_head_references_in_bare_repository(self, commit_range: str, description: str) -> None:
"""Test that HEAD references return None in bare repositories."""
with temporary_git_repository() as (temp_dir, repo):
from_commit, to_commit = parse_commit_range_sast(commit_range, temp_dir)

# Should return None for bare repositories since HEAD doesn't exist
assert from_commit is None
assert to_commit is None

def test_works_correctly_with_head_references_when_commits_exist(self) -> None:
"""Test that HEAD references work correctly when the repository has commits."""
with temporary_git_repository() as (temp_dir, repo):
test_file = os.path.join(temp_dir, 'test.py')
with open(test_file, 'w') as f:
f.write("print('initial')")

repo.index.add(['test.py'])
initial_commit = repo.index.commit('Initial commit')

commit_range = initial_commit.hexsha # This gets interpreted as 'commit..HEAD'
from_commit, to_commit = parse_commit_range_sast(commit_range, temp_dir)

# Should successfully resolve both commits
assert from_commit is not None
assert to_commit is not None
assert to_commit == initial_commit.hexsha # HEAD should resolve to the latest commit

def test_handles_explicit_commit_ranges_correctly(self) -> None:
"""Test that explicit commit ranges (no HEAD) work correctly."""
with temporary_git_repository() as (temp_dir, repo):
commits = create_multiple_commits(repo, temp_dir)

commit_range = f'{commits[0].hexsha}..{commits[2].hexsha}'
from_commit, to_commit = parse_commit_range_sast(commit_range, temp_dir)

assert from_commit == commits[0].hexsha
assert to_commit == commits[2].hexsha

def test_handles_three_dot_ranges_correctly(self) -> None:
"""Test that three-dot commit ranges work correctly."""
with temporary_git_repository() as (temp_dir, repo):
test_file = os.path.join(temp_dir, 'test.py')
with open(test_file, 'w') as f:
f.write("print('first')")

repo.index.add(['test.py'])
first_commit = repo.index.commit('First commit')

with open(test_file, 'w') as f:
f.write("print('second')")

repo.index.add(['test.py'])
second_commit = repo.index.commit('Second commit')

# Test three-dot range
commit_range = f'{first_commit.hexsha}...{second_commit.hexsha}'

from_commit, to_commit = parse_commit_range_sast(commit_range, temp_dir)

assert from_commit == first_commit.hexsha
assert to_commit == second_commit.hexsha


class TestGetOldestUnupdatedCommitForBranch:
"""Test the oldest unupdated commit function with bare repository support."""

def test_returns_none_for_nonexistent_commit_in_bare_repository(self) -> None:
"""Test that function returns None for non-existent commits in bare repositories."""
with temporary_git_repository() as (temp_dir, repo):
# Use a fake commit SHA that doesn't exist
fake_commit_sha = '9cf90954ef26e7c58284f8ebf7dcd0fcf711152a'

original_cwd = os.getcwd()
os.chdir(temp_dir)

try:
# Should handle missing commit gracefully
result = _get_oldest_unupdated_commit_for_branch(fake_commit_sha)
assert result is None
finally:
os.chdir(original_cwd)

def test_works_correctly_with_existing_commits(self) -> None:
"""Test that the function works correctly when commits exist."""
with temporary_git_repository() as (temp_dir, repo):
commits = create_multiple_commits(repo, temp_dir)

original_cwd = os.getcwd()
os.chdir(temp_dir)

try:
# Test with an existing commit
result = _get_oldest_unupdated_commit_for_branch(commits[-1].hexsha)
# Result depends on repository state, but should not crash
assert isinstance(result, (str, type(None)))
finally:
os.chdir(original_cwd)


class TestCalculatePreReceiveCommitRange:
"""Test the pre-receive commit range calculation with bare repository support."""

def test_handles_first_push_to_bare_repository(self) -> None:
"""Test the first push scenario (old commit is all zeros)."""
with temporary_git_repository() as (temp_dir, repo):
# Simulate the first push: old_commit=zeros, new_commit=actual_sha
zero_commit = consts.EMPTY_COMMIT_SHA
new_commit_sha = '9cf90954ef26e7c58284f8ebf7dcd0fcf711152a'
branch_update_details = f'{zero_commit} {new_commit_sha} refs/heads/main'

original_cwd = os.getcwd()
os.chdir(temp_dir)

try:
# Should handle the first push gracefully
commit_range = calculate_pre_receive_commit_range(branch_update_details)
# For the first push to bare repo, this typically returns None
assert commit_range is None or isinstance(commit_range, str)
finally:
os.chdir(original_cwd)

def test_handles_branch_deletion(self) -> None:
"""Test branch deletion scenario (new commit is all zeros)."""
old_commit_sha = '9cf90954ef26e7c58284f8ebf7dcd0fcf711152a'
zero_commit = consts.EMPTY_COMMIT_SHA
branch_update_details = f'{old_commit_sha} {zero_commit} refs/heads/feature'

# Should return None for branch deletion
commit_range = calculate_pre_receive_commit_range(branch_update_details)
assert commit_range is None

def test_normal_push_scenario(self) -> None:
"""Test a normal push scenario with existing commits."""
with temporary_git_repository() as (temp_dir, repo):
commits = create_multiple_commits(repo, temp_dir, num_commits=2)

# Simulate normal push
branch_update_details = f'{commits[0].hexsha} {commits[1].hexsha} refs/heads/main'

original_cwd = os.getcwd()
os.chdir(temp_dir)

try:
commit_range = calculate_pre_receive_commit_range(branch_update_details)
# Should work without errors
assert commit_range is None or isinstance(commit_range, str)
finally:
os.chdir(original_cwd)
Loading