Skip to content

Clients API Reference

API documentation for Azure service clients.

Types

aio_azure_clients_toolbox.clients.types

Azure Blob Storage

aio_azure_clients_toolbox.clients.azure_blobs

AzureBlobStorageClient(az_storage_url, container_name, credentials)

Parameters:

Name Type Description Default
az_storage_url str

The URI to the storage account.

required
container_name str

The container name for the blob.

required
credentials DefaultAzureCredential

The credentials with which to authenticate.

required
Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
def __init__(
    self,
    az_storage_url: str,
    container_name: str,
    credentials: DefaultAzureCredential,
):
    if not az_storage_url.endswith("/"):
        az_storage_url = f"{az_storage_url}/"

    self.az_storage_url: str = az_storage_url

    if container_name.startswith("/"):
        container_name = container_name[1:]

    self.container_name = container_name
    self.credentials = credentials
delete_blob(blob_name, **kwargs) async

delete a blob from the container.

Parameters:

Name Type Description Default
blob_name str

The name of the blob.

required

Raises: AzureBlobError: If the blob cannot be deleted. Returns: None

Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
async def delete_blob(self, blob_name: str, **kwargs) -> None:
    """delete a blob from the container.

    Args:
        blob_name (str): The name of the blob.
    Raises:
        AzureBlobError: If the blob cannot be deleted.
    Returns: None
    """
    async with self.get_blob_client(blob_name) as client:
        return await client.delete_blob(**kwargs)
download_blob(blob_name, **kwargs) async

Download a blob from the container into bytes in memory.

Parameters:

Name Type Description Default
blob_name str

The name of the blob.

required

Raises: AzureBlobError: If the blob cannot be downloaded. Returns: bytes: ALL bytes of the blob.

Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
async def download_blob(self, blob_name: str, **kwargs) -> bytes:
    """Download a blob from the container into bytes in memory.

    Args:
        blob_name (str): The name of the blob.
    Raises:
        AzureBlobError: If the blob cannot be downloaded.
    Returns:
        bytes: *ALL* bytes of the blob.
    """
    async with self.get_blob_download_stream(blob_name, **kwargs) as stream:
        return await stream.readall()
download_blob_to_dir(workspace_dir, blob_name, **kwargs) async

Download Blob to a workspace_dir.

Parameters:

Name Type Description Default
workspace_dir str

The directory to save the blob.

required
blob_name str

The name of the blob.

required

Raises: AzureBlobError: If the blob cannot be downloaded. Returns: str: The path to the saved blob.

Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
async def download_blob_to_dir(self, workspace_dir: str, blob_name: str, **kwargs) -> str:
    """
    Download Blob to a workspace_dir.

    Args:
        workspace_dir (str): The directory to save the blob.
        blob_name (str): The name of the blob.
    Raises:
        AzureBlobError: If the blob cannot be downloaded.
    Returns:
        str: The path to the saved blob.
    """
    save_path = os.path.join(workspace_dir, os.path.basename(blob_name))

    # Write file into file path in tempdir
    async with aiofiles.open(save_path, "wb") as fl:
        async with self.get_blob_client(blob_name) as client:
            stream = await client.download_blob(**kwargs)
            # Read data in chunks to avoid loading all into memory at once
            async for chunk in stream.chunks():
                # `chunk` is a byte array
                await fl.write(chunk)
    return save_path
get_blob_client(blob_name) async

Simple async context manager to get a BlobClient.

Parameters:

Name Type Description Default
blob_name str

The name of the blob.

required

Raises: AttributeError: If az_storage_url is not configured. AzureBlobError: If the blob cannot be accessed. Returns: BlobClient: The blob client.

Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
@asynccontextmanager
async def get_blob_client(self, blob_name: str) -> typing.AsyncIterator[BlobClient]:
    """Simple async context manager to get a BlobClient.

    Args:
        blob_name (str): The name of the blob.
    Raises:
        AttributeError: If `az_storage_url` is not configured.
        AzureBlobError: If the blob cannot be accessed.
    Returns:
        BlobClient: The blob client.
    """
    if not self.az_storage_url:
        raise AttributeError("`az_storage_url` is improperly configured")

    async with self.get_blob_service_client() as blob_service_client:
        client = blob_service_client.get_blob_client(self.container_name, blob_name)
        try:
            yield client
        except HttpResponseError as exc:
            raise AzureBlobError(exc) from exc
get_blob_download_stream(blob_name, **kwargs) async

Async context manager that yields a stream for downloading a blob from the container.

The underlying BlobClient is kept open for the duration of the async with block, so all reads and property access must happen inside it.

This is useful, for example, if you want to stream a blob directly into another service without loading it all into memory at once, or to inspect StorageStreamDownloader.properties (e.g. etag, size, content_settings.content_type, etc.) before consuming the data.

Example::

async with client.get_blob_download_stream("my-blob") as stream:
    etag = stream.properties.etag
    async for chunk in stream.chunks():
        process(chunk)

Parameters:

Name Type Description Default
blob_name str

The name of the blob.

required

Raises: AzureBlobError: If the blob cannot be accessed. Yields: StorageStreamDownloader: A stream downloader for the blob.

Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
@asynccontextmanager
async def get_blob_download_stream(self, blob_name: str, **kwargs) -> typing.AsyncIterator[StorageStreamDownloader]:
    """Async context manager that yields a stream for downloading a blob from the container.

    The underlying BlobClient is kept open for the duration of the `async with` block,
    so all reads and property access must happen inside it.

    This is useful, for example, if you want to stream a blob directly into another service without
    loading it all into memory at once, or to inspect `StorageStreamDownloader.properties`
    (e.g. etag, size, content_settings.content_type, etc.) before consuming the data.

    Example::

        async with client.get_blob_download_stream("my-blob") as stream:
            etag = stream.properties.etag
            async for chunk in stream.chunks():
                process(chunk)

    Args:
        blob_name (str): The name of the blob.
    Raises:
        AzureBlobError: If the blob cannot be accessed.
    Yields:
        StorageStreamDownloader: A stream downloader for the blob.
    """
    async with self.get_blob_client(blob_name) as client:
        try:
            yield await client.download_blob(**kwargs)
        except HttpResponseError as exc:
            raise AzureBlobError(exc) from exc
get_blob_sas_token(blob_name, expiry=None) async

Returns a read-only sas token for the blob with an automatically generated user delegation key. For more than one, it's more efficient to call get_blob_sas_token_list (below).

Parameters:

Name Type Description Default
blob_name str

The name of the blob.

required
expiry Optional[datetime]

The expiry time of the token.

None

Returns: str: The sas token.

Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
async def get_blob_sas_token(self, blob_name: str, expiry: datetime.datetime | None = None) -> str:
    """
    Returns a read-only sas token for the blob with an automatically generated
    user delegation key. For more than one, it's more efficient to call
    `get_blob_sas_token_list` (below).

    Args:
        blob_name (str): The name of the blob.
        expiry (Optional[datetime.datetime]): The expiry time of the token.
    Returns:
        str: The sas token.
    """
    now = datetime.datetime.now(tz=datetime.UTC)
    if expiry is None:
        expiry = now + datetime.timedelta(hours=1)

    async with self.get_blob_service_client() as blob_service_client:
        user_delegation_key = await blob_service_client.get_user_delegation_key(now, expiry)
        return generate_blob_sas(
            blob_service_client.account_name,
            self.container_name,
            blob_name,
            user_delegation_key=user_delegation_key,
            permission=BlobSasPermissions(read=True),
            expiry=expiry,
        )
get_blob_sas_token_list(blob_names, expiry=None) async

Returns a dict of blob-name -> read-only sas tokens using an automatically generated user delegation key.

This function has the benefit of reusing a single BlobServiceClient for all tokens generated, so it will be a lot quicker than creating a new BlobServiceClient for each name.

Parameters:

Name Type Description Default
blob_names List[str]

A list of blob names.

required
expiry Optional[datetime]

The expiry time of the token.

None

