Creating a truly scalable Thread Pool with ThreadPoolExecutor
Doug Lea’s ThreadPoolExecutor is a very useful part of the java.util.concurrent library. It implements a configurable thread pool that is easy to use and control.
Thread pools are used to move work that might take longer to complete away from the main application, as well as constraining how much of this work can be done in parallel with the application.
However, when used with an unbounded queue, you will find that ThreadPoolExecutor never allocates more than one thread, even when higher maximum pool sizes are allowed.
The problem is caused by this code block:
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
You can see that ThreadPoolExecutor delegates the decision whether to add an additional thread to the pool to the workQueue. This queue however doesn’t know about the ThreadPoolExecutor containing it and, since it’s unbounded, will always accept a new object.
In other words, it is impossible to set up ThreadPoolExecutor so that the number of threads scales up and down dynamically.
Making ThreadPoolExecutor truly scalable
To create a ThreadPoolExecutor instance that scales the number of threads up and down, the BlockingQueue needs to have a reference to the ThreadPoolExecutor and needs to delegate method calls to offer to an underlying BlockingQueue only if the current thread pool size is equal to the ThreadPoolExecutor’s maximum pool size.
In all other cases, the BlockingQueue should return false to indicate that a new thread should be added to the pool.
A Groovy solution
Groovy makes it particularly easy to implement such a queue class. By using the @Delegate annotation, delegating code is generated at compile-time and only overriding methods need to be implemented.
Also, the @TupleConstructor and @Synchronized annotations reduce code bloat and make writing thread-safe code easier.
@TupleConstructor
class DynamicBlockingQueue<E> {
@Delegate
BlockingQueue<E> queue
ThreadPoolExecutor executor
@Synchronized
boolean offer(def e) {
!executor || executor.poolSize == executor.maximumPoolSize ?
queue.offer(e) : false
}
}
Use this class like this:
LinkedBlockingQueue<Runnable> delegate =
new LinkedBlockingQueue<Runnable>()
DynamicBlockingQueue queue = new DynamicBlockingQueue(queue: delegate)
ThreadPoolExecutor executor =
new ThreadPoolExecutor(0, 42 /* maximum number of threads */,
10, TimeUnit.SECONDS, queue)
queue.executor = executor
// Use your ThreadPoolExecutor
Maven Users
For everybody using Maven, my grabbag project already includes the class above. Add a dependency to Groovy 1.8.0 or greater, this artifact…
<dependency> <groupId>com.eaio</groupId> <artifactId>grabbag</artifactId> <version>1.6.2</version> </dependency>
… and use the com.eaio.concurrent.DynamicBlockingQueue class.

