diff --git a/.github/chatmodes/create-gallery-example.chatmode.md b/.github/agents/create-gallery-example.chatmode.md
similarity index 100%
rename from .github/chatmodes/create-gallery-example.chatmode.md
rename to .github/agents/create-gallery-example.chatmode.md
diff --git a/.github/chatmodes/create-howto-example.chatmode.md b/.github/agents/create-howto-example.chatmode.md
similarity index 100%
rename from .github/chatmodes/create-howto-example.chatmode.md
rename to .github/agents/create-howto-example.chatmode.md
diff --git a/.github/chatmodes/documentation-writer.chatmode.md b/.github/agents/documentation-writer.chatmode.md
similarity index 100%
rename from .github/chatmodes/documentation-writer.chatmode.md
rename to .github/agents/documentation-writer.chatmode.md
diff --git a/docs/How To/Qtype Server/add_feedback_buttons.md b/docs/How To/Qtype Server/add_feedback_buttons.md
new file mode 100644
index 00000000..39d45495
--- /dev/null
+++ b/docs/How To/Qtype Server/add_feedback_buttons.md
@@ -0,0 +1,40 @@
+# Add Feedback Buttons
+
+Collect user feedback (thumbs, ratings, or categories) directly in the QType UI by adding a `feedback` block to your flow. Feedback submission requires `telemetry` to be enabled so QType can attach the feedback to traces/spans.
+
+### QType YAML
+
+```yaml
+flows:
+  - id: my_flow
+    interface:
+      type: Conversational
+
+    feedback:
+      type: thumbs
+      explanation: true
+
+telemetry:
+  id: app_telemetry
+  provider: Phoenix
+  endpoint: http://localhost:6006/v1/traces
+```
+
+### Explanation
+
+- **flows[].feedback**: Enables a feedback widget on the flow's outputs in the UI.
+- **feedback.type**: The widget type: `thumbs`, `rating`, or `category`.
+- **feedback.explanation**: If `true`, prompts the user for an optional text explanation along with their feedback.
+- **rating.scale**: For `rating` feedback, sets the maximum score, an integer from `2` to `10` (default `5`). See the sketch below.
+- **category.categories**: For `category` feedback, the list of selectable labels.
+- **category.allow_multiple**: For `category` feedback, allows selecting multiple labels.
+- **telemetry**: Must be configured for feedback submission; QType records feedback as telemetry annotations.
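+
+### Rating and Category Variants
+
+The `rating` and `category` widgets use the same `feedback` block with a few extra fields. A minimal sketch; the flow ids here are placeholders and the steps are omitted:
+
+```yaml
+flows:
+  - id: summarizer_flow
+    feedback:
+      type: rating
+      scale: 10            # maximum score; an integer from 2 to 10, defaults to 5
+      explanation: true
+
+  - id: labeler_flow
+    feedback:
+      type: category
+      categories:          # at least one non-empty label is required
+        - accurate
+        - helpful
+        - needs_improvement
+      allow_multiple: true # users may select several labels
+      explanation: false
+```
+
+When a user submits feedback, the UI posts it to the server's `POST /feedback` endpoint along with the `span_id` and `trace_id` it reads from the output's metadata, which is why a telemetry sink must be configured.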
+ +## See Also + +- [Serve Flows as UI](serve_flows_as_ui.md) +- [Use Conversational Interfaces](use_conversational_interfaces.md) +- [TelemetrySink Reference](../../components/TelemetrySink.md) +- [Example: Thumbs Feedback](../../../examples/feedback/thumbs_feedback_example.qtype.yaml) +- [Example: Rating Feedback](../../../examples/feedback/rating_feedback_example.qtype.yaml) +- [Example: Category Feedback](../../../examples/feedback/category_feedback_example.qtype.yaml) diff --git a/examples/feedback/category_feedback_example.qtype.yaml b/examples/feedback/category_feedback_example.qtype.yaml new file mode 100644 index 00000000..ea252fd8 --- /dev/null +++ b/examples/feedback/category_feedback_example.qtype.yaml @@ -0,0 +1,62 @@ +id: category_feedback_example +description: Example flow with categorical feedback collection + +flows: + - id: code_generator + description: Code generation with multi-category feedback + variables: + - id: requirement + type: text + - id: formatted_prompt + type: text + - id: generated_code + type: text + feedback: + type: category + categories: + - correct + - well_documented + - follows_best_practices + - efficient + - needs_improvement + allow_multiple: true + explanation: true + steps: + - type: PromptTemplate + id: prompt + template: | + Generate Python code for the following requirement: + + {requirement} + + Provide clean, well-documented code following Python best practices. + inputs: + - requirement + outputs: + - formatted_prompt + + - type: LLMInference + id: llm + model: nova + inputs: + - formatted_prompt + outputs: + - generated_code + inputs: + - requirement + outputs: + - generated_code + +models: + - id: nova + type: Model + provider: aws-bedrock + model_id: amazon.nova-pro-v1:0 + inference_params: + temperature: 0.2 + max_tokens: 1000 + +telemetry: + id: category_feedback_telemetry + provider: Phoenix + endpoint: http://localhost:6006/v1/traces diff --git a/examples/feedback/explode_feedback_example.qtype.yaml b/examples/feedback/explode_feedback_example.qtype.yaml new file mode 100644 index 00000000..34d21c36 --- /dev/null +++ b/examples/feedback/explode_feedback_example.qtype.yaml @@ -0,0 +1,72 @@ +id: explode_feedback_example +description: Example flow with Explode fan-out and feedback collection + +flows: + - id: topic_facts_generator + description: Generate interesting facts for multiple topics with feedback + variables: + - id: topics_json + type: text + - id: topics + type: list[text] + - id: topic + type: text + - id: formatted_prompt + type: text + - id: fact + type: text + feedback: + type: thumbs + explanation: true + steps: + - type: Decoder + id: decode + format: json + inputs: + - topics_json + outputs: + - topics + + - type: Explode + id: fan_out + inputs: + - topics + outputs: + - topic + + - type: PromptTemplate + id: prompt + template: | + Generate one interesting, concise fact about: {topic} + + Keep it to 1-2 sentences and make it engaging. 
+        inputs:
+          - topic
+        outputs:
+          - formatted_prompt
+
+      - type: LLMInference
+        id: llm
+        model: nova
+        inputs:
+          - formatted_prompt
+        outputs:
+          - fact
+    inputs:
+      - topics_json
+    outputs:
+      - fact
+
+models:
+  - id: nova
+    type: Model
+    provider: aws-bedrock
+    model_id: us.amazon.nova-lite-v1:0
+    inference_params:
+      temperature: 0.7
+      max_tokens: 200
+
+telemetry:
+  id: explode_feedback_telemetry
+  provider: Phoenix
+  endpoint: http://localhost:6006/v1/traces
diff --git a/examples/feedback/rating_feedback_example.qtype.yaml b/examples/feedback/rating_feedback_example.qtype.yaml
new file mode 100644
index 00000000..ccf5c078
--- /dev/null
+++ b/examples/feedback/rating_feedback_example.qtype.yaml
@@ -0,0 +1,56 @@
+id: rating_feedback_example
+description: Example flow with 1-10 rating scale feedback collection
+
+flows:
+  - id: document_summarizer
+    description: Document summarization with quality rating
+    variables:
+      - id: document_text
+        type: text
+      - id: formatted_prompt
+        type: text
+      - id: summary
+        type: text
+    feedback:
+      type: rating
+      scale: 10
+      explanation: true
+    steps:
+      - type: PromptTemplate
+        id: prompt
+        template: |
+          Summarize the following document in 2-3 sentences:
+
+          {document_text}
+
+          Summary:
+        inputs:
+          - document_text
+        outputs:
+          - formatted_prompt
+
+      - type: LLMInference
+        id: llm
+        model: nova
+        inputs:
+          - formatted_prompt
+        outputs:
+          - summary
+    inputs:
+      - document_text
+    outputs:
+      - summary
+
+models:
+  - id: nova
+    type: Model
+    provider: aws-bedrock
+    model_id: amazon.nova-pro-v1:0
+    inference_params:
+      temperature: 0.2
+      max_tokens: 1000
+
+telemetry:
+  id: rating_feedback_telemetry
+  provider: Phoenix
+  endpoint: http://localhost:6006/v1/traces
diff --git a/examples/feedback/thumbs_feedback_example.qtype.yaml b/examples/feedback/thumbs_feedback_example.qtype.yaml
new file mode 100644
index 00000000..d4b1f736
--- /dev/null
+++ b/examples/feedback/thumbs_feedback_example.qtype.yaml
@@ -0,0 +1,42 @@
+id: simple_thumbs_chatbot
+description: A minimal chatbot example with thumbs up/down feedback
+
+flows:
+  - id: simple_chat
+    description: Simple conversational chatbot with feedback collection
+    variables:
+      - id: user_message
+        type: ChatMessage
+      - id: assistant_response
+        type: ChatMessage
+    interface:
+      type: Conversational
+    feedback:
+      type: thumbs
+      explanation: false
+    steps:
+      - type: LLMInference
+        id: chat_llm
+        model: nova
+        inputs:
+          - user_message
+        outputs:
+          - assistant_response
+    inputs:
+      - user_message
+    outputs:
+      - assistant_response
+
+models:
+  - id: nova
+    type: Model
+    provider: aws-bedrock
+    model_id: us.amazon.nova-lite-v1:0
+    inference_params:
+      temperature: 0.9
+      max_tokens: 300
+
+telemetry:
+  id: chatbot_telemetry
+  provider: Phoenix
+  endpoint: http://localhost:6006/v1/traces
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index 93a0cc66..fbe32fdb 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -71,7 +71,7 @@ mcp = [
 
 [dependency-groups]
 dev = [
-    "arize-phoenix>=11.2.2",
+    "arize-phoenix>=12.35.0",
     "boto3>=1.34.0",
     "coverage>=7.0.0",
     "ipython>=8.37.0",
diff --git a/qtype/dsl/model.py b/qtype/dsl/model.py
index 27fe15ca..6c4679ed 100644
--- a/qtype/dsl/model.py
+++ b/qtype/dsl/model.py
@@ -5,13 +5,14 @@
 from abc import ABC
 from enum import Enum
 from functools import partial
-from typing import Annotated, Any, Literal, Type, Union
+from typing import Annotated, Any, Literal, Type
 
 from pydantic import (
     BaseModel,
     Field,
     RootModel,
     ValidationInfo,
+    field_validator,
     model_serializer,
model_validator, ) @@ -600,6 +601,67 @@ class Agent(LLMInference): ) +class Feedback(StrictBaseModel): + """Base class for user feedback configurations on flow outputs.""" + + type: str = Field(..., description="Type of feedback widget to display.") + explanation: bool = Field( + default=False, + description="Whether to enable optional text explanation field.", + ) + + +class ThumbsFeedback(Feedback): + """Binary thumbs up/down feedback.""" + + type: Literal["thumbs"] = "thumbs" + + +class RatingFeedback(Feedback): + """Numerical rating feedback (1-5 or 1-10 scale).""" + + type: Literal["rating"] = "rating" + scale: int = Field( + default=5, + ge=2, + le=10, + description="Maximum value for rating scale (2-10).", + ) + + +class CategoryFeedback(Feedback): + """Categorical feedback with predefined tags.""" + + type: Literal["category"] = "category" + categories: list[str] = Field( + ..., + description="List of category labels users can select from.", + min_length=1, + ) + allow_multiple: bool = Field( + default=True, + description="Whether users can select multiple categories.", + ) + + @field_validator("categories") + @classmethod + def validate_categories_not_empty(cls, v: list[str]) -> list[str]: + """Validate that category strings are not empty.""" + for category in v: + if not category.strip(): + raise ValueError( + "Category labels must not be empty or whitespace-only" + ) + return v + + +# Create a union type for all feedback types (defined here before Flow) +FeedbackType = Annotated[ + ThumbsFeedback | RatingFeedback | CategoryFeedback, + Field(discriminator="type"), +] + + class Flow(StrictBaseModel): """Defines a flow of steps that can be executed in sequence or parallel. If input or output variables are not specified, they are inferred from @@ -616,6 +678,10 @@ class Flow(StrictBaseModel): ) interface: FlowInterface | None = Field(default=None) + feedback: FeedbackType | None = Field( + default=None, + description="Optional feedback configuration for collecting user ratings on flow outputs.", + ) variables: list[Variable] = Field( default_factory=list, description="List of variables available at the application scope.", @@ -1322,84 +1388,66 @@ class BedrockReranker(Reranker, ConcurrentStepMixin): # Create a union type for all tool types ToolType = Annotated[ - Union[ - APITool, - PythonFunctionTool, - ], + APITool | PythonFunctionTool, Field(discriminator="type"), ] # Create a union type for all source types -SourceType = Union[ - DocumentSource, - FileSource, - SQLSource, -] +SourceType = DocumentSource | FileSource | SQLSource # Create a union type for all authorization provider types -AuthProviderType = Union[ - APIKeyAuthProvider, - BearerTokenAuthProvider, - AWSAuthProvider, - OAuth2AuthProvider, - VertexAuthProvider, -] +AuthProviderType = ( + APIKeyAuthProvider + | BearerTokenAuthProvider + | AWSAuthProvider + | OAuth2AuthProvider + | VertexAuthProvider +) # Create a union type for all secret manager types SecretManagerType = Annotated[ - Union[ - AWSSecretManager - # Add future managers like KubernetesSecretManager here - ], + AWSSecretManager, # Add future managers like KubernetesSecretManager here Field(discriminator="type"), ] # Create a union type for all step types StepType = Annotated[ - Union[ - Agent, - Aggregate, - BedrockReranker, - Collect, - Construct, - Decoder, - DocToTextConverter, - DocumentEmbedder, - DocumentSearch, - DocumentSplitter, - DocumentSource, - Echo, - Explode, - FieldExtractor, - FileSource, - FileWriter, - IndexUpsert, - InvokeEmbedding, - 
InvokeFlow, - InvokeTool, - LLMInference, - PromptTemplate, - SQLSource, - VectorSearch, - ], + Agent + | Aggregate + | BedrockReranker + | Collect + | Construct + | Decoder + | DocToTextConverter + | DocumentEmbedder + | DocumentSearch + | DocumentSplitter + | DocumentSource + | Echo + | Explode + | FieldExtractor + | FileSource + | FileWriter + | IndexUpsert + | InvokeEmbedding + | InvokeFlow + | InvokeTool + | LLMInference + | PromptTemplate + | SQLSource + | VectorSearch, Field(discriminator="type"), ] # Create a union type for all index types IndexType = Annotated[ - Union[ - DocumentIndex, - VectorIndex, - ], + DocumentIndex | VectorIndex, Field(discriminator="type"), ] # Create a union type for all model types ModelType = Annotated[ - Union[ - EmbeddingModel, - Model, - ], + EmbeddingModel | Model, Field(discriminator="type"), ] @@ -1439,14 +1487,14 @@ class VariableList(RootModel[list[Variable]]): root: list[Variable] -DocumentType = Union[ - Application, - AuthorizationProviderList, - ModelList, - ToolList, - TypeList, - VariableList, -] +DocumentType = ( + Application + | AuthorizationProviderList + | ModelList + | ToolList + | TypeList + | VariableList +) class Document(RootModel[DocumentType]): diff --git a/qtype/interpreter/api.py b/qtype/interpreter/api.py index 5fe3091c..b1fa6a33 100644 --- a/qtype/interpreter/api.py +++ b/qtype/interpreter/api.py @@ -17,6 +17,7 @@ create_rest_endpoint, create_streaming_endpoint, ) +from qtype.interpreter.feedback_api import create_feedback_endpoint from qtype.interpreter.metadata_api import create_metadata_endpoints from qtype.semantic.model import Application @@ -110,6 +111,12 @@ async def shutdown_telemetry(): # Create metadata endpoints for flow discovery create_metadata_endpoints(app, self.definition) + # Create feedback submission endpoint + if self.definition.telemetry: + create_feedback_endpoint( + app, self.definition.telemetry, secret_manager + ) + # Create executor context context = ExecutorContext( secret_manager=secret_manager, diff --git a/qtype/interpreter/base/base_step_executor.py b/qtype/interpreter/base/base_step_executor.py index 01dea695..2743461f 100644 --- a/qtype/interpreter/base/base_step_executor.py +++ b/qtype/interpreter/base/base_step_executor.py @@ -2,6 +2,7 @@ import logging from abc import ABC, abstractmethod +from contextlib import nullcontext from typing import Any, AsyncIterator from aiostream import stream @@ -9,7 +10,7 @@ OpenInferenceSpanKindValues, SpanAttributes, ) -from opentelemetry import context, trace +from opentelemetry import trace from opentelemetry.trace import Status, StatusCode from qtype.interpreter.base.executor_context import ExecutorContext @@ -176,7 +177,10 @@ async def execute( Processed messages, with failed messages emitted first """ # Start a span for tracking - # Note: We manually manage the span lifecycle to allow yielding + # Note: We do NOT attach this span to context here to avoid + # making upstream steps children of this step when we consume + # the input stream. Instead, _process_message_with_telemetry + # will attach it when calling process_message(). 
span = self._tracer.start_span( f"step.{self.step.id}", attributes={ @@ -186,10 +190,8 @@ async def execute( }, ) - # Make this span the active context so child spans will nest under it - # Only attach if span is recording (i.e., real tracer is configured) - ctx = trace.set_span_in_context(span) - token = context.attach(ctx) if span.is_recording() else None + # Store span in self so _process_message_with_telemetry can access it + self._current_step_span = span # Initialize the cache # this is done once per execution so re-runs are fast @@ -287,10 +289,8 @@ async def process_item( span.set_status(Status(StatusCode.ERROR, f"Step failed: {e}")) raise finally: - # Detach the context and end the span - # Only detach if we successfully attached (span was recording) - if token is not None: - context.detach(token) + # Clean up step span reference and end the span + self._current_step_span = None span.end() @abstractmethod @@ -368,51 +368,72 @@ async def _process_message_with_telemetry( This method creates a child span for each message processing operation, automatically recording errors and success metrics. - The child span will automatically be nested under the current - active span in the context. + The child span will be nested under the step span. + + The step span context is attached here (not in execute()) to + ensure step spans are siblings under the flow span, not nested. """ - # Get current context and create child span within it - span = self._tracer.start_span( - f"step.{self.step.id}.process_message", - attributes={ - "session.id": message.session.session_id, - }, - ) + # Get step span and use context manager for proper handling + step_span = getattr(self, "_current_step_span", None) + if not step_span or not step_span.is_recording(): + step_span = None + + # Use context manager to attach step span + with ( + trace.use_span(step_span, end_on_exit=False) + if step_span + else nullcontext() + ): + # Create child span for this specific message processing + span = self._tracer.start_span( + f"step.{self.step.id}.process_message", + attributes={ + "session.id": message.session.session_id, + }, + ) - try: - output_count = 0 - error_occurred = False - - async for output_msg in self.process_message(message): - output_count += 1 - if output_msg.is_failed(): - error_occurred = True - span.add_event( - "message_failed", - { - "error": str(output_msg.error), - }, + try: + output_count = 0 + error_occurred = False + + async for output_msg in self.process_message(message): + output_count += 1 + if output_msg.is_failed(): + error_occurred = True + span.add_event( + "message_failed", + { + "error": str(output_msg.error), + }, + ) + # Enrich with process_message span for feedback tracking + span_context = span.get_span_context() + output_msg = output_msg.with_telemetry_metadata( + span_id=format(span_context.span_id, "016x"), + trace_id=format(span_context.trace_id, "032x"), ) - yield output_msg + yield output_msg - # Record processing metrics - span.set_attribute("message.outputs", output_count) + # Record processing metrics + span.set_attribute("message.outputs", output_count) - if error_occurred: + if error_occurred: + span.set_status( + Status( + StatusCode.ERROR, "Message processing had errors" + ) + ) + else: + span.set_status(Status(StatusCode.OK)) + + except Exception as e: + span.record_exception(e) span.set_status( - Status(StatusCode.ERROR, "Message processing had errors") + Status(StatusCode.ERROR, f"Processing failed: {e}") ) - else: - span.set_status(Status(StatusCode.OK)) - - except 
Exception as e: - span.record_exception(e) - span.set_status( - Status(StatusCode.ERROR, f"Processing failed: {e}") - ) - raise - finally: - span.end() + raise + finally: + span.end() async def finalize(self) -> AsyncIterator[FlowMessage]: """ diff --git a/qtype/interpreter/base/stream_emitter.py b/qtype/interpreter/base/stream_emitter.py index 6d8b5ebc..84fb86e4 100644 --- a/qtype/interpreter/base/stream_emitter.py +++ b/qtype/interpreter/base/stream_emitter.py @@ -35,6 +35,8 @@ async def process_message(self, message: FlowMessage): from typing import Any +from opentelemetry import trace + from qtype.interpreter.types import ( ErrorEvent, ReasoningStreamDeltaEvent, @@ -54,6 +56,24 @@ async def process_message(self, message: FlowMessage): from qtype.semantic.model import Step +def get_current_telemetry_metadata() -> dict[str, Any]: + """ + Get current OpenTelemetry span context as metadata dict. + + Returns: + Dictionary with span_id and trace_id if span is recording, + empty dict otherwise + """ + span = trace.get_current_span() + if span and span.is_recording(): + ctx = span.get_span_context() + return { + "span_id": format(ctx.span_id, "016x"), + "trace_id": format(ctx.trace_id, "032x"), + } + return {} + + class TextStreamContext: """ Async context manager for text streaming. @@ -83,7 +103,11 @@ async def __aenter__(self) -> TextStreamContext: """Emit TextStreamStartEvent when entering context.""" if self.on_stream_event: await self.on_stream_event( - TextStreamStartEvent(step=self.step, stream_id=self.stream_id) + TextStreamStartEvent( + step=self.step, + stream_id=self.stream_id, + metadata=get_current_telemetry_metadata(), + ) ) return self @@ -96,7 +120,11 @@ async def __aexit__( """Emit TextStreamEndEvent when exiting context.""" if self.on_stream_event: await self.on_stream_event( - TextStreamEndEvent(step=self.step, stream_id=self.stream_id) + TextStreamEndEvent( + step=self.step, + stream_id=self.stream_id, + metadata=get_current_telemetry_metadata(), + ) ) return False @@ -113,6 +141,7 @@ async def delta(self, text: str) -> None: step=self.step, stream_id=self.stream_id, delta=text, + metadata=get_current_telemetry_metadata(), ) ) @@ -158,7 +187,9 @@ async def __aexit__( if self._started and self.on_stream_event: await self.on_stream_event( ReasoningStreamEndEvent( - step=self.step, stream_id=self.stream_id + step=self.step, + stream_id=self.stream_id, + metadata=get_current_telemetry_metadata(), ) ) return False @@ -173,11 +204,15 @@ async def delta(self, text: str) -> None: text: The incremental reasoning content to append to the stream """ if self.on_stream_event: + metadata = get_current_telemetry_metadata() + # Emit start event on first delta if not self._started: await self.on_stream_event( ReasoningStreamStartEvent( - step=self.step, stream_id=self.stream_id + step=self.step, + stream_id=self.stream_id, + metadata=metadata, ) ) self._started = True @@ -187,6 +222,7 @@ async def delta(self, text: str) -> None: step=self.step, stream_id=self.stream_id, delta=text, + metadata=metadata, ) ) @@ -218,7 +254,11 @@ def __init__( async def __aenter__(self) -> StepBoundaryContext: """Emit StepStartEvent when entering context.""" if self.on_stream_event: - await self.on_stream_event(StepStartEvent(step=self.step)) + await self.on_stream_event( + StepStartEvent( + step=self.step, metadata=get_current_telemetry_metadata() + ) + ) return self async def __aexit__( @@ -229,7 +269,11 @@ async def __aexit__( ) -> bool: """Emit StepEndEvent when exiting context.""" if 
self.on_stream_event: - await self.on_stream_event(StepEndEvent(step=self.step)) + await self.on_stream_event( + StepEndEvent( + step=self.step, metadata=get_current_telemetry_metadata() + ) + ) return False @@ -277,6 +321,7 @@ async def __aenter__(self) -> ToolExecutionContext: tool_call_id=self.tool_call_id, tool_name=self.tool_name, tool_input=self.tool_input, + metadata=get_current_telemetry_metadata(), ) ) return self @@ -299,6 +344,7 @@ async def __aexit__( step=self.step, tool_call_id=self.tool_call_id, error_message=str(exc_val), + metadata=get_current_telemetry_metadata(), ) ) self._completed = True @@ -319,6 +365,7 @@ async def complete(self, output: Any) -> None: step=self.step, tool_call_id=self.tool_call_id, tool_output=output, + metadata=get_current_telemetry_metadata(), ) ) self._completed = True @@ -338,6 +385,7 @@ async def error(self, error_message: str) -> None: step=self.step, tool_call_id=self.tool_call_id, error_message=error_message, + metadata=get_current_telemetry_metadata(), ) ) self._completed = True @@ -387,6 +435,18 @@ def __init__( self.step = step self.on_stream_event = on_stream_event + def _get_span_context(self) -> tuple[str | None, str | None]: + """ + Get current OpenTelemetry span context. + + Returns: + Tuple of (span_id, trace_id) as hex strings, or (None, None) + + Deprecated: Use get_current_telemetry_metadata() instead. + """ + metadata = get_current_telemetry_metadata() + return metadata.get("span_id"), metadata.get("trace_id") + def text_stream(self, stream_id: str) -> TextStreamContext: """ Create a context manager for text streaming. @@ -459,7 +519,11 @@ async def status(self, message: str) -> None: """ if self.on_stream_event: await self.on_stream_event( - StatusEvent(step=self.step, message=message) + StatusEvent( + step=self.step, + message=message, + metadata=get_current_telemetry_metadata(), + ) ) async def error(self, error_message: str) -> None: @@ -471,5 +535,9 @@ async def error(self, error_message: str) -> None: """ if self.on_stream_event: await self.on_stream_event( - ErrorEvent(step=self.step, error_message=error_message) + ErrorEvent( + step=self.step, + error_message=error_message, + metadata=get_current_telemetry_metadata(), + ) ) diff --git a/qtype/interpreter/feedback_api.py b/qtype/interpreter/feedback_api.py new file mode 100644 index 00000000..62255903 --- /dev/null +++ b/qtype/interpreter/feedback_api.py @@ -0,0 +1,210 @@ +from __future__ import annotations + +import logging +from enum import Enum +from typing import Annotated, Any, Literal +from urllib.parse import urlparse + +from fastapi import FastAPI, HTTPException, status +from pydantic import BaseModel, Field + +from qtype.interpreter.base.secrets import SecretManagerBase +from qtype.semantic.model import TelemetrySink + +logger = logging.getLogger(__name__) + + +class TelemetryProvider(str, Enum): + """Supported telemetry providers.""" + + PHOENIX = "Phoenix" + LANGFUSE = "Langfuse" + + +def _format_feedback_label(feedback: FeedbackData) -> str: + """Format feedback data into a human-readable label.""" + if isinstance(feedback, ThumbsFeedbackData): + return "👍" if feedback.value else "👎" + elif isinstance(feedback, RatingFeedbackData): + return str(feedback.score) + elif isinstance(feedback, CategoryFeedbackData): + return ", ".join(feedback.categories) + return "unknown" + + +class ThumbsFeedbackData(BaseModel): + """Thumbs up/down feedback data.""" + + type: Literal["thumbs"] = "thumbs" + value: bool = Field( + ..., description="True for thumbs up, False for 
thumbs down." + ) + explanation: str | None = Field( + default=None, description="Optional text explanation for the feedback." + ) + + +class RatingFeedbackData(BaseModel): + """Numeric rating feedback data.""" + + type: Literal["rating"] = "rating" + score: int = Field(..., description="Numeric rating score (e.g., 1-5).") + explanation: str | None = Field( + default=None, description="Optional text explanation for the feedback." + ) + + +class CategoryFeedbackData(BaseModel): + """Category selection feedback data.""" + + type: Literal["category"] = "category" + categories: list[str] = Field( + ..., description="List of selected category labels." + ) + explanation: str | None = Field( + default=None, description="Optional text explanation for the feedback." + ) + + +FeedbackData = Annotated[ + ThumbsFeedbackData | RatingFeedbackData | CategoryFeedbackData, + Field(discriminator="type"), +] + + +class FeedbackRequest(BaseModel): + """Request model for submitting user feedback on a flow output.""" + + span_id: str = Field(..., description="Span ID of the output being rated.") + trace_id: str = Field(..., description="Trace ID of the flow execution.") + feedback: FeedbackData = Field( + ..., description="Feedback data (type determined by discriminator)." + ) + + +class FeedbackResponse(BaseModel): + """Response model for feedback submission.""" + + status: Literal["success"] = "success" + message: str = "Feedback submitted successfully" + + +def _create_telemetry_client( + telemetry: TelemetrySink, secret_manager: SecretManagerBase +) -> Any: + """ + Create telemetry client based on provider. + + Args: + telemetry: Telemetry sink configuration. + secret_manager: Secret manager for resolving secret references. + + Returns: + Provider-specific client instance, or None if not supported. + """ + if telemetry.provider == TelemetryProvider.PHOENIX.value: + from phoenix.client import Client + + # Resolve endpoint in case it's a secret reference + args = {"base_url": telemetry.endpoint} + args = secret_manager.resolve_secrets_in_dict( + args, f"telemetry sink '{telemetry.id}' endpoint" + ) + + # Phoenix Client expects just the base URL (e.g., http://localhost:6006) + # Parse the URL and reconstruct with just scheme and netloc (host:port) + parsed = urlparse(args["base_url"]) + base_url = f"{parsed.scheme}://{parsed.netloc}" + + return Client(base_url=base_url) + elif telemetry.provider == TelemetryProvider.LANGFUSE.value: + logger.warning( + "Langfuse feedback not yet implemented. " + "Feedback endpoint will not be created." + ) + return None + else: + logger.warning( + f"Feedback endpoint not created: unsupported telemetry " + f"provider '{telemetry.provider}'." + ) + return None + + +def create_feedback_endpoint( + app: FastAPI, telemetry: TelemetrySink, secret_manager: SecretManagerBase +) -> None: + """ + Register the feedback submission endpoint with the FastAPI application. + + This creates a POST /feedback endpoint that accepts feedback submissions + and forwards them to the configured telemetry backend. + + Args: + app: FastAPI application instance. + telemetry: Telemetry sink configuration. + secret_manager: Secret manager for resolving secret references. 
+ """ + # Create client based on provider + client = _create_telemetry_client(telemetry, secret_manager) + if client is None: + return + + @app.post( + "/feedback", + response_model=FeedbackResponse, + tags=["feedback"], + summary="Submit user feedback on flow outputs", + description=( + "Submit user feedback (thumbs, rating, or category) on a " + "specific flow output. Feedback is sent to the telemetry " + "backend as span annotations." + ), + ) + async def submit_feedback(request: FeedbackRequest) -> FeedbackResponse: + """ + Submit user feedback on a flow output. + + The feedback is recorded as a span annotation in the telemetry backend. + """ + try: + if telemetry.provider == TelemetryProvider.PHOENIX.value: + # Submit to Phoenix using span annotations API + label = _format_feedback_label(request.feedback) + explanation = request.feedback.explanation + + # Calculate score based on feedback type + score = None + if isinstance(request.feedback, ThumbsFeedbackData): + score = 1.0 if request.feedback.value else 0.0 + elif isinstance(request.feedback, RatingFeedbackData): + score = float(request.feedback.score) + + client.spans.add_span_annotation( + span_id=request.span_id, + annotation_name="user_feedback", + label=label, + score=score, + explanation=explanation, + annotator_kind="HUMAN", + ) + + logger.info( + f"Feedback submitted to Phoenix for span {request.span_id}: " + f"{request.feedback.type} = {label}" + ) + + elif telemetry.provider == TelemetryProvider.LANGFUSE.value: + # TODO: Implement Langfuse feedback submission + raise NotImplementedError( + "Langfuse feedback not yet implemented" + ) + + return FeedbackResponse() + + except Exception as e: + logger.error(f"Failed to submit feedback: {e}", exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to submit feedback.", + ) from e diff --git a/qtype/interpreter/metadata_api.py b/qtype/interpreter/metadata_api.py index 3a52893a..e1f3c44a 100644 --- a/qtype/interpreter/metadata_api.py +++ b/qtype/interpreter/metadata_api.py @@ -5,6 +5,8 @@ from typing import Any from fastapi import FastAPI +from opentelemetry import trace +from opentelemetry.trace import NoOpTracerProvider from pydantic import BaseModel, Field from qtype.interpreter.typing import create_input_shape, create_output_shape @@ -43,6 +45,14 @@ class FlowMetadata(BaseModel): output_schema: dict[str, Any] = Field( ..., description="JSON schema for output" ) + feedback: dict[str, Any] | None = Field( + default=None, + description="Feedback configuration if enabled for this flow", + ) + telemetry_enabled: bool = Field( + default=False, + description="Whether telemetry is currently configured and recording", + ) def create_metadata_endpoints(app: FastAPI, application: Application) -> None: @@ -101,6 +111,15 @@ def _create_flow_metadata(flow: Flow) -> FlowMetadata: f"/flows/{flow.id}/stream" if flow.interface is not None else None ) + # Check if telemetry is enabled + provider = trace.get_tracer_provider() + telemetry_enabled = not isinstance(provider, NoOpTracerProvider) + + # Serialize feedback configuration if present + feedback_config = None + if flow.feedback: + feedback_config = flow.feedback.model_dump() + return FlowMetadata( id=flow.id, description=flow.description, @@ -112,4 +131,6 @@ def _create_flow_metadata(flow: Flow) -> FlowMetadata: ), input_schema=input_model.model_json_schema(), output_schema=output_model.model_json_schema(), + feedback=feedback_config, + telemetry_enabled=telemetry_enabled, ) diff --git 
a/qtype/interpreter/stream/chat/converter.py b/qtype/interpreter/stream/chat/converter.py index fced692e..a2d97beb 100644 --- a/qtype/interpreter/stream/chat/converter.py +++ b/qtype/interpreter/stream/chat/converter.py @@ -377,11 +377,22 @@ async def stream_events(): # Create converter for stateful event-to-chunk conversion converter = StreamEventConverter() + # Track telemetry metadata from events + telemetry_metadata: dict[str, str] = {} + # Process events and convert to chunks async for event in event_stream: if event is None: break # End of stream + # Extract telemetry metadata from first event that has it + if not telemetry_metadata and event.metadata: + if "span_id" in event.metadata and "trace_id" in event.metadata: + telemetry_metadata = { + "span_id": event.metadata["span_id"], + "trace_id": event.metadata["trace_id"], + } + # Convert event to chunks and yield as SSE for chunk in converter.convert(event): yield ( @@ -390,8 +401,15 @@ async def stream_events(): f"\n\n" ) - # End message stream with optional metadata - finish_chunk = FinishChunk(messageMetadata=output_metadata) # type: ignore[arg-type] + # Merge telemetry metadata with output_metadata for FinishChunk + final_metadata = {**telemetry_metadata} + if output_metadata: + final_metadata.update(output_metadata) + + # End message stream with metadata (includes telemetry) + finish_chunk = FinishChunk( + messageMetadata=final_metadata if final_metadata else None + ) yield ( f"data: " f"{finish_chunk.model_dump_json(by_alias=True, exclude_none=True)}" diff --git a/qtype/interpreter/types.py b/qtype/interpreter/types.py index d5790863..185ff588 100644 --- a/qtype/interpreter/types.py +++ b/qtype/interpreter/types.py @@ -38,7 +38,23 @@ def __bool__(self) -> bool: # and can be converted to Vercel UI chunks for frontend display -class TextStreamStartEvent(BaseModel): +class BaseStreamEvent(BaseModel): + """ + Base class for all stream events. + + Provides common metadata field for telemetry and other contextual data. + The metadata dict typically contains: + - span_id: OpenTelemetry span ID (16 hex chars) + - trace_id: OpenTelemetry trace ID (32 hex chars) + """ + + metadata: dict[str, Any] = Field( + default_factory=dict, + description="Metadata for telemetry and context tracking", + ) + + +class TextStreamStartEvent(BaseStreamEvent): """Signals the start of incremental text streaming. Use this when beginning to stream LLM-generated content or other @@ -55,7 +71,7 @@ class TextStreamStartEvent(BaseModel): ) -class TextStreamDeltaEvent(BaseModel): +class TextStreamDeltaEvent(BaseStreamEvent): """Carries an incremental chunk of text content. Use this for streaming LLM responses or other incremental text. @@ -72,7 +88,7 @@ class TextStreamDeltaEvent(BaseModel): delta: str = Field(description="Incremental text content to append") -class TextStreamEndEvent(BaseModel): +class TextStreamEndEvent(BaseStreamEvent): """Signals the completion of incremental text streaming. Use this to mark the end of a text stream. After this event, @@ -88,7 +104,7 @@ class TextStreamEndEvent(BaseModel): ) -class ReasoningStreamStartEvent(BaseModel): +class ReasoningStreamStartEvent(BaseStreamEvent): """Signals the start of incremental reasoning streaming. Use this when an agent begins outputting reasoning/thinking steps. @@ -105,7 +121,7 @@ class ReasoningStreamStartEvent(BaseModel): ) -class ReasoningStreamDeltaEvent(BaseModel): +class ReasoningStreamDeltaEvent(BaseStreamEvent): """Carries an incremental chunk of reasoning content. 
Use this for streaming agent reasoning/thinking steps. @@ -122,7 +138,7 @@ class ReasoningStreamDeltaEvent(BaseModel): delta: str = Field(description="Incremental reasoning content to append") -class ReasoningStreamEndEvent(BaseModel): +class ReasoningStreamEndEvent(BaseStreamEvent): """Signals the completion of incremental reasoning streaming. Use this to mark the end of a reasoning stream. After this event, @@ -138,7 +154,7 @@ class ReasoningStreamEndEvent(BaseModel): ) -class StatusEvent(BaseModel): +class StatusEvent(BaseStreamEvent): """Reports a complete status message from a step. Use this for non-streaming status updates like: @@ -155,7 +171,7 @@ class StatusEvent(BaseModel): message: str = Field(description="Complete status message to display") -class StepStartEvent(BaseModel): +class StepStartEvent(BaseStreamEvent): """Marks the beginning of a logical step boundary. Use this to group related events together visually in the UI. @@ -168,7 +184,7 @@ class StepStartEvent(BaseModel): step: Step -class StepEndEvent(BaseModel): +class StepEndEvent(BaseStreamEvent): """Marks the end of a logical step boundary. Use this to close a step boundary opened by StepStartEvent. @@ -180,7 +196,7 @@ class StepEndEvent(BaseModel): step: Step -class ToolExecutionStartEvent(BaseModel): +class ToolExecutionStartEvent(BaseStreamEvent): """Signals the start of tool execution. Use this when a tool is about to be invoked, either by an LLM @@ -198,7 +214,7 @@ class ToolExecutionStartEvent(BaseModel): ) -class ToolExecutionEndEvent(BaseModel): +class ToolExecutionEndEvent(BaseStreamEvent): """Signals the completion of tool execution. Use this when a tool has finished executing successfully. @@ -214,7 +230,7 @@ class ToolExecutionEndEvent(BaseModel): tool_output: Any = Field(description="Output returned by the tool") -class ToolExecutionErrorEvent(BaseModel): +class ToolExecutionErrorEvent(BaseStreamEvent): """Signals that tool execution failed. Use this when a tool encounters an error during execution. @@ -230,7 +246,7 @@ class ToolExecutionErrorEvent(BaseModel): error_message: str = Field(description="Description of the error") -class ErrorEvent(BaseModel): +class ErrorEvent(BaseStreamEvent): """Signals a general error occurred during step execution. Use this for errors that aren't specific to tool execution. @@ -330,11 +346,34 @@ class FlowMessage(BaseModel): description="Mapping of variable IDs to their values.", ) error: Optional[StepError] = None + metadata: dict[str, Any] = Field( + default_factory=dict, + description="Metadata for telemetry, span IDs, and other system-level data.", + ) def is_failed(self) -> bool: """Checks if this state has encountered an error.""" return self.error is not None + def with_telemetry_metadata( + self, span_id: str, trace_id: str + ) -> "FlowMessage": + """Create a copy with telemetry metadata added. 
+ + Args: + span_id: OpenTelemetry span ID (16 hex chars) + trace_id: OpenTelemetry trace ID (32 hex chars) + + Returns: + New FlowMessage with telemetry metadata + """ + updated_metadata = { + **self.metadata, + "span_id": span_id, + "trace_id": trace_id, + } + return self.model_copy(update={"metadata": updated_metadata}) + def is_set(self, var_id: str) -> bool: """Check if a variable is set (not UNSET, may be None).""" value = self.variables.get(var_id, UNSET) diff --git a/qtype/interpreter/typing.py b/qtype/interpreter/typing.py index 61d95deb..8f85b51b 100644 --- a/qtype/interpreter/typing.py +++ b/qtype/interpreter/typing.py @@ -60,10 +60,19 @@ def _fields_from_variables(variables: list[Variable]) -> dict: def create_output_shape(flow: Flow) -> Type[BaseModel]: + fields = _fields_from_variables(flow.outputs) + # Add metadata field for telemetry (span_id, trace_id) + fields["metadata"] = ( + dict[str, Any], + Field( + default_factory=dict, + description="Telemetry metadata including span_id and trace_id", + ), + ) return create_model( f"{flow.id}Result", __base__=BaseModel, - **_fields_from_variables(flow.outputs), + **fields, ) # type: ignore @@ -133,7 +142,10 @@ def flow_results_to_output_container( errors.append(m.error.model_dump()) else: output_instance = output_shape(**m.variables) - outputs.append(output_instance.model_dump()) + output_dict = output_instance.model_dump() + # Merge metadata from FlowMessage + output_dict["metadata"] = {**output_dict["metadata"], **m.metadata} + outputs.append(output_dict) return output_container(outputs=outputs, errors=errors) diff --git a/qtype/semantic/checker.py b/qtype/semantic/checker.py index e0a51d58..bfffaab3 100644 --- a/qtype/semantic/checker.py +++ b/qtype/semantic/checker.py @@ -624,8 +624,24 @@ def _validate_application(application: Application) -> None: Raises: QTypeSemanticError: If SecretReference is used but secret_manager is not configured, or if secret_manager - configuration is invalid + configuration is invalid, or if feedback is configured + without telemetry """ + # Check if feedback is configured without telemetry + if application.telemetry is None: + flows_with_feedback = [ + flow.id for flow in application.flows if flow.feedback is not None + ] + if flows_with_feedback: + raise QTypeSemanticError( + ( + f"Application '{application.id}' has flows with feedback " + f"configured but no telemetry sink defined. " + f"Flows with feedback: {', '.join(flows_with_feedback)}. " + "Please add a telemetry configuration to the application." + ) + ) + if application.secret_manager is None: # Check if any SecretReference is used in the application if _has_secret_reference(application): @@ -707,30 +723,25 @@ def _validate_bindings( Raises: QTypeSemanticError: If any binding keys don't match valid IDs - """ # Check input_bindings + """ + # Check input_bindings for binding_key in input_bindings.keys(): if binding_key not in valid_input_ids: raise QTypeSemanticError( - ( - f"{step_type} step '{step_id}' has input_binding " - f"'{binding_key}' which does not match any {component_type} " - f"parameter. {component_type.capitalize()} '{component_id}' has input " - f"parameters: {sorted(valid_input_ids)}. {component_type.capitalize()} " - f"parameter '{binding_key}' not defined in {component_type}." - ) + f"{step_type} step '{step_id}' has input_binding " + f"'{binding_key}' which does not exist. 
" + f"Valid {component_type} '{component_id}' input parameters: " + f"{sorted(valid_input_ids)}" ) # Check output_bindings for binding_key in output_bindings.keys(): if binding_key not in valid_output_ids: raise QTypeSemanticError( - ( - f"{step_type} step '{step_id}' has output_binding " - f"'{binding_key}' which does not match any {component_type} " - f"parameter. {component_type.capitalize()} '{component_id}' has output " - f"parameters: {sorted(valid_output_ids)}. {component_type.capitalize()} " - f"parameter '{binding_key}' not defined in {component_type}." - ) + f"{step_type} step '{step_id}' has output_binding " + f"'{binding_key}' which does not exist. " + f"Valid {component_type} '{component_id}' output parameters: " + f"{sorted(valid_output_ids)}" ) diff --git a/qtype/semantic/generate.py b/qtype/semantic/generate.py index bef20737..48d745cc 100644 --- a/qtype/semantic/generate.py +++ b/qtype/semantic/generate.py @@ -31,14 +31,19 @@ def _is_dsl_type(type_obj: Any) -> bool: FIELDS_TO_IGNORE = {"Application.references"} TYPES_TO_IGNORE = { + "CategoryFeedback", "CustomType", "DecoderFormat", "Document", + "Feedback", + "FeedbackType", "ListType", "PrimitiveTypeEnum", + "RatingFeedback", "StrictBaseModel", - "TypeDefinition", + "ThumbsFeedback", "ToolParameter", + "TypeDefinition", "Variable", } @@ -141,10 +146,15 @@ def generate_semantic_model(args: argparse.Namespace) -> None: # Import enums, mixins, and type aliases from qtype.base.types import BatchableStepMixin, BatchConfig, CachedStepMixin, ConcurrencyConfig, ConcurrentStepMixin # noqa: F401 from qtype.dsl.model import ( # noqa: F401 + CategoryFeedback, CustomType, DecoderFormat, + Feedback, + FeedbackType, ListType, - PrimitiveTypeEnum + PrimitiveTypeEnum, + RatingFeedback, + ThumbsFeedback, ) from qtype.dsl.model import Variable as DSLVariable # noqa: F401 from qtype.dsl.model import VariableType # noqa: F401 diff --git a/qtype/semantic/model.py b/qtype/semantic/model.py index 4b79cdd6..f6b50657 100644 --- a/qtype/semantic/model.py +++ b/qtype/semantic/model.py @@ -28,10 +28,15 @@ ) from qtype.dsl.model import VariableType # noqa: F401 from qtype.dsl.model import ( # noqa: F401 + CategoryFeedback, CustomType, DecoderFormat, + Feedback, + FeedbackType, ListType, PrimitiveTypeEnum, + RatingFeedback, + ThumbsFeedback, ) from qtype.dsl.model import Variable as DSLVariable # noqa: F401 from qtype.semantic.base_types import ImmutableModel @@ -216,6 +221,12 @@ class Flow(BaseModel): description="List of steps or references to steps", ) interface: FlowInterface | None = Field(None) + feedback: ThumbsFeedback | RatingFeedback | CategoryFeedback | None = ( + Field( + None, + description="Optional feedback configuration for collecting user ratings on flow outputs.", + ) + ) variables: list[Variable] = Field( default_factory=list, description="List of variables available at the application scope.", diff --git a/schema/qtype.schema.json b/schema/qtype.schema.json index c59cd85f..798570d8 100644 --- a/schema/qtype.schema.json +++ b/schema/qtype.schema.json @@ -930,6 +930,44 @@ "title": "CacheConfig", "type": "object" }, + "CategoryFeedback": { + "additionalProperties": false, + "description": "Categorical feedback with predefined tags.", + "properties": { + "type": { + "const": "category", + "default": "category", + "title": "Type", + "type": "string" + }, + "explanation": { + "default": false, + "description": "Whether to enable optional text explanation field.", + "title": "Explanation", + "type": "boolean" + }, + "categories": { + 
"description": "List of category labels users can select from.", + "items": { + "type": "string" + }, + "minItems": 1, + "title": "Categories", + "type": "array" + }, + "allow_multiple": { + "default": true, + "description": "Whether users can select multiple categories.", + "title": "Allow Multiple", + "type": "boolean" + } + }, + "required": [ + "categories" + ], + "title": "CategoryFeedback", + "type": "object" + }, "Collect": { "additionalProperties": false, "description": "A step that collects all inputs and creates a single list to return.", @@ -2370,6 +2408,37 @@ ], "default": null }, + "feedback": { + "anyOf": [ + { + "discriminator": { + "mapping": { + "category": "#/$defs/CategoryFeedback", + "rating": "#/$defs/RatingFeedback", + "thumbs": "#/$defs/ThumbsFeedback" + }, + "propertyName": "type" + }, + "oneOf": [ + { + "$ref": "#/$defs/ThumbsFeedback" + }, + { + "$ref": "#/$defs/RatingFeedback" + }, + { + "$ref": "#/$defs/CategoryFeedback" + } + ] + }, + { + "type": "null" + } + ], + "default": null, + "description": "Optional feedback configuration for collecting user ratings on flow outputs.", + "title": "Feedback" + }, "variables": { "description": "List of variables available at the application scope.", "items": { @@ -3295,6 +3364,32 @@ "title": "PythonFunctionTool", "type": "object" }, + "RatingFeedback": { + "additionalProperties": false, + "description": "Numerical rating feedback (1-5 or 1-10 scale).", + "properties": { + "type": { + "const": "rating", + "default": "rating", + "title": "Type", + "type": "string" + }, + "explanation": { + "default": false, + "description": "Whether to enable optional text explanation field.", + "title": "Explanation", + "type": "boolean" + }, + "scale": { + "default": 5, + "description": "Maximum value for rating scale.", + "title": "Scale", + "type": "integer" + } + }, + "title": "RatingFeedback", + "type": "object" + }, "Reference_AWSAuthProvider_": { "properties": { "$ref": { @@ -3659,6 +3754,26 @@ "title": "TextWidget", "type": "string" }, + "ThumbsFeedback": { + "additionalProperties": false, + "description": "Binary thumbs up/down feedback.", + "properties": { + "type": { + "const": "thumbs", + "default": "thumbs", + "title": "Type", + "type": "string" + }, + "explanation": { + "default": false, + "description": "Whether to enable optional text explanation field.", + "title": "Explanation", + "type": "boolean" + } + }, + "title": "ThumbsFeedback", + "type": "object" + }, "ToolList": { "description": "Schema for a standalone list of tools.", "items": { diff --git a/tests/interpreter/test_step_executor.py b/tests/interpreter/test_step_executor.py index 13190203..10b56718 100644 --- a/tests/interpreter/test_step_executor.py +++ b/tests/interpreter/test_step_executor.py @@ -7,6 +7,7 @@ import pytest from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider from qtype.base.types import ConcurrencyConfig from qtype.interpreter.base.base_step_executor import StepExecutor @@ -388,6 +389,53 @@ async def test_finalize_hook(self, simple_step, session, executor_context): assert results[-1].variables.get("finalized") == "true" # Note: Progress count timing depends on stream implementation details + async def test_span_metadata_enrichment(self, simple_step, session): + """Test that span_id and trace_id are added to message metadata.""" + # Set up a real TracerProvider that records spans + tracer_provider = TracerProvider() + tracer = tracer_provider.get_tracer(__name__) + + # Create context with recording tracer + context = 
ExecutorContext( + secret_manager=NoOpSecretManager(), + tracer=tracer, + ) + + executor = MockExecutor( + simple_step, + context, + suffix="_processed", + ) + results = await collect_stream(executor, ["msg1", "msg2"], session) + + assert len(results) == 2 + + # Each message should have span_id and trace_id in metadata + for result in results: + assert "span_id" in result.metadata + assert "trace_id" in result.metadata + + # Verify they are valid hex strings + span_id = result.metadata["span_id"] + trace_id = result.metadata["trace_id"] + + # span_id should be 16 hex chars (64-bit) + assert len(span_id) == 16 + assert all(c in "0123456789abcdef" for c in span_id) + + # trace_id should be 32 hex chars (128-bit) + assert len(trace_id) == 32 + assert all(c in "0123456789abcdef" for c in trace_id) + + # Each message should have a unique process_message span_id + # (this allows per-message feedback instead of per-step) + span_ids = [r.metadata["span_id"] for r in results] + assert len(set(span_ids)) == len(results) + + # Trace ID should be the same for all messages in the same execution + trace_ids = [r.metadata["trace_id"] for r in results] + assert len(set(trace_ids)) == 1 + async def test_dependencies_injection(self, simple_step, executor_context): """Test that dependencies are injected and accessible.""" test_dep = {"key": "value"} diff --git a/tests/semantic/checker-error-specs/invalid_feedback_without_telemetry.qtype.yaml b/tests/semantic/checker-error-specs/invalid_feedback_without_telemetry.qtype.yaml new file mode 100644 index 00000000..d1dd5a60 --- /dev/null +++ b/tests/semantic/checker-error-specs/invalid_feedback_without_telemetry.qtype.yaml @@ -0,0 +1,27 @@ +id: test_feedback_without_telemetry +description: Test that feedback requires telemetry to be configured + +flows: + - id: chat + description: Flow with feedback but no telemetry (should fail validation) + interface: + type: Conversational + feedback: + type: thumbs + explanation: false + variables: + - id: msg + type: ChatMessage + steps: + - type: Echo + id: echo + inputs: + - msg + outputs: + - msg + inputs: + - msg + outputs: + - msg + +# Note: No telemetry configured - this should cause validation error diff --git a/tests/semantic/test_checker_validation.py b/tests/semantic/test_checker_validation.py index de3dace6..26d6838b 100644 --- a/tests/semantic/test_checker_validation.py +++ b/tests/semantic/test_checker_validation.py @@ -105,25 +105,29 @@ "invalid_secret_manager_wrong_auth_type.qtype.yaml", "AWSSecretManager 'my_secret_manager' requires an AWSAuthProvider", ), + ( + "invalid_feedback_without_telemetry.qtype.yaml", + "has flows with feedback configured but no telemetry sink defined", + ), ( "invalid_complete_flow_no_text_output.qtype.yaml", "final step 'echo_step' is of type 'Echo' which does not support streaming", ), ( "invalid_invoke_tool_wrong_input_binding.qtype.yaml", - "Tool parameter 'wrong_param' not defined in tool", + "which does not exist", ), ( "invalid_invoke_tool_wrong_output_binding.qtype.yaml", - "Tool parameter 'result' not defined in tool", + "which does not exist", ), ( "invalid_invoke_flow_wrong_input_binding.qtype.yaml", - "Flow parameter 'wrong_param' not defined in flow", + "which does not exist", ), ( "invalid_invoke_flow_wrong_output_binding.qtype.yaml", - "Flow parameter 'wrong_output' not defined in flow", + "which does not exist", ), ], ) diff --git a/tests/semantic/test_feedback_validation.py b/tests/semantic/test_feedback_validation.py new file mode 100644 index 00000000..56a02a19 --- 
/dev/null +++ b/tests/semantic/test_feedback_validation.py @@ -0,0 +1,116 @@ +"""Tests for semantic validation of feedback configurations.""" + +from __future__ import annotations + +from qtype.semantic import loader + + +class TestFeedbackSemanticValidation: + """Test semantic validation rules for feedback.""" + + def test_thumbs_feedback_loads_correctly(self, tmp_path): + """Test that thumbs feedback configuration loads and validates.""" + yaml_content = """ +id: test_app +flows: + - id: test_flow + feedback: + type: thumbs + explanation: true + steps: + - type: Echo + id: echo1 + +telemetry: + id: test_telemetry + provider: Phoenix + endpoint: http://localhost:6006/v1/traces +""" + yaml_file = tmp_path / "test.yaml" + yaml_file.write_text(yaml_content) + + app, _ = loader.load(str(yaml_file)) + flow = app.flows[0] + assert flow.feedback is not None + assert flow.feedback.type == "thumbs" + assert flow.feedback.explanation is True + + def test_rating_feedback_loads_correctly(self, tmp_path): + """Test that rating feedback configuration loads and validates.""" + yaml_content = """ +id: test_app +flows: + - id: test_flow + feedback: + type: rating + scale: 10 + explanation: false + steps: + - type: Echo + id: echo1 + +telemetry: + id: test_telemetry + provider: Phoenix + endpoint: http://localhost:6006/v1/traces +""" + yaml_file = tmp_path / "test.yaml" + yaml_file.write_text(yaml_content) + + app, _ = loader.load(str(yaml_file)) + flow = app.flows[0] + assert flow.feedback is not None + assert flow.feedback.type == "rating" + assert flow.feedback.scale == 10 + assert flow.feedback.explanation is False + + def test_category_feedback_loads_correctly(self, tmp_path): + """Test that category feedback configuration loads and validates.""" + yaml_content = """ +id: test_app +flows: + - id: test_flow + feedback: + type: category + categories: + - accurate + - helpful + - creative + allow_multiple: true + explanation: true + steps: + - type: Echo + id: echo1 + +telemetry: + id: test_telemetry + provider: Phoenix + endpoint: http://localhost:6006/v1/traces +""" + yaml_file = tmp_path / "test.yaml" + yaml_file.write_text(yaml_content) + + app, _ = loader.load(str(yaml_file)) + flow = app.flows[0] + assert flow.feedback is not None + assert flow.feedback.type == "category" + assert flow.feedback.categories == ["accurate", "helpful", "creative"] + assert flow.feedback.allow_multiple is True + assert flow.feedback.explanation is True + + def test_flow_without_feedback(self, tmp_path): + """Test that flows work without feedback configuration.""" + yaml_content = """ +id: test_app +flows: + - id: test_flow + steps: + - type: Echo + id: echo1 +""" + yaml_file = tmp_path / "test.yaml" + yaml_file.write_text(yaml_content) + + app, _ = loader.load(str(yaml_file)) + flow = app.flows[0] + assert flow.feedback is None diff --git a/ui/components/FlowResponseCard.tsx b/ui/components/FlowResponseCard.tsx index a469e136..d41c5d20 100644 --- a/ui/components/FlowResponseCard.tsx +++ b/ui/components/FlowResponseCard.tsx @@ -6,7 +6,9 @@ "use client"; +import { FeedbackButton } from "@/components/feedback"; import { Alert, AlertDescription } from "@/components/ui/Alert"; +import { getTelemetryIdsFromValue, METADATA_FIELD } from "@/lib/telemetry"; import { MarkdownContainer } from "./MarkdownContainer"; import { @@ -21,6 +23,7 @@ import { } from "./outputs"; import type { SchemaProperty, ResponseData } from "@/types"; +import type { FeedbackConfig } from "@/types/FlowMetadata"; interface ResponsePropertyProps { name: 
diff --git a/ui/components/FlowResponseCard.tsx b/ui/components/FlowResponseCard.tsx
index a469e136..d41c5d20 100644
--- a/ui/components/FlowResponseCard.tsx
+++ b/ui/components/FlowResponseCard.tsx
@@ -6,7 +6,9 @@
 "use client";
 
+import { FeedbackButton } from "@/components/feedback";
 import { Alert, AlertDescription } from "@/components/ui/Alert";
+import { getTelemetryIdsFromValue, METADATA_FIELD } from "@/lib/telemetry";
 
 import { MarkdownContainer } from "./MarkdownContainer";
 import {
@@ -21,6 +23,7 @@
 } from "./outputs";
 
 import type { SchemaProperty, ResponseData } from "@/types";
+import type { FeedbackConfig } from "@/types/FlowMetadata";
 
 interface ResponsePropertyProps {
   name: string;
@@ -129,11 +132,15 @@ function ResponseProperty({ name, property, value }: ResponsePropertyProps) {
 
 interface FlowResponseCardProps {
   responseSchema?: SchemaProperty | null;
   responseData?: ResponseData;
+  feedbackConfig?: FeedbackConfig | null;
+  telemetryEnabled?: boolean;
 }
 
 export default function FlowResponseCard({
   responseSchema,
   responseData,
+  feedbackConfig,
+  telemetryEnabled = false,
 }: FlowResponseCardProps) {
   if (!responseData) {
     return (
@@ -158,11 +165,14 @@
       ? (responseData as Record<string, ResponseData>).outputs || responseData
       : responseData || {};
 
+  const telemetryIds = getTelemetryIdsFromValue(responseData);
+
   return (
     <div>
       {responseSchema.properties &&
-        Object.entries(responseSchema.properties).map(
-          ([propertyName, propertySchema]) => {
+        Object.entries(responseSchema.properties)
+          .filter(([propertyName]) => propertyName !== METADATA_FIELD)
+          .map(([propertyName, propertySchema]) => {
             const value = (outputsData as Record<string, ResponseData>)[
               propertyName
             ];
@@ -179,8 +189,17 @@
             return (
               <ResponseProperty
                 key={propertyName}
                 name={propertyName}
                 property={propertySchema}
                 value={value}
               />
             );
-          },
-        )}
+          })}
+
+      {feedbackConfig && telemetryEnabled && telemetryIds && (
+        <div>
+          <FeedbackButton
+            feedbackConfig={feedbackConfig}
+            spanId={telemetryIds.spanId}
+            traceId={telemetryIds.traceId}
+          />
+        </div>
+      )}
     </div>
   );
 }
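`FlowResponseCard` filters out `METADATA_FIELD` and derives span/trace ids with `getTelemetryIdsFromValue`; neither helper is part of this diff. A plausible sketch of `@/lib/telemetry`, assuming the server attaches ids under a reserved output key (the key name `_metadata` and the `span_id`/`trace_id` fields are assumptions):

```ts
// Hypothetical sketch of @/lib/telemetry (not part of this diff).
export const METADATA_FIELD = "_metadata";

export interface TelemetryIds {
  spanId: string;
  traceId: string;
}

// Reads span/trace ids from a metadata object, if present.
export function getTelemetryIdsFromMetadata(
  metadata: unknown,
): TelemetryIds | null {
  if (typeof metadata !== "object" || metadata === null) return null;
  const record = metadata as Record<string, unknown>;
  const spanId = record["span_id"];
  const traceId = record["trace_id"];
  return typeof spanId === "string" && typeof traceId === "string"
    ? { spanId, traceId }
    : null;
}

// Convenience wrapper: pulls the metadata field off a whole response value.
export function getTelemetryIdsFromValue(value: unknown): TelemetryIds | null {
  if (typeof value !== "object" || value === null) return null;
  return getTelemetryIdsFromMetadata(
    (value as Record<string, unknown>)[METADATA_FIELD],
  );
}
```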
diff --git a/ui/components/FlowResponseTable.tsx b/ui/components/FlowResponseTable.tsx
index e9df5b0a..dfc9fec9 100644
--- a/ui/components/FlowResponseTable.tsx
+++ b/ui/components/FlowResponseTable.tsx
@@ -19,14 +19,19 @@
 import { Download } from "lucide-react";
 import Papa from "papaparse";
 import { useMemo, useState } from "react";
 
+import { FeedbackButton } from "@/components/feedback";
 import { Button } from "@/components/ui/Button";
 import { Input } from "@/components/ui/Input";
+import { getTelemetryIdsFromMetadata, METADATA_FIELD } from "@/lib/telemetry";
 
 import type { SchemaProperty, ResponseData } from "@/types";
+import type { FeedbackConfig } from "@/types/FlowMetadata";
 
 interface FlowResponseTableProps {
   responseSchema?: SchemaProperty | null;
   outputs: ResponseData[];
+  feedbackConfig?: FeedbackConfig | null;
+  telemetryEnabled?: boolean;
 }
 
 function formatCellValue(value: ResponseData, qtypeType?: string): string {
@@ -61,6 +66,8 @@
 export default function FlowResponseTable({
   responseSchema,
   outputs,
+  feedbackConfig,
+  telemetryEnabled = false,
 }: FlowResponseTableProps) {
   const [searchText, setSearchText] = useState("");
   const [sorting, setSorting] = useState<SortingState>([]);
@@ -78,18 +85,48 @@
   const columns = useMemo<ColumnDef<Record<string, ResponseData>>[]>(() => {
     if (!responseSchema?.properties) return [];
 
-    return Object.entries(responseSchema.properties).map(([key, schema]) => {
-      const prop = schema as SchemaProperty;
-      return {
-        accessorKey: key,
-        header: prop.title || key,
-        cell: ({ row }) => {
-          const value = row.original[key];
-          return formatCellValue(value, prop.qtype_type);
+    const dataColumns: ColumnDef<Record<string, ResponseData>>[] =
+      Object.entries(responseSchema.properties)
+        .filter(([key]) => key !== METADATA_FIELD)
+        .map(([key, schema]) => {
+          const prop = schema as SchemaProperty;
+          return {
+            accessorKey: key,
+            header: prop.title || key,
+            cell: (ctx) => {
+              const value = ctx.row.original[key];
+              return formatCellValue(value, prop.qtype_type);
+            },
+          };
+        });
+
+    // Add feedback column if enabled
+    if (feedbackConfig && telemetryEnabled) {
+      dataColumns.push({
+        id: "feedback",
+        header: "Feedback",
+        cell: (ctx) => {
+          const telemetryIds = getTelemetryIdsFromMetadata(
+            ctx.row.original[METADATA_FIELD],
+          );
+
+          if (!telemetryIds) {
+            return null;
+          }
+
+          return (
+            <FeedbackButton
+              feedbackConfig={feedbackConfig}
+              spanId={telemetryIds.spanId}
+              traceId={telemetryIds.traceId}
+            />
+          );
         },
-      };
-    });
-  }, [responseSchema]);
+      });
+    }
+
+    return dataColumns;
+  }, [responseSchema, feedbackConfig, telemetryEnabled]);
 
   const table = useReactTable({
     data,
@@ -106,18 +143,23 @@
   });
 
   const handleDownloadCSV = () => {
+    const exportableProperties = Object.entries(
+      responseSchema?.properties ?? {},
+    )
+      .filter(([key]) => key !== METADATA_FIELD)
+      .map(([key, schema]) => ({
+        key,
+        schema: schema as SchemaProperty,
+      }));
+
     const csvData = table.getFilteredRowModel().rows.map((row) => {
       const rowData: Record<string, string> = {};
-      columns.forEach((col) => {
-        const key = (col as { accessorKey: string }).accessorKey;
-        const propertySchema = responseSchema?.properties?.[key] as
-          | SchemaProperty
-          | undefined;
-        rowData[String(col.header)] = formatCellValue(
-          row.original[key],
-          propertySchema?.qtype_type,
-        );
-      });
+
+      for (const { key, schema } of exportableProperties) {
+        const header = schema.title || key;
+        rowData[header] = formatCellValue(row.original[key], schema.qtype_type);
+      }
+
       return rowData;
     });
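A parent view has to thread the flow's feedback config and telemetry flag into the table; that wiring is outside this diff. A minimal sketch, assuming the flow metadata exposes `feedback` and `telemetryEnabled` fields with these names:

```tsx
// Hypothetical parent wiring for FlowResponseTable (prop sources assumed).
import FlowResponseTable from "@/components/FlowResponseTable";

import type { ResponseData, SchemaProperty } from "@/types";
import type { FeedbackConfig } from "@/types/FlowMetadata";

interface BatchResultsProps {
  responseSchema: SchemaProperty | null;
  outputs: ResponseData[];
  feedback?: FeedbackConfig | null;
  telemetryEnabled: boolean;
}

export function BatchResults({
  responseSchema,
  outputs,
  feedback,
  telemetryEnabled,
}: BatchResultsProps) {
  // Feedback cells only render when both a config and telemetry ids exist.
  return (
    <FlowResponseTable
      responseSchema={responseSchema}
      outputs={outputs}
      feedbackConfig={feedback}
      telemetryEnabled={telemetryEnabled}
    />
  );
}
```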
diff --git a/ui/components/chat/MessageBubble.tsx b/ui/components/chat/MessageBubble.tsx
index f1fbe8db..a00c2194 100644
--- a/ui/components/chat/MessageBubble.tsx
+++ b/ui/components/chat/MessageBubble.tsx
@@ -1,6 +1,8 @@
 import { Bot, User } from "lucide-react";
 
+import { FeedbackButton } from "@/components/feedback";
 import { Avatar, AvatarFallback } from "@/components/ui/Avatar";
+import { getTelemetryIdsFromMetadata } from "@/lib/telemetry";
 
 import { MarkdownContainer } from "../MarkdownContainer";
 import { Thinking } from "../outputs";
@@ -13,10 +15,13 @@ import { FileDisplay } from ".";
 import type { Message } from "./types";
 import type { MessagePartWithText } from "./types/MessagePart";
 import type { FileAttachment } from "@/types";
+import type { FeedbackConfig } from "@/types/FlowMetadata";
 
 interface MessageBubbleProps {
   message: Message;
   isStreaming?: boolean;
+  feedbackConfig?: FeedbackConfig | null;
+  telemetryEnabled?: boolean;
 }
 
 interface StreamingPart {
@@ -24,7 +29,12 @@
   [key: string]: unknown;
 }
 
-function MessageBubble({ message, isStreaming = false }: MessageBubbleProps) {
+function MessageBubble({
+  message,
+  isStreaming = false,
+  feedbackConfig,
+  telemetryEnabled = false,
+}: MessageBubbleProps) {
   const isUser = message.role === "user";
 
   const reasoningContent = getPartContent(
@@ -50,6 +60,15 @@ function MessageBubble({ message, isStreaming = false }: MessageBubbleProps) {
     isStreaming,
   );
 
+  const telemetryIds = getTelemetryIdsFromMetadata(message.metadata);
+
+  const showFeedback =
+    !isUser &&
+    !isStreaming &&
+    feedbackConfig &&
+    telemetryEnabled &&
+    telemetryIds;
+
   return (
         ))}
+
+      {showFeedback && (
+        <div>
+          <FeedbackButton
+            feedbackConfig={feedbackConfig}
+            spanId={telemetryIds.spanId}
+            traceId={telemetryIds.traceId}
+          />
+        </div>
+      )}
 
       {isUser && (
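The chat message list must pass the same two props down to each bubble; that caller is not part of this diff. A sketch, assuming `MessageBubble` is the file's default export and that a `messages` array and flow-level `feedbackConfig`/`telemetryEnabled` values are in scope:

```tsx
// Hypothetical message-list wiring for MessageBubble (names assumed).
import MessageBubble from "@/components/chat/MessageBubble";

import type { Message } from "@/components/chat/types";
import type { FeedbackConfig } from "@/types/FlowMetadata";

interface MessageListProps {
  messages: Message[];
  isStreaming: boolean;
  feedbackConfig?: FeedbackConfig | null;
  telemetryEnabled: boolean;
}

export function MessageList({
  messages,
  isStreaming,
  feedbackConfig,
  telemetryEnabled,
}: MessageListProps) {
  return (
    <div>
      {messages.map((message, index) => (
        <MessageBubble
          key={index}
          message={message}
          // Only the final message can still be streaming; completed
          // assistant messages become eligible for the feedback widget.
          isStreaming={isStreaming && index === messages.length - 1}
          feedbackConfig={feedbackConfig}
          telemetryEnabled={telemetryEnabled}
        />
      ))}
    </div>
  );
}
```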
diff --git a/ui/components/feedback/CategoryFeedback.tsx b/ui/components/feedback/CategoryFeedback.tsx
new file mode 100644
index 00000000..554890b0
--- /dev/null
+++ b/ui/components/feedback/CategoryFeedback.tsx
@@ -0,0 +1,78 @@
+/**
+ * CategoryFeedback Component
+ *
+ * Categorical feedback with predefined tags (single or multi-select)
+ */
+
+"use client";
+
+import { Check } from "lucide-react";
+import { useState } from "react";
+
+import { Button } from "@/components/ui/Button";
+
+interface CategoryFeedbackProps {
+  categories: string[];
+  allowMultiple: boolean;
+  onFeedback: (feedback: { type: "category"; categories: string[] }) => void;
+}
+
+export function CategoryFeedback({
+  categories,
+  allowMultiple,
+  onFeedback,
+}: CategoryFeedbackProps) {
+  const [selectedCategories, setSelectedCategories] = useState<Set<string>>(
+    new Set(),
+  );
+
+  const handleCategoryClick = (category: string) => {
+    if (allowMultiple) {
+      // Multi-select mode
+      const newSelected = new Set(selectedCategories);
+      if (newSelected.has(category)) {
+        newSelected.delete(category);
+      } else {
+        newSelected.add(category);
+      }
+      setSelectedCategories(newSelected);
+    } else {
+      // Single-select mode - submit immediately
+      onFeedback({ type: "category", categories: [category] });
+    }
+  };
+
+  const handleSubmit = () => {
+    if (selectedCategories.size > 0) {
+      onFeedback({
+        type: "category",
+        categories: Array.from(selectedCategories),
+      });
+    }
+  };
+
+  return (
+    <div>
+      {categories.map((category) => {
+        const isSelected = selectedCategories.has(category);
+        return (
+          <Button key={category} onClick={() => handleCategoryClick(category)}>
+            {isSelected && <Check />}
+            {category}
+          </Button>
+        );
+      })}
+      {allowMultiple && selectedCategories.size > 0 && (
+        <Button onClick={handleSubmit}>Submit</Button>
+      )}
+    </div>
+  );
+}
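`CategoryFeedback` is controlled entirely through props, so it can be exercised in isolation. A small usage sketch; the handler body is illustrative only:

```tsx
// Standalone usage of CategoryFeedback with example labels.
import { CategoryFeedback } from "@/components/feedback/CategoryFeedback";

export function CategoryFeedbackDemo() {
  return (
    <CategoryFeedback
      categories={["accurate", "helpful", "creative"]}
      allowMultiple={true}
      onFeedback={({ categories }) => {
        // In the real UI, FeedbackButton forwards this to the server.
        console.log("selected categories:", categories);
      }}
    />
  );
}
```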
diff --git a/ui/components/feedback/FeedbackButton.tsx b/ui/components/feedback/FeedbackButton.tsx
new file mode 100644
index 00000000..e5ad8bf5
--- /dev/null
+++ b/ui/components/feedback/FeedbackButton.tsx
@@ -0,0 +1,141 @@
+/**
+ * FeedbackButton Component
+ *
+ * Displays feedback UI based on flow configuration and handles submission to Phoenix
+ */
+
+"use client";
+
+import { Check, Loader2 } from "lucide-react";
+import { useState } from "react";
+
+import { apiClient } from "@/lib/apiClient";
+
+import { CategoryFeedback } from "./CategoryFeedback";
+import { FeedbackExplanationModal } from "./FeedbackExplanationModal";
+import { RatingFeedback } from "./RatingFeedback";
+import { ThumbsFeedback } from "./ThumbsFeedback";
+
+import type { FeedbackData, FeedbackSubmission } from "@/types/Feedback";
+import type { FeedbackConfig } from "@/types/FlowMetadata";
+
+interface FeedbackButtonProps {
+  feedbackConfig: FeedbackConfig;
+  spanId: string;
+  traceId: string;
+}
+
+export function FeedbackButton({
+  feedbackConfig,
+  spanId,
+  traceId,
+}: FeedbackButtonProps) {
+  const [submitted, setSubmitted] = useState(false);
+  const [isSubmitting, setIsSubmitting] = useState(false);
+  const [error, setError] = useState<string | null>(null);
+  const [showExplanation, setShowExplanation] = useState(false);
+  const [pendingFeedback, setPendingFeedback] = useState<FeedbackData | null>(
+    null,
+  );
+
+  if (submitted) {
+    return (
+      <div>
+        <Check />
+        Feedback submitted
+      </div>
+    );
+  }
+
+  if (isSubmitting) {
+    return (
+      <div>
+        <Loader2 />
+        Submitting...
+      </div>
+    );
+  }
+
+  const handleFeedbackSubmit = async (
+    feedback: FeedbackData,
+    explanation?: string,
+  ) => {
+    setIsSubmitting(true);
+    setError(null);
+
+    try {
+      const feedbackWithExplanation: FeedbackData = explanation
+        ? { ...feedback, explanation }
+        : feedback;
+
+      const submission: FeedbackSubmission = {
+        span_id: spanId,
+        trace_id: traceId,
+        feedback: feedbackWithExplanation,
+      };
+
+      await apiClient.submitFeedback(submission);
+
+      setSubmitted(true);
+      setPendingFeedback(null);
+      setShowExplanation(false);
+    } catch (err) {
+      setError(
+        err instanceof Error ? err.message : "Failed to submit feedback",
+      );
+      setPendingFeedback(null);
+      setShowExplanation(false);
+    } finally {
+      setIsSubmitting(false);
+    }
+  };
+
+  const handleFeedbackClick = (feedback: FeedbackData) => {
+    // If explanation is enabled, show modal first
+    if (feedbackConfig.explanation) {
+      setPendingFeedback(feedback);
+      setShowExplanation(true);
+    } else {
+      // Submit directly without explanation
+      handleFeedbackSubmit(feedback);
+    }
+  };
+
+  return (
+    <div>
+      <div>
+        {feedbackConfig.type === "thumbs" && (
+          <ThumbsFeedback onFeedback={handleFeedbackClick} />
+        )}
+        {feedbackConfig.type === "rating" && (
+          <RatingFeedback scale={feedbackConfig.scale} onFeedback={handleFeedbackClick} />
+        )}
+        {feedbackConfig.type === "category" && (
+          <CategoryFeedback categories={feedbackConfig.categories} allowMultiple={feedbackConfig.allow_multiple} onFeedback={handleFeedbackClick} />
+        )}
+      </div>
+
+      {error && <p>{error}</p>}
+
+      {showExplanation && pendingFeedback && (
+        <FeedbackExplanationModal
+          isOpen={showExplanation}
+          onClose={() => {
+            setShowExplanation(false);
+            setPendingFeedback(null);
+          }}
+          onSubmit={(explanation) => {
+            handleFeedbackSubmit(pendingFeedback, explanation);
+          }}
+        />
+      )}
+    </div>
+  );
+}
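`FeedbackButton` builds a `FeedbackSubmission` and hands it to `apiClient.submitFeedback`; neither the types in `@/types/Feedback` nor the client are part of this diff. A sketch of the shapes implied above: the category variant matches `CategoryFeedback`'s callback, while the thumbs/rating variants and the endpoint path are assumptions:

```ts
// Hypothetical sketch of @/types/Feedback and the submit call.
export type FeedbackData =
  // Assumed shapes for thumbs/rating; only category is confirmed above.
  | { type: "thumbs"; value: "up" | "down"; explanation?: string }
  | { type: "rating"; value: number; explanation?: string }
  | { type: "category"; categories: string[]; explanation?: string };

export interface FeedbackSubmission {
  span_id: string;
  trace_id: string;
  feedback: FeedbackData;
}

// Minimal client, assuming the QType server exposes POST /feedback and
// records the payload as a telemetry annotation on the given span.
export async function submitFeedback(
  submission: FeedbackSubmission,
): Promise<void> {
  const response = await fetch("/feedback", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(submission),
  });
  if (!response.ok) {
    throw new Error(`Feedback submission failed: ${response.status}`);
  }
}
```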
diff --git a/ui/components/feedback/FeedbackExplanationModal.tsx b/ui/components/feedback/FeedbackExplanationModal.tsx
new file mode 100644
index 00000000..9592167c
--- /dev/null
+++ b/ui/components/feedback/FeedbackExplanationModal.tsx
@@ -0,0 +1,79 @@
+/**
+ * FeedbackExplanationModal Component
+ *
+ * Optional modal for adding text explanation to feedback
+ */
+
+"use client";
+
+import { X } from "lucide-react";
+import { useEffect, useState } from "react";
+
+import { Button } from "@/components/ui/Button";
+import { Card } from "@/components/ui/Card";
+import { Textarea } from "@/components/ui/textarea";
+
+interface FeedbackExplanationModalProps {
+  isOpen: boolean;
+  onClose: () => void;
+  onSubmit: (explanation?: string) => void;
+}
+
+export function FeedbackExplanationModal({
+  isOpen,
+  onClose,
+  onSubmit,
+}: FeedbackExplanationModalProps) {
+  const [explanation, setExplanation] = useState("");
+
+  useEffect(() => {
+    if (!isOpen) {
+      setExplanation("");
+    }
+  }, [isOpen]);
+
+  if (!isOpen) return null;
+
+  const handleSubmit = () => {
+    onSubmit(explanation.trim() || undefined);
+  };
+
+  const handleSkip = () => {
+    onSubmit(undefined);
+  };
+
+  return (
+    <div>
+      <Card>
+        <div>
+          <h3>Add Explanation (Optional)</h3>
+          <Button onClick={onClose}>
+            <X />
+          </Button>
+        </div>
+        <Textarea
+          value={explanation}
+          onChange={(e) => setExplanation(e.target.value)}
+        />
+        <div>
+          <Button onClick={handleSkip}>Skip</Button>
+          <Button onClick={handleSubmit}>Submit</Button>
+        </div>
+      </Card>
+    </div>
+  );
+}