[Zen2] Support rolling upgrades from Zen1 ()

We support rolling upgrades from Zen1 by keeping the master as a Zen1 node
until there are no more Zen1 nodes in the cluster, using the following
principles:

- Zen1 nodes will never vote for Zen2 nodes
- Zen2 nodes will, while not bootstrapped, vote for Zen1 nodes
- Zen2 nodes that were previously part of a mixed cluster will automatically
  (and unsafely) bootstrap themselves when the last Zen1 node leaves.
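
A minimal editorial sketch (not part of this commit's code) of the decision that the new DiscoveryUpgradeService, added below, makes each time its joining round wakes up; the names here are hypothetical stand-ins for the real logic:

    final class UpgradeDecisionSketch {
        enum Action { JOIN_BEST_ZEN1_MASTER, UNSAFELY_BOOTSTRAP_ZEN2_NODES, RETRY_LATER }

        static Action nextAction(boolean enoughMasterNodes, boolean zen1NodeDiscovered,
                                 boolean lastKnownMasterWasZen1, boolean unsafeBootstrapEnabled) {
            if (enoughMasterNodes == false) {
                return Action.RETRY_LATER;                   // not enough master-eligible nodes discovered yet
            }
            if (zen1NodeDiscovered) {
                return Action.JOIN_BEST_ZEN1_MASTER;         // keep the elected master on a Zen1 node
            }
            if (lastKnownMasterWasZen1 && unsafeBootstrapEnabled) {
                return Action.UNSAFELY_BOOTSTRAP_ZEN2_NODES; // the last Zen1 node has left: bootstrap the Zen2 nodes
            }
            return Action.RETRY_LATER;
        }
    }
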
Author: David Turner, 2018-12-08 07:33:35 +00:00 (committed by GitHub)
parent a27f2efca5 · commit 9f86e996fe
13 changed files with 690 additions and 27 deletions

@ -32,6 +32,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import static org.elasticsearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;
/**
* The core class of the cluster state coordination algorithm, directly implementing the
* <a href="https://github.com/elastic/elasticsearch-formal-models/blob/master/ZenWithTerms/tla/ZenWithTerms.tla">formal model</a>
@ -321,10 +323,16 @@ public class CoordinationState {
getCurrentTerm());
}
if (clusterState.term() == getLastAcceptedTerm() && clusterState.version() <= getLastAcceptedVersion()) {
logger.debug("handlePublishRequest: ignored publish request due to version mismatch (expected: >[{}], actual: [{}])",
getLastAcceptedVersion(), clusterState.version());
throw new CoordinationStateRejectedException("incoming version " + clusterState.version() +
" lower or equal to current version " + getLastAcceptedVersion());
if (clusterState.term() == ZEN1_BWC_TERM
&& clusterState.nodes().getMasterNode().equals(getLastAcceptedState().nodes().getMasterNode()) == false) {
logger.debug("handling publish request in compatibility mode despite version mismatch (expected: >[{}], actual: [{}])",
getLastAcceptedVersion(), clusterState.version());
} else {
logger.debug("handlePublishRequest: ignored publish request due to version mismatch (expected: >[{}], actual: [{}])",
getLastAcceptedVersion(), clusterState.version());
throw new CoordinationStateRejectedException("incoming version " + clusterState.version() +
" lower or equal to current version " + getLastAcceptedVersion());
}
}
logger.trace("handlePublishRequest: accepting publish request for version [{}] and term [{}]",

@ -44,6 +44,7 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
@ -86,6 +87,9 @@ import static org.elasticsearch.gateway.ClusterStateUpdaters.hideStateIfNotRecov
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
public class Coordinator extends AbstractLifecycleComponent implements Discovery {
public static final long ZEN1_BWC_TERM = 0;
private static final Logger logger = LogManager.getLogger(Coordinator.class);
// the timeout for the publication of each value
@ -121,6 +125,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private long maxTermSeen;
private final Reconfigurator reconfigurator;
private final ClusterBootstrapService clusterBootstrapService;
private final DiscoveryUpgradeService discoveryUpgradeService;
private final LagDetector lagDetector;
private final ClusterFormationFailureHelper clusterFormationFailureHelper;
@ -161,6 +166,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
masterService.setClusterStateSupplier(this::getStateForMasterService);
this.reconfigurator = new Reconfigurator(settings, clusterSettings);
this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService);
this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, clusterSettings, transportService, this::isBootstrapped,
joinHelper, peerFinder::getFoundPeers, this::unsafelySetConfigurationForUpgrade);
this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"),
transportService::getLocalNode);
this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState,
@ -256,6 +263,14 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
throw new CoordinationStateRejectedException("no longer leading this publication's term: " + publishRequest);
}
if (publishRequest.getAcceptedState().term() == ZEN1_BWC_TERM && getCurrentTerm() == ZEN1_BWC_TERM
&& mode == Mode.FOLLOWER && Optional.of(sourceNode).equals(lastKnownLeader) == false) {
logger.debug("received cluster state from {} but currently following {}, rejecting", sourceNode, lastKnownLeader);
throw new CoordinationStateRejectedException("received cluster state from " + sourceNode + " but currently following "
+ lastKnownLeader + ", rejecting");
}
ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term());
final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest);
@ -323,7 +338,11 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
final StartJoinRequest startJoinRequest
= new StartJoinRequest(getLocalNode(), Math.max(getCurrentTerm(), maxTermSeen) + 1);
logger.debug("starting election with {}", startJoinRequest);
getDiscoveredNodes().forEach(node -> joinHelper.sendStartJoinRequest(startJoinRequest, node));
getDiscoveredNodes().forEach(node -> {
if (isZen1Node(node) == false) {
joinHelper.sendStartJoinRequest(startJoinRequest, node);
}
});
}
}
}
@ -384,6 +403,11 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
peerFinder.activate(coordinationState.get().getLastAcceptedState().nodes());
clusterFormationFailureHelper.start();
if (getCurrentTerm() == ZEN1_BWC_TERM) {
discoveryUpgradeService.activate(lastKnownLeader);
}
leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES);
leaderChecker.updateLeader(null);
@ -414,6 +438,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
lastKnownLeader = Optional.of(getLocalNode());
peerFinder.deactivate(getLocalNode());
discoveryUpgradeService.deactivate();
clusterFormationFailureHelper.stop();
closePrevotingAndElectionScheduler();
preVoteCollector.update(getPreVoteResponse(), getLocalNode());
@ -439,6 +464,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
lastKnownLeader = Optional.of(leaderNode);
peerFinder.deactivate(leaderNode);
discoveryUpgradeService.deactivate();
clusterFormationFailureHelper.stop();
closePrevotingAndElectionScheduler();
cancelActivePublication();
@ -647,9 +673,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
return false;
}
assert currentState.term() == 0 : currentState;
assert currentState.version() == 0 : currentState;
if (mode != Mode.CANDIDATE) {
throw new CoordinationStateRejectedException("Cannot set initial configuration in mode " + mode);
}
@ -681,12 +704,59 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
}
}
private boolean isBootstrapped() {
return getLastAcceptedState().getLastAcceptedConfiguration().isEmpty() == false;
}
private void unsafelySetConfigurationForUpgrade(VotingConfiguration votingConfiguration) {
assert Version.CURRENT.major == Version.V_6_6_0.major + 1 : "remove this method once unsafe upgrades are no longer needed";
synchronized (mutex) {
if (mode != Mode.CANDIDATE) {
throw new IllegalStateException("Cannot overwrite configuration in mode " + mode);
}
if (isBootstrapped()) {
throw new IllegalStateException("Cannot overwrite configuration: configuration is already set to "
+ getLastAcceptedState().getLastAcceptedConfiguration());
}
if (lastKnownLeader.map(Coordinator::isZen1Node).orElse(false) == false) {
throw new IllegalStateException("Cannot upgrade from last-known leader: " + lastKnownLeader);
}
if (getCurrentTerm() != ZEN1_BWC_TERM) {
throw new IllegalStateException("Cannot upgrade, term is " + getCurrentTerm());
}
logger.info("automatically bootstrapping during rolling upgrade, using initial configuration {}", votingConfiguration);
final ClusterState currentState = getStateForMasterService();
final Builder builder = masterService.incrementVersion(currentState);
builder.metaData(MetaData.builder(currentState.metaData()).coordinationMetaData(
CoordinationMetaData.builder(currentState.metaData().coordinationMetaData())
.term(1)
.lastAcceptedConfiguration(votingConfiguration)
.lastCommittedConfiguration(votingConfiguration)
.build()));
final ClusterState newClusterState = builder.build();
coordinationState.get().handleStartJoin(new StartJoinRequest(getLocalNode(), newClusterState.term()));
coordinationState.get().handlePublishRequest(new PublishRequest(newClusterState));
followersChecker.clearCurrentNodes();
followersChecker.updateFastResponseState(getCurrentTerm(), mode);
peerFinder.deactivate(getLocalNode());
peerFinder.activate(newClusterState.nodes());
}
}
// Package-private for testing
ClusterState improveConfiguration(ClusterState clusterState) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
final Set<DiscoveryNode> liveNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false)
.filter(this::hasJoinVoteFrom).collect(Collectors.toSet());
.filter(this::hasJoinVoteFrom).filter(discoveryNode -> isZen1Node(discoveryNode) == false).collect(Collectors.toSet());
final VotingConfiguration newConfig = reconfigurator.reconfigure(liveNodes,
clusterState.getVotingConfigExclusions().stream().map(VotingConfigExclusion::getNodeId).collect(Collectors.toSet()),
clusterState.getLastAcceptedConfiguration());
@ -967,7 +1037,9 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
prevotingRound.close();
}
final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes());
final List<DiscoveryNode> discoveredNodes
= getDiscoveredNodes().stream().filter(n -> isZen1Node(n) == false).collect(Collectors.toList());
prevotingRound = preVoteCollector.start(lastAcceptedState, discoveredNodes);
}
}
}
@ -1176,13 +1248,13 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
}
// TODO: only here temporarily for BWC development, remove once complete
public static Settings.Builder addZen1Attribute(Settings.Builder builder) {
return builder.put("node.attr.zen1", true);
public static Settings.Builder addZen1Attribute(boolean isZen1Node, Settings.Builder builder) {
return builder.put("node.attr.zen1", isZen1Node);
}
// TODO: only here temporarily for BWC development, remove once complete
public static boolean isZen1Node(DiscoveryNode discoveryNode) {
return discoveryNode.getVersion().before(Version.V_7_0_0) ||
discoveryNode.getAttributes().containsKey("zen1");
(Booleans.isTrue(discoveryNode.getAttributes().getOrDefault("zen1", "false")));
}
}

@ -0,0 +1,320 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.discovery.zen.ElectMasterService.MasterCandidate;
import org.elasticsearch.discovery.zen.UnicastZenPing;
import org.elasticsearch.discovery.zen.UnicastZenPing.UnicastPingRequest;
import org.elasticsearch.discovery.zen.UnicastZenPing.UnicastPingResponse;
import org.elasticsearch.discovery.zen.ZenPing.PingResponse;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Optional;
import java.util.Set;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static java.lang.Math.max;
import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING;
import static org.elasticsearch.cluster.ClusterState.UNKNOWN_VERSION;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
import static org.elasticsearch.discovery.zen.ZenDiscovery.PING_TIMEOUT_SETTING;
/**
* Deals with rolling upgrades of the cluster coordination layer. In mixed clusters we prefer to elect the older nodes, but
* when the last old node shuts down, then as long as there are enough new nodes we can assume that they form the whole cluster and
* define them as the initial configuration.
*/
public class DiscoveryUpgradeService {
private static Logger logger = LogManager.getLogger(DiscoveryUpgradeService.class);
// how long to wait after activation before attempting to join a master or perform a bootstrap upgrade
public static final Setting<TimeValue> BWC_PING_TIMEOUT_SETTING =
Setting.timeSetting("discovery.zen.bwc_ping_timeout",
PING_TIMEOUT_SETTING, TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
// whether to try and bootstrap all the discovered Zen2 nodes when the last Zen1 node leaves the cluster.
public static final Setting<Boolean> ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING =
Setting.boolSetting("discovery.zen.unsafe_rolling_upgrades_enabled", true, Setting.Property.NodeScope);
private final ElectMasterService electMasterService;
private final TransportService transportService;
private final BooleanSupplier isBootstrappedSupplier;
private final JoinHelper joinHelper;
private final Supplier<Iterable<DiscoveryNode>> peersSupplier;
private final Consumer<VotingConfiguration> initialConfigurationConsumer;
private final TimeValue bwcPingTimeout;
private final boolean enableUnsafeBootstrappingOnUpgrade;
private final ClusterName clusterName;
@Nullable // null if no active joining round
private volatile JoiningRound joiningRound;
public DiscoveryUpgradeService(Settings settings, ClusterSettings clusterSettings, TransportService transportService,
BooleanSupplier isBootstrappedSupplier, JoinHelper joinHelper,
Supplier<Iterable<DiscoveryNode>> peersSupplier,
Consumer<VotingConfiguration> initialConfigurationConsumer) {
assert Version.CURRENT.major == Version.V_6_6_0.major + 1 : "remove this service once unsafe upgrades are no longer needed";
electMasterService = new ElectMasterService(settings);
this.transportService = transportService;
this.isBootstrappedSupplier = isBootstrappedSupplier;
this.joinHelper = joinHelper;
this.peersSupplier = peersSupplier;
this.initialConfigurationConsumer = initialConfigurationConsumer;
this.bwcPingTimeout = BWC_PING_TIMEOUT_SETTING.get(settings);
this.enableUnsafeBootstrappingOnUpgrade = ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING.get(settings);
this.clusterName = CLUSTER_NAME_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
electMasterService::minimumMasterNodes); // TODO reject update if the new value is too large
}
public void activate(Optional<DiscoveryNode> lastKnownLeader) {
// called under coordinator mutex
if (isBootstrappedSupplier.getAsBoolean()) {
return;
}
assert lastKnownLeader.isPresent() == false || Coordinator.isZen1Node(lastKnownLeader.get()) : lastKnownLeader;
// if there was a leader and it's not an old node then we must have been bootstrapped
assert joiningRound == null : joiningRound;
joiningRound = new JoiningRound(lastKnownLeader.isPresent());
joiningRound.scheduleNextAttempt();
}
public void deactivate() {
// called under coordinator mutex
joiningRound = null;
}
/**
* Waits for some number of calls to {@link ListenableCountDown#countDown()} and then notifies a listener. The listener
* is only ever notified once, whether successful or not.
*/
private static class ListenableCountDown {
private final CountDown countDown;
private final ActionListener<Void> listener;
ListenableCountDown(int count, ActionListener<Void> listener) {
this.countDown = new CountDown(count);
this.listener = listener;
}
void onFailure(Exception e) {
if (countDown.fastForward()) {
listener.onFailure(e);
}
}
void countDown() {
if (countDown.countDown()) {
listener.onResponse(null);
}
}
}
private class JoiningRound {
private final boolean upgrading;
JoiningRound(boolean upgrading) {
this.upgrading = upgrading;
}
private boolean isRunning() {
return joiningRound == this && isBootstrappedSupplier.getAsBoolean() == false;
}
void scheduleNextAttempt() {
if (isRunning() == false) {
return;
}
final ThreadPool threadPool = transportService.getThreadPool();
threadPool.scheduleUnlessShuttingDown(bwcPingTimeout, Names.SAME, new Runnable() {
@Override
public void run() {
if (isRunning() == false) {
return;
}
final Set<DiscoveryNode> discoveryNodes = Stream.concat(StreamSupport.stream(peersSupplier.get().spliterator(), false),
Stream.of(transportService.getLocalNode())).filter(DiscoveryNode::isMasterNode).collect(Collectors.toSet());
// this set of nodes is reasonably fresh - the PeerFinder cleans up nodes to which the transport service is not
// connected each time it wakes up (every second by default)
logger.debug("nodes: {}", discoveryNodes);
if (electMasterService.hasEnoughMasterNodes(discoveryNodes)) {
if (discoveryNodes.stream().anyMatch(Coordinator::isZen1Node)) {
electBestOldMaster(discoveryNodes);
} else if (upgrading && enableUnsafeBootstrappingOnUpgrade) {
// no Zen1 nodes found, but the last-known master was a Zen1 node, so this is a rolling upgrade
transportService.getThreadPool().generic().execute(() -> {
try {
initialConfigurationConsumer.accept(new VotingConfiguration(discoveryNodes.stream()
.map(DiscoveryNode::getId).collect(Collectors.toSet())));
} catch (Exception e) {
logger.debug("exception during bootstrapping upgrade, retrying", e);
} finally {
scheduleNextAttempt();
}
});
} else {
scheduleNextAttempt();
}
} else {
scheduleNextAttempt();
}
}
/**
* Ping all the old discovered masters one more time to obtain their cluster state versions, and then vote for the best one.
* @param discoveryNodes The master nodes (old and new).
*/
private void electBestOldMaster(Set<DiscoveryNode> discoveryNodes) {
final Set<MasterCandidate> masterCandidates = newConcurrentSet();
final ListenableCountDown listenableCountDown
= new ListenableCountDown(discoveryNodes.size(), new ActionListener<Void>() {
@Override
public void onResponse(Void value) {
assert masterCandidates.size() == discoveryNodes.size()
: masterCandidates + " does not match " + discoveryNodes;
// TODO we shouldn't elect a master with a version that's older than ours
// If the only Zen1 nodes left are stale, and we can bootstrap, maybe we should bootstrap?
// Do we ever need to elect a freshly-started Zen1 node?
if (isRunning()) {
final MasterCandidate electedMaster = electMasterService.electMaster(masterCandidates);
logger.debug("elected {}, sending join", electedMaster);
joinHelper.sendJoinRequest(electedMaster.getNode(), Optional.empty(),
JoiningRound.this::scheduleNextAttempt);
}
}
@Override
public void onFailure(Exception e) {
scheduleNextAttempt();
}
});
boolean foundOldMaster = false;
for (final DiscoveryNode discoveryNode : discoveryNodes) {
assert discoveryNode.isMasterNode() : discoveryNode;
if (Coordinator.isZen1Node(discoveryNode)) {
foundOldMaster = true;
transportService.sendRequest(discoveryNode, UnicastZenPing.ACTION_NAME,
new UnicastPingRequest(0, TimeValue.ZERO,
new PingResponse(createDiscoveryNodeWithImpossiblyHighId(transportService.getLocalNode()),
null, clusterName, UNKNOWN_VERSION)),
TransportRequestOptions.builder().withTimeout(bwcPingTimeout).build(),
new TransportResponseHandler<UnicastPingResponse>() {
@Override
public void handleResponse(UnicastPingResponse response) {
long clusterStateVersion = UNKNOWN_VERSION;
for (PingResponse pingResponse : response.pingResponses) {
if (discoveryNode.equals(pingResponse.node())) {
clusterStateVersion
= max(clusterStateVersion, pingResponse.getClusterStateVersion());
}
}
masterCandidates.add(new MasterCandidate(discoveryNode, clusterStateVersion));
listenableCountDown.countDown();
}
@Override
public void handleException(TransportException exp) {
logger.debug(
new ParameterizedMessage("unexpected exception when pinging {}", discoveryNode), exp);
listenableCountDown.onFailure(exp);
}
@Override
public String executor() {
return Names.SAME;
}
@Override
public UnicastPingResponse read(StreamInput in) throws IOException {
return new UnicastPingResponse(in);
}
});
} else {
masterCandidates.add(
new MasterCandidate(createDiscoveryNodeWithImpossiblyHighId(discoveryNode), UNKNOWN_VERSION));
listenableCountDown.countDown();
}
}
assert foundOldMaster;
}
@Override
public String toString() {
return "discovery upgrade service retry";
}
});
}
}
/**
* Pre-7.0 nodes select the best master by comparing their IDs (as strings) and selecting the lowest one amongst those nodes with
* the best cluster state version. We want 7.0+ nodes to participate in these elections in a mixed cluster but never to win one, so
* we lie and claim to have an impossible ID that compares above all genuine IDs.
*/
public static DiscoveryNode createDiscoveryNodeWithImpossiblyHighId(DiscoveryNode node) {
// IDs are base-64-encoded UUIDs, which means they use the character set [0-9A-Za-z_-]. The highest character in this set is 'z', and
// 'z' < '{', so by starting the ID with '{' we can be sure it's greater. This is terrible.
final String fakeId = "{zen2}" + node.getId();
assert node.getId().compareTo(fakeId) < 0 : node + " vs " + fakeId;
return new DiscoveryNode(node.getName(), fakeId, node.getEphemeralId(), node.getHostName(),
node.getHostAddress(), node.getAddress(), node.getAttributes(), node.getRoles(), node.getVersion());
}
}
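
An editorial illustration (not in the commit) of why the '{' prefix used by createDiscoveryNodeWithImpossiblyHighId above always loses a Zen1 election: base-64 node IDs draw from [0-9A-Za-z_-], whose largest character is 'z' (0x7A), while '{' is 0x7B, so a fake ID starting with '{' compares greater than every genuine ID, and Zen1 prefers the lowest ID:

    public class FakeIdOrderingSketch {
        public static void main(String[] args) {
            final String largestGenuineId = "zzzzzzzzzzzzzzzzzzzzzz"; // the largest possible base-64 node ID
            final String fakeId = "{zen2}" + largestGenuineId;        // the shape of ID that 7.0+ nodes advertise
            System.out.println(largestGenuineId.compareTo(fakeId) < 0); // prints true: the fake ID can never win
        }
    }
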

@ -166,6 +166,11 @@ public class JoinHelper {
}
public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin) {
sendJoinRequest(destination, optionalJoin, () -> {
});
}
public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin, Runnable onCompletion) {
assert destination.isMasterNode() : "trying to join master-ineligible " + destination;
final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), optionalJoin);
final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest);
@ -192,12 +197,14 @@ public class JoinHelper {
public void handleResponse(Empty response) {
pendingOutgoingJoins.remove(dedupKey);
logger.debug("successfully joined {} with {}", destination, joinRequest);
onCompletion.run();
}
@Override
public void handleException(TransportException exp) {
pendingOutgoingJoins.remove(dedupKey);
logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp);
onCompletion.run();
}
@Override

@ -151,7 +151,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
}
}
private ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState currentState, List<Task> joiningNodes) {
protected ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState currentState, List<Task> joiningNodes) {
assert currentState.nodes().getMasterNodeId() == null : currentState;
DiscoveryNodes currentNodes = currentState.nodes();
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentNodes);

@ -38,6 +38,7 @@ import java.io.IOException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;
import java.util.stream.StreamSupport;
import static org.elasticsearch.cluster.coordination.CoordinationState.isElectionQuorum;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
@ -140,6 +141,7 @@ public class PreVoteCollector {
}
void start(final Iterable<DiscoveryNode> broadcastNodes) {
assert StreamSupport.stream(broadcastNodes.spliterator(), false).noneMatch(Coordinator::isZen1Node) : broadcastNodes;
logger.debug("{} requesting pre-votes from {}", this, broadcastNodes);
broadcastNodes.forEach(n -> transportService.sendRequest(n, REQUEST_PRE_VOTE_ACTION_NAME, preVoteRequest,
new TransportResponseHandler<PreVoteResponse>() {

@ -94,6 +94,7 @@ public class Reconfigurator {
* @return An optimal configuration, or leave the current configuration unchanged if the optimal configuration has no live quorum.
*/
public VotingConfiguration reconfigure(Set<DiscoveryNode> liveNodes, Set<String> retiredNodeIds, VotingConfiguration currentConfig) {
assert liveNodes.stream().noneMatch(Coordinator::isZen1Node) : liveNodes;
logger.trace("{} reconfiguring {} based on liveNodes={}, retiredNodeIds={}", this, currentConfig, liveNodes, retiredNodeIds);
/*

@ -35,6 +35,7 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.DiscoveryUpgradeService;
import org.elasticsearch.cluster.coordination.ElectionSchedulerFactory;
import org.elasticsearch.cluster.coordination.FollowersChecker;
import org.elasticsearch.cluster.coordination.JoinHelper;
@ -476,7 +477,9 @@ public final class ClusterSettings extends AbstractScopedSettings {
ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING,
ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING,
ClusterBootstrapService.UNCONFIGURED_BOOTSTRAP_TIMEOUT_SETTING,
LagDetector.CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING
LagDetector.CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING,
DiscoveryUpgradeService.BWC_PING_TIMEOUT_SETTING,
DiscoveryUpgradeService.ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING
)));
public static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList(

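An editorial sketch (not part of the commit) of how the two settings registered above could be supplied on a Zen2 node during the upgrade; the "3s" value is an assumption, since BWC_PING_TIMEOUT_SETTING falls back to discovery.zen.ping_timeout by default:

    import org.elasticsearch.cluster.coordination.DiscoveryUpgradeService;
    import org.elasticsearch.common.settings.Settings;

    final class UpgradeSettingsSketch {
        static Settings upgradeSettings() {
            return Settings.builder()
                // defaults to true; set to false to forbid automatic (unsafe) bootstrapping when the last Zen1 node leaves
                .put(DiscoveryUpgradeService.ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING.getKey(), true)
                // how long to wait between attempts to join an old master or perform the bootstrap upgrade
                .put(DiscoveryUpgradeService.BWC_PING_TIMEOUT_SETTING.getKey(), "3s")
                .build();
        }
    }
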
@ -27,7 +27,6 @@ import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.PeersResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@ -41,6 +40,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.discovery.zen.UnicastZenPing;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.ZenPing;
import org.elasticsearch.discovery.zen.ZenPing.PingResponse;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportChannel;
@ -62,6 +62,8 @@ import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static org.elasticsearch.cluster.coordination.Coordinator.isZen1Node;
import static org.elasticsearch.cluster.coordination.DiscoveryUpgradeService.createDiscoveryNodeWithImpossiblyHighId;
import static java.util.Collections.emptyList;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
@ -412,6 +414,11 @@ public abstract class PeerFinder {
final DiscoveryNode discoveryNode = getDiscoveryNode();
assert discoveryNode != null : "cannot request peers without first connecting";
if (discoveryNode.equals(getLocalNode())) {
logger.trace("{} not requesting peers from local node", this);
return;
}
logger.trace("{} requesting peers", this);
peersRequestInFlight = true;
@ -459,11 +466,11 @@ public abstract class PeerFinder {
final String actionName;
final TransportRequest transportRequest;
final TransportResponseHandler<?> transportResponseHandler;
if (Coordinator.isZen1Node(discoveryNode)) {
if (isZen1Node(discoveryNode)) {
actionName = UnicastZenPing.ACTION_NAME;
transportRequest = new UnicastZenPing.UnicastPingRequest(1, ZenDiscovery.PING_TIMEOUT_SETTING.get(settings),
new ZenPing.PingResponse(getLocalNode(), null, ClusterName.CLUSTER_NAME_SETTING.get(settings),
ClusterState.UNKNOWN_VERSION));
new ZenPing.PingResponse(createDiscoveryNodeWithImpossiblyHighId(getLocalNode()), null,
ClusterName.CLUSTER_NAME_SETTING.get(settings), ClusterState.UNKNOWN_VERSION));
transportResponseHandler = peersResponseHandler.wrap(ucResponse -> {
Optional<DiscoveryNode> optionalMasterNode = Arrays.stream(ucResponse.pingResponses)
.filter(pr -> discoveryNode.equals(pr.node()) && discoveryNode.equals(pr.master()))
@ -471,9 +478,9 @@ public abstract class PeerFinder {
.findFirst();
List<DiscoveryNode> discoveredNodes = new ArrayList<>();
if (optionalMasterNode.isPresent() == false) {
Arrays.stream(ucResponse.pingResponses).map(pr -> pr.master()).filter(Objects::nonNull)
Arrays.stream(ucResponse.pingResponses).map(PingResponse::master).filter(Objects::nonNull)
.forEach(discoveredNodes::add);
Arrays.stream(ucResponse.pingResponses).map(pr -> pr.node()).forEach(discoveredNodes::add);
Arrays.stream(ucResponse.pingResponses).map(PingResponse::node).forEach(discoveredNodes::add);
}
return new PeersResponse(optionalMasterNode, discoveredNodes, 0L);
}, UnicastZenPing.UnicastPingResponse::new);
@ -506,10 +513,12 @@ public abstract class PeerFinder {
final PeersResponse peersResponse = handlePeersRequest(peersRequest);
final List<ZenPing.PingResponse> pingResponses = new ArrayList<>();
final ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
pingResponses.add(new ZenPing.PingResponse(transportService.getLocalNode(), peersResponse.getMasterNode().orElse(null),
clusterName, 0L));
pingResponses.add(new ZenPing.PingResponse(createDiscoveryNodeWithImpossiblyHighId(transportService.getLocalNode()),
peersResponse.getMasterNode().orElse(null),
clusterName, ClusterState.UNKNOWN_VERSION));
peersResponse.getKnownPeers().forEach(dn -> pingResponses.add(
new ZenPing.PingResponse(dn, null, clusterName, ClusterState.UNKNOWN_VERSION)));
new ZenPing.PingResponse(ZenPing.PingResponse.FAKE_PING_ID,
isZen1Node(dn) ? dn : createDiscoveryNodeWithImpossiblyHighId(dn), null, clusterName, ClusterState.UNKNOWN_VERSION)));
channel.sendResponse(new UnicastZenPing.UnicastPingResponse(request.id, pingResponses.toArray(new ZenPing.PingResponse[0])));
}
}

@ -46,6 +46,12 @@ public interface ZenPing extends Releasable {
class PingResponse implements Writeable {
/**
* An ID of a ping response that was generated on behalf of another node. Needs to be less than all other ping IDs so that fake ping
* responses don't override real ones.
*/
public static long FAKE_PING_ID = -1;
private static final AtomicLong idGenerator = new AtomicLong();
// an always increasing unique identifier for this ping response.
@ -68,7 +74,19 @@ public interface ZenPing extends Releasable {
* ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION} for not recovered)
*/
public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, long clusterStateVersion) {
this.id = idGenerator.incrementAndGet();
this(idGenerator.incrementAndGet(), node, master, clusterName, clusterStateVersion);
}
/**
* @param id the ping's ID
* @param node the node which this ping describes
* @param master the current master of the node
* @param clusterName the cluster name of the node
* @param clusterStateVersion the current cluster state version of that node
* ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION} for not recovered)
*/
public PingResponse(long id, DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, long clusterStateVersion) {
this.id = id;
this.node = node;
this.master = master;
this.clusterName = clusterName;

@ -0,0 +1,38 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.lessThan;
public class DiscoveryUpgradeServiceTests extends ESTestCase {
public void testCreateDiscoveryNodeWithImpossiblyHighId() {
final DiscoveryNode discoveryNode
= new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNode fakeNode = DiscoveryUpgradeService.createDiscoveryNodeWithImpossiblyHighId(discoveryNode);
assertThat(discoveryNode.getId(), lessThan(fakeNode.getId()));
assertThat(fakeNode.getId(), containsString(discoveryNode.getId()));
}
}

@ -18,14 +18,33 @@
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest;
import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster.RestartCallback;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import java.util.List;
import java.util.stream.IntStream;
import static org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.equalTo;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class Zen1IT extends ESIntegTestCase {
private static Settings ZEN1_SETTINGS = Coordinator.addZen1Attribute(Settings.builder()
private static Settings ZEN1_SETTINGS = Coordinator.addZen1Attribute(true, Settings.builder()
.put(TestZenDiscovery.USE_ZEN2.getKey(), false)
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false)) // Zen2 does not know about mock pings
.build();
@ -45,4 +64,168 @@ public class Zen1IT extends ESIntegTestCase {
internalCluster().startNodes(randomIntBetween(1, 3), ZEN1_SETTINGS);
createIndex("test");
}
public void testMixedClusterFormation() throws Exception {
final int zen1NodeCount = randomIntBetween(1, 3);
final int zen2NodeCount = randomIntBetween(zen1NodeCount == 1 ? 2 : 1, 3);
logger.info("starting cluster of [{}] Zen1 nodes and [{}] Zen2 nodes", zen1NodeCount, zen2NodeCount);
final List<String> nodes = internalCluster().startNodes(IntStream.range(0, zen1NodeCount + zen2NodeCount)
.mapToObj(i -> i < zen1NodeCount ? ZEN1_SETTINGS : ZEN2_SETTINGS).toArray(Settings[]::new));
createIndex("test",
Settings.builder()
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.ZERO) // assign shards
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, zen1NodeCount + zen2NodeCount + randomIntBetween(0, 2)) // causes rebalancing
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build());
ensureGreen("test");
for (final String node : nodes) {
// With 1 Zen1 node, stopping that Zen1 node may cause the Zen2 nodes to auto-bootstrap.
// But if there are only 2 Zen2 nodes then we must use voting config exclusions to keep the cluster
// alive through the other two restarts.
final boolean masterNodeIsZen2 = zen1NodeCount <= nodes.indexOf(internalCluster().getMasterName());
final boolean thisNodeIsZen2 = zen1NodeCount <= nodes.indexOf(node);
final boolean requiresVotingConfigExclusions = zen1NodeCount == 1 && zen2NodeCount == 2 && masterNodeIsZen2 && thisNodeIsZen2;
if (requiresVotingConfigExclusions) {
client().execute(AddVotingConfigExclusionsAction.INSTANCE,
new AddVotingConfigExclusionsRequest(new String[]{node})).get();
}
internalCluster().restartNode(node, new RestartCallback() {
@Override
public Settings onNodeStopped(String restartingNode) {
String viaNode = randomValueOtherThan(restartingNode, () -> randomFrom(nodes));
final ClusterHealthRequestBuilder clusterHealthRequestBuilder = client(viaNode).admin().cluster().prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes(Integer.toString(zen1NodeCount + zen2NodeCount - 1))
.setTimeout(TimeValue.timeValueSeconds(30));
ClusterHealthResponse clusterHealthResponse = clusterHealthRequestBuilder.get();
assertFalse(restartingNode, clusterHealthResponse.isTimedOut());
return Settings.EMPTY;
}
});
ensureStableCluster(zen1NodeCount + zen2NodeCount);
ensureGreen("test");
if (requiresVotingConfigExclusions) {
final ClearVotingConfigExclusionsRequest clearVotingTombstonesRequest = new ClearVotingConfigExclusionsRequest();
clearVotingTombstonesRequest.setWaitForRemoval(false);
client().execute(ClearVotingConfigExclusionsAction.INSTANCE, clearVotingTombstonesRequest).get();
}
}
}
public void testRollingMigrationFromZen1ToZen2() throws Exception {
final int nodeCount = randomIntBetween(2, 5);
final List<String> zen1Nodes = internalCluster().startNodes(nodeCount, ZEN1_SETTINGS);
createIndex("test",
Settings.builder()
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.ZERO) // assign shards
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, nodeCount) // causes rebalancing
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build());
ensureGreen("test");
for (final String zen1Node : zen1Nodes) {
logger.info("--> shutting down {}", zen1Node);
internalCluster().stopRandomNode(s -> NODE_NAME_SETTING.get(s).equals(zen1Node));
ensureStableCluster(nodeCount - 1);
if (nodeCount > 2) {
ensureGreen("test");
} else {
ensureYellow("test");
}
logger.info("--> starting replacement for {}", zen1Node);
final String newNode = internalCluster().startNode(ZEN2_SETTINGS);
ensureStableCluster(nodeCount);
ensureGreen("test");
logger.info("--> successfully replaced {} with {}", zen1Node, newNode);
}
assertThat(internalCluster().size(), equalTo(nodeCount));
}
public void testRollingUpgradeFromZen1ToZen2() throws Exception {
final int nodeCount = randomIntBetween(2, 5);
final List<String> nodes = internalCluster().startNodes(nodeCount, ZEN1_SETTINGS);
createIndex("test",
Settings.builder()
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.ZERO) // assign shards
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, nodeCount) // causes rebalancing
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build());
ensureGreen("test");
internalCluster().rollingRestart(new RestartCallback() {
@Override
public void doAfterNodes(int n, Client client) {
ensureGreen("test");
}
@Override
public Settings onNodeStopped(String nodeName) {
String viaNode = randomValueOtherThan(nodeName, () -> randomFrom(nodes));
final ClusterHealthRequestBuilder clusterHealthRequestBuilder = client(viaNode).admin().cluster().prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes(Integer.toString(nodeCount - 1))
.setTimeout(TimeValue.timeValueSeconds(30));
if (nodeCount == 2) {
clusterHealthRequestBuilder.setWaitForYellowStatus();
} else {
clusterHealthRequestBuilder.setWaitForGreenStatus();
}
ClusterHealthResponse clusterHealthResponse = clusterHealthRequestBuilder.get();
assertFalse(nodeName, clusterHealthResponse.isTimedOut());
return Coordinator.addZen1Attribute(false, Settings.builder().put(ZEN2_SETTINGS)).build();
}
});
ensureStableCluster(nodeCount);
ensureGreen("test");
assertThat(internalCluster().size(), equalTo(nodeCount));
}
private void testMultipleNodeMigrationFromZen1ToZen2(int nodeCount) throws Exception {
final List<String> oldNodes = internalCluster().startNodes(nodeCount, ZEN1_SETTINGS);
createIndex("test",
Settings.builder()
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.ZERO) // assign shards
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, nodeCount) // causes rebalancing
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, nodeCount > 1 ? 1 : 0)
.build());
ensureGreen("test");
internalCluster().startNodes(nodeCount, ZEN2_SETTINGS);
logger.info("--> updating settings to exclude old nodes");
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder()
.put(CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(), String.join(",", oldNodes))).get();
logger.info("--> waiting for old nodes to be vacated");
waitForRelocation();
while (internalCluster().size() > nodeCount) {
internalCluster().stopRandomNode(settings -> oldNodes.contains(NODE_NAME_SETTING.get(settings)));
}
ensureGreen("test");
}
public void testMultipleNodeMigrationFromZen1ToZen2WithOneNode() throws Exception {
testMultipleNodeMigrationFromZen1ToZen2(1);
}
public void testMultipleNodeMigrationFromZen1ToZen2WithTwoNodes() throws Exception {
testMultipleNodeMigrationFromZen1ToZen2(2);
}
public void testMultipleNodeMigrationFromZen1ToZen2WithThreeNodes() throws Exception {
testMultipleNodeMigrationFromZen1ToZen2(3);
}
}

@ -105,6 +105,7 @@ import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.MockTransportClient;
@ -1943,7 +1944,8 @@ public final class InternalTestCluster extends TestCluster {
}
final List<NodeAndClient> nodes = new ArrayList<>();
final int prevMasterCount = getMasterNodesCount();
int bootstrapMasterNodeIndex = prevMasterCount == 0 && autoManageMinMasterNodes && newMasterCount > 0
int bootstrapMasterNodeIndex = prevMasterCount == 0 && autoManageMinMasterNodes && newMasterCount > 0 && Arrays.stream(settings)
.allMatch(s -> Node.NODE_MASTER_SETTING.get(s) == false || TestZenDiscovery.USE_ZEN2.get(s) == true)
? RandomNumbers.randomIntBetween(random, 0, newMasterCount - 1) : -1;
for (Settings nodeSettings : settings) {