Skip to content

Task Graph API Reference

API documentation for TaskGraph, TaskGraphBuilder, and TaskChain classes used to build complex workflows with dependencies.

Module-level Constants

LAST_ADDED

from boilermaker.task import LAST_ADDED

Sentinel value used as the default for depends_on parameters. When passed (or omitted, since it is the default), the method resolves dependencies using the builder's internal cursor — the last task(s) added. Use None to declare an explicit root task with no parents.

TaskChain

boilermaker.task.graph.TaskChain(*tasks, on_any_failure=None)

An ordered sequence of tasks forming a composable workflow unit.

TaskChain groups tasks that always execute sequentially (A → B → C). The chain exposes its first task (head) and last task (last) so it can be wired into a TaskGraphBuilder with explicit dependency references.

TaskChain is a value object — it has no builder methods, no cursor, and no graph state. Its sole purpose is to be passed to TaskGraphBuilder.add_chain().

Parameters:

Name Type Description Default
*tasks Task

One or more Task objects to chain in order.

()
on_failure

Optional Task to run if ANY task in the chain fails. Registered on every task in the chain when embedded via add_chain().

required

Raises:

Type Description
ValueError

If zero tasks are provided.

Properties

head: The first task in the chain (entry point). last: The last task in the chain (exit point). Use in depends_on to express "wait for this entire chain to complete".

Source code in boilermaker/task/graph.py
def __init__(self, *tasks: Task, on_any_failure: Task | None = None) -> None:
    if len(tasks) == 0:
        raise ValueError("TaskChain requires at least one task.")
    self._tasks: list[Task] = list(tasks)
    self.on_any_failure: Task | None = on_any_failure

head property

The first task in the chain. Execution begins here.

last property

The last task in the chain. Use in depends_on to wait for this chain.

__iter__()

Iterate over the tasks in the chain in order.

Source code in boilermaker/task/graph.py
def __iter__(self):
    """Iterate over the tasks in the chain in order."""
    return iter(self._tasks)

options: show_source: true show_root_heading: true members: - init - head - last

TaskGraphBuilder

boilermaker.task.graph.TaskGraphBuilder()

Builder class for constructing TaskGraph instances with flexible dependency management.

Supports multiple patterns:

  1. Layer-based building for simple sequential/parallel workflows
  2. Explicit dependency management for complex DAGs
  3. Success and failure callback chaining

Examples:

# Simple chain: A -> B -> C
builder = TaskGraphBuilder().add(taskA).then(taskB).then(taskC)

# Parallel execution: A, B, C all run in parallel
builder = TaskGraphBuilder().parallel(taskA, taskB, taskC)

# Complex dependencies: D depends on A and B, but C runs independently
builder = (TaskGraphBuilder()
    .add(taskA)
    .add(taskB)
    .add(taskC)
    .add(taskD, depends_on=[taskA.task_id, taskB.task_id]))

# With failure handling (inline on_failure= kwarg)
builder = (TaskGraphBuilder()
    .add(taskA, on_failure=error_handler)
    .then(taskB))
Source code in boilermaker/task/graph.py
def __init__(self) -> None:
    self._tasks: dict[TaskId, Task] = {}
    self._dependencies: dict[TaskId, set[TaskId]] = {}
    self._failure_callbacks: dict[TaskId, set[Task]] = {}
    self._last_added: list[TaskId] = []  # Track recently added tasks for chaining
    self._all_failed_callback: Task | None = None

add(task, *, depends_on=LAST_ADDED, on_failure=None)

Add a task with optional explicit dependencies.

Parameters:

Name Type Description Default
task Task

Task to add

required
depends_on list[Task | TaskId | TaskChain] | None | LastAddedSingleton

Controls parent resolution: - Omitted / LAST_ADDED: depend on the last task(s) added (cursor-following) - None: root task with no parents - list[Task | TaskId | TaskChain]: explicit parent list

LAST_ADDED
on_failure Task | None

Optional task to run if this task fails

None

Returns:

Type Description
TaskGraphBuilder

Self for method chaining

Source code in boilermaker/task/graph.py
def add(
    self,
    task: Task,
    *,
    depends_on: list[Task | TaskId | TaskChain] | None | LastAddedSingleton = LAST_ADDED,
    on_failure: Task | None = None,
) -> "TaskGraphBuilder":
    """Add a task with optional explicit dependencies.

    Args:
        task: Task to add
        depends_on: Controls parent resolution:
            - Omitted / LAST_ADDED: depend on the last task(s) added (cursor-following)
            - None: root task with no parents
            - list[Task | TaskId | TaskChain]: explicit parent list
        on_failure: Optional task to run if this task fails

    Returns:
        Self for method chaining
    """
    if depends_on is LAST_ADDED:
        parent_ids = list(self._last_added)
    elif depends_on is None:
        parent_ids = []
    else:
        # Because `_LastAddedSentinel` is a singleton type used to indicate the default
        # We have to help the type checker understand that depends_on is not the sentinel.
        depends = typing.cast(list[Task | TaskId | TaskChain], depends_on)
        parent_ids = self._resolve_dependencies(depends)
    self._register_task(task, parent_ids=parent_ids, on_failure=on_failure)
    self._last_added = [task.task_id]
    return self

add_chain(chain, *, depends_on=LAST_ADDED)

