add retry to master based operations (like cluster health) when the master is not connected, retrying until a master is connected or a timeout has passed

kimchy 2010-07-20 18:32:21 +03:00
parent 45c821316b
commit 45e54c1705
2 changed files with 50 additions and 4 deletions
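
In outline: when a master based operation fails with a connect exception, the action now registers a timeout-bounded cluster state listener and retries the operation once a new master is elected; if the timeout (30 seconds here) passes first, the original failure is reported. Below is a minimal, self-contained sketch of that pattern with hypothetical names (Cluster, tryMasterOperation, and onMasterChanged are illustrative stand-ins, not the real API; the actual implementation is in TransportMasterNodeOperationAction in the diff below):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class MasterRetrySketch {
    interface Cluster {
        boolean tryMasterOperation();            // false simulates "master not connected"
        void onMasterChanged(Runnable callback); // fires once a new master is elected
    }

    static void execute(Cluster cluster, long timeoutMillis, Consumer<String> listener) {
        if (cluster.tryMasterOperation()) {
            listener.accept("ok");
            return;
        }
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // fail with the original error if no master change arrives within the timeout
        ScheduledFuture<?> timeout = scheduler.schedule(
                () -> listener.accept("failed: no master within timeout"),
                timeoutMillis, TimeUnit.MILLISECONDS);
        cluster.onMasterChanged(() -> {
            timeout.cancel(false);
            // retry exactly once; a second failure is reported, not retried again
            listener.accept(cluster.tryMasterOperation() ? "ok after retry" : "failed after retry");
            scheduler.shutdown();
        });
    }
}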

TransportMasterNodeOperationAction.java

@@ -24,10 +24,13 @@ import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.BaseAction;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.TimeoutClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
@@ -70,6 +73,10 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
    }

    @Override protected void doExecute(final Request request, final ActionListener<Response> listener) {
        innerExecute(request, listener, false);
    }

    private void innerExecute(final Request request, final ActionListener<Response> listener, final boolean retrying) {
        final ClusterState clusterState = clusterService.state();
        DiscoveryNodes nodes = clusterState.nodes();
        if (nodes.localNodeMaster()) {
@@ -98,8 +105,43 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
                    listener.onResponse(response);
                }

                @Override public void handleException(final RemoteTransportException exp) {
                    if (retrying) {
                        listener.onFailure(exp);
                    } else {
                        if (exp.unwrapCause() instanceof ConnectTransportException) {
                            // we want to retry here a bit to see if a new master is elected
                            clusterService.add(TimeValue.timeValueSeconds(30), new TimeoutClusterStateListener() {
                                @Override public void postAdded() {
                                    ClusterState clusterStateV2 = clusterService.state();
                                    if (!clusterState.nodes().masterNodeId().equals(clusterStateV2.nodes().masterNodeId())) {
                                        // the master changed while we were adding the listener, retry right away
                                        clusterService.remove(this);
                                        innerExecute(request, listener, true);
                                    }
                                }

                                @Override public void onClose() {
                                    clusterService.remove(this);
                                    listener.onFailure(new ElasticSearchIllegalStateException("node is shutting down"));
                                }

                                @Override public void onTimeout(TimeValue timeout) {
                                    clusterService.remove(this);
                                    listener.onFailure(exp);
                                }

                                @Override public void clusterChanged(ClusterChangedEvent event) {
                                    if (event.nodesDelta().masterNodeChanged()) {
                                        clusterService.remove(this);
                                        innerExecute(request, listener, true);
                                    }
                                }
                            });
                        } else {
                            listener.onFailure(exp);
                        }
                    }
                }
            });
        }
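
For callers, the effect is that a master based call such as cluster health no longer fails immediately when it lands in the window between one master leaving and a new one being elected. Note also how postAdded() re-checks the master node id: that closes the race where the master changes after the failure but before the listener is registered. A usage sketch, assuming an existing Client (it mirrors the calls in the test below):

import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.Client;

class HealthCheckExample {
    // cluster health is a master based operation, so with the change above it
    // transparently retries for up to 30 seconds if the master is not connected
    static ClusterHealthResponse waitForGreen(Client client) {
        return client.admin().cluster().prepareHealth()
                .setTimeout("1m")
                .setWaitForGreenStatus()
                .execute()
                .actionGet();
    }
}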

FullRollingRestartTests.java

@@ -73,10 +73,12 @@ public class FullRollingRestartTests extends AbstractNodesTests {
        // now start shutting nodes down
        closeNode("node1");
        // make sure the cluster state is green, and all has been recovered
        assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN));
        closeNode("node2");
        // make sure the cluster state is green, and all has been recovered
        assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN));
        client("node5").admin().indices().prepareRefresh().execute().actionGet();
        for (int i = 0; i < 10; i++) {
@@ -84,6 +86,8 @@ public class FullRollingRestartTests extends AbstractNodesTests {
        }
        closeNode("node3");
        // make sure the cluster state is green, and all has been recovered
        assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN));
        closeNode("node4");
        // make sure the cluster state is green, and all has been recovered