My "Kafka Streams client" doesn't eat any messages

2,116 views
Skip to first unread message

Ludovic Martin

unread,
Sep 12, 2016, 12:01:50 PM9/12/16
to Confluent Platform
Hello,

I am trying to use Kafka Streams as a simple Kafka client with a v0.10.0.1 server.
I use "bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test" to feed a test topic.

Here is my test application:
package fr.ludovicmartin.openheatflow;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class Application {

    public static void main(String[] args) throws IOException {
        //Stream configuration
        Properties prop = new Properties();
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "zouzou");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "pi1.home.XXXX:9092");
        prop.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "pi1.home.XXXX:2181");
        prop.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        //Start streaming
        KStreamBuilder builder = new KStreamBuilder();

        KafkaStreams streams = new KafkaStreams(builder, prop);
        streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
            System.out.println(e);
        });
        streams.start();

        KStream<String, String> stream = builder.stream("test");
        stream.foreach((k, v) -> {
            System.out.println(v);
        });

        new BufferedReader(new InputStreamReader(System.in)).readLine();
        streams.close();

    }
}

When I run this code, I get no data printed out on the standard output.
But when I run "bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning", it works. All data produced by "kafka-console-producer.sh" are visible on the terminal running "kafka-console-consumer.sh".

There must be something wrong with my code but I can't see what...
Can anybody help me?

Thanks,
Ludovic

PS : here is the full trace of the test application :

--- exec-maven-plugin:1.2.1:exec (default-cli) @ openheatflow.kernel ---
17:29:12.274 [main] INFO org.apache.kafka.streams.StreamsConfig - StreamsConfig values:
    replication.factor = 1
    num.standby.replicas = 0
    metric.reporters = []
    commit.interval.ms = 30000
    bootstrap.servers = [pi1.home.XXXX:9092]
    state.dir = /tmp/kafka-streams
    partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
    state.cleanup.delay.ms = 60000
    poll.ms = 100
    zookeeper.connect = pi1.home.XXXX:2181
    key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
    metrics.sample.window.ms = 30000
    buffered.records.per.partition = 1000
    value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
    timestamp.extractor = class org.apache.kafka.streams.processor.ConsumerRecordTimestampExtractor
    num.stream.threads = 1
    metrics.num.samples = 2
    application.id = zouzou
    client.id =

17:29:12.343 [main] INFO org.apache.kafka.streams.processor.internals.StreamThread - Creating producer client for stream thread [StreamThread-1]
17:29:12.355 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
    metric.reporters = []
    metadata.max.age.ms = 300000
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [pi1.home.XXXX:9092]
    ssl.keystore.type = JKS
    sasl.mechanism = GSSAPI
    max.block.ms = 60000
    interceptor.classes = null
    ssl.truststore.password = null
    client.id = zouzou-1-StreamThread-1-producer
    ssl.endpoint.identification.algorithm = null
    request.timeout.ms = 30000
    acks = 1
    receive.buffer.bytes = 32768
    ssl.truststore.type = JKS
    retries = 0
    ssl.truststore.location = null
    ssl.keystore.password = null
    send.buffer.bytes = 131072
    compression.type = none
    metadata.fetch.timeout.ms = 60000
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    buffer.memory = 33554432
    timeout.ms = 30000
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    block.on.buffer.full = false
    ssl.key.password = null
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    max.in.flight.requests.per.connection = 5
    metrics.num.samples = 2
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    batch.size = 16384
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = PLAINTEXT
    max.request.size = 1048576
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    linger.ms = 100

