akka-stream: How to define a Processor in Java

Guofeng Zhang

Apr 13, 2016, 2:58:45 AM
to akka...@googlegroups.com
Hi,

I am learning akka-stream and want to understand the low-level details. I found the following post very useful for my case:

In this post, the processor is defined in Scala as follows:
class DoublingProcessor extends ActorSubscriber with ActorPublisher[BigInteger] { ....}

My question is:
     How do I define the above processor in Java?

Thanks for your help.

Guofeng
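
For readers following along, the general shape of such a doubling processor can be sketched in plain Java. This is only a sketch under stated assumptions: it uses the JDK's own java.util.concurrent.Flow interfaces (JDK 9 or later), which mirror the Reactive Streams Processor contract, and not Akka's ActorSubscriber/ActorPublisher classes from the post. The class body and the DoublingDemo helper are hypothetical illustrations, not code from the linked post.

```java
import java.math.BigInteger;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: doubles every BigInteger it receives and republishes it. */
class DoublingProcessor extends SubmissionPublisher<BigInteger>
        implements Flow.Processor<BigInteger, BigInteger> {

    private Flow.Subscription upstream;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.upstream = subscription;
        subscription.request(1); // signal demand for the first element
    }

    @Override
    public void onNext(BigInteger item) {
        // submit() blocks while a downstream subscriber's buffer is full
        submit(item.multiply(BigInteger.valueOf(2)));
        upstream.request(1); // ask for the next element only after this one is handled
    }

    @Override
    public void onError(Throwable error) {
        closeExceptionally(error); // propagate the failure downstream
    }

    @Override
    public void onComplete() {
        close(); // propagate completion downstream
    }
}

/** Hypothetical helper that wires source -> processor -> collecting subscriber. */
class DoublingDemo {

    static List<BigInteger> doubleAll(List<BigInteger> inputs) {
        List<BigInteger> results = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<BigInteger> source = new SubmissionPublisher<>()) {
            DoublingProcessor processor = new DoublingProcessor();

            // Attach the downstream subscriber first so no element is missed.
            processor.subscribe(new Flow.Subscriber<BigInteger>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(BigInteger item) { results.add(item); }
                @Override public void onError(Throwable error) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });

            source.subscribe(processor);
            inputs.forEach(source::submit);
        } // closing the source sends onComplete through the processor

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("stream did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(doubleAll(List.of(BigInteger.ONE, BigInteger.valueOf(21))));
    }
}
```

The processor requests one element at a time, so it never receives more than it has asked for. An Akka-based version would express the same demand signalling through Akka's own APIs.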

Johan Andrén

Apr 15, 2016, 2:16:13 PM
to Akka User List
Hi Guofeng,

I would recommend reading up on GraphStage rather than ActorSubscriber and ActorPublisher, as it is a much more straightforward way to create custom stages. The docs have a fairly thorough walkthrough of how it works, with code samples in Java for GraphStage: http://doc.akka.io/docs/akka/2.4.4/java/stream/stream-customize.html

--
Johan Andrén
Akka Team, Lightbend Inc.

Guofeng Zhang

Apr 17, 2016, 1:38:10 AM
to akka...@googlegroups.com
Johan,

I read it, but the "Integration with actors" part is only a stub, with no sample showing how to do it. Is there a link or an akka-stream test case that demonstrates it in detail?

Another question: the docs say that actors are created to run a stream. So if I use akka-stream, do I still need to write actors for a stream myself?

Thank you very much for your reply!

Guofeng




--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Johan Andrén

Apr 17, 2016, 5:58:09 AM
to akka...@googlegroups.com
In general you should seldom have to write your own stages, as a very rich set of built-in stages is provided. For example, if you want to perform a request-response exchange with an actor, you can use mapAsync to interact with the actor from a stream. Something like this:

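import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

// The ask pattern (?) needs an implicit Timeout in scope; 5 seconds is an arbitrary choice
implicit val timeout: Timeout = 5.seconds

val myProcessingActor: ActorRef = ???
val mySource: Source[Thing, NotUsed] = ???

val responses: Source[Response, NotUsed] = 
  mySource.mapAsync(1)(thing => (myProcessingActor ? Request(thing)).mapTo[Response])

responses.runForeach(println)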

The first parameter to mapAsync specifies the maximum number of concurrent ongoing requests. If there is only one actor, there is no need to allow more than one outstanding request, since additional requests would just queue up in the actor's mailbox.

When you materialize a stream using run, it will run inside one or more actors, but that is an internal implementation detail. The only part of it you will see is that you have to provide an ActorSystem to the ActorMaterializer, which is needed to materialize a flow.

--
Johan Andrén
Akka Team, Lightbend Inc.

