Removes ExpandCollapseSearchResponseListener, search response listeners and blocking calls

This changes removes the SearchResponseListener that was used by the ExpandCollapseSearchResponseListener to expand collapsed hits.
The removal of SearchResponseListener is not a breaking change because it was never released.
This change also replace the blocking call in ExpandCollapseSearchResponseListener by a single asynchronous multi search request. The parallelism of the expand request can be set via CollapseBuilder#max_concurrent_group_searches

Closes #23048
This commit is contained in:
Jim Ferenczi 2017-02-09 18:06:10 +01:00 committed by GitHub
parent 33915aefd8
commit 94087b3274
13 changed files with 217 additions and 291 deletions

View File

@ -559,25 +559,13 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
private void sendResponse(SearchPhaseController searchPhaseController, ScoreDoc[] sortedDocs,
String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase,
AtomicArray<? extends QuerySearchResultProvider> fetchResultsArr) {
// this is only a temporary fix since field collapsing executes a blocking call on response
// which could be a network thread. we are fixing this but for now we just fork off again.
// this should be removed once https://github.com/elastic/elasticsearch/issues/23048 is fixed
getExecutor().execute(new ActionRunnable<SearchResponse>(listener) {
@Override
public void doRun() throws IOException {
final boolean isScrollRequest = request.scroll() != null;
final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedDocs,
reducedQueryPhase, fetchResultsArr);
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
buildTookInMillis(), buildShardFailures()));
}
@Override
public void onFailure(Exception e) {
raisePhaseFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()));
}
});
final boolean isScrollRequest = request.scroll() != null;
final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedDocs, reducedQueryPhase,
fetchResultsArr);
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
buildTookInMillis(), buildShardFailures()));
}
}
}

View File

