I have written a bit of threaded Python code recently, and I’m collecting here a bunch of notes and thoughts on programming threaded Python. A lot of this is inspired by my exploration of Erlang. Opinions and ideas are very welcome, as I am fairly new to this.
It’s worth noting that while this approach is more manageable than shared state, it still suffers from a lack of composability (see Steven’s comments below).
Introduction
While I won’t go into detail on the libraries mentioned, here is a quick summary of the basics:
- threading contains the class Thread and the function currentThread, as well as other things I haven’t needed. Reading up on Thread is advisable, but I’m only using the constructor, run and start.
- time: I just use the sleep function.
- Queue contains the class Queue, which is used extensively.
Message Queues
Queue.Queue is the fundamental primitive. It implements a thread-safe, synchronised queue whose get blocks until an item is available. As suggested in Python in a Nutshell, I use one for every thread that needs to communicate anything.
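Here is a minimal sketch of that primitive on its own. It is written for Python 3, where the module is spelled queue rather than Queue. The consumer blocks in get until the main thread puts something, and items come out in the order they went in. The None sentinel used to stop the consumer is an illustrative choice, not something from the post.

```python
import threading
from queue import Queue  # Python 3 name of the Queue module

inbox = Queue()
received = []

def consumer():
    while True:
        item = inbox.get()  # blocks until another thread puts something
        if item is None:    # hypothetical sentinel meaning "no more work"
            break
        received.append(item)

worker = threading.Thread(target=consumer)
worker.start()

for n in range(5):
    inbox.put(n)            # put does not block on an unbounded Queue
inbox.put(None)
worker.join()
print(received)  # [0, 1, 2, 3, 4]
```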
The basic idiom I have been using is:
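```python
from threading import Thread
from Queue import Queue  # Python 2; in Python 3 this is "from queue import Queue"

class MyProcess(Thread):
    def __init__(self):
        super(MyProcess, self).__init__()
        self._messages = Queue()

    def run(self):
        while True:
            message = self._messages.get()  # blocks until a message arrives
            # do stuff with the message
```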
Messages
Erlang uses its destructuring and unification (pattern matching) as a powerful message-passing tool, but that style feels really unnatural in Python, so I have taken to making the operations on the class the messages, which the object then performs inside its thread.
In addition, I am using lambdas as the messages, e.g.:
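```python
from threading import Thread, currentThread
from Queue import Queue  # Python 2; in Python 3 this is "from queue import Queue"

class MyProcess(Thread):
    def __init__(self):
        super(MyProcess, self).__init__()
        self._messages = Queue()

    def run(self):
        while True:
            message = self._messages.get()
            message()  # each message is a callable; run it on this thread

    def do_atomically(self):
        if currentThread() is not self:
            # called from another thread: queue the call for our own thread
            self._messages.put(lambda: self.do_atomically())
        else:
            pass  # my atomic action here
```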
Each message then becomes an outward-facing method on the MyProcess instance. It is run atomically (no other message runs until it finishes) in the thread when the thread next gets a chance.
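To see the idiom doing real work, here is a Python 3 sketch built around a hypothetical Counter process. In Python 3, currentThread is spelled current_thread. Four caller threads send 4000 increment messages with no lock, and they are still applied one at a time on the counter's own thread. report is a hypothetical message that sends the total back on a queue the caller supplies. Because the queue is FIFO, report runs after every increment queued before it.

```python
import threading
from queue import Queue

class Counter(threading.Thread):
    """Hypothetical process: its total is only ever touched by its own thread."""

    def __init__(self):
        # daemon=True only so this sketch doesn't keep the interpreter alive
        super().__init__(daemon=True)
        self._messages = Queue()
        self._total = 0

    def run(self):
        while True:
            message = self._messages.get()
            message()

    def increment(self):
        if threading.current_thread() is not self:
            self._messages.put(lambda: self.increment())
        else:
            self._total += 1  # runs on the Counter's thread, one message at a time

    def report(self, reply):
        if threading.current_thread() is not self:
            self._messages.put(lambda: self.report(reply))
        else:
            reply.put(self._total)

counter = Counter()
counter.start()

def hammer():
    for _ in range(1000):
        counter.increment()

callers = [threading.Thread(target=hammer) for _ in range(4)]
for c in callers:
    c.start()
for c in callers:
    c.join()

answer = Queue()
counter.report(answer)  # queued behind all 4000 increments
total = answer.get()
print(total)  # 4000
```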
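The following is roughly a message decorator. It finds the owning thread through self._thread, so it fits the composition-based version further down rather than the Thread subclass above: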
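```python
from functools import wraps
from threading import currentThread

def message(method):
    @wraps(method)  # keep the wrapped method's name and docstring
    def handler(self, *args, **kwargs):
        if currentThread() is not self._thread:
            # called from another thread: queue the call for the owning thread
            self._messages.put(lambda: method(self, *args, **kwargs))
        else:
            # already on the owning thread: run it directly
            method(self, *args, **kwargs)
    return handler
```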
It’s worth noting that messages don’t return anything. Results have to be pushed back later with other messages, so a message may need to include a sender to reply to.
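Here is a Python 3 sketch of what including a sender can look like. It assumes the sender is simply a callable that the receiver invokes with the result. If you pass another process's message method as the sender, the reply itself becomes a message on that process's thread. Here a plain queue's put method stands in for the sender so the caller can collect the replies. The Squarer class and its square message are hypothetical.

```python
import threading
from queue import Queue

class Squarer:
    """Hypothetical process that owns one thread and one message queue."""

    def __init__(self):
        self._messages = Queue()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def _loop(self):
        while True:
            self._messages.get()()

    def square(self, n, sender):
        # Messages return nothing, so the result is pushed back through sender.
        if threading.current_thread() is not self._thread:
            self._messages.put(lambda: self.square(n, sender))
        else:
            sender(n * n)

results = Queue()
squarer = Squarer()
for n in (2, 3, 4):
    squarer.square(n, results.put)  # the "sender" here is just a queue's put method
replies = [results.get() for _ in range(3)]
print(replies)  # [4, 9, 16]
```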
Composition rather than inheritance
So far this model has used inheritance for implementation. Steven’s suggestion, which I am leaning heavily towards, is this:
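```python
from threading import Thread
from Queue import Queue  # Python 2; in Python 3 this is "from queue import Queue"

def loop_over_queue(q):
    def loop():
        while True:
            q.get()()  # take the next message and run it
    return loop

class MyProcess(object):
    def __init__(self):
        self._messages = Queue()
        self._thread = Thread(target=loop_over_queue(self._messages))
        self._thread.start()

    @message  # the message decorator from the previous section
    def do_atomically(self):
        pass  # my atomic action here
```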
This adheres better to the single responsibility principle and is conceptually easier to understand: the object owns the thread, and the two interact through a message queue.
loop_over_queue is a generalised main loop for a message-processing thread. You may well want a richer main loop, e.g. one with exception handling.
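Here is one possible richer loop, sketched in Python 3 under the assumption that a failing message should be reported rather than allowed to kill the thread. The name loop_over_queue_safely and the on_error hook are hypothetical. The default hook prints the traceback, and the demo swaps in a hook that records the exception type so the behaviour can be checked.

```python
import threading
import traceback
from queue import Queue

def print_error(exc):
    # called from inside the except block, so print_exc can see the traceback
    traceback.print_exc()

def loop_over_queue_safely(q, on_error=print_error):
    """Hypothetical richer main loop: a failing message is reported to
    on_error instead of killing the thread."""
    def loop():
        while True:
            message = q.get()
            try:
                message()
            except Exception as exc:
                on_error(exc)
    return loop

errors, results = [], []
q = Queue()
worker = threading.Thread(
    target=loop_over_queue_safely(q, on_error=lambda exc: errors.append(type(exc).__name__)),
    daemon=True,
)
worker.start()

q.put(lambda: 1 / 0)                            # this message raises ZeroDivisionError
q.put(lambda: results.append("still running"))  # ...but the loop carries on
done = threading.Event()
q.put(done.set)                                 # lets the caller wait until the queue is drained
done.wait(timeout=5)
print(errors, results)  # ['ZeroDivisionError'] ['still running']
```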
Timing
Sleeping in a thread whose loop blocks on messages means it won’t handle any messages for those x seconds. That may be fine, but it might not be. In that situation you want a separate alarm thread, e.g.:
```python
import time
from threading import Thread

def alarm(seconds, callback):
    def waiter():
        time.sleep(seconds)  # only this helper thread sleeps
        callback()
    Thread(target=waiter).start()
```
The callback will typically be a function that sends a message, so that the actual work still happens on the thread that owns the data.
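The standard library provides much the same thing as threading.Timer. A Timer runs a function once, after a delay, in its own thread, and it can be stopped with cancel() before it fires. Here is a Python 3 sketch that uses a Timer so the alarm only sends a message and the owning thread keeps handling other messages in the meantime. The Clock class and its note and ring messages are hypothetical.

```python
import threading
from queue import Queue

class Clock:
    """Hypothetical process that keeps handling messages while an alarm is pending."""

    def __init__(self):
        self.events = []
        self.rang = threading.Event()
        self._messages = Queue()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def _loop(self):
        while True:
            self._messages.get()()

    def note(self, text):
        self._messages.put(lambda: self.events.append(text))

    def ring(self):
        # the alarm callback only sends a message; the work happens on the Clock's thread
        def handle():
            self.events.append("ring")
            self.rang.set()
        self._messages.put(handle)

clock = Clock()
clock.note("hello")                       # handled straight away, nobody is sleeping
alarm = threading.Timer(0.1, clock.ring)  # after about 0.1 s, send the ring message
alarm.start()
clock.rang.wait(timeout=5)
print(clock.events)  # ['hello', 'ring']
```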
On the absence of locks
If you are familiar with threading, you may notice that the code above completely disregards explicit locking. That is deliberate: shared-state concurrency is hard. To avoid deadlocks and race conditions, no two threads ever write to the same object; an object’s state is only touched by the thread that owns it.
Astute observers will notice that objects are passed by reference through the message queues, and that Python does not enforce the copy-on-send / immutable-data behaviour that this model assumes.
There are two rules here:
- A thread sending a message must copy, before sending, any data it wishes to retain and (in particular) mutate.
- A thread receiving a message may assume that all the data it receives is safe to mutate.
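Here is a small Python 3 sketch of the two rules. The sender keeps using its dictionary, so it sends a copy.deepcopy of it. A deep copy is used because the nested list would otherwise still be shared. The receiver then mutates what it got without affecting the sender. The variable names are illustrative.

```python
import copy
import threading
from queue import Queue

inbox = Queue()
seen = []

def receiver():
    data = inbox.get()
    data["items"].append("added by receiver")  # rule 2: the receiver may mutate freely
    seen.append(data)

worker = threading.Thread(target=receiver)
worker.start()

mine = {"items": ["a", "b"]}
inbox.put(copy.deepcopy(mine))  # rule 1: send a copy, because we keep using mine
mine["items"].append("c")       # safe: the receiver has its own copy
worker.join()

print(mine)     # {'items': ['a', 'b', 'c']}
print(seen[0])  # {'items': ['a', 'b', 'added by receiver']}
```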
Comments
Performing multiple actions in a transaction is difficult using the model discussed above. Consider the following, where db is implemented as a process as described above and get sends its reply to the queue it is given:
Thread A:
q = Queue()
db.get(q)
db.set(q.get() + 1)
Thread B:
q = Queue()
db.get(q)
db.set(q.get() + 1)
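There is a race condition here: both threads can read the same value before either of them sets it, so one increment is lost. One solution is to have a mutate message instead of set and get, for instance: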
Thread:
db.mutate(lambda x: x+1)
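Here is a Python 3 sketch of such a db process, assuming it holds a single value and that get replies on a queue the caller passes in. The Db class is hypothetical. mutate performs the read-modify-write inside one message, so two threads adding 1000 each always end up at 2000.

```python
import threading
from queue import Queue

class Db:
    """Hypothetical single-value 'database' process."""

    def __init__(self, value=0):
        self._value = value
        self._messages = Queue()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def _loop(self):
        while True:
            self._messages.get()()

    def get(self, reply):
        self._messages.put(lambda: reply.put(self._value))

    def set(self, value):
        def do_set():
            self._value = value
        self._messages.put(do_set)

    def mutate(self, fn):
        # read and write happen inside one message, so nothing can interleave
        def do_mutate():
            self._value = fn(self._value)
        self._messages.put(do_mutate)

db = Db()

def add_one_many_times():
    for _ in range(1000):
        db.mutate(lambda x: x + 1)

threads = [threading.Thread(target=add_one_many_times) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

q = Queue()
db.get(q)  # queued after every mutate above
final = q.get()
print(final)  # 2000
```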
This fixes the immediate problem but is not very general. A more flexible solution is to have an atomically message that takes an action, for instance:
Thread:
def action():
    q = Queue()
    db.get(q)
    db.set(q.get() + 1)
db.atomically(action)
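Here is a Python 3 sketch of an atomically message. It is a variant in which the action is handed the db and reads and writes its state directly, since it is already running on the db's thread. The action in the comment above goes through get and set instead. That only works if those calls run directly when made on the db's own thread, as the message decorator arranges. Otherwise it hits the deadlock described next. The Db class and the increment action are hypothetical.

```python
import threading
from queue import Queue

class Db:
    """Hypothetical process with an atomically message."""

    def __init__(self, value=0):
        self.value = value
        self._messages = Queue()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def _loop(self):
        while True:
            self._messages.get()()

    def atomically(self, action):
        # The whole action runs as one message on the db thread, so no other
        # message can interleave with it. The action must not wait for replies
        # to messages it sends to this db, or the db thread deadlocks.
        self._messages.put(lambda: action(self))

db = Db()

def increment(d):
    current = d.value  # plain attribute access: we are already on the db thread
    d.value = current + 1

def add_many():
    for _ in range(1000):
        db.atomically(increment)

workers = [threading.Thread(target=add_many) for _ in range(2)]
for w in workers:
    w.start()
for w in workers:
    w.join()

result = Queue()
db.atomically(lambda d: result.put(d.value))
value_now = result.get()
print(value_now)  # 2000
```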
This approach causes some issues. For instance, the db cannot process any other messages until the transaction has finished, which will cause a deadlock if the db needs to receive a message in order to complete a set or get. Solutions to this involve having two queues, one for public API events and another for internal implementation events. At that point things get far too messy for my liking. STM (software transactional memory) wins again ;)
Suggestions welcome — Steven
Other bits
Here’s a simple function for deferring a callable to a background thread, which just executes it and exits:
```python
from threading import Thread

def fire_and_forget(func, *args, **kwargs):
    """Runs a function in its own thread.

    func shouldn't mess with its args."""
    Thread(target=func, args=args, kwargs=kwargs).start()
```
For example, from a Django context:
fire_and_forget(send_mail, "subject", "body", "from@example.com", ["to@example.com"])
mattw is using a slight variation of this:
```python
from threading import Thread

def background(func, *args, **kwargs):
    t = Thread(target=lambda: func(*args, **kwargs))
    t.setDaemon(True)  # daemon: the process won't wait for this thread when it exits
    t.start()
```
(I’m not sure if the setDaemon call is truly necessary, but it seems like a good idea given that we don’t care about any further contact with the thread.)
A thread can be flagged as a “daemon thread”. The significance of this flag is that the entire Python program exits when only daemon threads are left. The initial value is inherited from the creating thread. The flag can be set with the setDaemon() method and retrieved with the isDaemon() method.
I might be wrong, but the way I read that, when the program exits it waits for any non-daemon threads to finish, whereas daemon threads are simply killed.
When the thread’s entry-point function returns, I believe the thread just exits, daemon or not; the flag only matters if the process ends first. If you know when the process may die, that’s fine. But if it may die right after a request is handled (depending on how the site’s handler is implemented), a daemon task may simply be aborted halfway through.
mattw says: Ah, ok, I’d figured it was something like that. I was thinking maybe it ‘disowned’ the thread or something; e.g. the calling context didn’t have to wait around. Mea culpa. I do believe I’ll change it back.
Exiting a thread
I’m not sure how solid this is, but I think it is probably the easiest way to get a thread to exit. complete gets pushed onto the queue like any other message, and once the messages ahead of it have been processed, it makes the thread finish by raising an exception:
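```python
class ThreadComplete(Exception):
    """Raised inside a message thread to make its main loop stop."""
    pass

def wait_for_complete(fun):
    """A function decorator that will catch ThreadComplete for you and
    ignore it, while letting other exceptions through.
    """
    def inner(*args, **kwargs):
        try:
            fun(*args, **kwargs)
        except ThreadComplete:
            pass  # the thread was asked to finish; exit quietly
    return inner

# A message method (uses the message decorator from earlier). When it runs on
# the owning thread, the exception unwinds out of the main loop.
@message
def complete(self):
    raise ThreadComplete()
```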
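You can then add complete to any message-handling class, and wrap that class’s main loop with wait_for_complete so that the exception ends the loop quietly instead of surfacing as an error.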
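Here is an end-to-end Python 3 sketch that combines the pieces from this post: the message decorator, loop_over_queue wrapped in wait_for_complete, and a complete message. The Worker class, its record message and its join helper are hypothetical. Anything queued after complete is never run, because the loop has already ended.

```python
import threading
from functools import wraps
from queue import Queue

class ThreadComplete(Exception):
    pass

def wait_for_complete(fun):
    @wraps(fun)
    def inner(*args, **kwargs):
        try:
            fun(*args, **kwargs)
        except ThreadComplete:
            pass  # normal shutdown
    return inner

def message(method):
    @wraps(method)
    def handler(self, *args, **kwargs):
        if threading.current_thread() is not self._thread:
            self._messages.put(lambda: method(self, *args, **kwargs))
        else:
            method(self, *args, **kwargs)
    return handler

def loop_over_queue(q):
    def loop():
        while True:
            q.get()()
    return loop

class Worker:
    def __init__(self):
        self.log = []
        self._messages = Queue()
        self._thread = threading.Thread(
            target=wait_for_complete(loop_over_queue(self._messages)))
        self._thread.start()

    def join(self, timeout=None):
        self._thread.join(timeout)

    @message
    def record(self, text):
        self.log.append(text)

    @message
    def complete(self):
        raise ThreadComplete()

w = Worker()
w.record("first")
w.complete()
w.record("never handled")  # queued after complete, so it never runs
w.join(timeout=5)
print(w.log)  # ['first']
```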
