[ML Data Frame] Refactor stop logic (#42644) (#42763)

* Revert "invalid test"

This reverts commit 9dd8b52c13c716918ff97e6527aaf43aefc4695d.

* Testing

* mend

* Revert "[ML Data Frame] Mute Data Frame tests"

This reverts commit 5d837fa312b0e41a77a65462667a2d92d1114567.

* Call onStop and onAbort outside atomic update

* Don’t update CS

* Tidying up

* Remove invalid test that asserted logic that has been removed

* Add stopped event

* Revert "Add stopped event"

This reverts commit 02ba992f4818bebd838e1c7678bd2e1cc090bfab.

* Adding check for STOPPED in saveState
This commit is contained in:
Benjamin Trent 2019-06-03 06:53:44 -05:00 committed by GitHub
parent 10aca87389
commit 0253927ec4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 87 additions and 87 deletions

View File

@ -90,28 +90,21 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
* Sets the internal state to {@link IndexerState#STOPPING} if an async job is
* running in the background; in that case {@link #onStop()} will be called when the
* background job detects that the indexer is stopped.
* If there is no job running when this function is called
* the state is set to {@link IndexerState#STOPPED} and {@link #onStop()} called directly.
* If there is no job running when this function is called the returned
* state is {@link IndexerState#STOPPED} and {@link #onStop()} will not be called.
*
* @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted).
*/
public synchronized IndexerState stop() {
AtomicBoolean wasStartedAndSetStopped = new AtomicBoolean(false);
IndexerState currentState = state.updateAndGet(previousState -> {
return state.updateAndGet(previousState -> {
if (previousState == IndexerState.INDEXING) {
return IndexerState.STOPPING;
} else if (previousState == IndexerState.STARTED) {
wasStartedAndSetStopped.set(true);
return IndexerState.STOPPED;
} else {
return previousState;
}
});
if (wasStartedAndSetStopped.get()) {
onStop();
}
return currentState;
}
/**
@ -288,20 +281,22 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
}
private IndexerState finishAndSetState() {
return state.updateAndGet(prev -> {
AtomicBoolean callOnStop = new AtomicBoolean(false);
AtomicBoolean callOnAbort = new AtomicBoolean(false);
IndexerState updatedState = state.updateAndGet(prev -> {
switch (prev) {
case INDEXING:
// ready for another job
return IndexerState.STARTED;
case STOPPING:
callOnStop.set(true);
// must be started again
onStop();
return IndexerState.STOPPED;
case ABORTING:
callOnAbort.set(true);
// abort and exit
onAbort();
return IndexerState.ABORTING; // This shouldn't matter, since onAbort() will kill the task first
case STOPPED:
@ -316,6 +311,14 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
throw new IllegalStateException("Indexer job encountered an illegal state [" + prev + "]");
}
});
if (callOnStop.get()) {
onStop();
} else if (callOnAbort.get()) {
onAbort();
}
return updatedState;
}
private void onSearchResponse(SearchResponse searchResponse) {

View File

@ -268,25 +268,6 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
}
}
public void testStop_AfterIndexerIsFinished() throws InterruptedException {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final ExecutorService executor = Executors.newFixedThreadPool(1);
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false);
indexer.start();
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
countDownLatch.countDown();
assertTrue(awaitBusy(() -> isFinished.get()));
indexer.stop();
assertTrue(isStopped.get());
assertThat(indexer.getState(), equalTo(IndexerState.STOPPED));
} finally {
executor.shutdownNow();
}
}
public void testStop_WhileIndexing() throws InterruptedException {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final ExecutorService executor = Executors.newFixedThreadPool(1);

View File

@ -30,7 +30,6 @@ public class DataFrameTransformIT extends DataFrameIntegTestCase {
cleanUp();
}
@AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public void testDataFrameTransformCrud() throws Exception {
createReviewsIndex();

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.dataframe.integration;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import org.junit.Before;
@ -23,7 +22,6 @@ import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFrameAuditorIT extends DataFrameRestTestCase {
private static final String TEST_USER_NAME = "df_admin_plus_data";

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.dataframe.integration;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
@ -23,7 +22,6 @@ import java.io.IOException;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFrameConfigurationIndexIT extends DataFrameRestTestCase {
/**

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.dataframe.integration;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
@ -22,7 +21,6 @@ import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswo
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {
private static final String TEST_USER_NAME = "df_user";

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.dataframe.integration;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
@ -16,7 +15,6 @@ import org.junit.Before;
import java.io.IOException;
import java.util.Map;
@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFrameMetaDataIT extends DataFrameRestTestCase {
private boolean indicesCreated = false;

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.dataframe.integration;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.junit.Before;
@ -22,7 +21,6 @@ import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswo
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFramePivotRestIT extends DataFrameRestTestCase {
private static final String TEST_USER_NAME = "df_admin_plus_data";

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.dataframe.integration;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.rest.RestStatus;
@ -20,7 +19,6 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.equalTo;
@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
public void testDummy() {

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.dataframe.integration;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
@ -23,7 +22,6 @@ import java.util.Map;
import static org.elasticsearch.xpack.core.dataframe.DataFrameField.INDEX_DOC_TYPE;
import static org.elasticsearch.xpack.dataframe.DataFrameFeatureSet.PROVIDED_STATS;
@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFrameUsageIT extends DataFrameRestTestCase {
private boolean indicesCreated = false;

View File

@ -66,7 +66,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
private final Map<String, Object> initialPosition;
private final IndexerState initialIndexerState;
private final SetOnce<DataFrameIndexer> indexer = new SetOnce<>();
private final SetOnce<ClientDataFrameIndexer> indexer = new SetOnce<>();
private final AtomicReference<DataFrameTransformTaskState> taskState;
private final AtomicReference<String> stateReason;
@ -125,7 +125,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
return getState();
}
private DataFrameIndexer getIndexer() {
private ClientDataFrameIndexer getIndexer() {
return indexer.get();
}
@ -236,7 +236,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
return;
}
getIndexer().stop();
IndexerState state = getIndexer().stop();
if (state == IndexerState.STOPPED) {
getIndexer().doSaveState(state, getIndexer().getPosition(), () -> getIndexer().onStop());
}
}
@Override
@ -530,11 +533,17 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
next.run();
return;
}
// If the indexer state is `STOPPED` on a `doSaveState` call, either we transitioned to `STOPPED` from `STOPPING`
// or `doSaveState` was called manually because the indexer was not actively running.
// Since we save the state to an index, we should make sure that our task state matches the indexer state
if (indexerState.equals(IndexerState.STOPPED)) {
transformTask.setTaskStateStopped();
}
final DataFrameTransformState state = new DataFrameTransformState(
transformTask.taskState.get(),
indexerState,
getPosition(),
position,
transformTask.currentCheckpoint.get(),
transformTask.stateReason.get(),
getProgress());
@ -542,28 +551,18 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
// Persisting stats when we call `doSaveState` should be ok as we only call it on a state transition and
// only every-so-often when doing the bulk indexing calls. See AsyncTwoPhaseIndexer#onBulkResponse for current periodicity
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> updateClusterStateListener = ActionListener.wrap(
task -> {
transformsConfigManager.putOrUpdateTransformStats(
new DataFrameTransformStateAndStats(transformId, state, getStats(),
DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
ActionListener.wrap(
r -> {
next.run();
},
statsExc -> {
logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc);
next.run();
}
));
},
exc -> {
logger.error("Updating persistent state of transform [" + transformConfig.getId() + "] failed", exc);
next.run();
}
);
transformTask.persistStateToClusterState(state, updateClusterStateListener);
transformsConfigManager.putOrUpdateTransformStats(
new DataFrameTransformStateAndStats(transformId, state, getStats(),
DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
ActionListener.wrap(
r -> {
next.run();
},
statsExc -> {
logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc);
next.run();
}
));
}
@Override
@ -602,20 +601,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
protected void onStop() {
auditor.info(transformConfig.getId(), "Indexer has stopped");
logger.info("Data frame transform [{}] indexer has stopped", transformConfig.getId());
transformTask.setTaskStateStopped();
transformsConfigManager.putOrUpdateTransformStats(
new DataFrameTransformStateAndStats(transformId, transformTask.getState(), getStats(),
DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
ActionListener.wrap(
r -> {
transformTask.shutdown();
},
statsExc -> {
transformTask.shutdown();
logger.error("Updating saving stats of transform [" + transformConfig.getId() + "] failed", statsExc);
}
));
transformTask.shutdown();
}
@Override

View File

@ -190,8 +190,10 @@ teardown:
- do:
data_frame.stop_data_frame_transform:
transform_id: "airline-transform-start-stop"
wait_for_completion: true
- match: { acknowledged: true }
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "airline-transform-start-later"
@ -209,3 +211,46 @@ teardown:
- do:
data_frame.delete_data_frame_transform:
transform_id: "airline-transform-start-later"
---
"Test stop all":
- do:
data_frame.put_data_frame_transform:
transform_id: "airline-transform-stop-all"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-data-start-later" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
- do:
data_frame.start_data_frame_transform:
transform_id: "airline-transform-stop-all"
- match: { acknowledged: true }
- do:
data_frame.start_data_frame_transform:
transform_id: "airline-transform-start-stop"
- match: { acknowledged: true }
- do:
data_frame.stop_data_frame_transform:
transform_id: "_all"
wait_for_completion: true
- match: { acknowledged: true }
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "*"
- match: { count: 2 }
- match: { transforms.0.state.indexer_state: "stopped" }
- match: { transforms.0.state.task_state: "stopped" }
- match: { transforms.1.state.indexer_state: "stopped" }
- match: { transforms.1.state.task_state: "stopped" }
- do:
data_frame.delete_data_frame_transform:
transform_id: "airline-transform-stop-all"