diff --git a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java index 2ed88aa1127..4373069a5f7 100644 --- a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java +++ b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java @@ -667,10 +667,8 @@ public class DiscoveryNodes extends AbstractDiffable implements ImmutableOpenMap.Builder dataNodesBuilder = ImmutableOpenMap.builder(); ImmutableOpenMap.Builder masterNodesBuilder = ImmutableOpenMap.builder(); ImmutableOpenMap.Builder ingestNodesBuilder = ImmutableOpenMap.builder(); - Version minNodeVersion = Version.CURRENT; - Version maxNodeVersion = Version.CURRENT; - // The node where we are building this on might not be a master or a data node, so we cannot assume - // that there is a node with the current version as a part of the cluster. + Version minNodeVersion = null; + Version maxNodeVersion = null; Version minNonClientNodeVersion = null; Version maxNonClientNodeVersion = null; for (ObjectObjectCursor nodeEntry : nodes) { @@ -680,26 +678,29 @@ public class DiscoveryNodes extends AbstractDiffable implements if (nodeEntry.value.isMasterNode()) { masterNodesBuilder.put(nodeEntry.key, nodeEntry.value); } + final Version version = nodeEntry.value.getVersion(); if (nodeEntry.value.isDataNode() || nodeEntry.value.isMasterNode()) { if (minNonClientNodeVersion == null) { - minNonClientNodeVersion = nodeEntry.value.getVersion(); - maxNonClientNodeVersion = nodeEntry.value.getVersion(); + minNonClientNodeVersion = version; + maxNonClientNodeVersion = version; } else { - minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion()); - maxNonClientNodeVersion = Version.max(maxNonClientNodeVersion, nodeEntry.value.getVersion()); + minNonClientNodeVersion = Version.min(minNonClientNodeVersion, version); + maxNonClientNodeVersion = Version.max(maxNonClientNodeVersion, version); } } 
if (nodeEntry.value.isIngestNode()) { ingestNodesBuilder.put(nodeEntry.key, nodeEntry.value); } - minNodeVersion = Version.min(minNodeVersion, nodeEntry.value.getVersion()); - maxNodeVersion = Version.max(maxNodeVersion, nodeEntry.value.getVersion()); + minNodeVersion = minNodeVersion == null ? version : Version.min(minNodeVersion, version); + maxNodeVersion = maxNodeVersion == null ? version : Version.max(maxNodeVersion, version); } return new DiscoveryNodes( nodes.build(), dataNodesBuilder.build(), masterNodesBuilder.build(), ingestNodesBuilder.build(), masterNodeId, localNodeId, minNonClientNodeVersion == null ? Version.CURRENT : minNonClientNodeVersion, - maxNonClientNodeVersion == null ? Version.CURRENT : maxNonClientNodeVersion, maxNodeVersion, minNodeVersion + maxNonClientNodeVersion == null ? Version.CURRENT : maxNonClientNodeVersion, + maxNodeVersion == null ? Version.CURRENT : maxNodeVersion, + minNodeVersion == null ? Version.CURRENT : minNodeVersion ); } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java index 442491e6b13..de6bf6af281 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -178,6 +179,7 @@ public class MembershipAction extends AbstractComponent { @Override public void messageReceived(ValidateJoinRequest request, TransportChannel channel) throws Exception { + 
ensureNodesCompatibility(Version.CURRENT, request.state.getNodes()); ensureIndexCompatibility(Version.CURRENT, request.state.getMetaData()); // for now, the mere fact that we can serialize the cluster state acts as validation.... channel.sendResponse(TransportResponse.Empty.INSTANCE); @@ -207,6 +209,31 @@ public class MembershipAction extends AbstractComponent { } } + /** ensures that the joining node has a version that's compatible with all current nodes, by checking it against the minimum and maximum node versions reported by the given nodes */ + static void ensureNodesCompatibility(final Version joiningNodeVersion, DiscoveryNodes currentNodes) { + final Version minNodeVersion = currentNodes.getMinNodeVersion(); + final Version maxNodeVersion = currentNodes.getMaxNodeVersion(); + ensureNodesCompatibility(joiningNodeVersion, minNodeVersion, maxNodeVersion); + } + + /** ensures that the joining node has a version that's compatible with a given version range; throws IllegalStateException if its major version is lower than the major of the minimum cluster version, or if it is incompatible with either bound of the range */ + static void ensureNodesCompatibility(Version joiningNodeVersion, Version minClusterNodeVersion, Version maxClusterNodeVersion) { + assert minClusterNodeVersion.onOrBefore(maxClusterNodeVersion) : minClusterNodeVersion + " > " + maxClusterNodeVersion; + final byte clusterMajor = minClusterNodeVersion.major; + if (joiningNodeVersion.major < clusterMajor) { + throw new IllegalStateException("node version [" + joiningNodeVersion + "] is not supported. " + + "All nodes in the cluster are of a higher major [" + clusterMajor + "]."); + } + if (joiningNodeVersion.isCompatible(maxClusterNodeVersion) == false) { + throw new IllegalStateException("node version [" + joiningNodeVersion + "] is not supported. " + + "The cluster contains nodes with version [" + maxClusterNodeVersion + "], which is incompatible."); + } + if (joiningNodeVersion.isCompatible(minClusterNodeVersion) == false) { + throw new IllegalStateException("node version [" + joiningNodeVersion + "] is not supported. " 
+ + "The cluster contains nodes with version [" + minClusterNodeVersion + "], which is incompatible."); + } + } + public static class LeaveRequest extends TransportRequest { private DiscoveryNode node; diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java b/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java index 566886ecdb6..70837dadf2b 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java @@ -33,12 +33,12 @@ import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.discovery.DiscoverySettings; import java.util.ArrayList; @@ -433,28 +433,31 @@ public class NodeJoinController extends AbstractComponent { assert nodesBuilder.isLocalNodeElectedMaster(); - Version minNodeVersion = Version.CURRENT; + Version minClusterNodeVersion = newState.nodes().getMinNodeVersion(); + Version maxClusterNodeVersion = newState.nodes().getMaxNodeVersion(); // processing any joins for (final DiscoveryNode node : joiningNodes) { - minNodeVersion = Version.min(minNodeVersion, node.getVersion()); if (node.equals(BECOME_MASTER_TASK) || node.equals(FINISH_ELECTION_TASK)) { // noop } else if (currentNodes.nodeExists(node)) { logger.debug("received a join request for an existing node [{}]", node); } else { try { + 
MembershipAction.ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion); + // we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices + // we have to reject nodes that don't support all indices we have in this cluster + MembershipAction.ensureIndexCompatibility(node.getVersion(), currentState.getMetaData()); nodesBuilder.add(node); nodesChanged = true; - } catch (IllegalArgumentException e) { + minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion()); + maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion()); + } catch (IllegalArgumentException | IllegalStateException e) { results.failure(node, e); continue; } } results.success(node); } - // we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices - // we have to reject nodes that don't support all indices we have in this cluster - MembershipAction.ensureIndexCompatibility(minNodeVersion, currentState.getMetaData()); if (nodesChanged) { newState.nodes(nodesBuilder); return results.build(allocationService.reroute(newState.build(), "node_join")); diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index dfbf3f780be..e086448dc3f 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -885,6 +885,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover } else { // we do this in a couple of places including the cluster update thread. This one here is really just best effort // to ensure we fail as fast as possible. 
+ MembershipAction.ensureNodesCompatibility(node.getVersion(), state.getNodes()); MembershipAction.ensureIndexCompatibility(node.getVersion(), state.getMetaData()); // try and connect to the node, if it fails, we can raise an exception back to the client... transportService.connectToNode(node); diff --git a/core/src/test/java/org/elasticsearch/VersionTests.java b/core/src/test/java/org/elasticsearch/VersionTests.java index 1c2b9da38ba..d03bbe0cc3d 100644 --- a/core/src/test/java/org/elasticsearch/VersionTests.java +++ b/core/src/test/java/org/elasticsearch/VersionTests.java @@ -27,14 +27,18 @@ import org.elasticsearch.test.VersionUtils; import org.hamcrest.Matchers; import java.lang.reflect.Modifier; +import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; import static org.elasticsearch.Version.V_5_3_0; import static org.elasticsearch.Version.V_6_0_0_beta1; +import static org.elasticsearch.test.VersionUtils.allVersions; import static org.elasticsearch.test.VersionUtils.randomVersion; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.containsString; @@ -343,6 +347,20 @@ public class VersionTests extends ESTestCase { isCompatible(VersionUtils.getPreviousMinorVersion(), Version.fromString("7.0.0"))); assertFalse(isCompatible(Version.V_5_0_0, Version.fromString("6.0.0"))); assertFalse(isCompatible(Version.V_5_0_0, Version.fromString("7.0.0"))); + + Version a = randomVersion(random()); + Version b = randomVersion(random()); + assertThat(a.isCompatible(b), equalTo(b.isCompatible(a))); + } + + /* tests that a newer version's minimum compatibility version is always equal to or higher than that of any older version */ + public void testMinCompatVersionOrderRespectsVersionOrder() { + List<Version> versionsByMinCompat = new ArrayList<>(allVersions()); + 
versionsByMinCompat.sort(Comparator.comparing(Version::minimumCompatibilityVersion)); + assertThat(versionsByMinCompat, equalTo(allVersions())); + + versionsByMinCompat.sort(Comparator.comparing(Version::minimumIndexCompatibilityVersion)); + assertThat(versionsByMinCompat, equalTo(allVersions())); } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/MembershipActionTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/MembershipActionTests.java index b8d9f175e64..3f479793814 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/MembershipActionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/MembershipActionTests.java @@ -21,10 +21,20 @@ package org.elasticsearch.discovery.zen; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; +import static org.elasticsearch.test.VersionUtils.getPreviousVersion; +import static org.elasticsearch.test.VersionUtils.incompatibleFutureVersion; +import static org.elasticsearch.test.VersionUtils.maxCompatibleVersion; +import static org.elasticsearch.test.VersionUtils.randomCompatibleVersion; +import static org.elasticsearch.test.VersionUtils.randomVersion; +import static org.elasticsearch.test.VersionUtils.randomVersionBetween; + public class MembershipActionTests extends ESTestCase { public void testPreventJoinClusterWithNewerIndices() { @@ -58,6 +68,50 @@ public class MembershipActionTests extends ESTestCase { metaData)); } + public void testPreventJoinClusterWithUnsupportedNodeVersions() { + DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); + final Version version = randomVersion(random()); + 
builder.add(new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), version)); + builder.add(new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), randomCompatibleVersion(random(), version))); + DiscoveryNodes nodes = builder.build(); + + final Version maxNodeVersion = nodes.getMaxNodeVersion(); + final Version minNodeVersion = nodes.getMinNodeVersion(); + if (maxNodeVersion.onOrAfter(Version.V_6_0_0_alpha1)) { + final Version tooLow = getPreviousVersion(maxNodeVersion.minimumCompatibilityVersion()); + expectThrows(IllegalStateException.class, () -> { + if (randomBoolean()) { + MembershipAction.ensureNodesCompatibility(tooLow, nodes); + } else { + MembershipAction.ensureNodesCompatibility(tooLow, minNodeVersion, maxNodeVersion); + } + }); + } + + if (minNodeVersion.before(Version.V_5_5_0)) { + Version tooHigh = incompatibleFutureVersion(minNodeVersion); + expectThrows(IllegalStateException.class, () -> { + if (randomBoolean()) { + MembershipAction.ensureNodesCompatibility(tooHigh, nodes); + } else { + MembershipAction.ensureNodesCompatibility(tooHigh, minNodeVersion, maxNodeVersion); + } + }); + } + + final Version minGoodVersion = maxNodeVersion.major == minNodeVersion.major ? 
+ // we have to stick with the same major + minNodeVersion : + maxNodeVersion.minimumCompatibilityVersion(); + final Version justGood = randomVersionBetween(random(), minGoodVersion, maxCompatibleVersion(minNodeVersion)); + + if (randomBoolean()) { + MembershipAction.ensureNodesCompatibility(justGood, nodes); + } else { + MembershipAction.ensureNodesCompatibility(justGood, minNodeVersion, maxNodeVersion); + } + } + public void testSuccess() { Settings.builder().build(); MetaData.Builder metaBuilder = MetaData.builder(); diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java index c25152a4426..026d918802d 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -38,7 +39,6 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.MasterService; -import org.elasticsearch.cluster.service.MasterServiceTests; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -46,7 +46,6 @@ import org.elasticsearch.common.util.concurrent.BaseFuture; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ClusterServiceUtils; import 
org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -84,6 +83,8 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED; import static org.elasticsearch.cluster.routing.RoutingTableTests.updateActiveAllocations; import static org.elasticsearch.cluster.service.MasterServiceTests.discoveryState; +import static org.elasticsearch.test.VersionUtils.getPreviousVersion; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -566,7 +567,7 @@ public class NodeJoinControllerTests extends ESTestCase { randomBoolean() ? existing.getAddress() : buildNewFakeTransportAddress(), randomBoolean() ? existing.getAttributes() : Collections.singletonMap("attr", "other"), randomBoolean() ? existing.getRoles() : new HashSet<>(randomSubsetOf(Arrays.asList(DiscoveryNode.Role.values()))), - randomBoolean() ? 
existing.getVersion() : VersionUtils.randomVersion(random())); + existing.getVersion()); ExecutionException e = expectThrows(ExecutionException.class, () -> joinNode(other_node)); assertThat(e.getMessage(), containsString("found existing node")); @@ -585,6 +586,49 @@ public class NodeJoinControllerTests extends ESTestCase { assertThat(e.getMessage(), containsString("found existing node")); } + public void testRejectingJoinWithIncompatibleVersion() throws InterruptedException, ExecutionException { + addNodes(randomInt(5)); + discoveryState(masterService); + final DiscoveryNode badNode = new DiscoveryNode("badNode", buildNewFakeTransportAddress(), emptyMap(), + new HashSet<>(randomSubsetOf(Arrays.asList(DiscoveryNode.Role.values()))), + getPreviousVersion(Version.CURRENT.minimumCompatibilityVersion())); + + final DiscoveryNode goodNode = new DiscoveryNode("goodNode", buildNewFakeTransportAddress(), emptyMap(), + new HashSet<>(randomSubsetOf(Arrays.asList(DiscoveryNode.Role.values()))), Version.CURRENT); + + CountDownLatch latch = new CountDownLatch(1); + // block cluster state + masterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + latch.await(); + return currentState; + } + + @Override + public void onFailure(String source, Exception e) { + throw new AssertionError(e); + } + }); + + final SimpleFuture badJoin; + final SimpleFuture goodJoin; + if (randomBoolean()) { + badJoin = joinNodeAsync(badNode); + goodJoin = joinNodeAsync(goodNode); + } else { + goodJoin = joinNodeAsync(goodNode); + badJoin = joinNodeAsync(badNode); + } + assert goodJoin.isDone() == false; + assert badJoin.isDone() == false; + latch.countDown(); + goodJoin.get(); + ExecutionException e = expectThrows(ExecutionException.class, badJoin::get); + assertThat(e.getCause(), instanceOf(IllegalStateException.class)); + assertThat(e.getCause().getMessage(), allOf(containsString("node version"), 
containsString("not supported"))); + } + /** * Tests tha node can become a master, even though the last cluster state it knows contains * nodes that conflict with the joins it got and needs to become a master diff --git a/docs/reference/setup/rolling_upgrade.asciidoc b/docs/reference/setup/rolling_upgrade.asciidoc index a1dc5daf35e..1e5bd94cbfd 100644 --- a/docs/reference/setup/rolling_upgrade.asciidoc +++ b/docs/reference/setup/rolling_upgrade.asciidoc @@ -197,3 +197,21 @@ recovery has completed. When the cluster is stable and the node has recovered, repeat the above steps for all remaining nodes. -- + +[IMPORTANT] +==================================================== + +During a rolling upgrade the cluster will continue to operate as normal. Any +new functionality will be disabled or work in a backward compatible manner +until all nodes of the cluster have been upgraded. Once the upgrade is +completed and all nodes are on the new version, the new functionality will +become operational. Once that has happened, it is practically impossible to +go back to operating in a backward compatible mode. To protect against such a +scenario, nodes from the previous major version (e.g. 5.x) will not be allowed +to join a cluster where all nodes are of a higher major version (e.g. 6.x). + +In the unlikely case of a network malfunction during upgrades, where all +remaining old nodes are isolated from the cluster, you will have to take all +old nodes offline and upgrade them before they can rejoin the cluster. 
+ +==================================================== \ No newline at end of file diff --git a/test/framework/src/main/java/org/elasticsearch/test/VersionUtils.java b/test/framework/src/main/java/org/elasticsearch/test/VersionUtils.java index bdeeeff251e..8b2f51cf8a9 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/VersionUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/test/VersionUtils.java @@ -29,7 +29,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Random; +import java.util.stream.Collectors; import static java.util.Collections.singletonList; import static java.util.Collections.unmodifiableList; @@ -189,6 +191,12 @@ public class VersionUtils { return ALL_VERSIONS.get(random.nextInt(ALL_VERSIONS.size())); } + /** Returns a random {@link Version} from all available versions, that is compatible with the given version. */ + public static Version randomCompatibleVersion(Random random, Version version) { + final List<Version> compatible = ALL_VERSIONS.stream().filter(version::isCompatible).collect(Collectors.toList()); + return compatible.get(random.nextInt(compatible.size())); + } + /** Returns a random {@link Version} between minVersion and maxVersion (inclusive). 
*/ public static Version randomVersionBetween(Random random, @Nullable Version minVersion, @Nullable Version maxVersion) { int minVersionIndex = 0; @@ -211,4 +219,20 @@ public class VersionUtils { return ALL_VERSIONS.get(minVersionIndex + random.nextInt(range)); } } + + /** returns the first version, in ALL_VERSIONS order, that is after the given version and not compatible with it */ + public static Version incompatibleFutureVersion(Version version) { + final Optional<Version> opt = ALL_VERSIONS.stream().filter(version::before).filter(v -> v.isCompatible(version) == false).findFirst(); + assert opt.isPresent() : "no future incompatible version for " + version; + return opt.get(); + } + + /** Returns the maximum {@link Version} that is compatible with the given version. NOTE(review): takes the last match, so this presumably relies on ALL_VERSIONS being sorted in ascending order - confirm. */ + public static Version maxCompatibleVersion(Version version) { + final List<Version> compatible = ALL_VERSIONS.stream().filter(version::isCompatible).filter(version::onOrBefore) + .collect(Collectors.toList()); + assert compatible.size() > 0; + return compatible.get(compatible.size() - 1); + } + }