Skip to content

Clients API Reference

API documentation for Azure service clients.

Types

aio_azure_clients_toolbox.clients.types

Azure Blob Storage

aio_azure_clients_toolbox.clients.azure_blobs

AzureBlobStorageClient(az_storage_url, container_name, credentials)

Parameters:

Name Type Description Default
az_storage_url str

The URI to the storage account.

required
container_name str

The container name for the blob.

required
credentials DefaultAzureCredential

The credentials with which to authenticate.

required
Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
def __init__(
    self,
    az_storage_url: str,
    container_name: str,
    credentials: DefaultAzureCredential,
):
    """Store normalized storage URL, container name, and credentials.

    The storage URL is guaranteed to end with "/" and the container
    name is guaranteed not to begin with "/", so the two join cleanly.
    """
    # Normalize: the account URL always carries a trailing slash.
    self.az_storage_url: str = (
        az_storage_url if az_storage_url.endswith("/") else f"{az_storage_url}/"
    )
    # Normalize: drop a single leading slash from the container name.
    self.container_name = container_name.removeprefix("/")
    self.credentials = credentials
delete_blob(blob_name) async

delete a blob from the container.

Parameters:

Name Type Description Default
blob_name str

The name of the blob.

required

Raises: AzureBlobError: If the blob cannot be deleted. Returns: None

Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
async def delete_blob(self, blob_name: str) -> None:
    """Remove the named blob from the container.

    Args:
        blob_name (str): The name of the blob.
    Raises:
        AzureBlobError: If the blob cannot be deleted.
    Returns: None
    """
    async with self.get_blob_client(blob_name) as blob_client:
        return await blob_client.delete_blob()
download_blob(blob_name) async

Download a blob from the container into bytes in memory.

Parameters:

Name Type Description Default
blob_name str

The name of the blob.

required

Raises: AzureBlobError: If the blob cannot be downloaded. Returns: bytes: ALL bytes of the blob.

Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
async def download_blob(self, blob_name: str) -> bytes:
    """Fetch the full contents of a blob into memory.

    Args:
        blob_name (str): The name of the blob.
    Raises:
        AzureBlobError: If the blob cannot be downloaded.
    Returns:
        bytes: *ALL* bytes of the blob (no streaming — beware huge blobs).
    """
    async with self.get_blob_client(blob_name) as blob_client:
        downloader = await blob_client.download_blob()
        return await downloader.readall()
download_blob_to_dir(workspace_dir, blob_name) async

Download Blob to a workspace_dir.

Parameters:

Name Type Description Default
workspace_dir str

The directory to save the blob.

required
blob_name str

The name of the blob.

required

Raises: AzureBlobError: If the blob cannot be downloaded. Returns: str: The path to the saved blob.

Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
async def download_blob_to_dir(self, workspace_dir: str, blob_name: str) -> str:
    """
    Stream a blob to disk under `workspace_dir` and return its path.

    Args:
        workspace_dir (str): The directory to save the blob.
        blob_name (str): The name of the blob.
    Raises:
        AzureBlobError: If the blob cannot be downloaded.
    Returns:
        str: The path to the saved blob.
    """
    # The saved file keeps only the final path segment of the blob name.
    destination = os.path.join(workspace_dir, os.path.basename(blob_name))

    # Write the blob into the destination path
    async with aiofiles.open(destination, "wb") as outfile:
        async with self.get_blob_client(blob_name) as blob_client:
            downloader = await blob_client.download_blob()
            # Stream chunk-by-chunk so the whole blob never sits in memory
            async for piece in downloader.chunks():
                # `piece` is a byte array
                await outfile.write(piece)
    return destination
get_blob_client(blob_name) async

Simple async context manager to get a BlobClient.

Parameters:

Name Type Description Default
blob_name str

The name of the blob.

required

Raises: AttributeError: If az_storage_url is not configured. AzureBlobError: If the blob cannot be accessed. Returns: BlobClient: The blob client.

Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
@asynccontextmanager
async def get_blob_client(self, blob_name: str) -> typing.AsyncIterator[BlobClient]:
    """Async context manager yielding a BlobClient for `blob_name`.

    Args:
        blob_name (str): The name of the blob.
    Raises:
        AttributeError: If `az_storage_url` is not configured.
        AzureBlobError: If the blob cannot be accessed.
    Yields:
        BlobClient: The blob client.
    """
    if not self.az_storage_url:
        raise AttributeError("`az_storage_url` is improperly configured")

    async with self.get_blob_service_client() as service_client:
        blob_client = service_client.get_blob_client(self.container_name, blob_name)
        try:
            yield blob_client  # type: ignore
        except HttpResponseError as exc:
            # Translate Azure HTTP failures into our package-level error.
            raise AzureBlobError(exc) from exc
get_blob_sas_token(blob_name, expiry=None) async

Returns a read-only sas token for the blob with an automatically generated user delegation key. For more than one, it's more efficient to call get_blob_sas_token_list (below).

Parameters:

Name Type Description Default
blob_name str

The name of the blob.

required
expiry Optional[datetime]

The expiry time of the token.

None

Returns: str: The sas token.

Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
async def get_blob_sas_token(self, blob_name: str, expiry: datetime.datetime | None = None) -> str:
    """
    Build a read-only sas token for a single blob, signed with a freshly
    generated user delegation key. When tokens for several blobs are
    needed, prefer `get_blob_sas_token_list` (below): it reuses one
    service client and is more efficient.

    Args:
        blob_name (str): The name of the blob.
        expiry (Optional[datetime.datetime]): The expiry time of the token.
    Returns:
        str: The sas token.
    """
    issued_at = datetime.datetime.now(tz=datetime.UTC)
    # Default lifetime: one hour from now.
    if expiry is None:
        expiry = issued_at + datetime.timedelta(hours=1)

    async with self.get_blob_service_client() as service_client:
        delegation_key = await service_client.get_user_delegation_key(issued_at, expiry)
        return generate_blob_sas(
            service_client.account_name,
            self.container_name,
            blob_name,
            user_delegation_key=delegation_key,
            permission=BlobSasPermissions(read=True),
            expiry=expiry,
        )
get_blob_sas_token_list(blob_names, expiry=None) async

Returns a dict of blob-name -> read-only sas tokens using an automatically generated user delegation key.

This function has the benefit of reusing a single BlobServiceClient for all tokens generated, so it will be a lot quicker than creating a new BlobServiceClient for each name.

Parameters:

Name Type Description Default
blob_names List[str]

A list of blob names.

required
expiry Optional[datetime]

The expiry time of the token.

None

Returns: dict: A dict of blob-name -> sas token.

Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
async def get_blob_sas_token_list(
    self,
    blob_names: list[str],
    expiry: datetime.datetime | None = None,
) -> dict[str, str]:
    """
    Map each blob name to a read-only sas token, all signed with one
    automatically generated user delegation key.

    A single BlobServiceClient is reused for every token, which is much
    faster than constructing a new BlobServiceClient for *each* name.

    Args:
        blob_names (List[str]): A list of blob names.
        expiry (Optional[datetime.datetime]): The expiry time of the token.
    Returns:
        dict: A dict of blob-name -> sas token.
    """
    issued_at = datetime.datetime.now(tz=datetime.UTC)
    if expiry is None:
        expiry = issued_at + datetime.timedelta(hours=1)

    async with self.get_blob_service_client() as service_client:
        delegation_key = await service_client.get_user_delegation_key(issued_at, expiry)
        # One shared delegation key signs every token below.
        return {
            name: generate_blob_sas(
                service_client.account_name,
                self.container_name,
                name,
                user_delegation_key=delegation_key,
                permission=BlobSasPermissions(read=True),
                expiry=expiry,
            )
            for name in blob_names
        }
get_blob_sas_url(blob_name, expiry=None) async

Returns a full download URL with sas token

Parameters:

Name Type Description Default
blob_name str

