diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchExecutionStatsCollector.java b/core/src/main/java/org/elasticsearch/action/search/SearchExecutionStatsCollector.java index 72c3d5eaab6..0ffad5aa406 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchExecutionStatsCollector.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchExecutionStatsCollector.java @@ -61,7 +61,7 @@ public final class SearchExecutionStatsCollector implements ActionListener 0 && queueSize > 0) { + if (serviceTimeEWMA > 0 && queueSize >= 0) { collector.addNodeStatistics(nodeId, queueSize, responseDuration, serviceTimeEWMA); } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 755bfe48410..dba382aed6c 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.dfs.DfsSearchResult; @@ -50,6 +51,7 @@ import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TaskAwareTransportRequestHandler; import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; @@ -57,6 +59,9 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.function.BiFunction; import java.util.function.Supplier; @@ -80,6 +85,7 @@ public class SearchTransportService extends AbstractComponent { private final TransportService transportService; private final BiFunction responseWrapper; + private final Map clientConnections = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); public SearchTransportService(Settings settings, TransportService transportService, BiFunction responseWrapper) { @@ -131,7 +137,7 @@ public class SearchTransportService extends AbstractComponent { public void sendExecuteDfs(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, final SearchActionListener listener) { transportService.sendChildRequest(connection, DFS_ACTION_NAME, request, task, - new ActionListenerResponseHandler<>(listener, DfsSearchResult::new)); + new ConnectionCountingHandler<>(listener, DfsSearchResult::new, clientConnections, connection.getNode().getId())); } public void sendExecuteQuery(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, @@ -143,25 +149,26 @@ public class SearchTransportService extends AbstractComponent { final ActionListener handler = responseWrapper.apply(connection, listener); transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task, - new ActionListenerResponseHandler<>(handler, supplier)); + new ConnectionCountingHandler<>(handler, supplier, clientConnections, connection.getNode().getId())); } public void sendExecuteQuery(Transport.Connection connection, final QuerySearchRequest request, SearchTask task, final SearchActionListener listener) { transportService.sendChildRequest(connection, QUERY_ID_ACTION_NAME, request, task, - new ActionListenerResponseHandler<>(listener, QuerySearchResult::new)); + new ConnectionCountingHandler<>(listener, QuerySearchResult::new, clientConnections, connection.getNode().getId())); } public void sendExecuteScrollQuery(Transport.Connection connection, final InternalScrollSearchRequest request, SearchTask task, final SearchActionListener listener) { transportService.sendChildRequest(connection, QUERY_SCROLL_ACTION_NAME, request, task, - new ActionListenerResponseHandler<>(listener, ScrollQuerySearchResult::new)); + new ConnectionCountingHandler<>(listener, ScrollQuerySearchResult::new, clientConnections, connection.getNode().getId())); } public void sendExecuteScrollFetch(Transport.Connection connection, final InternalScrollSearchRequest request, SearchTask task, final SearchActionListener listener) { transportService.sendChildRequest(connection, QUERY_FETCH_SCROLL_ACTION_NAME, request, task, - new ActionListenerResponseHandler<>(listener, ScrollQueryFetchSearchResult::new)); + new ConnectionCountingHandler<>(listener, ScrollQueryFetchSearchResult::new, + clientConnections, connection.getNode().getId())); } public void sendExecuteFetch(Transport.Connection connection, final ShardFetchSearchRequest request, SearchTask task, @@ -177,22 +184,31 @@ public class SearchTransportService extends AbstractComponent { private void sendExecuteFetch(Transport.Connection connection, String action, final ShardFetchRequest request, SearchTask task, final SearchActionListener listener) { transportService.sendChildRequest(connection, action, request, task, - new ActionListenerResponseHandler<>(listener, FetchSearchResult::new)); + new ConnectionCountingHandler<>(listener, FetchSearchResult::new, clientConnections, connection.getNode().getId())); } /** * Used by {@link TransportSearchAction} to send the expand queries (field collapsing). */ void sendExecuteMultiSearch(final MultiSearchRequest request, SearchTask task, - final ActionListener listener) { - transportService.sendChildRequest(transportService.getConnection(transportService.getLocalNode()), MultiSearchAction.NAME, request, - task, new ActionListenerResponseHandler<>(listener, MultiSearchResponse::new)); + final ActionListener listener) { + final Transport.Connection connection = transportService.getConnection(transportService.getLocalNode()); + transportService.sendChildRequest(connection, MultiSearchAction.NAME, request, task, + new ConnectionCountingHandler<>(listener, MultiSearchResponse::new, clientConnections, connection.getNode().getId())); } public RemoteClusterService getRemoteClusterService() { return transportService.getRemoteClusterService(); } + /** + * Return a map of nodeId to pending number of search requests. + * This is a snapshot of the current pending search and not a live map. + */ + public Map getPendingSearchRequests() { + return new HashMap<>(clientConnections); + } + static class ScrollFreeContextRequest extends TransportRequest { private long id; @@ -486,4 +502,47 @@ public class SearchTransportService extends AbstractComponent { return transportService.getRemoteClusterService().getConnection(node, clusterAlias); } } + + final class ConnectionCountingHandler extends ActionListenerResponseHandler { + private final Map clientConnections; + private final String nodeId; + + ConnectionCountingHandler(final ActionListener listener, final Supplier responseSupplier, + final Map clientConnections, final String nodeId) { + super(listener, responseSupplier); + this.clientConnections = clientConnections; + this.nodeId = nodeId; + // Increment the number of connections for this node by one + clientConnections.compute(nodeId, (id, conns) -> conns == null ? 1 : conns + 1); + } + + @Override + public void handleResponse(Response response) { + super.handleResponse(response); + // Decrement the number of connections or remove it entirely if there are no more connections + // We need to remove the entry here so we don't leak when nodes go away forever + assert assertNodePresent(); + clientConnections.computeIfPresent(nodeId, (id, conns) -> conns.longValue() == 1 ? null : conns - 1); + } + + @Override + public void handleException(TransportException e) { + super.handleException(e); + // Decrement the number of connections or remove it entirely if there are no more connections + // We need to remove the entry here so we don't leak when nodes go away forever + assert assertNodePresent(); + clientConnections.computeIfPresent(nodeId, (id, conns) -> conns.longValue() == 1 ? null : conns - 1); + } + + private boolean assertNodePresent() { + clientConnections.compute(nodeId, (id, conns) -> { + assert conns != null : "number of connections for " + id + " is null, but should be an integer"; + assert conns >= 1 : "number of connections for " + id + " should be >= 1 but was " + conns; + return conns; + }); + // Always return true, there is additional asserting here, the boolean is just so this + // can be skipped when assertions are not enabled + return true; + } + } } diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 795ccdd3f20..8400707e370 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -284,8 +284,9 @@ public class TransportSearchAction extends HandledTransportAction nodeSearchCounts = searchTransportService.getPendingSearchRequests(); GroupShardsIterator localShardsIterator = clusterService.operationRouting().searchShards(clusterState, - concreteIndices, routingMap, searchRequest.preference()); + concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts); GroupShardsIterator shardIterators = mergeShardsIterators(localShardsIterator, localIndices, remoteShardIterators); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index 8ed06736b6b..4376980eca8 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -29,18 +29,24 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.node.ResponseCollectorService; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; +import java.util.OptionalDouble; import java.util.Set; +import java.util.stream.Collectors; import static java.util.Collections.emptyMap; @@ -261,6 +267,165 @@ public class IndexShardRoutingTable implements Iterable { return new PlainShardIterator(shardId, ordered); } + /** + * Returns an iterator over active and initializing shards, ordered by the adaptive replica + * selection forumla. Making sure though that its random within the active shards of the same + * (or missing) rank, and initializing shards are the last to iterate through. + */ + public ShardIterator activeInitializingShardsRankedIt(@Nullable ResponseCollectorService collector, + @Nullable Map nodeSearchCounts) { + final int seed = shuffler.nextSeed(); + if (allInitializingShards.isEmpty()) { + return new PlainShardIterator(shardId, + rankShardsAndUpdateStats(shuffler.shuffle(activeShards, seed), collector, nodeSearchCounts)); + } + + ArrayList ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size()); + List rankedActiveShards = + rankShardsAndUpdateStats(shuffler.shuffle(activeShards, seed), collector, nodeSearchCounts); + ordered.addAll(rankedActiveShards); + List rankedInitializingShards = + rankShardsAndUpdateStats(allInitializingShards, collector, nodeSearchCounts); + ordered.addAll(rankedInitializingShards); + return new PlainShardIterator(shardId, ordered); + } + + private static Set getAllNodeIds(final List shards) { + final Set nodeIds = new HashSet<>(); + for (ShardRouting shard : shards) { + nodeIds.add(shard.currentNodeId()); + } + return nodeIds; + } + + private static Map> + getNodeStats(final Set nodeIds, final ResponseCollectorService collector) { + + final Map> nodeStats = new HashMap<>(nodeIds.size()); + for (String nodeId : nodeIds) { + nodeStats.put(nodeId, collector.getNodeStatistics(nodeId)); + } + return nodeStats; + } + + private static Map rankNodes(final Map> nodeStats, + final Map nodeSearchCounts) { + final Map nodeRanks = new HashMap<>(nodeStats.size()); + for (Map.Entry> entry : nodeStats.entrySet()) { + Optional maybeStats = entry.getValue(); + maybeStats.ifPresent(stats -> { + final String nodeId = entry.getKey(); + nodeRanks.put(nodeId, stats.rank(nodeSearchCounts.getOrDefault(nodeId, 1L))); + }); + } + return nodeRanks; + } + + /** + * Adjust the for all other nodes' collected stats. In the original ranking paper there is no need to adjust other nodes' stats because + * Cassandra sends occasional requests to all copies of the data, so their stats will be updated during that broadcast phase. In + * Elasticsearch, however, we do not have that sort of broadcast-to-all behavior. In order to prevent a node that gets a high score and + * then never gets any more requests, we must ensure it eventually returns to a more normal score and can be a candidate for serving + * requests. + * + * This adjustment takes the "winning" node's statistics and adds the average of those statistics with each non-winning node. Let's say + * the winning node had a queue size of 10 and a non-winning node had a queue of 18. The average queue size is (10 + 18) / 2 = 14 so the + * non-winning node will have statistics added for a queue size of 14. This is repeated for the response time and service times as well. + */ + private static void adjustStats(final ResponseCollectorService collector, + final Map> nodeStats, + final String minNodeId, + final ResponseCollectorService.ComputedNodeStats minStats) { + if (minNodeId != null) { + for (Map.Entry> entry : nodeStats.entrySet()) { + final String nodeId = entry.getKey(); + final Optional maybeStats = entry.getValue(); + if (nodeId.equals(minNodeId) == false && maybeStats.isPresent()) { + final ResponseCollectorService.ComputedNodeStats stats = maybeStats.get(); + final int updatedQueue = (minStats.queueSize + stats.queueSize) / 2; + final long updatedResponse = (long) (minStats.responseTime + stats.responseTime) / 2; + final long updatedService = (long) (minStats.serviceTime + stats.serviceTime) / 2; + collector.addNodeStatistics(nodeId, updatedQueue, updatedResponse, updatedService); + } + } + } + } + + private static List rankShardsAndUpdateStats(List shards, final ResponseCollectorService collector, + final Map nodeSearchCounts) { + if (collector == null || nodeSearchCounts == null || shards.size() <= 1) { + return shards; + } + + // Retrieve which nodes we can potentially send the query to + final Set nodeIds = getAllNodeIds(shards); + final int nodeCount = nodeIds.size(); + + final Map> nodeStats = getNodeStats(nodeIds, collector); + + // Retrieve all the nodes the shards exist on + final Map nodeRanks = rankNodes(nodeStats, nodeSearchCounts); + + // sort all shards based on the shard rank + ArrayList sortedShards = new ArrayList<>(shards); + Collections.sort(sortedShards, new NodeRankComparator(nodeRanks)); + + // adjust the non-winner nodes' stats so they will get a chance to receive queries + if (sortedShards.size() > 1) { + ShardRouting minShard = sortedShards.get(0); + // If the winning shard is not started we are ranking initializing + // shards, don't bother to do adjustments + if (minShard.started()) { + String minNodeId = minShard.currentNodeId(); + Optional maybeMinStats = nodeStats.get(minNodeId); + if (maybeMinStats.isPresent()) { + adjustStats(collector, nodeStats, minNodeId, maybeMinStats.get()); + // Increase the number of searches for the "winning" node by one. + // Note that this doesn't actually affect the "real" counts, instead + // it only affects the captured node search counts, which is + // captured once for each query in TransportSearchAction + nodeSearchCounts.compute(minNodeId, (id, conns) -> conns == null ? 1 : conns + 1); + } + } + } + + return sortedShards; + } + + private static class NodeRankComparator implements Comparator { + private final Map nodeRanks; + + NodeRankComparator(Map nodeRanks) { + this.nodeRanks = nodeRanks; + } + + @Override + public int compare(ShardRouting s1, ShardRouting s2) { + if (s1.currentNodeId().equals(s2.currentNodeId())) { + // these shards on the the same node + return 0; + } + Double shard1rank = nodeRanks.get(s1.currentNodeId()); + Double shard2rank = nodeRanks.get(s2.currentNodeId()); + if (shard1rank != null) { + if (shard2rank != null) { + return shard1rank.compareTo(shard2rank); + } else { + // place non-nulls after null values + return 1; + } + } else { + if (shard2rank != null) { + // place nulls before non-null values + return -1; + } else { + // Both nodes do not have stats, they are equal + return 0; + } + } + } + } + /** * Returns true if no primaries are active or initializing for this shard */ diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java b/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java index 8a88ee1751a..4b3f254c9f5 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java @@ -28,10 +28,12 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; +import org.elasticsearch.node.ResponseCollectorService; import java.util.ArrayList; import java.util.Arrays; @@ -43,13 +45,24 @@ import java.util.stream.Collectors; public class OperationRouting extends AbstractComponent { + public static final Setting USE_ADAPTIVE_REPLICA_SELECTION_SETTING = + Setting.boolSetting("cluster.routing.use_adaptive_replica_selection", false, + Setting.Property.Dynamic, Setting.Property.NodeScope); + private String[] awarenessAttributes; + private boolean useAdaptiveReplicaSelection; public OperationRouting(Settings settings, ClusterSettings clusterSettings) { super(settings); this.awarenessAttributes = AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings); + this.useAdaptiveReplicaSelection = USE_ADAPTIVE_REPLICA_SELECTION_SETTING.get(settings); clusterSettings.addSettingsUpdateConsumer(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, this::setAwarenessAttributes); + clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection); + } + + void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) { + this.useAdaptiveReplicaSelection = useAdaptiveReplicaSelection; } private void setAwarenessAttributes(String[] awarenessAttributes) { @@ -61,19 +74,33 @@ public class OperationRouting extends AbstractComponent { } public ShardIterator getShards(ClusterState clusterState, String index, String id, @Nullable String routing, @Nullable String preference) { - return preferenceActiveShardIterator(shards(clusterState, index, id, routing), clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference); + return preferenceActiveShardIterator(shards(clusterState, index, id, routing), clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference, null, null); } public ShardIterator getShards(ClusterState clusterState, String index, int shardId, @Nullable String preference) { final IndexShardRoutingTable indexShard = clusterState.getRoutingTable().shardRoutingTable(index, shardId); - return preferenceActiveShardIterator(indexShard, clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference); + return preferenceActiveShardIterator(indexShard, clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference, null, null); } - public GroupShardsIterator searchShards(ClusterState clusterState, String[] concreteIndices, @Nullable Map> routing, @Nullable String preference) { + public GroupShardsIterator searchShards(ClusterState clusterState, + String[] concreteIndices, + @Nullable Map> routing, + @Nullable String preference) { + return searchShards(clusterState, concreteIndices, routing, preference, null, null); + } + + + public GroupShardsIterator searchShards(ClusterState clusterState, + String[] concreteIndices, + @Nullable Map> routing, + @Nullable String preference, + @Nullable ResponseCollectorService collectorService, + @Nullable Map nodeCounts) { final Set shards = computeTargetedShards(clusterState, concreteIndices, routing); final Set set = new HashSet<>(shards.size()); for (IndexShardRoutingTable shard : shards) { - ShardIterator iterator = preferenceActiveShardIterator(shard, clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference); + ShardIterator iterator = preferenceActiveShardIterator(shard, + clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference, collectorService, nodeCounts); if (iterator != null) { set.add(iterator); } @@ -107,10 +134,17 @@ public class OperationRouting extends AbstractComponent { return set; } - private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable indexShard, String localNodeId, DiscoveryNodes nodes, @Nullable String preference) { + private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable indexShard, String localNodeId, + DiscoveryNodes nodes, @Nullable String preference, + @Nullable ResponseCollectorService collectorService, + @Nullable Map nodeCounts) { if (preference == null || preference.isEmpty()) { if (awarenessAttributes.length == 0) { - return indexShard.activeInitializingShardsRandomIt(); + if (useAdaptiveReplicaSelection) { + return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts); + } else { + return indexShard.activeInitializingShardsRandomIt(); + } } else { return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes); } @@ -141,7 +175,11 @@ public class OperationRouting extends AbstractComponent { // no more preference if (index == -1 || index == preference.length() - 1) { if (awarenessAttributes.length == 0) { - return indexShard.activeInitializingShardsRandomIt(); + if (useAdaptiveReplicaSelection) { + return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts); + } else { + return indexShard.activeInitializingShardsRandomIt(); + } } else { return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes); } diff --git a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index b56b56c788e..cd49c479305 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.metadata.IndexGraveyard; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; @@ -410,6 +411,7 @@ public final class ClusterSettings extends AbstractScopedSettings { ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING, FastVectorHighlighter.SETTING_TV_HIGHLIGHT_MULTI_VALUE, Node.BREAKER_TYPE_KEY, + OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, IndexGraveyard.SETTING_MAX_TOMBSTONES ))); } diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index b37a6e14f02..45d9a208284 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -92,10 +92,6 @@ public class EsExecutors { public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int initialQueueCapacity, int minQueueSize, int maxQueueSize, int frameSize, TimeValue targetedResponseTime, ThreadFactory threadFactory, ThreadContext contextHolder) { - if (initialQueueCapacity == minQueueSize && initialQueueCapacity == maxQueueSize) { - return newFixed(name, size, initialQueueCapacity, threadFactory, contextHolder); - } - if (initialQueueCapacity <= 0) { throw new IllegalArgumentException("initial queue capacity for [" + name + "] executor must be positive, got: " + initialQueueCapacity); diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java index 1f694d73fa7..2d1be51824e 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java @@ -79,9 +79,7 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto this.minQueueSize = minQueueSize; this.maxQueueSize = maxQueueSize; this.targetedResponseTimeNanos = targetedResponseTime.getNanos(); - // We choose to start the EWMA with the targeted response time, reasoning that it is a - // better start point for a realistic task execution time than starting at 0 - this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, targetedResponseTimeNanos); + this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0); logger.debug("thread pool [{}] will adjust queue by [{}] when determining automatic queue size", name, QUEUE_ADJUSTMENT_AMOUNT); } diff --git a/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java b/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java index 1afbd3b2997..6fea0e2e1c0 100644 --- a/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java +++ b/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java @@ -31,7 +31,9 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import java.util.Collections; import java.util.HashMap; +import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; @@ -65,13 +67,11 @@ public final class ResponseCollectorService extends AbstractComponent implements } public void addNodeStatistics(String nodeId, int queueSize, long responseTimeNanos, long avgServiceTimeNanos) { - NodeStatistics nodeStats = nodeIdToStats.get(nodeId); nodeIdToStats.compute(nodeId, (id, ns) -> { if (ns == null) { ExponentiallyWeightedMovingAverage queueEWMA = new ExponentiallyWeightedMovingAverage(ALPHA, queueSize); ExponentiallyWeightedMovingAverage responseEWMA = new ExponentiallyWeightedMovingAverage(ALPHA, responseTimeNanos); - NodeStatistics newStats = new NodeStatistics(nodeId, queueEWMA, responseEWMA, avgServiceTimeNanos); - return newStats; + return new NodeStatistics(nodeId, queueEWMA, responseEWMA, avgServiceTimeNanos); } else { ns.queueSize.addValue((double) queueSize); ns.responseTime.addValue((double) responseTimeNanos); @@ -82,39 +82,96 @@ public final class ResponseCollectorService extends AbstractComponent implements } public Map getAllNodeStatistics() { + final int clientNum = nodeIdToStats.size(); // Transform the mutable object internally used for accounting into the computed version Map nodeStats = new HashMap<>(nodeIdToStats.size()); nodeIdToStats.forEach((k, v) -> { - nodeStats.put(k, new ComputedNodeStats(v)); + nodeStats.put(k, new ComputedNodeStats(clientNum, v)); }); return nodeStats; } + /** + * Optionally return a {@code NodeStatistics} for the given nodeid, if + * response information exists for the given node. Returns an empty + * {@code Optional} if the node was not found. + */ + public Optional getNodeStatistics(final String nodeId) { + final int clientNum = nodeIdToStats.size(); + return Optional.ofNullable(nodeIdToStats.get(nodeId)).map(ns -> new ComputedNodeStats(clientNum, ns)); + } + /** * Struct-like class encapsulating a point-in-time snapshot of a particular * node's statistics. This includes the EWMA of queue size, response time, * and service time. */ public static class ComputedNodeStats { + // We store timestamps with nanosecond precision, however, the + // formula specifies milliseconds, therefore we need to convert + // the values so the times don't unduely weight the formula + private final double FACTOR = 1000000.0; + private final int clientNum; + + private double cachedRank = 0; + public final String nodeId; - public final double queueSize; + public final int queueSize; public final double responseTime; public final double serviceTime; - ComputedNodeStats(NodeStatistics nodeStats) { + ComputedNodeStats(int clientNum, NodeStatistics nodeStats) { + this.clientNum = clientNum; this.nodeId = nodeStats.nodeId; - this.queueSize = nodeStats.queueSize.getAverage(); + this.queueSize = (int) nodeStats.queueSize.getAverage(); this.responseTime = nodeStats.responseTime.getAverage(); this.serviceTime = nodeStats.serviceTime; } + /** + * Rank this copy of the data, according to the adaptive replica selection formula from the C3 paper + * https://www.usenix.org/system/files/conference/nsdi15/nsdi15-paper-suresh.pdf + */ + private double innerRank(long outstandingRequests) { + // this is a placeholder value, the concurrency compensation is + // defined as the number of outstanding requests from the client + // to the node times the number of clients in the system + double concurrencyCompensation = outstandingRequests * clientNum; + + // Cubic queue adjustment factor. The paper chose 3 though we could + // potentially make this configurable if desired. + int queueAdjustmentFactor = 3; + + // EWMA of queue size + double qBar = queueSize; + double qHatS = 1 + concurrencyCompensation + qBar; + + // EWMA of response time + double rS = responseTime / FACTOR; + // EWMA of service time + double muBarS = serviceTime / FACTOR; + + // The final formula + double rank = rS - (1.0 / muBarS) + (Math.pow(qHatS, queueAdjustmentFactor) / muBarS); + return rank; + } + + public double rank(long outstandingRequests) { + if (cachedRank == 0) { + cachedRank = innerRank(outstandingRequests); + } + return cachedRank; + } + @Override public String toString() { StringBuilder sb = new StringBuilder("ComputedNodeStats["); sb.append(nodeId).append("]("); - sb.append("queue: ").append(queueSize); - sb.append(", response time: ").append(responseTime); - sb.append(", service time: ").append(serviceTime); + sb.append("nodes: ").append(clientNum); + sb.append(", queue: ").append(queueSize); + sb.append(", response time: ").append(String.format(Locale.ROOT, "%.1f", responseTime)); + sb.append(", service time: ").append(String.format(Locale.ROOT, "%.1f", serviceTime)); + sb.append(", rank: ").append(String.format(Locale.ROOT, "%.1f", rank(1))); sb.append(")"); return sb.toString(); } diff --git a/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java index 9fdaae098b8..500612974c8 100644 --- a/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -283,7 +283,7 @@ public class QueryPhase implements SearchPhase { ctx.postProcess(result, shouldCollect); } EsThreadPoolExecutor executor = (EsThreadPoolExecutor) - searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);; + searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH); if (executor instanceof QueueResizingEsThreadPoolExecutor) { QueueResizingEsThreadPoolExecutor rExecutor = (QueueResizingEsThreadPoolExecutor) executor; queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize()); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java index be7ebd4a4c2..498edee12f9 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java @@ -28,8 +28,10 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.node.ResponseCollectorService; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; @@ -490,4 +492,90 @@ public class OperationRoutingTests extends ESTestCase{ } } + public void testAdaptiveReplicaSelection() throws Exception { + final int numIndices = 1; + final int numShards = 1; + final int numReplicas = 2; + final String[] indexNames = new String[numIndices]; + for (int i = 0; i < numIndices; i++) { + indexNames[i] = "test" + i; + } + ClusterState state = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(indexNames, numShards, numReplicas); + final int numRepeatedSearches = 4; + OperationRouting opRouting = new OperationRouting(Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + opRouting.setUseAdaptiveReplicaSelection(true); + List searchedShards = new ArrayList<>(numShards); + Set selectedNodes = new HashSet<>(numShards); + TestThreadPool threadPool = new TestThreadPool("testThatOnlyNodesSupportNodeIds"); + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + ResponseCollectorService collector = new ResponseCollectorService(Settings.EMPTY, clusterService); + Map outstandingRequests = new HashMap<>(); + GroupShardsIterator groupIterator = opRouting.searchShards(state, + indexNames, null, null, collector, outstandingRequests); + + assertThat("One group per index shard", groupIterator.size(), equalTo(numIndices * numShards)); + + // Test that the shards use a round-robin pattern when there are no stats + assertThat(groupIterator.get(0).size(), equalTo(numReplicas + 1)); + ShardRouting firstChoice = groupIterator.get(0).nextOrNull(); + assertNotNull(firstChoice); + searchedShards.add(firstChoice); + selectedNodes.add(firstChoice.currentNodeId()); + + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + + assertThat(groupIterator.size(), equalTo(numIndices * numShards)); + ShardRouting secondChoice = groupIterator.get(0).nextOrNull(); + assertNotNull(secondChoice); + searchedShards.add(secondChoice); + selectedNodes.add(secondChoice.currentNodeId()); + + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + + assertThat(groupIterator.size(), equalTo(numIndices * numShards)); + ShardRouting thirdChoice = groupIterator.get(0).nextOrNull(); + assertNotNull(thirdChoice); + searchedShards.add(thirdChoice); + selectedNodes.add(thirdChoice.currentNodeId()); + + // All three shards should have been separate, because there are no stats yet so they're all ranked equally. + assertThat(searchedShards.size(), equalTo(3)); + + // Now let's start adding node metrics, since that will affect which node is chosen + collector.addNodeStatistics("node_0", 2, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(150).nanos()); + collector.addNodeStatistics("node_1", 1, TimeValue.timeValueMillis(100).nanos(), TimeValue.timeValueMillis(50).nanos()); + collector.addNodeStatistics("node_2", 1, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(200).nanos()); + outstandingRequests.put("node_0", 1L); + outstandingRequests.put("node_1", 1L); + outstandingRequests.put("node_2", 1L); + + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + ShardRouting shardChoice = groupIterator.get(0).nextOrNull(); + // node 1 should be the lowest ranked node to start + assertThat(shardChoice.currentNodeId(), equalTo("node_1")); + + // node 1 starts getting more loaded... + collector.addNodeStatistics("node_1", 2, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(150).nanos()); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + shardChoice = groupIterator.get(0).nextOrNull(); + assertThat(shardChoice.currentNodeId(), equalTo("node_1")); + + // and more loaded... + collector.addNodeStatistics("node_1", 3, TimeValue.timeValueMillis(250).nanos(), TimeValue.timeValueMillis(200).nanos()); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + shardChoice = groupIterator.get(0).nextOrNull(); + assertThat(shardChoice.currentNodeId(), equalTo("node_1")); + + // and even more + collector.addNodeStatistics("node_1", 4, TimeValue.timeValueMillis(300).nanos(), TimeValue.timeValueMillis(250).nanos()); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + shardChoice = groupIterator.get(0).nextOrNull(); + // finally, node 2 is choosen instead + assertThat(shardChoice.currentNodeId(), equalTo("node_2")); + + IOUtils.close(clusterService); + terminate(threadPool); + } + } diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java index 5365e1bb909..125cb572ea5 100644 --- a/core/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java @@ -198,26 +198,26 @@ public class QueueResizingEsThreadPoolExecutorTests extends ESTestCase { executor.prestartAllCoreThreads(); logger.info("--> executor: {}", executor); - assertThat((long)executor.getTaskExecutionEWMA(), equalTo(1000000L)); + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(0L)); executeTask(executor, 1); assertBusy(() -> { - assertThat((long)executor.getTaskExecutionEWMA(), equalTo(700030L)); + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(30L)); }); executeTask(executor, 1); assertBusy(() -> { - assertThat((long)executor.getTaskExecutionEWMA(), equalTo(490050L)); + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(51L)); }); executeTask(executor, 1); assertBusy(() -> { - assertThat((long)executor.getTaskExecutionEWMA(), equalTo(343065L)); + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(65L)); }); executeTask(executor, 1); assertBusy(() -> { - assertThat((long)executor.getTaskExecutionEWMA(), equalTo(240175L)); + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(75L)); }); executeTask(executor, 1); assertBusy(() -> { - assertThat((long)executor.getTaskExecutionEWMA(), equalTo(168153L)); + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(83L)); }); executor.shutdown(); diff --git a/core/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java b/core/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java index d620007d2cd..d86d7b46cc7 100644 --- a/core/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java +++ b/core/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java @@ -67,7 +67,7 @@ public class ResponseCollectorServiceTests extends ESTestCase { collector.addNodeStatistics("node1", 1, 100, 10); Map nodeStats = collector.getAllNodeStatistics(); assertTrue(nodeStats.containsKey("node1")); - assertThat(nodeStats.get("node1").queueSize, equalTo(1.0)); + assertThat(nodeStats.get("node1").queueSize, equalTo(1)); assertThat(nodeStats.get("node1").responseTime, equalTo(100.0)); assertThat(nodeStats.get("node1").serviceTime, equalTo(10.0)); } @@ -113,7 +113,7 @@ public class ResponseCollectorServiceTests extends ESTestCase { logger.info("--> got stats: {}", nodeStats); for (String nodeId : nodes) { if (nodeStats.containsKey(nodeId)) { - assertThat(nodeStats.get(nodeId).queueSize, greaterThan(0.0)); + assertThat(nodeStats.get(nodeId).queueSize, greaterThan(0)); assertThat(nodeStats.get(nodeId).responseTime, greaterThan(0.0)); assertThat(nodeStats.get(nodeId).serviceTime, greaterThan(0.0)); } diff --git a/core/src/test/java/org/elasticsearch/search/preference/SearchPreferenceIT.java b/core/src/test/java/org/elasticsearch/search/preference/SearchPreferenceIT.java index 9163ee572cf..6478446a1a2 100644 --- a/core/src/test/java/org/elasticsearch/search/preference/SearchPreferenceIT.java +++ b/core/src/test/java/org/elasticsearch/search/preference/SearchPreferenceIT.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; @@ -49,6 +50,13 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; @ESIntegTestCase.ClusterScope(minNumDataNodes = 2) public class SearchPreferenceIT extends ESIntegTestCase { + + @Override + public Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)) + .put(OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING.getKey(), false).build(); + } + // see #2896 public void testStopOneNodePreferenceWithRedState() throws InterruptedException, IOException { assertAcked(prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", cluster().numDataNodes()+2).put("index.number_of_replicas", 0))); diff --git a/core/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java b/core/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java index feca42e5495..4128c4a6aa6 100644 --- a/core/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java @@ -226,7 +226,7 @@ public class QueryPhaseTests extends IndexShardTestCase { QueryPhase.execute(context, contextSearcher, checkCancelled -> {}, null); QuerySearchResult results = context.queryResult(); - assertThat(results.serviceTimeEWMA(), greaterThan(0L)); + assertThat(results.serviceTimeEWMA(), greaterThanOrEqualTo(0L)); assertThat(results.nodeQueueSize(), greaterThanOrEqualTo(0)); reader.close(); dir.close(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 29bfbff29b2..8ef7500d04d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -338,6 +338,7 @@ public final class InternalTestCluster extends TestCluster { builder.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "1b"); // Some tests make use of scripting quite a bit, so increase the limit for integration tests builder.put(ScriptService.SCRIPT_MAX_COMPILATIONS_PER_MINUTE.getKey(), 1000); + builder.put(OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING.getKey(), random.nextBoolean()); if (TEST_NIGHTLY) { builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), RandomNumbers.randomIntBetween(random, 5, 10)); builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), RandomNumbers.randomIntBetween(random, 5, 10));