Embed a TaskChain into the graph as a composable unit.

Cursor behavior

depends_on=LAST_ADDED (default): chain.head depends on current cursor. Cursor is REPLACED with [chain.last.task_id]. depends_on=None: chain.head is an independent root (no parents). Cursor ACCUMULATES — chain.last is APPENDED to existing cursor. This enables fan-in: two add_chain(depends_on=None) calls followed by .then(join) creates a task that waits for BOTH chains. depends_on=[...]: chain.head depends on resolved deps. Cursor is REPLACED with [chain.last.task_id].

Parameters:

Name Type Description Default
chain TaskChain

TaskChain to embed

required
depends_on list[Task | TaskId | TaskChain] | None | LastAddedSingleton

Controls parent resolution for the chain's head task

LAST_ADDED

Returns:

Type Description
TaskGraphBuilder

Self for method chaining

Source code in boilermaker/task/graph.py
def add_chain(
    self,
    chain: "TaskChain",
    *,
    depends_on: list[Task | TaskId | TaskChain] | None | LastAddedSingleton = LAST_ADDED,
) -> "TaskGraphBuilder":
    """Embed a TaskChain into the graph as a composable unit.

    Cursor behavior:
        depends_on=LAST_ADDED (default): chain.head depends on current cursor.
            Cursor is REPLACED with [chain.last.task_id].
        depends_on=None: chain.head is an independent root (no parents).
            Cursor ACCUMULATES — chain.last is APPENDED to existing cursor.
            This enables fan-in: two add_chain(depends_on=None) calls followed
            by .then(join) creates a task that waits for BOTH chains.
        depends_on=[...]: chain.head depends on resolved deps.
            Cursor is REPLACED with [chain.last.task_id].

    Args:
        chain: TaskChain to embed
        depends_on: Controls parent resolution for the chain's head task

    Returns:
        Self for method chaining
    """
    if depends_on is None:
        first_task_parents: list[TaskId] = []
    elif depends_on is LAST_ADDED:
        first_task_parents = list(self._last_added)
    else:
        depends = typing.cast(list[Task | TaskId | TaskChain], depends_on)
        first_task_parents = self._resolve_dependencies(depends)

    chain_parent = None
    for task in chain:
        if chain_parent is None:
            parent_task_ids = first_task_parents
        else:
            parent_task_ids = [chain_parent.task_id]

        self._register_task(task, parent_ids=parent_task_ids, on_failure=chain.on_any_failure)
        chain_parent = task

    # ACCUMULATE cursor for independent roots, REPLACE for all others
    if depends_on is None:
        self._last_added.append(chain.last.task_id)
    else:
        self._last_added = [chain.last.task_id]

    return self

build(on_all_failed=None)

Build and return the final TaskGraph.

Parameters:

Name Type Description Default
on_all_failed Task | None

Optional graph-level fan-in error callback task. Equivalent to calling .on_all_failed(task) before .build(). Raises ValueError if both this kwarg and .on_all_failed() are set.

None

Returns:

Name Type Description
TaskGraph TaskGraph

Constructed graph with all tasks and dependencies

Raises:

Type Description
ValueError

If the graph would contain cycles.

ValueError

If the graph has no tasks.

ValueError

If both on_all_failed kwarg and .on_all_failed() method have been used on the same builder.

Source code in boilermaker/task/graph.py
def build(self, on_all_failed: Task | None = None) -> TaskGraph:
    """Build and return the final TaskGraph.

    Args:
        on_all_failed: Optional graph-level fan-in error callback task. Equivalent
            to calling ``.on_all_failed(task)`` before ``.build()``. Raises
            ``ValueError`` if both this kwarg and ``.on_all_failed()`` are set.

    Returns:
        TaskGraph: Constructed graph with all tasks and dependencies

    Raises:
        ValueError: If the graph would contain cycles.
        ValueError: If the graph has no tasks.
        ValueError: If both ``on_all_failed`` kwarg and ``.on_all_failed()`` method
                    have been used on the same builder.
    """
    if not self._tasks:
        raise ValueError("Cannot build an empty graph. Add at least one task before calling build().")

    if on_all_failed is not None and self._all_failed_callback is not None:
        raise ValueError(
            "on_all_failed set via both method and kwarg — use one or the other."
        )

    tg = TaskGraph()
    # Add all tasks with their dependencies
    for task_id, task in self._tasks.items():
        dependencies = list(self._dependencies.get(task_id, set()))
        tg.add_task(task, parent_ids=dependencies)

    # Add failure callbacks
    for parent_task_id, callback_tasks in self._failure_callbacks.items():
        for callback_task in callback_tasks:
            tg.add_failure_callback(parent_task_id, callback_task)

    # Add the graph-level all-failed callback if one was registered
    effective_callback = on_all_failed or self._all_failed_callback
    if effective_callback is not None:
        tg.add_all_failed_callback(effective_callback)

    return tg

on_all_failed(task)

Register a graph-level fan-in error callback.

The callback fires exactly once after the entire graph reaches terminal-failed state (all main tasks and per-task failure callbacks have finished, with at least one main-task failure).

The task is NOT added to the DAG as a node — it is passed to TaskGraph.add_all_failed_callback() during build().

Parameters:

Name Type Description Default
task Task

The Task to run when the graph reaches terminal-failed state.

required

Returns:

