Command Bus

The Command Bus is an optional component in the Event Sourcing library that coordinates the execution of commands. It allows commands to be forwarded to the appropriate aggregates and their handlers to be invoked. This promotes a clear separation of responsibilities and simplifies the management of business logic.

Command

First of all, you need to create a command class. A command is a simple data transfer object that represents an intention to perform an action.

final class CreateProfile
{
    public function __construct(
        public readonly ProfileId $id,
        public readonly string $name,
    ) {
    }
}

Handler

Then you need to create a handler class. A handler is a class that contains the business logic for a command. It will be invoked when a command is dispatched. You need to mark the method that handles the command with the #[Handle] attribute.

use Patchlevel\EventSourcing\Attribute\Handle;

final class CreateProfileHandler
{
    #[Handle]
    public function __invoke(CreateProfile $command): void
    {
        // handle command
    }
}

Note

To use a service handler, you need to register it in the ServiceHandlerProvider.

Tip

A class can have multiple handle methods.
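
As a small illustration of this tip, the sketch below puts two handle methods on one service class. The names `ArchiveProfile`, `RestoreProfile`, and `ProfileArchiveHandler`, as well as the plain string ids, are hypothetical and exist only for this example. The reflection loop at the end is not the library's implementation; it only shows one way marked methods could be matched to commands, assuming the first parameter's type identifies the command.

```php
use Patchlevel\EventSourcing\Attribute\Handle;

// Hypothetical commands with plain string ids, to keep the sketch short.
final class ArchiveProfile
{
    public function __construct(public readonly string $id)
    {
    }
}

final class RestoreProfile
{
    public function __construct(public readonly string $id)
    {
    }
}

// One service class, two handle methods: one per command type.
final class ProfileArchiveHandler
{
    /** @var array<string, bool> archived flag per profile id */
    public array $archived = [];

    #[Handle]
    public function archive(ArchiveProfile $command): void
    {
        $this->archived[$command->id] = true;
    }

    #[Handle]
    public function restore(RestoreProfile $command): void
    {
        $this->archived[$command->id] = false;
    }
}

// Illustration only (not the library's code): collect the marked methods,
// keyed by the class of their first parameter.
$handlers = [];

foreach ((new ReflectionClass(ProfileArchiveHandler::class))->getMethods() as $method) {
    if ($method->getAttributes(Handle::class) === []) {
        continue; // not marked with #[Handle]
    }

    $commandType = $method->getParameters()[0]->getType();

    if ($commandType instanceof ReflectionNamedType) {
        $handlers[$commandType->getName()] = $method->getName();
    }
}
```

In a real setup the class would simply be registered in the ServiceHandlerProvider, and the lookup would be left to the library.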

Aggregate Handler

Another way to handle commands is to use the aggregates themselves. To do this, you need to mark the method that handles the command with the #[Handle] attribute.

Note

Aggregates are, of course, not services. The AggregateHandlerProvider creates the handlers from the aggregates for you. You can find out more about this in the providers section.

Create Aggregate

If you want to create a new aggregate, you need to create a static method that returns a new instance of the aggregate.

use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Handle;
use Patchlevel\EventSourcing\Attribute\Id;

#[Aggregate('profile')]
final class Profile extends BasicAggregateRoot
{
    #[Id]
    private ProfileId $id;
    private string $name;

    #[Handle]
    public static function create(CreateProfile $command): self
    {
        $self = new self();
        $self->recordThat(new ProfileCreated($command->id, $command->name));

        return $self;
    }

    // ... apply methods
}

Tip

You can find more information about aggregates in the aggregate documentation.

Update Aggregate

If you want to update an existing aggregate, you first need to mark the aggregate id with the #[Id] attribute in the command class. Otherwise, the handler does not know which aggregate to load.

use Patchlevel\EventSourcing\Attribute\Id;

final class ChangeProfileName
{
    public function __construct(
        #[Id]
        public readonly ProfileId $id,
        public readonly string $name,
    ) {
    }
}

Then you need to create a method that changes the aggregate state. Here too, you need to mark the method with the #[Handle] attribute.

use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Handle;
use Patchlevel\EventSourcing\Attribute\Id;

#[Aggregate('profile')]
final class Profile extends BasicAggregateRoot
{
    #[Id]
    private ProfileId $id;
    private string $name;

    #[Handle]
    public function changeName(ChangeProfileName $command): void
    {
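        // Validate using only the command data. A validator service is not
        // available here unless it is injected as an extra parameter
        // (see "Inject Service").
        if ($command->name === '') {
            throw new InvalidArgument();
        }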

        $this->recordThat(new NameChanged($command->name));
    }

    // ... apply methods
}

Inject Service

You can inject services into aggregate handler methods. Starting with the second parameter, the handler automatically tries to inject each service using a service locator. By default, it uses the fully qualified class name from the parameter type hint to find the service.

use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Handle;
use Psr\Clock\ClockInterface;

#[Aggregate('profile')]
final class Profile extends BasicAggregateRoot
{
    #[Handle]
    public static function create(
        CreateProfile $command,
        ClockInterface $clock,
    ): self {
        $self = new self();

        $self->recordThat(new ProfileCreated(
            $command->id,
            $command->name,
            $clock->now(),
        ));

        return $self;
    }

    // ... apply methods
}

Note

The service must be registered in the service locator.

Tip

You can inject multiple services into the handler method.

Alternatively, you can inject the service manually using the #[Inject] attribute, which takes the name of the service to inject.

use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Handle;
use Patchlevel\EventSourcing\Attribute\Inject;

#[Aggregate('profile')]
final class Profile extends BasicAggregateRoot
{
    #[Handle]
    public static function create(
        CreateProfile $command,
        #[Inject('name_validator')]
        NameValidator $nameValidator,
    ): self {
        $self = new self();

        if (!$nameValidator($command->name)) {
            throw new InvalidArgument();
        }

        $self->recordThat(new ProfileCreated($command->id, $command->name));

        return $self;
    }

    // ... apply methods
}

Note

Injection in handler methods is only possible with the AggregateHandlerProvider.
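
The default lookup described in this section can be pictured with a small standalone sketch. It is not the library's implementation: `callWithServices`, `ProfileSketch`, `RenameProfile`, and the array used as a service locator are hypothetical stand-ins, and `NameValidator` is given an assumed body. The sketch only shows the idea of resolving every parameter after the command by the fully qualified class name of its type hint.

```php
// Hypothetical service: accepts any non-empty name.
final class NameValidator
{
    public function __invoke(string $name): bool
    {
        return $name !== '';
    }
}

// Hypothetical command and target object, used only for this sketch.
final class RenameProfile
{
    public function __construct(public readonly string $name)
    {
    }
}

final class ProfileSketch
{
    public ?string $name = null;

    public function rename(RenameProfile $command, NameValidator $validator): void
    {
        if (!$validator($command->name)) {
            throw new InvalidArgumentException('invalid name');
        }

        $this->name = $command->name;
    }
}

/**
 * Calls $method on $target, passing the command first and resolving every
 * further parameter from $services by the class name of its type hint.
 *
 * @param array<class-string, object> $services
 */
function callWithServices(object $target, string $method, object $command, array $services): mixed
{
    $parameters = (new ReflectionMethod($target, $method))->getParameters();
    $arguments = [$command];

    // Skip the first parameter: it is the command itself.
    foreach (array_slice($parameters, 1) as $parameter) {
        $type = $parameter->getType();

        if (!$type instanceof ReflectionNamedType || !isset($services[$type->getName()])) {
            throw new LogicException('no service for $' . $parameter->getName());
        }

        $arguments[] = $services[$type->getName()];
    }

    return $target->{$method}(...$arguments);
}

$profile = new ProfileSketch();

callWithServices($profile, 'rename', new RenameProfile('Alice'), [
    NameValidator::class => new NameValidator(),
]);
```

A missing service fails loudly in this sketch, which mirrors the note above that the service must be registered in the service locator.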

Setup

We provide a SyncCommandBus that you can use to dispatch commands. You need to pass a HandlerProvider to the constructor.

use Patchlevel\EventSourcing\CommandBus\HandlerProvider;
use Patchlevel\EventSourcing\CommandBus\SyncCommandBus;

/** @var HandlerProvider $handlerProvider */
$commandBus = new SyncCommandBus($handlerProvider);

$commandBus->dispatch(new CreateProfile($profileId, 'name'));
$commandBus->dispatch(new ChangeProfileName($profileId, 'new name'));

Note

The SyncCommandBus is a synchronous command bus. It ensures that a command has been completely handled before the next handler is executed.
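
To make that ordering guarantee concrete, here is a minimal standalone sketch of a bus that queues commands dispatched from inside a running handler. `QueueingBus` is a hypothetical stand-in written for this page, not the library's SyncCommandBus, and the single closure handler and the `stdClass` commands are simplifying assumptions.

```php
// Hypothetical stand-in, NOT the library's SyncCommandBus.
final class QueueingBus
{
    /** @var list<object> commands waiting to be handled */
    private array $queue = [];
    private bool $processing = false;

    public function __construct(private readonly Closure $handler)
    {
    }

    public function dispatch(object $command): void
    {
        $this->queue[] = $command;

        if ($this->processing) {
            return; // a handler is still running; this command waits its turn
        }

        $this->processing = true;

        try {
            // Drain the queue, including commands added by handlers meanwhile.
            while ($this->queue !== []) {
                $next = array_shift($this->queue);
                ($this->handler)($next, $this);
            }
        } finally {
            $this->processing = false;
        }
    }
}

$log = [];

$bus = new QueueingBus(function (object $command, QueueingBus $bus) use (&$log): void {
    $log[] = 'start ' . $command->name;

    if ($command->name === 'first') {
        // Dispatching from inside a handler only enqueues the command.
        $bus->dispatch((object) ['name' => 'second']);
    }

    $log[] = 'end ' . $command->name;
});

$bus->dispatch((object) ['name' => 'first']);

// $log is now: start first, end first, start second, end second
```

The nested dispatch does not interrupt the first handler; the second command starts only after the first one has finished.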

Provider

There are different types of providers that you can use to register handlers.

Service Handler Provider

The classic way to handle commands is to use services. The ServiceHandlerProvider handles commands by invoking methods on services.

use Patchlevel\EventSourcing\CommandBus\ServiceHandlerProvider;

$provider = new ServiceHandlerProvider([
    new CreateProfileHandler(),
    new ChangeProfileNameHandler(
        new NameValidator(),
    ),
]);

Aggregate Handler Provider

The AggregateHandlerProvider is used to handle commands by invoking methods on aggregates. The special thing about it is that the aggregates themselves are not services, but the handler provider automatically creates suitable handler services for the aggregates.

use Patchlevel\EventSourcing\CommandBus\AggregateHandlerProvider;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry;
use Patchlevel\EventSourcing\Repository\RepositoryManager;

/**
 * @var AggregateRootRegistry $aggregateRootRegistry
 * @var RepositoryManager $repositoryManager
 */
$provider = new AggregateHandlerProvider(
    $aggregateRootRegistry,
    $repositoryManager,
);

Service Locator

If you want service injection in aggregate handler methods, you need to pass a service locator to the AggregateHandlerProvider. You can use any PSR-11 compatible container, or you can use our own ServiceLocator implementation.

use Patchlevel\EventSourcing\CommandBus\AggregateHandlerProvider;
use Patchlevel\EventSourcing\CommandBus\ServiceLocator;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry;
use Patchlevel\EventSourcing\Repository\RepositoryManager;

/**
 * @var AggregateRootRegistry $aggregateRootRegistry
 * @var RepositoryManager $repositoryManager
 */
$provider = new AggregateHandlerProvider(
    $aggregateRootRegistry,
    $repositoryManager,
    new ServiceLocator([
        'name_validator' => new NameValidator(),
    ]), // or other psr-11 compatible container
);

Tip

You can find suitable implementations of PSR-11 containers on Packagist.

Chain Handler Provider

The ChainHandlerProvider allows you to combine multiple handler providers.

use Patchlevel\EventSourcing\CommandBus\ChainHandlerProvider;

$provider = new ChainHandlerProvider([
    $serviceHandlerProvider,
    $aggregateHandlerProvider,
]);