Task API Reference

boilermaker.task.Task

Bases: BaseModel

Represents a serializable task with retry policies and callback chains.

A Task encapsulates a function call with its arguments, retry configuration, and optional success/failure callbacks. Tasks are JSON-serializable and can be published to Azure Service Bus for asynchronous execution.
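
Because Task is a Pydantic model, publishing and consuming a task amounts to a JSON round trip. A minimal sketch, assuming Pydantic v2 method names and using "my_function" as a placeholder for a registered handler:

# Serialize for publishing, then reconstruct on the worker side.
task = Task.default("my_function")
raw = task.model_dump_json()              # JSON string suitable for a Service Bus message body
restored = Task.model_validate_json(raw)  # rebuilds the Task, including policy and payload
assert restored.function_name == "my_function"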

Attributes:

    task_id (str): Unique UUID7 identifier for timestamp-ordered task identification
    should_dead_letter (bool): Whether failed tasks should be dead-lettered (default: True)
    acks_late (bool): Whether to acknowledge messages after processing (default: True)
    function_name (str): Name of the registered function to execute
    attempts (RetryAttempts): Retry attempt tracking and metadata
    policy (RetryPolicy): Retry policy governing backoff and max attempts
    payload (dict[str, Any]): Function arguments and keyword arguments (must be JSON-serializable)
    diagnostic_id (str | None): OpenTelemetry parent trace ID for distributed tracing
    _sequence_number (int | None): Service Bus sequence number (set after publishing)
    on_success (Optional[Task]): Optional callback task to run on successful completion
    on_failure (Optional[Task]): Optional callback task to run on failure

Example

Create task with default settings

task = Task.default("my_function", args=[1, 2], kwargs={"key": "value"})

Create task with custom retry policy

policy = RetryPolicy(max_tries=5, backoff_mode="exponential")
task = Task.si(my_function, arg1, kwarg=value, policy=policy)

Chain tasks with callbacks

task1 >> task2 >> task3  # Success chain
task1.on_failure = error_handler_task

acks_early property

Whether the task acknowledges messages before processing.

Returns:

    bool: True if acks_late is False, meaning messages are acknowledged immediately upon receipt
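
In practice this is simply the inverse of acks_late. A small sketch, where my_fn stands for any handler function registered with the worker:

# acks_late=False means the message is settled before the handler runs,
# so acks_early reports True.
early_task = Task.si(my_fn, acks_late=False)
assert early_task.acks_early is True

late_task = Task.si(my_fn)  # acks_late defaults to True
assert late_task.acks_early is False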

can_retry property

Whether the task can be retried based on current attempts.

Returns:

    bool: True if current attempts are less than or equal to the maximum tries allowed by the retry policy
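
For example, a freshly created task with the default policy still has retries available, and the flag flips to False once the recorded attempts pass the policy's maximum tries. A sketch, assuming "my_function" is a registered handler and the default policy allows at least one attempt:

task = Task.default("my_function")
assert task.can_retry  # no attempts recorded yet
# Once attempts exceed policy.max_tries, can_retry becomes False and the
# worker can dead-letter the message or fire on_failure instead of retrying.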

__lshift__(other)

Set success callback using << operator (left-shift chaining).

Creates a success callback chain where this task executes if the right-hand task completes successfully.

Parameters:

    other (Task): Task that will trigger this task on success. Required.

Returns:

    Task: This task, allowing for continued chaining

Example

task1 << task2 << task3
# If task3 succeeds, run task2
# If task2 succeeds, run task1

Source code in boilermaker/task.py
def __lshift__(self, other: "Task") -> "Task":
    """Set success callback using << operator (left-shift chaining).

    Creates a success callback chain where this task executes
    if the right-hand task completes successfully.

    Args:
        other: Task that will trigger this task on success

    Returns:
        Task: This task, allowing for continued chaining

    Example:
        >>> task1 << task2 << task3
        # If task3 succeeds, run task2
        # If task2 succeeds, run task1
    """
    other.on_success = self
    return self

__rshift__(other)

Set success callback using >> operator (right-shift chaining).

Creates a success callback chain where the right-hand task executes if this task completes successfully.

Parameters:

    other (Task): Task to execute on success. Required.

Returns:

    Task: The other task, allowing for continued chaining

Example

task1 >> task2 >> task3
# If task1 succeeds, run task2
# If task2 succeeds, run task3

Source code in boilermaker/task.py
def __rshift__(self, other: "Task") -> "Task":
    """Set success callback using >> operator (right-shift chaining).

    Creates a success callback chain where the right-hand task
    executes if this task completes successfully.

    Args:
        other: Task to execute on success

    Returns:
        Task: The other task, allowing for continued chaining

    Example:
        >>> task1 >> task2 >> task3
        # If task1 succeeds, run task2
        # If task2 succeeds, run task3
    """
    self.on_success = other
    return other
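
Because __rshift__ returns the right-hand task while __lshift__ returns the left-hand one, >> is the natural operator for building multi-step chains; a single a << b wires the same callback as b >> a. A short sketch using placeholder task names built with Task.default:

extract, transform, load = (Task.default(name) for name in ("extract", "transform", "load"))
extract >> transform >> load
# extract.on_success is transform, transform.on_success is load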

default(function_name, **kwargs) classmethod

Create a Task with default retry settings.

Convenience method to create a task with sensible defaults for retry attempts and policy. Additional keyword arguments override default values.

Parameters:

    function_name (str): Name of the function to execute. Required.
    **kwargs: Additional task attributes to override defaults

Returns:

    Task: New task instance with default retry configuration

Example

task = Task.default("process_data", payload={"data": [1, 2, 3]})

Source code in boilermaker/task.py
@classmethod
def default(cls, function_name: str, **kwargs):
    """Create a Task with default retry settings.

    Convenience method to create a task with sensible defaults for
    retry attempts and policy. Additional keyword arguments override
    default values.

    Args:
        function_name: Name of the function to execute
        **kwargs: Additional task attributes to override defaults

    Returns:
        Task: New task instance with default retry configuration

    Example:
        >>> task = Task.default("process_data", payload={"data": [1, 2, 3]})
    """
    attempts = retries.RetryAttempts.default()
    policy = retries.RetryPolicy.default()
    if "policy" in kwargs:
        policy = kwargs.pop("policy")
    return cls(
        attempts=attempts,
        function_name=function_name,
        policy=policy,
        payload={},
        diagnostic_id=None,
        **kwargs,
    )
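
Any Task field can be overridden through **kwargs, and a policy keyword replaces the default retry policy entirely. A sketch, reusing the RetryPolicy constructor call from the class-level example above:

policy = RetryPolicy(max_tries=5, backoff_mode="exponential")
task = Task.default("process_data", policy=policy, should_dead_letter=False)
# The task keeps default attempt tracking but uses the custom policy and
# will not be dead-lettered if it ultimately fails.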

get_next_delay()

Calculate the delay before the next retry attempt.

Uses the task's retry policy to determine the appropriate delay based on the current number of attempts.

Returns:

    int: Delay in seconds before the next retry attempt

Source code in boilermaker/task.py
def get_next_delay(self):
    """Calculate the delay before the next retry attempt.

    Uses the task's retry policy to determine the appropriate
    delay based on the current number of attempts.

    Returns:
        int: Delay in seconds before next retry attempt
    """
    return self.policy.get_delay_interval(self.attempts.attempts)
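
A short sketch of how a worker might use this after a handler fails, again reusing the RetryPolicy example from above ("my_function" is a placeholder handler name):

policy = RetryPolicy(max_tries=5, backoff_mode="exponential")
task = Task.default("my_function", policy=policy)
if task.can_retry:
    delay_seconds = task.get_next_delay()
    # defer redelivery of the Service Bus message by delay_seconds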

record_attempt()

Record a new execution attempt with current timestamp.

Increments the attempt counter and records the current UTC timestamp for tracking retry intervals and debugging.

Returns:

    RetryAttempts: Updated attempts object with incremented count

Source code in boilermaker/task.py
def record_attempt(self):
    """Record a new execution attempt with current timestamp.

    Increments the attempt counter and records the current UTC
    timestamp for tracking retry intervals and debugging.

    Returns:
        RetryAttempts: Updated attempts object with incremented count
    """
    now = datetime.datetime.now(datetime.UTC)
    return self.attempts.inc(now)
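
The method returns the updated RetryAttempts object; whether RetryAttempts.inc also mutates task.attempts in place is not shown here, so a conservative caller can simply reassign the result. A sketch, assuming "my_function" is a registered handler and the model allows attribute assignment:

task = Task.default("my_function")
task.attempts = task.record_attempt()  # bump the counter and record the UTC timestamp
# The updated count feeds can_retry and get_next_delay on the next failure.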

si(fn, *fn_args, should_dead_letter=True, acks_late=True, policy=None, **fn_kwargs) classmethod

Create an immutable signature task from a function and arguments.

Creates a task bound to specific function arguments, useful for preparing tasks with callbacks or custom settings before publishing. The function arguments are captured at creation time.

Note: This implementation creates immutable signatures only. Future versions may support mutable signatures where task outputs are passed to subsequent tasks in a chain.

Parameters:

    fn (Callable): The function to be executed. Required.
    *fn_args: Positional arguments for the function
    should_dead_letter (bool): Whether to dead-letter failed tasks (default: True)
    acks_late (bool): Whether to acknowledge after processing (default: True)
    policy (RetryPolicy | None): Custom retry policy (uses default if None)
    **fn_kwargs: Keyword arguments for the function

Returns:

    Task: New task with bound function signature

Example

def process_data(data, format="json"):
    return f"Processed {data} as {format}"

task = Task.si(process_data, [1, 2, 3], format="xml")
# Arguments are bound to the task

Source code in boilermaker/task.py
@classmethod
def si(
    cls,
    fn: typing.Callable,
    *fn_args,
    should_dead_letter: bool = True,
    acks_late: bool = True,
    policy: retries.RetryPolicy | None = None,
    **fn_kwargs,
) -> "Task":
    """Create an immutable signature task from a function and arguments.

    Creates a task bound to specific function arguments, useful for
    preparing tasks with callbacks or custom settings before publishing.
    The function arguments are captured at creation time.

    Note: This implementation creates immutable signatures only.
    Future versions may support mutable signatures where task outputs
    are passed to subsequent tasks in a chain.

    Args:
        fn: The function to be executed
        *fn_args: Positional arguments for the function
        should_dead_letter: Whether to dead-letter failed tasks (default: True)
        acks_late: Whether to acknowledge after processing (default: True)
        policy: Custom retry policy (uses default if None)
        **fn_kwargs: Keyword arguments for the function

    Returns:
        Task: New task with bound function signature

    Example:
        >>> def process_data(data, format="json"):
        ...     return f"Processed {data} as {format}"
        >>>
        >>> task = Task.si(process_data, [1, 2, 3], format="xml")
        >>> # Arguments are bound to the task
    """
    attempts = retries.RetryAttempts.default()
    policy = policy or retries.RetryPolicy.default()

    return cls(
        should_dead_letter=should_dead_letter,
        acks_late=acks_late,
        attempts=attempts,
        function_name=fn.__name__,
        policy=policy,
        payload={
            "args": fn_args,
            "kwargs": fn_kwargs,
        },
        diagnostic_id=None,
    )
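
Since si() returns a fully formed Task, bound signatures can be wired together with the chaining operators before anything is published. A sketch, where fetch_report, render_pdf, and alert_ops are placeholder functions assumed to be registered with the worker:

fetch = Task.si(fetch_report, "2024-Q3")
render = Task.si(render_pdf, format="A4")
alert = Task.si(alert_ops, "report pipeline failed")

fetch >> render           # render runs only if fetch succeeds
fetch.on_failure = alert  # alert runs if fetch fails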