Supporting concurrency is increasingly important. In the past, mainstream concurrent programming generally meant ensuring that the code interacting with relatively slow network, disk, database, and other I/O resources did not unduly slow things down. Exploiting parallelism was typically only seen in such domains as scientific computing with the apps running on supercomputers.
But there are new factors at work now. The semiconductor industry continues to work feverishly to uphold Moore’s Law of exponential increase in chip density. Chip designers used to apply this bounty to speeding up an individual CPU. For a variety of reasons this old approach no longer works as well, so now chip designers are cramming chips with more CPUs and hardware threads. Speeding up execution means harnessing the parallelism of the hardware, and it is now our job as software developers to do that work.
The Java platform can help out here. The Java platform is arguably the most robust environment for running concurrent code today, and this functionality can be readily be used from Jython. The problem remains that writing concurrent code is still not easy. This is especially true with respect to a concurrency model based on threads, which is what today’s hardware natively exposes.
This means we have to be concerned with thread safety, which arises as an issue because of the existence of mutable objects that are shared between threads. (Mutable state might be avoidable in functional programming, but it would be hard to avoid in any but the most trivial Python code.) If you attempt to solve concurrency issues through synchronization, you run into other problems: besides the potential performance hit, there are opportunities for deadlock and livelock.
Note
Implementations of the JVM (like HotSpot) can often avoid the overhead of synchronization. We will discuss what’s necessary for this scenario to happen later in this chapter.
Given all of these issues, it has been argued that threading is just too difficult to get right. We’re not convinced by that argument. Useful concurrent systems have been written on the JVM, and that includes apps written in Jython. Key success factors in writing such code include:
One issue that you will have to consider in writing concurrent code is how much to make your implementation dependent on the Java platform. Here are our recommendations:
In practice, using Java’s support for higher level primitives should not impact the portability of your code so much. Using tasks in particular tends to keep all of this well isolated, and such thread safety considerations as thread confinement and safe publication remain the same.
Lastly, remember you can always mix and match.
Creating threads is easy, perhaps too easy. This example downloads a web page concurrently.
Listing 19-1. test_thread_creation.py
from threading import Thread
import urllib2
downloaded_page = None # global
def download(url):
"""Download ``url`` as a single string"""
global downloaded_page
downloaded_page = urllib2.urlopen(url).read()
print "Downloaded", downloaded_page[:200]
def main_work():
# do some busy work in parallel
print "Started main task"
x = 0
for i in xrange(100000000):
x += 1
print "Completed main task"
if __name__ == '__main__':
# perform the download in the background
Thread(target=lambda: download("http://www.jython.org")).start()
main_work()
Be careful not to inadvertently invoke the function; target takes a reference to the function object (typically a name if a normal function). Calling the function instead creates an amusing bug where your target function runs now, so everything looks fine at first. But no concurrency is happening, because the function call is actually being run by the invoking thread, not this new thread.
The target function can be a regular function, or an object that is callable (implements __call__). This latter case can make it harder to see that the target is a function object!
To wait for a thread to complete, call join on it. This enables working with the concurrent result. The only problem is getting the result. As we will see, publishing results into variables is safe in Jython, but it’s not the nicest way.
Note
Daemon threads present an alluring alternative to managing the lifecycle of threads. A thread is set to be a daemon thread before it is started:
# create a thread t
t.setDaemon(True)t.start()
Daemon status is inherited by any child threads. Upon JVM shutdown, any daemon threads are simply terminated, without an opportunity—or need—to perform cleanup or orderly shutdown.
This lack of cleanup means it’s important that daemon threads never hold any external resources, such as database connections or file handles. Any such resource will not be properly closed upon a JVM exit. For similar reasons, a daemon thread should never make an import attempt, as this can interfere with Jython’s orderly shutdown.
In production, the only use case for daemon threads is when they are strictly used to work with in-memory objects, typically for some sort of housekeeping. For example, you might use them to maintain a cache or compute an index.
Having said that, daemon threads are certainly convenient when playing around with some ideas. Maybe your lifecycle management of a program is to use “Control-C” to terminate. Unlike regular threads, running daemon threads won’t get in the way and prevent JVM shutdown. Likewise, a later example demonstrating deadlock uses daemon threads to enable shutdown without waiting on these deadlocked threads.
With that in mind, it’s generally best not use daemon threads. At the very least, serious thought should be given to their usage.
The threading.local class enables each thread to have its own instances of some objects in an otherwise shared environment. Its usage is deceptively simple:simply create an instance of threading.local, or a subclass, and assign it to a variable or other name. This variable could be global, or part of some other namespace. So far, this is just like working with any other object in Python.
Threads can then share the variable, but with a twist: each thread will see a different, thread-specific version of the object. This object can have arbitrary attributes added to it, each of which will not be visible to other threads.
Other options include subclassing threading.local. As usual, this allows one to define defaults and specify a more nuanced properties model. But one unique, and potentially useful, aspect is that any attributes specified in __slots__ will be shared across threads.
However, there’s a big problem when working with thread locals. Usually they don’t make sense because threads are not the right scope, but an object or a function is, especially through a closure. If you are using thread locals, you are implicitly adopting a model where threads are partitioning the work. But then you are binding the given piece of work to a thread. This makes using a thread pool problematic, because you have to clean up after the thread.
Note
In fact, we see this very problem in the Jython runtime. A certain amount of context needs to be made available to execute Python code. In the past, we would look this “ThreadState” up from the thread. Historically, this may have been in fact faster, but it now slows things down, and unnecessarily limits what a given thread can do. A future refactoring of Jython will likely remove the use of “ThreadState” completely, simultaneously speeding and cleaning things up. Having said that, thread locals might be useful in certain cases. One common scenario is where your code is being called by a component that you didn’t write. Or you may need to access a thread-local singleton. And of course, if you are using code whose architecture mandates thread locals, it’s just something you will have to work with.
But often this is unnecessary. Your code may be different, but Python gives you good tools to avoid action at a distance. You can use closures, decorators, even sometimes selectively monkey patching modules. Take advantage of the fact that Python is a dynamic language, with strong support for metaprogramming, and remember that the Jython implementation makes these techniques accessible when working with even recalcitrant Java code.
In the end, thread locals are an interesting aside. They do not work well in a task-oriented model, because you don’t want to associate context with a worker thread that will be assigned to arbitrary tasks. Without a lot of care, this can make for a confused mess.
Jython lacks the global interpreter lock (GIL), which is an implementation detail of CPython. For CPython, the GIL means that only one thread at a time can run Python code. This restriction also applies to much of the supporting runtime as well as extension modules that do not release the GIL. (Unfortunately development efforts to remove the GIL in CPython have so far only had the effect of slowing down Python execution significantly.)
The impact of the GIL on CPython programming is that threads are not as useful as they are in Jython. Concurrency will only be seen in interacting with I/O as well as scenarios where computation is performed by an extension module on data structures managed outside of CPython’s runtime. Instead, developers typically will use a process-oriented model to evade the restrictiveness of the GIL.
Again, Jython does not have the straightjacket of the GIL. This is because all Python threads are mapped to Java threads and use standard Java garbage collection support (the main reason for the GIL in CPython is because of the reference counting GC system). The important ramification here is that you can use threads for compute-intensive tasks that are written in Python.
Python does, however, define a module import lock, which is implemented by Jython. This lock is acquired whenever an import of any name is made. This is true whether the import goes through the import statement, the equivalent __import__ builtin, or related code. It’s important to note that even if the corresponding module has already been imported, the module import lock will still be acquired, if only briefly.
So don’t write code like this in a hot loop, especially in threaded code:
Listing 19-2.
def slow_things_way_down():
from foo import bar, baz
...
It may still make sense to defer your imports. Such deferral can decrease the start time of your app. Just keep in mind that thread(s) performing such imports will be forced to run single threaded because of this lock. So it might make sense for your code to perform deferred imports in a background thread.
Listing 19-3. background_import.py
from threading import Thread
import time
def make_imports():
import unicodedata
background_import = Thread(target=make_imports)
background_import.start()
print "Do something else while we wait for the import"
for i in xrange(10):
print i
time.sleep(0.1)
print "Now join..."
background_import.join()
print "And actually use unicodedata"
import unicodedata
So as you can see, you need to do at least two imports of a given module; one in the background thread; the other in the actual place(s) where the module’s namespace is being used.
Here’s why we need the module import lock: upon the first import, the import procedure runs the (implicit) top-level function of the module. Even though many modules are often declarative in nature, in Python all definitions are done at runtime. Such definitions potentially include further imports (recursive imports), and the top-level function can certainly perform much more complex tasks. The module import lock simplifies this setup so that it’s safely published. We will discuss this concept further later in this chapter.
Note that in the current implementation, the module import lock is global for the entire Jython runtime. This may change in the future.
It’s usually best to avoid managing the lifecycle of threads directly. Instead, the task model often provides a better abstraction.
Tasks describe the asynchronous computation to be performed. Although there are other options, the object you submit to be executed should implement Java’s Callable interface (a call method without arguments), as this best maps into working with a Python method or function. Tasks move through the states of being created, submitted (to an executor), started, and completed. Tasks can also be canceled or interrupted.
Executors run tasks using a set of threads. This might be one thread, a thread pool, or as many threads as necessary to run all currently submitted tasks concurrently. The specific choice comprises the executor policy, but generally you want to use a thread pool so as to control the degree of concurrency.
Futures allow code to access the result of a computation – or an exception, if thrown – in a task only at the point when it’s needed. Up until that point, the using code can run concurrently with that task. If it’s not ready, a wait-on dependency is introduced.
We are going to look at how we can use this functionality by using the example of downloading web pages. We will wrap this up so it’s easy to work with, tracking the state of the download, as well as any timing information.
Listing 19-4. downloader.py
import threading
import time
import urllib2
from java.util.concurrent import Callable
class Downloader(Callable):
def __init__(self, url):
self.url = url
self.started = None
self.completed = None
self.result = None
self.thread_used = None
self.exception = None
def __str__(self):
if self.exception:
return "[%s] %s download error %s in %.2fs" % \
(self.thread_used, self.url, self.exception,
self.completed - self.started, ) #, self.result)
elif self.completed:
return "[%s] %s downloaded %dK in %.2fs" % \
(self.thread_used, self.url, len(self.result)/1024,
self.completed - self.started, ) #, self.result)
elif self.started:
return "[%s] %s started at %s" % \
(self.thread_used, self.url, self.started)
else:
return "[%s] %s not yet scheduled" % \
(self.thread_used, self.url)
# needed to implement the Callable interface;
# any exceptions will be wrapped as either ExecutionException
# or InterruptedException
def call(self):
self.thread_used = threading.currentThread().getName()
self.started = time.time()
try:
self.result = urllib2.urlopen(self.url).read()
except Exception, ex:
self.exception = ex
self.completed = time.time()
return self
In Jython any other task could be done in this fashion, whether it is a database query or a computationally intensive task written in Python. It just needs to support the Callable interface.
Next, we need to create the futures. Upon completion of a future, either the result is returned, or an exception is thrown into the caller. This exception will be one of:
(This pushing of the exception into the asynchronous caller is thus similar to how a coroutine works when send is called on it.)
Now we have what we need to multiplex the downloads of several web pages over a thread pool.
Listing 19-5. test_futures.py
from downloader import Downloader
from shutdown import shutdown_and_await_termination
from java.util.concurrent import Executors, TimeUnit
MAX_CONCURRENT = 3
SITES = [
"http://www.cnn.com/",
"http://www.nytimes.com/",
"http://www.washingtonpost.com/",
"http://www.dailycamera.com/",
"http://www.timescall.com/",
]
pool = Executors.newFixedThreadPool(MAX_CONCURRENT)
downloaders = [Downloader(url) for url in SITES]
futures = pool.invokeAll(downloaders)
for future in futures:
print future.get(5, TimeUnit.SECONDS)
shutdown_and_await_termination(pool, 5)
Up until the get method on the returned future, the caller runs concurrently with this task. The get call then introduces a wait-on dependency on the task’s completion. (So this is like calling join on the supporting thread.)
Shutting down a thread pool should be as simple as calling the shutdown method on the pool. However, you may need to take into account that this shutdown can happen during extraordinary times in your code. Here’s the Jython version of a robust shutdown function, shutdown_and_await_termination, as provided in the standard Java docs.
Listing 19-6. shutdown.py
from java.util.concurrent import TimeUnit
def shutdown_and_await_termination(pool, timeout):
pool.shutdown()
try:
if not pool.awaitTermination(timeout, TimeUnit.SECONDS):
pool.shutdownNow()
if (not pool.awaitTermination(timeout, TimeUnit.SECONDS)):
print >> sys.stderr, "Pool did not terminate"
except InterruptedException, ex:
# (Re-)Cancel if current thread also interrupted
pool.shutdownNow()
# Preserve interrupt status
Thread.currentThread().interrupt()
The CompletionService interface provides a nice abstraction to working with futures. The scenario is that instead of waiting for all the futures to complete, as our code did with invokeAll, or otherwise polling them, the completion service will push futures as they are completed onto a synchronized queue. This queue can then be consumed, by consumers running in one or more threads.
Listing 19-7. test_completion.py
from downloader import Downloader
from shutdown import shutdown_and_await_termination
from java.util.concurrent import Executors, ExecutorCompletionService
import os
import hashlib
MAX_CONCURRENT = 3
SITES = [
"http://www.cnn.com/",
"http://www.nytimes.com/",
"http://www.washingtonpost.com/",
"http://www.dailycamera.com/",
"http://www.timescall.com/",
# generate a random web site name that is very, very unlikely to exist
"http://" + hashlib.md5(
"unlikely-web-site-" + os.urandom(4)).hexdigest() + ".com",
]
pool = Executors.newFixedThreadPool(MAX_CONCURRENT)
ecs = ExecutorCompletionService(pool)
# this function could spider the links from these roots;
# for now just schedule these roots directly
def scheduler(roots):
for site in roots:
yield site
# submit tasks indefinitely
for site in scheduler(SITES):
ecs.submit(Downloader(site))
# work with results as soon as they become available
submitted = len(SITES)
while submitted > 0:
result = ecs.take().get()
# here we just do something unimaginative with the result;
# consider parsing it with tools like beautiful soup
print result
submitted -= 1
print "shutting pool down..."
shutdown_and_await_termination(pool, 5)
print "done"
This setup enables a natural flow. Although it may be tempting to then schedule everything through the completion service’s queue, there are limits. For example, if you’re writing a scalable web spider, you would want to externalize this work queue, but for simple management, it would certainly suffice.
Note
Why use tasks instead of threads? A common practice too often seen in production code is the addition of threading in a haphazard fashion:
Heterogeneous threads. Perhaps you have one thread that queries the database. And another that rebuilds an associated index. What happens when you need to add another query?
Dependencies are managed through a variety of channels, instead of being formally structured. This can result in a rats’ nest of threads synchronizing on a variety of objects, often with timers and other event sources thrown in the mix.
It’s certainly possible to make this sort of setup work—just debug away—but using tasks, with explicit wait-on dependencies and time scheduling, makes it far simpler to build a simple, scalable system.
Thread safety addresses such questions as:
Jython ensures that its underlying mutable collection types – dict, list, and set – cannot be corrupted. But updates still might get lost in a data race.
However, other Java collection objects that your code might use would typically not have such no-corruption guarantees. If you need to use LinkedHashMap, so as to support an ordered dictionary, you will need to consider thread safety if it will be both shared and mutated.
Here’s a simple test harness we will use in our examples. ThreadSafetyTestCase subclasses unittest.TestCase, adding a new method assertContended.
Listing 19-8. threadsafety.py
import threading
import unittest
class ThreadSafetyTestCase(unittest.TestCase):
def assertContended(self, f, num_threads=20, timeout=2., args=()):
threads = []
for i in xrange(num_threads):
t = threading.Thread(target=f, args=args)
t.start()
threads.append(t)
for t in threads:
t.join(timeout)
timeout = 0.
for t in threads:
self.assertFalse(t.isAlive())
This new method runs a target function and asserts that all threads properly terminate. Then the testing code needs to check for any other invariants.
For example, we use this idea in Jython to test that certain operations on the list type are atomic. The idea is to apply a sequence of operations that perform an operation, then reverse it. One step forward, one step back. The net result should be right where you started, an empty list, which is what the test code asserts.
Listing 19-9. test_list.py
from threadsafety import ThreadSafetyTestCase
import threading
import time
import unittest
class ListThreadSafety(ThreadSafetyTestCase):
def test_append_remove(self):
lst = []
def tester():
# preserve invariant by adding, then removing a unique
# value (in this case, a reference to the worker thread
# executing this function)
ct = threading.currentThread()
for i in range(1000):
lst.append(ct)
time.sleep(0.0001)
lst.remove(ct)
self.assertContended(tester)
self.assertEqual(lst, [])
if __name__ == '__main__':
unittest.main()
Of course these concerns do not apply at all to immutable objects. Commonly used objects like strings, numbers, datetimes, tuples, and frozen sets are immutable, and you can create your own immutable objects too.
There are a number of other strategies in solving thread safety issues. We will look at them as follows:
We use synchronization to control the entry of threads into code blocks corresponding to synchronized resources. Through this control we can prevent data races, assuming a correct synchronization protocol. (This can be a big assumption!)
A threading.Lock ensures entry by only one thread. (In Jython, but unlike CPython, such locks are always reentrant; there’s no distinction between threading.Lock and threading.RLock.) Other threads have to wait until that thread exits the lock. Such explicit locks are the simplest and perhaps most portable synchronization to perform.
You should generally manage the entry and exit of such locks through a with-statement; failing that, you must use a try-finally to ensure that the lock is always released when exiting a block of code.
Here’s some example code using the with-statement. The code allocates a lock, then shares it amongst some tasks.
Listing 19-10. test_lock.py—LockTestCase.test_with_lock
def test_with_lock(self):
counter = [0]
lock = Lock()
def loop100(counter):
for i in xrange(100):
with lock:
counter[0] += 1
# sleeping helps ensures that all threads run in our test
time.sleep(0.0001)
self.assertContended(loop100, args=(counter,), num_threads=20)
self.assertEqual(counter[0], 2000) # 20 threads * 100 loops/thread
Alternatively, you can do this with try-finally.
Listing 19-11. test_lock.py—LockTestCase.test_try_finally_lock
def test_try_finally_lock(self):
counter = [0]
lock = Lock()
def loop100(counter):
for i in xrange(100):
lock.acquire()
try:
counter[0] += 1
finally:
lock.release()
time.sleep(0.0001)
self.assertContended(loop100, args=(counter,), num_threads=20)
self.assertEqual(counter[0], 2000)
But don’t do this. It’s actually slower than the with-statement, and using the with-statement version also results in more idiomatic Python code.
Another possibility is to use the synchronize module, which is specific to Jython. This module provides a make_synchronized decorator function, which wraps any callable in Jython in a synchronized block.
Listing 19-12. test_synchronized.py
from synchronize import make_synchronized
from threadsafety import ThreadSafetyTestCase
import time
import unittest
@make_synchronized
def increment_counter(counter):
counter[0] += 1
# sleeping helps ensures that all threads run in our test
time.sleep(0.0001)
class SynchronizedTestCase(ThreadSafetyTestCase):
def test_counter(self):
def loop100(counter):
for i in xrange(100):
increment_counter(counter)
counter = [0]
self.assertContended(loop100, args=(counter,), num_threads=20)
self.assertEqual(counter[0], 2000) # 20 threads * 100 loops/thread
if __name__ == '__main__':
unittest.main()
In this case, you don’t need to explicitly release anything. Even in the case of an exception, the synchronization lock is always released upon exit from the function. Again, this version is also slower than the with-statement form, and it doesn’t use explicit locks.
Note
Synchronization and the with-statement
Jython’s current runtime (as of 2.5.1) can execute the with-statement form more efficiently through both runtime support and how this statement is compiled. The reason is that most JVMs can perform analysis on a chunk of code (the compilation unit, including any inlining) to avoid synchronization overhead, so long as two conditions are met. First, the chunk contains both the lock and unlock. And second, the chunk is not too long for the JVM to perform its analysis. The with-statement’s semantics make it relatively easy for us to do that when working with built-in types like threading.Lock, while avoiding the overhead of Java runtime reflection.
In the future, support of the new invokedynamic bytecode should collapse these performance differences.
The threading module offers portability, but it’s also minimalist. You may want to use the synchronizers in Java.util.concurrent instead of their wrapped versions in threading. In particular, this approach is necessary if you want to wait on a lock with a timeout. Also, you may want to use factories like Collections.synchronizedMap, when applicable, to ensure the underlying Java object has the desired synchronization.
But use synchronization carefully: this code will always eventually deadlock.
Listing 19-13. deadlock.py
from __future__ import with_statement
from threading import Thread, Lock
from java.lang.management import ManagementFactory
import time, threading
def cause_deadlock():
counter = [0]
lock_one = Lock()
lock_two = Lock()
threads = [
Thread(
name="thread #1", target=acquire_locks,
args=(counter, lock_one, lock_two)),
Thread(
name="thread #2 (reversed)", target=acquire_locks,
args=(counter, lock_two, lock_one))]
for thread in threads:
thread.setDaemon(True) # make shutdown possible after deadlock
thread.start()
thread_mxbean = ManagementFactory.getThreadMXBean()
while True:
time.sleep(1)
print "monitoring thread", counter[0]
thread_ids = thread_mxbean.findDeadlockedThreads()
if thread_ids:
print "monitoring thread: deadlock detected, shutting down", list(thread_ids)
break
def acquire_locks(counter, lock1, lock2):
# Should eventually deadlock if locks are acquired in different order
name = threading.currentThread().getName()
while True:
with lock1:
with lock2:
counter[0] += 1
print name, counter[0]
if __name__ == '__main__':
cause_deadlock()
Deadlock results from a cycle of any length of wait-on dependencies. For example, Alice is waiting on Bob, but Bob is waiting on Alice. Without a timeout or other change in strategy—Alice just gets tired of waiting on Bob!—this deadlock will not be broken.
Avoiding deadlocks can be done by never acquiring locks such that a cycle like that can be created. If we rewrote the example so that locks are acquired in the same order (Bob always allows Alice to go first), there would be no deadlocks. However, this ordering is not always so easy to do. Often, a more robust strategy is to allow for timeouts.
The Queue module implements a first-in, first-out synchronized queue. (Synchronized queues are also called blocking queues, and that’s how they are described in java.util.concurrent.) Such queues represent a thread-safe way to send objects from one or more producing threads to one or more consuming threads.
Often, you will define a poison object to shut down the queue. This will allow any consuming, but waiting, threads to immediately shut down. Or just use Java’s support for executors to get an off-the-shelf solution.
If you need to implement another policy, such as last-in, first-out or based on a priority, you can use the comparable synchronized queues in java.util.concurrent as appropriate. (Note these have since been implemented in Python 2.6, so they will be made available when Jython 2.6 is eventually released.)
Condition objects allow for one thread to notify another thread that’s waiting on a condition to wake up; notifyAll is used to wake up all such threads. Along with Queue, this is probably the most versatile of the synchronizing objects for real usage.
Condition objects are always associated with a Lock. Your code needs to bracket waiting and notifying the condition by acquiring the corresponding lock, then finally (as always!) releasing it. As usual, this is easiest done in the context of the with-statement.
For example, here’s how we actually implement a Queue in the standard library of Jython (just modified here to use the with-statement). We can’t use a standard Java blocking queue, because the requirement of being able to join on the queue when there’s no more work to be performed requires a third condition variable.
Listing 19-15. Queue.py
"""A multi-producer, multi-consumer queue."""
from __future__ import with_statement
from time import time as _time
from collections import deque
__all__ = ['Empty', 'Full', 'Queue']
class Empty(Exception):
"Exception raised by Queue.get(block=0)/get_nowait()."
pass
class Full(Exception):
"Exception raised by Queue.put(block=0)/put_nowait()."
pass
class Queue:
"""Create a queue object with a given maximum size.
If maxsize is <= 0, the queue size is infinite.
"""
def __init__(self, maxsize=0):
try:
import threading
except ImportError:
import dummy_threading as threading
self._init(maxsize)
# mutex must be held whenever the queue is mutating. All methods
# that acquire mutex must release it before returning. mutex
# is shared between the three conditions, so acquiring and
# releasing the conditions also acquires and releases mutex.
self.mutex = threading.Lock()
# Notify not_empty whenever an item is added to the queue; a
# thread waiting to get is notified then.
self.not_empty = threading.Condition(self.mutex)
# Notify not_full whenever an item is removed from the queue;
# a thread waiting to put is notified then.
self.not_full = threading.Condition(self.mutex)
# Notify all_tasks_done whenever the number of unfinished tasks
# drops to zero; thread waiting to join() is notified to resume
self.all_tasks_done = threading.Condition(self.mutex)
self.unfinished_tasks = 0
def task_done(self):
"""Indicate that a formerly enqueued task is complete.
Used by Queue consumer threads. For each get() used to fetch a task,
a subsequent call to task_done() tells the queue that the processing
on the task is complete.
If a join() is currently blocking, it will resume when all items
have been processed (meaning that a task_done() call was received
for every item that had been put() into the queue).
Raises a ValueError if called more times than there were items
placed in the queue.
"""
with self.all_tasks_done:
unfinished = self.unfinished_tasks - 1
if unfinished <= 0:
if unfinished < 0:
raise ValueError('task_done() called too many times')
self.all_tasks_done.notifyAll()
self.unfinished_tasks = unfinished
def join(self):
"""Blocks until all items in the Queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the
queue. The count goes down whenever a consumer thread calls task_done()
to indicate the item was retrieved and all work on it is complete.
When the count of unfinished tasks drops to zero, join() unblocks.
"""
with self.all_tasks_done:
while self.unfinished_tasks:
self.all_tasks_done.wait()
def qsize(self):
"""Return the approximate size of the queue (not reliable!)."""
self.mutex.acquire()
n = self._qsize()
self.mutex.release()
return n
def empty(self):
"""Return True if the queue is empty, False otherwise (not reliable!)."""
self.mutex.acquire()
n = self._empty()
self.mutex.release()
return n
def full(self):
"""Return True if the queue is full, False otherwise (not reliable!)."""
self.mutex.acquire()
n = self._full()
self.mutex.release()
return n
def put(self, item, block=True, timeout=None):
"""Put an item into the queue.
If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until a free slot is available. If 'timeout' is
a positive number, it blocks at most 'timeout' seconds and raises
the Full exception if no free slot was available within that time.
Otherwise ('block' is false), put an item on the queue if a free slot
is immediately available, else raise the Full exception ('timeout'
is ignored in that case).
"""
with self.not_full:
if not block:
if self._full():
raise Full
elif timeout is None:
while self._full():
self.not_full.wait()
else:
if timeout < 0:
raise ValueError("'timeout' must be a positive number")
endtime = _time() + timeout
while self._full():
remaining = endtime - _time()
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()
def put_nowait(self, item):
"""Put an item into the queue without blocking.
Only enqueue the item if a free slot is immediately available.
Otherwise raise the Full exception.
"""
return self.put(item, False)
def get(self, block=True, timeout=None):
"""Remove and return an item from the queue.
If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until an item is available. If 'timeout' is
a positive number, it blocks at most 'timeout' seconds and raises
the Empty exception if no item was available within that time.
Otherwise ('block' is false), return an item if one is immediately
available, else raise the Empty exception ('timeout' is ignored
in that case).
"""
with self.not_empty:
if not block:
if self._empty():
raise Empty
elif timeout is None:
while self._empty():
self.not_empty.wait()
else:
if timeout < 0:
raise ValueError("'timeout' must be a positive number")
endtime = _time() + timeout
while self._empty():
remaining = endtime - _time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
item = self._get()
self.not_full.notify()
return item
def get_nowait(self):
"""Remove and return an item from the queue without blocking.
Only get an item if one is immediately available. Otherwise
raise the Empty exception.
"""
return self.get(False)
# Override these methods to implement other queue organizations
# (e.g. stack or priority queue).
# These will only be called with appropriate locks held
# Initialize the queue representation
def _init(self, maxsize):
self.maxsize = maxsize
self.queue = deque()
def _qsize(self):
return len(self.queue)
# Check whether the queue is empty
def _empty(self):
return not self.queue
# Check whether the queue is full
def _full(self):
return self.maxsize > 0 and len(self.queue) == self.maxsize
# Put a new item in the queue
def _put(self, item):
self.queue.append(item)
# Get an item from the queue
def _get(self):
return self.queue.popleft()
There are other mechanisms to synchronize, including exchangers, barriers, latches, etc. You can use semaphores to describe scenarios where it’s possible for multiple threads to enter, or use locks that are set up to distinguish reads from writes. There are many possibilities for the Java platform. In our experience, Jython should be able to work with any of them.
An atomic operation is inherently thread safe. Data races and object corruption do not occur, and it’s not possible for other threads to see an inconsistent view.
Atomic operations are therefore simpler to use than synchronization. In addition, atomic operations will often use underlying support in the CPU, such as a compare-and-swap instruction. They may use locking too. The important thing to know is that the lock is not directly visible. Also, if synchronization is used, it’s not possible to expand the scope of the synchronization. In particular, callbacks and iteration are not feasible.
Python guarantees the atomicity of certain operations, although at best it’s only informally documented. Fredrik Lundh’s article on “Thread Synchronization Methods in Python” summarizes the mailing list discussions and the state of the CPython implementation. Quoting his article, the following are atomic operations for Python code:
Although unstated, this also applies to equivalent ops on the builtin set type.
For CPython, this atomicity emerges from combining its Global Interpreter Lock (GIL), the Python bytecode virtual machine execution loop, and the fact that types like dict and list are implemented natively in C and do not release the GIL.
Despite the fact that this is in some sense accidentally emergent, it is a useful simplification for the developer. It’s what existing Python code expects, so this is what we have implemented in Jython.
In particular, because dict is a ConcurrentHashMap, we also expose the following methods to atomically update dictionaries:
It’s important to note that iterations are not atomic, even on a ConcurrentHashMap.
Atomic operations are useful, but they are pretty limited too. Often, you still need to use synchronization to prevent data races, and this has to be done with care to avoid deadlocks and starvation.
Thread confinement is often the best way to resolve problems seen in working with mutable objects. In practice, you probably don’t need to share a large percentage of the mutable objects used in your code. Very simply put, if you don’t share, then thread safety issues go away.
Not all problems can be reduced to using thread confinement. There are likely some shared objects in your system, but in practice most can be eliminated. And often the shared state can be someone else’s problem:
Unfortunately thread confinement is not without issues in Jython. For example, if you use StringIO, you have to pay the cost that this class uses list, which is synchronized. Although it’s possible to further optimize the Jython implementation of the Python standard library, if a section of code is hot enough, you may want to consider rewriting that in Java to ensure no additional synchronization overhead.
Lastly, thread confinement is not perfect in Python, because of the possibility of introspecting on frame objects. This means your code can see local variables in other threads, and the objects they point to. But this is really more of an issue for how optimizable Jython is when run on the JVM. It won’t cause thread safety issues if you don’t exploit this loophole. We will discuss this more in the next section, on the Python Memory Model.
Reasoning about concurrency in Python is easier than in Java. This is because the memory model is not as surprising to our conventional reasoning about how programs operate. However, this also means that Python code sacrifices significant performance to keep it simpler.
Here’s why. In order to maximize Java performance, it’s allowed for a CPU to arbitrarily re-order the operations performed by Java code, subject to the constraints imposed by happens-before and synchronizes-with relationships. (The published Java memory model goes into more details on these constraints.) Although such reordering is not visible within a given thread, the problem is that it’s visible to other threads. Of course, this visibility only applies to changes made to non-local objects; thread confinement still applies. In particular, this means you cannot rely on the apparent sequential ordering of Java code when looking at two or more threads.
Python is different. The fundamental thing to know about Python, and what we have implemented in Jython, is that setting any attribute in Python is a volatile write; and getting any attribute is a volatile read. This is because Python attributes are stored in dictionaries, and in Jython, this follows the semantics of the backing ConcurrentHashMap. So get and set are volatile. So this means that Python code has sequential consistency. Execution follows the ordering of statements in the code. There are no surprises here.
This means that safe publication is pretty much trivial in Python, when compared to Java. Safe publication means the thread safe association of an object with a name. Because this is always a memory-fenced operation in Python, your code simply needs to ensure that the object itself is built in a thread-safe fashion; then publish it all at once by setting the appropriate variable to this object.
If you need to create module-level objects—singletons—then you should do this in the top-level script of the module so that the module import lock is in effect.
Long-threading threads should provide some opportunity for cancellation. The typical pattern is something like the following example.
Listing 19-16.
class DoSomething(Runnable):
def __init__(self):
cancelled = False
def run(self):
while not self.cancelled:
do_stuff()
Remember, Python variables are always volatile, unlike Java. There are no problems with using a cancelled flag like this.
Thread interruption allows for even more responsive cancellation. In particular, if a thread is waiting on most any synchronizers, such as a condition variable or on file I/O, this action will cause the waited-on method to exit with an InterruptedException. (Unfortunately lock acquisition, except under certain cases such as using lockInterruptibly on the underlying Java lock, is not interruptible.)
Although Python’s threading module does not itself support interruption, it is available through the standard Java thread API. First, let’s import this class (we will rename it to JThread so it doesn’t conflict with Python’s version).
Listing 19-17.
from java.lang import Thread as JThread
As you have seen, you can use Java threads as if they are Python threads. So logically you should be able to do the converse: use Python threads as if they are Java threads. It would be nice to make calls like JThread.interrupt(obj).
Note
Incidentally, this formulation, instead of obj.interrupt(), looks like a static method on a class, as long as we pass in the object as the first argument. This adaptation is a good use of Python’s explicit self.
But there’s a problem here. As of the latest released version (Jython 2.5.1), we forgot to include an appropriate __tojava__ method on the Thread class! So this looks like you can’t do this trick after all.
Or can you? What if you didn’t have to wait until we fix this bug? You could explore the source code – or look at the class with dir. One possibility would be to use the nominally private _thread attribute on the Thread object. After all _thread is the attribute for the underlying Java thread. Yes, this is an implementation detail, but it’s probably fine to use. It’s not so likely to change.
But we can do even better. We can monkey patch the Thread class such that it has an appropriate __tojava__ method, but only if it doesn’t exist. So this patching is likely to work with a future version of Jython because we are going to fix this missing method before we even consider changing its implementation and removing _thread.
So here’s how we can monkey patch, following a recipe of Guido van Rossum.
Listing 19-18. monkeypatch.py
# http://mail.python.org/pipermail/python-dev/2008-January/076194.html
# - a recipe of Guido van Rossum
def monkeypatch_method(cls):
def decorator(func):
setattr(cls, func.__name__, func)
return func
return decorator
# and a useful variant, with a good ugly name
def monkeypatch_method_if_not_set(cls):
def decorator(func):
if not hasattr(cls, func.__name__):
setattr(cls, func.__name__, func)
return func
return decorator
This monkeypatch_method decorator allows us to add a method to a class after the fact. (This is what Ruby developers call opening a class.) Use this power with care. But again, you shouldn’t worry too much when you keep such fixes to a minimum, especially when it’s essentially a bug fix like this one. In our case, we will use a variant, the monkeypatch_method_if_not_set decorator, to ensure we only patch if it has not been fixed by a later version.
Putting it all together, we have this code:
Listing 19-19. interrupt.py
from __future__ import with_statement
from threading import Condition, Lock, Thread
from java.lang import Thread as JThread, InterruptedException
from monkeypatch import monkeypatch_method_if_not_set
import time, threading
@monkeypatch_method_if_not_set(Thread)
def __tojava__(self, java_type):
return self._thread
def be_unfair():
unfair_condition = Condition()
threads = [
Thread(
name="thread #%d" % i,
target=wait_until_interrupted,
args=(unfair_condition,))
for i in xrange(5)]
for thread in threads:
thread.start()
time.sleep(5)
# threads should not be doing anything now, can verify by looking at some shared state
# instead of notifying, we will interrupt the threads
for thread in threads:
JThread.interrupt(thread)
# or you can use this equivalent op
# thread.__tojava__(JThread).interrupt()
for thread in threads:
thread.join()
def wait_until_interrupted(cv):
name = threading.currentThread().getName()
with cv:
while not JThread.currentThread().isInterrupted():
try:
print "Waiting pointlessly %s" % name
cv.wait()
except InterruptedException, e:
break
print "Finished %s" % name
if __name__ == '__main__':
be_unfair()
(It does rely on the use of threading.Condition to have something to wait on.)
Lastly, you could simply access interruption through the cancel method provided by a Future. No need to monkey patch!
Jython can fully take advantage of the underlying Java platform’s support for concurrency. You can also use the standard Python threading constructs, which in most cases just wrap the corresponding Java functionality. The standard mutable Python collection types have been implemented in Jython with concurrency in mind; and Python’s sequential consistency removes some potential bugs.
But concurrent programming is still not easy to get right, either in Python or in Java. You should consider higher-level concurrency primitives, such as tasks, and you should be disciplined in how your code shares mutable state.