Subscriptions
One core concept of event sourcing is the ability to react to events and process them in different ways. This is where subscriptions and the subscription engine come into play.
There are different types of subscriptions. In most cases, these are projectors and processors, but you can use subscriptions for anything else, such as migrations or reports.
For this, we use the event store to get the events and process them. The event store remains untouched and everything can always be reproduced from the events.
The subscription engine manages individual subscribers and keeps the subscriptions running. Internally, the subscription engine does this by tracking where each subscriber is in the event stream.
Subscriber
If you want to react to events, you have to create a subscriber. Each subscriber needs a unique ID and a run mode.
use Patchlevel\EventSourcing\Attribute\Subscriber;
use Patchlevel\EventSourcing\Subscription\RunMode;
#[Subscriber('do_stuff', RunMode::Once)]
final class DoStuffSubscriber
{
}
Note
For each subscriber ID, the engine will create a subscription. If the subscriber ID changes, a new subscription will be created. In some cases, such as projections, you want to change the subscriber ID to rebuild the projection.
Tip
You can use specific attributes for specific subscribers like Projector or Processor, so you don't have to define the group and run mode every time.
Projector
You can create projections and read models with a subscriber.
We call this type of subscriber a projector, but in the end it is a regular subscriber.
use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Attribute\Subscriber;
use Patchlevel\EventSourcing\Subscription\RunMode;
#[Subscriber('profile_1', RunMode::FromBeginning)]
final class ProfileProjector
{
public function __construct(
private readonly Connection $connection,
) {
}
}
It is also possible to use the Projector attribute.
It extends the Subscriber attribute with a default group and run mode.
use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Attribute\Projector;
#[Projector('profile_1')]
final class ProfileProjector
{
public function __construct(
private readonly Connection $connection,
) {
}
}
Warning
MySQL and MariaDB don't support transactions for DDL statements. So you must use a different database connection for your subscriptions.
Tip
Add a version as a suffix to the subscriber ID so you can increment it when the subscription changes, for example from profile_1 to profile_2.
Processor
The other way to react to events is to take actions such as sending an email, dispatching commands or changing other aggregates.
We call this type of subscriber a processor.
use Patchlevel\EventSourcing\Attribute\Subscriber;
use Patchlevel\EventSourcing\Subscription\RunMode;
#[Subscriber('welcome_email', RunMode::FromNow)]
final class WelcomeEmailProcessor
{
public function __construct(
private readonly Mailer $mailer,
) {
}
}
It is also possible to use the Processor attribute.
It extends the Subscriber attribute with a default group and run mode.
use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Attribute\Processor;
#[Processor('welcome_email')]
final class WelcomeEmailProcessor
{
public function __construct(
private readonly Connection $connection,
) {
}
}
Subscribe
A subscriber (projector/processor) can subscribe to any number of events.
To define which method is responsible for which event, you need the Subscribe attribute.
There you pass the event class the method should react to.
The method typically expects a Message, which then contains the event.
The method name itself doesn't matter.
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\Attribute\Subscriber;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Subscription\RunMode;
#[Subscriber('do_stuff', RunMode::Once)]
final class DoStuffSubscriber
{
#[Subscribe(ProfileCreated::class)]
public function onProfileCreated(Message $message): void
{
$profileCreated = $message->event();
// do something
}
}
Tip
If you are using Psalm, you can install the event sourcing plugin so that the event method returns the correct type.
Subscribe all events
If you want to subscribe to all events, you can pass * or Subscribe::ALL instead of the event class.
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\Message\Message;
final class WelcomeSubscriber
{
#[Subscribe('*')]
public function onProfileCreated(Message $message): void
{
echo 'Welcome!';
}
}
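As a rough illustration of the routing described above, the following sketch models the subscribe configuration as a plain map. It is not the library's implementation: the function `methodsFor`, the map layout and the method name `onAnyEvent` are assumptions made for this example, and calling the specific handlers before the wildcard handlers is an arbitrary choice.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical lookup: collect the methods registered for the event class
 * plus the methods registered for the wildcard '*'.
 *
 * @param array<string, list<string>> $subscribeMap event class (or '*') => method names
 *
 * @return list<string>
 */
function methodsFor(array $subscribeMap, string $eventClass): array
{
    return array_merge(
        $subscribeMap[$eventClass] ?? [],
        $subscribeMap['*'] ?? [],
    );
}

$map = [
    'ProfileCreated' => ['onProfileCreated'],
    '*' => ['onAnyEvent'],
];

$forProfileCreated = methodsFor($map, 'ProfileCreated'); // ['onProfileCreated', 'onAnyEvent']
$forNameChanged = methodsFor($map, 'NameChanged');       // ['onAnyEvent']
```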
Argument Resolver
The library analyses the method signature and tries to resolve the arguments. The order of the arguments doesn't matter, and you can use multiple arguments and mix them.
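To make the idea of signature-based resolution more concrete, here is a minimal sketch using PHP reflection. It only shows the general technique of matching parameter types against available values. The function `resolveArguments` and the stand-in classes are hypothetical and do not reflect how the library implements its resolvers.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-ins; the real library ships its own classes and resolvers.
final class ProfileCreated
{
    public function __construct(public readonly string $name)
    {
    }
}

final class DoStuffSubscriber
{
    // The parameter order is deliberately "unusual" to show that it doesn't matter.
    public function onProfileCreated(DateTimeImmutable $recordedOn, ProfileCreated $event): string
    {
        return $event->name . ' @ ' . $recordedOn->format('Y-m-d');
    }
}

/**
 * Build the argument list by matching each parameter type against the available values.
 *
 * @param list<object> $candidates
 *
 * @return list<object>
 */
function resolveArguments(ReflectionMethod $method, array $candidates): array
{
    $arguments = [];

    foreach ($method->getParameters() as $parameter) {
        $type = $parameter->getType();

        if (!$type instanceof ReflectionNamedType) {
            throw new LogicException('Only single named types are supported in this sketch.');
        }

        foreach ($candidates as $candidate) {
            if (is_a($candidate, $type->getName())) {
                $arguments[] = $candidate;

                continue 2; // parameter resolved, go on with the next one
            }
        }

        throw new LogicException('Cannot resolve parameter $' . $parameter->getName());
    }

    return $arguments;
}

$subscriber = new DoStuffSubscriber();
$method = new ReflectionMethod($subscriber, 'onProfileCreated');

$arguments = resolveArguments($method, [
    new ProfileCreated('Alice'),
    new DateTimeImmutable('2024-01-01'),
]);

$result = $method->invokeArgs($subscriber, $arguments); // 'Alice @ 2024-01-01'
```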
Message Resolver
The message resolver resolves the Message object.
It looks for a parameter with the type Message.
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\Attribute\Subscriber;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Subscription\RunMode;
#[Subscriber('do_stuff', RunMode::Once)]
final class DoStuffSubscriber
{
#[Subscribe(ProfileCreated::class)]
public function onProfileCreated(Message $message): void
{
// do something
}
}
Event Resolver
The event resolver resolves the event object. It looks for a parameter with the type of the event.
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\Attribute\Subscriber;
use Patchlevel\EventSourcing\Subscription\RunMode;
#[Subscriber('do_stuff', RunMode::Once)]
final class DoStuffSubscriber
{
#[Subscribe(ProfileCreated::class)]
public function onProfileCreated(ProfileCreated $profileCreated): void
{
// do something
}
}
Aggregate Id Resolver
The aggregate id resolver resolves the aggregate id.
It looks for a parameter whose type is an implementation of AggregateRootId.
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\Attribute\Subscriber;
use Patchlevel\EventSourcing\Subscription\RunMode;
#[Subscriber('do_stuff', RunMode::Once)]
final class DoStuffSubscriber
{
#[Subscribe(ProfileCreated::class)]
public function onProfileCreated(ProfileId $profileId): void
{
// do something
}
}
Warning
The argument resolver doesn't know whether you're using the correct aggregate id class and doesn't check it.
It gets the aggregate ID as a string, takes the class from the parameter type and instantiates it with the fromString method.
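The following sketch illustrates why this warning matters. It assumes two hypothetical ID classes, `ProfileId` and `OrderId`, and a hypothetical helper `resolveAggregateId` that only mimics the described behaviour (take the class, call `fromString`). Because nothing verifies that the class belongs to the aggregate, a wrong type hint still produces an object.

```php
<?php

declare(strict_types=1);

// Hypothetical ID classes for illustration; both simply wrap a string.
final class ProfileId
{
    private function __construct(public readonly string $value)
    {
    }

    public static function fromString(string $value): self
    {
        return new self($value);
    }
}

final class OrderId
{
    private function __construct(public readonly string $value)
    {
    }

    public static function fromString(string $value): self
    {
        return new self($value);
    }
}

/**
 * Mimics the described behaviour: instantiate whatever class the parameter names.
 *
 * @param class-string $idClass
 */
function resolveAggregateId(string $idClass, string $rawAggregateId): object
{
    return $idClass::fromString($rawAggregateId);
}

// Correct type hint: a ProfileId for a profile aggregate.
$profileId = resolveAggregateId(ProfileId::class, 'profile-1');

// Wrong type hint: this still "works" and silently yields an OrderId holding a profile ID.
$wrongId = resolveAggregateId(OrderId::class, 'profile-1');
```

In other words, choosing the matching ID class in the method signature is your responsibility.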
Recorded On Resolver
The recorded on resolver resolves the recorded on date.
It looks for a parameter of type DateTimeImmutable.
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\Attribute\Subscriber;
use Patchlevel\EventSourcing\Subscription\RunMode;
#[Subscriber('do_stuff', RunMode::Once)]
final class DoStuffSubscriber
{
#[Subscribe(ProfileCreated::class)]
public function onProfileCreated(DateTimeImmutable $recordedOn): void
{
// do something
}
}
Setup and Teardown
Subscribers can have one setup and one teardown method, which are executed when the subscription is created or deleted.
For this there are the attributes Setup and Teardown. The method name itself doesn't matter.
This is especially helpful for projectors, as they can create the necessary structures for the projection here.
use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Attribute\Projector;
use Patchlevel\EventSourcing\Attribute\Setup;
use Patchlevel\EventSourcing\Attribute\Teardown;
use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberUtil;
#[Projector('profile_1')]
final class ProfileProjector
{
use SubscriberUtil;
public function __construct(
private readonly Connection $connection,
) {
}
#[Setup]
public function create(): void
{
$this->connection->executeStatement(
"CREATE TABLE IF NOT EXISTS {$this->table()} (id VARCHAR PRIMARY KEY, name VARCHAR NOT NULL);",
);
}
#[Teardown]
public function drop(): void
{
$this->connection->executeStatement("DROP TABLE IF EXISTS {$this->table()};");
}
private function table(): string
{
return 'projection_' . $this->subscriberId();
}
}
Danger
MySQL and MariaDB don't support transactions for DDL statements. So you must use a different database connection in your projectors, otherwise you will get an error when the subscription tries to create the table.
Warning
If you change the subscriber id, you must also change the table/collection name.
The subscription engine will create a new subscription with the new subscriber id.
That means the setup method will be called again and the table/collection will conflict with the old existing projection.
You can use the SubscriberUtil trait to build the table/collection name.
Note
Most databases have a limit on the length of the table/collection name. The limit is often around 64 characters, for example 64 characters in MySQL and 63 bytes in PostgreSQL.
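If you derive the table name from the subscriber ID, a small guard can catch overly long names early. This is only a sketch: the function `projectionTable` and the constant `MAX_IDENTIFIER_LENGTH` are hypothetical, and the value 64 is an assumed limit that you should replace with the limit of your database.

```php
<?php

declare(strict_types=1);

// Assumed limit for this example; check the documentation of your database.
const MAX_IDENTIFIER_LENGTH = 64;

/**
 * Build the projection table name from the subscriber ID and
 * fail fast if the result would exceed the assumed limit.
 */
function projectionTable(string $subscriberId): string
{
    $table = 'projection_' . $subscriberId;

    if (strlen($table) > MAX_IDENTIFIER_LENGTH) {
        throw new InvalidArgumentException(sprintf(
            'Table name "%s" is longer than %d characters.',
            $table,
            MAX_IDENTIFIER_LENGTH,
        ));
    }

    return $table;
}

$table = projectionTable('profile_1'); // 'projection_profile_1'
```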
Versioning
As soon as the structure of a projection changes, or you need other events from the past, you can change the subscriber ID to rebuild the projection. This will trigger the subscription engine to create a new subscription and boot the projection from the beginning.
use Patchlevel\EventSourcing\Attribute\Projector;
#[Projector('profile_2')]
final class ProfileSubscriber
{
// ...
}
Warning
If you change the subscriber ID, you must also change the table/collection name.
Otherwise the table/collection will conflict with the old subscription.
Tip
Add a version as a suffix to the subscriber ID so you can increment it when the subscription changes, for example from profile_1 to profile_2.
Grouping
You can also group subscribers together and filter them in the subscription engine. This is useful if you want to run subscribers in different processes or on different servers.
use Patchlevel\EventSourcing\Attribute\Subscriber;
use Patchlevel\EventSourcing\Subscription\RunMode;
#[Subscriber('profile_1', runMode: RunMode::Once, group: 'a')]
final class ProfileSubscriber
{
// ...
}
Note
The attributes have different default groups:
- Subscriber: default
- Projector: projector
- Processor: processor
Run Mode
The run mode determines how the subscriber should behave. There are three different modes:
From Beginning
The subscriber will start from the beginning of the event stream and process all events. This is useful for subscribers that need to build up a projection from scratch.
use Patchlevel\EventSourcing\Attribute\Subscriber;
use Patchlevel\EventSourcing\Subscription\RunMode;
#[Subscriber('welcome_email', RunMode::FromBeginning)]
final class WelcomeEmailSubscriber
{
// ...
}
Tip
If you want to create projections and run from the beginning, you can use the Projector attribute.
From Now
Certain subscribers operate exclusively on events recorded after their release and disregard historical data. This is useful for subscribers that are only interested in events that occur after a certain point in time. For example, a welcome email subscriber should only send emails to new users.
use Patchlevel\EventSourcing\Attribute\Subscriber;
use Patchlevel\EventSourcing\Subscription\RunMode;
#[Subscriber('welcome_email', RunMode::FromNow)]
final class WelcomeEmailSubscriber
{
// ...
}
Tip
If you want to process events from now, you can use the Processor attribute.
Once
This mode is for subscribers that only need to run once, for example to create reports or to migrate data.
use Patchlevel\EventSourcing\Attribute\Subscriber;
use Patchlevel\EventSourcing\Subscription\RunMode;
#[Subscriber('migration', RunMode::Once)]
final class MigrationSubscriber
{
// ...
}
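The difference between the three modes can be summarised in a few lines. The sketch below is a simplified model and not the library's code: the enum `RunModeSketch` and both functions are hypothetical, and the assumption that the once mode starts at the beginning of the stream is ours.

```php
<?php

declare(strict_types=1);

// Hypothetical enum that mirrors the three run modes for illustration only.
enum RunModeSketch
{
    case FromBeginning;
    case FromNow;
    case Once;
}

/**
 * Position from which a fresh subscription starts.
 * Assumption: "once" also starts at the beginning of the stream.
 */
function startPosition(RunModeSketch $mode, int $latestPosition): int
{
    return match ($mode) {
        RunModeSketch::FromBeginning, RunModeSketch::Once => 0,
        RunModeSketch::FromNow => $latestPosition, // skip all historical events
    };
}

/**
 * Status after the subscription has caught up with the event stream.
 */
function statusAfterCatchUp(RunModeSketch $mode): string
{
    return $mode === RunModeSketch::Once ? 'finished' : 'active';
}

$projectorStart = startPosition(RunModeSketch::FromBeginning, 1500); // 0
$processorStart = startPosition(RunModeSketch::FromNow, 1500);       // 1500
$migrationStatus = statusAfterCatchUp(RunModeSketch::Once);          // 'finished'
```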
Subscription Engine
The subscription engine manages individual subscribers and keeps the subscriptions running. Internally, the subscription engine does this by tracking where each subscriber is in the event stream and keeping all subscriptions up to date.
It also takes care that new subscribers are booted and old ones are removed again. If something breaks, the subscription engine marks the individual subscriptions as faulty and retries them.
Tip
The Subscription Engine was inspired by the following two blog posts:
Subscription ID
The subscription ID is taken from the associated subscriber and corresponds to the subscriber ID. Unlike the subscriber ID, the subscription ID can no longer change. If the Subscriber ID is changed, a new subscription will be created with this new subscriber ID. So there are two subscriptions, one with the old subscriber ID and one with the new subscriber ID.
Subscription Position
Furthermore, the position in the event stream is stored for each subscription, so that the subscription engine knows where the subscription stopped and where it must continue.
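Conceptually, position tracking can be pictured like this. The class `PositionTracker` is a hypothetical in-memory model written for this example. The real engine persists the position in the subscription store and reads events from the event store.

```php
<?php

declare(strict_types=1);

// Hypothetical in-memory model, not the library's implementation.
final class PositionTracker
{
    /** @var array<string, int> last processed position per subscription ID */
    private array $positions = [];

    /**
     * Pass all events after the stored position to the handler and
     * remember how far the subscription got.
     *
     * @param list<string> $stream events; the position of an event is its index + 1
     */
    public function catchUp(string $subscriptionId, array $stream, callable $handler): int
    {
        $position = $this->positions[$subscriptionId] ?? 0;

        foreach (array_slice($stream, $position) as $event) {
            $handler($event);
            $position++;
        }

        return $this->positions[$subscriptionId] = $position;
    }
}

$tracker = new PositionTracker();
$seen = [];
$handler = static function (string $event) use (&$seen): void {
    $seen[] = $event;
};

$first = $tracker->catchUp('profile_1', ['a', 'b'], $handler);       // 2
$second = $tracker->catchUp('profile_1', ['a', 'b', 'c'], $handler); // 3, only 'c' is handled
```

Because the position is remembered, the second call only handles the event that was appended in the meantime.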
Subscription Status
There is a lifecycle for each subscription. This cycle is tracked by the subscription engine.
stateDiagram-v2
direction LR
[*] --> New
New --> Booting
New --> Active
New --> Error
Booting --> Active
Booting --> Paused
Booting --> Finished
Booting --> Error
Active --> Paused
Active --> Finished
Active --> Detached
Active --> Error
Paused --> Booting
Paused --> Active
Paused --> Detached
Finished --> Active
Finished --> Detached
Error --> New
Error --> Booting
Error --> Active
Error --> Paused
Error --> [*]
Detached --> Active
Detached --> [*]
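The transitions from the diagram can also be written down as a lookup table, which is handy for reasoning about which status changes are possible. The constant `TRANSITIONS`, the lowercase status names and the function `canTransition` are hypothetical and only restate the diagram. They are not part of the library.

```php
<?php

declare(strict_types=1);

// Allowed transitions, copied from the state diagram.
// Removal (the arrows to the end state from "error" and "detached") is not modelled here.
const TRANSITIONS = [
    'new' => ['booting', 'active', 'error'],
    'booting' => ['active', 'paused', 'finished', 'error'],
    'active' => ['paused', 'finished', 'detached', 'error'],
    'paused' => ['booting', 'active', 'detached'],
    'finished' => ['active', 'detached'],
    'error' => ['new', 'booting', 'active', 'paused'],
    'detached' => ['active'],
];

function canTransition(string $from, string $to): bool
{
    return in_array($to, TRANSITIONS[$from] ?? [], true);
}

$bootingToActive = canTransition('booting', 'active');    // true
$finishedToPaused = canTransition('finished', 'paused');  // false
```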
New
A subscription is created and "new" if a subscriber exists with an ID that is not yet tracked.
This can happen when either a new subscriber has been added, the subscriber ID has changed
or the subscription has been manually deleted from the subscription store.
You can then set up the subscription so that it is booting or active.
In this step, the subscription engine also tries to call the setup method if available.
Booting
Booting status is reached when the setup process is finished. In this step the subscription engine tries to catch up to the current event stream. When the process is finished, the subscription is set to active or finished.
Active
The active status describes the subscriptions currently being actively managed by the subscription engine. These subscriptions have a subscriber, follow the event stream and should be up-to-date.
Paused
A subscription can be paused manually. It will then no longer be updated by the subscription engine. This can be useful if you want to pause a subscription for a certain period of time. You can reactivate the subscription later so that it continues.
Finished
A subscription is finished if the subscriber has the mode RunMode::Once.
This means that the subscription is only run once and is set to finished when it reaches the end of the event stream.
You can also reactivate the subscription if you want it to continue.
Detached
If an active or finished subscription exists in the subscription store that does not have a subscriber in the source code with a corresponding subscriber ID, then this subscription is marked as detached. This happens when either the subscriber has been deleted or the subscriber ID of a subscriber has changed. In the last case there should be a new subscription with the new subscriber ID.
A detached subscription does not automatically become active again when the subscriber exists again. This happens, for example, when an old version was deployed again during a rollback.
There are two options to reactivate the subscription:
- Reactivate the subscription, so that the subscription is active again.
- Remove the subscription and rebuild it from scratch.
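How new and detached subscriptions come about can be pictured as a comparison of two ID lists. The function `reconcile` is a hypothetical simplification: it ignores the current status of the stored subscriptions, whereas the text above says that only active or finished subscriptions are marked as detached.

```php
<?php

declare(strict_types=1);

/**
 * Compare the subscriber IDs found in the code with the subscription IDs in the store.
 *
 * @param list<string> $subscriberIds   IDs of the subscribers in the source code
 * @param list<string> $subscriptionIds IDs tracked in the subscription store
 *
 * @return array{new: list<string>, detached: list<string>}
 */
function reconcile(array $subscriberIds, array $subscriptionIds): array
{
    return [
        // subscriber exists, but no subscription is tracked yet
        'new' => array_values(array_diff($subscriberIds, $subscriptionIds)),
        // subscription is tracked, but the subscriber is gone
        'detached' => array_values(array_diff($subscriptionIds, $subscriberIds)),
    ];
}

// The projector was renamed from profile_1 to profile_2.
$result = reconcile(
    ['profile_2', 'welcome_email'],
    ['profile_1', 'welcome_email'],
);
// $result['new'] is ['profile_2'], $result['detached'] is ['profile_1']
```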
Error
If an error occurs in a subscriber, then the subscription is set to Error. This can happen in the create process, in the boot process or in the run process. This subscription will then no longer boot/run until the subscription is reactivated or retried.
The subscription engine has a retry strategy to retry subscriptions that have failed. It tries to reactivate the subscription after a certain time and a certain number of attempts. If this does not work, the subscription is set to error and must be manually reactivated.
There are two options here:
- Reactivate the subscription, so that the subscription is in the previous state again.
- Remove the subscription and rebuild it from scratch.
Setup
In order for the subscription engine to be able to do its work, you have to assemble it beforehand.
Subscription Store
The Subscription Engine uses a subscription store to store the status of each subscription. We provide a Doctrine implementation of this by default.
use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore;
/** @var Connection $connection */
$subscriptionStore = new DoctrineSubscriptionStore($connection);
So that the schema for the subscription store can also be created, we have to pass our schema configuration to the DoctrineSchemaDirector.
Using ChainDoctrineSchemaConfigurator, we can add multiple schema configurators.
In our case, these are the event store and the subscription store.
use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Schema\ChainDoctrineSchemaConfigurator;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore;
/**
* @var Connection $connection
* @var Store $eventStore
* @var DoctrineSubscriptionStore $subscriptionStore
*/
$schemaDirector = new DoctrineSchemaDirector(
$connection,
new ChainDoctrineSchemaConfigurator([
$eventStore,
$subscriptionStore,
]),
);
Note
You can find more about schema configurator here
Retry Strategy
The subscription engine uses a retry strategy to retry subscriptions that have failed. Our default strategy can be configured with the following parameters:
- baseDelay: The base delay in seconds.
- delayFactor: The factor by which the delay is multiplied after each attempt.
- maxAttempts: The maximum number of attempts.
use Patchlevel\EventSourcing\Subscription\RetryStrategy\ClockBasedRetryStrategy;
$retryStrategy = new ClockBasedRetryStrategy(
baseDelay: 5,
delayFactor: 2,
maxAttempts: 5,
);
Tip
You can reactivate the subscription manually or remove it and rebuild it from scratch.
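To get a feeling for these parameters, the sketch below computes the waiting time per attempt. Note that the formula `baseDelay * delayFactor^(attempt - 1)` is our assumption of how such a backoff typically works. The function `retryDelay` is hypothetical, and the exact calculation inside ClockBasedRetryStrategy may differ.

```php
<?php

declare(strict_types=1);

/**
 * Assumed exponential backoff: the delay grows by the factor after each attempt.
 * Returns null when no further retry should happen.
 */
function retryDelay(int $attempt, int $baseDelay = 5, float $delayFactor = 2.0, int $maxAttempts = 5): ?int
{
    if ($attempt < 1) {
        throw new InvalidArgumentException('Attempts are counted from 1.');
    }

    if ($attempt > $maxAttempts) {
        return null; // give up, the subscription stays in the error status
    }

    return (int) round($baseDelay * $delayFactor ** ($attempt - 1));
}

$delays = array_map(
    static fn (int $attempt): ?int => retryDelay($attempt),
    [1, 2, 3, 4, 5, 6],
);
// Under the assumed formula: 5, 10, 20, 40 and 80 seconds, then null
```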
Subscriber Accessor
The subscriber accessor repository is responsible for providing the subscribers to the subscription engine. We provide a metadata subscriber accessor repository by default.
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository;
/**
* @var object $subscriber1
* @var object $subscriber2
* @var object $subscriber3
*/
$subscriberAccessorRepository = new MetadataSubscriberAccessorRepository([
$subscriber1,
$subscriber2,
$subscriber3,
]);
Subscription Engine
Now we can create the subscription engine and plug together the necessary services. The event store is needed to load the events, the Subscription Store to store the subscription state and we need the subscriber accessor repository. Optionally, we can also pass a retry strategy.
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\NoRetryStrategy;
use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore;
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository;
/**
* @var Store $eventStore
* @var DoctrineSubscriptionStore $subscriptionStore
* @var MetadataSubscriberAccessorRepository $subscriberAccessorRepository
* @var NoRetryStrategy $retryStrategy
*/
$subscriptionEngine = new DefaultSubscriptionEngine(
$eventStore,
$subscriptionStore,
$subscriberAccessorRepository,
$retryStrategy,
);
Catch up Subscription Engine
If aggregates are used in the processors and new events are generated there,
then they are not part of the current subscription engine run and will only be processed during the next run or boot.
This is usually not a problem in dev or prod environment because a worker is used
and these events will be processed at some point. But in testing it is not so easy.
For this reason, we have the CatchUpSubscriptionEngine decorator.
use Patchlevel\EventSourcing\Subscription\Engine\CatchUpSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
/** @var SubscriptionEngine $subscriptionEngine */
$catchupSubscriptionEngine = new CatchUpSubscriptionEngine($subscriptionEngine);
Tip
You can use the CatchUpSubscriptionEngine in your tests to process the events immediately.
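The idea behind catching up can be sketched as a loop that runs again as long as the previous run still processed messages. This is a conceptual model under our own assumptions, not the decorator's actual code: the function `runUntilCaughtUp`, the callable that reports the number of processed messages and the `$maxRuns` safety limit are all hypothetical.

```php
<?php

declare(strict_types=1);

/**
 * Repeat the run until nothing is left to process.
 *
 * @param callable(): int $runOnce runs the engine once and returns the number of processed messages
 */
function runUntilCaughtUp(callable $runOnce, int $maxRuns = 100): int
{
    $total = 0;

    for ($run = 0; $run < $maxRuns; $run++) {
        $processed = $runOnce();

        if ($processed === 0) {
            break; // no new events were produced by the last run
        }

        $total += $processed;
    }

    return $total;
}

// Simulated stream: handling "order_placed" produces a follow-up event,
// which is only visible to the next run.
$queue = ['order_placed'];
$runOnce = static function () use (&$queue): int {
    $batch = $queue;
    $queue = [];

    foreach ($batch as $event) {
        if ($event === 'order_placed') {
            $queue[] = 'invoice_created';
        }
    }

    return count($batch);
};

$total = runUntilCaughtUp($runOnce); // 2: the original event and the follow-up event
```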
Throw on error Subscription Engine
This is another decorator for the subscription engine. It throws an exception if a subscription is in error state. This is useful for testing or development to get direct feedback if something is wrong.
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\ThrowOnErrorSubscriptionEngine;
/** @var SubscriptionEngine $subscriptionEngine */
$throwOnErrorSubscriptionEngine = new ThrowOnErrorSubscriptionEngine($subscriptionEngine);
Warning
This is only for testing or development. Don't use it in production. The subscription engine has a built-in retry strategy to retry subscriptions that have failed.
Run Subscription Engine after save
You can trigger the subscription engine after calling the save method on the repository.
This means that a worker to run the subscriptions is not needed.
use Patchlevel\EventSourcing\Repository\RepositoryManager;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Repository\RunSubscriptionEngineRepositoryManager;
/**
* @var SubscriptionEngine $subscriptionEngine
* @var RepositoryManager $defaultRepositoryManager
*/
$repositoryManager = new RunSubscriptionEngineRepositoryManager(
$defaultRepositoryManager,
$subscriptionEngine,
['id1', 'id2'], // filter subscribers by id
['group1', 'group2'], // filter subscribers by group
100, // limit the number of messages
);
Danger
By using this, you can't wrap the repository in a transaction. A rollback is not supported and can break the subscription engine. Internally, the events are saved in a transaction to ensure data consistency.
Note
More about repository manager and repository can be found here.
Tip
This works well in development or testing, especially in combination with the CatchUpSubscriptionEngine and ThrowOnErrorSubscriptionEngine decorators.
Usage
The Subscription Engine has a few methods needed to use it effectively.
A SubscriptionEngineCriteria can be passed to all of these methods to filter the respective subscriptions.
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;
$criteria = new SubscriptionEngineCriteria(
ids: ['profile_1', 'welcome_email'],
groups: ['default'],
);
Note
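The values within one criterion are checked with an OR, and the criteria themselves are combined with an AND. The criteria above therefore match subscriptions whose ID is profile_1 or welcome_email and whose group is default.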
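The matching rule can be expressed in a few lines. The function `matches` is a hypothetical illustration of the OR/AND logic and not the library's implementation. Treating `null` as "no filter" is an assumption made for this sketch.

```php
<?php

declare(strict_types=1);

/**
 * OR within a criterion (any listed value matches), AND between the criteria.
 * Assumption: null means that the criterion is not applied.
 *
 * @param list<string>|null $ids
 * @param list<string>|null $groups
 */
function matches(string $id, string $group, ?array $ids = null, ?array $groups = null): bool
{
    $idMatches = $ids === null || in_array($id, $ids, true);
    $groupMatches = $groups === null || in_array($group, $groups, true);

    return $idMatches && $groupMatches;
}

$ids = ['profile_1', 'welcome_email'];
$groups = ['default'];

$a = matches('profile_1', 'default', $ids, $groups);   // true: ID and group match
$b = matches('profile_1', 'projector', $ids, $groups); // false: the group does not match
$c = matches('report', 'default', $ids, $groups);      // false: the ID does not match
$d = matches('report', 'projector');                   // true: no criteria, everything matches
```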
Setup
New subscriptions need to be set up before they can be used.
In this step, the subscription engine also tries to call the setup method if available.
After the setup process, the subscription is set to booting or active.
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;
/** @var SubscriptionEngine $subscriptionEngine */
$subscriptionEngine->setup(new SubscriptionEngineCriteria());
Tip
You can skip the booting step with the second boolean parameter named skipBooting.
Boot
You can boot the subscriptions with the boot method.
All booting subscriptions will catch up to the current event stream.
After the boot process, the subscription is set to active or finished.
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;
/** @var SubscriptionEngine $subscriptionEngine */
$subscriptionEngine->boot(new SubscriptionEngineCriteria());
Run
All active subscriptions are continued and updated here.
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;
/** @var SubscriptionEngine $subscriptionEngine */
$subscriptionEngine->run(new SubscriptionEngineCriteria());
Teardown
If subscriptions are detached, they can be cleaned up here.
The subscription engine also tries to call the teardown method if available.
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;
/** @var SubscriptionEngine $subscriptionEngine */
$subscriptionEngine->teardown(new SubscriptionEngineCriteria());
Remove
You can also directly remove a subscription regardless of its status.
An attempt is made to call the teardown method if available, but the entry will still be removed if that fails.
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;
/** @var SubscriptionEngine $subscriptionEngine */
$subscriptionEngine->remove(new SubscriptionEngineCriteria());
Reactivate
If a subscription had an error or is outdated, you can reactivate it. As a result, the subscription returns to its previous status.
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;
/** @var SubscriptionEngine $subscriptionEngine */
$subscriptionEngine->reactivate(new SubscriptionEngineCriteria());
Pause
Pausing a subscription is also possible. The subscription will then no longer be managed by the subscription engine. You can reactivate the subscription if you want so that it continues.
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;
/** @var SubscriptionEngine $subscriptionEngine */
$subscriptionEngine->pause(new SubscriptionEngineCriteria());
Status
To get the current status of all subscriptions, use the subscriptions method.
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;
/** @var SubscriptionEngine $subscriptionEngine */
$subscriptions = $subscriptionEngine->subscriptions(new SubscriptionEngineCriteria());
foreach ($subscriptions as $subscription) {
echo $subscription->status()->value;
}