Blocking event loop during database operation

E. Ulrich Kriegel

Jan 30, 2017, 6:08:04 AM
to vert.x
Hi there,
we have to read a bunch of records from an input database, process them, and write the results to an output database, which is empty and has no indices at the time of writing. We use the reactive paradigm.
However, the processing uses Java crypto libraries, so the event loop could be blocked. Therefore, in our first attempt, these operations were enclosed in a vertx.executeBlockingObservable, but we got a lot of event-loop-blocked warnings. After removing the vertx.executeBlockingObservable, the number of warnings in a small use case with 20,000 input records was reduced. We tried using onBackpressureBuffer, without success. The only solution we found is to place a delay of 10 ms in the pipeline. With that we were able to process 2 million records without getting an event-loop-blocked warning.

Is there a more adequate solution? Placing a delay is IMHO a little bit suspect.
Here comes our code:
JDBCClient inJdbcClient = JDBCClient.createNonShared(vertx, inDbConfig);
// abstract from database specific peculiarities
inDbHelper = new DatabaseHelper4Postgres();
// drop indices on output database
Observable.using(DisposableVertxSQLConnection::new, f -> f.create(jdbcClient), f -> f.dispose())
    .flatMap(conn -> outDbHelper.dropIndices(conn))
    .flatMap(dummy -> Observable.using(DisposableVertxSQLConnection::new, f -> f.create(inJdbcClient), f -> f.dispose()))
    // read records from input database
    .flatMap(inConn -> inDbHelper.readDatabase(inConn, "inTable"))
    .map(rs -> rs.getResults())
    .delay(10, TimeUnit.MILLISECONDS)
    .flatMapIterable(li -> li)
    // transform input records
    .flatMap(input -> inputToPreparedInsertQueryObservable(input))
    // collect chunks to use batch database operations
    .buffer(1000)
    .flatMap(params -> {
        Observable<Integer> resultObservable = Observable.using(DisposableVertxSQLConnection::new, f -> f.create(jdbcClient), f -> f.dispose())
            .flatMap(conn -> {
                Observable<List<Integer>> li = outDbHelper.batchInsertInput(conn, (List<JsonArray>)(Object)params);
                return li;
            })
            .flatMapIterable(i -> i);
        // merge batch output with input record ids to generate error messages
        return Observable.zip(resultObservable, Observable.from(params), (first, second) -> Observable.just(new Tuple(first, ((JsonArray) second).getString(0))))
            .flatMap(x -> x)
            .collect(() -> new ArrayList<Tuple>(), (data, item) -> data.add(item));
    })
    .map(li -> {
        // count batch content
        Observable<Integer> totalCountObservable = Observable.from(li).count();
        // filter not written records
        Observable<JsonArray> errorListObservable = Observable.from(li)
            .filter(tuple -> tuple.getFirst() == 0)
            .collect(() -> new JsonArray(), (data, item) -> data.add(item.getSecond()));
        return Observable.zip(totalCountObservable, errorListObservable, (first, second) -> {
            int s = second.size();
            if (s > 0) {
                logger.warn(String.format("Not inserted ids: %s", second.toString()));
            }
            return first - s;
        });
    })
    .flatMap(l -> l)
    .collect(() -> {
        long[] counter = new long[1];
        counter[0] = 0;
        return counter;
    }, (counter, item) -> {
        counter[0] += item;
    })
    // finally create indices on output database
    .map(counter -> {
        Observable<SQLConnection> indexObservable = jdbcClient.getConnectionObservable()
            .flatMap(conn -> outDbHelper.createIndices(conn));
        return Observable.zip(Observable.just(counter[0]), indexObservable, (count, conn) -> {
            conn.close();
            return count;
        });
    })
    .flatMap(i -> i)
    .subscribe(insertedRecords->{
Thanks in advance
--Ulrich

Thomas SEGISMONT

Jan 30, 2017, 12:42:07 PM
to ve...@googlegroups.com
The delay operator, as you used it, operates on the computation scheduler. That's why you no longer see the warnings. You shouldn't block the computation scheduler threads either BTW.

Can you simplify your code sample to point to the part where you use blocking/heavy computational APIs? And your attempt to solve it with executeBlocking?
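
The thread hop described above can be illustrated without RxJava. The following is a minimal sketch in plain Java, under the assumption that a ScheduledExecutorService is an acceptable stand-in for the RxJava computation scheduler; the class name DelayHop and the thread name are hypothetical. Work scheduled after a delay runs on the scheduler's thread rather than on the caller's thread, which is presumably why the blocked-thread warnings for the event loop disappear once delay is in the pipeline.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class DelayHop {

    /** Returns the name of the thread that runs the work scheduled after the delay. */
    static String threadAfterDelay(long delayMillis) {
        // Hypothetical stand-in for the computation scheduler: one named daemon thread.
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "stand-in-computation-thread");
            t.setDaemon(true);
            return t;
        });
        try {
            CompletableFuture<String> seenOn = new CompletableFuture<>();
            // The delayed task records which thread it is executed on.
            scheduler.schedule(() -> {
                seenOn.complete(Thread.currentThread().getName());
            }, delayMillis, TimeUnit.MILLISECONDS);
            return seenOn.join();
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller thread:      " + Thread.currentThread().getName());
        System.out.println("thread after delay: " + threadAfterDelay(10));
    }
}
```

The heavy work is not gone in this arrangement; it has only moved to a thread that nobody is watching.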

--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+unsubscribe@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/833c7afd-ade6-44d6-a6a5-8784724c6d72%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

E. Ulrich Kriegel

Jan 31, 2017, 3:46:54 AM
to vert.x
Hi Thomas, here comes the code:

JDBCClient inJdbcClient = JDBCClient.createNonShared(vertx, inDbConfig);
// abstract from database specific peculiarities
inDbHelper = new DatabaseHelper4Postgres(testFlag);
// drop indices on output database
Observable.using(DisposableVertxSQLConnection::new, f -> f.create(jdbcClient), f -> f.dispose())
    .flatMap(dummy -> Observable.using(DisposableVertxSQLConnection::new, f -> f.create(inJdbcClient), f -> f.dispose()))
    // read records from input database
    .flatMap(inConn -> inDbHelper.readPersonFromPlainDatabase(inConn, inDbConfig.getString("inTable")))
    .map(rs -> rs.getResults())
    .flatMapIterable(li -> li)
    // transform input records
    .flatMap(input -> vertx.executeBlockingObservable(_future -> {  // EXECUTEBLOCKING
        _future.complete(inputPreparedInsertQuery(input));
    }))
    // collect chunks to use batch database operations
    .buffer(Constants.MAX_BATCH_SIZE)
    .flatMap(params ->
        Observable.using(DisposableVertxSQLConnection::new, f -> f.create(jdbcClient), f -> f.dispose())
            .flatMap(conn -> outDbHelper.batchInsert(conn, (List<JsonArray>)(Object)params)))
    .map(li -> li.size())
    .collect(() -> {
        long[] counter = new long[1];
        counter[0] = 0;
        return counter;
    }, (counter, item) -> {
        counter[0] += item;
    })
    .subscribe(insertedRecords->{
....



Thomas SEGISMONT

Feb 1, 2017, 4:23:54 PM
to ve...@googlegroups.com
Your usage of executeBlocking looks good. Do you have any stack trace which could help to determine where the thread is blocked?
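
For readers unfamiliar with the pattern, the idea behind executeBlocking can be sketched in plain Java: the heavy call runs on a worker pool and only the result is handed back to the caller. This is an illustration under stated assumptions, not the Vert.x implementation. The names OffloadSketch, expensiveTransform and transformAsync and the pool size are hypothetical, and SHA-256 hashing merely stands in for the crypto work mentioned in the original post.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class OffloadSketch {

    /** Stand-in for the CPU-heavy crypto step: hex-encoded SHA-256 of the input. */
    static String expensiveTransform(String input) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 must be available on every Java platform implementation.
            throw new IllegalStateException(e);
        }
    }

    /** Runs the transform on the worker pool so the calling thread is not occupied by it. */
    static CompletableFuture<String> transformAsync(String input, ExecutorService workers) {
        return CompletableFuture.supplyAsync(() -> expensiveTransform(input), workers);
    }

    public static void main(String[] args) {
        ExecutorService workers = Executors.newFixedThreadPool(4);
        try {
            // join() is used only so this demo can print; an event loop would
            // attach a callback (e.g. thenAccept) instead of waiting.
            String result = transformAsync("record-1", workers).join();
            System.out.println(result.length() + " hex characters");
        } finally {
            workers.shutdown();
        }
    }
}
```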


Ulrich Kriegel

Feb 2, 2017, 8:28:55 AM
to ve...@googlegroups.com
Hi Thomas,
On 01.02.2017 at 22:23, Thomas SEGISMONT <tsegi...@gmail.com> wrote:

> Your usage of executeBlocking looks good. Do you have any stack trace which could help to determine where the thread is blocked?

How can I do that? Vert.x only logs that the thread is blocked. Is there a hook or something else I can use?

Regards

Ulrich Kriegel




Thomas SEGISMONT

Feb 2, 2017, 10:10:53 AM
to ve...@googlegroups.com
After some time, Vert.x prints a stack trace. You can change this time to be much shorter than the default (5 seconds):

vertxOptions.setWarningExceptionTime(xxx)

The unit is nanoseconds.
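
Because the value is expected in nanoseconds, it may be safer to derive it with TimeUnit than to type the zeros by hand. A small sketch follows; the class name BlockedThreadSettings and the 500 ms threshold are illustrative assumptions, and the Vert.x call itself appears only in a comment so that the snippet runs with the JDK alone.

```java
import java.util.concurrent.TimeUnit;

final class BlockedThreadSettings {

    /** Converts a threshold given in milliseconds to the nanoseconds the setter expects. */
    static long warningExceptionTimeNanos(long millis) {
        return TimeUnit.MILLISECONDS.toNanos(millis);
    }

    public static void main(String[] args) {
        long nanos = warningExceptionTimeNanos(500);
        // With Vert.x on the classpath, the value would be passed as in the message above:
        //   vertxOptions.setWarningExceptionTime(nanos);
        System.out.println(nanos);
    }
}
```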


Jochen Mader

Feb 2, 2017, 11:10:45 AM
to ve...@googlegroups.com
Send a 'kill -3' (SIGQUIT) to the process running your application. That will cause the JVM to emit a thread dump, which should show you where the thread gets stuck.
If you have trouble reading the dumps, put them in a Gist and share the link with us.
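
Where sending a signal is inconvenient, roughly the same information can be collected from inside the JVM. The following is a minimal sketch that uses only Thread.getAllStackTraces() from the JDK; the class name ThreadDumpSketch is hypothetical, and the output format is simpler than a real JVM thread dump (no lock or monitor information).

```java
import java.util.Map;

final class ThreadDumpSketch {

    /** Builds a textual dump of all live threads and their current stack frames. */
    static String dump() {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            out.append('"').append(thread.getName()).append("\" state=")
               .append(thread.getState()).append('\n');
            for (StackTraceElement frame : entry.getValue()) {
                out.append("    at ").append(frame).append('\n');
            }
            out.append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(dump());
    }
}
```

Calling dump() from a periodic timer or a debug endpoint and looking for the entry named vert.x-eventloop-thread-0 would show which frames the event loop is executing at that moment.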



E. Ulrich Kriegel

Feb 3, 2017, 5:39:21 AM
to vert.x
I first tried 'kill -3' as proposed by Jochen, but I got three different dumps. Then I found out that my logging configuration had prevented me from getting the stack traces.

Here they are:

MLog initialization issue: slf4j found no binding or threatened to use its (dangerously silent) NOPLogger. We consider the slf4j library not found.
MLog clients using java 1.4+ standard logging.
Initializing c3p0-0.9.5.2 [built 08-December-2015 22:06:04 -0800; debug? true; trace: 10]
{"subject":"de.fraunhofer.fokus.adep.BatchMgmtVerticle","status":"success","action":"initialize"}
Succeeded in deploying verticle

Initializing c3p0 pool... com.mchange.v2.c3p0.ComboPooledDataSource [ acquireIncrement -> 3, acquireRetryAttempts -> 30, acquireRetryDelay -> 1000, autoCommitOnClose -> false, automaticTestTable -> null, breakAfterAcquireFailure -> false, checkoutTimeout -> 0, connectionCustomizerClassName -> null, connectionTesterClassName -> com.mchange.v2.c3p0.impl.DefaultConnectionTester, contextClassLoaderSource -> caller, dataSourceName -> 2xmt4d9mzc9er6ko2cn|310ecab0, debugUnreturnedConnectionStackTraces -> false, description -> null, driverClass -> org.postgresql.Driver, extensions -> {}, factoryClassLocation -> null, forceIgnoreUnresolvedTransactions -> false, forceSynchronousCheckins -> false, forceUseNamedDriverClass -> false, identityToken -> 2xmt4d9mzc9er6ko2cn|310ecab0, idleConnectionTestPeriod -> 0, initialPoolSize -> 3, jdbcUrl -> jdbc:postgresql://localhost:5432/adep-fr-new, maxAdministrativeTaskTime -> 0, maxConnectionAge -> 0, maxIdleTime -> 0, maxIdleTimeExcessConnections -> 0, maxPoolSize -> 1000, maxStatements -> 0, maxStatementsPerConnection -> 0, minPoolSize -> 3, numHelperThreads -> 3, preferredTestQuery -> null, privilegeSpawnedThreads -> false, properties -> {user=******, password=******}, propertyCycle -> 0, statementCacheNumDeferredCloseThreads -> 0, testConnectionOnCheckin -> false, testConnectionOnCheckout -> false, unreturnedConnectionTimeout -> 0, userOverrides -> {}, usesTraditionalReflectiveProxies -> false ] 

Initializing c3p0 pool... com.mchange.v2.c3p0.ComboPooledDataSource [ acquireIncrement -> 3, acquireRetryAttempts -> 30, acquireRetryDelay -> 1000, autoCommitOnClose -> false, automaticTestTable -> null, breakAfterAcquireFailure -> false, checkoutTimeout -> 0, connectionCustomizerClassName -> null, connectionTesterClassName -> com.mchange.v2.c3p0.impl.DefaultConnectionTester, contextClassLoaderSource -> caller, dataSourceName -> 2xmt4d9mzc9er6ko2cn|16e20167, debugUnreturnedConnectionStackTraces -> false, description -> null, driverClass -> org.postgresql.Driver, extensions -> {}, factoryClassLocation -> null, forceIgnoreUnresolvedTransactions -> false, forceSynchronousCheckins -> false, forceUseNamedDriverClass -> false, identityToken -> 2xmt4d9mzc9er6ko2cn|16e20167, idleConnectionTestPeriod -> 0, initialPoolSize -> 3, jdbcUrl -> jdbc:postgresql://localhost:5432/input-test-1m, maxAdministrativeTaskTime -> 0, maxConnectionAge -> 0, maxIdleTime -> 0, maxIdleTimeExcessConnections -> 0, maxPoolSize -> 15, maxStatements -> 0, maxStatementsPerConnection -> 0, minPoolSize -> 3, numHelperThreads -> 3, preferredTestQuery -> null, privilegeSpawnedThreads -> false, properties -> {user=******, password=******}, propertyCycle -> 0, statementCacheNumDeferredCloseThreads -> 0, testConnectionOnCheckin -> false, testConnectionOnCheckout -> false, unreturnedConnectionTimeout -> 0, userOverrides -> {}, usesTraditionalReflectiveProxies -> false ] 

Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 4047 ms, time limit is 2000
Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 5049 ms, time limit is 2000
io.vertx.core.VertxException: Thread blocked
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:249)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
    at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.drain(OnSubscribeFlattenIterable.java:236)
    at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.onNext(OnSubscribeFlattenIterable.java:128)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
    at io.vertx.rx.java.SingleOnSubscribeAdapter.fireNext(SingleOnSubscribeAdapter.java:53)
    at io.vertx.rx.java.ObservableFuture$1.dispatch(ObservableFuture.java:58)
    at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:32)
    at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:12)
    at io.vertx.core.impl.FutureImpl.checkCallHandler(FutureImpl.java:158)
    at io.vertx.core.impl.FutureImpl.setHandler(FutureImpl.java:100)
    at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:271)
    at io.vertx.core.impl.ContextImpl$$Lambda$38/1095017261.handle(Unknown Source)
    at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:316)
    at io.vertx.core.impl.ContextImpl$$Lambda$25/1307904972.run(Unknown Source)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:440)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
    at java.lang.Thread.run(Thread.java:745)


Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 6052 ms, time limit is 2000
io.vertx.core.VertxException: Thread blocked
    [stack trace identical to the one logged at 5049 ms]

Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 7056 ms, time limit is 2000
io.vertx.core.VertxException: Thread blocked
    [stack trace identical to the one logged at 5049 ms]

Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 8061 ms, time limit is 2000
io.vertx.core.VertxException: Thread blocked
    [stack trace identical to the one logged at 5049 ms]

Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 9060 ms, time limit is 2000
io.vertx.core.VertxException: Thread blocked
    [stack trace identical to the one logged at 5049 ms]

Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 10060 ms, time limit is 2000
io.vertx.core.VertxException: Thread blocked
    [stack trace identical to the one logged at 5049 ms]

Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 11060 ms, time limit is 2000
io.vertx.core.VertxException: Thread blocked
    [stack trace identical to the one logged at 5049 ms]


Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 12060 ms, time limit is 2000
io.vertx.core.VertxException: Thread blocked
    at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.drain(OnSubscribeFlattenIterable.java:238)
    at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.onNext(OnSubscribeFlattenIterable.java:128)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
    at io.vertx.rx.java.SingleOnSubscribeAdapter.fireNext(SingleOnSubscribeAdapter.java:53)
    at io.vertx.rx.java.ObservableFuture$1.dispatch(ObservableFuture.java:58)
    at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:32)
    at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:12)
    at io.vertx.core.impl.FutureImpl.checkCallHandler(FutureImpl.java:158)
    at io.vertx.core.impl.FutureImpl.setHandler(FutureImpl.java:100)
    at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:271)
    at io.vertx.core.impl.ContextImpl$$Lambda$38/1095017261.handle(Unknown Source)
    at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:316)
    at io.vertx.core.impl.ContextImpl$$Lambda$25/1307904972.run(Unknown Source)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:440)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
    at java.lang.Thread.run(Thread.java:745)


Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 13060 ms, time limit is 2000
io.vertx.core.VertxException: Thread blocked
    [stack trace identical to the one logged at 5049 ms]


Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 14064 ms, time limit is 2000 

io.vertx.core.VertxException: Thread blocked

at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:249)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.drain(OnSubscribeFlattenIterable.java:236)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.onNext(OnSubscribeFlattenIterable.java:128)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)

at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)

at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)

at io.vertx.rx.java.SingleOnSubscribeAdapter.fireNext(SingleOnSubscribeAdapter.java:53)

at io.vertx.rx.java.ObservableFuture$1.dispatch(ObservableFuture.java:58)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:32)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:12)

at io.vertx.core.impl.FutureImpl.checkCallHandler(FutureImpl.java:158)

at io.vertx.core.impl.FutureImpl.setHandler(FutureImpl.java:100)

at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:271)

at io.vertx.core.impl.ContextImpl$$Lambda$38/1095017261.handle(Unknown Source)

at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:316)

at io.vertx.core.impl.ContextImpl$$Lambda$25/1307904972.run(Unknown Source)

at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)

at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:440)

at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)

at java.lang.Thread.run(Thread.java:745)


Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 15065 ms, time limit is 2000 

io.vertx.core.VertxException: Thread blocked

at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:249)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.drain(OnSubscribeFlattenIterable.java:236)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.onNext(OnSubscribeFlattenIterable.java:128)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)

at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)

at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)

at io.vertx.rx.java.SingleOnSubscribeAdapter.fireNext(SingleOnSubscribeAdapter.java:53)

at io.vertx.rx.java.ObservableFuture$1.dispatch(ObservableFuture.java:58)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:32)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:12)

at io.vertx.core.impl.FutureImpl.checkCallHandler(FutureImpl.java:158)

at io.vertx.core.impl.FutureImpl.setHandler(FutureImpl.java:100)

at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:271)

at io.vertx.core.impl.ContextImpl$$Lambda$38/1095017261.handle(Unknown Source)

at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:316)

at io.vertx.core.impl.ContextImpl$$Lambda$25/1307904972.run(Unknown Source)

at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)

at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:440)

at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)

at java.lang.Thread.run(Thread.java:745)


Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 16065 ms, time limit is 2000 

io.vertx.core.VertxException: Thread blocked

at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:249)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.drain(OnSubscribeFlattenIterable.java:236)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.onNext(OnSubscribeFlattenIterable.java:128)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)

at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)

at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)

at io.vertx.rx.java.SingleOnSubscribeAdapter.fireNext(SingleOnSubscribeAdapter.java:53)

at io.vertx.rx.java.ObservableFuture$1.dispatch(ObservableFuture.java:58)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:32)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:12)

at io.vertx.core.impl.FutureImpl.checkCallHandler(FutureImpl.java:158)

at io.vertx.core.impl.FutureImpl.setHandler(FutureImpl.java:100)

at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:271)

at io.vertx.core.impl.ContextImpl$$Lambda$38/1095017261.handle(Unknown Source)

at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:316)

at io.vertx.core.impl.ContextImpl$$Lambda$25/1307904972.run(Unknown Source)

at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)

at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:440)

at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)

at java.lang.Thread.run(Thread.java:745)


Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 17065 ms, time limit is 2000 

io.vertx.core.VertxException: Thread blocked

at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:249)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.drain(OnSubscribeFlattenIterable.java:236)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.onNext(OnSubscribeFlattenIterable.java:128)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)

at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)

at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)

at io.vertx.rx.java.SingleOnSubscribeAdapter.fireNext(SingleOnSubscribeAdapter.java:53)

at io.vertx.rx.java.ObservableFuture$1.dispatch(ObservableFuture.java:58)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:32)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:12)

at io.vertx.core.impl.FutureImpl.checkCallHandler(FutureImpl.java:158)

at io.vertx.core.impl.FutureImpl.setHandler(FutureImpl.java:100)

at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:271)

at io.vertx.core.impl.ContextImpl$$Lambda$38/1095017261.handle(Unknown Source)

at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:316)

at io.vertx.core.impl.ContextImpl$$Lambda$25/1307904972.run(Unknown Source)

at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)

at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:440)

at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)

at java.lang.Thread.run(Thread.java:745)


Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 18069 ms, time limit is 2000 

io.vertx.core.VertxException: Thread blocked

at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:249)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.drain(OnSubscribeFlattenIterable.java:236)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.onNext(OnSubscribeFlattenIterable.java:128)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)

at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)

at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)

at io.vertx.rx.java.SingleOnSubscribeAdapter.fireNext(SingleOnSubscribeAdapter.java:53)

at io.vertx.rx.java.ObservableFuture$1.dispatch(ObservableFuture.java:58)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:32)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:12)

at io.vertx.core.impl.FutureImpl.checkCallHandler(FutureImpl.java:158)

at io.vertx.core.impl.FutureImpl.setHandler(FutureImpl.java:100)

at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:271)

at io.vertx.core.impl.ContextImpl$$Lambda$38/1095017261.handle(Unknown Source)

at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:316)

at io.vertx.core.impl.ContextImpl$$Lambda$25/1307904972.run(Unknown Source)

at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)

at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:440)

at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)

at java.lang.Thread.run(Thread.java:745)


Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 19070 ms, time limit is 2000 

io.vertx.core.VertxException: Thread blocked

at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:249)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.drain(OnSubscribeFlattenIterable.java:236)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.onNext(OnSubscribeFlattenIterable.java:128)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)

at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)

at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)

at io.vertx.rx.java.SingleOnSubscribeAdapter.fireNext(SingleOnSubscribeAdapter.java:53)

at io.vertx.rx.java.ObservableFuture$1.dispatch(ObservableFuture.java:58)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:32)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:12)

at io.vertx.core.impl.FutureImpl.checkCallHandler(FutureImpl.java:158)

at io.vertx.core.impl.FutureImpl.setHandler(FutureImpl.java:100)

at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:271)

at io.vertx.core.impl.ContextImpl$$Lambda$38/1095017261.handle(Unknown Source)

at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:316)

at io.vertx.core.impl.ContextImpl$$Lambda$25/1307904972.run(Unknown Source)

at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)

at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:440)

at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)

at java.lang.Thread.run(Thread.java:745)


Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 20074 ms, time limit is 2000 

io.vertx.core.VertxException: Thread blocked

at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:249)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.drain(OnSubscribeFlattenIterable.java:236)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.onNext(OnSubscribeFlattenIterable.java:128)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)

at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)

at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)

at io.vertx.rx.java.SingleOnSubscribeAdapter.fireNext(SingleOnSubscribeAdapter.java:53)

at io.vertx.rx.java.ObservableFuture$1.dispatch(ObservableFuture.java:58)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:32)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:12)

at io.vertx.core.impl.FutureImpl.checkCallHandler(FutureImpl.java:158)

at io.vertx.core.impl.FutureImpl.setHandler(FutureImpl.java:100)

at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:271)

at io.vertx.core.impl.ContextImpl$$Lambda$38/1095017261.handle(Unknown Source)

at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:316)

at io.vertx.core.impl.ContextImpl$$Lambda$25/1307904972.run(Unknown Source)

at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)

at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:440)

at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)

at java.lang.Thread.run(Thread.java:745)


Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 21077 ms, time limit is 2000 

io.vertx.core.VertxException: Thread blocked

at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:249)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.drain(OnSubscribeFlattenIterable.java:236)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.onNext(OnSubscribeFlattenIterable.java:128)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)

at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)

at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)

at io.vertx.rx.java.SingleOnSubscribeAdapter.fireNext(SingleOnSubscribeAdapter.java:53)

at io.vertx.rx.java.ObservableFuture$1.dispatch(ObservableFuture.java:58)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:32)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:12)

at io.vertx.core.impl.FutureImpl.checkCallHandler(FutureImpl.java:158)

at io.vertx.core.impl.FutureImpl.setHandler(FutureImpl.java:100)

at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:271)

at io.vertx.core.impl.ContextImpl$$Lambda$38/1095017261.handle(Unknown Source)

at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:316)

at io.vertx.core.impl.ContextImpl$$Lambda$25/1307904972.run(Unknown Source)

at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)

at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:440)

at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)

at java.lang.Thread.run(Thread.java:745)


Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 22079 ms, time limit is 2000 

io.vertx.core.VertxException: Thread blocked

at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:249)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.drain(OnSubscribeFlattenIterable.java:236)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.onNext(OnSubscribeFlattenIterable.java:128)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)

at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)

at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)

at io.vertx.rx.java.SingleOnSubscribeAdapter.fireNext(SingleOnSubscribeAdapter.java:53)

at io.vertx.rx.java.ObservableFuture$1.dispatch(ObservableFuture.java:58)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:32)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:12)

at io.vertx.core.impl.FutureImpl.checkCallHandler(FutureImpl.java:158)

at io.vertx.core.impl.FutureImpl.setHandler(FutureImpl.java:100)

at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:271)

at io.vertx.core.impl.ContextImpl$$Lambda$38/1095017261.handle(Unknown Source)

at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:316)

at io.vertx.core.impl.ContextImpl$$Lambda$25/1307904972.run(Unknown Source)

at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)

at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:440)

at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)

at java.lang.Thread.run(Thread.java:745)


Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 23079 ms, time limit is 2000 

io.vertx.core.VertxException: Thread blocked

at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:249)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.drain(OnSubscribeFlattenIterable.java:236)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.onNext(OnSubscribeFlattenIterable.java:128)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)

at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)

at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)

at io.vertx.rx.java.SingleOnSubscribeAdapter.fireNext(SingleOnSubscribeAdapter.java:53)

at io.vertx.rx.java.ObservableFuture$1.dispatch(ObservableFuture.java:58)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:32)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:12)

at io.vertx.core.impl.FutureImpl.checkCallHandler(FutureImpl.java:158)

at io.vertx.core.impl.FutureImpl.setHandler(FutureImpl.java:100)

at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:271)

at io.vertx.core.impl.ContextImpl$$Lambda$38/1095017261.handle(Unknown Source)

at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:316)

at io.vertx.core.impl.ContextImpl$$Lambda$25/1307904972.run(Unknown Source)

at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)

at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:440)

at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)

at java.lang.Thread.run(Thread.java:745)


Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 24083 ms, time limit is 2000 

io.vertx.core.VertxException: Thread blocked

at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:249)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.drain(OnSubscribeFlattenIterable.java:236)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.onNext(OnSubscribeFlattenIterable.java:128)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)

at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)

at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)

at io.vertx.rx.java.SingleOnSubscribeAdapter.fireNext(SingleOnSubscribeAdapter.java:53)

at io.vertx.rx.java.ObservableFuture$1.dispatch(ObservableFuture.java:58)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:32)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:12)

at io.vertx.core.impl.FutureImpl.checkCallHandler(FutureImpl.java:158)

at io.vertx.core.impl.FutureImpl.setHandler(FutureImpl.java:100)

at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:271)

at io.vertx.core.impl.ContextImpl$$Lambda$38/1095017261.handle(Unknown Source)

at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:316)

at io.vertx.core.impl.ContextImpl$$Lambda$25/1307904972.run(Unknown Source)

at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)

at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:440)

at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)

at java.lang.Thread.run(Thread.java:745)


Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 25084 ms, time limit is 2000 

io.vertx.core.VertxException: Thread blocked

at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:249)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.drain(OnSubscribeFlattenIterable.java:236)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.onNext(OnSubscribeFlattenIterable.java:128)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)

at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)

at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)

at io.vertx.rx.java.SingleOnSubscribeAdapter.fireNext(SingleOnSubscribeAdapter.java:53)

at io.vertx.rx.java.ObservableFuture$1.dispatch(ObservableFuture.java:58)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:32)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:12)

at io.vertx.core.impl.FutureImpl.checkCallHandler(FutureImpl.java:158)

at io.vertx.core.impl.FutureImpl.setHandler(FutureImpl.java:100)

at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:271)

at io.vertx.core.impl.ContextImpl$$Lambda$38/1095017261.handle(Unknown Source)

at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:316)

at io.vertx.core.impl.ContextImpl$$Lambda$25/1307904972.run(Unknown Source)

at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)

at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:440)

at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)

at java.lang.Thread.run(Thread.java:745)


Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 26084 ms, time limit is 2000 

io.vertx.core.VertxException: Thread blocked

at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:249)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.drain(OnSubscribeFlattenIterable.java:236)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.onNext(OnSubscribeFlattenIterable.java:128)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)

at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)

at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)

at io.vertx.rx.java.SingleOnSubscribeAdapter.fireNext(SingleOnSubscribeAdapter.java:53)

at io.vertx.rx.java.ObservableFuture$1.dispatch(ObservableFuture.java:58)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:32)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:12)

at io.vertx.core.impl.FutureImpl.checkCallHandler(FutureImpl.java:158)

at io.vertx.core.impl.FutureImpl.setHandler(FutureImpl.java:100)

at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:271)

at io.vertx.core.impl.ContextImpl$$Lambda$38/1095017261.handle(Unknown Source)

at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:316)

at io.vertx.core.impl.ContextImpl$$Lambda$25/1307904972.run(Unknown Source)

at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)

at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:440)

at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)

at java.lang.Thread.run(Thread.java:745)


Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 27084 ms, time limit is 2000 

io.vertx.core.VertxException: Thread blocked

at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:249)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.drain(OnSubscribeFlattenIterable.java:236)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.onNext(OnSubscribeFlattenIterable.java:128)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)

at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)

at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)

at io.vertx.rx.java.SingleOnSubscribeAdapter.fireNext(SingleOnSubscribeAdapter.java:53)

at io.vertx.rx.java.ObservableFuture$1.dispatch(ObservableFuture.java:58)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:32)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:12)

at io.vertx.core.impl.FutureImpl.checkCallHandler(FutureImpl.java:158)

at io.vertx.core.impl.FutureImpl.setHandler(FutureImpl.java:100)

at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:271)

at io.vertx.core.impl.ContextImpl$$Lambda$38/1095017261.handle(Unknown Source)

at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:316)

at io.vertx.core.impl.ContextImpl$$Lambda$25/1307904972.run(Unknown Source)

at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)

at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:440)

at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)

at java.lang.Thread.run(Thread.java:745)


Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 28084 ms, time limit is 2000 

io.vertx.core.VertxException: Thread blocked

at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:249)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.drain(OnSubscribeFlattenIterable.java:236)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.onNext(OnSubscribeFlattenIterable.java:128)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)

at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)

at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)

at io.vertx.rx.java.SingleOnSubscribeAdapter.fireNext(SingleOnSubscribeAdapter.java:53)

at io.vertx.rx.java.ObservableFuture$1.dispatch(ObservableFuture.java:58)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:32)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:12)

at io.vertx.core.impl.FutureImpl.checkCallHandler(FutureImpl.java:158)

at io.vertx.core.impl.FutureImpl.setHandler(FutureImpl.java:100)

at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:271)

at io.vertx.core.impl.ContextImpl$$Lambda$38/1095017261.handle(Unknown Source)

at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:316)

at io.vertx.core.impl.ContextImpl$$Lambda$25/1307904972.run(Unknown Source)

at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)

at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:440)

at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)

at java.lang.Thread.run(Thread.java:745)


Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 29088 ms, time limit is 2000 

io.vertx.core.VertxException: Thread blocked

at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:249)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.drain(OnSubscribeFlattenIterable.java:236)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.onNext(OnSubscribeFlattenIterable.java:128)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)

at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)

at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)

at io.vertx.rx.java.SingleOnSubscribeAdapter.fireNext(SingleOnSubscribeAdapter.java:53)

at io.vertx.rx.java.ObservableFuture$1.dispatch(ObservableFuture.java:58)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:32)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:12)

at io.vertx.core.impl.FutureImpl.checkCallHandler(FutureImpl.java:158)

at io.vertx.core.impl.FutureImpl.setHandler(FutureImpl.java:100)

at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:271)

at io.vertx.core.impl.ContextImpl$$Lambda$38/1095017261.handle(Unknown Source)

at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:316)

at io.vertx.core.impl.ContextImpl$$Lambda$25/1307904972.run(Unknown Source)

at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)

at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:440)

at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)

at java.lang.Thread.run(Thread.java:745)


Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 30089 ms, time limit is 2000 

io.vertx.core.VertxException: Thread blocked

at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:249)

at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.drain(OnSubscribeFlattenIterable.java:236)

at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.onNext(OnSubscribeFlattenIterable.java:128)

at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)

at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)

at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)

at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)

at io.vertx.rx.java.SingleOnSubscribeAdapter.fireNext(SingleOnSubscribeAdapter.java:53)

at io.vertx.rx.java.ObservableFuture$1.dispatch(ObservableFuture.java:58)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:32)

at io.vertx.rx.java.ObservableFuture$HandlerAdapter.handle(ObservableFuture.java:12)

at io.vertx.core.impl.FutureImpl.checkCallHandler(FutureImpl.java:158)

at io.vertx.core.impl.FutureImpl.setHandler(FutureImpl.java:100)

at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:271)

at io.vertx.core.impl.ContextImpl$$Lambda$38/1095017261.handle(Unknown Source)

at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:316)

at io.vertx.core.impl.ContextImpl$$Lambda$25/1307904972.run(Unknown Source)

at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)

at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:440)

at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)

at java.lang.Thread.run(Thread.java:745)


Ulrich

Thomas SEGISMONT

Feb 3, 2017, 6:15:18 AM2/3/17
to ve...@googlegroups.com
Oh no. Please use the setting I pointed out. kill -3 outputs traces for all threads. How do I know which one is the culprit? How do I know if the EL was blocked at this time?

--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+unsubscribe@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.

E. Ulrich Kriegel

Feb 3, 2017, 6:31:53 AM2/3/17
to vert.x
Hi Thomas,
I'm sure the problem lies between my ears, but I couldn't get the VertxOptions to take effect. My application is started with vertx run and the default launcher in Eclipse. First, I placed the following in my start method

VertxOptions options = new VertxOptions().setWarningExceptionTime(1000000);
Vertx.vertx(options);

but with no success.

As a second try, I used a separate vehicle to start my verticle, but again I didn't get a stack trace

        public void start() throws Exception{
                VertxOptions vertxOptions = new VertxOptions().setWarningExceptionTime(10);
                Vertx vertx = Vertx.vertx(vertxOptions);
                vertx.deployVerticle("foo.baz.foobaz.Verticle", new DeploymentOptions().setConfig(config()), res->{
                        ...

Thomas SEGISMONT

Feb 3, 2017, 6:35:47 AM2/3/17
to ve...@googlegroups.com
Your warning exception time is 1000000 = 1 ms. As I said earlier in this thread, the unit is nanoseconds.
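To make the unit concrete, here is a small JDK-only sketch. The class and method names are made up for illustration; the only Vert.x fact it relies on is the one stated above, that the warning exception time is given in nanoseconds. The computed long is the kind of value one would hand to VertxOptions.setWarningExceptionTime or to -Dvertx.options.warningExceptionTime.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Vert.x: converts a human-friendly
// duration into the nanosecond value the warning exception time expects.
final class WarningTimeUnits {

    static long warningExceptionNanos(long seconds) {
        return TimeUnit.SECONDS.toNanos(seconds);
    }

    public static void main(String[] args) {
        // 1000000 ns is only one millisecond
        System.out.println(TimeUnit.NANOSECONDS.toMillis(1_000_000L)); // prints 1
        // one second expressed in nanoseconds
        System.out.println(warningExceptionNanos(1)); // prints 1000000000
    }
}
```

Using TimeUnit for the conversion avoids counting zeros by hand, which is where the 1 ms value above came from.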


Jochen Mader

Feb 3, 2017, 6:52:14 AM2/3/17
to ve...@googlegroups.com
And pleeeeeeasssee post the dumps in a Gist and DON'T paste them in an email.




E. Ulrich Kriegel

Feb 3, 2017, 7:12:20 AM2/3/17
to vert.x
The dump is available at https://gist.github.com/ukr15/7e5606efe2bf7efcdd1f7dd3d8a98940,
but it's the same as the one sent before. It appears automatically after a few seconds.

Thomas SEGISMONT

Feb 3, 2017, 11:40:58 AM2/3/17
to ve...@googlegroups.com
Thanks. Looking at the stack trace and your code, the best option would be for you to put together a small reproducer on GitHub.


E. Ulrich Kriegel

Feb 6, 2017, 10:22:50 AM2/6/17
to vert.x
Here is a 'simple reproducer'. To get rid of the database, I had to introduce some simplifications:
- a range instead of reading from the input database
- no write to the output database.
However, the same position in the test code is affected.

The example can be found on GitHub at https://github.com/ukr15/blocking
Build the example with
mvn clean install

Start it with
java -jar target/test-1.0-fat.jar  -Dvertx.options.warningExceptionTime=1000000000

It is an Eclipse project.

Jez P

Feb 6, 2017, 10:34:37 AM2/6/17
to vert.x
I think you need to push it to github. At present there's only a readme in there.

Ulrich Kriegel

Feb 6, 2017, 10:37:06 AM2/6/17
to ve...@googlegroups.com
Now it’s there, it’s only one file


Ulrich Kriegel




javadevmtl

Feb 6, 2017, 5:15:45 PM2/6/17
to vert.x
I had a similar issue. Try wrapping the whole observable in execute blocking rather than each individual "encrypt" call.

I noticed that if you get a "large" result set, you can still get the thread blocked warning even if you loop through it with an observable. So I wrapped the whole observable in execute blocking...

javadevmtl

Feb 6, 2017, 5:20:32 PM2/6/17
to vert.x
Before, I was using just an observable, and if there were too many results I would get the blocked thread warning. So I wrapped everything and it never complained again.

http://pastebin.com/5DtCDAww

Thomas SEGISMONT

Feb 7, 2017, 5:16:12 AM2/7/17
to ve...@googlegroups.com
I had a look at your code. Here are a few comments.

1/ executeBlockingObservable should be used with ordered = false

While this is not the cause of the warning, it will help to process blocking tasks concurrently

      .flatMap(person -> vertx.executeBlockingObservable(_future -> {
        _future.complete(test1(person));
      }, false))
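As a rough illustration of what ordered = false buys you, here is a JDK-only model (no Vert.x or RxJava involved; the class and method names are hypothetical). A single-thread executor stands in for ordered execution, where blocking tasks submitted from the same context run one after another, and a fixed pool stands in for unordered execution. The sketch reports the peak number of tasks running at the same time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model: single-thread executor ~ ordered, thread pool ~ unordered.
final class OrderedVsUnorderedDemo {

    // Runs `tasks` sleeping jobs and returns how many ran at once at the peak.
    static int peakConcurrency(ExecutorService executor, int tasks) throws Exception {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            futures.add(executor.submit(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(50); // stands in for a blocking call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                running.decrementAndGet();
            }));
        }
        for (Future<?> f : futures) {
            f.get(); // wait for every task to finish
        }
        executor.shutdown();
        return peak.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("serial: " + peakConcurrency(Executors.newSingleThreadExecutor(), 4));
        System.out.println("pooled: " + peakConcurrency(Executors.newFixedThreadPool(4), 4));
    }
}
```

The serial case can never exceed one task at a time, while the pooled case lets several blocking calls overlap. That is the effect point 1/ is after.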

2/ You should use executeBlockingObservable for calling test2 as well

It seems to me you're trying to simulate a long running method with the for loop

So your resultObservable should be created as follows

        Observable<Integer> resultObservable = vertx.<List<Integer>>executeBlockingObservable(fut -> {
          fut.complete(test2());
        }, false).flatMap(Observable::from);



3/ You should subscribe on a blocking scheduler

This is the real issue.

RxJava does not make things automatically asynchronous. So when you subscribe to an observable, the current thread is going to emit the items, until the chain leads to an observable operating on a different scheduler.
In your example, the current thread is the event loop thread. So it will emit the 20000 items of the range, and will not start to handle the results of executeBlocking until all the 20000 items are emitted...

So, before calling subscribe, add

      .subscribeOn(RxHelper.blockingScheduler(vertx))

Note that this doesn't mean that the subscribe callbacks will be executed on a worker thread. In fact, they will be executed on the event loop thread. Why? Because the executeBlockingObservable operates on the current context (an event loop thread in a standard verticle).
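The effect described in 3/ can be modelled with plain JDK executors. This is only a sketch under stated assumptions, not the reproducer's code: a single-thread executor plays the event loop, a fixed pool plays the worker pool, the for loop plays Observable.range emitting items, and the callback posted back to the "event loop" plays the result handler. All names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only model of "who emits the items".
final class EmitOnEventLoopDemo {

    // Returns how many results the "event loop" had handled
    // at the moment the emitting loop finished.
    static int handledWhenEmissionEnds(boolean emitOnEventLoop, int items) throws Exception {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        ExecutorService workers = Executors.newFixedThreadPool(4);
        AtomicInteger handled = new AtomicInteger();
        CountDownLatch allHandled = new CountDownLatch(items);

        Callable<Integer> emission = () -> {
            for (int i = 0; i < items; i++) {
                // "blocking" work on a worker, result handed back to the event loop
                workers.execute(() ->
                    eventLoop.execute(() -> {
                        handled.incrementAndGet();
                        allHandled.countDown();
                    }));
            }
            return handled.get();
        };

        // true  ~ subscribing on the event loop thread
        // false ~ subscribing on a blocking (worker) scheduler
        ExecutorService emitter = emitOnEventLoop ? eventLoop : workers;
        int handledAtEnd = emitter.submit(emission).get();

        allHandled.await(); // let the remaining callbacks drain
        eventLoop.shutdown();
        workers.shutdown();
        return handledAtEnd;
    }

    public static void main(String[] args) throws Exception {
        // Always 0: the event loop is busy emitting and cannot handle results.
        System.out.println("emit on event loop: " + handledWhenEmissionEnds(true, 20000));
        // Varies from run to run: results can be handled while emission continues.
        System.out.println("emit on worker:     " + handledWhenEmissionEnds(false, 20000));
    }
}
```

When the loop runs on the single "event loop" thread, every result callback sits in its queue until the loop returns, so the first number is always 0. Moving the loop to a worker frees that thread to handle results as they arrive, which is what subscribeOn with a blocking scheduler is meant to achieve in the real code.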






E. Ulrich Kriegel

Feb 8, 2017, 4:23:33 AM2/8/17
to vert.x
Hi and thanks for the advice.
 
The code was changed as proposed and works fine for Observable.range(0, 20000). However, for Observable.range(0, 200000) the io.vertx.core.VertxException: Thread blocked warning reappears.
The real code was changed accordingly, but it still issues the Thread blocked warning even for 20000 items.


Asher Tarnopolski

Feb 9, 2017, 11:12:41 AM2/9/17
to vert.x
Thomas,

does 3/ mean that it's not enough to use something like:

public <T> Observable<T> executeBlocking(Observable<T> blockingObservable) {
    return Vertx.currentContext().executeBlockingObservable(f -> blockingObservable.subscribe(f::complete, f::fail), false);
}

and that the implementation below would be OK instead?

protected <T> Observable<T> executeBlocking(Observable<T> blockingObservable) {
    return Vertx.currentContext().executeBlockingObservable(f -> blockingObservable.subscribeOn(RxHelper.blockingScheduler(vertx)).subscribe(f::complete, f::fail), false);
}

thanks!

Thomas SEGISMONT

Feb 9, 2017, 11:46:06 AM2/9/17
to ve...@googlegroups.com
I'm afraid there's no universal answer... You need to know on which scheduler your Observable operates (this will tell you on which thread the subscriber callbacks are executed) and whether subscription may be an expensive operation (in which case you should subscribe on a blocking scheduler).
