[Transform] Handle permanent bulk indexing errors (#51307)

Check bulk indexing errors for permanent problems and ensure the transform goes into the failed state
instead of retrying. Corrects the stats API to show the real error and avoids excessive audit logging.

fixes #50122
This commit is contained in:
Hendrik Muhs 2020-01-23 16:16:27 +01:00
parent 84664e8d60
commit 3553f68f5a
9 changed files with 184 additions and 38 deletions

View File

@ -79,6 +79,8 @@ public class TransformMessages {
+ "please simplify job or increase heap size on data nodes.";
public static final String LOG_TRANSFORM_PIVOT_SCRIPT_ERROR =
"Failed to execute script with error: [{0}], stack trace: {1}";
// Message pattern for a permanent bulk indexing failure; {0} is replaced by the error detail supplied by the caller.
public static final String LOG_TRANSFORM_PIVOT_IRRECOVERABLE_BULK_INDEXING_ERROR =
"Failed to index documents into destination index due to permanent error: [{0}]";
public static final String FAILED_TO_PARSE_TRANSFORM_CHECKPOINTS =
"Failed to parse transform checkpoints for [{0}]";

View File

@ -72,8 +72,11 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
startTransform(transformId);
awaitState(transformId, TransformStats.State.FAILED);
Map<?, ?> fullState = getTransformStateAndStats(transformId);
final String failureReason = "task encountered more than 0 failures; latest failure: "
+ ".*BulkIndexingException: Bulk index experienced failures. See the logs of the node running the transform for details.";
final String failureReason = "Failed to index documents into destination index due to permanent error: "
+ "\\[org.elasticsearch.xpack.transform.transforms.BulkIndexingException: Bulk index experienced \\[7\\] "
+ "failures and at least 1 irrecoverable "
+ "\\[org.elasticsearch.xpack.transform.transforms.TransformException: Destination index mappings are "
+ "incompatible with the transform configuration.;.*";
// Verify we have failed for the expected reason
assertThat((String) XContentMapValues.extractValue("reason", fullState), matchesRegex(failureReason));
@ -107,8 +110,11 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
startTransform(transformId);
awaitState(transformId, TransformStats.State.FAILED);
Map<?, ?> fullState = getTransformStateAndStats(transformId);
final String failureReason = "task encountered more than 0 failures; latest failure: "
+ ".*BulkIndexingException: Bulk index experienced failures. See the logs of the node running the transform for details.";
final String failureReason = "Failed to index documents into destination index due to permanent error: "
+ "\\[org.elasticsearch.xpack.transform.transforms.BulkIndexingException: Bulk index experienced \\[7\\] "
+ "failures and at least 1 irrecoverable "
+ "\\[org.elasticsearch.xpack.transform.transforms.TransformException: Destination index mappings are "
+ "incompatible with the transform configuration.;.*";
// Verify we have failed for the expected reason
assertThat((String) XContentMapValues.extractValue("reason", fullState), matchesRegex(failureReason));

View File

@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.transform.transforms;
import org.elasticsearch.ElasticsearchException;
/**
 * Internal wrapper for indexing failures thrown inside the transform indexer.
 * Besides message and cause it carries a flag that marks the failure as permanent.
 */
class BulkIndexingException extends ElasticsearchException {

    // set once at construction time, exposed through isIrrecoverable()
    private final boolean permanentFailure;

    /**
     * Build a new BulkIndexingException.
     *
     * @param msg the message, handed to the superclass together with {@code args}
     * @param cause the most important cause of the bulk indexing failure
     * @param irrecoverable {@code true} if this is a permanent (irrecoverable) error; controls retry
     * @param args arguments for formatting the message
     */
    BulkIndexingException(String msg, Throwable cause, boolean irrecoverable, Object... args) {
        super(msg, cause, args);
        permanentFailure = irrecoverable;
    }

    /**
     * @return the irrecoverable flag this exception was created with
     */
    public boolean isIrrecoverable() {
        return permanentFailure;
    }
}

View File

@ -21,6 +21,7 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
@ -36,8 +37,11 @@ import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@ -154,31 +158,63 @@ class ClientTransformIndexer extends TransformIndexer {
ActionListener.wrap(bulkResponse -> {
if (bulkResponse.hasFailures()) {
int failureCount = 0;
// dedup the failures by the type of the exception, as they most likely have the same cause
Map<String, BulkItemResponse> deduplicatedFailures = new LinkedHashMap<>();
for (BulkItemResponse item : bulkResponse.getItems()) {
if (item.isFailed()) {
deduplicatedFailures.putIfAbsent(item.getFailure().getCause().getClass().getSimpleName(), item);
failureCount++;
}
// TODO gather information on irrecoverable failures and update isIrrecoverableFailure
}
if (auditBulkFailures) {
String failureMessage = bulkResponse.buildFailureMessage();
logger.debug("[{}] Bulk index failure encountered: {}", getJobId(), failureMessage);
auditor.warning(
getJobId(),
"Experienced at least ["
+ failureCount
+ "] bulk index failures. See the logs of the node running the transform for details. "
+ failureMessage
);
auditBulkFailures = false;
}
// note: bulk failures are audited/logged in {@link TransformIndexer#handleFailure(Exception)}
// This calls AsyncTwoPhaseIndexer#finishWithIndexingFailure
// It increments the indexing failure, and then calls the `onFailure` logic
nextPhase.onFailure(
new BulkIndexingException(
"Bulk index experienced failures. " + "See the logs of the node running the transform for details."
)
// Determine whether the failure is irrecoverable (transform should go into failed state) or not (transform increments
// the indexing failure counter
// and possibly retries)
Exception irrecoverableException = ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(
deduplicatedFailures.values()
);
if (irrecoverableException == null) {
String failureMessage = getBulkIndexDetailedFailureMessage(" Significant failures: ", deduplicatedFailures);
logger.debug("[{}] Bulk index experienced [{}] failures.{}", getJobId(), failureCount, failureMessage);
Exception firstException = deduplicatedFailures.values().iterator().next().getFailure().getCause();
nextPhase.onFailure(
new BulkIndexingException(
"Bulk index experienced [{}] failures. Significant falures: {}",
firstException,
false,
failureCount,
failureMessage
)
);
} else {
deduplicatedFailures.remove(irrecoverableException.getClass().getSimpleName());
String failureMessage = getBulkIndexDetailedFailureMessage(" Other failures: ", deduplicatedFailures);
irrecoverableException = decorateBulkIndexException(irrecoverableException);
logger.debug(
"[{}] Bulk index experienced [{}] failures and at least 1 irrecoverable [{}].{}",
getJobId(),
failureCount,
ExceptionRootCauseFinder.getDetailedMessage(irrecoverableException),
failureMessage
);
nextPhase.onFailure(
new BulkIndexingException(
"Bulk index experienced [{}] failures and at least 1 irrecoverable [{}]. Other failures: {}",
irrecoverableException,
true,
failureCount,
ExceptionRootCauseFinder.getDetailedMessage(irrecoverableException),
failureMessage
)
);
}
} else {
auditBulkFailures = true;
nextPhase.onResponse(bulkResponse);
@ -320,11 +356,31 @@ class ClientTransformIndexer extends TransformIndexer {
return seqNoPrimaryTermAndIndex.get();
}
// Considered a recoverable indexing failure
private static class BulkIndexingException extends ElasticsearchException {
BulkIndexingException(String msg, Object... args) {
super(msg, args);
/**
 * Render a map of bulk item failures as text.
 *
 * The result starts with {@code prefix} and then contains, for every map entry, a new line of the form
 * {@code [key] message [failure message of the item]}. Entries appear in the iteration order of the map.
 *
 * @param prefix text placed in front of the listed failures
 * @param failures failed bulk items by key
 * @return the rendered text, or an empty string if {@code failures} is empty
 */
private static String getBulkIndexDetailedFailureMessage(String prefix, Map<String, BulkItemResponse> failures) {
    if (failures.isEmpty()) {
        return "";
    }

    StringBuilder text = new StringBuilder(prefix);
    failures.forEach(
        (key, item) -> text.append("\n[").append(key).append("] message [").append(item.getFailureMessage()).append("]")
    );
    return text.toString();
}
/**
 * Wrap a known irrecoverable exception into one with a more descriptive message.
 *
 * A {@link MapperParsingException} is wrapped into a {@link TransformException} that keeps the original
 * as its cause; any other exception is returned unchanged.
 *
 * @param irrecoverableException the exception to decorate
 * @return the wrapping exception or the given exception itself
 */
private static Exception decorateBulkIndexException(Exception irrecoverableException) {
    if ((irrecoverableException instanceof MapperParsingException) == false) {
        return irrecoverableException;
    }

    return new TransformException(
        "Destination index mappings are incompatible with the transform configuration.",
        irrecoverableException
    );
}
}

View File

@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.transform.transforms;
import org.elasticsearch.ElasticsearchException;
/**
 * Package-private exception type for the transform code; adds nothing on top of its superclass.
 */
class TransformException extends ElasticsearchException {
/**
 * Create a TransformException, all arguments are passed on to the superclass constructor.
 *
 * @param msg The message
 * @param cause The cause of this exception
 * @param args arguments passed along with the message
 */
TransformException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
}

View File

@ -480,6 +480,8 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
} else if (unwrappedException instanceof ScriptException) {
handleScriptException((ScriptException) unwrappedException);
// irrecoverable error without special handling
} else if (unwrappedException instanceof BulkIndexingException && ((BulkIndexingException) unwrappedException).isIrrecoverable()) {
handleIrrecoverableBulkIndexingException((BulkIndexingException) unwrappedException);
} else if (unwrappedException instanceof IndexNotFoundException
|| unwrappedException instanceof AggregationResultUtils.AggregationExtractionException
|| unwrappedException instanceof TransformConfigReloadingException) {
@ -834,9 +836,21 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
failIndexer(message);
}
/**
 * Handle a permanent bulk indexing failure. This kind of error is irrecoverable:
 * the failure message is built from the detailed message of the exception and the indexer is failed with it.
 *
 * @param bulkIndexingException the BulkIndexingException that was thrown
 */
private void handleIrrecoverableBulkIndexingException(BulkIndexingException bulkIndexingException) {
    final String detail = bulkIndexingException.getDetailedMessage();
    failIndexer(TransformMessages.getMessage(TransformMessages.LOG_TRANSFORM_PIVOT_IRRECOVERABLE_BULK_INDEXING_ERROR, detail));
}
/**
 * Fail the indexer by marking the transform context as failed with the given message.
 *
 * This method deliberately does not log or audit the failure itself, otherwise every failure
 * would be reported twice.
 *
 * @param failureMessage the reason handed to the context
 */
protected void failIndexer(String failureMessage) {
    // note: logging and audit is done as part of context.markAsFailed
    context.markAsFailed(failureMessage);
}

View File

@ -455,6 +455,8 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
listener.onResponse(null);
return;
}
logger.error("[{}] transform has failed; experienced: [{}].", transform.getId(), reason);
auditor.error(transform.getId(), reason);
// We should not keep retrying. Either the task will be stopped, or started
// If it is started again, it is registered again.

View File

@ -7,8 +7,12 @@
package org.elasticsearch.xpack.transform.utils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.index.mapper.MapperParsingException;
import java.util.Collection;
/**
* Set of static utils to find the cause of a search exception.
@ -53,6 +57,22 @@ public final class ExceptionRootCauseFinder {
return t.getMessage();
}
/**
 * Scan failed bulk items for the first failure that is considered irrecoverable.
 *
 * Currently a failure counts as irrecoverable if its cause is a {@link MapperParsingException}.
 * Every item must carry a failure, as the failure of each item is dereferenced.
 *
 * @param failures a collection of bulk item responses
 * @return the cause of the first irrecoverable failure, null if no irrecoverable failure was found
 */
public static Exception getFirstIrrecoverableExceptionFromBulkResponses(Collection<BulkItemResponse> failures) {
    for (BulkItemResponse item : failures) {
        Exception cause = item.getFailure().getCause();
        if (cause instanceof MapperParsingException) {
            return cause;
        }
    }

    return null;
}
private ExceptionRootCauseFinder() {}
}

View File

@ -67,7 +67,9 @@ import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.matchesRegex;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.matches;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -398,7 +400,8 @@ public class TransformIndexerTests extends ESTestCase {
final ExecutorService executor = Executors.newFixedThreadPool(1);
try {
MockTransformAuditor auditor = new MockTransformAuditor();
TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
TransformContext.Listener contextListener = mock(TransformContext.Listener.class);
TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);
MockedTransformIndexer indexer = createMockIndexer(
config,
@ -412,14 +415,7 @@ public class TransformIndexerTests extends ESTestCase {
);
final CountDownLatch latch = indexer.newLatch(1);
auditor.addExpectation(
new MockTransformAuditor.SeenAuditExpectation(
"fail indexer due to script error",
org.elasticsearch.xpack.core.common.notifications.Level.ERROR,
transformId,
"Failed to execute script with error: [*ArithmeticException: / by zero], stack trace: [stack]"
)
);
indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
@ -428,11 +424,15 @@ public class TransformIndexerTests extends ESTestCase {
latch.countDown();
assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS);
assertTrue(failIndexerCalled.get());
verify(contextListener, times(1)).fail(
matches("Failed to execute script with error: \\[.*ArithmeticException: / by zero\\], stack trace: \\[stack\\]"),
any()
);
assertThat(
failureMessage.get(),
matchesRegex("Failed to execute script with error: \\[.*ArithmeticException: / by zero\\], stack trace: \\[stack\\]")
);
auditor.assertAllExpectationsMatched();
} finally {
executor.shutdownNow();
}