Handle shard failures in extractors (elastic/elasticsearch#794)

Even though a search response may return a 200 status code, things could
still have gone wrong. A search response may report shard failures.

The datafeed extractors should check for that and report an extraction
error accordingly.

Closes elastic/elasticsearch#775

Original commit: elastic/x-pack-elasticsearch@5d6d899738
This commit is contained in:
Dimitris Athanasiou 2017-01-26 16:01:43 +00:00 committed by GitHub
parent efc47c2a6f
commit 5790a6f152
5 changed files with 102 additions and 14 deletions

View File

@ -5,15 +5,24 @@
*/
package org.elasticsearch.xpack.ml.datafeed.extractor;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.Arrays;
/**
* Collects common utility methods needed by various {@link DataExtractor} implementations
*/
public final class ExtractorUtils {
private static final Logger LOGGER = Loggers.getLogger(ExtractorUtils.class);
private static final String EPOCH_MILLIS = "epoch_millis";
private ExtractorUtils() {}
@ -25,4 +34,23 @@ public final class ExtractorUtils {
QueryBuilder timeQuery = new RangeQueryBuilder(timeField).gte(start).lt(end).format(EPOCH_MILLIS);
return new BoolQueryBuilder().filter(userQuery).filter(timeQuery);
}
/**
* Checks that a {@link SearchResponse} has an OK status code and no shard failures
*/
public static void checkSearchWasSuccessful(String jobId, SearchResponse searchResponse) throws IOException {
if (searchResponse.status() != RestStatus.OK) {
throw new IOException("[" + jobId + "] Search request returned status code: " + searchResponse.status()
+ ". Response was:\n" + searchResponse.toString());
}
ShardSearchFailure[] shardFailures = searchResponse.getShardFailures();
if (shardFailures != null && shardFailures.length > 0) {
LOGGER.error("[{}] Search request returned shard failures: {}", jobId, Arrays.toString(shardFailures));
throw new IOException("[" + jobId + "] Search request returned shard failures; see more info in the logs");
}
int unavailableShards = searchResponse.getTotalShards() - searchResponse.getSuccessfulShards();
if (unavailableShards > 0) {
throw new IOException("[" + jobId + "] Search request encountered [" + unavailableShards + "] unavailable shards");
}
}
}

View File

@ -11,7 +11,6 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
@ -72,11 +71,9 @@ class AggregationDataExtractor implements DataExtractor {
}
private InputStream search() throws IOException {
LOGGER.debug("[{}] Executing aggregated search", context.jobId);
SearchResponse searchResponse = executeSearchRequest(buildSearchRequest());
if (searchResponse.status() != RestStatus.OK) {
throw new IOException("[" + context.jobId + "] Search request returned status code: " + searchResponse.status()
+ ". Response was:\n" + searchResponse.toString());
}
ExtractorUtils.checkSearchWasSuccessful(context.jobId, searchResponse);
return processSearchResponse(searchResponse);
}

View File

@ -14,7 +14,6 @@ import org.elasticsearch.action.search.SearchScrollAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
@ -82,11 +81,8 @@ class ScrollDataExtractor implements DataExtractor {
}
private InputStream initScroll() throws IOException {
LOGGER.debug("[{}] Initializing scroll", context.jobId);
SearchResponse searchResponse = executeSearchRequest(buildSearchRequest());
if (searchResponse.status() != RestStatus.OK) {
throw new IOException("[" + context.jobId + "] Search request returned status code: " + searchResponse.status()
+ ". Response was:\n" + searchResponse.toString());
}
return processSearchResponse(searchResponse);
}
@ -118,6 +114,7 @@ class ScrollDataExtractor implements DataExtractor {
}
private InputStream processSearchResponse(SearchResponse searchResponse) throws IOException {
ExtractorUtils.checkSearchWasSuccessful(context.jobId, searchResponse);
scrollId = searchResponse.getScrollId();
if (searchResponse.getHits().hits().length == 0) {
hasNext = false;
@ -146,11 +143,8 @@ class ScrollDataExtractor implements DataExtractor {
}
private InputStream continueScroll() throws IOException {
LOGGER.debug("[{}] Continuing scroll with id [{}]", context.jobId, scrollId);
SearchResponse searchResponse = executeSearchScrollRequest(scrollId);
if (searchResponse.status() != RestStatus.OK) {
throw new IOException("[" + context.jobId + "] Continue search scroll request with id '" + scrollId + "' returned status code: "
+ searchResponse.status() + ". Response was:\n" + searchResponse.toString());
}
return processSearchResponse(searchResponse);
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
@ -16,6 +17,7 @@ import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorTests;
import org.junit.Before;
import java.io.BufferedReader;
@ -137,6 +139,23 @@ public class AggregationDataExtractorTests extends ESTestCase {
expectThrows(IOException.class, () -> extractor.next());
}
public void testExtractionGivenSearchResponseHasShardFailures() throws IOException {
TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L);
extractor.setNextResponse(createResponseWithShardFailures());
assertThat(extractor.hasNext(), is(true));
IOException e = expectThrows(IOException.class, () -> extractor.next());
}
public void testExtractionGivenInitSearchResponseEncounteredUnavailableShards() throws IOException {
TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L);
extractor.setNextResponse(createResponseWithUnavailableShards(2));
assertThat(extractor.hasNext(), is(true));
IOException e = expectThrows(IOException.class, () -> extractor.next());
assertThat(e.getMessage(), equalTo("[" + jobId + "] Search request encountered [2] unavailable shards"));
}
private AggregationDataExtractorContext createContext(long start, long end) {
return new AggregationDataExtractorContext(jobId, timeField, indexes, types, query, aggs, start, end);
}
@ -162,6 +181,22 @@ public class AggregationDataExtractorTests extends ESTestCase {
return searchResponse;
}
private SearchResponse createResponseWithShardFailures() {
SearchResponse searchResponse = mock(SearchResponse.class);
when(searchResponse.status()).thenReturn(RestStatus.OK);
when(searchResponse.getShardFailures()).thenReturn(
new ShardSearchFailure[] { new ShardSearchFailure(new RuntimeException("shard failed"))});
return searchResponse;
}
private SearchResponse createResponseWithUnavailableShards(int unavailableShards) {
SearchResponse searchResponse = mock(SearchResponse.class);
when(searchResponse.status()).thenReturn(RestStatus.OK);
when(searchResponse.getSuccessfulShards()).thenReturn(3);
when(searchResponse.getTotalShards()).thenReturn(3 + unavailableShards);
return searchResponse;
}
private static String asString(InputStream inputStream) throws IOException {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
return reader.lines().collect(Collectors.joining("\n"));

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.datafeed.extractor.scroll;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
@ -249,6 +250,23 @@ public class ScrollDataExtractorTests extends ESTestCase {
expectThrows(IOException.class, () -> extractor.next());
}
public void testExtractionGivenInitSearchResponseHasShardFailures() throws IOException {
TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L);
extractor.setNextResponse(createResponseWithShardFailures());
assertThat(extractor.hasNext(), is(true));
expectThrows(IOException.class, () -> extractor.next());
}
public void testExtractionGivenInitSearchResponseEncounteredUnavailableShards() throws IOException {
TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L);
extractor.setNextResponse(createResponseWithUnavailableShards(1));
assertThat(extractor.hasNext(), is(true));
IOException e = expectThrows(IOException.class, () -> extractor.next());
assertThat(e.getMessage(), equalTo("[" + jobId + "] Search request encountered [1] unavailable shards"));
}
private ScrollDataExtractorContext createContext(long start, long end) {
return new ScrollDataExtractorContext(jobId, extractedFields, indexes, types, query, scriptFields, scrollSize, start, end);
}
@ -284,6 +302,22 @@ public class ScrollDataExtractorTests extends ESTestCase {
return searchResponse;
}
private SearchResponse createResponseWithShardFailures() {
SearchResponse searchResponse = mock(SearchResponse.class);
when(searchResponse.status()).thenReturn(RestStatus.OK);
when(searchResponse.getShardFailures()).thenReturn(
new ShardSearchFailure[] { new ShardSearchFailure(new RuntimeException("shard failed"))});
return searchResponse;
}
private SearchResponse createResponseWithUnavailableShards(int unavailableShards) {
SearchResponse searchResponse = mock(SearchResponse.class);
when(searchResponse.status()).thenReturn(RestStatus.OK);
when(searchResponse.getSuccessfulShards()).thenReturn(2);
when(searchResponse.getTotalShards()).thenReturn(2 + unavailableShards);
return searchResponse;
}
private static String asString(InputStream inputStream) throws IOException {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
return reader.lines().collect(Collectors.joining("\n"));