How can I create an Akka stream from a native Java Queue?


Bishnu Shankar Pandey

Nov 6, 2019, 10:14:32 AM
to Akka User List
Hi All,

My use case: I want to create an Akka stream whose source is a Java Queue. The stream should keep polling values from the queue and, when the queue is empty, wait until new values arrive. The Akka stream queue is another option, but in case of a failure I want to store the values that are still in the queue (I don’t know how to do that with the Akka stream queue). I tried the following:

val source: Source[String, NotUsed] = Source.from(queue)
source.ask(1, actor, classOf[String], 10 seconds).runWith(Sink.ignore(), mat)

I tried setting the idleTimeout and keepAlive properties, but they do not help: the stream completes (goes to the Done state) when there are no values in the queue.



Regards,

Bishnu

Felix Nensa

Nov 6, 2019, 10:24:50 AM
to akka...@googlegroups.com
You could wrap your queue in a GraphStage, which will give you a lot of flexibility.
There are a lot of good examples (including some with queues) at https://doc.akka.io/docs/akka/current/stream/stream-customize.html#custom-processing-with-graphstage

BR, Felix
> --
> *****************************************************************************************************
> ** New discussion forum: https://discuss.akka.io/ replacing akka-user google-group soon.
> ** This group will soon be put into read-only mode, and replaced by discuss.akka.io
> ** More details: https://akka.io/blog/news/2018/03/13/discuss.akka.io-announced
> *****************************************************************************************************
> >>>>>>>>>>
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
> To view this discussion on the web visit https://groups.google.com/d/msgid/akka-user/7b86524e-a5a1-4eca-9a7b-44377c122dff%40googlegroups.com.




Bishnu Shankar Pandey

Nov 6, 2019, 1:11:46 PM
to Akka User List
Hi Felix,
Thank you for the response. I went through the link you shared, but in the documentation example the queue is used as a flow. In my use case, I want to use the queue as an Akka stream source and then send its elements to an actor. It would be very helpful if you could write a code snippet showing how to implement that.

Regards,
Bishnu

Felix Nensa

Nov 7, 2019, 4:44:22 AM
to akka...@googlegroups.com
Well, a GraphStage has a shape which you can define. If you define your GraphStage to have an Outlet but no Inlet, then it obviously becomes a Source, right?
With Source.fromGraph(..your GraphStage ..) you can convert it into a Source.
Or just look at the "NumbersSource" example in https://doc.akka.io/docs/akka/current/stream/stream-customize.html#custom-processing-with-graphstage
The name "NumbersSource" even tells you that this GraphStage is a Source.
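
For illustration, here is a minimal sketch of what such a source-shaped GraphStage could look like in Scala. The names JavaQueueSource and pollInterval are made up for this example and do not come from the Akka docs. It assumes Akka 2.6 (where an implicit ActorSystem provides the materializer) and a thread-safe queue such as ConcurrentLinkedQueue. Instead of completing when the queue is empty, the stage schedules a timer and polls again later:

```scala
import java.util.{Queue => JQueue}
import java.util.concurrent.ConcurrentLinkedQueue

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler, TimerGraphStageLogic}

// Hypothetical stage: one Outlet, no Inlet, so its shape is a SourceShape.
// The queue must be thread-safe, because producers run on other threads.
final class JavaQueueSource[A](queue: JQueue[A], pollInterval: FiniteDuration)
    extends GraphStage[SourceShape[A]] {

  val out: Outlet[A] = Outlet("JavaQueueSource.out")
  override val shape: SourceShape[A] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with OutHandler {

      // Emit the head of the queue if there is one; otherwise try again later.
      // completeStage() is never called here, so an empty queue does not
      // end the stream.
      private def pushOrWait(): Unit =
        Option(queue.poll()) match { // poll() is non-blocking, null when empty
          case Some(elem) => push(out, elem)
          case None       => scheduleOnce("poll", pollInterval)
        }

      override def onPull(): Unit = pushOrWait()

      // The timer is only scheduled while a pull is outstanding.
      override protected def onTimer(timerKey: Any): Unit =
        if (isAvailable(out)) pushOrWait()

      setHandler(out, this)
    }
}

object JavaQueueSourceDemo extends App {
  implicit val system: ActorSystem = ActorSystem("queue-demo")

  val queue = new ConcurrentLinkedQueue[String]()
  queue.add("a")

  val result = Source
    .fromGraph(new JavaQueueSource[String](queue, 100.millis))
    .take(2) // only so the demo terminates; drop this for an endless stream
    .runWith(Sink.seq)

  queue.add("b") // added while the stream is already running

  println(Await.result(result, 5.seconds))
  system.terminate()
}
```

Polling with a timer is a simple choice rather than the only one; the interval trades latency against wake-ups while idle. Because the elements stay in the caller's own queue until the stage polls them, whatever is still in the queue after a stream failure remains available to the caller.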

Bishnu Shankar Pandey

Nov 7, 2019, 4:55:00 AM
to Akka User List
I saw the example. I tried implementing it with a queue, but the issue I am facing is that if the queue stays empty for some time, the stream completes. Is there any way to create a continuous stream source that, when the queue is empty, waits until it gets an element and then sends it to the actor?

Brian Maso

Nov 11, 2019, 10:36:28 AM
to akka...@googlegroups.com
Have you seen Source.queue? Does that not get you what you are describing?

Brian Maso
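
As a rough sketch of what Source.queue usage could look like in Scala: the buffer size of 16 and the backpressure strategy are illustrative choices, and the snippet assumes Akka 2.6 (implicit ActorSystem as materializer). The stream stays open while no elements are offered and only ends when complete() is called on the materialized queue:

```scala
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Keep, Sink, Source}

object SourceQueueDemo extends App {
  implicit val system: ActorSystem = ActorSystem("source-queue-demo")
  import system.dispatcher

  // Materializes a queue handle (for producers) and a completion future
  val (queue, done) =
    Source
      .queue[String](16, OverflowStrategy.backpressure)
      .toMat(Sink.foreach(println))(Keep.both)
      .run()

  // With the backpressure strategy, wait for each offer to finish
  // before making the next one.
  queue
    .offer("first")
    .flatMap(_ => queue.offer("second"))
    .foreach(_ => queue.complete()) // only to end the demo

  done.onComplete(_ => system.terminate())
}
```

Note that elements buffered inside Source.queue live in the stream itself, so this sketch does not address persisting them after a failure.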
