How can I tell when a set of spawned processes has finished its work?


Giovanni Giorgi

Apr 12, 2021, 12:36:16 PM
to erlang-q...@erlang.org

Hi all,

 a newbie question here.

I have written a small Erlang server following the application behaviour, here:

https://github.com/daitangio/er_zauker/blob/erlang-24-migration/src/er_zauker_app.erl

To make a long story short, my server scans a set of directories and indexes the files, using redis as the backend database.

It works well when I run it on a small set of files.

But when I run it on a very large set of files, it seems to "finish" before indexing all of them. When a run starts, the client waits until every file is processed and the server can send it a status report:

er_zauker_indexer!{self(),directory,"."},
er_zauker_app:wait_worker_done().
 
The relevant part seems correct (see below), but I think I have made a silly mistake somewhere and I cannot find it.
Where can I find an example for this use case?
 
 
wait_worker_done() ->
    waitAllWorkerDone(1, erlang:monotonic_time(second)).

waitAllWorkerDone(RunningWorker, StartTimestamp) when RunningWorker > 0 ->
    er_zauker_indexer ! {self(), report},
    receive
        {worker, 0} ->
            io:format("All workers done~n~n");
        {worker, RunningGuys, files_processed, TotalFilesDone} ->
            if
                RunningGuys /= RunningWorker ->
                    SecondsRunning = erlang:monotonic_time(second) - StartTimestamp,
                    %% Avoid badarith if a report arrives within the first second
                    FilesSec = TotalFilesDone / max(SecondsRunning, 1),
                    io:format("[~p]s Workers[~p]  Files processed:~p Files/sec: ~p ~n",
                              [SecondsRunning, RunningGuys, TotalFilesDone, FilesSec]),
                    timer:sleep(200);
                true ->
                    %% OK, nothing changed so far... sleep a bit
                    timer:sleep(100)
            end,
            %% Master sleep value
            timer:sleep(990),
            waitAllWorkerDone(RunningGuys, StartTimestamp)
    after 5000 ->
        io:format("~n-----------------------------~n"),
        io:format(" Mmmm no info in the last 5 sec... when was running:~p Workers~n", [RunningWorker]),
        io:format(" ?System is stuck? "),
        io:format("------------------------------~n"),
        waitAllWorkerDone(RunningWorker, StartTimestamp)
    end;
waitAllWorkerDone(0, _) ->
    io:format("All workers finished~n").
 
 



--
Giovanni Giorgi via webmail

Giovanni Giorgi

Apr 13, 2021, 7:44:46 AM
to erlang-q...@erlang.org
On 2021-04-13 10:55, Maria Scott wrote:
> Hi Giovanni,
>
>> https://github.com/daitangio/er_zauker/blob/erlang-24-migration/src/er_zauker_app.erl
>
> first, sorry, I have no idea what this actually does and how it does
> it ^^; I only skimmed the code. So what I'm saying here is a bit
> guesswork...
> From what I understand, you have a component that scans a tree on your
> file system and delivers file names. And you have another component
> that starts a process to index the files. Right?
...
> If I read it correctly, you don't wait until _every file_ has been
> _processed_. Instead, you wait until _a certain number of processes
> you are monitoring_ (not necessarily the ones you wait for, you only
> check if there _is_ a 'DOWN' message, not the monitor refs and/or
> pids) has _terminated_ (~line 100), _for whatever reason_ (you don't
> check the Reason for the termination). So some processes might have
> crashed and not done any work at all.
...
> and then a worker process may crash because it cannot open a file. But
> you don't notice because, see above. May that be it?

Yeah,
thank you Maria, I think you are right.

So I suppose my entire code is not well written :(

Where can I find an example of a good code for this use case (even in
the standard OTP library of course, I do not fear to read OTP code :) ?

Maria Scott

Apr 13, 2021, 10:49:36 AM
to Giovanni Giorgi, erlang-q...@erlang.org
Hi Giovanni,

> So I suppose my entire code is not well written :(

Let's say I would maybe do it a bit differently ;)))

> Where can I find an example of a good code for this use case (even in
> the standard OTP library of course, I do not fear to read OTP code :) ?

Hm, sorry, nothing comes readily to mind =^^=

Kind regards,
Maria

die...@schoen.or.at

Apr 13, 2021, 11:30:05 AM
to Giovanni Giorgi, erlang-q...@erlang.org
Hi Giovanni,

I had a quick look at the code, and I think the messages do not match.

You wait for {worker, 0} in the waitAllWorkerDone function, but I do not think this message
is generated anywhere.
In that function you send {self(), report} to the daemon, to which it responds with
{worker,RunningWorker,files_processed,FilesProcessed}.
RunningWorker can eventually become 0 when all workers exit, so the tuple starts with
{worker, 0, ...
but the two extra elements prevent it from matching the expected pattern.
Could this be the issue?
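A minimal sketch of the fix described above, assuming the daemon always replies with the 4-tuple {worker, RunningWorker, files_processed, FilesProcessed}: the "all done" clause has to match the whole tuple, and it must come before the general clause. The module name report_match and the function await_report/1 are hypothetical.

```erlang
-module(report_match).
-export([await_report/1]).

%% Wait for one report message from the (assumed) indexer daemon.
%% Both clauses match the full 4-tuple; a bare {worker, 0} pattern
%% would never match {worker, 0, files_processed, N}.
await_report(TimeoutMs) ->
    receive
        %% Specific "no workers left" case first, so it wins over the general one
        {worker, 0, files_processed, Total} ->
            {done, Total};
        {worker, Running, files_processed, Total} ->
            {running, Running, Total}
    after TimeoutMs ->
        timeout
    end.
```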

Anyway, thanks for sharing your code, it is always interesting to see how somebody else is tackling a problem!

On a more high-level note, are you really interested in intermediate results?
If I only wanted the end result, I think I would use OTP with a gen_server and a synchronous call.
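A minimal sketch of the gen_server + synchronous call idea, using a hypothetical index_tracker module (not part of er_zauker). The server spawn_monitors one worker per item, and wait_done/1 is a gen_server:call that is only answered, via gen_server:reply/2, once every monitored worker has sent its 'DOWN' message. Because the exit Reason is inspected, crashed workers are reported instead of being counted as done.

```erlang
-module(index_tracker).
-behaviour(gen_server).

-export([start_link/0, index/3, wait_done/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

start_link() ->
    gen_server:start_link(?MODULE, [], []).

%% Spawn one monitored worker per item; each worker runs Fun(Item).
index(Server, Fun, Items) ->
    gen_server:call(Server, {index, Fun, Items}).

%% Block until every worker started so far has exited. Returns
%% {SucceededCount, [{Item, Reason}]}, the list holding the failed items.
wait_done(Server) ->
    gen_server:call(Server, wait_done, infinity).

init([]) ->
    %% running: monitor reference => item still being processed
    {ok, #{running => #{}, succeeded => 0, failed => [], waiters => []}}.

handle_call({index, Fun, Items}, _From, State = #{running := Running}) ->
    New = maps:from_list([begin
                              {_Pid, Ref} = spawn_monitor(fun() -> Fun(Item) end),
                              {Ref, Item}
                          end || Item <- Items]),
    {reply, ok, State#{running := maps:merge(Running, New)}};
handle_call(wait_done, From, State = #{running := Running, waiters := Waiters}) ->
    case map_size(Running) of
        0 -> {reply, result(State), State};
        %% Do not reply yet: the caller stays blocked until the last 'DOWN'
        _ -> {noreply, State#{waiters := [From | Waiters]}}
    end.

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info({'DOWN', Ref, process, _Pid, Reason}, State = #{running := Running}) ->
    case maps:take(Ref, Running) of
        {Item, Rest} ->
            State1 = record_exit(Item, Reason, State#{running := Rest}),
            {noreply, maybe_notify(State1)};
        error ->
            %% Not one of our workers: ignore it
            {noreply, State}
    end;
handle_info(_Other, State) ->
    {noreply, State}.

%% Only a 'normal' exit counts as success; any other reason is a failure.
record_exit(_Item, normal, State = #{succeeded := N}) ->
    State#{succeeded := N + 1};
record_exit(Item, Reason, State = #{failed := Failed}) ->
    State#{failed := [{Item, Reason} | Failed]}.

%% When nothing is running any more, answer every blocked wait_done/1 caller.
maybe_notify(State = #{running := Running, waiters := Waiters})
  when map_size(Running) =:= 0 ->
    Result = result(State),
    lists:foreach(fun(W) -> gen_server:reply(W, Result) end, Waiters),
    State#{waiters := []};
maybe_notify(State) ->
    State.

result(#{succeeded := N, failed := Failed}) ->
    {N, lists:reverse(Failed)}.
```

A client would then do something like {ok, S} = index_tracker:start_link(), ok = index_tracker:index(S, Fun, Files), index_tracker:wait_done(S), with no polling or sleeping on the client side.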

kind regards,
dieter

Giovanni Giorgi

Apr 13, 2021, 3:14:22 PM
to erlang-q...@erlang.org

Hi all,

 my idea was to be able to monitor the execution, but I will explore the gen_server + synchronous call approach in the future.

I was able to fix the bug following Maria's suggestion (thank you Maria!).

The failing processes were dying due to a redis timeout, probably because I used a redis MULTI/EXEC transaction, which can lead to race conditions on the redis side.

I implemented a small map (monitor reference to file) to track failing processes and respawn them. The idea is only to catch the timeout errors, so I changed the server to match the "good" and "timeout" DOWN cases like this:

...

{'DOWN', Reference, process, _Pid, normal} ->
 indexerDaemon(RunningWorker-1,FilesProcessed+1, maps:remove(Reference,MonitorRefMap) );


{'DOWN', Reference, process, Pid, {timeout, Detail}} ->
 %% MMMmm we must assume still files to be processed?
 #{ Reference := FailedFile } = MonitorRefMap,
 io:format("!! Timeout Error on ~p ~n Detail: ~p~n", [FailedFile, {'DOWN', Reference, process, Pid, {timeout, Detail}}]),
 % We suppose a timeout error and we push back 
 % Remove old Reference
 UpdatedRefMap=maps:remove(Reference,MonitorRefMap),
 NewPid=spawn(er_zauker_util, load_file_if_needed,[FailedFile]),
 MonitorRef = erlang:monitor(process,NewPid),
 NewRecoveryRefMap=UpdatedRefMap#{ MonitorRef => FailedFile },
 indexerDaemon(RunningWorker,FilesProcessed,NewRecoveryRefMap);

I do not know if there is a smarter way of doing it.
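One possible refinement of the respawn logic above, sketched as a hypothetical standalone retry_pool module (names are illustrative, not from er_zauker). It keeps each monitor reference together with its item and attempt count, waits only for those references, respawns on {timeout, _} up to a limit, and records any other exit reason as a failure. It assumes, as in the code above, that a timed-out worker exits with {timeout, Detail}.

```erlang
-module(retry_pool).
-export([run_all/3]).

%% Run Fun(Item) in one monitored process per item and wait for all of them.
%% Workers exiting with {timeout, _} are respawned at most MaxRetries times
%% per item. Returns {Succeeded, [{Item, Reason}]}, both in completion order.
run_all(Fun, Items, MaxRetries) ->
    Refs = maps:from_list([start(Fun, Item, 0) || Item <- Items]),
    loop(Fun, MaxRetries, Refs, [], []).

%% spawn_monitor/1 creates the process and the monitor in one step, so a
%% worker that dies immediately still reports its real exit reason.
start(Fun, Item, Attempt) ->
    {_Pid, Ref} = spawn_monitor(fun() -> Fun(Item) end),
    {Ref, {Item, Attempt}}.

loop(_Fun, _Max, Refs, Done, Failed) when map_size(Refs) =:= 0 ->
    {lists:reverse(Done), lists:reverse(Failed)};
loop(Fun, Max, Refs, Done, Failed) ->
    receive
        %% Only 'DOWN' messages for our own monitors are taken from the mailbox
        {'DOWN', Ref, process, _Pid, Reason} when is_map_key(Ref, Refs) ->
            {{Item, Attempt}, Rest} = maps:take(Ref, Refs),
            case Reason of
                normal ->
                    loop(Fun, Max, Rest, [Item | Done], Failed);
                {timeout, _} when Attempt < Max ->
                    {NewRef, Entry} = start(Fun, Item, Attempt + 1),
                    loop(Fun, Max, Rest#{NewRef => Entry}, Done, Failed);
                _ ->
                    loop(Fun, Max, Rest, Done, [{Item, Reason} | Failed])
            end
    end.
```

In the daemon from this thread, the same Ref => {File, Attempt} map could take the place of MonitorRefMap; the retry limit avoids respawning forever if redis keeps timing out.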

Thank you for your hints!!

...

Krukoff, John

Apr 14, 2021, 2:18:26 AM
to Giovanni Giorgi, erlang-q...@erlang.org
> Where can I find an example of a good code for this use case (even in the standard OTP library of course, I do not fear to read OTP code :) ?

Hi Giovanni,

FWIW, when I had a similar need for a simple CLI script I wrote long ago, rpc:pmap/3 was a good solution for me:

https://erlang.org/doc/man/rpc.html#pmap-3

which I used for this small utility script:

https://github.com/jkrukoff/fprof-totals/blob/master/src/fprof_totals.erl#L147

Don't know if it qualifies as "good", but it is at least short.
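For reference, a minimal sketch of the rpc:pmap/3 approach. index_file/1 is a hypothetical stand-in for the real per-file work. rpc:pmap({Module, Function}, ExtraArgs, List) calls Module:Function(Elem, ExtraArgs...) for each element in parallel, blocks until all calls return, and gives the results in input order. It spreads the calls over node() and all connected nodes, so on a distributed node the module must be loaded everywhere, and a single failed call makes the whole pmap fail.

```erlang
-module(pmap_example).
-export([index_all/1, index_file/1]).

%% Hypothetical stand-in for the real work (e.g. indexing one file into redis).
index_file(File) ->
    {indexed, File, length(File)}.

%% Runs index_file/1 on every file in parallel and returns when all are done.
index_all(Files) ->
    rpc:pmap({?MODULE, index_file}, [], Files).
```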

