Unable to do multiple asynchronous RPCs with PHP & Thruway

Francisco Vitorino

Apr 27, 2016, 1:30:37 PM
to Crossbar
Hello.

We have been developing a web application with multiple modules. The frontend uses a JavaScript framework (with AutobahnJS), and the backend is a Crossbar router with Thruway PHP handling the database queries and such.

So far, all of the web application's modules have been using simple and rather fast calls (RPC) and we never realised there was an issue.

However, one of the new modules does a rather large import from an external service.
The front end (web application) makes a call with a start date and an end date. The PHP on the router receives these dates and makes a cURL call to the external service to fetch a bunch of information. The amount of information depends on the gap between the start and end dates: the further apart, the more information.
Then, for each line of information the cURL call returns, the PHP runs a series of queries against the application's database.

Now the problem is that, like we said, this process can take rather long (anywhere from 2 to 10 minutes), again depending on the date range.
While this PHP is running, the router or Thruway won't handle any other RPCs that come from the front end. These new calls get put "on hold" in a sort of waiting line and are only handled once the big PHP import call is complete.

This behaviour shows that our system, or the way it's set up, can't handle asynchronous calls, which kinda destroys the idea of what we were building.
We cannot have one person starting the import from their computer bring every other user of the web application to a halt until the import is complete.

Now, we've already been told that this is probably a problem at the Thruway level, not a real Crossbar issue, because Crossbar can handle asynchronous RPCs.
But this is a really serious issue for our project and we're kinda struggling with it, as we need to make sure our system can handle multiple asynchronous calls. If not, we'll have to look into a different solution for our application.
So, having said this, I decided to post it here on this group as well and ask for help from anyone in this group (Crossbar devs, users, anyone who has also developed with PHP and Thruway, etc.).

Can anyone please help us find a solution to this issue?
How can we make the router reply to other calls while the import is still running?
Did we configure something wrong on the router or the connection?

Thank you very much for all and any assistance provided.

Alexander Gödde

Apr 27, 2016, 1:59:03 PM
to Crossbar
Hello Francisco,

the issue is not with the router. Add another component to your application (easiest: run one from the Crossbar.io examples repo in the browser) and call that while the call to your PHP backend is running. Crossbar.io will route that to the component and return the result independently of the call to the PHP backend.

Since I don't work with PHP I have no idea what frameworks and mechanisms for async actions there are. Maybe others on the mailing list can help you here.

Regards,

Alex

Francisco Vitorino

Apr 27, 2016, 2:05:07 PM
to Crossbar
Hello Alex,

First of all, thank you for your reply.

We've been "cracking" our heads on this issue and, having not yet obtained any positive results and also after reading your comments that this could be related to PHP and Thruway, we decided to make another test.

This new test was done using one of your Crossbar.io repo examples (Python "hello wamp" example).

We had this test running and made a small change.
We changed the add2 function to run a giant for loop so that it would take something like 10 to 20 seconds.
We also added a new function called add3 that would just return some static text.

We then changed the index.html to only call the add2 function once and we created another html page (index2.html) that calls add3 function once.

We then opened the index.html page so that it would call the add2 function and its loop. While that loop was running, we opened the index2.html page so it would call the add3 function, and the conclusion was the same: the add3 function would not output the static text until the add2 function's loop and RPC were complete.

Again, I'd like to point out that this was done using not the PHP but the Python hello world example / demo.

If necessary, we can provide the example we used, since it's a lot smaller.

Thank you once again.

L. Daniel Burr

Apr 27, 2016, 2:21:20 PM
to Crossbar

Hi Francisco,

From your description, it sounds very much like you are writing blocking code that runs in a single process.  If there is only 1 process, which receives all the calls from the router, and you implement blocking code in that process, then you are enforcing serial execution.  No concurrency is possible in that case.

Could you please post your configuration and your example code?  Then maybe I can say something more helpful :)

Thanks,

Daniel
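
A minimal sketch of the serial execution described above, assuming nothing beyond the Python 3 standard library: asyncio stands in for the Twisted reactor here, and the handler names and the 0.1 second delay are made up for illustration. Three calls to a handler that uses time.sleep() finish one after another, while three calls to a handler that awaits asyncio.sleep() overlap.

```python
import asyncio
import time

DELAY = 0.1  # seconds; a short stand-in for a long-running call


async def blocking_handler():
    # time.sleep() holds the only thread, so no other handler can run meanwhile
    time.sleep(DELAY)
    return "done"


async def cooperative_handler():
    # asyncio.sleep() hands control back to the event loop while waiting
    await asyncio.sleep(DELAY)
    return "done"


async def time_three_calls(handler):
    """Start three calls at once and return the total wall-clock time."""
    started = time.monotonic()
    await asyncio.gather(handler(), handler(), handler())
    return time.monotonic() - started


def compare():
    """Return (seconds with blocking handlers, seconds with cooperative handlers)."""
    serial = asyncio.run(time_three_calls(blocking_handler))
    overlapped = asyncio.run(time_three_calls(cooperative_handler))
    return serial, overlapped


if __name__ == "__main__":
    serial, overlapped = compare()
    print("blocking handlers:    %.2fs" % serial)
    print("cooperative handlers: %.2fs" % overlapped)
```

On a typical machine the first figure comes out near three times the delay and the second near one, which is the "one process plus blocking code means serial execution" point in numbers.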

Francisco Vitorino

Apr 27, 2016, 2:50:31 PM
to Crossbar
Hello Daniel.

Thank you very much for your reply.

Before we add anything else, I would like to add the following to my previous message:

We also did another simple test.
We edited the index.html to call the add2 function 3 times (a small for i = 0; i <= 2... loop).
The result was that the 3 calls are made, but the browser only gets the results once all 3 loops have run on the router side.
This is even weirder.
To complement this test, we changed the add2 function to sleep for 5 seconds after the big for loop.
This test would show us whether the 3 calls take 5 or 15 seconds to output (showing concurrency or not).
The result was 15 seconds, which means that the 3 calls ran sequentially and not concurrently.

As for what you said, Daniel, I understand that yes, we might be creating only 1 process and writing blocking code.
How can we fix this? How can we make our code run concurrently?

Here is the configuration and code we used:

Hello.py:

from twisted.internet.defer import inlineCallbacks
from twisted.logger import Logger

from autobahn.twisted.util import sleep
from autobahn.twisted.wamp import ApplicationSession
from autobahn.wamp.exception import ApplicationError

import time

class AppSession(ApplicationSession):

    log = Logger()

    @inlineCallbacks
    def onJoin(self, details):

        # SUBSCRIBE to a topic and receive events
        #
        def onhello(msg):
            self.log.info("event for 'onhello' received: {msg}", msg=msg)

        yield self.subscribe(onhello, 'com.example.onhello')
        self.log.info("subscribed to topic 'onhello'")

        # REGISTER a procedure for remote calling
        #
        def add2(x, y):
            for z in range(0, 1000000):
                teste = z + 1
            time.sleep( 5 )
            return teste

        yield self.register(add2, 'com.example.add2')
        self.log.info("procedure add2() registered")
        
        # REGISTER a procedure for remote calling
        #
        def add3(x, y):
            self.log.info("add3() called with {x} and {y}", x=x, y=y)
            return x + y

        yield self.register(add3, 'com.example.add3')
        self.log.info("procedure add3() registered")

index.html:

<html>
   <body>
      <h1>Hello WAMP</h1>
      <p>Open JavaScript console to watch output.</p>
      <script>AUTOBAHN_DEBUG = true;</script>

      <script>
         // the URL of the WAMP Router (Crossbar.io)
         //
         var wsuri;
         if (document.location.origin == "file://") {
            wsuri = "ws://127.0.0.1:8080/ws";

         } else {
            wsuri = (document.location.protocol === "http:" ? "ws:" : "wss:") + "//" +
                        document.location.host + "/ws";
         }


         // the WAMP connection to the Router
         //
         var connection = new autobahn.Connection({
            url: wsuri,
            realm: "realm1"
         });


         // timers
         //
         var t1, t2;


         // fired when connection is established and session attached
         //
         connection.onopen = function (session, details) {

            console.log("Connected");
             
            // CALL a remote procedure every second
            //
             
            for (var i = 0; i <= 2; i++) {
                session.call('com.example.add2', [2, 18]).then(
                  function (res) {
                     console.log("add2() result:", res);
                  },
                  function (err) {
                     console.log("add2() error:", err);
                  }
               );
            }
         };


         // fired when connection was lost (or could not be established)
         //
         connection.onclose = function (reason, details) {
            console.log("Connection lost: " + reason);
            if (t1) {
               clearInterval(t1);
               t1 = null;
            }
            if (t2) {
               clearInterval(t2);
               t2 = null;
            }
         }


         // now actually open the connection
         //
         connection.open();

      </script>
   </body>
</html>


index2.html: 

<html>
   <body>
      <h1>Hello WAMP</h1>
      <p>Open JavaScript console to watch output.</p>
      <script>AUTOBAHN_DEBUG = true;</script>

      <script>
         // the URL of the WAMP Router (Crossbar.io)
         //
         var wsuri;
         if (document.location.origin == "file://") {
            wsuri = "ws://127.0.0.1:8080/ws";

         } else {
            wsuri = (document.location.protocol === "http:" ? "ws:" : "wss:") + "//" +
                        document.location.host + "/ws";
         }


         // the WAMP connection to the Router
         //
         var connection = new autobahn.Connection({
            url: wsuri,
            realm: "realm1"
         });


         // timers
         //
         var t1, t2;


         // fired when connection is established and session attached
         //
         connection.onopen = function (session, details) {

            console.log("Connected");
             
            // CALL a remote procedure every second
            //

               session.call('com.example.add3', [2, 18]).then(
                  function (res) {
                     console.log("add3() result:", res);
                  },
                  function (err) {
                     console.log("add3() error:", err);
                  }
               );

         };


         // fired when connection was lost (or could not be established)
         //
         connection.onclose = function (reason, details) {
            console.log("Connection lost: " + reason);
            if (t1) {
               clearInterval(t1);
               t1 = null;
            }
            if (t2) {
               clearInterval(t2);
               t2 = null;
            }
         }


         // now actually open the connection
         //
         connection.open();

      </script>
   </body>
</html>

Crossbar config (it's the standard example):

{
    "version": 2,
    "workers": [
        {
            "type": "router",
            "realms": [
                {
                    "name": "realm1",
                    "roles": [
                        {
                            "name": "anonymous",
                            "permissions": [
                                {
                                    "uri": "",
                                    "match": "prefix",
                                    "allow": {
                                        "call": true,
                                        "register": true,
                                        "publish": true,
                                        "subscribe": true
                                    },
                                    "disclose": {
                                        "caller": false,
                                        "publisher": false
                                    },
                                    "cache": true
                                }
                            ]
                        }
                    ]
                }
            ],
            "transports": [
                {
                    "type": "web",
                    "endpoint": {
                        "type": "tcp",
                        "port": 8080
                    },
                    "paths": {
                        "/": {
                            "type": "static",
                            "directory": "../web"
                        },
                        "ws": {
                            "type": "websocket"
                        }
                    }
                }
            ]
        },
        {
            "type": "container",
            "options": {
                "pythonpath": [
                    ".."
                ]
            },
            "components": [
                {
                    "type": "class",
                    "classname": "hello.AppSession",
                    "realm": "realm1",
                    "transport": {
                        "type": "websocket",
                        "endpoint": {
                            "type": "tcp",
                            "host": "127.0.0.1",
                            "port": 8080
                        },
                        "url": "ws://127.0.0.1:8080/ws"
                    }
                }
            ]
        }
    ]
}

Also, I am attaching a zip file containing the complete python example.

Thanks once again for your help.
PythonCrossbarExample.zip

L. Daniel Burr

Apr 27, 2016, 4:36:05 PM
to Crossbar
Hi Francisco,

Thank you for the complete example.

The first thing that is problematic in your example is the use of time.sleep().  This call blocks the hello.AppSession process for 5 seconds, during which it can do nothing.

Try this instead, and you should see very different results:

from autobahn.twisted.util import sleep

def add2(x, y):
    self.log.info('Calling add2')
    for z in range(0, 1000000):  # This blocks, but only for a short time.
        teste = z + 1
    d = sleep(5)  # This no longer blocks!
    d.addCallback(lambda ignore: teste)
    return d

Note that I know absolutely nothing about PHP/Thruway, so I can't give you specific advice there; however, I can say that if your PHP code is performing either long-running computations, or blocking i/o (e.g., a database query), then the general solution will be to perform that work in a separate thread, or in a sub-process.

I hope this helps you,

Daniel
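
For the "separate thread" suggestion, here is a rough sketch using only the standard library. asyncio's run_in_executor plays the part that a thread pool would play under Twisted; slow_import and add3 are hypothetical stand-ins, not code from this thread.

```python
import asyncio
import time


def slow_import(seconds):
    """Stand-in for the blocking cURL and database work (hypothetical)."""
    time.sleep(seconds)
    return "import finished"


async def add3(x, y):
    """A quick call that should not have to wait for the import."""
    return x + y


async def main():
    loop = asyncio.get_running_loop()
    started = time.monotonic()
    # Hand the blocking function to the default thread pool; the loop stays free
    import_future = loop.run_in_executor(None, slow_import, 0.3)
    quick_result = await add3(2, 18)        # answered while the import runs
    quick_elapsed = time.monotonic() - started
    import_result = await import_future     # collect the slow result later
    total_elapsed = time.monotonic() - started
    return quick_result, quick_elapsed, import_result, total_elapsed


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Threads mainly help when the slow part is waiting on I/O such as an HTTP request or a database query. For CPU-heavy loops in CPython, a sub-process is usually the better fit because of the global interpreter lock.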

Francisco Vitorino

Apr 28, 2016, 6:40:25 AM
to Crossbar
Hello Daniel.

Once more, thanks for your reply and for your time.

Indeed, time.sleep appears to be an invalid test because, like cURL, it's blocking code.

For now, thanks to your and other people's suggestions, we think we have found a solution.

We've created a separate worker on the Crossbar router just to handle these big blocking operations, like our massive import.
So far the solution seems perfect: since it's its own worker, with its own process (PID), all other regular requests aren't blocked, and even if 2 people start 2 imports at the same time, queueing these 2 won't be bad. What we didn't want was for it to stop other, different requests and modules (basically, everything else :) ).

Thanks again for all your help.
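
The "own process" idea can be sketched in plain Python as well. This is only an illustration under assumptions: the child process below is a stand-in for the dedicated worker, SLOW_IMPORT is a made-up placeholder for the import job, and this is not how Crossbar itself starts its workers.

```python
import subprocess
import sys
import time

# Placeholder for the long import job (hypothetical); it just sleeps and prints
SLOW_IMPORT = "import time; time.sleep(0.3); print('import finished')"


def start_import():
    """Launch the blocking job in its own OS process and return at once."""
    return subprocess.Popen(
        [sys.executable, "-c", SLOW_IMPORT],
        stdout=subprocess.PIPE,
        text=True,
    )


def add3(x, y):
    """A quick call that must not wait for the import."""
    return x + y


def demo():
    started = time.monotonic()
    child = start_import()              # has its own PID, like a separate worker
    quick_result = add3(2, 18)          # answered while the child is still busy
    quick_elapsed = time.monotonic() - started
    output, _ = child.communicate()     # now wait for the import to finish
    total_elapsed = time.monotonic() - started
    return quick_result, quick_elapsed, output.strip(), total_elapsed


if __name__ == "__main__":
    print(demo())
```

Two imports started this way would each get their own process, whereas a single dedicated worker would queue them, which is the trade-off accepted above.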

Gareth Bult

May 3, 2016, 5:54:02 PM
to Crossbar
Hi Francisco,

Not sure of the PHP syntax, but in Python, something like this (untested) might do what you want;

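import time

from twisted.internet import reactor
from twisted.internet.defer import DeferredQueue, inlineCallbacks, returnValue

@inlineCallbacks
def Demo():

    def Block(q):
        time.sleep(60)  # the blocking work, running in a worker thread
        # DeferredQueue is not thread-safe: hand the result back on the reactor thread
        reactor.callFromThread(q.put, 'done')

    q = DeferredQueue()
    reactor.callInThread(Block, q)
    status = yield q.get()  # waits for the result without blocking the reactor
    returnValue(status)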


i.e. run the blocking routine in a separate thread, then use something that "plays nice" with yield like Queue so you can wait for a return value ... ??
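
A sketch of the same hand-off using only the standard library, with asyncio standing in for Twisted. Worth noting: calling a standard-library Queue.get() on the event-loop thread would itself block, so the result has to come back through a loop-aware queue. Here asyncio.Queue and loop.call_soon_threadsafe take roughly the roles that DeferredQueue and reactor.callFromThread have in Twisted; the function names are made up for the example.

```python
import asyncio
import threading
import time


def block(loop, results):
    """Blocking routine; runs in a worker thread."""
    time.sleep(0.2)
    # asyncio.Queue is not thread-safe, so schedule the put on the loop's thread
    loop.call_soon_threadsafe(results.put_nowait, "done")


async def demo():
    loop = asyncio.get_running_loop()
    results = asyncio.Queue()
    events = []
    threading.Thread(target=block, args=(loop, results), daemon=True).start()

    async def wait_for_thread():
        status = await results.get()    # suspends this coroutine only
        events.append("thread result: " + status)

    async def quick_call():
        # Runs while wait_for_thread() is still waiting on the worker thread
        events.append("quick call answered")

    await asyncio.gather(wait_for_thread(), quick_call())
    return events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The quick call is recorded before the thread's result, so waiting on the queue does not hold up other work on the loop.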

Dante Lorenso

May 14, 2016, 4:06:18 AM
to Crossbar
On Thursday, April 28, 2016 at 5:40:25 AM UTC-5, Francisco Vitorino wrote:
> Hello Daniel.
> Indeed, time.sleep appears to be an invalid test because, like cURL, it's blocking code.
> For now, thanks to your and other people's suggestions, we think we have found a solution.
> We've created a separate worker on the Crossbar router just to handle these big blocking operations, like our massive import.

Francisco,

You will have many problems like this with your attempted solution. There are many things you want to do that will always result in blocking calls. Crossbar is designed as a message queue, so you should use it only as such and let all messages move freely and quickly without any blocking code. What you are trying to do with your callee/client is implement a task queue.

Message Queue != Task Queue

Here is something I've done recently ...

Use 'beanstalkd' as a task queue. You can insert multiple jobs into this queue for processing and they will pile up. Then, use 'supervisor' to run a pool of worker processes (I have supervisor run a pool of 10 workers on some machines). Each worker will connect to beanstalkd and pop a task from the task queue. If no tasks exist, the worker will block and wait for a task. If more than 10 tasks exist, each worker gets a task and the remaining tasks sit in the queue waiting for a worker to become ready.

With this strategy, you have designed a system that will process 0 to 10 tasks concurrently (no task blocks any other task) but your system will never run more than 10 tasks at once. If your workers are written to pop jobs off the beanstalkd queue, process them, and report the results back to crossbar, you can have crossbar simply relay pub/sub events to a function that inserts these jobs into your beanstalkd queue.

In short ... use crossbar only for message queuing and keep it fast. Use beanstalkd to implement a task queue that can pile up. Use supervisor to control a pool of worker processes to process your task queue jobs. And finally, have the workers publish results of their work back into crossbar. Congrats, now your workers are free to use blocking I/O to their heart's content :-)

  crossbar -> beanstalkd -> supervisor -> worker -> crossbar

On an Ubuntu 16.04 LTS server, you can simply 'apt install beanstalkd supervisor crossbar' so setting this up is trivial.

 
> So far the solution seems perfect: since it's its own worker, with its own process (PID), all other regular requests aren't blocked, and even if 2 people start 2 imports at the same time, queueing these 2 won't be bad. What we didn't want was for it to stop other, different requests and modules (basically, everything else :) ).

You can create task queues of different priorities and have special workers to process each queue differently with different numbers of workers in each pool.

> Thanks again for all your help.

Good luck, hope this gives you some ideas.

-- Dante
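
A toy version of the task-queue idea, assuming only the Python standard library: queue.Queue stands in for beanstalkd, a handful of threads stand in for the supervisor-managed worker processes, and appending to a list stands in for publishing results back to crossbar. None of the names below come from those tools.

```python
import queue
import threading
import time


def run_pool(jobs, pool_size, handler):
    """Process every job with at most pool_size handlers running at once."""
    tasks = queue.Queue()
    for job in jobs:
        tasks.put(job)              # jobs pile up here until a worker is free

    lock = threading.Lock()
    stats = {"running": 0, "peak": 0}
    results = []

    def worker():
        while True:
            try:
                job = tasks.get_nowait()
            except queue.Empty:
                return              # a real worker would block and wait for more
            with lock:
                stats["running"] += 1
                stats["peak"] = max(stats["peak"], stats["running"])
            try:
                outcome = handler(job)      # blocking work is fine in a worker
            finally:
                with lock:
                    stats["running"] -= 1
            with lock:
                results.append((job, outcome))  # stand-in for publishing back

    threads = [threading.Thread(target=worker) for _ in range(pool_size)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results, stats["peak"]


def slow_job(n):
    """Hypothetical blocking task."""
    time.sleep(0.05)
    return n * 2


if __name__ == "__main__":
    results, peak = run_pool(range(8), 3, slow_job)
    print(len(results), "jobs done, at most", peak, "at once")
```

With 8 jobs and a pool of 3, every job gets processed but never more than 3 at the same time, which is the "0 to 10 tasks concurrently" behaviour scaled down.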

Gareth Bult

May 14, 2016, 7:28:27 AM
to Crossbar
Ok, turns out I ran into a similar issue and had to implement my own suggestion .. if it helps, this works for me, so long as it's not abused by too many concurrent instances .... (in testing failed DNS, this can block for between 20-60s)


   
    @inlineCallbacks
    def sendElectronicMail(self, From, To, Subject, Message, Cc=None, Bcc=None):
        """Send mail wrapper"""
        msg = mailer.Message(From=From, To=To, CC=Cc, Bcc=Bcc)
        msg.Subject = Subject
        msg.Html = Message
        msg.Body = 'Please enable HTML email to read this message'

        def send(pipe):
            """Blocking routine to run in thread"""
            log.msg("Email :: worker thread sending message")
            try:
                mailserver = mailer.Mailer('localhost')
                mailserver.send(msg)
                pipe.put('sent')
                return
            except Exception:
                log.err()
            pipe.put('error')

        pipe = Queue()
        reactor.callInThread(send, pipe)
        log.msg("Email :: waiting on thread")
        status = yield pipe.get(True)
        log.msg("Email :: thread complete")
        returnValue(status)




