Invalid Topology running kstream examples

Jason Jho

May 2, 2016, 11:06:51 AM
to Confluent Platform
Hi,

I'm trying to run the PageView example (https://github.com/confluentinc/examples/blob/master/kafka-streams/src/main/java/io/confluent/examples/streams/PageViewRegionExample.java) as a standalone KStreams app and am seeing a TopologyBuilderException:

Exception in thread "main" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: KSTREAM-MAP-0000000001 and KTABLE-MAPVALUES-0000000004 are not joinable
    at org.apache.kafka.streams.kstream.internals.AbstractStream.ensureJoinableWith(AbstractStream.java:44)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl.leftJoin(KStreamImpl.java:383)
    at io.confluent.examples.streams.PageViewRegionExample.main(PageViewRegionExample.java:102)


Note: I made some minor changes (see below) to allow the intermediate Avro schema to be discoverable when running in fat jar mode, but this is pretty unrelated.

Schema schema = new Schema.Parser().parse(PageViewRegionExample.class.getResourceAsStream("/io/confluent/examples/streams/pageviewregion.avsc"));


Any reason why a KStream-KTable leftJoin would break topology rules?

Guozhang Wang

May 17, 2016, 7:42:31 PM
to Confluent Platform
Hi Jason,

Sorry for getting to this thread late; have you been able to resolve it?

Guozhang

Philippe Derome

Jun 14, 2016, 10:16:30 PM
to Confluent Platform
Guozhang,

I have the same problem running in fat jar mode. I also needed to work around the parse(new File("pageviewregion.avsc")) issue (the file does not exist at that location), which I did by copying the file into the current directory when launching java. I am running the confluent.3.0.0 distribution. A side issue, which I reported on us...@kafka.apache.org (but I don't see my post there yet), is that I can only build the jar by excluding two examples, because the build consumes too much memory on my MacBook Air (4GB RAM): about 1.5GB during a mvn test command for 9 embedded test cases. As a result, streams-examples-3.0.0-standalone.jar has fewer classes.

Ideally, the README file that goes with the examples would be more explicit about the steps needed to run all the examples in that mode, with commands to create the required topics and possibly any curl commands to send to the schema registry, as we don't necessarily have the Confluent Admin Console to work with.

Phil Derome

Philippe Derome

Jun 14, 2016, 10:38:12 PM
to confluent...@googlegroups.com
The error looks similar to the one discussed by Matthias J. Sax and Peng Liu here: https://groups.google.com/forum/#!topic/confluent-platform/t2N4vr1MxgQ
Matthias's suggestion there was:

You need to ensure that both inputs are co-located.

stream1 is not partitioned by the join key after the map. You need to trigger re-partitioning using through().

I don't see a call to through() in the example after this line:
KStream<String, GenericRecord> viewsByUser = views.map((dummy, record) -> new KeyValue<>((String) record.get("user"), record));
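To make the co-location argument concrete, here is a plain-Java model with no Kafka dependency. It assumes a hypothetical stand-in partitioner, (key.hashCode() & 0x7fffffff) % numPartitions; Kafka's default partitioner hashes the serialized key bytes with murmur2 instead, but the argument does not depend on the hash. A join task for partition p only sees partition p of each input, so a record has to sit in the partition its current key maps to. map() changes the key without moving the record, whereas writing the record to a topic again (which is what through() does) places it by its new key. All class and method names are made up for illustration.

```java
/** Plain-Java model of why a re-keyed stream must be repartitioned before a join. */
class RepartitionSketch {

    /** Hypothetical stand-in partitioner; Kafka's default one uses murmur2 on the key bytes. */
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    /** A record's current key and the partition it physically lives in. */
    static final class Placed {
        final String key;
        final int partition;

        Placed(String key, int partition) {
            this.key = key;
            this.partition = partition;
        }
    }

    /** Models map(): the key changes, but the record does not move. */
    static Placed rekeyInPlace(Placed record, String newKey) {
        return new Placed(newKey, record.partition);
    }

    /** Models through(): writing to a topic again places the record by its new key. */
    static Placed rekeyAndRepartition(Placed record, String newKey, int numPartitions) {
        return new Placed(newKey, partitionFor(newKey, numPartitions));
    }

    /** The table row for a key lives in partitionFor(key); a join task only sees its own partition. */
    static boolean coLocated(Placed record, int numPartitions) {
        return record.partition == partitionFor(record.key, numPartitions);
    }

    public static void main(String[] args) {
        int n = 4; // assumed partition count, identical for the stream and the table topic
        Placed view = new Placed("view-42", partitionFor("view-42", n)); // hypothetical original key
        System.out.println("after map():     " + coLocated(rekeyInPlace(view, "Paul"), n));
        System.out.println("after through(): " + coLocated(rekeyAndRepartition(view, "Paul", n), n)); // always true
    }
}
```

Whether the map()-only line prints true depends on whether the old and new keys happen to hash to the same partition, which is exactly why the topology builder refuses to rely on it.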

--
You received this message because you are subscribed to a topic in the Google Groups "Confluent Platform" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/confluent-platform/I7RnGHv7Bm8/unsubscribe.
To unsubscribe from this group and all its topics, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/5cc39daa-427d-4ee4-9fb3-b4c8e6ba9d3d%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Matthias J. Sax

Jun 15, 2016, 6:51:44 AM
to confluent...@googlegroups.com
Seems like the same bug...

Can you patch the example (adding a .through() after .map()) and try again?

You should create the topic you use in .through() manually and ensure it
has the correct number of partitions: co-location requires both inputs
to have the same number of partitions.

-Matthias
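Matthias's point about partition counts can be checked with the same kind of plain-Java model: with equal partition counts a key maps to the same partition number in both topics, while with different counts some keys land in different partitions, so their stream records and table rows would be handled by different tasks. The partitioner is again a hypothetical stand-in for Kafka's murmur2-based default, and the user keys are made up.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java check that co-location needs equal partition counts on both inputs. */
class PartitionCountSketch {

    /** Hypothetical stand-in for Kafka's murmur2-based default partitioner. */
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    /** Keys whose stream partition differs from their table partition. */
    static List<String> misplacedKeys(List<String> keys, int streamPartitions, int tablePartitions) {
        List<String> misplaced = new ArrayList<>();
        for (String key : keys) {
            if (partitionFor(key, streamPartitions) != partitionFor(key, tablePartitions)) {
                misplaced.add(key);
            }
        }
        return misplaced;
    }

    public static void main(String[] args) {
        List<String> users = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            users.add("user" + i); // made-up user keys
        }
        System.out.println(misplacedKeys(users, 4, 4).size()); // prints 0
        System.out.println(misplacedKeys(users, 4, 6).size()); // non-zero for these keys
    }
}
```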


Philippe Derome

Jun 15, 2016, 11:50:30 AM
to confluent...@googlegroups.com
That appears to work, since the application now waits for input. However, I can't fully validate the example, because I seem to be using kafka-avro-console-producer incorrectly: I specify essentially the same schema as the example's pageview.avsc (without the namespace) and get a JsonParseException, as I am improvising the JSON input to match the schema and evidently don't understand how an Avro schema maps to JSON. I also don't know whether the name "PageViews" inside the value.schema property can be arbitrary or must match the --topic argument of kafka-avro-console-producer.

The analogous change to the similar JDK 7 example (no lambdas) has the same result: the application waits for input.

Are you suggesting that I create a JIRA or work on a pull request, or will someone (Michael Noll, the main contributor of the examples) go ahead and modify the examples?

./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic PageViews --property value.schema='{"type": "record","name": "PageViews","fields": [{"name": "user", "type": "string"},{"name": "page", "type": "string"},{"name": "industry", "type": "string"}]}'

...

{"user":"Paul"},{"page":"www.kafka.org"},{"industry","non-profit"}

org.apache.kafka.common.errors.SerializationException: Error deserializing json {"user":"Paul"},{"page":"www.kafka.org"},{"industry","non-profit"} to Avro of schema {"type":"record","name":"PageViews","fields":[{"name":"user","type":"string"},{"name":"page","type":"string"},{"name":"industry","type":"string"}]}

Caused by: org.apache.avro.AvroTypeException: Expected string. Got END_OBJECT
    at org.apache.avro.io.JsonDecoder.error(JsonDecoder.java:697)
    at org.apache.avro.io.JsonDecoder.readString(JsonDecoder.java:227)
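For reference: Avro's JSON encoding of a record is a single JSON object whose members are the record's fields, and the console producer reads one such value per line. The input above is three separate objects, the last of which ({"industry","non-profit"}) is not valid JSON, so the decoder reaches END_OBJECT while it still expects the "page" string. A small, hypothetical helper that builds the expected line for this PageViews schema (plain Java, naive escaping, all fields of type "string"):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Builds one Avro-JSON line for a record whose fields are all plain "string" types. */
class AvroJsonLine {

    /** Minimal JSON string quoting: escapes only backslashes and double quotes (an assumption). */
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** All fields go into ONE object; LinkedHashMap keeps them in insertion (schema) order. */
    static String recordLine(Map<String, String> fields) {
        StringJoiner joiner = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, String> e : fields.entrySet()) {
            joiner.add(quote(e.getKey()) + ":" + quote(e.getValue()));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        Map<String, String> view = new LinkedHashMap<>();
        view.put("user", "Paul");
        view.put("page", "www.kafka.org");
        view.put("industry", "non-profit");
        // prints {"user":"Paul","page":"www.kafka.org","industry":"non-profit"}
        System.out.println(recordLine(view));
    }
}
```

So the line to type into the producer would be {"user":"Paul","page":"www.kafka.org","industry":"non-profit"}.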

Guozhang Wang

Jun 15, 2016, 2:43:15 PM
to Confluent Platform
Hello Philippe,

I think this is indeed a bug in the examples code: we changed Kafka Streams for the 0.10.0.0 release after the tech preview, but the examples code was not updated accordingly.

If you are willing to submit a PR to the examples code, we would love to review and merge it.


Guozhang

Philippe Derome

Jun 15, 2016, 7:48:05 PM
to confluent...@googlegroups.com
If that were the only issue, I could probably do the PR, but I doubt I can solve the last hurdle I encountered with little effort.

I get a NullPointerException in PageViewRegionLambdaExample when it tries to write state to RocksDB, and that's probably too hard for me to troubleshoot right now. See the bottom of this email for the stack trace.
Before that, I had a cast exception to address when deserializing Avro to String. This link with poster Aaron Kimball suggests we don't cast to String but use the toString method instead (I assume it's a required change for the examples):
There are several casts to String in this example, as well as in the non-Lambda one. For example,
    return new KeyValue<>((String) record.get("user"), record);  // non-Lambda sample
could be
    return new KeyValue<>(record.get("user").toString(), record);  // non-Lambda sample
and in the Lambda sample:
    KStream<String, GenericRecord> viewsByUser = views.map((dummy, record) -> new KeyValue<>(record.get("user").toString(), record))
        .through("re-partitioning-topic");
    KTable<String, String> userRegions = users.mapValues(record -> record.get("region").toString());
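Why the (String) casts fail: by default, Avro's generic reader returns "string" fields as org.apache.avro.util.Utf8, which implements CharSequence but is not a java.lang.String, so the cast throws ClassCastException while toString() works. The plain-Java sketch below uses a hypothetical FakeUtf8 class as a stand-in for Utf8 so that it runs without Avro on the classpath. Note that toString() still throws NullPointerException if the field itself is null.

```java
/** Plain-Java model of the ClassCastException from casting an Avro string field to String. */
class Utf8CastSketch {

    /** Hypothetical stand-in for org.apache.avro.util.Utf8: a CharSequence that is not a String. */
    static final class FakeUtf8 implements CharSequence {
        private final String value;

        FakeUtf8(String value) {
            this.value = value;
        }

        @Override public int length() { return value.length(); }
        @Override public char charAt(int index) { return value.charAt(index); }
        @Override public CharSequence subSequence(int start, int end) { return value.subSequence(start, end); }
        @Override public String toString() { return value; }
    }

    /** What the original example does: fails at runtime for any non-String CharSequence. */
    static String byCast(Object field) {
        return (String) field;
    }

    /** What the thread suggests: works for String and Utf8 alike (but NPEs on null). */
    static String byToString(Object field) {
        return field.toString();
    }

    public static void main(String[] args) {
        Object user = new FakeUtf8("Paul"); // what a hypothetical record.get("user") might return
        System.out.println(byToString(user)); // prints Paul
        try {
            byCast(user);
        } catch (ClassCastException e) {
            System.out.println("cast failed: " + e.getClass().getSimpleName()); // prints cast failed: ClassCastException
        }
    }
}
```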


Stack trace:

Exception in thread "StreamThread-1" java.lang.NullPointerException
    at org.rocksdb.AbstractWriteBatch.put(AbstractWriteBatch.java:19)
    at org.apache.kafka.streams.state.internals.RocksDBStore.putAllInternal(RocksDBStore.java:319)
    at org.apache.kafka.streams.state.internals.RocksDBStore.flushCache(RocksDBStore.java:379)
    at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:411)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:165)

Michael Noll

Jun 16, 2016, 10:15:37 AM
to confluent...@googlegroups.com
Phil,

regarding your NPE:  Could there be null keys or null values in your input data (i.e. the user profiles topic and/or the page views topic)?
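If the inputs do contain null keys or values, one option (an assumption, not something settled in this thread) is to drop such records on the stream side before any stateful step; the Kafka Streams DSL has KStream#filter, which takes a predicate over key and value. The plain-Java model below applies the same predicate shape to a hypothetical list of (user, region) records:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Plain-Java model of dropping records with a null key or value before a stateful step. */
class NullGuardSketch {

    /** The same (key, value) predicate shape that KStream#filter accepts. */
    static boolean isComplete(String key, String value) {
        return key != null && value != null;
    }

    static List<Map.Entry<String, String>> dropIncomplete(List<Map.Entry<String, String>> records) {
        return records.stream()
                .filter(r -> isComplete(r.getKey(), r.getValue()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical (user -> region) records, two of them incomplete.
        List<Map.Entry<String, String>> records = Arrays.asList(
                new SimpleEntry<String, String>("Paul", "CA"),
                new SimpleEntry<String, String>(null, "US"),   // produced without a key
                new SimpleEntry<String, String>("Ann", null)); // produced without a value
        System.out.println(dropIncomplete(records)); // prints [Paul=CA]
    }
}
```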

--
Best regards,
Michael Noll


Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
Download Apache Kafka and Confluent Platform: www.confluent.io/download

Philippe Derome

Jun 16, 2016, 11:02:00 AM
to confluent...@googlegroups.com
I could definitely be doing something wrong, as I am having difficulty specifying proper JSON input for the Avro producer when the UserProfile.avsc schema says {"name": "experience", "type": ["string", "null"]},
so I replaced the "type" value with just "string", because I don't know how to create valid JSON input for a ["string", "null"] union (I then give {"user":"Paul","region":"CA"} as input to the Avro producer). Also, I am not sure how the left join should work: should the user profile schema really specify "experience" rather than "user", which would seem more natural? How would Streams know to join on "experience"? Just by its position in the schema?
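On the union question: in Avro's JSON encoding, a union value that is not null is wrapped in an object whose single key is the branch's type name, e.g. {"string": "some"}, while null is written as a bare null. A hypothetical plain-Java helper for a UserProfile value (naive: it assumes the strings need no JSON escaping):

```java
/** Builds Avro-JSON for a UserProfile value whose "experience" field is a ["string", "null"] union. */
class AvroUnionJson {

    /** Non-null union values are wrapped as {"<branch type>": value}; null stays a bare null. */
    static String nullableString(String value) {
        return value == null ? "null" : "{\"string\":\"" + value + "\"}";
    }

    /** Naive: assumes neither input contains characters that need JSON escaping. */
    static String userProfile(String experience, String region) {
        return "{\"experience\":" + nullableString(experience) + ",\"region\":\"" + region + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(userProfile("some", "CA")); // prints {"experience":{"string":"some"},"region":"CA"}
        System.out.println(userProfile(null, "CA"));   // prints {"experience":null,"region":"CA"}
    }
}
```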

Philippe Derome

Jun 16, 2016, 11:09:43 AM
to confluent...@googlegroups.com
More fundamentally, I don't understand how the join would work:
pageview.avsc has fields user, page, and industry for a stream, whereas userprofile.avsc has fields experience and region. It looks like we're trying to mate cats with dogs, if humour is allowed... Shouldn't there be a common field in the two schemas? Or are we simply happy to set pageview.user to "Paul" and userprofile.experience to "Paul" for a match? In the real world, "Paul" is a name or user, not an experience (like plumber might be).

Philippe Derome

Jun 16, 2016, 5:31:00 PM
to Confluent Platform
I'd like to act on that suggestion, but concretely, how do I cleanse my data if it may be polluted with nulls, as you suggest? Would wiping out /tmp/kafka.logs and force-deleting the topics do the trick?

I have this setting for Kafka to help me force delete topics.
delete.topic.enable=true


I wonder whether anyone at Confluent has run this particular example recently and could confirm that it normally works, as it has high pedagogical value.

Michael Noll

Jun 17, 2016, 2:06:39 AM
to confluent...@googlegroups.com
Phil,

> More fundamentally I don't understand how the join would work:
> pageview.avsc has fields user, page, and industry for a stream whereas userprofile.avsc has fields experience and region.
> Looks like we're trying to mate cats with dogs if humour is allowed... Should not there be a common field in the two
> schemas? Or we are simply happy to set pageview.user as "Paul" and userprofile.experience as "Paul" for  a match?
> In the real world "Paul" is a name or user not an experience (like plumber might be).

For the sake of other readers who might have a similar question, let me summarize what I already shared with you in an offline email discussion.

In Kafka and Kafka Streams, messages/records are always key-value pairs.  Joins operate on the *key* of these records.  The Avro schemas in the examples only apply to the *values*.  In the case of the user profiles in the example, the record value contains only "experience" and "region".  The user information is assumed to be in the record key, see https://github.com/confluentinc/examples/blob/master/kafka-streams/src/main/java/io/confluent/examples/streams/PageViewRegionExample.java#L97-L100.

Hope this helps!
Michael
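Michael's point, sketched in plain Java under simplifying assumptions (maps stand in for Avro records and for the KTable's state; all names are hypothetical): the user is the record key on both sides, so the lookup uses only that key, and the value field names ("user" vs "experience") never enter into it.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model: a stream-table join matches record KEYS; value field names play no role. */
class JoinOnKeySketch {

    /** Hypothetical page-view value with the fields of pageview.avsc. */
    static Map<String, String> pageView(String user, String page, String industry) {
        Map<String, String> value = new HashMap<>();
        value.put("user", user);
        value.put("page", page);
        value.put("industry", industry);
        return value;
    }

    /** Hypothetical user-profile value: only experience and region; the user is the record key. */
    static Map<String, String> userProfile(String experience, String region) {
        Map<String, String> value = new HashMap<>();
        value.put("experience", experience);
        value.put("region", region);
        return value;
    }

    /** Left-join lookup by key: the table row for streamKey, or null if there is none. */
    static Map<String, String> lookup(String streamKey, Map<String, Map<String, String>> table) {
        return table.get(streamKey);
    }

    public static void main(String[] args) {
        // Table keyed by user; the value holds no "user" field at all.
        Map<String, Map<String, String>> profiles = new HashMap<>();
        profiles.put("Paul", userProfile("some", "CA"));

        // The stream is re-keyed by the "user" field of its value (the map() step).
        String streamKey = pageView("Paul", "www.kafka.org", "non-profit").get("user");

        System.out.println(lookup(streamKey, profiles).get("region")); // prints CA
    }
}
```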

Philippe Derome

Jun 17, 2016, 11:51:57 AM
to Confluent Platform
Actually, it does not help, since the example is provided by Confluent and not by me. Confluent should fix the broken published examples, including this one, or simply remove all broken examples from the Confluent website: they deter interest in learning about Kafka Streams and Confluent's solutions (CP3), and broken examples are a waste of time for those trying them out, particularly when they are published by a for-profit organization (an open-source project may be forgiven due to lack of funding). I have no issue with bugs in commercial or open-source products in general, but broken examples published by a corporation are a non-starter, especially after repeated notification. It would seem easy and quick for some of your staff to address this, but that's apparently not the direction you/Confluent want(s) to take. If time was a constraint, it's possible that CP3 was released too soon, since the documentation should have been vetted.

Not only should the examples work, but the steps required to execute them should be very clear. Anecdotally, I do not recall other major software corporations keeping non-working examples of their products on their website.

I deleted all the logs, restarted all the servers and even simplified the schema so that there is no union for "experience", since keeping the union actually broke the example sooner. I also had to remove the String casts in the example, as it is also broken with them.


This is how far I got with this example:

java -cp target/streams-examples-3.0.0-standalone.jar  io.confluent.examples.streams.PageViewRegionLambdaExample

log4j:WARN No appenders could be found for logger (org.apache.kafka.streams.StreamsConfig).
log4j:WARN Please initialize the log4j system properly.
Exception in thread "StreamThread-1" java.lang.NullPointerException
    at io.confluent.examples.streams.PageViewRegionLambdaExample.lambda$main$3(PageViewRegionLambdaExample.java:104)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
    at org.apache.kafka.streams.kstream.internals.KStreamKTableLeftJoin$KStreamKTableLeftJoinProcessor.process(KStreamKTableLeftJoin.java:61)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
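One plausible reading of this trace, not confirmed in the thread: the NullPointerException is thrown in a map() lambda (lambda$main$3) that runs right after KStreamKTableLeftJoin. A left join calls the joiner with a null table value when there is no row for the key, so if a downstream lambda calls toString() (or similar) on the region, a single page view for a user without a profile is enough to trigger it. A plain-Java model, with a hypothetical "UNKNOWN" placeholder that is not part of the example:

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of a left join yielding null on the table side, and a null-safe follow-up step. */
class LeftJoinNullSketch {

    /** Left-join lookup: null when the table has no row for the key, as a left join would pass on. */
    static String leftJoinRegion(String user, Map<String, String> userRegions) {
        return userRegions.get(user);
    }

    /** Null-safe re-keying by region; "UNKNOWN" is a hypothetical placeholder. */
    static String regionKey(String region) {
        return region == null ? "UNKNOWN" : region;
    }

    public static void main(String[] args) {
        Map<String, String> userRegions = new HashMap<>();
        userRegions.put("Paul", "CA"); // "Ann" has no profile yet
        for (String user : new String[] {"Paul", "Ann"}) {
            String region = leftJoinRegion(user, userRegions);
            // region.toString() here would throw NullPointerException for "Ann"
            System.out.println(user + " -> " + regionKey(region));
        }
        // prints:
        // Paul -> CA
        // Ann -> UNKNOWN
    }
}
```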

Philippe Derome

Jun 17, 2016, 3:16:33 PM
to Confluent Platform
I can confirm that Confluent made a change to PageViewRegionLambdaExample today, and it appears to work very well, with clear instructions.

Thanks much!

Michael Noll

Jun 17, 2016, 3:23:12 PM
to confluent...@googlegroups.com
Our pleasure, Phil.

Again, sorry for the bumpy start, and I'm happy to hear it's working well for you now. For what it's worth, we have also identified a few other issues in the Kafka Streams examples under src/main/ in github.com/confluentinc/examples. Please stay tuned, and feel free to report back here in the meantime.

Best,
Michael

Philippe Derome

Jun 20, 2016, 10:23:45 PM
to Confluent Platform
For the benefit of other people starting out with the Streams examples (Confluent may correct me):

I didn't understand on which data item the streams were joined in the code below. It's implied that view and region are values of records that also have keys, so the leftJoin code at the end of my commentary today matches on the keys of those records: records of the stream on which leftJoin is called (view) and of its first parameter, userRegions (region). The keys are set up earlier (though this whole code is declarative and evaluation is deferred until the tasks kick in, so the code can be viewed as a pattern/template to run on).

.leftJoin(userRegions, (view, region) -> {
    GenericRecord viewRegion = new GenericData.Record(schema);
    viewRegion.put("user", view.get("user"));
    viewRegion.put("page", view.get("page"));
    viewRegion.put("region", region);
    return viewRegion;
})

What I also didn't understand is how we specify the proper keys; here they are "users" in the domain world (the put("user", ...) call is purely coincidental and unrelated, which confused me a little). The table keys are set by the recently introduced companion driver class (which was not available when I raised the question). For view, the key comes from the object viewsByUser, which is derived from this "user" attribute lookup:
new KeyValue<>(record.get("user").toString(), record))
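Re-keying can be sketched the same way. The following hypothetical RekeySketch class (plain Java, not the Kafka Streams API) models each page view as a Map of fields, standing in for a GenericRecord, and replaces the record's key with its "user" field while passing the value through unchanged, which appears to be what the KeyValue line above does inside the example's map call. One caveat from Kafka Streams itself: because map may change the key, the re-keyed data may need to be repartitioned so that both join inputs are co-partitioned; this sketch ignores partitioning entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of re-keying a stream by a field of its value; NOT the Kafka Streams API.
final class RekeySketch {

    // A record whose value is a set of named fields, standing in for a GenericRecord.
    static final class Rec {
        final String key;
        final Map<String, String> value;

        Rec(String key, Map<String, String> value) {
            this.key = key;
            this.value = value;
        }
    }

    // Emit one new record per input: key = the "user" field, value = the unchanged input value.
    static List<Rec> rekeyByUser(List<Rec> views) {
        List<Rec> out = new ArrayList<>();
        for (Rec r : views) {
            out.add(new Rec(r.value.get("user"), r.value));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> views = new ArrayList<>();
        // The original key is ignored by rekeyByUser; Map.of needs Java 9+.
        views.add(new Rec(null, Map.of("user", "alice", "page", "/home")));
        for (Rec r : rekeyByUser(views)) {
            System.out.println(r.key + " -> " + r.value.get("page"));
        }
    }
}
```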

For region, the key goes back to the KTable userRegions in this snippet (the first type parameter of KTable, String, is the key type):
    KTable<String, String> userRegions = users.mapValues(record -> record.get("region").toString());
Since mapValues changes only the values, its keys are those of users, which is created just before as:
    KTable<String, GenericRecord> users = builder.table("UserProfile");
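The point that mapValues keeps the keys can be illustrated with another hypothetical, dependency-free sketch: the table is modeled as a Map from user name to a profile (itself a Map of fields), and mapValues applies a function to each value while copying every key unchanged. MapValuesSketch is an illustrative name only; the real KTable.mapValues works on a continuously updated table, not on a static Java Map.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of KTable.mapValues on a table held as a plain Map; NOT the Kafka Streams API.
final class MapValuesSketch {

    // Apply f to every value; every key is copied over unchanged.
    static <V, R> Map<String, R> mapValues(Map<String, V> table, Function<V, R> f) {
        Map<String, R> out = new LinkedHashMap<>();
        for (Map.Entry<String, V> e : table.entrySet()) {
            out.put(e.getKey(), f.apply(e.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> users = new LinkedHashMap<>();
        users.put("alice", Map.of("experience", "some", "region", "asia"));
        users.put("bob", Map.of("experience", "some", "region", "europe"));
        Map<String, String> userRegions = mapValues(users, profile -> profile.get("region"));
        System.out.println(userRegions); // {alice=asia, bob=europe}
    }
}
```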

The users table above gets its "user" keys from the topic "UserProfile", which is populated by the new companion driver code. The driver publishes keys and values to the topic as follows, where users is a plain Java String array initialized in the driver (not the KTable of the same name) and userProfilesTopic evaluates to "UserProfile":
    for (String user : users) {
        userProfileBuilder.set("experience", "some");
        userProfileBuilder.set("region", regions[random.nextInt(regions.length)]);
        // key = the user name, value = the Avro user profile record
        producer.send(new ProducerRecord<>(userProfilesTopic, user, userProfileBuilder.build()));
    }
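The path from the driver's topic to the users table can be modeled as a log replayed into a map. In this hypothetical sketch, TopicToTableSketch.publishProfiles mimics the driver loop above but simplifies each value to the region string instead of an Avro record, and toTable keeps only the latest value per key, treating a null value as a deletion, which is how a KTable interprets the records of its topic. Neither method is a Kafka API; both names are illustrative.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical model of a topic (append-only log) being read as a table; NOT a Kafka API.
final class TopicToTableSketch {

    // One message on the topic: a key and a value (a null value marks a deletion).
    static final class Msg {
        final String key;
        final String value;

        Msg(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Mimics the driver loop: one message per user, keyed by the user name.
    // Simplification: the value is just a random region, not a full Avro profile.
    static List<Msg> publishProfiles(String[] users, String[] regions, Random random) {
        List<Msg> topic = new ArrayList<>();
        for (String user : users) {
            topic.add(new Msg(user, regions[random.nextInt(regions.length)]));
        }
        return topic;
    }

    // Replay the log in order: later messages overwrite earlier ones for the same key,
    // and a null value removes the key, so the result holds the latest value per key.
    static Map<String, String> toTable(List<Msg> topic) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Msg m : topic) {
            if (m.value == null) {
                table.remove(m.key);
            } else {
                table.put(m.key, m.value);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        String[] users = {"alice", "bob"};
        String[] regions = {"asia", "europe", "africa"};
        List<Msg> topic = publishProfiles(users, regions, new Random());
        topic.add(new Msg("alice", "europe")); // a later update for alice wins
        System.out.println(toTable(topic));
    }
}
```

Modeling the table as "latest value per key" is what lets the join above look up a single region per user, even if the driver publishes several profiles for the same user over time.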

Finally, we are in business with the very impressive leftJoin. It is type safe (no messy "as" or string-based SQL as with Spark 1.6/2.0 Datasets), it needs no artificial class to represent the joined result, and it is concise thanks to JDK 8 lambdas. The lambda body in braces, which follows the parameters view and region, produces the new data, a GenericRecord. Further transformations are applied to that data and, once they are done, the result is written to the output topic and thus to Kafka. This Streams DSL looks quite elegant at first glance!



