diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java index 9b6b8d31a8c..23d9dfca92d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java @@ -125,8 +125,10 @@ public class TransportActionModule extends AbstractModule { bind(TransportSearchQueryThenFetchAction.class).asEagerSingleton(); bind(TransportSearchDfsQueryAndFetchAction.class).asEagerSingleton(); bind(TransportSearchQueryAndFetchAction.class).asEagerSingleton(); + bind(TransportSearchScanAction.class).asEagerSingleton(); bind(TransportSearchAction.class).asEagerSingleton(); + bind(TransportSearchScrollScanAction.class).asEagerSingleton(); bind(TransportSearchScrollQueryThenFetchAction.class).asEagerSingleton(); bind(TransportSearchScrollQueryAndFetchAction.class).asEagerSingleton(); bind(TransportSearchScrollAction.class).asEagerSingleton(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java index 1fa2d0fbbd9..afc21040864 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java @@ -41,7 +41,7 @@ public class SearchScrollRequest implements ActionRequest { private Scroll scroll; private boolean listenerThreaded = false; - private SearchOperationThreading operationThreading = SearchOperationThreading.SINGLE_THREAD; + private SearchOperationThreading operationThreading = SearchOperationThreading.THREAD_PER_SHARD; public SearchScrollRequest() { } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/SearchType.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/SearchType.java index 595ea65a4d7..84a142b7b77 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/SearchType.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/SearchType.java @@ -49,7 +49,12 @@ public enum SearchType { * and return the results. Each shard returns size results. Since each shard already returns size hits, this * type actually returns size times number of shards results back to the caller. */ - QUERY_AND_FETCH((byte) 3); + QUERY_AND_FETCH((byte) 3), + /** + * Performs scanning of the results which executes the search without any sorting. + * It will automatically start scrolling the result set. + */ + SCAN((byte) 4); /** * The default search type ({@link #QUERY_THEN_FETCH}. @@ -81,6 +86,8 @@ public enum SearchType { return DFS_QUERY_AND_FETCH; } else if (id == 3) { return QUERY_AND_FETCH; + } else if (id == 4) { + return SCAN; } else { throw new ElasticSearchIllegalArgumentException("No search type for [" + id + "]"); } @@ -89,7 +96,7 @@ public enum SearchType { /** * The a string representation search type to execute, defaults to {@link SearchType#DEFAULT}. Can be * one of "dfs_query_then_fetch"/"dfsQueryThenFetch", "dfs_query_and_fetch"/"dfsQueryAndFetch", - * "query_then_fetch"/"queryThenFetch", and "query_and_fetch"/"queryAndFetch". + * "query_then_fetch"/"queryThenFetch", "query_and_fetch"/"queryAndFetch", and "scan". */ public static SearchType fromString(String searchType) throws ElasticSearchIllegalArgumentException { if (searchType == null) { @@ -103,6 +110,8 @@ public enum SearchType { return SearchType.QUERY_THEN_FETCH; } else if ("query_and_fetch".equals(searchType)) { return SearchType.QUERY_AND_FETCH; + } else if ("scan".equals(searchType)) { + return SearchType.SCAN; } else { throw new ElasticSearchIllegalArgumentException("No search type for [" + searchType + "]"); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 7b3b8304046..9716eafcac1 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -21,10 +21,7 @@ package org.elasticsearch.action.search; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.TransportActions; -import org.elasticsearch.action.search.type.TransportSearchDfsQueryAndFetchAction; -import org.elasticsearch.action.search.type.TransportSearchDfsQueryThenFetchAction; -import org.elasticsearch.action.search.type.TransportSearchQueryAndFetchAction; -import org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction; +import org.elasticsearch.action.search.type.*; import org.elasticsearch.action.support.BaseAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; @@ -54,6 +51,8 @@ public class TransportSearchAction extends BaseAction listener) { // optimize search type for cases where there is only one shard group to search on - if (optimizeSingleShard) { + if (optimizeSingleShard && searchRequest.searchType() != SCAN) { try { ClusterState clusterState = clusterService.state(); searchRequest.indices(clusterState.metaData().concreteIndices(searchRequest.indices())); @@ -101,6 +102,8 @@ public class TransportSearchAction extends BaseAction listener) { + new AsyncAction(searchRequest, listener).start(); + } + + private class AsyncAction extends BaseAsyncAction { + + private final Map scanResults = ConcurrentCollections.newConcurrentMap(); + + private AsyncAction(SearchRequest request, ActionListener listener) { + super(request, listener); + } + + @Override protected String firstPhaseName() { + return "init_scan"; + } + + @Override protected void sendExecuteFirstPhase(DiscoveryNode node, InternalSearchRequest request, SearchServiceListener listener) { + searchService.sendExecuteScan(node, request, listener); + } + + @Override protected void processFirstPhaseResult(ShardRouting shard, ScanSearchResult result) { + scanResults.put(result.shardTarget(), result); + } + + @Override protected void moveToSecondPhase() throws Exception { + long totalHits = 0; + for (ScanSearchResult scanSearchResult : scanResults.values()) { + totalHits += scanSearchResult.totalHits(); + } + final InternalSearchResponse internalResponse = new InternalSearchResponse(new InternalSearchHits(InternalSearchHits.EMPTY, totalHits, 0.0f), null, false); + String scrollId = null; + if (request.scroll() != null) { + scrollId = TransportSearchHelper.buildScrollId(request.searchType(), scanResults.values()); + } + listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures())); + } + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java index 29e0c21f9f3..62139431525 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java @@ -105,6 +105,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent public void start() { if (scrollId.values().length == 0) { listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", null)); + return; } int localOperations = 0; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java index 1d16535a570..074f2e718f4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java @@ -110,6 +110,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent public void start() { if (scrollId.values().length == 0) { listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", null)); + return; } final AtomicInteger counter = new AtomicInteger(scrollId.values().length); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java new file mode 100644 index 00000000000..63eaea44c7b --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java @@ -0,0 +1,246 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search.type; + +import org.apache.lucene.search.ScoreDoc; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.*; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.action.SearchServiceListener; +import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.controller.SearchPhaseController; +import org.elasticsearch.search.controller.ShardDoc; +import org.elasticsearch.search.controller.ShardScoreDoc; +import org.elasticsearch.search.fetch.QueryFetchSearchResult; +import org.elasticsearch.search.internal.InternalSearchHits; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.action.search.type.TransportSearchHelper.*; + +/** + * @author kimchy (shay.banon) + */ +public class TransportSearchScrollScanAction extends AbstractComponent { + + private final ThreadPool threadPool; + + private final ClusterService clusterService; + + private final SearchServiceTransportAction searchService; + + private final SearchPhaseController searchPhaseController; + + private final TransportSearchCache searchCache; + + @Inject public TransportSearchScrollScanAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, + TransportSearchCache searchCache, + SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { + super(settings); + this.threadPool = threadPool; + this.clusterService = clusterService; + this.searchCache = searchCache; + this.searchService = searchService; + this.searchPhaseController = searchPhaseController; + } + + public void execute(SearchScrollRequest request, ParsedScrollId scrollId, ActionListener listener) { + new AsyncAction(request, scrollId, listener).start(); + } + + private class AsyncAction { + + private final SearchScrollRequest request; + + private final ActionListener listener; + + private final ParsedScrollId scrollId; + + private final DiscoveryNodes nodes; + + protected final Collection shardFailures = searchCache.obtainShardFailures(); + + private final Map queryFetchResults = searchCache.obtainQueryFetchResults(); + + private final AtomicInteger successfulOps; + + private final AtomicInteger counter; + + private final long startTime = System.currentTimeMillis(); + + private AsyncAction(SearchScrollRequest request, ParsedScrollId scrollId, ActionListener listener) { + this.request = request; + this.listener = listener; + this.scrollId = scrollId; + this.nodes = clusterService.state().nodes(); + this.successfulOps = new AtomicInteger(scrollId.values().length); + this.counter = new AtomicInteger(scrollId.values().length); + } + + public void start() { + if (scrollId.values().length == 0) { + final InternalSearchResponse internalResponse = new InternalSearchResponse(new InternalSearchHits(InternalSearchHits.EMPTY, 0, 0.0f), null, false); + searchCache.releaseQueryFetchResults(queryFetchResults); + listener.onResponse(new SearchResponse(internalResponse, request.scrollId(), 0, 0, 0l, TransportSearchHelper.buildShardFailures(shardFailures, searchCache))); + return; + } + + int localOperations = 0; + for (Tuple target : scrollId.values()) { + DiscoveryNode node = nodes.get(target.v1()); + if (node != null) { + if (nodes.localNodeId().equals(node.id())) { + localOperations++; + } else { + executePhase(node, target.v2()); + } + } else { + if (logger.isDebugEnabled()) { + logger.debug("Node [" + target.v1() + "] not available for scroll request [" + scrollId.source() + "]"); + } + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + } + + if (localOperations > 0) { + if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { + @Override public void run() { + for (Tuple target : scrollId.values()) { + DiscoveryNode node = nodes.get(target.v1()); + if (node != null && nodes.localNodeId().equals(node.id())) { + executePhase(node, target.v2()); + } + } + } + }); + } else { + boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; + for (final Tuple target : scrollId.values()) { + final DiscoveryNode node = nodes.get(target.v1()); + if (node != null && nodes.localNodeId().equals(node.id())) { + if (localAsync) { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { + @Override public void run() { + executePhase(node, target.v2()); + } + }); + } else { + executePhase(node, target.v2()); + } + } + } + } + } + + for (Tuple target : scrollId.values()) { + DiscoveryNode node = nodes.get(target.v1()); + if (node == null) { + if (logger.isDebugEnabled()) { + logger.debug("Node [" + target.v1() + "] not available for scroll request [" + scrollId.source() + "]"); + } + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } else { + } + } + } + + private void executePhase(DiscoveryNode node, final long searchId) { + searchService.sendExecuteScan(node, internalScrollSearchRequest(searchId, request), new SearchServiceListener() { + @Override public void onResult(QueryFetchSearchResult result) { + queryFetchResults.put(result.shardTarget(), result); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + + @Override public void onFailure(Throwable t) { + if (logger.isDebugEnabled()) { + logger.debug("[{}] Failed to execute query phase", t, searchId); + } + shardFailures.add(new ShardSearchFailure(t)); + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + }); + } + + private void finishHim() { + try { + innerFinishHim(); + } catch (Exception e) { + listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures(shardFailures, searchCache))); + } + } + + private void innerFinishHim() throws IOException { + int numberOfHits = 0; + for (QueryFetchSearchResult shardResult : queryFetchResults.values()) { + numberOfHits += shardResult.queryResult().topDocs().scoreDocs.length; + } + ShardDoc[] docs = new ShardDoc[numberOfHits]; + int counter = 0; + for (QueryFetchSearchResult shardResult : queryFetchResults.values()) { + ScoreDoc[] scoreDocs = shardResult.queryResult().topDocs().scoreDocs; + for (ScoreDoc scoreDoc : scoreDocs) { + docs[counter++] = new ShardScoreDoc(shardResult.shardTarget(), scoreDoc.doc, 0.0f); + } + } + final InternalSearchResponse internalResponse = searchPhaseController.merge(docs, queryFetchResults, queryFetchResults); + + + for (QueryFetchSearchResult shardResult : queryFetchResults.values()) { + if (shardResult.queryResult().topDocs().scoreDocs.length < shardResult.queryResult().size()) { + // we found more than we want for this round, remove this from our scrolling + queryFetchResults.remove(shardResult.shardTarget()); + } + } + + String scrollId = null; + if (request.scroll() != null) { + // we rebuild the scroll id since we remove shards that we finished scrolling on + scrollId = TransportSearchHelper.buildScrollId(this.scrollId.type(), queryFetchResults.values()); + } + searchCache.releaseQueryFetchResults(queryFetchResults); + listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.values().length, successfulOps.get(), + System.currentTimeMillis() - startTime, buildShardFailures(shardFailures, searchCache))); + } + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java index 6db84a2d031..0f657a31653 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.Unicode; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; @@ -49,6 +50,7 @@ import org.elasticsearch.search.internal.InternalScrollSearchRequest; import org.elasticsearch.search.internal.InternalSearchRequest; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.query.*; +import org.elasticsearch.search.scan.ScanSearchResult; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -161,6 +163,56 @@ public class SearchService extends AbstractLifecycleComponent { } } + public ScanSearchResult executeScan(InternalSearchRequest request) throws ElasticSearchException { + SearchContext context = createContext(request); + activeContexts.put(context.id(), context); + contextProcessing(context); + try { + if (context.scroll() == null) { + throw new ElasticSearchException("Scroll must be provided when scanning..."); + } + Lucene.CountCollector countCollector = new Lucene.CountCollector(-1); + context.searcher().search(context.query(), countCollector); + contextProcessedSuccessfully(context); + return new ScanSearchResult(context.id(), countCollector.count()); + } catch (RuntimeException e) { + freeContext(context); + throw e; + } catch (IOException e) { + freeContext(context); + throw new QueryPhaseExecutionException(context, "failed to execute initial scan query", e); + } finally { + cleanContext(context); + } + } + + public ScrollQueryFetchSearchResult executeScan(InternalScrollSearchRequest request) throws ElasticSearchException { + SearchContext context = findContext(request.id()); + contextProcessing(context); + try { + processScroll(request, context); + if (!context.scanning()) { + // first scanning, reset the from to 0 + context.scanning(true); + context.from(0); + } + queryPhase.execute(context); + shortcutDocIdsToLoadForScanning(context); + fetchPhase.execute(context); + if (context.scroll() == null || context.fetchResult().hits().hits().length < context.size()) { + freeContext(request.id()); + } else { + contextProcessedSuccessfully(context); + } + return new ScrollQueryFetchSearchResult(new QueryFetchSearchResult(context.queryResult(), context.fetchResult()), context.shardTarget()); + } catch (RuntimeException e) { + freeContext(context); + throw e; + } finally { + cleanContext(context); + } + } + public QuerySearchResult executeQueryPhase(InternalSearchRequest request) throws ElasticSearchException { SearchContext context = createContext(request); activeContexts.put(context.id(), context); @@ -444,6 +496,20 @@ public class SearchService extends AbstractLifecycleComponent { context.docIdsToLoad(docIdsToLoad, 0, counter); } + private void shortcutDocIdsToLoadForScanning(SearchContext context) { + TopDocs topDocs = context.queryResult().topDocs(); + if (topDocs.scoreDocs.length == 0) { + // no more docs... + context.docIdsToLoad(EMPTY_DOC_IDS, 0, 0); + return; + } + int[] docIdsToLoad = new int[topDocs.scoreDocs.length]; + for (int i = 0; i < docIdsToLoad.length; i++) { + docIdsToLoad[i] = topDocs.scoreDocs[i].doc; + } + context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length); + } + private void processScroll(InternalScrollSearchRequest request, SearchContext context) { // process scroll context.from(context.from() + context.size()); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java index aba1ed52e2f..df569339eb2 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java @@ -38,6 +38,7 @@ import org.elasticsearch.search.internal.InternalSearchRequest; import org.elasticsearch.search.query.QuerySearchRequest; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.ScrollQuerySearchResult; +import org.elasticsearch.search.scan.ScanSearchResult; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; @@ -86,6 +87,8 @@ public class SearchServiceTransportAction extends AbstractComponent { transportService.registerHandler(SearchQueryQueryFetchTransportHandler.ACTION, new SearchQueryQueryFetchTransportHandler()); transportService.registerHandler(SearchQueryFetchScrollTransportHandler.ACTION, new SearchQueryFetchScrollTransportHandler()); transportService.registerHandler(SearchFetchByIdTransportHandler.ACTION, new SearchFetchByIdTransportHandler()); + transportService.registerHandler(SearchScanTransportHandler.ACTION, new SearchScanTransportHandler()); + transportService.registerHandler(SearchScanScrollTransportHandler.ACTION, new SearchScanScrollTransportHandler()); } public void sendFreeContext(DiscoveryNode node, final long contextId) { @@ -336,6 +339,66 @@ public class SearchServiceTransportAction extends AbstractComponent { } } + public void sendExecuteScan(DiscoveryNode node, final InternalSearchRequest request, final SearchServiceListener listener) { + if (clusterService.state().nodes().localNodeId().equals(node.id())) { + try { + ScanSearchResult result = searchService.executeScan(request); + listener.onResult(result); + } catch (Exception e) { + listener.onFailure(e); + } + } else { + transportService.sendRequest(node, SearchScanTransportHandler.ACTION, request, new BaseTransportResponseHandler() { + + @Override public ScanSearchResult newInstance() { + return new ScanSearchResult(); + } + + @Override public void handleResponse(ScanSearchResult response) { + listener.onResult(response); + } + + @Override public void handleException(TransportException exp) { + listener.onFailure(exp); + } + + @Override public String executor() { + return ThreadPool.Names.SAME; + } + }); + } + } + + public void sendExecuteScan(DiscoveryNode node, final InternalScrollSearchRequest request, final SearchServiceListener listener) { + if (clusterService.state().nodes().localNodeId().equals(node.id())) { + try { + ScrollQueryFetchSearchResult result = searchService.executeScan(request); + listener.onResult(result.result()); + } catch (Exception e) { + listener.onFailure(e); + } + } else { + transportService.sendRequest(node, SearchScanScrollTransportHandler.ACTION, request, new BaseTransportResponseHandler() { + + @Override public ScrollQueryFetchSearchResult newInstance() { + return new ScrollQueryFetchSearchResult(); + } + + @Override public void handleResponse(ScrollQueryFetchSearchResult response) { + listener.onResult(response.result()); + } + + @Override public void handleException(TransportException exp) { + listener.onFailure(exp); + } + + @Override public String executor() { + return ThreadPool.Names.SAME; + } + }); + } + } + private class SearchFreeContextTransportHandler extends BaseTransportRequestHandler { static final String ACTION = "search/freeContext"; @@ -498,4 +561,40 @@ public class SearchServiceTransportAction extends AbstractComponent { return ThreadPool.Names.SEARCH; } } + + private class SearchScanTransportHandler extends BaseTransportRequestHandler { + + static final String ACTION = "search/phase/scan"; + + @Override public InternalSearchRequest newInstance() { + return new InternalSearchRequest(); + } + + @Override public void messageReceived(InternalSearchRequest request, TransportChannel channel) throws Exception { + ScanSearchResult result = searchService.executeScan(request); + channel.sendResponse(result); + } + + @Override public String executor() { + return ThreadPool.Names.SEARCH; + } + } + + private class SearchScanScrollTransportHandler extends BaseTransportRequestHandler { + + static final String ACTION = "search/phase/scan/scroll"; + + @Override public InternalScrollSearchRequest newInstance() { + return new InternalScrollSearchRequest(); + } + + @Override public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel) throws Exception { + ScrollQueryFetchSearchResult result = searchService.executeScan(request); + channel.sendResponse(result); + } + + @Override public String executor() { + return ThreadPool.Names.SEARCH; + } + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchHits.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchHits.java index 8c34464aaa7..54f1572399e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchHits.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchHits.java @@ -91,7 +91,7 @@ public class InternalSearchHits implements SearchHits { } - private static final InternalSearchHit[] EMPTY = new InternalSearchHit[0]; + public static final InternalSearchHit[] EMPTY = new InternalSearchHit[0]; private InternalSearchHit[] hits; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/SearchContext.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/SearchContext.java index 8fa0462b8f2..1beaf6edf5a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/SearchContext.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/SearchContext.java @@ -96,6 +96,8 @@ public class SearchContext implements Releasable { private float queryBoost = 1.0f; + private boolean scanning = false; + private Scroll scroll; private boolean explain; @@ -289,6 +291,15 @@ public class SearchContext implements Releasable { return timeout; } + public SearchContext scanning(boolean scanning) { + this.scanning = scanning; + return this; + } + + public boolean scanning() { + return this.scanning; + } + public SearchContext sort(Sort sort) { this.sort = sort; return this; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/query/QueryPhase.java index 44f3a5b335b..583d1e11a1e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -19,6 +19,7 @@ package org.elasticsearch.search.query; +import org.apache.lucene.index.IndexReader; import org.apache.lucene.search.*; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.inject.Inject; @@ -35,6 +36,8 @@ import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.sort.SortParseElement; import org.elasticsearch.search.sort.TrackScoresParseElement; +import java.io.IOException; +import java.util.ArrayList; import java.util.Map; /** @@ -184,7 +187,15 @@ public class QueryPhase implements SearchPhase { } } - if (sort) { + if (searchContext.scanning()) { + ScanCollector scanCollector = new ScanCollector(searchContext.from(), searchContext.size()); + try { + searchContext.searcher().search(query, scanCollector); + } catch (ScanCollector.StopCollectingException e) { + // all is well + } + topDocs = scanCollector.topDocs(); + } else if (sort) { topDocs = searchContext.searcher().search(query, null, numDocs, searchContext.sort()); } else { topDocs = searchContext.searcher().search(query, numDocs); @@ -198,4 +209,56 @@ public class QueryPhase implements SearchPhase { facetPhase.execute(searchContext); } + + static class ScanCollector extends Collector { + + private final int from; + + private final int to; + + private final ArrayList docs; + + private int docBase; + + private int counter; + + ScanCollector(int from, int size) { + this.from = from; + this.to = from + size; + this.docs = new ArrayList(size); + } + + public TopDocs topDocs() { + return new TopDocs(docs.size(), docs.toArray(new ScoreDoc[docs.size()]), 0f); + } + + @Override public void setScorer(Scorer scorer) throws IOException { + } + + @Override public void collect(int doc) throws IOException { + if (counter >= from) { + docs.add(new ScoreDoc(docBase + doc, 0f)); + } + counter++; + if (counter >= to) { + throw StopCollectingException; + } + } + + @Override public void setNextReader(IndexReader reader, int docBase) throws IOException { + this.docBase = docBase; + } + + @Override public boolean acceptsDocsOutOfOrder() { + return true; + } + + public static final RuntimeException StopCollectingException = new StopCollectingException(); + + static class StopCollectingException extends RuntimeException { + @Override public Throwable fillInStackTrace() { + return null; + } + } + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/scan/ScanSearchResult.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/scan/ScanSearchResult.java new file mode 100644 index 00000000000..84d95a5569b --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/scan/ScanSearchResult.java @@ -0,0 +1,74 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.scan; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.SearchShardTarget; + +import java.io.IOException; + +/** + * The initial scan search result, including the count of hits matching the provided query. + */ +public class ScanSearchResult implements Streamable, SearchPhaseResult { + + private long id; + + private long totalHits; + + private SearchShardTarget shardTarget; + + public ScanSearchResult() { + } + + public ScanSearchResult(long id, long totalHits) { + this.id = id; + this.totalHits = totalHits; + } + + public long id() { + return this.id; + } + + public long totalHits() { + return this.totalHits; + } + + @Override public SearchShardTarget shardTarget() { + return shardTarget; + } + + @Override public void shardTarget(SearchShardTarget shardTarget) { + this.shardTarget = shardTarget; + } + + @Override public void readFrom(StreamInput in) throws IOException { + id = in.readLong(); + totalHits = in.readVLong(); + } + + @Override public void writeTo(StreamOutput out) throws IOException { + out.writeLong(id); + out.writeVLong(totalHits); + } +} \ No newline at end of file diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/embedded/SingleShardEmbeddedSearchTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/embedded/SingleShardEmbeddedSearchTests.java index 784a8dfb3b8..9d9a9510b3c 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/embedded/SingleShardEmbeddedSearchTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/embedded/SingleShardEmbeddedSearchTests.java @@ -36,10 +36,12 @@ import org.elasticsearch.search.facet.query.QueryFacet; import org.elasticsearch.search.fetch.FetchSearchRequest; import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.QueryFetchSearchResult; +import org.elasticsearch.search.internal.InternalScrollSearchRequest; import org.elasticsearch.search.internal.InternalSearchRequest; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.query.QuerySearchRequest; import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.search.scan.ScanSearchResult; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.test.integration.AbstractNodesTests; import org.testng.annotations.AfterClass; @@ -124,6 +126,44 @@ public class SingleShardEmbeddedSearchTests extends AbstractNodesTests { assertThat(queryFetchResult.fetchResult().hits().hits()[0].type(), equalTo("type1")); } + @Test public void testScan() throws Exception { + Scroll scroll = new Scroll(TimeValue.timeValueMillis(500)); + ScanSearchResult scanResult = searchService.executeScan(searchRequest(searchSource().query(matchAllQuery()).size(2)).scroll(scroll)); + assertThat(scanResult.totalHits(), equalTo(5l)); + + Set idsLoaded = Sets.newHashSet(); + // start scrolling + FetchSearchResult fetchResult = searchService.executeScan(new InternalScrollSearchRequest(scanResult.id()).scroll(scroll)).result().fetchResult(); + assertThat(fetchResult.hits().hits().length, equalTo(2)); + for (SearchHit hit : fetchResult.hits()) { + idsLoaded.add(hit.id()); + } + // and again... + fetchResult = searchService.executeScan(new InternalScrollSearchRequest(scanResult.id()).scroll(scroll)).result().fetchResult(); + assertThat(fetchResult.hits().hits().length, equalTo(2)); + for (SearchHit hit : fetchResult.hits()) { + idsLoaded.add(hit.id()); + } + + fetchResult = searchService.executeScan(new InternalScrollSearchRequest(scanResult.id()).scroll(scroll)).result().fetchResult(); + assertThat(fetchResult.hits().hits().length, equalTo(1)); + for (SearchHit hit : fetchResult.hits()) { + idsLoaded.add(hit.id()); + } + + + Set expectedIds = Sets.newHashSet("1", "2", "3", "4", "5"); + assertThat(idsLoaded, equalTo(expectedIds)); + + // do another one for fun, should be expired context + try { + searchService.executeScan(new InternalScrollSearchRequest(scanResult.id()).scroll(scroll)).result().fetchResult(); + assert false; + } catch (SearchContextMissingException e) { + // ignore + } + } + @Test public void testQueryThenFetch() throws Exception { QuerySearchResult queryResult = searchService.executeQueryPhase(searchRequest(searchSource().query(termQuery("name", "test1")))); assertThat(queryResult.topDocs().totalHits, equalTo(1)); diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/scan/SearchScanTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/scan/SearchScanTests.java new file mode 100644 index 00000000000..6d26146a6ee --- /dev/null +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/scan/SearchScanTests.java @@ -0,0 +1,102 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.integration.search.scan; + +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.collect.Sets; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.test.integration.AbstractNodesTests; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.Set; + +import static org.elasticsearch.index.query.xcontent.QueryBuilders.*; +import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.Matchers.*; + +public class SearchScanTests extends AbstractNodesTests { + + private Client client; + + @BeforeClass public void createNodes() throws Exception { + startNode("node1"); + startNode("node2"); + client = getClient(); + } + + @AfterClass public void closeNodes() { + client.close(); + closeAllNodes(); + } + + protected Client getClient() { + return client("node1"); + } + + @Test public void testSimpleScroll() throws Exception { + try { + client.admin().indices().prepareDelete("test").execute().actionGet(); + } catch (Exception e) { + // ignore + } + client.admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 3)).execute().actionGet(); + client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + + Set ids = Sets.newHashSet(); + Set expectedIds = Sets.newHashSet(); + for (int i = 0; i < 100; i++) { + String id = Integer.toString(i); + expectedIds.add(id); + client.prepareIndex("test", "type1", id).setSource("field", i).execute().actionGet(); + } + + client.admin().indices().prepareRefresh().execute().actionGet(); + + SearchResponse searchResponse = client.prepareSearch() + .setSearchType(SearchType.SCAN) + .setQuery(matchAllQuery()) + .setSize(7) + .setScroll(TimeValue.timeValueMinutes(2)) + .execute().actionGet(); + + assertThat(searchResponse.hits().totalHits(), equalTo(100l)); + + // start scrolling, until we get not results + while (true) { + searchResponse = client.prepareSearchScroll(searchResponse.scrollId()).setScroll(TimeValue.timeValueMinutes(2)).execute().actionGet(); + assertThat(searchResponse.failedShards(), equalTo(0)); + for (SearchHit hit : searchResponse.hits()) { + assertThat(hit.id() + "should not exists in the result set", ids.contains(hit.id()), equalTo(false)); + ids.add(hit.id()); + } + if (searchResponse.hits().totalHits() == 0) { + break; + } + } + + assertThat(expectedIds, equalTo(ids)); + } +} \ No newline at end of file