The name of the blob.

required
expiry Optional[datetime]

The expiry time of the token.

None

Returns: str: The full download URL with sas token.

Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
async def get_blob_sas_url(self, blob_name: str, expiry: datetime.datetime | None = None) -> str:
    """Build a complete download URL (blob URL plus sas token).

    Args:
        blob_name (str): The name of the blob.
        expiry (Optional[datetime.datetime]): The expiry time of the token.
    Returns:
        str: The full download URL with sas token.
    """
    token = await self.get_blob_sas_token(blob_name, expiry=expiry)
    return self._make_blob_url(blob_name, token)
get_blob_sas_url_list(blob_names, expiry=None) async

Returns a dict of blob-name -> download URL with sas token

Parameters:

Name Type Description Default
blob_names List[str]

A list of blob names.

required
expiry Optional[datetime]

The expiry time of the token.

None

Returns: dict: A dict of blob-name -> download-URL-with-sas-token.

Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
async def get_blob_sas_url_list(
    self,
    blob_names: list[str],
    expiry: datetime.datetime | None = None,
) -> dict[str, str]:
    """
    Map each blob name to a complete download URL with a sas token.

    Args:
        blob_names (List[str]): A list of blob names.
        expiry (Optional[datetime.datetime]): The expiry time of the token.
    Returns:
        dict: A dict of blob-name -> download-URL-with-sas-token.
    """
    token_map = await self.get_blob_sas_token_list(blob_names, expiry=expiry)
    return {name: self._make_blob_url(name, token) for name, token in token_map.items()}
get_blob_service_client()

Simple method to construct BlobServiceClient.

Note: calling async with blob_service_client()... opens a pipeline which will exit afterward. Thus, you need to either open-close a single one of these manually or throw it away after every async context manager session.

Returns: BlobServiceClient: The blob service client.

Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
def get_blob_service_client(self) -> BlobServiceClient:
    """
    Construct a fresh BlobServiceClient.

    Note: entering `async with blob_service_client()...` *opens* a
    pipeline which exits when the context manager does. So either
    open and close a single client manually, or throw the client away
    after each async-context-manager session.

    Returns:
        BlobServiceClient: The blob service client.
    """
    return BlobServiceClient(
        self.az_storage_url,
        credential=self.credentials,  # type: ignore
    )
list_blobs(prefix=None, **kwargs) async

List blobs in the container: convenience wrapper around ContainerClient.list_blobs. Args: prefix (Optional[str]): The prefix to filter blobs. Returns: AsyncGenerator[BlobProperties]: A generator of blob properties.

Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
async def list_blobs(
    self, prefix: str | None = None, **kwargs
) -> AsyncGenerator[BlobProperties]:
    """Yield blob properties from the container; thin wrapper over ContainerClient.list_blobs.
    Args:
        prefix (Optional[str]): The prefix to filter blobs.
    Returns:
        AsyncGenerator[BlobProperties]: A generator of blob properties.
    """
    async with self.get_blob_service_client() as service_client:
        container = service_client.get_container_client(self.container_name)
        async for blob_props in container.list_blobs(name_starts_with=prefix, **kwargs):
            yield blob_props
safe_blob_name(blob_name, quoting=False) staticmethod

Run a filter on blob names to make them 'safer'.

The most reliable blob names are urlencoded, but it's not strictly required outside of in sas-token-urls.

Parameters:

Name Type Description Default
blob_name str

The name of the blob.

required
quoting bool

Whether to urlsafe encode the name.

False

Returns: str: The 'safer' blob name.

Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
@staticmethod
def safe_blob_name(blob_name: str, quoting=False) -> str:
    """
    Filter a blob name into a 'safer' form.

    Urlencoded blob names are the most reliable, but encoding is only
    strictly required inside sas-token-urls.

    Args:
        blob_name (str): The name of the blob.
        quoting (bool): Whether to urlsafe encode the name.
    Returns:
        str: The 'safer' blob name.
    """
    return blobify_filename(blob_name, quoting=quoting)
upload_blob(blob_name, file_data, **kwargs) async

Upload a blob to the container.

Parameters:

Name Type Description Default
blob_name str

The name of the blob.

required
file_data Union[bytes, str, Iterable, AsyncIterable, IO]

The data to upload.

required
**kwargs

Additional keyword arguments (passed to BlobClient.upload_blob method).

{}

Raises:

Type Description
AzureBlobError

If the blob cannot be uploaded.

Returns: tuple[bool, dict]: A tuple of a boolean indicating success and the result.

Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
async def upload_blob(
    self,
    blob_name: str,
    file_data: bytes
    | str
    | typing.Iterable[typing.AnyStr]
    | typing.AsyncIterable[typing.AnyStr]
    | typing.IO[typing.AnyStr],
    **kwargs,
) -> tuple[bool, dict]:
    """Upload data to the container as a block blob.

    Args:
        blob_name (str): The name of the blob.
        file_data (Union[bytes, str, Iterable, AsyncIterable, IO]): The data to upload.
        **kwargs: Additional keyword arguments (passed to `BlobClient.upload_blob method`).

    Raises:
        AzureBlobError: If the blob cannot be uploaded.
    Returns:
        tuple[bool, dict]: A tuple of a boolean indicating success and the result.
    """
    async with self.get_blob_client(blob_name) as blob_client:
        upload_result = await blob_client.upload_blob(
            file_data,
            blob_type="BlockBlob",
            **kwargs,  # type: ignore
        )

    # Success is simply the absence of an error code in the response.
    succeeded = upload_result.get("error_code") is None
    return succeeded, upload_result
upload_blob_from_url(blob_name, file_url, overwrite=True) async

Upload a blob from another URL (can be blob-url with a sas-token)

Note: upload_blob_from_url means it will overwrite destination if it exists!

result usually looks like this: { "etag": ""0x8DBBAF4B8A6017C"", "last_modified": "2023-09-21T22:47:23+00:00", "content_md5": null, "client_request_id": "d3e9c022-58d0-11ee-9777-422808c7c565", "request_id": "b855e9cc-701e-0035-7ddd-ec4cc0000000", "version": "2023-08-03", "version_id": "2023-09-21T22:47:23.5730812Z", "date": "2023-09-21T22:47:23+00:00", "request_server_encrypted": true, "encryption_key_sha256": null, "encryption_scope": null }

Parameters:

Name Type Description Default
blob_name str

The name of the blob.

required
file_url str

The URL of the file to upload.

required
overwrite bool

Whether to overwrite the destination if it exists.

True

Raises: AzureBlobError: If the blob cannot be uploaded. Returns: dict: The result of the upload request.

Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
async def upload_blob_from_url(
    self,
    blob_name: str,
    file_url: str,
    overwrite=True,
):
    """
    Copy a blob into this container from another URL (which may itself
    be a blob-url carrying a sas-token).

    # Note: upload_blob_from_url will *overwrite* the destination if it exists!

    `result` usually looks like this:
        {
            "etag": "\"0x8DBBAF4B8A6017C\"",
            "last_modified": "2023-09-21T22:47:23+00:00",
            "content_md5": null,
            "client_request_id": "d3e9c022-58d0-11ee-9777-422808c7c565",
            "request_id": "b855e9cc-701e-0035-7ddd-ec4cc0000000",
            "version": "2023-08-03",
            "version_id": "2023-09-21T22:47:23.5730812Z",
            "date": "2023-09-21T22:47:23+00:00",
            "request_server_encrypted": true,
            "encryption_key_sha256": null,
            "encryption_scope": null
        }

    Args:
        blob_name (str): The name of the blob.
        file_url (str): The URL of the file to upload.
        overwrite (bool): Whether to overwrite the destination if it exists.
    Raises:
        AzureBlobError: If the blob cannot be uploaded.
    Returns:
        dict: The result of the upload request.
    """
    async with self.get_blob_client(blob_name) as blob_client:
        result = await blob_client.upload_blob_from_url(file_url, overwrite=overwrite)
        return result

