Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 18 additions & 158 deletions veadk/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,12 @@

import uuid

from google.adk.agents import LlmAgent, RunConfig
from google.adk.agents import LlmAgent
from google.adk.agents.base_agent import BaseAgent
from google.adk.agents.context_cache_config import ContextCacheConfig
from google.adk.agents.llm_agent import InstructionProvider, ToolUnion
from google.adk.agents.run_config import StreamingMode
from google.adk.examples.base_example_provider import BaseExampleProvider
from google.adk.models.lite_llm import LiteLlm
from google.adk.runners import Runner
from google.genai import types
from pydantic import ConfigDict, Field
from typing_extensions import Any

Expand All @@ -42,7 +40,6 @@
DEFAULT_AGENT_NAME,
DEFAULT_MODEL_EXTRA_CONFIG,
)
from veadk.evaluation import EvalSetRecorder
from veadk.knowledgebase import KnowledgeBase
from veadk.memory.long_term_memory import LongTermMemory
from veadk.memory.short_term_memory import ShortTermMemory
Expand Down Expand Up @@ -87,6 +84,8 @@ class Agent(LlmAgent):
tracers (list[BaseTracer]): List of tracers used for telemetry and monitoring.
enable_authz (bool): Whether to enable agent authorization checks.
auto_save_session (bool): Whether to automatically save sessions to long-term memory.
skills (list[str]): List of skills that equip the agent with specific capabilities.
example_store (Optional[BaseExampleProvider]): Example store for providing example Q/A.
"""

model_config = ConfigDict(arbitrary_types_allowed=True, extra="allow")
Expand Down Expand Up @@ -147,6 +146,8 @@ class Agent(LlmAgent):

skills: list[str] = Field(default_factory=list)

example_store: Optional[BaseExampleProvider] = None

def model_post_init(self, __context: Any) -> None:
super().model_post_init(None) # for sub_agents init

Expand Down Expand Up @@ -282,6 +283,11 @@ def model_post_init(self, __context: Any) -> None:
if self.skills:
self.load_skills()

if self.example_store:
from google.adk.tools.example_tool import ExampleTool

self.tools.append(ExampleTool(examples=self.example_store))

logger.info(f"VeADK version: {VERSION}")

logger.info(f"{self.__class__.__name__} `{self.name}` init done.")
Expand All @@ -297,18 +303,19 @@ def update_model(self, model_name: str):

def load_skills(self):
from pathlib import Path

from veadk.skills.skill import Skill
from veadk.skills.utils import (
load_skills_from_directory,
load_skills_from_cloud,
load_skills_from_directory,
)
from veadk.tools.builtin_tools.playwright import playwright_tools
from veadk.tools.skills_tools import (
SkillsTool,
bash_tool,
edit_file_tool,
read_file_tool,
write_file_tool,
edit_file_tool,
bash_tool,
)

skills: Dict[str, Skill] = {}
Expand Down Expand Up @@ -338,61 +345,6 @@ def load_skills(self):
self.tools.append(bash_tool)
self.tools.append(playwright_tools)

async def _run(
self,
runner,
user_id: str,
session_id: str,
message: types.Content,
stream: bool,
run_processor: Optional[BaseRunProcessor] = None,
):
"""Internal run method with run processor support.

Args:
runner: The Runner instance.
user_id: User ID for the session.
session_id: Session ID.
message: The message to send.
stream: Whether to stream the output.
run_processor: Optional run processor to use. If not provided, uses self.run_processor.

Returns:
The final output string.
"""
stream_mode = StreamingMode.SSE if stream else StreamingMode.NONE

# Use provided run_processor or fall back to instance's run_processor
processor = run_processor or self.run_processor

@processor.process_run(runner=runner, message=message)
async def event_generator():
async for event in runner.run_async(
user_id=user_id,
session_id=session_id,
new_message=message,
run_config=RunConfig(streaming_mode=stream_mode),
):
if event.get_function_calls():
for function_call in event.get_function_calls():
logger.debug(f"Function call: {function_call}")
elif (
event.content is not None
and event.content.parts[0].text is not None
and len(event.content.parts[0].text.strip()) > 0
):
yield event.content.parts[0].text

final_output = ""
async for chunk in event_generator():
if stream:
print(chunk, end="", flush=True)
final_output += chunk
if stream:
print() # end with a new line

return final_output

def _prepare_tracers(self):
enable_apmplus_tracer = os.getenv("ENABLE_APMPLUS", "false").lower() == "true"
enable_cozeloop_tracer = os.getenv("ENABLE_COZELOOP", "false").lower() == "true"
Expand Down Expand Up @@ -439,99 +391,7 @@ def _prepare_tracers(self):
f"Opentelemetry Tracer init {len(self.tracers[0].exporters)} exporters" # type: ignore
)

async def run(
self,
prompt: str | list[str],
stream: bool = False,
app_name: str = "veadk_app",
user_id: str = "veadk_user",
session_id="veadk_session",
load_history_sessions_from_db: bool = False,
db_url: str = "",
collect_runtime_data: bool = False,
eval_set_id: str = "",
save_session_to_memory: bool = False,
run_processor: Optional[BaseRunProcessor] = None,
):
"""Running the agent. The runner and session service will be created automatically.

For production, consider using Google-ADK runner to run agent, rather than invoking this method.

