Анализ и обработка данных средствами Tarantool

468 views
Skip to first unread message

Юрий Лосев

unread,
Apr 7, 2016, 11:30:00 AM4/7/16
to tarantool-ru
Добрый день.
Весьма заинтересовала данная система, и я попытался применить её для потока данных аккаунтинга с радиус-серверов.
Я не уверен, что правильно мыслю с точки зрения tarantool, потому решил испросить совета.
На данный момент эта система работает посредством классической СУБД, и работает весьма паршиво. Я начал переводить её на redis... и здесь вспомнил про tarantool, стало интересно, можно ли применить его в данной сфере. Ибо redis накладывает свои ограничения, в частности, либо все нужные данные нужно писать непосредственно с сервера, в разные "namespace", либо иметь, какой-то демон, который будет обрабатывать их и раскладывать как надо. А хочется всетаки иметь черный ящик, в который ты забросил данные, а они там обработались по заранее прописанной логике, которую к тому же можно изменить, что-то добавить или убрать налету.

Итак суть, есть поток данных вида:

uniqid | username | vlan | ip | status | timestamp | inputPackets | outputPackets .. и еще часть полей, на самом деле перечислять все нет смысла.

На каждого пользователя по сути приходят следующие записи:
1. Start запись (status = "Start")
2. Одна или несколько Update записей (status = "Update")
3. Stop запись (status = "Stop")

у них у всех uniqid одинаков, соответственно первичный индекс либо делать по uniqid+timestamp, либо что мне пока больше нравится, по сути апдейтить предыдущую запись, обновляя её поле status, тогда primary_index = uniqid

Это преамбула, а теперь собственно вопросы.

Мне нет нужды хранить все сессии пользователя в памяти, хотелось бы хранить допустим последние 3 сессии, а остальные вытеснять на диск.

1. Есть ли некий аналог LPUSH + LTRIM redis`а. Или я должен реализовать это через lua процедуру? По сути нужен кольцевой буффер,только без удаления. Стоит ли здесь использовать некий кастомный explorationd, или есть более подходящие инструменты.
2. Я не нашел в документации операции move между space. Правильно ли я понимаю, что я должен сначала сделать insert в sophia-space, а потом удалить данные из памяти?
3. Правильно ли я понимаю, что есть только низкоуровневые операции с индексами. То есть я не могу сделать выборку "not equal", или не могу допустим выбрать по двум одиночным индексам(по составному естественно могу), либо мне нужно выбрать сначала 1 индекс, потом второй, и самому искать их пересечения.
4. Также, как я понимаю, нет оператора ИЛИ для индексов, то есть я не могу реализовать что-то типа: "SELECT * FROM radacct WHERE status="Start" or status="Update"", опять же нужна lua процедура?
5. Нет операции для выборки только уникальных значений индекса? Допустим пользователи постоянно повторяются, и я хочу сделать чтото типа: "SELECT DISTINCT(username) FROM radacct WHERE vlan=...". Опять же нужно реализовывать через lua, итерируя по индексу `username`?


Хотелось бы также иметь какуюто статистику, её нельзя назвать realtime, скорее near-realtime, ибо задержка в несколько секунд неважна.
Например: Количество пользователей online в конкретном вилане. Или количество уникальных пользователей в вилане за неделю.

Не вижу смысла для этого хранить всех пользователей за неделю в памяти, ведь достаточно для этого сделать отдельный space, и заливать туда только vlan, username, timestamp допустим

Здесь я не понимаю, как построить процессинг. У меня есть space с исходными записями сессий (raw_acct), а дальше мне нужно както обработать каждую приходящую в него запись, и собрать по ним.
Что здесь лучше использовать? expirationd? И анализировать + перекладывать записи по нужным space`ам (online_users, 1week_users). Или есть что-то лучше?

Или здесь может быть лучше применить queue? Просто как я понимаю, в очередь ктото должен данные толкать. Не хотелось бы делать этого с радиус сервера, ибо он должен быстро записать данные, и обслуживать следующую запись. Нагрузка конечно копеечная, для таких систем (~1000 - 2000rps, может будет чуть больше, если мы сможем уменьшить таймаут, предыдущая система не справлялась с такими цифрами.), но не будет ли очередь тормозить этот процесс? Впринципе лаг в несколько секунд абсолютно не важен.
И с использованием очереди придется еще прогонять через нее каждое событие аккаунтинга, а не оперировать одной сущностью "сессия", которая просто может быть в разном статусе.

По-моему у меня еще были вопросы, но я получился по-моему и так чересчур многословен, за что прошу меня простить. Если прижмет, спрошу в дальнейшем. Просто нет ничего хуже, чем пытаться положить новую технологию на рельсы предыдущего опыта, поэтому я пытаюсь понять, правильно ли я подхожу к tarantool.

Можно тыкать моськой в документацию, где я что-то не нашел/пропустил.

Спасибо.

Konstantin Osipov

unread,
Apr 7, 2016, 12:03:28 PM4/7/16
to tarant...@googlegroups.com
* Юрий Лосев <kei...@gmail.com> [16/04/07 18:34]:
> Мне нет нужды хранить все сессии пользователя в памяти, хотелось бы хранить
> допустим последние 3 сессии, а остальные вытеснять на диск.

На диск - это куда? В другую базу, в sophia engine?

> 1. Есть ли некий аналог LPUSH + LTRIM redis`а. Или я должен реализовать это
> через lua процедуру? По сути нужен кольцевой буффер,только без удаления.

У таплов есть добавление поля в конец, и удаление из начала.
Только пока не понятно, как лучше представить отдельное событие -
как поле тапла, или как отдельный тапл. Какой размер события?
Сколько событий собираетесь хранить в памяти (3?)

> Стоит ли здесь использовать некий кастомный explorationd, или есть более
> подходящие инструменты.

Сейчас только в чате обсуждали как прикрутить expiraitond для
вытеснения в sophia engine. Так что это однозначно один из
вариантов.

Т.е. я вижу следующие принципиальные подходы:

1) Хранить только K последних событий. Всё в памяти, в момент
добавления K+1 события, самое старое удаляется. Хранимки на
Lua.

2) Хранить только K последних событий. Но чистку делать в
background, с помощью expirationd. Т.е. обходить все таплы в
background и удалять старые события.

3) То же самое что и 2), но старые события не удалять, а пушать в
sophia или в postgres/mysql.

2) и 3) делаются через expiraitond. Сделать ли индекс по id,
tstamp или только по id, зависит от размера события. Старайтесь
чтобы таплы в среднем были меньше 300-400 байт - это оптимально
для нас.

> 2. Я не нашел в документации операции move между space. Правильно ли я
> понимаю, что я должен сначала сделать insert в sophia-space, а потом
> удалить данные из памяти?

Да.

> 3. Правильно ли я понимаю, что есть только низкоуровневые операции с
> индексами. То есть я не могу сделать выборку "not equal", или не могу
> допустим выбрать по двум одиночным индексам(по составному естественно
> могу), либо мне нужно выбрать сначала 1 индекс, потом второй, и самому
> искать их пересечения.

Да.

> 4. Также, как я понимаю, нет оператора ИЛИ для индексов, то есть я не могу
> реализовать что-то типа: "SELECT * FROM radacct WHERE status="Start" or
> status="Update"", опять же нужна lua процедура?

Это в целом можно попробовать сделать с помощью bitset индекса.
Либо сделать чтобы update шёл после start и сделать status > x

> 5. Нет операции для выборки только уникальных значений индекса? Допустим
> пользователи постоянно повторяются, и я хочу сделать чтото типа: "SELECT
> DISTINCT(username) FROM radacct WHERE vlan=...". Опять же нужно
> реализовывать через lua, итерируя по индексу `username`?

Я бы сделал индекс vlan, username, т.к. WHERE vlan = есть. И вроде
этот индекс будет сам по себе уже уникальный. При поиске можно
задать частичный ключ (vlan).

> Хотелось бы также иметь какуюто статистику, её нельзя назвать realtime,
> скорее near-realtime, ибо задержка в несколько секунд неважна.
> Например: Количество пользователей online в конкретном вилане. Или
> количество уникальных пользователей в вилане за неделю.

Есть index:count().

> Не вижу смысла для этого хранить всех пользователей за неделю в памяти,
> ведь достаточно для этого сделать отдельный space, и заливать туда только
> vlan, username, timestamp допустим
>
> Здесь я не понимаю, как построить процессинг. У меня есть space с исходными
> записями сессий (raw_acct), а дальше мне нужно както обработать каждую
> приходящую в него запись, и собрать по ним.
> Что здесь лучше использовать? expirationd? И анализировать + перекладывать
> записи по нужным space`ам (online_users, 1week_users). Или есть что-то
> лучше?