17:29:12.361 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bufferpool-wait-time
17:29:12.364 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name buffer-exhausted-records
17:29:12.476 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(nodes = [pi1.home.XXXX:9092 (id: -1 rack: null)], partitions = [])
17:29:12.489 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-closed:
17:29:12.489 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-created:
17:29:12.489 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received:
17:29:12.490 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent:
17:29:12.491 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-received:
17:29:12.491 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name select-time:
17:29:12.492 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name io-time:
17:29:12.496 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name batch-size
17:29:12.498 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name compression-rate
17:29:12.498 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name queue-time
17:29:12.498 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name request-time
17:29:12.498 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name produce-throttle-time
17:29:12.499 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name records-per-request
17:29:12.499 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name record-retries
17:29:12.499 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name errors
17:29:12.499 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name record-size-max
17:29:12.500 [kafka-producer-network-thread | zouzou-1-StreamThread-1-producer] DEBUG org.apache.kafka.clients.producer.internals.Sender - Starting Kafka producer I/O thread.
17:29:12.501 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
    metric.reporters = []
    metadata.max.age.ms = 300000
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [pi1.home.XXXX:9092]
    ssl.keystore.type = JKS
    sasl.mechanism = GSSAPI
    max.block.ms = 60000
    interceptor.classes = null
    ssl.truststore.password = null
    client.id = zouzou-1-StreamThread-1-producer
    ssl.endpoint.identification.algorithm = null
    request.timeout.ms = 30000
    acks = 1
    receive.buffer.bytes = 32768
    ssl.truststore.type = JKS
    retries = 0
    ssl.truststore.location = null
    ssl.keystore.password = null
    send.buffer.bytes = 131072
    compression.type = none
    metadata.fetch.timeout.ms = 60000
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    buffer.memory = 33554432
    timeout.ms = 30000
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    block.on.buffer.full = false
    ssl.key.password = null
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    max.in.flight.requests.per.connection = 5
    metrics.num.samples = 2
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    batch.size = 16384
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = PLAINTEXT
    max.request.size = 1048576
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    linger.ms = 100

17:29:12.503 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.0.1
17:29:12.503 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : a7a17cdec9eaa6c5
17:29:12.503 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - Kafka producer started
17:29:12.504 [main] INFO org.apache.kafka.streams.processor.internals.StreamThread - Creating consumer client for stream thread [StreamThread-1]
17:29:12.510 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
    metric.reporters = []
    metadata.max.age.ms = 300000
    partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamPartitionAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [pi1.home.XXXX:9092]
    ssl.keystore.type = JKS
    enable.auto.commit = false
    sasl.mechanism = GSSAPI
    interceptor.classes = null
    exclude.internal.topics = true
    ssl.truststore.password = null
    client.id = zouzou-1-StreamThread-1-consumer
    ssl.endpoint.identification.algorithm = null
    max.poll.records = 2147483647
    check.crcs = true
    request.timeout.ms = 40000
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 5000
    receive.buffer.bytes = 65536
    ssl.truststore.type = JKS
    ssl.truststore.location = null
    ssl.keystore.password = null
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    group.id = zouzou
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    session.timeout.ms = 30000
    metrics.num.samples = 2
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = PLAINTEXT
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    auto.offset.reset = latest

