Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
3e36038
Update dependency pymdown-extensions to v10.18
renovate[bot] Dec 7, 2025
f95b013
Update dependency pymdown-extensions to v10.19
renovate[bot] Dec 11, 2025
b671bbb
Add QueryBus integration test which was missing, also add "full cycle…
DanielBadura Dec 12, 2025
ccfb812
Merge pull request #799 from patchlevel/add-querybus-and-bench-test
DanielBadura Dec 12, 2025
6705bc4
Update dependency pymdown-extensions to v10.19.1
renovate[bot] Dec 14, 2025
36d8d3f
Lock file maintenance
renovate[bot] Dec 16, 2025
e488b56
Update dependency mkdocs-material to v9.7.1
renovate[bot] Dec 18, 2025
024ba26
Lock file maintenance
renovate[bot] Dec 20, 2025
2452727
Lock file maintenance
renovate[bot] Dec 22, 2025
1e0365c
Lock file maintenance
renovate[bot] Dec 23, 2025
c0aed5d
Lock file maintenance
renovate[bot] Dec 25, 2025
d69b115
Update dependency infection/infection to ^0.32.0
renovate[bot] Dec 25, 2025
62d0aea
Lock file maintenance
renovate[bot] Dec 26, 2025
46126cb
Lock file maintenance
renovate[bot] Dec 27, 2025
a935088
Add `StoreMigrateCommand` from integration repos to ease maintainance
DanielBadura Dec 29, 2025
d2e80f9
Merge pull request #801 from patchlevel/add-store-migration-command
DanielBadura Dec 29, 2025
6dc8a9f
Lock file maintenance
renovate[bot] Dec 30, 2025
fd74b08
Lock file maintenance
renovate[bot] Dec 31, 2025
705afc3
Update dependency pymdown-extensions to v10.20
renovate[bot] Dec 31, 2025
f37304a
Lock file maintenance
renovate[bot] Jan 2, 2026
7fab24b
Lock file maintenance
renovate[bot] Jan 3, 2026
183a44c
Lock file maintenance
renovate[bot] Jan 4, 2026
9717148
Lock file maintenance
renovate[bot] Jan 6, 2026
33a5134
Deprecate `SubscriberHelper` and `SubscriberUtil` and update docs to …
DanielBadura Jan 6, 2026
a18515f
Merge pull request #802 from patchlevel/deprecate-subscriber-util-and…
DanielBadura Jan 6, 2026
71cb083
Lock file maintenance
renovate[bot] Jan 7, 2026
2153d2d
Merge remote-tracking branch 'origin/4.0.x' into 3.15.x-merge-up-into…
DanielBadura Jan 8, 2026
bf535b9
Use StreamStore and message loader, fix cs
DanielBadura Jan 8, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
"require-dev": {
"ext-pdo_sqlite": "~8.2.0 || ~8.3.0 || ~8.4.0 || ~8.5.0",
"doctrine/orm": "^2.18.0 || ^3.0.0",
"infection/infection": "^0.31.9",
"infection/infection": "^0.32.0",
"league/commonmark": "^2.6.1",
"patchlevel/coding-standard": "^1.3.0",
"patchlevel/event-sourcing-phpstan-extension": "dev-event-sourcing-4.0",
Expand Down
497 changes: 288 additions & 209 deletions composer.lock

Large diffs are not rendered by default.

23 changes: 9 additions & 14 deletions docs/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,12 @@ use Patchlevel\EventSourcing\Attribute\Projector;
use Patchlevel\EventSourcing\Attribute\Setup;
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\Attribute\Teardown;
use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberUtil;

#[Projector('hotel')]
#[Projector(self::TABLE)]
final class HotelProjector
{
use SubscriberUtil;
// use a const for easier access in the projector & to keep projector id and table name in sync
private const TABLE = 'hotel';

public function __construct(
private readonly Connection $db,
Expand All @@ -178,14 +178,14 @@ final class HotelProjector
/** @return list<array{id: string, name: string, guests: int}> */
public function getHotels(): array
{
return $this->db->fetchAllAssociative("SELECT id, name, guests FROM {$this->table()};");
return $this->db->fetchAllAssociative(sprintf('SELECT id, name, guests FROM %s;'), self::TABLE);
}

#[Subscribe(HotelCreated::class)]
public function handleHotelCreated(HotelCreated $event): void
{
$this->db->insert(
$this->table(),
self::TABLE,
[
'id' => $event->hotelId->toString(),
'name' => $event->hotelName,
Expand All @@ -198,7 +198,7 @@ final class HotelProjector
public function handleGuestIsCheckedIn(GuestIsCheckedIn $event): void
{
$this->db->executeStatement(
"UPDATE {$this->table()} SET guests = guests + 1 WHERE id = ?;",
sprintf('UPDATE %s SET guests = guests + 1 WHERE id = ?;', self::TABLE),
[$event->hotelId->toString()],
);
}
Expand All @@ -207,26 +207,21 @@ final class HotelProjector
public function handleGuestIsCheckedOut(GuestIsCheckedOut $event): void
{
$this->db->executeStatement(
"UPDATE {$this->table()} SET guests = guests - 1 WHERE id = ?;",
sprintf('UPDATE %s SET guests = guests - 1 WHERE id = ?;', self::TABLE),
[$event->hotelId->toString()],
);
}

#[Setup]
public function create(): void
{
$this->db->executeStatement("CREATE TABLE IF NOT EXISTS {$this->table()} (id VARCHAR PRIMARY KEY, name VARCHAR, guests INTEGER);");
$this->db->executeStatement(sprintf('CREATE TABLE IF NOT EXISTS %s (id VARCHAR PRIMARY KEY, name VARCHAR, guests INTEGER);', self::TABLE));
}

#[Teardown]
public function drop(): void
{
$this->db->executeStatement("DROP TABLE IF EXISTS {$this->table()};");
}

private function table(): string
{
return 'projection_' . $this->subscriberId();
$this->db->executeStatement(sprintf('DROP TABLE IF EXISTS %s;', self::TABLE));
}
}
```
Expand Down
19 changes: 5 additions & 14 deletions docs/subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,6 @@ use Patchlevel\EventSourcing\Subscription\Lookup;
#[Projector('public_profile')]
final class PublicProfileProjection
{
use SubscriberUtil;

// ... constructor

#[Subscribe(Published::class)]
Expand Down Expand Up @@ -310,32 +308,26 @@ use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Attribute\Projector;
use Patchlevel\EventSourcing\Attribute\Setup;
use Patchlevel\EventSourcing\Attribute\Teardown;
use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberUtil;

#[Projector('profile_1')]
#[Projector(self::TABLE)]
final class ProfileProjector
{
use SubscriberUtil;
private const TABLE = 'profile_v1';

private Connection $connection;

#[Setup]
public function create(): void
{
$this->connection->executeStatement(
"CREATE TABLE IF NOT EXISTS {$this->table()} (id VARCHAR PRIMARY KEY, name VARCHAR NOT NULL);",
sprintf('CREATE TABLE IF NOT EXISTS %s (id VARCHAR PRIMARY KEY, name VARCHAR NOT NULL);', self::TABLE),
);
}

#[Teardown]
public function drop(): void
{
$this->connection->executeStatement("DROP TABLE IF EXISTS {$this->table()};");
}

private function table(): string
{
return 'projection_' . $this->subscriberId();
$this->connection->executeStatement(sprintf('DROP TABLE IF EXISTS %s;', self::TABLE));
}
}
```
Expand All @@ -345,12 +337,11 @@ MySQL and MariaDB don't support transactions for DDL statements.
So you must use a different database connection in your projectors,
otherwise you will get an error when the subscription tries to create the table.
:::

:::warning
If you change the subscriber id, you must also change the table/collection name.
The subscription engine will create a new subscription with the new subscriber id.
That means the setup method will be called again and the table/collection will conflict with the old existing projection.
You can use the `SubscriberUtil` to build the table/collection name.
:::

:::note
Expand Down
66 changes: 54 additions & 12 deletions phpstan-baseline.neon
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,6 @@ parameters:
count: 1
path: src/Message/Serializer/DefaultHeadersSerializer.php

-
message: '#^Call to an undefined method Patchlevel\\EventSourcing\\Store\\Store\:\:archive\(\)\.$#'
identifier: method.notFound
count: 1
path: src/Repository/DefaultRepository.php

-
message: '#^Property Patchlevel\\EventSourcing\\Serializer\\Normalizer\\IdNormalizer\:\:\$identifierClass \(class\-string\<Patchlevel\\EventSourcing\\Identifier\\Identifier\>\|null\) does not accept string\.$#'
identifier: assign.propertyType
Expand Down Expand Up @@ -168,6 +162,24 @@ parameters:
count: 3
path: src/Subscription/ThrowableToErrorContextTransformer.php

-
message: '#^Cannot access offset ''name'' on array\<string, mixed\>\|false\.$#'
identifier: offsetAccess.nonOffsetAccessible
count: 1
path: tests/Benchmark/BasicImplementation/Projection/ProfileProjector.php

-
message: '#^Method Patchlevel\\EventSourcing\\Tests\\Benchmark\\BasicImplementation\\Projection\\ProfileProjector\:\:getProfileName\(\) should return string but returns mixed\.$#'
identifier: return.type
count: 1
path: tests/Benchmark/BasicImplementation/Projection/ProfileProjector.php

-
message: '#^Parameter \#1 \$messageLoader of class Patchlevel\\EventSourcing\\Subscription\\Engine\\DefaultSubscriptionEngine constructor expects Patchlevel\\EventSourcing\\Subscription\\Engine\\MessageLoader, Patchlevel\\EventSourcing\\Store\\StreamDoctrineDbalStore given\.$#'
identifier: argument.type
count: 1
path: tests/Benchmark/CommandToQueryBench.php

-
message: '#^Call to static method PHPUnit\\Framework\\Assert\:\:assertInstanceOf\(\) with ''Patchlevel\\\\EventSourcing\\\\Tests\\\\Integration\\\\BankAccountSplitStream\\\\BankAccount'' and Patchlevel\\EventSourcing\\Tests\\Integration\\BankAccountSplitStream\\BankAccount will always evaluate to true\.$#'
identifier: staticMethod.alreadyNarrowedType
Expand Down Expand Up @@ -216,6 +228,42 @@ parameters:
count: 1
path: tests/Integration/BasicImplementation/BasicIntegrationTest.php

-
message: '#^Instantiated class Patchlevel\\EventSourcing\\Tests\\Integration\\BasicImplementation\\DoctrineDbalStore not found\.$#'
identifier: class.notFound
count: 1
path: tests/Integration/BasicImplementation/BasicIntegrationTest.php

-
message: '#^Parameter \#1 \$messageLoader of class Patchlevel\\EventSourcing\\Subscription\\Engine\\DefaultSubscriptionEngine constructor expects Patchlevel\\EventSourcing\\Subscription\\Engine\\MessageLoader, Patchlevel\\EventSourcing\\Tests\\Integration\\BasicImplementation\\DoctrineDbalStore given\.$#'
identifier: argument.type
count: 1
path: tests/Integration/BasicImplementation/BasicIntegrationTest.php

-
message: '#^Parameter \#2 \$schemaConfigurator of class Patchlevel\\EventSourcing\\Schema\\DoctrineSchemaDirector constructor expects Patchlevel\\EventSourcing\\Schema\\DoctrineSchemaConfigurator, Patchlevel\\EventSourcing\\Tests\\Integration\\BasicImplementation\\DoctrineDbalStore given\.$#'
identifier: argument.type
count: 1
path: tests/Integration/BasicImplementation/BasicIntegrationTest.php

-
message: '#^Parameter \#2 \$store of class Patchlevel\\EventSourcing\\Repository\\DefaultRepositoryManager constructor expects Patchlevel\\EventSourcing\\Store\\Store, Patchlevel\\EventSourcing\\Tests\\Integration\\BasicImplementation\\DoctrineDbalStore given\.$#'
identifier: argument.type
count: 1
path: tests/Integration/BasicImplementation/BasicIntegrationTest.php

-
message: '#^Cannot access offset ''name'' on array\<string, mixed\>\|false\.$#'
identifier: offsetAccess.nonOffsetAccessible
count: 1
path: tests/Integration/BasicImplementation/Projection/ProfileProjector.php

-
message: '#^Method Patchlevel\\EventSourcing\\Tests\\Integration\\BasicImplementation\\Projection\\ProfileProjector\:\:getProfileName\(\) should return string but returns mixed\.$#'
identifier: return.type
count: 1
path: tests/Integration/BasicImplementation/Projection/ProfileProjector.php

-
message: '#^Call to static method PHPUnit\\Framework\\Assert\:\:assertInstanceOf\(\) with ''Patchlevel\\\\EventSourcing\\\\Tests\\\\Integration\\\\MicroAggregate\\\\Profile'' and Patchlevel\\EventSourcing\\Tests\\Integration\\MicroAggregate\\Profile will always evaluate to true\.$#'
identifier: staticMethod.alreadyNarrowedType
Expand Down Expand Up @@ -276,12 +324,6 @@ parameters:
count: 1
path: tests/Integration/Subscription/SubscriptionTest.php

-
message: '#^Cannot use array destructuring on list\<mixed\>\|null\.$#'
identifier: offsetAccess.nonArray
count: 1
path: tests/ReturnCallback.php

-
message: '#^Call to static method PHPUnit\\Framework\\Assert\:\:assertInstanceOf\(\) with ''Patchlevel\\\\EventSourcing\\\\Metadata\\\\AggregateRoot\\\\AggregateRootMetadata'' and Patchlevel\\EventSourcing\\Metadata\\AggregateRoot\\AggregateRootMetadata\<Patchlevel\\EventSourcing\\Aggregate\\BasicAggregateRoot\> will always evaluate to true\.$#'
identifier: staticMethod.alreadyNarrowedType
Expand Down
88 changes: 88 additions & 0 deletions src/Console/Command/StoreMigrateCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Console\Command;

use Patchlevel\EventSourcing\Console\InputHelper;
use Patchlevel\EventSourcing\Console\OutputStyle;
use Patchlevel\EventSourcing\Message\Pipe;
use Patchlevel\EventSourcing\Message\Translator\Translator;
use Patchlevel\EventSourcing\Store\Store;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

use function count;

#[AsCommand(
'event-sourcing:store:migrate',
'migrate events from one store to another',
)]
final class StoreMigrateCommand extends Command
{
/** @param iterable<int, Translator> $translators */
public function __construct(
private readonly Store $store,
private readonly Store $newStore,
private readonly iterable $translators = [],
) {
parent::__construct();
}

protected function configure(): void
{
$this
->addOption(
'buffer',
null,
InputOption::VALUE_REQUIRED,
'How many messages should be buffered',
1_000,
);
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$buffer = InputHelper::positiveIntOrZero($input->getOption('buffer'));
$style = new OutputStyle($input, $output);

$style->info('Migration initialization...');

$count = $this->store->count();
$messages = $this->store->load();

$style->progressStart($count);

$bufferedMessages = [];

$pipe = new Pipe(
$messages,
...$this->translators,
);

foreach ($pipe as $message) {
$bufferedMessages[] = $message;

if (count($bufferedMessages) < $buffer) {
continue;
}

$this->newStore->save(...$bufferedMessages);
$bufferedMessages = [];
$style->progressAdvance($buffer);
}

if (count($bufferedMessages) !== 0) {
$this->newStore->save(...$bufferedMessages);
$style->progressAdvance(count($bufferedMessages));
}

$style->progressFinish();
$style->success('Migration finished');

return 0;
}
}
14 changes: 7 additions & 7 deletions src/Subscription/Engine/SubscriptionManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -77,48 +77,48 @@ public function find(SubscriptionCriteria $criteria): array
public function add(Subscription ...$subscriptions): void
{
foreach ($subscriptions as $sub) {
$this->forAdd->attach($sub);
$this->forAdd->offsetSet($sub);
}
}

public function update(Subscription ...$subscriptions): void
{
foreach ($subscriptions as $sub) {
$this->forUpdate->attach($sub);
$this->forUpdate->offsetSet($sub);
}
}

public function remove(Subscription ...$subscriptions): void
{
foreach ($subscriptions as $sub) {
$this->forRemove->attach($sub);
$this->forRemove->offsetSet($sub);
}
}

public function flush(): void
{
foreach ($this->forAdd as $subscription) {
if ($this->forRemove->contains($subscription)) {
if ($this->forRemove->offsetExists($subscription)) {
continue;
}

$this->subscriptionStore->add($subscription);
}

foreach ($this->forUpdate as $subscription) {
if ($this->forAdd->contains($subscription)) {
if ($this->forAdd->offsetExists($subscription)) {
continue;
}

if ($this->forRemove->contains($subscription)) {
if ($this->forRemove->offsetExists($subscription)) {
continue;
}

$this->subscriptionStore->update($subscription);
}

foreach ($this->forRemove as $subscription) {
if ($this->forAdd->contains($subscription)) {
if ($this->forAdd->offsetExists($subscription)) {
continue;
}

Expand Down
1 change: 1 addition & 0 deletions src/Subscription/Subscriber/SubscriberHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Patchlevel\EventSourcing\Metadata\Subscriber\SubscriberMetadata;
use Patchlevel\EventSourcing\Metadata\Subscriber\SubscriberMetadataFactory;

/** @deprecated since 3.15.0 will be removed with 4.0.0 */
final class SubscriberHelper
{
public function __construct(
Expand Down
1 change: 1 addition & 0 deletions src/Subscription/Subscriber/SubscriberUtil.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Patchlevel\EventSourcing\Metadata\Subscriber\AttributeSubscriberMetadataFactory;
use Patchlevel\EventSourcing\Metadata\Subscriber\SubscriberMetadataFactory;

/** @deprecated since 3.15.0 will be removed with 4.0.0 */
trait SubscriberUtil
{
private static SubscriberMetadataFactory|null $metadataFactory = null;
Expand Down
Loading
Loading