From dc04bf41d94932dad9588549e57e5322c1599399 Mon Sep 17 00:00:00 2001
From: bsatapat
Date: Thu, 25 Sep 2025 16:10:50 +0530
Subject: [PATCH] Add rag_chunks to streaming_query

---
 src/app/endpoints/streaming_query.py | 82 +++++++++++++++++++++++-----
 1 file changed, 69 insertions(+), 13 deletions(-)

diff --git a/src/app/endpoints/streaming_query.py b/src/app/endpoints/streaming_query.py
index 3775995a9..344daeb01 100644
--- a/src/app/endpoints/streaming_query.py
+++ b/src/app/endpoints/streaming_query.py
@@ -135,7 +135,7 @@ def stream_start_event(conversation_id: str) -> str:
     )
 
 
-def stream_end_event(metadata_map: dict) -> str:
+def stream_end_event(metadata_map: dict, summary: TurnSummary) -> str:
     """
     Yield the end of the data stream.
 
@@ -151,20 +151,67 @@ def stream_end_event(metadata_map: dict) -> str:
         str: A Server-Sent Events (SSE) formatted string
             representing the end of the data stream.
     """
+    # Serialize RAG chunks for the end event
+    rag_chunks = [
+        {
+            "content": chunk.content,
+            "source": chunk.source,
+            "score": chunk.score,
+        }
+        for chunk in summary.rag_chunks
+    ]
+
+    # Extract referenced documents from RAG chunks, enriching from metadata_map and deduping
+    referenced_docs: list[dict[str, str | None]] = []
+    doc_urls: set[str] = set()
+    doc_ids: set[str] = set()
+    metas_by_id: dict[str, dict[str, Any]] = {
+        k: v for k, v in metadata_map.items() if isinstance(v, dict)
+    }
+
+    for chunk in summary.rag_chunks:
+        src = chunk.source
+        if not src:
+            continue
+        if src.startswith(("http://", "https://")):
+            if src not in doc_urls:
+                doc_urls.add(src)
+                referenced_docs.append(
+                    {"doc_url": src, "doc_title": src.rsplit("/", 1)[-1] or src}
+                )
+        else:
+            # Treat as a document ID; enrich from metadata_map when available
+            if src in doc_ids:
+                continue
+            doc_ids.add(src)
+            meta = metas_by_id.get(src, {})
+            doc_url = meta.get("docs_url")
+            title = meta.get("title")
+            if doc_url:
+                if doc_url in doc_urls:
+                    continue
+                doc_urls.add(doc_url)
+                referenced_docs.append(
+                    {
+                        "doc_url": doc_url,
+                        "doc_title": title or doc_url.rsplit("/", 1)[-1],
+                    }
+                )
+
+    # Add any additional referenced documents from metadata_map not already present
+    for meta in metas_by_id.values():
+        doc_url = meta.get("docs_url")
+        title = meta.get("title")
+        if doc_url and doc_url not in doc_urls:
+            doc_urls.add(doc_url)
+            referenced_docs.append({"doc_url": doc_url, "doc_title": title})
+
     return format_stream_data(
         {
             "event": "end",
             "data": {
-                "referenced_documents": [
-                    {
-                        "doc_url": v["docs_url"],
-                        "doc_title": v["title"],
-                    }
-                    for v in filter(
-                        lambda v: ("docs_url" in v) and ("title" in v),
-                        metadata_map.values(),
-                    )
-                ],
+                "rag_chunks": rag_chunks,
+                "referenced_documents": referenced_docs,
                 "truncated": None,  # TODO(jboos): implement truncated
                 "input_tokens": 0,  # TODO(jboos): implement input tokens
                 "output_tokens": 0,  # TODO(jboos): implement output tokens
@@ -680,11 +727,20 @@ async def response_generator(
                 chunk_id += 1
                 yield event
 
-        yield stream_end_event(metadata_map)
+        yield stream_end_event(metadata_map, summary)
 
         if not is_transcripts_enabled():
             logger.debug("Transcript collection is disabled in the configuration")
         else:
+            # Convert RAG chunks to a serializable format for store_transcript
+            rag_chunks_for_transcript = [
+                {
+                    "content": chunk.content,
+                    "source": chunk.source,
+                    "score": chunk.score,
+                }
+                for chunk in summary.rag_chunks
+            ]
             store_transcript(
                 user_id=user_id,
                 conversation_id=conversation_id,
@@ -694,7 +750,7 @@ async def response_generator(
                 query=query_request.query,
                 query_request=query_request,
                 summary=summary,
-                rag_chunks=[],  # TODO(lucasagomes): implement rag_chunks
+                rag_chunks=rag_chunks_for_transcript,
                 truncated=False,  # TODO(lucasagomes): implement truncation as part
                 # of quota work
                 attachments=query_request.attachments or [],
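
Illustrative sketch (not part of the patch): how a RAG chunk plus a metadata_map
entry map onto the new "end" event fields. Chunk and Summary below are
hypothetical stand-ins for the real RAG chunk and TurnSummary types, and the
sample content, document ID, and URLs are made up.

    # Hypothetical stand-ins for the real RAG chunk and TurnSummary types.
    from dataclasses import dataclass, field


    @dataclass
    class Chunk:
        content: str
        source: str | None
        score: float | None


    @dataclass
    class Summary:
        rag_chunks: list[Chunk] = field(default_factory=list)


    # One chunk sourced from a document ID resolvable via metadata_map,
    # one sourced directly from a URL.
    summary = Summary(
        rag_chunks=[
            Chunk("Operators manage workloads.", "doc-1", 0.82),
            Chunk("Use the CLI to inspect pods.", "https://example.com/docs/cli", 0.75),
        ]
    )
    metadata_map = {
        "doc-1": {"docs_url": "https://example.com/docs/operators", "title": "Operators"},
    }

    # stream_end_event(metadata_map, summary) would then emit an "end" event
    # whose data contains:
    #   "rag_chunks": both chunks serialized as {"content", "source", "score"}
    #   "referenced_documents": [
    #       {"doc_url": "https://example.com/docs/operators", "doc_title": "Operators"},
    #       {"doc_url": "https://example.com/docs/cli", "doc_title": "cli"},
    #   ]
    # The "doc-1" entry is enriched and deduplicated via metadata_map; the URL
    # source falls back to its last path segment for the title.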