From e4d89d0651b60904be79b4b191295078058b9d12 Mon Sep 17 00:00:00 2001 From: "M. Bilgehan Ertan" Date: Fri, 4 Apr 2025 21:20:44 +0200 Subject: [PATCH 1/5] refactor: client file, remove repetitive patterns --- python_catalyst/client.py | 683 +++++++++++++++----------------------- 1 file changed, 263 insertions(+), 420 deletions(-) diff --git a/python_catalyst/client.py b/python_catalyst/client.py index 6b94325..31153ca 100644 --- a/python_catalyst/client.py +++ b/python_catalyst/client.py @@ -340,6 +340,151 @@ def extract_entities_from_member_content( return entities + def _process_entity( + self, + entity: Dict, + entity_type: str, + converter_method: str, + related_objects: List, + collected_object_refs: List, + entity_mappings: Dict, + external_reference: stix2.ExternalReference = None, + ) -> None: + """ + Process a single entity and add it to the report. + + Args: + entity: The entity dictionary + entity_type: Type of the entity (e.g., 'malware', 'tool') + converter_method: Name of the converter method to use + related_objects: List to add the created objects to + collected_object_refs: List to add object refs to for the report + entity_mappings: Dictionary mapping entity types to lists of entity IDs + external_reference: Optional reference to the report + """ + entity_id = entity.get("id") + entity_value = entity.get("value") + context = entity.get("context") + + if entity_id and entity_value: + converter_func = getattr(self.converter, converter_method) + stix_object = converter_func( + entity_id, + entity_value, + context, + report_reference=external_reference, + ) + + related_objects.append(stix_object) + collected_object_refs.append(stix_object.id) + entity_mappings[entity_type].append(stix_object.id) + + if self.logger: + self.logger.debug(f"Added {entity_type}: {entity_value}") + + def _process_entities( + self, + entities: List[Dict], + entity_type: str, + converter_method: str, + related_objects: List, + collected_object_refs: List, + entity_mappings: Dict, + external_reference: stix2.ExternalReference = None, + ) -> None: + """ + Process a list of entities of the same type. + + Args: + entities: List of entity dictionaries + entity_type: Type of the entities + converter_method: Name of the converter method to use + related_objects: List to add the created objects to + collected_object_refs: List to add object refs to for the report + entity_mappings: Dictionary mapping entity types to lists of entity IDs + external_reference: Optional reference to the report + """ + for entity in entities: + self._process_entity( + entity, + entity_type, + converter_method, + related_objects, + collected_object_refs, + entity_mappings, + external_reference, + ) + + def _process_threat_actor( + self, + threat_actor: Dict, + related_objects: List, + collected_object_refs: List, + entity_mappings: Dict, + external_reference: stix2.ExternalReference = None, + ) -> None: + """ + Process a threat actor entity with detailed information. + + Args: + threat_actor: The threat actor dictionary + related_objects: List to add the created objects to + collected_object_refs: List to add object refs to for the report + entity_mappings: Dictionary mapping entity types to lists of entity IDs + external_reference: Optional reference to the report + """ + entity_id = threat_actor.get("id") + entity_value = threat_actor.get("value") + context = threat_actor.get("context") + + if entity_id and entity_value: + try: + detailed_threat_actor = self._get_threat_actor_details(entity_id) + if self.logger: + self.logger.debug( + f"Retrieved detailed information for threat actor: {entity_value}" + ) + + ta_object = self.converter.create_detailed_threat_actor( + detailed_threat_actor, + context, + report_reference=external_reference, + ) + + is_abstract = detailed_threat_actor.get("is_abstract", False) + related_objects.append(ta_object) + collected_object_refs.append(ta_object.id) + + if is_abstract: + if "intrusion-set" not in entity_mappings: + entity_mappings["intrusion-set"] = [] + entity_mappings["intrusion-set"].append(ta_object.id) + if self.logger: + self.logger.debug(f"Added intrusion set: {entity_value}") + else: + entity_mappings["threat-actor"].append(ta_object.id) + if self.logger: + self.logger.debug(f"Added threat actor: {entity_value}") + + except Exception as e: + if self.logger: + self.logger.warning( + f"Failed to fetch detailed threat actor info for {entity_id}: {str(e)}" + ) + # Fall back to basic threat actor creation + ta_object = self.converter.create_threat_actor( + entity_id, + entity_value, + context, + report_reference=external_reference, + ) + related_objects.append(ta_object) + collected_object_refs.append(ta_object.id) + entity_mappings["threat-actor"].append(ta_object.id) + + if self.logger: + self.logger.debug(f"Added threat actor: {entity_value}") + def create_report_from_member_content_with_references( self, content: Dict ) -> Tuple[Dict, List[Dict]]: @@ -391,7 +536,9 @@ def create_report_from_member_content_with_references( f"report--{str(uuid.uuid5(uuid.NAMESPACE_URL, f'catalyst-{content_id}'))}" ) + # Initialize collections related_objects = [] + collected_object_refs = [content_marking.id] entity_mappings = { "threat-actor": [], @@ -405,6 +552,7 @@ def create_report_from_member_content_with_references( "location": [], } + # Setup custom properties custom_properties = { "x_opencti_report_status": 2, "x_opencti_create_indicator": True, @@ -424,447 +572,142 @@ def create_report_from_member_content_with_references( if "tags" in content and isinstance(content["tags"], list): custom_properties["x_catalyst_tags"] = content["tags"] - if content_id: - try: - all_entities = self.extract_entities_from_member_content(content_id) - - collected_object_refs = [content_marking.id] - - # Create the external reference for the report - external_reference = self.converter._create_external_reference( - source_name=self.converter.author_name, - external_id=content_id, - is_report=True, - ) - - # Process observables - for observable in all_entities.get("observable", []): - entity_id = observable.get("id") - entity_value = observable.get("value") - entity_type = observable.get("type") - - if entity_id and entity_value and entity_type: - observable_data = { - "id": entity_id, - "value": entity_value, - "type": entity_type, - "post_id": content_id, - } + if not content_id: + if self.logger: + self.logger.error(f"Error creating report from content {content_id}") + return None, [] - observable_data["tlp_marking"] = content_marking + try: + all_entities = self.extract_entities_from_member_content(content_id) + external_reference = self.converter._create_external_reference( + source_name=self.converter.author_name, + external_id=content_id, + is_report=True, + ) - ( - indicator, - relationships, - observable_obj, - ) = self.converter.convert_observable_to_stix( - observable_data, - report_reference=external_reference, - report_id=report_id, - ) + # Process observables + for observable in all_entities.get("observable", []): + entity_id = observable.get("id") + entity_value = observable.get("value") + entity_type = observable.get("type") + + if entity_id and entity_value and entity_type: + observable_data = { + "id": entity_id, + "value": entity_value, + "type": entity_type, + "post_id": content_id, + "tlp_marking": content_marking, + } + + ( + indicator, + relationships, + observable_obj, + ) = self.converter.convert_observable_to_stix( + observable_data, + report_reference=external_reference, + report_id=report_id, + ) - if indicator: - # Add the indicator to the report - related_objects.append(indicator) - collected_object_refs.append(indicator.id) - entity_mappings["indicator"].append(indicator.id) - - # Add the observable to the report if it exists - if observable_obj: - related_objects.append(observable_obj) - collected_object_refs.append(observable_obj.id) - if self.logger: - self.logger.debug( - f"Added observable: {entity_value} ({entity_type})" - ) - - # Add the relationships to related objects if they exist - for relationship in relationships: - related_objects.append(relationship) - collected_object_refs.append(relationship.id) - if self.logger: - self.logger.debug( - f"Added relationship for observable {entity_value}" - ) + if indicator: + related_objects.append(indicator) + collected_object_refs.append(indicator.id) + entity_mappings["indicator"].append(indicator.id) + if observable_obj: + related_objects.append(observable_obj) + collected_object_refs.append(observable_obj.id) if self.logger: self.logger.debug( - f"Added indicator: {entity_value} ({entity_type})" - ) - - # Process threat actors - for threat_actor in all_entities.get( - "threatactor", [] - ) + all_entities.get("threat_actor", []): - entity_id = threat_actor.get("id") - entity_value = threat_actor.get("value") - context = threat_actor.get("context") - - if entity_id and entity_value: - try: - # Fetch detailed information for the threat actor - detailed_threat_actor = self._get_threat_actor_details( - entity_id + f"Added observable: {entity_value} ({entity_type})" ) - if self.logger: - self.logger.debug( - f"Retrieved detailed information for threat actor: {entity_value}" - ) - - ta_object = self.converter.create_detailed_threat_actor( - detailed_threat_actor, - context, - report_reference=external_reference, - ) - - # Process relationships from detailed threat actor data regardless of type - """ - # 1. Process attack patterns - attack_pattern_data = detailed_threat_actor.get("attack_patterns", []) - if isinstance(attack_pattern_data, list): - for attack_pattern in attack_pattern_data: - pattern_id = attack_pattern.get("id") - pattern_name = attack_pattern.get("name") - - if pattern_id and pattern_name: - # Create the attack pattern - attack_pattern_obj = self.converter.create_attack_pattern( - pattern_id, - pattern_name, - attack_pattern.get("description"), - report_reference=external_reference, - ) - related_objects.append(attack_pattern_obj) - collected_object_refs.append(attack_pattern_obj.id) - - # Create relationship - rel = self._create_relationship_objects( - ta_object.id, - attack_pattern_obj.id, - "uses", - external_reference - ) - related_objects.append(rel) - collected_object_refs.append(rel.id) - - if self.logger: - self.logger.debug(f"Added attack pattern {pattern_name} used by {entity_value}") - - # 2. Process campaigns - campaign_data = detailed_threat_actor.get("campaigns", []) - if isinstance(campaign_data, list): - for campaign in campaign_data: - campaign_id = campaign.get("id") - campaign_name = campaign.get("name") - - if campaign_id and campaign_name: - # Create the campaign - campaign_obj = self.converter.create_campaign( - campaign_id, - campaign_name, - campaign.get("description"), - report_reference=external_reference, - ) - related_objects.append(campaign_obj) - collected_object_refs.append(campaign_obj.id) - - # Create relationship with appropriate relationship type - rel = self._create_relationship_objects( - ta_object.id, - campaign_obj.id, - "attributed-to", - external_reference - ) - related_objects.append(rel) - collected_object_refs.append(rel.id) - - if self.logger: - self.logger.debug(f"Added campaign {campaign_name} attributed to {entity_value}") - - # 3. Process countries/suspected origins - origin_data = detailed_threat_actor.get("suspected_origins", []) - if isinstance(origin_data, list): - for country in origin_data: - country_id = country.get("id") - country_name = country.get("name") - - if country_id and country_name: - # Create the location - location_obj = self.converter.create_country_location( - country_id, - country_name, - None, - report_reference=external_reference, - ) - related_objects.append(location_obj) - collected_object_refs.append(location_obj.id) - - # Create relationship - rel = self._create_relationship_objects( - ta_object.id, - location_obj.id, - "originates-from", - external_reference - ) - related_objects.append(rel) - collected_object_refs.append(rel.id) - - if self.logger: - self.logger.debug(f"Added suspected origin {country_name} for {entity_value}") - """ - is_abstract = detailed_threat_actor.get( - "is_abstract", False - ) - - related_objects.append(ta_object) - collected_object_refs.append(ta_object.id) - - if is_abstract: - if "intrusion-set" not in entity_mappings: - entity_mappings["intrusion-set"] = [] - entity_mappings["intrusion-set"].append(ta_object.id) - if self.logger: - self.logger.debug( - f"Added intrusion set: {entity_value}" - ) - else: - entity_mappings["threat-actor"].append(ta_object.id) - if self.logger: - self.logger.debug( - f"Added threat actor: {entity_value}" - ) - except Exception as e: - if self.logger: - self.logger.warning( - f"Failed to fetch detailed threat actor info for {entity_id}: {str(e)}" - ) - # Fall back to basic threat actor creation - ta_object = self.converter.create_threat_actor( - entity_id, - entity_value, - context, - report_reference=external_reference, - ) - - related_objects.append(ta_object) - collected_object_refs.append(ta_object.id) - entity_mappings["threat-actor"].append(ta_object.id) - - if self.logger: - self.logger.debug(f"Added threat actor: {entity_value}") - - # Process malware - for malware in all_entities.get("malware", []): - entity_id = malware.get("id") - entity_value = malware.get("value") - context = malware.get("context") - - if entity_id and entity_value: - malware_object = self.converter.create_malware( - entity_id, - entity_value, - context, - report_reference=external_reference, - ) - related_objects.append(malware_object) - collected_object_refs.append(malware_object.id) - entity_mappings["malware"].append(malware_object.id) - - if self.logger: - self.logger.debug(f"Added malware: {entity_value}") - - # Process tools - for tool in all_entities.get("tool", []): - entity_id = tool.get("id") - entity_value = tool.get("value") - context = tool.get("context") - - if entity_id and entity_value: - tool_object = self.converter.create_tool( - entity_id, - entity_value, - context, - report_reference=external_reference, - ) - related_objects.append(tool_object) - collected_object_refs.append(tool_object.id) - entity_mappings["tool"].append(tool_object.id) - - if self.logger: - self.logger.debug(f"Added tool: {entity_value}") - - # Process vulnerabilities - for vuln in all_entities.get("vulnerability", []): - entity_id = vuln.get("id") - entity_value = vuln.get("value") - context = vuln.get("context") - - if entity_id and entity_value: - vuln_object = self.converter.create_vulnerability( - entity_id, - entity_value, - context, - report_reference=external_reference, - ) - related_objects.append(vuln_object) - collected_object_refs.append(vuln_object.id) - entity_mappings["vulnerability"].append(vuln_object.id) - - if self.logger: - self.logger.debug(f"Added vulnerability: {entity_value}") - - # Process campaigns - for campaign in all_entities.get("campaign", []): - entity_id = campaign.get("id") - entity_value = campaign.get("value") - context = campaign.get("context") - - if entity_id and entity_value: - campaign_object = self.converter.create_campaign( - entity_id, - entity_value, - context, - report_reference=external_reference, - ) - related_objects.append(campaign_object) - collected_object_refs.append(campaign_object.id) - entity_mappings["campaign"].append(campaign_object.id) - - if self.logger: - self.logger.debug(f"Added campaign: {entity_value}") - - # Process organizations - for org in all_entities.get("organization", []): - entity_id = org.get("id") - entity_value = org.get("value") - context = org.get("context") - - if entity_id and entity_value: - identity = self.converter.create_organization_identity( - entity_id, - entity_value, - context, - report_reference=external_reference, - ) - related_objects.append(identity) - collected_object_refs.append(identity.id) - entity_mappings["identity"].append(identity.id) - + for relationship in relationships: + related_objects.append(relationship) + collected_object_refs.append(relationship.id) if self.logger: self.logger.debug( - f"Added organization identity: {entity_value}" + f"Added relationship for observable {entity_value}" ) - # Process industries - for industry in all_entities.get("industry", []): - entity_id = industry.get("id") - entity_value = industry.get("value") - context = industry.get("context") - - if entity_id and entity_value: - identity = self.converter.create_industry_identity( - entity_id, - entity_value, - context, - report_reference=external_reference, - ) - related_objects.append(identity) - collected_object_refs.append(identity.id) - entity_mappings["identity"].append(identity.id) + # Process threat actors + for threat_actor in all_entities.get("threatactor", []) + all_entities.get( + "threat_actor", [] + ): + self._process_threat_actor( + threat_actor, + related_objects, + collected_object_refs, + entity_mappings, + external_reference, + ) - if self.logger: - self.logger.debug( - f"Added industry identity: {entity_value}" - ) + # Process other entity types + entity_processors = { + "malware": "create_malware", + "tool": "create_tool", + "vulnerability": "create_vulnerability", + "campaign": "create_campaign", + "organization": ("create_organization_identity", "identity"), + "industry": ("create_industry_identity", "identity"), + "sector": ("create_sector_identity", "identity"), + "country": ("create_country_location", "location"), + } - # Process sectors - for sector in all_entities.get("sector", []): - entity_id = sector.get("id") - entity_value = sector.get("value") - context = sector.get("context") - - if entity_id and entity_value: - identity = self.converter.create_sector_identity( - entity_id, - entity_value, - context, - report_reference=external_reference, - ) - related_objects.append(identity) - collected_object_refs.append(identity.id) - entity_mappings["identity"].append(identity.id) + for entity_type, processor in entity_processors.items(): + if isinstance(processor, tuple): + converter_method, mapping_type = processor + else: + converter_method = processor + mapping_type = entity_type + + self._process_entities( + all_entities.get(entity_type, []), + mapping_type, + converter_method, + related_objects, + collected_object_refs, + entity_mappings, + external_reference, + ) - if self.logger: - self.logger.debug(f"Added sector identity: {entity_value}") - - # Process countries - for country in all_entities.get("country", []): - entity_id = country.get("id") - entity_value = country.get("value") - context = country.get("context") - - if entity_id and entity_value: - location = self.converter.create_country_location( - entity_id, - entity_value, - context, - report_reference=external_reference, - ) - related_objects.append(location) - collected_object_refs.append(location.id) - entity_mappings["location"].append(location.id) + # Get detailed content and create report + detailed_content = ( + self.get_member_content(content_id) + if self.catalyst_authenticated + else self.get_member_content(slug) + ) + content = detailed_content.get("content", "") + + report = self.converter.create_report( + content_id=content_id, + title=title, + description=content or summary or description, + published=published, + modified=modified, + object_refs=collected_object_refs, + object_marking_refs=[content_marking.id], + labels=labels if labels else None, + custom_properties=custom_properties, + ) - if self.logger: - self.logger.debug(f"Added country location: {entity_value}") - - # TTP relationship processing - REMOVED - # self._process_ttp_entities( - # all_entities, - # related_objects, - # collected_object_refs, - # entity_mappings, - # external_reference - # ) - detailed_content = ( - self.get_member_content(content_id) - if self.catalyst_authenticated - else self.get_member_content(slug) + if self.logger: + self.logger.info( + f"Created report with {len(related_objects)} related objects" ) - content = detailed_content.get("content", "") - report = self.converter.create_report( - content_id=content_id, - title=title, - description=content or summary or description, - published=published, - modified=modified, - object_refs=collected_object_refs, - object_marking_refs=[content_marking.id], - labels=labels if labels else None, - custom_properties=custom_properties, + self.logger.debug( + f"Completed creation of report {report.id} with {len(report.object_refs)} referenced objects" ) - if self.logger: - self.logger.info( - f"Created report with {len(related_objects)} related objects" - ) + return report, related_objects - except Exception as e: - if self.logger: - self.logger.error( - f"Error creating report from content {content_id}: {str(e)}" - ) - raise - else: + except Exception as e: if self.logger: - self.logger.error(f"Error creating report from content {content_id}") - - if self.logger: - self.logger.debug( - f"Completed creation of report {report.id} with {len(report.object_refs)} referenced objects" - ) - - return report, related_objects + self.logger.error( + f"Error creating report from content {content_id}: {str(e)}" + ) + raise def create_report_from_member_content( self, content: Dict From 0f9f3406121e6a339ba7f5b6efc12d6b85045936 Mon Sep 17 00:00:00 2001 From: "M. Bilgehan Ertan" Date: Fri, 4 Apr 2025 21:41:32 +0200 Subject: [PATCH 2/5] fix: stages on precommit hook --- .pre-commit-config.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 06913f6..461ae29 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -13,6 +13,7 @@ repos: - id: detect-private-key - id: no-commit-to-branch args: [--branch, main, --branch, master] + stages: [manual, commit-msg] - repo: https://github.com/pycqa/isort rev: 5.12.0 From 64b990c8e0b9c847d8260c44a1459864ff3c4c81 Mon Sep 17 00:00:00 2001 From: "M. Bilgehan Ertan" Date: Fri, 4 Apr 2025 21:42:15 +0200 Subject: [PATCH 3/5] fix: stages on precommit hook --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 461ae29..2ef1c4e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -13,7 +13,7 @@ repos: - id: detect-private-key - id: no-commit-to-branch args: [--branch, main, --branch, master] - stages: [manual, commit-msg] + stages: [manual, commit-msg, pre-commit, prepare-commit-msg] - repo: https://github.com/pycqa/isort rev: 5.12.0 From 84786628eba55153632a63fbfb7325da22482a23 Mon Sep 17 00:00:00 2001 From: "M. Bilgehan Ertan" Date: Fri, 4 Apr 2025 21:44:40 +0200 Subject: [PATCH 4/5] fix: stages on precommit hook --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 2ef1c4e..5975848 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -13,7 +13,7 @@ repos: - id: detect-private-key - id: no-commit-to-branch args: [--branch, main, --branch, master] - stages: [manual, commit-msg, pre-commit, prepare-commit-msg] + stages: [commit-msg, pre-commit, prepare-commit-msg] - repo: https://github.com/pycqa/isort rev: 5.12.0 From 70d50dd6f5105c84375d5199287dc34883a71220 Mon Sep 17 00:00:00 2001 From: "M. Bilgehan Ertan" Date: Fri, 4 Apr 2025 21:50:19 +0200 Subject: [PATCH 5/5] fix: trailing whitespace --- .github/workflows/python-package.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index f56561c..d35ca9d 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -36,4 +36,4 @@ jobs: - name: Run pytest run: | - pytest . + pytest .