Is it possible to use a Comparator to override a database/SQL-side comparison symmetrically?


Rudolf Cardinal

Feb 9, 2016, 12:30:58 AM
to sqlalchemy

Dear all,


I've been trying to implement a datetime-style field in SQLAlchemy that, in the database backend, is stored as text in a specific ISO-8601 format (e.g. "2013-04-30T00:26:03.000+05:00"), and is a datetime at the Python end. The primary database engine is MySQL.
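
As a minimal illustration of why a plain text comparison is not enough for this format (standard library only; a sketch that is not part of the script below, reusing three of its timestamps): strings with different UTC offsets do not sort in temporal order, so the database-side comparison has to convert to a common timezone first.

```python
from datetime import datetime, timezone

# Three of the timestamps used in the test script; the offsets differ.
t1_str = "2013-05-30T03:26:00.000+01:00"
t2_str = "2013-05-30T03:00:00.000+00:00"
t3_str = "2013-05-30T01:00:00.000-05:00"

# datetime.fromisoformat (Python 3.7+) accepts this layout directly.
t1, t2, t3 = (datetime.fromisoformat(s) for s in (t1_str, t2_str, t3_str))

# Sorted as text, the order is t3, t2, t1 ...
lexical_order = sorted([t1_str, t2_str, t3_str])
# ... but sorted as instants in time, it is t1, t2, t3.
temporal_order = sorted([t1_str, t2_str, t3_str], key=datetime.fromisoformat)

print(lexical_order)
print(temporal_order)
print(t3.astimezone(timezone.utc).isoformat())
```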

I don't want to implement this as a hybrid property, because I want a lot of these fields and to be able to use them automatically. I've implemented a TypeDecorator with a custom comparator (code below). Let's call the field type DateTimeAsIsoText. The field works fine for the following operations:

  • save from Python to database
  • load from database to Python
  • compare database column to literal, either as (DateTimeAsIsoText op LITERAL), or (LITERAL op DateTimeAsIsoText)
  • compare DateTimeAsIsoText column to another column of the same type
  • compare DateTimeAsIsoText to DATETIME
  • ... but not DATETIME to DateTimeAsIsoText.

I'm stuck because the comparator's operate() function isn't called in this situation, it seems, and neither is its reverse_operate() function.


I'm using SQLAlchemy 1.0.11 (and MySQL 5.6.28, though I suspect that isn't relevant).


Sample code is below.


Results: All comparisons work, except the last set of comparisons, of a plain DATETIME field to a DateTimeAsIsoText field (in that order; comparison section "E" in the code below).


Am I missing something obvious, or is it not possible to override SQL comparisons when the SQLAlchemy field is on the right-hand side of a comparison?


Thank you!

all the best,

Rudolf.



Code:


#!/usr/bin/env python3

import datetime
import dateutil.parser
import getpass
import logging
import pytz
import sqlalchemy
from sqlalchemy import (
    Column,
    DateTime,
    Integer,
    String,
    TypeDecorator
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql.expression import func

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

Base = declarative_base()


# =============================================================================
# Ancillary functions
# =============================================================================

def heading(x):
    print("=" * 79)
    print(x)
    print("=" * 79)


def ask_user(prompt, default=None, returntype=None, mask=False):
    if default is not None:
        fullprompt = "{} [{}]: ".format(prompt, default)
    else:
        fullprompt = "{}: ".format(prompt)
    success = False
    while not success:
        if mask:
            value = getpass.getpass(fullprompt) or default
        else:
            value = input(fullprompt) or default
        if returntype is not None:
            try:
                value = returntype(value)
                success = True
            except (TypeError, ValueError):
                # Conversion failed; don't swallow e.g. KeyboardInterrupt
                print("Bad value, try again")
        else:
            success = True
    return value


def engine_mysql(user, password, host, port, database, echo=True,
                 interface="pymysql"):
    CONNECTSTRING = (
        "mysql+{interface}://{user}:{password}@{host}:{port}/"
        "{database}".format(
            interface=interface,
            user=user,
            password=password,
            host=host,
            port=port,
            database=database
        ))
    # Removed "?charset=utf8&use_unicode=0"; with PyMySQL==0.7.1 it causes
    # TypeError: 'str' does not support the buffer interface
    # because dates come back as e.g. b'2013-05-30 06:00:00' and then the
    # convert_datetime function in pymysql/converters.py chokes.
    return sqlalchemy.create_engine(CONNECTSTRING, echo=echo)


def engine_mysql_commandline(echo=True):
    host = ask_user("Host", "localhost")
    port = 3306
    database = ask_user("Database", "testdb")
    user = ask_user("User", "root")
    password = ask_user("Password", mask=True)
    return engine_mysql(user, password, host, port, database, echo=echo)


# =============================================================================
# Custom date/time field as ISO-8601 text including timezone
# =============================================================================

def python_datetime_to_iso(x):
    """From a Python datetime to an ISO-formatted string in our particular
    format."""
    # https://docs.python.org/3.4/library/datetime.html#strftime-strptime-behavior  # noqa
    try:
        mainpart = x.strftime("%Y-%m-%dT%H:%M:%S.%f")  # microsecond accuracy
        timezone = x.strftime("%z")  # won't have the colon in
        return mainpart + timezone[:-2] + ":" + timezone[-2:]
    except AttributeError:
        return None


def iso_to_python_datetime(x):
    """From an ISO-formatted string to a Python datetime, with timezone."""
    try:
        return dateutil.parser.parse(x)
    except (AttributeError, ValueError):
        return None


def python_datetime_to_utc(x):
    """From a Python datetime, with timezone, to a UTC Python version."""
    try:
        return x.astimezone(pytz.utc)
    except AttributeError:
        return None


def mysql_isotzdatetime_to_utcdatetime(x):
    """Creates an SQL expression wrapping a field containing our ISO-8601 text,
    making a DATETIME out of it, in the UTC timezone."""
    # For format, see
    #   https://dev.mysql.com/doc/refman/5.5/en/date-and-time-functions.html#function_date-format  # noqa
    # Note the use of "%i" for minutes.
    # Things after "func." get passed to the database engine as literal SQL
    # functions; http://docs.sqlalchemy.org/en/latest/core/tutorial.html
    return func.CONVERT_TZ(
        func.STR_TO_DATE(
            func.LEFT(x, func.LENGTH(x) - 6),
            '%Y-%m-%dT%H:%i:%S.%f'
        ),
        func.RIGHT(x, 6),
        "+00:00"
    )


def mysql_unknown_field_to_utcdatetime(x):
    """The field might be a DATETIME, or an ISO-formatted field."""
    return func.IF(
        func.LENGTH(x) == 19,
        # ... length of a plain DATETIME e.g. 2013-05-30 00:00:00
        x,
        mysql_isotzdatetime_to_utcdatetime(x)
    )


class DateTimeAsIsoText(TypeDecorator):
    '''Stores date/time values as ISO-8601.'''
    impl = sqlalchemy.types.String(32)  # underlying SQL type

    def process_bind_param(self, value, dialect):
        """Convert things on the way from Python to the database."""
        logger.debug(
            "process_bind_param(self={}, value={}, dialect={})".format(
                repr(self), repr(value), repr(dialect)))
        return python_datetime_to_iso(value)

    def process_literal_param(self, value, dialect):
        """Convert things on the way from Python to the database."""
        logger.debug(
            "process_literal_param(self={}, value={}, dialect={})".format(
                repr(self), repr(value), repr(dialect)))
        return python_datetime_to_iso(value)

    def process_result_value(self, value, dialect):
        """Convert things on the way from the database to Python."""
        logger.debug(
            "process_result_value(self={}, value={}, dialect={})".format(
                repr(self), repr(value), repr(dialect)))
        return iso_to_python_datetime(value)

    class comparator_factory(TypeDecorator.Comparator):
        """Process SQL for when we are comparing our column, in the database,
        to something else."""
        def operate(self, op, other):
            if isinstance(other, datetime.datetime):
                processed_other = python_datetime_to_utc(other)
            else:
                # OK. At this point, "other" could be a plain DATETIME field,
                # or a DateTimeAsIsoText field (or potentially something
                # else that we don't really care about). If it's a DATETIME,
                # then we assume it is already in UTC.
                processed_other = mysql_unknown_field_to_utcdatetime(other)
            logger.debug("operate(self={}, op={}, other={})".format(
                repr(self), repr(op), repr(other)))
            logger.debug("self.expr = {}".format(repr(self.expr)))
            # traceback.print_stack()
            return op(mysql_isotzdatetime_to_utcdatetime(self.expr),
                      processed_other)
            # NOT YET IMPLEMENTED: dialects other than MySQL, and how to
            # detect the dialect at this point.

        def reverse_operate(self, op, other):
            assert False, "I don't think this is ever being called"


class TestIso(Base):
    __tablename__ = 'test_iso_datetime'
    id = Column(Integer, primary_key=True)
    name = Column(String(20))
    plain_datetime = Column(DateTime)
    when_created = Column(DateTimeAsIsoText)
    when_deleted = Column(DateTimeAsIsoText)

    def __repr__(self):
        # Five placeholders, five arguments (a stray reference to a
        # nonexistent "q1" attribute has been removed).
        return (
            "<TestIso(id={}, name={}, "
            "plain_datetime={}, when_created={}, when_deleted={})>".format(
                self.id, self.name,
                repr(self.plain_datetime),
                repr(self.when_created), repr(self.when_deleted))
        )


# =============================================================================
# Main, with unit testing
# =============================================================================

def test():
    logging.basicConfig(level=logging.DEBUG)

    engine = engine_mysql_commandline(echo=True)
    engine.connect()
    Session = sessionmaker()
    Session.configure(bind=engine)  # once engine is available
    session = Session()
    # Create tables
    Base.metadata.create_all(engine)

    # -------------------------------------------------------------------------
    # Unit testing for DateTimeAsIsoText
    # -------------------------------------------------------------------------

    # Insert things
    t0_str = "2013-04-30T00:26:03.000+05:00"  # previous month
    t1_str = "2013-05-30T03:26:00.000+01:00"  # Sweden
    t2_str = "2013-05-30T03:00:00.000+00:00"  # London
    t3_str = "2013-05-30T01:00:00.000-05:00"  # New York
    t2b_str = "2013-05-30T04:00:00.000+01:00"  # equals t2

    t0 = dateutil.parser.parse(t0_str)
    t1 = dateutil.parser.parse(t1_str)
    t2 = dateutil.parser.parse(t2_str)
    t3 = dateutil.parser.parse(t3_str)
    t2b = dateutil.parser.parse(t2b_str)

    # t1 -> t3 decrease lexically, but increase temporally
    assert t1_str > t2_str > t3_str
    assert t0 < t1 < t2 == t2b < t3

    session.query(TestIso).delete()
    alice = TestIso(id=1, name="alice",
                    when_created=t1, when_deleted=t2,
                    plain_datetime=python_datetime_to_utc(t3))
    session.add(alice)
    bob = TestIso(id=2, name="bob",
                  when_created=t2, when_deleted=t3,
                  plain_datetime=python_datetime_to_utc(t0))
    session.add(bob)
    celia = TestIso(id=3, name="celia",
                    when_created=t3, when_deleted=t2,
                    plain_datetime=python_datetime_to_utc(t1))
    session.add(celia)
    david = TestIso(id=4, name="david",
                    when_created=t3, when_deleted=t3,
                    plain_datetime=python_datetime_to_utc(t1))
    session.add(david)
    edgar = TestIso(id=5, name="edgar",
                    when_created=t2b, when_deleted=t2,
                    plain_datetime=python_datetime_to_utc(t2b))
    session.add(edgar)
    session.commit()

    heading("A. DateTimeAsIsoText test: DateTimeAsIsoText field VERSUS literal")
    q = session.query(TestIso).filter(TestIso.when_created < t2)
    assert q.all() == [alice]
    q = session.query(TestIso).filter(TestIso.when_created == t2)
    assert q.all() == [bob, edgar]
    q = session.query(TestIso).filter(TestIso.when_created > t2)
    assert q.all() == [celia, david]

    heading("B. DateTimeAsIsoText test: literal VERSUS DateTimeAsIsoText "
            "field")
    q = session.query(TestIso).filter(t2 > TestIso.when_created)
    assert q.all() == [alice]
    q = session.query(TestIso).filter(t2 == TestIso.when_created)
    assert q.all() == [bob, edgar]
    q = session.query(TestIso).filter(t2 < TestIso.when_created)
    assert q.all() == [celia, david]

    heading("C. DateTimeAsIsoText test: DateTimeAsIsoText field VERSUS "
            "DateTimeAsIsoText field")
    q = session.query(TestIso).filter(TestIso.when_created <
                                      TestIso.when_deleted)
    assert q.all() == [alice, bob]
    q = session.query(TestIso).filter(TestIso.when_created ==
                                      TestIso.when_deleted)
    assert q.all() == [david, edgar]
    q = session.query(TestIso).filter(TestIso.when_created >
                                      TestIso.when_deleted)
    assert q.all() == [celia]

    heading("D. DateTimeAsIsoText test: DateTimeAsIsoText field VERSUS "
            "plain DATETIME field")
    q = session.query(TestIso).filter(TestIso.when_created <
                                      TestIso.plain_datetime)
    assert q.all() == [alice]
    q = session.query(TestIso).filter(TestIso.when_created ==
                                      TestIso.plain_datetime)
    # CAUTION: don't have any non-zero millisecond components; they'll get
    # stripped from the plain DATETIME and exact comparisons will then fail.
    assert q.all() == [edgar]
    q = session.query(TestIso).filter(TestIso.when_created >
                                      TestIso.plain_datetime)
    assert q.all() == [bob, celia, david]

    heading("E. DateTimeAsIsoText test: plain DATETIME field VERSUS "
            "DateTimeAsIsoText field")
    q = session.query(TestIso).filter(TestIso.plain_datetime >
                                      TestIso.when_created)
    assert q.all() == [alice]
    q = session.query(TestIso).filter(TestIso.plain_datetime ==
                                      TestIso.when_created)
    assert q.all() == [edgar]
    q = session.query(TestIso).filter(TestIso.plain_datetime <
                                      TestIso.when_created)
    assert q.all() == [bob, celia, david]

    heading("F. DateTimeAsIsoText test: SELECT everything")
    q = session.query(TestIso)
    assert q.all() == [alice, bob, celia, david, edgar]


if __name__ == '__main__':
    test()
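
For anyone reading without a MySQL server to hand, the intent of mysql_isotzdatetime_to_utcdatetime() can be mirrored in pure Python. This is only a sketch of what the SQL expression is meant to compute (split off the six-character offset, parse the remainder, shift to UTC). The helper name iso_text_to_naive_utc is hypothetical, and it is not a substitute for the database-side function.

```python
from datetime import datetime, timedelta


def iso_text_to_naive_utc(text):
    """Hypothetical pure-Python analogue of
    CONVERT_TZ(STR_TO_DATE(LEFT(x, LENGTH(x) - 6), fmt), RIGHT(x, 6), '+00:00').
    Returns a naive datetime expressed in UTC."""
    main, offset = text[:-6], text[-6:]  # LEFT(x, LENGTH(x) - 6), RIGHT(x, 6)
    local = datetime.strptime(main, "%Y-%m-%dT%H:%M:%S.%f")  # STR_TO_DATE
    sign = -1 if offset[0] == "-" else 1
    delta = sign * timedelta(hours=int(offset[1:3]), minutes=int(offset[4:6]))
    return local - delta  # local time minus its UTC offset gives UTC


t2 = iso_text_to_naive_utc("2013-05-30T03:00:00.000+00:00")
t2b = iso_text_to_naive_utc("2013-05-30T04:00:00.000+01:00")
t3 = iso_text_to_naive_utc("2013-05-30T01:00:00.000-05:00")
print(t2 == t2b, t2 < t3)
```

Once both sides are reduced to naive UTC values like this, ordinary ordering comparisons agree with temporal order, which is what the comparator relies on.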


Mike Bayer

Feb 9, 2016, 9:43:06 AM
to sqlal...@googlegroups.com


On 02/09/2016 12:30 AM, Rudolf Cardinal wrote:
> Dear all,
>
>
> I've been trying to implement a datetime-style field in SQL Alchemy
> that, in the database backend, uses a specific ISO-8601 format (e.g.
> "2013-04-30T00:26:03.000+05:00"), and is a datetime at the Python end.
> The primary database engine is MySQL.
>
> I don't want to implement this as a hybrid property, because I want a
> lot of these fields and to be able to use them automatically. I've
> implemented a TypeDecorator with a custom comparator (code below). Let's
> call the field type DateTimeAsIsoText. The field works fine for the
> following operations:
>
> * save from Python to database
> * load from database to Python
> * compare database column to literal, either as (DateTimeAsIsoText op
> LITERAL), or (LITERAL op DateTimeAsIsoText)
> * compare DateTimeAsIsoText column to another column of the same type
> * compare DateTimeAsIsoText to DATETIME
> * ... but *not* DATETIME to DateTimeAsIsoText.
>
> I'm stuck because the comparator's operate() function isn't called in
> this situation, it seems, and neither is its reverse_operate() function.

well Python only allows one side's operator override to be called, so
when DateTime is on the left, its own operate() method is called and not
yours.

Because DateTime doesn't know anything about your type, in order to make
it aware, you have to change it.

I couldn't get your coercion logic to do exactly the expected thing, but
the general idea is like this:

class ISOComparableDateTime(DateTime):
    class comparator_factory(DateTime.comparator_factory):
        def operate(self, op, other):
            # "other" may be a plain Python literal with no .type attribute
            if isinstance(getattr(other, "type", None), DateTimeAsIsoText):
                # keep the operand order: self.expr is the left-hand side
                return op(mysql_unknown_field_to_utcdatetime(self.expr),
                          mysql_isotzdatetime_to_utcdatetime(other))
            else:
                return DateTime.comparator_factory.operate(self, op, other)


class TestIso(Base):
    __tablename__ = 'test_iso_datetime'
    id = Column(Integer, primary_key=True)
    name = Column(String(20))
    plain_datetime = Column(ISOComparableDateTime)
    when_created = Column(DateTimeAsIsoText)
    when_deleted = Column(DateTimeAsIsoText)


I'm not familiar with any other hook right now that can otherwise change
what DateTime does with an unknown datatype on the right side. Not
that there couldn't be such a hook, but at the moment I don't think
there's a "custom coerce the right-hand-side" hook so using a modified
DateTime on the left is the most direct solution.





Rudolf Cardinal

Feb 9, 2016, 10:18:12 PM
to sqlalchemy
Dear Mike,

Thank you very much! That makes sense; good to know.

all the best,
Rudolf.

immerrr again

Feb 11, 2016, 1:16:41 PM
to sqlalchemy
> well Python only allows one side's operator override to be called, so
> when DateTime is on the left, its own operate() method is called and not
> yours.

Excuse me for barging in, but the operator function can return a "NotImplemented" singleton and Python will retry the operation reflected on the other operand:

 https://docs.python.org/3.5/reference/datamodel.html#emulating-numeric-types

E.g.: https://gist.github.com/immerrr/c42ed17f437c8473e8d8
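
A minimal plain-Python sketch of that fallback (the class names Left and Right are made up for illustration; this is not SQLAlchemy code):

```python
class Left:
    def __gt__(self, other):
        # Decline operands we know nothing about, so that Python
        # goes on to try the reflected method of the other operand.
        if not isinstance(other, Left):
            return NotImplemented
        return "Left.__gt__"


class Right:
    def __lt__(self, other):
        # Reflected counterpart of `other > self`.
        return "Right.__lt__"


same_type = Left() > Left()     # handled by the left operand
mixed_type = Left() > Right()   # left declines, right's __lt__ is used
print(same_type, mixed_type)
```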

Mike Bayer

Feb 11, 2016, 1:48:27 PM
to sqlal...@googlegroups.com
Good point, but this may not be an option. Currently we return
NULLTYPE for such ambiguous comparisons, but it's not known what side
effects would occur if we assumed across the board that the other side
should operate (plus the other side needs to *never* return NotImplemented).

immerrr again

Feb 12, 2016, 2:29:44 AM
to sqlal...@googlegroups.com
On Thu, Feb 11, 2016 at 8:48 PM, Mike Bayer <mik...@zzzcomputing.com> wrote:
>
> Good point, but this may not be an option. Currently we return NULLTYPE
> for such ambiguous comparisons but it's not known what side effects would
> occur if we across the board assumed the other side should operate (plus the
> other side needs to *never* return NotImplemented).

I don't see why it needs that. It can return NotImplemented, too.
In py3, if both sides of the binary operator return NotImplemented,
the operation itself is not implemented and a TypeError is raised
(except for == and !=, which fall back to an identity comparison). In
py2, for arithmetic operations the same happens, but for comparison
ops the result is that of "id(left) OP id(right)", which is good for
sorting collections of unorderable objects, but bad otherwise.

Given that the current default comparator returns a BinaryExpression
unconditionally, returning NotImplemented from it could indeed break
existing code. But what if the comparator itself, before returning
the BinaryExpression as it does now, let the other side run the
corresponding reflected operation? If the result is NotImplemented,
proceed with the usual behaviour; if not, return the result of
the reflected operation.

AFAICS, the only backward-incompatible situation is when the other
side is silly enough to return some gibberish rather than
NotImplemented for unhandled operand types. Previously, it was not
even consulted, and now it would return gibberish instead of a
BinaryExpression. To me that sounds like an incorrect implementation
of the Python spec on the other side's part.
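
A rough sketch of that idea, using stand-in classes rather than real SQLAlchemy comparators. Everything here is hypothetical, including the hook name reflected_operate and the tuples standing in for SQL expressions; it only shows the control flow of "ask the other side first, then fall back to the old behaviour".

```python
import operator

# Map each comparison operator to its reflection (a > b  <=>  b < a).
REFLECTED = {
    operator.lt: operator.gt, operator.gt: operator.lt,
    operator.le: operator.ge, operator.ge: operator.le,
    operator.eq: operator.eq, operator.ne: operator.ne,
}


class DefaultComparator:
    """Stand-in for a stock comparator that always builds an expression."""

    def __init__(self, name):
        self.name = name

    def reflected_operate(self, op, other):
        # Hypothetical hook: the stock type has no opinion about `other`.
        return NotImplemented

    def operate(self, op, other):
        # Give the right-hand side a chance first ...
        hook = getattr(other, "reflected_operate", None)
        if hook is not None:
            result = hook(REFLECTED[op], self)
            if result is not NotImplemented:
                return result
        # ... otherwise behave exactly as before.
        return ("default", self.name, op.__name__, getattr(other, "name", other))


class CustomComparator(DefaultComparator):
    def reflected_operate(self, op, other):
        # The custom type takes over, seeing itself as the left operand.
        return ("custom", self.name, op.__name__, other.name)


plain = DefaultComparator("plain_datetime")
iso = CustomComparator("when_created")
print(plain.operate(operator.gt, iso))
print(plain.operate(operator.gt, DefaultComparator("other")))
```

Because the hook never calls operate() itself, this particular arrangement cannot recurse; a real implementation would need the same guarantee.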

Mike Bayer

Feb 12, 2016, 6:55:33 PM
to sqlal...@googlegroups.com
I'm not really seeing at the moment how this is feasible, considering
that one side or the other *has* to override the Python method and
return a BinaryExpression. Simply put, which side should win? If e.g.
I have an Integer type on one side, and a MagicInteger on the other,
what rule determines that Integer defers to MagicInteger? What if
MagicInteger is compared against a SuperMagicInteger?
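
For reference, plain Python has exactly one built-in tie-breaker of this kind: when the right operand's type is a subclass of the left operand's type, the right operand's reflected method is tried first. The sketch below uses toy classes named after the example above (they are not SQLAlchemy types), and whether such a rule would suit the type system here is a separate question, since two custom types need not be related by inheritance at all.

```python
class Integer:
    def __lt__(self, other):
        return "Integer.__lt__"


class MagicInteger(Integer):
    def __gt__(self, other):
        # Reflected form of `other < self`.
        return "MagicInteger.__gt__"


class Unrelated:
    def __gt__(self, other):
        return "Unrelated.__gt__"


# Right operand is a subclass of the left operand's type: it goes first.
subclass_on_right = Integer() < MagicInteger()
# No inheritance relationship: the left operand wins as usual.
unrelated_on_right = Integer() < Unrelated()
print(subclass_on_right, unrelated_on_right)
```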

I think you're getting at this, that both sides somehow would want to
coordinate here. But I don't see why that has to happen all the way
down at the Python operator level, as by the time we're making this
choice we're a few levels into a call stack already, and I can tell you
that the whole system of hooks already has a lot of paths that can
easily be tricked into going into endless loops if things aren't just right.

What I had in mind is that the type / operator system would have some
other system of consulting the other side of the expression, where it
gets a chance to veto the active side's decision on what typing behavior
we want to coerce, likely within the _adapt_expression() method or close
to it.

A naive implementation of "return NotImplemented if we don't know what
to do with a type" looks like:

diff --git a/lib/sqlalchemy/sql/default_comparator.py b/lib/sqlalchemy/sql/default_comparator.py
index 1bb1c34..c74e27f 100644
--- a/lib/sqlalchemy/sql/default_comparator.py
+++ b/lib/sqlalchemy/sql/default_comparator.py
@@ -82,6 +82,8 @@ def _binary_operate(expr, op, obj, reverse=False, result_type=None,
     if result_type is None:
         op, result_type = left.comparator._adapt_expression(
             op, right.comparator)
+        if result_type is NotImplemented:
+            return NotImplemented
 
     return BinaryExpression(
         left, right, op, type_=result_type, modifiers=kw)
diff --git a/lib/sqlalchemy/sql/sqltypes.py b/lib/sqlalchemy/sql/sqltypes.py
index 81630fe..9e0d01f 100644
--- a/lib/sqlalchemy/sql/sqltypes.py
+++ b/lib/sqlalchemy/sql/sqltypes.py
@@ -53,7 +53,7 @@ class _DateAffinity(object):
                 op, to_instance(
                     self.type._expression_adaptations.
                     get(op, self._blank_dict).
-                    get(othertype, NULLTYPE))
+                    get(othertype, NotImplemented))
             )
     comparator_factory = Comparator


With just that, tons of basic tests fail. Feel free to work on this issue.





