Spark Bullet Backend not reading messages from Kafka (PubSub)


Blechschmidt, Frank

Jun 23, 2021, 1:34:49 AM
to bullet...@googlegroups.com, Makam, Hareesh, Pendyala, Murali Krishna

Hi,

 

We’re a team of 3 graduate students working on a paper using Bullet for our evaluation. We’re trying to set up the example application locally, but are running into an issue:

All components are up and running locally (UI, web service, Kafka as the PubSub, Spark as the backend), but the Spark backend does not read or process messages from the bullet.requests Kafka topic.

We’ve also deployed Kafdrop to monitor the messages in the two topics, bullet.requests and bullet.responses: bullet.requests has messages, while bullet.responses stays empty.
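
For completeness, this is roughly how we double-check that the backend consumer group is falling behind (assuming the standard Kafka CLI tools are reachable from the broker container; the broker address and group name are taken from the Spark logs below):

# Describe the backend's consumer group to see committed offsets and lag on bullet.requests
kafka-consumer-groups.sh --bootstrap-server kafka:29092 \
    --describe --group bullet-query-consumer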

 

However, the Spark logs indicate that the BulletSparkStreamingMain app launched and connected successfully:

 

[INFO ] 2021-06-23 04:41:39,533 org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.6.0

[INFO ] 2021-06-23 04:41:39,533 org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 62abe01bee039651

[INFO ] 2021-06-23 04:41:39,533 org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1624423299529

[INFO ] 2021-06-23 04:41:39,540 org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Subscribed to topic(s): bullet.requests

[INFO ] 2021-06-23 04:41:39,543 com.yahoo.bullet.spark.QueryReceiver - Setup PubSub: com.yahoo.bullet.kafka.KafkaPubSub@36b35cc3 with Subscriber: com.yahoo.bullet.kafka.KafkaSubscriber@73967938

[INFO ] 2021-06-23 04:41:39,544 com.yahoo.bullet.spark.QueryReceiver - Query receiver started.

[INFO ] 2021-06-23 04:41:39,545 org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Called receiver 0 onStart

[INFO ] 2021-06-23 04:41:39,545 org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Waiting for receiver to be stopped

[INFO ] 2021-06-23 04:41:40,788 org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Cluster ID: QgfInwnoRnCFliTIEw3P6g

[INFO ] 2021-06-23 04:41:40,792 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Discovered group coordinator kafka:29092 (id: 2147482646 rack: null)

[INFO ] 2021-06-23 04:41:40,800 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] (Re-)joining group

[INFO ] 2021-06-23 04:41:40,823 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.

[INFO ] 2021-06-23 04:41:40,824 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] (Re-)joining group

[INFO ] 2021-06-23 04:41:42,657 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Successfully joined group with generation 21

[INFO ] 2021-06-23 04:41:42,672 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Notifying assignor about the new Assignment(partitions=[bullet.requests-0])

[INFO ] 2021-06-23 04:41:42,685 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Adding newly assigned partitions: bullet.requests-0

[INFO ] 2021-06-23 04:41:42,723 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-bullet-query-consumer-1, groupId=bullet-query-consumer] Setting offset for partition bullet.requests-0 to the committed offset FetchPosition{offset=228, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[kafka:29092 (id: 1001 rack: null)], epoch=0}}

 

However, there are no logs of the app processing any of the messages piling up in bullet.requests, and eventually the Bullet web service fails with the following error in its logs:

 

bullet-webservice_1  | 2021-06-23 05:12:05.022 ERROR 1 --- [pool-1-thread-1] c.y.bullet.rest.service.StatusService    : Backend is not up! Failing all queries and refusing to accept new queries

bullet-webservice_1  | 2021-06-23 05:12:34.995 ERROR 1 --- [pool-1-thread-1] c.y.bullet.rest.service.StatusService    : Backend is not up! Failing all queries and refusing to accept new queries

bullet-webservice_1  | 2021-06-23 05:13:04.988 ERROR 1 --- [pool-1-thread-1] c.y.bullet.rest.service.StatusService    : Backend is not up! Failing all queries and refusing to accept new queries

bullet-webservice_1  | 2021-06-23 05:13:34.963 ERROR 1 --- [pool-1-thread-1] c.y.bullet.rest.service.StatusService    : Backend is not up! Failing all queries and refusing to accept new queries

 

This is an example message from Kafdrop in bullet.requests (“default” key and message format):

 

(Java-serialized binary; Kafdrop renders it mostly as unprintable bytes. The readable fragments show:)

com.yahoo.bullet.pubsub.PubSubMessage wrapping a serialized com.yahoo.bullet.query.Query
    aggregation: com.yahoo.bullet.query.aggregations.Raw (AggregationType RAW)
    projection: com.yahoo.bullet.query.Projection (Type PASS_THROUGH)
    window: com.yahoo.bullet.query.Window
    message id: 6b3a0f8c-871a-437b-8c8d-e0dcf85b9a63
    metadata: com.yahoo.bullet.kafka.KafkaMetadata with content "Webservice status tick" and TopicPartition bullet.responses

 

 

Any ideas what could cause the backend to not process the messages even though it connected to the Kafka consumer group? Could it be a resource issue?

I’ve attached some screenshots of the Spark App UI for reference. This is the command that we used to start the backend:

 

spark-submit \
    --master spark://spark:7077 \
    --deploy-mode cluster \
    --class com.yahoo.bullet.spark.BulletSparkStreamingMain \
    --jars /opt/bitnami/spark/jars/bullet-kafka-1.2.2-fat.jar,/opt/bitnami/spark/jars/bullet-spark-example.jar \
    /opt/bitnami/spark/jars/bullet-spark-1.0.4-standalone.jar \
    --bullet-spark-conf /opt/bitnami/spark/conf/bullet_spark_kafka_settings.yaml
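
We also peeked at the standalone master to confirm that the driver and the receiver actually got executors (the master's web UI should expose the same information as JSON; the hostname "spark" is from our compose setup):

# Query the Spark standalone master for workers, running applications, and allocated cores/memory
curl http://spark:8080/json/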

 

Best regards,

FRANK BLECHSCHMIDT
Graduate Student
 
The Grainger College of Engineering
Computer Science


fra...@illinois.edu
 

Under the Illinois Freedom of Information Act any written communication to or from university employees regarding university business is a public record and may be subject to public disclosure.

 

 

 

Screen Shot 2021-06-22 at 10.21.43 PM.png
Screen Shot 2021-06-22 at 10.21.34 PM.png
Screen Shot 2021-06-22 at 10.20.41 PM.png
Screen Shot 2021-06-22 at 10.20.07 PM.png
Screen Shot 2021-06-22 at 10.20.02 PM.png

Akshai Sarma

Jun 23, 2021, 4:42:27 PM
to bullet-users
So it looks like everything is being run locally. Did you already try the Spark quick start here: https://bullet-db.github.io/quick-start/spark/ (it uses an older version of Bullet 1.x; we need to update our documentation)? Did that also result in the same errors?

If that doesn't work, can you post all the configs you used? The Kafka settings, the Bullet Spark config, and the Bullet web service config (including the PubSub config you passed to the web service).

Blechschmidt, Frank

Jun 23, 2021, 10:36:59 PM
to Akshai Sarma, bullet-users, Makam, Hareesh, Pendyala, Murali Krishna

Yes, we followed the tutorial, but updated the versions and ran it in Docker containers locally.

I’m currently trying to run it natively (non-Docker) on my machine to compare against the Docker setup.

What confuses me right now is that the Spark backend seems to connect to Kafka successfully, but is not processing any messages.

 

These are the versions we’re running:

 

BULLET_EXAMPLES_VERSION=1.0.0

BULLET_KAFKA_VERSION=1.2.2

BULLET_SPARK_VERSION=1.0.4

BULLET_UI_VERSION=1.1.0

BULLET_WS_VERSION=1.1.0

 

Java 1.8, Node.js 16, Spark 3.1, Zookeeper 3.7, Kafka 2.8 (runs on port 29092)

 

 

I’ve attached the configurations for reference.

 

 

Best,

FRANK BLECHSCHMIDT
Graduate Student
 
The Grainger College of Engineering
Computer Science


fra...@illinois.edu
 





 

 

 


bullet_spark_kafka_settings.yaml
env-settings.json
kafka_pubsub_config.yaml
query_config.yaml
schema.json

Akshay Sarma

Jun 24, 2021, 2:03:38 AM
to Blechschmidt, Frank, bullet-users, Makam, Hareesh, Pendyala, Murali Krishna
Yeah, try taking it out of Docker and using the exact versions from the current quick start. I'll try to get your versions running tomorrow and see if I run into the same issues.

Blechschmidt, Frank

Jun 24, 2021, 2:43:42 AM
to Akshay Sarma, bullet-users, Makam, Hareesh, Pendyala, Murali Krishna

[Thank you in advance for your help!]

 

The non-Docker version starts up successfully with the older artifact versions of Bullet.

 

I also figured out the problem with the Docker-powered setup: the standalone Spark cluster in Docker (via docker-compose) had only 2 workers.

So the tasks would starve because they could not get executors.

 

I’m now running a setup with 10 workers on the latest release versions of Bullet, and I can observe more activity in the Spark cluster.

Still, I’m not able to get a response in the UI/web service.

 

It looks like an issue with the processing of RDDs (I’ve also attached screenshots from the Spark UI):

 

org.apache.spark.SparkException: Job aborted due to stage failure: Task creation failed: org.apache.spark.SparkException: Attempted to use BlockRDD[1460] at receiverStream at RandomProducer.scala:19 after its blocks have been removed!

org.apache.spark.SparkException: Attempted to use BlockRDD[1460] at receiverStream at RandomProducer.scala:19 after its blocks have been removed!

         at org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83)

         at org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:55)

         at org.apache.spark.rdd.RDD.$anonfun$preferredLocations$2(RDD.scala:324)

         at scala.Option.getOrElse(Option.scala:189)

         at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:324)

         at org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:50)

         at org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:110)

         at org.apache.spark.rdd.RDD.$anonfun$preferredLocations$2(RDD.scala:324)

         at scala.Option.getOrElse(Option.scala:189)

         at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:324)

         at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2333)

         at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$3(DAGScheduler.scala:2344)

         at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)

         at scala.collection.immutable.List.foreach(List.scala:392)

         at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2(DAGScheduler.scala:2343)

         at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2$adapted(DAGScheduler.scala:2341)

         at scala.collection.immutable.List.foreach(List.scala:392)

         at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2341)

         at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$3(DAGScheduler.scala:2344)

         at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)

         at scala.collection.immutable.List.foreach(List.scala:392)

         at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2(DAGScheduler.scala:2343)

         at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2$adapted(DAGScheduler.scala:2341)

         at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)

         at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)

         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)

         at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2341)

         at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:2307)

         at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitMissingTasks$2(DAGScheduler.scala:1329)

         at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitMissingTasks$2$adapted(DAGScheduler.scala:1329)

         at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)

         at scala.collection.Iterator.foreach(Iterator.scala:941)

         at scala.collection.Iterator.foreach$(Iterator.scala:941)

         at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)

         at scala.collection.IterableLike.foreach(IterableLike.scala:74)

         at scala.collection.IterableLike.foreach$(IterableLike.scala:73)

         at scala.collection.AbstractIterable.foreach(Iterable.scala:56)

         at scala.collection.TraversableLike.map(TraversableLike.scala:238)

         at scala.collection.TraversableLike.map$(TraversableLike.scala:231)

         at scala.collection.AbstractTraversable.map(Traversable.scala:108)

         at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1329)

         at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1226)

         at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitStage$5(DAGScheduler.scala:1229)

         at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitStage$5$adapted(DAGScheduler.scala:1228)

         at scala.collection.immutable.List.foreach(List.scala:392)

         at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1228)

         at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1168)

         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2395)

         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)

         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)

         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)

 

         at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)

         at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)

         at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)

         at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)

         at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)

         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)

         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)

         at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1341)

         at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1226)

         at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitStage$5(DAGScheduler.scala:1229)

         at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitStage$5$adapted(DAGScheduler.scala:1228)

         at scala.collection.immutable.List.foreach(List.scala:392)

         at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1228)

         at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1168)

         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2395)

         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)

         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)

         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)

         at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)

         at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)

         at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)

         at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)

         at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)

         at org.apache.spark.rdd.RDD.$anonfun$foreach$1(RDD.scala:1012)

         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)

         at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)

         at org.apache.spark.rdd.RDD.foreach(RDD.scala:1010)

         at com.yahoo.bullet.spark.ResultEmitter$.$anonfun$emit$1(ResultEmitter.scala:20)

         at com.yahoo.bullet.spark.ResultEmitter$.$anonfun$emit$1$adapted(ResultEmitter.scala:20)

         at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2(DStream.scala:629)

         at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2$adapted(DStream.scala:629)

         at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)

         at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)

         at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)

         at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)

         at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)

         at scala.util.Try$.apply(Try.scala:213)

         at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)

         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:256)

         at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)

         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)

         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)

         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

         at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.spark.SparkException: Attempted to use BlockRDD[1460] at receiverStream at RandomProducer.scala:19 after its blocks have been removed!

         at org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83)

         at org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:55)

         at org.apache.spark.rdd.RDD.$anonfun$preferredLocations$2(RDD.scala:324)

         at scala.Option.getOrElse(Option.scala:189)

         at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:324)

         at org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:50)

         at org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:110)

         at org.apache.spark.rdd.RDD.$anonfun$preferredLocations$2(RDD.scala:324)

         at scala.Option.getOrElse(Option.scala:189)

         at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:324)

         at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2333)

         at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$3(DAGScheduler.scala:2344)

         at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)

         at scala.collection.immutable.List.foreach(List.scala:392)

         at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2(DAGScheduler.scala:2343)

         at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2$adapted(DAGScheduler.scala:2341)

         at scala.collection.immutable.List.foreach(List.scala:392)

         at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2341)

         at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$3(DAGScheduler.scala:2344)

         at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)

         at scala.collection.immutable.List.foreach(List.scala:392)

         at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2(DAGScheduler.scala:2343)

         at org.apache.spark.scheduler.DAGScheduler.$anonfun$getPreferredLocsInternal$2$adapted(DAGScheduler.scala:2341)

         at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)

         at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)

         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)

         at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2341)

         at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:2307)

         at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitMissingTasks$2(DAGScheduler.scala:1329)

         at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitMissingTasks$2$adapted(DAGScheduler.scala:1329)

         at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)

         at scala.collection.Iterator.foreach(Iterator.scala:941)

         at scala.collection.Iterator.foreach$(Iterator.scala:941)

         at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)

         at scala.collection.IterableLike.foreach(IterableLike.scala:74)

         at scala.collection.IterableLike.foreach$(IterableLike.scala:73)

         at scala.collection.AbstractIterable.foreach(Iterable.scala:56)

         at scala.collection.TraversableLike.map(TraversableLike.scala:238)

         at scala.collection.TraversableLike.map$(TraversableLike.scala:231)

         at scala.collection.AbstractTraversable.map(Traversable.scala:108)

         at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1329)

         at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1226)

         at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitStage$5(DAGScheduler.scala:1229)

         at org.apache.spark.scheduler.DAGScheduler.$anonfun$submitStage$5$adapted(DAGScheduler.scala:1228)

         at scala.collection.immutable.List.foreach(List.scala:392)

         at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1228)

         at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1168)

         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2395)

         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)

         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)

         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)

 

 

 

 

 

 

There are also some other errors:

 

org.apache.spark.SparkException: Checkpoint RDD has a different number of partitions from original RDD. Original RDD [ID: 1705, num of partitions: 2]; Checkpoint RDD [ID: 1720, num of partitions: 0].
         at org.apache.spark.rdd.ReliableCheckpointRDD$.writeRDDToCheckpointDirectory(ReliableCheckpointRDD.scala:183)
         at org.apache.spark.rdd.ReliableRDDCheckpointData.doCheckpoint(ReliableRDDCheckpointData.scala:59)
         at org.apache.spark.rdd.RDDCheckpointData.checkpoint(RDDCheckpointData.scala:75)
         at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$1(RDD.scala:1885)
         at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
         at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1875)
         at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$3(RDD.scala:1887)
         at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$3$adapted(RDD.scala:1887)
         at scala.collection.immutable.List.foreach(List.scala:392)
         at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$1(RDD.scala:1887)
         at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
         at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1875)
         at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$3(RDD.scala:1887)
         at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$3$adapted(RDD.scala:1887)
         at scala.collection.immutable.List.foreach(List.scala:392)
         at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$1(RDD.scala:1887)
         at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
         at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1875)
         at org.apache.spark.SparkContext.runJob(SparkContext.scala:2198)
         at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
         at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
         at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
         at org.apache.spark.rdd.RDD.$anonfun$foreach$1(RDD.scala:1012)
         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
         at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
         at org.apache.spark.rdd.RDD.foreach(RDD.scala:1010)
         at com.yahoo.bullet.spark.ResultEmitter$.$anonfun$emit$1(ResultEmitter.scala:20)
         at com.yahoo.bullet.spark.ResultEmitter$.$anonfun$emit$1$adapted(ResultEmitter.scala:20)
         at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2(DStream.scala:629)
         at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2$adapted(DStream.scala:629)
         at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
         at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
         at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
         at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
         at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
         at scala.util.Try$.apply(Try.scala:213)
         at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:256)
         at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
         at java.lang.Thread.run(Thread.java:748)
streaming.PNG
executors.PNG
executors.PNG
stages.PNG

Blechschmidt, Frank

Jun 24, 2021, 3:38:19 AM
to Blechschmidt, Frank, Akshay Sarma, bullet-users, Makam, Hareesh, Pendyala, Murali Krishna

Got it working 🚀 (see attached screenshot)

 

After figuring out that I needed more workers, I was at least able to start the Spark backend.

I started playing with the number of Spark workers. The app also starts with “only” 4 workers.

 

However, I was still observing the error logs and crashes from my previous email.

As it turns out, this got triggered whenever I tried the example query in the Bullet UI. So… the query processing caused the Spark executors to crash.

 

The Spark UI gave some hints that it might be related to resources. Originally the defaults for the workers were set to:

SPARK_WORKER_MEMORY=1G

SPARK_WORKER_CORES=1

After increasing them to

SPARK_WORKER_MEMORY=12G

SPARK_WORKER_CORES=2

the query processing in Bullet started working.
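
For reference, this is roughly how we size and scale the workers in our Docker setup (the service name spark-worker and the environment pass-through are specific to our compose file; the bitnami/spark image reads SPARK_WORKER_MEMORY and SPARK_WORKER_CORES from the environment):

# Worker resources, forwarded into the worker containers by our compose file
export SPARK_WORKER_MEMORY=12G
export SPARK_WORKER_CORES=2
# Bring up several instances of the spark-worker service
docker-compose up -d --scale spark-worker=4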

 

Conclusion: Bullet needs more than 2 Spark workers, and each worker needs more than 1 CPU and 1 GB of memory.

 

Do you by any chance know good settings for Bullet? How many Spark workers? What resources should each Spark worker have?
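
In the meantime, we're also experimenting with requesting executor resources explicitly at submit time (standard spark-submit options for a standalone cluster; the values are just our current guesses):

spark-submit \
    --master spark://spark:7077 \
    --deploy-mode cluster \
    --executor-memory 2G \
    --total-executor-cores 4 \
    --class com.yahoo.bullet.spark.BulletSparkStreamingMain \
    ... (rest of the command as before)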

 

Cheers,

Frank

working.PNG

Blechschmidt, Frank

Jun 24, 2021, 3:57:00 AM
to Akshay Sarma, bullet-users, Makam, Hareesh, Pendyala, Murali Krishna

PS:

For now, we can confirm that the latest versions of all Bullet components start up with the example app from the tutorial.

We’ve also updated the tooling to the latest versions: Spark 3.1.2 with Hadoop 3.2, Kafka 2.8.0, nvm 0.38.0, and Node.js 16.4.0.

The application is running fine natively on a MacBook (install-all-spark.sh) as well as containerized in a Docker setup with right-sized Spark cluster workers (standalone cluster).

We ultimately aim to bring everything up and running in Kubernetes.

 

Once we’ve confirmed a good setup and are able to use it reliably for our paper, we also hope to contribute back to the project, e.g. Dockerfiles, docker-compose.yml, Kubernetes Helm charts, etc.

 

Best,

Frank

Akshai Sarma

Jun 24, 2021, 1:27:49 PM
to bullet-users
That's great! Yeah, the backend has a few components, and I'm not sure how Spark doles out tasks if there are too few cores. I'd have imagined it would work, albeit slower. Guess not.

Do note that we built Bullet aiming for event-by-event processing in the backend. Our Spark backend implementation still uses DStreams (so micro-batches), and we know for a fact that that doesn't scale very well to large amounts of data or queries. The Storm version, in contrast, we've scaled to ~6M simultaneous queries and ~2M records per second. We want to look into Spark Structured Streaming to see if that lets us get off the micro-batching. We're also going to POC a Flink instance of Bullet soon, which already lets us do the low-level event processing. With all that said, would you mind sharing what you're trying to do for your work? Are you comparing Bullet with other things?

Yea we'd totally be interested in a containerized version of Bullet! Before you contribute, please do make an issue laying out how you're structuring it and we can figure out the best place to put it.

Blechschmidt, Frank

Jun 24, 2021, 4:40:18 PM
to Akshai Sarma, bullet-users, Makam, Hareesh, Pendyala, Murali Krishna

Wow, that’s quite a scale for Storm! We wanted to start with Spark Streaming since it seemed to be a framework that’s easier to set up.

Eventually, we'd like to benchmark Storm as well and compare the performance of both. Support for Spark Structured Streaming and Flink would be amazing!

 

We essentially plan to run a subset of queries from existing streaming benchmarks (all queries that are currently supported in BQL) to compare sketch with non-sketch performance. The expectation is that the sketch-optimized queries run orders of magnitude faster. It would be interesting to see the actual factor for established benchmarks from academia.

We do this incrementally: first only on Spark with one data source from a benchmark (data set or data generator) and very few queries. Eventually, we want to include more benchmarks, data sets, and Storm as well.

However, we’re very time-constrained to finish up everything by August 8 to submit our report (hence the incremental approach).

 

We containerized Bullet with the goal of running it on Kubernetes to enable easy reproducibility.

 

Best,

Frank

Akshai Sarma

Jun 24, 2021, 4:44:06 PM
to bullet-users
I see, thanks for letting us know. If we can help with something, don't hesitate to ask!