ActorPublisher GraphStage alternative?

Curt Siffert

May 26, 2017, 4:34:04 PM
to Akka User List

Hi, I see in the docs for 2.5.2 that ActorPublisher/ActorSubscriber will be deprecated.

In my (still beginning) experiments with Akka Streams I used ActorPublisher as a way to help create some back pressure controls while consuming messages from an external queue. This worked just by consuming the queue as normal and then, for each message consumed, sending a message to the ActorPublisher.

Without using ActorPublisher, I can use a Source.actorRef, but that doesn't have back pressure controls.

I know the recommended alternative to ActorPublisher is to use a custom graph stage and I have started experimenting with that but so far I don't see how to meet the ActorPublisher use case with it. So far it doesn't seem like a custom Source has an ActorRef type signature like Source.actorRef does. Once the custom stage is created, can I send a "tell" message to it the way I did to ActorPublisher? Or am I supposed to use Source.actorRef and then funnel it through the custom stage to get the back pressure controls?

Sorry if my question is muddled, I am still making my way through this. :-) I recognize this is a bit weird since ideally the back pressure controls would be applied to the queueing tech itself.

Curt

Richard Rodseth

May 27, 2017, 10:29:48 AM
to akka...@googlegroups.com

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Patrik Nordwall

May 28, 2017, 6:20:10 AM
to akka...@googlegroups.com
Can't you use Source.queue? Backpressure can be maintained by piping the result of the future back to an ordinary actor.

/Patrik
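
The Source.queue suggestion above could be sketched roughly as follows. This is only an illustration, not code from this thread: the QueueFeeder actor, its constructor parameters and the SourceQueueDemo object are hypothetical names, and it assumes Akka 2.5.x (hence ActorMaterializer). The actor offers one element at a time and pipes the Future[QueueOfferResult] back to itself, so the next element is offered only after the stream has accepted the previous one.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.pipe
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical actor standing in for the code that consumes the external queue.
class QueueFeeder(items: List[String], result: Promise[Seq[String]]) extends Actor {
  import context.dispatcher // ExecutionContext needed by pipeTo
  implicit val mat: ActorMaterializer = ActorMaterializer() // bound to this actor's lifecycle

  // Small buffer plus OverflowStrategy.backpressure: the offer future
  // completes only once the element has been accepted by the stream.
  private val (queue, done) =
    Source.queue[String](1, OverflowStrategy.backpressure)
      .toMat(Sink.seq)(Keep.both)
      .run()
  result.completeWith(done)

  private var remaining = items

  override def preStart(): Unit = offerNext()

  private def offerNext(): Unit = remaining match {
    case head :: tail =>
      remaining = tail
      queue.offer(head).pipeTo(self) // the result comes back as an ordinary message
    case Nil =>
      queue.complete()
  }

  def receive: Receive = {
    case QueueOfferResult.Enqueued => offerNext() // accepted, safe to offer the next one
    case _ => context.stop(self) // dropped, closed or failed: give up in this sketch
  }
}

object SourceQueueDemo {
  def run(): Seq[String] = {
    val system = ActorSystem("queue-demo")
    try {
      val result = Promise[Seq[String]]()
      system.actorOf(Props(new QueueFeeder(List("a", "b", "c"), result)))
      Await.result(result.future, 5.seconds)
    } finally system.terminate()
  }
}
```

In a real consumer, offerNext() would presumably fetch the next message from the external queue instead of walking a list, so the external queue is only polled as fast as the stream accepts elements.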
Curt Siffert

Jun 3, 2017, 12:16:05 AM
to akka...@googlegroups.com
I made some progress figuring out how to use getStageActor (not getStageActorRef - the docs have an error) - the stage’s preStart() registers the stage’s actorRef with an outside actor. Messages forwarded to that actorRef (via the outside actor) will get sent to a callback function inside the stage.

What I’m doing is using the callback to add items to a stage’s internal queue, and then using onPull to dequeue that queue if there are items in the queue.

The part that is confusing to me is that when applied to a Source, I haven’t figured out how to get onPull to trigger from that callback. onPull only seems to fire right after materialization, when the queue is still empty, and then it doesn’t pull again after receiving messages and queuing, even though I have a stream wired up to process what the Source emits.

ActorPublisher was originally written to suggest that after creating the Source, you could then send messages to the actor in a periodic fashion and the stream would be able to process those messages. But so far I haven’t been able to duplicate that behavior. It’s as if the only way to use getStageActor is if its GraphStage is a FlowShape.

In that case, I suppose I could use Source.queue but it seems like I’m missing something. Is it not possible to use getStageActor on a Source? If anyone has code samples of getStageActor on a Source, I’d love to see it.

Thanks,
Curt


Curt Siffert

Jun 4, 2017, 3:12:20 PM
to akka...@googlegroups.com
I believe I have figured this out, thanks to finding a getStageActor example in the reactive-kafka project. The insight I missed is that onPull() is only called sometimes (I assume when the consumer signals readiness by pulling), and that push() should also be called from the callback function when there is outstanding demand. I will link to a code sample for others looking for a smaller getStageActor example after I clean it up a bit.

Curt
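
For readers who want something concrete, the pattern described in the last two posts might look roughly like the sketch below. It is not the code sample referred to in this thread and not the reactive-kafka code: Register, Forwarder, ActorFedSource and StageActorDemo are hypothetical names, the element type is fixed to String for brevity, and Akka 2.5.x is assumed. The stage registers its stage actor with an outside actor in preStart(), buffers incoming messages, and pushes both from onPull() and from the message callback when the outlet has already been pulled.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.stream.{ActorMaterializer, Attributes, Outlet, SourceShape}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import scala.collection.mutable
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical protocol: the stage announces its stage-actor ref to an outside actor.
final case class Register(stageRef: ActorRef)

// Hypothetical outside actor: holds elements until the stage registers, then forwards them.
class Forwarder extends Actor {
  private var pending = Vector.empty[String]
  def receive: Receive = {
    case Register(ref) =>
      pending.foreach(s => ref ! s)
      pending = Vector.empty
      context.become { case s: String => ref ! s }
    case s: String => pending :+= s
  }
}

final class ActorFedSource(forwarder: ActorRef) extends GraphStage[SourceShape[String]] {
  val out: Outlet[String] = Outlet("ActorFedSource.out")
  override val shape: SourceShape[String] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Unbounded for brevity: this buffers, it does not slow the sender down.
      private val buffer = mutable.Queue.empty[String]

      override def preStart(): Unit =
        forwarder ! Register(getStageActor(onMessage).ref)

      // Runs inside the stage for every message sent to the stage actor.
      private def onMessage(msg: (ActorRef, Any)): Unit = msg match {
        case (_, elem: String) =>
          buffer.enqueue(elem)
          // onPull may have fired earlier while the buffer was empty;
          // if that demand is still outstanding, emit right away.
          if (isAvailable(out)) push(out, buffer.dequeue())
        case _ => // ignore anything else in this sketch
      }

      setHandler(out, new OutHandler {
        // Called once per downstream pull; emits only if something is buffered.
        override def onPull(): Unit =
          if (buffer.nonEmpty) push(out, buffer.dequeue())
      })
    }
}

object StageActorDemo {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("stage-actor-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val forwarder = system.actorOf(Props(new Forwarder))
      val done = Source.fromGraph(new ActorFedSource(forwarder)).take(3).runWith(Sink.seq)
      Seq("a", "b", "c").foreach(s => forwarder ! s)
      Await.result(done, 5.seconds)
    } finally system.terminate()
  }
}
```

Note that with an unbounded internal buffer the sender is never slowed down; getting real back pressure to the producer would need something extra, such as an acknowledgement message sent back from the stage, or the Source.queue approach suggested earlier in the thread.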

Curt Siffert

Jun 4, 2017, 6:32:13 PM
to akka...@googlegroups.com
Here’s a code sample showing a possible usage of getStageActor.
