Source queueSource = Source.<String>queue(100, OverflowStrategy.backpressure()).mapMaterializedValue(mat -> {
    // Bridge RabbitMQ deliveries into the stream through the materialized queue.
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                throws IOException {
            String message = new String(body, "UTF-8");
            CompletionStage<QueueOfferResult> offerResult = mat.offer(message);
            offerResult.whenComplete((result, e) -> {
                try {
                    // result is null if the offer itself failed (e != null), so that case is nacked too.
                    if (result == QueueOfferResult.enqueued()) {
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    } else {
                        // Not enqueued: ask the broker to requeue the message.
                        channel.basicNack(envelope.getDeliveryTag(), false, true);
                    }
                } catch (Exception ex) {
                    // Wrap the ack/nack failure (ex), not the offer failure (e), which may be null here.
                    throw new RuntimeException(ex);
                }
            });
        }
    };
    channel.basicConsume("queuename", consumer);
    return mat;
});
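The ack/nack decision inside whenComplete above can be tried in isolation. The following is only a JDK-only sketch of that decision, not Akka or RabbitMQ code: AckDecisionSketch, OfferResult and Decision are hypothetical stand-ins invented for illustration. It assumes the same policy as the snippet, namely ack only when the offer reports "enqueued" and nack with requeue in every other case, including a failed offer.

```java
import java.util.concurrent.CompletionStage;

public class AckDecisionSketch {

    /** Hypothetical stand-in for the outcome of offering to the queue. */
    public enum OfferResult { ENQUEUED, DROPPED }

    /** Hypothetical stand-in for what would be sent back to the broker. */
    public enum Decision { ACK, NACK_REQUEUE }

    /**
     * Maps the outcome of an offer to a broker decision.
     * When the offer stage fails, result is null and error is non-null,
     * so a failed offer ends up as NACK_REQUEUE.
     */
    public static CompletionStage<Decision> decide(CompletionStage<OfferResult> offer) {
        return offer.handle((result, error) ->
                error == null && result == OfferResult.ENQUEUED
                        ? Decision.ACK
                        : Decision.NACK_REQUEUE);
    }
}
```

Using handle rather than whenComplete means the decision comes back as a value, so the caller can see it instead of it being lost inside a callback.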
Recently I spotted a great example of how to use the Source.queue feature in Streams to pre-materialise a flow and then pass events into it independently. The examples utilising Actors were tempting but would over-complicate my use case, which is to throttle writes to a database in a custom persistence Journal implementation.

Using Source.queue, I cannot for the life of me work out how to get the SourceQueue out of this flow, so that I can then 'offer' to it, within the Java DSL.

The Scala example was:

val queue = Source.queue(bufferSize, overflowStrategy)
  .filter(!_.raining)
  .runForeach(println)

queue.offer(Weather("02139", 32.0, true))
My Java line-for-line conversion is:

CompletionStage<Done> clearlyNotAQueue =
    Source.<Integer>queue(5, OverflowStrategy.dropHead())
        .filter(msg -> msg.equals(5))
        .runForeach(System.out::println, materializer);

However, this returns a CompletionStage<Done> representing the runForeach's completion, with no result. A pointer in the right direction would be hugely appreciated.
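For what it's worth, the shape being asked for here (wire the pipeline up first, keep a handle, offer elements into it later) can be tried out without Akka on the classpath. The sketch below is a JDK-only analogy built on java.util.concurrent.SubmissionPublisher, not the Akka API. QueueHandleSketch and FilteringSubscriber are names made up for illustration, and the filter simply mirrors msg.equals(5) from the snippet above. In Akka the handle is the stream's materialized value, and runForeach hands back its own CompletionStage<Done> instead of the queue.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class QueueHandleSketch {

    /** Hypothetical sink: keeps elements equal to 5 and signals completion. */
    static final class FilteringSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> seen = new CopyOnWriteArrayList<>();
        final CompletableFuture<Void> done = new CompletableFuture<>();

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            // Unbounded demand: this sketch does not model backpressure.
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Integer item) {
            if (item.equals(5)) {
                seen.add(item);
            }
        }

        @Override
        public void onError(Throwable throwable) {
            done.completeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            done.complete(null);
        }
    }

    /** Wires the pipeline first, then offers elements through the retained handle. */
    public static List<Integer> run() {
        FilteringSubscriber subscriber = new FilteringSubscriber();
        // The publisher plays the role of the queue handle kept by the caller.
        try (SubmissionPublisher<Integer> queue = new SubmissionPublisher<>()) {
            queue.subscribe(subscriber);
            // A null drop handler means: drop, do not retry, if the buffer is full.
            queue.offer(4, null);
            queue.offer(5, null);
            queue.offer(5, null);
        } // close() completes the subscriber once buffered items are delivered
        subscriber.done.orTimeout(5, TimeUnit.SECONDS).join();
        return List.copyOf(subscriber.seen);
    }
}
```

The point of the sketch is only that the caller holds on to the object it offers into, while completion is observed separately through the subscriber.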
SourceQueue<Integer> queue = Sink.<Integer>foreach(System.out::println).runWith(
SourceQueueWithComplete<Pair<Request, ResponsePromise<Request, Response>>> queue = Source
    .<Pair<Request, ResponsePromise<Request, Response>>>queue(bufferSize, OverflowStrategy.dropNew())
    .via(flow)
    .run(materializer)
protected SourceQueueWithComplete<Pair<Request, ResponsePromise<Request, Response>>> createStream(
        int bufferSize,
        int actionsPerSecond,
        Flow<Pair<Request, ResponsePromise<Request, Response>>, Pair<CompletionStage<Response>, ResponsePromise<Request, Response>>, NotUsed> flow
) {
    return Source
        .<Pair<Request, ResponsePromise<Request, Response>>>queue(bufferSize, OverflowStrategy.dropNew())
        .via(flow)
        .withAttributes(ActorAttributes.withSupervisionStrategy(Supervision.getResumingDecider()))
        .map(this::tryResponseFulfilPromise)
        .throttle(actionsPerSecond, Duration.apply(1, TimeUnit.SECONDS), actionsPerSecond, ThrottleMode.shaping())
        .watchTermination(watchTerminationAndLog(this.getClass()))
        .to(Sink.ignore())
        .run(materializer);
}
private CompletionStage<Done> tryResponseFulfilPromise(Pair<CompletionStage<Response>, ResponsePromise<Request, Response>> in) {
    try {
        return in
            .first()
            .thenApplyAsync(res -> {
                fulfilPromise(in.second(), res);
                return Done.getInstance();
            })
            .exceptionally(e -> {
                fulfilPromise(in.second(), e);
                return Done.getInstance();
            });
    } catch (Exception e) {
        fulfilPromise(in.second(), e);
    }
    return CompletableFuture.completedFuture(Done.getInstance());
}