Task API Reference
boilermaker.task.Task
Bases: BaseModel
Represents a serializable task with retry policies and callback chains.
A Task encapsulates a function call with its arguments, retry configuration, and optional success/failure callbacks. Tasks are JSON-serializable and can be published to Azure Service Bus for asynchronous execution.
Attributes:
Name | Type | Description |
---|---|---|
task_id | str | Unique UUID7 identifier for timestamp-ordered task identification |
should_dead_letter | bool | Whether failed tasks should be dead-lettered (default: True) |
acks_late | bool | Whether to acknowledge messages after processing (default: True) |
function_name | str | Name of the registered function to execute |
attempts | RetryAttempts | Retry attempt tracking and metadata |
policy | RetryPolicy | Retry policy governing backoff and max attempts |
payload | dict[str, Any] | Function arguments and keyword arguments (must be JSON-serializable) |
diagnostic_id | str \| None | OpenTelemetry parent trace ID for distributed tracing |
_sequence_number | int \| None | Service Bus sequence number (set after publishing) |
on_success | Optional[Task] | Optional callback task to run on successful completion |
on_failure | Optional[Task] | Optional callback task to run on failure |
Example
# Create task with default settings
task = Task.default("my_function", args=[1, 2], kwargs={"key": "value"})
# Create task with custom retry policy
policy = RetryPolicy(max_tries=5, backoff_mode="exponential")
task = Task.si(my_function, arg1, kwarg=value, policy=policy)
# Chain tasks with callbacks
task1 >> task2 >> task3  # Success chain
task1.on_failure = error_handler_task
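To illustrate what "JSON-serializable with callback chains" can mean in practice, here is a minimal sketch using only the standard library. `SketchTask`, `to_json`, and `from_json` are hypothetical stand-ins written for this example; they are not the boilermaker model, and the `args`/`kwargs` payload layout is an assumption.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any, Optional


@dataclass
class SketchTask:
    """Hypothetical stand-in for Task; not the boilermaker model."""

    function_name: str
    payload: dict[str, Any] = field(default_factory=dict)
    on_success: Optional["SketchTask"] = None
    on_failure: Optional["SketchTask"] = None


def to_json(task: SketchTask) -> str:
    # asdict() recurses into nested dataclasses, so callbacks are serialized too.
    return json.dumps(asdict(task))


def from_json(raw: str) -> SketchTask:
    def build(data: Optional[dict]) -> Optional[SketchTask]:
        if data is None:
            return None
        return SketchTask(
            function_name=data["function_name"],
            payload=data["payload"],
            on_success=build(data["on_success"]),
            on_failure=build(data["on_failure"]),
        )

    task = build(json.loads(raw))
    assert task is not None
    return task


# Assumed payload layout: positional and keyword arguments stored side by side.
notify = SketchTask("notify", {"args": ["done"], "kwargs": {}})
job = SketchTask("process", {"args": [1, 2], "kwargs": {"key": "value"}}, on_success=notify)
restored = from_json(to_json(job))
```

The round trip preserves the nested success callback, which is the property a message broker needs in order to carry a whole chain in one message body.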
acks_early
property
Whether the task acknowledges messages before processing.
Returns:
Type | Description |
---|---|
bool | True if acks_late is False, meaning messages are acknowledged immediately upon receipt |
can_retry
property
Whether the task can be retried based on current attempts.
Returns:
Type | Description |
---|---|
bool | True if current attempts are less than or equal to the maximum tries allowed by the retry policy |
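The two properties described above are simple derived values. The sketch below restates them on a hypothetical `SketchFlags` class; the flat `attempts` and `max_tries` fields are a simplification made for this example, not the layout of the real model.

```python
from dataclasses import dataclass


@dataclass
class SketchFlags:
    """Hypothetical stand-in; field names are illustrative only."""

    acks_late: bool = True
    attempts: int = 0
    max_tries: int = 5

    @property
    def acks_early(self) -> bool:
        # Early acknowledgement is the logical opposite of acks_late.
        return not self.acks_late

    @property
    def can_retry(self) -> bool:
        # Mirrors the documented rule: attempts <= maximum tries.
        return self.attempts <= self.max_tries


fresh = SketchFlags()
exhausted = SketchFlags(acks_late=False, attempts=6)
```

Note that the documented comparison is inclusive, so a task whose attempt count equals the maximum is still reported as retryable.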
__lshift__(other)
Set success callback using << operator (left-shift chaining).
Creates a success callback chain where this task executes if the right-hand task completes successfully.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other | Task | Task that will trigger this task on success | required |
Returns:
Type | Description |
---|---|
Task | This task, allowing for continued chaining |
Example
task1 << task2 << task3
# If task3 succeeds, run task2
# If task2 succeeds, run task1
__rshift__(other)
Set success callback using >> operator (right-shift chaining).
Creates a success callback chain where the right-hand task executes if this task completes successfully.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other | Task | Task to execute on success | required |
Returns:
Type | Description |
---|---|
Task | The other task, allowing for continued chaining |
Example
task1 >> task2 >> task3
# If task1 succeeds, run task2
# If task2 succeeds, run task3
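The chaining behaviour follows from the operator returning its right-hand operand. A minimal sketch, assuming `__rshift__` simply assigns `on_success` and returns `other` as the Returns table describes; `ChainTask` is a hypothetical class written for this example.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ChainTask:
    """Hypothetical stand-in used only to show the operator mechanics."""

    name: str
    on_success: Optional["ChainTask"] = None

    def __rshift__(self, other: "ChainTask") -> "ChainTask":
        # Link the right-hand task as this task's success callback...
        self.on_success = other
        # ...and return it so the next >> attaches to the end of the chain.
        return other


task1, task2, task3 = ChainTask("task1"), ChainTask("task2"), ChainTask("task3")
tail = task1 >> task2 >> task3
```

Because Python evaluates `>>` left to right, the whole expression evaluates to the last task. Under this assumption, keep your own reference to `task1` if that is the task you intend to publish.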
default(function_name, **kwargs)
classmethod
Create a Task with default retry settings.
Convenience method to create a task with sensible defaults for retry attempts and policy. Additional keyword arguments override default values.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
function_name | str | Name of the function to execute | required |
**kwargs | | Additional task attributes to override defaults | {} |
Returns:
Type | Description |
---|---|
Task | New task instance with default retry configuration |
Example
task = Task.default("process_data", payload={"data": [1, 2, 3]})
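The "defaults, then overrides" pattern described above can be sketched with plain dictionaries. `default_fields` is a hypothetical helper for illustration; only the two boolean defaults are taken from the Attributes table, and the empty `payload` default is an assumption.

```python
from typing import Any


def default_fields(function_name: str, **overrides: Any) -> dict[str, Any]:
    """Hypothetical helper: build task fields, letting overrides win."""
    defaults: dict[str, Any] = {
        "should_dead_letter": True,  # documented default
        "acks_late": True,           # documented default
        "payload": {},               # assumed default for this sketch
    }
    # Later keys replace earlier ones, so caller-supplied values take priority.
    return {**defaults, "function_name": function_name, **overrides}


fields = default_fields("process_data", payload={"data": [1, 2, 3]})
```

Building the defaults inside the function avoids sharing one mutable `payload` dictionary between tasks.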
get_next_delay()
Calculate the delay before the next retry attempt.
Uses the task's retry policy to determine the appropriate delay based on the current number of attempts.
Returns:
Type | Description |
---|---|
int | Delay in seconds before next retry attempt |
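This reference does not state the formula behind the delay, so the following is only a sketch of one common choice, capped exponential backoff. `SketchPolicy`, its `delay` and `delay_max` fields, and `next_delay` are hypothetical names introduced here, not boilermaker's API.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchPolicy:
    """Hypothetical policy; field names and formula are assumptions."""

    max_tries: int = 5
    delay: int = 2        # base delay in seconds
    delay_max: int = 120  # upper bound in seconds

    def next_delay(self, attempts_so_far: int) -> int:
        # Double the base delay for every attempt, never exceeding the cap.
        return min(self.delay * 2 ** attempts_so_far, self.delay_max)


policy = SketchPolicy()
delays = [policy.next_delay(n) for n in range(1, 8)]
# delays == [4, 8, 16, 32, 64, 120, 120]
```

The result is an integer number of seconds, matching the documented return type; the cap keeps late retries from drifting arbitrarily far apart.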
record_attempt()
Record a new execution attempt with current timestamp.
Increments the attempt counter and records the current UTC timestamp for tracking retry intervals and debugging.
Returns:
Type | Description |
---|---|
RetryAttempts | Updated attempts object with incremented count |
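The bookkeeping described above amounts to a counter plus a timezone-aware timestamp. A minimal sketch, where `SketchAttempts`, `last_retry`, and `record` are hypothetical names chosen for this example:

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class SketchAttempts:
    """Hypothetical stand-in for RetryAttempts."""

    attempts: int = 0
    last_retry: Optional[datetime] = None

    def record(self) -> "SketchAttempts":
        # Count the attempt and stamp it with the current UTC time.
        self.attempts += 1
        self.last_retry = datetime.now(timezone.utc)
        return self


tracker = SketchAttempts()
tracker.record()
tracker.record()
```

Using a timezone-aware UTC timestamp keeps retry intervals comparable across workers running in different time zones.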
si(fn, *fn_args, should_dead_letter=True, acks_late=True, policy=None, **fn_kwargs)
classmethod
Create an immutable signature task from a function and arguments.
Creates a task bound to specific function arguments, useful for preparing tasks with callbacks or custom settings before publishing. The function arguments are captured at creation time.
Note: This implementation creates immutable signatures only. Future versions may support mutable signatures where task outputs are passed to subsequent tasks in a chain.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fn | Callable | The function to be executed | required |
*fn_args | | Positional arguments for the function | () |
should_dead_letter | bool | Whether to dead-letter failed tasks (default: True) | True |
acks_late | bool | Whether to acknowledge after processing (default: True) | True |
policy | RetryPolicy \| None | Custom retry policy (uses default if None) | None |
**fn_kwargs | | Keyword arguments for the function | {} |
Returns:
Type | Description |
---|---|
Task | New task with bound function signature |
Example
def process_data(data, format="json"):
    return f"Processed {data} as {format}"
task = Task.si(process_data, [1, 2, 3], format="xml")
# Arguments are bound to the task
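"Captured at creation time" can be illustrated by snapshotting the arguments when the signature is built. The sketch below uses a hypothetical `bind_signature` helper and an assumed `args`/`kwargs` payload layout; it is not how `Task.si` is implemented.

```python
import json
from typing import Any, Callable


def bind_signature(fn: Callable[..., Any], *fn_args: Any, **fn_kwargs: Any) -> dict[str, Any]:
    """Hypothetical helper: freeze a function name and its arguments."""
    raw = {"args": list(fn_args), "kwargs": dict(fn_kwargs)}
    # A JSON round trip rejects non-serializable arguments (TypeError)
    # and produces a deep copy, so later mutations cannot leak in.
    payload = json.loads(json.dumps(raw))
    return {"function_name": fn.__name__, "payload": payload}


def process_data(data, format="json"):
    return f"Processed {data} as {format}"


data = [1, 2, 3]
sig = bind_signature(process_data, data, format="xml")
data.append(4)  # mutating the original list does not change the bound payload
```

Storing the function by name rather than by reference is what allows a worker in another process to look it up again, assuming the function has been registered there under the same name.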