CQRS and Event Sourcing streams

Build Command Query Responsibility Segregation (CQRS), Event Sourcing, and Event Logging streams out of pre-built stages, stand-alone functions, and your own custom commands, services, and events.

CQRS

CQRS with Event Sourcing example.

A remote system publishes commands to a Kafka topic. Kafka is language neutral: the publishing system can be written in another language, as long as it publishes messages that subscribing systems understand.

A subscribing system polls the topic. Once deserialized, commands can be fanned out to parallel sub-streams: one sends them to your service, the other maps them to events and inserts them into an Event Log. Backpressure prevents overflow and ensures the parallel sub-streams pull from upstream only when both are ready. If the service fails, the event isn’t logged, and vice versa: example. An equivalent stream with the service and Event Logging connected in series: example.
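To make the fan-out and in-series behavior concrete without Kafka, Cassandra, or Akka, here is a minimal Scala sketch that models each sub-stream as a Future-returning function. `Command`, `Step`, `parallel`, `series`, and `run` are hypothetical names, not dendrites or Akka Streams APIs, and running one command at a time is only a crude stand-in for backpressure. In this model a failure in either branch stops the run, so later commands are neither executed nor logged; only the series version also guarantees that a command whose service call failed is never logged.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical command type, for illustration only.
final case class Command(id: String, amount: Int)

// One processing step for a single command.
type Step = Command => Future[Unit]

// Fan-out: start the service call and the event-log write together, like Broadcast
// feeding two sub-streams. The result completes when both finish and fails if either fails.
def parallel(service: Step, logEvent: Step): Step =
  cmd => service(cmd).zip(logEvent(cmd)).map(_ => ())

// In series: the event is logged only after the service call has succeeded.
def series(service: Step, logEvent: Step): Step =
  cmd => service(cmd).flatMap(_ => logEvent(cmd))

// Start the next command only after the previous one has completed (a crude stand-in
// for backpressure). The first failure fails the run, so later commands are never started.
def run(cmds: List[Command], step: Step): Future[Unit] =
  cmds.foldLeft(Future.unit)((prev, cmd) => prev.flatMap(_ => step(cmd)))
```

For example, with a service that fails on the second of three commands, `series` logs only the first command, while `parallel` may also log the second, because its log write was started alongside the failing service call; neither touches the third.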

Kafka recommends Avro for serialization and deserialization. Two flavors are provided: one uses Avro4s, which can usually do almost all of the work for you; the other gives you more control by using your Avro schema with a UDF (user-defined function).

KafkaSink configures and wraps Kafka’s driver, which asynchronously sends messages to your topic.

KafkaSource also configures and wraps the driver. It polls your topic and commits messages.

ConsumerRecordsDequeue pushes one message at a time downstream. It pulls again only after all polled messages have been processed.
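The dequeue behavior can be modeled with a plain queue. This sketch is an assumption about the stage’s logic, not its source: `Record` stands in for Kafka’s `ConsumerRecord`, and `needsPoll` is a hypothetical flag telling the source it may poll again once every record from the previous batch has been handed downstream.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a polled Kafka record.
final case class Record(key: String, value: String, offset: Long)

// Minimal model of the dequeue: hold one polled batch, hand out one record per
// downstream pull, and signal that a new poll is needed only once the batch is drained.
final class RecordsDequeue {
  private val queue = mutable.Queue.empty[Record]

  // True when every record of the previous batch has been handed downstream.
  def needsPoll: Boolean = queue.isEmpty

  // Accept a new batch only when the previous one is fully drained.
  def enqueueBatch(batch: Seq[Record]): Unit = {
    require(needsPoll, "previous batch not fully processed")
    queue ++= batch
  }

  // One record per pull, in polled order.
  def pull(): Option[Record] =
    if (queue.isEmpty) None else Some(queue.dequeue())
}
```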

ConsumerRecordDeserializer maps Avro-serialized messages to case classes and pushes them along with Kafka’s ConsumerRecordMetadata.
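A minimal sketch of the deserializer’s shape, assuming a hypothetical `RecordMetadata` in place of Kafka’s metadata and a toy UTF-8 text decoder in place of Avro. It only illustrates the idea: the decoding function is passed in, and the metadata travels with the decoded case class so later stages can still see the key and offset.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical stand-ins: the real stage works with Kafka records and Avro.
final case class RecordMetadata(topic: String, partition: Int, offset: Long, key: String)
final case class RawRecord(meta: RecordMetadata, bytes: Array[Byte])

// A command case class the subscribing system understands.
final case class Deposit(account: String, amount: BigDecimal)

// Toy decoder used in place of Avro: the value is "account,amount" in UTF-8.
def decodeDeposit(bytes: Array[Byte]): Deposit = {
  val fields = new String(bytes, UTF_8).split(",", 2)
  Deposit(fields(0), BigDecimal(fields(1)))
}

// Decode the value and keep the record's metadata alongside it.
def deserialize[A](decode: Array[Byte] => A)(record: RawRecord): (RecordMetadata, A) =
  (record.meta, decode(record.bytes))
```

A downstream command-processing stage can then drop the metadata with `_._2`.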

Broadcast is a built-in Akka Streams fan-out stage. Here it pushes each input to two outputs: one for command processing, the other for Event Logging.

Command processing pushes only the value part of the key/value tuple to a service.

CassandraRetrySink executes commands with a UDF. It can handle different commands: insert, conditional update, or delete. Each returns a Java ResultSetFuture. The sink configures and wraps the DataStax Cassandra Java driver and handles the Future.
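The dispatch and retry can be sketched with the standard library alone. Everything here is hypothetical: `FakeAccounts` is an in-memory stand-in for a Cassandra table, `execute` plays the role of the UDF that pattern-matches on the command, and `withRetry` assumes a simple policy (retry any failure up to a fixed number of attempts) that the real sink may not use. A Scala `Future` stands in for the driver’s `ResultSetFuture`.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical commands for an accounts table.
sealed trait AccountCmd
final case class Insert(id: String, balance: Int) extends AccountCmd
final case class ConditionalUpdate(id: String, expected: Int, balance: Int) extends AccountCmd
final case class Delete(id: String) extends AccountCmd

// In-memory stand-in for the table; execute plays the role of the UDF.
final class FakeAccounts {
  private val rows = TrieMap.empty[String, Int]

  def execute(cmd: AccountCmd)(implicit ec: ExecutionContext): Future[Boolean] = Future {
    cmd match {
      case Insert(id, balance) => rows.put(id, balance); true
      // Applied only if the current value matches, similar in spirit to "UPDATE ... IF balance = ?".
      case ConditionalUpdate(id, expected, balance) => rows.replace(id, expected, balance)
      case Delete(id) => rows.remove(id).isDefined
    }
  }

  def get(id: String): Option[Int] = rows.get(id)
}

// Retry a failed asynchronous call until maxAttempts calls have been made.
def withRetry[A](maxAttempts: Int)(call: () => Future[A])(implicit ec: ExecutionContext): Future[A] =
  call().recoverWith {
    case _ if maxAttempts > 1 => withRetry(maxAttempts - 1)(call)
  }
```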

The command-executing service can be replaced with whatever you need. You can create Sinks either by passing a function to a built-in Akka Streams Sink or by using one from dendrites as a template for a custom one.

Event Logging maps the key/command tuple to an event case class. Kafka’s message key is repurposed as the event id; it can also be used later to find and compensate for duplicates.
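A minimal sketch of the mapping, with hypothetical `Deposit` and `DepositEvent` case classes. The clock is passed in so the event timestamp can be controlled in tests; that parameter is an illustrative choice, not part of dendrites.

```scala
import java.time.Instant

// Hypothetical command and event shapes; the field names are illustrative only.
final case class Deposit(account: String, amount: BigDecimal)
final case class DepositEvent(id: String, account: String, amount: BigDecimal, loggedAt: Instant)

// Map the (Kafka key, command) tuple to an event. The message key becomes the event id,
// so a command that is delivered and processed twice yields two events with the same id.
def toEvent(now: () => Instant)(keyed: (String, Deposit)): DepositEvent = {
  val (key, cmd) = keyed
  DepositEvent(id = key, account = cmd.account, amount = cmd.amount, loggedAt = now())
}
```

Because the id comes from the message key rather than being generated, re-processing the same Kafka message produces an event with the same id, which is what makes duplicates detectable later.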

CassandraBind creates a BoundStatement from the event case class and a PreparedStatement.

CassandraSink inserts the event into an Event Log.

Duplicates happen when some commands are processed and then an error stops processing of the rest. Re-polling processes all of the commands again, so the ones that had already succeeded are re-executed. Later, on the query side (the ‘Q’ in CQRS), we can query for events that were logged more than once. From these, a compensating function can restore the correct state (example). After checking for and compensating for duplicates, the system is ready for user queries. Where possible, design idempotent commands, so that duplicates can be ignored.
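Detecting and compensating for duplicates can be sketched as below, assuming a hypothetical `LoggedEvent` whose command added `amount` to an account, so each extra log entry with the same id means the amount was added one extra time. `duplicates`, `compensate`, and `setBalance` are illustrative names; in a real system the log would come from a Cassandra query rather than an in-memory `Seq`.

```scala
// Hypothetical logged event: id is the repurposed Kafka key, and the command it records
// added `amount` to `account` (a non-idempotent command).
final case class LoggedEvent(id: String, account: String, amount: BigDecimal)

// Query side: ids logged more than once, mapped to how many extra times they were applied.
def duplicates(log: Seq[LoggedEvent]): Map[String, Int] =
  log.groupBy(_.id)
    .map { case (id, events) => id -> (events.size - 1) }
    .filter { case (_, extra) => extra > 0 }

// Compensating function: undo each extra application by subtracting its amount again.
def compensate(balances: Map[String, BigDecimal], log: Seq[LoggedEvent]): Map[String, BigDecimal] = {
  val firstById: Map[String, LoggedEvent] =
    log.groupBy(_.id).map { case (id, events) => id -> events.head }
  duplicates(log).foldLeft(balances) { case (acc, (id, extra)) =>
    val e = firstById(id)
    acc.updated(e.account, acc.getOrElse(e.account, BigDecimal(0)) - e.amount * BigDecimal(extra))
  }
}

// Idempotent alternative: setting a balance twice leaves the same state as setting it once,
// so a re-executed duplicate needs no compensation.
def setBalance(balances: Map[String, BigDecimal], account: String, value: BigDecimal): Map[String, BigDecimal] =
  balances.updated(account, value)
```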

Querying a Cassandra cluster can be slow when it has many nodes. Composite keys speed queries up: a Partition Key narrows a query to the nodes that own that partition, and a Clustering Key narrows it within the partition on those nodes. A timestamp Clustering Key narrows searches by time, which is useful when you’re only concerned with recent events.
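As an in-memory analogy (not Cassandra code), the sketch below models a table whose primary key is `((account), loggedAt)`: a map of partitions keyed by the Partition Key, each holding rows sorted by a timestamp Clustering Key in epoch milliseconds. `EventRow`, `EventTable`, and `between` are hypothetical names. A time-range query touches one partition and takes a contiguous slice of it, which is roughly why a timestamp Clustering Key makes recent-event queries cheap.

```scala
import scala.collection.immutable.TreeMap

// Hypothetical row: `account` is the partition key, `loggedAt` (epoch millis) the clustering key.
final case class EventRow(account: String, loggedAt: Long, eventId: String)

final class EventTable {
  // One sorted map per partition; in Cassandra the partition key decides which replica
  // nodes own the partition, and rows inside it are stored sorted by clustering key.
  private var partitions = Map.empty[String, TreeMap[Long, EventRow]]

  // Rows with the same (account, loggedAt) overwrite each other, as with a primary key.
  def insert(row: EventRow): Unit = {
    val partition = partitions.getOrElse(row.account, TreeMap.empty[Long, EventRow])
    partitions = partitions.updated(row.account, partition.updated(row.loggedAt, row))
  }

  // Touch only one partition, then take a contiguous slice of it: [from, until).
  def between(account: String, from: Long, until: Long): Seq[EventRow] =
    partitions.getOrElse(account, TreeMap.empty[Long, EventRow]).range(from, until).values.toSeq
}
```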

The Query side of CQRS isn’t shown; in this example it’s just a Cassandra query of the command table.