Druid cant find my kafka - Producer connection to 127.0.0.1:9092 unsuccessful

1,436 views
Skip to first unread message

David Montgomery

unread,
Feb 10, 2014, 8:34:30 PM2/10/14
to druid-de...@googlegroups.com
Hi,

per the below.. Producer connection to 127.0.0.1:9092 unsuccessful

I am assuming that realtime node is tring to find kafka on 127.0.0.1:9092


Where on the realtime node do I tell it where kafka is?  e.g kafka.test.com:9092

'
-server
-Xmx256m
-Duser.timezone=UTC
-Dfile.encoding=UTF-8

druid.host=localhost
druid.service=realtime
druid.port=8083

druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.52"]


druid.zk.service.host=localhost

# The realtime config file.
druid.realtime.specFile=/path/to/specFile

# Choices: db (hand off segments), noop (do not hand off segments).
druid.publish.type=db

druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
druid.db.connector.user=druid
druid.db.connector.password=diurd

druid.processing.buffer.sizeBytes=100000000

Thanks


2014-02-11 01:12:21,026 INFO [topic-pixel_do-druidrealtime-sf-development-20140204220702-1391840719425-e36dea5f-leader-finder-thread] kafka.client.ClientUtils$ - Fetching metadata from broker id:1391209402,host:127.0.0.1,port:9092 with correlation id 1139432 for 1 topic(s) Set(topic-pixel)
2014-02-11 01:12:21,027 ERROR [topic-pixel_do-druidrealtime-sf-development-20140204220702-1391840719425-e36dea5f-leader-finder-thread] kafka.producer.SyncProducer - Producer connection to 127.0.0.1:9092 unsuccessful
java.net.ConnectException: Connection refused
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:465)
    at sun.nio.ch.Net.connect(Net.java:457)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
    at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:65)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
2014-02-11 01:12:21,027 WARN [topic-pixel_do-druidrealtime-sf-development-20140204220702-1391840719425-e36dea5f-leader-finder-thread] kafka.client.ClientUtils$ - Fetching topic metadata with correlation id 1139432 for topics [Set(topic-pixel)] from broker [id:1391209402,host:127.0.0.1,port:9092] failed
java.net.ConnectException: Connection refused
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:465)
    at sun.nio.ch.Net.connect(Net.java:457)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
    at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:65)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
2014-02-11 01:12:21,028 WARN [topic-pixel_do-druidrealtime-sf-development-20140204220702-1391840719425-e36dea5f-leader-finder-thread] kafka.consumer.ConsumerFetcherManager$LeaderFinderThread - [topic-pixel_do-druidrealtime-sf-development-20140204220702-1391840719425-e36dea5f-leader-finder-thread], Failed to find leader for Set([topic-pixel,1], [topic-pixel,0])
kafka.common.KafkaException: fetching topic metadata for topics [Set(topic-pixel)] from broker [ArrayBuffer(id:1391209402,host:127.0.0.1,port:9092)] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:65)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
Caused by: java.net.ConnectException: Connection refused
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:465)
    at sun.nio.ch.Net.connect(Net.java:457)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
    at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
    ... 3 more

Fangjin Yang

unread,
Feb 10, 2014, 11:46:38 PM2/10/14
to druid-de...@googlegroups.com
Hi David, 

See inline.


On Monday, February 10, 2014 5:34:30 PM UTC-8, David Montgomery wrote:
Hi,

per the below.. Producer connection to 127.0.0.1:9092 unsuccessful

I am assuming that realtime node is tring to find kafka on 127.0.0.1:9092

Do you have a local kafka running? 

Where on the realtime node do I tell it where kafka is?  e.g kafka.test.com:9092

You specify this in your kafka configs in your realtime spec file. For kafka 7, these configs look like:
"consumerProps": {
            "autocommit.enable": "false",
            "autooffset.reset": "largest",
            "fetch.size": "1048586",
            "groupid": "druid-realtime-blah",
            "zk.connect": "IPS_GO_HERE/kafka",
            "zk.connectiontimeout.ms": "15000",
            "zk.sessiontimeout.ms": "15000",
            "zk.synctime.ms": "5000"
        }
 

Let me know if you have questions.

-- FJ

David Montgomery

unread,
Feb 11, 2014, 12:05:57 AM2/11/14
to druid-de...@googlegroups.com
In

