Source code for aiotools.func

import collections
import functools
from typing import Optional

from .compat import get_running_loop

__all__ = (

_CacheEntry = collections.namedtuple("_CacheEntry", "value expire_at")

[docs]def apartial(coro, *args, **kwargs): """ Wraps a coroutine function with pre-defined arguments (including keyword arguments). It is an asynchronous version of :func:`functools.partial`. """ @functools.wraps(coro) async def wrapped(*cargs, **ckwargs): return await coro(*args, *cargs, **kwargs, **ckwargs) return wrapped
[docs]def lru_cache( maxsize: int = 128, typed: bool = False, expire_after: Optional[float] = None, ): """ A simple LRU cache just like :func:`functools.lru_cache`, but it works for coroutines. This is not as heavily optimized as :func:`functools.lru_cache` which uses an internal C implementation, as it targets async operations that take a long time. It follows the same API that the standard functools provides. The wrapped function has ``cache_clear()`` method to flush the cache manually, but leaves ``cache_info()`` for statistics unimplemented. Note that calling the coroutine multiple times with the same arguments before the first call returns may incur duplicate executions. This function is not thread-safe. Args: maxsize: The maximum number of cached entries. typed: Cache keys in different types separately (e.g., ``3`` and ``3.0`` will be different keys). expire_after: Re-calculate the value if the configured time has passed even when the cache is hit. When re-calculation happens the expiration timer is also reset. """ if maxsize is not None and not isinstance(maxsize, int): raise TypeError("Expected maxsize to be an integer or None") def wrapper(coro): sentinel = object() # unique object to distinguish None as result cache = collections.OrderedDict() cache_get = cache.get cache_del = cache.__delitem__ cache_set = cache.__setitem__ cache_len = cache.__len__ cache_move = cache.move_to_end make_key = functools._make_key # We don't use explicit locks like the standard functools, # because this lru_cache is intended for use in asyncio coroutines. # The only context interleaving happens when calling the user-defined # coroutine, so there is no need to add extra synchronization guards. @functools.wraps(coro) async def wrapped(*args, **kwargs): now = get_running_loop().time() k = make_key(args, kwargs, typed) entry = cache_get(k, sentinel) if entry is not sentinel: if entry.expire_at is None: return entry.value if entry.expire_at >= now: return entry.value cache_del(k) result = await coro(*args, **kwargs) if maxsize is not None and cache_len() >= maxsize: cache.popitem(last=False) if expire_after is not None: expire_at = now + expire_after else: expire_at = None cache_set(k, _CacheEntry(result, expire_at)) cache_move(k, last=True) return result def cache_clear(): cache.clear() def cache_info(): raise NotImplementedError wrapped.cache_clear = cache_clear wrapped.cache_info = cache_info return wrapped return wrapper