HDFS Connector Sink - should it work with non-Connect topic?

181 views
Skip to first unread message

Robin Moffatt

unread,
Jul 27, 2016, 6:29:56 AM7/27/16
to Confluent Platform
Hi, 

I'm new to Kafka/Confluent Platform, so hope this isn't too much of a daft question...

I'm using Confluent Platform 3.0, and experimenting with various connectors and pipeline permutations. One I'm having problem with is data in a topic that's populated from outside of Kafka Connect (in this example, it's logstash), and trying to write that data from Kafka to HDFS with the HDFS Connector. In theory, should this work? Because when I run it, nothing happens -- no errors, but no data either.

I know that the topic's getting lots of data, as validated using the console-consumer. Here's a sample of the data: 

[rmoff@confluent-02 ~]$ /usr/bin/kafka-console-consumer --topic twitter --zookeeper localhost:2181 --max-messages 1
{"created_at":"Wed Jul 20 14:02:27 +0000 2016","id":755764810923597824,"id_str":"755764810923597824","text":"Trump foreign policy adviser @WalidPhares basically says #Iran will violate JCPOA so US should do so first. Dont know how to renege yet.","source":"<a href=\"http://twitter.com\" rel=\"nofollow\">Twitter Web Client</a>","truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":2208906530,"id_str":"2208906530","name":"Ali Ahmadi","screen_name":"AliR_Ahmadi","location":"Tehran-NYC","url":null,"description":"Researcher and writer focusing on Iran, the Middle East and sources of US foreign policy towards the region. Personal account.","protected":false,"verified":false,"followers_count":1519,"friends_count":886,"listed_count":58,"favourites_count":2876,"statuses_count":12150,"created_at":"Fri Nov 22 12:41:04 +0000 2013","utc_offset":-25200,"time_zone":"Pacific Time (US & Canada)","geo_enabled":false,"lang":"en","contributors_enabled":false,"is_translator":false,"profile_background_color":"C0DEED","profile_background_image_url":"http://abs.twimg.com/images/themes/theme1/bg.png","profile_background_image_url_https":"https://abs.twimg.com/images/themes/theme1/bg.png","profile_background_tile":false,"profile_link_color":"0084B4","profile_sidebar_border_color":"C0DEED","profile_sidebar_fill_color":"DDEEF6","profile_text_color":"333333","profile_use_background_image":true,"profile_image_url":"http://pbs.twimg.com/profile_images/573155028506181632/gV1_lfDs_normal.jpeg","profile_image_url_https":"https://pbs.twimg.com/profile_images/573155028506181632/gV1_lfDs_normal.jpeg","profile_banner_url":"https://pbs.twimg.com/profile_banners/2208906530/1428808169","default_profile":true,"default_profile_image":false,"following":null,"follow_request_sent":null,"notifications":null},"geo":null,"coordinates":null,"place":null,"contributors":null,"is_quote_status":false,"retweet_count":0,"favorite_count":0,"entities":{"hashtags":[{"text":"Iran","indices":[57,62]}],"urls":[],"user_mentions":[{"screen_name":"WalidPhares","name":"Walid Phares","id":37535548,"id_str":"37535548","indices":[29,41]}],"symbols":[]},"favorited":false,"retweeted":false,"filter_level":"low","lang":"en","timestamp_ms":"1469023347012","@version":"1","@timestamp":"2016-07-20T14:02:27.000Z"}
Processed a total of 1 messages

When I run kafka connect with the configuration here, nothing happens after the initial burst of log messages (see previous link for full logs), and none of the log messages indicate any kind of problem. 

Would appreciate any pointers on whether what I'm trying to do should work, and if so, how to go about debugging it further. 

thanks. 

Gwen Shapira

unread,
Jul 27, 2016, 2:39:09 PM7/27/16
to confluent...@googlegroups.com
Hi Robin,

Good to see you here :)

Are you streaming messages to the topic after the connector started...
because it looks a bit like "no messages".

A good way to get more info is simply turn on debug for KafkaConnect
and the connector in log4j properties. This will let us see in more
details what's going on...
> --
> You received this message because you are subscribed to the Google Groups
> "Confluent Platform" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to confluent-platf...@googlegroups.com.
> To post to this group, send email to confluent...@googlegroups.com.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/confluent-platform/46018bbc-a537-4fd7-9799-3909c78e42db%40googlegroups.com.
> For more options, visit https://groups.google.com/d/optout.

Robin Moffatt

unread,
Jul 28, 2016, 6:58:12 AM7/28/16
to confluent...@googlegroups.com
Hi Gwen :)

I'm definitely streaming messages to the topic; I've got a kafka-console-consumer running to validate this and see plenty of new messages coming in. 


One thing I note is during startup there is this (with the topic listed)

[2016-07-28 11:43:37,949] DEBUG Sending metadata request {topics=[twitter]} to node -1 (org.apache.kafka.clients.NetworkClient:640)

but after a few minutes running the debug message doesn't have the topic any more: 

[2016-07-28 11:48:27,857] DEBUG Sending metadata request {topics=[]} to node -1 (org.apache.kafka.clients.NetworkClient:640)

No idea if that's anything relevant or not!

thanks, Robin.

Ewen Cheslack-Postava

unread,
Jul 28, 2016, 8:39:48 PM7/28/16
to Confluent Platform
From the original log it looks like the task is started, joins the consumer group, and gets assigned a partition twitter-0 (which seems to indicate this is a single partition topic). It also runs the recovery process, at which point it should be accepting data and writing it.

In the second log, there is this:

[2016-07-28 11:43:33,838] DEBUG Failed to detect a valid hadoop home directory (org.apache.hadoop.util.Shell:320)
java.io.IOException: Hadoop home directory  does not exist, is not a directory, or is not an absolute path.
        at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:312)
        at org.apache.hadoop.util.Shell.<clinit>(Shell.java:327)
        at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
        at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:104)
        at org.apache.hadoop.security.Groups.<init>(Groups.java:86)
        at org.apache.hadoop.security.Groups.<init>(Groups.java:66)
        at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:280)
        at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:271)
        at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:248)
        at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:763)
        at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:748)
        at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:621)
        at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:2753)
        at org.apache.hadoop.fs.FileSystem$Cache.getUnique(FileSystem.java:2617)
        at org.apache.hadoop.fs.FileSystem.newInstance(FileSystem.java:417)
        at io.confluent.connect.hdfs.storage.HdfsStorage.<init>(HdfsStorage.java:39)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at io.confluent.connect.hdfs.storage.StorageFactory.createStorage(StorageFactory.java:29)
        at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:168)
        at io.confluent.connect.hdfs.HdfsSinkTask.start(HdfsSinkTask.java:64)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:207)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:139)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

which could be an issue.

Although it also looks like the task finishes, starts, and gets a partition assignment:

[2016-07-28 11:43:37,769] INFO Sink task WorkerSinkTask{id=hdfs-sink-twitter01-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:208)
[2016-07-28 11:43:37,991] INFO Setting newly assigned partitions [twitter-0] for group connect-hdfs-sink-twitter01 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:219)

And it also looks like its fetching data since some metrics are created:

[2016-07-28 11:43:38,087] DEBUG Added sensor with name topic.twitter.bytes-fetched (org.apache.kafka.common.metrics.Metrics:296)
[2016-07-28 11:43:38,088] DEBUG Added sensor with name topic.twitter.records-fetched (org.apache.kafka.common.metrics.Metrics:296)

(By the way, you could check these metrics to see if the fetches are getting any data.)

It does look like the partition is being paused, which does happen as part of the writing process:

[2016-07-28 11:43:38,351] DEBUG Pausing partition twitter-0 (org.apache.kafka.clients.consumer.KafkaConsumer:1312)

But then it should be writing to the temp file. There doesn't seem to be anything else relevant in the log, in particular no notice starting with "Starting commit and rotation for topic partition" which is when a file would be committed to its final location. 

And then the connection to HDFS is closed:

[2016-07-28 11:43:48,067] DEBUG IPC Client (1090806722) connection to cdh57-01-node-01.moffatt.me/192.168.10.112:8020 from rmoff: closed (org.apache.hadoop.ipc.Client:1184)
[2016-07-28 11:43:48,067] DEBUG IPC Client (1090806722) connection to cdh57-01-node-01.moffatt.me/192.168.10.112:8020 from rmoff: stopped, remaining connections 0 (org.apache.hadoop.ipc.Client:979)

So it seems like something is going wrong when writing the temp file, but we might need to improve the logging in the connector (including TRACE level) in order to figure out exactly where it is getting stuck.

Only other thing I can think of to suggest right now is taking a jstack dump after its been running for a bit. That might give us a clue about where it is getting stuck.

-Ewen

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.

For more options, visit https://groups.google.com/d/optout.



--
Thanks,
Ewen

Robin Moffatt

unread,
Jul 29, 2016, 3:56:30 AM7/29/16
to confluent...@googlegroups.com
Thanks for this. I'll try the TRACE option, and also look at the metrics.

Robin Moffatt

unread,
Aug 31, 2016, 11:48:37 AM8/31/16
to confluent...@googlegroups.com
So following up on this, I've hit the same problem (HDFS Connector, no errors, but nothing written to HDFS either) again, this time with data from the Oracle GoldenGate Kafka Connect producer. 

I've taken the advice from last time and increased logging to TRACE, as well as looked at some of the metrics. 




Let me know if there's other debug information that it would be useful for me to supply. 

Thanks,
Robin
Reply all
Reply to author
Forward
0 new messages