diff --git a/tests/test_cloud.py b/tests/test_cloud.py index 0f82c195..c14ac087 100644 --- a/tests/test_cloud.py +++ b/tests/test_cloud.py @@ -48,6 +48,7 @@ async def test_cloud(): test_endpoint, "app-123", "func-456", + None, ) # Mock update operation diff --git a/veadk/a2a/remote_ve_agent.py b/veadk/a2a/remote_ve_agent.py index 1eddedaa..296ba717 100644 --- a/veadk/a2a/remote_ve_agent.py +++ b/veadk/a2a/remote_ve_agent.py @@ -286,6 +286,42 @@ async def _pre_run(self, ctx: InvocationContext) -> None: # Inject auth token if credential service is available await self._inject_auth_token(ctx) + # Inject session_id to http headers + if hasattr(self, "_a2a_client") and isinstance(self._a2a_client, BaseClient): + if hasattr(self._a2a_client, "_transport") and hasattr( + self._a2a_client._transport, "httpx_client" + ): + # get session_id from InvocationContext + import uuid + + parent_name = ( + ctx.agent.parent_agent.name + if hasattr(ctx.agent, "parent_agent") and ctx.agent.parent_agent + else self.name + ) + user_id = ( + ctx.session.user_id + if hasattr(ctx, "session") + and hasattr(ctx.session, "user_id") + and ctx.session.user_id + else "unknown" + ) + session_id = ( + ctx.session.id + if hasattr(ctx, "session") + and hasattr(ctx.session, "id") + and ctx.session.id + else str(uuid.uuid4()) + ) + x_session_id = f"{parent_name}_{user_id}_{session_id}" + x_session_id_key = "x-session-id-veadk" + self._a2a_client._transport.httpx_client.headers.update( + {x_session_id_key: x_session_id} + ) + logger.debug( + f"a2a client inject {x_session_id_key} to header: {x_session_id}" + ) + async def _inject_auth_token(self, ctx: InvocationContext) -> None: """Inject authentication token from credential service into the HTTP client. diff --git a/veadk/cloud/cloud_agent_engine.py b/veadk/cloud/cloud_agent_engine.py index 3d884858..3ccb3a8b 100644 --- a/veadk/cloud/cloud_agent_engine.py +++ b/veadk/cloud/cloud_agent_engine.py @@ -221,6 +221,7 @@ def deploy( identity_user_pool_name: str = "", identity_client_name: str = "", local_test: bool = False, + enable_session_affinity: bool = False, ) -> CloudApp: """Deploys a local agent project to Volcengine FaaS, creating necessary resources. @@ -287,15 +288,20 @@ def deploy( identity_client_name = f"{application_name}-id-cli-{formatted_timestamp()}" try: - vefaas_application_url, app_id, function_id = self._vefaas_service.deploy( - path=path, - name=application_name, - gateway_name=gateway_name, - gateway_service_name=gateway_service_name, - gateway_upstream_name=gateway_upstream_name, - enable_key_auth=enable_key_auth, + vefaas_application_url, app_id, function_id, affinity_binding_id = ( + self._vefaas_service.deploy( + path=path, + name=application_name, + gateway_name=gateway_name, + gateway_service_name=gateway_service_name, + gateway_upstream_name=gateway_upstream_name, + enable_key_auth=enable_key_auth, + enable_session_affinity=enable_session_affinity, + ) ) - _ = function_id # for future use + _ = function_id + if affinity_binding_id: + logger.info(f"Session affinity plugin bindng_id: {affinity_binding_id}") veapig_gateway_id, _, veapig_route_id = ( self._vefaas_service.get_application_route(app_id=app_id) @@ -393,6 +399,7 @@ def deploy( vefaas_application_name=application_name, vefaas_endpoint=vefaas_application_url, vefaas_application_id=app_id, + affinity_binding_id=affinity_binding_id, ) except Exception as e: raise ValueError( @@ -483,3 +490,8 @@ def update_function_code( raise ValueError( f"Failed to update agent project on Volcengine FaaS platform. Error: {e}" ) + + def disable_session_affinity(self, binding_id: str) -> None: + """Disable session affinity by deleting plugin binding.""" + self._vefaas_service.disable_session_affinity(binding_id) + logger.info(f"Session affinity plugin binding {binding_id} deleted.") diff --git a/veadk/cloud/cloud_app.py b/veadk/cloud/cloud_app.py index 9c7a746e..665f9edd 100644 --- a/veadk/cloud/cloud_app.py +++ b/veadk/cloud/cloud_app.py @@ -62,6 +62,7 @@ def __init__( vefaas_endpoint: str = "", vefaas_application_id: str = "", use_agent_card: bool = False, + affinity_binding_id: str | None = None, ): """Initializes the CloudApp with VeFaaS application details. @@ -92,6 +93,7 @@ def __init__( self.vefaas_application_id = vefaas_application_id self.vefaas_application_name = vefaas_application_name self.use_agent_card = use_agent_card + self.affinity_binding_id = affinity_binding_id # vefaas must be set one of three if ( diff --git a/veadk/integrations/ve_apig/ve_apig.py b/veadk/integrations/ve_apig/ve_apig.py index 5bd9e332..bf66a814 100644 --- a/veadk/integrations/ve_apig/ve_apig.py +++ b/veadk/integrations/ve_apig/ve_apig.py @@ -13,7 +13,7 @@ # limitations under the License. import time - +import json import volcenginesdkcore from volcenginesdkapig import APIGApi from volcenginesdkapig20221112 import APIG20221112Api, UpstreamListForCreateRouteInput @@ -347,3 +347,52 @@ def create( "upstream_id": upstream_id, "route_ids": route_ids, } + + def create_session_affinity_plugin(self, gateway_id: str) -> str: + """Create session affinity plugin on gateway. Returns plugin_id.""" + response = ve_request( + request_body={ + "PluginName": "wasm-session-affinity-pro", + "PluginConfig": "", + "GatewayId": gateway_id, + "Enable": True, + }, + action="CreatePlugin", + ak=self.ak, + sk=self.sk, + service="apig", + version="2022-11-12", + region=self.region, + host="open.volcengineapi.com", + ) + return response["Result"]["PluginID"] + + def bind_session_affinity_plugin(self, service_id: str) -> str: + """Bind session affinity plugin to service. Returns binding_id.""" + plugin_config = json.dumps( + { + "Position": "Header", + "DownstreamSessionKey": "x-session-id-veadk", + "UpstreamHeaders": [], + "FailureModeAllow": False, + } + ) + return self.create_plugin_binding( + scope="SERVICE", + target=service_id, + plugin_name="wasm-session-affinity-pro", + plugin_config=plugin_config, + ) + + def delete_plugin_binding(self, binding_id: str) -> None: + """Delete a plugin binding by id.""" + ve_request( + request_body={"Id": binding_id}, + action="DeletePluginBinding", + ak=self.ak, + sk=self.sk, + service="apig", + version="2021-03-03", + region=self.region, + host="open.volcengineapi.com", + ) diff --git a/veadk/integrations/ve_faas/ve_faas.py b/veadk/integrations/ve_faas/ve_faas.py index cdc03869..0cc9e686 100644 --- a/veadk/integrations/ve_faas/ve_faas.py +++ b/veadk/integrations/ve_faas/ve_faas.py @@ -444,7 +444,8 @@ def deploy( gateway_service_name: str = "", gateway_upstream_name: str = "", enable_key_auth: bool = False, - ) -> tuple[str, str, str]: + enable_session_affinity: bool = False, + ) -> tuple[str, str, str, str | None]: """Deploy an agent project to VeFaaS service. Args: @@ -456,7 +457,7 @@ def deploy( enable_key_auth (bool, optional): Enable key auth. Defaults to False. Returns: - tuple[str, str, str]: (url, app_id, function_id) + tuple[str, str, str, str | None]: (url, app_id, function_id, affinity_binding_id) """ # Naming check if "_" in name: @@ -508,7 +509,11 @@ def deploy( logger.info(f"VeFaaS application {name} with ID {app_id} deployed on {url}.") - return url, app_id, function_id + # Enable session affinity plugin + affinity_binding_id = None + if enable_session_affinity: + affinity_binding_id = self._enable_session_affinity(app_id) + return url, app_id, function_id, affinity_binding_id def _create_image_function(self, function_name: str, image: str): """Create function using container image instead of code upload.""" @@ -678,7 +683,8 @@ def deploy_image( gateway_name: str = "", gateway_service_name: str = "", gateway_upstream_name: str = "", - ) -> tuple[str, str, str]: + enable_session_affinity: bool = False, + ) -> tuple[str, str, str, str | None]: """Deploy application using container image. Args: @@ -767,7 +773,11 @@ def deploy_image( logger.info(f"VeFaaS application {name} with ID {app_id} deployed on {url}.") - return url, app_id, function_id + # Enable session affinity plugin + affinity_binding_id = None + if enable_session_affinity: + affinity_binding_id = self._enable_session_affinity(app_id) + return url, app_id, function_id, affinity_binding_id def _get_application_logs(self, app_id: str) -> list[str]: response = _ = ve_request( @@ -786,3 +796,27 @@ def _get_application_logs(self, app_id: str) -> list[str]: return logs except Exception as _: raise ValueError(f"Get application log failed. Response: {response}") + + def _enable_session_affinity(self, app_id: str) -> str | None: + """Enable session affinity for application. Returns binding_id.""" + route_info = self.get_application_route(app_id=app_id) + if not route_info: + logger.warning(f"Cannot get route info for app {app_id}, skip affinity") + return None + gateway_id, service_id, _ = route_info + # Create plugin on gateway (ignore if already exists) + try: + self.apig_client.create_session_affinity_plugin(gateway_id) + except Exception: + logger.warning( + f"Create session affinity plugin failed. Gateway ID: {gateway_id}" + ) + # Bind plugin to service + binding_id = self.apig_client.bind_session_affinity_plugin(service_id) + logger.info(f"Session affinity enabled, binding_id: {binding_id}") + return binding_id + + def disable_session_affinity(self, binding_id: str) -> None: + """Disable session affinity by deleting plugin binding.""" + self.apig_client.delete_plugin_binding(binding_id) + logger.info(f"Session affinity disabled, binding_id: {binding_id}") diff --git a/veadk/runner.py b/veadk/runner.py index 7c901978..6c708671 100644 --- a/veadk/runner.py +++ b/veadk/runner.py @@ -494,6 +494,10 @@ async def run( f"Auto create session: {session.id}, user_id: {session.user_id}, app_name: {self.app_name}" ) + import uuid + + x_session_id = f"{self.agent.name}_{user_id}_{session_id or uuid.uuid4()}" + self._inject_header_to_tools(x_session_id=x_session_id) final_output = "" for converted_message in converted_messages: try: @@ -747,3 +751,19 @@ async def save_session_to_long_term_memory( await self.long_term_memory.add_session_to_memory(session, kwargs=kwargs) logger.info(f"Add session `{session.id}` to long term memory.") + + def _inject_header_to_tools(self, x_session_id: str): + """Auto inject header to McpToolset""" + from google.adk.tools.mcp_tool import McpToolset + + x_session_id_key = "x-session-id-veadk" + for tool in self.agent.tools: + if isinstance(tool, McpToolset): + original_provider = tool._header_provider + tool._header_provider = lambda ctx, sid=x_session_id: { + **(original_provider(ctx) if original_provider else {}), + x_session_id_key: sid, + } + logger.debug( + f"mcp client inject {x_session_id_key} to McpToolset: {x_session_id}" + )