diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 6b282131e9d..6f1925bdc81 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -19,7 +19,6 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.script.ScriptException; @@ -42,7 +41,6 @@ import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; -import org.elasticsearch.xpack.transform.transforms.pivot.AggregationResultUtils; import org.elasticsearch.xpack.transform.transforms.pivot.Pivot; import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder; @@ -287,7 +285,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer context.getNumFailureRetries()) { - failIndexer( - "task encountered more than " - + context.getNumFailureRetries() - + " failures; latest failure: " - + ExceptionRootCauseFinder.getDetailedMessage(unwrappedException) - ); - } else { - // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous - // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one - if (e.getMessage().equals(lastAuditedExceptionMessage) == false) { - String message = ExceptionRootCauseFinder.getDetailedMessage(unwrappedException); + return; + } - auditor.warning( - getJobId(), - "Transform encountered an exception: " + message + " Will attempt again at next scheduled trigger." - ); - lastAuditedExceptionMessage = message; - } + if (unwrappedException instanceof ScriptException) { + handleScriptException((ScriptException) unwrappedException); + return; + } + + if (unwrappedException instanceof BulkIndexingException && ((BulkIndexingException) unwrappedException).isIrrecoverable()) { + handleIrrecoverableBulkIndexingException((BulkIndexingException) unwrappedException); + return; + } + + // irrecoverable error without special handling + if (unwrappedException instanceof ElasticsearchException) { + ElasticsearchException elasticsearchException = (ElasticsearchException) unwrappedException; + if (ExceptionRootCauseFinder.IRRECOVERABLE_REST_STATUSES.contains(elasticsearchException.status())) { + failIndexer("task encountered irrecoverable failure: " + elasticsearchException.getDetailedMessage()); + return; } + } + + if (unwrappedException instanceof IllegalArgumentException) { + failIndexer("task encountered irrecoverable failure: " + e.getMessage()); + return; + } + + if (context.getAndIncrementFailureCount() > context.getNumFailureRetries()) { + failIndexer( + "task encountered more than " + + context.getNumFailureRetries() + + " failures; latest failure: " + + ExceptionRootCauseFinder.getDetailedMessage(unwrappedException) + ); + return; + } + + // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous + // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one + if (e.getMessage().equals(lastAuditedExceptionMessage) == false) { + String message = ExceptionRootCauseFinder.getDetailedMessage(unwrappedException); + + auditor.warning( + getJobId(), + "Transform encountered an exception: " + message + " Will attempt again at next scheduled trigger." + ); + lastAuditedExceptionMessage = message; + } } /** @@ -901,8 +916,12 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer IRRECOVERABLE_REST_STATUSES = new HashSet<>( + Arrays.asList( + RestStatus.GONE, + RestStatus.NOT_IMPLEMENTED, + RestStatus.NOT_FOUND, + RestStatus.BAD_REQUEST, + RestStatus.UNAUTHORIZED, + RestStatus.FORBIDDEN, + RestStatus.METHOD_NOT_ALLOWED, + RestStatus.NOT_ACCEPTABLE + ) + ); + /** * Unwrap the exception stack and return the most likely cause. * @@ -61,17 +79,22 @@ public final class ExceptionRootCauseFinder { /** * Return the first irrecoverableException from a collection of bulk responses if there are any. * - * @param failures a collection of bulk item responses + * @param failures a collection of bulk item responses with failures * @return The first exception considered irrecoverable if there are any, null if no irrecoverable exception found */ public static Throwable getFirstIrrecoverableExceptionFromBulkResponses(Collection failures) { for (BulkItemResponse failure : failures) { Throwable unwrappedThrowable = org.elasticsearch.ExceptionsHelper.unwrapCause(failure.getFailure().getCause()); - if (unwrappedThrowable instanceof MapperParsingException - || unwrappedThrowable instanceof IllegalArgumentException - || unwrappedThrowable instanceof ResourceNotFoundException) { + if (unwrappedThrowable instanceof IllegalArgumentException) { return unwrappedThrowable; } + + if (unwrappedThrowable instanceof ElasticsearchException) { + ElasticsearchException elasticsearchException = (ElasticsearchException) unwrappedThrowable; + if (IRRECOVERABLE_REST_STATUSES.contains(elasticsearchException.status())) { + return elasticsearchException; + } + } } return null; diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinderTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinderTests.java new file mode 100644 index 00000000000..7e5cf02ce62 --- /dev/null +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinderTests.java @@ -0,0 +1,160 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.transform.utils; + +import org.elasticsearch.ElasticsearchSecurityException; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.DocWriteRequest.OpType; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.index.mapper.MapperParsingException; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.TranslogException; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESTestCase; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +public class ExceptionRootCauseFinderTests extends ESTestCase { + public void testFetFirstIrrecoverableExceptionFromBulkResponses() { + Map bulkItemResponses = new HashMap<>(); + + int id = 1; + // 1 + bulkItemResponses.put( + id, + new BulkItemResponse( + id++, + OpType.INDEX, + new BulkItemResponse.Failure("the_index", "type", "id", new MapperParsingException("mapper parsing error")) + ) + ); + // 2 + bulkItemResponses.put( + id, + new BulkItemResponse( + id++, + OpType.INDEX, + new BulkItemResponse.Failure("the_index", "type", "id", new ResourceNotFoundException("resource not found error")) + ) + ); + // 3 + bulkItemResponses.put( + id, + new BulkItemResponse( + id++, + OpType.INDEX, + new BulkItemResponse.Failure("the_index", "type", "id", new IllegalArgumentException("illegal argument error")) + ) + ); + // 4 not irrecoverable + bulkItemResponses.put( + id, + new BulkItemResponse( + id++, + OpType.INDEX, + new BulkItemResponse.Failure("the_index", "type", "id", new EsRejectedExecutionException("es rejected execution")) + ) + ); + // 5 not irrecoverable + bulkItemResponses.put( + id, + new BulkItemResponse( + id++, + OpType.INDEX, + new BulkItemResponse.Failure( + "the_index", + "type", + "id", + new TranslogException(new ShardId("the_index", "uid", 0), "translog error") + ) + ) + ); + // 6 + bulkItemResponses.put( + id, + new BulkItemResponse( + id++, + OpType.INDEX, + new BulkItemResponse.Failure( + "the_index", + "type", + "id", + new ElasticsearchSecurityException("Authentication required", RestStatus.UNAUTHORIZED) + ) + ) + ); + // 7 + bulkItemResponses.put( + id, + new BulkItemResponse( + id++, + OpType.INDEX, + new BulkItemResponse.Failure( + "the_index", + "type", + "id", + new ElasticsearchSecurityException("current license is non-compliant for [transform]", RestStatus.FORBIDDEN) + ) + ) + ); + // 8 not irrecoverable + bulkItemResponses.put( + id, + new BulkItemResponse( + id++, + OpType.INDEX, + new BulkItemResponse.Failure( + "the_index", + "type", + "id", + new ElasticsearchSecurityException("overloaded, to many requests", RestStatus.TOO_MANY_REQUESTS) + ) + ) + ); + // 9 not irrecoverable + bulkItemResponses.put( + id, + new BulkItemResponse( + id++, + OpType.INDEX, + new BulkItemResponse.Failure( + "the_index", + "type", + "id", + new ElasticsearchSecurityException("internal error", RestStatus.INTERNAL_SERVER_ERROR) + ) + ) + ); + + assertFirstException(bulkItemResponses.values(), MapperParsingException.class, "mapper parsing error"); + bulkItemResponses.remove(1); + assertFirstException(bulkItemResponses.values(), ResourceNotFoundException.class, "resource not found error"); + bulkItemResponses.remove(2); + assertFirstException(bulkItemResponses.values(), IllegalArgumentException.class, "illegal argument error"); + bulkItemResponses.remove(3); + assertFirstException(bulkItemResponses.values(), ElasticsearchSecurityException.class, "Authentication required"); + bulkItemResponses.remove(6); + assertFirstException( + bulkItemResponses.values(), + ElasticsearchSecurityException.class, + "current license is non-compliant for [transform]" + ); + bulkItemResponses.remove(7); + + assertNull(ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(bulkItemResponses.values())); + } + + private static void assertFirstException(Collection bulkItemResponses, Class expectedClass, String message) { + Throwable t = ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(bulkItemResponses); + assertNotNull(t); + assertEquals(t.getClass(), expectedClass); + assertEquals(t.getMessage(), message); + } +}