Rollup/DataFrame: disallow partial results (#41114)

disallow partial results in rollup and data frame, after this change the client throws an error directly
replacing the previous runtime exception thrown, allowing better error handling in implementations.
This commit is contained in:
Hendrik Muhs 2019-04-12 07:29:55 +02:00
parent e9999dfa1d
commit 3df6798c4c
3 changed files with 26 additions and 18 deletions

View File

@ -14,7 +14,6 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
@ -150,8 +149,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
// fire off the search. Note this is async, the method will return from here
executor.execute(() -> {
onStart(now, ActionListener.wrap(r -> {
stats.markStartSearch();
doNextSearch(buildSearchRequest(), ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure));
nextSearch(ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure));
}, e -> {
finishAndSetState();
onFailure(e);
@ -305,10 +303,9 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
if (checkState(getState()) == false) {
return;
}
if (searchResponse.getShardFailures().length != 0) {
throw new RuntimeException("Shard failures encountered while running indexer for job [" + getJobId() + "]: "
+ Arrays.toString(searchResponse.getShardFailures()));
}
// allowPartialSearchResults is set to false, so we should never see shard failures here
assert (searchResponse.getShardFailures().length == 0);
stats.incrementNumPages(1);
IterationResult<JobPosition> iterationResult = doProcess(searchResponse);
@ -362,18 +359,23 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
// TODO probably something more intelligent than every-50 is needed
if (stats.getNumPages() > 0 && stats.getNumPages() % 50 == 0) {
doSaveState(IndexerState.INDEXING, position, () -> {
stats.markStartSearch();
doNextSearch(buildSearchRequest(), listener);
nextSearch(listener);
});
} else {
stats.markStartSearch();
doNextSearch(buildSearchRequest(), listener);
nextSearch(listener);
}
} catch (Exception e) {
finishWithIndexingFailure(e);
}
}
private void nextSearch(ActionListener<SearchResponse> listener) {
stats.markStartSearch();
// ensure that partial results are not accepted and cause a search failure
SearchRequest searchRequest = buildSearchRequest().allowPartialSearchResults(false);
doNextSearch(searchRequest, listener);
}
/**
* Checks the {@link IndexerState} and returns false if the execution should be
* stopped.

View File

@ -72,7 +72,7 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
protected SearchRequest buildSearchRequest() {
assertThat(step, equalTo(1));
++step;
return null;
return new SearchRequest();
}
@Override
@ -151,7 +151,7 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
protected SearchRequest buildSearchRequest() {
assertThat(step, equalTo(1));
++step;
return null;
return new SearchRequest();
}
@Override

View File

@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchResponseSections;
@ -197,7 +198,13 @@ public class RollupIndexerStateTests extends ESTestCase {
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
nextPhase.onResponse(searchFunction.apply(request));
try {
SearchResponse response = searchFunction.apply(request);
nextPhase.onResponse(response);
} catch (Exception e) {
nextPhase.onFailure(e);
}
}
@Override
@ -800,15 +807,14 @@ public class RollupIndexerStateTests extends ESTestCase {
RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> {
ShardSearchFailure[] failures = new ShardSearchFailure[]{new ShardSearchFailure(new RuntimeException("failed"))};
return new SearchResponse(null, null, 1, 1, 0, 0,
failures, null);
throw new SearchPhaseExecutionException("query", "Partial shards failure",
new ShardSearchFailure[] { new ShardSearchFailure(new RuntimeException("failed")) });
};
Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
Consumer<Exception> failureConsumer = e -> {
assertThat(e.getMessage(), startsWith("Shard failures encountered while running indexer for job"));
assertThat(e.getMessage(), startsWith("Partial shards failure"));
isFinished.set(true);
};