How to "Vertxify" a function that consumes messages from SQS


id...@bluerbn.com

Dec 7, 2017, 7:24:19 AM
to vert.x
I have the following function that consumes messages from SQS:

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(address);
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();

for (Message message : messages) {
    log.debug("Receive event: address {} MessageId {} Body {}",
            address, message.getMessageId(), message.getBody());

    for (Entry<String, String> entry : message.getAttributes().entrySet()) {
        System.out.println(" Attribute");
        System.out.println(" Name: " + entry.getKey());
        System.out.println(" Value: " + entry.getValue());
    }
}


Two things are missing:
1. We need to wrap this in an infinite loop that keeps polling for messages.
2. After consuming a message, I want it to be handed off to the main event loop thread so that we keep a non-blocking flow.

Any ideas?

Thank you.

Julien Viet

Dec 7, 2017, 8:24:47 AM
to vert.x
I think you can use the event bus and make an SQS bridge that publishes the messages on the bus.


id...@bluerbn.com

Dec 7, 2017, 8:40:57 AM
to vert.x
One of the reasons we dropped the Vert.x event bus bridge was that we wanted our messages to be persistent, so we moved to SQS. If I go back and wrap SQS with the event bus, I lose the advantage of SQS, because messages can still queue up on the event bus side when my consumer lags.

Isn't there another way to consume the messages without using the event bus, and just hand them off to the event loop thread?

Alexander Lehmann

Dec 7, 2017, 8:45:01 AM
to vert.x
I think you can call Vert.x code from another thread with runOnContext. This executes your Vert.x code on the event loop, and you can run your blocking code in a thread that is not associated with Vert.x.

id...@bluerbn.com

Dec 7, 2017, 9:00:17 AM
to vert.x
Could you show me a snippet using my code?
It would be highly appreciated.
Thank you.

id...@bluerbn.com

Dec 7, 2017, 12:33:56 PM
to vert.x
I am sure someone has done this with another external queue. There is no point in taking messages from an external queue and using the event bus to forward them inside Vert.x, since the main point was to persist them outside. Wrapping the external queue consumer with the event bus would put me back where I started.

I also need to understand how I can loop around that consumer inside Vert.x, since the messages are retrieved by polling rather than pushed asynchronously.

id...@bluerbn.com

Dec 7, 2017, 12:46:16 PM
to vert.x
OK, I understand how to take the message and process it on the event loop context using:
final Context currentContext = vertx.getOrCreateContext();
currentContext.runOnContext(v -> {..});

I only need to understand how I can create an infinite loop on a worker thread that keeps running the SQS polling method without hurting anything else.

Alexander Lehmann

Dec 8, 2017, 7:09:36 AM
to vert.x
Worker threads in Vert.x are not intended for operations that run indefinitely. I would use a normal JVM thread (a Runnable) for that and start it in the main method before the Vert.x stuff is initialized.

jklingsporn

Dec 8, 2017, 9:58:43 AM
to vert.x
You could do vertx.setPeriodic(delayInMilliseconds, timerId -> { /* poll messages from the queue */ })

id...@bluerbn.com

Dec 8, 2017, 10:50:06 AM
to vert.x
But then I might lose critical time, or fire lots of periodic triggers while previous triggers are still busy consuming messages.
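
One way to avoid piling up triggers is to re-arm a one-shot timer only after the current poll has finished, instead of using a fixed-rate periodic timer. The sketch below illustrates the idea with plain JDK classes so that it runs on its own; RearmingTimerSketch, scheduleNextPoll and runDemo are hypothetical names, the sleep stands in for a slow poll, and in Vert.x the same pattern would presumably be built on vertx.setTimer called again at the end of each poll.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RearmingTimerSketch {

    /** Schedules one poll; the next poll is scheduled only after this one returns. */
    static void scheduleNextPoll(ScheduledExecutorService timer, long delayMs,
                                 AtomicInteger remaining, Runnable pollOnce) {
        timer.schedule(() -> {
            pollOnce.run();                          // may take longer than delayMs
            if (remaining.decrementAndGet() > 0) {   // re-arm only once the poll is done
                scheduleNextPoll(timer, delayMs, remaining, pollOnce);
            }
        }, delayMs, TimeUnit.MILLISECONDS);
    }

    /** Runs the given number of slow polls and returns the highest number seen running at once. */
    static int runDemo(int polls) {
        // Several threads are available, so overlapping polls would be possible
        // if the next poll were scheduled before the previous one finished.
        ScheduledExecutorService timer = Executors.newScheduledThreadPool(4);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(polls);

        Runnable slowPoll = () -> {
            int now = inFlight.incrementAndGet();
            maxInFlight.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(30);                    // stand-in for a poll slower than the delay
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            inFlight.decrementAndGet();
            done.countDown();
        };

        scheduleNextPoll(timer, 10, new AtomicInteger(polls), slowPoll);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        timer.shutdown();
        return maxInFlight.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent polls: " + runDemo(5)); // prints "max concurrent polls: 1"
    }
}
```

The trade-off is that the gap between polls becomes "poll duration plus delay" rather than a fixed period, which addresses the overlap concern but not the latency one.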

id...@bluerbn.com

Dec 8, 2017, 10:50:43 AM
to vert.x
A normal JVM thread? Won't a Java ExecutorService do the job?

id...@bluerbn.com

Dec 8, 2017, 11:05:13 AM
to vert.x
What about using WorkerExecutor executor = vertx.createSharedWorkerExecutor("event-consumer-loop");
Won't that do the trick?

Alexander Lehmann

Dec 8, 2017, 3:03:50 PM
to vert.x
A shared WorkerExecutor still has a maximum running time for the blocking operation. If the polling operation blocks for an unknown time, that is not really the right thing for a polling loop.

You can probably use any ExecutorService that allows blocking threads; you do not have to use a plain thread.

Idan Fridman

Dec 8, 2017, 3:28:47 PM
to ve...@googlegroups.com
I hear you. OK, I'll try using it. But won't having an infinite loop inside the Vert.x container, using an executor service, affect it in some way?

Alexander Lehmann

Dec 9, 2017, 7:48:54 AM
to vert.x
I am not really sure whether this affects the thread. If it has blocking detection, it will print the warning occasionally; if not, I guess the thread will be stopped when the executor is stopped.

Idan Fridman

Dec 12, 2017, 4:10:39 AM
to ve...@googlegroups.com
What is your opinion about resource consumption?
Putting an infinite loop in a thread could kill the CPU.

Alexander Lehmann

Dec 12, 2017, 7:53:16 AM
to vert.x
I assume that the call to sqs.receiveMessage() blocks until a message is available, so it will not use the CPU most of the time.

The infinite loop will take a few messages when they are available, hand them off to Vert.x, and continue with the next receive operation.
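
The loop described above could look roughly like the following sketch. To keep it runnable without the AWS SDK or Vert.x on the classpath, it uses stand-ins that are assumptions rather than anything from this thread: a BlockingQueue plays the SQS queue, the receive supplier plays sqs.receiveMessage(request).getMessages(), and a single-thread Executor plays the event loop (in Vert.x the handoff would be context.runOnContext(v -> ...)). SqsPollingSketch, startPolling, receiveFrom and runDemo are hypothetical names. Note that the real receiveMessage call only waits for messages when long polling is enabled, for example with withWaitTimeSeconds on the request (up to 20 seconds); with short polling it returns immediately and the loop would spin.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

class SqsPollingSketch {

    /**
     * Runs the blocking receive call in a loop on its own thread and hands every
     * message to eventLoop, so the handler never runs on the polling thread.
     * A real loop would also need error handling around receive.get().
     */
    static ExecutorService startPolling(Supplier<List<String>> receive,
                                        Executor eventLoop,
                                        Consumer<String> handler) {
        ExecutorService poller = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "sqs-poller");
            t.setDaemon(true);
            return t;
        });
        poller.execute(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                List<String> batch = receive.get();   // blocks while the queue is empty
                for (String body : batch) {
                    eventLoop.execute(() -> handler.accept(body));
                }
            }
        });
        return poller;
    }

    /** Stand-in for a long-polling receive: waits up to 200 ms for one message. */
    static List<String> receiveFrom(BlockingQueue<String> queue) {
        try {
            String body = queue.poll(200, TimeUnit.MILLISECONDS);
            return body == null ? List.of() : List.of(body);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();       // keep the flag so the loop exits
            return List.of();
        }
    }

    /** Feeds three messages through the loop and returns them in handling order. */
    static List<String> runDemo() {
        BlockingQueue<String> fakeQueue = new LinkedBlockingQueue<>(List.of("m1", "m2", "m3"));
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        List<String> handled = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);

        ExecutorService poller = startPolling(() -> receiveFrom(fakeQueue), eventLoop, body -> {
            handled.add(body);
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            poller.shutdownNow();                     // interrupts the polling loop
            eventLoop.shutdown();
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());                // prints [m1, m2, m3]
    }
}
```

Because the polling thread spends nearly all of its time parked inside the blocking receive, it costs a thread but very little CPU, which matches the reasoning in the reply above.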