Task.Supervisor.async_stream
async_stream
, go back to Task.Supervisor module for more information.
async_stream(supervisor, enumerable, fun, options \\ [])
View Source (since 1.4.0)Specs
async_stream( Supervisor.supervisor(), Enumerable.t(), (term() -> term()), keyword() ) :: Enumerable.t()
Returns a stream that runs the given function fun
concurrently
on each element in enumerable
.
Each element in enumerable
is passed as argument to the given function fun
and processed by its own task. The tasks will be spawned under the given
supervisor
and linked to the current process, similarly to async/2
.
See async_stream/6
for discussion, options, and examples.
async_stream(supervisor, enumerable, module, function, args, options \\ [])
View Source (since 1.4.0)Specs
async_stream( Supervisor.supervisor(), Enumerable.t(), module(), atom(), [term()], keyword() ) :: Enumerable.t()
Returns a stream where the given function (module
and function
)
is mapped concurrently on each element in enumerable
.
Each element will be prepended to the given args
and processed by its
own task. The tasks will be spawned under the given supervisor
and
linked to the current process, similarly to async/4
.
When streamed, each task will emit {:ok, value}
upon successful
completion or {:exit, reason}
if the caller is trapping exits.
The order of results depends on the value of the :ordered
option.
The level of concurrency and the time tasks are allowed to run can be controlled via options (see the "Options" section below).
If you find yourself trapping exits to handle exits inside
the async stream, consider using async_stream_nolink/6
to start tasks
that are not linked to the calling process.
Options
:max_concurrency
- sets the maximum number of tasks to run at the same time. Defaults toSystem.schedulers_online/0
.:ordered
- whether the results should be returned in the same order as the input stream. This option is useful when you have large streams and don't want to buffer results before they are delivered. This is also useful when you're using the tasks for side effects. Defaults totrue
.:timeout
- the maximum amount of time to wait (in milliseconds) without receiving a task reply (across all running tasks). Defaults to5000
.:on_timeout
- what do to when a task times out. The possible values are::exit
(default) - the process that spawned the tasks exits.:kill_task
- the task that timed out is killed. The value emitted for that task is{:exit, :timeout}
.
:shutdown
-:brutal_kill
if the tasks must be killed directly on shutdown or an integer indicating the timeout value. Defaults to5000
milliseconds.
Examples
Let's build a stream and then enumerate it:
stream = Task.Supervisor.async_stream(MySupervisor, collection, Mod, :expensive_fun, [])
Enum.to_list(stream)