HTTP, JSON streaming components

Non-blocking pre-built HTTP client stages and JSON mapping to case classes. Streams can be both clients and servers.

TypedQueryFlow TypedResponseFlow

TypedQueryResponseFlow

GET request/response with JSON/case class mapping

TypedQueryFlow takes advantage of the natural (but unappreciated!) fit of currying and streaming. It’s initialized with a base URL, a request path, and a mapping function, none of which change over the stream’s lifetime. They form the first parameter list of typedQuery, which makes it a curried function. The mapping function turns a case class into a fully constructed GET request. caseClassToGetQuery maps case classes with basic field types; a custom mapping function is needed for complex types.

When TypedQueryFlow pulls a case class, it’s passed to typedQuery’s second argument list. The case class fields are mapped to a complete request string. An HttpRequest is sent to the server, which returns a Future[HttpResponse].
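The currying described above can be sketched in plain Scala without Akka. This is only an illustration: buildRequestUri, toGetQuery, the host, and the port are hypothetical stand-ins, not the library’s typedQuery or caseClassToGetQuery, and the exact query string format the library produces may differ.

```scala
// Sketch only: hypothetical names, not the library's implementation.
// Requires Scala 2.13+ for Product.productElementNames.
object CurryingSketch {
  final case class GetAccountBalances(id: Long)

  // Map a case class with basic field types to "?key=value&key=value".
  def toGetQuery(cc: Product): String =
    cc.productElementNames
      .zip(cc.productIterator)
      .map { case (name, value) => s"$name=$value" }
      .mkString("?", "&", "")

  // First parameter list: fixed for the stream's lifetime.
  // Second parameter list: one case class per stream element.
  def buildRequestUri(baseURL: String, requestPath: String,
                      mapping: Product => String)(cc: Product): String =
    baseURL + requestPath + mapping(cc)

  // Apply the first list once, then reuse the resulting function per element.
  // The base URL and path here are assumed values.
  val checkingUri: Product => String =
    buildRequestUri("http://localhost:9000", "/account/balances/checking/", toGetQuery)

  def main(args: Array[String]): Unit =
    List(GetAccountBalances(1L), GetAccountBalances(2L))
      .map(checkingUri)
      .foreach(println)
}
```

Fixing the configuration in the first parameter list means each stream element only has to supply the part that varies.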

mapAsync is one of Akka Streams’ built-in stages. It calls the server, handles the Future’s completion, and pushes the HttpResponse.
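A minimal sketch of mapAsync on its own, assuming Akka 2.6 or later (where an implicit ActorSystem is enough to run a stream). fakeCall is a hypothetical stand-in for the HTTP call; any function returning a Future fits.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object MapAsyncSketch {
  // Hypothetical stand-in for a non-blocking server call.
  def fakeCall(id: Long)(implicit ec: ExecutionContext): Future[String] =
    Future(s"response for $id")

  // mapAsync runs up to `parallelism` Futures at once and
  // emits their results downstream in the original element order.
  def run()(implicit system: ActorSystem): Future[Seq[String]] = {
    import system.dispatcher
    Source(List(1L, 2L, 3L))
      .mapAsync(parallelism = 2)(id => fakeCall(id))
      .runWith(Sink.seq)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("map-async-sketch")
    println(Await.result(run(), 5.seconds))
    system.terminate()
  }
}
```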

TypedResponseFlow also marries currying to streaming. HTTP calls can throw exceptions or return error messages. Scala’s Either fits here: Left for errors, Right for JSON values.

typedResponse’s first argument list takes custom functions that map the response. If the HttpResponse’s content type is ‘json’, mapRight unmarshals it to a case class. If its content type is ‘plain’, or there was an error, mapLeft extracts the error message.
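The Left/Right split can be sketched without Akka HTTP. Everything here is an assumption for illustration: FakeResponse is a simplified stand-in for HttpResponse, respond is not the library’s typedResponse, and the Right mapper parses a bare number where real code would unmarshal JSON.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object TypedResponseSketch {
  // Hypothetical, simplified stand-in for HttpResponse.
  final case class FakeResponse(ok: Boolean, contentType: String, body: String)

  // First list: mapping functions, fixed once.
  // Second list: the response to handle.
  def respond[A](mapLeft: String => Future[Left[String, Nothing]],
                 mapRight: String => Future[Right[String, A]])
                (response: FakeResponse): Future[Either[String, A]] =
    if (response.ok && response.contentType == "json") mapRight(response.body)
    else mapLeft(response.body)

  // Errors and plain-text bodies become Left(message).
  val plain: String => Future[Left[String, Nothing]] =
    body => Future.successful(Left(body))

  // Stand-in for JSON unmarshalling: the body is assumed to be a bare number.
  val amount: String => Future[Right[String, BigDecimal]] =
    body => Future.successful(Right(BigDecimal(body)))

  // Apply the first list once; the result handles any number of responses.
  val handle: FakeResponse => Future[Either[String, BigDecimal]] =
    respond(plain, amount)

  def main(args: Array[String]): Unit = {
    println(Await.result(handle(FakeResponse(ok = true, "json", "42.50")), 1.second))
    println(Await.result(handle(FakeResponse(ok = false, "plain", "not found")), 1.second))
  }
}
```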

BalancesProtocols shows JSON marshalling and unmarshalling with case classes. User-defined functions map errors to strings and good results to case classes.

def mapPlain(entity: HttpEntity): Future[Left[String, Nothing]] = {
  Unmarshal(entity).to[String].map(Left(_))
}

def mapChecking(entity: HttpEntity): Future[Right[String, AnyRef]] = {
  Unmarshal(entity).to[CheckingAccountBalances[BigDecimal]].map(Right(_))
}

Then specify the server’s base URL, request path, and query and response flows, and construct the composite flow. You can optionally add a Supervision decider for error handling. TypedQueryResponseFlow defines a decider in its companion object, or you can use your own.

val baseURL = clientConfig.baseURL
val requestPath = clientConfig.requestPath
val queryFlow = new TypedQueryFlow(baseURL, requestPath, caseClassToGetQuery)
val responseFlow = new TypedResponseFlow(mapPlain, mapChecking)

val tqr = new TypedQueryResponseFlow(queryFlow, responseFlow)

def flow: Flow[Product, Either[String, AnyRef], NotUsed] = {
  val flow = tqr.flow
  flow.withAttributes(ActorAttributes.supervisionStrategy(decider))
}
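If you supply your own decider, it is a function from Throwable to a Supervision directive. The sketch below assumes Akka 2.6 or later and uses a hypothetical policy (resume on ArithmeticException, stop otherwise) applied to a simple division flow rather than to the HTTP flow above.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object DeciderSketch {
  // Hypothetical policy: drop the failing element and continue on
  // ArithmeticException, stop the stream on anything else.
  val decider: Supervision.Decider = {
    case _: ArithmeticException => Supervision.Resume
    case _                      => Supervision.Stop
  }

  // Division by zero throws ArithmeticException inside the map stage.
  val divide = Flow[Int]
    .map(100 / _)
    .withAttributes(ActorAttributes.supervisionStrategy(decider))

  def run()(implicit system: ActorSystem): Future[Seq[Int]] =
    Source(List(1, 0, 2)).via(divide).runWith(Sink.seq)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("decider-sketch")
    println(Await.result(run(), 5.seconds))
    system.terminate()
  }
}
```

With Resume, the element that caused the exception is dropped and the stream keeps processing later elements.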

TypedQueryResponseFlow composes TypedQueryFlow and TypedResponseFlow into a single flow.

Request case class in, result case class out, non-blocking

POST, PUT, and DELETE methods

Akka HTTP built-in components and examples support these methods on the client side.

Non-blocking calls to parallel services and combined responses.

ZipWith takes outputs of all services and pushes them as a tuple when they are all ready.

def zipper = ZipWith((in0: Either[String, AnyRef],
  in1: Either[String, AnyRef],
  in2: Either[String, AnyRef]) => (in0, in1, in2))

val ccf = new CheckingCallFlow
val mmcf = new MoneyMarketCallFlow
val scf = new SavingsCallFlow

import GraphDSL.Implicits._
// Create Graph in Shape of a Flow
val flowGraph = GraphDSL.create() { implicit builder =>
  val bcast: UniformFanOutShape[Product, Product] = builder.add(Broadcast[Product](3))
  val check: FlowShape[Product,Either[String, AnyRef]] = builder.add(ccf.flow)
  val mm: FlowShape[Product,Either[String, AnyRef]] = builder.add(mmcf.flow)
  val savings: FlowShape[Product,Either[String, AnyRef]] = builder.add(scf.flow)
  val zip = builder.add(zipper)

  bcast ~> check ~> zip.in0
  bcast ~> mm ~> zip.in1
  bcast ~> savings ~> zip.in2
  FlowShape(bcast.in, zip.out)
}.named("calls")
// Cast Graph to Flow
val asFlow = Flow.fromGraph(flowGraph)
// Map tuple3 from flowGraph
val fgLR = GraphDSL.create() { implicit builder =>
  val fgCalls = builder.add(asFlow)
  val fgLR = builder.add(leftRightFlow) // results combiner

  fgCalls ~> fgLR
  FlowShape(fgCalls.in, fgLR.outlet)
}.named("callsLeftRight")
val wrappedCallsLRFlow = Flow.fromGraph(fgLR)
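
The results combiner (leftRightFlow) is not shown above. As a rough sketch of what such a combiner might do, the hypothetical function below separates error messages from good results in the zipped tuple. Its name and return type are assumptions, not the library’s definition.

```scala
object LeftRightSketch {
  type Result = Either[String, AnyRef]

  // Collect all error messages and all good results from the zipped tuple,
  // preserving the order of the services.
  def leftRight(t: (Result, Result, Result)): (Seq[String], Seq[AnyRef]) = {
    val all = Seq(t._1, t._2, t._3)
    val errors = all.collect { case Left(e) => e }
    val values = all.collect { case Right(v) => v }
    (errors, values)
  }

  def main(args: Array[String]): Unit =
    println(leftRight((Right("checking"), Left("money market unavailable"), Right("savings"))))
}
```

A function like this could be lifted into a stage with Flow.fromFunction or map and attached after the zip.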

Akka HTTP servers

Akka HTTP’s high level routing DSL is elegant, easy to code, and readable for non-programmers. BalancesService shows an example server with a GET request handler.
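
For readers who have not seen the routing DSL, here is a minimal sketch of a GET handler, assuming Akka HTTP 10.2 or later. The path, parameter name, host, port, and response text are hypothetical and are not taken from BalancesService.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

object RouteSketch {
  // Hypothetical route: GET /account/balances/checking?id=1
  val route: Route =
    path("account" / "balances" / "checking") {
      get {
        // Extract the "id" query parameter as a Long; a missing or
        // malformed value is rejected by the directive.
        parameter("id".as[Long]) { id =>
          complete(s"balances requested for account $id")
        }
      }
    }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("route-sketch")
    // Assumed host and port.
    Http().newServerAt("localhost", 9000).bind(route)
  }
}
```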

Stand-alone Functions

// Get Config, IP address, and port as a tuple3
val hostConfig: (Config, String, Int) = getHostConfig("dendrites.checking-balances.http.interface", "my.http.port")

// Append path to host URL
val baseURL: StringBuilder = configBaseUrl("my.http.path", hostConfig)

// Create URL including path
val url: StringBuilder = createUrl(scheme, ipDomain, port, path)

// Create a GET query string ("?" followed by "key=value" pairs) and append it to the request path.
// Works for case classes with only basic field types.
val balanceQuery = GetAccountBalances(goodId)
val checkingPath = "/account/balances/checking/"
val balancesQuery: StringBuilder = caseClassToGetQuery(balanceQuery)()
val q = checkingPath ++ balancesQuery

// Create a GET request from the URL, path, and GET arguments mapped from the case class, then call it
val callFuture: Future[HttpResponse] = typedQuery(GetAccountBalances(id), clientConfig.baseURL)

// Handle the response: mapLeft for exception and server error messages, mapRight for the JSON result.
// The second argument list, which takes the response, is left unapplied.
val partial: HttpResponse => Future[Either[String, AnyRef]] = typedResponse(mapLeft, mapRight) _ // curried

// Map Future[HttpResponse] to a Future[Either]: Left for an error, Right for a good result
val id = 1L
val cc = GetAccountBalances(id)
val callFuture: Future[HttpResponse] = typedQuery(baseURL, cc.productPrefix, caseClassToGetQuery)(cc)
val future: Future[Either[String, AnyRef]] = typedFutureResponse(mapPlain, mapChecking)(callFuture)

// Combine query and response. Call typedQueryResponse with its first argument list once,
// then call the curried function with the second argument list for each case class to query on.
val partial: Product => Future[Either[String, AnyRef]] = typedQueryResponse(baseURL, mapPlain, mapChecking) _ // curried
val responseFuture: Future[Either[String, AnyRef]] = partial(GetAccountBalances(id))

Example Configurations

Example, and optional, Typesafe Config settings for HTTP are in src/main/resources/reference.conf. If you use Typesafe Config, you can override these in your application’s src/main/resources/application.conf.
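
As a rough illustration of overriding settings, the sketch below parses an inline HOCON string in place of application.conf and falls back to whatever ConfigFactory.load() finds. The keys follow the "my.http" names used in the stand-alone examples, but the values, and the assumption that your configuration uses these exact keys, are hypothetical.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object ConfigSketch {
  // Hypothetical overrides, written as they might appear in application.conf.
  val overrides: Config = ConfigFactory.parseString(
    """
      |my.http {
      |  interface = "localhost"
      |  port = 9000
      |  path = "/account/balances/checking/"
      |}
    """.stripMargin)

  // ConfigFactory.load() reads application.conf layered over reference.conf;
  // values in `overrides` take precedence over both.
  val config: Config = overrides.withFallback(ConfigFactory.load())

  val host: String = config.getString("my.http.interface")
  val port: Int = config.getInt("my.http.port")
  val path: String = config.getString("my.http.path")

  def main(args: Array[String]): Unit =
    println(s"http://$host:$port$path")
}
```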