diff --git a/python/packages/autogen-ext/src/autogen_ext/code_executors/local/__init__.py b/python/packages/autogen-ext/src/autogen_ext/code_executors/local/__init__.py index f21d9fe4b8ef..8f2118fb3eb0 100644 --- a/python/packages/autogen-ext/src/autogen_ext/code_executors/local/__init__.py +++ b/python/packages/autogen-ext/src/autogen_ext/code_executors/local/__init__.py @@ -144,6 +144,37 @@ async def example(): $functions""" + SECURITY_PREAMBLE: ClassVar[ + str + ] = """ +import sys, os +from pathlib import Path + +def _audit_hook(event, args): + if event in ["open", "io.open", "os.open"]: + mode = "r" + if event == "os.open": + flags = args[1] + if flags & (os.O_WRONLY | os.O_RDWR | os.O_APPEND | os.O_CREAT | os.O_TRUNC): mode = "w" + else: + if len(args) > 1: mode = args[1] + if isinstance(mode, int): return + + if any(c in mode for c in "wax+"): + path = args[0] + if isinstance(path, int): return + try: + p = Path(path).resolve() + cwd = Path.cwd().resolve() + except Exception: + return + + if not str(p).startswith(str(cwd)) and str(p) != os.devnull: + raise PermissionError(f"Security: Access denied to {path}") + +sys.addaudithook(_audit_hook) +""" + def __init__( self, timeout: int = 60, @@ -388,6 +419,10 @@ async def _execute_code_dont_check_setup( filename = f"tmp_code_{code_hash}.{ext}" + # Inject security preamble for Python + if lang == "python": + code = self.SECURITY_PREAMBLE + "\n" + code + written_file = (self.work_dir / filename).resolve() with written_file.open("w", encoding="utf-8") as f: f.write(code)