diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java
index b145e688272..4cea05726b0 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java
@@ -32,6 +32,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import static org.elasticsearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;
+
/**
* The core class of the cluster state coordination algorithm, directly implementing the
* formal model
@@ -321,10 +323,16 @@ public class CoordinationState {
getCurrentTerm());
}
if (clusterState.term() == getLastAcceptedTerm() && clusterState.version() <= getLastAcceptedVersion()) {
- logger.debug("handlePublishRequest: ignored publish request due to version mismatch (expected: >[{}], actual: [{}])",
- getLastAcceptedVersion(), clusterState.version());
- throw new CoordinationStateRejectedException("incoming version " + clusterState.version() +
- " lower or equal to current version " + getLastAcceptedVersion());
+ if (clusterState.term() == ZEN1_BWC_TERM
+ && clusterState.nodes().getMasterNode().equals(getLastAcceptedState().nodes().getMasterNode()) == false) {
+ logger.debug("handling publish request in compatibility mode despite version mismatch (expected: >[{}], actual: [{}])",
+ getLastAcceptedVersion(), clusterState.version());
+ } else {
+ logger.debug("handlePublishRequest: ignored publish request due to version mismatch (expected: >[{}], actual: [{}])",
+ getLastAcceptedVersion(), clusterState.version());
+ throw new CoordinationStateRejectedException("incoming version " + clusterState.version() +
+ " lower or equal to current version " + getLastAcceptedVersion());
+ }
}
logger.trace("handlePublishRequest: accepting publish request for version [{}] and term [{}]",
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
index ef4fd454a11..ee6d565b97c 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
@@ -44,6 +44,7 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.elasticsearch.cluster.service.MasterService;
+import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
@@ -86,6 +87,9 @@ import static org.elasticsearch.gateway.ClusterStateUpdaters.hideStateIfNotRecov
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
public class Coordinator extends AbstractLifecycleComponent implements Discovery {
+
+ public static final long ZEN1_BWC_TERM = 0;
+
private static final Logger logger = LogManager.getLogger(Coordinator.class);
// the timeout for the publication of each value
@@ -121,6 +125,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private long maxTermSeen;
private final Reconfigurator reconfigurator;
private final ClusterBootstrapService clusterBootstrapService;
+ private final DiscoveryUpgradeService discoveryUpgradeService;
private final LagDetector lagDetector;
private final ClusterFormationFailureHelper clusterFormationFailureHelper;
@@ -161,6 +166,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
masterService.setClusterStateSupplier(this::getStateForMasterService);
this.reconfigurator = new Reconfigurator(settings, clusterSettings);
this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService);
+ this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, clusterSettings, transportService, this::isBootstrapped,
+ joinHelper, peerFinder::getFoundPeers, this::unsafelySetConfigurationForUpgrade);
this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"),
transportService::getLocalNode);
this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState,
@@ -256,6 +263,14 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
throw new CoordinationStateRejectedException("no longer leading this publication's term: " + publishRequest);
}
+ if (publishRequest.getAcceptedState().term() == ZEN1_BWC_TERM && getCurrentTerm() == ZEN1_BWC_TERM
+ && mode == Mode.FOLLOWER && Optional.of(sourceNode).equals(lastKnownLeader) == false) {
+
+ logger.debug("received cluster state from {} but currently following {}, rejecting", sourceNode, lastKnownLeader);
+ throw new CoordinationStateRejectedException("received cluster state from " + sourceNode + " but currently following "
+ + lastKnownLeader + ", rejecting");
+ }
+
ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term());
final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest);
@@ -323,7 +338,11 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
final StartJoinRequest startJoinRequest
= new StartJoinRequest(getLocalNode(), Math.max(getCurrentTerm(), maxTermSeen) + 1);
logger.debug("starting election with {}", startJoinRequest);
- getDiscoveredNodes().forEach(node -> joinHelper.sendStartJoinRequest(startJoinRequest, node));
+ getDiscoveredNodes().forEach(node -> {
+ if (isZen1Node(node) == false) {
+ joinHelper.sendStartJoinRequest(startJoinRequest, node);
+ }
+ });
}
}
}
@@ -384,6 +403,11 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
peerFinder.activate(coordinationState.get().getLastAcceptedState().nodes());
clusterFormationFailureHelper.start();
+
+ if (getCurrentTerm() == ZEN1_BWC_TERM) {
+ discoveryUpgradeService.activate(lastKnownLeader);
+ }
+
leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES);
leaderChecker.updateLeader(null);
@@ -414,6 +438,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
lastKnownLeader = Optional.of(getLocalNode());
peerFinder.deactivate(getLocalNode());
+ discoveryUpgradeService.deactivate();
clusterFormationFailureHelper.stop();
closePrevotingAndElectionScheduler();
preVoteCollector.update(getPreVoteResponse(), getLocalNode());
@@ -439,6 +464,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
lastKnownLeader = Optional.of(leaderNode);
peerFinder.deactivate(leaderNode);
+ discoveryUpgradeService.deactivate();
clusterFormationFailureHelper.stop();
closePrevotingAndElectionScheduler();
cancelActivePublication();
@@ -647,9 +673,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
return false;
}
- assert currentState.term() == 0 : currentState;
- assert currentState.version() == 0 : currentState;
-
if (mode != Mode.CANDIDATE) {
throw new CoordinationStateRejectedException("Cannot set initial configuration in mode " + mode);
}
@@ -681,12 +704,59 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
}
}
+ private boolean isBootstrapped() {
+ return getLastAcceptedState().getLastAcceptedConfiguration().isEmpty() == false;
+ }
+
+ private void unsafelySetConfigurationForUpgrade(VotingConfiguration votingConfiguration) {
+ assert Version.CURRENT.major == Version.V_6_6_0.major + 1 : "remove this method once unsafe upgrades are no longer needed";
+ synchronized (mutex) {
+ if (mode != Mode.CANDIDATE) {
+ throw new IllegalStateException("Cannot overwrite configuration in mode " + mode);
+ }
+
+ if (isBootstrapped()) {
+ throw new IllegalStateException("Cannot overwrite configuration: configuration is already set to "
+ + getLastAcceptedState().getLastAcceptedConfiguration());
+ }
+
+ if (lastKnownLeader.map(Coordinator::isZen1Node).orElse(false) == false) {
+ throw new IllegalStateException("Cannot upgrade from last-known leader: " + lastKnownLeader);
+ }
+
+ if (getCurrentTerm() != ZEN1_BWC_TERM) {
+ throw new IllegalStateException("Cannot upgrade, term is " + getCurrentTerm());
+ }
+
+ logger.info("automatically bootstrapping during rolling upgrade, using initial configuration {}", votingConfiguration);
+
+ final ClusterState currentState = getStateForMasterService();
+ final Builder builder = masterService.incrementVersion(currentState);
+ builder.metaData(MetaData.builder(currentState.metaData()).coordinationMetaData(
+ CoordinationMetaData.builder(currentState.metaData().coordinationMetaData())
+ .term(1)
+ .lastAcceptedConfiguration(votingConfiguration)
+ .lastCommittedConfiguration(votingConfiguration)
+ .build()));
+ final ClusterState newClusterState = builder.build();
+
+ coordinationState.get().handleStartJoin(new StartJoinRequest(getLocalNode(), newClusterState.term()));
+ coordinationState.get().handlePublishRequest(new PublishRequest(newClusterState));
+
+ followersChecker.clearCurrentNodes();
+ followersChecker.updateFastResponseState(getCurrentTerm(), mode);
+
+ peerFinder.deactivate(getLocalNode());
+ peerFinder.activate(newClusterState.nodes());
+ }
+ }
+
// Package-private for testing
ClusterState improveConfiguration(ClusterState clusterState) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
final Set liveNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false)
- .filter(this::hasJoinVoteFrom).collect(Collectors.toSet());
+ .filter(this::hasJoinVoteFrom).filter(discoveryNode -> isZen1Node(discoveryNode) == false).collect(Collectors.toSet());
final VotingConfiguration newConfig = reconfigurator.reconfigure(liveNodes,
clusterState.getVotingConfigExclusions().stream().map(VotingConfigExclusion::getNodeId).collect(Collectors.toSet()),
clusterState.getLastAcceptedConfiguration());
@@ -967,7 +1037,9 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
prevotingRound.close();
}
final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
- prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes());
+ final List discoveredNodes
+ = getDiscoveredNodes().stream().filter(n -> isZen1Node(n) == false).collect(Collectors.toList());
+ prevotingRound = preVoteCollector.start(lastAcceptedState, discoveredNodes);
}
}
}
@@ -1176,13 +1248,13 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
}
// TODO: only here temporarily for BWC development, remove once complete
- public static Settings.Builder addZen1Attribute(Settings.Builder builder) {
- return builder.put("node.attr.zen1", true);
+ public static Settings.Builder addZen1Attribute(boolean isZen1Node, Settings.Builder builder) {
+ return builder.put("node.attr.zen1", isZen1Node);
}
// TODO: only here temporarily for BWC development, remove once complete
public static boolean isZen1Node(DiscoveryNode discoveryNode) {
return discoveryNode.getVersion().before(Version.V_7_0_0) ||
- discoveryNode.getAttributes().containsKey("zen1");
+ (Booleans.isTrue(discoveryNode.getAttributes().getOrDefault("zen1", "false")));
}
}
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeService.java
new file mode 100644
index 00000000000..52fb25dd8af
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeService.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.cluster.coordination;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.CountDown;
+import org.elasticsearch.discovery.zen.ElectMasterService;
+import org.elasticsearch.discovery.zen.ElectMasterService.MasterCandidate;
+import org.elasticsearch.discovery.zen.UnicastZenPing;
+import org.elasticsearch.discovery.zen.UnicastZenPing.UnicastPingRequest;
+import org.elasticsearch.discovery.zen.UnicastZenPing.UnicastPingResponse;
+import org.elasticsearch.discovery.zen.ZenPing.PingResponse;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.threadpool.ThreadPool.Names;
+import org.elasticsearch.transport.TransportException;
+import org.elasticsearch.transport.TransportRequestOptions;
+import org.elasticsearch.transport.TransportResponseHandler;
+import org.elasticsearch.transport.TransportService;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static java.lang.Math.max;
+import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING;
+import static org.elasticsearch.cluster.ClusterState.UNKNOWN_VERSION;
+import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
+import static org.elasticsearch.discovery.zen.ZenDiscovery.PING_TIMEOUT_SETTING;
+
+/**
+ * Deals with rolling upgrades of the cluster coordination layer. In mixed clusters we prefer to elect the older nodes, but
+ * when the last old node shuts down then as long as there are enough new nodes we can assume that they form the whole cluster and
+ * define them as the initial configuration.
+ */
+public class DiscoveryUpgradeService {
+
+ private static Logger logger = LogManager.getLogger(DiscoveryUpgradeService.class);
+
+ // how long to wait after activation before attempting to join a master or perform a bootstrap upgrade
+ public static final Setting BWC_PING_TIMEOUT_SETTING =
+ Setting.timeSetting("discovery.zen.bwc_ping_timeout",
+ PING_TIMEOUT_SETTING, TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
+
+ // whether to try and bootstrap all the discovered Zen2 nodes when the last Zen1 node leaves the cluster.
+ public static final Setting ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING =
+ Setting.boolSetting("discovery.zen.unsafe_rolling_upgrades_enabled", true, Setting.Property.NodeScope);
+
+ private final ElectMasterService electMasterService;
+ private final TransportService transportService;
+ private final BooleanSupplier isBootstrappedSupplier;
+ private final JoinHelper joinHelper;
+ private final Supplier> peersSupplier;
+ private final Consumer initialConfigurationConsumer;
+ private final TimeValue bwcPingTimeout;
+ private final boolean enableUnsafeBootstrappingOnUpgrade;
+ private final ClusterName clusterName;
+
+ @Nullable // null if no active joining round
+ private volatile JoiningRound joiningRound;
+
+ public DiscoveryUpgradeService(Settings settings, ClusterSettings clusterSettings, TransportService transportService,
+ BooleanSupplier isBootstrappedSupplier, JoinHelper joinHelper,
+ Supplier> peersSupplier,
+ Consumer initialConfigurationConsumer) {
+ assert Version.CURRENT.major == Version.V_6_6_0.major + 1 : "remove this service once unsafe upgrades are no longer needed";
+ electMasterService = new ElectMasterService(settings);
+ this.transportService = transportService;
+ this.isBootstrappedSupplier = isBootstrappedSupplier;
+ this.joinHelper = joinHelper;
+ this.peersSupplier = peersSupplier;
+ this.initialConfigurationConsumer = initialConfigurationConsumer;
+ this.bwcPingTimeout = BWC_PING_TIMEOUT_SETTING.get(settings);
+ this.enableUnsafeBootstrappingOnUpgrade = ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING.get(settings);
+ this.clusterName = CLUSTER_NAME_SETTING.get(settings);
+
+ clusterSettings.addSettingsUpdateConsumer(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
+ electMasterService::minimumMasterNodes); // TODO reject update if the new value is too large
+ }
+
+ public void activate(Optional lastKnownLeader) {
+ // called under coordinator mutex
+
+ if (isBootstrappedSupplier.getAsBoolean()) {
+ return;
+ }
+
+ assert lastKnownLeader.isPresent() == false || Coordinator.isZen1Node(lastKnownLeader.get()) : lastKnownLeader;
+ // if there was a leader and it's not a old node then we must have been bootstrapped
+
+ assert joiningRound == null : joiningRound;
+ joiningRound = new JoiningRound(lastKnownLeader.isPresent());
+ joiningRound.scheduleNextAttempt();
+ }
+
+ public void deactivate() {
+ // called under coordinator mutex
+ joiningRound = null;
+ }
+
+ /**
+ * Waits for some number of calls to {@link ListenableCountDown#countDown()} and then notifies a listener. The listener
+ * is only ever notified once, whether successful or not.
+ */
+ private static class ListenableCountDown {
+ private final CountDown countDown;
+ private final ActionListener listener;
+
+ ListenableCountDown(int count, ActionListener listener) {
+ this.countDown = new CountDown(count);
+ this.listener = listener;
+ }
+
+ void onFailure(Exception e) {
+ if (countDown.fastForward()) {
+ listener.onFailure(e);
+ }
+ }
+
+ void countDown() {
+ if (countDown.countDown()) {
+ listener.onResponse(null);
+ }
+ }
+ }
+
+ private class JoiningRound {
+ private final boolean upgrading;
+
+ JoiningRound(boolean upgrading) {
+ this.upgrading = upgrading;
+ }
+
+ private boolean isRunning() {
+ return joiningRound == this && isBootstrappedSupplier.getAsBoolean() == false;
+ }
+
+ void scheduleNextAttempt() {
+ if (isRunning() == false) {
+ return;
+ }
+
+ final ThreadPool threadPool = transportService.getThreadPool();
+ threadPool.scheduleUnlessShuttingDown(bwcPingTimeout, Names.SAME, new Runnable() {
+
+ @Override
+ public void run() {
+ if (isRunning() == false) {
+ return;
+ }
+
+ final Set discoveryNodes = Stream.concat(StreamSupport.stream(peersSupplier.get().spliterator(), false),
+ Stream.of(transportService.getLocalNode())).filter(DiscoveryNode::isMasterNode).collect(Collectors.toSet());
+
+ // this set of nodes is reasonably fresh - the PeerFinder cleans up nodes to which the transport service is not
+ // connected each time it wakes up (every second by default)
+
+ logger.debug("nodes: {}", discoveryNodes);
+
+ if (electMasterService.hasEnoughMasterNodes(discoveryNodes)) {
+ if (discoveryNodes.stream().anyMatch(Coordinator::isZen1Node)) {
+ electBestOldMaster(discoveryNodes);
+ } else if (upgrading && enableUnsafeBootstrappingOnUpgrade) {
+ // no Zen1 nodes found, but the last-known master was a Zen1 node, so this is a rolling upgrade
+ transportService.getThreadPool().generic().execute(() -> {
+ try {
+ initialConfigurationConsumer.accept(new VotingConfiguration(discoveryNodes.stream()
+ .map(DiscoveryNode::getId).collect(Collectors.toSet())));
+ } catch (Exception e) {
+ logger.debug("exception during bootstrapping upgrade, retrying", e);
+ } finally {
+ scheduleNextAttempt();
+ }
+ });
+ } else {
+ scheduleNextAttempt();
+ }
+ } else {
+ scheduleNextAttempt();
+ }
+ }
+
+ /**
+ * Ping all the old discovered masters one more time to obtain their cluster state versions, and then vote for the best one.
+ * @param discoveryNodes The master nodes (old and new).
+ */
+ private void electBestOldMaster(Set discoveryNodes) {
+ final Set masterCandidates = newConcurrentSet();
+ final ListenableCountDown listenableCountDown
+ = new ListenableCountDown(discoveryNodes.size(), new ActionListener() {
+
+ @Override
+ public void onResponse(Void value) {
+ assert masterCandidates.size() == discoveryNodes.size()
+ : masterCandidates + " does not match " + discoveryNodes;
+
+ // TODO we shouldn't elect a master with a version that's older than ours
+ // If the only Zen1 nodes left are stale, and we can bootstrap, maybe we should bootstrap?
+ // Do we ever need to elect a freshly-started Zen1 node?
+ if (isRunning()) {
+ final MasterCandidate electedMaster = electMasterService.electMaster(masterCandidates);
+ logger.debug("elected {}, sending join", electedMaster);
+ joinHelper.sendJoinRequest(electedMaster.getNode(), Optional.empty(),
+ JoiningRound.this::scheduleNextAttempt);
+ }
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ scheduleNextAttempt();
+ }
+ });
+
+ boolean foundOldMaster = false;
+ for (final DiscoveryNode discoveryNode : discoveryNodes) {
+ assert discoveryNode.isMasterNode() : discoveryNode;
+ if (Coordinator.isZen1Node(discoveryNode)) {
+ foundOldMaster = true;
+ transportService.sendRequest(discoveryNode, UnicastZenPing.ACTION_NAME,
+ new UnicastPingRequest(0, TimeValue.ZERO,
+ new PingResponse(createDiscoveryNodeWithImpossiblyHighId(transportService.getLocalNode()),
+ null, clusterName, UNKNOWN_VERSION)),
+ TransportRequestOptions.builder().withTimeout(bwcPingTimeout).build(),
+ new TransportResponseHandler() {
+ @Override
+ public void handleResponse(UnicastPingResponse response) {
+ long clusterStateVersion = UNKNOWN_VERSION;
+ for (PingResponse pingResponse : response.pingResponses) {
+ if (discoveryNode.equals(pingResponse.node())) {
+ clusterStateVersion
+ = max(clusterStateVersion, pingResponse.getClusterStateVersion());
+ }
+ }
+ masterCandidates.add(new MasterCandidate(discoveryNode, clusterStateVersion));
+ listenableCountDown.countDown();
+ }
+
+ @Override
+ public void handleException(TransportException exp) {
+ logger.debug(
+ new ParameterizedMessage("unexpected exception when pinging {}", discoveryNode), exp);
+ listenableCountDown.onFailure(exp);
+ }
+
+ @Override
+ public String executor() {
+ return Names.SAME;
+ }
+
+ @Override
+ public UnicastPingResponse read(StreamInput in) throws IOException {
+ return new UnicastPingResponse(in);
+ }
+ });
+
+ } else {
+ masterCandidates.add(
+ new MasterCandidate(createDiscoveryNodeWithImpossiblyHighId(discoveryNode), UNKNOWN_VERSION));
+ listenableCountDown.countDown();
+ }
+ }
+ assert foundOldMaster;
+ }
+
+ @Override
+ public String toString() {
+ return "discovery upgrade service retry";
+ }
+ });
+ }
+ }
+
+ /**
+ * Pre-7.0 nodes select the best master by comparing their IDs (as strings) and selecting the lowest one amongst those nodes with
+ * the best cluster state version. We want 7.0+ nodes to participate in these elections in a mixed cluster but never to win one, so
+ * we lie and claim to have an impossible ID that compares above all genuine IDs.
+ */
+ public static DiscoveryNode createDiscoveryNodeWithImpossiblyHighId(DiscoveryNode node) {
+ // IDs are base-64-encoded UUIDs, which means they the character set [0-9A-Za-z_-]. The highest character in this set is 'z', and
+ // 'z' < '{', so by starting the ID with '{' we can be sure it's greater. This is terrible.
+ final String fakeId = "{zen2}" + node.getId();
+ assert node.getId().compareTo(fakeId) < 0 : node + " vs " + fakeId;
+ return new DiscoveryNode(node.getName(), fakeId, node.getEphemeralId(), node.getHostName(),
+ node.getHostAddress(), node.getAddress(), node.getAttributes(), node.getRoles(), node.getVersion());
+ }
+}
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java
index 14c7c47f4c5..f00be2259a4 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java
@@ -166,6 +166,11 @@ public class JoinHelper {
}
public void sendJoinRequest(DiscoveryNode destination, Optional optionalJoin) {
+ sendJoinRequest(destination, optionalJoin, () -> {
+ });
+ }
+
+ public void sendJoinRequest(DiscoveryNode destination, Optional optionalJoin, Runnable onCompletion) {
assert destination.isMasterNode() : "trying to join master-ineligible " + destination;
final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), optionalJoin);
final Tuple dedupKey = Tuple.tuple(destination, joinRequest);
@@ -192,12 +197,14 @@ public class JoinHelper {
public void handleResponse(Empty response) {
pendingOutgoingJoins.remove(dedupKey);
logger.debug("successfully joined {} with {}", destination, joinRequest);
+ onCompletion.run();
}
@Override
public void handleException(TransportException exp) {
pendingOutgoingJoins.remove(dedupKey);
logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp);
+ onCompletion.run();
}
@Override
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java
index a4580c062b6..9544cf15a0c 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java
@@ -151,7 +151,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor joiningNodes) {
+ protected ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState currentState, List joiningNodes) {
assert currentState.nodes().getMasterNodeId() == null : currentState;
DiscoveryNodes currentNodes = currentState.nodes();
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentNodes);
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java
index 5001e3be181..6137f5a6d0f 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java
@@ -38,6 +38,7 @@ import java.io.IOException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;
+import java.util.stream.StreamSupport;
import static org.elasticsearch.cluster.coordination.CoordinationState.isElectionQuorum;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
@@ -140,6 +141,7 @@ public class PreVoteCollector {
}
void start(final Iterable broadcastNodes) {
+ assert StreamSupport.stream(broadcastNodes.spliterator(), false).noneMatch(Coordinator::isZen1Node) : broadcastNodes;
logger.debug("{} requesting pre-votes from {}", this, broadcastNodes);
broadcastNodes.forEach(n -> transportService.sendRequest(n, REQUEST_PRE_VOTE_ACTION_NAME, preVoteRequest,
new TransportResponseHandler() {
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java
index 8b227c0b261..5c7b9562d8d 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java
@@ -94,6 +94,7 @@ public class Reconfigurator {
* @return An optimal configuration, or leave the current configuration unchanged if the optimal configuration has no live quorum.
*/
public VotingConfiguration reconfigure(Set liveNodes, Set retiredNodeIds, VotingConfiguration currentConfig) {
+ assert liveNodes.stream().noneMatch(Coordinator::isZen1Node) : liveNodes;
logger.trace("{} reconfiguring {} based on liveNodes={}, retiredNodeIds={}", this, currentConfig, liveNodes, retiredNodeIds);
/*
diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
index 7775f1de1a2..f150650948d 100644
--- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
@@ -35,6 +35,7 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper;
import org.elasticsearch.cluster.coordination.Coordinator;
+import org.elasticsearch.cluster.coordination.DiscoveryUpgradeService;
import org.elasticsearch.cluster.coordination.ElectionSchedulerFactory;
import org.elasticsearch.cluster.coordination.FollowersChecker;
import org.elasticsearch.cluster.coordination.JoinHelper;
@@ -476,7 +477,9 @@ public final class ClusterSettings extends AbstractScopedSettings {
ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING,
ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING,
ClusterBootstrapService.UNCONFIGURED_BOOTSTRAP_TIMEOUT_SETTING,
- LagDetector.CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING
+ LagDetector.CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING,
+ DiscoveryUpgradeService.BWC_PING_TIMEOUT_SETTING,
+ DiscoveryUpgradeService.ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING
)));
public static List> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList(
diff --git a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java
index 36090f645f6..61d2e031977 100644
--- a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java
+++ b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java
@@ -27,7 +27,6 @@ import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.PeersResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -41,6 +40,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.discovery.zen.UnicastZenPing;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.ZenPing;
+import org.elasticsearch.discovery.zen.ZenPing.PingResponse;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportChannel;
@@ -62,6 +62,8 @@ import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import static org.elasticsearch.cluster.coordination.Coordinator.isZen1Node;
+import static org.elasticsearch.cluster.coordination.DiscoveryUpgradeService.createDiscoveryNodeWithImpossiblyHighId;
import static java.util.Collections.emptyList;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
@@ -412,6 +414,11 @@ public abstract class PeerFinder {
final DiscoveryNode discoveryNode = getDiscoveryNode();
assert discoveryNode != null : "cannot request peers without first connecting";
+ if (discoveryNode.equals(getLocalNode())) {
+ logger.trace("{} not requesting peers from local node", this);
+ return;
+ }
+
logger.trace("{} requesting peers", this);
peersRequestInFlight = true;
@@ -459,11 +466,11 @@ public abstract class PeerFinder {
final String actionName;
final TransportRequest transportRequest;
final TransportResponseHandler> transportResponseHandler;
- if (Coordinator.isZen1Node(discoveryNode)) {
+ if (isZen1Node(discoveryNode)) {
actionName = UnicastZenPing.ACTION_NAME;
transportRequest = new UnicastZenPing.UnicastPingRequest(1, ZenDiscovery.PING_TIMEOUT_SETTING.get(settings),
- new ZenPing.PingResponse(getLocalNode(), null, ClusterName.CLUSTER_NAME_SETTING.get(settings),
- ClusterState.UNKNOWN_VERSION));
+ new ZenPing.PingResponse(createDiscoveryNodeWithImpossiblyHighId(getLocalNode()), null,
+ ClusterName.CLUSTER_NAME_SETTING.get(settings), ClusterState.UNKNOWN_VERSION));
transportResponseHandler = peersResponseHandler.wrap(ucResponse -> {
Optional optionalMasterNode = Arrays.stream(ucResponse.pingResponses)
.filter(pr -> discoveryNode.equals(pr.node()) && discoveryNode.equals(pr.master()))
@@ -471,9 +478,9 @@ public abstract class PeerFinder {
.findFirst();
List discoveredNodes = new ArrayList<>();
if (optionalMasterNode.isPresent() == false) {
- Arrays.stream(ucResponse.pingResponses).map(pr -> pr.master()).filter(Objects::nonNull)
+ Arrays.stream(ucResponse.pingResponses).map(PingResponse::master).filter(Objects::nonNull)
.forEach(discoveredNodes::add);
- Arrays.stream(ucResponse.pingResponses).map(pr -> pr.node()).forEach(discoveredNodes::add);
+ Arrays.stream(ucResponse.pingResponses).map(PingResponse::node).forEach(discoveredNodes::add);
}
return new PeersResponse(optionalMasterNode, discoveredNodes, 0L);
}, UnicastZenPing.UnicastPingResponse::new);
@@ -506,10 +513,12 @@ public abstract class PeerFinder {
final PeersResponse peersResponse = handlePeersRequest(peersRequest);
final List pingResponses = new ArrayList<>();
final ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
- pingResponses.add(new ZenPing.PingResponse(transportService.getLocalNode(), peersResponse.getMasterNode().orElse(null),
- clusterName, 0L));
+ pingResponses.add(new ZenPing.PingResponse(createDiscoveryNodeWithImpossiblyHighId(transportService.getLocalNode()),
+ peersResponse.getMasterNode().orElse(null),
+ clusterName, ClusterState.UNKNOWN_VERSION));
peersResponse.getKnownPeers().forEach(dn -> pingResponses.add(
- new ZenPing.PingResponse(dn, null, clusterName, ClusterState.UNKNOWN_VERSION)));
+ new ZenPing.PingResponse(ZenPing.PingResponse.FAKE_PING_ID,
+ isZen1Node(dn) ? dn : createDiscoveryNodeWithImpossiblyHighId(dn), null, clusterName, ClusterState.UNKNOWN_VERSION)));
channel.sendResponse(new UnicastZenPing.UnicastPingResponse(request.id, pingResponses.toArray(new ZenPing.PingResponse[0])));
}
}
diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java b/server/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java
index af5a3d285a3..eddcf3de6e1 100644
--- a/server/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java
+++ b/server/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java
@@ -46,6 +46,12 @@ public interface ZenPing extends Releasable {
class PingResponse implements Writeable {
+ /**
+ * An ID of a ping response that was generated on behalf of another node. Needs to be less than all other ping IDs so that fake ping
+ * responses don't override real ones.
+ */
+ public static long FAKE_PING_ID = -1;
+
private static final AtomicLong idGenerator = new AtomicLong();
// an always increasing unique identifier for this ping response.
@@ -68,7 +74,19 @@ public interface ZenPing extends Releasable {
* ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION} for not recovered)
*/
public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, long clusterStateVersion) {
- this.id = idGenerator.incrementAndGet();
+ this(idGenerator.incrementAndGet(), node, master, clusterName, clusterStateVersion);
+ }
+
+ /**
+ * @param id the ping's ID
+ * @param node the node which this ping describes
+ * @param master the current master of the node
+ * @param clusterName the cluster name of the node
+ * @param clusterStateVersion the current cluster state version of that node
+* ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION} for not recovered)
+ */
+ public PingResponse(long id, DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, long clusterStateVersion) {
+ this.id = id;
this.node = node;
this.master = master;
this.clusterName = clusterName;
diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeServiceTests.java
new file mode 100644
index 00000000000..ed29e235bc8
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeServiceTests.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.cluster.coordination;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.test.ESTestCase;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.lessThan;
+
+public class DiscoveryUpgradeServiceTests extends ESTestCase {
+ public void testCreateDiscoveryNodeWithImpossiblyHighId() {
+ final DiscoveryNode discoveryNode
+ = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT);
+ final DiscoveryNode fakeNode = DiscoveryUpgradeService.createDiscoveryNodeWithImpossiblyHighId(discoveryNode);
+ assertThat(discoveryNode.getId(), lessThan(fakeNode.getId()));
+ assertThat(fakeNode.getId(), containsString(discoveryNode.getId()));
+
+ }
+}
diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/Zen1IT.java b/server/src/test/java/org/elasticsearch/cluster/coordination/Zen1IT.java
index a4ede1df6de..e8a2a17eb06 100644
--- a/server/src/test/java/org/elasticsearch/cluster/coordination/Zen1IT.java
+++ b/server/src/test/java/org/elasticsearch/cluster/coordination/Zen1IT.java
@@ -18,14 +18,33 @@
*/
package org.elasticsearch.cluster.coordination;
+import org.elasticsearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction;
+import org.elasticsearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest;
+import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
+import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
+import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.InternalTestCluster.RestartCallback;
import org.elasticsearch.test.discovery.TestZenDiscovery;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import static org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING;
+import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
+import static org.hamcrest.Matchers.equalTo;
+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class Zen1IT extends ESIntegTestCase {
- private static Settings ZEN1_SETTINGS = Coordinator.addZen1Attribute(Settings.builder()
+ private static Settings ZEN1_SETTINGS = Coordinator.addZen1Attribute(true, Settings.builder()
.put(TestZenDiscovery.USE_ZEN2.getKey(), false)
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false)) // Zen2 does not know about mock pings
.build();
@@ -45,4 +64,168 @@ public class Zen1IT extends ESIntegTestCase {
internalCluster().startNodes(randomIntBetween(1, 3), ZEN1_SETTINGS);
createIndex("test");
}
+
+ public void testMixedClusterFormation() throws Exception {
+ final int zen1NodeCount = randomIntBetween(1, 3);
+ final int zen2NodeCount = randomIntBetween(zen1NodeCount == 1 ? 2 : 1, 3);
+ logger.info("starting cluster of [{}] Zen1 nodes and [{}] Zen2 nodes", zen1NodeCount, zen2NodeCount);
+ final List nodes = internalCluster().startNodes(IntStream.range(0, zen1NodeCount + zen2NodeCount)
+ .mapToObj(i -> i < zen1NodeCount ? ZEN1_SETTINGS : ZEN2_SETTINGS).toArray(Settings[]::new));
+
+ createIndex("test",
+ Settings.builder()
+ .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.ZERO) // assign shards
+ .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, zen1NodeCount + zen2NodeCount + randomIntBetween(0, 2)) // causes rebalancing
+ .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+ .build());
+ ensureGreen("test");
+
+ for (final String node : nodes) {
+ // With 1 Zen1 node when you stop the Zen1 node the Zen2 nodes might auto-bootstrap.
+ // But there are only 2 Zen2 nodes so you must do the right things with voting config exclusions to keep the cluster
+ // alive through the other two restarts.
+ final boolean masterNodeIsZen2 = zen1NodeCount <= nodes.indexOf(internalCluster().getMasterName());
+ final boolean thisNodeIsZen2 = zen1NodeCount <= nodes.indexOf(node);
+ final boolean requiresVotingConfigExclusions = zen1NodeCount == 1 && zen2NodeCount == 2 && masterNodeIsZen2 && thisNodeIsZen2;
+
+ if (requiresVotingConfigExclusions) {
+ client().execute(AddVotingConfigExclusionsAction.INSTANCE,
+ new AddVotingConfigExclusionsRequest(new String[]{node})).get();
+ }
+
+ internalCluster().restartNode(node, new RestartCallback() {
+ @Override
+ public Settings onNodeStopped(String restartingNode) {
+ String viaNode = randomValueOtherThan(restartingNode, () -> randomFrom(nodes));
+ final ClusterHealthRequestBuilder clusterHealthRequestBuilder = client(viaNode).admin().cluster().prepareHealth()
+ .setWaitForEvents(Priority.LANGUID)
+ .setWaitForNodes(Integer.toString(zen1NodeCount + zen2NodeCount - 1))
+ .setTimeout(TimeValue.timeValueSeconds(30));
+ ClusterHealthResponse clusterHealthResponse = clusterHealthRequestBuilder.get();
+ assertFalse(restartingNode, clusterHealthResponse.isTimedOut());
+ return Settings.EMPTY;
+ }
+ });
+ ensureStableCluster(zen1NodeCount + zen2NodeCount);
+ ensureGreen("test");
+
+ if (requiresVotingConfigExclusions) {
+ final ClearVotingConfigExclusionsRequest clearVotingTombstonesRequest = new ClearVotingConfigExclusionsRequest();
+ clearVotingTombstonesRequest.setWaitForRemoval(false);
+ client().execute(ClearVotingConfigExclusionsAction.INSTANCE, clearVotingTombstonesRequest).get();
+ }
+ }
+ }
+
+ public void testRollingMigrationFromZen1ToZen2() throws Exception {
+ final int nodeCount = randomIntBetween(2, 5);
+ final List zen1Nodes = internalCluster().startNodes(nodeCount, ZEN1_SETTINGS);
+
+ createIndex("test",
+ Settings.builder()
+ .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.ZERO) // assign shards
+ .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, nodeCount) // causes rebalancing
+ .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+ .build());
+ ensureGreen("test");
+
+ for (final String zen1Node : zen1Nodes) {
+ logger.info("--> shutting down {}", zen1Node);
+ internalCluster().stopRandomNode(s -> NODE_NAME_SETTING.get(s).equals(zen1Node));
+
+ ensureStableCluster(nodeCount - 1);
+ if (nodeCount > 2) {
+ ensureGreen("test");
+ } else {
+ ensureYellow("test");
+ }
+
+ logger.info("--> starting replacement for {}", zen1Node);
+ final String newNode = internalCluster().startNode(ZEN2_SETTINGS);
+ ensureStableCluster(nodeCount);
+ ensureGreen("test");
+ logger.info("--> successfully replaced {} with {}", zen1Node, newNode);
+ }
+
+ assertThat(internalCluster().size(), equalTo(nodeCount));
+ }
+
+ public void testRollingUpgradeFromZen1ToZen2() throws Exception {
+ final int nodeCount = randomIntBetween(2, 5);
+ final List nodes = internalCluster().startNodes(nodeCount, ZEN1_SETTINGS);
+
+ createIndex("test",
+ Settings.builder()
+ .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.ZERO) // assign shards
+ .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, nodeCount) // causes rebalancing
+ .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+ .build());
+ ensureGreen("test");
+
+ internalCluster().rollingRestart(new RestartCallback() {
+ @Override
+ public void doAfterNodes(int n, Client client) {
+ ensureGreen("test");
+ }
+
+ @Override
+ public Settings onNodeStopped(String nodeName) {
+ String viaNode = randomValueOtherThan(nodeName, () -> randomFrom(nodes));
+ final ClusterHealthRequestBuilder clusterHealthRequestBuilder = client(viaNode).admin().cluster().prepareHealth()
+ .setWaitForEvents(Priority.LANGUID)
+ .setWaitForNodes(Integer.toString(nodeCount - 1))
+ .setTimeout(TimeValue.timeValueSeconds(30));
+ if (nodeCount == 2) {
+ clusterHealthRequestBuilder.setWaitForYellowStatus();
+ } else {
+ clusterHealthRequestBuilder.setWaitForGreenStatus();
+ }
+ ClusterHealthResponse clusterHealthResponse = clusterHealthRequestBuilder.get();
+ assertFalse(nodeName, clusterHealthResponse.isTimedOut());
+ return Coordinator.addZen1Attribute(false, Settings.builder().put(ZEN2_SETTINGS)).build();
+ }
+ });
+
+ ensureStableCluster(nodeCount);
+ ensureGreen("test");
+ assertThat(internalCluster().size(), equalTo(nodeCount));
+ }
+
+ private void testMultipleNodeMigrationFromZen1ToZen2(int nodeCount) throws Exception {
+ final List oldNodes = internalCluster().startNodes(nodeCount, ZEN1_SETTINGS);
+ createIndex("test",
+ Settings.builder()
+ .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.ZERO) // assign shards
+ .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, nodeCount) // causes rebalancing
+ .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, nodeCount > 1 ? 1 : 0)
+ .build());
+ ensureGreen("test");
+
+ internalCluster().startNodes(nodeCount, ZEN2_SETTINGS);
+
+ logger.info("--> updating settings to exclude old nodes");
+ client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder()
+ .put(CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(), String.join(",", oldNodes))).get();
+
+ logger.info("--> waiting for old nodes to be vacated");
+ waitForRelocation();
+
+ while (internalCluster().size() > nodeCount) {
+ internalCluster().stopRandomNode(settings -> oldNodes.contains(NODE_NAME_SETTING.get(settings)));
+ }
+
+ ensureGreen("test");
+ }
+
+ public void testMultipleNodeMigrationFromZen1ToZen2WithOneNode() throws Exception {
+ testMultipleNodeMigrationFromZen1ToZen2(1);
+ }
+
+ public void testMultipleNodeMigrationFromZen1ToZen2WithTwoNodes() throws Exception {
+ testMultipleNodeMigrationFromZen1ToZen2(2);
+ }
+
+ public void testMultipleNodeMigrationFromZen1ToZen2WithThreeNodes() throws Exception {
+ testMultipleNodeMigrationFromZen1ToZen2(3);
+ }
}
diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
index fd0bd114b3b..1d6758c4d20 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
@@ -105,6 +105,7 @@ import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchService;
+import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.MockTransportClient;
@@ -1943,7 +1944,8 @@ public final class InternalTestCluster extends TestCluster {
}
final List nodes = new ArrayList<>();
final int prevMasterCount = getMasterNodesCount();
- int bootstrapMasterNodeIndex = prevMasterCount == 0 && autoManageMinMasterNodes && newMasterCount > 0
+ int bootstrapMasterNodeIndex = prevMasterCount == 0 && autoManageMinMasterNodes && newMasterCount > 0 && Arrays.stream(settings)
+ .allMatch(s -> Node.NODE_MASTER_SETTING.get(s) == false || TestZenDiscovery.USE_ZEN2.get(s) == true)
? RandomNumbers.randomIntBetween(random, 0, newMasterCount - 1) : -1;
for (Settings nodeSettings : settings) {