Announcement: Opinionated RabbitMQ library using Akka / Reactive Streams


Tim Harper

May 11, 2015, 12:23:50 AM
to akka...@googlegroups.com
I have developed a high-level library for efficiently setting up resilient, fault-tolerant RabbitMQ consumers using Akka and Akka Reactive Streams. 

Some of the features:

- Recovery:
    - Consumers automatically reconnect and subscribe if the connection is lost
    - Messages published can optionally 
- Integration
    - Connection settings pulled from the Typesafe config library
    - Asynchronous, concurrent consumption using native Scala Futures or the new Akka Streams project.
    - Common pattern for serialization allows easy integration with serialization libraries such as play-json or json4s
    - Common pattern for exception handling to publish errors to Airbrake, Syslog, or all of the above
- Modular
    - Composition favored over inheritance, enabling flexibility and high code reuse.
- Modeled
    - Queue binding, exchange binding modeled with case classes
    - Publishing mechanisms also modeled
- Reliability
    - Builds on the excellent [Akka RabbitMQ client](https://github.com/thenewmotion/akka-rabbitmq) library for easy recovery.
    - Built-in consumer error recovery strategy in which messages are re-delivered to the message queue and retried (not implemented for akka-streams integration as retry mechanism affects message order)
    - With a single message, pause all consumers if a service health check fails (e.g., database unavailable); easily resume them the same way.
- Graceful shutdown
    - Consumers and streams can immediately unsubscribe, but stay alive long enough to wait for any messages to finish being processed.
- Tested
    - Extensive integration tests
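As a taste of the "Modeled" point above, here is a minimal, self-contained sketch of what modeling a queue binding with a case class buys you; the type and field names are illustrative, not the library's actual API:

```scala
// Illustrative only: a queue binding modeled as a case class, in the
// spirit of the "Modeled" bullet above (not op-rabbit's actual types).
case class QueueBinding(
  queueName: String,
  exchange: String,
  routingKeys: Seq[String],
  durable: Boolean = true)

val binding = QueueBinding("invoices", "billing-exchange", Seq("billing.#"))

// Case classes give structural equality, copy, and pattern matching for free:
val transient = binding.copy(durable = false)
```

Because bindings are plain values, they can be declared once and reused across consumers and tests.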

The source is available here: https://github.com/SpinGo/op-rabbit

We have been using the library internally at SpinGo for a year and I am working towards a 1.0.0 release candidate. We're using the streaming integration as the foundation for a billing system which is heavily based on reliable message-order, and at-least-once-delivery guarantees. I'm rather excited to share it with the world, and would be grateful for feedback. I plan on creating an Activator project to help people learn the library quickly.

Some examples are on the github page. More examples can be found in the tests.

Feedback is, of course, appreciated.

Tim

Joseph Mansigian

May 11, 2015, 10:53:57 PM
to akka...@googlegroups.com
Hello Tim,

I saw your post about the library you developed and wonder if it can help me with a problem.

I have a Java/Akka project that is now run from the command line. Currently anyone who wants to see what it can do has to install/already have:

  • git 
  • java
  • scala
  • akka
  • yamlbeans
I think it is a barrier to go from a synopsis to actually being able to run the code if all of the above has to be in place first. So I am thinking that what is needed is a Web-based implementation that only requires you to have a browser. You can manipulate the application entirely through data (all specific intelligence is modeled in data), so forms would work. I am thinking of something like the W3C "Try It" kind of thing. With little time investment, people could see if they are interested.

My question is: can your library help realize the goal of giving my project an online presence? Or help in another way?

A synopsis of my project can be found at:

The project source is available on GitHub:
git clone https://github.com/aed-project/aspire aspire-emergent-design

Thanks,
Joe Mansigian

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Tim Harper

May 12, 2015, 5:37:48 PM
to akka...@googlegroups.com
Unfortunately, Joseph, I think the answer here is no; this library will not reduce any of the dependencies you list. It helps Scala applications integrate with RabbitMQ.

Roland Kuhn

May 13, 2015, 3:15:34 AM
to akka-user
Hi Tim,

this looks like a very nice package, thanks for sharing! Since I am not that deeply into RabbitMQ, I guess the “opinionated” part is about favoring at-least-once guarantees and explicit acknowledgement over magic? This would fit in well with the overall Akka philosophy, and that would presumably not be coincidental ;-)

How has your experience been so far with building upon Akka Streams, did you miss anything or find anything especially great? Looking at your code samples there is possibly one difficult part, which is the Promise that gets fed into the Sink in order to signal completion—perhaps that could be modeled more cleanly by exposing an outgoing stream of acknowledgements (i.e. making the producer a Flow and not a Sink) so that a feedback loop would inform upstream producers of successful ingestion.

Regards,

Roland




Dr. Roland Kuhn
Akka Tech Lead
Typesafe – Reactive apps on the JVM.
twitter: @rolandkuhn


Rob Moore

May 13, 2015, 12:59:35 PM
to akka...@googlegroups.com
Hi, Tim,

Thanks for making this library available. I noticed the dependency on your scoped-fixtures library. Is that something that you plan on open sourcing as well?

Regards,

Rob

Tim Harper

May 13, 2015, 6:32:30 PM
to akka...@googlegroups.com
Hi Rob,


On May 13, 2015, at 09:48, Rob Moore <rob....@gmail.com> wrote:

Hi, Tim,

Thanks for making this library available. I noticed the dependency on your scoped-fixtures library. Is that something that you plan on open sourcing as well?


Tim Harper

May 13, 2015, 6:43:39 PM
to akka...@googlegroups.com
On May 13, 2015, at 01:15, Roland Kuhn <goo...@rkuhn.info> wrote:

Hi Tim,

this looks like a very nice package, thanks for sharing! Since I am not that deeply into RabbitMQ, I guess the “opinionated” part is about favoring at-least-once guarantees and explicit acknowledgement over magic? This would fit in well with the overall Akka philosophy, and that would presumably not be coincidental ;-)

It's the consumer pattern, deserialization pattern and error-reporting pattern, bundled up into one integrated package.

How has your experience been so far with building upon Akka Streams, did you miss anything or find anything especially great?

I love Akka Streams. My only qualm currently (was going to comment on the thread in which you solicited feedback on the DSL) is the .toMat(sink)(Keep.right); I frequently use the Future from Sink.foreach and wish it were the default.

Looking at your code samples there is possibly one difficult part, which is the Promise that gets fed into the Sink in order to signal completion—perhaps that could be modeled more cleanly by exposing an outgoing stream of acknowledgements (i.e. making the producer a Flow and not a Sink) so that a feedback loop would inform upstream producers of successful ingestion.

That's an interesting idea; I am using promises right now to achieve that result, and the pattern works relatively well. It allows me to pass the original upstream acknowledgement promise into the publisher sink, such that the message isn't considered "done" until any resultant messages are persisted in another message queue.

I've thought about your approach and I'm not quite sure how I would do it. I could pass a tuple of the original deliveryTag (from the consumer Source) and the message to publish into the Producer flow, have the publisher flow yield the deliveryTag, and then stream the deliveryTag back to the consumer Source so it can acknowledge it. One benefit of this is that I could cross process boundaries safely. However, the consumer Source actually monitors the promises that it yields, and yells at the user if they are garbage collected (to let them know they dropped a promise without resolving it). I suppose I could do a configurable timeout, instead. Is this what you had in mind?

I tend to prefer the promises, since they are simple and universal, but I admit there are issues with cross-process boundaries.

Worth mentioning: I must ack the message on the same AMQP channel from which it was originally delivered, so crossing machine boundaries is already going to cause pain, and a less reliable auto-acking approach should be used instead.

Regards,

Tim Harper

Roland Kuhn

May 14, 2015, 2:12:01 AM
to akka-user
14 maj 2015 kl. 00:43 skrev Tim Harper <timch...@gmail.com>:


On May 13, 2015, at 01:15, Roland Kuhn <goo...@rkuhn.info> wrote:

Hi Tim,

this looks like a very nice package, thanks for sharing! Since I am not that deeply into RabbitMQ, I guess the “opinionated” part is about favoring at-least-once guarantees and explicit acknowledgement over magic? This would fit in well with the overall Akka philosophy, and that would presumably not be coincidental ;-)

It's the consumer pattern, deserialization pattern and error-reporting pattern, bundled up into one integrated package.

How has your experience been so far with building upon Akka Streams, did you miss anything or find anything especially great?

I love Akka Streams. My only qualm currently (was going to comment on the thread in which you solicited feedback on the DSL) is the .toMat(sink)(Keep.right); I frequently use the Future from Sink.foreach and wish it were the default.

In that situation you can use .runWith(Sink.head) where the default is to return the Future—we use that in almost all our tests ;-) The non-running DSL methods all consistently default to the “left” side since that is obviously correct for almost all of them (like map/filter/drop/take), while the convenience runners default to their argument to provide this choice in a nicer fashion for some prominent use-cases.
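The two forms Roland contrasts look like this in code (a sketch against the 1.0-era akka-stream API; the materialized types shown may differ in later versions):

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Future

implicit val system = ActorSystem("demo")
implicit val materializer = ActorMaterializer()

val source = Source(1 to 10)

// Explicit form: keep the sink's materialized value instead of the default left side.
val done: Future[Unit] = source.toMat(Sink.foreach(println))(Keep.right).run()

// Convenience form: runWith defaults to its argument's materialized value.
val first: Future[Int] = source.runWith(Sink.head)
```

Note that `runWith` both wires up the sink and runs the stream in one call, which is why it can afford a different default than the non-running `toMat`.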


Looking at your code samples there is possibly one difficult part, which is the Promise that gets fed into the Sink in order to signal completion—perhaps that could be modeled more cleanly by exposing an outgoing stream of acknowledgements (i.e. making the producer a Flow and not a Sink) so that a feedback loop would inform upstream producers of successful ingestion.

That's an interesting idea; I am using promises right now to achieve that result, and the pattern works relatively well. It allows me to pass the original upstream acknowledgement promise into the publisher sink, such that the message isn't considered "done" until any resultant messages are persisted in another message queue.

I've thought about your approach and I'm not quite sure how I would do it. I could pass a tuple of the original deliveryTag (from the consumer Source) and the message to publish into the Producer flow, have the publisher flow yield the deliveryTag, and then stream the deliveryTag back to the consumer Source so it can acknowledge it. One benefit of this is that I could cross process boundaries safely. However, the consumer Source actually monitors the promises that it yields, and yells at the user if they are garbage collected (to let them know they dropped a promise without resolving it). I suppose I could do a configurable timeout, instead. Is this what you had in mind?

Yes, that is what I meant.


I tend to prefer the promises, since they are simple and universal, but I admit there are issues with cross-process boundaries.

Worth mentioning: I must ack the message on the same AMQP channel from which it was originally delivered, so crossing machine boundaries is already going to cause pain, and a less reliable auto-acking approach should be used instead.

I wasn’t thinking so much about crossing machine boundaries, I’m currently more interested in learning how powerful the DSL is that we have built. By using Promises you introduce an “uncontrolled” back-channel for communicating between stream processing stages, and if that were the only way to solve this then our DSL would not be exhaustive. So, for me it is more of an aesthetic concern that the solution would be simpler in a way by just using one abstraction instead of mixing two. A working solution still is a working solution for you, of course ;-)

Regards,

Roland


Regards,

Tim Harper



dpratt

May 14, 2015, 7:40:36 PM
to akka...@googlegroups.com
This is actually a fairly interesting problem that I've been putting a lot of thought into lately as well. I've been working on a (very rough) ground-up implementation of AMQP using the new StreamTcp API, and I've run into an interesting problem with RabbitMQ.

Basically, AMQP overloads the concept of an Ack for a message into two different (and unrelated) roles:

1) It's used to signal that a message has been handled and consumed, and thus may be removed from the queue that it originated in.
2) It's also used to signal back pressure to the broker from a client. When creating an AMQP channel, you specify a QoS level, which is the total number of in-flight un-acked messages that the broker will deliver to you.

On top of this, Rabbit also uses TCP to signal back pressure the other way around - when a client sends too many requests (publishes, etc), it signals TCP back pressure.

The incoming/subscription method of using Acks and QoS seems to make sense to me, since you don't want a slow consumer to stall all of the other subscribing channels on a given connection. I've been playing with the idea of modeling a subscription as a BidiFlow with messages being emitted on the top side, and having an incoming stream of Ack/Reject messages on the bottom side. The top side would adjust the underlying channel's QoS based on incoming demand, and leaving the responsibility to Ack messages entirely up to the user-defined downstream message-processing Flow. This is obviously not ideal, since it would easily allow a poorly-behaved downstream that never sends an Ack to cause the QoS on the channel to eventually blow up and be unbounded.

The best I can think of to get around this is to just remove the concept of Acks from the downstream consumer, and keep it entirely bounded inside of the subscription's Source. Namely, keep an internal buffer of messages, and send an Ack back to the broker when downstream signals demand and an element is emitted. This would work quite well, but it does eliminate some of the AMQP model from the downstream processor - namely, there's no way to explicitly reject a message as inappropriate. In the grand scheme of things, I don't think this is a big deal, however, because for most of the use cases I've seen, when a message on an AMQP queue is malformed or unprocessable, the last thing you really want to do is put it back on the same queue to be processed later.
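A toy model of that last idea, with plain Scala standing in for the channel and stream machinery; `ack` is a placeholder for the real `basicAck` call, and the class name is mine:

```scala
import scala.collection.mutable

// Toy model of the buffered-ack idea above: deliveries sit un-acked in a
// bounded buffer (the QoS window), and each one is acked only at the
// moment downstream demand pulls it out.
class BufferedSubscription[A](qos: Int, ack: Long => Unit) {
  private val buffer = mutable.Queue.empty[(Long, A)]

  // The broker delivers at most `qos` un-acked messages; refuse overflow.
  def deliver(tag: Long, msg: A): Boolean =
    if (buffer.size < qos) { buffer.enqueue((tag, msg)); true }
    else false

  // Called when downstream signals demand: emit one element and ack it.
  def onDemand(): Option[A] =
    if (buffer.isEmpty) None
    else {
      val (tag, msg) = buffer.dequeue()
      ack(tag) // acked exactly when emitted, so the un-acked count stays bounded
      Some(msg)
    }
}

val acked = mutable.Buffer.empty[Long]
val sub = new BufferedSubscription[String](qos = 2, tag => acked += tag)
sub.deliver(1L, "a") // accepted
sub.deliver(2L, "b") // accepted; window now full
sub.deliver(3L, "c") // refused: two messages are still un-acked
sub.onDemand()       // emits "a" and acks tag 1
```

The trade-off described above is visible here: downstream never sees the tags, so it has no way to reject a specific message.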

Forgive the digression and wordiness; this is just something I've been thinking about quite a lot recently. My end goal is the ability to have a Rabbit client that supports a subset of the protocol (most likely publish and subscribe), written and based from the ground up on Akka StreamTcp. Am I completely barking up the wrong tree here?

Tal Pressman

May 27, 2015, 9:47:11 AM
to akka...@googlegroups.com
Hi Tim,

This looks great - I was just thinking of implementing something like this myself, so the timing couldn't have been better. ^_^
I do have a couple of questions, though. 

From what I see in AsyncAckingConsumer, the default error handling strategy is to acknowledge the message once the retry limit has been reached. This means that some messages could, theoretically, be lost (no at-least-once-delivery). Have you considered rejecting the message instead as a default, or providing another built-in strategy that does that? It's not a problem to implement it independently, but it could be a bit surprising for the user.

Regarding configuration, is there any way of configuring the connection dynamically? I couldn't find anywhere in the code that overrides the settings read from the default config file. For example, in my use case I have to be able to open connections to several different RabbitMQ clusters, and it doesn't seem to be possible with the current implementation.
As a side note, is it possible to change the configuration element to something a little less general (maybe op-rabbit.rabbitmq)? 

Finally, a couple of things regarding the stream module:

From my understanding of streams, creating a Source and creating a Flow should not depend on the actual RabbitMQ connection/subscription. Instead, the subscription (and possibly the connection/channel as well?) should only be created once the flow gets materialized. Have you considered using Source.actorPublisher(Props) to create the actor and subscribe in the actor's preStart or something?

The other thing about streams ties in with Roland's comments (I think) about the use of Futures with streams. It means that the entire flow must now be aware of the fact that it's a RabbitMQ flow (or at the very least, that its messages contain the Future), so it is not as composable as it might have been otherwise. Also, I don't see how it plays with streams' error handling mechanism / strategies. At the same time, the use of Futures to "track" messages is very elegant, and I don't see any easy way of achieving something similar with streams (maybe something using BidiFlow?).

In any case, like I already said, this looks like a very nice library. If you need any help with it, please let me know - I would love to contribute to it.

Tal

Tim Harper

May 27, 2015, 6:26:28 PM
to akka...@googlegroups.com
On May 27, 2015, at 07:47, Tal Pressman <kir...@gmail.com> wrote:

Hi Tim,

This looks great - I was just thinking of implementing something like this myself, so the timing couldn't have been better. ^_^

Glad to hear it!

I do have a couple of questions, though. 

From what I see in AsyncAckingConsumer, the default error handling strategy is to acknowledge the message once the retry limit has been reached. This means that some messages could, theoretically, be lost (no at-least-once-delivery). Have you considered rejecting the message instead as a default, or providing another built-in strategy that does that? It's not a problem to implement it independently, but it could be a bit surprising for the user.

You're right; after the retry amount, the messages are lost. Also, worse, is that message order is not maintained. This makes the recovery strategy good only when message order does not matter. I had to do this because I use message headers to count retries.

If you'll look at impl/AsyncAckingConsumer.scala you'll find the implementation for the RecoveryStrategy withRetry. I'd be open to an alternative recovery strategy. If the future returned by the RecoveryStrategy is a failure, then the original delivery gets Nacked.
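For readers following along, the header-based counting Tim describes can be sketched like this; a plain Map stands in for the AMQP headers, and the function name and Left/Right encoding are mine, not the library's:

```scala
// Decide what a retry-counting recovery strategy does next.
// Left = give up (ack and drop); Right = headers for the re-published copy.
def retryDecision(headers: Map[String, Int], retryLimit: Int): Either[String, Map[String, Int]] = {
  val attempts = headers.getOrElse("x-retry", 0)
  if (attempts >= retryLimit)
    Left("limit reached: ack and drop the message")
  else
    // Re-publish to the tail of the queue with an incremented counter;
    // this re-enqueueing is why message order is not preserved under retry.
    Right(headers.updated("x-retry", attempts + 1))
}
```

The sketch makes the failure mode explicit: once the limit is hit, the message is acked away, which is the at-least-once gap Tal raises above.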




Regarding configuration, is there any way of configuring the connection dynamically? I couldn't find anywhere in the code that overrides the settings read from the default config file. For example, in my use case I have to be able to open connections to several different RabbitMQ clusters, and it doesn't seem to be possible with the current implementation.

This is doable; we could model the configuration values available with a case class, then create a method fromConfig to pull it from typesafe configuration; make it the default.

As a side note, is it possible to change the configuration element to something a little less general (maybe op-rabbit.rabbitmq)?

That may be a good idea, especially if op-rabbit begins adding more configuration options other than just how to connect to rabbitmq; however, I'd hope to avoid adding more configuration if possible.

Finally, a couple of things regarding the stream module:

From my understanding of streams, creating a Source and creating a Flow should not depend on the actual RabbitMQ connection/subscription. Instead, the subscription (and possibly the connection/channel as well?) should only be created once the flow gets materialized. Have you considered using Source.actorPublisher(Props) to create the actor and subscribe in the actor's preStart or something?

I just about went down this route originally, but due to code constraints I didn't. I've since refactored the code however to make that feasible. This would be a simpler approach.

The other thing about streams ties in with Roland's comments (I think) about the use of Futures with streams. It means that the entire flow must now be aware of the fact that it's a RabbitMQ flow (or at the very least, that its messages contain the Future), so it is not as composable as it might have been otherwise. Also, I don't see how it plays with streams' error handling mechanism / strategies. At the same time, the use of Futures to "track" messages is very elegant, and I don't see any easy way of achieving something similar with streams (maybe something using BidiFlow?).

I've considered Roland's comments deeply. I started down the route of implementing his suggestion and abandoned it as complexity grew. We have cases where the messages will be routed to different message sinks, or perhaps delivered to two different sinks. Passing along a promise allows us to fork it, such that the upstream promise is fulfilled only after the two forked downstream promises are (enabling us to declare "the work is done" only once the byproduct messages have all been confirmed to be persisted).
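The forking described here can be sketched with plain Scala promises; `fork` is an illustrative helper of mine, not an op-rabbit API:

```scala
import scala.concurrent.Promise
import scala.concurrent.ExecutionContext.Implicits.global

// Fork an acknowledgement promise: the upstream promise completes only
// after BOTH downstream copies complete, and fails if either one fails.
def fork(upstream: Promise[Unit]): (Promise[Unit], Promise[Unit]) = {
  val left = Promise[Unit]()
  val right = Promise[Unit]()
  upstream.completeWith(left.future.flatMap(_ => right.future))
  (left, right)
}

val ackPromise = Promise[Unit]()
val (sinkA, sinkB) = fork(ackPromise)
// Each downstream sink completes its own copy once its message is
// confirmed persisted; the consumer acks only when ackPromise succeeds.
```

Because `completeWith` composes, forks can be nested arbitrarily deep, which is hard to express with demand signaling alone.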


In any case, like I already said, this looks like a very nice library. If you need any help with it, please let me know - I would love to contribute to it.

If you'd like to toy with alternative recovery strategies, that would be most helpful. I can work on switching the streams over to your suggestion of having them handle allocating the subscription and subscribing to it.

Tal

Thanks!

Tim

Tal Pressman

May 28, 2015, 8:25:42 AM
to akka...@googlegroups.com


On Thursday, May 28, 2015 at 1:26:28 AM UTC+3, Tim Harper wrote:

You're right; after the retry amount, the messages are lost. Also, worse, is that message order is not maintained. This makes the recovery strategy good only when message order does not matter. I had to do this because I use message headers to count retries.

If you'll look at impl/AsyncAckingConsumer.scala you'll find the implementation for the RecoveryStrategy withRetry. I'd be open to an alternative recovery strategy. If the future returned by the RecoveryStrategy is a failure, then the original delivery gets Nacked.


Indeed, I hadn't noticed that the retries actually re-enqueued the message.
Maybe it's better to just let the downstream decide what to do with the message (retry for however many times it wants, for example), and make the AsyncAckingRabbitConsumer "stupid": when it gets an error, it rejects the message (or applies some other simple strategy)?
Anyway, I'll try playing around with it and see where I get...
 

This is doable; we could model the configuration values available with a case class, then create a method fromConfig to pull it from typesafe configuration; make it the default.


Adding case classes for configuration would be a nice touch. What I was trying to get at, however, is the ability to load from a non-default Config (for example, fromConfig(config: Config = ConfigFactory.load()) or something).
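Something like the following could satisfy both points, assuming the Typesafe Config library on the classpath; the case-class fields, config keys, and the `op-rabbit.connection` path are all illustrative:

```scala
import com.typesafe.config.{Config, ConfigFactory}
import scala.collection.JavaConverters._

// Connection settings as a case class, with fromConfig as the default
// construction path; any Config can be substituted, so one process can
// hold connections to several RabbitMQ clusters.
case class ConnectionParams(hosts: List[String], username: String, password: String)

object ConnectionParams {
  def fromConfig(config: Config = ConfigFactory.load().getConfig("op-rabbit.connection")): ConnectionParams =
    ConnectionParams(
      hosts = config.getStringList("hosts").asScala.toList,
      username = config.getString("username"),
      password = config.getString("password"))
}

// A second cluster from an ad-hoc Config, without touching application.conf:
val clusterB = ConnectionParams.fromConfig(ConfigFactory.parseString("""
  hosts = ["rabbit-b1", "rabbit-b2"]
  username = guest
  password = guest
"""))
```

Keeping `fromConfig` as a default argument rather than the only path is what makes the multi-cluster case possible.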


That may be a good idea, especially if op-rabbit begins adding more configuration options other than just how to connect to rabbitmq; however, I'd hope to avoid adding more configuration if possible.


I think the more important issue here is just to make the "root" configuration element name more unique and less likely to conflict with other libraries. "rabbitmq" seems like a name that could be picked by any number of RabbitMQ-related libraries.

 
I've considered Roland's comments deeply. I started down the route of implementing his suggestion and abandoned it as complexity grew. We have cases where the messages will be routed to different message sinks, or perhaps delivered to two different sinks. Passing along a promise allows us to fork it, such that the upstream promise is fulfilled only after the two forked downstream promises are (enabling us to declare "the work is done" only once the byproduct messages have all been confirmed to be persisted).


Yeah, that's what I like about the Future/Promise approach...
On the other hand, streams already have a similar capability in the way they report back-pressure and demand for flows that are just as complex. Maybe it's something the Akka guys should consider adding to the streams directly...  


If you'd like to toy with alternative recovery strategies, that would be most helpful. I can work on switching the streams over to your suggestion of having them handle allocating the subscription and subscribing to it.

Yeah, I'll start playing around with it, and see what happens. And maybe also try to get publish confirmations working - blocking commits are really painful.

 Tal

Tim Harper

Jun 10, 2015, 3:17:25 AM
to akka...@googlegroups.com
On May 28, 2015, at 06:25, Tal Pressman <kir...@gmail.com> wrote:



On Thursday, May 28, 2015 at 1:26:28 AM UTC+3, Tim Harper wrote:

You're right; after the retry amount, the messages are lost. Also, worse, is that message order is not maintained. This makes the recovery strategy good only when message order does not matter. I had to do this because I use message headers to count retries.

If you'll look at impl/AsyncAckingConsumer.scala you'll find the implementation for the RecoveryStrategy withRetry. I'd be open to an alternative recovery strategy. If the future returned by the RecoveryStrategy is a failure, then the original delivery gets Nacked.


Indeed, I hadn't noticed that the retries actually re-enqueued the message.
Maybe it's better to just let the downstream decide what to do with the message (retry for however many times it wants, for example), and make the AsyncAckingRabbitConsumer "stupid" - when it gets an error it rejects the message (or some other simple strategy) ?
Anyway, I'll try playing around with it and see where I get...
 

This is doable; we could model the configuration values available with a case class, then create a method fromConfig to pull it from typesafe configuration; make it the default.


Adding case classes for configuration would be a nice touch. What I was trying to get at, however, is the ability to load from a non-default Config (for example, fromConfig(config: Config = ConfigFactory.load()) or something).

I've added this support on the master branch.



That may be a good idea, especially if op-rabbit begins adding more configuration options other than just how to connect to rabbitmq; however, I'd hope to avoid adding more configuration if possible.


I think the more important issue here is just to make the "root" configuration element name more unique and less likely to conflict with other libraries. "rabbitmq" seems like a name that could be picked by any number of RabbitMQ-related libraries.

 
I've considered Roland's comments deeply. I started down the route of implementing his suggestion and abandoned it as complexity grew. We have cases where the messages will be routed to different message sinks, or perhaps delivered to two different sinks. Passing along a promise allows us to fork it, such that the upstream promise is fulfilled only after the two forked downstream promises are (enabling us to declare "the work is done" only once the byproduct messages have all been confirmed to be persisted).


Yeah, that's what I like about the Future/Promise approach...
On the other hand, streams already have a similar capability in the way they report back-pressure and demand for flows that are just as complex. Maybe it's something the Akka guys should consider adding to the streams directly...  

The stream components only communicate demand, and demand, naturally, is not the same as "message made it through the entire pipe". There are some other complexities supported by the promise, such as the ability to fork them (create two new promises, and fulfill the upstream one once both new promises are completed), allowing support for patterns in which an upstream message is Acked only after it has made it fully through all the downstream pipes. Of course, this is useful only when you embrace "send at least once" (which is probably a good idea, anyways).



If you'd like to toy with alternative recovery strategies, that would be most helpful. I can work on switching the streams over to your suggestion of having them handle allocating the subscription and subscribing to it.

Yeah, I'll start playing around with it, and see what happens. And maybe also try to get publish confirmations working - blocking commits are really painful.

Let me know how I can help!

Tim


 Tal


Tal Pressman

Jul 13, 2015, 8:55:54 AM
to akka...@googlegroups.com
Hi Tim,

Sorry to disappear on you like that, priorities got shifted a little here and I had to work on something else for a while. I'm back to working on this now, though, and it seems like there have been a lot of changes in the meantime!

So I created a new project, started with the sample code, and then continued from there. There were a few places where the samples were out-of-date, but nothing too serious (I can open a pull request for it, if you'd like). After getting the samples running I tried combining the source with the sink in a single flow (read from a queue, do some work, write the results to a different queue), and there I ran into some issues.

First, it appears that RabbitSink doesn't fulfill the messages' promises. This means messages are never acked back to the source, in turn causing it to stop sending messages. I fixed this locally, but I wasn't sure if this was by design or if it's an actual problem.

Second, it appears that sometimes messages get sent out-of-order. I haven't been able to isolate the exact cause yet, but I'll keep digging... Did you run into anything similar?

In any case, if you'd like to have a look at my changes, they're here 

Tal

Tim Harper

Jul 20, 2015, 12:22:49 AM
to akka...@googlegroups.com
I responded to your issues by commenting on your github fork; all those issues have been resolved, and there are tests in place to assert the functionality is retained. Also, I've pushed in a new concept of acknowledged streams with the recent v1.0.0-M9 release. The reasoning behind the change can be found on my blog, here: http://tim.theenchanter.com/2015/07/the-need-for-acknowledgement-in-streams.html. Take a look, let me know what you think. The change is ambitious but I think it is shaping up rather nicely.

Thanks,

Tim

Tal Pressman

Jul 21, 2015, 9:18:25 AM
to akka...@googlegroups.com
Hi,

Thanks for the fixes; the ack issue especially was pretty much a showstopper.
The acknowledged streams seem very nice. I haven't gotten into the small details, but at a high level they do seem to be what I was looking for.
It would be nice if, as you mentioned in the blog, you extracted the acknowledged-stream part into its own library. Right now, I already have to extend it to at least two other "connection" types besides RabbitMQ: Akka remote actors, and persistent at-least-once-delivery actors. It's not a big deal since I use op-rabbit anyway, but it would make my code cleaner and mean that I could share my sinks/sources more easily.
Another thing that might be missing is access to the actual future/promise from within the stream. I agree it's a good idea to hide it by default, but in some use cases it might be useful to manipulate it at certain intermediate stages of the flow (to register more callbacks for its completion, or to complete it early, for example). I'm not really sure about this yet, as I'm still trying to decide what exactly will and won't be part of my streams, but it's just something to think about.

Tal