10.11. Tasks and Threads

Planned Content

A detailed description of the Lean multi-threading model, including:

  • Tasks, priorities, and threads

  • Synchronization and communication primitives

    • IO.Ref as a lock

    • IO.Mutex

    • IO.Channel

    • IO.Condvar

    • IO.Promise

Tracked at issue #90

🔗structure
Task.{u} (α : Type u) : Type u

Task α is a primitive for asynchronous computation. It represents a computation that will resolve to a value of type α, possibly being computed on another thread. This is similar to Future in Scala, Promise in JavaScript, and JoinHandle in Rust.

Tasks have an overridden representation in the runtime.

Constructor

Task.pure.{u}

Task.pure (a : α) constructs a task that is already resolved with value a.

Fields

get : α

Blocks the current thread until the given task has finished execution, and then returns the result of the task. If the current thread is itself executing a (non-dedicated) task, the maximum threadpool size is temporarily increased by one while waiting, so that the process cannot be deadlocked by threadpool starvation. Note that when the current thread is unblocked, more tasks than the configured threadpool size may temporarily be running at the same time until sufficiently many tasks have finished.

Task.map and Task.bind should be preferred over Task.get for setting up task dependencies where possible, as they do not require temporarily growing the threadpool in this way.

🔗def
Task.spawn.{u} {α : Type u} (fn : Unit → α)
  (prio : Task.Priority :=
    Task.Priority.default) :
  Task α

spawn fn : Task α constructs and immediately launches a new task for evaluating the function fn () : α asynchronously.

prio, if provided, is the priority of the task.
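A minimal sketch of spawning a pure computation and reading its result. The helper sumTo is hypothetical and exists only for this illustration.

```lean
-- `sumTo` is a hypothetical helper used only for illustration.
def sumTo (n : Nat) : Nat :=
  (List.range (n + 1)).foldl (· + ·) 0

def main : IO Unit := do
  -- Launch the computation; it may be evaluated on a thread-pool worker.
  let t : Task Nat := Task.spawn fun _ => sumTo 1000
  -- Block the main thread until the result is available.
  IO.println s!"sum = {t.get}"
```

Blocking with Task.get is reasonable on the main thread, as here. Inside another task, Task.map or Task.bind is usually the better way to express the dependency.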

🔗def
Task.Priority : Type

Task priority. Tasks with higher priority will always be scheduled before ones with lower priority.

🔗def
Task.Priority.default : Task.Priority

The default priority for spawned tasks, which is also the lowest priority: 0.

🔗def
Task.Priority.max : Task.Priority

The highest regular priority for spawned tasks: 8.

Spawning a task with a priority higher than Task.Priority.max is not an error, but it spawns a dedicated worker for the task; see Task.Priority.dedicated. Regular priority tasks are placed in a thread pool and worked on according to the priority order.

🔗def
Task.Priority.dedicated : Task.Priority

Any priority higher than Task.Priority.max will result in the task being scheduled immediately on a dedicated thread. This is particularly useful for long-running and/or I/O-bound tasks since Lean will by default allocate no more non-dedicated workers than the number of cores to reduce context switches.
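As an illustration, a blocking action can be moved onto a dedicated thread by passing Task.Priority.dedicated as prio. In this sketch, IO.sleep is only a stand-in for a slow, I/O-bound operation, and the returned value is arbitrary.

```lean
def main : IO Unit := do
  -- `IO.sleep` stands in for a long-running, blocking operation.
  let t ← IO.asTask (prio := Task.Priority.dedicated) do
    IO.sleep 100
    return (42 : Nat)
  -- The task's result is an `Except`, because the action may throw.
  let r ← IO.ofExcept (← IO.wait t)
  IO.println s!"result = {r}"
```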

🔗def
Task.get.{u} {α : Type u} (self : Task α) : α

Blocks the current thread until the given task has finished execution, and then returns the result of the task. If the current thread is itself executing a (non-dedicated) task, the maximum threadpool size is temporarily increased by one while waiting, so that the process cannot be deadlocked by threadpool starvation. Note that when the current thread is unblocked, more tasks than the configured threadpool size may temporarily be running at the same time until sufficiently many tasks have finished.

Task.map and Task.bind should be preferred over Task.get for setting up task dependencies where possible, as they do not require temporarily growing the threadpool in this way.

🔗def
Task.map.{u_1, u_2} {α : Type u_1}
  {β : Type u_2} (f : α → β) (x : Task α)
  (prio : Task.Priority :=
    Task.Priority.default)
  (sync : Bool := false) : Task β

map f x maps the function f over the task x: it constructs (and immediately launches) a new task that waits for the value of x to become available and then calls f on the result.

prio, if provided, is the priority of the new task. If sync is set to true, f is executed on the current thread if x has already finished, and otherwise on the thread that x finished on; prio is ignored in this case. Set sync to true only when executing f is cheap and non-blocking.
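A small sketch of chaining a follow-up computation with Task.map instead of blocking on the first task. The arithmetic is arbitrary and chosen only for illustration.

```lean
def main : IO Unit := do
  let t : Task Nat := Task.spawn fun _ => 20 + 1
  -- Register a dependent task; this does not block on `t`.
  let doubled : Task Nat := t.map fun n => n * 2
  -- Only the final read blocks.
  IO.println s!"doubled = {doubled.get}"
```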

🔗def
Task.bind.{u_1, u_2} {α : Type u_1}
  {β : Type u_2} (x : Task α) (f : α → Task β)
  (prio : Task.Priority :=
    Task.Priority.default)
  (sync : Bool := false) : Task β

bind x f performs a monadic "bind" on the task x with the function f: it constructs (and immediately launches) a new task that waits for the value of x to become available and then calls f on the result. The task returned by f is then run to produce the final result.

prio, if provided, is the priority of the new task. If sync is set to true, f is executed on the current thread if x has already finished, and otherwise on the thread that x finished on; prio is ignored in this case. Set sync to true only when executing f is cheap and non-blocking.
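A sketch of Task.bind, where the continuation itself starts a second task whose input depends on the first result. The values are arbitrary.

```lean
def main : IO Unit := do
  let first : Task Nat := Task.spawn fun _ => 6
  -- The continuation returns a new task built from the first result.
  let chained : Task Nat := first.bind fun n => Task.spawn fun _ => n * 7
  IO.println s!"chained = {chained.get}"
```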

🔗opaque
IO.wait {α : Type} (t : Task α) : BaseIO α

Wait for the task to finish, then return its result.

🔗opaque
IO.waitAny {α : Type} (tasks : List (Task α))
  (h : tasks.length > 0 := by
    exact Nat.zero_lt_succ _) :
  BaseIO α

Wait until any of the tasks in the given list has finished, then return its result.
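The following sketch contrasts IO.waitAny with IO.wait. The delays and string labels are arbitrary. Which task finishes first depends on scheduling and on the number of available workers, so the first printed label is not guaranteed.

```lean
def main : IO Unit := do
  -- Two tasks with different (arbitrary) delays.
  let slow ← IO.asTask do
    IO.sleep 200
    return "slow"
  let fast ← IO.asTask do
    IO.sleep 10
    return "fast"
  -- The list literal is non-empty, so the default proof for `h` applies.
  let first ← IO.ofExcept (← IO.waitAny [slow, fast])
  IO.println s!"first finished: {first}"
  -- `IO.wait` blocks until one specific task is done.
  let other ← IO.ofExcept (← IO.wait slow)
  IO.println s!"slow task returned: {other}"
```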

🔗opaque
BaseIO.mapTask.{u_1} {α : Type u_1} {β : Type}
  (f : α → BaseIO β) (t : Task α)
  (prio : Task.Priority :=
    Task.Priority.default)
  (sync : Bool := false) : BaseIO (Task β)

🔗def
EIO.mapTask.{u_1} {α : Type u_1} {ε β : Type}
  (f : α → EIO ε β) (t : Task α)
  (prio : Task.Priority :=
    Task.Priority.default)
  (sync : Bool := false) :
  BaseIO (Task (Except ε β))

EIO specialization of BaseIO.mapTask.

🔗def
IO.mapTask.{u_1} {α : Type u_1} {β : Type}
  (f : α → IO β) (t : Task α)
  (prio : Task.Priority :=
    Task.Priority.default)
  (sync : Bool := false) :
  BaseIO (Task (Except IO.Error β))

IO specialization of EIO.mapTask.
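A hedged sketch of IO.mapTask: an IO action runs once a pure task has produced its value, and the result is wrapped in Except because the action may fail. The printed messages are illustrative only.

```lean
def main : IO Unit := do
  let t : Task Nat := Task.spawn fun _ => 41
  -- Run an `IO` action on the value of `t` once it is available.
  let next ← IO.mapTask (t := t) fun (n : Nat) => do
    IO.println s!"got {n}"
    return n + 1
  -- The mapped task carries an `Except IO.Error Nat`.
  match (← IO.wait next) with
  | .ok v => IO.println s!"result = {v}"
  | .error e => IO.eprintln s!"failed: {e}"
```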

🔗def
BaseIO.mapTasks.{u_1} {α : Type u_1} {β : Type}
  (f : List α → BaseIO β)
  (tasks : List (Task α))
  (prio : Task.Priority :=
    Task.Priority.default)
  (sync : Bool := false) : BaseIO (Task β)

🔗def
EIO.mapTasks.{u_1} {α : Type u_1} {ε β : Type}
  (f : List α → EIO ε β) (tasks : List (Task α))
  (prio : Task.Priority :=
    Task.Priority.default)
  (sync : Bool := false) :
  BaseIO (Task (Except ε β))

EIO specialization of BaseIO.mapTasks.

🔗def
IO.mapTasks.{u_1} {α : Type u_1} {β : Type}
  (f : List α → IO β) (tasks : List (Task α))
  (prio : Task.Priority :=
    Task.Priority.default)
  (sync : Bool := false) :
  BaseIO (Task (Except IO.Error β))

IO specialization of EIO.mapTasks.
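A sketch of IO.mapTasks, which combines the results of several tasks in one follow-up action. The squares and their sum are arbitrary example data.

```lean
def main : IO Unit := do
  -- Four independent tasks, each computing a square.
  let tasks : List (Task Nat) :=
    (List.range 4).map fun i => Task.spawn fun _ => i * i
  -- The action receives all results once every task has finished.
  let total ← IO.mapTasks (tasks := tasks) fun (xs : List Nat) => do
    return xs.foldl (· + ·) 0
  let sum ← IO.ofExcept (← IO.wait total)
  IO.println s!"sum of squares = {sum}"
```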

🔗opaque
BaseIO.bindTask.{u_1} {α : Type u_1} {β : Type}
  (t : Task α) (f : α → BaseIO (Task β))
  (prio : Task.Priority :=
    Task.Priority.default)
  (sync : Bool := false) : BaseIO (Task β)

🔗def
EIO.bindTask.{u_1} {α : Type u_1} {ε β : Type}
  (t : Task α)
  (f : α → EIO ε (Task (Except ε β)))
  (prio : Task.Priority :=
    Task.Priority.default)
  (sync : Bool := false) :
  BaseIO (Task (Except ε β))

EIO specialization of BaseIO.bindTask.

🔗def
IO.bindTask.{u_1} {α : Type u_1} {β : Type}
  (t : Task α)
  (f : α → IO (Task (Except IO.Error β)))
  (prio : Task.Priority :=
    Task.Priority.default)
  (sync : Bool := false) :
  BaseIO (Task (Except IO.Error β))

IO specialization of EIO.bindTask.
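A sketch of IO.bindTask, where the continuation performs some IO and then starts a second task with IO.asTask. The values and the message are illustrative only.

```lean
def main : IO Unit := do
  let first : Task Nat := Task.spawn fun _ => 10
  let chained ← IO.bindTask first fun (n : Nat) => do
    IO.println s!"first stage produced {n}"
    -- Start the second stage as its own task and hand it back.
    let next ← IO.asTask (pure (n + 5))
    return next
  let v ← IO.ofExcept (← IO.wait chained)
  IO.println s!"final = {v}"
```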

🔗opaque
BaseIO.asTask {α : Type} (act : BaseIO α)
  (prio : Task.Priority :=
    Task.Priority.default) :
  BaseIO (Task α)

Run act in a separate Task. This is similar to Haskell's unsafeInterleaveIO, except that the Task is started eagerly as usual. Thus pure accesses to the Task do not influence the impure act computation. Unlike pure tasks created by Task.spawn, tasks created by this function will be run even if the last reference to the task is dropped. If act needs to react to cancellation, it must check for it explicitly via IO.checkCanceled.

🔗def
EIO.asTask {ε α : Type} (act : EIO ε α)
  (prio : Task.Priority :=
    Task.Priority.default) :
  BaseIO (Task (Except ε α))

EIO specialization of BaseIO.asTask.

🔗def
IO.asTask {α : Type} (act : IO α)
  (prio : Task.Priority :=
    Task.Priority.default) :
  BaseIO (Task (Except IO.Error α))

IO specialization of EIO.asTask.
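Because an IO action may throw, IO.asTask delivers its outcome as an Except value rather than raising in the waiting thread. The sketch below shows this with a hypothetical action named failing that always throws.

```lean
-- `failing` is a hypothetical action that always throws.
def failing : IO Nat :=
  throw (IO.userError "something went wrong")

def main : IO Unit := do
  let t ← IO.asTask failing
  -- The error is captured in the task's result instead of propagating.
  match (← IO.wait t) with
  | .ok v => IO.println s!"ok: {v}"
  | .error e => IO.println s!"task failed: {e}"
```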

🔗opaque
IO.cancel.{u_1} {α : Type u_1} :
  Task α → BaseIO Unit

Request cooperative cancellation of the task. The task must explicitly call IO.checkCanceled to react to the cancellation.

🔗opaque
IO.getTaskState.{u_1} {α : Type u_1} :
  Task α → BaseIO IO.TaskState

Returns the current state of the Task in the Lean runtime's task manager.

Note that for tasks derived from Promises, waiting and running should be considered equivalent.

🔗opaque
IO.checkCanceled : BaseIO Bool

Checks whether the current task's cancellation flag has been set, either by a call to IO.cancel or by dropping the last reference to the task.
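Cooperative cancellation can be sketched as a loop that polls IO.checkCanceled. The function worker is hypothetical, and the sleep durations are arbitrary, so the reported iteration count will vary between runs.

```lean
-- `worker` is a hypothetical loop that counts iterations until canceled.
partial def worker (n : Nat) : IO Nat := do
  -- Stop as soon as cancellation has been requested.
  if (← IO.checkCanceled) then
    return n
  IO.sleep 10
  worker (n + 1)

def main : IO Unit := do
  let t ← IO.asTask (worker 0)
  IO.sleep 100
  -- Request cancellation; the worker notices it at its next check.
  IO.cancel t
  let n ← IO.ofExcept (← IO.wait t)
  IO.println s!"worker stopped after {n} iterations"
```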

🔗def
IO.hasFinished.{u_1} {α : Type u_1}
  (task : Task α) : BaseIO Bool

Check if the task has finished execution, at which point calling Task.get will return immediately.
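A sketch of polling a task without blocking. The first check will usually report false because the task is still sleeping, but that depends on timing. The second check runs after IO.wait has returned, so the task has finished by then.

```lean
def main : IO Unit := do
  let t ← IO.asTask do
    IO.sleep 50
    return (1 : Nat)
  -- Non-blocking check; the task is probably still running here.
  let doneBefore ← IO.hasFinished t
  IO.println s!"finished before waiting: {doneBefore}"
  -- Once `IO.wait` returns, the task has finished.
  let _ ← IO.wait t
  let doneAfter ← IO.hasFinished t
  IO.println s!"finished after waiting: {doneAfter}"
```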

🔗opaque
IO.getTID : BaseIO UInt64

Returns the thread ID of the calling thread.
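As a rough illustration, the thread ID observed inside a dedicated task can be compared with that of the main thread. The two are expected to differ, since a dedicated task runs on its own thread. The actual ID values are platform-dependent.

```lean
def main : IO Unit := do
  let mainTid ← IO.getTID
  -- Query the thread ID from inside a task running on a dedicated thread.
  let t ← BaseIO.asTask (prio := Task.Priority.dedicated) IO.getTID
  let workerTid ← IO.wait t
  IO.println s!"main thread: {mainTid}, dedicated worker: {workerTid}"
```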