Type Description
TaskGraphBuilder

Self for method chaining.

Raises:

Type Description
ValueError

If on_all_failed has already been called on this builder.

Source code in boilermaker/task/graph.py
def on_all_failed(self, task: Task) -> "TaskGraphBuilder":
    """Register a graph-level fan-in error callback.

    The callback fires exactly once after the entire graph reaches terminal-failed
    state (all main tasks and per-task failure callbacks have finished, with at
    least one main-task failure).

    The task is NOT added to the DAG as a node — it is passed to
    ``TaskGraph.add_all_failed_callback()`` during ``build()``.

    Args:
        task: The Task to run when the graph reaches terminal-failed state.

    Returns:
        Self for method chaining.

    Raises:
        ValueError: If ``on_all_failed`` has already been called on this builder.
    """
    if self._all_failed_callback is not None:
        raise ValueError("on_all_failed is already set.")
    self._all_failed_callback = task
    return self

parallel(*tasks, depends_on=LAST_ADDED, on_failure=None)

Add multiple tasks to run in parallel.

Args:

*tasks: Tasks to run in parallel (variadic)
depends_on: Controls parent resolution:
    - Omitted / LAST_ADDED: depend on the last task(s) added (cursor-following)
    - None: root tasks with no parents
    - list[Task | TaskId | TaskChain]: explicit parent list
on_failure: Optional task to run if any of these tasks fail

Returns:

Self for method chaining
Source code in boilermaker/task/graph.py
def parallel(
    self,
    *tasks: Task,
    depends_on: list[Task | TaskId | TaskChain] | None | LastAddedSingleton = LAST_ADDED,
    on_failure: Task | None = None,
) -> "TaskGraphBuilder":
    """Add multiple tasks to run in parallel.

    Args:

        *tasks: Tasks to run in parallel (variadic)
        depends_on: Controls parent resolution:
            - Omitted / LAST_ADDED: depend on the last task(s) added (cursor-following)
            - None: root tasks with no parents
            - list[Task | TaskId | TaskChain]: explicit parent list
        on_failure: Optional task to run if any of these tasks fail

    Returns:

        Self for method chaining
    """
    if len(tasks) == 0:
        raise ValueError("parallel requires at least one task.")

    if depends_on is LAST_ADDED:
        shared_parents = list(self._last_added)
    elif depends_on is None:
        shared_parents = []
    else:
        depends = typing.cast(list[Task | TaskId | TaskChain], depends_on)
        shared_parents = self._resolve_dependencies(depends)

    for t in tasks:
        self._register_task(t, parent_ids=shared_parents, on_failure=on_failure)

    self._last_added = [t.task_id for t in tasks]
    return self

sequence(*tasks)

Convenience method to run tasks in sequence: A -> B -> C -> ...

Parameters:

Name Type Description Default
*tasks Task

Tasks to run in order

()

Returns:

Type Description
TaskGraphBuilder

Self for method chaining

Source code in boilermaker/task/graph.py
def sequence(self, *tasks: Task) -> "TaskGraphBuilder":
    """Convenience method to run tasks in sequence: A -> B -> C -> ...

    Args:
        *tasks: Tasks to run in order

    Returns:
        Self for method chaining
    """
    if not tasks:
        return self

    # Add first task
    self.add(tasks[0])

    # Chain the rest
    for task in tasks[1:]:
        self.then(task)

    return self

then(task, *, on_failure=None)

Add a task that depends on the previously added task(s).

Args:

task: Task to add that depends on last added tasks
on_failure: Optional task to run if this task fails

Returns:

Self for method chaining
Source code in boilermaker/task/graph.py
def then(self, task: Task, *, on_failure: Task | None = None) -> "TaskGraphBuilder":
    """Add a task that depends on the previously added task(s).

    Args:

        task: Task to add that depends on last added tasks
        on_failure: Optional task to run if this task fails

    Returns:

        Self for method chaining
    """
    if not self._last_added:
        raise ValueError(
            "No tasks have been added yet. Call .add() or .add_chain() first, "
            "then use .then() to continue the chain."
        )

    return self.add(task, depends_on=LAST_ADDED, on_failure=on_failure)

options: show_source: true show_root_heading: true members: - init - add - then - parallel - add_chain - chain - on_all_failed - build

TaskGraph

boilermaker.task.graph.TaskGraph

Bases: BaseModel

Represents a Directed Acyclic Graph (DAG) of tasks.

A TaskGraph encapsulates a collection of tasks with defined dependencies ("antecedents"). Each task can have multiple child tasks that depend on its successful completion.

Each task in the graph is represented as a node, and edges define the parent-child relationships between tasks. When all antecedent tasks of a given task are completed successfully, that task becomes eligible for execution.

Every task eligible for execution can be immediately published to the task queue (which allows for parallel execution of independent tasks).

In short, if all parent tasks have succeeded, we can immediately schedule their children.

We expect the graph to be a DAG: no cycles are allowed.

Each completed task will have its result stored in persistent storage, and checking whether the next set of tasks is "ready" means deserializing their antecedents and checking their statuses. In other words, we expect the graph to be serialized to storage when a TaskGraph is first published but we also expect it to be loaded into memory from storage at the conclusion of each task execution.

