I am looking for a way to reduce a Vector[Process[F, A]] into a Process[F, Vector[A]] where each of the component processes is run in lock-step. Does this already exist?
If not, I submit the following as a starting point for such a function. It would probably live on scalaz.stream.tee.
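import scalaz.{Foldable, OneAnd, Reducer}
import scalaz.stream.Process

/**
 * Reduces a non-empty collection of processes into a single process,
 * using Tee.zipWith along with the provided reducer.
 * @param ps the non-empty collection of processes to reduce
 * @param r the reducer to zip with
 */
def reduceTeeZip[F[_] : Foldable, G[_], A, B](ps: OneAnd[F, Process[G, A]], r: Reducer[A, B]): Process[G, B] = {
  // Seed the fold with the head process, lifting each A into a B.
  val b: Process[G, B] = ps.head.map(r.unit)
  // Zip every remaining process in lock-step, appending its element to the accumulator.
  Foldable[F].foldLeft[Process[G, A], Process[G, B]](ps.tail, b) { case (acc, next) =>
    acc.zipWith(next)(r.snoc)
  }
}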
This is the most general function I could think of that solves my use case. As for my specific function type, the issue is that an empty vector can be handled in three ways, and I'm unsure which is best:
- Fail
- Emit an endless stream of Vector.empty
- Emit a single Vector.empty and end
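
To make the three options concrete, here is a minimal sketch that models them with the standard library's LazyList (Scala 2.13+) standing in for Process. This is a simplification and not scalaz-stream code: there are no effects, and the names LockStepModel, OnEmpty, Fail, EmitForever, EmitOnce and zipAllLockStep are hypothetical and exist only for illustration.

```scala
object LockStepModel {
  // Hypothetical policy for the empty-vector case (not part of scalaz-stream).
  sealed trait OnEmpty
  case object Fail extends OnEmpty
  case object EmitForever extends OnEmpty
  case object EmitOnce extends OnEmpty

  // LazyList is used as a pure stand-in for Process[F, A].
  def zipAllLockStep[A](ps: Vector[LazyList[A]], onEmpty: OnEmpty): LazyList[Vector[A]] =
    ps.headOption match {
      case Some(first) =>
        // Seed with the head, then zip each remaining stream in, appending its element.
        // zip stops as soon as the shortest stream ends.
        ps.tail.foldLeft(first.map(Vector(_))) { (acc, next) =>
          acc.zip(next).map { case (vs, a) => vs :+ a }
        }
      case None =>
        onEmpty match {
          // Thrown eagerly in this model; a Process would presumably fail when run.
          case Fail        => throw new IllegalArgumentException("no processes to zip")
          case EmitForever => LazyList.continually(Vector.empty[A])
          case EmitOnce    => LazyList(Vector.empty[A])
        }
    }
}
```

In this model, the endless stream of Vector.empty acts as an identity for zipping: zipping it with any other stream leaves that stream's length unchanged. Folding from that seed would therefore give the same result as the non-empty case, which may be an argument for the second option. Whether that carries over cleanly to Process, where an endless process has to be terminated by its consumer, is a separate question.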