I'm new to scalaz-stream and trying to brainstorm ways to solve my current problem. I think my situation naturally works out to three components:
1. A bounded counter:

       def episodeID(numEpisodes: Int): stream.Process[Nothing, Int] = stream.Process.range(0, numEpisodes)

2. An unbounded counter that starts back at 0 every time the above counter increments:

       def stepID(n: Int): stream.Process[Nothing, Int] = stream.Process(n) ++ stepID(n + 1)

3. Finally, I have an asynchronous `Task` that receives callbacks from another library. I've successfully implemented this standalone piece:

       def getAction(subscriberProvider: RabbitMQProvider): Task[Action] = {
         val channel: Channel = subscriberProvider.channel
         Task async { callback =>
           // This happens to be making use of the RabbitMQ library
           val consumer = new DefaultConsumer(channel) {
             override def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]): Unit = {
               val action = // Get my action
               callback(right(action))
             }
           }
           channel.basicConsume(subscriberProvider.channelName, true, consumer)
         }
       }

Now, my difficulty is in bringing these things together. I have these challenges:
1. I need to combine all three of these so that at each output, I have all three values. zip? wye? tee? Oh my.
2. While `getAction` is running, `stepID` should increment each time a callback is consumed. Once some condition is met, `episodeID` increments, `stepID` goes back to 0, and `getAction` carries on consuming without bound. How is this done? I can't think of how to make `stepID` restart at 0 when `episodeID` increments.
So, I basically have a cascade going from the bottom up that would produce results (episodeID, stepID, action) like:
(0, 0, A0), (0, 1, A1), (0, 2, A2), …
(1, 0, A0), (1, 1, A1), (1, 2, A2), …
…
3. The `getAction` `Task` above has no way of knowing when it should terminate; it will wait forever for its consumer to execute the callback. Something external would have to tell it when to stop. Maybe `wye.interrupt`? Or perhaps I should frame it around `episodeID` choosing when to terminate?
At some point I came up with this very rough pseudo code:

    episodeID wye.mergeHaltL (stepID wye.merge getAction)

But I have the problem that `episodeID` and `stepID` don't know what to evaluate to until the callback from `getAction` has been called, which could happen at any time. Is there a way to advance `episodeID` (and `stepID`) only once the previous evaluation has finished, i.e. once the callback in `getAction` has been called?
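
To pin down the target output from challenge 2, the labelling can be modelled with plain Scala collections. This is only a sketch of the intended behaviour, not a scalaz-stream solution, and it involves no concurrency: the `Action` case class, the `label` function, and the `episodeDone` predicate are hypothetical stand-ins for the real message type and for the unspecified "some condition" that ends an episode.

```scala
// Plain Scala collections only: no scalaz-stream, no concurrency.
// `Action`, `label` and `episodeDone` are hypothetical names for illustration.
final case class Action(name: String)

def label(actions: Iterator[Action], numEpisodes: Int)(
    episodeDone: Action => Boolean): Iterator[(Int, Int, Action)] =
  actions
    // The running state is the (episodeID, stepID) pair for the *next* action.
    .scanLeft(((0, 0), Option.empty[(Int, Int, Action)])) {
      case (((ep, st), _), a) =>
        // An action that ends the episode bumps episodeID and resets stepID.
        val next = if (episodeDone(a)) (ep + 1, 0) else (ep, st + 1)
        (next, Some((ep, st, a)))
    }
    // Drop the seed element, keeping only labelled actions.
    .collect { case (_, Some(labelled)) => labelled }
    // Stop once the bounded episode counter is exhausted.
    .takeWhile { case (ep, _, _) => ep < numEpisodes }

// Example: an action named "END" closes the current episode.
val demo = Iterator("A0", "A1", "END", "B0", "END", "C0").map(Action(_))
val labelled = label(demo, numEpisodes = 2)(_.name == "END").toList
```

Here the counters are driven by the arrival of each action rather than running on their own, which is the dependency described above: neither counter advances until an action has actually been received.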
I'm sure there are better ways of thinking about this.
Thanks a lot!