always retry on connect exception with master operations

This commit is contained in:
kimchy 2010-07-23 01:59:56 +03:00
parent 9283e2a7ad
commit 72682a6730
2 changed files with 33 additions and 34 deletions

View File

@ -138,41 +138,37 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
}
@Override public void handleException(final RemoteTransportException exp) {
if (retrying) {
listener.onFailure(exp);
if (exp.unwrapCause() instanceof ConnectTransportException) {
// we want to retry here a bit to see if a new master is elected
clusterService.add(request.masterNodeTimeout(), new TimeoutClusterStateListener() {
@Override public void postAdded() {
ClusterState clusterStateV2 = clusterService.state();
if (!clusterState.nodes().masterNodeId().equals(clusterStateV2.nodes().masterNodeId())) {
// master changes while adding the listener, try here
clusterService.remove(this);
innerExecute(request, listener, false);
}
}
@Override public void onClose() {
clusterService.remove(this);
listener.onFailure(new NodeClosedException(nodes.localNode()));
}
@Override public void onTimeout(TimeValue timeout) {
clusterService.remove(this);
listener.onFailure(exp);
}
@Override public void clusterChanged(ClusterChangedEvent event) {
if (event.nodesDelta().masterNodeChanged()) {
clusterService.remove(this);
innerExecute(request, listener, false);
}
}
});
} else {
if (exp.unwrapCause() instanceof ConnectTransportException) {
// we want to retry here a bit to see if a new master is elected
clusterService.add(request.masterNodeTimeout(), new TimeoutClusterStateListener() {
@Override public void postAdded() {
ClusterState clusterStateV2 = clusterService.state();
if (!clusterState.nodes().masterNodeId().equals(clusterStateV2.nodes().masterNodeId())) {
// master changes while adding the listener, try here
clusterService.remove(this);
innerExecute(request, listener, true);
}
}
@Override public void onClose() {
clusterService.remove(this);
listener.onFailure(new NodeClosedException(nodes.localNode()));
}
@Override public void onTimeout(TimeValue timeout) {
clusterService.remove(this);
listener.onFailure(exp);
}
@Override public void clusterChanged(ClusterChangedEvent event) {
if (event.nodesDelta().masterNodeChanged()) {
clusterService.remove(this);
innerExecute(request, listener, true);
}
}
});
} else {
listener.onFailure(exp);
}
listener.onFailure(exp);
}
}
});

View File

@ -111,6 +111,7 @@ public class IndexLifecycleActionTests extends AbstractNodesTests {
logger.info("Starting server3");
// start another server
startNode("server3", settings);
Thread.sleep(200);
ClusterService clusterService3 = ((InternalNode) node("server3")).injector().getInstance(ClusterService.class);
@ -214,6 +215,7 @@ public class IndexLifecycleActionTests extends AbstractNodesTests {
// start another server
logger.info("Starting server2");
startNode("server2", settings);
Thread.sleep(200);
logger.info("Running Cluster Health");
clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0).waitForNodes("2")).actionGet();
@ -240,6 +242,7 @@ public class IndexLifecycleActionTests extends AbstractNodesTests {
// start another server
logger.info("Starting server3");
startNode("server3");
Thread.sleep(200);
ClusterService clusterService3 = ((InternalNode) node("server3")).injector().getInstance(ClusterService.class);