17:29:12.510 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - Starting the Kafka consumer
17:29:12.511 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(nodes = [pi1.home.XXXX:9092 (id: -1 rack: null)], partitions = [])
17:29:12.511 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-closed:
17:29:12.511 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-created:
17:29:12.511 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received:
17:29:12.511 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent:
17:29:12.511 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-received:
17:29:12.511 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name select-time:
17:29:12.512 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name io-time:
17:29:12.533 [main] DEBUG org.I0Itec.zkclient.ZkConnection - Creating new ZookKeeper instance to connect to pi1.home.XXXX:2181.
17:29:12.533 [ZkClient-EventThread-13-pi1.home.XXXX:2181] INFO org.I0Itec.zkclient.ZkEventThread - Starting ZkClient event thread.
17:29:12.539 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
17:29:12.539 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:host.name=zouzou
17:29:12.539 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.version=1.8.0_91
17:29:12.539 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.vendor=Oracle Corporation
17:29:12.539 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.home=/opt/java/jre
17:29:12.539 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.class.path=/home/ludovic/Prog/openheatflow.kernel/target/classes:/home/ludovic/.m2/repository/org/springframework/boot/spring-boot-starter-web/1.4.0.RELEASE/spring-boot-starter-web-1.4.0.RELEASE.jar:/home/ludovic/.m2/repository/org/springframework/boot/spring-boot-starter/1.4.0.RELEASE/spring-boot-starter-1.4.0.RELEASE.jar:/home/ludovic/.m2/repository/org/springframework/boot/spring-boot-starter-logging/1.4.0.RELEASE/spring-boot-starter-logging-1.4.0.RELEASE.jar:/home/ludovic/.m2/repository/ch/qos/logback/logback-classic/1.1.7/logback-classic-1.1.7.jar:/home/ludovic/.m2/repository/ch/qos/logback/logback-core/1.1.7/logback-core-1.1.7.jar:/home/ludovic/.m2/repository/org/slf4j/jcl-over-slf4j/1.7.21/jcl-over-slf4j-1.7.21.jar:/home/ludovic/.m2/repository/org/slf4j/jul-to-slf4j/1.7.21/jul-to-slf4j-1.7.21.jar:/home/ludovic/.m2/repository/org/slf4j/log4j-over-slf4j/1.7.21/log4j-over-slf4j-1.7.21.jar:/home/ludovic/.m2/repository/org/yaml/snakeyaml/1.17/snakeyaml-1.17.jar:/home/ludovic/.m2/repository/org/hibernate/hibernate-validator/5.2.4.Final/hibernate-validator-5.2.4.Final.jar:/home/ludovic/.m2/repository/javax/validation/validation-api/1.1.0.Final/validation-api-1.1.0.Final.jar:/home/ludovic/.m2/repository/org/jboss/logging/jboss-logging/3.3.0.Final/jboss-logging-3.3.0.Final.jar:/home/ludovic/.m2/repository/com/fasterxml/classmate/1.3.1/classmate-1.3.1.jar:/home/ludovic/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.8.1/jackson-databind-2.8.1.jar:/home/ludovic/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.8.1/jackson-annotations-2.8.1.jar:/home/ludovic/.m2/repository/org/springframework/spring-web/4.3.2.RELEASE/spring-web-4.3.2.RELEASE.jar:/home/ludovic/.m2/repository/org/springframework/spring-aop/4.3.2.RELEASE/spring-aop-4.3.2.RELEASE.jar:/home/ludovic/.m2/repository/org/springframework/spring-webmvc/4.3.2.RELEASE/spring-webmvc-4.3.2.RELEASE.jar:/home/ludo
vic/.m2/repository/org/springframework/spring-expression/4.3.2.RELEASE/spring-expression-4.3.2.RELEASE.jar:/home/ludovic/.m2/repository/org/springframework/boot/spring-boot-starter-jetty/1.4.0.RELEASE/spring-boot-starter-jetty-1.4.0.RELEASE.jar:/home/ludovic/.m2/repository/org/eclipse/jetty/jetty-servlets/9.3.11.v20160721/jetty-servlets-9.3.11.v20160721.jar:/home/ludovic/.m2/repository/org/eclipse/jetty/jetty-continuation/9.3.11.v20160721/jetty-continuation-9.3.11.v20160721.jar:/home/ludovic/.m2/repository/org/eclipse/jetty/jetty-http/9.3.11.v20160721/jetty-http-9.3.11.v20160721.jar:/home/ludovic/.m2/repository/org/eclipse/jetty/jetty-util/9.3.11.v20160721/jetty-util-9.3.11.v20160721.jar:/home/ludovic/.m2/repository/org/eclipse/jetty/jetty-io/9.3.11.v20160721/jetty-io-9.3.11.v20160721.jar:/home/ludovic/.m2/repository/org/eclipse/jetty/jetty-webapp/9.3.11.v20160721/jetty-webapp-9.3.11.v20160721.jar:/home/ludovic/.m2/repository/org/eclipse/jetty/jetty-xml/9.3.11.v20160721/jetty-xml-9.3.11.v20160721.jar:/home/ludovic/.m2/repository/org/eclipse/jetty/jetty-servlet/9.3.11.v20160721/jetty-servlet-9.3.11.v20160721.jar:/home/ludovic/.m2/repository/org/eclipse/jetty/jetty-security/9.3.11.v20160721/jetty-security-9.3.11.v20160721.jar:/home/ludovic/.m2/repository/org/eclipse/jetty/jetty-server/9.3.11.v20160721/jetty-server-9.3.11.v20160721.jar:/home/ludovic/.m2/repository/org/eclipse/jetty/websocket/websocket-server/9.3.11.v20160721/websocket-server-9.3.11.v20160721.jar:/home/ludovic/.m2/repository/org/eclipse/jetty/websocket/websocket-common/9.3.11.v20160721/websocket-common-9.3.11.v20160721.jar:/home/ludovic/.m2/repository/org/eclipse/jetty/websocket/websocket-api/9.3.11.v20160721/websocket-api-9.3.11.v20160721.jar:/home/ludovic/.m2/repository/org/eclipse/jetty/websocket/websocket-client/9.3.11.v20160721/websocket-client-9.3.11.v20160721.jar:/home/ludovic/.m2/repository/org/eclipse/jetty/websocket/websocket-servlet/9.3.11.v20160721/websocket-servlet-9.3.11.v20160721.jar:/hom
e/ludovic/.m2/repository/javax/servlet/javax.servlet-api/3.1.0/javax.servlet-api-3.1.0.jar:/home/ludovic/.m2/repository/org/eclipse/jetty/websocket/javax-websocket-server-impl/9.3.11.v20160721/javax-websocket-server-impl-9.3.11.v20160721.jar:/home/ludovic/.m2/repository/org/eclipse/jetty/jetty-annotations/9.3.11.v20160721/jetty-annotations-9.3.11.v20160721.jar:/home/ludovic/.m2/repository/org/eclipse/jetty/jetty-plus/9.3.11.v20160721/jetty-plus-9.3.11.v20160721.jar:/home/ludovic/.m2/repository/javax/annotation/javax.annotation-api/1.2/javax.annotation-api-1.2.jar:/home/ludovic/.m2/repository/org/ow2/asm/asm/5.0.1/asm-5.0.1.jar:/home/ludovic/.m2/repository/org/ow2/asm/asm-commons/5.0.1/asm-commons-5.0.1.jar:/home/ludovic/.m2/repository/org/ow2/asm/asm-tree/5.0.1/asm-tree-5.0.1.jar:/home/ludovic/.m2/repository/org/eclipse/jetty/websocket/javax-websocket-client-impl/9.3.11.v20160721/javax-websocket-client-impl-9.3.11.v20160721.jar:/home/ludovic/.m2/repository/javax/websocket/javax.websocket-api/1.0/javax.websocket-api-1.0.jar:/home/ludovic/.m2/repository/org/springframework/boot/spring-boot-devtools/1.4.0.RELEASE/spring-boot-devtools-1.4.0.RELEASE.jar:/home/ludovic/.m2/repository/org/springframework/boot/spring-boot/1.4.0.RELEASE/spring-boot-1.4.0.RELEASE.jar:/home/ludovic/.m2/repository/org/springframework/boot/spring-boot-autoconfigure/1.4.0.RELEASE/spring-boot-autoconfigure-1.4.0.RELEASE.jar:/home/ludovic/.m2/repository/net/rossillo/mvc/cache/spring-mvc-cache-control/1.1.1-RELEASE/spring-mvc-cache-control-1.1.1-RELEASE.jar:/home/ludovic/.m2/repository/org/springframework/spring-context/4.3.2.RELEASE/spring-context-4.3.2.RELEASE.jar:/home/ludovic/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.8.1/jackson-core-2.8.1.jar:/home/ludovic/.m2/repository/com/fasterxml/jackson/jackson-datatype-json-org/1.8.0/jackson-datatype-json-org-1.8.0.jar:/home/ludovic/.m2/repository/org/codehaus/jackson/jackson-mapper-asl/1.8.6/jackson-mapper-asl-1.8.6.jar:/home/ludovic/.m2/r
epository/org/codehaus/jackson/jackson-core-asl/1.8.6/jackson-core-asl-1.8.6.jar:/home/ludovic/.m2/repository/org/json/json/20140107/json-20140107.jar:/home/ludovic/.m2/repository/commons-io/commons-io/2.5/commons-io-2.5.jar:/home/ludovic/.m2/repository/org/apache/kafka/kafka-streams/0.10.0.1/kafka-streams-0.10.0.1.jar:/home/ludovic/.m2/repository/org/rocksdb/rocksdbjni/4.8.0/rocksdbjni-4.8.0.jar:/home/ludovic/.m2/repository/com/101tec/zkclient/0.8/zkclient-0.8.jar:/home/ludovic/.m2/repository/log4j/log4j/1.2.15/log4j-1.2.15.jar:/home/ludovic/.m2/repository/javax/mail/mail/1.4/mail-1.4.jar:/home/ludovic/.m2/repository/javax/activation/activation/1.1/activation-1.1.jar:/home/ludovic/.m2/repository/org/apache/zookeeper/zookeeper/3.4.6/zookeeper-3.4.6.jar:/home/ludovic/.m2/repository/jline/jline/0.9.94/jline-0.9.94.jar:/home/ludovic/.m2/repository/junit/junit/4.12/junit-4.12.jar:/home/ludovic/.m2/repository/org/hamcrest/hamcrest-core/1.3/hamcrest-core-1.3.jar:/home/ludovic/.m2/repository/io/netty/netty/3.7.0.Final/netty-3.7.0.Final.jar:/home/ludovic/.m2/repository/org/apache/kafka/connect-json/0.10.0.1/connect-json-0.10.0.1.jar:/home/ludovic/.m2/repository/org/apache/kafka/connect-api/0.10.0.1/connect-api-0.10.0.1.jar:/home/ludovic/.m2/repository/org/apache/kafka/kafka-clients/0.10.0.1/kafka-clients-0.10.0.1.jar:/home/ludovic/.m2/repository/net/jpountz/lz4/lz4/1.3.0/lz4-1.3.0.jar:/home/ludovic/.m2/repository/org/xerial/snappy/snappy-java/1.1.2.6/snappy-java-1.1.2.6.jar:/home/ludovic/.m2/repository/org/slf4j/slf4j-api/1.7.21/slf4j-api-1.7.21.jar:/home/ludovic/.m2/repository/org/springframework/spring-jdbc/4.3.2.RELEASE/spring-jdbc-4.3.2.RELEASE.jar:/home/ludovic/.m2/repository/org/springframework/spring-beans/4.3.2.RELEASE/spring-beans-4.3.2.RELEASE.jar:/home/ludovic/.m2/repository/org/springframework/spring-core/4.3.2.RELEASE/spring-core-4.3.2.RELEASE.jar:/home/ludovic/.m2/repository/org/springframework/spring-tx/4.3.2.RELEASE/spring-tx-4.3.2.RELEASE.jar:/home/ludovic/
.m2/repository/org/apache/commons/commons-dbcp2/2.1.1/commons-dbcp2-2.1.1.jar:/home/ludovic/.m2/repository/org/apache/commons/commons-pool2/2.4.2/commons-pool2-2.4.2.jar:/home/ludovic/.m2/repository/commons-logging/commons-logging/1.2/commons-logging-1.2.jar:/home/ludovic/.m2/repository/org/postgresql/postgresql/9.4.1209.jre7/postgresql-9.4.1209.jre7.jar:/home/ludovic/.m2/repository/com/google/collections/google-collections/1.0/google-collections-1.0.jar
17:29:12.539 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.library.path=/opt/java/jre/lib/amd64:/opt/java/jre/lib/i386::/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
17:29:12.539 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.io.tmpdir=/tmp
17:29:12.539 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.compiler=<NA>
17:29:12.539 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.name=Linux
17:29:12.539 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.arch=amd64
17:29:12.539 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.version=4.4.0-36-generic
17:29:12.539 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.name=ludovic
17:29:12.539 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.home=/home/ludovic
17:29:12.539 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.dir=/home/ludovic/Prog/openheatflow.kernel
17:29:12.540 [main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=pi1.home.XXXX:2181 sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@5622fdf
17:29:12.542 [main] DEBUG org.apache.zookeeper.ClientCnxn - zookeeper.disableAutoWatchReset is false
17:29:12.549 [main] DEBUG org.I0Itec.zkclient.ZkClient - Awaiting connection to Zookeeper server
17:29:12.549 [main] INFO org.I0Itec.zkclient.ZkClient - Waiting for keeper state SyncConnected
17:29:13.730 [main-SendThread(cl-757.mrs-01.fr.sixxs.net:2181)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server cl-757.mrs-01.fr.sixxs.net/2a01:240:fe00:2f4:0:0:0:2:2181. Will not attempt to authenticate using SASL (unknown error)
17:29:13.770 [main-SendThread(cl-757.mrs-01.fr.sixxs.net:2181)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to cl-757.mrs-01.fr.sixxs.net/2a01:240:fe00:2f4:0:0:0:2:2181, initiating session
17:29:13.771 [main-SendThread(cl-757.mrs-01.fr.sixxs.net:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Session establishment request sent on cl-757.mrs-01.fr.sixxs.net/2a01:240:fe00:2f4:0:0:0:2:2181
17:29:13.785 [main-SendThread(cl-757.mrs-01.fr.sixxs.net:2181)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server cl-757.mrs-01.fr.sixxs.net/2a01:240:fe00:2f4:0:0:0:2:2181, sessionid = 0x156fc967ef70052, negotiated timeout = 30000
17:29:13.786 [main-EventThread] DEBUG org.I0Itec.zkclient.ZkClient - Received event: WatchedEvent state:SyncConnected type:None path:null
17:29:13.786 [main-EventThread] INFO org.I0Itec.zkclient.ZkClient - zookeeper state changed (SyncConnected)
17:29:13.786 [main-EventThread] DEBUG org.I0Itec.zkclient.ZkClient - Leaving process event
17:29:13.786 [main] DEBUG org.I0Itec.zkclient.ZkClient - State is SyncConnected
17:29:13.786 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
    metric.reporters = []
    metadata.max.age.ms = 300000
    partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamPartitionAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [pi1.home.XXXX:9092]
    ssl.keystore.type = JKS
    enable.auto.commit = false
    sasl.mechanism = GSSAPI
    interceptor.classes = null
    exclude.internal.topics = true
    ssl.truststore.password = null
    client.id = zouzou-1-StreamThread-1-consumer
    ssl.endpoint.identification.algorithm = null
    max.poll.records = 2147483647
    check.crcs = true
    request.timeout.ms = 40000
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 5000
    receive.buffer.bytes = 65536
    ssl.truststore.type = JKS
    ssl.truststore.location = null
    ssl.keystore.password = null
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    group.id = zouzou
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    session.timeout.ms = 30000
    metrics.num.samples = 2
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = PLAINTEXT
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    auto.offset.reset = latest

17:29:13.795 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name heartbeat-latency
17:29:13.795 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name join-latency
17:29:13.795 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name sync-latency
17:29:13.797 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name commit-latency
17:29:13.801 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-fetched
17:29:13.801 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name records-fetched
17:29:13.801 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name fetch-latency
17:29:13.802 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name records-lag
17:29:13.802 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name fetch-throttle-time
17:29:13.802 [main] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration replication.factor = 1 was supplied but isn't a known config.
17:29:13.802 [main] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration num.standby.replicas = 0 was supplied but isn't a known config.
17:29:13.802 [main] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration zookeeper.connect = pi1.home.XXXX:2181 was supplied but isn't a known config.
17:29:13.802 [main] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration __stream.thread.instance__ = Thread[StreamThread-1,5,main] was supplied but isn't a known config.
17:29:13.802 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.0.1
17:29:13.802 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : a7a17cdec9eaa6c5
17:29:13.802 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - Kafka consumer created
17:29:13.802 [main] INFO org.apache.kafka.streams.processor.internals.StreamThread - Creating restore consumer client for stream thread [StreamThread-1]
17:29:13.803 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
    metric.reporters = []
    metadata.max.age.ms = 300000
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [pi1.home.XXXX:9092]
    ssl.keystore.type = JKS
    enable.auto.commit = false
    sasl.mechanism = GSSAPI
    interceptor.classes = null
    exclude.internal.topics = true
    ssl.truststore.password = null
    client.id = zouzou-1-StreamThread-1-restore-consumer
    ssl.endpoint.identification.algorithm = null
    max.poll.records = 2147483647
    check.crcs = true
    request.timeout.ms = 40000
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 5000
    receive.buffer.bytes = 65536
    ssl.truststore.type = JKS
    ssl.truststore.location = null
    ssl.keystore.password = null
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    group.id =
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    session.timeout.ms = 30000
    metrics.num.samples = 2
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = PLAINTEXT
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    auto.offset.reset = latest

17:29:13.803 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - Starting the Kafka consumer
17:29:13.803 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(nodes = [pi1.home.XXXX:9092 (id: -1 rack: null)], partitions = [])
17:29:13.803 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-closed:
17:29:13.803 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-created:
17:29:13.803 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received:
17:29:13.804 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent:
17:29:13.804 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-received:
17:29:13.804 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name select-time:
17:29:13.804 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name io-time:
17:29:13.805 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
    metric.reporters = []
    metadata.max.age.ms = 300000
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [pi1.home.XXXX:9092]
    ssl.keystore.type = JKS
    enable.auto.commit = false
    sasl.mechanism = GSSAPI
    interceptor.classes = null
    exclude.internal.topics = true
    ssl.truststore.password = null
    client.id = zouzou-1-StreamThread-1-restore-consumer
    ssl.endpoint.identification.algorithm = null
    max.poll.records = 2147483647
    check.crcs = true
    request.timeout.ms = 40000
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 5000
    receive.buffer.bytes = 65536
    ssl.truststore.type = JKS
    ssl.truststore.location = null
    ssl.keystore.password = null
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    group.id =
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    session.timeout.ms = 30000
    metrics.num.samples = 2
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = PLAINTEXT
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    auto.offset.reset = latest

17:29:13.805 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name heartbeat-latency
17:29:13.806 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name join-latency
17:29:13.806 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name sync-latency
17:29:13.806 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name commit-latency
17:29:13.807 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-fetched
17:29:13.807 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name records-fetched
17:29:13.807 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name fetch-latency
17:29:13.807 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name records-lag
17:29:13.807 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name fetch-throttle-time
17:29:13.808 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.0.1
17:29:13.808 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : a7a17cdec9eaa6c5
17:29:13.808 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - Kafka consumer created
17:29:13.809 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name commit-time
17:29:13.809 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name poll-time
17:29:13.809 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name process-time
17:29:13.810 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name punctuate-time
17:29:13.810 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name task-creation
17:29:13.810 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name task-destruction
17:29:13.810 [main] DEBUG org.apache.kafka.streams.KafkaStreams - Starting Kafka Stream process
17:29:13.811 [main] INFO org.apache.kafka.streams.KafkaStreams - Started Kafka Stream process
17:29:13.811 [StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - Starting stream thread [StreamThread-1]
17:29:13.811 [StreamThread-1] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - Unsubscribed all topics or patterns and assigned partitions
17:29:13.811 [StreamThread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending coordinator request for group zouzou to broker pi1.home.XXXX:9092 (id: -1 rack: null)
17:29:13.824 [StreamThread-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at pi1.home.XXXX:9092.
17:29:13.827 [StreamThread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent
17:29:13.827 [StreamThread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received
17:29:13.827 [StreamThread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.latency
17:29:13.827 [StreamThread-1] DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node -1
17:29:13.925 [StreamThread-1] DEBUG org.apache.kafka.clients.NetworkClient - Sending metadata request {topics=[]} to node -1
17:29:13.933 [StreamThread-1] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to Cluster(nodes = [pi1.home.XXXX:9092 (id: 0 rack: null)], partitions = [])
17:29:13.945 [StreamThread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received group coordinator response ClientResponse(receivedTimeMs=1473694153944, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@5e27e3d8, request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=zouzou-1-StreamThread-1-consumer}, body={group_id=zouzou}), createdTimeMs=1473694153822, sendTimeMs=1473694153926), responseBody={error_code=0,coordinator={node_id=0,host=pi1.home.XXXX,port=9092}})
17:29:13.951 [StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator pi1.home.XXXX:9092 (id: 2147483647 rack: null) for group zouzou.
17:29:13.951 [StreamThread-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 2147483647 at pi1.home.XXXX:9092.
17:29:13.991 [StreamThread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-2147483647.bytes-sent
17:29:13.992 [StreamThread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-2147483647.bytes-received
17:29:13.992 [StreamThread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-2147483647.latency
17:29:13.992 [StreamThread-1] DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 2147483647
17:29:23.791 [main-SendThread(cl-757.mrs-01.fr.sixxs.net:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x156fc967ef70052 after 5ms
17:29:33.800 [main-SendThread(cl-757.mrs-01.fr.sixxs.net:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x156fc967ef70052 after 2ms
17:29:43.808 [main-SendThread(cl-757.mrs-01.fr.sixxs.net:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x156fc967ef70052 after 3ms
17:29:53.818 [main-SendThread(cl-757.mrs-01.fr.sixxs.net:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x156fc967ef70052 after 2ms
17:30:03.828 [main-SendThread(cl-757.mrs-01.fr.sixxs.net:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x156fc967ef70052 after 3ms
17:30:13.838 [main-SendThread(cl-757.mrs-01.fr.sixxs.net:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x156fc967ef70052 after 2ms
17:30:23.849 [main-SendThread(cl-757.mrs-01.fr.sixxs.net:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x156fc967ef70052 after 2ms
17:30:33.861 [main-SendThread(cl-757.mrs-01.fr.sixxs.net:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x156fc967ef70052 after 4ms
17:30:43.870 [main-SendThread(cl-757.mrs-01.fr.sixxs.net:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x156fc967ef70052 after 2ms
17:30:53.881 [main-SendThread(cl-757.mrs-01.fr.sixxs.net:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x156fc967ef70052 after 4ms
17:31:03.894 [main-SendThread(cl-757.mrs-01.fr.sixxs.net:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x156fc967ef70052 after 6ms
17:31:13.903 [main-SendThread(cl-757.mrs-01.fr.sixxs.net:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x156fc967ef70052 after 5ms
17:31:23.913 [main-SendThread(cl-757.mrs-01.fr.sixxs.net:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x156fc967ef70052 after 4ms
17:31:33.922 [main-SendThread(cl-757.mrs-01.fr.sixxs.net:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x156fc967ef70052 after 2ms
17:31:43.935 [main-SendThread(cl-757.mrs-01.fr.sixxs.net:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x156fc967ef70052 after 4ms
17:31:53.946 [main-SendThread(cl-757.mrs-01.fr.sixxs.net:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x156fc967ef70052 after 4ms
17:32:03.957 [main-SendThread(cl-757.mrs-01.fr.sixxs.net:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x156fc967ef70052 after 4ms

Eno Thereska

unread,
Sep 12, 2016, 3:20:15 PM9/12/16
to Confluent Platform
Perhaps it's worth trying adding the following config:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

That ensures that your Kafka Streams application consumes the topic from the earliest offset.

Eno
...

Ludovic Martin

unread,
Sep 13, 2016, 4:54:29 PM9/13/16
to Confluent Platform
Hi Eno,

Thank you for your reply.
Unfortunately, I get no change :-(

Ludovic

Ryan Worsley

unread,
Sep 14, 2016, 2:10:25 AM9/14/16
to Confluent Platform
You know, I had this problem for ages and found it really frustrating.  You've probably already solved it, but the problem for me was that my stream processor needed resetting.

Have a look here for details.

Guillermo

unread,
Sep 14, 2016, 5:52:21 AM9/14/16
to Confluent Platform
Hi!

Try to change:

streams.start();

        KStream<String, String> stream = builder.stream("test");
        stream.foreach((k, v) -> {
            System.out.println(v);
        });

to:

        KStream<String, String> stream = builder.stream("test");
        stream.foreach((k, v) -> {
            System.out.println(v);
        });

streams.start();

Guillermo.

Ludovic Martin

unread,
Sep 14, 2016, 5:38:01 PM9/14/16
to Confluent Platform
Very instructive blog post !
But that doesn't solve my problem...

Ludovic Martin

unread,
Sep 14, 2016, 5:40:17 PM9/14/16
to Confluent Platform
Thanks for this but it doesn't work too...

Ludovic Martin

unread,
Sep 14, 2016, 5:46:57 PM9/14/16
to Confluent Platform
I'm starting to think about going back to a traditional Kafka client :-|

I just tried to run my code on a fresh installation of Kafka server (0.10.0.1).
Everything works very well with the CLI consumer, but my Kafka Streams application doesn't "see" any message.

In addition, I didn't see any error on Kafka server logs.
...

Guozhang Wang

unread,
Sep 15, 2016, 7:05:04 PM9/15/16
to Confluent Platform
Hello Ludovic,

I think Guillermo's comment is almost correct: you need to specify the topology through the builder before giving it to the streams client and starting it. So more specifically:

        KStream<String, String> stream = builder.stream("test");
        stream.foreach((k, v) -> {
            System.out.println(v);
        });

        KafkaStreams streams = new KafkaStreams(builder, prop);
        streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
            System.out.println(e);
        });

        streams.start();


Guozhang

...

Ludovic Martin

unread,
Sep 16, 2016, 6:02:29 PM9/16/16
to Confluent Platform
Well... I think I read Guillermo's message too fast :-)
It works now!

Here is my working code :
package fr.ludovicmartin.openheatflow;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

/**
 * Minimal Kafka Streams (0.10.0.1) application that prints every value
 * from the "test" topic to stdout.
 *
 * Key point from this thread: the topology must be fully defined on the
 * KStreamBuilder BEFORE the KafkaStreams instance is constructed and
 * started — otherwise the client starts with an empty topology and
 * consumes nothing.
 */
public class Application {

    public static void main(String[] args) throws IOException {
        //Stream configuration
        Properties prop = new Properties();
        // The application id also serves as the consumer group id.
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "zouzou");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "pi1.home.XXXX:9092");
        // ZooKeeper connect string was still required by Streams 0.10.x.
        prop.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "pi1.home.XXXX:2181");
        prop.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Without this, a new group starts at "latest" and only sees
        // messages produced after startup; "earliest" also consumes
        // pre-existing records on first run.
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");


        //Start streaming
        KStreamBuilder builder = new KStreamBuilder();

        // Define the full topology BEFORE creating/starting KafkaStreams.
        KStream<String, String> stream = builder.stream("test");
        stream.foreach((k, v) -> {
            System.out.println(v);
        });

        KafkaStreams streams = new KafkaStreams(builder, prop);
        // Surface exceptions from the background StreamThreads, which
        // would otherwise die silently.
        streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
            System.out.println(e);
        });
        streams.start();

        // Block until the user presses Enter, then shut down cleanly.
        new BufferedReader(new InputStreamReader(System.in)).readLine();
        streams.close();

    }
}


Many thanks to everyone for the help!
Ludovic
...
Reply all
Reply to author
Forward
0 new messages