diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java index 4fb8ea6fafd..0a6ebe0b9ae 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java @@ -6,7 +6,12 @@ package org.elasticsearch.xpack.dataframe.integration; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; import org.elasticsearch.client.Request; +import org.elasticsearch.client.dataframe.transforms.DataFrameTransformTaskState; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.junit.Before; @@ -17,7 +22,9 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -723,6 +730,44 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase { assertEquals(4.47169811, actual.doubleValue(), 0.000001); } + public void testBulkIndexFailuresCauseTaskToFail() throws Exception { + String transformId = "bulk-failure-pivot"; + String dataFrameIndex = "pivot-failure-index"; + createPivotReviewsTransform(transformId, dataFrameIndex, null, null, null); + + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + { + builder.startObject("mappings") + .startObject("properties") + .startObject("reviewer") + // This type should cause mapping coercion type conflict on bulk index + .field("type", "long") + .endObject() + .endObject() + .endObject(); + } + builder.endObject(); + final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON); + Request req = new Request("PUT", dataFrameIndex); + req.setEntity(entity); + client().performRequest(req); + } + startDataframeTransform(transformId, false, null); + + assertBusy(() -> assertEquals(DataFrameTransformTaskState.FAILED.value(), getDataFrameTaskState(transformId)), + 120, + TimeUnit.SECONDS); + + Map state = getDataFrameState(transformId); + assertThat((String) XContentMapValues.extractValue("state.reason", state), + containsString("task encountered more than 10 failures; latest failure: Bulk index experienced failures.")); + + // Force stop the transform as bulk indexing caused it to go into a failed state + stopDataFrameTransform(transformId, true); + deleteIndex(dataFrameIndex); + } + private void assertOnePivotValue(String query, double expected) throws IOException { Map searchResult = getAsMap(query); diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java index 1b068c93364..81afc161e14 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java @@ -211,7 +211,8 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase { + " \"avg_rating\": {" + " \"avg\": {" + " \"field\": \"stars\"" - + " } } } }" + + " } } } }," + + "\"frequency\":\"1s\"" + "}"; createDataframeTransformRequest.setJsonEntity(config); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 3b9f47ba397..a5fda55eb73 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -591,21 +591,31 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S BulkAction.INSTANCE, request, ActionListener.wrap(bulkResponse -> { - if (bulkResponse.hasFailures() && auditBulkFailures) { + if (bulkResponse.hasFailures()) { int failureCount = 0; for(BulkItemResponse item : bulkResponse.getItems()) { if (item.isFailed()) { failureCount++; } + // TODO gather information on irrecoverable failures and update isIrrecoverableFailure } - auditor.warning(transformId, - "Experienced at least [" + - failureCount + - "] bulk index failures. See the logs of the node running the transform for details. " + - bulkResponse.buildFailureMessage()); - auditBulkFailures = false; + if (auditBulkFailures) { + auditor.warning(transformId, + "Experienced at least [" + + failureCount + + "] bulk index failures. See the logs of the node running the transform for details. " + + bulkResponse.buildFailureMessage()); + auditBulkFailures = false; + } + // This calls AsyncTwoPhaseIndexer#finishWithIndexingFailure + // It increments the indexing failure, and then calls the `onFailure` logic + nextPhase.onFailure( + new BulkIndexingException("Bulk index experienced failures. " + + "See the logs of the node running the transform for details.")); + } else { + auditBulkFailures = true; + nextPhase.onResponse(bulkResponse); } - nextPhase.onResponse(bulkResponse); }, nextPhase::onFailure)); } @@ -817,4 +827,11 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S }, e -> {})); } } + + // Considered a recoverable indexing failure + private static class BulkIndexingException extends ElasticsearchException { + BulkIndexingException(String msg, Object... args) { + super(msg, args); + } + } }