From 25ea9db22cecd36610eb9d40e524490a563ffc24 Mon Sep 17 00:00:00 2001 From: Ilya Siamionau Date: Mon, 8 Sep 2025 18:58:49 +0200 Subject: [PATCH] CM-51935 - Fix pre-receive hook for bare repositories --- .../files_collector/commit_range_documents.py | 39 +++- tests/cli/files_collector/common.py | 38 ++++ .../test_commit_range_documents.py | 24 +-- .../test_pre_receive_commit_range.py | 173 ++++++++++++++++++ 4 files changed, 245 insertions(+), 29 deletions(-) create mode 100644 tests/cli/files_collector/common.py create mode 100644 tests/cli/files_collector/test_pre_receive_commit_range.py diff --git a/cycode/cli/files_collector/commit_range_documents.py b/cycode/cli/files_collector/commit_range_documents.py index fa7af193..ca11d9d6 100644 --- a/cycode/cli/files_collector/commit_range_documents.py +++ b/cycode/cli/files_collector/commit_range_documents.py @@ -107,7 +107,7 @@ def collect_commit_range_diff_documents( def calculate_pre_receive_commit_range(branch_update_details: str) -> Optional[str]: end_commit = _get_end_commit_from_branch_update_details(branch_update_details) - # branch is deleted, no need to perform scan + # the branch is deleted, no need to perform a scan if end_commit == consts.EMPTY_COMMIT_SHA: return None @@ -127,10 +127,21 @@ def _get_end_commit_from_branch_update_details(update_details: str) -> str: def _get_oldest_unupdated_commit_for_branch(commit: str) -> Optional[str]: - # get a list of commits by chronological order that are not in the remote repository yet + # get a list of commits by chronological order that is not in the remote repository yet # more info about rev-list command: https://git-scm.com/docs/git-rev-list repo = git_proxy.get_repo(os.getcwd()) - not_updated_commits = repo.git.rev_list(commit, '--topo-order', '--reverse', '--not', '--all') + + try: + not_updated_commits = repo.git.rev_list(commit, '--topo-order', '--reverse', '--not', '--all') + except git_proxy.get_git_command_error() as e: + # Handle case where the commit doesn't exist in the repository yet (e.g., first push to bare repo) + if 'bad object' in str(e) or 'unknown revision' in str(e): + logger.debug( + 'Commit does not exist in repository yet (likely first push), %s', {'commit': commit, 'error': str(e)} + ) + return None + # Re-raise other git errors + raise commits = not_updated_commits.splitlines() if not commits: @@ -311,9 +322,25 @@ def parse_commit_range_sast(commit_range: str, path: str) -> tuple[Optional[str] to_spec = consts.GIT_HEAD_COMMIT_REV try: - # Use rev_parse to resolve each specifier to its full commit SHA - from_commit_rev = repo.rev_parse(from_spec).hexsha - to_commit_rev = repo.rev_parse(to_spec).hexsha + # Use safe head reference resolution for HEAD references in bare repositories + if from_spec == consts.GIT_HEAD_COMMIT_REV: + safe_from_spec = get_safe_head_reference_for_diff(repo) + if safe_from_spec == consts.GIT_EMPTY_TREE_OBJECT: + logger.debug("Cannot resolve HEAD reference in bare repository for commit range '%s'", commit_range) + return None, None + from_commit_rev = repo.rev_parse(safe_from_spec).hexsha + else: + from_commit_rev = repo.rev_parse(from_spec).hexsha + + if to_spec == consts.GIT_HEAD_COMMIT_REV: + safe_to_spec = get_safe_head_reference_for_diff(repo) + if safe_to_spec == consts.GIT_EMPTY_TREE_OBJECT: + logger.debug("Cannot resolve HEAD reference in bare repository for commit range '%s'", commit_range) + return None, None + to_commit_rev = repo.rev_parse(safe_to_spec).hexsha + else: + to_commit_rev = repo.rev_parse(to_spec).hexsha + return from_commit_rev, to_commit_rev except git_proxy.get_git_command_error() as e: logger.warning("Failed to parse commit range '%s'", commit_range, exc_info=e) diff --git a/tests/cli/files_collector/common.py b/tests/cli/files_collector/common.py new file mode 100644 index 00000000..8ac8c347 --- /dev/null +++ b/tests/cli/files_collector/common.py @@ -0,0 +1,38 @@ +import os +import tempfile +from collections.abc import Generator +from contextlib import contextmanager + +from git import Repo + + +@contextmanager +def git_repository(path: str) -> Generator[Repo, None, None]: + """Context manager for Git repositories that ensures proper cleanup on Windows.""" + repo = Repo.init(path) + try: + yield repo + finally: + # Properly close the repository to release file handles + repo.close() + + +@contextmanager +def temporary_git_repository() -> Generator[tuple[str, Repo], None, None]: + """Combined context manager for temporary directory with Git repository.""" + with tempfile.TemporaryDirectory() as temp_dir, git_repository(temp_dir) as repo: + yield temp_dir, repo + + +def create_multiple_commits(repo: Repo, temp_dir: str, num_commits: int = 3) -> list: + """Helper function to create multiple commits in the repository.""" + commits = [] + for i in range(num_commits): + test_file = os.path.join(temp_dir, f'file{i}.py') + with open(test_file, 'w') as f: + f.write(f"print('file {i}')") + + repo.index.add([f'file{i}.py']) + commit = repo.index.commit(f'Commit {i + 1}') + commits.append(commit) + return commits diff --git a/tests/cli/files_collector/test_commit_range_documents.py b/tests/cli/files_collector/test_commit_range_documents.py index 9bf2474f..8b9f13ad 100644 --- a/tests/cli/files_collector/test_commit_range_documents.py +++ b/tests/cli/files_collector/test_commit_range_documents.py @@ -1,30 +1,8 @@ import os -import tempfile -from collections.abc import Generator -from contextlib import contextmanager - -from git import Repo from cycode.cli import consts from cycode.cli.files_collector.commit_range_documents import get_safe_head_reference_for_diff - - -@contextmanager -def git_repository(path: str) -> Generator[Repo, None, None]: - """Context manager for Git repositories that ensures proper cleanup on Windows.""" - repo = Repo.init(path) - try: - yield repo - finally: - # Properly close the repository to release file handles - repo.close() - - -@contextmanager -def temporary_git_repository() -> Generator[tuple[str, Repo], None, None]: - """Combined context manager for temporary directory with Git repository.""" - with tempfile.TemporaryDirectory() as temp_dir, git_repository(temp_dir) as repo: - yield temp_dir, repo +from tests.cli.files_collector.common import temporary_git_repository class TestGetSafeHeadReferenceForDiff: diff --git a/tests/cli/files_collector/test_pre_receive_commit_range.py b/tests/cli/files_collector/test_pre_receive_commit_range.py new file mode 100644 index 00000000..bb00fbe9 --- /dev/null +++ b/tests/cli/files_collector/test_pre_receive_commit_range.py @@ -0,0 +1,173 @@ +import os + +import pytest + +from cycode.cli import consts +from cycode.cli.files_collector.commit_range_documents import ( + _get_oldest_unupdated_commit_for_branch, + calculate_pre_receive_commit_range, + parse_commit_range_sast, +) +from tests.cli.files_collector.common import create_multiple_commits, temporary_git_repository + + +class TestParseCommitRangeSast: + """Test the SAST commit range parsing with bare repository support.""" + + @pytest.mark.parametrize( + ('commit_range', 'description'), + [ + ('HEAD', 'single HEAD reference'), + ('..HEAD', 'range ending with HEAD'), + ('HEAD..', 'range starting with HEAD'), + ('HEAD..HEAD', 'range with both HEAD references'), + ], + ) + def test_returns_none_for_head_references_in_bare_repository(self, commit_range: str, description: str) -> None: + """Test that HEAD references return None in bare repositories.""" + with temporary_git_repository() as (temp_dir, repo): + from_commit, to_commit = parse_commit_range_sast(commit_range, temp_dir) + + # Should return None for bare repositories since HEAD doesn't exist + assert from_commit is None + assert to_commit is None + + def test_works_correctly_with_head_references_when_commits_exist(self) -> None: + """Test that HEAD references work correctly when the repository has commits.""" + with temporary_git_repository() as (temp_dir, repo): + test_file = os.path.join(temp_dir, 'test.py') + with open(test_file, 'w') as f: + f.write("print('initial')") + + repo.index.add(['test.py']) + initial_commit = repo.index.commit('Initial commit') + + commit_range = initial_commit.hexsha # This gets interpreted as 'commit..HEAD' + from_commit, to_commit = parse_commit_range_sast(commit_range, temp_dir) + + # Should successfully resolve both commits + assert from_commit is not None + assert to_commit is not None + assert to_commit == initial_commit.hexsha # HEAD should resolve to the latest commit + + def test_handles_explicit_commit_ranges_correctly(self) -> None: + """Test that explicit commit ranges (no HEAD) work correctly.""" + with temporary_git_repository() as (temp_dir, repo): + commits = create_multiple_commits(repo, temp_dir) + + commit_range = f'{commits[0].hexsha}..{commits[2].hexsha}' + from_commit, to_commit = parse_commit_range_sast(commit_range, temp_dir) + + assert from_commit == commits[0].hexsha + assert to_commit == commits[2].hexsha + + def test_handles_three_dot_ranges_correctly(self) -> None: + """Test that three-dot commit ranges work correctly.""" + with temporary_git_repository() as (temp_dir, repo): + test_file = os.path.join(temp_dir, 'test.py') + with open(test_file, 'w') as f: + f.write("print('first')") + + repo.index.add(['test.py']) + first_commit = repo.index.commit('First commit') + + with open(test_file, 'w') as f: + f.write("print('second')") + + repo.index.add(['test.py']) + second_commit = repo.index.commit('Second commit') + + # Test three-dot range + commit_range = f'{first_commit.hexsha}...{second_commit.hexsha}' + + from_commit, to_commit = parse_commit_range_sast(commit_range, temp_dir) + + assert from_commit == first_commit.hexsha + assert to_commit == second_commit.hexsha + + +class TestGetOldestUnupdatedCommitForBranch: + """Test the oldest unupdated commit function with bare repository support.""" + + def test_returns_none_for_nonexistent_commit_in_bare_repository(self) -> None: + """Test that function returns None for non-existent commits in bare repositories.""" + with temporary_git_repository() as (temp_dir, repo): + # Use a fake commit SHA that doesn't exist + fake_commit_sha = '9cf90954ef26e7c58284f8ebf7dcd0fcf711152a' + + original_cwd = os.getcwd() + os.chdir(temp_dir) + + try: + # Should handle missing commit gracefully + result = _get_oldest_unupdated_commit_for_branch(fake_commit_sha) + assert result is None + finally: + os.chdir(original_cwd) + + def test_works_correctly_with_existing_commits(self) -> None: + """Test that the function works correctly when commits exist.""" + with temporary_git_repository() as (temp_dir, repo): + commits = create_multiple_commits(repo, temp_dir) + + original_cwd = os.getcwd() + os.chdir(temp_dir) + + try: + # Test with an existing commit + result = _get_oldest_unupdated_commit_for_branch(commits[-1].hexsha) + # Result depends on repository state, but should not crash + assert isinstance(result, (str, type(None))) + finally: + os.chdir(original_cwd) + + +class TestCalculatePreReceiveCommitRange: + """Test the pre-receive commit range calculation with bare repository support.""" + + def test_handles_first_push_to_bare_repository(self) -> None: + """Test the first push scenario (old commit is all zeros).""" + with temporary_git_repository() as (temp_dir, repo): + # Simulate the first push: old_commit=zeros, new_commit=actual_sha + zero_commit = consts.EMPTY_COMMIT_SHA + new_commit_sha = '9cf90954ef26e7c58284f8ebf7dcd0fcf711152a' + branch_update_details = f'{zero_commit} {new_commit_sha} refs/heads/main' + + original_cwd = os.getcwd() + os.chdir(temp_dir) + + try: + # Should handle the first push gracefully + commit_range = calculate_pre_receive_commit_range(branch_update_details) + # For the first push to bare repo, this typically returns None + assert commit_range is None or isinstance(commit_range, str) + finally: + os.chdir(original_cwd) + + def test_handles_branch_deletion(self) -> None: + """Test branch deletion scenario (new commit is all zeros).""" + old_commit_sha = '9cf90954ef26e7c58284f8ebf7dcd0fcf711152a' + zero_commit = consts.EMPTY_COMMIT_SHA + branch_update_details = f'{old_commit_sha} {zero_commit} refs/heads/feature' + + # Should return None for branch deletion + commit_range = calculate_pre_receive_commit_range(branch_update_details) + assert commit_range is None + + def test_normal_push_scenario(self) -> None: + """Test a normal push scenario with existing commits.""" + with temporary_git_repository() as (temp_dir, repo): + commits = create_multiple_commits(repo, temp_dir, num_commits=2) + + # Simulate normal push + branch_update_details = f'{commits[0].hexsha} {commits[1].hexsha} refs/heads/main' + + original_cwd = os.getcwd() + os.chdir(temp_dir) + + try: + commit_range = calculate_pre_receive_commit_range(branch_update_details) + # Should work without errors + assert commit_range is None or isinstance(commit_range, str) + finally: + os.chdir(original_cwd)