Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 6 additions & 23 deletions src/app/endpoints/query_v2.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Handler for REST API call to provide answer to query using Response API."""

import json
import logging
from typing import Annotated, Any, cast

Expand Down Expand Up @@ -38,7 +39,10 @@
get_topic_summary_system_prompt,
)
from utils.mcp_headers import mcp_headers_dependency
from utils.responses import extract_text_from_response_output_item
from utils.responses import (
extract_text_from_response_output_item,
parse_referenced_documents_from_responses_api,
)
from utils.shields import detect_shield_violations, get_available_shields
from utils.token_counter import TokenCounter
from utils.types import ToolCallSummary, TurnSummary
Expand Down Expand Up @@ -132,7 +136,7 @@ def _build_tool_call_summary( # pylint: disable=too-many-return-statements,too-
id=str(getattr(output_item, "id")),
name=DEFAULT_RAG_TOOL,
args=args,
response=response_payload,
response=json.dumps(response_payload) if response_payload else None,
)

if item_type == "web_search_call":
Expand Down Expand Up @@ -394,27 +398,6 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche
return (summary, conversation_id, referenced_documents, token_usage)


def parse_referenced_documents_from_responses_api(
    response: OpenAIResponseObject,  # pylint: disable=unused-argument
) -> list[ReferencedDocument]:
    """
    Parse referenced documents from OpenAI Responses API response.

    Args:
        response: The OpenAI Response API response object

    Returns:
        list[ReferencedDocument]: List of referenced documents with doc_url and doc_title
    """
    # TODO(ltomasbo): parsing of source documents from the Responses API is not
    # implemented yet; the Responses API structures references differently from
    # the Agent API. Candidate sources to extract from:
    #   - OpenAIResponseOutputMessageFileSearchToolCall.results
    #   - OpenAIResponseAnnotationCitation entries in message content
    #   - OpenAIResponseAnnotationFileCitation entries in message content
    # Until then, report no referenced documents.
    return []


def extract_token_usage_from_responses_api(
response: OpenAIResponseObject,
model: str,
Expand Down
178 changes: 178 additions & 0 deletions src/utils/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

from typing import Any

from pydantic import AnyUrl, ValidationError

from models.responses import ReferencedDocument


def extract_text_from_response_output_item(output_item: Any) -> str:
"""Extract assistant message text from a Responses API output item.
Expand Down Expand Up @@ -54,3 +58,177 @@ def extract_text_from_response_output_item(output_item: Any) -> str:
text_fragments.append(str(dict_text))

return "".join(text_fragments)


def _parse_file_search_result(
result: Any,
) -> tuple[str | None, str | None]:
"""
Extract filename and URL from a file search result.

Args:
result: A file search result (dict or object)

Returns:
tuple[str | None, str | None]: (doc_url, filename) tuple
"""
# Handle both object and dict access
if isinstance(result, dict):
filename = result.get("filename")
attributes = result.get("attributes", {})
else:
filename = getattr(result, "filename", None)
attributes = getattr(result, "attributes", {}) or {}

# Try to get URL from attributes - look for common URL fields
doc_url = (
attributes.get("link") or attributes.get("url") or attributes.get("doc_url")
)
# Treat empty string as None for URL to satisfy AnyUrl | None
final_url = doc_url if doc_url else None
return (final_url, filename)


def _parse_annotation(
annotation: Any,
) -> tuple[str | None, str | None, str | None]:
"""
Extract type, URL, and title from an annotation.

Args:
annotation: An annotation (dict or object)

Returns:
tuple[str | None, str | None, str | None]: (type, url, title) tuple
"""
# Handle both object and dict access for annotations
if isinstance(annotation, dict):
anno_type = annotation.get("type")
anno_url = annotation.get("url")
anno_title = annotation.get("title") or annotation.get("filename")
else:
anno_type = getattr(annotation, "type", None)
anno_url = getattr(annotation, "url", None)
anno_title = getattr(annotation, "title", None) or getattr(
annotation, "filename", None
)
return (anno_type, anno_url, anno_title)


def _add_document_if_unique(
    documents: list[ReferencedDocument],
    seen_docs: set[tuple[str | None, str | None]],
    doc_url: str | None,
    doc_title: str | None,
) -> None:
    """
    Add document to list if not already seen.

    Args:
        documents: List of documents to append to
        seen_docs: Set of seen (url, title) tuples
        doc_url: Document URL string (may be None)
        doc_title: Document title (may be None)
    """
    key = (doc_url, doc_title)
    if key in seen_docs:
        return

    # A None URL is acceptable as-is; a non-empty string must validate as AnyUrl.
    validated_url: AnyUrl | None = None
    if doc_url:
        try:
            validated_url = AnyUrl(doc_url)  # type: ignore[arg-type]
        except ValidationError:
            # Documents whose URL fails validation are dropped entirely.
            return

    seen_docs.add(key)
    documents.append(ReferencedDocument(doc_url=validated_url, doc_title=doc_title))


def _parse_file_search_output(
    output_item: Any,
    documents: list[ReferencedDocument],
    seen_docs: set[tuple[str | None, str | None]],
) -> None:
    """
    Parse file search results from an output item.

    Args:
        output_item: Output item of type "file_search_call"
        documents: List to append found documents to
        seen_docs: Set of seen (url, title) tuples
    """
    # ``results`` may be absent or explicitly None; treat both as empty.
    for entry in getattr(output_item, "results", None) or []:
        doc_url, filename = _parse_file_search_result(entry)
        # Record only entries that carry at least a URL or a filename.
        if doc_url or filename:
            _add_document_if_unique(documents, seen_docs, doc_url, filename)


def _parse_message_annotations(
    output_item: Any,
    documents: list[ReferencedDocument],
    seen_docs: set[tuple[str | None, str | None]],
) -> None:
    """
    Parse annotations from a message output item.

    Args:
        output_item: Output item of type "message"
        documents: List to append found documents to
        seen_docs: Set of seen (url, title) tuples
    """
    content = getattr(output_item, "content", None)
    if not isinstance(content, list):
        return

    for part in content:
        # Plain-string parts cannot carry annotations.
        if isinstance(part, str):
            continue

        # Accept both dict and object content parts, mirroring the
        # dict/object handling in _parse_annotation and
        # _parse_file_search_result (getattr on a dict would silently
        # return the default and drop its annotations).
        if isinstance(part, dict):
            annotations = part.get("annotations") or []
        else:
            annotations = getattr(part, "annotations", []) or []

        for annotation in annotations:
            anno_type, anno_url, anno_title = _parse_annotation(annotation)

            if anno_type == "url_citation":
                # Treat empty string as None
                final_url = anno_url if anno_url else None
                _add_document_if_unique(documents, seen_docs, final_url, anno_title)
            elif anno_type == "file_citation":
                # File citations carry no URL; record the title/filename only.
                _add_document_if_unique(documents, seen_docs, None, anno_title)


def parse_referenced_documents_from_responses_api(
    response: Any,
) -> list[ReferencedDocument]:
    """
    Parse referenced documents from OpenAI Responses API response.

    This function extracts document references from two sources:
    1. file_search_call results - Documents retrieved via RAG/file search
    2. message content annotations - Citation annotations in assistant messages

    Args:
        response: The OpenAI Response API response object (OpenAIResponseObject)

    Returns:
        list[ReferencedDocument]: List of unique referenced documents with doc_url and doc_title
    """
    documents: list[ReferencedDocument] = []
    # Use a set to track unique documents by (doc_url, doc_title) tuple
    seen_docs: set[tuple[str | None, str | None]] = set()

    # Tolerate response objects without an ``output`` attribute — defensive,
    # consistent with the getattr-based access used everywhere else here
    # (direct ``response.output`` would raise AttributeError).
    output = getattr(response, "output", None)
    if not output:
        return documents

    for output_item in output:
        item_type = getattr(output_item, "type", None)

        # 1. Parse from file_search_call results
        if item_type == "file_search_call":
            _parse_file_search_output(output_item, documents, seen_docs)
        # 2. Parse from message content annotations
        elif item_type == "message":
            _parse_message_annotations(output_item, documents, seen_docs)

    return documents
