diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/EWMATrackingThreadPoolExecutor.java b/server/src/main/java/org/opensearch/common/util/concurrent/EWMATrackingThreadPoolExecutor.java new file mode 100644 index 00000000000..ab65c68dadc --- /dev/null +++ b/server/src/main/java/org/opensearch/common/util/concurrent/EWMATrackingThreadPoolExecutor.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.util.concurrent; + +/** + * Marks the thread pool executor as supporting EWMA (exponential weighted moving average) tracking + * + * @opensearch.internal + */ +public interface EWMATrackingThreadPoolExecutor { + /** + * This is a random starting point alpha + */ + double EWMA_ALPHA = 0.3; + + /** + * Returns the exponentially weighted moving average of the task execution time + */ + double getTaskExecutionEWMA(); + + /** + * Returns the current queue size (operations that are queued) + */ + int getCurrentQueueSize(); +} diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchExecutors.java b/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchExecutors.java index 094a356151b..14f9486b4ba 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchExecutors.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchExecutors.java @@ -177,6 +177,32 @@ public class OpenSearchExecutors { ); } + public static OpenSearchThreadPoolExecutor newResizable( + String name, + int size, + int queueCapacity, + ThreadFactory threadFactory, + ThreadContext contextHolder + ) { + + if (queueCapacity <= 0) { + throw new IllegalArgumentException("queue capacity for [" + name + "] executor must be positive, got: " + queueCapacity); + } + + return new QueueResizableOpenSearchThreadPoolExecutor( + name, + size, + size, + 0, + TimeUnit.MILLISECONDS, + new ResizableBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), queueCapacity), + TimedRunnable::new, + threadFactory, + new OpenSearchAbortPolicy(), + contextHolder + ); + } + /** * Return a new executor that will automatically adjust the queue size based on queue throughput. * diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizableOpenSearchThreadPoolExecutor.java b/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizableOpenSearchThreadPoolExecutor.java new file mode 100644 index 00000000000..7a0ce8244ef --- /dev/null +++ b/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizableOpenSearchThreadPoolExecutor.java @@ -0,0 +1,176 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.util.concurrent; + +import org.opensearch.common.ExponentiallyWeightedMovingAverage; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +/** + * An extension to thread pool executor, which allows to adjusts the queue size of the + * {@code ResizableBlockingQueue} and tracks EWMA. + * + * @opensearch.internal + */ +public final class QueueResizableOpenSearchThreadPoolExecutor extends OpenSearchThreadPoolExecutor + implements + EWMATrackingThreadPoolExecutor { + + private final ResizableBlockingQueue workQueue; + private final Function runnableWrapper; + private final ExponentiallyWeightedMovingAverage executionEWMA; + + /** + * Create new resizable at runtime thread pool executor + * @param name thread pool name + * @param corePoolSize core pool size + * @param maximumPoolSize maximum pool size + * @param keepAliveTime keep alive time + * @param unit time unit for keep alive time + * @param workQueue work queue + * @param runnableWrapper runnable wrapper + * @param threadFactory thread factory + * @param handler rejected execution handler + * @param contextHolder context holder + */ + QueueResizableOpenSearchThreadPoolExecutor( + String name, + int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + ResizableBlockingQueue workQueue, + Function runnableWrapper, + ThreadFactory threadFactory, + XRejectedExecutionHandler handler, + ThreadContext contextHolder + ) { + this( + name, + corePoolSize, + maximumPoolSize, + keepAliveTime, + unit, + workQueue, + runnableWrapper, + threadFactory, + handler, + contextHolder, + EWMA_ALPHA + ); + } + + /** + * Create new resizable at runtime thread pool executor + * @param name thread pool name + * @param corePoolSize core pool size + * @param maximumPoolSize maximum pool size + * @param keepAliveTime keep alive time + * @param unit time unit for keep alive time + * @param workQueue work queue + * @param runnableWrapper runnable wrapper + * @param threadFactory thread factory + * @param handler rejected execution handler + * @param contextHolder context holder + * @param ewmaAlpha the alpha parameter for exponentially weighted moving average (a smaller alpha means + * that new data points will have less weight, where a high alpha means older data points will + * have a lower influence) + */ + QueueResizableOpenSearchThreadPoolExecutor( + String name, + int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + ResizableBlockingQueue workQueue, + Function runnableWrapper, + ThreadFactory threadFactory, + XRejectedExecutionHandler handler, + ThreadContext contextHolder, + double ewmaAlpha + ) { + super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler, contextHolder); + this.workQueue = workQueue; + this.runnableWrapper = runnableWrapper; + this.executionEWMA = new ExponentiallyWeightedMovingAverage(ewmaAlpha, 0); + } + + @Override + protected Runnable wrapRunnable(Runnable command) { + return super.wrapRunnable(this.runnableWrapper.apply(command)); + } + + @Override + protected Runnable unwrap(Runnable runnable) { + final Runnable unwrapped = super.unwrap(runnable); + if (unwrapped instanceof WrappedRunnable) { + return ((WrappedRunnable) unwrapped).unwrap(); + } else { + return unwrapped; + } + } + + /** + * Returns the exponentially weighted moving average of the task execution time + */ + @Override + public double getTaskExecutionEWMA() { + return executionEWMA.getAverage(); + } + + /** + * Returns the current queue size (operations that are queued) + */ + @Override + public int getCurrentQueueSize() { + return workQueue.size(); + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + // A task has been completed, it has left the building. We should now be able to get the + // total time as a combination of the time in the queue and time spent running the task. We + // only want runnables that did not throw errors though, because they could be fast-failures + // that throw off our timings, so only check when t is null. + assert super.unwrap(r) instanceof TimedRunnable : "expected only TimedRunnables in queue"; + final TimedRunnable timedRunnable = (TimedRunnable) super.unwrap(r); + final boolean failedOrRejected = timedRunnable.getFailedOrRejected(); + + final long taskExecutionNanos = timedRunnable.getTotalExecutionNanos(); + assert taskExecutionNanos >= 0 || (failedOrRejected && taskExecutionNanos == -1) + : "expected task to always take longer than 0 nanoseconds or have '-1' failure code, got: " + + taskExecutionNanos + + ", failedOrRejected: " + + failedOrRejected; + + if (taskExecutionNanos != -1) { + // taskExecutionNanos may be -1 if the task threw an exception + executionEWMA.addValue(taskExecutionNanos); + } + } + + /** + * Resizes the work queue capacity of the pool + * @param capacity the new capacity + */ + public synchronized int resize(int capacity) { + final ResizableBlockingQueue resizableWorkQueue = (ResizableBlockingQueue) workQueue; + final int currentCapacity = resizableWorkQueue.capacity(); + // Reusing adjustCapacity method instead of introducing the new one + return resizableWorkQueue.adjustCapacity( + currentCapacity < capacity ? capacity + 1 : capacity - 1, + StrictMath.abs(capacity - currentCapacity), + capacity, + capacity + ); + } +} diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutor.java b/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutor.java index 94306906d8c..336c605e1a5 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutor.java @@ -51,10 +51,9 @@ import java.util.function.Function; * * @opensearch.internal */ -public final class QueueResizingOpenSearchThreadPoolExecutor extends OpenSearchThreadPoolExecutor { - - // This is a random starting point alpha. TODO: revisit this with actual testing and/or make it configurable - public static double EWMA_ALPHA = 0.3; +public final class QueueResizingOpenSearchThreadPoolExecutor extends OpenSearchThreadPoolExecutor + implements + EWMATrackingThreadPoolExecutor { private static final Logger logger = LogManager.getLogger(QueueResizingOpenSearchThreadPoolExecutor.class); // The amount the queue size is adjusted by for each calcuation @@ -155,6 +154,7 @@ public final class QueueResizingOpenSearchThreadPoolExecutor extends OpenSearchT /** * Returns the exponentially weighted moving average of the task execution time */ + @Override public double getTaskExecutionEWMA() { return executionEWMA.getAverage(); } @@ -162,6 +162,7 @@ public final class QueueResizingOpenSearchThreadPoolExecutor extends OpenSearchT /** * Returns the current queue size (operations that are queued) */ + @Override public int getCurrentQueueSize() { return workQueue.size(); } diff --git a/server/src/main/java/org/opensearch/search/query/QueryPhase.java b/server/src/main/java/org/opensearch/search/query/QueryPhase.java index 340a32e914e..8de44a74480 100644 --- a/server/src/main/java/org/opensearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/opensearch/search/query/QueryPhase.java @@ -53,7 +53,7 @@ import org.opensearch.action.search.SearchShardTask; import org.opensearch.common.Booleans; import org.opensearch.common.lucene.Lucene; import org.opensearch.common.lucene.search.TopDocsAndMaxScore; -import org.opensearch.common.util.concurrent.QueueResizingOpenSearchThreadPoolExecutor; +import org.opensearch.common.util.concurrent.EWMATrackingThreadPoolExecutor; import org.opensearch.search.DocValueFormat; import org.opensearch.search.SearchContextSourcePrinter; import org.opensearch.search.SearchService; @@ -290,8 +290,8 @@ public class QueryPhase { ); ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH); - if (executor instanceof QueueResizingOpenSearchThreadPoolExecutor) { - QueueResizingOpenSearchThreadPoolExecutor rExecutor = (QueueResizingOpenSearchThreadPoolExecutor) executor; + if (executor instanceof EWMATrackingThreadPoolExecutor) { + final EWMATrackingThreadPoolExecutor rExecutor = (EWMATrackingThreadPoolExecutor) executor; queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize()); queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA()); } diff --git a/server/src/main/java/org/opensearch/threadpool/ResizableExecutorBuilder.java b/server/src/main/java/org/opensearch/threadpool/ResizableExecutorBuilder.java new file mode 100644 index 00000000000..fd9ca1d3b5f --- /dev/null +++ b/server/src/main/java/org/opensearch/threadpool/ResizableExecutorBuilder.java @@ -0,0 +1,116 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.threadpool; + +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.SizeValue; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.node.Node; + +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; + +/** + * A builder for resizable executors. + * + * @opensearch.internal + */ +public final class ResizableExecutorBuilder extends ExecutorBuilder { + + private final Setting sizeSetting; + private final Setting queueSizeSetting; + + ResizableExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize) { + this(settings, name, size, queueSize, "thread_pool." + name); + } + + public ResizableExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final String prefix) { + super(name); + final String sizeKey = settingsKey(prefix, "size"); + this.sizeSetting = new Setting<>( + sizeKey, + s -> Integer.toString(size), + s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey), + Setting.Property.NodeScope + ); + final String queueSizeKey = settingsKey(prefix, "queue_size"); + this.queueSizeSetting = Setting.intSetting( + queueSizeKey, + queueSize, + new Setting.Property[] { Setting.Property.NodeScope, Setting.Property.Dynamic } + ); + } + + @Override + public List> getRegisteredSettings() { + return Arrays.asList(sizeSetting, queueSizeSetting); + } + + @Override + ResizableExecutorSettings getSettings(Settings settings) { + final String nodeName = Node.NODE_NAME_SETTING.get(settings); + final int size = sizeSetting.get(settings); + final int queueSize = queueSizeSetting.get(settings); + return new ResizableExecutorSettings(nodeName, size, queueSize); + } + + @Override + ThreadPool.ExecutorHolder build(final ResizableExecutorSettings settings, final ThreadContext threadContext) { + int size = settings.size; + int queueSize = settings.queueSize; + final ThreadFactory threadFactory = OpenSearchExecutors.daemonThreadFactory( + OpenSearchExecutors.threadName(settings.nodeName, name()) + ); + final ExecutorService executor = OpenSearchExecutors.newResizable( + settings.nodeName + "/" + name(), + size, + queueSize, + threadFactory, + threadContext + ); + final ThreadPool.Info info = new ThreadPool.Info( + name(), + ThreadPool.ThreadPoolType.RESIZABLE, + size, + size, + null, + queueSize < 0 ? null : new SizeValue(queueSize) + ); + return new ThreadPool.ExecutorHolder(executor, info); + } + + @Override + String formatInfo(ThreadPool.Info info) { + return String.format( + Locale.ROOT, + "name [%s], size [%d], queue size [%s]", + info.getName(), + info.getMax(), + info.getQueueSize() == null ? "unbounded" : info.getQueueSize() + ); + } + + static class ResizableExecutorSettings extends ExecutorBuilder.ExecutorSettings { + + private final int size; + private final int queueSize; + + ResizableExecutorSettings(final String nodeName, final int size, final int queueSize) { + super(nodeName); + this.size = size; + this.queueSize = queueSize; + } + + } +} diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index fb62c1243db..77682a7946c 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -35,6 +35,7 @@ package org.opensearch.threadpool; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.Version; import org.opensearch.common.Nullable; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; @@ -116,6 +117,7 @@ public class ThreadPool implements ReportingService, Scheduler { public enum ThreadPoolType { DIRECT("direct"), FIXED("fixed"), + RESIZABLE("resizable"), FIXED_AUTO_QUEUE_SIZE("fixed_auto_queue_size"), SCALING("scaling"); @@ -158,7 +160,7 @@ public class ThreadPool implements ReportingService, Scheduler { map.put(Names.GET, ThreadPoolType.FIXED); map.put(Names.ANALYZE, ThreadPoolType.FIXED); map.put(Names.WRITE, ThreadPoolType.FIXED); - map.put(Names.SEARCH, ThreadPoolType.FIXED_AUTO_QUEUE_SIZE); + map.put(Names.SEARCH, ThreadPoolType.RESIZABLE); map.put(Names.MANAGEMENT, ThreadPoolType.SCALING); map.put(Names.FLUSH, ThreadPoolType.SCALING); map.put(Names.REFRESH, ThreadPoolType.SCALING); @@ -167,7 +169,7 @@ public class ThreadPool implements ReportingService, Scheduler { map.put(Names.FORCE_MERGE, ThreadPoolType.FIXED); map.put(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING); map.put(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING); - map.put(Names.SEARCH_THROTTLED, ThreadPoolType.FIXED_AUTO_QUEUE_SIZE); + map.put(Names.SEARCH_THROTTLED, ThreadPoolType.RESIZABLE); map.put(Names.SYSTEM_READ, ThreadPoolType.FIXED); map.put(Names.SYSTEM_WRITE, ThreadPoolType.FIXED); THREAD_POOL_TYPES = Collections.unmodifiableMap(map); @@ -210,14 +212,8 @@ public class ThreadPool implements ReportingService, Scheduler { builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, allocatedProcessors, 10000)); builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, allocatedProcessors, 1000)); builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16)); - builders.put( - Names.SEARCH, - new AutoQueueAdjustingExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(allocatedProcessors), 1000, 1000, 1000, 2000) - ); - builders.put( - Names.SEARCH_THROTTLED, - new AutoQueueAdjustingExecutorBuilder(settings, Names.SEARCH_THROTTLED, 1, 100, 100, 100, 200) - ); + builders.put(Names.SEARCH, new ResizableExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(allocatedProcessors), 1000)); + builders.put(Names.SEARCH_THROTTLED, new ResizableExecutorBuilder(settings, Names.SEARCH_THROTTLED, 1, 100)); builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5))); // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded // the assumption here is that the listeners should be very lightweight on the listeners side @@ -710,7 +706,13 @@ public class ThreadPool implements ReportingService, Scheduler { @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(name); - out.writeString(type.getType()); + if (type == ThreadPoolType.RESIZABLE && out.getVersion().before(Version.V_3_0_0)) { + // Opensearch on older version doesn't know about "resizable" thread pool. Convert RESIZABLE to FIXED + // to avoid serialization/de-serization issue between nodes with different OpenSearch version + out.writeString(ThreadPoolType.FIXED.getType()); + } else { + out.writeString(type.getType()); + } out.writeInt(min); out.writeInt(max); out.writeOptionalTimeValue(keepAlive); diff --git a/server/src/test/java/org/opensearch/common/util/concurrent/QueueResizableOpenSearchThreadPoolExecutorTests.java b/server/src/test/java/org/opensearch/common/util/concurrent/QueueResizableOpenSearchThreadPoolExecutorTests.java new file mode 100644 index 00000000000..1fa3e3b4c03 --- /dev/null +++ b/server/src/test/java/org/opensearch/common/util/concurrent/QueueResizableOpenSearchThreadPoolExecutorTests.java @@ -0,0 +1,226 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.util.concurrent; + +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +/** + * Tests for the automatic queue resizing of the {@code QueueResizableOpenSearchThreadPoolExecutor} + * based on the time taken for each event. + */ +public class QueueResizableOpenSearchThreadPoolExecutorTests extends OpenSearchTestCase { + public void testResizeQueueSameSize() throws Exception { + ThreadContext context = new ThreadContext(Settings.EMPTY); + ResizableBlockingQueue queue = new ResizableBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), 2000); + + int threads = randomIntBetween(1, 10); + int measureWindow = randomIntBetween(100, 200); + logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow); + QueueResizableOpenSearchThreadPoolExecutor executor = new QueueResizableOpenSearchThreadPoolExecutor( + "test-threadpool", + threads, + threads, + 1000, + TimeUnit.MILLISECONDS, + queue, + fastWrapper(), + OpenSearchExecutors.daemonThreadFactory("queuetest"), + new OpenSearchAbortPolicy(), + context + ); + executor.prestartAllCoreThreads(); + logger.info("--> executor: {}", executor); + + // Execute a task multiple times that takes 1ms + assertThat(executor.resize(1000), equalTo(1000)); + executeTask(executor, (measureWindow * 5) + 2); + + assertBusy(() -> { assertThat(queue.capacity(), lessThanOrEqualTo(1000)); }); + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + } + + public void testResizeQueueUp() throws Exception { + ThreadContext context = new ThreadContext(Settings.EMPTY); + ResizableBlockingQueue queue = new ResizableBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), 2000); + + int threads = randomIntBetween(1, 10); + int measureWindow = randomIntBetween(100, 200); + logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow); + QueueResizableOpenSearchThreadPoolExecutor executor = new QueueResizableOpenSearchThreadPoolExecutor( + "test-threadpool", + threads, + threads, + 1000, + TimeUnit.MILLISECONDS, + queue, + fastWrapper(), + OpenSearchExecutors.daemonThreadFactory("queuetest"), + new OpenSearchAbortPolicy(), + context + ); + executor.prestartAllCoreThreads(); + logger.info("--> executor: {}", executor); + + // Execute a task multiple times that takes 1ms + assertThat(executor.resize(3000), equalTo(3000)); + executeTask(executor, (measureWindow * 5) + 2); + + assertBusy(() -> { assertThat(queue.capacity(), greaterThanOrEqualTo(2000)); }); + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + } + + public void testResizeQueueDown() throws Exception { + ThreadContext context = new ThreadContext(Settings.EMPTY); + ResizableBlockingQueue queue = new ResizableBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), 2000); + + int threads = randomIntBetween(1, 10); + int measureWindow = randomIntBetween(100, 200); + logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow); + QueueResizableOpenSearchThreadPoolExecutor executor = new QueueResizableOpenSearchThreadPoolExecutor( + "test-threadpool", + threads, + threads, + 1000, + TimeUnit.MILLISECONDS, + queue, + fastWrapper(), + OpenSearchExecutors.daemonThreadFactory("queuetest"), + new OpenSearchAbortPolicy(), + context + ); + executor.prestartAllCoreThreads(); + logger.info("--> executor: {}", executor); + + // Execute a task multiple times that takes 1ms + assertThat(executor.resize(900), equalTo(900)); + executeTask(executor, (measureWindow * 5) + 2); + + assertBusy(() -> { assertThat(queue.capacity(), lessThanOrEqualTo(900)); }); + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + } + + public void testExecutionEWMACalculation() throws Exception { + ThreadContext context = new ThreadContext(Settings.EMPTY); + ResizableBlockingQueue queue = new ResizableBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), 100); + + QueueResizableOpenSearchThreadPoolExecutor executor = new QueueResizableOpenSearchThreadPoolExecutor( + "test-threadpool", + 1, + 1, + 1000, + TimeUnit.MILLISECONDS, + queue, + fastWrapper(), + OpenSearchExecutors.daemonThreadFactory("queuetest"), + new OpenSearchAbortPolicy(), + context + ); + executor.prestartAllCoreThreads(); + logger.info("--> executor: {}", executor); + + assertThat((long) executor.getTaskExecutionEWMA(), equalTo(0L)); + executeTask(executor, 1); + assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(30L)); }); + executeTask(executor, 1); + assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(51L)); }); + executeTask(executor, 1); + assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(65L)); }); + executeTask(executor, 1); + assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(75L)); }); + executeTask(executor, 1); + assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(83L)); }); + + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + } + + /** Use a runnable wrapper that simulates a task with unknown failures. */ + public void testExceptionThrowingTask() throws Exception { + ThreadContext context = new ThreadContext(Settings.EMPTY); + ResizableBlockingQueue queue = new ResizableBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), 100); + + QueueResizableOpenSearchThreadPoolExecutor executor = new QueueResizableOpenSearchThreadPoolExecutor( + "test-threadpool", + 1, + 1, + 1000, + TimeUnit.MILLISECONDS, + queue, + exceptionalWrapper(), + OpenSearchExecutors.daemonThreadFactory("queuetest"), + new OpenSearchAbortPolicy(), + context + ); + executor.prestartAllCoreThreads(); + logger.info("--> executor: {}", executor); + + assertThat((long) executor.getTaskExecutionEWMA(), equalTo(0L)); + executeTask(executor, 1); + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + } + + private Function fastWrapper() { + return (runnable) -> new SettableTimedRunnable(TimeUnit.NANOSECONDS.toNanos(100), false); + } + + /** + * The returned function outputs a WrappedRunnabled that simulates the case + * where {@link TimedRunnable#getTotalExecutionNanos()} returns -1 because + * the job failed or was rejected before it finished. + */ + private Function exceptionalWrapper() { + return (runnable) -> new SettableTimedRunnable(TimeUnit.NANOSECONDS.toNanos(-1), true); + } + + /** Execute a blank task {@code times} times for the executor */ + private void executeTask(QueueResizableOpenSearchThreadPoolExecutor executor, int times) { + logger.info("--> executing a task [{}] times", times); + for (int i = 0; i < times; i++) { + executor.execute(() -> {}); + } + } + + public class SettableTimedRunnable extends TimedRunnable { + private final long timeTaken; + private final boolean testFailedOrRejected; + + public SettableTimedRunnable(long timeTaken, boolean failedOrRejected) { + super(() -> {}); + this.timeTaken = timeTaken; + this.testFailedOrRejected = failedOrRejected; + } + + @Override + public long getTotalNanos() { + return timeTaken; + } + + @Override + public long getTotalExecutionNanos() { + return timeTaken; + } + + @Override + public boolean getFailedOrRejected() { + return testFailedOrRejected; + } + } +} diff --git a/server/src/test/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutorTests.java b/server/src/test/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutorTests.java index f2bbfe91525..3d8faeeee3e 100644 --- a/server/src/test/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutorTests.java +++ b/server/src/test/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutorTests.java @@ -44,7 +44,7 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.lessThan; /** - * Tests for the automatic queue resizing of the {@code QueueResizingOpenSearchThreadPoolExecutorTests} + * Tests for the automatic queue resizing of the {@code QueueResizingOpenSearchThreadPoolExecutor} * based on the time taken for each event. */ public class QueueResizingOpenSearchThreadPoolExecutorTests extends OpenSearchTestCase { diff --git a/server/src/test/java/org/opensearch/threadpool/ThreadPoolSerializationTests.java b/server/src/test/java/org/opensearch/threadpool/ThreadPoolSerializationTests.java index 6623cb0e188..80434519b0d 100644 --- a/server/src/test/java/org/opensearch/threadpool/ThreadPoolSerializationTests.java +++ b/server/src/test/java/org/opensearch/threadpool/ThreadPoolSerializationTests.java @@ -141,6 +141,9 @@ public class ThreadPoolSerializationTests extends OpenSearchTestCase { StreamInput input = output.bytes().streamInput(); ThreadPool.Info newInfo = new ThreadPool.Info(input); + /* The SerDe patch converts RESIZABLE threadpool type value to FIXED. Implementing + * the same conversion in test to maintain parity. + */ assertThat(newInfo.getThreadPoolType(), is(threadPoolType)); } } diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index f36f4dedfdf..1af6b03ff24 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -1895,12 +1895,6 @@ public abstract class OpenSearchIntegTestCase extends OpenSearchTestCase { .put(SearchService.LOW_LEVEL_CANCELLATION_SETTING.getKey(), randomBoolean()) .putList(DISCOVERY_SEED_HOSTS_SETTING.getKey()) // empty list disables a port scan for other nodes .putList(DISCOVERY_SEED_PROVIDERS_SETTING.getKey(), "file"); - if (rarely()) { - // Sometimes adjust the minimum search thread pool size, causing - // QueueResizingOpenSearchThreadPoolExecutor to be used instead of a regular - // fixed thread pool - builder.put("thread_pool.search.min_queue_size", 100); - } return builder.build(); }