From 80479e905ebc629bf5ffadbd45e1518b3033bacd Mon Sep 17 00:00:00 2001 From: Lou Kratz <219901029+loukratz-bv@users.noreply.github.com> Date: Wed, 11 Feb 2026 09:31:07 -0500 Subject: [PATCH 1/4] feat: Add Feedback Option for Flows --- .../category_feedback_example.qtype.yaml | 62 ++++++ .../explode_feedback_example.qtype.yaml | 72 +++++++ .../rating_feedback_example.qtype.yaml | 56 ++++++ .../thumbs_feedback_example.qtype.yaml | 42 ++++ pyproject.toml | 2 +- qtype/dsl/model.py | 55 ++++++ qtype/interpreter/api.py | 7 + qtype/interpreter/base/base_step_executor.py | 115 ++++++----- qtype/interpreter/base/stream_emitter.py | 84 +++++++- qtype/interpreter/feedback_api.py | 185 ++++++++++++++++++ qtype/interpreter/metadata_api.py | 21 ++ qtype/interpreter/stream/chat/converter.py | 22 ++- qtype/interpreter/types.py | 46 +++-- qtype/interpreter/typing.py | 17 +- qtype/semantic/checker.py | 18 +- qtype/semantic/model.py | 40 ++++ schema/qtype.schema.json | 115 +++++++++++ tests/interpreter/test_step_executor.py | 48 +++++ ...alid_feedback_without_telemetry.qtype.yaml | 27 +++ tests/semantic/test_checker_validation.py | 4 + tests/semantic/test_feedback_validation.py | 101 ++++++++++ ui/components/FlowResponseCard.tsx | 49 ++++- ui/components/FlowResponseTable.tsx | 58 +++++- ui/components/chat/MessageBubble.tsx | 34 +++- ui/components/feedback/CategoryFeedback.tsx | 78 ++++++++ ui/components/feedback/FeedbackButton.tsx | 173 ++++++++++++++++ .../feedback/FeedbackExplanationModal.tsx | 72 +++++++ ui/components/feedback/RatingFeedback.tsx | 51 +++++ ui/components/feedback/ThumbsFeedback.tsx | 40 ++++ ui/components/feedback/index.ts | 5 + ui/components/flows/Chat.tsx | 2 + ui/components/flows/Rest.tsx | 4 + ui/lib/apiClient.ts | 19 +- ui/types/Feedback.ts | 37 ++++ ui/types/FlowMetadata.ts | 22 +++ uv.lock | 72 +++++-- 36 files changed, 1750 insertions(+), 105 deletions(-) create mode 100644 examples/feedback/category_feedback_example.qtype.yaml create mode 100644 examples/feedback/explode_feedback_example.qtype.yaml create mode 100644 examples/feedback/rating_feedback_example.qtype.yaml create mode 100644 examples/feedback/thumbs_feedback_example.qtype.yaml create mode 100644 qtype/interpreter/feedback_api.py create mode 100644 tests/semantic/checker-error-specs/invalid_feedback_without_telemetry.qtype.yaml create mode 100644 tests/semantic/test_feedback_validation.py create mode 100644 ui/components/feedback/CategoryFeedback.tsx create mode 100644 ui/components/feedback/FeedbackButton.tsx create mode 100644 ui/components/feedback/FeedbackExplanationModal.tsx create mode 100644 ui/components/feedback/RatingFeedback.tsx create mode 100644 ui/components/feedback/ThumbsFeedback.tsx create mode 100644 ui/components/feedback/index.ts create mode 100644 ui/types/Feedback.ts diff --git a/examples/feedback/category_feedback_example.qtype.yaml b/examples/feedback/category_feedback_example.qtype.yaml new file mode 100644 index 00000000..ea252fd8 --- /dev/null +++ b/examples/feedback/category_feedback_example.qtype.yaml @@ -0,0 +1,62 @@ +id: category_feedback_example +description: Example flow with categorical feedback collection + +flows: + - id: code_generator + description: Code generation with multi-category feedback + variables: + - id: requirement + type: text + - id: formatted_prompt + type: text + - id: generated_code + type: text + feedback: + type: category + categories: + - correct + - well_documented + - follows_best_practices + - efficient + - needs_improvement + allow_multiple: true + 
explanation: true + steps: + - type: PromptTemplate + id: prompt + template: | + Generate Python code for the following requirement: + + {requirement} + + Provide clean, well-documented code following Python best practices. + inputs: + - requirement + outputs: + - formatted_prompt + + - type: LLMInference + id: llm + model: nova + inputs: + - formatted_prompt + outputs: + - generated_code + inputs: + - requirement + outputs: + - generated_code + +models: + - id: nova + type: Model + provider: aws-bedrock + model_id: amazon.nova-pro-v1:0 + inference_params: + temperature: 0.2 + max_tokens: 1000 + +telemetry: + id: category_feedback_telemetry + provider: Phoenix + endpoint: http://localhost:6006/v1/traces diff --git a/examples/feedback/explode_feedback_example.qtype.yaml b/examples/feedback/explode_feedback_example.qtype.yaml new file mode 100644 index 00000000..34d21c36 --- /dev/null +++ b/examples/feedback/explode_feedback_example.qtype.yaml @@ -0,0 +1,72 @@ +id: explode_feedback_example +description: Example flow with Explode fan-out and feedback collection + +flows: + - id: topic_facts_generator + description: Generate interesting facts for multiple topics with feedback + variables: + - id: topics_json + type: text + - id: topics + type: list[text] + - id: topic + type: text + - id: formatted_prompt + type: text + - id: fact + type: text + feedback: + type: thumbs + explanation: true + steps: + - type: Decoder + id: decode + format: json + inputs: + - topics_json + outputs: + - topics + + - type: Explode + id: fan_out + inputs: + - topics + outputs: + - topic + + - type: PromptTemplate + id: prompt + template: | + Generate one interesting, concise fact about: {topic} + + Keep it to 1-2 sentences and make it engaging. + inputs: + - topic + outputs: + - formatted_prompt + + - type: LLMInference + id: llm + model: nova + inputs: + - formatted_prompt + outputs: + - fact + inputs: + - topics_json + outputs: + - fact + +models: + - id: nova + type: Model + provider: aws-bedrock + model_id: us.amazon.nova-lite-v1:0 + inference_params: + temperature: 0.7 + max_tokens: 200 + +telemetry: + id: explode_feedback_telemetry + provider: Phoenix + endpoint: http://localhost:6006/v1/traces diff --git a/examples/feedback/rating_feedback_example.qtype.yaml b/examples/feedback/rating_feedback_example.qtype.yaml new file mode 100644 index 00000000..ccf5c078 --- /dev/null +++ b/examples/feedback/rating_feedback_example.qtype.yaml @@ -0,0 +1,56 @@ +id: rating_feedback_example +description: Example flow with 1-10 rating scale feedback collection + +flows: + - id: document_summarizer + description: Document summarization with quality rating + variables: + - id: document_text + type: text + - id: formatted_prompt + type: text + - id: summary + type: text + feedback: + type: rating + scale: 10 + explanation: true + steps: + - type: PromptTemplate + id: prompt + template: | + Summarize the following document in 2-3 sentences: + + {document_text} + + Summary: + inputs: + - document_text + outputs: + - formatted_prompt + + - type: LLMInference + id: llm + model: nova + inputs: + - formatted_prompt + outputs: + - summary + inputs: + - document_text + outputs: + - summary + +models: + - id: nova + type: Model + provider: aws-bedrock + model_id: amazon.nova-pro-v1:0 + inference_params: + temperature: 0.2 + max_tokens: 1000 + +telemetry: + id: rating_feedback_telemetry + provider: Phoenix + endpoint: http://localhost:6006/v1/traces diff --git a/examples/feedback/thumbs_feedback_example.qtype.yaml
b/examples/feedback/thumbs_feedback_example.qtype.yaml new file mode 100644 index 00000000..d4b1f736 --- /dev/null +++ b/examples/feedback/thumbs_feedback_example.qtype.yaml @@ -0,0 +1,42 @@ +id: simple_thumbs_chatbot +description: A minimal chatbot example with thumbs up/down feedback + +flows: + - id: simple_chat + description: Simple conversational chatbot with feedback collection + variables: + - id: user_message + type: ChatMessage + - id: assistant_response + type: ChatMessage + interface: + type: Conversational + feedback: + type: thumbs + explanation: false + steps: + - type: LLMInference + id: chat_llm + model: nova + inputs: + - user_message + outputs: + - assistant_response + inputs: + - user_message + outputs: + - assistant_response + +models: + - id: nova + type: Model + provider: aws-bedrock + model_id: us.amazon.nova-lite-v1:0 + inference_params: + temperature: 0.9 + max_tokens: 300 + +telemetry: + id: chatbot_telemetry + provider: Phoenix + endpoint: http://localhost:6006/v1/traces \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 93a0cc66..fbe32fdb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -71,7 +71,7 @@ mcp = [ [dependency-groups] dev = [ - "arize-phoenix>=11.2.2", + "arize-phoenix>=12.35.0", "boto3>=1.34.0", "coverage>=7.0.0", "ipython>=8.37.0", diff --git a/qtype/dsl/model.py b/qtype/dsl/model.py index 27fe15ca..4fdefd94 100644 --- a/qtype/dsl/model.py +++ b/qtype/dsl/model.py @@ -600,6 +600,57 @@ class Agent(LLMInference): ) +class Feedback(StrictBaseModel): + """Base class for user feedback configurations on flow outputs.""" + + type: str = Field(..., description="Type of feedback widget to display.") + explanation: bool = Field( + default=False, + description="Whether to enable optional text explanation field.", + ) + + +class ThumbsFeedback(Feedback): + """Binary thumbs up/down feedback.""" + + type: Literal["thumbs"] = "thumbs" + + +class RatingFeedback(Feedback): + """Numerical rating feedback (1-5 or 1-10 scale).""" + + type: Literal["rating"] = "rating" + scale: int = Field( + default=5, description="Maximum value for rating scale." + ) + + +class CategoryFeedback(Feedback): + """Categorical feedback with predefined tags.""" + + type: Literal["category"] = "category" + categories: list[str] = Field( + ..., + description="List of category labels users can select from.", + min_length=1, + ) + allow_multiple: bool = Field( + default=True, + description="Whether users can select multiple categories.", + ) + + +# Create a union type for all feedback types (defined here before Flow) +FeedbackType = Annotated[ + Union[ + ThumbsFeedback, + RatingFeedback, + CategoryFeedback, + ], + Field(discriminator="type"), +] + + class Flow(StrictBaseModel): """Defines a flow of steps that can be executed in sequence or parallel. 
If input or output variables are not specified, they are inferred from @@ -616,6 +667,10 @@ class Flow(StrictBaseModel): ) interface: FlowInterface | None = Field(default=None) + feedback: FeedbackType | None = Field( + default=None, + description="Optional feedback configuration for collecting user ratings on flow outputs.", + ) variables: list[Variable] = Field( default_factory=list, description="List of variables available at the application scope.", diff --git a/qtype/interpreter/api.py b/qtype/interpreter/api.py index 5fe3091c..b1fa6a33 100644 --- a/qtype/interpreter/api.py +++ b/qtype/interpreter/api.py @@ -17,6 +17,7 @@ create_rest_endpoint, create_streaming_endpoint, ) +from qtype.interpreter.feedback_api import create_feedback_endpoint from qtype.interpreter.metadata_api import create_metadata_endpoints from qtype.semantic.model import Application @@ -110,6 +111,12 @@ async def shutdown_telemetry(): # Create metadata endpoints for flow discovery create_metadata_endpoints(app, self.definition) + # Create feedback submission endpoint + if self.definition.telemetry: + create_feedback_endpoint( + app, self.definition.telemetry, secret_manager + ) + # Create executor context context = ExecutorContext( secret_manager=secret_manager, diff --git a/qtype/interpreter/base/base_step_executor.py b/qtype/interpreter/base/base_step_executor.py index 01dea695..7440d427 100644 --- a/qtype/interpreter/base/base_step_executor.py +++ b/qtype/interpreter/base/base_step_executor.py @@ -176,7 +176,10 @@ async def execute( Processed messages, with failed messages emitted first """ # Start a span for tracking - # Note: We manually manage the span lifecycle to allow yielding + # Note: We do NOT attach this span to context here to avoid + # making upstream steps children of this step when we consume + # the input stream. Instead, _process_message_with_telemetry + # will attach it when calling process_message(). span = self._tracer.start_span( f"step.{self.step.id}", attributes={ @@ -186,10 +189,8 @@ async def execute( }, ) - # Make this span the active context so child spans will nest under it - # Only attach if span is recording (i.e., real tracer is configured) - ctx = trace.set_span_in_context(span) - token = context.attach(ctx) if span.is_recording() else None + # Store span in self so _process_message_with_telemetry can access it + self._current_step_span = span # Initialize the cache # this is done once per execution so re-runs are fast @@ -287,10 +288,8 @@ async def process_item( span.set_status(Status(StatusCode.ERROR, f"Step failed: {e}")) raise finally: - # Detach the context and end the span - # Only detach if we successfully attached (span was recording) - if token is not None: - context.detach(token) + # Clean up step span reference and end the span + self._current_step_span = None span.end() @abstractmethod @@ -368,51 +367,77 @@ async def _process_message_with_telemetry( This method creates a child span for each message processing operation, automatically recording errors and success metrics. - The child span will automatically be nested under the current - active span in the context. + The child span will be nested under the step span. + + The step span context is attached here (not in execute()) to + ensure step spans are siblings under the flow span, not nested. 
""" - # Get current context and create child span within it - span = self._tracer.start_span( - f"step.{self.step.id}.process_message", - attributes={ - "session.id": message.session.session_id, - }, - ) + # Attach step span context so process_message span becomes its child + step_span = getattr(self, "_current_step_span", None) + if step_span and step_span.is_recording(): + ctx = trace.set_span_in_context(step_span) + token = context.attach(ctx) + else: + token = None try: - output_count = 0 - error_occurred = False - - async for output_msg in self.process_message(message): - output_count += 1 - if output_msg.is_failed(): - error_occurred = True - span.add_event( - "message_failed", - { - "error": str(output_msg.error), - }, + # Create child span for this specific message processing + span = self._tracer.start_span( + f"step.{self.step.id}.process_message", + attributes={ + "session.id": message.session.session_id, + }, + ) + + try: + output_count = 0 + error_occurred = False + + async for output_msg in self.process_message(message): + output_count += 1 + if output_msg.is_failed(): + error_occurred = True + span.add_event( + "message_failed", + { + "error": str(output_msg.error), + }, + ) + # Enrich with process_message span for feedback tracking + span_context = span.get_span_context() + updated_metadata = { + **output_msg.metadata, + "span_id": format(span_context.span_id, "016x"), + "trace_id": format(span_context.trace_id, "032x"), + } + yield output_msg.model_copy( + update={"metadata": updated_metadata} ) - yield output_msg - # Record processing metrics - span.set_attribute("message.outputs", output_count) + # Record processing metrics + span.set_attribute("message.outputs", output_count) + + if error_occurred: + span.set_status( + Status( + StatusCode.ERROR, "Message processing had errors" + ) + ) + else: + span.set_status(Status(StatusCode.OK)) - if error_occurred: + except Exception as e: + span.record_exception(e) span.set_status( - Status(StatusCode.ERROR, "Message processing had errors") + Status(StatusCode.ERROR, f"Processing failed: {e}") ) - else: - span.set_status(Status(StatusCode.OK)) - - except Exception as e: - span.record_exception(e) - span.set_status( - Status(StatusCode.ERROR, f"Processing failed: {e}") - ) - raise + raise + finally: + span.end() finally: - span.end() + # Detach step span context + if token is not None: + context.detach(token) async def finalize(self) -> AsyncIterator[FlowMessage]: """ diff --git a/qtype/interpreter/base/stream_emitter.py b/qtype/interpreter/base/stream_emitter.py index 6d8b5ebc..84fb86e4 100644 --- a/qtype/interpreter/base/stream_emitter.py +++ b/qtype/interpreter/base/stream_emitter.py @@ -35,6 +35,8 @@ async def process_message(self, message: FlowMessage): from typing import Any +from opentelemetry import trace + from qtype.interpreter.types import ( ErrorEvent, ReasoningStreamDeltaEvent, @@ -54,6 +56,24 @@ async def process_message(self, message: FlowMessage): from qtype.semantic.model import Step +def get_current_telemetry_metadata() -> dict[str, Any]: + """ + Get current OpenTelemetry span context as metadata dict. + + Returns: + Dictionary with span_id and trace_id if span is recording, + empty dict otherwise + """ + span = trace.get_current_span() + if span and span.is_recording(): + ctx = span.get_span_context() + return { + "span_id": format(ctx.span_id, "016x"), + "trace_id": format(ctx.trace_id, "032x"), + } + return {} + + class TextStreamContext: """ Async context manager for text streaming. 
@@ -83,7 +103,11 @@ async def __aenter__(self) -> TextStreamContext: """Emit TextStreamStartEvent when entering context.""" if self.on_stream_event: await self.on_stream_event( - TextStreamStartEvent(step=self.step, stream_id=self.stream_id) + TextStreamStartEvent( + step=self.step, + stream_id=self.stream_id, + metadata=get_current_telemetry_metadata(), + ) ) return self @@ -96,7 +120,11 @@ async def __aexit__( """Emit TextStreamEndEvent when exiting context.""" if self.on_stream_event: await self.on_stream_event( - TextStreamEndEvent(step=self.step, stream_id=self.stream_id) + TextStreamEndEvent( + step=self.step, + stream_id=self.stream_id, + metadata=get_current_telemetry_metadata(), + ) ) return False @@ -113,6 +141,7 @@ async def delta(self, text: str) -> None: step=self.step, stream_id=self.stream_id, delta=text, + metadata=get_current_telemetry_metadata(), ) ) @@ -158,7 +187,9 @@ async def __aexit__( if self._started and self.on_stream_event: await self.on_stream_event( ReasoningStreamEndEvent( - step=self.step, stream_id=self.stream_id + step=self.step, + stream_id=self.stream_id, + metadata=get_current_telemetry_metadata(), ) ) return False @@ -173,11 +204,15 @@ async def delta(self, text: str) -> None: text: The incremental reasoning content to append to the stream """ if self.on_stream_event: + metadata = get_current_telemetry_metadata() + # Emit start event on first delta if not self._started: await self.on_stream_event( ReasoningStreamStartEvent( - step=self.step, stream_id=self.stream_id + step=self.step, + stream_id=self.stream_id, + metadata=metadata, ) ) self._started = True @@ -187,6 +222,7 @@ async def delta(self, text: str) -> None: step=self.step, stream_id=self.stream_id, delta=text, + metadata=metadata, ) ) @@ -218,7 +254,11 @@ def __init__( async def __aenter__(self) -> StepBoundaryContext: """Emit StepStartEvent when entering context.""" if self.on_stream_event: - await self.on_stream_event(StepStartEvent(step=self.step)) + await self.on_stream_event( + StepStartEvent( + step=self.step, metadata=get_current_telemetry_metadata() + ) + ) return self async def __aexit__( @@ -229,7 +269,11 @@ async def __aexit__( ) -> bool: """Emit StepEndEvent when exiting context.""" if self.on_stream_event: - await self.on_stream_event(StepEndEvent(step=self.step)) + await self.on_stream_event( + StepEndEvent( + step=self.step, metadata=get_current_telemetry_metadata() + ) + ) return False @@ -277,6 +321,7 @@ async def __aenter__(self) -> ToolExecutionContext: tool_call_id=self.tool_call_id, tool_name=self.tool_name, tool_input=self.tool_input, + metadata=get_current_telemetry_metadata(), ) ) return self @@ -299,6 +344,7 @@ async def __aexit__( step=self.step, tool_call_id=self.tool_call_id, error_message=str(exc_val), + metadata=get_current_telemetry_metadata(), ) ) self._completed = True @@ -319,6 +365,7 @@ async def complete(self, output: Any) -> None: step=self.step, tool_call_id=self.tool_call_id, tool_output=output, + metadata=get_current_telemetry_metadata(), ) ) self._completed = True @@ -338,6 +385,7 @@ async def error(self, error_message: str) -> None: step=self.step, tool_call_id=self.tool_call_id, error_message=error_message, + metadata=get_current_telemetry_metadata(), ) ) self._completed = True @@ -387,6 +435,18 @@ def __init__( self.step = step self.on_stream_event = on_stream_event + def _get_span_context(self) -> tuple[str | None, str | None]: + """ + Get current OpenTelemetry span context. 
+ + Returns: + Tuple of (span_id, trace_id) as hex strings, or (None, None) + + Deprecated: Use get_current_telemetry_metadata() instead. + """ + metadata = get_current_telemetry_metadata() + return metadata.get("span_id"), metadata.get("trace_id") + def text_stream(self, stream_id: str) -> TextStreamContext: """ Create a context manager for text streaming. @@ -459,7 +519,11 @@ async def status(self, message: str) -> None: """ if self.on_stream_event: await self.on_stream_event( - StatusEvent(step=self.step, message=message) + StatusEvent( + step=self.step, + message=message, + metadata=get_current_telemetry_metadata(), + ) ) async def error(self, error_message: str) -> None: @@ -471,5 +535,9 @@ async def error(self, error_message: str) -> None: """ if self.on_stream_event: await self.on_stream_event( - ErrorEvent(step=self.step, error_message=error_message) + ErrorEvent( + step=self.step, + error_message=error_message, + metadata=get_current_telemetry_metadata(), + ) ) diff --git a/qtype/interpreter/feedback_api.py b/qtype/interpreter/feedback_api.py new file mode 100644 index 00000000..85b496dd --- /dev/null +++ b/qtype/interpreter/feedback_api.py @@ -0,0 +1,185 @@ +from __future__ import annotations + +import logging +from typing import Annotated, Any, Literal, Union +from urllib.parse import urlparse + +from fastapi import FastAPI, HTTPException, status +from pydantic import BaseModel, Field + +from qtype.semantic.model import TelemetrySink + +logger = logging.getLogger(__name__) + + +def _format_feedback_label(feedback: FeedbackData) -> str: + """Format feedback data into a human-readable label.""" + if isinstance(feedback, ThumbsFeedbackData): + return "👍" if feedback.value else "👎" + elif isinstance(feedback, RatingFeedbackData): + return str(feedback.score) + elif isinstance(feedback, CategoryFeedbackData): + return ", ".join(feedback.categories) + return "unknown" + + +class ThumbsFeedbackData(BaseModel): + """Thumbs up/down feedback data.""" + + type: Literal["thumbs"] = "thumbs" + value: bool = Field( + ..., description="True for thumbs up, False for thumbs down." + ) + explanation: str | None = Field( + default=None, description="Optional text explanation for the feedback." + ) + + +class RatingFeedbackData(BaseModel): + """Numeric rating feedback data.""" + + type: Literal["rating"] = "rating" + score: int = Field(..., description="Numeric rating score (e.g., 1-5).") + explanation: str | None = Field( + default=None, description="Optional text explanation for the feedback." + ) + + +class CategoryFeedbackData(BaseModel): + """Category selection feedback data.""" + + type: Literal["category"] = "category" + categories: list[str] = Field( + ..., description="List of selected category labels." + ) + explanation: str | None = Field( + default=None, description="Optional text explanation for the feedback." + ) + + +FeedbackData = Annotated[ + Union[ThumbsFeedbackData, RatingFeedbackData, CategoryFeedbackData], + Field(discriminator="type"), +] + + +class FeedbackRequest(BaseModel): + """Request model for submitting user feedback on a flow output.""" + + span_id: str = Field(..., description="Span ID of the output being rated.") + trace_id: str = Field(..., description="Trace ID of the flow execution.") + feedback: FeedbackData = Field( + ..., description="Feedback data (type determined by discriminator)." 
+ ) + + +class FeedbackResponse(BaseModel): + """Response model for feedback submission.""" + + status: Literal["success"] = "success" + message: str = "Feedback submitted successfully" + + +def create_feedback_endpoint( + app: FastAPI, telemetry: TelemetrySink, secret_manager: Any +) -> None: + """ + Register the feedback submission endpoint with the FastAPI application. + + This creates a POST /feedback endpoint that accepts feedback submissions + and forwards them to the configured telemetry backend. + + Args: + app: FastAPI application instance. + telemetry: Telemetry sink configuration. + secret_manager: Secret manager for resolving secret references. + """ + # Create client based on provider + client = None + + if telemetry.provider == "Phoenix": + from phoenix.client import Client + + # Resolve endpoint in case it's a secret reference + args = {"base_url": telemetry.endpoint} + args = secret_manager.resolve_secrets_in_dict( + args, f"telemetry sink '{telemetry.id}' endpoint" + ) + + # Phoenix Client expects just the base URL (e.g., http://localhost:6006) + # Parse the URL and reconstruct with just scheme and netloc (host:port) + parsed = urlparse(args["base_url"]) + base_url = f"{parsed.scheme}://{parsed.netloc}" + + client = Client(base_url=base_url) + elif telemetry.provider == "Langfuse": + logger.warning( + "Langfuse feedback not yet implemented. " + "Feedback endpoint will not be created." + ) + return + else: + logger.warning( + f"Feedback endpoint not created: unsupported telemetry " + f"provider '{telemetry.provider}'." + ) + return + + @app.post( + "/feedback", + response_model=FeedbackResponse, + tags=["feedback"], + summary="Submit user feedback on flow outputs", + description=( + "Submit user feedback (thumbs, rating, or category) on a " + "specific flow output. Feedback is sent to the telemetry " + "backend as span annotations." + ), + ) + async def submit_feedback(request: FeedbackRequest) -> FeedbackResponse: + """ + Submit user feedback on a flow output. + + The feedback is recorded as a span annotation in the telemetry backend. 
+ """ + try: + if telemetry.provider == "Phoenix": + # Submit to Phoenix using span annotations API + label = _format_feedback_label(request.feedback) + explanation = getattr(request.feedback, "explanation", None) + + # Calculate score based on feedback type + score = None + if isinstance(request.feedback, ThumbsFeedbackData): + score = 1.0 if request.feedback.value else 0.0 + elif isinstance(request.feedback, RatingFeedbackData): + score = float(request.feedback.score) + + client.spans.add_span_annotation( + span_id=request.span_id, + annotation_name="user_feedback", + label=label, + score=score, + explanation=explanation, + annotator_kind="HUMAN", + ) + + logger.info( + f"Feedback submitted to Phoenix for span {request.span_id}: " + f"{request.feedback.type} = {label}" + ) + + elif telemetry.provider == "Langfuse": + # TODO: Implement Langfuse feedback submission + raise NotImplementedError( + "Langfuse feedback not yet implemented" + ) + + return FeedbackResponse() + + except Exception as e: + logger.error(f"Failed to submit feedback: {e}", exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to submit feedback.", + ) from e diff --git a/qtype/interpreter/metadata_api.py b/qtype/interpreter/metadata_api.py index 3a52893a..e1f3c44a 100644 --- a/qtype/interpreter/metadata_api.py +++ b/qtype/interpreter/metadata_api.py @@ -5,6 +5,8 @@ from typing import Any from fastapi import FastAPI +from opentelemetry import trace +from opentelemetry.trace import NoOpTracerProvider from pydantic import BaseModel, Field from qtype.interpreter.typing import create_input_shape, create_output_shape @@ -43,6 +45,14 @@ class FlowMetadata(BaseModel): output_schema: dict[str, Any] = Field( ..., description="JSON schema for output" ) + feedback: dict[str, Any] | None = Field( + default=None, + description="Feedback configuration if enabled for this flow", + ) + telemetry_enabled: bool = Field( + default=False, + description="Whether telemetry is currently configured and recording", + ) def create_metadata_endpoints(app: FastAPI, application: Application) -> None: @@ -101,6 +111,15 @@ def _create_flow_metadata(flow: Flow) -> FlowMetadata: f"/flows/{flow.id}/stream" if flow.interface is not None else None ) + # Check if telemetry is enabled + provider = trace.get_tracer_provider() + telemetry_enabled = not isinstance(provider, NoOpTracerProvider) + + # Serialize feedback configuration if present + feedback_config = None + if flow.feedback: + feedback_config = flow.feedback.model_dump() + return FlowMetadata( id=flow.id, description=flow.description, @@ -112,4 +131,6 @@ def _create_flow_metadata(flow: Flow) -> FlowMetadata: ), input_schema=input_model.model_json_schema(), output_schema=output_model.model_json_schema(), + feedback=feedback_config, + telemetry_enabled=telemetry_enabled, ) diff --git a/qtype/interpreter/stream/chat/converter.py b/qtype/interpreter/stream/chat/converter.py index fced692e..a2d97beb 100644 --- a/qtype/interpreter/stream/chat/converter.py +++ b/qtype/interpreter/stream/chat/converter.py @@ -377,11 +377,22 @@ async def stream_events(): # Create converter for stateful event-to-chunk conversion converter = StreamEventConverter() + # Track telemetry metadata from events + telemetry_metadata: dict[str, str] = {} + # Process events and convert to chunks async for event in event_stream: if event is None: break # End of stream + # Extract telemetry metadata from first event that has it + if not telemetry_metadata and event.metadata: + if 
"span_id" in event.metadata and "trace_id" in event.metadata: + telemetry_metadata = { + "span_id": event.metadata["span_id"], + "trace_id": event.metadata["trace_id"], + } + # Convert event to chunks and yield as SSE for chunk in converter.convert(event): yield ( @@ -390,8 +401,15 @@ async def stream_events(): f"\n\n" ) - # End message stream with optional metadata - finish_chunk = FinishChunk(messageMetadata=output_metadata) # type: ignore[arg-type] + # Merge telemetry metadata with output_metadata for FinishChunk + final_metadata = {**telemetry_metadata} + if output_metadata: + final_metadata.update(output_metadata) + + # End message stream with metadata (includes telemetry) + finish_chunk = FinishChunk( + messageMetadata=final_metadata if final_metadata else None + ) yield ( f"data: " f"{finish_chunk.model_dump_json(by_alias=True, exclude_none=True)}" diff --git a/qtype/interpreter/types.py b/qtype/interpreter/types.py index d5790863..631e5e8a 100644 --- a/qtype/interpreter/types.py +++ b/qtype/interpreter/types.py @@ -38,7 +38,23 @@ def __bool__(self) -> bool: # and can be converted to Vercel UI chunks for frontend display -class TextStreamStartEvent(BaseModel): +class BaseStreamEvent(BaseModel): + """ + Base class for all stream events. + + Provides common metadata field for telemetry and other contextual data. + The metadata dict typically contains: + - span_id: OpenTelemetry span ID (16 hex chars) + - trace_id: OpenTelemetry trace ID (32 hex chars) + """ + + metadata: dict[str, Any] = Field( + default_factory=dict, + description="Metadata for telemetry and context tracking", + ) + + +class TextStreamStartEvent(BaseStreamEvent): """Signals the start of incremental text streaming. Use this when beginning to stream LLM-generated content or other @@ -55,7 +71,7 @@ class TextStreamStartEvent(BaseModel): ) -class TextStreamDeltaEvent(BaseModel): +class TextStreamDeltaEvent(BaseStreamEvent): """Carries an incremental chunk of text content. Use this for streaming LLM responses or other incremental text. @@ -72,7 +88,7 @@ class TextStreamDeltaEvent(BaseModel): delta: str = Field(description="Incremental text content to append") -class TextStreamEndEvent(BaseModel): +class TextStreamEndEvent(BaseStreamEvent): """Signals the completion of incremental text streaming. Use this to mark the end of a text stream. After this event, @@ -88,7 +104,7 @@ class TextStreamEndEvent(BaseModel): ) -class ReasoningStreamStartEvent(BaseModel): +class ReasoningStreamStartEvent(BaseStreamEvent): """Signals the start of incremental reasoning streaming. Use this when an agent begins outputting reasoning/thinking steps. @@ -105,7 +121,7 @@ class ReasoningStreamStartEvent(BaseModel): ) -class ReasoningStreamDeltaEvent(BaseModel): +class ReasoningStreamDeltaEvent(BaseStreamEvent): """Carries an incremental chunk of reasoning content. Use this for streaming agent reasoning/thinking steps. @@ -122,7 +138,7 @@ class ReasoningStreamDeltaEvent(BaseModel): delta: str = Field(description="Incremental reasoning content to append") -class ReasoningStreamEndEvent(BaseModel): +class ReasoningStreamEndEvent(BaseStreamEvent): """Signals the completion of incremental reasoning streaming. Use this to mark the end of a reasoning stream. After this event, @@ -138,7 +154,7 @@ class ReasoningStreamEndEvent(BaseModel): ) -class StatusEvent(BaseModel): +class StatusEvent(BaseStreamEvent): """Reports a complete status message from a step. 
Use this for non-streaming status updates like: @@ -155,7 +171,7 @@ class StatusEvent(BaseModel): message: str = Field(description="Complete status message to display") -class StepStartEvent(BaseModel): +class StepStartEvent(BaseStreamEvent): """Marks the beginning of a logical step boundary. Use this to group related events together visually in the UI. @@ -168,7 +184,7 @@ class StepStartEvent(BaseModel): step: Step -class StepEndEvent(BaseModel): +class StepEndEvent(BaseStreamEvent): """Marks the end of a logical step boundary. Use this to close a step boundary opened by StepStartEvent. @@ -180,7 +196,7 @@ class StepEndEvent(BaseModel): step: Step -class ToolExecutionStartEvent(BaseModel): +class ToolExecutionStartEvent(BaseStreamEvent): """Signals the start of tool execution. Use this when a tool is about to be invoked, either by an LLM @@ -198,7 +214,7 @@ class ToolExecutionStartEvent(BaseModel): ) -class ToolExecutionEndEvent(BaseModel): +class ToolExecutionEndEvent(BaseStreamEvent): """Signals the completion of tool execution. Use this when a tool has finished executing successfully. @@ -214,7 +230,7 @@ class ToolExecutionEndEvent(BaseModel): tool_output: Any = Field(description="Output returned by the tool") -class ToolExecutionErrorEvent(BaseModel): +class ToolExecutionErrorEvent(BaseStreamEvent): """Signals that tool execution failed. Use this when a tool encounters an error during execution. @@ -230,7 +246,7 @@ class ToolExecutionErrorEvent(BaseModel): error_message: str = Field(description="Description of the error") -class ErrorEvent(BaseModel): +class ErrorEvent(BaseStreamEvent): """Signals a general error occurred during step execution. Use this for errors that aren't specific to tool execution. @@ -330,6 +346,10 @@ class FlowMessage(BaseModel): description="Mapping of variable IDs to their values.", ) error: Optional[StepError] = None + metadata: Dict[str, Any] = Field( + default_factory=dict, + description="Metadata for telemetry, span IDs, and other system-level data.", + ) def is_failed(self) -> bool: """Checks if this state has encountered an error.""" diff --git a/qtype/interpreter/typing.py b/qtype/interpreter/typing.py index 61d95deb..37cb70fc 100644 --- a/qtype/interpreter/typing.py +++ b/qtype/interpreter/typing.py @@ -60,10 +60,19 @@ def _fields_from_variables(variables: list[Variable]) -> dict: def create_output_shape(flow: Flow) -> Type[BaseModel]: + fields = _fields_from_variables(flow.outputs) + # Add metadata field for telemetry (span_id, trace_id) + fields["metadata"] = ( + dict[str, Any] | None, + Field( + default=None, + description="Telemetry metadata including span_id and trace_id", + ), + ) return create_model( f"{flow.id}Result", __base__=BaseModel, - **_fields_from_variables(flow.outputs), + **fields, ) # type: ignore @@ -133,7 +142,11 @@ def flow_results_to_output_container( errors.append(m.error.model_dump()) else: output_instance = output_shape(**m.variables) - outputs.append(output_instance.model_dump()) + output_dict = output_instance.model_dump() + # Include metadata (span_id, trace_id) if present + if m.metadata: + output_dict["metadata"] = m.metadata + outputs.append(output_dict) return output_container(outputs=outputs, errors=errors) diff --git a/qtype/semantic/checker.py b/qtype/semantic/checker.py index e0a51d58..51604bf1 100644 --- a/qtype/semantic/checker.py +++ b/qtype/semantic/checker.py @@ -624,8 +624,24 @@ def _validate_application(application: Application) -> None: Raises: QTypeSemanticError: If SecretReference is used but 
secret_manager is not configured, or if secret_manager - configuration is invalid + configuration is invalid, or if feedback is configured + without telemetry """ + # Check if feedback is configured without telemetry + if application.telemetry is None: + flows_with_feedback = [ + flow.id for flow in application.flows if flow.feedback is not None + ] + if flows_with_feedback: + raise QTypeSemanticError( + ( + f"Application '{application.id}' has flows with feedback " + f"configured but no telemetry sink defined. " + f"Flows with feedback: {', '.join(flows_with_feedback)}. " + "Please add a telemetry configuration to the application." + ) + ) + if application.secret_manager is None: # Check if any SecretReference is used in the application if _has_secret_reference(application): diff --git a/qtype/semantic/model.py b/qtype/semantic/model.py index 4b79cdd6..476ce5b8 100644 --- a/qtype/semantic/model.py +++ b/qtype/semantic/model.py @@ -158,6 +158,15 @@ class AuthorizationProviderList(BaseModel): root: list[AuthorizationProvider] = Field(...) +class Feedback(BaseModel): + """Base class for user feedback configurations on flow outputs.""" + + type: str = Field(..., description="Type of feedback widget to display.") + explanation: bool = Field( + False, description="Whether to enable optional text explanation field." + ) + + class ConstantPath(BaseModel): """Semantic version of ConstantPath.""" @@ -216,6 +225,12 @@ class Flow(BaseModel): description="List of steps or references to steps", ) interface: FlowInterface | None = Field(None) + feedback: ThumbsFeedback | RatingFeedback | CategoryFeedback | None = ( + Field( + None, + description="Optional feedback configuration for collecting user ratings on flow outputs.", + ) + ) variables: list[Variable] = Field( default_factory=list, description="List of variables available at the application scope.", @@ -669,6 +684,31 @@ class Writer(Step, BatchableStepMixin): id: str = Field(..., description="Unique ID of the data writer.") +class CategoryFeedback(Feedback): + """Categorical feedback with predefined tags.""" + + type: Literal["category"] = Field("category") + categories: list[str] = Field( + ..., description="List of category labels users can select from." + ) + allow_multiple: bool = Field( + True, description="Whether users can select multiple categories." 
+ ) + + +class RatingFeedback(Feedback): + """Numerical rating feedback (1-5 or 1-10 scale).""" + + type: Literal["rating"] = Field("rating") + scale: int = Field(5, description="Maximum value for rating scale.") + + +class ThumbsFeedback(Feedback): + """Binary thumbs up/down feedback.""" + + type: Literal["thumbs"] = Field("thumbs") + + class DocumentIndex(Index): """Document search index for text-based search (e.g., Elasticsearch, OpenSearch).""" diff --git a/schema/qtype.schema.json b/schema/qtype.schema.json index c59cd85f..798570d8 100644 --- a/schema/qtype.schema.json +++ b/schema/qtype.schema.json @@ -930,6 +930,44 @@ "title": "CacheConfig", "type": "object" }, + "CategoryFeedback": { + "additionalProperties": false, + "description": "Categorical feedback with predefined tags.", + "properties": { + "type": { + "const": "category", + "default": "category", + "title": "Type", + "type": "string" + }, + "explanation": { + "default": false, + "description": "Whether to enable optional text explanation field.", + "title": "Explanation", + "type": "boolean" + }, + "categories": { + "description": "List of category labels users can select from.", + "items": { + "type": "string" + }, + "minItems": 1, + "title": "Categories", + "type": "array" + }, + "allow_multiple": { + "default": true, + "description": "Whether users can select multiple categories.", + "title": "Allow Multiple", + "type": "boolean" + } + }, + "required": [ + "categories" + ], + "title": "CategoryFeedback", + "type": "object" + }, "Collect": { "additionalProperties": false, "description": "A step that collects all inputs and creates a single list to return.", @@ -2370,6 +2408,37 @@ ], "default": null }, + "feedback": { + "anyOf": [ + { + "discriminator": { + "mapping": { + "category": "#/$defs/CategoryFeedback", + "rating": "#/$defs/RatingFeedback", + "thumbs": "#/$defs/ThumbsFeedback" + }, + "propertyName": "type" + }, + "oneOf": [ + { + "$ref": "#/$defs/ThumbsFeedback" + }, + { + "$ref": "#/$defs/RatingFeedback" + }, + { + "$ref": "#/$defs/CategoryFeedback" + } + ] + }, + { + "type": "null" + } + ], + "default": null, + "description": "Optional feedback configuration for collecting user ratings on flow outputs.", + "title": "Feedback" + }, "variables": { "description": "List of variables available at the application scope.", "items": { @@ -3295,6 +3364,32 @@ "title": "PythonFunctionTool", "type": "object" }, + "RatingFeedback": { + "additionalProperties": false, + "description": "Numerical rating feedback (1-5 or 1-10 scale).", + "properties": { + "type": { + "const": "rating", + "default": "rating", + "title": "Type", + "type": "string" + }, + "explanation": { + "default": false, + "description": "Whether to enable optional text explanation field.", + "title": "Explanation", + "type": "boolean" + }, + "scale": { + "default": 5, + "description": "Maximum value for rating scale.", + "title": "Scale", + "type": "integer" + } + }, + "title": "RatingFeedback", + "type": "object" + }, "Reference_AWSAuthProvider_": { "properties": { "$ref": { @@ -3659,6 +3754,26 @@ "title": "TextWidget", "type": "string" }, + "ThumbsFeedback": { + "additionalProperties": false, + "description": "Binary thumbs up/down feedback.", + "properties": { + "type": { + "const": "thumbs", + "default": "thumbs", + "title": "Type", + "type": "string" + }, + "explanation": { + "default": false, + "description": "Whether to enable optional text explanation field.", + "title": "Explanation", + "type": "boolean" + } + }, + "title": "ThumbsFeedback", + "type": 
"object" + }, "ToolList": { "description": "Schema for a standalone list of tools.", "items": { diff --git a/tests/interpreter/test_step_executor.py b/tests/interpreter/test_step_executor.py index 13190203..10b56718 100644 --- a/tests/interpreter/test_step_executor.py +++ b/tests/interpreter/test_step_executor.py @@ -7,6 +7,7 @@ import pytest from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider from qtype.base.types import ConcurrencyConfig from qtype.interpreter.base.base_step_executor import StepExecutor @@ -388,6 +389,53 @@ async def test_finalize_hook(self, simple_step, session, executor_context): assert results[-1].variables.get("finalized") == "true" # Note: Progress count timing depends on stream implementation details + async def test_span_metadata_enrichment(self, simple_step, session): + """Test that span_id and trace_id are added to message metadata.""" + # Set up a real TracerProvider that records spans + tracer_provider = TracerProvider() + tracer = tracer_provider.get_tracer(__name__) + + # Create context with recording tracer + context = ExecutorContext( + secret_manager=NoOpSecretManager(), + tracer=tracer, + ) + + executor = MockExecutor( + simple_step, + context, + suffix="_processed", + ) + results = await collect_stream(executor, ["msg1", "msg2"], session) + + assert len(results) == 2 + + # Each message should have span_id and trace_id in metadata + for result in results: + assert "span_id" in result.metadata + assert "trace_id" in result.metadata + + # Verify they are valid hex strings + span_id = result.metadata["span_id"] + trace_id = result.metadata["trace_id"] + + # span_id should be 16 hex chars (64-bit) + assert len(span_id) == 16 + assert all(c in "0123456789abcdef" for c in span_id) + + # trace_id should be 32 hex chars (128-bit) + assert len(trace_id) == 32 + assert all(c in "0123456789abcdef" for c in trace_id) + + # Each message should have a unique process_message span_id + # (this allows per-message feedback instead of per-step) + span_ids = [r.metadata["span_id"] for r in results] + assert len(set(span_ids)) == len(results) + + # Trace ID should be the same for all messages in the same execution + trace_ids = [r.metadata["trace_id"] for r in results] + assert len(set(trace_ids)) == 1 + async def test_dependencies_injection(self, simple_step, executor_context): """Test that dependencies are injected and accessible.""" test_dep = {"key": "value"} diff --git a/tests/semantic/checker-error-specs/invalid_feedback_without_telemetry.qtype.yaml b/tests/semantic/checker-error-specs/invalid_feedback_without_telemetry.qtype.yaml new file mode 100644 index 00000000..d1dd5a60 --- /dev/null +++ b/tests/semantic/checker-error-specs/invalid_feedback_without_telemetry.qtype.yaml @@ -0,0 +1,27 @@ +id: test_feedback_without_telemetry +description: Test that feedback requires telemetry to be configured + +flows: + - id: chat + description: Flow with feedback but no telemetry (should fail validation) + interface: + type: Conversational + feedback: + type: thumbs + explanation: false + variables: + - id: msg + type: ChatMessage + steps: + - type: Echo + id: echo + inputs: + - msg + outputs: + - msg + inputs: + - msg + outputs: + - msg + +# Note: No telemetry configured - this should cause validation error diff --git a/tests/semantic/test_checker_validation.py b/tests/semantic/test_checker_validation.py index de3dace6..026d417e 100644 --- a/tests/semantic/test_checker_validation.py +++ b/tests/semantic/test_checker_validation.py @@ -105,6 +105,10 
@@ "invalid_secret_manager_wrong_auth_type.qtype.yaml", "AWSSecretManager 'my_secret_manager' requires an AWSAuthProvider", ), + ( + "invalid_feedback_without_telemetry.qtype.yaml", + "has flows with feedback configured but no telemetry sink defined", + ), ( "invalid_complete_flow_no_text_output.qtype.yaml", "final step 'echo_step' is of type 'Echo' which does not support streaming", diff --git a/tests/semantic/test_feedback_validation.py b/tests/semantic/test_feedback_validation.py new file mode 100644 index 00000000..ce1f0eae --- /dev/null +++ b/tests/semantic/test_feedback_validation.py @@ -0,0 +1,101 @@ +"""Tests for semantic validation of feedback configurations.""" + +from __future__ import annotations + +from qtype.semantic import loader + + +class TestFeedbackSemanticValidation: + """Test semantic validation rules for feedback.""" + + def test_thumbs_feedback_loads_correctly(self, tmp_path): + """Test that thumbs feedback configuration loads and validates.""" + yaml_content = """ +id: test_app +flows: + - id: test_flow + feedback: + type: thumbs + explanation: true + steps: + - type: Echo + id: echo1 +""" + yaml_file = tmp_path / "test.yaml" + yaml_file.write_text(yaml_content) + + app, _ = loader.load(str(yaml_file)) + flow = app.flows[0] + assert flow.feedback is not None + assert flow.feedback.type == "thumbs" + assert flow.feedback.explanation is True + + def test_rating_feedback_loads_correctly(self, tmp_path): + """Test that rating feedback configuration loads and validates.""" + yaml_content = """ +id: test_app +flows: + - id: test_flow + feedback: + type: rating + scale: 10 + explanation: false + steps: + - type: Echo + id: echo1 +""" + yaml_file = tmp_path / "test.yaml" + yaml_file.write_text(yaml_content) + + app, _ = loader.load(str(yaml_file)) + flow = app.flows[0] + assert flow.feedback is not None + assert flow.feedback.type == "rating" + assert flow.feedback.scale == 10 + assert flow.feedback.explanation is False + + def test_category_feedback_loads_correctly(self, tmp_path): + """Test that category feedback configuration loads and validates.""" + yaml_content = """ +id: test_app +flows: + - id: test_flow + feedback: + type: category + categories: + - accurate + - helpful + - creative + allow_multiple: true + explanation: true + steps: + - type: Echo + id: echo1 +""" + yaml_file = tmp_path / "test.yaml" + yaml_file.write_text(yaml_content) + + app, _ = loader.load(str(yaml_file)) + flow = app.flows[0] + assert flow.feedback is not None + assert flow.feedback.type == "category" + assert flow.feedback.categories == ["accurate", "helpful", "creative"] + assert flow.feedback.allow_multiple is True + assert flow.feedback.explanation is True + + def test_flow_without_feedback(self, tmp_path): + """Test that flows work without feedback configuration.""" + yaml_content = """ +id: test_app +flows: + - id: test_flow + steps: + - type: Echo + id: echo1 +""" + yaml_file = tmp_path / "test.yaml" + yaml_file.write_text(yaml_content) + + app, _ = loader.load(str(yaml_file)) + flow = app.flows[0] + assert flow.feedback is None diff --git a/ui/components/FlowResponseCard.tsx b/ui/components/FlowResponseCard.tsx index a469e136..8f8e4121 100644 --- a/ui/components/FlowResponseCard.tsx +++ b/ui/components/FlowResponseCard.tsx @@ -6,6 +6,7 @@ "use client"; +import { FeedbackButton } from "@/components/feedback"; import { Alert, AlertDescription } from "@/components/ui/Alert"; import { MarkdownContainer } from "./MarkdownContainer"; @@ -21,6 +22,7 @@ import { } from "./outputs"; import 
type { SchemaProperty, ResponseData } from "@/types"; +import type { FeedbackConfig } from "@/types/FlowMetadata"; interface ResponsePropertyProps { name: string; @@ -129,11 +131,15 @@ function ResponseProperty({ name, property, value }: ResponsePropertyProps) { interface FlowResponseCardProps { responseSchema?: SchemaProperty | null; responseData?: ResponseData; + feedbackConfig?: FeedbackConfig | null; + telemetryEnabled?: boolean; } export default function FlowResponseCard({ responseSchema, responseData, + feedbackConfig, + telemetryEnabled = false, }: FlowResponseCardProps) { if (!responseData) { return ( @@ -158,11 +164,36 @@ export default function FlowResponseCard({ ? (responseData as Record).outputs || responseData : responseData || {}; + // Extract metadata (span_id, trace_id) from response + const metadata = + responseData && typeof responseData === "object" + ? (responseData as Record).metadata + : null; + + const spanId = + metadata && typeof metadata === "object" + ? (metadata as Record).span_id + : null; + + const traceId = + metadata && typeof metadata === "object" + ? (metadata as Record).trace_id + : null; + + const showFeedback = + feedbackConfig && + telemetryEnabled && + spanId && + traceId && + typeof spanId === "string" && + typeof traceId === "string"; + return (
{responseSchema.properties && - Object.entries(responseSchema.properties).map( - ([propertyName, propertySchema]) => { + Object.entries(responseSchema.properties) + .filter(([propertyName]) => propertyName !== "metadata") + .map(([propertyName, propertySchema]) => { const value = (outputsData as Record)[ propertyName ]; @@ -179,8 +210,18 @@ export default function FlowResponseCard({ value={value} /> ); - }, - )} + })} + + {showFeedback && ( +
+ +
+ )}
); } diff --git a/ui/components/FlowResponseTable.tsx b/ui/components/FlowResponseTable.tsx index e9df5b0a..089924cb 100644 --- a/ui/components/FlowResponseTable.tsx +++ b/ui/components/FlowResponseTable.tsx @@ -19,14 +19,18 @@ import { Download } from "lucide-react"; import Papa from "papaparse"; import { useMemo, useState } from "react"; +import { FeedbackButton } from "@/components/feedback"; import { Button } from "@/components/ui/Button"; import { Input } from "@/components/ui/Input"; import type { SchemaProperty, ResponseData } from "@/types"; +import type { FeedbackConfig } from "@/types/FlowMetadata"; interface FlowResponseTableProps { responseSchema?: SchemaProperty | null; outputs: ResponseData[]; + feedbackConfig?: FeedbackConfig | null; + telemetryEnabled?: boolean; } function formatCellValue(value: ResponseData, qtypeType?: string): string { @@ -61,6 +65,8 @@ function formatCellValue(value: ResponseData, qtypeType?: string): string { export default function FlowResponseTable({ responseSchema, outputs, + feedbackConfig, + telemetryEnabled = false, }: FlowResponseTableProps) { const [searchText, setSearchText] = useState(""); const [sorting, setSorting] = useState([]); @@ -78,18 +84,50 @@ export default function FlowResponseTable({ const columns = useMemo>[]>(() => { if (!responseSchema?.properties) return []; - return Object.entries(responseSchema.properties).map(([key, schema]) => { - const prop = schema as SchemaProperty; - return { - accessorKey: key, - header: prop.title || key, + const dataColumns = Object.entries(responseSchema.properties) + .filter(([key]) => key !== "metadata") + .map(([key, schema]) => { + const prop = schema as SchemaProperty; + return { + accessorKey: key, + header: prop.title || key, + cell: ({ row }) => { + const value = row.original[key]; + return formatCellValue(value, prop.qtype_type); + }, + }; + }); + + // Add feedback column if enabled + if (feedbackConfig && telemetryEnabled) { + dataColumns.push({ + id: "feedback", + header: "Feedback", cell: ({ row }) => { - const value = row.original[key]; - return formatCellValue(value, prop.qtype_type); + const metadata = row.original.metadata as + | Record + | undefined; + const spanId = metadata?.span_id as string | undefined; + const traceId = metadata?.trace_id as string | undefined; + + if (!spanId || !traceId) { + return null; + } + + return ( + + ); }, - }; - }); - }, [responseSchema]); + }); + } + + return dataColumns; + }, [responseSchema, feedbackConfig, telemetryEnabled]); const table = useReactTable({ data, diff --git a/ui/components/chat/MessageBubble.tsx b/ui/components/chat/MessageBubble.tsx index f1fbe8db..501a4a84 100644 --- a/ui/components/chat/MessageBubble.tsx +++ b/ui/components/chat/MessageBubble.tsx @@ -1,5 +1,6 @@ import { Bot, User } from "lucide-react"; +import { FeedbackButton } from "@/components/feedback"; import { Avatar, AvatarFallback } from "@/components/ui/Avatar"; import { MarkdownContainer } from "../MarkdownContainer"; @@ -13,10 +14,13 @@ import { FileDisplay } from "."; import type { Message } from "./types"; import type { MessagePartWithText } from "./types/MessagePart"; import type { FileAttachment } from "@/types"; +import type { FeedbackConfig } from "@/types/FlowMetadata"; interface MessageBubbleProps { message: Message; isStreaming?: boolean; + feedbackConfig?: FeedbackConfig | null; + telemetryEnabled?: boolean; } interface StreamingPart { @@ -24,7 +28,12 @@ interface StreamingPart { [key: string]: unknown; } -function MessageBubble({ message, isStreaming = 
false }: MessageBubbleProps) { +function MessageBubble({ + message, + isStreaming = false, + feedbackConfig, + telemetryEnabled = false, +}: MessageBubbleProps) { const isUser = message.role === "user"; const reasoningContent = getPartContent( @@ -50,6 +59,18 @@ function MessageBubble({ message, isStreaming = false }: MessageBubbleProps) { isStreaming, ); + // Extract span_id and trace_id from metadata for feedback + const spanId = message.metadata?.span_id as string | undefined; + const traceId = message.metadata?.trace_id as string | undefined; + + const showFeedback = + !isUser && + !isStreaming && + feedbackConfig && + telemetryEnabled && + spanId && + traceId; + return (
))} + + {showFeedback && ( +
+ +
+ )}
{isUser && ( diff --git a/ui/components/feedback/CategoryFeedback.tsx b/ui/components/feedback/CategoryFeedback.tsx new file mode 100644 index 00000000..554890b0 --- /dev/null +++ b/ui/components/feedback/CategoryFeedback.tsx @@ -0,0 +1,78 @@ +/** + * CategoryFeedback Component + * + * Categorical feedback with predefined tags (single or multi-select) + */ + +"use client"; + +import { Check } from "lucide-react"; +import { useState } from "react"; + +import { Button } from "@/components/ui/Button"; + +interface CategoryFeedbackProps { + categories: string[]; + allowMultiple: boolean; + onFeedback: (feedback: { type: "category"; categories: string[] }) => void; +} + +export function CategoryFeedback({ + categories, + allowMultiple, + onFeedback, +}: CategoryFeedbackProps) { + const [selectedCategories, setSelectedCategories] = useState>( + new Set(), + ); + + const handleCategoryClick = (category: string) => { + if (allowMultiple) { + // Multi-select mode + const newSelected = new Set(selectedCategories); + if (newSelected.has(category)) { + newSelected.delete(category); + } else { + newSelected.add(category); + } + setSelectedCategories(newSelected); + } else { + // Single-select mode - submit immediately + onFeedback({ type: "category", categories: [category] }); + } + }; + + const handleSubmit = () => { + if (selectedCategories.size > 0) { + onFeedback({ + type: "category", + categories: Array.from(selectedCategories), + }); + } + }; + + return ( +
+ {categories.map((category) => { + const isSelected = selectedCategories.has(category); + return ( + + ); + })} + {allowMultiple && selectedCategories.size > 0 && ( + + )} +
+ ); +} diff --git a/ui/components/feedback/FeedbackButton.tsx b/ui/components/feedback/FeedbackButton.tsx new file mode 100644 index 00000000..91d0e0ca --- /dev/null +++ b/ui/components/feedback/FeedbackButton.tsx @@ -0,0 +1,173 @@ +/** + * FeedbackButton Component + * + * Displays feedback UI based on flow configuration and handles submission to Phoenix + */ + +"use client"; + +import { Check, Loader2 } from "lucide-react"; +import { useState } from "react"; + +import { apiClient } from "@/lib/apiClient"; + +import { CategoryFeedback } from "./CategoryFeedback"; +import { FeedbackExplanationModal } from "./FeedbackExplanationModal"; +import { RatingFeedback } from "./RatingFeedback"; +import { ThumbsFeedback } from "./ThumbsFeedback"; + +import type { FeedbackConfig } from "@/types/FlowMetadata"; + +interface FeedbackButtonProps { + feedbackConfig: FeedbackConfig; + spanId: string; + traceId: string; + telemetryEnabled: boolean; +} + +export function FeedbackButton({ + feedbackConfig, + spanId, + traceId, + telemetryEnabled, +}: FeedbackButtonProps) { + const [submitted, setSubmitted] = useState(false); + const [isSubmitting, setIsSubmitting] = useState(false); + const [error, setError] = useState(null); + const [showExplanation, setShowExplanation] = useState(false); + const [pendingFeedback, setPendingFeedback] = useState<{ + type: "thumbs" | "rating" | "category"; + value?: boolean; + score?: number; + categories?: string[]; + } | null>(null); + + if (!telemetryEnabled) { + return null; // Don't show feedback if telemetry is not enabled + } + + if (submitted) { + return ( +
+ + Feedback submitted +
+ ); + } + + if (isSubmitting) { + return ( +
+ + Submitting... +
+ ); + } + + const handleFeedbackSubmit = async ( + feedback: { + type: "thumbs" | "rating" | "category"; + value?: boolean; + score?: number; + categories?: string[]; + }, + explanation?: string, + ) => { + setIsSubmitting(true); + setError(null); + + try { + // Construct feedback data based on type + let feedbackData: + | { type: "thumbs"; value: boolean; explanation?: string } + | { type: "rating"; score: number; explanation?: string } + | { type: "category"; categories: string[]; explanation?: string }; + + if (feedback.type === "thumbs" && feedback.value !== undefined) { + feedbackData = { type: "thumbs", value: feedback.value, explanation }; + } else if (feedback.type === "rating" && feedback.score !== undefined) { + feedbackData = { type: "rating", score: feedback.score, explanation }; + } else if (feedback.type === "category" && feedback.categories) { + feedbackData = { + type: "category", + categories: feedback.categories, + explanation, + }; + } else { + throw new Error("Invalid feedback data"); + } + + await apiClient.submitFeedback({ + span_id: spanId, + trace_id: traceId, + feedback: feedbackData, + }); + + setSubmitted(true); + setPendingFeedback(null); + setShowExplanation(false); + } catch (err) { + setError( + err instanceof Error ? err.message : "Failed to submit feedback", + ); + setPendingFeedback(null); + setShowExplanation(false); + } finally { + setIsSubmitting(false); + } + }; + + const handleFeedbackClick = (feedback: { + type: "thumbs" | "rating" | "category"; + value?: boolean; + score?: number; + categories?: string[]; + }) => { + // If explanation is enabled, show modal first + if (feedbackConfig.explanation) { + setPendingFeedback(feedback); + setShowExplanation(true); + } else { + // Submit directly without explanation + handleFeedbackSubmit(feedback); + } + }; + + return ( +
+
+ {feedbackConfig.type === "thumbs" && ( + + )} + {feedbackConfig.type === "rating" && ( + + )} + {feedbackConfig.type === "category" && ( + + )} +
+ + {error &&
{error}
} + + {showExplanation && pendingFeedback && ( + { + setShowExplanation(false); + setPendingFeedback(null); + }} + onSubmit={(explanation) => { + handleFeedbackSubmit(pendingFeedback, explanation); + }} + /> + )} +
+ ); +} diff --git a/ui/components/feedback/FeedbackExplanationModal.tsx b/ui/components/feedback/FeedbackExplanationModal.tsx new file mode 100644 index 00000000..69fffa91 --- /dev/null +++ b/ui/components/feedback/FeedbackExplanationModal.tsx @@ -0,0 +1,72 @@ +/** + * FeedbackExplanationModal Component + * + * Optional modal for adding text explanation to feedback + */ + +"use client"; + +import { X } from "lucide-react"; +import { useState } from "react"; + +import { Button } from "@/components/ui/Button"; +import { Card } from "@/components/ui/Card"; + +interface FeedbackExplanationModalProps { + isOpen: boolean; + onClose: () => void; + onSubmit: (explanation?: string) => void; +} + +export function FeedbackExplanationModal({ + isOpen, + onClose, + onSubmit, +}: FeedbackExplanationModalProps) { + const [explanation, setExplanation] = useState(""); + + if (!isOpen) return null; + + const handleSubmit = () => { + onSubmit(explanation.trim() || undefined); + }; + + const handleSkip = () => { + onSubmit(undefined); + }; + + return ( +
+ +
+

Add Explanation (Optional)

+ +
+ +
+
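
Reviewer note: a quick way to sanity-check the new feedback flow end to end is to POST directly to the /feedback endpoint once a server with telemetry is running. The sketch below is illustrative only, not part of the patch: the payload shape follows FeedbackRequest and ThumbsFeedbackData from qtype/interpreter/feedback_api.py, but the base URL/port and the span/trace IDs are assumptions (real IDs come from the "metadata" block attached to a flow response).

    # Hedged sketch: exercise POST /feedback against a locally running server.
    # Assumes the API is served at http://localhost:8000; adjust as needed.
    import requests

    response = requests.post(
        "http://localhost:8000/feedback",
        json={
            # span_id (16 hex chars) and trace_id (32 hex chars) are taken from
            # the metadata of a flow output; these values are placeholders.
            "span_id": "0123456789abcdef",
            "trace_id": "0123456789abcdef0123456789abcdef",
            "feedback": {
                "type": "thumbs",
                "value": True,
                "explanation": "Helpful answer.",
            },
        },
        timeout=10,
    )
    response.raise_for_status()
    # Expected: {"status": "success", "message": "Feedback submitted successfully"}
    print(response.json())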