How to emit events into Kafka


nara....@gmail.com

Jun 5, 2013, 2:58:21 AM
to druid-de...@googlegroups.com
Hi,

I am new to Druid and Kafka, and I don't know how to emit data or events to Kafka. My requirement is to add log data to Druid. The data will be in log files (txt or JSON), and I want to use a realtime node for this. Please let me know if anyone has tried realtime ingestion, and also the procedure for emitting data into Kafka.


Thanks,
Sailaja

Eric Tschetter

Jun 5, 2013, 1:58:02 PM
to druid-de...@googlegroups.com
Kafka has pretty decent documentation at


You can start with the 0.7 quick start.  A producer is what you want in order to put data into Kafka.
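To give you a feel for it, a minimal Java producer against the 0.7 API looks roughly like the following (this just mirrors the shape of the quick start; the topic name, ZooKeeper address, and JSON payload are placeholders to swap for your own):

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;

public class ExampleProducer {
  public static void main(String[] args) {
    // Kafka 0.7 producers discover brokers through ZooKeeper.
    Properties props = new Properties();
    props.put("zk.connect", "localhost:2181");                        // placeholder ZooKeeper address
    props.put("serializer.class", "kafka.serializer.StringEncoder");  // send plain strings

    Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));

    // One JSON event per message; the payload here is only an illustration.
    String event = "{\"timestamp\":\"2013-06-05T00:00:00Z\",\"page\":\"home\",\"count\":1}";
    producer.send(new ProducerData<String, String>("your_topic", event));

    producer.close();
  }
}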


If your language of choice is not Java, you can also look at some of the ecosystem projects for how they emit events into Kafka:


--Eric




Fangjin Yang

Jun 5, 2013, 2:00:13 PM
to druid-de...@googlegroups.com
You may also want to consider writing your own firehose:

The code has a few examples of how different firehoses are implemented.
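At a high level a firehose just hands rows to the realtime node one at a time. Purely as a sketch (the Druid imports are left out because the class and package names have moved around between versions, so treat the names here as approximate and compare against the firehoses in the code), something like this would replay JSON log lines as input rows:

// Sketch only: Druid imports omitted. Firehose, InputRow and MapBasedInputRow
// live in the realtime/input packages of whatever Druid version you build against.
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonLinesFirehose implements Firehose
{
  private final ObjectMapper jsonMapper = new ObjectMapper();
  private final Iterator<String> lines;   // e.g. the lines of a log file

  public JsonLinesFirehose(Iterator<String> lines)
  {
    this.lines = lines;
  }

  @Override
  public boolean hasMore()
  {
    return lines.hasNext();
  }

  @Override
  public InputRow nextRow()
  {
    try {
      Map<String, Object> event = jsonMapper.readValue(lines.next(), Map.class);
      long timestamp = ((Number) event.get("ts")).longValue();      // "ts" stands in for your timestamp field
      return new MapBasedInputRow(timestamp, Arrays.asList("dim1", "dim2"), event);
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public Runnable commit()
  {
    // Nothing to checkpoint when replaying a static file.
    return new Runnable() { public void run() {} };
  }

  @Override
  public void close()
  {
  }
}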


Fangjin Yang

Jun 6, 2013, 12:39:51 PM
to druid-de...@googlegroups.com
Hi Sailaja,

Thinking about your problem more, I think there are a few steps you have to take:
1) Convert your raw log data to JSON; a rough sketch of one way to do this is below. Make sure the format is similar to the examples provided earlier.
2) If you want to use an intermediate message bus, look into Kafka. If data durability is not important to you right now, you can also look at the EventReceiverFirehoseFactory. This is not fully documented yet but it is a firehose that you can post HTTP events to directly.
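For step 1, the conversion can be as simple as building a map per log line and serializing it with Jackson. The field names and the space-separated layout below are made up, so adapt the parsing to whatever your files actually contain:

import java.util.LinkedHashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;

public class LogLineToJson
{
  private static final ObjectMapper MAPPER = new ObjectMapper();

  // e.g. "1370922241644 search 42"  ->  {"ts":1370922241644,"service":"search","uid":42}
  public static String toJson(String logLine) throws Exception
  {
    String[] parts = logLine.trim().split("\\s+");
    Map<String, Object> event = new LinkedHashMap<String, Object>();
    event.put("ts", Long.parseLong(parts[0]));       // timestamp in millis
    event.put("service", parts[1]);                  // a dimension
    event.put("uid", Integer.parseInt(parts[2]));    // another dimension
    return MAPPER.writeValueAsString(event);
  }
}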

Thx,
FJ 

nara....@gmail.com

Jun 10, 2013, 7:00:04 AM
to druid-de...@googlegroups.com
Hi Eric,

Thanks for the reply. I am having some issues with realtime nodes; please let me know whether my understanding is correct. Below is the procedure I have followed for realtime insertion.

1) Downloaded Kafka version 0.7.2.
2) Downloaded the Druid master (0.4.29) from GitHub, did a Maven install, and generated the jar files.
3) Started the ZooKeeper and Kafka servers from the Kafka folder:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
4) Started the master and realtime nodes using the commands attached.
5) Started a Kafka producer using bin/kafka-console-producer.sh --zookeeper localhost:2181 --sync --topic test, so my Kafka topic is "test". I sent JSON messages to the topic from the producer console (the realtime node was started before this).
6) I am able to see folders in the tmp/kafka-logs folder.

Is this approach correct? I am not able to see any segments getting created, and the realtime node is not getting data from the Kafka topic. Please find attached the files that I have used, and let me know if I am missing any configuration. I am stuck on this issue; please help me resolve it.

Thanks,
Sailaja

Attachment: nodelogs.zip

Eric Tschetter

Jun 10, 2013, 1:46:41 PM
to druid-de...@googlegroups.com
That process is generally good.  However,

1) You do not need to run a Master.  The Master coordinates the compute nodes and isn't actually involved with the realtime nodes.  You will need a Master when you decide to fire up Compute nodes, but not any sooner.  The logs about not finding segments in the DB mean that there aren't any segments for the compute nodes to load; they don't say anything about how the realtime nodes are operating.
2) You didn't include enough of the realtime log for me to get a sense of what's going on there; can you provide more?  The things to look for are the "events/processed", "events/unparseable" and "events/thrownAway" metrics that get emitted every minute.  You might have to set

com.metamx.emitter.logging.level=info

to get those lines to actually show up in the log.

--Eric


nara....@gmail.com

Jun 11, 2013, 12:23:38 AM
to druid-de...@googlegroups.com
Hi Eric/Fangjin,

Thanks for your replies and help. Please find below the links where I have pasted the realtime node logs, the realtime spec file, and the Kafka server properties.

http://pastebin.ca/2395353  - realtime spec file
http://pastebin.ca/2395354 - command to start realtime
http://pastebin.ca/2395357 - realtime logs
http://pastebin.ca/2395359 - kafka server properties
http://pastebin.ca/2395363 - sample data
http://pastebin.ca/2395365 - kafka producer properties
http://pastebin.ca/2395367 - kafka consumer properties

Thanks,
Sailaja



Eric Tschetter

Jun 11, 2013, 1:06:29 PM
to druid-de...@googlegroups.com
Hrm,

From looking at the logs, it looks like Kafka isn't finding any partitions to read:

2013-06-11 03:38:42,490 INFO [main] kafka.consumer.ZookeeperConsumerConnector - testgroup_rrr-1370921922099-637f8ca9 Committing all offsets after clearing the fetcher queues
2013-06-11 03:38:42,491 INFO [main] kafka.consumer.ZookeeperConsumerConnector - testgroup_rrr-1370921922099-637f8ca9 Releasing partition ownership
2013-06-11 03:38:42,494 INFO [main] kafka.consumer.ZookeeperConsumerConnector - testgroup_rrr-1370921922099-637f8ca9 Consumer testgroup_rrr-1370921922099-637f8ca9 rebalancing the following partitions: List() for topic test-0 with consumers: List(testgroup_rrr-1370921922099-637f8ca9-0)
2013-06-11 03:38:42,495 WARN [main] kafka.consumer.ZookeeperConsumerConnector - testgroup_rrr-1370921922099-637f8ca9 No broker partitions consumed by consumer thread testgroup_rrr-1370921922099-637f8ca9-0 for topic test-0
2013-06-11 03:38:42,497 INFO [main] kafka.consumer.ZookeeperConsumerConnector - testgroup_rrr-1370921922099-637f8ca9 Updating the cache
2013-06-11 03:38:42,499 INFO [main] kafka.consumer.ZookeeperConsumerConnector - testgroup_rrr-1370921922099-637f8ca9 Consumer testgroup_rrr-1370921922099-637f8ca9 selected partitions :
2013-06-11 03:38:42,502 INFO [main] kafka.consumer.ZookeeperConsumerConnector - testgroup_rrr-1370921922099-637f8ca9 end rebalancing consumer testgroup_rrr-1370921922099-637f8ca9 try #0

Line 4 there says "No broker partitions consumed".

Can you try setting up just a normal consumer on the queue and make sure that you can consume messages off the topic?
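If it helps, a bare-bones consumer against the 0.7 Java API looks roughly like this (the property and class names follow the 0.7 quick start as best I remember them; the group id and topic are placeholders). It should print whatever you type into the console producer:

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;

public class TopicCheck {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("zk.connect", "localhost:2181");  // the same ZooKeeper your realtime node points at
    props.put("groupid", "sanity_check");       // a throwaway consumer group

    ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

    // One stream for the "test" topic.
    Map<String, List<KafkaStream<Message>>> streams =
        connector.createMessageStreams(Collections.singletonMap("test", 1));

    // This blocks waiting for messages; Ctrl-C once you've seen enough.
    for (MessageAndMetadata<Message> msgAndMeta : streams.get("test").get(0)) {
      ByteBuffer payload = msgAndMeta.message().payload();
      byte[] bytes = new byte[payload.remaining()];
      payload.get(bytes);
      System.out.println(new String(bytes));
    }
  }
}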

--Eric


nara....@gmail.com

Jun 12, 2013, 11:31:21 AM
to druid-de...@googlegroups.com
Hi,

Thanks for the reply; the realtime node is working now. I have a few more queries, can you please clarify:
1) Realtime is not writing any metadata to MySQL. How long does it take to write to MySQL?
2) If my data is like below:
  1. {"ts":1370922241644,"tns":"8260","services":"rrr","uid":1}
  2. {"ts":1370922241644,"tns":"202","services":"eee","uid":1}
  3. {"ts":1370922241644,"tns":"001","services":"rrr","uid":1}
  4. {"ts":1370922241644,"tns":"004","services":"eee","uid":1}
  5. {"ts":1370922241645,"tns":"8448","services":"rrt","uid":1}
then what would the query look like to get, say, all tns values whose uid is 1? How can I write flexible queries like this? Please share any relevant wiki page.
3) How can I know how long realtime takes to read data from Kafka and create segments? That is, how can I calculate the time taken for insertion and for query retrieval?

Thanks,
Sailaja

Eric Tschetter

Jun 12, 2013, 1:50:11 PM
to druid-de...@googlegroups.com
1) Realtime is not writing any metadata to MySQL. How long does it take to write to MySQL?

Metadata is written when realtime hands off the segment to the historical cluster.  The segment propagation diagram on https://github.com/metamx/druid/wiki/Realtime might help.

Basically, the handoff happens every segmentGranularity time period.  It actually happens at segmentGranularity+windowPeriod time points.  So with a segmentGranularity of "hour" and windowPeriod "PT10m" it will start the handoff on the 10th minute of every hour, so 00:10, 01:10, 02:10, etc.
 
2) If my data is like below:
  1. {"ts":1370922241644,"tns":"8260","services":"rrr","uid":1}
  2. {"ts":1370922241644,"tns":"202","services":"eee","uid":1}
  3. {"ts":1370922241644,"tns":"001","services":"rrr","uid":1}
  4. {"ts":1370922241644,"tns":"004","services":"eee","uid":1}
  5. {"ts":1370922241645,"tns":"8448","services":"rrt","uid":1}
then what would the query look like to get, say, all tns values whose uid is 1? How can I write flexible queries like this? Please share any relevant wiki page.


Discusses how to query the system.  It sounds like you want a groupBy Query (https://github.com/metamx/druid/wiki/GroupByQuery) like:

{
  "queryType": "groupBy",
  "dataSource": "the_datasource",
  "granularity": "all",
  "dimensions": ["tns"],
  "filter": {
      "type": "selector",
      "dimension": "uid",
      "value": "1"
  },
  "aggregations": [
    {
      "type": "count",
      "name": "rows"
    }
  ],
  "postAggregations": [],
  "intervals": [
    "2012-01-01T00:00:00.000/2012-01-03T00:00:00.000"
  ]
}

You will probably need to edit that to match your time bounds and stuff like that, but hopefully that gets you started.


 
3) How can I know how long realtime takes to read data from Kafka and create segments? That is, how can I calculate the time taken for insertion and for query retrieval?

The best way to know what kind of throughput you are getting is to watch your "events/processed" metrics.  They are emitted every minute and indicate the number of messages ingested in that minute.

--Eric


 

nara....@gmail.com

Jun 13, 2013, 12:02:36 PM
to druid-de...@googlegroups.com
Hi,

Thanks for your reply.

1) I am still not able to see data in MySQL. Are there any logs regarding this insertion where I can look for errors?
2) How does batch ingestion work? I did realtime ingestion for today's date and then a batch ingestion with the interval 2013-06-13/2013-06-14, but it deleted the segments created by realtime. Is that how it works? Please let me know whether I need to do both realtime and batch ingestion, and how updates work.
3) How can I update already existing segments or data?
4) What is indexing and how does Druid do it? Is indexing based on any input column?
5) I am seeing files like druid00900311323index.zip getting created. What is this, and what does it mean?

Thanks,
Sailaja


Eric Tschetter

Jun 17, 2013, 1:42:38 PM
to druid-de...@googlegroups.com
On Thu, Jun 13, 2013 at 9:02 AM, <nara....@gmail.com> wrote:
Hi,

Thanks for your reply.

1) I am still not able to see data in MySQL. Are there any logs regarding this insertion where I can look for errors?

Yes. The logs will come from the "persist-n-merge" thread, so if you have thread names included in your log output and you search the logs for persist-n-merge, you should see what it is doing.
 
2) How does batch ingestion work? I did realtime ingestion for today's date and then a batch ingestion with the interval 2013-06-13/2013-06-14, but it deleted the segments created by realtime. Is that how it works? Please let me know whether I need to do both realtime and batch ingestion, and how updates work.

There is only one canonical "segment" for a given time period and yes, batch ingestion will override the real-time generated segment.  That is by design and is how you can restate the past.  

That said, it is possible to have multiple physical segments compose one logical segment by doing things with the shardSpec when you index.  I wouldn't recommend getting into that though.
 
3) How can I update already existing segments or data?

You re-index the data for the time period that has changed.
 
4) What is indexing and how does Druid do it? Is indexing based on any input column?

"Indexing" is the act of converting from some row-oriented format into Druid's column-oriented format.  It includes various compression techniques and the creation of bitmap indexes on dimensions. 
 
5) I am seeing files like druid00900311323index.zip getting created. What is this, and what does it mean?

That is a temporary file created when the realtime indexer goes to push segments up to deep storage.  It should get cleaned up afterwards.

--Eric



 