[akka-stream] What is the best way to acknowledge completion?


Dom B

May 28, 2015, 11:19:18 AM5/28/15
to akka...@googlegroups.com
Hi,

I'm looking at the new Streams API and trying to work out if it's a best fit for my use case. The problem I'm having is with acknowledgement.

I have an external system placing requests onto a queue (Beanstalk in this case); once the external system receives the OK from Beanstalk, much like an event during replay, I'm committed to performing that job.

My plan was to build an ActorPublisher-based Beanstalk client on top of the existing Akka IO code, then reserve the jobs on the Beanstalk server. Reserving starts a time-to-live on the server: if I do not delete or release the job within that window, the server will assume I'm dead and release the job back onto the queue (https://github.com/kr/beanstalkd/wiki/faq#how-does-ttr-work). This is perfect in an HA system, because if my Akka node dies, Beanstalk will simply reset the job for another node to process.

My problem is that once my ActorPublisher hands the job to the "onNext" method, it has no way to track if and when the job is completed, so the only time it can delete from Beanstalk is straight away, and then I lose the whole advantage of the job TTL.

I'm leaning towards a Sink such that the stream has to be `BeanstalkPublisher ~> F1 ~> F2 ~> BeanstalkDeleter`, but this means the data type flowing through F1 and F2 has to include the job id. This makes Flow composition a little more annoying, as I had planned to reuse F1 and F2 without a Beanstalk source.
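To make the trade-off concrete, here is a minimal sketch of the id-threading approach, modeling each Flow stage as a plain function and using a hypothetical `JobId` type (not part of any real Beanstalk client):

```scala
// Hypothetical job id type; in the real stream this would come from Beanstalk.
final case class JobId(value: Long)

// Lift a payload-only transformation into one that carries the job id along.
def carryId[A, B](f: A => B): ((JobId, A)) => (JobId, B) = {
  case (id, a) => (id, f(a))
}

// F1 and F2 written without any knowledge of Beanstalk...
val f1: Int => Int    = _ * 2
val f2: Int => String = _.toString

// ...then composed with the id threaded through, so the final
// "deleter" stage still knows which job to acknowledge.
val pipeline: ((JobId, Int)) => (JobId, String) =
  carryId(f1).andThen(carryId(f2))
```

The annoyance Dom describes is visible here: F1 and F2 stay reusable, but every composition point has to be wrapped in `carryId` (or its akka-stream equivalent).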

Am I missing something obvious, or is this the only way to achieve an ACK back to the upstream publisher once F2 has completed processing? (Note in this example F2 is the persistence point)

Cheers,

Dom

Tim Harper

Aug 12, 2015, 3:15:58 AM8/12/15
to Akka User List
Dom,

Have a look at my library, `op-rabbit`. I am planning to lift out the acknowledged stream component so it can be used on its own.

Akka Team

Aug 14, 2015, 6:13:04 AM8/14/15
to Akka User List
Hi Dom,

This keeps coming up in different forms, and the use-case is a valid one. So far I'd say that it is possible to write F1 and F2 generically such that they carry some auxiliary information of type T around that may or may not be used; applied in the Beanstalk context this would be a job ID, and during tests it could be some debug info that helps identify what went wrong (in case something ever does). The problem with trying to lift this into the framework is that we have stages/junctions that are non-obvious to deal with (essentially everything that is not 1:1 element-wise).

As an example you can take a look at the HTTP client which provides such generic flows, threading some T through from request to response.

Would this be adequate in your situation? If not, do you have a brilliant idea of what would be?

Regards,

Roland

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam

Dom B

Aug 17, 2015, 4:24:59 AM8/17/15
to Akka User List
Hi Roland,

In the end I solved it using various type classes from Scalaz and created a "Flowz" class to remove the boilerplate of using them with the "Flow" DSL. Then I included the beanstalk deleter as the sink at the end of all possible routes in my graph.

See the Flowz implementation here: https://gist.github.com/DomBlack/568752041e4ec0cecc18

Cheers,

Dom

Andrew Rollins

Aug 20, 2015, 12:20:28 AM8/20/15
to Akka User List
Dom,

I'm interested in this as well. I like the sample you provided. I just have a couple questions to better understand your solution.

In your example, you have:

val process = Flowz[Option, Int].map(i => (i * 2).toString) // yields a Flow[Option[Int], Option[String]]

I'm assuming that in your beanstalk case, you use a type other than Option which lets you thread the job id all the way to the end, so maybe you end up doing some sort of Flowz[Job, Int].

This approach works great if you use it everywhere. I have a question though. What if you already have a Flow[Int, String], and you want to somehow augment it to pass through the Job? Have you considered how you might handle it, or do you just avoid that altogether?
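For readers following along, the idea behind a `Flowz`-style wrapper can be modeled in plain Scala. This is a stdlib-only sketch with a hand-rolled `Functor` standing in for Scalaz's, and it is not Dom's actual gist; it just shows how stages operate on the payload while the container `F` (here `Option`, a `Job` wrapper in the Beanstalk case) is mapped over generically:

```scala
import scala.language.higherKinds

// Minimal Functor typeclass standing in for scalaz.Functor.
trait Functor[F[_]] {
  def map[A, B](fa: F[A])(f: A => B): F[B]
}

implicit val optionFunctor: Functor[Option] = new Functor[Option] {
  def map[A, B](fa: Option[A])(f: A => B): Option[B] = fa.map(f)
}

// A Flowz-like wrapper: .map works on A, leaving the container F intact,
// so the same stage code runs whether F is Option, a Job wrapper, etc.
final case class Flowz[F[_], A](run: F[A])(implicit F: Functor[F]) {
  def map[B](f: A => B): Flowz[F, B] = Flowz(F.map(run)(f))
}

val process = Flowz[Option, Int](Some(21)).map(i => (i * 2).toString)
```

Swapping `Option` for a `Job[_]` container (with its own `Functor` instance) is exactly the `Flowz[Job, Int]` case Andrew guesses at.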

Dom B

Aug 21, 2015, 4:40:51 AM8/21/15
to Akka User List
Hi Andrew,

The way I solved that problem was to broadcast the flow, map out the Job wrapper, go into the premade flow, then zip the two streams together before mapping the result back into the Job wrapper:

~> broadcast ~> mapOutWrapper ~> f1 ~> zip ~> mapIntoWrapper ~>
   broadcast ~>                        zip

This only works, though, when your f1 stage preserves ordering. I suppose I could abstract it into the Flowz implementation, but because of that ordering requirement, I decided in the one case I needed it just to write it out directly, to make the requirement clear during review.
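The broadcast/zip trick above can be modeled at the collection level, which makes the ordering requirement explicit. This is a simplified sketch (ordered sequences standing in for streams, a hypothetical `Job` wrapper), not real GraphDSL code:

```scala
// Hypothetical wrapper carrying the ack handle (the Beanstalk job id).
final case class Job[A](id: Long, payload: A)

// "broadcast" the wrapped elements, strip the wrapper for the premade
// stage f1 (mapOutWrapper), then zip the results back with the original
// wrappers (zip ~> mapIntoWrapper). Correct only if f1 is 1:1 and ordered.
def throughPremade[A, B](jobs: Vector[Job[A]])(f1: Vector[A] => Vector[B]): Vector[Job[B]] = {
  val payloads = jobs.map(_.payload)
  val results  = f1(payloads)
  jobs.zip(results).map { case (job, b) => Job(job.id, b) }
}
```

If f1 reorders, drops, or duplicates elements, the zip pairs results with the wrong job ids, which is exactly why Dom keeps the graph explicit.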

Cheers,

Dom

Roland Kuhn

Aug 25, 2015, 9:45:11 AM8/25/15
to akka-user
Hi Dom,

thanks for sharing the Flowz, and I agree that that is about as far as you can go with static guarantees—it makes sense to keep the zipping graph explicit for the reason you state.

Regards,

Roland


Dr. Roland Kuhn
Akka Tech Lead
Typesafe – Reactive apps on the JVM.
twitter: @rolandkuhn


Andrew Rollins

Aug 25, 2015, 12:31:38 PM8/25/15
to akka...@googlegroups.com
Dom,

Thanks so much for your reply.

I have one question. It seems like the approach you outlined assumes you broadcast one object per acknowledgment needed later, which means the zip will be in lock step. However, what if the object you are sending through the pipeline is very large and you want to break it into N smaller parts of work? For example, let's say you are processing pointers to large files in HDFS or AWS S3, and you'd like to chunk those files into smaller pieces through the pipeline. Have you thought about how you might approach that?
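One common answer to the chunking question (offered here as a sketch, not something from the thread's libraries) is to fan one acknowledgeable unit out into N parts that share a countdown, firing the upstream ack only when every part has completed:

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical ack handle: onDone fires once all chunks have been acked.
final class ChunkAck(total: Int, onDone: () => Unit) {
  private val remaining = new AtomicInteger(total)
  def ackOne(): Unit = if (remaining.decrementAndGet() == 0) onDone()
}

// Split one acknowledgeable unit into chunks that share the countdown;
// the upstream delete/ack happens only after every chunk completes.
def chunked[A](unit: Vector[A], onDone: () => Unit): Vector[(A, ChunkAck)] = {
  val ack = new ChunkAck(unit.size, onDone)
  unit.map(a => (a, ack))
}
```

In a stream this would sit behind a `mapConcat`-style stage; the countdown makes the ack safe even when chunks complete out of order on parallel downstream stages.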




--
Andrew Rollins
Chief Software Architect, Localytics

timch...@gmail.com

Aug 25, 2015, 12:50:59 PM8/25/15
to akka...@googlegroups.com
Andrew,

I just thought I should mention that I've deployed my acknowledged stream implementation to Maven Central. It uses promises, and both Roland and Victor hate that. :) But it works with stream operations that modify cardinality, such as grouped, groupedWithin, filter, mapConcat, etc.
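The promise-per-element idea can be sketched without the library (this is a stdlib-only model of the technique, not op-rabbit's actual API): each payload travels with a Promise that downstream completes once the element is fully processed, and cardinality-changing stages must complete the promises of elements they drop.

```scala
import scala.concurrent.Promise

// Each payload rides with a Promise; upstream acks when it completes.
type Acked[A] = (Promise[Unit], A)

// 1:1 stages just thread the promise through untouched.
def mapAcked[A, B](f: A => B): Acked[A] => Acked[B] = {
  case (p, a) => (p, f(a))
}

// A cardinality-reducing stage like filter must still complete the
// promise of every dropped element, or upstream would never ack it.
def filterAcked[A](pred: A => Boolean)(in: Vector[Acked[A]]): Vector[Acked[A]] =
  in.filter { case (p, a) =>
    val keep = pred(a)
    if (!keep) p.success(()) // ack dropped elements immediately
    keep
  }
```

The cost Roland alludes to is that correctness now rests on every stage remembering to complete its promises, rather than on anything the types enforce.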


Sent from my iPhone

Andrew Rollins

Aug 25, 2015, 12:53:17 PM8/25/15
to akka...@googlegroups.com
Tim,

Interesting! I'll take a look.

Roland Kuhn

Aug 25, 2015, 2:02:08 PM8/25/15
to akka-user
“Hate” is an awfully strong word :-) My last assessment was that I prefer a working solution over a non-existing one any time—and that more research is needed on this topic.