Algebird streaming approximations

Near-real-time approximations of streaming data, using pre-built Algebird stream stages and stand-alone functions.

[Figure: Algebird stream stages: AvgFlow, sumAvgFlow, CMSFlow, ZipTimeFlow, HLLFlow, estSizeFlow, sumHLLs, toApproximate, toApproximates, maxFlow, minFlow, firstQuartileFlow, secondQuartileFlow, thirdQuartileFlow, interQuartileMean, qTreeMaxFlow, qTreeMinFlow]

Stream Stages

Most of these are stand-alone functions wrapped in an Akka Streams built-in Flow stage.

AveragedValue estimates a variable’s mean in the stream.
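Because an averaged value carries its count as well as its mean, partial averages computed on different parts of a stream can be merged exactly. A minimal pure-Scala sketch of that merge, using a hypothetical `Avg` case class rather than Algebird's `AveragedValue`:

```scala
// Hypothetical stand-in for an averaged value: a count plus a running mean.
final case class Avg(count: Long, mean: Double) {
  // Merge two partial averages, weighting each mean by its count.
  def +(that: Avg): Avg = {
    val n = count + that.count
    if (n == 0L) Avg(0L, 0.0)
    else Avg(n, (mean * count + that.mean * that.count) / n)
  }
}

object Avg {
  val zero: Avg = Avg(0L, 0.0)
  // Lift one observation into an Avg.
  def of(x: Double): Avg = Avg(1L, x)
  // Average a whole batch by lifting and merging every element.
  def ofAll(xs: Seq[Double]): Avg = xs.map(of).foldLeft(zero)(_ + _)
}

val left  = Avg.ofAll(Seq(1.0, 2.0, 3.0)) // count 3, mean 2.0
val right = Avg.ofAll(Seq(10.0))          // count 1, mean 10.0
val both  = left + right                  // count 4, mean 4.0
```

Weighting each mean by its count is what makes the result independent of how the stream was partitioned.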

CountMinSketch estimates a variable’s frequency. CreateCMSFlow is a custom Flow stage: it needs an implicit CMSMonoid.
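A count-min sketch keeps a small table of counters: each row hashes an item to one column, and the frequency estimate is the smallest counter the item maps to, so it can overcount but never undercount. A minimal pure-Scala sketch of the idea, assuming a hypothetical `TinyCMS` class with seeded MurmurHash3 row hashes (not Algebird's `CMS` or `CMSMonoid`):

```scala
import scala.util.hashing.MurmurHash3

// Minimal count-min sketch: `depth` rows of `width` counters, one hash per row.
final case class TinyCMS(depth: Int, width: Int, table: Vector[Vector[Long]]) {
  // Row-specific hash: the row index doubles as the MurmurHash3 seed.
  private def bucket(item: String, row: Int): Int =
    Math.floorMod(MurmurHash3.stringHash(item, row), width)

  // Add one occurrence: bump one counter in every row.
  def add(item: String): TinyCMS =
    copy(table = table.zipWithIndex.map { case (counts, row) =>
      val b = bucket(item, row)
      counts.updated(b, counts(b) + 1L)
    })

  // Estimate: the minimum counter across rows, never below the true count.
  def frequency(item: String): Long =
    (0 until depth).map(row => table(row)(bucket(item, row))).min

  // Total elements added (every row sums to the same value).
  def totalCount: Long = table.head.sum

  // Sketches with the same dimensions and seeds merge by adding counters.
  def ++(that: TinyCMS): TinyCMS = {
    require(depth == that.depth && width == that.width, "dimensions must match")
    copy(table = table.zip(that.table).map { case (a, b) =>
      a.zip(b).map { case (x, y) => x + y }
    })
  }
}

object TinyCMS {
  def empty(depth: Int, width: Int): TinyCMS =
    TinyCMS(depth, width, Vector.fill(depth, width)(0L))
}

val cms1 = Seq("a", "b", "a", "c", "a").foldLeft(TinyCMS.empty(4, 64))(_ add _)
val cms2 = Seq("a", "d").foldLeft(TinyCMS.empty(4, 64))(_ add _)
val merged = cms1 ++ cms2
```

Counter-wise addition is the property that lets a Sequence of CountMinSketch be summed into one sketch.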

DecayedValue estimates a variable’s moving average, de-weighting values by age. Each value is tupled with a timestamp. ZipTimeFlow is a custom Flow stage: it takes a Numeric value and a UDF that creates a time value, and returns a tuple of (value, time).
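De-weighting by age can be pictured as exponential decay with a half-life: when a new sample arrives, the stored value is first scaled by 0.5 raised to (elapsed time / half-life), then the sample is added. A minimal sketch, assuming a hypothetical `Decayed` case class that keeps only the decayed sum (it does not reproduce how Algebird's `DecayedValue` computes its averages) and a hypothetical `zipTime` helper that plays the role of ZipTimeFlow's time UDF:

```scala
// Hypothetical exponentially decayed sum: `value` is valid as of `time`.
final case class Decayed(value: Double, time: Double) {
  // Decay this value forward to time `t` using the given half-life.
  def at(t: Double, halfLife: Double): Double =
    value * math.pow(0.5, (t - time) / halfLife)

  // Fold in a new sample observed at time `t` (assumes t >= time).
  def add(sample: Double, t: Double, halfLife: Double): Decayed =
    Decayed(at(t, halfLife) + sample, t)
}

// Like ZipTimeFlow: tuple each value with a time produced by a UDF.
def zipTime[A](values: Seq[A])(timeOf: Int => Double): Seq[(A, Double)] =
  values.zipWithIndex.map { case (v, i) => (v, timeOf(i)) }

val halfLife = 10.0
val timed = zipTime(Seq(8.0, 0.0, 0.0))(i => i * 10.0) // times 0, 10, 20
val decayed = timed.foldLeft(Decayed(0.0, 0.0)) { case (acc, (v, t)) =>
  acc.add(v, t, halfLife)
}
// One half-life per step: 8.0 -> 4.0 -> 2.0
```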

HyperLogLog estimates a variable’s number of distinct values. CreateHLLFlow is a custom Flow stage: it needs an implicit HyperLogLogAggregator.
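HyperLogLog hashes each value, uses the top bits of the hash to pick a register, and keeps in that register the largest rank (position of the first 1 bit) seen in the remaining bits. A harmonic mean over the registers gives the distinct-count estimate. A minimal pure-Scala sketch with 2^12 registers, using a hypothetical `TinyHLL` class rather than Algebird's `HLL` (treating the 12 passed to `HyperLogLogAggregator` below as a bit count is an assumption):

```scala
import scala.util.hashing.MurmurHash3

// Minimal HyperLogLog with 2^bits registers, each holding a max rank.
final case class TinyHLL(bits: Int, registers: Vector[Int]) {
  private val m = 1 << bits

  // Top `bits` bits of the hash pick a register; the rest give the rank.
  def add(item: String): TinyHLL = {
    val h = MurmurHash3.stringHash(item)
    val idx = h >>> (32 - bits)
    val rest = h << bits
    val rank = math.min(Integer.numberOfLeadingZeros(rest) + 1, 32 - bits + 1)
    if (rank > registers(idx)) copy(registers = registers.updated(idx, rank)) else this
  }

  // Merging is a register-wise max: associative, commutative, idempotent.
  def +(that: TinyHLL): TinyHLL =
    copy(registers = registers.zip(that.registers).map { case (x, y) => math.max(x, y) })

  // Raw HLL estimate with the standard small-range (linear counting) correction.
  def estimatedSize: Double = {
    val alpha = 0.7213 / (1.0 + 1.079 / m)
    val raw = alpha * m * m / registers.map(r => math.pow(2.0, -r)).sum
    val zeros = registers.count(_ == 0)
    if (raw <= 2.5 * m && zeros > 0) m * math.log(m.toDouble / zeros) else raw
  }
}

object TinyHLL {
  def empty(bits: Int): TinyHLL = TinyHLL(bits, Vector.fill(1 << bits)(0))
}

val hllA = (1 to 5000).map(i => s"user-$i").foldLeft(TinyHLL.empty(12))(_ add _)
val hllB = (2501 to 7500).map(i => s"user-$i").foldLeft(TinyHLL.empty(12))(_ add _)
val union = hllA + hllB // roughly 7500 distinct users
```

Because merging is a register-wise max, a Sequence of HLLs can be summed in any order.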

Min and Max track a variable’s minimum and maximum values.

QTree estimates quartiles for a variable.

BloomFilter quickly determines that a word is definitely not in a dictionary or set of words, or that it probably is. False positives are possible; false negatives are not.
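The asymmetric answer follows from the structure: adding a word sets k bit positions, and a lookup reports "probably present" only if all k bits are set, so an absent word passes only by chance collisions. A minimal pure-Scala sketch, assuming a hypothetical `TinyBloom` class that uses seeded MurmurHash3 as its k hash functions (not Algebird's `BloomFilter` API):

```scala
import scala.collection.immutable.BitSet
import scala.util.hashing.MurmurHash3

// Minimal Bloom filter: `numBits` bits, `numHashes` seeded hash functions.
final case class TinyBloom(numBits: Int, numHashes: Int, bits: BitSet = BitSet.empty) {
  // The k bit positions for a word, one per hash seed.
  private def positions(word: String): Seq[Int] =
    (0 until numHashes).map(seed => Math.floorMod(MurmurHash3.stringHash(word, seed), numBits))

  // Adding a word sets all of its k bits.
  def +(word: String): TinyBloom = copy(bits = bits ++ positions(word))

  // false => definitely absent; true => probably present (may be a false positive).
  def mightContain(word: String): Boolean = positions(word).forall(bits.contains)
}

val dict = Seq("path", "stream", "sketch").foldLeft(TinyBloom(1024, 5))(_ + _)
```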

Algebird approximators can stream in parallel. This contrived example uses Agents as thread-safe shared value holders. Note that Agents are deprecated as of Akka 2.5.

// Zip the Futures returned by the five agent updates into one tuple.
// ZipWith emits once every input has a Future; it does not wait for the Futures to complete.
def zipper: ZipWith5[
    Future[AveragedValue], Future[CMS[A]], Future[Seq[DecayedValue]], Future[HLL], Future[QTree[A]],
    (Future[AveragedValue], Future[CMS[A]], Future[Seq[DecayedValue]], Future[HLL], Future[QTree[A]])] =
  ZipWith((in0: Future[AveragedValue],
           in1: Future[CMS[A]],
           in2: Future[Seq[DecayedValue]],
           in3: Future[HLL],
           in4: Future[QTree[A]]) => (in0, in1, in2, in3, in4))

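// Graph that broadcasts each Seq[A] to the agent-updating composite flows, then zips their outputs
val approximators = GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._ // provides the ~> edge operator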
  val bcast: UniformFanOutShape[Seq[A], Seq[A]] = builder.add(Broadcast[Seq[A]](5))
  val avg = builder.add(AveragedAgentFlow.compositeFlow(avgAgent))
  val cms = builder.add(CountMinSketchAgentFlow.compositeFlow(cmsAgent))
  val dvt = builder.add(DecayedValueAgentFlow.compositeFlow(dcaAgent, time))
  val hll = builder.add(HyperLogLogAgentFlow.compositeFlow(hllAgent))
  val qtaf = new QTreeAgentFlow(qtAgent)
  val qtrAg = builder.add(qtaf)
  val zip = builder.add(zipper)

  bcast ~> avg ~> zip.in0
  bcast ~> cms ~> zip.in1
  bcast ~> dvt ~> zip.in2
  bcast ~> hll ~> zip.in3
  bcast ~> qtrAg ~> zip.in4
  FlowShape(bcast.in, zip.out)
}.named("parallelApproximators")
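Since Agents are deprecated, the same fan-out/fan-in shape can also be sketched with plain `scala.concurrent` Futures: run independent aggregations over one batch concurrently, then combine their results. This minimal sketch uses only the standard library (no Akka Streams, no Algebird) and made-up sample data:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// One batch, "broadcast" to three independent aggregations running concurrently.
val batch: Seq[Double] = Seq(4.0, 8.0, 15.0, 16.0, 23.0, 42.0)

val meanF: Future[Double] = Future(batch.sum / batch.size)
val minF: Future[Double]  = Future(batch.min)
val maxF: Future[Double]  = Future(batch.max)

// "Zip" the results; this for-comprehension completes only after all three Futures do.
val summaryF: Future[(Double, Double, Double)] =
  for { m <- meanF; l <- minF; h <- maxF } yield (m, l, h)

val (mean, low, high) = Await.result(summaryF, 5.seconds)
```

Unlike zipping Future values as stream elements, the for-comprehension waits for each Future's result, so the tuple holds finished values.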

Stand-alone Functions

Approximating Functions

val bigDecimals: Seq[BigDecimal]
val avg0 = avg(bigDecimals)
AveragedValue of a Sequence of values
val bigDecimals2: Seq[BigDecimal]
val avg1 = avg(bigDecimals2)
val avgs = Vector[AveragedValue](avg0, avg1)
val avgSum = sumAverageValues(avgs)
AveragedValue of a sequence of AveragedValues
val fpProb: Double = 0.01
val words = readWords(wordsPath)
val wordsBF = createBF(words, fpProb)
Create a BloomFilter
val word = "path"
val inDict = wordsBF.contains(word).isTrue
Check whether a word is probably in the BloomFilter
val fpProb: Double = 0.01
val wordsFalseWords: IndexedSeq[String]
val falsePositives = for {
  i <- wordsFalseWords
  if wordsBF.contains(i).isTrue
} yield i
val acceptable = falsePositives.size < words.size * fpProb
Check whether the BloomFilter’s false positive rate is acceptable
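A false positive target such as `fpProb` relates the number of inserted words n, the filter's bit count m and its number of hash functions k through the standard Bloom filter approximation p ≈ (1 - e^(-kn/m))^k, with optimal m = -n ln p / (ln 2)^2 and k = (m/n) ln 2. A small Scala sketch of those textbook formulas; the function names are hypothetical, and whether `createBF` sizes its filter this way is an assumption:

```scala
// Textbook Bloom filter sizing (not taken from this project's createBF).
def optimalBits(n: Long, p: Double): Long =
  math.ceil(-n * math.log(p) / (math.log(2) * math.log(2))).toLong

def optimalHashes(n: Long, m: Long): Int =
  math.max(1, math.round(m.toDouble / n * math.log(2)).toInt)

// Expected false positive rate after inserting n items.
def expectedFpRate(n: Long, m: Long, k: Int): Double =
  math.pow(1 - math.exp(-k.toDouble * n / m), k)

val n = 100000L
val m = optimalBits(n, 0.01)    // about 958,506 bits
val k = optimalHashes(n, m)     // 7 hash functions
val p = expectedFpRate(n, m, k) // close to the 0.01 target
```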
val addrs = inetAddresses(ipRange)
val longZips = inetToLongZip(addrs)
val longs = testLongs(longZips)
implicit val m = createCMSMonoid[Long]()
val cms = createCountMinSketch(longs)
val estimatedCount = cms.totalCount
CountMinSketch: estimate the total number of elements seen so far
val estFreq = cms.frequency(longZips(5))
CountMinSketch: estimate how many elements have the same value as the selected one
val cms1 = createCountMinSketch(longs)
val cmss = Vector(cms, cms1)
val cmsSum = sumCountMinSketch(cmss)
val estimatedCount = cmsSum.totalCount
Sum a Sequence of CountMinSketch, then estimate the combined total number of elements
val estFreq = cmsSum.frequency(longZips(5))
From the summed Sequence of CountMinSketch, estimate the count of elements equal to the indexed value
val sines = genSineWave(100, 0 to 360)
val days = Range.Double(0.0, 361.0, 1.0)
val sinesZip = sines.zip(days)
val decayedValues = toDecayedValues(sinesZip, 10.0, None)
val avgAt90 = decayedValues(90).average(10.0)
DecayedValue: moving average from the initial value up to the specified index
val avg80to90 = decayedValues(90).averageFrom(10.0, 80.0, 90.0)
DecayedValue: moving average between two specified indexes
implicit val ag = HyperLogLogAggregator(12)
val ints: Seq[Int]
val hll = createHLL(ints)
HyperLogLog: create an HLL from a sequence of Int
val hlls = Vector(hll, hll2)
val sum = hlls.reduce(_ + _)
val size = sum.estimatedSize
Sum a Sequence of HLL and estimate the total number of distinct values
val approxs = mapHLL2Approximate(hlls)
Map a sequence of HLL to a sequence of Approximate size estimates
val sum = approxs.reduce(_ + _)
Sum a Sequence of Approximate to estimate the total size
val level = 5
implicit val qtBDSemigroup = new QTreeSemigroup[BigDecimal](level)
val qtBD = buildQTree[BigDecimal](bigDecimals)
Build QTree from a Sequence
val iqm = qtBD.interQuartileMean
Get its InterQuartileMean
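QTree's interquartile mean is approximate, so for small test data an exact value makes a handy sanity check. A minimal sketch with a hypothetical `exactInterQuartileMean` helper; it drops n / 4 whole elements from each end of the sorted data, which simplifies the textbook definition when n is not a multiple of 4:

```scala
// Exact interquartile mean: drop the lowest and highest quarter of the
// sorted values and average the middle.
// Simplification: when n is not a multiple of 4, whole elements (n / 4 per end)
// are dropped instead of weighting the boundary elements.
def exactInterQuartileMean(xs: Seq[BigDecimal]): BigDecimal = {
  require(xs.nonEmpty, "need at least one value")
  val sorted = xs.sorted
  val cut = sorted.size / 4
  val middle = sorted.slice(cut, sorted.size - cut)
  middle.sum / middle.size
}

// 1..8: drop 1, 2 and 7, 8; mean of 3, 4, 5, 6 is 4.5
val iqmExact = exactInterQuartileMean((1 to 8).map(BigDecimal(_)))
```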
val qTrees = Vector(qtBD, qtBD2)
val sumQTree = sumQTrees(qTrees)
Sum a Sequence of QTrees to a QTree
def wrapMax(x: Int): Max[Int] = Max(x)
val wm = SeqFunctor.map[Int, Max[Int]](List(1,2,3,4))(wrapMax)
val bigDecimals: Seq[BigDecimal]
val negBigDecimals = SeqFunctor.map[BigDecimal, BigDecimal](bigDecimals)(negate)
val invertedBigDecimals = SeqFunctor.map[BigDecimal, BigDecimal](bigDecimals)(inverse)
SeqFunctor: map the elements of a sequence to the elements of another sequence
val bigDecimals: Seq[BigDecimal]
val invertedNegBigDecimals = andThen[BigDecimal, BigDecimal, BigDecimal](bigDecimals)(inverse)(negate)
andThen: map the elements of a sequence with f, then map the resulting sequence with g (f andThen g)
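The helpers behind the SeqFunctor and andThen calls above are not shown. A minimal sketch of what they could look like, assuming a hypothetical `Functor` type class, `SeqFunctor` object, `andThen` helper and `negate`/`inverse` functions that only mirror the call shapes above:

```scala
// Hypothetical Functor type class; not this project's or Algebird's code.
trait Functor[F[_]] {
  def map[A, B](fa: F[A])(f: A => B): F[B]
}

object SeqFunctor extends Functor[Seq] {
  def map[A, B](fa: Seq[A])(f: A => B): Seq[B] = fa.map(f)
}

// Map with f, then map the result with g.
def andThen[A, B, C](fa: Seq[A])(f: A => B)(g: B => C): Seq[C] =
  SeqFunctor.map(SeqFunctor.map(fa)(f))(g)

def negate(x: BigDecimal): BigDecimal = -x
def inverse(x: BigDecimal): BigDecimal = BigDecimal(1) / x

val bds = Seq(BigDecimal(2), BigDecimal(4))
// inverse first (0.5, 0.25), then negate (-0.5, -0.25)
val invertedNeg = andThen[BigDecimal, BigDecimal, BigDecimal](bds)(inverse)(negate)
```

Mapping twice gives the same result as a single `map` with `inverse _ andThen negate`; the two-step form just mirrors the helper's signature.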
val bigDecimals: Seq[BigDecimal]
val maxBD = max(bigDecimals)
Get the Max element of a sequence. Works for element types that have a Semigroup, Monoid and Ordering
val bigDecimals: Seq[BigDecimal]
val minBD = min(bigDecimals)
val optBigDecs: Seq[Option[BigDecimal]]
val min2 = min(optBigDecs.flatten)
val eithBigInts: Seq[Either[String, BigInt]]
val min3 = min(filterRight(eithBigInts))
Get the Min element of a sequence, also after flattening Options or keeping only Right values
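The max and min helpers above rely on Algebird type classes. A minimal alternative sketch needs only an `Ordering` and returns an `Option`, so empty sequences are handled explicitly; `maxOf`, `minOf` and `filterRight` here are hypothetical stand-ins, not the project's functions:

```scala
// Hypothetical helpers mirroring max, min and filterRight above.
def maxOf[A](xs: Seq[A])(implicit ord: Ordering[A]): Option[A] =
  xs.reduceOption[A]((x, y) => ord.max(x, y))

def minOf[A](xs: Seq[A])(implicit ord: Ordering[A]): Option[A] =
  xs.reduceOption[A]((x, y) => ord.min(x, y))

// Keep only the Right values, discarding Lefts (for example parse errors).
def filterRight[L, R](xs: Seq[Either[L, R]]): Seq[R] =
  xs.collect { case Right(r) => r }

val optBigDecs: Seq[Option[BigDecimal]] = Seq(Some(BigDecimal(3)), None, Some(BigDecimal(-1)))
val eithBigInts: Seq[Either[String, BigInt]] = Seq(Right(BigInt(7)), Left("bad"), Right(BigInt(2)))

val min2 = minOf(optBigDecs.flatten)       // Some(-1)
val min3 = minOf(filterRight(eithBigInts)) // Some(2)
val max0 = maxOf(Seq.empty[BigDecimal])    // None
```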

Example Configurations

Example Typesafe Config settings for Algebird, all of them optional, are in src/main/resources/reference.conf. If you use Typesafe Config, you can override them in your application’s src/main/resources/application.conf.