From cc8e8e6b89ba04c40d9b0be30c12c9a7899bfdb1 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 5 Nov 2014 13:47:50 +0100 Subject: [PATCH] [STATE] Observe cluster state on health request Today we use busy waiting and sampling when we execute HealthReqeusts on the master. This is tricky sicne we might sample a not yet fully applied cluster state and make a decsions base on the partial cluster state. This can lead to ugly problems since requests might be routed to nodes where shards are already marked as relocated but on the actual cluster state they are still started. Yet, this window is very small usually it can lead to ugly test failures. This commit moves the health request over to a listener pattern that gets the actual applied cluster state. Closes #8350 --- .../health/TransportClusterHealthAction.java | 236 ++++++++++-------- .../elasticsearch/cluster/ClusterService.java | 4 +- .../cluster/ClusterStateObserver.java | 3 +- .../service/InternalClusterService.java | 43 +++- .../recovery/RelocationTests.java | 3 +- 5 files changed, 167 insertions(+), 122 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java index 71356e7c1e4..d645a248239 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java @@ -20,18 +20,17 @@ package org.elasticsearch.action.admin.cluster.health; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.TransportMasterNodeReadOperationAction; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; +import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -56,8 +55,8 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadOperati @Override protected String executor() { - // we block here... - return ThreadPool.Names.GENERIC; + // this should be executing quickly no need to fork off + return ThreadPool.Names.SAME; } @Override @@ -77,11 +76,8 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadOperati @Override protected void masterOperation(final ClusterHealthRequest request, final ClusterState unusedState, final ActionListener listener) throws ElasticsearchException { - long endTime = System.currentTimeMillis() + request.timeout().millis(); - if (request.waitForEvents() != null) { - final CountDownLatch latch = new CountDownLatch(1); - final AtomicReference failure = new AtomicReference<>(); + final long endTime = System.currentTimeMillis() + request.timeout().millis(); clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", request.waitForEvents(), new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { @@ -90,14 +86,16 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadOperati @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - latch.countDown(); + final long timeoutInMillis = Math.max(0, endTime - System.currentTimeMillis()); + final TimeValue newTimeout = TimeValue.timeValueMillis(timeoutInMillis); + request.timeout(newTimeout); + executeHealth(request, listener); } @Override public void onFailure(String source, Throwable t) { logger.error("unexpected failure during [{}]", t, source); - failure.set(new ElasticsearchException("Error while waiting for events", t)); - latch.countDown(); + listener.onFailure(t); } @Override @@ -105,18 +103,13 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadOperati return !request.local(); } }); - - try { - latch.await(request.timeout().millis(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - // ignore - } - if (failure.get() != null) { - throw failure.get(); - } + } else { + executeHealth(request, listener); } + } + private void executeHealth(final ClusterHealthRequest request, final ActionListener listener) { int waitFor = 5; if (request.waitForStatus() == null) { waitFor--; @@ -133,99 +126,134 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadOperati if (request.indices().length == 0) { // check that they actually exists in the meta data waitFor--; } - if (waitFor == 0) { + + assert waitFor >= 0; + final ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger); + final ClusterState state = observer.observedState(); + if (waitFor == 0 || request.timeout().millis() == 0) { + // we check for a timeout here since this method might be called from the wait_for_events + // response handler which might have timed out already. + ClusterHealthResponse response = clusterHealth(request, state); + response.timedOut = request.timeout().millis() == 0; // no need to wait for anything - ClusterState clusterState = clusterService.state(); - listener.onResponse(clusterHealth(request, clusterState)); + listener.onResponse(response); return; } - while (true) { - int waitForCounter = 0; - ClusterState clusterState = clusterService.state(); - ClusterHealthResponse response = clusterHealth(request, clusterState); - if (request.waitForStatus() != null && response.getStatus().value() <= request.waitForStatus().value()) { - waitForCounter++; + final int concreteWaitFor = waitFor; + final ClusterStateObserver.ChangePredicate validationPredicate = new ClusterStateObserver.ValidationPredicate() { + @Override + protected boolean validate(ClusterState newState) { + return newState.status() == ClusterState.ClusterStateStatus.APPLIED && validateRequest(request, newState, concreteWaitFor); } - if (request.waitForRelocatingShards() != -1 && response.getRelocatingShards() <= request.waitForRelocatingShards()) { - waitForCounter++; + }; + + final ClusterStateObserver.Listener stateListener = new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState clusterState) { + listener.onResponse(getResponse(request, clusterState, concreteWaitFor, false)); } - if (request.waitForActiveShards() != -1 && response.getActiveShards() >= request.waitForActiveShards()) { - waitForCounter++; + + @Override + public void onClusterServiceClose() { + listener.onFailure(new ElasticsearchIllegalStateException("ClusterService was close during health call")); } - if (request.indices().length > 0) { - try { - clusterState.metaData().concreteIndices(IndicesOptions.strictExpand(), request.indices()); - waitForCounter++; - } catch (IndexMissingException e) { - response.status = ClusterHealthStatus.RED; // no indices, make sure its RED - // missing indices, wait a bit more... - } - } - if (!request.waitForNodes().isEmpty()) { - if (request.waitForNodes().startsWith(">=")) { - int expected = Integer.parseInt(request.waitForNodes().substring(2)); - if (response.getNumberOfNodes() >= expected) { - waitForCounter++; - } - } else if (request.waitForNodes().startsWith("ge(")) { - int expected = Integer.parseInt(request.waitForNodes().substring(3, request.waitForNodes().length() - 1)); - if (response.getNumberOfNodes() >= expected) { - waitForCounter++; - } - } else if (request.waitForNodes().startsWith("<=")) { - int expected = Integer.parseInt(request.waitForNodes().substring(2)); - if (response.getNumberOfNodes() <= expected) { - waitForCounter++; - } - } else if (request.waitForNodes().startsWith("le(")) { - int expected = Integer.parseInt(request.waitForNodes().substring(3, request.waitForNodes().length() - 1)); - if (response.getNumberOfNodes() <= expected) { - waitForCounter++; - } - } else if (request.waitForNodes().startsWith(">")) { - int expected = Integer.parseInt(request.waitForNodes().substring(1)); - if (response.getNumberOfNodes() > expected) { - waitForCounter++; - } - } else if (request.waitForNodes().startsWith("gt(")) { - int expected = Integer.parseInt(request.waitForNodes().substring(3, request.waitForNodes().length() - 1)); - if (response.getNumberOfNodes() > expected) { - waitForCounter++; - } - } else if (request.waitForNodes().startsWith("<")) { - int expected = Integer.parseInt(request.waitForNodes().substring(1)); - if (response.getNumberOfNodes() < expected) { - waitForCounter++; - } - } else if (request.waitForNodes().startsWith("lt(")) { - int expected = Integer.parseInt(request.waitForNodes().substring(3, request.waitForNodes().length() - 1)); - if (response.getNumberOfNodes() < expected) { - waitForCounter++; - } - } else { - int expected = Integer.parseInt(request.waitForNodes()); - if (response.getNumberOfNodes() == expected) { - waitForCounter++; - } - } - } - if (waitForCounter == waitFor) { + + @Override + public void onTimeout(TimeValue timeout) { + final ClusterState clusterState = clusterService.state(); + final ClusterHealthResponse response = getResponse(request, clusterState, concreteWaitFor, true); listener.onResponse(response); - return; - } - if (System.currentTimeMillis() > endTime) { - response.timedOut = true; - listener.onResponse(response); - return; } + }; + if (state.status() == ClusterState.ClusterStateStatus.APPLIED && validateRequest(request, state, concreteWaitFor)) { + stateListener.onNewClusterState(state); + } else { + observer.waitForNextChange(stateListener, validationPredicate, request.timeout()); + } + } + + private boolean validateRequest(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor) { + ClusterHealthResponse response = clusterHealth(request, clusterState); + return prepareResponse(request, response, clusterState, waitFor); + } + + private ClusterHealthResponse getResponse(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor, boolean timedOut) { + ClusterHealthResponse response = clusterHealth(request, clusterState); + boolean valid = prepareResponse(request, response, clusterState, waitFor); + assert valid || timedOut; + response.timedOut = timedOut; + return response; + } + + private boolean prepareResponse(final ClusterHealthRequest request, final ClusterHealthResponse response, ClusterState clusterState, final int waitFor) { + int waitForCounter = 0; + if (request.waitForStatus() != null && response.getStatus().value() <= request.waitForStatus().value()) { + waitForCounter++; + } + if (request.waitForRelocatingShards() != -1 && response.getRelocatingShards() <= request.waitForRelocatingShards()) { + waitForCounter++; + } + if (request.waitForActiveShards() != -1 && response.getActiveShards() >= request.waitForActiveShards()) { + waitForCounter++; + } + if (request.indices().length > 0) { try { - Thread.sleep(200); - } catch (InterruptedException e) { - response.timedOut = true; - listener.onResponse(response); - return; + clusterState.metaData().concreteIndices(IndicesOptions.strictExpand(), request.indices()); + waitForCounter++; + } catch (IndexMissingException e) { + response.status = ClusterHealthStatus.RED; // no indices, make sure its RED + // missing indices, wait a bit more... } } + if (!request.waitForNodes().isEmpty()) { + if (request.waitForNodes().startsWith(">=")) { + int expected = Integer.parseInt(request.waitForNodes().substring(2)); + if (response.getNumberOfNodes() >= expected) { + waitForCounter++; + } + } else if (request.waitForNodes().startsWith("ge(")) { + int expected = Integer.parseInt(request.waitForNodes().substring(3, request.waitForNodes().length() - 1)); + if (response.getNumberOfNodes() >= expected) { + waitForCounter++; + } + } else if (request.waitForNodes().startsWith("<=")) { + int expected = Integer.parseInt(request.waitForNodes().substring(2)); + if (response.getNumberOfNodes() <= expected) { + waitForCounter++; + } + } else if (request.waitForNodes().startsWith("le(")) { + int expected = Integer.parseInt(request.waitForNodes().substring(3, request.waitForNodes().length() - 1)); + if (response.getNumberOfNodes() <= expected) { + waitForCounter++; + } + } else if (request.waitForNodes().startsWith(">")) { + int expected = Integer.parseInt(request.waitForNodes().substring(1)); + if (response.getNumberOfNodes() > expected) { + waitForCounter++; + } + } else if (request.waitForNodes().startsWith("gt(")) { + int expected = Integer.parseInt(request.waitForNodes().substring(3, request.waitForNodes().length() - 1)); + if (response.getNumberOfNodes() > expected) { + waitForCounter++; + } + } else if (request.waitForNodes().startsWith("<")) { + int expected = Integer.parseInt(request.waitForNodes().substring(1)); + if (response.getNumberOfNodes() < expected) { + waitForCounter++; + } + } else if (request.waitForNodes().startsWith("lt(")) { + int expected = Integer.parseInt(request.waitForNodes().substring(3, request.waitForNodes().length() - 1)); + if (response.getNumberOfNodes() < expected) { + waitForCounter++; + } + } else { + int expected = Integer.parseInt(request.waitForNodes()); + if (response.getNumberOfNodes() == expected) { + waitForCounter++; + } + } + } + return waitForCounter == waitFor; } diff --git a/src/main/java/org/elasticsearch/cluster/ClusterService.java b/src/main/java/org/elasticsearch/cluster/ClusterService.java index 080fce84a36..a626ab48832 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/ClusterService.java @@ -92,7 +92,9 @@ public interface ClusterService extends LifecycleComponent { void remove(LocalNodeMasterListener listener); /** - * Adds a cluster state listener that will timeout after the provided timeout. + * Adds a cluster state listener that will timeout after the provided timeout, + * and is executed after the clusterstate has been successfully applied ie. is + * in state {@link org.elasticsearch.cluster.ClusterState.ClusterStateStatus#APPLIED} */ void add(TimeValue timeout, TimeoutClusterStateListener listener); diff --git a/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java b/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java index dda9301b096..de5e031a4e6 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java +++ b/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java @@ -42,7 +42,6 @@ public class ClusterStateObserver { return changedEvent.previousState().version() != changedEvent.state().version(); } }; - private ClusterService clusterService; volatile TimeValue timeOutValue; @@ -241,7 +240,7 @@ public class ClusterStateObserver { } } - public interface Listener { + public static interface Listener { /** called when a new state is observed */ void onNewClusterState(ClusterState state); diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index 1c6ba204e0d..a753006d165 100644 --- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.service; +import com.google.common.collect.Iterables; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.Version; @@ -76,9 +77,19 @@ public class InternalClusterService extends AbstractLifecycleComponent priorityClusterStateListeners = new CopyOnWriteArrayList<>(); - private final List clusterStateListeners = new CopyOnWriteArrayList<>(); - private final List lastClusterStateListeners = new CopyOnWriteArrayList<>(); + /** + * Those 3 state listeners are changing infrequently - CopyOnWriteArrayList is just fine + */ + private final Collection priorityClusterStateListeners = new CopyOnWriteArrayList<>(); + private final Collection clusterStateListeners = new CopyOnWriteArrayList<>(); + private final Collection lastClusterStateListeners = new CopyOnWriteArrayList<>(); + // TODO this is rather frequently changing I guess a Synced Set would be better here and a dedicated remove API + private final Collection postAppliedListeners = new CopyOnWriteArrayList<>(); + private final Iterable preAppliedListeners = Iterables.concat( + priorityClusterStateListeners, + clusterStateListeners, + lastClusterStateListeners); + private final LocalNodeMasterListeners localNodeMasterListeners; private final Queue onGoingTimeouts = ConcurrentCollections.newQueue(); @@ -197,6 +208,7 @@ public class InternalClusterService extends AbstractLifecycleComponent it = onGoingTimeouts.iterator(); it.hasNext(); ) { NotifyTimeout timeout = it.next(); if (timeout.listener.equals(listener)) { @@ -229,7 +241,7 @@ public class InternalClusterService extends AbstractLifecycleComponent DONE relocate the shard from {} to {}", fromNode, toNode); logger.debug("--> verifying all searches return the same number of docs");