[7.x][ML] Scrolling datafeed should clear scroll contexts on error (#40773) (#40794)

Closes #40772
This commit is contained in:
Dimitris Athanasiou 2019-04-04 12:28:06 +03:00 committed by GitHub
parent 25944c4317
commit 65cca2ee6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 38 additions and 13 deletions

View File

@ -87,14 +87,24 @@ class ScrollDataExtractor implements DataExtractor {
if (!hasNext()) {
throw new NoSuchElementException();
}
Optional<InputStream> stream = scrollId == null ?
Optional.ofNullable(initScroll(context.start)) : Optional.ofNullable(continueScroll());
Optional<InputStream> stream = tryNextStream();
if (!stream.isPresent()) {
hasNext = false;
}
return stream;
}
/**
 * Fetches the next batch of results: starts a new scroll when no scroll id is
 * held, otherwise continues the current scroll.
 *
 * @return the next stream, or an empty {@code Optional} when the search step returned {@code null}
 * @throws IOException if the search step fails; the failure is rethrown after an
 *         attempt to clear the scroll context
 */
private Optional<InputStream> tryNextStream() throws IOException {
    try {
        return scrollId == null ?
            Optional.ofNullable(initScroll(context.start)) : Optional.ofNullable(continueScroll());
    } catch (Exception e) {
        // In case of error make sure we clear the scroll context
        try {
            clearScroll();
        } catch (Exception clearFailure) {
            // Cleanup is best-effort here: the caller must see the original search
            // failure, so the cleanup failure is attached to it instead of replacing it.
            e.addSuppressed(clearFailure);
        }
        throw e;
    }
}
protected InputStream initScroll(long startTimestamp) throws IOException {
LOGGER.debug("[{}] Initializing scroll", context.jobId);
SearchResponse searchResponse = executeSearchRequest(buildSearchRequest(startTimestamp));
@ -131,6 +141,8 @@ class ScrollDataExtractor implements DataExtractor {
private InputStream processSearchResponse(SearchResponse searchResponse) throws IOException {
scrollId = searchResponse.getScrollId();
if (searchResponse.getFailedShards() > 0 && searchHasShardFailure == false) {
LOGGER.debug("[{}] Resetting scroll search after shard failure", context.jobId);
markScrollAsErrored();
@ -138,10 +150,9 @@ class ScrollDataExtractor implements DataExtractor {
}
ExtractorUtils.checkSearchWasSuccessful(context.jobId, searchResponse);
scrollId = searchResponse.getScrollId();
if (searchResponse.getHits().getHits().length == 0) {
hasNext = false;
clearScroll(scrollId);
clearScroll();
return null;
}
@ -155,7 +166,7 @@ class ScrollDataExtractor implements DataExtractor {
timestampOnCancel = timestamp;
} else if (timestamp.equals(timestampOnCancel) == false) {
hasNext = false;
clearScroll(scrollId);
clearScroll();
break;
}
}
@ -189,7 +200,7 @@ class ScrollDataExtractor implements DataExtractor {
private void markScrollAsErrored() {
// This could be a transient error with the scroll Id.
// Reinitialise the scroll and try again but only once.
resetScroll();
clearScroll();
if (lastTimestamp != null) {
lastTimestamp++;
}
@ -204,17 +215,13 @@ class ScrollDataExtractor implements DataExtractor {
.get());
}
// Clears the scroll identified by the current scrollId and then forgets that id.
private void resetScroll() {
clearScroll(scrollId);
// Only reached when the clear call above returns normally.
scrollId = null;
}
private void clearScroll(String scrollId) {
/**
 * Sends a clear-scroll request for the current scroll id, if one is held, and
 * forgets the id. Does nothing when no scroll id is held.
 */
private void clearScroll() {
    if (scrollId != null) {
        ClearScrollRequest request = new ClearScrollRequest();
        request.addScrollId(scrollId);
        try {
            ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client,
                () -> client.execute(ClearScrollAction.INSTANCE, request).actionGet());
        } finally {
            // Forget the id even when the clear request fails, so a stale scroll id
            // is never reused by a later continue or clear call.
            scrollId = null;
        }
    }
}
}

View File

@ -296,6 +296,9 @@ public class ScrollDataExtractorTests extends ESTestCase {
extractor.setNextResponse(createErrorResponse());
assertThat(extractor.hasNext(), is(true));
expectThrows(IOException.class, extractor::next);
List<String> capturedClearScrollIds = getCapturedClearScrollIds();
assertThat(capturedClearScrollIds.size(), equalTo(1));
}
public void testExtractionGivenInitSearchResponseHasShardFailures() throws IOException {
@ -305,6 +308,11 @@ public class ScrollDataExtractorTests extends ESTestCase {
assertThat(extractor.hasNext(), is(true));
expectThrows(IOException.class, extractor::next);
List<String> capturedClearScrollIds = getCapturedClearScrollIds();
// We should clear the scroll context twice: once for the first search when we retry
// and once after the retry where we'll have an exception
assertThat(capturedClearScrollIds.size(), equalTo(2));
}
public void testExtractionGivenInitSearchResponseEncounteredUnavailableShards() throws IOException {
@ -341,6 +349,9 @@ public class ScrollDataExtractorTests extends ESTestCase {
// A second failure is not tolerated
assertThat(extractor.hasNext(), is(true));
expectThrows(IOException.class, extractor::next);
List<String> capturedClearScrollIds = getCapturedClearScrollIds();
assertThat(capturedClearScrollIds.size(), equalTo(2));
}
public void testResetScollUsesLastResultTimestamp() throws IOException {
@ -397,6 +408,9 @@ public class ScrollDataExtractorTests extends ESTestCase {
// A second failure is not tolerated
assertThat(extractor.hasNext(), is(true));
expectThrows(SearchPhaseExecutionException.class, extractor::next);
List<String> capturedClearScrollIds = getCapturedClearScrollIds();
assertThat(capturedClearScrollIds.size(), equalTo(2));
}
public void testSearchPhaseExecutionExceptionOnInitScroll() throws IOException {
@ -408,7 +422,9 @@ public class ScrollDataExtractorTests extends ESTestCase {
expectThrows(IOException.class, extractor::next);
List<String> capturedClearScrollIds = getCapturedClearScrollIds();
assertThat(capturedClearScrollIds.isEmpty(), is(true));
// We should clear the scroll context twice: once for the first search when we retry
// and once after the retry where we'll have an exception
assertThat(capturedClearScrollIds.size(), equalTo(2));
}
public void testDomainSplitScriptField() throws IOException {
@ -496,6 +512,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
/**
 * Builds a mocked {@link SearchResponse} whose status is
 * {@code INTERNAL_SERVER_ERROR} and which returns a random scroll id.
 */
private SearchResponse createErrorResponse() {
    SearchResponse erroredResponse = mock(SearchResponse.class);
    // The two stubs are independent of each other.
    when(erroredResponse.getScrollId()).thenReturn(randomAlphaOfLength(1000));
    when(erroredResponse.status()).thenReturn(RestStatus.INTERNAL_SERVER_ERROR);
    return erroredResponse;
}
@ -505,6 +522,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
when(searchResponse.getShardFailures()).thenReturn(
new ShardSearchFailure[] { new ShardSearchFailure(new RuntimeException("shard failed"))});
when(searchResponse.getFailedShards()).thenReturn(1);
when(searchResponse.getScrollId()).thenReturn(randomAlphaOfLength(1000));
return searchResponse;
}