Return the same point in time in search response (#64188)

With this change, we will always return the same point in time in a
search response as its input until we implement the retry mechanism
for the point in times.
This commit is contained in:
Nhat Nguyen 2020-10-27 09:46:32 -04:00
parent 6f1c8136a6
commit 566d1fd459
3 changed files with 69 additions and 19 deletions

View File

@ -533,8 +533,10 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
skippedOps.get(), buildTookInMillis(), failures, clusters, searchContextId);
}
boolean includeSearchContextInResponse() {
return request.pointInTimeBuilder() != null;
boolean buildPointInTimeFromSearchResults() {
// TODO: Until we implement the retry mechanism for point in times (i.e., replace an unavailable shard with an equivalent copy),
// we can simply return the point in time of the search request.
return false;
}
@Override
@ -547,8 +549,16 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
} else {
final Version minNodeVersion = clusterState.nodes().getMinNodeVersion();
final String scrollId = request.scroll() != null ? TransportSearchHelper.buildScrollId(queryResults, minNodeVersion) : null;
final String searchContextId =
includeSearchContextInResponse() ? SearchContextId.encode(queryResults.asList(), aliasFilter, minNodeVersion) : null;
final String searchContextId;
if (buildPointInTimeFromSearchResults()) {
searchContextId = SearchContextId.encode(queryResults.asList(), aliasFilter, minNodeVersion);
} else {
if (request.source() != null && request.source().pointInTimeBuilder() != null) {
searchContextId = request.source().pointInTimeBuilder().getId();
} else {
searchContextId = null;
}
}
listener.onResponse(buildSearchResponse(internalSearchResponse, failures, scrollId, searchContextId));
}
}

View File

@ -268,7 +268,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
}
@Override
boolean includeSearchContextInResponse() {
boolean buildPointInTimeFromSearchResults() {
return includeSearchContext;
}
};

View File

@ -46,6 +46,7 @@ import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.arrayWithSize;
@ -146,7 +147,7 @@ public class PointInTimeIT extends ESIntegTestCase {
assertNoFailures(resp);
assertHitCount(resp, numDocs);
assertNotNull(resp.pointInTimeId());
pitId = resp.pointInTimeId();
assertThat(resp.pointInTimeId(), equalTo(pitId));
int moreDocs = randomIntBetween(10, 50);
for (int i = 0; i < moreDocs; i++) {
String id = "more-" + i;
@ -162,7 +163,7 @@ public class PointInTimeIT extends ESIntegTestCase {
assertNoFailures(resp);
assertHitCount(resp, numDocs);
assertNotNull(resp.pointInTimeId());
pitId = resp.pointInTimeId();
assertThat(resp.pointInTimeId(), equalTo(pitId));
} finally {
closePointInTime(pitId);
}
@ -185,10 +186,7 @@ public class PointInTimeIT extends ESIntegTestCase {
.get();
assertNoFailures(resp);
assertHitCount(resp, numDocs);
assertNotNull(resp.pointInTimeId());
if (randomBoolean()) {
pitId = resp.pointInTimeId();
}
assertThat(resp.pointInTimeId(), equalTo(pitId));
final Set<String> dataNodes = StreamSupport.stream(clusterService().state().nodes().getDataNodes().spliterator(), false)
.map(e -> e.value.getId()).collect(Collectors.toSet());
final List<String> excludedNodes = randomSubsetOf(2, dataNodes);
@ -207,10 +205,7 @@ public class PointInTimeIT extends ESIntegTestCase {
.get();
assertNoFailures(resp);
assertHitCount(resp, numDocs);
assertNotNull(resp.pointInTimeId());
if (randomBoolean()) {
pitId = resp.pointInTimeId();
}
assertThat(resp.pointInTimeId(), equalTo(pitId));
assertBusy(() -> {
final Set<String> assignedNodes = clusterService().state().routingTable().allShards().stream()
.filter(shr -> shr.index().getName().equals("test") && shr.assignedToNode())
@ -224,10 +219,7 @@ public class PointInTimeIT extends ESIntegTestCase {
.get();
assertNoFailures(resp);
assertHitCount(resp, numDocs);
assertNotNull(resp.pointInTimeId());
if (randomBoolean()) {
pitId = resp.pointInTimeId();
}
assertThat(resp.pointInTimeId(), equalTo(pitId));
} finally {
closePointInTime(pitId);
}
@ -345,6 +337,54 @@ public class PointInTimeIT extends ESIntegTestCase {
}
}
public void testPartialResults() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
final List<String> dataNodes =
StreamSupport.stream(internalCluster().clusterService().state().nodes().getDataNodes().spliterator(), false)
.map(e -> e.value.getName())
.collect(Collectors.toList());
final String assignedNodeForIndex1 = randomFrom(dataNodes);
createIndex("test-1", Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.routing.allocation.include._name", assignedNodeForIndex1)
.build());
createIndex("test-2", Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.routing.allocation.exclude._name", assignedNodeForIndex1)
.build());
int numDocs1 = randomIntBetween(10, 50);
for (int i = 0; i < numDocs1; i++) {
client().prepareIndex("test-1", "_doc").setId(Integer.toString(i)).setSource("value", i).get();
}
int numDocs2 = randomIntBetween(10, 50);
for (int i = 0; i < numDocs2; i++) {
client().prepareIndex("test-2", "_doc").setId(Integer.toString(i)).setSource("value", i).get();
}
refresh();
String pitId = openPointInTime(new String[]{"test-*"}, TimeValue.timeValueMinutes(2));
try {
SearchResponse resp = client().prepareSearch()
.setPreference(null)
.setPointInTime(new PointInTimeBuilder(pitId))
.get();
assertNoFailures(resp);
assertHitCount(resp, numDocs1 + numDocs2);
assertThat(resp.pointInTimeId(), equalTo(pitId));
internalCluster().restartNode(assignedNodeForIndex1);
resp = client().prepareSearch()
.setPreference(null)
.setAllowPartialSearchResults(true)
.setPointInTime(new PointInTimeBuilder(pitId))
.get();
assertFailures(resp);
assertThat(resp.pointInTimeId(), equalTo(pitId));
assertHitCount(resp, numDocs2);
} finally {
closePointInTime(pitId);
}
}
private String openPointInTime(String[] indices, TimeValue keepAlive) {
OpenPointInTimeRequest request = new OpenPointInTimeRequest(
indices,