[akka-streams] Source and unfold


Barys Ilyushonak

Nov 20, 2015, 10:09:38 AM
to Akka User List
Hi there, 

I would like to create a Source that produces data from an initial state while data is available, and then terminates at the end. Something like play.api.libs.iteratee.Enumerator.unfold, where an Option is used to mark that the value has been completely unfolded.

The use case: loading data from a database in chunks.

The question is: what is the way to do this in akka-streams?
Is Source.actorPublisher() with a custom ActorPublisher implementation the only 'proper' way to do it?
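For reference, later Akka releases added a Source.unfold that closely mirrors Enumerator.unfold: the function returns None to complete the stream. The sketch below assumes Akka 2.6 on Scala 2.13; fetchChunk and its in-memory table are hypothetical stand-ins for a chunked database query.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("unfold-chunks")

// Hypothetical stand-in for a database query: up to `limit` rows starting at `offset`.
val table: Vector[Int] = Vector.tabulate(7)(i => i * 10)
def fetchChunk(offset: Int, limit: Int): Vector[Int] = table.slice(offset, offset + limit)

// The state is the next offset; an empty chunk ends the stream (None), like Enumerator.unfold.
val rows = Source
  .unfold(0) { offset =>
    val chunk = fetchChunk(offset, 3)
    if (chunk.isEmpty) None else Some((offset + chunk.size, chunk))
  }
  .mapConcat(identity) // flatten each chunk into individual rows

val loaded: Seq[Int] = Await.result(rows.runWith(Sink.seq), 5.seconds)
system.terminate()
```

Each chunk is requested only when downstream demand reaches the unfold stage, so backpressure limits how far ahead of the consumer the queries run.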

Cheers, 
Boris



Tim Harper

Nov 22, 2015, 3:01:44 PM
to Akka User List
I don't fully understand your question, Barys, but I think what you are asking is "I've got data from a database that I'd like to make available via a stream. When the stream is done consuming, I'd like to close this database connection".

You reference play.api.libs.iteratee.Enumerator.unfold, which is a synchronous method to pull data from a result-set. This runs against the mantra of akka-streams; blocking is something you generally tend to avoid. However, if blocking is unavoidable, or if you've already got the result-set in memory, you could just wrap your source in an iterator, and then do this:

def getResultsAsIterator(/*...*/): Iterator[T]

Source(() => getResultsAsIterator(/* ...*/))

(Since iterators are stateful, you must provide a function that creates a new iterator instance, because a given Akka stream can be run multiple times.)
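In later Akka releases the Source(() => iterator) overload became Source.fromIterator. A minimal sketch, assuming Akka 2.6 and a hypothetical getResultsAsIterator, showing why the factory matters: each run of the same Source calls it and gets its own iterator.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("iterator-source")

// Hypothetical stand-in for a result-set reader; every call yields a fresh Iterator.
def getResultsAsIterator(): Iterator[String] = Iterator("a", "b", "c")

// Pass a factory, not an iterator instance: each materialization gets its own cursor.
val results = Source.fromIterator(() => getResultsAsIterator())

val firstRun = Await.result(results.runWith(Sink.seq), 5.seconds)
val secondRun = Await.result(results.runWith(Sink.seq), 5.seconds) // not empty: new iterator
system.terminate()
```

Had the Source wrapped one shared iterator instead, the second run would see it already exhausted.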

(Also, Enumerator.unfold may provide some special thread-pooling behavior that I'm not aware of and that akka-stream doesn't, so my suggestion that a standard iterator is equivalent may be incorrect.)

If your data is pulled asynchronously, an actor-backed Source (Source.actorPublisher with an ActorPublisher) is probably your best bet (a custom GraphStage implementation would probably also work, although its documentation is presently lacking). ActorPublishers are relatively simple to implement and reason about, and quite flexible in what you can do with them.

I recently wanted to stream results asynchronously from Cassandra, and implementing an ActorPublisher worked very well for it. (In this case, I don't close the result set on termination or stream abort, because Cassandra does not require you to close your sources. Also, when my actor is initialized, the resultSet isn't ready yet, so I use the stash pattern to wait for the resultSet to become available, and then proceed to emit the results.)
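A different option from the ActorPublisher described above: later Akka releases also offer Source.unfoldAsync, whose function returns a Future, which may cover some asynchronous paging cases without a hand-written publisher (ActorPublisher itself was later deprecated). The sketch assumes Akka 2.6 on Scala 2.13; fetchPage is a hypothetical simulated driver call that returns a page of rows and the next page token.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("unfold-async")
import system.dispatcher // ExecutionContext for the simulated driver call

// Hypothetical async driver call: a page of rows plus the next page token, if any.
def fetchPage(token: Int): Future[(Seq[String], Option[Int])] = Future {
  val rows = Seq(s"row-$token-a", s"row-$token-b")
  (rows, if (token < 2) Some(token + 1) else None)
}

// State is the token of the next page to request; None means no pages are left.
val pages = Source.unfoldAsync[Option[Int], Seq[String]](Some(0)) {
  case None => Future.successful(None) // previous page was the last one: complete
  case Some(token) =>
    fetchPage(token).map { case (rows, next) => Some((next, rows)) }
}

val allRows = Await.result(pages.mapConcat(identity).runWith(Sink.seq), 5.seconds)
system.terminate()
```

The next page is only requested after the previous Future completes and downstream signals demand.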


Tim

Viktor Klang

Nov 23, 2015, 4:43:17 AM
to Akka User List
If you need to stream data from an RDBMS, use Slick 3; it has support for Reactive Streams: http://slick.typesafe.com/




--
Cheers,

Akka Team

Nov 23, 2015, 4:49:23 AM
to Akka User List
Well, "unfold" has completely reasonable use cases without blocking. For example, a PRNG (pseudo-random number generator) can be implemented as an unfold. Thankfully, it is easy to implement these in Akka Streams, either by using mapConcat or by creating a PushPullStage (http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0-M1/scala/stream-customize.html#custom-linear-processing-stages).
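A minimal sketch of the PRNG-as-unfold idea using mapConcat, assuming Akka 2.6 on Scala 2.13. The linear congruential generator and its constants (Knuth's MMIX parameters) are an illustrative choice, not something from the thread.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("prng-unfold")

// One step of a 64-bit linear congruential generator; Long overflow gives the "mod 2^64".
def lcg(state: Long): Long = state * 6364136223846793005L + 1442695040888963407L

// Unfold via mapConcat: the seed expands into a lazy, infinite LazyList of states,
// and take(5) downstream bounds how much of it is ever evaluated.
val randoms = Source
  .single(42L)
  .mapConcat(seed => LazyList.iterate(lcg(seed))(lcg))
  .take(5)

val sample = Await.result(randoms.runWith(Sink.seq), 5.seconds)
system.terminate()
```

This only works because mapConcat pulls from the returned Iterable lazily; an eagerly built collection of infinite length would never finish.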

-Endre
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam

Roland Kuhn

Nov 23, 2015, 5:00:51 AM
to akka-user
We shouldn’t need to recommend a custom stage for such a common operation: doesn’t Source(() => iterator) cover this case completely? If there is a gap then we should close it. This approach becomes interestingly powerful when combined with flatMapConcat—turning each input element into a variable length sequence that is in control over its own termination (as opposed to expand() which is rate-controlled).
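A minimal sketch of the iterator-source-plus-flatMapConcat pattern, assuming Akka 2.6 (where Source.fromIterator replaced Source(() => iterator)). Each input element n becomes a sub-stream whose iterator decides when that sub-stream ends.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("flatmapconcat")

// Each n becomes its own sub-stream of n elements; flatMapConcat runs the
// sub-streams one after another, preserving their order.
val expanded = Source(List(1, 2, 3))
  .flatMapConcat(n => Source.fromIterator(() => Iterator.range(0, n)))

val out = Await.result(expanded.runWith(Sink.seq), 5.seconds)
system.terminate()
```

Here `out` is 0, 0, 1, 0, 1, 2: a variable-length expansion per element, terminated by each iterator rather than by downstream rate.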

And of course Viktor is right in pointing towards more specialized tools where applicable.

Regards,

Roland


Dr. Roland Kuhn
Akka Tech Lead
Typesafe – Reactive apps on the JVM.
twitter: @rolandkuhn


Akka Team

Nov 23, 2015, 5:09:50 AM
to Akka User List
On Mon, Nov 23, 2015 at 11:00 AM, Roland Kuhn <goo...@rkuhn.info> wrote:
> We shouldn’t need to recommend a custom stage for such a common operation:

This is why I mentioned mapConcat first. If that is not enough, there is PushPullStage.
 
> doesn’t Source(() => iterator) cover this case completely?

No, not necessarily. It depends on the actual requirements (i.e., what unfold means to you). An unfold can be implemented with mapConcat 90% of the time (this includes the Source variant as well). For the rest, there is PushPullStage.
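PushPullStage was later removed from Akka in favor of GraphStage, so a custom unfold stage in current versions would look roughly like the following. This is a hypothetical UnfoldStage, a sketch assuming Akka 2.6, not a built-in operator; the built-in Source.unfold in later releases covers the same ground.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical custom unfold stage: emits elements produced by `f` until it returns None.
final class UnfoldStage[S, E](init: S, f: S => Option[(S, E)]) extends GraphStage[SourceShape[E]] {
  val out: Outlet[E] = Outlet[E]("UnfoldStage.out")
  override val shape: SourceShape[E] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {
      private var state: S = init // per-materialization state, so the Source is re-runnable

      override def onPull(): Unit = f(state) match {
        case Some((next, elem)) =>
          state = next
          push(out, elem) // only called on demand, so backpressure is respected
        case None =>
          complete(out) // nothing left to unfold: complete the stream
      }

      setHandler(out, this)
    }
}

implicit val system: ActorSystem = ActorSystem("unfold-stage")

// Countdown from 5: emits 5, 4, 3, 2, 1 and then completes.
val countdown = Source.fromGraph(
  new UnfoldStage[Int, Int](5, n => if (n == 0) None else Some((n - 1, n))))
val result = Await.result(countdown.runWith(Sink.seq), 5.seconds)
system.terminate()
```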
 
> If there is a gap then we should close it. This approach becomes interestingly powerful when combined with flatMapConcat, turning each input element into a variable length sequence that is in control over its own termination (as opposed to expand() which is rate-controlled).

mapConcat can emit an Iterable (which can be lazy), so I don't see any reason here for a flatMapConcat. My opinion is that we shouldn't recommend nested streams and flattening for such a common operation. I do believe that custom stages are far less dangerous than nested streams.
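For comparison, the same per-element expansion can be written with mapConcat and a lazy Iterable, without nesting streams. A minimal sketch, assuming Akka 2.6; the Range here plays the role of the lazy Iterable.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("mapconcat-expand")

// (0 until n) is a lazy, immutable Range: mapConcat emits its elements one at a
// time on demand, with no sub-stream materialized per input element.
val expanded = Source(List(1, 2, 3)).mapConcat(n => 0 until n)

val out = Await.result(expanded.runWith(Sink.seq), 5.seconds)
system.terminate()
```

The result is the same 0, 0, 1, 0, 1, 2 sequence that a flatMapConcat over per-element sources would produce.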
 

> And of course Viktor is right in pointing towards more specialized tools where applicable.

I agree, but I don't see why we shouldn't mention the generic tools that are also available if the built-in ones do not cut it.

-Endre

Viktor Klang

Nov 23, 2015, 5:19:52 AM
to Akka User List
+1 for not recommending that users create custom PushPullStages.

Use the simplest tool that gets the job done. :)

Barys Ilyushonak

Nov 23, 2015, 4:12:26 PM
to Akka User List
Hi there, 

that was really great feedback from the community! Thank you all for your help.
Since mapConcat handles an Iterable lazily, I think that is the solution in my case (I couldn't figure that out from the docs, but the source of akka.stream.impl.fusing.MapConcat was really helpful).

> This approach becomes interestingly powerful when combined with flatMapConcat, turning each input element into a variable length sequence

I would like to let users stream data from the database (a key-value store) via a chunked HTTP response, so that data can be added to the store while it is being served to the user. I'm going to use the following: Source[Iterator[T], Unit] -> flatMapConcat -> Source[T, Unit]
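A minimal sketch of that Source[Iterator[T]] -> flatMapConcat -> Source[T] pipeline, assuming Akka 2.6 (where the materialized type is NotUsed rather than Unit); readChunk is a hypothetical chunk reader for the key-value store.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("chunked-store")

// Hypothetical chunk reader: a fresh iterator over the keys of one chunk.
def readChunk(chunk: Int): Iterator[String] = Iterator.range(0, 2).map(i => s"key-$chunk-$i")

// Source[Iterator[String], NotUsed]: `map` calls readChunk anew on every run.
val chunks = Source(1 to 3).map(readChunk)

// Flatten to Source[String, NotUsed]. `() => it` hands back the same iterator, which is
// fine here only because each iterator element is created per run and consumed once.
val values = chunks.flatMapConcat(it => Source.fromIterator(() => it))

val served = Await.result(values.runWith(Sink.seq), 5.seconds)
system.terminate()
```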


On Monday, November 23, 2015, 11:19:52 UTC+1, user √ wrote: