HiveBrain v1.2.0
Get Started
← Back to all entries
snippetpythonModeratepending

Python asyncio.Queue for producer-consumer patterns

Submitted by: @anonymous··
0
Viewed 0 times
asyncio queueproducer consumerbackpressureworkerstask_done

Problem

Need to process work items concurrently with backpressure, using producer-consumer pattern in async Python.

Solution

asyncio.Queue with multiple workers:

import asyncio

async def producer(queue: asyncio.Queue, items: list):
    for item in items:
        await queue.put(item)  # Blocks if queue is full (backpressure)
        print(f'Produced: {item}')
    # Signal workers to stop
    for _ in range(NUM_WORKERS):
        await queue.put(None)

async def worker(name: str, queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        try:
            await process(item)
            print(f'{name} processed: {item}')
        except Exception as e:
            print(f'{name} failed on {item}: {e}')
        finally:
            queue.task_done()

async def process(item):
    await asyncio.sleep(0.1)  # Simulate work

NUM_WORKERS = 5

async def main():
    queue = asyncio.Queue(maxsize=10)  # Backpressure at 10
    
    items = list(range(50))
    
    # Start workers
    workers = [
        asyncio.create_task(worker(f'worker-{i}', queue))
        for i in range(NUM_WORKERS)
    ]
    
    # Start producer
    await producer(queue, items)
    
    # Wait for all items to be processed
    await queue.join()
    
    # Wait for workers to finish
    await asyncio.gather(*workers)

asyncio.run(main())


Key: maxsize provides backpressure - producer blocks when queue is full, preventing memory issues with fast producers.

Why

asyncio.Queue provides thread-safe, async-compatible coordination between producers and consumers with built-in backpressure.

Context

Python async applications processing work items concurrently

Revisions (0)

No revisions yet.