I maintain Spinach, a Python library for distributing tasks to a cluster of machines. Since its inception it has supported only threads as the unit of concurrency.
At the time this choice made sense: other available task queues had good support for processes but poor support for threads, and the asynchronous landscape was still changing constantly. Fast forward a few years and Python now has async/await as part of the language and has largely settled on the asyncio framework. It now makes sense to support running tasks as asyncio coroutines.
The existing threaded implementation
Spinach has two levels of queues where jobs wait to be picked by a worker: a global queue that lives in Redis and a local one in each process running Spinach. When a process runs out of work, it fetches a handful of jobs from Redis and puts them in the local in_queue, which is consumed by a pool of worker threads managed by the Workers class:
    import queue
    import threading


    class Workers:

        def __init__(self, num_workers: int):
            self._num_workers = num_workers

            # Local queue consumed by worker threads
            self.in_queue = queue.Queue(maxsize=num_workers)

            # Start the thread pool
            for _ in range(self._num_workers):
                thread = threading.Thread(target=self._worker_func)
                thread.start()

        def _worker_func(self):
            while True:
                # Each worker thread blocks until a job is ready
                job = self.in_queue.get()
                if job is None:
                    # A None job tells the worker to stop
                    break

                # Actually execute the job
                job.task_func(*job.task_args, **job.task_kwargs)
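The refill step described above, where a process tops up its local queue from the global one, could look roughly like the sketch below. It is not Spinach's actual code: `refill_local_queue` and the `fetch_jobs` callable are hypothetical names, with `fetch_jobs` standing in for whatever the Redis broker does. It also assumes a bounded queue (`maxsize > 0`) with a single producer.

```python
import queue
from typing import Callable, List


def refill_local_queue(
    in_queue: queue.Queue,
    fetch_jobs: Callable[[int], List[object]],
) -> int:
    """Top up the local queue and return the number of jobs added.

    `fetch_jobs(n)` is a hypothetical stand-in for the broker: it is
    assumed to return at most `n` jobs from the global queue.
    """
    # Only ask the broker for as many jobs as the local queue can hold
    free_slots = in_queue.maxsize - in_queue.qsize()
    if free_slots <= 0:
        return 0

    jobs = fetch_jobs(free_slots)[:free_slots]
    for job in jobs:
        # Cannot raise queue.Full as long as this is the only producer
        in_queue.put_nowait(job)
    return len(jobs)


if __name__ == "__main__":
    # Fake global queue, standing in for Redis
    pending = ["job-1", "job-2", "job-3", "job-4", "job-5"]

    def fetch_jobs(max_jobs: int) -> List[object]:
        batch = pending[:max_jobs]
        del pending[:max_jobs]
        return batch

    local = queue.Queue(maxsize=3)
    print(refill_local_queue(local, fetch_jobs), "jobs moved to the local queue")
```

Sizing the request by the number of free slots means the process never holds more jobs locally than its workers can start soon, so the remaining jobs stay available to other machines.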
Adding asyncio support
Creating a new workers class that runs coroutines is straightforward. Instead of launching a pool of threads, it launches a single thread that manages the asyncio event loop and creates one coroutine per worker.
    import asyncio
    import queue
    import threading


    class AsyncioWorkers:

        def __init__(self, num_workers: int):
            self.num_workers = num_workers
            self.in_queue = queue.Queue(maxsize=num_workers)

            # Create the thread managing the event loop.
            # asyncio.run() expects a coroutine object, hence the call.
            thread = threading.Thread(
                target=asyncio.run,
                args=(self._async_interface_func(),)
            )
            thread.start()

        async def _async_interface_func(self):
            # Create a future for each worker
            worker_futures = list()
            for _ in range(self.num_workers):
                worker_futures.append(
                    asyncio.ensure_future(self._worker_func())
                )

            # Run the futures and wait for them to terminate
            await asyncio.gather(*worker_futures)

        async def _worker_func(self):
            while True:
                job = self.in_queue.get()  # Blocking!
                if job is None:
                    break

                # Execute the asynchronous job
                await job.task_func(*job.task_args, **job.task_kwargs)
The _worker_func as implemented here has a problem: the call to in_queue.get() blocks! The whole event loop gets stuck whenever one worker waits for a job, preventing the other workers from making progress.
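The effect is easy to reproduce outside of Spinach. The following self-contained sketch (all names are made up for the illustration) runs two coroutines on one event loop: one waits on an empty `queue.Queue` with a timeout, the other tries to tick every 50 ms. The blocking `get()` never yields to the loop, so the ticker cannot start until the timeout expires.

```python
import asyncio
import queue
import time

BLOCK_SECONDS = 0.2


async def blocked_worker(in_queue: queue.Queue) -> None:
    try:
        # Synchronous call: the event loop cannot run anything else
        # until it returns or times out
        in_queue.get(timeout=BLOCK_SECONDS)
    except queue.Empty:
        pass


async def ticker(offsets: list, start: float) -> None:
    for _ in range(3):
        await asyncio.sleep(0.05)
        # Record how long after `start` each tick happened
        offsets.append(time.monotonic() - start)


async def main() -> list:
    offsets = []
    start = time.monotonic()
    await asyncio.gather(
        blocked_worker(queue.Queue()),
        ticker(offsets, start),
    )
    return offsets


if __name__ == "__main__":
    for offset in asyncio.run(main()):
        print(f"tick after {offset:.2f}s")
```

On an unblocked loop the first tick would arrive after about 50 ms. Here it is delayed by at least the full `BLOCK_SECONDS`.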
To solve this problem a new kind of Queue is needed, one that can be used from synchronous code to push jobs into it but that can also be used from asynchronous code to wait for jobs without blocking the event loop.
The asyncio ecosystem has such a queue; however, a few of its design decisions make it a poor fit for this use case. After shelving the feature for a few months while waiting to come up with a better design, I stumbled upon a great talk by David Beazley, "The Other Async", where he creates the exact queue needed from scratch.
Using his queue, workers can wait for jobs without blocking:
    job = await self.in_queue.get_async()  # Not blocking :)
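To give an idea of how such a queue can work, here is a minimal sketch in the spirit of that approach. It is neither the code from the talk nor the queue that ships with Spinach: `HybridQueue` is a made-up name, and the sketch is unbounded and offers only a synchronous `put()` and an asynchronous `get_async()`. The idea is that a consumer finding the queue empty parks a `concurrent.futures.Future`, which the producer thread later completes and which `asyncio.wrap_future()` makes awaitable.

```python
import asyncio
import threading
from collections import deque
from concurrent.futures import Future


class HybridQueue:
    """Unbounded queue: put() from threads, get_async() from coroutines."""

    def __init__(self):
        self._mutex = threading.Lock()
        self._items = deque()    # Jobs waiting for a consumer
        self._getters = deque()  # Futures of consumers waiting for a job

    def put(self, item):
        """Synchronous and never waits, callable from any thread."""
        with self._mutex:
            while self._getters:
                fut = self._getters.popleft()
                # Skip consumers that were cancelled while waiting
                if fut.set_running_or_notify_cancel():
                    fut.set_result(item)
                    return
            self._items.append(item)

    async def get_async(self):
        """Wait for an item without blocking the event loop."""
        with self._mutex:  # Held only briefly, never across an await
            if self._items:
                return self._items.popleft()
            fut = Future()
            self._getters.append(fut)
        # Bridge the thread-level future into the running event loop
        return await asyncio.wrap_future(fut)


async def worker(in_queue: HybridQueue, results: list) -> None:
    while True:
        job = await in_queue.get_async()
        if job is None:
            break
        results.append(job)


def producer(in_queue: HybridQueue) -> None:
    for job in ("a", "b", None):
        in_queue.put(job)


if __name__ == "__main__":
    q = HybridQueue()
    done = []
    # Push jobs from another thread while the event loop is waiting
    threading.Timer(0.1, producer, args=(q,)).start()
    asyncio.run(worker(q, done))
    print(done)
```

While a worker is suspended in `get_async()`, the event loop stays free to run the other workers, which is exactly what the blocking `queue.Queue.get()` prevented.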
Wrapping it all together
With this change, Spinach can run asyncio tasks:
    import asyncio

    from spinach import Engine, MemoryBroker, AsyncioWorkers

    spin = Engine(MemoryBroker())


    @spin.task(name='sleep')
    async def sleep(duration: int):
        await asyncio.sleep(duration)


    spin.schedule(sleep, 1)
    spin.start_workers(workers_class=AsyncioWorkers)