Go и RabbitMQ - как вычитывать сообщения без блокировки?

690 views
Skip to first unread message

Денис Новосибирский

unread,
Oct 20, 2015, 8:01:21 AM10/20/15
to Golang Russian
Здравствуйте!
Есть у нас PHP-скрипт, вычитывающий сообщения из пачки очередей RabbitMQ. Я решил написать его аналог на Go, чтобы познакомиться с эти языком поближе. Для чтения из кролика использую вот эту либу https://github.com/streadway/amqp (других вроде и нет). Столкнулся с такой проблемой: либа построена таким образом, что при чтении из очереди мы вынуждены читать из go-канала. При этом если сообщений нет, то выполнение кода блокируется. Мне же нужно при отсутствии сообщений в очереди как-то узнавать об этом и завершать процесс чтения (не имеет значения, запущен он в го-рутине или в основном потоке).


        q, err := ch.QueueDeclare(
            "queue_name", // name
            true,         // durable
            false,        // delete when unused
            false,        // exclusive
            false,        // no-wait
            nil,          // arguments
        )
        if err != nil {
            continue
        }
        msgs, err := ch.Consume(
            q.Name, // queue
            "",     // consumer
            false,  // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        if err != nil {
            continue
        }
        for d := range msgs {
           // Здесь получаем блокировку. Запуск в go-рутине не решает проблему, т.к. всё равно нужно знать, есть сообщения в очереди или нет.
        }


Alexey Palazhchenko

unread,
Oct 20, 2015, 8:02:03 AM10/20/15
to gola...@googlegroups.com
select с default: https://tour.golang.org/concurrency/6

20 окт. 2015 г., в 14:54, Денис Новосибирский <telyuk...@gmail.com> написал(а):

--
Вы получили это сообщение, поскольку подписаны на группу "Golang Russian".
Чтобы отменить подписку на эту группу и больше не получать от нее сообщения, отправьте письмо на электронный адрес golang-ru+...@googlegroups.com.
Чтобы настроить другие параметры, перейдите по ссылке https://groups.google.com/d/optout.

Александр Карасев

unread,
Oct 20, 2015, 8:05:16 AM10/20/15
to gola...@googlegroups.com
Или можно использовать метод QueueInspect, в котором можно посмотреть количество сообщений в канале
https://godoc.org/github.com/streadway/amqp#Channel.QueueInspect


20 окт. 2015 г., в 15:02, Alexey Palazhchenko <alexey.pa...@gmail.com> написал(а):
Message has been deleted

Vladimir Suchkov

unread,
Oct 20, 2015, 10:32:06 AM10/20/15
to Golang Russian

    for{

        select{

            case d:=<-msgs:

            // есть сообщение

            default:

            // нет сообщения

        }

    }





вторник, 20 октября 2015 г., 15:01:21 UTC+3 пользователь Денис Новосибирский написал:

Денис Новосибирский

unread,
Oct 21, 2015, 1:59:30 AM10/21/15
to Golang Russian
Select с default пробовал - почему-то выполнение кода проходит в секцию default даже в том случае, когда сообщения есть в очереди. Возможно, либа написана так, что они вычитываются как-то асинхронно и default успевает отработать.

А вот вариант с методом QueueInspect, пожалуй, то, что надо. Спасибо.

Silent

unread,
Oct 21, 2015, 3:58:24 AM10/21/15
to Golang Russian
Select с default пробовал - почему-то выполнение кода проходит в секцию default даже в том случае, когда сообщения есть в очереди. Возможно, либа написана так, что они вычитываются как-то асинхронно и default успевает отработать.

Выполнение кода проходит в секцию default потому как итерация цикла априори быстрее чем получение данных (сообщения) из сервиса rabbitmq.
Код с QueueInspect будет корректен только в случае если в очередь больше никто не будет ничего класть, либо по окончании цикла еще раз смотреть, если ли сообщения.
Можно сделать вместо default ветку с таймаутом, вот так
for {
 
select {
 
case d := <-msgs:
   
// есть сообщение

 
case <-time.After(200 * time.Millisecond):
   
// нет сообщения
 
}
}

А еще лучше не тащить из php эту архитектуру "скрипт запускается по крону", а сделать демоном, пусть себе висит на блокировке канала, все равно ж все сообщения надо обработать

Денис Новосибирский

unread,
Oct 21, 2015, 9:59:39 AM10/21/15
to Golang Russian
Про daemon понятно, вероятно, так и придётся делать. Но, на мой взгляд, это навязывание архитектуры. А если, к примеру, нужно вычитывать сообщения по требованию и точно известно, что в ближайшие сутки никто туда писать не будет?