Returns: dict: A dict of blob-name -> sas token.

Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
async def get_blob_sas_token_list(
    self,
    blob_names: list[str],
    expiry: datetime.datetime | None = None,
) -> dict[str, str]:
    """
    Returns a dict of blob-name -> read-only sas tokens using an automatically
    generated user delegation key.

    This function has the benefit of reusing a single BlobServiceClient
    for all tokens generated, so it will be a lot quicker than creating a
    new BlobServiceClient for *each* name.

    Args:
        blob_names (List[str]): A list of blob names.
        expiry (Optional[datetime.datetime]): The expiry time of the token.
    Returns:
        dict: A dict of blob-name -> sas token.
    """
    now = datetime.datetime.now(tz=datetime.UTC)
    if expiry is None:
        expiry = now + datetime.timedelta(hours=1)

    async with self.get_blob_service_client() as blob_service_client:
        user_delegation_key = await blob_service_client.get_user_delegation_key(now, expiry)

        tokens = {}
        for blob_name in blob_names:
            token = generate_blob_sas(
                blob_service_client.account_name,
                self.container_name,
                blob_name,
                user_delegation_key=user_delegation_key,
                permission=BlobSasPermissions(read=True),
                expiry=expiry,
            )
            tokens[blob_name] = token
        return tokens
get_blob_sas_url(blob_name, expiry=None) async

Returns a full download URL with sas token

Parameters:

Name Type Description Default
blob_name str

The name of the blob.

required
expiry Optional[datetime]

The expiry time of the token.

None

Returns: str: The full download URL with sas token.

Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
async def get_blob_sas_url(self, blob_name: str, expiry: datetime.datetime | None = None) -> str:
    """Returns a full download URL with sas token

    Args:
        blob_name (str): The name of the blob.
        expiry (Optional[datetime.datetime]): The expiry time of the token.
    Returns:
        str: The full download URL with sas token.
    """
    sas_token = await self.get_blob_sas_token(blob_name, expiry=expiry)
    return self._make_blob_url(blob_name, sas_token)
get_blob_sas_url_list(blob_names, expiry=None) async

Returns a dict of blob-name -> download URL with sas token

Parameters:

Name Type Description Default
blob_names List[str]

A list of blob names.

required
expiry Optional[datetime]

The expiry time of the token.

None

Returns: dict: A dict of blob-name -> download-URL-with-sas-token.

Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
async def get_blob_sas_url_list(
    self,
    blob_names: list[str],
    expiry: datetime.datetime | None = None,
) -> dict[str, str]:
    """
    Returns a dict of blob-name -> download URL with sas token

    Args:
        blob_names (List[str]): A list of blob names.
        expiry (Optional[datetime.datetime]): The expiry time of the token.
    Returns:
        dict: A dict of blob-name -> download-URL-with-sas-token.
    """
    tokens = await self.get_blob_sas_token_list(blob_names, expiry=expiry)
    results = {blob_name: self._make_blob_url(blob_name, token) for blob_name, token in tokens.items()}
    return results
get_blob_service_client()

Simple method to construct BlobServiceClient.

Note: calling async with blob_service_client()... opens a pipeline which will exit afterward. Thus, you need to either open-close a single one of these manually or throw it away after every async context manager session.

Returns: BlobServiceClient: The blob service client.

Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
def get_blob_service_client(self) -> BlobServiceClient:
    """
    Simple method to construct BlobServiceClient.

    Note: calling `async with blob_service_client()...` *opens*
    a pipeline which will exit afterward. Thus, you need to either
    open-close a single one of these manually or throw it away
    after every async context manager session.

    Args:
        None
    Returns:
        BlobServiceClient: The blob service client.
    """
    return BlobServiceClient(
        self.az_storage_url,
        credential=self.credentials,
    )
list_blobs(prefix=None, **kwargs) async

List blobs in the container: convenience wrapper around ContainerClient.list_blobs. Args: prefix (Optional[str]): The prefix to filter blobs. Returns: AsyncGenerator[BlobProperties]: A generator of blob properties.

Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
async def list_blobs(
    self, prefix: str | None = None, **kwargs
) -> AsyncGenerator[BlobProperties]:
    """List blobs in the container: convenience wrapper around ContainerClient.list_blobs.
    Args:
        prefix (Optional[str]): The prefix to filter blobs.
    Returns:
        AsyncGenerator[BlobProperties]: A generator of blob properties.
    """
    async with self.get_blob_service_client() as blob_service_client:
        container_client = blob_service_client.get_container_client(
            self.container_name
        )
        async for blob in container_client.list_blobs(
            name_starts_with=prefix, **kwargs
        ):
            yield blob
safe_blob_name(blob_name, quoting=False) staticmethod

Run a filter on blob names to make them 'safer'.

The most reliable blob names are urlencoded, but it's not strictly required outside of in sas-token-urls.

Parameters:

Name Type Description Default
blob_name str

The name of the blob.

required
quoting bool

Whether to urlsafe encode the name.

False

Returns: str: The 'safer' blob name.

Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
@staticmethod
def safe_blob_name(blob_name: str, quoting=False) -> str:
    """
    Run a filter on blob names to make them 'safer'.

    The most reliable blob names are urlencoded, but it's not strictly required
    outside of in sas-token-urls.

    Args:
        blob_name (str): The name of the blob.
        quoting (bool): Whether to urlsafe encode the name.
    Returns:
        str: The 'safer' blob name.
    """
    return blobify_filename(blob_name, quoting=quoting)
upload_blob(blob_name, file_data, **kwargs) async

Upload a blob to the container.

Parameters:

Name Type Description Default
blob_name str

The name of the blob.

required
file_data Union[bytes, str, Iterable, AsyncIterable, IO]

The data to upload.

required
**kwargs

Additional keyword arguments (passed to BlobClient.upload_blob method).

{}

Raises:

Type Description
AzureBlobError

If the blob cannot be uploaded.

Returns: tuple[bool, dict]: A tuple of a boolean indicating success and the result.

Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
async def upload_blob(
    self,
    blob_name: str,
    file_data: bytes
    | str
    | typing.Iterable[typing.AnyStr]
    | typing.AsyncIterable[typing.AnyStr]
    | typing.IO[typing.AnyStr],
    **kwargs,
) -> tuple[bool, dict]:
    """Upload a blob to the container.

    Args:
        blob_name (str): The name of the blob.
        file_data (Union[bytes, str, Iterable, AsyncIterable, IO]): The data to upload.
        **kwargs: Additional keyword arguments (passed to `BlobClient.upload_blob method`).

    Raises:
        AzureBlobError: If the blob cannot be uploaded.
    Returns:
        tuple[bool, dict]: A tuple of a boolean indicating success and the result.
    """
    async with self.get_blob_client(blob_name) as client:
        result = await client.upload_blob(
            file_data,
            blob_type="BlockBlob",
            **kwargs,
        )

    if result.get("error_code") is not None:
        return False, result

    return True, result
upload_blob_from_url(blob_name, file_url, overwrite=True) async

Upload a blob from another URL (can be blob-url with a sas-token)

Note: upload_blob_from_url means it will overwrite destination if it exists!

result usually looks like this: { "etag": ""0x8DBBAF4B8A6017C"", "last_modified": "2023-09-21T22:47:23+00:00", "content_md5": null, "client_request_id": "d3e9c022-58d0-11ee-9777-422808c7c565", "request_id": "b855e9cc-701e-0035-7ddd-ec4cc0000000", "version": "2023-08-03", "version_id": "2023-09-21T22:47:23.5730812Z", "date": "2023-09-21T22:47:23+00:00", "request_server_encrypted": true, "encryption_key_sha256": null, "encryption_scope": null }

Parameters:

Name Type Description Default
blob_name str

The name of the blob.

required
file_url str

The URL of the file to upload.

required
overwrite bool

Whether to overwrite the destination if it exists.

True