Это зависит от характера нагрузки. expirationd подходит для
ситуаций когда можно что-то вычислить в background, и результат
сложить куда-то отдельно. Он экономит CPU в целом.
Если хватает CPU, можно всё вычислять в realtime, при обработке
входящего события - и такой код всегда проще поддерживать.

Вы сколько событий в секунду на инстанс хотите обрабатывать?

> Или здесь может быть лучше применить queue? Просто как я понимаю, в очередь
> ктото должен данные толкать. Не хотелось бы делать этого с радиус сервера,
> ибо он должен быстро записать данные, и обслуживать следующую запись.

> Нагрузка конечно копеечная, для таких систем (~1000 - 2000rps, может будет
> чуть больше, если мы сможем уменьшить таймаут, предыдущая система не
> справлялась с такими цифрами.), но не будет ли очередь тормозить этот
> процесс? Впринципе лаг в несколько секунд абсолютно не важен.
> И с использованием очереди придется еще прогонять через нее каждое событие
> аккаунтинга, а не оперировать одной сущностью "сессия", которая просто
> может быть в разном статусе.

Имхо при такой нагрузке можно пробовать все действия (кроме записи
в Софию-Постгрес) делать в реалтайме в хранимке обрабатывающей
событие.

> По-моему у меня еще были вопросы, но я получился по-моему и так чересчур
> многословен, за что прошу меня простить. Если прижмет, спрошу в дальнейшем.
> Просто нет ничего хуже, чем пытаться положить новую технологию на рельсы
> предыдущего опыта, поэтому я пытаюсь понять, правильно ли я подхожу к
> tarantool.

Очень похоже что мы как раз для вас. На чём-то ещё будете городить
адовый огород, у нас должно всё завестись в 300-400 строк на Lua.

> Можно тыкать моськой в документацию, где я что-то не нашел/пропустил.

Лучше конкретизировать вопросы. И нужно ещё понять сколько вообще
данных и каким железом хотите обойтись. Пока что видно только про
RPS, но неясно сколько всего users/vlan, сколько одно событие.

Надо бы оценить сколкьо памяти понадобится на хранение 1 события,
и за какой срок нужно хранить историю.

--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.org - www.twitter.com/kostja_osipov

Юрий Лосев

unread,
Apr 8, 2016, 3:54:11 AM4/8/16
to tarantool-ru


На диск - это куда? В другую базу, в sophia engine?

Хотелось бы здесь использовать sophia engine. Эти данные по сути нужны на всякий случай, и хранить бы их месяца 3. Если что-то понадбоится там найти - full scan сделать не проблема. Не хочется плодить еще баз для этого, по крайней мере пока.

У таплов есть добавление поля в конец, и удаление из начала.
Только пока не понятно, как лучше представить отдельное событие -
как поле тапла, или как отдельный тапл. Какой размер события?
Сколько событий собираетесь хранить в памяти (3?)

Примерно 150 000 . 50 000 пользователей (в пике может чуть больше), на каждого 3-5 последних событий. (можно пока считать что 3)



Т.е. я вижу следующие принципиальные подходы:

1) Хранить только K последних событий. Всё в памяти, в момент
   добавления K+1 события, самое старое удаляется. Хранимки на
   Lua.

А вот это через что у вас реализуется? Или нужно писать хранимку и подавать событие ей на вход, а уже в хранимке транзакционно проводить операцию по добавлению/удалению?
 
