[Transform] improve error handling of script errors (#48887)

improve error handling for script errors, treating it as irrecoverable errors which puts the task
immediately into failed state, also improves the error extraction to properly report the script 
error.

fixes #48467
This commit is contained in:
Hendrik Muhs 2019-11-18 09:57:56 +01:00
parent 23a26cede5
commit ca912624ec
5 changed files with 285 additions and 129 deletions

View File

@ -77,6 +77,8 @@ public class TransformMessages {
public static final String LOG_TRANSFORM_PIVOT_LOW_PAGE_SIZE_FAILURE =
"Insufficient memory for search after repeated page size reductions to [{0}], unable to continue pivot, "
+ "please simplify job or increase heap size on data nodes.";
public static final String LOG_TRANSFORM_PIVOT_SCRIPT_ERROR =
"Failed to execute script with error: [{0}], stack trace: {1}";
public static final String FAILED_TO_PARSE_TRANSFORM_CHECKPOINTS =
"Failed to parse transform checkpoints for [{0}]";

View File

@ -28,21 +28,28 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.matchesRegex;
public class TransformTaskFailedStateIT extends TransformRestTestCase {
private final List<String> failureTransforms = new ArrayList<>();
@Before
public void setClusterSettings() throws IOException {
// Make sure we never retry on failure to speed up the test
// Set logging level to trace
// see: https://github.com/elastic/elasticsearch/issues/45562
Request addFailureRetrySetting = new Request("PUT", "/_cluster/settings");
addFailureRetrySetting.setJsonEntity(
"{\"transient\": {\"xpack.transform.num_transform_failure_retries\": \"" + 0 + "\"," +
"\"logger.org.elasticsearch.action.bulk\": \"info\"," + // reduces bulk failure spam
"\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\"," +
"\"logger.org.elasticsearch.xpack.transform\": \"trace\"}}");
addFailureRetrySetting
.setJsonEntity(
"{\"transient\": {\"xpack.transform.num_transform_failure_retries\": \""
+ 0
+ "\","
+ "\"logger.org.elasticsearch.action.bulk\": \"info\","
+ // reduces bulk failure spam
"\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\","
+ "\"logger.org.elasticsearch.xpack.transform\": \"trace\"}}"
);
client().performRequest(addFailureRetrySetting);
}
@ -66,18 +73,22 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
startDataframeTransform(transformId);
awaitState(transformId, TransformStats.State.FAILED);
Map<?, ?> fullState = getDataFrameState(transformId);
final String failureReason = "task encountered more than 0 failures; latest failure: " +
"Bulk index experienced failures. See the logs of the node running the transform for details.";
final String failureReason = "task encountered more than 0 failures; latest failure: "
+ ".*BulkIndexingException: Bulk index experienced failures. See the logs of the node running the transform for details.";
// Verify we have failed for the expected reason
assertThat(XContentMapValues.extractValue("reason", fullState), equalTo(failureReason));
assertThat((String) XContentMapValues.extractValue("reason", fullState), matchesRegex(failureReason));
// verify that we cannot stop a failed transform
ResponseException ex = expectThrows(ResponseException.class, () -> stopTransform(transformId, false));
assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus()));
assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
equalTo("Unable to stop transform [test-force-stop-failed-transform] as it is in a failed state with reason [" +
failureReason +
"]. Use force stop to stop the transform."));
assertThat(
(String) XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
matchesRegex(
"Unable to stop transform \\[test-force-stop-failed-transform\\] as it is in a failed state with reason \\["
+ failureReason
+ "\\]. Use force stop to stop the transform."
)
);
// Verify that we can force stop a failed transform
stopTransform(transformId, true);
@ -97,20 +108,23 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
startDataframeTransform(transformId);
awaitState(transformId, TransformStats.State.FAILED);
Map<?, ?> fullState = getDataFrameState(transformId);
final String failureReason = "task encountered more than 0 failures; latest failure: " +
"Bulk index experienced failures. See the logs of the node running the transform for details.";
final String failureReason = "task encountered more than 0 failures; latest failure: "
+ ".*BulkIndexingException: Bulk index experienced failures. See the logs of the node running the transform for details.";
// Verify we have failed for the expected reason
assertThat(XContentMapValues.extractValue("reason", fullState), equalTo(failureReason));
assertThat((String) XContentMapValues.extractValue("reason", fullState), matchesRegex(failureReason));
final String expectedFailure = "Unable to start transform [test-force-start-failed-transform] " +
"as it is in a failed state with failure: [" + failureReason +
"]. Use force stop and then restart the transform once error is resolved.";
final String expectedFailure = "Unable to start transform \\[test-force-start-failed-transform\\] "
+ "as it is in a failed state with failure: \\["
+ failureReason
+ "\\]. Use force stop and then restart the transform once error is resolved.";
// Verify that we cannot start the transform when the task is in a failed state
assertBusy(() -> {
ResponseException ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId));
assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus()));
assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
equalTo(expectedFailure));
assertThat(
(String) XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
matchesRegex(expectedFailure)
);
}, 60, TimeUnit.SECONDS);
stopTransform(transformId, true);
@ -128,7 +142,8 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
{
builder.startObject("mappings")
builder
.startObject("mappings")
.startObject("properties")
.startObject("reviewer")
.field("type", "long")

View File

@ -13,10 +13,8 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreakingException;
@ -24,6 +22,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.script.ScriptException;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
@ -45,6 +44,7 @@ import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.transforms.pivot.AggregationResultUtils;
import org.elasticsearch.xpack.transform.transforms.pivot.Pivot;
import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder;
import java.io.IOException;
import java.io.UncheckedIOException;
@ -66,7 +66,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
* which query filters to run and which index requests to send
*/
private enum RunState {
// do a complete query/index, this is used for batch data frames and for bootstraping (1st run)
// do a complete query/index, this is used for batch transforms and for bootstrapping (1st run)
FULL_RUN,
// Partial run modes in 2 stages:
@ -422,7 +422,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
default:
// Any other state is a bug, should not happen
logger.warn("[{}] Encountered unexpected run state [{}]", getJobId(), runState);
throw new IllegalStateException("DataFrame indexer job encountered an illegal state [" + runState + "]");
throw new IllegalStateException("Transform indexer job encountered an illegal state [" + runState + "]");
}
}
@ -468,25 +468,36 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
synchronized void handleFailure(Exception e) {
logger.warn(new ParameterizedMessage("[{}] transform encountered an exception: ", getJobId()), e);
if (handleCircuitBreakingException(e)) {
return;
}
Throwable unwrappedException = ExceptionRootCauseFinder.getRootCauseException(e);
if (isIrrecoverableFailure(e) || context.getAndIncrementFailureCount() > context.getNumFailureRetries()) {
String failureMessage = isIrrecoverableFailure(e)
? "task encountered irrecoverable failure: " + e.getMessage()
: "task encountered more than " + context.getNumFailureRetries() + " failures; latest failure: " + e.getMessage();
failIndexer(failureMessage);
if (unwrappedException instanceof CircuitBreakingException) {
handleCircuitBreakingException((CircuitBreakingException) unwrappedException);
} else if (unwrappedException instanceof ScriptException) {
handleScriptException((ScriptException) unwrappedException);
// irrecoverable error without special handling
} else if (unwrappedException instanceof IndexNotFoundException
|| unwrappedException instanceof AggregationResultUtils.AggregationExtractionException
|| unwrappedException instanceof TransformConfigReloadingException) {
failIndexer("task encountered irrecoverable failure: " + e.getMessage());
} else if (context.getAndIncrementFailureCount() > context.getNumFailureRetries()) {
failIndexer(
"task encountered more than "
+ context.getNumFailureRetries()
+ " failures; latest failure: "
+ ExceptionRootCauseFinder.getDetailedMessage(unwrappedException)
);
} else {
// Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
// times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
if (e.getMessage().equals(lastAuditedExceptionMessage) == false) {
String message = ExceptionRootCauseFinder.getDetailedMessage(unwrappedException);
auditor
.warning(
getJobId(),
"Transform encountered an exception: " + e.getMessage() + " Will attempt again at next scheduled trigger."
"Transform encountered an exception: " + message + " Will attempt again at next scheduled trigger."
);
lastAuditedExceptionMessage = e.getMessage();
lastAuditedExceptionMessage = message;
}
}
}
@ -510,12 +521,6 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
}));
}
private boolean isIrrecoverableFailure(Exception e) {
return e instanceof IndexNotFoundException
|| e instanceof AggregationResultUtils.AggregationExtractionException
|| e instanceof TransformConfigReloadingException;
}
private IterationResult<TransformIndexerPosition> processBuckets(final CompositeAggregation agg) {
// we reached the end
if (agg.getBuckets().isEmpty()) {
@ -536,7 +541,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
agg.getBuckets().isEmpty()
);
// NOTE: progress is also mutated in ClientDataFrameIndexer#onFinished
// NOTE: progress is also mutated in onFinish
if (progress != null) {
progress.incrementDocsProcessed(getStats().getNumDocuments() - docsBeforeProcess);
progress.incrementDocsIndexed(result.getToIndex().size());
@ -671,7 +676,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
default:
// Any other state is a bug, should not happen
logger.warn("Encountered unexpected run state [" + runState + "]");
throw new IllegalStateException("DataFrame indexer job encountered an illegal state [" + runState + "]");
throw new IllegalStateException("Transform indexer job encountered an illegal state [" + runState + "]");
}
searchRequest.source(sourceBuilder);
@ -756,16 +761,9 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
* Implementation details: We take the values from the circuit breaker as a hint, but
* note that it breaks early, that's why we also reduce using
*
* @param e Exception thrown, only {@link CircuitBreakingException} are handled
* @return true if exception was handled, false if not
* @param circuitBreakingException CircuitBreakingException thrown
*/
protected boolean handleCircuitBreakingException(Exception e) {
CircuitBreakingException circuitBreakingException = getCircuitBreakingException(e);
if (circuitBreakingException == null) {
return false;
}
private void handleCircuitBreakingException(CircuitBreakingException circuitBreakingException) {
double reducingFactor = Math
.min(
(double) circuitBreakingException.getByteLimit() / circuitBreakingException.getBytesWanted(),
@ -777,15 +775,29 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
if (newPageSize < MINIMUM_PAGE_SIZE) {
String message = TransformMessages.getMessage(TransformMessages.LOG_TRANSFORM_PIVOT_LOW_PAGE_SIZE_FAILURE, pageSize);
failIndexer(message);
return true;
return;
}
String message = TransformMessages.getMessage(TransformMessages.LOG_TRANSFORM_PIVOT_REDUCE_PAGE_SIZE, pageSize, newPageSize);
auditor.info(getJobId(), message);
logger.info("Data frame transform [" + getJobId() + "]:" + message);
logger.info("[{}] {}", getJobId(), message);
pageSize = newPageSize;
return true;
return;
}
/**
* Handle script exception case. This is error is irrecoverable.
*
* @param scriptException ScriptException thrown
*/
private void handleScriptException(ScriptException scriptException) {
String message = TransformMessages
.getMessage(
TransformMessages.LOG_TRANSFORM_PIVOT_SCRIPT_ERROR,
scriptException.getDetailedMessage(),
scriptException.getScriptStack()
);
failIndexer(message);
}
protected void failIndexer(String failureMessage) {
@ -818,7 +830,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
}
private RunState determineRunStateAtStart() {
// either 1st run or not a continuous data frame
// either 1st run or not a continuous transform
if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false) {
return RunState.FULL_RUN;
}
@ -832,32 +844,6 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
return RunState.PARTIAL_RUN_IDENTIFY_CHANGES;
}
/**
* Inspect exception for circuit breaking exception and return the first one it can find.
*
* @param e Exception
* @return CircuitBreakingException instance if found, null otherwise
*/
private static CircuitBreakingException getCircuitBreakingException(Exception e) {
// circuit breaking exceptions are at the bottom
Throwable unwrappedThrowable = org.elasticsearch.ExceptionsHelper.unwrapCause(e);
if (unwrappedThrowable instanceof CircuitBreakingException) {
return (CircuitBreakingException) unwrappedThrowable;
} else if (unwrappedThrowable instanceof SearchPhaseExecutionException) {
SearchPhaseExecutionException searchPhaseException = (SearchPhaseExecutionException) e;
for (ShardSearchFailure shardFailure : searchPhaseException.shardFailures()) {
Throwable unwrappedShardFailure = org.elasticsearch.ExceptionsHelper.unwrapCause(shardFailure.getCause());
if (unwrappedShardFailure instanceof CircuitBreakingException) {
return (CircuitBreakingException) unwrappedShardFailure;
}
}
}
return null;
}
static class TransformConfigReloadingException extends ElasticsearchException {
TransformConfigReloadingException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);

View File

@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.transform.utils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.ShardSearchFailure;
/**
* Set of static utils to find the cause of a search exception.
*/
public final class ExceptionRootCauseFinder {
/**
* Unwrap the exception stack and return the most likely cause.
*
* @param t raw Throwable
* @return unwrapped throwable if possible
*/
public static Throwable getRootCauseException(Throwable t) {
// circuit breaking exceptions are at the bottom
Throwable unwrappedThrowable = org.elasticsearch.ExceptionsHelper.unwrapCause(t);
if (unwrappedThrowable instanceof SearchPhaseExecutionException) {
SearchPhaseExecutionException searchPhaseException = (SearchPhaseExecutionException) t;
for (ShardSearchFailure shardFailure : searchPhaseException.shardFailures()) {
Throwable unwrappedShardFailure = org.elasticsearch.ExceptionsHelper.unwrapCause(shardFailure.getCause());
if (unwrappedShardFailure instanceof ElasticsearchException) {
return unwrappedShardFailure;
}
}
}
return t;
}
/**
* Return the best error message possible given a already unwrapped exception.
*
* @param t the throwable
* @return the message string of the given throwable
*/
public static String getDetailedMessage(Throwable t) {
if (t instanceof ElasticsearchException) {
return ((ElasticsearchException) t).getDetailedMessage();
}
return t.getMessage();
}
private ExceptionRootCauseFinder() {}
}

View File

@ -18,6 +18,7 @@ import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.breaker.CircuitBreaker.Durability;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.script.ScriptException;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse;
@ -36,6 +37,7 @@ import org.elasticsearch.xpack.core.transform.transforms.pivot.AggregationConfig
import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfigTests;
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.transforms.pivot.Pivot;
@ -51,10 +53,12 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import static java.util.Collections.singletonList;
import static org.elasticsearch.xpack.core.transform.transforms.DestConfigTests.randomDestConfig;
import static org.elasticsearch.xpack.core.transform.transforms.SourceConfigTests.randomSourceConfig;
import static org.hamcrest.CoreMatchers.is;
@ -62,6 +66,7 @@ import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.matchesRegex;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@ -75,7 +80,7 @@ public class TransformIndexerTests extends ESTestCase {
private final Function<SearchRequest, SearchResponse> searchFunction;
private final Function<BulkRequest, BulkResponse> bulkFunction;
private final Consumer<Exception> failureConsumer;
private final Consumer<String> failureConsumer;
// used for synchronizing with the test
private CountDownLatch latch;
@ -94,7 +99,7 @@ public class TransformIndexerTests extends ESTestCase {
TransformContext context,
Function<SearchRequest, SearchResponse> searchFunction,
Function<BulkRequest, BulkResponse> bulkFunction,
Consumer<Exception> failureConsumer
Consumer<String> failureConsumer
) {
super(
executor,
@ -174,14 +179,12 @@ public class TransformIndexerTests extends ESTestCase {
@Override
protected void onFailure(Exception exc) {
try {
// mimic same behavior as {@link TransformTask}
if (handleCircuitBreakingException(exc)) {
return;
}
failureConsumer.accept(exc);
super.onFailure(exc);
} catch (Exception e) {
fail("Internal error: " + e.getMessage());
final StringWriter sw = new StringWriter();
final PrintWriter pw = new PrintWriter(sw, true);
e.printStackTrace(pw);
fail("Unexpected failure: " + e.getMessage() + " Trace: " + sw.getBuffer().toString());
}
}
@ -198,7 +201,12 @@ public class TransformIndexerTests extends ESTestCase {
@Override
protected void failIndexer(String message) {
fail("failIndexer should not be called, received error: " + message);
if (failureConsumer != null) {
failureConsumer.accept(message);
super.failIndexer(message);
} else {
fail("failIndexer should not be called, received error: " + message);
}
}
}
@ -238,33 +246,20 @@ public class TransformIndexerTests extends ESTestCase {
Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
Consumer<Exception> failureConsumer = e -> {
final StringWriter sw = new StringWriter();
final PrintWriter pw = new PrintWriter(sw, true);
e.printStackTrace(pw);
fail("expected circuit breaker exception to be handled, got:" + e + " Trace: " + sw.getBuffer().toString());
};
final ExecutorService executor = Executors.newFixedThreadPool(1);
try {
TransformAuditor auditor = new TransformAuditor(client, "node_1");
TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
MockedTransformIndexer indexer = new MockedTransformIndexer(
executor,
mock(TransformConfigManager.class),
mock(CheckpointProvider.class),
new TransformProgressGatherer(client),
MockedTransformIndexer indexer = createMockIndexer(
config,
Collections.emptyMap(),
auditor,
state,
null,
new TransformIndexerStats(),
context,
searchFunction,
bulkFunction,
failureConsumer
null,
executor,
auditor,
context
);
final CountDownLatch latch = indexer.newLatch(1);
indexer.start();
@ -333,33 +328,20 @@ public class TransformIndexerTests extends ESTestCase {
Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> searchResponse;
Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
Consumer<Exception> failureConsumer = e -> {
final StringWriter sw = new StringWriter();
final PrintWriter pw = new PrintWriter(sw, true);
e.printStackTrace(pw);
fail(e.getMessage());
};
final ExecutorService executor = Executors.newFixedThreadPool(1);
try {
TransformAuditor auditor = mock(TransformAuditor.class);
TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
MockedTransformIndexer indexer = new MockedTransformIndexer(
executor,
mock(TransformConfigManager.class),
mock(CheckpointProvider.class),
new TransformProgressGatherer(client),
MockedTransformIndexer indexer = createMockIndexer(
config,
Collections.emptyMap(),
auditor,
state,
null,
new TransformIndexerStats(),
context,
searchFunction,
bulkFunction,
failureConsumer
null,
executor,
auditor,
context
);
IterationResult<TransformIndexerPosition> newPosition = indexer.doProcess(searchResponse);
@ -372,4 +354,117 @@ public class TransformIndexerTests extends ESTestCase {
}
}
public void testScriptError() throws Exception {
Integer pageSize = randomBoolean() ? null : randomIntBetween(500, 10_000);
String transformId = randomAlphaOfLength(10);
TransformConfig config = new TransformConfig(
transformId,
randomSourceConfig(),
randomDestConfig(),
null,
null,
null,
new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig(), pageSize),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000)
);
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> {
throw new SearchPhaseExecutionException(
"query",
"Partial shards failure",
new ShardSearchFailure[] {
new ShardSearchFailure(
new ScriptException(
"runtime error",
new ArithmeticException("/ by zero"),
singletonList("stack"),
"test",
"painless"
)
) }
);
};
Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
final AtomicBoolean failIndexerCalled = new AtomicBoolean(false);
final AtomicReference<String> failureMessage = new AtomicReference<>();
Consumer<String> failureConsumer = message -> {
failIndexerCalled.compareAndSet(false, true);
failureMessage.compareAndSet(null, message);
};
final ExecutorService executor = Executors.newFixedThreadPool(1);
try {
MockTransformAuditor auditor = new MockTransformAuditor();
TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
MockedTransformIndexer indexer = createMockIndexer(
config,
state,
searchFunction,
bulkFunction,
failureConsumer,
executor,
auditor,
context
);
final CountDownLatch latch = indexer.newLatch(1);
auditor
.addExpectation(
new MockTransformAuditor.SeenAuditExpectation(
"fail indexer due to script error",
org.elasticsearch.xpack.core.common.notifications.Level.ERROR,
transformId,
"Failed to execute script with error: [*ArithmeticException: / by zero], stack trace: [stack]"
)
);
indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
latch.countDown();
assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS);
assertTrue(failIndexerCalled.get());
assertThat(
failureMessage.get(),
matchesRegex("Failed to execute script with error: \\[.*ArithmeticException: / by zero\\], stack trace: \\[stack\\]")
);
auditor.assertAllExpectationsMatched();
} finally {
executor.shutdownNow();
}
}
private MockedTransformIndexer createMockIndexer(
TransformConfig config,
AtomicReference<IndexerState> state,
Function<SearchRequest, SearchResponse> searchFunction,
Function<BulkRequest, BulkResponse> bulkFunction,
Consumer<String> failureConsumer,
final ExecutorService executor,
TransformAuditor auditor,
TransformContext context
) {
return new MockedTransformIndexer(
executor,
mock(TransformConfigManager.class),
mock(CheckpointProvider.class),
new TransformProgressGatherer(client),
config,
Collections.emptyMap(),
auditor,
state,
null,
new TransformIndexerStats(),
context,
searchFunction,
bulkFunction,
failureConsumer
);
}
}