%%% @copyright (C) 2013, Sergey Prokhorov
%%% @doc
%%% Pub-Sub queue in which each unique element (key) can be enqueued only once.
%%% Useful for recursive website scraping.
%%% TODO: add 'namespaces'
%%% @end
%%% Created : 5 Nov 2013 by Sergey Prokhorov <me****seriyps.ru>
-module(ec_uniq_queue).
-behaviour(gen_server).
-compile([{parse_transform, lager_transform}]).
-export([start_link/1, listen/0, push/1, push_all/1, ack/1,
status/0, reset/0, reset/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%% gen_server state of the queue.
%% NOTE(review): the bare built-in types queue() and dict() are not available
%% on OTP 18+ (queue:queue() / dict:dict() are used there) -- confirm the
%% targeted OTP release before touching the type annotations.
-record(state,
%% ETS table that stores a {Key} tuple for every key pushed so far
{known_tab :: ets:tid(),
%% {Key, Payload} jobs that have not been handed to a listener yet
jobs_queue :: queue(),
%% pids that called listen/0 and are waiting to be given a job
listeners :: [pid()],
%% monitor reference -> job that was sent to a worker and is not acked yet
unacked :: dict()}).
-define(SERVER, ?MODULE).
-define(TBL, awd_queue_known_tab).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
%% @doc Start the queue server and register it locally under ?SERVER.
%% `Params' is passed unchanged to init/1.
start_link(Params) ->
    ServerName = {local, ?SERVER},
    gen_server:start_link(ServerName, ?MODULE, Params, []).
%% @doc Subscribe the calling process to one task.
%% The task arrives asynchronously as a message of the form
%% `{ec_uniq_queue_msg, Ref, Key, Payload}'.
listen() ->
    Subscriber = self(),
    gen_server:cast(?SERVER, {listen, Subscriber}).
%% @doc Acknowledge that the task identified by `Ref' was processed
%% successfully. `Ref' is the reference delivered with the task message.
ack(Ref) ->
    Request = {ack, Ref},
    gen_server:cast(?SERVER, Request).
%% @doc Push a single task `{UniqKey, Value}'; UniqKey is probably a
%% normalized URL. Returns `false' when the key is already known,
%% otherwise `ok'.
-spec push({any(), any()}) -> ok | false.
push(Item) ->
    Batch = [Item],
    push_all(Batch).
%% @doc Push a batch of `{Key, Value}' tasks.
%% Keys already present in the known-keys table are filtered out in the
%% calling process; only the remaining items are cast to the server.
%% Returns `false' when nothing is left to push, `ok' otherwise.
-spec push_all([{Key::any(), Value::any()}]) -> ok | false.
push_all([]) ->
    false;
push_all(Items) ->
    case store_filter_unknown(Items, ?TBL) of
        [] ->
            false;
        Fresh ->
            gen_server:cast(?SERVER, {push_all, Fresh})
    end.
%% @doc Synchronously fetch the server counters as a proplist with the
%% keys `n_jobs', `n_listeners', `n_known' and `n_unacked'.
status() ->
    Request = status,
    gen_server:call(?SERVER, Request).
%% @doc Reset pending jobs, waiting listeners and the known-keys table.
reset() ->
    Everything = [jobs, listeners, known],
    reset(Everything).
%% @doc Asynchronously reset the selected parts of the server state.
-spec reset([jobs | listeners | known]) -> ok.
reset(What) ->
    Request = {reset, What},
    gen_server:cast(?SERVER, Request).
%% Callbacks
%% @doc gen_server init callback.
%% `Params' is an option list. The only recognised option is
%% `{init_items, Items}', a list of `{Key, Payload}' jobs enqueued at
%% start-up. The option is optional (defaults to no items), so
%% `start_link([])' works and additional options are ignored.
init(Params) when is_list(Params) ->
    Items = proplists:get_value(init_items, Params, []),
    %% the known-keys table must exist before the initial items are stored
    Empty = store_init(),
    {ok, store_push_all(Items, Empty)}.
%% @doc gen_server call callback.
%% `status' replies with a proplist of counters; every other request is
%% answered with `ok' and leaves the state untouched.
handle_call(status, _From, State) ->
    Keys = [n_jobs, n_listeners, n_known, n_unacked],
    Report = lists:map(fun(Key) -> {Key, store_status(Key, State)} end, Keys),
    {reply, Report, State};
handle_call(_Other, _From, State) ->
    {reply, ok, State}.
%% @doc gen_server cast callback.
%% Dispatches push_all / listen / ack / reset requests to the matching
%% store_* helper; unknown messages are ignored.
handle_cast(Msg, State) ->
    NewState =
        case Msg of
            {push_all, Items} ->
                store_push_all(Items, State);
            {listen, Who} ->
                store_listen(Who, State);
            {ack, Ref} ->
                %% XXX: maybe ack should do listen automatically?
                store_ack(Ref, State);
            {reset, What} ->
                store_reset(What, State);
            _Unknown ->
                State
        end,
    {noreply, NewState}.
%% @doc gen_server info callback.
%% A 'DOWN' message from a monitored worker is passed to
%% store_worker_down/2; `dump' triggers store_dump/1; anything else is
%% dropped.
handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, State) ->
    {noreply, store_worker_down(MonitorRef, State)};
handle_info(dump, State) ->
    %% TODO: implement
    store_dump(State),
    {noreply, State};
handle_info(_Unexpected, State) ->
    {noreply, State}.
%% @doc gen_server terminate callback; performs no cleanup.
terminate(_Why, _FinalState) ->
    ok.
%% @doc gen_server code_change callback; the state is carried over as is.
code_change(_FromVsn, State, _Extra) ->
    {ok, State}.
%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------
%% Create the named, protected ETS set of known keys and return an empty
%% state: no jobs, no listeners, nothing unacked.
store_init() ->
    TableOpts = [named_table,
                 protected,
                 set,
                 {keypos, 1},
                 {read_concurrency, true}],
    KnownTab = ets:new(?TBL, TableOpts),
    #state{known_tab = KnownTab,
           jobs_queue = queue:new(),
           listeners = [],
           unacked = dict:new()}.
