[ANN] Modified WindowedRangeQuery recipe


Ladislav Lenart

Jun 7, 2013, 6:56:10 AM
to sqlal...@googlegroups.com
Hello.

I modified the recipe at

http://www.sqlalchemy.org/trac/wiki/UsageRecipes/WindowedRangeQuery

to better suit my needs. Perhaps someone else will find this useful:


####################
#### CODE BEGIN ####
####################

from sqlalchemy.orm import subqueryload
from sqlalchemy.sql.expression import distinct

def windowed_query(filter_query, data_query, column, window_size):
    """Perform (a correct) yield_per() operation. See WindowedQuery.yield_per()
    for more.

    EXAMPLE:
        gen = windowed_query(
            filter_query=session.query(Foo).filter(Foo.name.like(u'%foo%')),
            data_query=session.query(Foo).options(subqueryload(Foo.bars)),
            column=Foo.id,
            window_size=50,
        )
        for each_foo in gen:
            print(each_foo.name)
            for each_bar in each_foo.bars:
                print(each_bar)
    """
    return WindowedQuery(filter_query, data_query, column).yield_per(window_size)

class WindowedQuery(object):
    """Perform (a correct) yield_per() operation."""
    def __init__(self, filter_query, data_query, column):
        # A query with NO options(...) and NO order_by(...). MUST contain all
        # necessary join() and filter() to limit the result set as desired.
        self._filter_query = filter_query
        # A simple query with options() to fetch the actual data.
        self._data_query = data_query
        # id column of the (main) result ORM class.
        self._column = column

    def yield_per(self, window_size):
        """Process results in chunks.
        Steps:
        * Obtain ids of ALL result rows via self._filter_query.
        * Divide the ids into chunks of equal size and perform ONE query for
          EACH chunk to fetch the data via self._data_query.

        A chunk is determined by the test q.filter(self._column.in_(chunk)).
        This is the only way that works in presence of the read-committed
        isolation level.
        """
        q = self._data_query
        for each_window in self._windows(window_size):
            for each_result in q.filter(each_window):
                yield each_result

    def _windows(self, window_size):
        chunk = []
        # Each row of the id query is a one-element tuple (id,); unpack it
        # so that in_() receives plain id values.
        for (each_id,) in self._q_column():
            chunk.append(each_id)
            if len(chunk) == window_size:
                yield self._window_for_chunk(chunk)
                chunk = []
        # Emit the last, possibly smaller, chunk.
        if chunk:
            yield self._window_for_chunk(chunk)

    def _q_column(self):
        # distinct() ensures that each id is returned at most once despite
        # a possible multiplying effect of a join().
        return self._filter_query.with_entities(distinct(self._column))

    def _window_for_chunk(self, chunk):
        return self._column.in_(chunk)
##################
#### CODE END ####
##################
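The `_windows()` step above only groups a stream of ids into lists of at most `window_size` items; each list then becomes one `in_()` filter. As a sketch of that step in isolation (plain Python, no SQLAlchemy), assuming a hypothetical helper named `chunked` that uses `itertools.islice` instead of the recipe's manual counter:

```python
from itertools import islice


def chunked(ids, window_size):
    """Yield lists of at most window_size ids, preserving input order.

    Hypothetical helper mirroring what _windows() does with the rows of
    the id query; it is not part of SQLAlchemy.
    """
    if window_size < 1:
        raise ValueError("window_size must be positive")
    it = iter(ids)
    while True:
        # Take the next window_size ids; an empty list means we are done.
        window = list(islice(it, window_size))
        if not window:
            return
        yield window


# Seven ids with window_size=3: two full windows and one partial window.
print(list(chunked(range(1, 8), 3)))  # [[1, 2, 3], [4, 5, 6], [7]]
```

Either form works; `islice` just avoids tracking the chunk size by hand.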


MOTIVATION: I recently learned that Query.yield_per() does not work well in
combination with subqueryload(). The recipe above fixes that. Unfortunately,
its usage is not as simple and elegant as q.yield_per(...).

If you have any idea how to accomplish the same starting from only ONE query
(in SA 0.7.9), i.e. something like:

def windowed_query(query, column, window_size):
    query --magic-> filter_query
    query --magic-> data_query
    ...

I would very much like to hear about it.


PERFORMANCE: My first tests suggest that it might be an order of magnitude
faster than the Query.yield_per() approach we use now.

Note also that yield_per() with subqueryload() was still about twice as fast as
the same query without yield_per(). However, I guess this depends heavily on
the query.


WARNING: We do not use this in production yet. Use at your own risk.


Happy SA hacking,

Ladislav Lenart

Ladislav Lenart

Jun 7, 2013, 2:03:49 PM
to sqlal...@googlegroups.com
Hello.

Resending because the original e-mail does not seem to have made it to the
mailing list. Apologies for any duplicates. Here we go...

Ladislav Lenart

Jun 17, 2013, 5:49:48 AM
to sqlal...@googlegroups.com
Hello.

This is the current version of our windowed query:


from sqlalchemy.sql.expression import distinct


def windowed_query(query, column, options_or_callback, window_size=100):
    """Perform (a correct) yield_per() operation. See WindowedQuery.yield_per()
    for more.

    EXAMPLE:
        from sqlalchemy.orm import subqueryload

        q = session.query(Foo).filter(Foo.name.like(u'%foo%'))
        wq = windowed_query(q, Foo.id, [subqueryload(Foo.bars)])
        for each_foo in wq:
            print(each_foo.name)
            for each_bar in each_foo.bars:
                print(each_bar)
    """
    return WindowedQuery(
        query,
        column,
        options_or_callback
    ).yield_per(window_size)


class WindowedQuery(object):
    """Perform (a correct) yield_per() operation."""
    def __init__(self, query, column, options_or_callback):
        """
        The query MUST have NO options(...) and NO order_by(...). It MUST
        contain all necessary join() and filter() to limit the result set as
        desired.

        The column is the id column of the main result ORM class. It is used to
        divide the results into windows of equal size.

        The options_or_callback can be a list of Query.options(...) such as
        subqueryload(). If so, the following query is created to fetch data of
        each window:

            q = session.query(self._column.class_).options(*self._options)
            q = q.filter(self._column.in_(each_window))

        The options_or_callback can be a one-argument function responsible for
        complete processing of one window. Its only argument is the list of ids
        of the window. It MUST return an iterable over results. It is called
        once for each window.
        """
        self._query = query
        self._column = column
        self._session = query.session
        if isinstance(options_or_callback, list):
            self._options = options_or_callback
            self._callback = None
        else:
            self._options = None
            self._callback = options_or_callback

    def yield_per(self, window_size):
        """Process results in chunks (windows).
        Steps:
        * Obtain ids of ALL result rows via slightly modified self._query.
        * Divide the ids into chunks of equal size and perform ONE query for
          EACH chunk to fetch the data.

        A chunk is determined by the test q.filter(self._column.in_(chunk)).
        This is the only way that works in presence of the read-committed
        isolation level.
        """
        if self._options is not None:
            return self._yield_per_options(window_size)
        if self._callback is not None:
            return self._yield_per_callback(window_size)

    def _yield_per_options(self, window_size):
        """Deduce data query from self._column and self._options."""
        q = self._session.query(self._column.class_).options(*self._options)
        for each_window in self._windows(window_size):
            for each_result in q.filter(self._column.in_(each_window)):
                yield each_result

    def _yield_per_callback(self, window_size):
        """Use a callback function responsible for obtaining the results:

            def callback(win):
                # Equivalent to the use of self._options.
                q = session.query(Foo).options(...)
                q = q.filter(Foo.id.in_(win))
                return q
        """
        for each_window in self._windows(window_size):
            for each_result in self._callback(each_window):
                yield each_result

    def _windows(self, window_size):
        win = []
        # Each row of the id query is a one-element tuple (id,); unpack it
        # so that every window is a plain list of ids, as documented above.
        for (each_id,) in self._q_column():
            win.append(each_id)
            if len(win) == window_size:
                yield win
                win = []
        # Emit the last, possibly smaller, window.
        if win:
            yield win

    def _q_column(self):
        """distinct() ensures that each id is returned at most once despite
        a possible multiplying effect of a join().
        """
        return self._query.with_entities(distinct(self._column))
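The two phases of WindowedQuery (one DISTINCT id query, then one IN query per window) can be tried without SQLAlchemy. What follows is only a minimal sketch under stated assumptions: it uses the standard-library `sqlite3` module with an in-memory database, and the `foo`/`bar` tables, their columns, and the helper names `windowed_ids`, `fetch_foos` and `run_windowed` are all hypothetical. The one-argument lambda passed to `run_windowed` plays the role of the callback described in the docstring above.

```python
import sqlite3


def windowed_ids(conn, window_size):
    """Phase 1: fetch each matching id once, then split the ids into windows.

    The join to bar repeats a foo id once per matching bar row; DISTINCT
    collapses those duplicates, like distinct(self._column) in the recipe.
    """
    rows = conn.execute(
        "SELECT DISTINCT foo.id FROM foo JOIN bar ON bar.foo_id = foo.id "
        "WHERE foo.name LIKE ?",
        ("%foo%",),
    ).fetchall()
    # Sorted in Python only to make the example deterministic; the recipe
    # itself requires the id query to have no order_by(...).
    ids = sorted(foo_id for (foo_id,) in rows)
    for start in range(0, len(ids), window_size):
        yield ids[start:start + window_size]


def fetch_foos(conn, ids):
    """Phase 2: load the data rows for one window with a single IN query."""
    placeholders = ", ".join("?" for _ in ids)
    sql = "SELECT id, name FROM foo WHERE id IN (%s)" % placeholders
    return conn.execute(sql, ids).fetchall()


def run_windowed(conn, window_size, callback):
    """Call the one-argument callback once per window and stream its rows."""
    for window in windowed_ids(conn, window_size):
        for row in callback(window):
            yield row


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE foo (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE bar (id INTEGER PRIMARY KEY, foo_id INTEGER REFERENCES foo(id));
    """
)
conn.executemany("INSERT INTO foo (id, name) VALUES (?, ?)",
                 [(i, "foo%d" % i) for i in range(1, 6)])
# Every foo gets two bars, so the plain join returns each foo id twice.
conn.executemany("INSERT INTO bar (foo_id) VALUES (?)",
                 [(i,) for i in range(1, 6) for _ in range(2)])

results = list(run_windowed(conn, 2, lambda win: fetch_foos(conn, win)))
print(len(results))  # 5
```

Reading all ids up front mirrors the first step of yield_per(); only the data queries are split, so each IN list stays bounded by window_size.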


We have been using it in production for a few days now. Perhaps someone will
find it useful too.


Ladislav Lenart