KafkaConnector - ClosedChannelException

79 views
Skip to first unread message

Ankur Goel

unread,
Sep 14, 2016, 5:45:43 PM9/14/16
to Presto
Hi Guys,
       Querying Kafka from Presto fails intermittently with ClosedChannelException.
Below is the relevant stack trace.

java.nio.channels.ClosedChannelException

at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)

at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)

at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)

at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91)

at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68)

at com.facebook.presto.kafka.KafkaSplitManager.getSplits(KafkaSplitManager.java:83)

at com.facebook.presto.split.SplitManager.getSplits(SplitManager.java:45)

at com.facebook.presto.sql.planner.DistributedExecutionPlanner$Visitor.visitTableScan(DistributedExecutionPlanner.java:112)


On closer code inspection I see this in KafkaSplitManager.java


public ConnectorSplitSource getSplits(...) {

   SimpleConsumer simpleConsumer = consumerManager.getConsumer(selectRandom(nodes));
   TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(ImmutableList.of(kafkaTableHandle.getTopicName()));
 ...

This selects a random SimpleConsumer from cache without verifying if it has
an active connection with the broker (which apparently it doesn't) and hence
the call fails. However, a query retry is typically successful as a different consumer
gets selected.

My question:

Shouldn't the getConsumer(...) verify the connection and invalidate the cache entry
if an active connection isn't found ?

Thanks
-Ankur
Reply all
Reply to author
Forward
0 new messages