Search API: Scroll, closes #77.

This commit is contained in:
kimchy 2010-03-21 01:14:49 +02:00
parent 2ae6de2433
commit 1e455789d0
23 changed files with 2787 additions and 149 deletions
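
For context, a minimal usage sketch of the scroll API this commit adds, based on the client calls visible in the diffs below. The index name, query source, and keep-alive are illustrative; searchRequest and searchScrollRequest are the static helpers from org.elasticsearch.client.Requests used in the tests, and a scroll(Scroll) setter on SearchRequest is assumed to match the scroll() getter the transport actions read:

import static org.elasticsearch.client.Requests.*;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.Unicode;

public class ScrollUsageSketch {
    public static void scrollAll(Client client) {
        // issue the initial search with a scroll keep-alive (values illustrative)
        SearchResponse response = client.search(
                searchRequest("test")
                        .source(Unicode.fromStringAsBytes("{ \"query\" : { \"term\" : { \"multi\" : \"test\" } } }"))
                        .scroll(new Scroll(TimeValue.timeValueMinutes(10))))
                .actionGet();
        while (response.hits().hits().length > 0) {
            // consume the current batch of hits here, then pull the next batch
            // using the scroll id carried on the previous response
            response = client.searchScroll(
                    searchScrollRequest(response.scrollId())
                            .scroll(TimeValue.timeValueMinutes(10)))
                    .actionGet();
        }
    }
}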

View File

@ -102,6 +102,7 @@ public class TransportActionModule extends AbstractModule {
bind(TransportSearchAction.class).asEagerSingleton();
bind(TransportSearchScrollQueryThenFetchAction.class).asEagerSingleton();
bind(TransportSearchScrollQueryAndFetchAction.class).asEagerSingleton();
bind(TransportSearchScrollAction.class).asEagerSingleton();
bind(TransportMoreLikeThisAction.class).asEagerSingleton();

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
@ -39,6 +40,9 @@ public class SearchScrollRequest implements ActionRequest {
private Scroll scroll;
private boolean listenerThreaded = false;
private SearchOperationThreading operationThreading = SearchOperationThreading.SINGLE_THREAD;
public SearchScrollRequest() {
}
@ -54,29 +58,67 @@ public class SearchScrollRequest implements ActionRequest {
return validationException;
}
/**
* Controls the search operation threading model.
*/
public SearchOperationThreading operationThreading() {
return this.operationThreading;
}
/**
* Controls the search operation threading model.
*/
public SearchScrollRequest operationThreading(SearchOperationThreading operationThreading) {
this.operationThreading = operationThreading;
return this;
}
/**
* Should the listener be called on a separate thread if needed.
*/
@Override public boolean listenerThreaded() {
// TODO threaded
return false; //To change body of implemented methods use File | Settings | File Templates.
return listenerThreaded;
}
@Override public ActionRequest listenerThreaded(boolean threadedListener) {
// TODO threaded
return null; //To change body of implemented methods use File | Settings | File Templates.
/**
* Should the listener be called on a separate thread if needed.
*/
@Override public SearchScrollRequest listenerThreaded(boolean threadedListener) {
this.listenerThreaded = threadedListener;
return this;
}
/**
* The scroll id used to scroll the search.
*/
public String scrollId() {
return scrollId;
}
/**
* If set, will enable scrolling of the search request.
*/
public Scroll scroll() {
return scroll;
}
public void scroll(Scroll scroll) {
/**
* If set, will enable scrolling of the search request.
*/
public SearchScrollRequest scroll(Scroll scroll) {
this.scroll = scroll;
return this;
}
/**
* If set, will enable scrolling of the search request for the specified timeout.
*/
public SearchScrollRequest scroll(TimeValue keepAlive) {
return scroll(new Scroll(keepAlive));
}
@Override public void readFrom(StreamInput in) throws IOException {
operationThreading = SearchOperationThreading.fromId(in.readByte());
scrollId = in.readUTF();
if (in.readBoolean()) {
scroll = readScroll(in);
@ -84,6 +126,7 @@ public class SearchScrollRequest implements ActionRequest {
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeByte(operationThreading.id());
out.writeUTF(scrollId);
if (scroll == null) {
out.writeBoolean(false);

View File

@ -24,6 +24,7 @@ import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.search.type.ParsedScrollId;
import org.elasticsearch.action.search.type.TransportSearchScrollQueryAndFetchAction;
import org.elasticsearch.action.search.type.TransportSearchScrollQueryThenFetchAction;
import org.elasticsearch.action.support.BaseAction;
import org.elasticsearch.transport.BaseTransportRequestHandler;
@ -41,10 +42,14 @@ public class TransportSearchScrollAction extends BaseAction<SearchScrollRequest,
private final TransportSearchScrollQueryThenFetchAction queryThenFetchAction;
private final TransportSearchScrollQueryAndFetchAction queryAndFetchAction;
@Inject public TransportSearchScrollAction(Settings settings, TransportService transportService,
TransportSearchScrollQueryThenFetchAction queryThenFetchAction) {
TransportSearchScrollQueryThenFetchAction queryThenFetchAction,
TransportSearchScrollQueryAndFetchAction queryAndFetchAction) {
super(settings);
this.queryThenFetchAction = queryThenFetchAction;
this.queryAndFetchAction = queryAndFetchAction;
transportService.registerHandler(TransportActions.SEARCH_SCROLL, new TransportHandler());
}
@ -54,8 +59,10 @@ public class TransportSearchScrollAction extends BaseAction<SearchScrollRequest,
ParsedScrollId scrollId = parseScrollId(request.scrollId());
if (scrollId.type().equals(QUERY_THEN_FETCH_TYPE)) {
queryThenFetchAction.execute(request, scrollId, listener);
} else if (scrollId.type().equals(QUERY_AND_FETCH_TYPE)) {
queryAndFetchAction.execute(request, scrollId, listener);
} else {
throw new ElasticSearchIllegalArgumentException("Scroll id type [" + scrollId.type() + "] unrecongnized");
throw new ElasticSearchIllegalArgumentException("Scroll id type [" + scrollId.type() + "] unrecognized");
}
} catch (Exception e) {
listener.onFailure(e);

View File

@ -107,7 +107,6 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
executeSecondPhase(counter, node, querySearchRequest);
}
}
searchCache.releaseDfsResults(dfsResults);
}
});
} else {
@ -127,7 +126,6 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
}
}
}
searchCache.releaseDfsResults(dfsResults);
}
}
}
@ -158,28 +156,20 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
try {
innerFinishHim();
} catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException("query_fetch", "", e, buildShardFailures()));
invokeListener(new ReduceSearchPhaseException("query_fetch", "", e, buildShardFailures()));
}
}
private void innerFinishHim() {
sortedShardList = searchPhaseController.sortDocs(queryFetchResults.values());
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults);
String scrollIdX = null;
String scrollId = null;
if (request.scroll() != null) {
scrollIdX = buildScrollId(request.searchType(), queryFetchResults.values());
scrollId = buildScrollId(request.searchType(), dfsResults);
}
final String scrollId = scrollIdX;
searchCache.releaseDfsResults(dfsResults);
searchCache.releaseQueryFetchResults(queryFetchResults);
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
@Override public void run() {
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildShardFailures()));
}
});
} else {
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildShardFailures()));
}
invokeListener(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildShardFailures()));
}
}
}

View File

@ -113,7 +113,6 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
executeQuery(counter, querySearchRequest, node);
}
}
searchCache.releaseDfsResults(dfsResults);
}
});
} else {
@ -133,7 +132,6 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
}
}
}
searchCache.releaseDfsResults(dfsResults);
}
}
}
@ -164,7 +162,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
try {
innerExecuteFetchPhase();
} catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException("query", "", e, buildShardFailures()));
invokeListener(new ReduceSearchPhaseException("query", "", e, buildShardFailures()));
}
}
@ -248,28 +246,20 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
try {
innerFinishHim();
} catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()));
invokeListener(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()));
}
}
private void innerFinishHim() {
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
String scrollIdX = null;
String scrollId = null;
if (request.scroll() != null) {
scrollIdX = TransportSearchHelper.buildScrollId(request.searchType(), fetchResults.values());
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), dfsResults);
}
final String scrollId = scrollIdX;
searchCache.releaseDfsResults(dfsResults);
searchCache.releaseQueryResults(queryResults);
searchCache.releaseFetchResults(fetchResults);
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
@Override public void run() {
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildShardFailures()));
}
});
} else {
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildShardFailures()));
}
invokeListener(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildShardFailures()));
}
}
}

View File

@ -19,17 +19,21 @@
package org.elasticsearch.action.search.type;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.fetch.FetchSearchResultProvider;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.InternalSearchRequest;
import org.elasticsearch.util.Base64;
import org.elasticsearch.util.Tuple;
import org.elasticsearch.util.Unicode;
import java.io.IOException;
import java.util.Collection;
import java.util.regex.Pattern;
@ -74,25 +78,30 @@ public abstract class TransportSearchHelper {
return internalRequest;
}
public static String buildScrollId(SearchType searchType, Iterable<? extends FetchSearchResultProvider> fetchResults) {
public static String buildScrollId(SearchType searchType, Iterable<? extends SearchPhaseResult> searchPhaseResults) {
if (searchType == SearchType.DFS_QUERY_THEN_FETCH || searchType == SearchType.QUERY_THEN_FETCH) {
return buildScrollId(ParsedScrollId.QUERY_THEN_FETCH_TYPE, fetchResults);
return buildScrollId(ParsedScrollId.QUERY_THEN_FETCH_TYPE, searchPhaseResults);
} else if (searchType == SearchType.QUERY_AND_FETCH || searchType == SearchType.DFS_QUERY_AND_FETCH) {
return buildScrollId(ParsedScrollId.QUERY_AND_FETCH_TYPE, fetchResults);
return buildScrollId(ParsedScrollId.QUERY_AND_FETCH_TYPE, searchPhaseResults);
} else {
throw new ElasticSearchIllegalStateException();
}
}
public static String buildScrollId(String type, Iterable<? extends FetchSearchResultProvider> fetchResults) {
public static String buildScrollId(String type, Iterable<? extends SearchPhaseResult> searchPhaseResults) {
StringBuilder sb = new StringBuilder().append(type).append(';');
for (FetchSearchResultProvider fetchResult : fetchResults) {
sb.append(fetchResult.id()).append(':').append(fetchResult.shardTarget().nodeId()).append(';');
for (SearchPhaseResult searchPhaseResult : searchPhaseResults) {
sb.append(searchPhaseResult.id()).append(':').append(searchPhaseResult.shardTarget().nodeId()).append(';');
}
return sb.toString();
return Base64.encodeBytes(Unicode.fromStringAsBytes(sb.toString()));
}
public static ParsedScrollId parseScrollId(String scrollId) {
try {
scrollId = Unicode.fromBytes(Base64.decode(scrollId));
} catch (IOException e) {
throw new ElasticSearchIllegalArgumentException("Failed to decode scrollId", e);
}
String[] elements = scrollIdPattern.split(scrollId);
@SuppressWarnings({"unchecked"}) Tuple<String, Long>[] values = new Tuple[elements.length - 1];
for (int i = 1; i < elements.length; i++) {
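
The scroll id built and parsed above is plain text, the search type followed by searchId:nodeId pairs, run through Base64. A small round-trip sketch using the same util classes (the ids and node names are illustrative):

import java.io.IOException;

import org.elasticsearch.action.search.type.ParsedScrollId;
import org.elasticsearch.util.Base64;
import org.elasticsearch.util.Unicode;

public class ScrollIdFormatSketch {
    public static void main(String[] args) throws IOException {
        // the same wire format buildScrollId produces: "<type>;<searchId>:<nodeId>;..."
        String plain = ParsedScrollId.QUERY_THEN_FETCH_TYPE + ";42:node1;43:node2;";
        String encoded = Base64.encodeBytes(Unicode.fromStringAsBytes(plain));
        // parseScrollId reverses this before splitting on ';' and ':'
        String decoded = Unicode.fromBytes(Base64.decode(encoded));
        assert decoded.equals(plain);
    }
}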

View File

@ -79,21 +79,12 @@ public class TransportSearchQueryAndFetchAction extends TransportSearchTypeActio
@Override protected void moveToSecondPhase() {
sortedShardList = searchPhaseController.sortDocs(queryFetchResults.values());
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults);
String scrollIdX = null;
String scrollId = null;
if (request.scroll() != null) {
scrollIdX = buildScrollId(request.searchType(), queryFetchResults.values());
scrollId = buildScrollId(request.searchType(), queryFetchResults.values());
}
final String scrollId = scrollIdX;
searchCache.releaseQueryFetchResults(queryFetchResults);
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
@Override public void run() {
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildShardFailures()));
}
});
} else {
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildShardFailures()));
}
invokeListener(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildShardFailures()));
}
}
}

View File

@ -161,7 +161,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
try {
innerFinishHim();
} catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()));
invokeListener(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()));
}
}
@ -169,11 +169,11 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), fetchResults.values());
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), queryResults.values());
}
searchCache.releaseQueryResults(queryResults);
searchCache.releaseFetchResults(fetchResults);
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildShardFailures()));
invokeListener(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildShardFailures()));
}
}
}

View File

@ -0,0 +1,239 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search.type;
import com.google.inject.Inject;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.*;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.Node;
import org.elasticsearch.cluster.node.Nodes;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.controller.ShardDoc;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.util.Tuple;
import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.settings.Settings;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.search.type.TransportSearchHelper.*;
/**
* @author kimchy (shay.banon)
*/
public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent {
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final SearchServiceTransportAction searchService;
private final SearchPhaseController searchPhaseController;
private final TransportSearchCache searchCache;
@Inject public TransportSearchScrollQueryAndFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportSearchCache searchCache,
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.searchCache = searchCache;
this.searchService = searchService;
this.searchPhaseController = searchPhaseController;
}
public void execute(SearchScrollRequest request, ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
new AsyncAction(request, scrollId, listener).start();
}
private class AsyncAction {
private final SearchScrollRequest request;
private final ActionListener<SearchResponse> listener;
private final ParsedScrollId scrollId;
private final Nodes nodes;
protected final Collection<ShardSearchFailure> shardFailures = searchCache.obtainShardFailures();
private final Map<SearchShardTarget, QueryFetchSearchResult> queryFetchResults = searchCache.obtainQueryFetchResults();
private final AtomicInteger successfulOps;
private final AtomicInteger counter;
private AsyncAction(SearchScrollRequest request, ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
this.request = request;
this.listener = listener;
this.scrollId = scrollId;
this.nodes = clusterService.state().nodes();
this.successfulOps = new AtomicInteger(scrollId.values().length);
this.counter = new AtomicInteger(scrollId.values().length);
}
public void start() {
if (scrollId.values().length == 0) {
invokeListener(new SearchPhaseExecutionException("query", "no nodes to search on", null));
}
int localOperations = 0;
for (Tuple<String, Long> target : scrollId.values()) {
Node node = nodes.get(target.v1());
if (node != null) {
if (nodes.localNodeId().equals(node.id())) {
localOperations++;
} else {
executePhase(node, target.v2());
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("Node [" + target.v1() + "] not available for scroll request [" + scrollId.source() + "]");
}
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
}
if (localOperations > 0) {
if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) {
threadPool.execute(new Runnable() {
@Override public void run() {
for (Tuple<String, Long> target : scrollId.values()) {
Node node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
executePhase(node, target.v2());
}
}
}
});
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
for (final Tuple<String, Long> target : scrollId.values()) {
final Node node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
if (localAsync) {
threadPool.execute(new Runnable() {
@Override public void run() {
executePhase(node, target.v2());
}
});
} else {
executePhase(node, target.v2());
}
}
}
}
}
}
private void executePhase(Node node, long searchId) {
searchService.sendExecuteFetch(node, internalScrollSearchRequest(searchId, request), new SearchServiceListener<QueryFetchSearchResult>() {
@Override public void onResult(QueryFetchSearchResult result) {
queryFetchResults.put(result.shardTarget(), result);
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
@Override public void onFailure(Throwable t) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to execute query phase", t);
}
shardFailures.add(new ShardSearchFailure(t));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
});
}
private void finishHim() {
try {
innerFinishHim();
} catch (Exception e) {
invokeListener(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures(shardFailures, searchCache)));
}
}
private void innerFinishHim() {
ShardDoc[] sortedShardList = searchPhaseController.sortDocs(queryFetchResults.values());
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = request.scrollId();
}
searchCache.releaseQueryFetchResults(queryFetchResults);
invokeListener(new SearchResponse(internalResponse, scrollId, this.scrollId.values().length, successfulOps.get(), buildShardFailures(shardFailures, searchCache)));
}
protected void invokeListener(final SearchResponse response) {
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
@Override public void run() {
listener.onResponse(response);
}
});
} else {
listener.onResponse(response);
}
}
protected void invokeListener(final Throwable t) {
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
@Override public void run() {
listener.onFailure(t);
}
});
} else {
listener.onFailure(t);
}
}
}
}

View File

@ -21,9 +21,7 @@ package org.elasticsearch.action.search.type;
import com.google.inject.Inject;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.search.*;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.Node;
import org.elasticsearch.cluster.node.Nodes;
@ -37,6 +35,7 @@ import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.util.Tuple;
import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.settings.Settings;
@ -49,10 +48,12 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.search.type.TransportSearchHelper.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent {
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final SearchServiceTransportAction searchService;
@ -61,10 +62,11 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
private final TransportSearchCache searchCache;
@Inject public TransportSearchScrollQueryThenFetchAction(Settings settings, ClusterService clusterService,
@Inject public TransportSearchScrollQueryThenFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportSearchCache searchCache,
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.searchCache = searchCache;
this.searchService = searchService;
@ -104,10 +106,21 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
}
public void start() {
if (scrollId.values().length == 0) {
invokeListener(new SearchPhaseExecutionException("query", "no nodes to search on", null));
}
final AtomicInteger counter = new AtomicInteger(scrollId.values().length);
int localOperations = 0;
for (Tuple<String, Long> target : scrollId.values()) {
Node node = nodes.get(target.v1());
if (node == null) {
if (node != null) {
if (nodes.localNodeId().equals(node.id())) {
localOperations++;
} else {
executeQueryPhase(counter, node, target.v2());
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("Node [" + target.v1() + "] not available for scroll request [" + scrollId.source() + "]");
}
@ -115,30 +128,63 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
}
} else {
searchService.sendExecuteQuery(node, TransportSearchHelper.internalScrollSearchRequest(target.v2(), request), new SearchServiceListener<QuerySearchResult>() {
@Override public void onResult(QuerySearchResult result) {
queryResults.put(result.shardTarget(), result);
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
}
}
}
}
@Override public void onFailure(Throwable t) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to execute query phase", t);
}
shardFailures.add(new ShardSearchFailure(t));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
if (localOperations > 0) {
if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) {
threadPool.execute(new Runnable() {
@Override public void run() {
for (Tuple<String, Long> target : scrollId.values()) {
Node node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
executeQueryPhase(counter, node, target.v2());
}
}
}
});
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
for (final Tuple<String, Long> target : scrollId.values()) {
final Node node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
if (localAsync) {
threadPool.execute(new Runnable() {
@Override public void run() {
executeQueryPhase(counter, node, target.v2());
}
});
} else {
executeQueryPhase(counter, node, target.v2());
}
}
}
}
}
}
private void executeQueryPhase(final AtomicInteger counter, Node node, long searchId) {
searchService.sendExecuteQuery(node, internalScrollSearchRequest(searchId, request), new SearchServiceListener<QuerySearchResult>() {
@Override public void onResult(QuerySearchResult result) {
queryResults.put(result.shardTarget(), result);
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
}
}
@Override public void onFailure(Throwable t) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to execute query phase", t);
}
shardFailures.add(new ShardSearchFailure(t));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
}
}
});
}
private void executeFetchPhase() {
sortedShardList = searchPhaseController.sortDocs(queryResults.values());
Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad = searchPhaseController.docIdsToLoad(sortedShardList);
@ -176,15 +222,46 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
}
private void finishHim() {
try {
innerFinishHim();
} catch (Exception e) {
invokeListener(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures(shardFailures, searchCache)));
}
}
private void innerFinishHim() {
InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(this.scrollId.type(), fetchResults.values());
scrollId = request.scrollId();
}
searchCache.releaseQueryResults(queryResults);
searchCache.releaseFetchResults(fetchResults);
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.values().length, successfulOps.get(), buildShardFailures(shardFailures, searchCache)));
invokeListener(new SearchResponse(internalResponse, scrollId, this.scrollId.values().length, successfulOps.get(), buildShardFailures(shardFailures, searchCache)));
}
protected void invokeListener(final SearchResponse response) {
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
@Override public void run() {
listener.onResponse(response);
}
});
} else {
listener.onResponse(response);
}
}
protected void invokeListener(final Throwable t) {
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
@Override public void run() {
listener.onFailure(t);
}
});
} else {
listener.onFailure(t);
}
}
}
}

View File

@ -194,7 +194,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
try {
moveToSecondPhase();
} catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, buildShardFailures()));
invokeListener(new ReduceSearchPhaseException(firstPhaseName(), "", e, buildShardFailures()));
}
}
}
@ -210,12 +210,12 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
shardFailures.add(new ShardSearchFailure(t));
if (successulOps.get() == 0) {
// no successful ops, raise an exception
listener.onFailure(new SearchPhaseExecutionException(firstPhaseName(), "total failure", buildShardFailures()));
invokeListener(new SearchPhaseExecutionException(firstPhaseName(), "total failure", buildShardFailures()));
} else {
try {
moveToSecondPhase();
} catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, buildShardFailures()));
invokeListener(new ReduceSearchPhaseException(firstPhaseName(), "", e, buildShardFailures()));
}
}
} else {
@ -235,6 +235,30 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
return TransportSearchHelper.buildShardFailures(shardFailures, searchCache);
}
protected void invokeListener(final SearchResponse response) {
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
@Override public void run() {
listener.onResponse(response);
}
});
} else {
listener.onResponse(response);
}
}
protected void invokeListener(final Throwable t) {
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
@Override public void run() {
listener.onFailure(t);
}
});
} else {
listener.onFailure(t);
}
}
protected abstract void sendExecuteFirstPhase(Node node, InternalSearchRequest request, SearchServiceListener<FirstResult> listener);
protected abstract void processFirstPhaseResult(ShardRouting shard, FirstResult result);

View File

@ -28,7 +28,7 @@ import java.io.IOException;
import java.io.Serializable;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public interface ShardRouting extends Streamable, Serializable {

View File

@ -43,6 +43,7 @@ import org.elasticsearch.rest.action.index.RestIndexAction;
import org.elasticsearch.rest.action.main.RestMainAction;
import org.elasticsearch.rest.action.mlt.RestMoreLikeThisAction;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.rest.action.search.RestSearchScrollAction;
import org.elasticsearch.rest.action.terms.RestTermsAction;
/**
@ -86,6 +87,7 @@ public class RestActionModule extends AbstractModule {
bind(RestTermsAction.class).asEagerSingleton();
bind(RestSearchAction.class).asEagerSingleton();
bind(RestSearchScrollAction.class).asEagerSingleton();
bind(RestMoreLikeThisAction.class).asEagerSingleton();
}

View File

@ -0,0 +1,100 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.search;
import com.google.inject.Inject;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchOperationThreading;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.rest.*;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.util.json.JsonBuilder;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
import static org.elasticsearch.rest.RestRequest.Method.*;
import static org.elasticsearch.rest.RestResponse.Status.*;
import static org.elasticsearch.rest.action.support.RestJsonBuilder.*;
import static org.elasticsearch.util.TimeValue.*;
/**
* @author kimchy (shay.banon)
*/
public class RestSearchScrollAction extends BaseRestHandler {
@Inject public RestSearchScrollAction(Settings settings, Client client, RestController controller) {
super(settings, client);
controller.registerHandler(GET, "/_searchScroll", this);
controller.registerHandler(POST, "/_searchScroll", this);
controller.registerHandler(GET, "/_searchScroll/{scrollId}", this);
controller.registerHandler(POST, "/_searchScroll/{scrollId}", this);
}
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(request.param("scrollId"));
try {
String scroll = request.param("scroll");
if (scroll != null) {
searchScrollRequest.scroll(new Scroll(parseTimeValue(scroll, null)));
}
searchScrollRequest.listenerThreaded(false);
SearchOperationThreading operationThreading = SearchOperationThreading.fromString(request.param("operationThreading"), SearchOperationThreading.SINGLE_THREAD);
if (operationThreading == SearchOperationThreading.NO_THREADS) {
// since we don't spawn, don't allow no_threads, but change it to a single thread
operationThreading = SearchOperationThreading.SINGLE_THREAD;
}
searchScrollRequest.operationThreading(operationThreading);
} catch (Exception e) {
try {
JsonBuilder builder = restJsonBuilder(request);
channel.sendResponse(new JsonRestResponse(request, BAD_REQUEST, builder.startObject().field("error", e.getMessage()).endObject()));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
return;
}
client.searchScroll(searchScrollRequest, new ActionListener<SearchResponse>() {
@Override public void onResponse(SearchResponse response) {
try {
JsonBuilder builder = restJsonBuilder(request);
builder.startObject();
response.toJson(builder, request);
builder.endObject();
channel.sendResponse(new JsonRestResponse(request, OK, builder));
} catch (Exception e) {
onFailure(e);
}
}
@Override public void onFailure(Throwable e) {
try {
channel.sendResponse(new JsonThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
}
});
}
}
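
The handler above makes scrolling reachable over HTTP at /_searchScroll/{scrollId}, with an optional scroll keep-alive parameter. A plain-java sketch of calling it (the host, port 9200, and scroll id value are assumptions, not part of this commit):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class RestScrollCallSketch {
    public static void main(String[] args) throws Exception {
        // the scroll id below is a placeholder; a real one comes from a prior search response
        String scrollId = "cXVlcnlUaGVuRmV0Y2g7NDI6bm9kZTE7";
        URL url = new URL("http://localhost:9200/_searchScroll/" + scrollId + "?scroll=5m");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
        for (String line = reader.readLine(); line != null; line = reader.readLine()) {
            System.out.println(line); // the JSON search response written by the handler
        }
        reader.close();
    }
}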

View File

@ -0,0 +1,32 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search;
import org.elasticsearch.util.io.stream.Streamable;
/**
* @author kimchy (shay.banon)
*/
public interface SearchPhaseResult extends Streamable {
long id();
SearchShardTarget shardTarget();
}

View File

@ -315,11 +315,13 @@ public class SearchService extends AbstractComponent implements LifecycleCompone
}
}
private static final int[] EMPTY_DOC_IDS = new int[0];
private void shortcutDocIdsToLoad(SearchContext context) {
TopDocs topDocs = context.queryResult().topDocs();
if (topDocs.scoreDocs.length < context.from()) {
// no more docs...
context.docIdsToLoad(new int[0]);
context.docIdsToLoad(EMPTY_DOC_IDS);
return;
}
int totalSize = context.from() + context.size();

View File

@ -70,7 +70,7 @@ public class SearchPhaseController {
}
public ShardDoc[] sortDocs(Collection<? extends QuerySearchResultProvider> results) {
if (Iterables.isEmpty(results)) {
if (results.isEmpty()) {
return EMPTY;
}
@ -199,9 +199,11 @@ public class SearchPhaseController {
}
FetchSearchResult fetchResult = fetchResultProvider.fetchResult();
int index = fetchResult.counterGetAndIncrement();
SearchHit searchHit = fetchResult.hits().hits()[index];
((InternalSearchHit) searchHit).shard(fetchResult.shardTarget());
hits.add(searchHit);
if (index < fetchResult.hits().hits().length) {
SearchHit searchHit = fetchResult.hits().hits()[index];
((InternalSearchHit) searchHit).shard(fetchResult.shardTarget());
hits.add(searchHit);
}
}
}
InternalSearchHits searchHits = new InternalSearchHits(hits.toArray(new SearchHit[hits.size()]), totalHits);

View File

@ -20,10 +20,10 @@
package org.elasticsearch.search.dfs;
import org.apache.lucene.index.Term;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
import java.io.IOException;
@ -32,7 +32,7 @@ import static org.elasticsearch.search.SearchShardTarget.*;
/**
* @author kimchy (Shay Banon)
*/
public class DfsSearchResult implements Streamable {
public class DfsSearchResult implements SearchPhaseResult {
private static Term[] EMPTY_TERMS = new Term[0];

View File

@ -19,16 +19,12 @@
package org.elasticsearch.search.fetch;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.SearchPhaseResult;
/**
* @author kimchy (Shay Banon)
*/
public interface FetchSearchResultProvider {
long id();
SearchShardTarget shardTarget();
public interface FetchSearchResultProvider extends SearchPhaseResult {
FetchSearchResult fetchResult();
}

View File

@ -19,21 +19,17 @@
package org.elasticsearch.search.query;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.SearchPhaseResult;
/**
* @author kimchy (Shay Banon)
*/
public interface QuerySearchResultProvider {
public interface QuerySearchResultProvider extends SearchPhaseResult {
/**
* If both query and fetch happened on the same call.
*/
boolean includeFetch();
long id();
SearchShardTarget shardTarget();
QuerySearchResult queryResult();
}

File diff suppressed because it is too large.

View File

@ -19,8 +19,10 @@
package org.elasticsearch.test.integration.search;
import com.google.common.collect.Sets;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.search.Scroll;
@ -35,6 +37,7 @@ import org.testng.annotations.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
import static org.elasticsearch.action.search.SearchType.*;
import static org.elasticsearch.client.Requests.*;
@ -101,8 +104,6 @@ public class TransportTwoServersSearchTests extends AbstractServersTests {
}
}
//
@Test public void testDfsQueryThenFetchWithSort() throws Exception {
SearchSourceBuilder source = searchSource()
.query(termQuery("multi", "test"))
@ -197,15 +198,21 @@ public class TransportTwoServersSearchTests extends AbstractServersTests {
assertThat("id[" + hit.id() + "]", hit.id(), equalTo(Integer.toString(100 - i - 1)));
}
// TODO support scrolling
// searchResponse = client.searchScroll(searchScrollRequest(searchResponse.scrollId())).actionGet();
//
// assertThat(searchResponse.hits().totalHits(), equalTo(100l));
// assertThat(searchResponse.hits().hits().length, equalTo(40));
// for (int i = 0; i < 40; i++) {
// SearchHit hit = searchResponse.hits().hits()[i];
searchResponse = client.searchScroll(searchScrollRequest(searchResponse.scrollId())).actionGet();
assertThat(searchResponse.hits().totalHits(), equalTo(100l));
assertThat(searchResponse.hits().hits().length, equalTo(40));
Set<String> expectedIds = Sets.newHashSet();
for (int i = 0; i < 40; i++) {
expectedIds.add(Integer.toString(i));
}
for (int i = 0; i < 40; i++) {
SearchHit hit = searchResponse.hits().hits()[i];
// assertThat("id[" + hit.id() + "]", hit.id(), equalTo(Integer.toString(100 - 60 - 1 - i)));
// }
// we don't do perfect sorting when it comes to scroll with Query+Fetch
assertThat("make sure we don't have duplicates", expectedIds.remove(hit.id()), notNullValue());
}
assertThat("make sure we got all [" + expectedIds + "]", expectedIds.size(), equalTo(0));
}
@Test public void testDfsQueryAndFetch() throws Exception {
@ -224,15 +231,22 @@ public class TransportTwoServersSearchTests extends AbstractServersTests {
assertThat("id[" + hit.id() + "]", hit.id(), equalTo(Integer.toString(100 - i - 1)));
}
// TODO support scrolling
// searchResponse = searchScrollAction.submit(new SearchScrollRequest(searchResponse.scrollId())).actionGet();
//
// assertEquals(100, searchResponse.hits().totalHits());
// assertEquals(40, searchResponse.hits().hits().length);
// for (int i = 0; i < 40; i++) {
// SearchHit hit = searchResponse.hits().hits()[i];
// assertEquals("id[" + hit.id() + "]", Integer.toString(100 - 60 - 1 - i), hit.id());
// }
searchResponse = client.searchScroll(searchScrollRequest(searchResponse.scrollId())).actionGet();
assertThat(searchResponse.hits().totalHits(), equalTo(100l));
assertThat(searchResponse.hits().hits().length, equalTo(40));
Set<String> expectedIds = Sets.newHashSet();
for (int i = 0; i < 40; i++) {
expectedIds.add(Integer.toString(i));
}
for (int i = 0; i < 40; i++) {
SearchHit hit = searchResponse.hits().hits()[i];
// System.out.println(hit.shard() + ": " + hit.explanation());
// assertThat("id[" + hit.id() + "]", hit.id(), equalTo(Integer.toString(100 - 60 - 1 - i)));
// we don't do perfect sorting when it comes to scroll with Query+Fetch
assertThat("make sure we don't have duplicates", expectedIds.remove(hit.id()), notNullValue());
}
assertThat("make sure we got all [" + expectedIds + "]", expectedIds.size(), equalTo(0));
}
@Test public void testSimpleFacets() throws Exception {
@ -256,11 +270,12 @@ public class TransportTwoServersSearchTests extends AbstractServersTests {
@Test public void testFailedSearch() throws Exception {
logger.info("Start Testing failed search");
SearchResponse searchResponse = client.search(searchRequest("test").source(Unicode.fromStringAsBytes("{ xxx }"))).actionGet();
assertThat(searchResponse.successfulShards(), equalTo(0));
logger.info("Failures:");
for (ShardSearchFailure searchFailure : searchResponse.shardFailures()) {
logger.info("Reason : " + searchFailure.reason() + ", shard " + searchFailure.shard());
try {
client.search(searchRequest("test").source(Unicode.fromStringAsBytes("{ xxx }"))).actionGet();
assert false : "search should fail";
} catch (ElasticSearchException e) {
assertThat(e.unwrapCause(), instanceOf(SearchPhaseExecutionException.class));
// all is well
}
logger.info("Done Testing failed search");
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.test.integration.search;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterService;
@ -51,6 +52,7 @@ import org.testng.annotations.Test;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static com.google.common.collect.Lists.*;
@ -287,22 +289,30 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractServersTests {
assertThat("id[" + hit.id() + "]", hit.id(), equalTo(Integer.toString(100 - i - 1)));
}
// TODO we need to support scrolling for query+fetch
// Map<SearchShardTarget, QueryFetchSearchResult> scrollQueryFetchResults = newHashMap();
// for (QueryFetchSearchResult searchResult : queryFetchResults.values()) {
// QueryFetchSearchResult queryFetchResult = nodeToSearchService.get(searchResult.shardTarget().nodeId()).executeFetchPhase(new InternalScrollSearchRequest(searchResult.id()).scroll(new Scroll(timeValueMinutes(10))));
// scrollQueryFetchResults.put(queryFetchResult.shardTarget(), queryFetchResult);
// }
// queryFetchResults = scrollQueryFetchResults;
//
// sortedShardList = searchPhaseController.sortDocs(queryFetchResults.values());
// hits = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults).hits();
// assertThat(hits.totalHits(), equalTo(100l));
// assertThat(hits.hits().length, equalTo(40));
// for (int i = 0; i < 40; i++) {
// SearchHit hit = hits.hits()[i];
// scrolling with query+fetch is not perfect when it comes to dist sorting
Map<SearchShardTarget, QueryFetchSearchResult> scrollQueryFetchResults = newHashMap();
for (QueryFetchSearchResult searchResult : queryFetchResults.values()) {
QueryFetchSearchResult queryFetchResult = nodeToSearchService.get(searchResult.shardTarget().nodeId()).executeFetchPhase(new InternalScrollSearchRequest(searchResult.id()).scroll(new Scroll(timeValueMinutes(10))));
scrollQueryFetchResults.put(queryFetchResult.shardTarget(), queryFetchResult);
}
queryFetchResults = scrollQueryFetchResults;
sortedShardList = searchPhaseController.sortDocs(queryFetchResults.values());
hits = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults).hits();
assertThat(hits.totalHits(), equalTo(100l));
assertThat(hits.hits().length, equalTo(40));
Set<String> expectedIds = Sets.newHashSet();
for (int i = 0; i < 40; i++) {
expectedIds.add(Integer.toString(i));
}
for (int i = 0; i < 40; i++) {
SearchHit hit = hits.hits()[i];
// System.out.println(hit.id() + " " + hit.explanation());
// we don't do perfect sorting when it comes to scroll with Query+Fetch
// assertThat("id[" + hit.id() + "]", hit.id(), equalTo(Integer.toString(100 - 60 - 1 - i)));
// }
assertThat("make sure we don't have duplicates", expectedIds.remove(hit.id()), notNullValue());
}
assertThat("make sure we got all [" + expectedIds + "]", expectedIds.size(), equalTo(0));
}
@Test public void testSimpleFacets() {