Task API Reference
boilermaker.task.Task
Bases: BaseModel
Represents a serializable task with retry policies and callback chains.
A Task encapsulates a function call with its arguments, retry configuration, and optional success/failure callbacks. Tasks are JSON-serializable and can be published to Azure Service Bus for asynchronous execution.
Attributes:
| Name | Type | Description |
|---|---|---|
task_id |
TaskId
|
Unique UUID7 identifier for timestamp-ordered task identification |
should_dead_letter |
bool
|
Whether failed tasks should be dead-lettered (default: True) |
acks_late |
bool
|
Whether to acknowledge messages after processing (default: True) |
function_name |
str
|
Name of the registered function to execute |
attempts |
RetryAttempts
|
Retry attempt tracking and metadata |
policy |
RetryPolicy
|
Retry policy governing backoff and max attempts |
payload |
dict[str, Any]
|
Function arguments and keyword arguments (must be JSON-serializable) |
_sequence_number |
int | None
|
Service Bus sequence number (set after publishing) |
on_success |
Optional[Task]
|
Optional callback task to run on successful completion |
on_failure |
Optional[Task]
|
Optional callback task to run on failure |
Example:
# Create task with default settings
task = Task.default("my_function", args=[1, 2], kwargs={"key": "value"})
# Create task with custom retry policy
policy = RetryPolicy(max_tries=5, backoff_mode="exponential")
task = Task.si(my_function, arg1, kwarg=value, policy=policy)
# Chain tasks with callbacks
task1 >> task2 >> task3 # Success chain
task1.on_failure = error_handler_task
acks_early
property
Whether the task acknowledges messages before processing.
Returns:
| Name | Type | Description |
|---|---|---|
bool |
True if acks_late is False, meaning messages are acknowledged immediately upon receipt |
can_retry
property
Whether the task can be retried based on current attempts.
Returns:
| Name | Type | Description |
|---|---|---|
bool |
True if current attempts are less than or equal to the maximum tries allowed by the retry policy |
diagnostic_id
cached
property
Get the OpenTelemetry diagnostic ID if available.
Returns:
| Type | Description |
|---|---|
str | None
|
str | None: The diagnostic ID for tracing, or None if not set |
msg
property
writable
Get the associated Service Bus message if available.
Returns:
| Type | Description |
|---|---|
ServiceBusReceivedMessage | None
|
ServiceBusReceivedMessage | None: The associated Service Bus message, or None if not set |
sequence_number
cached
property
Get the Service Bus sequence number if available.
Returns:
| Type | Description |
|---|---|
int | None
|
int | None: The sequence number of the associated Service Bus message, or None if not set |
__hash__()
__lshift__(other)
Set success callback using << operator (left-shift chaining).
Creates a success callback chain where this task executes if the right-hand task completes successfully.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
other
|
Task
|
Task that will trigger this task on success |
required |
Returns:
| Name | Type | Description |
|---|---|---|
Task |
Task
|
This task, allowing for continued chaining |
Example:
# If task3 succeeds, run task2
# If task2 succeeds, run task1
task1 << task2 << task3
Source code in boilermaker/task/task.py
__rshift__(other)
Set success callback using >> operator (right-shift chaining).
Creates a success callback chain where the right-hand task executes if this task completes successfully.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
other
|
Task
|
Task to execute on success |
required |
Returns:
| Name | Type | Description |
|---|---|---|
Task |
Task
|
The other task, allowing for continued chaining |
Example:
# If task1 succeeds, run task2
# If task2 succeeds, run task3
task1 >> task2 >> task3
Source code in boilermaker/task/task.py
default(function_name, **kwargs)
classmethod
Create a Task with default retry settings.
Convenience method to create a task with sensible defaults for retry attempts and policy. Additional keyword arguments override default values.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
function_name
|
str
|
Name of the function to execute |
required |
**kwargs
|
Additional task attributes to override defaults |
{}
|
Returns:
| Name | Type | Description |
|---|---|---|
Task |
New task instance with default retry configuration |
Example:
task = Task.default("process_data", payload={"data": [1, 2, 3]})
Source code in boilermaker/task/task.py
get_next_delay()
Calculate the delay before the next retry attempt.
Uses the task's retry policy to determine the appropriate delay based on the current number of attempts.
Returns:
| Name | Type | Description |
|---|---|---|
int |
Delay in seconds before next retry attempt |
Source code in boilermaker/task/task.py
mark_published(sequence_number)
Mark the task as published by setting the sequence number.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
sequence_number
|
int
|
The Service Bus sequence number assigned upon publishing |
required |
Source code in boilermaker/task/task.py
record_attempt()
Record a new execution attempt with current timestamp.
Increments the attempt counter and records the current UTC timestamp for tracking retry intervals and debugging.
Returns:
| Name | Type | Description |
|---|---|---|
RetryAttempts |
Updated attempts object with incremented count |
Source code in boilermaker/task/task.py
si(fn, *fn_args, should_dead_letter=True, acks_late=True, policy=None, **fn_kwargs)
classmethod
Create an immutable signature task from a function and arguments.
Creates a task bound to specific function arguments, useful for preparing tasks with callbacks or custom settings before publishing. The function arguments are captured at creation time.
Note: This implementation creates immutable signatures only. Future versions may support mutable signatures where task outputs are passed to subsequent tasks in a chain.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
fn
|
TaskHandler
|
The function to be executed |
required |
*fn_args
|
Positional arguments for the function |
()
|
|
should_dead_letter
|
bool
|
Whether to dead-letter failed tasks (default: True) |
True
|
acks_late
|
bool
|
Whether to acknowledge after processing (default: True) |
True
|
policy
|
RetryPolicy | None
|
Custom retry policy (uses default if None) |
None
|
**fn_kwargs
|
Keyword arguments for the function |
{}
|
Returns:
| Name | Type | Description |
|---|---|---|
Task |
Task
|
New task with bound function signature |
Example:
def process_data(data, format="json"):
return f"Processed {data} as {format}"
# Arguments are bound to the task **immutably** at creation time
task = Task.si(process_data, [1, 2, 3], format="xml")
Source code in boilermaker/task/task.py
options: show_root_heading: true show_source: true heading_level: 2