Refactor old state version check to ZenDiscovery

This commit is contained in:
Jason Tedor 2016-04-05 16:02:19 -04:00
parent c2ed5a1c9e
commit 66cc2029cd
4 changed files with 32 additions and 25 deletions

View File

@ -36,6 +36,7 @@ import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
@ -773,15 +774,24 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
* If the first condition fails we reject the cluster state and throw an error.
* If the second condition fails we ignore the cluster state.
*/
static boolean shouldIgnoreOrRejectNewClusterState(ESLogger logger, ClusterState currentState, ClusterState newClusterState) {
@SuppressForbidden(reason = "debug")
public static boolean shouldIgnoreOrRejectNewClusterState(ESLogger logger, ClusterState currentState, ClusterState newClusterState) {
validateStateIsFromCurrentMaster(logger, currentState.nodes(), newClusterState);
if (currentState.supersedes(newClusterState)) {
// reject cluster states that are not new from the same master
if (currentState.supersedes(newClusterState) ||
(newClusterState.nodes().getMasterNodeId().equals(currentState.nodes().getMasterNodeId()) && currentState.version() == newClusterState.version())) {
// if the new state has a smaller version, and it has the same master node, then no need to process it
logger.debug("received a cluster state that is not newer than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version());
return true;
}
// reject older cluster states if we are following a master
if (currentState.nodes().getMasterNodeId() != null && newClusterState.version() < currentState.version()) {
logger.debug("received a cluster state that has a lower version than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version());
return true;
} else {
return false;
}
return false;
}
/**

View File

@ -400,14 +400,25 @@ public class PublishClusterStateAction extends AbstractComponent {
logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", incomingState.nodes().getMasterNode(), incomingClusterName);
throw new IllegalStateException("received state from a node that is not part of the cluster");
}
final DiscoveryNodes currentNodes = clusterStateSupplier.get().nodes();
final ClusterState clusterState = clusterStateSupplier.get();
if (currentNodes.getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) {
if (clusterState.nodes().getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) {
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", incomingState.nodes().getMasterNode());
throw new IllegalStateException("received state with a local node that does not match the current local node");
}
ZenDiscovery.validateStateIsFromCurrentMaster(logger, currentNodes, incomingState);
if (ZenDiscovery.shouldIgnoreOrRejectNewClusterState(logger, clusterState, incomingState)) {
String message = String.format(
Locale.ROOT,
"rejecting cluster state version [%d] uuid [%s] received from [%s]",
incomingState.version(),
incomingState.stateUUID(),
incomingState.nodes().getMasterNodeId()
);
logger.warn(message);
throw new IllegalStateException(message);
}
if (lastSeenClusterState != null && lastSeenClusterState.supersedes(incomingState)) {
final String message = String.format(
Locale.ROOT,
@ -422,20 +433,6 @@ public class PublishClusterStateAction extends AbstractComponent {
throw new IllegalStateException(message);
}
final ClusterState state = clusterStateSupplier.get();
if (state.nodes().getMasterNodeId() != null && incomingState.version() <= state.version()) {
assert !incomingState.stateUUID().equals(state.stateUUID());
final String message = String.format(
Locale.ROOT,
"received cluster state older than current cluster state; " +
"received version [%d] with uuid [%s], current version [%d]",
incomingState.version(),
incomingState.stateUUID(),
state.version()
);
logger.warn(message);
throw new IllegalStateException(message);
}
}
protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) {

View File

@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.test.ESTestCase;
@ -64,7 +65,7 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
assertTrue("should ignore, because new state's version is lower to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build()));
currentState.version(1);
newState.version(1);
assertFalse("should not ignore, because new state's version is equal to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build()));
assertTrue("should ignore, because new state's version is equal to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build()));
currentState.version(1);
newState.version(2);
assertFalse("should not ignore, because new state's version is higher to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build()));

View File

@ -656,11 +656,10 @@ public class PublishClusterStateActionTests extends ESTestCase {
expectThrows(IllegalStateException.class, () -> node.action.validateIncomingState(incomingState, node.clusterState));
final String message = String.format(
Locale.ROOT,
"received cluster state from current master superseded by last seen cluster state; received version [%d] with uuid [%s], last seen version [%d] with uuid [%s]",
"rejecting cluster state version [%d] uuid [%s] received from [%s]",
incomingState.version(),
incomingState.stateUUID(),
node.clusterState.version(),
node.clusterState.stateUUID()
incomingState.nodes().getMasterNodeId()
);
assertThat(e, hasToString("java.lang.IllegalStateException: " + message));