[JUnit] How to test KStreams (2.0 client library) with Avro?

ramon....@ext.privalia.com

Oct 8, 2018, 4:39:17 AM
to Confluent Platform
Hi,

Is there any documentation, or are there example tests, for an embedded Kafka cluster wired to Schema Registry (Avro) that I could use for unit testing?

Dependencies I'm using:
    testCompile 'org.apache.kafka:kafka-streams-test-utils:2.0.0'
    testCompile group: 'org.mockito', name: 'mockito-all', version: '1.10.19'
    compile("org.apache.kafka:kafka-streams:2.0.0")

I only found some examples with Spring Kafka, but I'm wondering whether KStreams has its own way.

Thanks for the time.

John Roesler

Oct 8, 2018, 1:42:40 PM
to confluent...@googlegroups.com
Hello,

Offhand, I'm not aware of any specific docs, but if I'm not mistaken, there's no link between the broker and schema registry.

I think it should be sufficient to set up the schema registry serdes in your streams app just as you would for production code, and it should also work with the embedded kafka cluster (or even the TopologyTestDriver).

Does this help?
-John

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/2c13f609-7bfb-493c-a0ba-f528f1f1cc77%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Ramón Jansen

Oct 9, 2018, 3:46:16 AM
to confluent...@googlegroups.com
Hello,

I know there's no link between the broker and the schema registry; that's why I wonder how to test it, because, if I'm not mistaken, Kafka asks for the Avro schema before publishing a record.

Anyway, I will try with TopologyTestDriver :)

I will write again once I do some tests.

Thanks for the help.

-Ramon
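
On the question of who asks for the schema: as far as I know, it is the Confluent Avro serializer inside the client, not the broker, that contacts the schema registry. It obtains an integer id for the schema and writes that id in front of the Avro bytes (one magic byte 0, then a 4-byte big-endian id). Below is a minimal, JDK-only sketch of that framing. The class and method names (WireFormatSketch, frame, schemaIdOf) are made up for illustration, and the payload is placeholder bytes rather than real Avro output.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Sketch of the framing: magic byte, 4-byte schema id, then the Avro payload. */
final class WireFormatSketch {
    private static final byte MAGIC_BYTE = 0x0;

    /** Prepends the 5-byte header to an already encoded payload. */
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(1 + Integer.BYTES + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId) // ByteBuffer is big-endian by default
                .put(avroPayload)
                .array();
    }

    /** Reads the schema id back, as a deserializer must before it can look up the schema. */
    static int schemaIdOf(byte[] framed) {
        ByteBuffer buffer = ByteBuffer.wrap(framed);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }

    public static void main(String[] args) {
        // Placeholder payload; a real serializer would put Avro binary data here.
        byte[] framed = frame(42, new byte[] {1, 2, 3});
        System.out.println(Arrays.toString(framed));
        System.out.println(schemaIdOf(framed));
    }
}
```

Because only the id travels with the record, any serializer or deserializer used in a test needs some registry client (real or mock) to resolve it.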

ramon....@ext.privalia.com

Oct 10, 2018, 6:02:38 AM
to Confluent Platform
Hello,

After a little testing, I ran into a problem with Avro. This is what I have so far:

public class SyncronizerIntegrationTest {


    private ConsumerRecordFactory<String, Tracking> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new SpecificAvroSerializer<>());

    MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();


    @Test
    void integrationTest() throws IOException, RestClientException {


        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "streamsTest");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class.getName());
        props.setProperty(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081"); // Not sure whether this has any effect
        StreamsBuilder kStreamBuilder = new StreamsBuilder();
        Serde<Tracking> avroSerde = getAvroSerde();
        mockSchemaRegistryClient.register(Tracking.getClassSchema().getName(), Tracking.getClassSchema());


        KStream<String, Tracking> unmappedOrdersStream = kStreamBuilder.stream(
                "topic",
                Consumed.with(Serdes.String(), avroSerde));

        unmappedOrdersStream
                .filter((k, v) -> v != null).to("ouput");

        Topology topology = kStreamBuilder.build();
        TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);

        testDriver.pipeInput(recordFactory.create("topic", "1", createValidMappedTracking()));

    }

    private <T extends SpecificRecord> Serde<T> getAvroSerde() {

        // Configure Avro ser/des
        final Map<String, String> avroSerdeConfig = new HashMap<>();
        avroSerdeConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081");

        final Serde<T> avroSerde = new SpecificAvroSerde<>();
        avroSerde.configure(avroSerdeConfig, false); // `false` for record values
        return avroSerde;
    }

}

And I get the following exception when pipeInput is executed:
org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)
    at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:184)
    at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:270)


To me, it seems the schema registry "reference" is missing, so the Avro library cannot retrieve the schema...

Any hints? Any help is welcome :)

-Ramon

Matthias J. Sax

Oct 10, 2018, 3:03:39 PM
to confluent...@googlegroups.com
signature.asc

ramon....@ext.privalia.com

Oct 11, 2018, 6:05:31 AM
to Confluent Platform
Answered there.

ramon....@ext.privalia.com

Oct 17, 2018, 10:17:58 AM
to Confluent Platform
Thread update:

After a few tests, new configs, etc., I finally reached the next blocker:

I'm facing a problem with the TopologyTestDriver. Since it is mandatory to set the property KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG to some "url", when the topology tries to publish a record to the topic it goes to the "url" provided in the config. How can I mock that access so that it points to the mocked schema registry?

Writing Avro records to the input topic works with mockSchemaRegistryClient.register.


Another issue: how can I load a state store into the topology? I'm creating the state store at initialization (the topic is already created).


More info:

https://stackoverflow.com/questions/52856612/problem-publishing-topologytestdriver-with-mockschemaregistry

Matthias J. Sax

Oct 19, 2018, 12:45:07 PM
to confluent...@googlegroups.com
I left a comment on SO.

You need to create serdes manually instead of setting them in the config
atm. If you rely on the config, a regular avro serde is created that
tries to connect to a regular schema registry.

If you want to use mocked-registry, you cannot rely on this mechanism
but you need to create the serde via `new` manually and pass in the
mock-registry manually:

GenericAvroSerde serde = new GenericAvroSerde(mockRegistryClient);

Afterward, you pass in the serde via DSL code, e.g.:

builder.stream("topic", Consumed.with(serde, serde));


-Matthias
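
A rough model of the point above, using only the JDK. All types here (Registry, InMemoryRegistry, Serializer) are hypothetical stand-ins, not Confluent or Kafka classes. InMemoryRegistry plays the role of the mock registry client, the no-argument Serializer constructor approximates a serializer created with `new` that never receives a registry (which is probably what caused the NullPointerException in the test earlier in the thread), and the one-argument constructor mirrors passing the mock client to the serde by hand.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model; none of these types are Confluent or Kafka classes. */
final class MockRegistryInjectionSketch {

    /** Hypothetical stand-in for a schema registry client. */
    interface Registry {
        int idFor(String subject, String schema);
    }

    /** In-memory registry, playing the role of the mock client in the thread. */
    static final class InMemoryRegistry implements Registry {
        private final Map<String, Integer> ids = new HashMap<>();

        @Override
        public int idFor(String subject, String schema) {
            String key = subject + "|" + schema;
            Integer id = ids.get(key);
            if (id == null) {
                id = ids.size() + 1; // the first registration gets id 1
                ids.put(key, id);
            }
            return id;
        }
    }

    /** Hypothetical stand-in for a registry-aware serializer. */
    static final class Serializer {
        private final Registry registry;

        /** Created without a registry: there is nothing to ask for a schema id. */
        Serializer() {
            this(null);
        }

        /** Registry passed in by hand, e.g. an in-memory one in tests. */
        Serializer(Registry registry) {
            this.registry = registry;
        }

        String serialize(String topic, String schema, String value) {
            if (registry == null) {
                throw new IllegalStateException("no schema registry client");
            }
            // Assumes the default convention of a "<topic>-value" subject for record values.
            int id = registry.idFor(topic + "-value", schema);
            return id + ":" + value;
        }
    }

    public static void main(String[] args) {
        Registry shared = new InMemoryRegistry();
        Serializer testInput = new Serializer(shared);
        Serializer insideTopology = new Serializer(shared);

        // Both sides share one registry, so they agree on the schema id.
        System.out.println(testInput.serialize("topic", "Tracking", "a"));
        System.out.println(insideTopology.serialize("topic", "Tracking", "b"));

        try {
            new Serializer().serialize("topic", "Tracking", "c");
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

The practical consequence for the real test is that the serializer feeding the input records and the serde used inside the topology should be handed the same mock client, so that schema ids written on one side can be resolved on the other.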

ramon....@ext.privalia.com

Oct 22, 2018, 5:57:29 AM
to Confluent Platform
Posted the solution; it's working. Thanks for everything!