Clear responsibilities for PublishClusterStateAction and ZenDiscovery (#24772)

This commit moves some functionality from PublishClusterStateAction to ZenDiscovery, which allows each class to focus on it's core competencies:
- PendingStatesQueue is now solely managed by ZenDiscovery (no shared access by both PublishClusterStateAction and ZenDiscovery)
- Validation logic is handled exclusively by ZenDiscovery
This commit is contained in:
Yannick Welsch 2017-05-19 09:34:23 +02:00 committed by GitHub
parent 0aa380b770
commit 7054d24f0d
5 changed files with 227 additions and 195 deletions

View File

@ -341,6 +341,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
ZenDiscovery.SEND_LEAVE_REQUEST_SETTING,
ZenDiscovery.MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING,
ZenDiscovery.MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING,
ZenDiscovery.MAX_PENDING_CLUSTER_STATES_SETTING,
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING,
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING,
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT,

View File

@ -23,8 +23,8 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.IncompatibleClusterStateVersionException;
@ -60,61 +60,53 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
public class PublishClusterStateAction extends AbstractComponent {
public static final String SEND_ACTION_NAME = "internal:discovery/zen/publish/send";
public static final String COMMIT_ACTION_NAME = "internal:discovery/zen/publish/commit";
public static final String SETTINGS_MAX_PENDING_CLUSTER_STATES = "discovery.zen.publish.max_pending_cluster_states";
public interface IncomingClusterStateListener {
public interface NewPendingClusterStateListener {
/**
* called when a new incoming cluster state has been received.
* Should validate the incoming state and throw an exception if it's not a valid successor state.
*/
void onIncomingClusterState(ClusterState incomingState);
/** a new cluster state has been committed and is ready to process via {@link #pendingStatesQueue()} */
void onNewClusterState(String reason);
/**
* called when a cluster state has been committed and is ready to be processed
*/
void onClusterStateCommitted(String stateUUID, ActionListener<Void> processedListener);
}
private final TransportService transportService;
private final NamedWriteableRegistry namedWriteableRegistry;
private final Supplier<ClusterState> clusterStateSupplier;
private final NewPendingClusterStateListener newPendingClusterStatelistener;
private final IncomingClusterStateListener incomingClusterStateListener;
private final DiscoverySettings discoverySettings;
private final ClusterName clusterName;
private final PendingClusterStatesQueue pendingStatesQueue;
public PublishClusterStateAction(
Settings settings,
TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry,
Supplier<ClusterState> clusterStateSupplier,
NewPendingClusterStateListener listener,
DiscoverySettings discoverySettings,
ClusterName clusterName) {
IncomingClusterStateListener incomingClusterStateListener,
DiscoverySettings discoverySettings) {
super(settings);
this.transportService = transportService;
this.namedWriteableRegistry = namedWriteableRegistry;
this.clusterStateSupplier = clusterStateSupplier;
this.newPendingClusterStatelistener = listener;
this.incomingClusterStateListener = incomingClusterStateListener;
this.discoverySettings = discoverySettings;
this.clusterName = clusterName;
this.pendingStatesQueue = new PendingClusterStatesQueue(logger, settings.getAsInt(SETTINGS_MAX_PENDING_CLUSTER_STATES, 25));
transportService.registerRequestHandler(SEND_ACTION_NAME, BytesTransportRequest::new, ThreadPool.Names.SAME, false, false,
new SendClusterStateRequestHandler());
transportService.registerRequestHandler(COMMIT_ACTION_NAME, CommitClusterStateRequest::new, ThreadPool.Names.SAME, false, false,
new CommitClusterStateRequestHandler());
}
public PendingClusterStatesQueue pendingStatesQueue() {
return pendingStatesQueue;
}
/**
* publishes a cluster change event to other nodes. if at least minMasterNodes acknowledge the change it is committed and will
* be processed by the master and the other nodes.
@ -387,7 +379,7 @@ public class PublishClusterStateAction extends AbstractComponent {
final ClusterState incomingState;
// If true we received full cluster state - otherwise diffs
if (in.readBoolean()) {
incomingState = ClusterState.readFrom(in, clusterStateSupplier.get().nodes().getLocalNode());
incomingState = ClusterState.readFrom(in, transportService.getLocalNode());
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(),
request.bytes().length());
} else if (lastSeenClusterState != null) {
@ -399,10 +391,7 @@ public class PublishClusterStateAction extends AbstractComponent {
logger.debug("received diff for but don't have any local cluster state - requesting full state");
throw new IncompatibleClusterStateVersionException("have no local cluster state");
}
// sanity check incoming state
validateIncomingState(incomingState, lastSeenClusterState);
pendingStatesQueue.addPending(incomingState);
incomingClusterStateListener.onIncomingClusterState(incomingState);
lastSeenClusterState = incomingState;
}
} finally {
@ -411,56 +400,22 @@ public class PublishClusterStateAction extends AbstractComponent {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
// package private for testing
/**
* does simple sanity check of the incoming cluster state. Throws an exception on rejections.
*/
void validateIncomingState(ClusterState incomingState, ClusterState lastSeenClusterState) {
final ClusterName incomingClusterName = incomingState.getClusterName();
if (!incomingClusterName.equals(this.clusterName)) {
logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]",
incomingState.nodes().getMasterNode(), incomingClusterName);
throw new IllegalStateException("received state from a node that is not part of the cluster");
}
final ClusterState clusterState = clusterStateSupplier.get();
if (clusterState.nodes().getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) {
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen",
incomingState.nodes().getMasterNode());
throw new IllegalStateException("received state with a local node that does not match the current local node");
}
if (ZenDiscovery.shouldIgnoreOrRejectNewClusterState(logger, clusterState, incomingState)) {
String message = String.format(
Locale.ROOT,
"rejecting cluster state version [%d] uuid [%s] received from [%s]",
incomingState.version(),
incomingState.stateUUID(),
incomingState.nodes().getMasterNodeId()
);
logger.warn(message);
throw new IllegalStateException(message);
}
}
protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) {
final ClusterState state = pendingStatesQueue.markAsCommitted(request.stateUUID,
new PendingClusterStatesQueue.StateProcessedListener() {
incomingClusterStateListener.onClusterStateCommitted(request.stateUUID, new ActionListener<Void>() {
@Override
public void onNewClusterStateProcessed() {
public void onResponse(Void ignore) {
try {
// send a response to the master to indicate that this cluster state has been processed post committing it.
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (Exception e) {
logger.debug("failed to send response on cluster state processed", e);
onNewClusterStateFailed(e);
onFailure(e);
}
}
@Override
public void onNewClusterStateFailed(Exception e) {
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (Exception inner) {
@ -469,10 +424,6 @@ public class PublishClusterStateAction extends AbstractComponent {
}
}
});
if (state != null) {
newPendingClusterStatelistener.onNewClusterState("master " + state.nodes().getMasterNode() +
" committed version [" + state.version() + "]");
}
}
private class SendClusterStateRequestHandler implements TransportRequestHandler<BytesTransportRequest> {

View File

@ -25,6 +25,7 @@ import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
@ -56,6 +57,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.discovery.zen.PublishClusterStateAction.IncomingClusterStateListener;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
@ -82,7 +84,7 @@ import java.util.stream.Collectors;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider {
public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider, IncomingClusterStateListener {
public static final Setting<TimeValue> PING_TIMEOUT_SETTING =
Setting.positiveTimeSetting("discovery.zen.ping_timeout", timeValueSeconds(3), Property.NodeScope);
@ -104,6 +106,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
Property.NodeScope);
public static final Setting<Boolean> MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING =
Setting.boolSetting("discovery.zen.master_election.ignore_non_master_pings", false, Property.NodeScope);
public static final Setting<Integer> MAX_PENDING_CLUSTER_STATES_SETTING =
Setting.intSetting("discovery.zen.publish.max_pending_cluster_states", 25, 1, Property.NodeScope);
public static final String DISCOVERY_REJOIN_ACTION_NAME = "internal:discovery/zen/rejoin";
@ -139,6 +143,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
private final JoinThreadControl joinThreadControl;
private final PendingClusterStatesQueue pendingStatesQueue;
private final NodeJoinController nodeJoinController;
private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
@ -197,16 +203,15 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
this.masterFD.addListener(new MasterNodeFailureListener());
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, clusterName);
this.nodesFD.addListener(new NodeFaultDetectionListener());
this.pendingStatesQueue = new PendingClusterStatesQueue(logger, MAX_PENDING_CLUSTER_STATES_SETTING.get(settings));
this.publishClusterState =
new PublishClusterStateAction(
settings,
transportService,
namedWriteableRegistry,
this::clusterState,
new NewPendingClusterStateListener(),
discoverySettings,
clusterName);
this,
discoverySettings);
this.membership = new MembershipAction(settings, transportService, new MembershipListener());
this.joinThreadControl = new JoinThreadControl();
@ -311,7 +316,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
throw new FailedToCommitClusterStateException("state was mutated while calculating new CS update");
}
publishClusterState.pendingStatesQueue().addPending(newState);
pendingStatesQueue.addPending(newState);
try {
publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener);
@ -321,7 +326,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
newState.version(), electMaster.minimumMasterNodes());
synchronized (stateMutex) {
publishClusterState.pendingStatesQueue().failAllStatesAndClear(
pendingStatesQueue.failAllStatesAndClear(
new ElasticsearchException("failed to publish cluster state"));
rejoin("zen-disco-failed-to-publish");
@ -332,7 +337,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
final DiscoveryNode localNode = newState.getNodes().getLocalNode();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean processedOrFailed = new AtomicBoolean();
publishClusterState.pendingStatesQueue().markAsCommitted(newState.stateUUID(),
pendingStatesQueue.markAsCommitted(newState.stateUUID(),
new PendingClusterStatesQueue.StateProcessedListener() {
@Override
public void onNewClusterStateProcessed() {
@ -391,7 +396,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
@Override
public DiscoveryStats stats() {
PendingClusterStateStats queueStats = publishClusterState.pendingStatesQueue().stats();
PendingClusterStateStats queueStats = pendingStatesQueue.stats();
return new DiscoveryStats(queueStats);
}
@ -409,11 +414,11 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
// used for testing
public ClusterState[] pendingClusterStates() {
return publishClusterState.pendingStatesQueue().pendingClusterStates();
return pendingStatesQueue.pendingClusterStates();
}
PendingClusterStatesQueue pendingClusterStatesQueue() {
return publishClusterState.pendingStatesQueue();
return pendingStatesQueue;
}
/**
@ -703,7 +708,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
synchronized (stateMutex) {
if (localNodeMaster() == false && masterNode.equals(committedState.get().nodes().getMasterNode())) {
// flush any pending cluster states from old master, so it will not be set as master again
publishClusterState.pendingStatesQueue().failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason));
pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason));
rejoin("master left (reason = " + reason + ")");
}
}
@ -713,7 +718,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
boolean processNextCommittedClusterState(String reason) {
assert Thread.holdsLock(stateMutex);
final ClusterState newClusterState = publishClusterState.pendingStatesQueue().getNextClusterStateToProcess();
final ClusterState newClusterState = pendingStatesQueue.getNextClusterStateToProcess();
final ClusterState currentState = committedState.get();
final ClusterState adaptedNewClusterState;
// all pending states have been processed
@ -742,7 +747,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
}
} catch (Exception e) {
try {
publishClusterState.pendingStatesQueue().markAsFailed(newClusterState, e);
pendingStatesQueue.markAsFailed(newClusterState, e);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.error((Supplier<?>) () -> new ParameterizedMessage("unexpected exception while failing [{}]", reason), inner);
@ -811,7 +816,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
try {
publishClusterState.pendingStatesQueue().markAsProcessed(newClusterState);
pendingStatesQueue.markAsProcessed(newClusterState);
} catch (Exception e) {
onFailure(source, e);
}
@ -823,7 +828,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
try {
// TODO: use cluster state uuid instead of full cluster state so that we don't keep reference to CS around
// for too long.
publishClusterState.pendingStatesQueue().markAsFailed(newClusterState, e);
pendingStatesQueue.markAsFailed(newClusterState, e);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.error((Supplier<?>) () -> new ParameterizedMessage("unexpected exception while failing [{}]", reason), inner);
@ -1066,16 +1071,64 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
}
}
private class NewPendingClusterStateListener implements PublishClusterStateAction.NewPendingClusterStateListener {
@Override
public void onIncomingClusterState(ClusterState incomingState) {
validateIncomingState(logger, incomingState, committedState.get());
pendingStatesQueue.addPending(incomingState);
}
@Override
public void onNewClusterState(String reason) {
@Override
public void onClusterStateCommitted(String stateUUID, ActionListener<Void> processedListener) {
final ClusterState state = pendingStatesQueue.markAsCommitted(stateUUID,
new PendingClusterStatesQueue.StateProcessedListener() {
@Override
public void onNewClusterStateProcessed() {
processedListener.onResponse(null);
}
@Override
public void onNewClusterStateFailed(Exception e) {
processedListener.onFailure(e);
}
});
if (state != null) {
synchronized (stateMutex) {
processNextCommittedClusterState(reason);
processNextCommittedClusterState("master " + state.nodes().getMasterNode() +
" committed version [" + state.version() + "]");
}
}
}
/**
* does simple sanity check of the incoming cluster state. Throws an exception on rejections.
*/
static void validateIncomingState(Logger logger, ClusterState incomingState, ClusterState lastState) {
final ClusterName incomingClusterName = incomingState.getClusterName();
if (!incomingClusterName.equals(lastState.getClusterName())) {
logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]",
incomingState.nodes().getMasterNode(), incomingClusterName);
throw new IllegalStateException("received state from a node that is not part of the cluster");
}
if (lastState.nodes().getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) {
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen",
incomingState.nodes().getMasterNode());
throw new IllegalStateException("received state with a local node that does not match the current local node");
}
if (shouldIgnoreOrRejectNewClusterState(logger, lastState, incomingState)) {
String message = String.format(
Locale.ROOT,
"rejecting cluster state version [%d] uuid [%s] received from [%s]",
incomingState.version(),
incomingState.stateUUID(),
incomingState.nodes().getMasterNodeId()
);
logger.warn(message);
throw new IllegalStateException(message);
}
}
private class MembershipListener implements MembershipAction.MembershipListener {
@Override
public void onJoin(DiscoveryNode node, MembershipAction.JoinCallback callback) {

View File

@ -22,6 +22,7 @@ package org.elasticsearch.discovery.zen;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterName;
@ -64,20 +65,17 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@ -90,11 +88,12 @@ public class PublishClusterStateActionTests extends ESTestCase {
protected ThreadPool threadPool;
protected Map<String, MockNode> nodes = new HashMap<>();
public static class MockNode implements PublishClusterStateAction.NewPendingClusterStateListener {
public static class MockNode implements PublishClusterStateAction.IncomingClusterStateListener {
public final DiscoveryNode discoveryNode;
public final MockTransportService service;
public MockPublishAction action;
public final ClusterStateListener listener;
private final PendingClusterStatesQueue pendingStatesQueue;
public volatile ClusterState clusterState;
@ -108,6 +107,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
this.logger = logger;
this.clusterState = ClusterState.builder(CLUSTER_NAME).nodes(DiscoveryNodes.builder()
.add(discoveryNode).localNodeId(discoveryNode.getId()).build()).build();
this.pendingStatesQueue = new PendingClusterStatesQueue(logger, 25);
}
public MockNode setAsMaster() {
@ -128,18 +128,37 @@ public class PublishClusterStateActionTests extends ESTestCase {
}
@Override
public void onNewClusterState(String reason) {
ClusterState newClusterState = action.pendingStatesQueue().getNextClusterStateToProcess();
logger.debug("[{}] received version [{}], uuid [{}]",
discoveryNode.getName(), newClusterState.version(), newClusterState.stateUUID());
if (listener != null) {
ClusterChangedEvent event = new ClusterChangedEvent("", newClusterState, clusterState);
listener.clusterChanged(event);
public void onIncomingClusterState(ClusterState incomingState) {
ZenDiscovery.validateIncomingState(logger, incomingState, clusterState);
pendingStatesQueue.addPending(incomingState);
}
public void onClusterStateCommitted(String stateUUID, ActionListener<Void> processedListener) {
final ClusterState state = pendingStatesQueue.markAsCommitted(stateUUID,
new PendingClusterStatesQueue.StateProcessedListener() {
@Override
public void onNewClusterStateProcessed() {
processedListener.onResponse(null);
}
@Override
public void onNewClusterStateFailed(Exception e) {
processedListener.onFailure(e);
}
});
if (state != null) {
ClusterState newClusterState = pendingStatesQueue.getNextClusterStateToProcess();
logger.debug("[{}] received version [{}], uuid [{}]",
discoveryNode.getName(), newClusterState.version(), newClusterState.stateUUID());
if (listener != null) {
ClusterChangedEvent event = new ClusterChangedEvent("", newClusterState, clusterState);
listener.clusterChanged(event);
}
if (clusterState.nodes().getMasterNode() == null || newClusterState.supersedes(clusterState)) {
clusterState = newClusterState;
}
pendingStatesQueue.markAsProcessed(newClusterState);
}
if (clusterState.nodes().getMasterNode() == null || newClusterState.supersedes(clusterState)) {
clusterState = newClusterState;
}
action.pendingStatesQueue().markAsProcessed(newClusterState);
}
public DiscoveryNodes nodes() {
@ -168,7 +187,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
MockTransportService service = buildTransportService(settings, threadPool);
DiscoveryNode discoveryNode = service.getLocalDiscoNode();
MockNode node = new MockNode(discoveryNode, service, listener, logger);
node.action = buildPublishClusterStateAction(settings, service, () -> node.clusterState, node);
node.action = buildPublishClusterStateAction(settings, service, node);
final CountDownLatch latch = new CountDownLatch(nodes.size() * 2);
TransportConnectionListener waitForConnection = new TransportConnectionListener() {
@Override
@ -241,8 +260,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
private static MockPublishAction buildPublishClusterStateAction(
Settings settings,
MockTransportService transportService,
Supplier<ClusterState> clusterStateSupplier,
PublishClusterStateAction.NewPendingClusterStateListener listener
PublishClusterStateAction.IncomingClusterStateListener listener
) {
DiscoverySettings discoverySettings =
new DiscoverySettings(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
@ -251,10 +269,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
settings,
transportService,
namedWriteableRegistry,
clusterStateSupplier,
listener,
discoverySettings,
CLUSTER_NAME);
discoverySettings);
}
public void testSimpleClusterStatePublishing() throws Exception {
@ -607,86 +623,6 @@ public class PublishClusterStateActionTests extends ESTestCase {
}
}
public void testIncomingClusterStateValidation() throws Exception {
MockNode node = createMockNode("node");
logger.info("--> testing acceptances of any master when having no master");
ClusterState state = ClusterState.builder(node.clusterState)
.nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId(randomAlphaOfLength(10))).incrementVersion().build();
node.action.validateIncomingState(state, null);
// now set a master node
node.clusterState = ClusterState.builder(node.clusterState)
.nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId("master")).build();
logger.info("--> testing rejection of another master");
try {
node.action.validateIncomingState(state, node.clusterState);
fail("node accepted state from another master");
} catch (IllegalStateException OK) {
assertThat(OK.toString(), containsString("cluster state from a different master than the current one, rejecting"));
}
logger.info("--> test state from the current master is accepted");
node.action.validateIncomingState(ClusterState.builder(node.clusterState)
.nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId("master")).incrementVersion().build(), node.clusterState);
logger.info("--> testing rejection of another cluster name");
try {
node.action.validateIncomingState(ClusterState.builder(new ClusterName(randomAlphaOfLength(10)))
.nodes(node.nodes()).build(), node.clusterState);
fail("node accepted state with another cluster name");
} catch (IllegalStateException OK) {
assertThat(OK.toString(), containsString("received state from a node that is not part of the cluster"));
}
logger.info("--> testing rejection of a cluster state with wrong local node");
try {
state = ClusterState.builder(node.clusterState)
.nodes(DiscoveryNodes.builder(node.nodes()).localNodeId("_non_existing_").build())
.incrementVersion().build();
node.action.validateIncomingState(state, node.clusterState);
fail("node accepted state with non-existence local node");
} catch (IllegalStateException OK) {
assertThat(OK.toString(), containsString("received state with a local node that does not match the current local node"));
}
try {
MockNode otherNode = createMockNode("otherNode");
state = ClusterState.builder(node.clusterState).nodes(
DiscoveryNodes.builder(node.nodes()).add(otherNode.discoveryNode).localNodeId(otherNode.discoveryNode.getId()).build()
).incrementVersion().build();
node.action.validateIncomingState(state, node.clusterState);
fail("node accepted state with existent but wrong local node");
} catch (IllegalStateException OK) {
assertThat(OK.toString(), containsString("received state with a local node that does not match the current local node"));
}
logger.info("--> testing acceptance of an old cluster state");
final ClusterState incomingState = node.clusterState;
node.clusterState = ClusterState.builder(node.clusterState).incrementVersion().build();
final IllegalStateException e =
expectThrows(IllegalStateException.class, () -> node.action.validateIncomingState(incomingState, node.clusterState));
final String message = String.format(
Locale.ROOT,
"rejecting cluster state version [%d] uuid [%s] received from [%s]",
incomingState.version(),
incomingState.stateUUID(),
incomingState.nodes().getMasterNodeId()
);
assertThat(e, hasToString("java.lang.IllegalStateException: " + message));
// an older version from a *new* master is also OK!
ClusterState previousState = ClusterState.builder(node.clusterState).incrementVersion().build();
state = ClusterState.builder(node.clusterState)
.nodes(DiscoveryNodes.builder(node.clusterState.nodes()).masterNodeId("_new_master_").build())
.build();
// remove the master of the node (but still have a previous cluster state with it)!
node.resetMasterId();
node.action.validateIncomingState(state, previousState);
}
public void testOutOfOrderCommitMessages() throws Throwable {
MockNode node = createMockNode("node").setAsMaster();
final CapturingTransportChannel channel = new CapturingTransportChannel();
@ -874,9 +810,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
AtomicBoolean errorOnCommit = new AtomicBoolean();
public MockPublishAction(Settings settings, TransportService transportService, NamedWriteableRegistry namedWriteableRegistry,
Supplier<ClusterState> clusterStateSupplier, NewPendingClusterStateListener listener,
DiscoverySettings discoverySettings, ClusterName clusterName) {
super(settings, transportService, namedWriteableRegistry, clusterStateSupplier, listener, discoverySettings, clusterName);
IncomingClusterStateListener listener, DiscoverySettings discoverySettings) {
super(settings, transportService, namedWriteableRegistry, listener, discoverySettings);
}
@Override

View File

@ -67,6 +67,7 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -88,6 +89,7 @@ import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
public class ZenDiscoveryUnitTests extends ESTestCase {
@ -405,4 +407,94 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
}
}
}
public void testIncomingClusterStateValidation() throws Exception {
ClusterName clusterName = new ClusterName("abc");
DiscoveryNodes.Builder currentNodes = DiscoveryNodes.builder().add(
new DiscoveryNode("a", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT)).localNodeId("a");
ClusterState previousState = ClusterState.builder(clusterName).nodes(currentNodes).build();
logger.info("--> testing acceptances of any master when having no master");
ClusterState state = ClusterState.builder(previousState)
.nodes(DiscoveryNodes.builder(previousState.nodes()).masterNodeId(randomAlphaOfLength(10))).incrementVersion().build();
ZenDiscovery.validateIncomingState(logger, state, previousState);
// now set a master node
previousState = state;
state = ClusterState.builder(previousState)
.nodes(DiscoveryNodes.builder(previousState.nodes()).masterNodeId("master")).build();
logger.info("--> testing rejection of another master");
try {
ZenDiscovery.validateIncomingState(logger, state, previousState);
fail("node accepted state from another master");
} catch (IllegalStateException OK) {
assertThat(OK.toString(), containsString("cluster state from a different master than the current one, rejecting"));
}
logger.info("--> test state from the current master is accepted");
previousState = state;
ZenDiscovery.validateIncomingState(logger, ClusterState.builder(previousState)
.nodes(DiscoveryNodes.builder(previousState.nodes()).masterNodeId("master")).incrementVersion().build(), previousState);
logger.info("--> testing rejection of another cluster name");
try {
ZenDiscovery.validateIncomingState(logger, ClusterState.builder(new ClusterName(randomAlphaOfLength(10)))
.nodes(previousState.nodes()).build(), previousState);
fail("node accepted state with another cluster name");
} catch (IllegalStateException OK) {
assertThat(OK.toString(), containsString("received state from a node that is not part of the cluster"));
}
logger.info("--> testing rejection of a cluster state with wrong local node");
try {
state = ClusterState.builder(previousState)
.nodes(DiscoveryNodes.builder(previousState.nodes()).localNodeId("_non_existing_").build())
.incrementVersion().build();
ZenDiscovery.validateIncomingState(logger, state, previousState);
fail("node accepted state with non-existence local node");
} catch (IllegalStateException OK) {
assertThat(OK.toString(), containsString("received state with a local node that does not match the current local node"));
}
try {
DiscoveryNode otherNode = new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
state = ClusterState.builder(previousState).nodes(
DiscoveryNodes.builder(previousState.nodes()).add(otherNode)
.localNodeId(otherNode.getId()).build()
).incrementVersion().build();
ZenDiscovery.validateIncomingState(logger, state, previousState);
fail("node accepted state with existent but wrong local node");
} catch (IllegalStateException OK) {
assertThat(OK.toString(), containsString("received state with a local node that does not match the current local node"));
}
logger.info("--> testing acceptance of an old cluster state");
final ClusterState incomingState = previousState;
previousState = ClusterState.builder(previousState).incrementVersion().build();
final ClusterState finalPreviousState = previousState;
final IllegalStateException e =
expectThrows(IllegalStateException.class, () -> ZenDiscovery.validateIncomingState(logger, incomingState, finalPreviousState));
final String message = String.format(
Locale.ROOT,
"rejecting cluster state version [%d] uuid [%s] received from [%s]",
incomingState.version(),
incomingState.stateUUID(),
incomingState.nodes().getMasterNodeId()
);
assertThat(e, hasToString("java.lang.IllegalStateException: " + message));
ClusterState higherVersionState = ClusterState.builder(previousState).incrementVersion().build();
// remove the master of the node (but still have a previous cluster state with it)!
higherVersionState = ClusterState.builder(higherVersionState)
.nodes(DiscoveryNodes.builder(higherVersionState.nodes()).masterNodeId(null)).build();
// an older version from a *new* master is also OK!
state = ClusterState.builder(previousState)
.nodes(DiscoveryNodes.builder(previousState.nodes()).masterNodeId("_new_master_").build())
.build();
ZenDiscovery.validateIncomingState(logger, state, higherVersionState);
}
}