Cluster health should await events plus other things (#44348)
Today a cluster health request can wait on a selection of conditions, but it does not guarantee that all of these conditions have ever held simultaneously when it returns. More specifically, if a request sets `waitForEvents()` along with some other conditions then Elasticsearch will respond when the master has processed all the expected pending tasks _and then_ the cluster satisfied the other conditions, but it may be that at the time the cluster satisfied the other conditions there were undesired pending tasks again. This commit adjusts the behaviour of `waitForEvents()` to wait for all the required events to be processed and then, if the resulting cluster state does not satisfy the other conditions, it will wait until there is a cluster state that does and then retry the wait-for-events too.
This commit is contained in:
parent
5c8275cd2c
commit
8d68d1f54d
|
@ -19,6 +19,8 @@
|
|||
|
||||
package org.elasticsearch.action.admin.cluster.health;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
|
@ -44,10 +46,12 @@ import org.elasticsearch.tasks.Task;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
public class TransportClusterHealthAction
|
||||
extends StreamableTransportMasterNodeReadAction<ClusterHealthRequest, ClusterHealthResponse> {
|
||||
public class TransportClusterHealthAction extends StreamableTransportMasterNodeReadAction<ClusterHealthRequest, ClusterHealthResponse> {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(TransportClusterHealthAction.class);
|
||||
|
||||
private final GatewayAllocator gatewayAllocator;
|
||||
|
||||
|
@ -85,129 +89,147 @@ public class TransportClusterHealthAction
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void masterOperation(Task task, final ClusterHealthRequest request, final ClusterState unusedState,
|
||||
protected void masterOperation(final Task task,
|
||||
final ClusterHealthRequest request,
|
||||
final ClusterState unusedState,
|
||||
final ActionListener<ClusterHealthResponse> listener) {
|
||||
|
||||
final int waitCount = getWaitCount(request);
|
||||
|
||||
if (request.waitForEvents() != null) {
|
||||
final long endTimeMS = TimeValue.nsecToMSec(System.nanoTime()) + request.timeout().millis();
|
||||
if (request.local()) {
|
||||
clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])",
|
||||
new LocalClusterUpdateTask(request.waitForEvents()) {
|
||||
@Override
|
||||
public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) {
|
||||
return unchanged();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
final long timeoutInMillis = Math.max(0, endTimeMS - TimeValue.nsecToMSec(System.nanoTime()));
|
||||
final TimeValue newTimeout = TimeValue.timeValueMillis(timeoutInMillis);
|
||||
request.timeout(newTimeout);
|
||||
executeHealth(request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])",
|
||||
new ClusterStateUpdateTask(request.waitForEvents()) {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
return currentState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
final long timeoutInMillis = Math.max(0, endTimeMS - TimeValue.nsecToMSec(System.nanoTime()));
|
||||
final TimeValue newTimeout = TimeValue.timeValueMillis(timeoutInMillis);
|
||||
request.timeout(newTimeout);
|
||||
executeHealth(request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNoLongerMaster(String source) {
|
||||
logger.trace("stopped being master while waiting for events with priority [{}]. retrying.",
|
||||
request.waitForEvents());
|
||||
// TransportMasterNodeAction implements the retry logic, which is triggered by passing a NotMasterException
|
||||
listener.onFailure(new NotMasterException("no longer master. source: [" + source + "]"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
waitForEventsAndExecuteHealth(request, listener, waitCount, threadPool.relativeTimeInMillis() + request.timeout().millis());
|
||||
} else {
|
||||
executeHealth(request, listener);
|
||||
executeHealth(request, clusterService.state(), listener, waitCount,
|
||||
clusterState -> listener.onResponse(getResponse(request, clusterState, waitCount, false)));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void executeHealth(final ClusterHealthRequest request, final ActionListener<ClusterHealthResponse> listener) {
|
||||
int waitFor = 0;
|
||||
if (request.waitForStatus() != null) {
|
||||
waitFor++;
|
||||
}
|
||||
if (request.waitForNoRelocatingShards()) {
|
||||
waitFor++;
|
||||
}
|
||||
if (request.waitForNoInitializingShards()) {
|
||||
waitFor++;
|
||||
}
|
||||
if (request.waitForActiveShards().equals(ActiveShardCount.NONE) == false) {
|
||||
waitFor++;
|
||||
}
|
||||
if (request.waitForNodes().isEmpty() == false) {
|
||||
waitFor++;
|
||||
}
|
||||
if (request.indices() != null && request.indices().length > 0) { // check that they actually exists in the meta data
|
||||
waitFor++;
|
||||
}
|
||||
private void waitForEventsAndExecuteHealth(final ClusterHealthRequest request,
|
||||
final ActionListener<ClusterHealthResponse> listener,
|
||||
final int waitCount,
|
||||
final long endTimeRelativeMillis) {
|
||||
assert request.waitForEvents() != null;
|
||||
if (request.local()) {
|
||||
clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])",
|
||||
new LocalClusterUpdateTask(request.waitForEvents()) {
|
||||
@Override
|
||||
public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) {
|
||||
return unchanged();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
final long timeoutInMillis = Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis());
|
||||
final TimeValue newTimeout = TimeValue.timeValueMillis(timeoutInMillis);
|
||||
request.timeout(newTimeout);
|
||||
executeHealth(request, clusterService.state(), listener, waitCount,
|
||||
observedState -> waitForEventsAndExecuteHealth(request, listener, waitCount, endTimeRelativeMillis));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])",
|
||||
new ClusterStateUpdateTask(request.waitForEvents()) {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
return currentState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
final long timeoutInMillis = Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis());
|
||||
final TimeValue newTimeout = TimeValue.timeValueMillis(timeoutInMillis);
|
||||
request.timeout(newTimeout);
|
||||
executeHealth(request, newState, listener, waitCount,
|
||||
observedState -> waitForEventsAndExecuteHealth(request, listener, waitCount, endTimeRelativeMillis));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNoLongerMaster(String source) {
|
||||
logger.trace("stopped being master while waiting for events with priority [{}]. retrying.",
|
||||
request.waitForEvents());
|
||||
// TransportMasterNodeAction implements the retry logic, which is triggered by passing a NotMasterException
|
||||
listener.onFailure(new NotMasterException("no longer master. source: [" + source + "]"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void executeHealth(final ClusterHealthRequest request,
|
||||
final ClusterState currentState,
|
||||
final ActionListener<ClusterHealthResponse> listener,
|
||||
final int waitCount,
|
||||
final Consumer<ClusterState> onNewClusterStateAfterDelay) {
|
||||
|
||||
final ClusterState state = clusterService.state();
|
||||
final ClusterStateObserver observer = new ClusterStateObserver(state, clusterService,
|
||||
null, logger, threadPool.getThreadContext());
|
||||
if (request.timeout().millis() == 0) {
|
||||
listener.onResponse(getResponse(request, state, waitFor, request.timeout().millis() == 0));
|
||||
listener.onResponse(getResponse(request, currentState, waitCount, true));
|
||||
return;
|
||||
}
|
||||
final int concreteWaitFor = waitFor;
|
||||
final Predicate<ClusterState> validationPredicate = newState -> validateRequest(request, newState, concreteWaitFor);
|
||||
|
||||
final ClusterStateObserver.Listener stateListener = new ClusterStateObserver.Listener() {
|
||||
@Override
|
||||
public void onNewClusterState(ClusterState clusterState) {
|
||||
listener.onResponse(getResponse(request, clusterState, concreteWaitFor, false));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClusterServiceClose() {
|
||||
listener.onFailure(new IllegalStateException("ClusterService was close during health call"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTimeout(TimeValue timeout) {
|
||||
final ClusterHealthResponse response = getResponse(request, observer.setAndGetObservedState(), concreteWaitFor, true);
|
||||
listener.onResponse(response);
|
||||
}
|
||||
};
|
||||
if (validationPredicate.test(state)) {
|
||||
stateListener.onNewClusterState(state);
|
||||
final Predicate<ClusterState> validationPredicate = newState -> validateRequest(request, newState, waitCount);
|
||||
if (validationPredicate.test(currentState)) {
|
||||
listener.onResponse(getResponse(request, currentState, waitCount, false));
|
||||
} else {
|
||||
final ClusterStateObserver observer
|
||||
= new ClusterStateObserver(currentState, clusterService, null, logger, threadPool.getThreadContext());
|
||||
final ClusterStateObserver.Listener stateListener = new ClusterStateObserver.Listener() {
|
||||
@Override
|
||||
public void onNewClusterState(ClusterState newState) {
|
||||
onNewClusterStateAfterDelay.accept(newState);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClusterServiceClose() {
|
||||
listener.onFailure(new IllegalStateException("ClusterService was close during health call"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTimeout(TimeValue timeout) {
|
||||
listener.onResponse(getResponse(request, observer.setAndGetObservedState(), waitCount, true));
|
||||
}
|
||||
};
|
||||
observer.waitForNextChange(stateListener, validationPredicate, request.timeout());
|
||||
}
|
||||
}
|
||||
|
||||
private boolean validateRequest(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor) {
|
||||
private static int getWaitCount(ClusterHealthRequest request) {
|
||||
int waitCount = 0;
|
||||
if (request.waitForStatus() != null) {
|
||||
waitCount++;
|
||||
}
|
||||
if (request.waitForNoRelocatingShards()) {
|
||||
waitCount++;
|
||||
}
|
||||
if (request.waitForNoInitializingShards()) {
|
||||
waitCount++;
|
||||
}
|
||||
if (request.waitForActiveShards().equals(ActiveShardCount.NONE) == false) {
|
||||
waitCount++;
|
||||
}
|
||||
if (request.waitForNodes().isEmpty() == false) {
|
||||
waitCount++;
|
||||
}
|
||||
if (request.indices() != null && request.indices().length > 0) { // check that they actually exists in the meta data
|
||||
waitCount++;
|
||||
}
|
||||
return waitCount;
|
||||
}
|
||||
|
||||
private boolean validateRequest(final ClusterHealthRequest request, ClusterState clusterState, final int waitCount) {
|
||||
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.getMasterService().numberOfPendingTasks(),
|
||||
gatewayAllocator.getNumberOfInFlightFetch(), clusterService.getMasterService().getMaxTaskWaitTime());
|
||||
int readyCounter = prepareResponse(request, response, clusterState, indexNameExpressionResolver);
|
||||
return readyCounter == waitFor;
|
||||
return prepareResponse(request, response, clusterState, indexNameExpressionResolver) == waitCount;
|
||||
}
|
||||
|
||||
private ClusterHealthResponse getResponse(final ClusterHealthRequest request, ClusterState clusterState,
|
||||
|
@ -227,7 +249,7 @@ public class TransportClusterHealthAction
|
|||
}
|
||||
|
||||
static int prepareResponse(final ClusterHealthRequest request, final ClusterHealthResponse response,
|
||||
final ClusterState clusterState, final IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
final ClusterState clusterState, final IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
int waitForCounter = 0;
|
||||
if (request.waitForStatus() != null && response.getStatus().value() <= request.waitForStatus().value()) {
|
||||
waitForCounter++;
|
||||
|
|
|
@ -19,10 +19,12 @@
|
|||
|
||||
package org.elasticsearch.cluster;
|
||||
|
||||
import org.elasticsearch.action.ActionFuture;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
|
@ -38,14 +40,16 @@ public class ClusterHealthIT extends ESIntegTestCase {
|
|||
|
||||
public void testSimpleLocalHealth() {
|
||||
createIndex("test");
|
||||
ensureGreen(); // master should thing it's green now.
|
||||
ensureGreen(); // master should think it's green now.
|
||||
|
||||
for (String node : internalCluster().getNodeNames()) {
|
||||
for (final String node : internalCluster().getNodeNames()) {
|
||||
// a very high time out, which should never fire due to the local flag
|
||||
ClusterHealthResponse health = client(node).admin().cluster().prepareHealth().setLocal(true)
|
||||
logger.info("--> getting cluster health on [{}]", node);
|
||||
final ClusterHealthResponse health = client(node).admin().cluster().prepareHealth().setLocal(true)
|
||||
.setWaitForEvents(Priority.LANGUID).setTimeout("30s").get("10s");
|
||||
assertThat(health.getStatus(), equalTo(ClusterHealthStatus.GREEN));
|
||||
assertThat(health.isTimedOut(), equalTo(false));
|
||||
logger.info("--> got cluster health on [{}]", node);
|
||||
assertFalse("timed out on " + node, health.isTimedOut());
|
||||
assertThat("health status on " + node, health.getStatus(), equalTo(ClusterHealthStatus.GREEN));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -254,4 +258,40 @@ public class ClusterHealthIT extends ESIntegTestCase {
|
|||
clusterHealthThread.join();
|
||||
}
|
||||
|
||||
public void testWaitForEventsRetriesIfOtherConditionsNotMet() throws Exception {
|
||||
final ActionFuture<ClusterHealthResponse> healthResponseFuture
|
||||
= client().admin().cluster().prepareHealth("index").setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute();
|
||||
|
||||
final AtomicBoolean keepSubmittingTasks = new AtomicBoolean(true);
|
||||
final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName());
|
||||
clusterService.submitStateUpdateTask("looping task", new ClusterStateUpdateTask(Priority.LOW) {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
return currentState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
throw new AssertionError(source, e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
if (keepSubmittingTasks.get()) {
|
||||
clusterService.submitStateUpdateTask("looping task", this);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
createIndex("index");
|
||||
assertFalse(client().admin().cluster().prepareHealth("index").setWaitForGreenStatus().get().isTimedOut());
|
||||
|
||||
// at this point the original health response should not have returned: there was never a point where the index was green AND
|
||||
// the master had processed all pending tasks above LANGUID priority.
|
||||
assertFalse(healthResponseFuture.isDone());
|
||||
|
||||
keepSubmittingTasks.set(false);
|
||||
assertFalse(healthResponseFuture.get().isTimedOut());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue