Hi Akshai,
We’re currently in the midst of running a variety of benchmark experiments using Bullet on Spark Streaming.
We were wondering what parameters we should look out for to get the best performance out of the system.
We’re running in a 3 node configuration (1 master, 2 workers) with 7 CPU/25Gi per worker and data ingestion via Kafka.
What should we set to optimize for parallelization, e.g. actually using all 7 cores fully within a worker?
Any tips on the relation of # Kafka topics to workers and worker cores?
Cheers,
FRANK BLECHSCHMIDT
Graduate Student
The Grainger College of Engineering
Computer Science
fra...@illinois.edu
Under the Illinois Freedom of Information Act any written communication to or from university employees regarding university business is a public record and may be subject to public disclosure.
From: <bullet...@googlegroups.com> on behalf of "Blechschmidt, Frank" <fra...@illinois.edu>
Date: Saturday, July 3, 2021 at 12:56 AM
To: Akshai Sarma <aksha...@gmail.com>, bullet-users <bullet...@googlegroups.com>
Cc: "Makam, Hareesh" <hma...@illinois.edu>, "Pendyala, Murali Krishna" <mk...@illinois.edu>
Subject: Re: Containerized Bullet components
We definitely would like to contribute back to the Bullet project. We’re aiming to add the Helm chart and templates next and then eventually add Bullet’s Storm backend as well.
The report for our project is due on August 8 and we hope to have all pieces together by then. This might mark a good milestone to prep a PR against Bullet’s docs repo.
In the meantime, we’ve open-sourced another tool that might be useful for developers using Bullet:
https://github.com/SketchBench/sketchbench-data-ingestion-tester
The Data Ingestion Tester produces test data (messages) using the Python Faker library and ingests it into Kafka. The generated message payload covers all supported data types in Bullet's schema.
We’ve also published its Docker images under our SketchBench namespaces on DockerHub and GitHub, like the bullet-docker images.
We’re using this to test out our Bullet infrastructure and configuration, especially the data ingestion through Kafka, before plugging in the actual benchmark data generators.
Cheers,
FRANK BLECHSCHMIDT
Graduate Student
The Grainger College of Engineering
Computer Science
fra...@illinois.edu
Under the Illinois Freedom of Information Act any written communication to or from university employees regarding university business is a public record and may be subject to public disclosure.
From: <bullet...@googlegroups.com> on behalf of Akshai Sarma <aksha...@gmail.com>
Date: Thursday, July 1, 2021 at 1:46 PM
To: bullet-users <bullet...@googlegroups.com>
Subject: Re: Containerized Bullet components
Very cool! I think the way it's structured - if you wanted to contribute it back - it might go very well into the documentation repo.
On Thursday, July 1, 2021 at 2:02:32 AM UTC-7 fra...@illinois.edu wrote:
Quick update on our effort to containerize Bullet:
We’ve published our current effort in this repo: https://github.com/SketchBench/bullet-docker
It automatically publishes the Docker images to our own namespace on
- DockerHub: https://hub.docker.com/u/sketchbench
- GitHub: https://github.com/orgs/SketchBench/packages
We’ve also added an example docker-compose.yaml to bring up the Bullet Quick Start app on Spark.
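To give a quick feel for its shape, the services map to the Quick Start components roughly like this (a trimmed sketch; the service names here are placeholders rather than the exact ones in the repo):
services:
  zookeeper:           # coordination for Kafka
  kafka:               # PubSub transport (bullet.requests / bullet.responses)
  spark-master:        # standalone Spark cluster
  spark-worker:
  bullet-spark:        # the Bullet Spark Streaming backend
  bullet-webservice:   # the Bullet REST API
  bullet-ui: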
The builds are currently set up to run on every merge into the main branch as well as weekly on Sunday to stay up to date with upstream.
They are completely parameterized with Docker build arguments to easily adopt newer releases of Bullet’s components.
We’re already working on a Helm Chart based on these images to easily spin up all components on Kubernetes (the foundation to enable easy reproducibility of our benchmark).
Cheers,
FRANK BLECHSCHMIDT
Graduate Student
The Grainger College of Engineering
Computer Science
fra...@illinois.edu
Under the Illinois Freedom of Information Act any written communication to or from university employees regarding university business is a public record and may be subject to public disclosure.
From: <bullet...@googlegroups.com> on behalf of Akshai Sarma <aksha...@gmail.com>
Date: Thursday, June 24, 2021 at 1:44 PM
To: bullet-users <bullet...@googlegroups.com>
Subject: Re: Spark Bullet Backend not reading messages from Kafka (PubSub)
I see, thanks for letting us know. If we can help with something, don't hesitate to ask!
On Thursday, June 24, 2021 at 1:40:18 PM UTC-7 fra...@illinois.edu wrote:
Wow, that’s quite a scale for Storm! We wanted to start with Spark Streaming since it seemed to be a framework that’s easier to set up.
Eventually, we’d like to benchmark Storm as well and compare the performance of both. Support for Spark Structured Streaming and Flink would be amazing!
We essentially plan to run a subset of queries from existing streaming benchmarks (all queries that are currently supported in BQL) to compare sketch with non-sketch performance. The expectation is that the sketch-optimized queries run orders of magnitude faster. It would be interesting to see the actual factor for established benchmarks from academia.
We do this incrementally: first only on Spark with one data source from a benchmark (data set or data generator) and very few queries. Eventually, we want to include more benchmarks, data sets, and Storm as well.
However, we’re very time-constrained to finish up everything by August 8 to submit our report (hence the incremental approach).
We containerized Bullet with the goal of running it on Kubernetes to enable easy reproducibility.
Best,
Frank
From: <bullet...@googlegroups.com> on behalf of Akshai Sarma <aksha...@gmail.com>
Date: Thursday, June 24, 2021 at 10:28 AM
To: bullet-users <bullet...@googlegroups.com>
Subject: Re: Spark Bullet Backend not reading messages from Kafka (PubSub)
That's great! Yea the backend has a few components and I'm not sure how Spark doles out tasks if there are too few cores. I'd have imagined it'd work, albeit just slower. Guess not.
Do note that we built Bullet aiming for event-by-event processing in the backend. Our Spark backend implementation still uses DStreams (so micro-batches). We know for a fact that that doesn't scale very well to large amounts of data or queries. The Storm version, in contrast, we've scaled to ~6M simultaneous queries as well as ~2M records per sec. We want to look into Spark Structured Streaming to see if that lets us get off the micro-batching. We're also going to POC a Flink instance of Bullet soon, which already lets us do the low-level event processing. With all that said, would you mind sharing what you're trying to do for your work? Are you comparing Bullet with other things?
Yea we'd totally be interested in a containerized version of Bullet! Before you contribute, please do make an issue laying out how you're structuring it and we can figure out the best place to put it.
On Thursday, June 24, 2021 at 12:57:00 AM UTC-7 fra...@illinois.edu wrote:
PS:
For now, we can confirm that the latest versions of all Bullet components start up with the example app from the tutorial.
We’ve also updated the tooling to the latest versions: Spark 3.1.2 with Hadoop 3.2, Kafka 2.8.0, nvm 0.38.0, and node.js 16.4.0.
The application is running fine natively on a MacBook (install-all-spark.sh) as well as containerized in a Docker setup with right-sized Spark cluster workers (standalone cluster).
We ultimately aim to bring everything up and running in Kubernetes.
Once we’ve confirmed a good setup and are able to use it reliably for our paper, we also hope to contribute back to the project, e.g. Dockerfiles, docker-compose.yml, Kubernetes Helm charts, etc.
Best,
Frank
From: "Blechschmidt, Frank" <fra...@illinois.edu>
Date: Thursday, June 24, 2021 at 12:38 AM
To: "Blechschmidt, Frank" <fra...@illinois.edu>, Akshay Sarma <aksha...@gmail.com>
Cc: bullet-users <bullet...@googlegroups.com>, "Makam, Hareesh" <hma...@illinois.edu>, "Pendyala, Murali Krishna" <mk...@illinois.edu>
Subject: RE: Spark Bullet Backend not reading messages from Kafka (PubSub)
Got it working 🚀 (see attached screenshot)
After figuring out that I needed more workers, I was at least able to start the Spark backend.
I started playing with the number of Spark workers. The app also starts with “only” 4 workers.
However, I was still observing the error logs and crashes from my previous email.
As it turns out, this got triggered whenever I tried the example query in the Bullet UI. So… the query processing caused the Spark executors to crash.
The Spark UI gave some hints that it might be related to resources. Originally the defaults for the workers were set to:
SPARK_WORKER_MEMORY=1G
SPARK_WORKER_CORES=1
After increasing them to
SPARK_WORKER_MEMORY=12G
SPARK_WORKER_CORES=2
the query processing in Bullet started working.
Conclusion: Bullet needs more than 2 Spark workers, and each worker needs more than 1 CPU/1 GB.
Do you know by any chance good settings for Bullet? How many Spark workers? What resources should Spark worker have?
Cheers,
Frank
From: bullet...@googlegroups.com <bullet...@googlegroups.com> On Behalf Of Blechschmidt, Frank
Sent: Wednesday, June 23, 2021 11:44 PM
To: Akshay Sarma <aksha...@gmail.com>
Cc: bullet-users <bullet...@googlegroups.com>; Makam, Hareesh <hma...@illinois.edu>; Pendyala, Murali Krishna <mk...@illinois.edu>
Subject: RE: Spark Bullet Backend not reading messages from Kafka (PubSub)
[Thank you in advance for your help!]
The non-Docker version starts up successfully on the older artifact versions of Bullet.
I also figured out the problem with the Docker-powered setup: the Spark cluster (standalone) in Docker (using docker-compose) had only 2 workers.
So, the tasks would starve since they could not find executors.
I’m now running a setup with 10 workers on the latest release versions of Bullet and I can observe more activity in the Spark cluster.
Still, I’m not able to get a response in the UI/web service.
It looks like some issue with the processing of RDDs (I’ve also attached screenshots from the Spark UI):
org.apache.spark.SparkException: Job aborted due to stage failure: Task creation failed: org.apache.spark.SparkException: Attempted to use BlockRDD[1460] at receiverStream at RandomProducer.scala:19 after its blocks have been removed!
org.apache.spark.SparkException: Attempted to use BlockRDD[1460] at receiverStream at RandomProducer.scala:19 after its blocks have been removed!
at org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83)
at org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:55)
at org.apache.spark.rdd.RDD.$anonfun$preferredLocations$2(RDD.scala:324)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:324)
at org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:50)
at org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:110)
at org.apache.spark.rdd.RDD.$anonfun$preferredLocations$2(RDD.scala:324)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:324)
at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2333)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$3(DAGScheduler.scala:2344)
at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2(DAGScheduler.scala:2343)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2$adapted(DAGScheduler.scala:2341)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2341)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$3(DAGScheduler.scala:2344)
at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2(DAGScheduler.scala:2343)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2$adapted(DAGScheduler.scala:2341)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2341)
at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:2307)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitMissingTasks$2(DAGScheduler.scala:1329)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitMissingTasks$2$adapted(DAGScheduler.scala:1329)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1329)
at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1226)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitStage$5(DAGScheduler.scala:1229)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitStage$5$adapted(DAGScheduler.scala:1228)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1228)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1168)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2395)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1341)
at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1226)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitStage$5(DAGScheduler.scala:1229)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitStage$5$adapted(DAGScheduler.scala:1228)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1228)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1168)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2395)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
at org.apache.spark.rdd.RDD.$anonfun$foreach$1(RDD.scala:1012)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:1010)
at com.yahoo.bullet.spark.ResultEmitter$.$anonfun$emit$1(ResultEmitter.scala:20)
at com.yahoo.bullet.spark.ResultEmitter$.$anonfun$emit$1$adapted(ResultEmitter.scala:20)
at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2(DStream.scala:629)
at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2$adapted(DStream.scala:629)
at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:256)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Attempted to use BlockRDD[1460] at receiverStream at RandomProducer.scala:19 after its blocks have been removed!
at org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83)
at org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:55)
at org.apache.spark.rdd.RDD.$anonfun$preferredLocations$2(RDD.scala:324)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:324)
at org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:50)
at org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:110)
at org.apache.spark.rdd.RDD.$anonfun$preferredLocations$2(RDD.scala:324)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:324)
at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2333)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$3(DAGScheduler.scala:2344)
at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2(DAGScheduler.scala:2343)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2$adapted(DAGScheduler.scala:2341)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2341)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$3(DAGScheduler.scala:2344)
at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2(DAGScheduler.scala:2343)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2$adapted(DAGScheduler.scala:2341)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2341)
at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:2307)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitMissingTasks$2(DAGScheduler.scala:1329)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitMissingTasks$2$adapted(DAGScheduler.scala:1329)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1329)
at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1226)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitStage$5(DAGScheduler.scala:1229)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitStage$5$adapted(DAGScheduler.scala:1228)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1228)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1168)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2395)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
There are also some other errors:
org.apache.spark.SparkException: Checkpoint RDD has a different number of partitions from original RDD. Original RDD [ID: 1705, num of partitions: 2]; Checkpoint RDD [ID: 1720, num of partitions: 0].
at org.apache.spark.rdd.ReliableCheckpointRDD$.writeRDDToCheckpointDirectory(ReliableCheckpointRDD.scala:183)
at org.apache.spark.rdd.ReliableRDDCheckpointData.doCheckpoint(ReliableRDDCheckpointData.scala:59)
at org.apache.spark.rdd.RDDCheckpointData.checkpoint(RDDCheckpointData.scala:75)
at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$1(RDD.scala:1885)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1875)
at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$3(RDD.scala:1887)
at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$3$adapted(RDD.scala:1887)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$1(RDD.scala:1887)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1875)
at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$3(RDD.scala:1887)
at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$3$adapted(RDD.scala:1887)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$1(RDD.scala:1887)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1875)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2198)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
at org.apache.spark.rdd.RDD.$anonfun$foreach$1(RDD.scala:1012)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:1010)
at com.yahoo.bullet.spark.ResultEmitter$.$anonfun$emit$1(ResultEmitter.scala:20)
at com.yahoo.bullet.spark.ResultEmitter$.$anonfun$emit$1$adapted(ResultEmitter.scala:20)
at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2(DStream.scala:629)
at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2$adapted(DStream.scala:629)
at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:256)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
From: bullet...@googlegroups.com <bullet...@googlegroups.com> On Behalf Of Akshay Sarma
Sent: Wednesday, June 23, 2021 11:03 PM
To: Blechschmidt, Frank <fra...@illinois.edu>
Cc: bullet-users <bullet...@googlegroups.com>; Makam, Hareesh <hma...@illinois.edu>; Pendyala, Murali Krishna <mk...@illinois.edu>
Subject: Re: Spark Bullet Backend not reading messages from Kafka (PubSub)
Yea try taking it out of Docker and try the exact versions as in the current quick-start. I'll try to get your versions running tomorrow and see if I run into the same issues.
On Wed, Jun 23, 2021 at 7:36 PM Blechschmidt, Frank <fra...@illinois.edu> wrote:
Yes, we’ve followed the tutorial, but updated the versions and ran it in Docker containers locally.
I’m currently trying to run it natively (non-Docker) on my machine and want to compare the difference.
What confuses me right now is that the Spark backend seems to connect to Kafka successfully, but is not processing any messages.
These are the versions we’re running:
BULLET_EXAMPLES_VERSION=1.0.0
BULLET_KAFKA_VERSION=1.2.2
BULLET_SPARK_VERSION=1.0.4
BULLET_UI_VERSION=1.1.0
BULLET_WS_VERSION=1.1.0
Java 1.8, Node.js 16, Spark 3.1, Zookeeper 3.7, Kafka 2.8 (runs on port 29092)
I’ve attached the configurations for reference.
Best,
FRANK BLECHSCHMIDT
Graduate Student
The Grainger College of Engineering
Computer Science
fra...@illinois.edu
Under the Illinois Freedom of Information Act any written communication to or from university employees regarding university business is a public record and may be subject to public disclosure.
From: <bullet...@googlegroups.com> on behalf of Akshai Sarma <aksha...@gmail.com>
Date: Wednesday, June 23, 2021 at 1:42 PM
To: bullet-users <bullet...@googlegroups.com>
Subject: Re: Spark Bullet Backend not reading messages from Kafka (PubSub)
So it looks like everything is being run locally. Did you try the quickstart for Spark here: https://bullet-db.github.io/quick-start/spark/ (it is using an older version of Bullet 1.x - we need to update our documentation) already? Did that also result in the same errors?
If that doesn't work, can you post all the configs you used? The Kafka settings, the Bullet Spark config, and the Bullet service config (including the PubSub config you passed to the Bullet service).
On Tuesday, June 22, 2021 at 10:34:49 PM UTC-7 fra...@illinois.edu wrote:
Hi,
We’re a team of 3 graduate students working on a paper using Bullet for our evaluation. We’re trying to set up the example application locally, but are running into an issue:
All components are up and running locally (UI, web service, Kafka as pubsub, Spark as backend), but the Spark backend does not read or process the messages from the bullet.requests Kafka topic.
We’ve also deployed Kafdrop to monitor the messages in the two topics: bullet.requests and bullet.responses. The bullet.requests topic has messages, bullet.responses stays empty.
However, the Spark logs indicate that the BulletSparkStreamingMain app launched and connected successfully:
[INFO ] 2021-06-23 04:41:39,533 org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.6.0
[INFO ] 2021-06-23 04:41:39,533 org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 62abe01bee039651
[INFO ] 2021-06-23 04:41:39,533 org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1624423299529
[INFO ] 2021-06-23 04:41:39,540 org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Subscribed to topic(s): bullet.requests
[INFO ] 2021-06-23 04:41:39,543 com.yahoo.bullet.spark.QueryReceiver - Setup PubSub: com.yahoo.bullet.kafka.KafkaPubSub@36b35cc3 with Subscriber: com.yahoo.bullet.kafka.KafkaSubscriber@73967938
[INFO ] 2021-06-23 04:41:39,544 com.yahoo.bullet.spark.QueryReceiver - Query receiver started.
[INFO ] 2021-06-23 04:41:39,545 org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Called receiver 0 onStart
[INFO ] 2021-06-23 04:41:39,545 org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Waiting for receiver to be stopped
[INFO ] 2021-06-23 04:41:40,788 org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Cluster ID: QgfInwnoRnCFliTIEw3P6g
[INFO ] 2021-06-23 04:41:40,792 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Discovered group coordinator kafka:29092 (id: 2147482646 rack: null)
[INFO ] 2021-06-23 04:41:40,800 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] (Re-)joining group
[INFO ] 2021-06-23 04:41:40,823 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
[INFO ] 2021-06-23 04:41:40,824 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] (Re-)joining group
[INFO ] 2021-06-23 04:41:42,657 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Successfully joined group with generation 21
[INFO ] 2021-06-23 04:41:42,672 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Notifying assignor about the new Assignment(partitions=[bullet.requests-0])
[INFO ] 2021-06-23 04:41:42,685 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Adding newly assigned partitions: bullet.requests-0
[INFO ] 2021-06-23 04:41:42,723 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Setting offset for partition bullet.requests-0 to the committed offset FetchPosition{offset=228, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[kafka:29092 (id: 1001 rack: null)], epoch=0}}
However, there are no logs of the app successfully processing any of the messages piling up in bullet.requests. So, eventually the Bullet web service fails with the following error in the logs:
bullet-webservice_1 | 2021-06-23 05:12:05.022 ERROR 1 --- [pool-1-thread-1] c.y.bullet.rest.service.StatusService : Backend is not up! Failing all queries and refusing to accept new queries
bullet-webservice_1 | 2021-06-23 05:12:34.995 ERROR 1 --- [pool-1-thread-1] c.y.bullet.rest.service.StatusService : Backend is not up! Failing all queries and refusing to accept new queries
bullet-webservice_1 | 2021-06-23 05:13:04.988 ERROR 1 --- [pool-1-thread-1] c.y.bullet.rest.service.StatusService : Backend is not up! Failing all queries and refusing to accept new queries
bullet-webservice_1 | 2021-06-23 05:13:34.963 ERROR 1 --- [pool-1-thread-1] c.y.bullet.rest.service.StatusService : Backend is not up! Failing all queries and refusing to accept new queries
This is an example message from Kafdrop in bullet.requests (“default” key and message format):
[binary Java-serialized payload, not human-readable; the recognizable strings include the class names com.yahoo.bullet.pubsub.PubSubMessage, com.yahoo.bullet.query.Query, com.yahoo.bullet.query.aggregations.Raw, com.yahoo.bullet.query.Projection, com.yahoo.bullet.query.Window, and com.yahoo.bullet.kafka.KafkaMetadata, a query id 6b3a0f8c-871a-437b-8c8d-e0dcf85b9a63, the metadata content "Webservice status tick", and the topic bullet.responses]
Any ideas what could cause the backend to not process the messages even though it connected to the Kafka consumer group? Could it be a resource issue?
I’ve attached some screenshots of the Spark App UI for reference. This is the command that we used to start the backend:
spark-submit \
--master spark://spark:7077 \
--deploy-mode cluster \
--class com.yahoo.bullet.spark.BulletSparkStreamingMain \
--jars /opt/bitnami/spark/jars/bullet-kafka-1.2.2-fat.jar,/opt/bitnami/spark/jars/bullet-spark-example.jar \
/opt/bitnami/spark/jars/bullet-spark-1.0.4-standalone.jar \
--bullet-spark-conf /opt/bitnami/spark/conf/bullet_spark_kafka_settings.yaml
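For reference, the PubSub portion of that settings file looks roughly like this (paraphrased from our config; key names per the Bullet Kafka PubSub docs):
bullet.pubsub.context.name: "QUERY_PROCESSING"
bullet.pubsub.class.name: "com.yahoo.bullet.kafka.KafkaPubSub"
bullet.pubsub.kafka.bootstrap.servers: "kafka:29092"
bullet.pubsub.kafka.request.topic.name: "bullet.requests"
bullet.pubsub.kafka.response.topic.name: "bullet.responses"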
Best regards,
FRANK BLECHSCHMIDT
Graduate Student
The Grainger College of Engineering
Computer Science
fra...@illinois.edu
Under the Illinois Freedom of Information Act any written communication to or from university employees regarding university business is a public record and may be subject to public disclosure.
Yes, single Kafka topic. We use Bullet DSL with the KafkaConnector for that.
We have 4 queries with a stream observation time of 5 min each.
For 1:
>> Increase your partitions and set your parallelism for your receivers to match.
Are you referring to bullet.spark.data.producer.parallelism?
With two executors, each having 7 cores, how much of that should be reserved for the Kafka ingestion side? E.g., with 7 cores and 5 partitions, a parallelism of 5 with 2 cores left for other tasks?
For 2 & 3:
We’re only running 1 query at a time. Are any of these settings needed?
Regarding checkpointing:
We’ve observed that Bullet’s Spark backend can end up with a linearly increasing “Scheduling Delay”, with every checkpoint causing an additional slight jump upwards.
First, how can we avoid such a delay (looking at your Spark performance page, it should be avoidable)?
Second, any tips on tuning the checkpointing?
Regarding Storm:
We would love to have Storm in our evaluation as well, but our course is very time-restricted and we have to submit our final report/paper by August 8 :(
Cheers,
FRANK BLECHSCHMIDT
Graduate Student
The Grainger College of Engineering
Computer Science
fra...@illinois.edu
Are you referring to bullet.spark.data.producer.parallelism?
With two executors, each having 7 cores, how much of that should be reserved for the Kafka ingestion side? E.g., with 7 cores and 5 partitions, a parallelism of 5 with 2 cores left for other tasks?
Yes, if you wrote your own producer; that's the setting. I wasn't sure if you were using Bullet DSL. It depends on how complex your data is and what you're doing to convert data from your Kafka source into a Bullet Record. If you only have 2 executors with 7 cores and 5 partitions for the entire job, I would actually scale out the cores for the FilterStreaming to at least match the Data Producer. This is because while the Data Producer is just reading and doing minor work to convert data, the FilterStreaming is not only processing the same volume but also running it through all the queries. You would need to play with this to see what's happening.
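In bullet_spark settings terms, that would look something like this (illustrative numbers for a 5-partition topic; the enable flag is my guess at the companion switch for the parallelism setting):
bullet.spark.data.producer.parallelism: 5                   # one receiver per Kafka partition
bullet.spark.filter.partition.parallel.mode.enabled: true   # assumed companion flag; check the defaults yaml
bullet.spark.filter.partition.parallel.mode.parallelism: 5  # at least match the Data Producer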
We’re only running 1 query at a time. Are any of these settings needed?
If you're only running one query at a time (as in one instance of that one query), then no, you don't really need to tweak these. Looks like you're benchmarking the performance of a query type as opposed to scaling out data or queries.
We’ve observed that Bullet’s Spark backend can end up with a linearly increasing “Scheduling Delay”, with every checkpoint causing an additional slight jump upwards.
First, how can we avoid such a delay (looking at your Spark performance page, it should be avoidable)?
Second, any tips on tuning the checkpointing?
This just means that you can't process a batch in your batch interval in time for the next batch. We had the checkpoint multiplier settings for bullet-spark set to 20 when we tried it in Spark 2, with a batch interval of 2 s for our data. However, we were running a lot of small executors instead of 2 large ones like you guys. You can revisit this once you adjust the resources for the other Bullet components to see if it's still there.
Are you draining a queue in Kafka when you start? You should set auto.offset.reset to latest to see if the initial read is causing runaway delays.
Is Spark backpressure on?
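Concretely, the knobs I mean look like this (a sketch from memory; the bullet.spark names are from our defaults, the spark one is standard Spark Streaming):
bullet.spark.batch.duration.ms: 2000                # 2 s batch interval, what we used on Spark 2
bullet.spark.checkpoint.duration.multiplier: 20     # checkpoint every 20 batches
spark.streaming.backpressure.enabled: true          # Spark-side receiver throttling
# plus auto.offset.reset=latest in your Kafka consumer properties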
Yes, if you wrote your own producer; that's the setting. I wasn't sure if you were using Bullet DSL.
We do use Bullet DSL (KafkaConnector). Is bullet.spark.data.producer.parallelism then still the right knob to twist?
If you only have 2 executors with 7 cores and 5 partitions for the entire job, I would actually scale out the cores for the FilterStreaming to at least match the Data Producer.
We currently reserve two Kubernetes nodes as workers in a standalone Spark cluster setup on K8s. That said, we can run multiple executors on one worker node by tweaking spark.cores.max and spark.executor.cores. So, the question essentially becomes: what does Bullet benefit more from… a few high-CPU executors (one executor per node) or many low-CPU executors (multiple executors per node)?
I would actually scale out the cores for the FilterStreaming to at least match the Data Producer
Alrighty, sounds like tweaking bullet.spark.filter.partition.parallel.mode.parallelism needs to be added to our list.
It depends on how complex your data is and what you're doing to convert data from your Kafka source into a Bullet Record
We have pretty flat JSON objects that we convert with
dsl:
  converter:
    class:
      name: com.yahoo.bullet.dsl.converter.JSONBulletRecordConverter
  deserializer:
    class:
      name: com.yahoo.bullet.dsl.deserializer.IdentityDeserializer
…and a schema.
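For completeness, the schema is the usual bullet-dsl JSON list of fields; a minimal sketch with made-up field names (our real schema is larger):
[
  { "name": "id", "type": "STRING" },
  { "name": "value", "type": "DOUBLE" },
  { "name": "ts", "type": "LONG" }
]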
Looks like you're benchmarking the performance of a query type as opposed to scaling out data or queries.
That’s accurate: we’re studying the performance of the different queries we selected comparing Bullet (potentially accelerating the processing with sketches) vs implementing them directly on Spark (no sketches).
This just means that you can't process a batch in your batch interval in time for the next batch. We had the checkpoint multiplier settings for bullet-spark set to 20 when we tried it in Spark 2, with a batch interval of 2 s for our data. However, we were running a lot of small executors instead of 2 large ones like you guys. You can revisit this once you adjust the resources for the other Bullet components to see if it's still there.
Yeah, we’re playing with the checkpointing parameters right now.
Are you draining a queue in Kafka when you start? You should set auto.offset.reset to latest to see if the initial read is causing runaway delays.
Do you mean bullet.dsl.connector.kafka.start.at.end.enable?
Is Spark backpressure on?
Playing with that setting too right now.
I think fundamentally our questions are: what core/executor and executor/Kafka-topic ratios does Bullet benefit from? And how do we optimize for utilization?
We will try out the parameters you mentioned, see whether we observe any improvements, and also have another look at the parameters used on the bullet-spark performance page.
Cheers, Frank
We also observe that most of the time is spent in DStream.union(); what could be the cause of this? (see attachment)
Start command:
spark-submit \
--master spark://sketchbench-espbench-spark-master-svc:7077 \
--deploy-mode client \
--conf spark.driver.host=sketchbench-espbench-bullet-spark-backend-driver \
--conf spark.driver.port=4041 \
--conf spark.driver.blockManager.port=4042 \
--conf spark.driver.bindAddress=0.0.0.0 \
--conf spark.ui.prometheus.enabled=true \
--conf spark.metrics.appStatusSource.enabled=true \
--conf spark.sql.streaming.metricsEnabled=true \
--conf spark.cores.max=18 \
--conf spark.executor.cores=3 \
--conf spark.executor.memory=13G \
--conf spark.streaming.backpressure.enabled=true \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.driver.extraJavaOptions="-XX:+UseG1GC" \
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
--class com.yahoo.bullet.spark.BulletSparkStreamingMain \
--jars /opt/bitnami/spark/jars/bullet-kafka-fat.jar,/opt/bitnami/spark/jars/bullet-dsl.jar,/opt/bitnami/spark/jars/bullet-spark-example.jar \
/opt/bitnami/spark/jars/bullet-spark-standalone.jar \
--bullet-spark-conf /bullet/configs/bullet_spark_settings.yaml
I’ve also attached the bullet_spark_settings.yaml conf file.
We do use Bullet DSL (KafkaConnector). Is bullet.spark.data.producer.parallelism then still the right knob to twist?
Yes, if you're using DSL, you'd be using DSLDataProducer and this should parallelize that.
We currently reserve two Kubernetes nodes as workers in a Standalone Spark cluster setup on K8s. That said, we can run multiple executors on one worker node tweaking spark.cores.max and spark.executor.cores. So, the question essentially becomes: what does Bullet benefit more from… few high CPU executors (one executor per node) or many low CPU executors (multiple executors per node)?
Generally, the less network hopping you do, the better, but I have also seen a lot of contention and context switching cause poor performance depending on the machine architecture, making it actually worse in some cases. In your case, since you only have 2 nodes in the cluster and you're planning to saturate them, it shouldn't really matter. If you had a multi-tenant cluster and your executors were split across the logical equivalent of 2 nodes (physically mapping to multiple nodes), then I'd suggest trying many low-CPU executors.
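For your two 7-core nodes, the two layouts would look something like this (illustrative numbers; pick one executor.cores line):
spark.cores.max: 14
spark.executor.cores: 7     # few large executors: 2 executors, one per node
# or:
# spark.executor.cores: 2   # many small executors: 7 executors of 2 cores across the 2 nodes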
That’s accurate: we’re studying the performance of the different queries we selected comparing Bullet (potentially accelerating the processing with sketches) vs implementing them directly on Spark (no sketches).
Hmm, this would boil down to benchmarking Sketches themselves, right? You're essentially comparing how efficient sketches are for a problem as opposed to native distributed implementations. The Sketches themselves have benchmarks that you can also use. For instance, Theta sketches: http://datasketches.apache.org/docs/Theta/ThetaUpdateSpeed.html
Do you mean bullet.dsl.connector.kafka.start.at.end.enable?
This will seek to the end even if you were within the retention of your Kafka topic. You may do that, but just be aware that you will definitely drop data on startup even if you were restarting after a brief gap. You can also provide any Kafka consumer setting to the KafkaConnector by prefixing it with bullet.dsl.connector.kafka. You can provide the setting I was talking about, which resets your offset to the latest one if you started outside your retention, with bullet.dsl.connector.kafka.auto.offset.reset: latest
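So something like this in your DSL config (a sketch; the group id is just illustrative, and any other Kafka consumer property passes through the same way):
bullet.dsl.connector.kafka.bootstrap.servers: "kafka:29092"
bullet.dsl.connector.kafka.group.id: "bullet-dsl-consumer"   # illustrative
bullet.dsl.connector.kafka.auto.offset.reset: "latest"       # reset to latest if you start outside retention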
I think fundamentally our questions are: what core/executor and executor/Kafka-topic ratios does Bullet benefit from? And how do we optimize for utilization?
I would just try both. The Kafka piece doesn't seem as important as the QueryDataUnioning and the FilterStreaming stages. We've tried it using Spark on YARN on many physical machines; you have Spark on K8s on 2 nodes. There are too many factors that may be in play here, so I can't really recommend anything in particular without trying it out myself.
We also observe that most of the time is spent in DStream.union(); what could be the cause of this? (see attachment)
Looks like you have a ton of data that needs to be sent to the JoinStreaming stage: https://github.com/bullet-db/bullet-spark/blob/master/src/main/scala/com/yahoo/bullet/spark/BulletSparkStreamingBaseJob.scala#L54. What's your batch interval and how many data records are in there? I am assuming you have 1 query running. Try sampling your data to see if the bottleneck scales with that.