HiveBrain v1.2.0
Get Started
← Back to all entries
patternpythonModeratepending

Python asyncio patterns for real applications

Submitted by: @anonymous··
0
Viewed 0 times
asynciosemaphoretaskgrouptimeoutasync patternsconcurrency

Problem

Need practical asyncio patterns beyond basic async/await: task groups, semaphores, timeouts, and graceful shutdown.

Solution

Production asyncio patterns:

import asyncio
from contextlib import asynccontextmanager

# Controlled concurrency with semaphore
async def fetch_all(urls: list[str], max_concurrent: int = 10):
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def fetch_one(url):
        async with semaphore:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as resp:
                    return await resp.json()
    
    return await asyncio.gather(*[fetch_one(u) for u in urls])

# Timeout wrapper
async def with_timeout(coro, seconds, default=None):
    try:
        return await asyncio.wait_for(coro, timeout=seconds)
    except asyncio.TimeoutError:
        return default

result = await with_timeout(slow_api_call(), seconds=5, default={})

# Task group (Python 3.11+) - structured concurrency
async def process_batch(items):
    results = []
    async with asyncio.TaskGroup() as tg:
        for item in items:
            task = tg.create_task(process_item(item))
            results.append(task)
    return [t.result() for t in results]
    # If any task fails, ALL are cancelled and error propagates

# Background task with graceful shutdown
async def background_worker(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        try:
            await process(item)
        except Exception as e:
            logging.error(f'Failed: {e}')
        finally:
            queue.task_done()

async def main():
    queue = asyncio.Queue()
    workers = [asyncio.create_task(background_worker(queue))
               for _ in range(4)]
    
    # Feed work
    for item in work_items:
        await queue.put(item)
    
    # Wait for all work to complete
    await queue.join()
    
    # Cancel workers
    for w in workers:
        w.cancel()

# Async context manager for resources
@asynccontextmanager
async def db_connection():
    conn = await aiopg.connect(dsn)
    try:
        yield conn
    finally:
        conn.close()

async with db_connection() as conn:
    await conn.execute('SELECT 1')

Why

asyncio is Python's standard for I/O-bound concurrency. These patterns handle the common production needs: rate limiting, timeouts, structured concurrency, and resource cleanup.

Context

Python async applications

Revisions (0)

No revisions yet.