diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java index 9fc35dc7be3..75dc811f37d 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java @@ -127,6 +127,7 @@ public class TransportClusterStateAction extends TransportMasterNodeReadAction private final boolean wasReadFromDiff; + private final int minimumMasterNodesOnPublishingMaster; + // built on demand private volatile RoutingNodes routingNodes; public ClusterState(long version, String stateUUID, ClusterState state) { this(state.clusterName, version, stateUUID, state.metaData(), state.routingTable(), state.nodes(), state.blocks(), - state.customs(), false); + state.customs(), -1, false); } public ClusterState(ClusterName clusterName, long version, String stateUUID, MetaData metaData, RoutingTable routingTable, DiscoveryNodes nodes, ClusterBlocks blocks, ImmutableOpenMap customs, - boolean wasReadFromDiff) { + int minimumMasterNodesOnPublishingMaster, boolean wasReadFromDiff) { this.version = version; this.stateUUID = stateUUID; this.clusterName = clusterName; @@ -197,6 +200,7 @@ public class ClusterState implements ToXContentFragment, Diffable this.nodes = nodes; this.blocks = blocks; this.customs = customs; + this.minimumMasterNodesOnPublishingMaster = minimumMasterNodesOnPublishingMaster; this.wasReadFromDiff = wasReadFromDiff; } @@ -290,6 +294,17 @@ public class ClusterState implements ToXContentFragment, Diffable return coordinationMetaData().getVotingConfigExclusions(); } + /** + * The node-level `discovery.zen.minimum_master_nodes` setting on the master node that published this cluster state, for use in rolling + * upgrades from 6.x to 7.x. Once all the 6.x master-eligible nodes have left the cluster, the 7.x nodes use this value to determine how + * many master-eligible nodes must be discovered before the cluster can be bootstrapped. Note that this method returns the node-level + * value of this setting, and ignores any cluster-level override that was set via the API. Callers are expected to combine this value + * with any value set in the cluster-level settings. This should be removed once we no longer need support for {@link Version#V_6_7_0}. + */ + public int getMinimumMasterNodesOnPublishingMaster() { + return minimumMasterNodesOnPublishingMaster; + } + // Used for testing and logging to determine how this cluster state was send over the wire public boolean wasReadFromDiff() { return wasReadFromDiff; @@ -644,7 +659,7 @@ public class ClusterState implements ToXContentFragment, Diffable private ClusterBlocks blocks = ClusterBlocks.EMPTY_CLUSTER_BLOCK; private final ImmutableOpenMap.Builder customs; private boolean fromDiff; - + private int minimumMasterNodesOnPublishingMaster = -1; public Builder(ClusterState state) { this.clusterName = state.clusterName; @@ -655,6 +670,7 @@ public class ClusterState implements ToXContentFragment, Diffable this.metaData = state.metaData(); this.blocks = state.blocks(); this.customs = ImmutableOpenMap.builder(state.customs()); + this.minimumMasterNodesOnPublishingMaster = state.minimumMasterNodesOnPublishingMaster; this.fromDiff = false; } @@ -715,6 +731,11 @@ public class ClusterState implements ToXContentFragment, Diffable return this; } + public Builder minimumMasterNodesOnPublishingMaster(int minimumMasterNodesOnPublishingMaster) { + this.minimumMasterNodesOnPublishingMaster = minimumMasterNodesOnPublishingMaster; + return this; + } + public Builder putCustom(String type, Custom custom) { customs.put(type, custom); return this; @@ -739,7 +760,8 @@ public class ClusterState implements ToXContentFragment, Diffable if (UNKNOWN_UUID.equals(uuid)) { uuid = UUIDs.randomBase64UUID(); } - return new ClusterState(clusterName, version, uuid, metaData, routingTable, nodes, blocks, customs.build(), fromDiff); + return new ClusterState(clusterName, version, uuid, metaData, routingTable, nodes, blocks, customs.build(), + minimumMasterNodesOnPublishingMaster, fromDiff); } public static byte[] toBytes(ClusterState state) throws IOException { @@ -782,6 +804,7 @@ public class ClusterState implements ToXContentFragment, Diffable Custom customIndexMetaData = in.readNamedWriteable(Custom.class); builder.putCustom(customIndexMetaData.getWriteableName(), customIndexMetaData); } + builder.minimumMasterNodesOnPublishingMaster = in.getVersion().onOrAfter(Version.V_7_0_0) ? in.readVInt() : -1; return builder.build(); } @@ -807,6 +830,9 @@ public class ClusterState implements ToXContentFragment, Diffable out.writeNamedWriteable(cursor.value); } } + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeVInt(minimumMasterNodesOnPublishingMaster); + } } private static class ClusterStateDiff implements Diff { @@ -829,6 +855,8 @@ public class ClusterState implements ToXContentFragment, Diffable private final Diff> customs; + private final int minimumMasterNodesOnPublishingMaster; + ClusterStateDiff(ClusterState before, ClusterState after) { fromUuid = before.stateUUID; toUuid = after.stateUUID; @@ -839,6 +867,7 @@ public class ClusterState implements ToXContentFragment, Diffable metaData = after.metaData.diff(before.metaData); blocks = after.blocks.diff(before.blocks); customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER); + minimumMasterNodesOnPublishingMaster = after.minimumMasterNodesOnPublishingMaster; } ClusterStateDiff(StreamInput in, DiscoveryNode localNode) throws IOException { @@ -851,6 +880,7 @@ public class ClusterState implements ToXContentFragment, Diffable metaData = MetaData.readDiffFrom(in); blocks = ClusterBlocks.readDiffFrom(in); customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER); + minimumMasterNodesOnPublishingMaster = in.getVersion().onOrAfter(Version.V_7_0_0) ? in.readVInt() : -1; } @Override @@ -864,6 +894,9 @@ public class ClusterState implements ToXContentFragment, Diffable metaData.writeTo(out); blocks.writeTo(out); customs.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeVInt(minimumMasterNodesOnPublishingMaster); + } } @Override @@ -883,9 +916,9 @@ public class ClusterState implements ToXContentFragment, Diffable builder.metaData(metaData.apply(state.metaData)); builder.blocks(blocks.apply(state.blocks)); builder.customs(customs.apply(state.customs)); + builder.minimumMasterNodesOnPublishingMaster(minimumMasterNodesOnPublishingMaster); builder.fromDiff(true); return builder.build(); } - } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 4a018c1f78f..a4e1d3ed8c9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -168,7 +168,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery this.reconfigurator = new Reconfigurator(settings, clusterSettings); this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService, this::getFoundPeers, this::isInitialConfigurationSet, this::setInitialConfiguration); - this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, clusterSettings, transportService, + this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, transportService, this::isInitialConfigurationSet, joinHelper, peerFinder::getFoundPeers, this::setInitialConfiguration); this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"), transportService::getLocalNode); @@ -467,7 +467,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery clusterFormationFailureHelper.start(); if (getCurrentTerm() == ZEN1_BWC_TERM) { - discoveryUpgradeService.activate(lastKnownLeader); + discoveryUpgradeService.activate(lastKnownLeader, coordinationState.get().getLastAcceptedState()); } leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeService.java index 496adb65bb6..56102704848 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeService.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeService.java @@ -24,11 +24,11 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -60,6 +60,7 @@ import static java.lang.Math.max; import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING; import static org.elasticsearch.cluster.ClusterState.UNKNOWN_VERSION; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet; +import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING; import static org.elasticsearch.discovery.zen.ZenDiscovery.PING_TIMEOUT_SETTING; /** @@ -80,7 +81,12 @@ public class DiscoveryUpgradeService { public static final Setting ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING = Setting.boolSetting("discovery.zen.unsafe_rolling_upgrades_enabled", true, Setting.Property.NodeScope); - private final ElectMasterService electMasterService; + /** + * Dummy {@link ElectMasterService} that is only used to choose the best 6.x master from the discovered nodes, ignoring the + * `minimum_master_nodes` setting. + */ + private static final ElectMasterService electMasterService = new ElectMasterService(Settings.EMPTY); + private final TransportService transportService; private final BooleanSupplier isBootstrappedSupplier; private final JoinHelper joinHelper; @@ -93,12 +99,11 @@ public class DiscoveryUpgradeService { @Nullable // null if no active joining round private volatile JoiningRound joiningRound; - public DiscoveryUpgradeService(Settings settings, ClusterSettings clusterSettings, TransportService transportService, + public DiscoveryUpgradeService(Settings settings, TransportService transportService, BooleanSupplier isBootstrappedSupplier, JoinHelper joinHelper, Supplier> peersSupplier, Consumer initialConfigurationConsumer) { assert Version.CURRENT.major == Version.V_6_6_0.major + 1 : "remove this service once unsafe upgrades are no longer needed"; - electMasterService = new ElectMasterService(settings); this.transportService = transportService; this.isBootstrappedSupplier = isBootstrappedSupplier; this.joinHelper = joinHelper; @@ -107,12 +112,9 @@ public class DiscoveryUpgradeService { this.bwcPingTimeout = BWC_PING_TIMEOUT_SETTING.get(settings); this.enableUnsafeBootstrappingOnUpgrade = ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING.get(settings); this.clusterName = CLUSTER_NAME_SETTING.get(settings); - - clusterSettings.addSettingsUpdateConsumer(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING, - electMasterService::minimumMasterNodes); // TODO reject update if the new value is too large } - public void activate(Optional lastKnownLeader) { + public void activate(Optional lastKnownLeader, ClusterState lastAcceptedClusterState) { // called under coordinator mutex if (isBootstrappedSupplier.getAsBoolean()) { @@ -122,8 +124,13 @@ public class DiscoveryUpgradeService { assert lastKnownLeader.isPresent() == false || Coordinator.isZen1Node(lastKnownLeader.get()) : lastKnownLeader; // if there was a leader and it's not a old node then we must have been bootstrapped + final Settings dynamicSettings = lastAcceptedClusterState.metaData().settings(); + final int minimumMasterNodes = DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.exists(dynamicSettings) + ? DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(dynamicSettings) + : lastAcceptedClusterState.getMinimumMasterNodesOnPublishingMaster(); + assert joiningRound == null : joiningRound; - joiningRound = new JoiningRound(lastKnownLeader.isPresent()); + joiningRound = new JoiningRound(enableUnsafeBootstrappingOnUpgrade && lastKnownLeader.isPresent(), minimumMasterNodes); joiningRound.scheduleNextAttempt(); } @@ -160,15 +167,21 @@ public class DiscoveryUpgradeService { private class JoiningRound { private final boolean upgrading; + private final int minimumMasterNodes; - JoiningRound(boolean upgrading) { + JoiningRound(boolean upgrading, int minimumMasterNodes) { this.upgrading = upgrading; + this.minimumMasterNodes = minimumMasterNodes; } private boolean isRunning() { return joiningRound == this && isBootstrappedSupplier.getAsBoolean() == false; } + private boolean canBootstrap(Set discoveryNodes) { + return upgrading && minimumMasterNodes <= discoveryNodes.stream().filter(DiscoveryNode::isMasterNode).count(); + } + void scheduleNextAttempt() { if (isRunning() == false) { return; @@ -189,26 +202,22 @@ public class DiscoveryUpgradeService { // this set of nodes is reasonably fresh - the PeerFinder cleans up nodes to which the transport service is not // connected each time it wakes up (every second by default) - logger.debug("nodes: {}", discoveryNodes); + logger.debug("upgrading={}, minimumMasterNodes={}, nodes={}", upgrading, minimumMasterNodes, discoveryNodes); - if (electMasterService.hasEnoughMasterNodes(discoveryNodes)) { - if (discoveryNodes.stream().anyMatch(Coordinator::isZen1Node)) { - electBestOldMaster(discoveryNodes); - } else if (upgrading && enableUnsafeBootstrappingOnUpgrade) { - // no Zen1 nodes found, but the last-known master was a Zen1 node, so this is a rolling upgrade - transportService.getThreadPool().generic().execute(() -> { - try { - initialConfigurationConsumer.accept(new VotingConfiguration(discoveryNodes.stream() - .map(DiscoveryNode::getId).collect(Collectors.toSet()))); - } catch (Exception e) { - logger.debug("exception during bootstrapping upgrade, retrying", e); - } finally { - scheduleNextAttempt(); - } - }); - } else { - scheduleNextAttempt(); - } + if (discoveryNodes.stream().anyMatch(Coordinator::isZen1Node)) { + electBestOldMaster(discoveryNodes); + } else if (canBootstrap(discoveryNodes)) { + // no Zen1 nodes found, but the last-known master was a Zen1 node, so this is a rolling upgrade + transportService.getThreadPool().generic().execute(() -> { + try { + initialConfigurationConsumer.accept(new VotingConfiguration(discoveryNodes.stream() + .map(DiscoveryNode::getId).collect(Collectors.toSet()))); + } catch (Exception e) { + logger.debug("exception during bootstrapping upgrade, retrying", e); + } finally { + scheduleNextAttempt(); + } + }); } else { scheduleNextAttempt(); } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index 8c41d7b2eaa..53fada396fc 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -90,7 +90,7 @@ public class JoinHelper { this.masterService = masterService; this.transportService = transportService; this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings); - this.joinTaskExecutor = new JoinTaskExecutor(allocationService, logger) { + this.joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger) { @Override public ClusterTasksResult execute(ClusterState currentState, List joiningTasks) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java index b754e50a945..2dcc1022f8d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java @@ -29,7 +29,9 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.DiscoverySettings; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import java.util.ArrayList; @@ -46,6 +48,8 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor secondThirdNodes = internalCluster().startNodes(2); + assertThat(internalCluster().getMasterName(), equalTo(firstNode)); + + final List allNodes = Stream.concat(Stream.of(firstNode), secondThirdNodes.stream()).collect(Collectors.toList()); + for (final String node : allNodes) { + final ClusterState localState = client(node).admin().cluster().state(new ClusterStateRequest().local(true)).get().getState(); + assertThat(localState.getMinimumMasterNodesOnPublishingMaster(), equalTo(1)); + assertThat(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(localState.metaData().settings()), equalTo(2)); + } + + internalCluster().stopRandomNode(nameFilter(firstNode)); + assertThat(internalCluster().getMasterName(), isIn(secondThirdNodes)); + + for (final String node : secondThirdNodes) { + final ClusterState localState = client(node).admin().cluster().state(new ClusterStateRequest().local(true)).get().getState(); + assertThat(localState.getMinimumMasterNodesOnPublishingMaster(), equalTo(2)); + assertThat(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(localState.metaData().settings()), equalTo(2)); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java b/server/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java index a3ae6b07b19..35a2173e0ae 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java @@ -141,7 +141,7 @@ public class NodeJoinControllerTests extends ESTestCase { throw new IllegalStateException("method setupMasterServiceAndNodeJoinController can only be called once"); } masterService = ClusterServiceUtils.createMasterService(threadPool, initialState); - nodeJoinController = new NodeJoinController(masterService, createAllocationService(Settings.EMPTY), + nodeJoinController = new NodeJoinController(Settings.EMPTY, masterService, createAllocationService(Settings.EMPTY), new ElectMasterService(Settings.EMPTY)); } diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index 387ba1c3d96..8a00be28f5e 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -213,7 +213,7 @@ public class ClusterStateChanges { transportService, clusterService, threadPool, createIndexService, actionFilters, indexNameExpressionResolver); nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger); - joinTaskExecutor = new JoinTaskExecutor(allocationService, logger); + joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger); } public ClusterState createIndex(ClusterState state, CreateIndexRequest request) {