Timeout health API on busy master (#57587)
Today `GET _cluster/health?wait_for_events=...&timeout=...` will wait indefinitely for the master to process the pending cluster health task, ignoring the specified timeout. This could take a very long time if the master is overloaded. This commit fixes this by adding a timeout to the pending cluster health task.
This commit is contained in:
parent
5f8442d1f4
commit
fc4dd6d681
|
@ -22,11 +22,13 @@ package org.elasticsearch.cluster;
|
|||
import org.elasticsearch.action.ActionFuture;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.InternalTestCluster;
|
||||
|
||||
|
@ -261,12 +263,13 @@ public class ClusterHealthIT extends ESIntegTestCase {
|
|||
clusterHealthThread.join();
|
||||
}
|
||||
|
||||
public void testWaitForEventsRetriesIfOtherConditionsNotMet() throws Exception {
|
||||
public void testWaitForEventsRetriesIfOtherConditionsNotMet() {
|
||||
final ActionFuture<ClusterHealthResponse> healthResponseFuture
|
||||
= client().admin().cluster().prepareHealth("index").setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute();
|
||||
|
||||
final AtomicBoolean keepSubmittingTasks = new AtomicBoolean(true);
|
||||
final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName());
|
||||
final PlainActionFuture<Void> completionFuture = new PlainActionFuture<>();
|
||||
clusterService.submitStateUpdateTask("looping task", new ClusterStateUpdateTask(Priority.LOW) {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
|
@ -275,6 +278,7 @@ public class ClusterHealthIT extends ESIntegTestCase {
|
|||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
completionFuture.onFailure(e);
|
||||
throw new AssertionError(source, e);
|
||||
}
|
||||
|
||||
|
@ -282,19 +286,25 @@ public class ClusterHealthIT extends ESIntegTestCase {
|
|||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
if (keepSubmittingTasks.get()) {
|
||||
clusterService.submitStateUpdateTask("looping task", this);
|
||||
} else {
|
||||
completionFuture.onResponse(null);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
createIndex("index");
|
||||
assertFalse(client().admin().cluster().prepareHealth("index").setWaitForGreenStatus().get().isTimedOut());
|
||||
try {
|
||||
createIndex("index");
|
||||
assertFalse(client().admin().cluster().prepareHealth("index").setWaitForGreenStatus().get().isTimedOut());
|
||||
|
||||
// at this point the original health response should not have returned: there was never a point where the index was green AND
|
||||
// the master had processed all pending tasks above LANGUID priority.
|
||||
assertFalse(healthResponseFuture.isDone());
|
||||
|
||||
keepSubmittingTasks.set(false);
|
||||
assertFalse(healthResponseFuture.get().isTimedOut());
|
||||
// at this point the original health response should not have returned: there was never a point where the index was green AND
|
||||
// the master had processed all pending tasks above LANGUID priority.
|
||||
assertFalse(healthResponseFuture.isDone());
|
||||
keepSubmittingTasks.set(false);
|
||||
assertFalse(healthResponseFuture.actionGet(TimeValue.timeValueSeconds(30)).isTimedOut());
|
||||
} finally {
|
||||
keepSubmittingTasks.set(false);
|
||||
completionFuture.actionGet(TimeValue.timeValueSeconds(30));
|
||||
}
|
||||
}
|
||||
|
||||
public void testHealthOnMasterFailover() throws Exception {
|
||||
|
@ -311,4 +321,42 @@ public class ClusterHealthIT extends ESIntegTestCase {
|
|||
assertSame(responseFuture.get().getStatus(), ClusterHealthStatus.GREEN);
|
||||
}
|
||||
}
|
||||
|
||||
public void testWaitForEventsTimesOutIfMasterBusy() {
|
||||
final AtomicBoolean keepSubmittingTasks = new AtomicBoolean(true);
|
||||
final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName());
|
||||
final PlainActionFuture<Void> completionFuture = new PlainActionFuture<>();
|
||||
clusterService.submitStateUpdateTask("looping task", new ClusterStateUpdateTask(Priority.LOW) {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
return currentState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
completionFuture.onFailure(e);
|
||||
throw new AssertionError(source, e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
if (keepSubmittingTasks.get()) {
|
||||
clusterService.submitStateUpdateTask("looping task", this);
|
||||
} else {
|
||||
completionFuture.onResponse(null);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
final ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth()
|
||||
.setWaitForEvents(Priority.LANGUID)
|
||||
.setTimeout(TimeValue.timeValueSeconds(1))
|
||||
.get(TimeValue.timeValueSeconds(30));
|
||||
assertTrue(clusterHealthResponse.isTimedOut());
|
||||
} finally {
|
||||
keepSubmittingTasks.set(false);
|
||||
completionFuture.actionGet(TimeValue.timeValueSeconds(30));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.elasticsearch.cluster.NotMasterException;
|
|||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
|
@ -103,7 +104,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
|
|||
waitForEventsAndExecuteHealth(request, listener, waitCount, threadPool.relativeTimeInMillis() + request.timeout().millis());
|
||||
} else {
|
||||
executeHealth(request, clusterService.state(), listener, waitCount,
|
||||
clusterState -> listener.onResponse(getResponse(request, clusterState, waitCount, false)));
|
||||
clusterState -> listener.onResponse(getResponse(request, clusterState, waitCount, TimeoutState.OK)));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -136,6 +137,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
|
|||
}
|
||||
});
|
||||
} else {
|
||||
final TimeValue taskTimeout = TimeValue.timeValueMillis(Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis()));
|
||||
clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])",
|
||||
new ClusterStateUpdateTask(request.waitForEvents()) {
|
||||
@Override
|
||||
|
@ -143,6 +145,11 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
|
|||
return currentState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimeValue timeout() {
|
||||
return taskTimeout;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
final long timeoutInMillis = Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis());
|
||||
|
@ -168,8 +175,12 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
|
|||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
|
||||
listener.onFailure(e);
|
||||
if (e instanceof ProcessClusterEventTimeoutException) {
|
||||
listener.onResponse(getResponse(request, clusterService.state(), waitCount, TimeoutState.TIMED_OUT));
|
||||
} else {
|
||||
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -182,13 +193,13 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
|
|||
final Consumer<ClusterState> onNewClusterStateAfterDelay) {
|
||||
|
||||
if (request.timeout().millis() == 0) {
|
||||
listener.onResponse(getResponse(request, currentState, waitCount, true));
|
||||
listener.onResponse(getResponse(request, currentState, waitCount, TimeoutState.ZERO_TIMEOUT));
|
||||
return;
|
||||
}
|
||||
|
||||
final Predicate<ClusterState> validationPredicate = newState -> validateRequest(request, newState, waitCount);
|
||||
if (validationPredicate.test(currentState)) {
|
||||
listener.onResponse(getResponse(request, currentState, waitCount, false));
|
||||
listener.onResponse(getResponse(request, currentState, waitCount, TimeoutState.OK));
|
||||
} else {
|
||||
final ClusterStateObserver observer
|
||||
= new ClusterStateObserver(currentState, clusterService, null, logger, threadPool.getThreadContext());
|
||||
|
@ -205,7 +216,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
|
|||
|
||||
@Override
|
||||
public void onTimeout(TimeValue timeout) {
|
||||
listener.onResponse(getResponse(request, observer.setAndGetObservedState(), waitCount, true));
|
||||
listener.onResponse(getResponse(request, observer.setAndGetObservedState(), waitCount, TimeoutState.TIMED_OUT));
|
||||
}
|
||||
};
|
||||
observer.waitForNextChange(stateListener, validationPredicate, request.timeout());
|
||||
|
@ -241,19 +252,23 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
|
|||
return prepareResponse(request, response, clusterState, indexNameExpressionResolver) == waitCount;
|
||||
}
|
||||
|
||||
private enum TimeoutState {
|
||||
OK,
|
||||
TIMED_OUT,
|
||||
ZERO_TIMEOUT
|
||||
}
|
||||
|
||||
private ClusterHealthResponse getResponse(final ClusterHealthRequest request, ClusterState clusterState,
|
||||
final int waitFor, boolean timedOut) {
|
||||
final int waitFor, TimeoutState timeoutState) {
|
||||
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.getMasterService().numberOfPendingTasks(),
|
||||
allocationService.getNumberOfInFlightFetches(), clusterService.getMasterService().getMaxTaskWaitTime());
|
||||
int readyCounter = prepareResponse(request, response, clusterState, indexNameExpressionResolver);
|
||||
boolean valid = (readyCounter == waitFor);
|
||||
assert valid || timedOut;
|
||||
// we check for a timeout here since this method might be called from the wait_for_events
|
||||
// response handler which might have timed out already.
|
||||
// if the state is sufficient for what we where waiting for we don't need to mark this as timedOut.
|
||||
// We spend too much time in waiting for events such that we might already reached a valid state.
|
||||
// this should not mark the request as timed out
|
||||
response.setTimedOut(timedOut && valid == false);
|
||||
assert valid || (timeoutState != TimeoutState.OK);
|
||||
// If valid && timeoutState == TimeoutState.ZERO_TIMEOUT then we immediately found **and processed** a valid state, so we don't
|
||||
// consider this a timeout. However if timeoutState == TimeoutState.TIMED_OUT then we didn't process a valid state (perhaps we
|
||||
// failed on wait_for_events) so this does count as a timeout.
|
||||
response.setTimedOut(valid == false || timeoutState == TimeoutState.TIMED_OUT);
|
||||
return response;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue