def _clean_ansi_codes(text: str) -> str:
    """Remove ANSI CSI escape sequences (color codes, cursor movement, erase, etc.).

    Args:
        text: Raw terminal-style output.

    Returns:
        ``text`` with every ``ESC [ ... <final byte>`` sequence removed.
    """
    import re

    # CSI grammar: ESC '[', parameter bytes (0x30-0x3F), intermediate bytes
    # (0x20-0x2F), then one final byte (0x40-0x7E). Matching only a trailing
    # "m" would leave cursor/erase sequences such as "\x1b[2K" in the output.
    return re.sub(r"\x1b\[[0-?]*[ -/]*[@-~]", "", text)


def _format_execution_result(result_str: str) -> str:
    """Turn the sandbox's JSON result string into plain text.

    Args:
        result_str: The raw result string returned by the sandbox.

    Returns:
        * ``"Execution failed: ..."`` when the payload's ``success`` flag is
          falsy (the first output's ``ename`` is appended when available);
        * ``"No output generated"`` when a successful payload has no outputs;
        * otherwise the concatenated ``text`` of every output, with ANSI
          sequences removed and literal ``\\n`` expanded to newlines.
        If ``result_str`` is not valid JSON it is returned with ANSI sequences
        removed; on any other formatting error it is returned unchanged.
    """
    try:
        result_json = json.loads(result_str)

        # "data" / "outputs" may be missing or explicitly null.
        data = result_json.get("data") or {}
        outputs = data.get("outputs") or []

        if not result_json.get("success"):
            message = result_json.get("message", "Unknown error")
            first = outputs[0] if outputs else None
            if isinstance(first, dict):
                error_msg = first.get("ename", "Unknown error")
                return f"Execution failed: {message}, {error_msg}"
            # Always report the failure; never fall through to the success
            # path, which would turn it into "No output generated".
            return f"Execution failed: {message}"

        if not outputs:
            return "No output generated"

        formatted = [
            _clean_ansi_codes(output["text"]).replace("\\n", "\n")
            for output in outputs
            if output and isinstance(output, dict) and "text" in output
        ]
        return "".join(formatted).strip()

    except json.JSONDecodeError:
        return _clean_ansi_codes(result_str)
    except Exception as e:
        # Deliberate best effort: formatting must never hide the raw result.
        logger.warning(f"Error formatting result: {e}, returning raw result")
        return result_str


