Eventual Consistency via Kafka Transactional Outbox

Our systems today are typically distributed and often integrated via an event bus such as Kafka. We store data in a database and publish events to inform other systems of changes. For example, the system that stores a Thing is eventually consistent with the other systems that consume the ThingCreated event: at some point they will reach the state they are supposed to be in once they learn about the new Thing. When systems fail to achieve this level of consistency, analysis, troubleshooting and restoring consistency often cost significant time. We would like to save ourselves this time and instead build correct systems.

HTTP command processing / dual write problem

A typical use case is that a user sends a command (e.g. via HTTP) to a service, which, after validating the command, creates a persistent entity and publishes a related event to notify other services. Writing to two systems like this is also known as the “dual write problem”. This is visualized in the graphic below.

[Figure: HTTP command use case: command / entity persistence / event publication]

The question now is how we can build this so that eventual consistency is actually guaranteed, and not only on the happy path. We want to make sure that both the Thing is saved and the ThingCreated event is published. All or nothing, completely or not at all - also known as atomicity from ACID. I.e. once the Thing has been saved, other systems must receive a ThingCreated event at some point.

A trivial implementation could look like this:

fun createThing(someThing: Thing) {
  repository.save(someThing)                // 1) persist the entity
  producer.publish(thingCreated(someThing)) // 2) publish the event - skipped if the process crashes after the save
}

Wait, hmm… - an alternative implementation could be:

fun createThing(someThing: Thing) {
  producer.publish(thingCreated(someThing)) // 1) publish the event
  repository.save(someThing)                // 2) persist the entity - skipped if the save fails after publishing
}

Neither implementation guarantees eventual consistency: in the first case the ThingCreated event might never be published (because the process could crash before the producer delivers the event), and in the second case the Thing might not be saved although the ThingCreated event was already published to Kafka. I.e. a failure in either of these operations results in inconsistent data.

Examples of failures could be:

  • a timeout when writing to the database
    • e.g. because the database is overloaded
    • or because it takes too long to acquire a connection from the connection pool
  • a serialization error occurs when serializing the Kafka message
  • the service is deployed/restarted before the message was sent to Kafka
    • note that the related default KafkaProducer timeouts are in the range of 30 seconds to 2 minutes (e.g. request.timeout.ms, delivery.timeout.ms, max.block.ms) - so by default it is considered “okayish” to wait up to two minutes for the message to be delivered
    • the KafkaProducer.send() method is asynchronous, i.e. without blocking via producer.send(message).get(1, MINUTE) you wouldn’t even know it failed (see the sketch after this list)
  • there could be a peak in activity at the moment, so that many messages are already queued and sending the new message runs into a timeout
  • the network could be slow for a moment
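To make the last two producer points tangible, here is a minimal Kotlin sketch (topic name, key and connection settings are illustrative) showing that KafkaProducer.send() only enqueues the record, and that a delivery failure surfaces only once we block on the returned Future:

import java.util.Properties
import java.util.concurrent.TimeUnit
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord

fun main() {
  val props = Properties().apply {
    put("bootstrap.servers", "localhost:9092") // illustrative
    put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // the defaults mentioned above, stated explicitly:
    put("request.timeout.ms", "30000")   // 30 seconds
    put("delivery.timeout.ms", "120000") // 2 minutes
    put("max.block.ms", "60000")         // 1 minute
  }
  KafkaProducer<String, String>(props).use { producer ->
    // send() returns immediately with a Future - a broker outage
    // would go completely unnoticed at this point
    val future = producer.send(ProducerRecord("thing_topic", "some-id", "payload"))
    try {
      future.get(1, TimeUnit.MINUTES) // only now does a failure become visible
    } catch (e: Exception) {
      // the Thing may already be committed to the database here -
      // catching the failure does not restore consistency
      println("publishing failed: $e")
    }
  }
}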

Usually none of these cases should result in inconsistencies – depending on the business needs of course.

Solution concept: transactional outbox

A solution to the dual write problem is known as the transactional outbox pattern:

  • the event that shall be published is stored in an outbox table in the same transaction as the business entity. If the thing table update fails or the outbox table update fails, the entire transaction is rolled back.
  • a separate process continuously processes the outbox table and publishes new events to the event bus. This happens asynchronously, decoupled from the transactions writing to the outbox table, so user-facing activity/performance is not affected when many messages are pending or delivery is temporarily slow. The outbox processor guarantees that each event is published, so that the downstream services eventually receive it - a deliberately naive version of such a processor is sketched after the figure below.
[Figure: transactional outbox]
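To illustrate the second bullet above, here is a deliberately naive sketch of such a processor. OutboxRecord, OutboxRepository and the polling strategy are simplified assumptions; a production implementation (such as the library below) additionally has to take care of scheduling, locking between multiple instances, retries and error handling:

import java.util.concurrent.TimeUnit
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord

// hypothetical row written by the business transaction
data class OutboxRecord(val id: Long, val topic: String, val key: String, val payload: ByteArray)

// hypothetical repository over the outbox table
interface OutboxRepository {
  fun findUnprocessed(limit: Int): List<OutboxRecord>
  fun markProcessed(id: Long)
}

class NaiveOutboxProcessor(
  private val repository: OutboxRepository,
  private val producer: KafkaProducer<String, ByteArray>,
) {
  // one polling iteration: publish pending events in insertion order and
  // mark each one as processed only after the broker has acknowledged it
  fun processOnce() {
    for (record in repository.findUnprocessed(100)) {
      producer.send(ProducerRecord(record.topic, record.key, record.payload))
        .get(1, TimeUnit.MINUTES) // block so that a failure stops the loop
      repository.markProcessed(record.id)
    }
    // if the process crashes between send() and markProcessed(), the event
    // is published again on the next run: at-least-once semantics
  }
}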

Solution implementation & usage

There’s an implementation of this pattern provided by Tomorrow Bank for services running Spring (Boot), supporting both sync/blocking and async/non-blocking applications: github - transactional-outbox. (For the sake of transparency: I created this implementation while working at Tomorrow, and as of today I’m still one of the maintainers, along with some former Tomorrow colleagues.)

The usage in the application looks like this:

@Autowired
private val outboxService: OutboxService

@Transactional
fun createThing(someThing: Thing) {
  repository.save(someThing)
  // the event is written to the outbox table within the same transaction as the entity
  outboxService.saveForPublishing("thing_topic", someThing.id, ThingCreated(someThing))
}

Apart from adding the library dependency, you must set up/configure the OutboxProcessor, which is responsible for - guess what - continuously processing the outbox table. How to set this up is described in the project README. The OutboxProcessor guarantees “at-least-once” semantics, i.e. each message/event is published at least once. The library adds a header x-sequence, which consumers could use for deduplication if deduplication based on the event details is not possible - a sketch of this follows below.
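A consumer-side deduplication based on that header could look roughly like this. Note the assumptions: SequenceStore is a hypothetical abstraction (e.g. backed by a table updated in the consumer’s transaction), and the encoding and scoping of x-sequence should be checked against the project README:

import org.apache.kafka.clients.consumer.ConsumerRecord

// hypothetical store remembering the highest sequence processed per record key
interface SequenceStore {
  fun lastSeen(key: String): Long?
  fun record(key: String, sequence: Long)
}

fun handleOnce(record: ConsumerRecord<String, ByteArray>, store: SequenceStore, process: (ByteArray) -> Unit) {
  val header = record.headers().lastHeader("x-sequence")
    ?: return process(record.value()) // no header, no header-based dedup possible
  val sequence = String(header.value()).toLong() // assumption: decimal string encoding
  val last = store.lastSeen(record.key())
  if (last != null && sequence <= last) return // duplicate delivery, skip
  process(record.value())
  store.record(record.key(), sequence)
}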

When (not) to use this pattern

  • As said before, the typical scenario where this pattern can be used is for command processing, where both database state shall be modified and one or more events shall be published to Kafka.
  • Another scenario is the same as before, just with Kafka message/event processing instead of an HTTP command: given at-least-once semantics, the consumed messages need to be deduplicated, which is best done based on the database state. This deduplication should happen in the same transaction in which the database state is modified (and the resulting event is written to the outbox table) - a sketch follows after this list.
  • A scenario where the transactional outbox is not needed is for “read-process-write” cycles from Kafka topic A to Kafka topic B, where no database state needs to be modified atomically with the produced events. Quite obvious.
  • When database state is modified and events shall be published but at-most-once semantics is sufficient, the transactional outbox is also not needed. An example could be events published for a tracking system where numbers don’t need to be 100% accurate. Or, from another perspective, whenever (eventual) consistency between systems is not required.
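For the second scenario in the list, a handler could look like the sketch below. The entity and event types, the repository and the topic name are illustrative; outboxService.saveForPublishing mirrors the usage example above, and the dedup-by-primary-key strategy is just one possible approach:

// illustrative types for a consumed event, a local entity and a follow-up event
data class ThingCreated(val thingId: String)
data class LocalThing(val id: String)
data class ThingRegistered(val thingId: String)

class ThingCreatedHandler(
  private val repository: LocalThingRepository, // hypothetical Spring Data repository
  private val outboxService: OutboxService,
) {
  @Transactional
  fun onThingCreated(event: ThingCreated) {
    // deduplicate based on database state: a redelivered event finds the
    // entity already persisted and is simply ignored
    if (repository.existsById(event.thingId)) return

    repository.save(LocalThing(event.thingId))
    // the follow-up event joins the same transaction via the outbox table
    outboxService.saveForPublishing("thing_registered_topic", event.thingId, ThingRegistered(event.thingId))
  }
}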

There are alternative solutions, e.g. change data capture with Debezium (which typically runs on Kafka Connect). The main difference is that these run as separate processes and therefore require additional operational effort: they’d have to be operated in a highly available fashion, they’d have to be monitored, and operational ownership would have to be clarified. The idea behind the “transactional-outbox” project is that a team can guarantee eventual consistency for a service with as little effort as possible and without additional external dependencies.
