From a3f569cfcfd55e77bc9bd09d85a6cdf92dd323aa Mon Sep 17 00:00:00 2001 From: Yogesh Vaishnav Date: Mon, 26 Jan 2026 23:46:20 +0530 Subject: [PATCH 1/2] fix: mark text completed on each text-complete point of response parsing This makes sure that are always consistent text-start..text-complete event pairs Resolves: https://github.com/prism-php/prism/issues/870#issue-3855351301 --- src/Providers/Anthropic/Handlers/Stream.php | 17 +++++-- src/Providers/DeepSeek/Handlers/Stream.php | 4 ++ src/Providers/Gemini/Handlers/Stream.php | 2 + src/Providers/Groq/Handlers/Stream.php | 4 ++ src/Providers/Mistral/Handlers/Stream.php | 6 +++ src/Providers/Ollama/Handlers/Stream.php | 4 ++ src/Providers/OpenAI/Handlers/Stream.php | 2 + src/Providers/OpenRouter/Handlers/Stream.php | 6 +++ src/Providers/XAI/Handlers/Stream.php | 4 ++ src/Streaming/StreamState.php | 7 +++ .../stream-multiple-output-items-1.json | 50 +++++++++++++++++++ tests/Providers/OpenAI/StreamTest.php | 29 +++++++++++ tests/Unit/Streaming/StreamStateTest.php | 10 ++++ 13 files changed, 140 insertions(+), 5 deletions(-) create mode 100644 tests/Fixtures/openai/stream-multiple-output-items-1.json diff --git a/src/Providers/Anthropic/Handlers/Stream.php b/src/Providers/Anthropic/Handlers/Stream.php index d8a7765c6..1af791433 100644 --- a/src/Providers/Anthropic/Handlers/Stream.php +++ b/src/Providers/Anthropic/Handlers/Stream.php @@ -213,11 +213,7 @@ protected function handleContentBlockDelta(array $event): ?StreamEvent protected function handleContentBlockStop(array $event): ?StreamEvent { $result = match ($this->state->currentBlockType()) { - 'text' => new TextCompleteEvent( - id: EventID::generate(), - timestamp: time(), - messageId: $this->state->messageId() - ), + 'text' => $this->handleTextComplete(), 'thinking' => new ThinkingCompleteEvent( id: EventID::generate(), timestamp: time(), @@ -306,6 +302,17 @@ protected function handleThinkingStart(): ThinkingStartEvent ); } + protected function handleTextComplete(): TextCompleteEvent + { + $this->state->markTextCompleted(); + + return new TextCompleteEvent( + id: EventID::generate(), + timestamp: time(), + messageId: $this->state->messageId() + ); + } + /** * @param array $delta */ diff --git a/src/Providers/DeepSeek/Handlers/Stream.php b/src/Providers/DeepSeek/Handlers/Stream.php index 64941f6a6..d03709874 100644 --- a/src/Providers/DeepSeek/Handlers/Stream.php +++ b/src/Providers/DeepSeek/Handlers/Stream.php @@ -111,6 +111,8 @@ protected function processStream(Response $response, Request $request, int $dept $rawFinishReason = data_get($data, 'choices.0.finish_reason'); if ($rawFinishReason === 'tool_calls') { if ($this->state->hasTextStarted() && $text !== '') { + $this->state->markTextCompleted(); + yield new TextCompleteEvent( id: EventID::generate(), timestamp: time(), @@ -193,6 +195,8 @@ protected function processStream(Response $response, Request $request, int $dept $finishReason = $this->mapFinishReason($data); if ($this->state->hasTextStarted() && $text !== '') { + $this->state->markTextCompleted(); + yield new TextCompleteEvent( id: EventID::generate(), timestamp: time(), diff --git a/src/Providers/Gemini/Handlers/Stream.php b/src/Providers/Gemini/Handlers/Stream.php index 2e310430b..a97c25718 100644 --- a/src/Providers/Gemini/Handlers/Stream.php +++ b/src/Providers/Gemini/Handlers/Stream.php @@ -215,6 +215,8 @@ protected function processStream(Response $response, Request $request, int $dept } if ($this->state->hasTextStarted()) { + $this->state->markTextCompleted(); + yield new TextCompleteEvent( id: EventID::generate(), timestamp: time(), diff --git a/src/Providers/Groq/Handlers/Stream.php b/src/Providers/Groq/Handlers/Stream.php index 90b67f422..7b6d7f376 100644 --- a/src/Providers/Groq/Handlers/Stream.php +++ b/src/Providers/Groq/Handlers/Stream.php @@ -121,6 +121,8 @@ protected function processStream(Response $response, Request $request, int $dept if ($this->mapFinishReason($data) === FinishReason::ToolCalls) { // Complete any ongoing text if ($this->state->hasTextStarted() && $text !== '') { + $this->state->markTextCompleted(); + yield new TextCompleteEvent( id: EventID::generate(), timestamp: time(), @@ -162,6 +164,8 @@ protected function processStream(Response $response, Request $request, int $dept $finishReason = $this->mapFinishReason($data); if ($this->state->hasTextStarted() && $text !== '') { + $this->state->markTextCompleted(); + yield new TextCompleteEvent( id: EventID::generate(), timestamp: time(), diff --git a/src/Providers/Mistral/Handlers/Stream.php b/src/Providers/Mistral/Handlers/Stream.php index dc0f9a31b..13c5eb744 100644 --- a/src/Providers/Mistral/Handlers/Stream.php +++ b/src/Providers/Mistral/Handlers/Stream.php @@ -117,6 +117,8 @@ protected function processStream(Response $response, Request $request, int $dept if ($this->mapFinishReason($data) === FinishReason::ToolCalls) { if ($this->state->hasTextStarted() && $text !== '') { + $this->state->markTextCompleted(); + yield new TextCompleteEvent( id: EventID::generate(), timestamp: time(), @@ -134,6 +136,8 @@ protected function processStream(Response $response, Request $request, int $dept if ($this->mapFinishReason($data) === FinishReason::ToolCalls) { if ($this->state->hasTextStarted() && $text !== '') { + $this->state->markTextCompleted(); + yield new TextCompleteEvent( id: EventID::generate(), timestamp: time(), @@ -219,6 +223,8 @@ protected function processStream(Response $response, Request $request, int $dept } if ($this->state->hasTextStarted() && $text !== '') { + $this->state->markTextCompleted(); + yield new TextCompleteEvent( id: EventID::generate(), timestamp: time(), diff --git a/src/Providers/Ollama/Handlers/Stream.php b/src/Providers/Ollama/Handlers/Stream.php index 60c22725d..b8833bdcd 100644 --- a/src/Providers/Ollama/Handlers/Stream.php +++ b/src/Providers/Ollama/Handlers/Stream.php @@ -173,6 +173,8 @@ protected function processStream(Response $response, Request $request, int $dept if ((bool) data_get($data, 'done', false) && $this->state->hasToolCalls()) { // Emit text complete if we had text content if ($this->state->hasTextStarted()) { + $this->state->markTextCompleted(); + yield new TextCompleteEvent( id: EventID::generate(), timestamp: time(), @@ -189,6 +191,8 @@ protected function processStream(Response $response, Request $request, int $dept if ((bool) data_get($data, 'done', false)) { // Emit text complete if we had text content if ($this->state->hasTextStarted()) { + $this->state->markTextCompleted(); + yield new TextCompleteEvent( id: EventID::generate(), timestamp: time(), diff --git a/src/Providers/OpenAI/Handlers/Stream.php b/src/Providers/OpenAI/Handlers/Stream.php index b5501d9d2..e307e6cd3 100644 --- a/src/Providers/OpenAI/Handlers/Stream.php +++ b/src/Providers/OpenAI/Handlers/Stream.php @@ -245,6 +245,8 @@ protected function processStream(Response $response, Request $request, int $dept } if (data_get($data, 'type') === 'response.output_text.done' && $this->state->hasTextStarted()) { + $this->state->markTextCompleted(); + yield new TextCompleteEvent( id: EventID::generate(), timestamp: time(), diff --git a/src/Providers/OpenRouter/Handlers/Stream.php b/src/Providers/OpenRouter/Handlers/Stream.php index 63847c686..168e4bc13 100644 --- a/src/Providers/OpenRouter/Handlers/Stream.php +++ b/src/Providers/OpenRouter/Handlers/Stream.php @@ -108,6 +108,8 @@ protected function processStream(Response $response, Request $request, int $dept $finishReason = data_get($data, 'choices.0.finish_reason'); if ($finishReason !== null && $this->mapFinishReason($data) === FinishReason::ToolCalls) { if ($this->state->hasTextStarted() && $text !== '') { + $this->state->markTextCompleted(); + yield new TextCompleteEvent( id: EventID::generate(), timestamp: time(), @@ -134,6 +136,8 @@ protected function processStream(Response $response, Request $request, int $dept $finishReasonValue = data_get($data, 'choices.0.finish_reason'); if ($finishReasonValue !== null && $this->mapFinishReason($data) === FinishReason::ToolCalls) { if ($this->state->hasTextStarted() && $text !== '') { + $this->state->markTextCompleted(); + yield new TextCompleteEvent( id: EventID::generate(), timestamp: time(), @@ -210,6 +214,8 @@ protected function processStream(Response $response, Request $request, int $dept $finishReason = $this->mapFinishReason($data); if ($this->state->hasTextStarted() && $text !== '') { + $this->state->markTextCompleted(); + yield new TextCompleteEvent( id: EventID::generate(), timestamp: time(), diff --git a/src/Providers/XAI/Handlers/Stream.php b/src/Providers/XAI/Handlers/Stream.php index 90e039f86..f836910ca 100644 --- a/src/Providers/XAI/Handlers/Stream.php +++ b/src/Providers/XAI/Handlers/Stream.php @@ -188,6 +188,8 @@ protected function processStream(Response $response, Request $request, int $dept if ($toolCalls !== []) { if ($this->state->hasTextStarted() && $text !== '') { + $this->state->markTextCompleted(); + yield new TextCompleteEvent( id: EventID::generate(), timestamp: time(), @@ -201,6 +203,8 @@ protected function processStream(Response $response, Request $request, int $dept } if ($this->state->hasTextStarted() && $text !== '') { + $this->state->markTextCompleted(); + yield new TextCompleteEvent( id: EventID::generate(), timestamp: time(), diff --git a/src/Streaming/StreamState.php b/src/Streaming/StreamState.php index bbe320f97..b8f488349 100644 --- a/src/Streaming/StreamState.php +++ b/src/Streaming/StreamState.php @@ -118,6 +118,13 @@ public function markTextStarted(): self return $this; } + public function markTextCompleted(): self + { + $this->textStarted = false; + + return $this; + } + public function markThinkingStarted(): self { $this->thinkingStarted = true; diff --git a/tests/Fixtures/openai/stream-multiple-output-items-1.json b/tests/Fixtures/openai/stream-multiple-output-items-1.json new file mode 100644 index 000000000..90597e625 --- /dev/null +++ b/tests/Fixtures/openai/stream-multiple-output-items-1.json @@ -0,0 +1,50 @@ +event: response.created +data: {"type":"response.created","response":{"id":"resp_001","object":"response","created_at":1750705325,"status":"in_progress","model":"gpt-4o","output":[],"usage":null}} + +event: response.in_progress +data: {"type":"response.in_progress","response":{"id":"resp_001","status":"in_progress"}} + +event: response.output_item.added +data: {"type":"response.output_item.added","output_index":0,"item":{"id":"msg_001","type":"message","status":"in_progress","content":[],"role":"assistant"}} + +event: response.content_part.added +data: {"type":"response.content_part.added","item_id":"msg_001","output_index":0,"content_index":0,"part":{"type":"output_text","text":""}} + +event: response.output_text.delta +data: {"type":"response.output_text.delta","item_id":"msg_001","output_index":0,"content_index":0,"delta":"First"} + +event: response.output_text.delta +data: {"type":"response.output_text.delta","item_id":"msg_001","output_index":0,"content_index":0,"delta":" message"} + +event: response.output_text.done +data: {"type":"response.output_text.done","item_id":"msg_001","output_index":0,"content_index":0,"text":"First message"} + +event: response.content_part.done +data: {"type":"response.content_part.done","item_id":"msg_001","output_index":0,"content_index":0,"part":{"type":"output_text","text":"First message"}} + +event: response.output_item.done +data: {"type":"response.output_item.done","item":{"id":"msg_001","type":"message","status":"completed","content":[{"type":"output_text","text":"First message"}],"role":"assistant"}} + +event: response.output_item.added +data: {"type":"response.output_item.added","output_index":1,"item":{"id":"msg_002","type":"message","status":"in_progress","content":[],"role":"assistant"}} + +event: response.content_part.added +data: {"type":"response.content_part.added","item_id":"msg_002","output_index":1,"content_index":0,"part":{"type":"output_text","text":""}} + +event: response.output_text.delta +data: {"type":"response.output_text.delta","item_id":"msg_002","output_index":1,"content_index":0,"delta":"Second"} + +event: response.output_text.delta +data: {"type":"response.output_text.delta","item_id":"msg_002","output_index":1,"content_index":0,"delta":" message"} + +event: response.output_text.done +data: {"type":"response.output_text.done","item_id":"msg_002","output_index":1,"content_index":0,"text":"Second message"} + +event: response.content_part.done +data: {"type":"response.content_part.done","item_id":"msg_002","output_index":1,"content_index":0,"part":{"type":"output_text","text":"Second message"}} + +event: response.output_item.done +data: {"type":"response.output_item.done","item":{"id":"msg_002","type":"message","status":"completed","content":[{"type":"output_text","text":"Second message"}],"role":"assistant"}} + +event: response.completed +data: {"type":"response.completed","response":{"id":"resp_001","status":"completed","output":[{"id":"msg_001","type":"message","status":"completed","content":[{"type":"output_text","text":"First message"}],"role":"assistant"},{"id":"msg_002","type":"message","status":"completed","content":[{"type":"output_text","text":"Second message"}],"role":"assistant"}],"usage":{"input_tokens":10,"output_tokens":4}}} diff --git a/tests/Providers/OpenAI/StreamTest.php b/tests/Providers/OpenAI/StreamTest.php index 1883400d9..6f71c9d8a 100644 --- a/tests/Providers/OpenAI/StreamTest.php +++ b/tests/Providers/OpenAI/StreamTest.php @@ -755,3 +755,32 @@ $lastEvent = end($events); expect($lastEvent)->toBeInstanceOf(StreamEndEvent::class); }); + +it('emits TextStart and TextComplete events for each output item', function (): void { + FixtureResponse::fakeResponseSequence('v1/responses', 'openai/stream-multiple-output-items'); + + $response = Prism::text() + ->using('openai', 'gpt-4o') + ->withPrompt('Test multiple output items') + ->asStream(); + + $events = []; + foreach ($response as $event) { + $events[] = $event; + } + + $textStartEvents = array_filter($events, fn (\Prism\Prism\Streaming\Events\StreamEvent $e): bool => $e instanceof \Prism\Prism\Streaming\Events\TextStartEvent); + $textCompleteEvents = array_filter($events, fn (\Prism\Prism\Streaming\Events\StreamEvent $e): bool => $e instanceof \Prism\Prism\Streaming\Events\TextCompleteEvent); + $textDeltaEvents = array_filter($events, fn (\Prism\Prism\Streaming\Events\StreamEvent $e): bool => $e instanceof TextDeltaEvent); + + expect($textStartEvents)->toHaveCount(2); + expect($textCompleteEvents)->toHaveCount(2); + expect($textDeltaEvents)->toHaveCount(4); + + $textStartIndices = array_keys($textStartEvents); + $textCompleteIndices = array_keys($textCompleteEvents); + + expect($textStartIndices[0])->toBeLessThan($textCompleteIndices[0]); + expect($textCompleteIndices[0])->toBeLessThan($textStartIndices[1]); + expect($textStartIndices[1])->toBeLessThan($textCompleteIndices[1]); +}); diff --git a/tests/Unit/Streaming/StreamStateTest.php b/tests/Unit/Streaming/StreamStateTest.php index d786c38c7..888ca892f 100644 --- a/tests/Unit/Streaming/StreamStateTest.php +++ b/tests/Unit/Streaming/StreamStateTest.php @@ -101,6 +101,16 @@ ->and($state->hasTextStarted())->toBeTrue(); }); +it('markTextCompleted returns self and resets flag', function (): void { + $state = new StreamState; + $state->markTextStarted(); + + $result = $state->markTextCompleted(); + + expect($result)->toBe($state) + ->and($state->hasTextStarted())->toBeFalse(); +}); + it('markThinkingStarted returns self and sets flag', function (): void { $state = new StreamState; From 0eb7c0a45c3bbd8d45db78ffacbc98e8dcc7dbf8 Mon Sep 17 00:00:00 2001 From: TJ Miller Date: Mon, 26 Jan 2026 16:40:45 -0500 Subject: [PATCH 2/2] Cleanup --- tests/Providers/OpenAI/StreamTest.php | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/tests/Providers/OpenAI/StreamTest.php b/tests/Providers/OpenAI/StreamTest.php index 6f71c9d8a..2354948a4 100644 --- a/tests/Providers/OpenAI/StreamTest.php +++ b/tests/Providers/OpenAI/StreamTest.php @@ -16,7 +16,9 @@ use Prism\Prism\Streaming\Events\StreamEndEvent; use Prism\Prism\Streaming\Events\StreamEvent; use Prism\Prism\Streaming\Events\StreamStartEvent; +use Prism\Prism\Streaming\Events\TextCompleteEvent; use Prism\Prism\Streaming\Events\TextDeltaEvent; +use Prism\Prism\Streaming\Events\TextStartEvent; use Prism\Prism\Streaming\Events\ThinkingEvent; use Prism\Prism\Streaming\Events\ToolCallDeltaEvent; use Prism\Prism\Streaming\Events\ToolCallEvent; @@ -120,8 +122,8 @@ expect($providerToolEvents)->toBeEmpty(); // Verify only one StreamStartEvent and one StreamEndEvent - $streamStartEvents = array_filter($events, fn (\Prism\Prism\Streaming\Events\StreamEvent $event): bool => $event instanceof StreamStartEvent); - $streamEndEvents = array_filter($events, fn (\Prism\Prism\Streaming\Events\StreamEvent $event): bool => $event instanceof StreamEndEvent); + $streamStartEvents = array_filter($events, fn (StreamEvent $event): bool => $event instanceof StreamStartEvent); + $streamEndEvents = array_filter($events, fn (StreamEvent $event): bool => $event instanceof StreamEndEvent); expect($streamStartEvents)->toHaveCount(1); expect($streamEndEvents)->toHaveCount(1); @@ -400,7 +402,7 @@ expect($providerToolEvents)->not->toBeEmpty(); - $statuses = array_map(fn (\Prism\Prism\Streaming\Events\ProviderToolEvent $e): string => $e->status, $providerToolEvents); + $statuses = array_map(fn (ProviderToolEvent $e): string => $e->status, $providerToolEvents); expect($statuses)->toContain('in_progress'); expect($statuses)->toContain('generating'); expect($statuses)->toContain('completed'); @@ -769,9 +771,9 @@ $events[] = $event; } - $textStartEvents = array_filter($events, fn (\Prism\Prism\Streaming\Events\StreamEvent $e): bool => $e instanceof \Prism\Prism\Streaming\Events\TextStartEvent); - $textCompleteEvents = array_filter($events, fn (\Prism\Prism\Streaming\Events\StreamEvent $e): bool => $e instanceof \Prism\Prism\Streaming\Events\TextCompleteEvent); - $textDeltaEvents = array_filter($events, fn (\Prism\Prism\Streaming\Events\StreamEvent $e): bool => $e instanceof TextDeltaEvent); + $textStartEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof TextStartEvent); + $textCompleteEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof TextCompleteEvent); + $textDeltaEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof TextDeltaEvent); expect($textStartEvents)->toHaveCount(2); expect($textCompleteEvents)->toHaveCount(2);