diff --git a/pyproject.toml b/pyproject.toml index 33f32109..ca0fc79d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,7 +3,7 @@ name = "humanloop" [tool.poetry] name = "humanloop" -version = "0.8.40b5" +version = "0.8.40b6" description = "" readme = "README.md" authors = [] diff --git a/pytest.ini b/pytest.ini index 8ab80e5d..fc8cea7a 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,2 +1,3 @@ [pytest] addopts = -n auto +asyncio_mode = auto \ No newline at end of file diff --git a/src/humanloop/client.py b/src/humanloop/client.py index 805b15d9..ca3054d2 100644 --- a/src/humanloop/client.py +++ b/src/humanloop/client.py @@ -10,8 +10,10 @@ from humanloop.base_client import AsyncBaseHumanloop, BaseHumanloop from humanloop.core.client_wrapper import SyncClientWrapper -from humanloop.decorators.flow import flow as flow_decorator_factory -from humanloop.decorators.prompt import prompt_decorator_factory +from humanloop.decorators.flow import a_flow_decorator_factory as a_flow_decorator_factory +from humanloop.decorators.flow import flow_decorator_factory as flow_decorator_factory +from humanloop.decorators.prompt import a_prompt_decorator_factory, prompt_decorator_factory +from humanloop.decorators.tool import a_tool_decorator_factory as a_tool_decorator_factory from humanloop.decorators.tool import tool_decorator_factory as tool_decorator_factory from humanloop.environment import HumanloopEnvironment from humanloop.evals import run_eval @@ -273,6 +275,50 @@ def call_llm(messages): """ return prompt_decorator_factory(path=path) + def a_prompt( + self, + *, + path: str, + ): + """Auto-instrument LLM providers and create [Prompt](https://humanloop.com/docs/explanation/prompts) + Logs on Humanloop from them, for async functions. + + ```python + @a_prompt(path="My Async Prompt") + async def call_llm_async(messages): + client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) + response = await client.chat.completions.create( + model="gpt-4o", + temperature=0.8, + frequency_penalty=0.5, + max_tokens=200, + messages=messages, + ) + return response.choices[0].message.content + + Calling the function above creates a new Log on Humanloop + against this Prompt version: + { + provider: "openai", + model: "gpt-4o", + endpoint: "chat", + max_tokens: 200, + temperature: 0.8, + frequency_penalty: 0.5, + } + ``` + + If a different model, endpoint, or hyperparameter is used, a new + Prompt version is created. + + :param path: The path where the Prompt is created. If not + provided, the function name is used as the path and the File + is created in the root of your Humanloop organization workspace. + + :param prompt_kernel: Attributes that define the Prompt. See `class:DecoratorPromptKernelRequestParams` + """ + return a_prompt_decorator_factory(path=path) + def tool( self, *, @@ -331,6 +377,64 @@ def calculator(a: int, b: Optional[int]) -> int: setup_values=setup_values, ) + def a_tool( + self, + *, + path: str, + attributes: Optional[dict[str, Any]] = None, + setup_values: Optional[dict[str, Any]] = None, + ): + """Manage async [Tool](https://humanloop.com/docs/explanation/tools) Files through code. + + The decorator inspects the wrapped async function's source code to infer the Tool's + JSON Schema. If the function declaration changes, a new Tool version + is upserted with an updated JSON Schema. 
+ + For example: + + ```python + # Adding @a_tool on this function + @humanloop_client.a_tool(path="async_calculator") + async def async_calculator(a: int, b: Optional[int]) -> int: + \"\"\"Add two numbers together asynchronously.\"\"\" + return a + b + + # Creates a Tool with this JSON Schema: + { + strict: True, + function: { + "name": "async_calculator", + "description": "Add two numbers together asynchronously.", + "parameters": { + type: "object", + properties: { + a: {type: "integer"}, + b: {type: "integer"} + }, + required: ["a"], + }, + } + } + ``` + + The return value of the decorated function must be JSON serializable. + + If the function raises an exception, the created Log will have `output` + set to null, and the `error` field populated. + + :param path: The path of the File in the Humanloop workspace. + + :param setup_values: Values needed to setup the Tool, defined in [JSON Schema](https://json-schema.org/) + + :param attributes: Additional fields to describe the Tool. Helpful to separate Tool versions from each other with details on how they were created or used. + """ + return a_tool_decorator_factory( + opentelemetry_tracer=self._opentelemetry_tracer, + path=path, + attributes=attributes, + setup_values=setup_values, + ) + def flow( self, *, @@ -394,6 +498,70 @@ def agent(): attributes=attributes, ) + def a_flow( + self, + *, + path: str, + attributes: Optional[dict[str, Any]] = None, + ): + """Trace SDK logging calls through [Flows](https://humanloop.com/docs/explanation/flows) for async functions. + + Use it as the entrypoint of your async LLM feature. Logging calls like `prompts.call(...)`, + `tools.call(...)`, or other Humanloop decorators will be automatically added to the trace. + + For example: + + ```python + @a_prompt(template="You are an assistant on the following topics: {{topics}}.") + async def call_llm_async(messages): + client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) + response = await client.chat.completions.create( + model="gpt-4o", + temperature=0.8, + frequency_penalty=0.5, + max_tokens=200, + messages=messages, + ) + return response.choices[0].message.content + + @a_flow(attributes={"version": "v1"}) + async def async_agent(): + while True: + messages = [] + user_input = input("You: ") + if user_input == "exit": + break + messages.append({"role": "user", "content": user_input}) + response = await call_llm_async(messages) + messages.append({"role": "assistant", "content": response}) + print(f"Assistant: {response}") + ``` + + Each call to async_agent will create a trace corresponding to the conversation + session. Multiple Prompt Logs will be created as the LLM is called. They + will be added to the trace, allowing you to see the whole conversation + in the UI. + + If the function returns a ChatMessage-like object, the Log will + populate the `output_message` field. Otherwise, it will serialize + the return value and populate the `output` field. + + If an exception is raised, the output fields will be set to None + and the error message will be set in the Log's `error` field. + + :param path: The path to the Flow. If not provided, the function name + will be used as the path and the File will be created in the root + of your organization workspace. + + :param attributes: Additional fields to describe the Flow. Helpful to separate Flow versions from each other with details on how they were created or used. 
+ """ + return a_flow_decorator_factory( + client=self, + opentelemetry_tracer=self._opentelemetry_tracer, + path=path, + attributes=attributes, + ) + def pull(self, path: Optional[str] = None, environment: Optional[str] = None) -> Tuple[List[str], List[str]]: """Pull Prompt and Agent files from Humanloop to local filesystem. diff --git a/src/humanloop/context.py b/src/humanloop/context.py index 70fd70d7..55735dd4 100644 --- a/src/humanloop/context.py +++ b/src/humanloop/context.py @@ -1,13 +1,14 @@ +import threading from contextlib import contextmanager from dataclasses import dataclass -import threading from typing import Any, Callable, Generator, Literal, Optional + from opentelemetry import context as context_api from humanloop.error import HumanloopRuntimeError from humanloop.otel.constants import ( - HUMANLOOP_CONTEXT_EVALUATION, HUMANLOOP_CONTEXT_DECORATOR, + HUMANLOOP_CONTEXT_EVALUATION, HUMANLOOP_CONTEXT_TRACE_ID, ) diff --git a/src/humanloop/core/client_wrapper.py b/src/humanloop/core/client_wrapper.py index 5edb2911..8d287d51 100644 --- a/src/humanloop/core/client_wrapper.py +++ b/src/humanloop/core/client_wrapper.py @@ -14,10 +14,10 @@ def __init__(self, *, api_key: str, base_url: str, timeout: typing.Optional[floa def get_headers(self) -> typing.Dict[str, str]: headers: typing.Dict[str, str] = { - "User-Agent": "humanloop/0.8.40b5", + "User-Agent": "humanloop/0.8.40b6", "X-Fern-Language": "Python", "X-Fern-SDK-Name": "humanloop", - "X-Fern-SDK-Version": "0.8.40b5", + "X-Fern-SDK-Version": "0.8.40b6", } headers["X-API-KEY"] = self.api_key return headers diff --git a/src/humanloop/decorators/flow.py b/src/humanloop/decorators/flow.py index f35a699c..4773c1fa 100644 --- a/src/humanloop/decorators/flow.py +++ b/src/humanloop/decorators/flow.py @@ -1,9 +1,10 @@ import logging +import typing from functools import wraps -from typing import Any, Callable, Optional, TypeVar -from typing_extensions import ParamSpec +from typing import Any, Awaitable, Callable, Literal, Optional, TypeVar, Union, overload from opentelemetry.trace import Span, Tracer +from typing_extensions import ParamSpec from humanloop.base_client import BaseHumanloop from humanloop.context import ( @@ -12,18 +13,18 @@ set_decorator_context, set_trace_id, ) -from humanloop.evals.run import HumanloopRuntimeError -from humanloop.types.chat_message import ChatMessage from humanloop.decorators.helpers import bind_args +from humanloop.evals.run import HumanloopRuntimeError from humanloop.evals.types import FileEvalConfig from humanloop.otel.constants import ( - HUMANLOOP_FILE_TYPE_KEY, - HUMANLOOP_LOG_KEY, HUMANLOOP_FILE_PATH_KEY, + HUMANLOOP_FILE_TYPE_KEY, HUMANLOOP_FLOW_SPAN_NAME, + HUMANLOOP_LOG_KEY, ) from humanloop.otel.helpers import process_output, write_to_opentelemetry_span from humanloop.requests import FlowKernelRequestParams as FlowDict +from humanloop.types.chat_message import ChatMessage from humanloop.types.flow_log_response import FlowLogResponse logger = logging.getLogger("humanloop.sdk") @@ -33,101 +34,271 @@ R = TypeVar("R") -def flow( +def flow_decorator_factory( client: "BaseHumanloop", opentelemetry_tracer: Tracer, path: str, attributes: Optional[dict[str, Any]] = None, ): - flow_kernel = {"attributes": attributes or {}} - def decorator(func: Callable[P, R]) -> Callable[P, Optional[R]]: decorator_path = path or func.__name__ file_type = "flow" + flow_kernel = {"attributes": attributes or {}} + + wrapper = _wrapper_factory( + client=client, + opentelemetry_tracer=opentelemetry_tracer, + func=func, + 
path=path, + flow_kernel=flow_kernel, + is_awaitable=False, + ) + + wrapper.file = FileEvalConfig( # type: ignore + path=decorator_path, + type=file_type, # type: ignore [arg-type, typeddict-item] + version=FlowDict(**flow_kernel), # type: ignore + callable=wrapper, + ) + + return wrapper + + return decorator + + +def a_flow_decorator_factory( + client: "BaseHumanloop", + opentelemetry_tracer: Tracer, + path: str, + attributes: Optional[dict[str, Any]] = None, +): + def decorator(func: Callable[P, Awaitable[R]]): + decorator_path = path or func.__name__ + file_type = "flow" + flow_kernel = {"attributes": attributes or {}} + + wrapper = _wrapper_factory( + client=client, + opentelemetry_tracer=opentelemetry_tracer, + func=func, + path=path, + flow_kernel=flow_kernel, + is_awaitable=True, + ) + + wrapper.file = FileEvalConfig( # type: ignore + path=decorator_path, + type=file_type, # type: ignore [arg-type, typeddict-item] + version=FlowDict(**flow_kernel), # type: ignore + callable=wrapper, + ) + + return wrapper + + return decorator + + +@overload +def _wrapper_factory( + client: "BaseHumanloop", + opentelemetry_tracer: Tracer, + func: Callable[P, Awaitable[R]], + path: str, + flow_kernel: dict[str, Any], + is_awaitable: Literal[True], +) -> Callable[P, Awaitable[Optional[R]]]: ... + + +@overload +def _wrapper_factory( + client: "BaseHumanloop", + opentelemetry_tracer: Tracer, + func: Callable[P, R], + path: str, + flow_kernel: dict[str, Any], + is_awaitable: Literal[False], +) -> Callable[P, Optional[R]]: ... + + +def _wrapper_factory( # type: ignore [misc] + client: "BaseHumanloop", + opentelemetry_tracer: Tracer, + func: Union[Callable[P, Awaitable[R]], Callable[P, R]], + path: str, + flow_kernel: dict[str, Any], + is_awaitable: bool, +): + if is_awaitable: + func = typing.cast(Callable[P, Awaitable[R]], func) + + @wraps(func) + async def wrapper(*args: P.args, **kwargs: P.kwargs) -> Optional[R]: + span: Span + with set_decorator_context( + DecoratorContext( + path=path, + type="flow", + version=flow_kernel, + ) + ) as decorator_context: + with opentelemetry_tracer.start_as_current_span(HUMANLOOP_FLOW_SPAN_NAME) as span: # type: ignore + span, flow_log = _process_inputs( + client=client, + span=span, + decorator_context=decorator_context, + decorator_path=path, + file_type="flow", + func=func, + args=args, + kwargs=kwargs, + ) + + with set_trace_id(flow_log.id): + func_output: Optional[R] + try: + func_output = await func(*args, **kwargs) # type: ignore [misc] + error = None + except HumanloopRuntimeError as e: + # Critical error, re-raise + client.logs.delete(id=flow_log.id) + span.record_exception(e) + raise e + except Exception as e: + logger.error(f"Error calling {func.__name__}: {e}") + error = e + func_output = None + + _process_output( + func=func, + span=span, + func_output=func_output, + error=error, + flow_log=flow_log, + ) + + # Return the output of the decorated function + return func_output + else: + func = typing.cast(Callable[P, R], func) @wraps(func) def wrapper(*args: P.args, **kwargs: P.kwargs) -> Optional[R]: span: Span with set_decorator_context( DecoratorContext( - path=decorator_path, + path=path, type="flow", version=flow_kernel, ) ) as decorator_context: with opentelemetry_tracer.start_as_current_span(HUMANLOOP_FLOW_SPAN_NAME) as span: # type: ignore - span.set_attribute(HUMANLOOP_FILE_PATH_KEY, decorator_path) - span.set_attribute(HUMANLOOP_FILE_TYPE_KEY, file_type) - trace_id = get_trace_id() - func_args = bind_args(func, args, kwargs) - - # Create the trace 
ahead so we have a parent ID to reference - init_log_inputs = { - "inputs": {k: v for k, v in func_args.items() if k != "messages"}, - "messages": func_args.get("messages"), - "trace_parent_id": trace_id, - } - this_flow_log: FlowLogResponse = client.flows._log( # type: ignore [attr-defined] - path=decorator_context.path, - flow=decorator_context.version, - log_status="incomplete", - **init_log_inputs, + span, flow_log = _process_inputs( + client=client, + span=span, + decorator_context=decorator_context, + decorator_path=path, + file_type="flow", + func=func, + args=args, + kwargs=kwargs, ) - with set_trace_id(this_flow_log.id): + with set_trace_id(flow_log.id): func_output: Optional[R] - log_output: Optional[str] - log_error: Optional[str] - log_output_message: Optional[ChatMessage] try: func_output = func(*args, **kwargs) - if ( - isinstance(func_output, dict) - and len(func_output.keys()) == 2 - and "role" in func_output - and "content" in func_output - ): - log_output_message = func_output # type: ignore [assignment] - log_output = None - else: - log_output = process_output(func=func, output=func_output) - log_output_message = None - log_error = None + error = None except HumanloopRuntimeError as e: # Critical error, re-raise - client.logs.delete(id=this_flow_log.id) + client.logs.delete(id=flow_log.id) span.record_exception(e) raise e except Exception as e: logger.error(f"Error calling {func.__name__}: {e}") - log_output = None - log_output_message = None - log_error = str(e) + error = e func_output = None - updated_flow_log = { - "log_status": "complete", - "output": log_output, - "error": log_error, - "output_message": log_output_message, - "id": this_flow_log.id, - } - # Write the Flow Log to the Span on HL_LOG_OT_KEY - write_to_opentelemetry_span( - span=span, # type: ignore [arg-type] - key=HUMANLOOP_LOG_KEY, - value=updated_flow_log, # type: ignore + _process_output( + func=func, + span=span, + func_output=func_output, + error=error, + flow_log=flow_log, ) + # Return the output of the decorated function - return func_output # type: ignore [return-value] + return func_output - wrapper.file = FileEvalConfig( # type: ignore - path=decorator_path, - type=file_type, # type: ignore [arg-type, typeddict-item] - version=FlowDict(**flow_kernel), # type: ignore - callable=wrapper, - ) + return wrapper - return wrapper - return decorator +def _process_inputs( + client: "BaseHumanloop", + span: Span, + decorator_context: DecoratorContext, + decorator_path: str, + file_type: str, + func: Callable[P, R], + args: tuple[Any, ...], + kwargs: dict[str, Any], +): + span.set_attribute(HUMANLOOP_FILE_PATH_KEY, decorator_path) + span.set_attribute(HUMANLOOP_FILE_TYPE_KEY, file_type) + trace_id = get_trace_id() + func_args = bind_args(func, args, kwargs) + # Create the trace ahead so we have a parent ID to reference + init_log_inputs = { + "inputs": {k: v for k, v in func_args.items() if k != "messages"}, + "messages": func_args.get("messages"), + "trace_parent_id": trace_id, + } + flow_log: FlowLogResponse = client.flows._log( # type: ignore [attr-defined] + path=decorator_context.path, + flow=decorator_context.version, + log_status="incomplete", + **init_log_inputs, + ) + return span, flow_log + + +def _process_output( + func: Union[Callable[P, R], Callable[P, Awaitable[R]]], + span: Span, + func_output: Optional[R], + error: Optional[Exception], + flow_log: FlowLogResponse, +): + log_output: Optional[str] + log_error: Optional[str] + log_output_message: Optional[ChatMessage] + if not error: + if ( 
+ isinstance(func_output, dict) + and len(func_output.keys()) == 2 + and "role" in func_output + and "content" in func_output + ): + log_output_message = func_output # type: ignore [assignment] + log_output = None + else: + log_output = process_output(func=func, output=func_output) + log_output_message = None + log_error = None + else: + log_output = None + log_output_message = None + log_error = str(error) + func_output = None + updated_flow_log = { + "log_status": "complete", + "output": log_output, + "error": log_error, + "output_message": log_output_message, + "id": flow_log.id, + } + # Write the Flow Log to the Span on HL_LOG_OT_KEY + write_to_opentelemetry_span( + span=span, # type: ignore [arg-type] + key=HUMANLOOP_LOG_KEY, + value=updated_flow_log, # type: ignore + ) diff --git a/src/humanloop/decorators/prompt.py b/src/humanloop/decorators/prompt.py index fa4a62ae..4c6ed1cd 100644 --- a/src/humanloop/decorators/prompt.py +++ b/src/humanloop/decorators/prompt.py @@ -1,8 +1,9 @@ -from functools import wraps import logging +import typing +from functools import wraps +from typing import Awaitable, Callable, Literal, TypeVar, Union, overload from typing_extensions import ParamSpec -from typing import Callable, TypeVar from humanloop.context import DecoratorContext, set_decorator_context from humanloop.evals.types import FileEvalConfig @@ -15,20 +16,33 @@ def prompt_decorator_factory(path: str): def decorator(func: Callable[P, R]) -> Callable[P, R]: - @wraps(func) - def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: - with set_decorator_context( - DecoratorContext( - path=path, - type="prompt", - version={ - # TODO: Implement a reverse-lookup of the template - "template": None, - }, - ) - ): - output = func(*args, **kwargs) - return output + wrapper = _wrapper_factory( + func=func, + path=path, + is_awaitable=False, + ) + + wrapper.file = FileEvalConfig( # type: ignore [attr-defined] + path=path, + type="prompt", + version={ # type: ignore [typeddict-item] + "template": None, + }, + callable=wrapper, + ) + + return wrapper + + return decorator + + +def a_prompt_decorator_factory(path: str): + def decorator(func: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]: + wrapper = _wrapper_factory( + func=func, + path=path, + is_awaitable=True, + ) wrapper.file = FileEvalConfig( # type: ignore [attr-defined] path=path, @@ -42,3 +56,72 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: return wrapper return decorator + + +@overload +def _wrapper_factory( + func: Callable[P, Awaitable[R]], + path: str, + is_awaitable: Literal[True], +) -> Callable[P, Awaitable[R]]: ... + + +@overload +def _wrapper_factory( + func: Callable[P, R], + path: str, + is_awaitable: Literal[False], +) -> Callable[P, R]: ... + + +def _wrapper_factory( # type: ignore [misc] + func: Union[Callable[P, Awaitable[R]], Callable[P, R]], + path: str, + is_awaitable: bool, +): + """Create a wrapper function for a prompt-decorated function. 
+ + Args: + func: The function to decorate + path: The path to the prompt + is_awaitable: Whether the function is an async function + + Returns: + A wrapper function that sets up the decorator context + """ + if is_awaitable: + func = typing.cast(Callable[P, Awaitable[R]], func) + + @wraps(func) + async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: + with set_decorator_context( + DecoratorContext( + path=path, + type="prompt", + version={ + # TODO: Implement a reverse-lookup of the template + "template": None, + }, + ) + ): + output = await func(*args, **kwargs) # type: ignore [misc] + return output # type: ignore [return-value] + else: + func = typing.cast(Callable[P, R], func) + + @wraps(func) + def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: + with set_decorator_context( + DecoratorContext( + path=path, + type="prompt", + version={ + # TODO: Implement a reverse-lookup of the template + "template": None, + }, + ) + ): + output = func(*args, **kwargs) # type: ignore [misc] + return output # type: ignore [return-value] + + return wrapper diff --git a/src/humanloop/decorators/tool.py b/src/humanloop/decorators/tool.py index 43a73182..6599d189 100644 --- a/src/humanloop/decorators/tool.py +++ b/src/humanloop/decorators/tool.py @@ -7,10 +7,10 @@ from dataclasses import dataclass from functools import wraps from inspect import Parameter -from typing import Any, Callable, Literal, Mapping, Optional, Sequence, TypeVar, TypedDict, Union -from typing_extensions import ParamSpec +from typing import Any, Awaitable, Callable, Literal, Mapping, Optional, Sequence, TypedDict, TypeVar, Union, overload -from opentelemetry.trace import Tracer +from opentelemetry.trace import Span, Tracer +from typing_extensions import ParamSpec from humanloop.context import get_evaluation_context, get_trace_id from humanloop.decorators.helpers import bind_args @@ -18,9 +18,9 @@ from humanloop.evals.run import HumanloopRuntimeError from humanloop.otel.constants import ( HUMANLOOP_FILE_KEY, + HUMANLOOP_FILE_PATH_KEY, HUMANLOOP_FILE_TYPE_KEY, HUMANLOOP_LOG_KEY, - HUMANLOOP_FILE_PATH_KEY, ) from humanloop.otel.helpers import process_output, write_to_opentelemetry_span from humanloop.requests.tool_function import ToolFunctionParams @@ -55,73 +55,269 @@ def decorator(func: Callable[P, R]) -> Callable[P, Optional[R]]: # Mypy complains about adding attribute on function, but it's nice DX func.json_schema = tool_kernel["function"] # type: ignore + wrapper = _wrapper_factory( + opentelemetry_tracer=opentelemetry_tracer, + func=func, + path=path, + tool_kernel=tool_kernel, + is_awaitable=False, + ) + + wrapper.file = FileEvalConfig( # type: ignore + path=path, + type=file_type, # type: ignore [arg-type, typeddict-item] + version=tool_kernel, + callable=wrapper, + ) + + return wrapper + + return decorator + + +def a_tool_decorator_factory( + opentelemetry_tracer: Tracer, + path: str, + attributes: Optional[dict[str, Any]] = None, + setup_values: Optional[dict[str, Any]] = None, +): + def decorator(func: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[Optional[R]]]: + file_type = "tool" + + tool_kernel = _build_tool_kernel( + func=func, + attributes=attributes, + setup_values=setup_values, + strict=True, + ) + + # Mypy complains about adding attribute on function, but it's nice DX + func.json_schema = tool_kernel["function"] # type: ignore + + wrapper = _wrapper_factory( + opentelemetry_tracer=opentelemetry_tracer, + func=func, + path=path, + tool_kernel=tool_kernel, + is_awaitable=True, + ) + + wrapper.file = 
FileEvalConfig( # type: ignore + path=path, + type=file_type, # type: ignore [arg-type, typeddict-item] + version=tool_kernel, + callable=wrapper, + ) + + return wrapper + + return decorator + + +@overload +def _wrapper_factory( + opentelemetry_tracer: Tracer, + func: Callable[P, Awaitable[R]], + path: str, + tool_kernel: ToolKernelRequestParams, + is_awaitable: Literal[True], +) -> Callable[P, Awaitable[Optional[R]]]: ... + + +@overload +def _wrapper_factory( + opentelemetry_tracer: Tracer, + func: Callable[P, R], + path: str, + tool_kernel: ToolKernelRequestParams, + is_awaitable: Literal[False], +) -> Callable[P, Optional[R]]: ... + + +def _wrapper_factory( # type: ignore [misc] + opentelemetry_tracer: Tracer, + func: Union[Callable[P, Awaitable[R]], Callable[P, R]], + path: str, + tool_kernel: ToolKernelRequestParams, + is_awaitable: bool, +): + if is_awaitable: + func = typing.cast(Callable[P, Awaitable[R]], func) + @wraps(func) - def wrapper(*args: P.args, **kwargs: P.kwargs) -> Optional[R]: + async def wrapper(*args: P.args, **kwargs: P.kwargs) -> Optional[R]: evaluation_context = get_evaluation_context() if evaluation_context is not None: if evaluation_context.path == path: raise HumanloopRuntimeError("Tools cannot be evaluated with the `evaluations.run()` utility.") with opentelemetry_tracer.start_as_current_span("humanloop.tool") as span: - # Write the Tool Kernel to the Span on HL_FILE_OT_KEY - write_to_opentelemetry_span( - span=span, # type: ignore [arg-type] - key=HUMANLOOP_FILE_KEY, - value=tool_kernel, # type: ignore [arg-type] + span, log_inputs = _process_inputs( + span=span, + opentelemetry_tracer=opentelemetry_tracer, + path=path, + tool_kernel=tool_kernel, + func=func, + args=args, + kwargs=kwargs, ) - span.set_attribute(HUMANLOOP_FILE_PATH_KEY, path) - span.set_attribute(HUMANLOOP_FILE_TYPE_KEY, file_type) - log_inputs: dict[str, Any] = bind_args(func, args, kwargs) - log_error: Optional[str] - log_output: str + func_output: Optional[R] + try: + func_output = await func(*args, **kwargs) # type: ignore [misc] + error = None + except HumanloopRuntimeError as e: + # Critical error, re-raise + span.record_exception(e) + raise e + except Exception as e: + logger.error(f"Error calling {func.__name__}: {e}") + error = e + func_output = None + + _process_output( + span=span, + func=func, + func_output=func_output, + error=error, + log_inputs=log_inputs, + ) + + # Return the output of the decorated function + return func_output + else: + func = typing.cast(Callable[P, R], func) + + @wraps(func) + def wrapper(*args: P.args, **kwargs: P.kwargs) -> Optional[R]: + evaluation_context = get_evaluation_context() + if evaluation_context is not None: + if evaluation_context.path == path: + raise HumanloopRuntimeError("Tools cannot be evaluated with the `evaluations.run()` utility.") + with opentelemetry_tracer.start_as_current_span("humanloop.tool") as span: + span, log_inputs = _process_inputs( + span=span, + opentelemetry_tracer=opentelemetry_tracer, + path=path, + tool_kernel=tool_kernel, + func=func, + args=args, + kwargs=kwargs, + ) func_output: Optional[R] try: - func_output = func(*args, **kwargs) - log_output = process_output( - func=func, - output=func_output, - ) - log_error = None + func_output = func(*args, **kwargs) # type: ignore [misc] + error = None except HumanloopRuntimeError as e: # Critical error, re-raise + span.record_exception(e) raise e except Exception as e: logger.error(f"Error calling {func.__name__}: {e}") - output = None - log_output = process_output( - 
func=func, - output=output, - ) - log_error = str(e) - - # Populate Tool Log attributes - tool_log = { - "inputs": log_inputs, - "output": log_output, - "error": log_error, - "trace_parent_id": get_trace_id(), - } - # Write the Tool Log to the Span on HL_LOG_OT_KEY - write_to_opentelemetry_span( - span=span, # type: ignore [arg-type] - key=HUMANLOOP_LOG_KEY, - value=tool_log, # type: ignore [arg-type] + error = e + func_output = None + + _process_output( + span=span, + func=func, + func_output=func_output, + error=error, + log_inputs=log_inputs, ) # Return the output of the decorated function return func_output - wrapper.file = FileEvalConfig( # type: ignore - path=path, - type=file_type, # type: ignore [arg-type, typeddict-item] - version=tool_kernel, - callable=wrapper, - ) + return wrapper - return wrapper - return decorator +def _process_inputs( + span: Span, + opentelemetry_tracer: Tracer, + path: str, + tool_kernel: ToolKernelRequestParams, + func: Callable[P, R], + args: tuple[Any, ...], + kwargs: dict[str, Any], +) -> tuple[Span, dict[str, Any]]: + """Process inputs before executing the decorated function. + + This handles setting up the OpenTelemetry span and preparing the log inputs. + + Args: + span: The current OpenTelemetry span + opentelemetry_tracer: The OpenTelemetry tracer + path: The path to the tool + tool_kernel: The tool kernel request parameters + func: The decorated function + args: The positional arguments passed to the function + kwargs: The keyword arguments passed to the function + + Returns: + A tuple containing the span and the log inputs + """ + # Write the Tool Kernel to the Span on HL_FILE_OT_KEY + write_to_opentelemetry_span( + span=span, # type: ignore [arg-type] + key=HUMANLOOP_FILE_KEY, + value=tool_kernel, # type: ignore [arg-type] + ) + span.set_attribute(HUMANLOOP_FILE_PATH_KEY, path) + span.set_attribute(HUMANLOOP_FILE_TYPE_KEY, "tool") + + log_inputs: dict[str, Any] = bind_args(func, args, kwargs) + + return span, log_inputs + + +def _process_output( + span: Span, + func: Union[Callable[P, R], Callable[P, Awaitable[R]]], + func_output: Optional[R], + error: Optional[Exception], + log_inputs: dict[str, Any], +) -> None: + """Process outputs after executing the decorated function. + + This handles processing the function output, error logging, and writing to the OpenTelemetry span. + + Args: + span: The current OpenTelemetry span + func: The decorated function + func_output: The output from the function execution + error: Any exception that occurred during function execution + log_inputs: The input parameters logged during processing + """ + log_error: Optional[str] + log_output: str + + if not error: + log_output = process_output( + func=func, + output=func_output, + ) + log_error = None + else: + output = None + log_output = process_output( + func=func, + output=output, + ) + log_error = str(error) + + # Populate Tool Log attributes + tool_log = { + "inputs": log_inputs, + "output": log_output, + "error": log_error, + "trace_parent_id": get_trace_id(), + } + # Write the Tool Log to the Span on HL_LOG_OT_KEY + write_to_opentelemetry_span( + span=span, # type: ignore [arg-type] + key=HUMANLOOP_LOG_KEY, + value=tool_log, # type: ignore [arg-type] + ) def _build_tool_kernel( diff --git a/src/humanloop/evals/run.py b/src/humanloop/evals/run.py index 4d641836..8af53606 100644 --- a/src/humanloop/evals/run.py +++ b/src/humanloop/evals/run.py @@ -8,8 +8,8 @@ not be called directly. 
""" +import asyncio import copy -from dataclasses import dataclass import inspect import json import logging @@ -19,11 +19,14 @@ import time import typing from concurrent.futures import Future, ThreadPoolExecutor +from dataclasses import dataclass from datetime import datetime from functools import partial from logging import INFO from typing import ( + Any, Callable, + Coroutine, Dict, List, Literal, @@ -34,19 +37,21 @@ Union, ) +from pydantic import ValidationError + from humanloop import EvaluatorResponse, FlowResponse, PromptResponse, ToolResponse from humanloop.agents.client import AgentsClient -from humanloop.core.api_error import ApiError from humanloop.context import ( EvaluationContext, get_evaluation_context, set_evaluation_context, ) +from humanloop.core.api_error import ApiError from humanloop.error import HumanloopRuntimeError from humanloop.evals.types import ( DatasetEvalConfig, - EvaluatorEvalConfig, EvaluatorCheck, + EvaluatorEvalConfig, FileEvalConfig, ) @@ -71,15 +76,13 @@ from humanloop.types import NumericEvaluatorStatsResponse as NumericStats from humanloop.types import PromptKernelRequest as Prompt from humanloop.types import ToolKernelRequest as Tool -from humanloop.types.agent_response import AgentResponse from humanloop.types.agent_kernel_request import AgentKernelRequest as Agent +from humanloop.types.agent_response import AgentResponse from humanloop.types.datapoint_response import DatapointResponse from humanloop.types.dataset_response import DatasetResponse from humanloop.types.evaluation_run_response import EvaluationRunResponse from humanloop.types.log_response import LogResponse from humanloop.types.run_stats_response import RunStatsResponse -from pydantic import ValidationError - if typing.TYPE_CHECKING: from humanloop.client import BaseHumanloop @@ -116,6 +119,8 @@ AgentsClient, ) +T = TypeVar("T") # Add TypeVar T definition + def print_error(message: str) -> None: """Print a formatted error message to stdout.""" @@ -453,7 +458,7 @@ def _resolve_file(client: "BaseHumanloop", file_config: FileEvalConfig) -> tuple except ApiError: if not version or not path or file_id: raise HumanloopRuntimeError( - "File does not exist on Humanloop. Please provide a `file.path` and a version to create a new version.", + "File does not exist on Humanloop. 
Please provide a `file.path` and a `file.version` to create a new version.", ) return _upsert_file(file_config=file_config, client=client), callable or None @@ -819,19 +824,58 @@ def _get_new_run( return evaluation, run +def run_coroutine_sync(coroutine: Coroutine[Any, Any, T], timeout: float = 30) -> T: + """Run a coroutine in a new event loop and return its result synchronously.""" + + def run_in_new_loop(): + new_loop = asyncio.new_event_loop() + asyncio.set_event_loop(new_loop) + try: + return new_loop.run_until_complete(asyncio.wait_for(coroutine, timeout=timeout)) + finally: + new_loop.close() + + # If we're already in an event loop, run in a new thread + try: + asyncio.get_running_loop() + # We're in an event loop, run in thread + import concurrent.futures + + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + return executor.submit(run_in_new_loop).result() + except RuntimeError: + # No event loop running, we can use run_in_new_loop directly + return run_in_new_loop() + + def _call_function( function: Callable, type: FileType, datapoint: DatapointResponse, ) -> str: datapoint_dict = datapoint.dict() + + # Determine if the function is a coroutine function (async def) + is_awaitable = asyncio.iscoroutinefunction(function) + if "messages" in datapoint_dict and datapoint_dict["messages"] is not None: - output = function( - **datapoint_dict["inputs"], - messages=datapoint_dict["messages"], - ) + if is_awaitable: + coroutine = function( + **datapoint_dict["inputs"], + messages=datapoint_dict["messages"], + ) + output = run_coroutine_sync(coroutine) + else: + output = function( + **datapoint_dict["inputs"], + messages=datapoint_dict["messages"], + ) else: - output = function(**datapoint_dict["inputs"]) + if is_awaitable: + coroutine = function(**datapoint_dict["inputs"]) + output = run_coroutine_sync(coroutine) + else: + output = function(**datapoint_dict["inputs"]) if not isinstance(output, str): try: diff --git a/src/humanloop/otel/exporter/__init__.py b/src/humanloop/otel/exporter/__init__.py index 6596d152..e59137ec 100644 --- a/src/humanloop/otel/exporter/__init__.py +++ b/src/humanloop/otel/exporter/__init__.py @@ -1,23 +1,21 @@ import logging - import time import typing from queue import Empty as EmptyQueue from queue import Queue from threading import Thread -from typing import Optional, Sequence +from typing import Callable, Optional, Sequence +import requests from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult -import requests -from typing import Callable from humanloop.context import get_evaluation_context from humanloop.evals.run import HumanloopRuntimeError from humanloop.otel.constants import ( + HUMANLOOP_FILE_PATH_KEY, HUMANLOOP_FILE_TYPE_KEY, HUMANLOOP_LOG_KEY, - HUMANLOOP_FILE_PATH_KEY, ) from humanloop.otel.exporter.proto import serialize_span from humanloop.otel.helpers import ( @@ -27,7 +25,6 @@ write_to_opentelemetry_span, ) - if typing.TYPE_CHECKING: from humanloop.client import Humanloop diff --git a/src/humanloop/otel/exporter/proto.py b/src/humanloop/otel/exporter/proto.py index 85dde94b..bf606db8 100644 --- a/src/humanloop/otel/exporter/proto.py +++ b/src/humanloop/otel/exporter/proto.py @@ -1,13 +1,15 @@ -from opentelemetry.proto.common.v1.common_pb2 import KeyValue, AnyValue, InstrumentationScope +from google.protobuf.json_format import MessageToJson +from opentelemetry.proto.common.v1.common_pb2 import AnyValue, InstrumentationScope, KeyValue from 
opentelemetry.proto.trace.v1.trace_pb2 import ( - TracesData, ResourceSpans, ScopeSpans, + TracesData, +) +from opentelemetry.proto.trace.v1.trace_pb2 import ( Span as ProtoBufferSpan, ) -from google.protobuf.json_format import MessageToJson - from opentelemetry.sdk.trace import ReadableSpan + from humanloop.otel.helpers import is_llm_provider_call diff --git a/src/humanloop/otel/helpers.py b/src/humanloop/otel/helpers.py index 37ca8cea..737ed0d3 100644 --- a/src/humanloop/otel/helpers.py +++ b/src/humanloop/otel/helpers.py @@ -5,7 +5,6 @@ from opentelemetry.trace import SpanKind from opentelemetry.util.types import AttributeValue - NestedDict = dict[str, Union["NestedDict", AttributeValue]] NestedList = list[Union["NestedList", NestedDict]] diff --git a/src/humanloop/otel/processor.py b/src/humanloop/otel/processor.py index c04922ba..ffa0602c 100644 --- a/src/humanloop/otel/processor.py +++ b/src/humanloop/otel/processor.py @@ -1,15 +1,14 @@ import logging - from opentelemetry.sdk.trace import ReadableSpan, Span from opentelemetry.sdk.trace.export import SimpleSpanProcessor, SpanExporter from humanloop.context import get_decorator_context, get_evaluation_context, get_trace_id from humanloop.otel.constants import ( HUMANLOOP_FILE_KEY, + HUMANLOOP_FILE_PATH_KEY, HUMANLOOP_FILE_TYPE_KEY, HUMANLOOP_LOG_KEY, - HUMANLOOP_FILE_PATH_KEY, ) from humanloop.otel.helpers import is_llm_provider_call diff --git a/src/humanloop/overload.py b/src/humanloop/overload.py index ee8a8dc1..c986d752 100644 --- a/src/humanloop/overload.py +++ b/src/humanloop/overload.py @@ -264,7 +264,7 @@ def overload_client( def log_wrapper(self: T, **kwargs) -> LogResponseType: return _overload_log(self, file_syncer, use_local_files, **kwargs) - client.log = types.MethodType(log_wrapper, client) # type: ignore [method-assign, union-attr] + client.log = types.MethodType(log_wrapper, client) # type: ignore [method-assign] # Overload call method for Prompt and Agent clients if _get_file_type_from_client(client) in FileSyncer.SERIALIZABLE_FILE_TYPES: diff --git a/src/humanloop/prompt_utils.py b/src/humanloop/prompt_utils.py index 1747e286..55fcf38b 100644 --- a/src/humanloop/prompt_utils.py +++ b/src/humanloop/prompt_utils.py @@ -1,14 +1,12 @@ import copy -from typing import Any, Dict, List, Optional, TypeVar, Sequence import logging - import re +from typing import Any, Dict, List, Optional, Sequence, TypeVar -from .requests.chat_message import ChatMessageParams from .prompts.requests.prompt_request_template import ( PromptRequestTemplateParams, ) - +from .requests.chat_message import ChatMessageParams logger = logging.getLogger(__name__) diff --git a/tests/custom/integration/test_decorators.py b/tests/custom/integration/test_decorators.py index 59638896..e4be8e4a 100644 --- a/tests/custom/integration/test_decorators.py +++ b/tests/custom/integration/test_decorators.py @@ -1,7 +1,8 @@ +import asyncio import time from typing import Any -from openai import OpenAI +from openai import AsyncOpenAI, OpenAI from tests.custom.integration.conftest import GetHumanloopClientFn @@ -131,14 +132,17 @@ def test_flow_decorator_populates_output_message( try: humanloop_client = get_humanloop_client() + # GIVEN a flow that returns a ChatMessage like dict @humanloop_client.flow(path=f"{sdk_test_dir}/test_flow_log_output_message") def my_flow(question: str) -> dict[str, Any]: return {"role": "user", "content": question} + # WHEN the flow is called assert "france" in my_flow("What is the capital of the France?")["content"].lower() time.sleep(5) + # 
THEN the Flow is created and the Log has output_message populated flow_response = humanloop_client.files.retrieve_by_path(path=f"{sdk_test_dir}/test_flow_log_output_message") assert flow_response is not None flow_logs_response = humanloop_client.logs.list(file_id=flow_response.id, page=1, size=50) @@ -152,3 +156,155 @@ def my_flow(question: str) -> dict[str, Any]: flow_response = humanloop_client.files.retrieve_by_path(path=f"{sdk_test_dir}/test_flow_log_output_message") if flow_response is not None: humanloop_client.flows.delete(id=flow_response.id) + + +async def test_async_flow_decorator( + get_humanloop_client: GetHumanloopClientFn, + sdk_test_dir: str, +): + humanloop_client = get_humanloop_client() + + # GIVEN an async flow that returns a string + @humanloop_client.a_flow(path=f"{sdk_test_dir}/test_async_flow") + async def my_flow(question: str) -> str: + return "baz!" + + # WHEN the flow is called + assert "baz!" == await my_flow("test") + + # Wait for the flow and log to propagate to integration backend + await asyncio.sleep(3) + + # THEN the file exists on Humanloop + flow_file_response = humanloop_client.files.retrieve_by_path(path=f"{sdk_test_dir}/test_async_flow") + assert flow_file_response is not None + + # THEN a Log exists on the File + flow_logs_response = humanloop_client.logs.list( + file_id=flow_file_response.id, + page=1, + size=50, + ) + assert flow_logs_response.items is not None and len(flow_logs_response.items) == 1 + assert flow_logs_response.items[0].output == "baz!" and flow_logs_response.items[0].inputs == {"question": "test"} + + +async def test_async_tool_decorator( + get_humanloop_client: GetHumanloopClientFn, + sdk_test_dir: str, +): + humanloop_client = get_humanloop_client() + + # GIVEN an async tool that returns a string + @humanloop_client.a_tool(path=f"{sdk_test_dir}/test_async_tool") + async def my_tool(question: str) -> str: + return "baz!" + + # THEN the tool has a json_schema + assert hasattr(my_tool, "json_schema") + + # WHEN the tool is called + await my_tool("test") + + # Wait for the flow and log to propagate to integration backend + await asyncio.sleep(3) + + # THEN the file exists on Humanloop + tool_file_response = humanloop_client.files.retrieve_by_path(path=f"{sdk_test_dir}/test_async_tool") + assert tool_file_response is not None + + # THEN a Log exists on the File + tool_logs_response = humanloop_client.logs.list( + file_id=tool_file_response.id, + page=1, + size=50, + ) + assert tool_logs_response.items is not None and len(tool_logs_response.items) == 1 + assert tool_logs_response.items[0].output == "baz!" 
and tool_logs_response.items[0].inputs == {"question": "test"} + + +async def test_async_prompt_decorator( + get_humanloop_client: GetHumanloopClientFn, + openai_key: str, + sdk_test_dir: str, +): + humanloop_client = get_humanloop_client() + + # GIVEN an async prompt that calls OpenAI + @humanloop_client.a_prompt(path=f"{sdk_test_dir}/test_async_prompt") + async def my_prompt(question: str) -> str: + openai_client = AsyncOpenAI(api_key=openai_key) + + response = await openai_client.chat.completions.create( + model="gpt-4o-mini", + messages=[{"role": "user", "content": question}], + ) + + assert response.choices[0].message.content is not None + return response.choices[0].message.content + + # WHEN the prompt is called + # THEN the output is not null + output = await my_prompt("What is the capital of the France?") + assert output is not None + + # Wait for the prompt and log to propagate to integration backend + await asyncio.sleep(3) + + # THEN the file exists on Humanloop + prompt_file_response = humanloop_client.files.retrieve_by_path(path=f"{sdk_test_dir}/test_async_prompt") + assert prompt_file_response is not None + + # THEN a Log exists on the File + prompt_logs_response = humanloop_client.logs.list(file_id=prompt_file_response.id, page=1, size=50) + assert prompt_logs_response.items is not None and len(prompt_logs_response.items) == 1 + # THEN output_message matches the one intercepted from OpenAI response + assert prompt_logs_response.items[0].output_message.content == output # type: ignore [union-attr] + + +async def test_async_flow_decorator_with_trace( + get_humanloop_client: GetHumanloopClientFn, + openai_key: str, + sdk_test_dir: str, +): + humanloop_client = get_humanloop_client() + + # GIVEN async flow and prompt decorators + @humanloop_client.a_prompt(path=f"{sdk_test_dir}/test_async_prompt_with_trace") + async def my_prompt(question: str) -> str: + openai_client = AsyncOpenAI(api_key=openai_key) + + response = await openai_client.chat.completions.create( + model="gpt-4o-mini", + messages=[{"role": "user", "content": question}], + ) + + assert response.choices[0].message.content is not None + return response.choices[0].message.content + + @humanloop_client.a_flow(path=f"{sdk_test_dir}/test_async_flow_with_trace") + async def my_flow(question: str) -> str: + return await my_prompt(question="test") + + # WHEN the flow is called + await my_flow("test") + + # Wait for the flow and log to propagate to integration backend + await asyncio.sleep(3) + + # THEN both files exist on Humanloop + flow_file_response = humanloop_client.files.retrieve_by_path(path=f"{sdk_test_dir}/test_async_flow_with_trace") + assert flow_file_response is not None + prompt_file_response = humanloop_client.files.retrieve_by_path(path=f"{sdk_test_dir}/test_async_prompt_with_trace") + assert prompt_file_response is not None + + # THEN a Log exists on the File + flow_logs_response = humanloop_client.logs.list(file_id=flow_file_response.id, page=1, size=50) + assert flow_logs_response.items is not None and len(flow_logs_response.items) == 1 + assert flow_logs_response.items[0].output is not None and flow_logs_response.items[0].inputs == {"question": "test"} + flow_log_with_trace_response = humanloop_client.logs.get(id=flow_logs_response.items[0].id) + # THEN a Prompt Log is added to the Flow Log trace + assert ( + flow_log_with_trace_response["trace_children"] is not None # type: ignore [index] + and len(flow_log_with_trace_response["trace_children"]) == 1 # type: ignore [index] + ) diff --git 
a/tests/custom/integration/test_evals.py b/tests/custom/integration/test_evals.py index d8ba8996..88cc91a2 100644 --- a/tests/custom/integration/test_evals.py +++ b/tests/custom/integration/test_evals.py @@ -1,3 +1,4 @@ +import asyncio import time from typing import Any @@ -410,3 +411,99 @@ def test_agent_eval_works_upserting( evaluation_id = evaluations_response.items[0].id runs_response = humanloop_client.evaluations.list_runs_for_evaluation(id=evaluation_id) # type: ignore [attr-defined, arg-type] assert runs_response.runs[0].status == "completed" + + +async def test_async_eval_a_flow_decorator( + get_humanloop_client: GetHumanloopClientFn, + eval_dataset: ResourceIdentifiers, + output_not_null_evaluator: ResourceIdentifiers, + sdk_test_dir: str, +): + humanloop_client = get_humanloop_client() + flow_path = f"{sdk_test_dir}/Test Async Flow" + + # GIVEN an async flow with a decorator + @humanloop_client.a_flow(path=flow_path, attributes={"foo": "bar"}) + async def my_async_flow(question: str) -> str: + # Simulate async operation + await asyncio.sleep(0.1) + return "async result!" + + # WHEN we run an evaluation with the async flow + humanloop_client.evaluations.run( # type: ignore [attr-defined] + name="test_async_eval_run", + file={ + "path": flow_path, + "type": "flow", + "callable": my_async_flow, + }, + dataset={ + "path": eval_dataset.file_path, + }, + evaluators=[ + { + "path": output_not_null_evaluator.file_path, + } + ], + ) + + # Get the flow to verify it exists + flow = humanloop_client.files.retrieve_by_path(path=flow_path) + assert flow is not None + + # THEN the evaluation finishes successfully + evaluations_response = humanloop_client.evaluations.list(file_id=flow.id) + assert evaluations_response.items and len(evaluations_response.items) == 1 + evaluation_id = evaluations_response.items[0].id + runs_response = humanloop_client.evaluations.list_runs_for_evaluation(id=evaluation_id) + assert runs_response.runs[0].status == "completed" + + +async def test_eval_simple_async_callable( + get_humanloop_client: GetHumanloopClientFn, + eval_dataset: ResourceIdentifiers, + output_not_null_evaluator: ResourceIdentifiers, + sdk_test_dir: str, +): + humanloop_client = get_humanloop_client() + + flow_path = f"{sdk_test_dir}/Test Async Flow" + + # GIVEN a simple async callable + async def my_async_callable(question: str) -> str: + return "It's complicated don't worry about it!" + + # WHEN we run an evaluation with the async callable + humanloop_client.evaluations.run( # type: ignore [attr-defined] + name="test_async_eval_run", + file={ + "path": flow_path, + "type": "flow", + "version": { + "attributes": { + "foo": "bar", + }, + }, + "callable": my_async_callable, + }, + dataset={ + "path": eval_dataset.file_path, + }, + evaluators=[ + { + "path": output_not_null_evaluator.file_path, + } + ], + ) + + # THEN the Flow is created + flow = humanloop_client.files.retrieve_by_path(path=flow_path) + assert flow is not None + + # THEN the evaluation finishes successfully + evaluations_response = humanloop_client.evaluations.list(file_id=flow.id) + assert evaluations_response.items and len(evaluations_response.items) == 1 + # THEN the evaluation is completed + evaluation_id = evaluations_response.items[0].id + runs_response = humanloop_client.evaluations.list_runs_for_evaluation(id=evaluation_id) + assert runs_response.runs[0].status == "completed"
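
As a quick orientation for reviewers, below is a minimal usage sketch of how the new async decorators introduced in this diff could be combined end to end. It is not part of the patch: the file paths, model name, and environment variables are illustrative, and the client construction assumes the SDK's standard `Humanloop(api_key=...)` entry point.

```python
import asyncio
import os

from humanloop import Humanloop
from openai import AsyncOpenAI

# Assumed setup: both API keys are read from the environment (illustrative only).
humanloop = Humanloop(api_key=os.environ["HUMANLOOP_API_KEY"])
openai_client = AsyncOpenAI(api_key=os.environ["OPENAI_API_KEY"])


@humanloop.a_tool(path="demo/echo")
async def echo(text: str) -> str:
    """Return the input text unchanged."""
    return text


@humanloop.a_prompt(path="demo/answer")
async def answer(question: str) -> str:
    # The a_prompt decorator auto-instruments this provider call and logs it on Humanloop.
    response = await openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": question}],
    )
    return response.choices[0].message.content or ""


@humanloop.a_flow(path="demo/agent", attributes={"version": "v1"})
async def agent(question: str) -> str:
    # Both awaited calls below should be attached to this Flow's trace automatically.
    await echo(question)
    return await answer(question)


if __name__ == "__main__":
    print(asyncio.run(agent("What is the capital of France?")))
```

The same async callables can be passed as `file["callable"]` to `evaluations.run(...)`, which now detects coroutine functions and drives them through `run_coroutine_sync` as shown in the `src/humanloop/evals/run.py` hunk above.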