diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterHealthIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterHealthIT.java index 9beba672767..7488e24b881 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterHealthIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterHealthIT.java @@ -22,11 +22,13 @@ package org.elasticsearch.cluster; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; @@ -261,12 +263,13 @@ public class ClusterHealthIT extends ESIntegTestCase { clusterHealthThread.join(); } - public void testWaitForEventsRetriesIfOtherConditionsNotMet() throws Exception { + public void testWaitForEventsRetriesIfOtherConditionsNotMet() { final ActionFuture healthResponseFuture = client().admin().cluster().prepareHealth("index").setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute(); final AtomicBoolean keepSubmittingTasks = new AtomicBoolean(true); final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName()); + final PlainActionFuture completionFuture = new PlainActionFuture<>(); clusterService.submitStateUpdateTask("looping task", new ClusterStateUpdateTask(Priority.LOW) { @Override public ClusterState execute(ClusterState currentState) { @@ -275,6 +278,7 @@ public class ClusterHealthIT extends ESIntegTestCase { @Override public void onFailure(String source, Exception e) { + completionFuture.onFailure(e); throw new AssertionError(source, e); } @@ -282,19 +286,25 @@ public class ClusterHealthIT extends ESIntegTestCase { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { if (keepSubmittingTasks.get()) { clusterService.submitStateUpdateTask("looping task", this); + } else { + completionFuture.onResponse(null); } } }); - createIndex("index"); - assertFalse(client().admin().cluster().prepareHealth("index").setWaitForGreenStatus().get().isTimedOut()); + try { + createIndex("index"); + assertFalse(client().admin().cluster().prepareHealth("index").setWaitForGreenStatus().get().isTimedOut()); - // at this point the original health response should not have returned: there was never a point where the index was green AND - // the master had processed all pending tasks above LANGUID priority. - assertFalse(healthResponseFuture.isDone()); - - keepSubmittingTasks.set(false); - assertFalse(healthResponseFuture.get().isTimedOut()); + // at this point the original health response should not have returned: there was never a point where the index was green AND + // the master had processed all pending tasks above LANGUID priority. + assertFalse(healthResponseFuture.isDone()); + keepSubmittingTasks.set(false); + assertFalse(healthResponseFuture.actionGet(TimeValue.timeValueSeconds(30)).isTimedOut()); + } finally { + keepSubmittingTasks.set(false); + completionFuture.actionGet(TimeValue.timeValueSeconds(30)); + } } public void testHealthOnMasterFailover() throws Exception { @@ -311,4 +321,42 @@ public class ClusterHealthIT extends ESIntegTestCase { assertSame(responseFuture.get().getStatus(), ClusterHealthStatus.GREEN); } } + + public void testWaitForEventsTimesOutIfMasterBusy() { + final AtomicBoolean keepSubmittingTasks = new AtomicBoolean(true); + final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName()); + final PlainActionFuture completionFuture = new PlainActionFuture<>(); + clusterService.submitStateUpdateTask("looping task", new ClusterStateUpdateTask(Priority.LOW) { + @Override + public ClusterState execute(ClusterState currentState) { + return currentState; + } + + @Override + public void onFailure(String source, Exception e) { + completionFuture.onFailure(e); + throw new AssertionError(source, e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (keepSubmittingTasks.get()) { + clusterService.submitStateUpdateTask("looping task", this); + } else { + completionFuture.onResponse(null); + } + } + }); + + try { + final ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setTimeout(TimeValue.timeValueSeconds(1)) + .get(TimeValue.timeValueSeconds(30)); + assertTrue(clusterHealthResponse.isTimedOut()); + } finally { + keepSubmittingTasks.set(false); + completionFuture.actionGet(TimeValue.timeValueSeconds(30)); + } + } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java index e3102b14207..12bcde03588 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java @@ -35,6 +35,7 @@ import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; @@ -103,7 +104,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction< waitForEventsAndExecuteHealth(request, listener, waitCount, threadPool.relativeTimeInMillis() + request.timeout().millis()); } else { executeHealth(request, clusterService.state(), listener, waitCount, - clusterState -> listener.onResponse(getResponse(request, clusterState, waitCount, false))); + clusterState -> listener.onResponse(getResponse(request, clusterState, waitCount, TimeoutState.OK))); } } @@ -136,6 +137,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction< } }); } else { + final TimeValue taskTimeout = TimeValue.timeValueMillis(Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis())); clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", new ClusterStateUpdateTask(request.waitForEvents()) { @Override @@ -143,6 +145,11 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction< return currentState; } + @Override + public TimeValue timeout() { + return taskTimeout; + } + @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { final long timeoutInMillis = Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis()); @@ -168,8 +175,12 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction< @Override public void onFailure(String source, Exception e) { - logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e); - listener.onFailure(e); + if (e instanceof ProcessClusterEventTimeoutException) { + listener.onResponse(getResponse(request, clusterService.state(), waitCount, TimeoutState.TIMED_OUT)); + } else { + logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e); + listener.onFailure(e); + } } }); } @@ -182,13 +193,13 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction< final Consumer onNewClusterStateAfterDelay) { if (request.timeout().millis() == 0) { - listener.onResponse(getResponse(request, currentState, waitCount, true)); + listener.onResponse(getResponse(request, currentState, waitCount, TimeoutState.ZERO_TIMEOUT)); return; } final Predicate validationPredicate = newState -> validateRequest(request, newState, waitCount); if (validationPredicate.test(currentState)) { - listener.onResponse(getResponse(request, currentState, waitCount, false)); + listener.onResponse(getResponse(request, currentState, waitCount, TimeoutState.OK)); } else { final ClusterStateObserver observer = new ClusterStateObserver(currentState, clusterService, null, logger, threadPool.getThreadContext()); @@ -205,7 +216,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction< @Override public void onTimeout(TimeValue timeout) { - listener.onResponse(getResponse(request, observer.setAndGetObservedState(), waitCount, true)); + listener.onResponse(getResponse(request, observer.setAndGetObservedState(), waitCount, TimeoutState.TIMED_OUT)); } }; observer.waitForNextChange(stateListener, validationPredicate, request.timeout()); @@ -241,19 +252,23 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction< return prepareResponse(request, response, clusterState, indexNameExpressionResolver) == waitCount; } + private enum TimeoutState { + OK, + TIMED_OUT, + ZERO_TIMEOUT + } + private ClusterHealthResponse getResponse(final ClusterHealthRequest request, ClusterState clusterState, - final int waitFor, boolean timedOut) { + final int waitFor, TimeoutState timeoutState) { ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.getMasterService().numberOfPendingTasks(), allocationService.getNumberOfInFlightFetches(), clusterService.getMasterService().getMaxTaskWaitTime()); int readyCounter = prepareResponse(request, response, clusterState, indexNameExpressionResolver); boolean valid = (readyCounter == waitFor); - assert valid || timedOut; - // we check for a timeout here since this method might be called from the wait_for_events - // response handler which might have timed out already. - // if the state is sufficient for what we where waiting for we don't need to mark this as timedOut. - // We spend too much time in waiting for events such that we might already reached a valid state. - // this should not mark the request as timed out - response.setTimedOut(timedOut && valid == false); + assert valid || (timeoutState != TimeoutState.OK); + // If valid && timeoutState == TimeoutState.ZERO_TIMEOUT then we immediately found **and processed** a valid state, so we don't + // consider this a timeout. However if timeoutState == TimeoutState.TIMED_OUT then we didn't process a valid state (perhaps we + // failed on wait_for_events) so this does count as a timeout. + response.setTimedOut(valid == false || timeoutState == TimeoutState.TIMED_OUT); return response; }