[ML][Data Frame] forcing that no ptask => STOPPED state (#42800) (#42860)

* [ML][Data Frame] forcing that no ptask => STOPPED state

* Addressing side-effect, early exit for stop when stopped
This commit is contained in:
Benjamin Trent 2019-06-05 07:09:34 -05:00 committed by GitHub
parent d5baedb789
commit 293f306b9a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 24 additions and 4 deletions

View File

@ -284,6 +284,8 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
AtomicBoolean callOnStop = new AtomicBoolean(false);
AtomicBoolean callOnAbort = new AtomicBoolean(false);
IndexerState updatedState = state.updateAndGet(prev -> {
callOnAbort.set(false);
callOnStop.set(false);
switch (prev) {
case INDEXING:
// ready for another job

View File

@ -26,7 +26,10 @@ import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStats
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Request;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Response;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
@ -136,7 +139,21 @@ public class TransportGetDataFrameTransformsStatsAction extends
ActionListener<List<DataFrameTransformStateAndStats>> searchStatsListener = ActionListener.wrap(
stats -> {
List<DataFrameTransformStateAndStats> allStateAndStats = response.getTransformsStateAndStats();
allStateAndStats.addAll(stats);
// If the persistent task does NOT exist, it is STOPPED
// There is a potential race condition where the saved document does not actually have a STOPPED state
// as the task is cancelled before we persist state.
stats.forEach(stat ->
allStateAndStats.add(new DataFrameTransformStateAndStats(
stat.getId(),
new DataFrameTransformState(DataFrameTransformTaskState.STOPPED,
IndexerState.STOPPED,
stat.getTransformState().getPosition(),
stat.getTransformState().getCheckpoint(),
stat.getTransformState().getReason(),
stat.getTransformState().getProgress()),
stat.getTransformStats(),
stat.getCheckpointingInfo()))
);
transformsWithoutTasks.removeAll(
stats.stream().map(DataFrameTransformStateAndStats::getId).collect(Collectors.toSet()));

View File

@ -237,6 +237,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
return;
}
if (getIndexer().getState() == IndexerState.STOPPED) {
return;
}
IndexerState state = getIndexer().stop();
if (state == IndexerState.STOPPED) {
getIndexer().doSaveState(state, getIndexer().getPosition(), () -> getIndexer().onStop());

View File

@ -90,9 +90,6 @@ teardown:
- match: { airline-data-by-airline-start-stop.mappings: {} }
---
"Test start/stop/start transform":
- skip:
reason: "https://github.com/elastic/elasticsearch/issues/42650"
version: "all"
- do:
data_frame.start_data_frame_transform:
transform_id: "airline-transform-start-stop"