Quick update on our effort to containerize Bullet:
We’ve published our current effort in this repo: https://github.com/SketchBench/bullet-docker
It automatically publishes the Docker images to our own namespace on
We’ve also added an example docker-compose.yaml to bring up the Bullet Quick Start app on Spark.
The builds are currently set up to run on every merge into the main branch as well as weekly on Sundays to stay up to date with upstream.
They are completely parameterized with Docker build arguments to easily adopt newer releases of Bullet’s components.
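A minimal sketch of how such a parameterized build could be driven, assuming build-argument names that mirror the version variables quoted further down in this thread (the names the Dockerfiles actually use may differ, and the image tag is made up). It only assembles the `docker build --build-arg ...` command line and prints it:

```python
import shlex

# Hypothetical build-argument names, borrowed from the version variables
# quoted later in this thread; the actual Dockerfiles may use other names.
BULLET_VERSIONS = {
    "BULLET_SPARK_VERSION": "1.0.4",
    "BULLET_KAFKA_VERSION": "1.2.2",
    "BULLET_WS_VERSION": "1.1.0",
}


def docker_build_command(image_tag, versions, context="."):
    """Return a `docker build` argv with one --build-arg per component version."""
    argv = ["docker", "build", "--tag", image_tag]
    for name, value in sorted(versions.items()):
        argv += ["--build-arg", f"{name}={value}"]
    argv.append(context)
    return argv


if __name__ == "__main__":
    # Print the command instead of running it, so the sketch has no side effects.
    print(shlex.join(docker_build_command("bullet-spark:example", BULLET_VERSIONS)))
```

Adopting a newer Bullet release then only means changing a value in the dictionary, not editing the Dockerfile.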
We’re already working on a Helm Chart based on these images to easily spin up all components on Kubernetes (the foundation to enable easy reproducibility of our benchmark).
Cheers,
FRANK BLECHSCHMIDT
Graduate Student
The Grainger College of Engineering
Computer Science
fra...@illinois.edu
Under the Illinois Freedom of Information Act any written communication to or from university employees regarding university business is a public record and may be subject to public disclosure.
From: <bullet...@googlegroups.com> on behalf of Akshai Sarma <aksha...@gmail.com>
Date: Thursday, June 24, 2021 at 1:44 PM
To: bullet-users <bullet...@googlegroups.com>
Subject: Re: Spark Bullet Backend not reading messages from Kafka (PubSub)
I see, thanks for letting us know. If we can help with something, don't hesitate to ask!
On Thursday, June 24, 2021 at 1:40:18 PM UTC-7 fra...@illinois.edu wrote:
Wow, that’s quite a scale for Storm! We wanted to start with Spark Streaming since it seemed to be a framework that’s easier to set up.
Eventually, we’d like to benchmark Storm as well and compare the performance of both. Support for Spark Structured Streaming and Flink would be amazing!
We essentially plan to run a subset of queries from existing streaming benchmarks (all queries that are currently supported in BQL) to compare sketch with non-sketch performance. The expectation is that the sketch-optimized queries run orders of magnitude faster. It would be interesting to see the actual factor for established benchmarks from academia.
We do this incrementally: first only on Spark with one data source from a benchmark (data set or data generator) and very few queries. Eventually, we want to include more benchmarks, data sets, and Storm as well.
However, we’re very time-constrained to finish up everything by August 8 to submit our report (hence the incremental approach).
We containerized Bullet with the goal of running it on Kubernetes to enable easy reproducibility.
Best,
Frank
From: <bullet...@googlegroups.com> on behalf of Akshai Sarma <aksha...@gmail.com>
Date: Thursday, June 24, 2021 at 10:28 AM
To: bullet-users <bullet...@googlegroups.com>
Subject: Re: Spark Bullet Backend not reading messages from Kafka (PubSub)
That's great! Yea the backend has a few components and not sure how Spark doles out tasks if there are too few cores. I'd have imagined it'd work, albeit just slower. Guess not.
Do note that we built Bullet aiming for event-by-event processing in the backend. Our Spark backend implementation still uses DStreams (so micro-batches). We know for a fact that that doesn't scale very well to large amounts of data or queries. The Storm version, in contrast, we've scaled to ~6M simultaneous queries as well as ~2M records per sec. We want to look into Spark Structured Streaming if that lets us get off the micro-batching. We're also going to POC a Flink instance of Bullet soon, which already lets us do the low level event processing. With all that said, would you mind sharing what you're trying to do for your work? Are you comparing Bullet with other things?
Yea we'd totally be interested in a containerized version of Bullet! Before you contribute, please do make an issue laying out how you're structuring it and we can figure out the best place to put it.
On Thursday, June 24, 2021 at 12:57:00 AM UTC-7 fra...@illinois.edu wrote:
PS:
For now, we can confirm that the latest versions of all Bullet components start up with the example app from the tutorial.
We’ve also updated the tooling to the latest versions: Spark 3.1.2 with Hadoop 3.2, Kafka 2.8.0, nvm 0.38.0, and node.js 16.4.0.
The application is running fine natively on a MacBook (install-all-spark.sh) as well as containerized in a Docker setup with right-sized Spark cluster workers (standalone cluster).
We ultimately aim to bring everything up’n’running in Kubernetes.
Once we’ve confirmed a good setup and are able to reliably use it for our paper, we also hope to contribute back to the project, e.g. Dockerfiles, docker-compose.yml, Kubernetes helm charts, etc.
Best,
Frank
From: "Blechschmidt, Frank" <fra...@illinois.edu>
Date: Thursday, June 24, 2021 at 12:38 AM
To: "Blechschmidt, Frank" <fra...@illinois.edu>, Akshay Sarma <aksha...@gmail.com>
Cc: bullet-users <bullet...@googlegroups.com>, "Makam, Hareesh" <hma...@illinois.edu>, "Pendyala, Murali Krishna" <mk...@illinois.edu>
Subject: RE: Spark Bullet Backend not reading messages from Kafka (PubSub)
Got it working 🚀 (see attached screenshot)
After figuring out that I needed more workers, I at least was able to start the Spark backend.
I started playing with the number of Spark workers. The app also starts with “only” 4 workers.
However, I was still observing the error logs and crashes from my previous email.
As it turns out, this got triggered whenever I tried the example query in the Bullet UI. So… the query processing caused the Spark executors to crash.
The Spark UI gave some hints that it might be related to resources. Originally the defaults for the workers were set to:
SPARK_WORKER_MEMORY=1G
SPARK_WORKER_CORES=1
After increasing it to
SPARK_WORKER_MEMORY=12G
SPARK_WORKER_CORES=2
the query processing in Bullet started working.
Conclusion: Bullet needs more than 2 Spark workers, and each worker needs more than 1CPU/1GB.
Do you happen to know good settings for Bullet? How many Spark workers does it need, and what resources should each Spark worker have?
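A rough way to reason about the worker count: in Spark Streaming, every receiver occupies one core for as long as the job runs, so the application needs more cores than it has receivers before any batch can be processed. The sketch below is only a back-of-the-envelope check under assumptions that are not confirmed in this thread: two receivers (the query receiver and the example data producer), and one core taken by the driver because the job is submitted with --deploy-mode cluster. It ignores memory entirely, so it cannot explain the executor crashes seen with 1G workers.

```python
def free_cores_for_processing(workers, cores_per_worker, receivers=2, driver_cores=1):
    """Cores left for batch-processing tasks after receivers and the driver are placed.

    Assumptions (not confirmed by the thread): two receivers (the query
    receiver and the example data producer) and a driver that occupies
    `driver_cores` on a worker because of --deploy-mode cluster.
    """
    total = workers * cores_per_worker
    return total - receivers - driver_cores


def can_make_progress(workers, cores_per_worker, **kwargs):
    """A streaming job needs at least one core beyond its receivers to process data."""
    return free_cores_for_processing(workers, cores_per_worker, **kwargs) >= 1


if __name__ == "__main__":
    # (workers, cores per worker) combinations mentioned in this thread.
    for workers, cores in [(2, 1), (4, 1), (10, 1), (4, 2)]:
        print(workers, cores, free_cores_for_processing(workers, cores))
```

Under these assumptions, 2 workers with 1 core each leave no core for processing, which would be consistent with the starving tasks, while 4 single-core workers leave exactly one.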
Cheers,
Frank
From: bullet...@googlegroups.com <bullet...@googlegroups.com> On Behalf Of Blechschmidt, Frank
Sent: Wednesday, June 23, 2021 11:44 PM
To: Akshay Sarma <aksha...@gmail.com>
Cc: bullet-users <bullet...@googlegroups.com>; Makam, Hareesh <hma...@illinois.edu>; Pendyala, Murali Krishna <mk...@illinois.edu>
Subject: RE: Spark Bullet Backend not reading messages from Kafka (PubSub)
[Thank you in advance for your help!]
The non-Docker version starts up successfully on the older artifact versions of bullet.
I also figured out the problem with the Docker-powered setup: the Spark cluster (standalone) in Docker (using docker-compose) had only 2 workers.
So, the tasks would starve since they could not find executors.
I’m now running a setup with 10 workers on the latest release versions of bullet and I can observe more activity in the Spark cluster.
Still, I’m not able to get a response in the UI/web service.
It looks like some issue with the processing of RDDs (I’ve also attached screenshots from the Spark UI):
org.apache.spark.SparkException: Job aborted due to stage failure: Task creation failed: org.apache.spark.SparkException: Attempted to use BlockRDD[1460] at receiverStream at RandomProducer.scala:19 after its blocks have been removed!
org.apache.spark.SparkException: Attempted to use BlockRDD[1460] at receiverStream at RandomProducer.scala:19 after its blocks have been removed!
at org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83)
at org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:55)
at org.apache.spark.rdd.RDD.$anonfun$preferredLocations$2(RDD.scala:324)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:324)
at org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:50)
at org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:110)
at org.apache.spark.rdd.RDD.$anonfun$preferredLocations$2(RDD.scala:324)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:324)
at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2333)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$3(DAGScheduler.scala:2344)
at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2(DAGScheduler.scala:2343)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2$adapted(DAGScheduler.scala:2341)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2341)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$3(DAGScheduler.scala:2344)
at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2(DAGScheduler.scala:2343)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2$adapted(DAGScheduler.scala:2341)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2341)
at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:2307)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitMissingTasks$2(DAGScheduler.scala:1329)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitMissingTasks$2$adapted(DAGScheduler.scala:1329)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1329)
at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1226)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitStage$5(DAGScheduler.scala:1229)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitStage$5$adapted(DAGScheduler.scala:1228)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1228)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1168)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2395)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1341)
at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1226)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitStage$5(DAGScheduler.scala:1229)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitStage$5$adapted(DAGScheduler.scala:1228)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1228)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1168)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2395)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
at org.apache.spark.rdd.RDD.$anonfun$foreach$1(RDD.scala:1012)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:1010)
at com.yahoo.bullet.spark.ResultEmitter$.$anonfun$emit$1(ResultEmitter.scala:20)
at com.yahoo.bullet.spark.ResultEmitter$.$anonfun$emit$1$adapted(ResultEmitter.scala:20)
at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2(DStream.scala:629)
at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2$adapted(DStream.scala:629)
at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:256)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Attempted to use BlockRDD[1460] at receiverStream at RandomProducer.scala:19 after its blocks have been removed!
at org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83)
at org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:55)
at org.apache.spark.rdd.RDD.$anonfun$preferredLocations$2(RDD.scala:324)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:324)
at org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:50)
at org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:110)
at org.apache.spark.rdd.RDD.$anonfun$preferredLocations$2(RDD.scala:324)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:324)
at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2333)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$3(DAGScheduler.scala:2344)
at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2(DAGScheduler.scala:2343)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2$adapted(DAGScheduler.scala:2341)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2341)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$3(DAGScheduler.scala:2344)
at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2(DAGScheduler.scala:2343)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2$adapted(DAGScheduler.scala:2341)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2341)
at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:2307)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitMissingTasks$2(DAGScheduler.scala:1329)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitMissingTasks$2$adapted(DAGScheduler.scala:1329)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1329)
at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1226)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitStage$5(DAGScheduler.scala:1229)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitStage$5$adapted(DAGScheduler.scala:1228)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1228)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1168)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2395)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
There are also some other errors:
org.apache.spark.SparkException: Checkpoint RDD has a different number of partitions from original RDD. Original RDD [ID: 1705, num of partitions: 2]; Checkpoint RDD [ID: 1720, num of partitions: 0].
at org.apache.spark.rdd.ReliableCheckpointRDD$.writeRDDToCheckpointDirectory(ReliableCheckpointRDD.scala:183)
at org.apache.spark.rdd.ReliableRDDCheckpointData.doCheckpoint(ReliableRDDCheckpointData.scala:59)
at org.apache.spark.rdd.RDDCheckpointData.checkpoint(RDDCheckpointData.scala:75)
at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$1(RDD.scala:1885)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1875)
at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$3(RDD.scala:1887)
at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$3$adapted(RDD.scala:1887)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$1(RDD.scala:1887)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1875)
at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$3(RDD.scala:1887)
at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$3$adapted(RDD.scala:1887)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$1(RDD.scala:1887)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1875)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2198)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
at org.apache.spark.rdd.RDD.$anonfun$foreach$1(RDD.scala:1012)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:1010)
at com.yahoo.bullet.spark.ResultEmitter$.$anonfun$emit$1(ResultEmitter.scala:20)
at com.yahoo.bullet.spark.ResultEmitter$.$anonfun$emit$1$adapted(ResultEmitter.scala:20)
at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2(DStream.scala:629)
at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2$adapted(DStream.scala:629)
at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:256)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
From: bullet...@googlegroups.com <bullet...@googlegroups.com> On Behalf Of Akshay Sarma
Sent: Wednesday, June 23, 2021 11:03 PM
To: Blechschmidt, Frank <fra...@illinois.edu>
Cc: bullet-users <bullet...@googlegroups.com>; Makam, Hareesh <hma...@illinois.edu>; Pendyala, Murali Krishna <mk...@illinois.edu>
Subject: Re: Spark Bullet Backend not reading messages from Kafka (PubSub)
Yea try taking it out of Docker and try the exact versions as in the current quick-start. I'll try to get your versions running tomorrow and see if I run into the same issues.
On Wed, Jun 23, 2021 at 7:36 PM Blechschmidt, Frank <fra...@illinois.edu> wrote:
Yes, we’ve followed the tutorial, but updated the versions and run it in Docker containers locally.
I’m currently trying to run it natively (non-Docker) on my machine and want to compare the difference.
What confuses me right now is that the Spark backend seems to connect to Kafka successfully, but is not processing any messages.
These are the versions we’re running:
BULLET_EXAMPLES_VERSION=1.0.0
BULLET_KAFKA_VERSION=1.2.2
BULLET_SPARK_VERSION=1.0.4
BULLET_UI_VERSION=1.1.0
BULLET_WS_VERSION=1.1.0
Java 1.8, Node.js 16, Spark 3.1, Zookeeper 3.7, Kafka 2.8 (runs on port 29092)
I’ve attached the configurations for reference.
Best,
FRANK BLECHSCHMIDT
Graduate Student
The Grainger College of Engineering
Computer Science
fra...@illinois.edu
From: <bullet...@googlegroups.com> on behalf of Akshai Sarma <aksha...@gmail.com>
Date: Wednesday, June 23, 2021 at 1:42 PM
To: bullet-users <bullet...@googlegroups.com>
Subject: Re: Spark Bullet Backend not reading messages from Kafka (PubSub)
So it looks like everything is being run locally. Did you try the quickstart for Spark here: https://bullet-db.github.io/quick-start/spark/ (it is using an older version of Bullet 1.x - we need to update our documentation) already? Did that also result in the same errors?
If that doesn't work, can you post all the configs you used? The kafka settings, the bullet spark config and the bullet service config (including the pubsub config you passed to the bullet service).
On Tuesday, June 22, 2021 at 10:34:49 PM UTC-7 fra...@illinois.edu wrote:
Hi,
We’re a team of 3 graduate students working on a paper using Bullet for our evaluation. We’re trying to set up the example application locally, but are running into an issue:
All components are up and running locally (UI, web service, Kafka as pubsub, Spark as backend), but the Spark backend does not read or process the messages from the bullet.requests Kafka topic.
We’ve also deployed Kafdrop to monitor the messages in the two topics: bullet.requests and bullet.responses. The bullet.requests topic has messages, bullet.responses stays empty.
However, the Spark logs indicate that the BulletSparkStreamingMain app launched and connected successfully:
[INFO ] 2021-06-23 04:41:39,533 org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.6.0
[INFO ] 2021-06-23 04:41:39,533 org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 62abe01bee039651
[INFO ] 2021-06-23 04:41:39,533 org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1624423299529
[INFO ] 2021-06-23 04:41:39,540 org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Subscribed to topic(s): bullet.requests
[INFO ] 2021-06-23 04:41:39,543 com.yahoo.bullet.spark.QueryReceiver - Setup PubSub: com.yahoo.bullet.kafka.KafkaPubSub@36b35cc3 with Subscriber: com.yahoo.bullet.kafka.KafkaSubscriber@73967938
[INFO ] 2021-06-23 04:41:39,544 com.yahoo.bullet.spark.QueryReceiver - Query receiver started.
[INFO ] 2021-06-23 04:41:39,545 org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Called receiver 0 onStart
[INFO ] 2021-06-23 04:41:39,545 org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Waiting for receiver to be stopped
[INFO ] 2021-06-23 04:41:40,788 org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Cluster ID: QgfInwnoRnCFliTIEw3P6g
[INFO ] 2021-06-23 04:41:40,792 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Discovered group coordinator kafka:29092 (id: 2147482646 rack: null)
[INFO ] 2021-06-23 04:41:40,800 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] (Re-)joining group
[INFO ] 2021-06-23 04:41:40,823 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
[INFO ] 2021-06-23 04:41:40,824 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] (Re-)joining group
[INFO ] 2021-06-23 04:41:42,657 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Successfully joined group with generation 21
[INFO ] 2021-06-23 04:41:42,672 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Notifying assignor about the new Assignment(partitions=[bullet.requests-0])
[INFO ] 2021-06-23 04:41:42,685 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Adding newly assigned partitions: bullet.requests-0
[INFO ] 2021-06-23 04:41:42,723 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Setting offset for partition bullet.requests-0 to the committed offset FetchPosition{offset=228, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[kafka:29092 (id: 1001 rack: null)], epoch=0}}
However, there are no logs of the app successfully processing any of the messages piling up in bullet.requests. So, eventually the Bullet web service fails with the following error in the logs:
bullet-webservice_1 | 2021-06-23 05:12:05.022 ERROR 1 --- [pool-1-thread-1] c.y.bullet.rest.service.StatusService : Backend is not up! Failing all queries and refusing to accept new queries
bullet-webservice_1 | 2021-06-23 05:12:34.995 ERROR 1 --- [pool-1-thread-1] c.y.bullet.rest.service.StatusService : Backend is not up! Failing all queries and refusing to accept new queries
bullet-webservice_1 | 2021-06-23 05:13:04.988 ERROR 1 --- [pool-1-thread-1] c.y.bullet.rest.service.StatusService : Backend is not up! Failing all queries and refusing to accept new queries
bullet-webservice_1 | 2021-06-23 05:13:34.963 ERROR 1 --- [pool-1-thread-1] c.y.bullet.rest.service.StatusService : Backend is not up! Failing all queries and refusing to accept new queries
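One way to tell whether the backend's consumer is advancing at all is to compare the committed offset it reports at startup (offset=228 in the Spark log above) between two restarts. A minimal sketch, assuming the log line format quoted above; the helper name is hypothetical and the sample line is an abbreviated copy of the real one:

```python
import re

# Matches the Kafka consumer log line that reports the committed offset,
# in the format quoted in the Spark logs above.
_OFFSET_LINE = re.compile(
    r"Setting offset for partition (?P<partition>\S+) to the committed offset "
    r"FetchPosition\{offset=(?P<offset>\d+)"
)


def committed_offsets(log_lines):
    """Map each partition to the last committed offset reported in the log."""
    offsets = {}
    for line in log_lines:
        match = _OFFSET_LINE.search(line)
        if match:
            offsets[match.group("partition")] = int(match.group("offset"))
    return offsets


if __name__ == "__main__":
    # Abbreviated copy of the log line from this thread.
    sample = (
        "[INFO ] 2021-06-23 04:41:42,723 ...ConsumerCoordinator - [Consumer ...] "
        "Setting offset for partition bullet.requests-0 to the committed offset "
        "FetchPosition{offset=228, offsetEpoch=Optional[0], ...}"
    )
    print(committed_offsets([sample]))
```

If the offset stays the same across restarts while Kafdrop shows the topic growing, the consumer is subscribed but nothing is being read and committed.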
This is an example message from Kafdrop in bullet.requests (“default” key and message format):
��sr%com.yahoo.bullet.pubsub.PubSubMessage��,�m4 �[1]
[contentt[1][BL[1]idt Ljava/lang/String;L metadatat"Lcom/yahoo/bullet/pubsub/Metadata;xpur[1][B��� T�[1]xp
���sr com.yahoo.bullet.query.Query 7 �C���[1]L
aggregationt1Lcom/yahoo/bullet/query/aggregations/Aggregation;L durationtLjava/lang/Long;L filtert/Lcom/yahoo/bullet/query/expressions/Expression;LpostAggregationstLjava/util/List;Lprojectiont#Lcom/yahoo/bullet/query/Projection;L
tableFunctiont5Lcom/yahoo/bullet/query/tablefunctions/TableFunction;L windowtLcom/yahoo/bullet/query/Window;xpsr'com.yahoo.bullet.query.aggregations.Raw��X����[1]xr/com.yahoo.bullet.query.aggregations.Aggregation�93�n>V"[1][1]L
sizet Ljava/lang/Integer;L
typet5Lcom/yahoo/bullet/query/aggregations/AggregationType;xpsr java.lang.Integer ⠤���8[1]Ivaluexrjava.lang.Number���
���[1]xp~r3com.yahoo.bullet.query.aggregations.AggregationType xr
java.lang.Enum xpt
RAWsr
java.lang.Long;��̏#�[1]Jvaluexq~ppsr!com.yahoo.bullet.query.Projection�g���_a�[1][1]L fieldsq~
L
typet(Lcom/yahoo/bullet/query/Projection$Type;xpp~r&com.yahoo.bullet.query.Projection$Type xq~ t
PASS_THROUGHpsrcom.yahoo.bullet.query.Window2�x,}���[1]
L emitEveryq~
L emitTypet$Lcom/yahoo/bullet/query/Window$Unit;L
includeFirstq~
L
includeTypeq~‑xpppppt$6b3a0f8c-871a-437b-8c8d-e0dcf85b9a63sr$com.yahoo.bullet.kafka.KafkaMetadata@��YwJ|[1]L
topicPartitiont(Lorg/apache/kafka/common/TopicPartition;xr com.yahoo.bullet.pubsub.Metadatag�PE�+2
[1]
JcreatedLcontentt Ljava/lang/Object;L signalt)Lcom/yahoo/bullet/pubsub/Metadata$Signal;xpz7HT�tWebservice status tickpsr&org.apache.kafka.common.TopicPartition�{��|�U[1]
I
hashI partitionLtopicq~[1]xptbullet.responses
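For reference, the dump above appears to start with the Java serialization stream header followed by the class name com.yahoo.bullet.pubsub.PubSubMessage, which suggests the web service is writing Java-serialized objects to the topic. Here is a minimal sketch for checking this on raw message bytes pulled from a topic. It relies only on the documented Java serialization constants (magic 0xACED, version 5, TC_OBJECT, TC_CLASSDESC). The function name `top_level_class` and the sample bytes are made up for illustration, and it only inspects the first object in the stream.

```python
import struct

# Constants from java.io.ObjectStreamConstants
STREAM_MAGIC = 0xACED
STREAM_VERSION = 5
TC_OBJECT = 0x73      # shows up as 's' in a text dump
TC_CLASSDESC = 0x72   # shows up as 'r' in a text dump


def top_level_class(payload: bytes):
    """Return the class name of the first serialized object.

    Returns None if the payload does not start with a Java serialization
    header followed by a new object with a new class descriptor.
    """
    if len(payload) < 8:
        return None
    # Header layout: magic (2 bytes), version (2), TC_OBJECT (1),
    # TC_CLASSDESC (1), class name length (2), all big-endian.
    magic, version, tc_obj, tc_desc, name_len = struct.unpack(">HHBBH", payload[:8])
    if (magic, version) != (STREAM_MAGIC, STREAM_VERSION):
        return None
    if (tc_obj, tc_desc) != (TC_OBJECT, TC_CLASSDESC):
        return None
    name = payload[8:8 + name_len]
    if len(name) != name_len:
        return None  # truncated payload
    return name.decode("utf-8", errors="replace")


# Hypothetical sample: only the first bytes of a message like the one above.
class_name = b"com.yahoo.bullet.pubsub.PubSubMessage"
sample = b"\xac\xed\x00\x05sr" + struct.pack(">H", len(class_name)) + class_name
print(top_level_class(sample))
```

A payload that yields the expected class name was at least written in the format shown above. A result of None would point to a different serialization on the topic (for example JSON).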
Any ideas what could cause the backend to not process the messages even though it connected to the Kafka consumer group? Could it be a resource issue?
I’ve attached some screenshots of the Spark App UI for reference. This is the command that we used to start the backend:
spark-submit \
--master spark://spark:7077 \
--deploy-mode cluster \
--class com.yahoo.bullet.spark.BulletSparkStreamingMain \
--jars /opt/bitnami/spark/jars/bullet-kafka-1.2.2-fat.jar,/opt/bitnami/spark/jars/bullet-spark-example.jar \
/opt/bitnami/spark/jars/bullet-spark-1.0.4-standalone.jar \
--bullet-spark-conf /opt/bitnami/spark/conf/bullet_spark_kafka_settings.yaml
Best regards,
FRANK BLECHSCHMIDT
Graduate Student
The Grainger College of Engineering
Computer Science
fra...@illinois.edu
You received this message because you are subscribed to the Google Groups "bullet-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to bullet-users...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/bullet-users/BY5PR11MB41968AF77E1ADC16F47814979B079%40BY5PR11MB4196.namprd11.prod.outlook.com.
We definitely would like to contribute back to the Bullet project. We’re aiming to add the Helm chart and templates next and then eventually add Bullet’s Storm backend as well.
The report for our project is due on August 8 and we hope to have all pieces together by then. This might mark a good milestone to prep a PR against Bullet’s docs repo.
In the meantime, we’ve open-sourced another tool that might be useful for developers using Bullet:
https://github.com/SketchBench/sketchbench-data-ingestion-tester
The Data Ingestion Tester produces test data (messages) using the Python Faker library and ingests it into Kafka. The generated message payload covers all supported data types in Bullet's schema.
We’ve also published it again on
We’re using this to test out our Bullet infrastructure and configuration, especially the data ingestion through Kafka, before plugging in the actual benchmark data generators.
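To illustrate the idea, here is a minimal sketch of such a test-data generator. It is not the tool's actual code. It uses Python's standard `random` module in place of Faker so that it runs without dependencies, and a plain callable `send` stands in for a Kafka producer. All field names are hypothetical, and the set of types covered (booleans, integers, longs, floating-point numbers, strings, lists and maps) is an assumption about Bullet's schema types.

```python
import json
import random
import uuid


def make_record(rng):
    """Build one test record with a field for each assumed data type."""
    return {
        "id": str(uuid.UUID(int=rng.getrandbits(128))),        # string
        "is_active": rng.random() < 0.5,                       # boolean
        "count": rng.randint(0, 2**31 - 1),                    # 32-bit integer
        "timestamp_ms": rng.randint(0, 2**63 - 1),             # 64-bit long
        "ratio": rng.random(),                                 # floating point
        "tags": [rng.choice(["a", "b", "c"]) for _ in range(3)],            # list of strings
        "attributes": {"country": rng.choice(["US", "DE", "IN"])},          # map of strings
        "metrics": {"latency": {"p50": rng.random(), "p99": rng.random()}}, # map of maps
        "events": [{"clicks": rng.randint(0, 10)} for _ in range(2)],       # list of maps
    }


def ingest(n, send, seed=0):
    """Generate n records and pass each one, JSON-encoded, to send()."""
    rng = random.Random(seed)  # seeded so that runs are reproducible
    for _ in range(n):
        send(json.dumps(make_record(rng)).encode("utf-8"))


# Collect messages in a list instead of producing them to Kafka.
sent = []
ingest(3, sent.append, seed=1)
print(len(sent), "messages generated")
```

With a real producer, `send` could be a small wrapper around the producer's send method for the target topic.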
Cheers,
FRANK BLECHSCHMIDT
Graduate Student
The Grainger College of Engineering
Computer Science
fra...@illinois.edu
From: <bullet...@googlegroups.com> on behalf of Akshai Sarma <aksha...@gmail.com>
Date: Thursday, July 1, 2021 at 1:46 PM
To: bullet-users <bullet...@googlegroups.com>
Subject: Re: Containerized Bullet components
Very cool! I think the way it's structured - if you wanted to contribute it back - it might go very well into the documentation repo.
On Thursday, July 1, 2021 at 2:02:32 AM UTC-7 fra...@illinois.edu wrote:
Quick update on our effort to containerize Bullet:
We’ve published our current effort in this repo: https://github.com/SketchBench/bullet-docker
It automatically publishes the Docker images to our own namespace on
- DockerHub: https://hub.docker.com/u/sketchbench
- GitHub: https://github.com/orgs/SketchBench/packages
We’ve also added an example docker-compose.yaml to bring up the Bullet Quick Start app on Spark.
The builds are currently set up to run on every merge into the main branch as well as weekly on Sundays to stay up-to-date with upstream.
They are completely parameterized with Docker build arguments to easily adopt newer releases of Bullet’s components.
We’re already working on a Helm Chart based on these images to easily spin up all components on Kubernetes (the foundation to enable easy reproducibility of our benchmark).
Cheers,
FRANK BLECHSCHMIDT
Graduate Student
The Grainger College of Engineering
Computer Science
fra...@illinois.edu