/**
 * A message carrying an identifier and a payload.
 *
 * @param id   identifier of the message
 * @param body payload of the message
 * @tparam T   type of the payload
 */
case class Message[T](
  id: Long,
  body: T
)
/** Abstraction over something on which messages can be acknowledged by id. */
trait Queue {

  /**
   * Acknowledges the message with the given identifier.
   *
   * @param id identifier of the message being acknowledged
   */
  def acknowledge(id: Long): Unit
}
// A processing stage: takes a Message[T] and emits a Try describing the outcome.
// The result payload type and the materialized value are left unconstrained.
// NOTE(review): Flow is presumably Akka Streams' Flow; the imports are not visible here.
type Handler[T] = Flow[Message[T], Try[_], _]
// A sink that consumes (message, result) pairs and materializes some Future.
// NOTE(review): Sink is presumably Akka Streams' Sink; the imports are not visible here.
type AckSink = Sink[(Message[_], Try[_]), Future[_]]
/**
 * Builds a sink that consumes (message, result) pairs and acknowledges the
 * message on `queue` whenever its result is a `Success`.
 *
 * A `Failure` is only printed; the corresponding message is not acknowledged.
 *
 * @param queue queue on which successfully processed messages are acknowledged
 */
def ackSink(queue: Queue) =
  Sink.foreach[(Message[_], Try[_])] {
    case (handled, Success(_)) =>
      queue.acknowledge(handled.id)
    case (_, Failure(cause)) =>
      // Do something on failure
      println(cause)
  }
/**
 * Builds a sink that runs every incoming message through `handler` and hands
 * the resulting (message, result) pair to `ackSink`.
 *
 * Each message is broadcast two ways: one copy goes straight to the zip
 * stage, the other goes through `handler`; the zip stage pairs them again.
 * The materialized value of the returned sink is that of `ackSink`.
 *
 * NOTE(review): the pairing presumably relies on `handler` emitting exactly
 * one result per message, in order - confirm for handlers that filter or
 * reorder elements.
 *
 * @param handler processing stage turning a message into a `Try` result
 * @param ackSink sink consuming the (message, result) pairs
 * @tparam T      payload type of the handled messages
 */
def handleAndAckSink[T](handler: Handler[T], ackSink: AckSink) =
  Sink(handler, ackSink, Broadcast[Message[T]](2), Zip[Message[T], Try[_]])((_, mat, _, _) => mat) {
    implicit b => (handler, ackSink, bcast, zip) =>
      bcast ~> zip.in0
      bcast ~> handler ~> zip.in1
      zip.out ~> ackSink
      // The builder block must return the inlet that becomes the sink's input;
      // without it the block evaluates to Unit (the result of the last `~>`).
      bcast.in
  }
/**
 * Smoke test for the handle-and-acknowledge pipeline: runs a handful of
 * messages through a randomly failing handler into the acknowledging sink and
 * waits for the stream's future to complete.
 */
class AckSpec extends FlatSpec with AkkaStreamsImplicits with Matchers with ScalaFutures {

  /** Source emitting one message per integer in `0 to n` (inclusive). */
  def testSource(n: Int) =
    Source(0 to n).map(n => Message(n, s"message $n"))

  /** Queue stub that only prints each acknowledgement. */
  val testQueue = new Queue {
    def acknowledge(id: Long) = {
      println(s"acknowledging message $id")
    }
  }

  /** Handler that succeeds or fails at random for every message. */
  val testHandler = Flow[Message[String]].map { msg =>
    // Randomly fail
    val shouldFail = Random.nextBoolean
    if (shouldFail) Failure(new Exception(s"failure processing message $msg"))
    else Success(s"success processing message $msg")
  }

  "Acknowledge" should "ack messages" in {
    val completion = testSource(10).runWith(handleAndAckSink(testHandler, ackSink(testQueue)))
    // No assertions on the result: the test passes once the future completes.
    whenReady(completion) { _ =>
    }
  }
}