From 563304df9eabd9aaaf5fc7db0c07803b17db2378 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 9 Mar 2016 00:28:38 -0500 Subject: [PATCH] No old states polluting pending states queue This commit adds a guard that prevents an old cluster state from the current master, arriving out of order relative to the last seen cluster state, from polluting the pending cluster states queue. Without this guard, such a state can end up stuck in the pending states queue. --- .../publish/PublishClusterStateAction.java | 13 +++++++ .../PublishClusterStateActionTests.java | 35 +++++++++++++++---- 2 files changed, 41 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index acb5f640db0..c4ecc644ea9 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -58,6 +58,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -402,6 +403,18 @@ public class PublishClusterStateAction extends AbstractComponent { } ZenDiscovery.validateStateIsFromCurrentMaster(logger, currentNodes, incomingState); + if (lastSeenClusterState != null && lastSeenClusterState.supersedes(incomingState)) { + final String message = String.format( + Locale.ROOT, + "received older cluster state version [%s] with uuid [%s] than last seen cluster state [%s] with uuid [%s]", + incomingState.version(), + incomingState.stateUUID(), + lastSeenClusterState.version(), + lastSeenClusterState.stateUUID() + ); + logger.warn(message); + throw new IllegalStateException(message); + } } protected void handleCommitRequest(CommitClusterStateRequest request, 
final TransportChannel channel) { diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java index 7e31f6055de..e1cb66555d1 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java @@ -62,6 +62,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -72,6 +73,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -630,9 +632,19 @@ public class PublishClusterStateActionTests extends ESTestCase { } logger.info("--> testing acceptance of an old cluster state"); - state = node.clusterState; + final ClusterState incomingState = node.clusterState; node.clusterState = ClusterState.builder(node.clusterState).incrementVersion().build(); - node.action.validateIncomingState(state, node.clusterState); + final IllegalStateException e = + expectThrows(IllegalStateException.class, () -> node.action.validateIncomingState(incomingState, node.clusterState)); + final String message = String.format( + Locale.ROOT, + "received older cluster state version [%s] with uuid [%s] than last seen cluster state [%s] with uuid [%s]", + incomingState.version(), + incomingState.stateUUID(), + node.clusterState.version(), + node.clusterState.stateUUID() + ); + assertThat(e, 
hasToString("java.lang.IllegalStateException: " + message)); // an older version from a *new* master is also OK! ClusterState previousState = ClusterState.builder(node.clusterState).incrementVersion().build(); @@ -645,12 +657,12 @@ public class PublishClusterStateActionTests extends ESTestCase { node.action.validateIncomingState(state, previousState); } - public void testInterleavedPublishCommit() throws Throwable { + public void testOutOfOrderCommitMessages() throws Throwable { MockNode node = createMockNode("node").setAsMaster(); final CapturingTransportChannel channel = new CapturingTransportChannel(); List states = new ArrayList<>(); - final int numOfStates = scaledRandomIntBetween(3, 10); + final int numOfStates = scaledRandomIntBetween(3, 25); for (int i = 1; i <= numOfStates; i++) { states.add(ClusterState.builder(node.clusterState).version(i).stateUUID(ClusterState.UNKNOWN_UUID).build()); } @@ -658,8 +670,17 @@ public class PublishClusterStateActionTests extends ESTestCase { final ClusterState finalState = states.get(numOfStates - 1); Collections.shuffle(states, random()); - logger.info("--> publishing states"); + List orderedSubsequence = new ArrayList<>(); + long version = 0; for (ClusterState state : states) { + if (state.version() >= version) { + orderedSubsequence.add(state); + version = state.version(); + } + } + + logger.info("--> publishing states"); + for (ClusterState state : orderedSubsequence) { node.action.handleIncomingClusterStateRequest( new BytesTransportRequest(PublishClusterStateAction.serializeFullClusterState(state, Version.CURRENT), Version.CURRENT), channel); @@ -670,8 +691,8 @@ public class PublishClusterStateActionTests extends ESTestCase { logger.info("--> committing states"); - Randomness.shuffle(states); - for (ClusterState state : states) { + Randomness.shuffle(orderedSubsequence); + for (ClusterState state : orderedSubsequence) { node.action.handleCommitRequest(new 
PublishClusterStateAction.CommitClusterStateRequest(state.stateUUID()), channel); assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE)); if (channel.error.get() != null) {