No old states polluting pending states queue

This commit adds a guard against an old cluster state that arrives out
of order from the last seen cluster state from the current master from
polluting the pending cluster states queue. Without this guard, such a
state can end up stuck in the pending states queue.
This commit is contained in:
Jason Tedor 2016-03-09 00:28:38 -05:00
parent 2bd09c4625
commit 563304df9e
2 changed files with 41 additions and 7 deletions

View File

@ -58,6 +58,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@ -402,6 +403,18 @@ public class PublishClusterStateAction extends AbstractComponent {
}
ZenDiscovery.validateStateIsFromCurrentMaster(logger, currentNodes, incomingState);
if (lastSeenClusterState != null && lastSeenClusterState.supersedes(incomingState)) {
final String message = String.format(
Locale.ROOT,
"received older cluster state version [%s] with uuid [%s] than last seen cluster state [%s] with uuid [%s]",
incomingState.version(),
incomingState.stateUUID(),
lastSeenClusterState.version(),
lastSeenClusterState.stateUUID()
);
logger.warn(message);
throw new IllegalStateException(message);
}
}
protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) {

View File

@ -62,6 +62,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@ -72,6 +73,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@ -630,9 +632,19 @@ public class PublishClusterStateActionTests extends ESTestCase {
}
logger.info("--> testing acceptance of an old cluster state");
state = node.clusterState;
final ClusterState incomingState = node.clusterState;
node.clusterState = ClusterState.builder(node.clusterState).incrementVersion().build();
node.action.validateIncomingState(state, node.clusterState);
final IllegalStateException e =
expectThrows(IllegalStateException.class, () -> node.action.validateIncomingState(incomingState, node.clusterState));
final String message = String.format(
Locale.ROOT,
"received older cluster state version [%s] with uuid [%s] than last seen cluster state [%s] with uuid [%s]",
incomingState.version(),
incomingState.stateUUID(),
node.clusterState.version(),
node.clusterState.stateUUID()
);
assertThat(e, hasToString("java.lang.IllegalStateException: " + message));
// an older version from a *new* master is also OK!
ClusterState previousState = ClusterState.builder(node.clusterState).incrementVersion().build();
@ -645,12 +657,12 @@ public class PublishClusterStateActionTests extends ESTestCase {
node.action.validateIncomingState(state, previousState);
}
public void testInterleavedPublishCommit() throws Throwable {
public void testOutOfOrderCommitMessages() throws Throwable {
MockNode node = createMockNode("node").setAsMaster();
final CapturingTransportChannel channel = new CapturingTransportChannel();
List<ClusterState> states = new ArrayList<>();
final int numOfStates = scaledRandomIntBetween(3, 10);
final int numOfStates = scaledRandomIntBetween(3, 25);
for (int i = 1; i <= numOfStates; i++) {
states.add(ClusterState.builder(node.clusterState).version(i).stateUUID(ClusterState.UNKNOWN_UUID).build());
}
@ -658,8 +670,17 @@ public class PublishClusterStateActionTests extends ESTestCase {
final ClusterState finalState = states.get(numOfStates - 1);
Collections.shuffle(states, random());
logger.info("--> publishing states");
List<ClusterState> orderedSubsequence = new ArrayList<>();
long version = 0;
for (ClusterState state : states) {
if (state.version() >= version) {
orderedSubsequence.add(state);
version = state.version();
}
}
logger.info("--> publishing states");
for (ClusterState state : orderedSubsequence) {
node.action.handleIncomingClusterStateRequest(
new BytesTransportRequest(PublishClusterStateAction.serializeFullClusterState(state, Version.CURRENT), Version.CURRENT),
channel);
@ -670,8 +691,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
logger.info("--> committing states");
Randomness.shuffle(states);
for (ClusterState state : states) {
Randomness.shuffle(orderedSubsequence);
for (ClusterState state : orderedSubsequence) {
node.action.handleCommitRequest(new PublishClusterStateAction.CommitClusterStateRequest(state.stateUUID()), channel);
assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE));
if (channel.error.get() != null) {