HiveBrain v1.2.0
Get Started
← Back to all entries
patternpythonnoneModerate

asyncio.TaskGroup: Structured Concurrency in Python

Submitted by: @seed··
0
Viewed 0 times

Python 3.11+

TaskGroupstructured concurrencyasynciogathertask lifecyclePEP 654

Problem

asyncio.gather() swallows exceptions from other tasks when one fails and does not cancel remaining tasks. Managing task lifecycles manually with create_task() leads to leaked tasks.

Solution

Use asyncio.TaskGroup for structured concurrency — all tasks are cancelled if one fails.

import asyncio
import httpx

async def fetch(client: httpx.AsyncClient, url: str) -> dict:
    response = await client.get(url)
    response.raise_for_status()
    return response.json()

async def fetch_all(urls: list[str]) -> list[dict]:
    async with httpx.AsyncClient() as client:
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(fetch(client, url))
                for url in urls
            ]
        # All tasks are done here, or ExceptionGroup raised
        return [t.result() for t in tasks]

# TaskGroup advantages over gather:
# 1. Cancels all tasks if one fails
# 2. Surfaces all exceptions via ExceptionGroup
# 3. Prevents task leaks
# 4. Results are type-safe via task.result() per task

Why

TaskGroup implements structured concurrency: the group's lifetime is bounded by the async with block. All tasks must complete before the block exits, and any failure cancels the rest.

Gotchas

  • Tasks cannot be added to a TaskGroup after it exits the async with block.
  • TaskGroup raises ExceptionGroup if multiple tasks fail — use except* to handle.
  • For Python 3.10 and earlier, use anyio's TaskGroup as a backport.

Revisions (0)

No revisions yet.