blobify_filename(name, quoting=False)

see: https://learn.microsoft.com/en-us/azure/storage/blobs/storage-blobs-introduction#blobs

A blob name can contain any combination of characters.

A blob name must be at least one character long and cannot be more than 1,024 characters long, for blobs in Azure Storage.

Blob names are case-sensitive.

Reserved URL characters must be properly escaped.*

If your account does not have a hierarchical namespace, then the number of path segments comprising the blob name cannot exceed 254. A path segment is the string between consecutive delimiter characters (e.g., the forward slash '/') that corresponds to the name of a virtual directory.

Avoid blob names that end with a dot, a forward slash, a backslash, or a sequence or combination of these. No path segments should end with a dot.

  • urlsafe encode any blob URLs! Not the names!
Source code in aio_azure_clients_toolbox/clients/azure_blobs.py
def blobify_filename(name: str, quoting=False) -> str:
    """
    Normalize a filename into a 'safer' Azure blob name.

    see: https://learn.microsoft.com/en-us/azure/storage/blobs/storage-blobs-introduction#blobs

    A blob name can contain any combination of characters.

    A blob name must be at least one character long and cannot
    be more than 1,024 characters long, for blobs in Azure Storage.

    Blob names are case-sensitive.

    Reserved URL characters must be properly escaped.*

    If your account does not have a hierarchical namespace,
    then the number of path segments comprising the blob name cannot exceed 254.
    A path segment is the string between consecutive delimiter characters (e.g., the forward slash '/')
    that corresponds to the name of a virtual directory.

    Avoid blob names that end with a dot, a forward slash, a backslash, or a sequence
    or combination of these. No path segments should end with a dot.

    * urlsafe encode any blob URLs! Not the names!
    """
    # Strip whitespace, drop disallowed characters, then trim any
    # leading/trailing dot.
    cleaned = DISALLOWED_CHARS_PAT.sub("", name.strip())
    cleaned = chop_trailing_dot(chop_starting_dot(cleaned))
    if quoting:
        cleaned = urllib.parse.quote(cleaned)
    # Enforce the Azure blob-name length cap last.
    return cleaned[:BLOB_NAME_CHAR_LIMIT]

Cosmos DB

aio_azure_clients_toolbox.clients.cosmos

Cosmos(endpoint, dbname, container_name, credential_factory, cosmos_client_ttl_seconds=CLIENT_TTL_SECONDS_DEFAULT)

Applications can subclass this class to interact with their container

Source code in aio_azure_clients_toolbox/clients/cosmos.py
def __init__(
    self,
    endpoint: str,
    dbname: str,
    container_name: str,
    credential_factory: CredentialFactory,
    cosmos_client_ttl_seconds: int = CLIENT_TTL_SECONDS_DEFAULT,
):
    """Wire up a ConnectionManager for the given Cosmos container.

    No connection is opened here; the manager creates clients lazily.
    """
    self.container_name = container_name
    # Delegate connection lifecycle (creation, recycling) to the manager.
    # Lifespan-based recycling is disabled for this simple client.
    self.connection_manager = ConnectionManager(
        endpoint,
        dbname,
        container_name,
        credential_factory,
        lifespan_enabled=False,
        cosmos_client_ttl_seconds=cosmos_client_ttl_seconds,
    )
get_container_client() async

This async context manager will yield a container client.

Because making connections is expensive, we'd like to preserve them for a while.

Source code in aio_azure_clients_toolbox/clients/cosmos.py
@asynccontextmanager
async def get_container_client(self):
    """
    Async context manager yielding a Cosmos container client.

    Making connections is expensive, so the underlying manager keeps
    them alive for a while and hands back a live one here.
    """
    # Once the manager is closed this raises
    # AttributeError: 'NoneType' object has no attribute '__aenter__'
    async with self.connection_manager as container:
        yield container

ManagedCosmos(endpoint, dbname, container_name, credential_factory, client_limit=connection_pooling.DEFAULT_SHARED_TRANSPORT_CLIENT_LIMIT, max_size=connection_pooling.DEFAULT_MAX_SIZE, max_idle_seconds=CLIENT_IDLE_SECONDS_DEFAULT, max_lifespan_seconds=CLIENT_TTL_SECONDS_DEFAULT, pool_connection_create_timeout=10, pool_get_timeout=60)

Bases: AbstractorConnector

"Managed" version of the above: uses a connection pool to keep connections alive.

Applications can subclass this class to interact with their container

Parameters:

Name Type Description Default
endpoint str

A string URL of the Cosmos server.

required
dbname str

Cosmos database name.

required
container_name str

Cosmos container name.

required
credential_factory CredentialFactory

A callable that returns an async DefaultAzureCredential which may be used to authenticate to the container.

required
client_limit int

Client limit per connection (default: 100).

DEFAULT_SHARED_TRANSPORT_CLIENT_LIMIT
max_size int

Connection pool size (default: 10).

DEFAULT_MAX_SIZE
max_idle_seconds int

Maximum duration allowed for an idle connection before recycling it.

CLIENT_IDLE_SECONDS_DEFAULT
max_lifespan_seconds int

Optional setting which controls how long a connection lives before recycling.

CLIENT_TTL_SECONDS_DEFAULT
pool_connection_create_timeout int

Timeout for creating a connection in the pool (default: 10 seconds).

10
pool_get_timeout int

Timeout for getting a connection from the pool (default: 60 seconds).

60
Source code in aio_azure_clients_toolbox/clients/cosmos.py
def __init__(
    self,
    endpoint: str,
    dbname: str,
    container_name: str,
    credential_factory: CredentialFactory,
    client_limit: int = connection_pooling.DEFAULT_SHARED_TRANSPORT_CLIENT_LIMIT,
    max_size: int = connection_pooling.DEFAULT_MAX_SIZE,
    max_idle_seconds: int = CLIENT_IDLE_SECONDS_DEFAULT,
    max_lifespan_seconds: int = CLIENT_TTL_SECONDS_DEFAULT,
    pool_connection_create_timeout: int = 10,
    pool_get_timeout: int = 60,
):
    """Set up a pooled Cosmos connector.

    Raises:
        ValueError: if `credential_factory` is not callable.
    """
    self.endpoint = endpoint
    self.dbname = dbname
    self.container_name = container_name
    # Each pooled connection asks the factory for a fresh credential,
    # so the factory itself must be callable.
    if not callable(credential_factory):
        raise ValueError(
            "credential_factory must be a callable returning a credential"
        )
    self.credential_factory = credential_factory
    self.max_idle_seconds = max_idle_seconds
    # The pool calls back into our `create()` to make new connections.
    self.pool = connection_pooling.ConnectionPool(
        self,
        client_limit=client_limit,
        max_size=max_size,
        max_idle_seconds=max_idle_seconds,
        max_lifespan_seconds=max_lifespan_seconds,
    )
    # Keyword arguments forwarded to every `pool.get(...)` call.
    self.pool_kwargs = {
        "acquire_timeout": pool_connection_create_timeout,
        "timeout": pool_get_timeout,
    }
close() async

Closes all connections in our pool

Source code in aio_azure_clients_toolbox/clients/cosmos.py
async def close(self):
    """Shut down every connection held by the pool."""
    await self.pool.closeall()
create() async

Creates a new connection for our pool

Source code in aio_azure_clients_toolbox/clients/cosmos.py
async def create(self):
    """Build and warm up one new connection for the pool."""
    conn = SimpleCosmos(
        self.endpoint,
        self.dbname,
        self.container_name,
        self.credential_factory(),
    )
    # Eagerly establish the container client before handing it back.
    await conn.get_container_client()
    return conn
get_container_client() async

This async context manager will yield a container client.

Because making connections is expensive, we'd like to preserve them for a while.

