SearchPhaseContext to not extend ActionListener (#43269)

The fact that SearchPhaseContext extends ActionListener makes it hard
to reason about when the original listener is notified and to trace
those calls. Also, the corresponding onFailure and onResponse were
only needed in two places, one each, where they can be replaced by a
more intuitive call, like sendSearchResponse for onResponse.
This commit is contained in:
Luca Cavanna 2019-06-19 11:18:40 +02:00
parent c33d62adbc
commit 94a4bc9933
7 changed files with 97 additions and 112 deletions

View File

@ -284,8 +284,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
return request;
}
@Override
public final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
protected final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
ShardSearchFailure[] failures = buildShardFailures();
Boolean allowPartialResults = request.allowPartialSearchResults();
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
@ -296,6 +295,11 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
skippedOps.get(), buildTookInMillis(), failures, clusters);
}
@Override
public void sendSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
listener.onResponse(buildSearchResponse(internalSearchResponse, scrollId));
}
@Override
public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures()));
@ -316,11 +320,6 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
executor.execute(command);
}
@Override
public final void onResponse(SearchResponse response) {
listener.onResponse(response);
}
@Override
public final void onFailure(Exception e) {
listener.onFailure(e);

View File

@ -30,7 +30,6 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.collapse.CollapseBuilder;
import org.elasticsearch.search.internal.InternalSearchResponse;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -65,7 +64,7 @@ final class ExpandSearchPhase extends SearchPhase {
}
@Override
public void run() throws IOException {
public void run() {
if (isCollapseRequest() && searchResponse.hits().getHits().length > 0) {
SearchRequest searchRequest = context.getRequest();
CollapseBuilder collapseBuilder = searchRequest.source().collapse();

View File

@ -22,8 +22,8 @@ import com.carrotsearch.hppc.IntArrayList;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
@ -76,10 +76,10 @@ final class FetchSearchPhase extends SearchPhase {
}
@Override
public void run() throws IOException {
context.execute(new ActionRunnable<SearchResponse>(context) {
public void run() {
context.execute(new AbstractRunnable() {
@Override
public void doRun() throws IOException {
protected void doRun() throws Exception {
// we do the heavy lifting in this inner run method where we reduce aggs etc. that's why we fork this phase
// off immediately instead of forking when we send back the response to the user since there we only need
// to merge together the fetched results which is a linear operation.
@ -209,8 +209,8 @@ final class FetchSearchPhase extends SearchPhase {
private static SearchPhase sendResponsePhase(InternalSearchResponse response, String scrollId, SearchPhaseContext context) {
return new SearchPhase("response") {
@Override
public void run() throws IOException {
context.onResponse(context.buildSearchResponse(response, scrollId));
public void run() {
context.sendSearchResponse(response, scrollId);
}
};
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.action.search;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.search.SearchShardTarget;
@ -32,7 +31,7 @@ import java.util.concurrent.Executor;
/**
* This class provide contextual state and access to resources across multiple search phases.
*/
interface SearchPhaseContext extends ActionListener<SearchResponse>, Executor {
interface SearchPhaseContext extends Executor {
// TODO maybe we can make this concrete later - for now we just implement this in the base class for all initial phases
/**
@ -56,11 +55,16 @@ interface SearchPhaseContext extends ActionListener<SearchResponse>, Executor {
SearchRequest getRequest();
/**
* Builds the final search response that should be send back to the user.
* Builds and sends the final search response back to the user.
* @param internalSearchResponse the internal search response
* @param scrollId an optional scroll ID if this search is a scroll search
*/
SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId);
void sendSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId);
/**
* Notifies the top-level listener of the provided exception
*/
void onFailure(Exception e);
/**
* This method will communicate a fatal phase failure back to the user. In contrast to a shard failure
@ -113,5 +117,4 @@ interface SearchPhaseContext extends ActionListener<SearchResponse>, Executor {
* a response is returned to the user indicating that all shards have failed.
*/
void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPhase);
}

View File

@ -40,7 +40,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -94,8 +93,8 @@ public class ExpandSearchPhaseTests extends ESTestCase {
for (int innerHitNum = 0; innerHitNum < numInnerHits; innerHitNum++) {
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(collapsedHits.get(innerHitNum),
null, null, null, false, null, 1);
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
mSearchResponses.add(new MultiSearchResponse.Item(response, null));
mockSearchPhaseContext.sendSearchResponse(internalSearchResponse, null);
mSearchResponses.add(new MultiSearchResponse.Item(mockSearchPhaseContext.searchResponse.get(), null));
}
listener.onResponse(
@ -107,20 +106,19 @@ public class ExpandSearchPhaseTests extends ESTestCase {
Collections.singletonMap("someField", new DocumentField("someField", Collections.singletonList(collapseValue))))},
new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null, 1);
AtomicReference<SearchResponse> reference = new AtomicReference<>();
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, internalSearchResponse, (r) ->
new SearchPhase("test") {
@Override
public void run() throws IOException {
reference.set(mockSearchPhaseContext.buildSearchResponse(r, null));
public void run() {
mockSearchPhaseContext.sendSearchResponse(r, null);
}
}
);
phase.run();
mockSearchPhaseContext.assertNoFailure();
assertNotNull(reference.get());
SearchResponse theResponse = reference.get();
SearchResponse theResponse = mockSearchPhaseContext.searchResponse.get();
assertNotNull(theResponse);
assertEquals(numInnerHits, theResponse.getHits().getHits()[0].getInnerHits().size());
for (int innerHitNum = 0; innerHitNum < numInnerHits; innerHitNum++) {
@ -148,11 +146,12 @@ public class ExpandSearchPhaseTests extends ESTestCase {
assertTrue(executedMultiSearch.compareAndSet(false, true));
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(collapsedHits,
null, null, null, false, null, 1);
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, 1, 1, 0, 0,
ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY);
listener.onResponse(new MultiSearchResponse(
new MultiSearchResponse.Item[]{
new MultiSearchResponse.Item(null, new RuntimeException("boom")),
new MultiSearchResponse.Item(response, null)
new MultiSearchResponse.Item(searchResponse, null)
}, randomIntBetween(1, 10000)));
}
};
@ -163,12 +162,11 @@ public class ExpandSearchPhaseTests extends ESTestCase {
Collections.singletonMap("someField", new DocumentField("someField", Collections.singletonList(collapseValue))))},
new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null, 1);
AtomicReference<SearchResponse> reference = new AtomicReference<>();
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, internalSearchResponse, r ->
new SearchPhase("test") {
@Override
public void run() throws IOException {
reference.set(mockSearchPhaseContext.buildSearchResponse(r, null));
public void run() {
mockSearchPhaseContext.sendSearchResponse(r, null);
}
}
);
@ -176,7 +174,7 @@ public class ExpandSearchPhaseTests extends ESTestCase {
assertThat(mockSearchPhaseContext.phaseFailure.get(), Matchers.instanceOf(RuntimeException.class));
assertEquals("boom", mockSearchPhaseContext.phaseFailure.get().getMessage());
assertNotNull(mockSearchPhaseContext.phaseFailure.get());
assertNull(reference.get());
assertNull(mockSearchPhaseContext.searchResponse.get());
assertEquals(0, mockSearchPhaseContext.phasesExecuted.get());
}
@ -195,18 +193,17 @@ public class ExpandSearchPhaseTests extends ESTestCase {
Collections.singletonMap("someField", new DocumentField("someField", Collections.singletonList(null))))},
new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null, 1);
AtomicReference<SearchResponse> reference = new AtomicReference<>();
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, internalSearchResponse, r ->
new SearchPhase("test") {
@Override
public void run() throws IOException {
reference.set(mockSearchPhaseContext.buildSearchResponse(r, null));
public void run() {
mockSearchPhaseContext.sendSearchResponse(r, null);
}
}
);
phase.run();
mockSearchPhaseContext.assertNoFailure();
assertNotNull(reference.get());
assertNotNull(mockSearchPhaseContext.searchResponse.get());
assertEquals(1, mockSearchPhaseContext.phasesExecuted.get());
}
@ -223,18 +220,17 @@ public class ExpandSearchPhaseTests extends ESTestCase {
SearchHits hits = new SearchHits(new SearchHit[0], new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0f);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null, 1);
AtomicReference<SearchResponse> reference = new AtomicReference<>();
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, internalSearchResponse, r ->
new SearchPhase("test") {
@Override
public void run() throws IOException {
reference.set(mockSearchPhaseContext.buildSearchResponse(r, null));
public void run() {
mockSearchPhaseContext.sendSearchResponse(r, null);
}
}
);
phase.run();
mockSearchPhaseContext.assertNoFailure();
assertNotNull(reference.get());
assertNotNull(mockSearchPhaseContext.searchResponse.get());
assertEquals(1, mockSearchPhaseContext.phasesExecuted.get());
}
@ -269,18 +265,17 @@ public class ExpandSearchPhaseTests extends ESTestCase {
SearchHits hits = new SearchHits(new SearchHit[0], new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0f);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null, 1);
AtomicReference<SearchResponse> reference = new AtomicReference<>();
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, internalSearchResponse, r ->
new SearchPhase("test") {
@Override
public void run() throws IOException {
reference.set(mockSearchPhaseContext.buildSearchResponse(r, null));
public void run() {
mockSearchPhaseContext.sendSearchResponse(r, null);
}
}
);
phase.run();
mockSearchPhaseContext.assertNoFailure();
assertNotNull(reference.get());
assertNotNull(mockSearchPhaseContext.searchResponse.get());
assertEquals(1, mockSearchPhaseContext.phasesExecuted.get());
}
}

View File

@ -42,7 +42,6 @@ import org.elasticsearch.transport.Transport;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class FetchSearchPhaseTests extends ESTestCase {
@ -52,7 +51,6 @@ public class FetchSearchPhaseTests extends ESTestCase {
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1);
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> results =
controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 1);
AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
boolean hasHits = randomBoolean();
final int numHits;
if (hasHits) {
@ -73,17 +71,18 @@ public class FetchSearchPhaseTests extends ESTestCase {
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
(searchResponse, scrollId) -> new SearchPhase("test") {
@Override
public void run() throws IOException {
responseRef.set(mockSearchPhaseContext.buildSearchResponse(searchResponse, null));
public void run() {
mockSearchPhaseContext.sendSearchResponse(searchResponse, null);
}
});
assertEquals("fetch", phase.getName());
phase.run();
mockSearchPhaseContext.assertNoFailure();
assertNotNull(responseRef.get());
assertEquals(numHits, responseRef.get().getHits().getTotalHits().value);
SearchResponse searchResponse = mockSearchPhaseContext.searchResponse.get();
assertNotNull(searchResponse);
assertEquals(numHits, searchResponse.getHits().getTotalHits().value);
if (numHits != 0) {
assertEquals(42, responseRef.get().getHits().getAt(0).docId());
assertEquals(42, searchResponse.getHits().getAt(0).docId());
}
assertTrue(mockSearchPhaseContext.releasedSearchContexts.isEmpty());
}
@ -94,7 +93,6 @@ public class FetchSearchPhaseTests extends ESTestCase {
(b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b));
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> results =
controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2);
AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
int resultSetSize = randomIntBetween(2, 10);
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0),
null, OriginalIndices.NONE));
@ -111,7 +109,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
queryResult.setShardIndex(1);
results.consumeResult(queryResult);
SearchTransportService searchTransportService = new SearchTransportService(null, null) {
mockSearchPhaseContext.searchTransport = new SearchTransportService(null, null) {
@Override
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
SearchActionListener<FetchSearchResult> listener) {
@ -127,23 +125,23 @@ public class FetchSearchPhaseTests extends ESTestCase {
listener.onResponse(fetchResult);
}
};
mockSearchPhaseContext.searchTransport = searchTransportService;
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
(searchResponse, scrollId) -> new SearchPhase("test") {
@Override
public void run() throws IOException {
responseRef.set(mockSearchPhaseContext.buildSearchResponse(searchResponse, null));
public void run() {
mockSearchPhaseContext.sendSearchResponse(searchResponse, null);
}
});
assertEquals("fetch", phase.getName());
phase.run();
mockSearchPhaseContext.assertNoFailure();
assertNotNull(responseRef.get());
assertEquals(2, responseRef.get().getHits().getTotalHits().value);
assertEquals(84, responseRef.get().getHits().getAt(0).docId());
assertEquals(42, responseRef.get().getHits().getAt(1).docId());
assertEquals(0, responseRef.get().getFailedShards());
assertEquals(2, responseRef.get().getSuccessfulShards());
SearchResponse searchResponse = mockSearchPhaseContext.searchResponse.get();
assertNotNull(searchResponse);
assertEquals(2, searchResponse.getHits().getTotalHits().value);
assertEquals(84, searchResponse.getHits().getAt(0).docId());
assertEquals(42, searchResponse.getHits().getAt(1).docId());
assertEquals(0, searchResponse.getFailedShards());
assertEquals(2, searchResponse.getSuccessfulShards());
assertTrue(mockSearchPhaseContext.releasedSearchContexts.isEmpty());
}
@ -153,7 +151,6 @@ public class FetchSearchPhaseTests extends ESTestCase {
(b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b));
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> results =
controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2);
AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
int resultSetSize = randomIntBetween(2, 10);
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0),
null, OriginalIndices.NONE));
@ -170,7 +167,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
queryResult.setShardIndex(1);
results.consumeResult(queryResult);
SearchTransportService searchTransportService = new SearchTransportService(null, null) {
mockSearchPhaseContext.searchTransport = new SearchTransportService(null, null) {
@Override
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
SearchActionListener<FetchSearchResult> listener) {
@ -185,24 +182,24 @@ public class FetchSearchPhaseTests extends ESTestCase {
}
};
mockSearchPhaseContext.searchTransport = searchTransportService;
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
(searchResponse, scrollId) -> new SearchPhase("test") {
@Override
public void run() throws IOException {
responseRef.set(mockSearchPhaseContext.buildSearchResponse(searchResponse, null));
public void run() {
mockSearchPhaseContext.sendSearchResponse(searchResponse, null);
}
});
assertEquals("fetch", phase.getName());
phase.run();
mockSearchPhaseContext.assertNoFailure();
assertNotNull(responseRef.get());
assertEquals(2, responseRef.get().getHits().getTotalHits().value);
assertEquals(84, responseRef.get().getHits().getAt(0).docId());
assertEquals(1, responseRef.get().getFailedShards());
assertEquals(1, responseRef.get().getSuccessfulShards());
assertEquals(1, responseRef.get().getShardFailures().length);
assertTrue(responseRef.get().getShardFailures()[0].getCause() instanceof MockDirectoryWrapper.FakeIOException);
SearchResponse searchResponse = mockSearchPhaseContext.searchResponse.get();
assertNotNull(searchResponse);
assertEquals(2, searchResponse.getHits().getTotalHits().value);
assertEquals(84, searchResponse.getHits().getAt(0).docId());
assertEquals(1, searchResponse.getFailedShards());
assertEquals(1, searchResponse.getSuccessfulShards());
assertEquals(1, searchResponse.getShardFailures().length);
assertTrue(searchResponse.getShardFailures()[0].getCause() instanceof MockDirectoryWrapper.FakeIOException);
assertEquals(1, mockSearchPhaseContext.releasedSearchContexts.size());
assertTrue(mockSearchPhaseContext.releasedSearchContexts.contains(123L));
}
@ -216,7 +213,6 @@ public class FetchSearchPhaseTests extends ESTestCase {
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(numHits);
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> results =
controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), numHits);
AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
for (int i = 0; i < numHits; i++) {
QuerySearchResult queryResult = new QuerySearchResult(i, new SearchShardTarget("node1", new ShardId("test", "na", 0),
null, OriginalIndices.NONE));
@ -226,7 +222,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
queryResult.setShardIndex(i);
results.consumeResult(queryResult);
}
SearchTransportService searchTransportService = new SearchTransportService(null, null) {
mockSearchPhaseContext.searchTransport = new SearchTransportService(null, null) {
@Override
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
SearchActionListener<FetchSearchResult> listener) {
@ -238,13 +234,12 @@ public class FetchSearchPhaseTests extends ESTestCase {
}).start();
}
};
mockSearchPhaseContext.searchTransport = searchTransportService;
CountDownLatch latch = new CountDownLatch(1);
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
(searchResponse, scrollId) -> new SearchPhase("test") {
@Override
public void run() throws IOException {
responseRef.set(mockSearchPhaseContext.buildSearchResponse(searchResponse, null));
public void run() {
mockSearchPhaseContext.sendSearchResponse(searchResponse, null);
latch.countDown();
}
});
@ -252,17 +247,18 @@ public class FetchSearchPhaseTests extends ESTestCase {
phase.run();
latch.await();
mockSearchPhaseContext.assertNoFailure();
assertNotNull(responseRef.get());
assertEquals(numHits, responseRef.get().getHits().getTotalHits().value);
assertEquals(Math.min(numHits, resultSetSize), responseRef.get().getHits().getHits().length);
SearchHit[] hits = responseRef.get().getHits().getHits();
SearchResponse searchResponse = mockSearchPhaseContext.searchResponse.get();
assertNotNull(searchResponse);
assertEquals(numHits, searchResponse.getHits().getTotalHits().value);
assertEquals(Math.min(numHits, resultSetSize), searchResponse.getHits().getHits().length);
SearchHit[] hits = searchResponse.getHits().getHits();
for (int i = 0; i < hits.length; i++) {
assertNotNull(hits[i]);
assertEquals("index: " + i, numHits-i, hits[i].docId());
assertEquals("index: " + i, numHits-1-i, (int)hits[i].getScore());
}
assertEquals(0, responseRef.get().getFailedShards());
assertEquals(numHits, responseRef.get().getSuccessfulShards());
assertEquals(0, searchResponse.getFailedShards());
assertEquals(numHits, searchResponse.getSuccessfulShards());
int sizeReleasedContexts = Math.max(0, numHits - resultSetSize); // all non fetched results will be freed
assertEquals(mockSearchPhaseContext.releasedSearchContexts.toString(),
sizeReleasedContexts, mockSearchPhaseContext.releasedSearchContexts.size());
@ -274,7 +270,6 @@ public class FetchSearchPhaseTests extends ESTestCase {
(b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b));
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> results =
controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2);
AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
int resultSetSize = randomIntBetween(2, 10);
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0),
null, OriginalIndices.NONE));
@ -291,7 +286,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
queryResult.setShardIndex(1);
results.consumeResult(queryResult);
AtomicInteger numFetches = new AtomicInteger(0);
SearchTransportService searchTransportService = new SearchTransportService(null, null) {
mockSearchPhaseContext.searchTransport = new SearchTransportService(null, null) {
@Override
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
SearchActionListener<FetchSearchResult> listener) {
@ -310,19 +305,18 @@ public class FetchSearchPhaseTests extends ESTestCase {
listener.onResponse(fetchResult);
}
};
mockSearchPhaseContext.searchTransport = searchTransportService;
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
(searchResponse, scrollId) -> new SearchPhase("test") {
@Override
public void run() throws IOException {
responseRef.set(mockSearchPhaseContext.buildSearchResponse(searchResponse, null));
public void run() {
mockSearchPhaseContext.sendSearchResponse(searchResponse, null);
}
});
assertEquals("fetch", phase.getName());
phase.run();
assertNotNull(mockSearchPhaseContext.phaseFailure.get());
assertEquals(mockSearchPhaseContext.phaseFailure.get().getMessage(), "BOOM");
assertNull(responseRef.get());
assertNull(mockSearchPhaseContext.searchResponse.get());
assertTrue(mockSearchPhaseContext.releasedSearchContexts.isEmpty());
}
@ -332,7 +326,6 @@ public class FetchSearchPhaseTests extends ESTestCase {
(b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b));
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> results =
controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2);
AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
int resultSetSize = 1;
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0),
null, OriginalIndices.NONE));
@ -349,7 +342,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
queryResult.setShardIndex(1);
results.consumeResult(queryResult);
SearchTransportService searchTransportService = new SearchTransportService(null, null) {
mockSearchPhaseContext.searchTransport = new SearchTransportService(null, null) {
@Override
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
SearchActionListener<FetchSearchResult> listener) {
@ -363,23 +356,23 @@ public class FetchSearchPhaseTests extends ESTestCase {
listener.onResponse(fetchResult);
}
};
mockSearchPhaseContext.searchTransport = searchTransportService;
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
(searchResponse, scrollId) -> new SearchPhase("test") {
@Override
public void run() throws IOException {
responseRef.set(mockSearchPhaseContext.buildSearchResponse(searchResponse, null));
public void run() {
mockSearchPhaseContext.sendSearchResponse(searchResponse, null);
}
});
assertEquals("fetch", phase.getName());
phase.run();
mockSearchPhaseContext.assertNoFailure();
assertNotNull(responseRef.get());
assertEquals(2, responseRef.get().getHits().getTotalHits().value);
assertEquals(1, responseRef.get().getHits().getHits().length);
assertEquals(84, responseRef.get().getHits().getAt(0).docId());
assertEquals(0, responseRef.get().getFailedShards());
assertEquals(2, responseRef.get().getSuccessfulShards());
SearchResponse searchResponse = mockSearchPhaseContext.searchResponse.get();
assertNotNull(searchResponse);
assertEquals(2, searchResponse.getHits().getTotalHits().value);
assertEquals(1, searchResponse.getHits().getHits().length);
assertEquals(84, searchResponse.getHits().getAt(0).docId());
assertEquals(0, searchResponse.getFailedShards());
assertEquals(2, searchResponse.getSuccessfulShards());
assertEquals(1, mockSearchPhaseContext.releasedSearchContexts.size());
assertTrue(mockSearchPhaseContext.releasedSearchContexts.contains(123L));
}

