Add resizable search queue to OpenSearch (picking up #826) (#3207)

* Make search/write queue resizable

Signed-off-by: Ruizhen <ruizhen@amazon.com>

* Address PR comments

Signed-off-by: Ruizhen <ruizhen@amazon.com>

* Refactoring resizable queue implementation

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>

* Addressing code review comments

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>

* Addressing code review comments

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>

* Addressing code review comments

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>

Co-authored-by: Ruizhen <ruizhen@amazon.com>
This commit is contained in:
Andriy Redko 2022-05-16 08:37:41 -04:00 committed by GitHub
parent e074395452
commit 38fb1d9275
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 600 additions and 25 deletions

View File

@ -0,0 +1,31 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.common.util.concurrent;
/**
* Marks the thread pool executor as supporting EWMA (exponential weighted moving average) tracking
*
* @opensearch.internal
*/
public interface EWMATrackingThreadPoolExecutor {
/**
* This is a random starting point alpha
*/
double EWMA_ALPHA = 0.3;
/**
* Returns the exponentially weighted moving average of the task execution time
*/
double getTaskExecutionEWMA();
/**
* Returns the current queue size (operations that are queued)
*/
int getCurrentQueueSize();
}

View File

@ -177,6 +177,32 @@ public class OpenSearchExecutors {
);
}
public static OpenSearchThreadPoolExecutor newResizable(
String name,
int size,
int queueCapacity,
ThreadFactory threadFactory,
ThreadContext contextHolder
) {
if (queueCapacity <= 0) {
throw new IllegalArgumentException("queue capacity for [" + name + "] executor must be positive, got: " + queueCapacity);
}
return new QueueResizableOpenSearchThreadPoolExecutor(
name,
size,
size,
0,
TimeUnit.MILLISECONDS,
new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), queueCapacity),
TimedRunnable::new,
threadFactory,
new OpenSearchAbortPolicy(),
contextHolder
);
}
/**
* Return a new executor that will automatically adjust the queue size based on queue throughput.
*

View File

@ -0,0 +1,176 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.common.util.concurrent;
import org.opensearch.common.ExponentiallyWeightedMovingAverage;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
/**
* An extension to thread pool executor, which allows to adjusts the queue size of the
* {@code ResizableBlockingQueue} and tracks EWMA.
*
* @opensearch.internal
*/
public final class QueueResizableOpenSearchThreadPoolExecutor extends OpenSearchThreadPoolExecutor
implements
EWMATrackingThreadPoolExecutor {
private final ResizableBlockingQueue<Runnable> workQueue;
private final Function<Runnable, WrappedRunnable> runnableWrapper;
private final ExponentiallyWeightedMovingAverage executionEWMA;
/**
* Create new resizable at runtime thread pool executor
* @param name thread pool name
* @param corePoolSize core pool size
* @param maximumPoolSize maximum pool size
* @param keepAliveTime keep alive time
* @param unit time unit for keep alive time
* @param workQueue work queue
* @param runnableWrapper runnable wrapper
* @param threadFactory thread factory
* @param handler rejected execution handler
* @param contextHolder context holder
*/
QueueResizableOpenSearchThreadPoolExecutor(
String name,
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
ResizableBlockingQueue<Runnable> workQueue,
Function<Runnable, WrappedRunnable> runnableWrapper,
ThreadFactory threadFactory,
XRejectedExecutionHandler handler,
ThreadContext contextHolder
) {
this(
name,
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
runnableWrapper,
threadFactory,
handler,
contextHolder,
EWMA_ALPHA
);
}
/**
* Create new resizable at runtime thread pool executor
* @param name thread pool name
* @param corePoolSize core pool size
* @param maximumPoolSize maximum pool size
* @param keepAliveTime keep alive time
* @param unit time unit for keep alive time
* @param workQueue work queue
* @param runnableWrapper runnable wrapper
* @param threadFactory thread factory
* @param handler rejected execution handler
* @param contextHolder context holder
* @param ewmaAlpha the alpha parameter for exponentially weighted moving average (a smaller alpha means
* that new data points will have less weight, where a high alpha means older data points will
* have a lower influence)
*/
QueueResizableOpenSearchThreadPoolExecutor(
String name,
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
ResizableBlockingQueue<Runnable> workQueue,
Function<Runnable, WrappedRunnable> runnableWrapper,
ThreadFactory threadFactory,
XRejectedExecutionHandler handler,
ThreadContext contextHolder,
double ewmaAlpha
) {
super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler, contextHolder);
this.workQueue = workQueue;
this.runnableWrapper = runnableWrapper;
this.executionEWMA = new ExponentiallyWeightedMovingAverage(ewmaAlpha, 0);
}
@Override
protected Runnable wrapRunnable(Runnable command) {
return super.wrapRunnable(this.runnableWrapper.apply(command));
}
@Override
protected Runnable unwrap(Runnable runnable) {
final Runnable unwrapped = super.unwrap(runnable);
if (unwrapped instanceof WrappedRunnable) {
return ((WrappedRunnable) unwrapped).unwrap();
} else {
return unwrapped;
}
}
/**
* Returns the exponentially weighted moving average of the task execution time
*/
@Override
public double getTaskExecutionEWMA() {
return executionEWMA.getAverage();
}
/**
* Returns the current queue size (operations that are queued)
*/
@Override
public int getCurrentQueueSize() {
return workQueue.size();
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
// A task has been completed, it has left the building. We should now be able to get the
// total time as a combination of the time in the queue and time spent running the task. We
// only want runnables that did not throw errors though, because they could be fast-failures
// that throw off our timings, so only check when t is null.
assert super.unwrap(r) instanceof TimedRunnable : "expected only TimedRunnables in queue";
final TimedRunnable timedRunnable = (TimedRunnable) super.unwrap(r);
final boolean failedOrRejected = timedRunnable.getFailedOrRejected();
final long taskExecutionNanos = timedRunnable.getTotalExecutionNanos();
assert taskExecutionNanos >= 0 || (failedOrRejected && taskExecutionNanos == -1)
: "expected task to always take longer than 0 nanoseconds or have '-1' failure code, got: "
+ taskExecutionNanos
+ ", failedOrRejected: "
+ failedOrRejected;
if (taskExecutionNanos != -1) {
// taskExecutionNanos may be -1 if the task threw an exception
executionEWMA.addValue(taskExecutionNanos);
}
}
/**
* Resizes the work queue capacity of the pool
* @param capacity the new capacity
*/
public synchronized int resize(int capacity) {
final ResizableBlockingQueue<Runnable> resizableWorkQueue = (ResizableBlockingQueue<Runnable>) workQueue;
final int currentCapacity = resizableWorkQueue.capacity();
// Reusing adjustCapacity method instead of introducing the new one
return resizableWorkQueue.adjustCapacity(
currentCapacity < capacity ? capacity + 1 : capacity - 1,
StrictMath.abs(capacity - currentCapacity),
capacity,
capacity
);
}
}

View File

@ -51,10 +51,9 @@ import java.util.function.Function;
*
* @opensearch.internal
*/
public final class QueueResizingOpenSearchThreadPoolExecutor extends OpenSearchThreadPoolExecutor {
// This is a random starting point alpha. TODO: revisit this with actual testing and/or make it configurable
public static double EWMA_ALPHA = 0.3;
public final class QueueResizingOpenSearchThreadPoolExecutor extends OpenSearchThreadPoolExecutor
implements
EWMATrackingThreadPoolExecutor {
private static final Logger logger = LogManager.getLogger(QueueResizingOpenSearchThreadPoolExecutor.class);
// The amount the queue size is adjusted by for each calcuation
@ -155,6 +154,7 @@ public final class QueueResizingOpenSearchThreadPoolExecutor extends OpenSearchT
/**
* Returns the exponentially weighted moving average of the task execution time
*/
@Override
public double getTaskExecutionEWMA() {
return executionEWMA.getAverage();
}
@ -162,6 +162,7 @@ public final class QueueResizingOpenSearchThreadPoolExecutor extends OpenSearchT
/**
* Returns the current queue size (operations that are queued)
*/
@Override
public int getCurrentQueueSize() {
return workQueue.size();
}

View File

@ -53,7 +53,7 @@ import org.opensearch.action.search.SearchShardTask;
import org.opensearch.common.Booleans;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
import org.opensearch.common.util.concurrent.QueueResizingOpenSearchThreadPoolExecutor;
import org.opensearch.common.util.concurrent.EWMATrackingThreadPoolExecutor;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.SearchContextSourcePrinter;
import org.opensearch.search.SearchService;
@ -290,8 +290,8 @@ public class QueryPhase {
);
ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);
if (executor instanceof QueueResizingOpenSearchThreadPoolExecutor) {
QueueResizingOpenSearchThreadPoolExecutor rExecutor = (QueueResizingOpenSearchThreadPoolExecutor) executor;
if (executor instanceof EWMATrackingThreadPoolExecutor) {
final EWMATrackingThreadPoolExecutor rExecutor = (EWMATrackingThreadPoolExecutor) executor;
queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize());
queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA());
}

