flume + kafka 报错请教

242 views
Skip to first unread message

周元元

unread,
Aug 14, 2014, 5:06:01 AM8/14/14
to hado...@googlegroups.com
hi all:
   
     在做 flume + kafka 整合做日志传输,lib 是官方推荐的 Flume Kafka Plugin ( https://github.com/baniuyao/flume-kafka ),在调试的时候遇到报错
 
org.apache.flume.FlumeException: Unable to load sink type: com.vipshop.flume.sink.kafka.KafkaSink, class: com.vipshop.flume.sink.kafka.KafkaSink
at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:69)
at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:415)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)

从报错看应该是 Flume Kafka Plugin 的 jar 包没有加载进去。现在 $FLUME_HOME/lib 下和 kakfa 相关的 jar 包 list 如下:

[root@wayne41-log lib]# ll kafka*
-rw-r--r-- 1 root root 1312106 8月  14 11:40 kafka-0.7.2.jar
-rw-r--r-- 1 root root 3285405 8月  14 14:50 kafka_2.8.0-0.8.1.1.jar
-rw-r--r-- 1 root root   37407 8月  14 14:50 kafka_2.8.0-0.8.1.1-javadoc.jar
-rw-r--r-- 1 root root 2149814 8月  14 14:50 kafka_2.8.0-0.8.1.1-scaladoc.jar
-rw-r--r-- 1 root root    4505 8月  14 14:50 kafka_2.8.0-0.8.1.1-sources.jar
-rw-r--r-- 1 root root 2525701 8月  14 16:12 kafka_2.9.2-0.8.0-beta1.jar

flume 的配置文件是内容是:

agent_log.sources = r1
agent_log.sinks = kafka
agent_log.channels = c1

agent_log.sources.r1.type = exec
agent_log.sources.r1.channels = c1
agent_log.sources.r1.command = tail -f /var/log/test.log

agent_log.channels.c1.type = memory
agent_log.channels.c1.capacity = 1000
agent_log.channels.c1.trasactionCapacity = 100

agent_log.sources.r1.channels = c1
agent_log.sinks.kafka.channel = c1

agent_log.sinks.kafka.type = com.vipshop.flume.sink.kafka.KafkaSink
agent_log.sinks.kafka.zk.connect = xxxxxx:2181
agent_log.sinks.kafka.topic = my-replicated-topic
agent_log.sinks.kafka.batchsize = 200
agent_log.sinks.kafka.producer.type = async
agent_log.sinks.kafka.serializer.class = kafka.serializer.StringEncoder



有没有同学碰到同样的问题呢?

谢谢

王勇

unread,
Aug 14, 2014, 5:23:05 AM8/14/14
to hado...@googlegroups.com

Hello,
 首先这个插件是集成kafka0.7.2和flumeng的,所以你的kafka0.8相关的包都可以去掉了
 其次这个插件你至少打一个jar包然后扔到你的flume的lib下面吧,不然哪里去找这个插件的代码呢,是吧。
 所以根据这两点 可以去理解一下,然后测试测试哦!
 


在 2014年8月14日星期四UTC+8下午5时06分01秒,周元元写道:

周元元

unread,
Aug 14, 2014, 6:15:11 AM8/14/14
to hado...@googlegroups.com
java 实在没搞过,麻烦再问下,那 flume 启动的时候会在 lib 下所有的 jar 包中去找我需要的 sink type 吗?所有我自己打 jar 包的名字是随便定义的吗? 谢谢
 
在 2014年8月14日星期四UTC+8下午5时23分05秒,王勇写道:

周元元

unread,
Aug 15, 2014, 10:53:30 AM8/15/14
to hado...@googlegroups.com
你好,打包的问题解决了!现在又报了新的错误,网上搜了没有找到答案,再次请教,非常感谢!
报错如下:貌似是配置文件除了什么问题
2014-08-15 22:35:24,785 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:149)] Unhandled error
java.lang.NoSuchMethodError: org.apache.flume.conf.FlumeConfiguration.<init>(Ljava/util/Map;)V
at org.apache.flume.node.PropertiesFileConfigurationProvider.getFlumeConfiguration(PropertiesFileConfigurationProvider.java:193)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:94)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)




在 2014年8月14日星期四UTC+8下午5时23分05秒,王勇写道:
Reply all
Reply to author
Forward
0 new messages