[akka-stream] unfold / unfoldM

214 views
Skip to first unread message

Gilad Hoch

unread,
Nov 26, 2015, 8:03:10 AM11/26/15
to Akka User List
Hi,

I'm thinking of migrating some code I have written with play's iteratees module, to akka-stream.
I'm lacking many useful constructs iteratees have, e.g unfold & unfoldM generators.

Iv'e tried doing something naive at first:

def unfold[S,E](s: S)(f: S => Option[(S,E)]): Source[E,Unit] =
   
Source(() => Iterator.iterate(f(s))(opt => f(opt.get._1)))
     
.map(_.get._2)

def unfoldM[S,E](s: S)(g: S => Future[Option[(S,E)]])(implicit ec: ExecutionContext): Source[E,Unit] =
   
Source(() => Iterator.iterate(g(s))(_.flatMap(opt => g(opt.get._1))))
     
.mapAsync(1)(_.map(_.get._2))

but it didn't worked very well (don't know why).
so I implemented it using actorPublisher instead.

code can be found at this gist: https://gist.github.com/hochgi/cbe5ffc6cf2915e31091

this seems to work fine, and it wasn't too hard to implement,
so it raises some questions.

1. I may think the implementation is good, but is it? I'm no expert...
2. if it is so trivial, why it's not in the official API? I think it should be part of the API. especially since I'm probably not the only one migrating iteratees code to akka streams...

thanks,
Gilad.

Akka Team

unread,
Nov 26, 2015, 8:15:05 AM11/26/15
to Akka User List
Hi Gilad,

On Thu, Nov 26, 2015 at 1:56 PM, Gilad Hoch <gila...@gmail.com> wrote:
Hi,

I'm thinking of migrating some code I have written with play's iteratees module, to akka-stream.
I'm lacking many useful constructs iteratees have, e.g unfold & unfoldM generators.

Iv'e tried doing something naive at first:

def unfold[S,E](s: S)(f: S => Option[(S,E)]): Source[E,Unit] =
   
Source(() => Iterator.iterate(f(s))(opt => f(opt.get._1)))
     
.map(_.get._2)

def unfoldM[S,E](s: S)(g: S => Future[Option[(S,E)]])(implicit ec: ExecutionContext): Source[E,Unit] =
   
Source(() => Iterator.iterate(g(s))(_.flatMap(opt => g(opt.get._1))))
     
.mapAsync(1)(_.map(_.get._2))

but it didn't worked very well (don't know why).

Can you give us a testcase that fails with the above? There should be no problem implementing these signatures (apart from that your above code does not handle the Option and therefore does not stop).
 
so I implemented it using actorPublisher instead.

code can be found at this gist: https://gist.github.com/hochgi/cbe5ffc6cf2915e31091

this seems to work fine, and it wasn't too hard to implement,
so it raises some questions.

Hmm, for these tasks, if the build in operations are not good enough you are better of creating a custom stage: http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0-M1/scala/stream-customize.html#custom-linear-processing-stages

That does not work yet with asynchronous processing (futures in your case) but that will be possible with the new GraphStages which I am documenting right now.
 

1. I may think the implementation is good, but is it? I'm no expert...
2. if it is so trivial, why it's not in the official API? I think it should be part of the API.

Because
 - we can't come up with all the possible combinators people might ever need. Open a ticket if you feel something is missing, and then we consider it for inclusion
 - we are very conservative about adding combinators to avoid building a kitchen sink. I understand it feels better to have a one-liner for everything, but Akka Streams is very extensible (more and more every day) and in the long term that is what matters because no framework can provide all the operators that people will ever need. That said, unfold is one nice addition, so please open a ticket.

-Endre
 
especially since I'm probably not the only one migrating iteratees code to akka streams...

thanks,
Gilad.

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam

Gilad Hoch

unread,
Nov 26, 2015, 9:21:51 AM11/26/15
to Akka User List
thanks Endre,

a simple example to show it does not work as expected,
generating a fibonacci stream for all fib numbers under 10M:

scala> import akka.stream.scaladsl._
import akka.stream.scaladsl._

scala> import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializer

scala> import akka.actor.ActorSystem
import akka.actor.ActorSystem

scala> implicit val system = ActorSystem("akka-stream-experiments")
system: akka.actor.ActorSystem = akka://akka-stream-experiments

scala> implicit val materializer = ActorMaterializer()
materializer: akka.stream.ActorMaterializer = ActorMaterializerImpl(akka://akka-stream-experiments,akka.stream.ActorMaterializerSettings@3aab6fda,akka.dispatch.Dispatchers@b75ed3f,Actor[akka://akka-stream-experiments/user/$a#-1004252643],false,0,flow)

scala> def unfold[S,E](s: S)(f: S => Option[(S,E)]): Source[E,Unit] =
     |     Source(() => Iterator.iterate(f(s))(opt => f(opt.get._1))).map(_.get._2)
unfold: [S, E](s: S)(f: S => Option[(S, E)])akka.stream.scaladsl.Source[E,Unit]

scala> unfold(0->1){
     | case (a,b) if a < 10000000 => Some((b->(a+b),a))
     | case _ => None
     | }
res0: akka.stream.scaladsl.Source[Int,Unit] = akka.stream.scaladsl.Source@404ef48b

scala> res0.runForeach(println)
res1: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@14bc1d08

scala> 0
1
1
2
3
5
8
13
21
34
55
89
144
233
377
610
987
1597
2584
4181
6765
10946
17711
28657


scala> res1.isCompleted
res2: Boolean = true

so, as you see, not all the numbers are printed.
if I use the unfold implementation from the gist, it works as expected.

I guess it has to do with the way I stop. I'm mapping the option to a get call,
so I guess akka stream doesn't like exceptions thrown from within a map call.

Gilad.

P.S. thanks, I'll open a ticket now :)

Endre Varga

unread,
Nov 26, 2015, 9:23:00 AM11/26/15
to akka...@googlegroups.com
Hi Gilad,

Exceptions from a stream stage will result in a failure of the overall stream.

-Endre

Gilad Hoch

unread,
Nov 26, 2015, 9:34:53 AM11/26/15
to Akka User List


OK,
so how is it possible to generate a finite stream?
if I can't cut the iterator, what can I do?

BTW, issue is opened:
https://github.com/akka/akka/issues/19021

Gilad.

Akka Team

unread,
Nov 26, 2015, 10:01:14 AM11/26/15
to Akka User List
Why don't you just create an Iterator subclass yourself? You only need to implement a next() and hasNext() method. Alternatively, you can create a custom stage: http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0-M1/scala/stream-customize.html#Custom_graph_processing_junctions

-Endre

Viktor Klang

unread,
Nov 26, 2015, 10:41:51 AM11/26/15
to Akka User List

use take(n)
--
Cheers,

Gilad Hoch

unread,
Nov 26, 2015, 10:50:53 AM11/26/15
to Akka User List
iterator + take(n) is not equivalent to unfold.
for instance, in my application, I'm using unfold to convert Elasticsearch's scroll to a play Enumerator.

thanks,
Gilad.

Barys Ilyushonak

unread,
Nov 27, 2015, 3:30:14 AM11/27/15
to Akka User List
Gilad, thank you for pointing that out. 

the one different between Play.Enumerator unfoldM and Iterator - the state is Future in the first case. That allows determine the next element or final event eventually. 
Is it possible to implement similar behaviour via Source API?

Cheers, 
Boris

Akka Team

unread,
Nov 27, 2015, 3:34:51 AM11/27/15
to Akka User List
Hi Barys,

With the new GraphStage API it is possible. Wait for M2 for new documentation (the GraphStage is already there in M1)

-Endre

Gilad Hoch

unread,
Dec 9, 2015, 10:55:38 AM12/9/15
to Akka User List
Hi,

Iv'e implemented it the "right way" this time,
and would like to submit a pull request with the code.

the code I came up with can be found in this gist (feedback would be great):
https://gist.github.com/hochgi/927d1ceab88c55fbb0f9

So, how do I submit the PR?
(which branch? where do I put the code? where are the tests so I can add tests for it?)

Thanks,
Gilad.
Reply all
Reply to author
Forward
0 new messages