Store
In the end, the messages have to be saved somewhere. Each message contains an event and the associated headers.
Note
More information about the message can be found here.
The store is optimized to efficiently store and load events for aggregates.
Configure Store
We offer different stores to store the messages. Two stores based on doctrine dbal and one in-memory store for testing purposes.
DoctrineDbalStore
This is the current default store for event sourcing.
You can create a store with the DoctrineDbalStore
class.
The store needs a dbal connection, an event serializer and has some optional parameters like options.
use Doctrine\DBAL\DriverManager;
use Doctrine\DBAL\Tools\DsnParser;
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
use Patchlevel\EventSourcing\Store\DoctrineDbalStore;
$connection = DriverManager::getConnection(
(new DsnParser())->parse('pdo-pgsql://user:secret@localhost/app'),
);
$store = new DoctrineDbalStore(
$connection,
DefaultEventSerializer::createFromPaths(['src/Event']),
);
Note
You can find out more about how to create a connection here
Following options are available in DoctrineDbalStore
:
Option | Type | Default | Description |
---|---|---|---|
table_name | string | eventstore | The name of the table in the database |
aggregate_id_type | "uuid"/"string" | uuid | The type of the aggregate_id column |
locking | bool | true | If the store should use locking for writing |
lock_id | int | 133742 | The id of the lock |
lock_timeout | int | -1 | The timeout of the lock. -1 means no timeout |
The table structure of the DoctrineDbalStore
looks like this:
Column | Type | Description |
---|---|---|
id | bigint | The index of the whole stream (autoincrement) |
aggregate | string | The name of the aggregate |
aggregate_id | uuid/string | The id of the aggregate |
playhead | int | The current playhead of the aggregate |
event | string | The name of the event |
payload | json | The payload of the event |
recorded_on | datetime | The date when the event was recorded |
new_stream_start | bool | If the event is the first event of the aggregate |
archived | bool | If the event is archived |
custom_headers | json | Custom headers for the event |
Note
The default type of the aggregate_id
column is uuid
if the database supports it and string
if not.
You can change the type with the aggregate_id_type
to string
if you want use custom id.
StreamDoctrineDbalStore
We offer a new store called StreamDoctrineDbalStore
.
This store is decoupled from the aggregate and can be used to store events from other sources.
The difference to the DoctrineDbalStore
is that the StreamDoctrineDbalStore
merge the aggregate id
and the aggregate name into one column named stream
. Additionally, the column playhead
is nullable.
This store introduces two new methods streams
and remove
.
The store needs a dbal connection, an event serializer and has some optional parameters like options.
use Doctrine\DBAL\DriverManager;
use Doctrine\DBAL\Tools\DsnParser;
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore;
$connection = DriverManager::getConnection(
(new DsnParser())->parse('pdo-pgsql://user:secret@localhost/app'),
);
$store = new StreamDoctrineDbalStore(
$connection,
DefaultEventSerializer::createFromPaths(['src/Event']),
);
Note
You can find out more about how to create a connection here
Following options are available in StreamDoctrineDbalStore
:
Option | Type | Default | Description |
---|---|---|---|
table_name | string | event_store | The name of the table in the database |
locking | bool | true | If the store should use locking for writing |
lock_id | int | 133742 | The id of the lock |
lock_timeout | int | -1 | The timeout of the lock. -1 means no timeout |
keep_index | bool | false | By message save the index header will be kept |
The table structure of the StreamDoctrineDbalStore
looks like this:
Column | Type | Description |
---|---|---|
id | bigint | The index of the whole stream (autoincrement) |
stream | string | The name of the stream |
playhead | ?int | The current playhead of the aggregate |
event_id | string | The id of the event |
event_name | string | The name of the event |
event_payload | json | The payload of the event |
recorded_on | datetime | The date when the event was recorded |
new_stream_start | bool | If the event is the first event of the aggregate |
archived | bool | If the event is archived |
custom_headers | json | Custom headers for the event |
InMemoryStore
We also offer an in-memory store for testing purposes.
Tip
You can pass messages to the constructor to initialize the store with some events.
ReadOnlyStore & StreamReadOnlyStore
Last but not least, we offer two read-only stores.
One for the DoctrineDbalStore
and one for the StreamDoctrineDbalStore
.
It passes all methods to the underlying store, but throws an StoreIsReadOnly
exception when trying to execute write
operations.
use Patchlevel\EventSourcing\Store\ReadOnlyStore;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\StreamReadOnlyStore;
use Patchlevel\EventSourcing\Store\StreamStore;
/** @var Store $store */
$readOnlyStore = new ReadOnlyStore($store);
/** @var StreamStore $store */
$readOnlyStore = new StreamReadOnlyStore($store);
Schema
With the help of the SchemaDirector
, the database structure can be created, updated and deleted.
Tip
You can also use doctrine migration to create and keep your schema in sync.
Doctrine Schema Director
The SchemaDirector
is responsible for creating, updating and deleting the database schema.
The DoctrineSchemaDirector
is a concrete implementation of the SchemaDirector
for doctrine dbal.
Additionally, it implements the DryRunSchemaDirector
interface, to show the sql statements that would be executed.
use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
use Patchlevel\EventSourcing\Store\Store;
/**
* @var Connection $connection
* @var Store $store
*/
$schemaDirector = new DoctrineSchemaDirector(
$connection,
$store,
);
Note
How to setup cli commands for schema director can be found here.
Create schema
You can create the table from scratch using the create
method.
use Patchlevel\EventSourcing\Schema\SchemaDirector;
/** @var SchemaDirector $schemaDirector */
$schemaDirector->create();
Or can give you back which SQL statements would be necessary for this. Either for a dry run, or to define your own migrations.
use Patchlevel\EventSourcing\Schema\DryRunSchemaDirector;
/** @var DryRunSchemaDirector $schemaDirector */
$sql = $schemaDirector->dryRunCreate();
Update schema
The update method compares the current state in the database and how the table should be structured. As a result, the diff is executed to bring the table to the desired state.
use Patchlevel\EventSourcing\Schema\SchemaDirector;
/** @var SchemaDirector $schemaDirector */
$schemaDirector->update();
Or can give you back which SQL statements would be necessary for this.
use Patchlevel\EventSourcing\Schema\DryRunSchemaDirector;
/** @var DryRunSchemaDirector $schemaDirector */
$sql = $schemaDirector->dryRunUpdate();
Drop schema
You can also delete the table with the drop
method.
use Patchlevel\EventSourcing\Schema\SchemaDirector;
/** @var SchemaDirector $schemaDirector */
$schemaDirector->drop();
Or can give you back which SQL statements would be necessary for this.
use Patchlevel\EventSourcing\Schema\DryRunSchemaDirector;
/** @var DryRunSchemaDirector $schemaDirector */
$sql = $schemaDirector->dryRunDrop();
Doctrine Migrations
You can use doctrine migration,
which is known from doctrine orm,
to create your schema and keep it in sync.
We have added a DoctrineMigrationSchemaProvider
for doctrine migrations so that you just have to plug the whole thing
together.
use Doctrine\DBAL\Connection;
use Doctrine\Migrations\Configuration\Connection\ExistingConnection;
use Doctrine\Migrations\Configuration\Migration\ConfigurationLoader;
use Doctrine\Migrations\DependencyFactory;
use Doctrine\Migrations\Provider\SchemaProvider;
use Patchlevel\EventSourcing\Schema\DoctrineMigrationSchemaProvider;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
use Patchlevel\EventSourcing\Store\Store;
// event sourcing schema director configuration
/**
* @var Connection $connection
* @var Store $store
*/
$schemaDirector = new DoctrineSchemaDirector(
$connection,
$store,
);
$schemaProvider = new DoctrineMigrationSchemaProvider($schemaDirector);
// doctrine migration configuration
/** @var ConfigurationLoader $configLoader */
$dependencyFactory = DependencyFactory::fromConnection(
$configLoader,
new ExistingConnection($connection),
);
$dependencyFactory->setService(
SchemaProvider::class,
$schemaProvider,
);
Note
Here you can find more information on how to configure doctrine migration.
Note
How to setup cli commands for doctrine migration can be found here.
Usage
The store has a few methods to interact with the database.
Load
You can load all events from an aggregate with the load
method.
This method returns a Stream
object, which is a collection of events.
The load method also has a few parameters to filter, limit and sort the events.
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Store;
/** @var Store $store */
$stream = $store->load(
new Criteria(), // filter criteria
100, // limit
50, // offset
true, // latest first
);
Criteria
The Criteria
object is used to filter the events.
use Patchlevel\EventSourcing\Store\Criteria\AggregateIdCriterion;
use Patchlevel\EventSourcing\Store\Criteria\AggregateNameCriterion;
use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
$criteria = new Criteria(
new AggregateNameCriterion('profile'),
new AggregateIdCriterion('e3e3e3e3-3e3e-3e3e-3e3e-3e3e3e3e3e3e'),
new FromPlayheadCriterion(2),
new FromIndexCriterion(100),
new ArchivedCriterion(true),
new EventsCriterion(['profile.created', 'profile.name_changed']),
);
Or you can the criteria builder to create the criteria.
use Patchlevel\EventSourcing\Store\Criteria\CriteriaBuilder;
$criteria = (new CriteriaBuilder())
->aggregateName('profile')
->aggregateId('e3e3e3e3-3e3e-3e3e-3e3e-3e3e3e3e3e3e')
->fromPlayhead(2)
->fromIndex(100)
->archived(true)
->events(['profile.created', 'profile.name_changed'])
->build();
Stream
The load method returns a Stream
object and is a generator.
This means that the messages are only loaded when they are needed.
use Patchlevel\EventSourcing\Store\Stream;
/** @var Stream $stream */
$stream->index(); // get the index of the stream
$stream->position(); // get the current position of the stream
$stream->current(); // get the current event
$stream->next(); // move to the next event
$stream->end(); // check if the stream is at the end
foreach ($stream as $message) {
$message->event(); // get the event
}
Note
You can find more information about the Message
object here.
Warning
The stream cannot rewind, so you can only iterate over it once.
If you want to iterate over it again, you have to call the load
method again.
Count
You can count the number of events in the store with the count
method.
The count method also has the possibility to filter the events.
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Store;
/** @var Store $store */
$count = $store->count(
new Criteria(), // filter criteria
);
Save
You can save a message with the save
method.
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\Store;
/**
* @var Store $store
* @var Message $message
* @var Message $message1
* @var Message $message2
* @var Message $message3
* @var list<Message> $messages
*/
$store->save($message);
$store->save($message1, $message2, $message3);
$store->save(...$messages);
Note
The saving happens in a transaction, so all messages are saved or none.
The store lock the table for writing during each save by default.
Tip
Use transactional method if you want call multiple save methods in a transaction.
Update
It is not possible to update events. In event sourcing, the events are immutable.
Remove
You can remove a stream with the remove
method.
use Patchlevel\EventSourcing\Store\StreamStore;
/** @var StreamStore $store */
$store->remove('profile-*');
Note
The method is only available in the StreamStore
like StreamDoctrineDbalStore
.
List Streams
You can list all streams with the streams
method.
use Patchlevel\EventSourcing\Store\StreamStore;
/** @var StreamStore $store */
$streams = $store->streams(); // ['profile-1', 'profile-2', 'profile-3']
Note
The method is only available in the StreamStore
like StreamDoctrineDbalStore
.
Transaction
There is also the possibility of executing a function in a transaction. The store takes care of starting a transaction, committing it and then possibly rollback it again.
use Patchlevel\EventSourcing\Store\Store;
/** @var Store $store */
$store->transactional(static function () use ($command, $bankAccountRepository): void {
$accountFrom = $bankAccountRepository->get($command->from());
$accountTo = $bankAccountRepository->get($command->to());
$accountFrom->transferMoney($command->to(), $command->amount());
$accountTo->receiveMoney($command->from(), $command->amount());
$bankAccountRepository->save($accountFrom);
$bankAccountRepository->save($accountTo);
});
Note
The store lock the table for writing during the transaction by default.
Tip
If you want save only one aggregate, so you don't have to use the transactional method. The save method in store/repository is already transactional.