From 9f86e996fecd8a6e6aa4a2b3c5e2580e1c3171db Mon Sep 17 00:00:00 2001 From: David Turner Date: Sat, 8 Dec 2018 07:33:35 +0000 Subject: [PATCH] [Zen2] Support rolling upgrades from Zen1 (#35737) We support rolling upgrades from Zen1 by keeping the master as a Zen1 node until there are no more Zen1 nodes in the cluster, using the following principles: - Zen1 nodes will never vote for Zen2 nodes - Zen2 nodes will, while not bootstrapped, vote for Zen1 nodes - Zen2 nodes that were previously part of a mixed cluster will automatically (and unsafely) bootstrap themselves when the last Zen1 node leaves. --- .../coordination/CoordinationState.java | 16 +- .../cluster/coordination/Coordinator.java | 90 ++++- .../coordination/DiscoveryUpgradeService.java | 320 ++++++++++++++++++ .../cluster/coordination/JoinHelper.java | 7 + .../coordination/JoinTaskExecutor.java | 2 +- .../coordination/PreVoteCollector.java | 2 + .../cluster/coordination/Reconfigurator.java | 1 + .../common/settings/ClusterSettings.java | 5 +- .../elasticsearch/discovery/PeerFinder.java | 27 +- .../elasticsearch/discovery/zen/ZenPing.java | 20 +- .../DiscoveryUpgradeServiceTests.java | 38 +++ .../cluster/coordination/Zen1IT.java | 185 +++++++++- .../test/InternalTestCluster.java | 4 +- 13 files changed, 690 insertions(+), 27 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeService.java create mode 100644 server/src/test/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeServiceTests.java diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java index b145e688272..4cea05726b0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java @@ -32,6 +32,8 @@ import java.util.HashMap; import 
java.util.Map; import java.util.Optional; +import static org.elasticsearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM; + /** * The core class of the cluster state coordination algorithm, directly implementing the * formal model @@ -321,10 +323,16 @@ public class CoordinationState { getCurrentTerm()); } if (clusterState.term() == getLastAcceptedTerm() && clusterState.version() <= getLastAcceptedVersion()) { - logger.debug("handlePublishRequest: ignored publish request due to version mismatch (expected: >[{}], actual: [{}])", - getLastAcceptedVersion(), clusterState.version()); - throw new CoordinationStateRejectedException("incoming version " + clusterState.version() + - " lower or equal to current version " + getLastAcceptedVersion()); + if (clusterState.term() == ZEN1_BWC_TERM + && clusterState.nodes().getMasterNode().equals(getLastAcceptedState().nodes().getMasterNode()) == false) { + logger.debug("handling publish request in compatibility mode despite version mismatch (expected: >[{}], actual: [{}])", + getLastAcceptedVersion(), clusterState.version()); + } else { + logger.debug("handlePublishRequest: ignored publish request due to version mismatch (expected: >[{}], actual: [{}])", + getLastAcceptedVersion(), clusterState.version()); + throw new CoordinationStateRejectedException("incoming version " + clusterState.version() + + " lower or equal to current version " + getLastAcceptedVersion()); + } } logger.trace("handlePublishRequest: accepting publish request for version [{}] and term [{}]", diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index ef4fd454a11..ee6d565b97c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -44,6 +44,7 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import 
org.elasticsearch.cluster.service.ClusterApplier; import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener; import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; @@ -86,6 +87,9 @@ import static org.elasticsearch.gateway.ClusterStateUpdaters.hideStateIfNotRecov import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; public class Coordinator extends AbstractLifecycleComponent implements Discovery { + + public static final long ZEN1_BWC_TERM = 0; + private static final Logger logger = LogManager.getLogger(Coordinator.class); // the timeout for the publication of each value @@ -121,6 +125,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private long maxTermSeen; private final Reconfigurator reconfigurator; private final ClusterBootstrapService clusterBootstrapService; + private final DiscoveryUpgradeService discoveryUpgradeService; private final LagDetector lagDetector; private final ClusterFormationFailureHelper clusterFormationFailureHelper; @@ -161,6 +166,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery masterService.setClusterStateSupplier(this::getStateForMasterService); this.reconfigurator = new Reconfigurator(settings, clusterSettings); this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService); + this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, clusterSettings, transportService, this::isBootstrapped, + joinHelper, peerFinder::getFoundPeers, this::unsafelySetConfigurationForUpgrade); this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"), transportService::getLocalNode); this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, 
this::getClusterFormationState, @@ -256,6 +263,14 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery throw new CoordinationStateRejectedException("no longer leading this publication's term: " + publishRequest); } + if (publishRequest.getAcceptedState().term() == ZEN1_BWC_TERM && getCurrentTerm() == ZEN1_BWC_TERM + && mode == Mode.FOLLOWER && Optional.of(sourceNode).equals(lastKnownLeader) == false) { + + logger.debug("received cluster state from {} but currently following {}, rejecting", sourceNode, lastKnownLeader); + throw new CoordinationStateRejectedException("received cluster state from " + sourceNode + " but currently following " + + lastKnownLeader + ", rejecting"); + } + ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term()); final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest); @@ -323,7 +338,11 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery final StartJoinRequest startJoinRequest = new StartJoinRequest(getLocalNode(), Math.max(getCurrentTerm(), maxTermSeen) + 1); logger.debug("starting election with {}", startJoinRequest); - getDiscoveredNodes().forEach(node -> joinHelper.sendStartJoinRequest(startJoinRequest, node)); + getDiscoveredNodes().forEach(node -> { + if (isZen1Node(node) == false) { + joinHelper.sendStartJoinRequest(startJoinRequest, node); + } + }); } } } @@ -384,6 +403,11 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery peerFinder.activate(coordinationState.get().getLastAcceptedState().nodes()); clusterFormationFailureHelper.start(); + + if (getCurrentTerm() == ZEN1_BWC_TERM) { + discoveryUpgradeService.activate(lastKnownLeader); + } + leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES); leaderChecker.updateLeader(null); @@ -414,6 +438,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery lastKnownLeader = Optional.of(getLocalNode()); 
peerFinder.deactivate(getLocalNode()); + discoveryUpgradeService.deactivate(); clusterFormationFailureHelper.stop(); closePrevotingAndElectionScheduler(); preVoteCollector.update(getPreVoteResponse(), getLocalNode()); @@ -439,6 +464,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery lastKnownLeader = Optional.of(leaderNode); peerFinder.deactivate(leaderNode); + discoveryUpgradeService.deactivate(); clusterFormationFailureHelper.stop(); closePrevotingAndElectionScheduler(); cancelActivePublication(); @@ -647,9 +673,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery return false; } - assert currentState.term() == 0 : currentState; - assert currentState.version() == 0 : currentState; - if (mode != Mode.CANDIDATE) { throw new CoordinationStateRejectedException("Cannot set initial configuration in mode " + mode); } @@ -681,12 +704,59 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery } } + private boolean isBootstrapped() { + return getLastAcceptedState().getLastAcceptedConfiguration().isEmpty() == false; + } + + private void unsafelySetConfigurationForUpgrade(VotingConfiguration votingConfiguration) { + assert Version.CURRENT.major == Version.V_6_6_0.major + 1 : "remove this method once unsafe upgrades are no longer needed"; + synchronized (mutex) { + if (mode != Mode.CANDIDATE) { + throw new IllegalStateException("Cannot overwrite configuration in mode " + mode); + } + + if (isBootstrapped()) { + throw new IllegalStateException("Cannot overwrite configuration: configuration is already set to " + + getLastAcceptedState().getLastAcceptedConfiguration()); + } + + if (lastKnownLeader.map(Coordinator::isZen1Node).orElse(false) == false) { + throw new IllegalStateException("Cannot upgrade from last-known leader: " + lastKnownLeader); + } + + if (getCurrentTerm() != ZEN1_BWC_TERM) { + throw new IllegalStateException("Cannot upgrade, term is " + getCurrentTerm()); + } + + 
logger.info("automatically bootstrapping during rolling upgrade, using initial configuration {}", votingConfiguration); + + final ClusterState currentState = getStateForMasterService(); + final Builder builder = masterService.incrementVersion(currentState); + builder.metaData(MetaData.builder(currentState.metaData()).coordinationMetaData( + CoordinationMetaData.builder(currentState.metaData().coordinationMetaData()) + .term(1) + .lastAcceptedConfiguration(votingConfiguration) + .lastCommittedConfiguration(votingConfiguration) + .build())); + final ClusterState newClusterState = builder.build(); + + coordinationState.get().handleStartJoin(new StartJoinRequest(getLocalNode(), newClusterState.term())); + coordinationState.get().handlePublishRequest(new PublishRequest(newClusterState)); + + followersChecker.clearCurrentNodes(); + followersChecker.updateFastResponseState(getCurrentTerm(), mode); + + peerFinder.deactivate(getLocalNode()); + peerFinder.activate(newClusterState.nodes()); + } + } + // Package-private for testing ClusterState improveConfiguration(ClusterState clusterState) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; final Set liveNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false) - .filter(this::hasJoinVoteFrom).collect(Collectors.toSet()); + .filter(this::hasJoinVoteFrom).filter(discoveryNode -> isZen1Node(discoveryNode) == false).collect(Collectors.toSet()); final VotingConfiguration newConfig = reconfigurator.reconfigure(liveNodes, clusterState.getVotingConfigExclusions().stream().map(VotingConfigExclusion::getNodeId).collect(Collectors.toSet()), clusterState.getLastAcceptedConfiguration()); @@ -967,7 +1037,9 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery prevotingRound.close(); } final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState(); - prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes()); + final List discoveredNodes 
+ = getDiscoveredNodes().stream().filter(n -> isZen1Node(n) == false).collect(Collectors.toList()); + prevotingRound = preVoteCollector.start(lastAcceptedState, discoveredNodes); } } } @@ -1176,13 +1248,13 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery } // TODO: only here temporarily for BWC development, remove once complete - public static Settings.Builder addZen1Attribute(Settings.Builder builder) { - return builder.put("node.attr.zen1", true); + public static Settings.Builder addZen1Attribute(boolean isZen1Node, Settings.Builder builder) { + return builder.put("node.attr.zen1", isZen1Node); } // TODO: only here temporarily for BWC development, remove once complete public static boolean isZen1Node(DiscoveryNode discoveryNode) { return discoveryNode.getVersion().before(Version.V_7_0_0) || - discoveryNode.getAttributes().containsKey("zen1"); + (Booleans.isTrue(discoveryNode.getAttributes().getOrDefault("zen1", "false"))); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeService.java new file mode 100644 index 00000000000..52fb25dd8af --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeService.java @@ -0,0 +1,320 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.discovery.zen.ElectMasterService; +import org.elasticsearch.discovery.zen.ElectMasterService.MasterCandidate; +import org.elasticsearch.discovery.zen.UnicastZenPing; +import org.elasticsearch.discovery.zen.UnicastZenPing.UnicastPingRequest; +import org.elasticsearch.discovery.zen.UnicastZenPing.UnicastPingResponse; +import org.elasticsearch.discovery.zen.ZenPing.PingResponse; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportResponseHandler; +import 
org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Optional; +import java.util.Set; +import java.util.function.BooleanSupplier; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static java.lang.Math.max; +import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING; +import static org.elasticsearch.cluster.ClusterState.UNKNOWN_VERSION; +import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet; +import static org.elasticsearch.discovery.zen.ZenDiscovery.PING_TIMEOUT_SETTING; + +/** + * Deals with rolling upgrades of the cluster coordination layer. In mixed clusters we prefer to elect the older nodes, but + * when the last old node shuts down then as long as there are enough new nodes we can assume that they form the whole cluster and + * define them as the initial configuration. + */ +public class DiscoveryUpgradeService { + + private static Logger logger = LogManager.getLogger(DiscoveryUpgradeService.class); + + // how long to wait after activation before attempting to join a master or perform a bootstrap upgrade + public static final Setting BWC_PING_TIMEOUT_SETTING = + Setting.timeSetting("discovery.zen.bwc_ping_timeout", + PING_TIMEOUT_SETTING, TimeValue.timeValueMillis(1), Setting.Property.NodeScope); + + // whether to try and bootstrap all the discovered Zen2 nodes when the last Zen1 node leaves the cluster. 
+ public static final Setting ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING = + Setting.boolSetting("discovery.zen.unsafe_rolling_upgrades_enabled", true, Setting.Property.NodeScope); + + private final ElectMasterService electMasterService; + private final TransportService transportService; + private final BooleanSupplier isBootstrappedSupplier; + private final JoinHelper joinHelper; + private final Supplier> peersSupplier; + private final Consumer initialConfigurationConsumer; + private final TimeValue bwcPingTimeout; + private final boolean enableUnsafeBootstrappingOnUpgrade; + private final ClusterName clusterName; + + @Nullable // null if no active joining round + private volatile JoiningRound joiningRound; + + public DiscoveryUpgradeService(Settings settings, ClusterSettings clusterSettings, TransportService transportService, + BooleanSupplier isBootstrappedSupplier, JoinHelper joinHelper, + Supplier> peersSupplier, + Consumer initialConfigurationConsumer) { + assert Version.CURRENT.major == Version.V_6_6_0.major + 1 : "remove this service once unsafe upgrades are no longer needed"; + electMasterService = new ElectMasterService(settings); + this.transportService = transportService; + this.isBootstrappedSupplier = isBootstrappedSupplier; + this.joinHelper = joinHelper; + this.peersSupplier = peersSupplier; + this.initialConfigurationConsumer = initialConfigurationConsumer; + this.bwcPingTimeout = BWC_PING_TIMEOUT_SETTING.get(settings); + this.enableUnsafeBootstrappingOnUpgrade = ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING.get(settings); + this.clusterName = CLUSTER_NAME_SETTING.get(settings); + + clusterSettings.addSettingsUpdateConsumer(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING, + electMasterService::minimumMasterNodes); // TODO reject update if the new value is too large + } + + public void activate(Optional lastKnownLeader) { + // called under coordinator mutex + + if (isBootstrappedSupplier.getAsBoolean()) { + return; + } + + assert 
lastKnownLeader.isPresent() == false || Coordinator.isZen1Node(lastKnownLeader.get()) : lastKnownLeader; + // if there was a leader and it's not an old node then we must have been bootstrapped + + assert joiningRound == null : joiningRound; + joiningRound = new JoiningRound(lastKnownLeader.isPresent()); + joiningRound.scheduleNextAttempt(); + } + + public void deactivate() { + // called under coordinator mutex + joiningRound = null; + } + + /** + * Waits for some number of calls to {@link ListenableCountDown#countDown()} and then notifies a listener. The listener + * is only ever notified once, whether successful or not. + */ + private static class ListenableCountDown { + private final CountDown countDown; + private final ActionListener listener; + + ListenableCountDown(int count, ActionListener listener) { + this.countDown = new CountDown(count); + this.listener = listener; + } + + void onFailure(Exception e) { + if (countDown.fastForward()) { + listener.onFailure(e); + } + } + + void countDown() { + if (countDown.countDown()) { + listener.onResponse(null); + } + } + } + + private class JoiningRound { + private final boolean upgrading; + + JoiningRound(boolean upgrading) { + this.upgrading = upgrading; + } + + private boolean isRunning() { + return joiningRound == this && isBootstrappedSupplier.getAsBoolean() == false; + } + + void scheduleNextAttempt() { + if (isRunning() == false) { + return; + } + + final ThreadPool threadPool = transportService.getThreadPool(); + threadPool.scheduleUnlessShuttingDown(bwcPingTimeout, Names.SAME, new Runnable() { + + @Override + public void run() { + if (isRunning() == false) { + return; + } + + final Set discoveryNodes = Stream.concat(StreamSupport.stream(peersSupplier.get().spliterator(), false), + Stream.of(transportService.getLocalNode())).filter(DiscoveryNode::isMasterNode).collect(Collectors.toSet()); + + // this set of nodes is reasonably fresh - the PeerFinder cleans up nodes to which the transport service is not + // 
connected each time it wakes up (every second by default) + + logger.debug("nodes: {}", discoveryNodes); + + if (electMasterService.hasEnoughMasterNodes(discoveryNodes)) { + if (discoveryNodes.stream().anyMatch(Coordinator::isZen1Node)) { + electBestOldMaster(discoveryNodes); + } else if (upgrading && enableUnsafeBootstrappingOnUpgrade) { + // no Zen1 nodes found, but the last-known master was a Zen1 node, so this is a rolling upgrade + transportService.getThreadPool().generic().execute(() -> { + try { + initialConfigurationConsumer.accept(new VotingConfiguration(discoveryNodes.stream() + .map(DiscoveryNode::getId).collect(Collectors.toSet()))); + } catch (Exception e) { + logger.debug("exception during bootstrapping upgrade, retrying", e); + } finally { + scheduleNextAttempt(); + } + }); + } else { + scheduleNextAttempt(); + } + } else { + scheduleNextAttempt(); + } + } + + /** + * Ping all the old discovered masters one more time to obtain their cluster state versions, and then vote for the best one. + * @param discoveryNodes The master nodes (old and new). + */ + private void electBestOldMaster(Set discoveryNodes) { + final Set masterCandidates = newConcurrentSet(); + final ListenableCountDown listenableCountDown + = new ListenableCountDown(discoveryNodes.size(), new ActionListener() { + + @Override + public void onResponse(Void value) { + assert masterCandidates.size() == discoveryNodes.size() + : masterCandidates + " does not match " + discoveryNodes; + + // TODO we shouldn't elect a master with a version that's older than ours + // If the only Zen1 nodes left are stale, and we can bootstrap, maybe we should bootstrap? + // Do we ever need to elect a freshly-started Zen1 node? 
+ if (isRunning()) { + final MasterCandidate electedMaster = electMasterService.electMaster(masterCandidates); + logger.debug("elected {}, sending join", electedMaster); + joinHelper.sendJoinRequest(electedMaster.getNode(), Optional.empty(), + JoiningRound.this::scheduleNextAttempt); + } + } + + @Override + public void onFailure(Exception e) { + scheduleNextAttempt(); + } + }); + + boolean foundOldMaster = false; + for (final DiscoveryNode discoveryNode : discoveryNodes) { + assert discoveryNode.isMasterNode() : discoveryNode; + if (Coordinator.isZen1Node(discoveryNode)) { + foundOldMaster = true; + transportService.sendRequest(discoveryNode, UnicastZenPing.ACTION_NAME, + new UnicastPingRequest(0, TimeValue.ZERO, + new PingResponse(createDiscoveryNodeWithImpossiblyHighId(transportService.getLocalNode()), + null, clusterName, UNKNOWN_VERSION)), + TransportRequestOptions.builder().withTimeout(bwcPingTimeout).build(), + new TransportResponseHandler() { + @Override + public void handleResponse(UnicastPingResponse response) { + long clusterStateVersion = UNKNOWN_VERSION; + for (PingResponse pingResponse : response.pingResponses) { + if (discoveryNode.equals(pingResponse.node())) { + clusterStateVersion + = max(clusterStateVersion, pingResponse.getClusterStateVersion()); + } + } + masterCandidates.add(new MasterCandidate(discoveryNode, clusterStateVersion)); + listenableCountDown.countDown(); + } + + @Override + public void handleException(TransportException exp) { + logger.debug( + new ParameterizedMessage("unexpected exception when pinging {}", discoveryNode), exp); + listenableCountDown.onFailure(exp); + } + + @Override + public String executor() { + return Names.SAME; + } + + @Override + public UnicastPingResponse read(StreamInput in) throws IOException { + return new UnicastPingResponse(in); + } + }); + + } else { + masterCandidates.add( + new MasterCandidate(createDiscoveryNodeWithImpossiblyHighId(discoveryNode), UNKNOWN_VERSION)); + 
listenableCountDown.countDown(); + } + } + assert foundOldMaster; + } + + @Override + public String toString() { + return "discovery upgrade service retry"; + } + }); + } + } + + /** + * Pre-7.0 nodes select the best master by comparing their IDs (as strings) and selecting the lowest one amongst those nodes with + * the best cluster state version. We want 7.0+ nodes to participate in these elections in a mixed cluster but never to win one, so + * we lie and claim to have an impossible ID that compares above all genuine IDs. + */ + public static DiscoveryNode createDiscoveryNodeWithImpossiblyHighId(DiscoveryNode node) { + // IDs are base-64-encoded UUIDs, which means they use the character set [0-9A-Za-z_-]. The highest character in this set is 'z', and + // 'z' < '{', so by starting the ID with '{' we can be sure it's greater. This is terrible. + final String fakeId = "{zen2}" + node.getId(); + assert node.getId().compareTo(fakeId) < 0 : node + " vs " + fakeId; + return new DiscoveryNode(node.getName(), fakeId, node.getEphemeralId(), node.getHostName(), + node.getHostAddress(), node.getAddress(), node.getAttributes(), node.getRoles(), node.getVersion()); + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index 14c7c47f4c5..f00be2259a4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -166,6 +166,11 @@ public class JoinHelper { } public void sendJoinRequest(DiscoveryNode destination, Optional optionalJoin) { + sendJoinRequest(destination, optionalJoin, () -> { + }); + } + + public void sendJoinRequest(DiscoveryNode destination, Optional optionalJoin, Runnable onCompletion) { assert destination.isMasterNode() : "trying to join master-ineligible " + destination; final JoinRequest joinRequest = new
JoinRequest(transportService.getLocalNode(), optionalJoin); final Tuple dedupKey = Tuple.tuple(destination, joinRequest); @@ -192,12 +197,14 @@ public class JoinHelper { public void handleResponse(Empty response) { pendingOutgoingJoins.remove(dedupKey); logger.debug("successfully joined {} with {}", destination, joinRequest); + onCompletion.run(); } @Override public void handleException(TransportException exp) { pendingOutgoingJoins.remove(dedupKey); logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp); + onCompletion.run(); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java index a4580c062b6..9544cf15a0c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java @@ -151,7 +151,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor joiningNodes) { + protected ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState currentState, List joiningNodes) { assert currentState.nodes().getMasterNodeId() == null : currentState; DiscoveryNodes currentNodes = currentState.nodes(); DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentNodes); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java index 5001e3be181..6137f5a6d0f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java @@ -38,6 +38,7 @@ import java.io.IOException; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.LongConsumer; +import java.util.stream.StreamSupport; 
import static org.elasticsearch.cluster.coordination.CoordinationState.isElectionQuorum; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet; @@ -140,6 +141,7 @@ public class PreVoteCollector { } void start(final Iterable broadcastNodes) { + assert StreamSupport.stream(broadcastNodes.spliterator(), false).noneMatch(Coordinator::isZen1Node) : broadcastNodes; logger.debug("{} requesting pre-votes from {}", this, broadcastNodes); broadcastNodes.forEach(n -> transportService.sendRequest(n, REQUEST_PRE_VOTE_ACTION_NAME, preVoteRequest, new TransportResponseHandler() { diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java index 8b227c0b261..5c7b9562d8d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java @@ -94,6 +94,7 @@ public class Reconfigurator { * @return An optimal configuration, or leave the current configuration unchanged if the optimal configuration has no live quorum. 
*/ public VotingConfiguration reconfigure(Set liveNodes, Set retiredNodeIds, VotingConfiguration currentConfig) { + assert liveNodes.stream().noneMatch(Coordinator::isZen1Node) : liveNodes; logger.trace("{} reconfiguring {} based on liveNodes={}, retiredNodeIds={}", this, currentConfig, liveNodes, retiredNodeIds); /* diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 7775f1de1a2..f150650948d 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -35,6 +35,7 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.coordination.ClusterBootstrapService; import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper; import org.elasticsearch.cluster.coordination.Coordinator; +import org.elasticsearch.cluster.coordination.DiscoveryUpgradeService; import org.elasticsearch.cluster.coordination.ElectionSchedulerFactory; import org.elasticsearch.cluster.coordination.FollowersChecker; import org.elasticsearch.cluster.coordination.JoinHelper; @@ -476,7 +477,9 @@ public final class ClusterSettings extends AbstractScopedSettings { ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING, ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING, ClusterBootstrapService.UNCONFIGURED_BOOTSTRAP_TIMEOUT_SETTING, - LagDetector.CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING + LagDetector.CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING, + DiscoveryUpgradeService.BWC_PING_TIMEOUT_SETTING, + DiscoveryUpgradeService.ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING ))); public static List> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList( diff --git a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java index 
36090f645f6..61d2e031977 100644 --- a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java +++ b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java @@ -27,7 +27,6 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.coordination.Coordinator; import org.elasticsearch.cluster.coordination.PeersResponse; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -41,6 +40,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.discovery.zen.UnicastZenPing; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.discovery.zen.ZenPing; +import org.elasticsearch.discovery.zen.ZenPing.PingResponse; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.TransportChannel; @@ -62,6 +62,8 @@ import java.util.Optional; import java.util.function.Consumer; import java.util.stream.Collectors; +import static org.elasticsearch.cluster.coordination.Coordinator.isZen1Node; +import static org.elasticsearch.cluster.coordination.DiscoveryUpgradeService.createDiscoveryNodeWithImpossiblyHighId; import static java.util.Collections.emptyList; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; @@ -412,6 +414,11 @@ public abstract class PeerFinder { final DiscoveryNode discoveryNode = getDiscoveryNode(); assert discoveryNode != null : "cannot request peers without first connecting"; + if (discoveryNode.equals(getLocalNode())) { + logger.trace("{} not requesting peers from local node", this); + return; + } + logger.trace("{} requesting peers", this); peersRequestInFlight = true; @@ -459,11 +466,11 @@ public abstract class PeerFinder { final String actionName; final TransportRequest 
transportRequest; final TransportResponseHandler transportResponseHandler; - if (Coordinator.isZen1Node(discoveryNode)) { + if (isZen1Node(discoveryNode)) { actionName = UnicastZenPing.ACTION_NAME; transportRequest = new UnicastZenPing.UnicastPingRequest(1, ZenDiscovery.PING_TIMEOUT_SETTING.get(settings), - new ZenPing.PingResponse(getLocalNode(), null, ClusterName.CLUSTER_NAME_SETTING.get(settings), - ClusterState.UNKNOWN_VERSION)); + new ZenPing.PingResponse(createDiscoveryNodeWithImpossiblyHighId(getLocalNode()), null, + ClusterName.CLUSTER_NAME_SETTING.get(settings), ClusterState.UNKNOWN_VERSION)); transportResponseHandler = peersResponseHandler.wrap(ucResponse -> { Optional optionalMasterNode = Arrays.stream(ucResponse.pingResponses) .filter(pr -> discoveryNode.equals(pr.node()) && discoveryNode.equals(pr.master())) @@ -471,9 +478,9 @@ public abstract class PeerFinder { .findFirst(); List discoveredNodes = new ArrayList<>(); if (optionalMasterNode.isPresent() == false) { - Arrays.stream(ucResponse.pingResponses).map(pr -> pr.master()).filter(Objects::nonNull) + Arrays.stream(ucResponse.pingResponses).map(PingResponse::master).filter(Objects::nonNull) .forEach(discoveredNodes::add); - Arrays.stream(ucResponse.pingResponses).map(pr -> pr.node()).forEach(discoveredNodes::add); + Arrays.stream(ucResponse.pingResponses).map(PingResponse::node).forEach(discoveredNodes::add); } return new PeersResponse(optionalMasterNode, discoveredNodes, 0L); }, UnicastZenPing.UnicastPingResponse::new); @@ -506,10 +513,12 @@ public abstract class PeerFinder { final PeersResponse peersResponse = handlePeersRequest(peersRequest); final List pingResponses = new ArrayList<>(); final ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); - pingResponses.add(new ZenPing.PingResponse(transportService.getLocalNode(), peersResponse.getMasterNode().orElse(null), - clusterName, 0L)); + pingResponses.add(new 
ZenPing.PingResponse(createDiscoveryNodeWithImpossiblyHighId(transportService.getLocalNode()), + peersResponse.getMasterNode().orElse(null), + clusterName, ClusterState.UNKNOWN_VERSION)); peersResponse.getKnownPeers().forEach(dn -> pingResponses.add( - new ZenPing.PingResponse(dn, null, clusterName, ClusterState.UNKNOWN_VERSION))); + new ZenPing.PingResponse(ZenPing.PingResponse.FAKE_PING_ID, + isZen1Node(dn) ? dn : createDiscoveryNodeWithImpossiblyHighId(dn), null, clusterName, ClusterState.UNKNOWN_VERSION))); channel.sendResponse(new UnicastZenPing.UnicastPingResponse(request.id, pingResponses.toArray(new ZenPing.PingResponse[0]))); } } diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java b/server/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java index af5a3d285a3..eddcf3de6e1 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java @@ -46,6 +46,12 @@ public interface ZenPing extends Releasable { class PingResponse implements Writeable { + /** + * An ID of a ping response that was generated on behalf of another node. Needs to be less than all other ping IDs so that fake ping + * responses don't override real ones. + */ + public static long FAKE_PING_ID = -1; + private static final AtomicLong idGenerator = new AtomicLong(); // an always increasing unique identifier for this ping response. 
@@ -68,7 +74,19 @@ public interface ZenPing extends Releasable { * ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION} for not recovered) */ public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, long clusterStateVersion) { - this.id = idGenerator.incrementAndGet(); + this(idGenerator.incrementAndGet(), node, master, clusterName, clusterStateVersion); + } + + /** + * @param id the ping's ID + * @param node the node which this ping describes + * @param master the current master of the node + * @param clusterName the cluster name of the node + * @param clusterStateVersion the current cluster state version of that node +* ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION} for not recovered) + */ + public PingResponse(long id, DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, long clusterStateVersion) { + this.id = id; this.node = node; this.master = master; this.clusterName = clusterName; diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeServiceTests.java new file mode 100644 index 00000000000..ed29e235bc8 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeServiceTests.java @@ -0,0 +1,38 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.lessThan; + +public class DiscoveryUpgradeServiceTests extends ESTestCase { + public void testCreateDiscoveryNodeWithImpossiblyHighId() { + final DiscoveryNode discoveryNode + = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT); + final DiscoveryNode fakeNode = DiscoveryUpgradeService.createDiscoveryNodeWithImpossiblyHighId(discoveryNode); + assertThat(discoveryNode.getId(), lessThan(fakeNode.getId())); + assertThat(fakeNode.getId(), containsString(discoveryNode.getId())); + + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/Zen1IT.java b/server/src/test/java/org/elasticsearch/cluster/coordination/Zen1IT.java index a4ede1df6de..e8a2a17eb06 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/Zen1IT.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/Zen1IT.java @@ -18,14 +18,33 @@ */ package org.elasticsearch.cluster.coordination; +import org.elasticsearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction; +import org.elasticsearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest; +import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction; 
+import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalTestCluster.RestartCallback; import org.elasticsearch.test.discovery.TestZenDiscovery; +import java.util.List; +import java.util.stream.IntStream; + +import static org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING; +import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.hamcrest.Matchers.equalTo; + @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) public class Zen1IT extends ESIntegTestCase { - private static Settings ZEN1_SETTINGS = Coordinator.addZen1Attribute(Settings.builder() + private static Settings ZEN1_SETTINGS = Coordinator.addZen1Attribute(true, Settings.builder() .put(TestZenDiscovery.USE_ZEN2.getKey(), false) .put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false)) // Zen2 does not know about mock pings .build(); @@ -45,4 +64,168 @@ public class Zen1IT extends ESIntegTestCase { internalCluster().startNodes(randomIntBetween(1, 3), ZEN1_SETTINGS); createIndex("test"); } + + public void testMixedClusterFormation() throws Exception { + final int zen1NodeCount = randomIntBetween(1, 3); + final int zen2NodeCount = randomIntBetween(zen1NodeCount == 1 ? 
2 : 1, 3); + logger.info("starting cluster of [{}] Zen1 nodes and [{}] Zen2 nodes", zen1NodeCount, zen2NodeCount); + final List nodes = internalCluster().startNodes(IntStream.range(0, zen1NodeCount + zen2NodeCount) + .mapToObj(i -> i < zen1NodeCount ? ZEN1_SETTINGS : ZEN2_SETTINGS).toArray(Settings[]::new)); + + createIndex("test", + Settings.builder() + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.ZERO) // assign shards + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, zen1NodeCount + zen2NodeCount + randomIntBetween(0, 2)) // causes rebalancing + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build()); + ensureGreen("test"); + + for (final String node : nodes) { + // With 1 Zen1 node when you stop the Zen1 node the Zen2 nodes might auto-bootstrap. + // But there are only 2 Zen2 nodes so you must do the right things with voting config exclusions to keep the cluster + // alive through the other two restarts. + final boolean masterNodeIsZen2 = zen1NodeCount <= nodes.indexOf(internalCluster().getMasterName()); + final boolean thisNodeIsZen2 = zen1NodeCount <= nodes.indexOf(node); + final boolean requiresVotingConfigExclusions = zen1NodeCount == 1 && zen2NodeCount == 2 && masterNodeIsZen2 && thisNodeIsZen2; + + if (requiresVotingConfigExclusions) { + client().execute(AddVotingConfigExclusionsAction.INSTANCE, + new AddVotingConfigExclusionsRequest(new String[]{node})).get(); + } + + internalCluster().restartNode(node, new RestartCallback() { + @Override + public Settings onNodeStopped(String restartingNode) { + String viaNode = randomValueOtherThan(restartingNode, () -> randomFrom(nodes)); + final ClusterHealthRequestBuilder clusterHealthRequestBuilder = client(viaNode).admin().cluster().prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes(Integer.toString(zen1NodeCount + zen2NodeCount - 1)) + .setTimeout(TimeValue.timeValueSeconds(30)); + ClusterHealthResponse clusterHealthResponse = 
clusterHealthRequestBuilder.get(); + assertFalse(restartingNode, clusterHealthResponse.isTimedOut()); + return Settings.EMPTY; + } + }); + ensureStableCluster(zen1NodeCount + zen2NodeCount); + ensureGreen("test"); + + if (requiresVotingConfigExclusions) { + final ClearVotingConfigExclusionsRequest clearVotingTombstonesRequest = new ClearVotingConfigExclusionsRequest(); + clearVotingTombstonesRequest.setWaitForRemoval(false); + client().execute(ClearVotingConfigExclusionsAction.INSTANCE, clearVotingTombstonesRequest).get(); + } + } + } + + public void testRollingMigrationFromZen1ToZen2() throws Exception { + final int nodeCount = randomIntBetween(2, 5); + final List zen1Nodes = internalCluster().startNodes(nodeCount, ZEN1_SETTINGS); + + createIndex("test", + Settings.builder() + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.ZERO) // assign shards + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, nodeCount) // causes rebalancing + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build()); + ensureGreen("test"); + + for (final String zen1Node : zen1Nodes) { + logger.info("--> shutting down {}", zen1Node); + internalCluster().stopRandomNode(s -> NODE_NAME_SETTING.get(s).equals(zen1Node)); + + ensureStableCluster(nodeCount - 1); + if (nodeCount > 2) { + ensureGreen("test"); + } else { + ensureYellow("test"); + } + + logger.info("--> starting replacement for {}", zen1Node); + final String newNode = internalCluster().startNode(ZEN2_SETTINGS); + ensureStableCluster(nodeCount); + ensureGreen("test"); + logger.info("--> successfully replaced {} with {}", zen1Node, newNode); + } + + assertThat(internalCluster().size(), equalTo(nodeCount)); + } + + public void testRollingUpgradeFromZen1ToZen2() throws Exception { + final int nodeCount = randomIntBetween(2, 5); + final List nodes = internalCluster().startNodes(nodeCount, ZEN1_SETTINGS); + + createIndex("test", + Settings.builder() + 
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.ZERO) // assign shards + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, nodeCount) // causes rebalancing + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build()); + ensureGreen("test"); + + internalCluster().rollingRestart(new RestartCallback() { + @Override + public void doAfterNodes(int n, Client client) { + ensureGreen("test"); + } + + @Override + public Settings onNodeStopped(String nodeName) { + String viaNode = randomValueOtherThan(nodeName, () -> randomFrom(nodes)); + final ClusterHealthRequestBuilder clusterHealthRequestBuilder = client(viaNode).admin().cluster().prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes(Integer.toString(nodeCount - 1)) + .setTimeout(TimeValue.timeValueSeconds(30)); + if (nodeCount == 2) { + clusterHealthRequestBuilder.setWaitForYellowStatus(); + } else { + clusterHealthRequestBuilder.setWaitForGreenStatus(); + } + ClusterHealthResponse clusterHealthResponse = clusterHealthRequestBuilder.get(); + assertFalse(nodeName, clusterHealthResponse.isTimedOut()); + return Coordinator.addZen1Attribute(false, Settings.builder().put(ZEN2_SETTINGS)).build(); + } + }); + + ensureStableCluster(nodeCount); + ensureGreen("test"); + assertThat(internalCluster().size(), equalTo(nodeCount)); + } + + private void testMultipleNodeMigrationFromZen1ToZen2(int nodeCount) throws Exception { + final List oldNodes = internalCluster().startNodes(nodeCount, ZEN1_SETTINGS); + createIndex("test", + Settings.builder() + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.ZERO) // assign shards + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, nodeCount) // causes rebalancing + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, nodeCount > 1 ? 
1 : 0) + .build()); + ensureGreen("test"); + + internalCluster().startNodes(nodeCount, ZEN2_SETTINGS); + + logger.info("--> updating settings to exclude old nodes"); + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder() + .put(CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(), String.join(",", oldNodes))).get(); + + logger.info("--> waiting for old nodes to be vacated"); + waitForRelocation(); + + while (internalCluster().size() > nodeCount) { + internalCluster().stopRandomNode(settings -> oldNodes.contains(NODE_NAME_SETTING.get(settings))); + } + + ensureGreen("test"); + } + + public void testMultipleNodeMigrationFromZen1ToZen2WithOneNode() throws Exception { + testMultipleNodeMigrationFromZen1ToZen2(1); + } + + public void testMultipleNodeMigrationFromZen1ToZen2WithTwoNodes() throws Exception { + testMultipleNodeMigrationFromZen1ToZen2(2); + } + + public void testMultipleNodeMigrationFromZen1ToZen2WithThreeNodes() throws Exception { + testMultipleNodeMigrationFromZen1ToZen2(3); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index fd0bd114b3b..1d6758c4d20 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -105,6 +105,7 @@ import org.elasticsearch.node.NodeValidationException; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchService; +import org.elasticsearch.test.discovery.TestZenDiscovery; import org.elasticsearch.test.disruption.ServiceDisruptionScheme; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.MockTransportClient; @@ -1943,7 +1944,8 @@ public final class InternalTestCluster extends TestCluster { } final List 
nodes = new ArrayList<>(); final int prevMasterCount = getMasterNodesCount(); - int bootstrapMasterNodeIndex = prevMasterCount == 0 && autoManageMinMasterNodes && newMasterCount > 0 + int bootstrapMasterNodeIndex = prevMasterCount == 0 && autoManageMinMasterNodes && newMasterCount > 0 && Arrays.stream(settings) + .allMatch(s -> Node.NODE_MASTER_SETTING.get(s) == false || TestZenDiscovery.USE_ZEN2.get(s) == true) ? RandomNumbers.randomIntBetween(random, 0, newMasterCount - 1) : -1; for (Settings nodeSettings : settings) {