From 41e4ee22e68a312ab58eb9b2c17382a516a88289 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Tue, 11 Jun 2013 13:06:59 +0200 Subject: [PATCH] Thread pool: rename `capacity` to `queue_size` fixes #3161 --- .../elasticsearch/threadpool/ThreadPool.java | 171 +++++++----------- .../threadpool/SimpleThreadPoolTests.java | 20 +- .../UpdateThreadPoolSettingsTests.java | 112 ++++++------ 3 files changed, 131 insertions(+), 172 deletions(-) diff --git a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 2cb4645ca32..58e0d8b0c7f 100644 --- a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -139,7 +139,7 @@ public class ThreadPool extends AbstractComponent { public ThreadPoolInfo info() { List infos = new ArrayList(); for (ExecutorHolder holder : executors.values()) { - String name = holder.info.name(); + String name = holder.info.getName(); // no need to have info on "same" thread pool if ("same".equals(name)) { continue; @@ -152,7 +152,7 @@ public class ThreadPool extends AbstractComponent { public ThreadPoolStats stats() { List stats = new ArrayList(); for (ExecutorHolder holder : executors.values()) { - String name = holder.info.name(); + String name = holder.info.getName(); // no need to have info on "same" thread pool if ("same".equals(name)) { continue; @@ -258,7 +258,7 @@ public class ThreadPool extends AbstractComponent { settings = ImmutableSettings.Builder.EMPTY_SETTINGS; } Info previousInfo = previousExecutorHolder != null ? previousExecutorHolder.info : null; - String type = settings.get("type", previousInfo != null ? previousInfo.type() : defaultSettings.get("type")); + String type = settings.get("type", previousInfo != null ? previousInfo.getType() : defaultSettings.get("type")); ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, name); if ("same".equals(type)) { if (previousExecutorHolder != null) { @@ -270,17 +270,17 @@ public class ThreadPool extends AbstractComponent { } else if ("cached".equals(type)) { TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)); if (previousExecutorHolder != null) { - if ("cached".equals(previousInfo.type())) { - TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.keepAlive()); - if (!previousInfo.keepAlive().equals(updatedKeepAlive)) { + if ("cached".equals(previousInfo.getType())) { + TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive()); + if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) { logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive); ((EsThreadPoolExecutor) previousExecutorHolder.executor).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS); return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, -1, -1, updatedKeepAlive, null)); } return previousExecutorHolder; } - if (previousInfo.keepAlive() != null) { - defaultKeepAlive = previousInfo.keepAlive(); + if (previousInfo.getKeepAlive() != null) { + defaultKeepAlive = previousInfo.getKeepAlive(); } } TimeValue keepAlive = settings.getAsTime("keep_alive", defaultKeepAlive); @@ -296,55 +296,55 @@ public class ThreadPool extends AbstractComponent { return new ExecutorHolder(executor, new Info(name, type, -1, -1, keepAlive, null)); } else if ("fixed".equals(type)) { int defaultSize = defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5); - SizeValue defaultCapacity = defaultSettings.getAsSize("queue", defaultSettings.getAsSize("queue_size", null)); + SizeValue defaultQueueSize = defaultSettings.getAsSize("queue", defaultSettings.getAsSize("queue_size", null)); String defaultRejectSetting = defaultSettings.get("reject_policy", "abort"); String defaultQueueType = defaultSettings.get("queue_type", "linked"); if (previousExecutorHolder != null) { - if ("fixed".equals(previousInfo.type())) { - SizeValue updatedCapacity = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", previousInfo.capacity()))); - String updatedQueueType = settings.get("queue_type", previousInfo.queueType()); - if (Objects.equal(previousInfo.capacity(), updatedCapacity) && previousInfo.queueType().equals(updatedQueueType)) { - int updatedSize = settings.getAsInt("size", previousInfo.max()); - String updatedRejectSetting = settings.get("reject_policy", previousInfo.rejectSetting()); - if (previousInfo.max() != updatedSize) { - logger.debug("updating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}], queue_type [{}]", name, type, updatedSize, updatedCapacity, updatedRejectSetting, updatedQueueType); + if ("fixed".equals(previousInfo.getType())) { + SizeValue updatedQueueSize = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", previousInfo.getQueueSize()))); + String updatedQueueType = settings.get("queue_type", previousInfo.getQueueType()); + if (Objects.equal(previousInfo.getQueueSize(), updatedQueueSize) && previousInfo.getQueueType().equals(updatedQueueType)) { + int updatedSize = settings.getAsInt("size", previousInfo.getMax()); + String updatedRejectSetting = settings.get("reject_policy", previousInfo.getRejectSetting()); + if (previousInfo.getMax() != updatedSize) { + logger.debug("updating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}], queue_type [{}]", name, type, updatedSize, updatedQueueSize, updatedRejectSetting, updatedQueueType); ((EsThreadPoolExecutor) previousExecutorHolder.executor).setCorePoolSize(updatedSize); ((EsThreadPoolExecutor) previousExecutorHolder.executor).setMaximumPoolSize(updatedSize); - return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedSize, updatedSize, null, updatedCapacity, null, updatedRejectSetting, updatedQueueType)); + return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedSize, updatedSize, null, updatedQueueSize, null, updatedRejectSetting, updatedQueueType)); } - if (!previousInfo.rejectSetting().equals(updatedRejectSetting)) { - logger.debug("updating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}], queue_type [{}]", name, type, updatedSize, updatedCapacity, updatedRejectSetting, updatedQueueType); + if (!previousInfo.getRejectSetting().equals(updatedRejectSetting)) { + logger.debug("updating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}], queue_type [{}]", name, type, updatedSize, updatedQueueSize, updatedRejectSetting, updatedQueueType); ((EsThreadPoolExecutor) previousExecutorHolder.executor).setRejectedExecutionHandler(newRejectedExecutionHandler(name, updatedRejectSetting)); - return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedSize, updatedSize, null, updatedCapacity, null, updatedRejectSetting, updatedQueueType)); + return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedSize, updatedSize, null, updatedQueueSize, null, updatedRejectSetting, updatedQueueType)); } return previousExecutorHolder; } } - if (previousInfo.max() >= 0) { - defaultSize = previousInfo.max(); + if (previousInfo.getMax() >= 0) { + defaultSize = previousInfo.getMax(); } - defaultCapacity = previousInfo.capacity(); + defaultQueueSize = previousInfo.getQueueSize(); if (previousInfo.rejectSetting != null) { defaultRejectSetting = previousInfo.rejectSetting; } - if (previousInfo.queueType() != null) { - defaultQueueType = previousInfo.queueType(); + if (previousInfo.getQueueType() != null) { + defaultQueueType = previousInfo.getQueueType(); } } int size = settings.getAsInt("size", defaultSize); - SizeValue capacity = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", defaultCapacity))); + SizeValue queueSize = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", defaultQueueSize))); String rejectSetting = settings.get("reject_policy", defaultRejectSetting); RejectedExecutionHandler rejectedExecutionHandler = newRejectedExecutionHandler(name, rejectSetting); String queueType = settings.get("queue_type", defaultQueueType); - BlockingQueue workQueue = newQueue(capacity, queueType); - logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}], queue_type [{}]", name, type, size, capacity, rejectSetting, queueType); + BlockingQueue workQueue = newQueue(queueSize, queueType); + logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}], queue_type [{}]", name, type, size, queueSize, rejectSetting, queueType); Executor executor = new EsThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory, rejectedExecutionHandler); - return new ExecutorHolder(executor, new Info(name, type, size, size, null, capacity, null, rejectSetting, queueType)); + return new ExecutorHolder(executor, new Info(name, type, size, size, null, queueSize, null, rejectSetting, queueType)); } else if ("scaling".equals(type)) { TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)); int defaultMin = defaultSettings.getAsInt("min", 1); @@ -354,7 +354,7 @@ public class ThreadPool extends AbstractComponent { TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive()); int updatedMin = settings.getAsInt("min", previousInfo.getMin()); int updatedSize = settings.getAsInt("max", settings.getAsInt("size", previousInfo.getMax())); - if (!previousInfo.keepAlive().equals(updatedKeepAlive) || previousInfo.min() != updatedMin || previousInfo.max() != updatedSize) { + if (!previousInfo.getKeepAlive().equals(updatedKeepAlive) || previousInfo.getMin() != updatedMin || previousInfo.getMax() != updatedSize) { logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive); if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) { ((EsThreadPoolExecutor) previousExecutorHolder.executor).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS); @@ -393,17 +393,17 @@ public class ThreadPool extends AbstractComponent { TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)); int defaultMin = defaultSettings.getAsInt("min", 1); int defaultSize = defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5); - SizeValue defaultCapacity = defaultSettings.getAsSize("queue_size", new SizeValue(1000)); + SizeValue defaultQueueSize = defaultSettings.getAsSize("queue_size", new SizeValue(1000)); TimeValue defaultWaitTime = defaultSettings.getAsTime("wait_time", timeValueSeconds(60)); if (previousExecutorHolder != null) { if ("blocking".equals(previousInfo.getType())) { - SizeValue updatedCapacity = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", defaultCapacity))); + SizeValue updatedQueueSize = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", defaultQueueSize))); TimeValue updatedWaitTime = settings.getAsTime("wait_time", defaultWaitTime); - if (previousInfo.capacity().equals(updatedCapacity) && previousInfo.waitTime().equals(updatedWaitTime)) { + if (previousInfo.getQueueSize().equals(updatedQueueSize) && previousInfo.getWaitTime().equals(updatedWaitTime)) { TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive()); int updatedMin = settings.getAsInt("min", previousInfo.getMin()); int updatedSize = settings.getAsInt("max", settings.getAsInt("size", previousInfo.getMax())); - if (!previousInfo.getKeepAlive().equals(updatedKeepAlive) || !previousInfo.waitTime().equals(settings.getAsTime("wait_time", defaultWaitTime)) || + if (!previousInfo.getKeepAlive().equals(updatedKeepAlive) || !previousInfo.getWaitTime().equals(settings.getAsTime("wait_time", defaultWaitTime)) || previousInfo.getMin() != updatedMin || previousInfo.getMax() != updatedSize) { logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive); if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) { @@ -415,7 +415,7 @@ public class ThreadPool extends AbstractComponent { if (previousInfo.getMax() != updatedSize) { ((EsThreadPoolExecutor) previousExecutorHolder.executor).setMaximumPoolSize(updatedSize); } - return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedMin, updatedSize, updatedKeepAlive, updatedCapacity, updatedWaitTime)); + return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedMin, updatedSize, updatedKeepAlive, updatedQueueSize, updatedWaitTime)); } return previousExecutorHolder; } @@ -429,25 +429,25 @@ public class ThreadPool extends AbstractComponent { if (previousInfo.getMax() >= 0) { defaultSize = previousInfo.getMax(); } - if (previousInfo.getCapacity() != null) { - defaultCapacity = previousInfo.getCapacity(); + if (previousInfo.getQueueSize() != null) { + defaultQueueSize = previousInfo.getQueueSize(); } - if (previousInfo.waitTime() != null) { - defaultWaitTime = previousInfo.getKeepAlive(); + if (previousInfo.getWaitTime() != null) { + defaultWaitTime = previousInfo.getWaitTime(); } } TimeValue keepAlive = settings.getAsTime("keep_alive", defaultKeepAlive); int min = settings.getAsInt("min", defaultMin); int size = settings.getAsInt("max", settings.getAsInt("size", defaultSize)); - SizeValue capacity = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", defaultCapacity))); + SizeValue queueSize = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", defaultQueueSize))); TimeValue waitTime = settings.getAsTime("wait_time", defaultWaitTime); if (previousExecutorHolder != null) { - logger.debug("updating thread_pool [{}], type [{}], min [{}], size [{}], queue_size [{}], keep_alive [{}], wait_time [{}]", name, type, min, size, capacity.singles(), keepAlive, waitTime); + logger.debug("updating thread_pool [{}], type [{}], min [{}], size [{}], queue_size [{}], keep_alive [{}], wait_time [{}]", name, type, min, size, queueSize.singles(), keepAlive, waitTime); } else { - logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], queue_size [{}], keep_alive [{}], wait_time [{}]", name, type, min, size, capacity.singles(), keepAlive, waitTime); + logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], queue_size [{}], keep_alive [{}], wait_time [{}]", name, type, min, size, queueSize.singles(), keepAlive, waitTime); } - Executor executor = EsExecutors.newBlockingExecutorService(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, (int) capacity.singles(), waitTime.millis(), TimeUnit.MILLISECONDS); - return new ExecutorHolder(executor, new Info(name, type, min, size, keepAlive, capacity, waitTime)); + Executor executor = EsExecutors.newBlockingExecutorService(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, (int) queueSize.singles(), waitTime.millis(), TimeUnit.MILLISECONDS); + return new ExecutorHolder(executor, new Info(name, type, min, size, keepAlive, queueSize, waitTime)); } throw new ElasticSearchIllegalArgumentException("No type found [" + type + "], for [" + name + "]"); } @@ -476,14 +476,14 @@ public class ThreadPool extends AbstractComponent { } } - private BlockingQueue newQueue(SizeValue capacity, String queueType) { - if (capacity == null) { + private BlockingQueue newQueue(SizeValue queueSize, String queueType) { + if (queueSize == null) { return ConcurrentCollections.newBlockingQueue(); - } else if ((int) capacity.singles() > 0) { + } else if ((int) queueSize.singles() > 0) { if ("linked".equals(queueType)) { - return new LinkedBlockingQueue((int) capacity.singles()); + return new LinkedBlockingQueue((int) queueSize.singles()); } else if ("array".equals(queueType)) { - return new ArrayBlockingQueue((int) capacity.singles()); + return new ArrayBlockingQueue((int) queueSize.singles()); } else { throw new ElasticSearchIllegalArgumentException("illegal queue_type set to [" + queueType + "], should be either linked or array"); } @@ -635,7 +635,7 @@ public class ThreadPool extends AbstractComponent { private int min; private int max; private TimeValue keepAlive; - private SizeValue capacity; + private SizeValue queueSize; private TimeValue waitTime; private String rejectSetting; private String queueType; @@ -652,81 +652,50 @@ public class ThreadPool extends AbstractComponent { this(name, type, size, size, null, null); } - public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue capacity) { - this(name, type, min, max, keepAlive, capacity, null); + public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue queueSize) { + this(name, type, min, max, keepAlive, queueSize, null); } - public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue capacity, @Nullable TimeValue waitTime) { - this(name, type, min, max, keepAlive, capacity, waitTime, null, null); + public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue queueSize, @Nullable TimeValue waitTime) { + this(name, type, min, max, keepAlive, queueSize, waitTime, null, null); } - public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue capacity, @Nullable TimeValue waitTime, String rejectSetting, String queueType) { + public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue queueSize, @Nullable TimeValue waitTime, String rejectSetting, String queueType) { this.name = name; this.type = type; this.min = min; this.max = max; this.keepAlive = keepAlive; - this.capacity = capacity; + this.queueSize = queueSize; this.waitTime = waitTime; this.rejectSetting = rejectSetting; this.queueType = queueType; } - public String name() { - return this.name; - } - public String getName() { return this.name; } - public String type() { - return this.type; - } - public String getType() { return this.type; } - public int min() { - return this.min; - } - public int getMin() { return this.min; } - public int max() { - return this.max; - } - public int getMax() { return this.max; } - @Nullable - public TimeValue keepAlive() { - return this.keepAlive; - } - @Nullable public TimeValue getKeepAlive() { return this.keepAlive; } @Nullable - public SizeValue capacity() { - return this.capacity; - } - - @Nullable - public SizeValue getCapacity() { - return this.capacity; - } - - @Nullable - public TimeValue waitTime() { - return this.waitTime; + public SizeValue getQueueSize() { + return this.queueSize; } @Nullable @@ -734,21 +703,11 @@ public class ThreadPool extends AbstractComponent { return this.waitTime; } - @Nullable - public String rejectSetting() { - return this.rejectSetting; - } - @Nullable public String getRejectSetting() { return this.rejectSetting; } - @Nullable - public String queueType() { - return this.queueType; - } - @Nullable public String getQueueType() { return this.queueType; @@ -765,7 +724,7 @@ public class ThreadPool extends AbstractComponent { keepAlive = TimeValue.readTimeValue(in); } if (in.readBoolean()) { - capacity = SizeValue.readSizeValue(in); + queueSize = SizeValue.readSizeValue(in); } if (in.readBoolean()) { waitTime = TimeValue.readTimeValue(in); @@ -786,11 +745,11 @@ public class ThreadPool extends AbstractComponent { out.writeBoolean(true); keepAlive.writeTo(out); } - if (capacity == null) { + if (queueSize == null) { out.writeBoolean(false); } else { out.writeBoolean(true); - capacity.writeTo(out); + queueSize.writeTo(out); } if (waitTime == null) { out.writeBoolean(false); @@ -815,8 +774,8 @@ public class ThreadPool extends AbstractComponent { if (keepAlive != null) { builder.field(Fields.KEEP_ALIVE, keepAlive.toString()); } - if (capacity != null) { - builder.field(Fields.CAPACITY, capacity.toString()); + if (queueSize != null) { + builder.field(Fields.QUEUE_SIZE, queueSize.toString()); } if (waitTime != null) { builder.field(Fields.WAIT_TIME, waitTime.toString()); @@ -836,7 +795,7 @@ public class ThreadPool extends AbstractComponent { static final XContentBuilderString MIN = new XContentBuilderString("min"); static final XContentBuilderString MAX = new XContentBuilderString("max"); static final XContentBuilderString KEEP_ALIVE = new XContentBuilderString("keep_alive"); - static final XContentBuilderString CAPACITY = new XContentBuilderString("capacity"); + static final XContentBuilderString QUEUE_SIZE = new XContentBuilderString("queue_size"); static final XContentBuilderString WAIT_TIME = new XContentBuilderString("wait_time"); static final XContentBuilderString REJECT_POLICY = new XContentBuilderString("reject_policy"); static final XContentBuilderString QUEUE_TYPE = new XContentBuilderString("queue_type"); diff --git a/src/test/java/org/elasticsearch/test/integration/threadpool/SimpleThreadPoolTests.java b/src/test/java/org/elasticsearch/test/integration/threadpool/SimpleThreadPoolTests.java index 491497600a2..2f8c7254aaa 100644 --- a/src/test/java/org/elasticsearch/test/integration/threadpool/SimpleThreadPoolTests.java +++ b/src/test/java/org/elasticsearch/test/integration/threadpool/SimpleThreadPoolTests.java @@ -103,10 +103,10 @@ public class SimpleThreadPoolTests extends AbstractNodesTests { NodeInfo nodeInfo = nodesInfoResponse.getNodes()[i]; boolean found = false; for (ThreadPool.Info info : nodeInfo.getThreadPool()) { - if (info.name().equals(Names.SEARCH)) { - assertThat(info.type(), equalTo("fixed")); - assertThat(info.rejectSetting(), equalTo("abort")); - assertThat(info.queueType(), equalTo("linked")); + if (info.getName().equals(Names.SEARCH)) { + assertThat(info.getType(), equalTo("fixed")); + assertThat(info.getRejectSetting(), equalTo("abort")); + assertThat(info.getQueueType(), equalTo("linked")); found = true; break; } @@ -130,11 +130,11 @@ public class SimpleThreadPoolTests extends AbstractNodesTests { NodeInfo nodeInfo = nodesInfoResponse.getNodes()[i]; boolean found = false; for (ThreadPool.Info info : nodeInfo.getThreadPool()) { - if (info.name().equals(Names.SEARCH)) { - assertThat(info.type(), equalTo("blocking")); - assertThat(info.capacity().singles(), equalTo(100L)); - assertThat(info.waitTime().seconds(), equalTo(10L)); - assertThat(info.keepAlive().seconds(), equalTo(15L)); + if (info.getName().equals(Names.SEARCH)) { + assertThat(info.getType(), equalTo("blocking")); + assertThat(info.getQueueSize().singles(), equalTo(100L)); + assertThat(info.getWaitTime().seconds(), equalTo(10L)); + assertThat(info.getKeepAlive().seconds(), equalTo(15L)); found = true; break; } @@ -142,7 +142,7 @@ public class SimpleThreadPoolTests extends AbstractNodesTests { assertThat(found, equalTo(true)); Map poolMap = getPoolSettingsThroughJson(nodeInfo.getThreadPool(), Names.SEARCH); - assertThat(poolMap.get("capacity").toString(), equalTo("100")); + assertThat(poolMap.get("queue_size").toString(), equalTo("100")); assertThat(poolMap.get("wait_time").toString(), equalTo("10s")); assertThat(poolMap.get("keep_alive").toString(), equalTo("15s")); } diff --git a/src/test/java/org/elasticsearch/test/unit/threadpool/UpdateThreadPoolSettingsTests.java b/src/test/java/org/elasticsearch/test/unit/threadpool/UpdateThreadPoolSettingsTests.java index 0d3b5e8b573..2522720812b 100644 --- a/src/test/java/org/elasticsearch/test/unit/threadpool/UpdateThreadPoolSettingsTests.java +++ b/src/test/java/org/elasticsearch/test/unit/threadpool/UpdateThreadPoolSettingsTests.java @@ -39,7 +39,7 @@ public class UpdateThreadPoolSettingsTests { private ThreadPool.Info info(ThreadPool threadPool, String name) { for (ThreadPool.Info info : threadPool.info()) { - if (info.name().equals(name)) { + if (info.getName().equals(name)) { return info; } } @@ -49,13 +49,13 @@ public class UpdateThreadPoolSettingsTests { @Test public void testCachedExecutorType() { ThreadPool threadPool = new ThreadPool(ImmutableSettings.settingsBuilder().put("threadpool.search.type", "cached").build(), null); - assertThat(info(threadPool, Names.SEARCH).type(), equalTo("cached")); - assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(5L)); + assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached")); + assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(5L)); assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); // Replace with different type threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "same").build()); - assertThat(info(threadPool, Names.SEARCH).type(), equalTo("same")); + assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("same")); assertThat(threadPool.executor(Names.SEARCH), instanceOf(ListeningExecutorService.class)); // Replace with different type again @@ -63,37 +63,37 @@ public class UpdateThreadPoolSettingsTests { .put("threadpool.search.type", "scaling") .put("threadpool.search.keep_alive", "10m") .build()); - assertThat(info(threadPool, Names.SEARCH).type(), equalTo("scaling")); + assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling")); assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(1)); // Make sure keep alive value changed - assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(10L)); + assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L)); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); // Put old type back threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "cached").build()); - assertThat(info(threadPool, Names.SEARCH).type(), equalTo("cached")); + assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached")); // Make sure keep alive value reused - assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(10L)); + assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L)); assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); // Change keep alive Executor oldExecutor = threadPool.executor(Names.SEARCH); threadPool.updateSettings(settingsBuilder().put("threadpool.search.keep_alive", "1m").build()); // Make sure keep alive value changed - assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(1L)); + assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(1L)); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L)); // Make sure executor didn't change - assertThat(info(threadPool, Names.SEARCH).type(), equalTo("cached")); + assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached")); assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); // Set the same keep alive threadPool.updateSettings(settingsBuilder().put("threadpool.search.keep_alive", "1m").build()); // Make sure keep alive value didn't change - assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(1L)); + assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(1L)); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L)); // Make sure executor didn't change - assertThat(info(threadPool, Names.SEARCH).type(), equalTo("cached")); + assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached")); assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); threadPool.shutdown(); @@ -102,8 +102,8 @@ public class UpdateThreadPoolSettingsTests { @Test public void testFixedExecutorType() { ThreadPool threadPool = new ThreadPool(settingsBuilder().put("threadpool.search.type", "fixed").build(), null); - assertThat(info(threadPool, Names.SEARCH).rejectSetting(), equalTo("abort")); - assertThat(info(threadPool, Names.SEARCH).queueType(), equalTo("linked")); + assertThat(info(threadPool, Names.SEARCH).getRejectSetting(), equalTo("abort")); + assertThat(info(threadPool, Names.SEARCH).getQueueType(), equalTo("linked")); assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); // Replace with different type @@ -113,26 +113,26 @@ public class UpdateThreadPoolSettingsTests { .put("threadpool.search.min", "2") .put("threadpool.search.size", "15") .build()); - assertThat(info(threadPool, Names.SEARCH).type(), equalTo("scaling")); + assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling")); assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(2)); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15)); - assertThat(info(threadPool, Names.SEARCH).min(), equalTo(2)); - assertThat(info(threadPool, Names.SEARCH).max(), equalTo(15)); + assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(2)); + assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(15)); // Make sure keep alive value changed - assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(10L)); + assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L)); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); // Put old type back threadPool.updateSettings(settingsBuilder() .put("threadpool.search.type", "fixed") .build()); - assertThat(info(threadPool, Names.SEARCH).type(), equalTo("fixed")); + assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("fixed")); // Make sure keep alive value is not used - assertThat(info(threadPool, Names.SEARCH).keepAlive(), nullValue()); + assertThat(info(threadPool, Names.SEARCH).getKeepAlive(), nullValue()); // Make sure keep pool size value were reused - assertThat(info(threadPool, Names.SEARCH).min(), equalTo(15)); - assertThat(info(threadPool, Names.SEARCH).max(), equalTo(15)); + assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(15)); + assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(15)); assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(15)); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15)); @@ -141,19 +141,19 @@ public class UpdateThreadPoolSettingsTests { Executor oldExecutor = threadPool.executor(Names.SEARCH); threadPool.updateSettings(settingsBuilder().put("threadpool.search.size", "10").build()); // Make sure size values changed - assertThat(info(threadPool, Names.SEARCH).max(), equalTo(10)); - assertThat(info(threadPool, Names.SEARCH).min(), equalTo(10)); + assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(10)); + assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(10)); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(10)); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(10)); // Make sure executor didn't change - assertThat(info(threadPool, Names.SEARCH).type(), equalTo("fixed")); + assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("fixed")); assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); // Change queue capacity threadPool.updateSettings(settingsBuilder() .put("threadpool.search.queue", "500") .build()); - assertThat(info(threadPool, Names.SEARCH).queueType(), equalTo("linked")); + assertThat(info(threadPool, Names.SEARCH).getQueueType(), equalTo("linked")); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getQueue(), instanceOf(LinkedBlockingQueue.class)); // Set different queue and size type @@ -162,19 +162,19 @@ public class UpdateThreadPoolSettingsTests { .put("threadpool.search.size", "12") .build()); // Make sure keep size changed - assertThat(info(threadPool, Names.SEARCH).type(), equalTo("fixed")); - assertThat(info(threadPool, Names.SEARCH).max(), equalTo(12)); + assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("fixed")); + assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(12)); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(12)); - assertThat(info(threadPool, Names.SEARCH).queueType(), equalTo("array")); + assertThat(info(threadPool, Names.SEARCH).getQueueType(), equalTo("array")); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getQueue(), instanceOf(ArrayBlockingQueue.class)); // Change rejection policy oldExecutor = threadPool.executor(Names.SEARCH); - assertThat(info(threadPool, Names.SEARCH).rejectSetting(), equalTo("abort")); + assertThat(info(threadPool, Names.SEARCH).getRejectSetting(), equalTo("abort")); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getRejectedExecutionHandler(), instanceOf(EsAbortPolicy.class)); threadPool.updateSettings(settingsBuilder().put("threadpool.search.reject_policy", "caller").build()); // Make sure rejection handler changed - assertThat(info(threadPool, Names.SEARCH).rejectSetting(), equalTo("caller")); + assertThat(info(threadPool, Names.SEARCH).getRejectSetting(), equalTo("caller")); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getRejectedExecutionHandler(), instanceOf(ThreadPoolExecutor.CallerRunsPolicy.class)); // Make sure executor didn't change assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); @@ -187,10 +187,10 @@ public class UpdateThreadPoolSettingsTests { public void testScalingExecutorType() { ThreadPool threadPool = new ThreadPool( settingsBuilder().put("threadpool.search.type", "scaling").put("threadpool.search.size", 10).build(), null); - assertThat(info(threadPool, Names.SEARCH).min(), equalTo(1)); - assertThat(info(threadPool, Names.SEARCH).max(), equalTo(10)); - assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(5L)); - assertThat(info(threadPool, Names.SEARCH).type(), equalTo("scaling")); + assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(1)); + assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(10)); + assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(5L)); + assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling")); assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); // Change settings that doesn't require pool replacement @@ -201,14 +201,14 @@ public class UpdateThreadPoolSettingsTests { .put("threadpool.search.min", "2") .put("threadpool.search.size", "15") .build()); - assertThat(info(threadPool, Names.SEARCH).type(), equalTo("scaling")); + assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling")); assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(2)); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15)); - assertThat(info(threadPool, Names.SEARCH).min(), equalTo(2)); - assertThat(info(threadPool, Names.SEARCH).max(), equalTo(15)); + assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(2)); + assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(15)); // Make sure keep alive value changed - assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(10L)); + assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L)); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); @@ -218,10 +218,10 @@ public class UpdateThreadPoolSettingsTests { @Test public void testBlockingExecutorType() { ThreadPool threadPool = new ThreadPool(settingsBuilder().put("threadpool.search.type", "blocking").put("threadpool.search.size", "10").build(), null); - assertThat(info(threadPool, Names.SEARCH).min(), equalTo(1)); - assertThat(info(threadPool, Names.SEARCH).max(), equalTo(10)); - assertThat(info(threadPool, Names.SEARCH).capacity().singles(), equalTo(1000L)); - assertThat(info(threadPool, Names.SEARCH).waitTime().minutes(), equalTo(1L)); + assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(1)); + assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(10)); + assertThat(info(threadPool, Names.SEARCH).getQueueSize().singles(), equalTo(1000L)); + assertThat(info(threadPool, Names.SEARCH).getWaitTime().minutes(), equalTo(1L)); assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); // Replace with different type @@ -231,26 +231,26 @@ public class UpdateThreadPoolSettingsTests { .put("threadpool.search.min", "2") .put("threadpool.search.size", "15") .build()); - assertThat(info(threadPool, Names.SEARCH).type(), equalTo("scaling")); + assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling")); assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(2)); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15)); - assertThat(info(threadPool, Names.SEARCH).min(), equalTo(2)); - assertThat(info(threadPool, Names.SEARCH).max(), equalTo(15)); + assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(2)); + assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(15)); // Make sure keep alive value changed - assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(10L)); + assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L)); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); // Put old type back threadPool.updateSettings(settingsBuilder() .put("threadpool.search.type", "blocking") .build()); - assertThat(info(threadPool, Names.SEARCH).type(), equalTo("blocking")); + assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("blocking")); // Make sure keep alive value is not used - assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(10L)); + assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L)); // Make sure keep pool size value were reused - assertThat(info(threadPool, Names.SEARCH).min(), equalTo(2)); - assertThat(info(threadPool, Names.SEARCH).max(), equalTo(15)); + assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(2)); + assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(15)); assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(2)); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15)); @@ -259,25 +259,25 @@ public class UpdateThreadPoolSettingsTests { Executor oldExecutor = threadPool.executor(Names.SEARCH); threadPool.updateSettings(settingsBuilder().put("threadpool.search.size", "10").put("threadpool.search.min", "5").build()); // Make sure size values changed - assertThat(info(threadPool, Names.SEARCH).min(), equalTo(5)); - assertThat(info(threadPool, Names.SEARCH).max(), equalTo(10)); + assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(5)); + assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(10)); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(5)); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(10)); // Make sure executor didn't change - assertThat(info(threadPool, Names.SEARCH).type(), equalTo("blocking")); + assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("blocking")); assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); // Change queue capacity threadPool.updateSettings(settingsBuilder() .put("threadpool.search.queue_size", "500") .build()); - assertThat(info(threadPool, Names.SEARCH).capacity().singles(), equalTo(500L)); + assertThat(info(threadPool, Names.SEARCH).getQueueSize().singles(), equalTo(500L)); // Change wait time capacity threadPool.updateSettings(settingsBuilder() .put("threadpool.search.wait_time", "2m") .build()); - assertThat(info(threadPool, Names.SEARCH).waitTime().minutes(), equalTo(2L)); + assertThat(info(threadPool, Names.SEARCH).getWaitTime().minutes(), equalTo(2L)); threadPool.shutdown(); }