Add ability to automatically adjust search threadpool queue_size

This PR adds a new thread pool type: `fixed_auto_queue_size`. This thread pool
behaves like a regular `fixed` threadpool, except that every
`auto_queue_frame_size` operations (default: 10,000) in the thread pool,
[Little's Law](https://en.wikipedia.org/wiki/Little's_law) is calculated and
used to adjust the pool's `queue_size` either up or down by 50. A minimum and
maximum is taken into account also. When the min and max are the same value, a
regular fixed executor is used instead.

The `SEARCH` threadpool is changed to use this new type of thread pool. However,
the min and max are both set to 1000, meaning auto adjustment is opt-in rather
than opt-out.

Resolves #3890
This commit is contained in:
Lee Hinman 2017-03-20 11:07:32 -06:00
parent 7ef390068a
commit d09e64323f
14 changed files with 955 additions and 17 deletions

View File

@ -22,6 +22,7 @@ package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node;
import java.util.Arrays;
@ -79,6 +80,33 @@ public class EsExecutors {
return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy(), contextHolder);
}
/**
* Return a new executor that will automatically adjust the queue size based on queue throughput.
*
* @param size number of fixed threads to use for executing tasks
* @param initialQueueCapacity initial size of the executor queue
* @param minQueueSize minimum queue size that the queue can be adjusted to
* @param maxQueueSize maximum queue size that the queue can be adjusted to
* @param frameSize number of tasks during which stats are collected before adjusting queue size
*/
public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int initialQueueCapacity, int minQueueSize,
int maxQueueSize, int frameSize, TimeValue targetedResponseTime,
ThreadFactory threadFactory, ThreadContext contextHolder) {
if (initialQueueCapacity == minQueueSize && initialQueueCapacity == maxQueueSize) {
return newFixed(name, size, initialQueueCapacity, threadFactory, contextHolder);
}
if (initialQueueCapacity <= 0) {
throw new IllegalArgumentException("initial queue capacity for [" + name + "] executor must be positive, got: " +
initialQueueCapacity);
}
ResizableBlockingQueue<Runnable> queue =
new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), initialQueueCapacity);
return new QueueResizingEsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS,
queue, minQueueSize, maxQueueSize, TimedRunnable::new, frameSize, targetedResponseTime, threadFactory,
new EsAbortPolicy(), contextHolder);
}
private static final ExecutorService DIRECT_EXECUTOR_SERVICE = new AbstractExecutorService() {
@Override

View File

@ -37,7 +37,7 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
/**
* Name used in error reporting.
*/
private final String name;
protected final String name;
EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, ThreadContext contextHolder) {

View File

@ -0,0 +1,234 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ResizableBlockingQueue;
import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
/**
* An extension to thread pool executor, which automatically adjusts the queue size of the
* {@code ResizableBlockingQueue} according to Little's Law.
*/
public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecutor {
private static final Logger logger =
ESLoggerFactory.getLogger(QueueResizingEsThreadPoolExecutor.class);
private final Function<Runnable, Runnable> runnableWrapper;
private final ResizableBlockingQueue<Runnable> workQueue;
private final int tasksPerFrame;
private final int minQueueSize;
private final int maxQueueSize;
private final long targetedResponseTimeNanos;
// The amount the queue size is adjusted by for each calcuation
private static final int QUEUE_ADJUSTMENT_AMOUNT = 50;
private final AtomicLong totalTaskNanos = new AtomicLong(0);
private final AtomicInteger taskCount = new AtomicInteger(0);
private long startNs;
QueueResizingEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
ResizableBlockingQueue<Runnable> workQueue, int minQueueSize, int maxQueueSize,
Function<Runnable, Runnable> runnableWrapper, final int tasksPerFrame,
TimeValue targetedResponseTime, ThreadFactory threadFactory, XRejectedExecutionHandler handler,
ThreadContext contextHolder) {
super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue, threadFactory, handler, contextHolder);
this.runnableWrapper = runnableWrapper;
this.workQueue = workQueue;
this.tasksPerFrame = tasksPerFrame;
this.startNs = System.nanoTime();
this.minQueueSize = minQueueSize;
this.maxQueueSize = maxQueueSize;
this.targetedResponseTimeNanos = targetedResponseTime.getNanos();
logger.debug("thread pool [{}] will adjust queue by [{}] when determining automatic queue size",
name, QUEUE_ADJUSTMENT_AMOUNT);
}
@Override
protected void doExecute(final Runnable command) {
// we are submitting a task, it has not yet started running (because super.excute() has not
// been called), but it could be immediately run, or run at a later time. We need the time
// this task entered the queue, which we get by creating a TimedRunnable, which starts the
// clock as soon as it is created.
super.doExecute(this.runnableWrapper.apply(command));
}
/**
* Calculate task rate (λ), for a fixed number of tasks and time it took those tasks to be measured
*
* @param totalNumberOfTasks total number of tasks that were measured
* @param totalFrameFrameNanos nanoseconds during which the tasks were received
* @return the rate of tasks in the system
*/
static double calculateLambda(final int totalNumberOfTasks, final long totalFrameFrameNanos) {
assert totalFrameFrameNanos > 0 : "cannot calculate for instantaneous tasks";
assert totalNumberOfTasks > 0 : "cannot calculate for no tasks";
// There is no set execution time, instead we adjust the time window based on the
// number of completed tasks, so there is no background thread required to update the
// queue size at a regular interval. This means we need to calculate our λ by the
// total runtime, rather than a fixed interval.
// λ = total tasks divided by measurement time
return (double) totalNumberOfTasks / totalFrameFrameNanos;
}
/**
* Calculate Little's Law (L), which is the "optimal" queue size for a particular task rate (lambda) and targeted response time.
*
* @param lambda the arrival rate of tasks in nanoseconds
* @param targetedResponseTimeNanos nanoseconds for the average targeted response rate of requests
* @return the optimal queue size for the give task rate and targeted response time
*/
static int calculateL(final double lambda, final long targetedResponseTimeNanos) {
assert targetedResponseTimeNanos > 0 : "cannot calculate for instantaneous requests";
// L = λ * W
return Math.toIntExact((long)(lambda * targetedResponseTimeNanos));
}
/**
* Returns the current queue capacity
*/
public int getCurrentCapacity() {
return workQueue.capacity();
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
// A task has been completed, it has left the building. We should now be able to get the
// total time as a combination of the time in the queue and time spent running the task. We
// only want runnables that did not throw errors though, because they could be fast-failures
// that throw off our timings, so only check when t is null.
assert r instanceof TimedRunnable : "expected only TimedRunnables in queue";
final long taskNanos = ((TimedRunnable) r).getTotalNanos();
final long totalNanos = totalTaskNanos.addAndGet(taskNanos);
if (taskCount.incrementAndGet() == this.tasksPerFrame) {
final long endTimeNs = System.nanoTime();
final long totalRuntime = endTimeNs - this.startNs;
// Reset the start time for all tasks. At first glance this appears to need to be
// volatile, since we are reading from a different thread when it is set, but it
// is protected by the taskCount memory barrier.
// See: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/package-summary.html
startNs = endTimeNs;
// Calculate the new desired queue size
try {
final double lambda = calculateLambda(tasksPerFrame, totalNanos);
final int desiredQueueSize = calculateL(lambda, targetedResponseTimeNanos);
if (logger.isDebugEnabled()) {
final long avgTaskTime = totalNanos / tasksPerFrame;
logger.debug("[{}]: there were [{}] tasks in [{}], avg task time: [{}], [{} tasks/s], " +
"optimal queue is [{}]",
name,
tasksPerFrame,
TimeValue.timeValueNanos(totalRuntime),
TimeValue.timeValueNanos(avgTaskTime),
String.format(Locale.ROOT, "%.2f", lambda * TimeValue.timeValueSeconds(1).nanos()),
desiredQueueSize);
}
final int oldCapacity = workQueue.capacity();
// Adjust the queue size towards the desired capacity using an adjust of
// QUEUE_ADJUSTMENT_AMOUNT (either up or down), keeping in mind the min and max
// values the queue size can have.
final int newCapacity =
workQueue.adjustCapacity(desiredQueueSize, QUEUE_ADJUSTMENT_AMOUNT, minQueueSize, maxQueueSize);
if (oldCapacity != newCapacity && logger.isDebugEnabled()) {
logger.debug("adjusted [{}] queue size by [{}], old capacity: [{}], new capacity: [{}]", name,
newCapacity > oldCapacity ? QUEUE_ADJUSTMENT_AMOUNT : -QUEUE_ADJUSTMENT_AMOUNT,
oldCapacity, newCapacity);
}
} catch (ArithmeticException e) {
// There was an integer overflow, so just log about it, rather than adjust the queue size
logger.warn((Supplier<?>) () -> new ParameterizedMessage(
"failed to calculate optimal queue size for [{}] thread pool, " +
"total frame time [{}ns], tasks [{}], task execution time [{}ns]",
name, totalRuntime, tasksPerFrame, totalNanos),
e);
} finally {
// Finally, decrement the task count and time back to their starting values. We
// do this at the end so there is no concurrent adjustments happening. We also
// decrement them instead of resetting them back to zero, as resetting them back
// to zero causes operations that came in during the adjustment to be uncounted
int tasks = taskCount.addAndGet(-this.tasksPerFrame);
assert tasks >= 0 : "tasks should never be negative, got: " + tasks;
if (tasks >= this.tasksPerFrame) {
// Start over, because we can potentially reach a "never adjusting" state,
//
// consider the following:
// - If the frame window is 10, and there are 10 tasks, then an adjustment will begin. (taskCount == 10)
// - Prior to the adjustment being done, 15 more tasks come in, the taskCount is now 25
// - Adjustment happens and we decrement the tasks by 10, taskCount is now 15
// - Since taskCount will now be incremented forever, it will never be 10 again,
// so there will be no further adjustments
logger.debug("[{}]: too many incoming tasks while queue size adjustment occurs, resetting measurements to 0", name);
totalTaskNanos.getAndSet(0);
taskCount.getAndSet(0);
startNs = System.nanoTime();
} else {
// Do a regular adjustment
totalTaskNanos.addAndGet(-totalNanos);
}
}
}
}
@Override
public String toString() {
StringBuilder b = new StringBuilder();
b.append(getClass().getSimpleName()).append('[');
b.append(name).append(", ");
@SuppressWarnings("rawtypes")
ResizableBlockingQueue queue = (ResizableBlockingQueue) getQueue();
b.append("queue capacity = ").append(getCurrentCapacity()).append(", ");
b.append("min queue capacity = ").append(minQueueSize).append(", ");
b.append("max queue capacity = ").append(maxQueueSize).append(", ");
b.append("frame size = ").append(tasksPerFrame).append(", ");
b.append("targeted response rate = ").append(TimeValue.timeValueNanos(targetedResponseTimeNanos)).append(", ");
b.append("adjustment amount = ").append(QUEUE_ADJUSTMENT_AMOUNT).append(", ");
/*
* ThreadPoolExecutor has some nice information in its toString but we
* can't get at it easily without just getting the toString.
*/
b.append(super.toString()).append(']');
return b.toString();
}
}

