[grassyknoll commit] r966 - branches/unhork/grassyknoll/backend/sql

0 views
Skip to first unread message

codesite...@google.com

unread,
Dec 29, 2008, 12:22:08 PM (12/29/08)
to grassykno...@googlegroups.com
Author: jemfinch
Date: Mon Dec 29 09:21:32 2008
New Revision: 966

Added:
branches/unhork/grassyknoll/backend/sql/PostgresqlCollection.py

Log:
Issue #161: Initial import of PostgresqlCollection. Somehow I lost the
test_PostgresqlCollection.py (I have the .pyc but can't easily decompile
it) so I'll recreate that on my other computer.

Added: branches/unhork/grassyknoll/backend/sql/PostgresqlCollection.py
==============================================================================
--- (empty file)
+++ branches/unhork/grassyknoll/backend/sql/PostgresqlCollection.py Mon Dec 29 09:21:32 2008
@@ -0,0 +1,71 @@
+"""a L{SqlCollection} based on L{PostgreSQL}"""
+
+import re
+import datetime
+
+import psycopg2
+
+from SqlCollection import SqlCollection
+import grassyknoll.backend.sql.TableMaker as TableMaker
+
+
+class PostgresqlCollection(SqlCollection):
+ placeholder = '%s'
+ def __init__(self, location):
+ """
+ @param location: an L{Bunch} with host (default: 'localhost'),
port (default: 3306), user, passwd, and db attributes.
+ @type location: L{Bunch}
+ """
+ super(PostgresqlCollection, self).__init__(location)
+ self.connection = self.connect(location)
+
+ _namesRe = re.compile(r':(\w+)')
+ def execute(self, *args, **kwargs):
+ if len(args) > 1:
+ (sql, parameters) = args
+ if isinstance(parameters, dict):
+ # Using named parameters, which psycopg2 supports
differently
+ # SqlCollection uses the :<name> format for named
parameters.
+ sql = self._namesRe.sub(r'%(\1)s', sql)
+ args = (sql, parameters)
+ return super(PostgresqlCollection, self).execute(*args, **kwargs)
+
+ @classmethod
+ def connect(cls, location):
+ dsn_parts = []
+ if hasattr(location, 'host'): dsn_parts.append('host=%r' %
location.host)
+ if hasattr(location, 'port'): dsn_parts.append('port=%r' %
location.port)
+ if hasattr(location, 'user'): dsn_parts.append('user=%r' %
location.user)
+ if hasattr(location, 'passwd'): dsn_parts.append('password=%r' %
location.passwd)
+ if hasattr(location, 'db'): dsn_parts.append('dbname=%r' %
location.db)
+ return psycopg2.connect(dsn=' '.join(dsn_parts))
+
+ @classmethod
+ def _allocate(cls, location):
+ assert isinstance(location.table, TableMaker.Table)
+ connection = cls.connect(location)
+ location.table.create(connection.cursor())
+ connection.commit()
+
+ @classmethod
+ def _deallocate(cls, location):
+ connection = cls.connect(location)
+ cursor = connection.cursor()
+ cursor.execute("""DROP TABLE %s""" % location.table.name)
+ connection.commit()
+ connection.close()
+
+ @classmethod
+ def exists(cls, location):
+ try:
+ connection = cls.connect(location)
+ except Exception:
+ return False
+ else:
+ cursor = connection.cursor()
+ cursor.execute("""SELECT 1
+ FROM information_schema.tables
+ WHERE table_catalog=%s
+ AND table_name=%s""", (location.db,
location.table.name))
+ connection.close()
+ return bool(cursor.rowcount)

Reply all
Reply to author
Forward
0 new messages