From 7054d24f0d75b9b0c728b8632a1606d9f9bd5a12 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Fri, 19 May 2017 09:34:23 +0200 Subject: [PATCH] Clear responsibilities for PublishClusterStateAction and ZenDiscovery (#24772) This commit moves some functionality from PublishClusterStateAction to ZenDiscovery, which allows each class to focus on it's core competencies: - PendingStatesQueue is now solely managed by ZenDiscovery (no shared access by both PublishClusterStateAction and ZenDiscovery) - Validation logic is handled exclusively by ZenDiscovery --- .../common/settings/ClusterSettings.java | 1 + .../zen/PublishClusterStateAction.java | 93 +++--------- .../discovery/zen/ZenDiscovery.java | 93 +++++++++--- .../zen/PublishClusterStateActionTests.java | 143 +++++------------- .../discovery/zen/ZenDiscoveryUnitTests.java | 92 +++++++++++ 5 files changed, 227 insertions(+), 195 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 72d2bdc6b4c..d8ee93fe882 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -341,6 +341,7 @@ public final class ClusterSettings extends AbstractScopedSettings { ZenDiscovery.SEND_LEAVE_REQUEST_SETTING, ZenDiscovery.MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING, ZenDiscovery.MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING, + ZenDiscovery.MAX_PENDING_CLUSTER_STATES_SETTING, UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING, UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING, UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT, diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java index 4150783a8fd..ae469d162ae 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java @@ -23,8 +23,8 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.IncompatibleClusterStateVersionException; @@ -60,61 +60,53 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; -import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; public class PublishClusterStateAction extends AbstractComponent { public static final String SEND_ACTION_NAME = "internal:discovery/zen/publish/send"; public static final String COMMIT_ACTION_NAME = "internal:discovery/zen/publish/commit"; - public static final String SETTINGS_MAX_PENDING_CLUSTER_STATES = "discovery.zen.publish.max_pending_cluster_states"; + public interface IncomingClusterStateListener { - public interface NewPendingClusterStateListener { + /** + * called when a new incoming cluster state has been received. + * Should validate the incoming state and throw an exception if it's not a valid successor state. + */ + void onIncomingClusterState(ClusterState incomingState); - /** a new cluster state has been committed and is ready to process via {@link #pendingStatesQueue()} */ - void onNewClusterState(String reason); + /** + * called when a cluster state has been committed and is ready to be processed + */ + void onClusterStateCommitted(String stateUUID, ActionListener processedListener); } private final TransportService transportService; private final NamedWriteableRegistry namedWriteableRegistry; - private final Supplier clusterStateSupplier; - private final NewPendingClusterStateListener newPendingClusterStatelistener; + private final IncomingClusterStateListener incomingClusterStateListener; private final DiscoverySettings discoverySettings; - private final ClusterName clusterName; - private final PendingClusterStatesQueue pendingStatesQueue; public PublishClusterStateAction( Settings settings, TransportService transportService, NamedWriteableRegistry namedWriteableRegistry, - Supplier clusterStateSupplier, - NewPendingClusterStateListener listener, - DiscoverySettings discoverySettings, - ClusterName clusterName) { + IncomingClusterStateListener incomingClusterStateListener, + DiscoverySettings discoverySettings) { super(settings); this.transportService = transportService; this.namedWriteableRegistry = namedWriteableRegistry; - this.clusterStateSupplier = clusterStateSupplier; - this.newPendingClusterStatelistener = listener; + this.incomingClusterStateListener = incomingClusterStateListener; this.discoverySettings = discoverySettings; - this.clusterName = clusterName; - this.pendingStatesQueue = new PendingClusterStatesQueue(logger, settings.getAsInt(SETTINGS_MAX_PENDING_CLUSTER_STATES, 25)); transportService.registerRequestHandler(SEND_ACTION_NAME, BytesTransportRequest::new, ThreadPool.Names.SAME, false, false, new SendClusterStateRequestHandler()); transportService.registerRequestHandler(COMMIT_ACTION_NAME, CommitClusterStateRequest::new, ThreadPool.Names.SAME, false, false, new CommitClusterStateRequestHandler()); } - public PendingClusterStatesQueue pendingStatesQueue() { - return pendingStatesQueue; - } - /** * publishes a cluster change event to other nodes. if at least minMasterNodes acknowledge the change it is committed and will * be processed by the master and the other nodes. @@ -387,7 +379,7 @@ public class PublishClusterStateAction extends AbstractComponent { final ClusterState incomingState; // If true we received full cluster state - otherwise diffs if (in.readBoolean()) { - incomingState = ClusterState.readFrom(in, clusterStateSupplier.get().nodes().getLocalNode()); + incomingState = ClusterState.readFrom(in, transportService.getLocalNode()); logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length()); } else if (lastSeenClusterState != null) { @@ -399,10 +391,7 @@ public class PublishClusterStateAction extends AbstractComponent { logger.debug("received diff for but don't have any local cluster state - requesting full state"); throw new IncompatibleClusterStateVersionException("have no local cluster state"); } - // sanity check incoming state - validateIncomingState(incomingState, lastSeenClusterState); - - pendingStatesQueue.addPending(incomingState); + incomingClusterStateListener.onIncomingClusterState(incomingState); lastSeenClusterState = incomingState; } } finally { @@ -411,56 +400,22 @@ public class PublishClusterStateAction extends AbstractComponent { channel.sendResponse(TransportResponse.Empty.INSTANCE); } - // package private for testing - - /** - * does simple sanity check of the incoming cluster state. Throws an exception on rejections. - */ - void validateIncomingState(ClusterState incomingState, ClusterState lastSeenClusterState) { - final ClusterName incomingClusterName = incomingState.getClusterName(); - if (!incomingClusterName.equals(this.clusterName)) { - logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", - incomingState.nodes().getMasterNode(), incomingClusterName); - throw new IllegalStateException("received state from a node that is not part of the cluster"); - } - final ClusterState clusterState = clusterStateSupplier.get(); - - if (clusterState.nodes().getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) { - logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", - incomingState.nodes().getMasterNode()); - throw new IllegalStateException("received state with a local node that does not match the current local node"); - } - - if (ZenDiscovery.shouldIgnoreOrRejectNewClusterState(logger, clusterState, incomingState)) { - String message = String.format( - Locale.ROOT, - "rejecting cluster state version [%d] uuid [%s] received from [%s]", - incomingState.version(), - incomingState.stateUUID(), - incomingState.nodes().getMasterNodeId() - ); - logger.warn(message); - throw new IllegalStateException(message); - } - - } - protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) { - final ClusterState state = pendingStatesQueue.markAsCommitted(request.stateUUID, - new PendingClusterStatesQueue.StateProcessedListener() { + incomingClusterStateListener.onClusterStateCommitted(request.stateUUID, new ActionListener() { + @Override - public void onNewClusterStateProcessed() { + public void onResponse(Void ignore) { try { // send a response to the master to indicate that this cluster state has been processed post committing it. channel.sendResponse(TransportResponse.Empty.INSTANCE); } catch (Exception e) { logger.debug("failed to send response on cluster state processed", e); - onNewClusterStateFailed(e); + onFailure(e); } } @Override - public void onNewClusterStateFailed(Exception e) { + public void onFailure(Exception e) { try { channel.sendResponse(e); } catch (Exception inner) { @@ -469,10 +424,6 @@ public class PublishClusterStateAction extends AbstractComponent { } } }); - if (state != null) { - newPendingClusterStatelistener.onNewClusterState("master " + state.nodes().getMasterNode() + - " committed version [" + state.version() + "]"); - } } private class SendClusterStateRequestHandler implements TransportRequestHandler { diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 09e6357ba56..dfbf3f780be 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -25,6 +25,7 @@ import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -56,6 +57,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.DiscoveryStats; +import org.elasticsearch.discovery.zen.PublishClusterStateAction.IncomingClusterStateListener; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.EmptyTransportResponseHandler; import org.elasticsearch.transport.TransportChannel; @@ -82,7 +84,7 @@ import java.util.stream.Collectors; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; -public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider { +public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider, IncomingClusterStateListener { public static final Setting PING_TIMEOUT_SETTING = Setting.positiveTimeSetting("discovery.zen.ping_timeout", timeValueSeconds(3), Property.NodeScope); @@ -104,6 +106,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover Property.NodeScope); public static final Setting MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING = Setting.boolSetting("discovery.zen.master_election.ignore_non_master_pings", false, Property.NodeScope); + public static final Setting MAX_PENDING_CLUSTER_STATES_SETTING = + Setting.intSetting("discovery.zen.publish.max_pending_cluster_states", 25, 1, Property.NodeScope); public static final String DISCOVERY_REJOIN_ACTION_NAME = "internal:discovery/zen/rejoin"; @@ -139,6 +143,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover private final JoinThreadControl joinThreadControl; + private final PendingClusterStatesQueue pendingStatesQueue; + private final NodeJoinController nodeJoinController; private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor; @@ -197,16 +203,15 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover this.masterFD.addListener(new MasterNodeFailureListener()); this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, clusterName); this.nodesFD.addListener(new NodeFaultDetectionListener()); + this.pendingStatesQueue = new PendingClusterStatesQueue(logger, MAX_PENDING_CLUSTER_STATES_SETTING.get(settings)); this.publishClusterState = new PublishClusterStateAction( settings, transportService, namedWriteableRegistry, - this::clusterState, - new NewPendingClusterStateListener(), - discoverySettings, - clusterName); + this, + discoverySettings); this.membership = new MembershipAction(settings, transportService, new MembershipListener()); this.joinThreadControl = new JoinThreadControl(); @@ -311,7 +316,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover throw new FailedToCommitClusterStateException("state was mutated while calculating new CS update"); } - publishClusterState.pendingStatesQueue().addPending(newState); + pendingStatesQueue.addPending(newState); try { publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener); @@ -321,7 +326,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover newState.version(), electMaster.minimumMasterNodes()); synchronized (stateMutex) { - publishClusterState.pendingStatesQueue().failAllStatesAndClear( + pendingStatesQueue.failAllStatesAndClear( new ElasticsearchException("failed to publish cluster state")); rejoin("zen-disco-failed-to-publish"); @@ -332,7 +337,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover final DiscoveryNode localNode = newState.getNodes().getLocalNode(); final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean processedOrFailed = new AtomicBoolean(); - publishClusterState.pendingStatesQueue().markAsCommitted(newState.stateUUID(), + pendingStatesQueue.markAsCommitted(newState.stateUUID(), new PendingClusterStatesQueue.StateProcessedListener() { @Override public void onNewClusterStateProcessed() { @@ -391,7 +396,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover @Override public DiscoveryStats stats() { - PendingClusterStateStats queueStats = publishClusterState.pendingStatesQueue().stats(); + PendingClusterStateStats queueStats = pendingStatesQueue.stats(); return new DiscoveryStats(queueStats); } @@ -409,11 +414,11 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover // used for testing public ClusterState[] pendingClusterStates() { - return publishClusterState.pendingStatesQueue().pendingClusterStates(); + return pendingStatesQueue.pendingClusterStates(); } PendingClusterStatesQueue pendingClusterStatesQueue() { - return publishClusterState.pendingStatesQueue(); + return pendingStatesQueue; } /** @@ -703,7 +708,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover synchronized (stateMutex) { if (localNodeMaster() == false && masterNode.equals(committedState.get().nodes().getMasterNode())) { // flush any pending cluster states from old master, so it will not be set as master again - publishClusterState.pendingStatesQueue().failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason)); + pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason)); rejoin("master left (reason = " + reason + ")"); } } @@ -713,7 +718,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover boolean processNextCommittedClusterState(String reason) { assert Thread.holdsLock(stateMutex); - final ClusterState newClusterState = publishClusterState.pendingStatesQueue().getNextClusterStateToProcess(); + final ClusterState newClusterState = pendingStatesQueue.getNextClusterStateToProcess(); final ClusterState currentState = committedState.get(); final ClusterState adaptedNewClusterState; // all pending states have been processed @@ -742,7 +747,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover } } catch (Exception e) { try { - publishClusterState.pendingStatesQueue().markAsFailed(newClusterState, e); + pendingStatesQueue.markAsFailed(newClusterState, e); } catch (Exception inner) { inner.addSuppressed(e); logger.error((Supplier) () -> new ParameterizedMessage("unexpected exception while failing [{}]", reason), inner); @@ -811,7 +816,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { try { - publishClusterState.pendingStatesQueue().markAsProcessed(newClusterState); + pendingStatesQueue.markAsProcessed(newClusterState); } catch (Exception e) { onFailure(source, e); } @@ -823,7 +828,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover try { // TODO: use cluster state uuid instead of full cluster state so that we don't keep reference to CS around // for too long. - publishClusterState.pendingStatesQueue().markAsFailed(newClusterState, e); + pendingStatesQueue.markAsFailed(newClusterState, e); } catch (Exception inner) { inner.addSuppressed(e); logger.error((Supplier) () -> new ParameterizedMessage("unexpected exception while failing [{}]", reason), inner); @@ -1066,16 +1071,64 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover } } - private class NewPendingClusterStateListener implements PublishClusterStateAction.NewPendingClusterStateListener { + @Override + public void onIncomingClusterState(ClusterState incomingState) { + validateIncomingState(logger, incomingState, committedState.get()); + pendingStatesQueue.addPending(incomingState); + } - @Override - public void onNewClusterState(String reason) { + @Override + public void onClusterStateCommitted(String stateUUID, ActionListener processedListener) { + final ClusterState state = pendingStatesQueue.markAsCommitted(stateUUID, + new PendingClusterStatesQueue.StateProcessedListener() { + @Override + public void onNewClusterStateProcessed() { + processedListener.onResponse(null); + } + + @Override + public void onNewClusterStateFailed(Exception e) { + processedListener.onFailure(e); + } + }); + if (state != null) { synchronized (stateMutex) { - processNextCommittedClusterState(reason); + processNextCommittedClusterState("master " + state.nodes().getMasterNode() + + " committed version [" + state.version() + "]"); } } } + /** + * does simple sanity check of the incoming cluster state. Throws an exception on rejections. + */ + static void validateIncomingState(Logger logger, ClusterState incomingState, ClusterState lastState) { + final ClusterName incomingClusterName = incomingState.getClusterName(); + if (!incomingClusterName.equals(lastState.getClusterName())) { + logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", + incomingState.nodes().getMasterNode(), incomingClusterName); + throw new IllegalStateException("received state from a node that is not part of the cluster"); + } + if (lastState.nodes().getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) { + logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", + incomingState.nodes().getMasterNode()); + throw new IllegalStateException("received state with a local node that does not match the current local node"); + } + + if (shouldIgnoreOrRejectNewClusterState(logger, lastState, incomingState)) { + String message = String.format( + Locale.ROOT, + "rejecting cluster state version [%d] uuid [%s] received from [%s]", + incomingState.version(), + incomingState.stateUUID(), + incomingState.nodes().getMasterNodeId() + ); + logger.warn(message); + throw new IllegalStateException(message); + } + + } + private class MembershipListener implements MembershipAction.MembershipListener { @Override public void onJoin(DiscoveryNode node, MembershipAction.JoinCallback callback) { diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java index 863bf80085b..2e293472045 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java @@ -22,6 +22,7 @@ package org.elasticsearch.discovery.zen; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterName; @@ -64,20 +65,17 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -90,11 +88,12 @@ public class PublishClusterStateActionTests extends ESTestCase { protected ThreadPool threadPool; protected Map nodes = new HashMap<>(); - public static class MockNode implements PublishClusterStateAction.NewPendingClusterStateListener { + public static class MockNode implements PublishClusterStateAction.IncomingClusterStateListener { public final DiscoveryNode discoveryNode; public final MockTransportService service; public MockPublishAction action; public final ClusterStateListener listener; + private final PendingClusterStatesQueue pendingStatesQueue; public volatile ClusterState clusterState; @@ -108,6 +107,7 @@ public class PublishClusterStateActionTests extends ESTestCase { this.logger = logger; this.clusterState = ClusterState.builder(CLUSTER_NAME).nodes(DiscoveryNodes.builder() .add(discoveryNode).localNodeId(discoveryNode.getId()).build()).build(); + this.pendingStatesQueue = new PendingClusterStatesQueue(logger, 25); } public MockNode setAsMaster() { @@ -128,18 +128,37 @@ public class PublishClusterStateActionTests extends ESTestCase { } @Override - public void onNewClusterState(String reason) { - ClusterState newClusterState = action.pendingStatesQueue().getNextClusterStateToProcess(); - logger.debug("[{}] received version [{}], uuid [{}]", - discoveryNode.getName(), newClusterState.version(), newClusterState.stateUUID()); - if (listener != null) { - ClusterChangedEvent event = new ClusterChangedEvent("", newClusterState, clusterState); - listener.clusterChanged(event); + public void onIncomingClusterState(ClusterState incomingState) { + ZenDiscovery.validateIncomingState(logger, incomingState, clusterState); + pendingStatesQueue.addPending(incomingState); + } + + public void onClusterStateCommitted(String stateUUID, ActionListener processedListener) { + final ClusterState state = pendingStatesQueue.markAsCommitted(stateUUID, + new PendingClusterStatesQueue.StateProcessedListener() { + @Override + public void onNewClusterStateProcessed() { + processedListener.onResponse(null); + } + + @Override + public void onNewClusterStateFailed(Exception e) { + processedListener.onFailure(e); + } + }); + if (state != null) { + ClusterState newClusterState = pendingStatesQueue.getNextClusterStateToProcess(); + logger.debug("[{}] received version [{}], uuid [{}]", + discoveryNode.getName(), newClusterState.version(), newClusterState.stateUUID()); + if (listener != null) { + ClusterChangedEvent event = new ClusterChangedEvent("", newClusterState, clusterState); + listener.clusterChanged(event); + } + if (clusterState.nodes().getMasterNode() == null || newClusterState.supersedes(clusterState)) { + clusterState = newClusterState; + } + pendingStatesQueue.markAsProcessed(newClusterState); } - if (clusterState.nodes().getMasterNode() == null || newClusterState.supersedes(clusterState)) { - clusterState = newClusterState; - } - action.pendingStatesQueue().markAsProcessed(newClusterState); } public DiscoveryNodes nodes() { @@ -168,7 +187,7 @@ public class PublishClusterStateActionTests extends ESTestCase { MockTransportService service = buildTransportService(settings, threadPool); DiscoveryNode discoveryNode = service.getLocalDiscoNode(); MockNode node = new MockNode(discoveryNode, service, listener, logger); - node.action = buildPublishClusterStateAction(settings, service, () -> node.clusterState, node); + node.action = buildPublishClusterStateAction(settings, service, node); final CountDownLatch latch = new CountDownLatch(nodes.size() * 2); TransportConnectionListener waitForConnection = new TransportConnectionListener() { @Override @@ -241,8 +260,7 @@ public class PublishClusterStateActionTests extends ESTestCase { private static MockPublishAction buildPublishClusterStateAction( Settings settings, MockTransportService transportService, - Supplier clusterStateSupplier, - PublishClusterStateAction.NewPendingClusterStateListener listener + PublishClusterStateAction.IncomingClusterStateListener listener ) { DiscoverySettings discoverySettings = new DiscoverySettings(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); @@ -251,10 +269,8 @@ public class PublishClusterStateActionTests extends ESTestCase { settings, transportService, namedWriteableRegistry, - clusterStateSupplier, listener, - discoverySettings, - CLUSTER_NAME); + discoverySettings); } public void testSimpleClusterStatePublishing() throws Exception { @@ -607,86 +623,6 @@ public class PublishClusterStateActionTests extends ESTestCase { } } - public void testIncomingClusterStateValidation() throws Exception { - MockNode node = createMockNode("node"); - - logger.info("--> testing acceptances of any master when having no master"); - ClusterState state = ClusterState.builder(node.clusterState) - .nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId(randomAlphaOfLength(10))).incrementVersion().build(); - node.action.validateIncomingState(state, null); - - // now set a master node - node.clusterState = ClusterState.builder(node.clusterState) - .nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId("master")).build(); - logger.info("--> testing rejection of another master"); - try { - node.action.validateIncomingState(state, node.clusterState); - fail("node accepted state from another master"); - } catch (IllegalStateException OK) { - assertThat(OK.toString(), containsString("cluster state from a different master than the current one, rejecting")); - } - - logger.info("--> test state from the current master is accepted"); - node.action.validateIncomingState(ClusterState.builder(node.clusterState) - .nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId("master")).incrementVersion().build(), node.clusterState); - - - logger.info("--> testing rejection of another cluster name"); - try { - node.action.validateIncomingState(ClusterState.builder(new ClusterName(randomAlphaOfLength(10))) - .nodes(node.nodes()).build(), node.clusterState); - fail("node accepted state with another cluster name"); - } catch (IllegalStateException OK) { - assertThat(OK.toString(), containsString("received state from a node that is not part of the cluster")); - } - - logger.info("--> testing rejection of a cluster state with wrong local node"); - try { - state = ClusterState.builder(node.clusterState) - .nodes(DiscoveryNodes.builder(node.nodes()).localNodeId("_non_existing_").build()) - .incrementVersion().build(); - node.action.validateIncomingState(state, node.clusterState); - fail("node accepted state with non-existence local node"); - } catch (IllegalStateException OK) { - assertThat(OK.toString(), containsString("received state with a local node that does not match the current local node")); - } - - try { - MockNode otherNode = createMockNode("otherNode"); - state = ClusterState.builder(node.clusterState).nodes( - DiscoveryNodes.builder(node.nodes()).add(otherNode.discoveryNode).localNodeId(otherNode.discoveryNode.getId()).build() - ).incrementVersion().build(); - node.action.validateIncomingState(state, node.clusterState); - fail("node accepted state with existent but wrong local node"); - } catch (IllegalStateException OK) { - assertThat(OK.toString(), containsString("received state with a local node that does not match the current local node")); - } - - logger.info("--> testing acceptance of an old cluster state"); - final ClusterState incomingState = node.clusterState; - node.clusterState = ClusterState.builder(node.clusterState).incrementVersion().build(); - final IllegalStateException e = - expectThrows(IllegalStateException.class, () -> node.action.validateIncomingState(incomingState, node.clusterState)); - final String message = String.format( - Locale.ROOT, - "rejecting cluster state version [%d] uuid [%s] received from [%s]", - incomingState.version(), - incomingState.stateUUID(), - incomingState.nodes().getMasterNodeId() - ); - assertThat(e, hasToString("java.lang.IllegalStateException: " + message)); - - // an older version from a *new* master is also OK! - ClusterState previousState = ClusterState.builder(node.clusterState).incrementVersion().build(); - state = ClusterState.builder(node.clusterState) - .nodes(DiscoveryNodes.builder(node.clusterState.nodes()).masterNodeId("_new_master_").build()) - .build(); - // remove the master of the node (but still have a previous cluster state with it)! - node.resetMasterId(); - - node.action.validateIncomingState(state, previousState); - } - public void testOutOfOrderCommitMessages() throws Throwable { MockNode node = createMockNode("node").setAsMaster(); final CapturingTransportChannel channel = new CapturingTransportChannel(); @@ -874,9 +810,8 @@ public class PublishClusterStateActionTests extends ESTestCase { AtomicBoolean errorOnCommit = new AtomicBoolean(); public MockPublishAction(Settings settings, TransportService transportService, NamedWriteableRegistry namedWriteableRegistry, - Supplier clusterStateSupplier, NewPendingClusterStateListener listener, - DiscoverySettings discoverySettings, ClusterName clusterName) { - super(settings, transportService, namedWriteableRegistry, clusterStateSupplier, listener, discoverySettings, clusterName); + IncomingClusterStateListener listener, DiscoverySettings discoverySettings) { + super(settings, transportService, namedWriteableRegistry, listener, discoverySettings); } @Override diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java index 65856add565..cb88213cfe3 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java @@ -67,6 +67,7 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -88,6 +89,7 @@ import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.emptyArray; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; public class ZenDiscoveryUnitTests extends ESTestCase { @@ -405,4 +407,94 @@ public class ZenDiscoveryUnitTests extends ESTestCase { } } } + + public void testIncomingClusterStateValidation() throws Exception { + ClusterName clusterName = new ClusterName("abc"); + + DiscoveryNodes.Builder currentNodes = DiscoveryNodes.builder().add( + new DiscoveryNode("a", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT)).localNodeId("a"); + + ClusterState previousState = ClusterState.builder(clusterName).nodes(currentNodes).build(); + + logger.info("--> testing acceptances of any master when having no master"); + ClusterState state = ClusterState.builder(previousState) + .nodes(DiscoveryNodes.builder(previousState.nodes()).masterNodeId(randomAlphaOfLength(10))).incrementVersion().build(); + ZenDiscovery.validateIncomingState(logger, state, previousState); + + // now set a master node + previousState = state; + state = ClusterState.builder(previousState) + .nodes(DiscoveryNodes.builder(previousState.nodes()).masterNodeId("master")).build(); + logger.info("--> testing rejection of another master"); + try { + ZenDiscovery.validateIncomingState(logger, state, previousState); + fail("node accepted state from another master"); + } catch (IllegalStateException OK) { + assertThat(OK.toString(), containsString("cluster state from a different master than the current one, rejecting")); + } + + logger.info("--> test state from the current master is accepted"); + previousState = state; + ZenDiscovery.validateIncomingState(logger, ClusterState.builder(previousState) + .nodes(DiscoveryNodes.builder(previousState.nodes()).masterNodeId("master")).incrementVersion().build(), previousState); + + + logger.info("--> testing rejection of another cluster name"); + try { + ZenDiscovery.validateIncomingState(logger, ClusterState.builder(new ClusterName(randomAlphaOfLength(10))) + .nodes(previousState.nodes()).build(), previousState); + fail("node accepted state with another cluster name"); + } catch (IllegalStateException OK) { + assertThat(OK.toString(), containsString("received state from a node that is not part of the cluster")); + } + + logger.info("--> testing rejection of a cluster state with wrong local node"); + try { + state = ClusterState.builder(previousState) + .nodes(DiscoveryNodes.builder(previousState.nodes()).localNodeId("_non_existing_").build()) + .incrementVersion().build(); + ZenDiscovery.validateIncomingState(logger, state, previousState); + fail("node accepted state with non-existence local node"); + } catch (IllegalStateException OK) { + assertThat(OK.toString(), containsString("received state with a local node that does not match the current local node")); + } + + try { + DiscoveryNode otherNode = new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + state = ClusterState.builder(previousState).nodes( + DiscoveryNodes.builder(previousState.nodes()).add(otherNode) + .localNodeId(otherNode.getId()).build() + ).incrementVersion().build(); + ZenDiscovery.validateIncomingState(logger, state, previousState); + fail("node accepted state with existent but wrong local node"); + } catch (IllegalStateException OK) { + assertThat(OK.toString(), containsString("received state with a local node that does not match the current local node")); + } + + logger.info("--> testing acceptance of an old cluster state"); + final ClusterState incomingState = previousState; + previousState = ClusterState.builder(previousState).incrementVersion().build(); + final ClusterState finalPreviousState = previousState; + final IllegalStateException e = + expectThrows(IllegalStateException.class, () -> ZenDiscovery.validateIncomingState(logger, incomingState, finalPreviousState)); + final String message = String.format( + Locale.ROOT, + "rejecting cluster state version [%d] uuid [%s] received from [%s]", + incomingState.version(), + incomingState.stateUUID(), + incomingState.nodes().getMasterNodeId() + ); + assertThat(e, hasToString("java.lang.IllegalStateException: " + message)); + + ClusterState higherVersionState = ClusterState.builder(previousState).incrementVersion().build(); + // remove the master of the node (but still have a previous cluster state with it)! + higherVersionState = ClusterState.builder(higherVersionState) + .nodes(DiscoveryNodes.builder(higherVersionState.nodes()).masterNodeId(null)).build(); + // an older version from a *new* master is also OK! + state = ClusterState.builder(previousState) + .nodes(DiscoveryNodes.builder(previousState.nodes()).masterNodeId("_new_master_").build()) + .build(); + + ZenDiscovery.validateIncomingState(logger, state, higherVersionState); + } }