diff --git a/core/src/main/java/org/elasticsearch/common/ExponentiallyWeightedMovingAverage.java b/core/src/main/java/org/elasticsearch/common/ExponentiallyWeightedMovingAverage.java new file mode 100644 index 00000000000..835f37664ec --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/ExponentiallyWeightedMovingAverage.java @@ -0,0 +1,61 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common; + + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Implements exponentially weighted moving averages (commonly abbreviated EWMA) for a single value. + * This class is safe to share between threads. + */ +public class ExponentiallyWeightedMovingAverage { + + private final double alpha; + private final AtomicLong averageBits; + + /** + * Create a new EWMA with a given {@code alpha} and {@code initialAvg}. A smaller alpha means + * that new data points will have less weight, where a high alpha means older data points will + * have a lower influence. + */ + public ExponentiallyWeightedMovingAverage(double alpha, double initialAvg) { + if (alpha < 0 || alpha > 1) { + throw new IllegalArgumentException("alpha must be greater or equal to 0 and less than or equal to 1"); + } + this.alpha = alpha; + this.averageBits = new AtomicLong(Double.doubleToLongBits(initialAvg)); + } + + public double getAverage() { + return Double.longBitsToDouble(this.averageBits.get()); + } + + public void addValue(double newValue) { + boolean successful = false; + do { + final long currentBits = this.averageBits.get(); + final double currentAvg = getAverage(); + final double newAvg = (alpha * newValue) + ((1 - alpha) * currentAvg); + final long newBits = Double.doubleToLongBits(newAvg); + successful = averageBits.compareAndSet(currentBits, newBits); + } while (successful == false); + } +} diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java index 898c2203d0b..c24b6899bcc 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java @@ -21,6 +21,7 @@ package org.elasticsearch.common.util.concurrent; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.common.ExponentiallyWeightedMovingAverage; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.unit.TimeValue; @@ -43,8 +44,13 @@ import java.util.stream.Stream; */ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecutor { + // This is a random starting point alpha. TODO: revisit this with actual testing and/or make it configurable + public static double EWMA_ALPHA = 0.3; + private static final Logger logger = ESLoggerFactory.getLogger(QueueResizingEsThreadPoolExecutor.class); + // The amount the queue size is adjusted by for each calcuation + private static final int QUEUE_ADJUSTMENT_AMOUNT = 50; private final Function runnableWrapper; private final ResizableBlockingQueue workQueue; @@ -52,8 +58,7 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto private final int minQueueSize; private final int maxQueueSize; private final long targetedResponseTimeNanos; - // The amount the queue size is adjusted by for each calcuation - private static final int QUEUE_ADJUSTMENT_AMOUNT = 50; + private final ExponentiallyWeightedMovingAverage executionEWMA; private final AtomicLong totalTaskNanos = new AtomicLong(0); private final AtomicInteger taskCount = new AtomicInteger(0); @@ -74,6 +79,9 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto this.minQueueSize = minQueueSize; this.maxQueueSize = maxQueueSize; this.targetedResponseTimeNanos = targetedResponseTime.getNanos(); + // We choose to start the EWMA with the targeted response time, reasoning that it is a + // better start point for a realistic task execution time than starting at 0 + this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, targetedResponseTimeNanos); logger.debug("thread pool [{}] will adjust queue by [{}] when determining automatic queue size", name, QUEUE_ADJUSTMENT_AMOUNT); } @@ -126,6 +134,13 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto return workQueue.capacity(); } + /** + * Returns the exponentially weighted moving average of the task execution time + */ + public double getTaskExecutionEWMA() { + return executionEWMA.getAverage(); + } + @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); @@ -136,6 +151,11 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto assert r instanceof TimedRunnable : "expected only TimedRunnables in queue"; final long taskNanos = ((TimedRunnable) r).getTotalNanos(); final long totalNanos = totalTaskNanos.addAndGet(taskNanos); + + final long taskExecutionNanos = ((TimedRunnable) r).getTotalExecutionNanos(); + assert taskExecutionNanos >= 0 : "expected task to always take longer than 0 nanoseconds, got: " + taskExecutionNanos; + executionEWMA.addValue(taskExecutionNanos); + if (taskCount.incrementAndGet() == this.tasksPerFrame) { final long endTimeNs = System.nanoTime(); final long totalRuntime = endTimeNs - this.startNs; @@ -149,20 +169,22 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto try { final double lambda = calculateLambda(tasksPerFrame, totalNanos); final int desiredQueueSize = calculateL(lambda, targetedResponseTimeNanos); + final int oldCapacity = workQueue.capacity(); + if (logger.isDebugEnabled()) { final long avgTaskTime = totalNanos / tasksPerFrame; - logger.debug("[{}]: there were [{}] tasks in [{}], avg task time: [{}], [{} tasks/s], " + - "optimal queue is [{}]", + logger.debug("[{}]: there were [{}] tasks in [{}], avg task time [{}], EWMA task execution [{}], " + + "[{} tasks/s], optimal queue is [{}], current capacity [{}]", name, tasksPerFrame, TimeValue.timeValueNanos(totalRuntime), TimeValue.timeValueNanos(avgTaskTime), + TimeValue.timeValueNanos((long)executionEWMA.getAverage()), String.format(Locale.ROOT, "%.2f", lambda * TimeValue.timeValueSeconds(1).nanos()), - desiredQueueSize); + desiredQueueSize, + oldCapacity); } - final int oldCapacity = workQueue.capacity(); - // Adjust the queue size towards the desired capacity using an adjust of // QUEUE_ADJUSTMENT_AMOUNT (either up or down), keeping in mind the min and max // values the queue size can have. @@ -223,6 +245,7 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto b.append("max queue capacity = ").append(maxQueueSize).append(", "); b.append("frame size = ").append(tasksPerFrame).append(", "); b.append("targeted response rate = ").append(TimeValue.timeValueNanos(targetedResponseTimeNanos)).append(", "); + b.append("task execution EWMA = ").append(TimeValue.timeValueNanos((long)executionEWMA.getAverage())).append(", "); b.append("adjustment amount = ").append(QUEUE_ADJUSTMENT_AMOUNT).append(", "); /* * ThreadPoolExecutor has some nice information in its toString but we diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java index 91ad6e46efa..2ee80badb74 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java @@ -20,12 +20,13 @@ package org.elasticsearch.common.util.concurrent; /** - * A class used to wrap a {@code Runnable} that allows capturing the time the task since creation - * through execution. + * A class used to wrap a {@code Runnable} that allows capturing the time of the task since creation + * through execution as well as only execution time. */ class TimedRunnable implements Runnable { private final Runnable original; private final long creationTimeNanos; + private long startTimeNanos; private long finishTimeNanos = -1; TimedRunnable(Runnable original) { @@ -36,6 +37,7 @@ class TimedRunnable implements Runnable { @Override public void run() { try { + startTimeNanos = System.nanoTime(); original.run(); } finally { finishTimeNanos = System.nanoTime(); @@ -53,4 +55,16 @@ class TimedRunnable implements Runnable { } return finishTimeNanos - creationTimeNanos; } + + /** + * Return the time this task spent being run. + * If the task is still running or has not yet been run, returns -1. + */ + long getTotalExecutionNanos() { + if (startTimeNanos == -1 || finishTimeNanos == -1) { + // There must have been an exception thrown, the total time is unknown (-1) + return -1; + } + return finishTimeNanos - startTimeNanos; + } } diff --git a/core/src/test/java/org/elasticsearch/common/ExponentiallyWeightedMovingAverageTests.java b/core/src/test/java/org/elasticsearch/common/ExponentiallyWeightedMovingAverageTests.java new file mode 100644 index 00000000000..9e50d0afd71 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/common/ExponentiallyWeightedMovingAverageTests.java @@ -0,0 +1,66 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common; + +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThan; +import static org.junit.Assert.assertThat; + +/** + * Implements exponentially weighted moving averages (commonly abbreviated EWMA) for a single value. + */ +public class ExponentiallyWeightedMovingAverageTests extends ESTestCase { + + public void testEWMA() { + final ExponentiallyWeightedMovingAverage ewma = new ExponentiallyWeightedMovingAverage(0.5, 10); + ewma.addValue(12); + assertThat(ewma.getAverage(), equalTo(11.0)); + ewma.addValue(10); + ewma.addValue(15); + ewma.addValue(13); + assertThat(ewma.getAverage(), equalTo(12.875)); + } + + public void testInvalidAlpha() { + try { + ExponentiallyWeightedMovingAverage ewma = new ExponentiallyWeightedMovingAverage(-0.5, 10); + fail("should have failed to create EWMA"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), equalTo("alpha must be greater or equal to 0 and less than or equal to 1")); + } + + try { + ExponentiallyWeightedMovingAverage ewma = new ExponentiallyWeightedMovingAverage(1.5, 10); + fail("should have failed to create EWMA"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), equalTo("alpha must be greater or equal to 0 and less than or equal to 1")); + } + } + + public void testConvergingToValue() { + final ExponentiallyWeightedMovingAverage ewma = new ExponentiallyWeightedMovingAverage(0.5, 10000); + for (int i = 0; i < 100000; i++) { + ewma.addValue(1); + } + assertThat(ewma.getAverage(), lessThan(2.0)); + } +} diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java index 82b67806b79..5365e1bb909 100644 --- a/core/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java @@ -184,6 +184,47 @@ public class QueueResizingEsThreadPoolExecutorTests extends ESTestCase { context.close(); } + public void testExecutionEWMACalculation() throws Exception { + ThreadContext context = new ThreadContext(Settings.EMPTY); + ResizableBlockingQueue queue = + new ResizableBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), + 100); + + QueueResizingEsThreadPoolExecutor executor = + new QueueResizingEsThreadPoolExecutor( + "test-threadpool", 1, 1, 1000, + TimeUnit.MILLISECONDS, queue, 10, 200, fastWrapper(), 10, TimeValue.timeValueMillis(1), + EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context); + executor.prestartAllCoreThreads(); + logger.info("--> executor: {}", executor); + + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(1000000L)); + executeTask(executor, 1); + assertBusy(() -> { + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(700030L)); + }); + executeTask(executor, 1); + assertBusy(() -> { + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(490050L)); + }); + executeTask(executor, 1); + assertBusy(() -> { + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(343065L)); + }); + executeTask(executor, 1); + assertBusy(() -> { + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(240175L)); + }); + executeTask(executor, 1); + assertBusy(() -> { + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(168153L)); + }); + + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + context.close(); + } + private Function randomBetweenLimitsWrapper(final int minNs, final int maxNs) { return (runnable) -> { return new SettableTimedRunnable(randomIntBetween(minNs, maxNs)); @@ -222,5 +263,10 @@ public class QueueResizingEsThreadPoolExecutorTests extends ESTestCase { public long getTotalNanos() { return timeTaken; } + + @Override + public long getTotalExecutionNanos() { + return timeTaken; + } } }