Raises: AzureBlobError: If the blob cannot be uploaded. Returns: dict: The result of the upload request.

Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
async def upload_blob_from_url(
    self,
    blob_name: str,
    file_url: str,
    overwrite=True,
):
    """
    Upload a blob from another URL (can be blob-url with a sas-token)

    # Note: upload_blob_from_url means it will *overwrite* destination if it exists!

    `result` usually looks like this:
        {
            "etag": "\"0x8DBBAF4B8A6017C\"",
            "last_modified": "2023-09-21T22:47:23+00:00",
            "content_md5": null,
            "client_request_id": "d3e9c022-58d0-11ee-9777-422808c7c565",
            "request_id": "b855e9cc-701e-0035-7ddd-ec4cc0000000",
            "version": "2023-08-03",
            "version_id": "2023-09-21T22:47:23.5730812Z",
            "date": "2023-09-21T22:47:23+00:00",
            "request_server_encrypted": true,
            "encryption_key_sha256": null,
            "encryption_scope": null
        }

    Args:
        blob_name (str): The name of the blob.
        file_url (str): The URL of the file to upload.
        overwrite (bool): Whether to overwrite the destination if it exists.
    Raises:
        AzureBlobError: If the blob cannot be uploaded.
    Returns:
        dict: The result of the upload request.
    """
    async with self.get_blob_client(blob_name) as client:
        return await client.upload_blob_from_url(file_url, overwrite=overwrite)

blobify_filename(name, quoting=False)

see: https://learn.microsoft.com/en-us/azure/storage/blobs/storage-blobs-introduction#blobs

A blob name can contain any combination of characters.

A blob name must be at least one character long and cannot be more than 1,024 characters long, for blobs in Azure Storage.

Blob names are case-sensitive.

Reserved URL characters must be properly escaped.*

If your account does not have a hierarchical namespace, then the number of path segments comprising the blob name cannot exceed 254. A path segment is the string between consecutive delimiter characters (e.g., the forward slash '/') that corresponds to the name of a virtual directory.

Avoid blob names that end with a dot, a forward slash, a backslash, or a sequence or combination of the these. No path segments should end with a dot.

  • urlsafe encode any blob URLs! Not the names!
Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
def blobify_filename(name: str, quoting=False) -> str:
    """
    see: https://learn.microsoft.com/en-us/azure/storage/blobs/storage-blobs-introduction#blobs

    A blob name can contain any combination of characters.

    A blob name must be at least one character long and cannot
    be more than 1,024 characters long, for blobs in Azure Storage.

    Blob names are case-sensitive.

    Reserved URL characters must be properly escaped.*

    If your account does not have a hierarchical namespace,
    then the number of path segments comprising the blob name cannot exceed 254.
    A path segment is the string between consecutive delimiter characters (e.g., the forward slash '/')
    that corresponds to the name of a virtual directory.

    Avoid blob names that end with a dot, a forward slash, a backslash, or a sequence
    or combination of the these. No path segments should end with a dot.

    * urlsafe encode any blob URLs! Not the names!
    """
    name = name.strip()
    name = DISALLOWED_CHARS_PAT.sub("", name)
    name = chop_trailing_dot(chop_starting_dot(name))
    if quoting:
        return (urllib.parse.quote(name))[:BLOB_NAME_CHAR_LIMIT]
    return name[:BLOB_NAME_CHAR_LIMIT]

Cosmos DB

aio_azure_clients_toolbox.clients.cosmos

Cosmos(endpoint, dbname, container_name, credential_factory, cosmos_client_ttl_seconds=CLIENT_TTL_SECONDS_DEFAULT)

Applications can subclass this class to interact with their container

Source code in aio_azure_clients_toolbox/clients/cosmos.py
def __init__(
    self,
    endpoint: str,
    dbname: str,
    container_name: str,
    credential_factory: CredentialFactory,
    cosmos_client_ttl_seconds: int = CLIENT_TTL_SECONDS_DEFAULT,
):
    self.container_name = container_name
    self.connection_manager = ConnectionManager(
        endpoint,
        dbname,
        container_name,
        credential_factory,
        lifespan_enabled=False,
        cosmos_client_ttl_seconds=cosmos_client_ttl_seconds,
    )
get_container_client() async

This async context manager will yield a container client.

Because making connections is expensive, we'd like to preserve them for a while.

Source code in aio_azure_clients_toolbox/clients/cosmos.py
@asynccontextmanager
async def get_container_client(self):
    """
    This async context manager will yield a container client.

    Because making connections is expensive, we'd like to preserve them
    for a while.
    """
    # If already closed throws
    # AttributeError: 'NoneType' object has no attribute '__aenter__'
    async with self.connection_manager as client:
        yield client

ManagedCosmos(endpoint, dbname, container_name, credential_factory, client_limit=connection_pooling.DEFAULT_SHARED_TRANSPORT_CLIENT_LIMIT, max_size=connection_pooling.DEFAULT_MAX_SIZE, max_idle_seconds=CLIENT_IDLE_SECONDS_DEFAULT, max_lifespan_seconds=CLIENT_TTL_SECONDS_DEFAULT, pool_connection_create_timeout=10, pool_get_timeout=60, max_concurrent_creates=None)

Bases: AbstractorConnector

"Managed" version of the above: uses a connection pool to keep connections alive.

Applications can subclass this class to interact with their container

Parameters:

Name Type Description Default
endpoint str

A string URL of the Cosmos server.

required
dbname str

Cosmos database name.

required
container_name str

Cosmos container name.

required
credential_factory CredentialFactory

A callable that returns an async DefaultAzureCredential which may be used to authenticate to the container.

required
client_limit int

Client limit per connection (default: 100).

DEFAULT_SHARED_TRANSPORT_CLIENT_LIMIT
max_size int

Connection pool size (default: 10).

DEFAULT_MAX_SIZE
max_idle_seconds int

Maximum duration allowed for an idle connection before recylcing it.

CLIENT_IDLE_SECONDS_DEFAULT
max_lifespan_seconds int

Optional setting which controls how long a connection live before recycling.

CLIENT_TTL_SECONDS_DEFAULT
pool_connection_create_timeout int

Timeout for creating a connection in the pool (default: 10 seconds).

10
pool_get_timeout int

Timeout for getting a connection from the pool (default: 60 seconds).

60
max_concurrent_creates int | None

