The State of Python Coroutines: asyncio - Callbacks vs. Coroutines

This is part 3 of the “State of Python Coroutines” series of posts:

I recently spotted that Python 3.5 has added yet more features to make coroutines more straightforward to implement and use. Since I’m well behind the curve I thought I’d bring myself back up to date over a series of blog posts, each going over some functionality added in successive Python versions — this one covers more of the asyncio module that was added in Python 3.4.

In the preceding post in this series I introduced the asyncio module and its utility as an event loop for coroutines. However, this isn’t the only use of the module — its primary purpose is to act as an event loop for various forms of I/O such as network sockets and pipes to child processes. In this post, then, I’d like to compare the two main approaches to doing this: using callbacks and using coroutines.

A brief digression: handling multiple connections

Anyone that’s done a decent amount of non-blocking I/O can probably skim or skip this section — for anyone who’s not come across this problem in their coding experience, this might be useful.

There are quite a few occasions where you end up needing to handle multiple I/O streams simultaneously. An obvious one is something like a webserver, where you want to handle multiple network connections concurrently. There are other examples, though — one thing that crops up quite often for me is managing multiple child processes, where I want to stream output from them as soon as it’s generated. Another possibility is where you’re making multiple HTTP requests that you want to be fetched in parallel.

In all these cases you want your application to respond immediately to input received on any stream, but at the same time it’s clear you need to block and wait for input, since endlessly looping and polling each stream would be a massive waste of system resources. Typically there are two main approaches to this: threads and non-blocking I/O [1].

These days threads seem to be the more popular solution — each I/O stream has a new thread allocated for it and the stack of this thread encapsulates its complete state. This makes it easy for programmers who aren’t used to dealing with event loops — they can continue to write simple sequential code that uses standard blocking I/O calls to yield as required. It has some downsides, however — cooperating with other threads requires the overhead of synchronisation and if the turnover of connections is high (consider, say, a busy DNS server) then it’s slightly wasteful to be continually creating and destroying thread stacks. If you want to solve the C10k problem, for example, I think you’d struggle to do it using a thread per connection.

The other alternative is to use a single thread and have it wait for activity on any stream, then process that input and go back to sleep again until another stream is ready. This is typically simpler in some ways — for example, you don’t need any locking between connections because you’re only processing one at any given time. It’s also perfectly performant in cases where you expect to be primarily IO-bound (i.e. handling connections won’t require significant CPU time) — indeed, depending on how the data structures associated with your connections are allocated this approach could improve performance by avoiding false sharing issues.

The downside to this method is that it’s rather less intuitive for many programmers. In general you’d like to write some straight-line code to handle a single connection, then have some magical means to extend that to multiple connections in parallel; that’s the lure of threading. But there is a way we can achieve, to some extent, the best of both worlds (spoiler alert: it’s coroutines).

The mainstays for implementing non-blocking I/O loops in the Unix world have long been select(), introduced by BSD in 1983, and the slightly later poll(), added to System V in 1986. There are some minor differences but in both cases the model is very similar:

  • Register a list of file descriptors to watch for activity.
  • Call the function to wait for activity on any of them.
  • Examine the returned value to discover which descriptors are active and process them.
  • Loop around to the beginning and wait again.

This is often known as the event loop: it’s a loop, it handles events. Implementing an event loop is quite straightforward, but the downside is that the programmer essentially has to find their own way to maintain the state associated with each connection. This often isn’t too tricky, but sometimes when the connection handling is very context-dependent it can make the code rather hard to follow. It often feels like scrabbling to implement some half-arsed version of closures, and it would be preferable to let language designers worry about that sort of thing.
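To make those four steps concrete, here’s a minimal sketch of one pass of such a loop using Python’s select module. The function name serve_once and the conns and buffers parameters are my own inventions for illustration; the point is that the per-connection state (here just a buffer of partial input) has to be carried around by hand.

```python
import select
import socket


def serve_once(conns, buffers, timeout=1.0):
    """Run one pass of a select()-based loop.

    conns   -- list of connected sockets to watch (modified in place)
    buffers -- dict mapping each socket to the bytes received so far;
               this is the hand-maintained per-connection state
    Returns a list of (socket, line) pairs for each complete line seen.
    """
    lines = []
    # Hand over the descriptors and wait for activity on any of them.
    readable, _, _ = select.select(conns, [], [], timeout)
    # Process whichever descriptors turned out to be active.
    for sock in readable:
        data = sock.recv(4096)
        if not data:
            # An empty read means the peer closed: drop socket and state.
            conns.remove(sock)
            buffers.pop(sock, None)
            sock.close()
            continue
        buffers[sock] = buffers.get(sock, b"") + data
        while b"\n" in buffers[sock]:
            line, buffers[sock] = buffers[sock].split(b"\n", 1)
            lines.append((sock, line))
    return lines
```

The caller supplies the final step by calling serve_once() in a loop for as long as conns is non-empty. Even in this tiny case the buffers dictionary is doing the job that local variables would do in straight-line code.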

The rest of this article will focus on how we can use asyncio to stop worrying so much about some of these details and write more natural code whilst still getting the benefits of the non-blocking I/O approach.

asyncio with callbacks

One problem with using the likes of select() is that it can encourage you to drive all your coding from one big loop. Without a bit of work, this tends to run counter to the design principle of separating concerns, so we’d like to move as much as possible out of this big loop. Ideally we’d also like to abstract it, implement it in a library somewhere and get the benefits of reusing well-tested code. This is particularly important for event loops, where the potential for serious issues (such as getting into a busy loop) is rather higher than in a lot of areas of code.

The most common way to hook into a generic event loop is with callbacks. The application registers callback functions which are to be invoked when particular events occur, and then the application jumps into a wait function whose purpose is to simply loop until there are events and invoke the appropriate callbacks.
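As a rough illustration of that registration-and-dispatch pattern, here’s a sketch built on the standard selectors module, which lets you attach arbitrary data to each registered file object. The names dispatch_once and make_collector are hypothetical, and a real event loop would obviously do rather more than this.

```python
import selectors
import socket


def dispatch_once(selector, timeout=1.0):
    """Wait for events, then invoke the callback registered for each one.

    Returns the number of callbacks that were invoked.
    """
    handled = 0
    for key, _events in selector.select(timeout):
        callback = key.data      # whatever was passed as data= on registration
        callback(key.fileobj)
        handled += 1
    return handled


def make_collector(received):
    """Build a callback that appends whatever it reads to the given list."""
    def on_readable(sock):
        received.append(sock.recv(4096))
    return on_readable
```

The application side then looks something like this, using a socket pair in place of a real network connection:

```python
selector = selectors.DefaultSelector()
left, right = socket.socketpair()
received = []
selector.register(left, selectors.EVENT_READ, data=make_collector(received))
right.sendall(b"ping")
dispatch_once(selector)    # invokes on_readable(left)
```

Notice that the loop itself knows nothing about what the callbacks do, which is exactly the separation we were after.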

It’s unsurprising, then, that asyncio is designed to support the callback approach. To illustrate this I’ve turned to my usual example of a chat server — this is a really simple daemon that waits for socket connections (e.g. using netcat or telnet) then prompts for a username and allows connected users to talk to each other.

This implementation is, of course, exceedingly basic; it’s meant to be an example, not a fully-featured application. Here’s the code, and I’ll touch on the highlights afterwards.

import asyncio
import sys

class ChatServer:

    class ChatProtocol(asyncio.Protocol):

        def __init__(self, chat_server):
            self.chat_server = chat_server
            self.username = None
            self.buffer = ""
            self.transport = None

        def connection_made(self, transport):
            # Callback: when connection is established, pass in transport.
            self.transport = transport
            welcome = "Welcome to " + self.chat_server.server_name
            self.send_msg(welcome + "\nUsername: ")

        def data_received(self, data):
            # Callback: whenever data is received, in arbitrary chunks (not whole lines).
            data = data.decode("utf-8")
            self.buffer += data
            self.handle_lines()

        def connection_lost(self, exc):
            # Callback: client disconnected.
            if self.username is not None:
                self.chat_server.remove_user(self.username)

        def send_msg(self, msg):
            self.transport.write(msg.encode("utf-8"))

        def handle_lines(self):
            while "\n" in self.buffer:
                line, self.buffer = self.buffer.split("\n", 1)
                if self.username is None:
                    if self.chat_server.add_user(line, self.transport):
                        self.username = line
                    else:
                        self.send_msg("Sorry, that name is taken\nUsername: ")
                else:
                    self.chat_server.user_message(self.username, line)


    def __init__(self, server_name, port, loop):
        self.server_name = server_name
        self.connections = {}
        self.server = loop.create_server(
                lambda: self.ChatProtocol(self),
                host="", port=port)

    def broadcast(self, message):
        for transport in self.connections.values():
            transport.write((message + "\n").encode("utf-8"))

    def add_user(self, username, transport):
        if username in self.connections:
            return False
        self.connections[username] = transport
        self.broadcast("User " + username + " joined the room")
        return True

    def remove_user(self, username):
        del self.connections[username]
        self.broadcast("User " + username + " left the room")

    def get_users(self):
        return self.connections.keys()

    def user_message(self, username, msg):
        self.broadcast(username + ": " + msg)


def main(argv):

    loop = asyncio.get_event_loop()
    chat_server = ChatServer("Test Server", 4455, loop)
    loop.run_until_complete(chat_server.server)
    try:
        loop.run_forever()
    finally:
        loop.close()


if __name__ == "__main__":
    sys.exit(main(sys.argv))

The ChatServer class provides the main functionality of the application, tracking the users that are connected and providing methods to send messages. The interaction with asyncio, however, is provided by the nested ChatProtocol class. To explain what this is doing, I’ll summarise a little terminology.

The asyncio module splits IO handling into two areas of responsibility: transports take care of getting raw bytes from one place to another, and protocols are responsible for interpreting those bytes into some more meaningful form. In the case of an HTTP request, for example, the transport would read and write from the TCP socket and the protocol would marshal up the request and parse the response to extract the headers and body.

This is something asyncio took from the Twisted networking framework and it’s one of the aspects I really appreciate. All too many HTTP client libraries, for example, jumble up the transport and protocol handling into one big mess such that changing one aspect but still making use of the rest is far too difficult.

The transports that asyncio provides cover TCP, UDP, SSL and pipes to a subprocess, which means that most people won’t need to roll their own. The interesting part, then, is asyncio.Protocol and that’s what ChatProtocol implements in the example above.
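One nice consequence of the transport/protocol split is that a protocol can be exercised without any network at all. Here’s a small sketch of the idea: ShoutProtocol and RecordingTransport are made-up names, and the “transport” is just a stand-in object with a write() method rather than anything asyncio itself provides.

```python
import asyncio


class ShoutProtocol(asyncio.Protocol):
    """Hypothetical protocol: replies to each chunk of bytes in upper case."""

    def connection_made(self, transport):
        # Keep the transport so we can write replies later.
        self.transport = transport

    def data_received(self, data):
        self.transport.write(data.upper())


class RecordingTransport:
    """Stand-in transport for illustration: it only records what is written."""

    def __init__(self):
        self.written = []

    def write(self, data):
        self.written.append(data)
```

Driving it by hand looks like this:

```python
protocol = ShoutProtocol()
transport = RecordingTransport()
protocol.connection_made(transport)
protocol.data_received(b"hello")
# transport.written now holds [b"HELLO"]
```

The protocol neither knows nor cares whether its bytes are going over TCP, a pipe or into a list.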

The first thing that happens is that the main() function instantiates the event loop; this occurs before anything else as it’s required for all the other operations. We then create a ChatServer instance whose constructor calls create_server() on the event loop. This takes a protocol factory as a parameter and, once it has been run by the loop, opens a listening TCP socket on the specified port [2]. Every time there is a connection on the listening socket, the factory will be used to manufacture a protocol instance to handle it.

The main() function then calls run_until_complete(), passing the coroutine that was returned by create_server(); this will block until the listening socket is fully open and ready to accept connections. This step can’t simply be skipped: create_server() is a coroutine, so nothing is opened until the event loop actually runs it, although scheduling it as a task before starting the loop would work equally well. The next thing main() does is call run_forever(), which causes the event loop to process IO endlessly until explicitly terminated.
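One detail worth seeing concretely is that create_server() only hands back a coroutine, so nothing is listening until the loop runs it. The sketch below shows this in isolation; DiscardProtocol and open_listener are hypothetical names, and I’ve assumed port 0 so that the operating system picks a free port instead of the hard-coded 4455.

```python
import asyncio


class DiscardProtocol(asyncio.Protocol):
    """Hypothetical do-nothing protocol, just so there is a factory to pass."""


def open_listener(loop, port=0):
    # Calling create_server() only builds a coroutine object; no socket yet.
    pending = loop.create_server(DiscardProtocol, host="127.0.0.1", port=port)
    assert asyncio.iscoroutine(pending)
    # Running the coroutine on the loop is what actually binds and listens.
    return loop.run_until_complete(pending)
```

The object returned is the server itself, so you can ask it which port it ended up on:

```python
loop = asyncio.new_event_loop()
server = open_listener(loop)
port = server.sockets[0].getsockname()[1]
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
```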

The meat of the application is then how ChatProtocol is implemented. This implements several callback methods which are invoked by the asyncio framework in response to different events:

  • A ChatProtocol instance is constructed in response to an incoming connection on the listening socket. No parameters are passed by asyncio, so because the protocol needs a reference to the ChatServer instance, this is passed via a closure by the lambda in the create_server() call.
  • Once the connection is ready, the connection_made() method is invoked which passes the transport that asyncio has allocated for the connection. This allows the protocol to store a reference to it for future writes, and also to trigger any actions required on a new connection — in this example, prompting the user for a username.
  • As data is received on the socket, data_received() is invoked to pass this to the protocol. In our example we only want line-oriented data (we don’t want to send a message to the chat room until the user presses return) so we buffer up data in a string and then process any complete lines found in it. Note that we also should take care of character encoding here — in our simplistic example we blindly assume UTF-8.
  • When we want to send data back to the user we invoke the write() method of the transport. Again, the transport expects raw bytes so we handle encoding to UTF-8 ourselves.
  • Finally, when the user terminates their connection then our connection_lost() method is invoked — in our example we use this to remove the user from the chatroom. Note that this is subtly different to the eof_received() callback which represents TCP half-close (i.e. the remote end called shutdown() with SHUT_WR) — this is important if you want to support protocols that indicate the end of a request in this manner.
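The buffering in data_received() deserves a closer look, since data can arrive split at any byte boundary, including in the middle of a multi-byte UTF-8 character (in which case a plain decode() of the chunk would raise an error). The following is a sketch of a slightly more careful variant using the standard library’s incremental decoder; LineAssembler is a hypothetical helper of my own, not something the chat server above uses.

```python
import codecs


class LineAssembler:
    """Hypothetical helper: turns arbitrary byte chunks into complete lines."""

    def __init__(self, encoding="utf-8"):
        # An incremental decoder holds on to a partial multi-byte character
        # until the rest of it arrives in a later chunk.
        self._decoder = codecs.getincrementaldecoder(encoding)()
        self._buffer = ""

    def feed(self, data):
        """Add a chunk of bytes and return the list of newly completed lines."""
        self._buffer += self._decoder.decode(data)
        # Everything before the last newline is complete; the rest is kept.
        *lines, self._buffer = self._buffer.split("\n")
        return lines
```

A protocol could then call feed() from data_received() and handle each returned line, for example:

```python
assembler = LineAssembler()
assembler.feed(b"hel")        # no complete line yet
assembler.feed(b"lo\nwor")    # completes "hello", keeps "wor" for later
```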

That’s about all there is to it. With this in mind, the rest of the example should be quite straightforward to follow. The only other aspect to mention is that once the loop has been terminated, we go ahead and call its close() method, which clears out any queued data, closes listening sockets, etc.

asyncio with coroutines

Since we’ve seen how to implement the chat server with callbacks, I think it’s high time we got back to the theme of this post and now compare that with an implementation of the same server with coroutines. In usual fashion, let’s jump in and look at the code first:

import asyncio
import sys

class ChatServer:

    def __init__(self, server_name, port, loop):
        self.server_name = server_name
        self.connections = {}
        self.server = loop.run_until_complete(
                asyncio.start_server(
                    self.accept_connection, "", port, loop=loop))

    def broadcast(self, message):
        for reader, writer in self.connections.values():
            writer.write((message + "\n").encode("utf-8"))

    @asyncio.coroutine
    def prompt_username(self, reader, writer):
        while True:
            writer.write("Enter username: ".encode("utf-8"))
            data = (yield from reader.readline()).decode("utf-8")
            if not data:
                return None
            username = data.strip()
            if username and username not in self.connections:
                self.connections[username] = (reader, writer)
                return username
            writer.write("Sorry, that username is taken.\n".encode("utf-8"))

    @asyncio.coroutine
    def handle_connection(self, username, reader):
        while True:
            data = (yield from reader.readline()).decode("utf-8")
            if not data:
                del self.connections[username]
                return None
            self.broadcast(username + ": " + data.strip())

    @asyncio.coroutine
    def accept_connection(self, reader, writer):
        writer.write(("Welcome to " + self.server_name + "\n").encode("utf-8"))
        username = (yield from self.prompt_username(reader, writer))
        if username is not None:
            self.broadcast("User %r has joined the room" % (username,))
            yield from self.handle_connection(username, reader)
            self.broadcast("User %r has left the room" % (username,))
        yield from writer.drain()


def main(argv):

    loop = asyncio.get_event_loop()
    server = ChatServer("Test Server", 4455, loop)
    try:
        loop.run_forever()
    finally:
        loop.close()


if __name__ == "__main__":
    sys.exit(main(sys.argv))

As you can see, this version is written in quite a different style to the callback variant. This is because it’s using the streams API, which is essentially a set of wrappers around the callback version that adapts it for use with coroutines.

To use this API we call start_server() instead of create_server(). This wrapper changes the way the supplied callback is invoked and instead passes it two streams: StreamReader and StreamWriter instances. These represent the input and output sides of the socket, but importantly their potentially blocking operations, such as readline() and drain(), are coroutines, so we can delegate to them with yield from.
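Stripped of the chat logic, the shape of a streams-based server is quite small. Here’s a sketch of a one-line echo round trip; note that I’ve written it with the async/await syntax from Python 3.5 onwards instead of @asyncio.coroutine and yield from, since the decorator has been removed from recent Python versions. The names handle_echo and round_trip are hypothetical, and port 0 is assumed so that a free port is chosen automatically.

```python
import asyncio


async def handle_echo(reader, writer):
    """Callback for start_server(): echo each line back in upper case."""
    while True:
        line = await reader.readline()
        if not line:
            break                      # empty read: the client has gone away
        writer.write(line.upper())
        await writer.drain()
    writer.close()


async def round_trip(message):
    """Start a server, send it one line as a client and return the reply."""
    server = await asyncio.start_server(handle_echo, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(message)
    await writer.drain()
    reply = await reader.readline()
    writer.close()
    await writer.wait_closed()
    server.close()
    return reply
```

Calling asyncio.run(round_trip(b"hello\n")) runs the whole exchange on a fresh event loop. The same reader and writer types appear on both the server and the client side.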

On the subject of coroutines, you’ll notice that some of the methods have an @asyncio.coroutine decorator — this serves a practical function in Python 3.5 in that it enables you to delegate to the new style of coroutine that it defines. Pre-3.5 it’s therefore useful for future compatibility, but also serves as documentation that this method is being treated as a coroutine. You should always use it to decorate your coroutines, but this isn’t enforced anywhere.

Back to the code. Our accept_connection() method is the callback that we provided to the start_server() method and the lifetime of this method call is the same as the lifetime of the connection. We could implement the handling of a connection in a strictly linear fashion within this method — such is the flexibility of coroutines — but of course being good little software engineers we like to break things out into smaller functions.

In this case I’ve chosen to use a separate coroutine to handle prompting the user for their username, so accept_connection() delegates to prompt_username() with this line:

username = (yield from self.prompt_username(reader, writer))

Once delegated, this coroutine takes control for as long as it takes to obtain a unique username and then returns this value to the caller. It also handles storing the username and the writer in the connections member of the class — this is used by the broadcast() method to send messages to all users in the room.
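If the way a value comes back out of yield from seems a bit magical, it may help to see the same delegation with plain generators and no asyncio at all. This is only a sketch with made-up names (prompt_username here takes a set of taken names, and session stands in for accept_connection()), driven by hand with send() where the event loop would normally do the driving.

```python
def prompt_username(taken):
    """Sub-generator: keep asking until it is sent a name that is free."""
    while True:
        name = yield "Enter username: "
        if name and name not in taken:
            return name        # becomes the value of the caller's yield from


def session(taken):
    """Outer generator: delegates to prompt_username(), then carries on."""
    username = yield from prompt_username(taken)
    yield "Welcome, " + username
```

Driving it manually shows control staying inside prompt_username() until it returns:

```python
gen = session({"alice"})
next(gen)            # 'Enter username: '
gen.send("alice")    # 'Enter username: ' again, since the name is taken
gen.send("bob")      # 'Welcome, bob'
```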

The handle_connection() method is also implemented in quite a straightforward fashion, reading input and broadcasting it until it detects that the connection has been closed by an empty read. At this point it removes the user from the connections dictionary and returns control to accept_connection(). We finally call writer.drain() to send any last buffered output — this is rather pointless if the user’s connection was cut, but could still serve a purpose if they only half-closed or if the server is shutting down instead. After this we simply return and everything is cleaned up for us.
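The “empty read” convention is easy to try out without a socket, because a StreamReader can be fed by hand with feed_data() and feed_eof(). The sketch below does just that; read_all_lines and demo are hypothetical names, and as above I’ve assumed the newer async/await syntax so that it runs on current Python versions.

```python
import asyncio


async def read_all_lines(reader):
    """Collect lines until readline() returns b"", which signals end of stream."""
    lines = []
    while True:
        data = await reader.readline()
        if not data:
            return lines
        lines.append(data)


async def demo():
    # Feeding the reader by hand stands in for data arriving on a socket.
    reader = asyncio.StreamReader()
    reader.feed_data(b"first\nsec")    # second line arrives in two pieces
    reader.feed_data(b"ond\n")
    reader.feed_eof()
    return await read_all_lines(reader)
```

Running asyncio.run(demo()) gives back the two complete lines, even though the second one arrived in two chunks. The reassembly happens inside the reader, with no buffer of our own in sight.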

How does this version compare, then? It’s a little shorter for one thing. OK, that’s a little facile, what else? We’ve managed to lose the nested class, which seems to simplify the job somewhat, as there’s less confusion about the division of responsibilities. We also don’t need to worry so much about where we store things: there’s no transport that we have to squirrel away somewhere while we wait for further callbacks. The reader and writer streams are just passed naturally through the call chain in an intuitive manner. Finally, we don’t have to engage in any messy buffering of data to obtain line-oriented input, because the reader stream handles all that for us.

Conclusions

That about wraps it up for this post. Hopefully it’s been an interesting comparison — I know that I certainly feel like I understand the various layers of asyncio a little better having gone through this exercise.

It takes a bit of a shift in one’s thinking to use the coroutine approach, and I think it’s helpful to have a bit of a handle on both mechanisms to better understand what’s going on under the hood, but overall the more I use the coroutine style for IO the more I like it. It feels like a good compromise between the intuitive straight-line code of the thread-per-connection approach and the lock-free simplicity of non-blocking IO with callbacks.

In the next post I’m going to look at the new syntax for coroutines introduced in Python 3.5, which was the inspiration for writing this series of posts in the first place.


  1. Some people use the term asynchronous IO for what I’m discussing here, which is certainly the more general term, but I prefer to avoid it due to risk of confusion with the POSIX asynchronous IO interface. 

  2. In this example we use a hard-coded port of 4455 for simplicity. 

5 Jul 2016 at 7:45AM by Andy Pearce in Software  | Photo by Andy Pearce  | Tags: python coroutines