I participated in the design and development of a couple of concurrency
libraries for shared-memory multiprocessors long before such machines were
popular. So when I started using java.util.concurrent
I was already somewhat comfortable with the concepts. But when I used
it more intensely for production work in the
Google Calendar server,
I ran into a couple of "gotcha"
situations. I'd like to tell you about one in particular, in part because it might
help you avoid the problem yourself, and in part because I believe this issue
exposes some missing functionality in the concurrency framework.
Many parallel programming problems can be expressed using
fork-join parallelism, in which tasks spawn, or "fork",
a number of subtasks that can be executed in parallel. The caller then
waits for these subtasks to complete by "join"ing with them.
Consider the following sequential program. It is an abstract model of
some larger program that has three logical layers.
class Program {
    static final int N = 3;

    public static void main(String[] args) {
        doSomeWork();
        loopNtimes(N, new Runnable() {
            public void run() { doLayerOne(); }
        });
        System.out.println();
    }

    static void doLayerOne() {
        doSomeWork();
        loopNtimes(N, new Runnable() {
            public void run() { doLayerTwo(); }
        });
    }

    static void doLayerTwo() {
        doSomeWork();
        loopNtimes(N, new Runnable() {
            public void run() { doLayerThree(); }
        });
    }

    static void doLayerThree() {
        doSomeWork();
    }

    static void loopNtimes(int n, Runnable runnable) {
        for (int i = 0; i < n; i++) runnable.run();
    }

    static void doSomeWork() {
        System.out.print(".");
        try { Thread.sleep(500L); } catch (InterruptedException ignored) {}
    }
}
This program prints 40 dots (one from main, then 3 from doLayerOne, 9 from doLayerTwo, and 27 from doLayerThree: 1 + 3 + 9 + 27 = 40), taking half a second for each one, so it runs to completion in about 20 seconds. Let's rewrite the loops as concurrent (instead
of sequential) loops, using an idiom recommended by Martin Buchholz. To do
that we replace the method loopNtimes with the following:
static ExecutorService threadPool = Executors.newCachedThreadPool();

static void loopNtimes(int n, Runnable runnable) {
    Collection<Callable<Object>> c = new ArrayList<Callable<Object>>();
    for (int i = 0; i < n; i++) c.add(Executors.callable(runnable));
    Collection<Future<Object>> futures = null;
    try { futures = threadPool.invokeAll(c); } catch (InterruptedException ignored) {}
    if (futures != null) for (Future<Object> f : futures) {
        try { f.get(); }
        catch (InterruptedException ignored) {}
        catch (ExecutionException ex) {
            ex.printStackTrace();
            System.exit(1);
        }
    }
}
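(Note that invokeAll itself waits until all the tasks have completed; the loop calling f.get() is there only to surface an ExecutionException if one of the tasks failed.)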
This requires a couple of other minor changes to the program (two import
statements and a System.exit(0) at the end of
main), but the program now runs in two seconds
instead of twenty.
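For concreteness, here is a sketch of those two changes; the wildcard imports are one reasonable choice, assuming no classes beyond those shown above are referenced:

import java.util.*;
import java.util.concurrent.*;

and, at the end of main:

    // Idle cached-pool threads are non-daemon and linger for up to a
    // minute, so exit explicitly rather than waiting for them to die.
    System.exit(0);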
So far so good, but if N is larger, say a hundred, this program fails: it throws OutOfMemoryError because it tries to
allocate too many threads. My first attempt to fix this replaced
the cached thread pool with one containing a fixed number of threads:
static ExecutorService threadPool = Executors.newFixedThreadPool(100);
This version of the program works and runs in 2 seconds. But why
should we use 100 threads? If we
imagine that the Thread.sleep statements represent
computationally intensive parts of the program, it might make more sense to
have a number of threads approximately the same as the number of physical
processors. I'm running this on a machine with an Intel Centrino
Duo processor, which acts roughly like 2 processors. Let's be generous, however,
and make ten threads. So we modify this version of the program by changing
100 to 10. That won't be as fast as the version with 100 threads, but just
how fast will it be?
If you haven't guessed the punch line by now, I'll tell you: with ten
threads in the pool the program prints 11 periods and then
deadlocks! If you use a debugger to examine the state of the program to figure out
what's going on, you'll find the main thread waiting in
invokeAll, three threads in
doLayerOne waiting in
invokeAll, and seven threads in
doLayerTwo waiting in
invokeAll. That accounts for all ten pool
threads and all 11 periods (one from main, three from
doLayerOne, seven from doLayerTwo),
and there are no threads left to do any of the work of calling
doLayerThree. This is a classic
thread starvation deadlock.
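You don't even need a debugger to see this. As a sketch of one alternative (the watchdog thread here is hypothetical, and it needs an import of java.util.Map), you could add something like this at the start of main to dump every thread's stack if the program is still running after 30 seconds:

    // Hypothetical watchdog: dump all thread stacks if we are still
    // running after 30 seconds (Thread.getAllStackTraces is Java 5+).
    Thread watchdog = new Thread(new Runnable() {
        public void run() {
            try { Thread.sleep(30000L); } catch (InterruptedException e) { return; }
            for (Map.Entry<Thread, StackTraceElement[]> entry
                    : Thread.getAllStackTraces().entrySet()) {
                System.err.println(entry.getKey());
                for (StackTraceElement frame : entry.getValue())
                    System.err.println("    at " + frame);
            }
        }
    });
    watchdog.setDaemon(true); // don't keep the JVM alive on our account
    watchdog.start();

On most JVMs the jstack tool, or Ctrl-\ (SIGQUIT) on Unix, produces an equivalent dump.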
If you're just trying
out this program to see what happens, you might be slightly annoyed
and finally give up and hit control-C to quit the Java program,
but when our program (Google Calendar)
encounters this kind of problem our customers get annoyed,
give up, and sign up for a competitor like
Yahoo Calendar
or 30Boxes. Hey, don't
click those links; trust me, you really want
Google Calendar.
My point is that we can't leave this to chance.
What can or should we do about this problem? The first idea is to change
the 10 back into 100, but those numbers are pulled out of thin air. Without
analyzing the behavior and interaction of all the places where the thread
pool is used, understanding the dynamic performance of the application
under real loads, and placing bounds on the number of tasks that will
be used at each level in the
program's hierarchy, it is difficult or impossible to pick a number that will
always avoid this kind of deadlock. Another idea is to use unbounded
thread pools, but as we've seen, under high load those can
cause an explosion in the number of threads, making the program
fail by running out of memory.
What we did to address this issue is avoid the single monolithic thread
pool altogether. Instead, we use a separate thread pool at every level in
the hierarchy. In terms of this example, we would have a thread
pool for use in main, one for use
in doLayerOne, and one for use
in doLayerTwo.
Every subsystem that requires concurrency
gets its own personal thread pool.
That way every layer that uses concurrency is guaranteed to make progress
when it has work to do, so this kind of deadlock cannot occur.
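In code, a minimal sketch of that structure might look like the following; the pool names and sizes here are illustrative guesses (one thread per possible task at each layer), not tuned values:

static final ExecutorService mainPool     = Executors.newFixedThreadPool(3);
static final ExecutorService layerOnePool = Executors.newFixedThreadPool(9);
static final ExecutorService layerTwoPool = Executors.newFixedThreadPool(27);

static void loopNtimes(ExecutorService pool, int n, Runnable runnable) {
    Collection<Callable<Object>> c = new ArrayList<Callable<Object>>();
    for (int i = 0; i < n; i++) c.add(Executors.callable(runnable));
    try {
        // A layer blocks only on tasks running in the next layer's
        // pool, so no layer can starve itself of threads.
        for (Future<Object> f : pool.invokeAll(c)) f.get();
    } catch (InterruptedException ignored) {
    } catch (ExecutionException ex) {
        ex.printStackTrace();
        System.exit(1);
    }
}

main then calls loopNtimes(mainPool, N, ...), doLayerOne calls loopNtimes(layerOnePool, N, ...), and doLayerTwo calls loopNtimes(layerTwoPool, N, ...).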
But there is a cost to this as well: balancing the sizes of these thread
pools is a black art. During operation we have hundreds of
threads, most of which are sitting around doing nothing. Besides being a waste
of resources, the generous surplus of "extra" threads makes debugging
more difficult than it should be. And if the system doesn't decompose so
neatly into layers (perhaps because there are cycles in the
call graph of the subsystems), then even this solution can break down
and result in thread starvation.
The situation is not entirely hopeless. In my opinion, this kind of
thread starvation should never occur because there is always
one thread that can contribute processing power toward executing the
subtasks: the thread that is waiting for the subtasks to complete.
Here's
the implementation of invokeAll as it appears in the JDK:
public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks)
        throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks) {
            FutureTask<T> f = new FutureTask<T>(t);
            futures.add(f);
            execute(f);
        }
        for (Future<T> f : futures) {
            if (!f.isDone()) {
                try {
                    f.get();
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (Future<T> f : futures)
                f.cancel(true);
    }
}
This code does not use the current thread to do any
of the work of invoking the callables. Below is a slightly modified
version (I've added a line to the original and refactored
it to make it a static method that we can put in the program) that
uses the current thread to do any work that another thread
hasn't already started. The newly added code is marked with a comment:
public static <T> List<Future<T>> invokeAll(
        ExecutorService threadPool, Collection<Callable<T>> tasks)
        throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks) {
            FutureTask<T> f = new FutureTask<T>(t);
            futures.add(f);
            threadPool.execute(f);
        }
        // force unstarted futures to execute using the current thread
        for (Future<T> f : futures) ((FutureTask<T>) f).run();
        for (Future<T> f : futures) {
            if (!f.isDone()) {
                try {
                    f.get();
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (Future<T> f : futures)
                f.cancel(true);
    }
}
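The only other change to the example program is the call site in loopNtimes, which now uses this static method instead of the one on ExecutorService:

    try { futures = invokeAll(threadPool, c); } catch (InterruptedException ignored) {}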
Using this version of invokeAll, the program does not
experience thread starvation. Even if the thread pool is reduced
to just one thread, the program runs to completion in about 11 seconds, because two
threads are contributing to the work (the main thread and
the single thread from the pool): 40 half-second dots split between
two threads is roughly 10 seconds of computation.
I discussed this issue with Doug Lea, and he warned me that selecting
tasks for efficient scheduling in a fork-join concurrency framework is not
trivial. The standard solution is to have a double-ended queue for each
worker thread, onto which it enqueues the subtasks it spawns. The worker
removes the most recently generated task from its own queue to process
next, thereby simulating a depth-first execution strategy in the
single-thread case. When a worker finds itself without any work to do,
it steals work from the other end of another worker's queue. That is, it
steals one of the least recently created (coarse-grained) subtasks. In
addition, it is beneficial to have a mechanism to avoid the queue
altogether at the bottom of the call chain. Doug told me that this
strategy was pioneered by the
Cilk work, but I first learned
about this strategy 10 years earlier reading
WorkCrews: An
Abstraction for Controlling Parallelism by Mark T. Vandervoorde and Eric
S. Roberts. My version of invokeAll provides approximately this
behavior, with a much simpler implementation. The invocation
of run executes tasks recently
generated by the current thread. When a thread has no more
work of its own, it removes work from the queue of the underlying
ExecutorService, which is a FIFO queue, and so it takes
the least recently generated task of all the workers. On the other hand,
because this implementation shares a single queue among all worker
threads, there may be additional synchronization overhead compared to
the WorkCrews/Cilk solution.
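To make that queue discipline concrete, here is a toy sketch of a per-worker deque, showing just the push/pop/steal shape; real implementations such as Cilk's use carefully tuned non-blocking structures, and java.util.ArrayDeque requires Java 6:

import java.util.ArrayDeque;
import java.util.Deque;

class WorkerQueue {
    private final Deque<Runnable> tasks = new ArrayDeque<Runnable>();

    // The owning worker pushes and pops at the same end, so locally it
    // runs the most recently forked task first (depth-first order).
    synchronized void push(Runnable task) { tasks.addLast(task); }
    synchronized Runnable popNewest() { return tasks.pollLast(); }

    // An idle worker steals from the opposite end, taking the least
    // recently forked (coarsest-grained) task.
    synchronized Runnable stealOldest() { return tasks.pollFirst(); }
}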
It is possible to use the existing concurrency utilities to work
around the problem, if you don't mind the task scheduling being far
from optimal. You can do that by setting the CallerRunsPolicy
on a ThreadPoolExecutor
and using a SynchronousQueue:
static ThreadPoolExecutor threadPool =
    new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS,
                           new SynchronousQueue<Runnable>());
static {
    threadPool.setRejectedExecutionHandler(
        new ThreadPoolExecutor.CallerRunsPolicy());
}
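This works because a SynchronousQueue has no capacity to hold waiting tasks: if no idle worker is ready to accept a submitted task and the pool is already at its maximum of ten threads, the submission is rejected, and CallerRunsPolicy then runs the task in the submitting thread. The caller thereby contributes its own processing power, much as in the modified invokeAll above, just with less control over which task it ends up running.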
Doug explained to me that the
earlier public-domain
version of the concurrency
utilities had a full implementation of a framework for fork-join
parallelism, but it didn't get included in JDK5:
"... The vast majority of such usages are
nicest to support as "loop parallelism" utilities.
And it is not so much that utilities based on FJ tasks
are inconvenient to use that has kept them
out, but instead uncertainty about basic APIs until closures
are settled. All of the methods for aggregate operations on
collections and arrays (applyToAll, map, reduce,
mapReduce, findAny, findAll, etc) require function-type
arguments (perhaps along with some sort of purity annotation as
might be introduced via JSR305) that, depending on how things work
out otherwise, would need to be introduced more generally."
Did you think I would get through an entire blog post without
mentioning Closures?