Cassandra streaming components

Build distributed database clients with stream stages and stand-alone functions.

Stages: CassandraQuery, CassandraPaging, CassandraSink, CassandraRetrySink, CassandraBoundQuery, CassandraMappedPaging, CassandraConditional

Cassandra query, sink, and conditional stages wrap Datastax’s Java Driver and call it asynchronously, handling errors with in-stage recovery. Paging stages use backpressure to push one page at a time. Pre- and post-stream functions initialize and clean up client operations.

Stand-alone functions

Functions for clusters, schemas, sessions, and logging.

// configure the cluster, load balancing, and query logging
val config = CassandraConfig
val addresses = config.getInetAddresses
...
val cluster = createCluster(addresses, retryPolicy, reConnectPolicy)
val lbp = createLoadBalancingPolicy(config.localDataCenter)
initLoadBalancingPolicy(cluster, lbp)
logMetadata(cluster)
registerQueryLogger(cluster)
// connect and create the keyspace
val session = connect(cluster)
val schema = config.keySpace
val strategy = config.replicationStrategy
createSchema(session, schema, strategy, repCount)
...
// execute statements, inspect rows, and log session info
val resultSet = executeBoundStmt(session, bndStmt)
...
val colNames: StringBuilder = getRowColumnNames(row)
val sessionInfo: String = sessionLogInfo(session)
...
// clean up
dropSchema(session, schema)
close(session, cluster)

Set up client and connect

Before running streams, a cluster connection is configured, initialized, and managed with the provided functions. CassandraConfig is an optional trait you can extend. PreparedStatements are pre-parsed by the database.

createClusterSchemaSession(ShoppingCartConfig, 1)
CassandraShoppingCart.createTable(session, schema)
CassandraShoppingCartEvtLog.createTable(session, schema)
prepStmts = prepareStatements(session, schema)
createClusterSchemaSession is an example of grouping stand-alone functions with user-defined settings.

Query from a stream

CassandraBind is Akka Streams’ built-in map stage, used to create a BoundStatement from a PreparedStatement, a case class, and a user-defined function (UDF). See the example insert and query bind UDFs.
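A bind UDF might look roughly like this (a minimal sketch; the Playlist case class, its columns, and the function names are assumptions, not the library’s code):

import java.util.UUID
import com.datastax.driver.core.{BoundStatement, PreparedStatement}

case class Playlist(id: UUID, song: String, artist: String)

// bind a case class's fields to an insert PreparedStatement
def bndInsert(stmt: PreparedStatement, p: Playlist): BoundStatement =
  stmt.bind(p.id, p.song, p.artist)

// bind a query key to a select PreparedStatement
def bndQuery(stmt: PreparedStatement, id: UUID): BoundStatement =
  stmt.bind(id)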

CassandraQuery executes BoundStatements. The driver returns a ResultSetFuture (which extends Guava’s ListenableFuture); it is mapped to a Scala Future. On success, an Akka AsyncCallback pushes the ResultSet.
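That ListenableFuture-to-Future mapping can be done by completing a Promise from a Guava callback. A minimal sketch, not the stage’s internals, assuming a Guava version with the two-argument addCallback overload:

import scala.concurrent.{Future, Promise}
import com.datastax.driver.core.{ResultSet, ResultSetFuture}
import com.google.common.util.concurrent.{FutureCallback, Futures}

// complete a Scala Promise from the driver's Guava future
def toScalaFuture(rsf: ResultSetFuture): Future[ResultSet] = {
  val promise = Promise[ResultSet]()
  Futures.addCallback(rsf, new FutureCallback[ResultSet] {
    def onSuccess(rs: ResultSet): Unit = promise.success(rs)
    def onFailure(t: Throwable): Unit = promise.failure(t)
  })
  promise.future
}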

CassandraPaging pulls a ResultSet, which can contain any number of Rows, and pushes a specified number of them at a time.

A map stage is created with a UDF that maps Rows to case classes; ScalaCass helps with the mapping (see the example).

val partialBndQuery = bndQuery(Playlists.prepQuery(session, schema), _: UUID)
val query = new CassandraQuery(session)
val paging = new CassandraPaging(10)

val runnableGraph = source
  .map(partialBndQuery)
  .via(query).via(paging)
  .map(Playlists.mapRows)
  .to(sink)
Before the stream runs, the database parses the PreparedStatement and it is partially applied to the bind function; later, a UUID is passed as stage input.
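A row-mapping UDF like Playlists.mapRows might look roughly like this (a sketch using plain driver getters instead of ScalaCass, and assuming the paging stage emits a Seq[Row]; the fields and column names are assumptions):

import java.util.UUID
import com.datastax.driver.core.Row

// Playlist as in the bind sketch above
case class Playlist(id: UUID, song: String, artist: String)

// map each Row in a page to a case class
def mapRows(rows: Seq[Row]): Seq[Playlist] =
  rows.map(r => Playlist(r.getUUID("id"), r.getString("song"), r.getString("artist")))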

CassandraBoundQuery binds and queries in a single stage. There’s an additional check that upstream completion doesn’t cause this stage to complete before pushing its result.

CassandraMappedPaging pushes a specified number of case classes after mapping them from Rows.

val query = new CassandraBoundQuery[UUID](session, prepStmt, bndQuery, 1)
val paging = new CassandraMappedPaging[ShoppingCart](10, mapRows)
source.via(query).via(paging).runWith(sink)

Insert, delete from a stream

CassandraSink executes insert and delete statements.

val partialBndInsert = bndInsert(Playlists.prepInsert(session, schema), _: Playlist)
val sink = new CassandraSink(session)
source.map(partialBndInsert).runWith(sink)

Update from a stream

Updating any database risks acting on dirty data (data that changed since your last read), and with eventually consistent databases the risk is greater. CassandraRetrySink uses Lightweight Transactions with conditional statements and optimistic locking: your data must carry a version number, and your statement includes a query to check it. If the version is current, the update is executed; if not, the sink uses the current version and retries with exponential backoff. CassandraRetrySink accepts an A => ResultSetFuture function, where A is usually a case class, and it can execute different statements depending on the content of A (see the example). This is slower than insert, so prefer designing for insert over update.

val curriedCheckAndSetOwner = checkAndSetOwner(session, prepQueryStmt, prepStmt) _
val source = Source[SetOwner](iter)
val sink = new CassandraRetrySink[SetOwner](RetryConfig, curriedCheckAndSetOwner).withAttributes(dispatcher)
source.runWith(sink)
Before running, curry a function that queries and then updates depending on the query result; later, the sink passes each case class to the curried function.
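Such a check-and-set function might look roughly like this (a minimal sketch; SetOwner, the column names, and the statements are assumptions, not the library’s code):

import java.util.UUID
import com.datastax.driver.core.{PreparedStatement, ResultSetFuture, Session}

case class SetOwner(cartId: UUID, owner: String)

// read the current version, then issue a conditional update (IF version = ?);
// CassandraRetrySink retries with backoff when the update was not applied
def checkAndSetOwner(session: Session, queryStmt: PreparedStatement, updateStmt: PreparedStatement)
                    (cmd: SetOwner): ResultSetFuture = {
  val row = session.execute(queryStmt.bind(cmd.cartId)).one()
  val version = row.getInt("version")
  session.executeAsync(updateStmt.bind(cmd.owner, Int.box(version + 1), cmd.cartId, Int.box(version)))
}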

Cassandra conditional flows

CassandraConditional is a Flow for conditional updates. Conditional updates return a ResultSetFuture holding a single Row whose first column, “applied”, is true on success. If the update was not applied, the Row is wrapped in an Option and pushed to the next stage. Use this flow when you want to handle the result.

val partialBndUpdateItems = bndUpdateItems(prepStmt, _: SetItems)
val curriedErrorHandler = getConditionalError(rowToString) _
val conditional = new CassandraConditional(session, curriedErrorHandler)
val runnableGraph = source.map(partialBndUpdateItems).via(conditional)...
bndUpdateItems has one argument list, so it is partially applied; getConditionalError has two argument lists, so it is curried.
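The result handling described above can be sketched as follows (an assumed helper, not the stage’s internals):

import com.datastax.driver.core.{ResultSet, Row}

// a conditional update returns one Row; its first column is true when the update was applied
def checkApplied(rs: ResultSet): Option[Row] = {
  val row = rs.one()
  if (row.getBool(0)) None else Some(row)  // push the Row downstream only on failure
}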

CassandraKeyValueFlow also executes conditional statements. On success, the input key/case class value is pushed. On failure, after exhausting retries, the stage fails and stops the stream. This is useful for downstream event logging. With a KafkaSource, ConsumerRecordMetadata can be used as the key, passing Kafka metadata downstream.

val curriedDoCmd = doShoppingCartCmd(session, prepStmts) _
val cmdFlowGraph = new CassandraKeyValueFlow[String, ShoppingCartCmd](RetryConfig, curriedDoCmd).withAttributes(dispatcher)
val cmdFlow = Flow.fromGraph(cmdFlowGraph)
val optInsEvtPrepStmt = prepStmts.get("InsertEvt")
val insEvtPrepStmt = optInsEvtPrepStmt match {
  case Some(x) => x
  case None => throw new NullPointerException("ShoppingCartEvt Insert preparedStatement not found")
}
val partialBndInsert = bndInsert(insEvtPrepStmt, _: ShoppingCartEvt)
val sink = new CassandraSink(session)
cmdFlow.map(cmdToEvt).map(partialBndInsert).to(sink)
Execute the command, map the input metadata and command case class to an event case class, and log the event.

Example Configurations

Example (and optional) Typesafe Config settings for Cassandra are in src/main/resources/reference.conf. If you use Typesafe Config, you can override them in your application’s src/main/resources/application.conf.
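Overrides follow the usual Typesafe Config merge, roughly like this (the key name below is hypothetical; see reference.conf for the actual keys):

import com.typesafe.config.ConfigFactory

// application.conf values override the defaults shipped in reference.conf
val config = ConfigFactory.load()
val host = config.getString("my-app.cassandra.host")  // hypothetical key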

Other Cassandra Streaming products

Lightbend’s Alpakka module has a CassandraConnector, with a Source and Sink.