From ca912624ec638f5e266056f31d23cab16e2b0249 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 18 Nov 2019 09:57:56 +0100 Subject: [PATCH] [Transform] improve error handling of script errors (#48887) Improve error handling for script errors by treating them as irrecoverable errors, which puts the task immediately into the failed state. Also improve the error extraction to properly report the script error. fixes #48467 --- .../core/transform/TransformMessages.java | 2 + .../TransformTaskFailedStateIT.java | 57 ++++-- .../transforms/TransformIndexer.java | 110 +++++------ .../utils/ExceptionRootCauseFinder.java | 58 ++++++ .../transforms/TransformIndexerTests.java | 187 +++++++++++++----- 5 files changed, 285 insertions(+), 129 deletions(-) create mode 100644 x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java index ac028974e82..9708cd301e4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java @@ -77,6 +77,8 @@ public class TransformMessages { public static final String LOG_TRANSFORM_PIVOT_LOW_PAGE_SIZE_FAILURE = "Insufficient memory for search after repeated page size reductions to [{0}], unable to continue pivot, " + "please simplify job or increase heap size on data nodes."; + public static final String LOG_TRANSFORM_PIVOT_SCRIPT_ERROR = + "Failed to execute script with error: [{0}], stack trace: {1}"; public static final String FAILED_TO_PARSE_TRANSFORM_CHECKPOINTS = "Failed to parse transform checkpoints for [{0}]"; diff --git 
a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java index a9f8a9bc963..033cf57ba84 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java @@ -28,21 +28,28 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.matchesRegex; public class TransformTaskFailedStateIT extends TransformRestTestCase { private final List failureTransforms = new ArrayList<>(); + @Before public void setClusterSettings() throws IOException { // Make sure we never retry on failure to speed up the test // Set logging level to trace // see: https://github.com/elastic/elasticsearch/issues/45562 Request addFailureRetrySetting = new Request("PUT", "/_cluster/settings"); - addFailureRetrySetting.setJsonEntity( - "{\"transient\": {\"xpack.transform.num_transform_failure_retries\": \"" + 0 + "\"," + - "\"logger.org.elasticsearch.action.bulk\": \"info\"," + // reduces bulk failure spam - "\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\"," + - "\"logger.org.elasticsearch.xpack.transform\": \"trace\"}}"); + addFailureRetrySetting + .setJsonEntity( + "{\"transient\": {\"xpack.transform.num_transform_failure_retries\": \"" + + 0 + + "\"," + + "\"logger.org.elasticsearch.action.bulk\": \"info\"," + + // reduces bulk failure spam + "\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\"," + + 
"\"logger.org.elasticsearch.xpack.transform\": \"trace\"}}" + ); client().performRequest(addFailureRetrySetting); } @@ -66,18 +73,22 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase { startDataframeTransform(transformId); awaitState(transformId, TransformStats.State.FAILED); Map fullState = getDataFrameState(transformId); - final String failureReason = "task encountered more than 0 failures; latest failure: " + - "Bulk index experienced failures. See the logs of the node running the transform for details."; + final String failureReason = "task encountered more than 0 failures; latest failure: " + + ".*BulkIndexingException: Bulk index experienced failures. See the logs of the node running the transform for details."; // Verify we have failed for the expected reason - assertThat(XContentMapValues.extractValue("reason", fullState), equalTo(failureReason)); + assertThat((String) XContentMapValues.extractValue("reason", fullState), matchesRegex(failureReason)); // verify that we cannot stop a failed transform ResponseException ex = expectThrows(ResponseException.class, () -> stopTransform(transformId, false)); assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus())); - assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())), - equalTo("Unable to stop transform [test-force-stop-failed-transform] as it is in a failed state with reason [" + - failureReason + - "]. Use force stop to stop the transform.")); + assertThat( + (String) XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())), + matchesRegex( + "Unable to stop transform \\[test-force-stop-failed-transform\\] as it is in a failed state with reason \\[" + + failureReason + + "\\]. Use force stop to stop the transform." 
+ ) + ); // Verify that we can force stop a failed transform stopTransform(transformId, true); @@ -97,20 +108,23 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase { startDataframeTransform(transformId); awaitState(transformId, TransformStats.State.FAILED); Map fullState = getDataFrameState(transformId); - final String failureReason = "task encountered more than 0 failures; latest failure: " + - "Bulk index experienced failures. See the logs of the node running the transform for details."; + final String failureReason = "task encountered more than 0 failures; latest failure: " + + ".*BulkIndexingException: Bulk index experienced failures. See the logs of the node running the transform for details."; // Verify we have failed for the expected reason - assertThat(XContentMapValues.extractValue("reason", fullState), equalTo(failureReason)); + assertThat((String) XContentMapValues.extractValue("reason", fullState), matchesRegex(failureReason)); - final String expectedFailure = "Unable to start transform [test-force-start-failed-transform] " + - "as it is in a failed state with failure: [" + failureReason + - "]. Use force stop and then restart the transform once error is resolved."; + final String expectedFailure = "Unable to start transform \\[test-force-start-failed-transform\\] " + + "as it is in a failed state with failure: \\[" + + failureReason + + "\\]. 
Use force stop and then restart the transform once error is resolved."; // Verify that we cannot start the transform when the task is in a failed state assertBusy(() -> { ResponseException ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId)); assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus())); - assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())), - equalTo(expectedFailure)); + assertThat( + (String) XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())), + matchesRegex(expectedFailure) + ); }, 60, TimeUnit.SECONDS); stopTransform(transformId, true); @@ -128,7 +142,8 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase { try (XContentBuilder builder = jsonBuilder()) { builder.startObject(); { - builder.startObject("mappings") + builder + .startObject("mappings") .startObject("properties") .startObject("reviewer") .field("type", "long") diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 877afeb1554..94d220e29c1 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -13,10 +13,8 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.ShardSearchFailure; import 
org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.breaker.CircuitBreakingException; @@ -24,6 +22,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.script.ScriptException; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; @@ -45,6 +44,7 @@ import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; import org.elasticsearch.xpack.transform.transforms.pivot.AggregationResultUtils; import org.elasticsearch.xpack.transform.transforms.pivot.Pivot; +import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder; import java.io.IOException; import java.io.UncheckedIOException; @@ -66,7 +66,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer context.getNumFailureRetries()) { - String failureMessage = isIrrecoverableFailure(e) - ? 
"task encountered irrecoverable failure: " + e.getMessage() - : "task encountered more than " + context.getNumFailureRetries() + " failures; latest failure: " + e.getMessage(); - failIndexer(failureMessage); + if (unwrappedException instanceof CircuitBreakingException) { + handleCircuitBreakingException((CircuitBreakingException) unwrappedException); + } else if (unwrappedException instanceof ScriptException) { + handleScriptException((ScriptException) unwrappedException); + // irrecoverable error without special handling + } else if (unwrappedException instanceof IndexNotFoundException + || unwrappedException instanceof AggregationResultUtils.AggregationExtractionException + || unwrappedException instanceof TransformConfigReloadingException) { + failIndexer("task encountered irrecoverable failure: " + e.getMessage()); + } else if (context.getAndIncrementFailureCount() > context.getNumFailureRetries()) { + failIndexer( + "task encountered more than " + + context.getNumFailureRetries() + + " failures; latest failure: " + + ExceptionRootCauseFinder.getDetailedMessage(unwrappedException) + ); } else { // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one if (e.getMessage().equals(lastAuditedExceptionMessage) == false) { + String message = ExceptionRootCauseFinder.getDetailedMessage(unwrappedException); + auditor .warning( getJobId(), - "Transform encountered an exception: " + e.getMessage() + " Will attempt again at next scheduled trigger." + "Transform encountered an exception: " + message + " Will attempt again at next scheduled trigger." 
); - lastAuditedExceptionMessage = e.getMessage(); + lastAuditedExceptionMessage = message; } } } @@ -510,12 +521,6 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer processBuckets(final CompositeAggregation agg) { // we reached the end if (agg.getBuckets().isEmpty()) { @@ -536,7 +541,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer searchFunction; private final Function bulkFunction; - private final Consumer failureConsumer; + private final Consumer failureConsumer; // used for synchronizing with the test private CountDownLatch latch; @@ -94,7 +99,7 @@ public class TransformIndexerTests extends ESTestCase { TransformContext context, Function searchFunction, Function bulkFunction, - Consumer failureConsumer + Consumer failureConsumer ) { super( executor, @@ -174,14 +179,12 @@ public class TransformIndexerTests extends ESTestCase { @Override protected void onFailure(Exception exc) { try { - // mimic same behavior as {@link TransformTask} - if (handleCircuitBreakingException(exc)) { - return; - } - - failureConsumer.accept(exc); + super.onFailure(exc); } catch (Exception e) { - fail("Internal error: " + e.getMessage()); + final StringWriter sw = new StringWriter(); + final PrintWriter pw = new PrintWriter(sw, true); + e.printStackTrace(pw); + fail("Unexpected failure: " + e.getMessage() + " Trace: " + sw.getBuffer().toString()); } } @@ -198,7 +201,12 @@ public class TransformIndexerTests extends ESTestCase { @Override protected void failIndexer(String message) { - fail("failIndexer should not be called, received error: " + message); + if (failureConsumer != null) { + failureConsumer.accept(message); + super.failIndexer(message); + } else { + fail("failIndexer should not be called, received error: " + message); + } } } @@ -238,33 +246,20 @@ public class TransformIndexerTests extends ESTestCase { Function bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100); - Consumer failureConsumer = e -> { - final 
StringWriter sw = new StringWriter(); - final PrintWriter pw = new PrintWriter(sw, true); - e.printStackTrace(pw); - fail("expected circuit breaker exception to be handled, got:" + e + " Trace: " + sw.getBuffer().toString()); - }; - final ExecutorService executor = Executors.newFixedThreadPool(1); try { TransformAuditor auditor = new TransformAuditor(client, "node_1"); TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class)); - MockedTransformIndexer indexer = new MockedTransformIndexer( - executor, - mock(TransformConfigManager.class), - mock(CheckpointProvider.class), - new TransformProgressGatherer(client), + MockedTransformIndexer indexer = createMockIndexer( config, - Collections.emptyMap(), - auditor, state, - null, - new TransformIndexerStats(), - context, searchFunction, bulkFunction, - failureConsumer + null, + executor, + auditor, + context ); final CountDownLatch latch = indexer.newLatch(1); indexer.start(); @@ -333,33 +328,20 @@ public class TransformIndexerTests extends ESTestCase { Function searchFunction = searchRequest -> searchResponse; Function bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100); - Consumer failureConsumer = e -> { - final StringWriter sw = new StringWriter(); - final PrintWriter pw = new PrintWriter(sw, true); - e.printStackTrace(pw); - fail(e.getMessage()); - }; - final ExecutorService executor = Executors.newFixedThreadPool(1); try { TransformAuditor auditor = mock(TransformAuditor.class); TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class)); - MockedTransformIndexer indexer = new MockedTransformIndexer( - executor, - mock(TransformConfigManager.class), - mock(CheckpointProvider.class), - new TransformProgressGatherer(client), + MockedTransformIndexer indexer = createMockIndexer( config, - Collections.emptyMap(), - auditor, state, - null, - new TransformIndexerStats(), - 
context, searchFunction, bulkFunction, - failureConsumer + null, + executor, + auditor, + context ); IterationResult newPosition = indexer.doProcess(searchResponse); @@ -372,4 +354,117 @@ public class TransformIndexerTests extends ESTestCase { } } + public void testScriptError() throws Exception { + Integer pageSize = randomBoolean() ? null : randomIntBetween(500, 10_000); + String transformId = randomAlphaOfLength(10); + TransformConfig config = new TransformConfig( + transformId, + randomSourceConfig(), + randomDestConfig(), + null, + null, + null, + new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig(), pageSize), + randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000) + ); + AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); + Function searchFunction = searchRequest -> { + throw new SearchPhaseExecutionException( + "query", + "Partial shards failure", + new ShardSearchFailure[] { + new ShardSearchFailure( + new ScriptException( + "runtime error", + new ArithmeticException("/ by zero"), + singletonList("stack"), + "test", + "painless" + ) + ) } + + ); + }; + + Function bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100); + + final AtomicBoolean failIndexerCalled = new AtomicBoolean(false); + final AtomicReference failureMessage = new AtomicReference<>(); + Consumer failureConsumer = message -> { + failIndexerCalled.compareAndSet(false, true); + failureMessage.compareAndSet(null, message); + }; + + final ExecutorService executor = Executors.newFixedThreadPool(1); + try { + MockTransformAuditor auditor = new MockTransformAuditor(); + TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class)); + + MockedTransformIndexer indexer = createMockIndexer( + config, + state, + searchFunction, + bulkFunction, + failureConsumer, + executor, + auditor, + context + ); + + final CountDownLatch latch = 
indexer.newLatch(1); + auditor + .addExpectation( + new MockTransformAuditor.SeenAuditExpectation( + "fail indexer due to script error", + org.elasticsearch.xpack.core.common.notifications.Level.ERROR, + transformId, + "Failed to execute script with error: [*ArithmeticException: / by zero], stack trace: [stack]" + ) + ); + indexer.start(); + assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); + assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); + assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); + + latch.countDown(); + assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS); + assertTrue(failIndexerCalled.get()); + assertThat( + failureMessage.get(), + matchesRegex("Failed to execute script with error: \\[.*ArithmeticException: / by zero\\], stack trace: \\[stack\\]") + ); + auditor.assertAllExpectationsMatched(); + } finally { + executor.shutdownNow(); + } + } + + private MockedTransformIndexer createMockIndexer( + TransformConfig config, + AtomicReference state, + Function searchFunction, + Function bulkFunction, + Consumer failureConsumer, + final ExecutorService executor, + TransformAuditor auditor, + TransformContext context + ) { + return new MockedTransformIndexer( + executor, + mock(TransformConfigManager.class), + mock(CheckpointProvider.class), + new TransformProgressGatherer(client), + config, + Collections.emptyMap(), + auditor, + state, + null, + new TransformIndexerStats(), + context, + searchFunction, + bulkFunction, + failureConsumer + ); + } + }