From 398c55c0fe6c42aea878215c4728f752bdee139a Mon Sep 17 00:00:00 2001 From: tangou Date: Fri, 28 Nov 2025 02:27:34 +0800 Subject: [PATCH 1/7] feat(middlewares): A2A and MCP Session Affinity --- veadk/a2a/ve_middlewares.py | 179 +++++++++++++++++++++++++++++++++++- 1 file changed, 174 insertions(+), 5 deletions(-) diff --git a/veadk/a2a/ve_middlewares.py b/veadk/a2a/ve_middlewares.py index 3da16d52..0fa5b316 100644 --- a/veadk/a2a/ve_middlewares.py +++ b/veadk/a2a/ve_middlewares.py @@ -18,15 +18,13 @@ from HTTP requests and storing them in the credential service. """ -import logging from typing import Callable, Literal, Optional - from starlette.middleware.base import BaseHTTPMiddleware from starlette.requests import Request from starlette.responses import Response from volcenginesdkcore.rest import ApiException - - +import json +from starlette.datastructures import MutableHeaders from veadk.auth.ve_credential_service import VeCredentialService from veadk.utils.auth import ( extract_delegation_chain_from_jwt, @@ -39,8 +37,9 @@ IdentityClient, get_default_identity_client, ) +from veadk.utils.logger import get_logger -logger = logging.getLogger(__name__) +logger = get_logger(__name__) class A2AAuthMiddleware(BaseHTTPMiddleware): @@ -311,3 +310,173 @@ def __init__(self, app): ) return ConfiguredA2AAuthMiddleware + + +class A2ASessionAffinityMiddleware(BaseHTTPMiddleware): + _PATCHED = False + + def __init__(self, app, upstream_header: str = "x-session-id"): + super().__init__(app) + self.upstream_header = upstream_header + if not A2ASessionAffinityMiddleware._PATCHED: + self._patch() + A2ASessionAffinityMiddleware._PATCHED = True + + def _patch(self): + from a2a.server.apps.jsonrpc.jsonrpc_app import JSONRPCApplication + from a2a.types import SendStreamingMessageRequest, TaskResubscriptionRequest + + header_name = self.upstream_header + orig_create_response = JSONRPCApplication._create_response + + async def patched_process_streaming(self_app, request_id, a2a_request, context): + """pre-extract the first event to get context_id""" + request_obj = a2a_request.root + + if isinstance(request_obj, SendStreamingMessageRequest): + gen = self_app.handler.on_message_send_stream(request_obj, context) + elif isinstance(request_obj, TaskResubscriptionRequest): + gen = self_app.handler.on_resubscribe_to_task(request_obj, context) + else: + return self_app._create_response(context, None) + + # pre-extract the first event + try: + first = await gen.__anext__() + except StopAsyncIteration: + # empty generator, return empty stream + async def empty(): + return + yield # make it a generator + + return self_app._create_response(context, empty()) + + # extract context_id + try: + ctx_id = getattr(first.root.result, "context_id", None) + if ctx_id: + context.state["_ctx"] = ctx_id + logger.info(f"[Affinity] extract context_id: {ctx_id}") + except Exception as e: + logger.error(f"[Affinity] extract context_id failed: {e}") + + # re-compose generator + async def combined(): + yield first + async for item in gen: + yield item + + return self_app._create_response(context, combined()) + + def patched_create_response(self_app, context, handler_result): + """add header""" + response = orig_create_response(self_app, context, handler_result) + + ctx_id = context.state.get("_ctx") + + # non-streaming: extract from body + if not ctx_id and hasattr(response, "body"): + try: + ctx_id = ( + json.loads(response.body).get("result", {}).get("contextId") + ) + except Exception: + pass + + if ctx_id: + response.headers[header_name] = ctx_id + logger.info(f"[Affinity] Header: {ctx_id}") + + return response + + JSONRPCApplication._process_streaming_request = patched_process_streaming + JSONRPCApplication._create_response = patched_create_response + logger.info("[Affinity] Patch 完成") + + async def dispatch(self, request, call_next): + return await call_next(request) + + +class MCPSessionAffinityMiddleware: + """MCP session affinity middleware (ASGI level)""" + + def __init__(self, app, upstream_header: str = "x-session-id"): + self.app = app + self.upstream_header = upstream_header.lower() + self.upstream_header_bytes = upstream_header.lower().encode() + + async def __call__(self, scope, receive, send): + if scope["type"] != "http": + await self.app(scope, receive, send) + return + + # check if it is a MCP path + path = scope.get("path", "") + if not path.startswith("/mcp"): + await self.app(scope, receive, send) + return + + # collect request body to extract session_id + body_parts = [] + session_id = None + + async def receive_wrapper(): + message = await receive() + if message["type"] == "http.request": + body = message.get("body", b"") + body_parts.append(body) + return message + + # read the full request body first + first_message = await receive() + if first_message["type"] == "http.request": + body_parts.append(first_message.get("body", b"")) + # if there is more data + more_body = first_message.get("more_body", False) + while more_body: + msg = await receive() + if msg["type"] == "http.request": + body_parts.append(msg.get("body", b"")) + more_body = msg.get("more_body", False) + else: + break + + # try to extract session_id from the request body + full_body = b"".join(body_parts) + try: + data = json.loads(full_body) + # MCP tools/call format: {"method": "tools/call", "params": {"arguments": {"session_id": "..."}}} + if isinstance(data, dict): + params = data.get("params", {}) + args = params.get("arguments", {}) + session_id = args.get("session_id") + if session_id: + logger.info(f"[MCP Affinity] extract session_id: {session_id}") + except Exception as e: + logger.error(f"[MCP Affinity] parse request body failed: {e}") + + # replay the request body + body_sent = False + + async def replay_receive(): + nonlocal body_sent + if not body_sent: + body_sent = True + return {"type": "http.request", "body": full_body, "more_body": False} + return await receive() + + # wrap send to inject header + header_value = session_id.encode() if session_id else None + + async def send_wrapper(message): + if message["type"] == "http.response.start" and header_value: + headers = MutableHeaders(raw=list(message.get("headers", []))) + headers[self.upstream_header] = session_id + message = { + **message, + "headers": headers.raw, + } + logger.info(f"[MCP Affinity] Header set: {session_id}") + await send(message) + + await self.app(scope, replay_receive, send_wrapper) From ba3c52befc582e73176960f6a1745175c1403d0a Mon Sep 17 00:00:00 2001 From: tangou Date: Wed, 10 Dec 2025 23:17:02 +0800 Subject: [PATCH 2/7] feat(vefaas): update session affinity --- veadk/a2a/remote_ve_agent.py | 33 +++++++ veadk/a2a/ve_middlewares.py | 179 +---------------------------------- veadk/runner.py | 17 ++++ 3 files changed, 55 insertions(+), 174 deletions(-) diff --git a/veadk/a2a/remote_ve_agent.py b/veadk/a2a/remote_ve_agent.py index 1eddedaa..96bdaf64 100644 --- a/veadk/a2a/remote_ve_agent.py +++ b/veadk/a2a/remote_ve_agent.py @@ -286,6 +286,39 @@ async def _pre_run(self, ctx: InvocationContext) -> None: # Inject auth token if credential service is available await self._inject_auth_token(ctx) + # Inject session_id to http headers + if hasattr(self, "_a2a_client") and isinstance(self._a2a_client, BaseClient): + if hasattr(self._a2a_client, "_transport") and hasattr( + self._a2a_client._transport, "httpx_client" + ): + # get session_id from InvocationContext + import uuid + + parent_name = ( + ctx.agent.parent_agent.name + if hasattr(ctx.agent, "parent_agent") and ctx.agent.parent_agent + else self.name + ) + user_id = ( + ctx.session.user_id + if hasattr(ctx, "session") + and hasattr(ctx.session, "user_id") + and ctx.session.user_id + else "unknown" + ) + session_id = ( + ctx.session.id + if hasattr(ctx, "session") + and hasattr(ctx.session, "id") + and ctx.session.id + else str(uuid.uuid4()) + ) + x_session_id = f"{parent_name}_{user_id}_{session_id}" + self._a2a_client._transport.httpx_client.headers.update( + {"x-session-id": x_session_id} + ) + logger.debug(f"Auto-injected session_id header: {x_session_id}") + async def _inject_auth_token(self, ctx: InvocationContext) -> None: """Inject authentication token from credential service into the HTTP client. diff --git a/veadk/a2a/ve_middlewares.py b/veadk/a2a/ve_middlewares.py index 0fa5b316..3da16d52 100644 --- a/veadk/a2a/ve_middlewares.py +++ b/veadk/a2a/ve_middlewares.py @@ -18,13 +18,15 @@ from HTTP requests and storing them in the credential service. """ +import logging from typing import Callable, Literal, Optional + from starlette.middleware.base import BaseHTTPMiddleware from starlette.requests import Request from starlette.responses import Response from volcenginesdkcore.rest import ApiException -import json -from starlette.datastructures import MutableHeaders + + from veadk.auth.ve_credential_service import VeCredentialService from veadk.utils.auth import ( extract_delegation_chain_from_jwt, @@ -37,9 +39,8 @@ IdentityClient, get_default_identity_client, ) -from veadk.utils.logger import get_logger -logger = get_logger(__name__) +logger = logging.getLogger(__name__) class A2AAuthMiddleware(BaseHTTPMiddleware): @@ -310,173 +311,3 @@ def __init__(self, app): ) return ConfiguredA2AAuthMiddleware - - -class A2ASessionAffinityMiddleware(BaseHTTPMiddleware): - _PATCHED = False - - def __init__(self, app, upstream_header: str = "x-session-id"): - super().__init__(app) - self.upstream_header = upstream_header - if not A2ASessionAffinityMiddleware._PATCHED: - self._patch() - A2ASessionAffinityMiddleware._PATCHED = True - - def _patch(self): - from a2a.server.apps.jsonrpc.jsonrpc_app import JSONRPCApplication - from a2a.types import SendStreamingMessageRequest, TaskResubscriptionRequest - - header_name = self.upstream_header - orig_create_response = JSONRPCApplication._create_response - - async def patched_process_streaming(self_app, request_id, a2a_request, context): - """pre-extract the first event to get context_id""" - request_obj = a2a_request.root - - if isinstance(request_obj, SendStreamingMessageRequest): - gen = self_app.handler.on_message_send_stream(request_obj, context) - elif isinstance(request_obj, TaskResubscriptionRequest): - gen = self_app.handler.on_resubscribe_to_task(request_obj, context) - else: - return self_app._create_response(context, None) - - # pre-extract the first event - try: - first = await gen.__anext__() - except StopAsyncIteration: - # empty generator, return empty stream - async def empty(): - return - yield # make it a generator - - return self_app._create_response(context, empty()) - - # extract context_id - try: - ctx_id = getattr(first.root.result, "context_id", None) - if ctx_id: - context.state["_ctx"] = ctx_id - logger.info(f"[Affinity] extract context_id: {ctx_id}") - except Exception as e: - logger.error(f"[Affinity] extract context_id failed: {e}") - - # re-compose generator - async def combined(): - yield first - async for item in gen: - yield item - - return self_app._create_response(context, combined()) - - def patched_create_response(self_app, context, handler_result): - """add header""" - response = orig_create_response(self_app, context, handler_result) - - ctx_id = context.state.get("_ctx") - - # non-streaming: extract from body - if not ctx_id and hasattr(response, "body"): - try: - ctx_id = ( - json.loads(response.body).get("result", {}).get("contextId") - ) - except Exception: - pass - - if ctx_id: - response.headers[header_name] = ctx_id - logger.info(f"[Affinity] Header: {ctx_id}") - - return response - - JSONRPCApplication._process_streaming_request = patched_process_streaming - JSONRPCApplication._create_response = patched_create_response - logger.info("[Affinity] Patch 完成") - - async def dispatch(self, request, call_next): - return await call_next(request) - - -class MCPSessionAffinityMiddleware: - """MCP session affinity middleware (ASGI level)""" - - def __init__(self, app, upstream_header: str = "x-session-id"): - self.app = app - self.upstream_header = upstream_header.lower() - self.upstream_header_bytes = upstream_header.lower().encode() - - async def __call__(self, scope, receive, send): - if scope["type"] != "http": - await self.app(scope, receive, send) - return - - # check if it is a MCP path - path = scope.get("path", "") - if not path.startswith("/mcp"): - await self.app(scope, receive, send) - return - - # collect request body to extract session_id - body_parts = [] - session_id = None - - async def receive_wrapper(): - message = await receive() - if message["type"] == "http.request": - body = message.get("body", b"") - body_parts.append(body) - return message - - # read the full request body first - first_message = await receive() - if first_message["type"] == "http.request": - body_parts.append(first_message.get("body", b"")) - # if there is more data - more_body = first_message.get("more_body", False) - while more_body: - msg = await receive() - if msg["type"] == "http.request": - body_parts.append(msg.get("body", b"")) - more_body = msg.get("more_body", False) - else: - break - - # try to extract session_id from the request body - full_body = b"".join(body_parts) - try: - data = json.loads(full_body) - # MCP tools/call format: {"method": "tools/call", "params": {"arguments": {"session_id": "..."}}} - if isinstance(data, dict): - params = data.get("params", {}) - args = params.get("arguments", {}) - session_id = args.get("session_id") - if session_id: - logger.info(f"[MCP Affinity] extract session_id: {session_id}") - except Exception as e: - logger.error(f"[MCP Affinity] parse request body failed: {e}") - - # replay the request body - body_sent = False - - async def replay_receive(): - nonlocal body_sent - if not body_sent: - body_sent = True - return {"type": "http.request", "body": full_body, "more_body": False} - return await receive() - - # wrap send to inject header - header_value = session_id.encode() if session_id else None - - async def send_wrapper(message): - if message["type"] == "http.response.start" and header_value: - headers = MutableHeaders(raw=list(message.get("headers", []))) - headers[self.upstream_header] = session_id - message = { - **message, - "headers": headers.raw, - } - logger.info(f"[MCP Affinity] Header set: {session_id}") - await send(message) - - await self.app(scope, replay_receive, send_wrapper) diff --git a/veadk/runner.py b/veadk/runner.py index fff6bfdc..dfaf9f66 100644 --- a/veadk/runner.py +++ b/veadk/runner.py @@ -481,6 +481,10 @@ async def run( f"Auto create session: {session.id}, user_id: {session.user_id}, app_name: {self.app_name}" ) + import uuid + + x_session_id = f"{self.agent.name}_{user_id}_{session_id or uuid.uuid4()}" + self._inject_header_to_tools(x_session_id=x_session_id) final_output = "" for converted_message in converted_messages: try: @@ -732,3 +736,16 @@ async def save_session_to_long_term_memory( await self.long_term_memory.add_session_to_memory(session, kwargs=kwargs) logger.info(f"Add session `{session.id}` to long term memory.") + + def _inject_header_to_tools(self, x_session_id: str): + """Auto inject header to McpToolset""" + from google.adk.tools.mcp_tool import McpToolset + + for tool in self.agent.tools: + if isinstance(tool, McpToolset): + original_provider = tool._header_provider + tool._header_provider = lambda ctx, sid=x_session_id: { + **(original_provider(ctx) if original_provider else {}), + "x-session-id": sid, + } + logger.debug(f"Injected session context to McpToolset: {x_session_id}") From 0db566815820da31289d40a8ff6f4789fd09296c Mon Sep 17 00:00:00 2001 From: tangou Date: Mon, 12 Jan 2026 11:09:30 +0800 Subject: [PATCH 3/7] fix(vefaas): x-session-id to x-session-id-veadk --- veadk/a2a/remote_ve_agent.py | 2 +- veadk/runner.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/veadk/a2a/remote_ve_agent.py b/veadk/a2a/remote_ve_agent.py index 96bdaf64..a1de1587 100644 --- a/veadk/a2a/remote_ve_agent.py +++ b/veadk/a2a/remote_ve_agent.py @@ -315,7 +315,7 @@ async def _pre_run(self, ctx: InvocationContext) -> None: ) x_session_id = f"{parent_name}_{user_id}_{session_id}" self._a2a_client._transport.httpx_client.headers.update( - {"x-session-id": x_session_id} + {"x-session-id-veadk": x_session_id} ) logger.debug(f"Auto-injected session_id header: {x_session_id}") diff --git a/veadk/runner.py b/veadk/runner.py index 6dd2de54..ae771aaf 100644 --- a/veadk/runner.py +++ b/veadk/runner.py @@ -756,6 +756,6 @@ def _inject_header_to_tools(self, x_session_id: str): original_provider = tool._header_provider tool._header_provider = lambda ctx, sid=x_session_id: { **(original_provider(ctx) if original_provider else {}), - "x-session-id": sid, + "x-session-id-veadk": sid, } logger.debug(f"Injected session context to McpToolset: {x_session_id}") From 189dceec55557e44a26a1376131358fb4f39bb58 Mon Sep 17 00:00:00 2001 From: tangou Date: Mon, 12 Jan 2026 12:05:58 +0800 Subject: [PATCH 4/7] feat(vefaas): update runner --- veadk/runner.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/veadk/runner.py b/veadk/runner.py index ae771aaf..89216ce6 100644 --- a/veadk/runner.py +++ b/veadk/runner.py @@ -751,11 +751,14 @@ def _inject_header_to_tools(self, x_session_id: str): """Auto inject header to McpToolset""" from google.adk.tools.mcp_tool import McpToolset + x_session_id_key = "x-session-id-veadk" for tool in self.agent.tools: if isinstance(tool, McpToolset): original_provider = tool._header_provider tool._header_provider = lambda ctx, sid=x_session_id: { **(original_provider(ctx) if original_provider else {}), - "x-session-id-veadk": sid, + x_session_id_key: sid, } - logger.debug(f"Injected session context to McpToolset: {x_session_id}") + logger.debug( + f"mcp client inject {x_session_id_key} to McpToolset: {x_session_id}" + ) From 07427dd7da89b35660d50478c5f066c38019e8b7 Mon Sep 17 00:00:00 2001 From: tangou Date: Mon, 12 Jan 2026 23:57:35 +0800 Subject: [PATCH 5/7] feat(vefaas): update remoteveagent --- veadk/a2a/remote_ve_agent.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/veadk/a2a/remote_ve_agent.py b/veadk/a2a/remote_ve_agent.py index a1de1587..296ba717 100644 --- a/veadk/a2a/remote_ve_agent.py +++ b/veadk/a2a/remote_ve_agent.py @@ -314,10 +314,13 @@ async def _pre_run(self, ctx: InvocationContext) -> None: else str(uuid.uuid4()) ) x_session_id = f"{parent_name}_{user_id}_{session_id}" + x_session_id_key = "x-session-id-veadk" self._a2a_client._transport.httpx_client.headers.update( - {"x-session-id-veadk": x_session_id} + {x_session_id_key: x_session_id} + ) + logger.debug( + f"a2a client inject {x_session_id_key} to header: {x_session_id}" ) - logger.debug(f"Auto-injected session_id header: {x_session_id}") async def _inject_auth_token(self, ctx: InvocationContext) -> None: """Inject authentication token from credential service into the HTTP client. From 43637430f56aae133b7fc3b7faa43f7f557a0076 Mon Sep 17 00:00:00 2001 From: tangou Date: Tue, 13 Jan 2026 01:14:46 +0800 Subject: [PATCH 6/7] feat(vefaas): update enable affinity plug --- veadk/cloud/cloud_agent_engine.py | 28 ++++++++++----- veadk/cloud/cloud_app.py | 2 ++ veadk/integrations/ve_apig/ve_apig.py | 51 ++++++++++++++++++++++++++- veadk/integrations/ve_faas/ve_faas.py | 44 ++++++++++++++++++++--- 4 files changed, 111 insertions(+), 14 deletions(-) diff --git a/veadk/cloud/cloud_agent_engine.py b/veadk/cloud/cloud_agent_engine.py index 3d884858..3ccb3a8b 100644 --- a/veadk/cloud/cloud_agent_engine.py +++ b/veadk/cloud/cloud_agent_engine.py @@ -221,6 +221,7 @@ def deploy( identity_user_pool_name: str = "", identity_client_name: str = "", local_test: bool = False, + enable_session_affinity: bool = False, ) -> CloudApp: """Deploys a local agent project to Volcengine FaaS, creating necessary resources. @@ -287,15 +288,20 @@ def deploy( identity_client_name = f"{application_name}-id-cli-{formatted_timestamp()}" try: - vefaas_application_url, app_id, function_id = self._vefaas_service.deploy( - path=path, - name=application_name, - gateway_name=gateway_name, - gateway_service_name=gateway_service_name, - gateway_upstream_name=gateway_upstream_name, - enable_key_auth=enable_key_auth, + vefaas_application_url, app_id, function_id, affinity_binding_id = ( + self._vefaas_service.deploy( + path=path, + name=application_name, + gateway_name=gateway_name, + gateway_service_name=gateway_service_name, + gateway_upstream_name=gateway_upstream_name, + enable_key_auth=enable_key_auth, + enable_session_affinity=enable_session_affinity, + ) ) - _ = function_id # for future use + _ = function_id + if affinity_binding_id: + logger.info(f"Session affinity plugin bindng_id: {affinity_binding_id}") veapig_gateway_id, _, veapig_route_id = ( self._vefaas_service.get_application_route(app_id=app_id) @@ -393,6 +399,7 @@ def deploy( vefaas_application_name=application_name, vefaas_endpoint=vefaas_application_url, vefaas_application_id=app_id, + affinity_binding_id=affinity_binding_id, ) except Exception as e: raise ValueError( @@ -483,3 +490,8 @@ def update_function_code( raise ValueError( f"Failed to update agent project on Volcengine FaaS platform. Error: {e}" ) + + def disable_session_affinity(self, binding_id: str) -> None: + """Disable session affinity by deleting plugin binding.""" + self._vefaas_service.disable_session_affinity(binding_id) + logger.info(f"Session affinity plugin binding {binding_id} deleted.") diff --git a/veadk/cloud/cloud_app.py b/veadk/cloud/cloud_app.py index 9c7a746e..665f9edd 100644 --- a/veadk/cloud/cloud_app.py +++ b/veadk/cloud/cloud_app.py @@ -62,6 +62,7 @@ def __init__( vefaas_endpoint: str = "", vefaas_application_id: str = "", use_agent_card: bool = False, + affinity_binding_id: str | None = None, ): """Initializes the CloudApp with VeFaaS application details. @@ -92,6 +93,7 @@ def __init__( self.vefaas_application_id = vefaas_application_id self.vefaas_application_name = vefaas_application_name self.use_agent_card = use_agent_card + self.affinity_binding_id = affinity_binding_id # vefaas must be set one of three if ( diff --git a/veadk/integrations/ve_apig/ve_apig.py b/veadk/integrations/ve_apig/ve_apig.py index 5bd9e332..bf66a814 100644 --- a/veadk/integrations/ve_apig/ve_apig.py +++ b/veadk/integrations/ve_apig/ve_apig.py @@ -13,7 +13,7 @@ # limitations under the License. import time - +import json import volcenginesdkcore from volcenginesdkapig import APIGApi from volcenginesdkapig20221112 import APIG20221112Api, UpstreamListForCreateRouteInput @@ -347,3 +347,52 @@ def create( "upstream_id": upstream_id, "route_ids": route_ids, } + + def create_session_affinity_plugin(self, gateway_id: str) -> str: + """Create session affinity plugin on gateway. Returns plugin_id.""" + response = ve_request( + request_body={ + "PluginName": "wasm-session-affinity-pro", + "PluginConfig": "", + "GatewayId": gateway_id, + "Enable": True, + }, + action="CreatePlugin", + ak=self.ak, + sk=self.sk, + service="apig", + version="2022-11-12", + region=self.region, + host="open.volcengineapi.com", + ) + return response["Result"]["PluginID"] + + def bind_session_affinity_plugin(self, service_id: str) -> str: + """Bind session affinity plugin to service. Returns binding_id.""" + plugin_config = json.dumps( + { + "Position": "Header", + "DownstreamSessionKey": "x-session-id-veadk", + "UpstreamHeaders": [], + "FailureModeAllow": False, + } + ) + return self.create_plugin_binding( + scope="SERVICE", + target=service_id, + plugin_name="wasm-session-affinity-pro", + plugin_config=plugin_config, + ) + + def delete_plugin_binding(self, binding_id: str) -> None: + """Delete a plugin binding by id.""" + ve_request( + request_body={"Id": binding_id}, + action="DeletePluginBinding", + ak=self.ak, + sk=self.sk, + service="apig", + version="2021-03-03", + region=self.region, + host="open.volcengineapi.com", + ) diff --git a/veadk/integrations/ve_faas/ve_faas.py b/veadk/integrations/ve_faas/ve_faas.py index cdc03869..0cc9e686 100644 --- a/veadk/integrations/ve_faas/ve_faas.py +++ b/veadk/integrations/ve_faas/ve_faas.py @@ -444,7 +444,8 @@ def deploy( gateway_service_name: str = "", gateway_upstream_name: str = "", enable_key_auth: bool = False, - ) -> tuple[str, str, str]: + enable_session_affinity: bool = False, + ) -> tuple[str, str, str, str | None]: """Deploy an agent project to VeFaaS service. Args: @@ -456,7 +457,7 @@ def deploy( enable_key_auth (bool, optional): Enable key auth. Defaults to False. Returns: - tuple[str, str, str]: (url, app_id, function_id) + tuple[str, str, str, str | None]: (url, app_id, function_id, affinity_binding_id) """ # Naming check if "_" in name: @@ -508,7 +509,11 @@ def deploy( logger.info(f"VeFaaS application {name} with ID {app_id} deployed on {url}.") - return url, app_id, function_id + # Enable session affinity plugin + affinity_binding_id = None + if enable_session_affinity: + affinity_binding_id = self._enable_session_affinity(app_id) + return url, app_id, function_id, affinity_binding_id def _create_image_function(self, function_name: str, image: str): """Create function using container image instead of code upload.""" @@ -678,7 +683,8 @@ def deploy_image( gateway_name: str = "", gateway_service_name: str = "", gateway_upstream_name: str = "", - ) -> tuple[str, str, str]: + enable_session_affinity: bool = False, + ) -> tuple[str, str, str, str | None]: """Deploy application using container image. Args: @@ -767,7 +773,11 @@ def deploy_image( logger.info(f"VeFaaS application {name} with ID {app_id} deployed on {url}.") - return url, app_id, function_id + # Enable session affinity plugin + affinity_binding_id = None + if enable_session_affinity: + affinity_binding_id = self._enable_session_affinity(app_id) + return url, app_id, function_id, affinity_binding_id def _get_application_logs(self, app_id: str) -> list[str]: response = _ = ve_request( @@ -786,3 +796,27 @@ def _get_application_logs(self, app_id: str) -> list[str]: return logs except Exception as _: raise ValueError(f"Get application log failed. Response: {response}") + + def _enable_session_affinity(self, app_id: str) -> str | None: + """Enable session affinity for application. Returns binding_id.""" + route_info = self.get_application_route(app_id=app_id) + if not route_info: + logger.warning(f"Cannot get route info for app {app_id}, skip affinity") + return None + gateway_id, service_id, _ = route_info + # Create plugin on gateway (ignore if already exists) + try: + self.apig_client.create_session_affinity_plugin(gateway_id) + except Exception: + logger.warning( + f"Create session affinity plugin failed. Gateway ID: {gateway_id}" + ) + # Bind plugin to service + binding_id = self.apig_client.bind_session_affinity_plugin(service_id) + logger.info(f"Session affinity enabled, binding_id: {binding_id}") + return binding_id + + def disable_session_affinity(self, binding_id: str) -> None: + """Disable session affinity by deleting plugin binding.""" + self.apig_client.delete_plugin_binding(binding_id) + logger.info(f"Session affinity disabled, binding_id: {binding_id}") From 22574bf2d3605d616ec5f1865a1109860e18d6ad Mon Sep 17 00:00:00 2001 From: tangou Date: Tue, 13 Jan 2026 09:16:14 +0800 Subject: [PATCH 7/7] feat(vefaas): fix tests --- tests/test_cloud.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_cloud.py b/tests/test_cloud.py index 0f82c195..c14ac087 100644 --- a/tests/test_cloud.py +++ b/tests/test_cloud.py @@ -48,6 +48,7 @@ async def test_cloud(): test_endpoint, "app-123", "func-456", + None, ) # Mock update operation