diff --git a/crates/coverage-report/src/streaming_expected_differences.json b/crates/coverage-report/src/streaming_expected_differences.json index 0a8ba15..0cd2bce 100644 --- a/crates/coverage-report/src/streaming_expected_differences.json +++ b/crates/coverage-report/src/streaming_expected_differences.json @@ -29,8 +29,7 @@ "source": "ChatCompletions", "target": "*", "fields": [ - { "pattern": "choices[*].delta.refusal", "reason": "ChatCompletions refusal field has no equivalent in other providers" }, - { "pattern": "choices[*].delta.tool_calls", "reason": "Streaming tool_calls transformation not yet implemented" } + { "pattern": "choices[*].delta.refusal", "reason": "ChatCompletions refusal field has no equivalent in other providers" } ] }, { @@ -48,11 +47,11 @@ ] }, { - "source": "Responses", + "source": "*", "target": "Anthropic", "fields": [ - { "pattern": "id", "reason": "Responses API response IDs don't map to Anthropic streaming format" }, - { "pattern": "model", "reason": "Responses API model field isn't preserved in Anthropic streaming events" } + { "pattern": "id", "reason": "Anthropic streaming separates metadata (message_start) from content events; id may be lost when tool_calls prevent emitting message_start" }, + { "pattern": "model", "reason": "Anthropic streaming separates metadata (message_start) from content events; model may be lost when tool_calls prevent emitting message_start" } ] } ], @@ -60,17 +59,19 @@ { "testCase": "toolCallRequest", "source": "ChatCompletions", - "target": "*", + "target": "Responses", "fields": [ - { "pattern": "choices[*].delta.content", "reason": "ChatCompletions sends null content when tool_calls are present; not preserved in transformation" } + { "pattern": "id", "reason": "Responses streaming separates metadata from content events; id lost when first chunk has tool_calls" }, + { "pattern": "model", "reason": "Responses streaming separates metadata from content events; model lost when first chunk has tool_calls" } ] }, { "testCase": "toolChoiceRequiredParam", "source": "ChatCompletions", - "target": "*", + "target": "Responses", "fields": [ - { "pattern": "choices[*].delta.content", "reason": "ChatCompletions sends null content when tool_calls are present; not preserved in transformation" } + { "pattern": "id", "reason": "Responses streaming separates metadata from content events; id lost when first chunk has tool_calls" }, + { "pattern": "model", "reason": "Responses streaming separates metadata from content events; model lost when first chunk has tool_calls" } ] } ] diff --git a/crates/lingua/src/providers/anthropic/adapter.rs b/crates/lingua/src/providers/anthropic/adapter.rs index 04e7e3f..1c5cb87 100644 --- a/crates/lingua/src/providers/anthropic/adapter.rs +++ b/crates/lingua/src/providers/anthropic/adapter.rs @@ -368,7 +368,6 @@ impl ProviderAdapter for AnthropicAdapter { match event_type { "content_block_delta" => { - // Extract text delta - only handle text_delta type for basic text support let delta = payload.get("delta"); let delta_type = delta.and_then(|d| d.get("type")).and_then(Value::as_str); @@ -378,7 +377,7 @@ impl ProviderAdapter for AnthropicAdapter { // Use null for empty/missing text, preserving semantic equivalence with source let content_value = match text { Some(t) if !t.is_empty() => Value::String(t.to_string()), - _ => Value::Null, // Empty or missing text becomes null + _ => Value::Null, }; let index = payload.get("index").and_then(Value::as_u64).unwrap_or(0) as u32; @@ -399,7 +398,35 @@ impl ProviderAdapter for AnthropicAdapter { ))); } - // For non-text deltas (tool_use, etc.), return keep-alive + if delta_type == Some("input_json_delta") { + let partial_json = delta + .and_then(|d| d.get("partial_json")) + .and_then(Value::as_str) + .unwrap_or(""); + let block_index = + payload.get("index").and_then(Value::as_u64).unwrap_or(0) as u32; + + return Ok(Some(UniversalStreamChunk::new( + None, + None, + vec![UniversalStreamChoice { + index: 0, + delta: Some(serde_json::json!({ + "tool_calls": [{ + "index": block_index, + "function": { + "arguments": partial_json + } + }] + })), + finish_reason: None, + }], + None, + None, + ))); + } + + // For other delta types, return keep-alive Ok(Some(UniversalStreamChunk::keep_alive())) } @@ -466,11 +493,54 @@ impl ProviderAdapter for AnthropicAdapter { Ok(None) } - "content_block_start" | "content_block_stop" | "ping" => { - // Metadata events - return keep-alive + "content_block_start" => { + let content_block = payload.get("content_block"); + let block_type = content_block + .and_then(|b| b.get("type")) + .and_then(Value::as_str); + + if block_type == Some("tool_use") { + let id = content_block + .and_then(|b| b.get("id")) + .and_then(Value::as_str) + .unwrap_or(""); + let name = content_block + .and_then(|b| b.get("name")) + .and_then(Value::as_str) + .unwrap_or(""); + let block_index = + payload.get("index").and_then(Value::as_u64).unwrap_or(0) as u32; + + return Ok(Some(UniversalStreamChunk::new( + None, + None, + vec![UniversalStreamChoice { + index: 0, + delta: Some(serde_json::json!({ + "role": "assistant", + "content": Value::Null, + "tool_calls": [{ + "index": block_index, + "id": id, + "type": "function", + "function": { + "name": name, + "arguments": "" + } + }] + })), + finish_reason: None, + }], + None, + None, + ))); + } + Ok(Some(UniversalStreamChunk::keep_alive())) } + "content_block_stop" | "ping" => Ok(Some(UniversalStreamChunk::keep_alive())), + _ => { // Unknown event type - return keep-alive Ok(Some(UniversalStreamChunk::keep_alive())) @@ -491,16 +561,26 @@ impl ProviderAdapter for AnthropicAdapter { .and_then(|c| c.finish_reason.as_ref()) .is_some(); + // Check if delta has tool_calls + let has_tool_calls = chunk + .choices + .first() + .and_then(|c| c.delta.as_ref()) + .and_then(|d| d.get("tool_calls")) + .and_then(Value::as_array) + .is_some_and(|arr| !arr.is_empty()); + // Check if this is an initial metadata chunk (has model/id/usage but no content) + // Exclude chunks with tool_calls - those must be handled by the tool call path let is_initial_metadata = (chunk.model.is_some() || chunk.id.is_some() || chunk.usage.is_some()) && !has_finish + && !has_tool_calls && chunk .choices .first() .and_then(|c| c.delta.as_ref()) .is_none_or(|d| { - // Initial chunk has role but empty/no content d.get("content") .and_then(Value::as_str) .is_none_or(|s| s.is_empty()) @@ -564,6 +644,50 @@ impl ProviderAdapter for AnthropicAdapter { // Check if this is a content delta if let Some(choice) = chunk.choices.first() { if let Some(delta) = &choice.delta { + // Check for tool_calls in the delta + if let Some(tool_calls) = delta.get("tool_calls").and_then(Value::as_array) { + if let Some(tc) = tool_calls.first() { + let tool_index = + tc.get("index").and_then(Value::as_u64).unwrap_or(0) as u32; + + // Initial tool call chunk has an id field + if let Some(id) = tc.get("id").and_then(Value::as_str) { + let name = tc + .get("function") + .and_then(|f| f.get("name")) + .and_then(Value::as_str) + .unwrap_or(""); + + return Ok(serde_json::json!({ + "type": "content_block_start", + "index": tool_index, + "content_block": { + "type": "tool_use", + "id": id, + "name": name, + "input": {} + } + })); + } + + // Subsequent chunks have only function.arguments + if let Some(arguments) = tc + .get("function") + .and_then(|f| f.get("arguments")) + .and_then(Value::as_str) + { + return Ok(serde_json::json!({ + "type": "content_block_delta", + "index": tool_index, + "delta": { + "type": "input_json_delta", + "partial_json": arguments + } + })); + } + } + } + if let Some(content) = delta.get("content").and_then(Value::as_str) { return Ok(serde_json::json!({ "type": "content_block_delta", @@ -575,16 +699,12 @@ impl ProviderAdapter for AnthropicAdapter { })); } - // Role-only delta or null content - return empty text_delta - // Treat null content the same as missing content (semantically equivalent) - // Using text_delta (instead of content_block_start) ensures proper roundtrip - // since our stream_to_universal converts empty text back to null - // Note: When tool_calls are present with null content, this will emit empty text - // which is documented as an expected limitation in streaming_expected_differences.json + // Role-only delta or null content without tool_calls - return empty text_delta let content_is_missing_or_null = delta.get("content").is_none() || delta.get("content") == Some(&Value::Null); + let has_tool_calls = delta.get("tool_calls").is_some(); - if delta.get("role").is_some() && content_is_missing_or_null { + if delta.get("role").is_some() && content_is_missing_or_null && !has_tool_calls { return Ok(serde_json::json!({ "type": "content_block_delta", "index": choice.index, diff --git a/crates/lingua/src/providers/openai/responses_adapter.rs b/crates/lingua/src/providers/openai/responses_adapter.rs index 634ba82..9d2ce65 100644 --- a/crates/lingua/src/providers/openai/responses_adapter.rs +++ b/crates/lingua/src/providers/openai/responses_adapter.rs @@ -612,6 +612,80 @@ impl ProviderAdapter for ResponsesAdapter { ))) } + "response.output_item.added" => { + // Tool call start - extract call_id, name, and output_index + let item = payload.get("item"); + let item_type = item.and_then(|i| i.get("type")).and_then(Value::as_str); + + if item_type == Some("function_call") { + let call_id = item + .and_then(|i| i.get("call_id")) + .and_then(Value::as_str) + .unwrap_or(""); + let name = item + .and_then(|i| i.get("name")) + .and_then(Value::as_str) + .unwrap_or(""); + let output_index = payload + .get("output_index") + .and_then(Value::as_u64) + .unwrap_or(0) as u32; + + return Ok(Some(UniversalStreamChunk::new( + None, + None, + vec![UniversalStreamChoice { + index: 0, + delta: Some(serde_json::json!({ + "role": "assistant", + "content": Value::Null, + "tool_calls": [{ + "index": output_index, + "id": call_id, + "type": "function", + "function": { + "name": name, + "arguments": "" + } + }] + })), + finish_reason: None, + }], + None, + None, + ))); + } + + Ok(Some(UniversalStreamChunk::keep_alive())) + } + + "response.function_call_arguments.delta" => { + let arguments = payload.get("delta").and_then(Value::as_str).unwrap_or(""); + let output_index = payload + .get("output_index") + .and_then(Value::as_u64) + .unwrap_or(0) as u32; + + Ok(Some(UniversalStreamChunk::new( + None, + None, + vec![UniversalStreamChoice { + index: 0, + delta: Some(serde_json::json!({ + "tool_calls": [{ + "index": output_index, + "function": { + "arguments": arguments + } + }] + })), + finish_reason: None, + }], + None, + None, + ))) + } + // All other events are metadata/keep-alive _ => Ok(Some(UniversalStreamChunk::keep_alive())), } @@ -633,16 +707,26 @@ impl ProviderAdapter for ResponsesAdapter { .and_then(|c| c.finish_reason.as_ref()) .is_some(); + // Check if delta has tool_calls + let has_tool_calls = chunk + .choices + .first() + .and_then(|c| c.delta.as_ref()) + .and_then(|d| d.get("tool_calls")) + .and_then(Value::as_array) + .is_some_and(|arr| !arr.is_empty()); + // Check if this is an initial metadata chunk (has model/id/usage but no content) + // Exclude chunks with tool_calls - those must be handled by the tool call path let is_initial_metadata = (chunk.model.is_some() || chunk.id.is_some() || chunk.usage.is_some()) && !has_finish + && !has_tool_calls && chunk .choices .first() .and_then(|c| c.delta.as_ref()) .is_none_or(|d| { - // Initial chunk has role but empty/no content d.get("content") .and_then(Value::as_str) .is_none_or(|s| s.is_empty()) @@ -709,6 +793,48 @@ impl ProviderAdapter for ResponsesAdapter { // Check for content delta if let Some(choice) = chunk.choices.first() { if let Some(delta) = &choice.delta { + // Check for tool_calls in the delta + if let Some(tool_calls) = delta.get("tool_calls").and_then(Value::as_array) { + if let Some(tc) = tool_calls.first() { + let output_index = + tc.get("index").and_then(Value::as_u64).unwrap_or(0) as u32; + + // Initial tool call chunk has an id field + if let Some(call_id) = tc.get("id").and_then(Value::as_str) { + let name = tc + .get("function") + .and_then(|f| f.get("name")) + .and_then(Value::as_str) + .unwrap_or(""); + + return Ok(serde_json::json!({ + "type": "response.output_item.added", + "output_index": output_index, + "item": { + "type": "function_call", + "status": "in_progress", + "call_id": call_id, + "name": name, + "arguments": "" + } + })); + } + + // Subsequent chunks have only function.arguments + if let Some(arguments) = tc + .get("function") + .and_then(|f| f.get("arguments")) + .and_then(Value::as_str) + { + return Ok(serde_json::json!({ + "type": "response.function_call_arguments.delta", + "output_index": output_index, + "delta": arguments + })); + } + } + } + if let Some(content) = delta.get("content").and_then(Value::as_str) { return Ok(serde_json::json!({ "type": "response.output_text.delta", @@ -718,15 +844,11 @@ impl ProviderAdapter for ResponsesAdapter { })); } - // If content is null or missing, return empty text delta - // Using text delta (instead of output_item.start) ensures proper roundtrip - // since our stream_to_universal converts empty text back to null - // Note: When tool_calls are present with null content, this will emit empty text - // which is documented as an expected limitation in streaming_expected_differences.json + // If content is null or missing and no tool_calls, return empty text delta let content_is_missing_or_null = delta.get("content").is_none() || delta.get("content") == Some(&Value::Null); - if content_is_missing_or_null { + if content_is_missing_or_null && !has_tool_calls { return Ok(serde_json::json!({ "type": "response.output_text.delta", "output_index": choice.index,