Tried for hours to solve this by myself, but I'm at a loss... I'm also very new at RabbitMQ, so probably it is the way I structured things. But I just can't figure it out...
I have one main producer that creates messages. And two consumers/workers, that consume messages.
The first consumer consumes the messages of the main producer. But it also creates the messages for the second consumer.
1) The first worker takes in all the messages. And acknoledges them without having handled them. But it does actually handle them (it downloads a bunch of files)
2) The second worker fails after consuming all messages, with: "PRECONDITION_FAILED - unknown delivery tag 1". This one does not take all the messages from the queue, but handles them one by one. (it extracts the downloaded files)
PHP Fatal error: Uncaught exception 'PhpAmqpLib\Exception\AMQPProtocolChannelException' with message 'PRECONDITION_FAILED - unknown delivery tag 1' in C:\project\vendor\videlalvaro\php-amqplib\PhpAmqpLib\Channel\AMQPChannel.php:191
Stack trace:
#0 [internal function]: PhpAmqpLib\Channel\AMQPChannel->channel_close(Object(PhpAmqpLib\Wire\AMQPReader))
#1 C:\project\vendor\videlalvaro\php-amqplib\PhpAmqpLib\Channel\AbstractChannel.php(194): call_user_func(Array, Object(PhpAmqpLib\Wire\AMQPReader))
#2 C:\project\vendor\videlalvaro\php-amqplib\PhpAmqpLib\Channel\AbstractChannel.php(393): PhpAmqpLib\Channel\AbstractChannel->dispatch('20,40', '\x01\x96,PRECONDITION...', NULL)
#3 C:\project\extract-worker.php(66): PhpAmqpLib\Channel\AbstractChannel->wait()
#4 {main}
thrown in C:\project\vendor\videlalvaro\php-amqplib\PhpAmqpLib\Channel\AMQPChannel.php on line 191
$download_channel = $connection->channel();
$download_channel->exchange_declare('import-exchange', 'direct', false, true, false);
$download_queue_name = "download";
$download_channel->queue_declare($download_queue_name, false, true, false, false);
$exchange_name = "import-exchange";
$download_binding_key = $download_queue_name;
$download_channel->queue_bind($download_queue_name, $exchange_name, $download_binding_key);
$data['date'] = $year . "-" . $month . "-" . $day;
$data['country'] = $country;
$data['resource'] = $resource;
$data['task'] = "download";
$data['download_url'] = $THING->getExternalResourceLocation( $resource, $year, $month, $day, $country );
$data = json_encode($data);
$msg = new AMQPMessage($data,
array('delivery_mode' => 2) # make message persistent
$download_channel->basic_publish($msg, 'import-exchange', "download");
$download_channel->close();
$download_connection = new AMQPStreamConnection('192.168.99.100', 5672, 'guest', 'guest');
$download_channel = $download_connection->channel();
$download_channel->basic_qos(null, 1, null);
$queue_name = "download";
$download_channel->queue_declare($queue_name, false, true, false, false);
echo ' [*] Waiting for things to download. To exit press CTRL+C', "\n";
$callback = function($msg){
$THING = new ThingUtil();
$data = json_decode($msg->body, true);
$country = $data['country'];
$resource = $data['resource'];
$download_url = $data['download_url'];
echo "Downloading: " . $download_url . "\n";
//echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
if ( $THING->downloadDataFromResourceForSpecificDay( $resource, $year, $month, $day, $country ) == TRUE )
echo ' [v] ','Download complete', "\n\n";
// send ack (acknowledge) tag with message
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
sendToExtractQueue( $resource, $year, $month, $day, $country);
echo ' [x] ','Download failed', "\n\n";
$download_channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while(count($download_channel->callbacks)) {
$download_channel->wait();
$download_channel->close();
$download_connection->close();
function sendToExtractQueue( $resource, $year, $month, $day, $country )
$extract_connection = new AMQPStreamConnection('192.168.99.100', 5672, 'guest', 'guest');
$extract_channel = $extract_connection->channel();
$extract_queue_name = "extract";
$extract_channel->queue_declare($extract_queue_name, false, true, false, false);
$exchange_name = "import-exchange";
$extract_binding_key = $extract_queue_name;
$extract_channel->queue_bind($extract_queue_name, $exchange_name, $extract_binding_key);
$extract_channel->exchange_declare('import-exchange', 'direct', false, true, false);
$data['date'] = $year . "-" . $month . "-" . $day;
$data['country'] = $country;
$data['resource'] = $resource;
$data['task'] = "extract";
$data['file_to_extract'] = $THING->getLocalResourceLocation( $resource, $year, $month, $day, $country );
$data = json_encode($data);
$msg = new AMQPMessage($data,
array('delivery_mode' => 2) # make message persistent
$extract_channel->basic_publish($msg, 'distributor-import-exchange', "extract");
$extract_channel->close();
$extract_connection = new AMQPStreamConnection('192.168.99.100', 5672, 'guest', 'guest');
$extract_channel = $extract_connection->channel();
$extract_queue_name = "extract";
$extract_channel->queue_declare($extract_queue_name, false, true, false, false);
echo ' [*] Waiting for things to extract... To exit press CTRL+C', "\n";
$callback = function($msg){
$data = json_decode($msg->body, true);
$country = $data['country'];
$resource = $data['resource'];
$file_to_extract = $data['file_to_extract'];
echo "Extracting: " . $file_to_extract . "\n";
if ( $THING->extractResourceFileWithGzip( $resource, $year, $month, $day, $country ) == TRUE )
echo " [v] Extract complete\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
echo ' [x] ','Extract failed', "\n\n";
// make it 'fair dispatch' the tasks
$extract_channel->basic_qos(null, 1, null);
$extract_channel->basic_consume($extract_queue_name, '', false, true, false, false, $callback);
while(count($extract_channel->callbacks)) {
$extract_channel->wait();
$extract_channel->close();
$extract_connection->close();