15.11. Tasks and Threads
Tasks are the fundamental primitive for writing multi-threaded code.
A Task α
represents a computation that, at some point, will resolve to a value of type α
; it may be computed on a separate thread.
When a task has resolved, its value can be read; attempting to get the value of a task before it resolves causes the current thread to block until the task has resolved.
Tasks are similar to promises in JavaScript, JoinHandle
in Rust, and Future
in Scala.
Tasks may either carry out pure computations or IO
actions.
The API of pure tasks resembles that of thunks: Task.spawn
creates a Task α
from a function in Unit → α
, and Task.get
waits until the function's value has been computed and then returns it.
The value is cached, so subsequent requests do not need to recompute it.
The key difference lies in when the computation occurs: while the values of thunks are not computed until they are forced, tasks execute opportunistically in a separate thread.
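For example, the following sketch spawns a pure task and reads its value twice; the name sumTwice is illustrative, not part of the API:
def sumTwice : Nat :=
  -- The computation may begin running on the thread pool as soon as the task is spawned.
  let sum : Task Nat := Task.spawn fun () => (List.range 100000).foldl (· + ·) 0
  -- `Task.get` blocks until the value is available; the second read returns the cached value.
  sum.get + sum.get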
Tasks in IO
are created using IO.asTask
.
Similarly, BaseIO.asTask
and EIO.asTask
create tasks in other IO
monads.
These tasks may have side effects, and can communicate with other tasks.
When the last reference to a task is dropped, it is cancelled.
Pure tasks created with Task.spawn
are terminated upon cancellation.
Tasks spawned with IO.asTask
, EIO.asTask
, or BaseIO.asTask
continue executing and must explicitly check for cancellation using IO.checkCanceled
.
Tasks may be explicitly cancelled using IO.cancel
.
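The following sketch illustrates this cooperative protocol; the names worker and runWorker are illustrative, not part of the API:
def worker : IO Unit := do
  let mut canceled := false
  while !canceled do
    IO.sleep 10                   -- stands in for one unit of real work
    canceled ← IO.checkCanceled   -- set after `IO.cancel` or after the last reference is dropped

def runWorker : IO Unit := do
  let task ← IO.asTask worker     -- the task starts when this action is executed
  IO.sleep 50
  IO.cancel task                  -- request cooperative cancellation; `worker` exits at its next check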
The Lean runtime maintains a thread pool for running tasks.
The size of the thread pool is determined by the environment variable LEAN_NUM_THREADS
if it is set, or by the number of logical processors on the current machine otherwise.
The size of the thread pool is not a hard limit; in certain situations it may be exceeded to avoid deadlocks.
By default, these threads are used to run tasks; each task has a priority (Task.Priority
), and higher-priority tasks take precedence over lower-priority tasks.
Tasks may also be assigned to dedicated threads by spawning them with a sufficiently high priority.
🔗typeTask.{u} (α : Type u) : Type u
Task α
is a primitive for asynchronous computation.
It represents a computation that will resolve to a value of type α
,
possibly being computed on another thread. This is similar to Future
in Scala,
Promise
in JavaScript, and JoinHandle
in Rust.
The tasks have an overridden representation in the runtime.
15.11.1. Creating Tasks
Pure tasks should typically be created with Task.spawn
, since Task.pure creates a task that has already been resolved with the provided value.
Impure tasks are created by one of the asTask
actions.
15.11.1.1. Pure Tasks
Pure tasks may be created outside the IO
family of monads.
They are terminated when the last reference to them is dropped.
🔗defTask.spawn.{u} {α : Type u} (fn : Unit → α)
(prio : Task.Priority :=
Task.Priority.default) :
Task α
spawn fn : Task α
constructs and immediately launches a new task for
evaluating the function fn () : α
asynchronously.
prio
, if provided, is the priority of the task.
🔗Task.pure.{u} {α : Type u} (get : α) : Task α
Task.pure (a : α)
constructs a task that is already resolved with value a
.
15.11.1.2. Impure Tasks
When spawning a task with side effects using one of the asTask
functions, it's important to actually execute the resulting IO
action.
A task is spawned each time the resulting action is executed, not when asTask
is called.
Impure tasks continue running even when there are no references to them, though dropping the last reference does cause cancellation to be requested.
Cancellation may also be explicitly requested using IO.cancel
.
The impure task must check for cancellation using IO.checkCanceled
.
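The following sketch (with illustrative names) makes the distinction concrete: assigning the result of IO.asTask with := only names the action, and each execution of that action spawns a fresh task:
def spawnTwice : IO Unit := do
  -- No task has been spawned yet: `spawnGreeting` is merely a `BaseIO` action.
  let spawnGreeting := IO.asTask (IO.println "Hello from a task")
  -- Each execution of the action spawns a fresh task:
  let t1 ← spawnGreeting
  let t2 ← spawnGreeting
  -- Two tasks are now running; wait for both to finish.
  discard <| IO.wait t1
  discard <| IO.wait t2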
🔗opaqueBaseIO.asTask {α : Type} (act : BaseIO α)
(prio : Task.Priority :=
Task.Priority.default) :
BaseIO (Task α)
Runs act
in a separate Task
, with priority prio
.
Running the resulting BaseIO
action causes the task to be started eagerly. Pure accesses to the
Task
do not influence the impure act
.
Unlike pure tasks created by Task.spawn
, tasks created by this function will run even if the last
reference to the task is dropped. The act
should explicitly check for cancellation via
IO.checkCanceled
if it should be terminated or otherwise react to the last reference being
dropped.
🔗defEIO.asTask {ε α : Type} (act : EIO ε α)
(prio : Task.Priority :=
Task.Priority.default) :
BaseIO (Task (Except ε α))
Runs act
in a separate Task
, with priority prio
. Because EIO ε
actions may throw an exception
of type ε
, the result of the task is an Except ε α
.
Running the resulting BaseIO
action causes the task to be started eagerly. Pure accesses to the Task
do not influence the impure act
.
Unlike pure tasks created by Task.spawn
, tasks created by this function will run even if the last
reference to the task is dropped. The act
should explicitly check for cancellation via
IO.checkCanceled
if it should be terminated or otherwise react to the last reference being
dropped.
🔗defIO.asTask {α : Type} (act : IO α)
(prio : Task.Priority :=
Task.Priority.default) :
BaseIO (Task (Except IO.Error α))
Runs act
in a separate Task
, with priority prio
. Because IO
actions may throw an exception
of type IO.Error
, the result of the task is an Except IO.Error α
.
Running the resulting BaseIO
action causes the task to be started eagerly. Pure accesses to the
Task
do not influence the impure act
.
Unlike pure tasks created by Task.spawn
, tasks created by this function will run even if the last
reference to the task is dropped. The act
should explicitly check for cancellation via
IO.checkCanceled
if it should be terminated or otherwise react to the last reference being
dropped.
15.11.1.3. Priorities
Task priorities are used by the thread scheduler to assign tasks to threads.
Within the priority range default
–max
, higher-priority tasks always take precedence over lower-priority tasks.
Tasks spawned with priority dedicated
are assigned their own dedicated threads and do not contend with other tasks for the threads in the thread pool.
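As an illustrative sketch, a pure computation can be given an elevated pool priority, while a long-running, I/O-bound action can be moved to its own dedicated thread; the names urgentSum and startSlowJob are not part of the API:
def urgentSum : Task Nat :=
  -- Scheduled ahead of default-priority tasks when the thread pool is busy.
  Task.spawn (prio := Task.Priority.max) fun () =>
    (List.range 100000).foldl (· + ·) 0

def startSlowJob : BaseIO (Task (Except IO.Error Unit)) :=
  -- A long-running, I/O-bound action gets its own thread instead of occupying a pool worker.
  IO.asTask (prio := Task.Priority.dedicated) do
    IO.sleep 1000
    IO.println "slow job done"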
🔗defTask.Priority : Type
Task priority.
Tasks with higher priority will always be scheduled before tasks with lower priority. Tasks with a
priority greater than Task.Priority.max
are scheduled on dedicated threads.
🔗defTask.Priority.default : Task.Priority
The default priority for spawned tasks, also the lowest priority: 0
.
🔗defTask.Priority.max : Task.Priority
The highest regular priority for spawned tasks: 8
.
Spawning a task with a priority higher than Task.Priority.max
is not an error but will spawn a
dedicated worker for the task. This is indicated using Task.Priority.dedicated
. Regular priority
tasks are placed in a thread pool and worked on according to their priority order.
🔗defTask.Priority.dedicated : Task.Priority
Indicates that a task should be scheduled on a dedicated thread.
Any priority higher than Task.Priority.max
will result in the task being scheduled
immediately on a dedicated thread. This is particularly useful for long-running and/or
I/O-bound tasks since Lean will, by default, allocate no more non-dedicated workers
than the number of cores to reduce context switches.
15.11.2. Task Results
🔗defTask.get.{u} {α : Type u} (self : Task α) : α
Blocks the current thread until the given task has finished execution, and then returns the result
of the task. If the current thread is itself executing a (non-dedicated) task, the maximum
threadpool size is temporarily increased by one while waiting so as to ensure the process cannot
be deadlocked by threadpool starvation. Note that when the current thread is unblocked, more tasks
than the configured threadpool size may temporarily be running at the same time until sufficiently
many tasks have finished.
Task.map
and Task.bind
should be preferred over Task.get
for setting up task dependencies
where possible as they do not require temporarily growing the threadpool in this way.
🔗opaqueIO.wait {α : Type} (t : Task α) : BaseIO α
Waits for the task to finish, then returns its result.
🔗opaqueIO.waitAny {α : Type} (tasks : List (Task α))
(h : tasks.length > 0 := by
exact Nat.zero_lt_succ _) :
BaseIO α
Waits until any of the tasks in the list has finished, then returns its result.
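For example (illustrative name), IO.waitAny can be used to react to whichever of several tasks finishes first:
def firstResult : IO Unit := do
  let slow ← IO.asTask do IO.sleep 100; pure "slow"
  let fast ← IO.asTask do IO.sleep 10; pure "fast"
  -- Blocks until one of the two tasks has finished, then yields its result.
  let winner ← IO.ofExcept (← IO.waitAny [slow, fast])
  IO.println s!"first to finish: {winner}"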
15.11.3. Sequencing Tasks
These operators create new tasks from old ones.
When possible, it's good to use Task.map
or Task.bind
instead of manually calling Task.get
in a new task because they don't temporarily increase the size of the thread pool.
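For example (illustrative names), dependent work can be attached to a task without blocking a thread:
def parsed (line : Task String) : Task (Option Nat) :=
  -- Runs once `line` has resolved; no pool thread is blocked while waiting for it.
  line.map fun s => s.trim.toNat?

def lineLength (line : Task String) : Task Nat :=
  -- Spawns a follow-up task that depends on the first task's result.
  line.bind fun s => Task.spawn fun () => s.length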
🔗defTask.map.{u_1, u_2} {α : Type u_1}
{β : Type u_2} (f : α → β) (x : Task α)
(prio : Task.Priority :=
Task.Priority.default)
(sync : Bool := false) : Task β
map f x
maps function f
over the task x
: that is, it constructs
(and immediately launches) a new task which will wait for the value of x
to
be available and then calls f
on the result.
prio
, if provided, is the priority of the task.
If sync
is set to true, f
is executed on the current thread if x
has already finished and
otherwise on the thread that x
finished on. prio
is ignored in this case. This should only be
done when executing f
is cheap and non-blocking.
🔗defTask.bind.{u_1, u_2} {α : Type u_1}
{β : Type u_2} (x : Task α) (f : α → Task β)
(prio : Task.Priority :=
Task.Priority.default)
(sync : Bool := false) : Task β
bind x f
does a monad "bind" operation on the task x
with function f
:
that is, it constructs (and immediately launches) a new task which will wait
for the value of x
to be available and then calls f
on the result,
resulting in a new task which is then run for a result.
prio
, if provided, is the priority of the task.
If sync
is set to true, f
is executed on the current thread if x
has already finished and
otherwise on the thread that x
finished on. prio
is ignored in this case. This should only be
done when executing f
is cheap and non-blocking.
🔗opaqueBaseIO.mapTask.{u_1} {α : Type u_1} {β : Type}
(f : α → BaseIO β) (t : Task α)
(prio : Task.Priority :=
Task.Priority.default)
(sync : Bool := false) : BaseIO (Task β)
Creates a new task that waits for t
to complete and then runs the BaseIO
action f
on its
result. This new task has priority prio
.
Running the resulting BaseIO
action causes the task to be started eagerly. Unlike pure tasks
created by Task.spawn
, tasks created by this function will run even if the last reference to the
task is dropped. The act
should explicitly check for cancellation via IO.checkCanceled
if it
should be terminated or otherwise react to the last reference being dropped.
🔗defEIO.mapTask.{u_1} {α : Type u_1} {ε β : Type}
(f : α → EIO ε β) (t : Task α)
(prio : Task.Priority :=
Task.Priority.default)
(sync : Bool := false) :
BaseIO (Task (Except ε β))
Creates a new task that waits for t
to complete and then runs the EIO ε
action f
on its result.
This new task has priority prio
.
Running the resulting BaseIO
action causes the task to be started eagerly. Unlike pure tasks
created by Task.spawn
, tasks created by this function will run even if the last reference to the
task is dropped. The act
should explicitly check for cancellation via IO.checkCanceled
if it
should be terminated or otherwise react to the last reference being dropped. Because EIO ε
actions
may throw an exception of type ε
, the result of the task is an Except ε β
.
🔗defIO.mapTask.{u_1} {α : Type u_1} {β : Type}
(f : α → IO β) (t : Task α)
(prio : Task.Priority :=
Task.Priority.default)
(sync : Bool := false) :
BaseIO (Task (Except IO.Error β))
Creates a new task that waits for t
to complete and then runs the IO
action f
on its result.
This new task has priority prio
.
Running the resulting BaseIO
action causes the task to be started eagerly. Unlike pure tasks
created by Task.spawn
, tasks created by this function will run even if the last reference to the
task is dropped. The act
should explicitly check for cancellation via IO.checkCanceled
if it
should be terminated or otherwise react to the last reference being dropped. Because IO
actions
may throw an exception of type IO.Error
, the result of the task is an Except IO.Error β
.
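As an illustration (the name printWhenDone is not part of the API), an IO action can be scheduled to run as soon as a pure task resolves:
def printWhenDone (t : Task Nat) : BaseIO (Task (Except IO.Error Unit)) :=
  -- Spawns a task that waits for `t` and then runs the IO action on its result.
  IO.mapTask (fun n => IO.println s!"result: {n}") t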
🔗defBaseIO.mapTasks.{u_1} {α : Type u_1} {β : Type}
(f : List α → BaseIO β)
(tasks : List (Task α))
(prio : Task.Priority :=
Task.Priority.default)
(sync : Bool := false) : BaseIO (Task β)
Creates a new task that waits for all the tasks in the list tasks
to complete, and then runs the
BaseIO
action f
on their results. This new task has priority prio
.
Running the resulting BaseIO
action causes the task to be started eagerly. Unlike pure tasks
created by Task.spawn
, tasks created by this function will run even if the last reference to the
task is dropped. The act
should explicitly check for cancellation via IO.checkCanceled
if it
should be terminated or otherwise react to the last reference being dropped.
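For example (illustrative name), the results of several tasks can be combined once all of them have resolved:
def sumAll (parts : List (Task Nat)) : BaseIO (Task Nat) :=
  -- Waits for every task in `parts`, then combines their results in a new task.
  BaseIO.mapTasks (fun ns => pure (ns.foldl (· + ·) 0)) parts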
🔗defEIO.mapTasks.{u_1} {α : Type u_1} {ε β : Type}
(f : List α → EIO ε β) (tasks : List (Task α))
(prio : Task.Priority :=
Task.Priority.default)
(sync : Bool := false) :
BaseIO (Task (Except ε β))
Creates a new task that waits for all the tasks in the list tasks
to complete, and then runs the
EIO ε
action f
on their results. This new task has priority prio
.
Running the resulting BaseIO
action causes the task to be started eagerly. Unlike pure tasks
created by Task.spawn
, tasks created by this function will run even if the last reference to the
task is dropped. The act
should explicitly check for cancellation via IO.checkCanceled
if it
should be terminated or otherwise react to the last reference being dropped.
🔗opaqueBaseIO.bindTask.{u_1} {α : Type u_1} {β : Type}
(t : Task α) (f : α → BaseIO (Task β))
(prio : Task.Priority :=
Task.Priority.default)
(sync : Bool := false) : BaseIO (Task β)
Creates a new task that waits for t
to complete, runs the BaseIO
action f
on its result, and then
continues as the resulting task. This new task has priority prio
.
Running the resulting BaseIO
action causes this new task to be started eagerly. Unlike pure tasks
created by Task.spawn
, tasks created by this function will run even if the last reference to the
task is dropped. The act
should explicitly check for cancellation via IO.checkCanceled
if it
should be terminated or otherwise react to the last reference being dropped.
🔗defEIO.bindTask.{u_1} {α : Type u_1} {ε β : Type}
(t : Task α)
(f : α → EIO ε (Task (Except ε β)))
(prio : Task.Priority :=
Task.Priority.default)
(sync : Bool := false) :
BaseIO (Task (Except ε β))
Creates a new task that waits for t
to complete, runs the EIO ε
action f
on its result, and
then continues as the resulting task. This new task has priority prio
.
Running the resulting BaseIO
action causes this new task to be started eagerly. Unlike pure tasks
created by Task.spawn
, tasks created by this function will run even if the last reference to the
task is dropped. The act
should explicitly check for cancellation via IO.checkCanceled
if it
should be terminated or otherwise react to the last reference being dropped. Because EIO ε
actions
may throw an exception of type ε
, the result of the task is an Except ε β
.
🔗defIO.bindTask.{u_1} {α : Type u_1} {β : Type}
(t : Task α)
(f : α → IO (Task (Except IO.Error β)))
(prio : Task.Priority :=
Task.Priority.default)
(sync : Bool := false) :
BaseIO (Task (Except IO.Error β))
Creates a new task that waits for t
to complete, runs the IO
action f
on its result, and then
continues as the resulting task. This new task has priority prio
.
Running the resulting BaseIO
action causes this new task to be started eagerly. Unlike pure tasks
created by Task.spawn
, tasks created by this function will run even if the last reference to the
task is dropped. The act
should explicitly check for cancellation via IO.checkCanceled
if it
should be terminated or otherwise react to the last reference being dropped. Because IO
actions
may throw an exception of type IO.Error
, the result of the task is an Except IO.Error β
.
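As a sketch (illustrative name), IO.bindTask can chain two asynchronous steps, continuing as the task produced by the second step:
def fetchThenProcess (fetch : Task String) : BaseIO (Task (Except IO.Error Nat)) :=
  -- When `fetch` resolves, run an IO step and then continue as a second asynchronous task.
  IO.bindTask fetch fun contents => do
    IO.println s!"fetched {contents.length} characters"
    IO.asTask do
      pure contents.length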
🔗defBaseIO.chainTask.{u_1} {α : Type u_1}
(t : Task α) (f : α → BaseIO Unit)
(prio : Task.Priority :=
Task.Priority.default)
(sync : Bool := false) : BaseIO Unit
Creates a new task that waits for t
to complete and then runs the BaseIO
action f
on its result.
This new task has priority prio
.
This is a version of BaseIO.mapTask
that ignores the result value.
Running the resulting BaseIO
action causes the task to be started eagerly. Unlike pure tasks
created by Task.spawn
, tasks created by this function will run even if the last reference to the
task is dropped. The act
should explicitly check for cancellation via IO.checkCanceled
if it
should be terminated or otherwise react to the last reference being dropped.
🔗defEIO.chainTask.{u_1} {α : Type u_1} {ε : Type}
(t : Task α) (f : α → EIO ε Unit)
(prio : Task.Priority :=
Task.Priority.default)
(sync : Bool := false) : EIO ε Unit
Creates a new task that waits for t
to complete and then runs the EIO ε
action f
on its result.
This new task has priority prio
.
This is a version of EIO.mapTask
that ignores the result value.
Running the resulting EIO ε
action causes the task to be started eagerly. Unlike pure tasks
created by Task.spawn
, tasks created by this function will run even if the last reference to the
task is dropped. The act
should explicitly check for cancellation via IO.checkCanceled
if it
should be terminated or otherwise react to the last reference being dropped.
🔗defIO.chainTask.{u_1} {α : Type u_1} (t : Task α)
(f : α → IO Unit)
(prio : Task.Priority :=
Task.Priority.default)
(sync : Bool := false) : IO Unit
Creates a new task that waits for t
to complete and then runs the IO
action f
on its result.
This new task has priority prio
.
This is a version of IO.mapTask
that ignores the result value.
Running the resulting IO
action causes the task to be started eagerly. Unlike pure tasks created
by Task.spawn
, tasks created by this function will run even if the last reference to the task is
dropped. The act should explicitly check for cancellation via IO.checkCanceled
if it should be
terminated or otherwise react to the last reference being dropped.
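For example (illustrative name), a fire-and-forget notification can be attached to a task:
def logCompletion (t : Task Nat) : IO Unit :=
  -- Prints the result when `t` finishes; the new task itself is not returned.
  IO.chainTask t fun n => IO.println s!"finished with {n}"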
15.11.4. Cancellation and Status
Impure tasks should use IO.checkCanceled
to react to cancellation, which occurs either as a result of IO.cancel
or when the last reference to the task is dropped.
Pure tasks are terminated automatically upon cancellation.
🔗opaqueIO.cancel.{u_1} {α : Type u_1} :
Task α → BaseIO Unit
Requests cooperative cancellation of the task. The task must explicitly call IO.checkCanceled
to
react to the cancellation.
🔗opaqueIO.checkCanceled : BaseIO Bool
Checks whether the current task's cancellation flag has been set by calling IO.cancel
or by
dropping the last reference to the task.
🔗defIO.hasFinished.{u_1} {α : Type u_1}
(task : Task α) : BaseIO Bool
Checks whether the task has finished execution, at which point calling Task.get
will return
immediately.
🔗opaqueIO.getTaskState.{u_1} {α : Type u_1} :
Task α → BaseIO IO.TaskState
Returns the current state of a task in the Lean runtime's task manager.
For tasks derived from Promises, the states waiting
and running
should be considered
equivalent.
🔗inductive typeIO.TaskState : Type
The current state of a Task
in the Lean runtime's task manager.
Constructors
waiting : IO.TaskState
The Task
is waiting to be run.
It can be waiting for dependencies to complete or sitting in the task manager queue waiting for a
thread to run on.
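The following sketch (illustrative name) observes a task's status; running and finished are the remaining IO.TaskState constructors alongside waiting:
def observe : IO Unit := do
  let task ← IO.asTask do IO.sleep 100
  IO.println s!"finished yet? {← IO.hasFinished task}"   -- almost certainly false at this point
  discard <| IO.wait task
  match (← IO.getTaskState task) with
  | .finished => IO.println "the task has finished"
  | .running  => IO.println "the task is still running"
  | .waiting  => IO.println "the task is waiting to run"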
🔗opaqueIO.getTID : BaseIO UInt64
Returns the thread ID of the calling thread.
15.11.5. Promises
Promises represent a value that will be supplied in the future.
Supplying the value is called resolving the promise.
Once created, a promise can be stored in a data structure or passed around like any other value, and attempts to read from it will block until it is resolved.
🔗defIO.Promise (α : Type) : Type
Promise α
allows you to create a Task α
whose value is provided later by calling resolve
.
Typical usage is as follows:
- let promise ← Promise.new creates a promise
- promise.result? : Task (Option α) can now be passed around
- promise.result?.get blocks until the promise is resolved
- promise.resolve a resolves the promise
- promise.result?.get now returns some a
If the promise is dropped without ever being resolved, promise.result?.get
will return none
.
See Promise.result!/resultD
for other ways to handle this case.
🔗defIO.Promise.isResolved {α : Type}
(promise : IO.Promise α) : BaseIO Bool
Checks whether the promise has already been resolved, i.e. whether access to result*
will return
immediately.
🔗opaqueIO.Promise.result? {α : Type}
(promise : IO.Promise α) : Task (Option α)
Like Promise.result
, but resolves to none
if the promise is dropped without ever being resolved.
🔗defIO.Promise.result! {α : Type}
(promise : IO.Promise α) : Task α
The result task of a Promise
.
The task blocks until Promise.resolve
is called. If the promise is dropped without ever being
resolved, evaluating the task will panic and, when not using fatal panics, block forever. Use
Promise.result?
to handle this case explicitly.
🔗defIO.Promise.resultD {α : Type}
(promise : IO.Promise α) (dflt : α) : Task α
Like Promise.result
, but resolves to dflt
if the promise is dropped without ever being resolved.
🔗opaqueIO.Promise.resolve {α : Type} (value : α)
(promise : IO.Promise α) : BaseIO Unit
Resolves a Promise
.
Only the first call to this function has an effect.
15.11.6. Communication Between Tasks
In addition to the types and operations described in this section, IO.Ref
can be used as a lock.
Taking the reference (using take
) causes other threads to block when reading until the reference is set
again.
This pattern is described in the section on reference cells.
15.11.6.1. Channels
The types and functions in this section are available after importing Std.Sync.Channel
.
🔗defStd.Channel (α : Type) : Type
FIFO channel with unbounded buffer, where recv?
returns a Task
.
A channel can be closed. Once it is closed, all sends are ignored, and
recv?
returns none
once the queue is empty.
🔗defStd.Channel.send {α : Type} (ch : Std.Channel α)
(v : α) : BaseIO Unit
Sends a message on a Channel
.
This function does not block.
🔗defStd.Channel.recv? {α : Type}
(ch : Std.Channel α) :
BaseIO (Task (Option α))
Receives a message, without blocking.
The returned task waits for the message.
Every message is only received once.
Returns none
if the channel is closed and the queue is empty.
🔗defStd.Channel.recvAllCurrent {α : Type}
(ch : Std.Channel α) : BaseIO (Array α)
Receives all currently queued messages from the channel.
Those messages are dequeued and will not be returned by recv?
.
🔗defStd.Channel.sync {α : Type}
(ch : Std.Channel α) : Std.Channel.Sync α
Accesses the synchronous (blocking) version of the channel operations.
For example, ch.sync.recv?
blocks until the next message,
and for msg in ch.sync do ...
iterates synchronously over the channel.
These functions should only be used in dedicated threads.
🔗defStd.Channel.Sync (α : Type) : Type
Type tag for synchronous (blocking) operations on a Channel
.
🔗defStd.Channel.Sync.recv? {α : Type}
(ch : Std.Channel.Sync α) : BaseIO (Option α)
Synchronously receives a message from the channel.
Every message is only received once.
Returns none
if the channel is closed and the queue is empty.
Synchronous channels can also be read using for loops.
In particular, there is an instance of type ForIn m (Std.Channel.Sync α) α
for every monad m
with a MonadLiftT BaseIO m
instance.
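As a sketch of cross-task communication, the program below assumes that channels are created with Std.Channel.new (not shown in the listing above) and sticks to the asynchronous recv? interface; a dedicated thread could instead consume the channel with for msg in ch.sync do:
def channelDemo : IO Unit := do
  let ch : Std.Channel Nat ← Std.Channel.new   -- `Std.Channel.new` is assumed here
  -- Producer: an impure task that sends three messages; `send` never blocks.
  let _producer ← IO.asTask do
    for i in [1, 2, 3] do
      ch.send i
  -- Consumer: each `recv?` yields a task that resolves to the next message.
  for _ in [0:3] do
    let msg? ← IO.wait (← ch.recv?)
    IO.println s!"received {msg?}"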
15.11.6.2. Mutexes
The types and functions in this section are available after importing Std.Sync.Mutex
.
🔗typeStd.Mutex (α : Type) : Type
Mutual exclusion primitive (lock) guarding shared state of type α
.
The type Mutex α
is similar to IO.Ref α
,
except that concurrent accesses are guarded by a mutex
instead of atomic pointer operations and busy-waiting.
🔗defStd.AtomicT (σ : Type) (m : Type → Type)
(α : Type) : Type
AtomicT σ m is the monad that can be atomically executed inside a Mutex σ, with outside monad m.
The action has access to the state σ
of the mutex (via get
and set
).
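As an illustration of guarding shared state, the sketch below assumes that mutexes are created with Std.Mutex.new and that Std.Mutex.atomically runs an AtomicT action while holding the lock; the names counterDemo and spawnWorker are illustrative:
def counterDemo : IO Unit := do
  let counter ← Std.Mutex.new 0                -- `Std.Mutex.new` is assumed here
  -- Two impure tasks increment the shared counter concurrently.
  let spawnWorker := IO.asTask do
    for _ in [0:1000] do
      counter.atomically do                    -- `Std.Mutex.atomically` is assumed here
        set ((← get) + 1)
  let w1 ← spawnWorker
  let w2 ← spawnWorker
  discard <| IO.wait w1
  discard <| IO.wait w2
  let final ← counter.atomically get
  IO.println s!"final count: {final}"          -- 2000: the increments never interleave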
15.11.6.3. Condition Variables
The types and functions in this section are available after importing Std.Sync.Mutex
.
🔗opaqueStd.Condvar.wait (condvar : Std.Condvar)
(mutex : Std.BaseMutex) : BaseIO Unit
Waits until another thread calls notifyOne
or notifyAll
.
🔗opaqueStd.Condvar.notifyOne (condvar : Std.Condvar) :
BaseIO Unit
Wakes up a single other thread executing wait
.
🔗defStd.Condvar.waitUntil.{u_1}
{m : Type → Type u_1} [Monad m]
[MonadLift BaseIO m] (condvar : Std.Condvar)
(mutex : Std.BaseMutex) (pred : m Bool) :
m Unit
Waits on the condition variable until the predicate is true.
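The classic flag-and-signal pattern looks roughly as follows; this sketch assumes that Std.BaseMutex.new and Std.Condvar.new create the mutex and condition variable and that Std.BaseMutex has lock and unlock operations:
def condvarDemo : IO Unit := do
  let mutex ← Std.BaseMutex.new                -- assumed constructor name
  let condvar ← Std.Condvar.new                -- assumed constructor name
  let ready ← IO.mkRef false
  -- Worker: set the flag under the mutex, then wake up the waiting thread.
  let _worker ← IO.asTask do
    IO.sleep 50
    mutex.lock
    ready.set true
    mutex.unlock
    condvar.notifyOne
  -- Main thread: sleep until the worker has set the flag.
  mutex.lock
  condvar.waitUntil mutex do ready.get         -- re-checks `ready` each time it is notified
  mutex.unlock
  IO.println "worker signalled"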