data from bus to publishing function

IngoognI

Jan 10, 2017, 4:14:35 PM
to cherrypy-users
A bunch of sensors are posting their data. I want to republish that data as a graph on a site using SSE. Simple versions of the latter work, and pushing data onto the bus is also fine.

My problem is how to get the data into the publishing function. I can nest print_cpu in pubcpu and have the data printed, but not pushed to a site.

Code below.

Ingo


---%<---server.py
import cherrypy
class Root():
    @cherrypy.expose
    @cherrypy.tools.json_in()
    def sensor(self, **kwargs):
        """push data POSTed by sensor to the bus"""
        input_json = cherrypy.request.json
        cpu = input_json['cpu']
        cherrypy.engine.publish("cpu", cpu)
   
    @cherrypy.expose
    def pubcpu(self):
        """publish data from sensor channel to page (SSE)"""
        pass
       
    def print_cpu(cpu):
        print("\n", cpu, "\n")
    cherrypy.engine.subscribe("cpu", print_cpu)

if __name__ == '__main__':
    cherrypy.quickstart(Root())
---

A more extended version of pubcpu for SSE would look like:

    @cherrypy.expose
    def pubcpu(self):
        cherrypy.response.headers["Content-Type"] = "text/event-stream"
        def pub(cpu):
            print("\n", cpu ,"\n")
            yield "event: time\n" + "data: " + str(cpu)+ "\n\n"
        return pub(cpu) #<-------??????????? put what in place of cpu
        cherrypy.engine.subscribe("cpu", pub)
    pubcpu._cp_config = {'response.stream': True}

---%<---sensor.py
import psutil, requests

headers = {"Content-Type": "application/json"}
url = 'http://localhost:8080/sensor/'
while True:
    # cpu_percent(2) samples over a 2-second interval, which also paces the loop
    payload = {'cpu': psutil.cpu_percent(2)}
    r = requests.post(url, headers=headers, json=payload)
---

Jason R. Coombs

Jan 12, 2017, 8:18:18 AM
Hi Ingo. I think what you're asking is how to store state. You want to retain state across multiple requests; in this example, one request accepts a piece of data and a subsequent request returns that piece of data.

In the simplest form, you could store this value in a global variable or an instance variable on the Root instance.

In .sensor, "self.cpu = input_json['cpu']". Then in .pubcpu, "print(self.cpu)" or "yield 'data: ' + str(self.cpu) + '\n\n'".

Of course, if you use this technique, the state is stored in the memory of the Python process, so it won't work if you have more than one process serving your site. Also, if the set/get operation isn't thread-safe (i.e. it doesn't happen atomically), you could run into issues even in a single process. However, for this simple example, setting an instance attribute should be sufficient.
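A minimal sketch of the instance-attribute approach, with a lock so that a read never interleaves with a write even if the update later grows into several steps. `LatestValue` is a hypothetical helper, not a CherryPy API; in the thread's `Root`, `sensor` would call `set()` and `pubcpu` would call `get()`. As noted above, this still only works within a single process.

```python
import threading


class LatestValue:
    """Hypothetical holder for the most recent sensor reading.

    The lock keeps each get/set atomic across CherryPy's worker threads.
    The value lives in this one process only.
    """

    def __init__(self, initial=None):
        self._lock = threading.Lock()
        self._value = initial

    def set(self, value):
        with self._lock:
            self._value = value

    def get(self):
        with self._lock:
            return self._value


# Usage sketch: Root.sensor would call latest.set(input_json['cpu'])
# and Root.pubcpu would read latest.get().
latest = LatestValue()
latest.set(12.5)
print(latest.get())  # 12.5
```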

You can't use the engine pub/sub model to save state across requests. Data that's published to the bus is routed to subscribing routines immediately and isn't queued for retrieval in another request.
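The "routed immediately, not queued" behaviour can be seen with a toy bus. `ToyBus` is a hypothetical stand-in that mirrors how `cherrypy.engine.publish()` calls each current subscriber synchronously and returns their results; it is not CherryPy's implementation. A reading published before anyone subscribes is gone, while a subscriber that is simply `queue.Queue.put` keeps readings until some other code collects them.

```python
import queue


class ToyBus:
    """Hypothetical stand-in for cherrypy.engine, for illustration only.

    publish() calls every current listener right away and returns their
    results; nothing is stored for listeners that subscribe later.
    """

    def __init__(self):
        self.listeners = {}

    def subscribe(self, channel, callback):
        self.listeners.setdefault(channel, set()).add(callback)

    def unsubscribe(self, channel, callback):
        self.listeners.get(channel, set()).discard(callback)

    def publish(self, channel, *args):
        return [cb(*args) for cb in list(self.listeners.get(channel, ()))]


bus = ToyBus()
bus.publish("cpu", 1.0)  # no subscriber yet: this reading is simply lost

inbox = queue.Queue()
bus.subscribe("cpu", inbox.put)  # the queue buffers every later reading
bus.publish("cpu", 2.0)
bus.publish("cpu", 3.5)

# Another request can drain whatever arrived since the subscription.
drained = [inbox.get_nowait() for _ in range(inbox.qsize())]
print(drained)  # [2.0, 3.5]
```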

Hope that helps.

IngoognI

Jan 12, 2017, 8:30:25 AM
Storing the data is not the problem, Jason. As I understand it, I can write a tool that gets the data from the bus, as posted by the sensor, and stores it in a DB or a simple circular array. I can then pull it from there by polling for updates.
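The "simple circular array" maps directly onto `collections.deque` with `maxlen`: once the deque is full, each append drops the oldest reading. A minimal sketch; `make_ring_buffer` is a hypothetical helper, and a polling handler would simply return `list(history)`.

```python
from collections import deque


def make_ring_buffer(maxlen=100):
    """Return (listener, buffer): the listener appends readings and the
    deque discards the oldest one once maxlen is reached."""
    buffer = deque(maxlen=maxlen)

    def listener(value):
        buffer.append(value)

    return listener, buffer


# With CherryPy this would be registered once at startup, e.g.
#     cherrypy.engine.subscribe("cpu", listener)
listener, history = make_ring_buffer(maxlen=3)
for reading in (10.0, 20.0, 30.0, 40.0):
    listener(reading)  # simulates the bus delivering four readings
print(list(history))  # [20.0, 30.0, 40.0]
```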

What I want is to get data from the bus onto a web page, for example having PostgreSQL push the data onto the bus with its LISTEN/NOTIFY ( https://pypi.python.org/pypi/pgpubsub/0.0.5 ).

In my example I left out all the stuff in the middle, but it shows the essence of my problem.
So, how can I turn cherrypy.engine.subscribe("cpu", print_cpu) into a variable within pubcpu (I think)?
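One way to get the subscription "into" pubcpu is to give each request its own `queue.Queue`: a per-request bus callback puts readings in, and the streaming generator takes them out. A minimal sketch; `open_stream` and `ToyBus` are hypothetical names, and `ToyBus` only stands in for `cherrypy.engine` (which does have `subscribe(channel, callback)`, `unsubscribe(channel, callback)` and `publish(channel, *args)`) so the example runs without a server. The CherryPy wiring in the comment is untested.

```python
import queue


class ToyBus:
    """Hypothetical stand-in for cherrypy.engine so the sketch runs alone."""

    def __init__(self):
        self.listeners = {}

    def subscribe(self, channel, callback):
        self.listeners.setdefault(channel, set()).add(callback)

    def unsubscribe(self, channel, callback):
        self.listeners.get(channel, set()).discard(callback)

    def publish(self, channel, *args):
        return [cb(*args) for cb in list(self.listeners.get(channel, ()))]


def open_stream(bus, channel):
    """Subscribe a per-request queue and return a generator of SSE frames."""
    inbox = queue.Queue()

    def on_message(value):
        inbox.put(value)

    # Subscribe inside the handler, before returning, so readings published
    # between the request arriving and the first chunk are not lost.
    bus.subscribe(channel, on_message)

    def stream():
        try:
            while True:
                value = inbox.get()  # blocks this request's worker thread
                yield "event: %s\ndata: %s\n\n" % (channel, value)
        finally:
            # Runs when the generator is closed, so the listener goes away
            # together with the connection instead of piling up on the bus.
            bus.unsubscribe(channel, on_message)

    return stream()


# In CherryPy the handler might reduce to (untested sketch):
#     @cherrypy.expose
#     def pubcpu(self):
#         cherrypy.response.headers["Content-Type"] = "text/event-stream"
#         return open_stream(cherrypy.engine, "cpu")
#     pubcpu._cp_config = {'response.stream': True}

bus = ToyBus()
gen = open_stream(bus, "cpu")
bus.publish("cpu", 42.0)
first = next(gen)
print(repr(first))  # 'event: cpu\ndata: 42.0\n\n'
gen.close()
print(bus.listeners["cpu"])  # set()
```

Because every connection owns its queue, every open tab receives every reading. Note that `inbox.get()` without a timeout can block forever on an idle channel.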


Thanks,

Ingo

IngoognI

Jan 12, 2017, 8:44:05 AM
Although, following your suggestion of re-publishing straight away, I can still push the data to a DB in parallel. That solves 80% of it for me, as there is also data coming in from other sources.
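Since the bus calls every subscriber of a channel, the SSE subscriber and a DB writer can both listen on `cpu`. A sketch assuming `sqlite3` as an in-memory stand-in for whatever database is actually used; `store_cpu` is a hypothetical name, and the lock is there because listeners run in whichever CherryPy worker thread handled the sensor POST.

```python
import sqlite3
import threading

# Hypothetical in-memory stand-in for the real database.
db = sqlite3.connect(":memory:", check_same_thread=False)
db.execute("CREATE TABLE readings (channel TEXT, value REAL)")
db_lock = threading.Lock()  # listeners run in CherryPy's worker threads


def store_cpu(value):
    """Bus listener that persists each reading, independent of the SSE one."""
    with db_lock, db:  # the connection context manager commits the insert
        db.execute("INSERT INTO readings VALUES (?, ?)", ("cpu", value))


# With CherryPy, register it next to the SSE subscriber:
#     cherrypy.engine.subscribe("cpu", store_cpu)
for reading in (12.0, 15.5):
    store_cpu(reading)  # simulates two publishes on the "cpu" channel

rows = db.execute("SELECT value FROM readings ORDER BY rowid").fetchall()
print(rows)  # [(12.0,), (15.5,)]
```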

Thanks Jason.

IngoognI

Jan 16, 2017, 5:43:31 AM
Got it working over a coffee break, so I can now push data from the web to the web, or from a tool to the web. It may need some polish; I'd appreciate input on that.


import cherrypy
import os
import time
import threading

e = threading.Event()

class Var(object):

    @property
    def x(self):
        return self._x

    @x.setter
    def x(self, value):
        e.set()
        self._x = value



class Root():

    @cherrypy.expose
    @cherrypy.tools.json_in()
    def sensor(self, **kwargs):
        input_json = cherrypy.request.json
        cherrypy.engine.publish("cpu", str(input_json['cpu']))


    @cherrypy.expose
    def pubcpu(self):
        cherrypy.response.headers["Content-Type"] = "text/event-stream"
        k = Var()
        def value(v):
            k.x = v
        cherrypy.engine.subscribe("cpu", value)   
        def pub():
            while True:
                e.wait()
                yield "event: time\n" + "data: " + k.x + "\n\n"
                e.clear()
        return pub()       
    pubcpu._cp_config = {'response.stream': True}

    @cherrypy.expose
    def index(self):
        return '''<!DOCTYPE html>
<html>
 <head>
  <title>Server-sent events test</title>
 </head>
 <body>
  <script type="text/javascript">
    document.addEventListener('DOMContentLoaded', function () {
      var source = new EventSource('pubcpu');
      source.addEventListener('cpu', function (event) {
        document.getElementById('test').innerHTML = event.data;
      });
      source.addEventListener('error', function (event){
        console.log('SSE error:', event);
        console.log('SSE state:', source.readyState);
      });
    }, false);
  </script>
  <p><span id="test"></span></p>
 </body>
</html>'''

if __name__ == '__main__':
    cherrypy.quickstart(Root(), '/')

IngoognI

Jan 16, 2017, 8:19:39 AM
yield "event: time\n" + "data: " + k.x + "\n\n"

should be

yield "event: cpu\n" + "data: " + k.x + "\n\n"
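The `event:` name has to match the name passed to `addEventListener` on the page (`'cpu'` in the index page); events without an `event:` field are delivered as `message` events instead. A small helper keeps the framing in one place. `sse_frame` is a hypothetical name; per the SSE format, each line of the payload gets its own `data:` field and a blank line ends the event.

```python
def sse_frame(data, event=None):
    """Format one server-sent event (hypothetical helper)."""
    lines = []
    if event is not None:
        lines.append("event: %s" % event)
    # A payload containing newlines must be split over several data: lines;
    # the browser joins them back together with "\n".
    for part in str(data).split("\n"):
        lines.append("data: %s" % part)
    return "\n".join(lines) + "\n\n"


print(repr(sse_frame(42.5, event="cpu")))  # 'event: cpu\ndata: 42.5\n\n'
print(repr(sse_frame("a\nb")))  # 'data: a\ndata: b\n\n'
```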

Ingo

Jason R. Coombs

Jan 16, 2017, 9:37:55 AM
Glad to hear you got it working.

I see more clearly now what it is you're trying to achieve. You want to publish events from one request and have them streamed on blocking connections on other requests (possibly multiple).

I think you'll run into issues with your current implementation over time because the 'subscribe' calls never get cleaned up. That is, the server will accumulate subscribers to the `cpu` event. You'd like to be able to unsubscribe from that event if the client were to disconnect.

Which I think leads to another issue where a call to pubcpu will never exit. I'm not aware of any mechanism by which the server could signal to that request handler when the remote connection has dropped, so I suspect each client that connects will consume a thread until the server runs out of threads (30 by default, IIRC).

But perhaps the biggest concern I would have is that of the global event. I don't believe a single event will trigger multiple threads, so you may still be stuck with only one client getting the event to publish the message. Have you tried connecting two separate connections to the pubcpu while it's working? My guess is you only see events coming through on one connection or the other for a particular event.
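For reference, `threading.Event.set()` does wake every thread currently blocked in `wait()`. The fragile part of one shared event plus one shared value is the shared `clear()`: one connection can reset the flag before another has seen it, so readings can be missed or repeated. Giving each connection its own queue avoids that. A sketch with two consumer threads standing in for two browser tabs; `Broadcaster` is a hypothetical helper, and with CherryPy `hub.publish` could itself be subscribed to the `cpu` channel.

```python
import queue
import threading


class Broadcaster:
    """Hypothetical fan-out helper: one queue per connected client."""

    def __init__(self):
        self._lock = threading.Lock()
        self._clients = set()

    def register(self):
        q = queue.Queue()
        with self._lock:
            self._clients.add(q)
        return q

    def unregister(self, q):
        with self._lock:
            self._clients.discard(q)

    def publish(self, value):
        # Snapshot the clients under the lock, then deliver outside it.
        with self._lock:
            clients = list(self._clients)
        for q in clients:
            q.put(value)


hub = Broadcaster()
received = {}


def client(name, q, count):
    received[name] = [q.get(timeout=1) for _ in range(count)]


queues = {name: hub.register() for name in ("tab1", "tab2")}
threads = [threading.Thread(target=client, args=(n, q, 2))
           for n, q in queues.items()]
for t in threads:
    t.start()
hub.publish(10.0)
hub.publish(20.0)
for t in threads:
    t.join()
print(received["tab1"], received["tab2"])  # [10.0, 20.0] [10.0, 20.0]
```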

IngoognI

Jan 16, 2017, 11:26:32 AM
Jason,

So far I have only used CherryPy's standard settings (server.thread_pool = 10) and can get 8 tabs in the browser with the application. On two different boxes the total is also 8 tabs. It's not rock solid, though. While I was messing around, at a certain moment everything running in that venv came to a grinding halt, including the sensor data provider. No error messages, just nothing going on any more :( I tried it a few times but cannot reproduce the problem consistently.

When everything runs, closing and re-opening tabs on both machines keeps working up to these 8.

Would it make sense to put e = threading.Event() inside the Var class?

(Is there a way to add something to engine.subscribe() so that there is a return value that can be used as a trigger?)

Ingo

IngoognI

Jan 16, 2017, 12:29:41 PM
Regarding disconnects: sending a heartbeat with a blank character:
http://stackoverflow.com/questions/14472301/cherrypy-check-connection-status
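The heartbeat idea can be combined with a queue timeout: if no reading arrives within the interval, the generator yields an SSE comment line (one starting with `:`), which browsers ignore. Writing something periodically gives the server a chance to notice a dropped connection, which a generator blocked forever in `get()` never does. `with_heartbeat`, the 15-second default, and the assumption that a failed write ends the request are all illustrative, not confirmed by the thread.

```python
import queue


def with_heartbeat(inbox, interval=15.0):
    """Yield SSE frames from `inbox`, or a comment line when idle.

    Hypothetical helper: `inbox` is a per-connection queue.Queue fed by a
    bus subscriber; `interval` is an assumed keep-alive period in seconds.
    """
    while True:
        try:
            value = inbox.get(timeout=interval)
        except queue.Empty:
            # Lines starting with ':' are comments in the SSE format; the
            # browser ignores them, but the write still hits the socket.
            yield ": keep-alive\n\n"
            continue
        yield "event: cpu\ndata: %s\n\n" % value


inbox = queue.Queue()
gen = with_heartbeat(inbox, interval=0.1)
first = next(gen)
print(repr(first))  # ': keep-alive\n\n'
inbox.put(7.0)
second = next(gen)
print(repr(second))  # 'event: cpu\ndata: 7.0\n\n'
```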

Ingo

Björn Pedersen

Jan 17, 2017, 3:38:57 AM
Hi,

Well, have you thought about using WebSockets? ws4py integrates nicely with CherryPy.

In your receiving function you would use "engine.publish('websocket-broadcast', json.dumps(data))"
to get the data to all clients.

Björn

IngoognI

Jan 17, 2017, 12:21:51 PM
Björn,

SSE should be sufficient for my goal, and it is (actually was) not my main problem. It took some time to figure out how to get the data from the bus and publish it. It works, though not very elegantly.

Regarding my SSE problem, I'll open another thread.

Ingo

IngoognI

Jan 22, 2017, 4:52:43 AM
At it again. Under stress things started to go wrong, as Jason predicted. Connections were made at the wrong moment and would find no data, and duplicate data started appearing on existing connections. So I moved the whole Var class inside the pubcpu method, and connecting seems to work fine now, though it looks strange.

Then I added the StatusMonitor tool http://tools.cherrypy.org/wiki/StatusTool and got it working (although I do not understand how it does what it does). It very nicely shows that the sensor method first gets all the threads until there are requests to pubcpu. After I close several connections to pubcpu, they are removed from the thread list; that takes about 10 seconds. So that seems to work fine too.

Again as Jason predicted, the subscribe calls to the 'cpu' channel are not cleaned up. Now, if the StatusMonitor can see that a thread is no longer in use, can whatever triggers that be used to unsubscribe from the channel, if the two are related somehow?
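One candidate hook is the generator itself. When a generator paused at `yield` is closed, explicitly or when it is garbage-collected, Python raises `GeneratorExit` at that `yield`, so a `finally` clause runs and could call `cherrypy.engine.unsubscribe`. Whether, and how promptly, CherryPy closes a streamed body after a client disconnects is an assumption worth checking against the StatusTool output. One catch the sketch also shows: a generator that never started never runs its `finally`. `stream` and `on_close` are hypothetical names.

```python
def stream(on_close):
    """Toy SSE generator whose cleanup lives in a finally clause."""
    try:
        while True:
            yield "data: tick\n\n"
    finally:
        # In pubcpu this would be cherrypy.engine.unsubscribe("cpu", value).
        on_close()


closed = []

gen = stream(lambda: closed.append("unsubscribed"))
next(gen)      # the server has started streaming
gen.close()    # raises GeneratorExit at the paused yield, so finally runs
print(closed)  # ['unsubscribed']

never_started = stream(lambda: closed.append("not reached"))
never_started.close()  # never entered the try block, so no cleanup happens
print(closed)  # ['unsubscribed']
```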

Ingo

IngoognI

Jan 22, 2017, 8:12:33 AM
Well well, what about that:

    @cherrypy.expose
    def pubcpu(self):

        # Var is defined inside pubcpu, so every request gets its own class
        # and therefore its own Event.
        class Var(object):
            e = threading.Event()
            disconnected = 0

            @property
            def val(self):
                return self._val

            @val.setter
            def val(self, value):
                self.e.set()
                self._val = value
                # Counts updates the client has not consumed yet.
                self.disconnected += 1

            def wait(self):
                self.e.wait()

            def clear(self):
                self.e.clear()

            def publish(self):
                while True:
                    self.e.wait()
                    self.disconnected = 0
                    yield self._val
                    self.e.clear()

        cherrypy.response.headers["Content-Type"] = "text/event-stream"
        k = Var()

        def value(v):
            print("\n Value: ", v)
            k.val = v
            # If the stream has stopped consuming, drop this subscriber.
            if k.disconnected > 1:
                cherrypy.engine.unsubscribe("cpu", value)

        cherrypy.engine.subscribe("cpu", value)

        def pub():
            for val in k.publish():
                # The event name must match addEventListener('cpu', ...) on the page.
                yield "event: cpu\n" + "data: " + str(val) + "\n\n"

        return pub()
    pubcpu._cp_config = {'response.stream': True}


Ingo