@ -38,6 +38,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
@ -45,8 +47,6 @@ import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.profile.ProfileShardResult;
import org.elasticsearch.search.profile.SearchProfileShardResults;
@ -65,7 +65,6 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@ -83,25 +82,11 @@ public class SearchPhaseController extends AbstractComponent {
private final BigArrays bigArrays;
private final ScriptService scriptService;
private final List<BiConsumer<SearchRequest, SearchResponse> > searchResponseListener;
public SearchPhaseController(Settings settings, BigArrays bigArrays, ScriptService scriptService) {
this(settings, bigArrays, scriptService, Collections.emptyList());
}
public SearchPhaseController(Settings settings, BigArrays bigArrays, ScriptService scriptService,
List<BiConsumer<SearchRequest, SearchResponse> > searchResponseListener) {
super(settings);
this.bigArrays = bigArrays;
this.scriptService = scriptService;
this.searchResponseListener = searchResponseListener;
}
/**
* Returns the search response listeners registry
*/
public List<BiConsumer<SearchRequest, SearchResponse> > getSearchResponseListener() {
return searchResponseListener;
}
public AggregatedDfs aggregateDfs(AtomicArray<DfsSearchResult> results) {

View File

@ -173,6 +173,15 @@ public class SearchTransportService extends AbstractLifecycleComponent {
new ActionListenerResponseHandler<>(listener, FetchSearchResult::new));
}
/**
* Used by {@link TransportSearchAction} to send the expand queries (field collapsing).
*/
void sendExecuteMultiSearch(DiscoveryNode node, final MultiSearchRequest request, SearchTask task,
final ActionListener<MultiSearchResponse> listener) {
transportService.sendChildRequest(transportService.getConnection(node), MultiSearchAction.NAME, request, task,
new ActionListenerResponseHandler<>(listener, MultiSearchResponse::new));
}
public RemoteClusterService getRemoteClusterService() {
return remoteClusterService;
}

View File

@ -36,8 +36,15 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.InnerHitBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.collapse.CollapseBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
@ -47,11 +54,11 @@ import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Function;
import static org.elasticsearch.action.search.SearchType.QUERY_THEN_FETCH;
@ -212,16 +219,18 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
return connection;
};
// Only enrich the search response iff collapsing has been specified:
final ActionListener<SearchResponse> wrapper;
if (searchPhaseController.getSearchResponseListener().size() > 0) {
wrapper = ActionListener.wrap(searchResponse -> {
List<BiConsumer<SearchRequest, SearchResponse>> responseListeners =
searchPhaseController.getSearchResponseListener();
for (BiConsumer<SearchRequest, SearchResponse> respListener : responseListeners) {
respListener.accept(searchRequest, searchResponse);
}
listener.onResponse(searchResponse);
if (searchRequest.source() != null &&
searchRequest.source().collapse() != null &&
searchRequest.source().collapse().getInnerHit() != null) {
wrapper = ActionListener.wrap(searchResponse -> {
if (searchResponse.getHits().getHits().length == 0) {
listener.onResponse(searchResponse);
} else {
expandCollapsedHits(nodes.getLocalNode(), task, searchRequest, searchResponse, listener);
}
}, listener::onFailure);
} else {
wrapper = listener;
@ -284,4 +293,90 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
+ "] to a greater value if you really want to query that many shards at the same time.");
}
}
/**
* Expands collapsed using the {@link CollapseBuilder#innerHit} options.
*/
void expandCollapsedHits(DiscoveryNode node,
SearchTask parentTask,
SearchRequest searchRequest,
SearchResponse searchResponse,
ActionListener<SearchResponse> finalListener) {
CollapseBuilder collapseBuilder = searchRequest.source().collapse();
MultiSearchRequest multiRequest = new MultiSearchRequest();
if (collapseBuilder.getMaxConcurrentGroupRequests() > 0) {
multiRequest.maxConcurrentSearchRequests(collapseBuilder.getMaxConcurrentGroupRequests());
}
for (SearchHit hit : searchResponse.getHits()) {
BoolQueryBuilder groupQuery = new BoolQueryBuilder();
Object collapseValue = hit.field(collapseBuilder.getField()).getValue();
if (collapseValue != null) {
groupQuery.filter(QueryBuilders.matchQuery(collapseBuilder.getField(), collapseValue));
} else {
groupQuery.mustNot(QueryBuilders.existsQuery(collapseBuilder.getField()));
}
QueryBuilder origQuery = searchRequest.source().query();
if (origQuery != null) {
groupQuery.must(origQuery);
}
SearchSourceBuilder sourceBuilder = buildExpandSearchSourceBuilder(collapseBuilder.getInnerHit())
.query(groupQuery);
SearchRequest groupRequest = new SearchRequest(searchRequest.indices())
.types(searchRequest.types())
.source(sourceBuilder);
multiRequest.add(groupRequest);
}
searchTransportService.sendExecuteMultiSearch(node, multiRequest, parentTask,
ActionListener.wrap(response -> {
Iterator<MultiSearchResponse.Item> it = response.iterator();
for (SearchHit hit : searchResponse.getHits()) {
MultiSearchResponse.Item item = it.next();
if (item.isFailure()) {
finalListener.onFailure(item.getFailure());
return;
}
SearchHits innerHits = item.getResponse().getHits();
if (hit.getInnerHits() == null) {
hit.setInnerHits(new HashMap<>(1));
}
hit.getInnerHits().put(collapseBuilder.getInnerHit().getName(), innerHits);
}
finalListener.onResponse(searchResponse);
}, finalListener::onFailure)
);
}
private SearchSourceBuilder buildExpandSearchSourceBuilder(InnerHitBuilder options) {
SearchSourceBuilder groupSource = new SearchSourceBuilder();
groupSource.from(options.getFrom());
groupSource.size(options.getSize());
if (options.getSorts() != null) {
options.getSorts().forEach(groupSource::sort);
}
if (options.getFetchSourceContext() != null) {
if (options.getFetchSourceContext().includes() == null && options.getFetchSourceContext().excludes() == null) {
groupSource.fetchSource(options.getFetchSourceContext().fetchSource());
} else {
groupSource.fetchSource(options.getFetchSourceContext().includes(),
options.getFetchSourceContext().excludes());
}
}
if (options.getDocValueFields() != null) {
options.getDocValueFields().forEach(groupSource::docValueField);
}
if (options.getStoredFieldsContext() != null && options.getStoredFieldsContext().fieldNames() != null) {
options.getStoredFieldsContext().fieldNames().forEach(groupSource::storedField);
}
if (options.getScriptFields() != null) {
for (SearchSourceBuilder.ScriptField field : options.getScriptFields()) {
groupSource.scriptField(field.fieldName(), field.script());
}
}
if (options.getHighlightBuilder() != null) {
groupSource.highlighter(options.getHighlightBuilder());
}
groupSource.explain(options.isExplain());
groupSource.trackScores(options.isTrackScores());
return groupSource;
}
}

