Engineering
In this article, I will introduce the new asyncio.TaskGroup API that was added in Python 3.11 and the aiotools.PersistentTaskGroup API that I have developed and am proposing.
This is particularly important for implementing structured concurrency. In fact, we've encountered bugs in Backend.AI's development that were caused by not properly following structured concurrency principles.
Structured Concurrency
Have you ever heard of structured programming?
Structured programming is a concept that emerged in the late 1960s and early 1970s, during the early era of computing. It became more widely known through Dijkstra's paper "Go To Statement Considered Harmful." Dijkstra made significant contributions to programming language development.
At that time, many programs were developed using assembly language or other languages that we would now consider low-level. Today, when writing programs in procedural languages, it's taken for granted that instructions are executed in the order they are written, using conditional statements (branches), loops, recursion, and subroutines (functions) to express control flow. However, this was not the case back then. Excessive use of GOTO made it extremely difficult for even the person who wrote the program to mentally trace why the control flow jumped from one point to another after some time had passed.
By using structured programming, the number of possible ways the control flow can change or jump becomes limited. By using specific programming language keywords for these constructs, anyone can understand the intent behind the change in execution flow. Because the possible paths of execution flow can always be analyzed, a program that combines these control structures can also always be analyzed. By demonstrating that Turing-complete programs (capable of doing everything a computer can do) could be written with just a few of these control structures, the foundation for modern programming languages was laid.
Structured concurrency extends this concept to asynchronous programming and coroutines. Its origins can be traced back to the fork-join model, but it was brought to mainstream attention in 2018 through a blog post by Nathaniel J. Smith, who was involved in the introduction of Python's wheel package format and the development of trio, an alternative asyncio implementation. The post's subtitle, "Notes on structured concurrency, or go statement considered harmful," was a clear homage to Dijkstra's paper. In it, he argued that all coroutines (or branches of execution flow) must have clear entry and exit points, and it must be guaranteed that all associated asynchronous operations are completed before the exit point.
Especially in asynchronous programming, a situation can arise where a task being awaited is canceled at an arbitrary point. This creates the need to ensure that any additional asynchronous tasks spawned by that task are all canceled together before control returns to the caller. Otherwise, the entity expecting a result may have already disappeared, while the task continues to run, leading to issues like excessive resource consumption or memory leaks.
asyncio.TaskGroup
The asyncio.TaskGroup class was added to Python 3.11. The implementation that went into the standard library introduces the nursery concept from Nathaniel J. Smith's trio. This implementation was based on an alternative implementation by the core asyncio contributors at EdgeDB and was subsequently debugged and improved by Guido van Rossum.
Asyncio's high-level API already provides a few control structures for asynchronous tasks. A prime example is asyncio.gather(), which is equivalent to JavaScript's Promise.all(). It's a function that waits for all given coroutines or asynchronous tasks to complete before returning.
However, this function has a problem. It would be great if all tasks always succeeded, but what happens if one task fails? To guarantee structured concurrency, one of the following approaches must be taken:
- Wait until all tasks have a result, whether success or failure.
- If one task fails, cancel the other ongoing tasks and immediately proceed to the next line of code.
asyncio.gather() appears to take the first approach, but when one task fails, it lets the other tasks continue running while it immediately raises the failed task's exception and returns. This leaves no way to retrieve the results of the other tasks, nor is there a way to catch and handle their potential exceptions. (In asyncio, the event loop itself has a fallback exception handler, so the only thing that can be done is to log the error.) Alternatively, specifying the return_exceptions=True option makes it wait for all tasks to finish, even if failures occur, and then returns a list of results and exceptions. However, this requires adding cumbersome code to loop through and check for exceptions, as shown below.
results = await asyncio.gather(mycoro(...), ..., return_exceptions=True)
for result in results:
if isinstance(result, Exception):
... # handle error
else:
... # handle resultThis is where asyncio.TaskGroup comes in. It takes the second approach, with some enhancements. When the TaskGroup itself is canceled, all its internal asynchronous tasks are also canceled. Furthermore, it is used via the async with statement. The latter point is particularly important for implementing structured concurrency because the Python language syntax guarantees that the exit handler will always be executed under any circumstances.
async with asyncio.TaskGroup() as tg:
tg.create_task(mycoro(...))
tg.create_task(mycoro(...))
...
tg.create_task(mycoro(...))
# Here, it is guaranteed that all inner tasks were either completed or cancelled.When one or more of the asynchronous tasks raise an exception, the ExceptionGroup and the except* syntax were newly added in Python 3.11 to handle them collectively and to easily filter for specific exception types. A detailed introduction to ExceptionGroup is too lengthy to cover here, so please refer to the PEP-654 documentation. aiotools provides a backported implementation so that TaskGroup can be used in Python versions 3.6 and later. This implementation was based on EdgeDB's implementation and the official implementation. In Python 3.11 and later, it will automatically switch to using the standard library's implementation and ExceptionGroup.
aiotools.PersistentTaskGroup
Is this really enough?
In Backend.AI, there's a common pattern of waiting for a group of asynchronous tasks, especially when running a 'cluster session' composed of multiple containers distributed across several nodes. Once the scheduler decides which agents will host the containers for a cluster session, it regroups the containers by agent and sends each agent a bundle of containers to create. The manager then waits for the asynchronous tasks for multiple agents, and each agent, in turn, waits for the asynchronous tasks to create each container in the bundle it received. If just one container on one agent fails to be created, how should the entire operation be canceled?
In scenarios like the one above, Backend.AI more often uses the first pattern (wait for all tasks to complete). The reason is that if an asynchronous task involves many side effects from external system calls, it is very difficult to roll back only the side effects that have occurred up to the point of cancellation. This is made even more difficult because the parts that handle the filesystem currently use asyncio's thread pool executor, making it harder to stop tasks midway. A mechanism would need to be implemented to separately record whether each side effect has been executed and then, upon cancellation, collect only the executed ones and undo them in reverse order.
To ensure structured concurrency even in these situations, I developed PersistentTaskGroup. Like TaskGroup, it structurally guarantees that all internal asynchronous tasks are finished upon exit. The difference is that a failure in one task does not trigger the cancellation of other tasks; instead, it calls a pre-specified exception handler. In addition to async with-based usage, it also supports a separate shutdown() method so it can be attached to the lifecycle handlers of long-lived objects.
# async-with block usage
import aiotools
async def main():
async with aiotools.PersistentTaskGroup() as tg:
tg.create_task(...)
tg.create_task(...)
tg.create_task(...)
...
# Here, it is guaranteed that all tasks have finished.# Object lifecycle synchronization usage
import aiotools
class LongLivedObject:
def __init__(self, ...):
self._tg = aiotools.PersistentTaskGroup()
...
async def aclose(self):
await self._tg.shutdown()
# Here, it is guaranteed that all tasks have finished.
async def some_work(self, ...):
self._tg.create_task(...)
self._tg.create_task(...)
...The create_task() method returns an asyncio.Future object, which allows you to get the asynchronous result of the task. It doesn't return the Task object directly because PersistentTaskGroup is responsible for managing the Task's lifecycle; the Future object acts as a proxy to get the result. Canceling the original task using the returned Future object is not supported. However, you can await it to get the result returned by the individual task or to catch an exception raised by it.
The introduction of the Persistent Task Group helped fix the following bugs:
- When container creation failed on an agent, the manager would record in the database that the session creation was canceled due to an error, but this record was not being written reliably.
- When a pre-validation check failed while adding a session creation request to the scheduler queue, in some cases this was not properly propagated as an error return value for the session creation request API.
I actually opened an issue to add the Persistent Task Group API to Python 3.11, but the conclusion was to monitor the stability of the aiotools implementation for a while and consider its inclusion around version 3.12.
Because the codebase of Backend.AI is quite large, there are still many places that use asyncio.gather(..., return_exceptions=True). We plan to gradually replace these with PersistentTaskGroup. I believe this will become the foundation for providing an even more stable Backend.AI platform in the future. I also hope this article has been helpful to other developers of Python asyncio-based applications.
https://aiotools.readthedocs.io/en/latest/aiotools.taskgroup.html
