Command Bus
The Command Bus is an optional component in the Event Sourcing library that coordinates the execution of commands. It allows commands to be forwarded to the appropriate aggregates and their handlers to be invoked. This promotes a clear separation of responsibilities and simplifies the management of business logic.
Command
First of all, you need to create a command class. A command is a simple data transfer object that represents an intention to perform an action.
final class CreateProfile
{
    public function __construct(
        public readonly ProfileId $id,
        public readonly string $name,
    ) {
    }
}
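As a small illustration, a command is constructed like any other immutable value object. The sketch below assumes a hypothetical ProfileId that simply wraps a string (this document does not show the real class) and repeats CreateProfile so that it runs on its own.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in: the document does not show ProfileId.
final class ProfileId
{
    public function __construct(public readonly string $value)
    {
    }
}

final class CreateProfile
{
    public function __construct(
        public readonly ProfileId $id,
        public readonly string $name,
    ) {
    }
}

$command = new CreateProfile(new ProfileId('profile-1'), 'Jane');

// Readonly properties can be read, but not reassigned after construction.
$rejected = false;

try {
    $command->name = 'John';
} catch (Error $e) {
    $rejected = true;
}
```

Because the command cannot change after it has been created, a handler can rely on receiving exactly the data that was dispatched.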
Handler
Then you need to create a handler class.
A handler is a class that contains the business logic for a command.
It is invoked when a command is dispatched.
You need to mark the method that handles the command with the #[Handle] attribute.
use Patchlevel\EventSourcing\Attribute\Handle;
final class CreateProfileHandler
{
    #[Handle]
    public function __invoke(CreateProfile $command): void
    {
        // handle command
    }
}
Note
To use a service handler, you need to register it in the ServiceHandlerProvider.
Tip
A class can have multiple handle methods.
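To make the tip concrete, here is a minimal sketch of one class with two handle methods. It is not the library's code: the Handle attribute is a local stand-in so the example runs without the library, handlerMap() is a hypothetical helper, and it assumes that a handler method is matched to a command by the type of its first parameter.

```php
<?php

declare(strict_types=1);

// Stand-in for Patchlevel\EventSourcing\Attribute\Handle so the sketch runs without the library.
#[Attribute(Attribute::TARGET_METHOD)]
final class Handle
{
}

final class CreateProfile
{
    public function __construct(public readonly string $id, public readonly string $name)
    {
    }
}

final class ChangeProfileName
{
    public function __construct(public readonly string $id, public readonly string $name)
    {
    }
}

final class ProfileHandler
{
    /** @var list<string> */
    public array $log = [];

    #[Handle]
    public function create(CreateProfile $command): void
    {
        $this->log[] = 'created ' . $command->name;
    }

    #[Handle]
    public function changeName(ChangeProfileName $command): void
    {
        $this->log[] = 'renamed to ' . $command->name;
    }
}

/**
 * Hypothetical helper: maps each command class to the method marked with #[Handle],
 * using the type of the method's first parameter (an assumption of this sketch).
 *
 * @return array<class-string, string>
 */
function handlerMap(object $handler): array
{
    $map = [];

    foreach ((new ReflectionClass($handler))->getMethods() as $method) {
        if ($method->getAttributes(Handle::class) === []) {
            continue; // not a handle method
        }

        $type = $method->getParameters()[0]->getType();

        if ($type instanceof ReflectionNamedType) {
            $map[$type->getName()] = $method->getName();
        }
    }

    return $map;
}

$handler = new ProfileHandler();
$map = handlerMap($handler);

// Route each command to the method registered for its class.
foreach ([new CreateProfile('1', 'Jane'), new ChangeProfileName('1', 'Janet')] as $command) {
    $handler->{$map[$command::class]}($command);
}
```

Grouping related handle methods in one class keeps shared dependencies in a single constructor, at the cost of a larger class.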
Aggregate Handler
Another way to handle commands is to use the aggregates themselves.
To do this, you need to mark the method that handles the command with the #[Handle] attribute.
Note
The aggregates themselves are, of course, not services. The AggregateHandlerProvider uses the aggregates to create the handlers for you. You can find out more about this in the Provider section.
Create Aggregate
If you want to create a new aggregate, you need to create a static method that returns a new instance of the aggregate.
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Handle;
use Patchlevel\EventSourcing\Attribute\Id;
#[Aggregate('profile')]
final class Profile extends BasicAggregateRoot
{
    #[Id]
    private ProfileId $id;
    private string $name;

    #[Handle]
    public static function create(CreateProfile $command): self
    {
        $self = new self();
        $self->recordThat(new ProfileCreated($command->id, $command->name));

        return $self;
    }

    // ... apply methods
}
Tip
You can find more information about aggregates in the aggregate documentation.
Update Aggregate
If you want to update an existing aggregate, you first need to mark the aggregate id with the #[Id] attribute in the command class.
Otherwise, the handler does not know which aggregate to load.
use Patchlevel\EventSourcing\Attribute\Id;
final class ChangeProfileName
{
    public function __construct(
        #[Id]
        public readonly ProfileId $id,
        public readonly string $name,
    ) {
    }
}
Then you need to mark the aggregate method that handles the command with the #[Handle] attribute.
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Handle;
use Patchlevel\EventSourcing\Attribute\Id;
#[Aggregate('profile')]
final class Profile extends BasicAggregateRoot
{
    #[Id]
    private ProfileId $id;
    private string $name;

    // $nameValidator is injected as a service, see "Inject Service"
    #[Handle]
    public function changeName(ChangeProfileName $command, NameValidator $nameValidator): void
    {
        if (!$nameValidator($command->name)) {
            throw new InvalidArgument();
        }

        $this->recordThat(new NameChanged($command->name));
    }

    // ... apply methods
}
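Conceptually, handling an update command comes down to three steps: read the id from the command, load the matching aggregate, and save it after the handle method has run. The following sketch illustrates this under simplifying assumptions and is not the library's implementation: the Id attribute is a local stand-in, aggregateIdOf() is a hypothetical helper, Profile is a plain class without event recording, and a PHP array replaces the repository.

```php
<?php

declare(strict_types=1);

// Stand-in for Patchlevel\EventSourcing\Attribute\Id so the sketch runs without the library.
#[Attribute]
final class Id
{
}

final class ChangeProfileName
{
    public function __construct(
        #[Id]
        public readonly string $id,
        public readonly string $name,
    ) {
    }
}

// Simplified aggregate without event recording, for illustration only.
final class Profile
{
    public function __construct(public string $name)
    {
    }

    public function changeName(ChangeProfileName $command): void
    {
        $this->name = $command->name;
    }
}

/** Hypothetical helper: returns the value of the property marked with #[Id]. */
function aggregateIdOf(object $command): string
{
    foreach ((new ReflectionObject($command))->getProperties() as $property) {
        if ($property->getAttributes(Id::class) !== []) {
            return $property->getValue($command);
        }
    }

    throw new LogicException('No property marked with #[Id] in ' . $command::class);
}

// In-memory stand-in for a repository: aggregate id => aggregate.
$store = ['profile-1' => new Profile('Jane')];

$command = new ChangeProfileName('profile-1', 'Janet');

$profile = $store[aggregateIdOf($command)]; // 1. load the aggregate by the command's id
$profile->changeName($command);             // 2. invoke the handle method
$store[aggregateIdOf($command)] = $profile; // 3. save the aggregate again
```

Without the #[Id] marker, step 1 has nothing to go on, which is why the attribute is required on update commands.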
Inject Service
You can inject services into aggregate handler methods. Starting with the second parameter, the handler automatically tries to inject each parameter as a service using a service locator. By default, it uses the fully qualified class name from the parameter type hint to find the service.
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Handle;
use Psr\Clock\ClockInterface;
#[Aggregate('profile')]
final class Profile extends BasicAggregateRoot
{
    #[Handle]
    public static function create(
        CreateProfile $command,
        ClockInterface $clock,
    ): self {
        $self = new self();
        $self->recordThat(new ProfileCreated(
            $command->id,
            $command->name,
            $clock->now(),
        ));

        return $self;
    }

    // ... apply methods
}
Note
The service must be registered in the service locator.
Tip
You can inject multiple services into the handler method.
Alternatively, you can inject the service manually using the #[Inject] attribute.
There you can specify the name of the service that should be injected.
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Handle;
use Patchlevel\EventSourcing\Attribute\Inject;
#[Aggregate('profile')]
final class Profile extends BasicAggregateRoot
{
    #[Handle]
    public static function create(
        CreateProfile $command,
        #[Inject('name_validator')]
        NameValidator $nameValidator,
    ): self {
        $self = new self();

        if (!$nameValidator($command->name)) {
            throw new InvalidArgument();
        }

        $self->recordThat(new ProfileCreated($command->id, $command->name));

        return $self;
    }

    // ... apply methods
}
Note
Injection in handler methods is only possible with the AggregateHandlerProvider.
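The two lookup rules described in this section (by type name, or by the name given in #[Inject]) can be sketched as follows. This is an illustration under stated assumptions, not the library's resolver: Inject is a local stand-in attribute, resolveServices() is a hypothetical helper, FixedClock stands in for a PSR-20 clock, Profile is a plain class, and a PHP array plays the role of the service locator.

```php
<?php

declare(strict_types=1);

// Stand-in for Patchlevel\EventSourcing\Attribute\Inject so the sketch runs without the library.
#[Attribute(Attribute::TARGET_PARAMETER)]
final class Inject
{
    public function __construct(public readonly string $service)
    {
    }
}

final class NameValidator
{
    public function __invoke(string $name): bool
    {
        return $name !== '';
    }
}

// Hypothetical clock with a fixed time, used instead of Psr\Clock\ClockInterface.
final class FixedClock
{
    public function now(): DateTimeImmutable
    {
        return new DateTimeImmutable('2024-01-01 00:00:00');
    }
}

final class CreateProfile
{
    public function __construct(public readonly string $name)
    {
    }
}

// Simplified aggregate without event recording, for illustration only.
final class Profile
{
    private function __construct(
        public readonly string $name,
        public readonly DateTimeImmutable $createdAt,
    ) {
    }

    public static function create(
        CreateProfile $command,
        FixedClock $clock,
        #[Inject('name_validator')]
        NameValidator $nameValidator,
    ): self {
        if (!$nameValidator($command->name)) {
            throw new InvalidArgumentException('Invalid name');
        }

        return new self($command->name, $clock->now());
    }
}

/**
 * Hypothetical resolver: skips the command parameter and looks up every
 * further parameter by its #[Inject] name or, failing that, by its type name.
 *
 * @param array<string, object> $services
 *
 * @return list<object>
 */
function resolveServices(ReflectionMethod $method, array $services): array
{
    $resolved = [];

    foreach (array_slice($method->getParameters(), 1) as $parameter) {
        $inject = $parameter->getAttributes(Inject::class)[0] ?? null;
        $type = $parameter->getType();

        $name = $inject !== null
            ? $inject->newInstance()->service
            : ($type instanceof ReflectionNamedType ? $type->getName() : '');

        $resolved[] = $services[$name] ?? throw new LogicException('Service not found: ' . $name);
    }

    return $resolved;
}

$services = [
    FixedClock::class => new FixedClock(),   // found by fully qualified class name
    'name_validator' => new NameValidator(), // found by the name given in #[Inject]
];

$method = new ReflectionMethod(Profile::class, 'create');
$profile = Profile::create(new CreateProfile('Jane'), ...resolveServices($method, $services));
```

Looking services up by class name needs no extra configuration, while an explicit name helps when several services share the same type.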
Setup
We provide a SyncCommandBus that you can use to dispatch commands.
You need to pass a HandlerProvider to the constructor.
use Patchlevel\EventSourcing\CommandBus\HandlerProvider;
use Patchlevel\EventSourcing\CommandBus\SyncCommandBus;
/** @var HandlerProvider $handlerProvider */
$commandBus = new SyncCommandBus($handlerProvider);
$commandBus->dispatch(new CreateProfile($profileId, 'name'));
$commandBus->dispatch(new ChangeProfileName($profileId, 'new name'));
Note
The SyncCommandBus is a synchronous command bus.
It ensures that a command has been completely handled before the next handler is executed.
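One plausible reading of this guarantee is that a command dispatched from inside a handler is deferred until the current handler has finished. The sketch below shows one way to get that behaviour with a queue. QueueingBus, First and Second are hypothetical names, and the class is not the library's SyncCommandBus.

```php
<?php

declare(strict_types=1);

// Hypothetical mini bus for illustration, NOT the library's SyncCommandBus.
final class QueueingBus
{
    /** @var list<object> */
    private array $queue = [];
    private bool $processing = false;

    /** @param array<class-string, callable(object): void> $handlers */
    public function __construct(private array $handlers)
    {
    }

    public function dispatch(object $command): void
    {
        $this->queue[] = $command;

        if ($this->processing) {
            return; // a handler is running; the command waits in the queue
        }

        $this->processing = true;

        try {
            // Work through the queue one command at a time.
            while ($this->queue !== []) {
                $next = array_shift($this->queue);
                ($this->handlers[$next::class])($next);
            }
        } finally {
            $this->processing = false;
        }
    }
}

final class First
{
}

final class Second
{
}

$log = [];
$bus = null;

$bus = new QueueingBus([
    First::class => function (First $command) use (&$log, &$bus): void {
        $log[] = 'first: start';
        $bus->dispatch(new Second()); // queued, not executed immediately
        $log[] = 'first: end';
    },
    Second::class => function (Second $command) use (&$log): void {
        $log[] = 'second';
    },
]);

$bus->dispatch(new First());
```

In this sketch the handler for Second only runs after the handler for First has returned, so the two never interleave.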
Provider
There are different types of providers that you can use to register handlers.
Service Handler Provider
The classic way to handle commands is to use services.
The ServiceHandlerProvider is used to handle commands by invoking methods on services.
use Patchlevel\EventSourcing\CommandBus\ServiceHandlerProvider;
$provider = new ServiceHandlerProvider([
    new CreateProfileHandler(),
    new ChangeProfileNameHandler(
        new NameValidator(),
    ),
]);
Aggregate Handler Provider
The AggregateHandlerProvider is used to handle commands by invoking methods on aggregates.
What makes it special is that the aggregates themselves are not services; instead, the handler provider automatically creates suitable handler services for them.
use Patchlevel\EventSourcing\CommandBus\AggregateHandlerProvider;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry;
use Patchlevel\EventSourcing\Repository\RepositoryManager;
/**
 * @var AggregateRootRegistry $aggregateRootRegistry
 * @var RepositoryManager $repositoryManager
 */
$provider = new AggregateHandlerProvider(
    $aggregateRootRegistry,
    $repositoryManager,
);
Service Locator
If you want service injection in aggregate handler methods, you need to pass a service locator to the AggregateHandlerProvider.
You can use any PSR-11 compatible container, or you can use our ServiceLocator implementation.
use Patchlevel\EventSourcing\CommandBus\AggregateHandlerProvider;
use Patchlevel\EventSourcing\CommandBus\ServiceLocator;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry;
use Patchlevel\EventSourcing\Repository\RepositoryManager;
/**
 * @var AggregateRootRegistry $aggregateRootRegistry
 * @var RepositoryManager $repositoryManager
 */
$provider = new AggregateHandlerProvider(
    $aggregateRootRegistry,
    $repositoryManager,
    new ServiceLocator([
        'name_validator' => new NameValidator(),
    ]), // or another PSR-11 compatible container
);
Tip
You can find suitable implementations of PSR-11 containers on Packagist.
Chain Handler Provider
The ChainHandlerProvider allows you to combine multiple handler providers.
use Patchlevel\EventSourcing\CommandBus\ChainHandlerProvider;
$provider = new ChainHandlerProvider([
    $serviceHandlerProvider,
    $aggregateHandlerProvider,
]);