
Execute coroutines in pool

Submitted by: @import:stackexchange-codereview

Problem

I want to run all the coroutines in a list through a pool of constant size, and I would like to know whether this is the right way to do it. Is there a built-in solution for this problem? I have not found one.

import asyncio


def subprocess_pool(coroutines, pool_size=3):
    loop = asyncio.get_event_loop()
    event_finished = asyncio.Event()

    def done(future):
        event_finished.set()

    @asyncio.coroutine 
    def scheduler(coroutines):
        num = len(coroutines)
        finished = 0
        started = 0
        while finished<num:
            if coroutines and (started - finished) < pool_size:
                cor = coroutines.pop(0)
                task = loop.create_task(cor) 
                task.add_done_callback(done)
                started += 1
            else:
                yield from event_finished.wait()
                event_finished.clear()
                finished += 1
        return True

    loop.run_until_complete(scheduler(coroutines))
    loop.close()

Solution

A pool of coroutines runs against what coroutines were designed for. Coroutines are meant to be lightweight, so that they can be created in very large numbers. Rather than limiting how many coroutines exist, I think the right approach is to limit how many of them run at the same time, using an asyncio.Semaphore:

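import asyncio


async def worker(semaphore, coro):
    # Wait for a free slot, then run the coroutine object to completion.
    async with semaphore:
        await coro


async def scheduler(coroutines, pool_size=3):
    # At most pool_size coroutines run at once; the rest wait on the semaphore.
    semaphore = asyncio.BoundedSemaphore(pool_size)
    tasks = [asyncio.create_task(worker(semaphore, coro)) for coro in coroutines]
    await asyncio.gather(*tasks)


# coroutines is the list of coroutine objects from the question.
# asyncio.run() creates the event loop, runs the scheduler, and closes the loop.
asyncio.run(scheduler(coroutines))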


Check out this article for a full example.
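
To see the limit in action, here is a minimal self-contained sketch of the semaphore approach. The names run_limited, demo and job are hypothetical and not part of the original answer, and asyncio.sleep stands in for real work. The demo counts how many jobs are inside their body at once, so the peak should never exceed pool_size.

```python
import asyncio


async def worker(semaphore, coro):
    # Hold one semaphore slot for the lifetime of the wrapped coroutine.
    async with semaphore:
        return await coro


async def run_limited(coroutines, pool_size=3):
    # Hypothetical helper: runs every coroutine, at most pool_size at a time,
    # and returns the results in input order.
    semaphore = asyncio.Semaphore(pool_size)
    return await asyncio.gather(*(worker(semaphore, c) for c in coroutines))


async def demo(n_jobs=10, pool_size=3):
    running = 0  # jobs currently inside their body
    peak = 0     # highest value of running observed

    async def job(i):
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # placeholder for real work
        running -= 1
        return i * i

    results = await run_limited([job(i) for i in range(n_jobs)], pool_size)
    return results, peak


if __name__ == "__main__":
    results, peak = asyncio.run(demo())
    print(results)
    print("peak concurrency:", peak)
```

All coroutine objects are created up front, which is cheap, but each one only starts executing once its worker has acquired a slot. That is the point made above: the limit applies to running coroutines, not to how many exist.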

Context

StackExchange Code Review Q#122004, answer score: 5