In this page I get an error when I click the link.  http://druid.io/docs/0.6.52/Realtime.html

Realtime nodes currently require a Kafka cluster to sit in front of them and collect results. There’s more configuration required for these as well.

tutorial:-Loading-Your-Data-Part-2.md#set-up-kafka
The address wasn't understood

Firefox doesn't know how to open this address, because the protocol (tutorial) isn't associated with any program.

    You might need to install other software to open this address.




I have kafka running.  I have no issues ingesting data into kafka.  I dont know what else to do.  I am running kafka 8.  I can telnet into zookeeper from realtime and also to kafka from realtime.  How is realtem node resolving to 127.0.0.1:9092 when kafka is installed on kafka.test.com?

So...why is druid not finding kafka and looking for


[{
  "schema" : { "dataSource":"datasource-pixel",
               "aggregators":[ {"type":"count", "name":"impressions"}],
               "indexGranularity":"minute",
               "shardSpec" : { "type": "none" } },
  "config" : { "maxRowsInMemory" : 500000,
               "intermediatePersistPeriod" : "PT10m" },
  "firehose" : { "type" : "kafka-0.8",
                 "consumerProps" : { "zookeeper.connect" : "1.dzk.development.sf.test.com:2181",
                                     "zookeeper.connection.timeout.ms" : "15000",
                                     "zookeeper.session.timeout.ms" : "15000",
                                     "zookeeper.sync.time.ms" : "5000",
                                     "group.id" : "topic-pixel",
                                     "auto.offset.reset" : "largest",
                                     "auto.commit.enable" : "false" },
                 "feed" : "topic-pixel",
                 "parser" : { "timestampSpec" : { "column" : "utcdt", "format" : "iso" },
                              "data" : { "format" : "json" },
                              "dimensionExclusions" : ["deleteme"] } },
  "plumber" : { "type" : "realtime",
                "windowPeriod" : "PT10m",
                "segmentGranularity":"hour",
                "basePersistDirectory" : "/tmp/realtime/basePersist" }

}
]


























--
You received this message because you are subscribed to the Google Groups "Druid Development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to druid-developm...@googlegroups.com.
To post to this group, send email to druid-de...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/druid-development/4ae9cc4c-0161-497b-9f27-de27ee669e44%40googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.

David Montgomery

unread,
Feb 11, 2014, 12:08:29 AM2/11/14
to druid-de...@googlegroups.com
Below us my runtime.properties file fyi

Is druid getting the kafka domain from ZK?


druid.host=127.0.0.1
druid.service=realtime
druid.port=8082

druid.extensions.coordinates=["io.druid.extensions:druid-kafka-eight:0.6.52"]
druid.zk.service.host=1.dzk.development.sf.test.com:2181


# The realtime config file.
druid.realtime.specFile=/home/ubuntu/druid-services-0.6.52/config/realtime/realtime_druid.json


# Choices: db (hand off segments), noop (do not hand off segments).
druid.publish.type=db

druid.db.connector.connectURI=jdbc\:mysql\://druid.mysql.development.sf.test.com\:3306/dry
druid.db.connector.user=test
druid.db.connector.password=test

druid.processing.buffer.sizeBytes=100000000

Fangjin Yang

unread,
Feb 11, 2014, 12:21:33 PM2/11/14
to druid-de...@googlegroups.com
Hi David,

Are you sure your kafka is configured to talk to the same zookeeper as your realtime node?

David Montgomery

unread,
Feb 11, 2014, 7:25:40 PM2/11/14
to druid-de...@googlegroups.com
The above is in the kafka server.properties file. 

Hosts are filled in my the same host from a databag in a chef recipe.  Hard to go wrong. 


Below are selected printouts from zookeeper using pykeeper.  zookeeper is aware of kafka.  It has the broker id that was assigned randomly on node creation.  Also is getting topics as well.

print client.get_children('/')
['consumers', 'druid', 'controller', 'brokers', 'zookeeper', 'controller_epoch']


print client.get_children('/brokers')
['topics', 'ids']

print client.get_children('/brokers/topics')
['topic-pixel', 'my-topic']


print client.get_children('/brokers/ids')
['1391209402']

broker id on server.properties file
broker.id=1391209402

All indication is that zk and kafka talk. 














Fangjin Yang

unread,
Feb 11, 2014, 10:44:58 PM2/11/14
to druid-de...@googlegroups.com
Hi David, can you do a get on the ids of your different Kafka brokers and see what IP address is in there?

