[erlang-questions] Memory-effective queue handling


Attila Rajmund Nohl

Jul 24, 2012, 11:11:31 AM
to erlang-questions
Hello!

I have a data structure where I manage "historical data": a list of
timestamps when events happened. I only need to know how many events
happened in the last 24 hours. When an event happens, I put the
timestamp into the queue (I use the queue module in OTP). When I check
how many values are in the queue, I also remove the entries that are
older than 24 hours. My problem is: when I remove the old elements, I
can do it one by one and at each step the (possibly big) queue gets
copied and I run out of memory. Is there a cleverer way to remove
elements from queue? Or maybe I shall use an ordered_set ETS table
instead of a queue?
_______________________________________________
erlang-questions mailing list
erlang-q...@erlang.org
http://erlang.org/mailman/listinfo/erlang-questions

Dmitry Kolesnikov

Jul 24, 2012, 12:11:19 PM
to Attila Rajmund Nohl, erlang-questions
Hello,

I do not think it is a good idea to keep the whole 24h of statistics in the same queue.
For "cleanup" efficiency you could have a queue per time slot (e.g. per hour, per 30 min, etc.).
Instead of a queue, I am using a patched gb_trees for exactly the same purpose, but I am doing both reads and writes.

- Dmitry

Erik Søe Sørensen

Jul 24, 2012, 2:36:43 PM
to Attila Rajmund Nohl, erlang-questions


On 24/07/2012 17.11, "Attila Rajmund Nohl" <attila...@gmail.com> wrote:
>
> Hello!
>
> I have a data structure where I manage "historical data": a list of
> timestamps when events happened. I only need to know how many events
> happened in the last 24 hours. When an event happens, I put the
> timestamp into the queue (I use the queue module in OTP). When I check
> how many values are in the queue, I also remove the entries that are
> older than 24 hours. My problem is: when I remove the old elements, I
> can do it one by one and at each step the (possibly big) queue gets
> copied and I run out of memory.

I am curious: how do you manage to do this? Can we see the code?
(Certainly it shouldn't be the case that much copying should be done at each step.)

Ronny Meeus

Jul 24, 2012, 2:52:07 PM
to Erik Søe Sørensen, erlang-questions
A possibility is to count the number of elements that have to be
present in the new list by just iterating over it.
Once you know the number of elements, the queue can be split in 2.
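
A minimal sketch of this idea, assuming the queue holds {Date, Value} tuples as in the original post and that everything older than some Cutoff should go. The module name, function names and Cutoff parameter are made up for illustration. Note that queue:to_list/1 itself builds a list as long as the queue, so this is not necessarily cheaper in memory than dropping entries one by one.

```erlang
-module(split_prune).
-export([prune/2]).

%% Count the expired entries at the front of the queue, then split once.
%% Entries are assumed to be {Date, Value} tuples, oldest at the front.
prune(Q, Cutoff) ->
    Expired = count_expired(queue:to_list(Q), Cutoff, 0),
    %% queue:split/2 puts the first Expired (front) items in the first queue.
    {_Old, Fresh} = queue:split(Expired, Q),
    Fresh.

%% Walk from the oldest entry and stop at the first one that is young enough.
count_expired([{Date, _Value} | Rest], Cutoff, N) when Date < Cutoff ->
    count_expired(Rest, Cutoff, N + 1);
count_expired(_, _Cutoff, N) ->
    N.
```

queue:len/1 on the returned queue then gives the number of events still inside the window.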

----
Ronny

Knut Nesheim

Jul 24, 2012, 5:09:05 PM
to Attila Rajmund Nohl, erlang-questions
On Tue, Jul 24, 2012 at 5:11 PM, Attila Rajmund Nohl
<attila...@gmail.com> wrote:
> Hello!
>
> I have a data structure where I manage "historical data": a list of
> timestamps when events happened. I only need to know how many events
> happened in the last 24 hours.

Using an ordered_set ETS-table and iterating over the table to count
and delete events will be very fast and not explode your memory usage.
You can also enable compression on the table. ETS allows concurrent
writes which removes a potential disaster if you are using a single
process in charge of the queue (with many events, this process might
get overloaded).
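
A rough sketch of the ordered_set approach, not a tested production design. The module name, the table name and the {Date, Ref} key layout are assumptions for illustration; the reference from make_ref() is only there so that two events with the same Date do not overwrite each other.

```erlang
-module(ets_hist).
-export([new/0, add/2, count/2]).

%% One row per event; the key sorts by Date first.
new() ->
    ets:new(events, [ordered_set, public]).

add(Tab, Date) ->
    ets:insert(Tab, {{Date, make_ref()}}).

%% Delete everything older than Cutoff, then report how many rows are left.
count(Tab, Cutoff) ->
    prune(Tab, ets:first(Tab), Cutoff),
    ets:info(Tab, size).

%% In an ordered_set, ets:first/1 is the smallest key, i.e. the oldest event.
prune(_Tab, '$end_of_table', _Cutoff) ->
    ok;
prune(Tab, {Date, _Ref} = Key, Cutoff) when Date < Cutoff ->
    Next = ets:next(Tab, Key),
    ets:delete(Tab, Key),
    prune(Tab, Next, Cutoff);
prune(_Tab, _Key, _Cutoff) ->
    ok.
```

The traversal stops at the first entry that is young enough, so each call only touches the rows it actually deletes.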

However, if you are only interested in the count of events, maybe you
don't need to store all events in full? If you are ok with losing some
granularity, you can use one "bucket" per minute or second or whatever
for the last 24 hours. In the bucket you keep the count in that time
period, incrementing it for every event. Finding the current bucket
can be done using "Now rem NumBuckets" where Now has the granularity
you need. The count for the last 24 hours is the sum of all the
elements in the table.

If you use an ETS table to keep the count (ets:update_counter/3 is
very nice for this), you can reset the count by doing a decrement of
the current bucket every minute or second by fetching the current
count, then decrementing with that value (this would work even with
interleaved operations.) If you are able to use a single process in
charge of getting the count polling-style, you could copy the buckets
to a second table at every polling interval. The diff between the two
tables would be the number of events since last poll.

With this solution, the space required is determined by the
granularity. Finding the sum is a traversal away. Pruning can be
fiddly, but requires no CPU or memory intensive work. To me it sounds
good on paper, maybe there are some obstacles I didn't think of. Maybe
there would be very high write lock contention on the single element
to update, maybe ets:update_counter/3 still performs very well.
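
To make the bucket idea concrete, here is a small sketch under stated assumptions: Slot is the current time already divided down to the chosen granularity (e.g. minutes since some epoch), and something external calls reset/2 once whenever a new slot becomes current. The module and function names are made up for illustration.

```erlang
-module(bucket_count).
-export([new/1, hit/2, reset/2, total/1]).

%% One {Index, Count} row per bucket, all starting at zero.
new(NumBuckets) ->
    Tab = ets:new(buckets, [set, public]),
    ets:insert(Tab, [{I, 0} || I <- lists:seq(0, NumBuckets - 1)]),
    Tab.

%% Record one event in the bucket for Slot ("Now rem NumBuckets").
hit(Tab, Slot) ->
    ets:update_counter(Tab, Slot rem ets:info(Tab, size), 1).

%% Clear the stale count left in this bucket from the previous cycle.
%% Decrementing by the value just read keeps any hits that arrive
%% between the lookup and the update.
reset(Tab, Slot) ->
    Key = Slot rem ets:info(Tab, size),
    [{Key, Old}] = ets:lookup(Tab, Key),
    ets:update_counter(Tab, Key, -Old).

%% The count for the whole window is the sum over all buckets.
total(Tab) ->
    ets:foldl(fun({_Index, Count}, Sum) -> Sum + Count end, 0, Tab).
```

With one bucket per minute, new(1440) covers 24 hours in a fixed 1440 rows regardless of the event rate.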

For problems like these, pushing the requirements might be very
helpful. If you could do a good sample of the events, you can still
get an accurate answer when you ask for the count and keep your
current approach.

Regards
Knut

Richard O'Keefe

Jul 24, 2012, 8:58:34 PM
to Erik Søe Sørensen, erlang-questions
You have a collection of {Time_Stamp, Data} items covering the
last 24 hours. You want to add such pairs, and you also want
to remove old pairs of this kind.

Some kind of balanced search tree such as an AVL
tree or 2-3 tree can do this in O(lg n) time per
update, where n is the number of pairs in your
collection.

A very simple hack would be to divide a sequence into
some number of buckets where the first and last are
special, so we have
{Old,Middling,New}
New is a list running from newest to oldest within
the current bucket. To add a new event:

add_event(When, What, {Old,Middling,New}) ->
    {Old,Middling,[{When,What}|New]}.

Old is a list running from oldest to newest within
the oldest bucket.

To remove old events:

remove_old_events(Now, {Old,Middling,New}) ->
    Bound = <| however you calculate Now - 24 hours |>,
    {remove_expired(Bound, Old),Middling,New}.

remove_expired(Bound, [{When,_What}|Old])
  when When =< Bound ->
    remove_expired(Bound, Old);
remove_expired(_, Old) ->
    Old.

One extra step is to shift the buckets when the hour
strikes:

hourly_shift({_,{A,B,C,D,E,F,G,H,I,J,K,L,M,N,
                 O,P,Q,R,S,T,U,V,W,X},New}) ->
    {A,{B,C,D,E,F,G,H,I,J,K,L,M,N,
        O,P,Q,R,S,T,U,V,W,X,lists:reverse(New)},[]}.

To walk over the elements in increasing order:

foldl(Fun, Acc, {Old,{A,B,C,D,E,F,G,H,I,J,K,L,M,N,
                      O,P,Q,R,S,T,U,V,W,X},New}) ->
    Acc_0 = lists:foldl(Fun, Acc, Old),
    Acc_1 = lists:foldl(Fun, Acc_0, A),
    ...
    Acc_24 = lists:foldl(Fun, Acc_23, X),
    lists:foldl(Fun, Acc_24, lists:reverse(New)).

Whether this is a good idea depends on what else you want
to do. It also matters how many events you expect to be
in the data structure.

Oh yes, to keep track of how many events there are,
use
{Count,Old,Middling,New}
with the other changes to the code being obvious.
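
For what it is worth, one possible reading of those changes is sketched below; it is an interpretation, not necessarily what was intended. The module name is made up, remove_old_events/2 takes the already-computed Bound rather than Now, and hourly_shift/1 uses element/2, erlang:delete_element/2 and erlang:append_element/2 instead of spelling out all 24 variables, so it works for a Middling tuple of any size.

```erlang
-module(counted_buckets).
-export([add_event/3, remove_old_events/2, hourly_shift/1]).

%% State is {Count, Old, Middling, New}; Count is the number of stored events.
add_event(When, What, {Count, Old, Middling, New}) ->
    {Count + 1, Old, Middling, [{When, What} | New]}.

%% Bound is the cut-off time (e.g. Now minus 24 hours), computed by the caller.
remove_old_events(Bound, {Count, Old, Middling, New}) ->
    {Removed, Old1} = remove_expired(Bound, Old, 0),
    {Count - Removed, Old1, Middling, New}.

remove_expired(Bound, [{When, _What} | Old], N) when When =< Bound ->
    remove_expired(Bound, Old, N + 1);
remove_expired(_Bound, Old, N) ->
    {N, Old}.

%% The previous Old bucket is discarded, so its remaining events leave Count.
hourly_shift({Count, Old, Middling, New}) ->
    Oldest = element(1, Middling),
    Rest = erlang:delete_element(1, Middling),
    {Count - length(Old), Oldest,
     erlang:append_element(Rest, lists:reverse(New)), []}.
```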

Attila Rajmund Nohl

Jul 25, 2012, 5:57:14 AM
to Erik Søe Sørensen, erlang-questions
2012/7/24 Erik Søe Sørensen <eri...@gmail.com>:
>
> On 24/07/2012 17.11, "Attila Rajmund Nohl" <attila...@gmail.com> wrote:
>
>
>>
>> Hello!
>>
>> I have a data structure where I manage "historical data": a list of
>> timestamps when events happened. I only need to know how many events
>> happened in the last 24 hours. When an event happens, I put the
>> timestamp into the queue (I use the queue module in OTP). When I check
>> how many values are in the queue, I also remove the entries that are
>> older than 24 hours. My problem is: when I remove the old elements, I
>> can do it one by one and at each step the (possibly big) queue gets
>> copied and I run out of memory.
> I am curious: how do you manage to do this? Can we see the code?
> (Certainly it shouldn't be the case that much copying should be done at each
> step.)

I can show an excerpt from the code:

increase_counter(#ch{queue=Q, counter_value=CV}=CH, Value) ->
    Date = ?MODULE:get_date(),
    NewQueue = queue:in({Date, Value}, Q),
    CH#ch{queue=NewQueue, counter_value=CV+Value}.

get_value(#ch{hist_len=HL}=CH) ->
    % get the values from the queue until we reach a first element in the queue
    % which is not older than the hist_len (e.g. not older than 15 mins)
    Date = ?MODULE:get_date(),
    get_younger_than(CH, Date-HL).

get_younger_than(#ch{queue=Q, counter_value=CV}=CH, TargetDate) ->
    case queue:out(Q) of
        {empty, _} ->
            0 = CV, % this is an assert, the value shall be 0
            {CH, 0};
        {{value, {Date, Value}}, NewQ} when Date < TargetDate ->
            get_younger_than(CH#ch{queue=NewQ, counter_value=CV-Value}, TargetDate);
        {{value, _}, _} ->
            % the currently removed entry is too young, so leave that in the
            % queue (i.e. use the parameter queue).
            {CH, CV}
    end.

The copying is done in the queue:out call, because it returns not just
the first value, but the rest of the queue (NewQ) too.

Richard Carlsson

Jul 25, 2012, 8:38:07 AM
to erlang-q...@erlang.org, Attila Rajmund Nohl
You'd only run out of space if the queue is so long that you don't have
memory enough to call lists:reverse() on the current in-half of the
queue (and after that, the old queue will immediately become garbage).
Can that really be the case?

But you should be able to improve the behaviour of the above code by
using queue:peek(Q) instead of queue:out(Q), and then calling
queue:drop(Q) in the second case clause only, so no rewriting of the
queue representation is done unless necessary.
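
A minimal sketch of that peek-then-drop loop, assuming {Date, Value} entries as in the excerpt above. Since queue:in/2 adds at the rear and queue:out/1 removes from the front, the oldest entry is the front one, so the sketch uses the front-end functions queue:peek/1 and queue:drop/1. The module name is made up, and the counter_value bookkeeping from the original record is left out to keep it short.

```erlang
-module(queue_prune).
-export([prune/2]).

%% Drop entries older than Cutoff from the front and return the pruned queue.
prune(Q, Cutoff) ->
    case queue:peek(Q) of
        {value, {Date, _Value}} when Date < Cutoff ->
            %% Only now is the queue actually modified.
            prune(queue:drop(Q), Cutoff);
        _ ->
            %% Either the queue is empty or the front entry is young enough.
            Q
    end.
```

Returning the pruned queue lets the caller store it back in its state, so the next call does not start again from the same old entries.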

/Richard

Erik Søe Sørensen

Jul 25, 2012, 9:01:32 AM
to Attila Rajmund Nohl, erlang-questions

That doesn't copy the entire queue. That is, occasionally it recreates the *spine* of a new list the length of the entire queue, but normally it does way less than that.
The problem is that the way your code works, that worst-case is every time, and the throwing-away of old entries is also done from the start each time: your get_value() does not return the pruned queue.
A further consequence of that is that Richard's statement doesn't hold: because you keep a reference (apparently) to the original, unpruned queue, the memory will have to hold both list spines simultaneously.
So I'd suggest you change the return value of get_value.

Richard Carlsson

Jul 25, 2012, 9:21:00 AM
to Erik Søe Sørensen, erlang-questions
I might be a bit tired, but where does he still hold a reference to the
original queue (unless somewhere outside the shown code)? And if so, is
the problem really that he can't hold 2 copies of the queue at the same
time? - In that case he's pushing the limits anyway. As far as I can
see, get_value() tail calls to get_younger_than(), so no references are
kept there, and get_younger_than() keeps tailcalling itself with the
updated queue in its #ch-record, dropping the previous queue. The final
#ch-record gets returned to the caller. Only the result of the last call
to queue:out() does any wasted rewriting.

/Richard

Attila Rajmund Nohl

Jul 25, 2012, 9:44:35 AM
to Richard Carlsson, erlang-q...@erlang.org
2012/7/25 Richard Carlsson <carlsson...@gmail.com>:
[...]
> You'd only run out of space if the queue is so long that you don't have
> memory enough to call lists:reverse() on the current in-half of the queue
> (and after that, the old queue will immediately become garbage). Can that
> really be the case?

It might happen. According to the crash dump the process in question uses
Stack+heap: 228065700

words of memory. I store a tuple of bigint and a small integer in the
queue, and the queue is essentially a list (actually two lists, but
doesn't matter), so one entry in the queue costs 2+3(?)+1+1=7 words.
The process has some other stuff in its state, but that should be
negligible. The Erlang VM ran for less than 13 hours. If I calculate
correctly, this means that there should be around 600 events per
second throughout the 13 hours to reach this amount of memory. After some
internal conversation I think it is possible that due to an unrelated
bug I do get this many events.

> But you should be able to improve the behaviour of the above code by using
> queue:peek(Q) instead of queue:out(Q), and then calling queue:drop(Q) in
> the second case clause only, so no rewriting of the queue representation is
> done unless necessary.

Thanks.

Richard Carlsson

Jul 25, 2012, 10:18:06 AM
to Attila Rajmund Nohl, erlang-q...@erlang.org
On 07/25/2012 03:44 PM, Attila Rajmund Nohl wrote:
> 2012/7/25 Richard Carlsson <carlsson...@gmail.com>:
> [...]
>> You'd only run out of space if the queue is so long that you don't have
>> memory enough to call lists:reverse() on the current in-half of the queue
>> (and after that, the old queue will immediately become garbage). Can that
>> really be the case?
>
> It might happen. According to the crash dump the process in question uses
> Stack+heap: 228065700
>
> words of memory. I store a tuple of bigint and a small integer in the
> queue, and the queue is essentially a list (actually two lists, but
> doesn't matter), so one entry in the queue costs 2+3(?)+1+1=7 words.
> The process has some other stuff in its state, but that should be
> negligible. The Erlang VM ran for less than 13 hours. If I calculate
> correctly, this means that there should be around 600 events per
> second throughout the 13 hours to reach this amount of memory. After some
> internal conversation I think it is possible that due to an unrelated
> bug I do get this many events.

Probably what you should use for a case like this is an ordered_set ets
table, with the date as key and ets:first() to pick the oldest entry. The
queue module is nondestructive and is nice in many ways, but it relies
on amortized behaviour, and huge queues like yours make the intermittent
periods of heavy internal work stick out like a sore thumb. The
reversing of enormous lists isn't any good for responsiveness either,
even if you had the memory. If you want to stick with a functional
implementation, the gb_trees module will work pretty well, using
gb_trees:smallest() to peek at the oldest entry.
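
A small sketch of the gb_trees variant, under some assumptions: the key is {Date, Seq}, where Seq is a hypothetical caller-supplied sequence number that keeps events with the same Date apart, and the stored value is just a placeholder. The module and function names are made up for illustration.

```erlang
-module(tree_hist).
-export([new/0, add/3, prune/2]).

new() ->
    gb_trees:empty().

%% gb_trees:insert/3 crashes on a duplicate key, hence the {Date, Seq} key.
add(Date, Seq, Tree) ->
    gb_trees:insert({Date, Seq}, true, Tree).

%% Remove entries older than Cutoff, peeking at the smallest key each time.
prune(Tree, Cutoff) ->
    case gb_trees:is_empty(Tree) of
        true ->
            Tree;
        false ->
            case gb_trees:smallest(Tree) of
                {{Date, _Seq}, _Val} when Date < Cutoff ->
                    {_Key, _Value, Rest} = gb_trees:take_smallest(Tree),
                    prune(Rest, Cutoff);
                _ ->
                    Tree
            end
    end.
```

gb_trees:size/1 on the pruned tree gives the number of events left in the window.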

/Richard

Erik Søe Sørensen

Jul 26, 2012, 4:10:33 AM
to Richard Carlsson, erlang-questions

And I may be assuming too much by thinking that get_value is called by a server loop which keeps holding on to the queue. Sorry I wasn't explicit about that; if it wasn't so, then what I said didn't make much sense.
(It's vacation time and email-by-phone time, so my editing facilities are a bit limited...)