Max number of connections that can be created simultaneously across all pool slots. Defaults to max(max_size // 3, 1).

None
Source code in aio_azure_clients_toolbox/clients/cosmos.py
def __init__(
    self,
    endpoint: str,
    dbname: str,
    container_name: str,
    credential_factory: CredentialFactory,
    client_limit: int = connection_pooling.DEFAULT_SHARED_TRANSPORT_CLIENT_LIMIT,
    max_size: int = connection_pooling.DEFAULT_MAX_SIZE,
    max_idle_seconds: int = CLIENT_IDLE_SECONDS_DEFAULT,
    max_lifespan_seconds: int = CLIENT_TTL_SECONDS_DEFAULT,
    pool_connection_create_timeout: int = 10,
    pool_get_timeout: int = 60,
    max_concurrent_creates: int | None = None,
):
    self.endpoint = endpoint
    self.dbname = dbname
    self.container_name = container_name
    if not callable(credential_factory):
        raise ValueError(
            "credential_factory must be a callable returning a credential"
        )
    self.credential_factory = credential_factory
    self.max_idle_seconds = max_idle_seconds
    self.pool = connection_pooling.ConnectionPool(
        self,
        client_limit=client_limit,
        max_size=max_size,
        max_idle_seconds=max_idle_seconds,
        max_lifespan_seconds=max_lifespan_seconds,
        max_concurrent_creates=max_concurrent_creates,
    )
    self.pool_kwargs = {
        "timeout": pool_get_timeout,
        "acquire_timeout": pool_connection_create_timeout,
    }
close() async

Closes all connections in our pool

Source code in aio_azure_clients_toolbox/clients/cosmos.py
async def close(self):
    """Closes all connections in our pool"""
    await self.pool.closeall()
create() async

Creates a new connection for our pool

Source code in aio_azure_clients_toolbox/clients/cosmos.py
async def create(self):
    """Creates a new connection for our pool"""
    client = SimpleCosmos(
        self.endpoint,
        self.dbname,
        self.container_name,
        self.credential_factory(),
    )
    await client.get_container_client()
    return client
get_container_client() async

This async context manager will yield a container client.

Because making connections is expensive, we'd like to preserve them for a while.

Source code in aio_azure_clients_toolbox/clients/cosmos.py
@asynccontextmanager
async def get_container_client(self):
    """
    This async context manager will yield a container client.

    Because making connections is expensive, we'd like to preserve them
    for a while.
    """
    async with self.pool.get(**self.pool_kwargs) as conn:
        try:
            yield conn
        except RuntimeError as e:
            logger.error(f"RuntimeError occurred; Closing connection: {e}")
            await self.pool.expire_conn(conn)
            raise

SimpleCosmos(endpoint, dbname, container_name, credential)

Applications can subclass this class to keep async connections open

Source code in aio_azure_clients_toolbox/clients/cosmos.py
def __init__(
    self,
    endpoint: str,
    dbname: str,
    container_name: str,
    credential: DefaultAzureCredential,
):
    self.endpoint = endpoint
    self.credential = credential
    self.db_name = dbname
    self.container_name = container_name
    # when these connecttions gets created they will be parked here
    self._container: ContainerProxy | None = None
    self._client: CosmosClient | None = None
    self._db: DatabaseProxy | None = None
get_container_client() async

This method will return a container client.

Source code in aio_azure_clients_toolbox/clients/cosmos.py
async def get_container_client(self) -> "SimpleCosmos":
    """
    This method will return a container client.
    """
    self._client = CosmosClient(self.endpoint, credential=self.credential)
    self._db = self._client.get_database_client(self.db_name)
    self._container = self._db.get_container_client(self.container_name)
    return self

ConnectionManager(endpoint, dbname, container_name, credential_factory, lifespan_enabled=False, cosmos_client_ttl_seconds=CLIENT_TTL_SECONDS_DEFAULT)

Source code in aio_azure_clients_toolbox/clients/cosmos.py
def __init__(
    self,
    endpoint: str,
    dbname: str,
    container_name: str,
    credential_factory: CredentialFactory,
    lifespan_enabled: bool = False,
    cosmos_client_ttl_seconds: int = CLIENT_TTL_SECONDS_DEFAULT,
):
    self.endpoint = endpoint
    if not callable(credential_factory):
        raise ValueError(
            "credential_factory must be a callable returning a credential"
        )

    self.db_name = dbname
    self.container_name = container_name
    self.credential_factory = credential_factory
    self.client_lifespan_seconds = cosmos_client_ttl_seconds
    self.lifespan_enabled = lifespan_enabled
    if self.lifespan_enabled and not self.client_lifespan_seconds:
        raise ValueError(f"Bad value for client lifespan {self.client_lifespan_seconds}")
    if self.lifespan_enabled and not isinstance(self.client_lifespan_seconds, int):
        raise ValueError(f"Bad value for client lifespan {self.client_lifespan_seconds}")
    if self.lifespan_enabled and self.client_lifespan_seconds < 0:
        raise ValueError(f"Client lifespan must be positive: {self.client_lifespan_seconds}")

    # These are clients that must be managed manually
    self._credential = None
    self._client = None
    self._database = None
    self._container = None
    # This is a call to time.monotonic() which can't go backwards
    # and represents *seconds*
    self._client_lifespan_started = None
is_container_closed property

Check if any attributes set to None

__aenter__() async

Here we manage our connection: - if still alive, we return - if needing to recyle, we recyle and create - if not created, we create

Source code in aio_azure_clients_toolbox/clients/cosmos.py
async def __aenter__(self):
    """
    Here we manage our connection:
    - if still alive, we return
    - if needing to recyle, we recyle and create
    - if not created, we create
    """
    try:
        return await self.get_container_client()
    except exceptions.CosmosHttpResponseError as exc:
        raise ValueError("Container client cannot be constructed") from exc
__aexit__(exc_type, exc, tb) async

Close connection if needing to recycle

Source code in aio_azure_clients_toolbox/clients/cosmos.py
async def __aexit__(self, exc_type, exc, tb):
    """Close connection if needing to recycle"""
    if self.should_recycle_container:
        await self.recycle_container()
get_container_client() async

This method will return a container client.

Because making connections is expensive, we'd like to preserve them for a while.

Source code in aio_azure_clients_toolbox/clients/cosmos.py
async def get_container_client(self):
    """
    This method will return a container client.

    Because making connections is expensive, we'd like to preserve them
    for a while.
    """
    if self.should_recycle_container:
        logger.info("Recycling Cosmos client")
        await self.recycle_container()

    if self.is_container_closed:
        logger.info("Creating new Cosmos client")
        self._credential = self.credential_factory()
        self._client = CosmosClient(self.endpoint, credential=self._credential)
        self._database = self._client.get_database_client(self.db_name)
        self._container = self._database.get_container_client(self.container_name)
        self._client_lifespan_started = time.monotonic()

    return self._container

PatchOp

Bases: StrEnum

Following is an example of patch operations

operations = [ {"op": "add", "path": "/favorite_color", "value": "red"}, {"op": "remove", "path": "/ttl"}, {"op": "replace", "path": "/tax_amount", "value": 14}, {"op": "set", "path": "/items/0/discount", "value": 20.0512}, {"op": "incr", "path": "/total_due", "value": 5}, {"op": "move", "from": "/freight", "path": "/service_addition"} ]

Note: set Set operation adds a property if it doesn't already exist (except if there was an Array ) while replace operation fails if the property doesn't exist.

as_op(path, value)

These variables make sense for all but Move

Source code in aio_azure_clients_toolbox/clients/cosmos.py
def as_op(self, path: str, value: str):
    """These variables make sense for all but Move"""
    if self is PatchOp.Move:
        return {"op": self.value, "from": path, "path": value}

    return {"op": self.value, "path": path, "value": value}

Operation(op, path, value) dataclass

For turning patch Operations into instructions Cosmos understands

Event Grid

aio_azure_clients_toolbox.clients.eventgrid

EventGridClient(config, credential=None, async_credential=None)

A generic eventgrid client

This generic eventgrid client provides a few nice features on top of the native azure python client. Primarily it provides a convenient way to configure publishing to multiple topics using a single client.

Example:

```
topic1 = EventGridTopicConfig("topic1", "https://azure.net/topic1")
topic2 = EventGridTopicConfig("topic2", "https://azure.net/topic2")

client_config = EventGridConfig([topic1, topic2])
managed_identity_credential = DefaultAzureCredential() client =
EventGridClient(config, credential=credential)
```

The client run asynchronously or synchronously. To run the client async provide the async_credential arg when creating the client and use the asyncmethods, e.g. client.async_emit_event().

```
from azure.identity.aio import DefaultAzureCredential

credential = DefaultAzureCredential()
topic = EventGridTopicConfig("topic", "https://azure.net/topic")
config = EventGridConfig(topic)
client = EventGridClient(config, async_credential=credential)
await client.async_emit_event("topic", "ident", {},"event-type", "subject")
```

To run the client synchronously, provide the credential arg when creating the client and call non-prefixed functions.

```
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
topic = EventGridTopicConfig("topic", "https://azure.net/topic")
config = EventGridConfig(topic)
client = EventGridClient(config,redential=credential)
client.emit_event"topic", "ident", {}, "event-type", "subject")
```

Internally sync/async versions of the azure eventgrid clients will be called accordingly.

Source code in aio_azure_clients_toolbox/clients/eventgrid.py
def __init__(
    self,
    config: EventGridConfig,
    credential: DefaultAzureCredential | None = None,
    async_credential: AsyncDefaultAzureCredential | None = None,
):
    if not credential and not async_credential:
        raise ValueError("Must provide credential or async_credential")

    if credential and async_credential:
        raise ValueError("Must provide only ONE of credential or async_credential")

    self.config = config
    self.credential: DefaultAzureCredential | None = None
    self.async_credential: AsyncDefaultAzureCredential | None = None

    if credential:
        self.credential = credential
        self._init_clients()
    else:
        assert async_credential, "async_credential must be provided if credential is not provided"
        self.async_credential = async_credential
        self._init_async_clients()
async_emit_event(topic, event_type, subject, data, data_version='v1', **kwargs) async

Emit an event grid asynchronously.

Exceptions:

Raises HttpResponseError exception if failed to emit
Source code in aio_azure_clients_toolbox/clients/eventgrid.py
async def async_emit_event(
    self, topic: str, event_type: str, subject: str, data: dict, data_version: str = "v1", **kwargs
) -> None:
    """Emit an event grid asynchronously.

    Exceptions:

        Raises HttpResponseError exception if failed to emit
    """
    event = EventGridEvent(
        data=data, subject=subject, event_type=event_type, data_version=data_version, **kwargs
    )

    client = self.get_async_client(topic)
    return await client.send(event)
emit_event(topic, event_type, subject, data, data_version='v1', **kwargs)

Emit an event grid synchronously.

Exceptions:

Raises HttpResponseError exception if failed to emit
Source code in aio_azure_clients_toolbox/clients/eventgrid.py
def emit_event(
    self, topic: str, event_type: str, subject: str, data: dict, data_version: str = "v1", **kwargs
) -> None:
    """Emit an event grid synchronously.

    Exceptions:

        Raises HttpResponseError exception if failed to emit
    """
    event = EventGridEvent(
        data=data, subject=subject, event_type=event_type, data_version=data_version, **kwargs
    )

    client = self.get_client(topic)
    return client.send(event)
get_async_client(topic)

Get the async azure publisher client for the name topic.

Source code in aio_azure_clients_toolbox/clients/eventgrid.py
def get_async_client(self, topic: str) -> AsyncEventGridPublisherClient:
    """Get the async azure publisher client for the name topic."""
    return self.async_clients[topic]
get_client(topic)

Get the azure publisher client for the named topic.

Source code in aio_azure_clients_toolbox/clients/eventgrid.py
def get_client(self, topic: str) -> EventGridPublisherClient:
    """Get the azure publisher client for the named topic."""
    return self.clients[topic]
is_sync()

Check if this client is configured to be syncrhonous.

Source code in aio_azure_clients_toolbox/clients/eventgrid.py
def is_sync(self):
    """Check if this client is configured to be syncrhonous."""
    return self.async_credential is None

EventGridConfig(topic_configs)

Configuration for all topics available to a single event grid client

Source code in aio_azure_clients_toolbox/clients/eventgrid.py
def __init__(self, topic_configs: EventGridTopicConfig | list[EventGridTopicConfig]):
    self.topic_configs = {}

    if isinstance(topic_configs, EventGridTopicConfig):
        topic_configs = [topic_configs]

    for topic_config in topic_configs:
        self.topic_configs[topic_config.name] = topic_config
config(topic)

Get the config for a topic by name.

Source code in aio_azure_clients_toolbox/clients/eventgrid.py
def config(self, topic: str) -> EventGridTopicConfig:
    """Get the config for a topic by name."""
    return self.topic_configs[topic]
topics()

Get list of topic names in this config.

Source code in aio_azure_clients_toolbox/clients/eventgrid.py
def topics(self) -> list[str]:
    """Get list of topic names in this config."""
    return list(self.topic_configs.keys())
url(topic)

Get the URL for a topic.

Source code in aio_azure_clients_toolbox/clients/eventgrid.py
def url(self, topic: str) -> str:
    """Get the URL for a topic."""
    return self.config(topic).url

EventGridTopicConfig(name, url) dataclass

Configuration for one event grid topic subscription

Event Hub

aio_azure_clients_toolbox.clients.eventhub

Eventhub(eventhub_namespace, eventhub_name, credential=None, eventhub_transport_type=TRANSPORT_PURE_AMQP, connection_string=None)

Low-level EventHub producer client without connection pooling.

For connection pooling see ManagedAzureEventhubProducer below.

Parameters:

Name Type Description Default
eventhub_namespace str

Fully-qualified EventHub namespace hostname (e.g. "myns.servicebus.windows.net").

required
eventhub_name str

EventHub name (the "topic").

required
credential DefaultAzureCredential | None

An async DefaultAzureCredential instance. Mutually exclusive with connection_string; exactly one must be supplied.

None
eventhub_transport_type str

Transport protocol. Pass "amqp" (default) for pure AMQP or any other value to use AMQP-over-WebSocket.

TRANSPORT_PURE_AMQP
connection_string str | None

An Azure Event Hubs connection string. Mutually exclusive with credential; exactly one must be supplied.

None
Source code in aio_azure_clients_toolbox/clients/eventhub.py
def __init__(
    self,
    eventhub_namespace: str,
    eventhub_name: str,
    credential: DefaultAzureCredential | None = None,
    eventhub_transport_type: str = TRANSPORT_PURE_AMQP,
    connection_string: str | None = None,
):
    if not any([isinstance(connection_string, str), credential is not None]):
        raise ValueError(
            "credential must be a DefaultAzureCredential or connection_string must be a string"
        )
    self.evhub_namespace = eventhub_namespace
    self.evhub_name = eventhub_name
    self.credential = credential
    self.connection_string = connection_string
    self.transport_type = (
        {}
        if eventhub_transport_type == TRANSPORT_PURE_AMQP
        else {"transport_type": TransportType.AmqpOverWebsocket}
    )
    self._client: EventHubProducerClient | None = self.get_client()
send_event(event, partition_key=None) async

Send a single EventHub event. See send_events_batch for sending multiple events

partition_key will make a particular string identifier "sticky" for a particular partition.

For instance, if you use a Salesforce record identifier as the partition_key then you can ensure that a particular consumer always receives those events.

Source code in aio_azure_clients_toolbox/clients/eventhub.py
async def send_event(
    self,
    event: bytes | str,
    partition_key: str | None = None,
) -> EventDataBatch:
    """
    Send a *single* EventHub event. See `send_events_batch` for
    sending multiple events

    `partition_key` will make a particular string identifier
    "sticky" for a particular partition.

    For instance, if you use a Salesforce record identifier as the `partition_key`
    then you can ensure that a _particular consumer_ always receives _those_ events.
    """
    # Create a batch.
    event_data_batch: EventDataBatch = await self.client.create_batch(partition_key=partition_key)

    # Add events to the batch.
    event_data_batch.add(EventData(event))

    # Send the batch of events to the event hub.
    await self.client.send_batch(event_data_batch)

    return event_data_batch
send_event_data(event, partition_key=None) async

Send a single EventHub event which is already encoded as EventData.

partition_key will make a particular string identifier "sticky" for a particular partition.

For instance, if you use a Salesforce record identifier as the partition_key then you can ensure that a particular consumer always receives those events.

Source code in aio_azure_clients_toolbox/clients/eventhub.py
async def send_event_data(
    self,
    event: EventData,
    partition_key: str | None = None,
) -> EventDataBatch:
    """
    Send a *single* EventHub event which is already encoded as `EventData`.

    `partition_key` will make a particular string identifier
    "sticky" for a particular partition.

    For instance, if you use a Salesforce record identifier as the `partition_key`
    then you can ensure that a _particular consumer_ always receives _those_ events.
    """
    # Create a batch.
    event_data_batch: EventDataBatch = await self.client.create_batch(partition_key=partition_key)

    # Add events to the batch.
    event_data_batch.add(event)

    # Send the batch of events to the event hub.
    await self.client.send_batch(event_data_batch)

    return event_data_batch
send_events_batch(events_list, partition_key=None) async

Sending events in a batch is more performant than sending individual events.

partition_key will make a particular string identifier "sticky" for a particular partition.

For instance, if you use a Salesforce record identifier as the partition_key then you can ensure that a particular consumer always receives those events.

Source code in aio_azure_clients_toolbox/clients/eventhub.py
async def send_events_batch(
    self,
    events_list: list[bytes | str],
    partition_key: str | None = None,
) -> EventDataBatch:
    """
    Sending events in a batch is more performant than sending individual events.

    `partition_key` will make a particular string identifier
    "sticky" for a particular partition.

    For instance, if you use a Salesforce record identifier as the `partition_key`
    then you can ensure that a _particular consumer_ always receives _those_ events.
    """
    # Create a batch.
    event_data_batch: EventDataBatch = await self.client.create_batch(partition_key=partition_key)

    # Add events to the batch.
    for event in events_list:
        event_data_batch.add(EventData(event))

    # Send the batch of events to the event hub.
    await self.client.send_batch(event_data_batch)
    return event_data_batch
send_events_data_batch(event_data_batch) async

Sending events in a batch is more performant than sending individual events.

Source code in aio_azure_clients_toolbox/clients/eventhub.py
async def send_events_data_batch(
    self,
    event_data_batch: EventDataBatch,
) -> EventDataBatch:
    """
    Sending events in a batch is more performant than sending individual events.
    """
    # Send the batch of events to the event hub.
    await self.client.send_batch(event_data_batch)
    return event_data_batch

ManagedAzureEventhubProducer(eventhub_namespace, eventhub_name, credential_factory=None, eventhub_transport_type=TRANSPORT_PURE_AMQP, client_limit=connection_pooling.DEFAULT_SHARED_TRANSPORT_CLIENT_LIMIT, max_size=connection_pooling.DEFAULT_MAX_SIZE, max_idle_seconds=EVENTHUB_SEND_TTL_SECONDS, ready_message='Connection established', max_lifespan_seconds=None, pool_connection_create_timeout=10, pool_get_timeout=60, connection_string=None, max_concurrent_creates=None)

Bases: AbstractorConnector

Azure Eventhub Producer client with connnection pooling built in.

Parameters:

Name Type Description Default
eventhub_namespace str

String representing the Eventhub namespace.

required
eventhub_name str

Eventhub name (the "topic").

required
credential_factory CredentialFactory | None

A callable that returns an async DefaultAzureCredential. Mutually exclusive with connection_string; exactly one must be supplied.

None
client_limit int

Client limit per connection (default: 100).

DEFAULT_SHARED_TRANSPORT_CLIENT_LIMIT
max_size int

Connection pool size (default: 10).

DEFAULT_MAX_SIZE
max_idle_seconds int

Maximum duration allowed for an idle connection before recycling it.

EVENTHUB_SEND_TTL_SECONDS
max_lifespan_seconds int | None

Optional setting which controls how long a connection lives before recycling.

None
pool_connection_create_timeout int

Timeout for creating a connection in the pool (default: 10 seconds).

10
pool_get_timeout int

Timeout for getting a connection from the pool (default: 60 seconds).

60
max_concurrent_creates int | None

Max number of connections that can be created simultaneously across all pool slots. Defaults to max(max_size // 3, 1).

None
ready_message str | bytes | EventData

A string, bytes, or EventData object representing the first "ready" message sent to establish connection.

'Connection established'
connection_string str | None

An Azure Event Hubs connection string. Mutually exclusive with credential_factory; exactly one must be supplied.

None
Source code in aio_azure_clients_toolbox/clients/eventhub.py
def __init__(
    self,
    eventhub_namespace: str,
    eventhub_name: str,
    credential_factory: CredentialFactory | None = None,
    eventhub_transport_type: str = TRANSPORT_PURE_AMQP,
    client_limit: int = connection_pooling.DEFAULT_SHARED_TRANSPORT_CLIENT_LIMIT,
    max_size: int = connection_pooling.DEFAULT_MAX_SIZE,
    max_idle_seconds: int = EVENTHUB_SEND_TTL_SECONDS,
    ready_message: str | bytes | EventData = "Connection established",
    max_lifespan_seconds: int | None = None,
    pool_connection_create_timeout: int = 10,
    pool_get_timeout: int = 60,
    connection_string: str | None = None,
    max_concurrent_creates: int | None = None,
):
    self.eventhub_namespace = eventhub_namespace
    self.eventhub_name = eventhub_name
    self.eventhub_transport_type = eventhub_transport_type
    self.connection_string = connection_string
    if not any([isinstance(connection_string, str), callable(credential_factory)]):
        raise ValueError(
            "credential_factory must be a callable returning a credential or connection_string must be a string"
        )
    self.credential_factory = credential_factory
    self.pool = connection_pooling.ConnectionPool(
        self,
        client_limit=client_limit,
        max_size=max_size,
        max_idle_seconds=max_idle_seconds,
        max_lifespan_seconds=max_lifespan_seconds,
        max_concurrent_creates=max_concurrent_creates,
    )
    if not isinstance(ready_message, (str, bytes, EventData)):
        raise ValueError("ready_message must be a string, bytes, or EventData")
    self.ready_message = ready_message

    self.pool_kwargs = {
        "timeout": pool_get_timeout,
        "acquire_timeout": pool_connection_create_timeout,
    }
close() async

Closes all connections in our pool

Source code in aio_azure_clients_toolbox/clients/eventhub.py
async def close(self):
    """Closes all connections in our pool"""
    await self.pool.closeall()
create() async

Creates a new connection for our pool

Source code in aio_azure_clients_toolbox/clients/eventhub.py
async def create(self) -> connection_pooling.AbstractConnection:
    """Creates a new connection for our pool"""
    return cast(connection_pooling.AbstractConnection, Eventhub(
        self.eventhub_namespace,
        self.eventhub_name,
        self.credential_factory() if self.credential_factory is not None else None,
        eventhub_transport_type=self.eventhub_transport_type,
        connection_string=self.connection_string,
    ))
ready(conn) async

Establishes readiness for a new connection

Source code in aio_azure_clients_toolbox/clients/eventhub.py
@connection_pooling.send_time_deco(logger, "Eventhub.ready")
async def ready(self, conn: Eventhub) -> bool:
    """Establishes readiness for a new connection"""
    # Create a batch.
    event_data_batch: EventDataBatch = await conn.create_batch()
    # Prepare ready message as an event
    if isinstance(self.ready_message, EventData):
        event_data_batch.add(self.ready_message)
    else:
        event_data_batch.add(EventData(self.ready_message))
    attempts = 2

    while attempts > 0:
        try:
            # Send the batch of events to the event hub.
            await conn.send_batch(event_data_batch)
            return True
        except AuthenticationError:
            logger.warning("Eventhub readiness check failed due to authentication error. Cancelling.")
            logger.error(f"{traceback.format_exc()}")
            return False
        except EventHubError:
            logger.warning(f"Eventhub readiness check #{3 - attempts} failed; trying again.")
            logger.error(f"{traceback.format_exc()}")
            attempts -= 1

    logger.error("Eventhub readiness check failed. Not ready.")
    return False
send_event(event, partition_key=None) async

Send a single EventHub event. See send_events_batch for sending multiple events

partition_key will make a particular string identifier "sticky" for a particular partition.

For instance, if you use a Salesforce record identifier as the partition_key then you can ensure that a particular consumer always receives those events.

Source code in aio_azure_clients_toolbox/clients/eventhub.py
@connection_pooling.send_time_deco(logger, "Eventhub.send_event")
async def send_event(
    self,
    event: bytes | str,
    partition_key: str | None = None,
) -> EventDataBatch:
    """
    Send a *single* EventHub event. See `send_events_batch` for
    sending multiple events

    `partition_key` will make a particular string identifier
    "sticky" for a particular partition.

    For instance, if you use a Salesforce record identifier as the `partition_key`
    then you can ensure that a _particular consumer_ always receives _those_ events.
    """
    async with self.pool.get(**self.pool_kwargs) as conn:
        evhub_conn = cast(Eventhub, conn)
        # Create a batch.
        event_data_batch: EventDataBatch = await evhub_conn.create_batch(partition_key=partition_key)

        # Add events to the batch.
        event_data_batch.add(EventData(event))

        logger.debug("Sending eventhub batch")

        try:
            # Send the batch of events to the event hub.
            await evhub_conn.send_batch(event_data_batch)
        except (AuthenticationError, ClientClosedError, ConnectionLostError, ConnectError):
            logger.error(f"Error sending event: {event!r}")
            logger.error(f"{traceback.format_exc()}")
            # Mark this connection closed so it won't be reused
            await self.pool.expire_conn(conn)
            raise
        return event_data_batch
send_event_data(event, partition_key=None) async

Send a single EventHub event which is already encoded as EventData.

partition_key will make a particular string identifier "sticky" for a particular partition.

For instance, if you use a Salesforce record identifier as the partition_key then you can ensure that a particular consumer always receives those events.

Source code in aio_azure_clients_toolbox/clients/eventhub.py
@connection_pooling.send_time_deco(logger, "Eventhub.send_event_data")
async def send_event_data(
    self,
    event: EventData,
    partition_key: str | None = None,
) -> EventDataBatch:
    """
    Send a *single* EventHub event which is already encoded as `EventData`.

    `partition_key` will make a particular string identifier
    "sticky" for a particular partition.

    For instance, if you use a Salesforce record identifier as the `partition_key`
    then you can ensure that a _particular consumer_ always receives _those_ events.
    """
    async with self.pool.get(**self.pool_kwargs) as conn:
        evhub_conn = cast(Eventhub, conn)
        # Create a batch.
        event_data_batch: EventDataBatch = await evhub_conn.create_batch(partition_key=partition_key)

        # Add events to the batch.
        event_data_batch.add(event)

        logger.debug("Sending eventhub batch")
        # Send the batch of events to the event hub.
        try:
            await evhub_conn.send_batch(event_data_batch)
            return event_data_batch
        except (AuthenticationError, ClientClosedError, ConnectionLostError, ConnectError):
            logger.error(f"Error sending event: {event}")
            logger.error(f"{traceback.format_exc()}")
            # Mark this connection closed so it won't be reused
            await self.pool.expire_conn(conn)
            raise
send_events_batch(events_list, partition_key=None) async

Sending events in a batch is more performant than sending individual events.

partition_key will make a particular string identifier "sticky" for a particular partition.

For instance, if you use a Salesforce record identifier as the partition_key then you can ensure that a particular consumer always receives those events.

Source code in aio_azure_clients_toolbox/clients/eventhub.py
@connection_pooling.send_time_deco(logger, "Eventhub.send_events_batch")
async def send_events_batch(
    self,
    events_list: list[bytes | str],
    partition_key: str | None = None,
) -> EventDataBatch:
    """
    Sending events in a batch is more performant than sending individual events.

    `partition_key` will make a particular string identifier
    "sticky" for a particular partition.

    For instance, if you use a Salesforce record identifier as the `partition_key`
    then you can ensure that a _particular consumer_ always receives _those_ events.
    """
    async with self.pool.get(**self.pool_kwargs) as conn:
        evhub_conn = cast(Eventhub, conn)
        # Create a batch.
        event_data_batch: EventDataBatch = await evhub_conn.create_batch(partition_key=partition_key)

        # Add events to the batch.
        for event in events_list:
            event_data_batch.add(EventData(event))

        logger.debug("Sending eventhub batch")
        # Send the batch of events to the event hub.
        try:
            await evhub_conn.send_batch(event_data_batch)
            return event_data_batch
        except (AuthenticationError, ClientClosedError, ConnectionLostError, ConnectError):
            logger.error(f"Error sending event: {traceback.format_exc()}")
            # Mark this connection closed so it won't be reused
            await self.pool.expire_conn(conn)
            raise
send_events_data_batch(event_data_batch) async

Sending events in a batch is more performant than sending individual events.

Source code in aio_azure_clients_toolbox/clients/eventhub.py
@connection_pooling.send_time_deco(logger, "Eventhub.send_events_data_batch")
async def send_events_data_batch(
    self,
    event_data_batch: EventDataBatch,
) -> EventDataBatch:
    """
    Sending events in a batch is more performant than sending individual events.
    """
    async with self.pool.get(**self.pool_kwargs) as conn:
        evhub_conn = cast(Eventhub, conn)
        logger.debug("Sending eventhub batch")
        # Send the batch of events to the event hub.
        try:
            await evhub_conn.send_batch(event_data_batch)
            return event_data_batch
        except (AuthenticationError, ClientClosedError, ConnectionLostError, ConnectError):
            logger.error(f"Error sending batch {traceback.format_exc()}")
            # Mark this connection closed so it won't be reused
            await self.pool.expire_conn(conn)
            raise

Service Bus

aio_azure_clients_toolbox.clients.service_bus

service_bus.py

Wrapper class around a ServiceBusClient which allows sending messages or subscribing to a queue.

AzureServiceBus(service_bus_namespace_url, service_bus_queue_name, credential_factory=None, socket_timeout=1, connection_string=None)

Basic AzureServiceBus client without connection pooling.

For connection pooling see ManagedAzureServiceBus below.

Parameters:

Name Type Description Default
service_bus_namespace_url str

String representing the ServiceBus namespace URL.

required
service_bus_queue_name str

Queue name.

required
credential_factory CredentialFactory | None

A callable that returns an async DefaultAzureCredential. Mutually exclusive with connection_string; exactly one must be supplied.

None
socket_timeout float

Socket timeout in seconds (default: 1). Azure's own default is 0.2 s.

1
connection_string str | None

An Azure Service Bus connection string. Mutually exclusive with credential_factory; exactly one must be supplied.

None
Source code in aio_azure_clients_toolbox/clients/service_bus.py
def __init__(
    self,
    service_bus_namespace_url: str,
    service_bus_queue_name: str,
    credential_factory: CredentialFactory | None=None,
    socket_timeout: float = 1,  # Value in seconds. Azure default value is 0.2s
    connection_string: str | None = None,
):
    self.namespace_url = service_bus_namespace_url
    self.queue_name = service_bus_queue_name
    self.connection_string: str | None = connection_string

    if not any([isinstance(connection_string, str), callable(credential_factory)]):
        raise ValueError(
            "credential_factory must be a callable returning a credential or connection_string must be a string"
        )
    self.credential_factory = credential_factory
    self._receiver_client: ServiceBusReceiver | None = None
    self._receiver_credential: DefaultAzureCredential | None = None
    self._sender_client: SendClientCloseWrapper | ServiceBusSender | None = None
    self._socket_timeout: float = socket_timeout
send_message(msg, delay=0, unique_msg_id=None, **msg_kwargs) async

Schedule a message for delivery.

Parameters:

Name Type Description Default
msg str

Message body to send.

required
delay int

Delay in seconds before the message is available for delivery.

0
unique_msg_id str | None

Optional unique Service Bus message_id used for deduplication.

None
**msg_kwargs

Additional keyword arguments forwarded directly to :class:azure.servicebus.ServiceBusMessage (e.g. content_type, correlation_id, subject, partition_key, session_id, reply_to, time_to_live).

{}
Source code in aio_azure_clients_toolbox/clients/service_bus.py
async def send_message(self, msg: str, delay: int = 0, unique_msg_id: str | None = None, **msg_kwargs):
    """Schedule a message for delivery.

    Args:
        msg:
            Message body to send.
        delay:
            Delay in seconds before the message is available for delivery.
        unique_msg_id:
            Optional unique Service Bus ``message_id`` used for deduplication.
        **msg_kwargs:
            Additional keyword arguments forwarded directly to
            :class:`azure.servicebus.ServiceBusMessage` (e.g.
            ``content_type``, ``correlation_id``, ``subject``,
            ``partition_key``, ``session_id``, ``reply_to``,
            ``time_to_live``).
    """
    message = ServiceBusMessage(msg, message_id=unique_msg_id, **msg_kwargs)
    now = datetime.datetime.now(tz=datetime.UTC)
    scheduled_time_utc = now + datetime.timedelta(seconds=delay)
    sender = self.get_sender()
    await sender.schedule_messages(message, scheduled_time_utc)

ManagedAzureServiceBusSender(service_bus_namespace_url, service_bus_queue_name, credential_factory=None, client_limit=connection_pooling.DEFAULT_SHARED_TRANSPORT_CLIENT_LIMIT, max_size=connection_pooling.DEFAULT_MAX_SIZE, max_idle_seconds=SERVICE_BUS_SEND_TTL_SECONDS, max_lifespan_seconds=None, ready_message='Connection established', pool_connection_create_timeout=10, pool_get_timeout=60, connection_string=None, max_concurrent_creates=None)

Bases: AbstractorConnector

Azure ServiceBus Sender client with connnection pooling built in.

Parameters:

Name Type Description Default
service_bus_namespace_url str

String representing the ServiceBus namespace URL.

required
service_bus_queue_name str

Queue name (the "topic").

required
credential_factory CredentialFactory | None

A callable that returns an async DefaultAzureCredential. Mutually exclusive with connection_string; exactly one must be supplied.

None
client_limit int

Client limit per connection (default: 100).

DEFAULT_SHARED_TRANSPORT_CLIENT_LIMIT
max_size int

Connection pool size (default: 10).

DEFAULT_MAX_SIZE
max_idle_seconds int

Maximum duration allowed for an idle connection before recylcing it.

SERVICE_BUS_SEND_TTL_SECONDS
max_lifespan_seconds int | None

Optional setting which controls how long a connection lives before recycling.

None
pool_connection_create_timeout int

Timeout for creating a connection in the pool (default: 10 seconds).

10
pool_get_timeout int

Timeout for getting a connection from the pool (default: 60 seconds).

60
max_concurrent_creates int | None

Max number of connections that can be created simultaneously across all pool slots. Defaults to max(max_size // 3, 1).

None
ready_message str | bytes

A string or bytes representing the first "ready" message sent to establish connection.

'Connection established'
connection_string str | None

An Azure Service Bus connection string. Mutually exclusive with credential_factory; exactly one must be supplied.

None
Source code in aio_azure_clients_toolbox/clients/service_bus.py
def __init__(
    self,
    service_bus_namespace_url: str,
    service_bus_queue_name: str,
    credential_factory: CredentialFactory | None = None,
    client_limit: int = connection_pooling.DEFAULT_SHARED_TRANSPORT_CLIENT_LIMIT,
    max_size: int = connection_pooling.DEFAULT_MAX_SIZE,
    max_idle_seconds: int = SERVICE_BUS_SEND_TTL_SECONDS,
    max_lifespan_seconds: int | None = None,
    ready_message: str | bytes = "Connection established",
    pool_connection_create_timeout: int = 10,
    pool_get_timeout: int = 60,
    connection_string: str | None = None,
    max_concurrent_creates: int | None = None,
):
    self.service_bus_namespace_url = service_bus_namespace_url
    self.service_bus_queue_name = service_bus_queue_name
    self.connection_string = connection_string
    if not any([isinstance(connection_string, str), callable(credential_factory)]):
        raise ValueError(
            "credential_factory must be a callable returning a credential or connection_string must be a string"
        )

    self.credential_factory = credential_factory
    self.pool = connection_pooling.ConnectionPool(
        self,
        client_limit=client_limit,
        max_size=max_size,
        max_idle_seconds=max_idle_seconds,
        max_lifespan_seconds=max_lifespan_seconds,
        max_concurrent_creates=max_concurrent_creates,
    )
    if not isinstance(ready_message, (str, bytes)):
        raise ValueError("ready_message must be a string or bytes")
    self.ready_message = ready_message

    self.pool_kwargs = {
        "timeout": pool_get_timeout,
        "acquire_timeout": pool_connection_create_timeout,
    }
close() async

Closes all connections in our pool

Source code in aio_azure_clients_toolbox/clients/service_bus.py
async def close(self):
    """Closes all connections in our pool"""
    if hasattr(self, '_receiver_client') and self._receiver_client is not None:
        await self._receiver_client.close()
        self._receiver_client = None
    await self.pool.closeall()
create() async

Creates a new connection for our pool

Source code in aio_azure_clients_toolbox/clients/service_bus.py
async def create(self) -> connection_pooling.AbstractConnection:
    """Creates a new connection for our pool"""
    return cast(connection_pooling.AbstractConnection, self.get_sender())
get_receiver()

Proxy for AzureServiceBus.get_receiver. Here for consistency with above class.

Source code in aio_azure_clients_toolbox/clients/service_bus.py
def get_receiver(self) -> ServiceBusReceiver:
    """
    Proxy for AzureServiceBus.get_receiver. Here
    for consistency with above class.
    """
    self._receiver_client = AzureServiceBus(
        self.service_bus_namespace_url,
        self.service_bus_queue_name,
        self.credential_factory,
        connection_string=self.connection_string,
    )
    return self._receiver_client.get_receiver()
ready(conn) async

Establishes readiness for a new connection

Source code in aio_azure_clients_toolbox/clients/service_bus.py
@connection_pooling.send_time_deco(logger, "ServiceBus.ready")
async def ready(self, conn: SendClientCloseWrapper) -> bool:
    """Establishes readiness for a new connection"""
    message = ServiceBusMessage(self.ready_message)
    now = datetime.datetime.now(tz=datetime.UTC)
    attempts = 2
    while attempts > 0:
        try:
            await conn.schedule_messages(message, now)
            return True
        except (ServiceBusAuthorizationError, ServiceBusAuthenticationError):
            # We do not believe these will improve with repeated tries
            logger.error(
                "ServiceBus Authorization or Authentication error. Not ready."
            )
            raise
        except (AttributeError, ServiceBusError, exceptions.AzureError):
            logger.warning(
                f"ServiceBus readiness check #{3 - attempts} failed; trying again."
            )
            logger.error(f"{traceback.format_exc()}")
            attempts -= 1

    logger.error("ServiceBus readiness check failed. Not ready.")
    return False
send_message(msg, delay=0, unique_msg_id=None, **msg_kwargs) async

Schedule a message for delivery using a pooled sender connection.

Parameters:

Name Type Description Default
msg str

Message body to send.

required
delay int

Delay in seconds before the message is available for delivery.

0
unique_msg_id str | None

Optional unique Service Bus message_id used for deduplication.

None
**msg_kwargs

Additional keyword arguments forwarded directly to :class:azure.servicebus.ServiceBusMessage (e.g. content_type, correlation_id, subject, partition_key, session_id, reply_to, time_to_live).

{}
Source code in aio_azure_clients_toolbox/clients/service_bus.py
@connection_pooling.send_time_deco(logger, "ServiceBus.send_message")
async def send_message(self, msg: str, delay: int = 0, unique_msg_id: str | None = None, **msg_kwargs):
    """Schedule a message for delivery using a pooled sender connection.

    Args:
        msg:
            Message body to send.
        delay:
            Delay in seconds before the message is available for delivery.
        unique_msg_id:
            Optional unique Service Bus ``message_id`` used for deduplication.
        **msg_kwargs:
            Additional keyword arguments forwarded directly to
            :class:`azure.servicebus.ServiceBusMessage` (e.g.
            ``content_type``, ``correlation_id``, ``subject``,
            ``partition_key``, ``session_id``, ``reply_to``,
            ``time_to_live``).
    """
    message = ServiceBusMessage(msg, message_id=unique_msg_id, **msg_kwargs)
    now = datetime.datetime.now(tz=datetime.UTC)
    scheduled_time_utc = now + datetime.timedelta(seconds=delay)
    async with self.pool.get(**self.pool_kwargs) as conn:
        try:
            await cast(SendClientCloseWrapper, conn).schedule_messages(
                message, scheduled_time_utc
            )
        except (
            ServiceBusCommunicationError,
            ServiceBusAuthorizationError,
            ServiceBusAuthenticationError,
            ServiceBusConnectionError,
        ):
            logger.exception(
                f"ServiceBus.send_message failed. Expiring connection: {traceback.format_exc()}"
            )
            await self.pool.expire_conn(conn)
            raise