Store
In the end, the messages have to be saved somewhere. Each message contains an event and the associated headers.
Note
More information about the message can be found here.
The store is optimized to efficiently store and load events for aggregates. We currently only offer one doctrine dbal store.
Create DBAL connection
The first thing we need for our store is a DBAL connection:
use Doctrine\DBAL\DriverManager;
use Doctrine\DBAL\Tools\DsnParser;
$connection = DriverManager::getConnection(
(new DsnParser())->parse('pdo-pgsql://user:secret@localhost/app'),
);
Note
You can find out more about how to create a connection here
Configure Store
You can create a store with the DoctrineDbalStore
class.
The store needs a dbal connection, an event serializer, an aggregate registry and a table name.
use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
use Patchlevel\EventSourcing\Store\DoctrineDbalStore;
/** @var Connection $connection */
$store = new DoctrineDbalStore(
$connection,
DefaultEventSerializer::createFromPaths(['src/Event']),
null,
['table_name' => 'eventstore'],
);
Schema
The table structure of the DoctrineDbalStore
looks like this:
Column | Type | Description |
---|---|---|
id | bigint | The index of the whole stream (autoincrement) |
aggregate | string | The name of the aggregate |
aggregate_id | uuid/string | The id of the aggregate |
playhead | int | The current playhead of the aggregate |
event | string | The name of the event |
payload | json | The payload of the event |
recorded_on | datetime | The date when the event was recorded |
new_stream_start | bool | If the event is the first event of the aggregate |
archived | bool | If the event is archived |
custom_headers | json | Custom headers for the event |
With the help of the SchemaDirector
, the database structure can be created, updated and deleted.
Note
The default type of the aggregate_id
column is uuid
if the database supports it and string
if not.
You can change the type with the aggregate_id_type
to string
if you want use custom id.
Tip
You can also use doctrine migration to create and keep your schema in sync.
Schema Director
The SchemaDirector
is responsible for creating, updating and deleting the database schema.
The DoctrineSchemaDirector
is a concrete implementation of the SchemaDirector
for doctrine dbal.
Additionally, it implements the DryRunSchemaDirector
interface, to show the sql statements that would be executed.
use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
use Patchlevel\EventSourcing\Store\DoctrineDbalStore;
/**
* @var Connection $connection
* @var DoctrineDbalStore $store
*/
$schemaDirector = new DoctrineSchemaDirector(
$connection,
$store,
);
Note
How to setup cli commands for schema director can be found here.
Create schema
You can create the table from scratch using the create
method.
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
/** @var DoctrineSchemaDirector $schemaDirector */
$schemaDirector->create();
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
/** @var DoctrineSchemaDirector $schemaDirector */
$sql = $schemaDirector->dryRunCreate();
Update schema
The update method compares the current state in the database and how the table should be structured. As a result, the diff is executed to bring the table to the desired state.
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
/** @var DoctrineSchemaDirector $schemaDirector */
$schemaDirector->update();
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
/** @var DoctrineSchemaDirector $schemaDirector */
$sql = $schemaDirector->dryRunUpdate();
Drop schema
You can also delete the table with the drop
method.
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
/** @var DoctrineSchemaDirector $schemaDirector */
$schemaDirector->drop();
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
/** @var DoctrineSchemaDirector $schemaDirector */
$sql = $schemaDirector->dryRunDrop();
Doctrine Migrations
You can use doctrine migration,
which is known from doctrine orm,
to create your schema and keep it in sync.
We have added a DoctrineMigrationSchemaProvider
for doctrine migrations so that you just have to plug the whole thing
together.
use Doctrine\DBAL\Connection;
use Doctrine\Migrations\Configuration\Connection\ExistingConnection;
use Doctrine\Migrations\Configuration\Migration\ConfigurationLoader;
use Doctrine\Migrations\DependencyFactory;
use Doctrine\Migrations\Provider\SchemaProvider;
use Patchlevel\EventSourcing\Schema\DoctrineMigrationSchemaProvider;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
use Patchlevel\EventSourcing\Store\DoctrineDbalStore;
// event sourcing schema director configuration
/**
* @var Connection $connection
* @var DoctrineDbalStore $store
*/
$schemaDirector = new DoctrineSchemaDirector(
$connection,
$store,
);
$schemaProvider = new DoctrineMigrationSchemaProvider($schemaDirector);
// doctrine migration configuration
/** @var ConfigurationLoader $configLoader */
$dependencyFactory = DependencyFactory::fromConnection(
$configLoader,
new ExistingConnection($connection),
);
$dependencyFactory->setService(
SchemaProvider::class,
$schemaProvider,
);
Note
Here you can find more information on how to configure doctrine migration.
Note
How to setup cli commands for doctrine migration can be found here.
Usage
The store has a few methods to interact with the database.
Load
You can load all events from an aggregate with the load
method.
This method returns a Stream
object, which is a collection of events.
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Store;
/** @var Store $store */
$stream = $store->load(
new Criteria(), // filter criteria
100, // limit
50, // offset
true, // latest first
);
Criteria
The Criteria
object is used to filter the events.
use Patchlevel\EventSourcing\Store\Criteria\AggregateIdCriterion;
use Patchlevel\EventSourcing\Store\Criteria\AggregateNameCriterion;
use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
$criteria = new Criteria(
new AggregateNameCriterion('profile'),
new AggregateIdCriterion('e3e3e3e3-3e3e-3e3e-3e3e-3e3e3e3e3e3e'),
new FromPlayheadCriterion(2),
new FromIndexCriterion(100),
new ArchivedCriterion(true),
);
use Patchlevel\EventSourcing\Store\Criteria\CriteriaBuilder;
$criteria = (new CriteriaBuilder())
->aggregateName('profile')
->aggregateId('e3e3e3e3-3e3e-3e3e-3e3e-3e3e3e3e3e3e')
->fromPlayhead(2)
->fromIndex(100)
->archived(true)
->build();
Stream
The load method returns a Stream
object and is a generator.
This means that the messages are only loaded when they are needed.
use Patchlevel\EventSourcing\Store\Stream;
/** @var Stream $stream */
$stream->index(); // get the index of the stream
$stream->position(); // get the current position of the stream
$stream->current(); // get the current event
$stream->next(); // move to the next event
$stream->end(); // check if the stream is at the end
foreach ($stream as $message) {
$message->event(); // get the event
}
Note
You can find more information about the Message
object here.
Warning
The stream cannot rewind, so you can only iterate over it once.
If you want to iterate over it again, you have to call the load
method again.
Count
You can count the number of events in the store with the count
method.
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Store;
/** @var Store $store */
$count = $store->count(
new Criteria(), // filter criteria
);
Save
You can save a message with the save
method.
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\Store;
/**
* @var Store $store
* @var Message $message
* @var Message $message1
* @var Message $message2
* @var Message $message3
* @var list<Message> $messages
*/
$store->save($message);
$store->save($message1, $message2, $message3);
$store->save(...$messages);
Note
The saving happens in a transaction, so all messages are saved or none.
Delete & Update
It is not possible to delete or update events. In event sourcing, the events are immutable.
Transaction
There is also the possibility of executing a function in a transaction. Then dbal takes care of starting a transaction, committing it and then possibly rollback it again.
use Patchlevel\EventSourcing\Store\Store;
/** @var Store $store */
$store->transactional(static function () use ($command, $bankAccountRepository): void {
$accountFrom = $bankAccountRepository->get($command->from());
$accountTo = $bankAccountRepository->get($command->to());
$accountFrom->transferMoney($command->to(), $command->amount());
$accountTo->receiveMoney($command->from(), $command->amount());
$bankAccountRepository->save($accountFrom);
$bankAccountRepository->save($accountTo);
});