работа с очередями

81 views
Skip to first unread message

dkozhe...@hlcompany.ru

unread,
Oct 26, 2020, 7:39:45 AM10/26/20
to tarantool-ru
подскажите пожалуйста, как можно смоделировать такую задачу:
в данный момент есть rest api, обертка над очередями. клиенты вставляют в очередь свои данные, сервер висит на long-poling в цикле выгребает данные, обрабатывает и подтверждает обработку в очереди. все работает.
теперь в другой задаче, захотелось чтобы сервером обработки был тарантул, т.е. данные по rest api принимаем, а затем этот-же тарантул пишет содержимое очереди в базу данных.
не понятно, в контексте чего запускать бесконечный цикл обработки.
или порождать задание, после каждого запроса, которое очередь блокирует? как правильно сделать?

Спасибо.

Alexander Turenko

unread,
Oct 29, 2020, 2:34:25 PM10/29/20
to tarantool-ru
Навскидку:

Запускаем инстанс тарантула с queue ([1]). Берем http.server ([2]) и
накручиваем http api поверх take и ack. Бесконечным циклом в данном
случае будет являться event loop тарантула.


WBR, Alexander Turenko.

понедельник, 26 октября 2020 г. в 14:39:45 UTC+3, dkozhe...@hlcompany.ru:

dkozhe...@hlcompany.ru

unread,
Nov 2, 2020, 12:25:40 AM11/2/20
to tarantool-ru
возможно я чего-то не понял, но по моему сейчас это именно так и работает (т.е. вызов take происходит в контексте GET /docs  )
т.е. сейчас апи выглядит так:
POST /docs - добавляет элемент в очередь
GET /docs - цикл с таймаутом, в котором take. если take что-то нашел и закончился таймаут выдачи - выдаются данные, если нет, то ждем таймаут ожидания и потом возврашем что ничего не нашли. Данный метод в клиентском приложении опрашивает бесконечный цикл
PUT /docs/<q_id>?cmd=(ack/release) - вызывается, когда клиентское приложение сделало что-то полезное с тем, что ему отдал GET /docs

Как хочется переделать:

POST /docs - Добавляет события в очередь
и где-то в контексте тарантула крутится цикл с take /ack для обработки этой очереди.

или просто, ниже старта сервера ? а как корректно останов обработать?

local httpd = cfgserv()
httpd:start()

while true do
    local data = db.q_savedoc:take(timeToTimeout)
    --- обработка
end



четверг, 29 октября 2020 г. в 22:34:25 UTC+4, totkto...@gmail.com:

Alexander Turenko

unread,
Nov 2, 2020, 12:47:36 AM11/2/20
to tarantool-ru
Я уловил идею так: хочется завести фоновый воркер, который будет делать
некоторые действия над задачами из очереди.

Для этого нужно завести файбер ([1]), в котором крутить цикл с take:

fiber.create(function()
    while true do
        local task = queue.tube.foo:take()
        <...process task...>
    end
end)

Таких файберов можно завести несколько. Это имеет смысл, если обработка задачи
ждет IO (например, делается HTTP-запрос).

Если обработка задачи может выбросить ошибку, то стоит завести сторожевой
файбер (перезапускалку файбера, обрабатывающего задачи). Подсмотреть реализацию
такого подхода можно в expirationd ([2]), поискать по слову guardian.

[1]: https://www.tarantool.io/en/doc/latest/reference/reference_lua/fiber/
[2]: https://github.com/tarantool/expirationd/blob/master/expirationd.lua

WBR, Alexander Turenko.

понедельник, 2 ноября 2020 г. в 08:25:40 UTC+3, dkozhe...@hlcompany.ru:

dkozhe...@hlcompany.ru

unread,
Nov 2, 2020, 4:29:15 AM11/2/20
to tarantool-ru
Да, спасибо, файбер - это то что мне нужно. Как я раньше без него жил - не понятно.
Узнал много нового о тарантуле :)

понедельник, 2 ноября 2020 г. в 09:47:36 UTC+4, totkto...@gmail.com:
Reply all
Reply to author
Forward
0 new messages