Usability doubts when executing multiple statements in a row

Lukas Eder

Apr 7, 2021, 12:14:08 PM
to r2dbc
Hi folks.

I have some usability doubts. I'd like to execute multiple statements in a row on an open connection. The spec website shows some examples:

But those Publishers are never executed. That helps illustrate the SPI's capabilities, but not how to actually use them.

How do I best wrap them in a Flux *and* pass along the Connection? I mean, I can use zip and wrap everything in cumbersome tuples, but that seems quite complicated, and with only one misstep, the compiler will send me on a cryptic journey of minutes to discover my mistake...

Maybe I'm overlooking something, but perhaps, all these Publishers should return some new type, let's call it X, with a method X.connection() and possibly other utility methods. Result would then extend X, because I might need to get a hold of a Connection also when fetching results...

Thoughts?
Lukas

Michael McMahon

Apr 7, 2021, 4:53:45 PM
to r2dbc
I'll often follow a pattern where the Connection is used in a flatMap operator. This creates a scope in which the Connection is accessible to downstream operators:
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

ConnectionFactory connectionFactory =
  ConnectionFactories.get("r2dbc:oracle://db.example.com:1521/db.service.name");
Mono.from(connectionFactory.create())
  .flatMapMany(connection -> // Scope with Connection begins here
    Flux.from(connection.createStatement(
        "SELECT 'Hello, Oracle' FROM sys.dual")
        .execute())
      .flatMap(result ->
        result.map((row, metadata) ->
          row.get(0, String.class)))
      .doOnNext(System.out::println)
      .thenMany(connection.close())) // Scope with Connection ends here
  .subscribe();

I think Reactor might offer other ways to solve this problem as well, but I'm not sure. Maybe someone who knows Reactor better than I do would like to suggest something?

Lukas Eder

Apr 8, 2021, 4:31:00 AM
to Michael McMahon, r2dbc
Hi Michael,

Thanks a lot for your example. I see that makes sense. Somehow it did not occur to me to create fluxes with contents that capture the surrounding scope - trying to keep all lambdas pure as they will compose better when the flux declaration becomes more complex.

But of course, this might work. One thing that I continue to find confusing is the idea that a Publisher is "lazy", but its construction is often eager. In your example, the connection.close() call is invoked before any of the statements are executed. But since it returns a Publisher, which is subscribed to only after statement execution, that's fine. I would have expected an overload Flux.thenMany(Supplier<? extends Publisher<T>>) to allow for passing connection::close, but that's not available. I guess it's fine?

But my original suggestion stands. I think it would be worth evaluating whether the underlying Connection could be made available from other R2DBC types, just like in JDBC, where it is usually possible to access it from e.g. Statement.getConnection(). It's low-hanging fruit for the SPI but could add quite some value to complex streams...

--
You received this message because you are subscribed to a topic in the Google Groups "r2dbc" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/r2dbc/nyIXQ0EQddQ/unsubscribe.
To unsubscribe from this group and all its topics, send an email to r2dbc+un...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/r2dbc/cb95cbac-ea25-405e-ae8a-335545b06441n%40googlegroups.com.

Mark Paluch

Apr 8, 2021, 5:28:26 AM
to r2dbc
Reactor provides a conveniently managed resource closure with usingWhen. I haven't found a counterpart yet in RxJava, happy to learn how to achieve the same with RxJava.

You'd go about it as follows:

Flux.usingWhen(factory.create(), connection -> {

    return Flux.usingWhen(
        Mono.from(connection.beginTransaction()).thenReturn(connection),
        txConnection -> doSomething(txConnection).then(doSomethingElse(txConnection)),
        Connection::commitTransaction, // on success
        (txConnection, throwable) -> txConnection.rollbackTransaction(), // on error
        Connection::rollbackTransaction // on cancel
    );

}, Connection::close);

Since Publishers are expected to materialize (do their work) upon subscription/request(n), it's generally safe to assume eager construction.

Regarding navigation convenience, across objects (Connection, Statement, and more) we've always seen the SPI as a touching point between drivers and integration libraries only, not as something application code would use directly. Integrations should be able to afford a certain degree of complexity because of their purpose.

Cheers,
Mark

Dávid Karnok

Apr 8, 2021, 6:09:04 AM
to Mark Paluch, r2dbc
> Flux.thenMany(Supplier<? extends Publisher<T>>) to allow for passing connection::close

Use defer(connection::close).
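
A minimal sketch of why defer helps, written against the JDK's java.util.concurrent.Flow only, so it runs without Reactor on the classpath. Everything named here (DeferSketch, close(), defer(), noop()) is made up for illustration: close() merely stands in for Connection.close(), and defer() imitates what Flux.defer does. In Reactor itself, the call discussed above would presumably read .thenMany(Flux.defer(connection::close)).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Supplier;

class DeferSketch {
    static final List<String> LOG = new ArrayList<>();

    // Hypothetical stand-in for Connection.close(): calling it builds the
    // publisher right away; the "work" only happens on subscribe.
    static Flow.Publisher<Void> close() {
        LOG.add("close() called");
        return subscriber -> {
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { }
                @Override public void cancel() { }
            });
            LOG.add("close executed");
            subscriber.onComplete();
        };
    }

    // Imitation of defer: the factory is not invoked until someone subscribes.
    static <T> Flow.Publisher<T> defer(Supplier<? extends Flow.Publisher<T>> factory) {
        return subscriber -> factory.get().subscribe(subscriber);
    }

    // Subscriber that requests everything and ignores all signals.
    static <T> Flow.Subscriber<T> noop() {
        return new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(T item) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        };
    }

    static List<String> run() {
        LOG.clear();
        Flow.Publisher<Void> eager = close();                  // close() runs during assembly
        LOG.add("eager assembled");
        Flow.Publisher<Void> lazy = defer(DeferSketch::close); // close() not called yet
        LOG.add("lazy assembled");
        eager.subscribe(DeferSketch.<Void>noop());
        lazy.subscribe(DeferSketch.<Void>noop());
        return List.copyOf(LOG);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In the eager variant "close() called" is logged before anything is subscribed; in the deferred variant it is logged only once the subscriber arrives, immediately before "close executed".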

> Connection could be made available from other R2DBC types

If capture is trouble for you and you plan to do a custom set of lambda-customizable operators, you can define them with BiConsumer or BiFunction, handing in the connection object. Alternatives could be using tuples or the context feature of Reactor.
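
To make the BiFunction idea concrete, here is a rough sketch. It is not an R2DBC or Reactor API: WithConnection, FakeConnection and their methods are invented for this example, and CompletableFuture stands in for Mono so that the snippet runs on the JDK alone. The point is only that each step gets the connection handed in as an argument, so the lambdas do not need to capture it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

// Hypothetical helper: carries a connection alongside an async value.
final class WithConnection<C, T> {
    private final C connection;
    private final CompletableFuture<T> value;

    private WithConnection(C connection, CompletableFuture<T> value) {
        this.connection = connection;
        this.value = value;
    }

    static <C> WithConnection<C, Void> start(C connection) {
        return new WithConnection<C, Void>(connection, CompletableFuture.<Void>completedFuture(null));
    }

    // Each step receives the connection and the previous result as arguments.
    <R> WithConnection<C, R> then(
            BiFunction<? super C, ? super T, ? extends CompletableFuture<R>> step) {
        return new WithConnection<C, R>(connection, value.thenCompose(v -> step.apply(connection, v)));
    }

    CompletableFuture<T> result() {
        return value;
    }
}

class WithConnectionDemo {
    // Made-up stand-in for a Connection; it only records the SQL it is given.
    static final class FakeConnection {
        final List<String> executed = new ArrayList<>();

        CompletableFuture<Integer> execute(String sql) {
            executed.add(sql);
            return CompletableFuture.completedFuture(1); // pretend one row was affected
        }
    }

    static List<String> run() {
        FakeConnection connection = new FakeConnection();
        WithConnection.start(connection)
            .then((c, nothing) -> c.execute("CREATE TABLE t (i INT)"))
            .then((c, rows) -> c.execute("INSERT INTO t VALUES (" + rows + ")"))
            .result()
            .join();
        return connection.executed;
    }
}
```

Neither lambda passed to then(...) refers to the surrounding connection variable; both use only their own parameters, which is the property asked for earlier in the thread.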

> usingWhen [...] I haven't found a counterpart yet in RxJava

You have to compose it from other (somewhat costly) operators at the moment:

static <T, D> Flowable<T> usingWhen(
        Publisher<? extends D> resource,
        Function<? super D, ? extends Publisher<? extends T>> use,
        Function<? super D, ? extends Publisher<?>> cleanup) {
    return
            Maybe.fromPublisher(resource)
            .flatMapPublisher(res ->
                Flowable.using(
                        () -> res,
                        use,
                        resc -> Flowable.fromPublisher(cleanup.apply(resc)).subscribe(),
                        false
                )
            );
}

static <T, D> Flowable<T> usingWhen(
        Publisher<? extends D> resource,
        Function<? super D, ? extends Publisher<? extends T>> use,
        Function<? super D, ? extends Publisher<?>> onComplete,
        Function<? super D, ? extends Publisher<?>> onError,
        Function<? super D, ? extends Publisher<?>> onCancel) {
    return
            Maybe.fromPublisher(resource)
            .flatMapPublisher(res ->
                Flowable.fromPublisher(use.apply(res))
                .flatMap(
                        v -> Flowable.just(v),
                        e -> Flowable.fromPublisher(onError.apply(res)).ignoreElements().toFlowable(),
                        () -> Flowable.fromPublisher(onComplete.apply(res)).ignoreElements().toFlowable()
                )
                .doOnCancel(() -> Flowable.fromPublisher(onCancel.apply(res)).subscribe())
            );
}


--
Best regards,
David Karnok

Lukas Eder

Apr 8, 2021, 7:46:10 AM
to Mark Paluch, r2dbc
On Thu, Apr 8, 2021 at 11:28 AM Mark Paluch <mpa...@paluch.biz> wrote:
> Reactor provides a conveniently managed resource closure with usingWhen.

That's good to know. Named after C#'s using statement, I suspect?
 
> You'd go about it as follows:
>
> Flux.usingWhen(factory.create(), connection -> {
>
>     return Flux.usingWhen(
>         Mono.from(connection.beginTransaction()).thenReturn(connection),
>         txConnection -> doSomething(txConnection).then(doSomethingElse(txConnection)),
>         Connection::commitTransaction, // on success
>         (txConnection, throwable) -> txConnection.rollbackTransaction(), // on error
>         Connection::rollbackTransaction // on cancel
>     );
>
> }, Connection::close);
>
> Since Publishers are expected to materialize (do their work) upon subscription/request(n), it's generally safe to assume eager construction.

Well, in my case (jOOQ), a Publisher is a dynamic SQL query that is now being constructed eagerly (the expression tree, not the SQL string). Yes, the execution, which is the biggest part of the work, is still lazy, but the construction has some cost. But that's a topic for reactor, not for R2DBC - and the defer() method suggested by Dávid will help work around this. As always with such rich APIs, it takes lots of practice to write idiomatic code... Clearly, I'm not there yet.

For example, I would have never thought of using Mono.from(trx).thenReturn(connection). Clever move :)
 
> Regarding navigation convenience, across objects (Connection, Statement, and more) we've always seen the SPI as a touching point between drivers and integration libraries only, not as something application code would use directly. Integrations should be able to afford a certain degree of complexity because of their purpose.

I get this point, but here's an alternative point of view:

Do you know the many hacks I had to implement because the JDBC spec folks overlooked adding a SQLInput.getConnection() or SQLOutput.getConnection() method? :)

It's hard to create new objects (Arrays, Blobs, Clobs, SQLXML, Structs, etc.) in JDBC without having a Connection reference. ThreadLocal to the rescue. In a JDBC context, the PreparedStatement creation can usually be safely expected to happen on the same thread as some later variable binding, so I can put my JDBC Connection in a ThreadLocal somewhere and access it from everywhere else. This isn't the case in R2DBC.
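
A small sketch of that ThreadLocal trick and of where it stops working, using only the JDK. The class and field names are invented, and a String stands in for a real java.sql.Connection; this is not how any particular library does it.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalConnectionSketch {
    // Hypothetical holder; a String stands in for a real java.sql.Connection.
    static final ThreadLocal<String> CURRENT_CONNECTION = new ThreadLocal<>();

    // Returns what is visible on the calling thread and on a different thread.
    static String[] run() {
        CURRENT_CONNECTION.set("connection-1");
        ExecutorService otherThread = Executors.newSingleThreadExecutor();
        try {
            // Same thread as the set(...) call: the value is found.
            String seenOnSameThread = CURRENT_CONNECTION.get();
            // Different thread, as can happen when reactive signals hop threads.
            Callable<String> lookup = CURRENT_CONNECTION::get;
            String seenOnOtherThread = otherThread.submit(lookup).get();
            return new String[] { seenOnSameThread, seenOnOtherThread };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            otherThread.shutdown();
            CURRENT_CONNECTION.remove();
        }
    }

    public static void main(String[] args) {
        String[] seen = run();
        System.out.println("same thread:  " + seen[0]);
        System.out.println("other thread: " + seen[1]);
    }
}
```

The lookup on the calling thread returns "connection-1", while the lookup on the executor thread returns null, which is why the approach cannot be relied on once work may continue on another thread.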

So far, I haven't run into many of these cases yet where I would have needed to get a hold of a Connection internally other than having to close it. But this might change in the future, if R2DBC ever decides to implement better resource management for very large objects, when String or ByteBuffer won't work. My prediction here is that if R2DBC is successful (which we all hope!), then there will be other use-cases of binding directly to the SPI, just as there were with JDBC. There will be more tooling building on it, more libraries proxying it, more people wanting to inject some custom behaviour on this layer (e.g. intercept SQL strings, etc.), so I think that the "ubiquitous Connection", which was a great feature in JDBC, could also be very useful to R2DBC in the future.

Anyway, this is something to keep in mind. I guess it can still be done later.

Cheers,
Lukas
 