patternpythonnoneModerate
asyncio.TaskGroup: Structured Concurrency in Python
Viewed 0 times
Python 3.11+
TaskGroupstructured concurrencyasynciogathertask lifecyclePEP 654
Problem
asyncio.gather() swallows exceptions from other tasks when one fails and does not cancel remaining tasks. Managing task lifecycles manually with create_task() leads to leaked tasks.
Solution
Use asyncio.TaskGroup for structured concurrency — all tasks are cancelled if one fails.
import asyncio
import httpx
async def fetch(client: httpx.AsyncClient, url: str) -> dict:
response = await client.get(url)
response.raise_for_status()
return response.json()
async def fetch_all(urls: list[str]) -> list[dict]:
async with httpx.AsyncClient() as client:
async with asyncio.TaskGroup() as tg:
tasks = [
tg.create_task(fetch(client, url))
for url in urls
]
# All tasks are done here, or ExceptionGroup raised
return [t.result() for t in tasks]
# TaskGroup advantages over gather:
# 1. Cancels all tasks if one fails
# 2. Surfaces all exceptions via ExceptionGroup
# 3. Prevents task leaks
# 4. Results are type-safe via task.result() per taskWhy
TaskGroup implements structured concurrency: the group's lifetime is bounded by the async with block. All tasks must complete before the block exits, and any failure cancels the rest.
Gotchas
- Tasks cannot be added to a TaskGroup after it exits the async with block.
- TaskGroup raises ExceptionGroup if multiple tasks fail — use except* to handle.
- For Python 3.10 and earlier, use anyio's TaskGroup as a backport.
Revisions (0)
No revisions yet.