"Blocking operation on a IO thread" problem in 0.23 - how to use kafka messaging in Quarkus properly nowadays?


Sergey Bezrukov

Oct 6, 2019, 4:44:12 PM
to Quarkus Development mailing list
Hi,

Our typical scenario is to read messages from Kafka, process each one synchronously (which usually means committing some data to the database), and then manually commit the offset. That way, if the offset was committed, we can be sure the message was processed and no data was lost. Like this:


@Inject
SomeService someService; // has a transactional method and an injected EntityManager

@Incoming("some-topic")
public CompletionStage<Void> processMessage(ReceivedKafkaMessage<String, byte[]> input) {
    someService.processData(input.getPayload()); // process and save data to DB
    return input.ack(); // manually commit the offset once processing is done
}


But after upgrading Quarkus to 0.23 we found that
io.quarkus.hibernate.orm.runtime.entitymanager.TransactionScopedEntityManager now contains a checkBlocking() method, which leads to the following exception in our code:


You have attempted to perform a blocking operation on a IO thread. This is not allowed, as blocking the IO thread will cause major performance issues with your application. If you want to perform blocking EntityManager operations make sure you are doing it from a worker thread.: java.lang.IllegalStateException: You have attempted to perform a blocking operation on a IO thread. This is not allowed, as blocking the IO thread will cause major performance issues with your application. If you want to perform blocking EntityManager operations make sure you are doing it from a worker thread.



I also found that the algorithm that detects whether the current thread is an IO thread was changed in master after 0.23. So, basically, I have 2 questions:

1. Will the thread that handles @Incoming in our code still be detected as an "IOThread" once the algorithm change is released?
2. If so, how should we handle messages from Kafka if we really need "synchronous" processing, from receiving the message, through the JPA operations, to the acknowledgment?

Thanks
--
Sergey

clement escoffier

Oct 7, 2019, 3:09:12 AM
to sergey....@gmail.com, Quarkus Development mailing list
Hello,


To answer your questions:

1. - yes, the Kafka connector uses the same event loops as the HTTP layer, so they are IO threads
2. - for the time being (because I’m working on support for “blocking” in reactive messaging itself) you need to delegate to another thread, most probably to a single-thread executor so you preserve the ordering. What happens in the executor can be synchronous and blocking.
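
A minimal sketch of that delegation, using only the JDK so it can run outside Quarkus. The class name OrderedBlockingConsumer, the onMessage method and the blockingSave placeholder are hypothetical and not part of any Quarkus or SmallRye API; in a real application onMessage would be the @Incoming method and the Runnable would be the message acknowledgment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a bean with an @Incoming method; no Quarkus types are used.
class OrderedBlockingConsumer {

    // A single worker thread runs tasks one at a time, in submission order.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    // Records what was "saved" so the ordering can be inspected.
    private final List<String> saved = new CopyOnWriteArrayList<>();

    // Meant to be called from the IO thread: returns immediately, and the returned
    // stage completes once the blocking work and the ack have run on the worker.
    CompletionStage<Void> onMessage(String payload, Runnable ack) {
        return CompletableFuture.runAsync(() -> {
            blockingSave(payload); // e.g. a transactional JPA call
            ack.run();             // acknowledge only after the save has finished
        }, worker);
    }

    // Placeholder for the blocking database write.
    private void blockingSave(String payload) {
        try {
            Thread.sleep(10); // simulate blocking I/O
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        saved.add(payload);
    }

    // Snapshot of the payloads saved so far, in processing order.
    List<String> saved() {
        return new ArrayList<>(saved);
    }

    // Lets already submitted tasks finish, then stops the worker thread.
    void shutdown() {
        worker.shutdown();
    }
}
```

Because the executor has exactly one thread, messages are saved and acknowledged in the order they were handed over, at the cost of processing only one message at a time.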

Clement


Sergey Bezrukov

Oct 7, 2019, 3:20:13 AM
to clement escoffier, Quarkus Development mailing list
Ok, thank you for the explanation.

--
Sergey Bezrukov
Cell: +7(903)790-93-34
Skype: sergey.bezrukov

Luca Masini

Oct 7, 2019, 3:33:34 AM
to sergey....@gmail.com, clement escoffier, Quarkus Development mailing list
Well I think we have a problem in streams, in my Incoming:

@Incoming("ordini")
public void updateDevices(String deviceKeyJson) {
}
I can notice this thread running the incoming message:

2019-10-07 09:05:20,375 INFO [xxxxxx.StreamResource] (vert.x-eventloop-thread-1) ********************************** SSE sent to all deviceKey: {"property1":"999910763C000003071019090519","property1":486,"property1":"D","property1":"MONITOR_VASSOIO"}

Why doesn't the framework take care of this?



--
****************************************
http://www.lucamasini.net
http://twitter.com/lmasini
http://www.linkedin.com/pub/luca-masini/7/10/2b9
****************************************

Stuart Douglas

Oct 8, 2019, 11:30:34 PM
to Luca Masini, sergey....@gmail.com, clement escoffier, Quarkus Development mailing list
I think we need some way for reactive messaging to easily delegate to a worker thread, with both a global switch and a method-level annotation to control which thread it runs on.

Running directly in the IO thread gives maximum performance, but it does not help much if you want to do blocking operations such as using JPA to write to the DB. We should make it easy to select the appropriate model.



Stephane Epardaud

Oct 9, 2019, 3:51:08 AM
to Stuart Douglas, Luca Masini, sergey....@gmail.com, clement escoffier, Quarkus Development mailing list
Agreed.



--
Stéphane Épardaud

Ken Finnigan

Oct 9, 2019, 8:21:14 AM
to stephane...@gmail.com, Stuart Douglas, Luca Masini, sergey....@gmail.com, clement escoffier, Quarkus Development mailing list
I think Clement, and the spec, had been discussing something like @Blocking to indicate that the operation is a blocking one

David Lloyd

Oct 9, 2019, 9:09:44 AM
to k...@kenfinnigan.me, Stephane Epardaud, Stuart Douglas, Luca Masini, sergey....@gmail.com, clement escoffier, Quarkus Development mailing list
Hopefully we could also infer whether an operation is blocking based
on what it injects/uses? Which implies that something like a
`@Blocking` would need a boolean parameter to allow overriding the
heuristic.



--
- DML

Bruno Baptista

Jun 8, 2020, 12:57:57 PM
to clement....@gmail.com, sergey....@gmail.com, Quarkus Development mailing list

Hi Clement,

Is the "@Blocking" operation now supported?

Where can I track that work?

Cheers

Ibrahim abed al aziz

Jun 8, 2020, 1:25:23 PM
to brun...@gmail.com, clement escoffier, sergey....@gmail.com, Quarkus Development mailing list
It is supported in Quarkus 1.5. You need to add the @Blocking annotation to your method.
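
A rough sketch of what that could look like for the original example, assuming the SmallRye annotation io.smallrye.reactive.messaging.annotations.Blocking and the javax namespace in use at the time. SomeService and the "some-topic" channel are taken from the first message; the class name SomeTopicConsumer and the processData(byte[]) signature are assumptions. The fragment is meant to live inside a Quarkus project, not to run on its own.

```java
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.reactive.messaging.annotations.Blocking;

@ApplicationScoped
public class SomeTopicConsumer { // hypothetical class name

    @Inject
    SomeService someService; // the transactional service from the original post

    @Incoming("some-topic")
    @Blocking // run this method on a worker thread instead of the IO thread
    public void process(byte[] payload) {
        someService.processData(payload); // blocking JPA work happens here
    }
}
```

This variant receives the payload rather than the message wrapper, so the acknowledgment is left to the framework instead of being done by hand.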

clement escoffier

Jun 8, 2020, 2:32:44 PM
to Ibrahim abed al aziz, brun...@gmail.com, sergey....@gmail.com, Quarkus Development mailing list

Željko Trogrlić

Jun 9, 2020, 3:49:25 AM
to Quarkus Development mailing list
Note that if you want to use Message, like in the original example, using @Blocking is not that straightforward: https://github.com/smallrye/smallrye-reactive-messaging/issues/588



Bruno Baptista

Jun 9, 2020, 10:55:17 AM
to clement escoffier, Ibrahim abed al aziz, sergey....@gmail.com, Quarkus Development mailing list

Thanks very much Clement!

The config is a bit thin. What will be the default config when you use the stock @Blocking?

How many concurrent requests will be handled simultaneously, by default?

Cheers

clement escoffier

Jun 9, 2020, 1:26:28 PM
to Bruno Baptista, Ibrahim abed al aziz, sergey....@gmail.com, Quarkus Development mailing list
Hello,

In Quarkus, the default pool can handle 20 tasks simultaneously.
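
To illustrate what a bound of 20 means in practice, here is a small JDK-only sketch. It uses a plain fixed thread pool as a stand-in, not the actual Quarkus worker pool, and the class and method names (WorkerPoolBound, maxConcurrency) are hypothetical. It submits more blocking tasks than there are threads and reports the highest number seen running at once; the rest wait in the queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper; a fixed pool stands in for a worker pool of a given size.
class WorkerPoolBound {

    // Runs `tasks` blocking tasks on a pool of `poolSize` threads and returns
    // the highest number of tasks observed running at the same time.
    static int maxConcurrency(int poolSize, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < tasks; i++) {
                futures.add(CompletableFuture.runAsync(() -> {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max); // remember the highest count
                    try {
                        Thread.sleep(20); // simulate a blocking operation
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                    }
                }, pool));
            }
            // Wait for every task; tasks beyond poolSize queue until a thread frees up.
            for (CompletableFuture<Void> f : futures) {
                f.join();
            }
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }
}
```

Calling WorkerPoolBound.maxConcurrency(20, 100) never returns more than 20, since only 20 threads exist to run the tasks.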

Clement

Emmanuel Bernard

Jun 10, 2020, 8:16:35 AM
to clement escoffier, Bruno Baptista, Ibrahim abed al aziz, sergey....@gmail.com, Quarkus Development mailing list
I think Stuart discussed moving it up to 200 (with scale-down capability), as 20 was a bit narrow in some situations.

clement escoffier

Jun 10, 2020, 8:24:43 AM
to Emmanuel Bernard, Bruno Baptista, Ibrahim abed al aziz, sergey....@gmail.com, Quarkus Development mailing list
Hello,

I still see this:

/**
 * The size of the worker thread pool.
 */
@ConfigItem(defaultValue = "20")
public int workerPoolSize;

But it can be configured.

Clement

Paul Carter-Brown

Jun 10, 2020, 8:41:17 AM
to clement....@gmail.com, Emmanuel Bernard, Bruno Baptista, Ibrahim abed al aziz, sergey....@gmail.com, Quarkus Development mailing list
The change to a default of 200 was made to the executor pool, not the worker pool. The executor pool is the one that the Vert.x event loop threads hand work over to for processing incoming server requests. IIRC the worker pool is for async, scheduled and other background tasks.
 
Paul 



Stuart Douglas

Jun 10, 2020, 7:43:22 PM
to clement escoffier, Emmanuel Bernard, Bruno Baptista, Ibrahim abed al aziz, sergey....@gmail.com, Quarkus Development mailing list
This is still a problem that we need to resolve, in that we have not yet merged our blocking executor (the main one provided by ExecutorRecorder), with the vert.x blocking task pool.

Almost everything uses our main one, however some vert.x stuff might use the vert.x blocking one. At the moment this is hard coded in Vert.x, so we can't really change it, so we need to discuss with the vert.x team how to go about this.

Stuart

clement escoffier

Jun 11, 2020, 2:05:32 AM
to Stuart Douglas, Emmanuel Bernard, Bruno Baptista, Ibrahim abed al aziz, sergey....@gmail.com, Quarkus Development mailing list
I agree. I need to check if it's still on the Vert.x roadmap. It may have been removed.

Clement