Acknowledging messages upon work completion?

Andrew Rollins

Aug 20, 2015, 12:20:28 AM
to Akka User List
Is there an idiomatic way to handle queues with Akka Streams when each message must be acknowledged after the work for that message is done?

This started from a thread on Twitter with Viktor (https://twitter.com/viktorklang/status/634117117978107904), but it's more appropriate to continue here. His last comment was "Sounds like you want Flow[T, Ack[T]] such that you can close the loop at the end."

I'm going to show my interpretation of that suggestion and voice my concern with it. I'd love feedback.

Assume we have an external queue service that provides messages through a "getMessage" API. Messages can be acknowledged by calling "ack(messageId)". After acknowledgement, a message is taken off the queue and won't be delivered again.

I'm not sure how a Flow[T, Ack[T]] helps, because it's unclear how an Ack would be created from an arbitrary T. The original message identifier has to be passed through the stream so that we can acknowledge the message, which means we would need a flow along the lines of Flow[Msg, Ack]. In code, it could look something like this:

  // deserialize, doWork and insertIntoDatabase are placeholders for the real steps
  trait Msg { def msgId: String; def body: String } // incoming queue message
  trait Ack                                         // ack result type

  class FakeQueue {
    def receive: Msg = ???
    def ack(m: Msg): Ack = ???
  }

  val queue = new FakeQueue

  // Each pull from downstream calls receive once
  val msgSource: Source[Msg, Unit] =
    Source(() => Iterator.continually(queue.receive))

  // Every stage carries the original message along so the last stage can ack it
  val flow: Flow[Msg, Ack, Unit] =
    Flow[Msg].
      map { msg => (deserialize(msg.body), msg) }.
      map { case (x, msg) => (doWork(x), msg) }.
      map { case (y, msg) => insertIntoDatabase(y); msg }.
      map { msg => queue.ack(msg) }

This seems OK and certainly works, but I have a hang-up with it. All my intermediate steps need to pass the message through to the end, even though they individually don't care about it. Those stages are coupled to data they don't ultimately handle. I'd like to avoid that.

In other words, in some hand-wavy sense, I'd like to take a queue-agnostic Flow[In, Out] and wrap it with something that dequeues messages from a queue, pushes the "In" object to the inner flow, and somehow passes along the outer message so that the message tied to an "In" pops out the other end with the associated "Out".

I'm at a loss for how to do this. Perhaps I'm looking at it wrong to begin with. I'm hoping someone else can provide guidance.

Lance Arlaus

Aug 20, 2015, 12:31:50 PM
to Akka User List
Andrew-

How about using a simple branching flow with a broadcast and zip?

The first branch carries the message to the end of the pipeline where the acknowledger receives both the message and the result of processing as a Tuple. It can then decide whether/how to acknowledge the message.
The second branch contains the desired business logic and produces a Try (or other monadic data type) as its result.
This way, the processing logic has no need to pass along the message. In fact, you could extract and deal with just the payload itself in that second branch if you'd like.
Since the zip waits for elements from both branches before emitting, you'll have matched (Message, Try) tuples.


And the relevant code (the handleAndAckSink method is the key):

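// Imports assume the Akka Streams 1.0 scaladsl
import akka.stream.scaladsl._
import akka.stream.scaladsl.FlowGraph.Implicits._
import scala.concurrent.Future
import scala.util.{Failure, Random, Success, Try}

case class Message[T](id: Long, body: T)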

trait Queue {
  def acknowledge(id: Long): Unit
}


type Handler[T] = Flow[Message[T], Try[_], _]
type AckSink = Sink[(Message[_], Try[_]), Future[_]]

// A sink that acknowledges messages upon successful processing
def ackSink(queue: Queue) =
  Sink.foreach[(Message[_], Try[_])] {
    case (msg, result) => result match {
      case Success(_) => queue.acknowledge(msg.id)
      case Failure(t) => {
        // Do something on failure
        println(t)
      }
    }
  }

// The flow that wraps the handler and acknowledger sink
def handleAndAckSink[T](handler: Handler[T], ackSink: AckSink) = 
  Sink(handler, ackSink, Broadcast[Message[T]](2), Zip[Message[T], Try[_]])((_, mat, _, _) => mat) {
    implicit b => (handler, ackSink, bcast, zip) =>

    bcast            ~> zip.in0
    bcast ~> handler ~> zip.in1
                        zip.out ~> ackSink

    (bcast.in)
  }


class AckSpec extends FlatSpec with AkkaStreamsImplicits with Matchers with ScalaFutures {

  def testSource(n: Int) = Source((0 to n)).map(n => Message(n, s"message $n"))

  val testQueue = new Queue {
    def acknowledge(id: Long) = println(s"acknowledging message $id")
  }
 
  val testHandler = Flow[Message[String]].map { msg => 
    // Randomly fail
    if (Random.nextBoolean) Failure(new Exception(s"failure processing message $msg"))
    else Success(s"success processing message $msg")
  }

  "Acknowledge" should "ack messages" in {

    val future = testSource(10).runWith(handleAndAckSink(testHandler, ackSink(testQueue)))

    whenReady(future) { result =>
    }

  }

}



Regards,
Lance

Andrew Rollins

Aug 21, 2015, 12:38:58 AM
to Akka User List
Lance,

I really appreciate your response. It's well thought out, and the code is a great illustration. I have one question; I'm guessing you've already considered it, so I'd like to hear your thoughts.

Using your approach, what's the best way to parallelize work that the handler does? Is the intention to launch several separate handleAndAckSinks to achieve parallelism, or something else?

I ask because the handler flow has to output exactly one element per message in order to keep the message and the Try in lock step. That means the handler flow can't break the work down into a bunch of smaller messages, flatten those into the stream, and use mapAsync on the flattened stream to do the work in parallel (although the handler could have many steps, which provides some parallelism through pipelining). I'm just not familiar enough with Akka Streams to intuit the best approach for parallelizing the handler.

Thanks,
Andrew

Patrick McLaren

Aug 21, 2015, 12:38:58 AM
to Akka User List
I don't know how idiomatic this solution is, but I recently achieved something like this by materializing a discrete value, say `Success`, then resolving a `Promise` with this value.

If you want to acknowledge a particular message, it might be easier to simply close-over it and `map` your acknowledgment after your call to `run`.
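
One possible reading of that second suggestion, as a minimal sketch rather than the code actually used: run a small stream per message, keep the message in scope, and acknowledge once the materialized Future completes. It assumes the Akka 2.6 scaladsl (newer than the API used elsewhere in this thread), and `Msg`, `FakeQueue`, `work`, `processAndAck`, and `demo` are hypothetical names.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object CloseOverAck {
  final case class Msg(msgId: String, body: String)

  // Hypothetical in-memory stand-in for the external queue's ack API.
  class FakeQueue {
    val acked = ListBuffer.empty[String]
    def ack(m: Msg): Unit = synchronized { acked += m.msgId }
  }

  // Queue-agnostic business logic: it only ever sees the payload.
  val work: Flow[String, Int, NotUsed] = Flow[String].map(_.length)

  // The message is captured by the closure, so the ack is mapped onto the
  // materialized Future after the stream for that message has run.
  def processAndAck(queue: FakeQueue, msg: Msg)(implicit system: ActorSystem): Future[Int] = {
    import system.dispatcher
    Source.single(msg.body).via(work).runWith(Sink.head).map { result =>
      queue.ack(msg)
      result
    }
  }

  // Runs one message through the sketch and reports the result and the acked ids.
  def demo(): (Int, List[String]) = {
    implicit val system: ActorSystem = ActorSystem("close-over-ack")
    try {
      val queue = new FakeQueue
      val result = Await.result(processAndAck(queue, Msg("m-1", "hello")), 5.seconds)
      (result, queue.acked.toList)
    } finally system.terminate()
  }
}
```

If the stream fails, the `map` never runs, so the message stays unacknowledged and the queue can redeliver it. The trade-off is one materialization per message, which may or may not be acceptable depending on throughput.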

Lance Arlaus

Aug 21, 2015, 1:43:48 PM
to Akka User List
Andrew-

There should be no problem using mapAsync in the handler since mapped elements will be emitted in order and, thus, joined with the proper message for the ack sink.
Quote from the docs: "These Futures may complete in any order, but the elements that are emitted downstream are in the same order as received from upstream."

This will allow you to parallelize any work on the messages with no change to the solution provided (the mapAsync is encapsulated in the Handler).
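
The ordering guarantee can be sketched as follows. This is an illustration under assumptions, not part of the original solution: it uses the Akka 2.6 scaladsl (newer than the API used elsewhere in this thread), and the handler body and the sleep times are made up so that later messages finish first.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Success, Try}

object MapAsyncOrdering {
  final case class Message[T](id: Long, body: T)

  def run(): Seq[Try[String]] = {
    implicit val system: ActorSystem = ActorSystem("map-async-ordering")
    import system.dispatcher

    // Hypothetical handler: up to four messages are processed concurrently.
    // Earlier messages sleep longer, so their futures tend to complete last.
    val handler: Flow[Message[String], Try[String], NotUsed] =
      Flow[Message[String]].mapAsync(parallelism = 4) { msg =>
        Future[Try[String]] {
          Thread.sleep(100 - msg.id * 20) // blocking only to keep the sketch short
          Success(s"processed ${msg.id}")
        }
      }

    val messages = Source(0 to 4).map(n => Message(n.toLong, s"message $n"))

    // mapAsync emits results in upstream order regardless of completion order.
    try Await.result(messages.via(handler).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }
}
```

Because the results come out in the same order as the messages went in, a downstream zip pairs each result with the message that produced it. `mapAsyncUnordered` would not give that guarantee and would break the pairing.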

If, on the other hand, multiple logical messages are encapsulated in one physical message, the solution would need to be slightly different. However, once again, that's easily encapsulated in the handler.
For example, taking a single message and invoking several REST services in parallel and incorporating their responses would just be a broadcast/zip (or similar) flow embedded in the handler.
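
A rough sketch of such an embedded broadcast/zip handler, assuming the Akka 2.6 GraphDSL (the API at the time of this thread was shaped differently). `lookupA` and `lookupB` are hypothetical stand-ins for the REST calls, and `fanOut` and `run` are names invented for the example.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, Zip}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Success, Try}

object FanOutHandler {
  final case class Message[T](id: Long, body: T)

  // Hypothetical stand-ins for two REST calls.
  def lookupA(body: String): Future[String] = Future.successful(body.toUpperCase)
  def lookupB(body: String): Future[Int] = Future.successful(body.length)

  // Each message is broadcast to both calls and the two responses are zipped
  // back together, so the flow still emits exactly one element per message.
  val fanOut: Flow[Message[String], (String, Int), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val bcast = b.add(Broadcast[Message[String]](2))
      val zip = b.add(Zip[String, Int]())
      val callA = Flow[Message[String]].mapAsync(1)(m => lookupA(m.body))
      val callB = Flow[Message[String]].mapAsync(1)(m => lookupB(m.body))

      bcast ~> callA ~> zip.in0
      bcast ~> callB ~> zip.in1
      FlowShape(bcast.in, zip.out)
    })

  // Shaped like the Handler in this thread: one Try per incoming message.
  val handler: Flow[Message[String], Try[(String, Int)], NotUsed] =
    fanOut.map[Try[(String, Int)]](Success(_))

  def run(): Seq[Try[(String, Int)]] = {
    implicit val system: ActorSystem = ActorSystem("fan-out-handler")
    val messages = Source(List(Message(1L, "ab"), Message(2L, "cde")))
    try Await.result(messages.via(handler).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }
}
```

In this sketch a failed call fails the whole stream rather than producing a `Failure` element; turning per-message errors into `Failure` values is left out for brevity.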

The point is that whatever handling is needed for each individual message should be fully encapsulated in the Handler, providing a clean separation of concerns.
Of course, you could parallelize acknowledgement as well, but that's a slightly different design and probably overkill, IMHO. KISS unless there's a genuine need to do otherwise.

Hope that helps. Feel free to sling additional questions.

Regards,
Lance

Andrew Rollins

Aug 25, 2015, 1:01:29 PM
to akka...@googlegroups.com
Lance,

These are great ideas. I've been pursuing the methods you described. I intend to eventually share the code. I'll respond back here when I do.

- Andrew


--
Andrew Rollins
Chief Software Architect, Localytics