Register data node stats from info carried back in search responses (#25430)
* Register data node stats from info carried back in search responses

This is part of #24915: we now calculate the EWMA of service time for tasks in the search threadpool and send it, along with the current queue size, back to the coordinating node. The coordinating node now tracks this information for each node in the cluster, and it will be used in the future for determining the best replica to route a search request to. This change has no user-visible difference.

* Move response time timing into ResponseListenerWrapper
* Move ResponseListenerWrapper to ActionListener instead of SearchActionListener. Also removes the logger
* Move `requestIndex` back to private
* De-guice-ify ResponseCollectorService \o/
* Undo all changes to SearchQueryThenFetchAsyncAction
* Remove unneeded response collector from TransportSearchAction
* Undo all changes to SearchDfsQueryThenFetchAsyncAction
* Completely rewrite the inside of ResponseCollectorService's record keeping
* Documentation and cleanups for ResponseCollectorService
* Add unit test for collection of queue size and service time
* Fix Guice construction error
* Add basic unit tests for ResponseCollectorService
* Fix version constant for the master merge
* Fix test compilation after master merge
* Add a test for node removal on cluster changed event
* Remove integration test as there are now unit tests
* Rename ResponseListenerWrapper -> SearchExecutionStatsCollector
* Fix line-length
* Make classes private and final where appropriate
* Pass nodeId into SearchExecutionStatsCollector and use only ActionListener
* Get nodeId from connection so searchShardTarget can be private
* Remove threadpool from SearchContext, get it from IndexShard instead
* Add missing import
* Use BiFunction for responseWrapper rather than passing in collector service
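For orientation, here is a minimal, self-contained Java sketch of the bookkeeping this change introduces: each data node piggybacks its search-threadpool queue size and a service-time EWMA on the query result, and the coordinating node folds those samples into per-node moving averages (the diff below uses ExponentiallyWeightedMovingAverage with ALPHA = 0.3). The class and method names in this sketch are illustrative only, not the actual Elasticsearch API.

// Illustrative sketch only: mirrors the shape of the change, not the real ES classes.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class EwmaSketch {

    /** Exponentially weighted moving average: avg = alpha * sample + (1 - alpha) * avg. */
    static final class Ewma {
        private final double alpha;
        private double average;

        Ewma(double alpha, double initialValue) {
            this.alpha = alpha;
            this.average = initialValue;
        }

        void addValue(double sample) {
            average = alpha * sample + (1 - alpha) * average;
        }

        double getAverage() {
            return average;
        }
    }

    private static final double ALPHA = 0.3; // smoothing factor used in the diff below

    // Per-node stats kept on the coordinating node, keyed by node id.
    private final Map<String, Ewma> queueSize = new ConcurrentHashMap<>();
    private final Map<String, Ewma> responseTime = new ConcurrentHashMap<>();

    /** Called when a query response carrying piggybacked stats arrives. */
    void onQueryResponse(String nodeId, int reportedQueueSize, long responseTimeNanos) {
        queueSize.computeIfAbsent(nodeId, id -> new Ewma(ALPHA, reportedQueueSize))
                 .addValue(reportedQueueSize);
        responseTime.computeIfAbsent(nodeId, id -> new Ewma(ALPHA, responseTimeNanos))
                    .addValue(responseTimeNanos);
    }

    public static void main(String[] args) {
        EwmaSketch stats = new EwmaSketch();
        stats.onQueryResponse("node1", 5, 1_000_000L);
        stats.onQueryResponse("node1", 15, 3_000_000L);
        // Second sample pulls the average toward 15: 0.3 * 15 + 0.7 * 5 = 8.0
        System.out.println(stats.queueSize.get("node1").getAverage()); // 8.0
    }
}

With alpha = 0.3, each new sample moves the average 30% of the way toward the sample, so recent queue pressure is weighted heavily without discarding history.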
This commit is contained in: parent cb4eebcd6a · commit 610ba7e427
SearchActionListener.java
@@ -27,6 +27,7 @@ import org.elasticsearch.search.SearchShardTarget;
 * received by this listener.
 */
abstract class SearchActionListener<T extends SearchPhaseResult> implements ActionListener<T> {

    private final int requestIndex;
    private final SearchShardTarget searchShardTarget;

SearchExecutionStatsCollector.java (new file)
@@ -0,0 +1,75 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.action.search;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.node.ResponseCollectorService;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.transport.Transport;

import java.util.Objects;
import java.util.function.BiFunction;

/**
 * A wrapper of search action listeners (search results) that unwraps the query
 * result to get the piggybacked queue size and service time EWMA, adding those
 * values to the coordinating nodes' {@code ResponseCollectorService}.
 */
public final class SearchExecutionStatsCollector implements ActionListener<SearchPhaseResult> {

    private final ActionListener<SearchPhaseResult> listener;
    private final String nodeId;
    private final ResponseCollectorService collector;
    private final long startNanos;

    SearchExecutionStatsCollector(ActionListener<SearchPhaseResult> listener,
                                  ResponseCollectorService collector,
                                  String nodeId) {
        this.listener = Objects.requireNonNull(listener, "listener cannot be null");
        this.collector = Objects.requireNonNull(collector, "response collector cannot be null");
        this.startNanos = System.nanoTime();
        this.nodeId = nodeId;
    }

    public static BiFunction<Transport.Connection, SearchActionListener, ActionListener> makeWrapper(ResponseCollectorService service) {
        return (connection, originalListener) -> new SearchExecutionStatsCollector(originalListener, service, connection.getNode().getId());
    }

    @Override
    public void onResponse(SearchPhaseResult response) {
        QuerySearchResult queryResult = response.queryResult();
        if (nodeId != null && queryResult != null) {
            final long serviceTimeEWMA = queryResult.serviceTimeEWMA();
            final int queueSize = queryResult.nodeQueueSize();
            final long responseDuration = System.nanoTime() - startNanos;
            // EWMA/queue size may be -1 if the query node doesn't support capturing it
            if (serviceTimeEWMA > 0 && queueSize > 0) {
                collector.addNodeStatistics(nodeId, queueSize, responseDuration, serviceTimeEWMA);
            }
        }
        listener.onResponse(response);
    }

    @Override
    public void onFailure(Exception e) {
        listener.onFailure(e);
    }
}

SearchTransportService.java
@@ -56,6 +56,7 @@ import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
+import java.util.function.BiFunction;
import java.util.function.Supplier;

/**
@@ -77,10 +78,13 @@ public class SearchTransportService extends AbstractComponent {
    public static final String QUERY_CAN_MATCH_NAME = "indices:data/read/search[can_match]";

    private final TransportService transportService;
+   private final BiFunction<Transport.Connection, SearchActionListener, ActionListener> responseWrapper;

-   public SearchTransportService(Settings settings, TransportService transportService) {
+   public SearchTransportService(Settings settings, TransportService transportService,
+                                 BiFunction<Transport.Connection, SearchActionListener, ActionListener> responseWrapper) {
        super(settings);
        this.transportService = transportService;
+       this.responseWrapper = responseWrapper;
    }

    public void sendFreeContext(Transport.Connection connection, final long contextId, OriginalIndices originalIndices) {
@@ -135,8 +139,10 @@ public class SearchTransportService extends AbstractComponent {
        // this used to be the QUERY_AND_FETCH which doesn't exist anymore.
        final boolean fetchDocuments = request.numberOfShards() == 1;
        Supplier<SearchPhaseResult> supplier = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;
+
+       final ActionListener handler = responseWrapper.apply(connection, listener);
        transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task,
-           new ActionListenerResponseHandler<>(listener, supplier));
+           new ActionListenerResponseHandler<>(handler, supplier));
    }

    public void sendExecuteQuery(Transport.Connection connection, final QuerySearchRequest request, SearchTask task,

QueueResizingEsThreadPoolExecutor.java
@@ -141,6 +141,13 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecutor {
        return executionEWMA.getAverage();
    }

+   /**
+    * Returns the current queue size (operations that are queued)
+    */
+   public int getCurrentQueueSize() {
+       return workQueue.size();
+   }
+
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);

IndexShard.java
@@ -291,6 +291,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard {
        persistMetadata(path, indexSettings, shardRouting, null, logger);
    }

+   public ThreadPool getThreadPool() {
+       return this.threadPool;
+   }
+
    public Store store() {
        return this.store;
    }

Node.java
@@ -29,6 +29,7 @@ import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionModule;
import org.elasticsearch.action.GenericAction;
+import org.elasticsearch.action.search.SearchExecutionStatsCollector;
import org.elasticsearch.action.search.SearchPhaseController;
import org.elasticsearch.action.search.SearchTransportService;
import org.elasticsearch.action.support.TransportAction;
@@ -423,8 +424,9 @@ public class Node implements Closeable {
            final Transport transport = networkModule.getTransportSupplier().get();
            final TransportService transportService = newTransportService(settings, transport, threadPool,
                networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings());
-           final SearchTransportService searchTransportService = new SearchTransportService(settings,
-               transportService);
+           final ResponseCollectorService responseCollectorService = new ResponseCollectorService(this.settings, clusterService);
+           final SearchTransportService searchTransportService = new SearchTransportService(settings, transportService,
+               SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
            final Consumer<Binder> httpBind;
            final HttpServerTransport httpServerTransport;
            if (networkModule.isHttpEnabled()) {
@@ -469,7 +471,8 @@ public class Node implements Closeable {
                b.bind(MetaStateService.class).toInstance(metaStateService);
                b.bind(IndicesService.class).toInstance(indicesService);
                b.bind(SearchService.class).toInstance(newSearchService(clusterService, indicesService,
-                   threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase()));
+                   threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),
+                   responseCollectorService));
                b.bind(SearchTransportService.class).toInstance(searchTransportService);
                b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings, bigArrays,
                    scriptModule.getScriptService()));
@@ -897,8 +900,9 @@ public class Node implements Closeable {
     */
    protected SearchService newSearchService(ClusterService clusterService, IndicesService indicesService,
                                             ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays,
-                                            FetchPhase fetchPhase) {
-       return new SearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase);
+                                            FetchPhase fetchPhase, ResponseCollectorService responseCollectorService) {
+       return new SearchService(clusterService, indicesService, threadPool,
+           scriptService, bigArrays, fetchPhase, responseCollectorService);
    }

    /**

ResponseCollectorService.java (new file)
@@ -0,0 +1,144 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.node;

import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ExponentiallyWeightedMovingAverage;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

/**
 * Collects statistics about queue size, response time, and service time of
 * tasks executed on each node, making the EWMA of the values available to the
 * coordinating node.
 */
public final class ResponseCollectorService extends AbstractComponent implements ClusterStateListener {

    private static final double ALPHA = 0.3;

    private final ConcurrentMap<String, NodeStatistics> nodeIdToStats = ConcurrentCollections.newConcurrentMap();

    public ResponseCollectorService(Settings settings, ClusterService clusterService) {
        super(settings);
        clusterService.addListener(this);
    }

    @Override
    public void clusterChanged(ClusterChangedEvent event) {
        if (event.nodesRemoved()) {
            for (DiscoveryNode removedNode : event.nodesDelta().removedNodes()) {
                removeNode(removedNode.getId());
            }
        }
    }

    void removeNode(String nodeId) {
        nodeIdToStats.remove(nodeId);
    }

    public void addNodeStatistics(String nodeId, int queueSize, long responseTimeNanos, long avgServiceTimeNanos) {
        nodeIdToStats.compute(nodeId, (id, ns) -> {
            if (ns == null) {
                ExponentiallyWeightedMovingAverage queueEWMA = new ExponentiallyWeightedMovingAverage(ALPHA, queueSize);
                ExponentiallyWeightedMovingAverage responseEWMA = new ExponentiallyWeightedMovingAverage(ALPHA, responseTimeNanos);
                NodeStatistics newStats = new NodeStatistics(nodeId, queueEWMA, responseEWMA, avgServiceTimeNanos);
                return newStats;
            } else {
                ns.queueSize.addValue((double) queueSize);
                ns.responseTime.addValue((double) responseTimeNanos);
                ns.serviceTime = avgServiceTimeNanos;
                return ns;
            }
        });
    }

    public Map<String, ComputedNodeStats> getAllNodeStatistics() {
        // Transform the mutable object internally used for accounting into the computed version
        Map<String, ComputedNodeStats> nodeStats = new HashMap<>(nodeIdToStats.size());
        nodeIdToStats.forEach((k, v) -> {
            nodeStats.put(k, new ComputedNodeStats(v));
        });
        return nodeStats;
    }

    /**
     * Struct-like class encapsulating a point-in-time snapshot of a particular
     * node's statistics. This includes the EWMA of queue size, response time,
     * and service time.
     */
    public static class ComputedNodeStats {
        public final String nodeId;
        public final double queueSize;
        public final double responseTime;
        public final double serviceTime;

        ComputedNodeStats(NodeStatistics nodeStats) {
            this.nodeId = nodeStats.nodeId;
            this.queueSize = nodeStats.queueSize.getAverage();
            this.responseTime = nodeStats.responseTime.getAverage();
            this.serviceTime = nodeStats.serviceTime;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder("ComputedNodeStats[");
            sb.append(nodeId).append("](");
            sb.append("queue: ").append(queueSize);
            sb.append(", response time: ").append(responseTime);
            sb.append(", service time: ").append(serviceTime);
            sb.append(")");
            return sb.toString();
        }
    }

    /**
     * Class encapsulating a node's exponentially weighted queue size, response
     * time, and service time; this class is private and intended only to be
     * used for the internal accounting of {@code ResponseCollectorService}.
     */
    private static class NodeStatistics {
        final String nodeId;
        final ExponentiallyWeightedMovingAverage queueSize;
        final ExponentiallyWeightedMovingAverage responseTime;
        double serviceTime;

        NodeStatistics(String nodeId,
                       ExponentiallyWeightedMovingAverage queueSizeEWMA,
                       ExponentiallyWeightedMovingAverage responseTimeEWMA,
                       double serviceTimeEWMA) {
            this.nodeId = nodeId;
            this.queueSize = queueSizeEWMA;
            this.responseTime = responseTimeEWMA;
            this.serviceTime = serviceTimeEWMA;
        }
    }
}

DefaultSearchContext.java
@@ -51,6 +51,7 @@ import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.search.NestedHelper;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.similarity.SimilarityService;
+import org.elasticsearch.node.ResponseCollectorService;
import org.elasticsearch.search.aggregations.SearchContextAggregations;
import org.elasticsearch.search.collapse.CollapseContext;
import org.elasticsearch.search.dfs.DfsSearchResult;
@@ -81,6 +82,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;

final class DefaultSearchContext extends SearchContext {

@@ -93,6 +95,7 @@ final class DefaultSearchContext extends SearchContext {
    private final BigArrays bigArrays;
    private final IndexShard indexShard;
    private final IndexService indexService;
+   private final ResponseCollectorService responseCollectorService;
    private final ContextIndexSearcher searcher;
    private final DfsSearchResult dfsResult;
    private final QuerySearchResult queryResult;
@@ -147,6 +150,7 @@ final class DefaultSearchContext extends SearchContext {
    private final long originNanoTime = System.nanoTime();
    private volatile long lastAccessTime = -1;
    private Profilers profilers;
+   private ExecutorService searchExecutor;

    private final Map<String, SearchExtBuilder> searchExtBuilders = new HashMap<>();
    private final Map<Class<?>, Collector> queryCollectors = new HashMap<>();
@@ -154,8 +158,8 @@ final class DefaultSearchContext extends SearchContext {
    private FetchPhase fetchPhase;

    DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget, Engine.Searcher engineSearcher,
-                        IndexService indexService, IndexShard indexShard,
-                        BigArrays bigArrays, Counter timeEstimateCounter, TimeValue timeout, FetchPhase fetchPhase) {
+                        IndexService indexService, IndexShard indexShard, BigArrays bigArrays, Counter timeEstimateCounter,
+                        TimeValue timeout, FetchPhase fetchPhase, ResponseCollectorService responseCollectorService) {
        this.id = id;
        this.request = request;
        this.fetchPhase = fetchPhase;
@@ -169,6 +173,7 @@ final class DefaultSearchContext extends SearchContext {
        this.fetchResult = new FetchSearchResult(id, shardTarget);
        this.indexShard = indexShard;
        this.indexService = indexService;
+       this.responseCollectorService = responseCollectorService;
        this.searcher = new ContextIndexSearcher(engineSearcher, indexService.cache().query(), indexShard.getQueryCachingPolicy());
        this.timeEstimateCounter = timeEstimateCounter;
        this.timeout = timeout;

SearchService.java
@@ -52,6 +52,7 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
+import org.elasticsearch.node.ResponseCollectorService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.SearchScript;
import org.elasticsearch.search.aggregations.AggregationInitializationException;
@@ -96,6 +97,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
@@ -131,6 +133,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEventListener {

    private final ScriptService scriptService;

+   private final ResponseCollectorService responseCollectorService;
+
    private final BigArrays bigArrays;

    private final DfsPhase dfsPhase = new DfsPhase();
@@ -152,12 +156,14 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEventListener {
    private final ConcurrentMapLong<SearchContext> activeContexts = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency();

    public SearchService(ClusterService clusterService, IndicesService indicesService,
-                        ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, FetchPhase fetchPhase) {
+                        ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, FetchPhase fetchPhase,
+                        ResponseCollectorService responseCollectorService) {
        super(clusterService.getSettings());
        this.threadPool = threadPool;
        this.clusterService = clusterService;
        this.indicesService = indicesService;
        this.scriptService = scriptService;
+       this.responseCollectorService = responseCollectorService;
        this.bigArrays = bigArrays;
        this.queryPhase = new QueryPhase(settings);
        this.fetchPhase = fetchPhase;
@@ -520,7 +526,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEventListener {
        Engine.Searcher engineSearcher = searcher == null ? indexShard.acquireSearcher("search") : searcher;

        final DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget,
-           engineSearcher, indexService, indexShard, bigArrays, threadPool.estimatedTimeInMillisCounter(), timeout, fetchPhase);
+           engineSearcher, indexService, indexShard, bigArrays, threadPool.estimatedTimeInMillisCounter(), timeout, fetchPhase,
+           responseCollectorService);
        boolean success = false;
        try {
            // we clone the query shard context here just for rewriting otherwise we
@@ -813,6 +820,10 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEventListener {
        return this.activeContexts.size();
    }

+   public ResponseCollectorService getResponseCollectorService() {
+       return this.responseCollectorService;
+   }
+
    class Reaper implements Runnable {
        @Override
        public void run() {

QueryPhase.java
@@ -38,6 +38,8 @@ import org.apache.lucene.search.TopDocs;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
+import org.elasticsearch.common.util.concurrent.QueueResizingEsThreadPoolExecutor;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchPhase;
import org.elasticsearch.search.SearchService;
@@ -50,8 +52,10 @@ import org.elasticsearch.search.profile.query.InternalProfileCollector;
import org.elasticsearch.search.rescore.RescorePhase;
import org.elasticsearch.search.sort.SortAndFormats;
import org.elasticsearch.search.suggest.SuggestPhase;
+import org.elasticsearch.threadpool.ThreadPool;

import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;

import static org.elasticsearch.search.query.QueryCollectorContext.createCancellableCollectorContext;
import static org.elasticsearch.search.query.QueryCollectorContext.createEarlySortingTerminationCollectorContext;
@@ -238,6 +242,13 @@ public class QueryPhase implements SearchPhase {
        for (QueryCollectorContext ctx : collectors) {
            ctx.postProcess(result, shouldCollect);
        }
+       EsThreadPoolExecutor executor = (EsThreadPoolExecutor)
+               searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);
+       if (executor instanceof QueueResizingEsThreadPoolExecutor) {
+           QueueResizingEsThreadPoolExecutor rExecutor = (QueueResizingEsThreadPoolExecutor) executor;
+           queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize());
+           queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA());
+       }
        if (searchContext.getProfilers() != null) {
            ProfileShardResult shardResults = SearchProfileShardResults.buildShardResults(searchContext.getProfilers());
            result.profileResults(shardResults);

QuerySearchResult.java
@@ -21,6 +21,7 @@ package org.elasticsearch.search.query;

import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.TopDocs;
+import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
@@ -58,6 +59,8 @@ public final class QuerySearchResult extends SearchPhaseResult {
    private boolean hasScoreDocs;
    private long totalHits;
    private float maxScore;
+   private long serviceTimeEWMA = -1;
+   private int nodeQueueSize = -1;

    public QuerySearchResult() {
    }
@@ -228,6 +231,24 @@ public final class QuerySearchResult extends SearchPhaseResult {
        return this;
    }

+   public long serviceTimeEWMA() {
+       return this.serviceTimeEWMA;
+   }
+
+   public QuerySearchResult serviceTimeEWMA(long serviceTimeEWMA) {
+       this.serviceTimeEWMA = serviceTimeEWMA;
+       return this;
+   }
+
+   public int nodeQueueSize() {
+       return this.nodeQueueSize;
+   }
+
+   public QuerySearchResult nodeQueueSize(int nodeQueueSize) {
+       this.nodeQueueSize = nodeQueueSize;
+       return this;
+   }
+
    /**
     * Returns <code>true</code> if this result has any suggest score docs
     */
@@ -278,6 +299,13 @@ public final class QuerySearchResult extends SearchPhaseResult {
        terminatedEarly = in.readOptionalBoolean();
        profileShardResults = in.readOptionalWriteable(ProfileShardResult::new);
        hasProfileResults = profileShardResults != null;
+       if (in.getVersion().onOrAfter(Version.V_6_0_0_beta1)) {
+           serviceTimeEWMA = in.readZLong();
+           nodeQueueSize = in.readInt();
+       } else {
+           serviceTimeEWMA = -1;
+           nodeQueueSize = -1;
+       }
    }

    @Override
@@ -315,6 +343,10 @@ public final class QuerySearchResult extends SearchPhaseResult {
        out.writeBoolean(searchTimedOut);
        out.writeOptionalBoolean(terminatedEarly);
        out.writeOptionalWriteable(profileShardResults);
+       if (out.getVersion().onOrAfter(Version.V_6_0_0_beta1)) {
+           out.writeZLong(serviceTimeEWMA);
+           out.writeInt(nodeQueueSize);
+       }
    }

    public long getTotalHits() {

AbstractSearchAsyncActionTests.java
@@ -64,7 +64,7 @@ public class AbstractSearchAsyncActionTests extends ESTestCase {
            Collections.singletonMap("foo", new AliasFilter(new MatchAllQueryBuilder())), Collections.singletonMap("foo", 2.0f), null,
            new SearchRequest(), null, new GroupShardsIterator<>(Collections.singletonList(
            new SearchShardIterator(null, null, Collections.emptyList(), null))), timeProvider, 0, null,
-               new InitialSearchPhase.ArraySearchPhaseResults<>(10)) {
+           new InitialSearchPhase.ArraySearchPhaseResults<>(10)) {
            @Override
            protected SearchPhase getNextPhase(final SearchPhaseResults<SearchPhaseResult> results, final SearchPhaseContext context) {
                return null;

CanMatchPreFilterSearchPhaseTests.java
@@ -57,7 +57,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
        final boolean shard2 = randomBoolean();

        SearchTransportService searchTransportService = new SearchTransportService(
-           Settings.builder().put("search.remote.connect", false).build(), null) {
+           Settings.builder().put("search.remote.connect", false).build(), null, null) {

            @Override
            public void sendCanMatch(Transport.Connection connection, ShardSearchTransportRequest request, SearchTask task,
@@ -105,7 +105,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {

    public void testOldNodesTriggerException() {
        SearchTransportService searchTransportService = new SearchTransportService(
-           Settings.builder().put("search.remote.connect", false).build(), null);
+           Settings.builder().put("search.remote.connect", false).build(), null, null);
        DiscoveryNode node = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), VersionUtils.randomVersionBetween(random(),
            VersionUtils.getFirstVersion(), VersionUtils.getPreviousVersion(Version.V_5_6_0)));
        SearchAsyncActionTests.MockConnection mockConnection = new SearchAsyncActionTests.MockConnection(node);
@@ -124,7 +124,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
        lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode));
        final boolean shard1 = randomBoolean();
        SearchTransportService searchTransportService = new SearchTransportService(
-           Settings.builder().put("search.remote.connect", false).build(), null) {
+           Settings.builder().put("search.remote.connect", false).build(), null, null) {

            @Override
            public void sendCanMatch(Transport.Connection connection, ShardSearchTransportRequest request, SearchTask task,

ClearScrollControllerTests.java
@@ -70,7 +70,7 @@ public class ClearScrollControllerTests extends ESTestCase {
            }
        };
        List<DiscoveryNode> nodesInvoked = new CopyOnWriteArrayList<>();
-       SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null) {
+       SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null, null) {
            @Override
            public void sendClearAllScrollContexts(Transport.Connection connection, ActionListener<TransportResponse> listener) {
                nodesInvoked.add(connection.getNode());
@@ -135,7 +135,7 @@ public class ClearScrollControllerTests extends ESTestCase {
            }
        };
        List<DiscoveryNode> nodesInvoked = new CopyOnWriteArrayList<>();
-       SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null) {
+       SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null, null) {

            @Override
            public void sendFreeContext(Transport.Connection connection, long contextId,
@@ -213,7 +213,7 @@ public class ClearScrollControllerTests extends ESTestCase {
            }
        };
        List<DiscoveryNode> nodesInvoked = new CopyOnWriteArrayList<>();
-       SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null) {
+       SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null, null) {

            @Override
            public void sendFreeContext(Transport.Connection connection, long contextId,

DfsQueryPhaseTests.java
@@ -58,7 +58,7 @@ public class DfsQueryPhaseTests extends ESTestCase {

        SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
        SearchTransportService searchTransportService = new SearchTransportService(
-           Settings.builder().put("search.remote.connect", false).build(), null) {
+           Settings.builder().put("search.remote.connect", false).build(), null, null) {

            @Override
            public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task,
@@ -115,7 +115,7 @@ public class DfsQueryPhaseTests extends ESTestCase {

        SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
        SearchTransportService searchTransportService = new SearchTransportService(
-           Settings.builder().put("search.remote.connect", false).build(), null) {
+           Settings.builder().put("search.remote.connect", false).build(), null, null) {

            @Override
            public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task,
@@ -171,7 +171,7 @@ public class DfsQueryPhaseTests extends ESTestCase {

        SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
        SearchTransportService searchTransportService = new SearchTransportService(
-           Settings.builder().put("search.remote.connect", false).build(), null) {
+           Settings.builder().put("search.remote.connect", false).build(), null, null) {

            @Override
            public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task,

ExpandSearchPhaseTests.java
@@ -70,7 +70,7 @@ public class ExpandSearchPhaseTests extends ESTestCase {
            .collect(Collectors.toList()))));
        mockSearchPhaseContext.getRequest().source().query(originalQuery);
        mockSearchPhaseContext.searchTransport = new SearchTransportService(
-           Settings.builder().put("search.remote.connect", false).build(), null) {
+           Settings.builder().put("search.remote.connect", false).build(), null, null) {

            @Override
            void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener<MultiSearchResponse> listener) {
@@ -144,7 +144,7 @@ public class ExpandSearchPhaseTests extends ESTestCase {
        mockSearchPhaseContext.getRequest().source(new SearchSourceBuilder()
            .collapse(new CollapseBuilder("someField").setInnerHits(new InnerHitBuilder().setName("foobarbaz"))));
        mockSearchPhaseContext.searchTransport = new SearchTransportService(
-           Settings.builder().put("search.remote.connect", false).build(), null) {
+           Settings.builder().put("search.remote.connect", false).build(), null, null) {

            @Override
            void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener<MultiSearchResponse> listener) {
@@ -185,7 +185,7 @@ public class ExpandSearchPhaseTests extends ESTestCase {
    public void testSkipPhase() throws IOException {
        MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1);
        mockSearchPhaseContext.searchTransport = new SearchTransportService(
-           Settings.builder().put("search.remote.connect", false).build(), null) {
+           Settings.builder().put("search.remote.connect", false).build(), null, null) {

            @Override
            void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener<MultiSearchResponse> listener) {
@@ -216,7 +216,7 @@ public class ExpandSearchPhaseTests extends ESTestCase {
    public void testSkipExpandCollapseNoHits() throws IOException {
        MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1);
        mockSearchPhaseContext.searchTransport = new SearchTransportService(
-           Settings.builder().put("search.remote.connect", false).build(), null) {
+           Settings.builder().put("search.remote.connect", false).build(), null, null) {

            @Override
            void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener<MultiSearchResponse> listener) {

FetchSearchPhaseTests.java
@@ -103,7 +103,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
        results.consumeResult(queryResult);

        SearchTransportService searchTransportService = new SearchTransportService(
-           Settings.builder().put("search.remote.connect", false).build(), null) {
+           Settings.builder().put("search.remote.connect", false).build(), null, null) {
            @Override
            public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
                                         SearchActionListener<FetchSearchResult> listener) {
@@ -157,7 +157,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
        results.consumeResult(queryResult);

        SearchTransportService searchTransportService = new SearchTransportService(
-           Settings.builder().put("search.remote.connect", false).build(), null) {
+           Settings.builder().put("search.remote.connect", false).build(), null, null) {
            @Override
            public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
                                         SearchActionListener<FetchSearchResult> listener) {
@@ -210,7 +210,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
            results.consumeResult(queryResult);
        }
        SearchTransportService searchTransportService = new SearchTransportService(
-           Settings.builder().put("search.remote.connect", false).build(), null) {
+           Settings.builder().put("search.remote.connect", false).build(), null, null) {
            @Override
            public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
                                         SearchActionListener<FetchSearchResult> listener) {
@@ -271,7 +271,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
        results.consumeResult(queryResult);
        AtomicInteger numFetches = new AtomicInteger(0);
        SearchTransportService searchTransportService = new SearchTransportService(
-           Settings.builder().put("search.remote.connect", false).build(), null) {
+           Settings.builder().put("search.remote.connect", false).build(), null, null) {
            @Override
            public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
                                         SearchActionListener<FetchSearchResult> listener) {
@@ -324,7 +324,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
        results.consumeResult(queryResult);

        SearchTransportService searchTransportService = new SearchTransportService(
-           Settings.builder().put("search.remote.connect", false).build(), null) {
+           Settings.builder().put("search.remote.connect", false).build(), null, null) {
            @Override
            public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
                                         SearchActionListener<FetchSearchResult> listener) {

SearchAsyncActionTests.java
@@ -86,7 +86,7 @@ public class SearchAsyncActionTests extends ESTestCase {
            }
        }

-       SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null);
+       SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null, null);
        Map<String, Transport.Connection> lookup = new HashMap<>();
        Map<ShardId, Boolean> seenShard = new ConcurrentHashMap<>();
        lookup.put(primaryNode.getId(), new MockConnection(primaryNode));
@@ -173,7 +173,7 @@ public class SearchAsyncActionTests extends ESTestCase {
        GroupShardsIterator<SearchShardIterator> shardsIter = getShardsIter("idx",
            new OriginalIndices(new String[]{"idx"}, IndicesOptions.strictExpandOpenAndForbidClosed()),
            10, randomBoolean(), primaryNode, replicaNode);
-       SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null);
+       SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null, null);
        Map<String, Transport.Connection> lookup = new HashMap<>();
        Map<ShardId, Boolean> seenShard = new ConcurrentHashMap<>();
        lookup.put(primaryNode.getId(), new MockConnection(primaryNode));
@@ -271,7 +271,7 @@ public class SearchAsyncActionTests extends ESTestCase {
            new OriginalIndices(new String[]{"idx"}, IndicesOptions.strictExpandOpenAndForbidClosed()),
            randomIntBetween(1, 10), randomBoolean(), primaryNode, replicaNode);
        AtomicInteger numFreedContext = new AtomicInteger();
-       SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null) {
+       SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null, null) {
            @Override
            public void sendFreeContext(Transport.Connection connection, long contextId, OriginalIndices originalIndices) {
                numFreedContext.incrementAndGet();

ResponseCollectorServiceTests.java (new file)
@@ -0,0 +1,140 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.node;

import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;

import java.util.Map;
import java.util.concurrent.CountDownLatch;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

public class ResponseCollectorServiceTests extends ESTestCase {

    private ClusterService clusterService;
    private ResponseCollectorService collector;
    private ThreadPool threadpool;

    @Before
    public void setUp() throws Exception {
        super.setUp();
        threadpool = new TestThreadPool("response_collector_tests");
        clusterService = new ClusterService(Settings.EMPTY,
            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
            threadpool);
        collector = new ResponseCollectorService(Settings.EMPTY, clusterService);
    }

    @After
    public void tearDown() throws Exception {
        super.tearDown();
        threadpool.shutdownNow();
    }

    public void testNodeStats() throws Exception {
        collector.addNodeStatistics("node1", 1, 100, 10);
        Map<String, ResponseCollectorService.ComputedNodeStats> nodeStats = collector.getAllNodeStatistics();
        assertTrue(nodeStats.containsKey("node1"));
        assertThat(nodeStats.get("node1").queueSize, equalTo(1.0));
        assertThat(nodeStats.get("node1").responseTime, equalTo(100.0));
        assertThat(nodeStats.get("node1").serviceTime, equalTo(10.0));
    }

    /*
     * Test that concurrently adding values and removing nodes does not cause exceptions
     */
    public void testConcurrentAddingAndRemoving() throws Exception {
        String[] nodes = new String[] {"a", "b", "c", "d"};

        final CountDownLatch latch = new CountDownLatch(1);

        Runnable f = () -> {
            try {
                latch.await();
            } catch (InterruptedException e) {
                fail("should not be interrupted");
            }
            for (int i = 0; i < randomIntBetween(100, 200); i++) {
                if (randomBoolean()) {
                    collector.removeNode(randomFrom(nodes));
                }
                collector.addNodeStatistics(randomFrom(nodes), randomIntBetween(1, 100), randomIntBetween(1, 100), randomIntBetween(1, 100));
            }
        };

        Thread t1 = new Thread(f);
        Thread t2 = new Thread(f);
        Thread t3 = new Thread(f);
        Thread t4 = new Thread(f);

        t1.start();
        t2.start();
        t3.start();
        t4.start();
        latch.countDown();
        t1.join();
        t2.join();
        t3.join();
        t4.join();

        final Map<String, ResponseCollectorService.ComputedNodeStats> nodeStats = collector.getAllNodeStatistics();
        logger.info("--> got stats: {}", nodeStats);
        for (String nodeId : nodes) {
            if (nodeStats.containsKey(nodeId)) {
                assertThat(nodeStats.get(nodeId).queueSize, greaterThan(0.0));
                assertThat(nodeStats.get(nodeId).responseTime, greaterThan(0.0));
                assertThat(nodeStats.get(nodeId).serviceTime, greaterThan(0.0));
            }
        }
    }

    public void testNodeRemoval() throws Exception {
        collector.addNodeStatistics("node1", randomIntBetween(1, 100), randomIntBetween(1, 100), randomIntBetween(1, 100));
        collector.addNodeStatistics("node2", randomIntBetween(1, 100), randomIntBetween(1, 100), randomIntBetween(1, 100));

        ClusterState previousState = ClusterState.builder(new ClusterName("cluster")).nodes(DiscoveryNodes.builder()
            .add(DiscoveryNode.createLocal(Settings.EMPTY, new TransportAddress(TransportAddress.META_ADDRESS, 9200), "node1"))
            .add(DiscoveryNode.createLocal(Settings.EMPTY, new TransportAddress(TransportAddress.META_ADDRESS, 9201), "node2")))
            .build();
        ClusterState newState = ClusterState.builder(previousState).nodes(DiscoveryNodes.builder(previousState.nodes())
            .remove("node2")).build();
        ClusterChangedEvent event = new ClusterChangedEvent("test", newState, previousState);

        collector.clusterChanged(event);
        final Map<String, ResponseCollectorService.ComputedNodeStats> nodeStats = collector.getAllNodeStatistics();
        assertTrue(nodeStats.containsKey("node1"));
        assertFalse(nodeStats.containsKey("node2"));
    }
}

QueryPhaseTests.java
@@ -49,11 +49,13 @@ import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.Directory;
import org.elasticsearch.action.search.SearchTask;
+import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.ParsedQuery;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.internal.ScrollContext;
import org.elasticsearch.search.sort.SortAndFormats;
-import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.TestSearchContext;

import java.io.IOException;
@@ -62,14 +64,35 @@ import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;
import static org.mockito.Mockito.mock;

-public class QueryPhaseTests extends ESTestCase {
+public class QueryPhaseTests extends IndexShardTestCase {
+
+   private IndexShard indexShard;
+
+   @Override
+   public Settings threadPoolSettings() {
+       return Settings.builder().put(super.threadPoolSettings()).put("thread_pool.search.min_queue_size", 10).build();
+   }
+
+   @Override
+   public void setUp() throws Exception {
+       super.setUp();
+       indexShard = newShard(true);
+   }
+
+   @Override
+   public void tearDown() throws Exception {
+       super.tearDown();
+       closeShards(indexShard);
+   }

    private void countTestCase(Query query, IndexReader reader, boolean shouldCollect) throws Exception {
-       TestSearchContext context = new TestSearchContext(null);
+       TestSearchContext context = new TestSearchContext(null, indexShard);
        context.parsedQuery(new ParsedQuery(query));
        context.setSize(0);
        context.setTask(new SearchTask(123L, "", "", "", null));
@@ -139,7 +162,7 @@ public class QueryPhaseTests extends ESTestCase {
    }

    public void testPostFilterDisablesCountOptimization() throws Exception {
-       TestSearchContext context = new TestSearchContext(null);
+       TestSearchContext context = new TestSearchContext(null, indexShard);
        context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
        context.setSize(0);
        context.setTask(new SearchTask(123L, "", "", "", null));
@@ -163,7 +186,7 @@ public class QueryPhaseTests extends ESTestCase {
    }

    public void testMinScoreDisablesCountOptimization() throws Exception {
-       TestSearchContext context = new TestSearchContext(null);
+       TestSearchContext context = new TestSearchContext(null, indexShard);
        context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
        context.setSize(0);
        context.setTask(new SearchTask(123L, "", "", "", null));
@@ -186,6 +209,30 @@ public class QueryPhaseTests extends ESTestCase {
        assertTrue(collected.get());
    }

+   public void testQueryCapturesThreadPoolStats() throws Exception {
+       TestSearchContext context = new TestSearchContext(null, indexShard);
+       context.setTask(new SearchTask(123L, "", "", "", null));
+       context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
+
+       Directory dir = newDirectory();
+       IndexWriterConfig iwc = newIndexWriterConfig();
+       RandomIndexWriter w = new RandomIndexWriter(random(), dir, iwc);
+       final int numDocs = scaledRandomIntBetween(100, 200);
+       for (int i = 0; i < numDocs; ++i) {
+           w.addDocument(new Document());
+       }
+       w.close();
+       IndexReader reader = DirectoryReader.open(dir);
+       IndexSearcher contextSearcher = new IndexSearcher(reader);
+
+       QueryPhase.execute(context, contextSearcher, null);
+       QuerySearchResult results = context.queryResult();
+       assertThat(results.serviceTimeEWMA(), greaterThan(0L));
+       assertThat(results.nodeQueueSize(), greaterThanOrEqualTo(0));
+       reader.close();
+       dir.close();
+   }
+
    public void testInOrderScrollOptimization() throws Exception {
        Directory dir = newDirectory();
        final Sort sort = new Sort(new SortField("rank", SortField.Type.INT));
@@ -206,7 +253,7 @@ public class QueryPhaseTests extends ESTestCase {
            }
        };

-       TestSearchContext context = new TestSearchContext(null);
+       TestSearchContext context = new TestSearchContext(null, indexShard);
        context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
        ScrollContext scrollContext = new ScrollContext();
        scrollContext.lastEmittedDoc = null;
@@ -251,7 +298,7 @@ public class QueryPhaseTests extends ESTestCase {
            w.addDocument(doc);
        }
        w.close();
-       TestSearchContext context = new TestSearchContext(null);
+       TestSearchContext context = new TestSearchContext(null, indexShard);
        context.setTask(new SearchTask(123L, "", "", "", null));
        context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
        context.terminateAfter(1);
@@ -360,7 +407,7 @@ public class QueryPhaseTests extends ESTestCase {
        }
        w.close();

-       TestSearchContext context = new TestSearchContext(null);
+       TestSearchContext context = new TestSearchContext(null, indexShard);
        context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
        context.setSize(1);
        context.setTask(new SearchTask(123L, "", "", "", null));
@@ -454,7 +501,7 @@ public class QueryPhaseTests extends ESTestCase {
        }
        w.close();

-       TestSearchContext context = new TestSearchContext(null);
+       TestSearchContext context = new TestSearchContext(null, indexShard);
        context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
        ScrollContext scrollContext = new ScrollContext();
        scrollContext.lastEmittedDoc = null;

IndexShardTestCase.java
@@ -118,7 +118,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
    @Override
    public void setUp() throws Exception {
        super.setUp();
-       threadPool = new TestThreadPool(getClass().getName());
+       threadPool = new TestThreadPool(getClass().getName(), threadPoolSettings());
        primaryTerm = randomIntBetween(1, 100); // use random but fixed term for creating shards
    }

@@ -131,6 +131,10 @@ public abstract class IndexShardTestCase extends ESTestCase {
        }
    }

+   public Settings threadPoolSettings() {
+       return Settings.EMPTY;
+   }
+
    private Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException {
        final ShardId shardId = shardPath.getShardId();
        final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) {

MockNode.java
@@ -89,9 +89,10 @@ public class MockNode extends Node {
    @Override
    protected SearchService newSearchService(ClusterService clusterService, IndicesService indicesService,
                                             ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays,
-                                            FetchPhase fetchPhase) {
+                                            FetchPhase fetchPhase, ResponseCollectorService responseCollectorService) {
        if (getPluginsService().filterPlugins(MockSearchService.TestPlugin.class).isEmpty()) {
-           return super.newSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase);
+           return super.newSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase,
+               responseCollectorService);
        }
        return new MockSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase);
    }

MockSearchService.java
@@ -69,7 +69,7 @@ public class MockSearchService extends SearchService {
    public MockSearchService(ClusterService clusterService,
                             IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService,
                             BigArrays bigArrays, FetchPhase fetchPhase) {
-       super(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase);
+       super(clusterService, indicesService, threadPool, scriptService, bigArrays, fetchPhase, null);
    }

    @Override

ESIntegTestCase.java
@@ -1710,6 +1710,12 @@ public abstract class ESIntegTestCase extends ESTestCase {
            .put(IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING.getKey(), nodeOrdinal % 2 == 0)
            // wait short time for other active shards before actually deleting, default 30s not needed in tests
            .put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT.getKey(), new TimeValue(1, TimeUnit.SECONDS));
+       if (rarely()) {
+           // Sometimes adjust the minimum search thread pool size, causing
+           // QueueResizingEsThreadPoolExecutor to be used instead of a regular
+           // fixed thread pool
+           builder.put("thread_pool.search.min_queue_size", 100);
+       }
        return builder.build();
    }

TestSearchContext.java
@@ -24,8 +24,10 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.util.Counter;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.engine.Engine;
@@ -106,12 +108,16 @@ public class TestSearchContext extends SearchContext {
    }

    public TestSearchContext(QueryShardContext queryShardContext) {
+       this(queryShardContext, null);
+   }
+
+   public TestSearchContext(QueryShardContext queryShardContext, IndexShard indexShard) {
        this.bigArrays = null;
        this.indexService = null;
        this.indexFieldDataService = null;
        this.threadPool = null;
        this.fixedBitSetFilterCache = null;
-       this.indexShard = null;
+       this.indexShard = indexShard;
        this.queryShardContext = queryShardContext;
    }

TestThreadPool.java
@@ -25,7 +25,11 @@ import org.elasticsearch.node.Node;
public class TestThreadPool extends ThreadPool {

    public TestThreadPool(String name) {
-       super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).build());
+       this(name, Settings.EMPTY);
+   }
+
+   public TestThreadPool(String name, Settings settings) {
+       super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).put(settings).build());
    }

}