View File

@ -49,6 +49,7 @@ public final class MockSearchPhaseContext implements SearchPhaseContext {
Set<Long> releasedSearchContexts = new HashSet<>();
SearchRequest searchRequest = new SearchRequest();
AtomicInteger phasesExecuted = new AtomicInteger();
AtomicReference<SearchResponse> searchResponse = new AtomicReference<>();
public MockSearchPhaseContext(int numShards) {
this.numShards = numShards;
@ -82,9 +83,9 @@ public final class MockSearchPhaseContext implements SearchPhaseContext {
}
@Override
public SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
return new SearchResponse(internalSearchResponse, scrollId, numShards, numSuccess.get(), 0, 0,
failures.toArray(new ShardSearchFailure[failures.size()]), SearchResponse.Clusters.EMPTY);
public void sendSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
searchResponse.set(new SearchResponse(internalSearchResponse, scrollId, numShards, numSuccess.get(), 0, 0,
failures.toArray(ShardSearchFailure.EMPTY_ARRAY), SearchResponse.Clusters.EMPTY));
}
@Override
@ -130,11 +131,6 @@ public final class MockSearchPhaseContext implements SearchPhaseContext {
command.run();
}
@Override
public void onResponse(SearchResponse response) {
Assert.fail("should not be called");
}
@Override
public void onFailure(Exception e) {
Assert.fail("should not be called");