[ML] Retry after SearchPhaseExecutionException in ScrollDataExtractor (elastic/x-pack-elasticsearch#1788)

Original commit: elastic/x-pack-elasticsearch@bbe287b9c3
This commit is contained in:
David Kyle 2017-06-21 08:55:09 +01:00 committed by GitHub
parent 8103a1bf8b
commit 410b210736
2 changed files with 72 additions and 12 deletions

View File

@ -8,11 +8,11 @@ package org.elasticsearch.xpack.ml.datafeed.extractor.scroll;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.ClearScrollAction;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.script.Script;
@ -51,7 +51,7 @@ class ScrollDataExtractor implements DataExtractor {
private boolean isCancelled;
private boolean hasNext;
private Long timestampOnCancel;
private Long lastTimestamp;
protected Long lastTimestamp;
private boolean searchHasShardFailure;
ScrollDataExtractor(Client client, ScrollDataExtractorContext dataExtractorContext) {
@ -141,15 +141,10 @@ class ScrollDataExtractor implements DataExtractor {
}
private InputStream processSearchResponse(SearchResponse searchResponse) throws IOException {
if (searchResponse.getFailedShards() > 0 && searchHasShardFailure == false) {
// This could be a transient error with the scroll Id.
// Reinitialise the scroll and try again but only once.
LOGGER.debug("[{}] Resetting scroll search after shard failure", context.jobId);
resetScroll();
if (lastTimestamp != null) {
lastTimestamp++;
}
searchHasShardFailure = true;
markScrollAsErrored();
return initScroll(lastTimestamp == null ? context.start : lastTimestamp);
}
@ -186,10 +181,31 @@ class ScrollDataExtractor implements DataExtractor {
private InputStream continueScroll() throws IOException {
LOGGER.debug("[{}] Continuing scroll with id [{}]", context.jobId, scrollId);
SearchResponse searchResponse = executeSearchScrollRequest(scrollId);
SearchResponse searchResponse = null;
try {
searchResponse = executeSearchScrollRequest(scrollId);
} catch (SearchPhaseExecutionException searchExecutionException) {
if (searchHasShardFailure == false) {
LOGGER.debug("[{}] Reinitializing scroll due to SearchPhaseExecutionException", context.jobId);
markScrollAsErrored();
searchResponse = executeSearchRequest(buildSearchRequest(lastTimestamp == null ? context.start : lastTimestamp));
} else {
throw searchExecutionException;
}
}
return processSearchResponse(searchResponse);
}
private void markScrollAsErrored() {
// This could be a transient error with the scroll Id.
// Reinitialise the scroll and try again but only once.
resetScroll();
if (lastTimestamp != null) {
lastTimestamp++;
}
searchHasShardFailure = true;
}
protected SearchResponse executeSearchScrollRequest(String scrollId) {
return SearchScrollAction.INSTANCE.newRequestBuilder(client)
.setScroll(SCROLL_TIMEOUT)

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.scroll;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
@ -86,7 +87,12 @@ public class ScrollDataExtractorTests extends ESTestCase {
@Override
protected SearchResponse executeSearchScrollRequest(String scrollId) {
capturedContinueScrollIds.add(scrollId);
return responses.remove();
SearchResponse searchResponse = responses.remove();
if (searchResponse == null) {
throw new SearchPhaseExecutionException("foo", "bar", new ShardSearchFailure[] {});
} else {
return searchResponse;
}
}
@Override
@ -101,6 +107,10 @@ public class ScrollDataExtractorTests extends ESTestCase {
public long getInitScrollStartTime() {
return initScrollStartTime;
}
public Long getLastTimestamp() {
return lastTimestamp;
}
}
@Before
@ -292,7 +302,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
assertThat(e.getMessage(), equalTo("[" + jobId + "] Search request encountered [1] unavailable shards"));
}
public void testResetScrollAfterFailure() throws IOException {
public void testResetScrollAfterShardFailure() throws IOException {
TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L);
SearchResponse goodResponse = createSearchResponse(
@ -340,6 +350,40 @@ public class ScrollDataExtractorTests extends ESTestCase {
assertEquals(1201L, extractor.getInitScrollStartTime());
}
public void testResetScrollAfterSearchPhaseExecutionException() throws IOException {
TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L);
SearchResponse firstResponse = createSearchResponse(
Arrays.asList(1100L, 1200L),
Arrays.asList("a1", "a2"),
Arrays.asList("b1", "b2")
);
SearchResponse secondResponse = createSearchResponse(
Arrays.asList(1300L, 1400L),
Arrays.asList("a1", "a2"),
Arrays.asList("b1", "b2")
);
extractor.setNextResponse(firstResponse);
extractor.setNextResponse(null); // this will throw a SearchPhaseExecutionException
extractor.setNextResponse(secondResponse);
extractor.setNextResponse(null); // this will throw a SearchPhaseExecutionException
// first response is good
assertThat(extractor.hasNext(), is(true));
Optional<InputStream> output = extractor.next();
assertThat(output.isPresent(), is(true));
// this should recover from the SearchPhaseExecutionException and try again
assertThat(extractor.hasNext(), is(true));
output = extractor.next();
assertThat(output.isPresent(), is(true));
assertEquals(new Long(1400L), extractor.getLastTimestamp());
// A second failure is not tolerated
assertThat(extractor.hasNext(), is(true));
expectThrows(SearchPhaseExecutionException.class, () -> extractor.next());
}
public void testDomainSplitScriptField() throws IOException {
SearchSourceBuilder.ScriptField withoutSplit = new SearchSourceBuilder.ScriptField(