[ML][Data Frame] treat bulk index failures as an indexing failure (#44351) (#44427)

* [ML][Data Frame] treat bulk index failures as an indexing failure

* removing redundant public modifier

* changing to an ElasticsearchException

* fixing redundant public modifier
This commit is contained in:
Benjamin Trent 2019-07-16 10:04:28 -05:00 committed by GitHub
parent a2b4687d89
commit 858dbfc074
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 72 additions and 9 deletions

View File

@@ -6,7 +6,12 @@
package org.elasticsearch.xpack.dataframe.integration;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.junit.Before;
@@ -17,7 +22,9 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@@ -723,6 +730,44 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
assertEquals(4.47169811, actual.doubleValue(), 0.000001);
}
/**
 * Verifies that item-level bulk-index failures drive the transform task into the
 * FAILED state. The destination index is pre-created with {@code reviewer} mapped
 * as {@code long}, so the transform's documents hit a mapping conflict on every
 * bulk request.
 */
public void testBulkIndexFailuresCauseTaskToFail() throws Exception {
    final String transformId = "bulk-failure-pivot";
    final String destIndex = "pivot-failure-index";
    createPivotReviewsTransform(transformId, destIndex, null, null, null);

    // Pre-create the destination index with a conflicting mapping for "reviewer".
    try (XContentBuilder mapping = jsonBuilder()) {
        mapping.startObject();
        mapping.startObject("mappings");
        mapping.startObject("properties");
        mapping.startObject("reviewer");
        // This type should cause mapping coercion type conflict on bulk index
        mapping.field("type", "long");
        mapping.endObject(); // reviewer
        mapping.endObject(); // properties
        mapping.endObject(); // mappings
        mapping.endObject();

        Request createIndexRequest = new Request("PUT", destIndex);
        createIndexRequest.setEntity(
            new StringEntity(Strings.toString(mapping), ContentType.APPLICATION_JSON));
        client().performRequest(createIndexRequest);
    }

    startDataframeTransform(transformId, false, null);

    // Repeated bulk failures should eventually flip the task state to FAILED.
    assertBusy(
        () -> assertEquals(DataFrameTransformTaskState.FAILED.value(), getDataFrameTaskState(transformId)),
        120,
        TimeUnit.SECONDS);

    Map<?, ?> state = getDataFrameState(transformId);
    assertThat((String) XContentMapValues.extractValue("state.reason", state),
        containsString("task encountered more than 10 failures; latest failure: Bulk index experienced failures."));

    // Force stop the transform as bulk indexing caused it to go into a failed state
    stopDataFrameTransform(transformId, true);
    deleteIndex(destIndex);
}
private void assertOnePivotValue(String query, double expected) throws IOException {
Map<String, Object> searchResult = getAsMap(query);

View File

@@ -211,7 +211,8 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
+ " \"avg_rating\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } } } }"
+ " } } } },"
+ "\"frequency\":\"1s\""
+ "}";
createDataframeTransformRequest.setJsonEntity(config);

View File

@@ -591,21 +591,31 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
BulkAction.INSTANCE,
request,
ActionListener.wrap(bulkResponse -> {
if (bulkResponse.hasFailures() && auditBulkFailures) {
if (bulkResponse.hasFailures()) {
int failureCount = 0;
for(BulkItemResponse item : bulkResponse.getItems()) {
if (item.isFailed()) {
failureCount++;
}
// TODO gather information on irrecoverable failures and update isIrrecoverableFailure
}
auditor.warning(transformId,
"Experienced at least [" +
failureCount +
"] bulk index failures. See the logs of the node running the transform for details. " +
bulkResponse.buildFailureMessage());
auditBulkFailures = false;
if (auditBulkFailures) {
auditor.warning(transformId,
"Experienced at least [" +
failureCount +
"] bulk index failures. See the logs of the node running the transform for details. " +
bulkResponse.buildFailureMessage());
auditBulkFailures = false;
}
// This calls AsyncTwoPhaseIndexer#finishWithIndexingFailure
// It increments the indexing failure, and then calls the `onFailure` logic
nextPhase.onFailure(
new BulkIndexingException("Bulk index experienced failures. " +
"See the logs of the node running the transform for details."));
} else {
auditBulkFailures = true;
nextPhase.onResponse(bulkResponse);
}
nextPhase.onResponse(bulkResponse);
}, nextPhase::onFailure));
}
@@ -817,4 +827,11 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
}, e -> {}));
}
}
// Considered a recoverable indexing failure: passed to nextPhase.onFailure when a
// bulk response reports item-level failures, so the indexer counts the failure
// against its retry threshold instead of treating it as fatal outright.
// NOTE(review): "recoverable" is inferred from the surrounding onFailure path —
// confirm against AsyncTwoPhaseIndexer#finishWithIndexingFailure.
private static class BulkIndexingException extends ElasticsearchException {
    // msg may contain {} placeholders; ElasticsearchException interpolates args into them.
    BulkIndexingException(String msg, Object... args) {
        super(msg, args);
    }
}
}