is this how the right way to try and solve this problem?

15 views
Skip to first unread message

braweb

unread,
Sep 1, 2010, 3:06:54 PM9/1/10
to cogen
Hello, i'm trying to use cogen to stream some market prices to
connected clients. Below is a modified version of the echoserver. This
works great in the sense that i can see the concurrency in action.
However each connected client gets slower and slower. Can you tell me
if this is how to achieve what i'm trying to do?



##################################################

import sys
import time
import random

from cogen.core import sockets
from cogen.core import schedulers
from cogen.core.coroutines import coroutine

from datetime import datetime



@coroutine
def server():
srv = sockets.Socket()
adr = ('0.0.0.0', len(sys.argv)>1 and int(sys.argv[1]) or 1200)
srv.bind(adr)
srv.listen(64)
while 1:
print "Listening on", adr
conn, addr = yield srv.accept()
print "Connection from %s:%s" % addr
m.add(handler, args=(conn, addr))



def writer():
market_prices = ['EURUSD: 1.3456','USDJPY: 8.4512','GOOGLE:
23.00','USDCHF: 98.4590','NZDEUR: 3.36','EBAY: 5.78']
yield '%s - %s\r\n' % (random.choice(market_prices),
datetime.now().strftime('%Y-%m-%d %H:%M:%S'))




@coroutine
def handler(sock, addr):
fh=sock.makefile()
yield fh.write("WELCOME TO ECHO SERVER !\r\n")
yield fh.flush()

while 1:
send_prices = writer()
yield fh.write(send_prices.next())
yield fh.flush()
time.sleep(0.01)


m = schedulers.Scheduler()
m.add(server)
m.run()

#############################################################

Thanks
Jerome

Ionel Maries Cristian

unread,
Sep 1, 2010, 3:34:13 PM9/1/10
to co...@googlegroups.com
Hey,

There's no need to have the write yield something that you can just return.

Your performance problems are caused by time.sleep call as it blocks the whole cogen main loop (no other coroutine can run in that time). Instead use something like "yield cogen.core.events.Sleep(0.01)".

-- ionel




--
You received this message because you are subscribed to the Google Groups "cogen" group.
To post to this group, send email to co...@googlegroups.com.
To unsubscribe from this group, send email to cogen+un...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/cogen?hl=en.


braweb

unread,
Sep 1, 2010, 4:43:54 PM9/1/10
to cogen
Awesome. Works really fast now. Thanks for the quick response.

The last issue i need to solve is to be able to read from the socket.
However the below code results in a wait for the read before it sends.
I want to be able to send and receive at the same time. Can you help?

Thanks again.


#######################################

import sys
import time
import random

from cogen.core import sockets
from cogen.core import schedulers
from cogen.core.coroutines import coroutine
from cogen.core.events import Sleep


from datetime import datetime



@coroutine
def server():
srv = sockets.Socket()
adr = ('0.0.0.0', len(sys.argv)>1 and int(sys.argv[1]) or 1200)
srv.bind(adr)
srv.listen(64)
while 1:
print "Listening on", adr
conn, addr = yield srv.accept()
print "Connection from %s:%s" % addr
m.add(handler, args=(conn, addr))


@coroutine
def writer():
market_prices = ['EURUSD: 1.3456','USDJPY: 8.4512','GOOGLE:
23.00','USDCHF: 98.4590','NZDEUR: 3.36','EBAY: 5.78']
return '%s - %s\r\n' % (random.choice(market_prices),
datetime.now().strftime('%Y-%m-%d %H:%M:%S'))




@coroutine
def handler(sock, addr):
fh=sock.makefile()
yield fh.write("WELCOME TO ECHO SERVER !\r\n")
yield fh.flush()

while 1:
line = yield fh.readline(1024)
if line.strip() == 'exit':
yield fh.write("GOOD BYE")
yield fh.close()
sock.close()
return
yield fh.write(line)
yield fh.flush()

send_prices = yield writer()
yield fh.write(send_prices)
yield fh.flush()
yield Sleep(0.003)


m = schedulers.Scheduler()
m.add(server)
m.run()

###############################################################


On Sep 1, 1:34 pm, Ionel Maries Cristian <ionel...@gmail.com> wrote:
> Hey,
>
> There's no need to have the write yield something that you can just return.
>
> Your performance problems are caused by time.sleep call as it blocks the
> whole cogen main loop (no other coroutine can run in that time). Instead use
> something like "yield cogen.core.events.Sleep(0.01)".
>
> -- ionel
>
> > cogen+un...@googlegroups.com <cogen%2Bunsu...@googlegroups.com>.

Ionel Maries Cristian

unread,
Sep 1, 2010, 5:03:50 PM9/1/10
to co...@googlegroups.com
You could use 2 coroutines - one for read and one for write, and then have a closing flag. Here's an example:


import sys
import time
import random

from cogen.core import sockets
from cogen.core import schedulers
from cogen.core.coroutines import coroutine
from cogen.core import events


from datetime import datetime

@coroutine
def server():
   srv = sockets.Socket()
   adr = ('0.0.0.0', len(sys.argv)>1 and int(sys.argv[1]) or 1200)
   srv.bind(adr)
   srv.listen(64)
   while 1:
       print "Listening on", adr
       conn, addr = yield srv.accept()
       print "Connection from %s:%s" % addr
       m.add(Client(conn, addr).handler)


def get_prices():

   market_prices = ['EURUSD: 1.3456','USDJPY: 8.4512','GOOGLE:23.00','USDCHF: 98.4590','NZDEUR: 3.36','EBAY: 5.78']
   return '%s - %s\r\n' % (random.choice(market_prices),
datetime.now().strftime('%Y-%m-%d %H:%M:%S'))


class Client:
  def __init__(self, sock, add):
    self.sock = sock
    self.fh = sock.makefile(bufsize=0)
    self.addr = addr
    self.should_close = False

 
  @coroutine
  def reader(self):
    while 1:
      line = yield self.fh.readline(1024)
      yield self.fh.write(line)
      if line.strip() == 'exit':
        self.should_close = True
        return
 
  @coroutine
  def handler(self):
    yield self.fh.write("WELCOME TO ECHO SERVER !\r\n")
    yield events.AddCoro(self.reader)
    while not self.should_close:
        yield events.Sleep(0.003)
        yield self.fh.write(get_prices())
       
    yield self.fh.write("GOOD BYE")
    yield self.fh.close()

       

m = schedulers.Scheduler()
m.add(server)
m.run()

-- ionel



To unsubscribe from this group, send email to cogen+un...@googlegroups.com.

braweb

unread,
Sep 1, 2010, 6:12:37 PM9/1/10
to cogen
Hi Ionel,

For whatever reason the reader seems to only do what it is intended to
do if i comment out this region:

while not self.should_close:
yield events.Sleep(0.003)
yield self.fh.write(get_prices())

yield self.fh.write("GOOD BYE")
yield self.fh.close()


Do you know why?

Thanks for all the help. I truly appreciate it.

Jerome
> > cogen%2Bunsu...@googlegroups.com<cogen%252Buns...@googlegroups.com>

Ionel Maries Cristian

unread,
Sep 1, 2010, 7:04:39 PM9/1/10
to co...@googlegroups.com
You probably use the iocp proactor. Unfortunately it doesn't work so well with concurrent reads and writes on the same socket.
 
You could use the select proactor, eg:
    from cogen.core.proactors import has_select
    m = schedulers.Scheduler(proactor=has_select())

-- ionel



To unsubscribe from this group, send email to cogen+un...@googlegroups.com.

braweb

unread,
Sep 1, 2010, 8:47:09 PM9/1/10
to cogen
Brilliant. Thanks a million. Works great.

On Sep 1, 5:04 pm, Ionel Maries Cristian <ionel...@gmail.com> wrote:
> You probably use the iocp proactor. Unfortunately it doesn't work so well
> with concurrent reads and writes on the same socket.
>
> You could use the select proactor, eg:
>     from cogen.core.proactors import has_select
>     m = schedulers.Scheduler(proactor=has_select())
>
> -- ionel
>
> > > > cogen%2Bunsu...@googlegroups.com<cogen%252Buns...@googlegroups.com>
> > <cogen%252Buns...@googlegroups.com<cogen%25252Bun...@googlegroups.com>

braweb

unread,
Sep 2, 2010, 3:16:56 PM9/2/10
to cogen
Hi Ionel, one other question (hopefully :) ) ... do you know if redis
or rabbitmq will block if used with cogen ?



On Sep 1, 5:04 pm, Ionel Maries Cristian <ionel...@gmail.com> wrote:
> You probably use the iocp proactor. Unfortunately it doesn't work so well
> with concurrent reads and writes on the same socket.
>
> You could use the select proactor, eg:
>     from cogen.core.proactors import has_select
>     m = schedulers.Scheduler(proactor=has_select())
>
> -- ionel
>
> > > > cogen%2Bunsu...@googlegroups.com<cogen%252Buns...@googlegroups.com>
> > <cogen%252Buns...@googlegroups.com<cogen%25252Bun...@googlegroups.com>

Ionel Maries Cristian

unread,
Sep 2, 2010, 3:36:10 PM9/2/10
to co...@googlegroups.com
The clients would block if they aren't adapted to cogen coroutines.

-- ionel



To unsubscribe from this group, send email to cogen+un...@googlegroups.com.
Reply all
Reply to author
Forward
0 new messages