Boilermaker API Reference
boilermaker.app.Boilermaker(state, service_bus_client=None, enable_opentelemetry=False)
Async task runner for Azure Service Bus queues.
Boilermaker allows you to register async functions as background tasks, schedule them for execution, and run workers to process them. It provides retry policies, task chaining, callbacks, and comprehensive error handling.
Parameters:

Name | Type | Description | Default
---|---|---|---
state | Any | Shared application state passed to all tasks as the first argument | required
service_bus_client | AzureServiceBus \| ManagedAzureServiceBusSender | Azure Service Bus client for message handling | None
enable_opentelemetry | bool | Enable OpenTelemetry tracing | False
Example

```python
from boilermaker import Boilermaker
from boilermaker.service_bus import AzureServiceBus

# Set up the Service Bus client
client = AzureServiceBus.from_config(config)

# Create the app with shared state
app = Boilermaker({"counter": 0}, client)

# Register a task
@app.task()
async def my_task(state, message: str):
    state["counter"] += 1
    print(f"Processing: {message}")
    return "completed"

# Schedule a task
await app.apply_async(my_task, "hello world")

# Run the worker (in a separate process)
await app.run()
```
Source code in boilermaker/app.py
apply_async(fn, *args, delay=0, publish_attempts=1, policy=None, **kwargs)
async
Schedule a task for background execution.
Creates a task and immediately publishes it to the Azure Service Bus queue. This is the most common way to schedule background work.
Parameters:

Name | Type | Description | Default
---|---|---|---
fn | TaskHandler | Registered async function to execute | required
*args | | Positional arguments for the function | ()
delay | int | Seconds to delay before the task becomes visible | 0
publish_attempts | int | Retry attempts for publishing failures | 1
policy | RetryPolicy \| None | Override retry policy for this task instance | None
**kwargs | | Keyword arguments for the function | {}
Returns:

Name | Type | Description
---|---|---
Task | Task | The published task with sequence_number set

Raises:

Type | Description
---|---
BoilermakerAppException | If publishing fails after all attempts
ValueError | If the function is not registered
Example

```python
# Simple task scheduling
await app.apply_async(send_email, "user@example.com")

# With a custom retry policy
await app.apply_async(
    process_image,
    image_url="https://example.com/image.jpg",
    policy=retries.RetryPolicy(max_tries=3),
)

# Delayed execution (5 minutes)
await app.apply_async(cleanup_temp_files, delay=300)
```
Source code in boilermaker/app.py
chain(*tasks, on_failure=None)
Chain multiple tasks to run sequentially on success.
Creates a workflow where each task runs only if the previous one succeeds. If any task fails, the chain stops and the optional failure handler runs.
Parameters:

Name | Type | Description | Default
---|---|---|---
*tasks | Task | Task instances to chain together (minimum 2 required) | ()
on_failure | Task \| None | Optional task to run if any task in the chain fails | None
Returns:

Name | Type | Description
---|---|---
Task | Task | The first task in the chain (publish this to start the workflow)

Raises:

Type | Description
---|---
ValueError | If fewer than two tasks are provided or on_failure is not a Task
Example

```python
# Create individual tasks
fetch = app.create_task(fetch_data, url)
process = app.create_task(process_data)
save = app.create_task(save_results)
cleanup = app.create_task(cleanup_on_failure)

# Chain them together
workflow = app.chain(fetch, process, save, on_failure=cleanup)

# Start the workflow
await app.publish_task(workflow)
```
Source code in boilermaker/app.py
create_task(fn, *args, policy=None, **kwargs)
Create a Task instance without publishing it to the queue.
This allows you to set up callbacks, modify task properties, or build workflows before publishing.
Parameters:

Name | Type | Description | Default
---|---|---|---
fn | TaskHandler | Registered async function to create a task for | required
*args | | Positional arguments to pass to the function | ()
policy | RetryPolicy \| None | Optional retry policy override for this task instance | None
**kwargs | | Keyword arguments to pass to the function | {}
Returns:

Name | Type | Description
---|---|---
Task | Task | Configured task instance ready for publishing

Raises:

Type | Description
---|---
ValueError | If the function is not registered
Example

```python
# Create a task without publishing it
task = app.create_task(send_email, "user@example.com", subject="Welcome")

# Set up a callback
task.on_success = app.create_task(track_email_sent)

# Publish when ready
await app.publish_task(task)
```
Source code in boilermaker/app.py
message_handler(msg, receiver)
async
Handle a single message received from the queue.
Source code in boilermaker/app.py
publish_task(task, delay=0, publish_attempts=1)
async
Publish a task to the Azure Service Bus queue.
Serializes the task to JSON and sends it to the configured queue. Sets the task's sequence_number after successful publishing.
Parameters:

Name | Type | Description | Default
---|---|---|---
task | Task | Task instance to publish | required
delay | int | Seconds to delay before the task becomes visible | 0
publish_attempts | int | Number of retry attempts for publishing | 1
Returns:

Name | Type | Description
---|---|---
Task | Task | The same task with sequence_number populated

Raises:

Type | Description
---|---
BoilermakerAppException | If publishing fails after all attempts
Example

```python
task = app.create_task(my_function, "arg1", kwarg="value")
published = await app.publish_task(task, delay=60)  # 1 minute delay
print(f"Published with sequence: {published._sequence_number}")
```
Source code in boilermaker/app.py
register_async(fn, **options)
Register an async function as a background task.
Parameters:

Name | Type | Description | Default
---|---|---|---
fn | TaskHandler | Async function that takes state as its first parameter | required
**options | | Task configuration options, including: policy (RetryPolicy for handling failures), should_dead_letter (whether to dead-letter failed messages), acks_late (message acknowledgment timing) | {}
Returns:

Name | Type | Description
---|---|---
self | | For method chaining

Raises:

Type | Description
---|---
ValueError | If the function is already registered or is not async
Example

```python
async def send_email(state, recipient: str, subject: str):
    await state.email_client.send(recipient, subject)

app.register_async(
    send_email,
    policy=retries.RetryPolicy(max_tries=3),
)
```
Source code in boilermaker/app.py
register_many_async(fns, **options)
Register multiple async functions with the same configuration.
Parameters:

Name | Type | Description | Default
---|---|---|---
fns | list[TaskHandler] | List of async functions to register | required
**options | | Common task configuration applied to all functions | {}
Returns:

Name | Type | Description
---|---|---
self | | For method chaining
Example

```python
tasks = [process_email, send_notification, update_metrics]
app.register_many_async(tasks, policy=retries.NoRetry())
```
Source code in boilermaker/app.py
renew_message_lock()
async
Renew the lock on the current message being processed.
Source code in boilermaker/app.py
run()
async
Start the worker to process tasks from the queue.
This is the main worker loop, which:

1. Connects to the Azure Service Bus queue
2. Receives messages containing tasks
3. Executes the tasks with registered functions
4. Handles retries, callbacks, and error scenarios
5. Manages graceful shutdown on SIGINT/SIGTERM
The worker runs indefinitely until interrupted. Each message is processed according to its task configuration (retries, callbacks, etc.).
Note
- Run this in a separate process from your main application
- Multiple workers can run in parallel for horizontal scaling
- Workers automatically handle message acknowledgment
- Interrupted workers will abandon current message back to queue
Example

```python
# In your worker process
app = Boilermaker(app_state, service_bus_client)

# Register your tasks...

await app.run()  # Runs forever
```
Source code in boilermaker/app.py
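The note above mentions that multiple workers can run in parallel for horizontal scaling. A minimal sketch of that pattern spawns one worker process per processing slot; here `run_worker` is a stand-in (in a real deployment its body would build the Boilermaker app and call `asyncio.run(app.run())`):

```python
import multiprocessing


def run_worker(worker_id: int) -> None:
    # Stand-in worker entrypoint. A real one would construct the app,
    # register tasks, and run the loop:
    #     app = build_app()            # hypothetical helper
    #     asyncio.run(app.run())
    print(f"worker {worker_id} started")


if __name__ == "__main__":
    # Each worker process holds its own Service Bus connection and
    # competes for messages on the same queue.
    workers = [
        multiprocessing.Process(target=run_worker, args=(i,))
        for i in range(4)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
```

Because Service Bus queues are competing-consumer by design, no extra coordination is needed between worker processes.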
signal_handler(receiver, scope)
async
Reschedule any in-flight messages on SIGINT/SIGTERM.
Source code in boilermaker/app.py
task(**options)
Decorator to register an async function as a background task.
Parameters:

Name | Type | Description | Default
---|---|---|---
**options | | Optional task configuration, including policy for retry settings | {}
|
Returns:

Type | Description
---|---
 | Decorator that registers the task and returns the original function
Example

```python
@app.task(policy=retries.RetryPolicy.default())
async def process_data(state, item_id: str):
    return await state.db.process(item_id)
```
Source code in boilermaker/app.py
task_handler(task, sequence_number)
async
Dynamically look up the requested function and evaluate it.
Source code in boilermaker/app.py
Message Lock Renewal
For long-running tasks that may exceed the Azure Service Bus message lock duration, use renew_message_lock() to maintain exclusive access to the message and prevent it from being redelivered:

```python
@app.task()
async def process_large_file(state, file_path: str):
    """Process a large file with periodic lock renewal."""
    lines = await read_file_lines(file_path)
    for i, line in enumerate(lines):
        await process_line(line)
        # Renew the message lock every 50 lines
        if i % 50 == 0:
            await state.app.renew_message_lock()
    return f"Processed {len(lines)} lines"
```
More information
Use message lock renewal for tasks that take longer than the message-lease duration for your queue.
Consult the Azure documentation for more information.
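The every-50-lines cadence above assumes roughly uniform per-line cost. When item cost varies, renewing on elapsed time is safer. This is a sketch under the assumption that you know your queue's configured lock duration; the helper name `make_renewal_check` is illustrative, not part of Boilermaker:

```python
import time


def make_renewal_check(lock_duration_seconds: float, safety_factor: float = 0.5):
    """Return a callable that reports True when a lock renewal is due.

    Fires once safety_factor * lock_duration_seconds has elapsed since
    the last renewal, leaving headroom before the lock expires.
    """
    interval = lock_duration_seconds * safety_factor
    last_renewed = time.monotonic()

    def due() -> bool:
        nonlocal last_renewed
        now = time.monotonic()
        if now - last_renewed >= interval:
            last_renewed = now
            return True
        return False

    return due
```

Inside a task, `renewal_due = make_renewal_check(60.0)` would replace the `i % 50 == 0` check: call `renewal_due()` after each item and call `await state.app.renew_message_lock()` only when it returns True.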