diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java index 7c7e5f0edec..4fb8ea6fafd 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java @@ -6,12 +6,7 @@ package org.elasticsearch.xpack.dataframe.integration; -import org.apache.http.entity.ContentType; -import org.apache.http.entity.StringEntity; import org.elasticsearch.client.Request; -import org.elasticsearch.client.dataframe.transforms.DataFrameTransformTaskState; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.junit.Before; @@ -22,9 +17,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -730,45 +723,6 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase { assertEquals(4.47169811, actual.doubleValue(), 0.000001); } - @AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/pull/44583") - public void testBulkIndexFailuresCauseTaskToFail() throws Exception { - String transformId = "bulk-failure-pivot"; - String dataFrameIndex = "pivot-failure-index"; - createPivotReviewsTransform(transformId, dataFrameIndex, null, null, null); - - try (XContentBuilder builder = jsonBuilder()) { - builder.startObject(); - { - builder.startObject("mappings") - .startObject("properties") - .startObject("reviewer") - // This type should cause mapping coercion type conflict on bulk index - .field("type", "long") - .endObject() - .endObject() - .endObject(); - } - builder.endObject(); - final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON); - Request req = new Request("PUT", dataFrameIndex); - req.setEntity(entity); - client().performRequest(req); - } - startDataframeTransform(transformId, false, null); - - assertBusy(() -> assertEquals(DataFrameTransformTaskState.FAILED.value(), getDataFrameTaskState(transformId)), - 120, - TimeUnit.SECONDS); - - Map state = getDataFrameState(transformId); - assertThat((String) XContentMapValues.extractValue("state.reason", state), - containsString("task encountered more than 10 failures; latest failure: Bulk index experienced failures.")); - - // Force stop the transform as bulk indexing caused it to go into a failed state - stopDataFrameTransform(transformId, true); - deleteIndex(dataFrameIndex); - } - private void assertOnePivotValue(String query, double expected) throws IOException { Map searchResult = getAsMap(query); diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java index 54dea2fdab6..867fc6f7004 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java @@ -51,11 +51,9 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase { return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", BASIC_AUTH_VALUE_SUPER_USER).build(); } - protected void createReviewsIndex(String indexName) throws IOException { + protected void createReviewsIndex(String indexName, int numDocs) throws IOException { int[] distributionTable = {5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 4, 4, 4, 3, 3, 2, 1, 1, 1}; - final int numDocs = 1000; - // create mapping try (XContentBuilder builder = jsonBuilder()) { builder.startObject(); @@ -146,6 +144,10 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase { createReviewsIndex(REVIEWS_INDEX_NAME); } + protected void createReviewsIndex(String indexName) throws IOException { + createReviewsIndex(indexName, 1000); + } + protected void createPivotReviewsTransform(String transformId, String dataFrameIndex, String query) throws IOException { createPivotReviewsTransform(transformId, dataFrameIndex, query, null); } diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java index 0690fbd8fca..0bf668ddadd 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.dataframe.integration; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.client.Request; import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.Strings; @@ -17,6 +16,7 @@ import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; import org.junit.After; +import org.junit.Before; import java.io.IOException; import java.util.List; @@ -27,13 +27,21 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.oneOf; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/44583") public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase { private static final String TRANSFORM_ID = "failure_pivot_1"; + @Before + public void setClusterSettings() throws IOException { + // Make sure we never retry on failure to speed up the test + Request addFailureRetrySetting = new Request("PUT", "/_cluster/settings"); + addFailureRetrySetting.setJsonEntity( + "{\"persistent\": {\"xpack.data_frame.num_transform_failure_retries\": \"" + 0 + "\"}}"); + client().performRequest(addFailureRetrySetting); + } + @After public void cleanUpPotentiallyFailedTransform() throws Exception { // If the tests failed in the middle, we should force stop it. This prevents other transform tests from failing due @@ -43,14 +51,14 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase { } public void testForceStopFailedTransform() throws Exception { - createReviewsIndex(); + createReviewsIndex(REVIEWS_INDEX_NAME, 10); String dataFrameIndex = "failure_pivot_reviews"; createDestinationIndexWithBadMapping(dataFrameIndex); createContinuousPivotReviewsTransform(TRANSFORM_ID, dataFrameIndex, null); startDataframeTransform(TRANSFORM_ID, false); awaitState(TRANSFORM_ID, DataFrameTransformTaskState.FAILED); Map fullState = getDataFrameState(TRANSFORM_ID); - final String failureReason = "task encountered more than 10 failures; latest failure: " + + final String failureReason = "task encountered more than 0 failures; latest failure: " + "Bulk index experienced failures. See the logs of the node running the transform for details."; // Verify we have failed for the expected reason assertThat(XContentMapValues.extractValue("state.reason", fullState), @@ -69,21 +77,20 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase { awaitState(TRANSFORM_ID, DataFrameTransformTaskState.STOPPED); fullState = getDataFrameState(TRANSFORM_ID); - // Verify we have failed for the expected reason assertThat(XContentMapValues.extractValue("state.reason", fullState), is(nullValue())); } public void testForceStartFailedTransform() throws Exception { - createReviewsIndex(); + createReviewsIndex(REVIEWS_INDEX_NAME, 10); String dataFrameIndex = "failure_pivot_reviews"; createDestinationIndexWithBadMapping(dataFrameIndex); createContinuousPivotReviewsTransform(TRANSFORM_ID, dataFrameIndex, null); startDataframeTransform(TRANSFORM_ID, false); awaitState(TRANSFORM_ID, DataFrameTransformTaskState.FAILED); Map fullState = getDataFrameState(TRANSFORM_ID); - final String failureReason = "task encountered more than 10 failures; latest failure: " + + final String failureReason = "task encountered more than 0 failures; latest failure: " + "Bulk index experienced failures. See the logs of the node running the transform for details."; // Verify we have failed for the expected reason assertThat(XContentMapValues.extractValue("state.reason", fullState), @@ -101,23 +108,15 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase { deleteIndex(dataFrameIndex); // Force start the data frame to indicate failure correction startDataframeTransform(TRANSFORM_ID, true); - // Wait for data to be indexed appropriately and refresh for search - waitForDataFrameCheckpoint(TRANSFORM_ID); - refreshIndex(dataFrameIndex); // Verify that we have started and that our reason is cleared fullState = getDataFrameState(TRANSFORM_ID); assertThat(XContentMapValues.extractValue("state.reason", fullState), is(nullValue())); assertThat(XContentMapValues.extractValue("state.task_state", fullState), equalTo("started")); - assertThat(XContentMapValues.extractValue("state.indexer_state", fullState), equalTo("started")); - assertThat((int)XContentMapValues.extractValue("stats.index_failures", fullState), greaterThan(0)); + assertThat(XContentMapValues.extractValue("state.indexer_state", fullState), is(oneOf("started", "indexing"))); + assertThat(XContentMapValues.extractValue("stats.index_failures", fullState), equalTo(1)); - // get and check some users to verify we restarted - assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_0", 3.776978417); - assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_5", 3.72); - assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_11", 3.846153846); - assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_20", 3.769230769); - assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_26", 3.918918918); + stopDataFrameTransform(TRANSFORM_ID, true); } private void awaitState(String transformId, DataFrameTransformTaskState state) throws Exception { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java index 77aceaac74f..62e50bcf4ae 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java @@ -219,8 +219,15 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu assert dataFrameAuditor.get() != null; assert dataFrameTransformsCheckpointService.get() != null; - return Collections.singletonList(new DataFrameTransformPersistentTasksExecutor(client, dataFrameTransformsConfigManager.get(), - dataFrameTransformsCheckpointService.get(), schedulerEngine.get(), dataFrameAuditor.get(), threadPool)); + return Collections.singletonList( + new DataFrameTransformPersistentTasksExecutor(client, + dataFrameTransformsConfigManager.get(), + dataFrameTransformsCheckpointService.get(), + schedulerEngine.get(), + dataFrameAuditor.get(), + threadPool, + clusterService, + settingsModule.getSettings())); } public List> getSettings() { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java index 07bfd151e35..f8e3a3f1e85 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java @@ -32,7 +32,6 @@ public class TransportStartDataFrameTransformTaskAction extends TransportTasksAction { - private volatile int numFailureRetries; private final XPackLicenseState licenseState; @Inject @@ -42,8 +41,6 @@ public class TransportStartDataFrameTransformTaskAction extends StartDataFrameTransformTaskAction.Request::new, StartDataFrameTransformTaskAction.Response::new, StartDataFrameTransformTaskAction.Response::new, ThreadPool.Names.SAME); this.licenseState = licenseState; - clusterService.getClusterSettings() - .addSettingsUpdateConsumer(DataFrameTransformTask.NUM_FAILURE_RETRIES_SETTING, this::setNumFailureRetries); } @Override @@ -62,7 +59,7 @@ public class TransportStartDataFrameTransformTaskAction extends protected void taskOperation(StartDataFrameTransformTaskAction.Request request, DataFrameTransformTask transformTask, ActionListener listener) { if (transformTask.getTransformId().equals(request.getId())) { - transformTask.setNumFailureRetries(numFailureRetries).start(null, listener); + transformTask.start(null, listener); } else { listener.onFailure(new RuntimeException("ID of data frame transform task [" + transformTask.getTransformId() + "] does not match request's ID [" + request.getId() + "]")); @@ -93,8 +90,4 @@ public class TransportStartDataFrameTransformTaskAction extends boolean allStarted = tasks.stream().allMatch(StartDataFrameTransformTaskAction.Response::isStarted); return new StartDataFrameTransformTaskAction.Response(allStarted); } - - void setNumFailureRetries(int numFailureRetries) { - this.numFailureRetries = numFailureRetries; - } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index 6942d09cd0e..1c52d51921b 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -18,7 +18,9 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; @@ -61,13 +63,16 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx private final SchedulerEngine schedulerEngine; private final ThreadPool threadPool; private final DataFrameAuditor auditor; + private volatile int numFailureRetries; public DataFrameTransformPersistentTasksExecutor(Client client, DataFrameTransformsConfigManager transformsConfigManager, DataFrameTransformsCheckpointService dataFrameTransformsCheckpointService, SchedulerEngine schedulerEngine, DataFrameAuditor auditor, - ThreadPool threadPool) { + ThreadPool threadPool, + ClusterService clusterService, + Settings settings) { super(DataFrameField.TASK_NAME, DataFrame.TASK_THREAD_POOL_NAME); this.client = client; this.transformsConfigManager = transformsConfigManager; @@ -75,6 +80,9 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx this.schedulerEngine = schedulerEngine; this.auditor = auditor; this.threadPool = threadPool; + this.numFailureRetries = DataFrameTransformTask.NUM_FAILURE_RETRIES_SETTING.get(settings); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(DataFrameTransformTask.NUM_FAILURE_RETRIES_SETTING, this::setNumFailureRetries); } @Override @@ -286,7 +294,11 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx Long previousCheckpoint, ActionListener listener) { buildTask.initializeIndexer(indexerBuilder); - buildTask.start(previousCheckpoint, listener); + buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, listener); + } + + private void setNumFailureRetries(int numFailureRetries) { + this.numFailureRetries = numFailureRetries; } @Override diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index f99d618e4a9..59db069a7dc 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -329,19 +329,33 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S } synchronized void markAsFailed(String reason, ActionListener listener) { - taskState.set(DataFrameTransformTaskState.FAILED); - stateReason.set(reason); auditor.error(transform.getId(), reason); // We should not keep retrying. Either the task will be stopped, or started // If it is started again, it is registered again. deregisterSchedulerJob(); + DataFrameTransformState newState = new DataFrameTransformState( + DataFrameTransformTaskState.FAILED, + initialIndexerState, + initialPosition, + currentCheckpoint.get(), + reason, + getIndexer() == null ? null : getIndexer().getProgress()); // Even though the indexer information is persisted to an index, we still need DataFrameTransformTaskState in the clusterstate // This keeps track of STARTED, FAILED, STOPPED // This is because a FAILED state can occur because we cannot read the config from the internal index, which would imply that // we could not read the previous state information from said index. - persistStateToClusterState(getState(), ActionListener.wrap( - r -> listener.onResponse(null), - listener::onFailure + persistStateToClusterState(newState, ActionListener.wrap( + r -> { + taskState.set(DataFrameTransformTaskState.FAILED); + stateReason.set(reason); + listener.onResponse(null); + }, + e -> { + logger.error("Failed to set task state as failed to cluster state", e); + taskState.set(DataFrameTransformTaskState.FAILED); + stateReason.set(reason); + listener.onFailure(e); + } )); } diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java index 1186c3972b0..3d8b290ad80 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java @@ -21,6 +21,8 @@ import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; @@ -42,6 +44,7 @@ import java.util.List; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class DataFrameTransformPersistentTasksExecutorTests extends ESTestCase { @@ -99,12 +102,17 @@ public class DataFrameTransformPersistentTasksExecutorTests extends ESTestCase { DataFrameTransformsConfigManager transformsConfigManager = new DataFrameTransformsConfigManager(client, xContentRegistry()); DataFrameTransformsCheckpointService dataFrameTransformsCheckpointService = new DataFrameTransformsCheckpointService(client, transformsConfigManager); - + ClusterSettings cSettings = new ClusterSettings(Settings.EMPTY, + Collections.singleton(DataFrameTransformTask.NUM_FAILURE_RETRIES_SETTING)); + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(cSettings); DataFrameTransformPersistentTasksExecutor executor = new DataFrameTransformPersistentTasksExecutor(client, transformsConfigManager, dataFrameTransformsCheckpointService, mock(SchedulerEngine.class), new DataFrameAuditor(client, ""), - mock(ThreadPool.class)); + mock(ThreadPool.class), + clusterService, + Settings.EMPTY); assertThat(executor.getAssignment(new DataFrameTransform("new-task-id", Version.CURRENT, null), cs).getExecutorNode(), equalTo("current-data-node-with-1-tasks"));