Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
655 changes: 355 additions & 300 deletions src/lexecon/api/server.py

Large diffs are not rendered by default.

284 changes: 158 additions & 126 deletions src/lexecon/audit_export/service.py

Large diffs are not rendered by default.

172 changes: 83 additions & 89 deletions src/lexecon/compliance/eu_ai_act/article_11_technical_docs.py

Large diffs are not rendered by default.

142 changes: 74 additions & 68 deletions src/lexecon/compliance/eu_ai_act/article_12_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,25 @@
"""

import json
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, List, Optional
from dataclasses import dataclass, asdict

from lexecon.ledger.chain import LedgerChain, LedgerEntry


class RetentionClass(Enum):
"""Retention classification per EU AI Act Article 12."""

HIGH_RISK = "high_risk" # 10 years minimum
STANDARD = "standard" # 6 months minimum
GDPR_INTERSECT = "gdpr_intersect" # Subject to data subject rights


class RecordStatus(Enum):
"""Status of records in retention system."""

ACTIVE = "active" # Within retention period
EXPIRING = "expiring" # Approaching retention deadline
LEGAL_HOLD = "legal_hold" # Frozen for investigation
Expand All @@ -39,6 +41,7 @@ class RecordStatus(Enum):
@dataclass
class RetentionPolicy:
"""Retention policy for a class of records."""

classification: RetentionClass
retention_days: int
auto_anonymize: bool = True
Expand All @@ -49,6 +52,7 @@ class RetentionPolicy:
@dataclass
class ComplianceRecord:
"""Article 12 compliant record wrapper."""

record_id: str
original_entry: Dict[str, Any]
retention_class: RetentionClass
Expand Down Expand Up @@ -78,22 +82,22 @@ def __init__(self, ledger: LedgerChain):
retention_days=3650, # 10 years
auto_anonymize=True,
legal_basis="EU AI Act Article 12 - high-risk system monitoring",
data_subject_rights=False # Exception for regulatory compliance
data_subject_rights=False, # Exception for regulatory compliance
),
RetentionClass.STANDARD: RetentionPolicy(
classification=RetentionClass.STANDARD,
retention_days=180, # 6 months
auto_anonymize=True,
legal_basis="Legitimate interest - security monitoring",
data_subject_rights=True
data_subject_rights=True,
),
RetentionClass.GDPR_INTERSECT: RetentionPolicy(
classification=RetentionClass.GDPR_INTERSECT,
retention_days=90, # 90 days default
auto_anonymize=True,
legal_basis="Consent - user data processing",
data_subject_rights=True
)
data_subject_rights=True,
),
}

def classify_entry(self, entry: LedgerEntry) -> RetentionClass:
Expand Down Expand Up @@ -134,8 +138,14 @@ def classify_entry(self, entry: LedgerEntry) -> RetentionClass:
def _contains_personal_data(self, data: Dict[str, Any]) -> bool:
"""Check if data contains personal information."""
personal_indicators = [
"email", "name", "user_id", "ip_address",
"phone", "address", "ssn", "passport"
"email",
"name",
"user_id",
"ip_address",
"phone",
"address",
"ssn",
"passport",
]
data_str = json.dumps(data).lower()
return any(indicator in data_str for indicator in personal_indicators)
Expand All @@ -145,12 +155,13 @@ def wrap_entry(self, entry: LedgerEntry) -> ComplianceRecord:
classification = self.classify_entry(entry)
policy = self.policies[classification]

created_at = datetime.fromisoformat(entry.timestamp.replace('Z', '+00:00'))
created_at = datetime.fromisoformat(entry.timestamp.replace("Z", "+00:00"))
expires_at = created_at + timedelta(days=policy.retention_days)

# Check if under legal hold
legal_holds = [
hold_id for hold_id, hold in self.legal_holds.items()
hold_id
for hold_id, hold in self.legal_holds.items()
if entry.entry_id in hold.get("entry_ids", [])
]

Expand All @@ -163,7 +174,7 @@ def wrap_entry(self, entry: LedgerEntry) -> ComplianceRecord:
created_at=entry.timestamp,
expires_at=expires_at.isoformat(),
status=status,
legal_holds=legal_holds
legal_holds=legal_holds,
)

def get_retention_status(self) -> Dict[str, Any]:
Expand All @@ -181,30 +192,26 @@ def get_retention_status(self) -> Dict[str, Any]:
by_class[record.retention_class] += 1
by_status[record.status] += 1

expires = datetime.fromisoformat(record.expires_at.replace('Z', '+00:00'))
expires = datetime.fromisoformat(record.expires_at.replace("Z", "+00:00"))
if (expires - now).days <= 30:
expiring_soon += 1

return {
"total_records": total,
"by_classification": {
cls.value: count for cls, count in by_class.items()
},
"by_status": {
status.value: count for status, count in by_status.items()
},
"by_classification": {cls.value: count for cls, count in by_class.items()},
"by_status": {status.value: count for status, count in by_status.items()},
"expiring_within_30_days": expiring_soon,
"legal_holds_active": len(self.legal_holds),
"oldest_record": self.ledger.entries[0].timestamp if self.ledger.entries else None,
"newest_record": self.ledger.entries[-1].timestamp if self.ledger.entries else None
"newest_record": self.ledger.entries[-1].timestamp if self.ledger.entries else None,
}

def apply_legal_hold(
self,
hold_id: str,
entry_ids: Optional[List[str]] = None,
reason: str = "",
requester: str = "system"
requester: str = "system",
) -> Dict[str, Any]:
"""
Apply legal hold to records.
Expand All @@ -221,14 +228,10 @@ def apply_legal_hold(
"applied_at": datetime.utcnow().isoformat(),
"requester": requester,
"entry_ids": entry_ids,
"status": "active"
"status": "active",
}

