Best way to read a large database line by line in Vert.x


E. Ulrich Kriegel

Apr 20, 2017, 9:55:14 AM
to vert.x
Hi there,
we have to read a larger database with ~13M entries line by line to construct a Redis cache. Since my clients use both Oracle and Postgres databases, the new async driver cannot be used without separating the code.
We tried the following in the start method of a verticle

...

int[] count = {0};

JsonObject dbConfig = new JsonObject()
        .put("driver_class", "org.postgresql.Driver")
        .put("password", "test")
        .put("url", "jdbc:postgresql://localhost:5432/test")
        .put("user", "test");

JDBCClient.createShared(vertx, dbConfig, "test").rxGetConnection()
        .flatMap(conn -> conn.rxQueryStream("SELECT * FROM table"))
        .map(rs -> rs.handler(row -> count[0] += 1)
                     .endHandler(v -> System.out.println(count[0])))
        .subscribe();
}


This works fine for small result sets, e.g. if a LIMIT is set in the query. However, trying to read the full database results in a ThreadBlockedException.
What's wrong with my code?

Any help is welcome


Paulo Lopes

Apr 21, 2017, 9:28:42 AM
to vert.x
Can you provide a stacktrace so we can see where it is blocking?

Streaming should be your best approach, since it really streams the result set to your client, while the resultset approach first tries to inflate the whole data set in memory as a JSON object.
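The contrast Paulo describes can be sketched in plain Java. This is a toy model, not the actual Vert.x API: `query` stands in for the resultset approach (everything materialized before the caller sees row 1), `queryStream` for the streaming approach (rows pushed to a handler one at a time, mirroring the handler/endHandler shape used in the thread).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.IntStream;

// Toy illustration (plain Java, not the Vert.x API) of the difference
// between inflating a whole result set and streaming it row by row.
public class QueryVsStream {

    // "query" style: the complete result is held in memory at once.
    static List<Integer> query(int rows) {
        List<Integer> all = new ArrayList<>(rows);
        for (int i = 0; i < rows; i++) all.add(i); // memory grows with rows
        return all;
    }

    // "queryStream" style: rows are delivered to a handler one at a time;
    // nothing is accumulated unless the handler chooses to.
    static void queryStream(int rows, Consumer<Integer> handler, Runnable endHandler) {
        Iterator<Integer> it = IntStream.range(0, rows).iterator();
        while (it.hasNext()) handler.accept(it.next()); // O(1) extra memory
        endHandler.run();
    }

    public static void main(String[] args) {
        int[] count = {0};
        queryStream(13, row -> count[0]++, () -> System.out.println(count[0]));
    }
}
```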

E. Ulrich Kriegel

Apr 21, 2017, 9:43:28 AM
to vert.x
Lessons learned

With the code modification given below, the ThreadBlockedException could be postponed up to a result set size of 9M entries.

int limit = 1000000;
JsonObject dbConfig = new JsonObject()
        .put("driver_class", "org.postgresql.Driver")
        .put("password", "test")
        .put("url", "jdbc:postgresql://localhost:5432/test")
        .put("user", "test");

Single<SQLConnection> connSingle = JDBCClient.createShared(vertx, dbConfig, "Pool_1").rxGetConnection();

vertx.executeBlocking(future -> {
    int[] count = { 0 };
    long t0 = System.currentTimeMillis();
    connSingle.flatMap(conn -> vertx.<SQLRowStream>rxExecuteBlocking(fut -> {
            conn.rxQueryStream(String.format("SELECT * FROM person LIMIT %s", limit))
                .subscribeOn(RxHelper.blockingScheduler(vertx))
                .subscribe(y -> fut.complete(y));
        }, false))
        .map(rs -> rs.handler(row -> {
            if (count[0] == 0) {
                System.out.println(String.format("limit %s time: %s", limit, (System.currentTimeMillis() - t0)));
            }
            count[0] += 1;
            if (count[0] % 100000 == 0) {
                System.out.println(count[0]);
                System.out.flush();
            }
        }).endHandler(v -> {
            System.out.println(count[0]);
            future.complete();
        }))
        .subscribe(res -> {
        }, t -> {
            System.out.println("T: " + t);
            future.fail(t);
        });
}, false, res -> {
    if (res.failed()) {
        System.out.println("RES: " + res.cause());
    } else {
        System.out.println("finish");
    }
});

The following diagram shows the time (in sec) before the stream starts emitting rows vs. the size of the fetched result set.


As a next attempt I tried the new async Postgres driver; however, it throws a StackOverflowError after 4000 rows are fetched.

Paulo Lopes

Apr 21, 2017, 1:12:34 PM
to vert.x
The asynchronous driver always fetches the whole response into memory; this is a driver limitation, since there's no way to get at the low-level cursor.

E. Ulrich Kriegel

Jun 26, 2017, 10:33:29 AM
to vert.x
Hi Paulo,


Am Freitag, 21. April 2017 19:12:34 UTC+2 schrieb Paulo Lopes:
The asynchronous driver always fetches the whole response into memory; this is a driver limitation, since there's no way to get at the low-level cursor.


Now I have time to come back to this topic.
Would the 'old' sync driver allow fetching larger result sets (> 9M entries) as a stream?


Paulo Lopes

Jun 26, 2017, 2:51:18 PM
to vert.x
Yes, and it batches the reads so it won't block the event loop.
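The "batched reads" behavior Paulo mentions can be mimicked in plain Java. This is an illustrative toy, not the Vert.x internals: the `ArrayDeque` stands in for the event loop, and `BATCH` is an assumed chunk size. The drain task delivers at most a fixed number of rows per run, then requeues itself, so no single task holds the loop for the whole result set.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.stream.IntStream;

// Toy sketch of batched row delivery: drain at most BATCH rows per task,
// then yield by requeueing, so other "event loop" tasks can interleave.
public class BatchedDrain {
    static final int BATCH = 128;
    static final Deque<Runnable> eventLoop = new ArrayDeque<>();
    static int delivered = 0;

    static void drain(Iterator<Integer> rows) {
        int n = 0;
        while (rows.hasNext() && n < BATCH) {
            rows.next();        // hand one row to the handler
            delivered++;
            n++;
        }
        if (rows.hasNext()) {
            eventLoop.add(() -> drain(rows)); // not done: continue later
        }
    }

    public static void main(String[] args) {
        Iterator<Integer> rows = IntStream.range(0, 1000).iterator();
        eventLoop.add(() -> drain(rows));
        while (!eventLoop.isEmpty()) eventLoop.poll().run();
        System.out.println(delivered); // 1000
    }
}
```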

ad...@cs.miami.edu

Jun 26, 2017, 9:26:29 PM
to vert.x
While I might be stating the obvious: have you tried using a "paging type query", where you do a number of smaller queries with LIMIT/OFFSET until the entire database is covered? This keeps the client memory down and works pretty well.

-Adam
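Adam's suggestion can be sketched as a query generator in plain Java. The table name `person` and the `ORDER BY id` column are placeholders; some stable ORDER BY (or a keyset predicate) is needed, because LIMIT/OFFSET pages are only deterministic over a fixed ordering.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of offset-based paging: cover totalRows with LIMIT/OFFSET queries.
// Table and column names are placeholders for illustration.
public class PagedQueries {
    static List<String> pages(long totalRows, long pageSize) {
        List<String> queries = new ArrayList<>();
        for (long offset = 0; offset < totalRows; offset += pageSize) {
            queries.add(String.format(
                "SELECT * FROM person ORDER BY id LIMIT %d OFFSET %d",
                pageSize, offset));
        }
        return queries;
    }

    public static void main(String[] args) {
        // 13M rows in 1M-row pages -> 13 queries
        System.out.println(pages(13_000_000L, 1_000_000L).size());
    }
}
```

Each query is then executed and its (small) result set written to the cache before the next page is fetched, so client memory stays bounded by the page size.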

Paulo Lopes

Jun 27, 2017, 6:02:52 AM
to vert.x


On Tuesday, June 27, 2017 at 3:26:29 AM UTC+2, ad...@cs.miami.edu wrote:
while I might be stating the obvious, have you tried using a "paging type query" where you do a number of smaller queries with limit/offset until the entire database is covered?  This will keep the client memory down and works pretty well.

It depends on the isolation level you're interested in: by issuing several queries, if inserts, updates, or deletes happen in between, the paging might be inconsistent...
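Paulo's caveat can be reproduced with a toy in-memory "table" (plain Java, hypothetical row names): if a row is deleted between two offset pages, the later rows shift down and one of them is silently skipped.

```java
import java.util.ArrayList;
import java.util.List;

// Toy reproduction of offset-paging inconsistency: a row deleted between
// two pages shifts the remaining rows, so one row is never seen at all.
public class PagingSkew {
    static List<String> readAllPaged(List<String> table, int pageSize,
                                     Runnable betweenPages) {
        List<String> seen = new ArrayList<>();
        for (int offset = 0; offset < table.size(); offset += pageSize) {
            int end = Math.min(offset + pageSize, table.size());
            seen.addAll(table.subList(offset, end)); // read one "page"
            betweenPages.run();                      // concurrent writer runs
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> table = new ArrayList<>(
            List.of("r1", "r2", "r3", "r4", "r5", "r6"));
        // Delete r1 after the first page: r3 shifts into offset 2's past.
        List<String> seen = readAllPaged(table, 2, () -> table.remove("r1"));
        System.out.println(seen); // r3 is missing from the result
    }
}
```

Running the whole scan inside one transaction at REPEATABLE READ (or using a single cursor/stream) avoids this, at the cost of holding the snapshot open for the duration.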
 

ad...@cs.miami.edu

Jun 27, 2017, 1:22:29 PM
to vert.x
>> It depends on the isolation level you're interested in: by issuing several queries, if inserts, updates, or deletes happen in between, the paging might be inconsistent...
 
That is true. But even a single query can lead to problems: if a single SELECT * gets all the data, there will still be a delay between the time the data is retrieved and the time all that data gets into Redis.

-Adam