PRECONDITION_FAILED - unknown delivery tag 1

3,395 views
Skip to first unread message

Reinier

unread,
Oct 22, 2015, 2:10:04 PM10/22/15
to rabbitmq-users
Tried for hours to solve this by myself, but I'm at a loss... I'm also very new at RabbitMQ, so probably it is the way I structured things. But I just can't figure it out...
Please, if you could point me in the right direction that would be greatly appreciated :)

I have one main producer that creates messages. And two consumers/workers, that consume messages.
The first consumer consumes the messages of the main producer. But it also creates the messages for the second consumer.

My problem is two fold:

1) The first worker takes in all the messages. And acknoledges them without having handled them. But it does actually handle them (it downloads a bunch of files)
2) The second worker fails after consuming all messages, with: "PRECONDITION_FAILED - unknown delivery tag 1". This one does not take all the messages from the queue, but handles them one by one. (it extracts the downloaded files)

I get the following error:

PHP Fatal error:  Uncaught exception 'PhpAmqpLib\Exception\AMQPProtocolChannelException' with message 'PRECONDITION_FAILED - unknown delivery tag 1' in C:\project\vendor\videlalvaro\php-amqplib\PhpAmqpLib\Channel\AMQPChannel.php:191
Stack trace:
#0 [internal function]: PhpAmqpLib\Channel\AMQPChannel->channel_close(Object(PhpAmqpLib\Wire\AMQPReader))
#1 C:\project\vendor\videlalvaro\php-amqplib\PhpAmqpLib\Channel\AbstractChannel.php(194): call_user_func(Array, Object(PhpAmqpLib\Wire\AMQPReader))
#2 C:\project\vendor\videlalvaro\php-amqplib\PhpAmqpLib\Channel\AbstractChannel.php(393): PhpAmqpLib\Channel\AbstractChannel->dispatch('20,40', '\x01\x96,PRECONDITION...', NULL)
#3 C:\project\extract-worker.php(66): PhpAmqpLib\Channel\AbstractChannel->wait()
#4 {main}
  thrown in C:\project\vendor\videlalvaro\php-amqplib\PhpAmqpLib\Channel\AMQPChannel.php on line 191

Have one main file that creates many messages using this:

$download_channel = $connection->channel();
$download_channel->exchange_declare('import-exchange', 'direct', false, true, false);
$download_queue_name = "download";
$download_channel->queue_declare($download_queue_name, false, true, false, false);
$exchange_name = "import-exchange";
$download_binding_key = $download_queue_name;
$download_channel->queue_bind($download_queue_name, $exchange_name, $download_binding_key);
$data = "";
$data['date'] = $year . "-" . $month . "-" . $day;
$data['year'] = $year;
$data['month'] = $month;
$data['day'] = $day;
$data['country'] = $country;
$data['resource'] = $resource;
$data['task'] = "download";
$data['download_url'] = $THING->getExternalResourceLocation( $resource, $year, $month, $day, $country );
$data = json_encode($data);
$msg = new AMQPMessage($data,
                        array('delivery_mode' => 2) # make message persistent
                      );
$task = "download";
$download_channel->basic_publish($msg, 'import-exchange', "download");
$download_channel->close();

Have two workers.

First worker file that consumes the messages like this:

$download_connection = new AMQPStreamConnection('192.168.99.100', 5672, 'guest', 'guest');
$download_channel = $download_connection->channel();
$download_channel->basic_qos(null, 1, null);

$queue_name = "download";
$download_channel->queue_declare($queue_name, false, true, false, false);

echo ' [*] Waiting for things to download. To exit press CTRL+C', "\n";

$callback = function($msg){
$THING = new ThingUtil();
$data = json_decode($msg->body, true);
$date = $data['date'];
$year = $data['year'];
$month = $data['month'];
$day = $data['day'];
$country = $data['country'];
$resource = $data['resource'];
$task = $data['task'];
$download_url = $data['download_url'];
echo "Downloading: " . $download_url . "\n";
  //echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
  if ( $THING->downloadDataFromResourceForSpecificDay( $resource, $year, $month, $day, $country ) == TRUE )
  {
  echo ' [v] ','Download complete', "\n\n";
  // send ack (acknowledge) tag with message
  $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
  // queue file to extract
  sendToExtractQueue( $resource, $year, $month, $day, $country);
  } else {
  echo ' [x] ','Download failed', "\n\n";
  }
};

$download_channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while(count($download_channel->callbacks)) {
$download_channel->wait();
}

$download_channel->close();
$download_connection->close();

