[Spark Streaming] Parallel reading from Kafka

bart.ve...@portico.io

Sep 20, 2013, 3:22:59 AM
to spark...@googlegroups.com
Hi,

When reading from Kafka, I noticed that all reading is done on a single Spark instance; it does not seem to be distributed among multiple Spark workers.
(When testing on a 10-node Spark cluster, we indeed saw that one node, the one reading from Kafka, had a CPU usage of almost 100%, while the others were much lower.)

When taking a look at the Kafka streaming code:
/**
 * Input stream that pulls messages from a Kafka Broker.
 * 
 * @param kafkaParams Map of kafka configuration parameters. See: http://kafka.apache.org/configuration.html
 * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
 * in its own thread.
 * @param storageLevel RDD storage level.
 */
private[streaming]
class KafkaInputDStream[T: ClassManifest, D <: Decoder[_]: Manifest]

It seems that each partition is indeed read in its own thread, but all of them within the same task.
So basically my question: is it possible to distribute the reading from Kafka among multiple workers?
If not, this could become a huge bottleneck, as you're bound by the network bandwidth and processing power of that single instance.

We could implement our own KafkaInputDStream classes so that we end up with e.g. one DStream for each partition.
That would automatically make it distributed, no?

Or am I missing something here?

Greets,
Bart

Jerry Shao

Sep 22, 2013, 2:36:58 AM
to spark...@googlegroups.com
Hi Bart, 

I think you can create multiple KafkaInputDStreams to distribute the Kafka receivers among the workers. If you have 10 nodes, you can create them like this:

val kafkaInputs = (1 to 10).map { _ =>
  ssc.kafkaStream(zkQuorum, groupId, Map("topic" -> 1))
}

This creates 10 Kafka input streams. Then union all of these DStreams like:

val union = ssc.union(kafkaInputs)

Then you can apply DStream transformations on this unioned DStream.
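
Putting the two snippets together, a minimal end-to-end sketch would look like the following (this is against the 0.8-era API, so package names may differ for other versions; the master URL, ZooKeeper quorum, group id, batch interval and the count().print() transformation are just placeholders):

import org.apache.spark.streaming.{Seconds, StreamingContext}

object ParallelKafkaRead {
  def main(args: Array[String]) {
    // placeholders: adjust master URL, ZooKeeper quorum and consumer group to your setup
    val ssc = new StreamingContext("spark://master:7077", "ParallelKafkaRead", Seconds(2))
    val zkQuorum = "zk1:2181,zk2:2181"
    val groupId  = "my-consumer-group"

    // one receiver per stream; the scheduler places each receiver task on a worker
    val kafkaInputs = (1 to 10).map { _ =>
      ssc.kafkaStream(zkQuorum, groupId, Map("topic" -> 1))
    }

    // union the ten streams and apply transformations to the combined DStream
    val union = ssc.union(kafkaInputs)
    union.count().print()

    ssc.start()
  }
}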

Internally, each Kafka receiver is wrapped as a task and distributed to the cluster by the Spark scheduler, so by default only one task is launched on one node, as you saw. Creating multiple receivers will launch multiple tasks simultaneously, which should solve your bottleneck problem.

Thanks
Jerry


On Friday, September 20, 2013 at 3:22:59 PM UTC+8, bart.ve...@portico.io wrote:

bart.ve...@portico.io

Oct 12, 2013, 5:34:10 AM
to spark...@googlegroups.com
Thanks Jerry,

This seems to work ... now the only thing that remains is to connect this logic to ZooKeeper.
As the partitioning and the Kafka brokers are known in ZooKeeper, the KafkaInputDStream should be able to take care of this.
I might look at a solution that wraps this around the KafkaInputDStream so that the parallelism is handled automatically ...
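
Something along these lines is what I have in mind: read the partition count for the topic from ZooKeeper and create one stream per partition. This is only a sketch; the /brokers/topics/<topic>/partitions path follows the Kafka 0.8 layout and may change between Kafka versions, partitionCount and the session timeout are just illustrative, and zkQuorum, groupId and ssc are as in your example above:

import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}

// Look up how many partitions a topic has by counting its children in ZooKeeper.
// Assumes the Kafka 0.8 path layout: /brokers/topics/<topic>/partitions
def partitionCount(zkQuorum: String, topic: String): Int = {
  val zk = new ZooKeeper(zkQuorum, 10000, new Watcher {
    def process(event: WatchedEvent) {}  // no-op watcher, we only read once
  })
  try {
    zk.getChildren("/brokers/topics/" + topic + "/partitions", false).size
  } finally {
    zk.close()
  }
}

// one KafkaInputDStream (and thus one receiver task) per partition
val numPartitions = partitionCount(zkQuorum, "topic")
val kafkaInputs = (1 to numPartitions).map { _ =>
  ssc.kafkaStream(zkQuorum, groupId, Map("topic" -> 1))
}
val union = ssc.union(kafkaInputs)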

Thanks for your suggestion.
Greets,
Bart

Jerry Shao

Oct 14, 2013, 4:44:46 AM
to spark...@googlegroups.com
Hi Bart,

Yes, NetworkReceiver has a getLocationPreference() function; you can override it in KafkaReceiver to add location hints for the KafkaInputDStream, and those location hints can be obtained from ZooKeeper as you said. One concern is that the Kafka designers may not want users to read ZooKeeper data directly, since if the data format or path changes between Kafka versions, the program will fail.
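
Roughly, the override would look like the sketch below. This assumes the 0.8-era NetworkReceiver API where getLocationPreference() returns Option[String]; LocationHintedReceiver and preferredHost are made-up names, and since KafkaReceiver itself is private to the streaming package, an actual implementation would have to live there (or patch Spark):

import org.apache.spark.streaming.dstream.NetworkReceiver

// Illustrative only: a receiver that reports a preferred host so the scheduler
// tries to place its task on that worker. The host could be derived from the
// topic's broker/partition data in ZooKeeper.
class LocationHintedReceiver(preferredHost: String)
    extends NetworkReceiver[String] {

  // the scheduler consults this hint when launching the receiver task
  override def getLocationPreference(): Option[String] = Some(preferredHost)

  protected def onStart() {
    // start the consumer threads here, as KafkaReceiver.onStart() does
  }

  protected def onStop() {
    // shut the consumer down
  }
}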

Thanks
Jerry

On Saturday, October 12, 2013 at 5:34:10 PM UTC+8, bart.ve...@portico.io wrote:

bart.ve...@portico.io

Nov 15, 2013, 6:30:01 AM
to spark...@googlegroups.com
Hi Jerry,

We noticed some problems in the KafkaInputDStream.
It seems to lose data when reading from multiple partitions of a topic.
In the Kafka implementation in Spark, every partition has its own thread, and the data fetched by these threads is just added together, without locking or thread synchronisation.
I also noticed "lockups" on the Kafka input after some time.
When the job has been running for a certain period, suddenly nothing is read from Kafka anymore ...
It seems like there is a threading issue inside the Kafka input.

Have you noticed anything like this?

Mark Hamstra

Nov 15, 2013, 12:32:23 PM
to spark...@googlegroups.com
Is that before or after this was merged?

bart.ve...@portico.io

Nov 16, 2013, 3:57:28 AM
to spark...@googlegroups.com
Hi Mark,

Thanks for pointing this out ...
That indeed looks like the fix for the issue I'm seeing.
I'll have another go at it with this fix.

Greets,
Bart