How is Initial Demand Generated in this Example?

Visto 115 veces
Saltar al primer mensaje no leído

Abhijit Sarkar

no leída,
5 oct 2015, 1:22:215/10/15
a Akka User List
I'm cutting my teeth on Akka streams and did a fibonacci publisher-subscriber example as follows. However, I don't quite understand yet how the demand is initially generated and what relation it has with the subscriber's request strategy. Can someone please explain?

Full Disclosure: I'd posted this question on SO but didn't received an answer.

FibonacciPublisher:

lass FibonacciPublisher extends ActorPublisher[Long] with ActorLogging {
  private val queue = Queue[Long](0, 1)

  def receive = {
    case Request(_) => // _ is the demand
      log.debug("Received request; demand = {}.", totalDemand)
      publish
    case Cancel =>
      log.info("Stopping.")
      context.stop(self)
    case unknown => log.warning("Received unknown event: {}.", unknown)
  }

  final def publish = {
    while (isActive && totalDemand > 0) {
      val next = queue.head
      queue += (queue.dequeue + queue.head)

      log.debug("Producing fibonacci number: {}.", next)

      onNext(next)

      if (next > 5000) self ! Cancel
    }
  }
}

FibonacciSubscriber:

class FibonacciSubscriber extends ActorSubscriber with ActorLogging {
  val requestStrategy = WatermarkRequestStrategy(20)

  def receive = {
    case OnNext(fib: Long) =>
      log.debug("Received Fibonacci number: {}", fib)

      if (fib > 5000) self ! OnComplete
    case OnError(ex: Exception) =>
      log.error(ex, ex.getMessage)
      self ! OnComplete
    case OnComplete =>
      log.info("Fibonacci stream completed.")
      context.stop(self)
    case unknown => log.warning("Received unknown event: {}.", unknown)
  }
}

Fibonacci App:

val src = Source.actorPublisher(Props[FibonacciPublisher])
val flow = Flow[Long].map { _ * 2 }
val sink = Sink.actorSubscriber(Props[FibonacciSubscriber])

src.via(flow).runWith(sink)

Sample run: Question: Where did the initial demand for 4 come from?

2015-10-03 23:10:49.120 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Received request; demand = 4.
2015-10-03 23:10:49.120 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 0.
2015-10-03 23:10:49.121 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 1.
2015-10-03 23:10:49.121 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 1.
2015-10-03 23:10:49.121 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 2.
2015-10-03 23:10:49.122 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 0
2015-10-03 23:10:49.122 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 2
2015-10-03 23:10:49.123 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Received request; demand = 2.
2015-10-03 23:10:49.123 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 2
2015-10-03 23:10:49.124 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 4
2015-10-03 23:10:49.124 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 3.
2015-10-03 23:10:49.125 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 5.


Roland Kuhn

no leída,
5 oct 2015, 2:49:355/10/15
a akka-user
Hi Abhijit,

perhaps it might make sense to test drive the car a little bit before popping the hood and fiddling with the engine: formulate a few flows without ActorPublisher and ActorSubscriber and get a feel for how Akka Streams really is about declarative data flow. Interfacing with regular Actors means having to learn lots about the underlying mechanics—and most of the time you won’t need that.

Note to self: we need to de-emphasize ActorPublisher/Subscriber a lot and tell people that that is a last resort kind of special low-level tool.

Regards,

Roland

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



Dr. Roland Kuhn
Akka Tech Lead
Typesafe – Reactive apps on the JVM.
twitter: @rolandkuhn


Viktor Klang

no leída,
5 oct 2015, 2:59:465/10/15
a Akka User List
On Mon, Oct 5, 2015 at 8:49 AM, Roland Kuhn <goo...@rkuhn.info> wrote:
Hi Abhijit,

perhaps it might make sense to test drive the car a little bit before popping the hood and fiddling with the engine: formulate a few flows without ActorPublisher and ActorSubscriber and get a feel for how Akka Streams really is about declarative data flow. Interfacing with regular Actors means having to learn lots about the underlying mechanics—and most of the time you won’t need that.

Note to self: we need to de-emphasize ActorPublisher/Subscriber a lot and tell people that that is a last resort kind of special low-level tool.

Agreed, they don't come with the proper power level labelling at the moment.



--
Cheers,

Abhijit Sarkar

no leída,
5 oct 2015, 3:05:485/10/15
a Akka User List
I don't understand the reference to Power Point and I disagree with you regarding the test drive example. The internet is full of such "test drive" examples with people doubling integers using Akka streams. If I really wanted to do that, I'd not need a new library.
Since you didn't answer my question I request that you direct me to any source of information that's relevant to my question, which is how the initial demand is generated.

Regards,
Abhijit

Roland Kuhn

no leída,
5 oct 2015, 3:13:475/10/15
a akka-user
Hi Abhijit,

there is no reference to PowerPoint in my email, what are you referring to?

Your question will be answered by reading these docs. What I meant to convey is that the internal handling of demand generation is an implementation detail that almost all Akka Streams usage should not care about, the whole point of the library is to solve this problem on your behalf to free up your time for more interesting things. If on the other hand you want to dive deep into Streams, please do read all the documentation (including the design goals and the cookbook).

Regards,

Roland

Abhijit Sarkar

no leída,
5 oct 2015, 3:54:045/10/15
a Akka User List

there is no reference to PowerPoint in my email, what are you referring to?

I was referring to the following statement from your previous email:


Agreed, they don't come with the proper power level labelling at the moment.


If on the other hand you want to dive deep into Streams...
 
I do want to understand the nuts and bolts. I read the design concepts behind Akka Streams and other than backpressure, it didn't feel very unfamiliar. Cascading for example, that builds on Hadoop, not unlike Akka Streams builds on Akka actors, has remarkably similar design, including the plumbing and data flow analogy.   

Regards

...

Abhijit Sarkar

no leída,
8 oct 2015, 23:09:258/10/15
a Akka User List
I read Buffers and working with flows and it doesn't answer the following questions. While playing with my fibonacci example above, instead of one subscriber, I attached 2, and the initial demand became 16 (from 4). I see that akka.stream.materializer.initial-input-buffer-size = 4 andakka.stream.materializer.max-input-buffer-size= 16.
  1. When the demand is more than 4 but less than 16, what happens?
  2. What happens when the demand is more than 16? 

Konrad Malawski

no leída,
13 oct 2015, 17:37:2913/10/15
a Akka User List
Hi Abhijit,
truth is, you should never depend on actual demand numbers for any kind of correctness of your algorithm.
Stages (i.e. fan-in, fan-out, linear etc) can contain arbitrary request strategies and by itself (and by design) it's not a good idea to try to depend on actual demand numbers.

Digging into specific examples takes time and digging around impls, and is not something one should be doing in general anyway - demand is generated such as is deemed safe by the downstreams - that's the only assumption one should be coding to when using streams - not actual counts of demand.

>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Cheers,
Konrad 'ktoso' Malawski

Abhijit Sarkar

no leída,
14 oct 2015, 22:39:0614/10/15
a Akka User List
Hi Konrad,
I'm not looking at these configurations in order to use them in my program. I'm looking at them to understand what they're used for. I assume they serve some purpose otherwise they wouldn't exist. Specifically, what is the significance of akka.stream.materializer.initial-input-buffer-size and  andakka.stream.materializer.max-input-buffer-size?

Regards,
Abhijit
Responder a todos
Responder al autor
Reenviar
0 mensajes nuevos