From 82b251adcf55a99def6ecb9f4f83416648fce9b2 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 7 May 2018 22:26:31 +0200 Subject: [PATCH] Auto-expand replicas when adding or removing nodes (#30423) Auto-expands replicas in the same cluster state update (instead of a follow-up reroute) where nodes are added or removed. Closes #1873, fixing an issue where nodes drop their copy of auto-expanded data when coming up, only to sync it again later. --- docs/CHANGELOG.asciidoc | 6 ++ .../cluster/metadata/AutoExpandReplicas.java | 64 +++++++++++++- .../MetaDataUpdateSettingsService.java | 84 +------------------ .../routing/allocation/AllocationService.java | 52 ++++++++++-- .../discovery/zen/NodeJoinController.java | 24 ++++-- .../discovery/zen/ZenDiscovery.java | 8 +- .../indices/cluster/ClusterStateChanges.java | 20 ++++- ...ClusterStateServiceRandomUpdatesTests.java | 23 ++--- 8 files changed, 162 insertions(+), 119 deletions(-) diff --git a/docs/CHANGELOG.asciidoc b/docs/CHANGELOG.asciidoc index e6f8ae37672..621ca5a6414 100644 --- a/docs/CHANGELOG.asciidoc +++ b/docs/CHANGELOG.asciidoc @@ -183,6 +183,12 @@ Rollup:: * Validate timezone in range queries to ensure they match the selected job when searching ({pull}30338[#30338]) + +Allocation:: + +Auto-expand replicas when adding or removing nodes to prevent shard copies from +being dropped and resynced when a data node rejoins the cluster ({pull}30423[#30423]) + //[float] //=== Regressions diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/AutoExpandReplicas.java b/server/src/main/java/org/elasticsearch/cluster/metadata/AutoExpandReplicas.java index c2a23a378f1..5ef85d94970 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/AutoExpandReplicas.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/AutoExpandReplicas.java @@ -18,23 +18,36 @@ */ package org.elasticsearch.cluster.metadata; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + /** * This class acts as a functional wrapper around the {@code index.auto_expand_replicas} setting. * This setting or rather it's value is expanded into a min and max value which requires special handling * based on the number of datanodes in the cluster. This class handles all the parsing and streamlines the access to these values. */ -final class AutoExpandReplicas { +public final class AutoExpandReplicas { // the value we recognize in the "max" position to mean all the nodes private static final String ALL_NODES_VALUE = "all"; - public static final Setting SETTING = new Setting<>(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "false", (value) -> { + + private static final AutoExpandReplicas FALSE_INSTANCE = new AutoExpandReplicas(0, 0, false); + + public static final Setting SETTING = new Setting<>(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "false", + AutoExpandReplicas::parse, Property.Dynamic, Property.IndexScope); + + private static AutoExpandReplicas parse(String value) { final int min; final int max; if (Booleans.isFalse(value)) { - return new AutoExpandReplicas(0, 0, false); + return FALSE_INSTANCE; } final int dash = value.indexOf('-'); if (-1 == dash) { @@ -57,7 +70,7 @@ final class AutoExpandReplicas { } } return new AutoExpandReplicas(min, max, true); - }, Property.Dynamic, Property.IndexScope); + } private final int minReplicas; private final int maxReplicas; @@ -80,6 +93,24 @@ final class AutoExpandReplicas { return Math.min(maxReplicas, numDataNodes-1); } + Optional getDesiredNumberOfReplicas(int numDataNodes) { + if (enabled) { + final int min = getMinReplicas(); + final int max = getMaxReplicas(numDataNodes); + int numberOfReplicas = numDataNodes - 1; + if (numberOfReplicas < min) { + numberOfReplicas = min; + } else if (numberOfReplicas > max) { + numberOfReplicas = max; + } + + if (numberOfReplicas >= min && numberOfReplicas <= max) { + return Optional.of(numberOfReplicas); + } + } + return Optional.empty(); + } + @Override public String toString() { return enabled ? minReplicas + "-" + maxReplicas : "false"; @@ -88,6 +119,31 @@ final class AutoExpandReplicas { boolean isEnabled() { return enabled; } + + /** + * Checks if the are replicas with the auto-expand feature that need to be adapted. + * Returns a map of updates, which maps the indices to be updated to the desired number of replicas. + * The map has the desired number of replicas as key and the indices to update as value, as this allows the result + * of this method to be directly applied to RoutingTable.Builder#updateNumberOfReplicas. + */ + public static Map> getAutoExpandReplicaChanges(MetaData metaData, DiscoveryNodes discoveryNodes) { + // used for translating "all" to a number + final int dataNodeCount = discoveryNodes.getDataNodes().size(); + + Map> nrReplicasChanged = new HashMap<>(); + + for (final IndexMetaData indexMetaData : metaData) { + if (indexMetaData.getState() != IndexMetaData.State.CLOSE) { + AutoExpandReplicas autoExpandReplicas = SETTING.get(indexMetaData.getSettings()); + autoExpandReplicas.getDesiredNumberOfReplicas(dataNodeCount).ifPresent(numberOfReplicas -> { + if (numberOfReplicas != indexMetaData.getNumberOfReplicas()) { + nrReplicasChanged.computeIfAbsent(numberOfReplicas, ArrayList::new).add(indexMetaData.getIndex().getName()); + } + }); + } + } + return nrReplicasChanged; + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java index 59c38be50e8..ce5ad12a53d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java @@ -25,9 +25,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeSettingsClusterStateUpdateRequest; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; -import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -42,16 +40,12 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; @@ -61,7 +55,7 @@ import static org.elasticsearch.action.support.ContextPreservingActionListener.w /** * Service responsible for submitting update index settings requests */ -public class MetaDataUpdateSettingsService extends AbstractComponent implements ClusterStateListener { +public class MetaDataUpdateSettingsService extends AbstractComponent { private final ClusterService clusterService; @@ -77,87 +71,11 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements super(settings); this.clusterService = clusterService; this.threadPool = threadPool; - this.clusterService.addListener(this); this.allocationService = allocationService; this.indexScopedSettings = indexScopedSettings; this.indicesService = indicesService; } - @Override - public void clusterChanged(ClusterChangedEvent event) { - // update an index with number of replicas based on data nodes if possible - if (!event.state().nodes().isLocalNodeElectedMaster()) { - return; - } - // we will want to know this for translating "all" to a number - final int dataNodeCount = event.state().nodes().getDataNodes().size(); - - Map> nrReplicasChanged = new HashMap<>(); - // we need to do this each time in case it was changed by update settings - for (final IndexMetaData indexMetaData : event.state().metaData()) { - AutoExpandReplicas autoExpandReplicas = IndexMetaData.INDEX_AUTO_EXPAND_REPLICAS_SETTING.get(indexMetaData.getSettings()); - if (autoExpandReplicas.isEnabled()) { - /* - * we have to expand the number of replicas for this index to at least min and at most max nodes here - * so we are bumping it up if we have to or reduce it depending on min/max and the number of datanodes. - * If we change the number of replicas we just let the shard allocator do it's thing once we updated it - * since it goes through the index metadata to figure out if something needs to be done anyway. Do do that - * we issue a cluster settings update command below and kicks off a reroute. - */ - final int min = autoExpandReplicas.getMinReplicas(); - final int max = autoExpandReplicas.getMaxReplicas(dataNodeCount); - int numberOfReplicas = dataNodeCount - 1; - if (numberOfReplicas < min) { - numberOfReplicas = min; - } else if (numberOfReplicas > max) { - numberOfReplicas = max; - } - // same value, nothing to do there - if (numberOfReplicas == indexMetaData.getNumberOfReplicas()) { - continue; - } - - if (numberOfReplicas >= min && numberOfReplicas <= max) { - - if (!nrReplicasChanged.containsKey(numberOfReplicas)) { - nrReplicasChanged.put(numberOfReplicas, new ArrayList<>()); - } - - nrReplicasChanged.get(numberOfReplicas).add(indexMetaData.getIndex()); - } - } - } - - if (nrReplicasChanged.size() > 0) { - // update settings and kick of a reroute (implicit) for them to take effect - for (final Integer fNumberOfReplicas : nrReplicasChanged.keySet()) { - Settings settings = Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, fNumberOfReplicas).build(); - final List indices = nrReplicasChanged.get(fNumberOfReplicas); - - UpdateSettingsClusterStateUpdateRequest updateRequest = new UpdateSettingsClusterStateUpdateRequest() - .indices(indices.toArray(new Index[indices.size()])).settings(settings) - .ackTimeout(TimeValue.timeValueMillis(0)) //no need to wait for ack here - .masterNodeTimeout(TimeValue.timeValueMinutes(10)); - - updateSettings(updateRequest, new ActionListener() { - @Override - public void onResponse(ClusterStateUpdateResponse response) { - for (Index index : indices) { - logger.info("{} auto expanded replicas to [{}]", index, fNumberOfReplicas); - } - } - - @Override - public void onFailure(Exception t) { - for (Index index : indices) { - logger.warn("{} fail to auto expand replicas to [{}]", index, fNumberOfReplicas); - } - } - }); - } - } - } - public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request, final ActionListener listener) { final Settings normalizedSettings = Settings.builder().put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX).build(); Settings.Builder settingsForClosedIndices = Settings.builder(); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index e6032c52585..deb10b83b5a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterStateHealth; +import org.elasticsearch.cluster.metadata.AutoExpandReplicas; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.RoutingNode; @@ -46,6 +47,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; @@ -206,11 +208,12 @@ public class AllocationService extends AbstractComponent { * unassigned an shards that are associated with nodes that are no longer part of the cluster, potentially promoting replicas * if needed. */ - public ClusterState deassociateDeadNodes(final ClusterState clusterState, boolean reroute, String reason) { - RoutingNodes routingNodes = getMutableRoutingNodes(clusterState); + public ClusterState deassociateDeadNodes(ClusterState clusterState, boolean reroute, String reason) { + ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState); + RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState); // shuffle the unassigned nodes, just so we won't have things like poison failed shards routingNodes.unassigned().shuffle(); - RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState, + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, fixedClusterState, clusterInfoService.getClusterInfo(), currentNanoTime()); // first, clear from the shards any node id they used to belong to that is now dead @@ -220,12 +223,40 @@ public class AllocationService extends AbstractComponent { reroute(allocation); } - if (allocation.routingNodesChanged() == false) { + if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) { return clusterState; } return buildResultAndLogHealthChange(clusterState, allocation, reason); } + /** + * Checks if the are replicas with the auto-expand feature that need to be adapted. + * Returns an updated cluster state if changes were necessary, or the identical cluster if no changes were required. + */ + private ClusterState adaptAutoExpandReplicas(ClusterState clusterState) { + final Map> autoExpandReplicaChanges = + AutoExpandReplicas.getAutoExpandReplicaChanges(clusterState.metaData(), clusterState.nodes()); + if (autoExpandReplicaChanges.isEmpty()) { + return clusterState; + } else { + final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(clusterState.routingTable()); + final MetaData.Builder metaDataBuilder = MetaData.builder(clusterState.metaData()); + for (Map.Entry> entry : autoExpandReplicaChanges.entrySet()) { + final int numberOfReplicas = entry.getKey(); + final String[] indices = entry.getValue().toArray(new String[entry.getValue().size()]); + // we do *not* update the in sync allocation ids as they will be removed upon the first index + // operation which make these copies stale + routingTableBuilder.updateNumberOfReplicas(numberOfReplicas, indices); + metaDataBuilder.updateNumberOfReplicas(numberOfReplicas, indices); + logger.info("updating number_of_replicas to [{}] for indices {}", numberOfReplicas, indices); + } + final ClusterState fixedState = ClusterState.builder(clusterState).routingTable(routingTableBuilder.build()) + .metaData(metaDataBuilder).build(); + assert AutoExpandReplicas.getAutoExpandReplicaChanges(fixedState.metaData(), fixedState.nodes()).isEmpty(); + return fixedState; + } + } + /** * Removes delay markers from unassigned shards based on current time stamp. */ @@ -301,6 +332,7 @@ public class AllocationService extends AbstractComponent { if (retryFailed) { resetFailedAllocationCounter(allocation); } + reroute(allocation); return new CommandsResult(explanations, buildResultAndLogHealthChange(clusterState, allocation, "reroute commands")); } @@ -320,15 +352,17 @@ public class AllocationService extends AbstractComponent { *

* If the same instance of ClusterState is returned, then no change has been made. */ - protected ClusterState reroute(final ClusterState clusterState, String reason, boolean debug) { - RoutingNodes routingNodes = getMutableRoutingNodes(clusterState); + protected ClusterState reroute(ClusterState clusterState, String reason, boolean debug) { + ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState); + + RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState); // shuffle the unassigned nodes, just so we won't have things like poison failed shards routingNodes.unassigned().shuffle(); - RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState, + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, fixedClusterState, clusterInfoService.getClusterInfo(), currentNanoTime()); allocation.debugDecision(debug); reroute(allocation); - if (allocation.routingNodesChanged() == false) { + if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) { return clusterState; } return buildResultAndLogHealthChange(clusterState, allocation, reason); @@ -353,6 +387,8 @@ public class AllocationService extends AbstractComponent { private void reroute(RoutingAllocation allocation) { assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See deassociateDeadNodes"; + assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation.nodes()).isEmpty() : + "auto-expand replicas out of sync with number of nodes in the cluster"; // now allocate all the unassigned to available nodes if (allocation.routingNodes().unassigned().size() > 0) { diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java b/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java index e36497d0916..e59fc8ad513 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java @@ -58,9 +58,7 @@ import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK public class NodeJoinController extends AbstractComponent { private final MasterService masterService; - private final AllocationService allocationService; - private final ElectMasterService electMaster; - private final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(); + private final JoinTaskExecutor joinTaskExecutor; // this is set while trying to become a master // mutation should be done under lock @@ -71,8 +69,7 @@ public class NodeJoinController extends AbstractComponent { Settings settings) { super(settings); this.masterService = masterService; - this.allocationService = allocationService; - this.electMaster = electMaster; + joinTaskExecutor = new JoinTaskExecutor(allocationService, electMaster, logger); } /** @@ -404,7 +401,20 @@ public class NodeJoinController extends AbstractComponent { } }; - class JoinTaskExecutor implements ClusterStateTaskExecutor { + // visible for testing + public static class JoinTaskExecutor implements ClusterStateTaskExecutor { + + private final AllocationService allocationService; + + private final ElectMasterService electMasterService; + + private final Logger logger; + + public JoinTaskExecutor(AllocationService allocationService, ElectMasterService electMasterService, Logger logger) { + this.allocationService = allocationService; + this.electMasterService = electMasterService; + this.logger = logger; + } @Override public ClusterTasksResult execute(ClusterState currentState, List joiningNodes) throws Exception { @@ -512,7 +522,7 @@ public class NodeJoinController extends AbstractComponent { @Override public void clusterStatePublished(ClusterChangedEvent event) { - NodeJoinController.this.electMaster.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state()); + electMasterService.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state()); } } } diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 79ba5879743..4621e6769e9 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -558,19 +558,19 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover } // visible for testing - static class NodeRemovalClusterStateTaskExecutor implements ClusterStateTaskExecutor, ClusterStateTaskListener { + public static class NodeRemovalClusterStateTaskExecutor implements ClusterStateTaskExecutor, ClusterStateTaskListener { private final AllocationService allocationService; private final ElectMasterService electMasterService; private final Consumer rejoin; private final Logger logger; - static class Task { + public static class Task { private final DiscoveryNode node; private final String reason; - Task(final DiscoveryNode node, final String reason) { + public Task(final DiscoveryNode node, final String reason) { this.node = node; this.reason = reason; } @@ -589,7 +589,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover } } - NodeRemovalClusterStateTaskExecutor( + public NodeRemovalClusterStateTaskExecutor( final AllocationService allocationService, final ElectMasterService electMasterService, final Consumer rejoin, diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index 8af19aa9ac1..9e8638af249 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -72,6 +72,9 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.discovery.zen.ElectMasterService; +import org.elasticsearch.discovery.zen.NodeJoinController; +import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.index.IndexService; @@ -117,6 +120,9 @@ public class ClusterStateChanges extends AbstractComponent { private final TransportClusterRerouteAction transportClusterRerouteAction; private final TransportCreateIndexAction transportCreateIndexAction; + private final ZenDiscovery.NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor; + private final NodeJoinController.JoinTaskExecutor joinTaskExecutor; + public ClusterStateChanges(NamedXContentRegistry xContentRegistry, ThreadPool threadPool) { super(Settings.builder().put(PATH_HOME_SETTING.getKey(), "dummy").build()); @@ -191,6 +197,11 @@ public class ClusterStateChanges extends AbstractComponent { transportService, clusterService, threadPool, allocationService, actionFilters, indexNameExpressionResolver); transportCreateIndexAction = new TransportCreateIndexAction(settings, transportService, clusterService, threadPool, createIndexService, actionFilters, indexNameExpressionResolver); + + ElectMasterService electMasterService = new ElectMasterService(settings); + nodeRemovalExecutor = new ZenDiscovery.NodeRemovalClusterStateTaskExecutor(allocationService, electMasterService, + s -> { throw new AssertionError("rejoin not implemented"); }, logger); + joinTaskExecutor = new NodeJoinController.JoinTaskExecutor(allocationService, electMasterService, logger); } public ClusterState createIndex(ClusterState state, CreateIndexRequest request) { @@ -217,8 +228,13 @@ public class ClusterStateChanges extends AbstractComponent { return execute(transportClusterRerouteAction, request, state); } - public ClusterState deassociateDeadNodes(ClusterState clusterState, boolean reroute, String reason) { - return allocationService.deassociateDeadNodes(clusterState, reroute, reason); + public ClusterState addNodes(ClusterState clusterState, List nodes) { + return runTasks(joinTaskExecutor, clusterState, nodes); + } + + public ClusterState removeNodes(ClusterState clusterState, List nodes) { + return runTasks(nodeRemovalExecutor, clusterState, nodes.stream() + .map(n -> new ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task(n, "dummy reason")).collect(Collectors.toList())); } public ClusterState applyFailedShards(ClusterState clusterState, List failedShards) { diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index 6079a9104d3..4a496167c80 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -70,6 +70,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; @@ -258,8 +259,14 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice } String name = "index_" + randomAlphaOfLength(15).toLowerCase(Locale.ROOT); Settings.Builder settingsBuilder = Settings.builder() - .put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3)) - .put(SETTING_NUMBER_OF_REPLICAS, randomInt(2)); + .put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3)); + if (randomBoolean()) { + int min = randomInt(2); + int max = min + randomInt(3); + settingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS, randomBoolean() ? min + "-" + max : min + "-all"); + } else { + settingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, randomInt(2)); + } CreateIndexRequest request = new CreateIndexRequest(name, settingsBuilder.build()).waitForActiveShards(ActiveShardCount.NONE); state = cluster.createIndex(state, request); assertTrue(state.metaData().hasIndex(name)); @@ -345,9 +352,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice if (randomBoolean()) { // add node if (state.nodes().getSize() < 10) { - DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes()).add(createNode()).build(); - state = ClusterState.builder(state).nodes(newNodes).build(); - state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after node leave + state = cluster.addNodes(state, Collections.singletonList(createNode())); updateNodes(state, clusterStateServiceMap, indicesServiceSupplier); } } else { @@ -355,16 +360,12 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice if (state.nodes().getDataNodes().size() > 3) { DiscoveryNode discoveryNode = randomFrom(state.nodes().getNodes().values().toArray(DiscoveryNode.class)); if (discoveryNode.equals(state.nodes().getMasterNode()) == false) { - DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes()).remove(discoveryNode.getId()).build(); - state = ClusterState.builder(state).nodes(newNodes).build(); - state = cluster.deassociateDeadNodes(state, true, "removed and added a node"); + state = cluster.removeNodes(state, Collections.singletonList(discoveryNode)); updateNodes(state, clusterStateServiceMap, indicesServiceSupplier); } if (randomBoolean()) { // and add it back - DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes()).add(discoveryNode).build(); - state = ClusterState.builder(state).nodes(newNodes).build(); - state = cluster.reroute(state, new ClusterRerouteRequest()); + state = cluster.addNodes(state, Collections.singletonList(discoveryNode)); updateNodes(state, clusterStateServiceMap, indicesServiceSupplier); } }