Обход дерева в Эрланг.

175 views
Skip to first unread message

Evgen Polivoda

unread,
Mar 24, 2015, 5:45:05 PM3/24/15
to erlang-...@googlegroups.com
Здравствуйте друзья, 
озадачился немного вопросом. Как правильно сделать обход дерева и получить при этом результат используя все преимущества Эрланга.
Например. Генератор Sitemap сайта.
Задаем начальные условия - стартовать с такого то URL и глубина такая то. 
Классно сделать парсинг каждой страницы отдельным процессом - будет быстро и круто.
Но если парсинг страницы  - это отдельный процесс, то куда передавать результат(ссылки)? в отдельный процесс для результатов или вышестоящему процессу который меня родил(если он конечно не умер)?
Парсинг получается каскадным, одна страница за собой тянет еще n страниц. А как мне узнать когда собственно парсинг закончится, потому как по факту запускается тысяча процессов, отрабатывают свое и уходят.

То есть вот такой затык с логикой, прошу прощения за сумбурность.  Задача для меня представляет чисто академический интерес.
 

Sergey Prokhorov

unread,
Mar 24, 2015, 6:21:51 PM3/24/15
to erlang-...@googlegroups.com
Я делал паука, который рекурсивно собирал все страницы с сайта. Делал фиксироваанное количество воркеров + ETS таблица для контроля дубликатов,  очередь в отдельном процессе (обычная очередь из модуля queue).
Воркер забирает из очереди URL - этот URL кладется в dict "таски в работе". Процесс очереди вешает на воркера монитор. Если воркер упал, то этот URL из дикта добавляется обратно в очередь. После обработки страницы воркер берет все найденные на ней URL-ы, нормализует их, отсеивает по ETS таблице те, которые уже известны и добавляет новые в очередь. Потом забирает из очереди следующее задание.

среда, 25 марта 2015 г., 0:45:05 UTC+3 пользователь Evgen Polivoda написал:

Sergey Prokhorov

unread,
Mar 24, 2015, 6:27:53 PM3/24/15
to erlang-...@googlegroups.com
А, впрочем, скину сам код этой очереди:

%%% @author Sergey Prokhorov <me****seriyps.ru>
%%% @copyright (C) 2013, Sergey Prokhorov
%%% @doc
%%% Pub-Sub queue, where each uniq element can enter only once.
%%% Useful for recursive website scraping.
%%% TODO: add 'namespaces'
%%% @end
%%% Created :  5 Nov 2013 by Sergey Prokhorov <me****seriyps.ru>

-module(ec_uniq_queue).
-behaviour(gen_server).
-compile([{parse_transform, lager_transform}]).

-export([start_link/1, listen/0, push/1, push_all/1, ack/1,
         status/0, reset/0, reset/1]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).


-record(state,
        {known_tab :: ets:tid(),
         jobs_queue :: queue(),
         listeners :: [pid()],
         unacked :: dict()}).

-define(SERVER, ?MODULE).
-define(TBL, awd_queue_known_tab).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.

start_link(Params) ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, Params, []).

% subscribe to new task
listen() ->
    gen_server:cast(?SERVER, {listen, self()}).

% acknowlege, that task was done successfully
ack(Ref) ->
    gen_server:cast(?SERVER, {ack, Ref}).

% push new task {UniqKey, Value}. UniqKey is probably normalized URL
-spec push({any(), any()}) -> ok | false.
push(Item) ->
    push_all([Item]).

-spec push_all([{Key::any(), Value::any()}]) -> ok | false.
push_all([]) ->
    false;
push_all(Items) ->
    Items1 = store_filter_unknown(Items, ?TBL),
    case Items1 of
        [] -> false;
        _ ->
            gen_server:cast(?SERVER, {push_all, Items1})
    end.

status() ->
    gen_server:call(?SERVER, status).


reset() ->
    reset([jobs, listeners, known]).

-spec reset([jobs | listeners | known]) -> ok.
reset(What) ->
    gen_server:cast(?SERVER, {reset, What}).


%% Callbacks
init([{init_items, Items}]) ->
    S1 = store_init(),
    S2 = store_push_all(Items, S1),
    {ok, S2}.

