Task Graph API Reference
API documentation for TaskGraph, TaskGraphBuilder, and TaskChain classes used to build complex workflows with dependencies.
Module-level Constants
LAST_ADDED
Sentinel value used as the default for depends_on parameters. When passed (or omitted,
since it is the default), the method resolves dependencies using the builder's internal
cursor — the last task(s) added. Use None to declare an explicit root task with no
parents.
TaskChain
boilermaker.task.graph.TaskChain(*tasks, on_any_failure=None)
An ordered sequence of tasks forming a composable workflow unit.
TaskChain groups tasks that always execute sequentially (A → B → C). The chain exposes its first task (head) and last task (last) so it can be wired into a TaskGraphBuilder with explicit dependency references.
TaskChain is a value object — it has no builder methods, no cursor, and no graph state. Its sole purpose is to be passed to TaskGraphBuilder.add_chain().
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| *tasks | Task | One or more Task objects to chain in order. | () |
| on_any_failure | Task \| None | Optional Task to run if ANY task in the chain fails. Registered on every task in the chain when embedded via add_chain(). | None |
Raises:

| Type | Description |
|---|---|
| ValueError | If zero tasks are provided. |
Properties:
- head: The first task in the chain (entry point).
- last: The last task in the chain (exit point). Use in depends_on to express "wait for this entire chain to complete".
Source code in boilermaker/task/graph.py
head (property)
The first task in the chain. Execution begins here.
last (property)
The last task in the chain. Use in depends_on to wait for this chain.
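To make the head/last contract concrete, here is a minimal sketch of a chain-like value object. Task, ChainSketch and chain_edges() are hypothetical illustrations of the documented behavior (at least one task, head is the first, last is the last, consecutive tasks run in order), not TaskChain's actual source.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Task:
    """Hypothetical minimal task carrying only an ID."""

    task_id: str


class ChainSketch:
    """Hypothetical value object mirroring TaskChain's documented contract."""

    def __init__(self, *tasks: Task, on_any_failure: Optional[Task] = None) -> None:
        if not tasks:
            raise ValueError("a chain needs at least one task")
        self.tasks = tuple(tasks)
        self.on_any_failure = on_any_failure

    @property
    def head(self) -> Task:
        return self.tasks[0]  # entry point: execution begins here

    @property
    def last(self) -> Task:
        return self.tasks[-1]  # exit point: depend on this to wait for the chain


def chain_edges(chain: ChainSketch) -> list[tuple[str, str]]:
    """Hypothetical helper: the parent -> child edges implied by the order."""
    return [(a.task_id, b.task_id) for a, b in zip(chain.tasks, chain.tasks[1:])]


chain = ChainSketch(Task("a"), Task("b"), Task("c"))
print(chain.head.task_id, chain.last.task_id)  # a c
print(chain_edges(chain))  # [('a', 'b'), ('b', 'c')]
```

Keeping the edge computation outside the class reflects the point above that a chain carries no graph state of its own; only the builder turns it into edges.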
TaskGraphBuilder
boilermaker.task.graph.TaskGraphBuilder()
Builder class for constructing TaskGraph instances with flexible dependency management.
Supports multiple patterns:
- Layer-based building for simple sequential/parallel workflows
- Explicit dependency management for complex DAGs
- Success and failure callback chaining
Examples:
```python
# Simple chain: A -> B -> C
builder = TaskGraphBuilder().add(taskA).then(taskB).then(taskC)

# Parallel execution: A, B, C all run in parallel
builder = TaskGraphBuilder().parallel(taskA, taskB, taskC)

# Complex dependencies: D depends on A and B, but C runs independently.
# depends_on=None makes A, B and C roots; with the default (LAST_ADDED),
# each add() would follow the cursor and chain onto the previous task.
builder = (TaskGraphBuilder()
    .add(taskA, depends_on=None)
    .add(taskB, depends_on=None)
    .add(taskC, depends_on=None)
    .add(taskD, depends_on=[taskA.task_id, taskB.task_id]))

# With failure handling (inline on_failure= kwarg)
builder = (TaskGraphBuilder()
    .add(taskA, on_failure=error_handler)
    .then(taskB))
```
add(task, *, depends_on=LAST_ADDED, on_failure=None)
Add a task with optional explicit dependencies.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| task | Task | Task to add. | required |
| depends_on | list[Task \| TaskId \| TaskChain] \| None \| LastAddedSingleton | Controls parent resolution: omitted / LAST_ADDED depends on the last task(s) added (cursor-following); None makes a root task with no parents; list[Task \| TaskId \| TaskChain] is an explicit parent list. | LAST_ADDED |
| on_failure | Task \| None | Optional task to run if this task fails. | None |

Returns:

| Type | Description |
|---|---|
| TaskGraphBuilder | Self for method chaining. |
add_chain(chain, *, depends_on=LAST_ADDED)
Embed a TaskChain into the graph as a composable unit.
Cursor behavior:
- depends_on=LAST_ADDED (default): chain.head depends on the current cursor. The cursor is REPLACED with [chain.last.task_id].
- depends_on=None: chain.head is an independent root (no parents). The cursor ACCUMULATES: chain.last is APPENDED to the existing cursor. This enables fan-in: two add_chain(depends_on=None) calls followed by .then(join) create a task that waits for BOTH chains.
- depends_on=[...]: chain.head depends on the resolved dependencies. The cursor is REPLACED with [chain.last.task_id].
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| chain | TaskChain | TaskChain to embed. | required |
| depends_on | list[Task \| TaskId \| TaskChain] \| None \| LastAddedSingleton | Controls parent resolution for the chain's head task. | LAST_ADDED |

Returns:

| Type | Description |
|---|---|
| TaskGraphBuilder | Self for method chaining. |
build(on_all_failed=None)
Build and return the final TaskGraph.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| on_all_failed | Task \| None | Optional graph-level fan-in error callback task. Equivalent to calling .on_all_failed(task) before .build(). | None |

Returns:

| Type | Description |
|---|---|
| TaskGraph | Constructed graph with all tasks and dependencies. |

Raises:

| Type | Description |
|---|---|
| ValueError | If the graph would contain cycles. |
| ValueError | If the graph has no tasks. |
| ValueError | If both the on_all_failed() method and the on_all_failed kwarg are used on the same builder instance. |
on_all_failed(task)
Register a graph-level fan-in error callback.
The callback fires exactly once after the entire graph reaches terminal-failed state (all main tasks and per-task failure callbacks have finished, with at least one main-task failure).
The task is NOT added to the DAG as a node; it is passed to TaskGraph.add_all_failed_callback() during build().
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| task | Task | The Task to run when the graph reaches terminal-failed state. | required |

Returns:

| Type | Description |
|---|---|
| TaskGraphBuilder | Self for method chaining. |

Raises:

| Type | Description |
|---|---|
| ValueError | If called more than once on the same builder. |
parallel(*tasks, depends_on=LAST_ADDED, on_failure=None)
Add multiple tasks to run in parallel.
Args:
*tasks: Tasks to run in parallel (variadic)
depends_on: Controls parent resolution:
- Omitted / LAST_ADDED: depend on the last task(s) added (cursor-following)
- None: root tasks with no parents
- list[Task | TaskId | TaskChain]: explicit parent list
on_failure: Optional task to run if any of these tasks fail
Returns:
Self for method chaining
sequence(*tasks)
Convenience method to run tasks in sequence: A -> B -> C -> ...
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| *tasks | Task | Tasks to run in order. | () |

Returns:

| Type | Description |
|---|---|
| TaskGraphBuilder | Self for method chaining. |
then(task, *, on_failure=None)
Add a task that depends on the previously added task(s).
Args:
task: Task to add that depends on last added tasks
on_failure: Optional task to run if this task fails
Returns:
Self for method chaining
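A toy builder helps show how add(), parallel() and then() move the cursor and which edges result. MiniBuilder and its local LAST_ADDED object are hypothetical and work on string IDs; in particular, the rule that add() moves the cursor to the new task is an assumption consistent with the cursor-following description above, not something this reference states for add().

```python
from collections import defaultdict

LAST_ADDED = object()  # hypothetical sentinel standing in for boilermaker's


class MiniBuilder:
    """Hypothetical cursor-following builder; not boilermaker's implementation."""

    def __init__(self) -> None:
        self.edges: dict[str, list[str]] = defaultdict(list)  # parent -> children
        self.cursor: list[str] = []

    def _parents(self, depends_on) -> list[str]:
        if depends_on is LAST_ADDED:
            return list(self.cursor)
        return [] if depends_on is None else list(depends_on)

    def parallel(self, *task_ids: str, depends_on=LAST_ADDED) -> "MiniBuilder":
        parents = self._parents(depends_on)
        for task_id in task_ids:
            for parent in parents:
                self.edges[parent].append(task_id)
        self.cursor = list(task_ids)  # the next then() fans in on all of these
        return self

    def add(self, task_id: str, depends_on=LAST_ADDED) -> "MiniBuilder":
        # Assumption: add() moves the cursor to the newly added task.
        return self.parallel(task_id, depends_on=depends_on)

    def then(self, task_id: str) -> "MiniBuilder":
        return self.add(task_id)  # always depends on the current cursor


# Diamond: A -> (B, C) -> D
diamond = MiniBuilder().add("a").parallel("b", "c").then("d")
print(dict(diamond.edges))  # {'a': ['b', 'c'], 'b': ['d'], 'c': ['d']}
```

Because parallel() leaves every new task on the cursor, a following then() automatically waits for all of them, which is how the diamond example in Quick Examples gets its fan-in.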
TaskGraph
boilermaker.task.graph.TaskGraph
Bases: BaseModel
Represents a Directed Acyclic Graph (DAG) of tasks.
A TaskGraph encapsulates a collection of tasks with defined dependencies ("antecedents"). Each task can have multiple child tasks that depend on its successful completion.
Each task in the graph is represented as a node, and edges define the parent-child relationships between tasks. When all antecedent tasks of a given task are completed successfully, that task becomes eligible for execution.
Every task eligible for execution can be immediately published to the task queue (which allows for parallel execution of independent tasks).
In short, if all parent tasks have succeeded, we can immediately schedule their children.
We expect the graph to be a DAG: no cycles are allowed.
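Both build() and add_task() are documented to raise ValueError when a cycle would appear. One standard way to check that property is Kahn's algorithm, sketched below over a parent-to-child edge mapping shaped like the edges attribute. topological_order() is a hypothetical helper; this reference does not say which cycle check boilermaker uses.

```python
from collections import deque


def topological_order(edges: dict[str, list[str]]) -> list[str]:
    """Return the nodes in dependency order, or raise ValueError on a cycle.

    Kahn's algorithm: repeatedly take a node with no unprocessed parents.
    """
    nodes = set(edges)
    for children in edges.values():
        nodes.update(children)
    indegree = {node: 0 for node in nodes}
    for children in edges.values():
        for child in children:
            indegree[child] += 1
    queue = deque(sorted(n for n, d in indegree.items() if d == 0))
    order: list[str] = []
    while queue:
        node = queue.popleft()
        order.append(node)
        for child in edges.get(node, []):
            indegree[child] -= 1
            if indegree[child] == 0:
                queue.append(child)
    if len(order) != len(nodes):
        # Leftover nodes never reached indegree 0: they sit on or below a cycle.
        raise ValueError("graph contains a cycle")
    return order


print(topological_order({"a": ["b", "c"], "b": ["d"], "c": ["d"]}))  # ['a', 'b', 'c', 'd']
```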
Each completed task has its result stored in persistent storage, and checking whether the next set of tasks is "ready" means deserializing their antecedents and checking their statuses. In other words, the graph is serialized to storage once, when the TaskGraph is first published, and is loaded back into memory from storage at the conclusion of every task execution.
The order of operations is:
- [Send]: Task published -> graph serialized to storage. We do not write it again!
- [Receive]: Task invoked.
  - Evaluation -> TaskResult stored to storage.
  - Graph loaded from storage (includes the latest TaskResultSlim instances).
  - Check which tasks are ready.
  - Publish ready tasks.
Attributes:
- root_id: Unique identifier for the root of the DAG
- children: Mapping of task IDs to Task instances
- edges: Mapping of parent task IDs to lists of child task IDs
- results: Mapping of task IDs to their TaskResult (on write) or TaskResultSlim (on read)
storage_path (property)
Returns the storage path for this task graph.
add_all_failed_callback(callback_task)
Register a graph-level fan-in error callback.
The callback fires exactly once after the graph reaches terminal-failed state (all main tasks and per-task failure callbacks have finished, with at least one failure among the main tasks).
Unlike per-task failure callbacks, this task is stored in fail_children but
is NOT added to fail_edges. That omission is the mechanism that prevents a
circular dependency: _get_reachable_failure_tasks() (and therefore
is_complete()) does not wait for this task via the BFS path.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| callback_task | Task | The Task to run when the graph reaches terminal-failed state. | required |

Raises:

| Type | Description |
|---|---|
| ValueError | If a callback is already registered (only one per graph is supported). |
| ValueError | If callback_task.task_id is already registered as a per-task failure callback. |
| ValueError | If … |
add_failure_callback(parent_id, callback_task)
Add a failure callback task to the graph.
Args:
parent_id: The task ID that will trigger the error callback
callback_task: The Task instance to add as an error callback
Raises:
ValueError: If adding this callback would create a cycle in the DAG
add_result(result)
Mark a task as completed with result.
add_task(task, parent_ids=None)
Add a task to the graph.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| task | Task | The Task instance to add to the graph. | required |
| parent_ids | list[TaskId] \| None | Optional list of parent task IDs to create dependencies. | None |

Raises:

| Type | Description |
|---|---|
| ValueError | If adding this task would create a cycle in the DAG. |
all_antecedents_finished(task_id)
Check if all antecedent tasks of a given task are completed with Success or Failure result.
all_antecedents_succeeded(task_id)
Check if all antecedent tasks of a given task are completed with Success result.
completed_successfully()
Check if all tasks in the graph have completed successfully.
detect_stalled_tasks()
Identify tasks that appear stalled (non-terminal, non-pending status).
A task is considered stalled if its status is one of:
- Scheduled: should have been picked up by a worker
- Started: should have completed by now
- Retry: should have a pending Service Bus message (but the message may have been dropped by duplicate detection)
Returns a list of (task_id, function_name, status) tuples for stalled tasks. This method does NOT determine root cause — it only identifies candidates.
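The stalled check is a pure status filter, so it can be sketched in a few lines. The Status enum below is a hypothetical subset built from status names that appear in this reference, and find_stalled() is a hypothetical helper; like the documented method, it only flags candidates and makes no attempt to find a root cause.

```python
from enum import Enum


class Status(str, Enum):
    """Hypothetical subset of task statuses named in this reference."""

    PENDING = "Pending"
    SCHEDULED = "Scheduled"
    STARTED = "Started"
    RETRY = "Retry"
    SUCCESS = "Success"
    FAILURE = "Failure"


# Non-terminal, non-pending statuses that suggest a task may be stuck.
STALLED_CANDIDATES = {Status.SCHEDULED, Status.STARTED, Status.RETRY}


def find_stalled(results: dict[str, tuple[str, Status]]) -> list[tuple[str, str, Status]]:
    """Hypothetical helper: (task_id, function_name, status) for stalled candidates."""
    return [
        (task_id, function_name, status)
        for task_id, (function_name, status) in results.items()
        if status in STALLED_CANDIDATES
    ]


results = {
    "t1": ("load", Status.SUCCESS),
    "t2": ("transform", Status.STARTED),
    "t3": ("publish", Status.PENDING),
}
print(find_stalled(results))
```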
generate_all_failed_callback_task()
Yield the graph-level all-failed callback task when it is ready to be dispatched.
Yields the callback task exactly once when:
1. An all_failed_callback is registered (all_failed_callback_id is not None).
2. The graph has reached terminal-failed state (is_terminal_failed() is True).
3. The callback's result status is Pending (not yet scheduled or completed).
Yields nothing if:
- No callback is registered.
- The graph has not yet settled.
- The graph completed without any failures.
- The callback has already been scheduled or has finished.
- The callback's result blob does not exist yet (treated as not-yet-schedulable,
consistent with generate_ready_tasks()).
generate_failure_ready_tasks()
Get a list of failure callback tasks that are ready to be executed.
A failure task is ready if:
- It hasn't started yet (no result or Pending status)
- At least one of its triggering parent tasks has failed
- All other dependencies (if any) have finished
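The three readiness conditions translate directly into a predicate. failure_task_ready() is hypothetical; FAILED uses the failure-type statuses listed later in this reference (Failure, RetriesExhausted, Deadlettered), while treating FINISHED as those plus Success is an assumption made for this sketch.

```python
FAILED = {"Failure", "RetriesExhausted", "Deadlettered"}
FINISHED = FAILED | {"Success"}  # assumption: the terminal statuses


def failure_task_ready(
    callback_id: str,
    triggering_parents: list[str],
    other_deps: list[str],
    statuses: dict[str, str],
) -> bool:
    """Hypothetical predicate following the three documented conditions."""
    # 1. Not started yet: no result recorded, or still Pending.
    if statuses.get(callback_id, "Pending") != "Pending":
        return False
    # 2. At least one triggering parent has failed.
    if not any(statuses.get(p) in FAILED for p in triggering_parents):
        return False
    # 3. Every other dependency (if any) has finished.
    return all(statuses.get(d) in FINISHED for d in other_deps)


print(failure_task_ready("cleanup", ["a"], [], {"a": "Failure"}))  # True
print(failure_task_ready("cleanup", ["a"], [], {"a": "Success"}))  # False
```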
generate_pending_results()
Generate pending TaskResultSlim entries for all tasks.
This should probably only be run when the graph is first created and stored.
generate_ready_tasks()
Get a list of tasks that are ready to be executed (not started and all antecedents succeeded).
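The readiness rule (not started, all antecedents succeeded) can be sketched against a parent-to-child edge map and a status map. ready_tasks() is a hypothetical helper using string statuses taken from this reference; the real method works on TaskResultSlim entries loaded from storage.

```python
def ready_tasks(edges: dict[str, list[str]], statuses: dict[str, str]) -> list[str]:
    """Hypothetical helper: Pending tasks whose parents have all succeeded."""
    parents: dict[str, list[str]] = {}
    for parent, children in edges.items():
        for child in children:
            parents.setdefault(child, []).append(parent)
    return [
        task_id
        for task_id, status in statuses.items()
        if status == "Pending"
        and all(statuses.get(p) == "Success" for p in parents.get(task_id, []))
    ]


edges = {"a": ["b", "c"], "b": ["d"], "c": ["d"]}
statuses = {"a": "Success", "b": "Success", "c": "Started", "d": "Pending"}
print(ready_tasks(edges, statuses))  # []  (d still waits on c)
statuses["c"] = "Success"
print(ready_tasks(edges, statuses))  # ['d']
```

A caller would then mark each returned task as scheduled (compare schedule_task()) so that a later evaluation of the same graph does not publish it a second time.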
generate_scheduled_tasks()
Yield tasks already in Scheduled status whose scheduling conditions are still met.
Intended for crash-recovery in continue_graph: if a prior invocation wrote
a task to Scheduled in blob storage but crashed before publishing the
Service Bus message, this method identifies it on redelivery so that
continue_graph can re-publish it without a second blob write.
NOTE: continue_graph does not currently call this method. The crash-recovery
path is not yet active. See BMO-56 / staff-engineer review Concern 3.
Conditions:
- Regular child tasks: all antecedents must have succeeded (same predicate as generate_ready_tasks).
- Failure callback tasks: at least one triggering parent must have a failed status (same predicate as generate_failure_ready_tasks).
- all_failed_callback: is_terminal_failed() must be True.
get_result(task_id)
get_status(task_id)
graph_path(graph_id) (classmethod)
has_failures()
Check if any tasks in the graph have failed.
is_complete()
Check if the graph has finished executing (reached a terminal state).
A graph is complete when:
1. All main tasks (children) are in finished states, AND
2. All reachable failure callback tasks are in finished states, AND
3. If the graph is terminal-failed and an all_failed_callback is registered,
that callback must also have reached a finished state.
We don't expect ALL failure children to be invoked, only those reachable from actual failures. The graph is in a terminal state when we've processed as far as we can in both the main graph and the failure graph.
is_terminal_failed()
Check if the graph has reached a terminal-failed state.
Terminal-failed means all main tasks and per-task failure callbacks have
settled (same check as the core of is_complete()), AND at least one
main task has a failure-type status.
This is the trigger condition for dispatching the all_failed_callback.
Returns:

| Type | Description |
|---|---|
| bool | True when the graph has fully settled with at least one failure. |
schedule_task(task_id)
Mark a task as scheduled to prevent double-scheduling.
task_is_ready(task_id)
Check if a task is ready to be executed (antecedents succeeded and not yet started).
All-Failed Callback API
TaskGraphBuilder.on_all_failed(task: Task) -> TaskGraphBuilder
Registers a graph-level fan-in error callback. Returns self for chaining. Raises ValueError
if called more than once on the same builder.
TaskGraphBuilder.build(on_all_failed: Task | None = None) -> TaskGraph
Equivalent to calling .on_all_failed(task) before .build(). Raises ValueError if both
the method and the kwarg are used on the same builder instance.
TaskGraph.all_failed_callback_id: TaskId | None
None when no graph-level error callback is registered; otherwise the TaskId of the
callback task. The callback task is stored in fail_children but has no entry in
fail_edges.
TaskGraph.add_all_failed_callback(callback_task: Task) -> None
Registers callback_task as the graph-level fan-in error callback. Stores it in
fail_children and sets all_failed_callback_id. Does not add it to fail_edges.
Raises ValueError if:
- all_failed_callback_id is already set (only one per graph is supported), or
- callback_task.task_id is already registered as a per-task failure callback.
If the callback task has on_success or on_failure set, those are resolved into the graph
exactly as they are for tasks added via add_task().
TaskGraph.is_terminal_failed() -> bool
Returns True when all main tasks and all reachable per-task failure callbacks have reached
a finished state, and at least one main task has a failure-type status (Failure,
RetriesExhausted, or Deadlettered). This is the trigger condition for dispatching the
all_failed_callback.
TaskGraph.generate_all_failed_callback_task() -> Generator[Task]
Yields the callback task at most once, when:
- all_failed_callback_id is set.
- is_terminal_failed() is True.
- The callback's result status is Pending (not yet scheduled or completed).
Yields nothing if no callback is registered, if the graph has not yet settled, if the graph completed without failures, or if the callback has already been scheduled or finished.
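The "at most once" guarantee comes from the Pending check combined with the dispatcher recording a new status after scheduling. The generator below is a hypothetical sketch of that interplay with string statuses; all_failed_callback_to_dispatch() is not part of the documented API.

```python
from typing import Iterator, Optional


def all_failed_callback_to_dispatch(
    callback_id: Optional[str],
    terminal_failed: bool,
    statuses: dict[str, str],
) -> Iterator[str]:
    """Hypothetical generator: yield the callback ID when it may be dispatched."""
    if callback_id is None or not terminal_failed:
        return  # nothing registered, or graph not settled with a failure
    # A missing result (None) is treated as not yet schedulable.
    if statuses.get(callback_id) == "Pending":
        yield callback_id


statuses = {"on_fail": "Pending"}
first = list(all_failed_callback_to_dispatch("on_fail", True, statuses))
statuses["on_fail"] = "Scheduled"  # the dispatcher records that it was scheduled
second = list(all_failed_callback_to_dispatch("on_fail", True, statuses))
print(first, second)  # ['on_fail'] []
```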
is_complete() interaction
When a graph-level callback is registered and the graph reaches terminal-failed state,
is_complete() returns False until the callback itself has reached a finished state. If the
graph completes without any failures, is_complete() behaves identically to the pre-callback
behavior — the registered-but-untriggered callback has no effect on completion.
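The interaction between is_terminal_failed() and is_complete() can be modeled with three small functions. All names here are hypothetical; the sketch takes the already-computed statuses of reachable per-task failure callbacks as input (it does not show how reachability is found), and treating Success plus the failure-type statuses as "finished" is an assumption.

```python
from typing import Optional

FAILED = {"Failure", "RetriesExhausted", "Deadlettered"}
FINISHED = FAILED | {"Success"}  # assumption: the terminal statuses


def settled(main: dict[str, str], reachable_callbacks: dict[str, str]) -> bool:
    statuses = [*main.values(), *reachable_callbacks.values()]
    return all(s in FINISHED for s in statuses)


def is_terminal_failed(main: dict[str, str], reachable_callbacks: dict[str, str]) -> bool:
    return settled(main, reachable_callbacks) and any(s in FAILED for s in main.values())


def is_complete(
    main: dict[str, str],
    reachable_callbacks: dict[str, str],
    all_failed_status: Optional[str] = None,
) -> bool:
    """all_failed_status is None when no graph-level callback is registered."""
    if not settled(main, reachable_callbacks):
        return False
    if all_failed_status is not None and is_terminal_failed(main, reachable_callbacks):
        return all_failed_status in FINISHED  # wait for the callback itself
    return True


main = {"a": "Success", "b": "Failure"}
print(is_terminal_failed(main, {}))  # True
print(is_complete(main, {}, all_failed_status="Scheduled"))  # False
print(is_complete(main, {}, all_failed_status="Success"))  # True
print(is_complete({"a": "Success"}, {}, all_failed_status="Pending"))  # True
```

The last call shows the untriggered case: with no failures, the registered callback stays Pending and does not hold completion back.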
Quick Examples
```python
from boilermaker.task import LAST_ADDED, TaskChain, TaskGraphBuilder

# Sequential workflow: A → B → C
graph = (
    TaskGraphBuilder()
    .add(task_a)
    .then(task_b)
    .then(task_c)
    .build()
)

# Fan-out: A → (B, C, D)
graph = (
    TaskGraphBuilder()
    .add(task_a)
    .parallel(task_b, task_c, task_d)
    .build()
)

# Diamond: A → (B, C) → D
graph = (
    TaskGraphBuilder()
    .add(task_a)
    .parallel(task_b, task_c)
    .then(task_d)
    .build()
)

# Inline failure handler
graph = (
    TaskGraphBuilder()
    .add(main_task, on_failure=cleanup_task)
    .then(success_task)  # only runs if main_task succeeds
    .build()
)

# Independent chains with fan-in join
chain_abc = TaskChain(task_a, task_b, task_c)
chain_de = TaskChain(task_d, task_e)
graph = (
    TaskGraphBuilder()
    .add_chain(chain_abc, depends_on=None)  # root; cursor accumulates
    .add_chain(chain_de, depends_on=None)  # root; cursor accumulates
    .then(task_f)  # depends on both chain lasts
    .build()
)

# TaskChain with shared failure handler
pipeline = TaskChain(task_a, task_b, task_c, on_any_failure=error_handler)
graph = TaskGraphBuilder().add_chain(pipeline).build()

# Graph-level error callback (fires once after the whole graph settles with failures)
graph = (
    TaskGraphBuilder()
    .add(task_a)
    .add(task_b)
    .build(on_all_failed=handle_graph_failure)
)

# Equivalent using the chainable method
graph = (
    TaskGraphBuilder()
    .add(task_a)
    .add(task_b)
    .on_all_failed(handle_graph_failure)
    .build()
)
```