View File

@ -350,7 +350,7 @@ public class Node implements Closeable {
IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
modules.add(indicesModule);
SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class), client);
SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class));
CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(),
settingsModule.getClusterSettings());
resourcesToClose.add(circuitBreakerService);
@ -451,7 +451,7 @@ public class Node implements Closeable {
b.bind(SearchTransportService.class).toInstance(new SearchTransportService(settings,
settingsModule.getClusterSettings(), transportService));
b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings, bigArrays,
scriptModule.getScriptService(), searchModule.getSearchResponseListeners()));
scriptModule.getScriptService()));
b.bind(Transport.class).toInstance(transport);
b.bind(TransportService.class).toInstance(transportService);
b.bind(NetworkService.class).toInstance(networkService);

View File

@ -22,7 +22,6 @@ package org.elasticsearch.search;
import org.apache.lucene.search.BooleanQuery;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.NamedRegistry;
import org.elasticsearch.common.geo.ShapesAvailability;
import org.elasticsearch.common.geo.builders.ShapeBuilders;
@ -223,7 +222,6 @@ import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.SimpleModel;
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregator;
import org.elasticsearch.search.collapse.ExpandCollapseSearchResponseListener;
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.fetch.subphase.DocValueFieldsFetchSubPhase;
@ -280,17 +278,12 @@ public class SearchModule {
"moving_avg_model");
private final List<FetchSubPhase> fetchSubPhases = new ArrayList<>();
private final List<BiConsumer<SearchRequest, SearchResponse> > searchResponseListeners = new ArrayList<> ();
private final Settings settings;
private final List<NamedWriteableRegistry.Entry> namedWriteables = new ArrayList<>();
private final List<NamedXContentRegistry.Entry> namedXContents = new ArrayList<>();
public SearchModule(Settings settings, boolean transportClient, List<SearchPlugin> plugins) {
this(settings, transportClient, plugins, null);
}
public SearchModule(Settings settings, boolean transportClient, List<SearchPlugin> plugins, Client client) {
this.settings = settings;
this.transportClient = transportClient;
registerSuggesters(plugins);
@ -306,9 +299,6 @@ public class SearchModule {
registerPipelineAggregations(plugins);
registerFetchSubPhases(plugins);
registerSearchExts(plugins);
if (false == transportClient) {
registerSearchResponseListeners(client, plugins);
}
registerShapes();
}
@ -341,13 +331,6 @@ public class SearchModule {
return movingAverageModelParserRegistry;
}
/**
* Returns the search response listeners registry
*/
public List<BiConsumer<SearchRequest, SearchResponse> > getSearchResponseListeners() {
return searchResponseListeners;
}
private void registerAggregations(List<SearchPlugin> plugins) {
registerAggregation(new AggregationSpec(AvgAggregationBuilder.NAME, AvgAggregationBuilder::new, AvgAggregationBuilder::parse)
.addResultReader(InternalAvg::new));
@ -699,13 +682,6 @@ public class SearchModule {
registerFromPlugin(plugins, p -> p.getFetchSubPhases(context), this::registerFetchSubPhase);
}
private void registerSearchResponseListeners(Client client, List<SearchPlugin> plugins) {
if (client != null) {
registerSearchResponseListener(new ExpandCollapseSearchResponseListener(client));
}
registerFromPlugin(plugins, p -> p.getSearchResponseListeners(), this::registerSearchResponseListener);
}
private void registerSearchExts(List<SearchPlugin> plugins) {
registerFromPlugin(plugins, SearchPlugin::getSearchExts, this::registerSearchExt);
}
@ -791,10 +767,6 @@ public class SearchModule {
(p, c) -> spec.getParser().fromXContent((QueryParseContext) c)));
}
private void registerSearchResponseListener(BiConsumer<SearchRequest, SearchResponse> listener) {
searchResponseListeners.add(requireNonNull(listener, "SearchResponseListener must not be null"));
}
public FetchPhase getFetchPhase() {
return new FetchPhase(fetchSubPhases);
}

