Hello.
This is the current version of our windowed query:
from sqlalchemy.sql.expression import distinct
def windowed_query(query, column, options_or_callback, window_size=100):
    """Perform (a correct) yield_per() operation.

    Convenience wrapper around
    ``WindowedQuery(query, column, options_or_callback).yield_per(window_size)``.
    See WindowedQuery.yield_per() for more.

    :param query: base ORM query; it must have NO options(...) and NO
        order_by(...), but must contain all join()/filter() calls needed to
        limit the result set.
    :param column: the id column of the main result ORM class,
        e.g. ``Foo.id``.
    :param options_or_callback: either a list of Query.options(...) such as
        subqueryload(), or a one-argument callable that receives the list of
        ids of one window and returns an iterable over that window's results.
    :param window_size: maximum number of ids per window (default 100).
    :returns: an iterator over the results of all windows.

    EXAMPLE::

        q = session.query(Foo).filter(Foo.name.like(u'%foo%'))
        wq = windowed_query(q, Foo.id, [subqueryload(Foo.bars)])
        for each_foo in wq:
            print(each_foo.name)
            for each_bar in each_foo.bars:
                print(each_bar)
    """
    return WindowedQuery(
        query,
        column,
        options_or_callback,
    ).yield_per(window_size)
class WindowedQuery(object):
    """Perform (a correct) yield_per() operation.

    First the distinct ids of all rows matched by the base query are
    fetched; then the actual results are loaded window by window, with ONE
    query per window of ids (``column.in_(window)``).
    """

    def __init__(self, query, column, options_or_callback):
        """
        :param query: the base ORM query. It MUST have NO options(...) and
            NO order_by(...). It MUST contain all necessary join() and
            filter() calls to limit the result set as desired.
        :param column: the id column of the main result ORM class
            (e.g. ``Foo.id``). It is used to divide the results into windows
            of equal size.
        :param options_or_callback: one of

            * a list (or tuple) of Query.options(...) such as subqueryload().
              The data of each window is then fetched with::

                  q = session.query(column.class_).options(*options)
                  q = q.filter(column.in_(each_window))

            * a one-argument callable responsible for complete processing of
              one window. Its only argument is the list of ids of the window.
              It MUST return an iterable over results. It is called once for
              each window.

        :raises TypeError: if options_or_callback is neither a list/tuple
            nor callable.
        """
        self._query = query
        self._column = column
        self._session = query.session
        if isinstance(options_or_callback, (list, tuple)):
            self._options = options_or_callback
            self._callback = None
        elif callable(options_or_callback):
            self._options = None
            self._callback = options_or_callback
        else:
            # Fail early: previously such a value made yield_per() return
            # None (or fail only once iteration started).
            raise TypeError(
                "options_or_callback must be a list of query options or a "
                "callable, got %r" % (options_or_callback,)
            )

    def yield_per(self, window_size):
        """Process results in chunks (windows).

        Steps:

        * Obtain the ids of ALL result rows via a slightly modified
          self._query (see _q_column()).
        * Divide the ids into chunks of window_size ids (only the last one
          may be smaller) and perform ONE query for EACH chunk to fetch the
          data.

        A chunk is determined by the test q.filter(self._column.in_(chunk)).
        This is the only way that works in the presence of the
        read-committed isolation level.

        No ORDER BY is applied to the id query, so the order of windows
        (and thus of results) is presumably whatever the database returns.

        :param window_size: maximum number of ids per window; must be >= 1.
        :returns: a generator over the results of all windows.
        :raises ValueError: if window_size < 1.
        """
        if window_size < 1:
            # A size of 0 used to yield one empty window per id; a negative
            # size silently yielded nothing.
            raise ValueError(
                "window_size must be >= 1, got %r" % (window_size,)
            )
        if self._options is not None:
            return self._yield_per_options(window_size)
        return self._yield_per_callback(window_size)

    def _yield_per_options(self, window_size):
        """Deduce the data query from self._column and self._options."""
        q = self._session.query(self._column.class_).options(*self._options)
        for each_window in self._windows(window_size):
            for each_result in q.filter(self._column.in_(each_window)):
                yield each_result

    def _yield_per_callback(self, window_size):
        """Use a callback function responsible for obtaining the results.

        Example callback (equivalent to the use of self._options)::

            def callback(win):
                q = session.query(Foo).options(...)
                q = q.filter(Foo.id.in_(win))
                return q
        """
        for each_window in self._windows(window_size):
            for each_result in self._callback(each_window):
                yield each_result

    def _windows(self, window_size):
        """Yield lists of at most window_size plain ids.

        Every list except possibly the last one has exactly window_size
        elements; no empty list is ever yielded.
        """
        win = []
        for row in self._q_column():
            # The id query selects a single column, so every row is a
            # one-element tuple. Unpack it so that windows contain plain ids,
            # as documented for the callback and as column.in_() expects.
            win.append(row[0])
            if len(win) == window_size:
                yield win
                win = []
        if win:
            yield win

    def _q_column(self):
        """Return a query selecting the distinct ids matched by self._query.

        distinct() ensures that each id is returned at most once despite
        a possible multiplying effect of a join(). Each row of the returned
        query is a one-element tuple holding the id.
        """
        return self._query.with_entities(distinct(self._column))
We have been using it in production for a few days now. Perhaps someone else
will find it useful too.
Ladislav Lenart