View File

@ -0,0 +1,116 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.threadpool;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.SizeValue;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.node.Node;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
/**
* A builder for resizable executors.
*
* @opensearch.internal
*/
public final class ResizableExecutorBuilder extends ExecutorBuilder<ResizableExecutorBuilder.ResizableExecutorSettings> {
private final Setting<Integer> sizeSetting;
private final Setting<Integer> queueSizeSetting;
ResizableExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize) {
this(settings, name, size, queueSize, "thread_pool." + name);
}
public ResizableExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final String prefix) {
super(name);
final String sizeKey = settingsKey(prefix, "size");
this.sizeSetting = new Setting<>(
sizeKey,
s -> Integer.toString(size),
s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
Setting.Property.NodeScope
);
final String queueSizeKey = settingsKey(prefix, "queue_size");
this.queueSizeSetting = Setting.intSetting(
queueSizeKey,
queueSize,
new Setting.Property[] { Setting.Property.NodeScope, Setting.Property.Dynamic }
);
}
@Override
public List<Setting<?>> getRegisteredSettings() {
return Arrays.asList(sizeSetting, queueSizeSetting);
}
@Override
ResizableExecutorSettings getSettings(Settings settings) {
final String nodeName = Node.NODE_NAME_SETTING.get(settings);
final int size = sizeSetting.get(settings);
final int queueSize = queueSizeSetting.get(settings);
return new ResizableExecutorSettings(nodeName, size, queueSize);
}
@Override
ThreadPool.ExecutorHolder build(final ResizableExecutorSettings settings, final ThreadContext threadContext) {
int size = settings.size;
int queueSize = settings.queueSize;
final ThreadFactory threadFactory = OpenSearchExecutors.daemonThreadFactory(
OpenSearchExecutors.threadName(settings.nodeName, name())
);
final ExecutorService executor = OpenSearchExecutors.newResizable(
settings.nodeName + "/" + name(),
size,
queueSize,
threadFactory,
threadContext
);
final ThreadPool.Info info = new ThreadPool.Info(
name(),
ThreadPool.ThreadPoolType.RESIZABLE,
size,
size,
null,
queueSize < 0 ? null : new SizeValue(queueSize)
);
return new ThreadPool.ExecutorHolder(executor, info);
}
@Override
String formatInfo(ThreadPool.Info info) {
return String.format(
Locale.ROOT,
"name [%s], size [%d], queue size [%s]",
info.getName(),
info.getMax(),
info.getQueueSize() == null ? "unbounded" : info.getQueueSize()
);
}
static class ResizableExecutorSettings extends ExecutorBuilder.ExecutorSettings {
private final int size;
private final int queueSize;
ResizableExecutorSettings(final String nodeName, final int size, final int queueSize) {
super(nodeName);
this.size = size;
this.queueSize = queueSize;
}
}
}

