From 7852200c0fcd24d7b7dfa25034d48619a0d4f2c9 Mon Sep 17 00:00:00 2001 From: barbosa89 Date: Wed, 10 Dec 2025 12:14:44 -0500 Subject: [PATCH 01/14] refactor: optimize processing interval handling in ParallelQueue --- src/Queue/ParallelQueue.php | 63 ++++++++++++++++++++----------------- 1 file changed, 35 insertions(+), 28 deletions(-) diff --git a/src/Queue/ParallelQueue.php b/src/Queue/ParallelQueue.php index 35115f64..f5bdec6e 100644 --- a/src/Queue/ParallelQueue.php +++ b/src/Queue/ParallelQueue.php @@ -17,7 +17,8 @@ use Phenix\Tasks\Result; use function Amp\async; -use function Amp\delay; +use function Amp\weakClosure; +use function count; class ParallelQueue extends Queue { @@ -144,44 +145,52 @@ public function clear(): void private function initializeProcessor(): void { $this->processingStarted = true; + $this->processingInterval = new Interval($this->interval, weakClosure($this->handleIntervalTick(...))); + $this->processingInterval->disable(); - $this->processingInterval = new Interval($this->interval, function (): void { - $this->cleanupCompletedTasks(); + $this->isEnabled = false; + } - if (! empty($this->runningTasks)) { - return; // Skip processing if tasks are still running - } + private function handleIntervalTick(): void + { + $this->cleanupCompletedTasks(); - $reservedTasks = $this->chunkProcessing - ? $this->popChunk($this->chunkSize) - : $this->processSingle(); + if (! empty($this->runningTasks)) { + return; // Skip processing if tasks are still running + } - if (empty($reservedTasks)) { - $this->disableProcessing(); + // Preserve batch-sequential characteristics, but cap batch size to maxConcurrency. + $batchSize = min($this->chunkSize, $this->maxConcurrency); - return; - } + $reservedTasks = $this->chunkProcessing + ? $this->popChunk($batchSize) + : $this->processSingle(); - $executions = array_map(function (QueuableTask $task): Execution { - /** @var WorkerPool $pool */ - $pool = App::make(WorkerPool::class); + if (empty($reservedTasks)) { + $this->disableProcessing(); - $timeout = new TimeoutCancellation($task->getTimeout()); + return; + } + + $executions = array_map(function (QueuableTask $task): Execution { + /** @var WorkerPool $pool */ + $pool = App::make(WorkerPool::class); - return $pool->submit($task, $timeout); - }, $reservedTasks); + $timeout = new TimeoutCancellation($task->getTimeout()); - $this->runningTasks = array_merge($this->runningTasks, $executions); + return $pool->submit($task, $timeout); + }, $reservedTasks); - $future = async(function () use ($reservedTasks, $executions): void { - $this->processTaskResults($reservedTasks, $executions); - }); + $this->runningTasks = array_merge($this->runningTasks, $executions); - $future->await(); + $future = async(function () use ($reservedTasks, $executions): void { + $this->processTaskResults($reservedTasks, $executions); }); - $this->processingInterval->disable(); - $this->isEnabled = false; + $future->await(); + + // Keep runningTasks accurate within the same tick. + $this->cleanupCompletedTasks(); } private function enableProcessing(): void @@ -286,8 +295,6 @@ private function handleTaskFailure(QueuableTask $task, string $message): void if ($task->getAttempts() < $maxRetries) { $this->stateManager->retry($task, $retryDelay); - delay($retryDelay); - parent::push($task); } else { $this->stateManager->fail($task, new FailedTaskException($message)); From 0217dbaba3376b91d00e5513b6246639424bdb0c Mon Sep 17 00:00:00 2001 From: barbosa89 Date: Wed, 10 Dec 2025 12:15:28 -0500 Subject: [PATCH 02/14] refactor: remove redundant comments and streamline task handling in ParallelQueue --- src/Queue/ParallelQueue.php | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/Queue/ParallelQueue.php b/src/Queue/ParallelQueue.php index f5bdec6e..cf874d1d 100644 --- a/src/Queue/ParallelQueue.php +++ b/src/Queue/ParallelQueue.php @@ -81,7 +81,6 @@ public function popChunk(int $limit, string|null $queueName = null): array continue; } - // If reservation failed re-enqueue the task parent::push($task); } @@ -156,10 +155,9 @@ private function handleIntervalTick(): void $this->cleanupCompletedTasks(); if (! empty($this->runningTasks)) { - return; // Skip processing if tasks are still running + return; } - // Preserve batch-sequential characteristics, but cap batch size to maxConcurrency. $batchSize = min($this->chunkSize, $this->maxConcurrency); $reservedTasks = $this->chunkProcessing @@ -189,7 +187,6 @@ private function handleIntervalTick(): void $future->await(); - // Keep runningTasks accurate within the same tick. $this->cleanupCompletedTasks(); } @@ -236,12 +233,10 @@ private function getNextTask(): QueuableTask|null $taskId = $task->getTaskId(); $state = $this->stateManager->getTaskState($taskId); - // If task has no state or is available if ($state === null || ($state['available_at'] ?? 0) <= time()) { return $task; } - // If not available, re-enqueue the task parent::push($task); } From ec2aa9043c527a7966aa825ad5dd14268177c07c Mon Sep 17 00:00:00 2001 From: barbosa89 Date: Wed, 10 Dec 2025 17:05:21 -0500 Subject: [PATCH 03/14] refactor: streamline task result handling in ParallelQueue and remove deprecated processTaskResults method --- src/Queue/ParallelQueue.php | 48 ++++++++++++++++--------------------- 1 file changed, 21 insertions(+), 27 deletions(-) diff --git a/src/Queue/ParallelQueue.php b/src/Queue/ParallelQueue.php index cf874d1d..55a91fc8 100644 --- a/src/Queue/ParallelQueue.php +++ b/src/Queue/ParallelQueue.php @@ -15,8 +15,8 @@ use Phenix\Tasks\Exceptions\FailedTaskException; use Phenix\Tasks\QueuableTask; use Phenix\Tasks\Result; +use Throwable; -use function Amp\async; use function Amp\weakClosure; use function count; @@ -181,11 +181,26 @@ private function handleIntervalTick(): void $this->runningTasks = array_merge($this->runningTasks, $executions); - $future = async(function () use ($reservedTasks, $executions): void { - $this->processTaskResults($reservedTasks, $executions); - }); - - $future->await(); + foreach ($executions as $i => $execution) { + $task = $reservedTasks[$i]; + + $execution->getFuture() + ->map(function (Result $result) use ($task): void { + if ($result->isSuccess()) { + $this->stateManager->complete($task); + } else { + $this->handleTaskFailure($task, $result->message()); + } + }) + ->catch(function (Throwable $error) use ($task): void { + $this->handleTaskFailure($task, $error->getMessage()); + }) + ->finally(function () use ($i): void { + unset($this->runningTasks[$i]); + + $this->stateManager->cleanupExpiredReservations(); + }); + } $this->cleanupCompletedTasks(); } @@ -243,27 +258,6 @@ private function getNextTask(): QueuableTask|null return null; } - private function processTaskResults(array $tasks, array $executions): void - { - /** @var array $results */ - $results = Future\await(array_map( - fn (Execution $e): Future => $e->getFuture(), - $executions, - )); - - foreach ($results as $index => $result) { - $task = $tasks[$index]; - - if ($result->isSuccess()) { - $this->stateManager->complete($task); - } else { - $this->handleTaskFailure($task, $result->message()); - } - } - - $this->stateManager->cleanupExpiredReservations(); - } - private function cleanupCompletedTasks(): void { $completedTasks = []; From b0c2b395638477fd47217b44f4b254c7da193ea0 Mon Sep 17 00:00:00 2001 From: barbosa89 Date: Wed, 10 Dec 2025 17:13:42 -0500 Subject: [PATCH 04/14] style: php cs --- src/Queue/ParallelQueue.php | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Queue/ParallelQueue.php b/src/Queue/ParallelQueue.php index 55a91fc8..fe569b50 100644 --- a/src/Queue/ParallelQueue.php +++ b/src/Queue/ParallelQueue.php @@ -4,7 +4,6 @@ namespace Phenix\Queue; -use Amp\Future; use Amp\Interval; use Amp\Parallel\Worker\Execution; use Amp\Parallel\Worker\WorkerPool; From de7e06a1692ac6b16cf47561df59d706255bd186 Mon Sep 17 00:00:00 2001 From: barbosa89 Date: Wed, 10 Dec 2025 20:10:47 -0500 Subject: [PATCH 05/14] refactor: remove debug output from EventEmitterTest --- tests/Unit/Events/EventEmitterTest.php | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/Unit/Events/EventEmitterTest.php b/tests/Unit/Events/EventEmitterTest.php index 3e850427..91910f50 100644 --- a/tests/Unit/Events/EventEmitterTest.php +++ b/tests/Unit/Events/EventEmitterTest.php @@ -499,7 +499,6 @@ $called = false; EventFacade::on('fake.event', function () use (&$called): void { - dump('FAILING'); $called = true; }); From 51f9fbc8d8ddb423ce934ce01d640ba4d44484aa Mon Sep 17 00:00:00 2001 From: barbosa89 Date: Wed, 10 Dec 2025 21:02:12 -0500 Subject: [PATCH 06/14] refactor: prevent forward unhandled errors to the event loop handler --- src/Queue/ParallelQueue.php | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Queue/ParallelQueue.php b/src/Queue/ParallelQueue.php index fe569b50..8d628487 100644 --- a/src/Queue/ParallelQueue.php +++ b/src/Queue/ParallelQueue.php @@ -184,6 +184,7 @@ private function handleIntervalTick(): void $task = $reservedTasks[$i]; $execution->getFuture() + ->ignore() ->map(function (Result $result) use ($task): void { if ($result->isSuccess()) { $this->stateManager->complete($task); From 13c267b9f73993ef21ee0c7199b84451007e05b6 Mon Sep 17 00:00:00 2001 From: barbosa89 Date: Thu, 11 Dec 2025 19:23:14 -0500 Subject: [PATCH 07/14] refactor: enhance finalize method and optimize processing interval initialization --- src/Queue/ParallelQueue.php | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/Queue/ParallelQueue.php b/src/Queue/ParallelQueue.php index 8d628487..fc502b3c 100644 --- a/src/Queue/ParallelQueue.php +++ b/src/Queue/ParallelQueue.php @@ -140,10 +140,16 @@ public function clear(): void $this->runningTasks = []; } + public function finalize(): void + { + unset($this->processingInterval); + $this->processingInterval = null; + } + private function initializeProcessor(): void { $this->processingStarted = true; - $this->processingInterval = new Interval($this->interval, weakClosure($this->handleIntervalTick(...))); + $this->processingInterval ??= new Interval($this->interval, weakClosure($this->handleIntervalTick(...))); $this->processingInterval->disable(); $this->isEnabled = false; From 2688112562f165002a623364006401021670e7a6 Mon Sep 17 00:00:00 2001 From: barbosa89 Date: Thu, 11 Dec 2025 19:23:22 -0500 Subject: [PATCH 08/14] refactor: remove Worker class and associated tests to streamline task processing --- src/Tasks/Worker.php | 33 --------------------------------- tests/Unit/Tasks/WorkerTest.php | 17 ----------------- 2 files changed, 50 deletions(-) delete mode 100644 src/Tasks/Worker.php delete mode 100644 tests/Unit/Tasks/WorkerTest.php diff --git a/src/Tasks/Worker.php b/src/Tasks/Worker.php deleted file mode 100644 index 1af6b8e9..00000000 --- a/src/Tasks/Worker.php +++ /dev/null @@ -1,33 +0,0 @@ -worker = Workers\createWorker(); - } - - protected function prepareTask(Task $parallelTask): Workers\Execution - { - $timeout = new TimeoutCancellation($parallelTask->getTimeout()); - - return $this->worker->submit($parallelTask, $timeout); - } - - protected function finalize(): void - { - $this->worker->shutdown(); - } -} diff --git a/tests/Unit/Tasks/WorkerTest.php b/tests/Unit/Tasks/WorkerTest.php deleted file mode 100644 index 0c4f91f2..00000000 --- a/tests/Unit/Tasks/WorkerTest.php +++ /dev/null @@ -1,17 +0,0 @@ -push($task)->run(); - - expect($result->isSuccess())->toBeTrue(); - expect($result->output())->toBe('Task completed successfully'); -}); From 35b6e5f8e7b686521a3521562db06efcc3280781 Mon Sep 17 00:00:00 2001 From: barbosa89 Date: Thu, 11 Dec 2025 19:23:30 -0500 Subject: [PATCH 09/14] refactor: add finalize calls to ensure proper cleanup in parallel queue tests --- tests/Unit/Queue/ParallelQueueTest.php | 47 ++++++++++++++++--------- tests/Unit/Queue/WorkerParallelTest.php | 6 ++++ 2 files changed, 37 insertions(+), 16 deletions(-) diff --git a/tests/Unit/Queue/ParallelQueueTest.php b/tests/Unit/Queue/ParallelQueueTest.php index 492e1be0..53cbfc2f 100644 --- a/tests/Unit/Queue/ParallelQueueTest.php +++ b/tests/Unit/Queue/ParallelQueueTest.php @@ -110,6 +110,8 @@ $this->assertTrue($parallelQueue->isProcessing()); $this->assertSame(1, $parallelQueue->size()); + + $parallelQueue->finalize(); }); it('can manually start and stop processing', function (): void { @@ -128,6 +130,8 @@ // Stop manually $parallelQueue->stop(); $this->assertFalse($parallelQueue->isProcessing()); + + $parallelQueue->finalize(); }); it('processes tasks using interval without blocking', function (): void { @@ -156,6 +160,8 @@ // Some tasks may have been processed $this->assertLessThanOrEqual(5, $parallelQueue->size()); + + $parallelQueue->finalize(); }); it('automatically stops processing when no tasks remain', function (): void { @@ -173,6 +179,8 @@ // There should be no pending tasks $this->assertSame(0, $parallelQueue->getRunningTasksCount()); + + $parallelQueue->finalize(); }); it('provides detailed processor status', function (): void { @@ -198,6 +206,8 @@ $status = $parallelQueue->getProcessorStatus(); $this->assertTrue($status['is_processing']); $this->assertSame(2, $status['total_tasks']); + + $parallelQueue->finalize(); }); it('works correctly with the HTTP server without blocking', function (): void { @@ -233,6 +243,8 @@ // Verify that tasks were added $this->assertSame(10, $parallelQueue->size()); $this->assertTrue($parallelQueue->isProcessing()); + + $parallelQueue->finalize(); }); it('skips processing new tasks when previous tasks are still running', function (): void { @@ -248,8 +260,11 @@ // Verify the queue size - should be 1 (running task) or 0 if already completed $size = $parallelQueue->size(); + $this->assertLessThanOrEqual(1, $size); $this->assertGreaterThanOrEqual(0, $size); + + $parallelQueue->finalize(); }); it('automatically disables processing when no tasks are available to reserve', function (): void { @@ -272,6 +287,7 @@ $this->assertSame(0, $parallelQueue->getRunningTasksCount()); $parallelQueue->clear(); + $parallelQueue->finalize(); }); it('automatically disables processing after all tasks complete', function (): void { @@ -298,6 +314,7 @@ $this->assertSame(0, $status['total_tasks']); $parallelQueue->clear(); + $parallelQueue->finalize(); }); it('handles chunk processing when no available tasks exist', function (): void { @@ -321,6 +338,7 @@ $parallelQueue->clear(); $parallelQueue->stop(); + $parallelQueue->finalize(); }); it('re-enqueues tasks that cannot be reserved during chunk processing', function (): void { @@ -353,6 +371,9 @@ // All tasks should eventually be processed or re-enqueued appropriately $this->assertGreaterThanOrEqual(0, $parallelQueue->size()); + + $parallelQueue->clear(); + $parallelQueue->finalize(); }); it('handles concurrent task reservation attempts correctly', function (): void { @@ -368,25 +389,12 @@ $this->assertSame(10, $initialSize); // Allow some time for processing to start and potentially encounter reservation conflicts - delay(3.5); // Wait just a bit more than the interval time - - // Verify queue is still functioning properly despite any reservation conflicts - $currentSize = $parallelQueue->size(); - $this->assertGreaterThanOrEqual(0, $currentSize); - - // If tasks remain, processing should continue - if ($currentSize > 0) { - $this->assertTrue($parallelQueue->isProcessing()); - } - - // Wait for all tasks to complete - delay(12.0); + delay(4.0); - // Eventually all tasks should be processed - $this->assertSame(0, $parallelQueue->size()); - $this->assertFalse($parallelQueue->isProcessing()); + $this->assertLessThan(10, $parallelQueue->size()); $parallelQueue->clear(); + $parallelQueue->finalize(); }); it('handles task failures gracefully', function (): void { @@ -409,6 +417,8 @@ $this->assertFalse($parallelQueue->isProcessing()); $this->assertSame(0, $parallelQueue->size()); // Task should have been removed after processing + + $parallelQueue->finalize(); }); it('prevent reserve the same task in task state management', function (): void { @@ -461,6 +471,7 @@ $this->assertSame(1, $parallelQueue->size()); $parallelQueue->clear(); + $parallelQueue->finalize(); }); it('re-enqueues the task when reservation fails inside getTaskChunk', function (): void { @@ -486,6 +497,7 @@ $this->assertSame(1, $parallelQueue->size()); $parallelQueue->clear(); + $parallelQueue->finalize(); }); it('process task in single mode', function (): void { @@ -500,6 +512,8 @@ $this->assertFalse($parallelQueue->isProcessing()); $this->assertSame(0, $parallelQueue->size()); + + $parallelQueue->finalize(); }); it('re-enqueues the task when reservation fails in single processing mode', function (): void { @@ -521,6 +535,7 @@ $this->assertSame(1, $parallelQueue->size()); $parallelQueue->clear(); + $parallelQueue->finalize(); }); it('logs pushed tasks when logging is enabled', function (): void { diff --git a/tests/Unit/Queue/WorkerParallelTest.php b/tests/Unit/Queue/WorkerParallelTest.php index 6322d307..b4114ae7 100644 --- a/tests/Unit/Queue/WorkerParallelTest.php +++ b/tests/Unit/Queue/WorkerParallelTest.php @@ -183,6 +183,8 @@ protected function processTask(QueuableTask $task, WorkerOptions $options, Outpu $buffer = $output->fetch(); $this->assertStringContainsString('success: ' . BasicQueuableTask::class . ' processed', $buffer); + + $parallelQueue->finalize(); }); it('processes a chunk via runOnce when chunk mode enabled', function (): void { @@ -210,6 +212,8 @@ protected function processTask(QueuableTask $task, WorkerOptions $options, Outpu $buffer = $output->fetch(); $this->assertStringContainsString('success: ' . BasicQueuableTask::class . ' processed', $buffer); + + $parallelQueue->finalize(); }); it('retries failing tasks in chunk mode', function (): void { @@ -238,6 +242,8 @@ protected function processTask(QueuableTask $task, WorkerOptions $options, Outpu $buffer = $output->fetch(); expect($buffer)->toContain('success: ' . BasicQueuableTask::class . ' processed'); expect($buffer)->toContain('failed'); + + $parallelQueue->finalize(); }); it('cleans up and sleeps when no tasks in chunk mode, then stops', function (): void { From 50f2deea1d710ffb41aa97fbd8b2d32fab90365f Mon Sep 17 00:00:00 2001 From: barbosa89 Date: Fri, 12 Dec 2025 08:30:16 -0500 Subject: [PATCH 10/14] refactor: remove redundant clear call before finalizing parallel queue --- tests/Unit/Queue/ParallelQueueTest.php | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/Unit/Queue/ParallelQueueTest.php b/tests/Unit/Queue/ParallelQueueTest.php index 53cbfc2f..7a711243 100644 --- a/tests/Unit/Queue/ParallelQueueTest.php +++ b/tests/Unit/Queue/ParallelQueueTest.php @@ -372,7 +372,6 @@ // All tasks should eventually be processed or re-enqueued appropriately $this->assertGreaterThanOrEqual(0, $parallelQueue->size()); - $parallelQueue->clear(); $parallelQueue->finalize(); }); From 2f1f3cf3c5d7bcd920505ec687cbff88642681ca Mon Sep 17 00:00:00 2001 From: barbosa89 Date: Fri, 12 Dec 2025 11:47:46 -0500 Subject: [PATCH 11/14] refactor: streamline task failure handling and cleanup in handleIntervalTick method --- src/Queue/ParallelQueue.php | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/Queue/ParallelQueue.php b/src/Queue/ParallelQueue.php index fc502b3c..8c8d9900 100644 --- a/src/Queue/ParallelQueue.php +++ b/src/Queue/ParallelQueue.php @@ -201,14 +201,8 @@ private function handleIntervalTick(): void ->catch(function (Throwable $error) use ($task): void { $this->handleTaskFailure($task, $error->getMessage()); }) - ->finally(function () use ($i): void { - unset($this->runningTasks[$i]); - - $this->stateManager->cleanupExpiredReservations(); - }); + ->finally($this->stateManager->cleanupExpiredReservations(...)); } - - $this->cleanupCompletedTasks(); } private function enableProcessing(): void From b08bf72c9e399cabe10e440055f95f89a64c30f4 Mon Sep 17 00:00:00 2001 From: barbosa89 Date: Fri, 12 Dec 2025 11:48:02 -0500 Subject: [PATCH 12/14] refactor: remove finalize method --- src/Queue/ParallelQueue.php | 6 ------ tests/Unit/Queue/ParallelQueueTest.php | 27 ------------------------- tests/Unit/Queue/WorkerParallelTest.php | 6 ------ 3 files changed, 39 deletions(-) diff --git a/src/Queue/ParallelQueue.php b/src/Queue/ParallelQueue.php index 8c8d9900..28101c90 100644 --- a/src/Queue/ParallelQueue.php +++ b/src/Queue/ParallelQueue.php @@ -140,12 +140,6 @@ public function clear(): void $this->runningTasks = []; } - public function finalize(): void - { - unset($this->processingInterval); - $this->processingInterval = null; - } - private function initializeProcessor(): void { $this->processingStarted = true; diff --git a/tests/Unit/Queue/ParallelQueueTest.php b/tests/Unit/Queue/ParallelQueueTest.php index 7a711243..a804897b 100644 --- a/tests/Unit/Queue/ParallelQueueTest.php +++ b/tests/Unit/Queue/ParallelQueueTest.php @@ -110,8 +110,6 @@ $this->assertTrue($parallelQueue->isProcessing()); $this->assertSame(1, $parallelQueue->size()); - - $parallelQueue->finalize(); }); it('can manually start and stop processing', function (): void { @@ -130,8 +128,6 @@ // Stop manually $parallelQueue->stop(); $this->assertFalse($parallelQueue->isProcessing()); - - $parallelQueue->finalize(); }); it('processes tasks using interval without blocking', function (): void { @@ -160,8 +156,6 @@ // Some tasks may have been processed $this->assertLessThanOrEqual(5, $parallelQueue->size()); - - $parallelQueue->finalize(); }); it('automatically stops processing when no tasks remain', function (): void { @@ -179,8 +173,6 @@ // There should be no pending tasks $this->assertSame(0, $parallelQueue->getRunningTasksCount()); - - $parallelQueue->finalize(); }); it('provides detailed processor status', function (): void { @@ -206,8 +198,6 @@ $status = $parallelQueue->getProcessorStatus(); $this->assertTrue($status['is_processing']); $this->assertSame(2, $status['total_tasks']); - - $parallelQueue->finalize(); }); it('works correctly with the HTTP server without blocking', function (): void { @@ -243,8 +233,6 @@ // Verify that tasks were added $this->assertSame(10, $parallelQueue->size()); $this->assertTrue($parallelQueue->isProcessing()); - - $parallelQueue->finalize(); }); it('skips processing new tasks when previous tasks are still running', function (): void { @@ -263,8 +251,6 @@ $this->assertLessThanOrEqual(1, $size); $this->assertGreaterThanOrEqual(0, $size); - - $parallelQueue->finalize(); }); it('automatically disables processing when no tasks are available to reserve', function (): void { @@ -287,7 +273,6 @@ $this->assertSame(0, $parallelQueue->getRunningTasksCount()); $parallelQueue->clear(); - $parallelQueue->finalize(); }); it('automatically disables processing after all tasks complete', function (): void { @@ -314,7 +299,6 @@ $this->assertSame(0, $status['total_tasks']); $parallelQueue->clear(); - $parallelQueue->finalize(); }); it('handles chunk processing when no available tasks exist', function (): void { @@ -338,7 +322,6 @@ $parallelQueue->clear(); $parallelQueue->stop(); - $parallelQueue->finalize(); }); it('re-enqueues tasks that cannot be reserved during chunk processing', function (): void { @@ -371,8 +354,6 @@ // All tasks should eventually be processed or re-enqueued appropriately $this->assertGreaterThanOrEqual(0, $parallelQueue->size()); - - $parallelQueue->finalize(); }); it('handles concurrent task reservation attempts correctly', function (): void { @@ -393,7 +374,6 @@ $this->assertLessThan(10, $parallelQueue->size()); $parallelQueue->clear(); - $parallelQueue->finalize(); }); it('handles task failures gracefully', function (): void { @@ -416,8 +396,6 @@ $this->assertFalse($parallelQueue->isProcessing()); $this->assertSame(0, $parallelQueue->size()); // Task should have been removed after processing - - $parallelQueue->finalize(); }); it('prevent reserve the same task in task state management', function (): void { @@ -470,7 +448,6 @@ $this->assertSame(1, $parallelQueue->size()); $parallelQueue->clear(); - $parallelQueue->finalize(); }); it('re-enqueues the task when reservation fails inside getTaskChunk', function (): void { @@ -496,7 +473,6 @@ $this->assertSame(1, $parallelQueue->size()); $parallelQueue->clear(); - $parallelQueue->finalize(); }); it('process task in single mode', function (): void { @@ -511,8 +487,6 @@ $this->assertFalse($parallelQueue->isProcessing()); $this->assertSame(0, $parallelQueue->size()); - - $parallelQueue->finalize(); }); it('re-enqueues the task when reservation fails in single processing mode', function (): void { @@ -534,7 +508,6 @@ $this->assertSame(1, $parallelQueue->size()); $parallelQueue->clear(); - $parallelQueue->finalize(); }); it('logs pushed tasks when logging is enabled', function (): void { diff --git a/tests/Unit/Queue/WorkerParallelTest.php b/tests/Unit/Queue/WorkerParallelTest.php index b4114ae7..6322d307 100644 --- a/tests/Unit/Queue/WorkerParallelTest.php +++ b/tests/Unit/Queue/WorkerParallelTest.php @@ -183,8 +183,6 @@ protected function processTask(QueuableTask $task, WorkerOptions $options, Outpu $buffer = $output->fetch(); $this->assertStringContainsString('success: ' . BasicQueuableTask::class . ' processed', $buffer); - - $parallelQueue->finalize(); }); it('processes a chunk via runOnce when chunk mode enabled', function (): void { @@ -212,8 +210,6 @@ protected function processTask(QueuableTask $task, WorkerOptions $options, Outpu $buffer = $output->fetch(); $this->assertStringContainsString('success: ' . BasicQueuableTask::class . ' processed', $buffer); - - $parallelQueue->finalize(); }); it('retries failing tasks in chunk mode', function (): void { @@ -242,8 +238,6 @@ protected function processTask(QueuableTask $task, WorkerOptions $options, Outpu $buffer = $output->fetch(); expect($buffer)->toContain('success: ' . BasicQueuableTask::class . ' processed'); expect($buffer)->toContain('failed'); - - $parallelQueue->finalize(); }); it('cleans up and sleeps when no tasks in chunk mode, then stops', function (): void { From b85eb8a4c31a8a4611377866ee0419c74d51b04b Mon Sep 17 00:00:00 2001 From: barbosa89 Date: Fri, 12 Dec 2025 12:59:51 -0500 Subject: [PATCH 13/14] refactor: improve task cleanup in handleIntervalTick method --- src/Queue/ParallelQueue.php | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/Queue/ParallelQueue.php b/src/Queue/ParallelQueue.php index 28101c90..752ec3b6 100644 --- a/src/Queue/ParallelQueue.php +++ b/src/Queue/ParallelQueue.php @@ -195,8 +195,14 @@ private function handleIntervalTick(): void ->catch(function (Throwable $error) use ($task): void { $this->handleTaskFailure($task, $error->getMessage()); }) - ->finally($this->stateManager->cleanupExpiredReservations(...)); + ->finally(function () use ($i): void { + unset($this->runningTasks[$i]); + + $this->stateManager->cleanupExpiredReservations(); + }); } + + $this->cleanupCompletedTasks(); } private function enableProcessing(): void From a1a6e6248b1dcee135d2fc93eb506cdd68ff1f4d Mon Sep 17 00:00:00 2001 From: barbosa89 Date: Fri, 12 Dec 2025 13:30:12 -0500 Subject: [PATCH 14/14] refactor: remove redundant clear calls in parallel queue tests --- tests/Unit/Queue/ParallelQueueTest.php | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/tests/Unit/Queue/ParallelQueueTest.php b/tests/Unit/Queue/ParallelQueueTest.php index a804897b..6da0cf74 100644 --- a/tests/Unit/Queue/ParallelQueueTest.php +++ b/tests/Unit/Queue/ParallelQueueTest.php @@ -22,16 +22,12 @@ afterEach(function (): void { $driver = Queue::driver(); - $driver->clear(); - if ($driver instanceof ParallelQueue) { $driver->stop(); } }); it('pushes a task onto the parallel queue', function (): void { - Queue::clear(); - expect(Queue::pop())->toBeNull(); expect(Queue::getConnectionName())->toBe('default'); @@ -52,7 +48,6 @@ }); it('dispatches a task conditionally', function (): void { - Queue::clear(); BasicQueuableTask::dispatchIf(fn (): bool => true); $task = Queue::pop(); @@ -66,7 +61,6 @@ }); it('pushes a task onto a custom parallel queue', function (): void { - Queue::clear(); Queue::pushOn('custom-parallel', new BasicQueuableTask()); $task = Queue::pop('custom-parallel'); @@ -76,7 +70,6 @@ }); it('returns the correct size for parallel queue', function (): void { - Queue::clear(); Queue::push(new BasicQueuableTask()); $this->assertSame(1, Queue::size()); @@ -271,8 +264,6 @@ $this->assertFalse($parallelQueue->isProcessing()); $this->assertSame(0, $parallelQueue->size()); $this->assertSame(0, $parallelQueue->getRunningTasksCount()); - - $parallelQueue->clear(); }); it('automatically disables processing after all tasks complete', function (): void { @@ -297,8 +288,6 @@ $this->assertSame(0, $status['pending_tasks']); $this->assertSame(0, $status['running_tasks']); $this->assertSame(0, $status['total_tasks']); - - $parallelQueue->clear(); }); it('handles chunk processing when no available tasks exist', function (): void { @@ -320,7 +309,6 @@ $this->assertTrue($parallelQueue->isProcessing()); $this->assertGreaterThan(0, $parallelQueue->size()); - $parallelQueue->clear(); $parallelQueue->stop(); }); @@ -372,8 +360,6 @@ delay(4.0); $this->assertLessThan(10, $parallelQueue->size()); - - $parallelQueue->clear(); }); it('handles task failures gracefully', function (): void { @@ -446,8 +432,6 @@ // Since the task isn't available yet, the processor should disable itself and re-enqueue the task $this->assertFalse($parallelQueue->isProcessing()); $this->assertSame(1, $parallelQueue->size()); - - $parallelQueue->clear(); }); it('re-enqueues the task when reservation fails inside getTaskChunk', function (): void { @@ -471,8 +455,6 @@ // Since reservation failed, it should have been re-enqueued and processing disabled $this->assertFalse($parallelQueue->isProcessing()); $this->assertSame(1, $parallelQueue->size()); - - $parallelQueue->clear(); }); it('process task in single mode', function (): void { @@ -506,8 +488,6 @@ $this->assertFalse($parallelQueue->isProcessing()); $this->assertSame(1, $parallelQueue->size()); - - $parallelQueue->clear(); }); it('logs pushed tasks when logging is enabled', function (): void { @@ -594,7 +574,6 @@ Queue::expect(BasicQueuableTask::class)->toPushNothing(); Config::set('app.env', 'local'); - Queue::clear(); }); it('does not log tasks when logging is disabled', function (): void {