Thread pool: rename `capacity` to `queue_size`

fixes #3161
This commit is contained in:
Shay Banon 2013-06-11 13:06:59 +02:00
parent 7afffbe13b
commit 41e4ee22e6
3 changed files with 131 additions and 172 deletions

View File

@ -139,7 +139,7 @@ public class ThreadPool extends AbstractComponent {
public ThreadPoolInfo info() {
List<Info> infos = new ArrayList<Info>();
for (ExecutorHolder holder : executors.values()) {
String name = holder.info.name();
String name = holder.info.getName();
// no need to have info on "same" thread pool
if ("same".equals(name)) {
continue;
@ -152,7 +152,7 @@ public class ThreadPool extends AbstractComponent {
public ThreadPoolStats stats() {
List<ThreadPoolStats.Stats> stats = new ArrayList<ThreadPoolStats.Stats>();
for (ExecutorHolder holder : executors.values()) {
String name = holder.info.name();
String name = holder.info.getName();
// no need to have info on "same" thread pool
if ("same".equals(name)) {
continue;
@ -258,7 +258,7 @@ public class ThreadPool extends AbstractComponent {
settings = ImmutableSettings.Builder.EMPTY_SETTINGS;
}
Info previousInfo = previousExecutorHolder != null ? previousExecutorHolder.info : null;
String type = settings.get("type", previousInfo != null ? previousInfo.type() : defaultSettings.get("type"));
String type = settings.get("type", previousInfo != null ? previousInfo.getType() : defaultSettings.get("type"));
ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, name);
if ("same".equals(type)) {
if (previousExecutorHolder != null) {
@ -270,17 +270,17 @@ public class ThreadPool extends AbstractComponent {
} else if ("cached".equals(type)) {
TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5));
if (previousExecutorHolder != null) {
if ("cached".equals(previousInfo.type())) {
TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.keepAlive());
if (!previousInfo.keepAlive().equals(updatedKeepAlive)) {
if ("cached".equals(previousInfo.getType())) {
TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive());
if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) {
logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive);
((EsThreadPoolExecutor) previousExecutorHolder.executor).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS);
return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, -1, -1, updatedKeepAlive, null));
}
return previousExecutorHolder;
}
if (previousInfo.keepAlive() != null) {
defaultKeepAlive = previousInfo.keepAlive();
if (previousInfo.getKeepAlive() != null) {
defaultKeepAlive = previousInfo.getKeepAlive();
}
}
TimeValue keepAlive = settings.getAsTime("keep_alive", defaultKeepAlive);
@ -296,55 +296,55 @@ public class ThreadPool extends AbstractComponent {
return new ExecutorHolder(executor, new Info(name, type, -1, -1, keepAlive, null));
} else if ("fixed".equals(type)) {
int defaultSize = defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5);
SizeValue defaultCapacity = defaultSettings.getAsSize("queue", defaultSettings.getAsSize("queue_size", null));
SizeValue defaultQueueSize = defaultSettings.getAsSize("queue", defaultSettings.getAsSize("queue_size", null));
String defaultRejectSetting = defaultSettings.get("reject_policy", "abort");
String defaultQueueType = defaultSettings.get("queue_type", "linked");
if (previousExecutorHolder != null) {
if ("fixed".equals(previousInfo.type())) {
SizeValue updatedCapacity = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", previousInfo.capacity())));
String updatedQueueType = settings.get("queue_type", previousInfo.queueType());
if (Objects.equal(previousInfo.capacity(), updatedCapacity) && previousInfo.queueType().equals(updatedQueueType)) {
int updatedSize = settings.getAsInt("size", previousInfo.max());
String updatedRejectSetting = settings.get("reject_policy", previousInfo.rejectSetting());
if (previousInfo.max() != updatedSize) {
logger.debug("updating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}], queue_type [{}]", name, type, updatedSize, updatedCapacity, updatedRejectSetting, updatedQueueType);
if ("fixed".equals(previousInfo.getType())) {
SizeValue updatedQueueSize = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", previousInfo.getQueueSize())));
String updatedQueueType = settings.get("queue_type", previousInfo.getQueueType());
if (Objects.equal(previousInfo.getQueueSize(), updatedQueueSize) && previousInfo.getQueueType().equals(updatedQueueType)) {
int updatedSize = settings.getAsInt("size", previousInfo.getMax());
String updatedRejectSetting = settings.get("reject_policy", previousInfo.getRejectSetting());
if (previousInfo.getMax() != updatedSize) {
logger.debug("updating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}], queue_type [{}]", name, type, updatedSize, updatedQueueSize, updatedRejectSetting, updatedQueueType);
((EsThreadPoolExecutor) previousExecutorHolder.executor).setCorePoolSize(updatedSize);
((EsThreadPoolExecutor) previousExecutorHolder.executor).setMaximumPoolSize(updatedSize);
return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedSize, updatedSize, null, updatedCapacity, null, updatedRejectSetting, updatedQueueType));
return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedSize, updatedSize, null, updatedQueueSize, null, updatedRejectSetting, updatedQueueType));
}
if (!previousInfo.rejectSetting().equals(updatedRejectSetting)) {
logger.debug("updating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}], queue_type [{}]", name, type, updatedSize, updatedCapacity, updatedRejectSetting, updatedQueueType);
if (!previousInfo.getRejectSetting().equals(updatedRejectSetting)) {
logger.debug("updating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}], queue_type [{}]", name, type, updatedSize, updatedQueueSize, updatedRejectSetting, updatedQueueType);
((EsThreadPoolExecutor) previousExecutorHolder.executor).setRejectedExecutionHandler(newRejectedExecutionHandler(name, updatedRejectSetting));
return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedSize, updatedSize, null, updatedCapacity, null, updatedRejectSetting, updatedQueueType));
return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedSize, updatedSize, null, updatedQueueSize, null, updatedRejectSetting, updatedQueueType));
}
return previousExecutorHolder;
}
}
if (previousInfo.max() >= 0) {
defaultSize = previousInfo.max();
if (previousInfo.getMax() >= 0) {
defaultSize = previousInfo.getMax();
}
defaultCapacity = previousInfo.capacity();
defaultQueueSize = previousInfo.getQueueSize();
if (previousInfo.rejectSetting != null) {
defaultRejectSetting = previousInfo.rejectSetting;
}
if (previousInfo.queueType() != null) {
defaultQueueType = previousInfo.queueType();
if (previousInfo.getQueueType() != null) {
defaultQueueType = previousInfo.getQueueType();
}
}
int size = settings.getAsInt("size", defaultSize);
SizeValue capacity = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", defaultCapacity)));
SizeValue queueSize = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", defaultQueueSize)));
String rejectSetting = settings.get("reject_policy", defaultRejectSetting);
RejectedExecutionHandler rejectedExecutionHandler = newRejectedExecutionHandler(name, rejectSetting);
String queueType = settings.get("queue_type", defaultQueueType);
BlockingQueue<Runnable> workQueue = newQueue(capacity, queueType);
logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}], queue_type [{}]", name, type, size, capacity, rejectSetting, queueType);
BlockingQueue<Runnable> workQueue = newQueue(queueSize, queueType);
logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}], queue_type [{}]", name, type, size, queueSize, rejectSetting, queueType);
Executor executor = new EsThreadPoolExecutor(size, size,
0L, TimeUnit.MILLISECONDS,
workQueue,
threadFactory, rejectedExecutionHandler);
return new ExecutorHolder(executor, new Info(name, type, size, size, null, capacity, null, rejectSetting, queueType));
return new ExecutorHolder(executor, new Info(name, type, size, size, null, queueSize, null, rejectSetting, queueType));
} else if ("scaling".equals(type)) {
TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5));
int defaultMin = defaultSettings.getAsInt("min", 1);
@ -354,7 +354,7 @@ public class ThreadPool extends AbstractComponent {
TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive());
int updatedMin = settings.getAsInt("min", previousInfo.getMin());
int updatedSize = settings.getAsInt("max", settings.getAsInt("size", previousInfo.getMax()));
if (!previousInfo.keepAlive().equals(updatedKeepAlive) || previousInfo.min() != updatedMin || previousInfo.max() != updatedSize) {
if (!previousInfo.getKeepAlive().equals(updatedKeepAlive) || previousInfo.getMin() != updatedMin || previousInfo.getMax() != updatedSize) {
logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive);
if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) {
((EsThreadPoolExecutor) previousExecutorHolder.executor).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS);
@ -393,17 +393,17 @@ public class ThreadPool extends AbstractComponent {
TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5));
int defaultMin = defaultSettings.getAsInt("min", 1);
int defaultSize = defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5);
SizeValue defaultCapacity = defaultSettings.getAsSize("queue_size", new SizeValue(1000));
SizeValue defaultQueueSize = defaultSettings.getAsSize("queue_size", new SizeValue(1000));
TimeValue defaultWaitTime = defaultSettings.getAsTime("wait_time", timeValueSeconds(60));
if (previousExecutorHolder != null) {
if ("blocking".equals(previousInfo.getType())) {
SizeValue updatedCapacity = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", defaultCapacity)));
SizeValue updatedQueueSize = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", defaultQueueSize)));
TimeValue updatedWaitTime = settings.getAsTime("wait_time", defaultWaitTime);
if (previousInfo.capacity().equals(updatedCapacity) && previousInfo.waitTime().equals(updatedWaitTime)) {
if (previousInfo.getQueueSize().equals(updatedQueueSize) && previousInfo.getWaitTime().equals(updatedWaitTime)) {
TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive());
int updatedMin = settings.getAsInt("min", previousInfo.getMin());
int updatedSize = settings.getAsInt("max", settings.getAsInt("size", previousInfo.getMax()));
if (!previousInfo.getKeepAlive().equals(updatedKeepAlive) || !previousInfo.waitTime().equals(settings.getAsTime("wait_time", defaultWaitTime)) ||
if (!previousInfo.getKeepAlive().equals(updatedKeepAlive) || !previousInfo.getWaitTime().equals(settings.getAsTime("wait_time", defaultWaitTime)) ||
previousInfo.getMin() != updatedMin || previousInfo.getMax() != updatedSize) {
logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive);
if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) {
@ -415,7 +415,7 @@ public class ThreadPool extends AbstractComponent {
if (previousInfo.getMax() != updatedSize) {
((EsThreadPoolExecutor) previousExecutorHolder.executor).setMaximumPoolSize(updatedSize);
}
return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedMin, updatedSize, updatedKeepAlive, updatedCapacity, updatedWaitTime));
return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedMin, updatedSize, updatedKeepAlive, updatedQueueSize, updatedWaitTime));
}
return previousExecutorHolder;
}
@ -429,25 +429,25 @@ public class ThreadPool extends AbstractComponent {
if (previousInfo.getMax() >= 0) {
defaultSize = previousInfo.getMax();
}
if (previousInfo.getCapacity() != null) {
defaultCapacity = previousInfo.getCapacity();
if (previousInfo.getQueueSize() != null) {
defaultQueueSize = previousInfo.getQueueSize();
}
if (previousInfo.waitTime() != null) {
defaultWaitTime = previousInfo.getKeepAlive();
if (previousInfo.getWaitTime() != null) {
defaultWaitTime = previousInfo.getWaitTime();
}
}
TimeValue keepAlive = settings.getAsTime("keep_alive", defaultKeepAlive);
int min = settings.getAsInt("min", defaultMin);
int size = settings.getAsInt("max", settings.getAsInt("size", defaultSize));
SizeValue capacity = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", defaultCapacity)));
SizeValue queueSize = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", defaultQueueSize)));
TimeValue waitTime = settings.getAsTime("wait_time", defaultWaitTime);
if (previousExecutorHolder != null) {
logger.debug("updating thread_pool [{}], type [{}], min [{}], size [{}], queue_size [{}], keep_alive [{}], wait_time [{}]", name, type, min, size, capacity.singles(), keepAlive, waitTime);
logger.debug("updating thread_pool [{}], type [{}], min [{}], size [{}], queue_size [{}], keep_alive [{}], wait_time [{}]", name, type, min, size, queueSize.singles(), keepAlive, waitTime);
} else {
logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], queue_size [{}], keep_alive [{}], wait_time [{}]", name, type, min, size, capacity.singles(), keepAlive, waitTime);
logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], queue_size [{}], keep_alive [{}], wait_time [{}]", name, type, min, size, queueSize.singles(), keepAlive, waitTime);
}
Executor executor = EsExecutors.newBlockingExecutorService(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, (int) capacity.singles(), waitTime.millis(), TimeUnit.MILLISECONDS);
return new ExecutorHolder(executor, new Info(name, type, min, size, keepAlive, capacity, waitTime));
Executor executor = EsExecutors.newBlockingExecutorService(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, (int) queueSize.singles(), waitTime.millis(), TimeUnit.MILLISECONDS);
return new ExecutorHolder(executor, new Info(name, type, min, size, keepAlive, queueSize, waitTime));
}
throw new ElasticSearchIllegalArgumentException("No type found [" + type + "], for [" + name + "]");
}
@ -476,14 +476,14 @@ public class ThreadPool extends AbstractComponent {
}
}
private BlockingQueue<Runnable> newQueue(SizeValue capacity, String queueType) {
if (capacity == null) {
private BlockingQueue<Runnable> newQueue(SizeValue queueSize, String queueType) {
if (queueSize == null) {
return ConcurrentCollections.newBlockingQueue();
} else if ((int) capacity.singles() > 0) {
} else if ((int) queueSize.singles() > 0) {
if ("linked".equals(queueType)) {
return new LinkedBlockingQueue<Runnable>((int) capacity.singles());
return new LinkedBlockingQueue<Runnable>((int) queueSize.singles());
} else if ("array".equals(queueType)) {
return new ArrayBlockingQueue<Runnable>((int) capacity.singles());
return new ArrayBlockingQueue<Runnable>((int) queueSize.singles());
} else {
throw new ElasticSearchIllegalArgumentException("illegal queue_type set to [" + queueType + "], should be either linked or array");
}
@ -635,7 +635,7 @@ public class ThreadPool extends AbstractComponent {
private int min;
private int max;
private TimeValue keepAlive;
private SizeValue capacity;
private SizeValue queueSize;
private TimeValue waitTime;
private String rejectSetting;
private String queueType;
@ -652,81 +652,50 @@ public class ThreadPool extends AbstractComponent {
this(name, type, size, size, null, null);
}
public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue capacity) {
this(name, type, min, max, keepAlive, capacity, null);
public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue queueSize) {
this(name, type, min, max, keepAlive, queueSize, null);
}
public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue capacity, @Nullable TimeValue waitTime) {
this(name, type, min, max, keepAlive, capacity, waitTime, null, null);
public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue queueSize, @Nullable TimeValue waitTime) {
this(name, type, min, max, keepAlive, queueSize, waitTime, null, null);
}
public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue capacity, @Nullable TimeValue waitTime, String rejectSetting, String queueType) {
public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue queueSize, @Nullable TimeValue waitTime, String rejectSetting, String queueType) {
this.name = name;
this.type = type;
this.min = min;
this.max = max;
this.keepAlive = keepAlive;
this.capacity = capacity;
this.queueSize = queueSize;
this.waitTime = waitTime;
this.rejectSetting = rejectSetting;
this.queueType = queueType;
}
public String name() {
return this.name;
}
public String getName() {
return this.name;
}
public String type() {
return this.type;
}
public String getType() {
return this.type;
}
public int min() {
return this.min;
}
public int getMin() {
return this.min;
}
public int max() {
return this.max;
}
public int getMax() {
return this.max;
}
@Nullable
public TimeValue keepAlive() {
return this.keepAlive;
}
@Nullable
public TimeValue getKeepAlive() {
return this.keepAlive;
}
@Nullable
public SizeValue capacity() {
return this.capacity;
}
@Nullable
public SizeValue getCapacity() {
return this.capacity;
}
@Nullable
public TimeValue waitTime() {
return this.waitTime;
public SizeValue getQueueSize() {
return this.queueSize;
}
@Nullable
@ -734,21 +703,11 @@ public class ThreadPool extends AbstractComponent {
return this.waitTime;
}
@Nullable
public String rejectSetting() {
return this.rejectSetting;
}
@Nullable
public String getRejectSetting() {
return this.rejectSetting;
}
@Nullable
public String queueType() {
return this.queueType;
}
@Nullable
public String getQueueType() {
return this.queueType;
@ -765,7 +724,7 @@ public class ThreadPool extends AbstractComponent {
keepAlive = TimeValue.readTimeValue(in);
}
if (in.readBoolean()) {
capacity = SizeValue.readSizeValue(in);
queueSize = SizeValue.readSizeValue(in);
}
if (in.readBoolean()) {
waitTime = TimeValue.readTimeValue(in);
@ -786,11 +745,11 @@ public class ThreadPool extends AbstractComponent {
out.writeBoolean(true);
keepAlive.writeTo(out);
}
if (capacity == null) {
if (queueSize == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
capacity.writeTo(out);
queueSize.writeTo(out);
}
if (waitTime == null) {
out.writeBoolean(false);
@ -815,8 +774,8 @@ public class ThreadPool extends AbstractComponent {
if (keepAlive != null) {
builder.field(Fields.KEEP_ALIVE, keepAlive.toString());
}
if (capacity != null) {
builder.field(Fields.CAPACITY, capacity.toString());
if (queueSize != null) {
builder.field(Fields.QUEUE_SIZE, queueSize.toString());
}
if (waitTime != null) {
builder.field(Fields.WAIT_TIME, waitTime.toString());
@ -836,7 +795,7 @@ public class ThreadPool extends AbstractComponent {
static final XContentBuilderString MIN = new XContentBuilderString("min");
static final XContentBuilderString MAX = new XContentBuilderString("max");
static final XContentBuilderString KEEP_ALIVE = new XContentBuilderString("keep_alive");
static final XContentBuilderString CAPACITY = new XContentBuilderString("capacity");
static final XContentBuilderString QUEUE_SIZE = new XContentBuilderString("queue_size");
static final XContentBuilderString WAIT_TIME = new XContentBuilderString("wait_time");
static final XContentBuilderString REJECT_POLICY = new XContentBuilderString("reject_policy");
static final XContentBuilderString QUEUE_TYPE = new XContentBuilderString("queue_type");

View File

@ -103,10 +103,10 @@ public class SimpleThreadPoolTests extends AbstractNodesTests {
NodeInfo nodeInfo = nodesInfoResponse.getNodes()[i];
boolean found = false;
for (ThreadPool.Info info : nodeInfo.getThreadPool()) {
if (info.name().equals(Names.SEARCH)) {
assertThat(info.type(), equalTo("fixed"));
assertThat(info.rejectSetting(), equalTo("abort"));
assertThat(info.queueType(), equalTo("linked"));
if (info.getName().equals(Names.SEARCH)) {
assertThat(info.getType(), equalTo("fixed"));
assertThat(info.getRejectSetting(), equalTo("abort"));
assertThat(info.getQueueType(), equalTo("linked"));
found = true;
break;
}
@ -130,11 +130,11 @@ public class SimpleThreadPoolTests extends AbstractNodesTests {
NodeInfo nodeInfo = nodesInfoResponse.getNodes()[i];
boolean found = false;
for (ThreadPool.Info info : nodeInfo.getThreadPool()) {
if (info.name().equals(Names.SEARCH)) {
assertThat(info.type(), equalTo("blocking"));
assertThat(info.capacity().singles(), equalTo(100L));
assertThat(info.waitTime().seconds(), equalTo(10L));
assertThat(info.keepAlive().seconds(), equalTo(15L));
if (info.getName().equals(Names.SEARCH)) {
assertThat(info.getType(), equalTo("blocking"));
assertThat(info.getQueueSize().singles(), equalTo(100L));
assertThat(info.getWaitTime().seconds(), equalTo(10L));
assertThat(info.getKeepAlive().seconds(), equalTo(15L));
found = true;
break;
}
@ -142,7 +142,7 @@ public class SimpleThreadPoolTests extends AbstractNodesTests {
assertThat(found, equalTo(true));
Map<String, Object> poolMap = getPoolSettingsThroughJson(nodeInfo.getThreadPool(), Names.SEARCH);
assertThat(poolMap.get("capacity").toString(), equalTo("100"));
assertThat(poolMap.get("queue_size").toString(), equalTo("100"));
assertThat(poolMap.get("wait_time").toString(), equalTo("10s"));
assertThat(poolMap.get("keep_alive").toString(), equalTo("15s"));
}

View File

@ -39,7 +39,7 @@ public class UpdateThreadPoolSettingsTests {
private ThreadPool.Info info(ThreadPool threadPool, String name) {
for (ThreadPool.Info info : threadPool.info()) {
if (info.name().equals(name)) {
if (info.getName().equals(name)) {
return info;
}
}
@ -49,13 +49,13 @@ public class UpdateThreadPoolSettingsTests {
@Test
public void testCachedExecutorType() {
ThreadPool threadPool = new ThreadPool(ImmutableSettings.settingsBuilder().put("threadpool.search.type", "cached").build(), null);
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("cached"));
assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(5L));
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached"));
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(5L));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
// Replace with different type
threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "same").build());
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("same"));
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("same"));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(ListeningExecutorService.class));
// Replace with different type again
@ -63,37 +63,37 @@ public class UpdateThreadPoolSettingsTests {
.put("threadpool.search.type", "scaling")
.put("threadpool.search.keep_alive", "10m")
.build());
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("scaling"));
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling"));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(1));
// Make sure keep alive value changed
assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(10L));
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L));
// Put old type back
threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "cached").build());
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("cached"));
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached"));
// Make sure keep alive value reused
assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(10L));
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
// Change keep alive
Executor oldExecutor = threadPool.executor(Names.SEARCH);
threadPool.updateSettings(settingsBuilder().put("threadpool.search.keep_alive", "1m").build());
// Make sure keep alive value changed
assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(1L));
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(1L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L));
// Make sure executor didn't change
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("cached"));
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached"));
assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor));
// Set the same keep alive
threadPool.updateSettings(settingsBuilder().put("threadpool.search.keep_alive", "1m").build());
// Make sure keep alive value didn't change
assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(1L));
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(1L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L));
// Make sure executor didn't change
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("cached"));
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached"));
assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor));
threadPool.shutdown();
@ -102,8 +102,8 @@ public class UpdateThreadPoolSettingsTests {
@Test
public void testFixedExecutorType() {
ThreadPool threadPool = new ThreadPool(settingsBuilder().put("threadpool.search.type", "fixed").build(), null);
assertThat(info(threadPool, Names.SEARCH).rejectSetting(), equalTo("abort"));
assertThat(info(threadPool, Names.SEARCH).queueType(), equalTo("linked"));
assertThat(info(threadPool, Names.SEARCH).getRejectSetting(), equalTo("abort"));
assertThat(info(threadPool, Names.SEARCH).getQueueType(), equalTo("linked"));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
// Replace with different type
@ -113,26 +113,26 @@ public class UpdateThreadPoolSettingsTests {
.put("threadpool.search.min", "2")
.put("threadpool.search.size", "15")
.build());
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("scaling"));
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling"));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(2));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15));
assertThat(info(threadPool, Names.SEARCH).min(), equalTo(2));
assertThat(info(threadPool, Names.SEARCH).max(), equalTo(15));
assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(2));
assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(15));
// Make sure keep alive value changed
assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(10L));
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L));
// Put old type back
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.type", "fixed")
.build());
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("fixed"));
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("fixed"));
// Make sure keep alive value is not used
assertThat(info(threadPool, Names.SEARCH).keepAlive(), nullValue());
assertThat(info(threadPool, Names.SEARCH).getKeepAlive(), nullValue());
// Make sure keep pool size value were reused
assertThat(info(threadPool, Names.SEARCH).min(), equalTo(15));
assertThat(info(threadPool, Names.SEARCH).max(), equalTo(15));
assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(15));
assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(15));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(15));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15));
@ -141,19 +141,19 @@ public class UpdateThreadPoolSettingsTests {
Executor oldExecutor = threadPool.executor(Names.SEARCH);
threadPool.updateSettings(settingsBuilder().put("threadpool.search.size", "10").build());
// Make sure size values changed
assertThat(info(threadPool, Names.SEARCH).max(), equalTo(10));
assertThat(info(threadPool, Names.SEARCH).min(), equalTo(10));
assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(10));
assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(10));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(10));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(10));
// Make sure executor didn't change
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("fixed"));
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("fixed"));
assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor));
// Change queue capacity
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.queue", "500")
.build());
assertThat(info(threadPool, Names.SEARCH).queueType(), equalTo("linked"));
assertThat(info(threadPool, Names.SEARCH).getQueueType(), equalTo("linked"));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getQueue(), instanceOf(LinkedBlockingQueue.class));
// Set different queue and size type
@ -162,19 +162,19 @@ public class UpdateThreadPoolSettingsTests {
.put("threadpool.search.size", "12")
.build());
// Make sure keep size changed
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("fixed"));
assertThat(info(threadPool, Names.SEARCH).max(), equalTo(12));
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("fixed"));
assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(12));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(12));
assertThat(info(threadPool, Names.SEARCH).queueType(), equalTo("array"));
assertThat(info(threadPool, Names.SEARCH).getQueueType(), equalTo("array"));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getQueue(), instanceOf(ArrayBlockingQueue.class));
// Change rejection policy
oldExecutor = threadPool.executor(Names.SEARCH);
assertThat(info(threadPool, Names.SEARCH).rejectSetting(), equalTo("abort"));
assertThat(info(threadPool, Names.SEARCH).getRejectSetting(), equalTo("abort"));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getRejectedExecutionHandler(), instanceOf(EsAbortPolicy.class));
threadPool.updateSettings(settingsBuilder().put("threadpool.search.reject_policy", "caller").build());
// Make sure rejection handler changed
assertThat(info(threadPool, Names.SEARCH).rejectSetting(), equalTo("caller"));
assertThat(info(threadPool, Names.SEARCH).getRejectSetting(), equalTo("caller"));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getRejectedExecutionHandler(), instanceOf(ThreadPoolExecutor.CallerRunsPolicy.class));
// Make sure executor didn't change
assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor));
@ -187,10 +187,10 @@ public class UpdateThreadPoolSettingsTests {
public void testScalingExecutorType() {
ThreadPool threadPool = new ThreadPool(
settingsBuilder().put("threadpool.search.type", "scaling").put("threadpool.search.size", 10).build(), null);
assertThat(info(threadPool, Names.SEARCH).min(), equalTo(1));
assertThat(info(threadPool, Names.SEARCH).max(), equalTo(10));
assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(5L));
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("scaling"));
assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(1));
assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(10));
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(5L));
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling"));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
// Change settings that doesn't require pool replacement
@ -201,14 +201,14 @@ public class UpdateThreadPoolSettingsTests {
.put("threadpool.search.min", "2")
.put("threadpool.search.size", "15")
.build());
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("scaling"));
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling"));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(2));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15));
assertThat(info(threadPool, Names.SEARCH).min(), equalTo(2));
assertThat(info(threadPool, Names.SEARCH).max(), equalTo(15));
assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(2));
assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(15));
// Make sure keep alive value changed
assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(10L));
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L));
assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor));
@ -218,10 +218,10 @@ public class UpdateThreadPoolSettingsTests {
@Test
public void testBlockingExecutorType() {
ThreadPool threadPool = new ThreadPool(settingsBuilder().put("threadpool.search.type", "blocking").put("threadpool.search.size", "10").build(), null);
assertThat(info(threadPool, Names.SEARCH).min(), equalTo(1));
assertThat(info(threadPool, Names.SEARCH).max(), equalTo(10));
assertThat(info(threadPool, Names.SEARCH).capacity().singles(), equalTo(1000L));
assertThat(info(threadPool, Names.SEARCH).waitTime().minutes(), equalTo(1L));
assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(1));
assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(10));
assertThat(info(threadPool, Names.SEARCH).getQueueSize().singles(), equalTo(1000L));
assertThat(info(threadPool, Names.SEARCH).getWaitTime().minutes(), equalTo(1L));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
// Replace with different type
@ -231,26 +231,26 @@ public class UpdateThreadPoolSettingsTests {
.put("threadpool.search.min", "2")
.put("threadpool.search.size", "15")
.build());
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("scaling"));
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling"));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(2));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15));
assertThat(info(threadPool, Names.SEARCH).min(), equalTo(2));
assertThat(info(threadPool, Names.SEARCH).max(), equalTo(15));
assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(2));
assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(15));
// Make sure keep alive value changed
assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(10L));
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L));
// Put old type back
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.type", "blocking")
.build());
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("blocking"));
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("blocking"));
// Make sure keep alive value is not used
assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(10L));
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L));
// Make sure keep pool size value were reused
assertThat(info(threadPool, Names.SEARCH).min(), equalTo(2));
assertThat(info(threadPool, Names.SEARCH).max(), equalTo(15));
assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(2));
assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(15));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(2));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15));
@ -259,25 +259,25 @@ public class UpdateThreadPoolSettingsTests {
Executor oldExecutor = threadPool.executor(Names.SEARCH);
threadPool.updateSettings(settingsBuilder().put("threadpool.search.size", "10").put("threadpool.search.min", "5").build());
// Make sure size values changed
assertThat(info(threadPool, Names.SEARCH).min(), equalTo(5));
assertThat(info(threadPool, Names.SEARCH).max(), equalTo(10));
assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(5));
assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(10));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(5));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(10));
// Make sure executor didn't change
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("blocking"));
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("blocking"));
assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor));
// Change queue capacity
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.queue_size", "500")
.build());
assertThat(info(threadPool, Names.SEARCH).capacity().singles(), equalTo(500L));
assertThat(info(threadPool, Names.SEARCH).getQueueSize().singles(), equalTo(500L));
// Change wait time capacity
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.wait_time", "2m")
.build());
assertThat(info(threadPool, Names.SEARCH).waitTime().minutes(), equalTo(2L));
assertThat(info(threadPool, Names.SEARCH).getWaitTime().minutes(), equalTo(2L));
threadPool.shutdown();
}