diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index 0c4477b6b70..f9bbf890fe6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -90,28 +90,21 @@ public abstract class AsyncTwoPhaseIndexer { + return state.updateAndGet(previousState -> { if (previousState == IndexerState.INDEXING) { return IndexerState.STOPPING; } else if (previousState == IndexerState.STARTED) { - wasStartedAndSetStopped.set(true); return IndexerState.STOPPED; } else { return previousState; } }); - - if (wasStartedAndSetStopped.get()) { - onStop(); - } - return currentState; } /** @@ -288,20 +281,22 @@ public abstract class AsyncTwoPhaseIndexer { + AtomicBoolean callOnStop = new AtomicBoolean(false); + AtomicBoolean callOnAbort = new AtomicBoolean(false); + IndexerState updatedState = state.updateAndGet(prev -> { switch (prev) { case INDEXING: // ready for another job return IndexerState.STARTED; case STOPPING: + callOnStop.set(true); // must be started again - onStop(); return IndexerState.STOPPED; case ABORTING: + callOnAbort.set(true); // abort and exit - onAbort(); return IndexerState.ABORTING; // This shouldn't matter, since onAbort() will kill the task first case STOPPED: @@ -316,6 +311,14 @@ public abstract class AsyncTwoPhaseIndexer state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); - try { - CountDownLatch countDownLatch = new CountDownLatch(1); - MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false); - indexer.start(); - assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); - countDownLatch.countDown(); - assertTrue(awaitBusy(() -> isFinished.get())); - - indexer.stop(); - assertTrue(isStopped.get()); - assertThat(indexer.getState(), equalTo(IndexerState.STOPPED)); - } finally { - executor.shutdownNow(); - } - } - public void testStop_WhileIndexing() throws InterruptedException { AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); final ExecutorService executor = Executors.newFixedThreadPool(1); diff --git a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java index 1ec425c6416..69fb980871d 100644 --- a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java +++ b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java @@ -30,7 +30,6 @@ public class DataFrameTransformIT extends DataFrameIntegTestCase { cleanUp(); } - @AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344") public void testDataFrameTransformCrud() throws Exception { createReviewsIndex(); diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameAuditorIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameAuditorIT.java index 7dc79c1ae8f..9884c9bb679 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameAuditorIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameAuditorIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.dataframe.integration; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.client.Request; import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex; import org.junit.Before; @@ -23,7 +22,6 @@ import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; -@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344") public class DataFrameAuditorIT extends DataFrameRestTestCase { private static final String TEST_USER_NAME = "df_admin_plus_data"; diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameConfigurationIndexIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameConfigurationIndexIT.java index d7e12cf2bee..681599331c8 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameConfigurationIndexIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameConfigurationIndexIT.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.dataframe.integration; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; @@ -23,7 +22,6 @@ import java.io.IOException; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344") public class DataFrameConfigurationIndexIT extends DataFrameRestTestCase { /** diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java index 9bac6ca0b40..d9927cd09ed 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.dataframe.integration; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.client.Request; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.xpack.core.dataframe.DataFrameField; @@ -22,7 +21,6 @@ import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswo import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; -@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344") public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase { private static final String TEST_USER_NAME = "df_user"; diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameMetaDataIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameMetaDataIT.java index 5b95d1daead..26a957ea055 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameMetaDataIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameMetaDataIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.dataframe.integration; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.Version; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; @@ -16,7 +15,6 @@ import org.junit.Before; import java.io.IOException; import java.util.Map; -@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344") public class DataFrameMetaDataIT extends DataFrameRestTestCase { private boolean indicesCreated = false; diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java index 36f95e599ff..933fcc6c8e5 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.dataframe.integration; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.client.Request; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.junit.Before; @@ -22,7 +21,6 @@ import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswo import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; -@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344") public class DataFramePivotRestIT extends DataFrameRestTestCase { private static final String TEST_USER_NAME = "df_admin_plus_data"; diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java index 7b63644dd34..96aeeda8755 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.dataframe.integration; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.rest.RestStatus; @@ -20,7 +19,6 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.Matchers.equalTo; -@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344") public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase { public void testDummy() { diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java index f98fa6a2713..4f209c5a9f3 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.dataframe.integration; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.common.xcontent.support.XContentMapValues; @@ -23,7 +22,6 @@ import java.util.Map; import static org.elasticsearch.xpack.core.dataframe.DataFrameField.INDEX_DOC_TYPE; import static org.elasticsearch.xpack.dataframe.DataFrameFeatureSet.PROVIDED_STATS; -@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344") public class DataFrameUsageIT extends DataFrameRestTestCase { private boolean indicesCreated = false; diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 13deab6748c..575cd4c15bd 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -66,7 +66,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S private final Map initialPosition; private final IndexerState initialIndexerState; - private final SetOnce indexer = new SetOnce<>(); + private final SetOnce indexer = new SetOnce<>(); private final AtomicReference taskState; private final AtomicReference stateReason; @@ -125,7 +125,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S return getState(); } - private DataFrameIndexer getIndexer() { + private ClientDataFrameIndexer getIndexer() { return indexer.get(); } @@ -236,7 +236,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S return; } - getIndexer().stop(); + IndexerState state = getIndexer().stop(); + if (state == IndexerState.STOPPED) { + getIndexer().doSaveState(state, getIndexer().getPosition(), () -> getIndexer().onStop()); + } } @Override @@ -530,11 +533,17 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S next.run(); return; } + // If we are `STOPPED` on a `doSaveState` call, that indicates we transitioned to `STOPPED` from `STOPPING` + // OR we called `doSaveState` manually as the indexer was not actively running. + // Since we save the state to an index, we should make sure that our task state is in parity with the indexer state + if (indexerState.equals(IndexerState.STOPPED)) { + transformTask.setTaskStateStopped(); + } final DataFrameTransformState state = new DataFrameTransformState( transformTask.taskState.get(), indexerState, - getPosition(), + position, transformTask.currentCheckpoint.get(), transformTask.stateReason.get(), getProgress()); @@ -542,28 +551,18 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S // Persisting stats when we call `doSaveState` should be ok as we only call it on a state transition and // only every-so-often when doing the bulk indexing calls. See AsyncTwoPhaseIndexer#onBulkResponse for current periodicity - ActionListener> updateClusterStateListener = ActionListener.wrap( - task -> { - transformsConfigManager.putOrUpdateTransformStats( - new DataFrameTransformStateAndStats(transformId, state, getStats(), - DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null - ActionListener.wrap( - r -> { - next.run(); - }, - statsExc -> { - logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc); - next.run(); - } - )); - }, - exc -> { - logger.error("Updating persistent state of transform [" + transformConfig.getId() + "] failed", exc); - next.run(); - } - ); - - transformTask.persistStateToClusterState(state, updateClusterStateListener); + transformsConfigManager.putOrUpdateTransformStats( + new DataFrameTransformStateAndStats(transformId, state, getStats(), + DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null + ActionListener.wrap( + r -> { + next.run(); + }, + statsExc -> { + logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc); + next.run(); + } + )); } @Override @@ -602,20 +601,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S protected void onStop() { auditor.info(transformConfig.getId(), "Indexer has stopped"); logger.info("Data frame transform [{}] indexer has stopped", transformConfig.getId()); - - transformTask.setTaskStateStopped(); - transformsConfigManager.putOrUpdateTransformStats( - new DataFrameTransformStateAndStats(transformId, transformTask.getState(), getStats(), - DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null - ActionListener.wrap( - r -> { - transformTask.shutdown(); - }, - statsExc -> { - transformTask.shutdown(); - logger.error("Updating saving stats of transform [" + transformConfig.getId() + "] failed", statsExc); - } - )); + transformTask.shutdown(); } @Override diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml index 31f80033e7b..2686c57fd06 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml @@ -190,8 +190,10 @@ teardown: - do: data_frame.stop_data_frame_transform: transform_id: "airline-transform-start-stop" + wait_for_completion: true - match: { acknowledged: true } + - do: data_frame.get_data_frame_transform_stats: transform_id: "airline-transform-start-later" @@ -209,3 +211,46 @@ teardown: - do: data_frame.delete_data_frame_transform: transform_id: "airline-transform-start-later" + +--- +"Test stop all": + - do: + data_frame.put_data_frame_transform: + transform_id: "airline-transform-stop-all" + body: > + { + "source": { "index": "airline-data" }, + "dest": { "index": "airline-data-start-later" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + - do: + data_frame.start_data_frame_transform: + transform_id: "airline-transform-stop-all" + - match: { acknowledged: true } + + - do: + data_frame.start_data_frame_transform: + transform_id: "airline-transform-start-stop" + - match: { acknowledged: true } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "_all" + wait_for_completion: true + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "*" + - match: { count: 2 } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + - match: { transforms.1.state.indexer_state: "stopped" } + - match: { transforms.1.state.task_state: "stopped" } + + - do: + data_frame.delete_data_frame_transform: + transform_id: "airline-transform-stop-all"