Implementation of delayed queue for PHP AMQP

490 views
Skip to first unread message

Yan Cheng Cheok

unread,
Mar 24, 2017, 3:39:32 AM3/24/17
to rabbitmq-users

recently, I did a quick implementation on producer/ consumer queue system.



    <?php
   
namespace Queue;
   
   
use PhpAmqpLib\Connection\AMQPStreamConnection;
   
use PhpAmqpLib\Message\AMQPMessage;
   
use PhpAmqpLib\Wire\AMQPTable;    

   
class Amqp
   
{
       
private $connection;
       
private $queueName;
       
private $delayedQueueName;
       
private $channel;
       
private $callback;
   
       
public function __construct($host, $port, $login, $password, $queueName)
       
{
            $this
->connection = new AMQPStreamConnection($host, $port, $login, $password);
            $this
->queueName = $queueName;
            $this
->delayedQueueName = null;
            $this
->channel = $this->connection->channel();
           
// First, we need to make sure that RabbitMQ will never lose our queue.
           
// In order to do so, we need to declare it as durable. To do so we pass
           
// the third parameter to queue_declare as true.
            $this
->channel->queue_declare($queueName, false, true, false, false);
       
}
   
       
public function __destruct()
       
{
            $this
->close();
       
}
   
       
// Just in case : http://stackoverflow.com/questions/151660/can-i-trust-php-destruct-method-to-be-called
       
// We should call close explicitly if possible.
       
public function close()
       
{
           
if (!is_null($this->channel)) {
                $this
->channel->close();
                $this
->channel = null;
           
}
   
           
if (!is_null($this->connection)) {
                $this
->connection->close();
                $this
->connection = null;
           
}
       
}
   
       
public function produceWithDelay($data, $delay)
       
{
           
if (is_null($this->delayedQueueName))
           
{
                $delayedQueueName
= $this->queueName . '.delayed';
   
               
// First, we need to make sure that RabbitMQ will never lose our queue.
               
// In order to do so, we need to declare it as durable. To do so we pass
               
// the third parameter to queue_declare as true.
                $this
->channel->queue_declare($this->delayedQueueName, false, true, false, false, false,
                   
new AMQPTable(array(
                       
'x-dead-letter-exchange' => '',
                       
'x-dead-letter-routing-key' => $this->queueName
                   
))
               
);
   
                $this
->delayedQueueName = $delayedQueueName;
           
}
   
            $msg
= new AMQPMessage(
                $data
,
                array
(
                   
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                   
'expiration' => $delay
               
)
           
);
   
            $this
->channel->basic_publish($msg, '', $this->delayedQueueName);
       
}
   
       
public function produce($data)
       
{
            $msg
= new AMQPMessage(
                $data
,
                array
('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
           
);
   
            $this
->channel->basic_publish($msg, '', $this->queueName);
       
}
   
       
public function consume($callback)
       
{
            $this
->callback = $callback;
   
           
// This tells RabbitMQ not to give more than one message to a worker at
           
// a time.
            $this
->channel->basic_qos(null, 1, null);
   
           
// Requires ack.
            $this
->channel->basic_consume($this->queueName, '', false, false, false, false, array($this, 'consumeCallback'));
   
           
while(count($this->channel->callbacks)) {
                $this
->channel->wait();
           
}
       
}
   
       
public function consumeCallback($msg)
       
{
            call_user_func_array
(
                $this
->callback,
                array
($msg)
           
);
   
           
// Very important to ack, in order to remove msg from queue. Ack after
           
// callback, as exception might happen in callback.
            $msg
->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
       
}
   
       
public function getQueueSize()
       
{
           
// three tuple containing (<queue name>, <message count>, <consumer count>)
            $tuple
= $this->channel->queue_declare($this->queueName, false, true, false, false);
           
if ($tuple != null && isset($tuple[1])) {
               
return $tuple[1];
           
}
           
return -1;
       
}
   
}





public function produce and public function consume pair works as expected.


However, when it comes with delayed queue system


public function produceWithDelay and public function consume pair doesn't work as expected. The consumer which calls consume, not able to receive any item, even waiting for some period of time.


I believe something not right with my produceWithDelay implementation. May I know what's wrong is that?

dfed...@pivotal.io

unread,
Mar 24, 2017, 7:29:05 AM3/24/17
to rabbitmq-users
Message expiration guarantees that a message won't be sent to consumers after an expiration, but does not guarantee that it will be routed to dead letter exchange right after the expiration. Expiry is triggered by queue activity (publishes/consumes). So it's not supposed to be used as a delayed message mechanism. Although it can be used like this, but there is not guarantee that message will be expired if queue is not active.

There is a delayed message exchange https://github.com/rabbitmq/rabbitmq-delayed-message-exchange but it also have some limitations (described in README).
Reply all
Reply to author
Forward
0 new messages