# NOTE: the docstring below is kept verbatim because `@tool` exposes it to the
# model as the tool description; maintainer notes live in `#` comments instead.
@tool
def execute_skills(
    workflow_prompt: str,
    runtime: ToolRuntime,
    skills: Optional[List[str]] = None,
    timeout: int = 900,
) -> str:
    """execute skills in a code sandbox and return the output.
    For C++ code, don't execute it directly, compile and execute via Python; write sources and object files to /tmp.

    Args:
        workflow_prompt (str): instruction of workflow
        skills (Optional[List[str]]): The skills will be invoked
        timeout (int, optional): The timeout in seconds for the code execution, less than or equal to 900. Defaults to 900.

    Returns:
        str: The output of the code execution.
    """

    tool_id = getenv("AGENTKIT_TOOL_ID")

    service = getenv(
        "AGENTKIT_TOOL_SERVICE_CODE", "agentkit"
    )  # temporary service for code run tool
    region = getenv("AGENTKIT_TOOL_REGION", "cn-beijing")
    host = getenv(
        "AGENTKIT_TOOL_HOST", service + "." + region + ".volces.com"
    )  # temporary host for code run tool
    logger.debug(f"tools endpoint: {host}")

    # Enforce the documented upper bound and keep the value positive.
    timeout = max(1, min(timeout, 900))
    # The wrapper script stops 10 seconds before the requested timeout
    # (presumably so it can still write its log and print it); never let that
    # budget reach zero, or the subprocess would be killed immediately.
    inner_timeout = max(timeout - 10, 1)

    # The sibling `run_code` tool reads the session id from `runtime.context`;
    # fall back to that location when the runtime has no `session_id` itself.
    session_id = getattr(runtime, "session_id", None)
    if session_id is None:
        session_id = runtime.context.session_id  # type: ignore
    agent_name = runtime.context.agent_name  # type: ignore
    user_id = runtime.context.user_id  # type: ignore
    tool_user_session_id = f"{agent_name}_{user_id}_{session_id}"
    logger.debug(f"tool_user_session_id: {tool_user_session_id}")

    logger.debug(
        f"Execute skills in session_id={session_id}, tool_id={tool_id}, host={host}, service={service}, region={region}, timeout={timeout}"
    )

    header = {}

    ak = os.getenv("VOLCENGINE_ACCESS_KEY")
    sk = os.getenv("VOLCENGINE_SECRET_KEY")
    if not (ak and sk):
        logger.debug(
            "Get AK/SK from environment variables failed. Try to use credential from Iam."
        )
        credential = get_credential_from_vefaas_iam()
        ak = credential.access_key_id
        sk = credential.secret_access_key
        header = {"X-Security-Token": credential.session_token}
    else:
        logger.debug("Successfully get AK/SK from environment variables.")

    cmd = ["python", "agent.py", workflow_prompt]
    if skills:
        cmd.extend(["--skills"] + skills)

    # TODO: remove after agentkit supports custom environment variables setting
    res = ve_request(
        request_body={},
        action="GetCallerIdentity",
        ak=ak,
        sk=sk,
        service="sts",
        version="2018-01-01",
        region=region,
        host="sts.volcengineapi.com",
        header=header,
    )
    try:
        account_id = res["Result"]["AccountId"]
    except (KeyError, TypeError) as e:
        # TypeError covers a non-dict response (e.g. an error string).
        logger.error(f"Error occurred while getting account id: {e}, response is {res}")
        return f"Failed to get account id, response: {res}"

    env_vars = {
        "TOS_SKILLS_DIR": f"tos://agentkit-platform-{account_id}/skills/",
        "TOOL_USER_SESSION_ID": tool_user_session_id,
    }

    # Script executed inside the sandbox: it runs `cmd` in
    # /home/gem/veadk_skills with `env_vars` as defaults (existing variables
    # win), copies stdout/stderr line by line into /tmp/agent.log with a
    # log-type prefix, kills the process after `inner_timeout` seconds, and
    # finally prints the log. `cmd` and `env_vars` are embedded via repr() so
    # arbitrary prompt text becomes a valid Python literal.
    code = f"""
import subprocess
import os
import time
import select
import sys

env = os.environ.copy()
for key, value in {env_vars!r}.items():
    if key not in env:
        env[key] = value

process = subprocess.Popen(
    {cmd!r},
    cwd='/home/gem/veadk_skills',
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True,
    env=env,
    bufsize=1,
    universal_newlines=True
)

start_time = time.time()
timeout = {inner_timeout}

with open('/tmp/agent.log', 'w') as log_file:
    while True:
        if time.time() - start_time > timeout:
            process.kill()
            log_file.write('log_type=stderr request_id=x function_id=y revision_number=1 Process timeout\\n')
            break

        reads = [process.stdout.fileno(), process.stderr.fileno()]
        ret = select.select(reads, [], [], 1)

        for fd in ret[0]:
            if fd == process.stdout.fileno():
                line = process.stdout.readline()
                if line:
                    log_file.write(f'log_type=stdout request_id=x function_id=y revision_number=1 {{line}}')
                    log_file.flush()
            if fd == process.stderr.fileno():
                line = process.stderr.readline()
                if line:
                    log_file.write(f'log_type=stderr request_id=x function_id=y revision_number=1 {{line}}')
                    log_file.flush()

        if process.poll() is not None:
            break

    for line in process.stdout:
        log_file.write(f'log_type=stdout request_id=x function_id=y revision_number=1 {{line}}')
    for line in process.stderr:
        log_file.write(f'log_type=stderr request_id=x function_id=y revision_number=1 {{line}}')

with open('/tmp/agent.log', 'r') as log_file:
    output = log_file.read()
    print(output)
    """

    res = ve_request(
        request_body={
            "ToolId": tool_id,
            "UserSessionId": tool_user_session_id,
            "OperationType": "RunCode",
            "OperationPayload": json.dumps(
                {
                    "code": code,
                    "timeout": timeout,
                    "kernel_name": "python3",
                }
            ),
        },
        action="InvokeTool",
        ak=ak,
        sk=sk,
        service=service,
        version="2025-10-30",
        region=region,
        host=host,
        header=header,
    )
    logger.debug(f"Invoke run code response: {res}")

    try:
        return _format_execution_result(res["Result"]["Result"])
    except (KeyError, TypeError) as e:
        logger.error(f"Error occurred while running code: {e}, response is {res}")
        # The tool is declared to return `str`; do not leak the raw object.
        return f"Failed to execute skills, response: {res}"
