Custom Clients
Patterns for creating custom Azure clients by subclassing the library's base classes.
Subclassing Patterns
Domain-Specific Cosmos Client
from aio_azure_clients_toolbox import ManagedCosmos
from azure.core import MatchConditions
from datetime import datetime
import uuid
class UserRepository(ManagedCosmos):
"""Domain-specific repository for user management."""
def __init__(self, config):
super().__init__(
endpoint=config.cosmos_endpoint,
dbname=config.cosmos_database,
container_name="users",
credential=config.get_credential()
)
async def create_user(self, user_data: dict) -> dict:
"""Create a new user with automatic ID and timestamp generation."""
user_document = {
"id": str(uuid.uuid4()),
"created_at": datetime.utcnow().isoformat(),
"updated_at": datetime.utcnow().isoformat(),
"version": 1,
**user_data
}
async with self.get_container_client() as container:
return await container.create_item(body=user_document)
async def get_user(self, user_id: str) -> dict | None:
"""Get user by ID, return None if not found."""
try:
async with self.get_container_client() as container:
return await container.read_item(
item=user_id,
partition_key=user_id
)
except exceptions.CosmosResourceNotFoundError:
return None
Event Publishing Client
from aio_azure_clients_toolbox.clients.eventgrid import EventGridClient
import json
from datetime import datetime
from typing import Any
class DomainEventPublisher:
"""Domain-specific event publishing with automatic enrichment."""
def __init__(self, eventgrid_client: EventGridClient, service_name: str):
self.client = eventgrid_client
self.service_name = service_name
async def publish_user_event(self, event_type: str, user_id: str,
data: dict, correlation_id: str = None):
"""Publish user domain events."""
await self._publish_domain_event(
domain="user",
event_type=event_type,
entity_id=user_id,
data=data,
correlation_id=correlation_id
)
Message Queue Client
from aio_azure_clients_toolbox import ManagedAzureServiceBusSender
import json
import asyncio
from datetime import datetime, timedelta
from typing import Callable, Any
class MessageQueueClient(ManagedAzureServiceBusSender):
"""Enhanced Service Bus client with retry and dead letter handling."""
def __init__(self, namespace_url: str, queue_name: str, credential,
max_retries: int = 3, **kwargs):
super().__init__(namespace_url, queue_name, credential, **kwargs)
self.max_retries = max_retries
self.message_handlers = {}
async def send_message_with_retry(self, message: dict,
delay_seconds: int = 0,
priority: str = "normal") -> bool:
"""Send message with automatic retry on failure."""
message_envelope = {
"id": str(uuid.uuid4()),
"timestamp": datetime.utcnow().isoformat(),
"priority": priority,
"retry_count": 0,
"payload": message
}
for attempt in range(self.max_retries + 1):
try:
await self.send_message(
json.dumps(message_envelope),
delay=delay_seconds
)
return True
except Exception as e:
if attempt == self.max_retries:
# Send to dead letter queue or log
await self._handle_failed_message(message_envelope, e)
return False
# Exponential backoff
wait_time = 2 ** attempt
await asyncio.sleep(wait_time)
message_envelope["retry_count"] = attempt + 1
return False