The order of operations is like this:

  • [Send]: Task published -> Graph serialized to storage. We do not write it again!
  • [Receive]: Task invoked
    • Evaluation -> TaskResult stored to storage.
    • Graph loaded from storage (includes latest TaskResultSlim instances).
    • Check which tasks are ready.
    • Publish ready tasks.

Attributes:

root_id: Unique identifier for the root of the DAG
children: Mapping of task IDs to Task instances
edges: Mapping of parent task IDs to lists of child task IDs
# On write -> TaskResult; on read -> TaskResultSlim
results: Mapping of task IDs to their TaskResult or TaskResultSlim

storage_path property

Returns the storage path for this task graph.

add_all_failed_callback(callback_task)

Register a graph-level fan-in error callback.

The callback fires exactly once after the graph reaches terminal-failed state (all main tasks and per-task failure callbacks have finished, with at least one failure among the main tasks).

Unlike per-task failure callbacks, this task is stored in fail_children but is NOT added to fail_edges. That omission is the mechanism that prevents a circular dependency: _get_reachable_failure_tasks() (and therefore is_complete()) does not wait for this task via the BFS path.

Parameters:

Name Type Description Default
callback_task Task

The Task to run when the graph reaches terminal-failed state.

required

Raises:

Type Description
ValueError

If a callback is already registered (only one per graph is supported).

ValueError

If callback_task.task_id is already in children (guards against reusing a main task as the callback).

ValueError

If callback_task.task_id is already in fail_children (guards against reusing a per-task failure callback).

Source code in boilermaker/task/graph.py
def add_all_failed_callback(self, callback_task: Task) -> None:
    """Register a graph-level fan-in error callback.

    The callback fires exactly once after the graph reaches terminal-failed state
    (all main tasks and per-task failure callbacks have finished, with at least one
    failure among the main tasks).

    Unlike per-task failure callbacks, this task is stored in ``fail_children`` but
    is NOT added to ``fail_edges``. That omission is the mechanism that prevents a
    circular dependency: ``_get_reachable_failure_tasks()`` (and therefore
    ``is_complete()``) does not wait for this task via the BFS path.

    Args:
        callback_task: The Task to run when the graph reaches terminal-failed state.

    Raises:
        ValueError: If a callback is already registered (only one per graph is supported).
        ValueError: If ``callback_task.task_id`` is already in ``children``
                    (guards against reusing a main task as the callback).
        ValueError: If ``callback_task.task_id`` is already in ``fail_children``
                    (guards against reusing a per-task failure callback).
    """
    if self.all_failed_callback_id is not None:
        raise ValueError(
            "all_failed_callback is already set; only one is supported per graph."
        )
    if callback_task.task_id in self.children:
        raise ValueError(
            f"Task {callback_task.task_id!r} is already registered as a main task. "
            "A task cannot serve as both a main task and the all_failed_callback."
        )
    if callback_task.task_id in self.fail_children:
        raise ValueError(
            f"Task {callback_task.task_id!r} is already registered as a per-task failure "
            "callback. A task cannot serve as both a per-task callback and the "
            "all_failed_callback."
        )

    callback_task.graph_id = self.graph_id
    self.fail_children[callback_task.task_id] = callback_task
    self.all_failed_callback_id = callback_task.task_id

    # Handle on_success / on_failure chaining on the callback task itself.
    if callback_task.on_success:
        self.add_task(callback_task.on_success, parent_ids=[callback_task.task_id])
        callback_task.on_success = None

    if callback_task.on_failure:
        self.add_failure_callback(callback_task.task_id, callback_task.on_failure)
        callback_task.on_failure = None

add_failure_callback(parent_id, callback_task)

Add an failure callback task to the graph.

Args:

parent_id: The task ID that will trigger the error callback
callback_task: The Task instance to add as an error callback

Raises:

ValueError: If adding this callback would create a cycle in the DAG
Source code in boilermaker/task/graph.py
def add_failure_callback(self, parent_id: TaskId, callback_task: Task) -> None:
    """Add an failure callback task to the graph.

    Args:

        parent_id: The task ID that will trigger the error callback
        callback_task: The Task instance to add as an error callback

    Raises:

        ValueError: If adding this callback would create a cycle in the DAG
    """

    callback_task.graph_id = self.graph_id

    if callback_task.task_id not in self.fail_children:
        self.fail_children[callback_task.task_id] = callback_task

    if parent_id not in self.fail_edges:
        self.fail_edges[parent_id] = set()
    self.fail_edges[parent_id].add(callback_task.task_id)

    # Check for cycles after adding the edge
    if self._detect_cycles():
        # Rollback the changes
        self.fail_edges[parent_id].remove(callback_task.task_id)
        if not self.fail_edges[parent_id]:  # Remove empty set
            del self.fail_edges[parent_id]
        del self.fail_children[callback_task.task_id]
        raise ValueError(f"Adding failure callback for task {parent_id} would create a cycle in the DAG")

    # handle recursion
    if callback_task.on_success:
        self.add_task(callback_task.on_success, parent_ids=[callback_task.task_id])
        callback_task.on_success = None

    if callback_task.on_failure:
        self.add_failure_callback(callback_task.task_id, callback_task.on_failure)
        callback_task.on_failure = None

add_result(result)

Mark a task as completed with result.

