Use Threadpool Time in ClusterApplierService (#39679) (#39685)

* Use threadpool's time in `ClusterApplierService` to allow for deterministic tests
* This is a part of/requirement for #39504
This commit is contained in:
Armin Braun 2019-03-05 12:37:49 +01:00 committed by GitHub
parent 380dc27d91
commit e8d9744340
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 29 additions and 22 deletions

View File

@ -26,6 +26,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
@ -44,6 +45,7 @@ public class ClusterStateObserver {
private final Predicate<ClusterState> MATCH_ALL_CHANGES_PREDICATE = state -> true;
private final ClusterApplierService clusterApplierService;
private final ThreadPool threadPool;
private final ThreadContext contextHolder;
volatile TimeValue timeOutValue;
@ -52,7 +54,7 @@ public class ClusterStateObserver {
final TimeoutClusterStateListener clusterStateListener = new ObserverClusterStateListener();
// observingContext is not null when waiting on cluster state changes
final AtomicReference<ObservingContext> observingContext = new AtomicReference<>(null);
volatile Long startTimeNS;
volatile Long startTimeMS;
volatile boolean timedOut;
@ -81,10 +83,11 @@ public class ClusterStateObserver {
public ClusterStateObserver(ClusterState initialState, ClusterApplierService clusterApplierService, @Nullable TimeValue timeout,
Logger logger, ThreadContext contextHolder) {
this.clusterApplierService = clusterApplierService;
this.threadPool = clusterApplierService.threadPool();
this.lastObservedState = new AtomicReference<>(new StoredState(initialState));
this.timeOutValue = timeout;
if (timeOutValue != null) {
this.startTimeNS = System.nanoTime();
this.startTimeMS = threadPool.relativeTimeInMillis();
}
this.logger = logger;
this.contextHolder = contextHolder;
@ -134,7 +137,7 @@ public class ClusterStateObserver {
if (timeOutValue == null) {
timeOutValue = this.timeOutValue;
if (timeOutValue != null) {
long timeSinceStartMS = TimeValue.nsecToMSec(System.nanoTime() - startTimeNS);
long timeSinceStartMS = threadPool.relativeTimeInMillis() - startTimeMS;
timeoutTimeLeftMS = timeOutValue.millis() - timeSinceStartMS;
if (timeoutTimeLeftMS <= 0L) {
// things have timeout while we were busy -> notify
@ -150,7 +153,7 @@ public class ClusterStateObserver {
timeoutTimeLeftMS = null;
}
} else {
this.startTimeNS = System.nanoTime();
this.startTimeMS = threadPool.relativeTimeInMillis();
this.timeOutValue = timeOutValue;
timeoutTimeLeftMS = timeOutValue.millis();
timedOut = false;
@ -240,7 +243,7 @@ public class ClusterStateObserver {
ObservingContext context = observingContext.getAndSet(null);
if (context != null) {
clusterApplierService.removeTimeoutListener(this);
long timeSinceStartMS = TimeValue.nsecToMSec(System.nanoTime() - startTimeNS);
long timeSinceStartMS = threadPool.relativeTimeInMillis() - startTimeMS;
logger.trace("observer: timeout notification from cluster service. timeout setting [{}], time since start [{}]",
timeOutValue, new TimeValue(timeSinceStartMS));
// update to latest, in case people want to retry

View File

@ -311,6 +311,10 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
runOnApplierThread(source, clusterStateConsumer, listener, Priority.HIGH);
}
public ThreadPool threadPool() {
return threadPool;
}
@Override
public void onNewClusterState(final String source, final Supplier<ClusterState> clusterStateSupplier,
final ClusterApplyListener listener) {
@ -383,12 +387,12 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
logger.debug("processing [{}]: execute", task.source);
final ClusterState previousClusterState = state.get();
long startTimeNS = currentTimeInNanos();
long startTimeMS = currentTimeInMillis();
final ClusterState newClusterState;
try {
newClusterState = task.apply(previousClusterState);
} catch (Exception e) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
logger.trace(() -> new ParameterizedMessage(
"failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}",
executionTime, previousClusterState.version(), task.source, previousClusterState), e);
@ -398,7 +402,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
}
if (previousClusterState == newClusterState) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime);
warnAboutSlowTaskIfNeeded(executionTime, task.source);
task.listener.onSuccess(task.source);
@ -411,14 +415,14 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
}
try {
applyChanges(task, previousClusterState, newClusterState);
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
logger.debug("processing [{}]: took [{}] done applying updated cluster state (version: {}, uuid: {})", task.source,
executionTime, newClusterState.version(),
newClusterState.stateUUID());
warnAboutSlowTaskIfNeeded(executionTime, task.source);
task.listener.onSuccess(task.source);
} catch (Exception e) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
if (logger.isTraceEnabled()) {
logger.warn(new ParameterizedMessage(
"failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
@ -617,8 +621,8 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
}
// this one is overridden in tests so we can control time
protected long currentTimeInNanos() {
return System.nanoTime();
protected long currentTimeInMillis() {
return threadPool.relativeTimeInMillis();
}
}

View File

@ -88,7 +88,7 @@ public class ClusterApplierServiceTests extends ESTestCase {
super.tearDown();
}
TimedClusterApplierService createTimedClusterService(boolean makeMaster) throws InterruptedException {
TimedClusterApplierService createTimedClusterService(boolean makeMaster) {
DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(),
emptySet(), Version.CURRENT);
TimedClusterApplierService timedClusterApplierService = new TimedClusterApplierService(Settings.builder().put("cluster.name",
@ -141,9 +141,9 @@ public class ClusterApplierServiceTests extends ESTestCase {
Logger clusterLogger = LogManager.getLogger(ClusterApplierService.class);
Loggers.addAppender(clusterLogger, mockAppender);
try {
clusterApplierService.currentTimeOverride = System.nanoTime();
clusterApplierService.currentTimeOverride = threadPool.relativeTimeInMillis();
clusterApplierService.runOnApplierThread("test1",
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).nanos(),
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).millis(),
new ClusterApplyListener() {
@Override
public void onSuccess(String source) { }
@ -155,7 +155,7 @@ public class ClusterApplierServiceTests extends ESTestCase {
});
clusterApplierService.runOnApplierThread("test2",
currentState -> {
clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(2).nanos();
clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(2).millis();
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
},
new ClusterApplyListener() {
@ -214,9 +214,9 @@ public class ClusterApplierServiceTests extends ESTestCase {
try {
final CountDownLatch latch = new CountDownLatch(4);
final CountDownLatch processedFirstTask = new CountDownLatch(1);
clusterApplierService.currentTimeOverride = System.nanoTime();
clusterApplierService.currentTimeOverride = threadPool.relativeTimeInMillis();
clusterApplierService.runOnApplierThread("test1",
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).nanos(),
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).millis(),
new ClusterApplyListener() {
@Override
public void onSuccess(String source) {
@ -232,7 +232,7 @@ public class ClusterApplierServiceTests extends ESTestCase {
processedFirstTask.await();
clusterApplierService.runOnApplierThread("test2",
currentState -> {
clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(32).nanos();
clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(32).millis();
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
},
new ClusterApplyListener() {
@ -247,7 +247,7 @@ public class ClusterApplierServiceTests extends ESTestCase {
}
});
clusterApplierService.runOnApplierThread("test3",
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(34).nanos(),
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(34).millis(),
new ClusterApplyListener() {
@Override
public void onSuccess(String source) {
@ -510,11 +510,11 @@ public class ClusterApplierServiceTests extends ESTestCase {
}
@Override
protected long currentTimeInNanos() {
protected long currentTimeInMillis() {
if (currentTimeOverride != null) {
return currentTimeOverride;
}
return super.currentTimeInNanos();
return super.currentTimeInMillis();
}
}
}