Revert "Validate a joining node's version with version of existing cluster nodes (#25770)"

This reverts commit 1e1f8e6376.
This commit is contained in:
Boaz Leskes 2017-07-19 17:34:53 +02:00
parent 4d78935df7
commit 9989ac69a4
9 changed files with 21 additions and 211 deletions

View File

@ -667,8 +667,10 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
ImmutableOpenMap.Builder<String, DiscoveryNode> dataNodesBuilder = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<String, DiscoveryNode> masterNodesBuilder = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<String, DiscoveryNode> ingestNodesBuilder = ImmutableOpenMap.builder();
Version minNodeVersion = null;
Version maxNodeVersion = null;
Version minNodeVersion = Version.CURRENT;
Version maxNodeVersion = Version.CURRENT;
// The node where we are building this on might not be a master or a data node, so we cannot assume
// that there is a node with the current version as a part of the cluster.
Version minNonClientNodeVersion = null;
Version maxNonClientNodeVersion = null;
for (ObjectObjectCursor<String, DiscoveryNode> nodeEntry : nodes) {
@ -678,29 +680,26 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
if (nodeEntry.value.isMasterNode()) {
masterNodesBuilder.put(nodeEntry.key, nodeEntry.value);
}
final Version version = nodeEntry.value.getVersion();
if (nodeEntry.value.isDataNode() || nodeEntry.value.isMasterNode()) {
if (minNonClientNodeVersion == null) {
minNonClientNodeVersion = version;
maxNonClientNodeVersion = version;
minNonClientNodeVersion = nodeEntry.value.getVersion();
maxNonClientNodeVersion = nodeEntry.value.getVersion();
} else {
minNonClientNodeVersion = Version.min(minNonClientNodeVersion, version);
maxNonClientNodeVersion = Version.max(maxNonClientNodeVersion, version);
minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion());
maxNonClientNodeVersion = Version.max(maxNonClientNodeVersion, nodeEntry.value.getVersion());
}
}
if (nodeEntry.value.isIngestNode()) {
ingestNodesBuilder.put(nodeEntry.key, nodeEntry.value);
}
minNodeVersion = minNodeVersion == null ? version : Version.min(minNodeVersion, version);
maxNodeVersion = maxNodeVersion == null ? version : Version.max(maxNodeVersion, version);
minNodeVersion = Version.min(minNodeVersion, nodeEntry.value.getVersion());
maxNodeVersion = Version.max(maxNodeVersion, nodeEntry.value.getVersion());
}
return new DiscoveryNodes(
nodes.build(), dataNodesBuilder.build(), masterNodesBuilder.build(), ingestNodesBuilder.build(),
masterNodeId, localNodeId, minNonClientNodeVersion == null ? Version.CURRENT : minNonClientNodeVersion,
maxNonClientNodeVersion == null ? Version.CURRENT : maxNonClientNodeVersion,
maxNodeVersion == null ? Version.CURRENT : maxNodeVersion,
minNodeVersion == null ? Version.CURRENT : minNodeVersion
maxNonClientNodeVersion == null ? Version.CURRENT : maxNonClientNodeVersion, maxNodeVersion, minNodeVersion
);
}

View File