handle_call(status, _From, State) ->
    Resp =  [{Key, store_status(Key, State)}
               || Key <- [n_jobs,
                          n_listeners,
                          n_known,
                          n_unacked]],
    {reply, Resp, State};
handle_call(_Request, _From, State) ->
    {reply, ok, State}.

handle_cast({push_all, Items}, State) ->
    State1 = store_push_all(Items, State),
    {noreply, State1};
handle_cast({listen, Who}, State) ->
    State1 = store_listen(Who, State),
    {noreply, State1};
handle_cast({ack, Ref}, State) ->
    State1 = store_ack(Ref, State),             %XXX: maybe ack should do listen automatically?
    {noreply, State1};
handle_cast({reset, What}, State) ->
    State1 = store_reset(What, State),
    {noreply, State1};
handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info(dump, State) ->
    %% TODO: implement
    store_dump(State),
    {noreply, State};
handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, State) ->
    State1 = store_worker_down(MonitorRef, State),
    {noreply, State1};
handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------

store_init() ->
    Tid = ets:new(?TBL, [named_table,
                         protected,
                         set,
                         {keypos, 1},
                         {read_concurrency, true}]),
    #state{known_tab=Tid,
           jobs_queue=queue:new(),
           listeners=[],
           unacked=dict:new()}.

