MasterFaultDetection can start after the initial cluster state has been processed and the NodeConnectionService connect to the new master (#23037)
After the first cluster state from a new master is processed, NodeConnectionService guarantees we connect to the new master. This removes the need to explicitly connect to the master in the MasterFaultDetection code making it simpler and bypasses the assertion triggered due to the blocking operation on the cluster state thread. Relates to #22828
This commit is contained in:
parent
8e4b89cdbe
commit
0161edae10
|
@ -111,28 +111,10 @@ public class MasterFaultDetection extends FaultDetection {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start(final DiscoveryNode masterNode, String reason) {
|
|
||||||
synchronized (masterNodeMutex) {
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("[master] starting fault detection against master [{}], reason [{}]", masterNode, reason);
|
|
||||||
}
|
|
||||||
innerStart(masterNode);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void innerStart(final DiscoveryNode masterNode) {
|
private void innerStart(final DiscoveryNode masterNode) {
|
||||||
this.masterNode = masterNode;
|
this.masterNode = masterNode;
|
||||||
this.retryCount = 0;
|
this.retryCount = 0;
|
||||||
this.notifiedMasterFailure.set(false);
|
this.notifiedMasterFailure.set(false);
|
||||||
|
|
||||||
// try and connect to make sure we are connected
|
|
||||||
try {
|
|
||||||
transportService.connectToNode(masterNode);
|
|
||||||
} catch (final Exception e) {
|
|
||||||
// notify master failure (which stops also) and bail..
|
|
||||||
notifyMasterFailure(masterNode, e, "failed to perform initial connect ");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (masterPinger != null) {
|
if (masterPinger != null) {
|
||||||
masterPinger.stop();
|
masterPinger.stop();
|
||||||
}
|
}
|
||||||
|
|
|
@ -727,12 +727,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
||||||
if (shouldIgnoreOrRejectNewClusterState(logger, currentState, newClusterState)) {
|
if (shouldIgnoreOrRejectNewClusterState(logger, currentState, newClusterState)) {
|
||||||
return unchanged();
|
return unchanged();
|
||||||
}
|
}
|
||||||
|
|
||||||
// check to see that we monitor the correct master of the cluster
|
|
||||||
if (masterFD.masterNode() == null || !masterFD.masterNode().equals(newClusterState.nodes().getMasterNode())) {
|
|
||||||
masterFD.restart(newClusterState.nodes().getMasterNode(), "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (currentState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock())) {
|
if (currentState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock())) {
|
||||||
// its a fresh update from the master as we transition from a start of not having a master to having one
|
// its a fresh update from the master as we transition from a start of not having a master to having one
|
||||||
logger.debug("got first state from fresh master [{}]", newClusterState.nodes().getMasterNodeId());
|
logger.debug("got first state from fresh master [{}]", newClusterState.nodes().getMasterNodeId());
|
||||||
|
@ -786,6 +780,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
||||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||||
try {
|
try {
|
||||||
if (newClusterState != null) {
|
if (newClusterState != null) {
|
||||||
|
// check to see that we monitor the correct master of the cluster
|
||||||
|
if (masterFD.masterNode() == null || !masterFD.masterNode().equals(newClusterState.nodes().getMasterNode())) {
|
||||||
|
masterFD.restart(newClusterState.nodes().getMasterNode(), "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]");
|
||||||
|
}
|
||||||
publishClusterState.pendingStatesQueue().markAsProcessed(newClusterState);
|
publishClusterState.pendingStatesQueue().markAsProcessed(newClusterState);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|
|
@ -244,7 +244,7 @@ public class ZenFaultDetectionTests extends ESTestCase {
|
||||||
setState(clusterServiceA, state);
|
setState(clusterServiceA, state);
|
||||||
MasterFaultDetection masterFD = new MasterFaultDetection(settings.build(), threadPool, serviceA,
|
MasterFaultDetection masterFD = new MasterFaultDetection(settings.build(), threadPool, serviceA,
|
||||||
clusterServiceA);
|
clusterServiceA);
|
||||||
masterFD.start(nodeB, "test");
|
masterFD.restart(nodeB, "test");
|
||||||
|
|
||||||
final String[] failureReason = new String[1];
|
final String[] failureReason = new String[1];
|
||||||
final DiscoveryNode[] failureNode = new DiscoveryNode[1];
|
final DiscoveryNode[] failureNode = new DiscoveryNode[1];
|
||||||
|
@ -290,14 +290,14 @@ public class ZenFaultDetectionTests extends ESTestCase {
|
||||||
|
|
||||||
MasterFaultDetection masterFDNodeA = new MasterFaultDetection(Settings.builder().put(settingsA).put(settings).build(),
|
MasterFaultDetection masterFDNodeA = new MasterFaultDetection(Settings.builder().put(settingsA).put(settings).build(),
|
||||||
threadPool, serviceA, clusterServiceA);
|
threadPool, serviceA, clusterServiceA);
|
||||||
masterFDNodeA.start(nodeB, "test");
|
masterFDNodeA.restart(nodeB, "test");
|
||||||
|
|
||||||
final ClusterState stateNodeB = ClusterState.builder(clusterName).nodes(buildNodesForB(true)).build();
|
final ClusterState stateNodeB = ClusterState.builder(clusterName).nodes(buildNodesForB(true)).build();
|
||||||
setState(clusterServiceB, stateNodeB);
|
setState(clusterServiceB, stateNodeB);
|
||||||
|
|
||||||
MasterFaultDetection masterFDNodeB = new MasterFaultDetection(Settings.builder().put(settingsB).put(settings).build(),
|
MasterFaultDetection masterFDNodeB = new MasterFaultDetection(Settings.builder().put(settingsB).put(settings).build(),
|
||||||
threadPool, serviceB, clusterServiceB);
|
threadPool, serviceB, clusterServiceB);
|
||||||
masterFDNodeB.start(nodeB, "test");
|
masterFDNodeB.restart(nodeB, "test");
|
||||||
|
|
||||||
// let's do a few pings
|
// let's do a few pings
|
||||||
pingProbeA.awaitMinCompletedPings();
|
pingProbeA.awaitMinCompletedPings();
|
||||||
|
|
Loading…
Reference in New Issue