# Retry Policies
Retry policies in Boilermaker provide powerful and flexible ways to handle task failures. You can configure how tasks retry, when they retry, and when to give up and run failure callbacks.
## Overview
Boilermaker's retry system supports:
- Three retry modes: Fixed, Linear, and Exponential backoff
- Configurable retry counts and delays
- Multiple ways to configure retries: at registration, scheduling, or runtime
- Exception-driven retries: tasks only retry when they raise `RetryException`
## Retry Policy Structure

```python
from boilermaker import retries

policy = retries.RetryPolicy(
    max_tries=5,                         # Maximum number of attempts
    delay=60,                            # Base delay in seconds
    delay_max=3600,                      # Maximum delay (cap)
    retry_mode=retries.RetryMode.Fixed,  # Retry mode
)
```
## Retry Modes

### Fixed Delay

Same delay between each retry:

```python
policy = retries.RetryPolicy(
    max_tries=3,
    delay=30,  # Always 30 seconds
    retry_mode=retries.RetryMode.Fixed,
)
# Delays: 30s, 30s, 30s
```
### Linear Backoff

Delay increases linearly with attempt number:

```python
policy = retries.RetryPolicy(
    max_tries=4,
    delay=30,  # Base delay
    retry_mode=retries.RetryMode.Linear,
)
# Delays: 30s, 60s, 90s, 120s
```
### Exponential Backoff

The delay doubles each time (with jitter to prevent a thundering herd):

```python
policy = retries.RetryPolicy(
    max_tries=5,
    delay=30,       # Starting delay
    delay_max=600,  # Cap at 10 minutes
    retry_mode=retries.RetryMode.Exponential,
)
# Delays: ~30s, ~60s, ~120s, ~240s, ~480s (with random jitter)
```
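Boilermaker applies the jitter internally; the exact formula is not part of the public API. As a rough sketch only (the function name and jitter formula here are assumptions, not Boilermaker's implementation), a capped, jittered exponential schedule looks like this:

```python
import random

def exponential_delay(attempt: int, delay: int = 30, delay_max: int = 600) -> float:
    """Illustrative only -- not Boilermaker's actual implementation.

    The nominal delay doubles with each attempt and is capped at
    delay_max; jitter then randomizes it so that many tasks failing
    together don't all retry at the same instant.
    """
    nominal = min(delay * (2 ** (attempt - 1)), delay_max)
    # Spread retries between half the nominal delay and the full delay
    return random.uniform(nominal / 2, nominal)

# Attempts 1..5 -> roughly 30s, 60s, 120s, 240s, 480s before jitter
```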
## How Retries Work

### 1. Tasks Only Retry on `RetryException`

**Important**: Tasks only retry when they raise a `RetryException`. Other exceptions are treated as permanent failures.
```python
from boilermaker.retries import RetryException

async def api_task(state, endpoint: str):
    try:
        response = await state.http_client.get(endpoint)
        return response.json()
    except ConnectionError:
        # Network issues are usually temporary
        raise RetryException("Connection failed, will retry")
    except ValidationError:
        # Validation errors are permanent
        raise  # Don't retry
```
### 2. Failure Callbacks Run After All Retries

Failure callbacks only execute after all retries have been exhausted:

```python
import random

async def unreliable_task(state):
    if random.random() < 0.7:  # 70% failure rate
        raise RetryException("Random failure")
    return "Success!"

async def cleanup_task(state):
    print("Task failed permanently, cleaning up...")

# Set up task with failure callback
task = worker.create_task(unreliable_task)
task.on_failure = worker.create_task(cleanup_task)
await worker.publish_task(task)
```
## Three Ways to Configure Retries

### 1. Default Policy at Registration

Set a default policy when registering the task:

```python
from boilermaker import retries

async def flaky_api_call(state, url: str):
    response = await state.http_client.get(url)
    if response.status_code >= 500:
        raise retries.RetryException("Server error, retrying...")
    return response.json()

# Register with default retry policy
worker.register_async(
    flaky_api_call,
    policy=retries.RetryPolicy(
        max_tries=3,
        delay=60,
        retry_mode=retries.RetryMode.Exponential,
    ),
)

# All invocations will use this policy by default
await worker.apply_async(flaky_api_call, "https://api.example.com/data")
```
### 2. Override Policy When Scheduling

Override the default policy for a specific task invocation:

```python
# Use the same task but with different retry behavior
critical_policy = retries.RetryPolicy(
    max_tries=10,  # More attempts for critical tasks
    delay=30,
    delay_max=300,
    retry_mode=retries.RetryMode.Linear,
)

# This specific invocation uses the critical policy
await worker.apply_async(
    flaky_api_call,
    "https://critical-api.example.com/data",
    policy=critical_policy,
)
```
### 3. Dynamic Policy from Exception

Change the retry policy from within the task:

```python
async def adaptive_task(state, complexity: str):
    try:
        if complexity == "simple":
            return await simple_operation()
        else:
            return await complex_operation()
    except SimpleError:
        # Use default policy for simple errors
        raise retries.RetryException("Simple error occurred")
    except ComplexError:
        # Use aggressive retry for complex operations
        raise retries.RetryExceptionDefaultExponential(
            "Complex operation failed",
            max_tries=8,
            delay=120,
            delay_max=1800,  # 30 minutes max
        )
```
## Common Retry Patterns

### API Integration

```python
async def external_api_task(state, endpoint: str, payload: dict):
    """Task for calling external APIs with appropriate retry logic."""
    try:
        response = await state.http_client.post(endpoint, json=payload)
        if response.status_code == 429:  # Rate limited
            raise retries.RetryExceptionDefaultLinear(
                "Rate limited",
                delay=60,  # Wait longer for rate limits
                max_tries=5,
            )
        elif response.status_code >= 500:  # Server error
            raise retries.RetryException("Server error")
        elif response.status_code >= 400:  # Client error
            raise ValueError(f"Client error: {response.status_code}")  # Don't retry
        return response.json()
    except ConnectionError:
        raise retries.RetryException("Connection failed")
    except TimeoutError:
        raise retries.RetryException("Request timed out")

# Register with exponential backoff for server errors
worker.register_async(
    external_api_task,
    policy=retries.RetryPolicy(
        max_tries=5,
        delay=30,
        delay_max=600,
        retry_mode=retries.RetryMode.Exponential,
    ),
)
```
### Database Operations

```python
async def database_operation(state, query: str, params: dict):
    """Database task with retry for transient failures."""
    try:
        async with state.db.transaction():
            result = await state.db.execute(query, params)
            return result
    except DeadlockError:
        # Deadlocks are usually resolved quickly
        raise retries.RetryExceptionDefaultExponential(
            "Database deadlock",
            delay=5,
            delay_max=60,
            max_tries=6,
        )
    except ConnectionError:
        # Connection issues might take longer to resolve
        raise retries.RetryException("Database connection failed")
    except IntegrityError:
        # Constraint violations are permanent
        raise  # Don't retry

worker.register_async(
    database_operation,
    policy=retries.RetryPolicy(
        max_tries=3,
        delay=10,
        retry_mode=retries.RetryMode.Linear,
    ),
)
```
### File Processing

```python
async def process_file(state, file_path: str):
    """Process a file with retry for I/O issues."""
    try:
        content = await state.storage.read_file(file_path)
        processed = await process_content(content)
        await state.storage.write_file(f"{file_path}.processed", processed)
        return len(processed)
    except FileNotFoundError:
        # File doesn't exist - permanent error
        raise ValueError(f"File not found: {file_path}")
    except PermissionError:
        # Permission issues might be temporary (especially in cloud storage)
        raise retries.RetryException("Permission denied")
    except IOError:
        # I/O errors are often temporary
        raise retries.RetryException("I/O error occurred")

# Use linear backoff for I/O operations
worker.register_async(
    process_file,
    policy=retries.RetryPolicy(
        max_tries=4,
        delay=15,
        retry_mode=retries.RetryMode.Linear,
    ),
)
```
## Pre-configured Policies

Boilermaker provides several pre-configured policies:

### Default Policy

```python
# Moderate retry with fixed delay
policy = retries.RetryPolicy.default()
# Equivalent to:
# RetryPolicy(max_tries=5, delay=120, delay_max=120, retry_mode=Fixed)
```
### No Retry

```python
# Never retry - useful for tasks that should only run once
policy = retries.NoRetry()
# Equivalent to:
# RetryPolicy(max_tries=1, delay=5, delay_max=5, retry_mode=Fixed)

worker.register_async(send_notification, policy=retries.NoRetry())
```
### Convenience Exception Classes

```python
# Default exponential backoff
raise retries.RetryExceptionDefaultExponential("Temporary failure")

# Default linear backoff
raise retries.RetryExceptionDefaultLinear("Temporary failure")

# Use default policy from task registration
raise retries.RetryExceptionDefault("Temporary failure")
```
## Best Practices

### 1. Choose Appropriate Retry Modes

- **Fixed**: Simple scenarios, predictable delays
- **Linear**: I/O operations, database transactions
- **Exponential**: API calls, network operations (prevents overwhelming external services)
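To compare these modes concretely before committing to a policy, it can help to tabulate the schedules they produce. The helper below is hypothetical (it is not part of Boilermaker's API); it simply applies the per-mode delay rules described earlier on this page, ignoring jitter:

```python
def schedule(mode: str, max_tries: int, delay: int, delay_max: int) -> list[int]:
    """Hypothetical helper: tabulate per-attempt delays for each mode."""
    delays = []
    for attempt in range(1, max_tries + 1):
        if mode == "fixed":
            d = delay
        elif mode == "linear":
            d = delay * attempt
        else:  # exponential (pre-jitter)
            d = delay * (2 ** (attempt - 1))
        delays.append(min(d, delay_max))
    return delays

for mode in ("fixed", "linear", "exponential"):
    delays = schedule(mode, max_tries=5, delay=30, delay_max=600)
    print(f"{mode:>12}: {delays} (total wait ~{sum(delays)}s)")
```

Totals like these make it easy to check that a worst-case retry sequence still finishes within whatever deadline the task has.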
### 2. Set Reasonable Limits

```python
# ✅ Good: Reasonable limits
policy = retries.RetryPolicy(
    max_tries=5,    # Don't retry forever
    delay=30,       # Not too frequent
    delay_max=600,  # Cap at 10 minutes
)

# ❌ Avoid: Excessive retries
policy = retries.RetryPolicy(
    max_tries=50,    # Too many attempts
    delay=1,         # Too frequent
    delay_max=7200,  # 2 hours is too long
)
```
### 3. Distinguish Error Types

```python
async def smart_retry_task(state, data):
    try:
        return await risky_operation(data)
    except ValidationError:
        # Permanent error - don't retry
        raise
    except RateLimitError:
        # Temporary but needs longer delays
        raise retries.RetryExceptionDefaultLinear("Rate limited", delay=120)
    except NetworkError:
        # Temporary - retried with the task's registered policy
        # (exponential backoff is appropriate here)
        raise retries.RetryException("Network error")
```
### 4. Monitor and Alert

```python
async def monitored_task(state, operation_id: str):
    try:
        result = await perform_operation(operation_id)
        # Track success metrics
        state.metrics.increment("task.success")
        return result
    except Exception as e:
        # Track failure metrics
        state.metrics.increment("task.failure")
        if isinstance(e, CriticalError):
            # Alert on critical errors
            await state.alerts.send_alert(f"Critical error in {operation_id}: {e}")
        # Decide whether to retry
        if isinstance(e, TransientError):
            raise retries.RetryException(f"Transient error: {e}")
        else:
            raise  # Permanent failure
```
### 5. Test Retry Behavior

```python
# Test your retry logic with controlled failures
async def test_retry_behavior():
    failure_count = 0

    async def failing_task(state):
        nonlocal failure_count
        failure_count += 1
        if failure_count < 3:
            raise retries.RetryException(f"Failure {failure_count}")
        return "Success after retries"

    worker.register_async(failing_task, policy=retries.RetryPolicy(max_tries=5))
    result = await worker.apply_async(failing_task)
    # Task should succeed on the 3rd attempt
```
## Next Steps
- Error Handling - Comprehensive error handling strategies
- Callbacks & Chains - Combine retries with workflows
- Quick Start - See retry patterns in action