Server-side is a standalone PHP daemon.
The problem seemingly happens at random; I have no sure way to reproduce it except to give the daemon a long, repetitive task and wait for it to break. As I said above, this is the latest observed behavior: the server-side PHP daemon sends the response to RabbitMQ (as observed via tcpdump), but RabbitMQ doesn't deliver it to the client-side PHP. The client-side PHP then times out after the 45 seconds I have configured.
require $apipath . '/vendor/autoload.php';
use
PhpAmqpLib\Connection\AMQPStreamConnection,
PhpAmqpLib\Message\AMQPMessage;
// NOTE(review): AMQP_DEBUG is presumably read by php-amqplib to enable its
// protocol debug output -- confirm against the installed library version.
define('AMQP_DEBUG', true);

// One connection for the whole daemon. The credentials and endpoint come from
// the project's conf() helper, which is called with a comma-separated key list.
$_ = conf('rabbitmq.credentials.user, rabbitmq.credentials.password, rabbitmq.host, rabbitmq.port');
list($user, $pwd, $host, $port) = $_;

// Positional arguments are labelled per the php-amqplib 2.x constructor
// signature -- verify the labels if the library is upgraded.
$connection = new AMQPStreamConnection(
    $host,
    $port,
    $user,
    $pwd,
    '/',        // vhost
    false,      // insist
    'AMQPLAIN', // login method
    null,       // login response
    'en_US',    // locale
    3.0,        // connection timeout
    180.0,      // read/write timeout
    null,       // stream context
    false,      // keepalive
    0           // heartbeat
);

echo 'Connected to rabbitmq' . PHP_EOL;
// Consumer callback for incoming command requests.
//
// Picks the command class from the request's routing key, decodes the request
// body as JSON, runs the command's execute() method with the decoded value and
// publishes the JSON-encoded result back through the 'shell' exchange. The
// reply is routed with the request's reply_to value and carries the request's
// correlation_id. The request is acknowledged only after the reply has been
// published.
//
// A routing key that is not recognised is answered with ['invalid command', 1]
// and nothing is executed.
//
// $req is the delivered message object handed over by basic_consume(); this
// callback reads its body, delivery_info and get() accessor.
$RunCallback = function($req) {
    // Publishes $data as the reply to $req, then acknowledges $req.
    $Publish = function($data) use($req) {
        print "Command ".$req->body." Starting Delivery ".time()."\n";
        // Reply on the same channel the request was delivered on.
        $channel = $req->delivery_info['channel'];
        $arguments = ['correlation_id' => $req->get('correlation_id')];
        // NOTE(review): json_encode() returns false when $data cannot be
        // encoded (for example invalid UTF-8 in command output); the reply
        // body would then not be valid JSON. Confirm how the client copes.
        $msg = new AMQPMessage(json_encode($data), $arguments);
        $channel->basic_publish($msg, 'shell', $req->get('reply_to'));
        print "Command ".$req->body." basic_published at ".time()."\n";
        // Ack after publishing so the request is not acknowledged before a
        // reply has been handed to the channel.
        $channel->basic_ack($req->delivery_info['delivery_tag']);
        print "Command ".$req->body." basic_ack at ".time()."\n";
    };

    $routing_key = $req->get('routing_key');
    switch ($routing_key) {
        case 'command.run':
            $RunClass = '\Polaris\\Shell\\Run\\Command';
            break;
        default:
            // Unknown command: reply with an error tuple and stop here.
            $Publish(['invalid command', 1]);
            return;
    }

    // Resolve execute() before decoding so a missing class or method fails
    // before any "Starting Command" output is printed.
    $Method = new \ReflectionMethod($RunClass, 'execute');
    // NOTE(review): json_decode() yields null for a malformed body, which is
    // then passed to execute() unchanged.
    $args = json_decode($req->body, true);
    print "Starting Command ".$req->body." at ".time()."\n";
    $data = $Method->invokeArgs(new $RunClass, [$args]);
    print "Command ".$req->body." finished at ".time()."\n";

    // send back result
    $Publish($data);
};
// Queue name => binding key on the 'shell' exchange.
$queues = ['command_run_queue' => 'command.run'];

// One channel carries every consumer registered below.
$channel = $connection->channel();
// Second argument is the prefetch count in php-amqplib's basic_qos(); the
// callback acks each request itself once its reply has been published.
$channel->basic_qos(null, 1, null);
$channel->exchange_declare('shell', 'direct');

// Declare each queue, bind it to the exchange and attach the shared callback.
foreach ($queues as $name => $key) {
    $channel->queue_declare($name, false, false, false, false);
    $channel->queue_bind($name, 'shell', $key);
    $channel->basic_consume($name, '', false, false, false, false, $RunCallback);
}

// Keep dispatching deliveries for as long as the channel has registered
// consumer callbacks.
while (count($channel->callbacks) > 0) {
    $channel->wait();
}

// Tear down in reverse order of creation: channel first, then the connection.
$channel->close();
$connection->close();