Source code in aio_azure_clients_toolbox/clients/cosmos.py
@asynccontextmanager
async def get_container_client(self):
    """
    This async context manager will yield a container client.

    Because making connections is expensive, we'd like to preserve them
    for a while.

    Raises:
        RuntimeError: re-raised after expiring the connection it occurred on.
    """
    async with self.pool.get(**self.pool_kwargs) as conn:
        try:
            yield conn
        except RuntimeError as e:
            # Lazy %-style args: the message is only rendered if this
            # log level is enabled (avoids f-string work on hot paths).
            logger.error("RuntimeError occurred; Closing connection: %s", e)
            # The connection is presumed unusable: drop it from the
            # pool before propagating the error.
            await self.pool.expire_conn(conn)
            raise

SimpleCosmos(endpoint, dbname, container_name, credential)

Applications can subclass this class to keep async connections open

Source code in aio_azure_clients_toolbox/clients/cosmos.py
def __init__(
    self,
    endpoint: str,
    dbname: str,
    container_name: str,
    credential: DefaultAzureCredential,
):
    """Record connection settings; no connection is opened yet.

    `get_container_client()` later fills in the client/db/container.
    """
    self.endpoint = endpoint
    self.credential = credential
    self.db_name = dbname
    self.container_name = container_name
    # when these connections get created they will be parked here
    self._container: ContainerProxy | None = None
    self._client: CosmosClient | None = None
    self._db: DatabaseProxy | None = None
get_container_client() async

This method will return a container client.

Source code in aio_azure_clients_toolbox/clients/cosmos.py
async def get_container_client(self) -> "SimpleCosmos":
    """
    Open the client -> database -> container chain and return self.
    """
    client = CosmosClient(self.endpoint, credential=self.credential)
    database = client.get_database_client(self.db_name)
    self._client = client
    self._db = database
    self._container = database.get_container_client(self.container_name)
    return self

ConnectionManager(endpoint, dbname, container_name, credential_factory, lifespan_enabled=False, cosmos_client_ttl_seconds=CLIENT_TTL_SECONDS_DEFAULT)

Source code in aio_azure_clients_toolbox/clients/cosmos.py
def __init__(
    self,
    endpoint: str,
    dbname: str,
    container_name: str,
    credential_factory: CredentialFactory,
    lifespan_enabled: bool = False,
    cosmos_client_ttl_seconds: int = CLIENT_TTL_SECONDS_DEFAULT,
):
    """Validate settings and park the lazily-created Cosmos handles.

    Raises:
        ValueError: if `credential_factory` is not callable, or if
            lifespan tracking is enabled with a bad TTL value.
    """
    self.endpoint = endpoint
    if not callable(credential_factory):
        raise ValueError(
            "credential_factory must be a callable returning a credential"
        )

    self.db_name = dbname
    self.container_name = container_name
    self.credential_factory = credential_factory
    self.client_lifespan_seconds = cosmos_client_ttl_seconds
    self.lifespan_enabled = lifespan_enabled
    # Lifespan tracking needs a positive integer TTL.
    if self.lifespan_enabled:
        if not self.client_lifespan_seconds:
            raise ValueError(f"Bad value for client lifespan {self.client_lifespan_seconds}")
        if not isinstance(self.client_lifespan_seconds, int):
            raise ValueError(f"Bad value for client lifespan {self.client_lifespan_seconds}")
        if self.client_lifespan_seconds < 0:
            raise ValueError(f"Client lifespan must be positive: {self.client_lifespan_seconds}")

    # These are clients that must be managed manually
    self._credential = None
    self._client = None
    self._database = None
    self._container = None
    # Snapshot of time.monotonic() (seconds); monotonic clocks can't go backwards
    self._client_lifespan_started = None
is_container_closed property

Check if any attributes set to None

__aenter__() async

Here we manage our connection: - if still alive, we return - if needing to recycle, we recycle and create - if not created, we create

Source code in aio_azure_clients_toolbox/clients/cosmos.py
async def __aenter__(self):
    """
    Manage our connection on entry:
    - still alive: reuse it
    - due for recycling: recycle, then create
    - never created: create
    """
    try:
        return await self.get_container_client()
    except exceptions.CosmosHttpResponseError as exc:
        raise ValueError("Container client cannot be constructed") from exc
__aexit__(exc_type, exc, tb) async

Close connection if needing to recycle

Source code in aio_azure_clients_toolbox/clients/cosmos.py
async def __aexit__(self, exc_type, exc, tb):
    """On exit, recycle the connection when it is due."""
    if self.should_recycle_container:
        await self.recycle_container()
get_container_client() async

This method will return a container client.

Because making connections is expensive, we'd like to preserve them for a while.

Source code in aio_azure_clients_toolbox/clients/cosmos.py
async def get_container_client(self):
    """
    Return a container client, recycling or (re)creating the underlying
    Cosmos client as needed. Connections are expensive to make, so live
    ones are kept and reused.
    """
    if self.should_recycle_container:
        logger.info("Recycling Cosmos client")
        await self.recycle_container()

    if self.is_container_closed:
        logger.info("Creating new Cosmos client")
        credential = self.credential_factory()
        client = CosmosClient(self.endpoint, credential=credential)
        database = client.get_database_client(self.db_name)
        self._credential = credential
        self._client = client
        self._database = database
        self._container = database.get_container_client(self.container_name)
        # Mark when this client's lifespan clock started.
        self._client_lifespan_started = time.monotonic()

    return self._container

PatchOp

Bases: str, Enum

Following is an example of patch operations

operations = [ {"op": "add", "path": "/favorite_color", "value": "red"}, {"op": "remove", "path": "/ttl"}, {"op": "replace", "path": "/tax_amount", "value": 14}, {"op": "set", "path": "/items/0/discount", "value": 20.0512}, {"op": "incr", "path": "/total_due", "value": 5}, {"op": "move", "from": "/freight", "path": "/service_addition"} ]

Note: set Set operation adds a property if it doesn't already exist (except if there was an Array ) while replace operation fails if the property doesn't exist.

as_op(path, value)

These variables make sense for all but Move

Source code in aio_azure_clients_toolbox/clients/cosmos.py
def as_op(self, path: str, value: str):
    """Render this op as a Cosmos patch dict.

    Move is the odd one out: its arguments map to "from"/"path"
    rather than "path"/"value".
    """
    if self is PatchOp.Move:
        return {"op": self.value, "from": path, "path": value}

    return {"op": self.value, "path": path, "value": value}

Operation(op, path, value) dataclass

For turning patch Operations into instructions Cosmos understands

Event Grid

aio_azure_clients_toolbox.clients.eventgrid

EventGridClient(config, credential=None, async_credential=None)

A generic eventgrid client

This generic eventgrid client provides a few nice features on top of the native azure python client. Primarily it provides a convenient way to configure publishing to multiple topics using a single client.

Example:

```
topic1 = EventGridTopicConfig("topic1", "https://azure.net/topic1")
topic2 = EventGridTopicConfig("topic2", "https://azure.net/topic2")

client_config = EventGridConfig([topic1, topic2])
credential = DefaultAzureCredential()
client = EventGridClient(client_config, credential=credential)
```

The client runs asynchronously or synchronously. To run the client async, provide the async_credential arg when creating the client and use the async methods, e.g. client.async_emit_event().

```
from azure.identity.aio import DefaultAzureCredential

credential = DefaultAzureCredential()
topic = EventGridTopicConfig("topic", "https://azure.net/topic")
config = EventGridConfig(topic)
client = EventGridClient(config, async_credential=credential)
await client.async_emit_event("topic", "ident", {},"event-type", "subject")
```

To run the client synchronously, provide the credential arg when creating the client and call non-prefixed functions.

```
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
topic = EventGridTopicConfig("topic", "https://azure.net/topic")
config = EventGridConfig(topic)
client = EventGridClient(config, credential=credential)
client.emit_event("topic", "ident", {}, "event-type", "subject")
```

