Algebird streaming approximations
Near-real-time approximations of stream data using prebuilt Algebird stages and stand-alone functions.
Stream Stages
Most of these are stand-alone functions wrapped in a built-in Akka Streams Flow stage.
AveragedValue estimates a variable’s mean in the stream.
CountMinSketch estimates the frequency of a variable’s values. CreateCMSFlow is a custom Flow stage: it needs an implicit CMSMonoid.
DecayedValue estimates a variable’s moving average, de-weighting values by age. Each value is tupled with a timestamp. ZipTimeFlow is a custom Flow stage: it takes a Numeric value and a UDF that creates a time value, and returns a tuple of (value, time).
HyperLogLog estimates a variable’s number of distinct values. CreateHLLFlow is a custom Flow stage: it needs an implicit HyperLogLogAggregator.
Min and Max estimate a variable’s minimum or maximum value.
QTree estimates quartiles for a variable.
BloomFilter quickly determines that a word is definitely not in a dictionary or set of words, or that it probably is. False positives are possible; false negatives are not.
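As a rough illustration of what ZipTimeFlow does, its pairing step can be written as a plain function. This is a minimal sketch, not the project's implementation: `ZipTimeSketch` and `zipTime` are hypothetical names, and the time UDF is assumed to be a `() => Double` called once per element, in order.

```scala
object ZipTimeSketch {
  // Hypothetical analogue of ZipTimeFlow: pairs each Numeric value with a time
  // produced by a user-defined function, yielding (value, time) tuples.
  def zipTime[A](values: Seq[A])(timeFn: () => Double)(implicit num: Numeric[A]): Seq[(Double, Double)] =
    values.map(v => (num.toDouble(v), timeFn()))
}
```

In a stream, the same body would sit inside a `map`; a wall-clock UDF such as `() => System.currentTimeMillis().toDouble` is one possible choice.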
Algebird approximators can stream in parallel. This contrived example uses Agents as thread-safe shared value holders. Note that Agents are deprecated as of Akka 2.5.
// Zip the Futures returned by the five agent updates. ZipWith emits a tuple once every
// input has an element; it does not wait for the Futures themselves to complete.
def zipper: ZipWith5[Future[AveragedValue], Future[CMS[A]], Future[Seq[DecayedValue]], Future[HLL], Future[QTree[A]],
    (Future[AveragedValue], Future[CMS[A]], Future[Seq[DecayedValue]], Future[HLL], Future[QTree[A]])] =
  ZipWith((in0: Future[AveragedValue],
           in1: Future[CMS[A]],
           in2: Future[Seq[DecayedValue]],
           in3: Future[HLL],
           in4: Future[QTree[A]]) => (in0, in1, in2, in3, in4))

// Graph that broadcasts each Seq[A] to five agent-updating composite flows and zips their results
val approximators = GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._ // provides the ~> operator

  val bcast: UniformFanOutShape[Seq[A], Seq[A]] = builder.add(Broadcast[Seq[A]](5))
  val avg = builder.add(AveragedAgentFlow.compositeFlow(avgAgent))
  val cms = builder.add(CountMinSketchAgentFlow.compositeFlow(cmsAgent))
  val dvt = builder.add(DecayedValueAgentFlow.compositeFlow(dcaAgent, time))
  val hll = builder.add(HyperLogLogAgentFlow.compositeFlow(hllAgent))
  val qtaf = new QTreeAgentFlow(qtAgent)
  val qtrAg = builder.add(qtaf)
  val zip = builder.add(zipper)

  bcast ~> avg ~> zip.in0
  bcast ~> cms ~> zip.in1
  bcast ~> dvt ~> zip.in2
  bcast ~> hll ~> zip.in3
  bcast ~> qtrAg ~> zip.in4
  FlowShape(bcast.in, zip.out)
}.named("parallelApproximators")
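The same fan-out/fan-in shape can be sketched without Akka or Agents, using only `scala.concurrent` Futures. In this hypothetical sketch, five independent summaries start on one batch (the role Broadcast plays above) and are then joined (the role of ZipWith plus awaiting each Future). The summaries are exact stand-ins for the Algebird approximators, and `ParallelSummarySketch` and `Summary` are illustrative names, not part of the project.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParallelSummarySketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical result type standing in for the five Algebird summaries.
  final case class Summary(count: Long, mean: Double, min: Double, max: Double, distinct: Int)

  // Fan out: each Future starts immediately on the same batch (like Broadcast).
  // Fan in: the for-comprehension completes only when all five have completed.
  def summarize(batch: Seq[Double]): Future[Summary] = {
    require(batch.nonEmpty, "batch must not be empty")
    val countF = Future(batch.size.toLong)
    val meanF = Future(batch.sum / batch.size)
    val minF = Future(batch.min)
    val maxF = Future(batch.max)
    val distinctF = Future(batch.distinct.size)
    for {
      c <- countF
      m <- meanF
      lo <- minF
      hi <- maxF
      d <- distinctF
    } yield Summary(c, m, lo, hi, d)
  }

  // Blocking helper for scripts and tests.
  def summarizeBlocking(batch: Seq[Double]): Summary =
    Await.result(summarize(batch), 5.seconds)
}
```

Because each Future is created before the for-comprehension, all five run concurrently; creating them inside the for-comprehension would run them one after another.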
Stand-alone Functions
val bigDecimals: Seq[BigDecimal]
val avg0 = avg(bigDecimals)
AveragedValue of a sequence of values
val bigDecimals2: Seq[BigDecimal]
val avg1 = avg(bigDecimals2)
val avgs = Vector[AveragedValue](avg0, avg1)
val avgSum = sumAverageValues(avgs)
AveragedValue of a sequence of AveragedValues
val fpProb: Double = 0.01 // target false positive probability
val words = readWords(wordsPath)
val wordsBF = createBF(words, fpProb)
Create a BloomFilter
val word = "path"
val inDict = wordsBF.contains(word).isTrue
Check whether a word is in the BloomFilter
val fpProb: Double = 0.01
val wordsFalseWords: IndexedSeq[String] // words known not to be in the BloomFilter
val falsePositives = for {
  i <- wordsFalseWords
  if wordsBF.contains(i).isTrue
} yield i
// False positive rate = false positives / number of absent words queried
val acceptable = falsePositives.size < wordsFalseWords.size * fpProb
Check whether the BloomFilter’s false positive rate is acceptable
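To show why a Bloom filter answers either "definitely not" or "probably yes", here is a minimal sketch with a hypothetical `SimpleBloom` class. It is not Algebird's BloomFilter: it sets k bit positions per word, derived by double hashing two seeded MurmurHash3 values from the Scala standard library, and sizes the bit array from the expected item count and the target false positive probability.

```scala
import java.util.BitSet
import scala.util.hashing.MurmurHash3

// Hypothetical minimal Bloom filter; not Algebird's BloomFilter API.
final class SimpleBloom(val numBits: Int, val numHashes: Int) {
  private val bits = new BitSet(numBits)

  // Double hashing: derive numHashes bit positions from two seeded Murmur3 hashes.
  private def positions(s: String): Seq[Int] = {
    val h1 = MurmurHash3.stringHash(s, 17)
    val h2 = MurmurHash3.stringHash(s, 31)
    (0 until numHashes).map(i => Math.floorMod(h1 + i * h2, numBits))
  }

  def add(s: String): Unit = positions(s).foreach(p => bits.set(p))

  // false: definitely absent. true: probably present (may be a false positive).
  def mightContain(s: String): Boolean = positions(s).forall(p => bits.get(p))
}

object SimpleBloom {
  // Standard sizing for n expected items and false positive probability p:
  // m = -n ln p / (ln 2)^2 bits and k = (m / n) ln 2 hash functions.
  def apply(expected: Int, fpProb: Double): SimpleBloom = {
    val ln2 = math.log(2.0)
    val m = math.ceil(-expected * math.log(fpProb) / (ln2 * ln2)).toInt
    val k = math.max(1, math.round(m.toDouble / expected * ln2).toInt)
    new SimpleBloom(m, k)
  }
}
```

Added words always test positive, since their bits stay set. The false positive rate is measured as false positives divided by the number of absent words queried.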
val addrs = inetAddresses(ipRange)
val longZips = inetToLongZip(addrs)
val longs = testLongs(longZips)
implicit val m = createCMSMonoid[Long]()
val cms = createCountMinSketch(longs)
val estimatedCount = cms.totalCount
CountMinSketch: total number of elements added so far
val estFreq = cms.frequency(longZips(5))
CountMinSketch: estimate how many times the selected element’s value has been seen
val cms1 = createCountMinSketch(longs)
val cmss = Vector(cms, cms1)
val cmsSum = sumCountMinSketch(cmss)
val estimatedCount = cmsSum.totalCount
Sum a sequence of CountMinSketches, then get the combined total number of elements
val estFreq = cmsSum.frequency(longZips(5))
From the summed CountMinSketch, estimate how many times the selected element’s value has been seen
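The frequency and summing calls above rely on count-min sketch properties that a small sketch makes visible. `SimpleCMS` below is hypothetical and is not Algebird's CMS API: it keeps `depth` rows of `width` counters, increments one counter per row for each key, reports the minimum across rows, and merges two sketches of the same dimensions by adding cells.

```scala
import scala.util.hashing.MurmurHash3

// Hypothetical minimal count-min sketch over Long keys; not Algebird's CMS API.
final case class SimpleCMS(depth: Int, width: Int, table: Vector[Vector[Long]], totalCount: Long) {
  // One seeded hash per row picks that row's counter for a key.
  private def bucket(row: Int, key: Long): Int =
    Math.floorMod(MurmurHash3.stringHash(key.toString, row), width)

  def add(key: Long): SimpleCMS = {
    val updated = table.zipWithIndex.map { case (counters, row) =>
      val b = bucket(row, key)
      counters.updated(b, counters(b) + 1L)
    }
    copy(table = updated, totalCount = totalCount + 1L)
  }

  // Minimum over rows: collisions only add counts, so this never underestimates.
  def frequency(key: Long): Long =
    (0 until depth).map(row => table(row)(bucket(row, key))).min

  // Sketches with equal dimensions (and the same hashes) merge by cell-wise addition.
  def ++(that: SimpleCMS): SimpleCMS = {
    require(depth == that.depth && width == that.width, "dimensions must match")
    val merged = table.zip(that.table).map { case (a, b) => a.zip(b).map { case (x, y) => x + y } }
    copy(table = merged, totalCount = totalCount + that.totalCount)
  }
}

object SimpleCMS {
  def empty(depth: Int, width: Int): SimpleCMS =
    SimpleCMS(depth, width, Vector.fill(depth, width)(0L), 0L)
}
```

Because collisions can only add to a counter, `frequency` may overestimate but never underestimates; wider rows shrink the overestimate, and more rows lower the chance of a large one.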
val sines = genSineWave(100, 0 to 360)
val days = Range.Double(0.0, 361.0, 1.0)
val sinesZip = sines.zip(days)
val decayedValues = toDecayedValues(sinesZip, 10.0, None)
val avgAt90 = decayedValues(90).average(10.0)
DecayedValue moving average from the initial value to the specified index
val avg80to90 = decayedValues(90).averageFrom(10.0, 80.0, 90.0)
DecayedValue moving average between two specified times (here, days 80 and 90)
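The idea behind a decayed moving average can be sketched with a half-life weight: a value observed d time units ago counts 0.5^(d / halfLife) as much as a fresh one. `Decayed` below is a hypothetical, minimal sketch of that weighting, not the formula Algebird's DecayedValue uses internally, and it assumes timestamps arrive in non-decreasing order.

```scala
// Hypothetical sketch of half-life decay; not Algebird's DecayedValue formula.
final case class Decayed(weightedSum: Double, weight: Double, time: Double) {
  // Age the summary to time t: every past contribution is halved per halfLife elapsed.
  def decayTo(t: Double, halfLife: Double): Decayed = {
    val factor = math.pow(0.5, (t - time) / halfLife)
    Decayed(weightedSum * factor, weight * factor, t)
  }

  // Add a value observed at time t (assumes t is not earlier than `time`).
  def add(value: Double, t: Double, halfLife: Double): Decayed = {
    val aged = decayTo(t, halfLife)
    Decayed(aged.weightedSum + value, aged.weight + 1.0, t)
  }

  // Decay-weighted mean: recent values count more than old ones.
  def average: Double = if (weight == 0.0) 0.0 else weightedSum / weight
}

object Decayed {
  val empty: Decayed = Decayed(0.0, 0.0, 0.0)
}
```

With `halfLife = 10.0`, as in the `average(10.0)` calls above, a value ten time units old carries half the weight of a fresh one, so a recent jump pulls the average toward the new level quickly.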
implicit val ag = HyperLogLogAggregator(12)
val ints: Seq[Int]
val hll = createHLL(ints)
HyperLogLog: create an HLL from a sequence of Int
val hlls = Vector(hll, hll2)
val sum = hlls.reduce(_ + _)
val size = sum.estimatedSize
Sum a sequence of HLLs and estimate the total number of distinct values
val approxs = mapHLL2Approximate(hlls)
Map a sequence of HLLs to a sequence of Approximate values
val sum = approxs.reduce(_ + _)
Sum a sequence of Approximate values and estimate the total size
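Summing HLLs works through register-wise merging. `SimpleHLL` below is a hypothetical, minimal HyperLogLog for illustration, not Algebird's HLL or HyperLogLogAggregator: it hashes each value to 32 bits with the standard library's MurmurHash3, uses the top `bits` bits to pick one of 2^bits registers, stores the position of the first 1-bit in the remaining bits, and applies the usual small-range (linear counting) correction. With `bits = 12`, the same bit count as the `HyperLogLogAggregator(12)` above, the typical relative error is about 1.04 / sqrt(4096), roughly 1.6%.

```scala
import scala.util.hashing.MurmurHash3

// Hypothetical minimal HyperLogLog over a 32-bit hash; not Algebird's HLL API.
final case class SimpleHLL(bits: Int, registers: Vector[Int]) {
  private val m: Int = 1 << bits

  def add(x: Any): SimpleHLL = {
    val h = MurmurHash3.stringHash(x.toString, 42)
    val idx = h >>> (32 - bits) // top `bits` bits select a register
    val rest = h << bits        // remaining bits, shifted to the top
    // Rank = position of the first 1-bit within the remaining 32 - bits bits.
    val rank = math.min(Integer.numberOfLeadingZeros(rest), 32 - bits) + 1
    if (rank > registers(idx)) copy(registers = registers.updated(idx, rank)) else this
  }

  // Union of two sketches is the register-wise max.
  def +(that: SimpleHLL): SimpleHLL = {
    require(bits == that.bits, "precision must match")
    copy(registers = registers.zip(that.registers).map { case (a, b) => math.max(a, b) })
  }

  def estimatedSize: Double = {
    val alpha = 0.7213 / (1.0 + 1.079 / m)
    val raw = alpha * m * m / registers.map(r => math.pow(2.0, -r.toDouble)).sum
    val zeros = registers.count(_ == 0)
    // Small-range correction: use linear counting while many registers are empty.
    if (raw <= 2.5 * m && zeros > 0) m * math.log(m.toDouble / zeros) else raw
  }
}

object SimpleHLL {
  def empty(bits: Int): SimpleHLL = SimpleHLL(bits, Vector.fill(1 << bits)(0))
}
```

Merging sketches with `+` counts a value seen by both inputs once. Adding per-sketch size estimates, as the Approximate sum above does, counts shared values twice, so it fits disjoint inputs best.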
val level = 5
implicit val qtBDSemigroup = new QTreeSemigroup[BigDecimal](level)
val qtBD = buildQTree[BigDecimal](bigDecimals)
Build a QTree from a sequence
val iqm = qtBD.interQuartileMean
Get its interquartile mean (returned as lower and upper bounds)
val qTrees = Vector(qtBD, qtBD2)
val sumQTree = sumQTrees(qTrees)
Sum a sequence of QTrees into a single QTree
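QTree's interquartile mean is approximate, so it can help to compare it against an exact value on a small in-memory sample. The hypothetical `ExactIqm.interQuartileMean` below sorts the values, drops the lowest and highest quarter, and averages the middle half. It is a simplified reference, exact only when the sample size is a multiple of 4, and it is not part of the project or of Algebird.

```scala
object ExactIqm {
  // Exact interquartile mean: drop the lowest and highest quarter, average the middle half.
  // Simplification: trims floor(n / 4) values from each end, exact only when n % 4 == 0.
  def interQuartileMean(xs: Seq[BigDecimal]): Option[BigDecimal] = {
    val sorted = xs.sorted
    val q = sorted.size / 4
    val middle = sorted.slice(q, sorted.size - q)
    if (middle.isEmpty) None else Some(middle.sum / BigDecimal(middle.size))
  }
}
```

For example, the values 1 through 8 keep 3, 4, 5 and 6, giving 4.5; a QTree built from the same values should report bounds that bracket that number.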
def wrapMax(x: Int): Max[Int] = Max(x) // no type parameter: [Int] would shadow scala.Int
val wm = SeqFunctor.map[Int, Max[Int]](List(1, 2, 3, 4))(wrapMax)
val bigDecimals: Seq[BigDecimal]
val negBigDecimals = SeqFunctor.map[BigDecimal, BigDecimal](bigDecimals)(negate)
val invertedBigDecimals = SeqFunctor.map[BigDecimal, BigDecimal](bigDecimals)(inverse)
SeqFunctor: map the elements of a sequence to the elements of another sequence.
val bigDecimals: Seq[BigDecimal]
val invertedNegBigDecimals = andThen[BigDecimal, BigDecimal, BigDecimal](bigDecimals)(inverse)(negate)
andThen: map the elements of a sequence with f, then map the resulting sequence with g, i.e. f andThen g.
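SeqFunctor and andThen follow the Functor pattern: `map` transforms each element while keeping the sequence's shape, and two maps in a row equal one map of the composed functions. The `Functor` trait and `FunctorSketch` object below are a hypothetical, minimal rendering of that pattern; the project's actual SeqFunctor signatures may differ.

```scala
// Hypothetical Functor type class; the project's SeqFunctor is assumed to play this role.
trait Functor[F[_]] {
  def map[A, B](fa: F[A])(f: A => B): F[B]
}

object FunctorSketch {
  // Functor instance for Seq: map transforms each element, keeping the sequence's shape.
  val seqFunctor: Functor[Seq] = new Functor[Seq] {
    def map[A, B](fa: Seq[A])(f: A => B): Seq[B] = fa.map(f)
  }

  // Map with f, then map the result with g; equivalent to one map with f andThen g.
  def andThen[A, B, C](fa: Seq[A])(f: A => B)(g: B => C): Seq[C] =
    seqFunctor.map(seqFunctor.map(fa)(f))(g)
}
```

The functor law behind the second method means `andThen(xs)(f)(g)` can always be replaced by the single pass `xs.map(f andThen g)`.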
val bigDecimals: Seq[BigDecimal]
val maxBD = max(bigDecimals) // `val max = max(...)` would be an illegal recursive definition
Get the max element of a sequence, for element types that have a Semigroup, Monoid and Ordering
val bigDecimals: Seq[BigDecimal]
val min1 = min(bigDecimals)
val optBigDecs: Seq[Option[BigDecimal]]
val min2 = min(optBigDecs.flatten)
val eithBigInts: Seq[Either[String, BigInt]]
val min3 = min(filterRight(eithBigInts))
Get the min element of a sequence.
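A minimal sketch of the same min/max idea, using only an Ordering, is below. `MinMaxSketch` with its `maxOf`, `minOf` and `filterRight` are hypothetical stand-ins, not the project's functions; they require only an Ordering and return `None` for an empty sequence.

```scala
object MinMaxSketch {
  // Hypothetical helpers, not the project's max/min/filterRight.
  def maxOf[A](xs: Seq[A])(implicit ord: Ordering[A]): Option[A] =
    xs.reduceOption[A]((a, b) => if (ord.gteq(a, b)) a else b)

  def minOf[A](xs: Seq[A])(implicit ord: Ordering[A]): Option[A] =
    xs.reduceOption[A]((a, b) => if (ord.lteq(a, b)) a else b)

  // Keep only the Right values, dropping Lefts (for example, parse errors).
  def filterRight[L, R](xs: Seq[Either[L, R]]): Seq[R] =
    xs.collect { case Right(r) => r }
}
```

Flattening Options and filtering Rights before taking the minimum, as in the snippet above, keeps the comparison itself free of missing or failed values.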
Example Configurations
An example Typesafe Config file with optional settings for Algebird is in src/main/resources/reference.conf. If you use Typesafe Config, you can override these settings in your application’s src/main/resources/application.conf.