From dd6c076454406249518e622ab4111c6f373497c7 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Tue, 20 Dec 2011 12:03:28 +0200 Subject: [PATCH] simplify and improve scaling/blocking thread pools --- .../util/concurrent/DynamicExecutors.java | 126 ---- .../concurrent/DynamicThreadPoolExecutor.java | 166 ----- .../common/util/concurrent/EsExecutors.java | 105 ++- .../TransferThreadPoolExecutor.java | 635 ------------------ .../zen/ping/unicast/UnicastZenPing.java | 3 +- .../elasticsearch/gateway/fs/FsGateway.java | 5 +- .../indices/recovery/RecoverySettings.java | 5 +- .../elasticsearch/threadpool/ThreadPool.java | 5 +- .../concurrent/BlockingThreadPoolTest.java | 95 --- ...eadPoolTest.java => EsExecutorsTests.java} | 69 +- 10 files changed, 146 insertions(+), 1068 deletions(-) delete mode 100644 src/main/java/org/elasticsearch/common/util/concurrent/DynamicExecutors.java delete mode 100644 src/main/java/org/elasticsearch/common/util/concurrent/DynamicThreadPoolExecutor.java delete mode 100644 src/main/java/org/elasticsearch/common/util/concurrent/TransferThreadPoolExecutor.java delete mode 100644 src/test/java/org/elasticsearch/test/unit/common/util/concurrent/BlockingThreadPoolTest.java rename src/test/java/org/elasticsearch/test/unit/common/util/concurrent/{ScalingThreadPoolTest.java => EsExecutorsTests.java} (66%) diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/DynamicExecutors.java b/src/main/java/org/elasticsearch/common/util/concurrent/DynamicExecutors.java deleted file mode 100644 index 017d0252685..00000000000 --- a/src/main/java/org/elasticsearch/common/util/concurrent/DynamicExecutors.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.util.concurrent; - -import java.util.concurrent.*; - -/** - * - */ -public class DynamicExecutors { - - /** - * Creates a thread pool that creates new threads as needed, but will reuse - * previously constructed threads when they are available. Calls to - * execute will reuse previously constructed threads if - * available. If no existing thread is available, a new thread will be - * created and added to the pool. No more than max threads will - * be created. Threads that have not been used for a keepAlive - * timeout are terminated and removed from the cache. Thus, a pool that - * remains idle for long enough will not consume any resources other than - * the min specified. - * - * @param min the number of threads to keep in the pool, even if they are - * idle. - * @param max the maximum number of threads to allow in the pool. - * @param keepAliveTime when the number of threads is greater than the min, - * this is the maximum time that excess idle threads will wait - * for new tasks before terminating (in milliseconds). - * @return the newly created thread pool - */ - public static ExecutorService newScalingThreadPool(int min, int max, long keepAliveTime) { - return newScalingThreadPool(min, max, keepAliveTime, Executors.defaultThreadFactory()); - } - - /** - * Creates a thread pool, same as in - * {@link #newScalingThreadPool(int, int, long)}, using the provided - * ThreadFactory to create new threads when needed. - * - * @param min the number of threads to keep in the pool, even if they are - * idle. - * @param max the maximum number of threads to allow in the pool. - * @param keepAliveTime when the number of threads is greater than the min, - * this is the maximum time that excess idle threads will wait - * for new tasks before terminating (in milliseconds). - * @param threadFactory the factory to use when creating new threads. - * @return the newly created thread pool - */ - public static ExecutorService newScalingThreadPool(int min, int max, long keepAliveTime, ThreadFactory threadFactory) { - DynamicThreadPoolExecutor.DynamicQueue queue = new DynamicThreadPoolExecutor.DynamicQueue(); - ThreadPoolExecutor executor = new DynamicThreadPoolExecutor(min, max, keepAliveTime, TimeUnit.MILLISECONDS, queue, threadFactory); - executor.setRejectedExecutionHandler(new DynamicThreadPoolExecutor.ForceQueuePolicy()); - queue.setThreadPoolExecutor(executor); - return executor; - } - - /** - * Creates a thread pool similar to that constructed by - * {@link #newScalingThreadPool(int, int, long)}, but blocks the call to - * execute if the queue has reached it's capacity, and all - * max threads are busy handling requests. - *

- * If the wait time of this queue has elapsed, a - * {@link RejectedExecutionException} will be thrown. - * - * @param min the number of threads to keep in the pool, even if they are - * idle. - * @param max the maximum number of threads to allow in the pool. - * @param keepAliveTime when the number of threads is greater than the min, - * this is the maximum time that excess idle threads will wait - * for new tasks before terminating (in milliseconds). - * @param capacity the fixed capacity of the underlying queue (resembles - * backlog). - * @param waitTime the wait time (in milliseconds) for space to become - * available in the queue. - * @return the newly created thread pool - */ - public static ExecutorService newBlockingThreadPool(int min, int max, long keepAliveTime, int capacity, long waitTime) { - return newBlockingThreadPool(min, max, keepAliveTime, capacity, waitTime, Executors.defaultThreadFactory()); - } - - /** - * Creates a thread pool, same as in - * {@link #newBlockingThreadPool(int, int, long, int, long)}, using the - * provided ThreadFactory to create new threads when needed. - * - * @param min the number of threads to keep in the pool, even if they are - * idle. - * @param max the maximum number of threads to allow in the pool. - * @param keepAliveTime when the number of threads is greater than the min, - * this is the maximum time that excess idle threads will wait - * for new tasks before terminating (in milliseconds). - * @param capacity the fixed capacity of the underlying queue (resembles - * backlog). - * @param waitTime the wait time (in milliseconds) for space to become - * available in the queue. - * @param threadFactory the factory to use when creating new threads. - * @return the newly created thread pool - */ - public static ExecutorService newBlockingThreadPool(int min, int max, - long keepAliveTime, int capacity, long waitTime, - ThreadFactory threadFactory) { - DynamicThreadPoolExecutor.DynamicQueue queue = new DynamicThreadPoolExecutor.DynamicQueue(capacity); - ThreadPoolExecutor executor = new DynamicThreadPoolExecutor(min, max, keepAliveTime, TimeUnit.MILLISECONDS, queue, threadFactory); - executor.setRejectedExecutionHandler(new DynamicThreadPoolExecutor.TimedBlockingPolicy(waitTime)); - queue.setThreadPoolExecutor(executor); - return executor; - } -} diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/DynamicThreadPoolExecutor.java b/src/main/java/org/elasticsearch/common/util/concurrent/DynamicThreadPoolExecutor.java deleted file mode 100644 index 69f4728a68f..00000000000 --- a/src/main/java/org/elasticsearch/common/util/concurrent/DynamicThreadPoolExecutor.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.util.concurrent; - -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * An {@link ExecutorService} that executes each submitted task using one of - * possibly several pooled threads, normally configured using - * {@link DynamicExecutors} factory methods. - * - * - */ -public class DynamicThreadPoolExecutor extends ThreadPoolExecutor { - /** - * number of threads that are actively executing tasks - */ - private final AtomicInteger activeCount = new AtomicInteger(); - - public DynamicThreadPoolExecutor(int corePoolSize, int maximumPoolSize, - long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, - ThreadFactory threadFactory) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); - } - - @Override - public int getActiveCount() { - return activeCount.get(); - } - - @Override - protected void beforeExecute(Thread t, Runnable r) { - activeCount.incrementAndGet(); - } - - @Override - protected void afterExecute(Runnable r, Throwable t) { - activeCount.decrementAndGet(); - } - - /** - * Much like a {@link SynchronousQueue} which acts as a rendezvous channel. It - * is well suited for handoff designs, in which a tasks is only queued if there - * is an available thread to pick it up. - *

- * This queue is correlated with a thread-pool, and allows insertions to the - * queue only if there is a free thread that can poll this task. Otherwise, the - * task is rejected and the decision is left up to one of the - * {@link RejectedExecutionHandler} policies: - *

    - *
  1. {@link ForceQueuePolicy} - forces the queue to accept the rejected task.
  2. - *
  3. {@link TimedBlockingPolicy} - waits for a given time for the task to be - * executed.
  4. - *
- * - * - */ - public static class DynamicQueue extends LinkedBlockingQueue { - private static final long serialVersionUID = 1L; - - /** - * The executor this Queue belongs to - */ - private transient ThreadPoolExecutor executor; - - /** - * Creates a DynamicQueue with a capacity of - * {@link Integer#MAX_VALUE}. - */ - public DynamicQueue() { - super(); - } - - /** - * Creates a DynamicQueue with the given (fixed) capacity. - * - * @param capacity the capacity of this queue. - */ - public DynamicQueue(int capacity) { - super(capacity); - } - - /** - * Sets the executor this queue belongs to. - */ - public void setThreadPoolExecutor(ThreadPoolExecutor executor) { - this.executor = executor; - } - - /** - * Inserts the specified element at the tail of this queue if there is at - * least one available thread to run the current task. If all pool threads - * are actively busy, it rejects the offer. - * - * @param o the element to add. - * @return true if it was possible to add the element to this - * queue, else false - * @see ThreadPoolExecutor#execute(Runnable) - */ - @Override - public boolean offer(E o) { - int allWorkingThreads = executor.getActiveCount() + super.size(); - return allWorkingThreads < executor.getPoolSize() && super.offer(o); - } - } - - /** - * A handler for rejected tasks that adds the specified element to this queue, - * waiting if necessary for space to become available. - */ - public static class ForceQueuePolicy implements RejectedExecutionHandler { - public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - try { - executor.getQueue().put(r); - } catch (InterruptedException e) { - //should never happen since we never wait - throw new RejectedExecutionException(e); - } - } - } - - /** - * A handler for rejected tasks that inserts the specified element into this - * queue, waiting if necessary up to the specified wait time for space to become - * available. - */ - public static class TimedBlockingPolicy implements RejectedExecutionHandler { - private final long waitTime; - - /** - * @param waitTime wait time in milliseconds for space to become available. - */ - public TimedBlockingPolicy(long waitTime) { - this.waitTime = waitTime; - } - - public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - try { - boolean successful = executor.getQueue().offer(r, waitTime, TimeUnit.MILLISECONDS); - if (!successful) - throw new RejectedExecutionException("Rejected execution after waiting " - + waitTime + " ms for task [" + r.getClass() + "] to be executed."); - } catch (InterruptedException e) { - throw new RejectedExecutionException(e); - } - } - } -} diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 91399716063..6c32b4832ca 100644 --- a/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -19,8 +19,8 @@ package org.elasticsearch.common.util.concurrent; +import jsr166y.LinkedTransferQueue; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import java.util.concurrent.*; @@ -29,11 +29,24 @@ import java.util.concurrent.*; */ public class EsExecutors { - public static ExecutorService newCachedThreadPool(TimeValue keepAlive, ThreadFactory threadFactory) { - return new ThreadPoolExecutor(0, Integer.MAX_VALUE, - keepAlive.millis(), TimeUnit.MILLISECONDS, - new SynchronousQueue(), - threadFactory); + public static ThreadPoolExecutor newScalingExecutorService(int min, int max, long keepAliveTime, TimeUnit unit, + ThreadFactory threadFactory) { + ExecutorScalingQueue queue = new ExecutorScalingQueue(); + // we force the execution, since we might run into concurrency issues in offer for ScalingBlockingQueue + ThreadPoolExecutor executor = new ThreadPoolExecutor(min, max, keepAliveTime, unit, queue, threadFactory, + new ForceQueuePolicy()); + queue.executor = executor; + return executor; + } + + public static ThreadPoolExecutor newBlockingExecutorService(int min, int max, long keepAliveTime, TimeUnit unit, + ThreadFactory threadFactory, int capacity, + long waitTime, TimeUnit waitTimeUnit) { + ExecutorBlockingQueue queue = new ExecutorBlockingQueue(capacity); + ThreadPoolExecutor executor = new ThreadPoolExecutor(min, max, keepAliveTime, unit, queue, threadFactory, + new TimedBlockingPolicy(waitTimeUnit.toMillis(waitTime))); + queue.executor = executor; + return executor; } public static String threadName(Settings settings, String namePrefix) { @@ -88,4 +101,84 @@ public class EsExecutors { */ private EsExecutors() { } + + static class ExecutorScalingQueue extends LinkedTransferQueue { + + ThreadPoolExecutor executor; + + public ExecutorScalingQueue() { + } + + @Override + public boolean offer(E e) { + int left = executor.getMaximumPoolSize() - executor.getCorePoolSize(); + if (!tryTransfer(e)) { + if (left > 0) { + return false; + } else { + return super.offer(e); + } + } else { + return true; + } + } + } + + static class ExecutorBlockingQueue extends ArrayBlockingQueue { + + ThreadPoolExecutor executor; + + ExecutorBlockingQueue(int capacity) { + super(capacity); + } + + @Override + public boolean offer(E o) { + int allWorkingThreads = executor.getActiveCount() + super.size(); + return allWorkingThreads < executor.getPoolSize() && super.offer(o); + } + } + + + /** + * A handler for rejected tasks that adds the specified element to this queue, + * waiting if necessary for space to become available. + */ + static class ForceQueuePolicy implements RejectedExecutionHandler { + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + try { + executor.getQueue().put(r); + } catch (InterruptedException e) { + //should never happen since we never wait + throw new RejectedExecutionException(e); + } + } + } + + /** + * A handler for rejected tasks that inserts the specified element into this + * queue, waiting if necessary up to the specified wait time for space to become + * available. + */ + static class TimedBlockingPolicy implements RejectedExecutionHandler { + private final long waitTime; + + /** + * @param waitTime wait time in milliseconds for space to become available. + */ + public TimedBlockingPolicy(long waitTime) { + this.waitTime = waitTime; + } + + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + try { + boolean successful = executor.getQueue().offer(r, waitTime, TimeUnit.MILLISECONDS); + if (!successful) + throw new RejectedExecutionException("Rejected execution after waiting " + + waitTime + " ms for task [" + r.getClass() + "] to be executed."); + } catch (InterruptedException e) { + throw new RejectedExecutionException(e); + } + } + } } diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/TransferThreadPoolExecutor.java b/src/main/java/org/elasticsearch/common/util/concurrent/TransferThreadPoolExecutor.java deleted file mode 100644 index a415588e71b..00000000000 --- a/src/main/java/org/elasticsearch/common/util/concurrent/TransferThreadPoolExecutor.java +++ /dev/null @@ -1,635 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.util.concurrent; - -import jsr166y.LinkedTransferQueue; -import jsr166y.TransferQueue; - -import java.util.*; -import java.util.concurrent.AbstractExecutorService; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -/** - * A thread pool based on {@link jsr166y.TransferQueue}. - *

- *

Limited compared to ExecutorServer in what it does, but focused on speed. - * - * - */ -public class TransferThreadPoolExecutor extends AbstractExecutorService { - - private final TransferQueue workQueue = new LinkedTransferQueue(); - - private final AtomicInteger queueSize = new AtomicInteger(); - - /** - * Lock held on updates to poolSize, corePoolSize, - * maximumPoolSize, runState, and workers set. - */ - private final ReentrantLock mainLock = new ReentrantLock(); - - /** - * Wait condition to support awaitTermination - */ - private final Condition termination = mainLock.newCondition(); - - /** - * Set containing all worker threads in pool. Accessed only when - * holding mainLock. - */ - private final HashSet workers = new HashSet(); - - - /** - * Factory for new threads. All threads are created using this - * factory (via method addThread). All callers must be prepared - * for addThread to fail by returning null, which may reflect a - * system or user's policy limiting the number of threads. Even - * though it is not treated as an error, failure to create threads - * may result in new tasks being rejected or existing ones - * remaining stuck in the queue. On the other hand, no special - * precautions exist to handle OutOfMemoryErrors that might be - * thrown while trying to create threads, since there is generally - * no recourse from within this class. - */ - private final ThreadFactory threadFactory; - - /** - * runState provides the main lifecyle control, taking on values: - *

- * RUNNING: Accept new tasks and process queued tasks - * SHUTDOWN: Don't accept new tasks, but process queued tasks - * STOP: Don't accept new tasks, don't process queued tasks, - * and interrupt in-progress tasks - * TERMINATED: Same as STOP, plus all threads have terminated - *

- * The numerical order among these values matters, to allow - * ordered comparisons. The runState monotonically increases over - * time, but need not hit each state. The transitions are: - *

- * RUNNING -> SHUTDOWN - * On invocation of shutdown(), perhaps implicitly in finalize() - * (RUNNING or SHUTDOWN) -> STOP - * On invocation of shutdownNow() - * SHUTDOWN -> TERMINATED - * When both queue and pool are empty - * STOP -> TERMINATED - * When pool is empty - */ - volatile int runState; - static final int RUNNING = 0; - static final int SHUTDOWN = 1; - static final int STOP = 2; - static final int TERMINATED = 3; - - - private final boolean blocking; - - private final int blockingCapacity; - - private final long blockingTime; - - /** - * Core pool size, updated only while holding mainLock, but - * volatile to allow concurrent readability even during updates. - */ - private final int corePoolSize; - - /** - * Maximum pool size, updated only while holding mainLock but - * volatile to allow concurrent readability even during updates. - */ - private final int maximumPoolSize; - - /** - * Timeout in nanoseconds for idle threads waiting for work. - * Threads use this timeout when there are more than corePoolSize - * present or if allowCoreThreadTimeOut. Otherwise they wait - * forever for new work. - */ - private final long keepAliveTime; - - /** - * Current pool size, updated only while holding mainLock but - * volatile to allow concurrent readability even during updates. - */ - private final AtomicInteger poolSize = new AtomicInteger(); - - public static TransferThreadPoolExecutor newScalingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { - return new TransferThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, false, 0, TimeUnit.NANOSECONDS, 0, threadFactory); - } - - public static TransferThreadPoolExecutor newBlockingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, - long blockingTime, TimeUnit blockingUnit, int blockingCapacity, - ThreadFactory threadFactory) { - return new TransferThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, true, blockingTime, blockingUnit, blockingCapacity, threadFactory); - } - - private TransferThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, - boolean blocking, long blockingTime, TimeUnit blockingUnit, int blockingCapacity, - ThreadFactory threadFactory) { - this.blocking = blocking; - this.blockingTime = blockingUnit.toNanos(blockingTime); - this.blockingCapacity = blockingCapacity; - this.corePoolSize = corePoolSize; - this.maximumPoolSize = maximumPoolSize; - this.keepAliveTime = unit.toNanos(keepAliveTime); - this.threadFactory = threadFactory; - - for (int i = 0; i < corePoolSize; i++) { - Thread t = addWorker(); - if (t != null) { - poolSize.incrementAndGet(); - t.start(); - } - } - } - - - @Override - public void execute(Runnable command) { - if (blocking) { - executeBlocking(command); - } else { - executeNonBlocking(command); - } - } - - private void executeNonBlocking(Runnable command) { - // note, there might be starvation of some commands that were added to the queue, - // while others are being transferred directly - queueSize.getAndIncrement(); - boolean succeeded = workQueue.tryTransfer(command); - if (succeeded) { - return; - } - int currentPoolSize = poolSize.get(); - if (currentPoolSize < maximumPoolSize) { - // if we manage to add a worker, add it, and tryTransfer again - if (poolSize.compareAndSet(currentPoolSize, currentPoolSize + 1)) { - Thread t = addWorker(); - if (t == null) { - poolSize.decrementAndGet(); - workQueue.add(command); - } else { - t.start(); - succeeded = workQueue.tryTransfer(command); - if (!succeeded) { - workQueue.add(command); - } - } - } else { - succeeded = workQueue.tryTransfer(command); - if (!succeeded) { - workQueue.add(command); - } - } - } else { - workQueue.add(command); - } - } - - private void executeBlocking(Runnable command) { - int currentCapacity = queueSize.getAndIncrement(); - boolean succeeded = workQueue.tryTransfer(command); - if (succeeded) { - return; - } - int currentPoolSize = poolSize.get(); - if (currentPoolSize < maximumPoolSize) { - // if we manage to add a worker, add it, and tryTransfer again - if (poolSize.compareAndSet(currentPoolSize, currentPoolSize + 1)) { - Thread t = addWorker(); - if (t == null) { - poolSize.decrementAndGet(); - workQueue.add(command); - } else { - t.start(); - succeeded = workQueue.tryTransfer(command); - if (!succeeded) { - transferOrAddBlocking(command, currentCapacity); - } - } - } else { - succeeded = workQueue.tryTransfer(command); - if (!succeeded) { - transferOrAddBlocking(command, currentCapacity); - } - } - } else { - transferOrAddBlocking(command, currentCapacity); - } - } - - private void transferOrAddBlocking(Runnable command, int currentCapacity) { - if (currentCapacity < blockingCapacity) { - workQueue.add(command); - } else { - boolean succeeded; - try { - succeeded = workQueue.tryTransfer(command, blockingTime, TimeUnit.NANOSECONDS); - if (!succeeded) { - throw new RejectedExecutionException("Rejected execution after waiting " - + TimeUnit.NANOSECONDS.toSeconds(blockingTime) + "s for task [" + command.getClass() + "] to be executed."); - } - } catch (InterruptedException e) { - throw new RejectedExecutionException(e); - } - } - } - - @Override - public void shutdown() { - final ReentrantLock mainLock = this.mainLock; - mainLock.lock(); - try { - int state = runState; - if (state < SHUTDOWN) - runState = SHUTDOWN; - - try { - for (Worker w : workers) { - w.interruptIfIdle(); - } - } catch (SecurityException se) { // Try to back out - runState = state; - // tryTerminate() here would be a no-op - throw se; - } - - tryTerminate(); // Terminate now if pool and queue empty - } finally { - mainLock.unlock(); - } - } - - @Override - public List shutdownNow() { - final ReentrantLock mainLock = this.mainLock; - mainLock.lock(); - try { - int state = runState; - if (state < STOP) - runState = STOP; - - try { - for (Worker w : workers) { - w.interruptNow(); - } - } catch (SecurityException se) { // Try to back out - runState = state; - // tryTerminate() here would be a no-op - throw se; - } - - List tasks = drainQueue(); - tryTerminate(); // Terminate now if pool and queue empty - return tasks; - } finally { - mainLock.unlock(); - } - } - - @Override - public boolean isShutdown() { - return runState != RUNNING; - } - - @Override - public boolean isTerminated() { - return runState == TERMINATED; - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - long nanos = unit.toNanos(timeout); - final ReentrantLock mainLock = this.mainLock; - mainLock.lock(); - try { - for (; ; ) { - if (runState == TERMINATED) - return true; - if (nanos <= 0) - return false; - nanos = termination.awaitNanos(nanos); - } - } finally { - mainLock.unlock(); - } - } - - /** - * Returns the current number of threads in the pool. - * - * @return the number of threads - */ - public int getPoolSize() { - return poolSize.get(); - } - - /** - * Returns the approximate number of threads that are actively - * executing tasks. - * - * @return the number of threads - */ - public int getActiveCount() { - final ReentrantLock mainLock = this.mainLock; - mainLock.lock(); - try { - int n = 0; - for (Worker w : workers) { - if (w.isActive()) - ++n; - } - return n; - } finally { - mainLock.unlock(); - } - } - - public int getCorePoolSize() { - return corePoolSize; - } - - public int getMaximumPoolSize() { - return maximumPoolSize; - } - - public int getQueueSize() { - return queueSize.get(); - } - - private final class Worker implements Runnable { - /** - * The runLock is acquired and released surrounding each task - * execution. It mainly protects against interrupts that are - * intended to cancel the worker thread from instead - * interrupting the task being run. - */ - private final ReentrantLock runLock = new ReentrantLock(); - - /** - * Thread this worker is running in. Acts as a final field, - * but cannot be set until thread is created. - */ - Thread thread; - - Worker() { - } - - boolean isActive() { - return runLock.isLocked(); - } - - /** - * Interrupts thread if not running a task. - */ - void interruptIfIdle() { - final ReentrantLock runLock = this.runLock; - if (runLock.tryLock()) { - try { - if (thread != Thread.currentThread()) - thread.interrupt(); - } finally { - runLock.unlock(); - } - } - } - - /** - * Interrupts thread even if running a task. - */ - void interruptNow() { - thread.interrupt(); - } - - /** - * Runs a single task between before/after methods. - */ - private void runTask(Runnable task) { - final ReentrantLock runLock = this.runLock; - runLock.lock(); - try { - /* - * Ensure that unless pool is stopping, this thread - * does not have its interrupt set. This requires a - * double-check of state in case the interrupt was - * cleared concurrently with a shutdownNow -- if so, - * the interrupt is re-enabled. - */ - if (runState < STOP && Thread.interrupted() && runState >= STOP) - thread.interrupt(); - - task.run(); - } finally { - runLock.unlock(); - } - } - - /** - * Main run loop - */ - public void run() { - try { - Runnable task; - while ((task = getTask()) != null) { - runTask(task); - } - } finally { - workerDone(this); - } - } - } - - - Runnable getTask() { - for (; ; ) { - try { - int state = runState; - if (state > SHUTDOWN) - return null; - Runnable r; - if (state == SHUTDOWN) // Help drain queue - r = workQueue.poll(); - else if (poolSize.get() > corePoolSize) - r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); - else - r = workQueue.take(); - if (r != null) { - queueSize.decrementAndGet(); - return r; - } - if (workerCanExit()) { - if (runState >= SHUTDOWN) // Wake up others - interruptIdleWorkers(); - return null; - } - // Else retry - } catch (InterruptedException ie) { - // On interruption, re-check runState - } - } - } - - /** - * Check whether a worker thread that fails to get a task can - * exit. We allow a worker thread to die if the pool is stopping, - * or the queue is empty, or there is at least one thread to - * handle possibly non-empty queue, even if core timeouts are - * allowed. - */ - private boolean workerCanExit() { - final ReentrantLock mainLock = this.mainLock; - mainLock.lock(); - boolean canExit; - try { - canExit = runState >= STOP || (queueSize.get() == 0 && (runState >= SHUTDOWN || poolSize.get() > corePoolSize)); - } finally { - mainLock.unlock(); - } - return canExit; - } - - /** - * Wakes up all threads that might be waiting for tasks so they - * can check for termination. Note: this method is also called by - * ScheduledThreadPoolExecutor. - */ - void interruptIdleWorkers() { - final ReentrantLock mainLock = this.mainLock; - mainLock.lock(); - try { - for (Worker w : workers) - w.interruptIfIdle(); - } finally { - mainLock.unlock(); - } - } - - /** - * Performs bookkeeping for an exiting worker thread. - * - * @param w the worker - */ - void workerDone(Worker w) { - final ReentrantLock mainLock = this.mainLock; - mainLock.lock(); - try { - workers.remove(w); - if (poolSize.decrementAndGet() == 0) - tryTerminate(); - } finally { - mainLock.unlock(); - } - } - - /** - * Transitions to TERMINATED state if either (SHUTDOWN and pool - * and queue empty) or (STOP and pool empty), otherwise unless - * stopped, ensuring that there is at least one live thread to - * handle queued tasks. - *

- * This method is called from the three places in which - * termination can occur: in workerDone on exit of the last thread - * after pool has been shut down, or directly within calls to - * shutdown or shutdownNow, if there are no live threads. - */ - private void tryTerminate() { - if (poolSize.get() == 0) { - int state = runState; - if (state < STOP && queueSize.get() > 0) { - state = RUNNING; // disable termination check below - Thread t = addThread(); - poolSize.incrementAndGet(); - if (t != null) - t.start(); - } - if (state == STOP || state == SHUTDOWN) { - runState = TERMINATED; - termination.signalAll(); - } - } - } - - /** - * Creates and returns a new thread running firstTask as its first - * task. Executed under mainLock. - */ - private Thread addWorker() { - final ReentrantLock mainLock = this.mainLock; - mainLock.lock(); - try { - return addThread(); - } finally { - mainLock.unlock(); - } - } - - /** - * Creates and returns a new thread running firstTask as its first - * task. Call only while holding mainLock. - */ - private Thread addThread() { - Worker w = new Worker(); - Thread t = threadFactory.newThread(w); - if (t != null) { - w.thread = t; - workers.add(w); - } - return t; - } - - /** - * Drains the task queue into a new list. Used by shutdownNow. - * Call only while holding main lock. - */ - private List drainQueue() { - List taskList = new ArrayList(); - workQueue.drainTo(taskList); - queueSize.getAndAdd(taskList.size() * -1); - /* - * If the queue is a DelayQueue or any other kind of queue - * for which poll or drainTo may fail to remove some elements, - * we need to manually traverse and remove remaining tasks. - * To guarantee atomicity wrt other threads using this queue, - * we need to create a new iterator for each element removed. - */ - while (!workQueue.isEmpty()) { - Iterator it = workQueue.iterator(); - try { - if (it.hasNext()) { - Runnable r = it.next(); - if (workQueue.remove(r)) { - taskList.add(r); - queueSize.decrementAndGet(); - } - } - } catch (ConcurrentModificationException ignore) { - } - } - return taskList; - } -} diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index e1acde6b15a..7a514bcfb2d 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -35,7 +35,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.util.concurrent.DynamicExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.discovery.zen.ping.ZenPing; @@ -209,7 +208,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen public Executor executor() { if (executor == null) { ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]"); - executor = DynamicExecutors.newScalingThreadPool(1, concurrentConnects, 60000, threadFactory); + executor = EsExecutors.newScalingExecutorService(0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory); } return executor; } diff --git a/src/main/java/org/elasticsearch/gateway/fs/FsGateway.java b/src/main/java/org/elasticsearch/gateway/fs/FsGateway.java index c8210c215fc..8bcfb26430d 100644 --- a/src/main/java/org/elasticsearch/gateway/fs/FsGateway.java +++ b/src/main/java/org/elasticsearch/gateway/fs/FsGateway.java @@ -26,8 +26,6 @@ import org.elasticsearch.common.blobstore.fs.FsBlobStore; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.DynamicExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.env.Environment; import org.elasticsearch.gateway.blobstore.BlobStoreGateway; @@ -37,6 +35,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.File; import java.io.IOException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; /** * @@ -60,7 +59,7 @@ public class FsGateway extends BlobStoreGateway { } int concurrentStreams = componentSettings.getAsInt("concurrent_streams", 5); - this.concurrentStreamPool = DynamicExecutors.newScalingThreadPool(1, concurrentStreams, TimeValue.timeValueSeconds(5).millis(), EsExecutors.daemonThreadFactory(settings, "[fs_stream]")); + this.concurrentStreamPool = EsExecutors.newScalingExecutorService(1, concurrentStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[fs_stream]")); initialize(new FsBlobStore(componentSettings, concurrentStreamPool, gatewayFile), clusterName, null); } diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java b/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java index 737d5e6b4f8..715dca6e8e5 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java @@ -27,12 +27,11 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.DynamicExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.node.settings.NodeSettingsService; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** */ @@ -69,7 +68,7 @@ public class RecoverySettings extends AbstractComponent { this.compress = componentSettings.getAsBoolean("compress", true); this.concurrentStreams = componentSettings.getAsInt("concurrent_streams", settings.getAsInt("index.shard.recovery.concurrent_streams", 5)); - this.concurrentStreamPool = (ThreadPoolExecutor) DynamicExecutors.newScalingThreadPool(1, concurrentStreams, TimeValue.timeValueSeconds(5).millis(), EsExecutors.daemonThreadFactory(settings, "[recovery_stream]")); + this.concurrentStreamPool = EsExecutors.newScalingExecutorService(0, concurrentStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[recovery_stream]")); this.maxSizePerSec = componentSettings.getAsBytesSize("max_size_per_sec", new ByteSizeValue(0)); if (maxSizePerSec.bytes() <= 0) { diff --git a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index feb065d9116..e0cd7ae3c34 100644 --- a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -32,7 +32,6 @@ import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.SizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.DynamicExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors; import java.util.Map; @@ -194,7 +193,7 @@ public class ThreadPool extends AbstractComponent { int min = settings.getAsInt("min", defaultSettings.getAsInt("min", 1)); int size = settings.getAsInt("size", defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5)); logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive); - return DynamicExecutors.newScalingThreadPool(min, size, keepAlive.millis(), threadFactory); + return EsExecutors.newScalingExecutorService(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); } else if ("blocking".equals(type)) { TimeValue keepAlive = settings.getAsTime("keep_alive", defaultSettings.getAsTime("keep_alive", timeValueMinutes(5))); int min = settings.getAsInt("min", defaultSettings.getAsInt("min", 1)); @@ -202,7 +201,7 @@ public class ThreadPool extends AbstractComponent { SizeValue capacity = settings.getAsSize("queue_size", defaultSettings.getAsSize("queue_size", new SizeValue(1000))); TimeValue waitTime = settings.getAsTime("wait_time", defaultSettings.getAsTime("wait_time", timeValueSeconds(60))); logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], queue_size [{}], keep_alive [{}], wait_time [{}]", name, type, min, size, capacity.singles(), keepAlive, waitTime); - return DynamicExecutors.newBlockingThreadPool(min, size, keepAlive.millis(), (int) capacity.singles(), waitTime.millis(), threadFactory); + return EsExecutors.newBlockingExecutorService(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, (int) capacity.singles(), waitTime.millis(), TimeUnit.MILLISECONDS); } throw new ElasticSearchIllegalArgumentException("No type found [" + type + "], for [" + name + "]"); } diff --git a/src/test/java/org/elasticsearch/test/unit/common/util/concurrent/BlockingThreadPoolTest.java b/src/test/java/org/elasticsearch/test/unit/common/util/concurrent/BlockingThreadPoolTest.java deleted file mode 100644 index fa30161ee38..00000000000 --- a/src/test/java/org/elasticsearch/test/unit/common/util/concurrent/BlockingThreadPoolTest.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.test.unit.common.util.concurrent; - -import org.elasticsearch.common.util.concurrent.ThreadBarrier; -import org.elasticsearch.common.util.concurrent.TransferThreadPoolExecutor; -import org.testng.annotations.Test; - -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; - -/** - * - */ -@Test(enabled = false) -public class BlockingThreadPoolTest { - - @Test - public void testBlocking() throws Exception { - final int min = 2; - final int max = 4; - final long waitTime = 1000; //1 second - final ThreadBarrier barrier = new ThreadBarrier(max + 1); - - TransferThreadPoolExecutor pool = TransferThreadPoolExecutor.newBlockingExecutor(min, max, 60000, TimeUnit.MILLISECONDS, waitTime, TimeUnit.MILLISECONDS, 1, Executors.defaultThreadFactory()); - assertThat("Min property", pool.getCorePoolSize(), equalTo(min)); - assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max)); - - for (int i = 0; i < max; ++i) { - pool.execute(new Runnable() { - public void run() { - try { - barrier.await(); - barrier.await(); - } catch (Throwable e) { - barrier.reset(e); - } - } - }); - - //wait until thread executes this task - //otherwise, a task might be queued - Thread.sleep(100); - } - - barrier.await(); - assertThat("wrong pool size", pool.getPoolSize(), equalTo(max)); - assertThat("wrong active size", pool.getActiveCount(), equalTo(max)); - - //Queue should be empty, lets occupy it's only free space - assertThat("queue isn't empty", pool.getQueueSize(), equalTo(0)); - pool.execute(new Runnable() { - public void run() { - //dummy task - } - }); - assertThat("queue isn't full", pool.getQueueSize(), equalTo(1)); - - //request should block since queue is full - try { - pool.execute(new Runnable() { - public void run() { - //dummy task - } - }); - assertThat("Should have thrown RejectedExecutionException", false, equalTo(true)); - } catch (RejectedExecutionException e) { - //caught expected exception - } - - barrier.await(); - pool.shutdown(); - } -} diff --git a/src/test/java/org/elasticsearch/test/unit/common/util/concurrent/ScalingThreadPoolTest.java b/src/test/java/org/elasticsearch/test/unit/common/util/concurrent/EsExecutorsTests.java similarity index 66% rename from src/test/java/org/elasticsearch/test/unit/common/util/concurrent/ScalingThreadPoolTest.java rename to src/test/java/org/elasticsearch/test/unit/common/util/concurrent/EsExecutorsTests.java index 89b6c318eff..3023595f449 100644 --- a/src/test/java/org/elasticsearch/test/unit/common/util/concurrent/ScalingThreadPoolTest.java +++ b/src/test/java/org/elasticsearch/test/unit/common/util/concurrent/EsExecutorsTests.java @@ -19,22 +19,22 @@ package org.elasticsearch.test.unit.common.util.concurrent; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadBarrier; -import org.elasticsearch.common.util.concurrent.TransferThreadPoolExecutor; import org.testng.annotations.Test; -import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThan; /** - * */ -@Test(enabled = false) -public class ScalingThreadPoolTest { +@Test +public class EsExecutorsTests { @Test public void testScaleUp() throws Exception { @@ -42,8 +42,7 @@ public class ScalingThreadPoolTest { final int max = 4; final ThreadBarrier barrier = new ThreadBarrier(max + 1); -// ThreadPoolExecutor pool = (ThreadPoolExecutor) DynamicExecutors.newScalingThreadPool(min, max, Long.MAX_VALUE); - TransferThreadPoolExecutor pool = TransferThreadPoolExecutor.newScalingExecutor(min, max, Long.MAX_VALUE, TimeUnit.NANOSECONDS, Executors.defaultThreadFactory()); + ThreadPoolExecutor pool = EsExecutors.newScalingExecutorService(min, max, 100, TimeUnit.DAYS, EsExecutors.daemonThreadFactory("test")); assertThat("Min property", pool.getCorePoolSize(), equalTo(min)); assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max)); @@ -77,8 +76,7 @@ public class ScalingThreadPoolTest { final int max = 4; final ThreadBarrier barrier = new ThreadBarrier(max + 1); -// ThreadPoolExecutor pool = (ThreadPoolExecutor) DynamicExecutors.newScalingThreadPool(min, max, 0 /*keep alive*/); - TransferThreadPoolExecutor pool = TransferThreadPoolExecutor.newScalingExecutor(min, max, 0, TimeUnit.NANOSECONDS, Executors.defaultThreadFactory()); + ThreadPoolExecutor pool = EsExecutors.newScalingExecutorService(min, max, 10, TimeUnit.MILLISECONDS, EsExecutors.daemonThreadFactory("test")); assertThat("Min property", pool.getCorePoolSize(), equalTo(min)); assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max)); @@ -108,34 +106,29 @@ public class ScalingThreadPoolTest { // assertThat("not all tasks completed", pool.getCompletedTaskCount(), equalTo((long) max)); assertThat("wrong active count", pool.getActiveCount(), equalTo(0)); //Assert.assertEquals("wrong pool size. ", min, pool.getPoolSize()); //BUG in ThreadPool - Bug ID: 6458662 - assertThat("idle threads didn't shrink below max. (" + pool.getPoolSize() + ")", pool.getPoolSize(), greaterThan(0)); + //assertThat("idle threads didn't stay above min (" + pool.getPoolSize() + ")", pool.getPoolSize(), greaterThan(0)); assertThat("idle threads didn't shrink below max. (" + pool.getPoolSize() + ")", pool.getPoolSize(), lessThan(max)); pool.shutdown(); } @Test - public void testScaleAbove() throws Exception { + public void testBlocking() throws Exception { final int min = 2; final int max = 4; - final int ntasks = 16; + final long waitTime = 1000; //1 second final ThreadBarrier barrier = new ThreadBarrier(max + 1); -// ThreadPoolExecutor pool = (ThreadPoolExecutor) DynamicExecutors.newScalingThreadPool(min, max, Long.MAX_VALUE); - TransferThreadPoolExecutor pool = TransferThreadPoolExecutor.newScalingExecutor(min, max, Long.MAX_VALUE, TimeUnit.NANOSECONDS, Executors.defaultThreadFactory()); + ThreadPoolExecutor pool = EsExecutors.newBlockingExecutorService(min, max, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test"), 1, waitTime, TimeUnit.MILLISECONDS); assertThat("Min property", pool.getCorePoolSize(), equalTo(min)); assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max)); - final AtomicInteger tasksExecuted = new AtomicInteger(); - for (int i = 0; i < ntasks; ++i) { - final int id = i; + for (int i = 0; i < max; ++i) { pool.execute(new Runnable() { public void run() { - tasksExecuted.incrementAndGet(); try { - if (id < max) { - barrier.await(); - } + barrier.await(); + barrier.await(); } catch (Throwable e) { barrier.reset(e); } @@ -147,14 +140,32 @@ public class ScalingThreadPoolTest { Thread.sleep(100); } - assertThat("wrong number of pooled tasks", pool.getQueueSize(), equalTo(ntasks - max)); barrier.await(); + assertThat("wrong pool size", pool.getPoolSize(), equalTo(max)); + assertThat("wrong active size", pool.getActiveCount(), equalTo(max)); - //wait around for one second - Thread.sleep(1000); - assertThat("tasks not complete", tasksExecuted.get(), equalTo(ntasks)); -// assertThat("didn't scale above core pool size. (" + pool.getLargestPoolSize() + ")", pool.getLargestPoolSize(), greaterThan(min)); -// assertThat("Largest pool size exceeds max. (" + pool.getLargestPoolSize() + ")", pool.getLargestPoolSize(), lessThanOrEqualTo(max)); + //Queue should be empty, lets occupy it's only free space + assertThat("queue isn't empty", pool.getQueue().size(), equalTo(0)); + pool.execute(new Runnable() { + public void run() { + //dummy task + } + }); + assertThat("queue isn't full", pool.getQueue().size(), equalTo(1)); + + //request should block since queue is full + try { + pool.execute(new Runnable() { + public void run() { + //dummy task + } + }); + assertThat("Should have thrown RejectedExecutionException", false, equalTo(true)); + } catch (RejectedExecutionException e) { + //caught expected exception + } + + barrier.await(); pool.shutdown(); } }