Source code in boilermaker/task/graph.py
def add_result(self, result: TaskResult) -> TaskResult:
    """Mark a task as completed with result."""
    if result.task_id not in self.children and result.task_id not in self.fail_children:
        raise ValueError(f"Task {result.task_id} not found in graph")

    self.results[result.task_id] = result
    return result

add_task(task, parent_ids=None)

Add a task to the graph.

Parameters:

Name Type Description Default
task Task

The Task instance to add to the graph.

required
parent_ids list[TaskId] | None

Optional list of parent task IDs to create dependencies

None

Raises:

Type Description
ValueError

If adding this task would create a cycle in the DAG

Source code in boilermaker/task/graph.py
def add_task(self, task: Task, parent_ids: list[TaskId] | None = None) -> None:
    """Add a task to the graph.

    Args:
        task: The Task instance to add to the graph.
        parent_ids: Optional list of parent task IDs to create dependencies

    Raises:
        ValueError: If adding this task would create a cycle in the DAG
    """
    # Ensure task is part of this graph
    task.graph_id = self.graph_id
    self.children[task.task_id] = task

    if parent_ids:
        for parent_id in parent_ids:
            if parent_id not in self.edges:
                self.edges[parent_id] = set()
            self.edges[parent_id].add(task.task_id)

        # Check for cycles after adding the edges
        if self._detect_cycles():
            # Rollback the changes
            for parent_id in parent_ids:
                self.edges[parent_id].remove(task.task_id)
                if not self.edges[parent_id]:  # Remove empty set
                    del self.edges[parent_id]
            del self.children[task.task_id]
            raise ValueError(
                f"Adding task {task.task_id} with parents {parent_ids} would create a cycle in the DAG"
            )

    # If we leave `on_success` and `on_failure` it's potentially confusing for both callers
    # and our own evaluation. It also has the potential to create cycles inadvertently, so we
    # dynamically add on_success and on_failure into edges here and remove these as individual task callbacks.
    if task.on_success:
        self.add_task(task.on_success, parent_ids=[task.task_id])
        task.on_success = None  # Clear to avoid duplication

    if task.on_failure:
        self.add_failure_callback(task.task_id, task.on_failure)
        task.on_failure = None

all_antecedents_finished(task_id)

Check if all antecedent tasks of a given task are completed with Success or Failure result.

Source code in boilermaker/task/graph.py
def all_antecedents_finished(self, task_id: TaskId) -> bool:
    """Check if all antecedent tasks of a given task are completed with `Success` or `Failure` result."""
    for parent_id, children_ids in self.edges.items():
        if task_id in children_ids:
            if parent_id not in self.results:
                return False
            # Check if the parent has actually finished (not just exists in results)
            if not self.results[parent_id].status.finished:
                return False
    # If we get here, all antecedents have finished *OR* there are no antecedents.
    return True

all_antecedents_succeeded(task_id)

Check if all antecedent tasks of a given task are completed with Success result.

Source code in boilermaker/task/graph.py
def all_antecedents_succeeded(self, task_id: TaskId) -> bool:
    """Check if all antecedent tasks of a given task are completed with `Success` result."""
    for parent_id, children_ids in self.edges.items():
        if task_id in children_ids:
            if parent_id not in self.results:
                return False
            if not self.results[parent_id].succeeded:
                return False
    # If we get here, all antecedents succeeded *OR* there are no antecedents.
    return True

completed_successfully()

Check if all tasks in the graph have completed successfully.

Source code in boilermaker/task/graph.py
def completed_successfully(self) -> bool:
    """Check if all tasks in the graph have completed successfully."""
    return all(
        map(
            lambda task_id: self.get_status(task_id) == TaskStatus.Success,
            self.children.keys(),
        )
    )

detect_stalled_tasks()

Identify tasks that appear stalled (non-terminal, non-pending status).

A task is considered stalled if its status is one of: - Scheduled: should have been picked up by a worker - Started: should have completed by now - Retry: should have a pending SB message (but may have been dropped by dedup)

Returns a list of (task_id, function_name, status) tuples for stalled tasks. This method does NOT determine root cause — it only identifies candidates.

Source code in boilermaker/task/graph.py
def detect_stalled_tasks(self) -> list[tuple[TaskId, str, TaskStatus]]:
    """Identify tasks that appear stalled (non-terminal, non-pending status).

    A task is considered stalled if its status is one of:
    - Scheduled: should have been picked up by a worker
    - Started: should have completed by now
    - Retry: should have a pending SB message (but may have been dropped by dedup)

    Returns a list of (task_id, function_name, status) tuples for stalled tasks.
    This method does NOT determine root cause — it only identifies candidates.
    """
    stalled_statuses = {TaskStatus.Scheduled, TaskStatus.Started, TaskStatus.Retry}
    stalled: list[tuple[TaskId, str, TaskStatus]] = []

    for task_id, task in itertools.chain(self.children.items(), self.fail_children.items()):
        result = self.results.get(task_id)
        if result is not None and result.status in stalled_statuses:
            stalled.append((task_id, task.function_name, result.status))

    return stalled

generate_all_failed_callback_task()

Yield the graph-level all-failed callback task when it is ready to be dispatched.

Yields the callback task exactly once when: 1. An all_failed_callback is registered (all_failed_callback_id is not None). 2. The graph has reached terminal-failed state (is_terminal_failed() is True). 3. The callback's result status is Pending (not yet scheduled or completed).

Yields nothing if: - No callback is registered. - The graph has not yet settled. - The graph completed without any failures. - The callback has already been scheduled or has finished. - The callback's result blob does not exist yet (treated as not-yet-schedulable, consistent with generate_ready_tasks()).

Source code in boilermaker/task/graph.py
def generate_all_failed_callback_task(self) -> Generator[Task]:
    """Yield the graph-level all-failed callback task when it is ready to be dispatched.

    Yields the callback task exactly once when:
    1. An ``all_failed_callback`` is registered (``all_failed_callback_id`` is not None).
    2. The graph has reached terminal-failed state (``is_terminal_failed()`` is True).
    3. The callback's result status is ``Pending`` (not yet scheduled or completed).

    Yields nothing if:
    - No callback is registered.
    - The graph has not yet settled.
    - The graph completed without any failures.
    - The callback has already been scheduled or has finished.
    - The callback's result blob does not exist yet (treated as not-yet-schedulable,
      consistent with ``generate_ready_tasks()``).
    """
    if self.all_failed_callback_id is None:
        return
    cb_id = self.all_failed_callback_id
    result = self.results.get(cb_id)
    is_pending = result is not None and result.status == TaskStatus.Pending
    if self.is_terminal_failed() and is_pending:
        yield self.fail_children[cb_id]

generate_failure_ready_tasks()

Get a list of failure callback tasks that are ready to be executed.

A failure task is ready if:

  1. It hasn't started yet (no result or Pending status)
  2. At least one of its triggering parent tasks has failed
  3. All other dependencies (if any) have finished
Source code in boilermaker/task/graph.py
def generate_failure_ready_tasks(self) -> Generator[Task]:
    """Get a list of failure callback tasks that are ready to be executed.

    A failure task is ready if:

    1. It hasn't started yet (no result or Pending status)
    2. At least one of its triggering parent tasks has failed
    3. All other dependencies (if any) have finished
    """
    for task_id in self.fail_children.keys():
        # Check if this failure task has already started
        task_result = self.results.get(task_id)
        is_not_started = task_result is None or task_result.status == TaskStatus.Pending

        if not is_not_started:
            continue

        # Find which parent task(s) would trigger this failure callback
        triggering_parents = []
        for parent_id, fail_child_ids in self.fail_edges.items():
            if task_id in fail_child_ids:
                triggering_parents.append(parent_id)

        # Check if any triggering parent has failed
        for parent_id in triggering_parents:
            # Check if parent has failed status in results
            if parent_id in self.results and self.results[parent_id].status.failed:
                yield self.fail_children[task_id]
                break  # Prevent double-yield when multiple parents have failed

generate_pending_results()

Generate pending TaskResultSlim entries for all tasks.

This should probably only be run when the graph is first created and stored

Source code in boilermaker/task/graph.py
def generate_pending_results(self) -> Generator[TaskResultSlim]:
    """
    Generate pending TaskResultSlim entries for all tasks.

    This should probably only be run when the graph is first created and stored
    """
    for task_id in itertools.chain.from_iterable((self.children.keys(), self.fail_children.keys())):
        # Create a pending result if it doesn't exist
        if self.get_result(task_id) is None:
            pending_result = TaskResultSlim(
                task_id=task_id,
                graph_id=self.graph_id,
                status=TaskStatus.Pending,
            )
            self.results[pending_result.task_id] = pending_result

        task_result = self.results[task_id]
        if task_result.status == TaskStatus.Pending:
            yield task_result

generate_ready_tasks()

Get a list of tasks that are ready to be executed (not started and all antecedents succeeded).

Source code in boilermaker/task/graph.py
def generate_ready_tasks(self) -> Generator[Task]:
    """Get a list of tasks that are ready to be executed (not started and all antecedents succeeded)."""
    for task_id in self.children.keys():
        task_result = self.results.get(task_id)
        if task_result is None:
            logger.warning(
                f"Task {task_id} has no result blob in graph {self.graph_id}; "
                "skipping. This may indicate a partial store_graph failure."
            )
            continue
        if task_result.status == TaskStatus.Pending and self.task_is_ready(task_id):
            yield self.children[task_id]

generate_scheduled_tasks()

Yield tasks already in Scheduled status whose scheduling conditions are still met.

Intended for crash-recovery in continue_graph: if a prior invocation wrote a task to Scheduled in blob storage but crashed before publishing the Service Bus message, this method identifies it on redelivery so that continue_graph can re-publish it without a second blob write.

NOTE: continue_graph does not currently call this method. The crash-recovery path is not yet active. See BMO-56 / staff-engineer review Concern 3.

Conditions
  • Regular child tasks: all antecedents must have succeeded (same predicate as generate_ready_tasks).
  • Failure callback tasks: at least one triggering parent must have a failed status (same predicate as generate_failure_ready_tasks).
  • all_failed_callback: is_terminal_failed() must be True.