function sendToExtractQueue( $resource, $year, $month, $day, $country )
{
$THING= new ThingUtil();
$extract_connection = new AMQPStreamConnection('192.168.99.100', 5672, 'guest', 'guest');

$extract_channel = $extract_connection->channel();

$extract_queue_name = "extract";
$extract_channel->queue_declare($extract_queue_name, false, true, false, false);

$exchange_name = "import-exchange";
$extract_binding_key = $extract_queue_name;
$extract_channel->queue_bind($extract_queue_name, $exchange_name, $extract_binding_key);

$extract_channel->exchange_declare('import-exchange', 'direct', false, true, false);

$data = "";
$data['date'] = $year . "-" . $month . "-" . $day;
$data['year'] = $year;
$data['month'] = $month;
$data['day'] = $day;
$data['country'] = $country;
$data['resource'] = $resource;
$data['task'] = "extract";
$data['file_to_extract'] = $THING->getLocalResourceLocation( $resource, $year, $month, $day, $country );
$data = json_encode($data);
$msg = new AMQPMessage($data,
                       array('delivery_mode' => 2) # make message persistent
                     );

$task = "extract";
$extract_channel->basic_publish($msg, 'distributor-import-exchange', "extract");

$extract_channel->close();
}

The Second consumes them like this:

$extract_connection = new AMQPStreamConnection('192.168.99.100', 5672, 'guest', 'guest');
$extract_channel = $extract_connection->channel();

$extract_queue_name = "extract";
$extract_channel->queue_declare($extract_queue_name, false, true, false, false);

echo ' [*] Waiting for things to extract... To exit press CTRL+C', "\n";

$callback = function($msg){
$Thing= new ThingUtil();
$data="";
$data = json_decode($msg->body, true);
$date = $data['date'];
$year = $data['year'];
$month = $data['month'];
$day = $data['day'];
$country = $data['country'];
$resource = $data['resource'];
$task = $data['task'];
$file_to_extract = $data['file_to_extract'];
echo "Extracting: " . $file_to_extract . "\n";

if ( $THING->extractResourceFileWithGzip( $resource, $year, $month, $day, $country ) == TRUE )
{
echo " [v] Extract complete\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
} else {
  echo ' [x] ','Extract failed', "\n\n";
}
};

// make it 'fair dispatch' the tasks
$extract_channel->basic_qos(null, 1, null);

$extract_channel->basic_consume($extract_queue_name, '', false, true, false, false, $callback);

while(count($extract_channel->callbacks)) {
$extract_channel->wait();
}

$extract_channel->close();
$extract_connection->close();

Please, if you could point me in the right direction that would be greatly appreciated :)

Michael Klishin

unread,
Oct 22, 2015, 2:12:00 PM10/22/15
to rabbitm...@googlegroups.com, Reinier
 On 22 Oct 2015 at 21:10:06, Reinier (reinie...@gmail.com) wrote:
> Uncaught exception 'PhpAmqpLib\Exception\AMQPProtocolChannelException'
> with message 'PRECONDITION_FAILED - unknown delivery tag 1'

This most often means you ack/reject a delivery more than once, or try acking/rejecting a delivery tag
that is not “known” to the channel.

Every delivery with manual acknowledgements mode must be acked/rejected only once and on
the same channel it was delivered on.
--
MK

Staff Software Engineer, Pivotal/RabbitMQ


Alvaro Videla

unread,
Oct 22, 2015, 2:13:55 PM10/22/15
to rabbitm...@googlegroups.com, Reinier
I would assume the problem happens because here:

$extract_channel->basic_consume($extract_queue_name, '', false, true, false, false, $callback);

the consumer is started in no_ack = true mode, ie: the server doesn't expect acks from this consumer. Also making this, disabled basic_qos on the line above.

So to fix it, set the no_ack flag to false. (yes, the name makes no sense, but that's how it's called at the protocol level).



--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To post to this group, send an email to rabbitm...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reinier

unread,
Oct 23, 2015, 4:27:07 AM10/23/15
to rabbitmq-users, reinie...@gmail.com
Yes, this much I knew. Just didn't understand why it was happening. Alvaro Videla gave me the insight. I thought queue_declare and basic_consume had to get the same values. It worked once I changed the basic_consume to false-false-false-false.

Thank you!


Op donderdag 22 oktober 2015 20:12:00 UTC+2 schreef Michael Klishin:

Reinier

unread,
Oct 23, 2015, 4:43:05 AM10/23/15
to rabbitmq-users, reinie...@gmail.com
Yes, you gave me the insight. I thought queue_declare and basic_consume had to get the same boolean values. It worked once I changed the basic_consume to false-false-false-false.

Thank you!

And thank you for writing the php-ampqplib! It is so much fun to play (work) with RabbitMQ.


Op donderdag 22 oktober 2015 20:13:55 UTC+2 schreef Alvaro Videla:
Reply all
Reply to author
Forward
0 new messages