2 changes: 1 addition & 1 deletion docker-compose-library.yaml
@@ -18,7 +18,7 @@ services:
- TAVILY_SEARCH_API_KEY=${TAVILY_SEARCH_API_KEY:-}
# OpenAI
- OPENAI_API_KEY=${OPENAI_API_KEY}
- E2E_OPENAI_MODEL=${E2E_OPENAI_MODEL:-gpt-4-turbo}
- E2E_OPENAI_MODEL=${E2E_OPENAI_MODEL:-gpt-4o-mini}
# Azure
- AZURE_API_KEY=${AZURE_API_KEY:-}
# RHAIIS
2 changes: 1 addition & 1 deletion docker-compose.yaml
@@ -16,7 +16,7 @@ services:
- TAVILY_SEARCH_API_KEY=${TAVILY_SEARCH_API_KEY:-}
# OpenAI
- OPENAI_API_KEY=${OPENAI_API_KEY}
- E2E_OPENAI_MODEL=${E2E_OPENAI_MODEL}
- E2E_OPENAI_MODEL=${E2E_OPENAI_MODEL:-gpt-4o-mini}
# Azure
- AZURE_API_KEY=${AZURE_API_KEY}
# RHAIIS
11 changes: 9 additions & 2 deletions run.yaml
@@ -131,8 +131,15 @@ storage:
namespace: prompts
backend: kv_default
registered_resources:
models: []
shields: []
models:
- model_id: gpt-4o-mini
provider_id: openai
model_type: llm
provider_model_id: gpt-4o-mini
shields:
- shield_id: llama-guard
provider_id: llama-guard
provider_shield_id: openai/gpt-4o-mini
vector_dbs: []
datasets: []
scoring_fns: []
4 changes: 2 additions & 2 deletions src/app/endpoints/query.py
@@ -8,11 +8,11 @@
from typing import Annotated, Any, Optional, cast

from fastapi import APIRouter, Depends, HTTPException, Request
from litellm.exceptions import RateLimitError
from llama_stack_client import (
APIConnectionError,
APIStatusError,
AsyncLlamaStackClient, # type: ignore
AsyncLlamaStackClient,
RateLimitError, # type: ignore
)
from llama_stack_client.types import Shield, UserMessage # type: ignore
from llama_stack_client.types.alpha.agents.turn import Turn
37 changes: 25 additions & 12 deletions src/app/endpoints/query_v2.py
@@ -10,7 +10,7 @@
from llama_stack.apis.agents.openai_responses import (
OpenAIResponseObject,
)
from llama_stack_client import AsyncLlamaStackClient # type: ignore
from llama_stack_client import AsyncLlamaStackClient

import metrics
from app.endpoints.query import (
@@ -42,7 +42,10 @@
)
from utils.mcp_headers import mcp_headers_dependency
from utils.responses import extract_text_from_response_output_item
from utils.shields import detect_shield_violations, get_available_shields
from utils.shields import (
append_turn_to_conversation,
run_shield_moderation,
)
from utils.suid import normalize_conversation_id, to_llama_stack_conversation_id
from utils.token_counter import TokenCounter
from utils.types import RAGChunk, ToolCallSummary, ToolResultSummary, TurnSummary
@@ -322,9 +325,6 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche
and the conversation ID, the list of parsed referenced documents,
and token usage information.
"""
# List available shields for Responses API
available_shields = await get_available_shields(client)

# use system prompt from request or default one
system_prompt = get_system_prompt(query_request, configuration)
logger.debug("Using system prompt: %s", system_prompt)
@@ -370,6 +370,26 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche
conversation_id,
)

# Run shield moderation before calling LLM
moderation_result = await run_shield_moderation(client, input_text)
if moderation_result.blocked:
violation_message = moderation_result.message or ""
await append_turn_to_conversation(
client, llama_stack_conv_id, input_text, violation_message
)
summary = TurnSummary(
llm_response=violation_message,
tool_calls=[],
tool_results=[],
rag_chunks=[],
)
return (
summary,
normalize_conversation_id(conversation_id),
[],
TokenCounter(),
)

# Create OpenAI response using responses API
create_kwargs: dict[str, Any] = {
"input": input_text,
@@ -381,10 +401,6 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche
"conversation": llama_stack_conv_id,
}

# Add shields to extra_body if available
if available_shields:
create_kwargs["extra_body"] = {"guardrails": available_shields}

response = await client.responses.create(**create_kwargs)
response = cast(OpenAIResponseObject, response)
logger.info("Response: %s", response)
@@ -410,9 +426,6 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche
if tool_result:
tool_results.append(tool_result)

# Check for shield violations across all output items
detect_shield_violations(response.output)

logger.info(
"Response processing complete - Tool calls: %d, Response length: %d chars",
len(tool_calls),
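Note on the new helpers imported above: utils/shields.py is not included in this diff, but from the call sites (moderation_result.blocked, .message, .shield_model, plus append_turn_to_conversation) the run_shield_moderation result appears to carry three fields. A minimal, hypothetical sketch of that result shape, purely illustrative with names inferred from usage rather than taken from the PR:

from dataclasses import dataclass
from typing import Optional


@dataclass
class ShieldModerationResult:
    """Hypothetical result shape inferred from the call sites in this PR."""

    blocked: bool                       # True when a shield flagged the input
    message: Optional[str] = None       # violation text surfaced to the caller
    shield_model: Optional[str] = None  # model backing the shield, e.g. "openai/gpt-4o-mini"


def handle(result: ShieldModerationResult) -> str:
    """Mirror the blocked path in retrieve_response(): short-circuit on violations."""
    if result.blocked:
        return result.message or ""
    return "not blocked; proceed to client.responses.create()"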
4 changes: 2 additions & 2 deletions src/app/endpoints/streaming_query.py
@@ -11,10 +11,10 @@

from fastapi import APIRouter, Depends, Request
from fastapi.responses import StreamingResponse
from litellm.exceptions import RateLimitError
from llama_stack_client import (
APIConnectionError,
AsyncLlamaStackClient, # type: ignore
AsyncLlamaStackClient,
RateLimitError, # type: ignore
)
from llama_stack_client.types import UserMessage # type: ignore
from llama_stack_client.types.alpha.agents.agent_turn_response_stream_chunk import (
92 changes: 74 additions & 18 deletions src/app/endpoints/streaming_query_v2.py
@@ -6,8 +6,14 @@
from fastapi import APIRouter, Depends, Request
from fastapi.responses import StreamingResponse
from llama_stack.apis.agents.openai_responses import (
OpenAIResponseContentPartOutputText,
OpenAIResponseMessage,
OpenAIResponseObject,
OpenAIResponseObjectStream,
OpenAIResponseObjectStreamResponseCompleted,
OpenAIResponseObjectStreamResponseContentPartAdded,
OpenAIResponseObjectStreamResponseOutputTextDelta,
OpenAIResponseOutputMessageContentOutputText,
)
from llama_stack_client import AsyncLlamaStackClient

@@ -53,7 +59,10 @@
from utils.quota import consume_tokens, get_available_quotas
from utils.suid import normalize_conversation_id, to_llama_stack_conversation_id
from utils.mcp_headers import mcp_headers_dependency
from utils.shields import detect_shield_violations, get_available_shields
from utils.shields import (
append_turn_to_conversation,
run_shield_moderation,
)
Comment on lines +62 to +65
⚠️ Potential issue | 🟠 Major

Potential PII/secrets leak: the blocked path stores the full input_text (including attachment content) in the conversation.
Lines 448-450 append input_text to the conversation on violation, but input_text includes raw attachment bodies (lines 417-423). This can persist secrets even though the request is blocked. Consider storing only query_request.query (or a redacted attachment summary) in the conversation items.

Proposed mitigation (store redacted user message on violation)
-        await append_turn_to_conversation(
-            client, llama_stack_conv_id, input_text, violation_message
-        )
+        # Avoid persisting raw attachment contents on blocked requests.
+        user_message_for_storage = query_request.query
+        if query_request.attachments:
+            user_message_for_storage += "\n\n[Attachments redacted]"
+        await append_turn_to_conversation(
+            client, llama_stack_conv_id, user_message_for_storage, violation_message
+        )

Also applies to: 444-454

🤖 Prompt for AI Agents
In src/app/endpoints/streaming_query_v2.py around lines 62-65: the code currently
appends the full input_text (which may include raw attachment bodies built
earlier) to the conversation when a shield violation is detected. Change the
blocked-path behavior so append_turn_to_conversation receives a redacted user
message instead of the full input_text, e.g. pass query_request.query or a short
redacted attachment summary created from the attachment loop, and make sure the
variable holding the concatenated attachment bodies is not stored on violation.
Update the code paths where append_turn_to_conversation is called after
run_shield_moderation to use the redacted string.

from utils.token_counter import TokenCounter
from utils.transcripts import store_transcript
from utils.types import ToolCallSummary, TurnSummary
@@ -234,12 +243,6 @@ async def response_generator( # pylint: disable=too-many-branches,too-many-stat
# Capture the response object for token usage extraction
latest_response_object = getattr(chunk, "response", None)

# Check for shield violations in the completed response
if latest_response_object:
output = getattr(latest_response_object, "output", None)
if output is not None:
detect_shield_violations(output)

if not emitted_turn_complete:
final_message = summary.llm_response or "".join(text_parts)
if not final_message:
@@ -394,9 +397,6 @@ async def retrieve_response( # pylint: disable=too-many-locals
tuple: A tuple containing the streaming response object
and the conversation ID.
"""
# List available shields for Responses API
available_shields = await get_available_shields(client)

# use system prompt from request or default one
system_prompt = get_system_prompt(query_request, configuration)
logger.debug("Using system prompt: %s", system_prompt)
@@ -441,6 +441,18 @@
conversation_id,
)

# Run shield moderation before calling LLM
moderation_result = await run_shield_moderation(client, input_text)
if moderation_result.blocked:
violation_message = moderation_result.message or ""
await append_turn_to_conversation(
client, llama_stack_conv_id, input_text, violation_message
)
return (
create_violation_stream(violation_message, moderation_result.shield_model),
normalize_conversation_id(conversation_id),
)

create_params: dict[str, Any] = {
"input": input_text,
"model": model_id,
@@ -451,14 +463,58 @@
"conversation": llama_stack_conv_id,
}

# Add shields to extra_body if available
if available_shields:
create_params["extra_body"] = {"guardrails": available_shields}

response = await client.responses.create(**create_params)
response_stream = cast(AsyncIterator[OpenAIResponseObjectStream], response)
# async for chunk in response_stream:
# logger.error("Chunk: %s", chunk.model_dump_json())
# Return the normalized conversation_id
# The response_generator will emit it in the start event

return response_stream, normalize_conversation_id(conversation_id)


async def create_violation_stream(
message: str,
shield_model: str | None = None,
) -> AsyncIterator[OpenAIResponseObjectStream]:
"""Generate a minimal streaming response for cases where input is blocked by a shield.

This yields only the essential streaming events to indicate that the input was rejected.
Dummy item identifiers are used solely for protocol compliance and are not used later.
"""
response_id = "resp_shield_violation"

# Content part added (triggers empty initial token)
yield OpenAIResponseObjectStreamResponseContentPartAdded(
content_index=0,
response_id=response_id,
item_id="msg_shield_violation_1",
output_index=0,
part=OpenAIResponseContentPartOutputText(text=""),
sequence_number=0,
)

# Text delta
yield OpenAIResponseObjectStreamResponseOutputTextDelta(
content_index=1,
delta=message,
item_id="msg_shield_violation_2",
output_index=1,
sequence_number=1,
)

# Completed response
yield OpenAIResponseObjectStreamResponseCompleted(
response=OpenAIResponseObject(
id=response_id,
created_at=0, # not used
model=shield_model or "shield",
output=[
OpenAIResponseMessage(
id="msg_shield_violation_3",
content=[
OpenAIResponseOutputMessageContentOutputText(text=message)
],
role="assistant",
status="completed",
)
],
status="completed",
)
)
Comment on lines +472 to +520
⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

llama-stack 0.3.5 streaming response event schema item_id output_index content_index consistency

💡 Result:

Brief summary for llama-stack (v0.3.5) streaming response event fields — meaning and consistency guarantees:

  • item_id — unique identifier for a single output item (stays the same across all streaming events that update that item). Use this to correlate delta/done/other events for the same output item. [1][3]

  • output_index — integer index of the output item’s position in the response output array (stable for the lifetime of that item in this response). Use this to order/match items inside the response list. [1][2]

  • content_index — integer index of the part within an output item’s content (e.g., for multi-part content or multiple content arrays); used to locate which content element a delta or done event refers to. [1][2]

  • sequence_number — monotonically increasing integer used to order streaming events globally for the response (useful to correctly interleave events when receiving them out of network order). Events for the same item/content will have sequence numbers that reflect the correct event ordering. [1][2]

Consistency notes / invariants to rely on:

  • item_id + output_index together identify the particular output item in a response; item_id is the stable unique id. [1][3]
  • content_index identifies which content element inside that item is being updated; deltas and done events include the content_index they apply to. [1][2]
  • sequence_number provides total ordering for events (clients should apply events in sequence_number order to reconstruct text/function-call arguments/etc.). [1][2]
  • Event types appear under a fixed "type" field (e.g., response.output_text.delta, response.output_text.done, response.content_part.added, response.content_part.done) so clients can branch handling by type. [1][2]

Sources:

  • llama-stack OpenAPI/streaming spec (schema definitions showing item_id, output_index, content_index, sequence_number and event types). [1][2][3]

🏁 Script executed:

# Find and examine the actual code at the specified lines
find . -type f -name "streaming_query_v2.py" | head -5

Repository: lightspeed-core/lightspeed-stack



🏁 Script executed:

# Read the actual code at the specified lines
sed -n '472,520p' ./src/app/endpoints/streaming_query_v2.py

Repository: lightspeed-core/lightspeed-stack



create_violation_stream must keep item_id, output_index, and content_index consistent across streaming events.

The current implementation uses different IDs and indices across events: content_part.added (lines 484–492) uses item_id="msg_shield_violation_1", output_index=0, content_index=0, but output_text.delta (lines 494–500) uses item_id="msg_shield_violation_2", output_index=1, content_index=1. According to the llama-stack 0.3.5 streaming schema, item_id must remain stable to correlate events for the same output item, and output_index and content_index must be stable for the lifetime of that item. Inconsistent values will break consumers that reconstruct messages by correlating events via these fields.

Proposed fix (consistent IDs/indices)
 async def create_violation_stream(
     message: str,
     shield_model: str | None = None,
 ) -> AsyncIterator[OpenAIResponseObjectStream]:
@@
     response_id = "resp_shield_violation"
+    item_id = "msg_shield_violation"
+    output_index = 0
+    content_index = 0
 
     # Content part added (triggers empty initial token)
     yield OpenAIResponseObjectStreamResponseContentPartAdded(
-        content_index=0,
+        content_index=content_index,
         response_id=response_id,
-        item_id="msg_shield_violation_1",
-        output_index=0,
+        item_id=item_id,
+        output_index=output_index,
         part=OpenAIResponseContentPartOutputText(text=""),
         sequence_number=0,
     )
 
     # Text delta
     yield OpenAIResponseObjectStreamResponseOutputTextDelta(
-        content_index=1,
+        content_index=content_index,
         delta=message,
-        item_id="msg_shield_violation_2",
-        output_index=1,
+        item_id=item_id,
+        output_index=output_index,
         sequence_number=1,
     )
@@
             output=[
                 OpenAIResponseMessage(
-                    id="msg_shield_violation_3",
+                    id=item_id,
                     content=[
                         OpenAIResponseOutputMessageContentOutputText(text=message)
                     ],
                     role="assistant",
                     status="completed",
                 )
             ],
             status="completed",
         )
     )
📝 Committable suggestion


Suggested change
async def create_violation_stream(
message: str,
shield_model: str | None = None,
) -> AsyncIterator[OpenAIResponseObjectStream]:
"""Generate a minimal streaming response for cases where input is blocked by a shield.
This yields only the essential streaming events to indicate that the input was rejected.
Dummy item identifiers are used solely for protocol compliance and are not used later.
"""
response_id = "resp_shield_violation"
item_id = "msg_shield_violation"
output_index = 0
content_index = 0
# Content part added (triggers empty initial token)
yield OpenAIResponseObjectStreamResponseContentPartAdded(
content_index=content_index,
response_id=response_id,
item_id=item_id,
output_index=output_index,
part=OpenAIResponseContentPartOutputText(text=""),
sequence_number=0,
)
# Text delta
yield OpenAIResponseObjectStreamResponseOutputTextDelta(
content_index=content_index,
delta=message,
item_id=item_id,
output_index=output_index,
sequence_number=1,
)
# Completed response
yield OpenAIResponseObjectStreamResponseCompleted(
response=OpenAIResponseObject(
id=response_id,
created_at=0, # not used
model=shield_model or "shield",
output=[
OpenAIResponseMessage(
id=item_id,
content=[
OpenAIResponseOutputMessageContentOutputText(text=message)
],
role="assistant",
status="completed",
)
],
status="completed",
)
)
🤖 Prompt for AI Agents
In src/app/endpoints/streaming_query_v2.py around lines 472-520,
create_violation_stream currently mixes item_id, output_index, and content_index
across events, which breaks stream correlation. Make these values stable for the
same output item: pick a single item_id (e.g., "msg_shield_violation_1") and
reuse it for the ContentPartAdded, OutputTextDelta, and the final
OpenAIResponseMessage, and keep content_index and output_index the same (e.g.,
content_index=0 and output_index=0) across those yields while still incrementing
sequence_number as appropriate. Update the
OpenAIResponseObjectStreamResponseOutputTextDelta and the
OpenAIResponseObjectStreamResponseCompleted/OpenAIResponseMessage entries to use
the same item_id, content_index, and output_index used in
OpenAIResponseObjectStreamResponseContentPartAdded.
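To make the correlation invariants above concrete, here is a minimal client-side sketch (hypothetical code, not part of this PR) that rebuilds streamed text by bucketing output_text deltas on (item_id, output_index, content_index) and applying them in sequence_number order. A stream that switches IDs or indices mid-message, as the original create_violation_stream did, splits one logical message across two buckets:

from collections import defaultdict
from typing import Any, Iterable


def reconstruct_output_text(events: Iterable[Any]) -> dict[tuple[str, int, int], str]:
    """Rebuild text per (item_id, output_index, content_index) from delta events."""
    buckets: dict[tuple[str, int, int], list[tuple[int, str]]] = defaultdict(list)
    for event in events:
        if getattr(event, "type", None) == "response.output_text.delta":
            key = (event.item_id, event.output_index, event.content_index)
            buckets[key].append((event.sequence_number, event.delta))
    # Apply deltas in sequence_number order within each bucket.
    return {
        key: "".join(text for _, text in sorted(parts))
        for key, parts in buckets.items()
    }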
