diff --git a/CHANGELOG.md b/CHANGELOG.md index 173597a..2d9b015 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.33.0] - 2026-01-07 +### Added +- Support for following investigation methods: + - List investigations + - Get investigation details + - Fetch associated investigation + - Trigger investigation + ## [0.32.0] - 2026-01-05 ### Added - Support for classify log method diff --git a/CLI.md b/CLI.md index caadf2d..cea768f 100644 --- a/CLI.md +++ b/CLI.md @@ -983,6 +983,58 @@ secops case --ids "case-123,case-456" > **Note**: The case management uses a batch API that can retrieve multiple cases in a single request. You can provide up to 1000 case IDs separated by commas. +### Investigation Management + +Chronicle investigations provide automated analysis and recommendations for alerts and cases. Use these commands to list, retrieve, trigger, and fetch associated investigations. + +#### List investigations + +```bash +# List all investigations +secops investigation list + +# List with pagination +secops investigation list --page-size 50 + +# List with pagination token +secops investigation list --page-size 50 --page-token "token" +``` + +#### Get investigation details + +```bash +# Get a specific investigation by ID +secops investigation get --id "inv_123" +``` + +#### Trigger investigation for an alert + +```bash +# Trigger an investigation for a specific alert +secops investigation trigger --alert-id "alert_123" +``` + +#### Fetch associated investigations + +```bash +# Fetch investigations associated with specific alerts +secops investigation fetch-associated \ + --detection-type "ALERT" \ + --alert-ids "alert_123,alert_456" \ + --association-limit 5 + +# Fetch investigations associated with a case +secops investigation fetch-associated \ + --detection-type "CASE" \ + --case-ids "case_123" + +# Fetch with ordering +secops investigation fetch-associated \ + --detection-type "ALERT" \ + --alert-ids "alert_123" \ + --order-by "createTime desc" +``` + ### Data Export List available log types for export: diff --git a/README.md b/README.md index d904d76..f17373b 100644 --- a/README.md +++ b/README.md @@ -1403,6 +1403,92 @@ case = cases.get_case("case-id-1") > **Note**: The case management API uses the `legacy:legacyBatchGetCases` endpoint to retrieve multiple cases in a single request. You can retrieve up to 1000 cases in a single batch. +### Investigation Management + +Chronicle investigations provide automated analysis and recommendations for alerts and cases. The SDK provides methods to list, retrieve, trigger, and fetch associated investigations. + +#### List investigations + +Retrieve all investigations in your Chronicle instance: + +```python +# List all investigations +result = chronicle.list_investigations() +investigations = result.get("investigations", []) + +for inv in investigations: + print(f"Investigation: {inv['displayName']}") + print(f" Status: {inv.get('status', 'N/A')}") + print(f" Verdict: {inv.get('verdict', 'N/A')}") + +# List with pagination +result = chronicle.list_investigations(page_size=50, page_token="token") +``` + +#### Get investigation details + +Retrieve a specific investigation by its ID: + +```python +# Get investigation by ID +investigation = chronicle.get_investigation(investigation_id="inv_123") + +print(f"Name: {investigation['displayName']}") +print(f"Status: {investigation.get('status')}") +print(f"Verdict: {investigation.get('verdict')}") +print(f"Confidence: {investigation.get('confidence')}") +``` + +#### Trigger investigation for an alert + +Create a new investigation for a specific alert: + +```python +# Trigger investigation for an alert +investigation = chronicle.trigger_investigation(alert_id="alert_123") + +print(f"Investigation created: {investigation['name']}") +print(f"Status: {investigation.get('status')}") +print(f"Trigger type: {investigation.get('triggerType')}") +``` + +#### Fetch associated investigations + +Retrieve investigations associated with alerts or cases: + +```python +from secops.chronicle import DetectionType + +# Fetch investigations for specific alerts +result = chronicle.fetch_associated_investigations( + detection_type=DetectionType.ALERT, + alert_ids=["alert_123", "alert_456"], + association_limit_per_detection=5 +) + +# Process associations +associations_list = result.get("associationsList", {}) +for alert_id, data in associations_list.items(): + investigations = data.get("investigations", []) + print(f"Alert {alert_id}: {len(investigations)} investigation(s)") + + for inv in investigations: + print(f" - {inv['displayName']}: {inv.get('verdict', 'N/A')}") + +# Fetch investigations for cases +case_result = chronicle.fetch_associated_investigations( + detection_type=DetectionType.CASE, + case_ids=["case_123"], + association_limit_per_detection=3 +) + +# You can also use string values for detection_type +result = chronicle.fetch_associated_investigations( + detection_type="ALERT", # or "DETECTION_TYPE_ALERT" + alert_ids=["alert_123"] +) +``` + ### Generating UDM Key/Value Mapping Chronicle provides a feature to generate UDM key-value mapping for a given row log. diff --git a/api_module_mapping.md b/api_module_mapping.md index 7684b74..79d86f0 100644 --- a/api_module_mapping.md +++ b/api_module_mapping.md @@ -221,6 +221,10 @@ Following shows mapping between SecOps [REST Resource](https://cloud.google.com/ |ingestionLogLabels.list |v1alpha| | | |ingestionLogNamespaces.get |v1alpha| | | |ingestionLogNamespaces.list |v1alpha| | | +|investigations.fetchAssociated |v1alpha|chronicle.investigations.fetch_associated_investigations |secops investigation fetch-associated | +|investigations.get |v1alpha|chronicle.investigations.get_investigation |secops investigation get | +|investigations.list |v1alpha|chronicle.investigations.list_investigations |secops investigation list | +|investigations.trigger |v1alpha|chronicle.investigations.trigger_investigation |secops investigation trigger | |iocs.batchGet |v1alpha| | | |iocs.findFirstAndLastSeen |v1alpha| | | |iocs.get |v1alpha| | | diff --git a/examples/investigations_example.py b/examples/investigations_example.py new file mode 100644 index 0000000..d82d476 --- /dev/null +++ b/examples/investigations_example.py @@ -0,0 +1,386 @@ +#!/usr/bin/env python3 +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Example usage of the Google SecOps SDK for Investigations.""" + +import argparse +import json + +from secops import SecOpsClient +from secops.chronicle import DetectionType + + +def get_client(project_id, customer_id, region): + """Initialize and return the Chronicle client. + + Args: + project_id: Google Cloud Project ID. + customer_id: Chronicle Customer ID (UUID). + region: Chronicle region (us or eu). + + Returns: + Chronicle client instance. + """ + client = SecOpsClient() + chronicle = client.chronicle( + customer_id=customer_id, project_id=project_id, region=region + ) + return chronicle + + +def example_list_investigations(chronicle): + """Example 1: List Investigations.""" + print("\n=== Example 1: List Investigations ===") + + try: + response = chronicle.list_investigations(page_size=10) + + investigations = response.get("investigations", []) + total_size = response.get("totalSize", 0) + next_page_token = response.get("nextPageToken") + + print(f"\nFound {len(investigations)} investigation(s) in this page") + print(f"Total investigations matching request: {total_size}") + + if investigations: + print("\nFirst investigation details:") + sample = investigations[0] + print(f"Name: {sample.get('name')}") + print(f"Display Name: {sample.get('displayName', 'N/A')}") + print(f"Status: {sample.get('status', 'N/A')}") + print(f"Verdict: {sample.get('verdict', 'N/A')}") + print(f"Confidence: {sample.get('confidence', 'N/A')}") + print(f"Summary: {sample.get('summary', 'N/A')[:100]}...") + + investigation_id = sample.get("name", "").split("/")[-1] + print(f"Investigation ID: {investigation_id}") + + if next_page_token: + print(f"\nNext page token available: {next_page_token[:20]}...") + else: + print("No investigations found in your Chronicle instance.") + + except Exception as e: + print(f"Error listing investigations: {e}") + + +def example_get_investigation(chronicle, investigation_id): + """Example 2: Get Investigation by ID. + + Args: + chronicle: Chronicle client instance. + investigation_id: ID of the investigation to retrieve. + """ + print("\n=== Example 2: Get Investigation by ID ===") + + if not investigation_id: + print("No investigation ID provided.") + return + + try: + print(f"\nRetrieving investigation ID: {investigation_id}") + investigation = chronicle.get_investigation(investigation_id) + + print("\nInvestigation details:") + print(f"Name: {investigation.get('name')}") + print(f"Display Name: {investigation.get('displayName', 'N/A')}") + print(f"Status: {investigation.get('status', 'N/A')}") + print(f"Verdict: {investigation.get('verdict', 'N/A')}") + print(f"Confidence: {investigation.get('confidence', 'N/A')}") + print(f"Trigger Type: {investigation.get('triggerType', 'N/A')}") + + summary = investigation.get("summary", "") + if summary: + print(f"Summary: {summary[:200]}...") + + next_steps = investigation.get("nextSteps", []) + if next_steps: + print(f"\nRecommended next steps ({len(next_steps)}):") + for idx, step in enumerate(next_steps[:3], 1): + print(f" {idx}. {step.get('title', 'N/A')}") + + findings = investigation.get("findings", []) + if findings: + print(f"\nFindings: {len(findings)} finding(s)") + + entities = investigation.get("entities", []) + if entities: + print(f"Entities: {len(entities)} entity/entities") + + alerts = investigation.get("alerts", {}).get("ids", []) + if alerts: + print(f"Associated alerts: {len(alerts)} alert(s)") + + cases = investigation.get("cases", {}).get("ids", []) + if cases: + print(f"Associated cases: {len(cases)} case(s)") + + except Exception as e: + print(f"Error getting investigation: {e}") + + +def example_fetch_associated_investigations(chronicle, alert_id): + """Example 3: Fetch Associated Investigations for Alert. + + Args: + chronicle: Chronicle client instance. + alert_id: Alert ID to fetch investigations for. + """ + print("\n=== Example 3: Fetch Associated Investigations ===") + + if not alert_id: + print("No alert ID provided. Skipping this example.") + return + + try: + print(f"\nFetching investigations for alert ID: {alert_id}") + + response = chronicle.fetch_associated_investigations( + detection_type=DetectionType.ALERT, + alert_ids=[alert_id], + association_limit_per_detection=5, + ) + + associations_list = response.get("associationsList", {}) + experimental_alert = response.get("experimentalAlert", {}) + + print(f"\nResponse contains {len(associations_list)} alert(s)") + + for alert_key, association_data in associations_list.items(): + print(f"\nAlert ID: {alert_key}") + + is_experimental = experimental_alert.get(alert_key, False) + if is_experimental: + print(" Note: This is an experimental alert") + + investigations = association_data.get("investigations", []) + print(f" Associated investigations: {len(investigations)}") + + for idx, inv in enumerate(investigations, 1): + print(f"\n Investigation {idx}:") + print(f" Name: {inv.get('name')}") + print(f" Display Name: {inv.get('displayName', 'N/A')}") + print(f" Status: {inv.get('status', 'N/A')}") + print(f" Verdict: {inv.get('verdict', 'N/A')}") + + if not associations_list: + print("No associated investigations found.") + + except Exception as e: + print(f"Error fetching associated investigations: {e}") + + +def example_trigger_investigation(chronicle, alert_id): + """Example 4: Trigger Investigation for Alert. + + Args: + chronicle: Chronicle client instance. + alert_id: Alert ID to trigger investigation for. + """ + print("\n=== Example 4: Trigger Investigation ===") + + if not alert_id: + print( + "No alert ID provided. Please provide an alert ID to " + "trigger an investigation." + ) + return + + try: + print(f"\nTriggering investigation for alert ID: {alert_id}") + print( + "Note: This will create a new investigation. " + "Use with caution in production." + ) + + confirmation = ( + input("\nDo you want to proceed? (yes/no): ").strip().lower() + ) + if confirmation != "yes": + print("Skipping investigation trigger.") + return + + investigation = chronicle.trigger_investigation(alert_id) + + print("\nInvestigation triggered successfully!") + print(f"Name: {investigation.get('name')}") + print(f"Display Name: {investigation.get('displayName', 'N/A')}") + print(f"Status: {investigation.get('status', 'N/A')}") + print(f"Trigger Type: {investigation.get('triggerType', 'N/A')}") + + investigation_id = investigation.get("name", "").split("/")[-1] + print(f"Investigation ID: {investigation_id}") + + except Exception as e: + print(f"Error triggering investigation: {e}") + print( + "Note: Make sure the alert ID exists and is valid for " + "investigation." + ) + + +def example_list_investigations_with_pagination(chronicle): + """Example 5: List Investigations with Pagination.""" + print("\n=== Example 5: List Investigations with Pagination ===") + + try: + page_size = 5 + total_fetched = 0 + page_num = 1 + next_page_token = None + + print(f"\nFetching investigations (page size: {page_size})") + + while True: + response = chronicle.list_investigations( + page_size=page_size, page_token=next_page_token + ) + + investigations = response.get("investigations", []) + next_page_token = response.get("nextPageToken") + + print(f"\nPage {page_num}:") + print(f" Investigations in this page: {len(investigations)}") + total_fetched += len(investigations) + + for idx, inv in enumerate(investigations, 1): + print( + f" {idx}. {inv.get('name', 'N/A')} - " + f"{inv.get('status', 'N/A')}" + ) + + page_num += 1 + + if not next_page_token or page_num > 3: + break + + print(f"\nTotal investigations fetched: {total_fetched}") + if next_page_token: + print("More pages available...") + + except Exception as e: + print(f"Error during pagination: {e}") + + +EXAMPLES = { + "1": example_list_investigations, + "2": example_get_investigation, + "3": example_fetch_associated_investigations, + "4": example_trigger_investigation, + "5": example_list_investigations_with_pagination, +} + + +def main(): + """Main function to run examples.""" + parser = argparse.ArgumentParser( + description="Run Chronicle Investigations examples" + ) + parser.add_argument( + "--project_id", required=True, help="Google Cloud Project ID" + ) + parser.add_argument( + "--customer_id", required=True, help="Chronicle Customer ID (UUID)" + ) + parser.add_argument( + "--region", default="us", help="Chronicle region (us or eu)" + ) + parser.add_argument( + "--example", + "-e", + help=( + "Example number to run (1-5). " + "If not specified, runs all applicable examples." + ), + ) + parser.add_argument( + "--investigation_id", help="Investigation ID for example 2" + ) + parser.add_argument("--alert_id", help="Alert ID for examples 3 and 4") + + args = parser.parse_args() + + chronicle = get_client(args.project_id, args.customer_id, args.region) + + if args.example: + if args.example in EXAMPLES: + example_func = EXAMPLES[args.example] + if args.example == "2": + investigation_id = args.investigation_id + if not investigation_id: + print( + "No investigation ID provided. " + "Fetching from list operation..." + ) + try: + response = chronicle.list_investigations(page_size=1) + investigations = response.get("investigations", []) + if investigations: + investigation_id = ( + investigations[0].get("name", "").split("/")[-1] + ) + print(f"Using investigation ID: {investigation_id}") + else: + print("No investigations found.") + except Exception as e: + print(f"Error fetching investigation ID: {e}") + example_func(chronicle, investigation_id) + elif args.example in ["3", "4"]: + example_func(chronicle, args.alert_id) + else: + example_func(chronicle) + else: + print( + f"Invalid example number. " + f"Available examples: {', '.join(sorted(EXAMPLES.keys()))}" + ) + else: + print("Running all applicable examples...") + example_list_investigations(chronicle) + + investigation_id = args.investigation_id + if not investigation_id: + print( + "\nNo investigation ID provided. " + "Fetching from list operation..." + ) + try: + response = chronicle.list_investigations(page_size=1) + investigations = response.get("investigations", []) + if investigations: + investigation_id = ( + investigations[0].get("name", "").split("/")[-1] + ) + print(f"Using investigation ID: {investigation_id}") + except Exception as e: + print(f"Error fetching investigation ID: {e}") + + if investigation_id: + example_get_investigation(chronicle, investigation_id) + + if args.alert_id: + example_fetch_associated_investigations(chronicle, args.alert_id) + + example_list_investigations_with_pagination(chronicle) + + print( + "\n\nNote: Example 4 (trigger investigation) requires " + "confirmation and was skipped in batch mode." + ) + print("Run it separately with: --example 4 --alert_id YOUR_ALERT_ID") + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index 84981c6..7f7825a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "secops" -version = "0.32.0" +version = "0.33.0" description = "Python SDK for wrapping the Google SecOps API for common use cases" readme = "README.md" requires-python = ">=3.10" diff --git a/src/secops/chronicle/__init__.py b/src/secops/chronicle/__init__.py index 3c6010a..2070547 100644 --- a/src/secops/chronicle/__init__.py +++ b/src/secops/chronicle/__init__.py @@ -61,6 +61,12 @@ SuggestedAction, ) from secops.chronicle.ioc import list_iocs +from secops.chronicle.investigations import ( + fetch_associated_investigations, + get_investigation, + list_investigations, + trigger_investigation, +) from secops.chronicle.log_ingest import ( create_forwarder, delete_forwarder, @@ -99,6 +105,7 @@ DataExport, DataExportStage, DataExportStatus, + DetectionType, Entity, EntityMetadata, EntityMetrics, @@ -206,6 +213,11 @@ "summarize_entity", # IoC "list_iocs", + # Investigations + "fetch_associated_investigations", + "get_investigation", + "list_investigations", + "trigger_investigation", # Case "get_cases", # Alert @@ -322,6 +334,7 @@ "DashboardView", "InputInterval", "ListBasis", + "DetectionType", "TileType", # Data Table and Reference List "DataTableColumnType", diff --git a/src/secops/chronicle/client.py b/src/secops/chronicle/client.py index 0c8773f..1217f3d 100644 --- a/src/secops/chronicle/client.py +++ b/src/secops/chronicle/client.py @@ -97,6 +97,18 @@ from secops.chronicle.gemini import opt_in_to_gemini as _opt_in_to_gemini from secops.chronicle.gemini import query_gemini as _query_gemini from secops.chronicle.ioc import list_iocs as _list_iocs +from secops.chronicle.investigations import ( + fetch_associated_investigations as _fetch_associated_investigations, +) +from secops.chronicle.investigations import ( + get_investigation as _get_investigation, +) +from secops.chronicle.investigations import ( + list_investigations as _list_investigations, +) +from secops.chronicle.investigations import ( + trigger_investigation as _trigger_investigation, +) from secops.chronicle.log_ingest import create_forwarder as _create_forwarder from secops.chronicle.log_ingest import delete_forwarder as _delete_forwarder from secops.chronicle.log_ingest import get_forwarder as _get_forwarder @@ -1663,6 +1675,105 @@ def test_pipeline( """ return _test_pipeline(self, pipeline, input_logs) + # Investigation methods + + def fetch_associated_investigations( + self, + detection_type: str, + alert_ids: list[str] | None = None, + case_ids: list[str] | None = None, + association_limit_per_detection: int | None = None, + order_by: str | None = None, + ) -> dict[str, Any]: + """Fetches investigations associated with alerts or cases. + + Args: + detection_type: Type of identifiers. Can be a DetectionType + enum value or string. Valid values: + - DetectionType.ALERT + - DetectionType.CASE + - DetectionType.UNSPECIFIED + alert_ids: Alert IDs to fetch investigations for (max 100). + case_ids: Case IDs to fetch investigations for (max 100). + association_limit_per_detection: Max associations per + detection (default 1, max 5). + order_by: Ordering of associations. Supported fields: + "createTime", "createTime desc", "updateTime", + "updateTime desc". + + Returns: + Dictionary containing associations list and experimental flags. + + Raises: + APIError: If the API request fails. + """ + return _fetch_associated_investigations( + self, + detection_type, + alert_ids, + case_ids, + association_limit_per_detection, + order_by, + ) + + def get_investigation(self, investigation_id: str) -> dict[str, Any]: + """Gets an investigation by ID. + + Args: + investigation_id: ID of the investigation to retrieve. + + Returns: + Dictionary containing investigation information. + + Raises: + APIError: If the API request fails. + """ + return _get_investigation(self, investigation_id) + + def list_investigations( + self, + page_size: int | None = None, + page_token: str | None = None, + filter_expr: str | None = None, + order_by: str | None = None, + ) -> dict[str, Any]: + """Lists investigations. + + Args: + page_size: Maximum number of investigations to return + (default 100, max 1000). + page_token: Page token for pagination. + filter_expr: Filter expression. Supported fields: + "alertId", "caseId". Example: 'alertId="alert123"' + order_by: Ordering of investigations. Default is create time + descending. Supported fields: "startTime", "endTime", + "displayName". + + Returns: + Dictionary containing investigations, next page token, and + total size. + + Raises: + APIError: If the API request fails. + """ + return _list_investigations( + self, page_size, page_token, filter_expr, order_by + ) + + def trigger_investigation(self, alert_id: str) -> dict[str, Any]: + """Triggers an investigation for a specific alert. + + Args: + alert_id: The alert ID for which to trigger investigation. + + Returns: + Dictionary containing the created investigation. + + Raises: + APIError: If the API request fails. + """ + return _trigger_investigation(self, alert_id) + def list_rules( self, view: str | None = "FULL", diff --git a/src/secops/chronicle/investigations.py b/src/secops/chronicle/investigations.py new file mode 100644 index 0000000..cc084ff --- /dev/null +++ b/src/secops/chronicle/investigations.py @@ -0,0 +1,201 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Provides investigation management for Chronicle.""" + +from typing import Any + +from secops.chronicle.models import APIVersion, DetectionType +from secops.chronicle.utils.request_utils import chronicle_request + + +def fetch_associated_investigations( + client: "ChronicleClient", + detection_type: str, + alert_ids: list[str] | None = None, + case_ids: list[str] | None = None, + association_limit_per_detection: int | None = None, + order_by: str | None = None, +) -> dict[str, Any]: + """Fetches investigations associated with alerts or cases. + + Args: + client: ChronicleClient instance. + detection_type: The type of the identifiers provided. Can be a + DetectionType enum value or string. Valid values: + - DetectionType.ALERT + - DetectionType.CASE + - DetectionType.UNSPECIFIED + alert_ids: The alert IDs for which associated investigations need + to be fetched. Maximum of 100 alert IDs. + case_ids: The case IDs for which associated investigations need to + be fetched. Maximum of 100 case IDs. + association_limit_per_detection: Maximum number of associations to + return per detection. Default is 1. Maximum value is 5. + order_by: Configures ordering of associations. Supported fields: + "createTime", "createTime desc", "updateTime", "updateTime desc". + + Returns: + Dictionary containing: + - associationsList: Map of alert/case ID to investigation list + - experimentalAlert: Map of alert/case ID to experimental flag + + Raises: + APIError: If the API request fails. + """ + # Validate and convert detection_type to proper format + if isinstance(detection_type, str): + try: + detection_type = DetectionType(detection_type) + except ValueError: + try: + detection_type = DetectionType[detection_type.upper()] + except KeyError as ke: + valid = [f"{m.name} or {m.value}" for m in DetectionType] + raise ValueError( + f'Invalid detection_type: "{detection_type}". ' + f'Valid values: {", ".join(valid)}' + ) from ke + + params: dict[str, Any] = {"detectionType": detection_type} + + if alert_ids is not None: + params["alertIds"] = alert_ids + + if case_ids is not None: + params["caseIds"] = case_ids + + if association_limit_per_detection is not None: + params["associationLimitPerDetection"] = association_limit_per_detection + + if order_by: + params["orderBy"] = order_by + + return chronicle_request( + client, + method="GET", + endpoint_path="investigations:fetchAssociated", + api_version=APIVersion.V1ALPHA, + params=params, + error_message="Failed to fetch associated investigations", + ) + + +def get_investigation( + client: "ChronicleClient", investigation_id: str +) -> dict[str, Any]: + """Gets an investigation by ID. + + Args: + client: ChronicleClient instance. + investigation_id: ID of the investigation to retrieve. Can be + either just the ID or the full resource name. + + Returns: + Dictionary containing investigation information. + + Raises: + APIError: If the API request fails. + """ + if not investigation_id.startswith("projects/"): + endpoint_path = f"investigations/{investigation_id}" + else: + endpoint_path = investigation_id + + return chronicle_request( + client, + method="GET", + endpoint_path=endpoint_path, + api_version=APIVersion.V1ALPHA, + error_message="Failed to get investigation", + ) + + +def list_investigations( + client: "ChronicleClient", + page_size: int | None = None, + page_token: str | None = None, + filter_expr: str | None = None, + order_by: str | None = None, +) -> dict[str, Any]: + """Lists investigations. + + Args: + client: ChronicleClient instance. + page_size: Maximum number of investigations to return. Default is + 100. Maximum value is 1000. + page_token: Page token from a previous list call to retrieve the + next page. + filter_expr: Filter expression to restrict results. + Note: Filters may not be fully supported by the API. + Example: 'alertId="alert123"' (syntax may vary) + order_by: Configures ordering of investigations. Default is by + create time descending. Supported fields: "startTime", + "endTime", "displayName". + + Returns: + Dictionary containing: + - investigations: List of investigation objects + - nextPageToken: Token for next page (if more results exist) + - totalSize: Total number of investigations matching request + + Raises: + APIError: If the API request fails. + """ + params: dict[str, Any] = {} + if page_size is not None: + params["pageSize"] = page_size + if page_token: + params["pageToken"] = page_token + if filter_expr: + params["filter"] = filter_expr + if order_by: + params["orderBy"] = order_by + + return chronicle_request( + client, + method="GET", + endpoint_path="investigations", + api_version=APIVersion.V1ALPHA, + params=params, + error_message="Failed to list investigations", + ) + + +def trigger_investigation( + client: "ChronicleClient", alert_id: str +) -> dict[str, Any]: + """Triggers an investigation for a specific alert. + + Args: + client: ChronicleClient instance. + alert_id: The alert ID for which the investigation needs to be + triggered. + + Returns: + Dictionary containing the created investigation. + + Raises: + APIError: If the API request fails. + """ + body = {"alertId": alert_id} + + return chronicle_request( + client, + method="POST", + endpoint_path="investigations:trigger", + api_version=APIVersion.V1ALPHA, + json=body, + error_message="Failed to trigger investigation", + ) diff --git a/src/secops/chronicle/models.py b/src/secops/chronicle/models.py index 6374c6e..0074bc5 100644 --- a/src/secops/chronicle/models.py +++ b/src/secops/chronicle/models.py @@ -62,6 +62,17 @@ def __str__(self) -> str: return self.value +class DetectionType(StrEnum): + """Detection type for investigation associations. + + The type of identifiers provided for fetching associated investigations. + """ + + UNSPECIFIED = "DETECTION_TYPE_UNSPECIFIED" + ALERT = "DETECTION_TYPE_ALERT" + CASE = "DETECTION_TYPE_CASE" + + @dataclass class TimeInterval: """Time interval with start and end times.""" diff --git a/src/secops/cli/cli_client.py b/src/secops/cli/cli_client.py index 37dd23b..b397e0d 100644 --- a/src/secops/cli/cli_client.py +++ b/src/secops/cli/cli_client.py @@ -23,6 +23,7 @@ from secops.cli.commands.forwarder import setup_forwarder_command from secops.cli.commands.gemini import setup_gemini_command from secops.cli.commands.help import setup_help_command +from secops.cli.commands.investigation import setup_investigation_command from secops.cli.commands.iocs import setup_iocs_command from secops.cli.commands.log import setup_log_command from secops.cli.commands.log_processing import ( @@ -163,6 +164,7 @@ def build_parser() -> argparse.ArgumentParser: setup_udm_search_view_command(subparsers) setup_stats_command(subparsers) setup_entity_command(subparsers) + setup_investigation_command(subparsers) setup_iocs_command(subparsers) setup_log_command(subparsers) setup_log_processing_command(subparsers) diff --git a/src/secops/cli/commands/featured_content_rules.py b/src/secops/cli/commands/featured_content_rules.py index 22d8940..704927e 100644 --- a/src/secops/cli/commands/featured_content_rules.py +++ b/src/secops/cli/commands/featured_content_rules.py @@ -55,6 +55,6 @@ def handle_featured_content_rules_list_command(args, chronicle): filter_expression=getattr(args, "filter_expression", None), ) output_formatter(out, getattr(args, "output", "json")) - except Exception as e: + except Exception as e: # pylint: disable=broad-exception-caught print(f"Error listing featured content rules: {e}", file=sys.stderr) sys.exit(1) diff --git a/src/secops/cli/commands/investigation.py b/src/secops/cli/commands/investigation.py new file mode 100644 index 0000000..9d308f1 --- /dev/null +++ b/src/secops/cli/commands/investigation.py @@ -0,0 +1,247 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Google SecOps CLI investigation commands""" + +import sys + +from secops.chronicle import DetectionType +from secops.cli.utils.formatters import output_formatter + + +def setup_investigation_command(subparsers): + """Set up the investigation command parser. + + Args: + subparsers: Subparser object from argparse + """ + investigation_parser = subparsers.add_parser( + "investigation", help="Manage investigations" + ) + investigation_subparsers = investigation_parser.add_subparsers( + dest="investigation_command", help="Investigation subcommands" + ) + + investigation_parser.set_defaults( + func=lambda args, _: investigation_parser.print_help() + ) + + _setup_list_subcommand(investigation_subparsers) + _setup_get_subcommand(investigation_subparsers) + _setup_fetch_associated_subcommand(investigation_subparsers) + _setup_trigger_subcommand(investigation_subparsers) + + +def _setup_list_subcommand(subparsers): + """Set up the list investigations subcommand. + + Args: + subparsers: Subparser object for investigation commands + """ + list_parser = subparsers.add_parser("list", help="List investigations") + list_parser.add_argument( + "--page-size", + "--page_size", + dest="page_size", + type=int, + help="Maximum investigations to return (default 100, max 1000)", + ) + list_parser.add_argument( + "--page-token", + "--page_token", + dest="page_token", + help="Page token for pagination", + ) + list_parser.add_argument( + "--filter", + help="Filter expression (e.g., 'alertId=\"alert123\"')", + ) + list_parser.add_argument( + "--order-by", + "--order_by", + dest="order_by", + help=("Order by field (e.g., 'startTime', 'endTime', 'displayName')"), + ) + list_parser.set_defaults(func=_handle_list) + + +def _setup_get_subcommand(subparsers): + """Set up the get investigation subcommand. + + Args: + subparsers: Subparser object for investigation commands + """ + get_parser = subparsers.add_parser("get", help="Get investigation by ID") + get_parser.add_argument( + "--id", + dest="investigation_id", + required=True, + help="Investigation ID to retrieve", + ) + get_parser.set_defaults(func=_handle_get) + + +def _setup_fetch_associated_subcommand(subparsers): + """Set up fetch associated investigations subcommand. + + Args: + subparsers: Subparser object for investigation commands + """ + fetch_parser = subparsers.add_parser( + "fetch-associated", + help="Fetch investigations associated with alerts or cases", + ) + fetch_parser.add_argument( + "--detection-type", + "--detection_type", + dest="detection_type", + required=True, + choices=["ALERT", "CASE", "UNSPECIFIED"], + help="Type of identifiers (ALERT, CASE, or UNSPECIFIED)", + ) + fetch_parser.add_argument( + "--alert-ids", + "--alert_ids", + dest="alert_ids", + help="Comma-separated list of alert IDs (max 100)", + ) + fetch_parser.add_argument( + "--case-ids", + "--case_ids", + dest="case_ids", + help="Comma-separated list of case IDs (max 100)", + ) + fetch_parser.add_argument( + "--association-limit", + "--association_limit", + dest="association_limit", + type=int, + help="Max associations per detection (default 1, max 5)", + ) + fetch_parser.add_argument( + "--order-by", + "--order_by", + dest="order_by", + help="Order by field (e.g., 'createTime', 'updateTime')", + ) + fetch_parser.set_defaults(func=_handle_fetch_associated) + + +def _setup_trigger_subcommand(subparsers): + """Set up trigger investigation subcommand. + + Args: + subparsers: Subparser object for investigation commands + """ + trigger_parser = subparsers.add_parser( + "trigger", help="Trigger investigation for an alert" + ) + trigger_parser.add_argument( + "--alert-id", + "--alert_id", + dest="alert_id", + required=True, + help="Alert ID to trigger investigation for", + ) + trigger_parser.set_defaults(func=_handle_trigger) + + +def _handle_list(args, chronicle): + """Handle list investigations command. + + Args: + args: Command line arguments + chronicle: Chronicle client instance + """ + try: + result = chronicle.list_investigations( + page_size=args.page_size, + page_token=args.page_token, + filter_expr=args.filter, + order_by=args.order_by, + ) + output_formatter(result, args.output) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def _handle_get(args, chronicle): + """Handle get investigation command. + + Args: + args: Command line arguments + chronicle: Chronicle client instance + """ + try: + result = chronicle.get_investigation( + investigation_id=args.investigation_id + ) + output_formatter(result, args.output) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def _handle_fetch_associated(args, chronicle): + """Handle fetch associated investigations command. + + Args: + args: Command line arguments + chronicle: Chronicle client instance + """ + if not args.alert_ids and not args.case_ids: + print( + "Error: Must provide either --alert-ids or --case-ids", + file=sys.stderr, + ) + sys.exit(1) + + alert_ids = None + if args.alert_ids: + alert_ids = [id.strip() for id in args.alert_ids.split(",")] + + case_ids = None + if args.case_ids: + case_ids = [id.strip() for id in args.case_ids.split(",")] + + detection_type = DetectionType[args.detection_type] + + try: + result = chronicle.fetch_associated_investigations( + detection_type=detection_type, + alert_ids=alert_ids, + case_ids=case_ids, + association_limit_per_detection=args.association_limit, + order_by=args.order_by, + ) + output_formatter(result, args.output) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def _handle_trigger(args, chronicle): + """Handle trigger investigation command. + + Args: + args: Command line arguments + chronicle: Chronicle client instance + """ + try: + result = chronicle.trigger_investigation(alert_id=args.alert_id) + output_formatter(result, args.output) + except Exception as e: # pylint: disable=broad-exception-caught + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) diff --git a/tests/chronicle/test_investigations.py b/tests/chronicle/test_investigations.py new file mode 100644 index 0000000..1cfebdc --- /dev/null +++ b/tests/chronicle/test_investigations.py @@ -0,0 +1,573 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Tests for Chronicle investigation functions.""" + +import pytest +from unittest.mock import Mock, patch + +from secops.chronicle.client import ChronicleClient +from secops.chronicle.investigations import ( + fetch_associated_investigations, + get_investigation, + list_investigations, + trigger_investigation, +) +from secops.chronicle.models import DetectionType +from secops.exceptions import APIError + + +@pytest.fixture +def chronicle_client(): + """Create a Chronicle client for testing.""" + with patch("secops.auth.SecOpsAuth") as mock_auth: + mock_session = Mock() + mock_session.headers = {} + mock_auth.return_value.session = mock_session + return ChronicleClient( + customer_id="test-customer", project_id="test-project" + ) + + +@pytest.fixture +def mock_response(): + """Create a mock API response.""" + mock = Mock() + mock.status_code = 200 + mock.json.return_value = {"success": True} + return mock + + +@pytest.fixture +def mock_error_response(): + """Create a mock error API response.""" + mock = Mock() + mock.status_code = 400 + mock.text = "Error message" + mock.raise_for_status.side_effect = Exception("API Error") + return mock + + +def test_fetch_associated_investigations_with_alert_ids( + chronicle_client, mock_response +): + """Test fetch_associated_investigations with alert IDs.""" + mock_response.json.return_value = { + "associationsList": { + "alert1": { + "investigations": [ + { + "name": "projects/123/locations/us/instances/456/" + "investigations/inv1" + } + ] + } + } + } + + with patch.object( + chronicle_client.session, "request", return_value=mock_response + ) as mock_request: + result = fetch_associated_investigations( + chronicle_client, + detection_type=DetectionType.ALERT, + alert_ids=["alert1", "alert2"], + association_limit_per_detection=5, + ) + + mock_request.assert_called_once() + call_args = mock_request.call_args + assert call_args[1]["method"] == "GET" + assert "investigations:fetchAssociated" in call_args[1]["url"] + assert call_args[1]["params"]["detectionType"] == "DETECTION_TYPE_ALERT" + assert call_args[1]["params"]["alertIds"] == ["alert1", "alert2"] + assert call_args[1]["params"]["associationLimitPerDetection"] == 5 + assert result["associationsList"] is not None + + +def test_fetch_associated_investigations_with_case_ids( + chronicle_client, mock_response +): + """Test fetch_associated_investigations with case IDs.""" + mock_response.json.return_value = { + "associationsList": { + "case1": { + "investigations": [ + { + "name": "projects/123/locations/us/instances/456/" + "investigations/inv1" + } + ] + } + } + } + + with patch.object( + chronicle_client.session, "request", return_value=mock_response + ) as mock_request: + result = fetch_associated_investigations( + chronicle_client, + detection_type=DetectionType.CASE, + case_ids=["case1"], + ) + + mock_request.assert_called_once() + call_args = mock_request.call_args + assert call_args[1]["params"]["detectionType"] == "DETECTION_TYPE_CASE" + assert call_args[1]["params"]["caseIds"] == ["case1"] + assert result["associationsList"] is not None + + +def test_fetch_associated_investigations_with_string_detection_type( + chronicle_client, mock_response +): + """Test fetch_associated_investigations with string detection type.""" + mock_response.json.return_value = {"associationsList": {}} + + with patch.object( + chronicle_client.session, "request", return_value=mock_response + ) as mock_request: + result = fetch_associated_investigations( + chronicle_client, + detection_type="DETECTION_TYPE_ALERT", + alert_ids=["alert1"], + ) + + mock_request.assert_called_once() + call_args = mock_request.call_args + assert call_args[1]["params"]["detectionType"] == "DETECTION_TYPE_ALERT" + assert result is not None + + +def test_fetch_associated_investigations_with_enum_name( + chronicle_client, mock_response +): + """Test fetch_associated_investigations with enum name string.""" + mock_response.json.return_value = {"associationsList": {}} + + with patch.object( + chronicle_client.session, "request", return_value=mock_response + ) as mock_request: + result = fetch_associated_investigations( + chronicle_client, detection_type="ALERT", alert_ids=["alert1"] + ) + + mock_request.assert_called_once() + call_args = mock_request.call_args + assert call_args[1]["params"]["detectionType"] == "DETECTION_TYPE_ALERT" + assert result is not None + + +def test_fetch_associated_investigations_with_order_by( + chronicle_client, mock_response +): + """Test fetch_associated_investigations with order_by parameter.""" + mock_response.json.return_value = {"associationsList": {}} + + with patch.object( + chronicle_client.session, "request", return_value=mock_response + ) as mock_request: + result = fetch_associated_investigations( + chronicle_client, + detection_type=DetectionType.ALERT, + alert_ids=["alert1"], + order_by="createTime desc", + ) + + mock_request.assert_called_once() + call_args = mock_request.call_args + assert call_args[1]["params"]["orderBy"] == "createTime desc" + assert result is not None + + +def test_fetch_associated_investigations_invalid_detection_type( + chronicle_client, +): + """Test fetch_associated_investigations with invalid detection type.""" + with pytest.raises(ValueError) as exc_info: + fetch_associated_investigations( + chronicle_client, + detection_type="INVALID_TYPE", + alert_ids=["alert1"], + ) + + assert "Invalid detection_type" in str(exc_info.value) + assert "INVALID_TYPE" in str(exc_info.value) + + +def test_fetch_associated_investigations_error( + chronicle_client, mock_error_response +): + """Test fetch_associated_investigations with error response.""" + with patch.object( + chronicle_client.session, "request", return_value=mock_error_response + ): + with pytest.raises(APIError) as exc_info: + fetch_associated_investigations( + chronicle_client, + detection_type=DetectionType.ALERT, + alert_ids=["alert1"], + ) + + assert "Failed to fetch associated investigations" in str( + exc_info.value + ) + + +def test_get_investigation_with_id(chronicle_client, mock_response): + """Test get_investigation with investigation ID.""" + investigation_id = "82fb18cb-bfc0-4d7f-acf2-80508e145da2" + mock_response.json.return_value = { + "name": f"projects/123/locations/us/instances/456/investigations/" + f"{investigation_id}", + "verdict": "FALSE_POSITIVE", + "status": "STATUS_COMPLETED_SUCCESS", + } + + with patch.object( + chronicle_client.session, "request", return_value=mock_response + ) as mock_request: + result = get_investigation(chronicle_client, investigation_id) + + mock_request.assert_called_once() + call_args = mock_request.call_args + assert call_args[1]["method"] == "GET" + assert f"investigations/{investigation_id}" in call_args[1]["url"] + assert result["verdict"] == "FALSE_POSITIVE" + assert result["status"] == "STATUS_COMPLETED_SUCCESS" + + +def test_get_investigation_with_full_resource_name( + chronicle_client, mock_response +): + """Test get_investigation with full resource name.""" + full_name = ( + "projects/123/locations/us/instances/456/investigations/" + "82fb18cb-bfc0-4d7f-acf2-80508e145da2" + ) + mock_response.json.return_value = { + "name": full_name, + "verdict": "MALICIOUS", + "status": "STATUS_COMPLETED_SUCCESS", + } + + with patch.object( + chronicle_client.session, "request", return_value=mock_response + ) as mock_request: + result = get_investigation(chronicle_client, full_name) + + mock_request.assert_called_once() + call_args = mock_request.call_args + assert call_args[1]["method"] == "GET" + assert full_name in call_args[1]["url"] + assert result["verdict"] == "MALICIOUS" + + +def test_get_investigation_error(chronicle_client, mock_error_response): + """Test get_investigation with error response.""" + with patch.object( + chronicle_client.session, "request", return_value=mock_error_response + ): + with pytest.raises(APIError) as exc_info: + get_investigation( + chronicle_client, "82fb18cb-bfc0-4d7f-acf2-80508e145da2" + ) + + assert "Failed to get investigation" in str(exc_info.value) + + +def test_list_investigations_default(chronicle_client, mock_response): + """Test list_investigations with default parameters.""" + mock_response.json.return_value = { + "investigations": [ + {"name": "inv1", "verdict": "MALICIOUS"}, + {"name": "inv2", "verdict": "FALSE_POSITIVE"}, + ], + "totalSize": 2, + } + + with patch.object( + chronicle_client.session, "request", return_value=mock_response + ) as mock_request: + result = list_investigations(chronicle_client) + + mock_request.assert_called_once() + call_args = mock_request.call_args + assert call_args[1]["method"] == "GET" + assert "investigations" in call_args[1]["url"] + assert "params" in call_args[1] + assert len(result["investigations"]) == 2 + + +def test_list_investigations_with_page_size(chronicle_client, mock_response): + """Test list_investigations with page_size parameter.""" + mock_response.json.return_value = { + "investigations": [{"name": "inv1"}], + "totalSize": 1, + } + + with patch.object( + chronicle_client.session, "request", return_value=mock_response + ) as mock_request: + result = list_investigations(chronicle_client, page_size=10) + + mock_request.assert_called_once() + call_args = mock_request.call_args + assert call_args[1]["params"]["pageSize"] == 10 + assert result["totalSize"] == 1 + + +def test_list_investigations_with_page_token(chronicle_client, mock_response): + """Test list_investigations with page_token parameter.""" + mock_response.json.return_value = { + "investigations": [{"name": "inv1"}], + "nextPageToken": "token123", + } + + with patch.object( + chronicle_client.session, "request", return_value=mock_response + ) as mock_request: + result = list_investigations( + chronicle_client, page_token="previous_token" + ) + + mock_request.assert_called_once() + call_args = mock_request.call_args + assert call_args[1]["params"]["pageToken"] == "previous_token" + assert result["nextPageToken"] == "token123" + + +def test_list_investigations_with_filter(chronicle_client, mock_response): + """Test list_investigations with filter expression.""" + mock_response.json.return_value = { + "investigations": [{"name": "inv1"}], + "totalSize": 1, + } + + with patch.object( + chronicle_client.session, "request", return_value=mock_response + ) as mock_request: + result = list_investigations( + chronicle_client, filter_expr='alertId="alert123"' + ) + + mock_request.assert_called_once() + call_args = mock_request.call_args + assert call_args[1]["params"]["filter"] == 'alertId="alert123"' + assert result["totalSize"] == 1 + + +def test_list_investigations_with_order_by(chronicle_client, mock_response): + """Test list_investigations with order_by parameter.""" + mock_response.json.return_value = { + "investigations": [{"name": "inv1"}], + "totalSize": 1, + } + + with patch.object( + chronicle_client.session, "request", return_value=mock_response + ) as mock_request: + result = list_investigations( + chronicle_client, order_by="startTime desc" + ) + + mock_request.assert_called_once() + call_args = mock_request.call_args + assert call_args[1]["params"]["orderBy"] == "startTime desc" + assert result["totalSize"] == 1 + + +def test_list_investigations_with_all_params(chronicle_client, mock_response): + """Test list_investigations with all parameters.""" + mock_response.json.return_value = { + "investigations": [{"name": "inv1"}], + "nextPageToken": "next_token", + "totalSize": 100, + } + + with patch.object( + chronicle_client.session, "request", return_value=mock_response + ) as mock_request: + result = list_investigations( + chronicle_client, + page_size=50, + page_token="current_token", + filter_expr='status="COMPLETED"', + order_by="endTime", + ) + + mock_request.assert_called_once() + call_args = mock_request.call_args + params = call_args[1]["params"] + assert params["pageSize"] == 50 + assert params["pageToken"] == "current_token" + assert params["filter"] == 'status="COMPLETED"' + assert params["orderBy"] == "endTime" + assert result["totalSize"] == 100 + + +def test_list_investigations_empty_results(chronicle_client, mock_response): + """Test list_investigations with empty results.""" + mock_response.json.return_value = {"investigations": [], "totalSize": 0} + + with patch.object( + chronicle_client.session, "request", return_value=mock_response + ) as mock_request: + result = list_investigations(chronicle_client) + + mock_request.assert_called_once() + assert result["investigations"] == [] + assert result["totalSize"] == 0 + + +def test_list_investigations_error(chronicle_client, mock_error_response): + """Test list_investigations with error response.""" + with patch.object( + chronicle_client.session, "request", return_value=mock_error_response + ): + with pytest.raises(APIError) as exc_info: + list_investigations(chronicle_client) + + assert "Failed to list investigations" in str(exc_info.value) + + +def test_trigger_investigation_success(chronicle_client, mock_response): + """Test trigger_investigation with successful response.""" + alert_id = "de_e0b58924-dc71-ad17-8cc0-603b7d54b1ad" + mock_response.json.return_value = { + "name": "projects/123/locations/us/instances/456/investigations/inv1", + "verdict": "UNSPECIFIED", + "status": "STATUS_IN_PROGRESS", + "alerts": {"ids": [alert_id]}, + } + + with patch.object( + chronicle_client.session, "request", return_value=mock_response + ) as mock_request: + result = trigger_investigation(chronicle_client, alert_id) + + mock_request.assert_called_once() + call_args = mock_request.call_args + assert call_args[1]["method"] == "POST" + assert "investigations:trigger" in call_args[1]["url"] + assert call_args[1]["json"]["alertId"] == alert_id + assert result["status"] == "STATUS_IN_PROGRESS" + assert alert_id in result["alerts"]["ids"] + + +def test_trigger_investigation_with_different_alert_format( + chronicle_client, mock_response +): + """Test trigger_investigation with different alert ID format.""" + alert_id = "alert_12345" + mock_response.json.return_value = { + "name": "projects/123/locations/us/instances/456/investigations/inv1", + "verdict": "UNSPECIFIED", + "status": "STATUS_IN_PROGRESS", + } + + with patch.object( + chronicle_client.session, "request", return_value=mock_response + ) as mock_request: + result = trigger_investigation(chronicle_client, alert_id) + + mock_request.assert_called_once() + call_args = mock_request.call_args + assert call_args[1]["method"] == "POST" + assert call_args[1]["json"]["alertId"] == alert_id + assert result["status"] == "STATUS_IN_PROGRESS" + + +def test_trigger_investigation_error(chronicle_client, mock_error_response): + """Test trigger_investigation with error response.""" + with patch.object( + chronicle_client.session, "request", return_value=mock_error_response + ): + with pytest.raises(APIError) as exc_info: + trigger_investigation( + chronicle_client, "de_e0b58924-dc71-ad17-8cc0-603b7d54b1ad" + ) + + assert "Failed to trigger investigation" in str(exc_info.value) + + +def test_fetch_associated_investigations_unspecified_type( + chronicle_client, mock_response +): + """Test fetch_associated_investigations with UNSPECIFIED type.""" + mock_response.json.return_value = {"associationsList": {}} + + with patch.object( + chronicle_client.session, "request", return_value=mock_response + ) as mock_request: + result = fetch_associated_investigations( + chronicle_client, + detection_type=DetectionType.UNSPECIFIED, + alert_ids=["alert1"], + ) + + mock_request.assert_called_once() + call_args = mock_request.call_args + assert ( + call_args[1]["params"]["detectionType"] + == "DETECTION_TYPE_UNSPECIFIED" + ) + assert result is not None + + +def test_fetch_associated_investigations_no_optional_params( + chronicle_client, mock_response +): + """Test fetch_associated_investigations without optional params.""" + mock_response.json.return_value = {"associationsList": {}} + + with patch.object( + chronicle_client.session, "request", return_value=mock_response + ) as mock_request: + result = fetch_associated_investigations( + chronicle_client, + detection_type=DetectionType.ALERT, + alert_ids=["alert1"], + ) + + mock_request.assert_called_once() + call_args = mock_request.call_args + params = call_args[1]["params"] + assert "associationLimitPerDetection" not in params + assert "orderBy" not in params + assert result is not None + + +def test_list_investigations_no_optional_params( + chronicle_client, mock_response +): + """Test list_investigations without optional parameters.""" + mock_response.json.return_value = { + "investigations": [{"name": "inv1"}], + "totalSize": 1, + } + + with patch.object( + chronicle_client.session, "request", return_value=mock_response + ) as mock_request: + result = list_investigations(chronicle_client) + + mock_request.assert_called_once() + call_args = mock_request.call_args + params = call_args[1]["params"] + assert "pageSize" not in params or params.get("pageSize") is None + assert "pageToken" not in params or params.get("pageToken") is None + assert "filter" not in params + assert "orderBy" not in params + assert result["totalSize"] == 1 diff --git a/tests/chronicle/test_investigations_integration.py b/tests/chronicle/test_investigations_integration.py new file mode 100644 index 0000000..87a5dd3 --- /dev/null +++ b/tests/chronicle/test_investigations_integration.py @@ -0,0 +1,150 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Integration tests for Chronicle Investigations API. + +These tests require valid credentials and API access. +Note: Investigations cannot be deleted via API, so cleanup is limited. +""" +import pytest +from datetime import datetime, timedelta + +from secops import SecOpsClient +from secops.chronicle import DetectionType +from secops.exceptions import APIError + +from ..config import CHRONICLE_CONFIG, SERVICE_ACCOUNT_JSON + + +@pytest.mark.integration +def test_list_and_get_investigation(): + """Test listing and getting investigations workflow.""" + client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + chronicle = client.chronicle(**CHRONICLE_CONFIG) + + try: + # Step 1: List investigations + list_result = chronicle.list_investigations() + assert isinstance(list_result, dict) + assert "investigations" in list_result + assert isinstance(list_result["investigations"], list) + print(f"Found {len(list_result['investigations'])} investigations") + + if not list_result["investigations"]: + pytest.skip("No investigations available to test get operation") + + # Step 2: Get a specific investigation + inv = list_result["investigations"][0] + investigation_id = inv["name"].split("/")[-1] + print(f"Testing get for investigation: {investigation_id}") + + get_result = chronicle.get_investigation(investigation_id) + assert isinstance(get_result, dict) + assert "name" in get_result + assert investigation_id in get_result["name"] + print(f"Successfully retrieved investigation: {get_result['name']}") + + if "verdict" in get_result: + print(f"Investigation verdict: {get_result['verdict']}") + if "status" in get_result: + print(f"Investigation status: {get_result['status']}") + + except APIError as e: + print(f"API Error: {str(e)}") + pytest.skip(f"List/get investigation test skipped: {str(e)}") + + +@pytest.mark.integration +def test_list_investigations_with_pagination(): + """Test listing investigations with pagination parameters.""" + client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + chronicle = client.chronicle(**CHRONICLE_CONFIG) + + try: + result = chronicle.list_investigations(page_size=5) + assert isinstance(result, dict) + assert "investigations" in result + assert len(result["investigations"]) <= 5 + print( + f"Retrieved {len(result['investigations'])} " + f"investigations with page_size=5" + ) + + if "nextPageToken" in result: + next_result = chronicle.list_investigations( + page_size=5, page_token=result["nextPageToken"] + ) + assert isinstance(next_result, dict) + assert "investigations" in next_result + print( + f"Retrieved next page with " + f"{len(next_result['investigations'])} investigations" + ) + + except APIError as e: + print(f"API Error: {str(e)}") + pytest.skip(f"List investigations pagination test skipped: {str(e)}") + + +@pytest.mark.integration +def test_trigger_and_fetch_associated_workflow(): + """Test triggering investigation and fetching associated ones.""" + client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + chronicle = client.chronicle(**CHRONICLE_CONFIG) + + alert_id = None + + try: + # Step 1: Get an alert ID to trigger investigation + print("Fetching alerts to get an alert ID...") + end_time = datetime.now() + start_time = end_time - timedelta(days=30) + alerts_result = chronicle.get_alerts( + start_time=start_time, end_time=end_time, max_alerts=1 + ) + if not alerts_result.get("alerts"): + pytest.skip("No alerts available to test trigger operation") + alert_id = alerts_result["alerts"]["alerts"][0]["id"] + print(f"Using alert ID: {alert_id}") + + # Step 2: Trigger investigation for the alert + print(f"Triggering investigation for alert: {alert_id}") + trigger_result = chronicle.trigger_investigation(alert_id) + assert isinstance(trigger_result, dict) + assert "name" in trigger_result + print( + f"Investigation triggered successfully: " + f"{trigger_result['name']}" + ) + + if "status" in trigger_result: + print(f"Investigation status: {trigger_result['status']}") + + # Step 3: Fetch associated investigations for the alert + print(f"Fetching investigations associated with alert: {alert_id}") + fetch_result = chronicle.fetch_associated_investigations( + detection_type=DetectionType.ALERT, + alert_ids=[alert_id], + association_limit_per_detection=5, + ) + assert isinstance(fetch_result, dict) + print(f"Fetch associated result: {fetch_result.keys()}") + + if "associationsList" in fetch_result: + associations = fetch_result["associationsList"] + print(f"Found associations for {len(associations)} detections") + + except APIError as e: + print(f"API Error: {str(e)}") + pytest.skip(f"Trigger/fetch workflow test skipped: {str(e)}") diff --git a/tests/cli/test_investigation_integration.py b/tests/cli/test_investigation_integration.py new file mode 100644 index 0000000..406cfdd --- /dev/null +++ b/tests/cli/test_investigation_integration.py @@ -0,0 +1,296 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Integration tests for the SecOps CLI investigation commands.""" + +import json +import subprocess +from datetime import datetime, timedelta + +import pytest + +from tests.config import CHRONICLE_CONFIG + + +@pytest.mark.integration +def test_cli_investigation_list_and_get(cli_env, common_args): + """Test investigation list and get commands in a workflow.""" + # Step 1: List investigations + list_cmd = ["secops"] + common_args + ["investigation", "list"] + + list_result = subprocess.run( + list_cmd, env=cli_env, capture_output=True, text=True + ) + assert list_result.returncode == 0 + + try: + list_output = json.loads(list_result.stdout) + assert isinstance(list_output, dict) + assert "investigations" in list_output + assert isinstance(list_output["investigations"], list) + print(f"Found {len(list_output['investigations'])} investigations") + + if not list_output.get("investigations"): + pytest.skip("No investigations found to test get command") + + # Step 2: Get a specific investigation + investigation_id = list_output["investigations"][0]["name"].split("/")[ + -1 + ] + print(f"Testing get for investigation: {investigation_id}") + + get_cmd = ( + [ + "secops", + ] + + common_args + + [ + "investigation", + "get", + "--id", + investigation_id, + ] + ) + + get_result = subprocess.run( + get_cmd, env=cli_env, capture_output=True, text=True + ) + assert get_result.returncode == 0 + + get_output = json.loads(get_result.stdout) + assert isinstance(get_output, dict) + assert "name" in get_output + assert investigation_id in get_output["name"] + print(f"Successfully retrieved investigation: {get_output['name']}") + + except json.JSONDecodeError: + pytest.fail("Failed to parse JSON output") + + +@pytest.mark.integration +def test_cli_investigation_list_with_pagination(cli_env, common_args): + """Test the investigation list command with page size.""" + cmd = ( + [ + "secops", + ] + + common_args + + ["investigation", "list", "--page-size", "5"] + ) + + result = subprocess.run(cmd, env=cli_env, capture_output=True, text=True) + + assert result.returncode == 0 + + try: + output = json.loads(result.stdout) + assert isinstance(output, dict) + assert "investigations" in output + assert len(output["investigations"]) <= 5 + print( + f"Retrieved {len(output['investigations'])} " + f"investigations with page_size=5" + ) + except json.JSONDecodeError: + assert "Error:" not in result.stdout + + +@pytest.mark.integration +def test_cli_investigation_trigger_and_fetch_workflow(cli_env, common_args): + """Test triggering and fetching associated investigations workflow.""" + # Step 1: Get an alert ID + end_time = datetime.now() + start_time = end_time - timedelta(days=7) + + alert_cmd = ( + ["secops"] + + common_args + + [ + "alert", + "--start-time", + start_time.isoformat(), + "--end-time", + end_time.isoformat(), + "--max-alerts", + "5", + ] + ) + + alert_result = subprocess.run( + alert_cmd, env=cli_env, capture_output=True, text=True + ) + + if alert_result.returncode != 0: + pytest.skip("Could not fetch alerts to test trigger operation") + + try: + alerts_data = json.loads(alert_result.stdout) + if not alerts_data or "alerts" not in alerts_data: + pytest.skip("No alerts available to test trigger operation") + + nested_alerts = alerts_data["alerts"].get("alerts", []) + if not nested_alerts: + pytest.skip("No alerts available to test trigger operation") + + alert_id = nested_alerts[0]["id"] + print(f"Using alert ID: {alert_id}") + + except (json.JSONDecodeError, KeyError): + pytest.skip("Could not parse alerts response") + + # Step 2: Trigger investigation + trigger_cmd = ( + [ + "secops", + ] + + common_args + + [ + "investigation", + "trigger", + "--alert-id", + alert_id, + ] + ) + + trigger_result = subprocess.run( + trigger_cmd, env=cli_env, capture_output=True, text=True + ) + + assert trigger_result.returncode == 0 + + try: + trigger_output = json.loads(trigger_result.stdout) + assert isinstance(trigger_output, dict) + assert "name" in trigger_output + print( + f"Investigation triggered successfully: " + f"{trigger_output['name']}" + ) + except json.JSONDecodeError: + pytest.fail("Failed to parse trigger response") + + # Step 3: Fetch associated investigations + fetch_cmd = ( + [ + "secops", + ] + + common_args + + [ + "investigation", + "fetch-associated", + "--detection-type", + "ALERT", + "--alert-ids", + alert_id, + "--association-limit", + "5", + ] + ) + + fetch_result = subprocess.run( + fetch_cmd, env=cli_env, capture_output=True, text=True + ) + + assert fetch_result.returncode == 0 + + try: + fetch_output = json.loads(fetch_result.stdout) + assert isinstance(fetch_output, dict) + print(f"Fetch associated result keys: {fetch_output.keys()}") + + if "associationsList" in fetch_output: + print( + f"Found associations for " + f"{len(fetch_output['associationsList'])} detections" + ) + except json.JSONDecodeError: + pytest.fail("Failed to parse fetch response") + + +@pytest.mark.integration +def test_cli_investigation_fetch_associated_with_multiple_alerts( + cli_env, common_args +): + """Test fetching associated investigations with multiple alert IDs.""" + # Get multiple alert IDs + end_time = datetime.now() + start_time = end_time - timedelta(days=7) + + alert_cmd = ( + ["secops"] + + common_args + + [ + "alert", + "--start-time", + start_time.isoformat(), + "--end-time", + end_time.isoformat(), + "--max-alerts", + "10", + ] + ) + + alert_result = subprocess.run( + alert_cmd, env=cli_env, capture_output=True, text=True + ) + + if alert_result.returncode != 0: + pytest.skip("Could not fetch alerts") + + try: + alerts_data = json.loads(alert_result.stdout) + if not alerts_data or "alerts" not in alerts_data: + pytest.skip("No alerts available for this test") + + nested_alerts = alerts_data["alerts"].get("alerts", []) + if not nested_alerts or len(nested_alerts) < 2: + pytest.skip("Need at least 2 alerts for this test") + + alert_ids = [alert["id"] for alert in nested_alerts[:3]] + alert_ids_str = ",".join(alert_ids) + print(f"Using alert IDs: {alert_ids_str}") + + except (json.JSONDecodeError, KeyError): + pytest.skip("Could not parse alerts response") + + # Fetch associated investigations for multiple alerts + fetch_cmd = ( + [ + "secops", + ] + + common_args + + [ + "investigation", + "fetch-associated", + "--detection-type", + "ALERT", + "--alert-ids", + alert_ids_str, + ] + ) + + fetch_result = subprocess.run( + fetch_cmd, env=cli_env, capture_output=True, text=True + ) + + assert fetch_result.returncode == 0 + + try: + fetch_output = json.loads(fetch_result.stdout) + assert isinstance(fetch_output, dict) + print( + f"Successfully fetched associations for " f"{len(alert_ids)} alerts" + ) + except json.JSONDecodeError: + pytest.fail("Failed to parse fetch response")