Add ExpandSearchPhase as a successor for the FetchSearchPhase (#23165)
Now that we have more flexible search phases we should move the rather hacky integration of the collapse feature as a real search phase that can be tested and used by itself. This commit adds a new ExpandSearchPhase including a unittest for the phase. It's integrated into the fetch phase as an optional successor.
This commit is contained in:
parent
1556e81e9a
commit
a7a3729596
|
@ -0,0 +1,149 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.search;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||
import org.elasticsearch.index.query.InnerHitBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.SearchHits;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.search.collapse.CollapseBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* This search phase is an optional phase that will be executed once all hits are fetched from the shards that executes
|
||||
* field-collapsing on the inner hits. This phase only executes if field collapsing is requested in the search request and otherwise
|
||||
* forwards to the next phase immediately.
|
||||
*/
|
||||
final class ExpandSearchPhase extends SearchPhase {
|
||||
private final SearchPhaseContext context;
|
||||
private final SearchResponse searchResponse;
|
||||
private final Function<SearchResponse, SearchPhase> nextPhaseFactory;
|
||||
|
||||
ExpandSearchPhase(SearchPhaseContext context, SearchResponse searchResponse,
|
||||
Function<SearchResponse, SearchPhase> nextPhaseFactory) {
|
||||
super("expand");
|
||||
this.context = context;
|
||||
this.searchResponse = searchResponse;
|
||||
this.nextPhaseFactory = nextPhaseFactory;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns <code>true</code> iff the search request has inner hits and needs field collapsing
|
||||
*/
|
||||
private boolean isCollapseRequest() {
|
||||
final SearchRequest searchRequest = context.getRequest();
|
||||
return searchRequest.source() != null &&
|
||||
searchRequest.source().collapse() != null &&
|
||||
searchRequest.source().collapse().getInnerHit() != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() throws IOException {
|
||||
if (isCollapseRequest()) {
|
||||
SearchRequest searchRequest = context.getRequest();
|
||||
CollapseBuilder collapseBuilder = searchRequest.source().collapse();
|
||||
MultiSearchRequest multiRequest = new MultiSearchRequest();
|
||||
if (collapseBuilder.getMaxConcurrentGroupRequests() > 0) {
|
||||
multiRequest.maxConcurrentSearchRequests(collapseBuilder.getMaxConcurrentGroupRequests());
|
||||
}
|
||||
for (SearchHit hit : searchResponse.getHits()) {
|
||||
BoolQueryBuilder groupQuery = new BoolQueryBuilder();
|
||||
Object collapseValue = hit.field(collapseBuilder.getField()).getValue();
|
||||
if (collapseValue != null) {
|
||||
groupQuery.filter(QueryBuilders.matchQuery(collapseBuilder.getField(), collapseValue));
|
||||
} else {
|
||||
groupQuery.mustNot(QueryBuilders.existsQuery(collapseBuilder.getField()));
|
||||
}
|
||||
QueryBuilder origQuery = searchRequest.source().query();
|
||||
if (origQuery != null) {
|
||||
groupQuery.must(origQuery);
|
||||
}
|
||||
SearchSourceBuilder sourceBuilder = buildExpandSearchSourceBuilder(collapseBuilder.getInnerHit())
|
||||
.query(groupQuery);
|
||||
SearchRequest groupRequest = new SearchRequest(searchRequest.indices())
|
||||
.types(searchRequest.types())
|
||||
.source(sourceBuilder);
|
||||
multiRequest.add(groupRequest);
|
||||
}
|
||||
context.getSearchTransport().sendExecuteMultiSearch(multiRequest, context.getTask(),
|
||||
ActionListener.wrap(response -> {
|
||||
Iterator<MultiSearchResponse.Item> it = response.iterator();
|
||||
for (SearchHit hit : searchResponse.getHits()) {
|
||||
MultiSearchResponse.Item item = it.next();
|
||||
if (item.isFailure()) {
|
||||
context.onPhaseFailure(this, "failed to expand hits", item.getFailure());
|
||||
return;
|
||||
}
|
||||
SearchHits innerHits = item.getResponse().getHits();
|
||||
if (hit.getInnerHits() == null) {
|
||||
hit.setInnerHits(new HashMap<>(1));
|
||||
}
|
||||
hit.getInnerHits().put(collapseBuilder.getInnerHit().getName(), innerHits);
|
||||
}
|
||||
context.executeNextPhase(this, nextPhaseFactory.apply(searchResponse));
|
||||
}, context::onFailure)
|
||||
);
|
||||
} else {
|
||||
context.executeNextPhase(this, nextPhaseFactory.apply(searchResponse));
|
||||
}
|
||||
}
|
||||
|
||||
private SearchSourceBuilder buildExpandSearchSourceBuilder(InnerHitBuilder options) {
|
||||
SearchSourceBuilder groupSource = new SearchSourceBuilder();
|
||||
groupSource.from(options.getFrom());
|
||||
groupSource.size(options.getSize());
|
||||
if (options.getSorts() != null) {
|
||||
options.getSorts().forEach(groupSource::sort);
|
||||
}
|
||||
if (options.getFetchSourceContext() != null) {
|
||||
if (options.getFetchSourceContext().includes() == null && options.getFetchSourceContext().excludes() == null) {
|
||||
groupSource.fetchSource(options.getFetchSourceContext().fetchSource());
|
||||
} else {
|
||||
groupSource.fetchSource(options.getFetchSourceContext().includes(),
|
||||
options.getFetchSourceContext().excludes());
|
||||
}
|
||||
}
|
||||
if (options.getDocValueFields() != null) {
|
||||
options.getDocValueFields().forEach(groupSource::docValueField);
|
||||
}
|
||||
if (options.getStoredFieldsContext() != null && options.getStoredFieldsContext().fieldNames() != null) {
|
||||
options.getStoredFieldsContext().fieldNames().forEach(groupSource::storedField);
|
||||
}
|
||||
if (options.getScriptFields() != null) {
|
||||
for (SearchSourceBuilder.ScriptField field : options.getScriptFields()) {
|
||||
groupSource.scriptField(field.fieldName(), field.script());
|
||||
}
|
||||
}
|
||||
if (options.getHighlightBuilder() != null) {
|
||||
groupSource.highlighter(options.getHighlightBuilder());
|
||||
}
|
||||
groupSource.explain(options.isExplain());
|
||||
groupSource.trackScores(options.isTrackScores());
|
||||
return groupSource;
|
||||
}
|
||||
}
|
|
@ -53,7 +53,9 @@ final class FetchSearchPhase extends SearchPhase {
|
|||
FetchSearchPhase(AtomicArray<QuerySearchResultProvider> queryResults,
|
||||
SearchPhaseController searchPhaseController,
|
||||
SearchPhaseContext context) {
|
||||
this(queryResults, searchPhaseController, context, (response) -> sendResponsePhase(response, context));
|
||||
this(queryResults, searchPhaseController, context,
|
||||
(response) -> new ExpandSearchPhase(context, response, // collapse only happens if the request has inner hits
|
||||
(finalResponse) -> sendResponsePhase(finalResponse, context)));
|
||||
}
|
||||
|
||||
FetchSearchPhase(AtomicArray<QuerySearchResultProvider> queryResults,
|
||||
|
|
|
@ -176,10 +176,10 @@ public class SearchTransportService extends AbstractLifecycleComponent {
|
|||
/**
|
||||
* Used by {@link TransportSearchAction} to send the expand queries (field collapsing).
|
||||
*/
|
||||
void sendExecuteMultiSearch(DiscoveryNode node, final MultiSearchRequest request, SearchTask task,
|
||||
void sendExecuteMultiSearch(final MultiSearchRequest request, SearchTask task,
|
||||
final ActionListener<MultiSearchResponse> listener) {
|
||||
transportService.sendChildRequest(transportService.getConnection(node), MultiSearchAction.NAME, request, task,
|
||||
new ActionListenerResponseHandler<>(listener, MultiSearchResponse::new));
|
||||
transportService.sendChildRequest(transportService.getConnection(transportService.getLocalNode()), MultiSearchAction.NAME, request,
|
||||
task, new ActionListenerResponseHandler<>(listener, MultiSearchResponse::new));
|
||||
}
|
||||
|
||||
public RemoteClusterService getRemoteClusterService() {
|
||||
|
|
|
@ -36,15 +36,8 @@ import org.elasticsearch.common.settings.Setting;
|
|||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||
import org.elasticsearch.index.query.InnerHitBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.SearchHits;
|
||||
import org.elasticsearch.search.SearchService;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.search.collapse.CollapseBuilder;
|
||||
import org.elasticsearch.search.internal.AliasFilter;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -54,7 +47,6 @@ import org.elasticsearch.transport.TransportService;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -219,24 +211,8 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
|||
return connection;
|
||||
};
|
||||
|
||||
// Only enrich the search response iff collapsing has been specified:
|
||||
final ActionListener<SearchResponse> wrapper;
|
||||
if (searchRequest.source() != null &&
|
||||
searchRequest.source().collapse() != null &&
|
||||
searchRequest.source().collapse().getInnerHit() != null) {
|
||||
|
||||
wrapper = ActionListener.wrap(searchResponse -> {
|
||||
if (searchResponse.getHits().getHits().length == 0) {
|
||||
listener.onResponse(searchResponse);
|
||||
} else {
|
||||
expandCollapsedHits(nodes.getLocalNode(), task, searchRequest, searchResponse, listener);
|
||||
}
|
||||
}, listener::onFailure);
|
||||
} else {
|
||||
wrapper = listener;
|
||||
}
|
||||
searchAsyncAction(task, searchRequest, shardIterators, startTimeInMillis, connectionLookup, clusterState.version(),
|
||||
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, wrapper).start();
|
||||
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener).start();
|
||||
}
|
||||
|
||||
private static GroupShardsIterator mergeShardsIterators(GroupShardsIterator localShardsIterator,
|
||||
|
@ -293,90 +269,4 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
|||
+ "] to a greater value if you really want to query that many shards at the same time.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Expands collapsed using the {@link CollapseBuilder#innerHit} options.
|
||||
*/
|
||||
void expandCollapsedHits(DiscoveryNode node,
|
||||
SearchTask parentTask,
|
||||
SearchRequest searchRequest,
|
||||
SearchResponse searchResponse,
|
||||
ActionListener<SearchResponse> finalListener) {
|
||||
CollapseBuilder collapseBuilder = searchRequest.source().collapse();
|
||||
MultiSearchRequest multiRequest = new MultiSearchRequest();
|
||||
if (collapseBuilder.getMaxConcurrentGroupRequests() > 0) {
|
||||
multiRequest.maxConcurrentSearchRequests(collapseBuilder.getMaxConcurrentGroupRequests());
|
||||
}
|
||||
for (SearchHit hit : searchResponse.getHits()) {
|
||||
BoolQueryBuilder groupQuery = new BoolQueryBuilder();
|
||||
Object collapseValue = hit.field(collapseBuilder.getField()).getValue();
|
||||
if (collapseValue != null) {
|
||||
groupQuery.filter(QueryBuilders.matchQuery(collapseBuilder.getField(), collapseValue));
|
||||
} else {
|
||||
groupQuery.mustNot(QueryBuilders.existsQuery(collapseBuilder.getField()));
|
||||
}
|
||||
QueryBuilder origQuery = searchRequest.source().query();
|
||||
if (origQuery != null) {
|
||||
groupQuery.must(origQuery);
|
||||
}
|
||||
SearchSourceBuilder sourceBuilder = buildExpandSearchSourceBuilder(collapseBuilder.getInnerHit())
|
||||
.query(groupQuery);
|
||||
SearchRequest groupRequest = new SearchRequest(searchRequest.indices())
|
||||
.types(searchRequest.types())
|
||||
.source(sourceBuilder);
|
||||
multiRequest.add(groupRequest);
|
||||
}
|
||||
searchTransportService.sendExecuteMultiSearch(node, multiRequest, parentTask,
|
||||
ActionListener.wrap(response -> {
|
||||
Iterator<MultiSearchResponse.Item> it = response.iterator();
|
||||
for (SearchHit hit : searchResponse.getHits()) {
|
||||
MultiSearchResponse.Item item = it.next();
|
||||
if (item.isFailure()) {
|
||||
finalListener.onFailure(item.getFailure());
|
||||
return;
|
||||
}
|
||||
SearchHits innerHits = item.getResponse().getHits();
|
||||
if (hit.getInnerHits() == null) {
|
||||
hit.setInnerHits(new HashMap<>(1));
|
||||
}
|
||||
hit.getInnerHits().put(collapseBuilder.getInnerHit().getName(), innerHits);
|
||||
}
|
||||
finalListener.onResponse(searchResponse);
|
||||
}, finalListener::onFailure)
|
||||
);
|
||||
}
|
||||
|
||||
private SearchSourceBuilder buildExpandSearchSourceBuilder(InnerHitBuilder options) {
|
||||
SearchSourceBuilder groupSource = new SearchSourceBuilder();
|
||||
groupSource.from(options.getFrom());
|
||||
groupSource.size(options.getSize());
|
||||
if (options.getSorts() != null) {
|
||||
options.getSorts().forEach(groupSource::sort);
|
||||
}
|
||||
if (options.getFetchSourceContext() != null) {
|
||||
if (options.getFetchSourceContext().includes() == null && options.getFetchSourceContext().excludes() == null) {
|
||||
groupSource.fetchSource(options.getFetchSourceContext().fetchSource());
|
||||
} else {
|
||||
groupSource.fetchSource(options.getFetchSourceContext().includes(),
|
||||
options.getFetchSourceContext().excludes());
|
||||
}
|
||||
}
|
||||
if (options.getDocValueFields() != null) {
|
||||
options.getDocValueFields().forEach(groupSource::docValueField);
|
||||
}
|
||||
if (options.getStoredFieldsContext() != null && options.getStoredFieldsContext().fieldNames() != null) {
|
||||
options.getStoredFieldsContext().fieldNames().forEach(groupSource::storedField);
|
||||
}
|
||||
if (options.getScriptFields() != null) {
|
||||
for (SearchSourceBuilder.ScriptField field : options.getScriptFields()) {
|
||||
groupSource.scriptField(field.fieldName(), field.script());
|
||||
}
|
||||
}
|
||||
if (options.getHighlightBuilder() != null) {
|
||||
groupSource.highlighter(options.getHighlightBuilder());
|
||||
}
|
||||
groupSource.explain(options.isExplain());
|
||||
groupSource.trackScores(options.isTrackScores());
|
||||
return groupSource;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,199 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.search;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.text.Text;
|
||||
import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||
import org.elasticsearch.index.query.InnerHitBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.SearchHitField;
|
||||
import org.elasticsearch.search.SearchHits;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.search.collapse.CollapseBuilder;
|
||||
import org.elasticsearch.search.internal.InternalSearchResponse;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.hamcrest.Matchers;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
public class ExpandSearchPhaseTests extends ESTestCase {
|
||||
|
||||
public void testCollapseSingleHit() throws IOException {
|
||||
final int iters = randomIntBetween(5, 10);
|
||||
for (int i = 0; i < iters; i++) {
|
||||
SearchHits collapsedHits = new SearchHits(new SearchHit[]{new SearchHit(2, "ID", new Text("type"),
|
||||
Collections.emptyMap()), new SearchHit(3, "ID", new Text("type"),
|
||||
Collections.emptyMap())}, 1, 1.0F);
|
||||
AtomicBoolean executedMultiSearch = new AtomicBoolean(false);
|
||||
QueryBuilder originalQuery = randomBoolean() ? null : QueryBuilders.termQuery("foo", "bar");
|
||||
|
||||
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1);
|
||||
String collapseValue = randomBoolean() ? null : "boom";
|
||||
mockSearchPhaseContext.getRequest().source(new SearchSourceBuilder()
|
||||
.collapse(new CollapseBuilder("someField").setInnerHits(new InnerHitBuilder().setName("foobarbaz"))));
|
||||
mockSearchPhaseContext.getRequest().source().query(originalQuery);
|
||||
mockSearchPhaseContext.searchTransport = new SearchTransportService(
|
||||
Settings.builder().put("search.remote.connect", false).build(), null, null) {
|
||||
|
||||
@Override
|
||||
void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener<MultiSearchResponse> listener) {
|
||||
assertTrue(executedMultiSearch.compareAndSet(false, true));
|
||||
assertEquals(1, request.requests().size());
|
||||
SearchRequest searchRequest = request.requests().get(0);
|
||||
assertTrue(searchRequest.source().query() instanceof BoolQueryBuilder);
|
||||
BoolQueryBuilder groupBuilder = (BoolQueryBuilder) searchRequest.source().query();
|
||||
if (collapseValue == null) {
|
||||
assertThat(groupBuilder.mustNot(), Matchers.contains(QueryBuilders.existsQuery("someField")));
|
||||
} else {
|
||||
assertThat(groupBuilder.filter(), Matchers.contains(QueryBuilders.matchQuery("someField", "boom")));
|
||||
}
|
||||
if (originalQuery != null) {
|
||||
assertThat(groupBuilder.must(), Matchers.contains(QueryBuilders.termQuery("foo", "bar")));
|
||||
}
|
||||
assertArrayEquals(mockSearchPhaseContext.getRequest().indices(), searchRequest.indices());
|
||||
assertArrayEquals(mockSearchPhaseContext.getRequest().types(), searchRequest.types());
|
||||
|
||||
|
||||
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(collapsedHits,
|
||||
null, null, null, false, null);
|
||||
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
|
||||
listener.onResponse(new MultiSearchResponse(new MultiSearchResponse.Item[]{
|
||||
new MultiSearchResponse.Item(response, null)
|
||||
}));
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
SearchHits hits = new SearchHits(new SearchHit[]{new SearchHit(1, "ID", new Text("type"),
|
||||
Collections.singletonMap("someField", new SearchHitField("someField", Collections.singletonList(collapseValue))))},
|
||||
1, 1.0F);
|
||||
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null);
|
||||
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
|
||||
AtomicReference<SearchResponse> reference = new AtomicReference<>();
|
||||
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, response, r ->
|
||||
new SearchPhase("test") {
|
||||
@Override
|
||||
public void run() throws IOException {
|
||||
reference.set(r);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
phase.run();
|
||||
mockSearchPhaseContext.assertNoFailure();
|
||||
assertNotNull(reference.get());
|
||||
SearchResponse theResponse = reference.get();
|
||||
assertSame(theResponse, response);
|
||||
assertEquals(1, theResponse.getHits().getHits()[0].getInnerHits().size());
|
||||
assertSame(theResponse.getHits().getHits()[0].getInnerHits().get("foobarbaz"), collapsedHits);
|
||||
assertTrue(executedMultiSearch.get());
|
||||
assertEquals(1, mockSearchPhaseContext.phasesExecuted.get());
|
||||
}
|
||||
}
|
||||
|
||||
public void testFailOneItemFailsEntirePhase() throws IOException {
|
||||
AtomicBoolean executedMultiSearch = new AtomicBoolean(false);
|
||||
|
||||
SearchHits collapsedHits = new SearchHits(new SearchHit[]{new SearchHit(2, "ID", new Text("type"),
|
||||
Collections.emptyMap()), new SearchHit(3, "ID", new Text("type"),
|
||||
Collections.emptyMap())}, 1, 1.0F);
|
||||
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1);
|
||||
String collapseValue = randomBoolean() ? null : "boom";
|
||||
mockSearchPhaseContext.getRequest().source(new SearchSourceBuilder()
|
||||
.collapse(new CollapseBuilder("someField").setInnerHits(new InnerHitBuilder().setName("foobarbaz"))));
|
||||
mockSearchPhaseContext.searchTransport = new SearchTransportService(
|
||||
Settings.builder().put("search.remote.connect", false).build(), null, null) {
|
||||
|
||||
@Override
|
||||
void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener<MultiSearchResponse> listener) {
|
||||
assertTrue(executedMultiSearch.compareAndSet(false, true));
|
||||
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(collapsedHits,
|
||||
null, null, null, false, null);
|
||||
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
|
||||
listener.onResponse(new MultiSearchResponse(new MultiSearchResponse.Item[]{
|
||||
new MultiSearchResponse.Item(null, new RuntimeException("boom")),
|
||||
new MultiSearchResponse.Item(response, null)
|
||||
}));
|
||||
}
|
||||
};
|
||||
|
||||
SearchHits hits = new SearchHits(new SearchHit[]{new SearchHit(1, "ID", new Text("type"),
|
||||
Collections.singletonMap("someField", new SearchHitField("someField", Collections.singletonList(collapseValue)))),
|
||||
new SearchHit(2, "ID2", new Text("type"),
|
||||
Collections.singletonMap("someField", new SearchHitField("someField", Collections.singletonList(collapseValue))))}, 1,
|
||||
1.0F);
|
||||
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null);
|
||||
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
|
||||
AtomicReference<SearchResponse> reference = new AtomicReference<>();
|
||||
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, response, r ->
|
||||
new SearchPhase("test") {
|
||||
@Override
|
||||
public void run() throws IOException {
|
||||
reference.set(r);
|
||||
}
|
||||
}
|
||||
);
|
||||
phase.run();
|
||||
assertThat(mockSearchPhaseContext.phaseFailure.get(), Matchers.instanceOf(RuntimeException.class));
|
||||
assertEquals("boom", mockSearchPhaseContext.phaseFailure.get().getMessage());
|
||||
assertNotNull(mockSearchPhaseContext.phaseFailure.get());
|
||||
assertNull(reference.get());
|
||||
assertEquals(0, mockSearchPhaseContext.phasesExecuted.get());
|
||||
}
|
||||
|
||||
public void testSkipPhase() throws IOException {
|
||||
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1);
|
||||
mockSearchPhaseContext.searchTransport = new SearchTransportService(
|
||||
Settings.builder().put("search.remote.connect", false).build(), null, null) {
|
||||
|
||||
@Override
|
||||
void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener<MultiSearchResponse> listener) {
|
||||
fail("no collapsing here");
|
||||
}
|
||||
};
|
||||
|
||||
SearchHits hits = new SearchHits(new SearchHit[]{new SearchHit(1, "ID", new Text("type"),
|
||||
Collections.singletonMap("someField", new SearchHitField("someField", Collections.singletonList(null)))),
|
||||
new SearchHit(2, "ID2", new Text("type"),
|
||||
Collections.singletonMap("someField", new SearchHitField("someField", Collections.singletonList(null))))}, 1, 1.0F);
|
||||
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null);
|
||||
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
|
||||
AtomicReference<SearchResponse> reference = new AtomicReference<>();
|
||||
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, response, r ->
|
||||
new SearchPhase("test") {
|
||||
@Override
|
||||
public void run() throws IOException {
|
||||
reference.set(r);
|
||||
}
|
||||
}
|
||||
);
|
||||
phase.run();
|
||||
mockSearchPhaseContext.assertNoFailure();
|
||||
assertNotNull(reference.get());
|
||||
assertEquals(1, mockSearchPhaseContext.phasesExecuted.get());
|
||||
}
|
||||
}
|
|
@ -36,6 +36,7 @@ import java.util.Collections;
|
|||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
|
@ -50,13 +51,14 @@ public final class MockSearchPhaseContext implements SearchPhaseContext {
|
|||
List<ShardSearchFailure> failures = Collections.synchronizedList(new ArrayList<>());
|
||||
SearchTransportService searchTransport;
|
||||
Set<Long> releasedSearchContexts = new HashSet<>();
|
||||
SearchRequest searchRequest = new SearchRequest();
|
||||
AtomicInteger phasesExecuted = new AtomicInteger();
|
||||
|
||||
public MockSearchPhaseContext(int numShards) {
|
||||
this.numShards = numShards;
|
||||
numSuccess = new AtomicInteger(numShards);
|
||||
}
|
||||
|
||||
|
||||
public void assertNoFailure() {
|
||||
if (phaseFailure.get() != null) {
|
||||
throw new AssertionError(phaseFailure.get());
|
||||
|
@ -80,7 +82,7 @@ public final class MockSearchPhaseContext implements SearchPhaseContext {
|
|||
|
||||
@Override
|
||||
public SearchRequest getRequest() {
|
||||
return new SearchRequest();
|
||||
return searchRequest;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -119,10 +121,11 @@ public final class MockSearchPhaseContext implements SearchPhaseContext {
|
|||
|
||||
@Override
|
||||
public void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPhase) {
|
||||
phasesExecuted.incrementAndGet();
|
||||
try {
|
||||
nextPhase.run();
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException(e);
|
||||
} catch (Exception e) {
|
||||
onPhaseFailure(nextPhase, "phase failed", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue