Hi all,
I've got a queue in Postgres that currently uses a raw SQL query to dequeue items. The queue has multiple "topics": each row lists its topics in a metadata array column, and the selector picks which topic to dequeue from.
DELETE FROM queue
WHERE id = (
    SELECT id
    FROM queue
    WHERE :selector = ANY (metadata)
    ORDER BY queue_date
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
RETURNING *;
I'm trying to express this in SQLAlchemy, and have the following code so far:
search = (
    session.query(Queue.id)
    .filter(Queue.metadata.any_() == selector)
    .order_by(Queue.queue_date)
    .with_for_update(skip_locked=True)
    .limit(1)
    .cte("search")
)
delete_returning = (
    Queue.__table__.delete()
    .where(search == Queue.id)
    .returning(literal("*"))
)
However, the SQL generated by SQLAlchemy is missing the SKIP LOCKED clause, which is quite important for the efficiency of the queue.
Generated SQL:
FROM queue
WHERE :param_1 = ANY (queue.metadata) ORDER BY queue.queue_date
LIMIT :param_2 FOR UPDATE) RETURNING :param_3 AS anon_1
This is strange to me, as printing just the search subquery (with the CTE call removed) does include the SKIP LOCKED clause:
SELECT queue.id AS queue_id
FROM queue
WHERE %(param_1)s = ANY (queue.metadata) ORDER BY queue.queue_date
LIMIT %(param_2)s FOR UPDATE SKIP LOCKED
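One thing that may be worth checking: the two outputs above use different bind-parameter styles (:param_1 versus %(param_1)s), which suggests they were compiled with different dialects. Below is a minimal sketch for comparing the two, assuming a Core table that mirrors the queue. The table definition, the Text array type and the "topic" value are stand-ins, not the real schema.

```python
from sqlalchemy import Column, DateTime, Integer, MetaData, Table, Text, any_, literal
from sqlalchemy.dialects import postgresql
from sqlalchemy.engine.default import DefaultDialect
from sqlalchemy.orm import Session

# Hypothetical stand-in for the real queue table.
queue = Table(
    "queue",
    MetaData(),
    Column("id", Integer, primary_key=True),
    Column("queue_date", DateTime),
    Column("metadata", postgresql.ARRAY(Text)),
)

# An unbound session is enough to build (not execute) the query.
session = Session()
search = (
    session.query(queue.c.id)
    .filter(literal("topic") == any_(queue.c.metadata))
    .order_by(queue.c.queue_date)
    .with_for_update(skip_locked=True)
    .limit(1)
)

# Compile the same statement once generically and once for Postgres.
generic_sql = str(search.statement.compile(dialect=DefaultDialect()))
postgres_sql = str(search.statement.compile(dialect=postgresql.dialect()))
print(generic_sql)
print(postgres_sql)
```

If only the Postgres-compiled string contains SKIP LOCKED, the missing clause may be an artifact of how the statement was printed rather than of what is actually sent to the database.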
I'm using SQLAlchemy 1.2.0 and Python 3.6 on OS X, with psycopg2==2.7.1 and Postgres 9.6.8.
The other way of solving this would be to keep using the raw SQL and convert the resulting RowProxy into the ORM object, so if anyone knows the proper way of doing that, it would also be appreciated :)
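For reference, a rough sketch of what that second approach might look like, using Query.from_statement() with text() so the rows come back as mapped objects. Everything here is an assumption for illustration: the simplified Queue model (a single topic column instead of the metadata array, an integer queue_date), the in-memory SQLite engine standing in for Postgres, and a plain SELECT standing in for the dequeue query.

```python
from sqlalchemy import Column, Integer, Text, create_engine, text
from sqlalchemy.orm import sessionmaker

try:
    from sqlalchemy.orm import declarative_base  # SQLAlchemy 1.4+
except ImportError:
    from sqlalchemy.ext.declarative import declarative_base  # 1.2 / 1.3

Base = declarative_base()


class Queue(Base):
    # Hypothetical, simplified model: one topic per row, integer queue_date.
    __tablename__ = "queue"
    id = Column(Integer, primary_key=True)
    queue_date = Column(Integer)
    topic = Column(Text)


engine = create_engine("sqlite://")  # in-memory stand-in for Postgres
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
session.add_all([
    Queue(id=1, queue_date=20, topic="a"),
    Queue(id=2, queue_date=10, topic="a"),
    Queue(id=3, queue_date=5, topic="b"),
])
session.commit()

# Raw SQL with a named bind parameter; result columns are matched to the
# mapped columns by name.
raw = text(
    "SELECT * FROM queue WHERE topic = :selector "
    "ORDER BY queue_date LIMIT 1"
)
item = session.query(Queue).from_statement(raw).params(selector="a").first()
print(item.id, item.topic)
```

In principle the SELECT could be swapped for the DELETE ... RETURNING * query above when running against Postgres, though that is untested here.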
Thanks, Joel