Accessing SourceQueue or ActorRef inside Graph for publishing elements

Muki

Nov 22, 2015, 10:49:14 AM
to Akka User List
Hi,

I have a stream pipeline like this:

progress queue    +---------> mapping  +---------+
                                                 |
   ^    ^    ^                                   |
   |    |    |                                   v
   |    |    +------------------------+          +-- merge +--->
   |    |                             |          ^
   |    +---------+                   |          |
   +              +                   +          |
data  +--------> stage1   +--------> stage2 +----+


  • data is a single source that generates the data to be processed
  • progress queue is a Source.queue[Progress](10, OverflowStrategy.dropHead) where the progress should be emitted
  • stage1 and stage2 emit elements to the next stage and also to the progress queue

So these are actually two different data pipelines, which are only connected by the progress queue and the merge in the end.
The initial implementation just sent messages to a progress actor, which then sent them over a Play WebSocket. Now I want to do this
with akka-http, where the WebSocket is described as a Flow[Message, Message].
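
For readers following along, the "stages also emit to a progress queue" setup described above can be sketched as follows. This is only an illustration under stated assumptions: it relies on `preMaterialize()` and on the implicit ActorSystem-to-Materializer conversion, both of which come from later Akka releases (2.6-era) than the milestone discussed in this thread. The `Progress` case class, the percentages, and the stage bodies are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical progress message; the thread does not show the real type.
final case class Progress(percent: Int, message: String)

object QueueDemo {
  def run(): (Seq[Int], Seq[Progress]) = {
    implicit val system: ActorSystem = ActorSystem("queue-demo")
    try {
      // Materialize the queue up front so ordinary stages can hold a reference to it.
      val (queue, progressSource) =
        Source.queue[Progress](10, OverflowStrategy.dropHead).preMaterialize()
      val progressSeen = progressSource.runWith(Sink.seq)

      // Hypothetical stage1 and stage2: each does its work and offers a progress message.
      val data = Source(1 to 3)
        .map { n => queue.offer(Progress(50, s"stage1 finished $n")); n + 1 }
        .map { n => queue.offer(Progress(100, s"stage2 finished $n")); n * 10 }

      val results = Await.result(data.runWith(Sink.seq), 5.seconds)
      queue.complete() // lets the progress stream finish once the data is done
      (results, Await.result(progressSeen, 5.seconds))
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Offering to the queue is a side effect outside the stream's backpressure, which is why `dropHead` is used here: a slow progress consumer loses old messages instead of slowing down the data pipeline.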

I read "Accessing the materialized value inside the Graph", but couldn't figure out how to implement this kind of pattern.

Thanks in advance,
Muki

Akka Team

Nov 23, 2015, 6:33:28 AM
to Akka User List
Hi Muki,



That section is unlikely to help you, as it only lets you access the materialized value as an element of a Source. What you actually need is to model your progress-emitting stages as GraphStages instead. These let you expose a second output port for the sole purpose of sending progress information. They are available in M1 but unfortunately not really documented. This also means that your stage1 and stage2 will no longer be Flows but three-port stages, so you will need to use the graph DSL to connect them.
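
A rough sketch of what that wiring could look like with the graph DSL. It assumes the 2.6-era `GraphDSL` API rather than M1, and it uses the built-in `UnzipWith` as a stand-in for the custom three-port stages, so each element yields exactly one progress message here. The string messages and the arithmetic are made up for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.SourceShape
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Sink, Source, UnzipWith}
import scala.concurrent.Await
import scala.concurrent.duration._

object WiringDemo {
  // UnzipWith stands in for a custom one-input, two-output stage:
  // out0 carries the result, out1 carries a progress message.
  val pipeline: Source[String, NotUsed] = Source.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    val stage1 = b.add(UnzipWith[Int, Int, String]((n: Int) => (n + 1, s"stage1 done for $n")))
    val stage2 = b.add(UnzipWith[Int, Int, String]((n: Int) => (n * 10, s"stage2 done for $n")))
    val progress = b.add(Merge[String](2)) // collects progress from both stages
    val output = b.add(Merge[String](2))   // final merge of results and progress

    Source(1 to 3) ~> stage1.in
    stage1.out0 ~> stage2.in
    stage1.out1 ~> progress.in(0)
    stage2.out1 ~> progress.in(1)
    stage2.out0 ~> Flow[Int].map(n => s"result: $n") ~> output.in(0)
    progress.out ~> Flow[String].map(p => s"progress: $p") ~> output.in(1)
    SourceShape(output.out)
  })

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("wiring-demo")
    try Await.result(pipeline.runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

The relative order of results and progress messages in the merged output is not guaranteed; only the order within each input of a Merge is preserved.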

-Endre
 

--
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam

Muki

Nov 29, 2015, 8:10:24 AM
to Akka User List
Hi Endre,

Thanks for the hint with GraphStage. I tried to use the code from UnzipWith2 and adjust it to my needs, but the API was not really clear to me.
When should I call pull, and what effect does it have? I wasn't able to create a queue inside the GraphStage (e.g. like a Source that gets flattened into the progress output).

In the end I had the feeling that I was just building a Broadcast stage, e.g.:

                                    +---> map to progress +--->  progress sink
                                    |
                                    +
data  +----> stage1 +----> broadcast
                                    +
                                    |
                                    +------------------------------------------------> stage2


which only works for one progress output per stage. However, I would like to be able to output more than one progress update in one stage, e.g.:

override def onPush(elem: TrajectoryBoxes, ctx: Context[RotatedTrajectoryBoxes]): SyncDirective = {
   val angle = calculateAngle(elem)
   progress(25, "angles calculated")
   val rotated = rotate(elem, angle)
   progress(100, "boxes rotated")
   ctx.push(rotated)
}


Endre Varga

Nov 30, 2015, 5:31:14 AM
to akka...@googlegroups.com
Hi,


On Sun, Nov 29, 2015 at 2:10 PM, Muki <nepomuk...@gmail.com> wrote:
> Hi Endre,
>
> Thanks for the hint with GraphStage. I tried to use the code from UnzipWith2 and adjust it to my needs, but the API was not really clear to me.
> When should I call pull, and what effect does it have?

This is backpressured processing. If you want an element, you have to request it, otherwise you get nothing. Pulling is requesting the next element. Pushing is delivering the next element.
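
To make the pull/push cycle concrete, here is a minimal sketch of a one-input, one-output GraphStage. It assumes the GraphStage API as it exists in later stable Akka releases (2.6-era), which may differ in detail from M1; the `Doubler` stage itself is a made-up example.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stage; it exists only to show where pull and push happen.
final class Doubler extends GraphStage[FlowShape[Int, Int]] {
  val in: Inlet[Int] = Inlet("Doubler.in")
  val out: Outlet[Int] = Outlet("Doubler.out")
  override val shape: FlowShape[Int, Int] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(out, new OutHandler {
        // Downstream requested an element: pass the request on to upstream.
        override def onPull(): Unit = pull(in)
      })
      setHandler(in, new InHandler {
        // Upstream delivered the requested element: deliver it downstream.
        override def onPush(): Unit = push(out, grab(in) * 2)
      })
    }
}

object DoublerDemo {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("doubler-demo")
    try Await.result(Source(1 to 3).via(new Doubler).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Nothing flows until the sink pulls: each `onPull` on the output triggers exactly one `pull` on the input, and each `onPush` on the input answers with exactly one `push` on the output.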

 
> I wasn't able to create a queue inside the GraphStage (e.g. like a Source that gets flattened into the progress output).

I am totally confused here. What are you trying to flatten? How does this relate to having a monitor output?

The idea of the GraphStage is to have a stage with one input and two output ports, one of them to be connected to the progress tracker. 
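
A sketch of such a stage, under the assumption of the 2.6-era GraphStage API (`FanOutShape2`, `emit` with a continuation), which may not match M1 exactly. `Progress`, `TwoStepWithProgress`, the two step functions, and the percentages are hypothetical names chosen for illustration. The stage reports progress twice per element and only then emits the result.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{Attributes, FanOutShape2, SourceShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical progress message; the thread does not show the real type.
final case class Progress(percent: Int, message: String)

// Hypothetical stage: out0 carries results, out1 carries progress updates.
final class TwoStepWithProgress[A, M, B](step1: A => M, step2: M => B)
    extends GraphStage[FanOutShape2[A, B, Progress]] {
  override val shape: FanOutShape2[A, B, Progress] =
    new FanOutShape2[A, B, Progress]("TwoStepWithProgress")
  private val in = shape.in
  private val out = shape.out0
  private val progressOut = shape.out1

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private var busy = false         // an element is still being processed or emitted
      private var upstreamDone = false // upstream finished while we were busy

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          busy = true
          val intermediate = step1(grab(in))
          // emit waits for demand on the port, then runs the continuation.
          emit(progressOut, Progress(25, "step 1 done"), () => {
            val result = step2(intermediate)
            emit(progressOut, Progress(100, "step 2 done"), () =>
              emit(out, result, () => {
                busy = false
                if (upstreamDone) completeStage()
              }))
          })
        }
        // Completing while emits are pending would drop them, so defer it.
        override def onUpstreamFinish(): Unit =
          if (busy) upstreamDone = true else completeStage()
      })
      // Demand for a result becomes demand for the next input element.
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
      // Progress demand is remembered by the port and consumed by emit.
      setHandler(progressOut, new OutHandler {
        override def onPull(): Unit = ()
      })
    }
}

object ProgressDemo {
  type Event = Either[Progress, Int]
  val merged: Source[Event, NotUsed] = Source.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    val stage = b.add(new TwoStepWithProgress[Int, Int, Int](_ + 1, _ * 10))
    val merge = b.add(Merge[Event](2))
    Source(1 to 3) ~> stage.in
    stage.out0 ~> Flow[Int].map(r => Right(r): Event) ~> merge.in(0)
    stage.out1 ~> Flow[Progress].map(p => Left(p): Event) ~> merge.in(1)
    SourceShape(merge.out)
  })

  def run(): Seq[Event] = {
    implicit val system: ActorSystem = ActorSystem("progress-demo")
    try Await.result(merged.runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }
}
```

In this sketch a slow progress consumer backpressures the whole stage, because the result is only emitted after both progress messages have been delivered. Putting something like `.buffer(10, OverflowStrategy.dropHead)` on the progress output would decouple the two, at the cost of losing old progress messages.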
 

> In the end I had the feeling that I was just building a Broadcast stage, e.g.:
>
>                                     +---> map to progress +--->  progress sink
>                                     |
>                                     +
> data  +----> stage1 +----> broadcast
>                                     +
>                                     |
>                                     +------------------------------------------------> stage2
>
>
> which only works for one progress output per stage. However, I would like to be able to output more than one progress update in one stage, e.g.:
>
> override def onPush(elem: TrajectoryBoxes, ctx: Context[RotatedTrajectoryBoxes]): SyncDirective = {
>    val angle = calculateAngle(elem)
>    progress(25, "angles calculated")
>    val rotated = rotate(elem, angle)
>    progress(100, "boxes rotated")
>    ctx.push(rotated)
> }



The code above is not a GraphStage but a PushPullStage. We have no documentation for GraphStage yet, but M2 will arrive soon and will include it.

-Endre

薛永飞

Feb 12, 2018, 8:44:09 AM
to Akka User List
Use the zip method.