Threads in PHP and MongoDB


Johan Lundquist

Oct 12, 2014, 9:46:17 AM
to mongo...@googlegroups.com
On CentOS 6.5 I am trying to traverse a MongoDB cursor in a separate thread with PHP (using the pthreads extension). It works when WorkerThread is used as a normal, non-threaded class (using lines 3 and 9, the commented-out class and start() declarations, instead of lines 4 and 10). But when I try to run it as a thread, it just prints "running" and that's it. So it seems to stop without any error or warning when entering the loop. What could be the problem here?

<?php

//class WorkerThread {
class WorkerThread extends Worker {
    public function __construct($cursor) {
        $this->cursor = $cursor;
    }

    //public function start() {
    public function run() {
        echo "running";
        $idoc = 0;
        foreach ($this->cursor as $document) {
            echo "doc number " . $idoc . "\n";
            $idoc++;
        }
        echo "finished";
    }
}

$cursor = (new MongoClient())->some_database->some_collection->find();

$worker = new WorkerThread($cursor);

$worker->start();

$worker->join();

?>




Jeremy Mikola

Oct 16, 2014, 12:19:01 PM
to mongo...@googlegroups.com

I compiled PHP 5.6 with ZTS support (for threading) and installed the 1.5.7 driver and the pthreads extension to test this. With a bit of debugging, I found that the thread was actually dying on the cursor iteration (i.e. the foreach loop): a MongoException with the message "The MongoCursor object has not been correctly initialized by its constructor" is thrown. If you wrap the iteration in a try/catch and print the exception's message, you should be able to see it. For instance:

try {
  // Iterate the cursor inside try/catch so the driver's exception becomes visible
  iterator_to_array($this->cursor);
} catch (Exception $e) {
  var_dump($e->getMessage());
}


I'm not sure why, but the uncaught exception within the thread is never printed (as you'd typically see in a single-threaded PHP script). It appears that the thread simply dies on the fatal error.

But back to the exception in question. This is thrown from some sanity-checking code within the driver that checks to see if the internal data structures are as they should be. In this case, I believe the driver is checking for the MongoClient reference in the cursor and not finding it. This is likely due to the cursor object being shared between threads; PHP may be copying the MongoCursor object and its PHP-accessible properties, but not the internal struct, which is where we actually store the MongoClient dependency.

Unfortunately, the pthreads extension is not something we officially support in the driver. ZTS (Zend thread safety) is supported, but we do not specifically test with pthreads.

That said, I thought you might have better luck constructing a MongoClient instance in each thread and ensuring that none of the driver's core classes get shared across threads. I gisted a proof-of-concept here: https://gist.github.com/jmikola/49d32593c215165bb197. Based on the results, it looks like each thread maintained its own persistent connections. One curiosity was that all threads reported the same PID (both through the userland getmypid() function and internally). This was a bit concerning, since we use the PID as part of the hash that uniquely identifies persistent connections, but it didn't seem to affect the proof-of-concept (likely because the socket container structs were isolated to each thread).

Johan Lundquist

Oct 18, 2014, 3:41:45 AM
to mongo...@googlegroups.com
Thanks for the extensive answer and the gist, Jeremy. I will definitely try it. In the end I used forks to achieve true parallelism, but threads are still my preferred approach, since they make it easier to collect and merge the final result.

Rafael Almeida

Jul 22, 2015, 2:55:23 PM
to mongodb-dev
Hi Johan,

Do you have a sample showing how you did that?

Johan Lundquist

Jul 23, 2015, 1:56:29 PM
to mongodb-dev, rafael...@gmail.com
Hi Rafael,

I made something like this:

<?php

class PipeListener extends Worker {
    public $isReady = false;
    public $result = 0.0; // not assigned in run(); placeholder printed by the main process
    public $nSas = 0;     // not assigned in run(); placeholder printed by the main process

    public function __construct($fifoFile) {
        $this->fifoFile = $fifoFile;
    }

    public function run() {
        // Opening a FIFO for reading blocks until a writer opens the other end
        $pipe_read = fopen($this->fifoFile, 'r');
        if ( ! $pipe_read) die('Error: Could not open the named pipe: ' . posix_strerror(posix_errno()) . "\n");

        // The writer sends one JSON document and closes the pipe, so one fgets() reads all of it
        $message = fgets($pipe_read);
        $docs = json_decode($message, true);
        echo 'Received from the pipe: ' . count($docs) . " docs\n";

        $this->isReady = true;
    }
}

// One named pipe and one listener thread per segment
$listeners = array();
foreach (array("1", "2", "3", "4") as $suffix) {
    $fifoFile = '/tmp/fifo' . $suffix;
    if (file_exists($fifoFile)) unlink($fifoFile);
    if ( ! posix_mkfifo($fifoFile, 0700)) die('Error: Could not create a named pipe: ' . posix_strerror(posix_errno()) . "\n");
    array_push($listeners, new PipeListener($fifoFile));
}
foreach ($listeners as $listener) {
    $listener->start();
}

$cursor = (new MongoClient())->database->collection->find();

// Split the result set into 4 segments; the last one also takes the remainder.
// Cast to int: count()/4 is a float whenever count() is not a multiple of 4.
$count = $cursor->count();
$nsas = (int) ($count / 4);
$rest = $count % 4;

// Fork a chain of processes: each fork's parent keeps the segment of its branch
// and the child continues down the chain, so four processes cover segments 0-3
$iSegment = 0;
$mainProcess = false;
if (pcntl_fork()) {
    $iSegment = 0;
    $mainProcess = true;
} else if (pcntl_fork()) {
    $iSegment = 1;
} else if (pcntl_fork()) {
    $iSegment = 2;
} else {
    $iSegment = 3;
}

// Collect the "key" field of this process's segment
$cursor->skip($nsas * $iSegment);
$message = array();
$max = $nsas - 1;
if ($iSegment == 3) $max = $nsas - 1 + $rest;
for ($idoc = 0; $idoc <= $max; $idoc++) {
    if ($cursor->hasNext()) {
        $doc = $cursor->getNext();
        array_push($message, $doc["key"]);
    }
}

// Send the segment to its listener thread through the matching FIFO
$fifoFile = '/tmp/fifo' . strval($iSegment + 1);
$pipe_write = fopen($fifoFile, 'w');
if ( ! $pipe_write) die('Error: Could not open the named pipe: ' . posix_strerror(posix_errno()) . "\n");
fwrite($pipe_write, json_encode($message));
fclose($pipe_write);

if ($mainProcess) {
    echo "I am the main process\n";

    // Wait until every listener has received its segment
    $allFinished = false;
    while ( ! $allFinished) {
        $allFinished = true;
        foreach ($listeners as $listener) {
            $allFinished = $allFinished && $listener->isReady;
        }
        if ( ! $allFinished) usleep(1000); // poll every millisecond instead of spinning
    }

    foreach ($listeners as $listener) {
        echo $listener->result . " " . $listener->nSas . "\n";
    }
}
?>
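The segment split above depends on integer division. If the segment size is computed as a plain count()/4, it is a float whenever the document count is not a multiple of 4 (10 documents give 2.5); assuming the driver truncates the fractional value passed to skip(), the four processes would then read documents 0-1, 2-3, 5-6 and 7-9, and document 4 would never be sent. Below is a minimal standalone sketch of the same split using an integer segment size. segmentBounds() is a hypothetical helper, not part of the original code or of the driver; it returns one skip/limit pair per segment and gives the remainder to the last segment, as the original scheme does.

```php
<?php
// Hypothetical helper (not from the original post): split $count documents
// into $nSegments consecutive ranges for skip()-then-read paging.
// Every segment gets (int)($count / $nSegments) documents and the last one
// also takes the remainder, mirroring the $nsas/$rest scheme above.
function segmentBounds($count, $nSegments)
{
    // Cast to int so the size stays an integer (plain / can yield a float)
    $size = (int) ($count / $nSegments);
    $rest = $count % $nSegments;

    $bounds = array();
    for ($i = 0; $i < $nSegments; $i++) {
        $limit = $size;
        if ($i == $nSegments - 1) {
            $limit += $rest; // last segment picks up the leftover documents
        }
        $bounds[] = array($size * $i, $limit); // [skip, limit]
    }
    return $bounds;
}

// Usage: print the ranges for 10 documents split over 4 processes
foreach (segmentBounds(10, 4) as $i => $b) {
    echo "segment $i: skip {$b[0]}, read {$b[1]}\n";
}
```

With 10 documents this gives skips of 0, 2, 4 and 6 with 2, 2, 2 and 4 documents to read, so every document is covered exactly once.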

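Each listener expects exactly one JSON document per pipe: the writer sends the json_encode() output and closes its end, and a single fgets() call returns the whole document, because json_encode() emits no newline and fgets() therefore stops at end-of-file. A small round trip of that message format is sketched below, with an in-memory php://memory stream standing in for the named pipe so it runs without FIFOs, forks or threads. sendKeys() and receiveKeys() are hypothetical helper names, not from the original code.

```php
<?php
// Round trip of the message format used between a forked process and its
// listener: the writer sends one json_encode()d array, the reader takes one
// fgets() line and decodes it. php://memory stands in for the named pipe.
function sendKeys($stream, array $keys)
{
    // json_encode() emits a single line (newlines inside strings are escaped)
    fwrite($stream, json_encode($keys));
}

function receiveKeys($stream)
{
    // fgets() without a length reads up to a newline or the end of the stream,
    // so here it returns the whole JSON document
    $message = fgets($stream);
    return json_decode($message, true);
}

$stream = fopen('php://memory', 'w+');
sendKeys($stream, array("a", "b\nc", 42));
rewind($stream); // only needed for the memory stream; a FIFO reader sees bytes as they are written
$docs = receiveKeys($stream);
echo 'Received from the pipe: ' . count($docs) . " docs\n";
fclose($stream);
```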