Internally sync/async versions of the azure eventgrid clients will be called accordingly.

Source code in aio_azure_clients_toolbox/clients/eventgrid.py
def __init__(
    self,
    config: EventGridConfig,
    credential: DefaultAzureCredential | None = None,
    async_credential: AsyncDefaultAzureCredential | None = None,
):
    """Store the topic config plus exactly one credential, then build
    the matching (sync or async) per-topic publisher clients.

    Raises:
        ValueError: if zero or both credentials are supplied.
    """
    have_sync = bool(credential)
    have_async = bool(async_credential)

    if not (have_sync or have_async):
        raise ValueError("Must provide credential or async_credential")

    if have_sync and have_async:
        raise ValueError("Must provide only ONE of credential or async_credential")

    self.config = config
    self.credential: DefaultAzureCredential | None = None
    self.async_credential: AsyncDefaultAzureCredential | None = None

    if have_sync:
        self.credential = credential
        self._init_clients()
    else:
        self.async_credential = async_credential
        self._init_async_clients()
async_emit_event(topic, event_type, subject, data, data_version='v1', **kwargs) async

Emit an event to Event Grid asynchronously.

Exceptions:

Raises HttpResponseError exception if failed to emit
Source code in aio_azure_clients_toolbox/clients/eventgrid.py
async def async_emit_event(
    self, topic: str, event_type: str, subject: str, data: dict, data_version: str = "v1", **kwargs
) -> None:
    """Publish one event to the named topic via the async client.

    Exceptions:

        Raises HttpResponseError exception if failed to emit
    """
    outgoing = EventGridEvent(
        subject=subject,
        data=data,
        event_type=event_type,
        data_version=data_version,
        **kwargs,
    )
    publisher = self.get_async_client(topic)
    return await publisher.send(outgoing)
emit_event(topic, event_type, subject, data, data_version='v1', **kwargs)

Emit an event to Event Grid synchronously.

Exceptions:

Raises HttpResponseError exception if failed to emit
Source code in aio_azure_clients_toolbox/clients/eventgrid.py
def emit_event(
    self, topic: str, event_type: str, subject: str, data: dict, data_version: str = "v1", **kwargs
) -> None:
    """Publish one event to the named topic via the sync client.

    Exceptions:

        Raises HttpResponseError exception if failed to emit
    """
    outgoing = EventGridEvent(
        subject=subject,
        data=data,
        event_type=event_type,
        data_version=data_version,
        **kwargs,
    )
    publisher = self.get_client(topic)
    return publisher.send(outgoing)
get_async_client(topic)

Get the async azure publisher client for the name topic.

Source code in aio_azure_clients_toolbox/clients/eventgrid.py
def get_async_client(self, topic: str) -> AsyncEventGridPublisherClient:
    """Look up the async publisher client registered for `topic`."""
    registry = self.async_clients
    return registry[topic]
get_client(topic)

Get the azure publisher client for the named topic.

Source code in aio_azure_clients_toolbox/clients/eventgrid.py
def get_client(self, topic: str) -> EventGridPublisherClient:
    """Look up the sync publisher client registered for `topic`."""
    registry = self.clients
    return registry[topic]
is_sync()

Check if this client is configured to be synchronous.

Source code in aio_azure_clients_toolbox/clients/eventgrid.py
def is_sync(self):
    """Return True when this client was configured with a synchronous credential."""
    has_async = self.async_credential is not None
    return not has_async

EventGridConfig(topic_configs)

Configuration for all topics available to a single event grid client

Source code in aio_azure_clients_toolbox/clients/eventgrid.py
def __init__(self, topic_configs: EventGridTopicConfig | list[EventGridTopicConfig]):
    """Accept a single topic config or a list of them and index by name."""
    if isinstance(topic_configs, EventGridTopicConfig):
        topic_configs = [topic_configs]

    self.topic_configs = {cfg.name: cfg for cfg in topic_configs}
config(topic)

Get the config for a topic by name.

Source code in aio_azure_clients_toolbox/clients/eventgrid.py
def config(self, topic: str) -> EventGridTopicConfig:
    """Look up a topic's configuration by its name."""
    registry = self.topic_configs
    return registry[topic]
topics()

Get list of topic names in this config.

Source code in aio_azure_clients_toolbox/clients/eventgrid.py
def topics(self) -> list[str]:
    """Return the names of every configured topic."""
    return [name for name in self.topic_configs]
url(topic)

Get the URL for a topic.

Source code in aio_azure_clients_toolbox/clients/eventgrid.py
def url(self, topic: str) -> str:
    """Return the endpoint URL configured for `topic`."""
    topic_config = self.config(topic)
    return topic_config.url

EventGridTopicConfig(name, url) dataclass

Configuration for one event grid topic subscription

Event Hub

aio_azure_clients_toolbox.clients.eventhub

Eventhub(eventhub_namespace, eventhub_name, credential, eventhub_transport_type=TRANSPORT_PURE_AMQP)

Source code in aio_azure_clients_toolbox/clients/eventhub.py
def __init__(
    self,
    eventhub_namespace: str,
    eventhub_name: str,
    credential: DefaultAzureCredential,
    eventhub_transport_type: str = TRANSPORT_PURE_AMQP,
):
    """Capture connection settings and eagerly build the producer client."""
    self.evhub_namespace = eventhub_namespace
    self.evhub_name = eventhub_name
    self.credential = credential
    # Plain AMQP needs no extra kwargs; websockets require an explicit
    # transport_type passed through to the producer client.
    if eventhub_transport_type == TRANSPORT_PURE_AMQP:
        self.transport_type = {}
    else:
        self.transport_type = {"transport_type": TransportType.AmqpOverWebsocket}
    self._client: EventHubProducerClient | None = self.get_client()
send_event(event, partition_key=None) async

Send a single EventHub event. See send_events_batch for sending multiple events

partition_key will make a particular string identifier "sticky" for a particular partition.

For instance, if you use a Salesforce record identifier as the partition_key then you can ensure that a particular consumer always receives those events.

Source code in aio_azure_clients_toolbox/clients/eventhub.py
async def send_event(
    self,
    event: bytes | str,
    partition_key: str | None = None,
) -> EventDataBatch:
    """
    Send one event to the hub by wrapping it in a single-item batch;
    use `send_events_batch` to ship several events at once.

    `partition_key` pins events carrying the same string identifier to
    one partition — e.g. keying on a Salesforce record identifier
    guarantees a particular consumer always receives those events.
    """
    batch: EventDataBatch = await self.client.create_batch(partition_key=partition_key)
    batch.add(EventData(event))
    await self.client.send_batch(batch)
    return batch
send_event_data(event, partition_key=None) async

Send a single EventHub event which is already encoded as EventData.

partition_key will make a particular string identifier "sticky" for a particular partition.

For instance, if you use a Salesforce record identifier as the partition_key then you can ensure that a particular consumer always receives those events.

Source code in aio_azure_clients_toolbox/clients/eventhub.py
async def send_event_data(
    self,
    event: EventData,
    partition_key: str | None = None,
) -> EventDataBatch:
    """
    Send one pre-built `EventData` event inside a single-item batch.

    `partition_key` pins events carrying the same string identifier to
    one partition — e.g. keying on a Salesforce record identifier
    guarantees a particular consumer always receives those events.
    """
    batch: EventDataBatch = await self.client.create_batch(partition_key=partition_key)
    batch.add(event)
    await self.client.send_batch(batch)
    return batch
send_events_batch(events_list, partition_key=None) async

Sending events in a batch is more performant than sending individual events.

partition_key will make a particular string identifier "sticky" for a particular partition.

For instance, if you use a Salesforce record identifier as the partition_key then you can ensure that a particular consumer always receives those events.

