<?php
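// Splits the documents of a MongoDB collection into four segments, has four forked processes send one segment each through a named pipe, and reads every pipe in a listener thread.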
/**
 * Thread object that reads one JSON message from a named pipe.
 *
 * run() opens the FIFO for reading, reads a single line from it, decodes
 * that line as JSON, prints how many entries were received and finally
 * raises the $isReady flag.
 *
 * NOTE(review): Worker is presumably the pthreads Worker class - confirm.
 */
class PipeListener extends Worker {
    /** @var string Path of the named pipe this listener reads from. */
    public $fifoFile;

    /** @var bool Becomes true once run() has consumed its message. */
    public $isReady = false;

    /** @var float Initialised to 0.0; never modified by this class. */
    public $result = 0.0;

    /** @var int Initialised to 0; never modified by this class. */
    public $nSas = 0;

    /**
     * @param string $fifoFile Path of the named pipe to read from.
     */
    public function __construct($fifoFile) {
        $this->fifoFile = $fifoFile;
    }

    /**
     * Reads one line from the pipe, reports the number of decoded entries
     * and marks the listener as ready.
     *
     * Terminates via die() when the pipe cannot be opened.
     */
    public function run() {
        $pipe_read = fopen($this->fifoFile, 'r');
        if (!$pipe_read) {
            die('Error: Could not open the named pipe: '. posix_strerror(posix_errno()) . "\n");
        }

        $message = fgets($pipe_read);
        // Release the descriptor as soon as the message has been read.
        fclose($pipe_read);

        // fgets() yields false when nothing could be read and json_decode()
        // yields null for invalid JSON; only count a real array.
        $docs = ($message === false) ? null : json_decode($message, true);
        $nDocs = is_array($docs) ? count($docs) : 0;

        echo 'Received from the pipe: '. $nDocs . " docs\n";
        $this->isReady = true;
    }
}
// Create /tmp/fifo1 .. /tmp/fifo4 from scratch and bind one listener to each.
$listeners = array();
foreach (range(1, 4) as $pipeNumber) {
    $fifoFile = '/tmp/fifo' . $pipeNumber;

    // Remove whatever is left at that path before creating the pipe.
    if (file_exists($fifoFile)) {
        unlink($fifoFile);
    }

    $created = posix_mkfifo($fifoFile, 0700);
    if (!$created) {
        die('Error: Could not create a named pipe: '. posix_strerror(posix_errno()) . "\n");
    }

    $listeners[] = new PipeListener($fifoFile);
}

// Start the listeners only after all four pipes exist.
foreach ($listeners as $listener) {
    $listener->start();
}
// Query the collection and work out how many documents each of the four
// processes has to send.
$cursor = (new MongoClient())->database->collection->find();
$total = $cursor->count();
$rest = $total % 4;
// Whole-number share per segment. A plain "/" returns a float whenever the
// total is not a multiple of 4, which produces fractional skip offsets and
// makes the segments miss documents.
$nsas = (int) floor($total / 4);

// Fork a chain of three children. At every fork the parent keeps the current
// segment and stops forking; the child takes the next segment and carries on.
// The original process therefore ends up with segment 0 and the last child
// with segment 3.
$iSegment = 0;
$mainProcess = true;
for ($forkIndex = 0; $forkIndex < 3; $forkIndex++) {
    $pid = pcntl_fork();
    if ($pid === -1) {
        // -1 is truthy, so it must be told apart from a real child PID.
        die('Error: Could not fork: '. pcntl_strerror(pcntl_get_last_error()) . "\n");
    }
    if ($pid > 0) {
        break;
    }
    $mainProcess = false;
    $iSegment = $forkIndex + 1;
}

// Collect the "key" field of every document in this process's segment.
// The last segment additionally takes the remainder of the division.
$cursor->skip($nsas * $iSegment);
$quota = $nsas;
if ($iSegment == 3) {
    $quota = $nsas + $rest;
}
$message = array();
for ($idoc = 0; $idoc < $quota && $cursor->hasNext(); $idoc++) {
    $doc = $cursor->getNext();
    $message[] = $doc["key"];
}

// Send the segment as one JSON document through this process's pipe.
$fifoFile = '/tmp/fifo' . strval($iSegment + 1);
$pipe_write = fopen($fifoFile, 'w');
if (!$pipe_write) {
    die('Error: Could not open the named pipe: '. posix_strerror(posix_errno()) . "\n");
}
fwrite($pipe_write, json_encode($message));
fclose($pipe_write);
// Only the original process waits for the listeners and prints their fields.
if ($mainProcess) {
    echo "I am the main process\n";

    // Poll the listeners until every one has set its isReady flag. Sleeping
    // between polls keeps the wait from occupying a full CPU core.
    do {
        $allFinished = true;
        foreach ($listeners as $listener) {
            if (!$listener->isReady) {
                $allFinished = false;
                break;
            }
        }
        if (!$allFinished) {
            usleep(10000); // 10 ms
        }
    } while (!$allFinished);

    // Print the result and nSas fields of each listener, one per line.
    foreach ($listeners as $listener) {
        echo $listener->result . " " . $listener->nSas . "\n";
    }
}
?>