Track EWMA[1] of task execution time in search threadpool executor

This is the first step towards adaptive replica selection (#24915). This PR
tracks the execution time, also known as the "service time" of a task in the
threadpool. The `QueueResizingEsThreadPoolExecutor` then stores a moving average
of these task times which can be retrieved from the executor.

Currently there is no functionality using the EWMA yet (other than tests), this
is only a bite-sized building block so that it's easier to review.

[1]: EWMA = Exponentially Weighted Moving Average
This commit is contained in:
Lee Hinman 2017-05-30 16:47:00 -06:00
parent f2a23e3459
commit b6a2b8d682
5 changed files with 219 additions and 9 deletions

View File

@ -0,0 +1,61 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common;
import java.util.concurrent.atomic.AtomicLong;
/**
* Implements exponentially weighted moving averages (commonly abbreviated EWMA) for a single value.
* This class is safe to share between threads.
*/
public class ExponentiallyWeightedMovingAverage {
private final double alpha;
private final AtomicLong averageBits;
/**
* Create a new EWMA with a given {@code alpha} and {@code initialAvg}. A smaller alpha means
* that new data points will have less weight, where a high alpha means older data points will
* have a lower influence.
*/
public ExponentiallyWeightedMovingAverage(double alpha, double initialAvg) {
if (alpha < 0 || alpha > 1) {
throw new IllegalArgumentException("alpha must be greater or equal to 0 and less than or equal to 1");
}
this.alpha = alpha;
this.averageBits = new AtomicLong(Double.doubleToLongBits(initialAvg));
}
public double getAverage() {
return Double.longBitsToDouble(this.averageBits.get());
}
public void addValue(double newValue) {
boolean successful = false;
do {
final long currentBits = this.averageBits.get();
final double currentAvg = getAverage();
final double newAvg = (alpha * newValue) + ((1 - alpha) * currentAvg);
final long newBits = Double.doubleToLongBits(newAvg);
successful = averageBits.compareAndSet(currentBits, newBits);
} while (successful == false);
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.common.util.concurrent;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.ExponentiallyWeightedMovingAverage;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.unit.TimeValue;
@ -43,8 +44,13 @@ import java.util.stream.Stream;
*/
public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecutor {
// This is a random starting point alpha. TODO: revisit this with actual testing and/or make it configurable
public static double EWMA_ALPHA = 0.3;
private static final Logger logger =
ESLoggerFactory.getLogger(QueueResizingEsThreadPoolExecutor.class);
// The amount the queue size is adjusted by for each calcuation
private static final int QUEUE_ADJUSTMENT_AMOUNT = 50;
private final Function<Runnable, Runnable> runnableWrapper;
private final ResizableBlockingQueue<Runnable> workQueue;
@ -52,8 +58,7 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto
private final int minQueueSize;
private final int maxQueueSize;
private final long targetedResponseTimeNanos;
// The amount the queue size is adjusted by for each calcuation
private static final int QUEUE_ADJUSTMENT_AMOUNT = 50;
private final ExponentiallyWeightedMovingAverage executionEWMA;
private final AtomicLong totalTaskNanos = new AtomicLong(0);
private final AtomicInteger taskCount = new AtomicInteger(0);
@ -74,6 +79,9 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto
this.minQueueSize = minQueueSize;
this.maxQueueSize = maxQueueSize;
this.targetedResponseTimeNanos = targetedResponseTime.getNanos();
// We choose to start the EWMA with the targeted response time, reasoning that it is a
// better start point for a realistic task execution time than starting at 0
this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, targetedResponseTimeNanos);
logger.debug("thread pool [{}] will adjust queue by [{}] when determining automatic queue size",
name, QUEUE_ADJUSTMENT_AMOUNT);
}
@ -126,6 +134,13 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto
return workQueue.capacity();
}
/**
* Returns the exponentially weighted moving average of the task execution time
*/
public double getTaskExecutionEWMA() {
return executionEWMA.getAverage();
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
@ -136,6 +151,11 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto
assert r instanceof TimedRunnable : "expected only TimedRunnables in queue";
final long taskNanos = ((TimedRunnable) r).getTotalNanos();
final long totalNanos = totalTaskNanos.addAndGet(taskNanos);
final long taskExecutionNanos = ((TimedRunnable) r).getTotalExecutionNanos();
assert taskExecutionNanos >= 0 : "expected task to always take longer than 0 nanoseconds, got: " + taskExecutionNanos;
executionEWMA.addValue(taskExecutionNanos);
if (taskCount.incrementAndGet() == this.tasksPerFrame) {
final long endTimeNs = System.nanoTime();
final long totalRuntime = endTimeNs - this.startNs;
@ -149,20 +169,22 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto
try {
final double lambda = calculateLambda(tasksPerFrame, totalNanos);
final int desiredQueueSize = calculateL(lambda, targetedResponseTimeNanos);
final int oldCapacity = workQueue.capacity();
if (logger.isDebugEnabled()) {
final long avgTaskTime = totalNanos / tasksPerFrame;
logger.debug("[{}]: there were [{}] tasks in [{}], avg task time: [{}], [{} tasks/s], " +
"optimal queue is [{}]",
logger.debug("[{}]: there were [{}] tasks in [{}], avg task time [{}], EWMA task execution [{}], " +
"[{} tasks/s], optimal queue is [{}], current capacity [{}]",
name,
tasksPerFrame,
TimeValue.timeValueNanos(totalRuntime),
TimeValue.timeValueNanos(avgTaskTime),
TimeValue.timeValueNanos((long)executionEWMA.getAverage()),
String.format(Locale.ROOT, "%.2f", lambda * TimeValue.timeValueSeconds(1).nanos()),
desiredQueueSize);
desiredQueueSize,
oldCapacity);
}
final int oldCapacity = workQueue.capacity();
// Adjust the queue size towards the desired capacity using an adjust of
// QUEUE_ADJUSTMENT_AMOUNT (either up or down), keeping in mind the min and max
// values the queue size can have.
@ -223,6 +245,7 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto
b.append("max queue capacity = ").append(maxQueueSize).append(", ");
b.append("frame size = ").append(tasksPerFrame).append(", ");
b.append("targeted response rate = ").append(TimeValue.timeValueNanos(targetedResponseTimeNanos)).append(", ");
b.append("task execution EWMA = ").append(TimeValue.timeValueNanos((long)executionEWMA.getAverage())).append(", ");
b.append("adjustment amount = ").append(QUEUE_ADJUSTMENT_AMOUNT).append(", ");
/*
* ThreadPoolExecutor has some nice information in its toString but we

View File

@ -20,12 +20,13 @@
package org.elasticsearch.common.util.concurrent;
/**
* A class used to wrap a {@code Runnable} that allows capturing the time the task since creation
* through execution.
* A class used to wrap a {@code Runnable} that allows capturing the time of the task since creation
* through execution as well as only execution time.
*/
class TimedRunnable implements Runnable {
private final Runnable original;
private final long creationTimeNanos;
private long startTimeNanos;
private long finishTimeNanos = -1;
TimedRunnable(Runnable original) {
@ -36,6 +37,7 @@ class TimedRunnable implements Runnable {
@Override
public void run() {
try {
startTimeNanos = System.nanoTime();
original.run();
} finally {
finishTimeNanos = System.nanoTime();
@ -53,4 +55,16 @@ class TimedRunnable implements Runnable {
}
return finishTimeNanos - creationTimeNanos;
}
/**
* Return the time this task spent being run.
* If the task is still running or has not yet been run, returns -1.
*/
long getTotalExecutionNanos() {
if (startTimeNanos == -1 || finishTimeNanos == -1) {
// There must have been an exception thrown, the total time is unknown (-1)
return -1;
}
return finishTimeNanos - startTimeNanos;
}
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertThat;
/**
* Implements exponentially weighted moving averages (commonly abbreviated EWMA) for a single value.
*/
public class ExponentiallyWeightedMovingAverageTests extends ESTestCase {
public void testEWMA() {
final ExponentiallyWeightedMovingAverage ewma = new ExponentiallyWeightedMovingAverage(0.5, 10);
ewma.addValue(12);
assertThat(ewma.getAverage(), equalTo(11.0));
ewma.addValue(10);
ewma.addValue(15);
ewma.addValue(13);
assertThat(ewma.getAverage(), equalTo(12.875));
}
public void testInvalidAlpha() {
try {
ExponentiallyWeightedMovingAverage ewma = new ExponentiallyWeightedMovingAverage(-0.5, 10);
fail("should have failed to create EWMA");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("alpha must be greater or equal to 0 and less than or equal to 1"));
}
try {
ExponentiallyWeightedMovingAverage ewma = new ExponentiallyWeightedMovingAverage(1.5, 10);
fail("should have failed to create EWMA");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("alpha must be greater or equal to 0 and less than or equal to 1"));
}
}
public void testConvergingToValue() {
final ExponentiallyWeightedMovingAverage ewma = new ExponentiallyWeightedMovingAverage(0.5, 10000);
for (int i = 0; i < 100000; i++) {
ewma.addValue(1);
}
assertThat(ewma.getAverage(), lessThan(2.0));
}
}

View File

@ -184,6 +184,47 @@ public class QueueResizingEsThreadPoolExecutorTests extends ESTestCase {
context.close();
}
public void testExecutionEWMACalculation() throws Exception {
ThreadContext context = new ThreadContext(Settings.EMPTY);
ResizableBlockingQueue<Runnable> queue =
new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(),
100);
QueueResizingEsThreadPoolExecutor executor =
new QueueResizingEsThreadPoolExecutor(
"test-threadpool", 1, 1, 1000,
TimeUnit.MILLISECONDS, queue, 10, 200, fastWrapper(), 10, TimeValue.timeValueMillis(1),
EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context);
executor.prestartAllCoreThreads();
logger.info("--> executor: {}", executor);
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(1000000L));
executeTask(executor, 1);
assertBusy(() -> {
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(700030L));
});
executeTask(executor, 1);
assertBusy(() -> {
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(490050L));
});
executeTask(executor, 1);
assertBusy(() -> {
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(343065L));
});
executeTask(executor, 1);
assertBusy(() -> {
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(240175L));
});
executeTask(executor, 1);
assertBusy(() -> {
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(168153L));
});
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
context.close();
}
private Function<Runnable, Runnable> randomBetweenLimitsWrapper(final int minNs, final int maxNs) {
return (runnable) -> {
return new SettableTimedRunnable(randomIntBetween(minNs, maxNs));
@ -222,5 +263,10 @@ public class QueueResizingEsThreadPoolExecutorTests extends ESTestCase {
public long getTotalNanos() {
return timeTaken;
}
@Override
public long getTotalExecutionNanos() {
return timeTaken;
}
}
}