Store

In the end, the messages have to be saved somewhere. Each message contains an event and the associated headers.

Note

More information about messages can be found in the message documentation.

The store is optimized to efficiently store and load events for aggregates. Currently, the only implementation is a store based on Doctrine DBAL.

Create DBAL connection

The first thing we need for our store is a DBAL connection:

use Doctrine\DBAL\DriverManager;
use Doctrine\DBAL\Tools\DsnParser;

$connection = DriverManager::getConnection(
    (new DsnParser())->parse('pdo-pgsql://user:secret@localhost/app'),
);

Note

You can find out more about how to create a connection in the Doctrine DBAL documentation.

Configure Store

You can create a store with the DoctrineDbalStore class. The store needs a dbal connection, an event serializer, an aggregate registry and some options.

use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
use Patchlevel\EventSourcing\Store\DoctrineDbalStore;

/** @var Connection $connection */
$store = new DoctrineDbalStore(
    $connection,
    DefaultEventSerializer::createFromPaths(['src/Event']),
    null,
    [/** options */],
);

The following options are available in DoctrineDbalStore:

| Option | Type | Default | Description |
|---|---|---|---|
| table_name | string | eventstore | The name of the table in the database |
| aggregate_id_type | "uuid" or "string" | uuid | The type of the aggregate_id column |
| locking | bool | true | Whether the store should use locking for writing |
| lock_id | int | 133742 | The id of the lock |
| lock_timeout | int | -1 | The timeout of the lock. -1 means no timeout |
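
As an illustration, the options from the table can be written as a plain PHP array and passed in place of `[/** options */]` in the constructor call above. The values in this sketch are simply the documented defaults, so passing it should behave like passing no options at all; in practice you would only list the keys you want to change.

```php
// The documented defaults from the options table, spelled out explicitly.
$options = [
    'table_name' => 'eventstore',  // name of the event table
    'aggregate_id_type' => 'uuid', // 'uuid' or 'string'
    'locking' => true,             // use locking for writing
    'lock_id' => 133742,           // id of the lock
    'lock_timeout' => -1,          // -1 means no timeout
];
```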

Schema

The table structure of the DoctrineDbalStore looks like this:

| Column | Type | Description |
|---|---|---|
| id | bigint | The index of the whole stream (autoincrement) |
| aggregate | string | The name of the aggregate |
| aggregate_id | uuid/string | The id of the aggregate |
| playhead | int | The current playhead of the aggregate |
| event | string | The name of the event |
| payload | json | The payload of the event |
| recorded_on | datetime | The date when the event was recorded |
| new_stream_start | bool | Whether the event is the first event of the aggregate |
| archived | bool | Whether the event is archived |
| custom_headers | json | Custom headers for the event |

With the help of the SchemaDirector, the database structure can be created, updated and deleted.

Note

The default type of the aggregate_id column is uuid if the database supports it and string if not. If you want to use custom ids, set the aggregate_id_type option to string.

Tip

You can also use Doctrine Migrations to create your schema and keep it in sync.

Schema Director

The SchemaDirector is responsible for creating, updating and deleting the database schema. The DoctrineSchemaDirector is a concrete implementation of the SchemaDirector for Doctrine DBAL. It also implements the DryRunSchemaDirector interface, which returns the SQL statements that would be executed instead of running them.

use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
use Patchlevel\EventSourcing\Store\DoctrineDbalStore;

/**
 * @var Connection $connection
 * @var DoctrineDbalStore $store
 */
$schemaDirector = new DoctrineSchemaDirector(
    $connection,
    $store,
);

Note

How to set up CLI commands for the schema director is described in the CLI documentation.

Create schema

You can create the table from scratch using the create method.

use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;

/** @var DoctrineSchemaDirector $schemaDirector */
$schemaDirector->create();

Alternatively, the dryRunCreate method returns the SQL statements that would be necessary for this, either for a dry run or to define your own migrations.

use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;

/** @var DoctrineSchemaDirector $schemaDirector */
$sql = $schemaDirector->dryRunCreate();
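
To review the statements before running them, the result of the dry run can be printed. This is a minimal sketch that assumes dryRunCreate returns a list of SQL strings; the return type is not spelled out on this page, so check it against the interface before relying on it.

```php
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;

/** @var DoctrineSchemaDirector $schemaDirector */
// Assumption: each entry of the returned list is one SQL statement as a string.
foreach ($schemaDirector->dryRunCreate() as $statement) {
    echo $statement . ";\n";
}
```

The same loop should work for dryRunUpdate and dryRunDrop under the same assumption.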

Update schema

The update method compares the current state of the database with the desired table structure and executes the resulting diff to bring the table to that state.

use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;

/** @var DoctrineSchemaDirector $schemaDirector */
$schemaDirector->update();

Alternatively, the dryRunUpdate method returns the SQL statements that would be necessary for this.

use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;

/** @var DoctrineSchemaDirector $schemaDirector */
$sql = $schemaDirector->dryRunUpdate();

Drop schema

You can also delete the table with the drop method.

use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;

/** @var DoctrineSchemaDirector $schemaDirector */
$schemaDirector->drop();

Alternatively, the dryRunDrop method returns the SQL statements that would be necessary for this.

use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;

/** @var DoctrineSchemaDirector $schemaDirector */
$sql = $schemaDirector->dryRunDrop();

Doctrine Migrations

You can use Doctrine Migrations, which you may know from Doctrine ORM, to create your schema and keep it in sync. We provide a DoctrineMigrationSchemaProvider for Doctrine Migrations, so you only have to plug the pieces together.

use Doctrine\DBAL\Connection;
use Doctrine\Migrations\Configuration\Connection\ExistingConnection;
use Doctrine\Migrations\Configuration\Migration\ConfigurationLoader;
use Doctrine\Migrations\DependencyFactory;
use Doctrine\Migrations\Provider\SchemaProvider;
use Patchlevel\EventSourcing\Schema\DoctrineMigrationSchemaProvider;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
use Patchlevel\EventSourcing\Store\DoctrineDbalStore;

// event sourcing schema director configuration

/**
 * @var Connection $connection
 * @var DoctrineDbalStore $store
 */
$schemaDirector = new DoctrineSchemaDirector(
    $connection,
    $store,
);

$schemaProvider = new DoctrineMigrationSchemaProvider($schemaDirector);

// doctrine migration configuration

/** @var ConfigurationLoader $configLoader */
$dependencyFactory = DependencyFactory::fromConnection(
    $configLoader,
    new ExistingConnection($connection),
);

$dependencyFactory->setService(
    SchemaProvider::class,
    $schemaProvider,
);

Note

More information on how to configure Doctrine Migrations can be found in the Doctrine Migrations documentation.

Note

How to set up CLI commands for Doctrine Migrations is described in the CLI documentation.

Usage

The store has a few methods to interact with the database.

Load

You can load events from the store with the load method. This method returns a Stream object, which is an iterable collection of messages.

use Patchlevel\EventSourcing\Store\Store;

/** @var Store $store */
$stream = $store->load();

The load method also has a few parameters to filter, limit and sort the events.

use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Store;

/** @var Store $store */
$stream = $store->load(
    new Criteria(), // filter criteria
    100, // limit
    50, // offset
    true,  // latest first
);
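
The limit and offset parameters can be combined to read the store page by page. The following is a sketch, not a prescribed pattern: the page size of 100 is an arbitrary choice, and the total is taken once from the count method described further down, so events saved while the loop runs are not included.

```php
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Store;

/** @var Store $store */
$criteria = new Criteria(); // no filters
$pageSize = 100;            // arbitrary page size chosen for this example
$total = $store->count($criteria);

for ($offset = 0; $offset < $total; $offset += $pageSize) {
    // Oldest first: the fourth argument (latest first) is false.
    $stream = $store->load($criteria, $pageSize, $offset, false);

    foreach ($stream as $message) {
        $message->event(); // process the event of this page
    }
}
```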

Criteria

The Criteria object is used to filter the events.

use Patchlevel\EventSourcing\Store\Criteria\AggregateIdCriterion;
use Patchlevel\EventSourcing\Store\Criteria\AggregateNameCriterion;
use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion;

$criteria = new Criteria(
    new AggregateNameCriterion('profile'),
    new AggregateIdCriterion('e3e3e3e3-3e3e-3e3e-3e3e-3e3e3e3e3e3e'),
    new FromPlayheadCriterion(2),
    new FromIndexCriterion(100),
    new ArchivedCriterion(true),
);

Or you can use the criteria builder to create the criteria.

use Patchlevel\EventSourcing\Store\Criteria\CriteriaBuilder;

$criteria = (new CriteriaBuilder())
    ->aggregateName('profile')
    ->aggregateId('e3e3e3e3-3e3e-3e3e-3e3e-3e3e3e3e3e3e')
    ->fromPlayhead(2)
    ->fromIndex(100)
    ->archived(true)
    ->build();
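
Putting the pieces together, the events of a single aggregate can be loaded by building criteria and passing them to the load method. This sketch only combines calls already shown on this page; the aggregate name and id are the same placeholder values used above.

```php
use Patchlevel\EventSourcing\Store\Criteria\CriteriaBuilder;
use Patchlevel\EventSourcing\Store\Store;

/** @var Store $store */
// Restrict the result to one aggregate (placeholder name and id).
$criteria = (new CriteriaBuilder())
    ->aggregateName('profile')
    ->aggregateId('e3e3e3e3-3e3e-3e3e-3e3e-3e3e3e3e3e3e')
    ->build();

$stream = $store->load($criteria);

foreach ($stream as $message) {
    $message->event(); // get the event
}
```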

Stream

The load method returns a Stream object, which behaves like a generator: the messages are only loaded when they are needed.

use Patchlevel\EventSourcing\Store\Stream;

/** @var Stream $stream */
$stream->index(); // get the index of the stream
$stream->position(); // get the current position of the stream
$stream->current(); // get the current event
$stream->next(); // move to the next event
$stream->end(); // check if the stream is at the end

foreach ($stream as $message) {
    $message->event(); // get the event
}

Note

You can find more information about the Message object in the message documentation.

Warning

The stream cannot rewind, so you can only iterate over it once. If you want to iterate over it again, you have to call the load method again.
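
If the same messages are needed more than once and the result is small, one option is to buffer them in an array instead of loading again. The sketch below is plain PHP and does not use the library: a generator stands in for the Stream, since it is single-pass in the same way, and the event names are made up for illustration. A real Stream can be iterated with foreach, so passing it to iterator_to_array should work similarly.

```php
/**
 * Stand-in for a Stream: a plain PHP generator, which is also single-pass.
 *
 * @return Generator<int, string>
 */
function exampleStream(): Generator
{
    // Hypothetical event names, used only for this illustration.
    yield 'ProfileCreated';
    yield 'NameChanged';
    yield 'EmailChanged';
}

// Buffer the single-pass iterator once; false discards the original keys.
$messages = iterator_to_array(exampleStream(), false);

// The array can now be traversed as often as needed.
$firstPass = [];
foreach ($messages as $message) {
    $firstPass[] = $message;
}

$secondPass = [];
foreach ($messages as $message) {
    $secondPass[] = $message;
}
```

Buffering gives up the lazy loading described above, because every message is held in memory at once, so for large streams calling load again is usually the better choice.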

Count

You can count the number of events in the store with the count method.

use Patchlevel\EventSourcing\Store\Store;

/** @var Store $store */
$count = $store->count();

The count method also accepts criteria to filter the events.

use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Store;

/** @var Store $store */
$count = $store->count(
    new Criteria(), // filter criteria
);

Save

You can save a message with the save method.

use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\Store;

/**
 * @var Store $store
 * @var Message $message
 * @var Message $message1
 * @var Message $message2
 * @var Message $message3
 * @var list<Message> $messages
 */
$store->save($message);
$store->save($message1, $message2, $message3);
$store->save(...$messages);

Note

Saving happens in a transaction, so either all messages are saved or none.
By default, the store locks the table for writing during each save.

Tip

Use the transactional method if you want to call save multiple times in one transaction.

Delete & Update

It is not possible to delete or update events. In event sourcing, the events are immutable.

Transaction

You can also execute a function in a transaction. The store takes care of starting the transaction, committing it and, if necessary, rolling it back.

use Patchlevel\EventSourcing\Store\Store;

/** @var Store $store */
$store->transactional(static function () use ($command, $bankAccountRepository): void {
    $accountFrom = $bankAccountRepository->get($command->from());
    $accountTo = $bankAccountRepository->get($command->to());

    $accountFrom->transferMoney($command->to(), $command->amount());
    $accountTo->receiveMoney($command->from(), $command->amount());

    $bankAccountRepository->save($accountFrom);
    $bankAccountRepository->save($accountTo);
});

Note

By default, the store locks the table for writing during the transaction.

Tip

If you only want to save one aggregate, you don't have to use the transactional method. The save method of the store and the repository is already transactional.