%% Apply every requested reset, in list order, to the state:
%%   jobs      - drop all queued jobs
%%   listeners - forget all waiting listeners
%%   known     - empty the known-keys ETS table
store_reset(What, State) ->
    ResetOne =
        fun(jobs, Acc) ->
                Acc#state{jobs_queue = queue:new()};
           (listeners, Acc) ->
                Acc#state{listeners = []};
           (known, #state{known_tab = Tid} = Acc) ->
                ets:delete_all_objects(Tid),
                Acc
        end,
    lists:foldl(ResetOne, State, What).
%% Keep only those `{Key, Value}' items whose Key has no entry in the ETS
%% table `Tab'. The relative order of the remaining items is preserved.
store_filter_unknown(Items, Tab) ->
    IsUnknown =
        fun({Key, _}) ->
                case ets:lookup(Tab, Key) of
                    [] -> true;
                    [_] -> false
                end
        end,
    lists:filter(IsUnknown, Items).
%% Register the keys of `Items' as known and append the items to the job
%% queue, then hand jobs to waiting listeners.
%%
%% The batch is first de-duplicated by key (first occurrence wins):
%% ets:insert_new/2 only checks keys against the table, so without this
%% step a batch containing the same key twice would enqueue that key twice.
%%
%% If some key is already in the table (another push got there first),
%% nothing is inserted; the batch is filtered against the table and the
%% push is retried with the remaining items.
store_push_all(Items, #state{jobs_queue=Q}=S) ->
    Uniq = store_uniq_by_key(Items),
    case ets:insert_new(?TBL, [{Key} || {Key, _} <- Uniq]) of
        true ->
            NewQ = queue:join(Q, queue:from_list(Uniq)),
            store_check_invariant(S#state{jobs_queue=NewQ});
        false ->
            lager:warning("Refine items in store: ~p", [length(Uniq)]),
            store_push_all(store_filter_unknown(Uniq, ?TBL), S)
    end.

%% Drop items whose key already occurred earlier in the list, keeping the
%% original order. Keys are compared by exact match.
store_uniq_by_key(Items) ->
    Step =
        fun({Key, _}=Item, {Acc, Seen}) ->
                case sets:is_element(Key, Seen) of
                    true -> {Acc, Seen};
                    false -> {[Item | Acc], sets:add_element(Key, Seen)}
                end
        end,
    {RevUniq, _Seen} = lists:foldl(Step, {[], sets:new()}, Items),
    lists:reverse(RevUniq).
%% Put `Who' at the head of the listener list and immediately try to hand
%% out a queued job.
store_listen(Who, #state{listeners=Waiting}=State) ->
    Subscribed = State#state{listeners=[Who | Waiting]},
    store_check_invariant(Subscribed).
%% Mark the job identified by monitor reference `Ref' as done: stop
%% monitoring the worker and forget the job.
%%
%% `Ref' comes straight from a client cast, so it is only passed to
%% demonitor when it is a reference this server actually handed out;
%% an unknown, stale or malformed Ref is ignored instead of crashing the
%% server. The `flush' option also removes a 'DOWN' message for this
%% monitor that may already be in the mailbox.
store_ack(Ref, #state{unacked=Unacked}=S) ->
    case dict:is_key(Ref, Unacked) of
        true ->
            true = erlang:demonitor(Ref, [flush]),
            S#state{unacked=dict:erase(Ref, Unacked)};
        false ->
            S
    end.
%% A monitored worker went down. If it still had an unacked job, put that
%% job back at the front of the queue and try to dispatch it again;
%% unknown references leave the state unchanged.
store_worker_down(Ref, #state{unacked=Pending, jobs_queue=Jobs}=State) ->
    case dict:is_key(Ref, Pending) of
        false ->
            State;
        true ->
            Job = dict:fetch(Ref, Pending),
            Requeued = State#state{unacked=dict:erase(Ref, Pending),
                                   jobs_queue=queue:in_r(Job, Jobs)},
            store_check_invariant(Requeued)
    end.
%% Pair waiting listeners with queued jobs until one of the two runs out.
%% Each dispatched job is sent to the listener as
%% `{ec_uniq_queue_msg, Ref, Key, Payload}', where Ref is a fresh monitor
%% on the listener, and is remembered in `unacked' under that Ref.
store_check_invariant(#state{listeners=[]}=State) ->
    State;
store_check_invariant(#state{listeners=[Pid | Waiting],
                             jobs_queue=Jobs,
                             unacked=Pending}=State) ->
    case queue:out(Jobs) of
        {empty, _} ->
            State;
        {{value, {Key, Payload}=Job}, RestJobs} ->
            case is_process_alive(Pid) of
                true ->
                    Ref = erlang:monitor(process, Pid),
                    Pid ! {ec_uniq_queue_msg, Ref, Key, Payload},
                    Dispatched = State#state{listeners=Waiting,
                                             jobs_queue=RestJobs,
                                             unacked=dict:store(Ref, Job, Pending)},
                    store_check_invariant(Dispatched);
                false ->
                    %% Optimization only: skip listeners that are already
                    %% dead. The job stays queued; the monitor would catch
                    %% a dead worker anyway.
                    store_check_invariant(State#state{listeners=Waiting})
            end
    end.
%% Write the known-keys table and the job queue to files.
%% NOTE(review): both file names are still empty placeholders, so this
%% looks like an unfinished stub; the `ok =' matches fail if either write
%% returns an error -- confirm before sending `dump' to a live server.
store_dump(#state{jobs_queue=Jobs}) ->
    KnownFile = "",
    JobsFile = "",
    ok = ets:tab2file(?TBL, KnownFile),
    Serialized = term_to_binary(Jobs),
    ok = file:write_file(JobsFile, Serialized).
%% Return one counter of the server state:
%%   n_known     - number of entries in the known-keys ETS table
%%   n_jobs      - number of jobs waiting in the queue
%%   n_listeners - number of waiting listeners
%%   n_unacked   - number of dispatched jobs that are not acked yet
store_status(n_known, #state{known_tab=Tid}) ->
    %% ets:info/2 reads the single item instead of building the whole
    %% info proplist
    ets:info(Tid, size);
store_status(n_jobs, #state{jobs_queue=Q}) ->
    queue:len(Q);
store_status(n_listeners, #state{listeners=L}) ->
    length(L);
store_status(n_unacked, #state{unacked=Unacked}) ->
    dict:size(Unacked).
-ifdef(TEST).
%% A worker that dies while holding an unacked job gets that job requeued.
store_monitoring_test() ->
    Initial = store_init(),
    Worker = spawn(fun() ->
                           receive stop -> ok end
                   end),
    %% one waiting listener, no jobs yet
    Listening = store_listen(Worker, Initial),
    ?assertEqual(1, store_status(n_listeners, Listening)),
    %% two jobs pushed: the first goes to the listener, the second stays queued
    Dispatched = store_push_all([{1, 1}, {2, 2}], Listening),
    io:format(user, "~p~n", [Dispatched]),
    ?assertEqual(0, store_status(n_listeners, Dispatched)),
    ?assertEqual(2, store_status(n_known, Dispatched)),
    ?assertEqual(1, store_status(n_unacked, Dispatched)),
    ?assertEqual(1, store_status(n_jobs, Dispatched)),
    %% stop the worker without an ack and wait for the monitor to fire
    Worker ! stop,
    Ref = receive {'DOWN', MonitorRef, process, Worker, _Info} -> MonitorRef end,
    Requeued = store_worker_down(Ref, Dispatched),
    %% the unacked job is back in the queue
    ?assertEqual(0, store_status(n_listeners, Requeued)),
    ?assertEqual(2, store_status(n_known, Requeued)),
    ?assertEqual(0, store_status(n_unacked, Requeued)),
    ?assertEqual(2, store_status(n_jobs, Requeued)).
-endif.