mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
Compute the took time of the query after the expand phase (#24902)
The took time computed for search requests does not take in account the expand search phase. This change delays the computation to after the expand phase finishes. Relates #24900
This commit is contained in:
parent
a77b38cdd1
commit
ec64c2c05f
@ -28,6 +28,7 @@ import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.SearchHits;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.search.collapse.CollapseBuilder;
|
||||
import org.elasticsearch.search.internal.InternalSearchResponse;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
@ -42,11 +43,11 @@ import java.util.function.Function;
|
||||
*/
|
||||
final class ExpandSearchPhase extends SearchPhase {
|
||||
private final SearchPhaseContext context;
|
||||
private final SearchResponse searchResponse;
|
||||
private final Function<SearchResponse, SearchPhase> nextPhaseFactory;
|
||||
private final InternalSearchResponse searchResponse;
|
||||
private final Function<InternalSearchResponse, SearchPhase> nextPhaseFactory;
|
||||
|
||||
ExpandSearchPhase(SearchPhaseContext context, SearchResponse searchResponse,
|
||||
Function<SearchResponse, SearchPhase> nextPhaseFactory) {
|
||||
ExpandSearchPhase(SearchPhaseContext context, InternalSearchResponse searchResponse,
|
||||
Function<InternalSearchResponse, SearchPhase> nextPhaseFactory) {
|
||||
super("expand");
|
||||
this.context = context;
|
||||
this.searchResponse = searchResponse;
|
||||
@ -65,7 +66,7 @@ final class ExpandSearchPhase extends SearchPhase {
|
||||
|
||||
@Override
|
||||
public void run() throws IOException {
|
||||
if (isCollapseRequest() && searchResponse.getHits().getHits().length > 0) {
|
||||
if (isCollapseRequest() && searchResponse.hits().getHits().length > 0) {
|
||||
SearchRequest searchRequest = context.getRequest();
|
||||
CollapseBuilder collapseBuilder = searchRequest.source().collapse();
|
||||
final List<InnerHitBuilder> innerHitBuilders = collapseBuilder.getInnerHits();
|
||||
@ -73,7 +74,7 @@ final class ExpandSearchPhase extends SearchPhase {
|
||||
if (collapseBuilder.getMaxConcurrentGroupRequests() > 0) {
|
||||
multiRequest.maxConcurrentSearchRequests(collapseBuilder.getMaxConcurrentGroupRequests());
|
||||
}
|
||||
for (SearchHit hit : searchResponse.getHits()) {
|
||||
for (SearchHit hit : searchResponse.hits().getHits()) {
|
||||
BoolQueryBuilder groupQuery = new BoolQueryBuilder();
|
||||
Object collapseValue = hit.field(collapseBuilder.getField()).getValue();
|
||||
if (collapseValue != null) {
|
||||
@ -97,7 +98,7 @@ final class ExpandSearchPhase extends SearchPhase {
|
||||
context.getSearchTransport().sendExecuteMultiSearch(multiRequest, context.getTask(),
|
||||
ActionListener.wrap(response -> {
|
||||
Iterator<MultiSearchResponse.Item> it = response.iterator();
|
||||
for (SearchHit hit : searchResponse.getHits()) {
|
||||
for (SearchHit hit : searchResponse.hits.getHits()) {
|
||||
for (InnerHitBuilder innerHitBuilder : innerHitBuilders) {
|
||||
MultiSearchResponse.Item item = it.next();
|
||||
if (item.isFailure()) {
|
||||
|
@ -36,7 +36,7 @@ import org.elasticsearch.transport.Transport;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
/**
|
||||
* This search phase merges the query results from the previous phase together and calculates the topN hits for this search.
|
||||
@ -46,7 +46,7 @@ final class FetchSearchPhase extends SearchPhase {
|
||||
private final AtomicArray<FetchSearchResult> fetchResults;
|
||||
private final SearchPhaseController searchPhaseController;
|
||||
private final AtomicArray<SearchPhaseResult> queryResults;
|
||||
private final Function<SearchResponse, SearchPhase> nextPhaseFactory;
|
||||
private final BiFunction<InternalSearchResponse, String, SearchPhase> nextPhaseFactory;
|
||||
private final SearchPhaseContext context;
|
||||
private final Logger logger;
|
||||
private final InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> resultConsumer;
|
||||
@ -55,13 +55,13 @@ final class FetchSearchPhase extends SearchPhase {
|
||||
SearchPhaseController searchPhaseController,
|
||||
SearchPhaseContext context) {
|
||||
this(resultConsumer, searchPhaseController, context,
|
||||
(response) -> new ExpandSearchPhase(context, response, // collapse only happens if the request has inner hits
|
||||
(finalResponse) -> sendResponsePhase(finalResponse, context)));
|
||||
(response, scrollId) -> new ExpandSearchPhase(context, response, // collapse only happens if the request has inner hits
|
||||
(finalResponse) -> sendResponsePhase(finalResponse, scrollId, context)));
|
||||
}
|
||||
|
||||
FetchSearchPhase(InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> resultConsumer,
|
||||
SearchPhaseController searchPhaseController,
|
||||
SearchPhaseContext context, Function<SearchResponse, SearchPhase> nextPhaseFactory) {
|
||||
SearchPhaseContext context, BiFunction<InternalSearchResponse, String, SearchPhase> nextPhaseFactory) {
|
||||
super("fetch");
|
||||
if (context.getNumShards() != resultConsumer.getNumShards()) {
|
||||
throw new IllegalStateException("number of shards must match the length of the query results but doesn't:"
|
||||
@ -205,14 +205,14 @@ final class FetchSearchPhase extends SearchPhase {
|
||||
AtomicArray<? extends SearchPhaseResult> fetchResultsArr) {
|
||||
final InternalSearchResponse internalResponse = searchPhaseController.merge(context.getRequest().scroll() != null,
|
||||
reducedQueryPhase, fetchResultsArr.asList(), fetchResultsArr::get);
|
||||
context.executeNextPhase(this, nextPhaseFactory.apply(context.buildSearchResponse(internalResponse, scrollId)));
|
||||
context.executeNextPhase(this, nextPhaseFactory.apply(internalResponse, scrollId));
|
||||
}
|
||||
|
||||
private static SearchPhase sendResponsePhase(SearchResponse response, SearchPhaseContext context) {
|
||||
private static SearchPhase sendResponsePhase(InternalSearchResponse response, String scrollId, SearchPhaseContext context) {
|
||||
return new SearchPhase("response") {
|
||||
@Override
|
||||
public void run() throws IOException {
|
||||
context.onResponse(response);
|
||||
context.onResponse(context.buildSearchResponse(response, scrollId));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -108,13 +108,12 @@ public class ExpandSearchPhaseTests extends ESTestCase {
|
||||
Collections.singletonMap("someField", new SearchHitField("someField", Collections.singletonList(collapseValue))))},
|
||||
1, 1.0F);
|
||||
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null, 1);
|
||||
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
|
||||
AtomicReference<SearchResponse> reference = new AtomicReference<>();
|
||||
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, response, r ->
|
||||
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, internalSearchResponse, (r) ->
|
||||
new SearchPhase("test") {
|
||||
@Override
|
||||
public void run() throws IOException {
|
||||
reference.set(r);
|
||||
reference.set(mockSearchPhaseContext.buildSearchResponse(r, null));
|
||||
}
|
||||
}
|
||||
);
|
||||
@ -123,7 +122,6 @@ public class ExpandSearchPhaseTests extends ESTestCase {
|
||||
mockSearchPhaseContext.assertNoFailure();
|
||||
assertNotNull(reference.get());
|
||||
SearchResponse theResponse = reference.get();
|
||||
assertSame(theResponse, response);
|
||||
assertEquals(numInnerHits, theResponse.getHits().getHits()[0].getInnerHits().size());
|
||||
|
||||
for (int innerHitNum = 0; innerHitNum < numInnerHits; innerHitNum++) {
|
||||
@ -167,13 +165,12 @@ public class ExpandSearchPhaseTests extends ESTestCase {
|
||||
Collections.singletonMap("someField", new SearchHitField("someField", Collections.singletonList(collapseValue))))}, 1,
|
||||
1.0F);
|
||||
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null, 1);
|
||||
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
|
||||
AtomicReference<SearchResponse> reference = new AtomicReference<>();
|
||||
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, response, r ->
|
||||
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, internalSearchResponse, r ->
|
||||
new SearchPhase("test") {
|
||||
@Override
|
||||
public void run() throws IOException {
|
||||
reference.set(r);
|
||||
reference.set(mockSearchPhaseContext.buildSearchResponse(r, null));
|
||||
}
|
||||
}
|
||||
);
|
||||
@ -201,13 +198,12 @@ public class ExpandSearchPhaseTests extends ESTestCase {
|
||||
new SearchHit(2, "ID2", new Text("type"),
|
||||
Collections.singletonMap("someField", new SearchHitField("someField", Collections.singletonList(null))))}, 1, 1.0F);
|
||||
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null, 1);
|
||||
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
|
||||
AtomicReference<SearchResponse> reference = new AtomicReference<>();
|
||||
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, response, r ->
|
||||
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, internalSearchResponse, r ->
|
||||
new SearchPhase("test") {
|
||||
@Override
|
||||
public void run() throws IOException {
|
||||
reference.set(r);
|
||||
reference.set(mockSearchPhaseContext.buildSearchResponse(r, null));
|
||||
}
|
||||
}
|
||||
);
|
||||
@ -232,13 +228,12 @@ public class ExpandSearchPhaseTests extends ESTestCase {
|
||||
|
||||
SearchHits hits = new SearchHits(new SearchHit[0], 1, 1.0f);
|
||||
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null, 1);
|
||||
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
|
||||
AtomicReference<SearchResponse> reference = new AtomicReference<>();
|
||||
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, response, r ->
|
||||
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, internalSearchResponse, r ->
|
||||
new SearchPhase("test") {
|
||||
@Override
|
||||
public void run() throws IOException {
|
||||
reference.set(r);
|
||||
reference.set(mockSearchPhaseContext.buildSearchResponse(r, null));
|
||||
}
|
||||
}
|
||||
);
|
||||
|
@ -32,6 +32,7 @@ import org.elasticsearch.search.SearchShardTarget;
|
||||
import org.elasticsearch.search.fetch.FetchSearchResult;
|
||||
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
|
||||
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
|
||||
import org.elasticsearch.search.internal.InternalSearchResponse;
|
||||
import org.elasticsearch.search.query.QuerySearchResult;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
@ -66,10 +67,10 @@ public class FetchSearchPhaseTests extends ESTestCase {
|
||||
}
|
||||
|
||||
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
|
||||
(searchResponse) -> new SearchPhase("test") {
|
||||
(searchResponse, scrollId) -> new SearchPhase("test") {
|
||||
@Override
|
||||
public void run() throws IOException {
|
||||
responseRef.set(searchResponse);
|
||||
responseRef.set(mockSearchPhaseContext.buildSearchResponse(searchResponse, null));
|
||||
}
|
||||
});
|
||||
assertEquals("fetch", phase.getName());
|
||||
@ -119,10 +120,10 @@ public class FetchSearchPhaseTests extends ESTestCase {
|
||||
};
|
||||
mockSearchPhaseContext.searchTransport = searchTransportService;
|
||||
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
|
||||
(searchResponse) -> new SearchPhase("test") {
|
||||
(searchResponse, scrollId) -> new SearchPhase("test") {
|
||||
@Override
|
||||
public void run() throws IOException {
|
||||
responseRef.set(searchResponse);
|
||||
responseRef.set(mockSearchPhaseContext.buildSearchResponse(searchResponse, null));
|
||||
}
|
||||
});
|
||||
assertEquals("fetch", phase.getName());
|
||||
@ -173,10 +174,10 @@ public class FetchSearchPhaseTests extends ESTestCase {
|
||||
};
|
||||
mockSearchPhaseContext.searchTransport = searchTransportService;
|
||||
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
|
||||
(searchResponse) -> new SearchPhase("test") {
|
||||
(searchResponse, scrollId) -> new SearchPhase("test") {
|
||||
@Override
|
||||
public void run() throws IOException {
|
||||
responseRef.set(searchResponse);
|
||||
responseRef.set(mockSearchPhaseContext.buildSearchResponse(searchResponse, null));
|
||||
}
|
||||
});
|
||||
assertEquals("fetch", phase.getName());
|
||||
@ -224,10 +225,10 @@ public class FetchSearchPhaseTests extends ESTestCase {
|
||||
mockSearchPhaseContext.searchTransport = searchTransportService;
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
|
||||
(searchResponse) -> new SearchPhase("test") {
|
||||
(searchResponse, scrollId) -> new SearchPhase("test") {
|
||||
@Override
|
||||
public void run() throws IOException {
|
||||
responseRef.set(searchResponse);
|
||||
responseRef.set(mockSearchPhaseContext.buildSearchResponse(searchResponse, null));
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
@ -290,10 +291,10 @@ public class FetchSearchPhaseTests extends ESTestCase {
|
||||
};
|
||||
mockSearchPhaseContext.searchTransport = searchTransportService;
|
||||
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
|
||||
(searchResponse) -> new SearchPhase("test") {
|
||||
(searchResponse, scrollId) -> new SearchPhase("test") {
|
||||
@Override
|
||||
public void run() throws IOException {
|
||||
responseRef.set(searchResponse);
|
||||
responseRef.set(mockSearchPhaseContext.buildSearchResponse(searchResponse, null));
|
||||
}
|
||||
});
|
||||
assertEquals("fetch", phase.getName());
|
||||
@ -339,10 +340,10 @@ public class FetchSearchPhaseTests extends ESTestCase {
|
||||
};
|
||||
mockSearchPhaseContext.searchTransport = searchTransportService;
|
||||
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
|
||||
(searchResponse) -> new SearchPhase("test") {
|
||||
(searchResponse, scrollId) -> new SearchPhase("test") {
|
||||
@Override
|
||||
public void run() throws IOException {
|
||||
responseRef.set(searchResponse);
|
||||
responseRef.set(mockSearchPhaseContext.buildSearchResponse(searchResponse, null));
|
||||
}
|
||||
});
|
||||
assertEquals("fetch", phase.getName());
|
||||
@ -357,5 +358,4 @@ public class FetchSearchPhaseTests extends ESTestCase {
|
||||
assertEquals(1, mockSearchPhaseContext.releasedSearchContexts.size());
|
||||
assertTrue(mockSearchPhaseContext.releasedSearchContexts.contains(123L));
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user