From 410b2107366282da6670c8175145df4177e5deca Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 21 Jun 2017 08:55:09 +0100 Subject: [PATCH] [ML] Retry after SearchPhaseExecutionException in ScrollDataExtractor (elastic/x-pack-elasticsearch#1788) Original commit: elastic/x-pack-elasticsearch@bbe287b9c3ddfb7b10d2a51c5f8db97c5e84724a --- .../extractor/scroll/ScrollDataExtractor.java | 36 ++++++++++---- .../scroll/ScrollDataExtractorTests.java | 48 ++++++++++++++++++- 2 files changed, 72 insertions(+), 12 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java index 339b0ed01f4..ac286d976ae 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java @@ -8,11 +8,11 @@ package org.elasticsearch.xpack.ml.datafeed.extractor.scroll; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.search.ClearScrollAction; import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollAction; import org.elasticsearch.client.Client; -import org.elasticsearch.common.inject.internal.Nullable; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.script.Script; @@ -51,7 +51,7 @@ class ScrollDataExtractor implements DataExtractor { private boolean isCancelled; private boolean hasNext; private Long timestampOnCancel; - private Long lastTimestamp; + protected Long lastTimestamp; private boolean searchHasShardFailure; ScrollDataExtractor(Client client, ScrollDataExtractorContext dataExtractorContext) { @@ -141,15 +141,10 @@ class ScrollDataExtractor implements DataExtractor { } private InputStream processSearchResponse(SearchResponse searchResponse) throws IOException { + if (searchResponse.getFailedShards() > 0 && searchHasShardFailure == false) { - // This could be a transient error with the scroll Id. - // Reinitialise the scroll and try again but only once. LOGGER.debug("[{}] Resetting scroll search after shard failure", context.jobId); - resetScroll(); - if (lastTimestamp != null) { - lastTimestamp++; - } - searchHasShardFailure = true; + markScrollAsErrored(); return initScroll(lastTimestamp == null ? context.start : lastTimestamp); } @@ -186,10 +181,31 @@ class ScrollDataExtractor implements DataExtractor { private InputStream continueScroll() throws IOException { LOGGER.debug("[{}] Continuing scroll with id [{}]", context.jobId, scrollId); - SearchResponse searchResponse = executeSearchScrollRequest(scrollId); + SearchResponse searchResponse = null; + try { + searchResponse = executeSearchScrollRequest(scrollId); + } catch (SearchPhaseExecutionException searchExecutionException) { + if (searchHasShardFailure == false) { + LOGGER.debug("[{}] Reinitializing scroll due to SearchPhaseExecutionException", context.jobId); + markScrollAsErrored(); + searchResponse = executeSearchRequest(buildSearchRequest(lastTimestamp == null ? context.start : lastTimestamp)); + } else { + throw searchExecutionException; + } + } return processSearchResponse(searchResponse); } + private void markScrollAsErrored() { + // This could be a transient error with the scroll Id. + // Reinitialise the scroll and try again but only once. + resetScroll(); + if (lastTimestamp != null) { + lastTimestamp++; + } + searchHasShardFailure = true; + } + protected SearchResponse executeSearchScrollRequest(String scrollId) { return SearchScrollAction.INSTANCE.newRequestBuilder(client) .setScroll(SCROLL_TIMEOUT) diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java index 6fbb630e0e3..389dff72d90 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.datafeed.extractor.scroll; +import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; @@ -86,7 +87,12 @@ public class ScrollDataExtractorTests extends ESTestCase { @Override protected SearchResponse executeSearchScrollRequest(String scrollId) { capturedContinueScrollIds.add(scrollId); - return responses.remove(); + SearchResponse searchResponse = responses.remove(); + if (searchResponse == null) { + throw new SearchPhaseExecutionException("foo", "bar", new ShardSearchFailure[] {}); + } else { + return searchResponse; + } } @Override @@ -101,6 +107,10 @@ public class ScrollDataExtractorTests extends ESTestCase { public long getInitScrollStartTime() { return initScrollStartTime; } + + public Long getLastTimestamp() { + return lastTimestamp; + } } @Before @@ -292,7 +302,7 @@ public class ScrollDataExtractorTests extends ESTestCase { assertThat(e.getMessage(), equalTo("[" + jobId + "] Search request encountered [1] unavailable shards")); } - public void testResetScrollAfterFailure() throws IOException { + public void testResetScrollAfterShardFailure() throws IOException { TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L); SearchResponse goodResponse = createSearchResponse( @@ -340,6 +350,40 @@ public class ScrollDataExtractorTests extends ESTestCase { assertEquals(1201L, extractor.getInitScrollStartTime()); } + public void testResetScrollAfterSearchPhaseExecutionException() throws IOException { + TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L); + SearchResponse firstResponse = createSearchResponse( + Arrays.asList(1100L, 1200L), + Arrays.asList("a1", "a2"), + Arrays.asList("b1", "b2") + ); + + SearchResponse secondResponse = createSearchResponse( + Arrays.asList(1300L, 1400L), + Arrays.asList("a1", "a2"), + Arrays.asList("b1", "b2") + ); + + extractor.setNextResponse(firstResponse); + extractor.setNextResponse(null); // this will throw a SearchPhaseExecutionException + extractor.setNextResponse(secondResponse); + extractor.setNextResponse(null); // this will throw a SearchPhaseExecutionException + + + // first response is good + assertThat(extractor.hasNext(), is(true)); + Optional output = extractor.next(); + assertThat(output.isPresent(), is(true)); + // this should recover from the SearchPhaseExecutionException and try again + assertThat(extractor.hasNext(), is(true)); + output = extractor.next(); + assertThat(output.isPresent(), is(true)); + assertEquals(new Long(1400L), extractor.getLastTimestamp()); + // A second failure is not tolerated + assertThat(extractor.hasNext(), is(true)); + expectThrows(SearchPhaseExecutionException.class, () -> extractor.next()); + } + public void testDomainSplitScriptField() throws IOException { SearchSourceBuilder.ScriptField withoutSplit = new SearchSourceBuilder.ScriptField(