From 35c8774304d6460dd3b201dd6dc8994c1ee7434d Mon Sep 17 00:00:00 2001 From: kimchy Date: Mon, 10 May 2010 01:13:16 +0300 Subject: [PATCH] start work on a more scalable thread pool, expose its status over jmx --- .idea/dictionaries/kimchy.xml | 2 + .../elasticsearch/threadpool/ThreadPool.java | 22 + .../threadpool/ThreadPoolModule.java | 9 +- .../blocking/BlockingThreadPool.java | 31 +- .../BlockingThreadPoolManagement.java | 83 +++ .../blocking/BlockingThreadPoolModule.java | 34 ++ .../threadpool/cached/CachedThreadPool.java | 22 +- .../cached/CachedThreadPoolManagement.java | 73 +++ .../cached/CachedThreadPoolModule.java | 34 ++ .../threadpool/scaling/ScalingThreadPool.java | 39 +- .../scaling/ScalingThreadPoolManagement.java | 83 +++ .../scaling/ScalingThreadPoolModule.java | 34 ++ .../support/AbstractThreadPool.java | 2 +- .../concurrent/ScalingThreadPoolExecutor.java | 527 ++++++++++++++++++ 14 files changed, 972 insertions(+), 23 deletions(-) create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPoolManagement.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPoolModule.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPoolManagement.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPoolModule.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPoolManagement.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPoolModule.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/util/concurrent/ScalingThreadPoolExecutor.java diff --git a/.idea/dictionaries/kimchy.xml b/.idea/dictionaries/kimchy.xml index 600fb2b0f75..23c73037238 100644 --- a/.idea/dictionaries/kimchy.xml +++ b/.idea/dictionaries/kimchy.xml @@ -41,6 +41,7 @@ intf iter iterable + javax jclouds jgroups joda @@ -85,6 +86,7 @@ streamable successul tagline + threadpool throwable tika timestamp diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index df0cc688d89..43f3f2708e5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -28,6 +28,28 @@ import java.util.concurrent.*; */ public interface ThreadPool { + /** + * Returns the current number of threads in the pool. + * + * @return the number of threads + */ + int getPoolSize(); + + /** + * Returns the approximate number of threads that are actively + * executing tasks. + * + * @return the number of threads + */ + int getActiveCount(); + + int getSchedulerPoolSize(); + + int getSchedulerActiveCount(); + + /** + * Returns true if the thread pool has started. + */ boolean isStarted(); /** diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolModule.java index 23636bf8e16..cbc5e606d77 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolModule.java @@ -19,10 +19,13 @@ package org.elasticsearch.threadpool; +import org.elasticsearch.threadpool.cached.CachedThreadPoolModule; import org.elasticsearch.util.guice.inject.AbstractModule; -import org.elasticsearch.threadpool.cached.CachedThreadPool; +import org.elasticsearch.util.guice.inject.Module; import org.elasticsearch.util.settings.Settings; +import static org.elasticsearch.util.guice.ModulesFactory.*; + /** * @author kimchy (Shay Banon) */ @@ -35,7 +38,7 @@ public class ThreadPoolModule extends AbstractModule { } @Override protected void configure() { - bind(ThreadPool.class) - .to(settings.getAsClass("threadpool.type", CachedThreadPool.class, "org.elasticsearch.threadpool.", "ThreadPool")).asEagerSingleton(); + Class moduleClass = settings.getAsClass("transport.type", CachedThreadPoolModule.class, "org.elasticsearch.threadpool.", "ThreadPoolModule"); + createModule(moduleClass, settings).configure(binder()); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPool.java index 2c467cf42f0..32254d29d0f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPool.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPool.java @@ -19,15 +19,16 @@ package org.elasticsearch.threadpool.blocking; -import org.elasticsearch.util.guice.inject.Inject; import org.elasticsearch.threadpool.support.AbstractThreadPool; import org.elasticsearch.util.SizeUnit; import org.elasticsearch.util.SizeValue; import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.concurrent.DynamicExecutors; +import org.elasticsearch.util.guice.inject.Inject; import org.elasticsearch.util.settings.Settings; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; import static org.elasticsearch.util.TimeValue.*; import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*; @@ -39,13 +40,13 @@ import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*; */ public class BlockingThreadPool extends AbstractThreadPool { - private final int min; - private final int max; - private final int capacity; - private final TimeValue waitTime; - private final TimeValue keepAlive; + final int min; + final int max; + final int capacity; + final TimeValue waitTime; + final TimeValue keepAlive; - private final int scheduledSize; + final int scheduledSize; public BlockingThreadPool() { this(EMPTY_SETTINGS); @@ -68,4 +69,20 @@ public class BlockingThreadPool extends AbstractThreadPool { @Override public String getType() { return "blocking"; } + + @Override public int getPoolSize() { + return ((ThreadPoolExecutor) executorService).getPoolSize(); + } + + @Override public int getActiveCount() { + return ((ThreadPoolExecutor) executorService).getActiveCount(); + } + + @Override public int getSchedulerPoolSize() { + return ((ThreadPoolExecutor) scheduledExecutorService).getPoolSize(); + } + + @Override public int getSchedulerActiveCount() { + return ((ThreadPoolExecutor) scheduledExecutorService).getActiveCount(); + } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPoolManagement.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPoolManagement.java new file mode 100644 index 00000000000..722322d4139 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPoolManagement.java @@ -0,0 +1,83 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.threadpool.blocking; + +import org.elasticsearch.jmx.MBean; +import org.elasticsearch.jmx.ManagedAttribute; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.util.guice.inject.Inject; + +/** + * @author kimchy (shay.banon) + */ +@MBean(objectName = "service=threadpool,threadpoolType=blocking", description = "Blocking Thread Pool") +public class BlockingThreadPoolManagement { + + private final BlockingThreadPool threadPool; + + @Inject public BlockingThreadPoolManagement(ThreadPool threadPool) { + this.threadPool = (BlockingThreadPool) threadPool; + } + + @ManagedAttribute(description = "Minimum number Of threads") + public long getMin() { + return threadPool.min; + } + + @ManagedAttribute(description = "Maximum number of threads") + public int getMax() { + return threadPool.max; + } + + @ManagedAttribute(description = "Number of scheduler threads") + public int getScheduleSize() { + return threadPool.scheduledSize; + } + + @ManagedAttribute(description = "Thread keep alive") + public String getKeepAlive() { + return threadPool.keepAlive.format(); + } + + @ManagedAttribute(description = "Thread keep alive (in seconds)") + public long getKeepAliveInSeconds() { + return threadPool.keepAlive.seconds(); + } + + @ManagedAttribute(description = "Current number of threads in the pool") + public long getPoolSize() { + return threadPool.getPoolSize(); + } + + @ManagedAttribute(description = "Approximate number of threads that are actively executing tasks") + public long getActiveCount() { + return threadPool.getActiveCount(); + } + + @ManagedAttribute(description = "Current number of threads in the scheduler pool") + public long getSchedulerPoolSize() { + return threadPool.getSchedulerPoolSize(); + } + + @ManagedAttribute(description = "Approximate number of threads that are actively executing scheduled tasks") + public long getSchedulerActiveCount() { + return threadPool.getSchedulerActiveCount(); + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPoolModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPoolModule.java new file mode 100644 index 00000000000..d0fce692860 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPoolModule.java @@ -0,0 +1,34 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.threadpool.blocking; + +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.util.guice.inject.AbstractModule; + +/** + * @author kimchy (shay.banon) + */ +public class BlockingThreadPoolModule extends AbstractModule { + + @Override protected void configure() { + bind(ThreadPool.class).to(BlockingThreadPool.class).asEagerSingleton(); + bind(BlockingThreadPoolManagement.class).asEagerSingleton(); + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPool.java index 3ac27e0ce9a..326524d98c1 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPool.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPool.java @@ -19,10 +19,10 @@ package org.elasticsearch.threadpool.cached; -import org.elasticsearch.util.guice.inject.Inject; import org.elasticsearch.threadpool.support.AbstractThreadPool; import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.concurrent.DynamicExecutors; +import org.elasticsearch.util.guice.inject.Inject; import org.elasticsearch.util.settings.Settings; import java.util.concurrent.Executors; @@ -40,9 +40,9 @@ import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*; */ public class CachedThreadPool extends AbstractThreadPool { - private final TimeValue keepAlive; + final TimeValue keepAlive; - private final int scheduledSize; + final int scheduledSize; public CachedThreadPool() { this(EMPTY_SETTINGS); @@ -64,4 +64,20 @@ public class CachedThreadPool extends AbstractThreadPool { @Override public String getType() { return "cached"; } + + @Override public int getPoolSize() { + return ((ThreadPoolExecutor) executorService).getPoolSize(); + } + + @Override public int getActiveCount() { + return ((ThreadPoolExecutor) executorService).getActiveCount(); + } + + @Override public int getSchedulerPoolSize() { + return ((ThreadPoolExecutor) scheduledExecutorService).getPoolSize(); + } + + @Override public int getSchedulerActiveCount() { + return ((ThreadPoolExecutor) scheduledExecutorService).getActiveCount(); + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPoolManagement.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPoolManagement.java new file mode 100644 index 00000000000..36f58f3c585 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPoolManagement.java @@ -0,0 +1,73 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.threadpool.cached; + +import org.elasticsearch.jmx.MBean; +import org.elasticsearch.jmx.ManagedAttribute; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.util.guice.inject.Inject; + +/** + * @author kimchy (shay.banon) + */ +@MBean(objectName = "service=threadpool,threadpoolType=cached", description = "Cached Thread Pool") +public class CachedThreadPoolManagement { + + private final CachedThreadPool threadPool; + + @Inject public CachedThreadPoolManagement(ThreadPool threadPool) { + this.threadPool = (CachedThreadPool) threadPool; + } + + @ManagedAttribute(description = "Number of scheduler threads") + public int getScheduleSize() { + return threadPool.scheduledSize; + } + + @ManagedAttribute(description = "Thread keep alive") + public String getKeepAlive() { + return threadPool.keepAlive.format(); + } + + @ManagedAttribute(description = "Thread keep alive (in seconds)") + public long getKeepAliveInSeconds() { + return threadPool.keepAlive.seconds(); + } + + @ManagedAttribute(description = "Current number of threads in the pool") + public long getPoolSize() { + return threadPool.getPoolSize(); + } + + @ManagedAttribute(description = "Approximate number of threads that are actively executing tasks") + public long getActiveCount() { + return threadPool.getActiveCount(); + } + + @ManagedAttribute(description = "Current number of threads in the scheduler pool") + public long getSchedulerPoolSize() { + return threadPool.getSchedulerPoolSize(); + } + + @ManagedAttribute(description = "Approximate number of threads that are actively executing scheduled tasks") + public long getSchedulerActiveCount() { + return threadPool.getSchedulerActiveCount(); + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPoolModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPoolModule.java new file mode 100644 index 00000000000..666c037c765 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPoolModule.java @@ -0,0 +1,34 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.threadpool.cached; + +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.util.guice.inject.AbstractModule; + +/** + * @author kimchy (shay.banon) + */ +public class CachedThreadPoolModule extends AbstractModule { + + @Override protected void configure() { + bind(ThreadPool.class).to(CachedThreadPool.class).asEagerSingleton(); + bind(CachedThreadPoolManagement.class).asEagerSingleton(); + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPool.java index 55a52e21dde..69533da7fea 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPool.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPool.java @@ -19,13 +19,15 @@ package org.elasticsearch.threadpool.scaling; -import org.elasticsearch.util.guice.inject.Inject; import org.elasticsearch.threadpool.support.AbstractThreadPool; import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.concurrent.DynamicExecutors; +import org.elasticsearch.util.concurrent.ScalingThreadPoolExecutor; +import org.elasticsearch.util.guice.inject.Inject; import org.elasticsearch.util.settings.Settings; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; import static org.elasticsearch.util.TimeValue.*; import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*; @@ -35,11 +37,12 @@ import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*; */ public class ScalingThreadPool extends AbstractThreadPool { - private final int min; - private final int max; - private final TimeValue keepAlive; + final int min; + final int max; + final TimeValue keepAlive; + final TimeValue interval; - private final int scheduledSize; + final int scheduledSize; public ScalingThreadPool() { this(EMPTY_SETTINGS); @@ -47,17 +50,35 @@ public class ScalingThreadPool extends AbstractThreadPool { @Inject public ScalingThreadPool(Settings settings) { super(settings); - this.min = componentSettings.getAsInt("min", 1); + this.min = componentSettings.getAsInt("min", 10); this.max = componentSettings.getAsInt("max", 100); this.keepAlive = componentSettings.getAsTime("keep_alive", timeValueSeconds(60)); + this.interval = componentSettings.getAsTime("interval", timeValueSeconds(5)); this.scheduledSize = componentSettings.getAsInt("scheduled_size", 20); - logger.debug("Initializing {} thread pool with min[{}], max[{}], keep_alive[{}], scheduled_size[{}]", new Object[]{getType(), min, max, keepAlive, scheduledSize}); - executorService = DynamicExecutors.newScalingThreadPool(min, max, keepAlive.millis(), DynamicExecutors.daemonThreadFactory(settings, "[tp]")); + logger.debug("Initializing {} thread pool with min[{}], max[{}], keep_alive[{}], scheduled_size[{}]", getType(), min, max, keepAlive, scheduledSize); scheduledExecutorService = Executors.newScheduledThreadPool(scheduledSize, DynamicExecutors.daemonThreadFactory(settings, "[sc]")); + executorService = new ScalingThreadPoolExecutor(min, max, keepAlive, DynamicExecutors.daemonThreadFactory(settings, "[tp]"), scheduledExecutorService, + interval); started = true; } + @Override public int getPoolSize() { + return ((ScalingThreadPoolExecutor) executorService).getPoolSize(); + } + + @Override public int getActiveCount() { + return ((ScalingThreadPoolExecutor) executorService).getActiveCount(); + } + + @Override public int getSchedulerPoolSize() { + return ((ThreadPoolExecutor) scheduledExecutorService).getPoolSize(); + } + + @Override public int getSchedulerActiveCount() { + return ((ThreadPoolExecutor) scheduledExecutorService).getActiveCount(); + } + @Override public String getType() { - return "dynamic"; + return "scaling"; } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPoolManagement.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPoolManagement.java new file mode 100644 index 00000000000..afe0f0035a5 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPoolManagement.java @@ -0,0 +1,83 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.threadpool.scaling; + +import org.elasticsearch.jmx.MBean; +import org.elasticsearch.jmx.ManagedAttribute; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.util.guice.inject.Inject; + +/** + * @author kimchy (shay.banon) + */ +@MBean(objectName = "service=threadpool,threadpoolType=scaling", description = "Scaling Thread Pool") +public class ScalingThreadPoolManagement { + + private final ScalingThreadPool threadPool; + + @Inject public ScalingThreadPoolManagement(ThreadPool threadPool) { + this.threadPool = (ScalingThreadPool) threadPool; + } + + @ManagedAttribute(description = "Minimum number Of threads") + public long getMin() { + return threadPool.min; + } + + @ManagedAttribute(description = "Maximum number of threads") + public int getMax() { + return threadPool.max; + } + + @ManagedAttribute(description = "Number of scheduler threads") + public int getScheduleSize() { + return threadPool.scheduledSize; + } + + @ManagedAttribute(description = "Thread keep alive") + public String getKeepAlive() { + return threadPool.keepAlive.format(); + } + + @ManagedAttribute(description = "Thread keep alive (in seconds)") + public long getKeepAliveInSeconds() { + return threadPool.keepAlive.seconds(); + } + + @ManagedAttribute(description = "Current number of threads in the pool") + public long getPoolSize() { + return threadPool.getPoolSize(); + } + + @ManagedAttribute(description = "Approximate number of threads that are actively executing tasks") + public long getActiveCount() { + return threadPool.getActiveCount(); + } + + @ManagedAttribute(description = "Current number of threads in the scheduler pool") + public long getSchedulerPoolSize() { + return threadPool.getSchedulerPoolSize(); + } + + @ManagedAttribute(description = "Approximate number of threads that are actively executing scheduled tasks") + public long getSchedulerActiveCount() { + return threadPool.getSchedulerActiveCount(); + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPoolModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPoolModule.java new file mode 100644 index 00000000000..7092b426643 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPoolModule.java @@ -0,0 +1,34 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.threadpool.scaling; + +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.util.guice.inject.AbstractModule; + +/** + * @author kimchy (shay.banon) + */ +public class ScalingThreadPoolModule extends AbstractModule { + + @Override protected void configure() { + bind(ThreadPool.class).to(ScalingThreadPool.class).asEagerSingleton(); + bind(ScalingThreadPoolManagement.class).asEagerSingleton(); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/support/AbstractThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/support/AbstractThreadPool.java index 1dd5d678b23..347bb62279d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/support/AbstractThreadPool.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/support/AbstractThreadPool.java @@ -28,7 +28,7 @@ import org.elasticsearch.util.settings.Settings; import java.util.concurrent.*; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public abstract class AbstractThreadPool extends AbstractComponent implements ThreadPool { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/concurrent/ScalingThreadPoolExecutor.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/concurrent/ScalingThreadPoolExecutor.java new file mode 100644 index 00000000000..bbb0ae4aae6 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/concurrent/ScalingThreadPoolExecutor.java @@ -0,0 +1,527 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.util.concurrent; + +import org.elasticsearch.util.TimeValue; +import org.elasticsearch.util.concurrent.jsr166y.LinkedTransferQueue; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * @author kimchy (shay.banon) + */ +public class ScalingThreadPoolExecutor extends AbstractExecutorService { + + private final BlockingQueue workQueue = new LinkedTransferQueue(); + + private final AtomicInteger queueSize = new AtomicInteger(); + + /** + * Lock held on updates to poolSize, corePoolSize, + * maximumPoolSize, runState, and workers set. + */ + private final ReentrantLock mainLock = new ReentrantLock(); + + /** + * Wait condition to support awaitTermination + */ + private final Condition termination = mainLock.newCondition(); + + /** + * Set containing all worker threads in pool. Accessed only when + * holding mainLock. + */ + private final HashSet workers = new HashSet(); + + + /** + * Factory for new threads. All threads are created using this + * factory (via method addThread). All callers must be prepared + * for addThread to fail by returning null, which may reflect a + * system or user's policy limiting the number of threads. Even + * though it is not treated as an error, failure to create threads + * may result in new tasks being rejected or existing ones + * remaining stuck in the queue. On the other hand, no special + * precautions exist to handle OutOfMemoryErrors that might be + * thrown while trying to create threads, since there is generally + * no recourse from within this class. + */ + private final ThreadFactory threadFactory; + + /** + * runState provides the main lifecyle control, taking on values: + * + * RUNNING: Accept new tasks and process queued tasks + * SHUTDOWN: Don't accept new tasks, but process queued tasks + * STOP: Don't accept new tasks, don't process queued tasks, + * and interrupt in-progress tasks + * TERMINATED: Same as STOP, plus all threads have terminated + * + * The numerical order among these values matters, to allow + * ordered comparisons. The runState monotonically increases over + * time, but need not hit each state. The transitions are: + * + * RUNNING -> SHUTDOWN + * On invocation of shutdown(), perhaps implicitly in finalize() + * (RUNNING or SHUTDOWN) -> STOP + * On invocation of shutdownNow() + * SHUTDOWN -> TERMINATED + * When both queue and pool are empty + * STOP -> TERMINATED + * When pool is empty + */ + volatile int runState; + static final int RUNNING = 0; + static final int SHUTDOWN = 1; + static final int STOP = 2; + static final int TERMINATED = 3; + + + /** + * Core pool size, updated only while holding mainLock, but + * volatile to allow concurrent readability even during updates. + */ + private final int corePoolSize; + + /** + * Maximum pool size, updated only while holding mainLock but + * volatile to allow concurrent readability even during updates. + */ + private final int maximumPoolSize; + + /** + * Timeout in nanoseconds for idle threads waiting for work. + * Threads use this timeout when there are more than corePoolSize + * present or if allowCoreThreadTimeOut. Otherwise they wait + * forever for new work. + */ + private final long keepAliveTime; + + /** + * Current pool size, updated only while holding mainLock but + * volatile to allow concurrent readability even during updates. + */ + private volatile int poolSize; + + + private final ScheduledFuture scheduledFuture; + + public ScalingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, TimeValue keepAlive, + ThreadFactory threadFactory, + ScheduledExecutorService scheduler, TimeValue schedulerInterval) { + this.corePoolSize = corePoolSize; + this.maximumPoolSize = maximumPoolSize; + this.keepAliveTime = keepAlive.nanos(); + this.threadFactory = threadFactory; + + for (int i = 0; i < corePoolSize; i++) { + Thread t = addThread(); + if (t != null) + t.start(); + } + + this.scheduledFuture = scheduler.scheduleWithFixedDelay(new Scheduler(), schedulerInterval.nanos(), + schedulerInterval.nanos(), TimeUnit.NANOSECONDS); + } + + + @Override public void execute(Runnable command) { + queueSize.incrementAndGet(); + workQueue.add(command); + } + + @Override public void shutdown() { + if (!scheduledFuture.isCancelled()) { + scheduledFuture.cancel(false); + } + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + int state = runState; + if (state < SHUTDOWN) + runState = SHUTDOWN; + + try { + for (Worker w : workers) { + w.interruptIfIdle(); + } + } catch (SecurityException se) { // Try to back out + runState = state; + // tryTerminate() here would be a no-op + throw se; + } + + tryTerminate(); // Terminate now if pool and queue empty + } finally { + mainLock.unlock(); + } + } + + @Override public List shutdownNow() { + if (!scheduledFuture.isCancelled()) { + scheduledFuture.cancel(false); + } + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + int state = runState; + if (state < STOP) + runState = STOP; + + try { + for (Worker w : workers) { + w.interruptNow(); + } + } catch (SecurityException se) { // Try to back out + runState = state; + // tryTerminate() here would be a no-op + throw se; + } + + List tasks = drainQueue(); + tryTerminate(); // Terminate now if pool and queue empty + return tasks; + } finally { + mainLock.unlock(); + } + } + + @Override public boolean isShutdown() { + return runState != RUNNING; + } + + @Override public boolean isTerminated() { + return runState == TERMINATED; + } + + @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + long nanos = unit.toNanos(timeout); + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + for (; ;) { + if (runState == TERMINATED) + return true; + if (nanos <= 0) + return false; + nanos = termination.awaitNanos(nanos); + } + } finally { + mainLock.unlock(); + } + } + + /** + * Returns the current number of threads in the pool. + * + * @return the number of threads + */ + public int getPoolSize() { + return poolSize; + } + + /** + * Returns the approximate number of threads that are actively + * executing tasks. + * + * @return the number of threads + */ + public int getActiveCount() { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + int n = 0; + for (Worker w : workers) { + if (w.isActive()) + ++n; + } + return n; + } finally { + mainLock.unlock(); + } + } + + private final class Scheduler implements Runnable { + @Override public void run() { + if (queueSize.get() > 0 && poolSize < maximumPoolSize) { + final ReentrantLock mainLock = ScalingThreadPoolExecutor.this.mainLock; + mainLock.lock(); + try { + int currentQueueSize = queueSize.get(); + if (currentQueueSize > 0 && poolSize < maximumPoolSize) { + int incrementBy = currentQueueSize; + if (poolSize + incrementBy > maximumPoolSize) { + incrementBy = maximumPoolSize - poolSize; + } + for (int i = 0; i < incrementBy; i++) { + Thread t = addThread(); + if (t != null) + t.start(); + } + } + } finally { + mainLock.unlock(); + } + } + } + } + + + private final class Worker implements Runnable { + /** + * The runLock is acquired and released surrounding each task + * execution. It mainly protects against interrupts that are + * intended to cancel the worker thread from instead + * interrupting the task being run. + */ + private final ReentrantLock runLock = new ReentrantLock(); + + /** + * Thread this worker is running in. Acts as a final field, + * but cannot be set until thread is created. + */ + Thread thread; + + Worker() { + } + + boolean isActive() { + return runLock.isLocked(); + } + + /** + * Interrupts thread if not running a task. + */ + void interruptIfIdle() { + final ReentrantLock runLock = this.runLock; + if (runLock.tryLock()) { + try { + if (thread != Thread.currentThread()) + thread.interrupt(); + } finally { + runLock.unlock(); + } + } + } + + /** + * Interrupts thread even if running a task. + */ + void interruptNow() { + thread.interrupt(); + } + + /** + * Runs a single task between before/after methods. + */ + private void runTask(Runnable task) { + final ReentrantLock runLock = this.runLock; + runLock.lock(); + try { + /* + * Ensure that unless pool is stopping, this thread + * does not have its interrupt set. This requires a + * double-check of state in case the interrupt was + * cleared concurrently with a shutdownNow -- if so, + * the interrupt is re-enabled. + */ + if (runState < STOP && Thread.interrupted() && runState >= STOP) + thread.interrupt(); + + task.run(); + } finally { + runLock.unlock(); + } + } + + /** + * Main run loop + */ + public void run() { + try { + Runnable task; + while ((task = getTask()) != null) { + runTask(task); + } + } finally { + workerDone(this); + } + } + } + + + Runnable getTask() { + for (; ;) { + try { + int state = runState; + if (state > SHUTDOWN) + return null; + Runnable r; + if (state == SHUTDOWN) // Help drain queue + r = workQueue.poll(); + else if (poolSize > corePoolSize) + r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); + else + r = workQueue.take(); + if (r != null) { + queueSize.decrementAndGet(); + return r; + } + if (workerCanExit()) { + if (runState >= SHUTDOWN) // Wake up others + interruptIdleWorkers(); + return null; + } + // Else retry + } catch (InterruptedException ie) { + // On interruption, re-check runState + } + } + } + + /** + * Check whether a worker thread that fails to get a task can + * exit. We allow a worker thread to die if the pool is stopping, + * or the queue is empty, or there is at least one thread to + * handle possibly non-empty queue, even if core timeouts are + * allowed. + */ + private boolean workerCanExit() { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + boolean canExit; + try { + canExit = runState >= STOP || queueSize.get() == 0; + } finally { + mainLock.unlock(); + } + return canExit; + } + + /** + * Wakes up all threads that might be waiting for tasks so they + * can check for termination. Note: this method is also called by + * ScheduledThreadPoolExecutor. + */ + void interruptIdleWorkers() { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + for (Worker w : workers) + w.interruptIfIdle(); + } finally { + mainLock.unlock(); + } + } + + /** + * Performs bookkeeping for an exiting worker thread. + * + * @param w the worker + */ + void workerDone(Worker w) { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + workers.remove(w); + if (--poolSize == 0) + tryTerminate(); + } finally { + mainLock.unlock(); + } + } + + /** + * Transitions to TERMINATED state if either (SHUTDOWN and pool + * and queue empty) or (STOP and pool empty), otherwise unless + * stopped, ensuring that there is at least one live thread to + * handle queued tasks. + * + * This method is called from the three places in which + * termination can occur: in workerDone on exit of the last thread + * after pool has been shut down, or directly within calls to + * shutdown or shutdownNow, if there are no live threads. + */ + private void tryTerminate() { + if (poolSize == 0) { + int state = runState; + if (state < STOP && queueSize.get() > 0) { + state = RUNNING; // disable termination check below + Thread t = addThread(); + if (t != null) + t.start(); + } + if (state == STOP || state == SHUTDOWN) { + runState = TERMINATED; + termination.signalAll(); + } + } + } + + /** + * Creates and returns a new thread running firstTask as its first + * task. Call only while holding mainLock. + */ + private Thread addThread() { + Worker w = new Worker(); + Thread t = threadFactory.newThread(w); + if (t != null) { + w.thread = t; + workers.add(w); + ++poolSize; + } + return t; + } + + /** + * Drains the task queue into a new list. Used by shutdownNow. + * Call only while holding main lock. + */ + private List drainQueue() { + List taskList = new ArrayList(); + workQueue.drainTo(taskList); + queueSize.getAndAdd(taskList.size() * -1); + /* + * If the queue is a DelayQueue or any other kind of queue + * for which poll or drainTo may fail to remove some elements, + * we need to manually traverse and remove remaining tasks. + * To guarantee atomicity wrt other threads using this queue, + * we need to create a new iterator for each element removed. + */ + while (!workQueue.isEmpty()) { + Iterator it = workQueue.iterator(); + try { + if (it.hasNext()) { + Runnable r = it.next(); + if (workQueue.remove(r)) { + taskList.add(r); + queueSize.decrementAndGet(); + } + } + } catch (ConcurrentModificationException ignore) { + } + } + return taskList; + } +}