Artem Andreenko

unread,
Oct 21, 2015, 7:25:03 PM10/21/15
to gola...@googlegroups.com
А просто посмотреть len(msgs)?

2015-10-21 16:59 GMT+03:00 Денис Новосибирский <telyuk...@gmail.com>:
Про daemon понятно, вероятно, так и придётся делать. Но, на мой взгляд, это навязывание архитектуры. А если, к примеру, нужно вычитывать сообщения по требованию и точно известно, что в ближайшие сутки никто туда писать не будет?

--

Vladimir Suchkov

unread,
Oct 22, 2015, 4:15:59 AM10/22/15
to Golang Russian, mio...@gmail.com
    if len(msg) > 0 { // вот здесь длина будет 0, а в канал уже что то упало
        d := <-msg
    }


четверг, 22 октября 2015 г., 2:25:03 UTC+3 пользователь Artem Andreenko написал:

Artem Andreenko

unread,
Oct 22, 2015, 9:41:42 AM10/22/15
to Vladimir Suchkov, Golang Russian
Странно, вот пример:

Alex Lurye

unread,
Oct 22, 2015, 9:49:05 AM10/22/15
to gola...@googlegroups.com, Vladimir Suchkov
Что странного? Вывод этой программы не определен. Он зависит от того, успеет ли дочерняя горутина считать данные (0) или нет (1).

Artem Andreenko

unread,
Oct 22, 2015, 10:11:18 AM10/22/15
to gola...@googlegroups.com, Vladimir Suchkov
Я дал альтернативное решение для вопроса топик стартера:

"... Столкнулся с такой проблемой: либа построена таким образом, что при чтении из очереди мы вынуждены читать из go-канала. При этом если сообщений нет, то выполнение кода блокируется. Мне же нужно при отсутствии сообщений в очереди как-то узнавать об этом и завершать процесс чтения (не имеет значения, запущен он в го-рутине или в основном потоке)."

Vladimir Suchkov

unread,
Oct 22, 2015, 11:55:38 AM10/22/15
to Golang Russian, 0x3e...@gmail.com, mio...@gmail.com
http://play.golang.org/p/PFlv0h5RZO

четверг, 22 октября 2015 г., 16:41:42 UTC+3 пользователь Artem Andreenko написал:

Artem Andreenko

unread,
Oct 22, 2015, 12:28:38 PM10/22/15
to Vladimir Suchkov, Golang Russian


2015-10-22 18:55 GMT+03:00 Vladimir Suchkov <0x3e...@gmail.com>:

Вы же сами закомментировали логирование момента, когда len > 0. В чем суть проблемы то?

Vladimir Suchkov

unread,
Oct 22, 2015, 12:40:41 PM10/22/15
to Golang Russian, 0x3e...@gmail.com, mio...@gmail.com
Суть проблемы в том, что "check: 1" программа может написать, а может не написать
вот вам более веселый пример: http://play.golang.org/p/sh2EfbJ8Kv

четверг, 22 октября 2015 г., 19:28:38 UTC+3 пользователь Artem Andreenko написал:

Artem Andreenko

unread,
Oct 22, 2015, 12:43:15 PM10/22/15
to Vladimir Suchkov, Golang Russian
Вы, наверное, шутите ;-)

Vladimir Suchkov

unread,
Oct 22, 2015, 12:55:01 PM10/22/15
to Golang Russian, 0x3e...@gmail.com, mio...@gmail.com
Если сделать так http://play.golang.org/p/u54I67r9gg и запустить локально, а не в плэйграунде, то будет через раз зацикливается 
;)

четверг, 22 октября 2015 г., 19:43:15 UTC+3 пользователь Artem Andreenko написал:

Artem Andreenko

unread,
Oct 22, 2015, 4:52:03 PM10/22/15
to Vladimir Suchkov, Golang Russian
Я не вижу в чем проблема? Хотите сказать, что функция len(chan) работает неверно?

Vladimir Suchkov

unread,
Oct 23, 2015, 7:58:28 AM10/23/15
to Golang Russian, 0x3e...@gmail.com, mio...@gmail.com
Хочу сказать, что len(chan) не совсем подходит для проверки канала на отсутствие данных.

четверг, 22 октября 2015 г., 23:52:03 UTC+3 пользователь Artem Andreenko написал:
Reply all
Reply to author
Forward
0 new messages