Search: Add search type `scan` allowing to efficiently scan large result set, closes #707.

This commit is contained in:
kimchy 2011-02-22 00:11:31 +02:00
parent 65ca857027
commit 818f3b4d75
19 changed files with 827 additions and 12 deletions

View File

@ -125,8 +125,10 @@ public class TransportActionModule extends AbstractModule {
bind(TransportSearchQueryThenFetchAction.class).asEagerSingleton(); bind(TransportSearchQueryThenFetchAction.class).asEagerSingleton();
bind(TransportSearchDfsQueryAndFetchAction.class).asEagerSingleton(); bind(TransportSearchDfsQueryAndFetchAction.class).asEagerSingleton();
bind(TransportSearchQueryAndFetchAction.class).asEagerSingleton(); bind(TransportSearchQueryAndFetchAction.class).asEagerSingleton();
bind(TransportSearchScanAction.class).asEagerSingleton();
bind(TransportSearchAction.class).asEagerSingleton(); bind(TransportSearchAction.class).asEagerSingleton();
bind(TransportSearchScrollScanAction.class).asEagerSingleton();
bind(TransportSearchScrollQueryThenFetchAction.class).asEagerSingleton(); bind(TransportSearchScrollQueryThenFetchAction.class).asEagerSingleton();
bind(TransportSearchScrollQueryAndFetchAction.class).asEagerSingleton(); bind(TransportSearchScrollQueryAndFetchAction.class).asEagerSingleton();
bind(TransportSearchScrollAction.class).asEagerSingleton(); bind(TransportSearchScrollAction.class).asEagerSingleton();

View File

@ -41,7 +41,7 @@ public class SearchScrollRequest implements ActionRequest {
private Scroll scroll; private Scroll scroll;
private boolean listenerThreaded = false; private boolean listenerThreaded = false;
private SearchOperationThreading operationThreading = SearchOperationThreading.SINGLE_THREAD; private SearchOperationThreading operationThreading = SearchOperationThreading.THREAD_PER_SHARD;
public SearchScrollRequest() { public SearchScrollRequest() {
} }

View File

@ -49,7 +49,12 @@ public enum SearchType {
* and return the results. Each shard returns size results. Since each shard already returns size hits, this * and return the results. Each shard returns size results. Since each shard already returns size hits, this
* type actually returns size times number of shards results back to the caller. * type actually returns size times number of shards results back to the caller.
*/ */
QUERY_AND_FETCH((byte) 3); QUERY_AND_FETCH((byte) 3),
/**
* Performs scanning of the results which executes the search without any sorting.
* It will automatically start scrolling the result set.
*/
SCAN((byte) 4);
/** /**
* The default search type ({@link #QUERY_THEN_FETCH}. * The default search type ({@link #QUERY_THEN_FETCH}.
@ -81,6 +86,8 @@ public enum SearchType {
return DFS_QUERY_AND_FETCH; return DFS_QUERY_AND_FETCH;
} else if (id == 3) { } else if (id == 3) {
return QUERY_AND_FETCH; return QUERY_AND_FETCH;
} else if (id == 4) {
return SCAN;
} else { } else {
throw new ElasticSearchIllegalArgumentException("No search type for [" + id + "]"); throw new ElasticSearchIllegalArgumentException("No search type for [" + id + "]");
} }
@ -89,7 +96,7 @@ public enum SearchType {
/** /**
* The a string representation search type to execute, defaults to {@link SearchType#DEFAULT}. Can be * The a string representation search type to execute, defaults to {@link SearchType#DEFAULT}. Can be
* one of "dfs_query_then_fetch"/"dfsQueryThenFetch", "dfs_query_and_fetch"/"dfsQueryAndFetch", * one of "dfs_query_then_fetch"/"dfsQueryThenFetch", "dfs_query_and_fetch"/"dfsQueryAndFetch",
* "query_then_fetch"/"queryThenFetch", and "query_and_fetch"/"queryAndFetch". * "query_then_fetch"/"queryThenFetch", "query_and_fetch"/"queryAndFetch", and "scan".
*/ */
public static SearchType fromString(String searchType) throws ElasticSearchIllegalArgumentException { public static SearchType fromString(String searchType) throws ElasticSearchIllegalArgumentException {
if (searchType == null) { if (searchType == null) {
@ -103,6 +110,8 @@ public enum SearchType {
return SearchType.QUERY_THEN_FETCH; return SearchType.QUERY_THEN_FETCH;
} else if ("query_and_fetch".equals(searchType)) { } else if ("query_and_fetch".equals(searchType)) {
return SearchType.QUERY_AND_FETCH; return SearchType.QUERY_AND_FETCH;
} else if ("scan".equals(searchType)) {
return SearchType.SCAN;
} else { } else {
throw new ElasticSearchIllegalArgumentException("No search type for [" + searchType + "]"); throw new ElasticSearchIllegalArgumentException("No search type for [" + searchType + "]");
} }

View File

@ -21,10 +21,7 @@ package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.TransportActions; import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.search.type.TransportSearchDfsQueryAndFetchAction; import org.elasticsearch.action.search.type.*;
import org.elasticsearch.action.search.type.TransportSearchDfsQueryThenFetchAction;
import org.elasticsearch.action.search.type.TransportSearchQueryAndFetchAction;
import org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction;
import org.elasticsearch.action.support.BaseAction; import org.elasticsearch.action.support.BaseAction;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -54,6 +51,8 @@ public class TransportSearchAction extends BaseAction<SearchRequest, SearchRespo
private final TransportSearchQueryAndFetchAction queryAndFetchAction; private final TransportSearchQueryAndFetchAction queryAndFetchAction;
private final TransportSearchScanAction scanAction;
private final boolean optimizeSingleShard; private final boolean optimizeSingleShard;
@Inject public TransportSearchAction(Settings settings, ThreadPool threadPool, @Inject public TransportSearchAction(Settings settings, ThreadPool threadPool,
@ -61,13 +60,15 @@ public class TransportSearchAction extends BaseAction<SearchRequest, SearchRespo
TransportSearchDfsQueryThenFetchAction dfsQueryThenFetchAction, TransportSearchDfsQueryThenFetchAction dfsQueryThenFetchAction,
TransportSearchQueryThenFetchAction queryThenFetchAction, TransportSearchQueryThenFetchAction queryThenFetchAction,
TransportSearchDfsQueryAndFetchAction dfsQueryAndFetchAction, TransportSearchDfsQueryAndFetchAction dfsQueryAndFetchAction,
TransportSearchQueryAndFetchAction queryAndFetchAction) { TransportSearchQueryAndFetchAction queryAndFetchAction,
TransportSearchScanAction scanAction) {
super(settings, threadPool); super(settings, threadPool);
this.clusterService = clusterService; this.clusterService = clusterService;
this.dfsQueryThenFetchAction = dfsQueryThenFetchAction; this.dfsQueryThenFetchAction = dfsQueryThenFetchAction;
this.queryThenFetchAction = queryThenFetchAction; this.queryThenFetchAction = queryThenFetchAction;
this.dfsQueryAndFetchAction = dfsQueryAndFetchAction; this.dfsQueryAndFetchAction = dfsQueryAndFetchAction;
this.queryAndFetchAction = queryAndFetchAction; this.queryAndFetchAction = queryAndFetchAction;
this.scanAction = scanAction;
this.optimizeSingleShard = componentSettings.getAsBoolean("optimize_single_shard", true); this.optimizeSingleShard = componentSettings.getAsBoolean("optimize_single_shard", true);
@ -76,7 +77,7 @@ public class TransportSearchAction extends BaseAction<SearchRequest, SearchRespo
@Override protected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) { @Override protected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
// optimize search type for cases where there is only one shard group to search on // optimize search type for cases where there is only one shard group to search on
if (optimizeSingleShard) { if (optimizeSingleShard && searchRequest.searchType() != SCAN) {
try { try {
ClusterState clusterState = clusterService.state(); ClusterState clusterState = clusterService.state();
searchRequest.indices(clusterState.metaData().concreteIndices(searchRequest.indices())); searchRequest.indices(clusterState.metaData().concreteIndices(searchRequest.indices()));
@ -101,6 +102,8 @@ public class TransportSearchAction extends BaseAction<SearchRequest, SearchRespo
dfsQueryAndFetchAction.execute(searchRequest, listener); dfsQueryAndFetchAction.execute(searchRequest, listener);
} else if (searchRequest.searchType() == SearchType.QUERY_AND_FETCH) { } else if (searchRequest.searchType() == SearchType.QUERY_AND_FETCH) {
queryAndFetchAction.execute(searchRequest, listener); queryAndFetchAction.execute(searchRequest, listener);
} else if (searchRequest.searchType() == SearchType.SCAN) {
scanAction.execute(searchRequest, listener);
} }
} }

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.search.type.ParsedScrollId; import org.elasticsearch.action.search.type.ParsedScrollId;
import org.elasticsearch.action.search.type.TransportSearchScrollQueryAndFetchAction; import org.elasticsearch.action.search.type.TransportSearchScrollQueryAndFetchAction;
import org.elasticsearch.action.search.type.TransportSearchScrollQueryThenFetchAction; import org.elasticsearch.action.search.type.TransportSearchScrollQueryThenFetchAction;
import org.elasticsearch.action.search.type.TransportSearchScrollScanAction;
import org.elasticsearch.action.support.BaseAction; import org.elasticsearch.action.support.BaseAction;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -45,12 +46,16 @@ public class TransportSearchScrollAction extends BaseAction<SearchScrollRequest,
private final TransportSearchScrollQueryAndFetchAction queryAndFetchAction; private final TransportSearchScrollQueryAndFetchAction queryAndFetchAction;
private final TransportSearchScrollScanAction scanAction;
@Inject public TransportSearchScrollAction(Settings settings, ThreadPool threadPool, TransportService transportService, @Inject public TransportSearchScrollAction(Settings settings, ThreadPool threadPool, TransportService transportService,
TransportSearchScrollQueryThenFetchAction queryThenFetchAction, TransportSearchScrollQueryThenFetchAction queryThenFetchAction,
TransportSearchScrollQueryAndFetchAction queryAndFetchAction) { TransportSearchScrollQueryAndFetchAction queryAndFetchAction,
TransportSearchScrollScanAction scanAction) {
super(settings, threadPool); super(settings, threadPool);
this.queryThenFetchAction = queryThenFetchAction; this.queryThenFetchAction = queryThenFetchAction;
this.queryAndFetchAction = queryAndFetchAction; this.queryAndFetchAction = queryAndFetchAction;
this.scanAction = scanAction;
transportService.registerHandler(TransportActions.SEARCH_SCROLL, new TransportHandler()); transportService.registerHandler(TransportActions.SEARCH_SCROLL, new TransportHandler());
} }
@ -62,6 +67,8 @@ public class TransportSearchScrollAction extends BaseAction<SearchScrollRequest,
queryThenFetchAction.execute(request, scrollId, listener); queryThenFetchAction.execute(request, scrollId, listener);
} else if (scrollId.type().equals(QUERY_AND_FETCH_TYPE)) { } else if (scrollId.type().equals(QUERY_AND_FETCH_TYPE)) {
queryAndFetchAction.execute(request, scrollId, listener); queryAndFetchAction.execute(request, scrollId, listener);
} else if (scrollId.type().equals(SCAN)) {
scanAction.execute(request, scrollId, listener);
} else { } else {
throw new ElasticSearchIllegalArgumentException("Scroll id type [" + scrollId.type() + "] unrecognized"); throw new ElasticSearchIllegalArgumentException("Scroll id type [" + scrollId.type() + "] unrecognized");
} }

View File

@ -30,6 +30,8 @@ public class ParsedScrollId {
public static final String QUERY_AND_FETCH_TYPE = "queryAndFetch"; public static final String QUERY_AND_FETCH_TYPE = "queryAndFetch";
public static final String SCAN = "scan";
private final String source; private final String source;
private final String type; private final String type;

View File

@ -84,6 +84,8 @@ public abstract class TransportSearchHelper {
return buildScrollId(ParsedScrollId.QUERY_THEN_FETCH_TYPE, searchPhaseResults); return buildScrollId(ParsedScrollId.QUERY_THEN_FETCH_TYPE, searchPhaseResults);
} else if (searchType == SearchType.QUERY_AND_FETCH || searchType == SearchType.DFS_QUERY_AND_FETCH) { } else if (searchType == SearchType.QUERY_AND_FETCH || searchType == SearchType.DFS_QUERY_AND_FETCH) {
return buildScrollId(ParsedScrollId.QUERY_AND_FETCH_TYPE, searchPhaseResults); return buildScrollId(ParsedScrollId.QUERY_AND_FETCH_TYPE, searchPhaseResults);
} else if (searchType == SearchType.SCAN) {
return buildScrollId(ParsedScrollId.SCAN, searchPhaseResults);
} else { } else {
throw new ElasticSearchIllegalStateException(); throw new ElasticSearchIllegalStateException();
} }

View File

@ -0,0 +1,87 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search.type;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.InternalSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.scan.ScanSearchResult;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Map;
public class TransportSearchScanAction extends TransportSearchTypeAction {
@Inject public TransportSearchScanAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportSearchCache transportSearchCache, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings, threadPool, clusterService, transportSearchCache, searchService, searchPhaseController);
}
@Override protected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
new AsyncAction(searchRequest, listener).start();
}
private class AsyncAction extends BaseAsyncAction<ScanSearchResult> {
private final Map<SearchShardTarget, ScanSearchResult> scanResults = ConcurrentCollections.newConcurrentMap();
private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
super(request, listener);
}
@Override protected String firstPhaseName() {
return "init_scan";
}
@Override protected void sendExecuteFirstPhase(DiscoveryNode node, InternalSearchRequest request, SearchServiceListener<ScanSearchResult> listener) {
searchService.sendExecuteScan(node, request, listener);
}
@Override protected void processFirstPhaseResult(ShardRouting shard, ScanSearchResult result) {
scanResults.put(result.shardTarget(), result);
}
@Override protected void moveToSecondPhase() throws Exception {
long totalHits = 0;
for (ScanSearchResult scanSearchResult : scanResults.values()) {
totalHits += scanSearchResult.totalHits();
}
final InternalSearchResponse internalResponse = new InternalSearchResponse(new InternalSearchHits(InternalSearchHits.EMPTY, totalHits, 0.0f), null, false);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), scanResults.values());
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
}
}
}