@ -24,7 +24,6 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -179,7 +178,6 @@ public class MembershipAction extends AbstractComponent {
@Override
public void messageReceived(ValidateJoinRequest request, TransportChannel channel) throws Exception {
ensureNodesCompatibility(Version.CURRENT, request.state.getNodes());
ensureIndexCompatibility(Version.CURRENT, request.state.getMetaData());
// for now, the mere fact that we can serialize the cluster state acts as validation....
channel.sendResponse(TransportResponse.Empty.INSTANCE);
@ -209,31 +207,6 @@ public class MembershipAction extends AbstractComponent {
}
}
/** ensures that the joining node has a version that's compatible with all current nodes*/
static void ensureNodesCompatibility(final Version joiningNodeVersion, DiscoveryNodes currentNodes) {
final Version minNodeVersion = currentNodes.getMinNodeVersion();
final Version maxNodeVersion = currentNodes.getMaxNodeVersion();
ensureNodesCompatibility(joiningNodeVersion, minNodeVersion, maxNodeVersion);
}
/** ensures that the joining node has a version that's compatible with a given version range */
static void ensureNodesCompatibility(Version joiningNodeVersion, Version minClusterNodeVersion, Version maxClusterNodeVersion) {
assert minClusterNodeVersion.onOrBefore(maxClusterNodeVersion) : minClusterNodeVersion + " > " + maxClusterNodeVersion;
final byte clusterMajor = minClusterNodeVersion.major;
if (joiningNodeVersion.major < clusterMajor) {
throw new IllegalStateException("node version [" + joiningNodeVersion + "] is not supported. " +
"All nodes in the cluster are of a higher major [" + clusterMajor + "].");
}
if (joiningNodeVersion.isCompatible(maxClusterNodeVersion) == false) {
throw new IllegalStateException("node version [" + joiningNodeVersion + "] is not supported. " +
"The cluster contains nodes with version [" + maxClusterNodeVersion + "], which is incompatible.");
}
if (joiningNodeVersion.isCompatible(minClusterNodeVersion) == false) {
throw new IllegalStateException("node version [" + joiningNodeVersion + "] is not supported." +
"The cluster contains nodes with version [" + minClusterNodeVersion + "], which is incompatible.");
}
}
public static class LeaveRequest extends TransportRequest {
private DiscoveryNode node;

View File

@ -33,12 +33,12 @@ import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.discovery.DiscoverySettings;
import java.util.ArrayList;
@ -433,31 +433,28 @@ public class NodeJoinController extends AbstractComponent {
assert nodesBuilder.isLocalNodeElectedMaster();
Version minClusterNodeVersion = newState.nodes().getMinNodeVersion();
Version maxClusterNodeVersion = newState.nodes().getMaxNodeVersion();
Version minNodeVersion = Version.CURRENT;
// processing any joins
for (final DiscoveryNode node : joiningNodes) {
minNodeVersion = Version.min(minNodeVersion, node.getVersion());
if (node.equals(BECOME_MASTER_TASK) || node.equals(FINISH_ELECTION_TASK)) {
// noop
} else if (currentNodes.nodeExists(node)) {
logger.debug("received a join request for an existing node [{}]", node);
} else {
try {
MembershipAction.ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion);
// we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
// we have to reject nodes that don't support all indices we have in this cluster
MembershipAction.ensureIndexCompatibility(node.getVersion(), currentState.getMetaData());
nodesBuilder.add(node);
nodesChanged = true;
minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion());
} catch (IllegalArgumentException | IllegalStateException e) {
} catch (IllegalArgumentException e) {
results.failure(node, e);
continue;
}
}
results.success(node);
}
// we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
// we have to reject nodes that don't support all indices we have in this cluster
MembershipAction.ensureIndexCompatibility(minNodeVersion, currentState.getMetaData());
if (nodesChanged) {
newState.nodes(nodesBuilder);
return results.build(allocationService.reroute(newState.build(), "node_join"));

View File

@ -885,7 +885,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
} else {
// we do this in a couple of places including the cluster update thread. This one here is really just best effort
// to ensure we fail as fast as possible.
MembershipAction.ensureNodesCompatibility(node.getVersion(), state.getNodes());
MembershipAction.ensureIndexCompatibility(node.getVersion(), state.getMetaData());
// try and connect to the node, if it fails, we can raise an exception back to the client...
transportService.connectToNode(node);

View File

@ -27,18 +27,14 @@ import org.elasticsearch.test.VersionUtils;
import org.hamcrest.Matchers;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.Version.V_5_3_0;
import static org.elasticsearch.Version.V_6_0_0_beta1;
import static org.elasticsearch.test.VersionUtils.allVersions;
import static org.elasticsearch.test.VersionUtils.randomVersion;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.containsString;
@ -347,20 +343,6 @@ public class VersionTests extends ESTestCase {
isCompatible(VersionUtils.getPreviousMinorVersion(), Version.fromString("7.0.0")));
assertFalse(isCompatible(Version.V_5_0_0, Version.fromString("6.0.0")));
assertFalse(isCompatible(Version.V_5_0_0, Version.fromString("7.0.0")));
Version a = randomVersion(random());
Version b = randomVersion(random());
assertThat(a.isCompatible(b), equalTo(b.isCompatible(a)));
}
/* tests that if a new version's minCompatVersion is always equal or higher to any older version */
public void testMinCompatVersionOrderRespectsVersionOrder() {
List<Version> versionsByMinCompat = new ArrayList<>(allVersions());
versionsByMinCompat.sort(Comparator.comparing(Version::minimumCompatibilityVersion));
assertThat(versionsByMinCompat, equalTo(allVersions()));
versionsByMinCompat.sort(Comparator.comparing(Version::minimumIndexCompatibilityVersion));
assertThat(versionsByMinCompat, equalTo(allVersions()));
}

View File

@ -21,20 +21,10 @@ package org.elasticsearch.discovery.zen;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import static org.elasticsearch.test.VersionUtils.getPreviousVersion;
import static org.elasticsearch.test.VersionUtils.incompatibleFutureVersion;
import static org.elasticsearch.test.VersionUtils.maxCompatibleVersion;
import static org.elasticsearch.test.VersionUtils.randomCompatibleVersion;
import static org.elasticsearch.test.VersionUtils.randomVersion;
import static org.elasticsearch.test.VersionUtils.randomVersionBetween;
public class MembershipActionTests extends ESTestCase {
public void testPreventJoinClusterWithNewerIndices() {
@ -68,50 +58,6 @@ public class MembershipActionTests extends ESTestCase {
metaData));
}
public void testPreventJoinClusterWithUnsupportedNodeVersions() {
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
final Version version = randomVersion(random());
builder.add(new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), version));
builder.add(new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), randomCompatibleVersion(random(), version)));
DiscoveryNodes nodes = builder.build();
final Version maxNodeVersion = nodes.getMaxNodeVersion();
final Version minNodeVersion = nodes.getMinNodeVersion();
if (maxNodeVersion.onOrAfter(Version.V_6_0_0_alpha1)) {
final Version tooLow = getPreviousVersion(maxNodeVersion.minimumCompatibilityVersion());
expectThrows(IllegalStateException.class, () -> {
if (randomBoolean()) {
MembershipAction.ensureNodesCompatibility(tooLow, nodes);
} else {
MembershipAction.ensureNodesCompatibility(tooLow, minNodeVersion, maxNodeVersion);
}
});
}
if (minNodeVersion.before(Version.V_5_5_0)) {
Version tooHigh = incompatibleFutureVersion(minNodeVersion);
expectThrows(IllegalStateException.class, () -> {
if (randomBoolean()) {
MembershipAction.ensureNodesCompatibility(tooHigh, nodes);
} else {
MembershipAction.ensureNodesCompatibility(tooHigh, minNodeVersion, maxNodeVersion);
}
});
}
final Version minGoodVersion = maxNodeVersion.major == minNodeVersion.major ?
// we have to stick with the same major
minNodeVersion :
maxNodeVersion.minimumCompatibilityVersion();
final Version justGood = randomVersionBetween(random(), minGoodVersion, maxCompatibleVersion(minNodeVersion));
if (randomBoolean()) {
MembershipAction.ensureNodesCompatibility(justGood, nodes);
} else {
MembershipAction.ensureNodesCompatibility(justGood, minNodeVersion, maxNodeVersion);
}
}
public void testSuccess() {
Settings.builder().build();
MetaData.Builder metaBuilder = MetaData.builder();

View File

@ -24,7 +24,6 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -39,6 +38,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.cluster.service.MasterServiceTests;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@ -46,6 +46,7 @@ import org.elasticsearch.common.util.concurrent.BaseFuture;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
@ -83,8 +84,6 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
import static org.elasticsearch.cluster.routing.RoutingTableTests.updateActiveAllocations;
import static org.elasticsearch.cluster.service.MasterServiceTests.discoveryState;
import static org.elasticsearch.test.VersionUtils.getPreviousVersion;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
@ -567,7 +566,7 @@ public class NodeJoinControllerTests extends ESTestCase {
randomBoolean() ? existing.getAddress() : buildNewFakeTransportAddress(),
randomBoolean() ? existing.getAttributes() : Collections.singletonMap("attr", "other"),
randomBoolean() ? existing.getRoles() : new HashSet<>(randomSubsetOf(Arrays.asList(DiscoveryNode.Role.values()))),
existing.getVersion());
randomBoolean() ? existing.getVersion() : VersionUtils.randomVersion(random()));
ExecutionException e = expectThrows(ExecutionException.class, () -> joinNode(other_node));
assertThat(e.getMessage(), containsString("found existing node"));
@ -586,49 +585,6 @@ public class NodeJoinControllerTests extends ESTestCase {
assertThat(e.getMessage(), containsString("found existing node"));
}
public void testRejectingJoinWithIncompatibleVersion() throws InterruptedException, ExecutionException {
addNodes(randomInt(5));
discoveryState(masterService);
final DiscoveryNode badNode = new DiscoveryNode("badNode", buildNewFakeTransportAddress(), emptyMap(),
new HashSet<>(randomSubsetOf(Arrays.asList(DiscoveryNode.Role.values()))),
getPreviousVersion(Version.CURRENT.minimumCompatibilityVersion()));
final DiscoveryNode goodNode = new DiscoveryNode("goodNode", buildNewFakeTransportAddress(), emptyMap(),
new HashSet<>(randomSubsetOf(Arrays.asList(DiscoveryNode.Role.values()))), Version.CURRENT);
CountDownLatch latch = new CountDownLatch(1);
// block cluster state
masterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
latch.await();
return currentState;
}
@Override
public void onFailure(String source, Exception e) {
throw new AssertionError(e);
}
});
final SimpleFuture badJoin;
final SimpleFuture goodJoin;
if (randomBoolean()) {
badJoin = joinNodeAsync(badNode);
goodJoin = joinNodeAsync(goodNode);
} else {
goodJoin = joinNodeAsync(goodNode);
badJoin = joinNodeAsync(badNode);
}
assert goodJoin.isDone() == false;
assert badJoin.isDone() == false;
latch.countDown();
goodJoin.get();
ExecutionException e = expectThrows(ExecutionException.class, badJoin::get);
assertThat(e.getCause(), instanceOf(IllegalStateException.class));
assertThat(e.getCause().getMessage(), allOf(containsString("node version"), containsString("not supported")));
}
/**
* Tests tha node can become a master, even though the last cluster state it knows contains
* nodes that conflict with the joins it got and needs to become a master

View File

@ -197,21 +197,3 @@ recovery has completed.
When the cluster is stable and the node has recovered, repeat the above steps
for all remaining nodes.
--
[IMPORTANT]
====================================================
During a rolling upgrade the cluster will continue to operate as normal. Any
new functionality will be disabled or work in a backward compatible manner
until all nodes of the cluster have been upgraded. Once the upgrade is
completed and all nodes are on the new version, the new functionality will
become operational. Once that has happened, it is practically impossible to
go back to operating in a backward compatible mode. To protect against such a
scenario, nodes from the previous major version (e.g. 5.x) will not be allowed
to join a cluster where all nodes are of a higher major version (e.g. 6.x).
In the unlikely case of a network malfunction during upgrades, where all
remaining old nodes are isolated from the cluster, you will have to take all
old nodes offline and upgrade them before they can rejoin the cluster.
====================================================

View File

@ -29,9 +29,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.stream.Collectors;
import static java.util.Collections.singletonList;
import static java.util.Collections.unmodifiableList;
@ -191,12 +189,6 @@ public class VersionUtils {
return ALL_VERSIONS.get(random.nextInt(ALL_VERSIONS.size()));
}
/** Returns a random {@link Version} from all available versions, that is compatible with the given version. */
public static Version randomCompatibleVersion(Random random, Version version) {
final List<Version> compatible = ALL_VERSIONS.stream().filter(version::isCompatible).collect(Collectors.toList());
return compatible.get(random.nextInt(compatible.size()));
}
/** Returns a random {@link Version} between <code>minVersion</code> and <code>maxVersion</code> (inclusive). */
public static Version randomVersionBetween(Random random, @Nullable Version minVersion, @Nullable Version maxVersion) {
int minVersionIndex = 0;
@ -219,20 +211,4 @@ public class VersionUtils {
return ALL_VERSIONS.get(minVersionIndex + random.nextInt(range));
}
}
/** returns the first future incompatible version */
public static Version incompatibleFutureVersion(Version version) {
final Optional<Version> opt = ALL_VERSIONS.stream().filter(version::before).filter(v -> v.isCompatible(version) == false).findAny();
assert opt.isPresent() : "no future incompatible version for " + version;
return opt.get();
}
/** Returns the maximum {@link Version} that is compatible with the given version. */
public static Version maxCompatibleVersion(Version version) {
final List<Version> compatible = ALL_VERSIONS.stream().filter(version::isCompatible).filter(version::onOrBefore)
.collect(Collectors.toList());
assert compatible.size() > 0;
return compatible.get(compatible.size() - 1);
}
}