java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
    at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91)
    at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68)
    at com.facebook.presto.kafka.KafkaSplitManager.getSplits(KafkaSplitManager.java:83)
    at com.facebook.presto.split.SplitManager.getSplits(SplitManager.java:45)
    at com.facebook.presto.sql.planner.DistributedExecutionPlanner$Visitor.visitTableScan(DistributedExecutionPlanner.java:112)
On closer inspection of the code, I see this in KafkaSplitManager.java:
public ConnectorSplitSource getSplits(...) {
    SimpleConsumer simpleConsumer = consumerManager.getConsumer(selectRandom(nodes));
    TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(ImmutableList.of(kafkaTableHandle.getTopicName()));...
This picks the cached SimpleConsumer for a randomly selected node without
checking whether it still has an active connection to the broker (apparently
it does not), so the call fails. Retrying the query typically succeeds because
a different consumer gets selected.
My question:
Shouldn't getConsumer(...) verify the connection and invalidate the cache entry
if no active connection is found?
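
To make the question concrete, here is a minimal sketch of the behavior I have in mind. It is not the Presto code: BrokerConnection, isConnected() and ValidatingConnectionCache are hypothetical names used only for illustration, and the sketch assumes some liveness check is available, which the real SimpleConsumer may not expose directly.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for a cached broker connection (e.g. a wrapper
// around SimpleConsumer). Not an actual Kafka or Presto interface.
interface BrokerConnection {
    boolean isConnected();   // assumed liveness check
    void close();
}

// Hypothetical cache that re-validates an entry before handing it out.
class ValidatingConnectionCache<K, C extends BrokerConnection> {
    private final Map<K, C> cache = new ConcurrentHashMap<>();
    private final Function<K, C> factory;

    ValidatingConnectionCache(Function<K, C> factory) {
        this.factory = factory;
    }

    C get(K key) {
        // compute() runs atomically per key, so a stale entry is replaced
        // only once even if several callers ask for it concurrently.
        return cache.compute(key, (k, existing) -> {
            if (existing != null && existing.isConnected()) {
                return existing;           // healthy entry: reuse it
            }
            if (existing != null) {
                existing.close();          // stale entry: release and drop it
            }
            return factory.apply(k);       // open a fresh connection
        });
    }
}
```

With something like this, a dead connection would be replaced on the next lookup instead of surfacing as a ClosedChannelException in getSplits(...). A check-before-use can still race with a disconnect, so catching the exception and retrying once may be needed as well.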
Thanks
-Ankur