2) Хранить только K последних событий. Но чистку делать в
   background, с помощью expirationd. Т.е. обходить все таплы в
   background и удалять старые события.

3) То же самое что и 2), но старые события не удалять, а пушать в
   sophia или в postgres/mysql.

Вот 3ий вариант мне пока больше всего нравится. Только вопрос по expirationd:
А несколько задач expirationd могут работать параллельно? (по разным индексам/space`ам) . Каждая job`a в своем треде запускается?
Насколько много задач можно запустить? Сколько проца хватит?
Как часто они запускаются, и регулируется ли это как-то?
И на что указывает параметр "full_scan_time". Если я него неправильно укажу, что случится?


2) и 3) делаются через expiraitond. Сделать ли индекс по id,
   tstamp или только по id, зависит от размера события. Старайтесь
   чтобы таплы в среднем были меньше 300-400 байт - это оптимально
   для нас.

Увы, не знаю как в lua посчитать размер тапла. Я сделал такой тапл в python, то он ~150 байт.
 




> 5. Нет операции для выборки только уникальных значений индекса? Допустим
> пользователи постоянно повторяются, и я хочу сделать чтото типа: "SELECT
> DISTINCT(username) FROM radacct WHERE vlan=...". Опять же нужно
> реализовывать через lua, итерируя по индексу `username`?

Я бы сделал индекс vlan, username, т.к. WHERE vlan = есть. И вроде
этот индекс будет сам по себе уже уникальный. При поиске можно
задать частичный ключ (vlan).

Тут я видимо вопрос неправильно изложил. Я имел ввиду, что хочу получить все уникальные логины в space`е. Без повторений и прочих данных. Я понимаю что вопрос звучит глупо, применительно к индексам, скорее всего я должен сам писать хранимку для этого. Просто думалось, что может есть какой-нибудь метод unique(). Чтото типа: box.space.example.index.username:unique()
 


> Не вижу смысла для этого хранить всех пользователей за неделю в памяти,
> ведь достаточно для этого сделать отдельный space, и заливать туда только
> vlan, username, timestamp допустим
>
> Здесь я не понимаю, как построить процессинг. У меня есть space с исходными
> записями сессий (raw_acct), а дальше мне нужно както обработать каждую
> приходящую в него запись, и собрать по ним.
> Что здесь лучше использовать? expirationd? И анализировать + перекладывать
> записи по нужным space`ам (online_users, 1week_users). Или есть что-то
> лучше?

Это зависит от характера нагрузки. expirationd подходит для
ситуаций когда можно что-то вычислить в background, и результат
сложить куда-то отдельно. Он экономит CPU в целом.
Если хватает CPU, можно всё вычислять в realtime, при обработке
входящего события - и такой код всегда проще поддерживать.

Вы сколько событий в секунду на инстанс хотите обрабатывать?

Я думаю, если сравнивать с объемами, с которыми вы привыкли работать - то очень мало :) Сейчас всего событий в секунду ~200, могут быть всплески до ~500 при аварийных ситуациях. Правда эта цифра создана из-за ограничений РСУБД. Если все замкнуть на тарантул и уменьшить таймаут (что давало бы более хорошую статистику), то даже в худшем случае думаю в 1500-2000 уложимся. Это входящих событий. Я так понимаю, для tarantool - это ниочем, в 1 инстанс сможет справиться.
 

Имхо при такой нагрузке можно пробовать все действия (кроме записи
в Софию-Постгрес) делать в реалтайме в хранимке обрабатывающей
событие.

Мне тоже кажется, что должно быть легко. Просто не хочется старые данные хранить в оперативной памяти, она увы в текущий момент у нас весьма дифицитна :)
Я для интереса влил в тарантул некоторое количество событие. Сейчас 300 000 событий с парой индексов, занимают ~200Mb оперативки. Это собрано буквально за час. Если хранить в памяти события хотябы за неделю... получится очень непрактичное использование ресурсов (ладно бы по ним какуюто статистику собирать, а то ведь получается что её надо собрать только при получении сообщения).

