Friday, October 13, 2006

Concurrent Loops Using Java Closures

The java.util.concurrent package has very nice support for writing concurrent loops. I used it recently to improve the performance of some code in the Google Calendar Server. In the Calendar Server, we have a class representing an event on a calendar. The following method on an event computed the set of attendees to the event. The old code looked something like this:

public Collection<Principal> getAttendees() {
    List<Principal> result = new ArrayList<Principal>();
    for (EventResponse r : getResponses()) {
        if (r.mayAttend()) {
            result.add(r.getAttendee());
        }
    }
    return result;
}

The problem with this code, it turned out, was that getAttendee() has to talk to a remote server to look up the Principal in a database, and although the remote server is very fast, it is far enough away that there is some latency. Since this loop is sequential, when there are many responses it spends most of its time waiting for a response from the remote server, over and over again. Consequently the call to getAttendees() was very slow for events with many attendees. One solution to this kind of problem would be to send a single batch request, and that is probably the best long-term solution to the problem, but the server in this case doesn't yet know how to handle batch requests. So our fix to the immediate performance problem was to use java.util.concurrent and make the requests in parallel; this did speed things up significantly:

public Collection<Principal> getAttendees() {
    final List<Principal> result = new ArrayList<Principal>();
    CompletionService<Void> ecs =
        new ExecutorCompletionService<Void>(threadPool);
    for (final EventResponse r : getResponses()) {
        ecs.submit(new Callable<Void>() {
            public Void call() {
                if (r.mayAttend()) {
                    try {
                        Principal attendee = r.getAttendee();
                        synchronized (result) {
                            result.add(attendee);
                        }
                    } catch (Throwable ex) {
                        LOGGER.log(Level.SEVERE, "Uncaught Exception", ex);
                    }
                }
                return null;
            }
        });
    }
    // wait for the tasks to complete
    for (final EventResponse r : getResponses()) {
        try {
            /*discard*/ ecs.take().get();
        } catch (InterruptedException ex) {
            throw new AssertionError(ex); // shouldn't happen
        } catch (ExecutionException ex) {
            LOGGER.log(Level.SEVERE, "Uncaught Exception", ex);
        }
    }
    return result;
}

When I find code like this I have a few reactions. First, my eyes glaze over at the complexity. Then, if I'm interested, I look carefully at the code to try to understand it. After all, I'm likely to want to do something like this again someday. Finally, I bookmark it so I can add it to my bag of tricks.

I don't think writing a concurrent loop should have to be so complex. Here is what I would like to have written:

public Collection<Principal> getAttendees() {
    List<Principal> result = new ArrayList<Principal>();
    for eachConcurrently(EventResponse r : getResponses(), threadPool) {
        if (r.mayAttend()) {
            Principal attendee = r.getAttendee();
            synchronized (result) {
                result.add(attendee);
            }
        }
    }
    return result;
}

You might think that in order to do this kind of thing we would need to add a concurrent looping statement to the language. Actually, it is possible to add the concurrent looping construct as a library method if you have closures in the language! It isn't trivial to write, but that's why we have people like Doug Lea in the world. An API like this "for eachConcurrently" thing should be written once by an expert and placed into the JDK for everyone to use.
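For comparison, here is a minimal sketch of what such a library method might look like using the lambda expressions that Java gained later, rather than the closures proposal discussed in this post. The class name ConcurrentLoops, the signature of eachConcurrently, and the squares demo are all assumptions made for illustration; none of them is an existing JDK API. Note that a lambda, unlike the proposed closures, cannot break out of the loop or return from the enclosing method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical helper class; nothing with this name exists in the JDK.
final class ConcurrentLoops {
    private ConcurrentLoops() {}

    /** Runs body once per item on the pool and returns after every iteration has finished. */
    static <T> void eachConcurrently(Iterable<T> items, ExecutorService pool,
                                     Consumer<? super T> body)
            throws InterruptedException, ExecutionException {
        List<Callable<Void>> tasks = new ArrayList<>();
        for (T item : items) {
            tasks.add(() -> { body.accept(item); return null; });
        }
        // invokeAll blocks until all tasks are done; get() rethrows a failed
        // iteration's exception wrapped in an ExecutionException.
        for (Future<Void> done : pool.invokeAll(tasks)) {
            done.get();
        }
    }

    /** Demo use: squares each input concurrently; result order is not guaranteed. */
    static List<Integer> squares(List<Integer> inputs) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Integer> result = new ArrayList<>();
        try {
            eachConcurrently(inputs, pool, n -> {
                int square = n * n;
                synchronized (result) {
                    result.add(square);
                }
            });
        } catch (InterruptedException | ExecutionException ex) {
            throw new IllegalStateException(ex);
        } finally {
            pool.shutdown();
        }
        return result;
    }
}
```

The body passed to eachConcurrently has the same shape as the loop body in getAttendees, including the synchronized block around the shared list. The demo creates its own pool only to stay self-contained; real code would pass in a shared one.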

What should happen if you use continue, break, or return within the body of this loop, or throw an exception? The continue case is easy: it just completes execution of that one iteration, and the other iterations proceed on their merry way. The semantics of break are a bit subtle, but obvious once you realize this is supposed to act like a loop: it completes the current iteration and cancels any other iterations that have not completed, and control returns to the caller of for eachConcurrently. Handling a return statement is similar: it cancels any uncompleted iterations and returns from the enclosing method, which in this case would be getAttendees. Finally, any exception that propagates out of a loop iteration cancels uncompleted iterations and propagates from the loop.
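The exception case can be approximated today with an ExecutorCompletionService. The sketch below is only an illustration under stated assumptions: the class CancellingLoop and its methods are hypothetical names, and it uses Java lambdas rather than the proposed closures, so it covers exceptions but cannot express break or return. The first failing iteration cancels the uncompleted ones and its exception propagates to the caller.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical helper class, written only to illustrate the semantics above.
final class CancellingLoop {
    private CancellingLoop() {}

    static <T> void eachConcurrently(Iterable<T> items, Executor pool,
                                     Consumer<? super T> body) throws InterruptedException {
        CompletionService<Void> ecs = new ExecutorCompletionService<>(pool);
        List<Future<Void>> futures = new ArrayList<>();
        for (T item : items) {
            futures.add(ecs.submit(() -> { body.accept(item); return null; }));
        }
        try {
            // Wait for iterations in completion order so a failure is seen early.
            for (int i = 0; i < futures.size(); i++) {
                try {
                    ecs.take().get();
                } catch (ExecutionException ex) {
                    Throwable cause = ex.getCause();
                    if (cause instanceof RuntimeException) throw (RuntimeException) cause;
                    if (cause instanceof Error) throw (Error) cause;
                    throw new IllegalStateException(cause);
                }
            }
        } finally {
            // Has no effect on iterations that already completed.
            for (Future<Void> f : futures) {
                f.cancel(true);
            }
        }
    }

    /** Demo: one iteration fails at once; the others would sleep for five seconds. */
    static String demo() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger finished = new AtomicInteger();
        String outcome;
        try {
            eachConcurrently(Arrays.asList("bad", "slow1", "slow2", "slow3"), pool, name -> {
                if (name.equals("bad")) throw new IllegalArgumentException("bad item");
                try {
                    Thread.sleep(5000);
                    finished.incrementAndGet();
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt(); // cancelled: give up quietly
                }
            });
            outcome = "completed normally";
        } catch (IllegalArgumentException ex) {
            outcome = "failed: " + ex.getMessage();
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return outcome + ", slow iterations finished: " + finished.get();
    }
}
```

Cancellation here is cooperative: an iteration that is already running only stops early if it responds to interruption, as the sleeping iterations in the demo do.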

p.s.: Martin Buchholz offered the following improved version of the code using java.util.concurrent:

public Collection<Principal> getAttendees() {
    final Collection<Principal> result
        = new ConcurrentLinkedQueue<Principal>();

    final Collection<Callable<Object>> tasks
        = new ArrayList<Callable<Object>>();
    for (final EventResponse r : getResponses())
        tasks.add(Executors.callable(new Runnable() { public void run() {
            try {
                if (r.mayAttend())
                    result.add(r.getAttendee());
            } catch (Throwable ex) {
                LOGGER.log(Level.SEVERE, "Uncaught Exception", ex);
            }}}));
    try {
        threadPool.invokeAll(tasks);
    } catch (InterruptedException ex) {}
    return new ArrayList<Principal>(result);
}

20 comments:

Chris Quenelle said...

If you've got a language with closures, then what I would prefer to write instead of that for-each line is something like this:

getResponses().eachConcurrently(threadPool)

I was reading about Scala recently. I stumbled across a reference while I was reading about your Java closures proposal. The motivation behind Scala is 100% on track. We need a modern functional language that is well integrated with Java/C#. There are some tasks that a functional language is better designed for, and some tasks for a procedural language. You should be able to mix them together when needed.

Anonymous said...

It seems a good few people (I guess...) are more interested in the collection.forXXX(....){....} style than the for-eachXXXX(....){....} style. What are the advantages of for-eachXXXX looping?

Neal Gafter said...

The advantage is that it is possible to do without breaking backward compatibility.

Brian Oxley said...

For more Smalltalkish-style looping (collection.forEach(something interesting on each item)), try looking at the JAggregate library:

http://jaggregate.sf.net/

Anonymous said...

Unfortunately Martin's version doesn't quite compile:
1) the call to Executors.callable(Runnable) returns a Callable<Object> rather than the required Callable<Void>. I think Executors.callable(Runnable, T) is what's required here, but is there a nicer alternative to passing '(Void) null' as the second argument (it's a bit obscure)?
2) the call to threadpool.invokeAll(tasks) needs to be wrapped in a try...catch.

Once the additional exception handling has been added the code starts to approach the level of complexity illustrated by your sans-closures concurrent example.

Having said that, neither the original code nor your for-eachConcurrently example wrap r.getAttendee() in a try...catch as your Java 5 concurrent example does, so unless I'm missing the point there's a slight case of comparing apples-and-oranges in the exception handling anyway. But I can see that if it did throw some sort of exception, closures should make it as natural to deal with in the concurrent case as it would be in the original non-concurrent case. It also sounds as though the semantics of the other control-transfer statements would be very natural, and useful.

I like what I see in your for-eachConcurrently example, because I can understand it at a glance, but what I'm trying to get my head around is exactly which of the benefits are due to adding just closures, which to adding closures and a new method, and which to adding closures, a new method, and a new variation of the for statement.

For example, could I not write:

public Collection<Principal> getAttendees() {
    List<Principal> result = new ArrayList<Principal>();
    Collections.eachConcurrently(getResponses(), threadPool, (EventResponse r) {
        if (r.mayAttend()) {
            Principal attendee = r.getAttendee();
            synchronized (result) {
                result.add(attendee);
            }
        }
    });
    return result;
}

Either way, I do find the for-eachConcurrently version to be more readable, and I'm now starting to think that IDEs may actually be able to use the 'for' qualifier on methods to aid API discoverability to some extent after all.

One final question: if the getAttendees() method was required to return a List<Principal>, holding the Principals in the order that their responses were received (assuming getResponses() returned an appropriate List<EventResponse>), i.e. first-come-first-served, do you think a variation of eachConcurrently could be crafted to facilitate this? Obviously the results could be sorted once they've all been retrieved, but I'm curious to see how far this idea could go.

Interesting stuff.

Matthias said...

Neal, how would you achieve the semantics for continue, break, ... you are referring to? I don't understand how that would be possible with your closure proposal.

Otherwise, you're indeed comparing apples to oranges. An inner class version is minimally longer.

Jochen "blackdrag" Theodorou said...

I compared Groovy and Java here a little. Just to show people the difference

Anonymous said...

I'd prefer the getResponses().eachConcurrently(threadPool) style as well.
It seems to abstract away the loop better, so you get rid of some more boilerplate and only really have to "plug-in" the parts which are different in your particular usage. After all, that's a large part of doing abstraction.

Neal - you mentioned an advantage of not breaking backwards compatibility. What do you mean by this?

Neal Gafter said...

Calum: backward compatibility is broken by adding methods to an existing interface, because existing implementations of that interface don't provide an implementation of the new method.

Anonymous said...

In terms of backwards compatibility and adding methods to existing interfaces, how about "extension methods"?

So if you had a static method m1(a1, a2, a3) with an appropriate annotation to indicate that it's an "extension method", then you can call this method as a1.m1(a2, a3) - i.e. write it as if you were dispatching on the first argument.

In this way "new" methods could be added to existing classes/interfaces.

Ben Lings said...

Can I second Calum's request for extension methods as part of the closures proposal?

Plus: for a closure with only a single expression, eg

Given

@Extension
static void forEach(Iterable<T> source, {T => Void} block);
List<String> strings;

This

forEach(String item : strings) {
System.out.println(item);
}

could be written

forEach(String item : strings)
System.out.println(item);

or

forEach(strings, item => System.out.println(item));

or, with an extension method

strings.forEach(item => System.out.println(item));

or possibly even

strings.forEach() { item => System.out.println(item)};

(where I've also allowed the type of the closure's parameters to be inferred from the declared type in the function arguments)

Fireblaze said...

Your concurrent code should look like this:

public Collection<Principal> getAttendees2() {
    ExecutorService executor = Executors.newFixedThreadPool(5);
    final Collection<Principal> result = new ConcurrentLinkedQueue<Principal>();
    for (final EventResponse r : getResponses()) {
        executor.submit(new Runnable() { public void run() {
            if (r.mayAttend()) {
                result.add(r.getAttendee());
            }
        }});
        try {
            executor.awaitTermination(0, TimeUnit.SECONDS);
        } catch (InterruptedException ignore) {}
    }
    return result;
}

Neal Gafter said...

@Fireblaze: creating threads for every execution of this concurrent loop is much more expensive than this solution.

Fireblaze said...

You can get the threads from your magic "threadPool". My point was that your concurrent code was bloated with unnecessary code. And that my code is much smaller and easier to understand.

Fireblaze said...

Ehh you could just put the
"ExecutorService executor = Executors.newFixedThreadPool(5);" where you have your "threadpool".
Or just read the JavaDocs for Executors.newFixedThreadPool =>
"Creates a thread pool that reuses a fixed set of threads operating off a shared unbounded queue." So the "creating threads for every execution" you refer to is where exactly?

Neal Gafter said...

@Fireblaze: your code creates 5 threads every time the loop is executed.

Fireblaze said...

ExecutorService executor = Executors.newFixedThreadPool(5);
public Collection<Principal> getAttendees2() {
    final Collection<Principal> result = new ConcurrentLinkedQueue<Principal>();
    for (final EventResponse r : getResponses()) {
        executor.submit(new Runnable() { public void run() {
            if (r.mayAttend()) {
                result.add(r.getAttendee());
            }
        }});
        try {
            executor.awaitTermination(0, TimeUnit.SECONDS);
        } catch (InterruptedException ignore) {}
    }
    return result;
}

ExecutorService creation was in the wrong place... should be one line up.

Neal Gafter said...

@Fireblaze: the awaitTermination method's documentation is this: "Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first."

Since your timeout is zero seconds, the call is a no-op. Even if you change the timeout, you did not give a shutdown request, so the first of three conditions will not occur.
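A small sketch of that behavior, assuming a pool that the method owns and is therefore free to shut down (the class name AwaitTerminationDemo and the 200 ms sleep are made up for illustration). For a shared pool such as threadPool, shutting down is not an option, which is one reason the versions in the post wait on the tasks themselves instead.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the sleep stands in for a slow remote call.
final class AwaitTerminationDemo {
    private AwaitTerminationDemo() {}

    /** Returns what awaitTermination reports before and after shutdown(). */
    static boolean[] demo() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.execute(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        // No shutdown request and a zero timeout: returns false without waiting.
        boolean beforeShutdown = executor.awaitTermination(0, TimeUnit.SECONDS);
        // After shutdown() the pool can terminate once submitted tasks finish.
        executor.shutdown();
        boolean afterShutdown = executor.awaitTermination(10, TimeUnit.SECONDS);
        return new boolean[] { beforeShutdown, afterShutdown };
    }
}
```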

Andreas Hollmann said...

Hi,
With higher processor counts, parallel loops become really important and should be integrated into the JDK!
The work of Doug Lea is really cool! Here is an open source implementation of parallel-for and parallel-foreach loops based on Doug Lea's fork/join framework (Apache License 2.0): parallel-for

Neal Gafter said...

@Andreas: that's useful, but unfortunately much more difficult to use, syntactically, than the language's built-in constructs. You also can't easily convert sequential to concurrent loops, or vice versa, because they are expressed so differently. That is one of the primary reasons to support this syntax.