diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index acb5f640db0..c4ecc644ea9 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -58,6 +58,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -402,6 +403,18 @@ public class PublishClusterStateAction extends AbstractComponent { } ZenDiscovery.validateStateIsFromCurrentMaster(logger, currentNodes, incomingState); + if (lastSeenClusterState != null && lastSeenClusterState.supersedes(incomingState)) { + final String message = String.format( + Locale.ROOT, + "received older cluster state version [%s] with uuid [%s] than last seen cluster state [%s] with uuid [%s]", + incomingState.version(), + incomingState.stateUUID(), + lastSeenClusterState.version(), + lastSeenClusterState.stateUUID() + ); + logger.warn(message); + throw new IllegalStateException(message); + } } protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) { diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java index 7e31f6055de..e1cb66555d1 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java @@ -62,6 +62,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -72,6 +73,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -630,9 +632,19 @@ public class PublishClusterStateActionTests extends ESTestCase { } logger.info("--> testing acceptance of an old cluster state"); - state = node.clusterState; + final ClusterState incomingState = node.clusterState; node.clusterState = ClusterState.builder(node.clusterState).incrementVersion().build(); - node.action.validateIncomingState(state, node.clusterState); + final IllegalStateException e = + expectThrows(IllegalStateException.class, () -> node.action.validateIncomingState(incomingState, node.clusterState)); + final String message = String.format( + Locale.ROOT, + "received older cluster state version [%s] with uuid [%s] than last seen cluster state [%s] with uuid [%s]", + incomingState.version(), + incomingState.stateUUID(), + node.clusterState.version(), + node.clusterState.stateUUID() + ); + assertThat(e, hasToString("java.lang.IllegalStateException: " + message)); // an older version from a *new* master is also OK! ClusterState previousState = ClusterState.builder(node.clusterState).incrementVersion().build(); @@ -645,12 +657,12 @@ public class PublishClusterStateActionTests extends ESTestCase { node.action.validateIncomingState(state, previousState); } - public void testInterleavedPublishCommit() throws Throwable { + public void testOutOfOrderCommitMessages() throws Throwable { MockNode node = createMockNode("node").setAsMaster(); final CapturingTransportChannel channel = new CapturingTransportChannel(); List states = new ArrayList<>(); - final int numOfStates = scaledRandomIntBetween(3, 10); + final int numOfStates = scaledRandomIntBetween(3, 25); for (int i = 1; i <= numOfStates; i++) { states.add(ClusterState.builder(node.clusterState).version(i).stateUUID(ClusterState.UNKNOWN_UUID).build()); } @@ -658,8 +670,17 @@ public class PublishClusterStateActionTests extends ESTestCase { final ClusterState finalState = states.get(numOfStates - 1); Collections.shuffle(states, random()); - logger.info("--> publishing states"); + List orderedSubsequence = new ArrayList<>(); + long version = 0; for (ClusterState state : states) { + if (state.version() >= version) { + orderedSubsequence.add(state); + version = state.version(); + } + } + + logger.info("--> publishing states"); + for (ClusterState state : orderedSubsequence) { node.action.handleIncomingClusterStateRequest( new BytesTransportRequest(PublishClusterStateAction.serializeFullClusterState(state, Version.CURRENT), Version.CURRENT), channel); @@ -670,8 +691,8 @@ public class PublishClusterStateActionTests extends ESTestCase { logger.info("--> committing states"); - Randomness.shuffle(states); - for (ClusterState state : states) { + Randomness.shuffle(orderedSubsequence); + for (ClusterState state : orderedSubsequence) { node.action.handleCommitRequest(new PublishClusterStateAction.CommitClusterStateRequest(state.stateUUID()), channel); assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE)); if (channel.error.get() != null) {