Лучше конкретизировать вопросы. И нужно ещё понять сколько вообще
данных и каким железом хотите обойтись. Пока что видно только про
RPS, но неясно сколько всего users/vlan, сколько одно событие.

Надо бы оценить сколкьо памяти понадобится на хранение 1 события,
и за какой срок нужно хранить историю.

users = ~50000, ночью сильно меньше, днем примерно так.
vlan = ~10000.
Одно событие, я уже написал, в питоне ~150 байт. Могу дать пример:
"1c4f950e0312b43399290a0c89ba0c90", "0/1/0/3119.2415_D90EC019006F42F2", "192.168.1.1" ,"myuserlogin", "192.168.100.100", "1078.d22f.567d", "3119.2415", 0, 0, 0, "User-Error", "Stop", 1460100856

может парочка полей еще добавится. Но думаю 300 байт никак не превысить.
История: тут я уже писал. В памяти - 3-5 последних событий. На диске - месяца 3.

Железо...  тут как обычно, хочется уложить в наименьшие объемы,  по крайней мере по памяти. У нас свои сервера, поэтому выделить под это дело виртуалку (VMWare) - нет особых проблем.
 

Konstantin Osipov

unread,
Apr 8, 2016, 4:40:07 AM4/8/16
to tarant...@googlegroups.com
* Юрий Лосев <kei...@gmail.com> [16/04/08 11:20]:
> > У таплов есть добавление поля в конец, и удаление из начала.
> > Только пока не понятно, как лучше представить отдельное событие -
> > как поле тапла, или как отдельный тапл. Какой размер события?
> > Сколько событий собираетесь хранить в памяти (3?)
> > Примерно 150 000 . 50 000 пользователей (в пике может чуть больше), на
> каждого 3-5 последних событий. (можно пока считать что 3)

Я пока не могу дать гарантии что София достаточно стабильна. Мы
пытаемся запустить её на похожем проекте, но по-прежнему находим
определённые проблемы.
> > 1) Хранить только K последних событий. Всё в памяти, в момент
> > добавления K+1 события, самое старое удаляется. Хранимки на
> > Lua.
> >
> > А вот это через что у вас реализуется? Или нужно писать хранимку и
> подавать событие ей на вход, а уже в хранимке транзакционно проводить
> операцию по добавлению/удалению?

Именно так.

> Вот 3ий вариант мне пока больше всего нравится. Только вопрос по
> expirationd:
> А несколько задач expirationd могут работать параллельно? (по разным
> индексам/space`ам) . Каждая job`a в своем треде запускается?
> Насколько много задач можно запустить? Сколько проца хватит?

Могут. Каждая в своём файбере. Много - тысячи. Расходование проца
зависит от настройки задач - сколько времени на полный просмотр
индекса и сколько записи за 1 итерацию просматривается. Думаю,
можно подтюнить так что сильно жрать не будет.

> Как часто они запускаются, и регулируется ли это как-то?

Да, опции tuples_per_iteration и full_scan_time, здесь:

https://github.com/tarantool/expirationd/blob/master/expirationd.lua#L212

https://github.com/tarantool/expirationd/blob/master/expirationd.lua#L213

> И на что указывает параметр "full_scan_time". Если я него неправильно
> укажу, что случится?

Идея этого параметра такая: перед началом цикла просмотра спейса,
expiraitond берёт размер space', full_scan_time и прикидывает как
часто иму нужно просыпаться чтобы успеть весь спейс просмотреть за
full_scan_time секунд с учётом того что он просматривает за раз
только tuples_per_iteration записей.

> > 2) и 3) делаются через expiraitond. Сделать ли индекс по id,
> > tstamp или только по id, зависит от размера события. Старайтесь
> > чтобы таплы в среднем были меньше 300-400 байт - это оптимально
> > для нас.
> >
> > Увы, не знаю как в lua посчитать размер тапла. Я сделал такой тапл в
> python, то он ~150 байт.

Сохраните данные, и в Lua сделайте tuple:bsize()

