diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0bda366..3c02c2b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,3 +10,5 @@ jobs: - uses: actions/checkout@v4 - uses: php-actions/composer@v6 # or alternative dependency management - uses: php-actions/phpunit@v4 + - name: Run PHP CS Fixer + run: php vendor/bin/php-cs-fixer fix --dry-run --diff \ No newline at end of file diff --git a/.gitignore b/.gitignore index 95b35eb..d22ccb5 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ composer.lock .idea .phpunit.cache .php-version +.php-cs-fixer.cache \ No newline at end of file diff --git a/.php-cs-fixer.dist.php b/.php-cs-fixer.dist.php new file mode 100644 index 0000000..385218d --- /dev/null +++ b/.php-cs-fixer.dist.php @@ -0,0 +1,49 @@ +in('src') + //->in('tests') + ->files()->name('*.php'); + +$config = new PhpCsFixer\Config(); +$config->setRules([ + '@Symfony' => true, + '@Symfony:risky' => true, + '@PSR12' => true, + 'array_syntax' => [ + 'syntax' => 'short', + ], + 'combine_consecutive_unsets' => true, + 'native_function_invocation' => [ + 'include' => [ + '@compiler_optimized', + ], + ], + 'no_extra_blank_lines' => [ + 'tokens' => [ + 'break', + 'continue', + 'extra', + 'return', + 'throw', + 'use', + 'parenthesis_brace_block', + 'square_brace_block', + 'curly_brace_block', + ], + ], + 'ordered_class_elements' => true, + 'ordered_imports' => true, + 'yoda_style' => [ + 'equal' => false, + 'identical' => false, + 'less_and_greater' => false, + 'always_move_variable' => false, + ], +]) + ->setRiskyAllowed(true) + ->setFinder( + $finder + ); + +return $config; diff --git a/.php_cs.dist b/.php_cs.dist deleted file mode 100644 index d4e742b..0000000 --- a/.php_cs.dist +++ /dev/null @@ -1,12 +0,0 @@ -in(__DIR__.'/src'); - -return PhpCsFixer\Config::create() - ->setRules([ - '@Symfony' => true, - 'declare_strict_types' => true, - ]) - ->setFinder($finder) - ->setRiskyAllowed(true) -; diff --git a/CHANGELOG.md b/CHANGELOG.md index 92d1194..f0a7568 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +# Version 5.3.0 +* Added auto update count processed item while running job + # Version 5.2.0 * Added custom index for job status diff --git a/Tests/DataflowType/Writer/CollectionWriterTest.php b/Tests/DataflowType/Writer/CollectionWriterTest.php index 115ff0c..680897b 100644 --- a/Tests/DataflowType/Writer/CollectionWriterTest.php +++ b/Tests/DataflowType/Writer/CollectionWriterTest.php @@ -44,9 +44,7 @@ public function testAll() $embeddedWriter ->expects($matcher) ->method('write') - ->with($this->callback(function ($arg) use ($matcher, $values) { - return $arg === $values[$matcher->numberOfInvocations() - 1]; - })) + ->with($this->callback(fn($arg) => $arg === $values[$matcher->numberOfInvocations() - 1])) ; $writer = new CollectionWriter($embeddedWriter); diff --git a/Tests/DataflowType/Writer/PortWriterAdapterTest.php b/Tests/DataflowType/Writer/PortWriterAdapterTest.php index 9aa86e3..9cded58 100644 --- a/Tests/DataflowType/Writer/PortWriterAdapterTest.php +++ b/Tests/DataflowType/Writer/PortWriterAdapterTest.php @@ -11,7 +11,7 @@ public function testAll() { $value = 'not an array'; - $writer = $this->getMockBuilder('\Port\Writer') + $writer = $this->getMockBuilder(\Port\Writer::class) ->onlyMethods(['prepare', 'finish', 'writeItem']) ->getMock() ; diff --git a/Tests/Manager/ScheduledDataflowManagerTest.php b/Tests/Manager/ScheduledDataflowManagerTest.php index 8a33d84..335e200 100644 --- a/Tests/Manager/ScheduledDataflowManagerTest.php +++ b/Tests/Manager/ScheduledDataflowManagerTest.php @@ -49,15 +49,10 @@ public function testCreateJobsFromScheduledDataflows() $this->jobRepository ->expects($matcher) ->method('findPendingForScheduledDataflow') - ->with($this->callback(function ($arg) use ($matcher, $scheduled1, $scheduled2) { - switch ($matcher->numberOfInvocations()) { - case 1: - return $arg === $scheduled1; - case 2: - return $arg === $scheduled2; - default: - return false; - } + ->with($this->callback(fn($arg) => match ($matcher->numberOfInvocations()) { + 1 => $arg === $scheduled1, + 2 => $arg === $scheduled2, + default => false, })) ->willReturnOnConsecutiveCalls(new Job(), null) ; diff --git a/Tests/Processor/JobProcessorTest.php b/Tests/Processor/JobProcessorTest.php index ab112de..f1879ee 100644 --- a/Tests/Processor/JobProcessorTest.php +++ b/Tests/Processor/JobProcessorTest.php @@ -44,18 +44,11 @@ public function testProcess() ->expects($matcher) ->method('dispatch') ->with( - $this->callback(function ($arg) use ($job) { - return $arg instanceof ProcessingEvent && $arg->getJob() === $job; - }), - $this->callback(function ($arg) use ($matcher) { - switch ($matcher->numberOfInvocations()) { - case 1: - return $arg === Events::BEFORE_PROCESSING; - case 2: - return $arg === Events::AFTER_PROCESSING; - default: - return false; - } + $this->callback(fn($arg) => $arg instanceof ProcessingEvent && $arg->getJob() === $job), + $this->callback(fn($arg) => match ($matcher->numberOfInvocations()) { + 1 => $arg === Events::BEFORE_PROCESSING, + 2 => $arg === Events::AFTER_PROCESSING, + default => false, }) ); diff --git a/Tests/Runner/MessengerDataflowRunnerTest.php b/Tests/Runner/MessengerDataflowRunnerTest.php index 6a17726..347eb26 100644 --- a/Tests/Runner/MessengerDataflowRunnerTest.php +++ b/Tests/Runner/MessengerDataflowRunnerTest.php @@ -39,15 +39,10 @@ public function testRunPendingDataflows() $this->repository ->expects($matcher) ->method('save') - ->with($this->callback(function ($arg) use ($matcher, $job1, $job2) { - switch ($matcher->numberOfInvocations()) { - case 1: - return $arg === $job1; - case 2: - return $arg === $job2; - default: - return false; - } + ->with($this->callback(fn($arg) => match ($matcher->numberOfInvocations()) { + 1 => $arg === $job1, + 2 => $arg === $job2, + default => false, })) ; @@ -55,15 +50,10 @@ public function testRunPendingDataflows() $this->bus ->expects($matcher) ->method('dispatch') - ->with($this->callback(function ($arg) use ($matcher, $id1, $id2) { - switch ($matcher->numberOfInvocations()) { - case 1: - return $arg instanceof JobMessage && $arg->getJobId() === $id1; - case 2: - return $arg instanceof JobMessage && $arg->getJobId() === $id2; - default: - return false; - } + ->with($this->callback(fn($arg) => match ($matcher->numberOfInvocations()) { + 1 => $arg instanceof JobMessage && $arg->getJobId() === $id1, + 2 => $arg instanceof JobMessage && $arg->getJobId() === $id2, + default => false, })) ->willReturnOnConsecutiveCalls( new Envelope(new JobMessage($id1)), diff --git a/Tests/Runner/PendingDataflowRunnerTest.php b/Tests/Runner/PendingDataflowRunnerTest.php index 383d081..37183df 100644 --- a/Tests/Runner/PendingDataflowRunnerTest.php +++ b/Tests/Runner/PendingDataflowRunnerTest.php @@ -38,15 +38,10 @@ public function testRunPendingDataflows() $this->processor ->expects($matcher) ->method('process') - ->with($this->callback(function ($arg) use ($matcher, $job1, $job2) { - switch ($matcher->numberOfInvocations()) { - case 1: - return $arg === $job1; - case 2: - return $arg === $job2; - default: - return false; - } + ->with($this->callback(fn($arg) => match ($matcher->numberOfInvocations()) { + 1 => $arg === $job1, + 2 => $arg === $job2, + default => false, })) ; diff --git a/composer.json b/composer.json index 7a872d5..393f4b0 100644 --- a/composer.json +++ b/composer.json @@ -60,9 +60,10 @@ }, "require-dev": { "amphp/amp": "^2.5", + "friendsofphp/php-cs-fixer": "^3.75", "phpunit/phpunit": "^11", "portphp/portphp": "^1.9", - "rector/rector": "^1.0", + "rector/rector": "^2.0", "symfony/messenger": "^7.0" }, "suggest": { diff --git a/rector.php b/rector.php index 09c0a0f..92cd240 100644 --- a/rector.php +++ b/rector.php @@ -17,7 +17,7 @@ $rectorConfig->rule(InlineConstructorDefaultToPropertyRector::class); $rectorConfig->sets([ - SymfonySetList::SYMFONY_60, + SymfonySetList::SYMFONY_70, SymfonySetList::SYMFONY_CODE_QUALITY, SymfonySetList::SYMFONY_CONSTRUCTOR_INJECTION, LevelSetList::UP_TO_PHP_80, diff --git a/src/Command/AddScheduledDataflowCommand.php b/src/Command/AddScheduledDataflowCommand.php index 807fbcb..7a895ca 100644 --- a/src/Command/AddScheduledDataflowCommand.php +++ b/src/Command/AddScheduledDataflowCommand.php @@ -27,29 +27,27 @@ public function __construct(private DataflowTypeRegistryInterface $registry, pri parent::__construct(); } - /** - * {@inheritdoc} - */ protected function configure(): void { $this ->setHelp('The %command.name% allows you to create a new scheduled dataflow.') ->addOption('label', null, InputOption::VALUE_REQUIRED, 'Label of the scheduled dataflow') ->addOption('type', null, InputOption::VALUE_REQUIRED, 'Type of the scheduled dataflow (FQCN)') - ->addOption('options', null, InputOption::VALUE_OPTIONAL, - 'Options of the scheduled dataflow (ex: {"option1": "value1", "option2": "value2"})') + ->addOption( + 'options', + null, + InputOption::VALUE_OPTIONAL, + 'Options of the scheduled dataflow (ex: {"option1": "value1", "option2": "value2"})' + ) ->addOption('frequency', null, InputOption::VALUE_REQUIRED, 'Frequency of the scheduled dataflow') ->addOption('first_run', null, InputOption::VALUE_REQUIRED, 'Date for the first run of the scheduled dataflow (Y-m-d H:i:s)') ->addOption('enabled', null, InputOption::VALUE_REQUIRED, 'State of the scheduled dataflow') ->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use'); } - /** - * {@inheritdoc} - */ protected function execute(InputInterface $input, OutputInterface $output): int { - if (null !== $input->getOption('connection')) { + if ($input->getOption('connection') !== null) { $this->connectionFactory->setConnectionName($input->getOption('connection')); } $choices = []; @@ -71,13 +69,17 @@ protected function execute(InputInterface $input, OutputInterface $output): int } $options = $input->getOption('options'); if (!$options) { - $options = $io->ask('What are the launch options for the scheduled dataflow? (ex: {"option1": "value1", "option2": "value2"})', - json_encode([])); + $options = $io->ask( + 'What are the launch options for the scheduled dataflow? (ex: {"option1": "value1", "option2": "value2"})', + json_encode([]) + ); } $frequency = $input->getOption('frequency'); if (!$frequency) { - $frequency = $io->choice('What is the frequency for the scheduled dataflow?', - ScheduledDataflow::AVAILABLE_FREQUENCIES); + $frequency = $io->choice( + 'What is the frequency for the scheduled dataflow?', + ScheduledDataflow::AVAILABLE_FREQUENCIES + ); } $firstRun = $input->getOption('first_run'); if (!$firstRun) { @@ -92,22 +94,25 @@ protected function execute(InputInterface $input, OutputInterface $output): int 'id' => null, 'label' => $label, 'dataflow_type' => $type, - 'options' => json_decode($options, true, 512, JSON_THROW_ON_ERROR), + 'options' => json_decode($options, true, 512, \JSON_THROW_ON_ERROR), 'frequency' => $frequency, 'next' => new \DateTime($firstRun), 'enabled' => $enabled, ]); $errors = $this->validator->validate($newScheduledDataflow); - if (count($errors) > 0) { + if (\count($errors) > 0) { $io->error((string) $errors); return 2; } $this->scheduledDataflowRepository->save($newScheduledDataflow); - $io->success(sprintf('New scheduled dataflow "%s" (id:%d) was created successfully.', - $newScheduledDataflow->getLabel(), $newScheduledDataflow->getId())); + $io->success(\sprintf( + 'New scheduled dataflow "%s" (id:%d) was created successfully.', + $newScheduledDataflow->getLabel(), + $newScheduledDataflow->getId() + )); return 0; } diff --git a/src/Command/ChangeScheduleStatusCommand.php b/src/Command/ChangeScheduleStatusCommand.php index cca2565..35735fc 100644 --- a/src/Command/ChangeScheduleStatusCommand.php +++ b/src/Command/ChangeScheduleStatusCommand.php @@ -26,9 +26,6 @@ public function __construct(private ScheduledDataflowRepository $scheduledDatafl parent::__construct(); } - /** - * {@inheritdoc} - */ protected function configure(): void { $this @@ -39,12 +36,9 @@ protected function configure(): void ->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use'); } - /** - * {@inheritdoc} - */ protected function execute(InputInterface $input, OutputInterface $output): int { - if (null !== $input->getOption('connection')) { + if ($input->getOption('connection') !== null) { $this->connectionFactory->setConnectionName($input->getOption('connection')); } $io = new SymfonyStyle($input, $output); @@ -52,7 +46,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int $schedule = $this->scheduledDataflowRepository->find((int) $input->getArgument('schedule-id')); if (!$schedule) { - $io->error(sprintf('Cannot find scheduled dataflow with id "%d".', $input->getArgument('schedule-id'))); + $io->error(\sprintf('Cannot find scheduled dataflow with id "%d".', $input->getArgument('schedule-id'))); return 1; } @@ -71,9 +65,9 @@ protected function execute(InputInterface $input, OutputInterface $output): int try { $schedule->setEnabled($input->getOption('enable')); $this->scheduledDataflowRepository->save($schedule); - $io->success(sprintf('Schedule with id "%s" has been successfully updated.', $schedule->getId())); + $io->success(\sprintf('Schedule with id "%s" has been successfully updated.', $schedule->getId())); } catch (\Exception $e) { - $io->error(sprintf('An error occured when changing schedule status : "%s".', $e->getMessage())); + $io->error(\sprintf('An error occured when changing schedule status : "%s".', $e->getMessage())); return 4; } diff --git a/src/Command/DatabaseSchemaCommand.php b/src/Command/DatabaseSchemaCommand.php index 0371368..464df39 100644 --- a/src/Command/DatabaseSchemaCommand.php +++ b/src/Command/DatabaseSchemaCommand.php @@ -27,9 +27,6 @@ public function __construct(private ConnectionFactory $connectionFactory) parent::__construct(); } - /** - * {@inheritdoc} - */ protected function configure(): void { $this @@ -43,7 +40,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int { $io = new SymfonyStyle($input, $output); - if (null !== $input->getOption('connection')) { + if ($input->getOption('connection') !== null) { $this->connectionFactory->setConnectionName($input->getOption('connection')); } @@ -61,7 +58,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int $tables = []; foreach ($sm->listTables() as $table) { /** @var Table $table */ - if (in_array($table->getName(), $tableArray)) { + if (\in_array($table->getName(), $tableArray)) { $tables[] = $table; } } @@ -90,14 +87,14 @@ protected function execute(InputInterface $input, OutputInterface $output): int if ($input->getOption('dump-sql')) { $io->text('Execute these SQL Queries on your database:'); foreach ($sqls as $sql) { - $io->text($sql . ';'); + $io->text($sql.';'); } return Command::SUCCESS; } if (!$io->askQuestion(new ConfirmationQuestion('Are you sure to update database ?', true))) { - $io->text("Execution canceled."); + $io->text('Execution canceled.'); return Command::SUCCESS; } @@ -106,7 +103,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int $connection->executeQuery($sql); } - $io->success(sprintf('%d queries executed.', \count($sqls))); + $io->success(\sprintf('%d queries executed.', \count($sqls))); return parent::SUCCESS; } diff --git a/src/Command/ExecuteDataflowCommand.php b/src/Command/ExecuteDataflowCommand.php index 0f7fa8e..fa99cd5 100644 --- a/src/Command/ExecuteDataflowCommand.php +++ b/src/Command/ExecuteDataflowCommand.php @@ -4,8 +4,10 @@ namespace CodeRhapsodie\DataflowBundle\Command; +use CodeRhapsodie\DataflowBundle\DataflowType\AutoUpdateCountInterface; use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory; use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface; +use CodeRhapsodie\DataflowBundle\Repository\JobRepository; use Psr\Log\LoggerAwareInterface; use Psr\Log\LoggerAwareTrait; use Symfony\Component\Console\Attribute\AsCommand; @@ -26,18 +28,16 @@ class ExecuteDataflowCommand extends Command implements LoggerAwareInterface { use LoggerAwareTrait; - public function __construct(private DataflowTypeRegistryInterface $registry, private ConnectionFactory $connectionFactory) + public function __construct(private DataflowTypeRegistryInterface $registry, private ConnectionFactory $connectionFactory, private JobRepository $jobRepository) { parent::__construct(); } - /** - * {@inheritdoc} - */ protected function configure(): void { $this - ->setHelp(<<<'EOF' + ->setHelp( + <<<'EOF' The %command.name% command runs one dataflow with the provided options. php %command.full_name% App\Dataflow\MyDataflow '{"option1": "value1", "option2": "value2"}' @@ -48,19 +48,20 @@ protected function configure(): void ->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use'); } - /** - * {@inheritdoc} - */ protected function execute(InputInterface $input, OutputInterface $output): int { - if (null !== $input->getOption('connection')) { + if ($input->getOption('connection') !== null) { $this->connectionFactory->setConnectionName($input->getOption('connection')); } $fqcnOrAlias = $input->getArgument('fqcn'); - $options = json_decode($input->getArgument('options'), true, 512, JSON_THROW_ON_ERROR); + $options = json_decode($input->getArgument('options'), true, 512, \JSON_THROW_ON_ERROR); $io = new SymfonyStyle($input, $output); $dataflowType = $this->registry->getDataflowType($fqcnOrAlias); + if ($dataflowType instanceof AutoUpdateCountInterface) { + $dataflowType->setRepository($this->jobRepository); + } + if ($dataflowType instanceof LoggerAwareInterface && isset($this->logger)) { $dataflowType->setLogger($this->logger); } diff --git a/src/Command/JobShowCommand.php b/src/Command/JobShowCommand.php index 0bde6ec..67e5412 100644 --- a/src/Command/JobShowCommand.php +++ b/src/Command/JobShowCommand.php @@ -31,9 +31,6 @@ public function __construct(private JobRepository $jobRepository, private Connec parent::__construct(); } - /** - * {@inheritdoc} - */ protected function configure(): void { $this @@ -44,12 +41,9 @@ protected function configure(): void ->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use'); } - /** - * {@inheritdoc} - */ protected function execute(InputInterface $input, OutputInterface $output): int { - if (null !== $input->getOption('connection')) { + if ($input->getOption('connection') !== null) { $this->connectionFactory->setConnectionName($input->getOption('connection')); } @@ -73,7 +67,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int return 2; } - if (null === $job) { + if ($job === null) { $io->error('Cannot find job :/'); return 3; @@ -87,19 +81,19 @@ protected function execute(InputInterface $input, OutputInterface $output): int ['Started at', $job->getStartTime() ? $job->getStartTime()->format('Y-m-d H:i:s') : '-'], ['Ended at', $job->getEndTime() ? $job->getEndTime()->format('Y-m-d H:i:s') : '-'], ['Object number', $job->getCount()], - ['Errors', count((array) $job->getExceptions())], + ['Errors', \count((array) $job->getExceptions())], ['Status', $this->translateStatus($job->getStatus())], ]; if ($input->getOption('details')) { $display[] = ['Type', $job->getDataflowType()]; - $display[] = ['Options', json_encode($job->getOptions(), JSON_THROW_ON_ERROR)]; + $display[] = ['Options', json_encode($job->getOptions(), \JSON_THROW_ON_ERROR)]; $io->section('Summary'); } $io->table(['Field', 'Value'], $display); if ($input->getOption('details')) { $io->section('Exceptions'); - $exceptions = array_map(fn(string $exception) => substr($exception, 0, 900).'…', $job->getExceptions()); + $exceptions = array_map(fn (string $exception) => substr($exception, 0, 900).'…', $job->getExceptions()); $io->write($exceptions); } diff --git a/src/Command/RunPendingDataflowsCommand.php b/src/Command/RunPendingDataflowsCommand.php index cc2045a..a08cae5 100644 --- a/src/Command/RunPendingDataflowsCommand.php +++ b/src/Command/RunPendingDataflowsCommand.php @@ -29,22 +29,17 @@ public function __construct(private ScheduledDataflowManagerInterface $manager, parent::__construct(); } - /** - * {@inheritdoc} - */ protected function configure(): void { $this - ->setHelp(<<<'EOF' + ->setHelp( + <<<'EOF' The %command.name% command runs dataflows according to the schedule defined in the UI by the user. EOF ) ->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use'); } - /** - * {@inheritdoc} - */ protected function execute(InputInterface $input, OutputInterface $output): int { if (!$this->lock()) { @@ -53,7 +48,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int return 0; } - if (null !== $input->getOption('connection')) { + if ($input->getOption('connection') !== null) { $this->connectionFactory->setConnectionName($input->getOption('connection')); } diff --git a/src/Command/ScheduleListCommand.php b/src/Command/ScheduleListCommand.php index ee08524..d4a187c 100644 --- a/src/Command/ScheduleListCommand.php +++ b/src/Command/ScheduleListCommand.php @@ -24,9 +24,6 @@ public function __construct(private ScheduledDataflowRepository $scheduledDatafl parent::__construct(); } - /** - * {@inheritdoc} - */ protected function configure(): void { $this @@ -34,12 +31,9 @@ protected function configure(): void ->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use'); } - /** - * {@inheritdoc} - */ protected function execute(InputInterface $input, OutputInterface $output): int { - if (null !== $input->getOption('connection')) { + if ($input->getOption('connection') !== null) { $this->connectionFactory->setConnectionName($input->getOption('connection')); } $io = new SymfonyStyle($input, $output); diff --git a/src/Command/SchemaCommand.php b/src/Command/SchemaCommand.php index 39fe0bd..237724c 100644 --- a/src/Command/SchemaCommand.php +++ b/src/Command/SchemaCommand.php @@ -4,13 +4,6 @@ namespace CodeRhapsodie\DataflowBundle\Command; -use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory; -use CodeRhapsodie\DataflowBundle\Repository\JobRepository; -use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository; -use CodeRhapsodie\DataflowBundle\SchemaProvider\DataflowSchemaProvider; -use Doctrine\DBAL\Schema\Comparator; -use Doctrine\DBAL\Schema\Schema; -use Doctrine\DBAL\Schema\Table; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\ArrayInput; @@ -21,14 +14,12 @@ /** * @codeCoverageIgnore + * * @deprecated This command is deprecated and will be removed in 6.0, use this command "code-rhapsodie:dataflow:database-schema" instead. */ #[AsCommand('code-rhapsodie:dataflow:dump-schema', 'Generates schema create / update SQL queries')] class SchemaCommand extends Command { - /** - * {@inheritdoc} - */ protected function configure(): void { $this @@ -38,9 +29,6 @@ protected function configure(): void ; } - /** - * {@inheritdoc} - */ protected function execute(InputInterface $input, OutputInterface $output): int { $io = new SymfonyStyle($input, $output); @@ -48,9 +36,9 @@ protected function execute(InputInterface $input, OutputInterface $output): int $options = array_filter($input->getOptions()); - //add -- before each keys + // add -- before each keys $options = array_combine( - array_map(fn($key) => '--' . $key, array_keys($options)), + array_map(fn ($key) => '--'.$key, array_keys($options)), array_values($options) ); @@ -58,7 +46,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int $inputArray = new ArrayInput([ 'command' => 'code-rhapsodie:dataflow:database-schema', - ...$options + ...$options, ]); return $this->getApplication()->doRun($inputArray, $output); diff --git a/src/DataflowType/AMPAsyncDataflowBuilder.php b/src/DataflowType/AMPAsyncDataflowBuilder.php index 7f17b96..b50eebf 100644 --- a/src/DataflowType/AMPAsyncDataflowBuilder.php +++ b/src/DataflowType/AMPAsyncDataflowBuilder.php @@ -7,7 +7,6 @@ use CodeRhapsodie\DataflowBundle\DataflowType\Dataflow\AMPAsyncDataflow; use CodeRhapsodie\DataflowBundle\DataflowType\Dataflow\DataflowInterface; use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface; -use Symfony\Component\OptionsResolver\OptionsResolver; class AMPAsyncDataflowBuilder extends DataflowBuilder { diff --git a/src/DataflowType/AbstractDataflowType.php b/src/DataflowType/AbstractDataflowType.php index 57dc7ca..543b173 100644 --- a/src/DataflowType/AbstractDataflowType.php +++ b/src/DataflowType/AbstractDataflowType.php @@ -4,15 +4,20 @@ namespace CodeRhapsodie\DataflowBundle\DataflowType; +use CodeRhapsodie\DataflowBundle\Repository\JobRepository; use Psr\Log\LoggerAwareInterface; use Psr\Log\LoggerAwareTrait; use Psr\Log\LoggerInterface; use Symfony\Component\OptionsResolver\OptionsResolver; -abstract class AbstractDataflowType implements DataflowTypeInterface, LoggerAwareInterface +abstract class AbstractDataflowType implements DataflowTypeInterface, LoggerAwareInterface, AutoUpdateCountInterface { use LoggerAwareTrait; + private JobRepository $repository; + + private ?\DateTime $saveDate = null; + /** * @codeCoverageIgnore */ @@ -21,14 +26,24 @@ public function getAliases(): iterable return []; } - public function process(array $options): Result + public function process(array $options, ?int $jobId = null): Result { + $this->saveDate = new \DateTime('+1 minute'); + $optionsResolver = new OptionsResolver(); $this->configureOptions($optionsResolver); $options = $optionsResolver->resolve($options); $builder = $this->createDataflowBuilder(); $builder->setName($this->getLabel()); + $builder->addAfterItemProcessor(function (int|string $index, mixed $item, int $count) use ($jobId) { + if ($jobId === null || $this->saveDate > new \DateTime()) { + return; + } + + $this->repository->updateCount($jobId, $count); + $this->saveDate = new \DateTime('+1 minute'); + }); $this->buildDataflow($builder, $options); $dataflow = $builder->getDataflow(); if ($dataflow instanceof LoggerAwareInterface && $this->logger instanceof LoggerInterface) { @@ -38,6 +53,11 @@ public function process(array $options): Result return $dataflow->process(); } + public function setRepository(JobRepository $repository): void + { + $this->repository = $repository; + } + protected function createDataflowBuilder(): DataflowBuilder { return new DataflowBuilder(); diff --git a/src/DataflowType/AutoUpdateCountInterface.php b/src/DataflowType/AutoUpdateCountInterface.php new file mode 100644 index 0000000..ccdfdb1 --- /dev/null +++ b/src/DataflowType/AutoUpdateCountInterface.php @@ -0,0 +1,12 @@ +isResolved() in version 2.5 + $resolved = false; // missing $deferred->isResolved() in version 2.5 $producer = new Producer(function (callable $emit) { foreach ($this->reader as $index => $item) { yield new Delayed($this->emitInterval); @@ -89,7 +87,7 @@ public function process(): Result $it = $producer->getCurrent(); [$index, $item] = $it; $this->states[$index] = [$index, 0, $item]; - } elseif (!$resolved && 0 === count($this->states)) { + } elseif (!$resolved && \count($this->states) === 0) { $resolved = true; $deferred->resolve(); } @@ -120,20 +118,20 @@ public function process(): Result private function processState(mixed $state, int &$count, array &$exceptions): void { [$readIndex, $stepIndex, $item] = $state; - if ($stepIndex < count($this->steps)) { + if ($stepIndex < \count($this->steps)) { if (!isset($this->stepsJobs[$stepIndex])) { $this->stepsJobs[$stepIndex] = []; } [$step, $scale] = $this->steps[$stepIndex]; - if ((is_countable($this->stepsJobs[$stepIndex]) ? count($this->stepsJobs[$stepIndex]) : 0) < $scale && !isset($this->stepsJobs[$stepIndex][$readIndex])) { + if ((is_countable($this->stepsJobs[$stepIndex]) ? \count($this->stepsJobs[$stepIndex]) : 0) < $scale && !isset($this->stepsJobs[$stepIndex][$readIndex])) { $this->stepsJobs[$stepIndex][$readIndex] = true; /** @var Promise $promise */ $promise = coroutine($step)($item); - $promise->onResolve(function (?Throwable $exception = null, $newItem = null) use ($stepIndex, $readIndex, &$exceptions) { + $promise->onResolve(function (?\Throwable $exception = null, $newItem = null) use ($stepIndex, $readIndex, &$exceptions) { if ($exception) { $exceptions[$stepIndex] = $exception; $this->logException($exception, (string) $stepIndex); - } elseif (false === $newItem) { + } elseif ($newItem === false) { unset($this->states[$readIndex]); } else { $this->states[$readIndex] = [$readIndex, $stepIndex + 1, $newItem]; @@ -153,7 +151,7 @@ private function processState(mixed $state, int &$count, array &$exceptions): vo } } - private function logException(Throwable $e, ?string $index = null): void + private function logException(\Throwable $e, ?string $index = null): void { if (!isset($this->logger)) { return; diff --git a/src/DataflowType/Dataflow/Dataflow.php b/src/DataflowType/Dataflow/Dataflow.php index 11a6866..dbbb761 100644 --- a/src/DataflowType/Dataflow/Dataflow.php +++ b/src/DataflowType/Dataflow/Dataflow.php @@ -21,6 +21,13 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface private ?\Closure $customExceptionIndex = null; + private ?\DateTimeInterface $dateTime = null; + + /** + * @var \Closure[] + */ + private array $afterItemProcessors = []; + public function __construct(private iterable $reader, private ?string $name) { } @@ -56,8 +63,15 @@ public function setCustomExceptionIndex(callable $callable): self } /** - * {@inheritdoc} + * @param array $processors */ + public function setAfterItemProcessors(array $processors): self + { + $this->afterItemProcessors = array_map(fn (callable $callable) => \Closure::fromCallable($callable), $processors); + + return $this; + } + public function process(): Result { $count = 0; @@ -75,7 +89,7 @@ public function process(): Result } catch (\Throwable $e) { $exceptionIndex = $index; try { - if (is_callable($this->customExceptionIndex)) { + if (\is_callable($this->customExceptionIndex)) { $exceptionIndex = (string) ($this->customExceptionIndex)($item, $index); } } catch (\Throwable $e2) { @@ -87,6 +101,10 @@ public function process(): Result } ++$count; + + foreach ($this->afterItemProcessors as $afterItemProcessor) { + $afterItemProcessor($index, $item, $count); + } } foreach ($this->writers as $writer) { @@ -103,9 +121,9 @@ public function process(): Result private function processItem(mixed $item): void { foreach ($this->steps as $step) { - $item = call_user_func($step, $item); + $item = \call_user_func($step, $item); - if (false === $item) { + if ($item === false) { return; } } diff --git a/src/DataflowType/DataflowBuilder.php b/src/DataflowType/DataflowBuilder.php index acf3b23..fab6c0b 100644 --- a/src/DataflowType/DataflowBuilder.php +++ b/src/DataflowType/DataflowBuilder.php @@ -18,6 +18,10 @@ class DataflowBuilder private array $writers = []; private ?\Closure $customExceptionIndex = null; + /** + * @var \Closure[] + */ + private array $afterItemProcessors = []; public function setName(string $name): self { @@ -54,6 +58,13 @@ public function setCustomExceptionIndex(callable $callable): self return $this; } + public function addAfterItemProcessor(callable $callable): self + { + $this->afterItemProcessors[] = \Closure::fromCallable($callable); + + return $this; + } + public function getDataflow(): DataflowInterface { $dataflow = new Dataflow($this->reader, $this->name); @@ -69,10 +80,12 @@ public function getDataflow(): DataflowInterface $dataflow->addWriter($writer); } - if (is_callable($this->customExceptionIndex)) { + if (\is_callable($this->customExceptionIndex)) { $dataflow->setCustomExceptionIndex($this->customExceptionIndex); } + $dataflow->setAfterItemProcessors($this->afterItemProcessors); + return $dataflow; } } diff --git a/src/DataflowType/DataflowTypeInterface.php b/src/DataflowType/DataflowTypeInterface.php index 079132c..d64b111 100644 --- a/src/DataflowType/DataflowTypeInterface.php +++ b/src/DataflowType/DataflowTypeInterface.php @@ -10,5 +10,5 @@ public function getLabel(): string; public function getAliases(): iterable; - public function process(array $options): Result; + public function process(array $options, ?int $jobId = null): Result; } diff --git a/src/DataflowType/Result.php b/src/DataflowType/Result.php index 6bd8bf3..5c8716b 100644 --- a/src/DataflowType/Result.php +++ b/src/DataflowType/Result.php @@ -20,7 +20,7 @@ class Result public function __construct(private string $name, private \DateTimeInterface $startTime, private \DateTimeInterface $endTime, private int $totalProcessedCount, array $exceptions) { $this->elapsed = $startTime->diff($endTime); - $this->errorCount = count($exceptions); + $this->errorCount = \count($exceptions); $this->successCount = $totalProcessedCount - $this->errorCount; $this->exceptions = $exceptions; } diff --git a/src/DataflowType/Writer/CollectionWriter.php b/src/DataflowType/Writer/CollectionWriter.php index e0e4100..704e675 100644 --- a/src/DataflowType/Writer/CollectionWriter.php +++ b/src/DataflowType/Writer/CollectionWriter.php @@ -18,21 +18,15 @@ public function __construct(private WriterInterface $writer) { } - /** - * {@inheritdoc} - */ public function prepare() { $this->writer->prepare(); } - /** - * {@inheritdoc} - */ public function write($collection) { if (!is_iterable($collection)) { - throw new UnsupportedItemTypeException(sprintf('Item to write was expected to be an iterable, received %s.', get_debug_type($collection))); + throw new UnsupportedItemTypeException(\sprintf('Item to write was expected to be an iterable, received %s.', get_debug_type($collection))); } foreach ($collection as $item) { @@ -40,17 +34,11 @@ public function write($collection) } } - /** - * {@inheritdoc} - */ public function finish() { $this->writer->finish(); } - /** - * {@inheritdoc} - */ public function supports($item): bool { return is_iterable($item); diff --git a/src/DataflowType/Writer/DelegateWriterInterface.php b/src/DataflowType/Writer/DelegateWriterInterface.php index 6d3463b..1ea8c77 100644 --- a/src/DataflowType/Writer/DelegateWriterInterface.php +++ b/src/DataflowType/Writer/DelegateWriterInterface.php @@ -11,8 +11,6 @@ interface DelegateWriterInterface extends WriterInterface { /** * Returns true if the argument is of a supported type. - * - * @param $item */ public function supports($item): bool; } diff --git a/src/DataflowType/Writer/DelegatorWriter.php b/src/DataflowType/Writer/DelegatorWriter.php index 522f650..de2ada3 100644 --- a/src/DataflowType/Writer/DelegatorWriter.php +++ b/src/DataflowType/Writer/DelegatorWriter.php @@ -21,9 +21,6 @@ public function __construct() { } - /** - * {@inheritdoc} - */ public function prepare() { foreach ($this->delegates as $delegate) { @@ -31,9 +28,6 @@ public function prepare() } } - /** - * {@inheritdoc} - */ public function write($item) { foreach ($this->delegates as $delegate) { @@ -46,12 +40,9 @@ public function write($item) return; } - throw new UnsupportedItemTypeException(sprintf('None of the registered delegate writers support the received item of type %s', get_debug_type($item))); + throw new UnsupportedItemTypeException(\sprintf('None of the registered delegate writers support the received item of type %s', get_debug_type($item))); } - /** - * {@inheritdoc} - */ public function finish() { foreach ($this->delegates as $delegate) { @@ -59,9 +50,6 @@ public function finish() } } - /** - * {@inheritdoc} - */ public function supports($item): bool { foreach ($this->delegates as $delegate) { diff --git a/src/DependencyInjection/Compiler/BusCompilerPass.php b/src/DependencyInjection/Compiler/BusCompilerPass.php index 4021449..5a8114f 100644 --- a/src/DependencyInjection/Compiler/BusCompilerPass.php +++ b/src/DependencyInjection/Compiler/BusCompilerPass.php @@ -12,9 +12,6 @@ class BusCompilerPass implements CompilerPassInterface { - /** - * {@inheritdoc} - */ public function process(ContainerBuilder $container) { if (!$container->hasParameter('coderhapsodie.dataflow.bus')) { @@ -23,7 +20,7 @@ public function process(ContainerBuilder $container) $bus = $container->getParameter('coderhapsodie.dataflow.bus'); if (!$container->has($bus)) { - throw new InvalidArgumentException(sprintf('Service "%s" not found', $bus)); + throw new InvalidArgumentException(\sprintf('Service "%s" not found', $bus)); } if (!$container->has(MessengerDataflowRunner::class)) { diff --git a/src/DependencyInjection/Compiler/DataflowTypeCompilerPass.php b/src/DependencyInjection/Compiler/DataflowTypeCompilerPass.php index 808abf6..2c0ba24 100644 --- a/src/DependencyInjection/Compiler/DataflowTypeCompilerPass.php +++ b/src/DependencyInjection/Compiler/DataflowTypeCompilerPass.php @@ -16,9 +16,6 @@ */ class DataflowTypeCompilerPass implements CompilerPassInterface { - /** - * {@inheritdoc} - */ public function process(ContainerBuilder $container) { if (!$container->has(DataflowTypeRegistry::class)) { diff --git a/src/DependencyInjection/Compiler/DefaultLoggerCompilerPass.php b/src/DependencyInjection/Compiler/DefaultLoggerCompilerPass.php index 96e5b9c..e3c3a41 100644 --- a/src/DependencyInjection/Compiler/DefaultLoggerCompilerPass.php +++ b/src/DependencyInjection/Compiler/DefaultLoggerCompilerPass.php @@ -12,9 +12,6 @@ class DefaultLoggerCompilerPass implements CompilerPassInterface { - /** - * {@inheritdoc} - */ public function process(ContainerBuilder $container) { $defaultLogger = $container->getParameter('coderhapsodie.dataflow.default_logger'); diff --git a/src/DependencyInjection/Configuration.php b/src/DependencyInjection/Configuration.php index d8c1c68..a4bf333 100644 --- a/src/DependencyInjection/Configuration.php +++ b/src/DependencyInjection/Configuration.php @@ -10,7 +10,7 @@ class Configuration implements ConfigurationInterface { - public function getConfigTreeBuilder(): \Symfony\Component\Config\Definition\Builder\TreeBuilder + public function getConfigTreeBuilder(): TreeBuilder { $treeBuilder = new TreeBuilder('code_rhapsodie_dataflow'); $rootNode = $treeBuilder->getRootNode(); @@ -34,7 +34,7 @@ public function getConfigTreeBuilder(): \Symfony\Component\Config\Definition\Bui ->end() ->end() ->validate() - ->ifTrue(static fn($v): bool => $v['enabled'] && !interface_exists(MessageBusInterface::class)) + ->ifTrue(static fn ($v): bool => $v['enabled'] && !interface_exists(MessageBusInterface::class)) ->thenInvalid('You need "symfony/messenger" in order to use Dataflow messenger mode.') ->end() ->end() diff --git a/src/Entity/Job.php b/src/Entity/Job.php index 571d7bf..cd931a6 100644 --- a/src/Entity/Job.php +++ b/src/Entity/Job.php @@ -75,19 +75,19 @@ public static function createFromScheduledDataflow(ScheduledDataflow $scheduled) public static function createFromArray(array $datas) { $lost = array_diff(static::KEYS, array_keys($datas)); - if (count($lost) > 0) { + if (\count($lost) > 0) { throw new \LogicException('The first argument of '.__METHOD__.' must be contains: "'.implode(', ', $lost).'"'); } $job = new self(); - $job->id = null === $datas['id'] ? null : (int) $datas['id']; - $job->setStatus(null === $datas['status'] ? null : (int) $datas['status']); + $job->id = $datas['id'] === null ? null : (int) $datas['id']; + $job->setStatus($datas['status'] === null ? null : (int) $datas['status']); $job->setLabel($datas['label']); $job->setDataflowType($datas['dataflow_type']); $job->setOptions($datas['options']); $job->setRequestedDate($datas['requested_date']); - $job->setScheduledDataflowId(null === $datas['scheduled_dataflow_id'] ? null : (int) $datas['scheduled_dataflow_id']); - $job->setCount(null === $datas['count'] ? null : (int) $datas['count']); + $job->setScheduledDataflowId($datas['scheduled_dataflow_id'] === null ? null : (int) $datas['scheduled_dataflow_id']); + $job->setCount($datas['count'] === null ? null : (int) $datas['count']); $job->setExceptions($datas['exceptions']); $job->setStartTime($datas['start_time']); $job->setEndTime($datas['end_time']); @@ -112,7 +112,7 @@ public function toArray(): array ]; } - public function setId(int $id): Job + public function setId(int $id): self { $this->id = $id; @@ -129,7 +129,7 @@ public function getStatus(): int return $this->status; } - public function setStatus(int $status): Job + public function setStatus(int $status): self { $this->status = $status; @@ -141,7 +141,7 @@ public function getLabel(): ?string return $this->label; } - public function setLabel(?string $label): Job + public function setLabel(?string $label): self { $this->label = $label; @@ -153,7 +153,7 @@ public function getDataflowType(): ?string return $this->dataflowType; } - public function setDataflowType(?string $dataflowType): Job + public function setDataflowType(?string $dataflowType): self { $this->dataflowType = $dataflowType; @@ -165,7 +165,7 @@ public function getOptions(): ?array return $this->options; } - public function setOptions(?array $options): Job + public function setOptions(?array $options): self { $this->options = $options; @@ -177,7 +177,7 @@ public function getRequestedDate(): ?\DateTimeInterface return $this->requestedDate; } - public function setRequestedDate(?\DateTimeInterface $requestedDate): Job + public function setRequestedDate(?\DateTimeInterface $requestedDate): self { $this->requestedDate = $requestedDate; @@ -189,7 +189,7 @@ public function getScheduledDataflowId(): ?int return $this->scheduledDataflowId; } - public function setScheduledDataflowId(?int $scheduledDataflowId): Job + public function setScheduledDataflowId(?int $scheduledDataflowId): self { $this->scheduledDataflowId = $scheduledDataflowId; @@ -201,7 +201,7 @@ public function getCount(): ?int return $this->count; } - public function setCount(?int $count): Job + public function setCount(?int $count): self { $this->count = $count; @@ -213,7 +213,7 @@ public function getExceptions(): ?array return $this->exceptions; } - public function setExceptions(?array $exceptions): Job + public function setExceptions(?array $exceptions): self { $this->exceptions = $exceptions; @@ -225,7 +225,7 @@ public function getStartTime(): ?\DateTimeInterface return $this->startTime; } - public function setStartTime(?\DateTimeInterface $startTime): Job + public function setStartTime(?\DateTimeInterface $startTime): self { $this->startTime = $startTime; @@ -237,7 +237,7 @@ public function getEndTime(): ?\DateTimeInterface return $this->endTime; } - public function setEndTime(?\DateTimeInterface $endTime): Job + public function setEndTime(?\DateTimeInterface $endTime): self { $this->endTime = $endTime; diff --git a/src/Entity/ScheduledDataflow.php b/src/Entity/ScheduledDataflow.php index 70b7ca0..3f138cd 100644 --- a/src/Entity/ScheduledDataflow.php +++ b/src/Entity/ScheduledDataflow.php @@ -50,19 +50,19 @@ class ScheduledDataflow public static function createFromArray(array $datas) { $lost = array_diff(static::KEYS, array_keys($datas)); - if (count($lost) > 0) { + if (\count($lost) > 0) { throw new \LogicException('The first argument of '.__METHOD__.' must be contains: "'.implode(', ', $lost).'"'); } $scheduledDataflow = new self(); - $scheduledDataflow->id = null === $datas['id'] ? null : (int) $datas['id']; + $scheduledDataflow->id = $datas['id'] === null ? null : (int) $datas['id']; $scheduledDataflow->setLabel($datas['label']); $scheduledDataflow->setDataflowType($datas['dataflow_type']); $scheduledDataflow->setOptions($datas['options']); $scheduledDataflow->setFrequency($datas['frequency']); $scheduledDataflow->setNext($datas['next']); - $scheduledDataflow->setEnabled(null === $datas['enabled'] ? null : (bool) $datas['enabled']); + $scheduledDataflow->setEnabled($datas['enabled'] === null ? null : (bool) $datas['enabled']); return $scheduledDataflow; } @@ -80,7 +80,7 @@ public function toArray(): array ]; } - public function setId(int $id): ScheduledDataflow + public function setId(int $id): self { $this->id = $id; @@ -97,7 +97,7 @@ public function getLabel(): ?string return $this->label; } - public function setLabel(?string $label): ScheduledDataflow + public function setLabel(?string $label): self { $this->label = $label; @@ -109,7 +109,7 @@ public function getDataflowType(): ?string return $this->dataflowType; } - public function setDataflowType(?string $dataflowType): ScheduledDataflow + public function setDataflowType(?string $dataflowType): self { $this->dataflowType = $dataflowType; @@ -121,7 +121,7 @@ public function getOptions(): ?array return $this->options; } - public function setOptions(?array $options): ScheduledDataflow + public function setOptions(?array $options): self { $this->options = $options; @@ -133,7 +133,7 @@ public function getFrequency(): ?string return $this->frequency; } - public function setFrequency(?string $frequency): ScheduledDataflow + public function setFrequency(?string $frequency): self { $this->frequency = $frequency; @@ -145,7 +145,7 @@ public function getNext(): ?\DateTimeInterface return $this->next; } - public function setNext(?\DateTimeInterface $next): ScheduledDataflow + public function setNext(?\DateTimeInterface $next): self { $this->next = $next; @@ -157,7 +157,7 @@ public function getEnabled(): ?bool return $this->enabled; } - public function setEnabled(?bool $enabled): ScheduledDataflow + public function setEnabled(?bool $enabled): self { $this->enabled = $enabled; diff --git a/src/Exceptions/UnknownDataflowTypeException.php b/src/Exceptions/UnknownDataflowTypeException.php index 7f34343..819c9f0 100644 --- a/src/Exceptions/UnknownDataflowTypeException.php +++ b/src/Exceptions/UnknownDataflowTypeException.php @@ -11,7 +11,7 @@ class UnknownDataflowTypeException extends \Exception { public static function create(string $aliasOrFqcn, array $knownDataflowTypes): self { - return new self(sprintf( + return new self(\sprintf( 'Unknown dataflow type FQCN or alias "%s". Registered dataflow types FQCN and aliases are %s.', $aliasOrFqcn, implode(', ', $knownDataflowTypes) diff --git a/src/Factory/ConnectionFactory.php b/src/Factory/ConnectionFactory.php index 0314388..04da339 100644 --- a/src/Factory/ConnectionFactory.php +++ b/src/Factory/ConnectionFactory.php @@ -24,6 +24,6 @@ public function setConnectionName(string $connectionName) public function getConnection(): \Doctrine\DBAL\Connection { - return $this->container->get(sprintf('doctrine.dbal.%s_connection', $this->connectionName)); + return $this->container->get(\sprintf('doctrine.dbal.%s_connection', $this->connectionName)); } } diff --git a/src/Logger/BufferHandler.php b/src/Logger/BufferHandler.php index 9b8ceae..cc6bc0a 100644 --- a/src/Logger/BufferHandler.php +++ b/src/Logger/BufferHandler.php @@ -7,7 +7,6 @@ use Monolog\Formatter\FormatterInterface; use Monolog\Formatter\LineFormatter; use Monolog\Handler\AbstractProcessingHandler; -use Monolog\Logger; use Monolog\LogRecord; class BufferHandler extends AbstractProcessingHandler diff --git a/src/Logger/DelegatingLogger.php b/src/Logger/DelegatingLogger.php index f4675e1..fc7a029 100644 --- a/src/Logger/DelegatingLogger.php +++ b/src/Logger/DelegatingLogger.php @@ -16,7 +16,7 @@ public function __construct(iterable $loggers) { foreach ($loggers as $logger) { if (!$logger instanceof LoggerInterface) { - throw new \InvalidArgumentException(sprintf('Only instances of %s should be passed to the constructor of %s. An instance of %s was passed instead.', LoggerInterface::class, self::class, $logger::class)); + throw new \InvalidArgumentException(\sprintf('Only instances of %s should be passed to the constructor of %s. An instance of %s was passed instead.', LoggerInterface::class, self::class, $logger::class)); } $this->loggers[] = $logger; diff --git a/src/Manager/ScheduledDataflowManager.php b/src/Manager/ScheduledDataflowManager.php index 90d6bd0..cbb0f23 100644 --- a/src/Manager/ScheduledDataflowManager.php +++ b/src/Manager/ScheduledDataflowManager.php @@ -19,15 +19,12 @@ public function __construct(private Connection $connection, private ScheduledDat { } - /** - * {@inheritdoc} - */ public function createJobsFromScheduledDataflows(): void { $this->connection->beginTransaction(); try { foreach ($this->scheduledDataflowRepository->findReadyToRun() as $scheduled) { - if (null !== $this->jobRepository->findPendingForScheduledDataflow($scheduled)) { + if ($this->jobRepository->findPendingForScheduledDataflow($scheduled) !== null) { continue; } diff --git a/src/Processor/JobProcessor.php b/src/Processor/JobProcessor.php index b2266f9..5f3182b 100644 --- a/src/Processor/JobProcessor.php +++ b/src/Processor/JobProcessor.php @@ -4,6 +4,7 @@ namespace CodeRhapsodie\DataflowBundle\Processor; +use CodeRhapsodie\DataflowBundle\DataflowType\RepositoryInterface; use CodeRhapsodie\DataflowBundle\DataflowType\Result; use CodeRhapsodie\DataflowBundle\Entity\Job; use CodeRhapsodie\DataflowBundle\Event\Events; @@ -30,6 +31,10 @@ public function process(Job $job): void $this->beforeProcessing($job); $dataflowType = $this->registry->getDataflowType($job->getDataflowType()); + if ($dataflowType instanceof RepositoryInterface) { + $dataflowType->setRepository($this->repository); + } + $loggers = [new Logger('dataflow_internal', [$bufferHandler = new BufferHandler()])]; if (isset($this->logger)) { $loggers[] = $this->logger; @@ -40,7 +45,7 @@ public function process(Job $job): void $dataflowType->setLogger($logger); } - $result = $dataflowType->process($job->getOptions()); + $result = $dataflowType->process($job->getOptions(), $job->getId()); if (!$dataflowType instanceof LoggerAwareInterface) { foreach ($result->getExceptions() as $index => $e) { diff --git a/src/Registry/DataflowTypeRegistry.php b/src/Registry/DataflowTypeRegistry.php index 4855f95..f2bd10a 100644 --- a/src/Registry/DataflowTypeRegistry.php +++ b/src/Registry/DataflowTypeRegistry.php @@ -18,9 +18,6 @@ class DataflowTypeRegistry implements DataflowTypeRegistryInterface /** @var array|DataflowTypeInterface[] */ private array $aliasesRegistry = []; - /** - * {@inheritdoc} - */ public function getDataflowType(string $fqcnOrAlias): DataflowTypeInterface { if (isset($this->fqcnRegistry[$fqcnOrAlias])) { @@ -34,17 +31,11 @@ public function getDataflowType(string $fqcnOrAlias): DataflowTypeInterface throw UnknownDataflowTypeException::create($fqcnOrAlias, [...array_keys($this->fqcnRegistry), ...array_keys($this->aliasesRegistry)]); } - /** - * {@inheritdoc} - */ public function listDataflowTypes(): iterable { return $this->fqcnRegistry; } - /** - * {@inheritdoc} - */ public function registerDataflowType(DataflowTypeInterface $dataflowType): void { $this->fqcnRegistry[$dataflowType::class] = $dataflowType; diff --git a/src/Repository/InitFromDbTrait.php b/src/Repository/InitFromDbTrait.php index 8581f77..5932d9d 100644 --- a/src/Repository/InitFromDbTrait.php +++ b/src/Repository/InitFromDbTrait.php @@ -14,7 +14,7 @@ abstract private function getFields(): array; private function initDateTime(array $datas): array { foreach ($this->getFields() as $key => $type) { - if ('datetime' === $type && null !== $datas[$key]) { + if ($type === 'datetime' && $datas[$key] !== null) { $datas[$key] = new \DateTime($datas[$key]); } } @@ -24,10 +24,10 @@ private function initDateTime(array $datas): array private function initArray(array $datas): array { - if (!is_array($datas['options'])) { + if (!\is_array($datas['options'])) { $datas['options'] = $this->strToArray($datas['options']); } - if (array_key_exists('exceptions', $datas) && !is_array($datas['exceptions'])) { + if (\array_key_exists('exceptions', $datas) && !\is_array($datas['exceptions'])) { $datas['exceptions'] = $this->strToArray($datas['exceptions']); } @@ -36,12 +36,12 @@ private function initArray(array $datas): array private function strToArray($value): array { - if (null === $value) { + if ($value === null) { return []; } - $array = json_decode($value, true, 512, JSON_THROW_ON_ERROR); + $array = json_decode($value, true, 512, \JSON_THROW_ON_ERROR); - return (false === $array) ? [] : $array; + return ($array === false) ? [] : $array; } } diff --git a/src/Repository/JobRepository.php b/src/Repository/JobRepository.php index 177644a..92d36a6 100644 --- a/src/Repository/JobRepository.php +++ b/src/Repository/JobRepository.php @@ -45,7 +45,7 @@ public function findOneshotDataflows(): iterable ->andWhere($qb->expr()->isNull('scheduled_dataflow_id')) ->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, ParameterType::INTEGER))); $stmt = $qb->executeQuery(); - if (0 === $stmt->rowCount()) { + if ($stmt->rowCount() === 0) { return []; } while (false !== ($row = $stmt->fetchAssociative())) { @@ -93,7 +93,7 @@ public function findLatests(): iterable ->orderBy('requested_date', 'DESC') ->setMaxResults(20); $stmt = $qb->executeQuery(); - if (0 === $stmt->rowCount()) { + if ($stmt->rowCount() === 0) { return []; } while (false !== ($row = $stmt->fetchAssociative())) { @@ -108,7 +108,7 @@ public function findForScheduled(int $id): iterable ->orderBy('requested_date', 'DESC') ->setMaxResults(20); $stmt = $qb->executeQuery(); - if (0 === $stmt->rowCount()) { + if ($stmt->rowCount() === 0) { return []; } while (false !== ($row = $stmt->fetchAssociative())) { @@ -121,14 +121,14 @@ public function save(Job $job) $datas = $job->toArray(); unset($datas['id']); - if (is_array($datas['options'])) { - $datas['options'] = json_encode($datas['options'], JSON_THROW_ON_ERROR); + if (\is_array($datas['options'])) { + $datas['options'] = json_encode($datas['options'], \JSON_THROW_ON_ERROR); } - if (is_array($datas['exceptions'])) { - $datas['exceptions'] = json_encode($datas['exceptions'], JSON_THROW_ON_ERROR); + if (\is_array($datas['exceptions'])) { + $datas['exceptions'] = json_encode($datas['exceptions'], \JSON_THROW_ON_ERROR); } - if (null === $job->getId()) { + if ($job->getId() === null) { $this->connection->insert(static::TABLE_NAME, $datas, $this->getFields()); $job->setId((int) $this->connection->lastInsertId()); @@ -137,6 +137,11 @@ public function save(Job $job) $this->connection->update(static::TABLE_NAME, $datas, ['id' => $job->getId()], $this->getFields()); } + public function updateCount(int $jobId, int $count): void + { + $this->connection->update(static::TABLE_NAME, ['count' => $count], ['id' => $jobId]); + } + public function createQueryBuilder($alias = null): QueryBuilder { $qb = $this->connection->createQueryBuilder(); @@ -149,7 +154,7 @@ public function createQueryBuilder($alias = null): QueryBuilder private function returnFirstOrNull(QueryBuilder $qb): ?Job { $stmt = $qb->executeQuery(); - if (0 === $stmt->rowCount()) { + if ($stmt->rowCount() === 0) { return null; } diff --git a/src/Repository/ScheduledDataflowRepository.php b/src/Repository/ScheduledDataflowRepository.php index 219ae0f..dd9d1e3 100644 --- a/src/Repository/ScheduledDataflowRepository.php +++ b/src/Repository/ScheduledDataflowRepository.php @@ -41,7 +41,7 @@ public function findReadyToRun(): iterable ; $stmt = $qb->executeQuery(); - if (0 === $stmt->rowCount()) { + if ($stmt->rowCount() === 0) { return []; } while (false !== ($row = $stmt->fetchAssociative())) { @@ -65,7 +65,7 @@ public function findAllOrderedByLabel(): iterable $qb->orderBy('label', 'ASC'); $stmt = $qb->executeQuery(); - if (0 === $stmt->rowCount()) { + if ($stmt->rowCount() === 0) { return []; } while (false !== ($row = $stmt->fetchAssociative())) { @@ -90,11 +90,11 @@ public function save(ScheduledDataflow $scheduledDataflow) $datas = $scheduledDataflow->toArray(); unset($datas['id']); - if (is_array($datas['options'])) { - $datas['options'] = json_encode($datas['options'], JSON_THROW_ON_ERROR); + if (\is_array($datas['options'])) { + $datas['options'] = json_encode($datas['options'], \JSON_THROW_ON_ERROR); } - if (null === $scheduledDataflow->getId()) { + if ($scheduledDataflow->getId() === null) { $this->connection->insert(static::TABLE_NAME, $datas, $this->getFields()); $scheduledDataflow->setId((int) $this->connection->lastInsertId()); @@ -129,7 +129,7 @@ public function createQueryBuilder($alias = null): QueryBuilder private function returnFirstOrNull(QueryBuilder $qb): ?ScheduledDataflow { $stmt = $qb->executeQuery(); - if (0 === $stmt->rowCount()) { + if ($stmt->rowCount() === 0) { return null; } diff --git a/src/Resources/config/services.yaml b/src/Resources/config/services.yaml index eba8e1e..3233846 100644 --- a/src/Resources/config/services.yaml +++ b/src/Resources/config/services.yaml @@ -23,6 +23,7 @@ services: arguments: $registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface' $connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory' + $jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository' tags: ['console.command'] CodeRhapsodie\DataflowBundle\Command\JobShowCommand: diff --git a/src/Runner/PendingDataflowRunner.php b/src/Runner/PendingDataflowRunner.php index 3c13b73..36ff37e 100644 --- a/src/Runner/PendingDataflowRunner.php +++ b/src/Runner/PendingDataflowRunner.php @@ -13,9 +13,6 @@ public function __construct(private JobRepository $repository, private JobProces { } - /** - * {@inheritdoc} - */ public function runPendingDataflows(): void { while (null !== ($job = $this->repository->findNextPendingDataflow())) { diff --git a/src/Validator/Constraints/FrequencyValidator.php b/src/Validator/Constraints/FrequencyValidator.php index b923b68..4a03814 100644 --- a/src/Validator/Constraints/FrequencyValidator.php +++ b/src/Validator/Constraints/FrequencyValidator.php @@ -10,22 +10,19 @@ class FrequencyValidator extends ConstraintValidator { - /** - * {@inheritdoc} - */ public function validate(mixed $value, Constraint $constraint) { if (!$constraint instanceof Frequency) { throw new UnexpectedTypeException($constraint, Frequency::class); } - if (null === $value) { + if ($value === null) { return; } try { $interval = \DateInterval::createFromDateString($value); - } catch (\Exception){ + } catch (\Exception) { $interval = false; }