store_reset([jobs | Reset], S) ->
    store_reset(Reset, S#state{jobs_queue=queue:new()});
store_reset([listeners | Reset], S) ->
    store_reset(Reset, S#state{listeners=[]});
store_reset([known | Reset], #state{known_tab=Tid}=S) ->
    ets:delete_all_objects(Tid),
    store_reset(Reset, S);
store_reset([], S) ->
    S.

store_filter_unknown([], _Tab) ->
    [];
store_filter_unknown([{Key, _}=Item | Items], Tab) ->
    case ets:lookup(Tab, Key) of
        [] ->
            [Item | store_filter_unknown(Items, Tab)];
        [_] ->
            store_filter_unknown(Items, Tab)
    end.

store_push_all(Items, #state{jobs_queue=Q}=S) ->
    case ets:insert_new(?TBL, [{Key} || {Key, _} <- Items]) of
        true ->
            NewItems = queue:from_list(Items),
            NewQ = queue:join(Q, NewItems),
            store_check_invariant(S#state{jobs_queue=NewQ});
        false ->
            lager:warning("Refine items in store: ~p", [length(Items)]),
            store_push_all(store_filter_unknown(Items, ?TBL), S)
    end.

store_listen(Who, #state{listeners=L}=S) ->
    store_check_invariant(S#state{listeners=[Who | L]}).

store_ack(Ref, #state{unacked=Unacked}=S) ->
    NewUnacked = dict:erase(Ref, Unacked),
    erlang:demonitor(Ref),
    S#state{unacked=NewUnacked}.

store_worker_down(Ref, #state{unacked=Unacked, jobs_queue=Q}=S) ->
    case dict:find(Ref, Unacked) of
        error -> S;
        {ok, Job} ->
            Q1 = queue:in_r(Job, Q),
            Unacked1 = dict:erase(Ref, Unacked),
            store_check_invariant(S#state{unacked=Unacked1, jobs_queue=Q1})
    end.

store_check_invariant(#state{listeners=L, jobs_queue=Q, unacked=Unacked}=S) ->
    case {L, queue:out(Q)} of
        {[], _} ->
            S;
        {_, {empty, _}} ->
            S;
        {[Pid | L1], {{value, {Key, Payload}=Job}, Q1}} ->
            case is_process_alive(Pid) of
                false ->
                    % don't pass jobs to dead workers. This check can be omitted,
                    % because worker will be monitored anyway.
                    % This is just optimization.
                    store_check_invariant(S#state{listeners=L1});
                true ->
                    Ref = erlang:monitor(process, Pid),
                    Pid ! {ec_uniq_queue_msg, Ref, Key, Payload},
                    Unacked1 = dict:store(Ref, Job, Unacked),
                    store_check_invariant(S#state{listeners=L1,
                                                  jobs_queue=Q1,
                                                  unacked=Unacked1})
            end
    end.

store_dump(#state{jobs_queue=Q}) ->
    EtsFile = "",
    QueueFile = "",
    ok = ets:tab2file(?TBL, EtsFile),
    ok = file:write_file(QueueFile, term_to_binary(Q)).


store_status(n_known, #state{known_tab=Tid}) ->
    Info = ets:info(Tid),
    proplists:get_value(size, Info);
store_status(Key, S) ->
    case Key of
        n_jobs -> queue:len(S#state.jobs_queue);
        n_listeners -> length(S#state.listeners);
        n_unacked -> dict:size(S#state.unacked)
    end.


-ifdef(TEST).
store_monitoring_test() ->
    S = store_init(),
    Pid = spawn(fun() ->
                        receive stop -> ok end
                end),
    S1 = store_listen(Pid, S),
    ?assertEqual(1, store_status(n_listeners, S1)),
    S2 = store_push_all([{1, 1}, {2, 2}], S1),
    io:format(user, "~p~n", [S2]),
    ?assertEqual(0, store_status(n_listeners, S2)),
    ?assertEqual(2, store_status(n_known, S2)),
    ?assertEqual(1, store_status(n_unacked, S2)),
    ?assertEqual(1, store_status(n_jobs, S2)),
    Pid ! stop,
    Ref = receive {'DOWN', MonitorRef, process, Pid, _Info} -> MonitorRef end,
    S3 = store_worker_down(Ref, S2),
    ?assertEqual(0, store_status(n_listeners, S3)),
    ?assertEqual(2, store_status(n_known, S3)),
    ?assertEqual(0, store_status(n_unacked, S3)),
    ?assertEqual(2, store_status(n_jobs, S3)).
-endif.




среда, 25 марта 2015 г., 0:45:05 UTC+3 пользователь Evgen Polivoda написал:
Здравствуйте друзья, 

Max Lapshin

unread,
Mar 25, 2015, 2:28:10 AM3/25/15
to erlang-...@googlegroups.com
Мне кажется, что для этой задачи обработка её деревом как раз антипаттерн.

Подобные вещи надо решать пулом из N воркеров, что бы они работали быстро, но не мешая друг другу. 

ets или какой-то дисковый сторадж — самое оно.

zheka_13

unread,
Mar 25, 2015, 4:30:03 AM3/25/15
to erlang-...@googlegroups.com
Спасибо большое за ответы. Возникли вопросы. 
1. Сергей, как вы узнаете что паук собрал наконец таки все страницы и можно с ними что то сделать. Мониторите длину очереди?
2. Я сам что то наподобие такого придумал. Но душа просит - проще. Слишком сложно. 
3. Пул из фиксированного  числа воркеров чет не нравится. Больше нравится идея   - процессы рождаются отрабатывают задачу и умирают по мере надобности, но количество их не превышает константного порогового значения.
4. Если немного конкретнее оформить то что меня интересует: есть дерево с узлами, каждый узел дерева обрабатывается отдельным процессом эрланга, как узнать что обработка всего дерева завершена? Это не совсем рекурсия - это как бы паралельная рекурсия. Можно например мониторить пиды потомков и если их всех уже нет то умирать самому и так по цепочке вверх?
  

Max Lapshin

unread,
Mar 25, 2015, 4:42:40 AM3/25/15
to erlang-...@googlegroups.com
У вас один процесс скачает страничку, на которой 10 тыс линков и вы запустите 10 тыс процессов, которые начнут что-то лопатить.

Они отработают медленнее, чем 20 процессов по 500 страниц каждый.

Alexander Tchitchigin

unread,
Mar 25, 2015, 7:12:34 AM3/25/15
to erlang-...@googlegroups.com
2015-03-25 11:30 GMT+03:00 zheka_13 <pol...@gmail.com>:
Спасибо большое за ответы. Возникли вопросы. 

3. Пул из фиксированного  числа воркеров чет не нравится. Больше нравится идея   - процессы рождаются отрабатывают задачу и умирают по мере надобности, но количество их не превышает константного порогового значения.

Процесс, который ждёт очередного задания и ничего не делает ничем не хуже мёртвого процесса - ни тот, ни другой ресурсов не потребляют. Но ожидающий лучше тем, что стартует быстрее.

 
4. Если немного конкретнее оформить то что меня интересует: есть дерево с узлами, каждый узел дерева обрабатывается отдельным процессом эрланга, как узнать что обработка всего дерева завершена? Это не совсем рекурсия - это как бы паралельная рекурсия. Можно например мониторить пиды потомков и если их всех уже нет то умирать самому и так по цепочке вверх?
 
Стандартная практика - ждать сообщений о завершении работы от дочерних процессов. После получения нужного количества сообщений о завершении - посылать сообщение о завершении своему родителю и умирать.

--
С уважением,
Александр.

zheka_13

unread,
Mar 25, 2015, 8:01:23 AM3/25/15
to erlang-...@googlegroups.com
Хм. тогда получается нестыковочка между 3 и 4
3. Но ожидающий лучше тем, что стартует быстрее.Значит воркеры не должны умирать.
4. Стандартная практика - ждать сообщений о завершении работы от дочерних процессов. После получения нужного количества сообщений о завершении - посылать сообщение о завершении своему родителю и умирать.

как тогда сделать контроль того что дерево обошли и результат готов если никто не умирает. причем изначально и дерева то нет оно строится по ходу прохода. хм ... интересно. Получается воркеры берут линки из очереди, обрабатывают и если надо то в очередь помещают следующие линки, потом их обрабатывают и т. д. а когда ж конец? 

25 марта 2015 г., 13:12 пользователь Alexander Tchitchigin <sad....@gmail.com> написал:

--
Вы получили это сообщение, поскольку подписаны на группу "Erlang по-русски".
Чтобы отменить подписку на эту группу и больше не получать от нее сообщения, отправьте письмо на электронный адрес erlang-russia...@googlegroups.com.
Чтобы отправлять сообщения в эту группу, отправьте письмо на электронный адрес erlang-...@googlegroups.com.
Чтобы настроить другие параметры, перейдите по ссылке https://groups.google.com/d/optout.

Alexander Tchitchigin

unread,
Mar 25, 2015, 8:23:21 AM3/25/15
to erlang-...@googlegroups.com
Да, между 3 и 4 нестыковочка, потому что 3 - это про воркеров, а 4 - без воркеров, как Вы просили.

Очевидно, завершать работу пора тогда, когда очередь опустела и никто в неё ничего не добавляет.
Далее два варианта: активный и пассивный.
В активном какой-то процесс рассылает задания воркерам. Если заданий больше нет - значит, можно рапортовать о завершении.
В пассивном воркеры сами забирают задания из очереди. Если очередь пуста - воркер может рапортовать об этом. Если все воркеры отрапортовали, что очередь пуста - никто в неё уже ничего не положит.

--
С уважением,
Александр.

Sergey Prokhorov

unread,
Mar 25, 2015, 7:12:13 PM3/25/15
to erlang-...@googlegroups.com


среда, 25 марта 2015 г., 11:30:03 UTC+3 пользователь Evgen Polivoda написал:
Спасибо большое за ответы. Возникли вопросы. 
1. Сергей, как вы узнаете что паук собрал наконец таки все страницы и можно с ними что то сделать. Мониторите длину очереди?
У меня не было необходимости как-то сигнализировать об окончании сбора всех страниц. Но если бы была необходимость, то, конечно, отлавливал бы этот момент по условию: "длина очереди == 0" && "задач в обработке == 0" и отправлял кому-то об этом сообщение
 
2. Я сам что то наподобие такого придумал. Но душа просит - проще. Слишком сложно. 
По моему самое нормальное решение из возможных. И сервер, с которого странички собираете не положите, и свой сервер не повалите из за роста количества процессов. И в случае чего поставить процесс на паузу просто (приостанавливаете очередь и всё, можно даже сдампить очередь и ETS таблицу на диск и перезапустить VM при необходимости, не теряя прогресса)
 
3. Пул из фиксированного  числа воркеров чет не нравится. Больше нравится идея   - процессы рождаются отрабатывают задачу и умирают по мере надобности, но количество их не превышает константного порогового значения.
А говорили что хотите как проще...
 
4. Если немного конкретнее оформить то что меня интересует: есть дерево с узлами, каждый узел дерева обрабатывается отдельным процессом эрланга, как узнать что обработка всего дерева завершена? Это не совсем рекурсия - это как бы паралельная рекурсия. Можно например мониторить пиды потомков и если их всех уже нет то умирать самому и так по цепочке вверх?
Ну это уж как-нибудь без меня...
Reply all
Reply to author
Forward
0 new messages