diff --git a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/stress/SingleThreadBulkStress.java b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/stress/SingleThreadBulkStress.java
index 24496205c26..b48b603ee64 100644
--- a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/stress/SingleThreadBulkStress.java
+++ b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/stress/SingleThreadBulkStress.java
@@ -68,7 +68,7 @@ public class SingleThreadBulkStress {
StopWatch stopWatch = new StopWatch().start();
int COUNT = 200000;
- int BATCH = 1000;
+ int BATCH = 100;
System.out.println("Indexing [" + COUNT + "] ...");
int ITERS = COUNT / BATCH;
int i = 1;
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/util/concurrent/DynamicExecutors.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/util/concurrent/DynamicExecutors.java
new file mode 100644
index 00000000000..5ea7fd7f09a
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/util/concurrent/DynamicExecutors.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.util.concurrent;
+
+import java.util.concurrent.*;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class DynamicExecutors {
+
+ /**
+ * Creates a thread pool that creates new threads as needed, but will reuse
+ * previously constructed threads when they are available. Calls to
+ * execute will reuse previously constructed threads if
+ * available. If no existing thread is available, a new thread will be
+ * created and added to the pool. No more than max threads will
+ * be created. Threads that have not been used for a keepAlive
+ * timeout are terminated and removed from the cache. Thus, a pool that
+ * remains idle for long enough will not consume any resources other than
+ * the min specified.
+ *
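+ * <p>A minimal usage sketch (illustrative sizes):
+ * <pre>
+ * ExecutorService pool = DynamicExecutors.newScalingThreadPool(1, 20, 60000);
+ * pool.execute(new Runnable() {
+ *     public void run() {
+ *         // do work; an idle pooled thread is reused, or a new one is started up to max
+ *     }
+ * });
+ * pool.shutdown();
+ * </pre>
+ *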
+ * @param min the number of threads to keep in the pool, even if they are
+ * idle.
+ * @param max the maximum number of threads to allow in the pool.
+ * @param keepAliveTime when the number of threads is greater than the min,
+ * this is the maximum time that excess idle threads will wait
+ * for new tasks before terminating (in milliseconds).
+ * @return the newly created thread pool
+ */
+ public static ExecutorService newScalingThreadPool(int min, int max, long keepAliveTime) {
+ return newScalingThreadPool(min, max, keepAliveTime, Executors.defaultThreadFactory());
+ }
+
+ /**
+ * Creates a thread pool, same as in
+ * {@link #newScalingThreadPool(int, int, long)}, using the provided
+ * ThreadFactory to create new threads when needed.
+ *
+ * @param min the number of threads to keep in the pool, even if they are
+ * idle.
+ * @param max the maximum number of threads to allow in the pool.
+ * @param keepAliveTime when the number of threads is greater than the min,
+ * this is the maximum time that excess idle threads will wait
+ * for new tasks before terminating (in milliseconds).
+ * @param threadFactory the factory to use when creating new threads.
+ * @return the newly created thread pool
+ */
+ public static ExecutorService newScalingThreadPool(int min, int max, long keepAliveTime, ThreadFactory threadFactory) {
+ DynamicThreadPoolExecutor.DynamicQueue<Runnable> queue = new DynamicThreadPoolExecutor.DynamicQueue<Runnable>();
+ ThreadPoolExecutor executor = new DynamicThreadPoolExecutor(min, max, keepAliveTime, TimeUnit.MILLISECONDS, queue, threadFactory);
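+ // once "max" threads exist and are all busy, DynamicQueue rejects the offer; ForceQueuePolicy then
+ // force-queues the task on the unbounded queue, so execute() effectively never blocks or throws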
+ executor.setRejectedExecutionHandler(new DynamicThreadPoolExecutor.ForceQueuePolicy());
+ queue.setThreadPoolExecutor(executor);
+ return executor;
+ }
+
+ /**
+ * Creates a thread pool similar to that constructed by
+ * {@link #newScalingThreadPool(int, int, long)}, but blocks the call to
+ * execute if the queue has reached its capacity, and all
+ * max threads are busy handling requests.
+ *
+ * If the wait time elapses before space becomes available, a
+ * {@link RejectedExecutionException} is thrown.
+ *
+ * @param min the number of threads to keep in the pool, even if they are
+ * idle.
+ * @param max the maximum number of threads to allow in the pool.
+ * @param keepAliveTime when the number of threads is greater than the min,
+ * this is the maximum time that excess idle threads will wait
+ * for new tasks before terminating (in milliseconds).
+ * @param capacity the fixed capacity of the underlying queue (resembles
+ * backlog).
+ * @param waitTime the wait time (in milliseconds) for space to become
+ * available in the queue.
+ * @return the newly created thread pool
+ */
+ public static ExecutorService newBlockingThreadPool(int min, int max, long keepAliveTime, int capacity, long waitTime) {
+ return newBlockingThreadPool(min, max, keepAliveTime, capacity, waitTime, Executors.defaultThreadFactory());
+ }
+
+ /**
+ * Creates a thread pool, same as in
+ * {@link #newBlockingThreadPool(int, int, long, int, long)}, using the
+ * provided ThreadFactory to create new threads when needed.
+ *
+ * @param min the number of threads to keep in the pool, even if they are
+ * idle.
+ * @param max the maximum number of threads to allow in the pool.
+ * @param keepAliveTime when the number of threads is greater than the min,
+ * this is the maximum time that excess idle threads will wait
+ * for new tasks before terminating (in milliseconds).
+ * @param capacity the fixed capacity of the underlying queue (resembles
+ * backlog).
+ * @param waitTime the wait time (in milliseconds) for space to become
+ * available in the queue.
+ * @param threadFactory the factory to use when creating new threads.
+ * @return the newly created thread pool
+ */
+ public static ExecutorService newBlockingThreadPool(int min, int max,
+ long keepAliveTime, int capacity, long waitTime,
+ ThreadFactory threadFactory) {
+ DynamicThreadPoolExecutor.DynamicQueue<Runnable> queue = new DynamicThreadPoolExecutor.DynamicQueue<Runnable>(capacity);
+ ThreadPoolExecutor executor = new DynamicThreadPoolExecutor(min, max, keepAliveTime, TimeUnit.MILLISECONDS, queue, threadFactory);
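+ // when the bounded queue is full and "max" threads are busy, TimedBlockingPolicy makes the caller
+ // wait up to waitTime for queue space before throwing a RejectedExecutionException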
+ executor.setRejectedExecutionHandler(new DynamicThreadPoolExecutor.TimedBlockingPolicy(waitTime));
+ queue.setThreadPoolExecutor(executor);
+ return executor;
+ }
+}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/util/concurrent/DynamicThreadPoolExecutor.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/util/concurrent/DynamicThreadPoolExecutor.java
new file mode 100644
index 00000000000..b92428b1073
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/util/concurrent/DynamicThreadPoolExecutor.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.util.concurrent;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * An {@link ExecutorService} that executes each submitted task using one of
+ * possibly several pooled threads, normally configured using
+ * {@link DynamicExecutors} factory methods.
+ *
+ * @author kimchy (shay.banon)
+ */
+public class DynamicThreadPoolExecutor extends ThreadPoolExecutor {
+ /**
+ * number of threads that are actively executing tasks
+ */
+ private final AtomicInteger activeCount = new AtomicInteger();
+
+ public DynamicThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
+ long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+ }
+
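+ /**
+ * Returns the number of tasks currently being executed, based on the {@link #activeCount}
+ * counter maintained in {@link #beforeExecute} / {@link #afterExecute}. This avoids the
+ * locking done by the default implementation, since {@link DynamicQueue} consults this
+ * count on every task submission.
+ */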
+ @Override public int getActiveCount() {
+ return activeCount.get();
+ }
+
+ @Override protected void beforeExecute(Thread t, Runnable r) {
+ activeCount.incrementAndGet();
+ }
+
+ @Override protected void afterExecute(Runnable r, Throwable t) {
+ activeCount.decrementAndGet();
+ }
+
+ /**
+ * Much like a {@link SynchronousQueue}, which acts as a rendezvous channel, it
+ * is well suited for hand-off designs in which a task is only queued if there
+ * is an available thread to pick it up.
+ *
+ * This queue is tied to a thread pool, and allows an insertion only if there
+ * is a free thread that can take the task. Otherwise, the task is rejected and
+ * the decision is left to one of the {@link RejectedExecutionHandler} policies:
+ *
+ * - {@link ForceQueuePolicy} - forces the queue to accept the rejected task.
+ * - {@link TimedBlockingPolicy} - waits for a given time for the task to be
+ * executed.
+ *
+ *
+ * @author kimchy (Shay Banon)
+ */
+ public static class DynamicQueue<E> extends LinkedBlockingQueue<E> {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * The executor this Queue belongs to
+ */
+ private transient ThreadPoolExecutor executor;
+
+ /**
+ * Creates a DynamicQueue with a capacity of
+ * {@link Integer#MAX_VALUE}.
+ */
+ public DynamicQueue() {
+ super();
+ }
+
+ /**
+ * Creates a DynamicQueue with the given (fixed) capacity.
+ *
+ * @param capacity the capacity of this queue.
+ */
+ public DynamicQueue(int capacity) {
+ super(capacity);
+ }
+
+ /**
+ * Sets the executor this queue belongs to.
+ */
+ public void setThreadPoolExecutor(ThreadPoolExecutor executor) {
+ this.executor = executor;
+ }
+
+ /**
+ * Inserts the specified element at the tail of this queue if there is at
+ * least one available thread to run the current task. If all pool threads
+ * are actively busy, it rejects the offer.
+ *
+ * @param o the element to add.
+ * @return true if it was possible to add the element to this
+ * queue, else false
+ * @see ThreadPoolExecutor#execute(Runnable)
+ */
+ @Override
+ public boolean offer(E o) {
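+ // hand-off check: queue the task only if an already-started thread is free to take it
+ // (active threads + queued tasks < current pool size). Returning false here makes
+ // ThreadPoolExecutor start a new thread up to "max"; past that, the configured
+ // RejectedExecutionHandler (ForceQueuePolicy or TimedBlockingPolicy) decides.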
+ int allWorkingThreads = executor.getActiveCount() + super.size();
+ return allWorkingThreads < executor.getPoolSize() && super.offer(o);
+ }
+ }
+
+ /**
+ * A handler for rejected tasks that puts the rejected task back on the executor's
+ * queue, waiting if necessary for space to become available.
+ */
+ public static class ForceQueuePolicy implements RejectedExecutionHandler {
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
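+ // re-queue the rejected task; with the unbounded DynamicQueue used by the scaling pool, put() does not block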
+ try {
+ executor.getQueue().put(r);
+ } catch (InterruptedException e) {
+ //should never happen since we never wait
+ throw new RejectedExecutionException(e);
+ }
+ }
+ }
+
+ /**
+ * A handler for rejected tasks that inserts the rejected task into the executor's
+ * queue, waiting if necessary up to the specified wait time for space to become
+ * available.
+ */
+ public static class TimedBlockingPolicy implements RejectedExecutionHandler {
+ private final long waitTime;
+
+ /**
+ * @param waitTime wait time in milliseconds for space to become available.
+ */
+ public TimedBlockingPolicy(long waitTime) {
+ this.waitTime = waitTime;
+ }
+
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+ try {
+ boolean successful = executor.getQueue().offer(r, waitTime, TimeUnit.MILLISECONDS);
+ if (!successful) {
+ throw new RejectedExecutionException("Rejected execution after waiting "
+ + waitTime + " ms for task [" + r.getClass() + "] to be executed.");
+ }
+ } catch (InterruptedException e) {
+ throw new RejectedExecutionException(e);
+ }
+ }
+ }
+}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolModule.java
index 9c7f662789c..33a5dfa8a6c 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolModule.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolModule.java
@@ -25,7 +25,7 @@ import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.Modules;
import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.threadpool.cached.CachedThreadPoolModule;
+import org.elasticsearch.threadpool.scaling.ScalingThreadPoolModule;
/**
* @author kimchy (shay.banon)
@@ -39,7 +39,7 @@ public class ThreadPoolModule extends AbstractModule implements SpawnModules {
}
@Override public Iterable<? extends Module> spawnModules() {
- return ImmutableList.of(Modules.createModule(settings.getAsClass("threadpool.type", CachedThreadPoolModule.class, "org.elasticsearch.threadpool.", "ThreadPoolModule"), settings));
+ return ImmutableList.of(Modules.createModule(settings.getAsClass("threadpool.type", ScalingThreadPoolModule.class, "org.elasticsearch.threadpool.", "ThreadPoolModule"), settings));
}
@Override protected void configure() {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPool.java
index 0f7aa5a8dc7..63a7be5a2ae 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPool.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPool.java
@@ -23,13 +23,12 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.DynamicExecutors;
import org.elasticsearch.common.util.concurrent.EsExecutors;
-import org.elasticsearch.common.util.concurrent.TransferThreadPoolExecutor;
import org.elasticsearch.threadpool.support.AbstractThreadPool;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
import static org.elasticsearch.common.unit.TimeValue.*;
@@ -56,20 +55,16 @@ public class BlockingThreadPool extends AbstractThreadPool {
@Inject public BlockingThreadPool(Settings settings) {
super(settings);
this.scheduledSize = componentSettings.getAsInt("scheduled_size", 20);
- this.min = componentSettings.getAsInt("min", 1);
- int max = componentSettings.getAsInt("max", 100);
- if (max < 10) {
- logger.warn("blocking threadpool max threads [{}] must not be lower than 10, setting it to 10", max);
- max = 10;
- }
- this.max = max;
+ this.min = componentSettings.getAsInt("min", 10);
+ this.max = componentSettings.getAsInt("max", 100);
// capacity is set to 0 as it might cause starvation in blocking mode
this.capacity = (int) componentSettings.getAsSize("capacity", new SizeValue(0)).singles();
this.waitTime = componentSettings.getAsTime("wait_time", timeValueSeconds(60));
this.keepAlive = componentSettings.getAsTime("keep_alive", timeValueSeconds(60));
logger.debug("initializing {} thread pool with min[{}], max[{}], keep_alive[{}], capacity[{}], wait_time[{}], scheduled_size[{}]", getType(), min, max, keepAlive, capacity, waitTime, scheduledSize);
- executorService = TransferThreadPoolExecutor.newBlockingExecutor(min, max, keepAlive.millis(), TimeUnit.MILLISECONDS, waitTime.millis(), TimeUnit.MILLISECONDS, capacity, EsExecutors.daemonThreadFactory(settings, "[tp]"));
+// executorService = TransferThreadPoolExecutor.newBlockingExecutor(min, max, keepAlive.millis(), TimeUnit.MILLISECONDS, waitTime.millis(), TimeUnit.MILLISECONDS, capacity, EsExecutors.daemonThreadFactory(settings, "[tp]"));
+ executorService = DynamicExecutors.newBlockingThreadPool(min, max, keepAlive.millis(), capacity, waitTime.millis(), EsExecutors.daemonThreadFactory(settings, "[tp]"));
scheduledExecutorService = Executors.newScheduledThreadPool(scheduledSize, EsExecutors.daemonThreadFactory(settings, "[sc]"));
cached = EsExecutors.newCachedThreadPool(keepAlive, EsExecutors.daemonThreadFactory(settings, "[cached]"));
started = true;
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPool.java
index d2f0d4de985..8cc59cf0acd 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPool.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPool.java
@@ -22,13 +22,13 @@ package org.elasticsearch.threadpool.scaling;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.DynamicExecutors;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.TransferThreadPoolExecutor;
import org.elasticsearch.threadpool.support.AbstractThreadPool;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
import static org.elasticsearch.common.unit.TimeValue.*;
@@ -56,7 +56,8 @@ public class ScalingThreadPool extends AbstractThreadPool {
this.scheduledSize = componentSettings.getAsInt("scheduled_size", 20);
logger.debug("Initializing {} thread pool with min[{}], max[{}], keep_alive[{}], scheduled_size[{}]", getType(), min, max, keepAlive, scheduledSize);
scheduledExecutorService = Executors.newScheduledThreadPool(scheduledSize, EsExecutors.daemonThreadFactory(settings, "[sc]"));
- executorService = TransferThreadPoolExecutor.newScalingExecutor(min, max, keepAlive.nanos(), TimeUnit.NANOSECONDS, EsExecutors.daemonThreadFactory(settings, "[tp]"));
+// executorService = TransferThreadPoolExecutor.newScalingExecutor(min, max, keepAlive.nanos(), TimeUnit.NANOSECONDS, EsExecutors.daemonThreadFactory(settings, "[tp]"));
+ executorService = DynamicExecutors.newScalingThreadPool(min, max, keepAlive.millis(), EsExecutors.daemonThreadFactory(settings, "[tp]"));
cached = EsExecutors.newCachedThreadPool(keepAlive, EsExecutors.daemonThreadFactory(settings, "[cached]"));
started = true;
}