Appropriate use of lazy fetching with fetchSize + fetchStream?


LMey

7 Mar 2018, 12:15:51
– jOOQ User Group
Hi,

I'm not sure I'm using lazy fetching with streams correctly. I'm using jOOQ 3.9.6 and PostgreSQL 9.5.

This is my (simplified) repository method:

public Stream<Example> findLazy() {
    return dslContext.select(...)
                     .fetchSize(100)
                     .fetchStream()
                     .map(ExampleMapper::apply);
}

And my service method (running in a transaction):

try (Stream<Example> stream = exampleRepository.findLazy()) {
stream.forEach(magic::doStuff);
}

Will this fetch in chunks as intended, or load everything into memory? How does this work exactly? Unfortunately, with PostgreSQL I haven't found any way to check whether it works.

Thanks

Lukas Eder

8 Mar 2018, 03:39:14
– jooq...@googlegroups.com
Hello,

The API usage is correct:
  • fetchSize() overrides the JDBC driver's default, which in the case of PostgreSQL is 0 (per the driver's source code), meaning that by default all rows are fetched in one go.
  • You're using jOOQ's fetchStream(), which keeps an open JDBC ResultSet internally, so jOOQ doesn't fetch everything into memory either.
  • You're correctly wrapping the Stream in a try-with-resources statement, making sure there are no resource leaks.
If you want to validate the behaviour, set some breakpoints in the source code of the PostgreSQL JDBC driver, or turn on the driver's debug logging. It should emit some debug output, e.g. in org.postgresql.core.v3.QueryExecutorImpl.sendExecute()
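For instance, assuming a Spring Boot application (an assumption based on the log output shown later in this thread), TRACE logging for the driver's protocol classes could be switched on with an application.properties entry like the following; the exact mechanism depends on your logging setup:

```properties
# Hypothetical Spring Boot setting; use your logging framework's
# equivalent (e.g. a logback logger) if you don't use Spring Boot.
logging.level.org.postgresql=TRACE
```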

I hope this helps,
Lukas

--
You received this message because you are subscribed to the Google Groups "jOOQ User Group" group.
To unsubscribe from this group and stop receiving emails from it, send an email to jooq-user+unsubscribe@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

LMey

8 Mar 2018, 04:03:24
– jOOQ User Group
Alright, thanks. This is what I'm seeing when fetching in chunks of 1:

2018-03-08 09:52:59.811 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :  FE=> Parse(stmt=null,query="REDACTED",oids={1043,0,0})
2018-03-08 09:52:59.811 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :  FE=> Bind(stmt=REDACTED>)
2018-03-08 09:52:59.811 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :  FE=> Describe(portal=C_5)
2018-03-08 09:52:59.811 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :  FE=> Execute(portal=C_5,limit=1)
2018-03-08 09:52:59.811 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :  FE=> Sync
2018-03-08 09:52:59.813 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :  <=BE ParseComplete [null]
2018-03-08 09:52:59.813 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :  <=BE BindComplete [C_5]
2018-03-08 09:52:59.813 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :  <=BE RowDescription(7)
2018-03-08 09:52:59.813 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :         Field(fieldNameRedacted,TEXT,65535,T)
2018-03-08 09:52:59.813 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :         Field(fieldNameRedacted,TIMESTAMPTZ,8,T)
2018-03-08 09:52:59.813 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :         Field(fieldNameRedacted,INT8,8,T)
2018-03-08 09:52:59.813 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :         Field(fieldNameRedacted,UUID,16,T)
2018-03-08 09:52:59.813 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :         Field(fieldNameRedacted,BPCHAR,65535,T)
2018-03-08 09:52:59.813 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :         Field(fieldNameRedacted,NUMERIC,65535,T)
2018-03-08 09:52:59.813 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :         Field(fieldNameRedacted,NUMERIC,65535,T)
2018-03-08 09:52:59.813 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :  <=BE DataRow(len=79)
2018-03-08 09:52:59.813 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :  <=BE PortalSuspended
2018-03-08 09:52:59.813 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :  <=BE ReadyForQuery(T)
2018-03-08 09:52:59.843 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :  FE=> Execute(portal=C_5,limit=1)
2018-03-08 09:52:59.843 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :  FE=> Sync
2018-03-08 09:52:59.843 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :  <=BE DataRow(len=80)
2018-03-08 09:52:59.843 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :  <=BE PortalSuspended
2018-03-08 09:52:59.843 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :  <=BE ReadyForQuery(T)
2018-03-08 09:52:59.844 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :  FE=> Execute(portal=C_5,limit=1)
2018-03-08 09:52:59.844 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :  FE=> Sync
2018-03-08 09:52:59.844 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :  <=BE CommandStatus(SELECT 0)
2018-03-08 09:52:59.844 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :  <=BE ReadyForQuery(T)
2018-03-08 09:52:59.987 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :   simple execute, handler=org.postgresql.jdbc.PgConnection$TransactionCommandHandler@6658f08a, maxRows=0, fetchSize=0, flags=22
2018-03-08 09:52:59.988 TRACE 28417 --- [           main] o.postgresql.core.v3.QueryExecutorImpl   :  FE=> ClosePortal(C_5)

Out of curiosity: is it the PortalSuspended messages and the repeated Execute(portal=C_5,limit=1) calls that indicate it's working?

Lukas Eder

8 Mar 2018, 04:15:29
– jooq...@googlegroups.com
> Out of curiosity: is it the PortalSuspended messages and the repeated Execute(portal=C_5,limit=1) calls that indicate it's working?

Probably :)
If you want to be sure, I think the PostgreSQL mailing lists or Stack Overflow are more appropriate channels...

LMey

13 Mar 2018, 11:24:32
– jOOQ User Group
Of course. Just one last question regarding jOOQ, if you don't mind.

With a Cursor I can understand how it works: it maintains an open ResultSet, and you can fetch X records in a loop, do stuff with them, and repeat. But how does jOOQ handle that with a Stream? Is it abstracted away? You said it won't load everything into memory in my forEach example, but is that the case for all terminal stream operations? Surely if I collect() or sorted(), it will load everything, right?

Lukas Eder

13 Mar 2018, 12:11:55
– jooq...@googlegroups.com
It's easy to explain. 

A "jOOQ Stream" is really just a wrapper around a "jOOQ Cursor" with some convenient API. 
A "jOOQ Cursor" is really just an Iterator wrapper around a JDBC ResultSet with some convenient API.

Now, every time a "jOOQ Stream" or a "jOOQ Cursor" pulls another value from the underlying JDBC ResultSet, some JDBC work happens. This carries over directly to all terminal stream operations in the natural way.

To answer your question: like any call to collect(), a call to collect() on a "jOOQ Stream" will iterate the entire ResultSet and collect the results, just like a call to forEach() will iterate the entire ResultSet and do stuff with each row. (And a stateful operation like sorted() likewise has to consume all rows before it can emit the first one.)
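The pull-per-row behaviour can be sketched without a database. In the following standalone demo (all names are illustrative, none of this is jOOQ API), a plain Iterator stands in for a jOOQ Cursor over an open ResultSet, and a counter shows that terminal operations pull rows one at a time, with short-circuiting pipelines pulling only what they need:

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class LazyPullDemo {

    // Counts how often a "row" is pulled from the fake cursor.
    static final AtomicInteger PULLS = new AtomicInteger();

    // An Iterator standing in for a jOOQ Cursor over an open JDBC
    // ResultSet; each next() represents one row fetched on demand.
    static Stream<Integer> rowStream(int rows) {
        PULLS.set(0);
        Iterator<Integer> cursor = new Iterator<Integer>() {
            int i = 0;
            public boolean hasNext() { return i < rows; }
            public Integer next() { PULLS.incrementAndGet(); return i++; }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(cursor, Spliterator.ORDERED), false);
    }

    public static void main(String[] args) {
        // A short-circuiting pipeline pulls only the rows it needs:
        List<Integer> firstTwo = rowStream(100).limit(2).collect(Collectors.toList());
        System.out.println(firstTwo + " fetched with " + PULLS.get() + " pulls");

        // A full terminal operation pulls every row, but still one at a time:
        rowStream(100).forEach(i -> { /* doStuff */ });
        System.out.println("forEach pulled " + PULLS.get() + " rows");
    }
}
```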

Hope this helps,
Lukas

LMey

13 Mar 2018, 12:55:07
– jOOQ User Group
Thank you. Alright, so AFAIK, short of fiddling with the spliterator, there's no straightforward way of processing chunks from a Stream. I guess I'd be better off working with a Cursor directly. I've seen that fetchLazy() doesn't accept a RecordMapper callback. Is there any way of using one when fetching lazily? I haven't seen anything in the docs, and I'd like to avoid a RecordN leaking out of my repository.

Lukas Eder

13 Mar 2018, 14:27:57
– jooq...@googlegroups.com
Indeed, the JDK's Stream API doesn't offer any way to create chunks / substreams based on chunk sizes. Perhaps you might find an appropriate abstraction in jOOλ.

But really, an imperative approach based on the org.jooq.Cursor type might be the easiest way forward. fetchLazy() produces a Cursor but doesn't actually fetch anything yet. You can then call appropriate methods on the Cursor, including Cursor.fetchNext(int).
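A sketch of that imperative approach, reusing the placeholder names from earlier in the thread (Example, ExampleMapper, and magic::doStuff are the original poster's; Cursor.hasNext(), Cursor.fetchNext(int), and Result.map(RecordMapper) are the jOOQ methods assumed here — this isn't runnable as-is without the surrounding application):

```java
// Chunked, lazily fetched processing via an explicit jOOQ Cursor.
try (Cursor<Record> cursor = dslContext.select(...)
                                       .fetchSize(100)
                                       .fetchLazy()) {
    while (cursor.hasNext()) {
        // Fetch the next chunk of at most 100 records into memory...
        Result<Record> chunk = cursor.fetchNext(100);

        // ...and map it right away, so no RecordN type leaks out:
        List<Example> examples = chunk.map(ExampleMapper::apply);
        examples.forEach(magic::doStuff);
    }
}
```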

I hope this helps,
Lukas