Cleanup RollupSearch exceptions, disallow partial results (#41272)

- msearch exceptions should be thrown directly instead of wrapping
in a RuntimeException
- Do not allow partial results (where some indices are missing), 
instead throw an exception if any index is missing
This commit is contained in:
Zachary Tong 2019-05-08 12:37:43 -04:00 committed by Zachary Tong
parent 374ce3e6a8
commit f410f91f13
5 changed files with 129 additions and 73 deletions

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.rollup;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
@ -61,9 +62,9 @@ public class RollupResponseTranslator {
* Verifies a live-only search response. Essentially just checks for failure then returns
* the response since we have no work to do
*/
public static SearchResponse verifyResponse(MultiSearchResponse.Item normalResponse) {
public static SearchResponse verifyResponse(MultiSearchResponse.Item normalResponse) throws Exception {
if (normalResponse.isFailure()) {
throw new RuntimeException(normalResponse.getFailureMessage(), normalResponse.getFailure());
throw normalResponse.getFailure();
}
return normalResponse.getResponse();
}
@ -77,16 +78,30 @@ public class RollupResponseTranslator {
* on the translation conventions
*/
public static SearchResponse translateResponse(MultiSearchResponse.Item[] rolledMsearch,
InternalAggregation.ReduceContext reduceContext) {
InternalAggregation.ReduceContext reduceContext) throws Exception {
List<SearchResponse> responses = Arrays.stream(rolledMsearch)
.map(item -> {
if (item.isFailure()) {
throw new RuntimeException(item.getFailureMessage(), item.getFailure());
}
return item.getResponse();
}).collect(Collectors.toList());
assert rolledMsearch.length > 0;
List<SearchResponse> responses = new ArrayList<>();
for (MultiSearchResponse.Item item : rolledMsearch) {
if (item.isFailure()) {
Exception e = item.getFailure();
// If an index was deleted after execution, give a hint to the user that this is a transient error
if (e instanceof IndexNotFoundException) {
throw new ResourceNotFoundException("Index [" + ((IndexNotFoundException) e).getIndex().getName()
+ "] was not found, likely because it was deleted while the request was in-flight. " +
"Rollup does not support partial search results, please try the request again.");
}
// Otherwise just throw
throw e;
}
// No error, add to responses
responses.add(item.getResponse());
}
assert responses.size() > 0;
return doCombineResponse(null, responses, reduceContext);
}
@ -187,48 +202,45 @@ public class RollupResponseTranslator {
* @param msearchResponses The responses from the msearch, where the first response is the live-index response
*/
public static SearchResponse combineResponses(MultiSearchResponse.Item[] msearchResponses,
InternalAggregation.ReduceContext reduceContext) {
boolean liveMissing = false;
InternalAggregation.ReduceContext reduceContext) throws Exception {
assert msearchResponses.length >= 2;
// The live response is always first
MultiSearchResponse.Item liveResponse = msearchResponses[0];
if (liveResponse.isFailure()) {
Exception e = liveResponse.getFailure();
// If we have a rollup response we can tolerate a missing live response
if (e instanceof IndexNotFoundException) {
logger.warn("\"Live\" index not found during rollup search.", e);
liveMissing = true;
} else {
throw new RuntimeException(liveResponse.getFailureMessage(), liveResponse.getFailure());
boolean first = true;
SearchResponse liveResponse = null;
List<SearchResponse> rolledResponses = new ArrayList<>();
for (MultiSearchResponse.Item item : msearchResponses) {
if (item.isFailure()) {
Exception e = item.getFailure();
// If an index was deleted after execution, give a hint to the user that this is a transient error
if (e instanceof IndexNotFoundException) {
throw new ResourceNotFoundException("Index [" + ((IndexNotFoundException) e).getIndex() + "] was not found, " +
"likely because it was deleted while the request was in-flight. Rollup does not support partial search results, " +
"please try the request again.", e);
}
// Otherwise just throw
throw e;
}
}
List<SearchResponse> rolledResponses = Arrays.stream(msearchResponses)
.skip(1)
.map(item -> {
if (item.isFailure()) {
Exception e = item.getFailure();
// If we have a normal response we can tolerate a missing rollup response, although it theoretically
// should be handled by a different code path (verifyResponse)
if (e instanceof IndexNotFoundException) {
logger.warn("Rollup index not found during rollup search.", e);
} else {
throw new RuntimeException(item.getFailureMessage(), item.getFailure());
}
return null;
} else {
return item.getResponse();
}
}).filter(Objects::nonNull).collect(Collectors.toList());
// If we only have a live index left, process it directly
if (rolledResponses.isEmpty() && liveMissing == false) {
return verifyResponse(liveResponse);
} else if (rolledResponses.isEmpty() && liveMissing) {
throw new RuntimeException("No indices (live or rollup) found during rollup search");
// No error, add to responses
if (first) {
liveResponse = item.getResponse();
} else {
rolledResponses.add(item.getResponse());
}
first = false;
}
return doCombineResponse(liveResponse.getResponse(), rolledResponses, reduceContext);
// If we only have a live index left, just return it directly. We know it can't be an error already
if (rolledResponses.isEmpty() && liveResponse != null) {
return liveResponse;
} else if (rolledResponses.isEmpty()) {
throw new ResourceNotFoundException("No indices (live or rollup) found during rollup search");
}
return doCombineResponse(liveResponse, rolledResponses, reduceContext);
}
private static SearchResponse doCombineResponse(SearchResponse liveResponse, List<SearchResponse> rolledResponses,

View File

@ -111,7 +111,7 @@ public class TransportRollupSearchAction extends TransportAction<SearchRequest,
}
static SearchResponse processResponses(RollupSearchContext rollupContext, MultiSearchResponse msearchResponse,
InternalAggregation.ReduceContext reduceContext) {
InternalAggregation.ReduceContext reduceContext) throws Exception {
if (rollupContext.hasLiveIndices() && rollupContext.hasRollupIndices()) {
// Both
return RollupResponseTranslator.combineResponses(msearchResponse.getResponses(), reduceContext);

View File

@ -22,6 +22,7 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.CheckedConsumer;
@ -109,14 +110,13 @@ public class RollupResponseTranslationTests extends AggregatorTestCase {
public void testRollupFailure() {
MultiSearchResponse.Item[] failure = new MultiSearchResponse.Item[]{
new MultiSearchResponse.Item(null, new IndexNotFoundException("live missing")),
new MultiSearchResponse.Item(null, new RuntimeException("rollup failure"))};
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
ScriptService scriptService = mock(ScriptService.class);
Exception e = expectThrows(RuntimeException.class,
() -> RollupResponseTranslator.combineResponses(failure,
() -> RollupResponseTranslator.translateResponse(failure,
new InternalAggregation.ReduceContext(bigArrays, scriptService, true)));
assertThat(e.getMessage(), equalTo("rollup failure"));
}
@ -129,13 +129,14 @@ public class RollupResponseTranslationTests extends AggregatorTestCase {
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
ScriptService scriptService = mock(ScriptService.class);
Exception e = expectThrows(RuntimeException.class,
ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class,
() -> RollupResponseTranslator.combineResponses(failure,
new InternalAggregation.ReduceContext(bigArrays, scriptService, true)));
assertThat(e.getMessage(), equalTo("No indices (live or rollup) found during rollup search"));
assertThat(e.getMessage(), equalTo("Index [[foo]] was not found, likely because it was deleted while the request was in-flight. " +
"Rollup does not support partial search results, please try the request again."));
}
public void testMissingLiveIndex() {
public void testMissingLiveIndex() throws Exception {
SearchResponse responseWithout = mock(SearchResponse.class);
when(responseWithout.getTook()).thenReturn(new TimeValue(100));
List<InternalAggregation> aggTree = new ArrayList<>(1);
@ -174,16 +175,13 @@ public class RollupResponseTranslationTests extends AggregatorTestCase {
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
ScriptService scriptService = mock(ScriptService.class);
SearchResponse response = RollupResponseTranslator.combineResponses(msearch,
new InternalAggregation.ReduceContext(bigArrays, scriptService, true));
assertNotNull(response);
Aggregations responseAggs = response.getAggregations();
assertNotNull(responseAggs);
Avg avg = responseAggs.get("foo");
assertThat(avg.getValue(), equalTo(5.0));
ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class, () -> RollupResponseTranslator.combineResponses(msearch,
new InternalAggregation.ReduceContext(bigArrays, scriptService, true)));
assertThat(e.getMessage(), equalTo("Index [[foo]] was not found, likely because it was deleted while the request was in-flight. " +
"Rollup does not support partial search results, please try the request again."));
}
public void testRolledMissingAggs() {
public void testRolledMissingAggs() throws Exception {
SearchResponse responseWithout = mock(SearchResponse.class);
when(responseWithout.getTook()).thenReturn(new TimeValue(100));
@ -191,13 +189,12 @@ public class RollupResponseTranslationTests extends AggregatorTestCase {
when(responseWithout.getAggregations()).thenReturn(mockAggsWithout);
MultiSearchResponse.Item[] msearch = new MultiSearchResponse.Item[]{
new MultiSearchResponse.Item(null, new IndexNotFoundException("foo")),
new MultiSearchResponse.Item(responseWithout, null)};
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
ScriptService scriptService = mock(ScriptService.class);
SearchResponse response = RollupResponseTranslator.combineResponses(msearch,
SearchResponse response = RollupResponseTranslator.translateResponse(msearch,
new InternalAggregation.ReduceContext(bigArrays, scriptService, true));
assertNotNull(response);
Aggregations responseAggs = response.getAggregations();
@ -214,12 +211,13 @@ public class RollupResponseTranslationTests extends AggregatorTestCase {
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
ScriptService scriptService = mock(ScriptService.class);
SearchResponse finalResponse = RollupResponseTranslator.combineResponses(msearch,
new InternalAggregation.ReduceContext(bigArrays, scriptService, true));
assertThat(finalResponse, equalTo(response));
ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class, () -> RollupResponseTranslator.combineResponses(msearch,
new InternalAggregation.ReduceContext(bigArrays, scriptService, true)));
assertThat(e.getMessage(), equalTo("Index [[foo]] was not found, likely because it was deleted while the request was in-flight. " +
"Rollup does not support partial search results, please try the request again."));
}
public void testVerifyNormal() {
public void testVerifyNormal() throws Exception {
SearchResponse response = mock(SearchResponse.class);
MultiSearchResponse.Item item = new MultiSearchResponse.Item(response, null);
@ -234,7 +232,7 @@ public class RollupResponseTranslationTests extends AggregatorTestCase {
assertThat(e.getMessage(), equalTo("no such index [foo]"));
}
public void testTranslateRollup() {
public void testTranslateRollup() throws Exception {
SearchResponse response = mock(SearchResponse.class);
when(response.getTook()).thenReturn(new TimeValue(100));
List<InternalAggregation> aggTree = new ArrayList<>(1);
@ -285,9 +283,10 @@ public class RollupResponseTranslationTests extends AggregatorTestCase {
ScriptService scriptService = mock(ScriptService.class);
InternalAggregation.ReduceContext context = new InternalAggregation.ReduceContext(bigArrays, scriptService, true);
Exception e = expectThrows(RuntimeException.class,
ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class,
() -> RollupResponseTranslator.translateResponse(new MultiSearchResponse.Item[]{missing}, context));
assertThat(e.getMessage(), equalTo("no such index [foo]"));
assertThat(e.getMessage(), equalTo("Index [foo] was not found, likely because it was deleted while the request was in-flight. " +
"Rollup does not support partial search results, please try the request again."));
}
public void testMissingFilter() {
@ -350,7 +349,7 @@ public class RollupResponseTranslationTests extends AggregatorTestCase {
equalTo("Expected [filter_foo] to be a FilterAggregation, but was [InternalMax]"));
}
public void testSimpleReduction() {
public void testSimpleReduction() throws Exception {
SearchResponse protoResponse = mock(SearchResponse.class);
when(protoResponse.getTook()).thenReturn(new TimeValue(100));
List<InternalAggregation> protoAggTree = new ArrayList<>(1);

View File

@ -584,7 +584,7 @@ public class SearchActionTests extends ESTestCase {
assertThat(result.getJobCaps().size(), equalTo(1));
}
public void testLiveOnlyProcess() {
public void testLiveOnlyProcess() throws Exception {
String[] indices = new String[]{"foo"};
IndexMetaData indexMetaData = mock(IndexMetaData.class);
ImmutableOpenMap.Builder<String, IndexMetaData> meta = ImmutableOpenMap.builder(1);
@ -601,7 +601,7 @@ public class SearchActionTests extends ESTestCase {
assertThat(r, equalTo(response));
}
public void testRollupOnly() throws IOException {
public void testRollupOnly() throws Exception {
String[] indices = new String[]{"foo"};
String jobName = randomAlphaOfLength(5);
@ -701,7 +701,7 @@ public class SearchActionTests extends ESTestCase {
assertThat(e.getMessage(), equalTo("MSearch response was empty, cannot unroll RollupSearch results"));
}
public void testBoth() throws IOException {
public void testBoth() throws Exception {
String[] indices = new String[]{"foo", "bar"};
String jobName = randomAlphaOfLength(5);

View File

@ -1217,4 +1217,49 @@ setup:
- match: { aggregations.date_histogram#histo.buckets.3.doc_count: 20 }
- match: { aggregations.date_histogram#histo.buckets.3.max#the_max.value: 4 }
---
"Search error against live index":
- do:
catch: bad_request
rollup.rollup_search:
index: "foo"
body:
size: 0
aggs:
histo:
date_histogram:
field: "timestamp"
interval: "asdfasdf"
---
"Search error against rollup and live index":
- do:
catch: bad_request
rollup.rollup_search:
index: "foo*"
body:
size: 0
aggs:
histo:
date_histogram:
field: "timestamp"
interval: "asdfasdf"
---
"Search error no matching indices":
- do:
catch: /Must specify at least one concrete index/
rollup.rollup_search:
index: "bar*"
body:
size: 0
aggs:
histo:
date_histogram:
field: "timestamp"
interval: "1h"