View File

@ -105,6 +105,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
public void start() { public void start() {
if (scrollId.values().length == 0) { if (scrollId.values().length == 0) {
listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", null)); listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", null));
return;
} }
int localOperations = 0; int localOperations = 0;

View File

@ -110,6 +110,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
public void start() { public void start() {
if (scrollId.values().length == 0) { if (scrollId.values().length == 0) {
listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", null)); listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", null));
return;
} }
final AtomicInteger counter = new AtomicInteger(scrollId.values().length); final AtomicInteger counter = new AtomicInteger(scrollId.values().length);

View File

@ -0,0 +1,246 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search.type;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.*;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.controller.ShardDoc;
import org.elasticsearch.search.controller.ShardScoreDoc;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.search.type.TransportSearchHelper.*;
/**
* @author kimchy (shay.banon)
*/
public class TransportSearchScrollScanAction extends AbstractComponent {
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final SearchServiceTransportAction searchService;
private final SearchPhaseController searchPhaseController;
private final TransportSearchCache searchCache;
@Inject public TransportSearchScrollScanAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportSearchCache searchCache,
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.searchCache = searchCache;
this.searchService = searchService;
this.searchPhaseController = searchPhaseController;
}
public void execute(SearchScrollRequest request, ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
new AsyncAction(request, scrollId, listener).start();
}
private class AsyncAction {
private final SearchScrollRequest request;
private final ActionListener<SearchResponse> listener;
private final ParsedScrollId scrollId;
private final DiscoveryNodes nodes;
protected final Collection<ShardSearchFailure> shardFailures = searchCache.obtainShardFailures();
private final Map<SearchShardTarget, QueryFetchSearchResult> queryFetchResults = searchCache.obtainQueryFetchResults();
private final AtomicInteger successfulOps;
private final AtomicInteger counter;
private final long startTime = System.currentTimeMillis();
private AsyncAction(SearchScrollRequest request, ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
this.request = request;
this.listener = listener;
this.scrollId = scrollId;
this.nodes = clusterService.state().nodes();
this.successfulOps = new AtomicInteger(scrollId.values().length);
this.counter = new AtomicInteger(scrollId.values().length);
}
public void start() {
if (scrollId.values().length == 0) {
final InternalSearchResponse internalResponse = new InternalSearchResponse(new InternalSearchHits(InternalSearchHits.EMPTY, 0, 0.0f), null, false);
searchCache.releaseQueryFetchResults(queryFetchResults);
listener.onResponse(new SearchResponse(internalResponse, request.scrollId(), 0, 0, 0l, TransportSearchHelper.buildShardFailures(shardFailures, searchCache)));
return;
}
int localOperations = 0;
for (Tuple<String, Long> target : scrollId.values()) {
DiscoveryNode node = nodes.get(target.v1());
if (node != null) {
if (nodes.localNodeId().equals(node.id())) {
localOperations++;
} else {
executePhase(node, target.v2());
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("Node [" + target.v1() + "] not available for scroll request [" + scrollId.source() + "]");
}
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
}
if (localOperations > 0) {
if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override public void run() {
for (Tuple<String, Long> target : scrollId.values()) {
DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
executePhase(node, target.v2());
}
}
}
});
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
for (final Tuple<String, Long> target : scrollId.values()) {
final DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override public void run() {
executePhase(node, target.v2());
}
});
} else {
executePhase(node, target.v2());
}
}
}
}
}
for (Tuple<String, Long> target : scrollId.values()) {
DiscoveryNode node = nodes.get(target.v1());
if (node == null) {
if (logger.isDebugEnabled()) {
logger.debug("Node [" + target.v1() + "] not available for scroll request [" + scrollId.source() + "]");
}
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
} else {
}
}
}
private void executePhase(DiscoveryNode node, final long searchId) {
searchService.sendExecuteScan(node, internalScrollSearchRequest(searchId, request), new SearchServiceListener<QueryFetchSearchResult>() {
@Override public void onResult(QueryFetchSearchResult result) {
queryFetchResults.put(result.shardTarget(), result);
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
@Override public void onFailure(Throwable t) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, searchId);
}
shardFailures.add(new ShardSearchFailure(t));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
});
}
private void finishHim() {
try {
innerFinishHim();
} catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures(shardFailures, searchCache)));
}
}
private void innerFinishHim() throws IOException {
int numberOfHits = 0;
for (QueryFetchSearchResult shardResult : queryFetchResults.values()) {
numberOfHits += shardResult.queryResult().topDocs().scoreDocs.length;
}
ShardDoc[] docs = new ShardDoc[numberOfHits];
int counter = 0;
for (QueryFetchSearchResult shardResult : queryFetchResults.values()) {
ScoreDoc[] scoreDocs = shardResult.queryResult().topDocs().scoreDocs;
for (ScoreDoc scoreDoc : scoreDocs) {
docs[counter++] = new ShardScoreDoc(shardResult.shardTarget(), scoreDoc.doc, 0.0f);
}
}
final InternalSearchResponse internalResponse = searchPhaseController.merge(docs, queryFetchResults, queryFetchResults);
for (QueryFetchSearchResult shardResult : queryFetchResults.values()) {
if (shardResult.queryResult().topDocs().scoreDocs.length < shardResult.queryResult().size()) {
// we found more than we want for this round, remove this from our scrolling
queryFetchResults.remove(shardResult.shardTarget());
}
}
String scrollId = null;
if (request.scroll() != null) {
// we rebuild the scroll id since we remove shards that we finished scrolling on
scrollId = TransportSearchHelper.buildScrollId(this.scrollId.type(), queryFetchResults.values());
}
searchCache.releaseQueryFetchResults(queryFetchResults);
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.values().length, successfulOps.get(),
System.currentTimeMillis() - startTime, buildShardFailures(shardFailures, searchCache)));
}
}
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@ -49,6 +50,7 @@ import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.InternalSearchRequest; import org.elasticsearch.search.internal.InternalSearchRequest;
import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.query.*; import org.elasticsearch.search.query.*;
import org.elasticsearch.search.scan.ScanSearchResult;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException; import java.io.IOException;
@ -161,6 +163,56 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
} }
} }
public ScanSearchResult executeScan(InternalSearchRequest request) throws ElasticSearchException {
SearchContext context = createContext(request);
activeContexts.put(context.id(), context);
contextProcessing(context);
try {
if (context.scroll() == null) {
throw new ElasticSearchException("Scroll must be provided when scanning...");
}
Lucene.CountCollector countCollector = new Lucene.CountCollector(-1);
context.searcher().search(context.query(), countCollector);
contextProcessedSuccessfully(context);
return new ScanSearchResult(context.id(), countCollector.count());
} catch (RuntimeException e) {
freeContext(context);
throw e;
} catch (IOException e) {
freeContext(context);
throw new QueryPhaseExecutionException(context, "failed to execute initial scan query", e);
} finally {
cleanContext(context);
}
}
public ScrollQueryFetchSearchResult executeScan(InternalScrollSearchRequest request) throws ElasticSearchException {
SearchContext context = findContext(request.id());
contextProcessing(context);
try {
processScroll(request, context);
if (!context.scanning()) {
// first scanning, reset the from to 0
context.scanning(true);
context.from(0);
}
queryPhase.execute(context);
shortcutDocIdsToLoadForScanning(context);
fetchPhase.execute(context);
if (context.scroll() == null || context.fetchResult().hits().hits().length < context.size()) {
freeContext(request.id());
} else {
contextProcessedSuccessfully(context);
}
return new ScrollQueryFetchSearchResult(new QueryFetchSearchResult(context.queryResult(), context.fetchResult()), context.shardTarget());
} catch (RuntimeException e) {
freeContext(context);
throw e;
} finally {
cleanContext(context);
}
}
public QuerySearchResult executeQueryPhase(InternalSearchRequest request) throws ElasticSearchException { public QuerySearchResult executeQueryPhase(InternalSearchRequest request) throws ElasticSearchException {
SearchContext context = createContext(request); SearchContext context = createContext(request);
activeContexts.put(context.id(), context); activeContexts.put(context.id(), context);
@ -444,6 +496,20 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
context.docIdsToLoad(docIdsToLoad, 0, counter); context.docIdsToLoad(docIdsToLoad, 0, counter);
} }
private void shortcutDocIdsToLoadForScanning(SearchContext context) {
TopDocs topDocs = context.queryResult().topDocs();
if (topDocs.scoreDocs.length == 0) {
// no more docs...
context.docIdsToLoad(EMPTY_DOC_IDS, 0, 0);
return;
}
int[] docIdsToLoad = new int[topDocs.scoreDocs.length];
for (int i = 0; i < docIdsToLoad.length; i++) {
docIdsToLoad[i] = topDocs.scoreDocs[i].doc;
}
context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length);
}
private void processScroll(InternalScrollSearchRequest request, SearchContext context) { private void processScroll(InternalScrollSearchRequest request, SearchContext context) {
// process scroll // process scroll
context.from(context.from() + context.size()); context.from(context.from() + context.size());

View File

@ -38,6 +38,7 @@ import org.elasticsearch.search.internal.InternalSearchRequest;
import org.elasticsearch.search.query.QuerySearchRequest; import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.ScrollQuerySearchResult; import org.elasticsearch.search.query.ScrollQuerySearchResult;
import org.elasticsearch.search.scan.ScanSearchResult;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*; import org.elasticsearch.transport.*;
@ -86,6 +87,8 @@ public class SearchServiceTransportAction extends AbstractComponent {
transportService.registerHandler(SearchQueryQueryFetchTransportHandler.ACTION, new SearchQueryQueryFetchTransportHandler()); transportService.registerHandler(SearchQueryQueryFetchTransportHandler.ACTION, new SearchQueryQueryFetchTransportHandler());
transportService.registerHandler(SearchQueryFetchScrollTransportHandler.ACTION, new SearchQueryFetchScrollTransportHandler()); transportService.registerHandler(SearchQueryFetchScrollTransportHandler.ACTION, new SearchQueryFetchScrollTransportHandler());
transportService.registerHandler(SearchFetchByIdTransportHandler.ACTION, new SearchFetchByIdTransportHandler()); transportService.registerHandler(SearchFetchByIdTransportHandler.ACTION, new SearchFetchByIdTransportHandler());
transportService.registerHandler(SearchScanTransportHandler.ACTION, new SearchScanTransportHandler());
transportService.registerHandler(SearchScanScrollTransportHandler.ACTION, new SearchScanScrollTransportHandler());
} }
public void sendFreeContext(DiscoveryNode node, final long contextId) { public void sendFreeContext(DiscoveryNode node, final long contextId) {
@ -336,6 +339,66 @@ public class SearchServiceTransportAction extends AbstractComponent {
} }
} }
public void sendExecuteScan(DiscoveryNode node, final InternalSearchRequest request, final SearchServiceListener<ScanSearchResult> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
try {
ScanSearchResult result = searchService.executeScan(request);
listener.onResult(result);
} catch (Exception e) {
listener.onFailure(e);
}
} else {
transportService.sendRequest(node, SearchScanTransportHandler.ACTION, request, new BaseTransportResponseHandler<ScanSearchResult>() {
@Override public ScanSearchResult newInstance() {
return new ScanSearchResult();
}
@Override public void handleResponse(ScanSearchResult response) {
listener.onResult(response);
}
@Override public void handleException(TransportException exp) {
listener.onFailure(exp);
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
});
}
}
public void sendExecuteScan(DiscoveryNode node, final InternalScrollSearchRequest request, final SearchServiceListener<QueryFetchSearchResult> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
try {
ScrollQueryFetchSearchResult result = searchService.executeScan(request);
listener.onResult(result.result());
} catch (Exception e) {
listener.onFailure(e);
}
} else {
transportService.sendRequest(node, SearchScanScrollTransportHandler.ACTION, request, new BaseTransportResponseHandler<ScrollQueryFetchSearchResult>() {
@Override public ScrollQueryFetchSearchResult newInstance() {
return new ScrollQueryFetchSearchResult();
}
@Override public void handleResponse(ScrollQueryFetchSearchResult response) {
listener.onResult(response.result());
}
@Override public void handleException(TransportException exp) {
listener.onFailure(exp);
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
});
}
}
private class SearchFreeContextTransportHandler extends BaseTransportRequestHandler<LongStreamable> { private class SearchFreeContextTransportHandler extends BaseTransportRequestHandler<LongStreamable> {
static final String ACTION = "search/freeContext"; static final String ACTION = "search/freeContext";
@ -498,4 +561,40 @@ public class SearchServiceTransportAction extends AbstractComponent {
return ThreadPool.Names.SEARCH; return ThreadPool.Names.SEARCH;
} }
} }
private class SearchScanTransportHandler extends BaseTransportRequestHandler<InternalSearchRequest> {
static final String ACTION = "search/phase/scan";
@Override public InternalSearchRequest newInstance() {
return new InternalSearchRequest();
}
@Override public void messageReceived(InternalSearchRequest request, TransportChannel channel) throws Exception {
ScanSearchResult result = searchService.executeScan(request);
channel.sendResponse(result);
}
@Override public String executor() {
return ThreadPool.Names.SEARCH;
}
}
private class SearchScanScrollTransportHandler extends BaseTransportRequestHandler<InternalScrollSearchRequest> {
static final String ACTION = "search/phase/scan/scroll";
@Override public InternalScrollSearchRequest newInstance() {
return new InternalScrollSearchRequest();
}
@Override public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel) throws Exception {
ScrollQueryFetchSearchResult result = searchService.executeScan(request);
channel.sendResponse(result);
}
@Override public String executor() {
return ThreadPool.Names.SEARCH;
}
}
} }

