
Transactional Outbox Pattern in a monolithic application

I usually see the Transactional Outbox Pattern mentioned in the context of microservices, which, to be honest, surprises me a bit: to my mind, it is just as relevant to monolithic applications.

In this article, I’ll describe the pattern in detail and then provide an implementation example with Kotlin, Spring and PostgreSQL.

What is it for?

The transactional outbox pattern describes how to achieve reliable communication. The use case is: you’d like to atomically make an update to the database and invoke another action, typically on an external system. This action could be:

  • calling an external API, e.g. by making an HTTP request;

    Example scenario: a user creates an account in our application, so we’d like to save the account in the database and send a welcome email using AWS SES.

  • publishing a message to a queue;

    Example scenario: an order is placed in our shopping app, so we’d like to save it to the database and enqueue an OrderPlaced event in AWS SQS.

  • in Domain-Driven Design (DDD), updating another aggregate. This is an action invoked on the same system as the original database transaction, meaning you’d like to atomically run two separate database transactions;

    Example scenario: an order is placed in our shopping app, so we’d like to save the order and an invoice for it - order and invoice being two separate aggregates.


Side note

To my mind, the DDD example translates nicely into “calling an external API” or “publishing a message to a queue” when talking about microservices. Microservices often represent specific bounded contexts, so it is very likely that an update will have to be made atomically in multiple contexts. The transactional outbox pattern may prove very useful in making the communication between them reliable.


Let’s first take a look at the problem that this pattern lets us solve.

The problem with transactional consistency

Let’s take a look at the “welcome email” example. When a user creates an account in our app, we’d like to reliably save it to the database and send a welcome email using AWS SES. We could have the following piece of code in our application:

databaseTransaction {
    persist(account) // call to the database
}
sendWelcomeEmailFor(account) // call to SES

Now, what if the call to SES fails? We’ll be left with an account in our database, but the user will never get the welcome email.
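To make this failure concrete, here is a minimal in-memory simulation of the snippet above. FakeDatabase, FakeEmailService and createAccount are hypothetical stand-ins invented for this sketch, not real database or SES clients, and the email service is configured to be unavailable.

```kotlin
// Hypothetical in-memory stand-ins for the database and SES, for illustration only.
class FakeDatabase {
    val accounts = mutableListOf<String>()

    // "Commits" the buffered writes only if the block completes without throwing.
    fun databaseTransaction(block: (MutableList<String>) -> Unit) {
        val pending = mutableListOf<String>()
        block(pending)
        accounts += pending
    }
}

class FakeEmailService(private val available: Boolean) {
    val sent = mutableListOf<String>()

    fun sendWelcomeEmailFor(account: String) {
        if (!available) throw IllegalStateException("SES unreachable")
        sent += account
    }
}

// The naive version: commit first, then call the external service.
fun createAccount(db: FakeDatabase, ses: FakeEmailService, account: String) {
    db.databaseTransaction { it += account } // persist(account)
    ses.sendWelcomeEmailFor(account)         // fails after the commit
}
```

Calling createAccount with an unavailable email service throws, yet the account is already committed and no email has been sent.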

One might say that with SES having an uptime of at least 99.9%, we can be virtually sure these two instructions will run just fine. But the failure may also be caused by us: we could, for example, configure our SES client with the wrong credentials.

What if we can’t or simply don’t want to lose our messages?

We can’t do the following:

databaseTransaction {
    persist(account) // call to the database
    sendWelcomeEmailFor(account) // call to SES
}

In case of a network issue, we may not be able to reach SES. We’ll roll back databaseTransaction, so the account will not be saved. At the same time, this means our users won’t be able to create an account whenever SES is unreachable, which may not be acceptable from a business perspective.

Another possible network issue may cause the response from SES not to come back to us even though the call succeeded behind the scenes. We’ll roll back databaseTransaction, but the welcome email will have been sent even though the related account is not in our database. This kind of inconsistency may not be desired either.

Another possibility is that sending the email succeeds, but the commit of databaseTransaction fails for some reason. We’ll end up with the same inconsistency as with the “lost response” scenario.

Why not just use two-phase commit (2PC)?

Apart from the possibility that we may not be able (or not easily able) to use 2PC with the tools we chose, we need to take into account that 2PC is a synchronous protocol. This means we need to wait until all the participants of the transaction have completed their actions and are ready to commit.

In our “welcome email” example, imagine the user having to wait until the email is sent before receiving a successful response. Or worse, as already described, the user being unable to create an account because SES is down. Of course, it depends on what the business needs, but I suspect that in general it’d be fine from a business perspective to let the user create an account even though the email service is down, and to send the email as soon as the service is up and running again.

I recommend the following article on this subject: Starbucks Does Not Use Two-Phase Commit.

To sum up: 2PC makes the communication between services synchronous, effectively reducing the availability of our service, because we depend on the availability of all the participants of the transaction. We won’t be able to complete a transaction if any participant is down. What’s more, our performance may suffer when a participant takes a long time to finish its work.

The pattern

The idea is simple. In our database, we create an additional table called, for example, outbox_message. We can then change our code to the following:

databaseTransaction {
    persist(account) // 'insert into account ...'
    persist(accountCreatedEvent) // 'insert into outbox_message ...'
}

We should now be able to respond to the user that the account has been successfully created. We still need to send the welcome email, though.
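Here is a minimal in-memory sketch of this write path, assuming a toy database whose transactions buffer writes and apply them only if the block completes. InMemoryDatabase, Tx and createAccount are hypothetical names for illustration; a real implementation would rely on the database’s own transactions.

```kotlin
// Hypothetical outbox row: a type plus the serialized event.
data class OutboxMessage(val type: String, val jsonContent: String)

class InMemoryDatabase {
    val accounts = mutableListOf<String>()
    val outbox = mutableListOf<OutboxMessage>()

    // Writes to both "tables" are buffered and applied together,
    // only if the block finishes without an exception.
    fun databaseTransaction(block: (Tx) -> Unit) {
        val tx = Tx()
        block(tx)
        accounts += tx.accounts
        outbox += tx.outbox
    }

    class Tx {
        val accounts = mutableListOf<String>()
        val outbox = mutableListOf<OutboxMessage>()
    }
}

fun createAccount(db: InMemoryDatabase, email: String) {
    db.databaseTransaction { tx ->
        tx.accounts += email                 // 'insert into account ...'
        tx.outbox += OutboxMessage(          // 'insert into outbox_message ...'
            type = "AccountCreatedEvent",
            jsonContent = """{"email":"$email"}"""
        )
    }
    // No call to SES here: the email is sent later by whatever processes the outbox.
}
```

If anything throws inside the transaction, neither the account nor its outbox message is stored, so the two can no longer get out of sync.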

Now, there are two strategies for processing the messages from the outbox_message table:

  • polling the outbox_message table:

    This means setting up, for example, a CRON job or a timer in your application to read and process the messages.

  • transaction log tailing:

    It means monitoring the database’s transaction log file to see what gets inserted into the outbox_message table and reacting to the inserts.

I’ll describe only the polling mechanism, a.k.a. the Polling Publisher, for two reasons:

  • to my mind it is much simpler to implement;
  • I have not had a chance to implement the second one yet.

If you’d like to read up on it, you can refer to Transaction Log Tailing @ microservices.io, which I believe is a good start.

Polling publisher

In our application, somewhere in the background, we need to read messages from the outbox_message table, send the welcome email and then mark each message as processed so that it does not get processed again. Basically, we’ll implement a simple queue in the database.

We may set up a CRON job or a timer in our application to do the polling.

each n milliseconds {
    databaseTransaction {
        message = readNextOutboxMessage()
        process(message) // send email using AWS SES
        markAsProcessed(message)
    }
}

At first it may look as if we were back to the original problem, because we’re calling an external service in the middle of a database transaction. That is not the case, though. If any of the steps fails, the whole transaction is rolled back, so the message stays in the outbox_message table and is picked up again on the next poll; it will eventually be delivered. There are, however, two specific scenarios worth a closer look:

  1. sending the email succeeds behind the scenes but we don’t receive the response (e.g. due to a network issue);
  2. the commit of the transaction fails.

In both scenarios, the email will be sent, but the message won’t be marked as processed. This means the message will be processed again and, as a result, the email will be sent more than once. The transactional outbox pattern guarantees at-least-once delivery: either we have to be fine with the external service being invoked more than once, or the service needs to handle duplicate messages. The latter, however, is a topic for a separate article.
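The at-least-once behaviour can be reproduced with a small in-memory sketch of the polling loop. InMemoryOutbox and pollOnce are hypothetical names, and the commit callback merely stands in for committing the database transaction.

```kotlin
// Hypothetical in-memory model of the outbox_message table, for illustration only.
data class OutboxMessage(val id: Long, val payload: String)

class InMemoryOutbox(messages: List<OutboxMessage>) {
    private val rows = ArrayDeque(messages)

    val size: Int get() = rows.size

    // One polling tick. The message is removed ("marked as processed") only after
    // process() and commit() both succeed; if either throws, it stays in the table
    // and will be picked up again on the next tick.
    fun pollOnce(process: (OutboxMessage) -> Unit, commit: () -> Unit = {}): Boolean {
        val message = rows.firstOrNull() ?: return false
        process(message) // e.g. send the welcome email via SES
        commit()         // may fail even though the email was already sent
        rows.removeFirst()
        return true
    }
}
```

Simulating scenario 2, where the first commit throws after the email has been sent, the next tick processes the same message again: the email goes out twice, and the message is removed only after the second, successful commit.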

Implementation example

I’ll create a simple web application to simulate the “welcome email” scenario.

I’ll use Kotlin with Spring to model the application and PostgreSQL as the database. I believe the code samples can be easily translated to other languages and frameworks.

I chose PostgreSQL because I’m using it in a side project where I’ve already implemented this pattern and it has proved to work just fine, so the code listed below is battle-tested. Besides, I’ll use the SKIP LOCKED feature, available in PostgreSQL 9.5 and later. It lets a transaction lock rows for editing while skipping rows that are already locked by other transactions, which makes it possible to build a concurrent queue on top of a table. A similar feature should be available at least in Oracle Database and SQL Server [1].
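Here is a hedged sketch of how a single outbox message could be claimed with SKIP LOCKED, using plain JDBC rather than the JPA EntityManager used later in the article. It assumes the outbox_message table described below and a numeric id that increases over time; DELETE_NEXT_MESSAGE_SQL, ClaimedMessage and deleteNextMessage are hypothetical names, and the query in the linked repository may differ.

```kotlin
import java.sql.Connection

// Hypothetical row type for a claimed outbox message.
data class ClaimedMessage(val id: Long, val type: String, val jsonContent: String)

// FOR UPDATE locks the selected row; SKIP LOCKED makes concurrent pollers skip rows
// that another transaction has already locked instead of waiting for them.
const val DELETE_NEXT_MESSAGE_SQL = """
    DELETE FROM outbox_message
    WHERE id = (
        SELECT id FROM outbox_message
        ORDER BY id
        LIMIT 1
        FOR UPDATE SKIP LOCKED
    )
    RETURNING id, type, json_content
"""

// Must be called inside an open transaction (autoCommit = false): the delete only
// becomes permanent when the caller commits after processing the message.
fun deleteNextMessage(connection: Connection): ClaimedMessage? =
    connection.prepareStatement(DELETE_NEXT_MESSAGE_SQL).use { statement ->
        statement.executeQuery().use { rs ->
            if (rs.next()) ClaimedMessage(rs.getLong("id"), rs.getString("type"), rs.getString("json_content"))
            else null
        }
    }
```

Because the row is deleted inside the caller’s transaction, a rollback (for example, when processing fails) puts it back, while concurrent pollers simply move on to the next unlocked row.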

You can see the complete working implementation @ https://github.com/maciejtoporowicz/transactional-outbox-example

We’ll need two tables in our database:

account { id, name, email }
outbox_message { id, type, json_content }


When an account is created, in our business logic we need to create two objects: one for representing the account and another one for representing events related to the account creation. We can then ask our accountRepo to save both objects at once.

class AccountModule(private val accountRepo: AccountRepo) {
    fun createNewAccount(name: String, email: String) {
        val (account, relatedEvents) = Account.newAccount(name, email)

        accountRepo.save(account, relatedEvents)
    }
}

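Account.newAccount isn’t shown here; below is a minimal sketch of what such a factory could look like. The Account fields mirror the account table, while AccountEvent, AccountCreatedEvent and their fields are assumptions for illustration, not the repository’s actual types.

```kotlin
import java.util.UUID

// Hypothetical domain types; the real ones live in the linked repository.
interface AccountEvent
data class AccountCreatedEvent(val accountId: UUID, val email: String) : AccountEvent

data class Account(val id: UUID, val name: String, val email: String) {
    companion object {
        // Returns the new aggregate together with the events its creation produced,
        // so the repository can persist both in one transaction.
        fun newAccount(name: String, email: String): Pair<Account, List<AccountEvent>> {
            val account = Account(UUID.randomUUID(), name, email)
            return account to listOf(AccountCreatedEvent(account.id, email))
        }
    }
}
```

Returning a Pair is what makes the destructuring declaration `val (account, relatedEvents) = ...` in AccountModule work.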

Here’s the implementation of the AccountRepo::save method with some context:

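import org.springframework.transaction.annotation.Transactional

class AccountRepoAdapter(
        private val accountRepo: SpringAccountRepo,
        private val outboxMessageRepo: SpringOutboxMessageRepo,
        private val jsonMapper: JsonMapper) : AccountRepo {

    // Both inserts run in one database transaction: either the account and
    // its outbox messages are all saved, or nothing is.
    @Transactional
    override fun save(account: Account, relatedEvents: List<AccountEvent>) {
        accountRepo.save(asEntity(account))
        outboxMessageRepo.saveAll(
                relatedEvents.map {
                    OutboxMessage(
                            eventType = it.javaClass.canonicalName, // the event's class name serves as the message type
                            jsonContent = jsonMapper.toJson(it)
                    )
                }
        )
    }

    // ... asEntity() and the rest of the class omitted
}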


This is an adapter class between the business logic contained in AccountModule and the infrastructure classes, SpringAccountRepo and SpringOutboxMessageRepo, which perform the actual inserts into the database.

As you can see, I’m using the canonical name of the event’s class as the message type (OutboxMessage#eventType).
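The snippets don’t show how the poller maps the stored type back to an event class. One option, sketched here as an assumption, is an explicit registry keyed by canonical name; EventTypes and the two event classes are hypothetical. Note that Class.forName expects a class’s binary name (Class#getName), which differs from the canonical name for nested classes (Outer$Inner vs Outer.Inner), so passing the canonical name to it could fail.

```kotlin
import kotlin.reflect.KClass

// Hypothetical event types used only for this sketch.
data class AccountCreatedEvent(val email: String)
data class AccountDeletedEvent(val email: String)

// Maps the stored type string (the event class's canonical name) back to a class.
// An explicit registry fails fast on unknown types and avoids Class.forName,
// which expects the binary name ("Outer$Inner") rather than the canonical one.
object EventTypes {
    private val byType: Map<String, KClass<*>> =
        listOf(AccountCreatedEvent::class, AccountDeletedEvent::class)
            .associateBy { it.java.canonicalName }

    fun resolve(type: String): KClass<*> =
        byType[type] ?: throw IllegalArgumentException("Unknown outbox message type: $type")
}
```

The resolved class can then be handed to whichever JSON library deserializes json_content.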

With the messages in the database, we can proceed to the polling mechanism. We’ve got an OutboxMessagePollerTimer class with a tick() method running at a fixed rate:

@Scheduled(fixedRateString = "\${outboxMessages.polling.interval}")
fun tick() {
    log.info("Polling for outbox messages")
    taskExecutor.execute(outboxMessagePollerFactory.newOutgoingEventPoller())
}


The polling interval should be fine-tuned to match the requirements and capabilities of the application.
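Outside Spring, the same tick-then-hand-off setup could be sketched with the JDK’s ScheduledExecutorService. OutboxPollerTimer here is a hypothetical, framework-free analogue of the Spring class above, not the repository’s code, and the default pool size of 2 is an arbitrary assumption.

```kotlin
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// A framework-free analogue of @Scheduled(fixedRate = ...) plus taskExecutor.execute(...):
// the scheduler only "ticks", and each tick hands a polling task to a worker pool.
class OutboxPollerTimer(
    private val intervalMillis: Long,
    private val pollTask: Runnable,
    private val workers: ExecutorService = Executors.newFixedThreadPool(2)
) {
    private val scheduler = Executors.newSingleThreadScheduledExecutor()

    fun start() {
        // Ticks keep coming at a fixed rate even if earlier polls are still running,
        // so several polling tasks may run concurrently on the worker pool.
        scheduler.scheduleAtFixedRate({ workers.execute(pollTask) }, 0, intervalMillis, TimeUnit.MILLISECONDS)
    }

    fun stop() {
        scheduler.shutdown()
        scheduler.awaitTermination(1, TimeUnit.SECONDS) // let an in-flight tick finish first
        workers.shutdown()
        workers.awaitTermination(1, TimeUnit.SECONDS)
    }
}
```

Since ticks don’t wait for earlier polls to finish, two polling tasks can run at the same time, which is one reason the query that claims messages has to cope with concurrent pollers, e.g. via SKIP LOCKED.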

Each time tick() is invoked, a new task is submitted to a thread pool to poll the outbox_message table and handle pending outbox messages. Here’s the implementation of the task:

(...)

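transaction.begin()

// Delete the first pending message; the delete becomes permanent only on commit
val message = deleteFirstMessage(entityManager)

// No pending messages: end the transaction without changes
if (message == null) {
    transaction.rollback()
    return
}

try {
    val messageContent = extractMessageContentFrom(message)
    // Listeners run synchronously, so the commit happens only after they all finish
    applicationEventPublisher.publishEvent(messageContent)
    transaction.commit()
}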

(...)


The published event gets picked up by a WelcomeEmailTrigger:

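import org.slf4j.LoggerFactory
import org.springframework.context.event.EventListener

class WelcomeEmailTrigger {
    private val log = LoggerFactory.getLogger(WelcomeEmailTrigger::class.java)

    // Invoked for every published AccountCreatedEvent. In this example the handler
    // only logs the event; this is where the welcome email would be sent.
    @EventListener
    fun onAccountCreated(event: AccountCreatedEvent) {
        log.info("Received event: $event")
    }
}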


applicationEventPublisher.publishEvent(...) is synchronous, so it returns only after all event listeners have finished processing the published event. It should therefore be safe to commit the transaction afterwards.
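The reasoning about committing after publishEvent(...) can be illustrated with a toy synchronous publisher. SynchronousEventPublisher and processMessage are hypothetical and much simpler than Spring’s event infrastructure; in this toy, a listener’s exception reaches the caller, which then rolls back instead of committing.

```kotlin
// A toy, framework-free publisher: publish() calls every listener on the caller's
// thread and returns only after all of them have finished.
class SynchronousEventPublisher {
    private val listeners = mutableListOf<(Any) -> Unit>()

    fun subscribe(listener: (Any) -> Unit) {
        listeners += listener
    }

    fun publish(event: Any) = listeners.forEach { it(event) }
}

// Mirrors the poller task: commit only if every listener succeeded,
// otherwise roll back so the message stays in the outbox for a retry.
fun processMessage(
    publisher: SynchronousEventPublisher,
    event: Any,
    commit: () -> Unit,
    rollback: () -> Unit
): Boolean =
    try {
        publisher.publish(event)
        commit()
        true
    } catch (e: Exception) {
        rollback()
        false
    }
```

With a listener that throws (say, because SES is down), processMessage rolls back and returns false; the message is then retried on a later tick.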

Key takeaways

  • Transactional consistency reduces the availability of your system.
  • Do you need transactional consistency?
  • Eventual consistency does not mean that your system will be slow. It may often be a matter of milliseconds for it to become consistent.

Further reading

Sources

  1. https://www.2ndquadrant.com/en/blog/what-is-select-skip-locked-for-in-postgresql-9-5/