Args:
prompt (str | list[str]): The prompt to run the agent.
stream (bool, optional): Whether to stream the output. Defaults to False.
app_name (str, optional): The name of the application. Defaults to "veadk_app".
user_id (str, optional): The id of the user. Defaults to "veadk_user".
session_id (str, optional): The id of the session. Defaults to "veadk_session".
load_history_sessions_from_db (bool, optional): Whether to load history sessions from database. Defaults to False.
db_url (str, optional): The url of the database. Defaults to "".
collect_runtime_data (bool, optional): Whether to collect runtime data. Defaults to False.
eval_set_id (str, optional): The id of the eval set. Defaults to "".
save_session_to_memory (bool, optional): Whether to save this turn session to memory. Defaults to False.
run_processor (Optional[BaseRunProcessor], optional): Optional run processor to use for this run.
If not provided, uses the agent's default run_processor. Defaults to None.
"""

logger.warning(
"Running agent in this function is only for development and testing, do not use this function in production. For production, consider using `Google ADK Runner` to run agent, rather than invoking this method."
)
logger.info(
f"Run agent {self.name}: app_name: {app_name}, user_id: {user_id}, session_id: {session_id}."
)
prompt = [prompt] if isinstance(prompt, str) else prompt

# memory service
short_term_memory = ShortTermMemory(
backend="database" if load_history_sessions_from_db else "local",
db_url=db_url,
async def run(self, **kwargs):
raise NotImplementedError(
"Run method in VeADK agent is deprecated since version 0.5.6. Please use runner.run_async instead. Ref: https://agentkit.gitbook.io/docs/runner/overview"
)
session_service = short_term_memory.session_service
await short_term_memory.create_session(
app_name=app_name, user_id=user_id, session_id=session_id
)

# runner
runner = Runner(
agent=self,
app_name=app_name,
session_service=session_service,
memory_service=self.long_term_memory,
)

logger.info(f"Begin to process prompt {prompt}")
# run
final_output = ""
for _prompt in prompt:
message = types.Content(role="user", parts=[types.Part(text=_prompt)])
final_output = await self._run(
runner, user_id, session_id, message, stream, run_processor
)

# VeADK features
if save_session_to_memory:
assert self.long_term_memory is not None, (
"Long-term memory is not initialized in agent"
)
session = await session_service.get_session(
app_name=app_name,
user_id=user_id,
session_id=session_id,
)
if session:
await self.long_term_memory.add_session_to_memory(session)
logger.info(f"Add session `{session.id}` to your long-term memory.")
else:
logger.error(
f"Session {session_id} not found in session service, cannot save to long-term memory."
)

if collect_runtime_data:
eval_set_recorder = EvalSetRecorder(session_service, eval_set_id)
dump_path = await eval_set_recorder.dump(app_name, user_id, session_id)
self._dump_path = dump_path # just for test/debug/instrumentation

if self.tracers:
for tracer in self.tracers:
tracer.dump(user_id=user_id, session_id=session_id)

return final_output
108 changes: 108 additions & 0 deletions veadk/examples/in_memory_example_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from google.adk.examples.base_example_provider import BaseExampleProvider
from google.adk.examples.example import Example as ADKExample
from google.genai.types import Content, FunctionCall, Part
from typing_extensions import override

from veadk.examples.types import Example


class InMemoryExampleStore(BaseExampleProvider):
def __init__(
self,
name: str = "in_memory_example_store",
examples: list[Example | ADKExample] | None = None,
):
self.name = name
if examples:
self.examples: list[ADKExample] = self.convert_examples_to_adk_examples(
examples
)
else:
self.examples: list[ADKExample] = []

def add_example(self, example: Example | ADKExample):
"""Add an example to the provider.

Args:
example: A VeADK example or ADK example.
"""
self.examples.append(self.convert_examples_to_adk_examples([example])[0])

def convert_examples_to_adk_examples(
self,
examples: list[Example | ADKExample],
) -> list[ADKExample]:
"""Convert VeADK example to ADK example.

Args:
examples: A list of VeADK example or ADK example.

Returns:
A list of ADK example.
"""
adk_examples = []
for example in examples:
if isinstance(example, ADKExample):
adk_examples.append(example)
else:
output_string_content = (
Content(parts=[Part(text=example.expected_output)], role="model")
if example.expected_output
else None
)
output_fc_content = (
Content(
parts=[
Part(
function_call=FunctionCall(
name=example.expected_function_call.function_name,
args=example.expected_function_call.arguments,
)
)
],
role="model",
)
if example.expected_function_call
else None
)

output = []
if output_string_content:
output.append(output_string_content)
if output_fc_content:
output.append(output_fc_content)

adk_examples.append(
ADKExample(
input=Content(parts=[Part(text=example.input)], role="user"),
output=output,
)
)
return adk_examples

@override
def get_examples(self, query: str) -> list[ADKExample]:
"""Simply return all examples.

Args:
query: The query to get examples for.

Returns:
A list of Example objects.
"""
return self.examples
28 changes: 28 additions & 0 deletions veadk/examples/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any

from pydantic import BaseModel, Field


class ExampleFunctionCall(BaseModel):
function_name: str
arguments: dict[str, Any] = Field(default_factory=dict)


class Example(BaseModel):
input: str
expected_output: str | None = None
expected_function_call: ExampleFunctionCall | None = None