Pro Task


Alix Stocking

Aug 5, 2024, 3:26:29 AM
to chotarirag
Tasks are processes meant to execute one particular action throughout their lifetime, often with little or no communication with other processes. The most common use case for tasks is to convert sequential code into concurrent code by computing a value asynchronously:
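A minimal sketch of that use case, assuming a placeholder computation (`Enum.sum/1` over a range) standing in for whatever expensive work the task would do:

```elixir
# Start the computation in a separate, linked process.
task = Task.async(fn -> Enum.sum(1..100) end)

# The caller is free to do other work while the task runs.
other = 1 + 1

# Block until the task replies (Task.await/2 defaults to a 5000 ms timeout).
result = other + Task.await(task)
```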

Tasks spawned with async can be awaited on by their caller process (and only their caller) as shown in the example above. They are implemented by spawning a process that sends a message to the caller once the given computation is performed.


One of the common uses of tasks is to convert sequential code into concurrent code with Task.async/1 while keeping its semantics. When invoked, a new process will be created, linked and monitored by the caller. Once the task action finishes, a message will be sent to the caller with the result.


Async tasks link the caller and the spawned process. This means that, if the caller crashes, the task will crash too and vice-versa. This is on purpose: if the process meant to receive the result no longer exists, there is no purpose in completing the computation. If this is not desired, you will want to use supervised tasks, described in a subsequent section.


We encourage developers to rely on supervised tasks as much as possible. Supervised tasks improve the visibility of how many tasks are running at a given moment and enable a variety of patterns that give you explicit control over how to handle the results, errors, and timeouts. Here is a summary:


Using Task.Supervisor.async_nolink/2 + Task.yield/2 + Task.shutdown/2 allows you to execute tasks concurrently and retrieve their results or the reason they failed within a given time frame. If the task fails, the caller won't fail. You will receive the error reason either on yield or shutdown.
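The combination above can be sketched as follows. The supervisor is started inline for brevity, the 1000 ms time frame is an arbitrary choice, and the task deliberately raises to show that the caller survives (expect an error log entry from the crashing task):

```elixir
# In an application the supervisor would normally live in a supervision tree.
{:ok, sup} = Task.Supervisor.start_link()

# The task is monitored but not linked, so its crash cannot take the caller down.
task = Task.Supervisor.async_nolink(sup, fn -> raise "boom" end)

outcome =
  case Task.yield(task, 1_000) || Task.shutdown(task) do
    {:ok, value} -> {:ok, value}
    # The task failed: the exit reason is returned instead of crashing the caller.
    {:exit, reason} -> {:error, reason}
    # No reply within the time frame, and the task has been shut down.
    nil -> {:error, :timeout}
  end
```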


Furthermore, the supervisor guarantees all tasks terminate within a configurable shutdown period when your application shuts down. See the Task.Supervisor module for details on the supported operations.


Note that, as above, when working with distributed tasks, one should use the Task.Supervisor.async/5 function that expects explicit module, function, and arguments, instead of Task.Supervisor.async/3 that works with anonymous functions. That's because anonymous functions expect the same module version to exist on all involved nodes. Check the Agent module documentation for more information on distributed processes as the limitations described there apply to the whole ecosystem.
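A sketch of the module/function/arguments form, run on the local node so it is self-contained. `MyApp.DistSupervisor` is a hypothetical supervisor name, and `Enum.sum/1` stands in for a function that exists on every node:

```elixir
# Hypothetical named supervisor; a real one would sit in a supervision tree.
{:ok, _sup} = Task.Supervisor.start_link(name: MyApp.DistSupervisor)

# Explicit module, function, and arguments instead of an anonymous function.
task = Task.Supervisor.async(MyApp.DistSupervisor, Enum, :sum, [[1, 2, 3]])
sum = Task.await(task)

# To target another node, the supervisor would be addressed as a {name, node}
# tuple, for example {MyApp.DistSupervisor, :"remote@host"} (assumed node name).
```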


Since these tasks are supervised and not directly linked to the caller, they cannot be awaited on. By default, the functions Task.start/1 and Task.start_link/1 are for fire-and-forget tasks, where you don't care about the results or whether they complete successfully.


Unlike GenServer, Agent and Supervisor, a Task has a default :restart of :temporary. This means the task will not be restarted even if it crashes. If you desire the task to be restarted for non-successful exits, do:
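The snippet that followed appears to have been lost from this post. A minimal sketch, assuming a hypothetical module named `RestartableTask`, passes the `:restart` option to `use Task` so the generated child specification carries it:

```elixir
defmodule RestartableTask do
  # :transient restarts the task only when it exits abnormally.
  use Task, restart: :transient

  def start_link(arg) do
    Task.start_link(__MODULE__, :run, [arg])
  end

  def run(_arg) do
    # The actual work would go here.
    :ok
  end
end

# `use Task` defines child_spec/1, which a supervisor consults when starting the child.
spec = RestartableTask.child_spec(:some_arg)
```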


For example, we recommend that developers always start tasks under a supervisor. This provides more visibility and allows you to control how those tasks are terminated when a node shuts down. That might look something like Task.Supervisor.start_child(MySupervisor, task_function). This means that, although your code is the one invoking the task, the actual ancestor of the task is the supervisor, as the supervisor is the one effectively starting it.


The list of callers of the current process can be retrieved from the Process dictionary with Process.get(:"$callers"). This will return either nil or a list [pid_n, ..., pid2, pid1] with at least one entry where pid_n is the PID that called the current process, pid2 called pid_n, and pid2 was called by pid1.
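The difference between ancestors and callers can be observed with a small sketch. It assumes an unnamed supervisor started inline, and the task simply reports both process dictionary entries back to the caller:

```elixir
{:ok, sup} = Task.Supervisor.start_link()
caller = self()

{:ok, _pid} =
  Task.Supervisor.start_child(sup, fn ->
    # Report what the task sees in its own process dictionary.
    send(caller, {:info, Process.get(:"$ancestors"), Process.get(:"$callers")})
  end)

{ancestors, callers} =
  receive do
    {:info, ancestors, callers} -> {ancestors, callers}
  after
    1_000 -> raise "no message from the task"
  end

# The supervisor started the task, so it heads the ancestors list,
# while the process that asked for the task heads the callers list.
```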


If you start an async, you must await. This is either done by calling Task.await/2 or Task.yield/2 followed by Task.shutdown/2 on the returned task. Alternatively, if you spawn a task inside a GenServer, then the GenServer will automatically await for you and call GenServer.handle_info/2 with the task response and associated :DOWN message.


This function spawns a process that is linked to and monitored by the caller process. The linking part is important because it aborts the task if the parent process dies. It also guarantees the code before async/await has the same properties after you add the async call. For example, imagine you have this:
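The original snippet is not present in this post. A sketch of the before-and-after, using hypothetical `heavy_fun` and `some_fun` placeholders written as anonymous functions, might look like this:

```elixir
# Hypothetical stand-ins for the real work.
heavy_fun = fn -> Enum.sum(1..1_000) end
some_fun = fn -> 42 end

# Sequential version: heavy_fun runs to completion before some_fun starts.
x = heavy_fun.()
y = some_fun.()
sequential = x + y

# Concurrent version: heavy_fun runs in a task while the caller runs some_fun.
task = Task.async(heavy_fun)
y = some_fun.()
concurrent = Task.await(task) + y
```

Both versions compute the same value, and in both a failure in `heavy_fun` fails the whole computation.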


As before, if heavy_fun/0 fails, the whole computation will fail, including the caller process. If you don't want the task to fail then you must change the heavy_fun/0 code in the same way you would achieve it if you didn't have the async call. For example, it can return {:ok, val} | :error results or, in more extreme cases, use try/rescue. In other words, an asynchronous task should be thought of as an extension of the caller process rather than a mechanism to isolate it from all errors.
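A sketch of that approach, assuming a hypothetical `safe_heavy_fun` that parses an integer. The error handling lives inside the function itself, exactly as it would without the async call:

```elixir
# Hypothetical function that converts failures into {:ok, val} | :error.
safe_heavy_fun = fn input ->
  try do
    {:ok, String.to_integer(input)}
  rescue
    ArgumentError -> :error
  end
end

ok_task = Task.async(fn -> safe_heavy_fun.("42") end)
error_task = Task.async(fn -> safe_heavy_fun.("not a number") end)

# Neither task crashes, so the linked caller is unaffected.
results = Task.await_many([ok_task, error_task])
```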


The task created with this function stores :erlang.apply/2 in its :mfa metadata field, which is used internally to apply the anonymous function. Use async/3 if you want another function to be used as metadata.


Similar to async/1 except the function to be started is specified by the given module, function_name, and args. The module, function_name, and its arity are stored as a tuple in the :mfa field for reflection purposes.
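The two forms can be compared with a short sketch. It assumes an Elixir version whose Task struct exposes the :mfa field described above, and uses `Enum.sum/1` purely as an example target:

```elixir
# Anonymous function form (async/1).
anon_task = Task.async(fn -> 1 + 2 end)

# Module, function name, and arguments form (async/3).
mfa_task = Task.async(Enum, :sum, [[1, 2, 3]])

# Metadata recorded on each task struct.
anon_mfa = anon_task.mfa
named_mfa = mfa_task.mfa

results = Task.await_many([anon_task, mfa_task])
```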


Each element of enumerable will be prepended to the given args and processed by its own task. Those tasks will be linked to an intermediate process that is then linked to the caller process. This means a failure in a task terminates the caller process and a failure in the caller process terminates all tasks.
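A small sketch of the prepending behaviour, using `String.duplicate/2` as an arbitrary example function. Each element becomes the first argument and the given args follow it:

```elixir
# Each task calls String.duplicate(element, 2).
results =
  ["a", "b", "c"]
  |> Task.async_stream(String, :duplicate, [2])
  |> Enum.to_list()
```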


Consider using Task.Supervisor.async_stream/6 to start tasks under a supervisor. If you find yourself trapping exits to ensure errors in the tasks do not terminate the caller process, consider using Task.Supervisor.async_stream_nolink/6 to start tasks that are not linked to the caller process.


:ordered - whether the results should be returned in the same order as the input stream. When the output is ordered, Elixir may need to buffer results to emit them in the original order. Setting this option to false disables the need to buffer at the cost of removing ordering. This is also useful when you're using the tasks only for the side effects. Note that regardless of what :ordered is set to, the tasks will process asynchronously. If you need to process elements in order, consider using Enum.map/2 or Enum.each/2 instead. Defaults to true.


In the example above, we are executing three tasks and waiting for the first 2 to complete. We use Stream.filter/2 to restrict ourselves only to successfully completed tasks, and then use Enum.take/2 to retrieve N items. Note it is important to set both ordered: false and max_concurrency: M, where M is the number of tasks, to make sure all calls execute concurrently.
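The pattern described here can be sketched as follows. The three functions and their sleep durations are illustrative placeholders, not the values from the original example:

```elixir
functions = [
  fn -> Process.sleep(50); :a end,
  fn -> Process.sleep(10); :b end,
  fn -> Process.sleep(20); :c end
]

first_two =
  functions
  |> Task.async_stream(fn fun -> fun.() end,
    ordered: false,
    max_concurrency: length(functions)
  )
  # Keep only tasks that completed successfully.
  |> Stream.filter(&match?({:ok, _}, &1))
  # Stop after the first two results; the remaining task is terminated.
  |> Enum.take(2)
```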


Running the example above on a machine with 8 cores will process 16 items, even though you want only 10 elements, since async_stream/3 processes items concurrently. That's because it will process 8 elements at once. Then all 8 elements complete at roughly the same time, causing 8 more elements to be kicked off for processing. Out of these extra 8, only 2 will be used, and the rest will be terminated.


In other cases, you likely want to tweak :max_concurrency to limit how many elements may be over-processed at the cost of reducing concurrency. You can also set the number of elements to take to be a multiple of :max_concurrency. For instance, setting max_concurrency: 5 in the example above.
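A sketch of that tuning, with an assumed input range of 1..100 and an Erlang `:counters` reference added only to observe how many tasks were actually started. The exact count depends on timing, so no specific number is claimed:

```elixir
# Shared counter used purely for observation.
started = :counters.new(1, [])

taken =
  1..100
  |> Task.async_stream(
    fn i ->
      :counters.add(started, 1, 1)
      Process.sleep(10)
      i
    end,
    max_concurrency: 5
  )
  # 10 is a multiple of max_concurrency, which limits over-processing.
  |> Enum.take(10)

started_count = :counters.get(started, 1)
```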


A timeout, in milliseconds or :infinity, can be given with a default value of 5000. If the timeout is exceeded, then the caller process will exit. If the task process is linked to the caller process which is the case when a task is started with async, then the task process will also exit. If the task process is trapping exits or not linked to the caller process, then it will continue to run.
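The timeout behaviour can be observed with a sketch like the one below. Catching the exit is done here only to make the exit visible; the 200 ms and 50 ms durations are arbitrary:

```elixir
task =
  Task.async(fn ->
    Process.sleep(200)
    :late
  end)

outcome =
  try do
    # The task needs about 200 ms, so a 50 ms timeout is exceeded.
    Task.await(task, 50)
  catch
    # Task.await/2 exits the caller with a :timeout reason.
    :exit, {:timeout, {Task, :await, _args}} -> :timed_out
  end
```

In production code, Task.yield/2 followed by Task.shutdown/2 is usually a better fit than catching the exit.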


This function assumes the task's monitor is still active or the monitor's :DOWN message is in the message queue. If it has been demonitored, or the message already received, this function will wait for the duration of the timeout awaiting the message.


{:DOWN, ref, :process, pid, reason} - since all tasks are also monitored, you will also receive the :DOWN message delivered by Process.monitor/1. If you receive the :DOWN message without a reply, it means the task crashed.


Another consideration to have in mind is that tasks started by Task.async/1 are always linked to their callers and you may not want the GenServer to crash if the task crashes. Therefore, it is preferable to instead use Task.Supervisor.async_nolink/3 inside OTP behaviours. For completeness, here is an example of a GenServer that starts tasks and handles their results:
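The original example is missing from this post. A minimal sketch under assumed names (`TaskRunner`, `run/2`, `last_result/1`, and the state shape are all hypothetical) could look like this:

```elixir
defmodule TaskRunner do
  use GenServer

  # Client API (hypothetical names).
  def start_link(supervisor), do: GenServer.start_link(__MODULE__, supervisor)
  def run(server, fun), do: GenServer.call(server, {:run, fun})
  def last_result(server), do: GenServer.call(server, :last_result)

  @impl true
  def init(supervisor) do
    {:ok, %{supervisor: supervisor, ref: nil, last_result: nil}}
  end

  @impl true
  def handle_call({:run, fun}, _from, %{ref: nil} = state) do
    # Not linked: a crash in the task will not crash this GenServer.
    task = Task.Supervisor.async_nolink(state.supervisor, fun)
    {:reply, :ok, %{state | ref: task.ref}}
  end

  def handle_call({:run, _fun}, _from, state), do: {:reply, {:error, :busy}, state}
  def handle_call(:last_result, _from, state), do: {:reply, state.last_result, state}

  @impl true
  # The task replied successfully.
  def handle_info({ref, result}, %{ref: ref} = state) when is_reference(ref) do
    # The :DOWN message is no longer needed, so drop the monitor and flush it.
    Process.demonitor(ref, [:flush])
    {:noreply, %{state | ref: nil, last_result: {:ok, result}}}
  end

  # The task went down without replying, meaning it crashed.
  def handle_info({:DOWN, ref, :process, _pid, reason}, %{ref: ref} = state) do
    {:noreply, %{state | ref: nil, last_result: {:error, reason}}}
  end

  def handle_info(_other, state), do: {:noreply, state}
end

{:ok, supervisor} = Task.Supervisor.start_link()
{:ok, server} = TaskRunner.start_link(supervisor)
:ok = TaskRunner.run(server, fn -> 1 + 1 end)
```

Once the task has finished, `TaskRunner.last_result(server)` returns the stored outcome.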


A timeout, in milliseconds or :infinity, can be given with a default value of 5000. If the timeout is exceeded, then the caller process will exit. Any task processes that are linked to the caller process (which is the case when a task is started with async) will also exit. Any task processes that are trapping exits or not linked to the caller process will continue to run.
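For reference, awaiting several tasks under a single timeout can be sketched with Task.await_many/2. The doubling function is an arbitrary placeholder:

```elixir
tasks = for i <- 1..3, do: Task.async(fn -> i * 2 end)

# One timeout covers the whole batch; results come back in the order of the tasks.
results = Task.await_many(tasks, 5_000)
```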


This function assumes the tasks' monitors are still active or the monitor's :DOWN message is in the message queue. If any tasks have been demonitored, or the message already received, this function will wait for the duration of the timeout.


In some cases, it is useful to create a "completed" task that represents a task that has already run and generated a result. For example, when processing data you may be able to determine that certain inputs are invalid before dispatching them for further processing:
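A sketch of that situation, assuming hypothetical `invalid?` and `further_process` helpers (negative numbers are treated as invalid here purely for illustration):

```elixir
# Hypothetical validation and processing steps.
invalid? = fn entry -> entry < 0 end
further_process = fn entry -> entry * 10 end

tasks =
  for entry <- [1, -2, 3] do
    if invalid?.(entry) do
      # No process is spawned; the result is already known.
      Task.completed({:error, :invalid_input})
    else
      Task.async(fn -> {:ok, further_process.(entry)} end)
    end
  end

# Completed and asynchronous tasks can be awaited together.
results = Task.await_many(tasks)
```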


In many cases, Task.completed/1 may be avoided in favor of returning the result directly. You should generally only require this variant when working with mixed asynchrony, when a group of inputs will be handled partially synchronously and partially asynchronously.


The second argument is either a timeout or :brutal_kill. In case of a timeout, a :shutdown exit signal is sent to the task process and if it does not exit within the timeout, it is killed. With :brutal_kill the task is killed straight away. In case the task terminates abnormally (possibly killed by another process), this function will exit with the same reason.
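Both forms of the second argument can be sketched with tasks that would otherwise never finish. The 1000 ms timeout is an arbitrary choice:

```elixir
# Timeout form: the task receives a :shutdown exit signal and stops.
polite = Task.async(fn -> Process.sleep(:infinity) end)
polite_result = Task.shutdown(polite, 1_000)

# :brutal_kill form: the task is killed straight away.
brutal = Task.async(fn -> Process.sleep(:infinity) end)
brutal_result = Task.shutdown(brutal, :brutal_kill)

# Neither task produced a reply, and neither is running any more.
still_alive = Process.alive?(polite.pid) or Process.alive?(brutal.pid)
```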
