NodesFD: simplify concurrency control to fully rely on a single map

The nodes fault detection class is used by the master node to ping the nodes in the cluster and verify that they are alive. This PR simplifies the concurrency controls in the class and adds a test for the scenario that surfaced the problem.

Closes #7889
This commit is contained in:
Boaz Leskes 2014-09-24 09:20:56 +02:00
parent db54e9c2d5
commit 36c3e896de
4 changed files with 85 additions and 84 deletions

View File

@ -315,7 +315,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
if (!clusterState.getNodes().localNodeMaster()) {
throw new ElasticsearchIllegalStateException("Shouldn't publish state when not master");
}
nodesFD.updateNodes(clusterState);
nodesFD.updateNodesAndPing(clusterState);
publishClusterState.publish(clusterState, ackListener);
}
@ -377,7 +377,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
if (newState.nodes().localNodeMaster()) {
// we only start nodesFD if we are master (it may be that we received a cluster state while pinging)
joinThreadControl.markThreadAsDone(currentThread);
nodesFD.start(newState); // start the nodes FD
nodesFD.updateNodesAndPing(newState); // start the nodes FD
} else {
// if we're not a master it means another node published a cluster state while we were pinging
// make sure we go through another pinging round and actively join it
@ -636,7 +636,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
masterFD.stop("got elected as new master since master left (reason = " + reason + ")");
discoveryNodes = DiscoveryNodes.builder(discoveryNodes).masterNodeId(localNode.id()).build();
ClusterState newState = ClusterState.builder(currentState).nodes(discoveryNodes).build();
nodesFD.start(newState);
nodesFD.updateNodesAndPing(newState);
return newState;
} else {

View File

@ -24,7 +24,6 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
@ -36,7 +35,6 @@ import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import static org.elasticsearch.cluster.node.DiscoveryNodes.EMPTY_NODES;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
import static org.elasticsearch.transport.TransportRequestOptions.options;
@ -59,13 +57,9 @@ public class NodesFaultDetection extends FaultDetection {
private final ConcurrentMap<DiscoveryNode, NodeFD> nodesFD = newConcurrentMap();
private volatile DiscoveryNodes latestNodes = EMPTY_NODES;
private volatile DiscoveryNode localNode;
private volatile long clusterStateVersion = ClusterState.UNKNOWN_VERSION;
private volatile boolean running = false;
private volatile DiscoveryNode localNode;
public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
super(settings, threadPool, transportService, clusterName);
@ -87,41 +81,37 @@ public class NodesFaultDetection extends FaultDetection {
listeners.remove(listener);
}
public void updateNodes(ClusterState clusterState) {
if (!running) {
return;
/**
* make sure that nodes in clusterState are pinged. Any pinging to nodes which are not
* part of the cluster will be stopped
*/
public void updateNodesAndPing(ClusterState clusterState) {
// remove any nodes we don't need, this will cause their FD to stop
for (DiscoveryNode monitoredNode : nodesFD.keySet()) {
if (!clusterState.nodes().nodeExists(monitoredNode.id())) {
nodesFD.remove(monitoredNode);
}
DiscoveryNodes prevNodes = latestNodes;
this.latestNodes = clusterState.nodes();
this.clusterStateVersion = clusterState.version();
DiscoveryNodes.Delta delta = this.latestNodes.delta(prevNodes);
for (DiscoveryNode newNode : delta.addedNodes()) {
if (newNode.id().equals(this.localNode.id())) {
}
// add any missing nodes
for (DiscoveryNode node : clusterState.nodes()) {
if (node.equals(localNode)) {
// no need to monitor the local node
continue;
}
if (!nodesFD.containsKey(newNode)) {
nodesFD.put(newNode, new NodeFD());
if (!nodesFD.containsKey(node)) {
NodeFD fd = new NodeFD(node);
// it's OK to overwrite an existing nodeFD - it will just stop and the new one will pick things up.
nodesFD.put(node, fd);
// we use schedule with a 0 time value to run the pinger on the pool as it will run on later
threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, new SendPingRequest(newNode));
threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, fd);
}
}
for (DiscoveryNode removedNode : delta.removedNodes()) {
nodesFD.remove(removedNode);
}
}
public NodesFaultDetection start(ClusterState clusterState) {
running = true;
updateNodes(clusterState);
return this;
}
/** stops all pinging **/
public NodesFaultDetection stop() {
if (!running) {
return this;
}
running = false;
nodesFD.clear();
return this;
}
@ -133,25 +123,21 @@ public class NodesFaultDetection extends FaultDetection {
@Override
protected void handleTransportDisconnect(DiscoveryNode node) {
if (!latestNodes.nodeExists(node.id())) {
return;
}
NodeFD nodeFD = nodesFD.remove(node);
if (nodeFD == null) {
return;
}
if (!running) {
return;
}
nodeFD.running = false;
if (connectOnNetworkDisconnect) {
NodeFD fd = new NodeFD(node);
try {
transportService.connectToNode(node);
nodesFD.put(node, new NodeFD());
nodesFD.put(node, fd);
// we use schedule with a 0 time value to run the pinger on the pool as it will run on later
threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, new SendPingRequest(node));
threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, fd);
} catch (Exception e) {
logger.trace("[node ] [{}] transport disconnected (with verified connect)", node);
// clean up if needed, just to be safe..
nodesFD.remove(node, fd);
notifyNodeFailure(node, "transport disconnected (with verified connect)");
}
} else {
@ -184,17 +170,23 @@ public class NodesFaultDetection extends FaultDetection {
});
}
private class SendPingRequest implements Runnable {
private class NodeFD implements Runnable {
volatile int retryCount;
private final DiscoveryNode node;
private SendPingRequest(DiscoveryNode node) {
private NodeFD(DiscoveryNode node) {
this.node = node;
}
private boolean running() {
return NodeFD.this.equals(nodesFD.get(node));
}
@Override
public void run() {
if (!running) {
if (!running()) {
return;
}
final PingRequest pingRequest = new PingRequest(node.id(), clusterName, localNode, clusterStateVersion);
@ -207,28 +199,16 @@ public class NodesFaultDetection extends FaultDetection {
@Override
public void handleResponse(PingResponse response) {
if (!running) {
if (!running()) {
return;
}
NodeFD nodeFD = nodesFD.get(node);
if (nodeFD != null) {
if (!nodeFD.running) {
return;
}
nodeFD.retryCount = 0;
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, SendPingRequest.this);
}
retryCount = 0;
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, NodeFD.this);
}
@Override
public void handleException(TransportException exp) {
// check if the master node did not get switched on us...
if (!running) {
return;
}
NodeFD nodeFD = nodesFD.get(node);
if (nodeFD != null) {
if (!nodeFD.running) {
if (!running()) {
return;
}
if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
@ -236,12 +216,12 @@ public class NodesFaultDetection extends FaultDetection {
return;
}
int retryCount = ++nodeFD.retryCount;
retryCount++;
logger.trace("[node ] failed to ping [{}], retry [{}] out of [{}]", exp, node, retryCount, pingRetryCount);
if (retryCount >= pingRetryCount) {
logger.debug("[node ] failed to ping [{}], tried [{}] times, each with maximum [{}] timeout", node, pingRetryCount, pingRetryTimeout);
// not good, failure
if (nodesFD.remove(node) != null) {
if (nodesFD.remove(node, NodeFD.this)) {
notifyNodeFailure(node, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout");
}
} else {
@ -249,7 +229,6 @@ public class NodesFaultDetection extends FaultDetection {
transportService.sendRequest(node, PING_ACTION_NAME, pingRequest, options, this);
}
}
}
@Override
public String executor() {
@ -260,11 +239,6 @@ public class NodesFaultDetection extends FaultDetection {
}
}
static class NodeFD {
volatile int retryCount;
volatile boolean running = true;
}
class PingRequestHandler extends BaseTransportRequestHandler<PingRequest> {
@Override

View File

@ -220,6 +220,33 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes
assertMaster(masterNode, nodes);
}
/**
 * Checks that nodes fault detection still does its job once a new master has been elected.
 * <p>
 * Scenario: start three nodes, stop the current master, wait for the remaining two nodes to
 * stabilize, then isolate the node that is not the master and wait until the cluster, checked
 * via the master, is stable with a single node.
 */
@Test
@TestLogging(value = "cluster.service:TRACE,indices.recovery:TRACE")
public void testNodesFDAfterMasterReelection() throws Exception {
    startCluster(3);

    logger.info("stopping current master");
    internalCluster().stopCurrentMasterNode();

    // two nodes are left; wait until they have settled before looking up the master
    ensureStableCluster(2);

    final String electedMaster = internalCluster().getMasterName();

    // pick the node whose name differs from the master's (the last such name wins)
    String isolatedNode = null;
    for (String candidate : internalCluster().getNodeNames()) {
        if (candidate.equals(electedMaster)) {
            continue;
        }
        isolatedNode = candidate;
    }

    logger.info("--> isolating [{}]", isolatedNode);
    addRandomIsolation(isolatedNode).startDisrupting();

    logger.info("--> waiting for master to remove it");
    // presumably the second argument selects the node through which stability is checked - TODO confirm
    ensureStableCluster(1, electedMaster);
}
/**
* Verify that the proper block is applied when nodes lose their master
*/

View File

@ -138,7 +138,7 @@ public class ZenFaultDetectionTests extends ElasticsearchTestCase {
ClusterState clusterState = ClusterState.builder(new ClusterName("test")).nodes(buildNodesForA(true)).build();
NodesFaultDetection nodesFD = new NodesFaultDetection(settings.build(), threadPool, serviceA, clusterState.getClusterName());
nodesFD.setLocalNode(clusterState.nodes().localNode());
nodesFD.start(clusterState);
nodesFD.updateNodesAndPing(clusterState);
final String[] failureReason = new String[1];
final DiscoveryNode[] failureNode = new DiscoveryNode[1];
final CountDownLatch notified = new CountDownLatch(1);