diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchActionListener.java b/core/src/main/java/org/elasticsearch/action/search/SearchActionListener.java index 67de87b1bb1..d34c8c61d43 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchActionListener.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchActionListener.java @@ -27,6 +27,7 @@ import org.elasticsearch.search.SearchShardTarget; * received by this listener. */ abstract class SearchActionListener implements ActionListener { + private final int requestIndex; private final SearchShardTarget searchShardTarget; diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchExecutionStatsCollector.java b/core/src/main/java/org/elasticsearch/action/search/SearchExecutionStatsCollector.java new file mode 100644 index 00000000000..72c3d5eaab6 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/search/SearchExecutionStatsCollector.java @@ -0,0 +1,75 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.node.ResponseCollectorService; +import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.transport.Transport; + +import java.util.Objects; +import java.util.function.BiFunction; + +/** + * A wrapper of search action listeners (search results) that unwraps the query + * result to get the piggybacked queue size and service time EWMA, adding those + * values to the coordinating nodes' {@code ResponseCollectorService}. + */ +public final class SearchExecutionStatsCollector implements ActionListener { + + private final ActionListener listener; + private final String nodeId; + private final ResponseCollectorService collector; + private final long startNanos; + + SearchExecutionStatsCollector(ActionListener listener, + ResponseCollectorService collector, + String nodeId) { + this.listener = Objects.requireNonNull(listener, "listener cannot be null"); + this.collector = Objects.requireNonNull(collector, "response collector cannot be null"); + this.startNanos = System.nanoTime(); + this.nodeId = nodeId; + } + + public static BiFunction makeWrapper(ResponseCollectorService service) { + return (connection, originalListener) -> new SearchExecutionStatsCollector(originalListener, service, connection.getNode().getId()); + } + + @Override + public void onResponse(SearchPhaseResult response) { + QuerySearchResult queryResult = response.queryResult(); + if (nodeId != null && queryResult != null) { + final long serviceTimeEWMA = queryResult.serviceTimeEWMA(); + final int queueSize = queryResult.nodeQueueSize(); + final long responseDuration = System.nanoTime() - startNanos; + // EWMA/queue size may be -1 if the query node doesn't support capturing it + if (serviceTimeEWMA > 0 && queueSize > 0) { + collector.addNodeStatistics(nodeId, queueSize, responseDuration, serviceTimeEWMA); + } + } + listener.onResponse(response); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index d9c60438efc..5edfbbedb03 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -56,6 +56,7 @@ import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.function.BiFunction; import java.util.function.Supplier; /** @@ -77,10 +78,13 @@ public class SearchTransportService extends AbstractComponent { public static final String QUERY_CAN_MATCH_NAME = "indices:data/read/search[can_match]"; private final TransportService transportService; + private final BiFunction responseWrapper; - public SearchTransportService(Settings settings, TransportService transportService) { + public SearchTransportService(Settings settings, TransportService transportService, + BiFunction responseWrapper) { super(settings); this.transportService = transportService; + this.responseWrapper = responseWrapper; } public void sendFreeContext(Transport.Connection connection, final long contextId, OriginalIndices originalIndices) { @@ -135,8 +139,10 @@ public class SearchTransportService extends AbstractComponent { // this used to be the QUERY_AND_FETCH which doesn't exist anymore. final boolean fetchDocuments = request.numberOfShards() == 1; Supplier supplier = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new; + + final ActionListener handler = responseWrapper.apply(connection, listener); transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task, - new ActionListenerResponseHandler<>(listener, supplier)); + new ActionListenerResponseHandler<>(handler, supplier)); } public void sendExecuteQuery(Transport.Connection connection, final QuerySearchRequest request, SearchTask task, diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java index c24b6899bcc..1f694d73fa7 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java @@ -141,6 +141,13 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto return executionEWMA.getAverage(); } + /** + * Returns the current queue size (operations that are queued) + */ + public int getCurrentQueueSize() { + return workQueue.size(); + } + @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 6b027339c9d..8637b79ca35 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -291,6 +291,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl persistMetadata(path, indexSettings, shardRouting, null, logger); } + public ThreadPool getThreadPool() { + return this.threadPool; + } + public Store store() { return this.store; } diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 0b0f1a6aaa7..201513373b8 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -29,6 +29,7 @@ import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionModule; import org.elasticsearch.action.GenericAction; +import org.elasticsearch.action.search.SearchExecutionStatsCollector; import org.elasticsearch.action.search.SearchPhaseController; import org.elasticsearch.action.search.SearchTransportService; import org.elasticsearch.action.support.TransportAction; @@ -423,8 +424,9 @@ public class Node implements Closeable { final Transport transport = networkModule.getTransportSupplier().get(); final TransportService transportService = newTransportService(settings, transport, threadPool, networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings()); - final SearchTransportService searchTransportService = new SearchTransportService(settings, - transportService); + final ResponseCollectorService responseCollectorService = new ResponseCollectorService(this.settings, clusterService); + final SearchTransportService searchTransportService = new SearchTransportService(settings, transportService, + SearchExecutionStatsCollector.makeWrapper(responseCollectorService)); final Consumer httpBind; final HttpServerTransport httpServerTransport; if (networkModule.isHttpEnabled()) { @@ -469,7 +471,8 @@ public class Node implements Closeable { b.bind(MetaStateService.class).toInstance(metaStateService); b.bind(IndicesService.class).toInstance(indicesService); b.bind(SearchService.class).toInstance(newSearchService(clusterService, indicesService, - threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase())); + threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(), + responseCollectorService)); b.bind(SearchTransportService.class).toInstance(searchTransportService); b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings, bigArrays, scriptModule.getScriptService())); @@ -897,8 +900,9 @@ public class Node implements Closeable { */ protected SearchService newSearchService(ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, - FetchPhase fetchPhase) { - return new SearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase); + FetchPhase fetchPhase, ResponseCollectorService responseCollectorService) { + return new SearchService(clusterService, indicesService, threadPool, + scriptService, bigArrays, fetchPhase, responseCollectorService); } /** diff --git a/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java b/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java new file mode 100644 index 00000000000..1afbd3b2997 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java @@ -0,0 +1,144 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.node; + +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.ExponentiallyWeightedMovingAverage; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; + +/** + * Collects statistics about queue size, response time, and service time of + * tasks executed on each node, making the EWMA of the values available to the + * coordinating node. + */ +public final class ResponseCollectorService extends AbstractComponent implements ClusterStateListener { + + private static final double ALPHA = 0.3; + + private final ConcurrentMap nodeIdToStats = ConcurrentCollections.newConcurrentMap(); + + public ResponseCollectorService(Settings settings, ClusterService clusterService) { + super(settings); + clusterService.addListener(this); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (event.nodesRemoved()) { + for (DiscoveryNode removedNode : event.nodesDelta().removedNodes()) { + removeNode(removedNode.getId()); + } + } + } + + void removeNode(String nodeId) { + nodeIdToStats.remove(nodeId); + } + + public void addNodeStatistics(String nodeId, int queueSize, long responseTimeNanos, long avgServiceTimeNanos) { + NodeStatistics nodeStats = nodeIdToStats.get(nodeId); + nodeIdToStats.compute(nodeId, (id, ns) -> { + if (ns == null) { + ExponentiallyWeightedMovingAverage queueEWMA = new ExponentiallyWeightedMovingAverage(ALPHA, queueSize); + ExponentiallyWeightedMovingAverage responseEWMA = new ExponentiallyWeightedMovingAverage(ALPHA, responseTimeNanos); + NodeStatistics newStats = new NodeStatistics(nodeId, queueEWMA, responseEWMA, avgServiceTimeNanos); + return newStats; + } else { + ns.queueSize.addValue((double) queueSize); + ns.responseTime.addValue((double) responseTimeNanos); + ns.serviceTime = avgServiceTimeNanos; + return ns; + } + }); + } + + public Map getAllNodeStatistics() { + // Transform the mutable object internally used for accounting into the computed version + Map nodeStats = new HashMap<>(nodeIdToStats.size()); + nodeIdToStats.forEach((k, v) -> { + nodeStats.put(k, new ComputedNodeStats(v)); + }); + return nodeStats; + } + + /** + * Struct-like class encapsulating a point-in-time snapshot of a particular + * node's statistics. This includes the EWMA of queue size, response time, + * and service time. + */ + public static class ComputedNodeStats { + public final String nodeId; + public final double queueSize; + public final double responseTime; + public final double serviceTime; + + ComputedNodeStats(NodeStatistics nodeStats) { + this.nodeId = nodeStats.nodeId; + this.queueSize = nodeStats.queueSize.getAverage(); + this.responseTime = nodeStats.responseTime.getAverage(); + this.serviceTime = nodeStats.serviceTime; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("ComputedNodeStats["); + sb.append(nodeId).append("]("); + sb.append("queue: ").append(queueSize); + sb.append(", response time: ").append(responseTime); + sb.append(", service time: ").append(serviceTime); + sb.append(")"); + return sb.toString(); + } + } + + /** + * Class encapsulating a node's exponentially weighted queue size, response + * time, and service time, however, this class is private and intended only + * to be used for the internal accounting of {@code ResponseCollectorService}. + */ + private static class NodeStatistics { + final String nodeId; + final ExponentiallyWeightedMovingAverage queueSize; + final ExponentiallyWeightedMovingAverage responseTime; + double serviceTime; + + NodeStatistics(String nodeId, + ExponentiallyWeightedMovingAverage queueSizeEWMA, + ExponentiallyWeightedMovingAverage responseTimeEWMA, + double serviceTimeEWMA) { + this.nodeId = nodeId; + this.queueSize = queueSizeEWMA; + this.responseTime = responseTimeEWMA; + this.serviceTime = serviceTimeEWMA; + } + } +} diff --git a/core/src/main/java/org/elasticsearch/search/DefaultSearchContext.java b/core/src/main/java/org/elasticsearch/search/DefaultSearchContext.java index 8e0536adfb4..815971e27d4 100644 --- a/core/src/main/java/org/elasticsearch/search/DefaultSearchContext.java +++ b/core/src/main/java/org/elasticsearch/search/DefaultSearchContext.java @@ -51,6 +51,7 @@ import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.index.search.NestedHelper; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.similarity.SimilarityService; +import org.elasticsearch.node.ResponseCollectorService; import org.elasticsearch.search.aggregations.SearchContextAggregations; import org.elasticsearch.search.collapse.CollapseContext; import org.elasticsearch.search.dfs.DfsSearchResult; @@ -81,6 +82,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; final class DefaultSearchContext extends SearchContext { @@ -93,6 +95,7 @@ final class DefaultSearchContext extends SearchContext { private final BigArrays bigArrays; private final IndexShard indexShard; private final IndexService indexService; + private final ResponseCollectorService responseCollectorService; private final ContextIndexSearcher searcher; private final DfsSearchResult dfsResult; private final QuerySearchResult queryResult; @@ -147,6 +150,7 @@ final class DefaultSearchContext extends SearchContext { private final long originNanoTime = System.nanoTime(); private volatile long lastAccessTime = -1; private Profilers profilers; + private ExecutorService searchExecutor; private final Map searchExtBuilders = new HashMap<>(); private final Map, Collector> queryCollectors = new HashMap<>(); @@ -154,8 +158,8 @@ final class DefaultSearchContext extends SearchContext { private FetchPhase fetchPhase; DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget, Engine.Searcher engineSearcher, - IndexService indexService, IndexShard indexShard, - BigArrays bigArrays, Counter timeEstimateCounter, TimeValue timeout, FetchPhase fetchPhase) { + IndexService indexService, IndexShard indexShard, BigArrays bigArrays, Counter timeEstimateCounter, + TimeValue timeout, FetchPhase fetchPhase, ResponseCollectorService responseCollectorService) { this.id = id; this.request = request; this.fetchPhase = fetchPhase; @@ -169,6 +173,7 @@ final class DefaultSearchContext extends SearchContext { this.fetchResult = new FetchSearchResult(id, shardTarget); this.indexShard = indexShard; this.indexService = indexService; + this.responseCollectorService = responseCollectorService; this.searcher = new ContextIndexSearcher(engineSearcher, indexService.cache().query(), indexShard.getQueryCachingPolicy()); this.timeEstimateCounter = timeEstimateCounter; this.timeout = timeout; diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index 2c5e6c49b41..6f831b6099c 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -52,6 +52,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.SearchOperationListener; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; +import org.elasticsearch.node.ResponseCollectorService; import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.SearchScript; import org.elasticsearch.search.aggregations.AggregationInitializationException; @@ -96,6 +97,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; @@ -131,6 +133,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private final ScriptService scriptService; + private final ResponseCollectorService responseCollectorService; + private final BigArrays bigArrays; private final DfsPhase dfsPhase = new DfsPhase(); @@ -152,12 +156,14 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private final ConcurrentMapLong activeContexts = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency(); public SearchService(ClusterService clusterService, IndicesService indicesService, - ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, FetchPhase fetchPhase) { + ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, FetchPhase fetchPhase, + ResponseCollectorService responseCollectorService) { super(clusterService.getSettings()); this.threadPool = threadPool; this.clusterService = clusterService; this.indicesService = indicesService; this.scriptService = scriptService; + this.responseCollectorService = responseCollectorService; this.bigArrays = bigArrays; this.queryPhase = new QueryPhase(settings); this.fetchPhase = fetchPhase; @@ -520,7 +526,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv Engine.Searcher engineSearcher = searcher == null ? indexShard.acquireSearcher("search") : searcher; final DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, - engineSearcher, indexService, indexShard, bigArrays, threadPool.estimatedTimeInMillisCounter(), timeout, fetchPhase); + engineSearcher, indexService, indexShard, bigArrays, threadPool.estimatedTimeInMillisCounter(), timeout, fetchPhase, + responseCollectorService); boolean success = false; try { // we clone the query shard context here just for rewriting otherwise we @@ -813,6 +820,10 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv return this.activeContexts.size(); } + public ResponseCollectorService getResponseCollectorService() { + return this.responseCollectorService; + } + class Reaper implements Runnable { @Override public void run() { diff --git a/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java index 82e572a180e..a944f0d1a51 100644 --- a/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -38,6 +38,8 @@ import org.apache.lucene.search.TopDocs; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; +import org.elasticsearch.common.util.concurrent.QueueResizingEsThreadPoolExecutor; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.SearchPhase; import org.elasticsearch.search.SearchService; @@ -50,8 +52,10 @@ import org.elasticsearch.search.profile.query.InternalProfileCollector; import org.elasticsearch.search.rescore.RescorePhase; import org.elasticsearch.search.sort.SortAndFormats; import org.elasticsearch.search.suggest.SuggestPhase; +import org.elasticsearch.threadpool.ThreadPool; import java.util.LinkedList; +import java.util.concurrent.ExecutorService; import static org.elasticsearch.search.query.QueryCollectorContext.createCancellableCollectorContext; import static org.elasticsearch.search.query.QueryCollectorContext.createEarlySortingTerminationCollectorContext; @@ -238,6 +242,13 @@ public class QueryPhase implements SearchPhase { for (QueryCollectorContext ctx : collectors) { ctx.postProcess(result, shouldCollect); } + EsThreadPoolExecutor executor = (EsThreadPoolExecutor) + searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);; + if (executor instanceof QueueResizingEsThreadPoolExecutor) { + QueueResizingEsThreadPoolExecutor rExecutor = (QueueResizingEsThreadPoolExecutor) executor; + queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize()); + queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA()); + } if (searchContext.getProfilers() != null) { ProfileShardResult shardResults = SearchProfileShardResults.buildShardResults(searchContext.getProfilers()); result.profileResults(shardResults); diff --git a/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 8549f42040f..feb3e7876a3 100644 --- a/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -21,6 +21,7 @@ package org.elasticsearch.search.query; import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.TopDocs; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.DocValueFormat; @@ -58,6 +59,8 @@ public final class QuerySearchResult extends SearchPhaseResult { private boolean hasScoreDocs; private long totalHits; private float maxScore; + private long serviceTimeEWMA = -1; + private int nodeQueueSize = -1; public QuerySearchResult() { } @@ -228,6 +231,24 @@ public final class QuerySearchResult extends SearchPhaseResult { return this; } + public long serviceTimeEWMA() { + return this.serviceTimeEWMA; + } + + public QuerySearchResult serviceTimeEWMA(long serviceTimeEWMA) { + this.serviceTimeEWMA = serviceTimeEWMA; + return this; + } + + public int nodeQueueSize() { + return this.nodeQueueSize; + } + + public QuerySearchResult nodeQueueSize(int nodeQueueSize) { + this.nodeQueueSize = nodeQueueSize; + return this; + } + /** * Returns true if this result has any suggest score docs */ @@ -278,6 +299,13 @@ public final class QuerySearchResult extends SearchPhaseResult { terminatedEarly = in.readOptionalBoolean(); profileShardResults = in.readOptionalWriteable(ProfileShardResult::new); hasProfileResults = profileShardResults != null; + if (in.getVersion().onOrAfter(Version.V_6_0_0_beta1)) { + serviceTimeEWMA = in.readZLong(); + nodeQueueSize = in.readInt(); + } else { + serviceTimeEWMA = -1; + nodeQueueSize = -1; + } } @Override @@ -315,6 +343,10 @@ public final class QuerySearchResult extends SearchPhaseResult { out.writeBoolean(searchTimedOut); out.writeOptionalBoolean(terminatedEarly); out.writeOptionalWriteable(profileShardResults); + if (out.getVersion().onOrAfter(Version.V_6_0_0_beta1)) { + out.writeZLong(serviceTimeEWMA); + out.writeInt(nodeQueueSize); + } } public long getTotalHits() { diff --git a/core/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index 8445fb08fab..e6a17446476 100644 --- a/core/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -64,7 +64,7 @@ public class AbstractSearchAsyncActionTests extends ESTestCase { Collections.singletonMap("foo", new AliasFilter(new MatchAllQueryBuilder())), Collections.singletonMap("foo", 2.0f), null, new SearchRequest(), null, new GroupShardsIterator<>(Collections.singletonList( new SearchShardIterator(null, null, Collections.emptyList(), null))), timeProvider, 0, null, - new InitialSearchPhase.ArraySearchPhaseResults<>(10)) { + new InitialSearchPhase.ArraySearchPhaseResults<>(10)) { @Override protected SearchPhase getNextPhase(final SearchPhaseResults results, final SearchPhaseContext context) { return null; diff --git a/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java index 9993d8af57c..87cebc957c6 100644 --- a/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -57,7 +57,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase { final boolean shard2 = randomBoolean(); SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override public void sendCanMatch(Transport.Connection connection, ShardSearchTransportRequest request, SearchTask task, @@ -105,7 +105,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase { public void testOldNodesTriggerException() { SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null); + Settings.builder().put("search.remote.connect", false).build(), null, null); DiscoveryNode node = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), VersionUtils.randomVersionBetween(random(), VersionUtils.getFirstVersion(), VersionUtils.getPreviousVersion(Version.V_5_6_0))); SearchAsyncActionTests.MockConnection mockConnection = new SearchAsyncActionTests.MockConnection(node); @@ -124,7 +124,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase { lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode)); final boolean shard1 = randomBoolean(); SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override public void sendCanMatch(Transport.Connection connection, ShardSearchTransportRequest request, SearchTask task, diff --git a/core/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java b/core/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java index 5037ffe03f9..bd1d6a85b09 100644 --- a/core/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java @@ -70,7 +70,7 @@ public class ClearScrollControllerTests extends ESTestCase { } }; List nodesInvoked = new CopyOnWriteArrayList<>(); - SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null) { + SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null, null) { @Override public void sendClearAllScrollContexts(Transport.Connection connection, ActionListener listener) { nodesInvoked.add(connection.getNode()); @@ -135,7 +135,7 @@ public class ClearScrollControllerTests extends ESTestCase { } }; List nodesInvoked = new CopyOnWriteArrayList<>(); - SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null) { + SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null, null) { @Override public void sendFreeContext(Transport.Connection connection, long contextId, @@ -213,7 +213,7 @@ public class ClearScrollControllerTests extends ESTestCase { } }; List nodesInvoked = new CopyOnWriteArrayList<>(); - SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null) { + SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null, null) { @Override public void sendFreeContext(Transport.Connection connection, long contextId, diff --git a/core/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java b/core/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java index 185dae410d5..15d24b85b49 100644 --- a/core/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java @@ -58,7 +58,7 @@ public class DfsQueryPhaseTests extends ESTestCase { SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task, @@ -115,7 +115,7 @@ public class DfsQueryPhaseTests extends ESTestCase { SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task, @@ -171,7 +171,7 @@ public class DfsQueryPhaseTests extends ESTestCase { SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task, diff --git a/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java b/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java index b27c7ec3955..81a6359997d 100644 --- a/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java @@ -70,7 +70,7 @@ public class ExpandSearchPhaseTests extends ESTestCase { .collect(Collectors.toList())))); mockSearchPhaseContext.getRequest().source().query(originalQuery); mockSearchPhaseContext.searchTransport = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener listener) { @@ -144,7 +144,7 @@ public class ExpandSearchPhaseTests extends ESTestCase { mockSearchPhaseContext.getRequest().source(new SearchSourceBuilder() .collapse(new CollapseBuilder("someField").setInnerHits(new InnerHitBuilder().setName("foobarbaz")))); mockSearchPhaseContext.searchTransport = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener listener) { @@ -185,7 +185,7 @@ public class ExpandSearchPhaseTests extends ESTestCase { public void testSkipPhase() throws IOException { MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1); mockSearchPhaseContext.searchTransport = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener listener) { @@ -216,7 +216,7 @@ public class ExpandSearchPhaseTests extends ESTestCase { public void testSkipExpandCollapseNoHits() throws IOException { MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1); mockSearchPhaseContext.searchTransport = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener listener) { diff --git a/core/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java b/core/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java index 6e5b85745bc..bd38a420f07 100644 --- a/core/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java @@ -103,7 +103,7 @@ public class FetchSearchPhaseTests extends ESTestCase { results.consumeResult(queryResult); SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task, SearchActionListener listener) { @@ -157,7 +157,7 @@ public class FetchSearchPhaseTests extends ESTestCase { results.consumeResult(queryResult); SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task, SearchActionListener listener) { @@ -210,7 +210,7 @@ public class FetchSearchPhaseTests extends ESTestCase { results.consumeResult(queryResult); } SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task, SearchActionListener listener) { @@ -271,7 +271,7 @@ public class FetchSearchPhaseTests extends ESTestCase { results.consumeResult(queryResult); AtomicInteger numFetches = new AtomicInteger(0); SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task, SearchActionListener listener) { @@ -324,7 +324,7 @@ public class FetchSearchPhaseTests extends ESTestCase { results.consumeResult(queryResult); SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task, SearchActionListener listener) { diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index a2b438a66e4..3ee681383cd 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -86,7 +86,7 @@ public class SearchAsyncActionTests extends ESTestCase { } } - SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null); + SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null, null); Map lookup = new HashMap<>(); Map seenShard = new ConcurrentHashMap<>(); lookup.put(primaryNode.getId(), new MockConnection(primaryNode)); @@ -173,7 +173,7 @@ public class SearchAsyncActionTests extends ESTestCase { GroupShardsIterator shardsIter = getShardsIter("idx", new OriginalIndices(new String[]{"idx"}, IndicesOptions.strictExpandOpenAndForbidClosed()), 10, randomBoolean(), primaryNode, replicaNode); - SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null); + SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null, null); Map lookup = new HashMap<>(); Map seenShard = new ConcurrentHashMap<>(); lookup.put(primaryNode.getId(), new MockConnection(primaryNode)); @@ -271,7 +271,7 @@ public class SearchAsyncActionTests extends ESTestCase { new OriginalIndices(new String[]{"idx"}, IndicesOptions.strictExpandOpenAndForbidClosed()), randomIntBetween(1, 10), randomBoolean(), primaryNode, replicaNode); AtomicInteger numFreedContext = new AtomicInteger(); - SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null) { + SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null, null) { @Override public void sendFreeContext(Transport.Connection connection, long contextId, OriginalIndices originalIndices) { numFreedContext.incrementAndGet(); diff --git a/core/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java b/core/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java new file mode 100644 index 00000000000..2ff3d156247 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java @@ -0,0 +1,140 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.node; + +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; + +public class ResponseCollectorServiceTests extends ESTestCase { + + private ClusterService clusterService; + private ResponseCollectorService collector; + private ThreadPool threadpool; + + @Before + public void setUp() throws Exception { + super.setUp(); + threadpool = new TestThreadPool("response_collector_tests"); + clusterService = new ClusterService(Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadpool); + collector = new ResponseCollectorService(Settings.EMPTY, clusterService); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + threadpool.shutdownNow(); + } + + public void testNodeStats() throws Exception { + collector.addNodeStatistics("node1", 1, 100, 10); + Map nodeStats = collector.getAllNodeStatistics(); + assertTrue(nodeStats.containsKey("node1")); + assertThat(nodeStats.get("node1").queueSize, equalTo(1.0)); + assertThat(nodeStats.get("node1").responseTime, equalTo(100.0)); + assertThat(nodeStats.get("node1").serviceTime, equalTo(10.0)); + } + + /* + * Test that concurrently adding values and removing nodes does not cause exceptions + */ + public void testConcurrentAddingAndRemoving() throws Exception { + String[] nodes = new String[] {"a", "b", "c", "d"}; + + final CountDownLatch latch = new CountDownLatch(1); + + Runnable f = () -> { + try { + latch.await(); + } catch (InterruptedException e) { + fail("should not be interrupted"); + } + for (int i = 0; i < randomIntBetween(100, 200); i++) { + if (randomBoolean()) { + collector.removeNode(randomFrom(nodes)); + } + collector.addNodeStatistics(randomFrom(nodes), randomIntBetween(1,100), randomIntBetween(1,100), randomIntBetween(1,100)); + } + }; + + Thread t1 = new Thread(f); + Thread t2 = new Thread(f); + Thread t3 = new Thread(f); + Thread t4 = new Thread(f); + + t1.start(); + t2.start(); + t3.start(); + t4.start(); + latch.countDown(); + t1.join(); + t2.join(); + t3.join(); + t4.join(); + + final Map nodeStats = collector.getAllNodeStatistics(); + logger.info("--> got stats: {}", nodeStats); + for (String nodeId : nodes) { + if (nodeStats.containsKey(nodeId)) { + assertThat(nodeStats.get(nodeId).queueSize, greaterThan(0.0)); + assertThat(nodeStats.get(nodeId).responseTime, greaterThan(0.0)); + assertThat(nodeStats.get(nodeId).serviceTime, greaterThan(0.0)); + } + } + } + + public void testNodeRemoval() throws Exception { + collector.addNodeStatistics("node1", randomIntBetween(1,100), randomIntBetween(1,100), randomIntBetween(1,100)); + collector.addNodeStatistics("node2", randomIntBetween(1,100), randomIntBetween(1,100), randomIntBetween(1,100)); + + ClusterState previousState = ClusterState.builder(new ClusterName("cluster")).nodes(DiscoveryNodes.builder() + .add(DiscoveryNode.createLocal(Settings.EMPTY, new TransportAddress(TransportAddress.META_ADDRESS, 9200), "node1")) + .add(DiscoveryNode.createLocal(Settings.EMPTY, new TransportAddress(TransportAddress.META_ADDRESS, 9201), "node2"))) + .build(); + ClusterState newState = ClusterState.builder(previousState).nodes(DiscoveryNodes.builder(previousState.nodes()) + .remove("node2")).build(); + ClusterChangedEvent event = new ClusterChangedEvent("test", newState, previousState); + + collector.clusterChanged(event); + final Map nodeStats = collector.getAllNodeStatistics(); + assertTrue(nodeStats.containsKey("node1")); + assertFalse(nodeStats.containsKey("node2")); + } +} diff --git a/core/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java b/core/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java index 016406c6129..3055cda803d 100644 --- a/core/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java @@ -49,11 +49,13 @@ import org.apache.lucene.search.TotalHitCountCollector; import org.apache.lucene.search.Weight; import org.apache.lucene.store.Directory; import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.ParsedQuery; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.internal.ScrollContext; import org.elasticsearch.search.sort.SortAndFormats; -import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.TestSearchContext; import java.io.IOException; @@ -62,14 +64,35 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThan; +import static org.mockito.Mockito.mock; -public class QueryPhaseTests extends ESTestCase { +public class QueryPhaseTests extends IndexShardTestCase { + + private IndexShard indexShard; + + @Override + public Settings threadPoolSettings() { + return Settings.builder().put(super.threadPoolSettings()).put("thread_pool.search.min_queue_size", 10).build(); + } + + @Override + public void setUp() throws Exception { + super.setUp(); + indexShard = newShard(true); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + closeShards(indexShard); + } private void countTestCase(Query query, IndexReader reader, boolean shouldCollect) throws Exception { - TestSearchContext context = new TestSearchContext(null); + TestSearchContext context = new TestSearchContext(null, indexShard); context.parsedQuery(new ParsedQuery(query)); context.setSize(0); context.setTask(new SearchTask(123L, "", "", "", null)); @@ -139,7 +162,7 @@ public class QueryPhaseTests extends ESTestCase { } public void testPostFilterDisablesCountOptimization() throws Exception { - TestSearchContext context = new TestSearchContext(null); + TestSearchContext context = new TestSearchContext(null, indexShard); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); context.setSize(0); context.setTask(new SearchTask(123L, "", "", "", null)); @@ -163,7 +186,7 @@ public class QueryPhaseTests extends ESTestCase { } public void testMinScoreDisablesCountOptimization() throws Exception { - TestSearchContext context = new TestSearchContext(null); + TestSearchContext context = new TestSearchContext(null, indexShard); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); context.setSize(0); context.setTask(new SearchTask(123L, "", "", "", null)); @@ -186,6 +209,30 @@ public class QueryPhaseTests extends ESTestCase { assertTrue(collected.get()); } + public void testQueryCapturesThreadPoolStats() throws Exception { + TestSearchContext context = new TestSearchContext(null, indexShard); + context.setTask(new SearchTask(123L, "", "", "", null)); + context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); + + Directory dir = newDirectory(); + IndexWriterConfig iwc = newIndexWriterConfig(); + RandomIndexWriter w = new RandomIndexWriter(random(), dir, iwc); + final int numDocs = scaledRandomIntBetween(100, 200); + for (int i = 0; i < numDocs; ++i) { + w.addDocument(new Document()); + } + w.close(); + IndexReader reader = DirectoryReader.open(dir); + IndexSearcher contextSearcher = new IndexSearcher(reader); + + QueryPhase.execute(context, contextSearcher, null); + QuerySearchResult results = context.queryResult(); + assertThat(results.serviceTimeEWMA(), greaterThan(0L)); + assertThat(results.nodeQueueSize(), greaterThanOrEqualTo(0)); + reader.close(); + dir.close(); + } + public void testInOrderScrollOptimization() throws Exception { Directory dir = newDirectory(); final Sort sort = new Sort(new SortField("rank", SortField.Type.INT)); @@ -206,7 +253,7 @@ public class QueryPhaseTests extends ESTestCase { } }; - TestSearchContext context = new TestSearchContext(null); + TestSearchContext context = new TestSearchContext(null, indexShard); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); ScrollContext scrollContext = new ScrollContext(); scrollContext.lastEmittedDoc = null; @@ -251,7 +298,7 @@ public class QueryPhaseTests extends ESTestCase { w.addDocument(doc); } w.close(); - TestSearchContext context = new TestSearchContext(null); + TestSearchContext context = new TestSearchContext(null, indexShard); context.setTask(new SearchTask(123L, "", "", "", null)); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); context.terminateAfter(1); @@ -360,7 +407,7 @@ public class QueryPhaseTests extends ESTestCase { } w.close(); - TestSearchContext context = new TestSearchContext(null); + TestSearchContext context = new TestSearchContext(null, indexShard); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); context.setSize(1); context.setTask(new SearchTask(123L, "", "", "", null)); @@ -454,7 +501,7 @@ public class QueryPhaseTests extends ESTestCase { } w.close(); - TestSearchContext context = new TestSearchContext(null); + TestSearchContext context = new TestSearchContext(null, indexShard); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); ScrollContext scrollContext = new ScrollContext(); scrollContext.lastEmittedDoc = null; diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 0497b7702cb..d0ce47c97fc 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -118,7 +118,7 @@ public abstract class IndexShardTestCase extends ESTestCase { @Override public void setUp() throws Exception { super.setUp(); - threadPool = new TestThreadPool(getClass().getName()); + threadPool = new TestThreadPool(getClass().getName(), threadPoolSettings()); primaryTerm = randomIntBetween(1, 100); // use random but fixed term for creating shards } @@ -131,6 +131,10 @@ public abstract class IndexShardTestCase extends ESTestCase { } } + public Settings threadPoolSettings() { + return Settings.EMPTY; + } + private Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException { final ShardId shardId = shardPath.getShardId(); final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) { diff --git a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java index 2e0aa98a920..311b1778309 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java +++ b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java @@ -89,9 +89,10 @@ public class MockNode extends Node { @Override protected SearchService newSearchService(ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, - FetchPhase fetchPhase) { + FetchPhase fetchPhase, ResponseCollectorService responseCollectorService) { if (getPluginsService().filterPlugins(MockSearchService.TestPlugin.class).isEmpty()) { - return super.newSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase); + return super.newSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase, + responseCollectorService); } return new MockSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase); } diff --git a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java index bf300889cd5..b9d9ff3cfc9 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java +++ b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java @@ -69,7 +69,7 @@ public class MockSearchService extends SearchService { public MockSearchService(ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, FetchPhase fetchPhase) { - super(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase); + super(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase, null); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index f6908db8119..0eaa3a57df0 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -1710,6 +1710,12 @@ public abstract class ESIntegTestCase extends ESTestCase { .put(IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING.getKey(), nodeOrdinal % 2 == 0) // wait short time for other active shards before actually deleting, default 30s not needed in tests .put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT.getKey(), new TimeValue(1, TimeUnit.SECONDS)); + if (rarely()) { + // Sometimes adjust the minimum search thread pool size, causing + // QueueResizingEsThreadPoolExecutor to be used instead of a regular + // fixed thread pool + builder.put("thread_pool.search.min_queue_size", 100); + } return builder.build(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java b/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java index 038a21c28e8..390c9f9772e 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java +++ b/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java @@ -24,8 +24,10 @@ import org.apache.lucene.search.Query; import org.apache.lucene.util.Counter; import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.cache.bitset.BitsetFilterCache; import org.elasticsearch.index.engine.Engine; @@ -106,12 +108,16 @@ public class TestSearchContext extends SearchContext { } public TestSearchContext(QueryShardContext queryShardContext) { + this(queryShardContext, null); + } + + public TestSearchContext(QueryShardContext queryShardContext, IndexShard indexShard) { this.bigArrays = null; this.indexService = null; this.indexFieldDataService = null; this.threadPool = null; this.fixedBitSetFilterCache = null; - this.indexShard = null; + this.indexShard = indexShard; this.queryShardContext = queryShardContext; } diff --git a/test/framework/src/main/java/org/elasticsearch/threadpool/TestThreadPool.java b/test/framework/src/main/java/org/elasticsearch/threadpool/TestThreadPool.java index 0d525f7f59a..f40820cc877 100644 --- a/test/framework/src/main/java/org/elasticsearch/threadpool/TestThreadPool.java +++ b/test/framework/src/main/java/org/elasticsearch/threadpool/TestThreadPool.java @@ -25,7 +25,11 @@ import org.elasticsearch.node.Node; public class TestThreadPool extends ThreadPool { public TestThreadPool(String name) { - super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).build()); + this(name, Settings.EMPTY); + } + + public TestThreadPool(String name, Settings settings) { + super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).put(settings).build()); } }