97 changes: 96 additions & 1 deletion tests/unit/app/endpoints/test_query_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,15 @@ async def test_retrieve_response_parses_output_and_tool_calls(
mock_client = mocker.Mock()

# Build output with content variants and tool calls
part1 = mocker.Mock(text="Hello ")
part1.annotations = [] # Ensure annotations is a list to avoid iteration error
part2 = mocker.Mock(text="world")
part2.annotations = []

output_item_1 = mocker.Mock()
output_item_1.type = "message"
output_item_1.role = "assistant"
output_item_1.content = [mocker.Mock(text="Hello "), mocker.Mock(text="world")]
output_item_1.content = [part1, part2]

output_item_2 = mocker.Mock()
output_item_2.type = "message"
Expand Down Expand Up @@ -710,3 +715,93 @@ async def test_retrieve_response_no_violation_with_shields(

# Verify that the validation error metric was NOT incremented
validation_metric.inc.assert_not_called()


def _create_message_output_with_annotations(mocker: MockerFixture) -> Any:
    """Build a mock assistant message whose content has url and file citations."""
    url_citation = mocker.Mock()
    url_citation.type = "url_citation"
    url_citation.url = "http://example.com/doc1"
    url_citation.title = "Doc 1"

    file_citation = mocker.Mock()
    file_citation.type = "file_citation"
    file_citation.filename = "file1.txt"
    file_citation.url = None
    file_citation.title = None

    text_part = mocker.Mock()
    text_part.type = "output_text"
    text_part.text = "Here is a citation."
    text_part.annotations = [url_citation, file_citation]

    message = mocker.Mock()
    message.type = "message"
    message.role = "assistant"
    message.content = [text_part]
    return message


@pytest.mark.asyncio
async def test_retrieve_response_parses_referenced_documents(
    mocker: MockerFixture,
) -> None:
    """Test that retrieve_response correctly parses referenced documents from response."""
    mock_client = mocker.Mock()

    # One message output carrying citation annotations (url + file citations).
    annotated_message = _create_message_output_with_annotations(mocker)

    # One file_search_call output carrying RAG results.
    file_search_item = mocker.Mock()
    file_search_item.type = "file_search_call"
    # An empty queries list keeps the tool-call summary code from iterating None.
    file_search_item.queries = []
    file_search_item.status = "completed"
    file_search_item.results = [
        {"filename": "file2.pdf", "attributes": {"url": "http://example.com/doc2"}},
        {"filename": "file3.docx", "attributes": {}},  # No URL
    ]

    fake_response = mocker.Mock()
    fake_response.id = "resp-docs"
    fake_response.output = [annotated_message, file_search_item]
    fake_response.usage = None

    mock_client.responses.create = mocker.AsyncMock(return_value=fake_response)
    empty_stores = mocker.Mock()
    empty_stores.data = []
    mock_client.vector_stores.list = mocker.AsyncMock(return_value=empty_stores)
    mock_client.shields.list = mocker.AsyncMock(return_value=[])

    mocker.patch("app.endpoints.query_v2.get_system_prompt", return_value="PROMPT")
    mocker.patch("app.endpoints.query_v2.configuration", mocker.Mock(mcp_servers=[]))

    request = QueryRequest(query="query with docs")
    _summary, _conv_id, referenced_docs, _token_usage = await retrieve_response(
        mock_client, "model-docs", request, token="tkn", provider_id="test-provider"
    )

    assert len(referenced_docs) == 4
    by_title = {doc.doc_title: doc for doc in referenced_docs}

    # URL citation from the message annotations.
    assert "Doc 1" in by_title
    assert str(by_title["Doc 1"].doc_url) == "http://example.com/doc1"

    # File citation from the message annotations (no URL available).
    assert "file1.txt" in by_title
    assert by_title["file1.txt"].doc_url is None

    # File search result that exposes a URL attribute.
    assert "file2.pdf" in by_title
    assert str(by_title["file2.pdf"].doc_url) == "http://example.com/doc2"

    # File search result without any URL attribute.
    assert "file3.docx" in by_title
    assert by_title["file3.docx"].doc_url is None
Loading