---
title: Eight different ways to implement an asynchronous loop in python
date: 2023-01-29
tags: [code, python, linux]
description: Each time I have to work with asyncio I get frustrated. I find myself longing for the simplicity of callbacks in JavaScript. But maybe I just don't understand asyncio properly yet.
---

[asyncio](https://peps.python.org/pep-3156/) was first added to the python standard library more than 10 years ago. Asynchronous I/O had already been possible before that, by using libraries such as twisted or gevent. But asyncio was an attempt to bring the community together and standardize on a common solution.

So far this didn't really work out for me. Each time I have to work with asyncio I get frustrated. I find myself longing for the simplicity of callbacks in JavaScript. But maybe I just don't understand asyncio properly yet.

I learn best by trying to recreate the thing I want to learn about. So in this post I will retrace the history of asynchronous programming. I will concentrate on python, but I guess much of this translates to other languages. Hopefully this will allow me to better understand and appreciate what asyncio is doing. And hopefully you will enjoy accompanying me on that journey.

If you are interested, all eight implementations are available on [github](https://github.com/xi/python_async_loops).

## Setup

The following script outputs random numbers at random intervals:

```bash
#!/bin/bash

while true; do
    sleep $(($RANDOM % 5))
    echo $RANDOM
done
```

I will run two instances of that script in parallel:

```python
proc1 = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)
proc2 = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)
```

The processes will start immediately and run in parallel to our python code. We have to make sure to stop them when the program exits, e.g. because we press Ctrl-C:

```python
def cleanup():
    proc1.terminate()
    proc2.terminate()
    proc1.wait()
    proc2.wait()
```

We cannot use `proc.stdout.readline()` because it blocks until a complete line is available. So here is some code that helps us get the last complete line we received:

```python
class LineReader:
    def __init__(self, file):
        self.file = file
        self.buffer = b''
        self.line = ''

    def read_line(self):
        chunk = os.read(self.file.fileno(), 1024)
        if not chunk:
            raise ValueError
        self.buffer += chunk
        lines = self.buffer.split(b'\n')
        if len(lines) > 1:
            self.line = lines[-2].decode('utf-8')
            self.buffer = lines[-1]


reader1 = LineReader(proc1.stdout)
reader2 = LineReader(proc2.stdout)
```

What we want to do is always render the latest complete line from each process as well as the current time:

```python
def render():
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(' '.join([now, reader1.line, reader2.line]))
```

With these basics out of the way, we can start with the loop itself.
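In case the buffering logic looks opaque, here is a quick standalone check of how `LineReader` handles partial chunks. It is a hypothetical sketch that reuses the class from above with an `os.pipe()` instead of the subprocesses, so it is not part of the final program:

```python
import os

# Assumes the LineReader class from above; feed it through a pipe
# instead of a subprocess.
r, w = os.pipe()
reader = LineReader(os.fdopen(r, 'rb'))

os.write(w, b'12')          # no complete line yet
reader.read_line()
print(repr(reader.line))    # ''

os.write(w, b'34\n567\n8')  # completes two lines, starts a third
reader.read_line()
print(repr(reader.line))    # '567' -- only the last complete line is kept
```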
## Implementation 1: Blocking Loop

Putting it all together we get our first implementation: the blocking loop:

```python
import datetime
import os
import subprocess


class LineReader:
    def __init__(self, file):
        self.file = file
        self.buffer = b''
        self.line = ''

    def read_line(self):
        chunk = os.read(self.file.fileno(), 1024)
        if not chunk:
            raise ValueError
        self.buffer += chunk
        lines = self.buffer.split(b'\n')
        if len(lines) > 1:
            self.line = lines[-2].decode('utf-8')
            self.buffer = lines[-1]


def cleanup():
    proc1.terminate()
    proc2.terminate()
    proc1.wait()
    proc2.wait()


def render():
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(' '.join([now, reader1.line, reader2.line]))


proc1 = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)
proc2 = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)
reader1 = LineReader(proc1.stdout)
reader2 = LineReader(proc2.stdout)

try:
    while True:
        for reader in [reader1, reader2]:
            reader.read_line()
        render()
finally:
    cleanup()
```

In this version, `reader.read_line()` will block until data is available. So it will first wait for data from `proc1`, then wait for data from `proc2`, then render, repeat. This is not really async yet.

## Implementation 2: Busy Loop

```python
import fcntl


def set_nonblock(fd):
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)


set_nonblock(proc1.stdout.fileno())
set_nonblock(proc2.stdout.fileno())

try:
    while True:
        for reader in [reader1, reader2]:
            try:
                reader.read_line()
            except BlockingIOError:
                pass
        render()
finally:
    cleanup()
```

These are just the parts of the code that changed. I used `fcntl` to set the file descriptors to non-blocking mode. In this mode, `os.read()` will raise a `BlockingIOError` if there is nothing to read. This is great because we can no longer get stuck on a blocking read. However, this loop will just keep trying and fully saturate the CPU. This is called a busy loop and is obviously not what we want.

## Implementation 3: Sleepy Loop

```python
import time

try:
    while True:
        for reader in [reader1, reader2]:
            try:
                reader.read_line()
            except BlockingIOError:
                pass
        time.sleep(1)
        render()
finally:
    cleanup()
```

By simply adding a `sleep()` we get the benefits of both of the first two implementations: we cannot get stuck on a blocking read, but we also do not end up in a busy loop. This is still far from perfect though: if data arrives quickly, we introduce a very noticeable delay of up to 1 second. And if data arrives slowly, we wake up much more often than would be needed. We can adjust the sleep duration to the specific case, but it will never be perfect.

## Implementation 4: Select Loop

```python
import selectors

selector = selectors.DefaultSelector()
selector.register(proc1.stdout, selectors.EVENT_READ, reader1)
selector.register(proc2.stdout, selectors.EVENT_READ, reader2)

try:
    while True:
        for key, mask in selector.select(10):
            key.data.read_line()
        render()
finally:
    cleanup()
```

What we actually want to do is sleep until one of the file descriptors is ready. That is exactly what [selectors](https://docs.python.org/3/library/selectors.html) are for. There are different system calls that can be used to implement a selector. It got its name from `select`, but nowadays you are more likely to use `epoll`. The selectors module will automatically pick the best option.

In the code above I also added a timeout of 10 seconds to the select call. This allows us to update the time even if none of the file descriptors become available for some time.
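As a quick aside, the claim that the selectors module picks the best available mechanism is easy to check; the exact class you get depends on your platform:

```python
import selectors

# On Linux this is typically EpollSelector, on BSD/macOS KqueueSelector,
# and SelectSelector is the portable fallback.
print(selectors.DefaultSelector)
```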
So with this implementation we have our first *real* async loop, and from here on out we will stick with selectors and only restructure the code surrounding them.

## Implementation 5: Callback Loop

```python
class Loop:
    def __init__(self):
        self.selector = selectors.DefaultSelector()
        self.times = []

    def set_timeout(self, callback, timeout):
        now = time.time()
        self.times.append((callback, now + timeout))

    def set_interval(self, callback, timeout):
        def wrapper():
            callback()
            self.set_timeout(wrapper, timeout)
        self.set_timeout(wrapper, 0)

    def register_file(self, file, callback):
        self.selector.register(file, selectors.EVENT_READ, callback)

    def unregister_file(self, file):
        self.selector.unregister(file)

    def run(self):
        while True:
            now = time.time()
            timeout = min((t - now for _, t in self.times), default=None)
            for key, mask in self.selector.select(timeout):
                key.data()
            keep = []
            now = time.time()
            for callback, t in self.times:
                if t < now:
                    callback()
                else:
                    keep.append((callback, t))
            self.times = keep


def callback1():
    try:
        reader1.read_line()
    except ValueError:
        loop.unregister_file(proc1.stdout)
    render()


def callback2():
    try:
        reader2.read_line()
    except ValueError:
        loop.unregister_file(proc2.stdout)
    render()


loop = Loop()
loop.register_file(proc1.stdout, callback1)
loop.register_file(proc2.stdout, callback2)
loop.set_interval(render, 10)

try:
    loop.run()
finally:
    cleanup()
```

This implementation improves on the previous one by being much more modular. You can register files with callbacks that will be executed whenever the file is ready. There is also a much more sophisticated system for timeouts and intervals, similar to what you might know from JavaScript.

## Aside: Everything is a File

So far our loops can react to files and timeouts, but is that enough? My first hunch is that in unix, "everything is a file", so this should get us pretty far. But let's take a closer look.

- I was surprised to learn that processes have *not* been files in unix for the longest time. So much for that saying. There have been some [hacky workarounds](https://stackoverflow.com/questions/75040072#75062615). But to my knowledge, the first proper solution is [`pidfd_open`](https://docs.python.org/3/library/os.html#os.pidfd_open), which was first included in Linux 5.3 in 2019 (see the sketch after this list).
- Signals can interrupt your program at any time, e.g. in the middle of writing bits to stdout. If you are not careful, this can mess up your state. Luckily there is a simple solution you can use to integrate signals into your select loop: the [self-pipe trick](https://cr.yp.to/docs/selfpipe.html):

  ```python
  import signal

  def register_signal(sig, callback):
      def on_signal(*args):
          os.write(pipe_w, b'.')

      def wrapper():
          os.read(pipe_r, 1)
          callback()

      pipe_r, pipe_w = os.pipe()
      signal.signal(sig, on_signal)
      loop.register_file(pipe_r, wrapper)
  ```

- Network connections use sockets, which can be used with select. Unfortunately, most libraries that implement specific network protocols (e.g. HTTP) are not really reusable because they do not expose the underlying socket. Some years ago there was a [push to create more reusable protocol implementations](https://sans-io.readthedocs.io/) which produced the [hyper project](https://github.com/python-hyper). Unfortunately it didn't really gain traction.
- Another issue with reusing existing code is that python likes to buffer a lot. This can have [surprising effects](https://github.com/python/cpython/issues/101053) when the selector tells you that the underlying file descriptor is empty, but there is still data available in the python buffer.
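To make the `pidfd_open` point concrete, here is a hypothetical sketch of how a child process could be watched with the callback loop from implementation 5. It assumes Linux 5.3+ and Python 3.9+, and the callback name is made up for the example:

```python
import os

# pidfd_open returns a file descriptor that becomes readable once the
# process exits (Linux >= 5.3, Python >= 3.9).
pidfd = os.pidfd_open(proc1.pid)

def on_proc1_exit():
    print('proc1 exited')
    loop.unregister_file(pidfd)
    os.close(pidfd)

# The selectors module accepts plain file descriptors, so the pidfd can be
# registered just like the stdout pipes.
loop.register_file(pidfd, on_proc1_exit)
```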
## Implementation 6: Generator Loop

We are getting closer to asyncio, but there is still a lot of conceptual ground to cover. Before we get to async/await, we have to talk about generators.

### Motivation

As I said in the introduction, I personally really like the callback approach. It is simple: just a selector and some callback functions. Compared to that I find asyncio with its coroutines and tasks and futures and awaitables and transports and protocols and async iterators and executors just confusing.

But I recently read [Nathaniel J. Smith's posts on trio](https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/), an alternative async loop for python, and I must admit that there are some solid arguments for async/await there. It boils down to this: *Splitting asynchronous execution into a setup and a callback phase does more harm than good.*

Let's look at an example: In all the code samples so far I created subprocesses and made sure that they are terminated when the process exits:

```python
proc = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)
try:
    do_something()
finally:
    proc.terminate()
    proc.wait()
```

Now let's say this is not the whole program, but just one function. And let's further say that `do_something()` is asynchronous:

```python
def foo():
    proc = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)

    def callback():
        proc.terminate()
        proc.wait()

    loop.set_timeout(callback, 10)
```

We have a couple of issues here:

- `callback()` gets executed directly by `loop.run()`, so if it raises an exception the stack trace will not contain `foo()`.
- We need to share state (in this case `proc`) between setup and callback. In this case I did that by using a closure, but that's not very elegant.
- There is no way to use a `try … finally` here, so if there is an exception in the gap between setup and callback, the cleanup code is never executed. This is the big one.

Functions are a curious concept that allows us to get stack traces, easily share state, and do cleanup. The callback approach tries to get by without these benefits. The async/await approach instead tries to keep them by allowing us to pause the execution of functions.

### The yield expression

The yield expression has been part of python since [PEP 255](https://peps.python.org/pep-0255/) (2001) and was extended considerably in [PEP 342](https://peps.python.org/pep-0342/) (2005). It allows a function to pause its execution and hand control back to the caller. In its simplest form it can be used in a for loop:

```python
def foo():
    print('yielding 1')
    yield 1
    print('yielding 2')
    yield 2

for x in foo():
    print(x)

# yielding 1
# 1
# yielding 2
# 2
```

Another common use is to define context managers:

```python
from contextlib import contextmanager

@contextmanager
def bar():
    proc = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)
    try:
        yield proc
    finally:
        proc.terminate()
        proc.wait()

with bar() as proc:
    do_something()
```

A function that uses yield is called a *generator function*. Instead of a normal value it returns a *generator*.
The first example can be rewritten roughly like this:

```python
class FooGenerator:
    def __init__(self):
        self.state = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.state == 0:
            self.state += 1
            return 1
        elif self.state == 1:
            self.state += 1
            return 2
        else:
            raise StopIteration()


gen = iter(FooGenerator())
while True:
    try:
        x = next(gen)
        print(x)
    except StopIteration:
        break
```

It is important to distinguish these two conceptual layers. For example, raising `StopIteration` only makes sense in a generator, not in a generator function.

There are a few more things you can do with generators:

- Returning from a generator function sets `StopIteration.value`:

  ```python
  def foo():
      yield
      return 2

  generator = foo()
  next(generator)  # execute up to the first yield
  try:
      next(generator)
  except StopIteration as e:
      print(e.value)  # 2
  ```

- `generator.send(data)` passes control back to the generator, but it also gives the yield expression a value:

  ```python
  def foo():
      data = yield
      print(data)

  generator = foo()
  next(generator)          # execute up to the first yield
  generator.send('test1')  # prints 'test1' and raises StopIteration
  ```

- `generator.throw(exc)` passes control back to the generator, but it makes the yield expression raise an exception:

  ```python
  def foo():
      while True:
          try:
              yield
          except TypeError:
              print('type error')

  generator = foo()
  next(generator)             # execute up to the first yield
  generator.throw(TypeError)  # prints 'type error'
  generator.throw(ValueError) # raises ValueError
  ```

- `generator.close()` is like `generator.throw(GeneratorExit)`
- `yield from foo` is roughly like `for item in foo: yield item`, except that it also forwards `send()` and `throw()` to the inner generator and evaluates to its return value (see the sketch below)

For a more in-depth discussion of generators I can recommend the [introduction to async/await by Brett Cannon](https://snarky.ca/how-the-heck-does-async-await-work-in-python-3-5/).
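To make the last point about `yield from` concrete, here is a small standalone sketch showing how it forwards sent values and picks up the inner generator's return value:

```python
def inner():
    received = yield 'from inner'
    return received * 2

def outer():
    result = yield from inner()  # forwards next()/send() to inner()
    yield result                 # inner's return value

gen = outer()
print(next(gen))     # 'from inner'
print(gen.send(21))  # 42
```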
### The Loop

```python
import datetime
import os
import selectors
import subprocess
import time

selector = selectors.DefaultSelector()
data = ['', '', '']


class LineReader:
    def __init__(self, file):
        self.file = file
        self.buffer = b''
        self.line = ''

    def read_line(self):
        chunk = os.read(self.file.fileno(), 1024)
        if not chunk:
            raise ValueError
        self.buffer += chunk
        lines = self.buffer.split(b'\n')
        if len(lines) > 1:
            self.line = lines[-2].decode('utf-8')
            self.buffer = lines[-1]


class Task:
    def __init__(self, gen):
        self.gen = gen
        self.files = set()
        self.times = set()
        self.done = False
        self.result = None

    def set_result(self, result):
        self.done = True
        self.result = result

    def init(self):
        try:
            self.files, self.times = next(self.gen)
        except StopIteration as e:
            self.set_result(e.value)

    def wakeup(self, files, now):
        try:
            if self.done:
                return
            elif any(t < now for t in self.times) or files & self.files:
                self.files, self.times = self.gen.send((files, now))
        except StopIteration as e:
            self.set_result(e.value)

    def close(self):
        self.gen.close()


def run(gen):
    task = Task(gen)
    try:
        task.init()
        while not task.done:
            now = time.time()
            timeout = min((t - now for t in task.times), default=None)
            files = {key.fileobj for key, mask in selector.select(timeout)}
            task.wakeup(files, time.time())
        return task.result
    finally:
        task.close()


def sleep(t):
    yield set(), {time.time() + t}


def gather(*generators):
    subtasks = [Task(gen) for gen in generators]
    try:
        for task in subtasks:
            task.init()
        while True:
            wait_files = set().union(
                *[t.files for t in subtasks if not t.done]
            )
            wait_times = set().union(
                *[t.times for t in subtasks if not t.done]
            )
            files, now = yield wait_files, wait_times
            for task in subtasks:
                task.wakeup(files, now)
            if all(task.done for task in subtasks):
                return [task.result for task in subtasks]
    finally:
        for task in subtasks:
            task.close()


def render():
    data[0] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(' '.join(data))


def popen(cmd, i):
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    reader = LineReader(proc.stdout)
    selector.register(proc.stdout, selectors.EVENT_READ)
    try:
        while True:
            yield {proc.stdout}, set()
            reader.read_line()
            data[i] = reader.line
            render()
    except ValueError:
        pass
    finally:
        selector.unregister(proc.stdout)
        proc.terminate()
        proc.wait()


def clock():
    while True:
        yield from sleep(10)
        render()


def amain():
    yield from gather(
        popen(['./random.sh'], 1),
        popen(['./random.sh'], 2),
        clock(),
    )


run(amain())
```

This is the complete code and not just the changed bits because there are so many changes all over the place. I hope with all the buildup it doesn't seem too crazy. Still, there are some new concepts that I will try to expand on.

First, note how all the setup and teardown for each individual subprocess is now bundled in `popen()`. This is exactly what I was talking about before: we can now trivially do cleanup. This is made possible by the little yield expression that pokes a hole in this function somewhere in the middle.

The terminals on the one end of the communication are expressions like `yield {file}, set()` or `yield set(), {timeout}`. This means "pause this generator until this condition is met". On the other end of the communication there is `run()`, which will figure out which files are available and send that up the chain. In between there is `gather()`, which takes the information from both ends and figures out which of its subtasks should be unpaused. Most other functions just pass through the messages by using `yield from`.
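To illustrate that protocol, here is a hypothetical extra helper that could be built on top of it (the name is made up; it is not part of the implementation above). A generator further up the chain would use it as `ready = yield from wait_readable(proc.stdout, 5)`:

```python
def wait_readable(file, timeout):
    # Pause this generator until `file` is readable or the deadline passes.
    deadline = time.time() + timeout
    # Task.wakeup() sends back the set of ready files and the current time.
    files, now = yield {file}, {deadline}
    return file in files
```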
All this is mediated by `Task` objects which keep track of the conditions and state of generators.

## Implementation 7: async/await Loop

From here it is a small step to async/await. Generators that are used for asynchronous execution have already been called "coroutines" in PEP 342. [PEP 492](https://peps.python.org/pep-0492/) (2015) deprecated that approach in favor of "native coroutines" and async/await.

Native coroutines are not really different from generator-based coroutines. You can still get the underlying generator by calling `coro.__await__()`. New syntax was introduced to keep the two concepts apart: iterators use `yield`, coroutines use `async`/`await`. These two code snippets are more or less identical:

```python
async def foo():
    await sleep(10)
```

```python
class FooCoroutine:
    def __await__(self):
        return (yield from sleep(10).__await__())
```

This is a minor change, but still it changes the syntax all over the place. So here is the full async/await implementation:

```python
import datetime
import os
import selectors
import subprocess
import time

selector = selectors.DefaultSelector()
data = ['', '', '']


class LineReader:
    def __init__(self, file):
        self.file = file
        self.buffer = b''
        self.line = ''

    def read_line(self):
        chunk = os.read(self.file.fileno(), 1024)
        if not chunk:
            raise ValueError
        self.buffer += chunk
        lines = self.buffer.split(b'\n')
        if len(lines) > 1:
            self.line = lines[-2].decode('utf-8')
            self.buffer = lines[-1]
        return self.line


class AYield:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        return (yield self.value)


class Task:
    def __init__(self, coro):
        self.gen = coro.__await__()
        self.files = set()
        self.times = set()
        self.done = False
        self.result = None

    def set_result(self, result):
        self.done = True
        self.result = result

    def init(self):
        try:
            self.files, self.times = next(self.gen)
        except StopIteration as e:
            self.set_result(e.value)

    def wakeup(self, files, now):
        try:
            if self.done:
                return
            elif any(t < now for t in self.times) or files & self.files:
                self.files, self.times = self.gen.send((files, now))
        except StopIteration as e:
            self.set_result(e.value)

    def close(self):
        self.gen.close()


def run(coro):
    task = Task(coro)
    try:
        task.init()
        while not task.done:
            now = time.time()
            timeout = min((t - now for t in task.times), default=None)
            files = {key.fileobj for key, mask in selector.select(timeout)}
            task.wakeup(files, time.time())
        return task.result
    finally:
        task.close()


async def sleep(t):
    await AYield((set(), {time.time() + t}))


async def gather(*coros):
    subtasks = [Task(coro) for coro in coros]
    try:
        for task in subtasks:
            task.init()
        while True:
            wait_files = set().union(
                *[t.files for t in subtasks if not t.done]
            )
            wait_times = set().union(
                *[t.times for t in subtasks if not t.done]
            )
            files, now = await AYield((wait_files, wait_times))
            for task in subtasks:
                task.wakeup(files, now)
            if all(task.done for task in subtasks):
                return [task.result for task in subtasks]
    finally:
        for task in subtasks:
            task.close()


def render():
    data[0] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(' '.join(data))


async def popen(cmd, i):
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    reader = LineReader(proc.stdout)
    selector.register(proc.stdout, selectors.EVENT_READ)
    try:
        while True:
            await AYield(({proc.stdout}, set()))
            reader.read_line()
            data[i] = reader.line
            render()
    except ValueError:
        pass
    finally:
        selector.unregister(proc.stdout)
        proc.terminate()
        proc.wait()


async def clock():
    while True:
        await sleep(10)
        render()


async def amain():
    await gather(
        popen(['./random.sh'], 1),
        popen(['./random.sh'], 2),
        clock(),
    )


run(amain())
```

## Implementation 8: asyncio

So which kind of loop does asyncio use? After reading [PEP 3156](https://peps.python.org/pep-3156/) I would say: that's complicated.

At the core, asyncio is a simple callback loop. The relevant functions are called `add_reader(file, callback)` and `call_later(delay, callback)`. But then asyncio adds a second layer using async/await. A simplified version looks roughly like this:

```python
import asyncio


class Future:
    def __init__(self):
        self.callbacks = []
        self.result = None
        self.exception = None
        self.done = False

    def _set_done(self):
        self.done = True
        for callback in self.callbacks:
            callback(self)

    def set_result(self, result):
        self.result = result
        self._set_done()

    def set_exception(self, exception):
        self.exception = exception
        self._set_done()

    def add_done_callback(self, callback):
        self.callbacks.append(callback)

    def __await__(self):
        yield self


class Task:
    def __init__(self, coro):
        self.gen = coro.__await__()

    def wakeup(self, future=None):
        try:
            if future and future.exception:
                new_future = self.gen.throw(future.exception)
            else:
                new_future = next(self.gen)
            new_future.add_done_callback(self.wakeup)
        except StopIteration:
            pass


async def sleep(t):
    future = Future()
    loop.call_later(t, future.set_result, None)
    await future


async def amain():
    print('start')
    try:
        await sleep(5)
        loop.stop()
    finally:
        print('finish')


loop = asyncio.new_event_loop()
task = Task(amain())
task.wakeup()
loop.run_forever()
```

When we call `task.wakeup()`, the coroutine `amain()` starts executing. It prints `'start'`, creates a future, and tells the loop to resolve that future in 5 seconds. Then it yields that future back down to `wakeup()`, which registers itself as a callback on the future. Now the loop starts running, waits for 5 seconds, and then resolves the future. Because `wakeup()` was added as a callback, it is now called again and passes control back into `amain()`, which prints `'finish'`, stops the loop, and raises `StopIteration`.

In the earlier coroutine examples, I yielded files and timeouts as conditions. Since this version is hosted on a callback loop, it instead yields futures that wrap loop callbacks.

This approach works reasonably well. But I also see some issues with it.

### Limited support for files

You may have noticed that I did not implement the full subprocess example this time. This is because asyncio's coroutine layer doesn't really support files. Futures represent actions that are completed once their callback is called. File callbacks, on the other hand, are called every time data is available for reading. This disconnect can probably be bridged somehow, but this post is already long enough and I didn't want to go down yet another rabbit hole.

### Futures are not a monad

If you know some JavaScript you have probably come across Promises. Promises are basically the JavaScript equivalent of Futures. However, they have a much nicer API. They are basically a monad, and every Haskell fan can give you an impromptu lecture about the awesomeness of monads.
Consider the following snippets that do virtually the same:

```javascript
Promise.resolve(1)
    .then(x => x + 1)
    .finally(() => console.log('done'));
```

```python
import asyncio


def increment(future):
    try:
        future2.set_result(future.result() + 1)
    except Exception as e:
        future2.set_exception(e)


def print_done(future):
    print('done')


loop = asyncio.new_event_loop()

future1 = loop.create_future()
future1.add_done_callback(increment)
future1.set_result(1)

future2 = loop.create_future()
future2.add_done_callback(print_done)

loop.run_until_complete(future2)
```

### Naming Confusion

So far we have "Coroutines", "Futures", and "Tasks". The asyncio documentation also uses the term "Awaitables" for anything that implements `__await__()`, so both Coroutines and Futures are Awaitables.

What really makes this complicated is that `Task` inherits from `Future`. So in some places, Coroutines and Futures can be used interchangeably because they are both Awaitables -- and in other places, Coroutines and Futures can be used interchangeably because Coroutines can automatically be wrapped in Tasks, which makes them Futures.

I wonder whether it would have been better to call Tasks "CoroutineFutures" instead. Probably not. That makes them sound like they are a simple wrapper, when in fact they are the thing that is actually driving most of the coroutine layer.

In any case I believe the asyncio documentation could benefit from a clear separation of layers. The first part should be a description of the high level coroutine API, including `sleep()` and `gather()`. The second part could be about the callback layer, including `call_later()` and `add_reader()`. The third and final part could explain the low level plumbing for those people who want to dive deep. This is the only part that needs to mention terms like "Awaitable", "Task", or "Future".

## Conclusion

These were eight different versions of asynchronous loops. I have certainly learned something: a bit about async primitives on linux and a lot about generators and coroutines in python. I hope this post serves as a helpful reference for future endeavors.

The big question remains: which approach is better? The simple cleanup in the coroutine approach is a huge advantage, but it comes at the cost of significant complexity compared to callbacks. The thought that we have to limit ourselves to one of them is not great. So here's to hoping we will someday find an approach that combines the benefits of both.