[Transform] improve irrecoverable error detection - part 2 (#52003)
base error handling on rest status instead of listing individual exception types relates to #51820
This commit is contained in:
parent
3f151d1d75
commit
5d35eaa1cb
|
@ -19,7 +19,6 @@ import org.elasticsearch.action.support.IndicesOptions;
|
|||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.breaker.CircuitBreakingException;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.script.ScriptException;
|
||||
|
@ -42,7 +41,6 @@ import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper;
|
|||
import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
|
||||
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
|
||||
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
|
||||
import org.elasticsearch.xpack.transform.transforms.pivot.AggregationResultUtils;
|
||||
import org.elasticsearch.xpack.transform.transforms.pivot.Pivot;
|
||||
import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder;
|
||||
|
||||
|
@ -287,7 +285,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
|
|||
// If the transform config index or the transform config is gone, something serious occurred
|
||||
// We are in an unknown state and should fail out
|
||||
if (failure instanceof ResourceNotFoundException) {
|
||||
updateConfigListener.onFailure(new TransformConfigReloadingException(msg, failure));
|
||||
updateConfigListener.onFailure(new TransformConfigLostOnReloadException(msg, failure));
|
||||
} else {
|
||||
auditor.warning(getJobId(), msg);
|
||||
updateConfigListener.onResponse(null);
|
||||
|
@ -477,37 +475,54 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
|
|||
|
||||
if (unwrappedException instanceof CircuitBreakingException) {
|
||||
handleCircuitBreakingException((CircuitBreakingException) unwrappedException);
|
||||
} else if (unwrappedException instanceof ScriptException) {
|
||||
handleScriptException((ScriptException) unwrappedException);
|
||||
// irrecoverable error without special handling
|
||||
} else if (unwrappedException instanceof BulkIndexingException && ((BulkIndexingException) unwrappedException).isIrrecoverable()) {
|
||||
handleIrrecoverableBulkIndexingException((BulkIndexingException) unwrappedException);
|
||||
} else if (unwrappedException instanceof IndexNotFoundException
|
||||
|| unwrappedException instanceof AggregationResultUtils.AggregationExtractionException
|
||||
|| unwrappedException instanceof TransformConfigReloadingException
|
||||
|| unwrappedException instanceof ResourceNotFoundException
|
||||
|| unwrappedException instanceof IllegalArgumentException) {
|
||||
failIndexer("task encountered irrecoverable failure: " + e.getMessage());
|
||||
} else if (context.getAndIncrementFailureCount() > context.getNumFailureRetries()) {
|
||||
failIndexer(
|
||||
"task encountered more than "
|
||||
+ context.getNumFailureRetries()
|
||||
+ " failures; latest failure: "
|
||||
+ ExceptionRootCauseFinder.getDetailedMessage(unwrappedException)
|
||||
);
|
||||
} else {
|
||||
// Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
|
||||
// times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
|
||||
if (e.getMessage().equals(lastAuditedExceptionMessage) == false) {
|
||||
String message = ExceptionRootCauseFinder.getDetailedMessage(unwrappedException);
|
||||
return;
|
||||
}
|
||||
|
||||
auditor.warning(
|
||||
getJobId(),
|
||||
"Transform encountered an exception: " + message + " Will attempt again at next scheduled trigger."
|
||||
);
|
||||
lastAuditedExceptionMessage = message;
|
||||
}
|
||||
if (unwrappedException instanceof ScriptException) {
|
||||
handleScriptException((ScriptException) unwrappedException);
|
||||
return;
|
||||
}
|
||||
|
||||
if (unwrappedException instanceof BulkIndexingException && ((BulkIndexingException) unwrappedException).isIrrecoverable()) {
|
||||
handleIrrecoverableBulkIndexingException((BulkIndexingException) unwrappedException);
|
||||
return;
|
||||
}
|
||||
|
||||
// irrecoverable error without special handling
|
||||
if (unwrappedException instanceof ElasticsearchException) {
|
||||
ElasticsearchException elasticsearchException = (ElasticsearchException) unwrappedException;
|
||||
if (ExceptionRootCauseFinder.IRRECOVERABLE_REST_STATUSES.contains(elasticsearchException.status())) {
|
||||
failIndexer("task encountered irrecoverable failure: " + elasticsearchException.getDetailedMessage());
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (unwrappedException instanceof IllegalArgumentException) {
|
||||
failIndexer("task encountered irrecoverable failure: " + e.getMessage());
|
||||
return;
|
||||
}
|
||||
|
||||
if (context.getAndIncrementFailureCount() > context.getNumFailureRetries()) {
|
||||
failIndexer(
|
||||
"task encountered more than "
|
||||
+ context.getNumFailureRetries()
|
||||
+ " failures; latest failure: "
|
||||
+ ExceptionRootCauseFinder.getDetailedMessage(unwrappedException)
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
|
||||
// times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
|
||||
if (e.getMessage().equals(lastAuditedExceptionMessage) == false) {
|
||||
String message = ExceptionRootCauseFinder.getDetailedMessage(unwrappedException);
|
||||
|
||||
auditor.warning(
|
||||
getJobId(),
|
||||
"Transform encountered an exception: " + message + " Will attempt again at next scheduled trigger."
|
||||
);
|
||||
lastAuditedExceptionMessage = message;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -901,8 +916,12 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
|
|||
return RunState.PARTIAL_RUN_IDENTIFY_CHANGES;
|
||||
}
|
||||
|
||||
static class TransformConfigReloadingException extends ElasticsearchException {
|
||||
TransformConfigReloadingException(String msg, Throwable cause, Object... args) {
|
||||
/**
|
||||
* Thrown when the transform configuration disappeared permanently.
|
||||
* (not if reloading failed due to an intermittent problem)
|
||||
*/
|
||||
static class TransformConfigLostOnReloadException extends ResourceNotFoundException {
|
||||
TransformConfigLostOnReloadException(String msg, Throwable cause, Object... args) {
|
||||
super(msg, cause, args);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,19 +7,37 @@
|
|||
package org.elasticsearch.xpack.transform.utils;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.search.SearchPhaseExecutionException;
|
||||
import org.elasticsearch.action.search.ShardSearchFailure;
|
||||
import org.elasticsearch.index.mapper.MapperParsingException;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Set of static utils to find the cause of a search exception.
|
||||
*/
|
||||
public final class ExceptionRootCauseFinder {
|
||||
|
||||
/**
|
||||
* List of rest statuses that we consider irrecoverable
|
||||
*/
|
||||
public static final Set<RestStatus> IRRECOVERABLE_REST_STATUSES = new HashSet<>(
|
||||
Arrays.asList(
|
||||
RestStatus.GONE,
|
||||
RestStatus.NOT_IMPLEMENTED,
|
||||
RestStatus.NOT_FOUND,
|
||||
RestStatus.BAD_REQUEST,
|
||||
RestStatus.UNAUTHORIZED,
|
||||
RestStatus.FORBIDDEN,
|
||||
RestStatus.METHOD_NOT_ALLOWED,
|
||||
RestStatus.NOT_ACCEPTABLE
|
||||
)
|
||||
);
|
||||
|
||||
/**
|
||||
* Unwrap the exception stack and return the most likely cause.
|
||||
*
|
||||
|
@ -61,17 +79,22 @@ public final class ExceptionRootCauseFinder {
|
|||
/**
|
||||
* Return the first irrecoverableException from a collection of bulk responses if there are any.
|
||||
*
|
||||
* @param failures a collection of bulk item responses
|
||||
* @param failures a collection of bulk item responses with failures
|
||||
* @return The first exception considered irrecoverable if there are any, null if no irrecoverable exception found
|
||||
*/
|
||||
public static Throwable getFirstIrrecoverableExceptionFromBulkResponses(Collection<BulkItemResponse> failures) {
|
||||
for (BulkItemResponse failure : failures) {
|
||||
Throwable unwrappedThrowable = org.elasticsearch.ExceptionsHelper.unwrapCause(failure.getFailure().getCause());
|
||||
if (unwrappedThrowable instanceof MapperParsingException
|
||||
|| unwrappedThrowable instanceof IllegalArgumentException
|
||||
|| unwrappedThrowable instanceof ResourceNotFoundException) {
|
||||
if (unwrappedThrowable instanceof IllegalArgumentException) {
|
||||
return unwrappedThrowable;
|
||||
}
|
||||
|
||||
if (unwrappedThrowable instanceof ElasticsearchException) {
|
||||
ElasticsearchException elasticsearchException = (ElasticsearchException) unwrappedThrowable;
|
||||
if (IRRECOVERABLE_REST_STATUSES.contains(elasticsearchException.status())) {
|
||||
return elasticsearchException;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
|
|
|
@ -0,0 +1,160 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.transform.utils;
|
||||
|
||||
import org.elasticsearch.ElasticsearchSecurityException;
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.action.DocWriteRequest.OpType;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.index.mapper.MapperParsingException;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.translog.TranslogException;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class ExceptionRootCauseFinderTests extends ESTestCase {
|
||||
public void testFetFirstIrrecoverableExceptionFromBulkResponses() {
|
||||
Map<Integer, BulkItemResponse> bulkItemResponses = new HashMap<>();
|
||||
|
||||
int id = 1;
|
||||
// 1
|
||||
bulkItemResponses.put(
|
||||
id,
|
||||
new BulkItemResponse(
|
||||
id++,
|
||||
OpType.INDEX,
|
||||
new BulkItemResponse.Failure("the_index", "type", "id", new MapperParsingException("mapper parsing error"))
|
||||
)
|
||||
);
|
||||
// 2
|
||||
bulkItemResponses.put(
|
||||
id,
|
||||
new BulkItemResponse(
|
||||
id++,
|
||||
OpType.INDEX,
|
||||
new BulkItemResponse.Failure("the_index", "type", "id", new ResourceNotFoundException("resource not found error"))
|
||||
)
|
||||
);
|
||||
// 3
|
||||
bulkItemResponses.put(
|
||||
id,
|
||||
new BulkItemResponse(
|
||||
id++,
|
||||
OpType.INDEX,
|
||||
new BulkItemResponse.Failure("the_index", "type", "id", new IllegalArgumentException("illegal argument error"))
|
||||
)
|
||||
);
|
||||
// 4 not irrecoverable
|
||||
bulkItemResponses.put(
|
||||
id,
|
||||
new BulkItemResponse(
|
||||
id++,
|
||||
OpType.INDEX,
|
||||
new BulkItemResponse.Failure("the_index", "type", "id", new EsRejectedExecutionException("es rejected execution"))
|
||||
)
|
||||
);
|
||||
// 5 not irrecoverable
|
||||
bulkItemResponses.put(
|
||||
id,
|
||||
new BulkItemResponse(
|
||||
id++,
|
||||
OpType.INDEX,
|
||||
new BulkItemResponse.Failure(
|
||||
"the_index",
|
||||
"type",
|
||||
"id",
|
||||
new TranslogException(new ShardId("the_index", "uid", 0), "translog error")
|
||||
)
|
||||
)
|
||||
);
|
||||
// 6
|
||||
bulkItemResponses.put(
|
||||
id,
|
||||
new BulkItemResponse(
|
||||
id++,
|
||||
OpType.INDEX,
|
||||
new BulkItemResponse.Failure(
|
||||
"the_index",
|
||||
"type",
|
||||
"id",
|
||||
new ElasticsearchSecurityException("Authentication required", RestStatus.UNAUTHORIZED)
|
||||
)
|
||||
)
|
||||
);
|
||||
// 7
|
||||
bulkItemResponses.put(
|
||||
id,
|
||||
new BulkItemResponse(
|
||||
id++,
|
||||
OpType.INDEX,
|
||||
new BulkItemResponse.Failure(
|
||||
"the_index",
|
||||
"type",
|
||||
"id",
|
||||
new ElasticsearchSecurityException("current license is non-compliant for [transform]", RestStatus.FORBIDDEN)
|
||||
)
|
||||
)
|
||||
);
|
||||
// 8 not irrecoverable
|
||||
bulkItemResponses.put(
|
||||
id,
|
||||
new BulkItemResponse(
|
||||
id++,
|
||||
OpType.INDEX,
|
||||
new BulkItemResponse.Failure(
|
||||
"the_index",
|
||||
"type",
|
||||
"id",
|
||||
new ElasticsearchSecurityException("overloaded, to many requests", RestStatus.TOO_MANY_REQUESTS)
|
||||
)
|
||||
)
|
||||
);
|
||||
// 9 not irrecoverable
|
||||
bulkItemResponses.put(
|
||||
id,
|
||||
new BulkItemResponse(
|
||||
id++,
|
||||
OpType.INDEX,
|
||||
new BulkItemResponse.Failure(
|
||||
"the_index",
|
||||
"type",
|
||||
"id",
|
||||
new ElasticsearchSecurityException("internal error", RestStatus.INTERNAL_SERVER_ERROR)
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
assertFirstException(bulkItemResponses.values(), MapperParsingException.class, "mapper parsing error");
|
||||
bulkItemResponses.remove(1);
|
||||
assertFirstException(bulkItemResponses.values(), ResourceNotFoundException.class, "resource not found error");
|
||||
bulkItemResponses.remove(2);
|
||||
assertFirstException(bulkItemResponses.values(), IllegalArgumentException.class, "illegal argument error");
|
||||
bulkItemResponses.remove(3);
|
||||
assertFirstException(bulkItemResponses.values(), ElasticsearchSecurityException.class, "Authentication required");
|
||||
bulkItemResponses.remove(6);
|
||||
assertFirstException(
|
||||
bulkItemResponses.values(),
|
||||
ElasticsearchSecurityException.class,
|
||||
"current license is non-compliant for [transform]"
|
||||
);
|
||||
bulkItemResponses.remove(7);
|
||||
|
||||
assertNull(ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(bulkItemResponses.values()));
|
||||
}
|
||||
|
||||
private static void assertFirstException(Collection<BulkItemResponse> bulkItemResponses, Class<?> expectedClass, String message) {
|
||||
Throwable t = ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(bulkItemResponses);
|
||||
assertNotNull(t);
|
||||
assertEquals(t.getClass(), expectedClass);
|
||||
assertEquals(t.getMessage(), message);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue