Use m_m_nodes from Zen1 master for Zen2 bootstrap (#37701)
Today we support a smooth rolling upgrade from Zen1 to Zen2 by automatically bootstrapping the cluster once all the Zen1 nodes have left, as long as the `minimum_master_nodes` count is satisfied. However this means that Zen2 nodes also require the `minimum_master_nodes` setting for this one specific and transient situation. Since nodes only perform this automatic bootstrapping if they previously belonged to a Zen1 cluster, they can keep track of the `minimum_master_nodes` setting from the previous master instead of requiring it to be set on the Zen2 node.
This commit is contained in:
parent
2908ca1b35
commit
bdef2ab8c0
|
@ -127,6 +127,7 @@ public class TransportClusterStateAction extends TransportMasterNodeReadAction<C
|
|||
ClusterState.Builder builder = ClusterState.builder(currentState.getClusterName());
|
||||
builder.version(currentState.version());
|
||||
builder.stateUUID(currentState.stateUUID());
|
||||
builder.minimumMasterNodesOnPublishingMaster(currentState.getMinimumMasterNodesOnPublishingMaster());
|
||||
|
||||
if (request.nodes()) {
|
||||
builder.nodes(currentState.nodes());
|
||||
|
|
|
@ -23,6 +23,7 @@ import com.carrotsearch.hppc.cursors.IntObjectCursor;
|
|||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.client.transport.TransportClient;
|
||||
import org.elasticsearch.cluster.block.ClusterBlock;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
|
@ -178,17 +179,19 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
|
||||
private final boolean wasReadFromDiff;
|
||||
|
||||
private final int minimumMasterNodesOnPublishingMaster;
|
||||
|
||||
// built on demand
|
||||
private volatile RoutingNodes routingNodes;
|
||||
|
||||
public ClusterState(long version, String stateUUID, ClusterState state) {
|
||||
this(state.clusterName, version, stateUUID, state.metaData(), state.routingTable(), state.nodes(), state.blocks(),
|
||||
state.customs(), false);
|
||||
state.customs(), -1, false);
|
||||
}
|
||||
|
||||
public ClusterState(ClusterName clusterName, long version, String stateUUID, MetaData metaData, RoutingTable routingTable,
|
||||
DiscoveryNodes nodes, ClusterBlocks blocks, ImmutableOpenMap<String, Custom> customs,
|
||||
boolean wasReadFromDiff) {
|
||||
int minimumMasterNodesOnPublishingMaster, boolean wasReadFromDiff) {
|
||||
this.version = version;
|
||||
this.stateUUID = stateUUID;
|
||||
this.clusterName = clusterName;
|
||||
|
@ -197,6 +200,7 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
this.nodes = nodes;
|
||||
this.blocks = blocks;
|
||||
this.customs = customs;
|
||||
this.minimumMasterNodesOnPublishingMaster = minimumMasterNodesOnPublishingMaster;
|
||||
this.wasReadFromDiff = wasReadFromDiff;
|
||||
}
|
||||
|
||||
|
@ -290,6 +294,17 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
return coordinationMetaData().getVotingConfigExclusions();
|
||||
}
|
||||
|
||||
/**
|
||||
* The node-level `discovery.zen.minimum_master_nodes` setting on the master node that published this cluster state, for use in rolling
|
||||
* upgrades from 6.x to 7.x. Once all the 6.x master-eligible nodes have left the cluster, the 7.x nodes use this value to determine how
|
||||
* many master-eligible nodes must be discovered before the cluster can be bootstrapped. Note that this method returns the node-level
|
||||
* value of this setting, and ignores any cluster-level override that was set via the API. Callers are expected to combine this value
|
||||
* with any value set in the cluster-level settings. This should be removed once we no longer need support for {@link Version#V_6_7_0}.
|
||||
*/
|
||||
public int getMinimumMasterNodesOnPublishingMaster() {
|
||||
return minimumMasterNodesOnPublishingMaster;
|
||||
}
|
||||
|
||||
// Used for testing and logging to determine how this cluster state was send over the wire
|
||||
public boolean wasReadFromDiff() {
|
||||
return wasReadFromDiff;
|
||||
|
@ -644,7 +659,7 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
private ClusterBlocks blocks = ClusterBlocks.EMPTY_CLUSTER_BLOCK;
|
||||
private final ImmutableOpenMap.Builder<String, Custom> customs;
|
||||
private boolean fromDiff;
|
||||
|
||||
private int minimumMasterNodesOnPublishingMaster = -1;
|
||||
|
||||
public Builder(ClusterState state) {
|
||||
this.clusterName = state.clusterName;
|
||||
|
@ -655,6 +670,7 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
this.metaData = state.metaData();
|
||||
this.blocks = state.blocks();
|
||||
this.customs = ImmutableOpenMap.builder(state.customs());
|
||||
this.minimumMasterNodesOnPublishingMaster = state.minimumMasterNodesOnPublishingMaster;
|
||||
this.fromDiff = false;
|
||||
}
|
||||
|
||||
|
@ -715,6 +731,11 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder minimumMasterNodesOnPublishingMaster(int minimumMasterNodesOnPublishingMaster) {
|
||||
this.minimumMasterNodesOnPublishingMaster = minimumMasterNodesOnPublishingMaster;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder putCustom(String type, Custom custom) {
|
||||
customs.put(type, custom);
|
||||
return this;
|
||||
|
@ -739,7 +760,8 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
if (UNKNOWN_UUID.equals(uuid)) {
|
||||
uuid = UUIDs.randomBase64UUID();
|
||||
}
|
||||
return new ClusterState(clusterName, version, uuid, metaData, routingTable, nodes, blocks, customs.build(), fromDiff);
|
||||
return new ClusterState(clusterName, version, uuid, metaData, routingTable, nodes, blocks, customs.build(),
|
||||
minimumMasterNodesOnPublishingMaster, fromDiff);
|
||||
}
|
||||
|
||||
public static byte[] toBytes(ClusterState state) throws IOException {
|
||||
|
@ -782,6 +804,7 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
Custom customIndexMetaData = in.readNamedWriteable(Custom.class);
|
||||
builder.putCustom(customIndexMetaData.getWriteableName(), customIndexMetaData);
|
||||
}
|
||||
builder.minimumMasterNodesOnPublishingMaster = in.getVersion().onOrAfter(Version.V_7_0_0) ? in.readVInt() : -1;
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
@ -807,6 +830,9 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
out.writeNamedWriteable(cursor.value);
|
||||
}
|
||||
}
|
||||
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
out.writeVInt(minimumMasterNodesOnPublishingMaster);
|
||||
}
|
||||
}
|
||||
|
||||
private static class ClusterStateDiff implements Diff<ClusterState> {
|
||||
|
@ -829,6 +855,8 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
|
||||
private final Diff<ImmutableOpenMap<String, Custom>> customs;
|
||||
|
||||
private final int minimumMasterNodesOnPublishingMaster;
|
||||
|
||||
ClusterStateDiff(ClusterState before, ClusterState after) {
|
||||
fromUuid = before.stateUUID;
|
||||
toUuid = after.stateUUID;
|
||||
|
@ -839,6 +867,7 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
metaData = after.metaData.diff(before.metaData);
|
||||
blocks = after.blocks.diff(before.blocks);
|
||||
customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER);
|
||||
minimumMasterNodesOnPublishingMaster = after.minimumMasterNodesOnPublishingMaster;
|
||||
}
|
||||
|
||||
ClusterStateDiff(StreamInput in, DiscoveryNode localNode) throws IOException {
|
||||
|
@ -851,6 +880,7 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
metaData = MetaData.readDiffFrom(in);
|
||||
blocks = ClusterBlocks.readDiffFrom(in);
|
||||
customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER);
|
||||
minimumMasterNodesOnPublishingMaster = in.getVersion().onOrAfter(Version.V_7_0_0) ? in.readVInt() : -1;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -864,6 +894,9 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
metaData.writeTo(out);
|
||||
blocks.writeTo(out);
|
||||
customs.writeTo(out);
|
||||
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
out.writeVInt(minimumMasterNodesOnPublishingMaster);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -883,9 +916,9 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
builder.metaData(metaData.apply(state.metaData));
|
||||
builder.blocks(blocks.apply(state.blocks));
|
||||
builder.customs(customs.apply(state.customs));
|
||||
builder.minimumMasterNodesOnPublishingMaster(minimumMasterNodesOnPublishingMaster);
|
||||
builder.fromDiff(true);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -168,7 +168,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|||
this.reconfigurator = new Reconfigurator(settings, clusterSettings);
|
||||
this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService, this::getFoundPeers,
|
||||
this::isInitialConfigurationSet, this::setInitialConfiguration);
|
||||
this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, clusterSettings, transportService,
|
||||
this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, transportService,
|
||||
this::isInitialConfigurationSet, joinHelper, peerFinder::getFoundPeers, this::setInitialConfiguration);
|
||||
this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"),
|
||||
transportService::getLocalNode);
|
||||
|
@ -467,7 +467,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|||
clusterFormationFailureHelper.start();
|
||||
|
||||
if (getCurrentTerm() == ZEN1_BWC_TERM) {
|
||||
discoveryUpgradeService.activate(lastKnownLeader);
|
||||
discoveryUpgradeService.activate(lastKnownLeader, coordinationState.get().getLastAcceptedState());
|
||||
}
|
||||
|
||||
leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES);
|
||||
|
|
|
@ -24,11 +24,11 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
|
|||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
@ -60,6 +60,7 @@ import static java.lang.Math.max;
|
|||
import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING;
|
||||
import static org.elasticsearch.cluster.ClusterState.UNKNOWN_VERSION;
|
||||
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
|
||||
import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
|
||||
import static org.elasticsearch.discovery.zen.ZenDiscovery.PING_TIMEOUT_SETTING;
|
||||
|
||||
/**
|
||||
|
@ -80,7 +81,12 @@ public class DiscoveryUpgradeService {
|
|||
public static final Setting<Boolean> ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING =
|
||||
Setting.boolSetting("discovery.zen.unsafe_rolling_upgrades_enabled", true, Setting.Property.NodeScope);
|
||||
|
||||
private final ElectMasterService electMasterService;
|
||||
/**
|
||||
* Dummy {@link ElectMasterService} that is only used to choose the best 6.x master from the discovered nodes, ignoring the
|
||||
* `minimum_master_nodes` setting.
|
||||
*/
|
||||
private static final ElectMasterService electMasterService = new ElectMasterService(Settings.EMPTY);
|
||||
|
||||
private final TransportService transportService;
|
||||
private final BooleanSupplier isBootstrappedSupplier;
|
||||
private final JoinHelper joinHelper;
|
||||
|
@ -93,12 +99,11 @@ public class DiscoveryUpgradeService {
|
|||
@Nullable // null if no active joining round
|
||||
private volatile JoiningRound joiningRound;
|
||||
|
||||
public DiscoveryUpgradeService(Settings settings, ClusterSettings clusterSettings, TransportService transportService,
|
||||
public DiscoveryUpgradeService(Settings settings, TransportService transportService,
|
||||
BooleanSupplier isBootstrappedSupplier, JoinHelper joinHelper,
|
||||
Supplier<Iterable<DiscoveryNode>> peersSupplier,
|
||||
Consumer<VotingConfiguration> initialConfigurationConsumer) {
|
||||
assert Version.CURRENT.major == Version.V_6_6_0.major + 1 : "remove this service once unsafe upgrades are no longer needed";
|
||||
electMasterService = new ElectMasterService(settings);
|
||||
this.transportService = transportService;
|
||||
this.isBootstrappedSupplier = isBootstrappedSupplier;
|
||||
this.joinHelper = joinHelper;
|
||||
|
@ -107,12 +112,9 @@ public class DiscoveryUpgradeService {
|
|||
this.bwcPingTimeout = BWC_PING_TIMEOUT_SETTING.get(settings);
|
||||
this.enableUnsafeBootstrappingOnUpgrade = ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING.get(settings);
|
||||
this.clusterName = CLUSTER_NAME_SETTING.get(settings);
|
||||
|
||||
clusterSettings.addSettingsUpdateConsumer(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
|
||||
electMasterService::minimumMasterNodes); // TODO reject update if the new value is too large
|
||||
}
|
||||
|
||||
public void activate(Optional<DiscoveryNode> lastKnownLeader) {
|
||||
public void activate(Optional<DiscoveryNode> lastKnownLeader, ClusterState lastAcceptedClusterState) {
|
||||
// called under coordinator mutex
|
||||
|
||||
if (isBootstrappedSupplier.getAsBoolean()) {
|
||||
|
@ -122,8 +124,13 @@ public class DiscoveryUpgradeService {
|
|||
assert lastKnownLeader.isPresent() == false || Coordinator.isZen1Node(lastKnownLeader.get()) : lastKnownLeader;
|
||||
// if there was a leader and it's not a old node then we must have been bootstrapped
|
||||
|
||||
final Settings dynamicSettings = lastAcceptedClusterState.metaData().settings();
|
||||
final int minimumMasterNodes = DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.exists(dynamicSettings)
|
||||
? DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(dynamicSettings)
|
||||
: lastAcceptedClusterState.getMinimumMasterNodesOnPublishingMaster();
|
||||
|
||||
assert joiningRound == null : joiningRound;
|
||||
joiningRound = new JoiningRound(lastKnownLeader.isPresent());
|
||||
joiningRound = new JoiningRound(enableUnsafeBootstrappingOnUpgrade && lastKnownLeader.isPresent(), minimumMasterNodes);
|
||||
joiningRound.scheduleNextAttempt();
|
||||
}
|
||||
|
||||
|
@ -160,15 +167,21 @@ public class DiscoveryUpgradeService {
|
|||
|
||||
private class JoiningRound {
|
||||
private final boolean upgrading;
|
||||
private final int minimumMasterNodes;
|
||||
|
||||
JoiningRound(boolean upgrading) {
|
||||
JoiningRound(boolean upgrading, int minimumMasterNodes) {
|
||||
this.upgrading = upgrading;
|
||||
this.minimumMasterNodes = minimumMasterNodes;
|
||||
}
|
||||
|
||||
private boolean isRunning() {
|
||||
return joiningRound == this && isBootstrappedSupplier.getAsBoolean() == false;
|
||||
}
|
||||
|
||||
private boolean canBootstrap(Set<DiscoveryNode> discoveryNodes) {
|
||||
return upgrading && minimumMasterNodes <= discoveryNodes.stream().filter(DiscoveryNode::isMasterNode).count();
|
||||
}
|
||||
|
||||
void scheduleNextAttempt() {
|
||||
if (isRunning() == false) {
|
||||
return;
|
||||
|
@ -189,26 +202,22 @@ public class DiscoveryUpgradeService {
|
|||
// this set of nodes is reasonably fresh - the PeerFinder cleans up nodes to which the transport service is not
|
||||
// connected each time it wakes up (every second by default)
|
||||
|
||||
logger.debug("nodes: {}", discoveryNodes);
|
||||
logger.debug("upgrading={}, minimumMasterNodes={}, nodes={}", upgrading, minimumMasterNodes, discoveryNodes);
|
||||
|
||||
if (electMasterService.hasEnoughMasterNodes(discoveryNodes)) {
|
||||
if (discoveryNodes.stream().anyMatch(Coordinator::isZen1Node)) {
|
||||
electBestOldMaster(discoveryNodes);
|
||||
} else if (upgrading && enableUnsafeBootstrappingOnUpgrade) {
|
||||
// no Zen1 nodes found, but the last-known master was a Zen1 node, so this is a rolling upgrade
|
||||
transportService.getThreadPool().generic().execute(() -> {
|
||||
try {
|
||||
initialConfigurationConsumer.accept(new VotingConfiguration(discoveryNodes.stream()
|
||||
.map(DiscoveryNode::getId).collect(Collectors.toSet())));
|
||||
} catch (Exception e) {
|
||||
logger.debug("exception during bootstrapping upgrade, retrying", e);
|
||||
} finally {
|
||||
scheduleNextAttempt();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
scheduleNextAttempt();
|
||||
}
|
||||
if (discoveryNodes.stream().anyMatch(Coordinator::isZen1Node)) {
|
||||
electBestOldMaster(discoveryNodes);
|
||||
} else if (canBootstrap(discoveryNodes)) {
|
||||
// no Zen1 nodes found, but the last-known master was a Zen1 node, so this is a rolling upgrade
|
||||
transportService.getThreadPool().generic().execute(() -> {
|
||||
try {
|
||||
initialConfigurationConsumer.accept(new VotingConfiguration(discoveryNodes.stream()
|
||||
.map(DiscoveryNode::getId).collect(Collectors.toSet())));
|
||||
} catch (Exception e) {
|
||||
logger.debug("exception during bootstrapping upgrade, retrying", e);
|
||||
} finally {
|
||||
scheduleNextAttempt();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
scheduleNextAttempt();
|
||||
}
|
||||
|
|
|
@ -90,7 +90,7 @@ public class JoinHelper {
|
|||
this.masterService = masterService;
|
||||
this.transportService = transportService;
|
||||
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
|
||||
this.joinTaskExecutor = new JoinTaskExecutor(allocationService, logger) {
|
||||
this.joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger) {
|
||||
|
||||
@Override
|
||||
public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentState, List<JoinTaskExecutor.Task> joiningTasks)
|
||||
|
|
|
@ -29,7 +29,9 @@ import org.elasticsearch.cluster.metadata.MetaData;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -46,6 +48,8 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
|
|||
|
||||
private final Logger logger;
|
||||
|
||||
private final int minimumMasterNodesOnLocalNode;
|
||||
|
||||
public static class Task {
|
||||
|
||||
private final DiscoveryNode node;
|
||||
|
@ -81,9 +85,10 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
|
|||
private static final String FINISH_ELECTION_TASK_REASON = "_FINISH_ELECTION_";
|
||||
}
|
||||
|
||||
public JoinTaskExecutor(AllocationService allocationService, Logger logger) {
|
||||
public JoinTaskExecutor(Settings settings, AllocationService allocationService, Logger logger) {
|
||||
this.allocationService = allocationService;
|
||||
this.logger = logger;
|
||||
minimumMasterNodesOnLocalNode = ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -186,7 +191,9 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
|
|||
// or removed by us above
|
||||
ClusterState tmpState = ClusterState.builder(currentState).nodes(nodesBuilder).blocks(ClusterBlocks.builder()
|
||||
.blocks(currentState.blocks())
|
||||
.removeGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID)).build();
|
||||
.removeGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID))
|
||||
.minimumMasterNodesOnPublishingMaster(minimumMasterNodesOnLocalNode)
|
||||
.build();
|
||||
logger.trace("becomeMasterAndTrimConflictingNodes: {}", tmpState.nodes());
|
||||
tmpState = PersistentTasksCustomMetaData.disassociateDeadNodes(tmpState);
|
||||
return ClusterState.builder(allocationService.disassociateDeadNodes(tmpState, false, "removed dead nodes on election"));
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
|||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.service.MasterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -59,9 +60,10 @@ public class NodeJoinController {
|
|||
private ElectionContext electionContext = null;
|
||||
|
||||
|
||||
public NodeJoinController(MasterService masterService, AllocationService allocationService, ElectMasterService electMaster) {
|
||||
public NodeJoinController(Settings settings, MasterService masterService, AllocationService allocationService,
|
||||
ElectMasterService electMaster) {
|
||||
this.masterService = masterService;
|
||||
joinTaskExecutor = new JoinTaskExecutor(allocationService, logger) {
|
||||
joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger) {
|
||||
@Override
|
||||
public void clusterStatePublished(ClusterChangedEvent event) {
|
||||
electMaster.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state());
|
||||
|
|
|
@ -220,7 +220,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
this.membership = new MembershipAction(transportService, new MembershipListener(), onJoinValidators);
|
||||
this.joinThreadControl = new JoinThreadControl();
|
||||
|
||||
this.nodeJoinController = new NodeJoinController(masterService, allocationService, electMaster);
|
||||
this.nodeJoinController = new NodeJoinController(settings, masterService, allocationService, electMaster);
|
||||
this.nodeRemovalExecutor = new ZenNodeRemovalClusterStateTaskExecutor(allocationService, electMaster, this::submitRejoin, logger);
|
||||
|
||||
masterService.setClusterStateSupplier(this::clusterState);
|
||||
|
|
|
@ -67,7 +67,8 @@ public class ClusterSerializationTests extends ESAllocationTestCase {
|
|||
.add(newNode("node3")).localNodeId("node1").masterNodeId("node2").build();
|
||||
|
||||
ClusterState clusterState = ClusterState.builder(new ClusterName("clusterName1"))
|
||||
.nodes(nodes).metaData(metaData).routingTable(routingTable).build();
|
||||
.nodes(nodes).metaData(metaData).routingTable(routingTable)
|
||||
.minimumMasterNodesOnPublishingMaster(randomIntBetween(-1, 10)).build();
|
||||
|
||||
AllocationService strategy = createAllocationService();
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(strategy.reroute(clusterState, "reroute").routingTable()).build();
|
||||
|
@ -78,6 +79,9 @@ public class ClusterSerializationTests extends ESAllocationTestCase {
|
|||
assertThat(serializedClusterState.getClusterName().value(), equalTo(clusterState.getClusterName().value()));
|
||||
|
||||
assertThat(serializedClusterState.routingTable().toString(), equalTo(clusterState.routingTable().toString()));
|
||||
|
||||
assertThat(serializedClusterState.getMinimumMasterNodesOnPublishingMaster(),
|
||||
equalTo(clusterState.getMinimumMasterNodesOnPublishingMaster()));
|
||||
}
|
||||
|
||||
public void testRoutingTableSerialization() throws Exception {
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.discovery.zen;
|
||||
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
|
||||
import static org.elasticsearch.test.InternalTestCluster.nameFilter;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.isIn;
|
||||
|
||||
@ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0)
|
||||
public class MinimumMasterNodesInClusterStateIT extends ESIntegTestCase {
|
||||
|
||||
public void testMasterPublishes() throws Exception {
|
||||
final String firstNode = internalCluster().startNode();
|
||||
|
||||
{
|
||||
final ClusterState localState
|
||||
= client(firstNode).admin().cluster().state(new ClusterStateRequest().local(true)).get().getState();
|
||||
assertThat(localState.getMinimumMasterNodesOnPublishingMaster(), equalTo(1));
|
||||
assertFalse(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.exists(localState.metaData().settings()));
|
||||
}
|
||||
|
||||
final List<String> secondThirdNodes = internalCluster().startNodes(2);
|
||||
assertThat(internalCluster().getMasterName(), equalTo(firstNode));
|
||||
|
||||
final List<String> allNodes = Stream.concat(Stream.of(firstNode), secondThirdNodes.stream()).collect(Collectors.toList());
|
||||
for (final String node : allNodes) {
|
||||
final ClusterState localState = client(node).admin().cluster().state(new ClusterStateRequest().local(true)).get().getState();
|
||||
assertThat(localState.getMinimumMasterNodesOnPublishingMaster(), equalTo(1));
|
||||
assertThat(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(localState.metaData().settings()), equalTo(2));
|
||||
}
|
||||
|
||||
internalCluster().stopRandomNode(nameFilter(firstNode));
|
||||
assertThat(internalCluster().getMasterName(), isIn(secondThirdNodes));
|
||||
|
||||
for (final String node : secondThirdNodes) {
|
||||
final ClusterState localState = client(node).admin().cluster().state(new ClusterStateRequest().local(true)).get().getState();
|
||||
assertThat(localState.getMinimumMasterNodesOnPublishingMaster(), equalTo(2));
|
||||
assertThat(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(localState.metaData().settings()), equalTo(2));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -141,7 +141,7 @@ public class NodeJoinControllerTests extends ESTestCase {
|
|||
throw new IllegalStateException("method setupMasterServiceAndNodeJoinController can only be called once");
|
||||
}
|
||||
masterService = ClusterServiceUtils.createMasterService(threadPool, initialState);
|
||||
nodeJoinController = new NodeJoinController(masterService, createAllocationService(Settings.EMPTY),
|
||||
nodeJoinController = new NodeJoinController(Settings.EMPTY, masterService, createAllocationService(Settings.EMPTY),
|
||||
new ElectMasterService(Settings.EMPTY));
|
||||
}
|
||||
|
||||
|
|
|
@ -213,7 +213,7 @@ public class ClusterStateChanges {
|
|||
transportService, clusterService, threadPool, createIndexService, actionFilters, indexNameExpressionResolver);
|
||||
|
||||
nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
|
||||
joinTaskExecutor = new JoinTaskExecutor(allocationService, logger);
|
||||
joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger);
|
||||
}
|
||||
|
||||
public ClusterState createIndex(ClusterState state, CreateIndexRequest request) {
|
||||
|
|
Loading…
Reference in New Issue