lass FibonacciPublisher extends ActorPublisher[Long] with ActorLogging {
private val queue = Queue[Long](0, 1)
// Message handler of the publisher: reacts to demand signals and to cancellation.
def receive = {
  // The requested amount carried by the message is ignored; totalDemand is read instead.
  case Request(_) =>
    log.debug("Received request; demand = {}.", totalDemand)
    publish
  // Cancellation: log and stop this actor.
  case Cancel =>
    log.info("Stopping.")
    context.stop(self)
  // Any other message is only logged.
  case other =>
    log.warning("Received unknown event: {}.", other)
}
// Emits Fibonacci numbers while the publisher is active and demand is outstanding.
// After the first number above 5000 has been emitted, the stream is completed and
// the actor is stopped.
final def publish: Unit = {
  while (isActive && totalDemand > 0) {
    val next = queue.head
    // dequeue removes and returns the current head; queue.head is then its successor,
    // so their sum is the next term appended to the queue.
    queue += (queue.dequeue + queue.head)
    log.debug("Producing fibonacci number: {}.", next)
    onNext(next)
    // Complete synchronously instead of sending Cancel to self: a self-sent message is
    // only processed after this loop has finished, so the loop would keep emitting
    // numbers past the limit for as long as demand remained. onCompleteThenStop()
    // ends the active state, which terminates the loop on its next check.
    if (next > 5000) onCompleteThenStop()
  }
}
}
/**
 * Subscriber actor that logs every number it receives and stops itself once a
 * value above 5000 has been seen, the stream fails, or the stream completes.
 */
class FibonacciSubscriber extends ActorSubscriber with ActorLogging {
  // Request strategy built with a watermark of 20.
  val requestStrategy = WatermarkRequestStrategy(20)

  def receive = {
    case OnNext(fib: Long) =>
      log.debug("Received Fibonacci number: {}", fib)
      // Reuse the completion branch below to shut down once the limit is exceeded.
      if (fib > 5000) self ! OnComplete
    // Match every failure cause, not only Exception: OnError carries a Throwable, and
    // a non-Exception cause previously fell through to the "unknown event" branch,
    // leaving the actor running after the stream had failed.
    case OnError(cause) =>
      log.error(cause, cause.getMessage)
      self ! OnComplete
    case OnComplete =>
      log.info("Fibonacci stream completed.")
      context.stop(self)
    case unknown =>
      log.warning("Received unknown event: {}.", unknown)
  }
}
// Source backed by a FibonacciPublisher actor.
val src = Source.actorPublisher(Props[FibonacciPublisher])
// Intermediate stage that doubles every element passing through.
val flow = Flow[Long].map(_ * 2)
// Sink backed by a FibonacciSubscriber actor.
val sink = Sink.actorSubscriber(Props[FibonacciSubscriber])
// Wire source -> flow -> sink and run the stream.
src
  .via(flow)
  .runWith(sink)
2015-10-03 23:10:49.120 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Received request; demand = 4.
2015-10-03 23:10:49.120 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 0.
2015-10-03 23:10:49.121 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 1.
2015-10-03 23:10:49.121 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 1.
2015-10-03 23:10:49.121 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 2.
2015-10-03 23:10:49.122 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 0
2015-10-03 23:10:49.122 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 2
2015-10-03 23:10:49.123 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Received request; demand = 2.
2015-10-03 23:10:49.123 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 2
2015-10-03 23:10:49.124 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 4
2015-10-03 23:10:49.124 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 3.
2015-10-03 23:10:49.125 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 5.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
Hi Abhijit, perhaps it might make sense to test drive the car a little bit before popping the hood and fiddling with the engine: formulate a few flows without ActorPublisher and ActorSubscriber and get a feel for how Akka Streams really is about declarative data flow. Interfacing with regular Actors means having to learn lots about the underlying mechanics—and most of the time you won’t need that. Note to self: we need to de-emphasize ActorPublisher/Subscriber a lot and tell people that it is a last-resort kind of special low-level tool.
there is no reference to PowerPoint in my email, what are you referring to?
Agreed, they don't come with the proper power level labelling at the moment.
If on the other hand you want to dive deep into Streams...
...
akka.stream.materializer.max-input-buffer-size= 16
.>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
akka.stream.materializer.max-input-buffer-size?