View File

@ -91,7 +91,7 @@ public class InternalSearchHits implements SearchHits {
} }
private static final InternalSearchHit[] EMPTY = new InternalSearchHit[0]; public static final InternalSearchHit[] EMPTY = new InternalSearchHit[0];
private InternalSearchHit[] hits; private InternalSearchHit[] hits;

View File

@ -96,6 +96,8 @@ public class SearchContext implements Releasable {
private float queryBoost = 1.0f; private float queryBoost = 1.0f;
private boolean scanning = false;
private Scroll scroll; private Scroll scroll;
private boolean explain; private boolean explain;
@ -289,6 +291,15 @@ public class SearchContext implements Releasable {
return timeout; return timeout;
} }
public SearchContext scanning(boolean scanning) {
this.scanning = scanning;
return this;
}
public boolean scanning() {
return this.scanning;
}
public SearchContext sort(Sort sort) { public SearchContext sort(Sort sort) {
this.sort = sort; this.sort = sort;
return this; return this;

View File

@ -19,6 +19,7 @@
package org.elasticsearch.search.query; package org.elasticsearch.search.query;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.*; import org.apache.lucene.search.*;
import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
@ -35,6 +36,8 @@ import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.sort.SortParseElement; import org.elasticsearch.search.sort.SortParseElement;
import org.elasticsearch.search.sort.TrackScoresParseElement; import org.elasticsearch.search.sort.TrackScoresParseElement;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Map; import java.util.Map;
/** /**
@ -184,7 +187,15 @@ public class QueryPhase implements SearchPhase {
} }
} }
if (sort) { if (searchContext.scanning()) {
ScanCollector scanCollector = new ScanCollector(searchContext.from(), searchContext.size());
try {
searchContext.searcher().search(query, scanCollector);
} catch (ScanCollector.StopCollectingException e) {
// all is well
}
topDocs = scanCollector.topDocs();
} else if (sort) {
topDocs = searchContext.searcher().search(query, null, numDocs, searchContext.sort()); topDocs = searchContext.searcher().search(query, null, numDocs, searchContext.sort());
} else { } else {
topDocs = searchContext.searcher().search(query, numDocs); topDocs = searchContext.searcher().search(query, numDocs);
@ -198,4 +209,56 @@ public class QueryPhase implements SearchPhase {
facetPhase.execute(searchContext); facetPhase.execute(searchContext);
} }
static class ScanCollector extends Collector {
private final int from;
private final int to;
private final ArrayList<ScoreDoc> docs;
private int docBase;
private int counter;
ScanCollector(int from, int size) {
this.from = from;
this.to = from + size;
this.docs = new ArrayList<ScoreDoc>(size);
}
public TopDocs topDocs() {
return new TopDocs(docs.size(), docs.toArray(new ScoreDoc[docs.size()]), 0f);
}
@Override public void setScorer(Scorer scorer) throws IOException {
}
@Override public void collect(int doc) throws IOException {
if (counter >= from) {
docs.add(new ScoreDoc(docBase + doc, 0f));
}
counter++;
if (counter >= to) {
throw StopCollectingException;
}
}
@Override public void setNextReader(IndexReader reader, int docBase) throws IOException {
this.docBase = docBase;
}
@Override public boolean acceptsDocsOutOfOrder() {
return true;
}
public static final RuntimeException StopCollectingException = new StopCollectingException();
static class StopCollectingException extends RuntimeException {
@Override public Throwable fillInStackTrace() {
return null;
}
}
}
} }

View File

@ -0,0 +1,74 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.scan;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import java.io.IOException;
/**
* The initial scan search result, including the count of hits matching the provided query.
*/
public class ScanSearchResult implements Streamable, SearchPhaseResult {
private long id;
private long totalHits;
private SearchShardTarget shardTarget;
public ScanSearchResult() {
}
public ScanSearchResult(long id, long totalHits) {
this.id = id;
this.totalHits = totalHits;
}
public long id() {
return this.id;
}
public long totalHits() {
return this.totalHits;
}
@Override public SearchShardTarget shardTarget() {
return shardTarget;
}
@Override public void shardTarget(SearchShardTarget shardTarget) {
this.shardTarget = shardTarget;
}
@Override public void readFrom(StreamInput in) throws IOException {
id = in.readLong();
totalHits = in.readVLong();
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeLong(id);
out.writeVLong(totalHits);
}
}

View File

@ -36,10 +36,12 @@ import org.elasticsearch.search.facet.query.QueryFacet;
import org.elasticsearch.search.fetch.FetchSearchRequest; import org.elasticsearch.search.fetch.FetchSearchRequest;
import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.InternalSearchRequest; import org.elasticsearch.search.internal.InternalSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.query.QuerySearchRequest; import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.scan.ScanSearchResult;
import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.integration.AbstractNodesTests; import org.elasticsearch.test.integration.AbstractNodesTests;
import org.testng.annotations.AfterClass; import org.testng.annotations.AfterClass;
@ -124,6 +126,44 @@ public class SingleShardEmbeddedSearchTests extends AbstractNodesTests {
assertThat(queryFetchResult.fetchResult().hits().hits()[0].type(), equalTo("type1")); assertThat(queryFetchResult.fetchResult().hits().hits()[0].type(), equalTo("type1"));
} }
@Test public void testScan() throws Exception {
Scroll scroll = new Scroll(TimeValue.timeValueMillis(500));
ScanSearchResult scanResult = searchService.executeScan(searchRequest(searchSource().query(matchAllQuery()).size(2)).scroll(scroll));
assertThat(scanResult.totalHits(), equalTo(5l));
Set<String> idsLoaded = Sets.newHashSet();
// start scrolling
FetchSearchResult fetchResult = searchService.executeScan(new InternalScrollSearchRequest(scanResult.id()).scroll(scroll)).result().fetchResult();
assertThat(fetchResult.hits().hits().length, equalTo(2));
for (SearchHit hit : fetchResult.hits()) {
idsLoaded.add(hit.id());
}
// and again...
fetchResult = searchService.executeScan(new InternalScrollSearchRequest(scanResult.id()).scroll(scroll)).result().fetchResult();
assertThat(fetchResult.hits().hits().length, equalTo(2));
for (SearchHit hit : fetchResult.hits()) {
idsLoaded.add(hit.id());
}
fetchResult = searchService.executeScan(new InternalScrollSearchRequest(scanResult.id()).scroll(scroll)).result().fetchResult();
assertThat(fetchResult.hits().hits().length, equalTo(1));
for (SearchHit hit : fetchResult.hits()) {
idsLoaded.add(hit.id());
}
Set<String> expectedIds = Sets.newHashSet("1", "2", "3", "4", "5");
assertThat(idsLoaded, equalTo(expectedIds));
// do another one for fun, should be expired context
try {
searchService.executeScan(new InternalScrollSearchRequest(scanResult.id()).scroll(scroll)).result().fetchResult();
assert false;
} catch (SearchContextMissingException e) {
// ignore
}
}
@Test public void testQueryThenFetch() throws Exception { @Test public void testQueryThenFetch() throws Exception {
QuerySearchResult queryResult = searchService.executeQueryPhase(searchRequest(searchSource().query(termQuery("name", "test1")))); QuerySearchResult queryResult = searchService.executeQueryPhase(searchRequest(searchSource().query(termQuery("name", "test1"))));
assertThat(queryResult.topDocs().totalHits, equalTo(1)); assertThat(queryResult.topDocs().totalHits, equalTo(1));

View File

@ -0,0 +1,102 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.integration.search.scan;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.integration.AbstractNodesTests;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.Set;
import static org.elasticsearch.index.query.xcontent.QueryBuilders.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
public class SearchScanTests extends AbstractNodesTests {
private Client client;
@BeforeClass public void createNodes() throws Exception {
startNode("node1");
startNode("node2");
client = getClient();
}
@AfterClass public void closeNodes() {
client.close();
closeAllNodes();
}
protected Client getClient() {
return client("node1");
}
@Test public void testSimpleScroll() throws Exception {
try {
client.admin().indices().prepareDelete("test").execute().actionGet();
} catch (Exception e) {
// ignore
}
client.admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 3)).execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
Set<String> ids = Sets.newHashSet();
Set<String> expectedIds = Sets.newHashSet();
for (int i = 0; i < 100; i++) {
String id = Integer.toString(i);
expectedIds.add(id);
client.prepareIndex("test", "type1", id).setSource("field", i).execute().actionGet();
}
client.admin().indices().prepareRefresh().execute().actionGet();
SearchResponse searchResponse = client.prepareSearch()
.setSearchType(SearchType.SCAN)
.setQuery(matchAllQuery())
.setSize(7)
.setScroll(TimeValue.timeValueMinutes(2))
.execute().actionGet();
assertThat(searchResponse.hits().totalHits(), equalTo(100l));
// start scrolling, until we get not results
while (true) {
searchResponse = client.prepareSearchScroll(searchResponse.scrollId()).setScroll(TimeValue.timeValueMinutes(2)).execute().actionGet();
assertThat(searchResponse.failedShards(), equalTo(0));
for (SearchHit hit : searchResponse.hits()) {
assertThat(hit.id() + "should not exists in the result set", ids.contains(hit.id()), equalTo(false));
ids.add(hit.id());
}
if (searchResponse.hits().totalHits() == 0) {
break;
}
}
assertThat(expectedIds, equalTo(ids));
}
}