Source code in aio_azure_clients_toolbox/clients/eventhub.py
async def send_events_batch(
    self,
    events_list: list[bytes | str],
    partition_key: str | None = None,
) -> EventDataBatch:
    """
    Ship several events in one batch — cheaper than event-by-event sends.

    `partition_key` pins events carrying the same string identifier to
    one partition — e.g. keying on a Salesforce record identifier
    guarantees a particular consumer always receives those events.
    """
    batch: EventDataBatch = await self.client.create_batch(partition_key=partition_key)
    for payload in events_list:
        batch.add(EventData(payload))
    await self.client.send_batch(batch)
    return batch
send_events_data_batch(event_data_batch) async

Sending events in a batch is more performant than sending individual events.

Source code in aio_azure_clients_toolbox/clients/eventhub.py
async def send_events_data_batch(
    self,
    event_data_batch: EventDataBatch,
) -> EventDataBatch:
    """Forward an already-assembled batch to the hub and hand it back."""
    await self.client.send_batch(event_data_batch)
    return event_data_batch

ManagedAzureEventhubProducer(eventhub_namespace, eventhub_name, credential_factory, eventhub_transport_type=TRANSPORT_PURE_AMQP, client_limit=connection_pooling.DEFAULT_SHARED_TRANSPORT_CLIENT_LIMIT, max_size=connection_pooling.DEFAULT_MAX_SIZE, max_idle_seconds=EVENTHUB_SEND_TTL_SECONDS, ready_message='Connection established', max_lifespan_seconds=None, pool_connection_create_timeout=10, pool_get_timeout=60)

Bases: AbstractorConnector

Azure Eventhub Producer client with connection pooling built in.

Parameters:

Name Type Description Default
eventhub_namespace str

String representing the Eventhub namespace.

required
eventhub_name str

Eventhub name (the "topic").

required
credential_factory CredentialFactory

A callable that returns an async DefaultAzureCredential which may be used to authenticate to the container.

required
client_limit int

Client limit per connection (default: 100).

DEFAULT_SHARED_TRANSPORT_CLIENT_LIMIT
max_size int

Connection pool size (default: 10).

DEFAULT_MAX_SIZE
max_idle_seconds int

Maximum duration allowed for an idle connection before recycling it.

EVENTHUB_SEND_TTL_SECONDS
max_lifespan_seconds int | None

Optional setting which controls how long a connection lives before recycling.

None
pool_connection_create_timeout int

Timeout for creating a connection in the pool (default: 10 seconds).

10
pool_get_timeout int

Timeout for getting a connection from the pool (default: 60 seconds).

60
ready_message str | bytes | EventData

A string, bytes, or EventData object representing the first "ready" message sent to establish connection.

'Connection established'
Source code in aio_azure_clients_toolbox/clients/eventhub.py
def __init__(
    self,
    eventhub_namespace: str,
    eventhub_name: str,
    credential_factory: CredentialFactory,
    eventhub_transport_type: str = TRANSPORT_PURE_AMQP,
    client_limit: int = connection_pooling.DEFAULT_SHARED_TRANSPORT_CLIENT_LIMIT,
    max_size: int = connection_pooling.DEFAULT_MAX_SIZE,
    max_idle_seconds: int = EVENTHUB_SEND_TTL_SECONDS,
    ready_message: str | bytes | EventData = "Connection established",
    max_lifespan_seconds: int | None = None,
    pool_connection_create_timeout: int = 10,
    pool_get_timeout: int = 60,
):
    """Validate all arguments, then wire up the pooled Eventhub producer.

    Raises:
        ValueError: if `credential_factory` is not callable or
            `ready_message` is not a str, bytes, or EventData.
    """
    # Validate every argument *before* building the ConnectionPool so a
    # bad input cannot leave a live pool referencing a half-constructed
    # instance (previously ready_message was checked after pool creation).
    if not callable(credential_factory):
        raise ValueError(
            "credential_factory must be a callable returning a credential"
        )
    if not isinstance(ready_message, (str, bytes, EventData)):
        raise ValueError("ready_message must be a string, bytes, or EventData")

    self.eventhub_namespace = eventhub_namespace
    self.eventhub_name = eventhub_name
    self.eventhub_transport_type = eventhub_transport_type
    self.credential_factory = credential_factory
    self.ready_message = ready_message
    # The pool calls back into this object (create/ready/close hooks).
    self.pool = connection_pooling.ConnectionPool(
        self,
        client_limit=client_limit,
        max_size=max_size,
        max_idle_seconds=max_idle_seconds,
        max_lifespan_seconds=max_lifespan_seconds,
    )

    self.pool_kwargs = {
        "timeout": pool_get_timeout,
        "acquire_timeout": pool_connection_create_timeout,
    }
close() async

Closes all connections in our pool

Source code in aio_azure_clients_toolbox/clients/eventhub.py
async def close(self):
    """Shut down every connection currently held by the pool."""
    await self.pool.closeall()
create() async

Creates a new connection for our pool

Source code in aio_azure_clients_toolbox/clients/eventhub.py
async def create(self) -> Eventhub:
    """Build a fresh Eventhub connection for the pool, with its own credential."""
    credential = self.credential_factory()
    return Eventhub(
        self.eventhub_namespace,
        self.eventhub_name,
        credential,
        eventhub_transport_type=self.eventhub_transport_type,
    )
ready(conn) async

Establishes readiness for a new connection

Source code in aio_azure_clients_toolbox/clients/eventhub.py
@connection_pooling.send_time_deco(logger, "Eventhub.ready")
async def ready(self, conn: Eventhub) -> bool:
    """Probe a new connection by sending the configured ready message.

    Returns True once a send succeeds; False after an authentication
    failure or after two failed attempts.
    """
    # Create a batch.
    event_data_batch: EventDataBatch = await conn.create_batch()
    # Prepare ready message as an event
    if isinstance(self.ready_message, EventData):
        event_data_batch.add(self.ready_message)
    else:
        event_data_batch.add(EventData(self.ready_message))
    attempts = 2

    while attempts > 0:
        try:
            # Send the batch of events to the event hub.
            await conn.send_batch(event_data_batch)
            return True
        except AuthenticationError:
            # Auth errors are unlikely to succeed on retry, so bail out now.
            logger.warning("Eventhub readiness check failed due to authentication error. Cancelling.")
            logger.error(f"{traceback.format_exc()}")
            return False
        except EventHubError:
            # Transient hub errors get one more try (two attempts total).
            logger.warning(f"Eventhub readiness check #{3 - attempts} failed; trying again.")
            logger.error(f"{traceback.format_exc()}")
            attempts -= 1

    logger.error("Eventhub readiness check failed. Not ready.")
    return False
send_event(event, partition_key=None) async

Send a single EventHub event. See send_events_batch for sending multiple events

partition_key will make a particular string identifier "sticky" for a particular partition.

For instance, if you use a Salesforce record identifier as the partition_key then you can ensure that a particular consumer always receives those events.

Source code in aio_azure_clients_toolbox/clients/eventhub.py
@connection_pooling.send_time_deco(logger, "Eventhub.send_event")
async def send_event(
    self,
    event: bytes | str,
    partition_key: str | None = None,
) -> EventDataBatch:
    """
    Send a *single* EventHub event. See `send_events_batch` for
    sending multiple events

    `partition_key` will make a particular string identifier
    "sticky" for a particular partition.

    For instance, if you use a Salesforce record identifier as the `partition_key`
    then you can ensure that a _particular consumer_ always receives _those_ events.

    On transport/auth errors the pooled connection is expired (not reused)
    and the exception is re-raised.
    """
    # Borrow a pooled connection; it is returned to the pool on exit.
    async with self.pool.get(**self.pool_kwargs) as conn:
        # Create a batch.
        event_data_batch: EventDataBatch = await conn.create_batch(partition_key=partition_key)

        # Add events to the batch.
        event_data_batch.add(EventData(event))

        logger.debug("Sending eventhub batch")

        try:
            # Send the batch of events to the event hub.
            await conn.send_batch(event_data_batch)
        except (AuthenticationError, ClientClosedError, ConnectionLostError, ConnectError):
            logger.error(f"Error sending event: {event}")
            logger.error(f"{traceback.format_exc()}")
            # Mark this connection closed so it won't be reused
            await self.pool.expire_conn(conn)
            raise
        return event_data_batch
