Clear scroll after it is complete (elastic/elasticsearch#847)
The ScrollDataExtractor needs to clear the scroll after it is complete. Originally, it was thought that completing a scroll leads to an automatic clearing of its context. That is not true, thus manual clearing has to be requested. - Also removes sorting in AggregationDataExtractor as it was redundant Original commit: elastic/x-pack-elasticsearch@8f955da8ce
This commit is contained in:
parent
3504608a1e
commit
5ba9a6cfcc
|
@ -83,7 +83,6 @@ class AggregationDataExtractor implements DataExtractor {
|
||||||
|
|
||||||
private SearchRequestBuilder buildSearchRequest() {
|
private SearchRequestBuilder buildSearchRequest() {
|
||||||
SearchRequestBuilder searchRequestBuilder = SearchAction.INSTANCE.newRequestBuilder(client)
|
SearchRequestBuilder searchRequestBuilder = SearchAction.INSTANCE.newRequestBuilder(client)
|
||||||
.addSort(context.timeField, SortOrder.ASC)
|
|
||||||
.setIndices(context.indexes)
|
.setIndices(context.indexes)
|
||||||
.setTypes(context.types)
|
.setTypes(context.types)
|
||||||
.setSize(0)
|
.setSize(0)
|
||||||
|
|
|
@ -120,6 +120,7 @@ class ScrollDataExtractor implements DataExtractor {
|
||||||
scrollId = searchResponse.getScrollId();
|
scrollId = searchResponse.getScrollId();
|
||||||
if (searchResponse.getHits().hits().length == 0) {
|
if (searchResponse.getHits().hits().length == 0) {
|
||||||
hasNext = false;
|
hasNext = false;
|
||||||
|
clearScroll(scrollId);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -117,7 +117,6 @@ public class AggregationDataExtractorTests extends ESTestCase {
|
||||||
assertThat(searchRequest, containsString("\"query\":{\"bool\":{\"filter\":[{\"match_all\":{\"boost\":1.0}}," +
|
assertThat(searchRequest, containsString("\"query\":{\"bool\":{\"filter\":[{\"match_all\":{\"boost\":1.0}}," +
|
||||||
"{\"range\":{\"time\":{\"from\":1000,\"to\":4000,\"include_lower\":true,\"include_upper\":false," +
|
"{\"range\":{\"time\":{\"from\":1000,\"to\":4000,\"include_lower\":true,\"include_upper\":false," +
|
||||||
"\"format\":\"epoch_millis\",\"boost\":1.0}}}]"));
|
"\"format\":\"epoch_millis\",\"boost\":1.0}}}]"));
|
||||||
assertThat(searchRequest, containsString("\"sort\":[{\"time\":{\"order\":\"asc\"}}]"));
|
|
||||||
assertThat(searchRequest,
|
assertThat(searchRequest,
|
||||||
stringContainsInOrder(Arrays.asList("aggregations", "histogram", "time", "terms", "airline", "avg", "responsetime")));
|
stringContainsInOrder(Arrays.asList("aggregations", "histogram", "time", "terms", "airline", "avg", "responsetime")));
|
||||||
}
|
}
|
||||||
|
|
|
@ -105,12 +105,12 @@ public class ScrollDataExtractorTests extends ESTestCase {
|
||||||
public void testSinglePageExtraction() throws IOException {
|
public void testSinglePageExtraction() throws IOException {
|
||||||
TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L);
|
TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L);
|
||||||
|
|
||||||
SearchResponse response = createSearchResponse(
|
SearchResponse response1 = createSearchResponse(
|
||||||
Arrays.asList(1100L, 1200L),
|
Arrays.asList(1100L, 1200L),
|
||||||
Arrays.asList("a1", "a2"),
|
Arrays.asList("a1", "a2"),
|
||||||
Arrays.asList("b1", "b2")
|
Arrays.asList("b1", "b2")
|
||||||
);
|
);
|
||||||
extractor.setNextResponse(response);
|
extractor.setNextResponse(response1);
|
||||||
|
|
||||||
assertThat(extractor.hasNext(), is(true));
|
assertThat(extractor.hasNext(), is(true));
|
||||||
Optional<InputStream> stream = extractor.next();
|
Optional<InputStream> stream = extractor.next();
|
||||||
|
@ -118,7 +118,8 @@ public class ScrollDataExtractorTests extends ESTestCase {
|
||||||
String expectedStream = "{\"time\":1100,\"field_1\":\"a1\"} {\"time\":1200,\"field_1\":\"a2\"}";
|
String expectedStream = "{\"time\":1100,\"field_1\":\"a1\"} {\"time\":1200,\"field_1\":\"a2\"}";
|
||||||
assertThat(asString(stream.get()), equalTo(expectedStream));
|
assertThat(asString(stream.get()), equalTo(expectedStream));
|
||||||
|
|
||||||
extractor.setNextResponse(createEmptySearchResponse());
|
SearchResponse response2 = createEmptySearchResponse();
|
||||||
|
extractor.setNextResponse(response2);
|
||||||
assertThat(extractor.hasNext(), is(true));
|
assertThat(extractor.hasNext(), is(true));
|
||||||
assertThat(extractor.next().isPresent(), is(false));
|
assertThat(extractor.next().isPresent(), is(false));
|
||||||
assertThat(extractor.hasNext(), is(false));
|
assertThat(extractor.hasNext(), is(false));
|
||||||
|
@ -133,9 +134,10 @@ public class ScrollDataExtractorTests extends ESTestCase {
|
||||||
assertThat(searchRequest, containsString("\"stored_fields\":\"_none_\""));
|
assertThat(searchRequest, containsString("\"stored_fields\":\"_none_\""));
|
||||||
|
|
||||||
assertThat(capturedContinueScrollIds.size(), equalTo(1));
|
assertThat(capturedContinueScrollIds.size(), equalTo(1));
|
||||||
assertThat(capturedContinueScrollIds.get(0), equalTo(response.getScrollId()));
|
assertThat(capturedContinueScrollIds.get(0), equalTo(response1.getScrollId()));
|
||||||
|
|
||||||
assertThat(capturedClearScrollIds.isEmpty(), is(true));
|
assertThat(capturedClearScrollIds.size(), equalTo(1));
|
||||||
|
assertThat(capturedClearScrollIds.get(0), equalTo(response2.getScrollId()));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testMultiplePageExtraction() throws IOException {
|
public void testMultiplePageExtraction() throws IOException {
|
||||||
|
@ -167,7 +169,8 @@ public class ScrollDataExtractorTests extends ESTestCase {
|
||||||
expectedStream = "{\"time\":3000,\"field_1\":\"a3\"} {\"time\":4000,\"field_1\":\"a4\"}";
|
expectedStream = "{\"time\":3000,\"field_1\":\"a3\"} {\"time\":4000,\"field_1\":\"a4\"}";
|
||||||
assertThat(asString(stream.get()), equalTo(expectedStream));
|
assertThat(asString(stream.get()), equalTo(expectedStream));
|
||||||
|
|
||||||
extractor.setNextResponse(createEmptySearchResponse());
|
SearchResponse response3 = createEmptySearchResponse();
|
||||||
|
extractor.setNextResponse(response3);
|
||||||
assertThat(extractor.hasNext(), is(true));
|
assertThat(extractor.hasNext(), is(true));
|
||||||
assertThat(extractor.next().isPresent(), is(false));
|
assertThat(extractor.next().isPresent(), is(false));
|
||||||
assertThat(extractor.hasNext(), is(false));
|
assertThat(extractor.hasNext(), is(false));
|
||||||
|
@ -184,7 +187,8 @@ public class ScrollDataExtractorTests extends ESTestCase {
|
||||||
assertThat(capturedContinueScrollIds.get(0), equalTo(response1.getScrollId()));
|
assertThat(capturedContinueScrollIds.get(0), equalTo(response1.getScrollId()));
|
||||||
assertThat(capturedContinueScrollIds.get(1), equalTo(response2.getScrollId()));
|
assertThat(capturedContinueScrollIds.get(1), equalTo(response2.getScrollId()));
|
||||||
|
|
||||||
assertThat(capturedClearScrollIds.isEmpty(), is(true));
|
assertThat(capturedClearScrollIds.size(), equalTo(1));
|
||||||
|
assertThat(capturedClearScrollIds.get(0), equalTo(response3.getScrollId()));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testMultiplePageExtractionGivenCancel() throws IOException {
|
public void testMultiplePageExtractionGivenCancel() throws IOException {
|
||||||
|
|
Loading…
Reference in New Issue