[scalaz-stream] Merging bounded and unbounded counters with async callbacks and terminating properly


Xander Dunn

Oct 13, 2015, 1:28:31 AM
to scalaz
I'm new to scalaz-stream and trying to brainstorm ways to solve my current problem.  I think my situation naturally works out to three components:

1. A bounded counter
def episodeID(numEpisodes: Int): stream.Process[Nothing, Int] = stream.Process.range(0, numEpisodes)

2. An unbounded counter that starts back at 0 every time the above counter increments:
def stepID(n: Int): stream.Process[Nothing, Int] = stream.Process(n) ++ stepID(n + 1)


3. Finally, I have an asynchronous `Task` that receives callbacks from another library.  I've successfully implemented this standalone piece:

  def getAction(subscriberProvider: RabbitMQProvider): Task[Action] = {
    val channel: Channel = subscriberProvider.channel
    Task async { callback =>
      val consumer = new DefaultConsumer(channel) { // This happens to be making use of the RabbitMQ library
        override def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]): Unit = {
          val action = // Get my action
          callback(right(action))
        }
      }
      channel.basicConsume(subscriberProvider.channelName, true, consumer)
    }
  }
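
If a single consumer should feed many deliveries into a stream, one option might be to have the callback push into an async queue and read the stream from that queue. The following is only a rough sketch under assumptions: `Source` is a hypothetical stand-in for the RabbitMQ consumer (not part of any library), and the queue calls are the ones I believe scalaz-stream 0.7/0.8 provides in `scalaz.stream.async`.

```scala
import scalaz.concurrent.Task
import scalaz.stream.{Process, async}

object CallbackQueue {
  // Hypothetical callback-style API standing in for the RabbitMQ consumer.
  trait Source[A] { def subscribe(onMessage: A => Unit): Unit }

  // Every callback invocation pushes into an unbounded queue;
  // the returned stream emits whatever the queue receives.
  def messages[A](source: Source[A]): Process[Task, A] = {
    val queue = async.unboundedQueue[A]
    val register = Task.delay {
      source.subscribe(a => queue.enqueueOne(a).run)
    }
    // Register the callback once, then read from the queue.
    Process.eval_(register) ++ queue.dequeue
  }
}
```

The resulting stream never halts on its own; something downstream (for example `take` or `takeWhile`) or a call to `queue.close` would have to end it.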


Now, my difficulty is in bringing these things together.  I have these challenges:
1. I need to combine all three of these so that at each output, I have all three values.  zip?  wye?  tee? Oh my.  
2. While getAction is running, stepID increments each time a callback is consumed, until some condition is met. At that point episodeID increments, stepID goes back to 0, and getAction keeps consuming without bound.  How is this done?  I can't think of how to make `stepID` restart at 0 when `episodeID` increments.

So, I basically have a cascade going from the bottom up that would produce results (episodeID, stepID, action) like:
(0, 0, A0), (0, 1, A1), (0, 2, A2), …
(1, 0, A0), (1, 1, A1), (1, 2, A2), …

3. The `getAction` `Task` above is incapable of knowing when it should terminate.  It will endlessly wait for its consumer to execute the callback.  Something external could tell it when to stop.  Maybe `wye.interrupt`?  Or perhaps I should frame it around the episodeID choosing when to terminate?

At some point I came up with this very rough pseudo code:
episodeID wye.mergeHaltL (stepID wye.merge getAction)

But, I have the problem that episodeID and stepID don't know what to evaluate to until the callback from getAction has been called, which could occur at any time.  Is there any way I could evaluate episodeID (and stepID) as a result of the end of the previous evaluation occurring (the callback in getAction being called)?
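
One way the (episodeID, stepID, action) triples described above might be produced is to derive the step number from the action stream itself instead of running a separate counter: for each episode, repeatedly evaluate the action `Task`, index the results from 0, and stop when the episode's end condition holds. This is a sketch under assumptions, not a tested solution for the RabbitMQ case: `Action` is aliased to `String` and `done` is a hypothetical predicate standing in for the unspecified "some condition"; `toSource` is how I recall lifting a pure process to `Process[Task, _]` in scalaz-stream 0.7/0.8.

```scala
import scalaz.concurrent.Task
import scalaz.stream.Process

object EpisodeSteps {
  // Hypothetical stand-in for the real Action type.
  type Action = String

  // One episode: pull actions until `done` holds, numbering them from 0.
  def episode(getAction: Task[Action], done: Action => Boolean): Process[Task, (Int, Action)] =
    Process.repeatEval(getAction)
      .takeWhile(a => !done(a))
      .zipWithIndex
      .map { case (action, step) => (step, action) }

  // Each new episode starts a fresh `episode` stream, so the step index restarts at 0.
  def run(numEpisodes: Int, getAction: Task[Action], done: Action => Boolean): Process[Task, (Int, Int, Action)] =
    Process.range(0, numEpisodes).toSource.flatMap { ep =>
      episode(getAction, done).map { case (step, action) => (ep, step, action) }
    }
}
```

Because the episode counter only advances when the inner stream halts, neither counter has to be evaluated before an action arrives.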

I'm sure there are better ways of thinking about this.

Thanks a lot!

Xander Dunn

Oct 14, 2015, 3:51:02 PM
to scalaz
Ignoring all the specifics that I gave above, I believe my difficulty reduces to this: 

I am able to successfully represent each individual piece of information I have as a Process.  In bringing them together, my challenge is that I need to create a cycle.  In essence: The output of Process A is the input of Process B whose output is the input of Process A.  I found a StackOverflow question on circular scalaz-streams and a post here about it.  Paul Chiusano suggests either thinking about it in a different way or dumping the output to a Queue.  I'll look into Queues and see if I can solve it that way.  
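
As a starting point for the Queue idea, here is a minimal sketch of a cycle in which a stream's output is fed back in as its own input. It is a toy under assumptions, not the counters from my original post: `countUp` and `limit` are hypothetical names, and the queue operations are the ones I believe `scalaz.stream.async` offers in 0.7/0.8.

```scala
import scalaz.concurrent.Task
import scalaz.stream.{Process, async}

object FeedbackLoop {
  // Each dequeued value n is emitted downstream, and n + 1 is
  // enqueued back into the same queue, closing the cycle.
  def countUp(limit: Int): Process[Task, Int] = {
    val queue = async.unboundedQueue[Int]
    // Seed the loop with an initial value.
    val seed = Process.eval_(queue.enqueueOne(0))
    val loop = queue.dequeue.take(limit).flatMap { n =>
      Process.eval(queue.enqueueOne(n + 1).map(_ => n))
    }
    seed ++ loop
  }
}
```

The `take(limit)` is what breaks the cycle here; without some such cutoff the loop would run forever.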