From 610ba7e4273473d649a922f5df53f5e692a4d4c3 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Mon, 17 Jul 2017 11:04:51 -0600 Subject: [PATCH] Register data node stats from info carried back in search responses (#25430) * Register data node stats from info carried back in search responses This is part of #24915, where we now calculate the EWMA of service time for tasks in the search threadpool, and send that as well as the current queue size back to the coordinating node. The coordinating node now tracks this information for each node in the cluster. This information will be used in the future the determining the best replica a search request should be routed to. This change has no user-visible difference. * Move response time timing into ResponseListenerWrapper * Move ResponseListenerWrapper to ActionListener instead of SearchActionListener Also removes the logger * Move `requestIndex` back to private * De-guice-ify ResponseCollectorService \o/ * Undo all changes to SearchQueryThenFetchAsyncAction * Remove unneeded response collector from TransportSearchAction * Undo all changes to SearchDfsQueryThenFetchAsyncAction * Completely rewrite the inside of ResponseCollectorService's record keeping * Documentation and cleanups for ResponseCollectorService * Add unit test for collection of queue size and service time * Fix Guice construction error * Add basic unit tests for ResponseCollectorService * Fix version constant for the master merge * Fix test compilation after master merge * Add a test for node removal on cluster changed event * Remove integration test as there are now unit tests * Rename ResponseListenerWrapper -> SearchExecutionStatsCollector * Fix line-length * Make classes private and final where appropriate * Pass nodeId into SearchExecutionStatsCollector and use only ActionListener * Get nodeId from connection so searchShardTarget can be private * Remove threadpool from SearchContext, get it from IndexShard instead * Add missing import * Use BiFunction for responseWrapper rather than passing in collector service --- .../action/search/SearchActionListener.java | 1 + .../search/SearchExecutionStatsCollector.java | 75 +++++++++ .../action/search/SearchTransportService.java | 10 +- .../QueueResizingEsThreadPoolExecutor.java | 7 + .../elasticsearch/index/shard/IndexShard.java | 4 + .../java/org/elasticsearch/node/Node.java | 14 +- .../node/ResponseCollectorService.java | 144 ++++++++++++++++++ .../search/DefaultSearchContext.java | 9 +- .../elasticsearch/search/SearchService.java | 15 +- .../search/query/QueryPhase.java | 11 ++ .../search/query/QuerySearchResult.java | 32 ++++ .../AbstractSearchAsyncActionTests.java | 2 +- .../CanMatchPreFilterSearchPhaseTests.java | 6 +- .../search/ClearScrollControllerTests.java | 6 +- .../action/search/DfsQueryPhaseTests.java | 6 +- .../action/search/ExpandSearchPhaseTests.java | 8 +- .../action/search/FetchSearchPhaseTests.java | 10 +- .../action/search/SearchAsyncActionTests.java | 6 +- .../node/ResponseCollectorServiceTests.java | 140 +++++++++++++++++ .../search/query/QueryPhaseTests.java | 65 ++++++-- .../index/shard/IndexShardTestCase.java | 6 +- .../java/org/elasticsearch/node/MockNode.java | 5 +- .../search/MockSearchService.java | 2 +- .../elasticsearch/test/ESIntegTestCase.java | 6 + .../elasticsearch/test/TestSearchContext.java | 8 +- .../threadpool/TestThreadPool.java | 6 +- 26 files changed, 556 insertions(+), 48 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/action/search/SearchExecutionStatsCollector.java create mode 100644 core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java create mode 100644 core/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchActionListener.java b/core/src/main/java/org/elasticsearch/action/search/SearchActionListener.java index 67de87b1bb1..d34c8c61d43 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchActionListener.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchActionListener.java @@ -27,6 +27,7 @@ import org.elasticsearch.search.SearchShardTarget; * received by this listener. */ abstract class SearchActionListener implements ActionListener { + private final int requestIndex; private final SearchShardTarget searchShardTarget; diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchExecutionStatsCollector.java b/core/src/main/java/org/elasticsearch/action/search/SearchExecutionStatsCollector.java new file mode 100644 index 00000000000..72c3d5eaab6 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/search/SearchExecutionStatsCollector.java @@ -0,0 +1,75 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.node.ResponseCollectorService; +import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.transport.Transport; + +import java.util.Objects; +import java.util.function.BiFunction; + +/** + * A wrapper of search action listeners (search results) that unwraps the query + * result to get the piggybacked queue size and service time EWMA, adding those + * values to the coordinating nodes' {@code ResponseCollectorService}. + */ +public final class SearchExecutionStatsCollector implements ActionListener { + + private final ActionListener listener; + private final String nodeId; + private final ResponseCollectorService collector; + private final long startNanos; + + SearchExecutionStatsCollector(ActionListener listener, + ResponseCollectorService collector, + String nodeId) { + this.listener = Objects.requireNonNull(listener, "listener cannot be null"); + this.collector = Objects.requireNonNull(collector, "response collector cannot be null"); + this.startNanos = System.nanoTime(); + this.nodeId = nodeId; + } + + public static BiFunction makeWrapper(ResponseCollectorService service) { + return (connection, originalListener) -> new SearchExecutionStatsCollector(originalListener, service, connection.getNode().getId()); + } + + @Override + public void onResponse(SearchPhaseResult response) { + QuerySearchResult queryResult = response.queryResult(); + if (nodeId != null && queryResult != null) { + final long serviceTimeEWMA = queryResult.serviceTimeEWMA(); + final int queueSize = queryResult.nodeQueueSize(); + final long responseDuration = System.nanoTime() - startNanos; + // EWMA/queue size may be -1 if the query node doesn't support capturing it + if (serviceTimeEWMA > 0 && queueSize > 0) { + collector.addNodeStatistics(nodeId, queueSize, responseDuration, serviceTimeEWMA); + } + } + listener.onResponse(response); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index d9c60438efc..5edfbbedb03 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -56,6 +56,7 @@ import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.function.BiFunction; import java.util.function.Supplier; /** @@ -77,10 +78,13 @@ public class SearchTransportService extends AbstractComponent { public static final String QUERY_CAN_MATCH_NAME = "indices:data/read/search[can_match]"; private final TransportService transportService; + private final BiFunction responseWrapper; - public SearchTransportService(Settings settings, TransportService transportService) { + public SearchTransportService(Settings settings, TransportService transportService, + BiFunction responseWrapper) { super(settings); this.transportService = transportService; + this.responseWrapper = responseWrapper; } public void sendFreeContext(Transport.Connection connection, final long contextId, OriginalIndices originalIndices) { @@ -135,8 +139,10 @@ public class SearchTransportService extends AbstractComponent { // this used to be the QUERY_AND_FETCH which doesn't exist anymore. final boolean fetchDocuments = request.numberOfShards() == 1; Supplier supplier = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new; + + final ActionListener handler = responseWrapper.apply(connection, listener); transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task, - new ActionListenerResponseHandler<>(listener, supplier)); + new ActionListenerResponseHandler<>(handler, supplier)); } public void sendExecuteQuery(Transport.Connection connection, final QuerySearchRequest request, SearchTask task, diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java index c24b6899bcc..1f694d73fa7 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java @@ -141,6 +141,13 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto return executionEWMA.getAverage(); } + /** + * Returns the current queue size (operations that are queued) + */ + public int getCurrentQueueSize() { + return workQueue.size(); + } + @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 6b027339c9d..8637b79ca35 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -291,6 +291,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl persistMetadata(path, indexSettings, shardRouting, null, logger); } + public ThreadPool getThreadPool() { + return this.threadPool; + } + public Store store() { return this.store; } diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 0b0f1a6aaa7..201513373b8 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -29,6 +29,7 @@ import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionModule; import org.elasticsearch.action.GenericAction; +import org.elasticsearch.action.search.SearchExecutionStatsCollector; import org.elasticsearch.action.search.SearchPhaseController; import org.elasticsearch.action.search.SearchTransportService; import org.elasticsearch.action.support.TransportAction; @@ -423,8 +424,9 @@ public class Node implements Closeable { final Transport transport = networkModule.getTransportSupplier().get(); final TransportService transportService = newTransportService(settings, transport, threadPool, networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings()); - final SearchTransportService searchTransportService = new SearchTransportService(settings, - transportService); + final ResponseCollectorService responseCollectorService = new ResponseCollectorService(this.settings, clusterService); + final SearchTransportService searchTransportService = new SearchTransportService(settings, transportService, + SearchExecutionStatsCollector.makeWrapper(responseCollectorService)); final Consumer httpBind; final HttpServerTransport httpServerTransport; if (networkModule.isHttpEnabled()) { @@ -469,7 +471,8 @@ public class Node implements Closeable { b.bind(MetaStateService.class).toInstance(metaStateService); b.bind(IndicesService.class).toInstance(indicesService); b.bind(SearchService.class).toInstance(newSearchService(clusterService, indicesService, - threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase())); + threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(), + responseCollectorService)); b.bind(SearchTransportService.class).toInstance(searchTransportService); b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings, bigArrays, scriptModule.getScriptService())); @@ -897,8 +900,9 @@ public class Node implements Closeable { */ protected SearchService newSearchService(ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, - FetchPhase fetchPhase) { - return new SearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase); + FetchPhase fetchPhase, ResponseCollectorService responseCollectorService) { + return new SearchService(clusterService, indicesService, threadPool, + scriptService, bigArrays, fetchPhase, responseCollectorService); } /** diff --git a/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java b/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java new file mode 100644 index 00000000000..1afbd3b2997 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/node/ResponseCollectorService.java @@ -0,0 +1,144 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.node; + +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.ExponentiallyWeightedMovingAverage; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; + +/** + * Collects statistics about queue size, response time, and service time of + * tasks executed on each node, making the EWMA of the values available to the + * coordinating node. + */ +public final class ResponseCollectorService extends AbstractComponent implements ClusterStateListener { + + private static final double ALPHA = 0.3; + + private final ConcurrentMap nodeIdToStats = ConcurrentCollections.newConcurrentMap(); + + public ResponseCollectorService(Settings settings, ClusterService clusterService) { + super(settings); + clusterService.addListener(this); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (event.nodesRemoved()) { + for (DiscoveryNode removedNode : event.nodesDelta().removedNodes()) { + removeNode(removedNode.getId()); + } + } + } + + void removeNode(String nodeId) { + nodeIdToStats.remove(nodeId); + } + + public void addNodeStatistics(String nodeId, int queueSize, long responseTimeNanos, long avgServiceTimeNanos) { + NodeStatistics nodeStats = nodeIdToStats.get(nodeId); + nodeIdToStats.compute(nodeId, (id, ns) -> { + if (ns == null) { + ExponentiallyWeightedMovingAverage queueEWMA = new ExponentiallyWeightedMovingAverage(ALPHA, queueSize); + ExponentiallyWeightedMovingAverage responseEWMA = new ExponentiallyWeightedMovingAverage(ALPHA, responseTimeNanos); + NodeStatistics newStats = new NodeStatistics(nodeId, queueEWMA, responseEWMA, avgServiceTimeNanos); + return newStats; + } else { + ns.queueSize.addValue((double) queueSize); + ns.responseTime.addValue((double) responseTimeNanos); + ns.serviceTime = avgServiceTimeNanos; + return ns; + } + }); + } + + public Map getAllNodeStatistics() { + // Transform the mutable object internally used for accounting into the computed version + Map nodeStats = new HashMap<>(nodeIdToStats.size()); + nodeIdToStats.forEach((k, v) -> { + nodeStats.put(k, new ComputedNodeStats(v)); + }); + return nodeStats; + } + + /** + * Struct-like class encapsulating a point-in-time snapshot of a particular + * node's statistics. This includes the EWMA of queue size, response time, + * and service time. + */ + public static class ComputedNodeStats { + public final String nodeId; + public final double queueSize; + public final double responseTime; + public final double serviceTime; + + ComputedNodeStats(NodeStatistics nodeStats) { + this.nodeId = nodeStats.nodeId; + this.queueSize = nodeStats.queueSize.getAverage(); + this.responseTime = nodeStats.responseTime.getAverage(); + this.serviceTime = nodeStats.serviceTime; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("ComputedNodeStats["); + sb.append(nodeId).append("]("); + sb.append("queue: ").append(queueSize); + sb.append(", response time: ").append(responseTime); + sb.append(", service time: ").append(serviceTime); + sb.append(")"); + return sb.toString(); + } + } + + /** + * Class encapsulating a node's exponentially weighted queue size, response + * time, and service time, however, this class is private and intended only + * to be used for the internal accounting of {@code ResponseCollectorService}. + */ + private static class NodeStatistics { + final String nodeId; + final ExponentiallyWeightedMovingAverage queueSize; + final ExponentiallyWeightedMovingAverage responseTime; + double serviceTime; + + NodeStatistics(String nodeId, + ExponentiallyWeightedMovingAverage queueSizeEWMA, + ExponentiallyWeightedMovingAverage responseTimeEWMA, + double serviceTimeEWMA) { + this.nodeId = nodeId; + this.queueSize = queueSizeEWMA; + this.responseTime = responseTimeEWMA; + this.serviceTime = serviceTimeEWMA; + } + } +} diff --git a/core/src/main/java/org/elasticsearch/search/DefaultSearchContext.java b/core/src/main/java/org/elasticsearch/search/DefaultSearchContext.java index 8e0536adfb4..815971e27d4 100644 --- a/core/src/main/java/org/elasticsearch/search/DefaultSearchContext.java +++ b/core/src/main/java/org/elasticsearch/search/DefaultSearchContext.java @@ -51,6 +51,7 @@ import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.index.search.NestedHelper; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.similarity.SimilarityService; +import org.elasticsearch.node.ResponseCollectorService; import org.elasticsearch.search.aggregations.SearchContextAggregations; import org.elasticsearch.search.collapse.CollapseContext; import org.elasticsearch.search.dfs.DfsSearchResult; @@ -81,6 +82,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; final class DefaultSearchContext extends SearchContext { @@ -93,6 +95,7 @@ final class DefaultSearchContext extends SearchContext { private final BigArrays bigArrays; private final IndexShard indexShard; private final IndexService indexService; + private final ResponseCollectorService responseCollectorService; private final ContextIndexSearcher searcher; private final DfsSearchResult dfsResult; private final QuerySearchResult queryResult; @@ -147,6 +150,7 @@ final class DefaultSearchContext extends SearchContext { private final long originNanoTime = System.nanoTime(); private volatile long lastAccessTime = -1; private Profilers profilers; + private ExecutorService searchExecutor; private final Map searchExtBuilders = new HashMap<>(); private final Map, Collector> queryCollectors = new HashMap<>(); @@ -154,8 +158,8 @@ final class DefaultSearchContext extends SearchContext { private FetchPhase fetchPhase; DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget, Engine.Searcher engineSearcher, - IndexService indexService, IndexShard indexShard, - BigArrays bigArrays, Counter timeEstimateCounter, TimeValue timeout, FetchPhase fetchPhase) { + IndexService indexService, IndexShard indexShard, BigArrays bigArrays, Counter timeEstimateCounter, + TimeValue timeout, FetchPhase fetchPhase, ResponseCollectorService responseCollectorService) { this.id = id; this.request = request; this.fetchPhase = fetchPhase; @@ -169,6 +173,7 @@ final class DefaultSearchContext extends SearchContext { this.fetchResult = new FetchSearchResult(id, shardTarget); this.indexShard = indexShard; this.indexService = indexService; + this.responseCollectorService = responseCollectorService; this.searcher = new ContextIndexSearcher(engineSearcher, indexService.cache().query(), indexShard.getQueryCachingPolicy()); this.timeEstimateCounter = timeEstimateCounter; this.timeout = timeout; diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index 2c5e6c49b41..6f831b6099c 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -52,6 +52,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.SearchOperationListener; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; +import org.elasticsearch.node.ResponseCollectorService; import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.SearchScript; import org.elasticsearch.search.aggregations.AggregationInitializationException; @@ -96,6 +97,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; @@ -131,6 +133,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private final ScriptService scriptService; + private final ResponseCollectorService responseCollectorService; + private final BigArrays bigArrays; private final DfsPhase dfsPhase = new DfsPhase(); @@ -152,12 +156,14 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private final ConcurrentMapLong activeContexts = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency(); public SearchService(ClusterService clusterService, IndicesService indicesService, - ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, FetchPhase fetchPhase) { + ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, FetchPhase fetchPhase, + ResponseCollectorService responseCollectorService) { super(clusterService.getSettings()); this.threadPool = threadPool; this.clusterService = clusterService; this.indicesService = indicesService; this.scriptService = scriptService; + this.responseCollectorService = responseCollectorService; this.bigArrays = bigArrays; this.queryPhase = new QueryPhase(settings); this.fetchPhase = fetchPhase; @@ -520,7 +526,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv Engine.Searcher engineSearcher = searcher == null ? indexShard.acquireSearcher("search") : searcher; final DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, - engineSearcher, indexService, indexShard, bigArrays, threadPool.estimatedTimeInMillisCounter(), timeout, fetchPhase); + engineSearcher, indexService, indexShard, bigArrays, threadPool.estimatedTimeInMillisCounter(), timeout, fetchPhase, + responseCollectorService); boolean success = false; try { // we clone the query shard context here just for rewriting otherwise we @@ -813,6 +820,10 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv return this.activeContexts.size(); } + public ResponseCollectorService getResponseCollectorService() { + return this.responseCollectorService; + } + class Reaper implements Runnable { @Override public void run() { diff --git a/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java index 82e572a180e..a944f0d1a51 100644 --- a/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -38,6 +38,8 @@ import org.apache.lucene.search.TopDocs; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; +import org.elasticsearch.common.util.concurrent.QueueResizingEsThreadPoolExecutor; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.SearchPhase; import org.elasticsearch.search.SearchService; @@ -50,8 +52,10 @@ import org.elasticsearch.search.profile.query.InternalProfileCollector; import org.elasticsearch.search.rescore.RescorePhase; import org.elasticsearch.search.sort.SortAndFormats; import org.elasticsearch.search.suggest.SuggestPhase; +import org.elasticsearch.threadpool.ThreadPool; import java.util.LinkedList; +import java.util.concurrent.ExecutorService; import static org.elasticsearch.search.query.QueryCollectorContext.createCancellableCollectorContext; import static org.elasticsearch.search.query.QueryCollectorContext.createEarlySortingTerminationCollectorContext; @@ -238,6 +242,13 @@ public class QueryPhase implements SearchPhase { for (QueryCollectorContext ctx : collectors) { ctx.postProcess(result, shouldCollect); } + EsThreadPoolExecutor executor = (EsThreadPoolExecutor) + searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);; + if (executor instanceof QueueResizingEsThreadPoolExecutor) { + QueueResizingEsThreadPoolExecutor rExecutor = (QueueResizingEsThreadPoolExecutor) executor; + queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize()); + queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA()); + } if (searchContext.getProfilers() != null) { ProfileShardResult shardResults = SearchProfileShardResults.buildShardResults(searchContext.getProfilers()); result.profileResults(shardResults); diff --git a/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 8549f42040f..feb3e7876a3 100644 --- a/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -21,6 +21,7 @@ package org.elasticsearch.search.query; import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.TopDocs; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.DocValueFormat; @@ -58,6 +59,8 @@ public final class QuerySearchResult extends SearchPhaseResult { private boolean hasScoreDocs; private long totalHits; private float maxScore; + private long serviceTimeEWMA = -1; + private int nodeQueueSize = -1; public QuerySearchResult() { } @@ -228,6 +231,24 @@ public final class QuerySearchResult extends SearchPhaseResult { return this; } + public long serviceTimeEWMA() { + return this.serviceTimeEWMA; + } + + public QuerySearchResult serviceTimeEWMA(long serviceTimeEWMA) { + this.serviceTimeEWMA = serviceTimeEWMA; + return this; + } + + public int nodeQueueSize() { + return this.nodeQueueSize; + } + + public QuerySearchResult nodeQueueSize(int nodeQueueSize) { + this.nodeQueueSize = nodeQueueSize; + return this; + } + /** * Returns true if this result has any suggest score docs */ @@ -278,6 +299,13 @@ public final class QuerySearchResult extends SearchPhaseResult { terminatedEarly = in.readOptionalBoolean(); profileShardResults = in.readOptionalWriteable(ProfileShardResult::new); hasProfileResults = profileShardResults != null; + if (in.getVersion().onOrAfter(Version.V_6_0_0_beta1)) { + serviceTimeEWMA = in.readZLong(); + nodeQueueSize = in.readInt(); + } else { + serviceTimeEWMA = -1; + nodeQueueSize = -1; + } } @Override @@ -315,6 +343,10 @@ public final class QuerySearchResult extends SearchPhaseResult { out.writeBoolean(searchTimedOut); out.writeOptionalBoolean(terminatedEarly); out.writeOptionalWriteable(profileShardResults); + if (out.getVersion().onOrAfter(Version.V_6_0_0_beta1)) { + out.writeZLong(serviceTimeEWMA); + out.writeInt(nodeQueueSize); + } } public long getTotalHits() { diff --git a/core/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index 8445fb08fab..e6a17446476 100644 --- a/core/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -64,7 +64,7 @@ public class AbstractSearchAsyncActionTests extends ESTestCase { Collections.singletonMap("foo", new AliasFilter(new MatchAllQueryBuilder())), Collections.singletonMap("foo", 2.0f), null, new SearchRequest(), null, new GroupShardsIterator<>(Collections.singletonList( new SearchShardIterator(null, null, Collections.emptyList(), null))), timeProvider, 0, null, - new InitialSearchPhase.ArraySearchPhaseResults<>(10)) { + new InitialSearchPhase.ArraySearchPhaseResults<>(10)) { @Override protected SearchPhase getNextPhase(final SearchPhaseResults results, final SearchPhaseContext context) { return null; diff --git a/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java index 9993d8af57c..87cebc957c6 100644 --- a/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -57,7 +57,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase { final boolean shard2 = randomBoolean(); SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override public void sendCanMatch(Transport.Connection connection, ShardSearchTransportRequest request, SearchTask task, @@ -105,7 +105,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase { public void testOldNodesTriggerException() { SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null); + Settings.builder().put("search.remote.connect", false).build(), null, null); DiscoveryNode node = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), VersionUtils.randomVersionBetween(random(), VersionUtils.getFirstVersion(), VersionUtils.getPreviousVersion(Version.V_5_6_0))); SearchAsyncActionTests.MockConnection mockConnection = new SearchAsyncActionTests.MockConnection(node); @@ -124,7 +124,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase { lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode)); final boolean shard1 = randomBoolean(); SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override public void sendCanMatch(Transport.Connection connection, ShardSearchTransportRequest request, SearchTask task, diff --git a/core/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java b/core/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java index 5037ffe03f9..bd1d6a85b09 100644 --- a/core/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java @@ -70,7 +70,7 @@ public class ClearScrollControllerTests extends ESTestCase { } }; List nodesInvoked = new CopyOnWriteArrayList<>(); - SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null) { + SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null, null) { @Override public void sendClearAllScrollContexts(Transport.Connection connection, ActionListener listener) { nodesInvoked.add(connection.getNode()); @@ -135,7 +135,7 @@ public class ClearScrollControllerTests extends ESTestCase { } }; List nodesInvoked = new CopyOnWriteArrayList<>(); - SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null) { + SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null, null) { @Override public void sendFreeContext(Transport.Connection connection, long contextId, @@ -213,7 +213,7 @@ public class ClearScrollControllerTests extends ESTestCase { } }; List nodesInvoked = new CopyOnWriteArrayList<>(); - SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null) { + SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null, null) { @Override public void sendFreeContext(Transport.Connection connection, long contextId, diff --git a/core/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java b/core/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java index 185dae410d5..15d24b85b49 100644 --- a/core/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java @@ -58,7 +58,7 @@ public class DfsQueryPhaseTests extends ESTestCase { SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task, @@ -115,7 +115,7 @@ public class DfsQueryPhaseTests extends ESTestCase { SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task, @@ -171,7 +171,7 @@ public class DfsQueryPhaseTests extends ESTestCase { SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task, diff --git a/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java b/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java index b27c7ec3955..81a6359997d 100644 --- a/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java @@ -70,7 +70,7 @@ public class ExpandSearchPhaseTests extends ESTestCase { .collect(Collectors.toList())))); mockSearchPhaseContext.getRequest().source().query(originalQuery); mockSearchPhaseContext.searchTransport = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener listener) { @@ -144,7 +144,7 @@ public class ExpandSearchPhaseTests extends ESTestCase { mockSearchPhaseContext.getRequest().source(new SearchSourceBuilder() .collapse(new CollapseBuilder("someField").setInnerHits(new InnerHitBuilder().setName("foobarbaz")))); mockSearchPhaseContext.searchTransport = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener listener) { @@ -185,7 +185,7 @@ public class ExpandSearchPhaseTests extends ESTestCase { public void testSkipPhase() throws IOException { MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1); mockSearchPhaseContext.searchTransport = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener listener) { @@ -216,7 +216,7 @@ public class ExpandSearchPhaseTests extends ESTestCase { public void testSkipExpandCollapseNoHits() throws IOException { MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1); mockSearchPhaseContext.searchTransport = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener listener) { diff --git a/core/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java b/core/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java index 6e5b85745bc..bd38a420f07 100644 --- a/core/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java @@ -103,7 +103,7 @@ public class FetchSearchPhaseTests extends ESTestCase { results.consumeResult(queryResult); SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task, SearchActionListener listener) { @@ -157,7 +157,7 @@ public class FetchSearchPhaseTests extends ESTestCase { results.consumeResult(queryResult); SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task, SearchActionListener listener) { @@ -210,7 +210,7 @@ public class FetchSearchPhaseTests extends ESTestCase { results.consumeResult(queryResult); } SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task, SearchActionListener listener) { @@ -271,7 +271,7 @@ public class FetchSearchPhaseTests extends ESTestCase { results.consumeResult(queryResult); AtomicInteger numFetches = new AtomicInteger(0); SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task, SearchActionListener listener) { @@ -324,7 +324,7 @@ public class FetchSearchPhaseTests extends ESTestCase { results.consumeResult(queryResult); SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null) { + Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task, SearchActionListener listener) { diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index a2b438a66e4..3ee681383cd 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -86,7 +86,7 @@ public class SearchAsyncActionTests extends ESTestCase { } } - SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null); + SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null, null); Map lookup = new HashMap<>(); Map seenShard = new ConcurrentHashMap<>(); lookup.put(primaryNode.getId(), new MockConnection(primaryNode)); @@ -173,7 +173,7 @@ public class SearchAsyncActionTests extends ESTestCase { GroupShardsIterator shardsIter = getShardsIter("idx", new OriginalIndices(new String[]{"idx"}, IndicesOptions.strictExpandOpenAndForbidClosed()), 10, randomBoolean(), primaryNode, replicaNode); - SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null); + SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null, null); Map lookup = new HashMap<>(); Map seenShard = new ConcurrentHashMap<>(); lookup.put(primaryNode.getId(), new MockConnection(primaryNode)); @@ -271,7 +271,7 @@ public class SearchAsyncActionTests extends ESTestCase { new OriginalIndices(new String[]{"idx"}, IndicesOptions.strictExpandOpenAndForbidClosed()), randomIntBetween(1, 10), randomBoolean(), primaryNode, replicaNode); AtomicInteger numFreedContext = new AtomicInteger(); - SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null) { + SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null, null) { @Override public void sendFreeContext(Transport.Connection connection, long contextId, OriginalIndices originalIndices) { numFreedContext.incrementAndGet(); diff --git a/core/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java b/core/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java new file mode 100644 index 00000000000..2ff3d156247 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java @@ -0,0 +1,140 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.node; + +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; + +public class ResponseCollectorServiceTests extends ESTestCase { + + private ClusterService clusterService; + private ResponseCollectorService collector; + private ThreadPool threadpool; + + @Before + public void setUp() throws Exception { + super.setUp(); + threadpool = new TestThreadPool("response_collector_tests"); + clusterService = new ClusterService(Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadpool); + collector = new ResponseCollectorService(Settings.EMPTY, clusterService); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + threadpool.shutdownNow(); + } + + public void testNodeStats() throws Exception { + collector.addNodeStatistics("node1", 1, 100, 10); + Map nodeStats = collector.getAllNodeStatistics(); + assertTrue(nodeStats.containsKey("node1")); + assertThat(nodeStats.get("node1").queueSize, equalTo(1.0)); + assertThat(nodeStats.get("node1").responseTime, equalTo(100.0)); + assertThat(nodeStats.get("node1").serviceTime, equalTo(10.0)); + } + + /* + * Test that concurrently adding values and removing nodes does not cause exceptions + */ + public void testConcurrentAddingAndRemoving() throws Exception { + String[] nodes = new String[] {"a", "b", "c", "d"}; + + final CountDownLatch latch = new CountDownLatch(1); + + Runnable f = () -> { + try { + latch.await(); + } catch (InterruptedException e) { + fail("should not be interrupted"); + } + for (int i = 0; i < randomIntBetween(100, 200); i++) { + if (randomBoolean()) { + collector.removeNode(randomFrom(nodes)); + } + collector.addNodeStatistics(randomFrom(nodes), randomIntBetween(1,100), randomIntBetween(1,100), randomIntBetween(1,100)); + } + }; + + Thread t1 = new Thread(f); + Thread t2 = new Thread(f); + Thread t3 = new Thread(f); + Thread t4 = new Thread(f); + + t1.start(); + t2.start(); + t3.start(); + t4.start(); + latch.countDown(); + t1.join(); + t2.join(); + t3.join(); + t4.join(); + + final Map nodeStats = collector.getAllNodeStatistics(); + logger.info("--> got stats: {}", nodeStats); + for (String nodeId : nodes) { + if (nodeStats.containsKey(nodeId)) { + assertThat(nodeStats.get(nodeId).queueSize, greaterThan(0.0)); + assertThat(nodeStats.get(nodeId).responseTime, greaterThan(0.0)); + assertThat(nodeStats.get(nodeId).serviceTime, greaterThan(0.0)); + } + } + } + + public void testNodeRemoval() throws Exception { + collector.addNodeStatistics("node1", randomIntBetween(1,100), randomIntBetween(1,100), randomIntBetween(1,100)); + collector.addNodeStatistics("node2", randomIntBetween(1,100), randomIntBetween(1,100), randomIntBetween(1,100)); + + ClusterState previousState = ClusterState.builder(new ClusterName("cluster")).nodes(DiscoveryNodes.builder() + .add(DiscoveryNode.createLocal(Settings.EMPTY, new TransportAddress(TransportAddress.META_ADDRESS, 9200), "node1")) + .add(DiscoveryNode.createLocal(Settings.EMPTY, new TransportAddress(TransportAddress.META_ADDRESS, 9201), "node2"))) + .build(); + ClusterState newState = ClusterState.builder(previousState).nodes(DiscoveryNodes.builder(previousState.nodes()) + .remove("node2")).build(); + ClusterChangedEvent event = new ClusterChangedEvent("test", newState, previousState); + + collector.clusterChanged(event); + final Map nodeStats = collector.getAllNodeStatistics(); + assertTrue(nodeStats.containsKey("node1")); + assertFalse(nodeStats.containsKey("node2")); + } +} diff --git a/core/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java b/core/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java index 016406c6129..3055cda803d 100644 --- a/core/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java @@ -49,11 +49,13 @@ import org.apache.lucene.search.TotalHitCountCollector; import org.apache.lucene.search.Weight; import org.apache.lucene.store.Directory; import org.elasticsearch.action.search.SearchTask; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.ParsedQuery; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.internal.ScrollContext; import org.elasticsearch.search.sort.SortAndFormats; -import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.TestSearchContext; import java.io.IOException; @@ -62,14 +64,35 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThan; +import static org.mockito.Mockito.mock; -public class QueryPhaseTests extends ESTestCase { +public class QueryPhaseTests extends IndexShardTestCase { + + private IndexShard indexShard; + + @Override + public Settings threadPoolSettings() { + return Settings.builder().put(super.threadPoolSettings()).put("thread_pool.search.min_queue_size", 10).build(); + } + + @Override + public void setUp() throws Exception { + super.setUp(); + indexShard = newShard(true); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + closeShards(indexShard); + } private void countTestCase(Query query, IndexReader reader, boolean shouldCollect) throws Exception { - TestSearchContext context = new TestSearchContext(null); + TestSearchContext context = new TestSearchContext(null, indexShard); context.parsedQuery(new ParsedQuery(query)); context.setSize(0); context.setTask(new SearchTask(123L, "", "", "", null)); @@ -139,7 +162,7 @@ public class QueryPhaseTests extends ESTestCase { } public void testPostFilterDisablesCountOptimization() throws Exception { - TestSearchContext context = new TestSearchContext(null); + TestSearchContext context = new TestSearchContext(null, indexShard); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); context.setSize(0); context.setTask(new SearchTask(123L, "", "", "", null)); @@ -163,7 +186,7 @@ public class QueryPhaseTests extends ESTestCase { } public void testMinScoreDisablesCountOptimization() throws Exception { - TestSearchContext context = new TestSearchContext(null); + TestSearchContext context = new TestSearchContext(null, indexShard); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); context.setSize(0); context.setTask(new SearchTask(123L, "", "", "", null)); @@ -186,6 +209,30 @@ public class QueryPhaseTests extends ESTestCase { assertTrue(collected.get()); } + public void testQueryCapturesThreadPoolStats() throws Exception { + TestSearchContext context = new TestSearchContext(null, indexShard); + context.setTask(new SearchTask(123L, "", "", "", null)); + context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); + + Directory dir = newDirectory(); + IndexWriterConfig iwc = newIndexWriterConfig(); + RandomIndexWriter w = new RandomIndexWriter(random(), dir, iwc); + final int numDocs = scaledRandomIntBetween(100, 200); + for (int i = 0; i < numDocs; ++i) { + w.addDocument(new Document()); + } + w.close(); + IndexReader reader = DirectoryReader.open(dir); + IndexSearcher contextSearcher = new IndexSearcher(reader); + + QueryPhase.execute(context, contextSearcher, null); + QuerySearchResult results = context.queryResult(); + assertThat(results.serviceTimeEWMA(), greaterThan(0L)); + assertThat(results.nodeQueueSize(), greaterThanOrEqualTo(0)); + reader.close(); + dir.close(); + } + public void testInOrderScrollOptimization() throws Exception { Directory dir = newDirectory(); final Sort sort = new Sort(new SortField("rank", SortField.Type.INT)); @@ -206,7 +253,7 @@ public class QueryPhaseTests extends ESTestCase { } }; - TestSearchContext context = new TestSearchContext(null); + TestSearchContext context = new TestSearchContext(null, indexShard); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); ScrollContext scrollContext = new ScrollContext(); scrollContext.lastEmittedDoc = null; @@ -251,7 +298,7 @@ public class QueryPhaseTests extends ESTestCase { w.addDocument(doc); } w.close(); - TestSearchContext context = new TestSearchContext(null); + TestSearchContext context = new TestSearchContext(null, indexShard); context.setTask(new SearchTask(123L, "", "", "", null)); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); context.terminateAfter(1); @@ -360,7 +407,7 @@ public class QueryPhaseTests extends ESTestCase { } w.close(); - TestSearchContext context = new TestSearchContext(null); + TestSearchContext context = new TestSearchContext(null, indexShard); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); context.setSize(1); context.setTask(new SearchTask(123L, "", "", "", null)); @@ -454,7 +501,7 @@ public class QueryPhaseTests extends ESTestCase { } w.close(); - TestSearchContext context = new TestSearchContext(null); + TestSearchContext context = new TestSearchContext(null, indexShard); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); ScrollContext scrollContext = new ScrollContext(); scrollContext.lastEmittedDoc = null; diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 0497b7702cb..d0ce47c97fc 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -118,7 +118,7 @@ public abstract class IndexShardTestCase extends ESTestCase { @Override public void setUp() throws Exception { super.setUp(); - threadPool = new TestThreadPool(getClass().getName()); + threadPool = new TestThreadPool(getClass().getName(), threadPoolSettings()); primaryTerm = randomIntBetween(1, 100); // use random but fixed term for creating shards } @@ -131,6 +131,10 @@ public abstract class IndexShardTestCase extends ESTestCase { } } + public Settings threadPoolSettings() { + return Settings.EMPTY; + } + private Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException { final ShardId shardId = shardPath.getShardId(); final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) { diff --git a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java index 2e0aa98a920..311b1778309 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java +++ b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java @@ -89,9 +89,10 @@ public class MockNode extends Node { @Override protected SearchService newSearchService(ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, - FetchPhase fetchPhase) { + FetchPhase fetchPhase, ResponseCollectorService responseCollectorService) { if (getPluginsService().filterPlugins(MockSearchService.TestPlugin.class).isEmpty()) { - return super.newSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase); + return super.newSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase, + responseCollectorService); } return new MockSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase); } diff --git a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java index bf300889cd5..b9d9ff3cfc9 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java +++ b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java @@ -69,7 +69,7 @@ public class MockSearchService extends SearchService { public MockSearchService(ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, FetchPhase fetchPhase) { - super(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase); + super(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase, null); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index f6908db8119..0eaa3a57df0 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -1710,6 +1710,12 @@ public abstract class ESIntegTestCase extends ESTestCase { .put(IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING.getKey(), nodeOrdinal % 2 == 0) // wait short time for other active shards before actually deleting, default 30s not needed in tests .put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT.getKey(), new TimeValue(1, TimeUnit.SECONDS)); + if (rarely()) { + // Sometimes adjust the minimum search thread pool size, causing + // QueueResizingEsThreadPoolExecutor to be used instead of a regular + // fixed thread pool + builder.put("thread_pool.search.min_queue_size", 100); + } return builder.build(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java b/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java index 038a21c28e8..390c9f9772e 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java +++ b/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java @@ -24,8 +24,10 @@ import org.apache.lucene.search.Query; import org.apache.lucene.util.Counter; import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.cache.bitset.BitsetFilterCache; import org.elasticsearch.index.engine.Engine; @@ -106,12 +108,16 @@ public class TestSearchContext extends SearchContext { } public TestSearchContext(QueryShardContext queryShardContext) { + this(queryShardContext, null); + } + + public TestSearchContext(QueryShardContext queryShardContext, IndexShard indexShard) { this.bigArrays = null; this.indexService = null; this.indexFieldDataService = null; this.threadPool = null; this.fixedBitSetFilterCache = null; - this.indexShard = null; + this.indexShard = indexShard; this.queryShardContext = queryShardContext; } diff --git a/test/framework/src/main/java/org/elasticsearch/threadpool/TestThreadPool.java b/test/framework/src/main/java/org/elasticsearch/threadpool/TestThreadPool.java index 0d525f7f59a..f40820cc877 100644 --- a/test/framework/src/main/java/org/elasticsearch/threadpool/TestThreadPool.java +++ b/test/framework/src/main/java/org/elasticsearch/threadpool/TestThreadPool.java @@ -25,7 +25,11 @@ import org.elasticsearch.node.Node; public class TestThreadPool extends ThreadPool { public TestThreadPool(String name) { - super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).build()); + this(name, Settings.EMPTY); + } + + public TestThreadPool(String name, Settings settings) { + super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).put(settings).build()); } }