Require executor name when calling scheduleWithFixedDelay

The ThreadPool#scheduleWithFixedDelay method does not make it clear that all scheduled runnable instances
will be run on the scheduler thread. This becomes problematic if the actions being performed include
blocking operations since there is a single thread and tasks may not get executed due to a blocking task.

This change includes a few different aspects around trying to prevent this situation. The first is that
the scheduleWithFixedDelay method now requires the name of the executor that should be used to execute
the runnable. All existing calls were updated to use Names.SAME to preserve the existing behavior.

The second aspect is the removal of using ScheduledThreadPoolExecutor#scheduleWithFixedDelay in favor of
a custom runnable, ReschedulingRunnable. This runnable encapsulates the logic to deal with rescheduling a
runnable with a fixed delay and mimics the behavior of executing using a ScheduledThreadPoolExecutor and
provides a ScheduledFuture implementation that also mimics that of the typed returned by a
ScheduledThreadPoolExecutor.

Finally, an assertion was added to BaseFuture to detect blocking calls that are being made on the scheduler
thread.
This commit is contained in:
jaymode 2016-04-05 09:01:47 -04:00
parent 0854b03f13
commit 11389638f9
10 changed files with 478 additions and 53 deletions

View File

@ -20,6 +20,7 @@
package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transports;
import java.util.Objects;
@ -32,6 +33,8 @@ import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public abstract class BaseFuture<V> implements Future<V> {
private static final String BLOCKING_OP_REASON = "Blocking operation";
/**
* Synchronization control for AbstractFutures.
*/
@ -56,7 +59,8 @@ public abstract class BaseFuture<V> implements Future<V> {
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException,
TimeoutException, ExecutionException {
assert timeout <= 0 || Transports.assertNotTransportThread("Blocking operation");
assert timeout <= 0 ||
(Transports.assertNotTransportThread(BLOCKING_OP_REASON) && ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON));
return sync.get(unit.toNanos(timeout));
}
@ -78,7 +82,7 @@ public abstract class BaseFuture<V> implements Future<V> {
*/
@Override
public V get() throws InterruptedException, ExecutionException {
assert Transports.assertNotTransportThread("Blocking operation");
assert Transports.assertNotTransportThread(BLOCKING_OP_REASON) && ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON);
return sync.get();
}

View File

@ -27,7 +27,6 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
@ -35,6 +34,8 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Cancellable;
import org.elasticsearch.threadpool.ThreadPool.Names;
import java.io.Closeable;
import java.util.ArrayList;
@ -43,7 +44,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
@ -84,7 +84,7 @@ public class IndexingMemoryController extends AbstractComponent implements Index
/** Contains shards currently being throttled because we can't write segments quickly enough */
private final Set<IndexShard> throttled = new HashSet<>();
private final ScheduledFuture scheduler;
private final Cancellable scheduler;
private static final EnumSet<IndexShardState> CAN_WRITE_INDEX_BUFFER_STATES = EnumSet.of(
IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED);
@ -128,14 +128,14 @@ public class IndexingMemoryController extends AbstractComponent implements Index
this.threadPool = threadPool;
}
protected ScheduledFuture<?> scheduleTask(ThreadPool threadPool) {
protected Cancellable scheduleTask(ThreadPool threadPool) {
// it's fine to run it on the scheduler thread, no busy work
return threadPool.scheduleWithFixedDelay(statusChecker, interval);
return threadPool.scheduleWithFixedDelay(statusChecker, interval, Names.SAME);
}
@Override
public void close() {
FutureUtils.cancel(scheduler);
scheduler.cancel();
}
/**

View File

@ -29,6 +29,8 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.monitor.jvm.JvmStats.GarbageCollector;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Cancellable;
import org.elasticsearch.threadpool.ThreadPool.Names;
import java.util.HashMap;
import java.util.Locale;
@ -48,7 +50,7 @@ public class JvmGcMonitorService extends AbstractLifecycleComponent {
private final Map<String, GcThreshold> gcThresholds;
private final GcOverheadThreshold gcOverheadThreshold;
private volatile ScheduledFuture scheduledFuture;
private volatile Cancellable scheduledFuture;
public static final Setting<Boolean> ENABLED_SETTING =
Setting.boolSetting("monitor.jvm.gc.enabled", true, Property.NodeScope);
@ -198,7 +200,7 @@ public class JvmGcMonitorService extends AbstractLifecycleComponent {
void onGcOverhead(final Threshold threshold, final long current, final long elapsed, final long seq) {
logGcOverhead(logger, threshold, current, elapsed, seq);
}
}, interval);
}, interval, Names.SAME);
}
private static final String SLOW_GC_LOG_MESSAGE =
@ -334,7 +336,7 @@ public class JvmGcMonitorService extends AbstractLifecycleComponent {
if (!enabled) {
return;
}
FutureUtils.cancel(scheduledFuture);
scheduledFuture.cancel();
}
@Override

View File

@ -89,6 +89,8 @@ import org.elasticsearch.search.searchafter.SearchAfterBuilder;
import org.elasticsearch.search.sort.SortAndFormats;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Cancellable;
import org.elasticsearch.threadpool.ThreadPool.Names;
import java.io.IOException;
import java.util.Collections;
@ -96,7 +98,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicLong;
import static java.util.Collections.unmodifiableMap;
@ -139,7 +140,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
private volatile TimeValue defaultSearchTimeout;
private final ScheduledFuture<?> keepAliveReaper;
private final Cancellable keepAliveReaper;
private final AtomicLong idGenerator = new AtomicLong();
@ -171,7 +172,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
elementParsers.putAll(fetchPhase.parseElements());
this.elementParsers = unmodifiableMap(elementParsers);
this.keepAliveReaper = threadPool.scheduleWithFixedDelay(new Reaper(), keepAliveInterval);
this.keepAliveReaper = threadPool.scheduleWithFixedDelay(new Reaper(), keepAliveInterval, Names.SAME);
defaultSearchTimeout = DEFAULT_SEARCH_TIMEOUT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(DEFAULT_SEARCH_TIMEOUT_SETTING, this::setDefaultSearchTimeout);
@ -224,7 +225,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
@Override
protected void doClose() {
doStop();
FutureUtils.cancel(keepAliveReaper);
keepAliveReaper.cancel();
}
public DfsSearchResult executeDfsPhase(ShardSearchRequest request) throws IOException {

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsAbortPolicy;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
@ -298,14 +299,18 @@ public class ThreadPool extends AbstractComponent implements Closeable {
}
/**
* Schedules a periodic action that always runs on the scheduler thread.
* Schedules a periodic action that runs on the specified thread pool.
*
* @param command the action to take
* @param interval the delay interval
* @return a ScheduledFuture who's get will return when the task is complete and throw an exception if it is canceled
* @param executor The name of the thread pool on which to execute this task. {@link Names#SAME} means "execute on the scheduler thread",
* which there is only one of. Executing blocking or long running code on the {@link Names#SAME} thread pool should never
* be done as it can cause issues with the cluster
* @return a {@link Cancellable} that can be used to cancel the subsequent runs of the command. If the command is running, it will
* not be interrupted.
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, TimeValue interval) {
return scheduler.scheduleWithFixedDelay(new LoggingRunnable(command), interval.millis(), interval.millis(), TimeUnit.MILLISECONDS);
public Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval, String executor) {
return new ReschedulingRunnable(command, interval, executor, this);
}
/**
@ -314,7 +319,7 @@ public class ThreadPool extends AbstractComponent implements Closeable {
* it to this method.
*
* @param delay delay before the task executes
* @param name the name of the thread pool on which to execute this task. SAME means "execute on the scheduler thread" which changes the
* @param executor the name of the thread pool on which to execute this task. SAME means "execute on the scheduler thread" which changes the
* meaning of the ScheduledFuture returned by this method. In that case the ScheduledFuture will complete only when the command
* completes.
* @param command the command to run
@ -322,9 +327,9 @@ public class ThreadPool extends AbstractComponent implements Closeable {
* the task is canceled before it was added to its target thread pool. Once the task has been added to its target thread pool
* the ScheduledFuture will cannot interact with it.
*/
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
if (!Names.SAME.equals(name)) {
command = new ThreadedRunnable(command, executor(name));
public ScheduledFuture<?> schedule(TimeValue delay, String executor, Runnable command) {
if (!Names.SAME.equals(executor)) {
command = new ThreadedRunnable(command, executor(executor));
}
return scheduler.schedule(new LoggingRunnable(command), delay.millis(), TimeUnit.MILLISECONDS);
}
@ -700,4 +705,101 @@ public class ThreadPool extends AbstractComponent implements Closeable {
return threadContext;
}
/**
* This interface represents an object whose execution may be cancelled during runtime.
*/
public interface Cancellable {
/**
* Cancel the execution of this object. This method is idempotent.
*/
void cancel();
/**
* Check if the execution has been cancelled
* @return true if cancelled
*/
boolean isCancelled();
}
/**
* This class encapsulates the scheduling of a {@link Runnable} that needs to be repeated on a interval. For example, checking a value
* for cleanup every second could be done by passing in a Runnable that can perform the check and the specified interval between
* executions of this runnable. <em>NOTE:</em> the runnable is only rescheduled to run again after completion of the runnable.
*
* For this class, <i>completion</i> means that the call to {@link Runnable#run()} returned or an exception was thrown and caught. In
* case of an exception, this class will log the exception and reschedule the runnable for its next execution. This differs from the
* {@link ScheduledThreadPoolExecutor#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)} semantics as an exception there would
* terminate the rescheduling of the runnable.
*/
static final class ReschedulingRunnable extends AbstractRunnable implements Cancellable {
private final Runnable runnable;
private final TimeValue interval;
private final String executor;
private final ThreadPool threadPool;
private volatile boolean run = true;
/**
* Creates a new rescheduling runnable and schedules the first execution to occur after the interval specified
*
* @param runnable the {@link Runnable} that should be executed periodically
* @param interval the time interval between executions
* @param executor the executor where this runnable should be scheduled to run
* @param threadPool the {@link ThreadPool} instance to use for scheduling
*/
ReschedulingRunnable(Runnable runnable, TimeValue interval, String executor, ThreadPool threadPool) {
this.runnable = runnable;
this.interval = interval;
this.executor = executor;
this.threadPool = threadPool;
threadPool.schedule(interval, executor, this);
}
@Override
public void cancel() {
run = false;
}
@Override
public boolean isCancelled() {
return run == false;
}
@Override
public void doRun() {
// always check run here since this may have been cancelled since the last execution and we do not want to run
if (run) {
runnable.run();
}
}
@Override
public void onFailure(Exception e) {
threadPool.logger.warn("failed to run scheduled task [{}] on thread pool [{}]", e, runnable.toString(), executor);
}
@Override
public void onRejection(Exception e) {
run = false;
if (threadPool.logger.isDebugEnabled()) {
threadPool.logger.debug("scheduled task [{}] was rejected on thread pool [{}]", e, runnable, executor);
}
}
@Override
public void onAfter() {
// if this has not been cancelled reschedule it to run again
if (run) {
threadPool.schedule(interval, executor, this);
}
}
}
public static boolean assertNotScheduleThread(String reason) {
assert Thread.currentThread().getName().contains("scheduler") == false :
"Expected current thread [" + Thread.currentThread() + "] to not be the scheduler thread. Reason: [" + reason + "]";
return true;
}
}

View File

@ -24,13 +24,13 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Cancellable;
import org.elasticsearch.threadpool.ThreadPool.Names;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledFuture;
/**
* Generic resource watcher service
@ -81,9 +81,9 @@ public class ResourceWatcherService extends AbstractLifecycleComponent {
final ResourceMonitor mediumMonitor;
final ResourceMonitor highMonitor;
private volatile ScheduledFuture lowFuture;
private volatile ScheduledFuture mediumFuture;
private volatile ScheduledFuture highFuture;
private volatile Cancellable lowFuture;
private volatile Cancellable mediumFuture;
private volatile Cancellable highFuture;
@Inject
public ResourceWatcherService(Settings settings, ThreadPool threadPool) {
@ -110,9 +110,9 @@ public class ResourceWatcherService extends AbstractLifecycleComponent {
if (!enabled) {
return;
}
lowFuture = threadPool.scheduleWithFixedDelay(lowMonitor, lowMonitor.interval);
mediumFuture = threadPool.scheduleWithFixedDelay(mediumMonitor, mediumMonitor.interval);
highFuture = threadPool.scheduleWithFixedDelay(highMonitor, highMonitor.interval);
lowFuture = threadPool.scheduleWithFixedDelay(lowMonitor, lowMonitor.interval, Names.SAME);
mediumFuture = threadPool.scheduleWithFixedDelay(mediumMonitor, mediumMonitor.interval, Names.SAME);
highFuture = threadPool.scheduleWithFixedDelay(highMonitor, highMonitor.interval, Names.SAME);
}
@Override
@ -120,9 +120,9 @@ public class ResourceWatcherService extends AbstractLifecycleComponent {
if (!enabled) {
return;
}
FutureUtils.cancel(lowFuture);
FutureUtils.cancel(mediumFuture);
FutureUtils.cancel(highFuture);
lowFuture.cancel();
mediumFuture.cancel();
highFuture.cancel();
}
@Override

View File

@ -56,6 +56,8 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Cancellable;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.junit.After;
import org.junit.Before;
@ -222,7 +224,7 @@ public class RefreshListenersTests extends ESTestCase {
maxListeners = between(1, threadCount * 2);
// This thread just refreshes every once in a while to cause trouble.
ScheduledFuture<?> refresher = threadPool.scheduleWithFixedDelay(() -> engine.refresh("because test"), timeValueMillis(100));
Cancellable refresher = threadPool.scheduleWithFixedDelay(() -> engine.refresh("because test"), timeValueMillis(100), Names.SAME);
// These threads add and block until the refresh makes the change visible and then do a non-realtime get.
Thread[] indexers = new Thread[threadCount];
@ -262,7 +264,7 @@ public class RefreshListenersTests extends ESTestCase {
for (Thread indexer: indexers) {
indexer.join();
}
FutureUtils.cancel(refresher);
refresher.cancel();
}
private Engine.Index index(String id) {

View File

@ -34,6 +34,7 @@ import org.elasticsearch.index.shard.IndexShardTests;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Cancellable;
import java.io.IOException;
import java.util.ArrayList;
@ -43,7 +44,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -160,7 +160,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
}
@Override
protected ScheduledFuture<?> scheduleTask(ThreadPool threadPool) {
protected Cancellable scheduleTask(ThreadPool threadPool) {
return null;
}
}
@ -390,7 +390,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
}
@Override
protected ScheduledFuture<?> scheduleTask(ThreadPool threadPool) {
protected Cancellable scheduleTask(ThreadPool threadPool) {
return null;
}
};

View File

@ -24,14 +24,13 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Cancellable;
import java.util.AbstractMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import static org.hamcrest.CoreMatchers.allOf;
@ -42,19 +41,23 @@ public class JvmGcMonitorServiceSettingsTests extends ESTestCase {
public void testEmptySettingsAreOkay() throws InterruptedException {
AtomicBoolean scheduled = new AtomicBoolean();
execute(Settings.EMPTY, (command, interval) -> { scheduled.set(true); return null; }, () -> assertTrue(scheduled.get()));
execute(Settings.EMPTY,
(command, interval, name) -> { scheduled.set(true); return new MockCancellable(); },
() -> assertTrue(scheduled.get()));
}
public void testDisabledSetting() throws InterruptedException {
Settings settings = Settings.builder().put("monitor.jvm.gc.enabled", "false").build();
AtomicBoolean scheduled = new AtomicBoolean();
execute(settings, (command, interval) -> { scheduled.set(true); return null; }, () -> assertFalse(scheduled.get()));
execute(settings,
(command, interval, name) -> { scheduled.set(true); return new MockCancellable(); },
() -> assertFalse(scheduled.get()));
}
public void testNegativeSetting() throws InterruptedException {
String collector = randomAsciiOfLength(5);
Settings settings = Settings.builder().put("monitor.jvm.gc.collector." + collector + ".warn", "-" + randomTimeValue()).build();
execute(settings, (command, interval) -> null, e -> {
execute(settings, (command, interval, name) -> null, e -> {
assertThat(e, instanceOf(IllegalArgumentException.class));
assertThat(e.getMessage(), allOf(containsString("invalid gc_threshold"), containsString("for [monitor.jvm.gc.collector." + collector + ".")));
}, true, null);
@ -74,7 +77,7 @@ public class JvmGcMonitorServiceSettingsTests extends ESTestCase {
}
// we should get an exception that a setting is missing
execute(builder.build(), (command, interval) -> null, e -> {
execute(builder.build(), (command, interval, name) -> null, e -> {
assertThat(e, instanceOf(IllegalArgumentException.class));
assertThat(e.getMessage(), containsString("missing gc_threshold for [monitor.jvm.gc.collector." + collector + "."));
}, true, null);
@ -84,7 +87,7 @@ public class JvmGcMonitorServiceSettingsTests extends ESTestCase {
for (final String threshold : new String[] { "warn", "info", "debug" }) {
final Settings.Builder builder = Settings.builder();
builder.put("monitor.jvm.gc.overhead." + threshold, randomIntBetween(Integer.MIN_VALUE, -1));
execute(builder.build(), (command, interval) -> null, e -> {
execute(builder.build(), (command, interval, name) -> null, e -> {
assertThat(e, instanceOf(IllegalArgumentException.class));
assertThat(e.getMessage(), containsString("setting [monitor.jvm.gc.overhead." + threshold + "] must be >= 0"));
}, true, null);
@ -93,7 +96,7 @@ public class JvmGcMonitorServiceSettingsTests extends ESTestCase {
for (final String threshold : new String[] { "warn", "info", "debug" }) {
final Settings.Builder builder = Settings.builder();
builder.put("monitor.jvm.gc.overhead." + threshold, randomIntBetween(100 + 1, Integer.MAX_VALUE));
execute(builder.build(), (command, interval) -> null, e -> {
execute(builder.build(), (command, interval, name) -> null, e -> {
assertThat(e, instanceOf(IllegalArgumentException.class));
assertThat(e.getMessage(), containsString("setting [monitor.jvm.gc.overhead." + threshold + "] must be <= 100"));
}, true, null);
@ -104,7 +107,7 @@ public class JvmGcMonitorServiceSettingsTests extends ESTestCase {
infoWarnOutOfOrderBuilder.put("monitor.jvm.gc.overhead.info", info);
final int warn = randomIntBetween(1, info - 1);
infoWarnOutOfOrderBuilder.put("monitor.jvm.gc.overhead.warn", warn);
execute(infoWarnOutOfOrderBuilder.build(), (command, interval) -> null, e -> {
execute(infoWarnOutOfOrderBuilder.build(), (command, interval, name) -> null, e -> {
assertThat(e, instanceOf(IllegalArgumentException.class));
assertThat(e.getMessage(), containsString("[monitor.jvm.gc.overhead.warn] must be greater than [monitor.jvm.gc.overhead.info] [" + info + "] but was [" + warn + "]"));
}, true, null);
@ -114,25 +117,25 @@ public class JvmGcMonitorServiceSettingsTests extends ESTestCase {
final int debug = randomIntBetween(info + 1, 99);
debugInfoOutOfOrderBuilder.put("monitor.jvm.gc.overhead.debug", debug);
debugInfoOutOfOrderBuilder.put("monitor.jvm.gc.overhead.warn", randomIntBetween(debug + 1, 100)); // or the test will fail for the wrong reason
execute(debugInfoOutOfOrderBuilder.build(), (command, interval) -> null, e -> {
execute(debugInfoOutOfOrderBuilder.build(), (command, interval, name) -> null, e -> {
assertThat(e, instanceOf(IllegalArgumentException.class));
assertThat(e.getMessage(), containsString("[monitor.jvm.gc.overhead.info] must be greater than [monitor.jvm.gc.overhead.debug] [" + debug + "] but was [" + info + "]"));
}, true, null);
}
private static void execute(Settings settings, BiFunction<Runnable, TimeValue, ScheduledFuture<?>> scheduler, Runnable asserts) throws InterruptedException {
private static void execute(Settings settings, TriFunction<Runnable, TimeValue, String, Cancellable> scheduler, Runnable asserts) throws InterruptedException {
execute(settings, scheduler, null, false, asserts);
}
private static void execute(Settings settings, BiFunction<Runnable, TimeValue, ScheduledFuture<?>> scheduler, Consumer<Exception> consumer, boolean constructionShouldFail, Runnable asserts) throws InterruptedException {
private static void execute(Settings settings, TriFunction<Runnable, TimeValue, String, Cancellable> scheduler, Consumer<Throwable> consumer, boolean constructionShouldFail, Runnable asserts) throws InterruptedException {
assert constructionShouldFail == (consumer != null);
assert constructionShouldFail == (asserts == null);
ThreadPool threadPool = null;
try {
threadPool = new TestThreadPool(JvmGcMonitorServiceSettingsTests.class.getCanonicalName()) {
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, TimeValue interval) {
return scheduler.apply(command, interval);
public Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval, String name) {
return scheduler.apply(command, interval, name);
}
};
try {
@ -151,4 +154,19 @@ public class JvmGcMonitorServiceSettingsTests extends ESTestCase {
}
}
interface TriFunction<S, T, U, R> {
R apply(S s, T t, U u);
}
private static class MockCancellable implements Cancellable {
@Override
public void cancel() {
}
@Override
public boolean isCancelled() {
return false;
}
}
}

View File

@ -0,0 +1,296 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.threadpool;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.BaseFuture;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool.Cancellable;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.threadpool.ThreadPool.ReschedulingRunnable;
import org.junit.After;
import org.junit.Before;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.isOneOf;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
/**
* Unit tests for the scheduling of tasks with a fixed delay
*/
public class ScheduleWithFixedDelayTests extends ESTestCase {
private ThreadPool threadPool;
@Before
public void setup() {
threadPool = new ThreadPool(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "fixed delay tests").build());
}
@After
public void shutdown() throws Exception {
terminate(threadPool);
}
public void testDoesNotRescheduleUntilExecutionFinished() throws Exception {
final TimeValue delay = TimeValue.timeValueMillis(100L);
final CountDownLatch startLatch = new CountDownLatch(1);
final CountDownLatch pauseLatch = new CountDownLatch(1);
ThreadPool threadPool = mock(ThreadPool.class);
final Runnable runnable = () -> {
// notify that the runnable is started
startLatch.countDown();
try {
// wait for other thread to un-pause
pauseLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
};
ReschedulingRunnable reschedulingRunnable = new ReschedulingRunnable(runnable, delay, Names.GENERIC, threadPool);
// this call was made during construction of the runnable
verify(threadPool, times(1)).schedule(delay, Names.GENERIC, reschedulingRunnable);
// create a thread and start the runnable
Thread runThread = new Thread() {
@Override
public void run() {
reschedulingRunnable.run();
}
};
runThread.start();
// wait for the runnable to be started and ensure the runnable hasn't used the threadpool again
startLatch.await();
verifyNoMoreInteractions(threadPool);
// un-pause the runnable and allow it to complete execution
pauseLatch.countDown();
runThread.join();
// validate schedule was called again
verify(threadPool, times(2)).schedule(delay, Names.GENERIC, reschedulingRunnable);
}
public void testThatRunnableIsRescheduled() throws Exception {
final CountDownLatch latch = new CountDownLatch(scaledRandomIntBetween(2, 16));
final Runnable countingRunnable = () -> {
if (rarely()) {
throw new ElasticsearchException("sometimes we throw before counting down");
}
latch.countDown();
if (randomBoolean()) {
throw new ElasticsearchException("this shouldn't cause the test to fail!");
}
};
Cancellable cancellable = threadPool.scheduleWithFixedDelay(countingRunnable, TimeValue.timeValueMillis(10L), Names.GENERIC);
assertNotNull(cancellable);
// wait for the number of successful count down operations
latch.await();
// cancel
cancellable.cancel();
assertTrue(cancellable.isCancelled());
}
public void testCancellingRunnable() throws Exception {
final boolean shouldThrow = randomBoolean();
final AtomicInteger counter = new AtomicInteger(scaledRandomIntBetween(2, 16));
final CountDownLatch doneLatch = new CountDownLatch(1);
final AtomicReference<Cancellable> cancellableRef = new AtomicReference<>();
final AtomicBoolean runAfterDone = new AtomicBoolean(false);
final Runnable countingRunnable = () -> {
if (doneLatch.getCount() == 0) {
runAfterDone.set(true);
logger.warn("this runnable ran after it was cancelled");
}
final Cancellable cancellable = cancellableRef.get();
if (cancellable == null) {
// wait for the cancellable to be present before we really start so we can accurately know we cancelled
return;
}
// rarely throw an exception before counting down
if (shouldThrow && rarely()) {
throw new RuntimeException("throw before count down");
}
final int count = counter.decrementAndGet();
// see if we have counted down to zero or below yet. the exception throwing could make us count below zero
if (count <= 0) {
cancellable.cancel();
doneLatch.countDown();
}
// rarely throw an exception after execution
if (shouldThrow && rarely()) {
throw new RuntimeException("throw at end");
}
};
Cancellable cancellable = threadPool.scheduleWithFixedDelay(countingRunnable, TimeValue.timeValueMillis(10L), Names.GENERIC);
cancellableRef.set(cancellable);
// wait for the runnable to finish
doneLatch.await();
// the runnable should have cancelled itself
assertTrue(cancellable.isCancelled());
assertFalse(runAfterDone.get());
// rarely wait and make sure the runnable didn't run at the next interval
if (rarely()) {
assertFalse(awaitBusy(runAfterDone::get, 1L, TimeUnit.SECONDS));
}
}
public void testBlockingCallOnSchedulerThreadFails() throws Exception {
final BaseFuture<Object> future = new BaseFuture<Object>() {};
final TestFuture resultsFuture = new TestFuture();
final boolean getWithTimeout = randomBoolean();
final Runnable runnable = () -> {
try {
Object obj;
if (getWithTimeout) {
obj = future.get(1L, TimeUnit.SECONDS);
} else {
obj = future.get();
}
resultsFuture.futureDone(obj);
} catch (Throwable t) {
resultsFuture.futureDone(t);
}
};
Cancellable cancellable = threadPool.scheduleWithFixedDelay(runnable, TimeValue.timeValueMillis(10L), Names.SAME);
Object resultingObject = resultsFuture.get();
assertNotNull(resultingObject);
assertThat(resultingObject, instanceOf(Throwable.class));
Throwable t = (Throwable) resultingObject;
assertThat(t, instanceOf(AssertionError.class));
assertThat(t.getMessage(), containsString("Blocking"));
assertFalse(cancellable.isCancelled());
}
public void testBlockingCallOnNonSchedulerThreadAllowed() throws Exception {
final TestFuture future = new TestFuture();
final TestFuture resultsFuture = new TestFuture();
final boolean rethrow = randomBoolean();
final boolean getWithTimeout = randomBoolean();
final Runnable runnable = () -> {
try {
Object obj;
if (getWithTimeout) {
obj = future.get(1, TimeUnit.MINUTES);
} else {
obj = future.get();
}
resultsFuture.futureDone(obj);
} catch (Throwable t) {
resultsFuture.futureDone(t);
if (rethrow) {
throw new RuntimeException(t);
}
}
};
final Cancellable cancellable = threadPool.scheduleWithFixedDelay(runnable, TimeValue.timeValueMillis(10L), Names.GENERIC);
assertFalse(resultsFuture.isDone());
final Object o = new Object();
future.futureDone(o);
final Object resultingObject = resultsFuture.get();
assertThat(resultingObject, sameInstance(o));
assertFalse(cancellable.isCancelled());
}
public void testOnRejectionCausesCancellation() throws Exception {
final TimeValue delay = TimeValue.timeValueMillis(10L);
terminate(threadPool);
threadPool = new ThreadPool(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "fixed delay tests").build()) {
@Override
public ScheduledFuture<?> schedule(TimeValue delay, String executor, Runnable command) {
if (command instanceof ReschedulingRunnable) {
((ReschedulingRunnable) command).onRejection(new EsRejectedExecutionException());
} else {
fail("this should only be called with a rescheduling runnable in this test");
}
return null;
}
};
Runnable runnable = () -> {};
ReschedulingRunnable reschedulingRunnable = new ReschedulingRunnable(runnable, delay, Names.GENERIC, threadPool);
assertTrue(reschedulingRunnable.isCancelled());
}
public void testRunnableRunsAtMostOnceAfterCancellation() throws Exception {
final int iterations = scaledRandomIntBetween(1, 12);
final AtomicInteger counter = new AtomicInteger();
final CountDownLatch doneLatch = new CountDownLatch(iterations);
final Runnable countingRunnable = () -> {
counter.incrementAndGet();
doneLatch.countDown();
};
final Cancellable cancellable = threadPool.scheduleWithFixedDelay(countingRunnable, TimeValue.timeValueMillis(10L), Names.GENERIC);
doneLatch.await();
cancellable.cancel();
final int counterValue = counter.get();
assertThat(counterValue, isOneOf(iterations, iterations + 1));
if (rarely()) {
awaitBusy(() -> {
final int value = counter.get();
return value == iterations || value == iterations + 1;
}, 50L, TimeUnit.MILLISECONDS);
}
}
static final class TestFuture extends BaseFuture<Object> {
boolean futureDone(Object value) {
return set(value);
}
}
}