From f410f91f135f640838b2de56753b20cb5528f420 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Wed, 8 May 2019 12:37:43 -0400 Subject: [PATCH] Cleanup RollupSearch exceptions, disallow partial results (#41272) - msearch exceptions should be thrown directly instead of wrapping in a RuntimeException - Do not allow partial results (where some indices are missing), instead throw an exception if any index is missing --- .../rollup/RollupResponseTranslator.java | 104 ++++++++++-------- .../action/TransportRollupSearchAction.java | 2 +- .../RollupResponseTranslationTests.java | 45 ++++---- .../rollup/action/SearchActionTests.java | 6 +- .../test/rollup/rollup_search.yml | 45 ++++++++ 5 files changed, 129 insertions(+), 73 deletions(-) diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java index e900d76c849..4a8d007e3b8 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.rollup; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; @@ -61,9 +62,9 @@ public class RollupResponseTranslator { * Verifies a live-only search response. Essentially just checks for failure then returns * the response since we have no work to do */ - public static SearchResponse verifyResponse(MultiSearchResponse.Item normalResponse) { + public static SearchResponse verifyResponse(MultiSearchResponse.Item normalResponse) throws Exception { if (normalResponse.isFailure()) { - throw new RuntimeException(normalResponse.getFailureMessage(), normalResponse.getFailure()); + throw normalResponse.getFailure(); } return normalResponse.getResponse(); } @@ -77,16 +78,30 @@ public class RollupResponseTranslator { * on the translation conventions */ public static SearchResponse translateResponse(MultiSearchResponse.Item[] rolledMsearch, - InternalAggregation.ReduceContext reduceContext) { + InternalAggregation.ReduceContext reduceContext) throws Exception { - List responses = Arrays.stream(rolledMsearch) - .map(item -> { - if (item.isFailure()) { - throw new RuntimeException(item.getFailureMessage(), item.getFailure()); - } - return item.getResponse(); - }).collect(Collectors.toList()); + assert rolledMsearch.length > 0; + List responses = new ArrayList<>(); + for (MultiSearchResponse.Item item : rolledMsearch) { + if (item.isFailure()) { + Exception e = item.getFailure(); + // If an index was deleted after execution, give a hint to the user that this is a transient error + if (e instanceof IndexNotFoundException) { + throw new ResourceNotFoundException("Index [" + ((IndexNotFoundException) e).getIndex().getName() + + "] was not found, likely because it was deleted while the request was in-flight. " + + "Rollup does not support partial search results, please try the request again."); + } + + // Otherwise just throw + throw e; + } + + // No error, add to responses + responses.add(item.getResponse()); + } + + assert responses.size() > 0; return doCombineResponse(null, responses, reduceContext); } @@ -187,48 +202,45 @@ public class RollupResponseTranslator { * @param msearchResponses The responses from the msearch, where the first response is the live-index response */ public static SearchResponse combineResponses(MultiSearchResponse.Item[] msearchResponses, - InternalAggregation.ReduceContext reduceContext) { - boolean liveMissing = false; + InternalAggregation.ReduceContext reduceContext) throws Exception { + assert msearchResponses.length >= 2; - // The live response is always first - MultiSearchResponse.Item liveResponse = msearchResponses[0]; - if (liveResponse.isFailure()) { - Exception e = liveResponse.getFailure(); - // If we have a rollup response we can tolerate a missing live response - if (e instanceof IndexNotFoundException) { - logger.warn("\"Live\" index not found during rollup search.", e); - liveMissing = true; - } else { - throw new RuntimeException(liveResponse.getFailureMessage(), liveResponse.getFailure()); + boolean first = true; + SearchResponse liveResponse = null; + List rolledResponses = new ArrayList<>(); + for (MultiSearchResponse.Item item : msearchResponses) { + if (item.isFailure()) { + Exception e = item.getFailure(); + + // If an index was deleted after execution, give a hint to the user that this is a transient error + if (e instanceof IndexNotFoundException) { + throw new ResourceNotFoundException("Index [" + ((IndexNotFoundException) e).getIndex() + "] was not found, " + + "likely because it was deleted while the request was in-flight. Rollup does not support partial search results, " + + "please try the request again.", e); + } + + // Otherwise just throw + throw e; } - } - List rolledResponses = Arrays.stream(msearchResponses) - .skip(1) - .map(item -> { - if (item.isFailure()) { - Exception e = item.getFailure(); - // If we have a normal response we can tolerate a missing rollup response, although it theoretically - // should be handled by a different code path (verifyResponse) - if (e instanceof IndexNotFoundException) { - logger.warn("Rollup index not found during rollup search.", e); - } else { - throw new RuntimeException(item.getFailureMessage(), item.getFailure()); - } - return null; - } else { - return item.getResponse(); - } - }).filter(Objects::nonNull).collect(Collectors.toList()); - // If we only have a live index left, process it directly - if (rolledResponses.isEmpty() && liveMissing == false) { - return verifyResponse(liveResponse); - } else if (rolledResponses.isEmpty() && liveMissing) { - throw new RuntimeException("No indices (live or rollup) found during rollup search"); + // No error, add to responses + if (first) { + liveResponse = item.getResponse(); + } else { + rolledResponses.add(item.getResponse()); + } + first = false; } - return doCombineResponse(liveResponse.getResponse(), rolledResponses, reduceContext); + // If we only have a live index left, just return it directly. We know it can't be an error already + if (rolledResponses.isEmpty() && liveResponse != null) { + return liveResponse; + } else if (rolledResponses.isEmpty()) { + throw new ResourceNotFoundException("No indices (live or rollup) found during rollup search"); + } + + return doCombineResponse(liveResponse, rolledResponses, reduceContext); } private static SearchResponse doCombineResponse(SearchResponse liveResponse, List rolledResponses, diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportRollupSearchAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportRollupSearchAction.java index 414a0d08ef3..2a1308353d6 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportRollupSearchAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportRollupSearchAction.java @@ -111,7 +111,7 @@ public class TransportRollupSearchAction extends TransportAction RollupResponseTranslator.combineResponses(failure, + () -> RollupResponseTranslator.translateResponse(failure, new InternalAggregation.ReduceContext(bigArrays, scriptService, true))); assertThat(e.getMessage(), equalTo("rollup failure")); } @@ -129,13 +129,14 @@ public class RollupResponseTranslationTests extends AggregatorTestCase { BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); ScriptService scriptService = mock(ScriptService.class); - Exception e = expectThrows(RuntimeException.class, + ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class, () -> RollupResponseTranslator.combineResponses(failure, new InternalAggregation.ReduceContext(bigArrays, scriptService, true))); - assertThat(e.getMessage(), equalTo("No indices (live or rollup) found during rollup search")); + assertThat(e.getMessage(), equalTo("Index [[foo]] was not found, likely because it was deleted while the request was in-flight. " + + "Rollup does not support partial search results, please try the request again.")); } - public void testMissingLiveIndex() { + public void testMissingLiveIndex() throws Exception { SearchResponse responseWithout = mock(SearchResponse.class); when(responseWithout.getTook()).thenReturn(new TimeValue(100)); List aggTree = new ArrayList<>(1); @@ -174,16 +175,13 @@ public class RollupResponseTranslationTests extends AggregatorTestCase { BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); ScriptService scriptService = mock(ScriptService.class); - SearchResponse response = RollupResponseTranslator.combineResponses(msearch, - new InternalAggregation.ReduceContext(bigArrays, scriptService, true)); - assertNotNull(response); - Aggregations responseAggs = response.getAggregations(); - assertNotNull(responseAggs); - Avg avg = responseAggs.get("foo"); - assertThat(avg.getValue(), equalTo(5.0)); + ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class, () -> RollupResponseTranslator.combineResponses(msearch, + new InternalAggregation.ReduceContext(bigArrays, scriptService, true))); + assertThat(e.getMessage(), equalTo("Index [[foo]] was not found, likely because it was deleted while the request was in-flight. " + + "Rollup does not support partial search results, please try the request again.")); } - public void testRolledMissingAggs() { + public void testRolledMissingAggs() throws Exception { SearchResponse responseWithout = mock(SearchResponse.class); when(responseWithout.getTook()).thenReturn(new TimeValue(100)); @@ -191,13 +189,12 @@ public class RollupResponseTranslationTests extends AggregatorTestCase { when(responseWithout.getAggregations()).thenReturn(mockAggsWithout); MultiSearchResponse.Item[] msearch = new MultiSearchResponse.Item[]{ - new MultiSearchResponse.Item(null, new IndexNotFoundException("foo")), new MultiSearchResponse.Item(responseWithout, null)}; BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); ScriptService scriptService = mock(ScriptService.class); - SearchResponse response = RollupResponseTranslator.combineResponses(msearch, + SearchResponse response = RollupResponseTranslator.translateResponse(msearch, new InternalAggregation.ReduceContext(bigArrays, scriptService, true)); assertNotNull(response); Aggregations responseAggs = response.getAggregations(); @@ -214,12 +211,13 @@ public class RollupResponseTranslationTests extends AggregatorTestCase { BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); ScriptService scriptService = mock(ScriptService.class); - SearchResponse finalResponse = RollupResponseTranslator.combineResponses(msearch, - new InternalAggregation.ReduceContext(bigArrays, scriptService, true)); - assertThat(finalResponse, equalTo(response)); + ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class, () -> RollupResponseTranslator.combineResponses(msearch, + new InternalAggregation.ReduceContext(bigArrays, scriptService, true))); + assertThat(e.getMessage(), equalTo("Index [[foo]] was not found, likely because it was deleted while the request was in-flight. " + + "Rollup does not support partial search results, please try the request again.")); } - public void testVerifyNormal() { + public void testVerifyNormal() throws Exception { SearchResponse response = mock(SearchResponse.class); MultiSearchResponse.Item item = new MultiSearchResponse.Item(response, null); @@ -234,7 +232,7 @@ public class RollupResponseTranslationTests extends AggregatorTestCase { assertThat(e.getMessage(), equalTo("no such index [foo]")); } - public void testTranslateRollup() { + public void testTranslateRollup() throws Exception { SearchResponse response = mock(SearchResponse.class); when(response.getTook()).thenReturn(new TimeValue(100)); List aggTree = new ArrayList<>(1); @@ -285,9 +283,10 @@ public class RollupResponseTranslationTests extends AggregatorTestCase { ScriptService scriptService = mock(ScriptService.class); InternalAggregation.ReduceContext context = new InternalAggregation.ReduceContext(bigArrays, scriptService, true); - Exception e = expectThrows(RuntimeException.class, + ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class, () -> RollupResponseTranslator.translateResponse(new MultiSearchResponse.Item[]{missing}, context)); - assertThat(e.getMessage(), equalTo("no such index [foo]")); + assertThat(e.getMessage(), equalTo("Index [foo] was not found, likely because it was deleted while the request was in-flight. " + + "Rollup does not support partial search results, please try the request again.")); } public void testMissingFilter() { @@ -350,7 +349,7 @@ public class RollupResponseTranslationTests extends AggregatorTestCase { equalTo("Expected [filter_foo] to be a FilterAggregation, but was [InternalMax]")); } - public void testSimpleReduction() { + public void testSimpleReduction() throws Exception { SearchResponse protoResponse = mock(SearchResponse.class); when(protoResponse.getTook()).thenReturn(new TimeValue(100)); List protoAggTree = new ArrayList<>(1); diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/SearchActionTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/SearchActionTests.java index a795edca83e..448e901997f 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/SearchActionTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/SearchActionTests.java @@ -584,7 +584,7 @@ public class SearchActionTests extends ESTestCase { assertThat(result.getJobCaps().size(), equalTo(1)); } - public void testLiveOnlyProcess() { + public void testLiveOnlyProcess() throws Exception { String[] indices = new String[]{"foo"}; IndexMetaData indexMetaData = mock(IndexMetaData.class); ImmutableOpenMap.Builder meta = ImmutableOpenMap.builder(1); @@ -601,7 +601,7 @@ public class SearchActionTests extends ESTestCase { assertThat(r, equalTo(response)); } - public void testRollupOnly() throws IOException { + public void testRollupOnly() throws Exception { String[] indices = new String[]{"foo"}; String jobName = randomAlphaOfLength(5); @@ -701,7 +701,7 @@ public class SearchActionTests extends ESTestCase { assertThat(e.getMessage(), equalTo("MSearch response was empty, cannot unroll RollupSearch results")); } - public void testBoth() throws IOException { + public void testBoth() throws Exception { String[] indices = new String[]{"foo", "bar"}; String jobName = randomAlphaOfLength(5); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/rollup_search.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/rollup_search.yml index 0f3488d146a..e93a0deb037 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/rollup_search.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/rollup_search.yml @@ -1217,4 +1217,49 @@ setup: - match: { aggregations.date_histogram#histo.buckets.3.doc_count: 20 } - match: { aggregations.date_histogram#histo.buckets.3.max#the_max.value: 4 } +--- +"Search error against live index": + + - do: + catch: bad_request + rollup.rollup_search: + index: "foo" + body: + size: 0 + aggs: + histo: + date_histogram: + field: "timestamp" + interval: "asdfasdf" + + +--- +"Search error against rollup and live index": + + - do: + catch: bad_request + rollup.rollup_search: + index: "foo*" + body: + size: 0 + aggs: + histo: + date_histogram: + field: "timestamp" + interval: "asdfasdf" + +--- +"Search error no matching indices": + + - do: + catch: /Must specify at least one concrete index/ + rollup.rollup_search: + index: "bar*" + body: + size: 0 + aggs: + histo: + date_histogram: + field: "timestamp" + interval: "1h"