From e384bf02766b58fcd205471ed3907c6b64171765 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Tue, 11 Jun 2019 15:55:02 -0500 Subject: [PATCH] [ML-DataFrame] stop task at completion of data frame function (#42955) (#43114) * stop data frame task after it finishes * test auto stop * adapt tests * persist the state correctly and move stop into listener * Calling `onStop` even if persistence fails, changing `stop` to rely on doSaveState --- .../integration/DataFrameTransformIT.java | 2 +- .../DataFrameGetAndGetStatsIT.java | 2 +- .../integration/DataFrameRestTestCase.java | 9 +++++++ .../integration/DataFrameUsageIT.java | 4 +-- .../transforms/DataFrameTransformTask.java | 25 +++++++++++++++++-- .../test/data_frame/transforms_stats.yml | 8 +++--- 6 files changed, 40 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java index 69fb980871d..e5124ea3a0a 100644 --- a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java +++ b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java @@ -55,7 +55,7 @@ public class DataFrameTransformIT extends DataFrameIntegTestCase { DataFrameTransformStateAndStats stats = getDataFrameTransformStats(config.getId()).getTransformsStateAndStats().get(0); - assertThat(stats.getTransformState().getIndexerState(), equalTo(IndexerState.STARTED)); + assertThat(stats.getTransformState().getIndexerState(), equalTo(IndexerState.STOPPED)); } diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java index d9927cd09ed..01be0c416d4 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java @@ -107,7 +107,7 @@ public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase { assertEquals(1, transformsStats.size()); Map state = (Map) XContentMapValues.extractValue("state", transformsStats.get(0)); assertEquals(1, transformsStats.size()); - assertEquals("started", XContentMapValues.extractValue("task_state", state)); + assertEquals("stopped", XContentMapValues.extractValue("task_state", state)); assertEquals(null, XContentMapValues.extractValue("current_position", state)); assertEquals(1, XContentMapValues.extractValue("checkpoint", state)); diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java index 403c1e7661a..126417ea2d8 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java @@ -218,6 +218,9 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase { assertTrue(indexExists(dataFrameIndex)); // wait until the dataframe has been created and all data is available waitForDataFrameCheckpoint(transformId); + + // TODO: assuming non-continuous data frames, so transform should auto-stop + waitForDataFrameStopped(transformId); refreshIndex(dataFrameIndex); } @@ -233,6 +236,12 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase { return request; } + void waitForDataFrameStopped(String transformId) throws Exception { + assertBusy(() -> { + assertEquals("stopped", getDataFrameTaskState(transformId)); + }, 5, TimeUnit.SECONDS); + } + void waitForDataFrameCheckpoint(String transformId) throws Exception { assertBusy(() -> { long checkpoint = getDataFrameCheckpoint(transformId); diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java index 4f209c5a9f3..5c35148abc7 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java @@ -105,8 +105,8 @@ public class DataFrameUsageIT extends DataFrameRestTestCase { usageAsMap = entityAsMap(usageResponse); // we should see some stats assertEquals(3, XContentMapValues.extractValue("data_frame.transforms._all", usageAsMap)); - assertEquals(1, XContentMapValues.extractValue("data_frame.transforms.started", usageAsMap)); - assertEquals(2, XContentMapValues.extractValue("data_frame.transforms.stopped", usageAsMap)); + // TODO: due to auto-stop we only see stopped data frames + assertEquals(3, XContentMapValues.extractValue("data_frame.transforms.stopped", usageAsMap)); for(String statName : PROVIDED_STATS) { assertEquals("Incorrect stat " + statName, expectedStats.get(statName), XContentMapValues.extractValue("data_frame.stats." + statName, usageAsMap)); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index b351c6ad1fb..cf6affa0d3e 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -247,7 +247,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S IndexerState state = getIndexer().stop(); if (state == IndexerState.STOPPED) { - getIndexer().doSaveState(state, getIndexer().getPosition(), () -> getIndexer().onStop()); + //doSaveState calls `onStop` when the task state is `STOPPED` + getIndexer().doSaveState(state, getIndexer().getPosition(), () -> {}); } } @@ -559,8 +560,20 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S transformTask.setTaskStateStopped(); } + DataFrameTransformTaskState taskState = transformTask.taskState.get(); + + // TODO: check whether continuous data frames is enabled when available + if (indexerState.equals(IndexerState.STARTED) && transformTask.currentCheckpoint.get() == 1) { + // set both to stopped so they are persisted as such + taskState = DataFrameTransformTaskState.STOPPED; + indexerState = IndexerState.STOPPED; + + auditor.info(transformConfig.getId(), "Data frame finished indexing all data, initiating stop"); + logger.info("Data frame [{}] finished indexing all data, initiating stop", transformConfig.getId()); + } + final DataFrameTransformState state = new DataFrameTransformState( - transformTask.taskState.get(), + taskState, indexerState, position, transformTask.currentCheckpoint.get(), @@ -575,12 +588,20 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null ActionListener.wrap( r -> { + // for auto stop shutdown the task + if (state.getTaskState().equals(DataFrameTransformTaskState.STOPPED)) { + onStop(); + } next.run(); }, statsExc -> { logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc); auditor.warning(getJobId(), "Failure updating stats of transform: " + statsExc.getMessage()); + // for auto stop shutdown the task + if (state.getTaskState().equals(DataFrameTransformTaskState.STOPPED)) { + onStop(); + } next.run(); } )); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml index 79aa14cb6f6..6dc623683b4 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml @@ -47,8 +47,8 @@ teardown: transform_id: "airline-transform-stats" - match: { count: 1 } - match: { transforms.0.id: "airline-transform-stats" } - - match: { transforms.0.state.indexer_state: "/started|indexing/" } - - match: { transforms.0.state.task_state: "started" } + - match: { transforms.0.state.indexer_state: "/started|indexing|stopped/" } + - match: { transforms.0.state.task_state: "/started|stopped/" } - lte: { transforms.0.state.checkpoint: 1 } - lte: { transforms.0.stats.pages_processed: 1 } - match: { transforms.0.stats.documents_processed: 0 } @@ -163,7 +163,7 @@ teardown: transform_id: "*" - match: { count: 2 } - match: { transforms.0.id: "airline-transform-stats" } - - match: { transforms.0.state.indexer_state: "/started|indexing/" } + - match: { transforms.0.state.indexer_state: "/started|indexing|stopped/" } - match: { transforms.1.id: "airline-transform-stats-dos" } - match: { transforms.1.state.indexer_state: "stopped" } @@ -172,7 +172,7 @@ teardown: transform_id: "_all" - match: { count: 2 } - match: { transforms.0.id: "airline-transform-stats" } - - match: { transforms.0.state.indexer_state: "/started|indexing/" } + - match: { transforms.0.state.indexer_state: "/started|indexing|stopped/" } - match: { transforms.1.id: "airline-transform-stats-dos" } - match: { transforms.1.state.indexer_state: "stopped" }