reject older cluster state from the same master

This commit is contained in:
Boaz Leskes 2015-08-25 11:45:19 +02:00
parent a56d67d8d7
commit 91dee8b311
2 changed files with 49 additions and 23 deletions

View File

@ -45,10 +45,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -329,7 +326,7 @@ public class PublishClusterStateAction extends AbstractComponent {
throw new IncompatibleClusterStateVersionException("have no local cluster state");
}
// sanity check incoming state
validateIncomingState(incomingState);
validateIncomingState(incomingState, lastSeenClusterState);
lastSeenClusterState = incomingState;
lastSeenClusterState.status(ClusterState.ClusterStateStatus.RECEIVED);
@ -338,25 +335,32 @@ public class PublishClusterStateAction extends AbstractComponent {
}
// package private for testing
/**
* does simple sanity check of the incoming cluster state. Throws an exception on rejections.
*/
void validateIncomingState(ClusterState state) {
final ClusterName incomingClusterName = state.getClusterName();
void validateIncomingState(ClusterState incomingState, ClusterState lastSeenClusterState) {
final ClusterName incomingClusterName = incomingState.getClusterName();
if (!incomingClusterName.equals(PublishClusterStateAction.this.clusterName)) {
logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", state.nodes().masterNode(), incomingClusterName);
logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", incomingState.nodes().masterNode(), incomingClusterName);
throw new IllegalStateException("received state from a node that is not part of the cluster");
}
final DiscoveryNodes currentNodes = nodesProvider.nodes();
if (currentNodes.localNode().equals(state.nodes().localNode()) == false) {
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", state.nodes().masterNode());
if (currentNodes.localNode().equals(incomingState.nodes().localNode()) == false) {
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", incomingState.nodes().masterNode());
throw new IllegalStateException("received state from a node that is not part of the cluster");
}
// state from another master requires more subtle checks, so we let it pass for now (it will be checked in ZenDiscovery)
if (currentNodes.localNodeMaster() == false) {
ZenDiscovery.validateStateIsFromCurrentMaster(logger, currentNodes, state);
ZenDiscovery.validateStateIsFromCurrentMaster(logger, currentNodes, incomingState);
}
if (lastSeenClusterState != null
&& Objects.equals(lastSeenClusterState.nodes().masterNodeId(), incomingState.nodes().masterNodeId())
&& lastSeenClusterState.version() > incomingState.version()) {
logger.debug("received an older cluster state from master, rejecting (received version [{}], last version is [{}])",
incomingState.version(), lastSeenClusterState.version());
throw new IllegalStateException("cluster state version [" + incomingState.version() + "] is old (last seen version [" + lastSeenClusterState.version() + "])");
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.discovery.zen.publish;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import com.google.common.collect.Maps;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
@ -564,39 +563,41 @@ public class PublishClusterStateActionTests extends ESTestCase {
}
}
public void testIncomingClusterStateVerification() throws Exception {
public void testIncomingClusterStateValidation() throws Exception {
MockNode node = createMockNode("node");
logger.info("--> testing acceptances of any master when having no master");
ClusterState state = ClusterState.builder(node.clusterState)
.nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId(randomAsciiOfLength(10))).build();
node.action.validateIncomingState(state);
.nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId(randomAsciiOfLength(10))).incrementVersion().build();
node.action.validateIncomingState(state, null);
// now set a master node
node.clusterState = ClusterState.builder(node.clusterState).nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId("master")).build();
logger.info("--> testing rejection of another master");
try {
node.action.validateIncomingState(state);
node.action.validateIncomingState(state, node.clusterState);
fail("node accepted state from another master");
} catch (IllegalStateException OK) {
}
logger.info("--> test state from the current master is accepted");
node.action.validateIncomingState(ClusterState.builder(node.clusterState)
.nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId("master")).build());
.nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId("master")).build(), node.clusterState);
logger.info("--> testing rejection of another cluster name");
try {
node.action.validateIncomingState(ClusterState.builder(new ClusterName(randomAsciiOfLength(10))).nodes(node.nodes()).build());
node.action.validateIncomingState(ClusterState.builder(new ClusterName(randomAsciiOfLength(10))).nodes(node.nodes()).build(), node.clusterState);
fail("node accepted state with another cluster name");
} catch (IllegalStateException OK) {
}
logger.info("--> testing rejection of a cluster state with wrong local node");
try {
state = ClusterState.builder(node.clusterState).nodes(DiscoveryNodes.builder(node.nodes()).localNodeId("_non_existing_").build()).build();
node.action.validateIncomingState(state);
state = ClusterState.builder(node.clusterState)
.nodes(DiscoveryNodes.builder(node.nodes()).localNodeId("_non_existing_").build())
.incrementVersion().build();
node.action.validateIncomingState(state, node.clusterState);
fail("node accepted state with non-existence local node");
} catch (IllegalStateException OK) {
}
@ -605,11 +606,32 @@ public class PublishClusterStateActionTests extends ESTestCase {
MockNode otherNode = createMockNode("otherNode");
state = ClusterState.builder(node.clusterState).nodes(
DiscoveryNodes.builder(node.nodes()).put(otherNode.discoveryNode).localNodeId(otherNode.discoveryNode.id()).build()
).build();
node.action.validateIncomingState(state);
).incrementVersion().build();
node.action.validateIncomingState(state, node.clusterState);
fail("node accepted state with existent but wrong local node");
} catch (IllegalStateException OK) {
}
logger.info("--> testing rejection of an old cluster state");
state = node.clusterState;
node.clusterState = ClusterState.builder(node.clusterState).incrementVersion().build();
try {
node.action.validateIncomingState(state, node.clusterState);
fail("node accepted state with an older version");
} catch (IllegalStateException OK) {
}
// an older version from a *new* master is OK!
ClusterState previousState = ClusterState.builder(node.clusterState).incrementVersion().build();
state = ClusterState.builder(node.clusterState)
.nodes(DiscoveryNodes.builder(node.clusterState.nodes()).masterNodeId("_new_master_").build())
.build();
// remove the master of the node (but still have a previous cluster state with it)!
node.clusterState = ClusterState.builder(node.clusterState)
.nodes(DiscoveryNodes.builder(node.clusterState.nodes()).masterNodeId(null).build())
.build();
node.action.validateIncomingState(state, previousState);
}
public void testInterleavedPublishCommit() throws Throwable {