Reject old cluster states and keep the queue clean

This commit adds a guard preventing old cluster states from entering
into the pending queue when we are following a master, and cleans old
cluster states from the pending queue when processing a commit.
This commit is contained in:
Jason Tedor 2016-04-05 09:33:50 -04:00
parent e201f5c8b0
commit cffc315dca
5 changed files with 85 additions and 27 deletions

View File

@ -188,7 +188,14 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, clusterName);
this.nodesFD.addListener(new NodeFaultDetectionListener());
this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewPendingClusterStateListener(), discoverySettings, clusterName);
this.publishClusterState =
new PublishClusterStateAction(
settings,
transportService,
clusterService::state,
new NewPendingClusterStateListener(),
discoverySettings,
clusterName);
this.pingService.setPingContextProvider(this);
this.membership = new MembershipAction(settings, clusterService, transportService, this, new MembershipListener());

View File

@ -164,12 +164,17 @@ public class PendingClusterStatesQueue {
currentMaster
);
}
} else if (state.supersedes(pendingState) && pendingContext.committed()) {
} else if (state.version() >= pendingState.version()) {
assert state.supersedes(pendingState) || (
state.nodes().getMasterNodeId() != null &&
state.nodes().getMasterNodeId().equals(pendingState.nodes().getMasterNodeId()));
logger.trace("processing pending state uuid[{}]/v[{}] together with state uuid[{}]/v[{}]",
pendingState.stateUUID(), pendingState.version(), state.stateUUID(), state.version()
);
contextsToRemove.add(pendingContext);
pendingContext.listener.onNewClusterStateProcessed();
if (pendingContext.committed()) {
pendingContext.listener.onNewClusterStateProcessed();
}
} else if (pendingState.stateUUID().equals(state.stateUUID())) {
assert pendingContext.committed() : "processed cluster state is not committed " + state;
contextsToRemove.add(pendingContext);

View File

@ -41,7 +41,6 @@ import org.elasticsearch.discovery.AckClusterStatePublishResponseHandler;
import org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BytesTransportRequest;
@ -64,6 +63,7 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
/**
*
@ -82,17 +82,22 @@ public class PublishClusterStateAction extends AbstractComponent {
}
private final TransportService transportService;
private final DiscoveryNodesProvider nodesProvider;
private final Supplier<ClusterState> clusterStateSupplier;
private final NewPendingClusterStateListener newPendingClusterStatelistener;
private final DiscoverySettings discoverySettings;
private final ClusterName clusterName;
private final PendingClusterStatesQueue pendingStatesQueue;
public PublishClusterStateAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider,
NewPendingClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) {
public PublishClusterStateAction(
Settings settings,
TransportService transportService,
Supplier<ClusterState> clusterStateSupplier,
NewPendingClusterStateListener listener,
DiscoverySettings discoverySettings,
ClusterName clusterName) {
super(settings);
this.transportService = transportService;
this.nodesProvider = nodesProvider;
this.clusterStateSupplier = clusterStateSupplier;
this.newPendingClusterStatelistener = listener;
this.discoverySettings = discoverySettings;
this.clusterName = clusterName;
@ -364,7 +369,7 @@ public class PublishClusterStateAction extends AbstractComponent {
final ClusterState incomingState;
// If true we received full cluster state - otherwise diffs
if (in.readBoolean()) {
incomingState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().getLocalNode());
incomingState = ClusterState.Builder.readFrom(in, clusterStateSupplier.get().nodes().getLocalNode());
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length());
} else if (lastSeenClusterState != null) {
Diff<ClusterState> diff = lastSeenClusterState.readDiffFrom(in);
@ -395,11 +400,11 @@ public class PublishClusterStateAction extends AbstractComponent {
logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", incomingState.nodes().getMasterNode(), incomingClusterName);
throw new IllegalStateException("received state from a node that is not part of the cluster");
}
final DiscoveryNodes currentNodes = nodesProvider.nodes();
final DiscoveryNodes currentNodes = clusterStateSupplier.get().nodes();
if (currentNodes.getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) {
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", incomingState.nodes().getMasterNode());
throw new IllegalStateException("received state from a node that is not part of the cluster");
throw new IllegalStateException("received state from local node that does not match the current local node");
}
ZenDiscovery.validateStateIsFromCurrentMaster(logger, currentNodes, incomingState);
@ -407,7 +412,7 @@ public class PublishClusterStateAction extends AbstractComponent {
final String message = String.format(
Locale.ROOT,
"received cluster state from current master superseded by last seen cluster state; " +
"received version [%s] with uuid [%s], last seen version [%s] with uuid [%s]",
"received version [%d] with uuid [%s], last seen version [%d] with uuid [%s]",
incomingState.version(),
incomingState.stateUUID(),
lastSeenClusterState.version(),
@ -416,6 +421,21 @@ public class PublishClusterStateAction extends AbstractComponent {
logger.warn(message);
throw new IllegalStateException(message);
}
final ClusterState state = clusterStateSupplier.get();
if (state.nodes().getMasterNodeId() != null && incomingState.version() <= state.version()) {
assert !incomingState.stateUUID().equals(state.stateUUID());
final String message = String.format(
Locale.ROOT,
"received cluster state older than current cluster state; " +
"received version [%d] with uuid [%s], current version [%d]",
incomingState.version(),
incomingState.stateUUID(),
state.version()
);
logger.warn(message);
throw new IllegalStateException(message);
}
}
protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) {

View File

@ -195,10 +195,11 @@ public class PendingClusterStatesQueueTests extends ESTestCase {
highestCommitted = context.state;
}
}
assert highestCommitted != null;
queue.markAsProcessed(highestCommitted);
assertThat(queue.stats().getTotal(), equalTo(states.size() - committedContexts.size()));
assertThat(queue.stats().getPending(), equalTo(states.size() - committedContexts.size()));
assertThat((long)queue.stats().getTotal(), equalTo(states.size() - (1 + highestCommitted.version())));
assertThat((long)queue.stats().getPending(), equalTo(states.size() - (1 + highestCommitted.version())));
assertThat(queue.stats().getCommitted(), equalTo(0));
}

View File

@ -70,7 +70,9 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
@ -161,7 +163,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
DiscoveryNodeService discoveryNodeService = new DiscoveryNodeService(settings, version);
DiscoveryNode discoveryNode = discoveryNodeService.buildLocalNode(service.boundAddress().publishAddress());
MockNode node = new MockNode(discoveryNode, service, listener, logger);
node.action = buildPublishClusterStateAction(settings, service, node, node);
node.action = buildPublishClusterStateAction(settings, service, () -> node.clusterState, node);
final CountDownLatch latch = new CountDownLatch(nodes.size() * 2 + 1);
TransportConnectionListener waitForConnection = new TransportConnectionListener() {
@Override
@ -233,10 +235,21 @@ public class PublishClusterStateActionTests extends ESTestCase {
return transportService;
}
protected MockPublishAction buildPublishClusterStateAction(Settings settings, MockTransportService transportService, DiscoveryNodesProvider nodesProvider,
PublishClusterStateAction.NewPendingClusterStateListener listener) {
DiscoverySettings discoverySettings = new DiscoverySettings(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
return new MockPublishAction(settings, transportService, nodesProvider, listener, discoverySettings, ClusterName.DEFAULT);
protected MockPublishAction buildPublishClusterStateAction(
Settings settings,
MockTransportService transportService,
Supplier<ClusterState> clusterStateSupplier,
PublishClusterStateAction.NewPendingClusterStateListener listener
) {
DiscoverySettings discoverySettings =
new DiscoverySettings(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
return new MockPublishAction(
settings,
transportService,
clusterStateSupplier,
listener,
discoverySettings,
ClusterName.DEFAULT);
}
public void testSimpleClusterStatePublishing() throws Exception {
@ -598,11 +611,12 @@ public class PublishClusterStateActionTests extends ESTestCase {
node.action.validateIncomingState(state, node.clusterState);
fail("node accepted state from another master");
} catch (IllegalStateException OK) {
assertThat(OK.toString(), containsString("cluster state from a different master than the current one, rejecting"));
}
logger.info("--> test state from the current master is accepted");
node.action.validateIncomingState(ClusterState.builder(node.clusterState)
.nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId("master")).build(), node.clusterState);
.nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId("master")).incrementVersion().build(), node.clusterState);
logger.info("--> testing rejection of another cluster name");
@ -610,6 +624,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
node.action.validateIncomingState(ClusterState.builder(new ClusterName(randomAsciiOfLength(10))).nodes(node.nodes()).build(), node.clusterState);
fail("node accepted state with another cluster name");
} catch (IllegalStateException OK) {
assertThat(OK.toString(), containsString("received state from a node that is not part of the cluster"));
}
logger.info("--> testing rejection of a cluster state with wrong local node");
@ -620,6 +635,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
node.action.validateIncomingState(state, node.clusterState);
fail("node accepted state with non-existence local node");
} catch (IllegalStateException OK) {
assertThat(OK.toString(), containsString("received state from local node that does not match the current local node"));
}
try {
@ -630,6 +646,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
node.action.validateIncomingState(state, node.clusterState);
fail("node accepted state with existent but wrong local node");
} catch (IllegalStateException OK) {
assertThat(OK.toString(), containsString("received state from local node that does not match the current local node"));
}
logger.info("--> testing acceptance of an old cluster state");
@ -639,7 +656,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
expectThrows(IllegalStateException.class, () -> node.action.validateIncomingState(incomingState, node.clusterState));
final String message = String.format(
Locale.ROOT,
"received older cluster state version [%s] from current master with uuid [%s] than last seen cluster state [%s] from current master with uuid [%s]",
"received cluster state from current master superseded by last seen cluster state; received version [%d] with uuid [%s], last seen version [%d] with uuid [%s]",
incomingState.version(),
incomingState.stateUUID(),
node.clusterState.version(),
@ -678,19 +695,27 @@ public class PublishClusterStateActionTests extends ESTestCase {
assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE));
assertThat(channel.error.get(), nullValue());
channel.clear();
}
logger.info("--> committing states");
long largestVersionSeen = Long.MIN_VALUE;
Randomness.shuffle(states);
for (ClusterState state : states) {
node.action.handleCommitRequest(new PublishClusterStateAction.CommitClusterStateRequest(state.stateUUID()), channel);
assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE));
if (channel.error.get() != null) {
throw channel.error.get();
if (largestVersionSeen < state.getVersion()) {
assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE));
if (channel.error.get() != null) {
throw channel.error.get();
}
largestVersionSeen = state.getVersion();
} else {
assertNotNull(channel.error.get());
assertThat(channel.error.get(), instanceOf(IllegalStateException.class));
}
channel.clear();
}
channel.clear();
//now check the last state held
assertSameState(node.clusterState, finalState);
@ -828,8 +853,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
AtomicBoolean timeoutOnCommit = new AtomicBoolean();
AtomicBoolean errorOnCommit = new AtomicBoolean();
public MockPublishAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, NewPendingClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) {
super(settings, transportService, nodesProvider, listener, discoverySettings, clusterName);
public MockPublishAction(Settings settings, TransportService transportService, Supplier<ClusterState> clusterStateSupplier, NewPendingClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) {
super(settings, transportService, clusterStateSupplier, listener, discoverySettings, clusterName);
}
@Override