Remove QUERY_AND_FETCH search type (#22996)
`QUERY_AND_FETCH` has been treated as an internal optimization for two major versions. This commit removes the search type and its implementation details, and folds the optimization for the single-shard case into the search controller, so that every non-DFS search that targets a single shard receives this optimization.
This commit is contained in:
parent
5e7d22357f
commit
7513c6e4eb
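As a usage-level illustration (a minimal sketch; the index name, field, and value below are invented, not taken from this commit): callers no longer select a special search type, and any non-DFS request that resolves to a single shard gets the combined query+fetch round-trip automatically.

    // Hypothetical example -- "one_shard_index", "field" and "value" are made up.
    SearchResponse response = client.prepareSearch("one_shard_index")
            .setSearchType(SearchType.QUERY_THEN_FETCH) // QUERY_AND_FETCH can no longer be requested
            .setQuery(QueryBuilders.termQuery("field", "value"))
            .get();
    // If "one_shard_index" has a single shard, the shard answers with a
    // QueryFetchSearchResult and the coordinating node skips the separate fetch round-trip.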
@@ -38,7 +38,6 @@ import org.elasticsearch.common.util.concurrent.CountDown;
 import org.elasticsearch.search.SearchPhaseResult;
 import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.fetch.FetchSearchResult;
-import org.elasticsearch.search.fetch.FetchSearchResultProvider;
 import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
 import org.elasticsearch.search.internal.AliasFilter;
 import org.elasticsearch.search.internal.InternalSearchResponse;
@@ -51,7 +50,6 @@ import org.elasticsearch.transport.Transport;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
@@ -137,7 +135,6 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
 final Transport.Connection connection = nodeIdToConnection.apply(shard.currentNodeId());
 AliasFilter filter = this.aliasFilter.get(shard.index().getUUID());
 assert filter != null;

 float indexBoost = concreteIndexBoosts.getOrDefault(shard.index().getUUID(), DEFAULT_INDEX_BOOST);
 ShardSearchTransportRequest transportRequest = new ShardSearchTransportRequest(request, shardIt.shardId(), shardsIts.size(),
     filter, indexBoost, startTime());
@@ -440,39 +437,45 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
 public void run() throws Exception {
     final boolean isScrollRequest = request.scroll() != null;
     ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, queryResults);
-    final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), sortedShardDocs);
-    final IntConsumer finishPhase = successOpts
-        -> sendResponseAsync("fetch", searchPhaseController, sortedShardDocs, queryResults, fetchResults);
-    if (sortedShardDocs.length == 0) { // no docs to fetch -- sidestep everything and return
-        queryResults.asList().stream()
-            .map(e -> e.value.queryResult())
-            .forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources
-        finishPhase.accept(successfulOps.get());
+    if (queryResults.length() == 1) {
+        assert queryResults.get(0) == null || queryResults.get(0).fetchResult() != null;
+        // query AND fetch optimization
+        sendResponseAsync("fetch", searchPhaseController, sortedShardDocs, queryResults, queryResults);
+    } else {
-        final ScoreDoc[] lastEmittedDocPerShard = isScrollRequest ?
-            searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), sortedShardDocs, queryResults.length())
-            : null;
-        final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(fetchResults,
-            docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not
-            finishPhase);
-        for (int i = 0; i < docIdsToLoad.length; i++) {
-            IntArrayList entry = docIdsToLoad[i];
-            QuerySearchResultProvider queryResult = queryResults.get(i);
-            if (entry == null) { // no results for this shard ID
-                if (queryResult != null) {
-                    // if we got some hits from this shard we have to release the context there
-                    // we do this as we go since it will free up resources and passing on the request on the
-                    // transport layer is cheap.
-                    releaseIrrelevantSearchContext(queryResult.queryResult());
+        final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), sortedShardDocs);
+        final IntConsumer finishPhase = successOpts
+            -> sendResponseAsync("fetch", searchPhaseController, sortedShardDocs, queryResults, fetchResults);
+        if (sortedShardDocs.length == 0) { // no docs to fetch -- sidestep everything and return
+            queryResults.asList().stream()
+                .map(e -> e.value.queryResult())
+                .forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources
+            finishPhase.accept(successfulOps.get());
+        } else {
+            final ScoreDoc[] lastEmittedDocPerShard = isScrollRequest ?
+                searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), sortedShardDocs, queryResults.length())
+                : null;
+            final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(fetchResults,
+                docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not
+                finishPhase);
+            for (int i = 0; i < docIdsToLoad.length; i++) {
+                IntArrayList entry = docIdsToLoad[i];
+                QuerySearchResultProvider queryResult = queryResults.get(i);
+                if (entry == null) { // no results for this shard ID
+                    if (queryResult != null) {
+                        // if we got some hits from this shard we have to release the context there
+                        // we do this as we go since it will free up resources and passing on the request on the
+                        // transport layer is cheap.
+                        releaseIrrelevantSearchContext(queryResult.queryResult());
                 }
-                // in any case we count down this result since we don't talk to this shard anymore
-                counter.countDown();
-            } else {
-                Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId());
-                ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), i, entry,
-                    lastEmittedDocPerShard);
-                executeFetch(i, queryResult.shardTarget(), counter, fetchSearchRequest, queryResult.queryResult(),
-                    connection);
-            }
+                    // in any case we count down this result since we don't talk to this shard anymore
+                    counter.countDown();
+                } else {
+                    Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId());
+                    ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), i, entry,
+                        lastEmittedDocPerShard);
+                    executeFetch(i, queryResult.shardTarget(), counter, fetchSearchRequest, queryResult.queryResult(),
+                        connection);
+                }
         }
     }
 }
@@ -529,16 +532,14 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
  */
 final void sendResponseAsync(String phase, SearchPhaseController searchPhaseController, ScoreDoc[] sortedDocs,
                              AtomicArray<? extends QuerySearchResultProvider> queryResultsArr,
-                             AtomicArray<? extends FetchSearchResultProvider> fetchResultsArr) {
+                             AtomicArray<? extends QuerySearchResultProvider> fetchResultsArr) {
     getExecutor().execute(new ActionRunnable<SearchResponse>(listener) {
         @Override
         public void doRun() throws IOException {
             final boolean isScrollRequest = request.scroll() != null;
-            final ScoreDoc[] theScoreDocs = sortedDocs == null ? searchPhaseController.sortDocs(isScrollRequest, queryResultsArr)
-                : sortedDocs;
-            final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, theScoreDocs, queryResultsArr,
+            final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedDocs, queryResultsArr,
                 fetchResultsArr);
-            String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), queryResultsArr) : null;
+            String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(queryResultsArr) : null;
             listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
                 buildTookInMillis(), buildShardFailures()));
         }
@@ -31,15 +31,11 @@ import org.apache.lucene.search.TermStatistics;
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.search.TopFieldDocs;
 import org.apache.lucene.search.grouping.CollapseTopFieldDocs;
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.IntsRef;
 import org.elasticsearch.common.collect.HppcMaps;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.ArrayUtils;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.IntArray;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.aggregations.InternalAggregation;
@@ -49,7 +45,6 @@ import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
 import org.elasticsearch.search.dfs.AggregatedDfs;
 import org.elasticsearch.search.dfs.DfsSearchResult;
 import org.elasticsearch.search.fetch.FetchSearchResult;
-import org.elasticsearch.search.fetch.FetchSearchResultProvider;
 import org.elasticsearch.search.internal.InternalSearchHit;
 import org.elasticsearch.search.internal.InternalSearchHits;
 import org.elasticsearch.search.internal.InternalSearchResponse;
@@ -68,7 +63,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiConsumer;
@@ -391,10 +385,10 @@ public class SearchPhaseController extends AbstractComponent {
  */
 public InternalSearchResponse merge(boolean ignoreFrom, ScoreDoc[] sortedDocs,
                                     AtomicArray<? extends QuerySearchResultProvider> queryResultsArr,
-                                    AtomicArray<? extends FetchSearchResultProvider> fetchResultsArr) {
+                                    AtomicArray<? extends QuerySearchResultProvider> fetchResultsArr) {

     List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults = queryResultsArr.asList();
-    List<? extends AtomicArray.Entry<? extends FetchSearchResultProvider>> fetchResults = fetchResultsArr.asList();
+    List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> fetchResults = fetchResultsArr.asList();

     if (queryResults.isEmpty()) {
         return InternalSearchResponse.empty();
@@ -448,7 +442,7 @@ public class SearchPhaseController extends AbstractComponent {
 }

 // clean the fetch counter
-for (AtomicArray.Entry<? extends FetchSearchResultProvider> entry : fetchResults) {
+for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : fetchResults) {
     entry.value.fetchResult().initCounter();
 }
 int from = ignoreFrom ? 0 : firstResult.queryResult().from();
@@ -460,7 +454,7 @@ public class SearchPhaseController extends AbstractComponent {
 if (!fetchResults.isEmpty()) {
     for (int i = 0; i < numSearchHits; i++) {
         ScoreDoc shardDoc = sortedDocs[i];
-        FetchSearchResultProvider fetchResultProvider = fetchResultsArr.get(shardDoc.shardIndex);
+        QuerySearchResultProvider fetchResultProvider = fetchResultsArr.get(shardDoc.shardIndex);
         if (fetchResultProvider == null) {
             continue;
         }
@@ -503,11 +497,11 @@ public class SearchPhaseController extends AbstractComponent {
 final List<CompletionSuggestion.Entry.Option> suggestionOptions = suggestion.getOptions();
 for (int scoreDocIndex = currentOffset; scoreDocIndex < currentOffset + suggestionOptions.size(); scoreDocIndex++) {
     ScoreDoc shardDoc = sortedDocs[scoreDocIndex];
-    FetchSearchResultProvider fetchSearchResultProvider = fetchResultsArr.get(shardDoc.shardIndex);
-    if (fetchSearchResultProvider == null) {
+    QuerySearchResultProvider searchResultProvider = fetchResultsArr.get(shardDoc.shardIndex);
+    if (searchResultProvider == null) {
         continue;
     }
-    FetchSearchResult fetchResult = fetchSearchResultProvider.fetchResult();
+    FetchSearchResult fetchResult = searchResultProvider.fetchResult();
     int fetchResultIndex = fetchResult.counterGetAndIncrement();
     if (fetchResultIndex < fetchResult.hits().internalHits().length) {
         InternalSearchHit hit = fetchResult.hits().internalHits()[fetchResultIndex];
@@ -569,7 +563,7 @@ public class SearchPhaseController extends AbstractComponent {
 private static int topN(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults) {
     QuerySearchResultProvider firstResult = queryResults.get(0).value;
     int topN = firstResult.queryResult().size();
-    if (firstResult.includeFetch()) {
+    if (firstResult.fetchResult() != null) {
         // if we did both query and fetch on the same go, we have fetched all the docs from each shards already, use them...
         // this is also important since we shortcut and fetch only docs from "from" and up to "size"
         topN *= queryResults.size();
@@ -1,64 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.action.search;
-
-import org.apache.logging.log4j.Logger;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.cluster.routing.GroupShardsIterator;
-import org.elasticsearch.common.CheckedRunnable;
-import org.elasticsearch.common.util.concurrent.AtomicArray;
-import org.elasticsearch.search.fetch.QueryFetchSearchResult;
-import org.elasticsearch.search.internal.AliasFilter;
-import org.elasticsearch.search.internal.ShardSearchTransportRequest;
-import org.elasticsearch.transport.Transport;
-
-import java.util.Map;
-import java.util.concurrent.Executor;
-import java.util.function.Function;
-
-final class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<QueryFetchSearchResult> {
-
-    SearchQueryAndFetchAsyncAction(Logger logger, SearchTransportService searchTransportService,
-                                   Function<String, Transport.Connection> nodeIdToConnection,
-                                   Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
-                                   SearchPhaseController searchPhaseController, Executor executor,
-                                   SearchRequest request, ActionListener<SearchResponse> listener,
-                                   GroupShardsIterator shardsIts, long startTime, long clusterStateVersion,
-                                   SearchTask task) {
-        super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, searchPhaseController, executor,
-            request, listener, shardsIts, startTime, clusterStateVersion, task);
-    }
-
-    @Override
-    protected String initialPhaseName() {
-        return "query_fetch";
-    }
-
-    @Override
-    protected void sendExecuteFirstPhase(Transport.Connection connection, ShardSearchTransportRequest request,
-                                         ActionListener<QueryFetchSearchResult> listener) {
-        searchTransportService.sendExecuteFetch(connection, request, task, listener);
-    }
-
-    @Override
-    protected CheckedRunnable<Exception> getNextPhase(AtomicArray<QueryFetchSearchResult> initialResults) {
-        return () -> sendResponseAsync("fetch", searchPhaseController, null, initialResults, initialResults);
-    }
-}
@@ -44,6 +44,7 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<Qu
                                   SearchTask task) {
         super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, searchPhaseController, executor,
             request, listener, shardsIts, startTime, clusterStateVersion, task);
+
     }

     @Override
@@ -19,6 +19,7 @@

 package org.elasticsearch.action.search;

+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.IndicesRequest;
@@ -55,7 +56,7 @@ import org.elasticsearch.transport.TransportResponse;
 import org.elasticsearch.transport.TransportService;

 import java.io.IOException;
-import java.util.function.Consumer;
+import java.util.function.Supplier;

 /**
  * An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through
@@ -119,8 +120,18 @@ public class SearchTransportService extends AbstractLifecycleComponent {

 public void sendExecuteQuery(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task,
                              final ActionListener<QuerySearchResultProvider> listener) {
-    transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task,
-        new ActionListenerResponseHandler<>(listener, QuerySearchResult::new));
+    // we optimize this and expect a QueryFetchSearchResult if we only have a single shard in the search request
+    // this used to be QUERY_AND_FETCH, which doesn't exist anymore.
+    final boolean fetchDocuments = request.numberOfShards() == 1;
+    Supplier<QuerySearchResultProvider> supplier = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;
+    if (connection.getVersion().onOrBefore(Version.V_5_3_0_UNRELEASED) && fetchDocuments) {
+        // TODO this BWC layer can be removed once this is back-ported to 5.3
+        transportService.sendChildRequest(connection, QUERY_FETCH_ACTION_NAME, request, task,
+            new ActionListenerResponseHandler<>(listener, supplier));
+    } else {
+        transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task,
+            new ActionListenerResponseHandler<>(listener, supplier));
+    }
 }

 public void sendExecuteQuery(Transport.Connection connection, final QuerySearchRequest request, SearchTask task,
@@ -135,12 +146,6 @@ public class SearchTransportService extends AbstractLifecycleComponent {
         new ActionListenerResponseHandler<>(listener, ScrollQuerySearchResult::new));
 }

-public void sendExecuteFetch(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task,
-                             final ActionListener<QueryFetchSearchResult> listener) {
-    transportService.sendChildRequest(connection, QUERY_FETCH_ACTION_NAME, request, task,
-        new ActionListenerResponseHandler<>(listener, QueryFetchSearchResult::new));
-}
-
 public void sendExecuteFetch(DiscoveryNode node, final InternalScrollSearchRequest request, SearchTask task,
                              final ActionListener<ScrollQueryFetchSearchResult> listener) {
     transportService.sendChildRequest(transportService.getConnection(node), QUERY_FETCH_SCROLL_ACTION_NAME, request, task,
@@ -334,11 +339,15 @@ public class SearchTransportService extends AbstractLifecycleComponent {
     });
 TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, ScrollQuerySearchResult::new);

+// this is for BWC with 5.3 until the QUERY_AND_FETCH removal change has been back-ported to 5.x
+// in 5.3 we will only execute an `indices:data/read/search[phase/query+fetch]` if the node is pre 5.3
+// such that we can remove this after the back-port.
 transportService.registerRequestHandler(QUERY_FETCH_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
     new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
         @Override
         public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
-            QueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
+            assert request.numberOfShards() == 1 : "expected single shard request but got: " + request.numberOfShards();
+            QuerySearchResultProvider result = searchService.executeQueryPhase(request, (SearchTask)task);
             channel.sendResponse(result);
         }
     });
@@ -36,14 +36,9 @@ public enum SearchType {
  * document content. The return number of hits is exactly as specified in size, since they are the only ones that
  * are fetched. This is very handy when the index has a lot of shards (not replicas, shard id groups).
  */
-QUERY_THEN_FETCH((byte) 1),
+QUERY_THEN_FETCH((byte) 1);
+// 2 used to be DFS_QUERY_AND_FETCH
-/**
- * The most naive (and possibly fastest) implementation is to simply execute the query on all relevant shards
- * and return the results. Each shard returns size results. Since each shard already returns size hits, this
- * type actually returns size times number of shards results back to the caller.
- */
-QUERY_AND_FETCH((byte) 3);
+// 3 used to be QUERY_AND_FETCH

 /**
  * The default search type ({@link #QUERY_THEN_FETCH}.
@@ -69,10 +64,9 @@ public enum SearchType {
 public static SearchType fromId(byte id) {
     if (id == 0) {
         return DFS_QUERY_THEN_FETCH;
-    } else if (id == 1) {
+    } else if (id == 1
+        || id == 3) { // TODO this bwc layer can be removed once this is back-ported to 5.3 QUERY_AND_FETCH is removed now
         return QUERY_THEN_FETCH;
-    } else if (id == 3) {
-        return QUERY_AND_FETCH;
     } else {
         throw new IllegalArgumentException("No search type for [" + id + "]");
     }
@@ -91,8 +85,6 @@ public enum SearchType {
     return SearchType.DFS_QUERY_THEN_FETCH;
 } else if ("query_then_fetch".equals(searchType)) {
     return SearchType.QUERY_THEN_FETCH;
-} else if ("query_and_fetch".equals(searchType)) {
-    return SearchType.QUERY_AND_FETCH;
 } else {
     throw new IllegalArgumentException("No search type for [" + searchType + "]");
 }
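The net effect of the two SearchType hunks above, written out as assertions (the assertion form is illustrative; the mappings come straight from fromId and fromString):

    assert SearchType.fromId((byte) 0) == SearchType.DFS_QUERY_THEN_FETCH;
    assert SearchType.fromId((byte) 1) == SearchType.QUERY_THEN_FETCH;
    assert SearchType.fromId((byte) 3) == SearchType.QUERY_THEN_FETCH; // old QUERY_AND_FETCH id, still readable for BWC
    // SearchType.fromString("query_and_fetch") now throws IllegalArgumentException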
@@ -54,7 +54,6 @@ import java.util.concurrent.Executor;
 import java.util.function.BiConsumer;
 import java.util.function.Function;

-import static org.elasticsearch.action.search.SearchType.QUERY_AND_FETCH;
 import static org.elasticsearch.action.search.SearchType.QUERY_THEN_FETCH;

 public class TransportSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> {
@@ -185,7 +184,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
 // optimize search type for cases where there is only one shard group to search on
 if (shardIterators.size() == 1) {
     // if we only have one group, then we always want Q_A_F, no need for DFS, and no need to do THEN since we hit one shard
-    searchRequest.searchType(QUERY_AND_FETCH);
+    searchRequest.searchType(QUERY_THEN_FETCH);
 }
 if (searchRequest.isSuggestOnly()) {
     // disable request cache if we have only suggest
@@ -269,11 +268,6 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
         aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime,
         clusterStateVersion, task);
     break;
-case QUERY_AND_FETCH:
-    searchAsyncAction = new SearchQueryAndFetchAsyncAction(logger, searchTransportService, connectionLookup,
-        aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime,
-        clusterStateVersion, task);
-    break;
 default:
     throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");
 }
@@ -34,19 +34,9 @@ final class TransportSearchHelper {
     return new InternalScrollSearchRequest(request, id);
 }

-static String buildScrollId(SearchType searchType, AtomicArray<? extends SearchPhaseResult> searchPhaseResults) throws IOException {
-    if (searchType == SearchType.DFS_QUERY_THEN_FETCH || searchType == SearchType.QUERY_THEN_FETCH) {
-        return buildScrollId(ParsedScrollId.QUERY_THEN_FETCH_TYPE, searchPhaseResults);
-    } else if (searchType == SearchType.QUERY_AND_FETCH) {
-        return buildScrollId(ParsedScrollId.QUERY_AND_FETCH_TYPE, searchPhaseResults);
-    } else {
-        throw new IllegalStateException("search_type [" + searchType + "] not supported");
-    }
-}
-
-static String buildScrollId(String type, AtomicArray<? extends SearchPhaseResult> searchPhaseResults) throws IOException {
+static String buildScrollId(AtomicArray<? extends SearchPhaseResult> searchPhaseResults) throws IOException {
     try (RAMOutputStream out = new RAMOutputStream()) {
-        out.writeString(type);
+        out.writeString(searchPhaseResults.length() == 1 ? ParsedScrollId.QUERY_AND_FETCH_TYPE : ParsedScrollId.QUERY_THEN_FETCH_TYPE);
         out.writeVInt(searchPhaseResults.asList().size());
         for (AtomicArray.Entry<? extends SearchPhaseResult> entry : searchPhaseResults.asList()) {
             SearchPhaseResult searchPhaseResult = entry.value;
@@ -66,7 +66,7 @@ public class TransportSearchScrollAction extends HandledTransportAction<SearchSc
     action = new SearchScrollQueryThenFetchAsyncAction(logger, clusterService, searchTransportService,
         searchPhaseController, request, (SearchTask)task, scrollId, listener);
     break;
-case QUERY_AND_FETCH_TYPE:
+case QUERY_AND_FETCH_TYPE: // TODO can we get rid of this?
     action = new SearchScrollQueryAndFetchAsyncAction(logger, clusterService, searchTransportService,
         searchPhaseController, request, (SearchTask)task, scrollId, listener);
     break;
@@ -1071,8 +1071,6 @@ public class IndicesService extends AbstractLifecycleComponent
 }


-private static final Set<SearchType> CACHEABLE_SEARCH_TYPES = EnumSet.of(SearchType.QUERY_THEN_FETCH, SearchType.QUERY_AND_FETCH);
-
 /**
  * Can the shard request be cached at all?
  */
@@ -1082,7 +1080,7 @@ public class IndicesService extends AbstractLifecycleComponent
 // on the overridden statistics. So if you ran two queries on the same index with different stats
 // (because an other shard was updated) you would get wrong results because of the scores
 // (think about top_hits aggs or scripts using the score)
-if (!CACHEABLE_SEARCH_TYPES.contains(context.searchType())) {
+if (SearchType.QUERY_THEN_FETCH != context.searchType()) {
     return false;
 }
 IndexSettings settings = context.indexShard().indexSettings();
@@ -92,7 +92,7 @@ public class RestSearchAction extends BaseRestHandler {
 // from the REST layer. these modes are an internal optimization and should
 // not be specified explicitly by the user.
 String searchType = request.param("search_type");
-if (SearchType.fromString(searchType).equals(SearchType.QUERY_AND_FETCH) ||
+if ("query_and_fetch".equals(searchType) ||
     "dfs_query_and_fetch".equals(searchType)) {
     throw new IllegalArgumentException("Unsupported search type [" + searchType + "]");
 } else {
@@ -265,8 +265,11 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
 } else {
     contextProcessedSuccessfully(context);
 }
-operationListener.onQueryPhase(context, System.nanoTime() - time);
-
+final long afterQueryTime = System.nanoTime();
+operationListener.onQueryPhase(context, afterQueryTime - time);
+if (request.numberOfShards() == 1) {
+    return executeFetchPhase(context, operationListener, afterQueryTime);
+}
 return context.queryResult();
 } catch (Exception e) {
 // execution exception can happen while loading the cache, strip it
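A sketch of the shard-level contract the branch above establishes (variable names are mine; the types and the single-shard condition come from the diff):

    QuerySearchResultProvider result = searchService.executeQueryPhase(request, task);
    if (request.numberOfShards() == 1) {
        // single-shard searches fetch inline: the result is a QueryFetchSearchResult
        assert result.fetchResult() != null;
    } else {
        // multi-shard searches still return a plain QuerySearchResult; fetch is a second phase
        assert result.fetchResult() == null;
    }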
@@ -283,6 +286,25 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
     }
 }

+private QueryFetchSearchResult executeFetchPhase(SearchContext context, SearchOperationListener operationListener,
+                                                 long afterQueryTime) {
+    operationListener.onPreFetchPhase(context);
+    try {
+        shortcutDocIdsToLoad(context);
+        fetchPhase.execute(context);
+        if (fetchPhaseShouldFreeContext(context)) {
+            freeContext(context.id());
+        } else {
+            contextProcessedSuccessfully(context);
+        }
+    } catch (Exception e) {
+        operationListener.onFailedFetchPhase(context);
+        throw ExceptionsHelper.convertToRuntime(e);
+    }
+    operationListener.onFetchPhase(context, System.nanoTime() - afterQueryTime);
+    return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
+}
+
 public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest request, SearchTask task) {
     final SearchContext context = findContext(request.id());
     SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
@@ -348,89 +370,6 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
     }
 }

-public QueryFetchSearchResult executeFetchPhase(ShardSearchRequest request, SearchTask task) throws IOException {
-    final SearchContext context = createAndPutContext(request);
-    context.incRef();
-    try {
-        contextProcessing(context);
-        context.setTask(task);
-        SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
-        operationListener.onPreQueryPhase(context);
-        long time = System.nanoTime();
-        try {
-            loadOrExecuteQueryPhase(request, context);
-        } catch (Exception e) {
-            operationListener.onFailedQueryPhase(context);
-            throw ExceptionsHelper.convertToRuntime(e);
-        }
-        long time2 = System.nanoTime();
-        operationListener.onQueryPhase(context, time2 - time);
-        operationListener.onPreFetchPhase(context);
-        try {
-            shortcutDocIdsToLoad(context);
-            fetchPhase.execute(context);
-            if (fetchPhaseShouldFreeContext(context)) {
-                freeContext(context.id());
-            } else {
-                contextProcessedSuccessfully(context);
-            }
-        } catch (Exception e) {
-            operationListener.onFailedFetchPhase(context);
-            throw ExceptionsHelper.convertToRuntime(e);
-        }
-        operationListener.onFetchPhase(context, System.nanoTime() - time2);
-        return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
-    } catch (Exception e) {
-        logger.trace("Fetch phase failed", e);
-        processFailure(context, e);
-        throw ExceptionsHelper.convertToRuntime(e);
-    } finally {
-        cleanContext(context);
-    }
-}
-
-public QueryFetchSearchResult executeFetchPhase(QuerySearchRequest request, SearchTask task) {
-    final SearchContext context = findContext(request.id());
-    context.incRef();
-    try {
-        context.setTask(task);
-        contextProcessing(context);
-        context.searcher().setAggregatedDfs(request.dfs());
-        SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
-        operationListener.onPreQueryPhase(context);
-        long time = System.nanoTime();
-        try {
-            queryPhase.execute(context);
-        } catch (Exception e) {
-            operationListener.onFailedQueryPhase(context);
-            throw ExceptionsHelper.convertToRuntime(e);
-        }
-        long time2 = System.nanoTime();
-        operationListener.onQueryPhase(context, time2 - time);
-        operationListener.onPreFetchPhase(context);
-        try {
-            shortcutDocIdsToLoad(context);
-            fetchPhase.execute(context);
-            if (fetchPhaseShouldFreeContext(context)) {
-                freeContext(request.id());
-            } else {
-                contextProcessedSuccessfully(context);
-            }
-        } catch (Exception e) {
-            operationListener.onFailedFetchPhase(context);
-            throw ExceptionsHelper.convertToRuntime(e);
-        }
-        operationListener.onFetchPhase(context, System.nanoTime() - time2);
-        return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
-    } catch (Exception e) {
-        logger.trace("Fetch phase failed", e);
-        processFailure(context, e);
-        throw ExceptionsHelper.convertToRuntime(e);
-    } finally {
-        cleanContext(context);
-    }
-}
-
 public ScrollQueryFetchSearchResult executeFetchPhase(InternalScrollSearchRequest request, SearchTask task) {
     final SearchContext context = findContext(request.id());
     context.incRef();

@@ -440,30 +379,18 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
 SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
 processScroll(request, context);
 operationListener.onPreQueryPhase(context);
-long time = System.nanoTime();
+final long time = System.nanoTime();
 try {
     queryPhase.execute(context);
 } catch (Exception e) {
     operationListener.onFailedQueryPhase(context);
     throw ExceptionsHelper.convertToRuntime(e);
 }
-long time2 = System.nanoTime();
-operationListener.onQueryPhase(context, time2 - time);
-operationListener.onPreFetchPhase(context);
-try {
-    shortcutDocIdsToLoad(context);
-    fetchPhase.execute(context);
-    if (fetchPhaseShouldFreeContext(context)) {
-        freeContext(request.id());
-    } else {
-        contextProcessedSuccessfully(context);
-    }
-} catch (Exception e) {
-    operationListener.onFailedFetchPhase(context);
-    throw ExceptionsHelper.convertToRuntime(e);
-}
-operationListener.onFetchPhase(context, System.nanoTime() - time2);
-return new ScrollQueryFetchSearchResult(new QueryFetchSearchResult(context.queryResult(), context.fetchResult()),
+long afterQueryTime = System.nanoTime();
+operationListener.onQueryPhase(context, afterQueryTime - time);
+QueryFetchSearchResult fetchSearchResult = executeFetchPhase(context, operationListener, afterQueryTime);
+
+return new ScrollQueryFetchSearchResult(fetchSearchResult,
     context.shardTarget());
 } catch (Exception e) {
 logger.trace("Fetch phase failed", e);
@@ -24,11 +24,13 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.internal.InternalSearchHits;
+import org.elasticsearch.search.query.QuerySearchResult;
+import org.elasticsearch.search.query.QuerySearchResultProvider;
 import org.elasticsearch.transport.TransportResponse;

 import java.io.IOException;

-public class FetchSearchResult extends TransportResponse implements FetchSearchResultProvider {
+public class FetchSearchResult extends QuerySearchResultProvider {

 private long id;
 private SearchShardTarget shardTarget;
@@ -45,6 +47,11 @@ public class FetchSearchResult extends TransportResponse implements FetchSearchR
     this.shardTarget = shardTarget;
 }

+@Override
+public QuerySearchResult queryResult() {
+    return null;
+}
+
 @Override
 public FetchSearchResult fetchResult() {
     return this;
@@ -1,27 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.search.fetch;
-
-import org.elasticsearch.search.SearchPhaseResult;
-
-public interface FetchSearchResultProvider extends SearchPhaseResult {
-
-    FetchSearchResult fetchResult();
-}
@@ -30,7 +30,7 @@ import java.io.IOException;
 import static org.elasticsearch.search.fetch.FetchSearchResult.readFetchSearchResult;
 import static org.elasticsearch.search.query.QuerySearchResult.readQuerySearchResult;

-public class QueryFetchSearchResult extends QuerySearchResultProvider implements FetchSearchResultProvider {
+public class QueryFetchSearchResult extends QuerySearchResultProvider {

 private QuerySearchResult queryResult;
 private FetchSearchResult fetchResult;
@@ -60,11 +60,6 @@ public class QueryFetchSearchResult extends QuerySearchResultProvider implements
     fetchResult.shardTarget(shardTarget);
 }

-@Override
-public boolean includeFetch() {
-    return true;
-}
-
 @Override
 public QuerySearchResult queryResult() {
     return queryResult;
@@ -55,7 +55,8 @@ public class ShardSearchTransportRequest extends TransportRequest implements Sha

 public ShardSearchTransportRequest(SearchRequest searchRequest, ShardId shardId, int numberOfShards,
                                    AliasFilter aliasFilter, float indexBoost, long nowInMillis) {
-    this.shardSearchLocalRequest = new ShardSearchLocalRequest(searchRequest, shardId, numberOfShards, aliasFilter, indexBoost, nowInMillis);
+    this.shardSearchLocalRequest = new ShardSearchLocalRequest(searchRequest, shardId, numberOfShards, aliasFilter, indexBoost,
+        nowInMillis);
     this.originalIndices = new OriginalIndices(searchRequest);
 }
@@ -260,16 +260,12 @@ public class QueryPhase implements SearchPhase {
     topDocs.totalHits = scrollContext.totalHits;
     topDocs.setMaxScore(scrollContext.maxScore);
 }
-switch (searchType) {
-    case QUERY_AND_FETCH:
-        // for QUERY_AND_FETCH, we already know the last emitted doc
+if (searchContext.request().numberOfShards() == 1) {
+    // if we fetch the document in the same roundtrip, we already know the last emitted doc
     if (topDocs.scoreDocs.length > 0) {
         // set the last emitted doc
         scrollContext.lastEmittedDoc = topDocs.scoreDocs[topDocs.scoreDocs.length - 1];
     }
-        break;
-    default:
-        break;
 }
 return topDocs;
@@ -65,11 +65,6 @@ public class QuerySearchResult extends QuerySearchResultProvider {
     this.shardTarget = shardTarget;
 }

-@Override
-public boolean includeFetch() {
-    return false;
-}
-
 @Override
 public QuerySearchResult queryResult() {
     return this;
@@ -20,14 +20,22 @@
 package org.elasticsearch.search.query;

 import org.elasticsearch.search.SearchPhaseResult;
+import org.elasticsearch.search.fetch.FetchSearchResult;
 import org.elasticsearch.transport.TransportResponse;

 public abstract class QuerySearchResultProvider extends TransportResponse implements SearchPhaseResult {

 /**
- * If both query and fetch happened on the same call.
+ * Returns the query result iff it's included in this response otherwise <code>null</code>
  */
-public abstract boolean includeFetch();
+public QuerySearchResult queryResult() {
+    return null;
+}

-public abstract QuerySearchResult queryResult();
+/**
+ * Returns the fetch result iff it's included in this response otherwise <code>null</code>
+ */
+public FetchSearchResult fetchResult() {
+    return null;
+}
 }
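For orientation, the consolidated result hierarchy after this change behaves as follows (a summary of the hunks above, not code from the commit):

    // QuerySearchResult:      queryResult() == this,  fetchResult() == null
    // FetchSearchResult:      queryResult() == null,  fetchResult() == this
    // QueryFetchSearchResult: queryResult() != null,  fetchResult() != null
    // Callers probe these two accessors instead of the removed includeFetch() flag.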
@@ -580,27 +580,6 @@ public class IndicesRequestIT extends ESIntegTestCase {
     assertSameIndicesOptionalRequests(searchRequest, SearchTransportService.FREE_CONTEXT_ACTION_NAME);
 }

-public void testSearchQueryAndFetch() throws Exception {
-    interceptTransportActions(SearchTransportService.QUERY_FETCH_ACTION_NAME,
-        SearchTransportService.FREE_CONTEXT_ACTION_NAME);
-
-    String[] randomIndicesOrAliases = randomIndicesOrAliases();
-    for (int i = 0; i < randomIndicesOrAliases.length; i++) {
-        client().prepareIndex(randomIndicesOrAliases[i], "type", "id-" + i).setSource("field", "value").get();
-    }
-    refresh();
-
-    SearchRequest searchRequest = new SearchRequest(randomIndicesOrAliases).searchType(SearchType.QUERY_AND_FETCH);
-    SearchResponse searchResponse = internalCluster().coordOnlyNodeClient().search(searchRequest).actionGet();
-    assertNoFailures(searchResponse);
-    assertThat(searchResponse.getHits().totalHits(), greaterThan(0L));
-
-    clearInterceptedActions();
-    assertSameIndices(searchRequest, SearchTransportService.QUERY_FETCH_ACTION_NAME);
-    //free context messages are not necessarily sent, but if they are, check their indices
-    assertSameIndicesOptionalRequests(searchRequest, SearchTransportService.FREE_CONTEXT_ACTION_NAME);
-}
-
 private static void assertSameIndices(IndicesRequest originalRequest, String... actions) {
     assertSameIndices(originalRequest, false, actions);
 }
@@ -372,7 +372,6 @@ public class TasksIT extends ESIntegTestCase {
 assertEquals(mainTask.get(0).getTaskId(), taskInfo.getParentTaskId());
 switch (taskInfo.getAction()) {
     case SearchTransportService.QUERY_ACTION_NAME:
-    case SearchTransportService.QUERY_FETCH_ACTION_NAME:
     case SearchTransportService.DFS_ACTION_NAME:
         assertTrue(taskInfo.getDescription(), Regex.simpleMatch("shardId[[test][*]]", taskInfo.getDescription()));
         break;
@@ -28,7 +28,6 @@ import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.fetch.FetchSearchResult;
-import org.elasticsearch.search.fetch.FetchSearchResultProvider;
 import org.elasticsearch.search.internal.InternalSearchHit;
 import org.elasticsearch.search.internal.InternalSearchHits;
 import org.elasticsearch.search.internal.InternalSearchResponse;
@@ -197,8 +196,8 @@ public class SearchPhaseControllerTests extends ESTestCase {
     return TopDocs.merge(topN, shardTopDocs).scoreDocs;
 }

-private AtomicArray<FetchSearchResultProvider> generateFetchResults(int nShards, ScoreDoc[] mergedSearchDocs, Suggest mergedSuggest) {
-    AtomicArray<FetchSearchResultProvider> fetchResults = new AtomicArray<>(nShards);
+private AtomicArray<QuerySearchResultProvider> generateFetchResults(int nShards, ScoreDoc[] mergedSearchDocs, Suggest mergedSuggest) {
+    AtomicArray<QuerySearchResultProvider> fetchResults = new AtomicArray<>(nShards);
     for (int shardIndex = 0; shardIndex < nShards; shardIndex++) {
         float maxScore = -1F;
         SearchShardTarget shardTarget = new SearchShardTarget("", new Index("", ""), shardIndex);
@@ -54,7 +54,7 @@ public class SearchWithRejectionsIT extends ESIntegTestCase {

 int numSearches = 10;
 Future<SearchResponse>[] responses = new Future[numSearches];
-SearchType searchType = randomFrom(SearchType.DEFAULT, SearchType.QUERY_AND_FETCH, SearchType.QUERY_THEN_FETCH, SearchType.DFS_QUERY_THEN_FETCH);
+SearchType searchType = randomFrom(SearchType.DEFAULT, SearchType.QUERY_THEN_FETCH, SearchType.DFS_QUERY_THEN_FETCH);
 logger.info("search type is {}", searchType);
 for (int i = 0; i < numSearches; i++) {
     responses[i] = client().prepareSearch()
@@ -99,7 +99,7 @@ public class DiversifiedSamplerIT extends ESIntegTestCase {
 // Tests that we can refer to nested elements under a sample in a path
 // statement
 boolean asc = randomBoolean();
-SearchResponse response = client().prepareSearch("test").setTypes("book").setSearchType(SearchType.QUERY_AND_FETCH)
+SearchResponse response = client().prepareSearch("test").setTypes("book").setSearchType(SearchType.QUERY_THEN_FETCH)
     .addAggregation(terms("genres")
         .field("genre")
         .order(Terms.Order.aggregation("sample>max_price.value", asc))
@@ -132,7 +132,7 @@ public class DiversifiedSamplerIT extends ESIntegTestCase {
 sampleAgg.field("author").maxDocsPerValue(MAX_DOCS_PER_AUTHOR).executionHint(randomExecutionHint());
 sampleAgg.subAggregation(terms("authors").field("author"));
 SearchResponse response = client().prepareSearch("test")
-    .setSearchType(SearchType.QUERY_AND_FETCH)
+    .setSearchType(SearchType.QUERY_THEN_FETCH)
     .setQuery(new TermQueryBuilder("genre", "fantasy"))
     .setFrom(0).setSize(60)
     .addAggregation(sampleAgg)
@@ -158,7 +158,7 @@ public class DiversifiedSamplerIT extends ESIntegTestCase {
 sampleAgg.subAggregation(terms("authors").field("author"));

 rootTerms.subAggregation(sampleAgg);
-SearchResponse response = client().prepareSearch("test").setSearchType(SearchType.QUERY_AND_FETCH)
+SearchResponse response = client().prepareSearch("test").setSearchType(SearchType.QUERY_THEN_FETCH)
     .addAggregation(rootTerms).execute().actionGet();
 assertSearchResponse(response);
 Terms genres = response.getAggregations().get("genres");
@@ -188,7 +188,7 @@ public class DiversifiedSamplerIT extends ESIntegTestCase {
 sampleAgg.subAggregation(terms("genres").field("genre"));

 rootSample.subAggregation(sampleAgg);
-SearchResponse response = client().prepareSearch("test").setSearchType(SearchType.QUERY_AND_FETCH).addAggregation(rootSample)
+SearchResponse response = client().prepareSearch("test").setSearchType(SearchType.QUERY_THEN_FETCH).addAggregation(rootSample)
     .execute().actionGet();
 assertSearchResponse(response);
 Sampler genreSample = response.getAggregations().get("genreSample");
@@ -213,7 +213,7 @@ public class DiversifiedSamplerIT extends ESIntegTestCase {
 DiversifiedAggregationBuilder sampleAgg = new DiversifiedAggregationBuilder("sample").shardSize(100).field("author")
     .maxDocsPerValue(1);
 sampleAgg.subAggregation(terms("authors").field("author"));
-SearchResponse response = client().prepareSearch("idx_unmapped_author", "test").setSearchType(SearchType.QUERY_AND_FETCH)
+SearchResponse response = client().prepareSearch("idx_unmapped_author", "test").setSearchType(SearchType.QUERY_THEN_FETCH)
     .setQuery(new TermQueryBuilder("genre", "fantasy")).setFrom(0).setSize(60).addAggregation(sampleAgg)
     .execute().actionGet();
 assertSearchResponse(response);
@@ -229,7 +229,7 @@ public class DiversifiedSamplerIT extends ESIntegTestCase {
 DiversifiedAggregationBuilder sampleAgg = new DiversifiedAggregationBuilder("sample").shardSize(100);
 sampleAgg.field("author").maxDocsPerValue(MAX_DOCS_PER_AUTHOR).executionHint(randomExecutionHint());
 sampleAgg.subAggregation(terms("authors").field("author"));
-SearchResponse response = client().prepareSearch("idx_unmapped", "idx_unmapped_author").setSearchType(SearchType.QUERY_AND_FETCH)
+SearchResponse response = client().prepareSearch("idx_unmapped", "idx_unmapped_author").setSearchType(SearchType.QUERY_THEN_FETCH)
     .setQuery(new TermQueryBuilder("genre", "fantasy")).setFrom(0).setSize(60).addAggregation(sampleAgg).execute().actionGet();
 assertSearchResponse(response);
 Sampler sample = response.getAggregations().get("sample");
@@ -95,7 +95,7 @@ public class SamplerIT extends ESIntegTestCase {
 // Tests that we can refer to nested elements under a sample in a path
 // statement
 boolean asc = randomBoolean();
-SearchResponse response = client().prepareSearch("test").setTypes("book").setSearchType(SearchType.QUERY_AND_FETCH)
+SearchResponse response = client().prepareSearch("test").setTypes("book").setSearchType(SearchType.QUERY_THEN_FETCH)
     .addAggregation(terms("genres")
         .field("genre")
         .order(Terms.Order.aggregation("sample>max_price.value", asc))
@@ -125,7 +125,7 @@ public class SamplerIT extends ESIntegTestCase {
 public void testSimpleSampler() throws Exception {
     SamplerAggregationBuilder sampleAgg = sampler("sample").shardSize(100);
     sampleAgg.subAggregation(terms("authors").field("author"));
-    SearchResponse response = client().prepareSearch("test").setSearchType(SearchType.QUERY_AND_FETCH)
+    SearchResponse response = client().prepareSearch("test").setSearchType(SearchType.QUERY_THEN_FETCH)
         .setQuery(new TermQueryBuilder("genre", "fantasy")).setFrom(0).setSize(60).addAggregation(sampleAgg).execute().actionGet();
     assertSearchResponse(response);
     Sampler sample = response.getAggregations().get("sample");
@@ -143,7 +143,7 @@ public class SamplerIT extends ESIntegTestCase {
 SamplerAggregationBuilder sampleAgg = sampler("sample").shardSize(100);
 sampleAgg.subAggregation(terms("authors").field("author"));
 SearchResponse response = client().prepareSearch("idx_unmapped")
-    .setSearchType(SearchType.QUERY_AND_FETCH)
+    .setSearchType(SearchType.QUERY_THEN_FETCH)
     .setQuery(new TermQueryBuilder("genre", "fantasy"))
     .setFrom(0).setSize(60)
     .addAggregation(sampleAgg)
@@ -160,7 +160,7 @@ public class SamplerIT extends ESIntegTestCase {
 SamplerAggregationBuilder sampleAgg = sampler("sample").shardSize(100);
 sampleAgg.subAggregation(terms("authors").field("author"));
 SearchResponse response = client().prepareSearch("idx_unmapped", "test")
-    .setSearchType(SearchType.QUERY_AND_FETCH)
+    .setSearchType(SearchType.QUERY_THEN_FETCH)
     .setQuery(new TermQueryBuilder("genre", "fantasy"))
     .setFrom(0).setSize(60).setExplain(true)
     .addAggregation(sampleAgg)
@@ -127,7 +127,7 @@ public class SignificantTermsIT extends ESIntegTestCase {

 public void testStructuredAnalysis() throws Exception {
     SearchResponse response = client().prepareSearch("test")
-        .setSearchType(SearchType.QUERY_AND_FETCH)
+        .setSearchType(SearchType.QUERY_THEN_FETCH)
         .setQuery(new TermQueryBuilder("description", "terje"))
         .setFrom(0).setSize(60).setExplain(true)
         .addAggregation(significantTerms("mySignificantTerms").field("fact_category").executionHint(randomExecutionHint())
@@ -143,7 +143,7 @@ public class SignificantTermsIT extends ESIntegTestCase {
 public void testStructuredAnalysisWithIncludeExclude() throws Exception {
     long[] excludeTerms = { MUSIC_CATEGORY };
     SearchResponse response = client().prepareSearch("test")
-        .setSearchType(SearchType.QUERY_AND_FETCH)
+        .setSearchType(SearchType.QUERY_THEN_FETCH)
         .setQuery(new TermQueryBuilder("description", "paul"))
         .setFrom(0).setSize(60).setExplain(true)
         .addAggregation(significantTerms("mySignificantTerms").field("fact_category").executionHint(randomExecutionHint())
@@ -223,7 +223,7 @@ public class SignificantTermsIT extends ESIntegTestCase {

 public void testUnmapped() throws Exception {
     SearchResponse response = client().prepareSearch("idx_unmapped")
-        .setSearchType(SearchType.QUERY_AND_FETCH)
+        .setSearchType(SearchType.QUERY_THEN_FETCH)
         .setQuery(new TermQueryBuilder("description", "terje"))
         .setFrom(0).setSize(60).setExplain(true)
         .addAggregation(significantTerms("mySignificantTerms").field("fact_category").executionHint(randomExecutionHint())
@@ -237,7 +237,7 @@ public class SignificantTermsIT extends ESIntegTestCase {

 public void testTextAnalysis() throws Exception {
     SearchResponse response = client().prepareSearch("test")
-        .setSearchType(SearchType.QUERY_AND_FETCH)
+        .setSearchType(SearchType.QUERY_THEN_FETCH)
         .setQuery(new TermQueryBuilder("description", "terje"))
         .setFrom(0).setSize(60).setExplain(true)
         .addAggregation(significantTerms("mySignificantTerms").field("description").executionHint(randomExecutionHint())
@@ -251,7 +251,7 @@ public class SignificantTermsIT extends ESIntegTestCase {

 public void testTextAnalysisGND() throws Exception {
     SearchResponse response = client().prepareSearch("test")
-        .setSearchType(SearchType.QUERY_AND_FETCH)
+        .setSearchType(SearchType.QUERY_THEN_FETCH)
         .setQuery(new TermQueryBuilder("description", "terje"))
         .setFrom(0).setSize(60).setExplain(true)
         .addAggregation(significantTerms("mySignificantTerms").field("description").executionHint(randomExecutionHint()).significanceHeuristic(new GND(true))
@@ -265,7 +265,7 @@ public class SignificantTermsIT extends ESIntegTestCase {

 public void testTextAnalysisChiSquare() throws Exception {
     SearchResponse response = client().prepareSearch("test")
-        .setSearchType(SearchType.QUERY_AND_FETCH)
+        .setSearchType(SearchType.QUERY_THEN_FETCH)
         .setQuery(new TermQueryBuilder("description", "terje"))
         .setFrom(0).setSize(60).setExplain(true)
         .addAggregation(significantTerms("mySignificantTerms").field("description").executionHint(randomExecutionHint()).significanceHeuristic(new ChiSquare(false,true))
@@ -280,7 +280,7 @@ public class SignificantTermsIT extends ESIntegTestCase {
 public void testTextAnalysisPercentageScore() throws Exception {
     SearchResponse response = client()
         .prepareSearch("test")
-        .setSearchType(SearchType.QUERY_AND_FETCH)
+        .setSearchType(SearchType.QUERY_THEN_FETCH)
         .setQuery(new TermQueryBuilder("description", "terje"))
         .setFrom(0)
         .setSize(60)
@@ -299,7 +299,7 @@ public class SignificantTermsIT extends ESIntegTestCase {
 // We search for the name of a snowboarder but use music-related content (fact_category:1)
 // as the background source of term statistics.
 SearchResponse response = client().prepareSearch("test")
-    .setSearchType(SearchType.QUERY_AND_FETCH)
+    .setSearchType(SearchType.QUERY_THEN_FETCH)
     .setQuery(new TermQueryBuilder("description", "terje"))
     .setFrom(0).setSize(60).setExplain(true)
     .addAggregation(significantTerms("mySignificantTerms").field("description")
@@ -323,7 +323,7 @@ public class SignificantTermsIT extends ESIntegTestCase {

 public void testFilteredAnalysis() throws Exception {
     SearchResponse response = client().prepareSearch("test")
-        .setSearchType(SearchType.QUERY_AND_FETCH)
+        .setSearchType(SearchType.QUERY_THEN_FETCH)
         .setQuery(new TermQueryBuilder("description", "weller"))
         .setFrom(0).setSize(60).setExplain(true)
         .addAggregation(significantTerms("mySignificantTerms").field("description")
@@ -348,7 +348,7 @@ public class SignificantTermsIT extends ESIntegTestCase {
     { "paul", "smith" },
     { "craig", "kelly", "terje", "haakonsen", "burton" }};
 SearchResponse response = client().prepareSearch("test")
-    .setSearchType(SearchType.QUERY_AND_FETCH)
+    .setSearchType(SearchType.QUERY_THEN_FETCH)
     .addAggregation(terms("myCategories").field("fact_category").minDocCount(2)
         .subAggregation(
             significantTerms("mySignificantTerms").field("description")
@@ -373,7 +373,7 @@ public class SignificantTermsIT extends ESIntegTestCase {

 public void testPartiallyUnmapped() throws Exception {
     SearchResponse response = client().prepareSearch("idx_unmapped", "test")
-        .setSearchType(SearchType.QUERY_AND_FETCH)
+        .setSearchType(SearchType.QUERY_THEN_FETCH)
         .setQuery(new TermQueryBuilder("description", "terje"))
         .setFrom(0).setSize(60).setExplain(true)
         .addAggregation(significantTerms("mySignificantTerms").field("description")
@@ -388,7 +388,7 @@ public class SignificantTermsIT extends ESIntegTestCase {

 public void testPartiallyUnmappedWithFormat() throws Exception {
     SearchResponse response = client().prepareSearch("idx_unmapped", "test")
-        .setSearchType(SearchType.QUERY_AND_FETCH)
+        .setSearchType(SearchType.QUERY_THEN_FETCH)
         .setQuery(boolQuery().should(termQuery("description", "the")).should(termQuery("description", "terje")))
         .setFrom(0).setSize(60).setExplain(true)
         .addAggregation(significantTerms("mySignificantTerms")
@@ -425,7 +425,7 @@ public class SignificantTermsIT extends ESIntegTestCase {

 public void testDefaultSignificanceHeuristic() throws Exception {
     SearchResponse response = client().prepareSearch("test")
-        .setSearchType(SearchType.QUERY_AND_FETCH)
+        .setSearchType(SearchType.QUERY_THEN_FETCH)
         .setQuery(new TermQueryBuilder("description", "terje"))
         .setFrom(0).setSize(60).setExplain(true)
         .addAggregation(significantTerms("mySignificantTerms")
@@ -442,7 +442,7 @@ public class SignificantTermsIT extends ESIntegTestCase {

 public void testMutualInformation() throws Exception {
     SearchResponse response = client().prepareSearch("test")
-        .setSearchType(SearchType.QUERY_AND_FETCH)
+        .setSearchType(SearchType.QUERY_THEN_FETCH)
         .setQuery(new TermQueryBuilder("description", "terje"))
         .setFrom(0).setSize(60).setExplain(true)
         .addAggregation(significantTerms("mySignificantTerms")
@@ -49,7 +49,6 @@ import java.util.Set;
 import java.util.TreeSet;

 import static org.elasticsearch.action.search.SearchType.DFS_QUERY_THEN_FETCH;
-import static org.elasticsearch.action.search.SearchType.QUERY_AND_FETCH;
 import static org.elasticsearch.action.search.SearchType.QUERY_THEN_FETCH;
 import static org.elasticsearch.client.Requests.createIndexRequest;
 import static org.elasticsearch.client.Requests.searchRequest;
@@ -279,45 +278,6 @@ public class TransportTwoNodesSearchIT extends ESIntegTestCase {
     assertEquals(100, total);
 }

-public void testQueryAndFetch() throws Exception {
-    prepareData(3);
-
-    SearchSourceBuilder source = searchSource()
-        .query(termQuery("multi", "test"))
-        .from(0).size(20).explain(true);
-
-    Set<String> expectedIds = new HashSet<>();
-    for (int i = 0; i < 100; i++) {
-        expectedIds.add(Integer.toString(i));
-    }
-
-    SearchResponse searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_AND_FETCH).scroll(new Scroll(timeValueMinutes(10)))).actionGet();
-    assertNoFailures(searchResponse);
-    assertThat(searchResponse.getHits().totalHits(), equalTo(100L));
-    assertThat(searchResponse.getHits().hits().length, equalTo(60)); // 20 per shard
-    for (int i = 0; i < 60; i++) {
-        SearchHit hit = searchResponse.getHits().hits()[i];
-        // System.out.println(hit.shard() + ": " + hit.explanation());
-        assertThat(hit.explanation(), notNullValue());
-        // we can't really check here, since its query and fetch, and not controlling distribution
-        // assertThat("id[" + hit.id() + "]", hit.id(), equalTo(Integer.toString(100 - i - 1)));
-        assertThat("make sure we don't have duplicates", expectedIds.remove(hit.id()), notNullValue());
-    }
-
-    do {
-        searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll("10m").get();
-        assertThat(searchResponse.getHits().totalHits(), equalTo(100L));
-        assertThat(searchResponse.getHits().hits().length, lessThanOrEqualTo(40));
-        for (int i = 0; i < searchResponse.getHits().hits().length; i++) {
-            SearchHit hit = searchResponse.getHits().hits()[i];
-            // we don't do perfect sorting when it comes to scroll with Query+Fetch
-            assertThat("make sure we don't have duplicates", expectedIds.remove(hit.id()), notNullValue());
-        }
-    } while (searchResponse.getHits().getHits().length > 0);
-    clearScroll(searchResponse.getScrollId());
-    assertThat("make sure we got all [" + expectedIds + "]", expectedIds.size(), equalTo(0));
-}
-
 public void testSimpleFacets() throws Exception {
     prepareData();
@@ -256,16 +256,9 @@ final class DuelScrollIT {
     }
 }

-public void testDuelIndexOrderQueryAndFetch() throws Exception {
-    final SearchType searchType = SearchType.QUERY_AND_FETCH;
-    // QUERY_AND_FETCH only works with a single shard
-    final int numDocs = createIndex(true);
-    testDuelIndexOrder(searchType, false, numDocs);
-    testDuelIndexOrder(searchType, true, numDocs);
-}
-
 public void testDuelIndexOrderQueryThenFetch() throws Exception {
-    final SearchType searchType = RandomPicks.randomFrom(random(), Arrays.asList(SearchType.QUERY_THEN_FETCH, SearchType.DFS_QUERY_THEN_FETCH));
+    final SearchType searchType = RandomPicks.randomFrom(random(), Arrays.asList(SearchType.QUERY_THEN_FETCH,
+        SearchType.DFS_QUERY_THEN_FETCH));
     final int numDocs = createIndex(false);
     testDuelIndexOrder(searchType, false, numDocs);
     testDuelIndexOrder(searchType, true, numDocs);