Validate a joining node's version with version of existing cluster nodes (#25808)
When a node tries to join a cluster, it goes through a validation step to make sure the node is compatible with the cluster. Currently we validate that the node can read the cluster state and that it is compatible with the indexes of the cluster. This PR adds validation that the joining node's version is compatible with the versions of existing nodes. Concretely we check that: 1) The node's min compatible version is lower than or equal to the version of every node in the cluster (this prevents a too-new node from joining). 2) The node's version is higher than or equal to the min compat version of all cluster nodes (this prevents a too-old node from joining where, for example, the master is on 5.6, there's another 6.0 node in the cluster and a 5.4 node tries to join). 3) The node's major version is at least as high as that of the lowest-version node in the cluster. This is important as we use the minimum version in the cluster to stop executing bwc code for operations that require multiple nodes. If the nodes are already operating in "new cluster mode", we should prevent nodes from the previous major from joining (even if they are wire-level compatible). This does mean that if you have a very unlucky partition during the upgrade which partitions all old nodes, which are also a minority / data nodes only, they may not be able to re-join the cluster. We feel this edge case risk is well worth the simplification it brings to BWC layers only going one way. This restriction only holds if the cluster state has been recovered (i.e., the cluster has properly formed). Also, the node join validation can now selectively fail specific nodes (previously the entire batch was failed). This is an important preparation for a follow-up PR where we plan to have a rejected joining node die with dignity.
This commit is contained in:
parent
de6ad7a704
commit
7488877d1a
|
@ -667,10 +667,8 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
|
|||
ImmutableOpenMap.Builder<String, DiscoveryNode> dataNodesBuilder = ImmutableOpenMap.builder();
|
||||
ImmutableOpenMap.Builder<String, DiscoveryNode> masterNodesBuilder = ImmutableOpenMap.builder();
|
||||
ImmutableOpenMap.Builder<String, DiscoveryNode> ingestNodesBuilder = ImmutableOpenMap.builder();
|
||||
Version minNodeVersion = Version.CURRENT;
|
||||
Version maxNodeVersion = Version.CURRENT;
|
||||
// The node where we are building this on might not be a master or a data node, so we cannot assume
|
||||
// that there is a node with the current version as a part of the cluster.
|
||||
Version minNodeVersion = null;
|
||||
Version maxNodeVersion = null;
|
||||
Version minNonClientNodeVersion = null;
|
||||
Version maxNonClientNodeVersion = null;
|
||||
for (ObjectObjectCursor<String, DiscoveryNode> nodeEntry : nodes) {
|
||||
|
@ -680,26 +678,29 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
|
|||
if (nodeEntry.value.isMasterNode()) {
|
||||
masterNodesBuilder.put(nodeEntry.key, nodeEntry.value);
|
||||
}
|
||||
final Version version = nodeEntry.value.getVersion();
|
||||
if (nodeEntry.value.isDataNode() || nodeEntry.value.isMasterNode()) {
|
||||
if (minNonClientNodeVersion == null) {
|
||||
minNonClientNodeVersion = nodeEntry.value.getVersion();
|
||||
maxNonClientNodeVersion = nodeEntry.value.getVersion();
|
||||
minNonClientNodeVersion = version;
|
||||
maxNonClientNodeVersion = version;
|
||||
} else {
|
||||
minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion());
|
||||
maxNonClientNodeVersion = Version.max(maxNonClientNodeVersion, nodeEntry.value.getVersion());
|
||||
minNonClientNodeVersion = Version.min(minNonClientNodeVersion, version);
|
||||
maxNonClientNodeVersion = Version.max(maxNonClientNodeVersion, version);
|
||||
}
|
||||
}
|
||||
if (nodeEntry.value.isIngestNode()) {
|
||||
ingestNodesBuilder.put(nodeEntry.key, nodeEntry.value);
|
||||
}
|
||||
minNodeVersion = Version.min(minNodeVersion, nodeEntry.value.getVersion());
|
||||
maxNodeVersion = Version.max(maxNodeVersion, nodeEntry.value.getVersion());
|
||||
minNodeVersion = minNodeVersion == null ? version : Version.min(minNodeVersion, version);
|
||||
maxNodeVersion = maxNodeVersion == null ? version : Version.max(maxNodeVersion, version);
|
||||
}
|
||||
|
||||
return new DiscoveryNodes(
|
||||
nodes.build(), dataNodesBuilder.build(), masterNodesBuilder.build(), ingestNodesBuilder.build(),
|
||||
masterNodeId, localNodeId, minNonClientNodeVersion == null ? Version.CURRENT : minNonClientNodeVersion,
|
||||
maxNonClientNodeVersion == null ? Version.CURRENT : maxNonClientNodeVersion, maxNodeVersion, minNodeVersion
|
||||
maxNonClientNodeVersion == null ? Version.CURRENT : maxNonClientNodeVersion,
|
||||
maxNodeVersion == null ? Version.CURRENT : maxNodeVersion,
|
||||
minNodeVersion == null ? Version.CURRENT : minNodeVersion
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
@ -178,6 +179,7 @@ public class MembershipAction extends AbstractComponent {
|
|||
|
||||
@Override
|
||||
public void messageReceived(ValidateJoinRequest request, TransportChannel channel) throws Exception {
|
||||
ensureNodesCompatibility(Version.CURRENT, request.state.getNodes());
|
||||
ensureIndexCompatibility(Version.CURRENT, request.state.getMetaData());
|
||||
// for now, the mere fact that we can serialize the cluster state acts as validation....
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
||||
|
@ -207,6 +209,39 @@ public class MembershipAction extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
/** ensures that the joining node has a version that's compatible with all current nodes*/
|
||||
static void ensureNodesCompatibility(final Version joiningNodeVersion, DiscoveryNodes currentNodes) {
|
||||
final Version minNodeVersion = currentNodes.getMinNodeVersion();
|
||||
final Version maxNodeVersion = currentNodes.getMaxNodeVersion();
|
||||
ensureNodesCompatibility(joiningNodeVersion, minNodeVersion, maxNodeVersion);
|
||||
}
|
||||
|
||||
/** ensures that the joining node has a version that's compatible with a given version range */
|
||||
static void ensureNodesCompatibility(Version joiningNodeVersion, Version minClusterNodeVersion, Version maxClusterNodeVersion) {
|
||||
assert minClusterNodeVersion.onOrBefore(maxClusterNodeVersion) : minClusterNodeVersion + " > " + maxClusterNodeVersion;
|
||||
if (joiningNodeVersion.isCompatible(maxClusterNodeVersion) == false) {
|
||||
throw new IllegalStateException("node version [" + joiningNodeVersion + "] is not supported. " +
|
||||
"The cluster contains nodes with version [" + maxClusterNodeVersion + "], which is incompatible.");
|
||||
}
|
||||
if (joiningNodeVersion.isCompatible(minClusterNodeVersion) == false) {
|
||||
throw new IllegalStateException("node version [" + joiningNodeVersion + "] is not supported." +
|
||||
"The cluster contains nodes with version [" + minClusterNodeVersion + "], which is incompatible.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* ensures that the joining node's major version is equal or higher to the minClusterNodeVersion. This is needed
|
||||
* to ensure that if the master is already fully operating under the new major version, it doesn't go back to mixed
|
||||
* version mode
|
||||
**/
|
||||
static void ensureMajorVersionBarrier(Version joiningNodeVersion, Version minClusterNodeVersion) {
|
||||
final byte clusterMajor = minClusterNodeVersion.major;
|
||||
if (joiningNodeVersion.major < clusterMajor) {
|
||||
throw new IllegalStateException("node version [" + joiningNodeVersion + "] is not supported. " +
|
||||
"All nodes in the cluster are of a higher major [" + clusterMajor + "].");
|
||||
}
|
||||
}
|
||||
|
||||
public static class LeaveRequest extends TransportRequest {
|
||||
|
||||
private DiscoveryNode node;
|
||||
|
|
|
@ -33,12 +33,12 @@ import org.elasticsearch.cluster.block.ClusterBlocks;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.service.MasterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.cluster.service.MasterService;
|
||||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -50,6 +50,8 @@ import java.util.concurrent.CountDownLatch;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
|
||||
|
||||
/**
|
||||
* This class processes incoming join requests (passed via {@link ZenDiscovery}). Incoming nodes
|
||||
* are directly added to the cluster state or are accumulated during master election.
|
||||
|
@ -433,28 +435,36 @@ public class NodeJoinController extends AbstractComponent {
|
|||
|
||||
assert nodesBuilder.isLocalNodeElectedMaster();
|
||||
|
||||
Version minNodeVersion = Version.CURRENT;
|
||||
Version minClusterNodeVersion = newState.nodes().getMinNodeVersion();
|
||||
Version maxClusterNodeVersion = newState.nodes().getMaxNodeVersion();
|
||||
// we only enforce major version transitions on fully formed clusters
|
||||
final boolean enforceMajorVersion = currentState.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false;
|
||||
// processing any joins
|
||||
for (final DiscoveryNode node : joiningNodes) {
|
||||
minNodeVersion = Version.min(minNodeVersion, node.getVersion());
|
||||
if (node.equals(BECOME_MASTER_TASK) || node.equals(FINISH_ELECTION_TASK)) {
|
||||
// noop
|
||||
} else if (currentNodes.nodeExists(node)) {
|
||||
logger.debug("received a join request for an existing node [{}]", node);
|
||||
} else {
|
||||
try {
|
||||
if (enforceMajorVersion) {
|
||||
MembershipAction.ensureMajorVersionBarrier(node.getVersion(), minClusterNodeVersion);
|
||||
}
|
||||
MembershipAction.ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion);
|
||||
// we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
|
||||
// we have to reject nodes that don't support all indices we have in this cluster
|
||||
MembershipAction.ensureIndexCompatibility(node.getVersion(), currentState.getMetaData());
|
||||
nodesBuilder.add(node);
|
||||
nodesChanged = true;
|
||||
} catch (IllegalArgumentException e) {
|
||||
minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
|
||||
maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion());
|
||||
} catch (IllegalArgumentException | IllegalStateException e) {
|
||||
results.failure(node, e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
results.success(node);
|
||||
}
|
||||
// we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
|
||||
// we have to reject nodes that don't support all indices we have in this cluster
|
||||
MembershipAction.ensureIndexCompatibility(minNodeVersion, currentState.getMetaData());
|
||||
if (nodesChanged) {
|
||||
newState.nodes(nodesBuilder);
|
||||
return results.build(allocationService.reroute(newState.build(), "node_join"));
|
||||
|
|
|
@ -885,7 +885,11 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
} else {
|
||||
// we do this in a couple of places including the cluster update thread. This one here is really just best effort
|
||||
// to ensure we fail as fast as possible.
|
||||
MembershipAction.ensureNodesCompatibility(node.getVersion(), state.getNodes());
|
||||
MembershipAction.ensureIndexCompatibility(node.getVersion(), state.getMetaData());
|
||||
if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
|
||||
MembershipAction.ensureMajorVersionBarrier(node.getVersion(), state.getNodes().getMinNodeVersion());
|
||||
}
|
||||
// try and connect to the node, if it fails, we can raise an exception back to the client...
|
||||
transportService.connectToNode(node);
|
||||
|
||||
|
|
|
@ -27,14 +27,18 @@ import org.elasticsearch.test.VersionUtils;
|
|||
import org.hamcrest.Matchers;
|
||||
|
||||
import java.lang.reflect.Modifier;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.elasticsearch.Version.V_5_3_0;
|
||||
import static org.elasticsearch.Version.V_6_0_0_beta1;
|
||||
import static org.elasticsearch.test.VersionUtils.allVersions;
|
||||
import static org.elasticsearch.test.VersionUtils.randomVersion;
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
|
@ -343,6 +347,20 @@ public class VersionTests extends ESTestCase {
|
|||
isCompatible(VersionUtils.getPreviousMinorVersion(), Version.fromString("7.0.0")));
|
||||
assertFalse(isCompatible(Version.V_5_0_0, Version.fromString("6.0.0")));
|
||||
assertFalse(isCompatible(Version.V_5_0_0, Version.fromString("7.0.0")));
|
||||
|
||||
Version a = randomVersion(random());
|
||||
Version b = randomVersion(random());
|
||||
assertThat(a.isCompatible(b), equalTo(b.isCompatible(a)));
|
||||
}
|
||||
|
||||
/* tests that a newer version's minimum compatibility versions are always equal to or higher than those of any older version */
|
||||
public void testMinCompatVersionOrderRespectsVersionOrder() {
|
||||
// work on a copy so that sorting does not touch the list returned by allVersions()
List<Version> versionsByMinCompat = new ArrayList<>(allVersions());
|
||||
// List.sort is stable, so versions sharing a minimum compatibility version keep their relative order;
// the sort therefore only reorders the list if some version has a lower minimum than an earlier one
versionsByMinCompat.sort(Comparator.comparing(Version::minimumCompatibilityVersion));
|
||||
assertThat(versionsByMinCompat, equalTo(allVersions()));
|
||||
|
||||
// same check, this time for the minimum index compatibility version
versionsByMinCompat.sort(Comparator.comparing(Version::minimumIndexCompatibilityVersion));
|
||||
assertThat(versionsByMinCompat, equalTo(allVersions()));
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -21,10 +21,23 @@ package org.elasticsearch.discovery.zen;
|
|||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.test.VersionUtils.allVersions;
|
||||
import static org.elasticsearch.test.VersionUtils.getPreviousVersion;
|
||||
import static org.elasticsearch.test.VersionUtils.incompatibleFutureVersion;
|
||||
import static org.elasticsearch.test.VersionUtils.maxCompatibleVersion;
|
||||
import static org.elasticsearch.test.VersionUtils.randomCompatibleVersion;
|
||||
import static org.elasticsearch.test.VersionUtils.randomVersion;
|
||||
import static org.elasticsearch.test.VersionUtils.randomVersionBetween;
|
||||
|
||||
public class MembershipActionTests extends ESTestCase {
|
||||
|
||||
public void testPreventJoinClusterWithNewerIndices() {
|
||||
|
@ -58,6 +71,55 @@ public class MembershipActionTests extends ESTestCase {
|
|||
metaData));
|
||||
}
|
||||
|
||||
/**
 * Builds a two-node cluster with a random version and a random version compatible with it, then checks that
 * versions outside the compatible range are rejected with an {@link IllegalStateException} while a version
 * inside the range is accepted. Each check randomly exercises either the {@code DiscoveryNodes} overload or
 * the explicit min/max overload of {@code MembershipAction.ensureNodesCompatibility}.
 */
public void testPreventJoinClusterWithUnsupportedNodeVersions() {
|
||||
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
|
||||
final Version version = randomVersion(random());
|
||||
builder.add(new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), version));
|
||||
builder.add(new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), randomCompatibleVersion(random(), version)));
|
||||
DiscoveryNodes nodes = builder.build();
|
||||
|
||||
final Version maxNodeVersion = nodes.getMaxNodeVersion();
|
||||
final Version minNodeVersion = nodes.getMinNodeVersion();
|
||||
// too-old joiner: the version just below the minimum compatibility version of the newest cluster node
// NOTE(review): presumably guarded because only 6.x versions have a known earlier version to pick here - confirm
if (maxNodeVersion.onOrAfter(Version.V_6_0_0_alpha1)) {
|
||||
final Version tooLow = getPreviousVersion(maxNodeVersion.minimumCompatibilityVersion());
|
||||
expectThrows(IllegalStateException.class, () -> {
|
||||
if (randomBoolean()) {
|
||||
MembershipAction.ensureNodesCompatibility(tooLow, nodes);
|
||||
} else {
|
||||
MembershipAction.ensureNodesCompatibility(tooLow, minNodeVersion, maxNodeVersion);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// too-new joiner: a later version that is not compatible with the oldest cluster node
// NOTE(review): presumably guarded because only versions before 5.5.0 have an incompatible future version - confirm
if (minNodeVersion.before(Version.V_5_5_0)) {
|
||||
Version tooHigh = incompatibleFutureVersion(minNodeVersion);
|
||||
expectThrows(IllegalStateException.class, () -> {
|
||||
if (randomBoolean()) {
|
||||
MembershipAction.ensureNodesCompatibility(tooHigh, nodes);
|
||||
} else {
|
||||
MembershipAction.ensureNodesCompatibility(tooHigh, minNodeVersion, maxNodeVersion);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// major version barrier: once every cluster node is on 6.x, any pre-6 version must be rejected
if (minNodeVersion.onOrAfter(Version.V_6_0_0_alpha1)) {
|
||||
Version oldMajor = randomFrom(allVersions().stream().filter(v -> v.major < 6).collect(Collectors.toList()));
|
||||
expectThrows(IllegalStateException.class, () -> MembershipAction.ensureMajorVersionBarrier(oldMajor, minNodeVersion));
|
||||
}
|
||||
|
||||
// lower bound of the acceptable range for a joining node
final Version minGoodVersion = maxNodeVersion.major == minNodeVersion.major ?
|
||||
// we have to stick with the same major
|
||||
minNodeVersion :
|
||||
maxNodeVersion.minimumCompatibilityVersion();
|
||||
final Version justGood = randomVersionBetween(random(), minGoodVersion, maxCompatibleVersion(minNodeVersion));
|
||||
|
||||
// a version inside the range must pass validation without throwing
if (randomBoolean()) {
|
||||
MembershipAction.ensureNodesCompatibility(justGood, nodes);
|
||||
} else {
|
||||
MembershipAction.ensureNodesCompatibility(justGood, minNodeVersion, maxNodeVersion);
|
||||
}
|
||||
}
|
||||
|
||||
public void testSuccess() {
|
||||
Settings.builder().build();
|
||||
MetaData.Builder metaBuilder = MetaData.builder();
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.ExceptionsHelper;
|
|||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.NotMasterException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
|
@ -38,7 +39,6 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
|
|||
import org.elasticsearch.cluster.routing.TestShardRouting;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.cluster.service.MasterService;
|
||||
import org.elasticsearch.cluster.service.MasterServiceTests;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
|
@ -46,7 +46,6 @@ import org.elasticsearch.common.util.concurrent.BaseFuture;
|
|||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.test.ClusterServiceUtils;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -84,6 +83,12 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF
|
|||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
|
||||
import static org.elasticsearch.cluster.routing.RoutingTableTests.updateActiveAllocations;
|
||||
import static org.elasticsearch.cluster.service.MasterServiceTests.discoveryState;
|
||||
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
|
||||
import static org.elasticsearch.test.ClusterServiceUtils.setState;
|
||||
import static org.elasticsearch.test.VersionUtils.allVersions;
|
||||
import static org.elasticsearch.test.VersionUtils.getPreviousVersion;
|
||||
import static org.elasticsearch.test.VersionUtils.randomCompatibleVersion;
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
@ -566,7 +571,7 @@ public class NodeJoinControllerTests extends ESTestCase {
|
|||
randomBoolean() ? existing.getAddress() : buildNewFakeTransportAddress(),
|
||||
randomBoolean() ? existing.getAttributes() : Collections.singletonMap("attr", "other"),
|
||||
randomBoolean() ? existing.getRoles() : new HashSet<>(randomSubsetOf(Arrays.asList(DiscoveryNode.Role.values()))),
|
||||
randomBoolean() ? existing.getVersion() : VersionUtils.randomVersion(random()));
|
||||
existing.getVersion());
|
||||
|
||||
ExecutionException e = expectThrows(ExecutionException.class, () -> joinNode(other_node));
|
||||
assertThat(e.getMessage(), containsString("found existing node"));
|
||||
|
@ -585,6 +590,101 @@ public class NodeJoinControllerTests extends ESTestCase {
|
|||
assertThat(e.getMessage(), containsString("found existing node"));
|
||||
}
|
||||
|
||||
/**
 * Submits one join with an unsupported version and one with a supported version while the master service is
 * blocked, then releases it and checks that the good join succeeds while the bad join fails with an
 * {@link IllegalStateException} mentioning the unsupported node version.
 */
public void testRejectingJoinWithIncompatibleVersion() throws InterruptedException, ExecutionException {
|
||||
addNodes(randomInt(5));
|
||||
final Version badVersion;
|
||||
// the bad version is either just below the current minimum compatibility version, or any version of an older major
if (randomBoolean()) {
|
||||
badVersion = getPreviousVersion(Version.CURRENT.minimumCompatibilityVersion());
|
||||
} else {
|
||||
badVersion = randomFrom(allVersions().stream().filter(v -> v.major < Version.CURRENT.major).collect(Collectors.toList()));
|
||||
}
|
||||
final DiscoveryNode badNode = new DiscoveryNode("badNode", buildNewFakeTransportAddress(), emptyMap(),
|
||||
new HashSet<>(randomSubsetOf(Arrays.asList(DiscoveryNode.Role.values()))), badVersion);
|
||||
|
||||
// the good version is any known version of the current major (or later)
final Version goodVersion =
|
||||
randomFrom(allVersions().stream().filter(v -> v.major >= Version.CURRENT.major).collect(Collectors.toList()));
|
||||
final DiscoveryNode goodNode = new DiscoveryNode("goodNode", buildNewFakeTransportAddress(), emptyMap(),
|
||||
new HashSet<>(randomSubsetOf(Arrays.asList(DiscoveryNode.Role.values()))), goodVersion);
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
// block cluster state
|
||||
masterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
// holds this task until the test counts the latch down, after both joins are submitted
latch.await();
|
||||
return currentState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
});
|
||||
|
||||
final SimpleFuture badJoin;
|
||||
final SimpleFuture goodJoin;
|
||||
// submit the two joins in random order; the outcome of each must not depend on which came first
if (randomBoolean()) {
|
||||
badJoin = joinNodeAsync(badNode);
|
||||
goodJoin = joinNodeAsync(goodNode);
|
||||
} else {
|
||||
goodJoin = joinNodeAsync(goodNode);
|
||||
badJoin = joinNodeAsync(badNode);
|
||||
}
|
||||
// sanity check: neither join may have completed while the blocking task is still waiting
assert goodJoin.isDone() == false;
|
||||
assert badJoin.isDone() == false;
|
||||
latch.countDown();
|
||||
// the good join must complete normally even though the bad join is rejected
goodJoin.get();
|
||||
ExecutionException e = expectThrows(ExecutionException.class, badJoin::get);
|
||||
assertThat(e.getCause(), instanceOf(IllegalStateException.class));
|
||||
assertThat(e.getCause().getMessage(), allOf(containsString("node version"), containsString("not supported")));
|
||||
}
|
||||
|
||||
/**
 * Same scenario as a join with an unsupported version, but with the {@code STATE_NOT_RECOVERED_BLOCK} global
 * block set on the cluster state. The bad node uses a version below the current minimum compatibility version
 * and must still be rejected with an {@link IllegalStateException}, while a compatible node joins successfully.
 */
public void testRejectingJoinWithIncompatibleVersionWithUnrecoveredState() throws InterruptedException, ExecutionException {
|
||||
addNodes(randomInt(5));
|
||||
// mark the cluster state as not yet recovered
ClusterState.Builder builder = ClusterState.builder(discoveryState(masterService));
|
||||
builder.blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK));
|
||||
setState(masterService, builder.build());
|
||||
final Version badVersion = getPreviousVersion(Version.CURRENT.minimumCompatibilityVersion());
|
||||
final DiscoveryNode badNode = new DiscoveryNode("badNode", buildNewFakeTransportAddress(), emptyMap(),
|
||||
new HashSet<>(randomSubsetOf(Arrays.asList(DiscoveryNode.Role.values()))), badVersion);
|
||||
|
||||
// NOTE(review): randomFrom over a single value looks redundant here - confirm before simplifying,
// since removing it would change the sequence of random draws
final Version goodVersion = randomFrom(randomCompatibleVersion(random(), Version.CURRENT));
|
||||
final DiscoveryNode goodNode = new DiscoveryNode("goodNode", buildNewFakeTransportAddress(), emptyMap(),
|
||||
new HashSet<>(randomSubsetOf(Arrays.asList(DiscoveryNode.Role.values()))), goodVersion);
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
// block cluster state
|
||||
masterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
// holds this task until the test counts the latch down, after both joins are submitted
latch.await();
|
||||
return currentState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
});
|
||||
|
||||
final SimpleFuture badJoin;
|
||||
final SimpleFuture goodJoin;
|
||||
// submit the two joins in random order; the outcome of each must not depend on which came first
if (randomBoolean()) {
|
||||
badJoin = joinNodeAsync(badNode);
|
||||
goodJoin = joinNodeAsync(goodNode);
|
||||
} else {
|
||||
goodJoin = joinNodeAsync(goodNode);
|
||||
badJoin = joinNodeAsync(badNode);
|
||||
}
|
||||
// sanity check: neither join may have completed while the blocking task is still waiting
assert goodJoin.isDone() == false;
|
||||
assert badJoin.isDone() == false;
|
||||
latch.countDown();
|
||||
// the good join must complete normally even though the bad join is rejected
goodJoin.get();
|
||||
ExecutionException e = expectThrows(ExecutionException.class, badJoin::get);
|
||||
assertThat(e.getCause(), instanceOf(IllegalStateException.class));
|
||||
assertThat(e.getCause().getMessage(), allOf(containsString("node version"), containsString("not supported")));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that a node can become a master, even though the last cluster state it knows contains
|
||||
* nodes that conflict with the joins it got and needs to become a master
|
||||
|
|
|
@ -197,3 +197,21 @@ recovery has completed.
|
|||
When the cluster is stable and the node has recovered, repeat the above steps
|
||||
for all remaining nodes.
|
||||
--
|
||||
|
||||
[IMPORTANT]
|
||||
====================================================
|
||||
|
||||
During a rolling upgrade the cluster will continue to operate as normal. Any
|
||||
new functionality will be disabled or work in a backward compatible manner
|
||||
until all nodes of the cluster have been upgraded. Once the upgrade is
|
||||
completed and all nodes are on the new version, the new functionality will
|
||||
become operational. Once that has happened, it is practically impossible to
|
||||
go back to operating in a backward compatible mode. To protect against such a
|
||||
scenario, nodes from the previous major version (e.g. 5.x) will not be allowed
|
||||
to join a cluster where all nodes are of a higher major version (e.g. 6.x).
|
||||
|
||||
In the unlikely case of a network malfunction during upgrades, where all
|
||||
remaining old nodes are isolated from the cluster, you will have to take all
|
||||
old nodes offline and upgrade them before they can rejoin the cluster.
|
||||
|
||||
====================================================
|
|
@ -29,7 +29,9 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Random;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Collections.singletonList;
|
||||
import static java.util.Collections.unmodifiableList;
|
||||
|
@ -189,6 +191,12 @@ public class VersionUtils {
|
|||
return ALL_VERSIONS.get(random.nextInt(ALL_VERSIONS.size()));
|
||||
}
|
||||
|
||||
/** Returns a random {@link Version} from all available versions, that is compatible with the given version. */
|
||||
public static Version randomCompatibleVersion(Random random, Version version) {
|
||||
final List<Version> compatible = ALL_VERSIONS.stream().filter(version::isCompatible).collect(Collectors.toList());
|
||||
return compatible.get(random.nextInt(compatible.size()));
|
||||
}
|
||||
|
||||
/** Returns a random {@link Version} between <code>minVersion</code> and <code>maxVersion</code> (inclusive). */
|
||||
public static Version randomVersionBetween(Random random, @Nullable Version minVersion, @Nullable Version maxVersion) {
|
||||
int minVersionIndex = 0;
|
||||
|
@ -211,4 +219,20 @@ public class VersionUtils {
|
|||
return ALL_VERSIONS.get(minVersionIndex + random.nextInt(range));
|
||||
}
|
||||
}
|
||||
|
||||
/** returns the first future incompatible version */
|
||||
public static Version incompatibleFutureVersion(Version version) {
|
||||
final Optional<Version> opt = ALL_VERSIONS.stream().filter(version::before).filter(v -> v.isCompatible(version) == false).findAny();
|
||||
assert opt.isPresent() : "no future incompatible version for " + version;
|
||||
return opt.get();
|
||||
}
|
||||
|
||||
/** Returns the maximum {@link Version} that is compatible with the given version. */
|
||||
public static Version maxCompatibleVersion(Version version) {
|
||||
final List<Version> compatible = ALL_VERSIONS.stream().filter(version::isCompatible).filter(version::onOrBefore)
|
||||
.collect(Collectors.toList());
|
||||
assert compatible.size() > 0;
|
||||
return compatible.get(compatible.size() - 1);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue