Monday, November 20, 2006

A Thread Pool Puzzler

I participated in the design and development of a couple of concurrency libraries for shared-memory multiprocessors long before such machines were popular. So when I started using java.util.concurrent I was already somewhat comfortable with the concepts. But when I used it more intensely for production work in the Google Calendar server, I ran into a couple of "gotcha" situations. I'd like to tell you about one in particular, in part because it might help you avoid the problem yourself, and in part because I believe this issue exposes some missing functionality in the concurrency framework.

Many parallel programming problems can be expressed using fork-join parallelism, in which tasks spawn, or "fork", a number of subtasks that can be executed in parallel. The caller then waits for these subtasks to complete by "join"ing with them. Consider the following sequential program. It is an abstract model of some larger program that has three logical layers.

class Program {
    static final int N = 3;
    public static void main(String[] args) {
        doSomeWork();
        loopNtimes(N, new Runnable() {
                public void run() { doLayerOne(); }
            });
        System.out.println();
    }

    static void doLayerOne() {
        doSomeWork();
        loopNtimes(N, new Runnable() {
                public void run() { doLayerTwo(); }
            });
    }

    static void doLayerTwo() {
        doSomeWork();
        loopNtimes(N, new Runnable() {
                public void run() { doLayerThree(); }
            });
    }

    static void doLayerThree() {
        doSomeWork();
    }
    static void loopNtimes(int n, Runnable runnable) {
        for (int i=0; i<n; i++) runnable.run();
    }
    static void doSomeWork() {
        System.out.print(".");
        try { Thread.sleep(500L); } catch (InterruptedException ignored) {}
    }
}
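The dot count and running times quoted for this program follow from the shape of its call tree. main does one unit of work, and each of the three layers below it fans out N ways. With N = 3 and half a second per unit of work:

```latex
\begin{aligned}
\text{dots} &= 1 + N + N^2 + N^3 = 1 + 3 + 9 + 27 = 40,\\
T_{\text{sequential}} &= 40 \times 0.5\,\text{s} = 20\,\text{s},\\
T_{\text{parallel}} &\approx 4 \times 0.5\,\text{s} = 2\,\text{s}.
\end{aligned}
```

The parallel estimate assumes every task gets its own thread. Then only the four units of work along one path (main, layer one, layer two, layer three) happen one after another.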

This program prints 40 dots, taking a half second for each one. It runs to completion in about 20 seconds. Let's rewrite the loops as concurrent (instead of sequential) loops, using an idiom recommended by Martin Buchholz. To do that we replace the method loopNtimes with the following:

    static ExecutorService threadPool = Executors.newCachedThreadPool();
    static void loopNtimes(int n, Runnable runnable) {
        Collection<Callable<Object>> c = new ArrayList<Callable<Object>>();
        for (int i=0; i<n; i++) c.add(Executors.callable(runnable));
        Collection<Future<Object>> futures = null;
        try { futures = threadPool.invokeAll(c); } catch (InterruptedException ignored) {}
        if (futures != null) for (Future<Object> f : futures) {
            try { f.get(); }
            catch (InterruptedException ex) {}
            catch (ExecutionException ex) {
                ex.printStackTrace();
                System.exit(1);
            }
        }
    }

This requires a couple of other minor changes to the program (two import statements and System.exit(0) at the end of main), but the program now runs in two seconds instead of twenty. So far so good, but if N is larger, say a hundred, this program fails. It throws OutOfMemoryError because it tries to allocate too many threads. My first attempt to fix this replaced the thread pool with one containing a fixed number of threads:

    static ExecutorService threadPool = Executors.newFixedThreadPool(100);

This version of the program works and runs in 2 seconds. But why should we use 100 threads? If we imagine that the Thread.sleep statements represent computationally intensive parts of the program, it might make more sense to have a number of threads approximately the same as the number of physical processors. I'm running this on a machine with an Intel Centrino Duo processor, which acts roughly like 2 processors. Let's be generous, however, and make ten threads. So we modify this version of the program by changing 100 to 10. That won't be as fast as the version with 100 threads, but just how fast will it be?

If you haven't guessed the punch line by now, I'll tell you: with ten threads in the pool the program prints 11 periods and then deadlocks! If you use a debugger to examine the state of the program to figure out what's going on, you'll find the main thread waiting for invokeAll, three threads in doLayerOne waiting for invokeAll, seven threads in doLayerTwo waiting for invokeAll, and there are no threads left to do any of the work of calling doLayerThree. This is a classic thread starvation deadlock.
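The starvation can be reproduced without hanging the terminal. The sketch below is a hypothetical harness: the class StarvationDemo and its method dotsAfter are made up for illustration. It counts dots in an AtomicInteger instead of printing them and shortens each sleep to 50 ms. It also runs the program's main logic on a separate thread and uses daemon pool threads, so a starved run cannot keep the JVM alive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical harness: the three-layer program on a fixed pool, with dots
// counted instead of printed and the caller waiting only a bounded time.
class StarvationDemo {
    static final int N = 3;
    private final AtomicInteger dots = new AtomicInteger();
    private final ExecutorService pool;

    private StarvationDemo(int poolSize) {
        pool = Executors.newFixedThreadPool(poolSize, r -> {
            Thread t = new Thread(r);
            t.setDaemon(true); // a starved pool must not keep the JVM running
            return t;
        });
    }

    private void doSomeWork() {
        dots.incrementAndGet(); // stands in for System.out.print(".")
        try {
            Thread.sleep(50L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void loopNtimes(int n, Runnable r) {
        List<Callable<Object>> tasks = new ArrayList<>();
        for (int i = 0; i < n; i++) tasks.add(Executors.callable(r));
        try {
            pool.invokeAll(tasks); // blocks until every task has completed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void layerThree() { doSomeWork(); }
    private void layerTwo()   { doSomeWork(); loopNtimes(N, this::layerThree); }
    private void layerOne()   { doSomeWork(); loopNtimes(N, this::layerTwo); }

    /** Runs the program on a fixed pool and returns the dot count after waitMillis. */
    static int dotsAfter(int poolSize, long waitMillis) {
        StarvationDemo demo = new StarvationDemo(poolSize);
        Thread mainThread = new Thread(() -> {
            demo.doSomeWork();
            demo.loopNtimes(N, demo::layerOne);
        });
        mainThread.setDaemon(true);
        mainThread.start();
        try {
            mainThread.join(waitMillis); // stop waiting if the program has starved
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int count = demo.dots.get();
        demo.pool.shutdownNow(); // interrupt any workers stuck in invokeAll
        return count;
    }
}
```

With the ten-thread pool from the post, dotsAfter(10, 2000) should report the 11 dots described above. For this small program 13 threads are always enough. At most twelve tasks can be blocked in invokeAll at once (three in layer one, nine in layer two), which leaves one thread free to run the layer-three tasks.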

If you're just trying out this program to see what happens, you might be slightly annoyed and finally give up and hit control-C to quit the Java program, but when our program (Google Calendar) encounters this kind of problem our customers get annoyed, give up, and sign up for a competitor like Yahoo Calendar or 30Boxes. Hey, don't click those links; trust me, you really want Google Calendar. My point is that we can't leave this to chance.

What can or should we do about this problem? The first idea is to change the 10 back into 100, but those numbers are pulled out of thin air. Without analyzing the behavior and interaction of all the places where the thread pool is used, understanding the dynamic performance of the application under real loads, and placing bounds on the number of tasks that will be used at each level in the program's hierarchy, it is difficult or impossible to pick a number that will always avoid this kind of deadlock. Another idea is to use unbounded thread pools, but as we've seen under high load situations those can cause an explosion in the number of threads, resulting in the program failing by running out of memory.

What we did to address this issue is avoid the single monolithic thread pool altogether. Instead, we use a separate thread pool at every level in the hierarchy. In terms of this example, we would have a thread pool for use in main, one for use in doLayerOne, and one for use in doLayerTwo. Every subsystem that requires concurrency gets its own personal thread pool. That way every layer that uses concurrency is guaranteed to make progress when it has work to do, so this kind of deadlock cannot occur. But there is a cost to this as well: balancing the sizes of these thread pools is a black art. During operation we have hundreds of threads, most of which are sitting around doing nothing. Besides being a waste of resources, the generous surplus of "extra" threads makes debugging more difficult than it should be. If the system doesn't break down so neatly into layers (perhaps because there are recursive loops in the call cycle of the subsystems), then even this solution can break down and result in thread starvation.
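Here is a sketch of the per-layer arrangement for the toy program. It assumes one fixed-size pool for each layer that submits work. The pool names, the single-thread sizes, and the PerLayerPools class are hypothetical, and a counter replaces printing and sleeping. Tasks in one pool only ever wait for tasks in the pool of the layer below, so even one thread per pool is enough to finish:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: one pool per layer that forks work.
class PerLayerPools {
    static final int N = 3;
    final AtomicInteger dots = new AtomicInteger();
    // Illustrative sizes: one thread per pool suffices for this call tree.
    final ExecutorService mainPool = Executors.newFixedThreadPool(1);     // runs layer-one tasks
    final ExecutorService layerOnePool = Executors.newFixedThreadPool(1); // runs layer-two tasks
    final ExecutorService layerTwoPool = Executors.newFixedThreadPool(1); // runs layer-three tasks

    void doSomeWork() { dots.incrementAndGet(); } // stands in for printing a dot

    static void loopNtimes(ExecutorService pool, int n, Runnable r) {
        List<Callable<Object>> tasks = new ArrayList<>();
        for (int i = 0; i < n; i++) tasks.add(Executors.callable(r));
        try {
            for (Future<Object> f : pool.invokeAll(tasks)) f.get(); // surface task failures
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }

    void layerThree() { doSomeWork(); }
    void layerTwo()   { doSomeWork(); loopNtimes(layerTwoPool, N, this::layerThree); }
    void layerOne()   { doSomeWork(); loopNtimes(layerOnePool, N, this::layerTwo); }

    /** Runs the program once, shuts the pools down, and returns the dot count. */
    int run() {
        try {
            doSomeWork();
            loopNtimes(mainPool, N, this::layerOne);
            return dots.get();
        } finally {
            mainPool.shutdown();
            layerOnePool.shutdown();
            layerTwoPool.shutdown();
        }
    }
}
```

The leaf layer needs no pool of its own because it forks nothing. The hard part the post describes, choosing sizes for many such pools under real load, is exactly what this toy hides.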

The situation is not entirely hopeless. In my opinion, this kind of thread starvation should never occur, because there is always one thread that can contribute processing power toward executing the subtasks: the thread that is waiting for the subtasks to complete. Here's the implementation of invokeAll as it appears in the JDK:

    public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks) {
                FutureTask<T> f = new FutureTask<T>(t);
                futures.add(f);
                execute(f);
            }
            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    try { 
                        f.get(); 
                    } catch(CancellationException ignore) {
                    } catch(ExecutionException ignore) {
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future<T> f : futures) 
                    f.cancel(true);
        }
    }

This code does not use the current thread to do any of the work of invoking the callables. Below is a slightly modified version (I've added a line to the original and refactored it to make it a static method that we can put in the program) that uses the current thread to do any work that another thread hasn't already started. The newly added code is the commented loop that calls run() on each future:

    public static <T> List<Future<T>> invokeAll(
            ExecutorService threadPool, Collection<Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks) {
                FutureTask<T> f = new FutureTask<T>(t);
                futures.add(f);
                threadPool.execute(f);
            }
            // force unstarted futures to execute using the current thread
            for (Future<T> f : futures) ((FutureTask<T>) f).run();
            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    try { 
                        f.get(); 
                    } catch(CancellationException ignore) {
                    } catch(ExecutionException ignore) {
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future<T> f : futures) 
                    f.cancel(true);
        }
    }
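The extra loop relies on FutureTask.run() executing its callable at most once. If the task has already been started, completed, or cancelled, run() simply returns. A small check of that behavior follows; the class and method names are hypothetical:

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical check: FutureTask.run() runs the callable at most once.
class FutureTaskRunOnce {
    /** Calls run() twice on one task; returns how often the callable executed. */
    static int runTwice() {
        AtomicInteger calls = new AtomicInteger();
        FutureTask<Integer> task = new FutureTask<>(calls::incrementAndGet);
        task.run(); // task is new, so the callable runs
        task.run(); // task has already completed, so this returns immediately
        return calls.get();
    }

    /** Cancels a task, then calls run(); returns how often the callable executed. */
    static int runAfterCancel() {
        AtomicInteger calls = new AtomicInteger();
        FutureTask<Integer> task = new FutureTask<>(calls::incrementAndGet);
        task.cancel(false);
        task.run(); // a cancelled task is never started
        return calls.get();
    }
}
```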

Using this version of invokeAll, the program does not experience thread starvation. If the thread pool is reduced in size to just one thread, the program runs to completion in about 11 seconds, because two threads are contributing to doing the work (the main thread and the thread from the pool).
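The following is a self-contained sketch of the three-layer program using this helper with a one-thread pool. It counts dots in an AtomicInteger instead of printing and sleeping, and it condenses the helper slightly with multi-catch and diamond syntax, so it needs Java 7 or later. The class name CallerHelpsInvokeAll is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: the three-layer program on the caller-helps invokeAll.
class CallerHelpsInvokeAll {
    static final int N = 3;
    private final AtomicInteger dots = new AtomicInteger();
    private final ExecutorService threadPool;

    CallerHelpsInvokeAll(int poolSize) {
        threadPool = Executors.newFixedThreadPool(poolSize);
    }

    // The modified invokeAll from above, lightly condensed.
    static <T> List<Future<T>> invokeAll(ExecutorService threadPool, Collection<Callable<T>> tasks)
            throws InterruptedException {
        List<Future<T>> futures = new ArrayList<>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks) {
                FutureTask<T> f = new FutureTask<>(t);
                futures.add(f);
                threadPool.execute(f);
            }
            // Run, in the current thread, every task no pool thread has started yet.
            for (Future<T> f : futures) ((FutureTask<T>) f).run();
            for (Future<T> f : futures) {
                try { f.get(); } catch (CancellationException | ExecutionException ignore) { }
            }
            done = true;
            return futures;
        } finally {
            if (!done) for (Future<T> f : futures) f.cancel(true);
        }
    }

    private void loopNtimes(int n, Runnable r) {
        List<Callable<Object>> c = new ArrayList<>();
        for (int i = 0; i < n; i++) c.add(Executors.callable(r));
        try {
            invokeAll(threadPool, c);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void doSomeWork() { dots.incrementAndGet(); } // stands in for printing a dot
    private void layerThree() { doSomeWork(); }
    private void layerTwo()   { doSomeWork(); loopNtimes(N, this::layerThree); }
    private void layerOne()   { doSomeWork(); loopNtimes(N, this::layerTwo); }

    /** Runs the program once, shuts the pool down, and returns the dot count. */
    int run() {
        try {
            doSomeWork();
            loopNtimes(N, this::layerOne);
            return dots.get();
        } finally {
            threadPool.shutdown();
        }
    }
}
```

A pool thread may later dequeue a task that the caller already ran. Its run() is then a no-op, so the leftover queue entries cost a little time but no correctness.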

I discussed this issue with Doug Lea, and he warned me that selecting tasks for efficient scheduling in a fork-join concurrency framework is not trivial. The standard solution is to give each worker thread a double-ended queue into which it pushes the subtasks it creates. The worker removes the most recently generated task from its own queue to process, thereby simulating a depth-first execution strategy in the single-thread case. When a worker finds itself without any work to do, it steals work from the other end of another worker's queue; that is, it steals one of the least-recently created (coarse-grained) subtasks. In addition, it is beneficial to have a mechanism to avoid the queue altogether at the bottom of the call chain. Doug told me that this strategy was pioneered by the Cilk work, but I first learned about it 10 years earlier reading WorkCrews: An Abstraction for Controlling Parallelism by Mark T. Vandervoorde and Eric S. Roberts.

My implementation provides exactly this behavior but with a much simpler implementation. The invocation of run executes one of the tasks most recently generated by the current thread. When a thread has no more work to do, it removes work from the queue of the underlying ExecutorService, which is a FIFO queue, and so it takes the least-recently generated task of all workers. On the other hand, because this implementation shares a single queue among all worker threads, there may be additional synchronization overhead compared to the WorkCrews/Cilk solution.
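For comparison, a fork-join framework built on per-worker deques with work stealing later shipped in Java 7 as java.util.concurrent.ForkJoinPool, after this post was written. Below is a sketch of the three-layer program on it. It assumes Java 7 or later, ForkJoinLayers and Layer are hypothetical names, and a dot counter replaces printing and sleeping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: the three-layer program as fork-join tasks.
class ForkJoinLayers {
    static final int N = 3;

    static class Layer extends RecursiveAction {
        private final int depth;           // 0 = main, 1..3 = layers one to three
        private final AtomicInteger dots;

        Layer(int depth, AtomicInteger dots) {
            this.depth = depth;
            this.dots = dots;
        }

        @Override
        protected void compute() {
            dots.incrementAndGet();        // stands in for doSomeWork()
            if (depth == 3) return;        // layer three forks nothing
            List<Layer> children = new ArrayList<>();
            for (int i = 0; i < N; i++) children.add(new Layer(depth + 1, dots));
            ForkJoinTask.invokeAll(children); // fork the children, then wait for all of them
        }
    }

    /** Runs the program on a ForkJoinPool with the given parallelism; returns the dot count. */
    static int run(int parallelism) {
        AtomicInteger dots = new AtomicInteger();
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            pool.invoke(new Layer(0, dots));
        } finally {
            pool.shutdown();
        }
        return dots.get();
    }
}
```

A worker waiting for forked subtasks can run pending tasks instead of simply blocking, so this version should finish even with a parallelism of one.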

It is possible to work around the problem with the existing concurrency utilities, if you don't mind the task scheduling being far from optimal. You can do that by setting a CallerRunsPolicy on a ThreadPoolExecutor that uses a synchronous queue:

    static ThreadPoolExecutor threadPool =
      new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS,
                             new SynchronousQueue<Runnable>());
    static {
        threadPool.setRejectedExecutionHandler(
            new ThreadPoolExecutor.CallerRunsPolicy());
    }
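Here is a sketch of that configuration driving the three-layer program, with the pool's maximum size as a parameter. CallerRunsLayers is a hypothetical name, and a counter replaces printing and sleeping. A SynchronousQueue holds no tasks, so each submission goes to one of three places: an idle pool thread, a new thread if the pool is below its maximum, or the rejection handler. CallerRunsPolicy then runs a rejected task in the submitting thread:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: the three-layer program on a CallerRunsPolicy pool.
class CallerRunsLayers {
    static final int N = 3;
    private final AtomicInteger dots = new AtomicInteger();
    private final ThreadPoolExecutor threadPool;

    CallerRunsLayers(int maxThreads) {
        threadPool = new ThreadPoolExecutor(0, maxThreads, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        // When no thread can take a task, run it in the thread that submitted it.
        threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    }

    private void doSomeWork() { dots.incrementAndGet(); } // stands in for printing a dot

    private void loopNtimes(int n, Runnable r) {
        List<Callable<Object>> tasks = new ArrayList<>();
        for (int i = 0; i < n; i++) tasks.add(Executors.callable(r));
        try {
            threadPool.invokeAll(tasks);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void layerThree() { doSomeWork(); }
    private void layerTwo()   { doSomeWork(); loopNtimes(N, this::layerThree); }
    private void layerOne()   { doSomeWork(); loopNtimes(N, this::layerTwo); }

    /** Runs the program once, shuts the pool down, and returns the dot count. */
    int run() {
        try {
            doSomeWork();
            loopNtimes(N, this::layerOne);
            return dots.get();
        } finally {
            threadPool.shutdown();
        }
    }
}
```

The scheduling is crude because a rejected task runs inside execute(), before the caller has finished submitting that task's siblings.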

Doug explained to me that the earlier public-domain version of the concurrency utilities had a full implementation of a framework for fork-join parallelism, but it wasn't included in JDK 5:

"... The vast majority of such usages are nicest to support as "loop parallelism" utilities. And it is not so much that utilities based on FJ tasks are inconvenient to use that has kept them out, but instead uncertainty about basic APIs until closures are settled. All of the methods for aggregate operations on collections and arrays (applyToAll, map, reduce, mapReduce, findAny, findAll, etc) require function-type arguments (perhaps along with some sort of purity annotation as might be introduced via JSR305) that, depending on how things work out otherwise, would need to be introduced more generally."

Did you think I would get through an entire blog post without mentioning Closures?

13 comments:

axel said...

I think that even with your solution, the program might still deadlock. Assuming the size of your thread pool is equal to or less than N, it might happen that the thread pool is filled with "layerOne" tasks. Now the caller thread comes up and tries to run one of the enqueued tasks. Let's assume this will be a layerTwo task. The layerTwo task will enqueue a layerThree task and will block, waiting to join the layerThree task. The L3 task cannot execute because by now, we have used up all the thread pool threads plus the caller thread.

The problem is that a local change in "invokeAll" will not suffice, if one of the invoked Futures calls "invokeAll" again with the same thread pool.

The CallerRunsPolicy should solve that problem, as it is global per thread pool.

I think it's all related to thread reuse, similar to tail recursion optimization: Where tail recursion optimization reuses the current stack frame for the last procedure call, and thereby avoids overflowing the stack, in this case we reuse the current thread for a thread we would otherwise fork, and thereby avoid overflowing the thread pool.

Neal Gafter said...

axel: I suggest you try to demonstrate the deadlock in my solution. You'll find you can't. The L3 task, in your example, is executed by the same thread that is waiting for it to complete.

axel said...

sorry, that happens when you blog before breakfast. The error was in "will block, waiting to join the layerThree task". Of course it doesn't block, but does the work itself.

The beauty of this puzzler is that the code can be executed by a single thread, because parallel tasks do not need to communicate. The TRO analogy obviously has its limits. Thanks for the brain exercise!

Anonymous said...

Could someone clarify the complex JSE5 generics syntax for someone who is still on 1.4.2:
public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks)

invokeAll accepts a Collection of Callable (of T?) and returns... what exactly is returned?

Neal Gafter said...

It returns a list of futures of T.

Anonymous said...

Pretty neato. More puzzlers!

Minkoo said...

How does the following code start only the unstarted tasks? To me, it seems that the code does not check whether f has already started or not.

for (Future<T> f : futures) ((FutureTask)f).run();

Neal Gafter said...

Minkoo: excellent question. The answer lies in the implementation of FutureTask.run(), which does nothing if the task has already started running.

Jim said...

Hi Neal,
Will the addition of the Deque interface in JDK 6 have any impact on an FJ implementation? Would it make things any simpler or more likely to be included in a future JDK release?

Thanks,
Jim

Neal Gafter said...

Jim: That would be a question to ask Doug Lea, but my understanding is that he has an implementation of the guts of the FJ framework ready to go, once the (language) issues are resolved relating to how its API should be expressed. I don't know if it uses Deque internally, but you can go get the code and check it out yourself!

Jim Bethancourt said...

Neal,
That's great to hear he's got one ready to go. It would definitely be interesting to break the source open and see what Doug has done. Is the source you're referring to in the Oswego concurrency package or a forthcoming JDK release?

Thanks,
Jim

Alex.C.P said...

I have been looking into thread pools and also trying out your code. One simple question: what if tasks hang? I thought this was a pretty standard real-time scenario, but surprisingly I did not find much on the net about handling it. Also, the snippet doesn't show how this is handled. Thanks

Neal Gafter said...

@Alex. I suspect I don't understand your question, because here is my first reaction: if your computation hangs, then it won't finish.