From 66cc2029cd4016afb6ac45587c5980d11431dd77 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 5 Apr 2016 16:02:19 -0400 Subject: [PATCH] Refactor old state version check to ZenDiscovery --- .../discovery/zen/ZenDiscovery.java | 18 ++++++++--- .../publish/PublishClusterStateAction.java | 31 +++++++++---------- .../discovery/zen/ZenDiscoveryUnitTests.java | 3 +- .../PublishClusterStateActionTests.java | 5 ++- 4 files changed, 32 insertions(+), 25 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 78dc6897843..388e92c8764 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -36,6 +36,7 @@ import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.inject.Inject; @@ -773,15 +774,24 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen * If the first condition fails we reject the cluster state and throw an error. * If the second condition fails we ignore the cluster state. */ - static boolean shouldIgnoreOrRejectNewClusterState(ESLogger logger, ClusterState currentState, ClusterState newClusterState) { + @SuppressForbidden(reason = "debug") + public static boolean shouldIgnoreOrRejectNewClusterState(ESLogger logger, ClusterState currentState, ClusterState newClusterState) { validateStateIsFromCurrentMaster(logger, currentState.nodes(), newClusterState); - if (currentState.supersedes(newClusterState)) { + + // reject cluster states that are not new from the same master + if (currentState.supersedes(newClusterState) || + (newClusterState.nodes().getMasterNodeId().equals(currentState.nodes().getMasterNodeId()) && currentState.version() == newClusterState.version())) { // if the new state has a smaller version, and it has the same master node, then no need to process it + logger.debug("received a cluster state that is not newer than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version()); + return true; + } + + // reject older cluster states if we are following a master + if (currentState.nodes().getMasterNodeId() != null && newClusterState.version() < currentState.version()) { logger.debug("received a cluster state that has a lower version than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version()); return true; - } else { - return false; } + return false; } /** diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index f7e0060c769..8f786d2daa3 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -400,14 +400,25 @@ public class PublishClusterStateAction extends AbstractComponent { logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", incomingState.nodes().getMasterNode(), incomingClusterName); throw new IllegalStateException("received state from a node that is not part of the cluster"); } - final DiscoveryNodes currentNodes = clusterStateSupplier.get().nodes(); + final ClusterState clusterState = clusterStateSupplier.get(); - if (currentNodes.getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) { + if (clusterState.nodes().getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) { logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", incomingState.nodes().getMasterNode()); throw new IllegalStateException("received state with a local node that does not match the current local node"); } - ZenDiscovery.validateStateIsFromCurrentMaster(logger, currentNodes, incomingState); + if (ZenDiscovery.shouldIgnoreOrRejectNewClusterState(logger, clusterState, incomingState)) { + String message = String.format( + Locale.ROOT, + "rejecting cluster state version [%d] uuid [%s] received from [%s]", + incomingState.version(), + incomingState.stateUUID(), + incomingState.nodes().getMasterNodeId() + ); + logger.warn(message); + throw new IllegalStateException(message); + } + if (lastSeenClusterState != null && lastSeenClusterState.supersedes(incomingState)) { final String message = String.format( Locale.ROOT, @@ -422,20 +433,6 @@ public class PublishClusterStateAction extends AbstractComponent { throw new IllegalStateException(message); } - final ClusterState state = clusterStateSupplier.get(); - if (state.nodes().getMasterNodeId() != null && incomingState.version() <= state.version()) { - assert !incomingState.stateUUID().equals(state.stateUUID()); - final String message = String.format( - Locale.ROOT, - "received cluster state older than current cluster state; " + - "received version [%d] with uuid [%s], current version [%d]", - incomingState.version(), - incomingState.stateUUID(), - state.version() - ); - logger.warn(message); - throw new IllegalStateException(message); - } } protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) { diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java index dec3a2ae3ed..a6638eb19cf 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.test.ESTestCase; @@ -64,7 +65,7 @@ public class ZenDiscoveryUnitTests extends ESTestCase { assertTrue("should ignore, because new state's version is lower to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build())); currentState.version(1); newState.version(1); - assertFalse("should not ignore, because new state's version is equal to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build())); + assertTrue("should ignore, because new state's version is equal to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build())); currentState.version(1); newState.version(2); assertFalse("should not ignore, because new state's version is higher to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build())); diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java index 084ddcf68dd..ceb125deda2 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java @@ -656,11 +656,10 @@ public class PublishClusterStateActionTests extends ESTestCase { expectThrows(IllegalStateException.class, () -> node.action.validateIncomingState(incomingState, node.clusterState)); final String message = String.format( Locale.ROOT, - "received cluster state from current master superseded by last seen cluster state; received version [%d] with uuid [%s], last seen version [%d] with uuid [%s]", + "rejecting cluster state version [%d] uuid [%s] received from [%s]", incomingState.version(), incomingState.stateUUID(), - node.clusterState.version(), - node.clusterState.stateUUID() + incomingState.nodes().getMasterNodeId() ); assertThat(e, hasToString("java.lang.IllegalStateException: " + message));