diff --git a/.idea/dictionaries/kimchy.xml b/.idea/dictionaries/kimchy.xml
index 600fb2b0f75..23c73037238 100644
--- a/.idea/dictionaries/kimchy.xml
+++ b/.idea/dictionaries/kimchy.xml
@@ -41,6 +41,7 @@
intf
iter
iterable
+ javax
jclouds
jgroups
joda
@@ -85,6 +86,7 @@
streamable
successul
tagline
+ threadpool
throwable
tika
timestamp
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
index df0cc688d89..43f3f2708e5 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
@@ -28,6 +28,28 @@ import java.util.concurrent.*;
*/
public interface ThreadPool {
+ /**
+ * Returns the current number of threads in the pool.
+ *
+ * @return the number of threads
+ */
+ int getPoolSize();
+
+ /**
+ * Returns the approximate number of threads that are actively
+ * executing tasks.
+ *
+ * @return the number of threads
+ */
+ int getActiveCount();
+
+ int getSchedulerPoolSize();
+
+ int getSchedulerActiveCount();
+
+ /**
+ * Returns true if the thread pool has started.
+ */
boolean isStarted();
/**
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolModule.java
index 23636bf8e16..cbc5e606d77 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolModule.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolModule.java
@@ -19,10 +19,13 @@
package org.elasticsearch.threadpool;
+import org.elasticsearch.threadpool.cached.CachedThreadPoolModule;
import org.elasticsearch.util.guice.inject.AbstractModule;
-import org.elasticsearch.threadpool.cached.CachedThreadPool;
+import org.elasticsearch.util.guice.inject.Module;
import org.elasticsearch.util.settings.Settings;
+import static org.elasticsearch.util.guice.ModulesFactory.*;
+
/**
* @author kimchy (Shay Banon)
*/
@@ -35,7 +38,7 @@ public class ThreadPoolModule extends AbstractModule {
}
@Override protected void configure() {
- bind(ThreadPool.class)
- .to(settings.getAsClass("threadpool.type", CachedThreadPool.class, "org.elasticsearch.threadpool.", "ThreadPool")).asEagerSingleton();
+ Class extends Module> moduleClass = settings.getAsClass("transport.type", CachedThreadPoolModule.class, "org.elasticsearch.threadpool.", "ThreadPoolModule");
+ createModule(moduleClass, settings).configure(binder());
}
}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPool.java
index 2c467cf42f0..32254d29d0f 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPool.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPool.java
@@ -19,15 +19,16 @@
package org.elasticsearch.threadpool.blocking;
-import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.threadpool.support.AbstractThreadPool;
import org.elasticsearch.util.SizeUnit;
import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.concurrent.DynamicExecutors;
+import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.util.settings.Settings;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
import static org.elasticsearch.util.TimeValue.*;
import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*;
@@ -39,13 +40,13 @@ import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*;
*/
public class BlockingThreadPool extends AbstractThreadPool {
- private final int min;
- private final int max;
- private final int capacity;
- private final TimeValue waitTime;
- private final TimeValue keepAlive;
+ final int min;
+ final int max;
+ final int capacity;
+ final TimeValue waitTime;
+ final TimeValue keepAlive;
- private final int scheduledSize;
+ final int scheduledSize;
public BlockingThreadPool() {
this(EMPTY_SETTINGS);
@@ -68,4 +69,20 @@ public class BlockingThreadPool extends AbstractThreadPool {
@Override public String getType() {
return "blocking";
}
+
+ @Override public int getPoolSize() {
+ return ((ThreadPoolExecutor) executorService).getPoolSize();
+ }
+
+ @Override public int getActiveCount() {
+ return ((ThreadPoolExecutor) executorService).getActiveCount();
+ }
+
+ @Override public int getSchedulerPoolSize() {
+ return ((ThreadPoolExecutor) scheduledExecutorService).getPoolSize();
+ }
+
+ @Override public int getSchedulerActiveCount() {
+ return ((ThreadPoolExecutor) scheduledExecutorService).getActiveCount();
+ }
}
\ No newline at end of file
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPoolManagement.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPoolManagement.java
new file mode 100644
index 00000000000..722322d4139
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPoolManagement.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.threadpool.blocking;
+
+import org.elasticsearch.jmx.MBean;
+import org.elasticsearch.jmx.ManagedAttribute;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.util.guice.inject.Inject;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+@MBean(objectName = "service=threadpool,threadpoolType=blocking", description = "Blocking Thread Pool")
+public class BlockingThreadPoolManagement {
+
+ private final BlockingThreadPool threadPool;
+
+ @Inject public BlockingThreadPoolManagement(ThreadPool threadPool) {
+ this.threadPool = (BlockingThreadPool) threadPool;
+ }
+
+ @ManagedAttribute(description = "Minimum number Of threads")
+ public long getMin() {
+ return threadPool.min;
+ }
+
+ @ManagedAttribute(description = "Maximum number of threads")
+ public int getMax() {
+ return threadPool.max;
+ }
+
+ @ManagedAttribute(description = "Number of scheduler threads")
+ public int getScheduleSize() {
+ return threadPool.scheduledSize;
+ }
+
+ @ManagedAttribute(description = "Thread keep alive")
+ public String getKeepAlive() {
+ return threadPool.keepAlive.format();
+ }
+
+ @ManagedAttribute(description = "Thread keep alive (in seconds)")
+ public long getKeepAliveInSeconds() {
+ return threadPool.keepAlive.seconds();
+ }
+
+ @ManagedAttribute(description = "Current number of threads in the pool")
+ public long getPoolSize() {
+ return threadPool.getPoolSize();
+ }
+
+ @ManagedAttribute(description = "Approximate number of threads that are actively executing tasks")
+ public long getActiveCount() {
+ return threadPool.getActiveCount();
+ }
+
+ @ManagedAttribute(description = "Current number of threads in the scheduler pool")
+ public long getSchedulerPoolSize() {
+ return threadPool.getSchedulerPoolSize();
+ }
+
+ @ManagedAttribute(description = "Approximate number of threads that are actively executing scheduled tasks")
+ public long getSchedulerActiveCount() {
+ return threadPool.getSchedulerActiveCount();
+ }
+}
\ No newline at end of file
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPoolModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPoolModule.java
new file mode 100644
index 00000000000..d0fce692860
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPoolModule.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.threadpool.blocking;
+
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.util.guice.inject.AbstractModule;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class BlockingThreadPoolModule extends AbstractModule {
+
+ @Override protected void configure() {
+ bind(ThreadPool.class).to(BlockingThreadPool.class).asEagerSingleton();
+ bind(BlockingThreadPoolManagement.class).asEagerSingleton();
+ }
+}
\ No newline at end of file
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPool.java
index 3ac27e0ce9a..326524d98c1 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPool.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPool.java
@@ -19,10 +19,10 @@
package org.elasticsearch.threadpool.cached;
-import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.threadpool.support.AbstractThreadPool;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.concurrent.DynamicExecutors;
+import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.util.settings.Settings;
import java.util.concurrent.Executors;
@@ -40,9 +40,9 @@ import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*;
*/
public class CachedThreadPool extends AbstractThreadPool {
- private final TimeValue keepAlive;
+ final TimeValue keepAlive;
- private final int scheduledSize;
+ final int scheduledSize;
public CachedThreadPool() {
this(EMPTY_SETTINGS);
@@ -64,4 +64,20 @@ public class CachedThreadPool extends AbstractThreadPool {
@Override public String getType() {
return "cached";
}
+
+ @Override public int getPoolSize() {
+ return ((ThreadPoolExecutor) executorService).getPoolSize();
+ }
+
+ @Override public int getActiveCount() {
+ return ((ThreadPoolExecutor) executorService).getActiveCount();
+ }
+
+ @Override public int getSchedulerPoolSize() {
+ return ((ThreadPoolExecutor) scheduledExecutorService).getPoolSize();
+ }
+
+ @Override public int getSchedulerActiveCount() {
+ return ((ThreadPoolExecutor) scheduledExecutorService).getActiveCount();
+ }
}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPoolManagement.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPoolManagement.java
new file mode 100644
index 00000000000..36f58f3c585
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPoolManagement.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.threadpool.cached;
+
+import org.elasticsearch.jmx.MBean;
+import org.elasticsearch.jmx.ManagedAttribute;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.util.guice.inject.Inject;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+@MBean(objectName = "service=threadpool,threadpoolType=cached", description = "Cached Thread Pool")
+public class CachedThreadPoolManagement {
+
+ private final CachedThreadPool threadPool;
+
+ @Inject public CachedThreadPoolManagement(ThreadPool threadPool) {
+ this.threadPool = (CachedThreadPool) threadPool;
+ }
+
+ @ManagedAttribute(description = "Number of scheduler threads")
+ public int getScheduleSize() {
+ return threadPool.scheduledSize;
+ }
+
+ @ManagedAttribute(description = "Thread keep alive")
+ public String getKeepAlive() {
+ return threadPool.keepAlive.format();
+ }
+
+ @ManagedAttribute(description = "Thread keep alive (in seconds)")
+ public long getKeepAliveInSeconds() {
+ return threadPool.keepAlive.seconds();
+ }
+
+ @ManagedAttribute(description = "Current number of threads in the pool")
+ public long getPoolSize() {
+ return threadPool.getPoolSize();
+ }
+
+ @ManagedAttribute(description = "Approximate number of threads that are actively executing tasks")
+ public long getActiveCount() {
+ return threadPool.getActiveCount();
+ }
+
+ @ManagedAttribute(description = "Current number of threads in the scheduler pool")
+ public long getSchedulerPoolSize() {
+ return threadPool.getSchedulerPoolSize();
+ }
+
+ @ManagedAttribute(description = "Approximate number of threads that are actively executing scheduled tasks")
+ public long getSchedulerActiveCount() {
+ return threadPool.getSchedulerActiveCount();
+ }
+}
\ No newline at end of file
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPoolModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPoolModule.java
new file mode 100644
index 00000000000..666c037c765
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPoolModule.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.threadpool.cached;
+
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.util.guice.inject.AbstractModule;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class CachedThreadPoolModule extends AbstractModule {
+
+ @Override protected void configure() {
+ bind(ThreadPool.class).to(CachedThreadPool.class).asEagerSingleton();
+ bind(CachedThreadPoolManagement.class).asEagerSingleton();
+ }
+}
\ No newline at end of file
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPool.java
index 55a52e21dde..69533da7fea 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPool.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPool.java
@@ -19,13 +19,15 @@
package org.elasticsearch.threadpool.scaling;
-import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.threadpool.support.AbstractThreadPool;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.concurrent.DynamicExecutors;
+import org.elasticsearch.util.concurrent.ScalingThreadPoolExecutor;
+import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.util.settings.Settings;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
import static org.elasticsearch.util.TimeValue.*;
import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*;
@@ -35,11 +37,12 @@ import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*;
*/
public class ScalingThreadPool extends AbstractThreadPool {
- private final int min;
- private final int max;
- private final TimeValue keepAlive;
+ final int min;
+ final int max;
+ final TimeValue keepAlive;
+ final TimeValue interval;
- private final int scheduledSize;
+ final int scheduledSize;
public ScalingThreadPool() {
this(EMPTY_SETTINGS);
@@ -47,17 +50,35 @@ public class ScalingThreadPool extends AbstractThreadPool {
@Inject public ScalingThreadPool(Settings settings) {
super(settings);
- this.min = componentSettings.getAsInt("min", 1);
+ this.min = componentSettings.getAsInt("min", 10);
this.max = componentSettings.getAsInt("max", 100);
this.keepAlive = componentSettings.getAsTime("keep_alive", timeValueSeconds(60));
+ this.interval = componentSettings.getAsTime("interval", timeValueSeconds(5));
this.scheduledSize = componentSettings.getAsInt("scheduled_size", 20);
- logger.debug("Initializing {} thread pool with min[{}], max[{}], keep_alive[{}], scheduled_size[{}]", new Object[]{getType(), min, max, keepAlive, scheduledSize});
- executorService = DynamicExecutors.newScalingThreadPool(min, max, keepAlive.millis(), DynamicExecutors.daemonThreadFactory(settings, "[tp]"));
+ logger.debug("Initializing {} thread pool with min[{}], max[{}], keep_alive[{}], scheduled_size[{}]", getType(), min, max, keepAlive, scheduledSize);
scheduledExecutorService = Executors.newScheduledThreadPool(scheduledSize, DynamicExecutors.daemonThreadFactory(settings, "[sc]"));
+ executorService = new ScalingThreadPoolExecutor(min, max, keepAlive, DynamicExecutors.daemonThreadFactory(settings, "[tp]"), scheduledExecutorService,
+ interval);
started = true;
}
+ @Override public int getPoolSize() {
+ return ((ScalingThreadPoolExecutor) executorService).getPoolSize();
+ }
+
+ @Override public int getActiveCount() {
+ return ((ScalingThreadPoolExecutor) executorService).getActiveCount();
+ }
+
+ @Override public int getSchedulerPoolSize() {
+ return ((ThreadPoolExecutor) scheduledExecutorService).getPoolSize();
+ }
+
+ @Override public int getSchedulerActiveCount() {
+ return ((ThreadPoolExecutor) scheduledExecutorService).getActiveCount();
+ }
+
@Override public String getType() {
- return "dynamic";
+ return "scaling";
}
}
\ No newline at end of file
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPoolManagement.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPoolManagement.java
new file mode 100644
index 00000000000..afe0f0035a5
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPoolManagement.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.threadpool.scaling;
+
+import org.elasticsearch.jmx.MBean;
+import org.elasticsearch.jmx.ManagedAttribute;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.util.guice.inject.Inject;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+@MBean(objectName = "service=threadpool,threadpoolType=scaling", description = "Scaling Thread Pool")
+public class ScalingThreadPoolManagement {
+
+ private final ScalingThreadPool threadPool;
+
+ @Inject public ScalingThreadPoolManagement(ThreadPool threadPool) {
+ this.threadPool = (ScalingThreadPool) threadPool;
+ }
+
+ @ManagedAttribute(description = "Minimum number Of threads")
+ public long getMin() {
+ return threadPool.min;
+ }
+
+ @ManagedAttribute(description = "Maximum number of threads")
+ public int getMax() {
+ return threadPool.max;
+ }
+
+ @ManagedAttribute(description = "Number of scheduler threads")
+ public int getScheduleSize() {
+ return threadPool.scheduledSize;
+ }
+
+ @ManagedAttribute(description = "Thread keep alive")
+ public String getKeepAlive() {
+ return threadPool.keepAlive.format();
+ }
+
+ @ManagedAttribute(description = "Thread keep alive (in seconds)")
+ public long getKeepAliveInSeconds() {
+ return threadPool.keepAlive.seconds();
+ }
+
+ @ManagedAttribute(description = "Current number of threads in the pool")
+ public long getPoolSize() {
+ return threadPool.getPoolSize();
+ }
+
+ @ManagedAttribute(description = "Approximate number of threads that are actively executing tasks")
+ public long getActiveCount() {
+ return threadPool.getActiveCount();
+ }
+
+ @ManagedAttribute(description = "Current number of threads in the scheduler pool")
+ public long getSchedulerPoolSize() {
+ return threadPool.getSchedulerPoolSize();
+ }
+
+ @ManagedAttribute(description = "Approximate number of threads that are actively executing scheduled tasks")
+ public long getSchedulerActiveCount() {
+ return threadPool.getSchedulerActiveCount();
+ }
+}
\ No newline at end of file
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPoolModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPoolModule.java
new file mode 100644
index 00000000000..7092b426643
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPoolModule.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.threadpool.scaling;
+
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.util.guice.inject.AbstractModule;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class ScalingThreadPoolModule extends AbstractModule {
+
+ @Override protected void configure() {
+ bind(ThreadPool.class).to(ScalingThreadPool.class).asEagerSingleton();
+ bind(ScalingThreadPoolManagement.class).asEagerSingleton();
+ }
+}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/support/AbstractThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/support/AbstractThreadPool.java
index 1dd5d678b23..347bb62279d 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/support/AbstractThreadPool.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/support/AbstractThreadPool.java
@@ -28,7 +28,7 @@ import org.elasticsearch.util.settings.Settings;
import java.util.concurrent.*;
/**
- * @author kimchy (Shay Banon)
+ * @author kimchy (shay.banon)
*/
public abstract class AbstractThreadPool extends AbstractComponent implements ThreadPool {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/concurrent/ScalingThreadPoolExecutor.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/concurrent/ScalingThreadPoolExecutor.java
new file mode 100644
index 00000000000..bbb0ae4aae6
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/concurrent/ScalingThreadPoolExecutor.java
@@ -0,0 +1,527 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.util.concurrent;
+
+import org.elasticsearch.util.TimeValue;
+import org.elasticsearch.util.concurrent.jsr166y.LinkedTransferQueue;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class ScalingThreadPoolExecutor extends AbstractExecutorService {
+
+ private final BlockingQueue workQueue = new LinkedTransferQueue();
+
+ private final AtomicInteger queueSize = new AtomicInteger();
+
+ /**
+ * Lock held on updates to poolSize, corePoolSize,
+ * maximumPoolSize, runState, and workers set.
+ */
+ private final ReentrantLock mainLock = new ReentrantLock();
+
+ /**
+ * Wait condition to support awaitTermination
+ */
+ private final Condition termination = mainLock.newCondition();
+
+ /**
+ * Set containing all worker threads in pool. Accessed only when
+ * holding mainLock.
+ */
+ private final HashSet workers = new HashSet();
+
+
+ /**
+ * Factory for new threads. All threads are created using this
+ * factory (via method addThread). All callers must be prepared
+ * for addThread to fail by returning null, which may reflect a
+ * system or user's policy limiting the number of threads. Even
+ * though it is not treated as an error, failure to create threads
+ * may result in new tasks being rejected or existing ones
+ * remaining stuck in the queue. On the other hand, no special
+ * precautions exist to handle OutOfMemoryErrors that might be
+ * thrown while trying to create threads, since there is generally
+ * no recourse from within this class.
+ */
+ private final ThreadFactory threadFactory;
+
+ /**
+ * runState provides the main lifecyle control, taking on values:
+ *
+ * RUNNING: Accept new tasks and process queued tasks
+ * SHUTDOWN: Don't accept new tasks, but process queued tasks
+ * STOP: Don't accept new tasks, don't process queued tasks,
+ * and interrupt in-progress tasks
+ * TERMINATED: Same as STOP, plus all threads have terminated
+ *
+ * The numerical order among these values matters, to allow
+ * ordered comparisons. The runState monotonically increases over
+ * time, but need not hit each state. The transitions are:
+ *
+ * RUNNING -> SHUTDOWN
+ * On invocation of shutdown(), perhaps implicitly in finalize()
+ * (RUNNING or SHUTDOWN) -> STOP
+ * On invocation of shutdownNow()
+ * SHUTDOWN -> TERMINATED
+ * When both queue and pool are empty
+ * STOP -> TERMINATED
+ * When pool is empty
+ */
+ volatile int runState;
+ static final int RUNNING = 0;
+ static final int SHUTDOWN = 1;
+ static final int STOP = 2;
+ static final int TERMINATED = 3;
+
+
+ /**
+ * Core pool size, updated only while holding mainLock, but
+ * volatile to allow concurrent readability even during updates.
+ */
+ private final int corePoolSize;
+
+ /**
+ * Maximum pool size, updated only while holding mainLock but
+ * volatile to allow concurrent readability even during updates.
+ */
+ private final int maximumPoolSize;
+
+ /**
+ * Timeout in nanoseconds for idle threads waiting for work.
+ * Threads use this timeout when there are more than corePoolSize
+ * present or if allowCoreThreadTimeOut. Otherwise they wait
+ * forever for new work.
+ */
+ private final long keepAliveTime;
+
+ /**
+ * Current pool size, updated only while holding mainLock but
+ * volatile to allow concurrent readability even during updates.
+ */
+ private volatile int poolSize;
+
+
+ private final ScheduledFuture scheduledFuture;
+
+ public ScalingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, TimeValue keepAlive,
+ ThreadFactory threadFactory,
+ ScheduledExecutorService scheduler, TimeValue schedulerInterval) {
+ this.corePoolSize = corePoolSize;
+ this.maximumPoolSize = maximumPoolSize;
+ this.keepAliveTime = keepAlive.nanos();
+ this.threadFactory = threadFactory;
+
+ for (int i = 0; i < corePoolSize; i++) {
+ Thread t = addThread();
+ if (t != null)
+ t.start();
+ }
+
+ this.scheduledFuture = scheduler.scheduleWithFixedDelay(new Scheduler(), schedulerInterval.nanos(),
+ schedulerInterval.nanos(), TimeUnit.NANOSECONDS);
+ }
+
+
+ @Override public void execute(Runnable command) {
+ queueSize.incrementAndGet();
+ workQueue.add(command);
+ }
+
+ @Override public void shutdown() {
+ if (!scheduledFuture.isCancelled()) {
+ scheduledFuture.cancel(false);
+ }
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ int state = runState;
+ if (state < SHUTDOWN)
+ runState = SHUTDOWN;
+
+ try {
+ for (Worker w : workers) {
+ w.interruptIfIdle();
+ }
+ } catch (SecurityException se) { // Try to back out
+ runState = state;
+ // tryTerminate() here would be a no-op
+ throw se;
+ }
+
+ tryTerminate(); // Terminate now if pool and queue empty
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ @Override public List shutdownNow() {
+ if (!scheduledFuture.isCancelled()) {
+ scheduledFuture.cancel(false);
+ }
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ int state = runState;
+ if (state < STOP)
+ runState = STOP;
+
+ try {
+ for (Worker w : workers) {
+ w.interruptNow();
+ }
+ } catch (SecurityException se) { // Try to back out
+ runState = state;
+ // tryTerminate() here would be a no-op
+ throw se;
+ }
+
+ List tasks = drainQueue();
+ tryTerminate(); // Terminate now if pool and queue empty
+ return tasks;
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ @Override public boolean isShutdown() {
+ return runState != RUNNING;
+ }
+
+ @Override public boolean isTerminated() {
+ return runState == TERMINATED;
+ }
+
+ @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ long nanos = unit.toNanos(timeout);
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ for (; ;) {
+ if (runState == TERMINATED)
+ return true;
+ if (nanos <= 0)
+ return false;
+ nanos = termination.awaitNanos(nanos);
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the current number of threads in the pool.
+ *
+ * @return the number of threads
+ */
+ public int getPoolSize() {
+ return poolSize;
+ }
+
+ /**
+ * Returns the approximate number of threads that are actively
+ * executing tasks.
+ *
+ * @return the number of threads
+ */
+ public int getActiveCount() {
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ int n = 0;
+ for (Worker w : workers) {
+ if (w.isActive())
+ ++n;
+ }
+ return n;
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ private final class Scheduler implements Runnable {
+ @Override public void run() {
+ if (queueSize.get() > 0 && poolSize < maximumPoolSize) {
+ final ReentrantLock mainLock = ScalingThreadPoolExecutor.this.mainLock;
+ mainLock.lock();
+ try {
+ int currentQueueSize = queueSize.get();
+ if (currentQueueSize > 0 && poolSize < maximumPoolSize) {
+ int incrementBy = currentQueueSize;
+ if (poolSize + incrementBy > maximumPoolSize) {
+ incrementBy = maximumPoolSize - poolSize;
+ }
+ for (int i = 0; i < incrementBy; i++) {
+ Thread t = addThread();
+ if (t != null)
+ t.start();
+ }
+ }
+ } finally {
+ mainLock.unlock();
+ }
+ }
+ }
+ }
+
+
+ private final class Worker implements Runnable {
+ /**
+ * The runLock is acquired and released surrounding each task
+ * execution. It mainly protects against interrupts that are
+ * intended to cancel the worker thread from instead
+ * interrupting the task being run.
+ */
+ private final ReentrantLock runLock = new ReentrantLock();
+
+ /**
+ * Thread this worker is running in. Acts as a final field,
+ * but cannot be set until thread is created.
+ */
+ Thread thread;
+
+ Worker() {
+ }
+
+ boolean isActive() {
+ return runLock.isLocked();
+ }
+
+ /**
+ * Interrupts thread if not running a task.
+ */
+ void interruptIfIdle() {
+ final ReentrantLock runLock = this.runLock;
+ if (runLock.tryLock()) {
+ try {
+ if (thread != Thread.currentThread())
+ thread.interrupt();
+ } finally {
+ runLock.unlock();
+ }
+ }
+ }
+
+ /**
+ * Interrupts thread even if running a task.
+ */
+ void interruptNow() {
+ thread.interrupt();
+ }
+
+ /**
+ * Runs a single task between before/after methods.
+ */
+ private void runTask(Runnable task) {
+ final ReentrantLock runLock = this.runLock;
+ runLock.lock();
+ try {
+ /*
+ * Ensure that unless pool is stopping, this thread
+ * does not have its interrupt set. This requires a
+ * double-check of state in case the interrupt was
+ * cleared concurrently with a shutdownNow -- if so,
+ * the interrupt is re-enabled.
+ */
+ if (runState < STOP && Thread.interrupted() && runState >= STOP)
+ thread.interrupt();
+
+ task.run();
+ } finally {
+ runLock.unlock();
+ }
+ }
+
+ /**
+ * Main run loop
+ */
+ public void run() {
+ try {
+ Runnable task;
+ while ((task = getTask()) != null) {
+ runTask(task);
+ }
+ } finally {
+ workerDone(this);
+ }
+ }
+ }
+
+
+ Runnable getTask() {
+ for (; ;) {
+ try {
+ int state = runState;
+ if (state > SHUTDOWN)
+ return null;
+ Runnable r;
+ if (state == SHUTDOWN) // Help drain queue
+ r = workQueue.poll();
+ else if (poolSize > corePoolSize)
+ r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
+ else
+ r = workQueue.take();
+ if (r != null) {
+ queueSize.decrementAndGet();
+ return r;
+ }
+ if (workerCanExit()) {
+ if (runState >= SHUTDOWN) // Wake up others
+ interruptIdleWorkers();
+ return null;
+ }
+ // Else retry
+ } catch (InterruptedException ie) {
+ // On interruption, re-check runState
+ }
+ }
+ }
+
+ /**
+ * Check whether a worker thread that fails to get a task can
+ * exit. We allow a worker thread to die if the pool is stopping,
+ * or the queue is empty, or there is at least one thread to
+ * handle possibly non-empty queue, even if core timeouts are
+ * allowed.
+ */
+ private boolean workerCanExit() {
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ boolean canExit;
+ try {
+ canExit = runState >= STOP || queueSize.get() == 0;
+ } finally {
+ mainLock.unlock();
+ }
+ return canExit;
+ }
+
+ /**
+ * Wakes up all threads that might be waiting for tasks so they
+ * can check for termination. Note: this method is also called by
+ * ScheduledThreadPoolExecutor.
+ */
+ void interruptIdleWorkers() {
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ for (Worker w : workers)
+ w.interruptIfIdle();
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Performs bookkeeping for an exiting worker thread.
+ *
+ * @param w the worker
+ */
+ void workerDone(Worker w) {
+ final ReentrantLock mainLock = this.mainLock;
+ mainLock.lock();
+ try {
+ workers.remove(w);
+ if (--poolSize == 0)
+ tryTerminate();
+ } finally {
+ mainLock.unlock();
+ }
+ }
+
+ /**
+ * Transitions to TERMINATED state if either (SHUTDOWN and pool
+ * and queue empty) or (STOP and pool empty), otherwise unless
+ * stopped, ensuring that there is at least one live thread to
+ * handle queued tasks.
+ *
+ * This method is called from the three places in which
+ * termination can occur: in workerDone on exit of the last thread
+ * after pool has been shut down, or directly within calls to
+ * shutdown or shutdownNow, if there are no live threads.
+ */
+ private void tryTerminate() {
+ if (poolSize == 0) {
+ int state = runState;
+ if (state < STOP && queueSize.get() > 0) {
+ state = RUNNING; // disable termination check below
+ Thread t = addThread();
+ if (t != null)
+ t.start();
+ }
+ if (state == STOP || state == SHUTDOWN) {
+ runState = TERMINATED;
+ termination.signalAll();
+ }
+ }
+ }
+
+ /**
+ * Creates and returns a new thread running firstTask as its first
+ * task. Call only while holding mainLock.
+ */
+ private Thread addThread() {
+ Worker w = new Worker();
+ Thread t = threadFactory.newThread(w);
+ if (t != null) {
+ w.thread = t;
+ workers.add(w);
+ ++poolSize;
+ }
+ return t;
+ }
+
+ /**
+ * Drains the task queue into a new list. Used by shutdownNow.
+ * Call only while holding main lock.
+ */
+ private List drainQueue() {
+ List taskList = new ArrayList();
+ workQueue.drainTo(taskList);
+ queueSize.getAndAdd(taskList.size() * -1);
+ /*
+ * If the queue is a DelayQueue or any other kind of queue
+ * for which poll or drainTo may fail to remove some elements,
+ * we need to manually traverse and remove remaining tasks.
+ * To guarantee atomicity wrt other threads using this queue,
+ * we need to create a new iterator for each element removed.
+ */
+ while (!workQueue.isEmpty()) {
+ Iterator it = workQueue.iterator();
+ try {
+ if (it.hasNext()) {
+ Runnable r = it.next();
+ if (workQueue.remove(r)) {
+ taskList.add(r);
+ queueSize.decrementAndGet();
+ }
+ }
+ } catch (ConcurrentModificationException ignore) {
+ }
+ }
+ return taskList;
+ }
+}