From c3da66d021b56aa5903037a13f1090591dcc92db Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 30 Aug 2017 20:55:11 -0600 Subject: [PATCH] Implement adaptive replica selection (#26128) * Implement adaptive replica selection This implements the selection algorithm described in the C3 paper for determining which copy of the data a query should be routed to. By using the service time EWMA, response time EWMA, and queue size EWMA we calculate the score of a node by piggybacking these metrics with each search request. Since Elasticsearch lacks the "broadcast to every copy" behavior that Cassandra has (as mentioned in the C3 paper) to update metrics after a node has been highly weighted, this implementation adjusts a node's response stats using the average of the its own and the "best" node's metrics. This is so that a long GC or other activity that may cause a node's rank to increase dramatically does not permanently keep a node from having requests routed to it, instead it will eventually lower its score back to the realm where it is a potential candidate for new queries. This feature is off by default and can be turned on with the dynamic setting `cluster.routing.use_adaptive_replica_selection`. Relates to #24915, however instead of `b=3` I used `b=4` (after benchmarking) * Randomly use adaptive replica selection for internal test cluster * Use an action name *prefix* for retrieving pending requests * Add unit test for replica selection * don't use adaptive replica selection in SearchPreferenceIT * Track client connections in a SearchTransportService instead of TransportService * Bind `entry` pieces in local variables * Add javadoc link to C3 paper and javadocs for stat adjustments * Bind entry's key and value to local variables * Remove unneeded actionNamePrefix parameter * Use conns.longValue() instead of cached Long * Add comments about removing entries from the map * Pull out bindings for `entry` in IndexShardRoutingTable * Use .compareTo instead of manually comparing * add assert for connections not being null and gte to 1 * Copy map for pending search connections instead of "live" map * Increase the number of pending search requests used for calculating rank when chosen When a node gets chosen, this increases the number of search counts for the winning node so that it will not be as likely to be chosen again for non-concurrent search requests. * Remove unused HashMap import * Rename rank -> rankShardsAndUpdateStats * Rename rankedActiveInitializingShardsIt -> activeInitializingShardsRankedIt * Instead of precalculating winning node, use "winning" shard from ranked list * Sort null ranked nodes before nodes that have a rank --- .../search/SearchExecutionStatsCollector.java | 2 +- .../action/search/SearchTransportService.java | 77 +++++++- .../action/search/TransportSearchAction.java | 3 +- .../routing/IndexShardRoutingTable.java | 165 ++++++++++++++++++ .../cluster/routing/OperationRouting.java | 52 +++++- .../common/settings/ClusterSettings.java | 2 + .../common/util/concurrent/EsExecutors.java | 4 - .../QueueResizingEsThreadPoolExecutor.java | 4 +- .../node/ResponseCollectorService.java | 77 ++++++-- .../search/query/QueryPhase.java | 2 +- .../routing/OperationRoutingTests.java | 88 ++++++++++ ...ueueResizingEsThreadPoolExecutorTests.java | 12 +- .../node/ResponseCollectorServiceTests.java | 4 +- .../search/preference/SearchPreferenceIT.java | 8 + .../search/query/QueryPhaseTests.java | 2 +- .../test/InternalTestCluster.java | 1 + 16 files changed, 458 insertions(+), 45 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchExecutionStatsCollector.java b/core/src/main/java/org/elasticsearch/action/search/SearchExecutionStatsCollector.java index 72c3d5eaab6..0ffad5aa406 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchExecutionStatsCollector.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchExecutionStatsCollector.java @@ -61,7 +61,7 @@ public final class SearchExecutionStatsCollector implements ActionListener 0 && queueSize > 0) { + if (serviceTimeEWMA > 0 && queueSize >= 0) { collector.addNodeStatistics(nodeId, queueSize, responseDuration, serviceTimeEWMA); } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 755bfe48410..dba382aed6c 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.dfs.DfsSearchResult; @@ -50,6 +51,7 @@ import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TaskAwareTransportRequestHandler; import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; @@ -57,6 +59,9 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.function.BiFunction; import java.util.function.Supplier; @@ -80,6 +85,7 @@ public class SearchTransportService extends AbstractComponent { private final TransportService transportService; private final BiFunction responseWrapper; + private final Map clientConnections = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); public SearchTransportService(Settings settings, TransportService transportService, BiFunction responseWrapper) { @@ -131,7 +137,7 @@ public class SearchTransportService extends AbstractComponent { public void sendExecuteDfs(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, final SearchActionListener listener) { transportService.sendChildRequest(connection, DFS_ACTION_NAME, request, task, - new ActionListenerResponseHandler<>(listener, DfsSearchResult::new)); + new ConnectionCountingHandler<>(listener, DfsSearchResult::new, clientConnections, connection.getNode().getId())); } public void sendExecuteQuery(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, @@ -143,25 +149,26 @@ public class SearchTransportService extends AbstractComponent { final ActionListener handler = responseWrapper.apply(connection, listener); transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task, - new ActionListenerResponseHandler<>(handler, supplier)); + new ConnectionCountingHandler<>(handler, supplier, clientConnections, connection.getNode().getId())); } public void sendExecuteQuery(Transport.Connection connection, final QuerySearchRequest request, SearchTask task, final SearchActionListener listener) { transportService.sendChildRequest(connection, QUERY_ID_ACTION_NAME, request, task, - new ActionListenerResponseHandler<>(listener, QuerySearchResult::new)); + new ConnectionCountingHandler<>(listener, QuerySearchResult::new, clientConnections, connection.getNode().getId())); } public void sendExecuteScrollQuery(Transport.Connection connection, final InternalScrollSearchRequest request, SearchTask task, final SearchActionListener listener) { transportService.sendChildRequest(connection, QUERY_SCROLL_ACTION_NAME, request, task, - new ActionListenerResponseHandler<>(listener, ScrollQuerySearchResult::new)); + new ConnectionCountingHandler<>(listener, ScrollQuerySearchResult::new, clientConnections, connection.getNode().getId())); } public void sendExecuteScrollFetch(Transport.Connection connection, final InternalScrollSearchRequest request, SearchTask task, final SearchActionListener listener) { transportService.sendChildRequest(connection, QUERY_FETCH_SCROLL_ACTION_NAME, request, task, - new ActionListenerResponseHandler<>(listener, ScrollQueryFetchSearchResult::new)); + new ConnectionCountingHandler<>(listener, ScrollQueryFetchSearchResult::new, + clientConnections, connection.getNode().getId())); } public void sendExecuteFetch(Transport.Connection connection, final ShardFetchSearchRequest request, SearchTask task, @@ -177,22 +184,31 @@ public class SearchTransportService extends AbstractComponent { private void sendExecuteFetch(Transport.Connection connection, String action, final ShardFetchRequest request, SearchTask task, final SearchActionListener listener) { transportService.sendChildRequest(connection, action, request, task, - new ActionListenerResponseHandler<>(listener, FetchSearchResult::new)); + new ConnectionCountingHandler<>(listener, FetchSearchResult::new, clientConnections, connection.getNode().getId())); } /** * Used by {@link TransportSearchAction} to send the expand queries (field collapsing). */ void sendExecuteMultiSearch(final MultiSearchRequest request, SearchTask task, - final ActionListener listener) { - transportService.sendChildRequest(transportService.getConnection(transportService.getLocalNode()), MultiSearchAction.NAME, request, - task, new ActionListenerResponseHandler<>(listener, MultiSearchResponse::new)); + final ActionListener listener) { + final Transport.Connection connection = transportService.getConnection(transportService.getLocalNode()); + transportService.sendChildRequest(connection, MultiSearchAction.NAME, request, task, + new ConnectionCountingHandler<>(listener, MultiSearchResponse::new, clientConnections, connection.getNode().getId())); } public RemoteClusterService getRemoteClusterService() { return transportService.getRemoteClusterService(); } + /** + * Return a map of nodeId to pending number of search requests. + * This is a snapshot of the current pending search and not a live map. + */ + public Map getPendingSearchRequests() { + return new HashMap<>(clientConnections); + } + static class ScrollFreeContextRequest extends TransportRequest { private long id; @@ -486,4 +502,47 @@ public class SearchTransportService extends AbstractComponent { return transportService.getRemoteClusterService().getConnection(node, clusterAlias); } } + + final class ConnectionCountingHandler extends ActionListenerResponseHandler { + private final Map clientConnections; + private final String nodeId; + + ConnectionCountingHandler(final ActionListener listener, final Supplier responseSupplier, + final Map clientConnections, final String nodeId) { + super(listener, responseSupplier); + this.clientConnections = clientConnections; + this.nodeId = nodeId; + // Increment the number of connections for this node by one + clientConnections.compute(nodeId, (id, conns) -> conns == null ? 1 : conns + 1); + } + + @Override + public void handleResponse(Response response) { + super.handleResponse(response); + // Decrement the number of connections or remove it entirely if there are no more connections + // We need to remove the entry here so we don't leak when nodes go away forever + assert assertNodePresent(); + clientConnections.computeIfPresent(nodeId, (id, conns) -> conns.longValue() == 1 ? null : conns - 1); + } + + @Override + public void handleException(TransportException e) { + super.handleException(e); + // Decrement the number of connections or remove it entirely if there are no more connections + // We need to remove the entry here so we don't leak when nodes go away forever + assert assertNodePresent(); + clientConnections.computeIfPresent(nodeId, (id, conns) -> conns.longValue() == 1 ? null : conns - 1); + } + + private boolean assertNodePresent() { + clientConnections.compute(nodeId, (id, conns) -> { + assert conns != null : "number of connections for " + id + " is null, but should be an integer"; + assert conns >= 1 : "number of connections for " + id + " should be >= 1 but was " + conns; + return conns; + }); + // Always return true, there is additional asserting here, the boolean is just so this + // can be skipped when assertions are not enabled + return true; + } + } } diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 795ccdd3f20..8400707e370 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -284,8 +284,9 @@ public class TransportSearchAction extends HandledTransportAction nodeSearchCounts = searchTransportService.getPendingSearchRequests(); GroupShardsIterator localShardsIterator = clusterService.operationRouting().searchShards(clusterState, - concreteIndices, routingMap, searchRequest.preference()); + concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts); GroupShardsIterator shardIterators = mergeShardsIterators(localShardsIterator, localIndices, remoteShardIterators); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index 8ed06736b6b..4376980eca8 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -29,18 +29,24 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.node.ResponseCollectorService; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; +import java.util.OptionalDouble; import java.util.Set; +import java.util.stream.Collectors; import static java.util.Collections.emptyMap; @@ -261,6 +267,165 @@ public class IndexShardRoutingTable implements Iterable { return new PlainShardIterator(shardId, ordered); } + /** + * Returns an iterator over active and initializing shards, ordered by the adaptive replica + * selection forumla. Making sure though that its random within the active shards of the same + * (or missing) rank, and initializing shards are the last to iterate through. + */ + public ShardIterator activeInitializingShardsRankedIt(@Nullable ResponseCollectorService collector, + @Nullable Map nodeSearchCounts) { + final int seed = shuffler.nextSeed(); + if (allInitializingShards.isEmpty()) { + return new PlainShardIterator(shardId, + rankShardsAndUpdateStats(shuffler.shuffle(activeShards, seed), collector, nodeSearchCounts)); + } + + ArrayList ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size()); + List rankedActiveShards = + rankShardsAndUpdateStats(shuffler.shuffle(activeShards, seed), collector, nodeSearchCounts); + ordered.addAll(rankedActiveShards); + List rankedInitializingShards = + rankShardsAndUpdateStats(allInitializingShards, collector, nodeSearchCounts); + ordered.addAll(rankedInitializingShards); + return new PlainShardIterator(shardId, ordered); + } + + private static Set getAllNodeIds(final List shards) { + final Set nodeIds = new HashSet<>(); + for (ShardRouting shard : shards) { + nodeIds.add(shard.currentNodeId()); + } + return nodeIds; + } + + private static Map> + getNodeStats(final Set nodeIds, final ResponseCollectorService collector) { + + final Map> nodeStats = new HashMap<>(nodeIds.size()); + for (String nodeId : nodeIds) { + nodeStats.put(nodeId, collector.getNodeStatistics(nodeId)); + } + return nodeStats; + } + + private static Map rankNodes(final Map> nodeStats, + final Map nodeSearchCounts) { + final Map nodeRanks = new HashMap<>(nodeStats.size()); + for (Map.Entry> entry : nodeStats.entrySet()) { + Optional maybeStats = entry.getValue(); + maybeStats.ifPresent(stats -> { + final String nodeId = entry.getKey(); + nodeRanks.put(nodeId, stats.rank(nodeSearchCounts.getOrDefault(nodeId, 1L))); + }); + } + return nodeRanks; + } + + /** + * Adjust the for all other nodes' collected stats. In the original ranking paper there is no need to adjust other nodes' stats because + * Cassandra sends occasional requests to all copies of the data, so their stats will be updated during that broadcast phase. In + * Elasticsearch, however, we do not have that sort of broadcast-to-all behavior. In order to prevent a node that gets a high score and + * then never gets any more requests, we must ensure it eventually returns to a more normal score and can be a candidate for serving + * requests. + * + * This adjustment takes the "winning" node's statistics and adds the average of those statistics with each non-winning node. Let's say + * the winning node had a queue size of 10 and a non-winning node had a queue of 18. The average queue size is (10 + 18) / 2 = 14 so the + * non-winning node will have statistics added for a queue size of 14. This is repeated for the response time and service times as well. + */ + private static void adjustStats(final ResponseCollectorService collector, + final Map> nodeStats, + final String minNodeId, + final ResponseCollectorService.ComputedNodeStats minStats) { + if (minNodeId != null) { + for (Map.Entry> entry : nodeStats.entrySet()) { + final String nodeId = entry.getKey(); + final Optional maybeStats = entry.getValue(); + if (nodeId.equals(minNodeId) == false && maybeStats.isPresent()) { + final ResponseCollectorService.ComputedNodeStats stats = maybeStats.get(); + final int updatedQueue = (minStats.queueSize + stats.queueSize) / 2; + final long updatedResponse = (long) (minStats.responseTime + stats.responseTime) / 2; + final long updatedService = (long) (minStats.serviceTime + stats.serviceTime) / 2; + collector.addNodeStatistics(nodeId, updatedQueue, updatedResponse, updatedService); + } + } + } + } + + private static List rankShardsAndUpdateStats(List shards, final ResponseCollectorService collector, + final Map nodeSearchCounts) { + if (collector == null || nodeSearchCounts == null || shards.size() <= 1) { + return shards; + } + + // Retrieve which nodes we can potentially send the query to + final Set nodeIds = getAllNodeIds(shards); + final int nodeCount = nodeIds.size(); + + final Map> nodeStats = getNodeStats(nodeIds, collector); + + // Retrieve all the nodes the shards exist on + final Map nodeRanks = rankNodes(nodeStats, nodeSearchCounts); + + // sort all shards based on the shard rank + ArrayList sortedShards = new ArrayList<>(shards); + Collections.sort(sortedShards, new NodeRankComparator(nodeRanks)); + + // adjust the non-winner nodes' stats so they will get a chance to receive queries + if (sortedShards.size() > 1) { + ShardRouting minShard = sortedShards.get(0); + // If the winning shard is not started we are ranking initializing + // shards, don't bother to do adjustments + if (minShard.started()) { + String minNodeId = minShard.currentNodeId(); + Optional maybeMinStats = nodeStats.get(minNodeId); + if (maybeMinStats.isPresent()) { + adjustStats(collector, nodeStats, minNodeId, maybeMinStats.get()); + // Increase the number of searches for the "winning" node by one. + // Note that this doesn't actually affect the "real" counts, instead + // it only affects the captured node search counts, which is + // captured once for each query in TransportSearchAction + nodeSearchCounts.compute(minNodeId, (id, conns) -> conns == null ? 1 : conns + 1); + } + } + } + + return sortedShards; + } + + private static class NodeRankComparator implements Comparator { + private final Map nodeRanks; + + NodeRankComparator(Map nodeRanks) { + this.nodeRanks = nodeRanks; + } + + @Override + public int compare(ShardRouting s1, ShardRouting s2) { + if (s1.currentNodeId().equals(s2.currentNodeId())) { + // these shards on the the same node + return 0; + } + Double shard1rank = nodeRanks.get(s1.currentNodeId()); + Double shard2rank = nodeRanks.get(s2.currentNodeId()); + if (shard1rank != null) { + if (shard2rank != null) { + return shard1rank.compareTo(shard2rank); + } else { + // place non-nulls after null values + return 1; + } + } else { + if (shard2rank != null) { + // place nulls before non-null values + return -1; + } else { + // Both nodes do not have stats, they are equal + return 0; + } + } + } + } + /** * Returns true if no primaries are active or initializing for this shard */ diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java b/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java index 8a88ee1751a..4b3f254c9f5 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java @@ -28,10 +28,12 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; +import org.elasticsearch.node.ResponseCollectorService; import java.util.ArrayList; import java.util.Arrays; @@ -43,13 +45,24 @@ import java.util.stream.Collectors; public class OperationRouting extends AbstractComponent { + public static final Setting USE_ADAPTIVE_REPLICA_SELECTION_SETTING = + Setting.boolSetting("cluster.routing.use_adaptive_replica_selection", false, + Setting.Property.Dynamic, Setting.Property.NodeScope); + private String[] awarenessAttributes; + private boolean useAdaptiveReplicaSelection; public OperationRouting(Settings settings, ClusterSettings clusterSettings) { super(settings); this.awarenessAttributes = AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings); + this.useAdaptiveReplicaSelection = USE_ADAPTIVE_REPLICA_SELECTION_SETTING.get(settings); clusterSettings.addSettingsUpdateConsumer(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, this::setAwarenessAttributes); + clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection); + } + + void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) { + this.useAdaptiveReplicaSelection = useAdaptiveReplicaSelection; } private void setAwarenessAttributes(String[] awarenessAttributes) { @@ -61,19 +74,33 @@ public class OperationRouting extends AbstractComponent { } public ShardIterator getShards(ClusterState clusterState, String index, String id, @Nullable String routing, @Nullable String preference) { - return preferenceActiveShardIterator(shards(clusterState, index, id, routing), clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference); + return preferenceActiveShardIterator(shards(clusterState, index, id, routing), clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference, null, null); } public ShardIterator getShards(ClusterState clusterState, String index, int shardId, @Nullable String preference) { final IndexShardRoutingTable indexShard = clusterState.getRoutingTable().shardRoutingTable(index, shardId); - return preferenceActiveShardIterator(indexShard, clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference); + return preferenceActiveShardIterator(indexShard, clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference, null, null); } - public GroupShardsIterator searchShards(ClusterState clusterState, String[] concreteIndices, @Nullable Map> routing, @Nullable String preference) { + public GroupShardsIterator searchShards(ClusterState clusterState, + String[] concreteIndices, + @Nullable Map> routing, + @Nullable String preference) { + return searchShards(clusterState, concreteIndices, routing, preference, null, null); + } + + + public GroupShardsIterator searchShards(ClusterState clusterState, + String[] concreteIndices, + @Nullable Map> routing, + @Nullable String preference, + @Nullable ResponseCollectorService collectorService, + @Nullable Map nodeCounts) { final Set shards = computeTargetedShards(clusterState, concreteIndices, routing); final Set set = new HashSet<>(shards.size()); for (IndexShardRoutingTable shard : shards) { - ShardIterator iterator = preferenceActiveShardIterator(shard, clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference); + ShardIterator iterator = preferenceActiveShardIterator(shard, + clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference, collectorService, nodeCounts); if (iterator != null) { set.add(iterator); } @@ -107,10 +134,17 @@ public class OperationRouting extends AbstractComponent { return set; } - private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable indexShard, String localNodeId, DiscoveryNodes nodes, @Nullable String preference) { + private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable indexShard, String localNodeId, + DiscoveryNodes nodes, @Nullable String preference, + @Nullable ResponseCollectorService collectorService, + @Nullable Map nodeCounts) { if (preference == null || preference.isEmpty()) { if (awarenessAttributes.length == 0) { - return indexShard.activeInitializingShardsRandomIt(); + if (useAdaptiveReplicaSelection) { + return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts); + } else { + return indexShard.activeInitializingShardsRandomIt(); + } } else { return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes); } @@ -141,7 +175,11 @@ public class OperationRouting extends AbstractComponent { // no more preference if (index == -1 || index == preference.length() - 1) { if (awarenessAttributes.length == 0) { - return indexShard.activeInitializingShardsRandomIt(); + if (useAdaptiveReplicaSelection) { + return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts); + } else { + return indexShard.activeInitializingShardsRandomIt(); + } } else { return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes); } diff --git a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index b56b56c788e..cd49c479305 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.metadata.IndexGraveyard; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; @@ -410,6 +411,7 @@ public final class ClusterSettings extends AbstractScopedSettings { ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING, FastVectorHighlighter.SETTING_TV_HIGHLIGHT_MULTI_VALUE, Node.BREAKER_TYPE_KEY, + OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, IndexGraveyard.SETTING_MAX_TOMBSTONES ))); } diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index b37a6e14f02..45d9a208284 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -92,10 +92,6 @@ public class EsExecutors { public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int initialQueueCapacity, int minQueueSize, int maxQueueSize, int frameSize, TimeValue targetedResponseTime, ThreadFactory threadFactory, ThreadContext contextHolder) { - if (initialQueueCapacity == minQueueSize && initialQueueCapacity == maxQueueSize) { - return newFixed(name, size, initialQueueCapacity, threadFactory, contextHolder); - } - if (initialQueueCapacity <= 0) { throw new IllegalArgumentException("initial queue capacity for [" + name + "] executor must be positive, got: " + initialQueueCapacity); diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java index 1f694d73fa7..2d1be51824e 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java @@ -79,9 +79,7 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto this.minQueueSize = minQueueSize; this.maxQueueSize = maxQueueSize; this.targetedResponseTimeNanos = targetedResponseTime.getNanos(); - // We choose to start the EWMA with the targeted response time, reasoning that it is a - // better start point for a realistic task execution time than starting at 0 - this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, targetedResponseTimeNanos); + this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0); logger.debug("thread pool [{}] will adjust queue by [{}] when determining automatic queue size", name, QUEUE_ADJUSTMENT_AMOUNT); } diff --git a/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java b/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java index 1afbd3b2997..6fea0e2e1c0 100644 --- a/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java +++ b/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java @@ -31,7 +31,9 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import java.util.Collections; import java.util.HashMap; +import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; @@ -65,13 +67,11 @@ public final class ResponseCollectorService extends AbstractComponent implements } public void addNodeStatistics(String nodeId, int queueSize, long responseTimeNanos, long avgServiceTimeNanos) { - NodeStatistics nodeStats = nodeIdToStats.get(nodeId); nodeIdToStats.compute(nodeId, (id, ns) -> { if (ns == null) { ExponentiallyWeightedMovingAverage queueEWMA = new ExponentiallyWeightedMovingAverage(ALPHA, queueSize); ExponentiallyWeightedMovingAverage responseEWMA = new ExponentiallyWeightedMovingAverage(ALPHA, responseTimeNanos); - NodeStatistics newStats = new NodeStatistics(nodeId, queueEWMA, responseEWMA, avgServiceTimeNanos); - return newStats; + return new NodeStatistics(nodeId, queueEWMA, responseEWMA, avgServiceTimeNanos); } else { ns.queueSize.addValue((double) queueSize); ns.responseTime.addValue((double) responseTimeNanos); @@ -82,39 +82,96 @@ public final class ResponseCollectorService extends AbstractComponent implements } public Map getAllNodeStatistics() { + final int clientNum = nodeIdToStats.size(); // Transform the mutable object internally used for accounting into the computed version Map nodeStats = new HashMap<>(nodeIdToStats.size()); nodeIdToStats.forEach((k, v) -> { - nodeStats.put(k, new ComputedNodeStats(v)); + nodeStats.put(k, new ComputedNodeStats(clientNum, v)); }); return nodeStats; } + /** + * Optionally return a {@code NodeStatistics} for the given nodeid, if + * response information exists for the given node. Returns an empty + * {@code Optional} if the node was not found. + */ + public Optional getNodeStatistics(final String nodeId) { + final int clientNum = nodeIdToStats.size(); + return Optional.ofNullable(nodeIdToStats.get(nodeId)).map(ns -> new ComputedNodeStats(clientNum, ns)); + } + /** * Struct-like class encapsulating a point-in-time snapshot of a particular * node's statistics. This includes the EWMA of queue size, response time, * and service time. */ public static class ComputedNodeStats { + // We store timestamps with nanosecond precision, however, the + // formula specifies milliseconds, therefore we need to convert + // the values so the times don't unduely weight the formula + private final double FACTOR = 1000000.0; + private final int clientNum; + + private double cachedRank = 0; + public final String nodeId; - public final double queueSize; + public final int queueSize; public final double responseTime; public final double serviceTime; - ComputedNodeStats(NodeStatistics nodeStats) { + ComputedNodeStats(int clientNum, NodeStatistics nodeStats) { + this.clientNum = clientNum; this.nodeId = nodeStats.nodeId; - this.queueSize = nodeStats.queueSize.getAverage(); + this.queueSize = (int) nodeStats.queueSize.getAverage(); this.responseTime = nodeStats.responseTime.getAverage(); this.serviceTime = nodeStats.serviceTime; } + /** + * Rank this copy of the data, according to the adaptive replica selection formula from the C3 paper + * https://www.usenix.org/system/files/conference/nsdi15/nsdi15-paper-suresh.pdf + */ + private double innerRank(long outstandingRequests) { + // this is a placeholder value, the concurrency compensation is + // defined as the number of outstanding requests from the client + // to the node times the number of clients in the system + double concurrencyCompensation = outstandingRequests * clientNum; + + // Cubic queue adjustment factor. The paper chose 3 though we could + // potentially make this configurable if desired. + int queueAdjustmentFactor = 3; + + // EWMA of queue size + double qBar = queueSize; + double qHatS = 1 + concurrencyCompensation + qBar; + + // EWMA of response time + double rS = responseTime / FACTOR; + // EWMA of service time + double muBarS = serviceTime / FACTOR; + + // The final formula + double rank = rS - (1.0 / muBarS) + (Math.pow(qHatS, queueAdjustmentFactor) / muBarS); + return rank; + } + + public double rank(long outstandingRequests) { + if (cachedRank == 0) { + cachedRank = innerRank(outstandingRequests); + } + return cachedRank; + } + @Override public String toString() { StringBuilder sb = new StringBuilder("ComputedNodeStats["); sb.append(nodeId).append("]("); - sb.append("queue: ").append(queueSize); - sb.append(", response time: ").append(responseTime); - sb.append(", service time: ").append(serviceTime); + sb.append("nodes: ").append(clientNum); + sb.append(", queue: ").append(queueSize); + sb.append(", response time: ").append(String.format(Locale.ROOT, "%.1f", responseTime)); + sb.append(", service time: ").append(String.format(Locale.ROOT, "%.1f", serviceTime)); + sb.append(", rank: ").append(String.format(Locale.ROOT, "%.1f", rank(1))); sb.append(")"); return sb.toString(); } diff --git a/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java index 9fdaae098b8..500612974c8 100644 --- a/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -283,7 +283,7 @@ public class QueryPhase implements SearchPhase { ctx.postProcess(result, shouldCollect); } EsThreadPoolExecutor executor = (EsThreadPoolExecutor) - searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);; + searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH); if (executor instanceof QueueResizingEsThreadPoolExecutor) { QueueResizingEsThreadPoolExecutor rExecutor = (QueueResizingEsThreadPoolExecutor) executor; queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize()); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java index be7ebd4a4c2..498edee12f9 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java @@ -28,8 +28,10 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.node.ResponseCollectorService; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; @@ -490,4 +492,90 @@ public class OperationRoutingTests extends ESTestCase{ } } + public void testAdaptiveReplicaSelection() throws Exception { + final int numIndices = 1; + final int numShards = 1; + final int numReplicas = 2; + final String[] indexNames = new String[numIndices]; + for (int i = 0; i < numIndices; i++) { + indexNames[i] = "test" + i; + } + ClusterState state = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(indexNames, numShards, numReplicas); + final int numRepeatedSearches = 4; + OperationRouting opRouting = new OperationRouting(Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + opRouting.setUseAdaptiveReplicaSelection(true); + List searchedShards = new ArrayList<>(numShards); + Set selectedNodes = new HashSet<>(numShards); + TestThreadPool threadPool = new TestThreadPool("testThatOnlyNodesSupportNodeIds"); + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + ResponseCollectorService collector = new ResponseCollectorService(Settings.EMPTY, clusterService); + Map outstandingRequests = new HashMap<>(); + GroupShardsIterator groupIterator = opRouting.searchShards(state, + indexNames, null, null, collector, outstandingRequests); + + assertThat("One group per index shard", groupIterator.size(), equalTo(numIndices * numShards)); + + // Test that the shards use a round-robin pattern when there are no stats + assertThat(groupIterator.get(0).size(), equalTo(numReplicas + 1)); + ShardRouting firstChoice = groupIterator.get(0).nextOrNull(); + assertNotNull(firstChoice); + searchedShards.add(firstChoice); + selectedNodes.add(firstChoice.currentNodeId()); + + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + + assertThat(groupIterator.size(), equalTo(numIndices * numShards)); + ShardRouting secondChoice = groupIterator.get(0).nextOrNull(); + assertNotNull(secondChoice); + searchedShards.add(secondChoice); + selectedNodes.add(secondChoice.currentNodeId()); + + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + + assertThat(groupIterator.size(), equalTo(numIndices * numShards)); + ShardRouting thirdChoice = groupIterator.get(0).nextOrNull(); + assertNotNull(thirdChoice); + searchedShards.add(thirdChoice); + selectedNodes.add(thirdChoice.currentNodeId()); + + // All three shards should have been separate, because there are no stats yet so they're all ranked equally. + assertThat(searchedShards.size(), equalTo(3)); + + // Now let's start adding node metrics, since that will affect which node is chosen + collector.addNodeStatistics("node_0", 2, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(150).nanos()); + collector.addNodeStatistics("node_1", 1, TimeValue.timeValueMillis(100).nanos(), TimeValue.timeValueMillis(50).nanos()); + collector.addNodeStatistics("node_2", 1, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(200).nanos()); + outstandingRequests.put("node_0", 1L); + outstandingRequests.put("node_1", 1L); + outstandingRequests.put("node_2", 1L); + + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + ShardRouting shardChoice = groupIterator.get(0).nextOrNull(); + // node 1 should be the lowest ranked node to start + assertThat(shardChoice.currentNodeId(), equalTo("node_1")); + + // node 1 starts getting more loaded... + collector.addNodeStatistics("node_1", 2, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(150).nanos()); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + shardChoice = groupIterator.get(0).nextOrNull(); + assertThat(shardChoice.currentNodeId(), equalTo("node_1")); + + // and more loaded... + collector.addNodeStatistics("node_1", 3, TimeValue.timeValueMillis(250).nanos(), TimeValue.timeValueMillis(200).nanos()); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + shardChoice = groupIterator.get(0).nextOrNull(); + assertThat(shardChoice.currentNodeId(), equalTo("node_1")); + + // and even more + collector.addNodeStatistics("node_1", 4, TimeValue.timeValueMillis(300).nanos(), TimeValue.timeValueMillis(250).nanos()); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + shardChoice = groupIterator.get(0).nextOrNull(); + // finally, node 2 is choosen instead + assertThat(shardChoice.currentNodeId(), equalTo("node_2")); + + IOUtils.close(clusterService); + terminate(threadPool); + } + } diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java index 5365e1bb909..125cb572ea5 100644 --- a/core/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java @@ -198,26 +198,26 @@ public class QueueResizingEsThreadPoolExecutorTests extends ESTestCase { executor.prestartAllCoreThreads(); logger.info("--> executor: {}", executor); - assertThat((long)executor.getTaskExecutionEWMA(), equalTo(1000000L)); + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(0L)); executeTask(executor, 1); assertBusy(() -> { - assertThat((long)executor.getTaskExecutionEWMA(), equalTo(700030L)); + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(30L)); }); executeTask(executor, 1); assertBusy(() -> { - assertThat((long)executor.getTaskExecutionEWMA(), equalTo(490050L)); + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(51L)); }); executeTask(executor, 1); assertBusy(() -> { - assertThat((long)executor.getTaskExecutionEWMA(), equalTo(343065L)); + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(65L)); }); executeTask(executor, 1); assertBusy(() -> { - assertThat((long)executor.getTaskExecutionEWMA(), equalTo(240175L)); + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(75L)); }); executeTask(executor, 1); assertBusy(() -> { - assertThat((long)executor.getTaskExecutionEWMA(), equalTo(168153L)); + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(83L)); }); executor.shutdown(); diff --git a/core/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java b/core/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java index d620007d2cd..d86d7b46cc7 100644 --- a/core/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java +++ b/core/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java @@ -67,7 +67,7 @@ public class ResponseCollectorServiceTests extends ESTestCase { collector.addNodeStatistics("node1", 1, 100, 10); Map nodeStats = collector.getAllNodeStatistics(); assertTrue(nodeStats.containsKey("node1")); - assertThat(nodeStats.get("node1").queueSize, equalTo(1.0)); + assertThat(nodeStats.get("node1").queueSize, equalTo(1)); assertThat(nodeStats.get("node1").responseTime, equalTo(100.0)); assertThat(nodeStats.get("node1").serviceTime, equalTo(10.0)); } @@ -113,7 +113,7 @@ public class ResponseCollectorServiceTests extends ESTestCase { logger.info("--> got stats: {}", nodeStats); for (String nodeId : nodes) { if (nodeStats.containsKey(nodeId)) { - assertThat(nodeStats.get(nodeId).queueSize, greaterThan(0.0)); + assertThat(nodeStats.get(nodeId).queueSize, greaterThan(0)); assertThat(nodeStats.get(nodeId).responseTime, greaterThan(0.0)); assertThat(nodeStats.get(nodeId).serviceTime, greaterThan(0.0)); } diff --git a/core/src/test/java/org/elasticsearch/search/preference/SearchPreferenceIT.java b/core/src/test/java/org/elasticsearch/search/preference/SearchPreferenceIT.java index 9163ee572cf..6478446a1a2 100644 --- a/core/src/test/java/org/elasticsearch/search/preference/SearchPreferenceIT.java +++ b/core/src/test/java/org/elasticsearch/search/preference/SearchPreferenceIT.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; @@ -49,6 +50,13 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; @ESIntegTestCase.ClusterScope(minNumDataNodes = 2) public class SearchPreferenceIT extends ESIntegTestCase { + + @Override + public Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)) + .put(OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING.getKey(), false).build(); + } + // see #2896 public void testStopOneNodePreferenceWithRedState() throws InterruptedException, IOException { assertAcked(prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", cluster().numDataNodes()+2).put("index.number_of_replicas", 0))); diff --git a/core/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java b/core/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java index feca42e5495..4128c4a6aa6 100644 --- a/core/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java @@ -226,7 +226,7 @@ public class QueryPhaseTests extends IndexShardTestCase { QueryPhase.execute(context, contextSearcher, checkCancelled -> {}, null); QuerySearchResult results = context.queryResult(); - assertThat(results.serviceTimeEWMA(), greaterThan(0L)); + assertThat(results.serviceTimeEWMA(), greaterThanOrEqualTo(0L)); assertThat(results.nodeQueueSize(), greaterThanOrEqualTo(0)); reader.close(); dir.close(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 29bfbff29b2..8ef7500d04d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -338,6 +338,7 @@ public final class InternalTestCluster extends TestCluster { builder.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "1b"); // Some tests make use of scripting quite a bit, so increase the limit for integration tests builder.put(ScriptService.SCRIPT_MAX_COMPILATIONS_PER_MINUTE.getKey(), 1000); + builder.put(OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING.getKey(), random.nextBoolean()); if (TEST_NIGHTLY) { builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), RandomNumbers.randomIntBetween(random, 5, 10)); builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), RandomNumbers.randomIntBetween(random, 5, 10));