View File

@ -35,6 +35,7 @@ package org.opensearch.threadpool;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Version;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
@ -116,6 +117,7 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
public enum ThreadPoolType {
DIRECT("direct"),
FIXED("fixed"),
RESIZABLE("resizable"),
FIXED_AUTO_QUEUE_SIZE("fixed_auto_queue_size"),
SCALING("scaling");
@ -158,7 +160,7 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
map.put(Names.GET, ThreadPoolType.FIXED);
map.put(Names.ANALYZE, ThreadPoolType.FIXED);
map.put(Names.WRITE, ThreadPoolType.FIXED);
map.put(Names.SEARCH, ThreadPoolType.FIXED_AUTO_QUEUE_SIZE);
map.put(Names.SEARCH, ThreadPoolType.RESIZABLE);
map.put(Names.MANAGEMENT, ThreadPoolType.SCALING);
map.put(Names.FLUSH, ThreadPoolType.SCALING);
map.put(Names.REFRESH, ThreadPoolType.SCALING);
@ -167,7 +169,7 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
map.put(Names.FORCE_MERGE, ThreadPoolType.FIXED);
map.put(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING);
map.put(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING);
map.put(Names.SEARCH_THROTTLED, ThreadPoolType.FIXED_AUTO_QUEUE_SIZE);
map.put(Names.SEARCH_THROTTLED, ThreadPoolType.RESIZABLE);
map.put(Names.SYSTEM_READ, ThreadPoolType.FIXED);
map.put(Names.SYSTEM_WRITE, ThreadPoolType.FIXED);
THREAD_POOL_TYPES = Collections.unmodifiableMap(map);
@ -210,14 +212,8 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, allocatedProcessors, 10000));
builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, allocatedProcessors, 1000));
builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16));
builders.put(
Names.SEARCH,
new AutoQueueAdjustingExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(allocatedProcessors), 1000, 1000, 1000, 2000)
);
builders.put(
Names.SEARCH_THROTTLED,
new AutoQueueAdjustingExecutorBuilder(settings, Names.SEARCH_THROTTLED, 1, 100, 100, 100, 200)
);
builders.put(Names.SEARCH, new ResizableExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(allocatedProcessors), 1000));
builders.put(Names.SEARCH_THROTTLED, new ResizableExecutorBuilder(settings, Names.SEARCH_THROTTLED, 1, 100));
builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
// no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded
// the assumption here is that the listeners should be very lightweight on the listeners side
@ -710,7 +706,13 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
if (type == ThreadPoolType.RESIZABLE && out.getVersion().before(Version.V_3_0_0)) {
// Opensearch on older version doesn't know about "resizable" thread pool. Convert RESIZABLE to FIXED
// to avoid serialization/de-serization issue between nodes with different OpenSearch version
out.writeString(ThreadPoolType.FIXED.getType());
} else {
out.writeString(type.getType());
}
out.writeInt(min);
out.writeInt(max);
out.writeOptionalTimeValue(keepAlive);

View File