David Montgomery

unread,
Feb 12, 2014, 7:14:35 PM2/12/14
to druid-de...@googlegroups.com
Ok...I set tbe host name in kafka.  That was the issues.  I just rebuilt kafka and no data in it.




but now when realtime talks to kafka I get the below in the realtime logs.  It just goes on and on

62.243.132.240,port:9092 with correlation id 2302 for 1 topic(s) Set(topic-pixel)
2014-02-13 00:09:30,278 INFO [topic-pixel_do-druidrealtime-sf-development-20140204220702-1392249665758-f04fdf89-leader-finder-thread] kafka.producer.SyncProducer - Connected to 162.243.132.240:9092 for producing
2014-02-13 00:09:30,280 INFO [topic-pixel_do-druidrealtime-sf-development-20140204220702-1392249665758-f04fdf89-leader-finder-thread] kafka.producer.SyncProducer - Disconnecting from 162.243.132.240:9092
2014-02-13 00:09:30,490 INFO [topic-pixel_do-druidrealtime-sf-development-20140204220702-1392249665758-f04fdf89-leader-finder-thread] kafka.utils.VerifiableProperties - Verifying properties
2014-02-13 00:09:30,494 INFO [topic-pixel_do-druidrealtime-sf-development-20140204220702-1392249665758-f04fdf89-leader-finder-thread] kafka.utils.VerifiableProperties - Property metadata.broker.list is overridden to 162.243.132.240:9092
2014-02-13 00:09:30,495 INFO [topic-pixel_do-druidrealtime-sf-development-20140204220702-1392249665758-f04fdf89-leader-finder-thread] kafka.utils.VerifiableProperties - Property request.timeout.ms is overridden to 30000
2014-02-13 00:09:30,495 INFO [topic-pixel_do-druidrealtime-sf-development-20140204220702-1392249665758-f04fdf89-leader-finder-thread] kafka.utils.VerifiableProperties - Property client.id is overridden to topic-pixel
2014-02-13 00:09:30,496 INFO [topic-pixel_do-druidrealtime-sf-development-20140204220702-1392249665758-f04fdf89-leader-finder-thread] kafka.client.ClientUtils$ - Fetching metadata from broker id:1392248055,host:162.243.132.240,port:9092 with correlation id 2303 for 1 topic(s) Set(topic-pixel)
2014-02-13 00:09:30,497 INFO [topic-pixel_do-druidrealtime-sf-development-20140204220702-1392249665758-f04fdf89-leader-finder-thread] kafka.producer.SyncProducer - Connected to 162.243.132.240:9092 for producing
2014-02-13 00:09:30,499 INFO [topic-pixel_do-druidrealtime-sf-development-20140204220702-1392249665758-f04fdf89-leader-finder-thread] kafka.producer.SyncProducer - Disconnecting from 162.243.132.240:9092





In the kafka logs I get the below. and just goes on and on




[2014-02-13 00:12:56,230] ERROR [KafkaApi-1392248055] Error while fetching metadata for partition [topic-pixel,1] (kafka.server.KafkaApis)
kafka.common.LeaderNotAvailableException: Leader not available for partition [topic-pixel,1]
    at kafka.server.KafkaApis$$anonfun$17$$anonfun$20.apply(KafkaApis.scala:474)
    at kafka.server.KafkaApis$$anonfun$17$$anonfun$20.apply(KafkaApis.scala:462)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
    at scala.collection.immutable.List.map(List.scala:45)
    at kafka.server.KafkaApis$$anonfun$17.apply(KafkaApis.scala:462)
    at kafka.server.KafkaApis$$anonfun$17.apply(KafkaApis.scala:458)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
    at scala.collection.immutable.Set$Set1.map(Set.scala:68)
    at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:458)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:68)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
    at java.lang.Thread.run(Thread.java:744)
[2014-02-13 00:12:56,233] INFO Closing socket connection to /162.243.140.187. (kafka.network.Processor)





thanks



Gian Merlino

unread,
Feb 14, 2014, 2:05:54 PM2/14/14
to druid-de...@googlegroups.com
I'm not too familiar with kafka 0.8, but it looks like something is wrong with your kafka setup. Can you get the console producer and console consumer to work from some box other than the one the broker is located on?
Reply all
Reply to author
Forward
0 new messages