Boilermaker API Reference
boilermaker.app.Boilermaker(state, service_bus_client, enable_opentelemetry=False, results_storage=None)
Async task runner for Azure Service Bus queues.
Boilermaker lets you register async functions as background tasks, schedule them for execution, and run workers that process them. It supports retry policies, task chaining, success and failure callbacks, and error handling.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `state` | `Any` | Shared application state passed to all tasks as first argument | required |
| `service_bus_client` | `AzureServiceBus \| ManagedAzureServiceBusSender` | Azure Service Bus client for message handling | required |
| `enable_opentelemetry` | `bool` | Enable OpenTelemetry tracing (default: False) | `False` |

Example:
from boilermaker import Boilermaker
from boilermaker.service_bus import AzureServiceBus
# Set up ServiceBus client
client = AzureServiceBus.from_config(config)
# Create app with shared state
app = Boilermaker({"counter": 0}, client)
# Register a task
@app.task()
async def my_task(state, message: str):
    state["counter"] += 1
    print(f"Processing: {message}")
    return "completed"
# Schedule a task
await app.apply_async(my_task, "hello world")
# Run worker (in separate process)
await app.run()
Source code in boilermaker/app.py
apply_async(fn, *args, delay=0, publish_attempts=1, policy=None, **kwargs)
async
Schedule a task for background execution.
Creates a task and immediately publishes it to the Azure Service Bus queue. This is the most common way to schedule background work.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `fn` | `TaskHandler` | Registered async function to execute | required |
| `*args` | | Positional arguments for the function | `()` |
| `delay` | `int` | Seconds to delay before task becomes visible (default: 0) | `0` |
| `publish_attempts` | `int` | Retry attempts for publishing failures (default: 1) | `1` |
| `policy` | `RetryPolicy \| None` | Override retry policy for this task instance | `None` |
| `**kwargs` | | Keyword arguments for the function | `{}` |

Returns:

| Name | Type | Description |
|---|---|---|
| `Task` | `Task` | The published task |

Raises:

| Type | Description |
|---|---|
| `BoilermakerAppException` | If publishing fails after all attempts |
| `ValueError` | If function is not registered |

Example:
# Simple task scheduling
await app.apply_async(send_email, "user@example.com")
# With custom retry policy
await app.apply_async(
    process_image,
    image_url="https://example.com/image.jpg",
    policy=retries.RetryPolicy(max_tries=3),
)
# Delayed execution (5 minutes)
await app.apply_async(cleanup_temp_files, delay=300)
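
The `publish_attempts` parameter bounds how many times publishing is tried before an exception is raised. The sketch below illustrates that behavior in isolation. `PublishError`, `publish_with_attempts`, and `make_flaky_send` are hypothetical names used only for illustration; they are not part of Boilermaker, and the library's actual retry logic may differ.

```python
import asyncio
from typing import Awaitable, Callable


class PublishError(Exception):
    """Hypothetical stand-in for BoilermakerAppException."""


async def publish_with_attempts(
    send: Callable[[str], Awaitable[int]],
    payload: str,
    publish_attempts: int = 1,
) -> int:
    """Try `send` up to `publish_attempts` times; raise once every attempt has failed."""
    last_error = None
    for _ in range(publish_attempts):
        try:
            return await send(payload)
        except ConnectionError as exc:  # assumed transient failure type
            last_error = exc
    raise PublishError(
        f"publishing failed after {publish_attempts} attempts"
    ) from last_error


def make_flaky_send(failures: int) -> Callable[[str], Awaitable[int]]:
    """Build a fake sender that fails `failures` times, then returns a sequence number."""
    remaining = failures

    async def flaky_send(payload: str) -> int:
        nonlocal remaining
        if remaining > 0:
            remaining -= 1
            raise ConnectionError("simulated transient failure")
        return 42

    return flaky_send
```

With a sender that fails twice, `publish_attempts=3` succeeds on the third try, while `publish_attempts=2` raises `PublishError`.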
chain(*tasks, on_failure=None)
Chain multiple tasks to run sequentially on success.
Creates a workflow where each task runs only if the previous one succeeds. If any task fails, the chain stops and the optional failure handler runs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `*tasks` | `Task` | Task instances to chain together (minimum 2 required) | `()` |
| `on_failure` | `Task \| None` | Optional task to run if any task in the chain fails | `None` |

Returns:

| Name | Type | Description |
|---|---|---|
| `Task` | `Task` | The first task in the chain (publish this to start the workflow) |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If fewer than 2 tasks provided or on_failure is not a Task |

Example:
# Create individual tasks
fetch = app.create_task(fetch_data, url)
process = app.create_task(process_data)
save = app.create_task(save_results)
cleanup = app.create_task(cleanup_on_failure)
# Chain them together
workflow = app.chain(fetch, process, save, on_failure=cleanup)
# Start the workflow
await app.publish_task(workflow)
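
One way to picture chaining is as linking each task's success callback to the next task and attaching the shared failure handler. The sketch below models that idea with a toy dataclass. `SketchTask` and `chain_tasks` are hypothetical, and the assumption that every task in the chain receives the same `on_failure` handler is an illustration rather than a statement about Boilermaker's internals.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchTask:
    """Hypothetical, simplified stand-in for a Boilermaker Task."""

    name: str
    on_success: Optional["SketchTask"] = None
    on_failure: Optional["SketchTask"] = None


def chain_tasks(
    *tasks: SketchTask, on_failure: Optional[SketchTask] = None
) -> SketchTask:
    """Link tasks so each one triggers the next on success; return the first."""
    if len(tasks) < 2:
        raise ValueError("at least 2 tasks are required")
    if on_failure is not None and not isinstance(on_failure, SketchTask):
        raise ValueError("on_failure must be a task")
    # Pair each task with its successor: (t0, t1), (t1, t2), ...
    for current, following in zip(tasks, tasks[1:]):
        current.on_success = following
    # Assumed: every link in the chain shares one failure handler.
    for task in tasks:
        task.on_failure = on_failure
    return tasks[0]
```

Returning the first task mirrors the documented contract: publishing the head is enough to start the whole workflow.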
close()
async
create_task(fn, *args, policy=None, **kwargs)
Create a Task instance without publishing it to the queue.
This allows you to set up callbacks, modify task properties, or build workflows before publishing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `fn` | `TaskHandler` | Registered async function to create task for | required |
| `*args` | | Positional arguments to pass to the function | `()` |
| `policy` | `RetryPolicy \| None` | Optional retry policy override for this task instance | `None` |
| `**kwargs` | | Keyword arguments to pass to the function | `{}` |

Returns:

| Name | Type | Description |
|---|---|---|
| `Task` | `Task` | Configured task instance ready for publishing |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If function is not registered |

Example:
# Create task without publishing
task = app.create_task(send_email, "user@example.com", subject="Welcome")
# Set up callback
task.on_success = app.create_task(track_email_sent)
# Publish when ready
await app.publish_task(task)
message_handler(msg, receiver)
async
Process a single Service Bus message containing a Task.
This method is called by the worker loop for each received message. It deserializes the Task, executes the associated function, and handles success, failure, retries, and callbacks.
Args:
msg: The received ServiceBusReceivedMessage containing the Task JSON
Returns:
None
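
The deserialize, execute, and follow-up flow described above can be sketched with plain JSON and a dictionary of handlers. Everything here is hypothetical: the `REGISTRY` dict, the `handle_message` function, and the JSON field names (`function_name`, `args`, `attempts`, `max_tries`) are assumptions made for illustration, and the real handler also settles the Service Bus message, which this sketch omits.

```python
import asyncio
import json

# Hypothetical registry mapping function names to async handlers.
REGISTRY = {}


async def greet(state, name):
    state["greeted"].append(name)
    return f"hello {name}"


async def always_fails(state):
    raise RuntimeError("boom")


REGISTRY.update({"greet": greet, "always_fails": always_fails})


async def handle_message(state, body: str) -> str:
    """Decode a task, run it, and report which follow-up path applies."""
    task = json.loads(body)  # assumed layout, not Boilermaker's wire format
    handler = REGISTRY[task["function_name"]]
    try:
        await handler(state, *task.get("args", []))
    except Exception:
        attempts = task.get("attempts", 0) + 1
        if attempts < task.get("max_tries", 1):
            return "retry"  # a real worker would republish, possibly delayed
        return "on_failure"  # retries exhausted: run the failure callback, if any
    return "on_success"  # run the success callback, if any
```

The returned string stands in for the branch the worker would take next; a real implementation would publish the follow-up task instead of returning a label.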
publish_graph(graph)
async
Publish a TaskGraph workflow to storage and schedule initial tasks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `graph` | `TaskGraph` | TaskGraph instance to publish | required |

Returns:

| Name | Type | Description |
|---|---|---|
| `TaskGraph` | `TaskGraph` | The same graph instance |

Raises:

| Type | Description |
|---|---|
| `BoilermakerAppException` | If storage or task publishing fails |

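
The description does not say which tasks count as initial. One plausible reading, which is an assumption here, is that they are the tasks with no prerequisites. The sketch below shows that selection on a plain dependency mapping. `initial_tasks` and the dictionary layout are hypothetical and are not the `TaskGraph` API.

```python
def initial_tasks(dependencies: dict[str, set[str]]) -> list[str]:
    """Return the ids of tasks that have no prerequisites, in sorted order."""
    return sorted(task for task, parents in dependencies.items() if not parents)


# Each key is a task id; each value is the set of tasks it waits on.
example_dependencies = {
    "fetch": set(),
    "load_config": set(),
    "process": {"fetch", "load_config"},
    "save": {"process"},
}
```

Here only `fetch` and `load_config` could be scheduled immediately; `process` and `save` would wait for their prerequisites to finish.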
publish_task(task, delay=0, publish_attempts=1, unique_msg_id=None)
async
Publish a task to the Azure Service Bus queue.
Serializes the task to JSON and sends it to the configured queue. Sets the task's sequence_number after successful publishing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task` | `Task` | Task instance to publish | required |
| `delay` | `int` | Seconds to delay before task becomes visible (default: 0) | `0` |
| `publish_attempts` | `int` | Number of retry attempts for publishing (default: 1) | `1` |

Returns:

| Name | Type | Description |
|---|---|---|
| `Task` | `Task` | The published task instance |

Raises:

| Type | Description |
|---|---|
| `BoilermakerAppException` | If publishing fails after all attempts |

Example:
task = app.create_task(my_function, "arg1", kwarg="value")
published = await app.publish_task(task, delay=60) # 1 minute delay
print(f"Published with sequence: {published.sequence_number}")
register_async(fn, **options)
Register an async function as a background task.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `fn` | `TaskHandler` | Async function that takes state as first parameter | required |
| `**options` | | Task configuration options including: `policy` (RetryPolicy for handling failures), `should_dead_letter` (whether to dead letter failed messages), `acks_late` (message acknowledgment timing) | `{}` |

Returns:

| Name | Type | Description |
|---|---|---|
| `self` | | For method chaining |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If function is already registered or not async |

Example:
async def send_email(state, recipient: str, subject: str):
    await state.email_client.send(recipient, subject)

app.register_async(
    send_email,
    policy=retries.RetryPolicy(max_tries=3),
)
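
The two documented failure cases (a function that is already registered, or one that is not async) and the chaining return value can be illustrated with a small stand-alone registry. `SketchRegistry` is hypothetical, and keying handlers by `fn.__name__` is an assumption made for this sketch, not necessarily how Boilermaker identifies tasks.

```python
import inspect


class SketchRegistry:
    """Hypothetical registry illustrating the documented validation rules."""

    def __init__(self):
        self.handlers = {}

    def register_async(self, fn, **options):
        if not inspect.iscoroutinefunction(fn):
            raise ValueError(f"{fn.__name__} is not an async function")
        if fn.__name__ in self.handlers:
            raise ValueError(f"{fn.__name__} is already registered")
        self.handlers[fn.__name__] = (fn, options)
        return self  # returning self allows method chaining

    def task(self, **options):
        """Decorator form: register the function and hand it back unchanged."""

        def decorator(fn):
            self.register_async(fn, **options)
            return fn

        return decorator
```

Because `register_async` returns the registry, calls can be chained, and because the decorator returns the original function, the decorated name stays directly callable.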
register_many_async(fns, **options)
Register multiple async functions with the same configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `fns` | `list[TaskHandler]` | List of async functions to register | required |
| `**options` | | Common task configuration applied to all functions | `{}` |

Returns:

| Name | Type | Description |
|---|---|---|
| `self` | | For method chaining |

Example:
tasks = [process_email, send_notification, update_metrics]
app.register_many_async(tasks, policy=retries.NoRetry())
run()
async
Start the worker to process tasks from the queue.
This is the main worker loop that:
1. Connects to Azure Service Bus queue
2. Receives messages containing tasks
3. Executes the tasks with registered functions
4. Handles retries, callbacks, and error scenarios
5. Manages graceful shutdown on SIGINT/SIGTERM
The worker runs indefinitely until interrupted. Each message is processed according to its task configuration (retries, callbacks, etc.).
Note
- Run this in a separate process from your main application
- Multiple workers can run in parallel for horizontal scaling
- Workers automatically handle message acknowledgment
- An interrupted worker abandons its current message, returning it to the queue
Example:
# In your worker process
app = Boilermaker(app_state, service_bus_client)
# Register your tasks...
await app.run() # Runs forever
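
The shape of a receive loop that stops cleanly on SIGINT/SIGTERM can be sketched with an in-memory `asyncio.Queue` standing in for the Service Bus queue. `worker` and `install_signal_handlers` are hypothetical helpers, not Boilermaker functions, and `loop.add_signal_handler` is only available on Unix event loops.

```python
import asyncio
import signal


def install_signal_handlers(stop: asyncio.Event) -> None:
    """Set `stop` on SIGINT/SIGTERM (Unix event loops only)."""
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)


async def worker(queue: asyncio.Queue, handle, stop: asyncio.Event) -> int:
    """Process messages until `stop` is set; return how many were handled."""
    handled = 0
    while not stop.is_set():
        # Wait for whichever comes first: a message or the stop signal.
        get_task = asyncio.ensure_future(queue.get())
        stop_task = asyncio.ensure_future(stop.wait())
        done, pending = await asyncio.wait(
            {get_task, stop_task}, return_when=asyncio.FIRST_COMPLETED
        )
        for task in pending:
            task.cancel()
        if get_task in done:
            await handle(get_task.result())
            handled += 1
    return handled
```

Racing the receive against the stop event lets an idle worker exit promptly instead of blocking on an empty queue; a message that was never taken off the queue simply stays there for another worker.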
signal_handler(scope)
async
Attempts to reschedule any open messages when the worker receives SIGINT or SIGTERM.
task(**options)
Decorator to register an async function as a background task.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `**options` | | Optional task configuration including 'policy' for retry settings | `{}` |

Returns:

| Type | Description |
|---|---|
| | Decorator function that registers the task and returns the original function |

Example:
@app.task(policy=retries.RetryPolicy.default())
async def process_data(state, item_id: str):
    return await state.db.process(item_id)
Message Lock Renewal
For long-running tasks that may exceed the Azure Service Bus message lock duration, call renew_message_lock() to maintain exclusive access to the message and prevent it from being redelivered:
@app.task()
async def process_large_file(state, file_path: str):
    """Process a large file with periodic lock renewal."""
    lines = await read_file_lines(file_path)
    for i, line in enumerate(lines):
        await process_line(line)
        # Renew message lock every 50 lines
        if i % 50 == 0:
            await state.app.renew_message_lock()
    return f"Processed {len(lines)} lines"
More information
Use message lock renewal for tasks that take longer than the message lock duration configured for your queue.
Consult the Azure documentation for more information.
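
Counting iterations works when each step takes a predictable amount of time. When it does not, renewing on a timer is an alternative. The sketch below is a generic pattern, not a Boilermaker feature: `keep_lock_alive` is a hypothetical helper that calls whatever `renew` coroutine function it is given at a fixed interval while the wrapped block runs.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def keep_lock_alive(renew, interval: float):
    """Call `renew()` every `interval` seconds until the block exits."""

    async def _loop():
        while True:
            await asyncio.sleep(interval)
            await renew()

    renewer = asyncio.ensure_future(_loop())
    try:
        yield
    finally:
        # Stop renewing as soon as the wrapped work finishes or fails.
        renewer.cancel()
        try:
            await renewer
        except asyncio.CancelledError:
            pass
```

Inside a task this might be used as `async with keep_lock_alive(state.app.renew_message_lock, interval=30):`, assuming `state.app` is available as in the example above. The 30-second interval is a placeholder; pick a value comfortably shorter than the lock duration of your queue.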