diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index 8374edbbf21..b1d86451936 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -305,7 +305,8 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx ActionListener listener) { buildTask.initializeIndexer(indexerBuilder); // DataFrameTransformTask#start will fail if the task state is FAILED - buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, false, listener); + // Will continue to attempt to start the indexer, even if the state is STARTED + buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, false, false, listener); } private void setNumFailureRetries(int numFailureRetries) { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index bc39cbab042..a31c148a1ed 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -219,13 +219,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S )); } - /** - * Start the background indexer and set the task's state to started - * @param startingCheckpoint Set the current checkpoint to this value. If null the - * current checkpoint is not set - * @param listener Started listener - */ - public synchronized void start(Long startingCheckpoint, boolean force, ActionListener listener) { + // Here `failOnConflict` is usually true, except when the initial start is called when the task is assigned to the node + synchronized void start(Long startingCheckpoint, boolean force, boolean failOnConflict, ActionListener listener) { logger.debug("[{}] start called with force [{}] and state [{}].", getTransformId(), force, getState()); if (taskState.get() == DataFrameTransformTaskState.FAILED && force == false) { listener.onFailure(new ElasticsearchStatusException( @@ -249,7 +244,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S return; } // If we are already in a `STARTED` state, we should not attempt to call `.start` on the indexer again. - if (taskState.get() == DataFrameTransformTaskState.STARTED) { + if (taskState.get() == DataFrameTransformTaskState.STARTED && failOnConflict) { listener.onFailure(new ElasticsearchStatusException( "Cannot start transform [{}] as it is already STARTED.", RestStatus.CONFLICT, @@ -260,7 +255,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S final IndexerState newState = getIndexer().start(); if (Arrays.stream(RUNNING_STATES).noneMatch(newState::equals)) { listener.onFailure(new ElasticsearchException("Cannot start task for data frame transform [{}], because state was [{}]", - transform.getId(), newState)); + transform.getId(), newState)); return; } stateReason.set(null); @@ -298,10 +293,20 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S logger.error(new ParameterizedMessage("[{}] failed updating state to [{}].", getTransformId(), state), exc); getIndexer().stop(); listener.onFailure(new ElasticsearchException("Error while updating state for data frame transform [" - + transform.getId() + "] to [" + state.getIndexerState() + "].", exc)); + + transform.getId() + "] to [" + state.getIndexerState() + "].", exc)); } )); } + /** + * Start the background indexer and set the task's state to started + * @param startingCheckpoint Set the current checkpoint to this value. If null the + * current checkpoint is not set + * @param force Whether to force start a failed task or not + * @param listener Started listener + */ + public synchronized void start(Long startingCheckpoint, boolean force, ActionListener listener) { + start(startingCheckpoint, force, true, listener); + } public synchronized void stop(boolean force) { logger.debug("[{}] stop called with force [{}] and state [{}]", getTransformId(), force, getState()); diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java index a4a5025a139..6ba042a07ba 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java @@ -7,10 +7,10 @@ package org.elasticsearch.upgrades; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.Version; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; +import org.elasticsearch.client.core.IndexerState; import org.elasticsearch.client.dataframe.GetDataFrameTransformStatsResponse; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStats; @@ -28,6 +28,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.xpack.test.rest.XPackRestTestConstants; @@ -37,7 +38,9 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -48,7 +51,6 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.oneOf; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/43662") public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase { private static final Version UPGRADE_FROM_VERSION = Version.fromString(System.getProperty("tests.upgrade_from_version")); @@ -79,12 +81,19 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase { * index mappings when it is assigned to an upgraded node even if no other ML endpoint is called after the upgrade */ public void testDataFramesRollingUpgrade() throws Exception { - assumeTrue("Continuous data frames not supported until 7.3", UPGRADE_FROM_VERSION.onOrAfter(Version.V_7_3_0)); + assumeTrue("Continuous data frames time sync not fixed until 7.4", UPGRADE_FROM_VERSION.onOrAfter(Version.V_7_4_0)); + Request adjustLoggingLevels = new Request("PUT", "/_cluster/settings"); + adjustLoggingLevels.setJsonEntity( + "{\"transient\": {" + + "\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\"," + + "\"logger.org.elasticsearch.xpack.dataframe\": \"trace\"}}"); + client().performRequest(adjustLoggingLevels); Request waitForYellow = new Request("GET", "/_cluster/health"); waitForYellow.addParameter("wait_for_nodes", "3"); waitForYellow.addParameter("wait_for_status", "yellow"); switch (CLUSTER_TYPE) { case OLD: + client().performRequest(waitForYellow); createAndStartContinuousDataFrame(); break; case MIXED: @@ -113,15 +122,15 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase { private void createAndStartContinuousDataFrame() throws Exception { createIndex(CONTINUOUS_DATA_FRAME_SOURCE); - long totalDocsWritten = 0; + long totalDocsWrittenSum = 0; for (TimeValue bucket : BUCKETS) { int docs = randomIntBetween(1, 25); putData(CONTINUOUS_DATA_FRAME_SOURCE, docs, bucket, ENTITIES); - totalDocsWritten += docs * ENTITIES.size(); + totalDocsWrittenSum += docs * ENTITIES.size(); } - + long totalDocsWritten = totalDocsWrittenSum; DataFrameTransformConfig config = DataFrameTransformConfig.builder() - .setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(30))) + .setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1))) .setPivotConfig(PivotConfig.builder() .setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("stars").field("stars"))) .setGroups(GroupConfig.builder().groupBy("user_id", TermsGroupSource.builder().setField("user_id").build()).build()) @@ -129,19 +138,28 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase { .setDest(DestConfig.builder().setIndex(CONTINUOUS_DATA_FRAME_ID + "_idx").build()) .setSource(SourceConfig.builder().setIndex(CONTINUOUS_DATA_FRAME_SOURCE).build()) .setId(CONTINUOUS_DATA_FRAME_ID) + .setFrequency(TimeValue.timeValueSeconds(1)) .build(); putTransform(CONTINUOUS_DATA_FRAME_ID, config); startTransform(CONTINUOUS_DATA_FRAME_ID); waitUntilAfterCheckpoint(CONTINUOUS_DATA_FRAME_ID, 0L); - DataFrameTransformStats stateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID); + assertBusy(() -> { + DataFrameTransformStats stateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID); + assertThat(stateAndStats.getIndexerStats().getOutputDocuments(), equalTo((long)ENTITIES.size())); + assertThat(stateAndStats.getIndexerStats().getNumDocuments(), equalTo(totalDocsWritten)); + // Even if we get back to started, we may periodically get set back to `indexing` when triggered. + // Though short lived due to no changes on the source indices, it could result in flaky test behavior + assertThat(stateAndStats.getState(), oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING)); + }, 120, TimeUnit.SECONDS); - assertThat(stateAndStats.getIndexerStats().getOutputDocuments(), equalTo((long)ENTITIES.size())); - assertThat(stateAndStats.getIndexerStats().getNumDocuments(), equalTo(totalDocsWritten)); - assertThat(stateAndStats.getState(), oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING)); + + // We want to make sure our latest state is written before we turn the node off, this makes the testing more reliable + awaitWrittenIndexerState(CONTINUOUS_DATA_FRAME_ID, IndexerState.STARTED.value()); } + @SuppressWarnings("unchecked") private void verifyContinuousDataFrameHandlesData(long expectedLastCheckpoint) throws Exception { // A continuous data frame should automatically become started when it gets assigned to a node @@ -161,9 +179,9 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase { List entities = new ArrayList<>(1); entities.add("user_" + ENTITIES.size() + expectedLastCheckpoint); int docs = 5; - // Index the data very recently in the past so that the transform sync delay can catch up to reading it in our spin - // wait later. - putData(CONTINUOUS_DATA_FRAME_SOURCE, docs, TimeValue.timeValueSeconds(1), entities); + // Index the data + // The frequency and delay should see the data once its indexed + putData(CONTINUOUS_DATA_FRAME_SOURCE, docs, TimeValue.timeValueSeconds(0), entities); waitUntilAfterCheckpoint(CONTINUOUS_DATA_FRAME_ID, expectedLastCheckpoint); @@ -176,10 +194,55 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase { assertThat(stateAndStats.getState(), oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING)); - assertThat(stateAndStats.getIndexerStats().getOutputDocuments(), - greaterThan(previousStateAndStats.getIndexerStats().getOutputDocuments())); - assertThat(stateAndStats.getIndexerStats().getNumDocuments(), - greaterThanOrEqualTo(docs + previousStateAndStats.getIndexerStats().getNumDocuments())); + awaitWrittenIndexerState(CONTINUOUS_DATA_FRAME_ID, (responseBody) -> { + Map indexerStats = (Map)((List)XContentMapValues.extractValue("hits.hits._source.stats", + responseBody)) + .get(0); + assertThat((Integer)indexerStats.get("documents_indexed"), + greaterThan(Long.valueOf(previousStateAndStats.getIndexerStats().getOutputDocuments()).intValue())); + assertThat((Integer)indexerStats.get("documents_processed"), + greaterThan(Long.valueOf(previousStateAndStats.getIndexerStats().getNumDocuments()).intValue())); + }); + } + + private void awaitWrittenIndexerState(String id, Consumer> responseAssertion) throws Exception { + Request getStatsDocsRequest = new Request("GET", ".data-frame-internal-*/_search"); + getStatsDocsRequest.setJsonEntity("{\n" + + " \"query\": {\n" + + " \"bool\": {\n" + + " \"filter\": \n" + + " {\"term\": {\n" + + " \"_id\": \"data_frame_transform_state_and_stats-" + id + "\"\n" + + " }}\n" + + " }\n" + + " },\n" + + " \"sort\": [\n" + + " {\n" + + " \"_index\": {\n" + + " \"order\": \"desc\"\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"size\": 1\n" + + "}"); + assertBusy(() -> { + // Want to make sure we get the latest docs + client().performRequest(new Request("POST", ".data-frame-internal-*/_refresh")); + Response response = client().performRequest(getStatsDocsRequest); + assertEquals(200, response.getStatusLine().getStatusCode()); + Map responseBody = entityAsMap(response); + assertEquals(1, XContentMapValues.extractValue("hits.total.value", responseBody)); + responseAssertion.accept(responseBody); + }, 60, TimeUnit.SECONDS); + } + + private void awaitWrittenIndexerState(String id, String indexerState) throws Exception { + awaitWrittenIndexerState(id, (responseBody) -> { + String storedState = ((List)XContentMapValues.extractValue("hits.hits._source.state.indexer_state", responseBody)) + .get(0) + .toString(); + assertThat(storedState, equalTo(indexerState)); + }); } private void putTransform(String id, DataFrameTransformConfig config) throws IOException { @@ -222,7 +285,7 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase { } private void waitUntilAfterCheckpoint(String id, long currentCheckpoint) throws Exception { - assertBusy(() -> assertThat(getTransformStats(id).getCheckpointingInfo().getNext().getCheckpoint(), greaterThan(currentCheckpoint)), + assertBusy(() -> assertThat(getTransformStats(id).getCheckpointingInfo().getLast().getCheckpoint(), greaterThan(currentCheckpoint)), 60, TimeUnit.SECONDS); } @@ -249,7 +312,7 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase { final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON); Request req = new Request("PUT", indexName); req.setEntity(entity); - client().performRequest(req); + assertThat(client().performRequest(req).getStatusLine().getStatusCode(), equalTo(200)); } }