Motivation for organizing the business logic

Building a web app is pretty standardized these days. Every incoming request is routed to its own controller, which processes it and generates a response. In a basic app, all the code is written inside the controller, and no further organization is necessary. As the app grows, the code living inside the controller becomes more complex. That is the time to use techniques that reduce code repetition and improve readability and maintainability.

Message bus image

Basic app

Basic apps use a simple app controller.

class MyController implements Controller {
    // ...

    public function process(Request $request) : Response {
        // process request

        return new Response(
            // ...
        );
    }
}

The first step in improving the code is to move some of the logic into private helper methods of the class. This gives each method a single responsibility, but it does not remove code repetition across controllers, because private helpers cannot be shared between classes.

class MyController implements Controller {
    // ...

    public function process(Request $request) : Response {
        // process request
        $this->processHelper();

        return new Response(
            // ...
        );
    }

    private function processHelper() : void {
        // ...
    }
}

Command query segregation

When we realize this is not enough, we probably want to move the logic into a new layer. At this point, it is wise to distinguish between reading and writing logic from the storage point of view. The reading logic is often called a Query, and the writing logic a Command. A Query serves the requested data to the caller, and a Command is responsible for some action or change.

// query controller
class QueryController implements Controller {
    
    public function __construct(
        private MyQuery $query
    ) {}

    public function process(Request $request) : Response {
        $result = $this->query->get(
            // populate from request
        );

        return new Response(
            // populate from query result
        );
    }
}

// command controller
class CommandController implements Controller {
    
    public function __construct(
        private MyCommand $command
    ) {}

    public function process(Request $request) : Response {
        try {
            $this->command->execute(
                // populate from request
            );
        } catch (\Throwable $e) {
            return new Response(
                // negative response
            );
        }

        return new Response(
            // positive response
        );
    }
}

Both layers can be mixed within the controller. Thus, you can call the command, then request some affected data using a query, and return it in the controller’s response. This approach offers the benefit of isolating the controller from database calls, API calls, mail service, SMS service, integration tools, etc.

Command query layering

We will not cover the Query part but dive deeper into the Command part. To better organize the Command part, we use a message bus.

Command to message bus

Basic idea behind the message bus

First, the message is created and populated with parameters. This message is then sent to a message bus. There is no return value, and you, as a client, don’t know what happens inside; it is a black box. When implementing the message bus, you choose whether it is called synchronously or asynchronously. A synchronous call means that processing begins immediately and you wait for it to end, so you know whether it succeeded or failed. There is still no return value. An asynchronous implementation can even run on a different machine, fully isolated from the client: you just drop the message there and don’t wait for the result. In a classical web app, the synchronous way is usually the better choice, since you probably want to know if something went wrong and show a message to the end user.
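
The synchronous variant can be sketched in a few lines. This is only an illustration: SynchronousBus, its register method, and the RenameUser message are hypothetical names and not part of any library. It shows that handle returns nothing and that the caller learns about a failure only through an exception.

```php
<?php
declare(strict_types=1);

// Hypothetical message: a plain object that only carries parameters.
final class RenameUser
{
    public function __construct(
        public int $userId,
        public string $newName
    ) {}
}

// Hypothetical synchronous bus; not the SimpleBus API.
final class SynchronousBus
{
    /** @var array<string, callable> */
    private array $handlers = [];

    public function register(string $messageClass, callable $handler): void
    {
        $this->handlers[$messageClass] = $handler;
    }

    // No return value: the caller only learns whether handling threw.
    public function handle(object $message): void
    {
        $handler = $this->handlers[$message::class]
            ?? throw new RuntimeException('No handler for ' . $message::class);
        $handler($message);
    }
}

$names = [1 => 'Alice'];
$bus = new SynchronousBus();
$bus->register(RenameUser::class, function (RenameUser $m) use (&$names): void {
    if ($m->newName === '') {
        throw new InvalidArgumentException('Name must not be empty');
    }
    $names[$m->userId] = $m->newName;
});

$bus->handle(new RenameUser(1, 'Bob'));   // succeeds silently

$failed = false;
try {
    $bus->handle(new RenameUser(1, ''));  // the failure surfaces as an exception
} catch (InvalidArgumentException $e) {
    $failed = true;
}
```

An asynchronous bus would keep the same handle signature but would only enqueue the message, so the try/catch around the call would no longer report handler failures.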

Message bus processing

What happens inside the message bus

A message bus has a set of middleware objects. They are called for every message, in the order in which they were registered. The message bus is responsible for finding the right message handler(s). There is usually at least one handler for each message. These handlers can produce further messages, which are dropped on the message bus again. At any moment there may be a list of such messages waiting on the bus, but it is invisible to the caller of the first message. The caller can only be sure that the process won’t stop until the message bus is empty.
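
A middleware chain of this kind can be sketched as follows. The Middleware interface, LoggingMiddleware, and MiddlewareBus are hypothetical names chosen for illustration; the sketch only assumes that each middleware receives the message together with a callable that continues the chain.

```php
<?php
declare(strict_types=1);

// Hypothetical middleware contract: do something, then call $next.
interface Middleware
{
    public function handle(object $message, callable $next): void;
}

final class LoggingMiddleware implements Middleware
{
    public function __construct(
        private string $name,
        private ArrayObject $log
    ) {}

    public function handle(object $message, callable $next): void
    {
        $this->log->append($this->name . ' before');
        $next($message);                       // continue with the next middleware
        $this->log->append($this->name . ' after');
    }
}

final class MiddlewareBus
{
    /** @var Middleware[] */
    private array $middlewares = [];

    public function appendMiddleware(Middleware $middleware): void
    {
        $this->middlewares[] = $middleware;
    }

    public function handle(object $message): void
    {
        $this->callableFor(0)($message);
    }

    // Builds the callable that invokes the middleware at $index.
    private function callableFor(int $index): callable
    {
        if (!isset($this->middlewares[$index])) {
            return static function (object $message): void {}; // end of chain
        }
        $middleware = $this->middlewares[$index];

        return function (object $message) use ($middleware, $index): void {
            $middleware->handle($message, $this->callableFor($index + 1));
        };
    }
}

$log = new ArrayObject();
$bus = new MiddlewareBus();
$bus->appendMiddleware(new LoggingMiddleware('outer', $log));
$bus->appendMiddleware(new LoggingMiddleware('inner', $log));
$bus->handle(new stdClass());
```

Each middleware wraps everything registered after it, which is why the registration order matters.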

Message bus middleware

Message types

There are two common types of messages - commands and events. The command has exactly one handler. The event can have zero or more handlers. You can implement an event bus or a command bus. They can even be combined into a bus handling both events and commands. This decision is implemented in one of the middlewares.
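
The difference between the two message types could look like this in a combined dispatcher. The class CombinedDispatcher and the messages PlaceOrder and OrderPlaced are made up for this sketch; only the rule "one handler per command, any number of subscribers per event" comes from the text above.

```php
<?php
declare(strict_types=1);

// Hypothetical base classes, mirroring the naming used in this article.
abstract class Message {}
abstract class Command extends Message {}
abstract class Event extends Message {}

final class PlaceOrder extends Command {}
final class OrderPlaced extends Event {}

final class CombinedDispatcher
{
    /** @var array<string, callable> exactly one handler per command */
    private array $commandHandlers = [];
    /** @var array<string, callable[]> zero or more subscribers per event */
    private array $eventSubscribers = [];

    public function registerHandler(string $commandClass, callable $handler): void
    {
        if (isset($this->commandHandlers[$commandClass])) {
            throw new LogicException($commandClass . ' already has a handler');
        }
        $this->commandHandlers[$commandClass] = $handler;
    }

    public function subscribe(string $eventClass, callable $subscriber): void
    {
        $this->eventSubscribers[$eventClass][] = $subscriber;
    }

    public function dispatch(Message $message): void
    {
        if ($message instanceof Command) {
            // A command without a handler is a configuration error.
            $handler = $this->commandHandlers[$message::class]
                ?? throw new LogicException('Missing handler for ' . $message::class);
            $handler($message);
            return;
        }
        // An event may legitimately have no subscribers at all.
        foreach ($this->eventSubscribers[$message::class] ?? [] as $subscriber) {
            $subscriber($message);
        }
    }
}

$calls = [];
$dispatcher = new CombinedDispatcher();
$dispatcher->registerHandler(PlaceOrder::class, function () use (&$calls): void {
    $calls[] = 'handler';
});
$dispatcher->subscribe(OrderPlaced::class, function () use (&$calls): void {
    $calls[] = 'subscriber A';
});
$dispatcher->subscribe(OrderPlaced::class, function () use (&$calls): void {
    $calls[] = 'subscriber B';
});
$dispatcher->dispatch(new PlaceOrder());
$dispatcher->dispatch(new OrderPlaced());
```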

Implementation of message bus

Fortunately, you don’t have to implement the message bus all by yourself. There are packages that solve the problem for you. The one I have chosen is SimpleBus, written by Matthias Noback.

Useful message bus middleware

The SimpleBus package contains some useful middlewares. One of them is FinishesHandlingMessageBeforeHandlingNext. It ensures that the message handler you are currently in finishes before any other message dropped on the message bus is passed to its handler.

The second useful middleware is WrapsMessageHandlingInTransaction. It wraps the whole chain of message handlers’ execution into a database transaction.

The third middleware you will use depends on the strategy of the resolver you choose. If you decide on an event bus, you will need NotifiesMessageSubscribersMiddleware. If a command bus is what you need, you will need DelegatesToMessageHandlerMiddleware. If you go for a combined event and command bus solution, you will probably create your own middleware based on these two middlewares.

Besides these middlewares, there is one more called HandlesRecordedMessagesMiddleware. It is useful when there are side effects you don’t want to handle during the main process but rather run them later.
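
The idea of recording messages for later can be sketched without the library. MessageRecorder, its record and release methods, and the AppointmentEmailRequested message are hypothetical; SimpleBus ships its own recorder classes, whose exact API is not reproduced here.

```php
<?php
declare(strict_types=1);

// Hypothetical recorder: collects messages now, hands them out later.
final class MessageRecorder
{
    /** @var object[] */
    private array $recorded = [];

    public function record(object $message): void
    {
        $this->recorded[] = $message;
    }

    /** @return object[] the recorded messages; the internal list is cleared */
    public function release(): array
    {
        $messages = $this->recorded;
        $this->recorded = [];

        return $messages;
    }
}

// Hypothetical side-effect message.
final class AppointmentEmailRequested
{
    public function __construct(public int $appointmentId) {}
}

$log = [];
$recorder = new MessageRecorder();

// Main process: do the main work and only record the side effect.
$mainProcess = function () use (&$log, $recorder): void {
    $log[] = 'appointment stored';
    $recorder->record(new AppointmentEmailRequested(42));
    $log[] = 'main process finished';
};
$mainProcess();

// Afterwards: handle whatever was recorded during the main process.
foreach ($recorder->release() as $message) {
    $log[] = 'email sent for appointment ' . $message->appointmentId;
}
```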

Strategy for putting things together

Most apps are strongly database-oriented, which means the main purpose of the app is to store data in a database. A typical executed command in such an app consists of three parts:

  • primary task
    Only the single main task operation.
  • secondary task
    All operations included in a transaction with the main task.
  • side effects
    All operations excluded from the main task transaction.

We create a command for the primary task; it is processed by the main task command handler. The handler can then create an event for the secondary tasks, with one event listener for each operation that belongs in the same transaction as the main task. Secondary task handlers can create further events and pass them to the message bus.

For operations to be excluded from the main task transaction, we create an event and pass it to the message recorder for future processing.

Real world example

Here is an example. Our command AddAppointment will do all these operations:

  • create Appointment entity
    A new entity connected with Client, Manager, Office, and User entities.
  • create UserLog entity
    To track all users’ actions.
  • create Telemetry entity with name AppointmentAdded
    For statistics, predictions, and analytics of the app.
  • send an email to the client
    There is an email generated with the necessary appointment information.
  • send an SMS to the client
    An SMS is also sent to the client through a third-party API.

We can split these operations into our three groups.

  • primary task
    • create an Appointment entity
  • secondary task
    • create a UserLog entity
    • create a Telemetry entity
  • side effects
    • send an email to the client
    • send an SMS to the client

We create the initial message AddAppointment, which extends the base class Command. For this message, we create a message handler AddAppointmentHandler. Inside the handle function, the Appointment entity is created, and then an Event based message AppointmentAdded is populated and dropped on the message bus. Then we can create all listeners for an event AppointmentAdded. The same goes for message recorder listeners.

Message bus example

Here, in more detail, we create the AddAppointment command, derived from the Command class that extends the base Message class. It is an immutable class filled with parameters, serving data through getters.

// command AddAppointment
class AddAppointment extends Command {
    public function __construct(
        private int $clientId,
        private int $officeId,
        private int $managerId,
        private DateTime $date,
        private int $userId
    ) {}

    public function getClientId() : int {
        return $this->clientId;
    }

    // ... other getters ...
}

Then we create the AppointmentAdded event, derived from the Event class that extends the base Message class.

// event AppointmentAdded
class AppointmentAdded extends Event {
    public function __construct(
        private int $appointmentId
    ) {}

    public function getAppointmentId() : int {
        return $this->appointmentId;
    }
}

Here is an implementation of the AddAppointmentHandler class. It implements the MessageHandler interface. All the logic lies in the handle method. We retrieve all entities involved in the process using data from the AddAppointment command. We create an Appointment entity, populate it, and persist it using the entity manager. At the end, we create an AppointmentAdded event and pass it to the message bus.

// handler AddAppointmentHandler
class AddAppointmentHandler implements MessageHandler {
    public function __construct(
        private EntityManager $entityManager,
        private MessageBus $messageBus
    ) {}

    public function handle(Message $message) : void {
        $client = $this->entityManager
            ->getRepository(Client::class)
            ->find(
                $message->getClientId()
            );

        // ... get other entities $manager, $office, $user ...

        $appointment = (new Appointment())
            ->setClient($client)
            ->setDate($message->getDate())
            ->setManager($manager)
            ->setUser($user)
            ->setOffice($office);
        $this->entityManager->persist($appointment);
        $this->entityManager->flush();

        $this->messageBus->handle(
            new AppointmentAdded(
                $appointment->getId()
            )
        );
    }
}

All other message listeners implement the MessageSubscriber interface. Here is an example of WriteTelemetryWhenAppointmentAdded. Since it is an event listener, all the logic is in the notify method. First, the created Appointment entity is loaded using the ID from the incoming message. Then, a Telemetry entity is created and persisted.

// listener WriteTelemetryWhenAppointmentAdded
class WriteTelemetryWhenAppointmentAdded implements MessageSubscriber {
    public function __construct(
        private EntityManager $entityManager
    ) {}

    public function notify(Message $message) : void {
        $appointment = $this->entityManager
            ->getRepository(Appointment::class)
            ->find(
                $message->getAppointmentId()
            );

        $telemetry = (new Telemetry())
            ->setName('AppointmentAdded')
            ->setTime(new DateTime())
            ->setData(
                json_encode([
                    'office' => $appointment->getOffice()->getName(),
                    'manager' => $appointment->getManager()->getName(),
                    'date' => $appointment->getDate()->format(DATE_ATOM)
                    // ... other telemetric data ...
                ])
            );
        $this->entityManager->persist($telemetry);
        $this->entityManager->flush();
    }
}

From the controller’s point of view, the logic is very clean. You only create a command AddAppointment, populate it with data from the request and pass it to the message bus.

// calling command within controller
class MyController implements Controller {
    public function __construct(
        private MessageBus $messageBus
    ) {}

    public function process(Request $request) : Response {
        // ... validation that produces $validData ...

        try {
            $this->messageBus->handle(
                new AddAppointment(
                    clientId: (int) $validData['clientId'],
                    officeId: (int) $validData['officeId'],
                    managerId: (int) $validData['managerId'],
                    userId: (int) $validData['userId'],
                    date: new DateTime($validData['date'])
                )
            );
        } catch (\Throwable $e) {
            return new Response(
                // ... unsuccessful response ...
            );
        }

        return new Response(
            // ... successful response ...
        );
    }
}

Now that we have all the pieces together, we can configure the MessageBus. It’s simply assembling objects, often done directly in the DI container. Additionally, the maps of commands and events are stored in the container.

The configuration example provided here is for an older version of the SimpleBus library and is specific to our application. It might vary slightly in each case. Similarly, the use of specific ORM libraries might differ. In this example, I combine two styles for handling commands and events.

// an example of basic configuration in an older version of SimpleBus
// this configuration is usually implemented inside a DI container
// and is specific for your app

// name resolver
$classBasedNameResolver = new ClassBasedNameResolver();

// create a container connector
$invokableServiceLocator = new InvokableServiceLocator(
    $serviceContainer
);

// create finishes first middleware
$finishesHandlingMessageBeforeHandlingNextMiddleware = 
    new FinishesHandlingMessageBeforeHandlingNext();

// create transaction middleware assuming we have an entity manager
$transactionMiddleware = new TransactionMiddleware(
    $entityManager
);

// create event subscriber resolver based on provided list of events
$eventListenerList = [
    'src\event\AppointmentAdded' => [
        'src\listener\WriteTelemetryWhenAppointmentAdded',
        'src\listener\AddUserLogWhenAppointmentAdded'
    ],
    // ...
];
$eventSubscriberCollection = new LazyLoadingMessageSubscriberCollection(
    $eventListenerList, 
    $invokableServiceLocator
);
$eventSubscriberResolver = new NameBasedMessageSubscriberResolver(
    $classBasedNameResolver, 
    $eventSubscriberCollection
);

// create command handler resolver based on provided list of commands
$commandHandlerList = [
    'src\event\AddAppointment' => 'src\handler\AddAppointmentHandler'
    // ...
];
$commandHandlerMap = new LazyLoadingMessageHandlerMap(
    $commandHandlerList, 
    $invokableServiceLocator
);
$commandHandlerResolver = new NameBasedMessageHandlerResolver(
    $classBasedNameResolver, 
    $commandHandlerMap
);

// join event and command resolver to one middleware
$delegatesToCommonHandlerMiddleware = new HandleCommonMessageMiddleware(
    $commandHandlerResolver, 
    $eventSubscriberResolver
);

// creating and configuring the command bus with its middleware
$messageBus = new MessageBusSupportingMiddleware();
$messageBus->appendMiddleware($finishesHandlingMessageBeforeHandlingNextMiddleware);
$messageBus->appendMiddleware($transactionMiddleware);
$messageBus->appendMiddleware($delegatesToCommonHandlerMiddleware);

Event listeners as command wrappers

Usually, the logic lives inside the event listener. Sometimes, however, the same logic is shared by several events. In that situation, creating a new command and command handler can be useful. The original event listener then only prepares data for the new command and passes it to the message bus.

Beware of the message handling order, which can differ from what you expect.

Message bus wrapper

// instead of implementing logic here
// we only pass new command to message bus
class AddLogWhenAppointmentAdded implements MessageSubscriber {
    public function __construct(
        private EntityManager $entityManager,
        private MessageBus $messageBus
    ) {}

    public function notify(Message $message) : void {
        $appointment = $this->entityManager
            ->getRepository(Appointment::class)
            ->find(
                $message->getAppointmentId()
            );

        $this->messageBus->handle(
            new AddLog(
                'Appointment added',
                (int) $appointment->getUser()->getId()
            )
        );
    }
}

Who finishes first

From the design of the message bus, it follows that you cannot rely on the order in which the work of event listeners actually completes. Imagine this situation:

You have a set of event listeners connected to an event and want them to run in the given order, because the second listener depends on data set by the first one. However, the first listener is only a wrapper for another command. It creates the command, drops it on the message bus, and returns. Messages are handled in the order they arrive, so the new command waits until the current event has been handled. The second listener therefore runs before the command produced by the first listener has been executed.

If you really need something to be finished before the next step runs, use proper events and event listeners: let the handler that does the work publish an event when it is done, and attach the dependent listener to that event.
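
The ordering problem can be reproduced with a small queueing bus. QueueingBus is a hypothetical stand-in that mimics the "finish the current message first" behavior, and messages are plain strings here to keep the sketch short.

```php
<?php
declare(strict_types=1);

// Hypothetical bus: a message dropped during handling waits in a queue.
final class QueueingBus
{
    /** @var array<string, callable[]> */
    private array $handlers = [];
    /** @var string[] */
    private array $queue = [];
    private bool $busy = false;

    public function on(string $message, callable $handler): void
    {
        $this->handlers[$message][] = $handler;
    }

    public function handle(string $message): void
    {
        $this->queue[] = $message;
        if ($this->busy) {
            return; // another message is in progress; this one waits
        }
        $this->busy = true;
        try {
            while ($this->queue !== []) {
                $current = array_shift($this->queue);
                foreach ($this->handlers[$current] ?? [] as $handler) {
                    $handler($this);
                }
            }
        } finally {
            $this->busy = false;
        }
    }
}

$log = [];
$bus = new QueueingBus();

// First listener: only a wrapper that drops a command on the bus.
$bus->on('AppointmentAdded', function (QueueingBus $bus) use (&$log): void {
    $log[] = 'listener 1: dispatch AddLog';
    $bus->handle('AddLog');
});
// Second listener: might wrongly assume the log entry already exists.
$bus->on('AppointmentAdded', function () use (&$log): void {
    $log[] = 'listener 2: runs';
});
// Handler of the command produced by the first listener.
$bus->on('AddLog', function () use (&$log): void {
    $log[] = 'AddLog handler: log written';
});

$bus->handle('AppointmentAdded');
```

After the run, $log shows that the second listener ran before the AddLog handler, even though the first listener was called first.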

Message bus execution order

What is inside transaction

Handling of the first message opens a database transaction for the message bus, and the transaction finishes once the message bus is empty. To keep the transaction short, it’s crucial to consider what should be included in the database transaction and what should be deferred to the message recorder for later execution.

Primarily, only database modifications should be part of the transaction. However, it’s acceptable to include local file storage as well. Other executions should go to the message recorder, which doesn’t use a transaction.
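
A transaction-wrapping middleware can be sketched like this. The Transaction interface, RecordingTransaction, and TransactionalMiddleware are hypothetical; a real app would delegate to its ORM or database connection instead of the recording fake used here.

```php
<?php
declare(strict_types=1);

// Hypothetical transaction abstraction.
interface Transaction
{
    public function begin(): void;
    public function commit(): void;
    public function rollback(): void;
}

// Fake implementation that only remembers which calls were made.
final class RecordingTransaction implements Transaction
{
    /** @var string[] */
    public array $calls = [];

    public function begin(): void { $this->calls[] = 'begin'; }
    public function commit(): void { $this->calls[] = 'commit'; }
    public function rollback(): void { $this->calls[] = 'rollback'; }
}

final class TransactionalMiddleware
{
    public function __construct(private Transaction $transaction) {}

    public function handle(object $message, callable $next): void
    {
        $this->transaction->begin();
        try {
            $next($message);               // the rest of the chain runs inside
            $this->transaction->commit();
        } catch (Throwable $e) {
            $this->transaction->rollback();
            throw $e;                      // let the controller see the failure
        }
    }
}

$ok = new RecordingTransaction();
(new TransactionalMiddleware($ok))->handle(new stdClass(), function (): void {});

$bad = new RecordingTransaction();
try {
    (new TransactionalMiddleware($bad))->handle(new stdClass(), function (): void {
        throw new RuntimeException('handler failed');
    });
} catch (RuntimeException $e) {
    // expected: the exception is rethrown after the rollback
}
```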

Involving output buffer for crucial side effects

Since we often have to communicate with external systems through APIs, we might want to include those calls in the transaction as well. We can, for example, integrate our app with other systems using a Rabbit message queue client.

If we exclude the integration call from the transaction, other systems may not be informed in case the queue is temporarily unavailable.

If we include it in the transaction, the whole transaction could be rolled back in such a case, which is pretty tough when everything else works fine.

The best solution could be to create an output buffer in the form of a database table. Within the transaction, we write the integration message to this table and pass an event to the message recorder. The handling routine in the recorder then sends all integration messages from the buffer to the message queue client. If that fails, the messages stay in the buffer and the next execution will probably succeed.

Processes like sending an SMS, writing a file to external storage, or calling Elasticsearch can be handled the same way. The key is to decide where data loss would be critical for the main process.
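
The output buffer idea can be sketched with in-memory stand-ins. OutboxRelay and QueueClient are hypothetical names: the array inside OutboxRelay plays the role of the buffer table, and QueueClient plays the role of the message queue client.

```php
<?php
declare(strict_types=1);

// Hypothetical queue client that can be switched to "unavailable".
final class QueueClient
{
    public bool $available = true;
    /** @var string[] */
    public array $published = [];

    public function publish(string $payload): void
    {
        if (!$this->available) {
            throw new RuntimeException('queue unavailable');
        }
        $this->published[] = $payload;
    }
}

final class OutboxRelay
{
    /** @var string[] stands in for the buffer table */
    private array $outbox = [];

    public function __construct(private QueueClient $client) {}

    // Called inside the transaction: a local write only, nothing external.
    public function buffer(string $payload): void
    {
        $this->outbox[] = $payload;
    }

    // Called later, outside the transaction; safe to run repeatedly.
    public function flush(): void
    {
        foreach ($this->outbox as $key => $payload) {
            try {
                $this->client->publish($payload);
                unset($this->outbox[$key]);   // delivered, remove from buffer
            } catch (RuntimeException $e) {
                return;                       // keep the rest for the next run
            }
        }
    }

    public function pending(): int
    {
        return count($this->outbox);
    }
}

$client = new QueueClient();
$relay = new OutboxRelay($client);
$relay->buffer('appointment 42 added');

$client->available = false;
$relay->flush();                              // queue is down: message stays buffered
$pendingAfterFailure = $relay->pending();

$client->available = true;
$relay->flush();                              // the next run delivers it
```

With a real table, the buffer write shares the fate of the main transaction, so an integration message exists exactly when the main change was committed.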

ORM and message bus

It might seem that a lot of database reading is involved in the process. Every message handler should be independent of the others and use only the parameters passed within a message. You often retrieve entities at the beginning of the message handler, make a change, and create another message carrying the IDs of those entities. Fortunately, you often work with the same set of entities, and thanks to the ORM they are cached in the identity map, so no additional database call is needed.

Because all message handlers should be independent of each other, all modifications made to the database through the ORM should be flushed at the end of each handler.
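
The effect of an identity map can be illustrated with a toy repository. CachingRepository is hypothetical and only simulates the database call with a counter; real ORMs keep such a map internally.

```php
<?php
declare(strict_types=1);

// Hypothetical repository with a built-in identity map.
final class CachingRepository
{
    /** @var array<int, object> */
    private array $identityMap = [];
    public int $databaseReads = 0;

    public function find(int $id): object
    {
        if (!isset($this->identityMap[$id])) {
            $this->databaseReads++;           // simulated database call
            $entity = new stdClass();
            $entity->id = $id;
            $this->identityMap[$id] = $entity;
        }

        return $this->identityMap[$id];
    }
}

$repository = new CachingRepository();
$first = $repository->find(7);    // e.g. inside the command handler
$second = $repository->find(7);   // e.g. inside an event listener
```

Both calls return the same object, and only the first one reaches the simulated database.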

GUID for controller response

It is a good practice if your controller returns an ID of a newly created entity. The message bus concept can only process the command part but is not suitable for returning any value. The only information we get back is whether it has already ended, assuming we are running the message bus in a synchronous way. The way to return the ID is by creating the global ID before we actually call the message bus process. The ID is passed into the message bus as a command parameter.

class MyController implements Controller {
    public function __construct(
        private MessageBus $messageBus,
        private GlobalIdGenerator $globalIdGenerator
    ) {}

    public function process(Request $request) : Response {
        // ...

        $globalAppointmentId = $this->globalIdGenerator->generate();

        $this->messageBus->handle(
            new AddAppointment(
                globalId: (string) $globalAppointmentId,
                // ... other parameters ...
            )
        );

        // we can retrieve the whole entity here 

        return new Response([
            'appointmentId' => $globalAppointmentId
        ]);
    }
}
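
The GlobalIdGenerator injected above is not shown in this article. One possible implementation, given here purely as an assumption, generates a random version 4 UUID string:

```php
<?php
declare(strict_types=1);

// Hypothetical implementation; the article does not prescribe one.
final class GlobalIdGenerator
{
    public function generate(): string
    {
        $bytes = random_bytes(16);
        // Set the version (4) and variant (RFC 4122) bits.
        $bytes[6] = chr((ord($bytes[6]) & 0x0f) | 0x40);
        $bytes[8] = chr((ord($bytes[8]) & 0x3f) | 0x80);

        // 32 hex characters split into 8 chunks of 4, joined as 8-4-4-4-12.
        return vsprintf('%s%s-%s-%s-%s-%s%s%s', str_split(bin2hex($bytes), 4));
    }
}

$id = (new GlobalIdGenerator())->generate();
```

Because the ID is created before the command is dispatched, the controller can return it without the bus ever having to return a value.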

Summary

We have demonstrated the usage of the message bus technique for the organization of complex domain logic. There are various combinations of the message bus, message recorder, its middlewares, and message handlers. I like the idea of calling the process synchronously or asynchronously based on your needs.

Advantages

  • better organization
  • easily adding and removing logic
  • logic sharing
  • isolation of message handler
  • isolation of controller from database
  • easy transaction handling
  • choice of synchronous/asynchronous call

Disadvantages

  • tricky configuration
  • more work
  • harder to track - nonlinear process