from twisted.internet.defer import inlineCallbacksfrom twisted.logger import Logger
from autobahn.twisted.util import sleepfrom autobahn.twisted.wamp import ApplicationSessionfrom autobahn.wamp.exception import ApplicationError
import time
class AppSession(ApplicationSession):
log = Logger()
@inlineCallbacks def onJoin(self, details):
# SUBSCRIBE to a topic and receive events # def onhello(msg): self.log.info("event for 'onhello' received: {msg}", msg=msg)
yield self.subscribe(onhello, 'com.example.onhello') self.log.info("subscribed to topic 'onhello'")
# REGISTER a procedure for remote calling # def add2(x, y): for z in range(0, 1000000): teste = z + 1 time.sleep( 5 ) return teste
yield self.register(add2, 'com.example.add2') self.log.info("procedure add2() registered") # REGISTER a procedure for remote calling # def add3(x, y): self.log.info("add3() called with {x} and {y}", x=x, y=y) return x + y
yield self.register(add3, 'com.example.add3') self.log.info("procedure add3() registered")
<html> <body> <h1>Hello WAMP</h1> <p>Open JavaScript console to watch output.</p> <script>AUTOBAHN_DEBUG = true;</script> <script src="http://autobahn.s3.amazonaws.com/autobahnjs/latest/autobahn.min.jgz"></script>
<script> // the URL of the WAMP Router (Crossbar.io) // var wsuri; if (document.location.origin == "file://") { wsuri = "ws://127.0.0.1:8080/ws";
} else { wsuri = (document.location.protocol === "http:" ? "ws:" : "wss:") + "//" + document.location.host + "/ws"; }
// the WAMP connection to the Router // var connection = new autobahn.Connection({ url: wsuri, realm: "realm1" });
// timers // var t1, t2;
// fired when connection is established and session attached // connection.onopen = function (session, details) {
console.log("Connected"); // CALL a remote procedure every second // for (var i = 0; i <= 2; i++) { session.call('com.example.add2', [2, 18]).then( function (res) { console.log("add2() result:", res); }, function (err) { console.log("add2() error:", err); } ); } };
// fired when connection was lost (or could not be established) // connection.onclose = function (reason, details) { console.log("Connection lost: " + reason); if (t1) { clearInterval(t1); t1 = null; } if (t2) { clearInterval(t2); t2 = null; } }
// now actually open the connection // connection.open();
</script> </body></html>
<html> <body> <h1>Hello WAMP</h1> <p>Open JavaScript console to watch output.</p> <script>AUTOBAHN_DEBUG = true;</script> <script src="http://autobahn.s3.amazonaws.com/autobahnjs/latest/autobahn.min.jgz"></script>
<script> // the URL of the WAMP Router (Crossbar.io) // var wsuri; if (document.location.origin == "file://") { wsuri = "ws://127.0.0.1:8080/ws";
} else { wsuri = (document.location.protocol === "http:" ? "ws:" : "wss:") + "//" + document.location.host + "/ws"; }
// the WAMP connection to the Router // var connection = new autobahn.Connection({ url: wsuri, realm: "realm1" });
// timers // var t1, t2;
// fired when connection is established and session attached // connection.onopen = function (session, details) {
console.log("Connected"); // CALL a remote procedure every second //
session.call('com.example.add3', [2, 18]).then( function (res) { console.log("add3() result:", res); }, function (err) { console.log("add3() error:", err); } );
};
// fired when connection was lost (or could not be established) // connection.onclose = function (reason, details) { console.log("Connection lost: " + reason); if (t1) { clearInterval(t1); t1 = null; } if (t2) { clearInterval(t2); t2 = null; } }
// now actually open the connection // connection.open();
</script> </body></html>
{ "version": 2, "workers": [ { "type": "router", "realms": [ { "name": "realm1", "roles": [ { "name": "anonymous", "permissions": [ { "uri": "", "match": "prefix", "allow": { "call": true, "register": true, "publish": true, "subscribe": true }, "disclose": { "caller": false, "publisher": false }, "cache": true } ] } ] } ], "transports": [ { "type": "web", "endpoint": { "type": "tcp", "port": 8080 }, "paths": { "/": { "type": "static", "directory": "../web" }, "ws": { "type": "websocket" } } } ] }, { "type": "container", "options": { "pythonpath": [ ".." ] }, "components": [ { "type": "class", "classname": "hello.AppSession", "realm": "realm1", "transport": { "type": "websocket", "endpoint": { "type": "tcp", "host": "127.0.0.1", "port": 8080 }, "url": "ws://127.0.0.1:8080/ws" } } ] } ]}
from autobahn.twisted.util import sleep
def add2(x, y):
self.log.info('Calling add2')
for z in range(0, 1000000):# This blocks, but only for a short time.
teste = z + 1
d = sleep(5)# This no longer blocks!
d.addCallback(lambda ignore: teste)
return d
@inlineCallbacks
def Demo:
def Block(q):
time.sleep(60)
q.put('done')
q = Queue()
reactor.callInThread(Block,q)
yield status = q.get()
returnValue(status)
Hello Daniel.
Indeed, the time.sleep appears to be an invalid test cause, like curl, its blocking code.For now, thanks to yours and other ppl's suggestions, we think we have found a solution.We've created a separate worker on the crossbar router just to handle this big blocking code / operations like our massive import.
So far the solution seems perfect cause, since its his own worker, with its own process (PID), all other regular requests aren't blocked and even if 2 people make 2 imports at the same time, queueing these 2 won't be bad. What we didnt want was for it to stop other different requests and modules (basicly, everything else :) ).
Thanks again for all your help.
@inlineCallbacks
def sendElectronicMail(self, From, To, Subject, Message, Cc=None, Bcc=None):
"""Send mail wrapper"""
msg = mailer.Message(From=From, To=To, CC=Cc, Bcc=Bcc)
msg.Subject = Subject
msg.Html = Message
msg.Body = 'Please enable HTML email to read this message'
def send(pipe):
"""Blocking routine to run in thread"""
log.msg("Email :: worker thread sending message")
try:
mailserver = mailer.Mailer('localhost')
mailserver.send(msg)
pipe.put('sent')
return
except Exception:
log.err()
pipe.put('error')
pipe = Queue()
reactor.callInThread(send,pipe)
log.msg("Email :: waiting on thread")
status = yield pipe.get(True)
log.msg("Email :: thread complete")
returnValue(status)