Будет примерно столько же в Lua.
> >
> > > 5. Нет операции для выборки только уникальных значений индекса? Допустим
> > > пользователи постоянно повторяются, и я хочу сделать чтото типа: "SELECT
> > > DISTINCT(username) FROM radacct WHERE vlan=...". Опять же нужно
> > > реализовывать через lua, итерируя по индексу `username`?
> >
> > Я бы сделал индекс vlan, username, т.к. WHERE vlan = есть. И вроде
> > этот индекс будет сам по себе уже уникальный. При поиске можно
> > задать частичный ключ (vlan).
> >
> > Тут я видимо вопрос неправильно изложил. Я имел ввиду, что хочу получить
> все уникальные логины в space`е. Без повторений и прочих данных. Я понимаю
> что вопрос звучит глупо, применительно к индексам, скорее всего я должен
> сам писать хранимку для этого. Просто думалось, что может есть какой-нибудь
> метод unique(). Чтото типа: box.space.example.index.username:unique()

Нет, такого нет. Надо значит заводить отдельный space с логинами?

> Я думаю, если сравнивать с объемами, с которыми вы привыкли работать - то
> очень мало :) Сейчас всего событий в секунду ~200, могут быть всплески до
> ~500 при аварийных ситуациях. Правда эта цифра создана из-за ограничений
> РСУБД. Если все замкнуть на тарантул и уменьшить таймаут (что давало бы
> более хорошую статистику), то даже в худшем случае думаю в 1500-2000
> уложимся. Это входящих событий. Я так понимаю, для tarantool - это ниочем,
> в 1 инстанс сможет справиться.

Да, но может не хватить памяти. Т.е. важно ещё понять сколько
всего данных будет храниться.

> Я для интереса влил в тарантул некоторое количество событие. Сейчас 300 000
> событий с парой индексов, занимают ~200Mb оперативки. Это собрано буквально
> за час. Если хранить в памяти события хотябы за неделю... получится очень
> непрактичное использование ресурсов (ладно бы по ним какуюто статистику
> собирать, а то ведь получается что её надо собрать только при получении
> сообщения).

ОК, т.е. всего будет ок. 5 гигов оперативных данных (сутки) и 450
гигов данных за 3 месяца, это если события никак не агрегировать и
не сжимать.


> > Надо бы оценить сколкьо памяти понадобится на хранение 1 события,
> > и за какой срок нужно хранить историю.
> >
> > users = ~50000, ночью сильно меньше, днем примерно так.
> vlan = ~10000.
> Одно событие, я уже написал, в питоне ~150 байт. Могу дать пример:
> "1c4f950e0312b43399290a0c89ba0c90", "0/1/0/3119.2415_D90EC019006F42F2",
> "192.168.1.1" ,"myuserlogin", "192.168.100.100", "1078.d22f.567d",
> "3119.2415", 0, 0, 0, "User-Error", "Stop", 1460100856

ОК, при таком раскладе можно хранить каждое событие и как 1 поле тапла,
т.е. ключ userid, а можно как отдельный тапл, т.е. ключ userid,
tstamp. Как удобнее.

Юрий Лосев

unread,
Apr 8, 2016, 4:58:03 AM4/8/16
to tarantool-ru
Константин, а есть batch insert/delete?
Нельзя при итерации собрать временную таблицу, а потом отдать её на удаление целиком?

Konstantin Osipov

unread,
Apr 8, 2016, 5:43:37 AM4/8/16
to tarant...@googlegroups.com
* Юрий Лосев <kei...@gmail.com> [16/04/08 12:01]:
> Константин, а есть batch insert/delete?
> Нельзя при итерации собрать временную таблицу, а потом отдать её на
> удаление целиком?

В сетевом протоколе есть, в Lua - нет. Он особо ничего в Lua не
и не даёт, я пробовал добавлять в 1.6, получил 20-30% снижение по
CPU, оно того не стоило.


Если хотите снизить обращения из Lua в WAL, то делайте delete в одной
транзакции. Вся транзакция пишется в WAL в момент commit.
Reply all
Reply to author
Forward
0 new messages