Akka Source.queue (Java DSL query)


Daniel Stoner

Jul 14, 2016, 10:13:57 AM7/14/16
to akka...@googlegroups.com
Recently I spotted a great example of how to use the Source.queue feature in Streams to pre-materialise a flow and then pass events into it independently.

The examples utilising Actors were tempting but would over-complicate my use case - which is to throttle writes to a database in a custom persistence Journal implementation.

Using Source.queue, for the life of me I cannot work out how to get hold of the SourceQueue on which to then 'offer' elements into this flow, from within the Java DSL.

The Scala example was:
val queue = Source.queue(bufferSize, overflowStrategy)
                  .filter(!_.raining)
                  .runForeach(println)

queue.offer(Weather("02139", 32.0, true))

My line-for-line Java conversion is:
CompletionStage<Done> clearlyNotAQueue =
        Source.<Integer>queue(5, OverflowStrategy.dropHead())
              .filter(msg -> msg.equals(5))
              .runForeach(System.out::println, materializer);

However, this returns a CompletionStage<Done> representing runForeach's completion, not the queue.

A pointer in the right direction would be hugely appreciated.

Thanks,
Daniel Stoner
--
Daniel Stoner | Senior Software Engineer UtopiaIT | Ocado Technology



Ian Graham

Jan 19, 2017, 4:02:42 AM1/19/17
to Akka User List
Hi,

I'm sure you have already solved this issue, but I found this thread yesterday when I was trying to do exactly the same thing.

Eventually I managed to put together something that seems like a working solution, so I thought I would post it:

Source<String, SourceQueueWithComplete<String>> queueSource =
    Source.<String>queue(100, OverflowStrategy.backpressure())
        .mapMaterializedValue(mat -> {
            // Bridge a RabbitMQ consumer onto the stream: each delivery is
            // offered to the queue, and the broker is acked/nacked according
            // to whether the element was actually enqueued.
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    CompletionStage<QueueOfferResult> offerResult = mat.offer(message);
                    offerResult.whenComplete((result, e) -> {
                        try {
                            if (result == QueueOfferResult.enqueued()) {
                                channel.basicAck(envelope.getDeliveryTag(), false);
                            } else {
                                channel.basicNack(envelope.getDeliveryTag(), false, true);
                            }
                        } catch (Exception ex) {
                            throw new RuntimeException(ex);
                        }
                    });
                }
            };
            channel.basicConsume("queuename", consumer);
            return mat;
        });
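Note that the RabbitMQ consumer is only registered when the stream is materialised, so nothing is consumed until the source is actually run. A minimal sketch of running it (assuming a sink that just prints each message):

// to(sink) keeps the source's materialized value, so run() returns the queue
// and, via the mapMaterializedValue callback above, registers the consumer.
SourceQueueWithComplete<String> queue =
    queueSource
        .to(Sink.foreach(System.out::println))
        .run(materializer);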

Obviously this is not production-ready code - I was just spiking it - but I hope it helps.

cheers,
Ian

Jeremy Sowden

unread,
Feb 13, 2017, 6:38:02 AM2/13/17
to Akka User List
On Thursday, 14 July 2016 15:13:57 UTC+1, Daniel Stoner wrote:
> Using Source.queue, for the life of me I cannot work out how to get hold of the SourceQueue on which to then 'offer' elements into this flow, from within the Java DSL. [...] A pointer in the right direction would be hugely appreciated.

Given a source of type Source<Out, M1> and a sink of type Sink<In, M2>, source.runWith(sink, mat) returns an M2, but sink.runWith(source, mat) returns an M1. Therefore, something like the following (untested) should do what you want:

SourceQueue<Integer> queue = Sink.<Integer>foreach(System.out::println).runWith(
    Source.<Integer>queue(5, OverflowStrategy.dropHead()).filter(msg -> msg.equals(5)),
    materializer);
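Equivalently (also untested), you can keep the stages in source-to-sink order and pick out the queue explicitly with toMat and Keep.left():

// Keep.left() selects the source's materialized value (the queue)
// instead of the sink's CompletionStage<Done>.
SourceQueueWithComplete<Integer> queue =
    Source.<Integer>queue(5, OverflowStrategy.dropHead())
        .filter(msg -> msg.equals(5))
        .toMat(Sink.foreach(System.out::println), Keep.left())
        .run(materializer);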



 

Daniel Stoner

Feb 14, 2017, 3:54:44 AM2/14/17
to Akka User List
Thanks all for the responses. In the end I used something akin to Ian's solution of having a visible channel on which to send responses back.

To retrieve the SourceQueueWithComplete I used:
SourceQueueWithComplete<Pair<Request, ResponsePromise<Request, Response>>> queue = Source
        .<Pair<Request, ResponsePromise<Request, Response>>>queue(bufferSize, OverflowStrategy.dropNew())
        .via(flow)
        .to(Sink.ignore())
        .run(materializer);

You'll note a few points:
A) It is the run method (on the RunnableGraph) that returns the materialized SourceQueue; to(sink) keeps the source's materialized value by default.
B) I don't just feed Requests into my stream, but Pairs of requests and response promises (basically an async callback function).

In the end, though, I found the semantics of Streams a little cumbersome to get properly production ready. Previously my Journal implementation created a stream per batched write to the database; now I have a singleton queue and need it to be long-lived. Akka Streams collapse on exception by default, so I had to watch out for this happening. (A lot of production debugging to find out why, how and whether my stream had collapsed turned up the very useful watchTermination method, which allows you to watch and log the exception when a stream collapses.)
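For reference, a sketch of what such a helper can look like (assuming slf4j for the logging; the real watchTerminationAndLog used in the code below isn't shown here):

// watchTermination gives the combiner the current materialized value plus a
// CompletionStage<Done> that completes, or fails, when the stream terminates;
// this sketch logs the outcome and passes the materialized value through unchanged.
private static <M> akka.japi.function.Function2<M, CompletionStage<Done>, M> watchTerminationAndLog(Class<?> owner) {
    org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(owner);
    return (mat, done) -> {
        done.whenComplete((success, failure) -> {
            if (failure != null) {
                log.error("Stream collapsed", failure);
            } else {
                log.info("Stream completed normally");
            }
        });
        return mat;
    };
}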

Final code looked a little like this:
   protected SourceQueueWithComplete<Pair<Request, ResponsePromise<Request, Response>>> createStream(
            int bufferSize,
            int actionsPerSecond,
            Flow<Pair<Request, ResponsePromise<Request, Response>>, Pair<CompletionStage<Response>, ResponsePromise<Request, Response>>, NotUsed> flow
    ) {

        return Source
                .<Pair<Request, ResponsePromise<Request, Response>>>queue(bufferSize, OverflowStrategy.dropNew())
                .via(flow)
                // Resume on element failure rather than letting the whole stream collapse.
                .withAttributes(ActorAttributes.withSupervisionStrategy(Supervision.getResumingDecider()))
                .map(this::tryResponseFulfilPromise)
                // Shape throughput to the database's provisioned capacity.
                .throttle(actionsPerSecond, Duration.apply(1, TimeUnit.SECONDS), actionsPerSecond, ThrottleMode.shaping())
                // Log why, how and when the stream terminates.
                .watchTermination(watchTerminationAndLog(this.getClass()))
                .to(Sink.ignore())
                .run(materializer);
    }

    // Fulfil the caller's promise with either the response or the failure,
    // always yielding Done so the stream keeps flowing.
    private CompletionStage<Done> tryResponseFulfilPromise(Pair<CompletionStage<Response>, ResponsePromise<Request, Response>> in) {
        try {
            return in
                    .first()
                    .thenApplyAsync(res -> {
                        fulfilPromise(in.second(), res);
                        return Done.getInstance();
                    })
                    .exceptionally(e -> {
                        fulfilPromise(in.second(), e);
                        return Done.getInstance();
                    });
        } catch (Exception e) {
            fulfilPromise(in.second(), e);
        }
        return CompletableFuture.completedFuture(Done.getInstance());
    }

Finally, I use a per-request stream to add to the queue, collect the response promises, and mapAsync the response promises into their relevant output. This leaves me with the method signature I required for the Journal.
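In outline, that per-request side looked something like the sketch below (requests is an assumed List<Request>, and ResponsePromise with its toCompletionStage conversion stand in for my own types, so treat this as illustrative rather than working code):

// Hypothetical sketch: a short-lived stream per Journal call offers each
// request, paired with its promise, to the singleton queue, then resolves
// the promises via mapAsync to recover the Journal's expected return type.
CompletionStage<List<Response>> responses =
    Source.from(requests)
        .map(request -> {
            ResponsePromise<Request, Response> promise = new ResponsePromise<>(request);
            queue.offer(Pair.create(request, promise));
            return promise;
        })
        .mapAsync(1, ResponsePromise::toCompletionStage)
        .runWith(Sink.seq(), materializer);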

Would I recommend others try this approach? Definitely not.

I originally took on this task because we use Dynamo for database access, and it throws exceptions if you go beyond your provisioned capacity. During cluster startup we would frequently hammer the database and immediately exceed whatever limit we set (normal DB usage is 5 reads/5 writes per second, but cluster startup easily surpassed 1,500 reads per second). What I wanted to do was, in essence, slow down cluster shard startup.
However, shortly after switching all my database reads and writes to use the queue like this, I realised that a surprising number of reads are performed during the write phase, and that it became increasingly difficult to maintain such complex and generalised code. Perhaps the equivalent Scala syntax is more manageable, but I was very tempted to go back to allowing the DB failures to occur during startup and to put in something more basic by way of custom throttling.

Thanks kindly,
Daniel Stoner

