[STATE] Observe cluster state on health request
Today we use busy waiting and sampling when we execute health requests on the master. This is tricky since we might sample a cluster state that has not been fully applied yet and make decisions based on that partial state. This can lead to ugly problems since requests might be routed to nodes where shards are already marked as relocated while in the actually applied cluster state they are still started. Although this window is usually very small, it can lead to ugly test failures. This commit moves the health request over to a listener pattern that receives the actually applied cluster state. Closes #8350
parent 9192219f13
commit cc8e8e6b89
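
The core of the change: the old code sampled clusterService.state() in a sleep loop and could therefore act on a state that was not fully applied yet; the new code registers a listener that fires only after a state has reached APPLIED status, and checks the already-applied state once up front. Below is a minimal standalone sketch of the two approaches; all types and names in it are illustrative stand-ins, not the actual Elasticsearch API.

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Illustrative sketch only -- stand-in types, not the Elasticsearch API.
class ObservedStateSketch {

    enum Status { RECEIVED, APPLIED }

    record State(long version, Status status) {}

    interface Listener { void onNewState(State state); }

    static class StateService {
        private volatile State current = new State(0, Status.APPLIED);
        private final List<Listener> postAppliedListeners = new CopyOnWriteArrayList<>();

        State state() { return current; }

        void addListener(Listener listener) { postAppliedListeners.add(listener); }

        void publish(long version) {
            current = new State(version, Status.RECEIVED); // samplers may see this...
            // ... apply the state ...
            current = new State(version, Status.APPLIED);
            for (Listener listener : postAppliedListeners) { // listeners fire only now
                listener.onNewState(current);
            }
        }
    }

    // Old approach: sample and sleep. The predicate can run against a state
    // that is still RECEIVED, i.e. not fully applied yet.
    static boolean busyWait(StateService service, Predicate<State> done, long timeoutMillis)
            throws InterruptedException {
        long end = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < end) {
            if (done.test(service.state())) {
                return true;
            }
            Thread.sleep(200);
        }
        return false;
    }

    // New approach: check the currently applied state once, otherwise wait to
    // be notified after the next state has been applied -- no sampling window.
    // (A real implementation would also deregister the listener.)
    static boolean observe(StateService service, Predicate<State> done, long timeoutMillis)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        service.addListener(state -> {
            if (done.test(state)) {
                latch.countDown();
            }
        });
        State current = service.state();
        if (current.status() == Status.APPLIED && done.test(current)) {
            return true;
        }
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}

The up-front check before waiting mirrors the validateRequest call the commit adds to TransportClusterHealthAction: a request that is already satisfied by the current applied state answers immediately instead of waiting for the next change.
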
TransportClusterHealthAction.java

@@ -20,18 +20,17 @@
 package org.elasticsearch.action.admin.cluster.health;
 
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchIllegalStateException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.master.TransportMasterNodeReadOperationAction;
-import org.elasticsearch.cluster.ClusterName;
-import org.elasticsearch.cluster.ClusterService;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
+import org.elasticsearch.cluster.*;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.indices.IndexMissingException;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;

@@ -56,8 +55,8 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadOperationAction
 
     @Override
     protected String executor() {
-        // we block here...
-        return ThreadPool.Names.GENERIC;
+        // this should be executing quickly no need to fork off
+        return ThreadPool.Names.SAME;
     }
 
     @Override

@@ -77,11 +76,8 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadOperationAction
 
     @Override
     protected void masterOperation(final ClusterHealthRequest request, final ClusterState unusedState, final ActionListener<ClusterHealthResponse> listener) throws ElasticsearchException {
-        long endTime = System.currentTimeMillis() + request.timeout().millis();
-
         if (request.waitForEvents() != null) {
-            final CountDownLatch latch = new CountDownLatch(1);
-            final AtomicReference<ElasticsearchException> failure = new AtomicReference<>();
+            final long endTime = System.currentTimeMillis() + request.timeout().millis();
             clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", request.waitForEvents(), new ProcessedClusterStateUpdateTask() {
                 @Override
                 public ClusterState execute(ClusterState currentState) {

@@ -90,14 +86,16 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadOperationAction
 
                 @Override
                 public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                    latch.countDown();
+                    final long timeoutInMillis = Math.max(0, endTime - System.currentTimeMillis());
+                    final TimeValue newTimeout = TimeValue.timeValueMillis(timeoutInMillis);
+                    request.timeout(newTimeout);
+                    executeHealth(request, listener);
                 }
 
                 @Override
                 public void onFailure(String source, Throwable t) {
                     logger.error("unexpected failure during [{}]", t, source);
-                    failure.set(new ElasticsearchException("Error while waiting for events", t));
-                    latch.countDown();
+                    listener.onFailure(t);
                 }
 
                 @Override

@@ -105,18 +103,13 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadOperationAction
                     return !request.local();
                 }
             });
-            try {
-                latch.await(request.timeout().millis(), TimeUnit.MILLISECONDS);
-            } catch (InterruptedException e) {
-                // ignore
-            }
-            if (failure.get() != null) {
-                throw failure.get();
-            }
+        } else {
+            executeHealth(request, listener);
         }
+    }
 
+    private void executeHealth(final ClusterHealthRequest request, final ActionListener<ClusterHealthResponse> listener) {
         int waitFor = 5;
         if (request.waitForStatus() == null) {
             waitFor--;

@@ -133,99 +126,134 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadOperationAction
         if (request.indices().length == 0) { // check that they actually exists in the meta data
             waitFor--;
         }
-        if (waitFor == 0) {
+        assert waitFor >= 0;
+        final ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger);
+        final ClusterState state = observer.observedState();
+        if (waitFor == 0 || request.timeout().millis() == 0) {
+            // we check for a timeout here since this method might be called from the wait_for_events
+            // response handler which might have timed out already.
+            ClusterHealthResponse response = clusterHealth(request, state);
+            response.timedOut = request.timeout().millis() == 0;
             // no need to wait for anything
-            ClusterState clusterState = clusterService.state();
-            listener.onResponse(clusterHealth(request, clusterState));
+            listener.onResponse(response);
             return;
         }
-        while (true) {
-            int waitForCounter = 0;
-            ClusterState clusterState = clusterService.state();
-            ClusterHealthResponse response = clusterHealth(request, clusterState);
-            if (request.waitForStatus() != null && response.getStatus().value() <= request.waitForStatus().value()) {
-                waitForCounter++;
-            }
-            if (request.waitForRelocatingShards() != -1 && response.getRelocatingShards() <= request.waitForRelocatingShards()) {
-                waitForCounter++;
-            }
-            if (request.waitForActiveShards() != -1 && response.getActiveShards() >= request.waitForActiveShards()) {
-                waitForCounter++;
-            }
-            if (request.indices().length > 0) {
-                try {
-                    clusterState.metaData().concreteIndices(IndicesOptions.strictExpand(), request.indices());
-                    waitForCounter++;
-                } catch (IndexMissingException e) {
-                    response.status = ClusterHealthStatus.RED; // no indices, make sure its RED
-                    // missing indices, wait a bit more...
-                }
-            }
-            if (!request.waitForNodes().isEmpty()) {
-                if (request.waitForNodes().startsWith(">=")) {
-                    int expected = Integer.parseInt(request.waitForNodes().substring(2));
-                    if (response.getNumberOfNodes() >= expected) {
-                        waitForCounter++;
-                    }
-                } else if (request.waitForNodes().startsWith("ge(")) {
-                    int expected = Integer.parseInt(request.waitForNodes().substring(3, request.waitForNodes().length() - 1));
-                    if (response.getNumberOfNodes() >= expected) {
-                        waitForCounter++;
-                    }
-                } else if (request.waitForNodes().startsWith("<=")) {
-                    int expected = Integer.parseInt(request.waitForNodes().substring(2));
-                    if (response.getNumberOfNodes() <= expected) {
-                        waitForCounter++;
-                    }
-                } else if (request.waitForNodes().startsWith("le(")) {
-                    int expected = Integer.parseInt(request.waitForNodes().substring(3, request.waitForNodes().length() - 1));
-                    if (response.getNumberOfNodes() <= expected) {
-                        waitForCounter++;
-                    }
-                } else if (request.waitForNodes().startsWith(">")) {
-                    int expected = Integer.parseInt(request.waitForNodes().substring(1));
-                    if (response.getNumberOfNodes() > expected) {
-                        waitForCounter++;
-                    }
-                } else if (request.waitForNodes().startsWith("gt(")) {
-                    int expected = Integer.parseInt(request.waitForNodes().substring(3, request.waitForNodes().length() - 1));
-                    if (response.getNumberOfNodes() > expected) {
-                        waitForCounter++;
-                    }
-                } else if (request.waitForNodes().startsWith("<")) {
-                    int expected = Integer.parseInt(request.waitForNodes().substring(1));
-                    if (response.getNumberOfNodes() < expected) {
-                        waitForCounter++;
-                    }
-                } else if (request.waitForNodes().startsWith("lt(")) {
-                    int expected = Integer.parseInt(request.waitForNodes().substring(3, request.waitForNodes().length() - 1));
-                    if (response.getNumberOfNodes() < expected) {
-                        waitForCounter++;
-                    }
-                } else {
-                    int expected = Integer.parseInt(request.waitForNodes());
-                    if (response.getNumberOfNodes() == expected) {
-                        waitForCounter++;
-                    }
-                }
-            }
-            if (waitForCounter == waitFor) {
-                listener.onResponse(response);
-                return;
-            }
-            if (System.currentTimeMillis() > endTime) {
-                response.timedOut = true;
-                listener.onResponse(response);
-                return;
-            }
-            try {
-                Thread.sleep(200);
-            } catch (InterruptedException e) {
-                response.timedOut = true;
-                listener.onResponse(response);
-                return;
-            }
-        }
+        final int concreteWaitFor = waitFor;
+        final ClusterStateObserver.ChangePredicate validationPredicate = new ClusterStateObserver.ValidationPredicate() {
+            @Override
+            protected boolean validate(ClusterState newState) {
+                return newState.status() == ClusterState.ClusterStateStatus.APPLIED && validateRequest(request, newState, concreteWaitFor);
+            }
+        };
+
+        final ClusterStateObserver.Listener stateListener = new ClusterStateObserver.Listener() {
+            @Override
+            public void onNewClusterState(ClusterState clusterState) {
+                listener.onResponse(getResponse(request, clusterState, concreteWaitFor, false));
+            }
+
+            @Override
+            public void onClusterServiceClose() {
+                listener.onFailure(new ElasticsearchIllegalStateException("ClusterService was close during health call"));
+            }
+
+            @Override
+            public void onTimeout(TimeValue timeout) {
+                final ClusterState clusterState = clusterService.state();
+                final ClusterHealthResponse response = getResponse(request, clusterState, concreteWaitFor, true);
+                listener.onResponse(response);
+            }
+        };
+
+        if (state.status() == ClusterState.ClusterStateStatus.APPLIED && validateRequest(request, state, concreteWaitFor)) {
+            stateListener.onNewClusterState(state);
+        } else {
+            observer.waitForNextChange(stateListener, validationPredicate, request.timeout());
+        }
+    }
+
+    private boolean validateRequest(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor) {
+        ClusterHealthResponse response = clusterHealth(request, clusterState);
+        return prepareResponse(request, response, clusterState, waitFor);
+    }
+
+    private ClusterHealthResponse getResponse(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor, boolean timedOut) {
+        ClusterHealthResponse response = clusterHealth(request, clusterState);
+        boolean valid = prepareResponse(request, response, clusterState, waitFor);
+        assert valid || timedOut;
+        response.timedOut = timedOut;
+        return response;
+    }
+
+    private boolean prepareResponse(final ClusterHealthRequest request, final ClusterHealthResponse response, ClusterState clusterState, final int waitFor) {
+        int waitForCounter = 0;
+        if (request.waitForStatus() != null && response.getStatus().value() <= request.waitForStatus().value()) {
+            waitForCounter++;
+        }
+        if (request.waitForRelocatingShards() != -1 && response.getRelocatingShards() <= request.waitForRelocatingShards()) {
+            waitForCounter++;
+        }
+        if (request.waitForActiveShards() != -1 && response.getActiveShards() >= request.waitForActiveShards()) {
+            waitForCounter++;
+        }
+        if (request.indices().length > 0) {
+            try {
+                clusterState.metaData().concreteIndices(IndicesOptions.strictExpand(), request.indices());
+                waitForCounter++;
+            } catch (IndexMissingException e) {
+                response.status = ClusterHealthStatus.RED; // no indices, make sure its RED
+                // missing indices, wait a bit more...
+            }
+        }
+        if (!request.waitForNodes().isEmpty()) {
+            if (request.waitForNodes().startsWith(">=")) {
+                int expected = Integer.parseInt(request.waitForNodes().substring(2));
+                if (response.getNumberOfNodes() >= expected) {
+                    waitForCounter++;
+                }
+            } else if (request.waitForNodes().startsWith("ge(")) {
+                int expected = Integer.parseInt(request.waitForNodes().substring(3, request.waitForNodes().length() - 1));
+                if (response.getNumberOfNodes() >= expected) {
+                    waitForCounter++;
+                }
+            } else if (request.waitForNodes().startsWith("<=")) {
+                int expected = Integer.parseInt(request.waitForNodes().substring(2));
+                if (response.getNumberOfNodes() <= expected) {
+                    waitForCounter++;
+                }
+            } else if (request.waitForNodes().startsWith("le(")) {
+                int expected = Integer.parseInt(request.waitForNodes().substring(3, request.waitForNodes().length() - 1));
+                if (response.getNumberOfNodes() <= expected) {
+                    waitForCounter++;
+                }
+            } else if (request.waitForNodes().startsWith(">")) {
+                int expected = Integer.parseInt(request.waitForNodes().substring(1));
+                if (response.getNumberOfNodes() > expected) {
+                    waitForCounter++;
+                }
+            } else if (request.waitForNodes().startsWith("gt(")) {
+                int expected = Integer.parseInt(request.waitForNodes().substring(3, request.waitForNodes().length() - 1));
+                if (response.getNumberOfNodes() > expected) {
+                    waitForCounter++;
+                }
+            } else if (request.waitForNodes().startsWith("<")) {
+                int expected = Integer.parseInt(request.waitForNodes().substring(1));
+                if (response.getNumberOfNodes() < expected) {
+                    waitForCounter++;
+                }
+            } else if (request.waitForNodes().startsWith("lt(")) {
+                int expected = Integer.parseInt(request.waitForNodes().substring(3, request.waitForNodes().length() - 1));
+                if (response.getNumberOfNodes() < expected) {
+                    waitForCounter++;
+                }
+            } else {
+                int expected = Integer.parseInt(request.waitForNodes());
+                if (response.getNumberOfNodes() == expected) {
+                    waitForCounter++;
+                }
+            }
+        }
+        return waitForCounter == waitFor;
     }

ClusterService.java

@@ -92,7 +92,9 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
     void remove(LocalNodeMasterListener listener);
 
     /**
-     * Adds a cluster state listener that will timeout after the provided timeout.
+     * Adds a cluster state listener that will timeout after the provided timeout,
+     * and is executed after the clusterstate has been successfully applied ie. is
+     * in state {@link org.elasticsearch.cluster.ClusterState.ClusterStateStatus#APPLIED}
      */
     void add(TimeValue timeout, TimeoutClusterStateListener listener);

ClusterStateObserver.java

@@ -42,7 +42,6 @@ public class ClusterStateObserver {
             return changedEvent.previousState().version() != changedEvent.state().version();
         }
     };
-
     private ClusterService clusterService;
     volatile TimeValue timeOutValue;
 

@@ -241,7 +240,7 @@ public class ClusterStateObserver {
         }
     }
 
-    public interface Listener {
+    public static interface Listener {
 
         /** called when a new state is observed */
        void onNewClusterState(ClusterState state);

InternalClusterService.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.cluster.service;
 
+import com.google.common.collect.Iterables;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchIllegalStateException;
 import org.elasticsearch.Version;

@@ -76,9 +77,19 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterService>
 
     private volatile PrioritizedEsThreadPoolExecutor updateTasksExecutor;
 
-    private final List<ClusterStateListener> priorityClusterStateListeners = new CopyOnWriteArrayList<>();
-    private final List<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<>();
-    private final List<ClusterStateListener> lastClusterStateListeners = new CopyOnWriteArrayList<>();
+    /**
+     * Those 3 state listeners are changing infrequently - CopyOnWriteArrayList is just fine
+     */
+    private final Collection<ClusterStateListener> priorityClusterStateListeners = new CopyOnWriteArrayList<>();
+    private final Collection<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<>();
+    private final Collection<ClusterStateListener> lastClusterStateListeners = new CopyOnWriteArrayList<>();
+    // TODO this is rather frequently changing I guess a Synced Set would be better here and a dedicated remove API
+    private final Collection<ClusterStateListener> postAppliedListeners = new CopyOnWriteArrayList<>();
+    private final Iterable<ClusterStateListener> preAppliedListeners = Iterables.concat(
+            priorityClusterStateListeners,
+            clusterStateListeners,
+            lastClusterStateListeners);
 
     private final LocalNodeMasterListeners localNodeMasterListeners;
 
     private final Queue<NotifyTimeout> onGoingTimeouts = ConcurrentCollections.newQueue();

@@ -197,6 +208,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterService>
         clusterStateListeners.remove(listener);
         priorityClusterStateListeners.remove(listener);
         lastClusterStateListeners.remove(listener);
+        postAppliedListeners.remove(listener);
         for (Iterator<NotifyTimeout> it = onGoingTimeouts.iterator(); it.hasNext(); ) {
             NotifyTimeout timeout = it.next();
             if (timeout.listener.equals(listener)) {

@@ -229,7 +241,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterService>
                 NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout);
                 notifyTimeout.future = threadPool.schedule(timeout, ThreadPool.Names.GENERIC, notifyTimeout);
                 onGoingTimeouts.add(notifyTimeout);
-                clusterStateListeners.add(listener);
+                postAppliedListeners.add(listener);
                 listener.postAdded();
             }
         });

@@ -426,15 +438,12 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterService>
                 // update the current cluster state
                 clusterState = newClusterState;
                 logger.debug("set local cluster state to version {}", newClusterState.version());
-                for (ClusterStateListener listener : priorityClusterStateListeners) {
-                    listener.clusterChanged(clusterChangedEvent);
-                }
-                for (ClusterStateListener listener : clusterStateListeners) {
-                    listener.clusterChanged(clusterChangedEvent);
-                }
-                for (ClusterStateListener listener : lastClusterStateListeners) {
-                    listener.clusterChanged(clusterChangedEvent);
+                for (ClusterStateListener listener : preAppliedListeners) {
+                    try {
+                        listener.clusterChanged(clusterChangedEvent);
+                    } catch (Exception ex) {
+                        logger.warn("failed to notify ClusterStateListener", ex);
+                    }
                 }
 
                 for (DiscoveryNode node : nodesDelta.removedNodes()) {

@@ -447,6 +456,14 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterService>
 
                 newClusterState.status(ClusterState.ClusterStateStatus.APPLIED);
 
+                for (ClusterStateListener listener : postAppliedListeners) {
+                    try {
+                        listener.clusterChanged(clusterChangedEvent);
+                    } catch (Exception ex) {
+                        logger.warn("failed to notify ClusterStateListener", ex);
+                    }
+                }
+
                 //manual ack only from the master at the end of the publish
                 if (newClusterState.nodes().localNodeMaster()) {
                     try {
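
Condensed to its essentials, the notification order InternalClusterService enforces after this change is: pre-applied listeners first, then mark the state APPLIED, then post-applied listeners (where timeout-based listeners such as the health observer are registered). The following is a simplified stand-in sketch written for illustration, not the real class; the real code iterates ClusterStateListeners with a ClusterChangedEvent and does much more between the two phases.

import java.util.Collection;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Simplified sketch of the two-phase listener notification; stand-in types.
class TwoPhaseNotifier {

    interface StateListener { void stateChanged(long newVersion); }

    private final Collection<StateListener> priorityListeners = new CopyOnWriteArrayList<>();
    private final Collection<StateListener> listeners = new CopyOnWriteArrayList<>();
    private final Collection<StateListener> lastListeners = new CopyOnWriteArrayList<>();
    // timeout-based listeners (e.g. the health observer) are registered here
    private final Collection<StateListener> postAppliedListeners = new CopyOnWriteArrayList<>();

    private volatile boolean applied;

    // what a post-applied listener's predicate could check, analogous to
    // ClusterState.ClusterStateStatus.APPLIED
    boolean applied() { return applied; }

    void publish(long newVersion) {
        applied = false;
        // phase 1: pre-applied listeners run before the state is marked APPLIED
        for (Collection<StateListener> group : List.of(priorityListeners, listeners, lastListeners)) {
            for (StateListener listener : group) {
                notifySafely(listener, newVersion);
            }
        }
        applied = true; // corresponds to newClusterState.status(APPLIED)
        // phase 2: post-applied listeners only ever see a fully applied state
        for (StateListener listener : postAppliedListeners) {
            notifySafely(listener, newVersion);
        }
    }

    // a failing listener must not abort the publish; the real code logs a warning
    private void notifySafely(StateListener listener, long newVersion) {
        try {
            listener.stateChanged(newVersion);
        } catch (Exception ex) {
            // log and continue
        }
    }
}

Wrapping each notification in try/catch matches what the diff above adds: previously a single misbehaving listener could prevent later listeners from seeing the new state.
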

RelocationTests.java

@@ -25,7 +25,6 @@ import com.google.common.base.Predicate;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.util.LuceneTestCase.Slow;
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
 import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;

@@ -341,7 +340,7 @@ public class RelocationTests extends ElasticsearchIntegrationTest {
         indexRandom(true, true, builders2);
 
         // verify cluster was finished.
-        assertFalse(client().admin().cluster().prepareHealth().setWaitForRelocatingShards(0).setTimeout("30s").get().isTimedOut());
+        assertFalse(client().admin().cluster().prepareHealth().setWaitForRelocatingShards(0).setWaitForEvents(Priority.LANGUID).setTimeout("30s").get().isTimedOut());
         logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode);
 
         logger.debug("--> verifying all searches return the same number of docs");