3 essential async patterns for building a Python service

Learn how to avoid the most common pitfalls when creating a service

A service is an application that runs forever and reacts to different kinds of events. For example, a website or a web service receives HTTP requests and sends back responses in various formats (HTML, JSON, etc.). In the connectors-python project, the service pulls work from Elasticsearch, fetches data from some source to inject into Elasticsearch, and goes back to an idle state.

The structure of such a service using modern Python is based on starting a global task via an async function that loops forever and idles when it’s not triggering some work. It’s a pretty straightforward pattern and offers a simple model: whenever something needs to happen in that main function, it’s deferred to another asynchronous task, and life goes on.

There are a few considerations to make such an application work well. We want to be sure to do the following:

  • Exit in a clean and fast way
  • Avoid task explosion
  • Harness your memory usage

Projects like Trio, AnyIO, and to some extent, some features in Curio, are addressing these topics in various ways, but this blog post focuses on implementing solutions using vanilla Python.

Make a graceful and fast exit

Let’s consider the following example, where the main task checks for some work to do and then sleeps for a minute by calling asyncio.sleep(). It implements a clean signal handler to do some cleanups and toggle the running flag for a graceful termination:

import asyncio
import signal


async def main():
    loop = asyncio.get_running_loop()
    running = True

    def shutdown():
        nonlocal running
        # cleanup work
        running = False  # will end the loop

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, shutdown)

    while running:         
        await check_and_execute_work()
        await asyncio.sleep(60)

asyncio.run(main())

But this implementation will make your service unresponsive for up to a minute when handling the shutdown signals if it is waiting for sleep to be over. This can be a minor annoyance, but in some execution environments, it can be a bigger problem. You could also have concurrent sleep calls in other tasks in your application, which will slow down a clean termination because you will have to wait for all tasks to finish. 

To avoid the problem, we need to be able to immediately cancel all active sleeps. A simple pattern is to build an asyncio.sleep factory that keeps track of all sleep tasks and can cancel them if needed. 

Below is a class we use in our project:

class CancellableSleeps:
    def __init__(self):
        self._sleeps = set()

    async def sleep(self, delay, result=None, *, loop=None):
        async def _sleep(delay, result=None, *, loop=None):
            coro = asyncio.sleep(delay, result=result)
            task = asyncio.ensure_future(coro)
            self._sleeps.add(task)
            try:
                return await task
            except asyncio.CancelledError:
                print("Sleep canceled")
                return result
            finally:
                self._sleeps.remove(task)

        return await _sleep(delay, result=result, loop=loop)

    def cancel(self):
        for task in self._sleeps:
            task.cancel()

This class keeps track of all running asyncio.sleep() tasks and provides a way to cancel them immediately. To use it in the service, you must create one class instance and have all the code use its sleep() method instead of asyncio.sleep().

Below is the modified version of the main() function that uses the CancellableSleeps class:

async def main():
    sleeps = CancellableSleeps()
    loop = asyncio.get_running_loop()
    running = True

    def shutdown():
        nonlocal running
        # cleanup work
        running = False  # will end the loop
        sleeps.cancel() # cancel all running sleep tasks

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, shutdown)

    while running:
        await check_and_execute_work()
        await sleeps.sleep(60)

The shutdown() function will toggle the running flag and cancel all sleeps. That same sleeps object could be passed around and used by all tasks that need to run a sleep task.
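As an illustration of sharing one sleeps object, here is a minimal sketch in which two workers sleep through the same instance and are woken together. The worker() and demo() functions, the state dictionary, and the log list are hypothetical names made up for this example; the class is a compact variant of the one above, repeated only to keep the snippet self-contained.

```python
import asyncio


class CancellableSleeps:
    """Compact variant of the class above, repeated to keep the sketch runnable."""

    def __init__(self):
        self._sleeps = set()

    async def sleep(self, delay):
        task = asyncio.ensure_future(asyncio.sleep(delay))
        self._sleeps.add(task)
        try:
            await task
        except asyncio.CancelledError:
            pass  # woken up early by cancel()
        finally:
            self._sleeps.discard(task)

    def cancel(self):
        for task in list(self._sleeps):
            task.cancel()


async def worker(name, sleeps, state, log):
    # Hypothetical worker: loops until the shared flag is cleared
    while state["running"]:
        log.append(f"{name} working")
        await sleeps.sleep(60)
    log.append(f"{name} stopped")


async def demo():
    sleeps = CancellableSleeps()
    state = {"running": True}
    log = []
    workers = [
        asyncio.create_task(worker(name, sleeps, state, log))
        for name in ("a", "b")
    ]
    await asyncio.sleep(0.1)   # let both workers start and go to sleep
    state["running"] = False   # what shutdown() would do
    sleeps.cancel()            # wake every sleeping worker at once
    await asyncio.gather(*workers)
    return log
```

Calling asyncio.run(demo()) should return in roughly a tenth of a second rather than a minute, because both 60-second sleeps are cancelled by the single cancel() call.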

Note that this shared cancellation pattern is built into Trio if you use that library (see Trio’s core functionality).

Pool your tasks

A caller has two possible strategies to execute a task by calling an async function. It can wait for its termination with the await keyword, which makes the caller itself block for the result. The second option is to use a fire-and-forget strategy, where the caller adds a task in the event loop and continues with its life without waiting for the task to finish.
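The two strategies can be sketched side by side. This is a minimal illustration with made-up names (work(), caller(), and the log list are not part of the project); it keeps a reference to the background task in a set, as the asyncio documentation recommends, so that the task is not garbage-collected before it finishes.

```python
import asyncio


async def work(name, log):
    await asyncio.sleep(0.01)
    log.append(f"{name} done")
    return name


async def caller():
    log = []
    background = set()  # keep references to fire-and-forget tasks

    # Strategy 1: await. The caller is suspended until work() returns.
    result = await work("awaited", log)
    log.append(f"caller got {result}")

    # Strategy 2: fire-and-forget. The task is scheduled and the caller moves on.
    task = asyncio.create_task(work("background", log))
    background.add(task)
    task.add_done_callback(background.discard)
    log.append("caller continues")

    # Before exiting, wait for whatever is still running
    await asyncio.gather(*background)
    return log
```

In this sketch, "caller continues" is logged before "background done", because the background task only starts running once the caller yields control to the event loop.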

The second pattern is handy when you need concurrency from within an async function — for instance, when your code needs to send updates to Elasticsearch and you know you can do it through many concurrent requests.

A possible function that is doing this could be:

async def send_data():
    tasks = []
    while have_data:
        batch = await get_batch_of_data()
        t = asyncio.create_task(send_data_to_es(batch))
        tasks.append(t)
        # drop the task from the list once it is done
        t.add_done_callback(tasks.remove)

    # make sure we wait for all tasks to end
    await asyncio.gather(*tasks)

This loop allows several send_data_to_es() tasks to run concurrently, but if get_batch_of_data() is much faster than send_data_to_es(), you’ve just built a task bomb! Calls to Elasticsearch are going to pile up and eventually time out. This may also lead to memory issues if the batches use a lot of memory.

To avoid this problem, send_data_to_es() tasks need to be throttled, and a generic way to do it is to build a pool of tasks with an upper limit on concurrency.

Below is a class that we use for this throttling:

import asyncio
import functools


class ConcurrentTasks:
    def __init__(self, max_concurrency=5,
                 results_callback=None):
        self.max_concurrency = max_concurrency
        self.tasks = []
        self.results_callback = results_callback
        self._task_over = asyncio.Event()

    def __len__(self):
        return len(self.tasks)

    def _callback(self, task, result_callback=None):
        self.tasks.remove(task)
        self._task_over.set()
        if task.exception():
            raise task.exception()
        if result_callback is not None:
            result_callback(task.result())
        # global callback
        if self.results_callback is not None:
            self.results_callback(task.result())

    async def put(self, coroutine, result_callback=None):
        # If self.tasks has reached its max size
        # we wait for one task to finish
        while len(self.tasks) >= self.max_concurrency:
            # rearm first, so that a stale set() from an earlier
            # task cannot let us through while the pool is full
            self._task_over.clear()
            await self._task_over.wait()
        task = asyncio.create_task(coroutine())
        self.tasks.append(task)
        task.add_done_callback(
            functools.partial(self._callback,
                              result_callback=result_callback)
        )
        return task

    async def join(self):
        await asyncio.gather(*self.tasks)

It’s a bit different from a queue because there’s no sequential ordering for triggering the execution of tasks. The class ensures that no more than max_concurrency tasks are active simultaneously.

The class offers two methods: put(), which can be used to add an async function for execution, and join(), which can be used to wait until all tasks are over. put() will block until the pool has room for a new task. Below is the modified send_data() function that uses it:

import functools

async def send_data():
    tasks = ConcurrentTasks(5)
    while have_data:
        batch = await get_batch_of_data()
        await tasks.put(
            functools.partial(send_data_to_es, batch)
        )

    # make sure we wait for all tasks to end
    await tasks.join()

A couple of libraries provide a similar feature: aiometer, asyncio-pool.
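For comparison, the standard library’s asyncio.Semaphore can express a similar cap on concurrency. The following is only a sketch of that alternative, not what the project uses; run_throttled() and demo() are hypothetical names, and the demo worker exists only to record the peak number of active tasks.

```python
import asyncio


async def run_throttled(items, worker, max_concurrency=5):
    """Run worker(item) for each item, with at most max_concurrency active tasks."""
    sem = asyncio.Semaphore(max_concurrency)
    tasks = set()

    def _done(task):
        tasks.discard(task)
        sem.release()  # free a slot for the next item

    for item in items:
        await sem.acquire()  # waits here while all slots are taken
        task = asyncio.create_task(worker(item))
        tasks.add(task)
        task.add_done_callback(_done)

    # wait for the tasks that are still running
    await asyncio.gather(*tasks)


async def demo():
    active = 0
    peak = 0

    async def worker(item):
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)
        active -= 1

    await run_throttled(range(20), worker, max_concurrency=5)
    return peak
```

The semaphore is acquired before the task is created, so the loop itself is throttled and no more than max_concurrency task objects exist at once. Unlike the class above, this sketch has no result callbacks and does not surface exceptions from tasks that finished before the final gather().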

Python 3.11, released just a few months ago, also introduced the TaskGroup class that can be used as a context manager to run tasks concurrently. From the documentation:

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(some_coro(...))
        task2 = tg.create_task(another_coro(...))
    print("Both tasks have completed now.")

We are not using Python 3.11 yet in production as it is quite recent, but moving to asyncio.TaskGroup going forward would make a lot of sense.

Harness your memory usage

Memory is a critical resource for a service application. The service can’t grow indefinitely in memory when dealing with a lot of data.

In connectors-python, we are building an intermediate service between a third-party service that provides some data and Elasticsearch. The data gets massaged in our pipeline and is passed along. Running many concurrent tasks that hold data to do this work means that the Python heap and its process Resident Set Size (RSS) will grow.

In the previous example, the send_data() function will hold in memory five concurrent batches of data it is planning to send to Elasticsearch. Depending on how get_batch_of_data and send_data_to_es are implemented, you might get up to three copies of the same batch simultaneously in the heap unless you pass around the same objects. 

To avoid memory bloat, we need an upper limit on the data the service holds. Python’s asyncio.Queue class can store and limit the number of items we are dealing with, but it is not memory aware: it does not know how much memory each item uses.

You could use sys.getsizeof() on simple objects, but it won’t work for complex objects composed of other objects. It won’t recursively find out the memory size of each linked object, and the value you will get will be off.

>>> import sys
>>> sys.getsizeof(['A', 'B', []])
80
>>> sys.getsizeof(['A', 'B', ['C', 'D', 'E']])
80

To get the real size, you can use the Pympler library:

>>> from pympler import asizeof
>>> asizeof.asizeof(['A', 'B', []])
248
>>> asizeof.asizeof(['A', 'B', ['C', 'D', 'E']])
448
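To make the difference between a shallow and a deep measurement concrete, here is a rough standard-library sketch of a recursive size function. deep_getsizeof() is a hypothetical helper written for this illustration; it only walks common built-in containers, and its numbers will likely differ from Pympler’s, which handles many more object types.

```python
import sys


def deep_getsizeof(obj, _seen=None):
    """Rough recursive size in bytes; handles only common built-in containers."""
    seen = set() if _seen is None else _seen
    if id(obj) in seen:  # count shared objects only once
        return 0
    seen.add(id(obj))

    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        size += sum(
            deep_getsizeof(key, seen) + deep_getsizeof(value, seen)
            for key, value in obj.items()
        )
    elif isinstance(obj, (list, tuple, set, frozenset)):
        size += sum(deep_getsizeof(item, seen) for item in obj)
    return size


flat = ['A', 'B', []]
nested = ['A', 'B', ['C', 'D', 'E']]
# The shallow sizes are identical, the deep sizes are not
print(sys.getsizeof(flat), sys.getsizeof(nested))
print(deep_getsizeof(flat), deep_getsizeof(nested))
```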

Calling asizeof() has a small CPU cost, but it is extremely useful to measure the real memory you are using. Based on this library, we’ve created a MemQueue class derived from asyncio.Queue, which stores the memory size of items and lets you define a maximum size in memory.

This class gets the size of each item when put() or put_nowait() is called, and provides a locking mechanism similar to the one you can find in asyncio.Queue.

class MemQueue(asyncio.Queue):
    def __init__(
        self, maxsize=0, maxmemsize=0,
        refresh_interval=1.0, refresh_timeout=60
    ):
        super().__init__(maxsize)
        self.maxmemsize = maxmemsize
        self.refresh_interval = refresh_interval
        self.refresh_timeout = refresh_timeout

    async def put(self, item):
        # get_size() is a helper (not shown) that measures the item's memory size
        item_size = get_size(item)

        # specific locking code (see original class in GitHub)

        super().put_nowait((item_size, item))

    def put_nowait(self, item):
        item_size = get_size(item)

        # the full class overrides full() to accept the size of the incoming item
        if self.full(item_size):
            raise asyncio.QueueFull
        super().put_nowait((item_size, item))

The locking mechanism is based on creating a Future object that gets enqueued in a collections.deque; put() then waits until a get() call removes an item and unblocks that future. On top of this, a timeout error is raised if a put() can’t succeed after some time. You can find the full implementation of this class in the utils.py module in the project: https://github.com/elastic/connectors-python/blob/main/connectors/utils.py.
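The future-in-a-deque mechanism can be sketched in a few lines. This is a simplified, hypothetical TinyMemQueue written for illustration, not the project’s MemQueue: it uses len() as a stand-in for a real memory measurement, offers only a non-blocking get_nowait(), and has no timeout or cancellation handling.

```python
import asyncio
import collections


class TinyMemQueue:
    """Hypothetical, simplified memory-bounded queue (not the project's MemQueue)."""

    def __init__(self, maxmemsize, get_size=len):
        self.maxmemsize = maxmemsize
        self._get_size = get_size            # stand-in for a real memory measurement
        self._items = collections.deque()
        self._putters = collections.deque()  # futures of blocked put() calls
        self._memsize = 0

    def _wakeup_next(self):
        # Unblock the oldest put() that is still waiting
        while self._putters:
            waiter = self._putters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                break

    async def put(self, item):
        size = self._get_size(item)
        # An item larger than maxmemsize is accepted only when the queue is empty
        while self._items and self._memsize + size > self.maxmemsize:
            waiter = asyncio.get_running_loop().create_future()
            self._putters.append(waiter)
            await waiter  # sits here until a get frees some room
        self._items.append((size, item))
        self._memsize += size
        self._wakeup_next()  # there may still be room for another waiting put()

    def get_nowait(self):
        size, item = self._items.popleft()
        self._memsize -= size
        self._wakeup_next()
        return item


async def demo():
    q = TinyMemQueue(maxmemsize=8)
    events = []

    async def producer():
        for chunk in ("aaaa", "bbbb", "cccc"):
            await q.put(chunk)
            events.append(f"put {chunk}")

    task = asyncio.create_task(producer())
    await asyncio.sleep(0.01)               # producer fills the queue, then blocks on "cccc"
    events.append(f"got {q.get_nowait()}")  # frees 4 units and unblocks the producer
    await task
    return events
```

In this sketch, the third put() only completes after get_nowait() has removed an item, which is the behavior the paragraph above describes.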

The MemQueue class is almost a drop-in replacement for asyncio.Queue. In the example below, it’s used in a producer-consumer pattern:

FINISHED = -1

# queue of 5 MiB max, and 1000 items max
q = MemQueue(maxsize=1000, maxmemsize=5*1024*1024)

async def pull_data():
    async for item in stream_of_data():
        await q.put(item)  # blocks while the queue holds 5 MiB or 1k items
    await q.put(FINISHED)

async def push_data():
    while True:
        item_size, item = await q.get()
        if item == FINISHED:
            break
        push_item(item)

async def main():
    # running the producer and consumer
    await asyncio.gather(pull_data(), push_data())

asyncio.run(main())

What do you think?

You can find all these classes in action in the connectors-python project. If you use them or have improvement ideas, please start a discussion in our issue tracker. It’s an open-source project, and contributions are welcome!