aiopg & pulsar

35 views
Skip to first unread message

Дмитрий Власов

unread,
Jun 17, 2016, 1:27:53 AM6/17/16
to python-pulsar
Hello!

I want to use postgres in my project, and tried write RemoteStore for it. 
I have working example, but don't sure that they pulsar-way correct. 
Do your have some best practice for working with sql databases? 
How do you think, my PGStore may be used in production code?

Thanks.


# -*- coding: utf-8 -*-

from functools import partial
from pulsar.apps import wsgi

from pulsar.apps.data import create_store
from pulsar.utils.pep import to_string
from pulsar.apps.ds import pulsards_url
from pulsar.apps.data import store, create_store, register_store, PubSubClient, RemoteStore
from pulsar import Connection, Pool

import select
import psycopg2
import aiopg

REDIS_DSN = 'redis://localhost:6379/2'
PG_DSN = 'postgresql://scailer:123@localhost:5432/dbname'
TIMEOUT = 60.0


class PGStore(RemoteStore):
    def _init(self, **kwargs):
        self.pool_size = kwargs.get('pool_size', 10)
        self.timeout = kwargs.get('timeout', 60.0)
        self._pool = aiopg.pool.Pool(
            self._buildurl(), minsize=self.pool_size, maxsize=self.pool_size,
            loop=self._loop, timeout=self.timeout, enable_json=True,
            enable_hstore=True, enable_uuid=True, echo=False)

    @property
    def pool(self):
        return self._pool

    def connect(self, protocol_factory=None):
        return self.pool.acquire()

    async def execute(self, *args, **options):
        val = None
        with await self.pool.cursor() as cur:
            await cur.execute(*args, **options)
            val = await cur.fetchall()
        return val

register_store('postgresql', 'app.PGStore')


class WebChat(wsgi.LazyWsgi):
    def __init__(self, server_name):
        self.name = server_name

    def setup(self, environ):
        self.cfg = environ['pulsar.cfg']
        loop = environ['pulsar.connection']._loop
        self.redis = create_store(REDIS_DSN, loop=loop)
        self.pg = create_store(PG_DSN, loop=loop, pool_size=10, timeout=60.0)
        return wsgi.WsgiHandler([wsgi.Router('/', get=self.page)])

    async def page(self, request):
        redis_client = self.redis.client()
        val = await redis_client.get(request.url_data.get("key"))
        print ('VALUE FROM REDIS:', val)

        val2 = await self.pg.execute('SELECT * FROM activity')
        print ('VALUE FROM PG (SIMPLE):', len(val2))

        async with self.pg.connect() as conn:
            async with conn.cursor() as cur:
                await cur.execute('SELECT * FROM activity')
                val3 = await cur.fetchall()
        print ('VALUE FROM PG (EXTENDED):', len(val2))

        data = 'OK'
        request.response.content_type = 'text/html'
        request.response.content = to_string(data % request.environ)
        return request.response


def server(callable=None, name=None, data_store=None, **params):
    name = name or 'wsgi'
    data_store = pulsards_url(data_store)
    return wsgi.WSGIServer(callable=WebChat(name), name=name,
                           data_store=data_store, **params)


if __name__ == '__main__':  # pragma nocover
    server().start()



lsbardel

unread,
Jun 17, 2016, 6:33:52 AM6/17/16
to python-pulsar
Hi,


On Friday, June 17, 2016 at 6:27:53 AM UTC+1, Дмитрий Власов wrote:
Hello!

I want to use postgres in my project, and tried write RemoteStore for it.  
I have working example, but don't sure that they pulsar-way correct. 
Do your have some best practice for working with sql databases? 

What you have done is correct.
However it is a quite low-level sql store.
 
How do you think, my PGStore may be used in production code?

I don't see why not.

If you are interested in using PostgreSql and pulsar you can also check out pulsar-odm

It allows you to leverage the SqlAlchemy ORM in an implicit asynchronous way (via greenlets).

Reply all
Reply to author
Forward
0 new messages