From d09e64323f14d6dc5e64cb45b3fa71a1b2229a2f Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Mon, 20 Mar 2017 11:07:32 -0600 Subject: [PATCH] Add ability to automatically adjust search threadpool queue_size This PR adds a new thread pool type: `fixed_auto_queue_size`. This thread pool behaves like a regular `fixed` threadpool, except that every `auto_queue_frame_size` operations (default: 10,000) in the thread pool, [Little's Law](https://en.wikipedia.org/wiki/Little's_law) is calculated and used to adjust the pool's `queue_size` either up or down by 50. A minimum and maximum is taken into account also. When the min and max are the same value, a regular fixed executor is used instead. The `SEARCH` threadpool is changed to use this new type of thread pool. However, the min and max are both set to 1000, meaning auto adjustment is opt-in rather than opt-out. Resolves #3890 --- .../common/util/concurrent/EsExecutors.java | 28 +++ .../util/concurrent/EsThreadPoolExecutor.java | 2 +- .../QueueResizingEsThreadPoolExecutor.java | 234 ++++++++++++++++++ .../concurrent/ResizableBlockingQueue.java | 80 ++++++ .../util/concurrent/SizeBlockingQueue.java | 4 +- .../common/util/concurrent/TimedRunnable.java | 56 +++++ .../AutoQueueAdjustingExecutorBuilder.java | 166 +++++++++++++ .../threadpool/ExecutorBuilder.java | 9 + .../threadpool/FixedExecutorBuilder.java | 8 - .../elasticsearch/threadpool/ThreadPool.java | 15 +- ...ueueResizingEsThreadPoolExecutorTests.java | 226 +++++++++++++++++ .../ResizableBlockingQueueTests.java | 52 ++++ ...utoQueueAdjustingExecutorBuilderTests.java | 41 +++ docs/reference/modules/threadpool.asciidoc | 51 +++- 14 files changed, 955 insertions(+), 17 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java create mode 100644 core/src/main/java/org/elasticsearch/common/util/concurrent/ResizableBlockingQueue.java create mode 100644 core/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java create mode 100644 core/src/main/java/org/elasticsearch/threadpool/AutoQueueAdjustingExecutorBuilder.java create mode 100644 core/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java create mode 100644 core/src/test/java/org/elasticsearch/common/util/concurrent/ResizableBlockingQueueTests.java create mode 100644 core/src/test/java/org/elasticsearch/threadpool/AutoQueueAdjustingExecutorBuilderTests.java diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 3a5e3b4dab4..b37a6e14f02 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -22,6 +22,7 @@ package org.elasticsearch.common.util.concurrent; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.node.Node; import java.util.Arrays; @@ -79,6 +80,33 @@ public class EsExecutors { return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy(), contextHolder); } + /** + * Return a new executor that will automatically adjust the queue size based on queue throughput. + * + * @param size number of fixed threads to use for executing tasks + * @param initialQueueCapacity initial size of the executor queue + * @param minQueueSize minimum queue size that the queue can be adjusted to + * @param maxQueueSize maximum queue size that the queue can be adjusted to + * @param frameSize number of tasks during which stats are collected before adjusting queue size + */ + public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int initialQueueCapacity, int minQueueSize, + int maxQueueSize, int frameSize, TimeValue targetedResponseTime, + ThreadFactory threadFactory, ThreadContext contextHolder) { + if (initialQueueCapacity == minQueueSize && initialQueueCapacity == maxQueueSize) { + return newFixed(name, size, initialQueueCapacity, threadFactory, contextHolder); + } + + if (initialQueueCapacity <= 0) { + throw new IllegalArgumentException("initial queue capacity for [" + name + "] executor must be positive, got: " + + initialQueueCapacity); + } + ResizableBlockingQueue queue = + new ResizableBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), initialQueueCapacity); + return new QueueResizingEsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, + queue, minQueueSize, maxQueueSize, TimedRunnable::new, frameSize, targetedResponseTime, threadFactory, + new EsAbortPolicy(), contextHolder); + } + private static final ExecutorService DIRECT_EXECUTOR_SERVICE = new AbstractExecutorService() { @Override diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index 9662292cf69..a1ac182b8dc 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -37,7 +37,7 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { /** * Name used in error reporting. */ - private final String name; + protected final String name; EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, ThreadContext contextHolder) { diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java new file mode 100644 index 00000000000..854dc862315 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java @@ -0,0 +1,234 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util.concurrent; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.logging.ESLoggerFactory; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ResizableBlockingQueue; + +import java.util.Locale; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Stream; + +/** + * An extension to thread pool executor, which automatically adjusts the queue size of the + * {@code ResizableBlockingQueue} according to Little's Law. + */ +public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecutor { + + private static final Logger logger = + ESLoggerFactory.getLogger(QueueResizingEsThreadPoolExecutor.class); + + private final Function runnableWrapper; + private final ResizableBlockingQueue workQueue; + private final int tasksPerFrame; + private final int minQueueSize; + private final int maxQueueSize; + private final long targetedResponseTimeNanos; + // The amount the queue size is adjusted by for each calcuation + private static final int QUEUE_ADJUSTMENT_AMOUNT = 50; + + private final AtomicLong totalTaskNanos = new AtomicLong(0); + private final AtomicInteger taskCount = new AtomicInteger(0); + + private long startNs; + + QueueResizingEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + ResizableBlockingQueue workQueue, int minQueueSize, int maxQueueSize, + Function runnableWrapper, final int tasksPerFrame, + TimeValue targetedResponseTime, ThreadFactory threadFactory, XRejectedExecutionHandler handler, + ThreadContext contextHolder) { + super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, + workQueue, threadFactory, handler, contextHolder); + this.runnableWrapper = runnableWrapper; + this.workQueue = workQueue; + this.tasksPerFrame = tasksPerFrame; + this.startNs = System.nanoTime(); + this.minQueueSize = minQueueSize; + this.maxQueueSize = maxQueueSize; + this.targetedResponseTimeNanos = targetedResponseTime.getNanos(); + logger.debug("thread pool [{}] will adjust queue by [{}] when determining automatic queue size", + name, QUEUE_ADJUSTMENT_AMOUNT); + } + + @Override + protected void doExecute(final Runnable command) { + // we are submitting a task, it has not yet started running (because super.excute() has not + // been called), but it could be immediately run, or run at a later time. We need the time + // this task entered the queue, which we get by creating a TimedRunnable, which starts the + // clock as soon as it is created. + super.doExecute(this.runnableWrapper.apply(command)); + } + + /** + * Calculate task rate (λ), for a fixed number of tasks and time it took those tasks to be measured + * + * @param totalNumberOfTasks total number of tasks that were measured + * @param totalFrameFrameNanos nanoseconds during which the tasks were received + * @return the rate of tasks in the system + */ + static double calculateLambda(final int totalNumberOfTasks, final long totalFrameFrameNanos) { + assert totalFrameFrameNanos > 0 : "cannot calculate for instantaneous tasks"; + assert totalNumberOfTasks > 0 : "cannot calculate for no tasks"; + // There is no set execution time, instead we adjust the time window based on the + // number of completed tasks, so there is no background thread required to update the + // queue size at a regular interval. This means we need to calculate our λ by the + // total runtime, rather than a fixed interval. + + // λ = total tasks divided by measurement time + return (double) totalNumberOfTasks / totalFrameFrameNanos; + } + + /** + * Calculate Little's Law (L), which is the "optimal" queue size for a particular task rate (lambda) and targeted response time. + * + * @param lambda the arrival rate of tasks in nanoseconds + * @param targetedResponseTimeNanos nanoseconds for the average targeted response rate of requests + * @return the optimal queue size for the give task rate and targeted response time + */ + static int calculateL(final double lambda, final long targetedResponseTimeNanos) { + assert targetedResponseTimeNanos > 0 : "cannot calculate for instantaneous requests"; + // L = λ * W + return Math.toIntExact((long)(lambda * targetedResponseTimeNanos)); + } + + /** + * Returns the current queue capacity + */ + public int getCurrentCapacity() { + return workQueue.capacity(); + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + // A task has been completed, it has left the building. We should now be able to get the + // total time as a combination of the time in the queue and time spent running the task. We + // only want runnables that did not throw errors though, because they could be fast-failures + // that throw off our timings, so only check when t is null. + assert r instanceof TimedRunnable : "expected only TimedRunnables in queue"; + final long taskNanos = ((TimedRunnable) r).getTotalNanos(); + final long totalNanos = totalTaskNanos.addAndGet(taskNanos); + if (taskCount.incrementAndGet() == this.tasksPerFrame) { + final long endTimeNs = System.nanoTime(); + final long totalRuntime = endTimeNs - this.startNs; + // Reset the start time for all tasks. At first glance this appears to need to be + // volatile, since we are reading from a different thread when it is set, but it + // is protected by the taskCount memory barrier. + // See: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/package-summary.html + startNs = endTimeNs; + + // Calculate the new desired queue size + try { + final double lambda = calculateLambda(tasksPerFrame, totalNanos); + final int desiredQueueSize = calculateL(lambda, targetedResponseTimeNanos); + if (logger.isDebugEnabled()) { + final long avgTaskTime = totalNanos / tasksPerFrame; + logger.debug("[{}]: there were [{}] tasks in [{}], avg task time: [{}], [{} tasks/s], " + + "optimal queue is [{}]", + name, + tasksPerFrame, + TimeValue.timeValueNanos(totalRuntime), + TimeValue.timeValueNanos(avgTaskTime), + String.format(Locale.ROOT, "%.2f", lambda * TimeValue.timeValueSeconds(1).nanos()), + desiredQueueSize); + } + + final int oldCapacity = workQueue.capacity(); + + // Adjust the queue size towards the desired capacity using an adjust of + // QUEUE_ADJUSTMENT_AMOUNT (either up or down), keeping in mind the min and max + // values the queue size can have. + final int newCapacity = + workQueue.adjustCapacity(desiredQueueSize, QUEUE_ADJUSTMENT_AMOUNT, minQueueSize, maxQueueSize); + if (oldCapacity != newCapacity && logger.isDebugEnabled()) { + logger.debug("adjusted [{}] queue size by [{}], old capacity: [{}], new capacity: [{}]", name, + newCapacity > oldCapacity ? QUEUE_ADJUSTMENT_AMOUNT : -QUEUE_ADJUSTMENT_AMOUNT, + oldCapacity, newCapacity); + } + } catch (ArithmeticException e) { + // There was an integer overflow, so just log about it, rather than adjust the queue size + logger.warn((Supplier) () -> new ParameterizedMessage( + "failed to calculate optimal queue size for [{}] thread pool, " + + "total frame time [{}ns], tasks [{}], task execution time [{}ns]", + name, totalRuntime, tasksPerFrame, totalNanos), + e); + } finally { + // Finally, decrement the task count and time back to their starting values. We + // do this at the end so there is no concurrent adjustments happening. We also + // decrement them instead of resetting them back to zero, as resetting them back + // to zero causes operations that came in during the adjustment to be uncounted + int tasks = taskCount.addAndGet(-this.tasksPerFrame); + assert tasks >= 0 : "tasks should never be negative, got: " + tasks; + + if (tasks >= this.tasksPerFrame) { + // Start over, because we can potentially reach a "never adjusting" state, + // + // consider the following: + // - If the frame window is 10, and there are 10 tasks, then an adjustment will begin. (taskCount == 10) + // - Prior to the adjustment being done, 15 more tasks come in, the taskCount is now 25 + // - Adjustment happens and we decrement the tasks by 10, taskCount is now 15 + // - Since taskCount will now be incremented forever, it will never be 10 again, + // so there will be no further adjustments + logger.debug("[{}]: too many incoming tasks while queue size adjustment occurs, resetting measurements to 0", name); + totalTaskNanos.getAndSet(0); + taskCount.getAndSet(0); + startNs = System.nanoTime(); + } else { + // Do a regular adjustment + totalTaskNanos.addAndGet(-totalNanos); + } + } + } + } + + @Override + public String toString() { + StringBuilder b = new StringBuilder(); + b.append(getClass().getSimpleName()).append('['); + b.append(name).append(", "); + + @SuppressWarnings("rawtypes") + ResizableBlockingQueue queue = (ResizableBlockingQueue) getQueue(); + + b.append("queue capacity = ").append(getCurrentCapacity()).append(", "); + b.append("min queue capacity = ").append(minQueueSize).append(", "); + b.append("max queue capacity = ").append(maxQueueSize).append(", "); + b.append("frame size = ").append(tasksPerFrame).append(", "); + b.append("targeted response rate = ").append(TimeValue.timeValueNanos(targetedResponseTimeNanos)).append(", "); + b.append("adjustment amount = ").append(QUEUE_ADJUSTMENT_AMOUNT).append(", "); + /* + * ThreadPoolExecutor has some nice information in its toString but we + * can't get at it easily without just getting the toString. + */ + b.append(super.toString()).append(']'); + return b.toString(); + } +} diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/ResizableBlockingQueue.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/ResizableBlockingQueue.java new file mode 100644 index 00000000000..ca6f6030bb0 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/ResizableBlockingQueue.java @@ -0,0 +1,80 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util.concurrent; + +import java.util.concurrent.BlockingQueue; +import org.elasticsearch.common.SuppressForbidden; + +/** + * Extends the {@code SizeBlockingQueue} to add the {@code adjustCapacity} method, which will adjust + * the capacity by a certain amount towards a maximum or minimum. + */ +final class ResizableBlockingQueue extends SizeBlockingQueue { + + private volatile int capacity; + + ResizableBlockingQueue(BlockingQueue queue, int initialCapacity) { + super(queue, initialCapacity); + this.capacity = initialCapacity; + } + + @SuppressForbidden(reason = "optimalCapacity is non-negative, therefore the difference cannot be < -Integer.MAX_VALUE") + private int getChangeAmount(int optimalCapacity) { + assert optimalCapacity >= 0 : "optimal capacity should always be positive, got: " + optimalCapacity; + return Math.abs(optimalCapacity - this.capacity); + } + + @Override + public int capacity() { + return this.capacity; + } + + @Override + public int remainingCapacity() { + return Math.max(0, this.capacity()); + } + + /** Resize the limit for the queue, returning the new size limit */ + public synchronized int adjustCapacity(int optimalCapacity, int adjustmentAmount, int minCapacity, int maxCapacity) { + assert adjustmentAmount > 0 : "adjustment amount should be a positive value"; + assert optimalCapacity >= 0 : "desired capacity cannot be negative"; + assert minCapacity >= 0 : "cannot have min capacity smaller than 0"; + assert maxCapacity >= minCapacity : "cannot have max capacity smaller than min capacity"; + + if (optimalCapacity == capacity) { + // Yahtzee! + return this.capacity; + } + + if (optimalCapacity > capacity + adjustmentAmount) { + // adjust up + final int newCapacity = Math.min(maxCapacity, capacity + adjustmentAmount); + this.capacity = newCapacity; + return newCapacity; + } else if (optimalCapacity < capacity - adjustmentAmount) { + // adjust down + final int newCapacity = Math.max(minCapacity, capacity - adjustmentAmount); + this.capacity = newCapacity; + return newCapacity; + } else { + return this.capacity; + } + } +} diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/SizeBlockingQueue.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/SizeBlockingQueue.java index bff4ee613e1..c4142152ccf 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/SizeBlockingQueue.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/SizeBlockingQueue.java @@ -131,7 +131,7 @@ public class SizeBlockingQueue extends AbstractQueue implements BlockingQu @Override public boolean offer(E e) { int count = size.incrementAndGet(); - if (count > capacity) { + if (count > capacity()) { size.decrementAndGet(); return false; } @@ -168,7 +168,7 @@ public class SizeBlockingQueue extends AbstractQueue implements BlockingQu @Override public int remainingCapacity() { - return capacity - size.get(); + return capacity() - size.get(); } @Override diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java new file mode 100644 index 00000000000..91ad6e46efa --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java @@ -0,0 +1,56 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util.concurrent; + +/** + * A class used to wrap a {@code Runnable} that allows capturing the time the task since creation + * through execution. + */ +class TimedRunnable implements Runnable { + private final Runnable original; + private final long creationTimeNanos; + private long finishTimeNanos = -1; + + TimedRunnable(Runnable original) { + this.original = original; + this.creationTimeNanos = System.nanoTime(); + } + + @Override + public void run() { + try { + original.run(); + } finally { + finishTimeNanos = System.nanoTime(); + } + } + + /** + * Return the time since this task was created until it finished running. + * If the task is still running or has not yet been run, returns -1. + */ + long getTotalNanos() { + if (finishTimeNanos == -1) { + // There must have been an exception thrown, the total time is unknown (-1) + return -1; + } + return finishTimeNanos - creationTimeNanos; + } +} diff --git a/core/src/main/java/org/elasticsearch/threadpool/AutoQueueAdjustingExecutorBuilder.java b/core/src/main/java/org/elasticsearch/threadpool/AutoQueueAdjustingExecutorBuilder.java new file mode 100644 index 00000000000..265e544d281 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/threadpool/AutoQueueAdjustingExecutorBuilder.java @@ -0,0 +1,166 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.threadpool; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.SizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.node.Node; +import org.elasticsearch.threadpool.ExecutorBuilder; +import org.elasticsearch.common.util.concurrent.QueueResizingEsThreadPoolExecutor; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; + +/** + * A builder for executors that automatically adjust the queue length as needed, depending on + * Little's Law. See https://en.wikipedia.org/wiki/Little's_law for more information. + */ +public final class AutoQueueAdjustingExecutorBuilder extends ExecutorBuilder { + + private final Setting sizeSetting; + private final Setting queueSizeSetting; + private final Setting minQueueSizeSetting; + private final Setting maxQueueSizeSetting; + private final Setting targetedResponseTimeSetting; + private final Setting frameSizeSetting; + + AutoQueueAdjustingExecutorBuilder(final Settings settings, final String name, final int size, + final int initialQueueSize, final int minQueueSize, + final int maxQueueSize, final int frameSize) { + super(name); + final String prefix = "thread_pool." + name; + final String sizeKey = settingsKey(prefix, "size"); + this.sizeSetting = + new Setting<>( + sizeKey, + s -> Integer.toString(size), + s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey), + Setting.Property.NodeScope); + final String queueSizeKey = settingsKey(prefix, "queue_size"); + final String minSizeKey = settingsKey(prefix, "min_queue_size"); + final String maxSizeKey = settingsKey(prefix, "max_queue_size"); + final String frameSizeKey = settingsKey(prefix, "auto_queue_frame_size"); + final String targetedResponseTimeKey = settingsKey(prefix, "target_response_time"); + this.targetedResponseTimeSetting = Setting.timeSetting(targetedResponseTimeKey, TimeValue.timeValueSeconds(1), + TimeValue.timeValueMillis(10), Setting.Property.NodeScope); + this.queueSizeSetting = Setting.intSetting(queueSizeKey, initialQueueSize, Setting.Property.NodeScope); + // These temp settings are used to validate the min and max settings below + Setting tempMaxQueueSizeSetting = Setting.intSetting(maxSizeKey, maxQueueSize, Setting.Property.NodeScope); + Setting tempMinQueueSizeSetting = Setting.intSetting(minSizeKey, minQueueSize, Setting.Property.NodeScope); + + this.minQueueSizeSetting = new Setting<>( + minSizeKey, + (s) -> Integer.toString(minQueueSize), + (s) -> Setting.parseInt(s, 0, tempMaxQueueSizeSetting.get(settings), minSizeKey), + Setting.Property.NodeScope); + this.maxQueueSizeSetting = new Setting<>( + maxSizeKey, + (s) -> Integer.toString(maxQueueSize), + (s) -> Setting.parseInt(s, tempMinQueueSizeSetting.get(settings), Integer.MAX_VALUE, maxSizeKey), + Setting.Property.NodeScope); + this.frameSizeSetting = Setting.intSetting(frameSizeKey, frameSize, 100, Setting.Property.NodeScope); + } + + @Override + public List> getRegisteredSettings() { + return Arrays.asList(sizeSetting, queueSizeSetting, minQueueSizeSetting, + maxQueueSizeSetting, frameSizeSetting, targetedResponseTimeSetting); + } + + @Override + AutoExecutorSettings getSettings(Settings settings) { + final String nodeName = Node.NODE_NAME_SETTING.get(settings); + final int size = sizeSetting.get(settings); + final int initialQueueSize = queueSizeSetting.get(settings); + final int minQueueSize = minQueueSizeSetting.get(settings); + final int maxQueueSize = maxQueueSizeSetting.get(settings); + final int frameSize = frameSizeSetting.get(settings); + final TimeValue targetedResponseTime = targetedResponseTimeSetting.get(settings); + return new AutoExecutorSettings(nodeName, size, initialQueueSize, minQueueSize, maxQueueSize, frameSize, targetedResponseTime); + } + + @Override + ThreadPool.ExecutorHolder build(final AutoExecutorSettings settings, + final ThreadContext threadContext) { + int size = settings.size; + int initialQueueSize = settings.initialQueueSize; + int minQueueSize = settings.minQueueSize; + int maxQueueSize = settings.maxQueueSize; + int frameSize = settings.frameSize; + TimeValue targetedResponseTime = settings.targetedResponseTime; + final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name())); + final ExecutorService executor = + EsExecutors.newAutoQueueFixed(name(), size, initialQueueSize, minQueueSize, + maxQueueSize, frameSize, targetedResponseTime, threadFactory, threadContext); + // TODO: in a subsequent change we hope to extend ThreadPool.Info to be more specific for the thread pool type + final ThreadPool.Info info = + new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE, + size, size, null, new SizeValue(initialQueueSize)); + return new ThreadPool.ExecutorHolder(executor, info); + } + + @Override + String formatInfo(ThreadPool.Info info) { + return String.format( + Locale.ROOT, + "name [%s], size [%d], queue size [%s]", + info.getName(), + info.getMax(), + info.getQueueSize() == null ? "unbounded" : info.getQueueSize()); + } + + static final class AutoExecutorSettings extends ExecutorBuilder.ExecutorSettings { + + private final int size; + private final int initialQueueSize; + private final int minQueueSize; + private final int maxQueueSize; + private final int frameSize; + private final TimeValue targetedResponseTime; + + AutoExecutorSettings(final String nodeName, final int size, final int initialQueueSize, + final int minQueueSize, final int maxQueueSize, final int frameSize, + final TimeValue targetedResponseTime) { + super(nodeName); + this.size = size; + this.initialQueueSize = initialQueueSize; + this.minQueueSize = minQueueSize; + this.maxQueueSize = maxQueueSize; + this.frameSize = frameSize; + this.targetedResponseTime = targetedResponseTime; + } + + } + +} diff --git a/core/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java b/core/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java index 54f5ab0af38..314eb1df71a 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java @@ -21,6 +21,7 @@ package org.elasticsearch.threadpool; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadContext; import java.util.List; @@ -46,6 +47,14 @@ public abstract class ExecutorBuilder> getRegisteredSettings() { return Arrays.asList(sizeSetting, queueSizeSetting); diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index f72956c4202..7b0c4eb752b 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -23,6 +23,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.util.Counter; import org.apache.lucene.util.IOUtils; +import org.elasticsearch.Version; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; @@ -85,6 +86,7 @@ public class ThreadPool extends AbstractComponent implements Closeable { public enum ThreadPoolType { DIRECT("direct"), FIXED("fixed"), + FIXED_AUTO_QUEUE_SIZE("fixed_auto_queue_size"), SCALING("scaling"); private final String type; @@ -126,7 +128,7 @@ public class ThreadPool extends AbstractComponent implements Closeable { map.put(Names.GET, ThreadPoolType.FIXED); map.put(Names.INDEX, ThreadPoolType.FIXED); map.put(Names.BULK, ThreadPoolType.FIXED); - map.put(Names.SEARCH, ThreadPoolType.FIXED); + map.put(Names.SEARCH, ThreadPoolType.FIXED_AUTO_QUEUE_SIZE); map.put(Names.MANAGEMENT, ThreadPoolType.SCALING); map.put(Names.FLUSH, ThreadPoolType.SCALING); map.put(Names.REFRESH, ThreadPoolType.SCALING); @@ -171,7 +173,8 @@ public class ThreadPool extends AbstractComponent implements Closeable { builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200)); builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200)); // now that we reuse bulk for index/delete ops builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000)); - builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000)); + builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings, + Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000)); builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5))); // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded // the assumption here is that the listeners should be very lightweight on the listeners side @@ -608,7 +611,13 @@ public class ThreadPool extends AbstractComponent implements Closeable { @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(name); - out.writeString(type.getType()); + if (type == ThreadPoolType.FIXED_AUTO_QUEUE_SIZE && + out.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) { + // 5.x doesn't know about the "fixed_auto_queue_size" thread pool type, just write fixed. + out.writeString(ThreadPoolType.FIXED.getType()); + } else { + out.writeString(type.getType()); + } out.writeInt(min); out.writeInt(max); out.writeOptionalWriteable(keepAlive); diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java new file mode 100644 index 00000000000..732ec94ae10 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java @@ -0,0 +1,226 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util.concurrent; + +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.junit.annotations.TestLogging; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThan; + +/** + * Tests for the automatic queue resizing of the {@code QueueResizingEsThreadPoolExecutorTests} + * based on the time taken for each event. + */ +public class QueueResizingEsThreadPoolExecutorTests extends ESTestCase { + + public void testExactWindowSizeAdjustment() throws Exception { + ThreadContext context = new ThreadContext(Settings.EMPTY); + ResizableBlockingQueue queue = + new ResizableBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), 100); + + int threads = randomIntBetween(1, 3); + int measureWindow = 3; + logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow); + QueueResizingEsThreadPoolExecutor executor = + new QueueResizingEsThreadPoolExecutor( + "test-threadpool", threads, threads, 1000, + TimeUnit.MILLISECONDS, queue, 10, 1000, fastWrapper(), + measureWindow, TimeValue.timeValueMillis(1), EsExecutors.daemonThreadFactory("queuetest"), + new EsAbortPolicy(), context); + executor.prestartAllCoreThreads(); + logger.info("--> executor: {}", executor); + + // Execute exactly 3 (measureWindow) times + executor.execute(() -> {}); + executor.execute(() -> {}); + executor.execute(() -> {}); + + // The queue capacity should have increased by 50 since they were very fast tasks + assertBusy(() -> { + assertThat(queue.capacity(), equalTo(150)); + }); + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + context.close(); + } + + public void testAutoQueueSizingUp() throws Exception { + ThreadContext context = new ThreadContext(Settings.EMPTY); + ResizableBlockingQueue queue = + new ResizableBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), + 2000); + + int threads = randomIntBetween(1, 10); + int measureWindow = randomIntBetween(100, 200); + logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow); + QueueResizingEsThreadPoolExecutor executor = + new QueueResizingEsThreadPoolExecutor( + "test-threadpool", threads, threads, 1000, + TimeUnit.MILLISECONDS, queue, 10, 3000, fastWrapper(), + measureWindow, TimeValue.timeValueMillis(1), EsExecutors.daemonThreadFactory("queuetest"), + new EsAbortPolicy(), context); + executor.prestartAllCoreThreads(); + logger.info("--> executor: {}", executor); + + // Execute a task multiple times that takes 1ms + executeTask(executor, (measureWindow * 5) + 2); + + assertBusy(() -> { + assertThat(queue.capacity(), greaterThan(2000)); + }); + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + context.close(); + } + + public void testAutoQueueSizingDown() throws Exception { + ThreadContext context = new ThreadContext(Settings.EMPTY); + ResizableBlockingQueue queue = + new ResizableBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), + 2000); + + int threads = randomIntBetween(1, 10); + int measureWindow = randomIntBetween(100, 200); + logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow); + QueueResizingEsThreadPoolExecutor executor = + new QueueResizingEsThreadPoolExecutor( + "test-threadpool", threads, threads, 1000, + TimeUnit.MILLISECONDS, queue, 10, 3000, slowWrapper(), measureWindow, TimeValue.timeValueMillis(1), + EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context); + executor.prestartAllCoreThreads(); + logger.info("--> executor: {}", executor); + + // Execute a task multiple times that takes 1m + executeTask(executor, (measureWindow * 5) + 2); + + assertBusy(() -> { + assertThat(queue.capacity(), lessThan(2000)); + }); + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + context.close(); + } + + public void testAutoQueueSizingWithMin() throws Exception { + ThreadContext context = new ThreadContext(Settings.EMPTY); + ResizableBlockingQueue queue = + new ResizableBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), + 5000); + + int threads = randomIntBetween(1, 5); + int measureWindow = randomIntBetween(10, 100);; + int min = randomIntBetween(4981, 4999); + logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow); + QueueResizingEsThreadPoolExecutor executor = + new QueueResizingEsThreadPoolExecutor( + "test-threadpool", threads, threads, 1000, + TimeUnit.MILLISECONDS, queue, min, 100000, slowWrapper(), measureWindow, TimeValue.timeValueMillis(1), + EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context); + executor.prestartAllCoreThreads(); + logger.info("--> executor: {}", executor); + + // Execute a task multiple times that takes 1m + executeTask(executor, (measureWindow * 5)); + + // The queue capacity should decrease, but no lower than the minimum + assertBusy(() -> { + assertThat(queue.capacity(), equalTo(min)); + }); + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + context.close(); + } + + public void testAutoQueueSizingWithMax() throws Exception { + ThreadContext context = new ThreadContext(Settings.EMPTY); + ResizableBlockingQueue queue = + new ResizableBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), + 5000); + + int threads = randomIntBetween(1, 5); + int measureWindow = randomIntBetween(10, 100); + int max = randomIntBetween(5010, 5024); + logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow); + QueueResizingEsThreadPoolExecutor executor = + new QueueResizingEsThreadPoolExecutor( + "test-threadpool", threads, threads, 1000, + TimeUnit.MILLISECONDS, queue, 10, max, fastWrapper(), measureWindow, TimeValue.timeValueMillis(1), + EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context); + executor.prestartAllCoreThreads(); + logger.info("--> executor: {}", executor); + + // Execute a task multiple times that takes 1ms + executeTask(executor, measureWindow * 3); + + // The queue capacity should increase, but no higher than the maximum + assertBusy(() -> { + assertThat(queue.capacity(), equalTo(max)); + }); + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + context.close(); + } + + private Function randomBetweenLimitsWrapper(final int minNs, final int maxNs) { + return (runnable) -> { + return new SettableTimedRunnable(randomIntBetween(minNs, maxNs)); + }; + } + + private Function fastWrapper() { + return (runnable) -> { + return new SettableTimedRunnable(TimeUnit.NANOSECONDS.toNanos(50)); + }; + } + + private Function slowWrapper() { + return (runnable) -> { + return new SettableTimedRunnable(TimeUnit.MINUTES.toNanos(2)); + }; + } + + /** Execute a blank task {@code times} times for the executor */ + private void executeTask(QueueResizingEsThreadPoolExecutor executor, int times) { + logger.info("--> executing a task [{}] times", times); + for (int i = 0; i < times; i++) { + executor.execute(() -> {}); + } + } + + public class SettableTimedRunnable extends TimedRunnable { + private final long timeTaken; + + public SettableTimedRunnable(long timeTaken) { + super(() -> {}); + this.timeTaken = timeTaken; + } + + @Override + public long getTotalNanos() { + return timeTaken; + } + } +} diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/ResizableBlockingQueueTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/ResizableBlockingQueueTests.java new file mode 100644 index 00000000000..b1d5b9bc1bc --- /dev/null +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/ResizableBlockingQueueTests.java @@ -0,0 +1,52 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; + +public class ResizableBlockingQueueTests extends ESTestCase { + + public void testAdjustCapacity() throws Exception { + ResizableBlockingQueue queue = + new ResizableBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), + 100); + + assertThat(queue.capacity(), equalTo(100)); + // Queue size already equal to desired capacity + queue.adjustCapacity(100, 25, 1, 1000); + assertThat(queue.capacity(), equalTo(100)); + // Not worth adjusting + queue.adjustCapacity(99, 25, 1, 1000); + assertThat(queue.capacity(), equalTo(100)); + // Not worth adjusting + queue.adjustCapacity(75, 25, 1, 1000); + assertThat(queue.capacity(), equalTo(100)); + queue.adjustCapacity(74, 25, 1, 1000); + assertThat(queue.capacity(), equalTo(75)); + queue.adjustCapacity(1000000, 25, 1, 1000); + assertThat(queue.capacity(), equalTo(100)); + queue.adjustCapacity(1, 25, 80, 1000); + assertThat(queue.capacity(), equalTo(80)); + queue.adjustCapacity(1000000, 25, 80, 100); + assertThat(queue.capacity(), equalTo(100)); + } +} diff --git a/core/src/test/java/org/elasticsearch/threadpool/AutoQueueAdjustingExecutorBuilderTests.java b/core/src/test/java/org/elasticsearch/threadpool/AutoQueueAdjustingExecutorBuilderTests.java new file mode 100644 index 00000000000..836193423f1 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/threadpool/AutoQueueAdjustingExecutorBuilderTests.java @@ -0,0 +1,41 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.threadpool; + +import org.elasticsearch.common.settings.Settings; + +import static org.hamcrest.CoreMatchers.containsString; + +public class AutoQueueAdjustingExecutorBuilderTests extends ESThreadPoolTestCase { + + public void testValidatingMinMaxSettings() throws Exception { + Settings settings = Settings.builder() + .put("thread_pool.search.min_queue_size", randomIntBetween(30, 100)) + .put("thread_pool.search.max_queue_size", randomIntBetween(1,25)) + .build(); + try { + new AutoQueueAdjustingExecutorBuilder(settings, "test", 1, 15, 1, 100, 10); + fail("should have thrown an exception"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("Failed to parse value")); + } + } + +} diff --git a/docs/reference/modules/threadpool.asciidoc b/docs/reference/modules/threadpool.asciidoc index 01b204b510f..a9a5d2993c0 100644 --- a/docs/reference/modules/threadpool.asciidoc +++ b/docs/reference/modules/threadpool.asciidoc @@ -20,9 +20,10 @@ There are several thread pools, but the important ones include: is `1 + # of available processors`. `search`:: - For count/search/suggest operations. Thread pool type is `fixed` - with a size of `int((# of available_processors * 3) / 2) + 1`, - queue_size of `1000`. + For count/search/suggest operations. Thread pool type is + `fixed_auto_queue_size` with a size of + `int((# of available_processors * 3) / 2) + 1`, and initial queue_size of + `1000`. `get`:: For get operations. Thread pool type is `fixed` @@ -90,6 +91,50 @@ thread_pool: queue_size: 1000 -------------------------------------------------- +[float] +==== `fixed_auto_queue_size` + +The `fixed_auto_queue_size` thread pool holds a fixed size of threads to handle +the requests with a bounded queue for pending requests that have no threads to +service them. It's similar to the `fixed` threadpool, however, the `queue_size` +automatically adjusts according to calculations based on +https://en.wikipedia.org/wiki/Little%27s_law[Little's Law]. These calculations +will potentially adjust the `queue_size` up or down by 50 every time +`auto_queue_frame_size` operations have been completed. + +The `size` parameter controls the number of threads, and defaults to the +number of cores times 5. + +The `queue_size` allows to control the initial size of the queue of pending +requests that have no threads to execute them. + +The `min_queue_size` setting controls the minimum amount the `queue_size` can be +adjusted to. + +The `max_queue_size` setting controls the maximum amount the `queue_size` can be +adjusted to. + +The `auto_queue_frame_size` setting controls the number of operations during +which measurement is taken before the queue is adjusted. It should be large +enough that a single operation cannot unduly bias the calculation. + +The `target_response_rate` is a time value setting that indicates the targeted +average response time for tasks in the thread pool queue. If tasks are routinely +above this time, the thread pool queue will be adjusted down so that tasks are +rejected. + +[source,yaml] +-------------------------------------------------- +thread_pool: + search: + size: 30 + queue_size: 500 + min_queue_size: 10 + max_queue_size: 1000 + auto_queue_frame_size: 2000 + target_response_rate: 1s +-------------------------------------------------- + [float] ==== `scaling`