Source code in boilermaker/task/graph.py
def generate_scheduled_tasks(self) -> Generator[Task]:
    """Yield tasks already in Scheduled status whose scheduling conditions are still met.

    Intended for crash-recovery in ``continue_graph``: if a prior invocation wrote
    a task to ``Scheduled`` in blob storage but crashed before publishing the
    Service Bus message, this method identifies it on redelivery so that
    ``continue_graph`` can re-publish it without a second blob write.

    NOTE: ``continue_graph`` does not currently call this method. The crash-recovery
    path is not yet active. See BMO-56 / staff-engineer review Concern 3.

    Conditions:
      - Regular child tasks: all antecedents must have succeeded
        (same predicate as ``generate_ready_tasks``).
      - Failure callback tasks: at least one triggering parent must have a
        failed status (same predicate as ``generate_failure_ready_tasks``).
      - all_failed_callback: ``is_terminal_failed()`` must be True.
    """
    # Regular children already in Scheduled status
    for task_id, task in self.children.items():
        result = self.results.get(task_id)
        if result is not None and result.status == TaskStatus.Scheduled:
            if self.all_antecedents_succeeded(task_id):
                yield task

    # Failure callback children already in Scheduled status
    for task_id, task in self.fail_children.items():
        result = self.results.get(task_id)
        if result is not None and result.status == TaskStatus.Scheduled:
            # At least one triggering parent must have a failed status
            for parent_id, fail_child_ids in self.fail_edges.items():
                if task_id in fail_child_ids:
                    if parent_id in self.results and self.results[parent_id].status.failed:
                        yield task
                        break  # Prevent double-yield when multiple parents have failed

    # all_failed_callback crash-recovery: if the callback was written to Scheduled
    # in blob storage but the Service Bus publish crashed before completing, re-yield
    # it on redelivery so continue_graph() can re-publish it.
    #
    # NOTE: continue_graph() does not currently call generate_scheduled_tasks(), so
    # this block is not yet reachable from any production code path. It is preserved
    # here for a follow-on issue that will wire continue_graph() to call this method.
    if self.all_failed_callback_id is not None:
        cb_id = self.all_failed_callback_id
        result = self.results.get(cb_id)
        if result is not None and result.status == TaskStatus.Scheduled and self.is_terminal_failed():
            yield self.fail_children[cb_id]

get_result(task_id)

Get the result of a completed task.

Source code in boilermaker/task/graph.py
def get_result(self, task_id: TaskId) -> TaskResultSlim | TaskResult | None:
    """Get the result of a completed task."""
    return self.results.get(task_id)

get_status(task_id)

Check if a task is completed.

Source code in boilermaker/task/graph.py
def get_status(self, task_id: TaskId) -> TaskStatus | None:
    """Check if a task is completed."""
    if tr := self.get_result(task_id):
        return tr.status
    return None

graph_path(graph_id) classmethod

Returns the storage path for an arbitrary GraphId.

Source code in boilermaker/task/graph.py
@classmethod
def graph_path(cls, graph_id: GraphId) -> Path:
    """Returns the storage path for an arbitrary GraphId."""
    return Path(graph_id) / cls.StorageName

has_failures()

Check if any tasks in the graph have failed.

Source code in boilermaker/task/graph.py
def has_failures(self) -> bool:
    """Check if any tasks in the graph have failed."""
    failure_statuses = TaskStatus.failure_types()
    return any(
        self.get_status(task_id) in failure_statuses
        for task_id in self.children.keys()
        if self.get_status(task_id) is not None
    )

is_complete()

Check if the graph has finished executing (reached a terminal state).

A graph is complete when: 1. All main tasks (children) are in finished states, AND 2. All reachable failure callback tasks are in finished states, AND 3. If the graph is terminal-failed and an all_failed_callback is registered, that callback must also have reached a finished state.

We don't expect ALL failure children to be invoked, only those reachable from actual failures. The graph is in a terminal state when we've processed as far as we can in both the main graph and the failure graph.

Source code in boilermaker/task/graph.py
def is_complete(self) -> bool:
    """Check if the graph has finished executing (reached a terminal state).

    A graph is complete when:
    1. All main tasks (children) are in finished states, AND
    2. All reachable failure callback tasks are in finished states, AND
    3. If the graph is terminal-failed and an ``all_failed_callback`` is registered,
       that callback must also have reached a finished state.

    We don't expect ALL failure children to be invoked, only those reachable
    from actual failures. The graph is in a terminal state when we've processed
    as far as we can in both the main graph and the failure graph.
    """
    if not self._is_complete_main_and_failure_tasks():
        return False

    # If the graph finished with failures and a callback is registered,
    # the callback must also have finished before we declare the graph complete.
    if self.all_failed_callback_id is not None and self.has_failures():
        cb_status = self.get_status(self.all_failed_callback_id)
        if cb_status is None or not cb_status.finished:
            return False

    return True

is_terminal_failed()

Check if the graph has reached a terminal-failed state.

Terminal-failed means all main tasks and per-task failure callbacks have settled (same check as the core of is_complete()), AND at least one main task has a failure-type status.

This is the trigger condition for dispatching the all_failed_callback.

Returns:

Type Description
bool

True when the graph has fully settled with at least one failure.

Source code in boilermaker/task/graph.py
def is_terminal_failed(self) -> bool:
    """Check if the graph has reached a terminal-failed state.

    Terminal-failed means all main tasks and per-task failure callbacks have
    settled (same check as the core of ``is_complete()``), AND at least one
    main task has a failure-type status.

    This is the trigger condition for dispatching the ``all_failed_callback``.

    Returns:
        True when the graph has fully settled with at least one failure.
    """
    return self._is_complete_main_and_failure_tasks() and self.has_failures()