View File

@ -45,17 +45,20 @@ import java.util.Objects;
public class CollapseBuilder extends ToXContentToBytes implements Writeable {
public static final ParseField FIELD_FIELD = new ParseField("field");
public static final ParseField INNER_HITS_FIELD = new ParseField("inner_hits");
public static final ParseField MAX_CONCURRENT_GROUP_REQUESTS_FIELD = new ParseField("max_concurrent_group_searches");
private static final ObjectParser<CollapseBuilder, QueryParseContext> PARSER =
new ObjectParser<>("collapse", CollapseBuilder::new);
static {
PARSER.declareString(CollapseBuilder::setField, FIELD_FIELD);
PARSER.declareInt(CollapseBuilder::setMaxConcurrentGroupRequests, MAX_CONCURRENT_GROUP_REQUESTS_FIELD);
PARSER.declareObject(CollapseBuilder::setInnerHits,
(p, c) -> InnerHitBuilder.fromXContent(c), INNER_HITS_FIELD);
}
private String field;
private InnerHitBuilder innerHit;
private int maxConcurrentGroupRequests = 0;
private CollapseBuilder() {}
@ -70,12 +73,14 @@ public class CollapseBuilder extends ToXContentToBytes implements Writeable {
public CollapseBuilder(StreamInput in) throws IOException {
this.field = in.readString();
this.maxConcurrentGroupRequests = in.readVInt();
this.innerHit = in.readOptionalWriteable(InnerHitBuilder::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(field);
out.writeVInt(maxConcurrentGroupRequests);
out.writeOptionalWriteable(innerHit);
}
@ -84,6 +89,7 @@ public class CollapseBuilder extends ToXContentToBytes implements Writeable {
return builder;
}
// for object parser only
private CollapseBuilder setField(String field) {
if (Strings.isEmpty(field)) {
throw new IllegalArgumentException("field name is null or empty");
@ -97,6 +103,14 @@ public class CollapseBuilder extends ToXContentToBytes implements Writeable {
return this;
}
public CollapseBuilder setMaxConcurrentGroupRequests(int num) {
if (num < 1) {
throw new IllegalArgumentException("maxConcurrentGroupRequests` must be positive");
}
this.maxConcurrentGroupRequests = num;
return this;
}
/**
* The name of the field to collapse against
*/
@ -111,6 +125,13 @@ public class CollapseBuilder extends ToXContentToBytes implements Writeable {
return this.innerHit;
}
/**
* Returns the amount of group requests that are allowed to be ran concurrently in the inner_hits phase.
*/
public int getMaxConcurrentGroupRequests() {
return maxConcurrentGroupRequests;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
@ -121,6 +142,9 @@ public class CollapseBuilder extends ToXContentToBytes implements Writeable {
private void innerToXContent(XContentBuilder builder) throws IOException {
builder.field(FIELD_FIELD.getPreferredName(), field);
if (maxConcurrentGroupRequests > 0) {
builder.field(MAX_CONCURRENT_GROUP_REQUESTS_FIELD.getPreferredName(), maxConcurrentGroupRequests);
}
if (innerHit != null) {
builder.field(INNER_HITS_FIELD.getPreferredName(), innerHit);
}
@ -133,13 +157,18 @@ public class CollapseBuilder extends ToXContentToBytes implements Writeable {
CollapseBuilder that = (CollapseBuilder) o;
if (field != null ? !field.equals(that.field) : that.field != null) return false;
if (maxConcurrentGroupRequests != that.maxConcurrentGroupRequests) return false;
if (!field.equals(that.field)) return false;
return innerHit != null ? innerHit.equals(that.innerHit) : that.innerHit == null;
}
@Override
public int hashCode() {
return Objects.hash(this.field, this.innerHit);
int result = field.hashCode();
result = 31 * result + (innerHit != null ? innerHit.hashCode() : 0);
result = 31 * result + maxConcurrentGroupRequests;
return result;
}
public CollapseContext build(SearchContext context) {

View File

@ -1,117 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.collapse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.InnerHitBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.util.HashMap;
import java.util.Objects;
import java.util.function.BiConsumer;
/**
* A search response listener that intercepts the search response and expands collapsed hits
* using the {@link CollapseBuilder#innerHit} options.
*/
public class ExpandCollapseSearchResponseListener implements BiConsumer<SearchRequest, SearchResponse> {
private final Client client;
public ExpandCollapseSearchResponseListener(Client client) {
this.client = Objects.requireNonNull(client);
}
@Override
public void accept(SearchRequest searchRequest, SearchResponse searchResponse) {
if (searchRequest.source() == null) {
return ;
}
CollapseBuilder collapseBuilder = searchRequest.source().collapse();
if (collapseBuilder == null || collapseBuilder.getInnerHit() == null) {
return ;
}
for (SearchHit hit : searchResponse.getHits()) {
BoolQueryBuilder groupQuery = new BoolQueryBuilder();
Object collapseValue = hit.field(collapseBuilder.getField()).getValue();
if (collapseValue != null) {
groupQuery.filter(QueryBuilders.matchQuery(collapseBuilder.getField(), collapseValue));
} else {
groupQuery.mustNot(QueryBuilders.existsQuery(collapseBuilder.getField()));
}
QueryBuilder origQuery = searchRequest.source().query();
if (origQuery != null) {
groupQuery.must(origQuery);
}
SearchSourceBuilder sourceBuilder = createGroupSearchBuilder(collapseBuilder.getInnerHit())
.query(groupQuery);
SearchRequest groupRequest = new SearchRequest(searchRequest.indices())
.types(searchRequest.types())
.source(sourceBuilder);
SearchResponse groupResponse = client.search(groupRequest).actionGet();
SearchHits innerHits = groupResponse.getHits();
if (hit.getInnerHits() == null) {
hit.setInnerHits(new HashMap<>(1));
}
hit.getInnerHits().put(collapseBuilder.getInnerHit().getName(), innerHits);
}
}
private SearchSourceBuilder createGroupSearchBuilder(InnerHitBuilder options) {
SearchSourceBuilder groupSource = new SearchSourceBuilder();
groupSource.from(options.getFrom());
groupSource.size(options.getSize());
if (options.getSorts() != null) {
options.getSorts().forEach(groupSource::sort);
}
if (options.getFetchSourceContext() != null) {
if (options.getFetchSourceContext().includes() == null && options.getFetchSourceContext().excludes() == null) {
groupSource.fetchSource(options.getFetchSourceContext().fetchSource());
} else {
groupSource.fetchSource(options.getFetchSourceContext().includes(),
options.getFetchSourceContext().excludes());
}
}
if (options.getDocValueFields() != null) {
options.getDocValueFields().forEach(groupSource::docValueField);
}
if (options.getStoredFieldsContext() != null && options.getStoredFieldsContext().fieldNames() != null) {
options.getStoredFieldsContext().fieldNames().forEach(groupSource::storedField);
}
if (options.getScriptFields() != null) {
for (SearchSourceBuilder.ScriptField field : options.getScriptFields()) {
groupSource.scriptField(field.fieldName(), field.script());
}
}
if (options.getHighlightBuilder() != null) {
groupSource.highlighter(options.getHighlightBuilder());
}
groupSource.explain(options.isExplain());
groupSource.trackScores(options.isTrackScores());
return groupSource;
}
}

View File

@ -1,75 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.Before;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import static java.util.Collections.singletonList;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.greaterThan;
public class ResponseListenerPluginIT extends ESIntegTestCase {
static AtomicBoolean hookWasFired = new AtomicBoolean(false);
public static class FooResponseListenerPlugin extends Plugin implements SearchPlugin {
@Override
public List<BiConsumer<SearchRequest, SearchResponse>> getSearchResponseListeners() {
return singletonList((searchRequest, response) -> {
assertThat(response.getTookInMillis(), greaterThan(0L));
boolean alreadyFired = hookWasFired.getAndSet(true);
assertFalse(alreadyFired);
});
}
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(FooResponseListenerPlugin.class);
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
createIndex("test");
ensureGreen();
client().prepareIndex("index", "type", "1").setSource("field", "value").get();
refresh();
}
public void testSearchResponseHook() {
assertHitCount(client().prepareSearch("index").setQuery(QueryBuilders.matchAllQuery()).get(), 1L);
boolean alreadyFired = hookWasFired.get();
assertTrue(alreadyFired);
}
}

View File

@ -18,8 +18,6 @@
*/
package org.elasticsearch.search;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.inject.ModuleTestCase;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -71,7 +69,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
@ -259,18 +256,6 @@ public class SearchModuleTests extends ModuleTestCase {
hasSize(1));
}
public void testRegisterSearchResponseListener() {
BiConsumer<SearchRequest, SearchResponse> listener = (s, r) -> {};
SearchModule module = new SearchModule(Settings.EMPTY, false, singletonList(new SearchPlugin() {
public List<BiConsumer<SearchRequest, SearchResponse>> getSearchResponseListeners() {
return singletonList(listener);
}
}));
List<BiConsumer<SearchRequest, SearchResponse>> listeners = module.getSearchResponseListeners();
assertEquals(listeners.size(), 1);
assertEquals(listeners.get(0), listener);
}
private static final String[] NON_DEPRECATED_QUERIES = new String[] {
"bool",
"boosting",

View File

@ -68,6 +68,7 @@ public class CollapseBuilderTests extends AbstractWireSerializingTestCase {
public static CollapseBuilder randomCollapseBuilder() {
CollapseBuilder builder = new CollapseBuilder(randomAsciiOfLength(10));
builder.setMaxConcurrentGroupRequests(randomIntBetween(1, 48));
if (randomBoolean()) {
InnerHitBuilder innerHit = InnerHitBuilderTests.randomInnerHits(false, false);
builder.setInnerHits(innerHit);

View File

@ -54,7 +54,8 @@ GET /twitter/tweet/_search
"name": "last_tweets", <2>
"size": 5, <3>
"sort": [{ "date": "asc" }] <4>
}
},
"max_concurrent_group_searches": 4 <5>
},
"sort": ["likes"]
}
@ -65,8 +66,15 @@ GET /twitter/tweet/_search
<2> the name used for the inner hit section in the response
<3> the number of inner_hits to retrieve per collapse key
<4> how to sort the document inside each group
<5> the number of concurrent requests allowed to retrieve the inner_hits` per group
See <<search-request-inner-hits, inner hits>> for the complete list of supported options and the format of the response.
The expansion of the group is done by sending an additional query for each
collapsed hit returned in the response.
The `max_concurrent_group_searches` request parameter can be used to control
the maximum number of concurrent searches allowed in this phase.
The default is based on the number of data nodes and the default search thread pool size.
WARNING: `collapse` cannot be used in conjunction with <<search-request-scroll, scroll>>,
<<search-request-rescore, rescore>> or <<search-request-search-after, search after>>.

View File

@ -47,7 +47,7 @@ setup:
- skip:
version: " - 5.2.99"
reason: this uses a new API that has been added in 6.0
reason: this uses a new API that has been added in 5.3
- do:
search:
@ -83,7 +83,7 @@ setup:
- skip:
version: " - 5.2.99"
reason: this uses a new API that has been added in 6.0
reason: this uses a new API that has been added in 5.3
- do:
search:
@ -108,7 +108,7 @@ setup:
- skip:
version: " - 5.2.99"
reason: this uses a new API that has been added in 6.0
reason: this uses a new API that has been added in 5.3
- do:
search:
@ -147,12 +147,58 @@ setup:
- match: { hits.hits.2.inner_hits.sub_hits.hits.hits.0._id: "5" }
- match: { hits.hits.2.inner_hits.sub_hits.hits.hits.1._id: "4" }
---
"field collapsing, inner_hits and maxConcurrentGroupRequests":
- skip:
version: " - 5.2.99"
reason: this uses a new API that has been added in 5.3
- do:
search:
index: test
type: test
body:
collapse: { field: numeric_group, max_concurrent_group_searches: 10, inner_hits: { name: sub_hits, size: 2, sort: [{ sort: asc }] } }
sort: [{ sort: desc }]
- match: { hits.total: 6 }
- length: { hits.hits: 3 }
- match: { hits.hits.0._index: test }
- match: { hits.hits.0._type: test }
- match: { hits.hits.0.fields.numeric_group: [3] }
- match: { hits.hits.0.sort: [36] }
- match: { hits.hits.0._id: "6" }
- match: { hits.hits.0.inner_hits.sub_hits.hits.total: 1 }
- length: { hits.hits.0.inner_hits.sub_hits.hits.hits: 1 }
- match: { hits.hits.0.inner_hits.sub_hits.hits.hits.0._id: "6" }
- match: { hits.hits.1._index: test }
- match: { hits.hits.1._type: test }
- match: { hits.hits.1.fields.numeric_group: [1] }
- match: { hits.hits.1.sort: [24] }
- match: { hits.hits.1._id: "3" }
- match: { hits.hits.1.inner_hits.sub_hits.hits.total: 3 }
- length: { hits.hits.1.inner_hits.sub_hits.hits.hits: 2 }
- match: { hits.hits.1.inner_hits.sub_hits.hits.hits.0._id: "2" }
- match: { hits.hits.1.inner_hits.sub_hits.hits.hits.1._id: "1" }
- match: { hits.hits.2._index: test }
- match: { hits.hits.2._type: test }
- match: { hits.hits.2.fields.numeric_group: [25] }
- match: { hits.hits.2.sort: [10] }
- match: { hits.hits.2._id: "4" }
- match: { hits.hits.2.inner_hits.sub_hits.hits.total: 2 }
- length: { hits.hits.2.inner_hits.sub_hits.hits.hits: 2 }
- match: { hits.hits.2.inner_hits.sub_hits.hits.hits.0._id: "5" }
- match: { hits.hits.2.inner_hits.sub_hits.hits.hits.1._id: "4" }
---
"field collapsing and scroll":
- skip:
version: " - 5.2.99"
reason: this uses a new API that has been added in 6.0
reason: this uses a new API that has been added in 5.3
- do:
catch: /cannot use \`collapse\` in a scroll context/
@ -168,7 +214,7 @@ setup:
- skip:
version: " - 5.2.99"
reason: this uses a new API that has been added in 6.0
reason: this uses a new API that has been added in 5.3
- do:
catch: /cannot use \`collapse\` in conjunction with \`search_after\`/
@ -185,7 +231,7 @@ setup:
- skip:
version: " - 5.2.99"
reason: this uses a new API that has been added in 6.0
reason: this uses a new API that has been added in 5.3
- do:
catch: /cannot use \`collapse\` in conjunction with \`rescore\`/