diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java index 45297358b91..d320ac1b763 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java @@ -19,6 +19,8 @@ package org.elasticsearch.action.admin.cluster.health; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; @@ -44,10 +46,12 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.function.Consumer; import java.util.function.Predicate; -public class TransportClusterHealthAction - extends StreamableTransportMasterNodeReadAction { +public class TransportClusterHealthAction extends StreamableTransportMasterNodeReadAction { + + private static final Logger logger = LogManager.getLogger(TransportClusterHealthAction.class); private final GatewayAllocator gatewayAllocator; @@ -85,129 +89,147 @@ public class TransportClusterHealthAction } @Override - protected void masterOperation(Task task, final ClusterHealthRequest request, final ClusterState unusedState, + protected void masterOperation(final Task task, + final ClusterHealthRequest request, + final ClusterState unusedState, final ActionListener listener) { + + final int waitCount = getWaitCount(request); + if (request.waitForEvents() != null) { - final long endTimeMS = TimeValue.nsecToMSec(System.nanoTime()) + request.timeout().millis(); - if (request.local()) { - clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", - new LocalClusterUpdateTask(request.waitForEvents()) { - @Override - public ClusterTasksResult execute(ClusterState currentState) { - return unchanged(); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - final long timeoutInMillis = Math.max(0, endTimeMS - TimeValue.nsecToMSec(System.nanoTime())); - final TimeValue newTimeout = TimeValue.timeValueMillis(timeoutInMillis); - request.timeout(newTimeout); - executeHealth(request, listener); - } - - @Override - public void onFailure(String source, Exception e) { - logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e); - listener.onFailure(e); - } - }); - } else { - clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", - new ClusterStateUpdateTask(request.waitForEvents()) { - @Override - public ClusterState execute(ClusterState currentState) { - return currentState; - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - final long timeoutInMillis = Math.max(0, endTimeMS - TimeValue.nsecToMSec(System.nanoTime())); - final TimeValue newTimeout = TimeValue.timeValueMillis(timeoutInMillis); - request.timeout(newTimeout); - executeHealth(request, listener); - } - - @Override - public void onNoLongerMaster(String source) { - logger.trace("stopped being master while waiting for events with priority [{}]. retrying.", - request.waitForEvents()); - // TransportMasterNodeAction implements the retry logic, which is triggered by passing a NotMasterException - listener.onFailure(new NotMasterException("no longer master. source: [" + source + "]")); - } - - @Override - public void onFailure(String source, Exception e) { - logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e); - listener.onFailure(e); - } - }); - } + waitForEventsAndExecuteHealth(request, listener, waitCount, threadPool.relativeTimeInMillis() + request.timeout().millis()); } else { - executeHealth(request, listener); + executeHealth(request, clusterService.state(), listener, waitCount, + clusterState -> listener.onResponse(getResponse(request, clusterState, waitCount, false))); } - } - private void executeHealth(final ClusterHealthRequest request, final ActionListener listener) { - int waitFor = 0; - if (request.waitForStatus() != null) { - waitFor++; - } - if (request.waitForNoRelocatingShards()) { - waitFor++; - } - if (request.waitForNoInitializingShards()) { - waitFor++; - } - if (request.waitForActiveShards().equals(ActiveShardCount.NONE) == false) { - waitFor++; - } - if (request.waitForNodes().isEmpty() == false) { - waitFor++; - } - if (request.indices() != null && request.indices().length > 0) { // check that they actually exists in the meta data - waitFor++; - } + private void waitForEventsAndExecuteHealth(final ClusterHealthRequest request, + final ActionListener listener, + final int waitCount, + final long endTimeRelativeMillis) { + assert request.waitForEvents() != null; + if (request.local()) { + clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", + new LocalClusterUpdateTask(request.waitForEvents()) { + @Override + public ClusterTasksResult execute(ClusterState currentState) { + return unchanged(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + final long timeoutInMillis = Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis()); + final TimeValue newTimeout = TimeValue.timeValueMillis(timeoutInMillis); + request.timeout(newTimeout); + executeHealth(request, clusterService.state(), listener, waitCount, + observedState -> waitForEventsAndExecuteHealth(request, listener, waitCount, endTimeRelativeMillis)); + } + + @Override + public void onFailure(String source, Exception e) { + logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e); + listener.onFailure(e); + } + }); + } else { + clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", + new ClusterStateUpdateTask(request.waitForEvents()) { + @Override + public ClusterState execute(ClusterState currentState) { + return currentState; + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + final long timeoutInMillis = Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis()); + final TimeValue newTimeout = TimeValue.timeValueMillis(timeoutInMillis); + request.timeout(newTimeout); + executeHealth(request, newState, listener, waitCount, + observedState -> waitForEventsAndExecuteHealth(request, listener, waitCount, endTimeRelativeMillis)); + } + + @Override + public void onNoLongerMaster(String source) { + logger.trace("stopped being master while waiting for events with priority [{}]. retrying.", + request.waitForEvents()); + // TransportMasterNodeAction implements the retry logic, which is triggered by passing a NotMasterException + listener.onFailure(new NotMasterException("no longer master. source: [" + source + "]")); + } + + @Override + public void onFailure(String source, Exception e) { + logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e); + listener.onFailure(e); + } + }); + } + } + + private void executeHealth(final ClusterHealthRequest request, + final ClusterState currentState, + final ActionListener listener, + final int waitCount, + final Consumer onNewClusterStateAfterDelay) { - final ClusterState state = clusterService.state(); - final ClusterStateObserver observer = new ClusterStateObserver(state, clusterService, - null, logger, threadPool.getThreadContext()); if (request.timeout().millis() == 0) { - listener.onResponse(getResponse(request, state, waitFor, request.timeout().millis() == 0)); + listener.onResponse(getResponse(request, currentState, waitCount, true)); return; } - final int concreteWaitFor = waitFor; - final Predicate validationPredicate = newState -> validateRequest(request, newState, concreteWaitFor); - final ClusterStateObserver.Listener stateListener = new ClusterStateObserver.Listener() { - @Override - public void onNewClusterState(ClusterState clusterState) { - listener.onResponse(getResponse(request, clusterState, concreteWaitFor, false)); - } - - @Override - public void onClusterServiceClose() { - listener.onFailure(new IllegalStateException("ClusterService was close during health call")); - } - - @Override - public void onTimeout(TimeValue timeout) { - final ClusterHealthResponse response = getResponse(request, observer.setAndGetObservedState(), concreteWaitFor, true); - listener.onResponse(response); - } - }; - if (validationPredicate.test(state)) { - stateListener.onNewClusterState(state); + final Predicate validationPredicate = newState -> validateRequest(request, newState, waitCount); + if (validationPredicate.test(currentState)) { + listener.onResponse(getResponse(request, currentState, waitCount, false)); } else { + final ClusterStateObserver observer + = new ClusterStateObserver(currentState, clusterService, null, logger, threadPool.getThreadContext()); + final ClusterStateObserver.Listener stateListener = new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState newState) { + onNewClusterStateAfterDelay.accept(newState); + } + + @Override + public void onClusterServiceClose() { + listener.onFailure(new IllegalStateException("ClusterService was close during health call")); + } + + @Override + public void onTimeout(TimeValue timeout) { + listener.onResponse(getResponse(request, observer.setAndGetObservedState(), waitCount, true)); + } + }; observer.waitForNextChange(stateListener, validationPredicate, request.timeout()); } } - private boolean validateRequest(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor) { + private static int getWaitCount(ClusterHealthRequest request) { + int waitCount = 0; + if (request.waitForStatus() != null) { + waitCount++; + } + if (request.waitForNoRelocatingShards()) { + waitCount++; + } + if (request.waitForNoInitializingShards()) { + waitCount++; + } + if (request.waitForActiveShards().equals(ActiveShardCount.NONE) == false) { + waitCount++; + } + if (request.waitForNodes().isEmpty() == false) { + waitCount++; + } + if (request.indices() != null && request.indices().length > 0) { // check that they actually exists in the meta data + waitCount++; + } + return waitCount; + } + + private boolean validateRequest(final ClusterHealthRequest request, ClusterState clusterState, final int waitCount) { ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.getMasterService().numberOfPendingTasks(), gatewayAllocator.getNumberOfInFlightFetch(), clusterService.getMasterService().getMaxTaskWaitTime()); - int readyCounter = prepareResponse(request, response, clusterState, indexNameExpressionResolver); - return readyCounter == waitFor; + return prepareResponse(request, response, clusterState, indexNameExpressionResolver) == waitCount; } private ClusterHealthResponse getResponse(final ClusterHealthRequest request, ClusterState clusterState, @@ -227,7 +249,7 @@ public class TransportClusterHealthAction } static int prepareResponse(final ClusterHealthRequest request, final ClusterHealthResponse response, - final ClusterState clusterState, final IndexNameExpressionResolver indexNameExpressionResolver) { + final ClusterState clusterState, final IndexNameExpressionResolver indexNameExpressionResolver) { int waitForCounter = 0; if (request.waitForStatus() != null && response.getStatus().value() <= request.waitForStatus().value()) { waitForCounter++; diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterHealthIT.java b/server/src/test/java/org/elasticsearch/cluster/ClusterHealthIT.java index d0680e91b8b..656961411ea 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterHealthIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterHealthIT.java @@ -19,10 +19,12 @@ package org.elasticsearch.cluster; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESIntegTestCase; @@ -38,14 +40,16 @@ public class ClusterHealthIT extends ESIntegTestCase { public void testSimpleLocalHealth() { createIndex("test"); - ensureGreen(); // master should thing it's green now. + ensureGreen(); // master should think it's green now. - for (String node : internalCluster().getNodeNames()) { + for (final String node : internalCluster().getNodeNames()) { // a very high time out, which should never fire due to the local flag - ClusterHealthResponse health = client(node).admin().cluster().prepareHealth().setLocal(true) + logger.info("--> getting cluster health on [{}]", node); + final ClusterHealthResponse health = client(node).admin().cluster().prepareHealth().setLocal(true) .setWaitForEvents(Priority.LANGUID).setTimeout("30s").get("10s"); - assertThat(health.getStatus(), equalTo(ClusterHealthStatus.GREEN)); - assertThat(health.isTimedOut(), equalTo(false)); + logger.info("--> got cluster health on [{}]", node); + assertFalse("timed out on " + node, health.isTimedOut()); + assertThat("health status on " + node, health.getStatus(), equalTo(ClusterHealthStatus.GREEN)); } } @@ -254,4 +258,40 @@ public class ClusterHealthIT extends ESIntegTestCase { clusterHealthThread.join(); } + public void testWaitForEventsRetriesIfOtherConditionsNotMet() throws Exception { + final ActionFuture healthResponseFuture + = client().admin().cluster().prepareHealth("index").setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute(); + + final AtomicBoolean keepSubmittingTasks = new AtomicBoolean(true); + final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName()); + clusterService.submitStateUpdateTask("looping task", new ClusterStateUpdateTask(Priority.LOW) { + @Override + public ClusterState execute(ClusterState currentState) { + return currentState; + } + + @Override + public void onFailure(String source, Exception e) { + throw new AssertionError(source, e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (keepSubmittingTasks.get()) { + clusterService.submitStateUpdateTask("looping task", this); + } + } + }); + + createIndex("index"); + assertFalse(client().admin().cluster().prepareHealth("index").setWaitForGreenStatus().get().isTimedOut()); + + // at this point the original health response should not have returned: there was never a point where the index was green AND + // the master had processed all pending tasks above LANGUID priority. + assertFalse(healthResponseFuture.isDone()); + + keepSubmittingTasks.set(false); + assertFalse(healthResponseFuture.get().isTimedOut()); + } + }