diff --git a/SimpleAgent/.env.example b/SimpleAgent/.env.example index e637a4f..cc7b9a7 100644 --- a/SimpleAgent/.env.example +++ b/SimpleAgent/.env.example @@ -48,8 +48,25 @@ DEBUG_MODE=False # Output directory for all file operations OUTPUT_DIR=output +# Input directory for user files that the agent can access +INPUT_DIR=input + # Memory file location (relative to OUTPUT_DIR) MEMORY_FILE=memory.json +# ============================================================================= +# INPUT FILE SYSTEM SETTINGS +# ============================================================================= + +# Maximum input file size in bytes (default: 10MB) +MAX_INPUT_FILE_SIZE=10485760 + +# Allowed input file extensions (comma-separated) +ALLOWED_INPUT_EXTENSIONS=.txt,.json,.csv,.md,.py,.js,.html,.css,.xml,.yaml,.yml + +# ============================================================================= +# EXTERNAL INTEGRATIONS +# ============================================================================= + # GitHub Token - Optional, for GitHub operations -# GITHUB_TOKEN= \ No newline at end of file +# GITHUB_TOKEN= diff --git a/SimpleAgent/commands/file_ops/access_input_file/__init__.py b/SimpleAgent/commands/file_ops/access_input_file/__init__.py new file mode 100644 index 0000000..676a123 --- /dev/null +++ b/SimpleAgent/commands/file_ops/access_input_file/__init__.py @@ -0,0 +1,189 @@ +""" +Access Input File Command + +This command allows the agent to securely access files from the input directory. +It provides read-only access to user-placed files with proper security validation. +""" + +import json +from typing import Dict, Any, List + +from core.execution.tool_manager import register_command +from core.utils.input_manager import InputManager + + +def access_input_file(filename: str = "", operation: str = "read", search_term: str = None, encoding: str = "utf-8") -> str: + """ + Access and read input files securely. + + Args: + filename: Name of the file to access + operation: Operation to perform ("read", "info", "list", "search", "json", "csv") + search_term: Term to search for (only used with "search" operation) + encoding: Text encoding to use for reading (default: utf-8) + + Returns: + String containing the requested information or file content + """ + input_manager = InputManager() + + try: + if operation == "list": + # List all available input files + files = input_manager.list_input_files() + if not files: + return "No input files found in the input directory." 
+ + result = "Available input files:\n" + for file_info in files: + size_kb = file_info.size / 1024 + result += f"- {file_info.name} ({size_kb:.1f} KB, {file_info.extension}, modified: {file_info.modified_time.strftime('%Y-%m-%d %H:%M')})\n" + + return result.strip() + + elif operation == "info": + # Get detailed information about a specific file + if not filename: + return "Error: filename is required for 'info' operation" + + file_info = input_manager.get_input_file_info(filename) + size_kb = file_info.size / 1024 + + return f"""File Information for '{filename}': +- Size: {file_info.size} bytes ({size_kb:.1f} KB) +- Type: {file_info.extension} ({file_info.mime_type}) +- Modified: {file_info.modified_time.strftime('%Y-%m-%d %H:%M:%S')} +- Text file: {'Yes' if file_info.is_text else 'No'} +- Encoding: {file_info.encoding or 'Unknown'}""" + + elif operation == "read": + # Read the full content of a file + if not filename: + return "Error: filename is required for 'read' operation" + + content = input_manager.read_input_file(filename, encoding) + + # Add a header with file info + file_info = input_manager.get_input_file_info(filename) + size_kb = file_info.size / 1024 + + header = f"=== Content of '{filename}' ({size_kb:.1f} KB) ===\n" + return header + content + + elif operation == "search": + # Search for a term within a file + if not filename: + return "Error: filename is required for 'search' operation" + if not search_term: + return "Error: search_term is required for 'search' operation" + + results = input_manager.search_file_content(filename, search_term, case_sensitive=False) + + if not results: + return f"No matches found for '{search_term}' in '{filename}'" + + result = f"Found {len(results)} matches for '{search_term}' in '{filename}':\n\n" + for line_num, line_content in results: + result += f"Line {line_num}: {line_content.strip()}\n" + + return result.strip() + + elif operation == "json": + # Read and parse a JSON file + if not filename: + return "Error: filename is required for 'json' operation" + + data = input_manager.read_json_file(filename) + + # Format the JSON nicely + formatted_json = json.dumps(data, indent=2, ensure_ascii=False) + + header = f"=== JSON Content of '{filename}' ===\n" + return header + formatted_json + + elif operation == "csv": + # Read a CSV file and return lines + if not filename: + return "Error: filename is required for 'csv' operation" + + lines = input_manager.read_csv_lines(filename) + + if not lines: + return f"CSV file '{filename}' is empty or contains no valid lines" + + result = f"=== CSV Content of '{filename}' ({len(lines)} lines) ===\n" + for i, line in enumerate(lines[:50], 1): # Show first 50 lines + result += f"{i}: {line}\n" + + if len(lines) > 50: + result += f"\n... and {len(lines) - 50} more lines" + + return result.strip() + + elif operation == "summary": + # Get a summary of all input files + summary = input_manager.get_file_summary() + + result = f"Input Directory Summary:\n" + result += f"- Total files: {summary['total_files']}\n" + result += f"- Total size: {summary['total_size'] / 1024:.1f} KB\n" + + if summary['file_types']: + result += f"- File types: {', '.join(f'{ext}({count})' for ext, count in summary['file_types'].items())}\n" + + if summary['files']: + result += f"\nFiles:\n" + for file_data in summary['files']: + result += f" - {file_data['name']} ({file_data['size']/1024:.1f} KB)\n" + + return result.strip() + + else: + return f"Error: Unknown operation '{operation}'. 
Available operations: read, info, list, search, json, csv, summary" + + except FileNotFoundError as e: + return f"Error: File not found - {str(e)}" + except PermissionError as e: + return f"Error: Access denied - {str(e)}" + except ValueError as e: + return f"Error: Invalid file or operation - {str(e)}" + except Exception as e: + return f"Error: {str(e)}" + + +# Command schema for the tool manager +ACCESS_INPUT_FILE_SCHEMA = { + "type": "function", + "function": { + "name": "access_input_file", + "description": "Access and read files from the input directory. Supports various operations like reading content, getting file info, listing files, searching content, and parsing JSON/CSV files.", + "parameters": { + "type": "object", + "properties": { + "filename": { + "type": "string", + "description": "Name of the file to access (required for most operations except 'list' and 'summary')" + }, + "operation": { + "type": "string", + "enum": ["read", "info", "list", "search", "json", "csv", "summary"], + "description": "Operation to perform: 'read' (read full content), 'info' (get file details), 'list' (list all files), 'search' (search for text), 'json' (parse JSON), 'csv' (read CSV lines), 'summary' (get directory summary)", + "default": "read" + }, + "search_term": { + "type": "string", + "description": "Term to search for (only used with 'search' operation)" + }, + "encoding": { + "type": "string", + "description": "Text encoding to use for reading files", + "default": "utf-8" + } + }, + "required": [] + } + } +} + +# Register the command +register_command("access_input_file", access_input_file, ACCESS_INPUT_FILE_SCHEMA) diff --git a/SimpleAgent/core/__init__.py b/SimpleAgent/core/__init__.py index 39d8c76..652535b 100644 --- a/SimpleAgent/core/__init__.py +++ b/SimpleAgent/core/__init__.py @@ -11,6 +11,7 @@ from core.conversation.memory import MemoryManager from core.agent.run_manager import RunManager from core.utils.security import get_secure_path +from core.utils.input_manager import InputManager __all__ = [ "SimpleAgent", @@ -19,5 +20,6 @@ "ExecutionManager", "MemoryManager", "RunManager", - "get_secure_path" -] \ No newline at end of file + "get_secure_path", + "InputManager" +] diff --git a/SimpleAgent/core/agent/agent.py b/SimpleAgent/core/agent/agent.py index e0c0d0c..ba3bd7a 100644 --- a/SimpleAgent/core/agent/agent.py +++ b/SimpleAgent/core/agent/agent.py @@ -20,6 +20,7 @@ from core.agent.run_manager import RunManager from core.utils.security import get_secure_path from core.utils.config import DEFAULT_MODEL, OUTPUT_DIR +from core.utils.input_manager import InputManager class SimpleAgent: @@ -47,6 +48,9 @@ def __init__(self, model: str = None, output_dir: str = None): self.execution_manager = self.run_manager.execution_manager self.memory_manager = self.run_manager.memory_manager + # Initialize input manager for accessing input files + self.input_manager = InputManager() + # Initialize memory self.memory = self.memory_manager.get_memory() @@ -134,4 +138,58 @@ def get_secure_path(self, file_path: str) -> str: Returns: Modified file path within output directory """ - return get_secure_path(file_path, self.output_dir) \ No newline at end of file + return get_secure_path(file_path, self.output_dir) + + def list_input_files(self) -> List[str]: + """ + List all available input files. 
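+
+        Example (illustrative, assuming files have been placed in the input
+        directory):
+            agent = SimpleAgent()
+            agent.list_input_files()   # e.g. ['report.csv', 'notes.md']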
+ + Returns: + List of input file names + """ + files = self.input_manager.list_input_files() + return [f.name for f in files] + + def read_input_file(self, filename: str) -> str: + """ + Read the contents of an input file. + + Args: + filename: Name of the file to read + + Returns: + File contents as string + """ + return self.input_manager.read_input_file(filename) + + def get_input_file_info(self, filename: str) -> Dict[str, Any]: + """ + Get information about an input file. + + Args: + filename: Name of the file + + Returns: + Dictionary with file information + """ + file_info = self.input_manager.get_input_file_info(filename) + return { + 'name': file_info.name, + 'size': file_info.size, + 'extension': file_info.extension, + 'mime_type': file_info.mime_type, + 'modified_time': file_info.modified_time.isoformat(), + 'is_text': file_info.is_text + } + + def input_file_exists(self, filename: str) -> bool: + """ + Check if an input file exists. + + Args: + filename: Name of the file to check + + Returns: + True if file exists and is accessible, False otherwise + """ + return self.input_manager.file_exists(filename) diff --git a/SimpleAgent/core/agent/run_manager.py b/SimpleAgent/core/agent/run_manager.py index 788c37b..f116bf1 100644 --- a/SimpleAgent/core/agent/run_manager.py +++ b/SimpleAgent/core/agent/run_manager.py @@ -38,9 +38,9 @@ def __init__(self, model: str, output_dir: str = OUTPUT_DIR): output_dir: The output directory for file operations """ self.output_dir = output_dir - self.conversation_manager = ConversationManager() - self.execution_manager = ExecutionManager(model=model, output_dir=output_dir) self.memory_manager = MemoryManager() + self.conversation_manager = ConversationManager(memory_manager=self.memory_manager) + self.execution_manager = ExecutionManager(model=model, output_dir=output_dir) self.summarizer = ChangeSummarizer() self.loop_detector = LoopDetector(window_size=5, similarity_threshold=0.7) @@ -73,6 +73,9 @@ def run(self, user_instruction: str, max_steps: int = 10, auto_continue: int = 0 print(f"šŸ› ļø Requires Tools: {task_goal.requires_tools}") if task_goal.expected_deliverables: print(f"šŸ“¦ Expected Deliverables: {', '.join(task_goal.expected_deliverables)}") + + # Set task objective for context compression + self.conversation_manager.set_task_objective(task_goal.primary_objective) # Get current date and time information for the system message current_datetime = time.strftime("%Y-%m-%d %H:%M:%S") @@ -173,6 +176,10 @@ def run(self, user_instruction: str, max_steps: int = 10, auto_continue: int = 0 self.conversation_manager.update_system_message(system_content) + # Check if context compression is needed before processing + if self.conversation_manager.compress_if_needed(): + print("šŸ”„ Context compressed - continuing with optimized history") + print(f"\n--- Step {step}/{max_steps} ---") try: @@ -373,6 +380,14 @@ def run(self, user_instruction: str, max_steps: int = 10, auto_continue: int = 0 self.execution_manager.stop_requested = True print("\nāœ… Agent execution interrupted by user") + # Show context compression statistics if any compression occurred + compression_stats = self.conversation_manager.get_compression_stats() + if compression_stats["total_compressions"] > 0: + print(f"\nšŸ—œļø Context Compression Statistics:") + print(f" šŸ“Š Total compressions: {compression_stats['total_compressions']}") + print(f" šŸ’¾ Total tokens saved: {compression_stats['total_tokens_saved']}") + print(f" ⚔ Enabled long-running task capability") + # Generate a final 
summary of all changes if changes_made: final_summary = self.summarizer.summarize_changes(changes_made) diff --git a/SimpleAgent/core/config.py b/SimpleAgent/core/config.py index e5ba67d..ceed03c 100644 --- a/SimpleAgent/core/config.py +++ b/SimpleAgent/core/config.py @@ -38,6 +38,26 @@ if not os.path.exists(OUTPUT_DIR): os.makedirs(OUTPUT_DIR) +# Input directory - All input files should be placed here for agent access +# Can be customized through environment variable +# Use absolute path to ensure it works regardless of current working directory +_input_dir_relative = os.getenv("INPUT_DIR", "input") +if os.path.isabs(_input_dir_relative): + INPUT_DIR = _input_dir_relative +else: + # Get the directory where this config file is located (core/) + _config_dir = os.path.dirname(os.path.abspath(__file__)) + # Go up one level to get to the SimpleAgent root directory + _project_root = os.path.dirname(_config_dir) + INPUT_DIR = os.path.join(_project_root, _input_dir_relative) + +if not os.path.exists(INPUT_DIR): + os.makedirs(INPUT_DIR) + +# Input file settings +MAX_INPUT_FILE_SIZE = int(os.getenv("MAX_INPUT_FILE_SIZE", str(10 * 1024 * 1024))) # 10MB default +ALLOWED_INPUT_EXTENSIONS = os.getenv("ALLOWED_INPUT_EXTENSIONS", ".txt,.json,.csv,.md,.py,.js,.html,.css,.xml,.yaml,.yml").split(",") + # Memory settings MEMORY_FILE = os.path.join(OUTPUT_DIR, os.getenv("MEMORY_FILE", "memory.json")) @@ -71,4 +91,4 @@ def create_client(): if not OPENAI_API_KEY: raise ValueError("OPENAI_API_KEY must be set when using OpenAI provider") from openai import OpenAI - return OpenAI(api_key=OPENAI_API_KEY) \ No newline at end of file + return OpenAI(api_key=OPENAI_API_KEY) diff --git a/SimpleAgent/core/conversation/context_compressor.py b/SimpleAgent/core/conversation/context_compressor.py new file mode 100644 index 0000000..840bd72 --- /dev/null +++ b/SimpleAgent/core/conversation/context_compressor.py @@ -0,0 +1,264 @@ +""" +Context Compressor Module + +This module implements intelligent context compression for long-running agent tasks + +The compressor maintains key decisions, events, and outcomes while reducing token usage +for conversations that exceed context window limits. +""" + +import json +import time +from typing import List, Dict, Any, Optional, Tuple +from dataclasses import dataclass, asdict + +from core.utils.config import create_client, CONTEXT_COMPRESSION_MODEL + + +@dataclass +class CompressedContext: + """Represents compressed context information""" + key_decisions: List[str] + critical_events: List[str] + tool_outcomes: List[str] + current_state: str + unresolved_issues: List[str] + compression_timestamp: float + original_message_count: int + compressed_message_count: int + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for JSON serialization.""" + return asdict(self) + + +class ContextCompressor: + """ + Intelligently compresses conversation history to maintain key information + while staying within token limits for long-running tasks. + """ + + def __init__(self, model: str = None): + self.model = model or CONTEXT_COMPRESSION_MODEL + self.client = create_client() + self.compression_threshold = 50 # Messages before compression kicks in + self.keep_recent_count = 10 # Always keep this many recent messages + + def should_compress(self, conversation_history: List[Dict[str, Any]]) -> bool: + """ + Determine if the conversation history should be compressed. 
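+
+        The check is a simple length heuristic based on ``compression_threshold``
+        (50 messages by default). Example (illustrative; constructing the
+        compressor assumes an API client can be created):
+            compressor = ContextCompressor()
+            compressor.should_compress([{"role": "user", "content": "hi"}] * 60)  # True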
+ + Args: + conversation_history: The full conversation history + + Returns: + True if compression is needed + """ + return len(conversation_history) > self.compression_threshold + + def compress_context(self, conversation_history: List[Dict[str, Any]], + task_objective: str = None) -> Tuple[List[Dict[str, Any]], CompressedContext]: + """ + Compress conversation history while preserving key decisions and context. + + Args: + conversation_history: The full conversation history + task_objective: The main task objective for context + + Returns: + (compressed_history, compression_metadata) + """ + if not self.should_compress(conversation_history): + return conversation_history, None + + # Always preserve system message and recent messages + system_messages = [msg for msg in conversation_history if msg.get("role") == "system"] + recent_messages = conversation_history[-self.keep_recent_count:] + + # Messages to compress (middle portion) + messages_to_compress = conversation_history[len(system_messages):-self.keep_recent_count] + + # Clean up any orphaned tool messages in recent messages to avoid API errors + recent_messages = self._clean_orphaned_tool_messages(recent_messages) + + if not messages_to_compress: + return conversation_history, None + + # Create compression prompt + compression_prompt = self._create_compression_prompt(messages_to_compress, task_objective) + + try: + response = self.client.chat.completions.create( + model=self.model, + messages=[{"role": "user", "content": compression_prompt}], + temperature=0.1, + max_tokens=1000 + ) + + compression_result = json.loads(response.choices[0].message.content) + + # Create compressed context object + compressed_context = CompressedContext( + key_decisions=compression_result.get("key_decisions", []), + critical_events=compression_result.get("critical_events", []), + tool_outcomes=compression_result.get("tool_outcomes", []), + current_state=compression_result.get("current_state", ""), + unresolved_issues=compression_result.get("unresolved_issues", []), + compression_timestamp=time.time(), + original_message_count=len(messages_to_compress), + compressed_message_count=1 # Will be replaced by summary message + ) + + # Create a single summary message to replace the compressed portion + summary_content = self._format_compression_summary(compressed_context) + summary_message = { + "role": "system", + "content": summary_content, + "metadata": {"type": "compressed_context", "timestamp": compressed_context.compression_timestamp} + } + + # Build new conversation history and ensure it's clean + compressed_history = system_messages + [summary_message] + recent_messages + compressed_history = self._clean_orphaned_tool_messages(compressed_history) + + return compressed_history, compressed_context + + except Exception as e: + print(f"āš ļø Context compression failed: {e}") + # Fallback: just keep system and recent messages + return system_messages + recent_messages, None + + def _create_compression_prompt(self, messages: List[Dict[str, Any]], task_objective: str = None) -> str: + """Create the prompt for context compression.""" + + # Convert messages to a readable format + messages_text = "" + for i, msg in enumerate(messages): + role = msg.get("role", "unknown") + content = msg.get("content", "") + + # Truncate very long messages + if len(content) > 1000: + content = content[:1000] + "... 
[truncated]" + + messages_text += f"\n[{i+1}] {role.upper()}: {content}\n" + + # Include tool calls and results if present + if "tool_calls" in msg: + for tool_call in msg["tool_calls"]: + function_name = tool_call.get("function", {}).get("name", "unknown") + messages_text += f" → Called tool: {function_name}\n" + + objective_context = f"\nTask Objective: {task_objective}\n" if task_objective else "" + + return f"""You are compressing conversation history for a long-running AI agent task. +Extract and preserve ONLY the most critical information that future decisions depend on. + +{objective_context} +Conversation History to Compress: +{messages_text} + +Return a JSON object with these fields: +{{ + "key_decisions": ["Decision 1", "Decision 2", ...], + "critical_events": ["Event 1", "Event 2", ...], + "tool_outcomes": ["Tool result 1", "Tool result 2", ...], + "current_state": "Brief description of current progress/state", + "unresolved_issues": ["Issue 1", "Issue 2", ...] +}} + +Focus on: +- Decisions that affect future actions +- Successful tool operations and their results +- Errors or failures that need to be remembered +- Current progress towards the objective +- Any constraints or requirements discovered + +Ignore: +- Conversational filler +- Detailed explanations that don't affect decisions +- Redundant information +- Step-by-step reasoning (keep only conclusions)""" + + def _format_compression_summary(self, compressed_context: CompressedContext) -> str: + """Format the compressed context into a readable summary.""" + + summary = f"""=== COMPRESSED CONTEXT SUMMARY === +(Compressed {compressed_context.original_message_count} messages at {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(compressed_context.compression_timestamp))}) + +CURRENT STATE: {compressed_context.current_state} + +KEY DECISIONS MADE:""" + + for decision in compressed_context.key_decisions: + summary += f"\n• {decision}" + + summary += "\n\nCRITICAL EVENTS:" + for event in compressed_context.critical_events: + summary += f"\n• {event}" + + summary += "\n\nTOOL OUTCOMES:" + for outcome in compressed_context.tool_outcomes: + summary += f"\n• {outcome}" + + if compressed_context.unresolved_issues: + summary += "\n\nUNRESOLVED ISSUES:" + for issue in compressed_context.unresolved_issues: + summary += f"\n• {issue}" + + summary += "\n=== END COMPRESSED CONTEXT ===\n" + + return summary + + def _clean_orphaned_tool_messages(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + Remove tool messages that don't have corresponding tool_calls to avoid API errors. + + Args: + messages: List of messages to clean + + Returns: + Cleaned list of messages + """ + cleaned_messages = [] + tool_call_ids = set() + + # First pass: collect all tool_call_ids + for msg in messages: + if msg.get("role") == "assistant" and "tool_calls" in msg: + for tool_call in msg["tool_calls"]: + tool_call_ids.add(tool_call.get("id")) + + # Second pass: only keep tool messages that have corresponding tool_calls + for msg in messages: + if msg.get("role") == "tool": + tool_call_id = msg.get("tool_call_id") + if tool_call_id in tool_call_ids: + cleaned_messages.append(msg) + # Skip orphaned tool messages + else: + cleaned_messages.append(msg) + + return cleaned_messages + + def estimate_token_savings(self, original_history: List[Dict[str, Any]], + compressed_history: List[Dict[str, Any]]) -> Dict[str, int]: + """ + Estimate token savings from compression. 
+ + Returns: + Dictionary with token estimates + """ + def estimate_tokens(text: str) -> int: + # Rough estimation: ~4 characters per token + return len(str(text)) // 4 + + original_tokens = sum(estimate_tokens(msg.get("content", "")) for msg in original_history) + compressed_tokens = sum(estimate_tokens(msg.get("content", "")) for msg in compressed_history) + + return { + "original_tokens": original_tokens, + "compressed_tokens": compressed_tokens, + "tokens_saved": original_tokens - compressed_tokens, + "compression_ratio": compressed_tokens / original_tokens if original_tokens > 0 else 1.0 + } \ No newline at end of file diff --git a/SimpleAgent/core/conversation/conversation.py b/SimpleAgent/core/conversation/conversation.py index 76fb7c6..a478d73 100644 --- a/SimpleAgent/core/conversation/conversation.py +++ b/SimpleAgent/core/conversation/conversation.py @@ -4,17 +4,22 @@ This module handles the conversation history and message management for SimpleAgent. """ -from typing import List, Dict, Any, Optional +from typing import List, Dict, Any, Optional, Tuple +from core.conversation.context_compressor import ContextCompressor, CompressedContext class ConversationManager: """ - Manages the conversation history for the SimpleAgent. + Manages the conversation history for the SimpleAgent with intelligent context compression. """ - def __init__(self): + def __init__(self, memory_manager=None): """Initialize the conversation manager.""" self.conversation_history = [] + self.context_compressor = ContextCompressor() + self.compression_history = [] # Track compression events + self.task_objective = None # Set by the agent to help with compression + self.memory_manager = memory_manager # Optional memory manager for persistence def add_message(self, role: str, content: str, **kwargs) -> None: """ @@ -52,4 +57,86 @@ def clear(self) -> None: def get_history(self) -> List[Dict[str, Any]]: """Get the current conversation history.""" - return self.conversation_history \ No newline at end of file + return self.conversation_history + + def set_task_objective(self, objective: str) -> None: + """ + Set the main task objective for better context compression. + + Args: + objective: The primary objective of the current task + """ + self.task_objective = objective + + def compress_if_needed(self) -> bool: + """ + Compress conversation history if it's getting too long. 
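+
+        Example (illustrative):
+            manager = ConversationManager()
+            manager.set_task_objective("Summarise the input reports")
+            # ... many add_message() calls during a long-running task ...
+            if manager.compress_if_needed():
+                print(manager.get_compression_stats()["total_tokens_saved"])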
+ + Returns: + True if compression was performed + """ + if not self.context_compressor.should_compress(self.conversation_history): + return False + + print(f"šŸ—œļø Compressing conversation history ({len(self.conversation_history)} messages)") + + compressed_history, compression_metadata = self.context_compressor.compress_context( + self.conversation_history, + self.task_objective + ) + + if compressed_history and compression_metadata: + # Calculate savings + token_savings = self.context_compressor.estimate_token_savings( + self.conversation_history, + compressed_history + ) + + # Update conversation history + old_length = len(self.conversation_history) + self.conversation_history = compressed_history + new_length = len(self.conversation_history) + + # Track compression event + compression_event = { + "timestamp": compression_metadata.compression_timestamp, + "original_messages": old_length, + "compressed_messages": new_length, + "token_savings": token_savings, + "metadata": compression_metadata.to_dict() # Convert to dict for JSON serialization + } + self.compression_history.append(compression_event) + + # Persist to memory if available + if self.memory_manager: + self.memory_manager.add_context_compression(compression_event) + + print(f"āœ… Compressed {old_length} → {new_length} messages") + print(f"šŸ’¾ Estimated token savings: {token_savings['tokens_saved']} tokens ({token_savings['compression_ratio']:.1%} of original)") + + return True + else: + print("āš ļø Context compression failed, continuing with full history") + return False + + def get_compression_stats(self) -> Dict[str, Any]: + """ + Get statistics about context compression usage. + + Returns: + Dictionary with compression statistics + """ + if not self.compression_history: + return {"total_compressions": 0, "total_tokens_saved": 0} + + total_tokens_saved = sum( + comp["token_savings"]["tokens_saved"] + for comp in self.compression_history + ) + + return { + "total_compressions": len(self.compression_history), + "total_tokens_saved": total_tokens_saved, + "latest_compression": self.compression_history[-1]["timestamp"] if self.compression_history else None, + "compression_events": self.compression_history + } \ No newline at end of file diff --git a/SimpleAgent/core/conversation/memory.py b/SimpleAgent/core/conversation/memory.py index f7940a9..793bb54 100644 --- a/SimpleAgent/core/conversation/memory.py +++ b/SimpleAgent/core/conversation/memory.py @@ -42,9 +42,9 @@ def _load_memory(self) -> Dict[str, Any]: return memory except (json.JSONDecodeError, IOError) as e: print(f"Error loading memory from {self.memory_file}: {e}") - return {"conversations": [], "files_created": [], "files_modified": []} + return {"conversations": [], "files_created": [], "files_modified": [], "context_compressions": []} else: - return {"conversations": [], "files_created": [], "files_modified": []} + return {"conversations": [], "files_created": [], "files_modified": [], "context_compressions": []} def save_memory(self) -> None: """Save the agent's memory to the memory file.""" @@ -87,6 +87,17 @@ def add_file_modified(self, file_path: str) -> None: if file_path not in self.memory["files_modified"]: self.memory["files_modified"].append(file_path) + def add_context_compression(self, compression_event: Dict[str, Any]) -> None: + """ + Add a context compression event to the memory. 
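+
+        The event is appended under the "context_compressions" key and persisted
+        on the next save_memory() call. Illustrative event shape (as produced by
+        ConversationManager.compress_if_needed):
+            {
+                "timestamp": 1718000000.0,
+                "original_messages": 60,
+                "compressed_messages": 15,
+                "token_savings": {"tokens_saved": 12000, ...},
+                "metadata": {...},
+            }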
+ + Args: + compression_event: The compression event data + """ + if "context_compressions" not in self.memory: + self.memory["context_compressions"] = [] + self.memory["context_compressions"].append(compression_event) + def get_memory(self) -> Dict[str, Any]: """ Get the current memory. diff --git a/SimpleAgent/core/utils/config.py b/SimpleAgent/core/utils/config.py index e5ba67d..4167d23 100644 --- a/SimpleAgent/core/utils/config.py +++ b/SimpleAgent/core/utils/config.py @@ -24,6 +24,7 @@ DEFAULT_MODEL = os.getenv("DEFAULT_MODEL", "gpt-4o") # Main agent (fast + powerful) METACOGNITION_MODEL = os.getenv("METACOGNITION_MODEL", "gpt-4") # Reflection/analysis (thoughtful) SUMMARIZER_MODEL = os.getenv("SUMMARIZER_MODEL", "gpt-3.5-turbo") # Summaries (fast + cheap) +CONTEXT_COMPRESSION_MODEL = os.getenv("CONTEXT_COMPRESSION_MODEL", "gpt-3.5-turbo") # Context compression (efficient) # For LM-Studio, we might want to use the same model for both if API_PROVIDER == "lmstudio": @@ -31,6 +32,7 @@ DEFAULT_MODEL = os.getenv("DEFAULT_MODEL", "deepseek-r1-distill-llama-8b") METACOGNITION_MODEL = os.getenv("METACOGNITION_MODEL", "deepseek-r1-distill-llama-8b") SUMMARIZER_MODEL = os.getenv("SUMMARIZER_MODEL", "deepseek-r1-distill-llama-8b") + CONTEXT_COMPRESSION_MODEL = os.getenv("CONTEXT_COMPRESSION_MODEL", "deepseek-r1-distill-llama-8b") # Output directory - All file operations MUST happen within this directory # Can be customized through environment variable @@ -38,6 +40,26 @@ if not os.path.exists(OUTPUT_DIR): os.makedirs(OUTPUT_DIR) +# Input directory - All input files should be placed here for agent access +# Can be customized through environment variable +# Use absolute path to ensure it works regardless of current working directory +_input_dir_relative = os.getenv("INPUT_DIR", "input") +if os.path.isabs(_input_dir_relative): + INPUT_DIR = _input_dir_relative +else: + # Get the directory where this config file is located (core/utils/) + _config_dir = os.path.dirname(os.path.abspath(__file__)) + # Go up two levels to get to the SimpleAgent root directory + _project_root = os.path.dirname(os.path.dirname(_config_dir)) + INPUT_DIR = os.path.join(_project_root, _input_dir_relative) + +if not os.path.exists(INPUT_DIR): + os.makedirs(INPUT_DIR) + +# Input file settings +MAX_INPUT_FILE_SIZE = int(os.getenv("MAX_INPUT_FILE_SIZE", str(10 * 1024 * 1024))) # 10MB default +ALLOWED_INPUT_EXTENSIONS = os.getenv("ALLOWED_INPUT_EXTENSIONS", ".txt,.json,.csv,.md,.py,.js,.html,.css,.xml,.yaml,.yml").split(",") + # Memory settings MEMORY_FILE = os.path.join(OUTPUT_DIR, os.getenv("MEMORY_FILE", "memory.json")) @@ -71,4 +93,4 @@ def create_client(): if not OPENAI_API_KEY: raise ValueError("OPENAI_API_KEY must be set when using OpenAI provider") from openai import OpenAI - return OpenAI(api_key=OPENAI_API_KEY) \ No newline at end of file + return OpenAI(api_key=OPENAI_API_KEY) diff --git a/SimpleAgent/core/utils/input_manager.py b/SimpleAgent/core/utils/input_manager.py new file mode 100644 index 0000000..d6f1bf7 --- /dev/null +++ b/SimpleAgent/core/utils/input_manager.py @@ -0,0 +1,297 @@ +""" +Input Manager Module + +This module provides secure input file management for SimpleAgent. +It handles reading, listing, and validating input files while maintaining security. 
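+
+Typical usage (illustrative):
+    from core.utils.input_manager import InputManager
+
+    manager = InputManager()
+    for info in manager.list_input_files():
+        print(info.name, info.size, info.mime_type)
+    content = manager.read_input_file("notes.txt")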
+ +Security Notes: +- All input file operations are restricted to the input directory +- File size and extension validation prevents malicious files +- Read-only access ensures input files cannot be modified +""" + +import os +import json +import mimetypes +from typing import List, Dict, Any, Optional, Tuple +from dataclasses import dataclass +from datetime import datetime + +from core.utils.config import INPUT_DIR, MAX_INPUT_FILE_SIZE, ALLOWED_INPUT_EXTENSIONS +from core.utils.security import get_secure_input_path, validate_input_file + + +@dataclass +class InputFileInfo: + """Information about an input file.""" + name: str + path: str + size: int + extension: str + mime_type: str + modified_time: datetime + is_text: bool + encoding: Optional[str] = None + + +class InputManager: + """ + Manages secure access to input files for SimpleAgent. + Provides methods to list, read, and validate input files. + """ + + def __init__(self): + self.input_dir = INPUT_DIR + self._ensure_input_dir_exists() + + def _ensure_input_dir_exists(self): + """Ensure the input directory exists.""" + if not os.path.exists(self.input_dir): + os.makedirs(self.input_dir) + + def list_input_files(self) -> List[InputFileInfo]: + """ + List all valid input files in the input directory. + + Returns: + List of InputFileInfo objects for valid files + """ + files = [] + + if not os.path.exists(self.input_dir): + return files + + try: + for filename in os.listdir(self.input_dir): + file_path = os.path.join(self.input_dir, filename) + + # Skip directories and hidden files + if not os.path.isfile(file_path) or filename.startswith('.'): + continue + + # Validate the file + if validate_input_file(filename): + try: + file_info = self._get_file_info(filename) + files.append(file_info) + except Exception: + # Skip files that can't be processed + continue + + except Exception: + # If we can't list the directory, return empty list + pass + + return sorted(files, key=lambda f: f.name.lower()) + + def _get_file_info(self, filename: str) -> InputFileInfo: + """ + Get detailed information about a file. + + Args: + filename: Name of the file + + Returns: + InputFileInfo object with file details + """ + secure_path = get_secure_input_path(filename) + stat = os.stat(secure_path) + + # Get file extension and MIME type + extension = os.path.splitext(filename)[1].lower() + mime_type, encoding = mimetypes.guess_type(filename) + + # Determine if it's a text file + is_text = ( + mime_type and mime_type.startswith('text/') or + extension in ['.txt', '.md', '.json', '.csv', '.py', '.js', '.html', '.css', '.xml', '.yaml', '.yml'] + ) + + return InputFileInfo( + name=filename, + path=secure_path, + size=stat.st_size, + extension=extension, + mime_type=mime_type or 'application/octet-stream', + modified_time=datetime.fromtimestamp(stat.st_mtime), + is_text=is_text, + encoding=encoding + ) + + def read_input_file(self, filename: str, encoding: str = 'utf-8') -> str: + """ + Securely read the contents of an input file. 
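+
+        If the content cannot be decoded with the requested encoding, a few
+        common fallbacks (latin-1, cp1252, iso-8859-1) are attempted before the
+        original UnicodeDecodeError is re-raised. Example (illustrative):
+            text = manager.read_input_file("notes.txt")
+            data = manager.read_input_file("legacy.txt", encoding="latin-1")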
+ + Args: + filename: Name of the file to read + encoding: Text encoding to use (default: utf-8) + + Returns: + File contents as string + + Raises: + FileNotFoundError: If file doesn't exist + PermissionError: If file access is not allowed + ValueError: If file is invalid or too large + UnicodeDecodeError: If file can't be decoded with specified encoding + """ + secure_path = get_secure_input_path(filename) + + try: + with open(secure_path, 'r', encoding=encoding) as f: + content = f.read() + return content + except UnicodeDecodeError as e: + # Try with different encodings + for fallback_encoding in ['latin-1', 'cp1252', 'iso-8859-1']: + try: + with open(secure_path, 'r', encoding=fallback_encoding) as f: + content = f.read() + return content + except UnicodeDecodeError: + continue + # If all encodings fail, raise the original error + raise e + + def read_input_file_binary(self, filename: str) -> bytes: + """ + Read the binary contents of an input file. + + Args: + filename: Name of the file to read + + Returns: + File contents as bytes + + Raises: + FileNotFoundError: If file doesn't exist + PermissionError: If file access is not allowed + ValueError: If file is invalid or too large + """ + secure_path = get_secure_input_path(filename) + + with open(secure_path, 'rb') as f: + content = f.read() + return content + + def get_input_file_info(self, filename: str) -> InputFileInfo: + """ + Get information about a specific input file. + + Args: + filename: Name of the file + + Returns: + InputFileInfo object with file details + + Raises: + FileNotFoundError: If file doesn't exist + PermissionError: If file access is not allowed + ValueError: If file is invalid + """ + # Validate file first + if not validate_input_file(filename): + raise ValueError(f"Invalid input file: {filename}") + + return self._get_file_info(filename) + + def file_exists(self, filename: str) -> bool: + """ + Check if an input file exists and is valid. + + Args: + filename: Name of the file to check + + Returns: + True if file exists and is valid, False otherwise + """ + return validate_input_file(filename) + + def get_file_summary(self) -> Dict[str, Any]: + """ + Get a summary of all input files. + + Returns: + Dictionary with file summary information + """ + files = self.list_input_files() + + total_size = sum(f.size for f in files) + file_types = {} + + for file_info in files: + ext = file_info.extension or 'no extension' + file_types[ext] = file_types.get(ext, 0) + 1 + + return { + 'total_files': len(files), + 'total_size': total_size, + 'file_types': file_types, + 'files': [ + { + 'name': f.name, + 'size': f.size, + 'type': f.extension, + 'modified': f.modified_time.isoformat() + } + for f in files + ] + } + + def read_json_file(self, filename: str) -> Dict[str, Any]: + """ + Read and parse a JSON input file. + + Args: + filename: Name of the JSON file + + Returns: + Parsed JSON data as dictionary + + Raises: + FileNotFoundError: If file doesn't exist + ValueError: If file is not valid JSON + """ + content = self.read_input_file(filename) + try: + return json.loads(content) + except json.JSONDecodeError as e: + raise ValueError(f"Invalid JSON in file {filename}: {e}") + + def read_csv_lines(self, filename: str) -> List[str]: + """ + Read a CSV file and return lines. 
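+
+        Note: lines are returned as plain strings (stripped, with empty lines
+        removed); no CSV quoting or field parsing is applied. Example
+        (illustrative):
+            rows = manager.read_csv_lines("data.csv")
+            header = rows[0].split(",") if rows else []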
+ + Args: + filename: Name of the CSV file + + Returns: + List of lines from the CSV file + """ + content = self.read_input_file(filename) + return [line.strip() for line in content.splitlines() if line.strip()] + + def search_file_content(self, filename: str, search_term: str, case_sensitive: bool = False) -> List[Tuple[int, str]]: + """ + Search for a term within a file's content. + + Args: + filename: Name of the file to search + search_term: Term to search for + case_sensitive: Whether search should be case sensitive + + Returns: + List of tuples (line_number, line_content) containing the search term + """ + content = self.read_input_file(filename) + lines = content.splitlines() + results = [] + + search_term_processed = search_term if case_sensitive else search_term.lower() + + for i, line in enumerate(lines, 1): + line_processed = line if case_sensitive else line.lower() + if search_term_processed in line_processed: + results.append((i, line)) + + return results diff --git a/SimpleAgent/core/utils/security.py b/SimpleAgent/core/utils/security.py index fa99d98..c7aea42 100644 --- a/SimpleAgent/core/utils/security.py +++ b/SimpleAgent/core/utils/security.py @@ -11,7 +11,7 @@ """ import os -from core.utils.config import OUTPUT_DIR +from core.utils.config import OUTPUT_DIR, INPUT_DIR, MAX_INPUT_FILE_SIZE, ALLOWED_INPUT_EXTENSIONS def get_secure_path(file_path: str, base_dir: str = OUTPUT_DIR) -> str: """ @@ -66,4 +66,87 @@ def get_secure_path(file_path: str, base_dir: str = OUTPUT_DIR) -> str: # If the path escapes output directory, block access raise PermissionError(f"Security Error: Attempted to access file outside the output directory: {abs_combined_path}") - return combined_path \ No newline at end of file + return combined_path + + +def get_secure_input_path(file_path: str) -> str: + """ + Securely convert any file path to be within the input directory. + This prevents path traversal attacks and ensures input file operations are contained. + + Args: + file_path: Original file path + + Returns: + Modified file path within the input directory + + Raises: + PermissionError: If the path attempts to escape the input directory + FileNotFoundError: If the input file doesn't exist + ValueError: If the file extension is not allowed or file is too large + """ + # Normalize path separators to system default + file_path = file_path.replace('/', os.path.sep).replace('\\', os.path.sep) + + # Get just the basename to handle absolute paths or traversal attempts + file_name = os.path.basename(file_path) + + # If no filename provided, raise error + if not file_name: + raise ValueError("No filename provided") + + # Remove any leading dots, slashes, or path traversal patterns + clean_path = file_path + while clean_path.startswith(('.', os.path.sep)): + clean_path = clean_path.lstrip('.' 
+ os.path.sep) + + # If path is empty after cleaning, just use filename + if not clean_path: + clean_path = file_name + + # Construct the full path within input directory + combined_path = os.path.normpath(os.path.join(INPUT_DIR, clean_path)) + + # Final security check: ensure the resolved path is within input directory + abs_combined_path = os.path.abspath(combined_path) + abs_input_dir = os.path.abspath(INPUT_DIR) + + if not abs_combined_path.startswith(abs_input_dir): + raise PermissionError(f"Security Error: Attempted to access file outside the input directory: {abs_combined_path}") + + # Check if file exists + if not os.path.exists(combined_path): + raise FileNotFoundError(f"Input file not found: {file_name}") + + # Check if it's actually a file (not a directory) + if not os.path.isfile(combined_path): + raise ValueError(f"Path is not a file: {file_name}") + + # Check file extension + file_ext = os.path.splitext(file_name)[1].lower() + if file_ext not in ALLOWED_INPUT_EXTENSIONS: + raise ValueError(f"File extension '{file_ext}' not allowed. Allowed extensions: {', '.join(ALLOWED_INPUT_EXTENSIONS)}") + + # Check file size + file_size = os.path.getsize(combined_path) + if file_size > MAX_INPUT_FILE_SIZE: + raise ValueError(f"File too large: {file_size} bytes. Maximum allowed: {MAX_INPUT_FILE_SIZE} bytes") + + return combined_path + + +def validate_input_file(file_path: str) -> bool: + """ + Validate if an input file can be safely accessed. + + Args: + file_path: Path to the input file + + Returns: + True if file is valid and safe to access, False otherwise + """ + try: + get_secure_input_path(file_path) + return True + except (PermissionError, FileNotFoundError, ValueError): + return False
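
A minimal end-to-end sketch of the new input pipeline introduced by this diff (illustrative only; assumes the package is importable and a file named "notes.txt" has been placed in the input directory):

    from core.utils.input_manager import InputManager
    from core.utils.security import validate_input_file, get_secure_input_path

    manager = InputManager()

    # List the files the agent may access (hidden files and files failing
    # validation are filtered out).
    for info in manager.list_input_files():
        print(f"{info.name}: {info.size} bytes, {info.mime_type}")

    # get_secure_input_path() enforces containment in INPUT_DIR plus the
    # extension and size limits, raising PermissionError / ValueError /
    # FileNotFoundError on violations; validate_input_file() wraps it as a bool.
    if validate_input_file("notes.txt"):
        print(get_secure_input_path("notes.txt"))
        print(manager.read_input_file("notes.txt")[:200])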