start work on a more scalable thread pool, expose its status over jmx

This commit is contained in:
kimchy 2010-05-10 01:13:16 +03:00
parent 7fe5243517
commit 35c8774304
14 changed files with 972 additions and 23 deletions

View File

@ -41,6 +41,7 @@
<w>intf</w> <w>intf</w>
<w>iter</w> <w>iter</w>
<w>iterable</w> <w>iterable</w>
<w>javax</w>
<w>jclouds</w> <w>jclouds</w>
<w>jgroups</w> <w>jgroups</w>
<w>joda</w> <w>joda</w>
@ -85,6 +86,7 @@
<w>streamable</w> <w>streamable</w>
<w>successul</w> <w>successul</w>
<w>tagline</w> <w>tagline</w>
<w>threadpool</w>
<w>throwable</w> <w>throwable</w>
<w>tika</w> <w>tika</w>
<w>timestamp</w> <w>timestamp</w>

View File

@ -28,6 +28,28 @@ import java.util.concurrent.*;
*/ */
public interface ThreadPool { public interface ThreadPool {
/**
* Returns the current number of threads in the pool.
*
* @return the number of threads
*/
int getPoolSize();
/**
* Returns the approximate number of threads that are actively
* executing tasks.
*
* @return the number of threads
*/
int getActiveCount();
int getSchedulerPoolSize();
int getSchedulerActiveCount();
/**
* Returns <tt>true</tt> if the thread pool has started.
*/
boolean isStarted(); boolean isStarted();
/** /**

View File

@ -19,10 +19,13 @@
package org.elasticsearch.threadpool; package org.elasticsearch.threadpool;
import org.elasticsearch.threadpool.cached.CachedThreadPoolModule;
import org.elasticsearch.util.guice.inject.AbstractModule; import org.elasticsearch.util.guice.inject.AbstractModule;
import org.elasticsearch.threadpool.cached.CachedThreadPool; import org.elasticsearch.util.guice.inject.Module;
import org.elasticsearch.util.settings.Settings; import org.elasticsearch.util.settings.Settings;
import static org.elasticsearch.util.guice.ModulesFactory.*;
/** /**
* @author kimchy (Shay Banon) * @author kimchy (Shay Banon)
*/ */
@ -35,7 +38,7 @@ public class ThreadPoolModule extends AbstractModule {
} }
@Override protected void configure() { @Override protected void configure() {
bind(ThreadPool.class) Class<? extends Module> moduleClass = settings.getAsClass("transport.type", CachedThreadPoolModule.class, "org.elasticsearch.threadpool.", "ThreadPoolModule");
.to(settings.getAsClass("threadpool.type", CachedThreadPool.class, "org.elasticsearch.threadpool.", "ThreadPool")).asEagerSingleton(); createModule(moduleClass, settings).configure(binder());
} }
} }

View File

@ -19,15 +19,16 @@
package org.elasticsearch.threadpool.blocking; package org.elasticsearch.threadpool.blocking;
import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.threadpool.support.AbstractThreadPool; import org.elasticsearch.threadpool.support.AbstractThreadPool;
import org.elasticsearch.util.SizeUnit; import org.elasticsearch.util.SizeUnit;
import org.elasticsearch.util.SizeValue; import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.concurrent.DynamicExecutors; import org.elasticsearch.util.concurrent.DynamicExecutors;
import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.util.settings.Settings; import org.elasticsearch.util.settings.Settings;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import static org.elasticsearch.util.TimeValue.*; import static org.elasticsearch.util.TimeValue.*;
import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*; import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*;
@ -39,13 +40,13 @@ import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*;
*/ */
public class BlockingThreadPool extends AbstractThreadPool { public class BlockingThreadPool extends AbstractThreadPool {
private final int min; final int min;
private final int max; final int max;
private final int capacity; final int capacity;
private final TimeValue waitTime; final TimeValue waitTime;
private final TimeValue keepAlive; final TimeValue keepAlive;
private final int scheduledSize; final int scheduledSize;
public BlockingThreadPool() { public BlockingThreadPool() {
this(EMPTY_SETTINGS); this(EMPTY_SETTINGS);
@ -68,4 +69,20 @@ public class BlockingThreadPool extends AbstractThreadPool {
@Override public String getType() { @Override public String getType() {
return "blocking"; return "blocking";
} }
@Override public int getPoolSize() {
return ((ThreadPoolExecutor) executorService).getPoolSize();
}
@Override public int getActiveCount() {
return ((ThreadPoolExecutor) executorService).getActiveCount();
}
@Override public int getSchedulerPoolSize() {
return ((ThreadPoolExecutor) scheduledExecutorService).getPoolSize();
}
@Override public int getSchedulerActiveCount() {
return ((ThreadPoolExecutor) scheduledExecutorService).getActiveCount();
}
} }

View File

@ -0,0 +1,83 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.threadpool.blocking;
import org.elasticsearch.jmx.MBean;
import org.elasticsearch.jmx.ManagedAttribute;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.util.guice.inject.Inject;
/**
* @author kimchy (shay.banon)
*/
@MBean(objectName = "service=threadpool,threadpoolType=blocking", description = "Blocking Thread Pool")
public class BlockingThreadPoolManagement {
private final BlockingThreadPool threadPool;
@Inject public BlockingThreadPoolManagement(ThreadPool threadPool) {
this.threadPool = (BlockingThreadPool) threadPool;
}
@ManagedAttribute(description = "Minimum number Of threads")
public long getMin() {
return threadPool.min;
}
@ManagedAttribute(description = "Maximum number of threads")
public int getMax() {
return threadPool.max;
}
@ManagedAttribute(description = "Number of scheduler threads")
public int getScheduleSize() {
return threadPool.scheduledSize;
}
@ManagedAttribute(description = "Thread keep alive")
public String getKeepAlive() {
return threadPool.keepAlive.format();
}
@ManagedAttribute(description = "Thread keep alive (in seconds)")
public long getKeepAliveInSeconds() {
return threadPool.keepAlive.seconds();
}
@ManagedAttribute(description = "Current number of threads in the pool")
public long getPoolSize() {
return threadPool.getPoolSize();
}
@ManagedAttribute(description = "Approximate number of threads that are actively executing tasks")
public long getActiveCount() {
return threadPool.getActiveCount();
}
@ManagedAttribute(description = "Current number of threads in the scheduler pool")
public long getSchedulerPoolSize() {
return threadPool.getSchedulerPoolSize();
}
@ManagedAttribute(description = "Approximate number of threads that are actively executing scheduled tasks")
public long getSchedulerActiveCount() {
return threadPool.getSchedulerActiveCount();
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.threadpool.blocking;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.util.guice.inject.AbstractModule;
/**
* @author kimchy (shay.banon)
*/
public class BlockingThreadPoolModule extends AbstractModule {
@Override protected void configure() {
bind(ThreadPool.class).to(BlockingThreadPool.class).asEagerSingleton();
bind(BlockingThreadPoolManagement.class).asEagerSingleton();
}
}

View File

@ -19,10 +19,10 @@
package org.elasticsearch.threadpool.cached; package org.elasticsearch.threadpool.cached;
import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.threadpool.support.AbstractThreadPool; import org.elasticsearch.threadpool.support.AbstractThreadPool;
import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.concurrent.DynamicExecutors; import org.elasticsearch.util.concurrent.DynamicExecutors;
import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.util.settings.Settings; import org.elasticsearch.util.settings.Settings;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -40,9 +40,9 @@ import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*;
*/ */
public class CachedThreadPool extends AbstractThreadPool { public class CachedThreadPool extends AbstractThreadPool {
private final TimeValue keepAlive; final TimeValue keepAlive;
private final int scheduledSize; final int scheduledSize;
public CachedThreadPool() { public CachedThreadPool() {
this(EMPTY_SETTINGS); this(EMPTY_SETTINGS);
@ -64,4 +64,20 @@ public class CachedThreadPool extends AbstractThreadPool {
@Override public String getType() { @Override public String getType() {
return "cached"; return "cached";
} }
@Override public int getPoolSize() {
return ((ThreadPoolExecutor) executorService).getPoolSize();
}
@Override public int getActiveCount() {
return ((ThreadPoolExecutor) executorService).getActiveCount();
}
@Override public int getSchedulerPoolSize() {
return ((ThreadPoolExecutor) scheduledExecutorService).getPoolSize();
}
@Override public int getSchedulerActiveCount() {
return ((ThreadPoolExecutor) scheduledExecutorService).getActiveCount();
}
} }

View File

@ -0,0 +1,73 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.threadpool.cached;
import org.elasticsearch.jmx.MBean;
import org.elasticsearch.jmx.ManagedAttribute;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.util.guice.inject.Inject;
/**
* @author kimchy (shay.banon)
*/
@MBean(objectName = "service=threadpool,threadpoolType=cached", description = "Cached Thread Pool")
public class CachedThreadPoolManagement {
private final CachedThreadPool threadPool;
@Inject public CachedThreadPoolManagement(ThreadPool threadPool) {
this.threadPool = (CachedThreadPool) threadPool;
}
@ManagedAttribute(description = "Number of scheduler threads")
public int getScheduleSize() {
return threadPool.scheduledSize;
}
@ManagedAttribute(description = "Thread keep alive")
public String getKeepAlive() {
return threadPool.keepAlive.format();
}
@ManagedAttribute(description = "Thread keep alive (in seconds)")
public long getKeepAliveInSeconds() {
return threadPool.keepAlive.seconds();
}
@ManagedAttribute(description = "Current number of threads in the pool")
public long getPoolSize() {
return threadPool.getPoolSize();
}
@ManagedAttribute(description = "Approximate number of threads that are actively executing tasks")
public long getActiveCount() {
return threadPool.getActiveCount();
}
@ManagedAttribute(description = "Current number of threads in the scheduler pool")
public long getSchedulerPoolSize() {
return threadPool.getSchedulerPoolSize();
}
@ManagedAttribute(description = "Approximate number of threads that are actively executing scheduled tasks")
public long getSchedulerActiveCount() {
return threadPool.getSchedulerActiveCount();
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.threadpool.cached;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.util.guice.inject.AbstractModule;
/**
* @author kimchy (shay.banon)
*/
public class CachedThreadPoolModule extends AbstractModule {
@Override protected void configure() {
bind(ThreadPool.class).to(CachedThreadPool.class).asEagerSingleton();
bind(CachedThreadPoolManagement.class).asEagerSingleton();
}
}

View File

@ -19,13 +19,15 @@
package org.elasticsearch.threadpool.scaling; package org.elasticsearch.threadpool.scaling;
import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.threadpool.support.AbstractThreadPool; import org.elasticsearch.threadpool.support.AbstractThreadPool;
import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.concurrent.DynamicExecutors; import org.elasticsearch.util.concurrent.DynamicExecutors;
import org.elasticsearch.util.concurrent.ScalingThreadPoolExecutor;
import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.util.settings.Settings; import org.elasticsearch.util.settings.Settings;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import static org.elasticsearch.util.TimeValue.*; import static org.elasticsearch.util.TimeValue.*;
import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*; import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*;
@ -35,11 +37,12 @@ import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*;
*/ */
public class ScalingThreadPool extends AbstractThreadPool { public class ScalingThreadPool extends AbstractThreadPool {
private final int min; final int min;
private final int max; final int max;
private final TimeValue keepAlive; final TimeValue keepAlive;
final TimeValue interval;
private final int scheduledSize; final int scheduledSize;
public ScalingThreadPool() { public ScalingThreadPool() {
this(EMPTY_SETTINGS); this(EMPTY_SETTINGS);
@ -47,17 +50,35 @@ public class ScalingThreadPool extends AbstractThreadPool {
@Inject public ScalingThreadPool(Settings settings) { @Inject public ScalingThreadPool(Settings settings) {
super(settings); super(settings);
this.min = componentSettings.getAsInt("min", 1); this.min = componentSettings.getAsInt("min", 10);
this.max = componentSettings.getAsInt("max", 100); this.max = componentSettings.getAsInt("max", 100);
this.keepAlive = componentSettings.getAsTime("keep_alive", timeValueSeconds(60)); this.keepAlive = componentSettings.getAsTime("keep_alive", timeValueSeconds(60));
this.interval = componentSettings.getAsTime("interval", timeValueSeconds(5));
this.scheduledSize = componentSettings.getAsInt("scheduled_size", 20); this.scheduledSize = componentSettings.getAsInt("scheduled_size", 20);
logger.debug("Initializing {} thread pool with min[{}], max[{}], keep_alive[{}], scheduled_size[{}]", new Object[]{getType(), min, max, keepAlive, scheduledSize}); logger.debug("Initializing {} thread pool with min[{}], max[{}], keep_alive[{}], scheduled_size[{}]", getType(), min, max, keepAlive, scheduledSize);
executorService = DynamicExecutors.newScalingThreadPool(min, max, keepAlive.millis(), DynamicExecutors.daemonThreadFactory(settings, "[tp]"));
scheduledExecutorService = Executors.newScheduledThreadPool(scheduledSize, DynamicExecutors.daemonThreadFactory(settings, "[sc]")); scheduledExecutorService = Executors.newScheduledThreadPool(scheduledSize, DynamicExecutors.daemonThreadFactory(settings, "[sc]"));
executorService = new ScalingThreadPoolExecutor(min, max, keepAlive, DynamicExecutors.daemonThreadFactory(settings, "[tp]"), scheduledExecutorService,
interval);
started = true; started = true;
} }
@Override public int getPoolSize() {
return ((ScalingThreadPoolExecutor) executorService).getPoolSize();
}
@Override public int getActiveCount() {
return ((ScalingThreadPoolExecutor) executorService).getActiveCount();
}
@Override public int getSchedulerPoolSize() {
return ((ThreadPoolExecutor) scheduledExecutorService).getPoolSize();
}
@Override public int getSchedulerActiveCount() {
return ((ThreadPoolExecutor) scheduledExecutorService).getActiveCount();
}
@Override public String getType() { @Override public String getType() {
return "dynamic"; return "scaling";
} }
} }

View File

@ -0,0 +1,83 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.threadpool.scaling;
import org.elasticsearch.jmx.MBean;
import org.elasticsearch.jmx.ManagedAttribute;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.util.guice.inject.Inject;
/**
* @author kimchy (shay.banon)
*/
@MBean(objectName = "service=threadpool,threadpoolType=scaling", description = "Scaling Thread Pool")
public class ScalingThreadPoolManagement {
private final ScalingThreadPool threadPool;
@Inject public ScalingThreadPoolManagement(ThreadPool threadPool) {
this.threadPool = (ScalingThreadPool) threadPool;
}
@ManagedAttribute(description = "Minimum number Of threads")
public long getMin() {
return threadPool.min;
}
@ManagedAttribute(description = "Maximum number of threads")
public int getMax() {
return threadPool.max;
}
@ManagedAttribute(description = "Number of scheduler threads")
public int getScheduleSize() {
return threadPool.scheduledSize;
}
@ManagedAttribute(description = "Thread keep alive")
public String getKeepAlive() {
return threadPool.keepAlive.format();
}
@ManagedAttribute(description = "Thread keep alive (in seconds)")
public long getKeepAliveInSeconds() {
return threadPool.keepAlive.seconds();
}
@ManagedAttribute(description = "Current number of threads in the pool")
public long getPoolSize() {
return threadPool.getPoolSize();
}
@ManagedAttribute(description = "Approximate number of threads that are actively executing tasks")
public long getActiveCount() {
return threadPool.getActiveCount();
}
@ManagedAttribute(description = "Current number of threads in the scheduler pool")
public long getSchedulerPoolSize() {
return threadPool.getSchedulerPoolSize();
}
@ManagedAttribute(description = "Approximate number of threads that are actively executing scheduled tasks")
public long getSchedulerActiveCount() {
return threadPool.getSchedulerActiveCount();
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.threadpool.scaling;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.util.guice.inject.AbstractModule;
/**
* @author kimchy (shay.banon)
*/
public class ScalingThreadPoolModule extends AbstractModule {
@Override protected void configure() {
bind(ThreadPool.class).to(ScalingThreadPool.class).asEagerSingleton();
bind(ScalingThreadPoolManagement.class).asEagerSingleton();
}
}

View File

@ -28,7 +28,7 @@ import org.elasticsearch.util.settings.Settings;
import java.util.concurrent.*; import java.util.concurrent.*;
/** /**
* @author kimchy (Shay Banon) * @author kimchy (shay.banon)
*/ */
public abstract class AbstractThreadPool extends AbstractComponent implements ThreadPool { public abstract class AbstractThreadPool extends AbstractComponent implements ThreadPool {

View File

@ -0,0 +1,527 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.util.concurrent;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.concurrent.jsr166y.LinkedTransferQueue;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author kimchy (shay.banon)
*/
public class ScalingThreadPoolExecutor extends AbstractExecutorService {
private final BlockingQueue<Runnable> workQueue = new LinkedTransferQueue<Runnable>();
private final AtomicInteger queueSize = new AtomicInteger();
/**
* Lock held on updates to poolSize, corePoolSize,
* maximumPoolSize, runState, and workers set.
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* Wait condition to support awaitTermination
*/
private final Condition termination = mainLock.newCondition();
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
* Factory for new threads. All threads are created using this
* factory (via method addThread). All callers must be prepared
* for addThread to fail by returning null, which may reflect a
* system or user's policy limiting the number of threads. Even
* though it is not treated as an error, failure to create threads
* may result in new tasks being rejected or existing ones
* remaining stuck in the queue. On the other hand, no special
* precautions exist to handle OutOfMemoryErrors that might be
* thrown while trying to create threads, since there is generally
* no recourse from within this class.
*/
private final ThreadFactory threadFactory;
/**
* runState provides the main lifecyle control, taking on values:
*
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TERMINATED: Same as STOP, plus all threads have terminated
*
* The numerical order among these values matters, to allow
* ordered comparisons. The runState monotonically increases over
* time, but need not hit each state. The transitions are:
*
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TERMINATED
* When both queue and pool are empty
* STOP -> TERMINATED
* When pool is empty
*/
volatile int runState;
static final int RUNNING = 0;
static final int SHUTDOWN = 1;
static final int STOP = 2;
static final int TERMINATED = 3;
/**
* Core pool size, updated only while holding mainLock, but
* volatile to allow concurrent readability even during updates.
*/
private final int corePoolSize;
/**
* Maximum pool size, updated only while holding mainLock but
* volatile to allow concurrent readability even during updates.
*/
private final int maximumPoolSize;
/**
* Timeout in nanoseconds for idle threads waiting for work.
* Threads use this timeout when there are more than corePoolSize
* present or if allowCoreThreadTimeOut. Otherwise they wait
* forever for new work.
*/
private final long keepAliveTime;
/**
* Current pool size, updated only while holding mainLock but
* volatile to allow concurrent readability even during updates.
*/
private volatile int poolSize;
private final ScheduledFuture scheduledFuture;
public ScalingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, TimeValue keepAlive,
ThreadFactory threadFactory,
ScheduledExecutorService scheduler, TimeValue schedulerInterval) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.keepAliveTime = keepAlive.nanos();
this.threadFactory = threadFactory;
for (int i = 0; i < corePoolSize; i++) {
Thread t = addThread();
if (t != null)
t.start();
}
this.scheduledFuture = scheduler.scheduleWithFixedDelay(new Scheduler(), schedulerInterval.nanos(),
schedulerInterval.nanos(), TimeUnit.NANOSECONDS);
}
@Override public void execute(Runnable command) {
queueSize.incrementAndGet();
workQueue.add(command);
}
@Override public void shutdown() {
if (!scheduledFuture.isCancelled()) {
scheduledFuture.cancel(false);
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int state = runState;
if (state < SHUTDOWN)
runState = SHUTDOWN;
try {
for (Worker w : workers) {
w.interruptIfIdle();
}
} catch (SecurityException se) { // Try to back out
runState = state;
// tryTerminate() here would be a no-op
throw se;
}
tryTerminate(); // Terminate now if pool and queue empty
} finally {
mainLock.unlock();
}
}
@Override public List<Runnable> shutdownNow() {
if (!scheduledFuture.isCancelled()) {
scheduledFuture.cancel(false);
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int state = runState;
if (state < STOP)
runState = STOP;
try {
for (Worker w : workers) {
w.interruptNow();
}
} catch (SecurityException se) { // Try to back out
runState = state;
// tryTerminate() here would be a no-op
throw se;
}
List<Runnable> tasks = drainQueue();
tryTerminate(); // Terminate now if pool and queue empty
return tasks;
} finally {
mainLock.unlock();
}
}
@Override public boolean isShutdown() {
return runState != RUNNING;
}
@Override public boolean isTerminated() {
return runState == TERMINATED;
}
@Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (; ;) {
if (runState == TERMINATED)
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
/**
* Returns the current number of threads in the pool.
*
* @return the number of threads
*/
public int getPoolSize() {
return poolSize;
}
/**
* Returns the approximate number of threads that are actively
* executing tasks.
*
* @return the number of threads
*/
public int getActiveCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int n = 0;
for (Worker w : workers) {
if (w.isActive())
++n;
}
return n;
} finally {
mainLock.unlock();
}
}
private final class Scheduler implements Runnable {
@Override public void run() {
if (queueSize.get() > 0 && poolSize < maximumPoolSize) {
final ReentrantLock mainLock = ScalingThreadPoolExecutor.this.mainLock;
mainLock.lock();
try {
int currentQueueSize = queueSize.get();
if (currentQueueSize > 0 && poolSize < maximumPoolSize) {
int incrementBy = currentQueueSize;
if (poolSize + incrementBy > maximumPoolSize) {
incrementBy = maximumPoolSize - poolSize;
}
for (int i = 0; i < incrementBy; i++) {
Thread t = addThread();
if (t != null)
t.start();
}
}
} finally {
mainLock.unlock();
}
}
}
}
private final class Worker implements Runnable {
/**
* The runLock is acquired and released surrounding each task
* execution. It mainly protects against interrupts that are
* intended to cancel the worker thread from instead
* interrupting the task being run.
*/
private final ReentrantLock runLock = new ReentrantLock();
/**
* Thread this worker is running in. Acts as a final field,
* but cannot be set until thread is created.
*/
Thread thread;
Worker() {
}
boolean isActive() {
return runLock.isLocked();
}
/**
* Interrupts thread if not running a task.
*/
void interruptIfIdle() {
final ReentrantLock runLock = this.runLock;
if (runLock.tryLock()) {
try {
if (thread != Thread.currentThread())
thread.interrupt();
} finally {
runLock.unlock();
}
}
}
/**
* Interrupts thread even if running a task.
*/
void interruptNow() {
thread.interrupt();
}
/**
* Runs a single task between before/after methods.
*/
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runLock;
runLock.lock();
try {
/*
* Ensure that unless pool is stopping, this thread
* does not have its interrupt set. This requires a
* double-check of state in case the interrupt was
* cleared concurrently with a shutdownNow -- if so,
* the interrupt is re-enabled.
*/
if (runState < STOP && Thread.interrupted() && runState >= STOP)
thread.interrupt();
task.run();
} finally {
runLock.unlock();
}
}
/**
* Main run loop
*/
public void run() {
try {
Runnable task;
while ((task = getTask()) != null) {
runTask(task);
}
} finally {
workerDone(this);
}
}
}
Runnable getTask() {
for (; ;) {
try {
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take();
if (r != null) {
queueSize.decrementAndGet();
return r;
}
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}
/**
* Check whether a worker thread that fails to get a task can
* exit. We allow a worker thread to die if the pool is stopping,
* or the queue is empty, or there is at least one thread to
* handle possibly non-empty queue, even if core timeouts are
* allowed.
*/
private boolean workerCanExit() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean canExit;
try {
canExit = runState >= STOP || queueSize.get() == 0;
} finally {
mainLock.unlock();
}
return canExit;
}
/**
* Wakes up all threads that might be waiting for tasks so they
* can check for termination. Note: this method is also called by
* ScheduledThreadPoolExecutor.
*/
void interruptIdleWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfIdle();
} finally {
mainLock.unlock();
}
}
/**
* Performs bookkeeping for an exiting worker thread.
*
* @param w the worker
*/
void workerDone(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
workers.remove(w);
if (--poolSize == 0)
tryTerminate();
} finally {
mainLock.unlock();
}
}
/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty), otherwise unless
* stopped, ensuring that there is at least one live thread to
* handle queued tasks.
*
* This method is called from the three places in which
* termination can occur: in workerDone on exit of the last thread
* after pool has been shut down, or directly within calls to
* shutdown or shutdownNow, if there are no live threads.
*/
private void tryTerminate() {
if (poolSize == 0) {
int state = runState;
if (state < STOP && queueSize.get() > 0) {
state = RUNNING; // disable termination check below
Thread t = addThread();
if (t != null)
t.start();
}
if (state == STOP || state == SHUTDOWN) {
runState = TERMINATED;
termination.signalAll();
}
}
}
/**
* Creates and returns a new thread running firstTask as its first
* task. Call only while holding mainLock.
*/
private Thread addThread() {
Worker w = new Worker();
Thread t = threadFactory.newThread(w);
if (t != null) {
w.thread = t;
workers.add(w);
++poolSize;
}
return t;
}
/**
* Drains the task queue into a new list. Used by shutdownNow.
* Call only while holding main lock.
*/
private List<Runnable> drainQueue() {
List<Runnable> taskList = new ArrayList<Runnable>();
workQueue.drainTo(taskList);
queueSize.getAndAdd(taskList.size() * -1);
/*
* If the queue is a DelayQueue or any other kind of queue
* for which poll or drainTo may fail to remove some elements,
* we need to manually traverse and remove remaining tasks.
* To guarantee atomicity wrt other threads using this queue,
* we need to create a new iterator for each element removed.
*/
while (!workQueue.isEmpty()) {
Iterator<Runnable> it = workQueue.iterator();
try {
if (it.hasNext()) {
Runnable r = it.next();
if (workQueue.remove(r)) {
taskList.add(r);
queueSize.decrementAndGet();
}
}
} catch (ConcurrentModificationException ignore) {
}
}
return taskList;
}
}