Simple KStream transform

575 views
Skip to first unread message

Shannon Ma

unread,
Oct 5, 2016, 2:54:22 PM10/5/16
to Confluent Platform
Hi,

Jut start looking at kafka stream, i have a simple question, i just need to transform data from one topic (avro) and publish to a new topci (json format), so i am writing my TransformerSupplier, i dont know do i construct the return KStream, i cannot find any example.


code is like

public KStream<String, String> transform(String key, GenericRecord value) {

Thanks
Shannon

Matthias J. Sax

unread,
Oct 5, 2016, 6:09:11 PM10/5/16
to confluent...@googlegroups.com
-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA512

Shannon,

if you want to process each record independently, you might be better
of using KStreams#mapValues() or KStream#map()

#transform() allows to attach a state to the operation -- a feature
you might not need.

About creating KStream. You do not need to create one explicitly. You
just write a function (either map or transform) that computes a single
output record for each input record. Kafka Streams does the rest for you
.

You use it like:

KStream inputStream = KStreamBuilder.stream("topicName");
KStream resultStream = inputStream.map(/* give map function here */);
resultStream.to("outputTopicName");


- -Matthias
> -- You received this message because you are subscribed to the
> Google Groups "Confluent Platform" group. To unsubscribe from this
> group and stop receiving emails from it, send an email to
> confluent-platf...@googlegroups.com
> <mailto:confluent-platf...@googlegroups.com>. To post
> to this group, send email to confluent...@googlegroups.com
> <mailto:confluent...@googlegroups.com>. To view this
> discussion on the web visit
> https://groups.google.com/d/msgid/confluent-platform/a61d6ae8-1b46-4a0
1-b15d-f804d80a8165%40googlegroups.com
>
>
<https://groups.google.com/d/msgid/confluent-platform/a61d6ae8-1b46-4a01
- -b15d-f804d80a8165%40googlegroups.com?utm_medium=email&utm_source=footer
>.
> For more options, visit https://groups.google.com/d/optout.
-----BEGIN PGP SIGNATURE-----
Comment: GPGTools - https://gpgtools.org

iQIcBAEBCgAGBQJX9XoFAAoJECnhiMLycopPyVoQAKaMWAW+PBgwnvwNoWSJzjP8
GGjPKMV8JS8Dqauqr4A6H5/DSgqqYynLYMFhycMj/XQ8tQdVhR4vpySmwDl9mIpD
TeXYzUJOrRiL9HvwLmPkA48Ltt84wUwROOi8nvkiKhDTVMaosPr/SQfXp57nYIEO
fK4yCD4qJ4tLBNcX8G5plx2/JQ7veVM1+wPI7Qgc92VEmln/MH9rEC4tnBAZzHMR
BQ0slHkdDtz4E+0nXrDphSBRVNhQNhlSigdN3hKFcXCOuUG6ROiXBL8iCIad/4Gn
7kto/TZqHN5jKyTG93RU/WDoDbkRJLH2vaezciMYirSKBJS0Xm1bLk2VhWe0seDZ
RFGnFisCSesTQK0o86BIUVeD5NkHHYCA3cochf2EtPGHExyPEOnGFr89XL4WdOAL
CX191sJ2Gkd3WHL6Emz5uM5xRhbDUFMhqj5ThP4LfQThYva9g+WB0vgfQ2WC14a5
tXssM3zMShEQnHsr3FCjAIk1VV8hZEMwz+mVn8dq4/y/sQhLwhTL+HzxCsDlAXBT
x/pS80p0dBmhloqI5GE9ouBXd46jQuIaaBnlI3BylryNzAPXRtFMPQBVMWcxcyWh
/q3jAeYM7gtzMRLVSazFZAkEbN+5DyygPvWWq/OHMlu+WpUk6Qkz/yJk8J3uVt1y
vfXGZ+CC8OO3jf2Kbdks
=hDHi
-----END PGP SIGNATURE-----

Shannon Ma

unread,
Oct 5, 2016, 10:29:23 PM10/5/16
to Confluent Platform
Thanks, i already coded the transform, 

TransformerSupplier<String, GenericRecord, KeyValue<String, String>> transformerSupplier =
           new TransformerSupplier<String, GenericRecord, KeyValue<String, String>>() {
   
               public Transformer<String, GenericRecord, KeyValue<String, String>> get() {
                   return new Transformer<String, GenericRecord, KeyValue<String, String>>() {


and my transform() method, 

 public KeyValue<String, String> transform(String key, GenericRecord value) {




i am getting

java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord


what did i do wrong, basically my input is <String, GenericRecord> output is <String, String>

Shannon

Shannon Ma

unread,
Oct 5, 2016, 10:52:36 PM10/5/16
to Confluent Platform
I figured out, i did not put serde in stream.to(), after put string serde for key and value, it works.

Matthias J. Sax

unread,
Oct 6, 2016, 1:51:06 PM10/6/16
to confluent...@googlegroups.com
-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA512

Great! Glad it works. :)
> -Matthias
>
>
> On 10/5/16 11:54 AM, Shannon Ma wrote:
>> Hi,
>
>> Jut start looking at kafka stream, i have a simple question, i
> just
>> need to transform data from one topic (avro) and publish to a
>> new topci (json format), so i am writing my TransformerSupplier,
>> i
> dont
>> know do i construct the return KStream, i cannot find any
> example.
>
>
>> code is like
>
>> public KStream<String, String> transform(String key,
> GenericRecord
>> value) {
>
>> Thanks Shannon
>
>> -- You received this message because you are subscribed to the
>> Google Groups "Confluent Platform" group. To unsubscribe from
> this
>> group and stop receiving emails from it, send an email to
>> confluent-platf...@googlegroups.com
>> <mailto:confluent-platf...@googlegroups.com>. To
01-b15d-f804d80a8165%40googlegroups.com>
>
>
>
>
> <https://groups.google.com/d/msgid/confluent-platform/a61d6ae8-1b46-4a
01
>
>
<https://groups.google.com/d/msgid/confluent-platform/a61d6ae8-1b46-4a01
>
>
> -
> -b15d-f804d80a8165%40googlegroups.com?utm_medium=email&utm_source=foot
er
>
>
<http://40googlegroups.com?utm_medium=email&utm_source=footer>
>> . For more options, visit https://groups.google.com/d/optout
> <https://groups.google.com/d/optout>.
>
> -- You received this message because you are subscribed to the
> Google Groups "Confluent Platform" group. To unsubscribe from this
> group and stop receiving emails from it, send an email to
> confluent-platf...@googlegroups.com
> <mailto:confluent-platf...@googlegroups.com>. To post
> to this group, send email to confluent...@googlegroups.com
> <mailto:confluent...@googlegroups.com>. To view this
> discussion on the web visit
> https://groups.google.com/d/msgid/confluent-platform/8d94fae7-785a-4de
c-b21e-aff161e32d83%40googlegroups.com
>
>
<https://groups.google.com/d/msgid/confluent-platform/8d94fae7-785a-4dec
- -b21e-aff161e32d83%40googlegroups.com?utm_medium=email&utm_source=footer
>.
> For more options, visit https://groups.google.com/d/optout.
-----BEGIN PGP SIGNATURE-----
Comment: GPGTools - https://gpgtools.org

iQIcBAEBCgAGBQJX9o7/AAoJECnhiMLycopPxG8P/A95II+TlCjDSvb7zgbSE2SC
QC77Zb8xmJtw5AsMyZKT0gaETyC14GVzQCfDxkOR3XxQWGpoeXMyFWusMJJWdrow
Jqx41F7zfqY7ceTM31DsXrYMaS/f8NZPlGhRDZkIXQZ4u8Er4T1bVTUSMUBArTHb
Me2uoF7ofSwXVj/k5WEqqk3/fUuzPtR1xIfUI0C09MKO7bPlHLHesImPJMVhrZsm
fTF5l3sCgzdA8PD5fNpYUuPG2V4FTbVgvH9o0IFyvc3oRpa9N4T19lgRlECAqjPS
bRwop03hW5hIeQNX7ex+RPdfKUfuwOVujRvhXzFN3C5yTeMJybzY2WnzHwnhCfEP
UzwvbGtnL6vcr1ZpBsT1THSPoDIzu5rgH2Mr3jCOda+O9DT2C6PRuNh9YAOkZx5t
o1o+5+T6+9q2vc5fIXNClY2td6qMCcB9j0PAYoa6tmwROw6aEb41jJhAuZgX0KSo
oeAZnq840lRsMZh13GLw0RufJGLp0Ca4EISpT5fLdSDAFmjfYIjtCdQyXvFhNeQT
1IWFJizdhS1wjYz/suZGfPi1m4FqNYtGE2XPtLd/4S+/tIJpBxA96jxDip2SDShR
0zI2HsdstilucvNIN3LjfGdrbN/Z7mo78d/51Z+yZ/KGidwGg9x18gkR1Zc4n0Kl
2Lq5VDCvpVev1cY1L1SU
=iwBW
-----END PGP SIGNATURE-----

Michael Noll

unread,
Nov 28, 2016, 9:34:09 AM11/28/16
to confluent...@googlegroups.com
For future readers of this thread:   Shannon's question was resolved via other means, but in case you are wondering how to use the `KStream#transform()` method -- and similarly, the `KStream#process()` method -- please take a look at the following code example:


The link above is for the `3.1.x` branch of https://github.com/confluentinc/examples, which means it's compatible with Confluent Platform 3.1.x and Kafka 0.10.1.

-Michael



--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/a61d6ae8-1b46-4a01-b15d-f804d80a8165%40googlegroups.com.

For more options, visit https://groups.google.com/d/optout.



--
Michael G. Noll
Product Manager | Confluent
Follow us: Twitter | Blog
Reply all
Reply to author
Forward
0 new messages