Auto-expand replicas when adding or removing nodes (#30423)
Auto-expands replicas in the same cluster state update in which nodes are added or removed, instead of in a follow-up reroute. Closes #1873, fixing an issue where nodes dropped their copy of auto-expanded data when coming up, only to resync it again later.
parent 44c6dcf5be
commit 82b251adcf
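For orientation, here is a minimal, self-contained sketch of how an `index.auto_expand_replicas` value such as "0-1" or "0-all" maps to a concrete replica count for a given number of data nodes. It mirrors, in simplified form, the parse/clamp logic that the AutoExpandReplicas hunks below introduce; the class and method names here are purely illustrative and not part of the change.

    // Illustrative only: simplified version of the min/max parsing and clamping
    // performed by AutoExpandReplicas; these names are not Elasticsearch APIs.
    public final class AutoExpandSketch {

        // Parses values like "0-1" or "0-all" and returns the desired replica count.
        static int desiredReplicas(String autoExpandValue, int dataNodeCount) {
            int dash = autoExpandValue.indexOf('-');
            int min = Integer.parseInt(autoExpandValue.substring(0, dash));
            String maxPart = autoExpandValue.substring(dash + 1);
            // "all" means: one replica on every data node beyond the primary's node
            int max = "all".equals(maxPart) ? dataNodeCount - 1
                                            : Math.min(Integer.parseInt(maxPart), dataNodeCount - 1);
            if (min > max) {
                return -1; // the real implementation simply makes no change in this case
            }
            int replicas = dataNodeCount - 1;   // start from "a copy on every other data node"
            replicas = Math.max(replicas, min); // but never fewer than min
            replicas = Math.min(replicas, max); // and never more than max
            return replicas;
        }

        public static void main(String[] args) {
            System.out.println(desiredReplicas("0-all", 3)); // 2
            System.out.println(desiredReplicas("0-1", 5));   // 1
            System.out.println(desiredReplicas("0-all", 1)); // 0
        }
    }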
@@ -183,6 +183,12 @@ Rollup::
 * Validate timezone in range queries to ensure they match the selected job when
   searching ({pull}30338[#30338])
 
+
+Allocation::
+
+* Auto-expand replicas when adding or removing nodes to prevent shard copies from
+  being dropped and resynced when a data node rejoins the cluster ({pull}30423[#30423])
+
 //[float]
 //=== Regressions
 
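The changelog entry above boils down to this: instead of reacting to an already published cluster state and firing a follow-up settings update, the allocation service now collects the desired replica counts (a map keyed by replica count, valued by index names) and folds them into the very cluster state update that adds or removes the nodes. A rough, self-contained sketch of that shape follows, using plain collections only; none of these names are Elasticsearch APIs.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Toy model of the reworked flow: collect replica changes keyed by the desired
    // replica count (the shape AutoExpandReplicas.getAutoExpandReplicaChanges returns),
    // then apply them all in one pass, standing in for the single cluster state update.
    public class AutoExpandFlowSketch {

        public static void main(String[] args) {
            int dataNodeCount = 3;
            Map<String, Integer> replicasPerIndex = new HashMap<>(); // stand-in for index metadata
            replicasPerIndex.put("logs", 0);
            replicasPerIndex.put("metrics", 2);

            // desired count for an index using "0-all": one replica per additional data node
            int desired = dataNodeCount - 1;

            // key = desired number of replicas, value = indices that need updating
            Map<Integer, List<String>> changes = new HashMap<>();
            for (Map.Entry<String, Integer> index : replicasPerIndex.entrySet()) {
                if (index.getValue() != desired) {
                    changes.computeIfAbsent(desired, k -> new ArrayList<>()).add(index.getKey());
                }
            }

            // apply every change as part of the same (toy) state update
            for (Map.Entry<Integer, List<String>> change : changes.entrySet()) {
                for (String indexName : change.getValue()) {
                    replicasPerIndex.put(indexName, change.getKey());
                }
            }

            System.out.println(replicasPerIndex); // both indices now report 2 replicas
        }
    }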
@@ -18,23 +18,36 @@
  */
 package org.elasticsearch.cluster.metadata;
 
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.Booleans;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
 /**
  * This class acts as a functional wrapper around the {@code index.auto_expand_replicas} setting.
  * This setting, or rather its value, is expanded into a min and max value which requires special handling
  * based on the number of data nodes in the cluster. This class handles all the parsing and streamlines the access to these values.
  */
-final class AutoExpandReplicas {
+public final class AutoExpandReplicas {
     // the value we recognize in the "max" position to mean all the nodes
     private static final String ALL_NODES_VALUE = "all";
-    public static final Setting<AutoExpandReplicas> SETTING = new Setting<>(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "false", (value) -> {
+
+    private static final AutoExpandReplicas FALSE_INSTANCE = new AutoExpandReplicas(0, 0, false);
+
+    public static final Setting<AutoExpandReplicas> SETTING = new Setting<>(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "false",
+        AutoExpandReplicas::parse, Property.Dynamic, Property.IndexScope);
+
+    private static AutoExpandReplicas parse(String value) {
         final int min;
         final int max;
         if (Booleans.isFalse(value)) {
-            return new AutoExpandReplicas(0, 0, false);
+            return FALSE_INSTANCE;
         }
         final int dash = value.indexOf('-');
         if (-1 == dash) {
@@ -57,7 +70,7 @@ final class AutoExpandReplicas {
             }
         }
         return new AutoExpandReplicas(min, max, true);
-    }, Property.Dynamic, Property.IndexScope);
+    }
 
     private final int minReplicas;
     private final int maxReplicas;
@@ -80,6 +93,24 @@ final class AutoExpandReplicas {
         return Math.min(maxReplicas, numDataNodes-1);
     }
 
+    Optional<Integer> getDesiredNumberOfReplicas(int numDataNodes) {
+        if (enabled) {
+            final int min = getMinReplicas();
+            final int max = getMaxReplicas(numDataNodes);
+            int numberOfReplicas = numDataNodes - 1;
+            if (numberOfReplicas < min) {
+                numberOfReplicas = min;
+            } else if (numberOfReplicas > max) {
+                numberOfReplicas = max;
+            }
+
+            if (numberOfReplicas >= min && numberOfReplicas <= max) {
+                return Optional.of(numberOfReplicas);
+            }
+        }
+        return Optional.empty();
+    }
+
     @Override
     public String toString() {
         return enabled ? minReplicas + "-" + maxReplicas : "false";
@@ -88,6 +119,31 @@ final class AutoExpandReplicas {
     boolean isEnabled() {
         return enabled;
     }
 
+    /**
+     * Checks if there are replicas with the auto-expand feature that need to be adapted.
+     * Returns a map of updates, which groups the indices to be updated by their desired number of replicas.
+     * The map has the desired number of replicas as key and the indices to update as value, as this allows the result
+     * of this method to be directly applied to RoutingTable.Builder#updateNumberOfReplicas.
+     */
+    public static Map<Integer, List<String>> getAutoExpandReplicaChanges(MetaData metaData, DiscoveryNodes discoveryNodes) {
+        // used for translating "all" to a number
+        final int dataNodeCount = discoveryNodes.getDataNodes().size();
+
+        Map<Integer, List<String>> nrReplicasChanged = new HashMap<>();
+
+        for (final IndexMetaData indexMetaData : metaData) {
+            if (indexMetaData.getState() != IndexMetaData.State.CLOSE) {
+                AutoExpandReplicas autoExpandReplicas = SETTING.get(indexMetaData.getSettings());
+                autoExpandReplicas.getDesiredNumberOfReplicas(dataNodeCount).ifPresent(numberOfReplicas -> {
+                    if (numberOfReplicas != indexMetaData.getNumberOfReplicas()) {
+                        nrReplicasChanged.computeIfAbsent(numberOfReplicas, ArrayList::new).add(indexMetaData.getIndex().getName());
+                    }
+                });
+            }
+        }
+        return nrReplicasChanged;
+    }
+
 }
 
@@ -25,9 +25,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest;
 import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeSettingsClusterStateUpdateRequest;
 import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
-import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
 import org.elasticsearch.cluster.block.ClusterBlock;
 import org.elasticsearch.cluster.block.ClusterBlocks;
@@ -42,16 +40,12 @@ import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
@@ -61,7 +55,7 @@ import static org.elasticsearch.action.support.ContextPreservingActionListener.w
 /**
  * Service responsible for submitting update index settings requests
  */
-public class MetaDataUpdateSettingsService extends AbstractComponent implements ClusterStateListener {
+public class MetaDataUpdateSettingsService extends AbstractComponent {
 
     private final ClusterService clusterService;
 
@@ -77,87 +71,11 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
         super(settings);
         this.clusterService = clusterService;
         this.threadPool = threadPool;
-        this.clusterService.addListener(this);
         this.allocationService = allocationService;
         this.indexScopedSettings = indexScopedSettings;
         this.indicesService = indicesService;
     }
 
-    @Override
-    public void clusterChanged(ClusterChangedEvent event) {
-        // update an index with number of replicas based on data nodes if possible
-        if (!event.state().nodes().isLocalNodeElectedMaster()) {
-            return;
-        }
-        // we will want to know this for translating "all" to a number
-        final int dataNodeCount = event.state().nodes().getDataNodes().size();
-
-        Map<Integer, List<Index>> nrReplicasChanged = new HashMap<>();
-        // we need to do this each time in case it was changed by update settings
-        for (final IndexMetaData indexMetaData : event.state().metaData()) {
-            AutoExpandReplicas autoExpandReplicas = IndexMetaData.INDEX_AUTO_EXPAND_REPLICAS_SETTING.get(indexMetaData.getSettings());
-            if (autoExpandReplicas.isEnabled()) {
-                /*
-                 * we have to expand the number of replicas for this index to at least min and at most max nodes here
-                 * so we are bumping it up if we have to or reduce it depending on min/max and the number of datanodes.
-                 * If we change the number of replicas we just let the shard allocator do it's thing once we updated it
-                 * since it goes through the index metadata to figure out if something needs to be done anyway. Do do that
-                 * we issue a cluster settings update command below and kicks off a reroute.
-                 */
-                final int min = autoExpandReplicas.getMinReplicas();
-                final int max = autoExpandReplicas.getMaxReplicas(dataNodeCount);
-                int numberOfReplicas = dataNodeCount - 1;
-                if (numberOfReplicas < min) {
-                    numberOfReplicas = min;
-                } else if (numberOfReplicas > max) {
-                    numberOfReplicas = max;
-                }
-                // same value, nothing to do there
-                if (numberOfReplicas == indexMetaData.getNumberOfReplicas()) {
-                    continue;
-                }
-
-                if (numberOfReplicas >= min && numberOfReplicas <= max) {
-
-                    if (!nrReplicasChanged.containsKey(numberOfReplicas)) {
-                        nrReplicasChanged.put(numberOfReplicas, new ArrayList<>());
-                    }
-
-                    nrReplicasChanged.get(numberOfReplicas).add(indexMetaData.getIndex());
-                }
-            }
-        }
-
-        if (nrReplicasChanged.size() > 0) {
-            // update settings and kick of a reroute (implicit) for them to take effect
-            for (final Integer fNumberOfReplicas : nrReplicasChanged.keySet()) {
-                Settings settings = Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, fNumberOfReplicas).build();
-                final List<Index> indices = nrReplicasChanged.get(fNumberOfReplicas);
-
-                UpdateSettingsClusterStateUpdateRequest updateRequest = new UpdateSettingsClusterStateUpdateRequest()
-                        .indices(indices.toArray(new Index[indices.size()])).settings(settings)
-                        .ackTimeout(TimeValue.timeValueMillis(0)) //no need to wait for ack here
-                        .masterNodeTimeout(TimeValue.timeValueMinutes(10));
-
-                updateSettings(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
-                    @Override
-                    public void onResponse(ClusterStateUpdateResponse response) {
-                        for (Index index : indices) {
-                            logger.info("{} auto expanded replicas to [{}]", index, fNumberOfReplicas);
-                        }
-                    }
-
-                    @Override
-                    public void onFailure(Exception t) {
-                        for (Index index : indices) {
-                            logger.warn("{} fail to auto expand replicas to [{}]", index, fNumberOfReplicas);
-                        }
-                    }
-                });
-            }
-        }
-    }
-
     public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
         final Settings normalizedSettings = Settings.builder().put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX).build();
         Settings.Builder settingsForClosedIndices = Settings.builder();
@@ -25,6 +25,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.RestoreInProgress;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.cluster.health.ClusterStateHealth;
+import org.elasticsearch.cluster.metadata.AutoExpandReplicas;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.routing.RoutingNode;
@@ -46,6 +47,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -206,11 +208,12 @@ public class AllocationService extends AbstractComponent {
      * unassigning any shards that are associated with nodes that are no longer part of the cluster, potentially promoting replicas
      * if needed.
      */
-    public ClusterState deassociateDeadNodes(final ClusterState clusterState, boolean reroute, String reason) {
-        RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
+    public ClusterState deassociateDeadNodes(ClusterState clusterState, boolean reroute, String reason) {
+        ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState);
+        RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState);
         // shuffle the unassigned nodes, just so we won't have things like poison failed shards
         routingNodes.unassigned().shuffle();
-        RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,
+        RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, fixedClusterState,
             clusterInfoService.getClusterInfo(), currentNanoTime());
 
         // first, clear from the shards any node id they used to belong to that is now dead
@@ -220,12 +223,40 @@ public class AllocationService extends AbstractComponent {
             reroute(allocation);
         }
 
-        if (allocation.routingNodesChanged() == false) {
+        if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) {
             return clusterState;
         }
         return buildResultAndLogHealthChange(clusterState, allocation, reason);
     }
 
+    /**
+     * Checks if there are replicas with the auto-expand feature that need to be adapted.
+     * Returns an updated cluster state if changes were necessary, or the identical cluster state if no changes were required.
+     */
+    private ClusterState adaptAutoExpandReplicas(ClusterState clusterState) {
+        final Map<Integer, List<String>> autoExpandReplicaChanges =
+            AutoExpandReplicas.getAutoExpandReplicaChanges(clusterState.metaData(), clusterState.nodes());
+        if (autoExpandReplicaChanges.isEmpty()) {
+            return clusterState;
+        } else {
+            final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(clusterState.routingTable());
+            final MetaData.Builder metaDataBuilder = MetaData.builder(clusterState.metaData());
+            for (Map.Entry<Integer, List<String>> entry : autoExpandReplicaChanges.entrySet()) {
+                final int numberOfReplicas = entry.getKey();
+                final String[] indices = entry.getValue().toArray(new String[entry.getValue().size()]);
+                // we do *not* update the in-sync allocation ids as they will be removed upon the first index
+                // operation which makes these copies stale
+                routingTableBuilder.updateNumberOfReplicas(numberOfReplicas, indices);
+                metaDataBuilder.updateNumberOfReplicas(numberOfReplicas, indices);
+                logger.info("updating number_of_replicas to [{}] for indices {}", numberOfReplicas, indices);
+            }
+            final ClusterState fixedState = ClusterState.builder(clusterState).routingTable(routingTableBuilder.build())
+                .metaData(metaDataBuilder).build();
+            assert AutoExpandReplicas.getAutoExpandReplicaChanges(fixedState.metaData(), fixedState.nodes()).isEmpty();
+            return fixedState;
+        }
+    }
+
     /**
      * Removes delay markers from unassigned shards based on current time stamp.
     */
@@ -301,6 +332,7 @@ public class AllocationService extends AbstractComponent {
         if (retryFailed) {
             resetFailedAllocationCounter(allocation);
         }
+
         reroute(allocation);
         return new CommandsResult(explanations, buildResultAndLogHealthChange(clusterState, allocation, "reroute commands"));
     }
@@ -320,15 +352,17 @@ public class AllocationService extends AbstractComponent {
      * <p>
      * If the same instance of ClusterState is returned, then no change has been made.
      */
-    protected ClusterState reroute(final ClusterState clusterState, String reason, boolean debug) {
-        RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
+    protected ClusterState reroute(ClusterState clusterState, String reason, boolean debug) {
+        ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState);
+
+        RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState);
         // shuffle the unassigned nodes, just so we won't have things like poison failed shards
         routingNodes.unassigned().shuffle();
-        RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,
+        RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, fixedClusterState,
             clusterInfoService.getClusterInfo(), currentNanoTime());
         allocation.debugDecision(debug);
         reroute(allocation);
-        if (allocation.routingNodesChanged() == false) {
+        if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) {
             return clusterState;
         }
         return buildResultAndLogHealthChange(clusterState, allocation, reason);
@@ -353,6 +387,8 @@ public class AllocationService extends AbstractComponent {
 
     private void reroute(RoutingAllocation allocation) {
         assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See deassociateDeadNodes";
+        assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation.nodes()).isEmpty() :
+            "auto-expand replicas out of sync with number of nodes in the cluster";
 
         // now allocate all the unassigned to available nodes
         if (allocation.routingNodes().unassigned().size() > 0) {
@@ -58,9 +58,7 @@ import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK
 public class NodeJoinController extends AbstractComponent {
 
     private final MasterService masterService;
-    private final AllocationService allocationService;
-    private final ElectMasterService electMaster;
-    private final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor();
+    private final JoinTaskExecutor joinTaskExecutor;
 
     // this is set while trying to become a master
     // mutation should be done under lock
@@ -71,8 +69,7 @@ public class NodeJoinController extends AbstractComponent {
                               Settings settings) {
         super(settings);
         this.masterService = masterService;
-        this.allocationService = allocationService;
-        this.electMaster = electMaster;
+        joinTaskExecutor = new JoinTaskExecutor(allocationService, electMaster, logger);
     }
 
     /**
@@ -404,7 +401,20 @@ public class NodeJoinController extends AbstractComponent {
         }
     };
 
-    class JoinTaskExecutor implements ClusterStateTaskExecutor<DiscoveryNode> {
+    // visible for testing
+    public static class JoinTaskExecutor implements ClusterStateTaskExecutor<DiscoveryNode> {
+
+        private final AllocationService allocationService;
+
+        private final ElectMasterService electMasterService;
+
+        private final Logger logger;
+
+        public JoinTaskExecutor(AllocationService allocationService, ElectMasterService electMasterService, Logger logger) {
+            this.allocationService = allocationService;
+            this.electMasterService = electMasterService;
+            this.logger = logger;
+        }
 
         @Override
         public ClusterTasksResult<DiscoveryNode> execute(ClusterState currentState, List<DiscoveryNode> joiningNodes) throws Exception {
@@ -512,7 +522,7 @@ public class NodeJoinController extends AbstractComponent {
 
         @Override
         public void clusterStatePublished(ClusterChangedEvent event) {
-            NodeJoinController.this.electMaster.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state());
+            electMasterService.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state());
         }
     }
 }
@@ -558,19 +558,19 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
     }
 
     // visible for testing
-    static class NodeRemovalClusterStateTaskExecutor implements ClusterStateTaskExecutor<NodeRemovalClusterStateTaskExecutor.Task>, ClusterStateTaskListener {
+    public static class NodeRemovalClusterStateTaskExecutor implements ClusterStateTaskExecutor<NodeRemovalClusterStateTaskExecutor.Task>, ClusterStateTaskListener {
 
         private final AllocationService allocationService;
         private final ElectMasterService electMasterService;
         private final Consumer<String> rejoin;
         private final Logger logger;
 
-        static class Task {
+        public static class Task {
 
             private final DiscoveryNode node;
             private final String reason;
 
-            Task(final DiscoveryNode node, final String reason) {
+            public Task(final DiscoveryNode node, final String reason) {
                 this.node = node;
                 this.reason = reason;
             }
@@ -589,7 +589,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
             }
         }
 
-        NodeRemovalClusterStateTaskExecutor(
+        public NodeRemovalClusterStateTaskExecutor(
                 final AllocationService allocationService,
                 final ElectMasterService electMasterService,
                 final Consumer<String> rejoin,
@@ -72,6 +72,9 @@ import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.discovery.zen.ElectMasterService;
+import org.elasticsearch.discovery.zen.NodeJoinController;
+import org.elasticsearch.discovery.zen.ZenDiscovery;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.TestEnvironment;
 import org.elasticsearch.index.IndexService;
@@ -117,6 +120,9 @@ public class ClusterStateChanges extends AbstractComponent {
     private final TransportClusterRerouteAction transportClusterRerouteAction;
     private final TransportCreateIndexAction transportCreateIndexAction;
 
+    private final ZenDiscovery.NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
+    private final NodeJoinController.JoinTaskExecutor joinTaskExecutor;
+
     public ClusterStateChanges(NamedXContentRegistry xContentRegistry, ThreadPool threadPool) {
         super(Settings.builder().put(PATH_HOME_SETTING.getKey(), "dummy").build());
 
@@ -191,6 +197,11 @@ public class ClusterStateChanges extends AbstractComponent {
             transportService, clusterService, threadPool, allocationService, actionFilters, indexNameExpressionResolver);
         transportCreateIndexAction = new TransportCreateIndexAction(settings,
             transportService, clusterService, threadPool, createIndexService, actionFilters, indexNameExpressionResolver);
+
+        ElectMasterService electMasterService = new ElectMasterService(settings);
+        nodeRemovalExecutor = new ZenDiscovery.NodeRemovalClusterStateTaskExecutor(allocationService, electMasterService,
+            s -> { throw new AssertionError("rejoin not implemented"); }, logger);
+        joinTaskExecutor = new NodeJoinController.JoinTaskExecutor(allocationService, electMasterService, logger);
     }
 
     public ClusterState createIndex(ClusterState state, CreateIndexRequest request) {
@@ -217,8 +228,13 @@ public class ClusterStateChanges extends AbstractComponent {
         return execute(transportClusterRerouteAction, request, state);
     }
 
-    public ClusterState deassociateDeadNodes(ClusterState clusterState, boolean reroute, String reason) {
-        return allocationService.deassociateDeadNodes(clusterState, reroute, reason);
+    public ClusterState addNodes(ClusterState clusterState, List<DiscoveryNode> nodes) {
+        return runTasks(joinTaskExecutor, clusterState, nodes);
     }
 
+    public ClusterState removeNodes(ClusterState clusterState, List<DiscoveryNode> nodes) {
+        return runTasks(nodeRemovalExecutor, clusterState, nodes.stream()
+            .map(n -> new ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task(n, "dummy reason")).collect(Collectors.toList()));
+    }
+
     public ClusterState applyFailedShards(ClusterState clusterState, List<FailedShard> failedShards) {
@@ -70,6 +70,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
 import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
@@ -258,8 +259,14 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
         }
         String name = "index_" + randomAlphaOfLength(15).toLowerCase(Locale.ROOT);
         Settings.Builder settingsBuilder = Settings.builder()
-            .put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3))
-            .put(SETTING_NUMBER_OF_REPLICAS, randomInt(2));
+            .put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3));
+        if (randomBoolean()) {
+            int min = randomInt(2);
+            int max = min + randomInt(3);
+            settingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS, randomBoolean() ? min + "-" + max : min + "-all");
+        } else {
+            settingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, randomInt(2));
+        }
         CreateIndexRequest request = new CreateIndexRequest(name, settingsBuilder.build()).waitForActiveShards(ActiveShardCount.NONE);
         state = cluster.createIndex(state, request);
         assertTrue(state.metaData().hasIndex(name));
@@ -345,9 +352,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
         if (randomBoolean()) {
             // add node
             if (state.nodes().getSize() < 10) {
-                DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes()).add(createNode()).build();
-                state = ClusterState.builder(state).nodes(newNodes).build();
-                state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after node leave
+                state = cluster.addNodes(state, Collections.singletonList(createNode()));
                 updateNodes(state, clusterStateServiceMap, indicesServiceSupplier);
             }
         } else {
@@ -355,16 +360,12 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
             if (state.nodes().getDataNodes().size() > 3) {
                 DiscoveryNode discoveryNode = randomFrom(state.nodes().getNodes().values().toArray(DiscoveryNode.class));
                 if (discoveryNode.equals(state.nodes().getMasterNode()) == false) {
-                    DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes()).remove(discoveryNode.getId()).build();
-                    state = ClusterState.builder(state).nodes(newNodes).build();
-                    state = cluster.deassociateDeadNodes(state, true, "removed and added a node");
+                    state = cluster.removeNodes(state, Collections.singletonList(discoveryNode));
                     updateNodes(state, clusterStateServiceMap, indicesServiceSupplier);
                 }
                 if (randomBoolean()) {
                     // and add it back
-                    DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes()).add(discoveryNode).build();
-                    state = ClusterState.builder(state).nodes(newNodes).build();
-                    state = cluster.reroute(state, new ClusterRerouteRequest());
+                    state = cluster.addNodes(state, Collections.singletonList(discoveryNode));
                     updateNodes(state, clusterStateServiceMap, indicesServiceSupplier);
                 }
             }