Example usage for transactionAsync/transactionResultAsync


ni...@auptix.com

May 26, 2016, 1:35:45 AM
to jOOQ User Group
Neither in the jOOQ 3.8 manual, nor anywhere online, can I find examples of how to use the new transactionAsync() and transactionResultAsync() methods.

I have an existing method that I'd like to transition to the async paradigm. It goes something like this:

Mutator<User> mutator = u -> { ... };        
User updatedUser = DSL.using(configuration).transactionResult(transactionConfiguration -> {
    UserRecord userRecord = DSL.using(transactionConfiguration)
            .selectFrom(USER)
            .where(USER.USER_ID.equal(userId))
            .forUpdate()
            .fetchOne();

    if (userRecord == null) {
        throw new ObjectNotFoundException("User(userId = %d) not found", userId);
    }
    else {
        User user = new User(userRecord);
        mutator.mutate(user);
        userRecord.store();
        return user;
    }
});

fetchOne(), store(), and transactionResult() are all blocking calls. The first obvious step is to use the new transactionResultAsync() method, which looks like this:

CompletionStage<User> updatedUserStage
        = DSL.using(configuration).transactionResultAsync(transactionConfiguration -> {

    UserRecord userRecord = DSL.using(transactionConfiguration)
            .selectFrom(USER)
            .where(USER.USER_ID.equal(userId))
            .forUpdate()
            .fetchOne();

    if (userRecord == null) {
        throw new ObjectNotFoundException("User(userId = %d) not found", userId);
    }
    else {
        User user = new User(userRecord);
        mutator.mutate(user);
        userRecord.store();
        return user;
    }
});

This is a step in the right direction. The caller of the method now has a CompletionStage that can be chained, composed, etc. in a non-blocking way. But store() and fetchOne() are still blocking calls, so the executor thread that invokes the transactionResultAsync() lambda is going to block.

This is where I'm uncertain on how to proceed. Was it the designers' intent that we would stop here? I'd like to continue with fetchAsync() but it starts to get hairy:

CompletionStage<CompletionStage<User>> userCompletionStageCompletionStage = DSL.using(configuration)
        .transactionResultAsync(transactionConfiguration ->
                DSL.using(transactionConfiguration)
                .selectFrom(USER)
                .where(USER.USER_ID.equal(userId))
                .forUpdate()
                .fetchAsync()
                .thenApply(result -> {
                    if (result.isEmpty()) {
                        throw new ObjectNotFoundException("User(userId = %d) not found", userId);
                    }
                    else {
                        UserRecord userRecord = result.get(0);
                        User user = new User(userRecord);
                        mutator.mutate(user);
                        userRecord.store();
                        return user;
                    }
                })
        );

That just doesn't seem right. If it weren't for the fact that the asynchronous transactional scope "wraps" the inner stage, we could compose the stages together and flatten them. This is, in fact, the approach we could take if we wanted to squash that last blocking call, store():

CompletionStage<CompletionStage<User>> userCompletionStageCompletionStage = DSL.using(configuration)
                .transactionResultAsync(transactionConfiguration -> {
                    CompletionStage<Result<UserRecord>> selectCompletionStage = DSL.using(transactionConfiguration)
                            .selectFrom(USER)
                            .where(USER.USER_ID.equal(userId))
                            .forUpdate()
                            .fetchAsync();

                    Function<Result<UserRecord>, CompletionStage<User>> mutateAndPersistAsync = result -> {
                        if (result.isEmpty()) {
                            throw new ObjectNotFoundException("User(userId = %d) not found", userId);
                        }
                        else {
                            return CompletableFuture.supplyAsync(() -> {
                                UserRecord userRecord = result.get(0);
                                User user = new User(userRecord);
                                mutator.mutate(user);
                                userRecord.store();
                                return user;
                            }, configuration.executorProvider().provide());
                        }
                    };

                    return selectCompletionStage.thenCompose(mutateAndPersistAsync);
                });

But that still leaves us with CompletionStage<CompletionStage<User>>. Am I missing something? Any and all feedback appreciated.
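As a side note on the types only: a nested stage can be collapsed with plain JDK calls, independent of jOOQ. The sketch below uses only java.util.concurrent, with a placeholder value ("user-42") and a hypothetical helper class name. It changes the type, not the transactional behaviour: the lambda passed to transactionResultAsync() still returns as soon as the inner stage has been created, so the later stages presumably do not run inside the transaction.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// JDK-only illustration; no jOOQ types are involved.
class FlattenStages {

    // Collapses a stage-of-a-stage into a single stage.
    static <T> CompletionStage<T> flatten(CompletionStage<CompletionStage<T>> nested) {
        return nested.thenCompose(Function.identity());
    }

    // Builds a nested stage from placeholder data and returns the flattened value.
    static String demo() {
        CompletionStage<String> inner = CompletableFuture.completedFuture("user-42");
        CompletionStage<CompletionStage<String>> nested = CompletableFuture.completedFuture(inner);
        return flatten(nested).toCompletableFuture().join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "user-42"
    }
}
```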

Thanks!

Lukas Eder

May 26, 2016, 1:22:54 PM
to jooq...@googlegroups.com
Hi Nick,

Thanks for your detailed message! Great to hear feedback about the new jOOQ 3.8 async API.

First off, the disappointment. Asynchronous database access is a lie, inevitably. For two reasons:

1. JDBC is blocking
2. Most databases are blocking (except for SQL Server and PostgreSQL to my knowledge, which ship with obscure, nonstandard, nonblocking APIs, but not JDBC)

And if you want, there's a third argument, which isn't as strict:

3. Most Java APIs (Spring, JTA, etc.) assume thread-bound transactions. It is *very* unusual to retrieve a Connection from a pool and pass it around between threads.

With this in mind, let's review your code.

2016-05-26 2:12 GMT+02:00 <ni...@auptix.com>:
Neither in the jOOQ 3.8 manual, nor anywhere online, can I find examples of how to use the new transactionAsync() and transactionResultAsync() methods.

True, that's still a TODO, unfortunately.

Yes, indeed. See the above rationale about transactions usually being thread-bound. One might accept this as "oh well". But if you can guarantee (through your connection pool implementation) that this thread-bound-ness is not a requirement, we can go on.

This is where I'm uncertain on how to proceed. Was it the designers' intent that we would stop here? I'd like to continue with fetchAsync() but it starts to get hairy:

CompletionStage<CompletionStage<User>> userCompletionStageCompletionStage = DSL.using(configuration)
        .transactionResultAsync(transactionConfiguration ->
                DSL.using(transactionConfiguration)
                .selectFrom(USER)
                .where(USER.USER_ID.equal(userId))
                .forUpdate()
                .fetchAsync()
                .thenApply(result -> {
                    if (result.isEmpty()) {
                        throw new ObjectNotFoundException("User(userId = %d) not found", userId);
                    }
                    else {
                        UserRecord userRecord = result.get(0);
                        User user = new User(userRecord);
                        mutator.mutate(user);
                        userRecord.store();
                        return user;
                    }
                })
        );

That just doesn't seem right.

No, it doesn't. Do note that not everyone uses transactions. fetchAsync() can be used as a standalone, chainable asynchronous fetch, e.g. with auto commit = true, or with read-only transaction mode.

The two APIs (transactionResultAsync() and fetchAsync()) are orthogonal, not really composable.
 
If it weren't for the fact that the asynchronous transactional scope "wraps" the inner stage, we could compose the stages together and flatten them. This is, in fact, the approach we could take if we wanted to squash that last blocking call, store():

CompletionStage<CompletionStage<User>> userCompletionStageCompletionStage = DSL.using(configuration)
                .transactionResultAsync(transactionConfiguration -> {
                    CompletionStage<Result<UserRecord>> selectCompletionStage = DSL.using(transactionConfiguration)
                            .selectFrom(USER)
                            .where(USER.USER_ID.equal(userId))
                            .forUpdate()
                            .fetchAsync();

                    Function<Result<UserRecord>, CompletionStage<User>> mutateAndPersistAsync = result -> {
                        if (result.isEmpty()) {
                            throw new ObjectNotFoundException("User(userId = %d) not found", userId);
                        }
                        else {
                            return CompletableFuture.supplyAsync(() -> {
                                UserRecord userRecord = result.get(0);
                                User user = new User(userRecord);
                                mutator.mutate(user);
                                userRecord.store();
                                return user;
                            }, configuration.executorProvider().provide());
                        }
                    };

                    return selectCompletionStage.thenCompose(mutateAndPersistAsync);
                });

But that still leaves us with CompletionStage<CompletionStage<User>>. Am I missing something? Any and all feedback appreciated.

That doesn't look right :)

But this discussion made me think about an alternative transaction API. What we would really need for cases like yours is an API that would allow for (pseudo code):

ctx.beginTransactionAsync()
   .thenApply(... -> ...)
   .thenApply(... -> ...)
   .thenApply(... -> commit());

With this model, the BEGIN TRANSACTION and COMMIT commands would be just like any other "async" database interaction, instead of the current API, which assumes that an "async" transaction is atomically blocking.

Does that make sense?

Samir Faci

May 26, 2016, 5:35:49 PM
to jooq...@googlegroups.com
Oooh, that caught my attention.

I also have a use case where I need to inject some additional SQL to be executed within the same transaction.  

The code snippet you describe sounds ideal for us.


ctx.beginTransactionAsync()
   .thenApply(... -> ...)
   .thenApply(... -> ...)
   .thenApply(... -> commit());



--
You received this message because you are subscribed to the Google Groups "jOOQ User Group" group.
To unsubscribe from this group and stop receiving emails from it, send an email to jooq-user+...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Thank you
Samir Faci

Lukas Eder

May 27, 2016, 3:00:18 AM
to jooq...@googlegroups.com
Thanks, Samir

2016-05-26 23:35 GMT+02:00 Samir Faci <sa...@esamir.com>:
I also have a use case where I need to inject some additional SQL to be executed within the same transaction.  

Would you mind elaborating your use-case a little bit?
 
the code snippet you describe would sound ideal for us.


ctx.beginTransactionAsync()
   .thenApply(... -> ...)
   .thenApply(... -> ...)
   .thenApply(... -> commit());


So far, this was just a very high level sketch. Let's assume we'd be going this way. There would be two new alternative transaction APIs: A blocking one and a non-blocking one. The blocking one might look just like JDBC or JTA:

Transaction transaction = ctx.beginTransaction();
ctx.insert()...
ctx.update()...
Savepoint savepoint = transaction.savepoint();
ctx.delete()...
transaction.commit();

The non-blocking one would need to maintain the transaction state throughout the .thenApply() call chain, exposing it in case someone wants to nest stuff (using savepoints) or commit/rollback early. 

The difficulty of this is that CompletionStage is not designed for this use-case. It is designed for passing only computation results to the next computation, not (transaction) contexts. This means that the context needs to stay external or implicit, which also violates the CompletionStage design.

One option would be to subtype the JDK's CompletionStage and make that a TransactionalCompletionStage. So, more specifically than what I've stated earlier:

ctx.beginTransactionAsync()
   .thenApply(transaction -> transaction.ctx().insert())
   .thenApply(transaction -> transaction.ctx().update())
   .thenApply(transaction -> transaction.savepoint())
   .thenApply(transaction -> transaction.ctx().delete())
   .thenApply(transaction -> commit());

Where "transaction" would be that TransactionalCompletionStage<T>, where <T> is the outcome of the previous computation (Integer in case of insert/update/delete, Result in case of fetch, Void in case of savepoint).
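To make the design problem concrete, here is a JDK-only sketch of one way to carry a context through a thenApply() chain: each step receives and returns a value that pairs the context with the previous outcome. TxState, execute() and begin() are hypothetical names invented for this illustration, and the "context" is just a list of statement labels. None of this is jOOQ API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical types for illustration only; none of this is jOOQ API.
class TxChainSketch {

    // Pairs the transaction context with the outcome of the previous step.
    static final class TxState<T> {
        final List<String> statements; // stand-in for a real transaction context
        final T lastResult;

        TxState(List<String> statements, T lastResult) {
            this.statements = statements;
            this.lastResult = lastResult;
        }

        // "Executes" a statement and returns a new state carrying its result.
        <R> TxState<R> execute(String sql, R result) {
            List<String> next = new ArrayList<>(statements);
            next.add(sql);
            return new TxState<>(next, result);
        }
    }

    // Starts the chain with an empty context and no result yet.
    static CompletionStage<TxState<Void>> begin() {
        return CompletableFuture.completedFuture(new TxState<Void>(new ArrayList<>(), null));
    }

    // Chains three steps; every step sees the context left by the previous one.
    static List<String> demo() {
        return begin()
            .thenApply(tx -> tx.execute("INSERT", 1))
            .thenApply(tx -> tx.execute("UPDATE", tx.lastResult + 1))
            .thenApply(tx -> tx.execute("COMMIT", tx.lastResult))
            .toCompletableFuture()
            .join()
            .statements;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[INSERT, UPDATE, COMMIT]"
    }
}
```

The cost of this approach is visible in the signatures: every step must accept and return the wrapper type, which is exactly the friction that a dedicated CompletionStage subtype would try to hide.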

I'm a bit wary of implementing that, though. There hasn't been a lot of literature around documenting such things (as with subtyping the Collections API).
I'm very open to hearing your thoughts on this matter.

Best
Lukas

ni...@auptix.com

May 27, 2016, 3:58:30 PM
to jOOQ User Group
Thanks Lukas. That helps. Curious though, if the JDBC driver doesn't provide an async I/O API, what is the gain from adding async methods to the jOOQ API? Clients can always wrap jOOQ usage in CompletableFuture.supplyAsync(() -> ...) themselves and that would at least not perpetuate the lie.

Samir Faci

May 27, 2016, 5:12:45 PM
to jooq...@googlegroups.com
This is sort of a tangent, so I'll try to be concise.

My use case was that we need to insert something and interject in the middle of a transaction.

i.e.

begin transaction;
// insert into audit_table(app_id, transaction_id);
// insert  + update a bunch of different tables
// commit transaction;

The part in the middle is really difficult to perform with the current Spring wiring that we're using. They have a pre-commit and a post-commit, but in the pre-commit the transaction ID is invalid, and in the post-commit we can do a valid insert but it's too late, since the trigger we have on the table would have already executed. So we need something that would be executed right after the transaction has started.

We played around with Spring 4.x, TransactionEvents, etc., and all of them are lacking unless you completely hijack the transaction and handle it manually.

The snippet you posted really caught my eye because it seems to be directly relevant to my use case.

--
Samir Faci






Lukas Eder

May 28, 2016, 4:11:46 AM
to jooq...@googlegroups.com
2016-05-27 21:58 GMT+02:00 <ni...@auptix.com>:
Thanks Lukas. That helps. Curious though, if the JDBC driver doesn't provide an async I/O API, what is the gain from adding async methods to the jOOQ API?

The consistent programming model, and the distant hope that a JDBC 5.0 version will help propagate asynchronicity one more layer down the stack. Of course, given the numerous attempts by the JDK library designers to get concurrency APIs right, we might've bet on the wrong horse once again (as we did before when we supported Future<R> fetchLater())
 
Clients can always wrap jOOQ usage in CompletableFuture.supplyAsync(() -> ...) themselves

Yes, they can. And they could also wrap Results in Streams themselves. And Object[] in Records, if they must. Etc. :)

But the gain from a consistent programming model is non-negligible, I think. If you're doing these things yourself, you will need to make sure that you always properly configure your Executors for each call. In that respect, the CompletableFuture API is a bit unfortunate, as it always defaults to passing stuff to the ForkJoinPool (jOOQ has a Configuration.executorProvider() SPI to provide the default Executor throughout the CompletionStage chain), and you might not think of wrapping individual blocking operations in a ManagedBlocker in case your runnables do hit the ForkJoinPool. jOOQ does these things for you. That might already be helpful.
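For readers who do wrap things themselves, the two precautions mentioned above can be sketched with the JDK alone. This is an illustration under assumptions, not what jOOQ does internally: blockingQuery() is a stand-in for a blocking JDBC call, and BlockingSupplier is a hypothetical helper name. Option 1 passes an explicit Executor to supplyAsync(); option 2 stays on the default pool but announces the blocking call through ForkJoinPool.ManagedBlocker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;

// JDK-only sketch; blockingQuery() stands in for a blocking JDBC round trip.
class BlockingCallSketch {

    static String blockingQuery() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "row";
    }

    // Hypothetical helper: lets a ForkJoinPool know that the supplier blocks.
    static final class BlockingSupplier<T> implements ForkJoinPool.ManagedBlocker {
        private final Supplier<T> delegate;
        private T result;
        private boolean done;

        BlockingSupplier(Supplier<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public boolean block() {
            result = delegate.get();
            done = true;
            return true; // no further blocking is necessary
        }

        @Override
        public boolean isReleasable() {
            return done;
        }

        T result() {
            return result;
        }
    }

    static String demo() {
        // Option 1: a dedicated executor, so the common pool is never touched.
        ExecutorService dbExecutor = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<String> viaExecutor =
                CompletableFuture.supplyAsync(BlockingCallSketch::blockingQuery, dbExecutor);

            // Option 2: the default pool, with the blocking call announced to it.
            CompletableFuture<String> viaDefaultPool = CompletableFuture.supplyAsync(() -> {
                BlockingSupplier<String> blocker =
                    new BlockingSupplier<>(BlockingCallSketch::blockingQuery);
                try {
                    ForkJoinPool.managedBlock(blocker);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
                return blocker.result();
            });

            return viaExecutor.join() + "," + viaDefaultPool.join();
        } finally {
            dbExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "row,row"
    }
}
```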

If not, sure. You can wrap stuff yourself. In that case, I suggest reading this interesting blog post here:
 
and that would at least not perpetuate the lie.

It wouldn't perpetuate it to your DAO logic which immediately uses jOOQ, but it would most certainly perpetuate it to your service / client logic :)

Asynchronicity in RDBMS is a lie (someone, somewhere needs to keep some process waiting on that sweet table/row/cell lock. It's the database).
Just like non-serializable transactions are a lie. The latter, we've accepted to live with for several decades. I'm sure we'll accept to live with the former too.

Having said so, if you find anything that will help further improve the lie, I'm very happy to discuss!

Hope this helps,
Lukas

Lukas Eder

May 28, 2016, 4:23:33 AM
to jooq...@googlegroups.com
Hi Samir,

Thank you very much for elaborating. I see, it's a tangent, but it's still interesting to remember the bigger picture when designing an API. Your use case could be covered if you were using jOOQ's transaction API, which allows you to register a TransactionProvider:

Your provider can be bound to any transaction implementation, including Spring TX, for instance.

With JTA, you'd probably be looking up a UserTransaction from JNDI, and that might happen in a very specific transaction layer, where you could inject your logic. I've done that for a previous employer, where we initialised the PL/SQL session context with global variables at the beginning of each transaction.

Another option might perhaps be to intercept this on a connection pool level? After all, a transaction usually begins right when the connection is acquired from the pool.

Interesting to see that Spring's TransactionalEventListener / TransactionPhase don't support any "BEFORE BEGIN" or "AFTER BEGIN" phases. You might be able to work around this by implementing your own PlatformTransactionManager, delegating to your existing one. You'd be injecting your logic right after the PlatformTransactionManager.getTransaction(TransactionDefinition) method.
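The delegation idea can be sketched without Spring on the classpath. In the following JDK-only illustration, TxManager and AfterBeginTxManager are hypothetical stand-ins (Spring's real interface is PlatformTransactionManager, whose getTransaction(TransactionDefinition) method would play the role of begin() here), and the "audit insert" is just a log entry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins; Spring's real interface is PlatformTransactionManager.
class AfterBeginSketch {

    interface TxManager {
        String begin();          // returns a transaction handle
        void commit(String tx);
    }

    // Decorator that runs a hook right after the delegate has begun a transaction.
    static final class AfterBeginTxManager implements TxManager {
        private final TxManager delegate;
        private final Consumer<String> afterBegin;

        AfterBeginTxManager(TxManager delegate, Consumer<String> afterBegin) {
            this.delegate = delegate;
            this.afterBegin = afterBegin;
        }

        @Override
        public String begin() {
            String tx = delegate.begin();
            afterBegin.accept(tx); // e.g. insert the audit row here
            return tx;
        }

        @Override
        public void commit(String tx) {
            delegate.commit(tx);
        }
    }

    // Records the order of events for one transaction.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        TxManager plain = new TxManager() {
            @Override public String begin() { log.add("BEGIN"); return "tx-1"; }
            @Override public void commit(String tx) { log.add("COMMIT " + tx); }
        };
        TxManager manager = new AfterBeginTxManager(plain, tx -> log.add("AUDIT " + tx));

        String tx = manager.begin();
        log.add("WORK " + tx);
        manager.commit(tx);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[BEGIN, AUDIT tx-1, WORK tx-1, COMMIT tx-1]"
    }
}
```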

Hope this helps,
Lukas

Lukas Eder

Jun 2, 2016, 7:28:24 AM
to jOOQ User Group
Hi Samir,

I finally got around to following up on this interesting topic. There is in fact already a feature request for this in the Spring Framework:

I have cross-referenced this discussion and your use-case. Let's see what happens.

Cheers,
Lukas

Samir Faci

Jun 2, 2016, 9:37:09 PM
to jooq...@googlegroups.com
Thanks Lukas. I'll be sure to watch that. Hope that gets a better response than the Oracle ticket that was opened.

:-) 





--
Thank you
Samir Faci


Lukas Eder

Jun 3, 2016, 1:26:03 AM
to jooq...@googlegroups.com
It got an immediate response:

[Inline image 1]

Whatever that means :-)

Lukas Eder

Jul 1, 2016, 4:53:51 AM
to jOOQ User Group
So far, this was just a very high level sketch. Let's assume we'd be going this way. There would be two new alternative transaction APIs: A blocking one and a non-blocking one. The blocking one might look just like JDBC or JTA:

Transaction transaction = ctx.beginTransaction();
ctx.insert()...
ctx.update()...
Savepoint savepoint = transaction.savepoint();
ctx.delete()...
transaction.commit();

At the time we discussed this, I forgot to create an issue in the issue tracker. Here it is:

There has also been a related question on Stack Overflow:

We should definitely add this API as an alternative to the existing transaction API. 

oren...@gmail.com

Nov 23, 2016, 4:42:01 PM
to jOOQ User Group
Hi,

Looking for an async way to work with MariaDB, I came across jOOQ and this thread, but also this blog post: https://blog.jooq.org/2014/09/23/asynchronous-sql-execution-with-jooq-and-java-8s-completablefuture/, and I found them contradictory: the older blog post describes a working solution, while from this more recent thread it seems that the solution does not yet exist.
What am I missing?

Thanks,
Oren

Lukas Eder

Nov 23, 2016, 5:26:17 PM
to jooq...@googlegroups.com
Hello Oren,

Not sure what you're looking for in particular. Everything seems to be there:

"asynchronous" transactions:

"asynchronous" execution:

These are just some example APIs.
Are you looking for something particular?

Cheers
Lukas


oren...@gmail.com

Nov 24, 2016, 8:13:00 AM
to jOOQ User Group
Hi Lukas,

You are right, I may have overlooked the API reference.

Are you looking for something particular?

- nothing fancy, the snippet that you had shown (added below) is a good reference, but I got the impression that it is not ready yet, am I wrong?

ctx.beginTransactionAsync()
   .thenApply(transaction -> transaction.ctx().insert())
   .thenApply(transaction -> transaction.ctx().update())
   .thenApply(transaction -> transaction.savepoint())
   .thenApply(transaction -> transaction.ctx().delete())
   .thenApply(transaction -> commit());

Regards,
Oren

Lukas Eder

Nov 24, 2016, 9:46:30 AM
to jooq...@googlegroups.com
Oh, I see, thanks for the update. Yes that API is not implemented yet. The relevant feature request is here:

I'm not yet 100% convinced that we should really add it, so there isn't really a target release version for this.

Cheers,
Lukas


oren...@gmail.com

Nov 24, 2016, 12:20:55 PM
to jOOQ User Group
OK, I see. So to sum up, currently I can use the async API but without the transaction support. Right?

Regards,
Oren

Lukas Eder

Nov 29, 2016, 6:29:54 AM
to jooq...@googlegroups.com
Hi Oren,

There is support for "asynchronous" transactions in the jOOQ API:

But not in an "imperative" way as you asked for, i.e. you cannot explicitly set rollback points or commit points. Instead, the API is more functional, meaning the entire transaction is something that is sent somewhere for synchronous execution, but the thread that submitted the transactional code doesn't need to block.
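The "functional" shape described here can be modelled with the JDK alone. The sketch below is a hypothetical in-memory model, not jOOQ code: the class name and its transactionResultAsync() method only mimic the shape of the real API, the "database" is a list of strings, and "rollback" simply means the working copy is discarded when the body throws. The whole body runs synchronously on the executor thread, while the submitting thread receives a CompletionStage immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical in-memory model; it only mimics the shape of a functional transaction API.
class FunctionalTxSketch {

    private final List<String> committed = new ArrayList<>();

    private synchronized List<String> snapshot() {
        return new ArrayList<>(committed);
    }

    private synchronized void commit(List<String> working) {
        committed.clear();
        committed.addAll(working);
    }

    // Runs the whole body synchronously on the executor; the caller gets a stage back at once.
    <T> CompletionStage<T> transactionResultAsync(Function<List<String>, T> body, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            List<String> working = snapshot();
            T result = body.apply(working); // an exception here skips the commit ("rollback")
            commit(working);
            return result;
        }, executor);
    }

    static String demo() {
        // A single thread keeps the two transactions strictly ordered.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            FunctionalTxSketch db = new FunctionalTxSketch();

            CompletionStage<Integer> ok = db.transactionResultAsync(rows -> {
                rows.add("alice");
                return rows.size();
            }, executor);

            CompletionStage<Integer> failed = db.transactionResultAsync(rows -> {
                rows.add("bob");
                throw new IllegalStateException("boom");
            }, executor);

            int size = ok.toCompletableFuture().join();
            boolean rolledBack = failed.toCompletableFuture().handle((v, e) -> e != null).join();
            return size + ":" + rolledBack + ":" + db.snapshot();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "1:true:[alice]"
    }
}
```

Note that there is no point in the chain at which the caller could set a savepoint or commit early; the body is the unit of work, which is the limitation of the functional style compared with an imperative one.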

Hope this helps,
Lukas
