class ActorPubSub extends ActorSubscriber with ActorPublisher[Int] {
var events = Seq.empty[Int]
override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy
override def receive: Actor.Receive = {
case OnNext(e: Int) => events = e +: events
case Request(cnt) => events.take(cnt.toInt).foreach(onNext)
}
}
val pubsubRef = system.actorOf(Props(new ActorPubSub))
val pub = ActorPublisher[Int](pubsubRef)
val sub = ActorSubscriber[Int](pubsubRef)
val pubsubFlow = Flow(Sink(sub), Source(pub))FlowGraph { implicit b =>
import akka.stream.scaladsl.FlowGraphImplicits._
Source((1 to 10).toList) ~> pubsubFlow ~> Sink.foreach[Int](e =>
println("Got a number " + e)
)
}.run()
According to Flow.apply(Sink, Source) doc:
Create a Flow from a seemingly disconnected Source and Sink pair.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
val in = Flow[Int].to(Sink(ActorSubscriber[Int](pubsubRef)))
val out = Source(ActorPublisher[Int](pubsubRef))
Flow.wrap(in, out)(Keep.none)
Enter code here...
case Request(cnt) =>
if (events.nonEmpty)
events.take(cnt.toInt).foreach(onNext)
else
request(cnt)