Implement adaptive replica selection (#26128)
* Implement adaptive replica selection

This implements the selection algorithm described in the C3 paper for
determining which copy of the data a query should be routed to. By using the
service time EWMA, response time EWMA, and queue size EWMA, we calculate the
score of a node by piggybacking these metrics on each search request.

Since Elasticsearch lacks the "broadcast to every copy" behavior that Cassandra
has (as mentioned in the C3 paper) to update metrics after a node has been
highly weighted, this implementation adjusts a node's response stats using the
average of its own and the "best" node's metrics. This is so that a long GC or
other activity that may cause a node's rank to increase dramatically does not
permanently keep requests from being routed to that node; instead it will
eventually lower its score back to the range where it is a potential candidate
for new queries.

This feature is off by default and can be turned on with the dynamic setting
`cluster.routing.use_adaptive_replica_selection`.

Relates to #24915, however instead of `b=3` I used `b=4` (after benchmarking)

* Randomly use adaptive replica selection for internal test cluster
* Use an action name *prefix* for retrieving pending requests
* Add unit test for replica selection
* don't use adaptive replica selection in SearchPreferenceIT
* Track client connections in a SearchTransportService instead of TransportService
* Bind `entry` pieces in local variables
* Add javadoc link to C3 paper and javadocs for stat adjustments
* Bind entry's key and value to local variables
* Remove unneeded actionNamePrefix parameter
* Use conns.longValue() instead of cached Long
* Add comments about removing entries from the map
* Pull out bindings for `entry` in IndexShardRoutingTable
* Use .compareTo instead of manually comparing
* add assert for connections not being null and gte to 1
* Copy map for pending search connections instead of "live" map
* Increase the number of pending search requests used for calculating rank when chosen

  When a node gets chosen, this increases the number of search counts for the
  winning node so that it will not be as likely to be chosen again for
  non-concurrent search requests.

* Remove unused HashMap import
* Rename rank -> rankShardsAndUpdateStats
* Rename rankedActiveInitializingShardsIt -> activeInitializingShardsRankedIt
* Instead of precalculating winning node, use "winning" shard from ranked list
* Sort null ranked nodes before nodes that have a rank
This commit is contained in:
parent
432f162981
commit
c3da66d021
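For reference, the following standalone sketch illustrates the C3 ranking formula the commit message describes and that ResponseCollectorService.ComputedNodeStats implements further down in this diff. The class name, parameter names, and sample values here are illustrative only and are not part of the commit.

// Illustrative sketch of the C3 replica-ranking formula; lower rank = more attractive copy.
public class C3RankSketch {

    // queueEwma:      EWMA of the node's search queue size
    // responseEwmaMs: EWMA of observed response time, in milliseconds
    // serviceEwmaMs:  EWMA of the node's reported service time, in milliseconds
    // outstanding:    this coordinating node's outstanding requests to the node
    // clientCount:    number of clients (nodes with stats) in the system
    static double rank(double queueEwma, double responseEwmaMs, double serviceEwmaMs,
                       long outstanding, int clientCount) {
        // Queue estimate compensated for concurrency: q-hat = 1 + outstanding * clients + q-bar
        double qHat = 1 + (outstanding * clientCount) + queueEwma;
        // Cubic queue-adjustment exponent; the paper (and the code below) uses 3,
        // while the commit message mentions benchmarking with 4.
        int queueAdjustmentFactor = 3;
        return responseEwmaMs - (1.0 / serviceEwmaMs)
                + (Math.pow(qHat, queueAdjustmentFactor) / serviceEwmaMs);
    }

    public static void main(String[] args) {
        // A lightly loaded replica scores lower (better) than one with a deeper queue.
        System.out.println(rank(1, 100, 50, 1, 3));   // ~102.5
        System.out.println(rank(10, 200, 150, 1, 3)); // ~218.3
    }
}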
@@ -61,7 +61,7 @@ public final class SearchExecutionStatsCollector implements ActionListener<Searc
final int queueSize = queryResult.nodeQueueSize();
final long responseDuration = System.nanoTime() - startNanos;
// EWMA/queue size may be -1 if the query node doesn't support capturing it
if (serviceTimeEWMA > 0 && queueSize > 0) {
if (serviceTimeEWMA > 0 && queueSize >= 0) {
collector.addNodeStatistics(nodeId, queueSize, responseDuration, serviceTimeEWMA);
}
}

@@ -30,6 +30,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.dfs.DfsSearchResult;

@@ -50,6 +51,7 @@ import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TaskAwareTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;

@@ -57,6 +59,9 @@ import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

@@ -80,6 +85,7 @@ public class SearchTransportService extends AbstractComponent {

private final TransportService transportService;
private final BiFunction<Transport.Connection, SearchActionListener, ActionListener> responseWrapper;
private final Map<String, Long> clientConnections = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();

public SearchTransportService(Settings settings, TransportService transportService,
BiFunction<Transport.Connection, SearchActionListener, ActionListener> responseWrapper) {

@@ -131,7 +137,7 @@ public class SearchTransportService extends AbstractComponent {
public void sendExecuteDfs(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task,
final SearchActionListener<DfsSearchResult> listener) {
transportService.sendChildRequest(connection, DFS_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(listener, DfsSearchResult::new));
new ConnectionCountingHandler<>(listener, DfsSearchResult::new, clientConnections, connection.getNode().getId()));
}

public void sendExecuteQuery(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task,

@@ -143,25 +149,26 @@ public class SearchTransportService extends AbstractComponent {
final ActionListener handler = responseWrapper.apply(connection, listener);
transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(handler, supplier));
new ConnectionCountingHandler<>(handler, supplier, clientConnections, connection.getNode().getId()));
}

public void sendExecuteQuery(Transport.Connection connection, final QuerySearchRequest request, SearchTask task,
final SearchActionListener<QuerySearchResult> listener) {
transportService.sendChildRequest(connection, QUERY_ID_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(listener, QuerySearchResult::new));
new ConnectionCountingHandler<>(listener, QuerySearchResult::new, clientConnections, connection.getNode().getId()));
}

public void sendExecuteScrollQuery(Transport.Connection connection, final InternalScrollSearchRequest request, SearchTask task,
final SearchActionListener<ScrollQuerySearchResult> listener) {
transportService.sendChildRequest(connection, QUERY_SCROLL_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(listener, ScrollQuerySearchResult::new));
new ConnectionCountingHandler<>(listener, ScrollQuerySearchResult::new, clientConnections, connection.getNode().getId()));
}

public void sendExecuteScrollFetch(Transport.Connection connection, final InternalScrollSearchRequest request, SearchTask task,
final SearchActionListener<ScrollQueryFetchSearchResult> listener) {
transportService.sendChildRequest(connection, QUERY_FETCH_SCROLL_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(listener, ScrollQueryFetchSearchResult::new));
new ConnectionCountingHandler<>(listener, ScrollQueryFetchSearchResult::new,
clientConnections, connection.getNode().getId()));
}

public void sendExecuteFetch(Transport.Connection connection, final ShardFetchSearchRequest request, SearchTask task,

@@ -177,22 +184,31 @@ public class SearchTransportService extends AbstractComponent {
private void sendExecuteFetch(Transport.Connection connection, String action, final ShardFetchRequest request, SearchTask task,
final SearchActionListener<FetchSearchResult> listener) {
transportService.sendChildRequest(connection, action, request, task,
new ActionListenerResponseHandler<>(listener, FetchSearchResult::new));
new ConnectionCountingHandler<>(listener, FetchSearchResult::new, clientConnections, connection.getNode().getId()));
}

/**
* Used by {@link TransportSearchAction} to send the expand queries (field collapsing).
*/
void sendExecuteMultiSearch(final MultiSearchRequest request, SearchTask task,
final ActionListener<MultiSearchResponse> listener) {
transportService.sendChildRequest(transportService.getConnection(transportService.getLocalNode()), MultiSearchAction.NAME, request,
task, new ActionListenerResponseHandler<>(listener, MultiSearchResponse::new));
final ActionListener<MultiSearchResponse> listener) {
final Transport.Connection connection = transportService.getConnection(transportService.getLocalNode());
transportService.sendChildRequest(connection, MultiSearchAction.NAME, request, task,
new ConnectionCountingHandler<>(listener, MultiSearchResponse::new, clientConnections, connection.getNode().getId()));
}

public RemoteClusterService getRemoteClusterService() {
return transportService.getRemoteClusterService();
}

/**
* Return a map of nodeId to pending number of search requests.
* This is a snapshot of the current pending search and not a live map.
*/
public Map<String, Long> getPendingSearchRequests() {
return new HashMap<>(clientConnections);
}

static class ScrollFreeContextRequest extends TransportRequest {
private long id;

@@ -486,4 +502,47 @@ public class SearchTransportService extends AbstractComponent {
return transportService.getRemoteClusterService().getConnection(node, clusterAlias);
}
}

final class ConnectionCountingHandler<Response extends TransportResponse> extends ActionListenerResponseHandler<Response> {
private final Map<String, Long> clientConnections;
private final String nodeId;

ConnectionCountingHandler(final ActionListener<? super Response> listener, final Supplier<Response> responseSupplier,
final Map<String, Long> clientConnections, final String nodeId) {
super(listener, responseSupplier);
this.clientConnections = clientConnections;
this.nodeId = nodeId;
// Increment the number of connections for this node by one
clientConnections.compute(nodeId, (id, conns) -> conns == null ? 1 : conns + 1);
}

@Override
public void handleResponse(Response response) {
super.handleResponse(response);
// Decrement the number of connections or remove it entirely if there are no more connections
// We need to remove the entry here so we don't leak when nodes go away forever
assert assertNodePresent();
clientConnections.computeIfPresent(nodeId, (id, conns) -> conns.longValue() == 1 ? null : conns - 1);
}

@Override
public void handleException(TransportException e) {
super.handleException(e);
// Decrement the number of connections or remove it entirely if there are no more connections
// We need to remove the entry here so we don't leak when nodes go away forever
assert assertNodePresent();
clientConnections.computeIfPresent(nodeId, (id, conns) -> conns.longValue() == 1 ? null : conns - 1);
}

private boolean assertNodePresent() {
clientConnections.compute(nodeId, (id, conns) -> {
assert conns != null : "number of connections for " + id + " is null, but should be an integer";
assert conns >= 1 : "number of connections for " + id + " should be >= 1 but was " + conns;
return conns;
});
// Always return true, there is additional asserting here, the boolean is just so this
// can be skipped when assertions are not enabled
return true;
}
}
}
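
The ConnectionCountingHandler above is what keeps the per-node pending-search counts that the ranking later consumes. A stripped-down sketch of that accounting pattern, using a plain ConcurrentHashMap and hypothetical method names rather than the transport plumbing, looks like this:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the pending-request accounting used above (names are illustrative).
public class PendingSearchCounter {
    private final Map<String, Long> clientConnections = new ConcurrentHashMap<>();

    void onRequestSent(String nodeId) {
        // Increment the count for this node, creating the entry on first use.
        clientConnections.compute(nodeId, (id, conns) -> conns == null ? 1 : conns + 1);
    }

    void onRequestCompleted(String nodeId) {
        // Decrement, removing the entry at zero so departed nodes don't leak.
        clientConnections.computeIfPresent(nodeId, (id, conns) -> conns.longValue() == 1 ? null : conns - 1);
    }

    Map<String, Long> snapshot() {
        // Mirrors getPendingSearchRequests(): callers get a copy, not the live map.
        return new HashMap<>(clientConnections);
    }
}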
@@ -284,8 +284,9 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
for (int i = 0; i < indices.length; i++) {
concreteIndices[i] = indices[i].getName();
}
Map<String, Long> nodeSearchCounts = searchTransportService.getPendingSearchRequests();
GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
concreteIndices, routingMap, searchRequest.preference());
concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts);
GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardsIterator, localIndices,
remoteShardIterators);

@@ -29,18 +29,24 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.ResponseCollectorService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.Collections.emptyMap;

@@ -261,6 +267,165 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return new PlainShardIterator(shardId, ordered);
}

/**
* Returns an iterator over active and initializing shards, ordered by the adaptive replica
* selection formula, making sure though that it is random within the active shards of the same
* (or missing) rank, and initializing shards are the last to iterate through.
*/
public ShardIterator activeInitializingShardsRankedIt(@Nullable ResponseCollectorService collector,
@Nullable Map<String, Long> nodeSearchCounts) {
final int seed = shuffler.nextSeed();
if (allInitializingShards.isEmpty()) {
return new PlainShardIterator(shardId,
rankShardsAndUpdateStats(shuffler.shuffle(activeShards, seed), collector, nodeSearchCounts));
}

ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
List<ShardRouting> rankedActiveShards =
rankShardsAndUpdateStats(shuffler.shuffle(activeShards, seed), collector, nodeSearchCounts);
ordered.addAll(rankedActiveShards);
List<ShardRouting> rankedInitializingShards =
rankShardsAndUpdateStats(allInitializingShards, collector, nodeSearchCounts);
ordered.addAll(rankedInitializingShards);
return new PlainShardIterator(shardId, ordered);
}

private static Set<String> getAllNodeIds(final List<ShardRouting> shards) {
final Set<String> nodeIds = new HashSet<>();
for (ShardRouting shard : shards) {
nodeIds.add(shard.currentNodeId());
}
return nodeIds;
}

private static Map<String, Optional<ResponseCollectorService.ComputedNodeStats>>
getNodeStats(final Set<String> nodeIds, final ResponseCollectorService collector) {

final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats = new HashMap<>(nodeIds.size());
for (String nodeId : nodeIds) {
nodeStats.put(nodeId, collector.getNodeStatistics(nodeId));
}
return nodeStats;
}

private static Map<String, Double> rankNodes(final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats,
final Map<String, Long> nodeSearchCounts) {
final Map<String, Double> nodeRanks = new HashMap<>(nodeStats.size());
for (Map.Entry<String, Optional<ResponseCollectorService.ComputedNodeStats>> entry : nodeStats.entrySet()) {
Optional<ResponseCollectorService.ComputedNodeStats> maybeStats = entry.getValue();
maybeStats.ifPresent(stats -> {
final String nodeId = entry.getKey();
nodeRanks.put(nodeId, stats.rank(nodeSearchCounts.getOrDefault(nodeId, 1L)));
});
}
return nodeRanks;
}

/**
* Adjust the stats for all other nodes. In the original ranking paper there is no need to adjust other nodes' stats because
* Cassandra sends occasional requests to all copies of the data, so their stats will be updated during that broadcast phase. In
* Elasticsearch, however, we do not have that sort of broadcast-to-all behavior. In order to prevent a node that gets a high score
* from never receiving any more requests, we must ensure it eventually returns to a more normal score and can be a candidate for
* serving requests.
*
* This adjustment takes the "winning" node's statistics and adds the average of those statistics with each non-winning node. Let's say
* the winning node had a queue size of 10 and a non-winning node had a queue of 18. The average queue size is (10 + 18) / 2 = 14 so the
* non-winning node will have statistics added for a queue size of 14. This is repeated for the response time and service times as well.
*/
private static void adjustStats(final ResponseCollectorService collector,
final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats,
final String minNodeId,
final ResponseCollectorService.ComputedNodeStats minStats) {
if (minNodeId != null) {
for (Map.Entry<String, Optional<ResponseCollectorService.ComputedNodeStats>> entry : nodeStats.entrySet()) {
final String nodeId = entry.getKey();
final Optional<ResponseCollectorService.ComputedNodeStats> maybeStats = entry.getValue();
if (nodeId.equals(minNodeId) == false && maybeStats.isPresent()) {
final ResponseCollectorService.ComputedNodeStats stats = maybeStats.get();
final int updatedQueue = (minStats.queueSize + stats.queueSize) / 2;
final long updatedResponse = (long) (minStats.responseTime + stats.responseTime) / 2;
final long updatedService = (long) (minStats.serviceTime + stats.serviceTime) / 2;
collector.addNodeStatistics(nodeId, updatedQueue, updatedResponse, updatedService);
}
}
}
}

private static List<ShardRouting> rankShardsAndUpdateStats(List<ShardRouting> shards, final ResponseCollectorService collector,
final Map<String, Long> nodeSearchCounts) {
if (collector == null || nodeSearchCounts == null || shards.size() <= 1) {
return shards;
}

// Retrieve which nodes we can potentially send the query to
final Set<String> nodeIds = getAllNodeIds(shards);
final int nodeCount = nodeIds.size();

final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats = getNodeStats(nodeIds, collector);

// Retrieve all the nodes the shards exist on
final Map<String, Double> nodeRanks = rankNodes(nodeStats, nodeSearchCounts);

// sort all shards based on the shard rank
ArrayList<ShardRouting> sortedShards = new ArrayList<>(shards);
Collections.sort(sortedShards, new NodeRankComparator(nodeRanks));

// adjust the non-winner nodes' stats so they will get a chance to receive queries
if (sortedShards.size() > 1) {
ShardRouting minShard = sortedShards.get(0);
// If the winning shard is not started we are ranking initializing
// shards, don't bother to do adjustments
if (minShard.started()) {
String minNodeId = minShard.currentNodeId();
Optional<ResponseCollectorService.ComputedNodeStats> maybeMinStats = nodeStats.get(minNodeId);
if (maybeMinStats.isPresent()) {
adjustStats(collector, nodeStats, minNodeId, maybeMinStats.get());
// Increase the number of searches for the "winning" node by one.
// Note that this doesn't actually affect the "real" counts, instead
// it only affects the captured node search counts, which is
// captured once for each query in TransportSearchAction
nodeSearchCounts.compute(minNodeId, (id, conns) -> conns == null ? 1 : conns + 1);
}
}
}

return sortedShards;
}

private static class NodeRankComparator implements Comparator<ShardRouting> {
private final Map<String, Double> nodeRanks;

NodeRankComparator(Map<String, Double> nodeRanks) {
this.nodeRanks = nodeRanks;
}

@Override
public int compare(ShardRouting s1, ShardRouting s2) {
if (s1.currentNodeId().equals(s2.currentNodeId())) {
// these shards are on the same node
return 0;
}
Double shard1rank = nodeRanks.get(s1.currentNodeId());
Double shard2rank = nodeRanks.get(s2.currentNodeId());
if (shard1rank != null) {
if (shard2rank != null) {
return shard1rank.compareTo(shard2rank);
} else {
// place non-nulls after null values
return 1;
}
} else {
if (shard2rank != null) {
// place nulls before non-null values
return -1;
} else {
// Both nodes do not have stats, they are equal
return 0;
}
}
}
}

/**
* Returns true if no primaries are active or initializing for this shard
*/
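
The adjustStats javadoc above spells out the averaging in prose; the arithmetic is small enough to show in isolation. A sketch with hypothetical names, using the numbers from the javadoc example:

// Each non-winning node receives a synthetic sample that is the average of its own
// stats and the winning node's stats, so a temporarily slow node drifts back into
// contention instead of being starved of queries forever.
public class StatAdjustmentSketch {

    static int adjustedQueue(int winnerQueue, int otherQueue) {
        return (winnerQueue + otherQueue) / 2;
    }

    static double adjustedTime(double winnerTime, double otherTime) {
        return (winnerTime + otherTime) / 2;
    }

    public static void main(String[] args) {
        // The javadoc example: winner queue 10, other queue 18 -> adjusted queue 14.
        System.out.println(adjustedQueue(10, 18));      // 14
        // Response time and service time are averaged the same way.
        System.out.println(adjustedTime(120.0, 200.0)); // 160.0
    }
}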
@@ -28,10 +28,12 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.node.ResponseCollectorService;

import java.util.ArrayList;
import java.util.Arrays;

@@ -43,13 +45,24 @@ import java.util.stream.Collectors;

public class OperationRouting extends AbstractComponent {

public static final Setting<Boolean> USE_ADAPTIVE_REPLICA_SELECTION_SETTING =
Setting.boolSetting("cluster.routing.use_adaptive_replica_selection", false,
Setting.Property.Dynamic, Setting.Property.NodeScope);

private String[] awarenessAttributes;
private boolean useAdaptiveReplicaSelection;

public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
super(settings);
this.awarenessAttributes = AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings);
this.useAdaptiveReplicaSelection = USE_ADAPTIVE_REPLICA_SELECTION_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING,
this::setAwarenessAttributes);
clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection);
}

void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) {
this.useAdaptiveReplicaSelection = useAdaptiveReplicaSelection;
}

private void setAwarenessAttributes(String[] awarenessAttributes) {

@@ -61,19 +74,33 @@ public class OperationRouting extends AbstractComponent {
}

public ShardIterator getShards(ClusterState clusterState, String index, String id, @Nullable String routing, @Nullable String preference) {
return preferenceActiveShardIterator(shards(clusterState, index, id, routing), clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference);
return preferenceActiveShardIterator(shards(clusterState, index, id, routing), clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference, null, null);
}

public ShardIterator getShards(ClusterState clusterState, String index, int shardId, @Nullable String preference) {
final IndexShardRoutingTable indexShard = clusterState.getRoutingTable().shardRoutingTable(index, shardId);
return preferenceActiveShardIterator(indexShard, clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference);
return preferenceActiveShardIterator(indexShard, clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference, null, null);
}

public GroupShardsIterator<ShardIterator> searchShards(ClusterState clusterState, String[] concreteIndices, @Nullable Map<String, Set<String>> routing, @Nullable String preference) {
public GroupShardsIterator<ShardIterator> searchShards(ClusterState clusterState,
String[] concreteIndices,
@Nullable Map<String, Set<String>> routing,
@Nullable String preference) {
return searchShards(clusterState, concreteIndices, routing, preference, null, null);
}

public GroupShardsIterator<ShardIterator> searchShards(ClusterState clusterState,
String[] concreteIndices,
@Nullable Map<String, Set<String>> routing,
@Nullable String preference,
@Nullable ResponseCollectorService collectorService,
@Nullable Map<String, Long> nodeCounts) {
final Set<IndexShardRoutingTable> shards = computeTargetedShards(clusterState, concreteIndices, routing);
final Set<ShardIterator> set = new HashSet<>(shards.size());
for (IndexShardRoutingTable shard : shards) {
ShardIterator iterator = preferenceActiveShardIterator(shard, clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference);
ShardIterator iterator = preferenceActiveShardIterator(shard,
clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference, collectorService, nodeCounts);
if (iterator != null) {
set.add(iterator);
}

@@ -107,10 +134,17 @@ public class OperationRouting extends AbstractComponent {
return set;
}

private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable indexShard, String localNodeId, DiscoveryNodes nodes, @Nullable String preference) {
private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable indexShard, String localNodeId,
DiscoveryNodes nodes, @Nullable String preference,
@Nullable ResponseCollectorService collectorService,
@Nullable Map<String, Long> nodeCounts) {
if (preference == null || preference.isEmpty()) {
if (awarenessAttributes.length == 0) {
return indexShard.activeInitializingShardsRandomIt();
if (useAdaptiveReplicaSelection) {
return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts);
} else {
return indexShard.activeInitializingShardsRandomIt();
}
} else {
return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes);
}

@@ -141,7 +175,11 @@ public class OperationRouting extends AbstractComponent {
// no more preference
if (index == -1 || index == preference.length() - 1) {
if (awarenessAttributes.length == 0) {
return indexShard.activeInitializingShardsRandomIt();
if (useAdaptiveReplicaSelection) {
return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts);
} else {
return indexShard.activeInitializingShardsRandomIt();
}
} else {
return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes);
}

@@ -33,6 +33,7 @@ import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.metadata.IndexGraveyard;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;

@@ -410,6 +411,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING,
FastVectorHighlighter.SETTING_TV_HIGHLIGHT_MULTI_VALUE,
Node.BREAKER_TYPE_KEY,
OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING,
IndexGraveyard.SETTING_MAX_TOMBSTONES
)));
}
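
With USE_ADAPTIVE_REPLICA_SELECTION_SETTING registered as a dynamic cluster setting above, the feature (off by default) can be switched on in node settings the same way the tests later in this diff do, or flipped at runtime through the cluster settings API. A minimal node-settings sketch, assuming only the classes already shown in this diff:

import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.common.settings.Settings;

// Sketch: build node settings that opt in to adaptive replica selection.
public class EnableAdaptiveReplicaSelection {
    public static Settings withAdaptiveReplicaSelection(Settings base) {
        return Settings.builder()
                .put(base)
                .put(OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING.getKey(), true)
                .build();
    }
}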
@@ -92,10 +92,6 @@ public class EsExecutors {
public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int initialQueueCapacity, int minQueueSize,
int maxQueueSize, int frameSize, TimeValue targetedResponseTime,
ThreadFactory threadFactory, ThreadContext contextHolder) {
if (initialQueueCapacity == minQueueSize && initialQueueCapacity == maxQueueSize) {
return newFixed(name, size, initialQueueCapacity, threadFactory, contextHolder);
}

if (initialQueueCapacity <= 0) {
throw new IllegalArgumentException("initial queue capacity for [" + name + "] executor must be positive, got: " +
initialQueueCapacity);

@@ -79,9 +79,7 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto
this.minQueueSize = minQueueSize;
this.maxQueueSize = maxQueueSize;
this.targetedResponseTimeNanos = targetedResponseTime.getNanos();
// We choose to start the EWMA with the targeted response time, reasoning that it is a
// better start point for a realistic task execution time than starting at 0
this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, targetedResponseTimeNanos);
this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0);
logger.debug("thread pool [{}] will adjust queue by [{}] when determining automatic queue size",
name, QUEUE_ADJUSTMENT_AMOUNT);
}

@@ -31,7 +31,9 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;

import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

@@ -65,13 +67,11 @@ public final class ResponseCollectorService extends AbstractComponent implements
}

public void addNodeStatistics(String nodeId, int queueSize, long responseTimeNanos, long avgServiceTimeNanos) {
NodeStatistics nodeStats = nodeIdToStats.get(nodeId);
nodeIdToStats.compute(nodeId, (id, ns) -> {
if (ns == null) {
ExponentiallyWeightedMovingAverage queueEWMA = new ExponentiallyWeightedMovingAverage(ALPHA, queueSize);
ExponentiallyWeightedMovingAverage responseEWMA = new ExponentiallyWeightedMovingAverage(ALPHA, responseTimeNanos);
NodeStatistics newStats = new NodeStatistics(nodeId, queueEWMA, responseEWMA, avgServiceTimeNanos);
return newStats;
return new NodeStatistics(nodeId, queueEWMA, responseEWMA, avgServiceTimeNanos);
} else {
ns.queueSize.addValue((double) queueSize);
ns.responseTime.addValue((double) responseTimeNanos);

@@ -82,39 +82,96 @@ public final class ResponseCollectorService extends AbstractComponent implements
}

public Map<String, ComputedNodeStats> getAllNodeStatistics() {
final int clientNum = nodeIdToStats.size();
// Transform the mutable object internally used for accounting into the computed version
Map<String, ComputedNodeStats> nodeStats = new HashMap<>(nodeIdToStats.size());
nodeIdToStats.forEach((k, v) -> {
nodeStats.put(k, new ComputedNodeStats(v));
nodeStats.put(k, new ComputedNodeStats(clientNum, v));
});
return nodeStats;
}

/**
* Optionally return a {@code NodeStatistics} for the given nodeId, if
* response information exists for the given node. Returns an empty
* {@code Optional} if the node was not found.
*/
public Optional<ComputedNodeStats> getNodeStatistics(final String nodeId) {
final int clientNum = nodeIdToStats.size();
return Optional.ofNullable(nodeIdToStats.get(nodeId)).map(ns -> new ComputedNodeStats(clientNum, ns));
}

/**
* Struct-like class encapsulating a point-in-time snapshot of a particular
* node's statistics. This includes the EWMA of queue size, response time,
* and service time.
*/
public static class ComputedNodeStats {
// We store timestamps with nanosecond precision, however, the
// formula specifies milliseconds, therefore we need to convert
// the values so the times don't unduly weight the formula
private final double FACTOR = 1000000.0;
private final int clientNum;

private double cachedRank = 0;

public final String nodeId;
public final double queueSize;
public final int queueSize;
public final double responseTime;
public final double serviceTime;

ComputedNodeStats(NodeStatistics nodeStats) {
ComputedNodeStats(int clientNum, NodeStatistics nodeStats) {
this.clientNum = clientNum;
this.nodeId = nodeStats.nodeId;
this.queueSize = nodeStats.queueSize.getAverage();
this.queueSize = (int) nodeStats.queueSize.getAverage();
this.responseTime = nodeStats.responseTime.getAverage();
this.serviceTime = nodeStats.serviceTime;
}

/**
* Rank this copy of the data, according to the adaptive replica selection formula from the C3 paper
* https://www.usenix.org/system/files/conference/nsdi15/nsdi15-paper-suresh.pdf
*/
private double innerRank(long outstandingRequests) {
// this is a placeholder value, the concurrency compensation is
// defined as the number of outstanding requests from the client
// to the node times the number of clients in the system
double concurrencyCompensation = outstandingRequests * clientNum;

// Cubic queue adjustment factor. The paper chose 3 though we could
// potentially make this configurable if desired.
int queueAdjustmentFactor = 3;

// EWMA of queue size
double qBar = queueSize;
double qHatS = 1 + concurrencyCompensation + qBar;

// EWMA of response time
double rS = responseTime / FACTOR;
// EWMA of service time
double muBarS = serviceTime / FACTOR;

// The final formula
double rank = rS - (1.0 / muBarS) + (Math.pow(qHatS, queueAdjustmentFactor) / muBarS);
return rank;
}

public double rank(long outstandingRequests) {
if (cachedRank == 0) {
cachedRank = innerRank(outstandingRequests);
}
return cachedRank;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder("ComputedNodeStats[");
sb.append(nodeId).append("](");
sb.append("queue: ").append(queueSize);
sb.append(", response time: ").append(responseTime);
sb.append(", service time: ").append(serviceTime);
sb.append("nodes: ").append(clientNum);
sb.append(", queue: ").append(queueSize);
sb.append(", response time: ").append(String.format(Locale.ROOT, "%.1f", responseTime));
sb.append(", service time: ").append(String.format(Locale.ROOT, "%.1f", serviceTime));
sb.append(", rank: ").append(String.format(Locale.ROOT, "%.1f", rank(1)));
sb.append(")");
return sb.toString();
}
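
The queue-size and response-time figures above are exponentially weighted moving averages. The ExponentiallyWeightedMovingAverage class itself is not part of this diff; a minimal sketch of the standard update rule it is expected to follow (the alpha value and sample below are assumptions for illustration):

// Minimal EWMA sketch: each new sample is blended with the running average.
// A larger alpha makes the average react faster to recent samples.
public class EwmaSketch {
    private final double alpha;
    private double average;

    EwmaSketch(double alpha, double initialValue) {
        this.alpha = alpha;
        this.average = initialValue;
    }

    void addValue(double sample) {
        average = alpha * sample + (1 - alpha) * average;
    }

    double getAverage() {
        return average;
    }

    public static void main(String[] args) {
        // The seed value matters: this commit changes QueueResizingEsThreadPoolExecutor
        // to start its execution EWMA at 0 instead of at the targeted response time.
        EwmaSketch fromZero = new EwmaSketch(0.3, 0);            // assumed alpha of 0.3
        EwmaSketch fromTarget = new EwmaSketch(0.3, 1_000_000);  // 1ms targeted response, in nanos
        fromZero.addValue(100);
        fromTarget.addValue(100);
        System.out.println((long) fromZero.getAverage());   // 30
        System.out.println((long) fromTarget.getAverage()); // 700030
    }
}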
@@ -283,7 +283,7 @@ public class QueryPhase implements SearchPhase {
ctx.postProcess(result, shouldCollect);
}
EsThreadPoolExecutor executor = (EsThreadPoolExecutor)
searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);;
searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);
if (executor instanceof QueueResizingEsThreadPoolExecutor) {
QueueResizingEsThreadPoolExecutor rExecutor = (QueueResizingEsThreadPoolExecutor) executor;
queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize());

@@ -28,8 +28,10 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.ResponseCollectorService;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;

@@ -490,4 +492,90 @@ public class OperationRoutingTests extends ESTestCase{
}
}

public void testAdaptiveReplicaSelection() throws Exception {
final int numIndices = 1;
final int numShards = 1;
final int numReplicas = 2;
final String[] indexNames = new String[numIndices];
for (int i = 0; i < numIndices; i++) {
indexNames[i] = "test" + i;
}
ClusterState state = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(indexNames, numShards, numReplicas);
final int numRepeatedSearches = 4;
OperationRouting opRouting = new OperationRouting(Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
opRouting.setUseAdaptiveReplicaSelection(true);
List<ShardRouting> searchedShards = new ArrayList<>(numShards);
Set<String> selectedNodes = new HashSet<>(numShards);
TestThreadPool threadPool = new TestThreadPool("testThatOnlyNodesSupportNodeIds");
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
ResponseCollectorService collector = new ResponseCollectorService(Settings.EMPTY, clusterService);
Map<String, Long> outstandingRequests = new HashMap<>();
GroupShardsIterator<ShardIterator> groupIterator = opRouting.searchShards(state,
indexNames, null, null, collector, outstandingRequests);

assertThat("One group per index shard", groupIterator.size(), equalTo(numIndices * numShards));

// Test that the shards use a round-robin pattern when there are no stats
assertThat(groupIterator.get(0).size(), equalTo(numReplicas + 1));
ShardRouting firstChoice = groupIterator.get(0).nextOrNull();
assertNotNull(firstChoice);
searchedShards.add(firstChoice);
selectedNodes.add(firstChoice.currentNodeId());

groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests);

assertThat(groupIterator.size(), equalTo(numIndices * numShards));
ShardRouting secondChoice = groupIterator.get(0).nextOrNull();
assertNotNull(secondChoice);
searchedShards.add(secondChoice);
selectedNodes.add(secondChoice.currentNodeId());

groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests);

assertThat(groupIterator.size(), equalTo(numIndices * numShards));
ShardRouting thirdChoice = groupIterator.get(0).nextOrNull();
assertNotNull(thirdChoice);
searchedShards.add(thirdChoice);
selectedNodes.add(thirdChoice.currentNodeId());

// All three shards should have been separate, because there are no stats yet so they're all ranked equally.
assertThat(searchedShards.size(), equalTo(3));

// Now let's start adding node metrics, since that will affect which node is chosen
collector.addNodeStatistics("node_0", 2, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(150).nanos());
collector.addNodeStatistics("node_1", 1, TimeValue.timeValueMillis(100).nanos(), TimeValue.timeValueMillis(50).nanos());
collector.addNodeStatistics("node_2", 1, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(200).nanos());
outstandingRequests.put("node_0", 1L);
outstandingRequests.put("node_1", 1L);
outstandingRequests.put("node_2", 1L);

groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests);
ShardRouting shardChoice = groupIterator.get(0).nextOrNull();
// node 1 should be the lowest ranked node to start
assertThat(shardChoice.currentNodeId(), equalTo("node_1"));

// node 1 starts getting more loaded...
collector.addNodeStatistics("node_1", 2, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(150).nanos());
groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests);
shardChoice = groupIterator.get(0).nextOrNull();
assertThat(shardChoice.currentNodeId(), equalTo("node_1"));

// and more loaded...
collector.addNodeStatistics("node_1", 3, TimeValue.timeValueMillis(250).nanos(), TimeValue.timeValueMillis(200).nanos());
groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests);
shardChoice = groupIterator.get(0).nextOrNull();
assertThat(shardChoice.currentNodeId(), equalTo("node_1"));

// and even more
collector.addNodeStatistics("node_1", 4, TimeValue.timeValueMillis(300).nanos(), TimeValue.timeValueMillis(250).nanos());
groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests);
shardChoice = groupIterator.get(0).nextOrNull();
// finally, node 2 is chosen instead
assertThat(shardChoice.currentNodeId(), equalTo("node_2"));

IOUtils.close(clusterService);
terminate(threadPool);
}

}

@@ -198,26 +198,26 @@ public class QueueResizingEsThreadPoolExecutorTests extends ESTestCase {
executor.prestartAllCoreThreads();
logger.info("--> executor: {}", executor);

assertThat((long)executor.getTaskExecutionEWMA(), equalTo(1000000L));
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(0L));
executeTask(executor, 1);
assertBusy(() -> {
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(700030L));
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(30L));
});
executeTask(executor, 1);
assertBusy(() -> {
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(490050L));
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(51L));
});
executeTask(executor, 1);
assertBusy(() -> {
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(343065L));
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(65L));
});
executeTask(executor, 1);
assertBusy(() -> {
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(240175L));
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(75L));
});
executeTask(executor, 1);
assertBusy(() -> {
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(168153L));
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(83L));
});

executor.shutdown();
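
The updated expectations in this test follow directly from the EWMA update rule once the average is seeded with 0 instead of the targeted response time. A quick back-of-the-envelope check, assuming EWMA_ALPHA is 0.3 and each executed task measures 100 nanoseconds (both assumptions, chosen because they are consistent with the values above):

// Reproduce the expected EWMA sequence from the test above under the stated assumptions.
public class EwmaExpectationCheck {
    public static void main(String[] args) {
        double alpha = 0.3;   // assumed EWMA_ALPHA
        double sample = 100;  // assumed measured task time, in nanoseconds
        double ewma = 0;      // the executor now seeds its EWMA with 0
        for (int i = 0; i < 5; i++) {
            ewma = alpha * sample + (1 - alpha) * ewma;
            System.out.println((long) ewma); // prints 30, 51, 65, 75, 83
        }
    }
}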
@@ -67,7 +67,7 @@ public class ResponseCollectorServiceTests extends ESTestCase {
collector.addNodeStatistics("node1", 1, 100, 10);
Map<String, ResponseCollectorService.ComputedNodeStats> nodeStats = collector.getAllNodeStatistics();
assertTrue(nodeStats.containsKey("node1"));
assertThat(nodeStats.get("node1").queueSize, equalTo(1.0));
assertThat(nodeStats.get("node1").queueSize, equalTo(1));
assertThat(nodeStats.get("node1").responseTime, equalTo(100.0));
assertThat(nodeStats.get("node1").serviceTime, equalTo(10.0));
}

@@ -113,7 +113,7 @@ public class ResponseCollectorServiceTests extends ESTestCase {
logger.info("--> got stats: {}", nodeStats);
for (String nodeId : nodes) {
if (nodeStats.containsKey(nodeId)) {
assertThat(nodeStats.get(nodeId).queueSize, greaterThan(0.0));
assertThat(nodeStats.get(nodeId).queueSize, greaterThan(0));
assertThat(nodeStats.get(nodeId).responseTime, greaterThan(0.0));
assertThat(nodeStats.get(nodeId).serviceTime, greaterThan(0.0));
}

@@ -25,6 +25,7 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;

@@ -49,6 +50,13 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;

@ESIntegTestCase.ClusterScope(minNumDataNodes = 2)
public class SearchPreferenceIT extends ESIntegTestCase {

@Override
public Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
.put(OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING.getKey(), false).build();
}

// see #2896
public void testStopOneNodePreferenceWithRedState() throws InterruptedException, IOException {
assertAcked(prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", cluster().numDataNodes()+2).put("index.number_of_replicas", 0)));

@@ -226,7 +226,7 @@ public class QueryPhaseTests extends IndexShardTestCase {

QueryPhase.execute(context, contextSearcher, checkCancelled -> {}, null);
QuerySearchResult results = context.queryResult();
assertThat(results.serviceTimeEWMA(), greaterThan(0L));
assertThat(results.serviceTimeEWMA(), greaterThanOrEqualTo(0L));
assertThat(results.nodeQueueSize(), greaterThanOrEqualTo(0));
reader.close();
dir.close();

@@ -338,6 +338,7 @@ public final class InternalTestCluster extends TestCluster {
builder.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "1b");
// Some tests make use of scripting quite a bit, so increase the limit for integration tests
builder.put(ScriptService.SCRIPT_MAX_COMPILATIONS_PER_MINUTE.getKey(), 1000);
builder.put(OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING.getKey(), random.nextBoolean());
if (TEST_NIGHTLY) {
builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), RandomNumbers.randomIntBetween(random, 5, 10));
builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), RandomNumbers.randomIntBetween(random, 5, 10));