Thom Nichols

Python Concurrency using a State Machine

I've been doing steadily more Python development and have lately run into some concurrent-programming problems.  I never thought I'd say this, but Python's concurrency APIs make java.util.concurrent look light-years ahead.  Some of Python's libraries seem to be modeled on their Java equivalents (see threading.Thread and threading.Condition), but there's still definitely a gap.

One notable omission is a Thread.interrupt() call.  Of course, this wouldn't be useful unless threads themselves consistently checked for Thread.is_interrupted().  But the result of this missing API is that it's very easy to write Python code that blocks (say, on a Lock.acquire() or a read() operation) and is completely unresponsive as a result.

Thankfully, Condition.wait() takes an optional timeout, and higher-level primitives like Queue take advantage of that and provide a timeout parameter for most blocking operations.  Let me put this simply: when you are doing thread-based concurrency, every blocking operation needs to take a timeout (at least in Python) if you want your threads to stay responsive, even if all the thread does when the timeout expires is check for some exit condition and then re-attempt the blocking operation.  Not having a timeout or some interrupt mechanism is a recipe for deadlock.
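Here is a minimal sketch of that "block with a timeout, then re-check" loop.  It assumes Python 3 (the queue module was named Queue in Python 2), and the names worker, stop_event, results and poll_interval are invented for illustration rather than taken from any library:

```python
import queue
import threading
import time


def worker(tasks, stop_event, results, poll_interval=0.1):
    """Consume tasks until stop_event is set.

    The thread never blocks for longer than poll_interval seconds,
    so it notices the exit condition promptly.
    """
    while not stop_event.is_set():
        try:
            # Block for at most poll_interval, then loop and re-check the flag.
            item = tasks.get(timeout=poll_interval)
        except queue.Empty:
            continue
        results.append(item * 2)


tasks = queue.Queue()
stop_event = threading.Event()
results = []

thread = threading.Thread(target=worker, args=(tasks, stop_event, results))
thread.start()

for n in (1, 2, 3):
    tasks.put(n)

# The main thread also waits with a deadline instead of blocking forever.
deadline = time.monotonic() + 2.0
while len(results) < 3 and time.monotonic() < deadline:
    time.sleep(0.01)

stop_event.set()          # cooperative stand-in for the missing Thread.interrupt()
thread.join(timeout=2.0)  # join() takes a timeout too
```

The Event plays the role of the interrupt flag: nothing forces the worker to stop, but because its only blocking call times out, it gets a chance to look at the flag about ten times a second.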

So, the Condition class works (if nothing else), but not everyone understands it terribly well.  Queue is useful, but not for all types of synchronization.  So here's another common concurrency idiom: the State Machine (see Wikipedia).  The idea is to let a thread do very simple operations like "do x as long as I'm in state A" and "if I'm in state A, perform some operation and transition to state B."  The magic really happens in the 'transition' function, which prevents race conditions and ensures two threads can't both perform a transition from A to B (or worse, from A to two different states) at the same time.  Once one thread has transitioned from A to B, the machine has to get back to A before any thread can transition to B again.
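To show what an atomic 'transition' function might look like, here is a rough sketch built on threading.Condition.  It is not the implementation linked from this post: the class name SimpleStateMachine, the initial argument and the wait_for method are assumptions made for the example (Python 3).

```python
import threading


class SimpleStateMachine:
    """Hypothetical sketch of a thread-safe state machine."""

    def __init__(self, states, initial):
        self._states = frozenset(states)
        if initial not in self._states:
            raise ValueError("unknown initial state: %r" % (initial,))
        self._state = initial
        self._cond = threading.Condition()

    @property
    def state(self):
        with self._cond:
            return self._state

    def transition(self, from_state, to_state):
        """Move from from_state to to_state; True only for the thread that wins."""
        if from_state not in self._states or to_state not in self._states:
            raise ValueError("unknown state")
        with self._cond:
            # The check and the update happen under one lock, so two threads
            # can never both observe from_state and both move on.
            if self._state != from_state:
                return False
            self._state = to_state
            self._cond.notify_all()
            return True

    def wait_for(self, state, timeout):
        """Wait up to timeout seconds for the given state; return whether it was reached."""
        with self._cond:
            return self._cond.wait_for(lambda: self._state == state, timeout=timeout)


machine = SimpleStateMachine(("A", "B"), initial="A")
winners = []


def racer():
    if machine.transition("A", "B"):
        winners.append(threading.current_thread().name)


threads = [threading.Thread(target=racer) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join(timeout=2.0)
```

Eight threads race for the same A-to-B transition, and exactly one name ends up in winners.  Every other thread gets False back immediately instead of blocking.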

A Simple Example

Imagine you have a single connection from a multi-threaded client to a server.  Any one of those client threads might detect that the connection has terminated, but you definitely don't want more than one thread to re-establish the connection at the same time.  You could simply synchronize your connect method, but in Python it's not easy to tell whether a socket is connected until you try to read from or write to it.  So now you also need a boolean value to record whether you're connected or not.  Something like this:
def connect(self, host, port):
    self.lock.acquire()
    try:
        if not self.connected:
            # perform connection operation here
            self.connected = True
    finally:
        self.lock.release()

And you need to similarly synchronize the code where you set connected to False.  Otherwise you risk one thread calling connect() while another suddenly thinks it's disconnected and resets self.connected = False.  A better alternative is to use a State Machine to atomically transition between two (or more) states.  A two-state machine is as simple as it gets, but its utility versus the traditional Lock+boolean is still apparent.  Here's the above example written using a state machine:

# we create the state machine before the threads are started:
self.state = StateMachine(('connected', 'disconnected'))

def connect(self, host, port):
    with self.state.transition_ctx('disconnected', 'connected') as locked:
        if locked:
            # only one thread gets the lock; this one does the work:
            # ... do yer connection stuff ...
            pass
        else:
            print("Another thread is already connecting...")

This accomplishes three things: it (1) ensures only one thread can transition from 'disconnected' to 'connected', (2) notifies other waiting threads when the 'with' block exits, and (3) does not perform the transition if any exception is thrown from within the 'with' block.
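For readers who want to see how a context manager could deliver those three guarantees, here is one possible sketch.  It is a guess at the shape of such a class, not the linked source: the name SketchStateMachine, the initial argument and the _in_transition flag are all assumptions (Python 3).

```python
import contextlib
import threading


class SketchStateMachine:
    """Hypothetical sketch; the real StateMachine may work differently."""

    def __init__(self, states, initial):
        self._states = frozenset(states)
        self._state = initial
        self._in_transition = False  # True while some thread is inside the 'with' block
        self._cond = threading.Condition()

    @property
    def state(self):
        with self._cond:
            return self._state

    @contextlib.contextmanager
    def transition_ctx(self, from_state, to_state):
        # (1) Only one thread may claim the transition at a time.
        with self._cond:
            claimed = self._state == from_state and not self._in_transition
            if claimed:
                self._in_transition = True
        if not claimed:
            yield False
            return
        succeeded = False
        try:
            # The internal lock is NOT held while the caller's block runs.
            yield True
            succeeded = True
        finally:
            with self._cond:
                # (3) The state changes only if the block raised nothing.
                if succeeded:
                    self._state = to_state
                self._in_transition = False
                # (2) Wake any threads waiting on the condition.
                self._cond.notify_all()


machine = SketchStateMachine(("connected", "disconnected"), initial="disconnected")

# A second attempt made while the first is still in progress is turned away.
with machine.transition_ctx("disconnected", "connected") as first:
    with machine.transition_ctx("disconnected", "connected") as second:
        pass

# An exception inside the block leaves the state untouched.
failing = SketchStateMachine(("connected", "disconnected"), initial="disconnected")
try:
    with failing.transition_ctx("disconnected", "connected") as locked:
        raise IOError("connection refused")
except IOError:
    pass
```

One design choice worth noting: the sketch releases its internal lock while the caller's block runs, so a slow connect does not block threads that only want to be told "someone else is already on it."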

Source code is here.  Dig in.

Next Steps

The next logical step is to add something like 'event handlers' -- but I'll call them transition handlers.  Imagine if you made a call like state.register_transition('disconnected', 'connected', self.connect).  This suggests that simply calling state.transition('disconnected', 'connected') would perform the connect (and maybe other operations) in a chain-of-responsibility sort of pattern.  There are a lot of places you could go from here.  Fork the code, try it out; if you find it particularly interesting, maybe I'll package it as its own project.
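As a rough illustration of the idea, transition handlers might look something like the sketch below.  Nothing here exists in the linked code: the class name HandlerStateMachine, the initial argument and the rule that a raising handler cancels the transition are assumptions for the example (Python 3).

```python
import threading


class HandlerStateMachine:
    """Hypothetical sketch of a state machine with transition handlers."""

    def __init__(self, states, initial):
        self._states = frozenset(states)
        self._state = initial
        # Re-entrant, so a handler may register further handlers.
        self._lock = threading.RLock()
        self._handlers = {}  # (from_state, to_state) -> list of callables

    @property
    def state(self):
        with self._lock:
            return self._state

    def register_transition(self, from_state, to_state, handler):
        with self._lock:
            self._handlers.setdefault((from_state, to_state), []).append(handler)

    def transition(self, from_state, to_state):
        """Run the registered handlers in order, then change state.

        Returns False if the machine is not in from_state.  If a handler
        raises, the exception propagates and the state is left unchanged.
        Simplification: handlers run while the lock is held, so a slow
        handler blocks other callers of transition().
        """
        with self._lock:
            if self._state != from_state:
                return False
            for handler in list(self._handlers.get((from_state, to_state), ())):
                handler()
            self._state = to_state
            return True


calls = []
state = HandlerStateMachine(("connected", "disconnected"), initial="disconnected")
state.register_transition("disconnected", "connected", lambda: calls.append("connect"))
state.register_transition("disconnected", "connected", lambda: calls.append("log"))

first_result = state.transition("disconnected", "connected")
second_result = state.transition("disconnected", "connected")
```

The first call runs both handlers in registration order and moves to 'connected'; the second returns False without running anything, because the machine is no longer in 'disconnected'.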