View File

@ -0,0 +1,80 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import java.util.concurrent.BlockingQueue;
import org.elasticsearch.common.SuppressForbidden;
/**
* Extends the {@code SizeBlockingQueue} to add the {@code adjustCapacity} method, which will adjust
* the capacity by a certain amount towards a maximum or minimum.
*/
final class ResizableBlockingQueue<E> extends SizeBlockingQueue<E> {
private volatile int capacity;
ResizableBlockingQueue(BlockingQueue<E> queue, int initialCapacity) {
super(queue, initialCapacity);
this.capacity = initialCapacity;
}
@SuppressForbidden(reason = "optimalCapacity is non-negative, therefore the difference cannot be < -Integer.MAX_VALUE")
private int getChangeAmount(int optimalCapacity) {
assert optimalCapacity >= 0 : "optimal capacity should always be positive, got: " + optimalCapacity;
return Math.abs(optimalCapacity - this.capacity);
}
@Override
public int capacity() {
return this.capacity;
}
@Override
public int remainingCapacity() {
return Math.max(0, this.capacity());
}
/** Resize the limit for the queue, returning the new size limit */
public synchronized int adjustCapacity(int optimalCapacity, int adjustmentAmount, int minCapacity, int maxCapacity) {
assert adjustmentAmount > 0 : "adjustment amount should be a positive value";
assert optimalCapacity >= 0 : "desired capacity cannot be negative";
assert minCapacity >= 0 : "cannot have min capacity smaller than 0";
assert maxCapacity >= minCapacity : "cannot have max capacity smaller than min capacity";
if (optimalCapacity == capacity) {
// Yahtzee!
return this.capacity;
}
if (optimalCapacity > capacity + adjustmentAmount) {
// adjust up
final int newCapacity = Math.min(maxCapacity, capacity + adjustmentAmount);
this.capacity = newCapacity;
return newCapacity;
} else if (optimalCapacity < capacity - adjustmentAmount) {
// adjust down
final int newCapacity = Math.max(minCapacity, capacity - adjustmentAmount);
this.capacity = newCapacity;
return newCapacity;
} else {
return this.capacity;
}
}
}

