Hi,
We’re a team of 3 graduate students working on a paper that uses Bullet for its evaluation. We’re trying to set up the example application locally, but are running into an issue:
All components are up and running locally (UI, web service, Kafka as pubsub, Spark as backend), but the Spark backend does not read or process the messages from the bullet.requests Kafka topic.
We’ve also deployed Kafdrop to monitor the messages in the two topics: bullet.requests and bullet.responses. The bullet.requests topic has messages, but bullet.responses stays empty.
However, the Spark logs indicate that the BulletSparkStreamingMain app launched and connected successfully:
[INFO ] 2021-06-23 04:41:39,533 org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.6.0
[INFO ] 2021-06-23 04:41:39,533 org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 62abe01bee039651
[INFO ] 2021-06-23 04:41:39,533 org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1624423299529
[INFO ] 2021-06-23 04:41:39,540 org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Subscribed to topic(s): bullet.requests
[INFO ] 2021-06-23 04:41:39,543 com.yahoo.bullet.spark.QueryReceiver - Setup PubSub: com.yahoo.bullet.kafka.KafkaPubSub@36b35cc3 with Subscriber: com.yahoo.bullet.kafka.KafkaSubscriber@73967938
[INFO ] 2021-06-23 04:41:39,544 com.yahoo.bullet.spark.QueryReceiver - Query receiver started.
[INFO ] 2021-06-23 04:41:39,545 org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Called receiver 0 onStart
[INFO ] 2021-06-23 04:41:39,545 org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Waiting for receiver to be stopped
[INFO ] 2021-06-23 04:41:40,788 org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Cluster ID: QgfInwnoRnCFliTIEw3P6g
[INFO ] 2021-06-23 04:41:40,792 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Discovered group coordinator kafka:29092 (id: 2147482646 rack: null)
[INFO ] 2021-06-23 04:41:40,800 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] (Re-)joining group
[INFO ] 2021-06-23 04:41:40,823 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
[INFO ] 2021-06-23 04:41:40,824 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] (Re-)joining group
[INFO ] 2021-06-23 04:41:42,657 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Successfully joined group with generation 21
[INFO ] 2021-06-23 04:41:42,672 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Notifying assignor about the new Assignment(partitions=[bullet.requests-0])
[INFO ] 2021-06-23 04:41:42,685 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Adding newly assigned partitions: bullet.requests-0
[INFO ] 2021-06-23 04:41:42,723 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Setting offset for partition bullet.requests-0 to the committed offset FetchPosition{offset=228, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[kafka:29092 (id: 1001 rack: null)], epoch=0}}
However, there are no logs of the app actually processing any of the messages piling up in bullet.requests, and eventually the Bullet web service fails with the following errors in its logs:
bullet-webservice_1 | 2021-06-23 05:12:05.022 ERROR 1 --- [pool-1-thread-1] c.y.bullet.rest.service.StatusService : Backend is not up! Failing all queries and refusing to accept new queries
bullet-webservice_1 | 2021-06-23 05:12:34.995 ERROR 1 --- [pool-1-thread-1] c.y.bullet.rest.service.StatusService : Backend is not up! Failing all queries and refusing to accept new queries
bullet-webservice_1 | 2021-06-23 05:13:04.988 ERROR 1 --- [pool-1-thread-1] c.y.bullet.rest.service.StatusService : Backend is not up! Failing all queries and refusing to accept new queries
bullet-webservice_1 | 2021-06-23 05:13:34.963 ERROR 1 --- [pool-1-thread-1] c.y.bullet.rest.service.StatusService : Backend is not up! Failing all queries and refusing to accept new queries
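As an additional sanity check, comparing the consumer group’s committed offsets against the end offsets of bullet.requests should show whether the backend is actually fetching anything. A rough sketch (this assumes the standard Kafka CLI tools are available inside the broker container and reuses the group id and the kafka:29092 listener from the logs above; the exact service name and script name depend on the image):

docker-compose exec kafka \
  kafka-consumer-groups.sh \
    --bootstrap-server kafka:29092 \
    --describe \
    --group bullet-query-consumer

If the LAG for bullet.requests-0 keeps growing while CURRENT-OFFSET stays put, the consumer has joined the group but isn’t polling/processing.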
This is an example message from bullet.requests as shown in Kafdrop (with the “default” key and message format):
(binary Java-serialized payload; the readable fragments are class names such as com.yahoo.bullet.pubsub.PubSubMessage, com.yahoo.bullet.query.Projection, com.yahoo.bullet.query.tablefunctions.TableFunction, com.yahoo.bullet.query.Window, com.yahoo.bullet.query.aggregations.Raw, and com.yahoo.bullet.query.aggregations.Aggregation)
Any idea what could cause the backend not to process the messages even though it joined the Kafka consumer group? Could it be a resource issue?
I’ve attached some screenshots of the Spark App UI for reference. This is the command that we used to start the backend:
spark-submit \
--master spark://spark:7077 \
--deploy-mode cluster \
--class com.yahoo.bullet.spark.BulletSparkStreamingMain \
--jars /opt/bitnami/spark/jars/bullet-kafka-1.2.2-fat.jar,/opt/bitnami/spark/jars/bullet-spark-example.jar \
/opt/bitnami/spark/jars/bullet-spark-1.0.4-standalone.jar \
--bullet-spark-conf /opt/bitnami/spark/conf/bullet_spark_kafka_settings.yaml
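(In case it helps: one quick way to confirm that the driver and application actually registered with the standalone master is the master’s JSON status endpoint. This sketch assumes the master web UI is published on localhost:8080 in our docker-compose setup:)

curl -s http://localhost:8080/json/ | python -m json.tool

The activeapps/activedrivers sections there should list the BulletSparkStreamingMain submission if it went through.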
Best regards,
FRANK BLECHSCHMIDT
Graduate Student
The Grainger College of Engineering
Computer Science
fra...@illinois.edu
Yes, we’ve followed the tutorial, but we updated the versions and ran it in Docker containers locally.
I’m currently trying to run it natively (non-Docker) on my machine and want to compare the difference.
What confuses me right now is that the Spark backend seems to connect to Kafka successfully but does not process any messages.
These are the versions we’re running:
BULLET_EXAMPLES_VERSION=1.0.0
BULLET_KAFKA_VERSION=1.2.2
BULLET_SPARK_VERSION=1.0.4
BULLET_UI_VERSION=1.1.0
BULLET_WS_VERSION=1.1.0
Java 1.8, Node.js 16, Spark 3.1, ZooKeeper 3.7, Kafka 2.8 (running on port 29092)
I’ve attached the configurations for reference.
Best,
Frank
Thank you in advance for your help!
The non-Docker version starts up successfully with the older artifact versions of Bullet.
I also figured out the problem with the Docker-based setup: the standalone Spark cluster in Docker (via docker-compose) had only 2 workers, so the tasks would starve because they could never be assigned executors.
I’m now running a setup with 10 workers on the latest Bullet release versions and can see more activity in the Spark cluster.
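(For reference, one way to bump the worker count is docker-compose’s --scale flag; this sketch assumes the worker service in our docker-compose.yml is named spark-worker and does not pin a container_name:)

docker-compose up -d --scale spark-worker=10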
Still, I’m not able to get a response in the UI/web service.
It looks like an issue with how the RDDs are processed (I’ve also attached screenshots from the Spark UI):
org.apache.spark.SparkException: Job aborted due to stage failure: Task creation failed: org.apache.spark.SparkException: Attempted to use BlockRDD[1460] at receiverStream at RandomProducer.scala:19 after its blocks have been removed!
org.apache.spark.SparkException: Attempted to use BlockRDD[1460] at receiverStream at RandomProducer.scala:19 after its blocks have been removed!
at org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83)
at org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:55)
at org.apache.spark.rdd.RDD.$anonfun$preferredLocations$2(RDD.scala:324)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:324)
at org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:50)
at org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:110)
at org.apache.spark.rdd.RDD.$anonfun$preferredLocations$2(RDD.scala:324)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:324)
at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2333)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$3(DAGScheduler.scala:2344)
at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2(DAGScheduler.scala:2343)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2$adapted(DAGScheduler.scala:2341)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2341)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$3(DAGScheduler.scala:2344)
at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2(DAGScheduler.scala:2343)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2$adapted(DAGScheduler.scala:2341)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2341)
at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:2307)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitMissingTasks$2(DAGScheduler.scala:1329)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitMissingTasks$2$adapted(DAGScheduler.scala:1329)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1329)
at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1226)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitStage$5(DAGScheduler.scala:1229)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitStage$5$adapted(DAGScheduler.scala:1228)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1228)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1168)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2395)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1341)
at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1226)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitStage$5(DAGScheduler.scala:1229)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitStage$5$adapted(DAGScheduler.scala:1228)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1228)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1168)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2395)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
at org.apache.spark.rdd.RDD.$anonfun$foreach$1(RDD.scala:1012)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:1010)
at com.yahoo.bullet.spark.ResultEmitter$.$anonfun$emit$1(ResultEmitter.scala:20)
at com.yahoo.bullet.spark.ResultEmitter$.$anonfun$emit$1$adapted(ResultEmitter.scala:20)
at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2(DStream.scala:629)
at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2$adapted(DStream.scala:629)
at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:256)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Attempted to use BlockRDD[1460] at receiverStream at RandomProducer.scala:19 after its blocks have been removed!
at org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83)
at org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:55)
at org.apache.spark.rdd.RDD.$anonfun$preferredLocations$2(RDD.scala:324)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:324)
at org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:50)
at org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:110)
at org.apache.spark.rdd.RDD.$anonfun$preferredLocations$2(RDD.scala:324)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:324)
at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2333)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$3(DAGScheduler.scala:2344)
at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2(DAGScheduler.scala:2343)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2$adapted(DAGScheduler.scala:2341)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2341)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$3(DAGScheduler.scala:2344)
at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2(DAGScheduler.scala:2343)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2$adapted(DAGScheduler.scala:2341)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2341)
at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:2307)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitMissingTasks$2(DAGScheduler.scala:1329)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitMissingTasks$2$adapted(DAGScheduler.scala:1329)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1329)
at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1226)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitStage$5(DAGScheduler.scala:1229)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitStage$5$adapted(DAGScheduler.scala:1228)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1228)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1168)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2395)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
There are also some other errors:
org.apache.spark.SparkException: Checkpoint RDD has a different number of partitions from original RDD. Original RDD [ID: 1705, num of partitions: 2]; Checkpoint RDD [ID: 1720, num of partitions: 0].
at org.apache.spark.rdd.ReliableCheckpointRDD$.writeRDDToCheckpointDirectory(ReliableCheckpointRDD.scala:183)
at org.apache.spark.rdd.ReliableRDDCheckpointData.doCheckpoint(ReliableRDDCheckpointData.scala:59)
at org.apache.spark.rdd.RDDCheckpointData.checkpoint(RDDCheckpointData.scala:75)
at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$1(RDD.scala:1885)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1875)
at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$3(RDD.scala:1887)
at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$3$adapted(RDD.scala:1887)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$1(RDD.scala:1887)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1875)
at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$3(RDD.scala:1887)
at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$3$adapted(RDD.scala:1887)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$1(RDD.scala:1887)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1875)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2198)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
at org.apache.spark.rdd.RDD.$anonfun$foreach$1(RDD.scala:1012)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:1010)
at com.yahoo.bullet.spark.ResultEmitter$.$anonfun$emit$1(ResultEmitter.scala:20)
at com.yahoo.bullet.spark.ResultEmitter$.$anonfun$emit$1$adapted(ResultEmitter.scala:20)
at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2(DStream.scala:629)
at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2$adapted(DStream.scala:629)
at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:256)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Got it working 🚀 (see attached screenshot)
After figuring out that I needed more workers, I was at least able to start the Spark backend.
I started playing with the number of Spark workers. The app also starts with “only” 4 workers.
However, I was still observing the error logs and crashes from my previous email.
As it turns out, the errors were triggered whenever I ran the example query in the Bullet UI, so the query processing was causing the Spark executors to crash.
The Spark UI hinted that it might be resource-related. Originally, the defaults for the workers were:
SPARK_WORKER_MEMORY=1G
SPARK_WORKER_CORES=1
After increasing them to
SPARK_WORKER_MEMORY=12G
SPARK_WORKER_CORES=2
the query processing in Bullet started working.
Conclusion: in our setup, Bullet needed more than 2 Spark workers, and each worker needed more than 1 CPU and 1 GB of memory.
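For anyone else hitting this, the fix boils down to starting each worker with more resources. A minimal sketch of launching one such worker, assuming the Bitnami Spark image (SPARK_MODE and SPARK_MASTER_URL are assumptions based on that image; SPARK_WORKER_MEMORY, SPARK_WORKER_CORES, and the master URL are the values from our setup; <compose-network> is a placeholder for the docker-compose network name):

docker run -d \
  --network <compose-network> \
  -e SPARK_MODE=worker \
  -e SPARK_MASTER_URL=spark://spark:7077 \
  -e SPARK_WORKER_MEMORY=12G \
  -e SPARK_WORKER_CORES=2 \
  bitnami/spark:3.1.2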
Do you by any chance know good settings for Bullet? How many Spark workers, and what resources should each worker have?
Cheers,
Frank
PS:
For now, we can confirm that the latest versions of all Bullet components start up with the example app from the tutorial.
We’ve also updated the tooling to the latest versions: Spark 3.1.2 with Hadoop 3.2, Kafka 2.8.0, nvm 0.38.0, and Node.js 16.4.0.
The application runs fine natively on a MacBook (install-all-spark.sh) as well as containerized in a Docker setup with right-sized Spark workers (standalone cluster).
We ultimately aim to bring everything up and running in Kubernetes.
Once we’ve confirmed a good setup and are able to use it reliably for our paper, we also hope to contribute back to the project, e.g. Dockerfiles, docker-compose.yml, Kubernetes Helm charts, etc.
Best,
Frank
Wow, that’s quite a scale for Storm! We wanted to start with Spark Streaming since it seemed to be the easier framework to set up.
Eventually, we’d like to benchmark Storm as well and compare the performance of both. Support for Spark Structured Streaming and Flink would be amazing!
We essentially plan to run a subset of queries from existing streaming benchmarks (all queries that are currently supported in BQL) to compare sketch with non-sketch performance. The expectation is that the sketch-optimized queries run orders of magnitude faster. It would be interesting to see the actual factor for established benchmarks from academia.
We do this incrementally: first only on Spark with one data source from a benchmark (data set or data generator) and very few queries. Eventually, we want to include more benchmarks, data sets, and Storm as well.
However, we’re very time-constrained to finish up everything by August 8 to submit our report (hence the incremental approach).
We containerized Bullet with the goal of running it on Kubernetes to enable easy reproducibility.
Best,
Frank