Python asyncio.Queue for producer-consumer patterns
asyncio queue, producer consumer, backpressure, workers, task_done
Problem
Need to process work items concurrently, with backpressure, using the producer-consumer pattern in async Python.
Solution
asyncio.Queue with multiple workers:
import asyncio

NUM_WORKERS = 5


async def producer(queue: asyncio.Queue, items: list):
    for item in items:
        await queue.put(item)  # Blocks if queue is full (backpressure)
        print(f'Produced: {item}')
    # Signal workers to stop: one None sentinel per worker
    for _ in range(NUM_WORKERS):
        await queue.put(None)


async def worker(name: str, queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is None:
            # Sentinel: acknowledge it so queue.join() can return, then exit
            queue.task_done()
            break
        try:
            await process(item)
            print(f'{name} processed: {item}')
        except Exception as e:
            print(f'{name} failed on {item}: {e}')
        finally:
            # Exactly one task_done() per get(), even when process() fails
            queue.task_done()


async def process(item):
    await asyncio.sleep(0.1)  # Simulate work


async def main():
    queue = asyncio.Queue(maxsize=10)  # Backpressure at 10
    items = list(range(50))
    # Start workers
    workers = [
        asyncio.create_task(worker(f'worker-{i}', queue))
        for i in range(NUM_WORKERS)
    ]
    # Run the producer (it also enqueues the stop sentinels)
    await producer(queue, items)
    # Wait for all items to be processed
    await queue.join()
    # Wait for workers to finish
    await asyncio.gather(*workers)


asyncio.run(main())
Key:
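maxsize provides backpressure: the producer blocks when the queue is full, which keeps a fast producer from growing the queue without bound and exhausting memory.
Why
asyncio.Queue provides async-compatible coordination between producers and consumers running on the same event loop, with built-in backpressure. It is not thread-safe; to hand items over from another thread, go through the event loop (for example with asyncio.run_coroutine_threadsafe).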
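The backpressure behavior can be observed directly. The following is a minimal sketch, not part of the original snippet: fast_producer, slow_consumer, run_demo and the high_water list are hypothetical names chosen for illustration. It records the largest queue size seen by a producer that is faster than its consumer.

```python
import asyncio


async def fast_producer(queue: asyncio.Queue, count: int, high_water: list) -> None:
    for i in range(count):
        # Suspends here whenever the queue already holds maxsize items
        await queue.put(i)
        high_water[0] = max(high_water[0], queue.qsize())


async def slow_consumer(queue: asyncio.Queue) -> None:
    while True:
        await queue.get()
        await asyncio.sleep(0.001)  # Simulate slow work
        queue.task_done()


async def run_demo(maxsize: int = 3, count: int = 20) -> int:
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    high_water = [0]  # Largest qsize() observed right after a put
    consumer = asyncio.create_task(slow_consumer(queue))
    await fast_producer(queue, count, high_water)
    await queue.join()  # Wait until every item has been acknowledged
    consumer.cancel()   # Consumer is idle in queue.get(); stop it
    await asyncio.gather(consumer, return_exceptions=True)
    return high_water[0]


if __name__ == "__main__":
    peak = asyncio.run(run_demo())
    print(f"peak queue size: {peak}")
```

The reported peak should never exceed maxsize, however many items the producer tries to push. With maxsize=0 (the default) the queue is unbounded and put() never blocks.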
Context
Python async applications processing work items concurrently
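In applications like these, the producer does not always know how many workers exist, which makes the one-None-sentinel-per-worker shutdown in the solution awkward. A common alternative, sketched here under that assumption, is to wait on queue.join() and then cancel the idle workers. The names handle and run_all and the doubling "work" are hypothetical placeholders.

```python
import asyncio


async def handle(item: int, results: list) -> None:
    await asyncio.sleep(0.001)  # Placeholder for real work
    results.append(item * 2)


async def worker(queue: asyncio.Queue, results: list) -> None:
    while True:
        item = await queue.get()
        try:
            await handle(item, results)
        except Exception as exc:
            print(f"failed on {item}: {exc}")
        finally:
            # One task_done() per successful get()
            queue.task_done()


async def run_all(items: list, num_workers: int = 3) -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=5)
    results: list = []
    workers = [
        asyncio.create_task(worker(queue, results))
        for _ in range(num_workers)
    ]
    for item in items:
        await queue.put(item)  # Blocks while the queue is full
    await queue.join()         # Returns once every item is acknowledged
    for w in workers:
        w.cancel()             # Workers are now blocked in queue.get()
    # Collect the cancellations so no exception is left unretrieved
    await asyncio.gather(*workers, return_exceptions=True)
    return results


if __name__ == "__main__":
    print(sorted(asyncio.run(run_all(list(range(10))))))
```

Because the workers are cancelled only after join() returns, no item is dropped mid-processing. The trade-off is that workers get no chance to run their own cleanup unless they catch asyncio.CancelledError and re-raise it.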