From 94087b3274c1f4c98c100886e4ad9aefd32c4582 Mon Sep 17 00:00:00 2001 From: Jim Ferenczi Date: Thu, 9 Feb 2017 18:06:10 +0100 Subject: [PATCH] Removes ExpandCollapseSearchResponseListener, search response listeners and blocking calls This changes removes the SearchResponseListener that was used by the ExpandCollapseSearchResponseListener to expand collapsed hits. The removal of SearchResponseListener is not a breaking change because it was never released. This change also replace the blocking call in ExpandCollapseSearchResponseListener by a single asynchronous multi search request. The parallelism of the expand request can be set via CollapseBuilder#max_concurrent_group_searches Closes #23048 --- .../search/AbstractSearchAsyncAction.java | 26 ++-- .../action/search/SearchPhaseController.java | 19 +-- .../action/search/SearchTransportService.java | 9 ++ .../action/search/TransportSearchAction.java | 113 +++++++++++++++-- .../java/org/elasticsearch/node/Node.java | 4 +- .../elasticsearch/search/SearchModule.java | 28 ----- .../search/collapse/CollapseBuilder.java | 33 ++++- .../ExpandCollapseSearchResponseListener.java | 117 ------------------ .../search/ResponseListenerPluginIT.java | 75 ----------- .../search/SearchModuleTests.java | 15 --- .../search/collapse/CollapseBuilderTests.java | 1 + .../search/request/collapse.asciidoc | 10 +- .../test/search/110_field_collapsing.yaml | 58 ++++++++- 13 files changed, 217 insertions(+), 291 deletions(-) delete mode 100644 core/src/main/java/org/elasticsearch/search/collapse/ExpandCollapseSearchResponseListener.java delete mode 100644 core/src/test/java/org/elasticsearch/search/ResponseListenerPluginIT.java diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 0b5af75674a..e40440a4911 100644 --- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -559,25 +559,13 @@ abstract class AbstractSearchAsyncAction private void sendResponse(SearchPhaseController searchPhaseController, ScoreDoc[] sortedDocs, String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase, AtomicArray fetchResultsArr) { - // this is only a temporary fix since field collapsing executes a blocking call on response - // which could be a network thread. we are fixing this but for now we just fork off again. - // this should be removed once https://github.com/elastic/elasticsearch/issues/23048 is fixed - getExecutor().execute(new ActionRunnable(listener) { - @Override - public void doRun() throws IOException { - final boolean isScrollRequest = request.scroll() != null; - final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedDocs, - reducedQueryPhase, fetchResultsArr); - listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), - buildTookInMillis(), buildShardFailures())); - } - - @Override - public void onFailure(Exception e) { - raisePhaseFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures())); - } - }); + final boolean isScrollRequest = request.scroll() != null; + final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedDocs, reducedQueryPhase, + fetchResultsArr); + listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), + buildTookInMillis(), buildShardFailures())); } - } + + } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 54451556032..5193fe72784 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -38,6 +38,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.script.ScriptService; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import org.elasticsearch.search.aggregations.InternalAggregations; @@ -45,8 +47,6 @@ import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; import org.elasticsearch.search.dfs.AggregatedDfs; import org.elasticsearch.search.dfs.DfsSearchResult; import org.elasticsearch.search.fetch.FetchSearchResult; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.profile.ProfileShardResult; import org.elasticsearch.search.profile.SearchProfileShardResults; @@ -65,7 +65,6 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.function.BiConsumer; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -83,25 +82,11 @@ public class SearchPhaseController extends AbstractComponent { private final BigArrays bigArrays; private final ScriptService scriptService; - private final List > searchResponseListener; public SearchPhaseController(Settings settings, BigArrays bigArrays, ScriptService scriptService) { - this(settings, bigArrays, scriptService, Collections.emptyList()); - } - - public SearchPhaseController(Settings settings, BigArrays bigArrays, ScriptService scriptService, - List > searchResponseListener) { super(settings); this.bigArrays = bigArrays; this.scriptService = scriptService; - this.searchResponseListener = searchResponseListener; - } - - /** - * Returns the search response listeners registry - */ - public List > getSearchResponseListener() { - return searchResponseListener; } public AggregatedDfs aggregateDfs(AtomicArray results) { diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 84ae9831c80..e5ff283b25b 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -173,6 +173,15 @@ public class SearchTransportService extends AbstractLifecycleComponent { new ActionListenerResponseHandler<>(listener, FetchSearchResult::new)); } + /** + * Used by {@link TransportSearchAction} to send the expand queries (field collapsing). + */ + void sendExecuteMultiSearch(DiscoveryNode node, final MultiSearchRequest request, SearchTask task, + final ActionListener listener) { + transportService.sendChildRequest(transportService.getConnection(node), MultiSearchAction.NAME, request, task, + new ActionListenerResponseHandler<>(listener, MultiSearchResponse::new)); + } + public RemoteClusterService getRemoteClusterService() { return remoteClusterService; } diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index a36e98dcb15..1db6ac6a1a9 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -36,8 +36,15 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.InnerHitBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.collapse.CollapseBuilder; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -47,11 +54,11 @@ import org.elasticsearch.transport.TransportService; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; -import java.util.function.BiConsumer; import java.util.function.Function; import static org.elasticsearch.action.search.SearchType.QUERY_THEN_FETCH; @@ -212,16 +219,18 @@ public class TransportSearchAction extends HandledTransportAction wrapper; - if (searchPhaseController.getSearchResponseListener().size() > 0) { - wrapper = ActionListener.wrap(searchResponse -> { - List> responseListeners = - searchPhaseController.getSearchResponseListener(); - for (BiConsumer respListener : responseListeners) { - respListener.accept(searchRequest, searchResponse); - } - listener.onResponse(searchResponse); + if (searchRequest.source() != null && + searchRequest.source().collapse() != null && + searchRequest.source().collapse().getInnerHit() != null) { + wrapper = ActionListener.wrap(searchResponse -> { + if (searchResponse.getHits().getHits().length == 0) { + listener.onResponse(searchResponse); + } else { + expandCollapsedHits(nodes.getLocalNode(), task, searchRequest, searchResponse, listener); + } }, listener::onFailure); } else { wrapper = listener; @@ -284,4 +293,90 @@ public class TransportSearchAction extends HandledTransportAction finalListener) { + CollapseBuilder collapseBuilder = searchRequest.source().collapse(); + MultiSearchRequest multiRequest = new MultiSearchRequest(); + if (collapseBuilder.getMaxConcurrentGroupRequests() > 0) { + multiRequest.maxConcurrentSearchRequests(collapseBuilder.getMaxConcurrentGroupRequests()); + } + for (SearchHit hit : searchResponse.getHits()) { + BoolQueryBuilder groupQuery = new BoolQueryBuilder(); + Object collapseValue = hit.field(collapseBuilder.getField()).getValue(); + if (collapseValue != null) { + groupQuery.filter(QueryBuilders.matchQuery(collapseBuilder.getField(), collapseValue)); + } else { + groupQuery.mustNot(QueryBuilders.existsQuery(collapseBuilder.getField())); + } + QueryBuilder origQuery = searchRequest.source().query(); + if (origQuery != null) { + groupQuery.must(origQuery); + } + SearchSourceBuilder sourceBuilder = buildExpandSearchSourceBuilder(collapseBuilder.getInnerHit()) + .query(groupQuery); + SearchRequest groupRequest = new SearchRequest(searchRequest.indices()) + .types(searchRequest.types()) + .source(sourceBuilder); + multiRequest.add(groupRequest); + } + searchTransportService.sendExecuteMultiSearch(node, multiRequest, parentTask, + ActionListener.wrap(response -> { + Iterator it = response.iterator(); + for (SearchHit hit : searchResponse.getHits()) { + MultiSearchResponse.Item item = it.next(); + if (item.isFailure()) { + finalListener.onFailure(item.getFailure()); + return; + } + SearchHits innerHits = item.getResponse().getHits(); + if (hit.getInnerHits() == null) { + hit.setInnerHits(new HashMap<>(1)); + } + hit.getInnerHits().put(collapseBuilder.getInnerHit().getName(), innerHits); + } + finalListener.onResponse(searchResponse); + }, finalListener::onFailure) + ); + } + + private SearchSourceBuilder buildExpandSearchSourceBuilder(InnerHitBuilder options) { + SearchSourceBuilder groupSource = new SearchSourceBuilder(); + groupSource.from(options.getFrom()); + groupSource.size(options.getSize()); + if (options.getSorts() != null) { + options.getSorts().forEach(groupSource::sort); + } + if (options.getFetchSourceContext() != null) { + if (options.getFetchSourceContext().includes() == null && options.getFetchSourceContext().excludes() == null) { + groupSource.fetchSource(options.getFetchSourceContext().fetchSource()); + } else { + groupSource.fetchSource(options.getFetchSourceContext().includes(), + options.getFetchSourceContext().excludes()); + } + } + if (options.getDocValueFields() != null) { + options.getDocValueFields().forEach(groupSource::docValueField); + } + if (options.getStoredFieldsContext() != null && options.getStoredFieldsContext().fieldNames() != null) { + options.getStoredFieldsContext().fieldNames().forEach(groupSource::storedField); + } + if (options.getScriptFields() != null) { + for (SearchSourceBuilder.ScriptField field : options.getScriptFields()) { + groupSource.scriptField(field.fieldName(), field.script()); + } + } + if (options.getHighlightBuilder() != null) { + groupSource.highlighter(options.getHighlightBuilder()); + } + groupSource.explain(options.isExplain()); + groupSource.trackScores(options.isTrackScores()); + return groupSource; + } } diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 9279451d470..382423d9a2e 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -350,7 +350,7 @@ public class Node implements Closeable { IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class)); modules.add(indicesModule); - SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class), client); + SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class)); CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(), settingsModule.getClusterSettings()); resourcesToClose.add(circuitBreakerService); @@ -451,7 +451,7 @@ public class Node implements Closeable { b.bind(SearchTransportService.class).toInstance(new SearchTransportService(settings, settingsModule.getClusterSettings(), transportService)); b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings, bigArrays, - scriptModule.getScriptService(), searchModule.getSearchResponseListeners())); + scriptModule.getScriptService())); b.bind(Transport.class).toInstance(transport); b.bind(TransportService.class).toInstance(transportService); b.bind(NetworkService.class).toInstance(networkService); diff --git a/core/src/main/java/org/elasticsearch/search/SearchModule.java b/core/src/main/java/org/elasticsearch/search/SearchModule.java index 01c4e3488e7..8707d851d3a 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/core/src/main/java/org/elasticsearch/search/SearchModule.java @@ -22,7 +22,6 @@ package org.elasticsearch.search; import org.apache.lucene.search.BooleanQuery; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.Client; import org.elasticsearch.common.NamedRegistry; import org.elasticsearch.common.geo.ShapesAvailability; import org.elasticsearch.common.geo.builders.ShapeBuilders; @@ -223,7 +222,6 @@ import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel; import org.elasticsearch.search.aggregations.pipeline.movavg.models.SimpleModel; import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregator; -import org.elasticsearch.search.collapse.ExpandCollapseSearchResponseListener; import org.elasticsearch.search.fetch.FetchPhase; import org.elasticsearch.search.fetch.FetchSubPhase; import org.elasticsearch.search.fetch.subphase.DocValueFieldsFetchSubPhase; @@ -280,17 +278,12 @@ public class SearchModule { "moving_avg_model"); private final List fetchSubPhases = new ArrayList<>(); - private final List > searchResponseListeners = new ArrayList<> (); private final Settings settings; private final List namedWriteables = new ArrayList<>(); private final List namedXContents = new ArrayList<>(); public SearchModule(Settings settings, boolean transportClient, List plugins) { - this(settings, transportClient, plugins, null); - } - - public SearchModule(Settings settings, boolean transportClient, List plugins, Client client) { this.settings = settings; this.transportClient = transportClient; registerSuggesters(plugins); @@ -306,9 +299,6 @@ public class SearchModule { registerPipelineAggregations(plugins); registerFetchSubPhases(plugins); registerSearchExts(plugins); - if (false == transportClient) { - registerSearchResponseListeners(client, plugins); - } registerShapes(); } @@ -341,13 +331,6 @@ public class SearchModule { return movingAverageModelParserRegistry; } - /** - * Returns the search response listeners registry - */ - public List > getSearchResponseListeners() { - return searchResponseListeners; - } - private void registerAggregations(List plugins) { registerAggregation(new AggregationSpec(AvgAggregationBuilder.NAME, AvgAggregationBuilder::new, AvgAggregationBuilder::parse) .addResultReader(InternalAvg::new)); @@ -699,13 +682,6 @@ public class SearchModule { registerFromPlugin(plugins, p -> p.getFetchSubPhases(context), this::registerFetchSubPhase); } - private void registerSearchResponseListeners(Client client, List plugins) { - if (client != null) { - registerSearchResponseListener(new ExpandCollapseSearchResponseListener(client)); - } - registerFromPlugin(plugins, p -> p.getSearchResponseListeners(), this::registerSearchResponseListener); - } - private void registerSearchExts(List plugins) { registerFromPlugin(plugins, SearchPlugin::getSearchExts, this::registerSearchExt); } @@ -791,10 +767,6 @@ public class SearchModule { (p, c) -> spec.getParser().fromXContent((QueryParseContext) c))); } - private void registerSearchResponseListener(BiConsumer listener) { - searchResponseListeners.add(requireNonNull(listener, "SearchResponseListener must not be null")); - } - public FetchPhase getFetchPhase() { return new FetchPhase(fetchSubPhases); } diff --git a/core/src/main/java/org/elasticsearch/search/collapse/CollapseBuilder.java b/core/src/main/java/org/elasticsearch/search/collapse/CollapseBuilder.java index 75a7d6dadf3..542ae2c3ab9 100644 --- a/core/src/main/java/org/elasticsearch/search/collapse/CollapseBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/collapse/CollapseBuilder.java @@ -45,17 +45,20 @@ import java.util.Objects; public class CollapseBuilder extends ToXContentToBytes implements Writeable { public static final ParseField FIELD_FIELD = new ParseField("field"); public static final ParseField INNER_HITS_FIELD = new ParseField("inner_hits"); + public static final ParseField MAX_CONCURRENT_GROUP_REQUESTS_FIELD = new ParseField("max_concurrent_group_searches"); private static final ObjectParser PARSER = new ObjectParser<>("collapse", CollapseBuilder::new); static { PARSER.declareString(CollapseBuilder::setField, FIELD_FIELD); + PARSER.declareInt(CollapseBuilder::setMaxConcurrentGroupRequests, MAX_CONCURRENT_GROUP_REQUESTS_FIELD); PARSER.declareObject(CollapseBuilder::setInnerHits, (p, c) -> InnerHitBuilder.fromXContent(c), INNER_HITS_FIELD); } private String field; private InnerHitBuilder innerHit; + private int maxConcurrentGroupRequests = 0; private CollapseBuilder() {} @@ -70,12 +73,14 @@ public class CollapseBuilder extends ToXContentToBytes implements Writeable { public CollapseBuilder(StreamInput in) throws IOException { this.field = in.readString(); + this.maxConcurrentGroupRequests = in.readVInt(); this.innerHit = in.readOptionalWriteable(InnerHitBuilder::new); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(field); + out.writeVInt(maxConcurrentGroupRequests); out.writeOptionalWriteable(innerHit); } @@ -84,6 +89,7 @@ public class CollapseBuilder extends ToXContentToBytes implements Writeable { return builder; } + // for object parser only private CollapseBuilder setField(String field) { if (Strings.isEmpty(field)) { throw new IllegalArgumentException("field name is null or empty"); @@ -97,6 +103,14 @@ public class CollapseBuilder extends ToXContentToBytes implements Writeable { return this; } + public CollapseBuilder setMaxConcurrentGroupRequests(int num) { + if (num < 1) { + throw new IllegalArgumentException("maxConcurrentGroupRequests` must be positive"); + } + this.maxConcurrentGroupRequests = num; + return this; + } + /** * The name of the field to collapse against */ @@ -111,6 +125,13 @@ public class CollapseBuilder extends ToXContentToBytes implements Writeable { return this.innerHit; } + /** + * Returns the amount of group requests that are allowed to be ran concurrently in the inner_hits phase. + */ + public int getMaxConcurrentGroupRequests() { + return maxConcurrentGroupRequests; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { builder.startObject(); @@ -121,6 +142,9 @@ public class CollapseBuilder extends ToXContentToBytes implements Writeable { private void innerToXContent(XContentBuilder builder) throws IOException { builder.field(FIELD_FIELD.getPreferredName(), field); + if (maxConcurrentGroupRequests > 0) { + builder.field(MAX_CONCURRENT_GROUP_REQUESTS_FIELD.getPreferredName(), maxConcurrentGroupRequests); + } if (innerHit != null) { builder.field(INNER_HITS_FIELD.getPreferredName(), innerHit); } @@ -133,13 +157,18 @@ public class CollapseBuilder extends ToXContentToBytes implements Writeable { CollapseBuilder that = (CollapseBuilder) o; - if (field != null ? !field.equals(that.field) : that.field != null) return false; + if (maxConcurrentGroupRequests != that.maxConcurrentGroupRequests) return false; + if (!field.equals(that.field)) return false; return innerHit != null ? innerHit.equals(that.innerHit) : that.innerHit == null; + } @Override public int hashCode() { - return Objects.hash(this.field, this.innerHit); + int result = field.hashCode(); + result = 31 * result + (innerHit != null ? innerHit.hashCode() : 0); + result = 31 * result + maxConcurrentGroupRequests; + return result; } public CollapseContext build(SearchContext context) { diff --git a/core/src/main/java/org/elasticsearch/search/collapse/ExpandCollapseSearchResponseListener.java b/core/src/main/java/org/elasticsearch/search/collapse/ExpandCollapseSearchResponseListener.java deleted file mode 100644 index b9caa5216cb..00000000000 --- a/core/src/main/java/org/elasticsearch/search/collapse/ExpandCollapseSearchResponseListener.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.search.collapse; - -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.index.query.BoolQueryBuilder; -import org.elasticsearch.index.query.InnerHitBuilder; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHits; -import org.elasticsearch.search.builder.SearchSourceBuilder; - -import java.util.HashMap; -import java.util.Objects; -import java.util.function.BiConsumer; - -/** - * A search response listener that intercepts the search response and expands collapsed hits - * using the {@link CollapseBuilder#innerHit} options. - */ -public class ExpandCollapseSearchResponseListener implements BiConsumer { - private final Client client; - - public ExpandCollapseSearchResponseListener(Client client) { - this.client = Objects.requireNonNull(client); - } - - @Override - public void accept(SearchRequest searchRequest, SearchResponse searchResponse) { - if (searchRequest.source() == null) { - return ; - } - CollapseBuilder collapseBuilder = searchRequest.source().collapse(); - if (collapseBuilder == null || collapseBuilder.getInnerHit() == null) { - return ; - } - for (SearchHit hit : searchResponse.getHits()) { - BoolQueryBuilder groupQuery = new BoolQueryBuilder(); - Object collapseValue = hit.field(collapseBuilder.getField()).getValue(); - if (collapseValue != null) { - groupQuery.filter(QueryBuilders.matchQuery(collapseBuilder.getField(), collapseValue)); - } else { - groupQuery.mustNot(QueryBuilders.existsQuery(collapseBuilder.getField())); - } - QueryBuilder origQuery = searchRequest.source().query(); - if (origQuery != null) { - groupQuery.must(origQuery); - } - SearchSourceBuilder sourceBuilder = createGroupSearchBuilder(collapseBuilder.getInnerHit()) - .query(groupQuery); - SearchRequest groupRequest = new SearchRequest(searchRequest.indices()) - .types(searchRequest.types()) - .source(sourceBuilder); - SearchResponse groupResponse = client.search(groupRequest).actionGet(); - SearchHits innerHits = groupResponse.getHits(); - if (hit.getInnerHits() == null) { - hit.setInnerHits(new HashMap<>(1)); - } - hit.getInnerHits().put(collapseBuilder.getInnerHit().getName(), innerHits); - } - } - - private SearchSourceBuilder createGroupSearchBuilder(InnerHitBuilder options) { - SearchSourceBuilder groupSource = new SearchSourceBuilder(); - groupSource.from(options.getFrom()); - groupSource.size(options.getSize()); - if (options.getSorts() != null) { - options.getSorts().forEach(groupSource::sort); - } - if (options.getFetchSourceContext() != null) { - if (options.getFetchSourceContext().includes() == null && options.getFetchSourceContext().excludes() == null) { - groupSource.fetchSource(options.getFetchSourceContext().fetchSource()); - } else { - groupSource.fetchSource(options.getFetchSourceContext().includes(), - options.getFetchSourceContext().excludes()); - } - } - if (options.getDocValueFields() != null) { - options.getDocValueFields().forEach(groupSource::docValueField); - } - if (options.getStoredFieldsContext() != null && options.getStoredFieldsContext().fieldNames() != null) { - options.getStoredFieldsContext().fieldNames().forEach(groupSource::storedField); - } - if (options.getScriptFields() != null) { - for (SearchSourceBuilder.ScriptField field : options.getScriptFields()) { - groupSource.scriptField(field.fieldName(), field.script()); - } - } - if (options.getHighlightBuilder() != null) { - groupSource.highlighter(options.getHighlightBuilder()); - } - groupSource.explain(options.isExplain()); - groupSource.trackScores(options.isTrackScores()); - return groupSource; - } - -} diff --git a/core/src/test/java/org/elasticsearch/search/ResponseListenerPluginIT.java b/core/src/test/java/org/elasticsearch/search/ResponseListenerPluginIT.java deleted file mode 100644 index 0fc36a42b18..00000000000 --- a/core/src/test/java/org/elasticsearch/search/ResponseListenerPluginIT.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.search; - -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.plugins.SearchPlugin; -import org.elasticsearch.test.ESIntegTestCase; -import org.junit.Before; - -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BiConsumer; - -import static java.util.Collections.singletonList; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; -import static org.hamcrest.Matchers.greaterThan; - -public class ResponseListenerPluginIT extends ESIntegTestCase { - - static AtomicBoolean hookWasFired = new AtomicBoolean(false); - - public static class FooResponseListenerPlugin extends Plugin implements SearchPlugin { - @Override - public List> getSearchResponseListeners() { - return singletonList((searchRequest, response) -> { - assertThat(response.getTookInMillis(), greaterThan(0L)); - boolean alreadyFired = hookWasFired.getAndSet(true); - assertFalse(alreadyFired); - }); - } - } - - @Override - protected Collection> nodePlugins() { - return Collections.singletonList(FooResponseListenerPlugin.class); - } - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - createIndex("test"); - ensureGreen(); - client().prepareIndex("index", "type", "1").setSource("field", "value").get(); - refresh(); - } - - public void testSearchResponseHook() { - assertHitCount(client().prepareSearch("index").setQuery(QueryBuilders.matchAllQuery()).get(), 1L); - boolean alreadyFired = hookWasFired.get(); - assertTrue(alreadyFired); - } -} diff --git a/core/src/test/java/org/elasticsearch/search/SearchModuleTests.java b/core/src/test/java/org/elasticsearch/search/SearchModuleTests.java index 0eeab9897fa..8514096b837 100644 --- a/core/src/test/java/org/elasticsearch/search/SearchModuleTests.java +++ b/core/src/test/java/org/elasticsearch/search/SearchModuleTests.java @@ -18,8 +18,6 @@ */ package org.elasticsearch.search; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.inject.ModuleTestCase; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -71,7 +69,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.BiConsumer; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; @@ -259,18 +256,6 @@ public class SearchModuleTests extends ModuleTestCase { hasSize(1)); } - public void testRegisterSearchResponseListener() { - BiConsumer listener = (s, r) -> {}; - SearchModule module = new SearchModule(Settings.EMPTY, false, singletonList(new SearchPlugin() { - public List> getSearchResponseListeners() { - return singletonList(listener); - } - })); - List> listeners = module.getSearchResponseListeners(); - assertEquals(listeners.size(), 1); - assertEquals(listeners.get(0), listener); - } - private static final String[] NON_DEPRECATED_QUERIES = new String[] { "bool", "boosting", diff --git a/core/src/test/java/org/elasticsearch/search/collapse/CollapseBuilderTests.java b/core/src/test/java/org/elasticsearch/search/collapse/CollapseBuilderTests.java index 955b85deb82..0fe957e8796 100644 --- a/core/src/test/java/org/elasticsearch/search/collapse/CollapseBuilderTests.java +++ b/core/src/test/java/org/elasticsearch/search/collapse/CollapseBuilderTests.java @@ -68,6 +68,7 @@ public class CollapseBuilderTests extends AbstractWireSerializingTestCase { public static CollapseBuilder randomCollapseBuilder() { CollapseBuilder builder = new CollapseBuilder(randomAsciiOfLength(10)); + builder.setMaxConcurrentGroupRequests(randomIntBetween(1, 48)); if (randomBoolean()) { InnerHitBuilder innerHit = InnerHitBuilderTests.randomInnerHits(false, false); builder.setInnerHits(innerHit); diff --git a/docs/reference/search/request/collapse.asciidoc b/docs/reference/search/request/collapse.asciidoc index 7845e20e152..d91799946cf 100644 --- a/docs/reference/search/request/collapse.asciidoc +++ b/docs/reference/search/request/collapse.asciidoc @@ -54,7 +54,8 @@ GET /twitter/tweet/_search "name": "last_tweets", <2> "size": 5, <3> "sort": [{ "date": "asc" }] <4> - } + }, + "max_concurrent_group_searches": 4 <5> }, "sort": ["likes"] } @@ -65,8 +66,15 @@ GET /twitter/tweet/_search <2> the name used for the inner hit section in the response <3> the number of inner_hits to retrieve per collapse key <4> how to sort the document inside each group +<5> the number of concurrent requests allowed to retrieve the inner_hits` per group See <> for the complete list of supported options and the format of the response. +The expansion of the group is done by sending an additional query for each +collapsed hit returned in the response. +The `max_concurrent_group_searches` request parameter can be used to control +the maximum number of concurrent searches allowed in this phase. +The default is based on the number of data nodes and the default search thread pool size. + WARNING: `collapse` cannot be used in conjunction with <>, <> or <>. diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search/110_field_collapsing.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/search/110_field_collapsing.yaml index a015f8649cf..dd17399d31a 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search/110_field_collapsing.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search/110_field_collapsing.yaml @@ -47,7 +47,7 @@ setup: - skip: version: " - 5.2.99" - reason: this uses a new API that has been added in 6.0 + reason: this uses a new API that has been added in 5.3 - do: search: @@ -83,7 +83,7 @@ setup: - skip: version: " - 5.2.99" - reason: this uses a new API that has been added in 6.0 + reason: this uses a new API that has been added in 5.3 - do: search: @@ -108,7 +108,7 @@ setup: - skip: version: " - 5.2.99" - reason: this uses a new API that has been added in 6.0 + reason: this uses a new API that has been added in 5.3 - do: search: @@ -147,12 +147,58 @@ setup: - match: { hits.hits.2.inner_hits.sub_hits.hits.hits.0._id: "5" } - match: { hits.hits.2.inner_hits.sub_hits.hits.hits.1._id: "4" } + +--- +"field collapsing, inner_hits and maxConcurrentGroupRequests": + + - skip: + version: " - 5.2.99" + reason: this uses a new API that has been added in 5.3 + + - do: + search: + index: test + type: test + body: + collapse: { field: numeric_group, max_concurrent_group_searches: 10, inner_hits: { name: sub_hits, size: 2, sort: [{ sort: asc }] } } + sort: [{ sort: desc }] + + - match: { hits.total: 6 } + - length: { hits.hits: 3 } + - match: { hits.hits.0._index: test } + - match: { hits.hits.0._type: test } + - match: { hits.hits.0.fields.numeric_group: [3] } + - match: { hits.hits.0.sort: [36] } + - match: { hits.hits.0._id: "6" } + - match: { hits.hits.0.inner_hits.sub_hits.hits.total: 1 } + - length: { hits.hits.0.inner_hits.sub_hits.hits.hits: 1 } + - match: { hits.hits.0.inner_hits.sub_hits.hits.hits.0._id: "6" } + - match: { hits.hits.1._index: test } + - match: { hits.hits.1._type: test } + - match: { hits.hits.1.fields.numeric_group: [1] } + - match: { hits.hits.1.sort: [24] } + - match: { hits.hits.1._id: "3" } + - match: { hits.hits.1.inner_hits.sub_hits.hits.total: 3 } + - length: { hits.hits.1.inner_hits.sub_hits.hits.hits: 2 } + - match: { hits.hits.1.inner_hits.sub_hits.hits.hits.0._id: "2" } + - match: { hits.hits.1.inner_hits.sub_hits.hits.hits.1._id: "1" } + - match: { hits.hits.2._index: test } + - match: { hits.hits.2._type: test } + - match: { hits.hits.2.fields.numeric_group: [25] } + - match: { hits.hits.2.sort: [10] } + - match: { hits.hits.2._id: "4" } + - match: { hits.hits.2.inner_hits.sub_hits.hits.total: 2 } + - length: { hits.hits.2.inner_hits.sub_hits.hits.hits: 2 } + - match: { hits.hits.2.inner_hits.sub_hits.hits.hits.0._id: "5" } + - match: { hits.hits.2.inner_hits.sub_hits.hits.hits.1._id: "4" } + + --- "field collapsing and scroll": - skip: version: " - 5.2.99" - reason: this uses a new API that has been added in 6.0 + reason: this uses a new API that has been added in 5.3 - do: catch: /cannot use \`collapse\` in a scroll context/ @@ -168,7 +214,7 @@ setup: - skip: version: " - 5.2.99" - reason: this uses a new API that has been added in 6.0 + reason: this uses a new API that has been added in 5.3 - do: catch: /cannot use \`collapse\` in conjunction with \`search_after\`/ @@ -185,7 +231,7 @@ setup: - skip: version: " - 5.2.99" - reason: this uses a new API that has been added in 6.0 + reason: this uses a new API that has been added in 5.3 - do: catch: /cannot use \`collapse\` in conjunction with \`rescore\`/