>
> I tried initially without any additional column indexes, and (observed
> by watching debug output) the program started off quickly enough, but
> after a hundred or so records the rate of insertion started to drop
> off dramatically. I reasoned that this was because the code was being
> forced to perform a linear search for each new entry processed, and
> hypothesized that this would be improved by judicious addition of some
> additional column indexes. I duly did this, but the performance is
> not observably improved. Code snippets below.
>
> I'm using Python 2.5.1, SA 0.4.4 and SQLite 3.5.7
>
> I'm not looking for blinding performance, just "some time today". I'm
> trying to process about 1300 source records, and so far it's taken a
> couple of hours at 100% CPU utilization. The sqlite data file is
> still zero length. I can't tell how far it's got through the source
> data.
1300 rows, even if individually fetched and inserted one by one (which
seems to be what you're doing), shouldn't take more than a minute. So
one question to ask is: how big is the table you're selecting
from? You are filtering on about ten columns. What is important to
note there is that if you did in fact place indexes on all ten columns
to speed up selecting, you'd also directly slow down insertion, since
every insert then has to update each index. So that's something to
consider.
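To get a rough feel for that trade-off, here is a minimal sketch that loads the same rows into a table with and without per-column indexes. It uses the standard-library sqlite3 module directly rather than SQLAlchemy, and the table `t`, the column names `c0`..`c9`, and the row count are all made up for illustration. The timings will vary by machine and SQLite version, so treat them as indicative only.

```python
import sqlite3
import time

COLS = ["c%d" % i for i in range(10)]  # ten hypothetical text columns


def timed_load(with_indexes, n_rows=2000):
    """Insert n_rows into a fresh in-memory table; return (seconds, row count)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, %s)"
                 % ", ".join("%s TEXT" % c for c in COLS))
    if with_indexes:
        # One single-column index per data column.
        for c in COLS:
            conn.execute("CREATE INDEX ix_%s ON t (%s)" % (c, c))
    rows = [tuple("%s-%d" % (c, i) for c in COLS) for i in range(n_rows)]
    insert = "INSERT INTO t (%s) VALUES (%s)" % (
        ", ".join(COLS), ", ".join("?" for _ in COLS))
    start = time.time()
    conn.executemany(insert, rows)
    conn.commit()
    elapsed = time.time() - start
    count = conn.execute("SELECT COUNT(*) FROM t").fetchone()[0]
    conn.close()
    return elapsed, count


plain_secs, plain_count = timed_load(with_indexes=False)
indexed_secs, indexed_count = timed_load(with_indexes=True)
print("no indexes:  %.4f s for %d rows" % (plain_secs, plain_count))
print("ten indexes: %.4f s for %d rows" % (indexed_secs, indexed_count))
```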
>
>
> My questions, then, are:
> 1. does the SQLAlchemy ORM/query/filter_by use column indexes to
> optimize queries?
Your database is what takes advantage of column indexes. If
you tell SQLA to select from a table and filter on these 10 columns,
there's no particular optimization to be done at the SQL construction
layer.
>
> 2. if so, are there any particular steps I need to take to ensure this
> happens?
You should experiment with your database's "explain plan" feature to
show which indexes are taking effect for the SQL being issued. With
SQLA, turn on SQL echoing (pass echo=True to create_engine()) to see
the conversation taking place.
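As a rough illustration of the "explain plan" check on SQLite, the sketch below runs EXPLAIN QUERY PLAN on a lookup before and after adding an index. It talks to SQLite through the standard-library sqlite3 module rather than SQLAlchemy. The cut-down `email` table, the index name `ix_email_lookup`, and the helper `query_plan` are made up for the example, and the exact wording of the plan text differs between SQLite versions.

```python
import sqlite3


def query_plan(conn, sql, params=()):
    """Return the human-readable detail text of SQLite's plan for `sql`."""
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql, params).fetchall()
    # The detail text is the last column of each plan row.
    return [row[-1] for row in rows]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE email (id INTEGER PRIMARY KEY, sendadr TEXT, "
    "subject TEXT, occurs INTEGER)")

lookup = "SELECT id FROM email WHERE sendadr = ? AND subject = ?"
params = ("a@example.com", "hello")

# Without any index the plan should report a scan of the whole table.
plan_before = query_plan(conn, lookup, params)

# Hypothetical composite index over the columns used in the WHERE clause.
conn.execute("CREATE INDEX ix_email_lookup ON email (sendadr, subject)")

# With the index in place the plan should mention it by name.
plan_after = query_plan(conn, lookup, params)

print(plan_before)
print(plan_after)
```

If the plan for your real query still shows a scan after you add indexes, the indexes are not being used for that query.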
> As a supplementary question, are there any good reference materials
> for SA whose intended audience level lies somewhere between the
> excellent introductory tutorials and concise API reference available
> at the web site?
There is an O'Reilly book, which I haven't seen yet, coming out in
June. I can't speak to its accuracy or up-to-dateness since we
release new features very frequently.
>
> and the access code like this:
>
> # Update occurrence count or add new message
> emlq = session.query(Email).filter_by(
>     sendadr = sadr,
>     fromadr = fadr,
>     toadr = tadr,
>     sendnam = snam,
>     fromnam = fnam,
>     tonam = tnam,
>     subject = msgsubj,
>     received = msgrcvd,
>     spam = msgspam,
>     folderid = folderid)
> try:
>     eml = emlq.one()
> except Exception, e:
>     eml = Email(
>         sadr, snam, fadr, fnam, tadr, tnam,
>         msgsubj, msgrcvd, msgspam,
>         mboxname, folderid)
> eml.occurs += 1
> session.save_or_update(eml)
One thing you definitely want to do here is flush() your session after
several rows. The reason for your latency might be just lots of
pending data building up in the session unflushed.
Additionally, for a datafile import I'd probably not use the ORM at
all and use straight SQL constructs, i.e.
mytable.select().where(and_(mytable.c.sendadr == sadr,
mytable.c.fromadr == fadr, ...)), and then
mytable.update() or mytable.insert() depending on the results. The
SQL expression tutorial lays out how to use those.
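The select-then-update-or-insert flow described above can be sketched as follows. To stay self-contained, the sketch uses the standard-library sqlite3 module with plain SQL strings instead of SQLAlchemy's expression constructs. The shortened column list `KEY_COLS`, the `email` table layout, and the helper `record_message` are assumptions made for illustration, not the original schema.

```python
import sqlite3

# Hypothetical, shortened column list; the original table filters on ten columns.
KEY_COLS = ("sendadr", "fromadr", "subject")


def record_message(conn, row):
    """Bump `occurs` for a matching row, or insert a new row with occurs = 1."""
    where = " AND ".join("%s = ?" % col for col in KEY_COLS)
    values = [row[col] for col in KEY_COLS]
    found = conn.execute(
        "SELECT id FROM email WHERE " + where, values).fetchone()
    if found is not None:
        conn.execute(
            "UPDATE email SET occurs = occurs + 1 WHERE id = ?", (found[0],))
    else:
        placeholders = ", ".join("?" for _ in KEY_COLS)
        conn.execute(
            "INSERT INTO email (%s, occurs) VALUES (%s, 1)"
            % (", ".join(KEY_COLS), placeholders), values)


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE email (id INTEGER PRIMARY KEY, sendadr TEXT, "
    "fromadr TEXT, subject TEXT, occurs INTEGER)")

messages = [
    {"sendadr": "a@example.com", "fromadr": "a@example.com", "subject": "hi"},
    {"sendadr": "b@example.com", "fromadr": "b@example.com", "subject": "yo"},
    {"sendadr": "a@example.com", "fromadr": "a@example.com", "subject": "hi"},
]
for msg in messages:
    record_message(conn, msg)
conn.commit()  # one commit for the whole import, not one per row

counts = conn.execute(
    "SELECT sendadr, occurs FROM email ORDER BY sendadr").fetchall()
print(counts)
```

One caveat with this sketch: a `col = ?` comparison never matches NULL, so rows with missing values in a key column would always be inserted as new.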
2:00 seems very high. Is that 2 minutes? Below are two similar bulk
table loads. The first uses the same insert-or-update methodology and
only the relational layer (no ORM); that clocks in at 1.25 seconds on my
laptop. The second is an ORM implementation with a different duplicate
detection methodology; that clocks in at 2.0 seconds.
---
##
## Relational version
##
import os
import time
import random
from sqlalchemy import *
from sqlalchemy.exceptions import IntegrityError

# Build 1200 random rows; 50 of them get pre-loaded as "existing" rows.
data_cols = (
    'sendadr', 'fromadr', 'toadr', 'sendnam', 'fromnam',
    'tonam', 'subject', 'received', 'spam', 'folderid' )
chunk = lambda: '%x' % random.getrandbits(400)
dataset = [dict((col, chunk()) for col in data_cols)
           for _ in xrange(1200)]
dupes = random.sample(dataset, 50)

# Start from a fresh database file.
db = '1krows.db'
if os.path.exists(db):
    os.unlink(db)

engine = create_engine('sqlite:///%s' % db)
metadata = MetaData(engine)
table = Table('t', metadata,
              Column('id', Integer, primary_key=True),
              Column('occurs', Integer, default=1),
              *(Column(col, Text) for col in data_cols))
# A unique constraint over all data columns lets the database detect dupes.
table.append_constraint(UniqueConstraint(*data_cols))
metadata.create_all()

table.insert().execute(dupes)
assert table.select().count().scalar() == 50

start = time.time()

insert = table.insert()
update = (table.update().
          where(and_(*((table.c[col] == bindparam(col))
                       for col in data_cols))).
          values({'occurs': table.c.occurs+1}))
conn = engine.connect()
tx = conn.begin()
for row in dataset:
    # Try the insert first; on a unique violation, bump the counter instead.
    try:
        conn.execute(insert, row)
    except IntegrityError:
        conn.execute(update, row)
tx.commit()

end = time.time()

assert table.select().count().scalar() == 1200
assert select([func.count(table.c.id)],
              table.c.occurs==2).scalar() == 50
print "elapsed: %04f" % (end - start)
##
## ORM version
##
import hashlib
import os
import time
import random
from sqlalchemy import *
from sqlalchemy.orm import *

data_cols = (
    'sendadr', 'fromadr', 'toadr', 'sendnam', 'fromnam',
    'tonam', 'subject', 'received', 'spam', 'folderid' )
chunk = lambda: '%x' % random.getrandbits(400)
dataset = [dict((col, chunk()) for col in data_cols)
           for _ in xrange(1200)]

def hashrow(row):
    # One SHA-1 digest over all data columns stands in for the ten-column key.
    return hashlib.sha1(
        ','.join(row[c] for c in data_cols)).hexdigest()

# Pre-load 50 rows (with their hashes) as "existing" rows.
dupes = []
for row in random.sample(dataset, 50):
    dupe = row.copy()
    dupe['hash'] = hashrow(dupe)
    dupes.append(dupe)

db = '1krows.db'
if os.path.exists(db):
    os.unlink(db)

engine = create_engine('sqlite:///%s' % db)
metadata = MetaData(engine)
table = Table('t', metadata,
              Column('id', Integer, primary_key=True),
              Column('occurs', Integer, default=1),
              Column('hash', String(40), unique=True),
              *(Column(col, Text) for col in data_cols))
metadata.create_all()

table.insert().execute(dupes)
assert table.select().count().scalar() == 50

class Email(object):
    def __init__(self, **kw):
        for key, value in kw.items():
            setattr(self, key, value)

    def hashval(self):
        return hashrow(dict((col, getattr(self, col))
                            for col in data_cols))

mapper(Email, table)

start = time.time()
session = create_session()
session.begin()

data = [Email(**row) for row in dataset]

# Work through the new objects 100 at a time: one IN query per chunk finds
# the existing rows, the rest are saved as new, then the chunk is flushed.
chunk, remaining = [], [(e.hashval(), e) for e in data]
while remaining:
    chunk, remaining = remaining[:100], remaining[100:]
    by_hash = dict(chunk)
    dupes = (session.query(Email).
             filter(Email.hash.in_(by_hash.keys()))).all()
    for dupe in dupes:
        dupe.occurs += 1
        by_hash.pop(dupe.hash)
    for hashval, email in by_hash.items():
        email.hash = hashval
        session.save(email)
    session.flush()
session.commit()

end = time.time()

assert table.select().count().scalar() == 1200
assert select([func.count(table.c.id)],
              table.c.occurs==2).scalar() == 50
print "elapsed: %04f" % (end - start)