Nicolas Le Manchet

Adding asyncio support to a threaded task queue

I maintain Spinach, a Python library for distributing tasks to a cluster of machines. Since its inception it has supported only threads as the unit of concurrency.

At the time this choice made sense: the other available task queues had poor support for threads but good support for processes, and the asynchronous landscape was still changing constantly. Fast forward a few years: Python now has async/await as part of the language and has largely settled on the asyncio framework, so it makes sense to support running tasks as asyncio coroutines.

The existing threaded implementation

Spinach has two levels of queues where jobs wait to be picked up by a worker: a global queue that lives in Redis and a local one in each process running Spinach. When a process runs out of work, it fetches a handful of jobs from Redis and puts them in the local in_queue, which is consumed by a pool of worker threads managed by the Workers class:

import queue
import threading

class Workers:

    def __init__(self, num_workers: int):
        self._num_workers = num_workers

        # Local queue consumed by worker threads
        self.in_queue = queue.Queue(maxsize=num_workers)

        # Start the thread pool
        for _ in range(self._num_workers):
            thread = threading.Thread(target=self._worker_func)
            thread.start()

    def _worker_func(self):
        while True:
            # Each worker thread blocks until a job is ready
            job = self.in_queue.get()

            # None is the sentinel telling the worker to shut down
            if job is None:
                break

            # Actually execute the job
            job.task_func(*job.task_args, **job.task_kwargs)
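
To make the feeding and shutdown conventions concrete, here is a minimal, self-contained sketch of the same pattern outside of Spinach. The names run_batch and worker are hypothetical, and the jobs are plain callables standing in for jobs fetched from Redis, so treat it as an illustration under those assumptions rather than Spinach's actual code. It shows the bounded local queue applying backpressure to the producer, and one None sentinel per thread stopping the pool.

```python
import queue
import threading


def worker(in_queue, results):
    # Same loop shape as Workers._worker_func above
    while True:
        job = in_queue.get()  # Blocks until a job is ready
        if job is None:       # Sentinel: stop this thread
            break
        results.append(job())


def run_batch(jobs, num_workers=2):
    # Bounded queue: put() blocks while all slots are taken, which keeps
    # the process from hoarding more jobs than it can work on
    in_queue = queue.Queue(maxsize=num_workers)
    results = []

    threads = [
        threading.Thread(target=worker, args=(in_queue, results))
        for _ in range(num_workers)
    ]
    for thread in threads:
        thread.start()

    # Stand-in for "fetch a handful of jobs from Redis"
    for job in jobs:
        in_queue.put(job)

    # One sentinel per worker thread so that every thread exits
    for _ in threads:
        in_queue.put(None)
    for thread in threads:
        thread.join()

    return results
```

Calling run_batch with a list of zero-argument callables returns their results in whatever order the threads finished them.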

Adding asyncio support

Creating a new kind of workers class that runs coroutines is straightforward. Instead of launching a pool of threads, it launches a single thread that manages the asyncio event loop, and that loop runs one coroutine per worker.

import asyncio
import queue
import threading

class AsyncioWorkers:

    def __init__(self, num_workers: int):
        self.num_workers = num_workers
        self.in_queue = queue.Queue(maxsize=num_workers)

        # Create the thread managing the event loop. asyncio.run expects
        # a coroutine object, so the coroutine function is called here.
        thread = threading.Thread(
            target=asyncio.run,
            args=(self._async_interface_func(),)
        )
        thread.start()

    async def _async_interface_func(self):
        # Create a future for each worker
        worker_futures = list()
        for _ in range(self.num_workers):
            worker_futures.append(
                asyncio.ensure_future(self._worker_func())
            )

        # Run the futures and wait for them to terminate
        await asyncio.gather(*worker_futures)

    async def _worker_func(self):
        while True:
            job = self.in_queue.get()  # Blocking!

            if job is None:
                break

            # Execute the asynchronous job
            await job.task_func(*job.task_args, **job.task_kwargs)

The _worker_func as implemented here has a problem: the queue blocks! This means that the whole event loop gets stuck when one worker waits for jobs, preventing other workers from making progress.
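
The effect is easy to observe in isolation. The following is a small experiment, not Spinach code: the names ticker, blocking_worker and longest_gap are made up for the illustration. A ticker coroutine records a timestamp every 10 ms while another coroutine calls the blocking queue.Queue.get with a 0.2 s timeout on an empty queue. Because both share one event loop, the ticker cannot run while get is blocking, which shows up as a long gap between two consecutive timestamps.

```python
import asyncio
import queue
import time


async def ticker(ticks):
    # Records a timestamp every 10 ms, as long as the loop lets it run
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def blocking_worker(in_queue):
    try:
        # Blocks the thread, and therefore the whole event loop
        in_queue.get(timeout=0.2)
    except queue.Empty:
        pass


async def main():
    ticks = []
    ticker_task = asyncio.ensure_future(ticker(ticks))
    worker_task = asyncio.ensure_future(blocking_worker(queue.Queue()))

    await worker_task
    await asyncio.sleep(0.05)  # Let the ticker run a few more times

    ticker_task.cancel()
    await asyncio.gather(ticker_task, return_exceptions=True)

    # Longest pause between two consecutive ticks, in seconds
    return max(b - a for a, b in zip(ticks, ticks[1:]))


def longest_gap():
    return asyncio.run(main())
```

On an otherwise idle machine longest_gap() should come out at roughly 0.2 seconds instead of the expected 0.01: the ticker was frozen for the entire duration of the blocking call.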

To solve this problem a new kind of Queue is needed, one that can be used from synchronous code to push jobs into it but that can also be used from asynchronous code to wait for jobs without blocking the event loop.

The asyncio ecosystem has such a queue, but a few of its design decisions make it a poor fit for this use case. After shelving the feature for a few months while waiting to come up with a better design, I stumbled upon a great talk by David Beazley, "The Other Async", in which he creates exactly the queue needed from scratch.

Using his queue, workers can wait for jobs without blocking:

job = await self.in_queue.get_async()  # Not blocking :)
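
To give an idea of how such a queue can work, here is a simplified sketch. It is neither the code from the talk nor the queue that ships with Spinach: HybridQueue, _deliver and demo are names I made up for the illustration, only get_async matches the call above, and the queue is unbounded to keep it short. The idea is that put is an ordinary thread-safe method, while get_async parks an asyncio future when the queue is empty; put later resolves that future through loop.call_soon_threadsafe, the supported way to touch an event loop from another thread.

```python
import asyncio
import threading
from collections import deque


class HybridQueue:
    """Unbounded queue: put() from any thread, get_async() from a coroutine."""

    def __init__(self):
        self._items = deque()
        self._getters = deque()  # (loop, future) pairs waiting for an item
        self._lock = threading.Lock()

    def put(self, item):
        # Synchronous and thread-safe. Hand the item to a waiting
        # coroutine if there is one, otherwise store it.
        with self._lock:
            if self._getters:
                loop, future = self._getters.popleft()
                loop.call_soon_threadsafe(self._deliver, future, item)
            else:
                self._items.append(item)

    def _deliver(self, future, item):
        # Runs inside the event loop thread
        if future.cancelled():
            self.put(item)  # The waiter gave up, keep the item for later
        else:
            future.set_result(item)

    async def get_async(self):
        loop = asyncio.get_running_loop()
        with self._lock:  # Held only briefly, never while waiting
            if self._items:
                return self._items.popleft()
            future = loop.create_future()
            self._getters.append((loop, future))
        # Suspends only this coroutine, the event loop keeps running
        return await future


def demo():
    in_queue = HybridQueue()
    received = []

    def producer():
        # Plain synchronous code running in its own thread
        for i in range(3):
            in_queue.put(i)
        in_queue.put(None)

    async def consumer():
        while True:
            job = await in_queue.get_async()
            if job is None:
                break
            received.append(job)

    thread = threading.Thread(target=producer)
    thread.start()
    asyncio.run(consumer())
    thread.join()
    return received
```

With a single consumer the items come out in the order they were put in, so demo() returns [0, 1, 2]. With several consumers waiting at once, the order in which they wake up depends on the event loop's scheduling.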

Wrapping it all together

With this change, Spinach can run asyncio tasks:

import asyncio

from spinach import Engine, MemoryBroker, AsyncioWorkers

spin = Engine(MemoryBroker())

@spin.task(name='sleep')
async def sleep(duration: int):
    await asyncio.sleep(duration)

spin.schedule(sleep, 1)
spin.start_workers(workers_class=AsyncioWorkers)

More information: