Aggregate
The linchpin of event-sourcing is the aggregate. These aggregates can be imagined like entities in ORM. One main difference is that we don't save the current state, but only the individual events that led to the state. This means it is always possible to build the current state again from the events.
Note
The term aggregate itself comes from DDD and has nothing to do with event sourcing and can be used independently as a pattern. You can find out more about Aggregates here.
An aggregate must fulfill a few points so that we can use it in event-sourcing:
- It must implement the
AggregateRoot
interface. - It needs a unique identifier.
- It needs to provide the current playhead.
- It must make changes to his state available as events.
- And rebuild/catchup its state from the events.
We can implement this ourselves, or use the BasicAggregateRoot
implementation that already brings everything with it.
This basic implementation uses attributes to configure the aggregate and to specify how it should handle events.
We are building a minimal aggregate class here which only has an ID and mark this with the Id
attribute.
To make it easy to register with a name, we also add the Aggregate
attribute. This is what it looks like:
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Aggregate\Uuid;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Id;
#[Aggregate('profile')]
final class Profile extends BasicAggregateRoot
{
#[Id]
private Uuid $id;
public static function register(Uuid $id): self
{
$self = new self();
$self->id = $id; // we need to set the id temporary here for the basic example and will be replaced later.
return $self;
}
}
Warning
The aggregate is not yet finished and has only been built to the point that you can instantiate the object.
Tip
Find out more about aggregate IDs here.
We use a so-called named constructor here to create an object of the AggregateRoot.
The constructor itself is protected and cannot be called from outside.
But it is possible to define different named constructors for different use-cases like import
.
After the basic structure for an aggregate is in place, it could theoretically be saved:
use Patchlevel\EventSourcing\Repository\Repository;
final class CreateProfileHandler
{
public function __construct(
private readonly Repository $profileRepository,
) {
}
public function __invoke(CreateProfile $command): void
{
$profile = Profile::register($command->id());
$this->profileRepository->save($profile);
}
}
Warning
If you look in the database now, you would see that nothing has been saved. This is because only events are stored in the database and as long as no events exist, nothing happens.
Tip
A command bus system is not necessary, only recommended. The interaction can also easily take place in a controller or service.
Create a new aggregate
In order that an aggregate is actually saved, at least one event must exist in the DB.
For our aggregate we create the Event ProfileRegistered
with an ID and a name.
We also give the event a unique name using the Event
attribute.
use Patchlevel\EventSourcing\Aggregate\Uuid;
use Patchlevel\EventSourcing\Attribute\Event;
#[Event('profile.registered')]
final class ProfileRegistered
{
public function __construct(
public readonly Uuid $profileId,
public readonly string $name,
) {
}
}
Note
You can find out more about events here.
After we have defined the event, we have to adapt the profile aggregate:
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Aggregate\Uuid;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Apply;
use Patchlevel\EventSourcing\Attribute\Id;
#[Aggregate('profile')]
final class Profile extends BasicAggregateRoot
{
#[Id]
private Uuid $id;
private string $name;
public function name(): string
{
return $this->name;
}
public static function register(Uuid $id, string $name): self
{
$self = new self();
$self->recordThat(new ProfileRegistered($id, $name));
return $self;
}
#[Apply]
protected function applyProfileRegistered(ProfileRegistered $event): void
{
$this->id = $event->profileId;
$this->name = $event->name;
}
}
Tip
Prefixing the apply methods with "apply" improves readability.
In our named constructor register
we have now created the event and recorded it with the method recordThat
.
The aggregate remembers all new recorded events in order to save them later.
At the same time, a defined apply
method is executed directly so that we can change our state.
So that the AggregateRoot also knows which method it should call,
we have to mark it with the Apply
attribute. We did that in the applyProfileRegistered
method.
In there we then change the state of the aggregate by filling the properties with the values from the event.
Success
The aggregate is now ready to be saved!
Modify an aggregate
In order to change the state of the aggregates afterwards, only further events have to be defined.
As example we can add a NameChanged
event:
use Patchlevel\EventSourcing\Attribute\Event;
#[Event('profile.name_changed')]
final class NameChanged
{
public function __construct(
public readonly string $name,
) {
}
}
Note
Events should best be written in the past, as they describe a state that has happened.
After we have defined the event, we can define a new public method called changeName
to change the profile name.
This method then creates the event NameChanged
and records it:
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Aggregate\Uuid;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Apply;
use Patchlevel\EventSourcing\Attribute\Id;
#[Aggregate('profile')]
final class Profile extends BasicAggregateRoot
{
#[Id]
private Uuid $id;
private string $name;
public function name(): string
{
return $this->name;
}
public static function register(Uuid $id, string $name): static
{
$self = new static();
$self->recordThat(new ProfileRegistered($id, $name));
return $self;
}
public function changeName(string $name): void
{
$this->recordThat(new NameChanged($name));
}
#[Apply]
protected function applyProfileRegistered(ProfileRegistered $event): void
{
$this->id = $event->profileId;
$this->name = $event->name;
}
#[Apply]
protected function applyNameChanged(NameChanged $event): void
{
$this->name = $event->name;
}
}
apply
method named applyNameChanged
where we change the name depending on the value in the event.
When using it, it can look like this:
use Patchlevel\EventSourcing\Repository\Repository;
final class ChangeNameHandler
{
public function __construct(private Repository $profileRepository)
{
}
public function __invoke(ChangeName $command): void
{
$profile = $this->profileRepository->load($command->id());
$profile->changeName($command->name());
$this->profileRepository->save($profile);
}
}
Success
Our aggregate can now be changed and saved.
Note
You can read more about Repository here.
Here the aggregate is loaded from the repository
by fetching all events from the database.
These events are then executed again with the apply
methods in order to rebuild the current state.
All of this happens automatically in the load
method.
The method changeName
is then executed on the aggregate to change the name.
In this method the event NameChanged
is generated and recorded.
The applyNameChanged
method was also called again internally to adjust the state.
When the save
method is called on the repository,
all newly recorded events are then fetched and written to the database.
In this specific case only the NameChanged
changed event.
Multiple apply attributes on the same method
You can also define several apply attributes with different events using the same method.
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Apply;
#[Aggregate('profile')]
final class Profile extends BasicAggregateRoot
{
// ...
#[Apply(ProfileCreated::class)]
#[Apply(NameChanged::class)]
protected function applyProfileCreated(ProfileCreated|NameChanged $event): void
{
if ($event instanceof ProfileCreated) {
$this->id = $event->profileId;
}
$this->name = $event->name;
}
}
Suppress missing apply methods
Sometimes you have events that do not change the state of the aggregate itself,
but are still recorded for the future or to subscribe for processor and projection.
So that you are not forced to write an apply method for it,
you can suppress the missing apply exceptions these events with the SuppressMissingApply
attribute.
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Apply;
use Patchlevel\EventSourcing\Attribute\SuppressMissingApply;
#[Aggregate('profile')]
#[SuppressMissingApply([NameChanged::class])]
final class Profile extends BasicAggregateRoot
{
// ...
#[Apply]
protected function applyProfileCreated(ProfileCreated $event): void
{
$this->id = $event->profileId;
$this->name = $event->name;
}
}
Suppress missing apply for all methods
You can also completely deactivate the exceptions for missing apply methods.
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Apply;
use Patchlevel\EventSourcing\Attribute\SuppressMissingApply;
#[Aggregate('profile')]
#[SuppressMissingApply(SuppressMissingApply::ALL)]
final class Profile extends BasicAggregateRoot
{
// ...
#[Apply]
protected function applyProfileCreated(ProfileCreated $event): void
{
$this->id = $event->profileId;
$this->name = $event->name;
}
}
Warning
When all events are suppressed, debugging becomes more difficult if you forget an apply method.
Stream Name
Warning
The stream name
works only with the StreamDoctrineDbalStore.
The stream name is the name of the stream in the event store.
By default, the stream name has the format aggregateName-aggregateId
.
But you can also define your own stream name with the Stream
attribute.
You can use the placeholder {id}
to insert the aggregate id into the stream name.
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Stream;
#[Aggregate('profile')]
#[Stream('profile-{id}')]
final class Profile extends BasicAggregateRoot
{
// ...
}
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Attribute\Aggregate;
#[Aggregate('guest_list')]
#[Stream(Meeting::class)]
final class GuestList extends BasicAggregateRoot
{
// ...
}
Tip
You can find more about splitting aggregates here.
Business rules
Usually, aggregates have business rules that must be observed. Like there may not be more than 10 people in a group.
These rules must be checked before an event is recorded. As soon as an event was recorded, the described thing happened and cannot be undone.
A further check in the apply method is also not possible because these events have already happened and were then also saved in the database.
In the next example we want to make sure that the name is at least 3 characters long:
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Apply;
#[Aggregate('profile')]
final class Profile extends BasicAggregateRoot
{
// ...
public function changeName(string $name): void
{
if (strlen($name) < 3) {
throw new NameIsToShortException($name);
}
$this->recordThat(new NameChanged($name));
}
#[Apply]
protected function applyNameChanged(NameChanged $event): void
{
$this->name = $event->name;
}
}
Danger
Validations during "apply" should not happen, they will break the rebuilding of the aggregate! Instead validate the data before the event will be recorded.
We have now ensured that this rule takes effect when a name is changed with the method changeName
.
But when we create a new profile this rule does not currently apply.
In order for this to work, we either have to duplicate the rule or outsource it. Here we show how we can do it all with a value object:
final class Name
{
public function __construct(private string $value)
{
if (strlen($value) < 3) {
throw new NameIsToShortException($value);
}
}
public function toString(): string
{
return $this->value;
}
}
Name
in our aggregate:
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Aggregate\Uuid;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Apply;
use Patchlevel\EventSourcing\Attribute\Id;
#[Aggregate('profile')]
final class Profile extends BasicAggregateRoot
{
#[Id]
private Uuid $id;
private Name $name;
public static function register(Uuid $id, Name $name): static
{
$self = new static();
$self->recordThat(new ProfileRegistered($id, $name));
return $self;
}
// ...
public function name(): Name
{
return $this->name;
}
public function changeName(Name $name): void
{
$this->recordThat(new NameChanged($name));
}
#[Apply]
protected function applyNameChanged(NameChanged $event): void
{
$this->name = $event->name;
}
}
NameChanged
event,
since we only expected a string before but now passed a Name
value object.
use Patchlevel\EventSourcing\Attribute\Event;
#[Event('profile.name_changed')]
final class NameChanged
{
public function __construct(
#[NameNormalizer]
public readonly Name $name,
) {
}
}
Warning
You need to create a normalizer for the Name
value object.
So the payload must be serializable and unserializable as json.
Note
You can find out more about normalizer here.
There are also cases where business rules have to be defined depending on the aggregate state.
Sometimes also from states, which were changed in the same method.
This is not a problem, as the apply
methods are always executed immediately.
In the next case we throw an exception if the hotel is already overbooked.
Besides that, we record another event FullyBooked
, if the hotel is fully booked with the last booking.
With this event we could notify external systems
or fill a projection with fully booked hotels.
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Apply;
use Patchlevel\EventSourcing\Attribute\SuppressMissingApply;
#[Aggregate('hotel')]
#[SuppressMissingApply([FullyBooked::class])]
final class Hotel extends BasicAggregateRoot
{
private const SIZE = 5;
private int $people;
// ...
public function book(string $name): void
{
if ($this->people === self::SIZE) {
throw new NoPlaceException($name);
}
$this->recordThat(new RoomBocked($name));
if ($this->people !== self::SIZE) {
return;
}
$this->recordThat(new FullyBooked());
}
#[Apply]
protected function applyRoomBocked(RoomBocked $event): void
{
$this->people++;
}
}
Working with dates
An aggregate should always be deterministic. In other words, whenever I execute methods on the aggregate, I always get the same result. This also makes testing much easier.
But that often doesn't seem to be possible, e.g. if you want to save a createAt date. But you can pass this information by yourself.
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Aggregate\Uuid;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Id;
#[Aggregate('profile')]
final class Profile extends BasicAggregateRoot
{
#[Id]
private Uuid $id;
private Name $name;
private DateTimeImmutable $registeredAt;
public static function register(Uuid $id, string $name, DateTimeImmutable $registeredAt): static
{
$self = new static();
$self->recordThat(new ProfileRegistered($id, $name, $registeredAt));
return $self;
}
// ...
}
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Aggregate\Uuid;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Id;
use Psr\Clock\ClockInterface;
#[Aggregate('profile')]
final class Profile extends BasicAggregateRoot
{
#[Id]
private Uuid $id;
private Name $name;
private DateTimeImmutable $registeredAt;
public static function register(Uuid $id, string $name, ClockInterface $clock): static
{
$self = new static();
$self->recordThat(new ProfileRegistered($id, $name, $clock->now()));
return $self;
}
// ...
}
SystemClock
to determine the current time.
Or for test purposes the FrozenClock
, which always returns the same time.
Note
You can find out more about clock here.
Splitting Aggregates
In some cases, it makes sense to split an aggregate into several smaller aggregates. This can be the case if the aggregate becomes too large or if the aggregate is used in different contexts. We currently support two patterns for this: Micro Aggregates and Child Aggregates (experimental).
Micro Aggregates
Warning
This feature works only with the StreamDoctrineDbalStore.
Micro Aggregates are a pattern to split an aggregate into several smaller aggregates. Each of these aggregates is saved in the same stream. This gives the Micro Aggregates the ability to independently manage their state and trigger their events, but still allows the associated Micro Aggregates to listen to the events in order to enforce their own business rules.
In the following example, we have an Order
micro aggregate and a Shipping
micro aggregate.
The order handle the order itself and the shipping handle the shipping of the order.
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Aggregate\Uuid;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Apply;
use Patchlevel\EventSourcing\Attribute\Id;
#[Aggregate('order')]
final class Order extends BasicAggregateRoot
{
#[Id]
private Uuid $id;
public static function create(Uuid $id): static
{
$self = new static();
$self->recordThat(new OrderCreated($id));
return $self;
}
#[Apply]
public function applyOrderCreated(OrderCreated $event): void
{
$this->id = $event->id;
}
}
Shipping
aggregate listens to the OrderCreated
event to initialize itself.
use Patchlevel\EventSourcing\Aggregate\Uuid;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Apply;
use Patchlevel\EventSourcing\Attribute\Id;
use Patchlevel\EventSourcing\Attribute\Stream;
#[Aggregate('shipping')]
#[Stream(Order::class)]
final class Shipping extends BasicChildAggregate
{
#[Id]
private Uuid $id;
private bool $arrived = false;
public function arrive(): void
{
$this->recordThat(new Arrived());
}
#[Apply]
public function applyOrderCreated(OrderCreated $event): void
{
$this->id = $event->id;
}
#[Apply]
public function applyArrived(Arrived $event): void
{
$this->arrived = true;
}
public function isArrived(): bool
{
return $this->arrived;
}
}
Child Aggregates
Experimental
This feature is still experimental and may change in the future. Use it with caution.
Another way to split an aggregate is to use child aggregates. The difference to Micro Aggregates, child aggregates can only be accessed by the root aggregate and are not separate aggregates.
In the following example, we have an Order
aggregate that has a Shipping
child aggregate.
use Patchlevel\EventSourcing\Aggregate\BasicChildAggregate;
use Patchlevel\EventSourcing\Attribute\Apply;
final class Shipping extends BasicChildAggregate
{
private bool $arrived = false;
public function __construct(
private string $trackingId,
) {
}
public function arrive(): void
{
$this->recordThat(new Arrived());
}
#[Apply]
public function applyArrived(Arrived $event): void
{
$this->arrived = true;
}
public function isArrived(): bool
{
return $this->arrived;
}
}
Warning
The apply method must be public, otherwise the root aggregate cannot call it.
Note
Supress missing apply methods need to be defined in the root aggregate.
And the Order
aggregate root looks like this:
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Aggregate\Uuid;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Apply;
use Patchlevel\EventSourcing\Attribute\ChildAggregate;
use Patchlevel\EventSourcing\Attribute\Id;
#[Aggregate('order')]
final class Order extends BasicAggregateRoot
{
#[Id]
private Uuid $id;
#[ChildAggregate]
private Shipping $shipping;
public static function create(Uuid $id, string $trackingId): static
{
$self = new static();
$self->recordThat(new OrderCreated($id, $trackingId));
return $self;
}
#[Apply]
public function applyOrderCreated(OrderCreated $event): void
{
$this->shipping = new Shipping($event->trackingId);
}
public function arrive(): void
{
$this->shipping->arrive();
}
}
Aggregate Root Registry
The library needs to know about all aggregates so that the correct aggregate class is used to load from the database.
There is an AggregateRootRegistry
for this purpose. The registry is a simple hashmap between aggregate name and aggregate class.
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry;
$aggregateRegistry = new AggregateRootRegistry([
'profile' => Profile::class,
]);
AttributeAggregateRootRegistryFactory
is used.
There, with the help of paths, all classes with the attribute Aggregate
are searched for
and the AggregateRootRegistry
is built up.
use Patchlevel\EventSourcing\Metadata\Event\AttributeEventRegistryFactory;
$aggregateRegistry = (new AttributeEventRegistryFactory())->create([/* paths... */]);