View File

@ -131,7 +131,7 @@ public class SizeBlockingQueue<E> extends AbstractQueue<E> implements BlockingQu
@Override
public boolean offer(E e) {
int count = size.incrementAndGet();
if (count > capacity) {
if (count > capacity()) {
size.decrementAndGet();
return false;
}
@ -168,7 +168,7 @@ public class SizeBlockingQueue<E> extends AbstractQueue<E> implements BlockingQu
@Override
public int remainingCapacity() {
return capacity - size.get();
return capacity() - size.get();
}
@Override

View File

@ -0,0 +1,56 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
/**
* A class used to wrap a {@code Runnable} that allows capturing the time the task since creation
* through execution.
*/
class TimedRunnable implements Runnable {
private final Runnable original;
private final long creationTimeNanos;
private long finishTimeNanos = -1;
TimedRunnable(Runnable original) {
this.original = original;
this.creationTimeNanos = System.nanoTime();
}
@Override
public void run() {
try {
original.run();
} finally {
finishTimeNanos = System.nanoTime();
}
}
/**
* Return the time since this task was created until it finished running.
* If the task is still running or has not yet been run, returns -1.
*/
long getTotalNanos() {
if (finishTimeNanos == -1) {
// There must have been an exception thrown, the total time is unknown (-1)
return -1;
}
return finishTimeNanos - creationTimeNanos;
}
}

View File

@ -0,0 +1,166 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.threadpool;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.node.Node;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.common.util.concurrent.QueueResizingEsThreadPoolExecutor;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
/**
* A builder for executors that automatically adjust the queue length as needed, depending on
* Little's Law. See https://en.wikipedia.org/wiki/Little's_law for more information.
*/
public final class AutoQueueAdjustingExecutorBuilder extends ExecutorBuilder<AutoQueueAdjustingExecutorBuilder.AutoExecutorSettings> {
private final Setting<Integer> sizeSetting;
private final Setting<Integer> queueSizeSetting;
private final Setting<Integer> minQueueSizeSetting;
private final Setting<Integer> maxQueueSizeSetting;
private final Setting<TimeValue> targetedResponseTimeSetting;
private final Setting<Integer> frameSizeSetting;
AutoQueueAdjustingExecutorBuilder(final Settings settings, final String name, final int size,
final int initialQueueSize, final int minQueueSize,
final int maxQueueSize, final int frameSize) {
super(name);
final String prefix = "thread_pool." + name;
final String sizeKey = settingsKey(prefix, "size");
this.sizeSetting =
new Setting<>(
sizeKey,
s -> Integer.toString(size),
s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
Setting.Property.NodeScope);
final String queueSizeKey = settingsKey(prefix, "queue_size");
final String minSizeKey = settingsKey(prefix, "min_queue_size");
final String maxSizeKey = settingsKey(prefix, "max_queue_size");
final String frameSizeKey = settingsKey(prefix, "auto_queue_frame_size");
final String targetedResponseTimeKey = settingsKey(prefix, "target_response_time");
this.targetedResponseTimeSetting = Setting.timeSetting(targetedResponseTimeKey, TimeValue.timeValueSeconds(1),
TimeValue.timeValueMillis(10), Setting.Property.NodeScope);
this.queueSizeSetting = Setting.intSetting(queueSizeKey, initialQueueSize, Setting.Property.NodeScope);
// These temp settings are used to validate the min and max settings below
Setting<Integer> tempMaxQueueSizeSetting = Setting.intSetting(maxSizeKey, maxQueueSize, Setting.Property.NodeScope);
Setting<Integer> tempMinQueueSizeSetting = Setting.intSetting(minSizeKey, minQueueSize, Setting.Property.NodeScope);
this.minQueueSizeSetting = new Setting<>(
minSizeKey,
(s) -> Integer.toString(minQueueSize),
(s) -> Setting.parseInt(s, 0, tempMaxQueueSizeSetting.get(settings), minSizeKey),
Setting.Property.NodeScope);
this.maxQueueSizeSetting = new Setting<>(
maxSizeKey,
(s) -> Integer.toString(maxQueueSize),
(s) -> Setting.parseInt(s, tempMinQueueSizeSetting.get(settings), Integer.MAX_VALUE, maxSizeKey),
Setting.Property.NodeScope);
this.frameSizeSetting = Setting.intSetting(frameSizeKey, frameSize, 100, Setting.Property.NodeScope);
}
@Override
public List<Setting<?>> getRegisteredSettings() {
return Arrays.asList(sizeSetting, queueSizeSetting, minQueueSizeSetting,
maxQueueSizeSetting, frameSizeSetting, targetedResponseTimeSetting);
}
@Override
AutoExecutorSettings getSettings(Settings settings) {
final String nodeName = Node.NODE_NAME_SETTING.get(settings);
final int size = sizeSetting.get(settings);
final int initialQueueSize = queueSizeSetting.get(settings);
final int minQueueSize = minQueueSizeSetting.get(settings);
final int maxQueueSize = maxQueueSizeSetting.get(settings);
final int frameSize = frameSizeSetting.get(settings);
final TimeValue targetedResponseTime = targetedResponseTimeSetting.get(settings);
return new AutoExecutorSettings(nodeName, size, initialQueueSize, minQueueSize, maxQueueSize, frameSize, targetedResponseTime);
}
@Override
ThreadPool.ExecutorHolder build(final AutoExecutorSettings settings,
final ThreadContext threadContext) {
int size = settings.size;
int initialQueueSize = settings.initialQueueSize;
int minQueueSize = settings.minQueueSize;
int maxQueueSize = settings.maxQueueSize;
int frameSize = settings.frameSize;
TimeValue targetedResponseTime = settings.targetedResponseTime;
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
final ExecutorService executor =
EsExecutors.newAutoQueueFixed(name(), size, initialQueueSize, minQueueSize,
maxQueueSize, frameSize, targetedResponseTime, threadFactory, threadContext);
// TODO: in a subsequent change we hope to extend ThreadPool.Info to be more specific for the thread pool type
final ThreadPool.Info info =
new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE,
size, size, null, new SizeValue(initialQueueSize));
return new ThreadPool.ExecutorHolder(executor, info);
}
@Override
String formatInfo(ThreadPool.Info info) {
return String.format(
Locale.ROOT,
"name [%s], size [%d], queue size [%s]",
info.getName(),
info.getMax(),
info.getQueueSize() == null ? "unbounded" : info.getQueueSize());
}
static final class AutoExecutorSettings extends ExecutorBuilder.ExecutorSettings {
private final int size;
private final int initialQueueSize;
private final int minQueueSize;
private final int maxQueueSize;
private final int frameSize;
private final TimeValue targetedResponseTime;
AutoExecutorSettings(final String nodeName, final int size, final int initialQueueSize,
final int minQueueSize, final int maxQueueSize, final int frameSize,
final TimeValue targetedResponseTime) {
super(nodeName);
this.size = size;
this.initialQueueSize = initialQueueSize;
this.minQueueSize = minQueueSize;
this.maxQueueSize = maxQueueSize;
this.frameSize = frameSize;
this.targetedResponseTime = targetedResponseTime;
}
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.threadpool;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import java.util.List;
@ -46,6 +47,14 @@ public abstract class ExecutorBuilder<U extends ExecutorBuilder.ExecutorSettings
return String.join(".", prefix, key);
}
protected int applyHardSizeLimit(final Settings settings, final String name) {
if (name.equals(ThreadPool.Names.BULK) || name.equals(ThreadPool.Names.INDEX)) {
return 1 + EsExecutors.numberOfProcessors(settings);
} else {
return Integer.MAX_VALUE;
}
}
/**
* The list of settings this builder will register.
*

View File

@ -76,14 +76,6 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
Setting.intSetting(queueSizeKey, queueSize, Setting.Property.NodeScope);
}
private int applyHardSizeLimit(final Settings settings, final String name) {
if (name.equals(ThreadPool.Names.BULK) || name.equals(ThreadPool.Names.INDEX)) {
return 1 + EsExecutors.numberOfProcessors(settings);
} else {
return Integer.MAX_VALUE;
}
}
@Override
public List<Setting<?>> getRegisteredSettings() {
return Arrays.asList(sizeSetting, queueSizeSetting);

View File

@ -23,6 +23,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
@ -85,6 +86,7 @@ public class ThreadPool extends AbstractComponent implements Closeable {
public enum ThreadPoolType {
DIRECT("direct"),
FIXED("fixed"),
FIXED_AUTO_QUEUE_SIZE("fixed_auto_queue_size"),
SCALING("scaling");
private final String type;
@ -126,7 +128,7 @@ public class ThreadPool extends AbstractComponent implements Closeable {
map.put(Names.GET, ThreadPoolType.FIXED);
map.put(Names.INDEX, ThreadPoolType.FIXED);
map.put(Names.BULK, ThreadPoolType.FIXED);
map.put(Names.SEARCH, ThreadPoolType.FIXED);
map.put(Names.SEARCH, ThreadPoolType.FIXED_AUTO_QUEUE_SIZE);
map.put(Names.MANAGEMENT, ThreadPoolType.SCALING);
map.put(Names.FLUSH, ThreadPoolType.SCALING);
map.put(Names.REFRESH, ThreadPoolType.SCALING);
@ -171,7 +173,8 @@ public class ThreadPool extends AbstractComponent implements Closeable {
builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200));
builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200)); // now that we reuse bulk for index/delete ops
builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));
builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000));
builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings,
Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000));
builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
// no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded
// the assumption here is that the listeners should be very lightweight on the listeners side
@ -608,7 +611,13 @@ public class ThreadPool extends AbstractComponent implements Closeable {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
if (type == ThreadPoolType.FIXED_AUTO_QUEUE_SIZE &&
out.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) {
// 5.x doesn't know about the "fixed_auto_queue_size" thread pool type, just write fixed.
out.writeString(ThreadPoolType.FIXED.getType());
} else {
out.writeString(type.getType());
}
out.writeInt(min);
out.writeInt(max);
out.writeOptionalWriteable(keepAlive);

View File

@ -0,0 +1,226 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
/**
* Tests for the automatic queue resizing of the {@code QueueResizingEsThreadPoolExecutorTests}
* based on the time taken for each event.
*/
public class QueueResizingEsThreadPoolExecutorTests extends ESTestCase {
public void testExactWindowSizeAdjustment() throws Exception {
ThreadContext context = new ThreadContext(Settings.EMPTY);
ResizableBlockingQueue<Runnable> queue =
new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), 100);
int threads = randomIntBetween(1, 3);
int measureWindow = 3;
logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow);
QueueResizingEsThreadPoolExecutor executor =
new QueueResizingEsThreadPoolExecutor(
"test-threadpool", threads, threads, 1000,
TimeUnit.MILLISECONDS, queue, 10, 1000, fastWrapper(),
measureWindow, TimeValue.timeValueMillis(1), EsExecutors.daemonThreadFactory("queuetest"),
new EsAbortPolicy(), context);
executor.prestartAllCoreThreads();
logger.info("--> executor: {}", executor);
// Execute exactly 3 (measureWindow) times
executor.execute(() -> {});
executor.execute(() -> {});
executor.execute(() -> {});
// The queue capacity should have increased by 50 since they were very fast tasks
assertBusy(() -> {
assertThat(queue.capacity(), equalTo(150));
});
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
context.close();
}
public void testAutoQueueSizingUp() throws Exception {
ThreadContext context = new ThreadContext(Settings.EMPTY);
ResizableBlockingQueue<Runnable> queue =
new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(),
2000);
int threads = randomIntBetween(1, 10);
int measureWindow = randomIntBetween(100, 200);
logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow);
QueueResizingEsThreadPoolExecutor executor =
new QueueResizingEsThreadPoolExecutor(
"test-threadpool", threads, threads, 1000,
TimeUnit.MILLISECONDS, queue, 10, 3000, fastWrapper(),
measureWindow, TimeValue.timeValueMillis(1), EsExecutors.daemonThreadFactory("queuetest"),
new EsAbortPolicy(), context);
executor.prestartAllCoreThreads();
logger.info("--> executor: {}", executor);
// Execute a task multiple times that takes 1ms
executeTask(executor, (measureWindow * 5) + 2);
assertBusy(() -> {
assertThat(queue.capacity(), greaterThan(2000));
});
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
context.close();
}
public void testAutoQueueSizingDown() throws Exception {
ThreadContext context = new ThreadContext(Settings.EMPTY);
ResizableBlockingQueue<Runnable> queue =
new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(),
2000);
int threads = randomIntBetween(1, 10);
int measureWindow = randomIntBetween(100, 200);
logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow);
QueueResizingEsThreadPoolExecutor executor =
new QueueResizingEsThreadPoolExecutor(
"test-threadpool", threads, threads, 1000,
TimeUnit.MILLISECONDS, queue, 10, 3000, slowWrapper(), measureWindow, TimeValue.timeValueMillis(1),
EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context);
executor.prestartAllCoreThreads();
logger.info("--> executor: {}", executor);
// Execute a task multiple times that takes 1m
executeTask(executor, (measureWindow * 5) + 2);
assertBusy(() -> {
assertThat(queue.capacity(), lessThan(2000));
});
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
context.close();
}
public void testAutoQueueSizingWithMin() throws Exception {
ThreadContext context = new ThreadContext(Settings.EMPTY);
ResizableBlockingQueue<Runnable> queue =
new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(),
5000);
int threads = randomIntBetween(1, 5);
int measureWindow = randomIntBetween(10, 100);;
int min = randomIntBetween(4981, 4999);
logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow);
QueueResizingEsThreadPoolExecutor executor =
new QueueResizingEsThreadPoolExecutor(
"test-threadpool", threads, threads, 1000,
TimeUnit.MILLISECONDS, queue, min, 100000, slowWrapper(), measureWindow, TimeValue.timeValueMillis(1),
EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context);
executor.prestartAllCoreThreads();
logger.info("--> executor: {}", executor);
// Execute a task multiple times that takes 1m
executeTask(executor, (measureWindow * 5));
// The queue capacity should decrease, but no lower than the minimum
assertBusy(() -> {
assertThat(queue.capacity(), equalTo(min));
});
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
context.close();
}
public void testAutoQueueSizingWithMax() throws Exception {
ThreadContext context = new ThreadContext(Settings.EMPTY);
ResizableBlockingQueue<Runnable> queue =
new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(),
5000);
int threads = randomIntBetween(1, 5);
int measureWindow = randomIntBetween(10, 100);
int max = randomIntBetween(5010, 5024);
logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow);
QueueResizingEsThreadPoolExecutor executor =
new QueueResizingEsThreadPoolExecutor(
"test-threadpool", threads, threads, 1000,
TimeUnit.MILLISECONDS, queue, 10, max, fastWrapper(), measureWindow, TimeValue.timeValueMillis(1),
EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context);
executor.prestartAllCoreThreads();
logger.info("--> executor: {}", executor);
// Execute a task multiple times that takes 1ms
executeTask(executor, measureWindow * 3);
// The queue capacity should increase, but no higher than the maximum
assertBusy(() -> {
assertThat(queue.capacity(), equalTo(max));
});
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
context.close();
}
private Function<Runnable, Runnable> randomBetweenLimitsWrapper(final int minNs, final int maxNs) {
return (runnable) -> {
return new SettableTimedRunnable(randomIntBetween(minNs, maxNs));
};
}
private Function<Runnable, Runnable> fastWrapper() {
return (runnable) -> {
return new SettableTimedRunnable(TimeUnit.NANOSECONDS.toNanos(50));
};
}
private Function<Runnable, Runnable> slowWrapper() {
return (runnable) -> {
return new SettableTimedRunnable(TimeUnit.MINUTES.toNanos(2));
};
}
/** Execute a blank task {@code times} times for the executor */
private void executeTask(QueueResizingEsThreadPoolExecutor executor, int times) {
logger.info("--> executing a task [{}] times", times);
for (int i = 0; i < times; i++) {
executor.execute(() -> {});
}
}
public class SettableTimedRunnable extends TimedRunnable {
private final long timeTaken;
public SettableTimedRunnable(long timeTaken) {
super(() -> {});
this.timeTaken = timeTaken;
}
@Override
public long getTotalNanos() {
return timeTaken;
}
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.Matchers.equalTo;
public class ResizableBlockingQueueTests extends ESTestCase {
public void testAdjustCapacity() throws Exception {
ResizableBlockingQueue<Runnable> queue =
new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(),
100);
assertThat(queue.capacity(), equalTo(100));
// Queue size already equal to desired capacity
queue.adjustCapacity(100, 25, 1, 1000);
assertThat(queue.capacity(), equalTo(100));
// Not worth adjusting
queue.adjustCapacity(99, 25, 1, 1000);
assertThat(queue.capacity(), equalTo(100));
// Not worth adjusting
queue.adjustCapacity(75, 25, 1, 1000);
assertThat(queue.capacity(), equalTo(100));
queue.adjustCapacity(74, 25, 1, 1000);
assertThat(queue.capacity(), equalTo(75));
queue.adjustCapacity(1000000, 25, 1, 1000);
assertThat(queue.capacity(), equalTo(100));
queue.adjustCapacity(1, 25, 80, 1000);
assertThat(queue.capacity(), equalTo(80));
queue.adjustCapacity(1000000, 25, 80, 100);
assertThat(queue.capacity(), equalTo(100));
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.threadpool;
import org.elasticsearch.common.settings.Settings;
import static org.hamcrest.CoreMatchers.containsString;
public class AutoQueueAdjustingExecutorBuilderTests extends ESThreadPoolTestCase {
public void testValidatingMinMaxSettings() throws Exception {
Settings settings = Settings.builder()
.put("thread_pool.search.min_queue_size", randomIntBetween(30, 100))
.put("thread_pool.search.max_queue_size", randomIntBetween(1,25))
.build();
try {
new AutoQueueAdjustingExecutorBuilder(settings, "test", 1, 15, 1, 100, 10);
fail("should have thrown an exception");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("Failed to parse value"));
}
}
}

View File

@ -20,9 +20,10 @@ There are several thread pools, but the important ones include:
is `1 + # of available processors`.
`search`::
For count/search/suggest operations. Thread pool type is `fixed`
with a size of `int((# of available_processors * 3) / 2) + 1`,
queue_size of `1000`.
For count/search/suggest operations. Thread pool type is
`fixed_auto_queue_size` with a size of
`int((# of available_processors * 3) / 2) + 1`, and initial queue_size of
`1000`.
`get`::
For get operations. Thread pool type is `fixed`
@ -90,6 +91,50 @@ thread_pool:
queue_size: 1000
--------------------------------------------------
[float]
==== `fixed_auto_queue_size`
The `fixed_auto_queue_size` thread pool holds a fixed size of threads to handle
the requests with a bounded queue for pending requests that have no threads to
service them. It's similar to the `fixed` threadpool, however, the `queue_size`
automatically adjusts according to calculations based on
https://en.wikipedia.org/wiki/Little%27s_law[Little's Law]. These calculations
will potentially adjust the `queue_size` up or down by 50 every time
`auto_queue_frame_size` operations have been completed.
The `size` parameter controls the number of threads, and defaults to the
number of cores times 5.
The `queue_size` allows to control the initial size of the queue of pending
requests that have no threads to execute them.
The `min_queue_size` setting controls the minimum amount the `queue_size` can be
adjusted to.
The `max_queue_size` setting controls the maximum amount the `queue_size` can be
adjusted to.
The `auto_queue_frame_size` setting controls the number of operations during
which measurement is taken before the queue is adjusted. It should be large
enough that a single operation cannot unduly bias the calculation.
The `target_response_rate` is a time value setting that indicates the targeted
average response time for tasks in the thread pool queue. If tasks are routinely
above this time, the thread pool queue will be adjusted down so that tasks are
rejected.
[source,yaml]
--------------------------------------------------
thread_pool:
search:
size: 30
queue_size: 500
min_queue_size: 10
max_queue_size: 1000
auto_queue_frame_size: 2000
target_response_rate: 1s
--------------------------------------------------
[float]
==== `scaling`