@ -0,0 +1,226 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.common.util.concurrent;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchTestCase;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
/**
* Tests for the automatic queue resizing of the {@code QueueResizableOpenSearchThreadPoolExecutor}
* based on the time taken for each event.
*/
public class QueueResizableOpenSearchThreadPoolExecutorTests extends OpenSearchTestCase {
public void testResizeQueueSameSize() throws Exception {
ThreadContext context = new ThreadContext(Settings.EMPTY);
ResizableBlockingQueue<Runnable> queue = new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), 2000);
int threads = randomIntBetween(1, 10);
int measureWindow = randomIntBetween(100, 200);
logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow);
QueueResizableOpenSearchThreadPoolExecutor executor = new QueueResizableOpenSearchThreadPoolExecutor(
"test-threadpool",
threads,
threads,
1000,
TimeUnit.MILLISECONDS,
queue,
fastWrapper(),
OpenSearchExecutors.daemonThreadFactory("queuetest"),
new OpenSearchAbortPolicy(),
context
);
executor.prestartAllCoreThreads();
logger.info("--> executor: {}", executor);
// Execute a task multiple times that takes 1ms
assertThat(executor.resize(1000), equalTo(1000));
executeTask(executor, (measureWindow * 5) + 2);
assertBusy(() -> { assertThat(queue.capacity(), lessThanOrEqualTo(1000)); });
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
}
public void testResizeQueueUp() throws Exception {
ThreadContext context = new ThreadContext(Settings.EMPTY);
ResizableBlockingQueue<Runnable> queue = new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), 2000);
int threads = randomIntBetween(1, 10);
int measureWindow = randomIntBetween(100, 200);
logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow);
QueueResizableOpenSearchThreadPoolExecutor executor = new QueueResizableOpenSearchThreadPoolExecutor(
"test-threadpool",
threads,
threads,
1000,
TimeUnit.MILLISECONDS,
queue,
fastWrapper(),
OpenSearchExecutors.daemonThreadFactory("queuetest"),
new OpenSearchAbortPolicy(),
context
);
executor.prestartAllCoreThreads();
logger.info("--> executor: {}", executor);
// Execute a task multiple times that takes 1ms
assertThat(executor.resize(3000), equalTo(3000));
executeTask(executor, (measureWindow * 5) + 2);
assertBusy(() -> { assertThat(queue.capacity(), greaterThanOrEqualTo(2000)); });
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
}
public void testResizeQueueDown() throws Exception {
ThreadContext context = new ThreadContext(Settings.EMPTY);
ResizableBlockingQueue<Runnable> queue = new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), 2000);
int threads = randomIntBetween(1, 10);
int measureWindow = randomIntBetween(100, 200);
logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow);
QueueResizableOpenSearchThreadPoolExecutor executor = new QueueResizableOpenSearchThreadPoolExecutor(
"test-threadpool",
threads,
threads,
1000,
TimeUnit.MILLISECONDS,
queue,
fastWrapper(),
OpenSearchExecutors.daemonThreadFactory("queuetest"),
new OpenSearchAbortPolicy(),
context
);
executor.prestartAllCoreThreads();
logger.info("--> executor: {}", executor);
// Execute a task multiple times that takes 1ms
assertThat(executor.resize(900), equalTo(900));
executeTask(executor, (measureWindow * 5) + 2);
assertBusy(() -> { assertThat(queue.capacity(), lessThanOrEqualTo(900)); });
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
}
public void testExecutionEWMACalculation() throws Exception {
ThreadContext context = new ThreadContext(Settings.EMPTY);
ResizableBlockingQueue<Runnable> queue = new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), 100);
QueueResizableOpenSearchThreadPoolExecutor executor = new QueueResizableOpenSearchThreadPoolExecutor(
"test-threadpool",
1,
1,
1000,
TimeUnit.MILLISECONDS,
queue,
fastWrapper(),
OpenSearchExecutors.daemonThreadFactory("queuetest"),
new OpenSearchAbortPolicy(),
context
);
executor.prestartAllCoreThreads();
logger.info("--> executor: {}", executor);
assertThat((long) executor.getTaskExecutionEWMA(), equalTo(0L));
executeTask(executor, 1);
assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(30L)); });
executeTask(executor, 1);
assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(51L)); });
executeTask(executor, 1);
assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(65L)); });
executeTask(executor, 1);
assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(75L)); });
executeTask(executor, 1);
assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(83L)); });
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
}
/** Use a runnable wrapper that simulates a task with unknown failures. */
public void testExceptionThrowingTask() throws Exception {
ThreadContext context = new ThreadContext(Settings.EMPTY);
ResizableBlockingQueue<Runnable> queue = new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), 100);
QueueResizableOpenSearchThreadPoolExecutor executor = new QueueResizableOpenSearchThreadPoolExecutor(
"test-threadpool",
1,
1,
1000,
TimeUnit.MILLISECONDS,
queue,
exceptionalWrapper(),
OpenSearchExecutors.daemonThreadFactory("queuetest"),
new OpenSearchAbortPolicy(),
context
);
executor.prestartAllCoreThreads();
logger.info("--> executor: {}", executor);
assertThat((long) executor.getTaskExecutionEWMA(), equalTo(0L));
executeTask(executor, 1);
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
}
private Function<Runnable, WrappedRunnable> fastWrapper() {
return (runnable) -> new SettableTimedRunnable(TimeUnit.NANOSECONDS.toNanos(100), false);
}
/**
* The returned function outputs a WrappedRunnabled that simulates the case
* where {@link TimedRunnable#getTotalExecutionNanos()} returns -1 because
* the job failed or was rejected before it finished.
*/
private Function<Runnable, WrappedRunnable> exceptionalWrapper() {
return (runnable) -> new SettableTimedRunnable(TimeUnit.NANOSECONDS.toNanos(-1), true);
}
/** Execute a blank task {@code times} times for the executor */
private void executeTask(QueueResizableOpenSearchThreadPoolExecutor executor, int times) {
logger.info("--> executing a task [{}] times", times);
for (int i = 0; i < times; i++) {
executor.execute(() -> {});
}
}
public class SettableTimedRunnable extends TimedRunnable {
private final long timeTaken;
private final boolean testFailedOrRejected;
public SettableTimedRunnable(long timeTaken, boolean failedOrRejected) {
super(() -> {});
this.timeTaken = timeTaken;
this.testFailedOrRejected = failedOrRejected;
}
@Override
public long getTotalNanos() {
return timeTaken;
}
@Override
public long getTotalExecutionNanos() {
return timeTaken;
}
@Override
public boolean getFailedOrRejected() {
return testFailedOrRejected;
}
}
}

