From 1010f73ae7dd0da068be5bee45420662fe3094fc Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 14 Jun 2017 15:04:14 +0100 Subject: [PATCH] [ML] Retry after broken scroll (elastic/x-pack-elasticsearch#1713) Original commit: elastic/x-pack-elasticsearch@b4fc329c52c57bdf6f01b55b52468b05bc1fc462 --- .../xpack/ml/datafeed/DatafeedJobBuilder.java | 2 +- .../extractor/scroll/ScrollDataExtractor.java | 38 ++++++++-- .../scroll/ScrollDataExtractorTests.java | 73 ++++++++++++++++++- 3 files changed, 101 insertions(+), 12 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java index 56e24a0bd57..3346ebdd7ab 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java @@ -65,7 +65,7 @@ public class DatafeedJobBuilder { } ); - // Step 3. Create data extractory factory + // Step 3. Create data extractor factory Consumer dataCountsHandler = dataCounts -> { if (dataCounts.getLatestRecordTimeStamp() != null) { context.latestRecordTimeMs = dataCounts.getLatestRecordTimeStamp().getTime(); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java index 08631ed18d0..89cdcc59c7e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollAction; import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.internal.Nullable; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.script.Script; @@ -50,11 +51,14 @@ class ScrollDataExtractor implements DataExtractor { private boolean isCancelled; private boolean hasNext; private Long timestampOnCancel; + private Long lastTimestamp; + private boolean searchHasShardFailure; ScrollDataExtractor(Client client, ScrollDataExtractorContext dataExtractorContext) { this.client = Objects.requireNonNull(client); - this.context = Objects.requireNonNull(dataExtractorContext); - this.hasNext = true; + context = Objects.requireNonNull(dataExtractorContext); + hasNext = true; + searchHasShardFailure = false; } @Override @@ -78,16 +82,17 @@ class ScrollDataExtractor implements DataExtractor { if (!hasNext()) { throw new NoSuchElementException(); } - Optional stream = scrollId == null ? Optional.ofNullable(initScroll()) : Optional.ofNullable(continueScroll()); + Optional stream = scrollId == null ? + Optional.ofNullable(initScroll(context.start)) : Optional.ofNullable(continueScroll()); if (!stream.isPresent()) { hasNext = false; } return stream; } - private InputStream initScroll() throws IOException { + protected InputStream initScroll(long startTimestamp) throws IOException { LOGGER.debug("[{}] Initializing scroll", context.jobId); - SearchResponse searchResponse = executeSearchRequest(buildSearchRequest()); + SearchResponse searchResponse = executeSearchRequest(buildSearchRequest(startTimestamp)); return processSearchResponse(searchResponse); } @@ -95,7 +100,7 @@ class ScrollDataExtractor implements DataExtractor { return searchRequestBuilder.get(); } - private SearchRequestBuilder buildSearchRequest() { + private SearchRequestBuilder buildSearchRequest(long start) { SearchRequestBuilder searchRequestBuilder = SearchAction.INSTANCE.newRequestBuilder(client) .setScroll(SCROLL_TIMEOUT) .addSort(context.extractedFields.timeField(), SortOrder.ASC) @@ -103,7 +108,7 @@ class ScrollDataExtractor implements DataExtractor { .setTypes(context.types) .setSize(context.scrollSize) .setQuery(ExtractorUtils.wrapInTimeRangeQuery( - context.query, context.extractedFields.timeField(), context.start, context.end)); + context.query, context.extractedFields.timeField(), start, context.end)); for (String docValueField : context.extractedFields.getDocValueFields()) { searchRequestBuilder.addDocValueField(docValueField); @@ -136,6 +141,18 @@ class ScrollDataExtractor implements DataExtractor { } private InputStream processSearchResponse(SearchResponse searchResponse) throws IOException { + if (searchResponse.getFailedShards() > 0 && searchHasShardFailure == false) { + // This could be a transient error with the scroll Id. + // Reinitialise the scroll and try again but only once. + LOGGER.debug("[{}] Resetting scroll search after shard failure", context.jobId); + resetScroll(); + if (lastTimestamp != null) { + lastTimestamp++; + } + searchHasShardFailure = true; + return initScroll(lastTimestamp == null ? context.start : lastTimestamp); + } + ExtractorUtils.checkSearchWasSuccessful(context.jobId, searchResponse); scrollId = searchResponse.getScrollId(); if (searchResponse.getHits().getHits().length == 0) { @@ -161,6 +178,8 @@ class ScrollDataExtractor implements DataExtractor { } hitProcessor.process(hit); } + SearchHit lastHit = searchResponse.getHits().getHits()[searchResponse.getHits().getHits().length -1]; + lastTimestamp = context.extractedFields.timeFieldValue(lastHit); } return new ByteArrayInputStream(outputStream.toByteArray()); } @@ -178,6 +197,11 @@ class ScrollDataExtractor implements DataExtractor { .get(); } + private void resetScroll() { + clearScroll(scrollId); + scrollId = null; + } + void clearScroll(String scrollId) { ClearScrollAction.INSTANCE.newRequestBuilder(client).addScrollId(scrollId).get(); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java index 8b9e384370c..6fbb630e0e3 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java @@ -30,9 +30,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Queue; import java.util.stream.Collectors; import static java.util.Collections.emptyMap; @@ -55,10 +57,11 @@ public class ScrollDataExtractorTests extends ESTestCase { private QueryBuilder query; private List scriptFields; private int scrollSize; + private long initScrollStartTime; private class TestDataExtractor extends ScrollDataExtractor { - private SearchResponse nextResponse; + private Queue responses = new LinkedList<>(); TestDataExtractor(long start, long end) { this(createContext(start, end)); @@ -68,16 +71,22 @@ public class ScrollDataExtractorTests extends ESTestCase { super(client, context); } + @Override + protected InputStream initScroll(long startTimestamp) throws IOException { + initScrollStartTime = startTimestamp; + return super.initScroll(startTimestamp); + } + @Override protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) { capturedSearchRequests.add(searchRequestBuilder); - return nextResponse; + return responses.remove(); } @Override protected SearchResponse executeSearchScrollRequest(String scrollId) { capturedContinueScrollIds.add(scrollId); - return nextResponse; + return responses.remove(); } @Override @@ -86,7 +95,11 @@ public class ScrollDataExtractorTests extends ESTestCase { } void setNextResponse(SearchResponse searchResponse) { - nextResponse = searchResponse; + responses.add(searchResponse); + } + + public long getInitScrollStartTime() { + return initScrollStartTime; } } @@ -263,6 +276,7 @@ public class ScrollDataExtractorTests extends ESTestCase { public void testExtractionGivenInitSearchResponseHasShardFailures() throws IOException { TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L); extractor.setNextResponse(createResponseWithShardFailures()); + extractor.setNextResponse(createResponseWithShardFailures()); assertThat(extractor.hasNext(), is(true)); expectThrows(IOException.class, () -> extractor.next()); @@ -271,12 +285,61 @@ public class ScrollDataExtractorTests extends ESTestCase { public void testExtractionGivenInitSearchResponseEncounteredUnavailableShards() throws IOException { TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L); extractor.setNextResponse(createResponseWithUnavailableShards(1)); + extractor.setNextResponse(createResponseWithUnavailableShards(1)); assertThat(extractor.hasNext(), is(true)); IOException e = expectThrows(IOException.class, () -> extractor.next()); assertThat(e.getMessage(), equalTo("[" + jobId + "] Search request encountered [1] unavailable shards")); } + public void testResetScrollAfterFailure() throws IOException { + TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L); + + SearchResponse goodResponse = createSearchResponse( + Arrays.asList(1100L, 1200L), + Arrays.asList("a1", "a2"), + Arrays.asList("b1", "b2") + ); + extractor.setNextResponse(goodResponse); + extractor.setNextResponse(createResponseWithShardFailures()); + extractor.setNextResponse(goodResponse); + extractor.setNextResponse(createResponseWithShardFailures()); + + // first response is good + assertThat(extractor.hasNext(), is(true)); + Optional output = extractor.next(); + assertThat(output.isPresent(), is(true)); + // this should recover from the first shard failure and try again + assertThat(extractor.hasNext(), is(true)); + output = extractor.next(); + assertThat(output.isPresent(), is(true)); + // A second failure is not tolerated + assertThat(extractor.hasNext(), is(true)); + expectThrows(IOException.class, () -> extractor.next()); + } + + public void testResetScollUsesLastResultTimestamp() throws IOException { + TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L); + + SearchResponse goodResponse = createSearchResponse( + Arrays.asList(1100L, 1200L), + Arrays.asList("a1", "a2"), + Arrays.asList("b1", "b2") + ); + + extractor.setNextResponse(goodResponse); + extractor.setNextResponse(createResponseWithShardFailures()); + extractor.setNextResponse(createResponseWithShardFailures()); + + Optional output = extractor.next(); + assertThat(output.isPresent(), is(true)); + assertEquals(1000L, extractor.getInitScrollStartTime()); + + expectThrows(IOException.class, () -> extractor.next()); + // the new start time after error is the last record timestamp +1 + assertEquals(1201L, extractor.getInitScrollStartTime()); + } + public void testDomainSplitScriptField() throws IOException { SearchSourceBuilder.ScriptField withoutSplit = new SearchSourceBuilder.ScriptField( @@ -369,6 +432,7 @@ public class ScrollDataExtractorTests extends ESTestCase { when(searchResponse.status()).thenReturn(RestStatus.OK); when(searchResponse.getShardFailures()).thenReturn( new ShardSearchFailure[] { new ShardSearchFailure(new RuntimeException("shard failed"))}); + when(searchResponse.getFailedShards()).thenReturn(1); return searchResponse; } @@ -377,6 +441,7 @@ public class ScrollDataExtractorTests extends ESTestCase { when(searchResponse.status()).thenReturn(RestStatus.OK); when(searchResponse.getSuccessfulShards()).thenReturn(2); when(searchResponse.getTotalShards()).thenReturn(2 + unavailableShards); + when(searchResponse.getFailedShards()).thenReturn(unavailableShards); return searchResponse; }