send_event_data(event, partition_key=None) async

Send a single EventHub event which is already encoded as EventData.

partition_key will make a particular string identifier "sticky" for a particular partition.

For instance, if you use a Salesforce record identifier as the partition_key then you can ensure that a particular consumer always receives those events.

Source code in aio_azure_clients_toolbox/clients/eventhub.py
@connection_pooling.send_time_deco(logger, "Eventhub.send_event_data")
async def send_event_data(
    self,
    event: EventData,
    partition_key: str | None = None,
) -> EventDataBatch:
    """
    Send a *single* EventHub event which is already encoded as `EventData`.

    `partition_key` will make a particular string identifier
    "sticky" for a particular partition.

    For instance, if you use a Salesforce record identifier as the `partition_key`
    then you can ensure that a _particular consumer_ always receives _those_ events.

    On transport/auth errors the pooled connection is expired (not reused)
    and the exception is re-raised.
    """
    # Borrow a pooled connection; it is returned to the pool on exit.
    async with self.pool.get(**self.pool_kwargs) as conn:
        # Create a batch.
        event_data_batch: EventDataBatch = await conn.create_batch(partition_key=partition_key)

        # Add events to the batch.
        event_data_batch.add(event)

        logger.debug("Sending eventhub batch")
        # Send the batch of events to the event hub.
        try:
            await conn.send_batch(event_data_batch)
            return event_data_batch
        except (AuthenticationError, ClientClosedError, ConnectionLostError, ConnectError):
            logger.error(f"Error sending event: {event}")
            logger.error(f"{traceback.format_exc()}")
            # Mark this connection closed so it won't be reused
            await self.pool.expire_conn(conn)
            raise
send_events_batch(events_list, partition_key=None) async

Sending events in a batch is more performant than sending individual events.

partition_key will make a particular string identifier "sticky" for a particular partition.

For instance, if you use a Salesforce record identifier as the partition_key then you can ensure that a particular consumer always receives those events.

Source code in aio_azure_clients_toolbox/clients/eventhub.py
@connection_pooling.send_time_deco(logger, "Eventhub.send_events_batch")
async def send_events_batch(
    self,
    events_list: list[bytes | str],
    partition_key: str | None = None,
) -> EventDataBatch:
    """
    Sending events in a batch is more performant than sending individual events.

    `partition_key` will make a particular string identifier
    "sticky" for a particular partition.

    For instance, if you use a Salesforce record identifier as the `partition_key`
    then you can ensure that a _particular consumer_ always receives _those_ events.

    On transport/auth errors the pooled connection is expired (not reused)
    and the exception is re-raised.
    """
    # Borrow a pooled connection; it is returned to the pool on exit.
    async with self.pool.get(**self.pool_kwargs) as conn:
        # Create a batch.
        event_data_batch: EventDataBatch = await conn.create_batch(partition_key=partition_key)

        # Add events to the batch.
        for event in events_list:
            event_data_batch.add(EventData(event))

        logger.debug("Sending eventhub batch")
        # Send the batch of events to the event hub.
        try:
            await conn.send_batch(event_data_batch)
            return event_data_batch
        except (AuthenticationError, ClientClosedError, ConnectionLostError, ConnectError):
            logger.error(f"Error sending event: {traceback.format_exc()}")
            # Mark this connection closed so it won't be reused
            await self.pool.expire_conn(conn)
            raise
send_events_data_batch(event_data_batch) async

Sending events in a batch is more performant than sending individual events.

Source code in aio_azure_clients_toolbox/clients/eventhub.py
@connection_pooling.send_time_deco(logger, "Eventhub.send_events_data_batch")
async def send_events_data_batch(
    self,
    event_data_batch: EventDataBatch,
) -> EventDataBatch:
    """
    Forward a pre-assembled batch over a pooled connection — cheaper than
    sending events one at a time.
    """
    async with self.pool.get(**self.pool_kwargs) as conn:
        logger.debug("Sending eventhub batch")
        try:
            await conn.send_batch(event_data_batch)
        except (AuthenticationError, ClientClosedError, ConnectionLostError, ConnectError):
            logger.error(f"Error sending batch {traceback.format_exc()}")
            # Drop this connection from the pool; it may be broken.
            await self.pool.expire_conn(conn)
            raise
        return event_data_batch

Service Bus

aio_azure_clients_toolbox.clients.service_bus

service_bus.py

Wrapper class around a ServiceBusClient which allows sending messages or subscribing to a queue.

AzureServiceBus(service_bus_namespace_url, service_bus_queue_name, credential_factory, socket_timeout=1)

Basic AzureServiceBus client without connection pooling.

For connection pooling see ManagedAzureServiceBus below.

Source code in aio_azure_clients_toolbox/clients/service_bus.py
def __init__(
    self,
    service_bus_namespace_url: str,
    service_bus_queue_name: str,
    credential_factory: CredentialFactory,
    socket_timeout: float = 1,  ## Value in seconds. Azure default value is 0.2s
):
    """Record queue coordinates; sender/receiver clients are built lazily.

    Raises:
        ValueError: if `credential_factory` is not callable.
    """
    self.namespace_url = service_bus_namespace_url
    self.queue_name = service_bus_queue_name
    if not callable(credential_factory):
        raise ValueError(
            "credential_factory must be a callable returning a credential"
        )
    self.credential_factory = credential_factory
    # Clients start out unset and are created on first use by their accessors.
    self._receiver_client: ServiceBusReceiver | None = None
    self._receiver_credential: DefaultAzureCredential | None = None
    self._sender_client: SendClientCloseWrapper | None = None
    self._socket_timeout: float = socket_timeout
send_message(msg, delay=0, unique_msg_id=None, **msg_kwargs) async

Schedule a message for delivery.

Parameters:

Name Type Description Default
msg str

Message body to send.

required
delay int

Delay in seconds before the message is available for delivery.

0
unique_msg_id str | None

Optional unique Service Bus message_id used for deduplication.

None
**msg_kwargs

Additional keyword arguments forwarded directly to :class:azure.servicebus.ServiceBusMessage (e.g. content_type, correlation_id, subject, partition_key, session_id, reply_to, time_to_live).

{}
Source code in aio_azure_clients_toolbox/clients/service_bus.py
async def send_message(self, msg: str, delay: int = 0, unique_msg_id: str | None = None, **msg_kwargs):
    """Schedule `msg` for delivery `delay` seconds from now.

    Args:
        msg:
            Message body to send.
        delay:
            Delay in seconds before the message is available for delivery.
        unique_msg_id:
            Optional unique Service Bus ``message_id`` used for deduplication.
        **msg_kwargs:
            Additional keyword arguments forwarded directly to
            :class:`azure.servicebus.ServiceBusMessage` (e.g.
            ``content_type``, ``correlation_id``, ``subject``,
            ``partition_key``, ``session_id``, ``reply_to``,
            ``time_to_live``).
    """
    deliver_at = datetime.datetime.now(tz=datetime.UTC) + datetime.timedelta(seconds=delay)
    outgoing = ServiceBusMessage(msg, message_id=unique_msg_id, **msg_kwargs)
    sender = self.get_sender()
    await sender.schedule_messages(outgoing, deliver_at)

ManagedAzureServiceBusSender(service_bus_namespace_url, service_bus_queue_name, credential_factory, client_limit=connection_pooling.DEFAULT_SHARED_TRANSPORT_CLIENT_LIMIT, max_size=connection_pooling.DEFAULT_MAX_SIZE, max_idle_seconds=SERVICE_BUS_SEND_TTL_SECONDS, max_lifespan_seconds=None, ready_message='Connection established', pool_connection_create_timeout=10, pool_get_timeout=60)

Bases: AbstractorConnector

Azure ServiceBus Sender client with connection pooling built in.

Parameters:

Name Type Description Default
service_bus_namespace_url str

String representing the ServiceBus namespace URL.

required
service_bus_queue_name str

Queue name (the "topic").

required
credential_factory CredentialFactory

A callable that returns an async DefaultAzureCredential which may be used to authenticate to the container.

required
client_limit int

Client limit per connection (default: 100).

DEFAULT_SHARED_TRANSPORT_CLIENT_LIMIT
max_size int

Connection pool size (default: 10).

DEFAULT_MAX_SIZE
max_idle_seconds int

Maximum duration allowed for an idle connection before recycling it.

SERVICE_BUS_SEND_TTL_SECONDS
max_lifespan_seconds int | None

Optional setting which controls how long a connection lives before recycling.

None
pool_connection_create_timeout int

Timeout for creating a connection in the pool (default: 10 seconds).

10
pool_get_timeout int

Timeout for getting a connection from the pool (default: 60 seconds).

60
ready_message str | bytes

A string or bytes representing the first "ready" message sent to establish connection.

'Connection established'
Source code in aio_azure_clients_toolbox/clients/service_bus.py
def __init__(
    self,
    service_bus_namespace_url: str,
    service_bus_queue_name: str,
    credential_factory: CredentialFactory,
    client_limit: int = connection_pooling.DEFAULT_SHARED_TRANSPORT_CLIENT_LIMIT,
    max_size: int = connection_pooling.DEFAULT_MAX_SIZE,
    max_idle_seconds: int = SERVICE_BUS_SEND_TTL_SECONDS,
    max_lifespan_seconds: int | None = None,
    ready_message: str | bytes = "Connection established",
    pool_connection_create_timeout: int = 10,
    pool_get_timeout: int = 60,
):
    """Validate all arguments, then wire up the pooled ServiceBus sender.

    Raises:
        ValueError: if `credential_factory` is not callable or
            `ready_message` is not a str or bytes.
    """
    # Validate every argument *before* building the ConnectionPool so a
    # bad input cannot leave a live pool referencing a half-constructed
    # instance (previously ready_message was checked after pool creation).
    if not callable(credential_factory):
        raise ValueError(
            "credential_factory must be a callable returning a credential"
        )
    if not isinstance(ready_message, (str, bytes)):
        raise ValueError("ready_message must be a string or bytes")

    self.service_bus_namespace_url = service_bus_namespace_url
    self.service_bus_queue_name = service_bus_queue_name
    self.credential_factory = credential_factory
    self.ready_message = ready_message

    # The pool calls back into this object (create/ready/close hooks).
    self.pool = connection_pooling.ConnectionPool(
        self,
        client_limit=client_limit,
        max_size=max_size,
        max_idle_seconds=max_idle_seconds,
        max_lifespan_seconds=max_lifespan_seconds,
    )

    self.pool_kwargs = {
        "timeout": pool_get_timeout,
        "acquire_timeout": pool_connection_create_timeout,
    }
close() async

Closes all connections in our pool

Source code in aio_azure_clients_toolbox/clients/service_bus.py
async def close(self):
    """Shut down every connection currently held by the pool."""
    await self.pool.closeall()
create() async

Creates a new connection for our pool

Source code in aio_azure_clients_toolbox/clients/service_bus.py
async def create(self) -> connection_pooling.AbstractConnection:
    """Build a fresh sender connection for the pool."""
    sender = self.get_sender()
    return cast(connection_pooling.AbstractConnection, sender)
get_receiver()

Proxy for AzureServiceBus.get_receiver. Here for consistency with above class.

Source code in aio_azure_clients_toolbox/clients/service_bus.py
def get_receiver(self) -> ServiceBusReceiver:
    """
    Build a throwaway AzureServiceBus and delegate receiver creation to
    it, mirroring the plain client's interface.
    """
    delegate = AzureServiceBus(
        self.service_bus_namespace_url,
        self.service_bus_queue_name,
        self.credential_factory,
    )
    return delegate.get_receiver()
ready(conn) async

Establishes readiness for a new connection

Source code in aio_azure_clients_toolbox/clients/service_bus.py
@connection_pooling.send_time_deco(logger, "ServiceBus.ready")
async def ready(self, conn: SendClientCloseWrapper) -> bool:
    """Probe a new sender by scheduling the ready message for immediate delivery.

    Returns True once the schedule call succeeds; False after two failed
    attempts.  Authorization/authentication errors are re-raised.
    """
    message = ServiceBusMessage(self.ready_message)
    now = datetime.datetime.now(tz=datetime.UTC)
    attempts = 2
    while attempts > 0:
        try:
            await conn.schedule_messages(message, now)
            return True
        except (ServiceBusAuthorizationError, ServiceBusAuthenticationError):
            # We do not believe these will improve with repeated tries
            logger.error(
                "ServiceBus Authorization or Authentication error. Not ready."
            )
            raise
        except (AttributeError, ServiceBusError, exceptions.AzureError):
            # Transient failures get one more try (two attempts total).
            logger.warning(
                f"ServiceBus readiness check #{3 - attempts} failed; trying again."
            )
            logger.error(f"{traceback.format_exc()}")
            attempts -= 1

    logger.error("ServiceBus readiness check failed. Not ready.")
    return False
send_message(msg, delay=0, unique_msg_id=None, **msg_kwargs) async

Schedule a message for delivery using a pooled sender connection.

Parameters:

Name Type Description Default
msg str

Message body to send.

required
delay int

Delay in seconds before the message is available for delivery.

0
unique_msg_id str | None

Optional unique Service Bus message_id used for deduplication.

None
**msg_kwargs

Additional keyword arguments forwarded directly to :class:azure.servicebus.ServiceBusMessage (e.g. content_type, correlation_id, subject, partition_key, session_id, reply_to, time_to_live).

{}
Source code in aio_azure_clients_toolbox/clients/service_bus.py
@connection_pooling.send_time_deco(logger, "ServiceBus.send_message")
async def send_message(self, msg: str, delay: int = 0, unique_msg_id: str | None = None, **msg_kwargs):
    """Schedule a message for delivery using a pooled sender connection.

    Args:
        msg:
            Message body to send.
        delay:
            Delay in seconds before the message is available for delivery.
        unique_msg_id:
            Optional unique Service Bus ``message_id`` used for deduplication.
        **msg_kwargs:
            Additional keyword arguments forwarded directly to
            :class:`azure.servicebus.ServiceBusMessage` (e.g.
            ``content_type``, ``correlation_id``, ``subject``,
            ``partition_key``, ``session_id``, ``reply_to``,
            ``time_to_live``).

    On connection/auth failures the pooled sender is expired (not reused)
    and the exception is re-raised.
    """
    message = ServiceBusMessage(msg, message_id=unique_msg_id, **msg_kwargs)
    now = datetime.datetime.now(tz=datetime.UTC)
    scheduled_time_utc = now + datetime.timedelta(seconds=delay)
    # Borrow a pooled sender; it is returned to the pool on exit.
    async with self.pool.get(**self.pool_kwargs) as conn:
        try:
            await cast(SendClientCloseWrapper, conn).schedule_messages(
                message, scheduled_time_utc
            )
        except (
            ServiceBusCommunicationError,
            ServiceBusAuthorizationError,
            ServiceBusAuthenticationError,
            ServiceBusConnectionError,
        ):
            # Connection-level failures poison the pooled sender; expire it.
            logger.exception(
                f"ServiceBus.send_message failed. Expiring connection: {traceback.format_exc()}"
            )
            await self.pool.expire_conn(conn)
            raise