def apply_one(func, shard):
    ''' take a shard, connect to the database, create a cursor, and
    pass it to func.

    func  -- callable taking a single cursor argument
    shard -- object providing establishConnection(), whose result must
             offer cursor() and close()

    Returns whatever func(cursor) returns. The connection is closed
    afterwards, even if func raises.
    '''
    db = shard.establishConnection()
    try:
        cursor = db.cursor()
        # return (not yield): callers need func's result immediately; a
        # generator would defer connecting and hand back a generator object
        return func(cursor)
    finally:
        db.close()
# func first to mimic the parameter order of map()
def apply_all(func, shards):
    ''' for each shard connect to the database, create a cursor, and
    pass it to func.

    Returns a list of func's results, one per shard, in shard order.
    The list is built eagerly so every connection is opened and closed
    before this function returns (under Python 3 a bare map() would be a
    lazy, single-use iterator that defers all database work to the caller).
    '''
    return [apply_one(func, shard) for shard in shards]
def selectOneFromShards(shards, sql, args):
    ''' execute sql on each shard, returning the first row (if any) on
    each shard.

    shards -- iterable of shards, each passed to apply_all()
    sql    -- query to run on every shard
    args   -- query parameters, or None to run sql without parameters

    Returns a list holding the first row from each shard whose query
    produced a row; shards with no rows are omitted.
    '''
    def fetch(cursor):
        # DB-API: execute() runs the query; fetchone() takes no arguments
        if args is None:
            cursor.execute(sql)
        else:
            cursor.execute(sql, args)
        return cursor.fetchone()

    # fetchone() returns None when a shard has no rows; drop those
    return [row for row in apply_all(fetch, shards) if row is not None]
def selectManyFromShards(shards, sql, args=None, size=None):
    ''' execute sql on each shard, collecting the rows from each shard,
    up to size rows total.

    shards -- iterable of shards, each passed to apply_one()
    sql    -- query to run on every shard
    args   -- query parameters, or None to run sql without parameters
    size   -- maximum total number of rows to return; None means no limit

    Returns a list of rows in shard order. Once the row budget is used
    up, the remaining shards are not queried.
    '''
    def fetch(cursor, stillToFetch):
        ''' run sql on cursor and return at most stillToFetch rows
        (every row when stillToFetch is None) '''
        if args is None:
            cursor.execute(sql)
        else:
            cursor.execute(sql, args)
        if stillToFetch is None:
            return cursor.fetchall()
        return cursor.fetchmany(stillToFetch)

    results = []
    remaining = size
    for shard in shards:
        # budget exhausted (or size < 1 to begin with): stop querying
        if remaining is not None and remaining < 1:
            break
        # bind the current budget as a default to avoid late binding
        rows = apply_one(lambda cursor, n=remaining: fetch(cursor, n), shard)
        # extend() mutates in place and returns None, so don't reassign
        results.extend(rows)
        if remaining is not None:
            remaining -= len(rows)
    return results
# methods that run the sharded queries above against valid_shards(self._shard)
def selectOne(self, sql, args):
    ''' run sql against the shards returned by valid_shards(self._shard)
    and return the first row (if any) from each, as computed by
    selectOneFromShards '''
    target_shards = valid_shards(self._shard)
    return selectOneFromShards(target_shards, sql, args)
def selectMany(self, sql, args=None, size=None):
    ''' run sql against the shards returned by valid_shards(self._shard),
    delegating row collection (and the size limit) to
    selectManyFromShards '''
    target_shards = valid_shards(self._shard)
    return selectManyFromShards(target_shards, sql, args=args, size=size)
Also, did that blog post introduce a bug into selectOne? I thought
you had to call cursor.execute(sql, args) and then cursor.fetchone(),
but its fetch helper just calls curser.fetchone(sql, args). Weird.