Event Bus

This library is built around an event bus.

Whenever events are persisted (that is, when the save method has been executed on the repository), each event is wrapped in a message and dispatched to the event bus. All listeners are then called for each message.

Message

A message contains the event together with related meta information:

  • aggregate class
  • aggregate id
  • playhead
  • event
  • recorded on
  • custom headers

Each event is packed into a message and dispatched using the event bus.

use Patchlevel\EventSourcing\Clock\SystemClock;
use Patchlevel\EventSourcing\EventBus\Message;

$clock = new SystemClock();
$message = Message::create(new NameChanged('foo'))
    ->withAggregateClass(Profile::class)
    ->withAggregateId('bca7576c-536f-4428-b694-7b1f00c714b7')
    ->withPlayhead(2)
    ->withRecordedOn($clock->now());

$eventBus->dispatch($message);

Note

The message object is immutable.

You don't have to create the message yourself; the repository creates, saves and dispatches it automatically.
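To make the immutability note concrete, here is a minimal sketch of the pattern. It is not the library's implementation: `SketchMessage` is a hypothetical stand-in class, and it assumes that each `with*` call clones the object and returns the modified copy.

```php
// Hypothetical stand-in for an immutable message; NOT the library's Message class.
final class SketchMessage
{
    private ?int $playhead = null;

    private function __construct(private object $event)
    {
    }

    public static function create(object $event): self
    {
        return new self($event);
    }

    // Returns a modified copy; the instance it is called on is left untouched.
    public function withPlayhead(int $playhead): self
    {
        $copy = clone $this;
        $copy->playhead = $playhead;

        return $copy;
    }

    public function playhead(): ?int
    {
        return $this->playhead;
    }
}

$original = SketchMessage::create(new stdClass());
$changed = $original->withPlayhead(2);
```

The practical consequence is that the return value of a `with*` call must be used; calling it without assigning the result has no effect on the original message.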

Custom headers

You can also enrich the message with your own headers or metadata. This information is then accessible in the message object and is also stored in the database.

use Patchlevel\EventSourcing\EventBus\Message;

$message = Message::create(new NameChanged('foo'))
    // ...
    ->withCustomHeader('application-id', 'app');

Note

You can read about how to pass additional headers to the message object in the message decorator docs.

You can also access your custom headers.

use Patchlevel\EventSourcing\EventBus\Message;

$message->customHeader('application-id'); // app
$message->customHeaders(); // ['application-id' => 'app']

Event Bus

Default event bus

The library also delivers a light-weight event bus for which you can register listeners/subscribers and dispatch events.

use Patchlevel\EventSourcing\EventBus\DefaultEventBus;

$eventBus = new DefaultEventBus();
$eventBus->addListener($mailListener);
$eventBus->addListener($projectionListener);

Note

You can determine the order in which the listeners are executed. For example, you can also add listeners after ProjectionListener to access the projections.
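The idea of ordered listeners can be sketched as follows. This is a simplified illustration, not the library's DefaultEventBus: `SketchEventBus` is a hypothetical class, and it assumes that listeners are invoked in the order in which they were added.

```php
// Hypothetical minimal bus; NOT the library's DefaultEventBus.
final class SketchEventBus
{
    /** @var list<callable(object): void> */
    private array $listeners = [];

    public function addListener(callable $listener): void
    {
        $this->listeners[] = $listener;
    }

    public function dispatch(object $message): void
    {
        // Assumption: listeners run in registration order.
        foreach ($this->listeners as $listener) {
            $listener($message);
        }
    }
}

$calls = [];
$bus = new SketchEventBus();

// Registered first, so it runs first and can prepare the projections.
$bus->addListener(function (object $message) use (&$calls): void {
    $calls[] = 'projection';
});

// Registered second, so it could rely on the work of the first listener.
$bus->addListener(function (object $message) use (&$calls): void {
    $calls[] = 'mail';
});

$bus->dispatch(new stdClass());
```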

Symfony event bus

You can also use the Symfony message bus, which is much more powerful.

To use the optional Symfony Messenger, you first have to install the package.

composer require symfony/messenger

You can either let us build it with the create factory:

use Patchlevel\EventSourcing\EventBus\SymfonyEventBus;

$eventBus = SymfonyEventBus::create([
    $mailListener,
    $projectionListener
]);

Note

You can determine the order in which the listeners are executed. For example, you can also add listeners after ProjectionListener to access the projections.

Or plug it together by hand:

use Patchlevel\EventSourcing\EventBus\SymfonyEventBus;

$symfonyMessenger = //...

$eventBus = new SymfonyEventBus($symfonyMessenger);

Warning

You can't share this bus with a command bus. Create a separate message bus for the event bus.

Note

An event can have zero or more listeners. You should therefore configure the HandleMessageMiddleware to allow messages that have no handler.

Listener

A listener must implement the Listener interface and define the __invoke method. The listener is then called for every saved event/message.

use Patchlevel\EventSourcing\EventBus\Listener;
use Patchlevel\EventSourcing\EventBus\Message;

final class WelcomeListener implements Listener 
{
    public function __invoke(Message $message): void
    {
        if ($message->event() instanceof ProfileCreated) {
            echo 'Welcome!';
        }
    }
}

Warning

If you only want to listen to certain messages, you have to check for them in the __invoke method or use a subscriber.

Subscriber

A subscriber is a listener that already implements the __invoke method itself. Instead of writing __invoke, you define one or more methods of your own and mark each with the Handle attribute to listen for specific events.

use Patchlevel\EventSourcing\Attribute\Handle;
use Patchlevel\EventSourcing\EventBus\Subscriber;
use Patchlevel\EventSourcing\EventBus\Message;

final class WelcomeSubscriber extends Subscriber 
{
    #[Handle(ProfileCreated::class)]
    public function onProfileCreated(Message $message): void
    {
        echo 'Welcome!';
    }
}
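
How attribute-based routing of this kind can work is sketched below. This is not the library's Subscriber implementation: `SketchHandle`, `routeEvent` and the event classes are hypothetical names, and the sketch passes the event directly instead of a message to stay self-contained. It uses PHP's reflection API to find the methods whose attribute matches the class of the event.

```php
// Hypothetical attribute; stands in for the library's Handle attribute.
#[Attribute(Attribute::TARGET_METHOD)]
final class SketchHandle
{
    public function __construct(public string $eventClass)
    {
    }
}

// Hypothetical events used only for this illustration.
final class SketchProfileCreated
{
}

final class SketchNameChanged
{
}

final class SketchWelcomeHandlers
{
    /** @var list<string> */
    public array $log = [];

    #[SketchHandle(SketchProfileCreated::class)]
    public function onProfileCreated(object $event): void
    {
        $this->log[] = 'welcome';
    }
}

// Calls every method whose SketchHandle attribute names the event's class.
function routeEvent(object $handlers, object $event): void
{
    $reflection = new ReflectionObject($handlers);

    foreach ($reflection->getMethods() as $method) {
        foreach ($method->getAttributes(SketchHandle::class) as $attribute) {
            if ($attribute->newInstance()->eventClass === $event::class) {
                $method->invoke($handlers, $event);
            }
        }
    }
}

$handlers = new SketchWelcomeHandlers();
routeEvent($handlers, new SketchProfileCreated()); // handled by onProfileCreated
routeEvent($handlers, new SketchNameChanged());    // no matching method, ignored
```

Events without a matching method are silently skipped here, which mirrors the idea that a subscriber only reacts to the events it declares.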