We are trying to implement the Java producer and Java consumer with the Confluent Platform.

#1: The CP platform is deployed on RHEL (single node), and after deploying we went through the steps outlined here: http://confluent.io/docs/current/quickstart.html . Everything is working OK (we produce messages to the "test" topic in Avro and are able to see them in the consumer).

#2: Next, we implemented the Producer in Java. It is working OK. We are able to see the messages by using the command-line Consumer from the step above.

#3: However, we ran into some difficulties trying to get the Java Consumer working. Some issues we noticed are:

#3.1: Issue with the Maven POM: our POM looks like below, but we figured out that we needed to add 2 jars to our lib to make it work (kafka-avro-serializer-1.0.jar, kafka-schema-registry-client-1.0.jar). It was not working via the Maven POM (not sure if there is some issue with the Maven repo?).
Our POM for the consumer:

<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.0</version>
    <scope>compile</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-parent</artifactId>
    <version>1.7.7</version>
  </dependency>
  <dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.7.7</version>
  </dependency>
  <dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-mapred</artifactId>
    <version>1.7.7</version>
  </dependency>
  <!--
  <dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>1.0.0</version>
  </dependency>
  <dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-registry-client</artifactId>
    <version>2.0</version>
  </dependency>
  -->
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.4</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson</groupId>
    <artifactId>jackson-parent</artifactId>
    <version>2.5</version>
  </dependency>
</dependencies>
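(Editor's sketch, not from the thread: the two io.confluent jars are published in Confluent's own Maven repository rather than Maven Central, so a POM typically also needs a repository entry for them to resolve. Repository URL and the 1.0 versions below are assumptions based on the jar names mentioned above.)

```xml
<!-- Assumed: Confluent hosts its artifacts in its own Maven repository. -->
<repositories>
  <repository>
    <id>confluent</id>
    <url>http://packages.confluent.io/maven/</url>
  </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>1.0</version>
  </dependency>
  <dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-registry-client</artifactId>
    <version>1.0</version>
  </dependency>
</dependencies>
```

With a repository element like this in place, the commented-out io.confluent dependencies above may resolve without copying jars into lib by hand.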
#3.2: Now we got the consumer up and running, but it looks like it is stuck in the while loop. The consumer keeps running without expiring, trying to execute while (it.hasNext()).
See below:
---------------
Log: with -verbose
[Loaded kafka.consumer.ConsumerFetcherManager$$anonfun$startConnections$1 from file:/C:/Apps/.m2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.jar]
[Loaded kafka.consumer.ConsumerFetcherManager$$anonfun$startConnections$1$$anonfun$apply$mcV$sp$1 from file:/C:/Apps/.m2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.jar]
[Loaded scala.collection.TraversableOnce$$anonfun$toMap$1 from file:/C:/Apps/.m2/org/scala-lang/scala-library/2.10.4/scala-library-2.10.4.jar]
[Loaded kafka.consumer.ConsumerFetcherManager$$anonfun$startConnections$1$$anonfun$apply$mcV$sp$2 from file:/C:/Apps/.m2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.jar]
[Loaded kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcVI$sp$4 from file:/C:/Apps/.m2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.jar]
[Loaded java.util.concurrent.locks.ReentrantReadWriteLock$Sync$HoldCounter from C:\Program Files (x86)\Java\jdk1.6.0_25\jre\lib\rt.jar]
[Loaded com.yammer.metrics.stats.ThreadLocalRandom from file:/C:/Apps/.m2/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar]
[Loaded com.yammer.metrics.stats.ThreadLocalRandom$1 from file:/C:/Apps/.m2/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar]
[Loaded kafka.javaapi.consumer.ZookeeperConsumerConnector$$anonfun$createMessageStreams$1 from file:/C:/Apps/.m2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.jar]
[Loaded kafka.javaapi.consumer.ZookeeperConsumerConnector$$anonfun$createMessageStreams$2 from file:/C:/Apps/.m2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.jar]
[Loaded kafka.javaapi.consumer.ZookeeperConsumerConnector$$anonfun$createMessageStreams$2$$anonfun$apply$1 from file:/C:/Apps/.m2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.jar]
2.1{test=[group1 kafka stream]}
3group1 kafka stream
[Loaded kafka.utils.ShutdownableThread$$anonfun$run$1 from file:/C:/Apps/.m2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.jar]
[Loaded kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$1 from file:/C:/Apps/.m2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.jar]
[Loaded kafka.utils.KafkaScheduler$$anonfun$1$$anonfun$apply$mcV$sp$1 from file:/C:/Apps/.m2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.jar]
[Loaded kafka.consumer.ZookeeperConsumerConnector$$anonfun$autoCommit$1 from file:/C:/Apps/.m2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.jar]
[Loaded kafka.utils.KafkaScheduler$$anonfun$1$$anonfun$apply$mcV$sp$4 from file:/C:/Apps/.m2/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.jar]
---------------

Any tips, hints or pointers would be invaluable to get us across the finish line, as far as the Java producer and consumer with Avro messages go.

One tip (for newbies on Kafka / CP): always implement the producer and consumer as 2 separate Java projects, as the consumer still seems to require a lot more dependencies (we guess this will be resolved in Kafka v0.9.0).
--
Thank you!
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/bb35d240-b3c1-4540-8feb-80976fb6430d%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
public Properties getConsumerProperties() {
    Properties props = new Properties();
    props.put("auto.offset.reset", "smallest");
    props.put("zookeeper.connect", "sgls1000597.ap.net:2181");
    props.put("group.id", "group1");
    props.put("bootstrap.servers", "sgls1000597.ap.net:9092");
    props.put("schema.registry.url", "http://sgls1000597.ap.net:8081");
    props.put("key.deserializer", MyKafkaDeserializer.class);
    props.put("value.deserializer", MyKafkaDeserializer.class);
    props.put("partition.assignment.strategy", "range");
    props.put("offsets.storage", "zookeeper");
    props.put("dual.commit.enabled", "false");
    // Set any other properties
    return props;
}

/**
 * Consumer reading the messages
 */
public void readMessage() {
    String topic = "test";
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(1));
    System.out.println("1");
    VerifiableProperties vProps = new VerifiableProperties(getConsumerProperties());
    System.out.println("1.1");
    KafkaAvroDecoder keyDecoder = new KafkaAvroDecoder(vProps);
    System.out.println("1.2");
    KafkaAvroDecoder valueDecoder = new KafkaAvroDecoder(vProps);
    System.out.println("1.3");
    ConsumerConfig conConfig = new ConsumerConfig(getConsumerProperties());
    System.out.println("1.4");
    ConsumerConnector connector = Consumer.createJavaConsumerConnector(conConfig);
    // ConsumerConnector connector = Consumer.create(conConfig);
    System.out.println("2");
    // Map<String, List<KafkaStream<Object, Object>>> consumerMap =
    //     (HashMap<String, List<KafkaStream<Object, Object>>>)
    //     connector.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
    Map<String, List<KafkaStream<Object, Object>>> consumerMap =
        connector.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
    System.out.println("2.1" + consumerMap);
    KafkaStream<Object, Object> stream = consumerMap.get(topic).get(0);
    System.out.println("3" + stream);
    ConsumerIterator<Object, Object> it = stream.iterator();
    System.out.println("3.0" + it);
    while (it.hasNext()) {
        System.out.println("3.1");
        MessageAndMetadata<Object, Object> messageAndMetadata = it.next();
        System.out.println("3.2");
        try {
            // GenericData.Record rec = (GenericData.Record) messageAndMetadata.message();
            Object rec = messageAndMetadata.message();
            System.out.println("[rec:" + rec + "]");
            // String key = (String) messageAndMetadata.key();
            // String value = (String) messageAndMetadata.message();
            // System.out.println("[key:" + key + "; value:" + value + "]");
            System.out.println("3.3");
        } catch (Exception e) {
            System.out.println("[SOMETHING IS WRONG ]" + e.getMessage());
            // may need to do something with it
            e.printStackTrace();
            System.out.println("3.4");
        }
    }
    System.out.println("4");
}
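(Editor's aside, not from the thread: one documented reason the loop above never exits is that, with the 0.8.x high-level consumer, ConsumerIterator.hasNext() blocks indefinitely by default when no new messages arrive. Setting consumer.timeout.ms makes hasNext() throw a ConsumerTimeoutException after that many milliseconds instead. A minimal sketch of such a properties builder, reusing the hostnames from the thread; the bootstrap.servers and key/value.deserializer entries are omitted because they belong to the newer consumer API and are not used by the ZooKeeper-based ConsumerConfig.)

```java
import java.util.Properties;

public class ConsumerProps {
    // Minimal sketch of old (0.8.x) high-level consumer properties.
    // Hostnames are the ones from the thread; adjust to your environment.
    public static Properties getConsumerProperties() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "sgls1000597.ap.net:2181"); // old consumer reads via ZooKeeper
        props.put("group.id", "group1");
        props.put("auto.offset.reset", "smallest");                // start from the beginning of the topic
        props.put("schema.registry.url", "http://sgls1000597.ap.net:8081");
        // Without this, ConsumerIterator.hasNext() blocks forever when the topic
        // has no new messages; with it, hasNext() throws ConsumerTimeoutException
        // after the given number of milliseconds, letting the loop exit.
        props.put("consumer.timeout.ms", "5000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(getConsumerProperties());
    }
}
```

With this property set, the while (it.hasNext()) loop would be wrapped in a try/catch for kafka.consumer.ConsumerTimeoutException to shut down gracefully instead of spinning forever.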
public class MyKafkaDeserializer extends AbstractKafkaAvroDeserializer implements Deserializer<Object> {
    public MyKafkaDeserializer() {
    }

    @Override
    public void configure(Map<String, ?> map, boolean b) {
        System.out.println("MyKafkaDeserializer::configure");
    }

    @Override
    public Object deserialize(String s, byte[] bytes) {
        System.out.println("MyKafkaDeserializer::deserialize");
        return super.deserialize(bytes);
    }

    @Override
    public void close() {
        System.out.println("MyKafkaDeserializer::close");
    }
}
Hi Ramkumar,
Thank you!
--
Thanks,
Ewen
Hi Ewen,

Thanks for the tips. We continued to try but are still unable to read messages from our Java consumer. It seems to get stuck at this line:

ConsumerIterator<Object, Object> it = stream.iterator();

Overall, we think we are making a "newbie mistake" but are unable to put our finger on it. In summary:
#1: On the Maven file, for certain reasons we could not make it work as you suggested (maybe because we are inside the enterprise). Anyway, I think we have got the jars correct.

#2: We are using the properties and consumer code in the getConsumerProperties() / readMessage() listing above.
#4: We had to write a concrete class (because the io.confluent.kafka.serializers package did not have one): the MyKafkaDeserializer class shown above, which extends AbstractKafkaAvroDeserializer and implements Deserializer<Object>.
Thanks for all your inputs. In its current form, I managed to make the consumer read the records.

In the producer code, I published the message with:

String key = "key1";
ProducerRecord<Object, Object> record = new ProducerRecord<Object, Object>("test", key, avroRecord);

This value should be the same as the groupId from which the consumer should be reading the messages:

props.put("group.id", "key1");