[ML] Retry after broken scroll (elastic/x-pack-elasticsearch#1713)

Original commit: elastic/x-pack-elasticsearch@b4fc329c52
This commit is contained in:
David Kyle 2017-06-14 15:04:14 +01:00 committed by GitHub
parent 4e085f03b7
commit 1010f73ae7
3 changed files with 101 additions and 12 deletions

View File

@ -65,7 +65,7 @@ public class DatafeedJobBuilder {
}
);
// Step 3. Create data extractory factory
// Step 3. Create data extractor factory
Consumer<DataCounts> dataCountsHandler = dataCounts -> {
if (dataCounts.getLatestRecordTimeStamp() != null) {
context.latestRecordTimeMs = dataCounts.getLatestRecordTimeStamp().getTime();

View File

@ -12,6 +12,7 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.script.Script;
@ -50,11 +51,14 @@ class ScrollDataExtractor implements DataExtractor {
private boolean isCancelled;
private boolean hasNext;
private Long timestampOnCancel;
private Long lastTimestamp;
private boolean searchHasShardFailure;
ScrollDataExtractor(Client client, ScrollDataExtractorContext dataExtractorContext) {
this.client = Objects.requireNonNull(client);
this.context = Objects.requireNonNull(dataExtractorContext);
this.hasNext = true;
context = Objects.requireNonNull(dataExtractorContext);
hasNext = true;
searchHasShardFailure = false;
}
@Override
@ -78,16 +82,17 @@ class ScrollDataExtractor implements DataExtractor {
if (!hasNext()) {
throw new NoSuchElementException();
}
Optional<InputStream> stream = scrollId == null ? Optional.ofNullable(initScroll()) : Optional.ofNullable(continueScroll());
Optional<InputStream> stream = scrollId == null ?
Optional.ofNullable(initScroll(context.start)) : Optional.ofNullable(continueScroll());
if (!stream.isPresent()) {
hasNext = false;
}
return stream;
}
private InputStream initScroll() throws IOException {
protected InputStream initScroll(long startTimestamp) throws IOException {
LOGGER.debug("[{}] Initializing scroll", context.jobId);
SearchResponse searchResponse = executeSearchRequest(buildSearchRequest());
SearchResponse searchResponse = executeSearchRequest(buildSearchRequest(startTimestamp));
return processSearchResponse(searchResponse);
}
@ -95,7 +100,7 @@ class ScrollDataExtractor implements DataExtractor {
return searchRequestBuilder.get();
}
private SearchRequestBuilder buildSearchRequest() {
private SearchRequestBuilder buildSearchRequest(long start) {
SearchRequestBuilder searchRequestBuilder = SearchAction.INSTANCE.newRequestBuilder(client)
.setScroll(SCROLL_TIMEOUT)
.addSort(context.extractedFields.timeField(), SortOrder.ASC)
@ -103,7 +108,7 @@ class ScrollDataExtractor implements DataExtractor {
.setTypes(context.types)
.setSize(context.scrollSize)
.setQuery(ExtractorUtils.wrapInTimeRangeQuery(
context.query, context.extractedFields.timeField(), context.start, context.end));
context.query, context.extractedFields.timeField(), start, context.end));
for (String docValueField : context.extractedFields.getDocValueFields()) {
searchRequestBuilder.addDocValueField(docValueField);
@ -136,6 +141,18 @@ class ScrollDataExtractor implements DataExtractor {
}
private InputStream processSearchResponse(SearchResponse searchResponse) throws IOException {
if (searchResponse.getFailedShards() > 0 && searchHasShardFailure == false) {
// This could be a transient error with the scroll Id.
// Reinitialise the scroll and try again but only once.
LOGGER.debug("[{}] Resetting scroll search after shard failure", context.jobId);
resetScroll();
if (lastTimestamp != null) {
lastTimestamp++;
}
searchHasShardFailure = true;
return initScroll(lastTimestamp == null ? context.start : lastTimestamp);
}
ExtractorUtils.checkSearchWasSuccessful(context.jobId, searchResponse);
scrollId = searchResponse.getScrollId();
if (searchResponse.getHits().getHits().length == 0) {
@ -161,6 +178,8 @@ class ScrollDataExtractor implements DataExtractor {
}
hitProcessor.process(hit);
}
SearchHit lastHit = searchResponse.getHits().getHits()[searchResponse.getHits().getHits().length -1];
lastTimestamp = context.extractedFields.timeFieldValue(lastHit);
}
return new ByteArrayInputStream(outputStream.toByteArray());
}
@ -178,6 +197,11 @@ class ScrollDataExtractor implements DataExtractor {
.get();
}
private void resetScroll() {
clearScroll(scrollId);
scrollId = null;
}
void clearScroll(String scrollId) {
ClearScrollAction.INSTANCE.newRequestBuilder(client).addScrollId(scrollId).get();
}

View File

@ -30,9 +30,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
@ -55,10 +57,11 @@ public class ScrollDataExtractorTests extends ESTestCase {
private QueryBuilder query;
private List<SearchSourceBuilder.ScriptField> scriptFields;
private int scrollSize;
private long initScrollStartTime;
private class TestDataExtractor extends ScrollDataExtractor {
private SearchResponse nextResponse;
private Queue<SearchResponse> responses = new LinkedList<>();
TestDataExtractor(long start, long end) {
this(createContext(start, end));
@ -68,16 +71,22 @@ public class ScrollDataExtractorTests extends ESTestCase {
super(client, context);
}
@Override
protected InputStream initScroll(long startTimestamp) throws IOException {
initScrollStartTime = startTimestamp;
return super.initScroll(startTimestamp);
}
@Override
protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) {
capturedSearchRequests.add(searchRequestBuilder);
return nextResponse;
return responses.remove();
}
@Override
protected SearchResponse executeSearchScrollRequest(String scrollId) {
capturedContinueScrollIds.add(scrollId);
return nextResponse;
return responses.remove();
}
@Override
@ -86,7 +95,11 @@ public class ScrollDataExtractorTests extends ESTestCase {
}
void setNextResponse(SearchResponse searchResponse) {
nextResponse = searchResponse;
responses.add(searchResponse);
}
public long getInitScrollStartTime() {
return initScrollStartTime;
}
}
@ -263,6 +276,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
public void testExtractionGivenInitSearchResponseHasShardFailures() throws IOException {
TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L);
extractor.setNextResponse(createResponseWithShardFailures());
extractor.setNextResponse(createResponseWithShardFailures());
assertThat(extractor.hasNext(), is(true));
expectThrows(IOException.class, () -> extractor.next());
@ -271,12 +285,61 @@ public class ScrollDataExtractorTests extends ESTestCase {
public void testExtractionGivenInitSearchResponseEncounteredUnavailableShards() throws IOException {
TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L);
extractor.setNextResponse(createResponseWithUnavailableShards(1));
extractor.setNextResponse(createResponseWithUnavailableShards(1));
assertThat(extractor.hasNext(), is(true));
IOException e = expectThrows(IOException.class, () -> extractor.next());
assertThat(e.getMessage(), equalTo("[" + jobId + "] Search request encountered [1] unavailable shards"));
}
public void testResetScrollAfterFailure() throws IOException {
TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L);
SearchResponse goodResponse = createSearchResponse(
Arrays.asList(1100L, 1200L),
Arrays.asList("a1", "a2"),
Arrays.asList("b1", "b2")
);
extractor.setNextResponse(goodResponse);
extractor.setNextResponse(createResponseWithShardFailures());
extractor.setNextResponse(goodResponse);
extractor.setNextResponse(createResponseWithShardFailures());
// first response is good
assertThat(extractor.hasNext(), is(true));
Optional<InputStream> output = extractor.next();
assertThat(output.isPresent(), is(true));
// this should recover from the first shard failure and try again
assertThat(extractor.hasNext(), is(true));
output = extractor.next();
assertThat(output.isPresent(), is(true));
// A second failure is not tolerated
assertThat(extractor.hasNext(), is(true));
expectThrows(IOException.class, () -> extractor.next());
}
public void testResetScollUsesLastResultTimestamp() throws IOException {
TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L);
SearchResponse goodResponse = createSearchResponse(
Arrays.asList(1100L, 1200L),
Arrays.asList("a1", "a2"),
Arrays.asList("b1", "b2")
);
extractor.setNextResponse(goodResponse);
extractor.setNextResponse(createResponseWithShardFailures());
extractor.setNextResponse(createResponseWithShardFailures());
Optional<InputStream> output = extractor.next();
assertThat(output.isPresent(), is(true));
assertEquals(1000L, extractor.getInitScrollStartTime());
expectThrows(IOException.class, () -> extractor.next());
// the new start time after error is the last record timestamp +1
assertEquals(1201L, extractor.getInitScrollStartTime());
}
public void testDomainSplitScriptField() throws IOException {
SearchSourceBuilder.ScriptField withoutSplit = new SearchSourceBuilder.ScriptField(
@ -369,6 +432,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
when(searchResponse.status()).thenReturn(RestStatus.OK);
when(searchResponse.getShardFailures()).thenReturn(
new ShardSearchFailure[] { new ShardSearchFailure(new RuntimeException("shard failed"))});
when(searchResponse.getFailedShards()).thenReturn(1);
return searchResponse;
}
@ -377,6 +441,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
when(searchResponse.status()).thenReturn(RestStatus.OK);
when(searchResponse.getSuccessfulShards()).thenReturn(2);
when(searchResponse.getTotalShards()).thenReturn(2 + unavailableShards);
when(searchResponse.getFailedShards()).thenReturn(unavailableShards);
return searchResponse;
}