hi all:
org.apache.flume.FlumeException: Unable to load sink type: com.vipshop.flume.sink.kafka.KafkaSink, class: com.vipshop.flume.sink.kafka.KafkaSink
at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:69)
at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:415)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
从报错看应该是 Flume Kafka Plugin 的 jar 包没有加载进去。现在 $FLUME_HOME/lib 下和 kakfa 相关的 jar 包 list 如下:
[root@wayne41-log lib]# ll kafka*
-rw-r--r-- 1 root root 1312106 8月 14 11:40 kafka-0.7.2.jar
-rw-r--r-- 1 root root 3285405 8月 14 14:50 kafka_2.8.0-0.8.1.1.jar
-rw-r--r-- 1 root root 37407 8月 14 14:50 kafka_2.8.0-0.8.1.1-javadoc.jar
-rw-r--r-- 1 root root 2149814 8月 14 14:50 kafka_2.8.0-0.8.1.1-scaladoc.jar
-rw-r--r-- 1 root root 4505 8月 14 14:50 kafka_2.8.0-0.8.1.1-sources.jar
-rw-r--r-- 1 root root 2525701 8月 14 16:12 kafka_2.9.2-0.8.0-beta1.jar
flume 的配置文件是内容是:
agent_log.sources = r1
agent_log.sinks = kafka
agent_log.channels = c1
agent_log.sources.r1.type = exec
agent_log.sources.r1.channels = c1
agent_log.sources.r1.command = tail -f /var/log/test.log
agent_log.channels.c1.type = memory
agent_log.channels.c1.capacity = 1000
agent_log.channels.c1.trasactionCapacity = 100
agent_log.sources.r1.channels = c1
agent_log.sinks.kafka.channel = c1
agent_log.sinks.kafka.type = com.vipshop.flume.sink.kafka.KafkaSink
agent_log.sinks.kafka.zk.connect = xxxxxx:2181
agent_log.sinks.kafka.topic = my-replicated-topic
agent_log.sinks.kafka.batchsize = 200
agent_log.sinks.kafka.producer.type = async
agent_log.sinks.kafka.serializer.class = kafka.serializer.StringEncoder
有没有同学碰到同样的问题呢?
谢谢