diff --git a/src/Queue/ParallelQueue.php b/src/Queue/ParallelQueue.php index 35115f64..752ec3b6 100644 --- a/src/Queue/ParallelQueue.php +++ b/src/Queue/ParallelQueue.php @@ -4,7 +4,6 @@ namespace Phenix\Queue; -use Amp\Future; use Amp\Interval; use Amp\Parallel\Worker\Execution; use Amp\Parallel\Worker\WorkerPool; @@ -15,9 +14,10 @@ use Phenix\Tasks\Exceptions\FailedTaskException; use Phenix\Tasks\QueuableTask; use Phenix\Tasks\Result; +use Throwable; -use function Amp\async; -use function Amp\delay; +use function Amp\weakClosure; +use function count; class ParallelQueue extends Queue { @@ -80,7 +80,6 @@ public function popChunk(int $limit, string|null $queueName = null): array continue; } - // If reservation failed re-enqueue the task parent::push($task); } @@ -144,44 +143,66 @@ public function clear(): void private function initializeProcessor(): void { $this->processingStarted = true; + $this->processingInterval ??= new Interval($this->interval, weakClosure($this->handleIntervalTick(...))); + $this->processingInterval->disable(); - $this->processingInterval = new Interval($this->interval, function (): void { - $this->cleanupCompletedTasks(); - - if (! empty($this->runningTasks)) { - return; // Skip processing if tasks are still running - } - - $reservedTasks = $this->chunkProcessing - ? $this->popChunk($this->chunkSize) - : $this->processSingle(); - - if (empty($reservedTasks)) { - $this->disableProcessing(); + $this->isEnabled = false; + } - return; - } + private function handleIntervalTick(): void + { + $this->cleanupCompletedTasks(); - $executions = array_map(function (QueuableTask $task): Execution { - /** @var WorkerPool $pool */ - $pool = App::make(WorkerPool::class); + if (! empty($this->runningTasks)) { + return; + } - $timeout = new TimeoutCancellation($task->getTimeout()); + $batchSize = min($this->chunkSize, $this->maxConcurrency); - return $pool->submit($task, $timeout); - }, $reservedTasks); + $reservedTasks = $this->chunkProcessing + ? $this->popChunk($batchSize) + : $this->processSingle(); - $this->runningTasks = array_merge($this->runningTasks, $executions); + if (empty($reservedTasks)) { + $this->disableProcessing(); - $future = async(function () use ($reservedTasks, $executions): void { - $this->processTaskResults($reservedTasks, $executions); - }); + return; + } - $future->await(); - }); + $executions = array_map(function (QueuableTask $task): Execution { + /** @var WorkerPool $pool */ + $pool = App::make(WorkerPool::class); + + $timeout = new TimeoutCancellation($task->getTimeout()); + + return $pool->submit($task, $timeout); + }, $reservedTasks); + + $this->runningTasks = array_merge($this->runningTasks, $executions); + + foreach ($executions as $i => $execution) { + $task = $reservedTasks[$i]; + + $execution->getFuture() + ->ignore() + ->map(function (Result $result) use ($task): void { + if ($result->isSuccess()) { + $this->stateManager->complete($task); + } else { + $this->handleTaskFailure($task, $result->message()); + } + }) + ->catch(function (Throwable $error) use ($task): void { + $this->handleTaskFailure($task, $error->getMessage()); + }) + ->finally(function () use ($i): void { + unset($this->runningTasks[$i]); + + $this->stateManager->cleanupExpiredReservations(); + }); + } - $this->processingInterval->disable(); - $this->isEnabled = false; + $this->cleanupCompletedTasks(); } private function enableProcessing(): void @@ -227,39 +248,16 @@ private function getNextTask(): QueuableTask|null $taskId = $task->getTaskId(); $state = $this->stateManager->getTaskState($taskId); - // If task has no state or is available if ($state === null || ($state['available_at'] ?? 0) <= time()) { return $task; } - // If not available, re-enqueue the task parent::push($task); } return null; } - private function processTaskResults(array $tasks, array $executions): void - { - /** @var array $results */ - $results = Future\await(array_map( - fn (Execution $e): Future => $e->getFuture(), - $executions, - )); - - foreach ($results as $index => $result) { - $task = $tasks[$index]; - - if ($result->isSuccess()) { - $this->stateManager->complete($task); - } else { - $this->handleTaskFailure($task, $result->message()); - } - } - - $this->stateManager->cleanupExpiredReservations(); - } - private function cleanupCompletedTasks(): void { $completedTasks = []; @@ -286,8 +284,6 @@ private function handleTaskFailure(QueuableTask $task, string $message): void if ($task->getAttempts() < $maxRetries) { $this->stateManager->retry($task, $retryDelay); - delay($retryDelay); - parent::push($task); } else { $this->stateManager->fail($task, new FailedTaskException($message)); diff --git a/src/Tasks/Worker.php b/src/Tasks/Worker.php deleted file mode 100644 index 1af6b8e9..00000000 --- a/src/Tasks/Worker.php +++ /dev/null @@ -1,33 +0,0 @@ -worker = Workers\createWorker(); - } - - protected function prepareTask(Task $parallelTask): Workers\Execution - { - $timeout = new TimeoutCancellation($parallelTask->getTimeout()); - - return $this->worker->submit($parallelTask, $timeout); - } - - protected function finalize(): void - { - $this->worker->shutdown(); - } -} diff --git a/tests/Unit/Events/EventEmitterTest.php b/tests/Unit/Events/EventEmitterTest.php index 3e850427..91910f50 100644 --- a/tests/Unit/Events/EventEmitterTest.php +++ b/tests/Unit/Events/EventEmitterTest.php @@ -499,7 +499,6 @@ $called = false; EventFacade::on('fake.event', function () use (&$called): void { - dump('FAILING'); $called = true; }); diff --git a/tests/Unit/Queue/ParallelQueueTest.php b/tests/Unit/Queue/ParallelQueueTest.php index 492e1be0..6da0cf74 100644 --- a/tests/Unit/Queue/ParallelQueueTest.php +++ b/tests/Unit/Queue/ParallelQueueTest.php @@ -22,16 +22,12 @@ afterEach(function (): void { $driver = Queue::driver(); - $driver->clear(); - if ($driver instanceof ParallelQueue) { $driver->stop(); } }); it('pushes a task onto the parallel queue', function (): void { - Queue::clear(); - expect(Queue::pop())->toBeNull(); expect(Queue::getConnectionName())->toBe('default'); @@ -52,7 +48,6 @@ }); it('dispatches a task conditionally', function (): void { - Queue::clear(); BasicQueuableTask::dispatchIf(fn (): bool => true); $task = Queue::pop(); @@ -66,7 +61,6 @@ }); it('pushes a task onto a custom parallel queue', function (): void { - Queue::clear(); Queue::pushOn('custom-parallel', new BasicQueuableTask()); $task = Queue::pop('custom-parallel'); @@ -76,7 +70,6 @@ }); it('returns the correct size for parallel queue', function (): void { - Queue::clear(); Queue::push(new BasicQueuableTask()); $this->assertSame(1, Queue::size()); @@ -248,6 +241,7 @@ // Verify the queue size - should be 1 (running task) or 0 if already completed $size = $parallelQueue->size(); + $this->assertLessThanOrEqual(1, $size); $this->assertGreaterThanOrEqual(0, $size); }); @@ -270,8 +264,6 @@ $this->assertFalse($parallelQueue->isProcessing()); $this->assertSame(0, $parallelQueue->size()); $this->assertSame(0, $parallelQueue->getRunningTasksCount()); - - $parallelQueue->clear(); }); it('automatically disables processing after all tasks complete', function (): void { @@ -296,8 +288,6 @@ $this->assertSame(0, $status['pending_tasks']); $this->assertSame(0, $status['running_tasks']); $this->assertSame(0, $status['total_tasks']); - - $parallelQueue->clear(); }); it('handles chunk processing when no available tasks exist', function (): void { @@ -319,7 +309,6 @@ $this->assertTrue($parallelQueue->isProcessing()); $this->assertGreaterThan(0, $parallelQueue->size()); - $parallelQueue->clear(); $parallelQueue->stop(); }); @@ -368,25 +357,9 @@ $this->assertSame(10, $initialSize); // Allow some time for processing to start and potentially encounter reservation conflicts - delay(3.5); // Wait just a bit more than the interval time - - // Verify queue is still functioning properly despite any reservation conflicts - $currentSize = $parallelQueue->size(); - $this->assertGreaterThanOrEqual(0, $currentSize); - - // If tasks remain, processing should continue - if ($currentSize > 0) { - $this->assertTrue($parallelQueue->isProcessing()); - } - - // Wait for all tasks to complete - delay(12.0); - - // Eventually all tasks should be processed - $this->assertSame(0, $parallelQueue->size()); - $this->assertFalse($parallelQueue->isProcessing()); + delay(4.0); - $parallelQueue->clear(); + $this->assertLessThan(10, $parallelQueue->size()); }); it('handles task failures gracefully', function (): void { @@ -459,8 +432,6 @@ // Since the task isn't available yet, the processor should disable itself and re-enqueue the task $this->assertFalse($parallelQueue->isProcessing()); $this->assertSame(1, $parallelQueue->size()); - - $parallelQueue->clear(); }); it('re-enqueues the task when reservation fails inside getTaskChunk', function (): void { @@ -484,8 +455,6 @@ // Since reservation failed, it should have been re-enqueued and processing disabled $this->assertFalse($parallelQueue->isProcessing()); $this->assertSame(1, $parallelQueue->size()); - - $parallelQueue->clear(); }); it('process task in single mode', function (): void { @@ -519,8 +488,6 @@ $this->assertFalse($parallelQueue->isProcessing()); $this->assertSame(1, $parallelQueue->size()); - - $parallelQueue->clear(); }); it('logs pushed tasks when logging is enabled', function (): void { @@ -607,7 +574,6 @@ Queue::expect(BasicQueuableTask::class)->toPushNothing(); Config::set('app.env', 'local'); - Queue::clear(); }); it('does not log tasks when logging is disabled', function (): void { diff --git a/tests/Unit/Tasks/WorkerTest.php b/tests/Unit/Tasks/WorkerTest.php deleted file mode 100644 index 0c4f91f2..00000000 --- a/tests/Unit/Tasks/WorkerTest.php +++ /dev/null @@ -1,17 +0,0 @@ -push($task)->run(); - - expect($result->isSuccess())->toBeTrue(); - expect($result->output())->toBe('Task completed successfully'); -});