# NOTE: the docstring below is kept verbatim because `@tool` exposes it to the
# model as the tool description; maintainer notes live in `#` comments instead.
@tool
def run_code(code: str, language: str, runtime: ToolRuntime, timeout: int = 30) -> str:
    """Run code in a code sandbox and return the output.
    For C++ code, don't execute it directly, compile and execute via Python; write sources and object files to /tmp.

    Args:
        code (str): The code to run.
        language (str): The programming language of the code. Language must be one of the supported languages: python3.
        timeout (int, optional): The timeout in seconds for the code execution. Defaults to 30.

    Returns:
        str: The output of the code execution.
    """

    tool_id = getenv("AGENTKIT_TOOL_ID")

    service = getenv(
        "AGENTKIT_TOOL_SERVICE_CODE", "agentkit"
    )  # temporary service for code run tool
    region = getenv("AGENTKIT_TOOL_REGION", "cn-beijing")
    host = getenv(
        "AGENTKIT_TOOL_HOST", service + "." + region + ".volces.com"
    )  # temporary host for code run tool
    # Anything other than http/https silently falls back to https.
    scheme = os.getenv("AGENTKIT_TOOL_SCHEME", "https").lower()
    if scheme not in {"http", "https"}:
        scheme = "https"
    logger.debug(f"tools endpoint: {host}")

    session_id = runtime.context.session_id  # type: ignore
    user_id = runtime.context.user_id  # type: ignore
    agent_name = runtime.context.agent_name  # type: ignore

    # The sandbox session is keyed by agent, user and conversation session.
    tool_user_session_id = agent_name + "_" + user_id + "_" + session_id
    logger.debug(f"tool_user_session_id: {tool_user_session_id}")

    logger.debug(
        f"Running code in language: {language}, session_id={session_id}, code={code}, tool_id={tool_id}, host={host}, service={service}, region={region}, timeout={timeout}"
    )

    header = {}

    # Credentials: static AK/SK from the environment first, otherwise the
    # temporary credential (which also requires the security-token header).
    ak = os.getenv("VOLCENGINE_ACCESS_KEY")
    sk = os.getenv("VOLCENGINE_SECRET_KEY")
    if not (ak and sk):
        logger.debug(
            "Get AK/SK from environment variables failed. Try to use credential from Iam."
        )
        credential = get_credential_from_vefaas_iam()
        ak = credential.access_key_id
        sk = credential.secret_access_key
        header = {"X-Security-Token": credential.session_token}
    else:
        logger.debug("Successfully get AK/SK from environment variables.")

    res = ve_request(
        request_body={
            "ToolId": tool_id,
            "UserSessionId": tool_user_session_id,
            "OperationType": "RunCode",
            "OperationPayload": json.dumps(
                {
                    "code": code,
                    "timeout": timeout,
                    "kernel_name": language,
                }
            ),
        },
        action="InvokeTool",
        ak=ak,
        sk=sk,
        service=service,
        version="2025-10-30",
        region=region,
        host=host,
        header=header,
        scheme=scheme,  # type: ignore
    )
    logger.debug(f"Invoke run code response: {res}")

    try:
        return res["Result"]["Result"]
    except (KeyError, TypeError) as e:
        # TypeError covers a non-dict response (e.g. an error string).
        logger.error(f"Error occurred while running code: {e}, response is {res}")
        # The tool is declared to return `str`; do not leak the raw object.
        return f"Failed to run code, response: {res}"
# NOTE: the docstring below is kept verbatim because `@tool` exposes it to the
# model as the tool description; maintainer notes live in `#` comments instead.
@tool
def web_search(query: str) -> list[str]:
    """Search a query in websites.

    Args:
        query: The query to search.

    Returns:
        A list of result documents.
    """

    # Only the temporary credential carries a session token; static key pairs
    # send no security-token header (same convention as the other tools here).
    header = {}

    # Credential lookup order: tool-specific keys, account-wide keys, then the
    # temporary credential. Exactly one debug line is logged per step.
    ak = os.getenv("TOOL_WEB_SEARCH_ACCESS_KEY")
    sk = os.getenv("TOOL_WEB_SEARCH_SECRET_KEY")
    if ak and sk:
        logger.debug("Successfully get tool-specific AK/SK.")
    else:
        logger.debug("Get tool-specific AK/SK failed.")
        ak = os.getenv("VOLCENGINE_ACCESS_KEY")
        sk = os.getenv("VOLCENGINE_SECRET_KEY")
        if ak and sk:
            logger.debug("Successfully get AK/SK from environment variables.")
        else:
            logger.debug("Get AK/SK from environment variables failed.")
            credential = get_credential_from_vefaas_iam()
            ak = credential.access_key_id
            sk = credential.secret_access_key
            header = {"X-Security-Token": credential.session_token}

    response = ve_request(
        request_body={
            "Query": query,
            "SearchType": "web",
            "Count": 5,
            "NeedSummary": True,
        },
        action="WebSearch",
        ak=ak,
        sk=sk,
        service="volc_torchlight_api",
        version="2025-01-01",
        region="cn-beijing",
        host="mercury.volcengineapi.com",
        header=header,
    )

    try:
        results = response["Result"]["WebResults"] or []
        # Skip entries without a usable summary instead of discarding every
        # result because a single one is malformed.
        return [
            result["Summary"].strip()
            for result in results
            if isinstance(result, dict) and isinstance(result.get("Summary"), str)
        ]
    except (KeyError, TypeError) as e:
        logger.error(f"Web search failed {e}, response body: {response}")
        # Keep the declared `list[str]` contract on the failure path too.
        return [f"Web search failed, response: {response}"]