Using before_update and before_insert event listeners with PostgreSQL upserts


Chris Johnson

Oct 19, 2018, 1:32:07 PM10/19/18
to sqlalchemy
Python Version: 3.6.6
SQLAlchemy version: 1.2.12
PostgreSQL version: 10.5

This is my first time using PGSQL over MySQL or SQLite, and my first project using SQLAlchemy instead of SQLite or the Django ORM. I'm also not very good at programming in general, so please forgive the simple questions.

I have a script that I'm using to record incoming messages from an API and/or a websocket, including edits to messages. The script uses upserts to save data to the database. I tried adding event listeners for updates and inserts, but they are not firing when a message is inserted or updated. Here is the test code I'm working with:

class User(newBase):
    __tablename__ = 'user'
    __table_args__ = {'autoload': True}

class Message(newBase):
    __tablename__ = 'message'
    __table_args__ = {'autoload': True}

class Lobby(newBase):
    __tablename__ = 'lobby'
    __table_args__ = {'autoload': True}

class Community(newBase):
    __tablename__ = 'community'
    __table_args__ = {'autoload': True}


for table in tables:
    for row in _yield_limit(olddb.query(table), table.frame):
        logging.debug(row.frame)
        messagedata = row.raw
        message = wsrsi.sortframe(messagedata)
        if message[0] == 'message':
            message = message[1]
            if message.iserased:
                eraser = message.erasedby
                eraserid = eraser.id
                userInsert(eraser)
            else:
                eraser = None
                eraserid = None
            userInsert(message.author)

            messageins = insert(Message).values(
                id=message.id,
                iserased=message.iserased,
                erasedby=eraserid,
                author=message.author.id,
                body=message.body,
                private=message.isprivate,
                created=convTime(message.timecreated),
                edited=convTime(message.timeedited),
                lobby=message.lobbyid,
                usermentions=message.mentions,
            )

            # On conflict, overwrite every column with the incoming values.
            message_dict = {
                c.name: c
                for c in messageins.excluded
            }

            update_message = messageins.on_conflict_do_update(
                index_elements=["id"],
                set_=message_dict
            )

            try:
                newdb.execute(update_message)
            except IntegrityError as e:
                print('Foreign key violation error, logging.')
                with open('fkerrors.txt', 'a+') as f:
                    f.write(str(e) + '\n')
                    f.write(str(message.raw))
                    f.write('\n================================\n')
            except ValueError as e:
                if str(e) == 'A string literal cannot contain NUL (0x00) characters.':
                    print('Null character found in message')
                    with open('nullerrors.txt', 'a+') as f:
                        f.write(str(e) + '\n')
                        f.write(str(message.raw))
                        f.write('\n================================\n')

For reference, `tables` is a list of tables from an old MySQL database I've inherited and am migrating to PGSQL.

This is my event listener code:

@listens_for(Message, 'before_update')
def saveHistory(mapper, connect, target):
    historicaldata = dict(target)
    print("SaveHistory triggered\n")


event.listen(Message, 'before_insert', saveHistory)
event.listen(Message, 'before_update', saveHistory)

I've tried arranging this in several different ways, including using the decorator on the table classes, but no matter how I structure it the event listener does not fire on insert. I've done a lot of searching but haven't found any information on using event listeners specifically with PGSQL upserts, so I don't know if the problem is with my code or if it's just not supported when using upserts. Am I doing something wrong?

Simon King

Oct 22, 2018, 4:30:04 AM10/22/18
to sqlal...@googlegroups.com
I think the problem is that you are trying to use ORM-level events
(https://docs.sqlalchemy.org/en/latest/orm/events.html), but you
aren't using the ORM to insert your data. Those events would fire if
you created a Session, added your Message objects to that Session, and
then flushed it. You are bypassing the ORM and using the insert()
construct directly, so the events don't fire.

Unfortunately I don't think there's any way right now to use "ON
CONFLICT DO UPDATE" from the ORM. There is Session.merge
(https://docs.sqlalchemy.org/en/latest/orm/session_state_management.html#merging),
but that's an in-python check to see if the record already exists,
rather than allowing the DB to do it. It might work for you, depending
on how fast you need to process these messages.
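
A minimal sketch of the difference (in-memory SQLite and a made-up toy `Message` model, not the poster's reflected schema; SQLAlchemy 1.4+-style imports): the listener fires when a pending ORM object is flushed, but stays silent for a Core `insert()` executed directly on a connection.

```python
from sqlalchemy import Column, Integer, String, create_engine, event
from sqlalchemy import insert as core_insert
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Message(Base):
    __tablename__ = 'message'
    id = Column(Integer, primary_key=True)
    body = Column(String)

fired = []

@event.listens_for(Message, 'before_insert')
def before_insert(mapper, connection, target):
    # Mapper-level event: only runs during an ORM flush.
    fired.append(('insert', target.id))

engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

# ORM path: add + commit triggers a flush, so the event fires.
session.add(Message(id=1, body='hello'))
session.commit()
print(fired)                       # [('insert', 1)]

# Core path: the insert() construct bypasses the ORM entirely,
# so the event does NOT fire.
with engine.begin() as conn:
    conn.execute(core_insert(Message).values(id=2, body='bypass'))
print(fired)                       # still [('insert', 1)]
```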

Simon

Chris Johnson

Oct 26, 2018, 11:37:10 AM10/26/18
to sqlalchemy
Unfortunately this is very write-heavy.

What would be cheaper for the database server, using this merge method or manually doing an exists query and handling it myself?

For reference, what I'm trying to do is save edit history to a field in the row. It works roughly like this:

Columns: ID (PK), author, messageBody, timestamp, editedTimestamp, editHistory

editHistory is a JSONB field containing a list of dicts, each holding a partial copy of a previous version of the row. So it would look something like this in the JSONB data:

[
    {
        "author": author,
        "messagebody": messageBody,
        "editedTimestamp": editedTimestamp,
    }
]


1. Attempt insert
2. If ID PK exists, append a dict of the editable fields to the JSON list in editHistory
3. Update Row
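
For what it's worth, PostgreSQL can do all three steps in a single statement: the append in step 2 can live inside the `ON CONFLICT DO UPDATE` clause, using jsonb concatenation (`||`) against the existing row. A compile-only sketch against a hypothetical cut-down table (not the real schema, and no database needed to see the generated SQL):

```python
from sqlalchemy import Table, Column, Integer, String, MetaData, func
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert, JSONB

metadata = MetaData()

# Hypothetical, trimmed-down version of the message table described above.
message = Table(
    'message', metadata,
    Column('id', Integer, primary_key=True),
    Column('body', String),
    Column('edithistory', JSONB),
)

stmt = insert(message).values(id=1, body='edited text', edithistory=[])
stmt = stmt.on_conflict_do_update(
    index_elements=['id'],
    set_={
        'body': stmt.excluded.body,
        # Append a snapshot of the *old* editable fields to the JSONB
        # list before they are overwritten; message.c.body refers to
        # the existing row inside DO UPDATE.
        'edithistory': message.c.edithistory.op('||')(
            func.jsonb_build_array(
                func.jsonb_build_object('messagebody', message.c.body)
            )
        ),
    },
)

sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```

This keeps the round trips at one per message, though (as discussed above) it still bypasses ORM events.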

Mike Bayer

Oct 26, 2018, 12:50:09 PM10/26/18
to sqlal...@googlegroups.com
The most efficient way is to SELECT all the rows that exist already; often I do this in batches if there are a very large number of objects. Then hold onto them so that they stay referenced in the session, and use session.merge() on your modified records (or the appropriate batch matching the range you've loaded) to merge them in quickly, acting upon the records that you know you've loaded already. Session.merge() uses the already-loaded object and will not emit an additional SELECT statement when called. When you watch your SQL logs, you should see exactly one SELECT statement per batch, then INSERT / UPDATE statements totalling the number of objects you've merged.
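
A small self-contained sketch of that pattern (in-memory SQLite, toy model and column names, not the poster's schema): one batched SELECT primes the identity map, then merge() works against the already-loaded objects.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Message(Base):
    __tablename__ = 'message'
    id = Column(Integer, primary_key=True)
    body = Column(String)

engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

# Pretend id=1 already exists in the database.
session.add(Message(id=1, body='old'))
session.commit()

# Incoming batch of detached records to upsert.
incoming = [Message(id=1, body='new'), Message(id=2, body='brand new')]
ids = [m.id for m in incoming]

# One SELECT loads the whole batch into the identity map; keep the
# result referenced so the objects stay in the session.
existing = session.query(Message).filter(Message.id.in_(ids)).all()

# merge() finds the loaded rows in the identity map.
for m in incoming:
    session.merge(m)
session.commit()

bodies = {m.id: m.body for m in session.query(Message)}
print(bodies)   # {1: 'new', 2: 'brand new'}
```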

Mike Bayer

Oct 26, 2018, 12:53:41 PM10/26/18
to sqlal...@googlegroups.com
On Fri, Oct 26, 2018 at 12:49 PM Mike Bayer <mik...@zzzcomputing.com> wrote:
> the most efficient way is to SELECT all the rows that exist already,
> [...]

Oops, slight mistake, Session.merge() will still emit SELECT for the
rows that don't exist in the identity map. So also, as you SELECT
the objects, store the primary keys for each object in a set. Then
when you are to persist your changes, use Session.add() if the primary
key is not present in the set yet, or use session.merge() if it is.
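
The corrected flow, again as a self-contained sketch with a toy model (in-memory SQLite, made-up names): record the primary keys seen in the batch SELECT, then add() the genuinely new rows and merge() the known ones.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Message(Base):
    __tablename__ = 'message'
    id = Column(Integer, primary_key=True)
    body = Column(String)

engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

# Pretend id=1 already exists in the database.
session.add(Message(id=1, body='old'))
session.commit()

incoming = [Message(id=1, body='edited'), Message(id=2, body='new')]
ids = [m.id for m in incoming]

# One SELECT per batch; keep the result referenced so the rows stay in
# the identity map, and remember which primary keys already exist.
existing = session.query(Message).filter(Message.id.in_(ids)).all()
seen = {m.id for m in existing}

for m in incoming:
    if m.id in seen:
        session.merge(m)   # already loaded: no extra SELECT emitted
    else:
        session.add(m)     # definitely new: plain INSERT, no SELECT
session.commit()

bodies = {m.id: m.body for m in session.query(Message)}
print(bodies)   # {1: 'edited', 2: 'new'}
```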