# Error Handling

Handle task failures with retries, callbacks, and dead letter queues.

## Retry Policies

Configure retry behavior when registering tasks:
```python
from boilermaker import retries

# Exponential backoff
@app.task(policy=retries.RetryPolicy(
    max_tries=5,
    delay=30,
    delay_max=600,
    retry_mode=retries.RetryMode.Exponential,
))
async def task_with_retries(state, data: str):
    """Task that retries on failure."""
    # Task logic here
    pass

# No retries for critical operations
@app.task(policy=retries.NoRetry())
async def no_retry_task(state, payment_id: str):
    """Task that should not retry (e.g., payments)."""
    pass
```
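For intuition, the exponential policy above produces a growing wait between attempts. Here is a minimal sketch of the implied schedule, assuming the common semantics where the delay doubles on each retry and is capped at `delay_max`; check `retries.RetryPolicy` for the exact formula your version uses:

```python
# Illustration only: the wait before each retry, assuming doubling
# semantics capped at delay_max (verify against your boilermaker version).
delay, delay_max, max_tries = 30, 600, 5
schedule = [min(delay * 2**n, delay_max) for n in range(max_tries - 1)]
print(schedule)  # [30, 60, 120, 240] -- later attempts would cap at 600
```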
## Exception Handling in Tasks

Raise `RetryException` inside a task to trigger a retry explicitly:
```python
from boilermaker.retries import RetryException

@app.task(policy=retries.RetryPolicy.default())
async def resilient_task(state, data: dict):
    """Task with error handling."""
    # TemporaryError, PermanentError, and some_operation are placeholders
    # for your own exception types and task logic.
    try:
        result = await some_operation(data)
        return result
    except TemporaryError as e:
        # Retry for temporary errors
        raise RetryException(f"Temporary error: {e}")
    except PermanentError as e:
        # Don't retry; record the error and return a failure result
        state.errors.append(str(e))
        return {"status": "failed", "error": str(e)}
```
## Message Lock Renewal for Long Tasks

For long-running tasks that may exceed the Azure Service Bus message lock duration, renew the lock periodically:
```python
@app.task()
async def long_running_task(state, data_file: str):
    """Process a large dataset with periodic lock renewal."""
    items = await load_large_dataset(data_file)
    for i, item in enumerate(items):
        await process_item(item)
        # Renew the lock every 100 items to prevent timeout
        if i % 100 == 0:
            await state.app.renew_message_lock()
    return f"Processed {len(items)} items"
```
**More information:** Use message lock renewal for tasks that take longer than the message lock (lease) duration configured for your queue. Consult the Azure Service Bus documentation for details.
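Counting items works when per-item cost is predictable; for uneven workloads, renewing on elapsed time may be safer. A sketch using the same `renew_message_lock` call and the placeholder helpers from above, assuming a typical 60-second lock duration (adjust the threshold to your queue's configuration):

```python
import time

@app.task()
async def long_running_task_timed(state, data_file: str):
    """Variant: renew the lock based on elapsed time instead of item count."""
    items = await load_large_dataset(data_file)  # placeholder helper as above
    last_renewal = time.monotonic()
    for item in items:
        await process_item(item)
        # Renew well before the assumed 60s lock duration expires
        if time.monotonic() - last_renewal > 30:
            await state.app.renew_message_lock()
            last_renewal = time.monotonic()
    return f"Processed {len(items)} items"
```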
## Success/Failure Callbacks

Chain tasks to build error-handling workflows. Note that results are not passed between tasks automatically; share data through application state instead (see `state.jobs` below).
```python
from boilermaker.task import Task

@app.task()
async def process_data(state, job_id: str):
    """Main task that may fail."""
    try:
        # Processing logic
        result = await process_job(job_id)
        state.jobs[job_id] = {"status": "completed", "result": result}
        return result
    except Exception as e:
        state.jobs[job_id] = {"status": "failed", "error": str(e)}
        raise

@app.task()
async def handle_success(state, job_id: str):
    """Success callback: uses job_id to look up results in shared state."""
    job_info = state.jobs[job_id]
    # Send a success notification
    await notify_user(job_id, "completed")
    return {"notified": True}

@app.task()
async def handle_failure(state, job_id: str, error_msg: str):
    """Failure callback."""
    # Log the error and notify an admin
    await notify_admin(f"Job {job_id} failed: {error_msg}")
    return {"admin_notified": True}

# Set up the task chain with callbacks
job_id = "job_123"
main_task = Task.si(process_data, job_id)
success_task = Task.si(handle_success, job_id)
failure_task = Task.si(handle_failure, job_id, "Processing failed")

# Wire up the success and failure paths
main_task.success_callback = success_task
main_task.failure_callback = failure_task

await app.apply_async_task(main_task)
```
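If you build the same chain in several places, a small helper keeps the wiring in one spot. `build_job_chain` is a hypothetical convenience wrapper around the tasks above, not a boilermaker API:

```python
def build_job_chain(job_id: str, error_msg: str = "Processing failed") -> Task:
    """Hypothetical helper: wire the three tasks above into one chain."""
    main = Task.si(process_data, job_id)
    main.success_callback = Task.si(handle_success, job_id)
    main.failure_callback = Task.si(handle_failure, job_id, error_msg)
    return main

await app.apply_async_task(build_job_chain("job_124"))
```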
## Dead Letter Queues

Configure dead-letter behavior per task:
```python
@app.task(
    policy=retries.RetryPolicy(max_tries=3),
    should_dead_letter=True,  # Send failed messages to the dead letter queue
)
async def important_task(state, data: dict):
    """Task whose failures should be investigated."""
    pass

@app.task(
    should_dead_letter=False,  # Don't dead-letter; just drop the message
)
async def optional_task(state, data: dict):
    """Task whose failures can be ignored."""
    pass
```
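Dead-lettered messages sit in a sub-queue until you inspect them. Boilermaker itself is not required for that; here is a minimal sketch using the `azure-servicebus` SDK directly, assuming you have the queue name and a connection string at hand:

```python
from azure.servicebus import ServiceBusSubQueue
from azure.servicebus.aio import ServiceBusClient

async def drain_dead_letters(conn_str: str, queue_name: str) -> None:
    """Read and complete dead-lettered messages for inspection."""
    async with ServiceBusClient.from_connection_string(conn_str) as client:
        receiver = client.get_queue_receiver(
            queue_name=queue_name,
            sub_queue=ServiceBusSubQueue.DEAD_LETTER,
        )
        async with receiver:
            msgs = await receiver.receive_messages(
                max_message_count=10, max_wait_time=5
            )
            for msg in msgs:
                # Service Bus records why the message was dead-lettered
                print(msg.dead_letter_reason, msg.dead_letter_error_description)
                await receiver.complete_message(msg)
```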