diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index f9bbf890fe6..efe57f44e89 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -284,6 +284,8 @@ public abstract class AsyncTwoPhaseIndexer { + callOnAbort.set(false); + callOnStop.set(false); switch (prev) { case INDEXING: // ready for another job diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java index df2d09a875d..d814714ab66 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java @@ -26,7 +26,10 @@ import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStats import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Request; import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Response; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; +import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask; @@ -136,7 +139,21 @@ public class TransportGetDataFrameTransformsStatsAction extends ActionListener> searchStatsListener = ActionListener.wrap( stats -> { List allStateAndStats = response.getTransformsStateAndStats(); - allStateAndStats.addAll(stats); + // If the persistent task does NOT exist, it is STOPPED + // There is a potential race condition where the saved document does not actually have a STOPPED state + // as the task is cancelled before we persist state. + stats.forEach(stat -> + allStateAndStats.add(new DataFrameTransformStateAndStats( + stat.getId(), + new DataFrameTransformState(DataFrameTransformTaskState.STOPPED, + IndexerState.STOPPED, + stat.getTransformState().getPosition(), + stat.getTransformState().getCheckpoint(), + stat.getTransformState().getReason(), + stat.getTransformState().getProgress()), + stat.getTransformStats(), + stat.getCheckpointingInfo())) + ); transformsWithoutTasks.removeAll( stats.stream().map(DataFrameTransformStateAndStats::getId).collect(Collectors.toSet())); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 20ef5be09e8..28876ef2c4d 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -237,6 +237,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S return; } + if (getIndexer().getState() == IndexerState.STOPPED) { + return; + } + IndexerState state = getIndexer().stop(); if (state == IndexerState.STOPPED) { getIndexer().doSaveState(state, getIndexer().getPosition(), () -> getIndexer().onStop()); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml index 2686c57fd06..d156344b5ad 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml @@ -90,9 +90,6 @@ teardown: - match: { airline-data-by-airline-start-stop.mappings: {} } --- "Test start/stop/start transform": - - skip: - reason: "https://github.com/elastic/elasticsearch/issues/42650" - version: "all" - do: data_frame.start_data_frame_transform: transform_id: "airline-transform-start-stop"