I'm trying to implement support for Postgres's composite types, which essentially let you make a type which is a struct of other types. This involves several kinds of functionality:
* DDL compilation to actually create the type before it's used. If there are nested types, the inner type must be created before any type that uses it.
I actually got the DDL compilation to work, but I had to mess around with a lot of SQLAlchemy internals, copying liberally from SchemaType and the ENUM implementation. It works, but I don't understand how it works, and I'm quite certain I'm doing several things wrong. I'd also appreciate some tips on how to properly make the register_composite calls on the connections that will use these types, in dependency order (and, of course, skipping them if the types haven't been created yet).
Is there a particular reason _CreateDropBase is internal-only? It seems useful for any DDL constructs which will need to create and drop things.
from collections import namedtuple, OrderedDict
from sqlalchemy import event, util
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.types import UserDefinedType, to_instance, SchemaType
from sqlalchemy.sql.expression import ColumnElement
from sqlalchemy.schema import _CreateDropBase
class CreateCompositeType(_CreateDropBase):
    """DDL construct for creating a composite type.

    Adds nothing to ``_CreateDropBase``; the SQL text is produced by the
    ``@compiles`` hook registered for this class.
    """
@compiles(CreateCompositeType)
def visit_create_composite_type(create, compiler, **kw):
    """Render ``CREATE TYPE <name> AS (<field> <type>, ...)``.

    ``create.element`` is the type being created. Every ``typemap`` entry
    becomes one ``<key> <compiled type>`` pair, in ``typemap`` order.
    """
    composite = create.element
    column_specs = ", ".join(
        "{} {}".format(
            field_name,
            compiler.dialect.type_compiler.process(to_instance(field_type)),
        )
        for field_name, field_type in composite.typemap.items()
    )
    type_name = compiler.preparer.format_type(composite)
    return "CREATE TYPE {} AS ({})".format(type_name, column_specs)
class DropCompositeType(_CreateDropBase):
    """DDL construct for dropping a composite type.

    Adds nothing to ``_CreateDropBase``; the SQL text is produced by the
    ``@compiles`` hook registered for this class.
    """
@compiles(DropCompositeType)
def visit_drop_composite_type(drop, compiler, **kw):
    """Render ``DROP TYPE <name>`` for the type held in ``drop.element``."""
    type_name = compiler.preparer.format_type(drop.element)
    return "DROP TYPE {}".format(type_name)
# PGCompositeElement and PGCompositeType are based on Michael Bayer's
# composite-type example (the link to the original is missing here).
class PGCompositeElement(ColumnElement):
    """SQL expression for one named field of a composite-typed expression.

    Stores the expression being accessed (``base``), the field name
    (``attrname``) and the field's type after passing it through
    ``to_instance``.
    """

    def __init__(self, base, attrname, type_):
        self.type = to_instance(type_)
        self.attrname = attrname
        self.base = base
@compiles(PGCompositeElement)
def _compile_pgelem(element, compiler, **kw):
    """Render a composite field access as ``(<compiled base>).<attrname>``.

    PostgreSQL requires the parentheses: without them the base expression
    is parsed as a table (or schema) name instead of a composite-typed
    column, and the statement fails.
    """
    rendered_base = compiler.process(element.base, **kw)
    return "(%s).%s" % (rendered_base, element.attrname)
class _CompositeAttributeError(AttributeError, KeyError):
    """Raised for an unknown composite field.

    It is an ``AttributeError`` so that ``hasattr()`` and
    ``getattr(obj, name, default)`` behave normally, and still a
    ``KeyError`` so that existing ``except KeyError`` handlers keep working.
    """


class _Namespace(object):
    """Attribute-style access to the fields of a composite-typed expression.

    ``comparator`` must expose ``.type.typemap`` (field name -> type) and
    ``.expr`` (the expression whose fields are being accessed).
    """

    def __init__(self, comparator):
        self.comparator = comparator

    def __getattr__(self, key):
        """Return a ``PGCompositeElement`` for the field called ``key``.

        Raises an error that is both ``AttributeError`` and ``KeyError``
        when the composite type has no such field.
        """
        # __getattr__ only runs when normal lookup fails; if ``comparator``
        # itself is missing (instance not initialised yet), stop here
        # instead of recursing through ``self.comparator`` forever.
        if key == "comparator":
            raise AttributeError(key)
        try:
            type_ = self.comparator.type.typemap[key]
        except KeyError:
            raise _CompositeAttributeError(
                "Type '%s' doesn't have an attribute: '%s'" %
                (self.comparator.type, key))
        return PGCompositeElement(self.comparator.expr, key, type_)
class PGCompositeType(UserDefinedType, SchemaType):
    """Column type for a PostgreSQL composite type.

    :param name: name of the composite type; also used as the name of the
        generated ``tupletype`` namedtuple.
    :param typemap: mapping (or iterable of pairs) of field name to field
        type. It is converted to an ``OrderedDict`` so field order is kept.

    The type emits ``CreateCompositeType`` / ``DropCompositeType`` through
    ``create()`` and ``drop()``. Nested ``SchemaType`` fields are attached
    to this type so they are created before it.
    """

    def __init__(self, name, typemap):
        # Pass the name on to SchemaType so that ``self.name`` (read by
        # get_col_spec, create, drop and the DDL compilers) is populated.
        SchemaType.__init__(self, name=name)
        if not isinstance(typemap, OrderedDict):
            typemap = OrderedDict(typemap)
        self.typemap = typemap
        self.tupletype = namedtuple(name, typemap.keys())

    class comparator_factory(UserDefinedType.Comparator):
        """Comparator that adds ``.attrs`` for per-field access."""

        @property
        def attrs(self):
            """Namespace whose attributes are the composite's fields."""
            return _Namespace(self)

    def get_col_spec(self):
        """Return the type name to use in column DDL."""
        return self.name

    def create(self, bind, checkfirst=True):
        """Emit CREATE TYPE on ``bind``.

        With ``checkfirst`` true, nothing is emitted when the dialect
        reports that the type already exists.
        """
        if not checkfirst or not bind.dialect.has_type(
                bind, self.name, schema=self.schema):
            bind.execute(CreateCompositeType(self))

    def drop(self, bind, checkfirst=True):
        """Emit DROP TYPE on ``bind``.

        With ``checkfirst`` true, nothing is emitted when the dialect
        reports that the type does not exist.
        """
        if not checkfirst or bind.dialect.has_type(
                bind, self.name, schema=self.schema):
            bind.execute(DropCompositeType(self))

    def _set_parent(self, parent):
        """Attach this type to ``parent`` and attach nested types to it."""
        if isinstance(parent, PGCompositeType):
            # Nested inside another composite type: the parent is already
            # known, so hook into its events directly.
            self._set_table(self, parent)
        else:
            super(PGCompositeType, self)._set_parent(parent)
        # Field types that are themselves schema-level types get this type
        # as their parent, so their create hooks run before ours.
        for value in self.typemap.values():
            if isinstance(value, SchemaType):
                value._set_parent_with_dispatch(self)

    def _set_table(self, column, table):
        """Listen for ``before_create`` / ``after_drop`` on ``table``.

        ``table`` is whatever this type was attached to; for a nested type
        it is the enclosing ``PGCompositeType`` (see ``_set_parent``).
        """
        event.listen(
            table,
            "before_create",
            util.portable_instancemethod(self._on_table_create)
        )
        # NOTE(review): ``_on_table_drop`` is not defined in this class
        # (presumably inherited from SchemaType -- confirm), while the drop
        # logic below lives in ``_on_metadata_drop``, which nothing here
        # registers. Verify that types are actually dropped.
        event.listen(
            table,
            "after_drop",
            util.portable_instancemethod(self._on_table_drop)
        )

    def _on_table_create(self, target, bind, checkfirst, **kw):
        """Create nested types first, then this type."""
        # Fire our own before_create so listeners attached to this type
        # (nested composite types register via _set_table) run first.
        self.dispatch.before_create(self, bind, checkfirst, **kw)
        self.create(bind=bind, checkfirst=checkfirst)

    def _on_metadata_drop(self, target, bind, checkfirst, **kw):
        """Drop this type, then notify listeners attached to it."""
        self.drop(bind=bind, checkfirst=checkfirst)
        self.dispatch.after_drop(self, bind, checkfirst, **kw)