Thread Pool: Remove blocking type option

The blocking thread pool type is not recommended to be used, since it will end up blocking the IO thread most times when executing, which is not recommended (other operations will then stall as well).
closes #3531
This commit is contained in:
Shay Banon 2013-08-18 22:28:26 +02:00
parent 766c787737
commit 616b09e9b4
12 changed files with 60 additions and 401 deletions

View File

@ -120,7 +120,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
protected void doStart() throws ElasticSearchException {
add(localNodeMasterListeners);
this.clusterState = newClusterStateBuilder().blocks(initialBlocks).build();
this.updateTasksExecutor = EsExecutors.newSinglePrioritizingThreadExecutor(daemonThreadFactory(settings, "clusterService#updateTask"));
this.updateTasksExecutor = EsExecutors.newSinglePrioritizing(daemonThreadFactory(settings, "clusterService#updateTask"));
this.reconnectToNodes = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, new ReconnectToNodes());
}

View File

@ -20,7 +20,6 @@
package org.elasticsearch.common.util.concurrent;
import jsr166y.LinkedTransferQueue;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Settings;
import java.util.concurrent.*;
@ -31,28 +30,28 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class EsExecutors {
public static PrioritizedEsThreadPoolExecutor newSinglePrioritizingThreadExecutor(ThreadFactory threadFactory) {
public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(ThreadFactory threadFactory) {
return new PrioritizedEsThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory);
}
public static EsThreadPoolExecutor newScalingExecutorService(int min, int max, long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory) {
public static EsThreadPoolExecutor newScaling(int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<Runnable>();
// we force the execution, since we might run into concurrency issues in offer for ScalingBlockingQueue
EsThreadPoolExecutor executor = new EsThreadPoolExecutor(min, max, keepAliveTime, unit, queue, threadFactory,
new ForceQueuePolicy());
EsThreadPoolExecutor executor = new EsThreadPoolExecutor(min, max, keepAliveTime, unit, queue, threadFactory, new ForceQueuePolicy());
queue.executor = executor;
return executor;
}
public static EsThreadPoolExecutor newBlockingExecutorService(int min, int max, long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory, int capacity,
long waitTime, TimeUnit waitTimeUnit) {
ExecutorBlockingQueue<Runnable> queue = new ExecutorBlockingQueue<Runnable>(capacity);
EsThreadPoolExecutor executor = new EsThreadPoolExecutor(min, max, keepAliveTime, unit, queue, threadFactory,
new TimedBlockingPolicy(waitTimeUnit.toMillis(waitTime)));
queue.executor = executor;
return executor;
public static EsThreadPoolExecutor newCached(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
return new EsThreadPoolExecutor(0, Integer.MAX_VALUE, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory);
}
public static EsThreadPoolExecutor newFixed(int size, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
return new EsThreadPoolExecutor(size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy());
}
public static EsThreadPoolExecutor newFixed(int size, BlockingQueue<Runnable> queue, ThreadFactory threadFactory, XRejectedExecutionHandler rejectedExecutionHandler) {
return new EsThreadPoolExecutor(size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, rejectedExecutionHandler);
}
public static String threadName(Settings settings, String namePrefix) {
@ -110,8 +109,8 @@ public class EsExecutors {
@Override
public boolean offer(E e) {
int left = executor.getMaximumPoolSize() - executor.getCorePoolSize();
if (!tryTransfer(e)) {
int left = executor.getMaximumPoolSize() - executor.getCorePoolSize();
if (left > 0) {
return false;
} else {
@ -123,27 +122,11 @@ public class EsExecutors {
}
}
static class ExecutorBlockingQueue<E> extends ArrayBlockingQueue<E> {
ThreadPoolExecutor executor;
ExecutorBlockingQueue(int capacity) {
super(capacity);
}
@Override
public boolean offer(E o) {
int allWorkingThreads = executor.getActiveCount() + super.size();
return allWorkingThreads < executor.getPoolSize() && super.offer(o);
}
}
/**
* A handler for rejected tasks that adds the specified element to this queue,
* waiting if necessary for space to become available.
*/
static class ForceQueuePolicy implements RejectedExecutionHandler {
static class ForceQueuePolicy implements XRejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().put(r);
@ -152,40 +135,10 @@ public class EsExecutors {
throw new EsRejectedExecutionException(e);
}
}
}
/**
* A handler for rejected tasks that inserts the specified element into this
* queue, waiting if necessary up to the specified wait time for space to become
* available.
*/
static class TimedBlockingPolicy implements XRejectedExecutionHandler {
private final CounterMetric rejected = new CounterMetric();
private final long waitTime;
/**
* @param waitTime wait time in milliseconds for space to become available.
*/
public TimedBlockingPolicy(long waitTime) {
this.waitTime = waitTime;
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
boolean successful = executor.getQueue().offer(r, waitTime, TimeUnit.MILLISECONDS);
if (!successful) {
rejected.inc();
throw new EsRejectedExecutionException();
}
} catch (InterruptedException e) {
throw new EsRejectedExecutionException(e);
}
}
@Override
public long rejected() {
return rejected.count();
return 0;
}
}
}

View File

@ -21,7 +21,10 @@ package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.ElasticSearchIllegalStateException;
import java.util.concurrent.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* An extension to thread pool executor, allowing (in the future) to add specific additional stats to it.
@ -32,11 +35,11 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
private final Object monitor = new Object();
public EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy());
EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy());
}
public EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}

View File

@ -36,18 +36,10 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
private AtomicLong insertionOrder = new AtomicLong();
public PrioritizedEsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
PrioritizedEsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<Runnable>(), threadFactory);
}
public PrioritizedEsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<Runnable>(), threadFactory, handler);
}
public PrioritizedEsThreadPoolExecutor(int corePoolSize, int initialWorkQueuSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<Runnable>(initialWorkQueuSize), threadFactory, handler);
}
public Pending[] getPending() {
Object[] objects = getQueue().toArray();
Pending[] infos = new Pending[objects.length];

View File

@ -223,7 +223,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
public Executor executor() {
if (executor == null) {
ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]");
executor = EsExecutors.newScalingExecutorService(0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory);
executor = EsExecutors.newScaling(0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory);
}
return executor;
}

View File

@ -59,7 +59,7 @@ public class FsGateway extends BlobStoreGateway {
}
int concurrentStreams = componentSettings.getAsInt("concurrent_streams", 5);
this.concurrentStreamPool = EsExecutors.newScalingExecutorService(1, concurrentStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[fs_stream]"));
this.concurrentStreamPool = EsExecutors.newScaling(1, concurrentStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[fs_stream]"));
initialize(new FsBlobStore(componentSettings, concurrentStreamPool, gatewayFile), clusterName, null);
}

View File

@ -72,7 +72,7 @@ public class RecoverySettings extends AbstractComponent {
this.compress = componentSettings.getAsBoolean("compress", true);
this.concurrentStreams = componentSettings.getAsInt("concurrent_streams", settings.getAsInt("index.shard.recovery.concurrent_streams", 3));
this.concurrentStreamPool = EsExecutors.newScalingExecutorService(0, concurrentStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[recovery_stream]"));
this.concurrentStreamPool = EsExecutors.newScaling(0, concurrentStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[recovery_stream]"));
this.maxBytesPerSec = componentSettings.getAsBytesSize("max_bytes_per_sec", componentSettings.getAsBytesSize("max_size_per_sec", new ByteSizeValue(20, ByteSizeUnit.MB)));
if (maxBytesPerSec.bytes() <= 0) {

View File

@ -35,7 +35,10 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.*;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
@ -51,7 +54,6 @@ import java.util.concurrent.*;
import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
/**
*
@ -291,10 +293,7 @@ public class ThreadPool extends AbstractComponent {
} else {
logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive);
}
Executor executor = new EsThreadPoolExecutor(0, Integer.MAX_VALUE,
keepAlive.millis(), TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
Executor executor = EsExecutors.newCached(keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory);
return new ExecutorHolder(executor, new Info(name, type, -1, -1, keepAlive, null));
} else if ("fixed".equals(type)) {
int defaultSize = defaultSettings.getAsInt("size", boundedNumberOfProcessors() * 5);
@ -308,17 +307,11 @@ public class ThreadPool extends AbstractComponent {
String updatedQueueType = settings.get("queue_type", previousInfo.getQueueType());
if (Objects.equal(previousInfo.getQueueSize(), updatedQueueSize) && previousInfo.getQueueType().equals(updatedQueueType)) {
int updatedSize = settings.getAsInt("size", previousInfo.getMax());
String updatedRejectSetting = settings.get("reject_policy", previousInfo.getRejectSetting());
if (previousInfo.getMax() != updatedSize) {
logger.debug("updating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}], queue_type [{}]", name, type, updatedSize, updatedQueueSize, updatedRejectSetting, updatedQueueType);
logger.debug("updating thread_pool [{}], type [{}], size [{}], queue_size [{}], queue_type [{}]", name, type, updatedSize, updatedQueueSize, updatedQueueType);
((EsThreadPoolExecutor) previousExecutorHolder.executor).setCorePoolSize(updatedSize);
((EsThreadPoolExecutor) previousExecutorHolder.executor).setMaximumPoolSize(updatedSize);
return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedSize, updatedSize, null, updatedQueueSize, null, updatedRejectSetting, updatedQueueType));
}
if (!previousInfo.getRejectSetting().equals(updatedRejectSetting)) {
logger.debug("updating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}], queue_type [{}]", name, type, updatedSize, updatedQueueSize, updatedRejectSetting, updatedQueueType);
((EsThreadPoolExecutor) previousExecutorHolder.executor).setRejectedExecutionHandler(newRejectedExecutionHandler(name, updatedRejectSetting));
return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedSize, updatedSize, null, updatedQueueSize, null, updatedRejectSetting, updatedQueueType));
return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedSize, updatedSize, null, updatedQueueSize, updatedQueueType));
}
return previousExecutorHolder;
}
@ -327,9 +320,6 @@ public class ThreadPool extends AbstractComponent {
defaultSize = previousInfo.getMax();
}
defaultQueueSize = previousInfo.getQueueSize();
if (previousInfo.rejectSetting != null) {
defaultRejectSetting = previousInfo.rejectSetting;
}
if (previousInfo.getQueueType() != null) {
defaultQueueType = previousInfo.getQueueType();
}
@ -337,16 +327,11 @@ public class ThreadPool extends AbstractComponent {
int size = settings.getAsInt("size", defaultSize);
SizeValue queueSize = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", defaultQueueSize)));
String rejectSetting = settings.get("reject_policy", defaultRejectSetting);
RejectedExecutionHandler rejectedExecutionHandler = newRejectedExecutionHandler(name, rejectSetting);
String queueType = settings.get("queue_type", defaultQueueType);
BlockingQueue<Runnable> workQueue = newQueue(queueSize, queueType);
logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}], queue_type [{}]", name, type, size, queueSize, rejectSetting, queueType);
Executor executor = new EsThreadPoolExecutor(size, size,
0L, TimeUnit.MILLISECONDS,
workQueue,
threadFactory, rejectedExecutionHandler);
return new ExecutorHolder(executor, new Info(name, type, size, size, null, queueSize, null, rejectSetting, queueType));
logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}], queue_type [{}]", name, type, size, queueSize, queueType);
Executor executor = EsExecutors.newFixed(size, workQueue, threadFactory);
return new ExecutorHolder(executor, new Info(name, type, size, size, null, queueSize, queueType));
} else if ("scaling".equals(type)) {
TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5));
int defaultMin = defaultSettings.getAsInt("min", 1);
@ -389,67 +374,8 @@ public class ThreadPool extends AbstractComponent {
} else {
logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive);
}
Executor executor = EsExecutors.newScalingExecutorService(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory);
Executor executor = EsExecutors.newScaling(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory);
return new ExecutorHolder(executor, new Info(name, type, min, size, keepAlive, null));
} else if ("blocking".equals(type)) {
TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5));
int defaultMin = defaultSettings.getAsInt("min", 1);
int defaultSize = defaultSettings.getAsInt("size", boundedNumberOfProcessors() * 5);
SizeValue defaultQueueSize = defaultSettings.getAsSize("queue_size", new SizeValue(1000));
TimeValue defaultWaitTime = defaultSettings.getAsTime("wait_time", timeValueSeconds(60));
if (previousExecutorHolder != null) {
if ("blocking".equals(previousInfo.getType())) {
SizeValue updatedQueueSize = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", defaultQueueSize)));
TimeValue updatedWaitTime = settings.getAsTime("wait_time", defaultWaitTime);
if (previousInfo.getQueueSize().equals(updatedQueueSize) && previousInfo.getWaitTime().equals(updatedWaitTime)) {
TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive());
int updatedMin = settings.getAsInt("min", previousInfo.getMin());
int updatedSize = settings.getAsInt("max", settings.getAsInt("size", previousInfo.getMax()));
if (!previousInfo.getKeepAlive().equals(updatedKeepAlive) || !previousInfo.getWaitTime().equals(settings.getAsTime("wait_time", defaultWaitTime)) ||
previousInfo.getMin() != updatedMin || previousInfo.getMax() != updatedSize) {
logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive);
if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) {
((EsThreadPoolExecutor) previousExecutorHolder.executor).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS);
}
if (previousInfo.getMin() != updatedMin) {
((EsThreadPoolExecutor) previousExecutorHolder.executor).setCorePoolSize(updatedMin);
}
if (previousInfo.getMax() != updatedSize) {
((EsThreadPoolExecutor) previousExecutorHolder.executor).setMaximumPoolSize(updatedSize);
}
return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedMin, updatedSize, updatedKeepAlive, updatedQueueSize, updatedWaitTime));
}
return previousExecutorHolder;
}
}
if (previousInfo.getKeepAlive() != null) {
defaultKeepAlive = previousInfo.getKeepAlive();
}
if (previousInfo.getMin() >= 0) {
defaultMin = previousInfo.getMin();
}
if (previousInfo.getMax() >= 0) {
defaultSize = previousInfo.getMax();
}
if (previousInfo.getQueueSize() != null) {
defaultQueueSize = previousInfo.getQueueSize();
}
if (previousInfo.getWaitTime() != null) {
defaultWaitTime = previousInfo.getWaitTime();
}
}
TimeValue keepAlive = settings.getAsTime("keep_alive", defaultKeepAlive);
int min = settings.getAsInt("min", defaultMin);
int size = settings.getAsInt("max", settings.getAsInt("size", defaultSize));
SizeValue queueSize = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", defaultQueueSize)));
TimeValue waitTime = settings.getAsTime("wait_time", defaultWaitTime);
if (previousExecutorHolder != null) {
logger.debug("updating thread_pool [{}], type [{}], min [{}], size [{}], queue_size [{}], keep_alive [{}], wait_time [{}]", name, type, min, size, queueSize.singles(), keepAlive, waitTime);
} else {
logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], queue_size [{}], keep_alive [{}], wait_time [{}]", name, type, min, size, queueSize.singles(), keepAlive, waitTime);
}
Executor executor = EsExecutors.newBlockingExecutorService(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, (int) queueSize.singles(), waitTime.millis(), TimeUnit.MILLISECONDS);
return new ExecutorHolder(executor, new Info(name, type, min, size, keepAlive, queueSize, waitTime));
}
throw new ElasticSearchIllegalArgumentException("No type found [" + type + "], for [" + name + "]");
}
@ -496,16 +422,6 @@ public class ThreadPool extends AbstractComponent {
}
}
private RejectedExecutionHandler newRejectedExecutionHandler(String name, String rejectSetting) {
if ("abort".equals(rejectSetting)) {
return new EsAbortPolicy();
} else if ("caller".equals(rejectSetting)) {
return new ThreadPoolExecutor.CallerRunsPolicy();
} else {
throw new ElasticSearchIllegalArgumentException("reject_policy [" + rejectSetting + "] not valid for [" + name + "] thread pool");
}
}
class ExecutorShutdownListener implements EsThreadPoolExecutor.ShutdownListener {
private ExecutorHolder holder;
@ -640,8 +556,6 @@ public class ThreadPool extends AbstractComponent {
private int max;
private TimeValue keepAlive;
private SizeValue queueSize;
private TimeValue waitTime;
private String rejectSetting;
private String queueType;
Info() {
@ -660,19 +574,13 @@ public class ThreadPool extends AbstractComponent {
this(name, type, min, max, keepAlive, queueSize, null);
}
public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue queueSize, @Nullable TimeValue waitTime) {
this(name, type, min, max, keepAlive, queueSize, waitTime, null, null);
}
public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue queueSize, @Nullable TimeValue waitTime, String rejectSetting, String queueType) {
public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue queueSize, String queueType) {
this.name = name;
this.type = type;
this.min = min;
this.max = max;
this.keepAlive = keepAlive;
this.queueSize = queueSize;
this.waitTime = waitTime;
this.rejectSetting = rejectSetting;
this.queueType = queueType;
}
@ -702,16 +610,6 @@ public class ThreadPool extends AbstractComponent {
return this.queueSize;
}
@Nullable
public TimeValue getWaitTime() {
return this.waitTime;
}
@Nullable
public String getRejectSetting() {
return this.rejectSetting;
}
@Nullable
public String getQueueType() {
return this.queueType;
@ -730,10 +628,8 @@ public class ThreadPool extends AbstractComponent {
if (in.readBoolean()) {
queueSize = SizeValue.readSizeValue(in);
}
if (in.readBoolean()) {
waitTime = TimeValue.readTimeValue(in);
}
rejectSetting = in.readOptionalString();
in.readBoolean(); // here to conform with removed waitTime
in.readBoolean(); // here to conform with removed rejected setting
queueType = in.readOptionalString();
}
@ -755,13 +651,8 @@ public class ThreadPool extends AbstractComponent {
out.writeBoolean(true);
queueSize.writeTo(out);
}
if (waitTime == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
waitTime.writeTo(out);
}
out.writeOptionalString(rejectSetting);
out.writeBoolean(false); // here to conform with remobed waitTime
out.writeBoolean(false); // here to conform with removed rejected setting
out.writeOptionalString(queueType);
}
@ -781,12 +672,6 @@ public class ThreadPool extends AbstractComponent {
if (queueSize != null) {
builder.field(Fields.QUEUE_SIZE, queueSize.toString());
}
if (waitTime != null) {
builder.field(Fields.WAIT_TIME, waitTime.toString());
}
if (rejectSetting != null) {
builder.field(Fields.REJECT_POLICY, rejectSetting);
}
if (queueType != null) {
builder.field(Fields.QUEUE_TYPE, queueType);
}
@ -800,8 +685,6 @@ public class ThreadPool extends AbstractComponent {
static final XContentBuilderString MAX = new XContentBuilderString("max");
static final XContentBuilderString KEEP_ALIVE = new XContentBuilderString("keep_alive");
static final XContentBuilderString QUEUE_SIZE = new XContentBuilderString("queue_size");
static final XContentBuilderString WAIT_TIME = new XContentBuilderString("wait_time");
static final XContentBuilderString REJECT_POLICY = new XContentBuilderString("reject_policy");
static final XContentBuilderString QUEUE_TYPE = new XContentBuilderString("queue_type");
}

View File

@ -21,7 +21,6 @@ import java.util.Map;
import java.util.concurrent.*;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
/**
@ -97,7 +96,6 @@ public class SimpleThreadPoolTests extends AbstractNodesTests {
for (ThreadPool.Info info : nodeInfo.getThreadPool()) {
if (info.getName().equals(Names.SEARCH)) {
assertThat(info.getType(), equalTo("fixed"));
assertThat(info.getRejectSetting(), equalTo("abort"));
assertThat(info.getQueueType(), equalTo("linked"));
found = true;
break;
@ -106,39 +104,8 @@ public class SimpleThreadPoolTests extends AbstractNodesTests {
assertThat(found, equalTo(true));
Map<String, Object> poolMap = getPoolSettingsThroughJson(nodeInfo.getThreadPool(), Names.SEARCH);
assertThat(poolMap.get("reject_policy").toString(), equalTo("abort"));
assertThat(poolMap.get("queue_type").toString(), equalTo("linked"));
}
client1.admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder()
.put("threadpool.search.type", "blocking")
.put("threadpool.search.wait_time", "10s")
.put("threadpool.search.keep_alive", "15s")
.put("threadpool.search.capacity", "100")
.build()).execute().actionGet();
Thread.sleep(200);
nodesInfoResponse = client2.admin().cluster().prepareNodesInfo().all().execute().actionGet();
for (int i = 0; i < 2; i++) {
NodeInfo nodeInfo = nodesInfoResponse.getNodes()[i];
boolean found = false;
for (ThreadPool.Info info : nodeInfo.getThreadPool()) {
if (info.getName().equals(Names.SEARCH)) {
assertThat(info.getType(), equalTo("blocking"));
assertThat(info.getQueueSize().singles(), equalTo(100L));
assertThat(info.getWaitTime().seconds(), equalTo(10L));
assertThat(info.getKeepAlive().seconds(), equalTo(15L));
found = true;
break;
}
}
assertThat(found, equalTo(true));
Map<String, Object> poolMap = getPoolSettingsThroughJson(nodeInfo.getThreadPool(), Names.SEARCH);
assertThat(poolMap.get("queue_size").toString(), equalTo("100"));
assertThat(poolMap.get("wait_time").toString(), equalTo("10s"));
assertThat(poolMap.get("keep_alive").toString(), equalTo("15s"));
}
}
private Map<String, Object> getPoolSettingsThroughJson(ThreadPoolInfo info, String poolName) throws IOException {

View File

@ -21,7 +21,6 @@ package org.elasticsearch.test.unit.common.util.concurrent;
import com.google.common.base.Predicate;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadBarrier;
import org.elasticsearch.test.integration.ElasticsearchTestCase;
import org.junit.Test;
@ -47,7 +46,7 @@ public class EsExecutorsTests extends ElasticsearchTestCase {
final int max = between(min + 1, 6);
final ThreadBarrier barrier = new ThreadBarrier(max + 1);
ThreadPoolExecutor pool = EsExecutors.newScalingExecutorService(min, max, between(1, 100), randomTimeUnit(), EsExecutors.daemonThreadFactory("test"));
ThreadPoolExecutor pool = EsExecutors.newScaling(min, max, between(1, 100), randomTimeUnit(), EsExecutors.daemonThreadFactory("test"));
assertThat("Min property", pool.getCorePoolSize(), equalTo(min));
assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max));
@ -83,7 +82,7 @@ public class EsExecutorsTests extends ElasticsearchTestCase {
final int max = between(min + 1, 6);
final ThreadBarrier barrier = new ThreadBarrier(max + 1);
final ThreadPoolExecutor pool = EsExecutors.newScalingExecutorService(min, max, between(1, 100), TimeUnit.MILLISECONDS, EsExecutors.daemonThreadFactory("test"));
final ThreadPoolExecutor pool = EsExecutors.newScaling(min, max, between(1, 100), TimeUnit.MILLISECONDS, EsExecutors.daemonThreadFactory("test"));
assertThat("Min property", pool.getCorePoolSize(), equalTo(min));
assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max));
@ -122,62 +121,4 @@ public class EsExecutorsTests extends ElasticsearchTestCase {
assertThat("idle threads didn't shrink below max. (" + pool.getPoolSize() + ")", pool.getPoolSize(), lessThan(max));
pool.shutdown();
}
@Test
public void testBlocking() throws Exception {
final int min = between(1, 3);
final int max = between(min+1, 6);
final long waitTime = between(1000, 2000); //1 second
final ThreadBarrier barrier = new ThreadBarrier(max + 1);
ThreadPoolExecutor pool = EsExecutors.newBlockingExecutorService(min, max, between(1, 100), randomTimeUnit(), EsExecutors.daemonThreadFactory("test"), 1, waitTime, TimeUnit.MILLISECONDS);
assertThat("Min property", pool.getCorePoolSize(), equalTo(min));
assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max));
for (int i = 0; i < max; ++i) {
final CountDownLatch latch = new CountDownLatch(1);
pool.execute(new Runnable() {
public void run() {
latch.countDown();
try {
barrier.await();
barrier.await();
} catch (Throwable e) {
barrier.reset(e);
}
}
});
//wait until thread executes this task
//otherwise, a task might be queued
latch.await();
}
barrier.await();
assertThat("wrong pool size", pool.getPoolSize(), equalTo(max));
assertThat("wrong active size", pool.getActiveCount(), equalTo(max));
//Queue should be empty, lets occupy it's only free space
assertThat("queue isn't empty", pool.getQueue().size(), equalTo(0));
pool.execute(new Runnable() {
public void run() {
//dummy task
}
});
assertThat("queue isn't full", pool.getQueue().size(), equalTo(1));
//request should block since queue is full
try {
pool.execute(new Runnable() {
public void run() {
//dummy task
}
});
assertThat("Should have thrown RejectedExecutionException", false, equalTo(true));
} catch (EsRejectedExecutionException e) {
//caught expected exception
}
barrier.await();
pool.shutdown();
}
}

View File

@ -58,7 +58,7 @@ public class PrioritizedExecutorsTests {
@Test
public void testSubmitPrioritizedExecutorWithRunnables() throws Exception {
ExecutorService executor = EsExecutors.newSinglePrioritizingThreadExecutor(Executors.defaultThreadFactory());
ExecutorService executor = EsExecutors.newSinglePrioritizing(Executors.defaultThreadFactory());
List<Integer> results = new ArrayList<Integer>(7);
CountDownLatch awaitingLatch = new CountDownLatch(1);
CountDownLatch finishedLatch = new CountDownLatch(7);
@ -85,7 +85,7 @@ public class PrioritizedExecutorsTests {
@Test
public void testExecutePrioritizedExecutorWithRunnables() throws Exception {
ExecutorService executor = EsExecutors.newSinglePrioritizingThreadExecutor(Executors.defaultThreadFactory());
ExecutorService executor = EsExecutors.newSinglePrioritizing(Executors.defaultThreadFactory());
List<Integer> results = new ArrayList<Integer>(7);
CountDownLatch awaitingLatch = new CountDownLatch(1);
CountDownLatch finishedLatch = new CountDownLatch(7);
@ -112,7 +112,7 @@ public class PrioritizedExecutorsTests {
@Test
public void testSubmitPrioritizedExecutorWithCallables() throws Exception {
ExecutorService executor = EsExecutors.newSinglePrioritizingThreadExecutor(Executors.defaultThreadFactory());
ExecutorService executor = EsExecutors.newSinglePrioritizing(Executors.defaultThreadFactory());
List<Integer> results = new ArrayList<Integer>(7);
CountDownLatch awaitingLatch = new CountDownLatch(1);
CountDownLatch finishedLatch = new CountDownLatch(7);
@ -139,7 +139,7 @@ public class PrioritizedExecutorsTests {
@Test
public void testSubmitPrioritizedExecutorWithMixed() throws Exception {
ExecutorService executor = EsExecutors.newSinglePrioritizingThreadExecutor(Executors.defaultThreadFactory());
ExecutorService executor = EsExecutors.newSinglePrioritizing(Executors.defaultThreadFactory());
List<Integer> results = new ArrayList<Integer>(7);
CountDownLatch awaitingLatch = new CountDownLatch(1);
CountDownLatch finishedLatch = new CountDownLatch(7);
@ -167,7 +167,7 @@ public class PrioritizedExecutorsTests {
@Test
public void testTimeout() throws Exception {
ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizingThreadExecutor(Executors.defaultThreadFactory());
PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(Executors.defaultThreadFactory());
final CountDownLatch block = new CountDownLatch(1);
executor.execute(new Runnable() {
@Override

View File

@ -21,7 +21,6 @@ package org.elasticsearch.test.unit.threadpool;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.util.concurrent.EsAbortPolicy;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
@ -102,7 +101,6 @@ public class UpdateThreadPoolSettingsTests {
@Test
public void testFixedExecutorType() {
ThreadPool threadPool = new ThreadPool(settingsBuilder().put("threadpool.search.type", "fixed").build(), null);
assertThat(info(threadPool, Names.SEARCH).getRejectSetting(), equalTo("abort"));
assertThat(info(threadPool, Names.SEARCH).getQueueType(), equalTo("linked"));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
@ -168,17 +166,6 @@ public class UpdateThreadPoolSettingsTests {
assertThat(info(threadPool, Names.SEARCH).getQueueType(), equalTo("array"));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getQueue(), instanceOf(ArrayBlockingQueue.class));
// Change rejection policy
oldExecutor = threadPool.executor(Names.SEARCH);
assertThat(info(threadPool, Names.SEARCH).getRejectSetting(), equalTo("abort"));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getRejectedExecutionHandler(), instanceOf(EsAbortPolicy.class));
threadPool.updateSettings(settingsBuilder().put("threadpool.search.reject_policy", "caller").build());
// Make sure rejection handler changed
assertThat(info(threadPool, Names.SEARCH).getRejectSetting(), equalTo("caller"));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getRejectedExecutionHandler(), instanceOf(ThreadPoolExecutor.CallerRunsPolicy.class));
// Make sure executor didn't change
assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor));
threadPool.shutdown();
}
@ -215,73 +202,6 @@ public class UpdateThreadPoolSettingsTests {
threadPool.shutdown();
}
@Test
public void testBlockingExecutorType() {
ThreadPool threadPool = new ThreadPool(settingsBuilder().put("threadpool.search.type", "blocking").put("threadpool.search.size", "10").build(), null);
assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(1));
assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(10));
assertThat(info(threadPool, Names.SEARCH).getQueueSize().singles(), equalTo(1000L));
assertThat(info(threadPool, Names.SEARCH).getWaitTime().minutes(), equalTo(1L));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
// Replace with different type
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.type", "scaling")
.put("threadpool.search.keep_alive", "10m")
.put("threadpool.search.min", "2")
.put("threadpool.search.size", "15")
.build());
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling"));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(2));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15));
assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(2));
assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(15));
// Make sure keep alive value changed
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L));
// Put old type back
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.type", "blocking")
.build());
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("blocking"));
// Make sure keep alive value is not used
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L));
// Make sure keep pool size value were reused
assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(2));
assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(15));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(2));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15));
// Change size
Executor oldExecutor = threadPool.executor(Names.SEARCH);
threadPool.updateSettings(settingsBuilder().put("threadpool.search.size", "10").put("threadpool.search.min", "5").build());
// Make sure size values changed
assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(5));
assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(10));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(5));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(10));
// Make sure executor didn't change
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("blocking"));
assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor));
// Change queue capacity
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.queue_size", "500")
.build());
assertThat(info(threadPool, Names.SEARCH).getQueueSize().singles(), equalTo(500L));
// Change wait time capacity
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.wait_time", "2m")
.build());
assertThat(info(threadPool, Names.SEARCH).getWaitTime().minutes(), equalTo(2L));
threadPool.shutdown();
}
@Test(timeout = 10000)
public void testShutdownDownNowDoesntBlock() throws Exception {
ThreadPool threadPool = new ThreadPool(ImmutableSettings.settingsBuilder().put("threadpool.search.type", "cached").build(), null);