diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index ac595e640bc..78dc6897843 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -188,7 +188,14 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, clusterName); this.nodesFD.addListener(new NodeFaultDetectionListener()); - this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewPendingClusterStateListener(), discoverySettings, clusterName); + this.publishClusterState = + new PublishClusterStateAction( + settings, + transportService, + clusterService::state, + new NewPendingClusterStateListener(), + discoverySettings, + clusterName); this.pingService.setPingContextProvider(this); this.membership = new MembershipAction(settings, clusterService, transportService, this, new MembershipListener()); diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueue.java b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueue.java index b3ad7329d99..7ab965110c8 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueue.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueue.java @@ -164,12 +164,17 @@ public class PendingClusterStatesQueue { currentMaster ); } - } else if (state.supersedes(pendingState) && pendingContext.committed()) { + } else if (state.version() >= pendingState.version()) { + assert state.supersedes(pendingState) || ( + state.nodes().getMasterNodeId() != null && + state.nodes().getMasterNodeId().equals(pendingState.nodes().getMasterNodeId())); logger.trace("processing pending state uuid[{}]/v[{}] together with state uuid[{}]/v[{}]", pendingState.stateUUID(), pendingState.version(), state.stateUUID(), state.version() ); contextsToRemove.add(pendingContext); - pendingContext.listener.onNewClusterStateProcessed(); + if (pendingContext.committed()) { + pendingContext.listener.onNewClusterStateProcessed(); + } } else if (pendingState.stateUUID().equals(state.stateUUID())) { assert pendingContext.committed() : "processed cluster state is not committed " + state; contextsToRemove.add(pendingContext); diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index 517339a36b9..c2407081ff3 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -41,7 +41,6 @@ import org.elasticsearch.discovery.AckClusterStatePublishResponseHandler; import org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoverySettings; -import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.BytesTransportRequest; @@ -64,6 +63,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; /** * @@ -82,17 +82,22 @@ public class PublishClusterStateAction extends AbstractComponent { } private final TransportService transportService; - private final DiscoveryNodesProvider nodesProvider; + private final Supplier clusterStateSupplier; private final NewPendingClusterStateListener newPendingClusterStatelistener; private final DiscoverySettings discoverySettings; private final ClusterName clusterName; private final PendingClusterStatesQueue pendingStatesQueue; - public PublishClusterStateAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, - NewPendingClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) { + public PublishClusterStateAction( + Settings settings, + TransportService transportService, + Supplier clusterStateSupplier, + NewPendingClusterStateListener listener, + DiscoverySettings discoverySettings, + ClusterName clusterName) { super(settings); this.transportService = transportService; - this.nodesProvider = nodesProvider; + this.clusterStateSupplier = clusterStateSupplier; this.newPendingClusterStatelistener = listener; this.discoverySettings = discoverySettings; this.clusterName = clusterName; @@ -364,7 +369,7 @@ public class PublishClusterStateAction extends AbstractComponent { final ClusterState incomingState; // If true we received full cluster state - otherwise diffs if (in.readBoolean()) { - incomingState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().getLocalNode()); + incomingState = ClusterState.Builder.readFrom(in, clusterStateSupplier.get().nodes().getLocalNode()); logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length()); } else if (lastSeenClusterState != null) { Diff diff = lastSeenClusterState.readDiffFrom(in); @@ -395,11 +400,11 @@ public class PublishClusterStateAction extends AbstractComponent { logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", incomingState.nodes().getMasterNode(), incomingClusterName); throw new IllegalStateException("received state from a node that is not part of the cluster"); } - final DiscoveryNodes currentNodes = nodesProvider.nodes(); + final DiscoveryNodes currentNodes = clusterStateSupplier.get().nodes(); if (currentNodes.getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) { logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", incomingState.nodes().getMasterNode()); - throw new IllegalStateException("received state from a node that is not part of the cluster"); + throw new IllegalStateException("received state from local node that does not match the current local node"); } ZenDiscovery.validateStateIsFromCurrentMaster(logger, currentNodes, incomingState); @@ -407,7 +412,7 @@ public class PublishClusterStateAction extends AbstractComponent { final String message = String.format( Locale.ROOT, "received cluster state from current master superseded by last seen cluster state; " + - "received version [%s] with uuid [%s], last seen version [%s] with uuid [%s]", + "received version [%d] with uuid [%s], last seen version [%d] with uuid [%s]", incomingState.version(), incomingState.stateUUID(), lastSeenClusterState.version(), @@ -416,6 +421,21 @@ public class PublishClusterStateAction extends AbstractComponent { logger.warn(message); throw new IllegalStateException(message); } + + final ClusterState state = clusterStateSupplier.get(); + if (state.nodes().getMasterNodeId() != null && incomingState.version() <= state.version()) { + assert !incomingState.stateUUID().equals(state.stateUUID()); + final String message = String.format( + Locale.ROOT, + "received cluster state older than current cluster state; " + + "received version [%d] with uuid [%s], current version [%d]", + incomingState.version(), + incomingState.stateUUID(), + state.version() + ); + logger.warn(message); + throw new IllegalStateException(message); + } } protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) { diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueueTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueueTests.java index 3d4c464f128..9c90e8de900 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueueTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueueTests.java @@ -195,10 +195,11 @@ public class PendingClusterStatesQueueTests extends ESTestCase { highestCommitted = context.state; } } + assert highestCommitted != null; queue.markAsProcessed(highestCommitted); - assertThat(queue.stats().getTotal(), equalTo(states.size() - committedContexts.size())); - assertThat(queue.stats().getPending(), equalTo(states.size() - committedContexts.size())); + assertThat((long)queue.stats().getTotal(), equalTo(states.size() - (1 + highestCommitted.version()))); + assertThat((long)queue.stats().getPending(), equalTo(states.size() - (1 + highestCommitted.version()))); assertThat(queue.stats().getCommitted(), equalTo(0)); } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java index fc198bf3ca0..21e20228f34 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java @@ -70,7 +70,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; @@ -161,7 +163,7 @@ public class PublishClusterStateActionTests extends ESTestCase { DiscoveryNodeService discoveryNodeService = new DiscoveryNodeService(settings, version); DiscoveryNode discoveryNode = discoveryNodeService.buildLocalNode(service.boundAddress().publishAddress()); MockNode node = new MockNode(discoveryNode, service, listener, logger); - node.action = buildPublishClusterStateAction(settings, service, node, node); + node.action = buildPublishClusterStateAction(settings, service, () -> node.clusterState, node); final CountDownLatch latch = new CountDownLatch(nodes.size() * 2 + 1); TransportConnectionListener waitForConnection = new TransportConnectionListener() { @Override @@ -233,10 +235,21 @@ public class PublishClusterStateActionTests extends ESTestCase { return transportService; } - protected MockPublishAction buildPublishClusterStateAction(Settings settings, MockTransportService transportService, DiscoveryNodesProvider nodesProvider, - PublishClusterStateAction.NewPendingClusterStateListener listener) { - DiscoverySettings discoverySettings = new DiscoverySettings(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); - return new MockPublishAction(settings, transportService, nodesProvider, listener, discoverySettings, ClusterName.DEFAULT); + protected MockPublishAction buildPublishClusterStateAction( + Settings settings, + MockTransportService transportService, + Supplier clusterStateSupplier, + PublishClusterStateAction.NewPendingClusterStateListener listener + ) { + DiscoverySettings discoverySettings = + new DiscoverySettings(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + return new MockPublishAction( + settings, + transportService, + clusterStateSupplier, + listener, + discoverySettings, + ClusterName.DEFAULT); } public void testSimpleClusterStatePublishing() throws Exception { @@ -598,11 +611,12 @@ public class PublishClusterStateActionTests extends ESTestCase { node.action.validateIncomingState(state, node.clusterState); fail("node accepted state from another master"); } catch (IllegalStateException OK) { + assertThat(OK.toString(), containsString("cluster state from a different master than the current one, rejecting")); } logger.info("--> test state from the current master is accepted"); node.action.validateIncomingState(ClusterState.builder(node.clusterState) - .nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId("master")).build(), node.clusterState); + .nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId("master")).incrementVersion().build(), node.clusterState); logger.info("--> testing rejection of another cluster name"); @@ -610,6 +624,7 @@ public class PublishClusterStateActionTests extends ESTestCase { node.action.validateIncomingState(ClusterState.builder(new ClusterName(randomAsciiOfLength(10))).nodes(node.nodes()).build(), node.clusterState); fail("node accepted state with another cluster name"); } catch (IllegalStateException OK) { + assertThat(OK.toString(), containsString("received state from a node that is not part of the cluster")); } logger.info("--> testing rejection of a cluster state with wrong local node"); @@ -620,6 +635,7 @@ public class PublishClusterStateActionTests extends ESTestCase { node.action.validateIncomingState(state, node.clusterState); fail("node accepted state with non-existence local node"); } catch (IllegalStateException OK) { + assertThat(OK.toString(), containsString("received state from local node that does not match the current local node")); } try { @@ -630,6 +646,7 @@ public class PublishClusterStateActionTests extends ESTestCase { node.action.validateIncomingState(state, node.clusterState); fail("node accepted state with existent but wrong local node"); } catch (IllegalStateException OK) { + assertThat(OK.toString(), containsString("received state from local node that does not match the current local node")); } logger.info("--> testing acceptance of an old cluster state"); @@ -639,7 +656,7 @@ public class PublishClusterStateActionTests extends ESTestCase { expectThrows(IllegalStateException.class, () -> node.action.validateIncomingState(incomingState, node.clusterState)); final String message = String.format( Locale.ROOT, - "received older cluster state version [%s] from current master with uuid [%s] than last seen cluster state [%s] from current master with uuid [%s]", + "received cluster state from current master superseded by last seen cluster state; received version [%d] with uuid [%s], last seen version [%d] with uuid [%s]", incomingState.version(), incomingState.stateUUID(), node.clusterState.version(), @@ -678,19 +695,27 @@ public class PublishClusterStateActionTests extends ESTestCase { assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE)); assertThat(channel.error.get(), nullValue()); channel.clear(); + } logger.info("--> committing states"); + long largestVersionSeen = Long.MIN_VALUE; Randomness.shuffle(states); for (ClusterState state : states) { node.action.handleCommitRequest(new PublishClusterStateAction.CommitClusterStateRequest(state.stateUUID()), channel); - assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE)); - if (channel.error.get() != null) { - throw channel.error.get(); + if (largestVersionSeen < state.getVersion()) { + assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE)); + if (channel.error.get() != null) { + throw channel.error.get(); + } + largestVersionSeen = state.getVersion(); + } else { + assertNotNull(channel.error.get()); + assertThat(channel.error.get(), instanceOf(IllegalStateException.class)); } + channel.clear(); } - channel.clear(); //now check the last state held assertSameState(node.clusterState, finalState); @@ -828,8 +853,8 @@ public class PublishClusterStateActionTests extends ESTestCase { AtomicBoolean timeoutOnCommit = new AtomicBoolean(); AtomicBoolean errorOnCommit = new AtomicBoolean(); - public MockPublishAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, NewPendingClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) { - super(settings, transportService, nodesProvider, listener, discoverySettings, clusterName); + public MockPublishAction(Settings settings, TransportService transportService, Supplier clusterStateSupplier, NewPendingClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) { + super(settings, transportService, clusterStateSupplier, listener, discoverySettings, clusterName); } @Override