schedule_task(task_id)

Mark a task as scheduled to prevent double-scheduling.

Source code in boilermaker/task/graph.py
def schedule_task(self, task_id: TaskId) -> TaskResult | TaskResultSlim:
    """Mark a task as scheduled to prevent double-scheduling."""
    if not (task_id in self.children or task_id in self.fail_children):
        raise ValueError(f"Task {task_id} not found in graph")
    if task_id not in self.results or self.results[task_id].status != TaskStatus.Pending:
        raise ValueError(f"Task {task_id} is not pending and cannot be scheduled")

    result = self.results[task_id]
    result.status = TaskStatus.Scheduled
    self.results[result.task_id] = result
    return result

task_is_ready(task_id)

Check if a task is ready to be executed (antecedents succeeded and not yet started).

Source code in boilermaker/task/graph.py
def task_is_ready(self, task_id: TaskId) -> bool:
    """Check if a task is ready to be executed (antecedents succeeded and not yet started)."""
    return self.all_antecedents_succeeded(task_id)

options: show_source: true show_root_heading: true members: - graph_id - children - fail_children - edges - fail_edges - all_failed_callback_id - results - add_task - add_failure_callback - add_all_failed_callback - schedule_task - add_result - task_is_ready - generate_ready_tasks - generate_failure_ready_tasks - generate_all_failed_callback_task - completed_successfully - has_failures - is_terminal_failed - is_complete

All-Failed Callback API

TaskGraphBuilder.on_all_failed(task: Task) -> TaskGraphBuilder

Registers a graph-level fan-in error callback. Returns self for chaining. Raises ValueError if called more than once on the same builder.

TaskGraphBuilder.build(on_all_failed: Task | None = None) -> TaskGraph

Equivalent to calling .on_all_failed(task) before .build(). Raises ValueError if both the method and the kwarg are used on the same builder instance.

TaskGraph.all_failed_callback_id: TaskId | None

None when no graph-level error callback is registered; otherwise the TaskId of the callback task. The callback task is stored in fail_children but has no entry in fail_edges.

TaskGraph.add_all_failed_callback(callback_task: Task) -> None

Registers callback_task as the graph-level fan-in error callback. Stores it in fail_children and sets all_failed_callback_id. Does not add it to fail_edges.

Raises ValueError if:

  • all_failed_callback_id is already set (only one per graph is supported), or
  • callback_task.task_id is already registered as a per-task failure callback.

If the callback task has on_success or on_failure set, those are resolved into the graph exactly as they are for tasks added via add_task().

TaskGraph.is_terminal_failed() -> bool

Returns True when all main tasks and all reachable per-task failure callbacks have reached a finished state, and at least one main task has a failure-type status (Failure, RetriesExhausted, or Deadlettered). This is the trigger condition for dispatching the all_failed_callback.

TaskGraph.generate_all_failed_callback_task() -> Generator[Task]

Yields the callback task at most once, when:

  1. all_failed_callback_id is set.
  2. is_terminal_failed() is True.
  3. The callback's result status is Pending (not yet scheduled or completed).

Yields nothing if no callback is registered, if the graph has not yet settled, if the graph completed without failures, or if the callback has already been scheduled or finished.

is_complete() interaction

When a graph-level callback is registered and the graph reaches terminal-failed state, is_complete() returns False until the callback itself has reached a finished state. If the graph completes without any failures, is_complete() behaves identically to the pre-callback behavior — the registered-but-untriggered callback has no effect on completion.

Quick Examples

from boilermaker.task import LAST_ADDED, TaskChain, TaskGraphBuilder

# Sequential workflow: A → B → C
graph = (
    TaskGraphBuilder()
    .add(task_a)
    .then(task_b)
    .then(task_c)
    .build()
)

# Fan-out: A → (B, C, D)
graph = (
    TaskGraphBuilder()
    .add(task_a)
    .parallel(task_b, task_c, task_d)
    .build()
)

# Diamond: A → (B, C) → D
graph = (
    TaskGraphBuilder()
    .add(task_a)
    .parallel(task_b, task_c)
    .then(task_d)
    .build()
)

# Inline failure handler
graph = (
    TaskGraphBuilder()
    .add(main_task, on_failure=cleanup_task)
    .then(success_task)  # only runs if main_task succeeds
    .build()
)

# Independent chains with fan-in join
chain_abc = TaskChain(task_a, task_b, task_c)
chain_de  = TaskChain(task_d, task_e)

graph = (
    TaskGraphBuilder()
    .add_chain(chain_abc, depends_on=None)  # root; cursor accumulates
    .add_chain(chain_de,  depends_on=None)  # root; cursor accumulates
    .then(task_f)                           # depends on both chain lasts
    .build()
)

# TaskChain with shared failure handler
pipeline = TaskChain(task_a, task_b, task_c, on_any_failure=error_handler)
graph = TaskGraphBuilder().add_chain(pipeline).build()

# Graph-level error callback (fires once after the whole graph settles with failures)
graph = (
    TaskGraphBuilder()
    .add(task_a)
    .add(task_b)
    .build(on_all_failed=handle_graph_failure)
)

# Equivalent using the chainable method
graph = (
    TaskGraphBuilder()
    .add(task_a)
    .add(task_b)
    .on_all_failed(handle_graph_failure)
    .build()
)