Skip to content

Connection Pooling API Reference

API documentation for the connection pooling module.

aio_azure_clients_toolbox.connection_pooling

connection_pooling.py

The idea for our Azure clients is that unlike database connections all of our clients can be reused across multiple requesters.

What we really need to achieve is the following:

  • A "pool" of connections, where
  • Each connection may be shared by more than 1 requester, and
  • Each connection has an idle lifespan.

The latter is the most important because Azure will enforce idle timeouts for all sockets. For this reason, we will do the following: - lazily create connections as needed - share connections between many requesters - put connections back into an idle data structure when necessary - when connections are dead (exception or idle timeout) then we'll lock and recreate - when a connection has exceeded its "share" amount, we'll lock and create a new one.

ConnectionPool(connector, client_limit=DEFAULT_SHARED_TRANSPORT_CLIENT_LIMIT, max_size=DEFAULT_MAX_SIZE, max_idle_seconds=DEFAULT_CONNECTION_MAX_IDLE_SECONDS, max_lifespan_seconds=None)

Our goal here is to allow many clients to share connections, but to expire them when they've reached their idle time limits.

Most clients can call this with the default values below.

Parameters:

Name Type Description Default
connector AbstractorConnector

An instance of an AbstractConnector for creating connections.

required
client_limit int

The max clients allowed per connection (default: 100).

DEFAULT_SHARED_TRANSPORT_CLIENT_LIMIT
max_size int

The max size for the connection pool or max connections held (default: 10).

DEFAULT_MAX_SIZE
max_idle_seconds int

Maximum duration allowed for an idle connection before recylcing it.

DEFAULT_CONNECTION_MAX_IDLE_SECONDS
max_lifespan_seconds int | None

Optional setting which controls how long a connection live before recycling.

None
Source code in aio_azure_clients_toolbox/connection_pooling.py
def __init__(
    self,
    connector: AbstractorConnector,
    client_limit: int = DEFAULT_SHARED_TRANSPORT_CLIENT_LIMIT,
    max_size: int = DEFAULT_MAX_SIZE,
    max_idle_seconds: int = DEFAULT_CONNECTION_MAX_IDLE_SECONDS,
    max_lifespan_seconds: int | None = None,
):
    # Each shared connection allows up to this many connections
    self.client_limit = client_limit
    # Pool's max size
    self.max_size = max_size
    if self.max_size < 1:
        raise ValueError("max_size must a postive integer")
    self.connector = connector

    # A pool is just a heap of connection-managing things
    # All synchronization primitives are in the connections
    self._pool = [
        SharedTransportConnection(
            self.connector,
            client_limit=self.client_limit,
            max_idle_seconds=max_idle_seconds,
            max_lifespan_seconds=max_lifespan_seconds,
        )
        for _ in range(self.max_size)
    ]
    heapq.heapify(self._pool)
    self.max_lifespan_seconds = max_lifespan_seconds

closeall() async

Close all connections

Source code in aio_azure_clients_toolbox/connection_pooling.py
async def closeall(self) -> None:
    """Close all connections"""
    async with create_task_group() as tg:
        for conn in self._pool:
            tg.start_soon(conn.close)

expire_conn(connection) async

Expire a connection. Because we yield AbstractConnections while our pool is SharedTransportConnections, we need to give clients a way to look up a connection and expire it directly.

Source code in aio_azure_clients_toolbox/connection_pooling.py
async def expire_conn(self, connection: AbstractConnection) -> None:
    """
    Expire a connection.
    Because we yield AbstractConnections while our pool is SharedTransportConnections,
    we need to give clients a way to look up a connection and expire it directly.
    """
    for conn in self._pool:
        if conn.eq(connection):
            await conn.close()
            break
    heapq.heapify(self._pool)
    return None

get(timeout=60.0, acquire_timeout=10.0) async

Pull out an idle connection.

The binary heap allows us to always pull out the youngest connection, which is the one most likely to connect without issues. This relies on the less-than/greater-than implementation above.

Throws: ConnectionsExhausted if too many connections opened.

Source code in aio_azure_clients_toolbox/connection_pooling.py
@asynccontextmanager
async def get(
    self,
    timeout: float = 60.0,
    acquire_timeout: float = 10.0,
) -> AsyncGenerator[AbstractConnection, None]:
    """
    Pull out an idle connection.

    The binary heap allows us to always pull out the *youngest*
    connection, which is the one most likely to connect without issues.
    This relies on the less-than/greater-than implementation above.

    Throws: `ConnectionsExhausted` if too many connections opened.
    """
    connection_reached = False
    total_time: float = 0
    # We'll loop almost all connections in the pool to find a candidate
    # We add one in case it's zero.
    now = time.monotonic()
    conn_check_n = math.floor(self.max_size // 1.25) or 1
    while not connection_reached and total_time < timeout:
        for shareable_conn in heapq.nsmallest(conn_check_n, self._pool):
            if shareable_conn.available:
                logger.debug(
                    f"[ConnPool] Acquiring {shareable_conn} with "
                    f"active clients={shareable_conn.current_client_count}"
                )
                async with shareable_conn.acquire(timeout=acquire_timeout) as conn:
                    # Do not yield connections that should be closed
                    if conn is not None and not shareable_conn.should_close:
                        yield conn
                        connection_reached = True
                        break
            elif shareable_conn.closeable:
                logger.debug(f"[ConnPool] {shareable_conn} Closing expired connection")
                await shareable_conn.close()

        # Arbitrary async yield to avoid busy loop
        await anyio.sleep(0.002)
        latest = time.monotonic()
        total_time += latest - now

    if connection_reached:
        heapq.heapify(self._pool)
    else:
        raise ConnectionsExhausted(
            "No connections available: consider using a larger value for `client_limit`"
        )

SharedTransportConnection(connector, client_limit=DEFAULT_SHARED_TRANSPORT_CLIENT_LIMIT, max_lifespan_seconds=None, max_idle_seconds=DEFAULT_CONNECTION_MAX_IDLE_SECONDS)

Each connection can be shared by many clients.

The problem we need to solve for most pressingly is idle timeouts, but we also have problems around opening, establishing, and closing connections.

Thus, each connection has the following lifecycle phases:

  • Closed
  • Open and not ready
  • Open and ready

These are also the critical sections of work, so transitioning from one phase to another involves locking the resource.

The problem is that when a client first attempts to use a connection, it calls one of the network-communication methods, and at that point, the connection is established. To other clients who are awaiting their turn, the connection already looks open, so they may try to use it early and fail. The same problem happens on closing: one client closes while another still thinks the connection is live.

Outside of this, after we have sent for the first time, we're fine to share the connection as much as we want. Thus, we need to lock out all clients during its critical sections of work:

  • Lock this connection when opening an underlying connection
  • Lock this connection when establishing "readiness" (first usage)
  • Lock this connection when closing an underlying connection

At all other points we can share it a whole lot (between 100 clients or more). To see what's happening, enable debug logs: logger.getLogger("aio_azure_clients_toolbox").setLevel(logging.DEBUG)

Footnote: most Azure sdk clients use aiohttp shared transports below the surface which actually has threadpooling with up to 100 connections. We wanted something more generic, though, which is why this class exists.

Azure Python SDK has an example of a shared transport for Azure clients but we wanted to start simpler and more agnostic here:

https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/core/azure-core/samples/example_shared_transport_async.py#L51-L71

Based on the Azure example, though, we should easily be able to share one of these "connection" objects with 100 requesters.

Parameters:

Name Type Description Default
connector AbstractorConnector

An instance of an AbstractConnector for creating connections.

required
client_limit int

The max clients allowed per connection (default: 100).

DEFAULT_SHARED_TRANSPORT_CLIENT_LIMIT
max_idle_seconds int

Maximum duration allowed for an idle connection before recylcing it.

DEFAULT_CONNECTION_MAX_IDLE_SECONDS
max_lifespan_seconds int | None

Optional setting which controls how long a connection live before recycling.

None
Source code in aio_azure_clients_toolbox/connection_pooling.py
def __init__(
    self,
    connector: AbstractorConnector,
    client_limit: int = DEFAULT_SHARED_TRANSPORT_CLIENT_LIMIT,
    max_lifespan_seconds: int | None = None,
    max_idle_seconds: int = DEFAULT_CONNECTION_MAX_IDLE_SECONDS,
) -> None:
    # When was this connection created
    self.connection_created_ts: int | None = None
    # When did this connection last become idle
    self.last_idle_start: int | None = None
    # What's the max lifespan (idle or active) allowed for this connection
    if max_lifespan_seconds is None:
        # If `None`, this feature is disabled
        self.max_lifespan_ns = None
    else:
        self.max_lifespan_ns = max_lifespan_seconds * NANO_TIME_MULT

    self.max_clients_allowed = client_limit
    # What's the max idle time allowed for this connection
    self.max_idle_ns = max_idle_seconds * NANO_TIME_MULT
    # How many clients are allowed
    self.client_limiter = anyio.CapacityLimiter(total_tokens=client_limit)
    # Is the connection ready to close
    self._should_close = False

    self._connector: AbstractorConnector = connector
    self._connection: AbstractConnection | None = None
    self._open_close_lock: anyio.Lock = anyio.Lock()
    self._id = (binascii.hexlify(os.urandom(3))).decode()
    self._ready = anyio.Event()

available property

Check if connection exists and client usage limit has been reached

closeable property

Check if connection can be closed (no clients using it)

expired property

Calculate if connection has been idle or active longer than allowed

is_ready property

Proxy for whether our readiness Event has been set.

lifetime property

Check the lifetime of this object (in nanos)

should_close property

Check if connection should be closed

time_spent_idle property

Check the idle time of this object (in nanos)

acquire(timeout=10.0) async

Acquire a connection with a timeout

Source code in aio_azure_clients_toolbox/connection_pooling.py
@asynccontextmanager
async def acquire(
    self, timeout: float = 10.0
) -> AsyncGenerator[AbstractConnection | None, None]:
    """Acquire a connection with a timeout"""
    acquired_conn = None
    async with self.client_limiter:
        async with create_task_group():
            with move_on_after(timeout) as scope:
                acquired_conn = await self.checkout()

        # If this were nested under `create_task_group` then any exceptions
        # get thrown under `BaseExceptionGroup`, which is surprising for clients.
        # See: https://github.com/agronholm/anyio/issues/141
        if not scope.cancelled_caught and acquired_conn:
            try:
                yield acquired_conn
            finally:
                await self.checkin(acquired_conn)
        else:
            yield None
            await self.checkin(None)

check_readiness() async

Indicates when ready by waiting for the connector to signal

Source code in aio_azure_clients_toolbox/connection_pooling.py
async def check_readiness(self) -> None:
    """Indicates when ready by waiting for the connector to signal"""
    if self._ready.is_set():
        return None

    # We use a lock when making sure the connection is ready
    # Our goal is to set readiness Event once for one client.
    if self._connection:
        async with self._open_close_lock:
            logger.debug(f"[check_readiness {self}] Setting readiness")
            is_ready = await self._connector.ready(self._connection)
            if is_ready:
                self._ready.set()
            else:
                raise ConnectionFailed(f"{self} Failed readying connection")

checkin(conn=None) async

Called after a connection has been used

Source code in aio_azure_clients_toolbox/connection_pooling.py
async def checkin(self, conn: AbstractConnection | None = None) -> None:
    """Called after a connection has been used"""

    # we only consider idle time to start when *one* client is connected
    if self.current_client_count == 1 and conn is not None:
        logger.debug(f"[checkin {self}] is now idle")
        self.last_idle_start = time.monotonic_ns()

    # Check if TTL exceeded for this connection
    if self.max_lifespan_ns is not None and self.expired:
        logger.debug(f"[checkin {self}] Marking connection to be closed after use")
        self._should_close = True

    logger.debug(f"[checkin {self}] current_client_count is now {self.current_client_count - 1}")

checkout() async

This function has the important job of keeping track of last_idle_start and making sure a connection has been established and that it is ready.

Must be followed by checkin!

Source code in aio_azure_clients_toolbox/connection_pooling.py
async def checkout(self) -> AbstractConnection:
    """
    This function has the important job of keeping
    track of `last_idle_start` and making sure a connection has been
    established and that it is ready.

    Must be followed by checkin!
    """
    # Bookkeeping: we want to know how long it takes to acquire a connection
    now = time.monotonic_ns()

    if self.expired and self.current_client_count == 1:
        logger.debug(f"[checkout {self}] Retiring Connection past its lifespan")
        # This may look surprising, but `close()` may take a while and we want to discourage
        # checkouts *while we are closing this client*.
        self._should_close = True
        await self.close()

    if not self._connection:
        self._connection = await self.create()
        # Make sure connection is ready
        # one thing we could do here is yield the connection and set our event after
        # the *first* successful usage, but defining that success is tougher...?
        await self.check_readiness()

    self.last_idle_start = None

    # We do not want to use locks here because it creates a lot of lock contention,
    # We *only* need to wait for the first successful connection to indicate readiness
    await self._ready.wait()

    # Debug timings to reveal the extent of lock contention
    logger.debug(
        f"[checkout {self}] available in {time.monotonic_ns() - now}ns. "
        f"Active client count: {self.current_client_count}"
    )
    return self._connection

close() async

Closes the connection

Source code in aio_azure_clients_toolbox/connection_pooling.py
async def close(self) -> None:
    """Closes the connection"""
    if self._connection is None:
        return None

    # We use a lock on *closing* a connection
    async with self._open_close_lock:
        logger.debug(f"[close {self}] Closing the Connection")
        try:
            await self._connection.close()
        except Exception:
            pass

        # Reset attributes to initial state
        self._should_close = False
        self._connection = None
        self._ready = anyio.Event()
        self.last_idle_start = None
        if self.max_lifespan_ns is not None:
            self.connection_created_ts = None

create() async

Establishes the connection or reuses existing if already created.

Source code in aio_azure_clients_toolbox/connection_pooling.py
async def create(self) -> AbstractConnection:
    """Establishes the connection or reuses existing if already created."""
    if self._connection:
        return self._connection

    # We use a lock on *opening* a connection
    async with self._open_close_lock:
        logger.debug(f"[create {self}] Creating a new connection")
        self._connection = await self._connector.create()
        # Check if we need to expire connections based on lifespan
        if self.max_lifespan_ns is not None:
            self.connection_created_ts = time.monotonic_ns()

        return self._connection

AbstractorConnector

Bases: ABC

AbstractConnection

Bases: ABC

ConnectionsExhausted

Bases: ValueError

ConnectionFailed

Bases: ConnectionError