Multi-process Server

Based on Async Context Manager, this module provides automated lifecycle management for multi-process servers, with explicit initialization and graceful shutdown steps.

server(func)

A decorator wrapper for AsyncServerContextManager.

Usage example:

import signal

import aiotools

@aiotools.server
async def myserver(loop, pidx, args):
    await do_init(args)
    stop_sig = yield
    if stop_sig == signal.SIGINT:
        await do_graceful_shutdown()
    else:
        await do_forced_shutdown()

aiotools.start_server(myserver, ...)

class AsyncServerContextManager(func: Callable[[...], Any], args, kwargs)

A modified version of contextlib.asynccontextmanager().

The implementation is mostly taken from the contextlib standard library, with a minor change to inject self.yield_return into the wrapped async generator.

yield_return: Optional[signal.Signals]
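
To illustrate how a value can be injected at the yield point, here is a minimal sketch using only the standard library. YieldInjectingContext, worker, and demo are hypothetical names; this is not the actual aiotools implementation, and propagation of exceptions into the generator is omitted for brevity.

```python
import asyncio
import signal


class YieldInjectingContext:
    """Hypothetical, simplified stand-in for AsyncServerContextManager."""

    def __init__(self, func, args, kwargs):
        self._agen = func(*args, **kwargs)
        self.yield_return = None  # set by the caller before the context exits

    async def __aenter__(self):
        # Run the generator up to its first `yield` (the initialization steps).
        return await self._agen.__anext__()

    async def __aexit__(self, exc_type, exc, tb):
        try:
            # Resume the generator; its `yield` evaluates to self.yield_return.
            await self._agen.asend(self.yield_return)
        except StopAsyncIteration:
            return False  # the generator finished its shutdown steps
        raise RuntimeError("async generator didn't stop")


async def worker(log):
    log.append("init")
    stop_sig = yield
    log.append("graceful" if stop_sig == signal.SIGINT else "forced")


async def demo(stop_sig):
    log = []
    ctx = YieldInjectingContext(worker, (log,), {})
    async with ctx:
        # In a real server, a signal handler would record the received signal.
        ctx.yield_return = stop_sig
    return log
```

Calling asyncio.run(demo(signal.SIGINT)) drives the worker through its graceful branch, while any other signal takes the forced branch.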

exception InterruptedBySignal

A new BaseException that represents interruption by an arbitrary UNIX signal.

Since this is a BaseException instead of an Exception, it behaves like the KeyboardInterrupt and SystemExit exceptions (i.e., it bypasses except clauses that catch only the Exception type).

The first argument of this exception is the signal number received.
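
The practical effect of deriving from BaseException can be sketched with a local stand-in class. The class below is defined only so that the snippet runs without aiotools installed, and run_guarded is a hypothetical helper.

```python
import signal


class InterruptedBySignal(BaseException):
    """Local stand-in that mirrors the documented behavior."""


def run_guarded(signum):
    try:
        try:
            raise InterruptedBySignal(signum)
        except Exception:
            # Not reached: the exception is not a subclass of Exception.
            return ("swallowed", None)
    except InterruptedBySignal as e:
        # The first argument carries the signal number.
        return ("propagated", e.args[0])
```

A broad "except Exception" handler inside worker code therefore does not accidentally suppress the interruption.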

class ServerMainContextManager(func, args, kwargs)

A modified version of contextlib.contextmanager().

The implementation is mostly taken from the contextlib standard library, with a minor change to inject self.yield_return into the wrapped generator.

yield_return: Optional[signal.Signals]

main(func)

A decorator wrapper for ServerMainContextManager.

Usage example:

import signal

import aiotools

@aiotools.main
def mymain():
    server_args = do_init()
    stop_sig = yield server_args
    if stop_sig == signal.SIGINT:
        do_graceful_shutdown()
    else:
        do_forced_shutdown()

aiotools.start_server(..., main_ctxmgr=mymain, ...)

start_server(worker_actxmgr: typing.Callable[[asyncio.events.AbstractEventLoop, int, typing.Sequence[typing.Any]], aiotools.server.AsyncServerContextManager], main_ctxmgr: typing.Optional[typing.Callable[[], aiotools.server.ServerMainContextManager]] = None, extra_procs: typing.Iterable[typing.Callable] = (), stop_signals: typing.Iterable[signal.Signals] = (signal.SIGINT, signal.SIGTERM), num_workers: int = 1, args: typing.Iterable[typing.Any] = (), wait_timeout: typing.Optional[float] = None) -> None

Starts a multi-process server where each process has its own asyncio event loop. Their lifecycles are managed automatically: if the main program receives one of the signals specified in stop_signals, it initiates the shutdown routine on each worker, which stops that worker's event loop gracefully.

Parameters
  • worker_actxmgr

    An asynchronous context manager that dictates the initialization and shutdown steps of each worker. It should accept the following three arguments:

    • loop: the asyncio event loop created and set by aiotools

    • pidx: the 0-based index of the worker (use this for per-worker logging)

    • args: a concatenated tuple of values yielded by main_ctxmgr and the user-defined arguments in args.

    aiotools automatically installs an interruption handler that calls loop.stop() on the given event loop, regardless of whether threading or multiprocessing is used.

  • main_ctxmgr – An optional context manager that performs global initialization and shutdown steps for the whole program. It may yield one or more values to be passed to worker processes along with the args passed to this function. No arguments are passed to this context manager, since you can directly access sys.argv to parse command-line arguments and/or read user configurations.

  • extra_procs

    An iterable of functions that run as extra processes whose lifecycles are synchronized with those of the workers. They should set up their own signal handlers.

    Each function should accept the following three arguments:

    • intr_event: always None; kept for backward compatibility

    • pidx: same as the worker_actxmgr argument

    • args: same as the worker_actxmgr argument

  • stop_signals – An iterable of UNIX signals that the main program recognizes as termination signals.

  • num_workers – The number of child workers.

  • args – The user-defined arguments passed to workers and extra processes. If main_ctxmgr yields one or more values, they are prepended to these user arguments when passed to workers and extra processes.

  • wait_timeout – The timeout in seconds before forcibly killing all remaining child processes after sending initial stop signals.

Returns

None
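
The way worker arguments are composed from main_ctxmgr and args, as described in the parameters above, can be sketched as follows. build_worker_args is a hypothetical helper, not part of aiotools; in particular, how a single non-tuple value or a bare yield is normalized is an assumption of this sketch.

```python
def build_worker_args(main_yielded, user_args):
    """Prepend the values yielded by main_ctxmgr to the user-defined args."""
    if main_yielded is None:
        # Assumption: a bare `yield` contributes no extra arguments.
        main_args = ()
    elif isinstance(main_yielded, tuple):
        main_args = main_yielded
    else:
        # Assumption: a single yielded value is treated as a one-item tuple.
        main_args = (main_yielded,)
    return main_args + tuple(user_args)
```

Under these assumptions, a main_ctxmgr that yields ("cfg", "db") combined with args=(8080,) gives each worker the tuple ("cfg", "db", 8080).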

Changed in version 0.3.2: The num_proc argument was renamed to num_workers. Even if num_workers is 1, a child is created instead of doing everything in the main thread.

New in version 0.3.2: The extra_procs and main_ctxmgr arguments.

New in version 0.4.0: Now supports the use of threading instead of multiprocessing via the use_threading option.

Changed in version 0.8.0: Now worker_actxmgr must be an instance of AsyncServerContextManager or an async generator decorated with @aiotools.server.

Now main_ctxmgr must be an instance of ServerMainContextManager or a plain generator decorated with @aiotools.main.

The usage is the same as for asynchronous context managers, but you can optionally distinguish the received stop signal by retrieving the return value of the yield statement.

In extra_procs in non-threaded mode, stop signals are converted into one of the KeyboardInterrupt, SystemExit, or InterruptedBySignal exceptions.

New in version 0.8.4: The start_method argument can be set to change the subprocess spawning implementation.

Deprecated since version 1.2.0: The start_method and use_threading arguments are deprecated in favor of the new afork() function, which provides better synchronization and pid-fd support.

Changed in version 1.2.0: The extra_procs are now always separate processes because use_threading is deprecated, so the intr_event argument is always None.

New in version 1.5.5: The wait_timeout argument.