diff --git a/server/src/main/java/org/opensearch/threadpool/AutoQueueAdjustingExecutorBuilder.java b/server/src/main/java/org/opensearch/threadpool/AutoQueueAdjustingExecutorBuilder.java deleted file mode 100644 index cd89d6110a0..00000000000 --- a/server/src/main/java/org/opensearch/threadpool/AutoQueueAdjustingExecutorBuilder.java +++ /dev/null @@ -1,289 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.threadpool; - -import org.opensearch.common.settings.Setting; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.unit.SizeValue; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.util.concurrent.OpenSearchExecutors; -import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.node.Node; - -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadFactory; - -/** - * A builder for executors that automatically adjust the queue length as needed, depending on - * Little's Law. See https://en.wikipedia.org/wiki/Little's_law for more information. - * - * @opensearch.internal - */ -public final class AutoQueueAdjustingExecutorBuilder extends ExecutorBuilder { - - private final Setting sizeSetting; - private final Setting queueSizeSetting; - private final Setting minQueueSizeSetting; - private final Setting maxQueueSizeSetting; - private final Setting targetedResponseTimeSetting; - private final Setting frameSizeSetting; - - AutoQueueAdjustingExecutorBuilder( - final Settings settings, - final String name, - final int size, - final int initialQueueSize, - final int minQueueSize, - final int maxQueueSize, - final int frameSize - ) { - super(name); - final String prefix = "thread_pool." + name; - final String sizeKey = settingsKey(prefix, "size"); - this.sizeSetting = new Setting<>( - sizeKey, - s -> Integer.toString(size), - s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey), - Setting.Property.NodeScope - ); - final String queueSizeKey = settingsKey(prefix, "queue_size"); - final String minSizeKey = settingsKey(prefix, "min_queue_size"); - final String maxSizeKey = settingsKey(prefix, "max_queue_size"); - final String frameSizeKey = settingsKey(prefix, "auto_queue_frame_size"); - final String targetedResponseTimeKey = settingsKey(prefix, "target_response_time"); - this.targetedResponseTimeSetting = Setting.timeSetting( - targetedResponseTimeKey, - TimeValue.timeValueSeconds(1), - TimeValue.timeValueMillis(10), - Setting.Property.NodeScope, - Setting.Property.Deprecated - ); - this.queueSizeSetting = Setting.intSetting(queueSizeKey, initialQueueSize, Setting.Property.NodeScope); - // These temp settings are used to validate the min and max settings below - Setting tempMaxQueueSizeSetting = Setting.intSetting( - maxSizeKey, - maxQueueSize, - Setting.Property.NodeScope, - Setting.Property.Deprecated - ); - Setting tempMinQueueSizeSetting = Setting.intSetting( - minSizeKey, - minQueueSize, - Setting.Property.NodeScope, - Setting.Property.Deprecated - ); - - this.minQueueSizeSetting = new Setting<>( - minSizeKey, - Integer.toString(minQueueSize), - s -> Setting.parseInt(s, 0, minSizeKey), - new Setting.Validator() { - - @Override - public void validate(final Integer value) { - - } - - @Override - public void validate(final Integer value, final Map, Object> settings) { - if (value > (int) settings.get(tempMaxQueueSizeSetting)) { - throw new IllegalArgumentException( - "Failed to parse value [" - + value - + "] for setting [" - + minSizeKey - + "] must be <= " - + settings.get(tempMaxQueueSizeSetting) - ); - } - } - - @Override - public Iterator> settings() { - final List> settings = Collections.singletonList(tempMaxQueueSizeSetting); - return settings.iterator(); - } - - }, - Setting.Property.NodeScope - ); - this.maxQueueSizeSetting = new Setting<>( - maxSizeKey, - Integer.toString(maxQueueSize), - s -> Setting.parseInt(s, 0, maxSizeKey), - new Setting.Validator() { - - @Override - public void validate(Integer value) { - - } - - @Override - public void validate(final Integer value, final Map, Object> settings) { - if (value < (int) settings.get(tempMinQueueSizeSetting)) { - throw new IllegalArgumentException( - "Failed to parse value [" - + value - + "] for setting [" - + minSizeKey - + "] must be >= " - + settings.get(tempMinQueueSizeSetting) - ); - } - } - - @Override - public Iterator> settings() { - final List> settings = Collections.singletonList(tempMinQueueSizeSetting); - return settings.iterator(); - } - - }, - Setting.Property.NodeScope, - Setting.Property.Deprecated - ); - this.frameSizeSetting = Setting.intSetting( - frameSizeKey, - frameSize, - 100, - Setting.Property.NodeScope, - Setting.Property.Deprecated, - Setting.Property.Deprecated - ); - } - - @Override - public List> getRegisteredSettings() { - return Arrays.asList( - sizeSetting, - queueSizeSetting, - minQueueSizeSetting, - maxQueueSizeSetting, - frameSizeSetting, - targetedResponseTimeSetting - ); - } - - @Override - AutoExecutorSettings getSettings(Settings settings) { - final String nodeName = Node.NODE_NAME_SETTING.get(settings); - final int size = sizeSetting.get(settings); - final int initialQueueSize = queueSizeSetting.get(settings); - final int minQueueSize = minQueueSizeSetting.get(settings); - final int maxQueueSize = maxQueueSizeSetting.get(settings); - final int frameSize = frameSizeSetting.get(settings); - final TimeValue targetedResponseTime = targetedResponseTimeSetting.get(settings); - return new AutoExecutorSettings(nodeName, size, initialQueueSize, minQueueSize, maxQueueSize, frameSize, targetedResponseTime); - } - - @Override - ThreadPool.ExecutorHolder build(final AutoExecutorSettings settings, final ThreadContext threadContext) { - int size = settings.size; - int initialQueueSize = settings.initialQueueSize; - int minQueueSize = settings.minQueueSize; - int maxQueueSize = settings.maxQueueSize; - int frameSize = settings.frameSize; - TimeValue targetedResponseTime = settings.targetedResponseTime; - final ThreadFactory threadFactory = OpenSearchExecutors.daemonThreadFactory( - OpenSearchExecutors.threadName(settings.nodeName, name()) - ); - final ExecutorService executor = OpenSearchExecutors.newAutoQueueFixed( - settings.nodeName + "/" + name(), - size, - initialQueueSize, - minQueueSize, - maxQueueSize, - frameSize, - targetedResponseTime, - threadFactory, - threadContext - ); - // TODO: in a subsequent change we hope to extend ThreadPool.Info to be more specific for the thread pool type - final ThreadPool.Info info = new ThreadPool.Info( - name(), - ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE, - size, - size, - null, - new SizeValue(initialQueueSize) - ); - return new ThreadPool.ExecutorHolder(executor, info); - } - - @Override - String formatInfo(ThreadPool.Info info) { - return String.format( - Locale.ROOT, - "name [%s], size [%d], queue size [%s]", - info.getName(), - info.getMax(), - info.getQueueSize() == null ? "unbounded" : info.getQueueSize() - ); - } - - static final class AutoExecutorSettings extends ExecutorBuilder.ExecutorSettings { - - final int size; - final int initialQueueSize; - final int minQueueSize; - final int maxQueueSize; - final int frameSize; - final TimeValue targetedResponseTime; - - AutoExecutorSettings( - final String nodeName, - final int size, - final int initialQueueSize, - final int minQueueSize, - final int maxQueueSize, - final int frameSize, - final TimeValue targetedResponseTime - ) { - super(nodeName); - this.size = size; - this.initialQueueSize = initialQueueSize; - this.minQueueSize = minQueueSize; - this.maxQueueSize = maxQueueSize; - this.frameSize = frameSize; - this.targetedResponseTime = targetedResponseTime; - } - - } - -} diff --git a/server/src/test/java/org/opensearch/threadpool/AutoQueueAdjustingExecutorBuilderTests.java b/server/src/test/java/org/opensearch/threadpool/AutoQueueAdjustingExecutorBuilderTests.java deleted file mode 100644 index 5071a07032b..00000000000 --- a/server/src/test/java/org/opensearch/threadpool/AutoQueueAdjustingExecutorBuilderTests.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.threadpool; - -import org.opensearch.common.settings.Settings; - -import static org.hamcrest.CoreMatchers.containsString; - -public class AutoQueueAdjustingExecutorBuilderTests extends OpenSearchThreadPoolTestCase { - - public void testValidatingMinMaxSettings() { - Settings settings = Settings.builder() - .put("thread_pool.test.min_queue_size", randomIntBetween(30, 100)) - .put("thread_pool.test.max_queue_size", randomIntBetween(1, 25)) - .build(); - try { - new AutoQueueAdjustingExecutorBuilder(settings, "test", 1, 15, 1, 100, 10); - fail("should have thrown an exception"); - } catch (IllegalArgumentException e) { - assertThat(e.getMessage(), containsString("Failed to parse value")); - } - - settings = Settings.builder().put("thread_pool.test.min_queue_size", 10).put("thread_pool.test.max_queue_size", 9).build(); - try { - new AutoQueueAdjustingExecutorBuilder(settings, "test", 1, 15, 1, 100, 2000).getSettings(settings); - fail("should have thrown an exception"); - } catch (IllegalArgumentException e) { - assertEquals(e.getMessage(), "Failed to parse value [10] for setting [thread_pool.test.min_queue_size] must be <= 9"); - } - - settings = Settings.builder().put("thread_pool.test.min_queue_size", 11).put("thread_pool.test.max_queue_size", 10).build(); - try { - new AutoQueueAdjustingExecutorBuilder(settings, "test", 1, 15, 1, 100, 2000).getSettings(settings); - fail("should have thrown an exception"); - } catch (IllegalArgumentException e) { - assertEquals(e.getMessage(), "Failed to parse value [11] for setting [thread_pool.test.min_queue_size] must be <= 10"); - } - - settings = Settings.builder().put("thread_pool.test.min_queue_size", 101).build(); - try { - new AutoQueueAdjustingExecutorBuilder(settings, "test", 1, 15, 100, 100, 2000).getSettings(settings); - fail("should have thrown an exception"); - } catch (IllegalArgumentException e) { - assertEquals(e.getMessage(), "Failed to parse value [101] for setting [thread_pool.test.min_queue_size] must be <= 100"); - } - - settings = Settings.builder().put("thread_pool.test.max_queue_size", 99).build(); - try { - new AutoQueueAdjustingExecutorBuilder(settings, "test", 1, 15, 100, 100, 2000).getSettings(settings); - fail("should have thrown an exception"); - } catch (IllegalArgumentException e) { - assertEquals(e.getMessage(), "Failed to parse value [100] for setting [thread_pool.test.min_queue_size] must be <= 99"); - } - - assertSettingDeprecationsAndWarnings(new String[] { "thread_pool.test.max_queue_size" }); - } - - public void testSetLowerSettings() { - Settings settings = Settings.builder() - .put("thread_pool.test.min_queue_size", 10) - .put("thread_pool.test.max_queue_size", 10) - .build(); - AutoQueueAdjustingExecutorBuilder test = new AutoQueueAdjustingExecutorBuilder(settings, "test", 1, 1000, 1000, 1000, 2000); - AutoQueueAdjustingExecutorBuilder.AutoExecutorSettings s = test.getSettings(settings); - assertEquals(10, s.maxQueueSize); - assertEquals(10, s.minQueueSize); - - assertSettingDeprecationsAndWarnings(new String[] { "thread_pool.test.min_queue_size", "thread_pool.test.max_queue_size" }); - } - - public void testSetHigherSettings() { - Settings settings = Settings.builder() - .put("thread_pool.test.min_queue_size", 2000) - .put("thread_pool.test.max_queue_size", 3000) - .build(); - AutoQueueAdjustingExecutorBuilder test = new AutoQueueAdjustingExecutorBuilder(settings, "test", 1, 1000, 1000, 1000, 2000); - AutoQueueAdjustingExecutorBuilder.AutoExecutorSettings s = test.getSettings(settings); - assertEquals(3000, s.maxQueueSize); - assertEquals(2000, s.minQueueSize); - - assertSettingDeprecationsAndWarnings(new String[] { "thread_pool.test.min_queue_size", "thread_pool.test.max_queue_size" }); - } - -}