Kafka Streams transform: return two records

Shannon Ma

Nov 10, 2016, 11:13:05 AM
to Confluent Platform
Hi,

I am wondering whether this is possible: in my Kafka Streams transform logic, I sometimes need to return two output messages, depending on the input message. So instead of


KeyValue<String, GenericRecord> transform(String key, GenericRecord value)

can I do something like


List<KeyValue<String, GenericRecord>> transform(String key, GenericRecord value)


Thanks
Shannon

Eno Thereska

Nov 10, 2016, 12:39:43 PM
to Confluent Platform
Unfortunately I don't think it's possible currently.

Eno

Eno Thereska

Nov 10, 2016, 12:44:37 PM
to Confluent Platform
I think I might have misunderstood your question. If you have to operate with a return type of KeyValue<String, GenericRecord>, then you have to use that return type. However, if you want to define a different return type (e.g. a list), that's doable, but then you have to keep using that return type. See https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java#L57.

Eno

Michael Noll

Nov 10, 2016, 1:20:01 PM
to confluent...@googlegroups.com
Shannon,

As Eno said, yes, this is possible. The "caveat" that Eno mentioned is that you'd always have to return a list of results, e.g. a list containing a single element when there's just a single output message to return.

Does that answer your question?



--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/670aa53f-1e7b-46dd-a137-e2c4f44996d3%40googlegroups.com.

For more options, visit https://groups.google.com/d/optout.



--
Michael G. Noll
Product Manager | Confluent

Matthias J. Sax

Nov 10, 2016, 2:45:18 PM
to confluent...@googlegroups.com
From my understanding, you can emit as many records as you wish.
However, you need to use ProcessorContext#forward() for this.

The return type of transform() MUST always be "KeyValue".

Thus, you would do something like this:

    new Transformer() {
        private ProcessorContext context;

        void init(ProcessorContext context) {
            this.context = context;
        }

        KeyValue transform(K key, V value) {
            // forward() takes the key and the value as separate arguments
            context.forward(..., ...);
            context.forward(..., ...);

            // No need to emit anything else. You can also return
            // "new KeyValue(...)" here in addition to context.forward().
            return null;
        }
    }

Please let me know if this works.

-Matthias
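
The forward-then-return-null pattern described above can be tried out without a Kafka dependency. The following is only a minimal sketch under stated assumptions: KeyValue, ForwardingContext, SplittingTransformer, and the comma-splitting rule are hypothetical stand-ins that mimic the shape of the Kafka Streams types, chosen so the code runs with the JDK alone. In a real application you would implement the library's Transformer interface and call forward() on the ProcessorContext passed to init(); check the Javadoc of your Kafka version for the exact signatures.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in for Kafka Streams' KeyValue (assumption: not the real class).
final class KeyValue<K, V> {
    final K key;
    final V value;

    KeyValue(K key, V value) {
        this.key = key;
        this.value = value;
    }
}

// Local stand-in for the part of ProcessorContext used here:
// key and value are passed as separate arguments.
interface ForwardingContext {
    void forward(String key, String value);
}

// Hypothetical transformer: emits two records when the value contains a comma.
final class SplittingTransformer {
    private ForwardingContext context;

    void init(ForwardingContext context) {
        this.context = context;
    }

    KeyValue<String, String> transform(String key, String value) {
        int comma = value.indexOf(',');
        if (comma < 0) {
            // Single output: use the ordinary return value.
            return new KeyValue<>(key, value);
        }
        // Two outputs: push each one downstream explicitly.
        context.forward(key, value.substring(0, comma));
        context.forward(key, value.substring(comma + 1));
        return null; // nothing further to emit
    }
}

final class ForwardDemo {
    // Runs one input through the transformer and collects everything emitted.
    static List<String> run(String key, String value) {
        List<String> out = new ArrayList<>();
        SplittingTransformer transformer = new SplittingTransformer();
        transformer.init((k, v) -> out.add(k + "=" + v));
        KeyValue<String, String> returned = transformer.transform(key, value);
        if (returned != null) {
            out.add(returned.key + "=" + returned.value);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run("k", "a,b"));
        System.out.println(run("k", "a"));
    }
}
```

The return type stays a single KeyValue in both branches; only the number of forward() calls varies with the input.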



Warren Kiser

Nov 10, 2016, 3:30:45 PM
to Confluent Platform
Maybe I'm misunderstanding the question, but wouldn't flatMap be a good candidate for this use case?

Matthias J. Sax

Nov 10, 2016, 6:49:23 PM
to confluent...@googlegroups.com
It might be. The difference is that flatMap() is stateless, while
transform() lets you attach state to the operator.

If no state is needed, flatMap() would be the better choice.


-Matthias
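
For the stateless case, the one-to-many logic reduces to a function that returns a collection of key-value pairs. The sketch below is a minimal illustration under stated assumptions: FlatMapDemo, split, and the comma-splitting rule are hypothetical, and Map.Entry stands in for Kafka Streams' KeyValue so that the code runs with the JDK alone. In Kafka Streams, a function of this shape would be passed to KStream#flatMap, whose mapper returns an Iterable of KeyValue; consult the Javadoc of your version for the exact generic signature.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

final class FlatMapDemo {
    // Stateless one-to-many mapping: returns zero, one, or several pairs.
    // Map.Entry is a stand-in for KeyValue (assumption for this sketch).
    static List<Map.Entry<String, String>> split(String key, String value) {
        if (value.isEmpty()) {
            return Collections.emptyList(); // emit nothing for empty input
        }
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (String part : value.split(",")) {
            out.add(new SimpleImmutableEntry<>(key, part));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(split("k", "a,b"));
        System.out.println(split("k", "a"));
    }
}
```

A single output is simply a one-element list, which matches the caveat raised earlier in the thread about always returning a list.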




Shannon Ma

Nov 11, 2016, 10:11:35 AM
to Confluent Platform
Thanks all. Right now it is stateless, so I am going to take a look at flatMap(). Is there an example I can look at?

Shannon 

Damian Guy

Nov 11, 2016, 10:30:41 AM
to Confluent Platform
