* stop data frame task after it finishes * test auto stop * adapt tests * persist the state correctly and move stop into listener * Calling `onStop` even if persistence fails, changing `stop` to rely on doSaveState
This commit is contained in:
parent
30d8085d96
commit
e384bf0276
|
@ -55,7 +55,7 @@ public class DataFrameTransformIT extends DataFrameIntegTestCase {
|
||||||
|
|
||||||
DataFrameTransformStateAndStats stats = getDataFrameTransformStats(config.getId()).getTransformsStateAndStats().get(0);
|
DataFrameTransformStateAndStats stats = getDataFrameTransformStats(config.getId()).getTransformsStateAndStats().get(0);
|
||||||
|
|
||||||
assertThat(stats.getTransformState().getIndexerState(), equalTo(IndexerState.STARTED));
|
assertThat(stats.getTransformState().getIndexerState(), equalTo(IndexerState.STOPPED));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -107,7 +107,7 @@ public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {
|
||||||
assertEquals(1, transformsStats.size());
|
assertEquals(1, transformsStats.size());
|
||||||
Map<String, Object> state = (Map<String, Object>) XContentMapValues.extractValue("state", transformsStats.get(0));
|
Map<String, Object> state = (Map<String, Object>) XContentMapValues.extractValue("state", transformsStats.get(0));
|
||||||
assertEquals(1, transformsStats.size());
|
assertEquals(1, transformsStats.size());
|
||||||
assertEquals("started", XContentMapValues.extractValue("task_state", state));
|
assertEquals("stopped", XContentMapValues.extractValue("task_state", state));
|
||||||
assertEquals(null, XContentMapValues.extractValue("current_position", state));
|
assertEquals(null, XContentMapValues.extractValue("current_position", state));
|
||||||
assertEquals(1, XContentMapValues.extractValue("checkpoint", state));
|
assertEquals(1, XContentMapValues.extractValue("checkpoint", state));
|
||||||
|
|
||||||
|
|
|
@ -218,6 +218,9 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
|
||||||
assertTrue(indexExists(dataFrameIndex));
|
assertTrue(indexExists(dataFrameIndex));
|
||||||
// wait until the dataframe has been created and all data is available
|
// wait until the dataframe has been created and all data is available
|
||||||
waitForDataFrameCheckpoint(transformId);
|
waitForDataFrameCheckpoint(transformId);
|
||||||
|
|
||||||
|
// TODO: assuming non-continuous data frames, so transform should auto-stop
|
||||||
|
waitForDataFrameStopped(transformId);
|
||||||
refreshIndex(dataFrameIndex);
|
refreshIndex(dataFrameIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -233,6 +236,12 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
|
||||||
return request;
|
return request;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void waitForDataFrameStopped(String transformId) throws Exception {
|
||||||
|
assertBusy(() -> {
|
||||||
|
assertEquals("stopped", getDataFrameTaskState(transformId));
|
||||||
|
}, 5, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
void waitForDataFrameCheckpoint(String transformId) throws Exception {
|
void waitForDataFrameCheckpoint(String transformId) throws Exception {
|
||||||
assertBusy(() -> {
|
assertBusy(() -> {
|
||||||
long checkpoint = getDataFrameCheckpoint(transformId);
|
long checkpoint = getDataFrameCheckpoint(transformId);
|
||||||
|
|
|
@ -105,8 +105,8 @@ public class DataFrameUsageIT extends DataFrameRestTestCase {
|
||||||
usageAsMap = entityAsMap(usageResponse);
|
usageAsMap = entityAsMap(usageResponse);
|
||||||
// we should see some stats
|
// we should see some stats
|
||||||
assertEquals(3, XContentMapValues.extractValue("data_frame.transforms._all", usageAsMap));
|
assertEquals(3, XContentMapValues.extractValue("data_frame.transforms._all", usageAsMap));
|
||||||
assertEquals(1, XContentMapValues.extractValue("data_frame.transforms.started", usageAsMap));
|
// TODO: due to auto-stop we only see stopped data frames
|
||||||
assertEquals(2, XContentMapValues.extractValue("data_frame.transforms.stopped", usageAsMap));
|
assertEquals(3, XContentMapValues.extractValue("data_frame.transforms.stopped", usageAsMap));
|
||||||
for(String statName : PROVIDED_STATS) {
|
for(String statName : PROVIDED_STATS) {
|
||||||
assertEquals("Incorrect stat " + statName,
|
assertEquals("Incorrect stat " + statName,
|
||||||
expectedStats.get(statName), XContentMapValues.extractValue("data_frame.stats." + statName, usageAsMap));
|
expectedStats.get(statName), XContentMapValues.extractValue("data_frame.stats." + statName, usageAsMap));
|
||||||
|
|
|
@ -247,7 +247,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
|
||||||
|
|
||||||
IndexerState state = getIndexer().stop();
|
IndexerState state = getIndexer().stop();
|
||||||
if (state == IndexerState.STOPPED) {
|
if (state == IndexerState.STOPPED) {
|
||||||
getIndexer().doSaveState(state, getIndexer().getPosition(), () -> getIndexer().onStop());
|
//doSaveState calls `onStop` when the task state is `STOPPED`
|
||||||
|
getIndexer().doSaveState(state, getIndexer().getPosition(), () -> {});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -559,8 +560,20 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
|
||||||
transformTask.setTaskStateStopped();
|
transformTask.setTaskStateStopped();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
DataFrameTransformTaskState taskState = transformTask.taskState.get();
|
||||||
|
|
||||||
|
// TODO: check whether continuous data frames is enabled when available
|
||||||
|
if (indexerState.equals(IndexerState.STARTED) && transformTask.currentCheckpoint.get() == 1) {
|
||||||
|
// set both to stopped so they are persisted as such
|
||||||
|
taskState = DataFrameTransformTaskState.STOPPED;
|
||||||
|
indexerState = IndexerState.STOPPED;
|
||||||
|
|
||||||
|
auditor.info(transformConfig.getId(), "Data frame finished indexing all data, initiating stop");
|
||||||
|
logger.info("Data frame [{}] finished indexing all data, initiating stop", transformConfig.getId());
|
||||||
|
}
|
||||||
|
|
||||||
final DataFrameTransformState state = new DataFrameTransformState(
|
final DataFrameTransformState state = new DataFrameTransformState(
|
||||||
transformTask.taskState.get(),
|
taskState,
|
||||||
indexerState,
|
indexerState,
|
||||||
position,
|
position,
|
||||||
transformTask.currentCheckpoint.get(),
|
transformTask.currentCheckpoint.get(),
|
||||||
|
@ -575,12 +588,20 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
|
||||||
DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
|
DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
|
||||||
ActionListener.wrap(
|
ActionListener.wrap(
|
||||||
r -> {
|
r -> {
|
||||||
|
// for auto stop shutdown the task
|
||||||
|
if (state.getTaskState().equals(DataFrameTransformTaskState.STOPPED)) {
|
||||||
|
onStop();
|
||||||
|
}
|
||||||
next.run();
|
next.run();
|
||||||
},
|
},
|
||||||
statsExc -> {
|
statsExc -> {
|
||||||
logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc);
|
logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc);
|
||||||
auditor.warning(getJobId(),
|
auditor.warning(getJobId(),
|
||||||
"Failure updating stats of transform: " + statsExc.getMessage());
|
"Failure updating stats of transform: " + statsExc.getMessage());
|
||||||
|
// for auto stop shutdown the task
|
||||||
|
if (state.getTaskState().equals(DataFrameTransformTaskState.STOPPED)) {
|
||||||
|
onStop();
|
||||||
|
}
|
||||||
next.run();
|
next.run();
|
||||||
}
|
}
|
||||||
));
|
));
|
||||||
|
|
|
@ -47,8 +47,8 @@ teardown:
|
||||||
transform_id: "airline-transform-stats"
|
transform_id: "airline-transform-stats"
|
||||||
- match: { count: 1 }
|
- match: { count: 1 }
|
||||||
- match: { transforms.0.id: "airline-transform-stats" }
|
- match: { transforms.0.id: "airline-transform-stats" }
|
||||||
- match: { transforms.0.state.indexer_state: "/started|indexing/" }
|
- match: { transforms.0.state.indexer_state: "/started|indexing|stopped/" }
|
||||||
- match: { transforms.0.state.task_state: "started" }
|
- match: { transforms.0.state.task_state: "/started|stopped/" }
|
||||||
- lte: { transforms.0.state.checkpoint: 1 }
|
- lte: { transforms.0.state.checkpoint: 1 }
|
||||||
- lte: { transforms.0.stats.pages_processed: 1 }
|
- lte: { transforms.0.stats.pages_processed: 1 }
|
||||||
- match: { transforms.0.stats.documents_processed: 0 }
|
- match: { transforms.0.stats.documents_processed: 0 }
|
||||||
|
@ -163,7 +163,7 @@ teardown:
|
||||||
transform_id: "*"
|
transform_id: "*"
|
||||||
- match: { count: 2 }
|
- match: { count: 2 }
|
||||||
- match: { transforms.0.id: "airline-transform-stats" }
|
- match: { transforms.0.id: "airline-transform-stats" }
|
||||||
- match: { transforms.0.state.indexer_state: "/started|indexing/" }
|
- match: { transforms.0.state.indexer_state: "/started|indexing|stopped/" }
|
||||||
- match: { transforms.1.id: "airline-transform-stats-dos" }
|
- match: { transforms.1.id: "airline-transform-stats-dos" }
|
||||||
- match: { transforms.1.state.indexer_state: "stopped" }
|
- match: { transforms.1.state.indexer_state: "stopped" }
|
||||||
|
|
||||||
|
@ -172,7 +172,7 @@ teardown:
|
||||||
transform_id: "_all"
|
transform_id: "_all"
|
||||||
- match: { count: 2 }
|
- match: { count: 2 }
|
||||||
- match: { transforms.0.id: "airline-transform-stats" }
|
- match: { transforms.0.id: "airline-transform-stats" }
|
||||||
- match: { transforms.0.state.indexer_state: "/started|indexing/" }
|
- match: { transforms.0.state.indexer_state: "/started|indexing|stopped/" }
|
||||||
- match: { transforms.1.id: "airline-transform-stats-dos" }
|
- match: { transforms.1.id: "airline-transform-stats-dos" }
|
||||||
- match: { transforms.1.state.indexer_state: "stopped" }
|
- match: { transforms.1.state.indexer_state: "stopped" }
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue