Source code for aiotools.context

"""
Provides an implementation of asynchronous context manager and its applications.

.. note::

   The async context managers in this module are transparent aliases to
   ``contextlib.asynccontextmanager`` of the standard library in Python 3.7
   and later.
"""

import asyncio
import contextlib
from contextvars import ContextVar
from typing import (
    Generic,
    Iterable,
    List,
    Optional,
    TypeVar,
)

from .types import AsyncClosable

__all__ = [
    "resetting",
    "AsyncContextManager",
    "async_ctx_manager",
    "actxmgr",
    "aclosing",
    "closing_async",
    "AsyncContextGroup",
    "actxgroup",
    "AsyncExitStack",
]


T = TypeVar("T")
T_AsyncClosable = TypeVar("T_AsyncClosable", bound=AsyncClosable)

AbstractAsyncContextManager = contextlib.AbstractAsyncContextManager
AsyncContextManager = contextlib._AsyncGeneratorContextManager
AsyncExitStack = contextlib.AsyncExitStack
async_ctx_manager = contextlib.asynccontextmanager
aclosing = contextlib.aclosing


class resetting(Generic[T]):
    """
    An extra context manager to auto-reset the given context variable.
    It supports both the standard contextmanager protocol and the
    async-contextmanager protocol.

    .. versionadded:: 1.8.0
    """

    def __init__(self, ctxvar: ContextVar[T], value: T) -> None:
        self._ctxvar = ctxvar
        self._value = value

    def __enter__(self) -> None:
        self._token = self._ctxvar.set(self._value)

    async def __aenter__(self) -> None:
        self._token = self._ctxvar.set(self._value)

    def __exit__(self, *exc_info) -> Optional[bool]:
        self._ctxvar.reset(self._token)
        return None

    async def __aexit__(self, *exc_info) -> Optional[bool]:
        self._ctxvar.reset(self._token)
        return None


[docs]class closing_async(Generic[T_AsyncClosable]): """ An analogy to :func:`contextlib.closing` for objects defining the ``close()`` method as an async function. .. versionadded:: 1.5.6 """ def __init__(self, thing: T_AsyncClosable) -> None: self.thing = thing async def __aenter__(self) -> T_AsyncClosable: return self.thing async def __aexit__(self, *exc_info) -> Optional[bool]: await self.thing.close() return None
[docs]class AsyncContextGroup: """ Merges a group of context managers into a single context manager. Internally it uses :func:`asyncio.gather()` to execute them with overlapping, to reduce the execution time via asynchrony. Upon entering, you can get values produced by the entering steps from the passed context managers (those ``yield``-ed) using an ``as`` clause of the ``async with`` statement. After exits, you can check if the context managers have finished successfully by ensuring that the return values of ``exit_states()`` method are ``None``. .. note:: You cannot return values in context managers because they are generators. If an exception is raised before the ``yield`` statement of an async context manager, it is stored at the corresponding manager index in the as-clause variable. Similarly, if an exception is raised after the ``yield`` statement of an async context manager, it is stored at the corresponding manager index in the ``exit_states()`` return value. Any exceptions in a specific context manager does not interrupt others; this semantic is same to ``asyncio.gather()``'s when ``return_exceptions=True``. This means that, it is user's responsibility to check if the returned context values are exceptions or the intended ones inside the context body after entering. :param context_managers: An iterable of async context managers. If this is ``None``, you may add async context managers one by one using the :meth:`~.add` method. """ def __init__( self, context_managers: Optional[Iterable[AbstractAsyncContextManager]] = None ): # noqa self._cm = list(context_managers) if context_managers else [] self._cm_yields: List[asyncio.Task] = [] self._cm_exits: List[asyncio.Task] = []
[docs] def add(self, cm): """ TODO: fill description """ self._cm.append(cm)
async def __aenter__(self): # Exceptions in context managers are stored into _cm_yields list. # NOTE: There is no way to "skip" the context body even if the entering # process fails. self._cm_yields[:] = await asyncio.gather( *(e.__aenter__() for e in self._cm), return_exceptions=True ) return self._cm_yields async def __aexit__(self, *exc_info): # Clear references to context variables. self._cm_yields.clear() # Exceptions are stored into _cm_exits list. self._cm_exits[:] = await asyncio.gather( *(e.__aexit__(*exc_info) for e in self._cm), return_exceptions=True )
[docs] def exit_states(self): """ TODO: fill description """ return self._cm_exits
# Shorter aliases actxmgr = async_ctx_manager actxgroup = AsyncContextGroup