View File

@ -44,7 +44,7 @@ import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
/**
* Tests for the automatic queue resizing of the {@code QueueResizingOpenSearchThreadPoolExecutorTests}
* Tests for the automatic queue resizing of the {@code QueueResizingOpenSearchThreadPoolExecutor}
* based on the time taken for each event.
*/
public class QueueResizingOpenSearchThreadPoolExecutorTests extends OpenSearchTestCase {

View File

@ -141,6 +141,9 @@ public class ThreadPoolSerializationTests extends OpenSearchTestCase {
StreamInput input = output.bytes().streamInput();
ThreadPool.Info newInfo = new ThreadPool.Info(input);
/* The SerDe patch converts RESIZABLE threadpool type value to FIXED. Implementing
* the same conversion in test to maintain parity.
*/
assertThat(newInfo.getThreadPoolType(), is(threadPoolType));
}
}

View File

@ -1895,12 +1895,6 @@ public abstract class OpenSearchIntegTestCase extends OpenSearchTestCase {
.put(SearchService.LOW_LEVEL_CANCELLATION_SETTING.getKey(), randomBoolean())
.putList(DISCOVERY_SEED_HOSTS_SETTING.getKey()) // empty list disables a port scan for other nodes
.putList(DISCOVERY_SEED_PROVIDERS_SETTING.getKey(), "file");
if (rarely()) {
// Sometimes adjust the minimum search thread pool size, causing
// QueueResizingOpenSearchThreadPoolExecutor to be used instead of a regular
// fixed thread pool
builder.put("thread_pool.search.min_queue_size", 100);
}
return builder.build();
}