return {
"hold_id": hold_id,
"records_affected": len(entry_ids),
"status": "applied"
}
return {"hold_id": hold_id, "records_affected": len(entry_ids), "status": "applied"}

def release_legal_hold(self, hold_id: str, releaser: str = "system") -> Dict[str, Any]:
"""Release a legal hold."""
Expand All @@ -242,25 +245,21 @@ def release_legal_hold(self, hold_id: str, releaser: str = "system") -> Dict[str

affected_count = len(hold["entry_ids"])

return {
"hold_id": hold_id,
"records_affected": affected_count,
"status": "released"
}
return {"hold_id": hold_id, "records_affected": affected_count, "status": "released"}

def _record_to_dict(self, record: ComplianceRecord) -> Dict[str, Any]:
"""Convert ComplianceRecord to dict with enum values as strings."""
record_dict = asdict(record)
# Convert enums to their string values
record_dict['retention_class'] = record.retention_class.value
record_dict['status'] = record.status.value
record_dict["retention_class"] = record.retention_class.value
record_dict["status"] = record.status.value
return record_dict

def generate_regulatory_package(
self,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
entry_types: Optional[List[str]] = None
entry_types: Optional[List[str]] = None,
) -> Dict[str, Any]:
"""
Generate complete package for regulatory requests.
Expand Down Expand Up @@ -297,39 +296,32 @@ def generate_regulatory_package(
package = {
"package_type": "EU_AI_ACT_ARTICLE_12_REGULATORY_RESPONSE",
"generated_at": datetime.utcnow().isoformat(),
"period": {
"start": start_date or "inception",
"end": end_date or "present"
},
"period": {"start": start_date or "inception", "end": end_date or "present"},
"summary": {
"total_records": len(records),
"decisions": len(decisions),
"policy_changes": len(policy_changes),
"decision_outcomes": decision_outcomes,
"retention_status": self.get_retention_status()
"retention_status": self.get_retention_status(),
},
"integrity_verification": {
"ledger_valid": self.ledger.verify_integrity()["valid"],
"chain_intact": self.ledger.verify_integrity()["chain_intact"],
"root_hash": self.ledger.entries[-1].entry_hash if self.ledger.entries else None
"root_hash": self.ledger.entries[-1].entry_hash if self.ledger.entries else None,
},
"records": [self._record_to_dict(r) for r in records],
"compliance_attestation": {
"article_12_compliance": True,
"retention_policies_applied": True,
"audit_trail_integrity": True,
"legal_holds_documented": len(self.legal_holds) > 0,
"generated_by": "Lexecon Compliance System"
}
"generated_by": "Lexecon Compliance System",
},
}

return package

def export_for_regulator(
self,
format: str = "json",
**kwargs
) -> str:
def export_for_regulator(self, format: str = "json", **kwargs) -> str:
"""
Export regulatory package in requested format.

Expand Down Expand Up @@ -401,33 +393,42 @@ def _format_csv_package(self, package: Dict[str, Any]) -> str:
import io

output = io.StringIO()
if not package['records']:
if not package["records"]:
return "No records found"

# CSV headers
fieldnames = [
"record_id", "event_type", "timestamp", "retention_class",
"expires_at", "status", "decision", "actor", "action"
"record_id",
"event_type",
"timestamp",
"retention_class",
"expires_at",
"status",
"decision",
"actor",
"action",
]

writer = csv.DictWriter(output, fieldnames=fieldnames)
writer.writeheader()

for record in package['records']:
entry = record['original_entry']
data = entry.get('data', {})

writer.writerow({
"record_id": record['record_id'],
"event_type": entry['event_type'],
"timestamp": record['created_at'],
"retention_class": record['retention_class'], # Already converted to string
"expires_at": record['expires_at'],
"status": record['status'], # Already converted to string
"decision": data.get('decision', ''),
"actor": data.get('actor', ''),
"action": data.get('action', '')
})
for record in package["records"]:
entry = record["original_entry"]
data = entry.get("data", {})

writer.writerow(
{
"record_id": record["record_id"],
"event_type": entry["event_type"],
"timestamp": record["created_at"],
"retention_class": record["retention_class"], # Already converted to string
"expires_at": record["expires_at"],
"status": record["status"], # Already converted to string
"decision": data.get("decision", ""),
"actor": data.get("actor", ""),
"action": data.get("action", ""),
}
)

return output.getvalue()

Expand All @@ -453,7 +454,7 @@ def anonymize_record(self, entry_id: str) -> Dict[str, Any]:
return {
"error": "Cannot anonymize - under legal hold",
"entry_id": entry_id,
"legal_holds": record.legal_holds
"legal_holds": record.legal_holds,
}

# Anonymize personal data fields
Expand All @@ -464,7 +465,7 @@ def anonymize_record(self, entry_id: str) -> Dict[str, Any]:
"status": "anonymized",
"anonymized_at": datetime.utcnow().isoformat(),
"original_hash": entry.entry_hash,
"note": "Personal data removed, decision metadata retained for compliance"
"note": "Personal data removed, decision metadata retained for compliance",
}

def _anonymize_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
Expand All @@ -473,8 +474,13 @@ def _anonymize_data(self, data: Dict[str, Any]) -> Dict[str, Any]:

# Fields to anonymize
personal_fields = [
"actor", "user_intent", "request_id",
"email", "name", "user_id", "ip_address"
"actor",
"user_intent",
"request_id",
"email",
"name",
"user_id",
"ip_address",
]

for field in personal_fields:
Expand Down
Loading