# -*- coding: utf-8 -*-
from functools import partial
from pulsar.apps import wsgi
from pulsar.apps.data import create_store
from pulsar.utils.pep import to_string
from pulsar.apps.ds import pulsards_url
from pulsar.apps.data import store, create_store, register_store, PubSubClient, RemoteStore
from pulsar import Connection, Pool
import select
import psycopg2
import aiopg
REDIS_DSN = 'redis://localhost:6379/2'
PG_DSN = 'postgresql://scailer:123@localhost:5432/dbname'
TIMEOUT = 60.0
class PGStore(RemoteStore):
def _init(self, **kwargs):
self.pool_size = kwargs.get('pool_size', 10)
self.timeout = kwargs.get('timeout', 60.0)
self._pool = aiopg.pool.Pool(
self._buildurl(), minsize=self.pool_size, maxsize=self.pool_size,
loop=self._loop, timeout=self.timeout, enable_json=True,
enable_hstore=True, enable_uuid=True, echo=False)
@property
def pool(self):
return self._pool
def connect(self, protocol_factory=None):
return self.pool.acquire()
async def execute(self, *args, **options):
val = None
with await self.pool.cursor() as cur:
await cur.execute(*args, **options)
val = await cur.fetchall()
return val
register_store('postgresql', 'app.PGStore')
class WebChat(wsgi.LazyWsgi):
def __init__(self, server_name):
def setup(self, environ):
self.cfg = environ['pulsar.cfg']
loop = environ['pulsar.connection']._loop
self.redis = create_store(REDIS_DSN, loop=loop)
self.pg = create_store(PG_DSN, loop=loop, pool_size=10, timeout=60.0) return wsgi.WsgiHandler([wsgi.Router('/', get=self.page)])
async def page(self, request):
redis_client = self.redis.client()
val = await redis_client.get(request.url_data.get("key"))
print ('VALUE FROM REDIS:', val)
val2 = await self.pg.execute('SELECT * FROM activity')
print ('VALUE FROM PG (SIMPLE):', len(val2))
async with self.pg.connect() as conn:
async with conn.cursor() as cur:
await cur.execute('SELECT * FROM activity')
val3 = await cur.fetchall()
print ('VALUE FROM PG (EXTENDED):', len(val2))
data = 'OK'
request.response.content_type = 'text/html'
request.response.content = to_string(data % request.environ)
return request.response
def server(callable=None, name=None, data_store=None, **params):
name = name or 'wsgi'
data_store = pulsards_url(data_store)
return wsgi.WSGIServer(callable=WebChat(name), name=name,
data_store=data_store, **params)
if __name__ == '__main__': # pragma nocover
server().start()