Hi,
I'm thinking of migrating some code I wrote with Play's iteratees module to akka-stream.
I'm missing many useful constructs that iteratees have, e.g. the unfold & unfoldM generators.
I tried something naive at first:
def unfold[S,E](s: S)(f: S => Option[(S,E)]): Source[E,Unit] =
Source(() => Iterator.iterate(f(s))(opt => f(opt.get._1)))
.map(_.get._2)
def unfoldM[S,E](s: S)(g: S => Future[Option[(S,E)]])(implicit ec: ExecutionContext): Source[E,Unit] =
Source(() => Iterator.iterate(g(s))(_.flatMap(opt => g(opt.get._1))))
.mapAsync(1)(_.map(_.get._2))
but it didn't work very well (I don't know why),
so I implemented it using ActorPublisher instead.
The code can be found in this gist: https://gist.github.com/hochgi/cbe5ffc6cf2915e31091
This seems to work fine, and it wasn't too hard to implement,
which raises some questions:
1. I think the implementation is good, but is it? I'm no expert...
2. If it is so trivial, why isn't it in the official API? I think it should be part of the API,
especially since I'm probably not the only one migrating iteratees code to akka streams...
thanks,
Gilad.
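
A note on the naive version above: `Iterator.iterate` never ends by itself, and once `f` returns `None` the `opt.get` call throws a NoSuchElementException, which is one likely reason it misbehaves. Below is a minimal sketch of a terminating variant, written against plain Scala iterators so it runs without Akka. The names `UnfoldSketch`, `unfoldIterator` and `fibsBelow100` are made up for illustration and are not part of any library.

```scala
object UnfoldSketch {
  // Hypothetical helper: same shape as the `unfold` in this thread,
  // but it returns a plain Iterator and ends when `f` yields None.
  def unfoldIterator[S, E](s: S)(f: S => Option[(S, E)]): Iterator[E] =
    Iterator
      .iterate(f(s))(_.flatMap { case (next, _) => f(next) }) // None stays None, no .get
      .takeWhile(_.isDefined)                                  // stop at the first None
      .collect { case Some((_, e)) => e }                      // keep only the emitted element

  // Fibonacci numbers below 100, using the same style of step function as the thread.
  def fibsBelow100: List[Int] =
    unfoldIterator(0 -> 1) {
      case (a, b) if a < 100 => Some((b -> (a + b), a))
      case _                 => None
    }.toList
}
```

The resulting iterator could presumably be wrapped the same way as in the original attempt, e.g. `Source(() => UnfoldSketch.unfoldIterator(s)(f))`, assuming the akka-stream version used in this thread. Later Akka Streams releases also provide `Source.unfold` and `Source.unfoldAsync`.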
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
scala> import akka.stream.scaladsl._
import akka.stream.scaladsl._
scala> import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializer
scala> import akka.actor.ActorSystem
import akka.actor.ActorSystem
scala> implicit val system = ActorSystem("akka-stream-experiments")
system: akka.actor.ActorSystem = akka://akka-stream-experiments
scala> implicit val materializer = ActorMaterializer()
materializer: akka.stream.ActorMaterializer = ActorMaterializerImpl(akka://akka-stream-experiments,akka.stream.ActorMaterializerSettings@3aab6fda,akka.dispatch.Dispatchers@b75ed3f,Actor[akka://akka-stream-experiments/user/$a#-1004252643],false,0,flow)
scala> def unfold[S,E](s: S)(f: S => Option[(S,E)]): Source[E,Unit] =
| Source(() => Iterator.iterate(f(s))(opt => f(opt.get._1))).map(_.get._2)
unfold: [S, E](s: S)(f: S => Option[(S, E)])akka.stream.scaladsl.Source[E,Unit]
scala> unfold(0->1){
| case (a,b) if a < 10000000 => Some((b->(a+b),a))
| case _ => None
| }
res0: akka.stream.scaladsl.Source[Int,Unit] = akka.stream.scaladsl.Source@404ef48b
scala> res0.runForeach(println)
res1: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@14bc1d08
scala> 0
1
1
2
3
5
8
13
21
34
55
89
144
233
377
610
987
1597
2584
4181
6765
10946
17711
28657
scala> res1.isCompleted
res2: Boolean = true
use take(n)
--
Cheers,
√