with-open pattern


Brian Craft

May 4, 2017, 1:35:48 PM
to Clojure
The with-open style is used a lot in the jdbc lib, and elsewhere. It's pretty simple when data is very small, as you can just evaluate the entire result with doall, etc.

How do you deal with larger data, where you need to evaluate iteratively? If there's only one with-open it can be reasonably simple to pass the consumer into that context (though from ring it's pretty convoluted, due to needing to pass control to ring first). But if there are multiple with-open you have to nest them, with a continuation passing style, or middleware pattern, or something, which quickly becomes onerous as it affects all the code surrounding the with-open.

Is there some simpler pattern?
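To make the nesting problem concrete, here is a minimal runnable sketch (the names are hypothetical, and StringReaders stand in for real resources like connections): every resource must stay open while the consumer runs, so the consumer gets passed inward, continuation-style.

```clojure
(require '[clojure.java.io :as io])

;; Hypothetical sketch: both resources must remain open while the
;; consumer runs, so the consumer `f` is threaded into the with-open
;; scope rather than receiving a value back out of it.
(defn with-two-resources [f]
  (with-open [a (io/reader (java.io.StringReader. "first"))
              b (io/reader (java.io.StringReader. "second"))]
    (f a b)))

(with-two-resources
  (fn [a b] (str (slurp a) "/" (slurp b))))
;; => "first/second"
```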

Josh Tilles

May 4, 2017, 2:22:36 PM
to Clojure
I think the “reducible streams” approach described by Paul Stadig here has potential. It might not cover all of the scenarios you’re thinking of, though.

Brian Craft

May 4, 2017, 3:04:40 PM
to Clojure
It's definitely the same problem, but I don't think it helps me. This part, in particular:

"If you treat this object like a sequence, it will fully consume the input stream and fully realize the decoded data in memory."

I'm specifically trying to avoid realizing the full collection in memory, because it won't fit.

Ghadi Shayban

May 4, 2017, 10:58:56 PM
to Clojure
Hi Brian,
Both plain seqs and reducibles can be consumed without realizing the whole thing in memory, which is especially important (as you note) when the whole thing is humongous and dwarfs available memory.  To do it with a lazy seq, as in clojure.java.jdbc, you have to take care of a few things. You must override :result-set-fn in clojure.java.jdbc/query [1] so that you can control realization through doseq or reduce.
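A sketch of that override, assuming clojure.java.jdbc 0.6+ and a `db-spec` placeholder (not runnable as-is):

```clojure
(require '[clojure.java.jdbc :as jdbc])

;; :result-set-fn runs while the connection is still open, so the lazy
;; rows can be consumed incrementally here without ever being fully
;; realized in memory.
(jdbc/query db-spec
            ["SELECT * FROM huge_table"]
            {:result-set-fn (fn [rows]
                              (reduce (fn [n _] (inc n)) 0 rows))})
```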

Forbidden things: calling `doall` on the seq, pouring it into a vector, realizing the whole seq while still hanging on to its head, binding the resultset in a let and using it more than once, or leaking it carelessly to some helper.
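For contrast, a tiny runnable example (a plain range stands in for the lazy resultset): reduce consumes item by item without retaining the head, while `doall` would realize everything at once.

```clojure
;; Safe: consumes one element at a time, never holding the head,
;; so already-visited elements can be garbage collected.
(defn row-count [xs]
  (reduce (fn [n _] (inc n)) 0 xs))

(row-count (range 1000000)) ;; => 1000000, in constant memory
;; Forbidden with a huge resultset: (doall xs) realizes the lot.
```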

If you have multiple bindings, my guess is that there is some unit of work for every item of a seq realized from a large 'main' query.  The same advice applies there too; you just have to take the same care across many more queries so that the garbage collector can do its job.  If you have a lot of "layers", perhaps you could decompose them into simpler computations, or use a work-queue mechanism or a core.async channel.  Do you have a concrete example of the multiple-bindings scenario?
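One way to sketch the work-queue idea using only java.util.concurrent (the names and sizes here are arbitrary): a producer walks the 'main' seq while a consumer processes rows off a bounded queue, so neither side holds the whole collection.

```clojure
(import '[java.util.concurrent LinkedBlockingQueue])

(defn queue-demo [rows]
  (let [q (LinkedBlockingQueue. 16)  ; bounded: producer blocks when full
        producer (future
                   (doseq [row rows] (.put q row))
                   (.put q ::done))  ; sentinel marks end of input
        consumer (future
                   (loop [n 0]
                     (let [row (.take q)]
                       (if (= row ::done) n (recur (inc n))))))]
    @consumer))

(queue-demo (range 100)) ;; => 100
```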

Make sure that the underlying JDBC driver is not working against you by buffering the whole thing!  MySQL's JDBC driver, for example, requires a few magical settings [2] to prevent it from buffering internally:
[1] http://clojure.github.io/java.jdbc/#clojure.java.jdbc/query

[2] From https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html:
stmt = conn.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY,
                            java.sql.ResultSet.CONCUR_READ_ONLY);
stmt.setFetchSize(Integer.MIN_VALUE);

The combination of a forward-only, read-only result set, with a fetch size of Integer.MIN_VALUE serves as a signal to the driver to stream result sets row-by-row. After this, any result sets created with the statement will be retrieved row-by-row.
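From Clojure, the same settings might look like this (sketch only; `conn` is a placeholder for a raw java.sql.Connection):

```clojure
;; Forward-only, read-only, fetch size Integer/MIN_VALUE: the signal
;; for MySQL's driver to stream results row-by-row.
(doto (.createStatement conn
                        java.sql.ResultSet/TYPE_FORWARD_ONLY
                        java.sql.ResultSet/CONCUR_READ_ONLY)
  (.setFetchSize Integer/MIN_VALUE))
```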




Herwig Hochleitner

May 5, 2017, 8:10:27 AM
to clo...@googlegroups.com
2017-05-04 19:35 GMT+02:00 Brian Craft <craft...@gmail.com>:
If there's only one with-open it can be reasonably simple to pass the consumer into that context (though from ring it's pretty convoluted, due to needing to pass control to ring first).

Creating streamed ring responses can be kind of tricky. In my experience, there are two patterns that work well:

1) Feeding a PipedInputStream from a new thread:

(let [os (PipedOutputStream.)
      is (PipedInputStream. os)]
  (future (with-open [rs ...]
            (write-to! os rs)))
  {:body is})
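A runnable version of that pattern, with plain strings in place of a resultset (`write-to!` and `rs` above are placeholders; `pipe-demo` here is hypothetical):

```clojure
(import '[java.io PipedInputStream PipedOutputStream])

(defn pipe-demo [rows]
  (let [os (PipedOutputStream.)
        is (PipedInputStream. os)]
    ;; The writer runs on another thread; closing the output side when
    ;; the with-open exits lets the reader's slurp terminate.
    (future
      (with-open [out os]
        (doseq [row rows]
          (.write out (.getBytes (str row "\n"))))))
    (slurp is)))

(pipe-demo ["a" "b" "c"]) ;; => "a\nb\nc\n"
```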

2) Use a ring adapter that allows some form of asynchronous response, e.g. with the new ring 1.6 model:

{:body (reify ring.core.protocols/StreamableResponseBody
         (write-body-to-stream [_ _ output-stream]
           (with-open [rs ...
                       os output-stream]
             (write-to! os rs))))}

Brian Craft

May 5, 2017, 2:18:17 PM
to Clojure
This looks like a partial solution:


perhaps inspired by this discussion:

Ghadi Shayban

May 5, 2017, 6:50:24 PM
to Clojure
Maybe I don't understand the problem at hand yet -- you can almost certainly do this without language help. Can you help out with a concrete example or some problematic code?

Paul Stadig

May 10, 2017, 8:39:19 AM
to Clojure
Hey Brian,
The fact that a reducible (as I have implemented it in the reducible-stream library) will fully realize itself if it is used as a sequence is a drawback that I'm not happy with. In the back of my mind I believe there may be a way around it, but I'm not sure, and it is still a topic of further thought/research for me. However, I think it can easily be avoided.

The point of using a reducible instead of a sequence is to invert the control so that the collection is in charge of processing itself. When you use a reducible as a sequence you are processing the collection externally, making the collection responsible only for doling itself out one item at a time, which it cannot do if it must also manage the scope for some resource (like a connection) that must be closed.
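A minimal example of that inversion of control, with a StringReader in place of a connection-backed resultset (`lines-reducible` is a name I'm inventing for the sketch): the collection opens, reduces over, and closes its own resource, so no scope management leaks out to the caller.

```clojure
(require '[clojure.java.io :as io])

(defn lines-reducible
  "Reducible over the lines of (open-fn); the resource is opened and
  closed inside the reduction itself, never handed to the caller."
  [open-fn]
  (reify clojure.lang.IReduceInit
    (reduce [_ f init]
      (with-open [rdr (io/reader (open-fn))]
        (reduce f init (line-seq rdr))))))

(reduce conj [] (lines-reducible #(java.io.StringReader. "a\nb\nc")))
;; => ["a" "b" "c"]
```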

I would suggest that you have more processing that needs to be pushed into the reduction process. Instead of getting a database result set, processing it in some way, and then trying to pass it as a sequence into Ring (I'm not sure if this is exactly your problem, but I think it is representative?), where Ring will want to consume that sequence and thereby trigger it to fully realize in memory, push the work that Ring would be doing into the reduction itself. That is, reduce over the result set with an output stream (or one end of a piped output stream), and write to the stream inside the reduction. Ring takes the other end of the pipe and consumes the stream to produce its response.
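As a sketch of that shape (`reducible-rows` and `out` are placeholders, not real vars):

```clojure
;; The serialization work lives inside the reduction; each row is
;; written to the stream as it is produced, never accumulated.
(reduce (fn [^java.io.OutputStream out row]
          (.write out (.getBytes (str row "\n")))
          out)
        out
        reducible-rows)
```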

A reducible is a delayed computation, and with transducers or by other means you can layer more delayed computation onto the reducible. As soon as you fire the reducible it will produce its entire result, whether the firing is because of a call to `reduce` or `seq`. A reducible is like a spring with potential energy built in, and when it is sprung it springs. A lazy sequence is like a wheel, if you apply external force to it, it will turn, but otherwise it is inert. Probably a terrible analogy, but that's the best I can come up with. :)

I hope that's helpful.


Paul

http://www.realworldclojure.com/
http://paul.stadig.name/
@pjstadig