diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 0bda366..3c02c2b 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -10,3 +10,5 @@ jobs:
- uses: actions/checkout@v4
- uses: php-actions/composer@v6 # or alternative dependency management
- uses: php-actions/phpunit@v4
+ - name: Run PHP CS Fixer
+ run: php vendor/bin/php-cs-fixer fix --dry-run --diff
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index 95b35eb..d22ccb5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,3 +6,4 @@ composer.lock
.idea
.phpunit.cache
.php-version
+.php-cs-fixer.cache
\ No newline at end of file
diff --git a/.php-cs-fixer.dist.php b/.php-cs-fixer.dist.php
new file mode 100644
index 0000000..385218d
--- /dev/null
+++ b/.php-cs-fixer.dist.php
@@ -0,0 +1,49 @@
+in('src')
+ //->in('tests')
+ ->files()->name('*.php');
+
+$config = new PhpCsFixer\Config();
+$config->setRules([
+ '@Symfony' => true,
+ '@Symfony:risky' => true,
+ '@PSR12' => true,
+ 'array_syntax' => [
+ 'syntax' => 'short',
+ ],
+ 'combine_consecutive_unsets' => true,
+ 'native_function_invocation' => [
+ 'include' => [
+ '@compiler_optimized',
+ ],
+ ],
+ 'no_extra_blank_lines' => [
+ 'tokens' => [
+ 'break',
+ 'continue',
+ 'extra',
+ 'return',
+ 'throw',
+ 'use',
+ 'parenthesis_brace_block',
+ 'square_brace_block',
+ 'curly_brace_block',
+ ],
+ ],
+ 'ordered_class_elements' => true,
+ 'ordered_imports' => true,
+ 'yoda_style' => [
+ 'equal' => false,
+ 'identical' => false,
+ 'less_and_greater' => false,
+ 'always_move_variable' => false,
+ ],
+])
+ ->setRiskyAllowed(true)
+ ->setFinder(
+ $finder
+ );
+
+return $config;
diff --git a/.php_cs.dist b/.php_cs.dist
deleted file mode 100644
index d4e742b..0000000
--- a/.php_cs.dist
+++ /dev/null
@@ -1,12 +0,0 @@
-in(__DIR__.'/src');
-
-return PhpCsFixer\Config::create()
- ->setRules([
- '@Symfony' => true,
- 'declare_strict_types' => true,
- ])
- ->setFinder($finder)
- ->setRiskyAllowed(true)
-;
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 92d1194..f0a7568 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,6 @@
+# Version 5.3.0
+* Added auto update count processed item while running job
+
# Version 5.2.0
* Added custom index for job status
diff --git a/Tests/DataflowType/Writer/CollectionWriterTest.php b/Tests/DataflowType/Writer/CollectionWriterTest.php
index 115ff0c..680897b 100644
--- a/Tests/DataflowType/Writer/CollectionWriterTest.php
+++ b/Tests/DataflowType/Writer/CollectionWriterTest.php
@@ -44,9 +44,7 @@ public function testAll()
$embeddedWriter
->expects($matcher)
->method('write')
- ->with($this->callback(function ($arg) use ($matcher, $values) {
- return $arg === $values[$matcher->numberOfInvocations() - 1];
- }))
+ ->with($this->callback(fn($arg) => $arg === $values[$matcher->numberOfInvocations() - 1]))
;
$writer = new CollectionWriter($embeddedWriter);
diff --git a/Tests/DataflowType/Writer/PortWriterAdapterTest.php b/Tests/DataflowType/Writer/PortWriterAdapterTest.php
index 9aa86e3..9cded58 100644
--- a/Tests/DataflowType/Writer/PortWriterAdapterTest.php
+++ b/Tests/DataflowType/Writer/PortWriterAdapterTest.php
@@ -11,7 +11,7 @@ public function testAll()
{
$value = 'not an array';
- $writer = $this->getMockBuilder('\Port\Writer')
+ $writer = $this->getMockBuilder(\Port\Writer::class)
->onlyMethods(['prepare', 'finish', 'writeItem'])
->getMock()
;
diff --git a/Tests/Manager/ScheduledDataflowManagerTest.php b/Tests/Manager/ScheduledDataflowManagerTest.php
index 8a33d84..335e200 100644
--- a/Tests/Manager/ScheduledDataflowManagerTest.php
+++ b/Tests/Manager/ScheduledDataflowManagerTest.php
@@ -49,15 +49,10 @@ public function testCreateJobsFromScheduledDataflows()
$this->jobRepository
->expects($matcher)
->method('findPendingForScheduledDataflow')
- ->with($this->callback(function ($arg) use ($matcher, $scheduled1, $scheduled2) {
- switch ($matcher->numberOfInvocations()) {
- case 1:
- return $arg === $scheduled1;
- case 2:
- return $arg === $scheduled2;
- default:
- return false;
- }
+ ->with($this->callback(fn($arg) => match ($matcher->numberOfInvocations()) {
+ 1 => $arg === $scheduled1,
+ 2 => $arg === $scheduled2,
+ default => false,
}))
->willReturnOnConsecutiveCalls(new Job(), null)
;
diff --git a/Tests/Processor/JobProcessorTest.php b/Tests/Processor/JobProcessorTest.php
index ab112de..f1879ee 100644
--- a/Tests/Processor/JobProcessorTest.php
+++ b/Tests/Processor/JobProcessorTest.php
@@ -44,18 +44,11 @@ public function testProcess()
->expects($matcher)
->method('dispatch')
->with(
- $this->callback(function ($arg) use ($job) {
- return $arg instanceof ProcessingEvent && $arg->getJob() === $job;
- }),
- $this->callback(function ($arg) use ($matcher) {
- switch ($matcher->numberOfInvocations()) {
- case 1:
- return $arg === Events::BEFORE_PROCESSING;
- case 2:
- return $arg === Events::AFTER_PROCESSING;
- default:
- return false;
- }
+ $this->callback(fn($arg) => $arg instanceof ProcessingEvent && $arg->getJob() === $job),
+ $this->callback(fn($arg) => match ($matcher->numberOfInvocations()) {
+ 1 => $arg === Events::BEFORE_PROCESSING,
+ 2 => $arg === Events::AFTER_PROCESSING,
+ default => false,
})
);
diff --git a/Tests/Runner/MessengerDataflowRunnerTest.php b/Tests/Runner/MessengerDataflowRunnerTest.php
index 6a17726..347eb26 100644
--- a/Tests/Runner/MessengerDataflowRunnerTest.php
+++ b/Tests/Runner/MessengerDataflowRunnerTest.php
@@ -39,15 +39,10 @@ public function testRunPendingDataflows()
$this->repository
->expects($matcher)
->method('save')
- ->with($this->callback(function ($arg) use ($matcher, $job1, $job2) {
- switch ($matcher->numberOfInvocations()) {
- case 1:
- return $arg === $job1;
- case 2:
- return $arg === $job2;
- default:
- return false;
- }
+ ->with($this->callback(fn($arg) => match ($matcher->numberOfInvocations()) {
+ 1 => $arg === $job1,
+ 2 => $arg === $job2,
+ default => false,
}))
;
@@ -55,15 +50,10 @@ public function testRunPendingDataflows()
$this->bus
->expects($matcher)
->method('dispatch')
- ->with($this->callback(function ($arg) use ($matcher, $id1, $id2) {
- switch ($matcher->numberOfInvocations()) {
- case 1:
- return $arg instanceof JobMessage && $arg->getJobId() === $id1;
- case 2:
- return $arg instanceof JobMessage && $arg->getJobId() === $id2;
- default:
- return false;
- }
+ ->with($this->callback(fn($arg) => match ($matcher->numberOfInvocations()) {
+ 1 => $arg instanceof JobMessage && $arg->getJobId() === $id1,
+ 2 => $arg instanceof JobMessage && $arg->getJobId() === $id2,
+ default => false,
}))
->willReturnOnConsecutiveCalls(
new Envelope(new JobMessage($id1)),
diff --git a/Tests/Runner/PendingDataflowRunnerTest.php b/Tests/Runner/PendingDataflowRunnerTest.php
index 383d081..37183df 100644
--- a/Tests/Runner/PendingDataflowRunnerTest.php
+++ b/Tests/Runner/PendingDataflowRunnerTest.php
@@ -38,15 +38,10 @@ public function testRunPendingDataflows()
$this->processor
->expects($matcher)
->method('process')
- ->with($this->callback(function ($arg) use ($matcher, $job1, $job2) {
- switch ($matcher->numberOfInvocations()) {
- case 1:
- return $arg === $job1;
- case 2:
- return $arg === $job2;
- default:
- return false;
- }
+ ->with($this->callback(fn($arg) => match ($matcher->numberOfInvocations()) {
+ 1 => $arg === $job1,
+ 2 => $arg === $job2,
+ default => false,
}))
;
diff --git a/composer.json b/composer.json
index 7a872d5..393f4b0 100644
--- a/composer.json
+++ b/composer.json
@@ -60,9 +60,10 @@
},
"require-dev": {
"amphp/amp": "^2.5",
+ "friendsofphp/php-cs-fixer": "^3.75",
"phpunit/phpunit": "^11",
"portphp/portphp": "^1.9",
- "rector/rector": "^1.0",
+ "rector/rector": "^2.0",
"symfony/messenger": "^7.0"
},
"suggest": {
diff --git a/rector.php b/rector.php
index 09c0a0f..92cd240 100644
--- a/rector.php
+++ b/rector.php
@@ -17,7 +17,7 @@
$rectorConfig->rule(InlineConstructorDefaultToPropertyRector::class);
$rectorConfig->sets([
- SymfonySetList::SYMFONY_60,
+ SymfonySetList::SYMFONY_70,
SymfonySetList::SYMFONY_CODE_QUALITY,
SymfonySetList::SYMFONY_CONSTRUCTOR_INJECTION,
LevelSetList::UP_TO_PHP_80,
diff --git a/src/Command/AddScheduledDataflowCommand.php b/src/Command/AddScheduledDataflowCommand.php
index 807fbcb..7a895ca 100644
--- a/src/Command/AddScheduledDataflowCommand.php
+++ b/src/Command/AddScheduledDataflowCommand.php
@@ -27,29 +27,27 @@ public function __construct(private DataflowTypeRegistryInterface $registry, pri
parent::__construct();
}
- /**
- * {@inheritdoc}
- */
protected function configure(): void
{
$this
->setHelp('The %command.name% allows you to create a new scheduled dataflow.')
->addOption('label', null, InputOption::VALUE_REQUIRED, 'Label of the scheduled dataflow')
->addOption('type', null, InputOption::VALUE_REQUIRED, 'Type of the scheduled dataflow (FQCN)')
- ->addOption('options', null, InputOption::VALUE_OPTIONAL,
- 'Options of the scheduled dataflow (ex: {"option1": "value1", "option2": "value2"})')
+ ->addOption(
+ 'options',
+ null,
+ InputOption::VALUE_OPTIONAL,
+ 'Options of the scheduled dataflow (ex: {"option1": "value1", "option2": "value2"})'
+ )
->addOption('frequency', null, InputOption::VALUE_REQUIRED, 'Frequency of the scheduled dataflow')
->addOption('first_run', null, InputOption::VALUE_REQUIRED, 'Date for the first run of the scheduled dataflow (Y-m-d H:i:s)')
->addOption('enabled', null, InputOption::VALUE_REQUIRED, 'State of the scheduled dataflow')
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
}
- /**
- * {@inheritdoc}
- */
protected function execute(InputInterface $input, OutputInterface $output): int
{
- if (null !== $input->getOption('connection')) {
+ if ($input->getOption('connection') !== null) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
$choices = [];
@@ -71,13 +69,17 @@ protected function execute(InputInterface $input, OutputInterface $output): int
}
$options = $input->getOption('options');
if (!$options) {
- $options = $io->ask('What are the launch options for the scheduled dataflow? (ex: {"option1": "value1", "option2": "value2"})',
- json_encode([]));
+ $options = $io->ask(
+ 'What are the launch options for the scheduled dataflow? (ex: {"option1": "value1", "option2": "value2"})',
+ json_encode([])
+ );
}
$frequency = $input->getOption('frequency');
if (!$frequency) {
- $frequency = $io->choice('What is the frequency for the scheduled dataflow?',
- ScheduledDataflow::AVAILABLE_FREQUENCIES);
+ $frequency = $io->choice(
+ 'What is the frequency for the scheduled dataflow?',
+ ScheduledDataflow::AVAILABLE_FREQUENCIES
+ );
}
$firstRun = $input->getOption('first_run');
if (!$firstRun) {
@@ -92,22 +94,25 @@ protected function execute(InputInterface $input, OutputInterface $output): int
'id' => null,
'label' => $label,
'dataflow_type' => $type,
- 'options' => json_decode($options, true, 512, JSON_THROW_ON_ERROR),
+ 'options' => json_decode($options, true, 512, \JSON_THROW_ON_ERROR),
'frequency' => $frequency,
'next' => new \DateTime($firstRun),
'enabled' => $enabled,
]);
$errors = $this->validator->validate($newScheduledDataflow);
- if (count($errors) > 0) {
+ if (\count($errors) > 0) {
$io->error((string) $errors);
return 2;
}
$this->scheduledDataflowRepository->save($newScheduledDataflow);
- $io->success(sprintf('New scheduled dataflow "%s" (id:%d) was created successfully.',
- $newScheduledDataflow->getLabel(), $newScheduledDataflow->getId()));
+ $io->success(\sprintf(
+ 'New scheduled dataflow "%s" (id:%d) was created successfully.',
+ $newScheduledDataflow->getLabel(),
+ $newScheduledDataflow->getId()
+ ));
return 0;
}
diff --git a/src/Command/ChangeScheduleStatusCommand.php b/src/Command/ChangeScheduleStatusCommand.php
index cca2565..35735fc 100644
--- a/src/Command/ChangeScheduleStatusCommand.php
+++ b/src/Command/ChangeScheduleStatusCommand.php
@@ -26,9 +26,6 @@ public function __construct(private ScheduledDataflowRepository $scheduledDatafl
parent::__construct();
}
- /**
- * {@inheritdoc}
- */
protected function configure(): void
{
$this
@@ -39,12 +36,9 @@ protected function configure(): void
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
}
- /**
- * {@inheritdoc}
- */
protected function execute(InputInterface $input, OutputInterface $output): int
{
- if (null !== $input->getOption('connection')) {
+ if ($input->getOption('connection') !== null) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
$io = new SymfonyStyle($input, $output);
@@ -52,7 +46,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$schedule = $this->scheduledDataflowRepository->find((int) $input->getArgument('schedule-id'));
if (!$schedule) {
- $io->error(sprintf('Cannot find scheduled dataflow with id "%d".', $input->getArgument('schedule-id')));
+ $io->error(\sprintf('Cannot find scheduled dataflow with id "%d".', $input->getArgument('schedule-id')));
return 1;
}
@@ -71,9 +65,9 @@ protected function execute(InputInterface $input, OutputInterface $output): int
try {
$schedule->setEnabled($input->getOption('enable'));
$this->scheduledDataflowRepository->save($schedule);
- $io->success(sprintf('Schedule with id "%s" has been successfully updated.', $schedule->getId()));
+ $io->success(\sprintf('Schedule with id "%s" has been successfully updated.', $schedule->getId()));
} catch (\Exception $e) {
- $io->error(sprintf('An error occured when changing schedule status : "%s".', $e->getMessage()));
+ $io->error(\sprintf('An error occured when changing schedule status : "%s".', $e->getMessage()));
return 4;
}
diff --git a/src/Command/DatabaseSchemaCommand.php b/src/Command/DatabaseSchemaCommand.php
index 0371368..464df39 100644
--- a/src/Command/DatabaseSchemaCommand.php
+++ b/src/Command/DatabaseSchemaCommand.php
@@ -27,9 +27,6 @@ public function __construct(private ConnectionFactory $connectionFactory)
parent::__construct();
}
- /**
- * {@inheritdoc}
- */
protected function configure(): void
{
$this
@@ -43,7 +40,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new SymfonyStyle($input, $output);
- if (null !== $input->getOption('connection')) {
+ if ($input->getOption('connection') !== null) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
@@ -61,7 +58,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$tables = [];
foreach ($sm->listTables() as $table) {
/** @var Table $table */
- if (in_array($table->getName(), $tableArray)) {
+ if (\in_array($table->getName(), $tableArray)) {
$tables[] = $table;
}
}
@@ -90,14 +87,14 @@ protected function execute(InputInterface $input, OutputInterface $output): int
if ($input->getOption('dump-sql')) {
$io->text('Execute these SQL Queries on your database:');
foreach ($sqls as $sql) {
- $io->text($sql . ';');
+ $io->text($sql.';');
}
return Command::SUCCESS;
}
if (!$io->askQuestion(new ConfirmationQuestion('Are you sure to update database ?', true))) {
- $io->text("Execution canceled.");
+ $io->text('Execution canceled.');
return Command::SUCCESS;
}
@@ -106,7 +103,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$connection->executeQuery($sql);
}
- $io->success(sprintf('%d queries executed.', \count($sqls)));
+ $io->success(\sprintf('%d queries executed.', \count($sqls)));
return parent::SUCCESS;
}
diff --git a/src/Command/ExecuteDataflowCommand.php b/src/Command/ExecuteDataflowCommand.php
index 0f7fa8e..fa99cd5 100644
--- a/src/Command/ExecuteDataflowCommand.php
+++ b/src/Command/ExecuteDataflowCommand.php
@@ -4,8 +4,10 @@
namespace CodeRhapsodie\DataflowBundle\Command;
+use CodeRhapsodie\DataflowBundle\DataflowType\AutoUpdateCountInterface;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
+use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Symfony\Component\Console\Attribute\AsCommand;
@@ -26,18 +28,16 @@ class ExecuteDataflowCommand extends Command implements LoggerAwareInterface
{
use LoggerAwareTrait;
- public function __construct(private DataflowTypeRegistryInterface $registry, private ConnectionFactory $connectionFactory)
+ public function __construct(private DataflowTypeRegistryInterface $registry, private ConnectionFactory $connectionFactory, private JobRepository $jobRepository)
{
parent::__construct();
}
- /**
- * {@inheritdoc}
- */
protected function configure(): void
{
$this
- ->setHelp(<<<'EOF'
+ ->setHelp(
+ <<<'EOF'
The %command.name% command runs one dataflow with the provided options.
php %command.full_name% App\Dataflow\MyDataflow '{"option1": "value1", "option2": "value2"}'
@@ -48,19 +48,20 @@ protected function configure(): void
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
}
- /**
- * {@inheritdoc}
- */
protected function execute(InputInterface $input, OutputInterface $output): int
{
- if (null !== $input->getOption('connection')) {
+ if ($input->getOption('connection') !== null) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
$fqcnOrAlias = $input->getArgument('fqcn');
- $options = json_decode($input->getArgument('options'), true, 512, JSON_THROW_ON_ERROR);
+ $options = json_decode($input->getArgument('options'), true, 512, \JSON_THROW_ON_ERROR);
$io = new SymfonyStyle($input, $output);
$dataflowType = $this->registry->getDataflowType($fqcnOrAlias);
+ if ($dataflowType instanceof AutoUpdateCountInterface) {
+ $dataflowType->setRepository($this->jobRepository);
+ }
+
if ($dataflowType instanceof LoggerAwareInterface && isset($this->logger)) {
$dataflowType->setLogger($this->logger);
}
diff --git a/src/Command/JobShowCommand.php b/src/Command/JobShowCommand.php
index 0bde6ec..67e5412 100644
--- a/src/Command/JobShowCommand.php
+++ b/src/Command/JobShowCommand.php
@@ -31,9 +31,6 @@ public function __construct(private JobRepository $jobRepository, private Connec
parent::__construct();
}
- /**
- * {@inheritdoc}
- */
protected function configure(): void
{
$this
@@ -44,12 +41,9 @@ protected function configure(): void
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
}
- /**
- * {@inheritdoc}
- */
protected function execute(InputInterface $input, OutputInterface $output): int
{
- if (null !== $input->getOption('connection')) {
+ if ($input->getOption('connection') !== null) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
@@ -73,7 +67,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
return 2;
}
- if (null === $job) {
+ if ($job === null) {
$io->error('Cannot find job :/');
return 3;
@@ -87,19 +81,19 @@ protected function execute(InputInterface $input, OutputInterface $output): int
['Started at', $job->getStartTime() ? $job->getStartTime()->format('Y-m-d H:i:s') : '-'],
['Ended at', $job->getEndTime() ? $job->getEndTime()->format('Y-m-d H:i:s') : '-'],
['Object number', $job->getCount()],
- ['Errors', count((array) $job->getExceptions())],
+ ['Errors', \count((array) $job->getExceptions())],
['Status', $this->translateStatus($job->getStatus())],
];
if ($input->getOption('details')) {
$display[] = ['Type', $job->getDataflowType()];
- $display[] = ['Options', json_encode($job->getOptions(), JSON_THROW_ON_ERROR)];
+ $display[] = ['Options', json_encode($job->getOptions(), \JSON_THROW_ON_ERROR)];
$io->section('Summary');
}
$io->table(['Field', 'Value'], $display);
if ($input->getOption('details')) {
$io->section('Exceptions');
- $exceptions = array_map(fn(string $exception) => substr($exception, 0, 900).'…', $job->getExceptions());
+ $exceptions = array_map(fn (string $exception) => substr($exception, 0, 900).'…', $job->getExceptions());
$io->write($exceptions);
}
diff --git a/src/Command/RunPendingDataflowsCommand.php b/src/Command/RunPendingDataflowsCommand.php
index cc2045a..a08cae5 100644
--- a/src/Command/RunPendingDataflowsCommand.php
+++ b/src/Command/RunPendingDataflowsCommand.php
@@ -29,22 +29,17 @@ public function __construct(private ScheduledDataflowManagerInterface $manager,
parent::__construct();
}
- /**
- * {@inheritdoc}
- */
protected function configure(): void
{
$this
- ->setHelp(<<<'EOF'
+ ->setHelp(
+ <<<'EOF'
The %command.name% command runs dataflows according to the schedule defined in the UI by the user.
EOF
)
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
}
- /**
- * {@inheritdoc}
- */
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (!$this->lock()) {
@@ -53,7 +48,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
return 0;
}
- if (null !== $input->getOption('connection')) {
+ if ($input->getOption('connection') !== null) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
diff --git a/src/Command/ScheduleListCommand.php b/src/Command/ScheduleListCommand.php
index ee08524..d4a187c 100644
--- a/src/Command/ScheduleListCommand.php
+++ b/src/Command/ScheduleListCommand.php
@@ -24,9 +24,6 @@ public function __construct(private ScheduledDataflowRepository $scheduledDatafl
parent::__construct();
}
- /**
- * {@inheritdoc}
- */
protected function configure(): void
{
$this
@@ -34,12 +31,9 @@ protected function configure(): void
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
}
- /**
- * {@inheritdoc}
- */
protected function execute(InputInterface $input, OutputInterface $output): int
{
- if (null !== $input->getOption('connection')) {
+ if ($input->getOption('connection') !== null) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
$io = new SymfonyStyle($input, $output);
diff --git a/src/Command/SchemaCommand.php b/src/Command/SchemaCommand.php
index 39fe0bd..237724c 100644
--- a/src/Command/SchemaCommand.php
+++ b/src/Command/SchemaCommand.php
@@ -4,13 +4,6 @@
namespace CodeRhapsodie\DataflowBundle\Command;
-use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
-use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
-use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
-use CodeRhapsodie\DataflowBundle\SchemaProvider\DataflowSchemaProvider;
-use Doctrine\DBAL\Schema\Comparator;
-use Doctrine\DBAL\Schema\Schema;
-use Doctrine\DBAL\Schema\Table;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\ArrayInput;
@@ -21,14 +14,12 @@
/**
* @codeCoverageIgnore
+ *
* @deprecated This command is deprecated and will be removed in 6.0, use this command "code-rhapsodie:dataflow:database-schema" instead.
*/
#[AsCommand('code-rhapsodie:dataflow:dump-schema', 'Generates schema create / update SQL queries')]
class SchemaCommand extends Command
{
- /**
- * {@inheritdoc}
- */
protected function configure(): void
{
$this
@@ -38,9 +29,6 @@ protected function configure(): void
;
}
- /**
- * {@inheritdoc}
- */
protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new SymfonyStyle($input, $output);
@@ -48,9 +36,9 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$options = array_filter($input->getOptions());
- //add -- before each keys
+ // add -- before each keys
$options = array_combine(
- array_map(fn($key) => '--' . $key, array_keys($options)),
+ array_map(fn ($key) => '--'.$key, array_keys($options)),
array_values($options)
);
@@ -58,7 +46,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$inputArray = new ArrayInput([
'command' => 'code-rhapsodie:dataflow:database-schema',
- ...$options
+ ...$options,
]);
return $this->getApplication()->doRun($inputArray, $output);
diff --git a/src/DataflowType/AMPAsyncDataflowBuilder.php b/src/DataflowType/AMPAsyncDataflowBuilder.php
index 7f17b96..b50eebf 100644
--- a/src/DataflowType/AMPAsyncDataflowBuilder.php
+++ b/src/DataflowType/AMPAsyncDataflowBuilder.php
@@ -7,7 +7,6 @@
use CodeRhapsodie\DataflowBundle\DataflowType\Dataflow\AMPAsyncDataflow;
use CodeRhapsodie\DataflowBundle\DataflowType\Dataflow\DataflowInterface;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
-use Symfony\Component\OptionsResolver\OptionsResolver;
class AMPAsyncDataflowBuilder extends DataflowBuilder
{
diff --git a/src/DataflowType/AbstractDataflowType.php b/src/DataflowType/AbstractDataflowType.php
index 57dc7ca..543b173 100644
--- a/src/DataflowType/AbstractDataflowType.php
+++ b/src/DataflowType/AbstractDataflowType.php
@@ -4,15 +4,20 @@
namespace CodeRhapsodie\DataflowBundle\DataflowType;
+use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Psr\Log\LoggerInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;
-abstract class AbstractDataflowType implements DataflowTypeInterface, LoggerAwareInterface
+abstract class AbstractDataflowType implements DataflowTypeInterface, LoggerAwareInterface, AutoUpdateCountInterface
{
use LoggerAwareTrait;
+ private JobRepository $repository;
+
+ private ?\DateTime $saveDate = null;
+
/**
* @codeCoverageIgnore
*/
@@ -21,14 +26,24 @@ public function getAliases(): iterable
return [];
}
- public function process(array $options): Result
+ public function process(array $options, ?int $jobId = null): Result
{
+ $this->saveDate = new \DateTime('+1 minute');
+
$optionsResolver = new OptionsResolver();
$this->configureOptions($optionsResolver);
$options = $optionsResolver->resolve($options);
$builder = $this->createDataflowBuilder();
$builder->setName($this->getLabel());
+ $builder->addAfterItemProcessor(function (int|string $index, mixed $item, int $count) use ($jobId) {
+ if ($jobId === null || $this->saveDate > new \DateTime()) {
+ return;
+ }
+
+ $this->repository->updateCount($jobId, $count);
+ $this->saveDate = new \DateTime('+1 minute');
+ });
$this->buildDataflow($builder, $options);
$dataflow = $builder->getDataflow();
if ($dataflow instanceof LoggerAwareInterface && $this->logger instanceof LoggerInterface) {
@@ -38,6 +53,11 @@ public function process(array $options): Result
return $dataflow->process();
}
+ public function setRepository(JobRepository $repository): void
+ {
+ $this->repository = $repository;
+ }
+
protected function createDataflowBuilder(): DataflowBuilder
{
return new DataflowBuilder();
diff --git a/src/DataflowType/AutoUpdateCountInterface.php b/src/DataflowType/AutoUpdateCountInterface.php
new file mode 100644
index 0000000..ccdfdb1
--- /dev/null
+++ b/src/DataflowType/AutoUpdateCountInterface.php
@@ -0,0 +1,12 @@
+isResolved() in version 2.5
+ $resolved = false; // missing $deferred->isResolved() in version 2.5
$producer = new Producer(function (callable $emit) {
foreach ($this->reader as $index => $item) {
yield new Delayed($this->emitInterval);
@@ -89,7 +87,7 @@ public function process(): Result
$it = $producer->getCurrent();
[$index, $item] = $it;
$this->states[$index] = [$index, 0, $item];
- } elseif (!$resolved && 0 === count($this->states)) {
+ } elseif (!$resolved && \count($this->states) === 0) {
$resolved = true;
$deferred->resolve();
}
@@ -120,20 +118,20 @@ public function process(): Result
private function processState(mixed $state, int &$count, array &$exceptions): void
{
[$readIndex, $stepIndex, $item] = $state;
- if ($stepIndex < count($this->steps)) {
+ if ($stepIndex < \count($this->steps)) {
if (!isset($this->stepsJobs[$stepIndex])) {
$this->stepsJobs[$stepIndex] = [];
}
[$step, $scale] = $this->steps[$stepIndex];
- if ((is_countable($this->stepsJobs[$stepIndex]) ? count($this->stepsJobs[$stepIndex]) : 0) < $scale && !isset($this->stepsJobs[$stepIndex][$readIndex])) {
+ if ((is_countable($this->stepsJobs[$stepIndex]) ? \count($this->stepsJobs[$stepIndex]) : 0) < $scale && !isset($this->stepsJobs[$stepIndex][$readIndex])) {
$this->stepsJobs[$stepIndex][$readIndex] = true;
/** @var Promise $promise */
$promise = coroutine($step)($item);
- $promise->onResolve(function (?Throwable $exception = null, $newItem = null) use ($stepIndex, $readIndex, &$exceptions) {
+ $promise->onResolve(function (?\Throwable $exception = null, $newItem = null) use ($stepIndex, $readIndex, &$exceptions) {
if ($exception) {
$exceptions[$stepIndex] = $exception;
$this->logException($exception, (string) $stepIndex);
- } elseif (false === $newItem) {
+ } elseif ($newItem === false) {
unset($this->states[$readIndex]);
} else {
$this->states[$readIndex] = [$readIndex, $stepIndex + 1, $newItem];
@@ -153,7 +151,7 @@ private function processState(mixed $state, int &$count, array &$exceptions): vo
}
}
- private function logException(Throwable $e, ?string $index = null): void
+ private function logException(\Throwable $e, ?string $index = null): void
{
if (!isset($this->logger)) {
return;
diff --git a/src/DataflowType/Dataflow/Dataflow.php b/src/DataflowType/Dataflow/Dataflow.php
index 11a6866..dbbb761 100644
--- a/src/DataflowType/Dataflow/Dataflow.php
+++ b/src/DataflowType/Dataflow/Dataflow.php
@@ -21,6 +21,13 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
private ?\Closure $customExceptionIndex = null;
+ private ?\DateTimeInterface $dateTime = null;
+
+ /**
+ * @var \Closure[]
+ */
+ private array $afterItemProcessors = [];
+
public function __construct(private iterable $reader, private ?string $name)
{
}
@@ -56,8 +63,15 @@ public function setCustomExceptionIndex(callable $callable): self
}
/**
- * {@inheritdoc}
+ * @param array $processors
*/
+ public function setAfterItemProcessors(array $processors): self
+ {
+ $this->afterItemProcessors = array_map(fn (callable $callable) => \Closure::fromCallable($callable), $processors);
+
+ return $this;
+ }
+
public function process(): Result
{
$count = 0;
@@ -75,7 +89,7 @@ public function process(): Result
} catch (\Throwable $e) {
$exceptionIndex = $index;
try {
- if (is_callable($this->customExceptionIndex)) {
+ if (\is_callable($this->customExceptionIndex)) {
$exceptionIndex = (string) ($this->customExceptionIndex)($item, $index);
}
} catch (\Throwable $e2) {
@@ -87,6 +101,10 @@ public function process(): Result
}
++$count;
+
+ foreach ($this->afterItemProcessors as $afterItemProcessor) {
+ $afterItemProcessor($index, $item, $count);
+ }
}
foreach ($this->writers as $writer) {
@@ -103,9 +121,9 @@ public function process(): Result
private function processItem(mixed $item): void
{
foreach ($this->steps as $step) {
- $item = call_user_func($step, $item);
+ $item = \call_user_func($step, $item);
- if (false === $item) {
+ if ($item === false) {
return;
}
}
diff --git a/src/DataflowType/DataflowBuilder.php b/src/DataflowType/DataflowBuilder.php
index acf3b23..fab6c0b 100644
--- a/src/DataflowType/DataflowBuilder.php
+++ b/src/DataflowType/DataflowBuilder.php
@@ -18,6 +18,10 @@ class DataflowBuilder
private array $writers = [];
private ?\Closure $customExceptionIndex = null;
+ /**
+ * @var \Closure[]
+ */
+ private array $afterItemProcessors = [];
public function setName(string $name): self
{
@@ -54,6 +58,13 @@ public function setCustomExceptionIndex(callable $callable): self
return $this;
}
+ public function addAfterItemProcessor(callable $callable): self
+ {
+ $this->afterItemProcessors[] = \Closure::fromCallable($callable);
+
+ return $this;
+ }
+
public function getDataflow(): DataflowInterface
{
$dataflow = new Dataflow($this->reader, $this->name);
@@ -69,10 +80,12 @@ public function getDataflow(): DataflowInterface
$dataflow->addWriter($writer);
}
- if (is_callable($this->customExceptionIndex)) {
+ if (\is_callable($this->customExceptionIndex)) {
$dataflow->setCustomExceptionIndex($this->customExceptionIndex);
}
+ $dataflow->setAfterItemProcessors($this->afterItemProcessors);
+
return $dataflow;
}
}
diff --git a/src/DataflowType/DataflowTypeInterface.php b/src/DataflowType/DataflowTypeInterface.php
index 079132c..d64b111 100644
--- a/src/DataflowType/DataflowTypeInterface.php
+++ b/src/DataflowType/DataflowTypeInterface.php
@@ -10,5 +10,5 @@ public function getLabel(): string;
public function getAliases(): iterable;
- public function process(array $options): Result;
+ public function process(array $options, ?int $jobId = null): Result;
}
diff --git a/src/DataflowType/Result.php b/src/DataflowType/Result.php
index 6bd8bf3..5c8716b 100644
--- a/src/DataflowType/Result.php
+++ b/src/DataflowType/Result.php
@@ -20,7 +20,7 @@ class Result
public function __construct(private string $name, private \DateTimeInterface $startTime, private \DateTimeInterface $endTime, private int $totalProcessedCount, array $exceptions)
{
$this->elapsed = $startTime->diff($endTime);
- $this->errorCount = count($exceptions);
+ $this->errorCount = \count($exceptions);
$this->successCount = $totalProcessedCount - $this->errorCount;
$this->exceptions = $exceptions;
}
diff --git a/src/DataflowType/Writer/CollectionWriter.php b/src/DataflowType/Writer/CollectionWriter.php
index e0e4100..704e675 100644
--- a/src/DataflowType/Writer/CollectionWriter.php
+++ b/src/DataflowType/Writer/CollectionWriter.php
@@ -18,21 +18,15 @@ public function __construct(private WriterInterface $writer)
{
}
- /**
- * {@inheritdoc}
- */
public function prepare()
{
$this->writer->prepare();
}
- /**
- * {@inheritdoc}
- */
public function write($collection)
{
if (!is_iterable($collection)) {
- throw new UnsupportedItemTypeException(sprintf('Item to write was expected to be an iterable, received %s.', get_debug_type($collection)));
+ throw new UnsupportedItemTypeException(\sprintf('Item to write was expected to be an iterable, received %s.', get_debug_type($collection)));
}
foreach ($collection as $item) {
@@ -40,17 +34,11 @@ public function write($collection)
}
}
- /**
- * {@inheritdoc}
- */
public function finish()
{
$this->writer->finish();
}
- /**
- * {@inheritdoc}
- */
public function supports($item): bool
{
return is_iterable($item);
diff --git a/src/DataflowType/Writer/DelegateWriterInterface.php b/src/DataflowType/Writer/DelegateWriterInterface.php
index 6d3463b..1ea8c77 100644
--- a/src/DataflowType/Writer/DelegateWriterInterface.php
+++ b/src/DataflowType/Writer/DelegateWriterInterface.php
@@ -11,8 +11,6 @@ interface DelegateWriterInterface extends WriterInterface
{
/**
* Returns true if the argument is of a supported type.
- *
- * @param $item
*/
public function supports($item): bool;
}
diff --git a/src/DataflowType/Writer/DelegatorWriter.php b/src/DataflowType/Writer/DelegatorWriter.php
index 522f650..de2ada3 100644
--- a/src/DataflowType/Writer/DelegatorWriter.php
+++ b/src/DataflowType/Writer/DelegatorWriter.php
@@ -21,9 +21,6 @@ public function __construct()
{
}
- /**
- * {@inheritdoc}
- */
public function prepare()
{
foreach ($this->delegates as $delegate) {
@@ -31,9 +28,6 @@ public function prepare()
}
}
- /**
- * {@inheritdoc}
- */
public function write($item)
{
foreach ($this->delegates as $delegate) {
@@ -46,12 +40,9 @@ public function write($item)
return;
}
- throw new UnsupportedItemTypeException(sprintf('None of the registered delegate writers support the received item of type %s', get_debug_type($item)));
+ throw new UnsupportedItemTypeException(\sprintf('None of the registered delegate writers support the received item of type %s', get_debug_type($item)));
}
- /**
- * {@inheritdoc}
- */
public function finish()
{
foreach ($this->delegates as $delegate) {
@@ -59,9 +50,6 @@ public function finish()
}
}
- /**
- * {@inheritdoc}
- */
public function supports($item): bool
{
foreach ($this->delegates as $delegate) {
diff --git a/src/DependencyInjection/Compiler/BusCompilerPass.php b/src/DependencyInjection/Compiler/BusCompilerPass.php
index 4021449..5a8114f 100644
--- a/src/DependencyInjection/Compiler/BusCompilerPass.php
+++ b/src/DependencyInjection/Compiler/BusCompilerPass.php
@@ -12,9 +12,6 @@
class BusCompilerPass implements CompilerPassInterface
{
- /**
- * {@inheritdoc}
- */
public function process(ContainerBuilder $container)
{
if (!$container->hasParameter('coderhapsodie.dataflow.bus')) {
@@ -23,7 +20,7 @@ public function process(ContainerBuilder $container)
$bus = $container->getParameter('coderhapsodie.dataflow.bus');
if (!$container->has($bus)) {
- throw new InvalidArgumentException(sprintf('Service "%s" not found', $bus));
+ throw new InvalidArgumentException(\sprintf('Service "%s" not found', $bus));
}
if (!$container->has(MessengerDataflowRunner::class)) {
diff --git a/src/DependencyInjection/Compiler/DataflowTypeCompilerPass.php b/src/DependencyInjection/Compiler/DataflowTypeCompilerPass.php
index 808abf6..2c0ba24 100644
--- a/src/DependencyInjection/Compiler/DataflowTypeCompilerPass.php
+++ b/src/DependencyInjection/Compiler/DataflowTypeCompilerPass.php
@@ -16,9 +16,6 @@
*/
class DataflowTypeCompilerPass implements CompilerPassInterface
{
- /**
- * {@inheritdoc}
- */
public function process(ContainerBuilder $container)
{
if (!$container->has(DataflowTypeRegistry::class)) {
diff --git a/src/DependencyInjection/Compiler/DefaultLoggerCompilerPass.php b/src/DependencyInjection/Compiler/DefaultLoggerCompilerPass.php
index 96e5b9c..e3c3a41 100644
--- a/src/DependencyInjection/Compiler/DefaultLoggerCompilerPass.php
+++ b/src/DependencyInjection/Compiler/DefaultLoggerCompilerPass.php
@@ -12,9 +12,6 @@
class DefaultLoggerCompilerPass implements CompilerPassInterface
{
- /**
- * {@inheritdoc}
- */
public function process(ContainerBuilder $container)
{
$defaultLogger = $container->getParameter('coderhapsodie.dataflow.default_logger');
diff --git a/src/DependencyInjection/Configuration.php b/src/DependencyInjection/Configuration.php
index d8c1c68..a4bf333 100644
--- a/src/DependencyInjection/Configuration.php
+++ b/src/DependencyInjection/Configuration.php
@@ -10,7 +10,7 @@
class Configuration implements ConfigurationInterface
{
- public function getConfigTreeBuilder(): \Symfony\Component\Config\Definition\Builder\TreeBuilder
+ public function getConfigTreeBuilder(): TreeBuilder
{
$treeBuilder = new TreeBuilder('code_rhapsodie_dataflow');
$rootNode = $treeBuilder->getRootNode();
@@ -34,7 +34,7 @@ public function getConfigTreeBuilder(): \Symfony\Component\Config\Definition\Bui
->end()
->end()
->validate()
- ->ifTrue(static fn($v): bool => $v['enabled'] && !interface_exists(MessageBusInterface::class))
+ ->ifTrue(static fn ($v): bool => $v['enabled'] && !interface_exists(MessageBusInterface::class))
->thenInvalid('You need "symfony/messenger" in order to use Dataflow messenger mode.')
->end()
->end()
diff --git a/src/Entity/Job.php b/src/Entity/Job.php
index 571d7bf..cd931a6 100644
--- a/src/Entity/Job.php
+++ b/src/Entity/Job.php
@@ -75,19 +75,19 @@ public static function createFromScheduledDataflow(ScheduledDataflow $scheduled)
public static function createFromArray(array $datas)
{
$lost = array_diff(static::KEYS, array_keys($datas));
- if (count($lost) > 0) {
+ if (\count($lost) > 0) {
throw new \LogicException('The first argument of '.__METHOD__.' must be contains: "'.implode(', ', $lost).'"');
}
$job = new self();
- $job->id = null === $datas['id'] ? null : (int) $datas['id'];
- $job->setStatus(null === $datas['status'] ? null : (int) $datas['status']);
+ $job->id = $datas['id'] === null ? null : (int) $datas['id'];
+ $job->setStatus($datas['status'] === null ? null : (int) $datas['status']);
$job->setLabel($datas['label']);
$job->setDataflowType($datas['dataflow_type']);
$job->setOptions($datas['options']);
$job->setRequestedDate($datas['requested_date']);
- $job->setScheduledDataflowId(null === $datas['scheduled_dataflow_id'] ? null : (int) $datas['scheduled_dataflow_id']);
- $job->setCount(null === $datas['count'] ? null : (int) $datas['count']);
+ $job->setScheduledDataflowId($datas['scheduled_dataflow_id'] === null ? null : (int) $datas['scheduled_dataflow_id']);
+ $job->setCount($datas['count'] === null ? null : (int) $datas['count']);
$job->setExceptions($datas['exceptions']);
$job->setStartTime($datas['start_time']);
$job->setEndTime($datas['end_time']);
@@ -112,7 +112,7 @@ public function toArray(): array
];
}
- public function setId(int $id): Job
+ public function setId(int $id): self
{
$this->id = $id;
@@ -129,7 +129,7 @@ public function getStatus(): int
return $this->status;
}
- public function setStatus(int $status): Job
+ public function setStatus(int $status): self
{
$this->status = $status;
@@ -141,7 +141,7 @@ public function getLabel(): ?string
return $this->label;
}
- public function setLabel(?string $label): Job
+ public function setLabel(?string $label): self
{
$this->label = $label;
@@ -153,7 +153,7 @@ public function getDataflowType(): ?string
return $this->dataflowType;
}
- public function setDataflowType(?string $dataflowType): Job
+ public function setDataflowType(?string $dataflowType): self
{
$this->dataflowType = $dataflowType;
@@ -165,7 +165,7 @@ public function getOptions(): ?array
return $this->options;
}
- public function setOptions(?array $options): Job
+ public function setOptions(?array $options): self
{
$this->options = $options;
@@ -177,7 +177,7 @@ public function getRequestedDate(): ?\DateTimeInterface
return $this->requestedDate;
}
- public function setRequestedDate(?\DateTimeInterface $requestedDate): Job
+ public function setRequestedDate(?\DateTimeInterface $requestedDate): self
{
$this->requestedDate = $requestedDate;
@@ -189,7 +189,7 @@ public function getScheduledDataflowId(): ?int
return $this->scheduledDataflowId;
}
- public function setScheduledDataflowId(?int $scheduledDataflowId): Job
+ public function setScheduledDataflowId(?int $scheduledDataflowId): self
{
$this->scheduledDataflowId = $scheduledDataflowId;
@@ -201,7 +201,7 @@ public function getCount(): ?int
return $this->count;
}
- public function setCount(?int $count): Job
+ public function setCount(?int $count): self
{
$this->count = $count;
@@ -213,7 +213,7 @@ public function getExceptions(): ?array
return $this->exceptions;
}
- public function setExceptions(?array $exceptions): Job
+ public function setExceptions(?array $exceptions): self
{
$this->exceptions = $exceptions;
@@ -225,7 +225,7 @@ public function getStartTime(): ?\DateTimeInterface
return $this->startTime;
}
- public function setStartTime(?\DateTimeInterface $startTime): Job
+ public function setStartTime(?\DateTimeInterface $startTime): self
{
$this->startTime = $startTime;
@@ -237,7 +237,7 @@ public function getEndTime(): ?\DateTimeInterface
return $this->endTime;
}
- public function setEndTime(?\DateTimeInterface $endTime): Job
+ public function setEndTime(?\DateTimeInterface $endTime): self
{
$this->endTime = $endTime;
diff --git a/src/Entity/ScheduledDataflow.php b/src/Entity/ScheduledDataflow.php
index 70b7ca0..3f138cd 100644
--- a/src/Entity/ScheduledDataflow.php
+++ b/src/Entity/ScheduledDataflow.php
@@ -50,19 +50,19 @@ class ScheduledDataflow
public static function createFromArray(array $datas)
{
$lost = array_diff(static::KEYS, array_keys($datas));
- if (count($lost) > 0) {
+ if (\count($lost) > 0) {
throw new \LogicException('The first argument of '.__METHOD__.' must be contains: "'.implode(', ', $lost).'"');
}
$scheduledDataflow = new self();
- $scheduledDataflow->id = null === $datas['id'] ? null : (int) $datas['id'];
+ $scheduledDataflow->id = $datas['id'] === null ? null : (int) $datas['id'];
$scheduledDataflow->setLabel($datas['label']);
$scheduledDataflow->setDataflowType($datas['dataflow_type']);
$scheduledDataflow->setOptions($datas['options']);
$scheduledDataflow->setFrequency($datas['frequency']);
$scheduledDataflow->setNext($datas['next']);
- $scheduledDataflow->setEnabled(null === $datas['enabled'] ? null : (bool) $datas['enabled']);
+ $scheduledDataflow->setEnabled($datas['enabled'] === null ? null : (bool) $datas['enabled']);
return $scheduledDataflow;
}
@@ -80,7 +80,7 @@ public function toArray(): array
];
}
- public function setId(int $id): ScheduledDataflow
+ public function setId(int $id): self
{
$this->id = $id;
@@ -97,7 +97,7 @@ public function getLabel(): ?string
return $this->label;
}
- public function setLabel(?string $label): ScheduledDataflow
+ public function setLabel(?string $label): self
{
$this->label = $label;
@@ -109,7 +109,7 @@ public function getDataflowType(): ?string
return $this->dataflowType;
}
- public function setDataflowType(?string $dataflowType): ScheduledDataflow
+ public function setDataflowType(?string $dataflowType): self
{
$this->dataflowType = $dataflowType;
@@ -121,7 +121,7 @@ public function getOptions(): ?array
return $this->options;
}
- public function setOptions(?array $options): ScheduledDataflow
+ public function setOptions(?array $options): self
{
$this->options = $options;
@@ -133,7 +133,7 @@ public function getFrequency(): ?string
return $this->frequency;
}
- public function setFrequency(?string $frequency): ScheduledDataflow
+ public function setFrequency(?string $frequency): self
{
$this->frequency = $frequency;
@@ -145,7 +145,7 @@ public function getNext(): ?\DateTimeInterface
return $this->next;
}
- public function setNext(?\DateTimeInterface $next): ScheduledDataflow
+ public function setNext(?\DateTimeInterface $next): self
{
$this->next = $next;
@@ -157,7 +157,7 @@ public function getEnabled(): ?bool
return $this->enabled;
}
- public function setEnabled(?bool $enabled): ScheduledDataflow
+ public function setEnabled(?bool $enabled): self
{
$this->enabled = $enabled;
diff --git a/src/Exceptions/UnknownDataflowTypeException.php b/src/Exceptions/UnknownDataflowTypeException.php
index 7f34343..819c9f0 100644
--- a/src/Exceptions/UnknownDataflowTypeException.php
+++ b/src/Exceptions/UnknownDataflowTypeException.php
@@ -11,7 +11,7 @@ class UnknownDataflowTypeException extends \Exception
{
public static function create(string $aliasOrFqcn, array $knownDataflowTypes): self
{
- return new self(sprintf(
+ return new self(\sprintf(
'Unknown dataflow type FQCN or alias "%s". Registered dataflow types FQCN and aliases are %s.',
$aliasOrFqcn,
implode(', ', $knownDataflowTypes)
diff --git a/src/Factory/ConnectionFactory.php b/src/Factory/ConnectionFactory.php
index 0314388..04da339 100644
--- a/src/Factory/ConnectionFactory.php
+++ b/src/Factory/ConnectionFactory.php
@@ -24,6 +24,6 @@ public function setConnectionName(string $connectionName)
public function getConnection(): \Doctrine\DBAL\Connection
{
- return $this->container->get(sprintf('doctrine.dbal.%s_connection', $this->connectionName));
+ return $this->container->get(\sprintf('doctrine.dbal.%s_connection', $this->connectionName));
}
}
diff --git a/src/Logger/BufferHandler.php b/src/Logger/BufferHandler.php
index 9b8ceae..cc6bc0a 100644
--- a/src/Logger/BufferHandler.php
+++ b/src/Logger/BufferHandler.php
@@ -7,7 +7,6 @@
use Monolog\Formatter\FormatterInterface;
use Monolog\Formatter\LineFormatter;
use Monolog\Handler\AbstractProcessingHandler;
-use Monolog\Logger;
use Monolog\LogRecord;
class BufferHandler extends AbstractProcessingHandler
diff --git a/src/Logger/DelegatingLogger.php b/src/Logger/DelegatingLogger.php
index f4675e1..fc7a029 100644
--- a/src/Logger/DelegatingLogger.php
+++ b/src/Logger/DelegatingLogger.php
@@ -16,7 +16,7 @@ public function __construct(iterable $loggers)
{
foreach ($loggers as $logger) {
if (!$logger instanceof LoggerInterface) {
- throw new \InvalidArgumentException(sprintf('Only instances of %s should be passed to the constructor of %s. An instance of %s was passed instead.', LoggerInterface::class, self::class, $logger::class));
+ throw new \InvalidArgumentException(\sprintf('Only instances of %s should be passed to the constructor of %s. An instance of %s was passed instead.', LoggerInterface::class, self::class, $logger::class));
}
$this->loggers[] = $logger;
diff --git a/src/Manager/ScheduledDataflowManager.php b/src/Manager/ScheduledDataflowManager.php
index 90d6bd0..cbb0f23 100644
--- a/src/Manager/ScheduledDataflowManager.php
+++ b/src/Manager/ScheduledDataflowManager.php
@@ -19,15 +19,12 @@ public function __construct(private Connection $connection, private ScheduledDat
{
}
- /**
- * {@inheritdoc}
- */
public function createJobsFromScheduledDataflows(): void
{
$this->connection->beginTransaction();
try {
foreach ($this->scheduledDataflowRepository->findReadyToRun() as $scheduled) {
- if (null !== $this->jobRepository->findPendingForScheduledDataflow($scheduled)) {
+ if ($this->jobRepository->findPendingForScheduledDataflow($scheduled) !== null) {
continue;
}
diff --git a/src/Processor/JobProcessor.php b/src/Processor/JobProcessor.php
index b2266f9..5f3182b 100644
--- a/src/Processor/JobProcessor.php
+++ b/src/Processor/JobProcessor.php
@@ -4,6 +4,7 @@
namespace CodeRhapsodie\DataflowBundle\Processor;
+use CodeRhapsodie\DataflowBundle\DataflowType\RepositoryInterface;
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Event\Events;
@@ -30,6 +31,10 @@ public function process(Job $job): void
$this->beforeProcessing($job);
$dataflowType = $this->registry->getDataflowType($job->getDataflowType());
+ if ($dataflowType instanceof RepositoryInterface) {
+ $dataflowType->setRepository($this->repository);
+ }
+
$loggers = [new Logger('dataflow_internal', [$bufferHandler = new BufferHandler()])];
if (isset($this->logger)) {
$loggers[] = $this->logger;
@@ -40,7 +45,7 @@ public function process(Job $job): void
$dataflowType->setLogger($logger);
}
- $result = $dataflowType->process($job->getOptions());
+ $result = $dataflowType->process($job->getOptions(), $job->getId());
if (!$dataflowType instanceof LoggerAwareInterface) {
foreach ($result->getExceptions() as $index => $e) {
diff --git a/src/Registry/DataflowTypeRegistry.php b/src/Registry/DataflowTypeRegistry.php
index 4855f95..f2bd10a 100644
--- a/src/Registry/DataflowTypeRegistry.php
+++ b/src/Registry/DataflowTypeRegistry.php
@@ -18,9 +18,6 @@ class DataflowTypeRegistry implements DataflowTypeRegistryInterface
/** @var array|DataflowTypeInterface[] */
private array $aliasesRegistry = [];
- /**
- * {@inheritdoc}
- */
public function getDataflowType(string $fqcnOrAlias): DataflowTypeInterface
{
if (isset($this->fqcnRegistry[$fqcnOrAlias])) {
@@ -34,17 +31,11 @@ public function getDataflowType(string $fqcnOrAlias): DataflowTypeInterface
throw UnknownDataflowTypeException::create($fqcnOrAlias, [...array_keys($this->fqcnRegistry), ...array_keys($this->aliasesRegistry)]);
}
- /**
- * {@inheritdoc}
- */
public function listDataflowTypes(): iterable
{
return $this->fqcnRegistry;
}
- /**
- * {@inheritdoc}
- */
public function registerDataflowType(DataflowTypeInterface $dataflowType): void
{
$this->fqcnRegistry[$dataflowType::class] = $dataflowType;
diff --git a/src/Repository/InitFromDbTrait.php b/src/Repository/InitFromDbTrait.php
index 8581f77..5932d9d 100644
--- a/src/Repository/InitFromDbTrait.php
+++ b/src/Repository/InitFromDbTrait.php
@@ -14,7 +14,7 @@ abstract private function getFields(): array;
private function initDateTime(array $datas): array
{
foreach ($this->getFields() as $key => $type) {
- if ('datetime' === $type && null !== $datas[$key]) {
+ if ($type === 'datetime' && $datas[$key] !== null) {
$datas[$key] = new \DateTime($datas[$key]);
}
}
@@ -24,10 +24,10 @@ private function initDateTime(array $datas): array
private function initArray(array $datas): array
{
- if (!is_array($datas['options'])) {
+ if (!\is_array($datas['options'])) {
$datas['options'] = $this->strToArray($datas['options']);
}
- if (array_key_exists('exceptions', $datas) && !is_array($datas['exceptions'])) {
+ if (\array_key_exists('exceptions', $datas) && !\is_array($datas['exceptions'])) {
$datas['exceptions'] = $this->strToArray($datas['exceptions']);
}
@@ -36,12 +36,12 @@ private function initArray(array $datas): array
private function strToArray($value): array
{
- if (null === $value) {
+ if ($value === null) {
return [];
}
- $array = json_decode($value, true, 512, JSON_THROW_ON_ERROR);
+ $array = json_decode($value, true, 512, \JSON_THROW_ON_ERROR);
- return (false === $array) ? [] : $array;
+ return ($array === false) ? [] : $array;
}
}
diff --git a/src/Repository/JobRepository.php b/src/Repository/JobRepository.php
index 177644a..92d36a6 100644
--- a/src/Repository/JobRepository.php
+++ b/src/Repository/JobRepository.php
@@ -45,7 +45,7 @@ public function findOneshotDataflows(): iterable
->andWhere($qb->expr()->isNull('scheduled_dataflow_id'))
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, ParameterType::INTEGER)));
$stmt = $qb->executeQuery();
- if (0 === $stmt->rowCount()) {
+ if ($stmt->rowCount() === 0) {
return [];
}
while (false !== ($row = $stmt->fetchAssociative())) {
@@ -93,7 +93,7 @@ public function findLatests(): iterable
->orderBy('requested_date', 'DESC')
->setMaxResults(20);
$stmt = $qb->executeQuery();
- if (0 === $stmt->rowCount()) {
+ if ($stmt->rowCount() === 0) {
return [];
}
while (false !== ($row = $stmt->fetchAssociative())) {
@@ -108,7 +108,7 @@ public function findForScheduled(int $id): iterable
->orderBy('requested_date', 'DESC')
->setMaxResults(20);
$stmt = $qb->executeQuery();
- if (0 === $stmt->rowCount()) {
+ if ($stmt->rowCount() === 0) {
return [];
}
while (false !== ($row = $stmt->fetchAssociative())) {
@@ -121,14 +121,14 @@ public function save(Job $job)
$datas = $job->toArray();
unset($datas['id']);
- if (is_array($datas['options'])) {
- $datas['options'] = json_encode($datas['options'], JSON_THROW_ON_ERROR);
+ if (\is_array($datas['options'])) {
+ $datas['options'] = json_encode($datas['options'], \JSON_THROW_ON_ERROR);
}
- if (is_array($datas['exceptions'])) {
- $datas['exceptions'] = json_encode($datas['exceptions'], JSON_THROW_ON_ERROR);
+ if (\is_array($datas['exceptions'])) {
+ $datas['exceptions'] = json_encode($datas['exceptions'], \JSON_THROW_ON_ERROR);
}
- if (null === $job->getId()) {
+ if ($job->getId() === null) {
$this->connection->insert(static::TABLE_NAME, $datas, $this->getFields());
$job->setId((int) $this->connection->lastInsertId());
@@ -137,6 +137,11 @@ public function save(Job $job)
$this->connection->update(static::TABLE_NAME, $datas, ['id' => $job->getId()], $this->getFields());
}
+ public function updateCount(int $jobId, int $count): void
+ {
+ $this->connection->update(static::TABLE_NAME, ['count' => $count], ['id' => $jobId]);
+ }
+
public function createQueryBuilder($alias = null): QueryBuilder
{
$qb = $this->connection->createQueryBuilder();
@@ -149,7 +154,7 @@ public function createQueryBuilder($alias = null): QueryBuilder
private function returnFirstOrNull(QueryBuilder $qb): ?Job
{
$stmt = $qb->executeQuery();
- if (0 === $stmt->rowCount()) {
+ if ($stmt->rowCount() === 0) {
return null;
}
diff --git a/src/Repository/ScheduledDataflowRepository.php b/src/Repository/ScheduledDataflowRepository.php
index 219ae0f..dd9d1e3 100644
--- a/src/Repository/ScheduledDataflowRepository.php
+++ b/src/Repository/ScheduledDataflowRepository.php
@@ -41,7 +41,7 @@ public function findReadyToRun(): iterable
;
$stmt = $qb->executeQuery();
- if (0 === $stmt->rowCount()) {
+ if ($stmt->rowCount() === 0) {
return [];
}
while (false !== ($row = $stmt->fetchAssociative())) {
@@ -65,7 +65,7 @@ public function findAllOrderedByLabel(): iterable
$qb->orderBy('label', 'ASC');
$stmt = $qb->executeQuery();
- if (0 === $stmt->rowCount()) {
+ if ($stmt->rowCount() === 0) {
return [];
}
while (false !== ($row = $stmt->fetchAssociative())) {
@@ -90,11 +90,11 @@ public function save(ScheduledDataflow $scheduledDataflow)
$datas = $scheduledDataflow->toArray();
unset($datas['id']);
- if (is_array($datas['options'])) {
- $datas['options'] = json_encode($datas['options'], JSON_THROW_ON_ERROR);
+ if (\is_array($datas['options'])) {
+ $datas['options'] = json_encode($datas['options'], \JSON_THROW_ON_ERROR);
}
- if (null === $scheduledDataflow->getId()) {
+ if ($scheduledDataflow->getId() === null) {
$this->connection->insert(static::TABLE_NAME, $datas, $this->getFields());
$scheduledDataflow->setId((int) $this->connection->lastInsertId());
@@ -129,7 +129,7 @@ public function createQueryBuilder($alias = null): QueryBuilder
private function returnFirstOrNull(QueryBuilder $qb): ?ScheduledDataflow
{
$stmt = $qb->executeQuery();
- if (0 === $stmt->rowCount()) {
+ if ($stmt->rowCount() === 0) {
return null;
}
diff --git a/src/Resources/config/services.yaml b/src/Resources/config/services.yaml
index eba8e1e..3233846 100644
--- a/src/Resources/config/services.yaml
+++ b/src/Resources/config/services.yaml
@@ -23,6 +23,7 @@ services:
arguments:
$registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface'
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
+ $jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
tags: ['console.command']
CodeRhapsodie\DataflowBundle\Command\JobShowCommand:
diff --git a/src/Runner/PendingDataflowRunner.php b/src/Runner/PendingDataflowRunner.php
index 3c13b73..36ff37e 100644
--- a/src/Runner/PendingDataflowRunner.php
+++ b/src/Runner/PendingDataflowRunner.php
@@ -13,9 +13,6 @@ public function __construct(private JobRepository $repository, private JobProces
{
}
- /**
- * {@inheritdoc}
- */
public function runPendingDataflows(): void
{
while (null !== ($job = $this->repository->findNextPendingDataflow())) {
diff --git a/src/Validator/Constraints/FrequencyValidator.php b/src/Validator/Constraints/FrequencyValidator.php
index b923b68..4a03814 100644
--- a/src/Validator/Constraints/FrequencyValidator.php
+++ b/src/Validator/Constraints/FrequencyValidator.php
@@ -10,22 +10,19 @@
class FrequencyValidator extends ConstraintValidator
{
- /**
- * {@inheritdoc}
- */
public function validate(mixed $value, Constraint $constraint)
{
if (!$constraint instanceof Frequency) {
throw new UnexpectedTypeException($constraint, Frequency::class);
}
- if (null === $value) {
+ if ($value === null) {
return;
}
try {
$interval = \DateInterval::createFromDateString($value);
- } catch (\Exception){
+ } catch (\Exception) {
$interval = false;
}