--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
On May 13, 2015, at 09:48, Rob Moore <rob....@gmail.com> wrote:
Hi, Tim,
Thanks for making this library available. I noticed the dependency on your scoped-fixtures library. Is that something that you plan on open sourcing as well?
On May 13, 2015, at 01:15, Roland Kuhn <goo...@rkuhn.info> wrote:
Hi Tim,
this looks like a very nice package, thanks for sharing! Since I am not that deeply into RabbitMQ, I guess the “opinionated” part is about favoring at-least-once guarantees and explicit acknowledgement over magic? This would fit in well with the overall Akka philosophy, and that would presumably not be coincidental ;-)
How has your experience been so far with building upon Akka Streams, did you miss anything or find anything especially great?
Looking at your code samples there is possibly one difficult part, which is the Promise that gets fed into the Sink in order to signal completion—perhaps that could be modeled more cleanly by exposing an outgoing stream of acknowledgements (i.e. making the producer a Flow and not a Sink) so that a feedback loop would inform upstream producers of successful ingestion.
On 14 May 2015, at 00:43, Tim Harper <timch...@gmail.com> wrote:
> On May 13, 2015, at 01:15, Roland Kuhn <goo...@rkuhn.info> wrote:
> Hi Tim, this looks like a very nice package, thanks for sharing! Since I am not that deeply into RabbitMQ, I guess the “opinionated” part is about favoring at-least-once guarantees and explicit acknowledgement over magic? This would fit in well with the overall Akka philosophy, and that would presumably not be coincidental ;-)
It's the consumer pattern, deserialization pattern and error-reporting pattern, bundled up into one integrated package.
> How has your experience been so far with building upon Akka Streams, did you miss anything or find anything especially great?
I love Akka Streams. My only qualm currently (I was going to comment on the thread in which you solicited feedback on the DSL) is the .toMat(sink)(Keep.right); I frequently use the Future from Sink.foreach and wish it were the default.
> Looking at your code samples there is possibly one difficult part, which is the Promise that gets fed into the Sink in order to signal completion - perhaps that could be modeled more cleanly by exposing an outgoing stream of acknowledgements (i.e. making the producer a Flow and not a Sink) so that a feedback loop would inform upstream producers of successful ingestion.
That's an interesting idea; I am using promises right now to achieve that result, and the pattern works relatively well. It allows me to pass the original upstream acknowledgement promise into the publisher sink, such that the message isn't considered "done" until any resultant messages are persisted in another message queue.
I've thought about your approach and I'm not quite sure how I would do it. I could pass a tuple of the original deliveryTag (from the consumer Source) and the message to publish into the Producer flow, and have the publisher flow yield the deliveryTag; then, stream the deliveryTag back to the consumer Source so it can acknowledge it. One benefit of this is that I could cross process boundaries safely. However, the consumer Source actually monitors the promises that it yields, and yells at the user if they are garbage collected (to let them know they dropped a promise without resolving it). I suppose I could use a configurable timeout instead. Is this what you had in mind?
I tend to prefer the promises, since they are simple and universal, but I admit there are issues with cross-process boundaries.
Worth mentioning: I must ack the message on the same AMQP channel from which it was originally delivered, so crossing machine boundaries is already going to cause pain, and a less reliable auto-acking approach should be used instead.
Regards,
Tim Harper
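To make the promise-based acknowledgement described above concrete, here is a minimal sketch using only scala.concurrent. It is not op-rabbit's actual API: `FakeChannel`, `Delivery` and `deliver` are hypothetical names invented for illustration, and the channel is an in-memory stand-in rather than a real AMQP channel. It only shows the idea that the original delivery is acked once the promise handed downstream is fulfilled, and nacked if it fails.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PromiseAckSketch {
  // Hypothetical in-memory stand-in for the AMQP channel a delivery arrived on.
  final class FakeChannel {
    private val log = ListBuffer.empty[String]
    def ack(tag: Long): Unit = synchronized { log += s"ack $tag" }
    def nack(tag: Long): Unit = synchronized { log += s"nack $tag" }
    def events: List[String] = synchronized { log.toList }
  }

  // Hypothetical shape of what a consumer source might emit:
  // the payload plus a promise that downstream must resolve.
  final case class Delivery(deliveryTag: Long, body: String, done: Promise[Unit])

  // Ties the promise to the channel: success acks, failure nacks.
  // The returned future completes after the channel has been told.
  def deliver(channel: FakeChannel, tag: Long, body: String): (Delivery, Future[Unit]) = {
    val done = Promise[Unit]()
    val settled = done.future
      .map(_ => channel.ack(tag))
      .recover { case _ => channel.nack(tag) }
    (Delivery(tag, body, done), settled)
  }

  def run(): List[String] = {
    val channel = new FakeChannel
    val (first, firstSettled) = deliver(channel, 1L, "stored")
    val (second, secondSettled) = deliver(channel, 2L, "lost")
    // A downstream publisher would resolve these once its own work is confirmed.
    first.done.success(())
    second.done.failure(new RuntimeException("publish failed"))
    Await.result(Future.sequence(List(firstSettled, secondSettled)), 5.seconds)
    channel.events.sorted
  }
}
```

Because the ack is issued by whoever created the promise, it naturally happens on the channel that delivered the message, which matches the same-channel constraint mentioned above.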
On May 27, 2015, at 07:47, Tal Pressman <kir...@gmail.com> wrote:
Hi Tim,
This looks great - I was just thinking of implementing something like this myself, so the timing couldn't have been better. ^_^
I do have a couple of questions, though.
From what I see in AsyncAckingConsumer, the default error handling strategy is to acknowledge the message once the retry limit has been reached. This means that some messages could, theoretically, be lost (no at-least-once-delivery). Have you considered rejecting the message instead as a default, or providing another built-in strategy that does that? It's not a problem to implement it independently, but it could be a bit surprising for the user.
Regarding configuration, is there any way of configuring the connection dynamically? I couldn't find anywhere in the code that overrides the settings read from the default config file. For example, in my use case I have to be able to open connections to several different RabbitMQ clusters, and it doesn't seem to be possible with the current implementation.
As a side note, is it possible to change the configuration element to something a little less general (maybe rabbit-op.rabbitmq)?
Finally, a couple of things regarding the stream module:
From my understanding of streams, creating a Source and creating a flow should not depend on the actual RabbitMQ connection/subscription. Instead, the subscription (and possibly the connection/channel as well?) should only be created once the flow gets materialized. Have you considered using Source.actorPublisher(Props) to create the actor and subscribe in the actor's preStart or something?
The other thing about streams ties in with Roland's comments (I think) about the use of Futures with streams. It means that the entire flow must now be aware of the fact that it's a RabbitMQ flow (or at the very least, that its messages contain the Future), so it is not as composable as it might have been otherwise. Also, I don't see how it plays with streams' error handling mechanism / strategies. At the same time, the use of Futures to "track" messages is very elegant, and I don't see any easy way of achieving something similar with streams (maybe something using BidiFlow?).
In any case, like I already said, this looks like a very nice library. If you need any help with it, please let me know - I would love to contribute to it.
Tal
You're right; once the retry limit is reached, the messages are lost. Worse, message order is not maintained. This makes the recovery strategy good only when message order does not matter. I had to do this because I use message headers to count retries.
If you look at impl/AsyncAckingConsumer.scala, you'll find the implementation for the RecoveryStrategy withRetry. I'd be open to an alternative recovery strategy. If the future returned by the RecoveryStrategy is a failure, then the original delivery gets Nacked.
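As a rough illustration of the alternative being discussed (reject instead of acknowledge once retries run out), here is a sketch in plain Scala. The `RecoveryStrategy` type alias, `rejectAfter`, `settle` and the `Outcome` values below are hypothetical and are not the signatures in impl/AsyncAckingConsumer.scala; the only behavior taken from the thread is that a failed future from the strategy leads to a nack, and that retries are counted and the message re-enqueued.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RecoverySketch {
  sealed trait Outcome
  case object Acked extends Outcome
  case object Nacked extends Outcome

  // Hypothetical: a strategy sees the handler's error and the retry count
  // (as read from a message header). A failed future means "nack the original".
  type RecoveryStrategy = (Throwable, Int) => Future[Unit]

  // Re-enqueue with an incremented count while under the limit;
  // once the limit is reached, fail so the original delivery is rejected.
  def rejectAfter(limit: Int)(republish: Int => Future[Unit]): RecoveryStrategy =
    (error, retries) =>
      if (retries < limit) republish(retries + 1)
      else Future.failed(error)

  // What a consumer might do with the strategy's result.
  def settle(strategy: RecoveryStrategy, error: Throwable, retries: Int): Future[Outcome] =
    strategy(error, retries).map(_ => Acked: Outcome).recover { case _ => Nacked }

  def run(): (Outcome, Outcome, List[Int]) = {
    val republished = ListBuffer.empty[Int]
    val strategy = rejectAfter(limit = 3) { nextCount =>
      republished.synchronized { republished += nextCount }
      Future.successful(())
    }
    val boom = new RuntimeException("handler failed")
    val early = Await.result(settle(strategy, boom, retries = 0), 5.seconds)
    val late = Await.result(settle(strategy, boom, retries = 3), 5.seconds)
    (early, late, republished.toList)
  }
}
```

Note that this sketch does nothing about message ordering; re-enqueueing still reorders messages, as described above.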
This is doable; we could model the configuration values available with a case class, then create a method fromConfig to pull it from typesafe configuration; make it the default.
That may be a good idea, especially if op-rabbit begins adding more configuration options other than just how to connect to rabbitmq; however, I'd hope to avoid adding more configuration if possible.
I've considered Roland's comments deeply. I started down the route of implementing his suggestion and abandoned it as complexity grew. We have cases where the messages will be routed to different message sinks, or perhaps delivered to two different sinks. Passing along a promise allows us to fork this promise, such that the upstream promise is fulfilled only after the two forked downstream promises are (that way, enabling us to declare "the work is done" once the byproduct messages have all been confirmed to be persisted).
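The promise-forking idea can be sketched with nothing but scala.concurrent. The helper name `fork` is made up for this example and is not part of op-rabbit; the sketch assumes each downstream sink completes its own promise when its message is confirmed.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ForkPromiseSketch {
  // Hypothetical helper: returns n downstream promises. `upstream` is
  // fulfilled only after all of them succeed, and fails if any of them fails.
  def fork(upstream: Promise[Unit], n: Int): List[Promise[Unit]] = {
    val forks = List.fill(n)(Promise[Unit]())
    Future
      .sequence(forks.map(_.future))
      .onComplete(result => upstream.tryComplete(result.map(_ => ())))
    forks
  }

  def run(): (Boolean, Boolean) = {
    val upstream = Promise[Unit]()
    val forks = fork(upstream, 2)
    forks(0).success(())                       // first sink confirmed
    val doneAfterOne = upstream.isCompleted    // second sink is still pending
    forks(1).success(())                       // second sink confirmed
    Await.result(upstream.future, 5.seconds)
    (doneAfterOne, upstream.isCompleted)
  }
}
```

The upstream acknowledgement therefore waits for every byproduct message, regardless of how many sinks the message was routed to.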
If you'd like to toy with alternative recovery strategies, that would be most helpful. I can work on switching the streams over to your suggestion of having them allocate the subscription themselves when the stream is materialized.
On May 28, 2015, at 06:25, Tal Pressman <kir...@gmail.com> wrote:
> On Thursday, May 28, 2015 at 1:26:28 AM UTC+3, Tim Harper wrote:
> You're right; after the retry amount, the messages are lost. Also, worse, is that message order is not maintained. This makes the recovery strategy good only when message order does not matter. I had to do this because I use message headers to count retries. If you'll look at impl/AsyncAckingConsumer.scala you'll find the implementation for the RecoveryStrategy withRetry. I'd be open to an alternative recovery strategy. If the future returned by the RecoveryStrategy is a failure, then the original delivery gets Nacked.
Indeed, I hadn't noticed that the retries actually re-enqueued the message.
Maybe it's better to just let the downstream decide what to do with the message (retry however many times it wants, for example), and make the AsyncAckingRabbitConsumer "stupid": when it gets an error, it rejects the message (or applies some other simple strategy)?
Anyway, I'll try playing around with it and see where I get...
> This is doable; we could model the configuration values available with a case class, then create a method fromConfig to pull it from typesafe configuration; make it the default.
Adding case classes for configuration would be a nice touch. What I was trying to get at, however, is the ability to load from a non-default Config (for example, fromConfig(config: Config = ConfigFactory.load()) or something).
> That may be a good idea, especially if op-rabbit begins adding more configuration options other than just how to connect to rabbitmq; however, I'd hope to avoid adding more configuration if possible.
I think the more important issue here is just to make the "root" configuration element name more unique and less likely to conflict with other libraries. "rabbitmq" seems like a name that could be picked by any number of RabbitMQ-related libraries.
> I've considered Roland's comments deeply. I started down the route of implementing his suggestion and abandoned it as complexity grew. We have cases where the messages will be routed to different message sinks, or perhaps delivered to two different sinks. Passing along a promise allows us to fork this promise, such that the upstream promise is fulfilled only after the two forked downstream promises are (that way, enabling us to declare "the work is done" once the byproduct messages have all been confirmed to be persisted).
Yeah, that's what I like about the Future/Promise approach...
On the other hand, streams already have a similar capability in the way they report back-pressure and demand for flows that are just as complex. Maybe it's something the Akka guys should consider adding to the streams directly...
> If you'd like to toy with alternative recovery strategies, that would be most helpful. I can work on switching the streams over to your suggestion of having them handle allocating and subscription to the subscription.
Yeah, I'll start playing around with it, and see what happens. And maybe also try to get publish confirmations working - blocking commits are really painful.
Tal
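For reference, the configuration idea discussed in this exchange (a case class plus a fromConfig that accepts a non-default Config) might look roughly like the sketch below. It assumes the Typesafe Config library is on the classpath. The case class fields, the `path` parameter and the keys `host` and `port` are assumptions made for illustration and are not op-rabbit's actual configuration schema; only the `fromConfig(config: Config = ConfigFactory.load())` shape comes from the thread.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object ConnectionSettingsSketch {
  // Hypothetical settings model; the real library may need other fields.
  final case class ConnectionSettings(host: String, port: Int)

  // Reads settings from any Config, defaulting to the application config.
  // `path` is the root element, so each cluster can live under its own key.
  def fromConfig(config: Config = ConfigFactory.load(),
                 path: String = "rabbitmq"): ConnectionSettings = {
    val section = config.getConfig(path)
    ConnectionSettings(section.getString("host"), section.getInt("port"))
  }

  // Two clusters described in one (here inline) configuration.
  def run(): (ConnectionSettings, ConnectionSettings) = {
    val config = ConfigFactory.parseString(
      """
        |cluster-a { host = "rabbit-a.example", port = 5672 }
        |cluster-b { host = "rabbit-b.example", port = 5673 }
        |""".stripMargin)
    (fromConfig(config, "cluster-a"), fromConfig(config, "cluster-b"))
  }
}
```

Taking the root path as a parameter would also address the concern about the generic "rabbitmq" element name, since callers could point the library at any key they like.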