[ML][Data Frame] fixing _start?force=true bug (#45660) (#45734)

* [ML][Data Frame] fixing _start?force=true bug

* removing unused import

* removing old TODO
This commit is contained in:
Benjamin Trent 2019-08-20 09:23:07 -05:00 committed by GitHub
parent 49edf9e5b5
commit 43bb5924e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 28 additions and 27 deletions

View File

@ -59,7 +59,6 @@ public class TransportStartDataFrameTransformTaskAction extends
protected void taskOperation(StartDataFrameTransformTaskAction.Request request, DataFrameTransformTask transformTask, protected void taskOperation(StartDataFrameTransformTaskAction.Request request, DataFrameTransformTask transformTask,
ActionListener<StartDataFrameTransformTaskAction.Response> listener) { ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
if (transformTask.getTransformId().equals(request.getId())) { if (transformTask.getTransformId().equals(request.getId())) {
//TODO fix bug as .start where it was failed could result in a null current checkpoint?
transformTask.start(null, request.isForce(), listener); transformTask.start(null, request.isForce(), listener);
} else { } else {
listener.onFailure(new RuntimeException("ID of data frame transform task [" + transformTask.getTransformId() listener.onFailure(new RuntimeException("ID of data frame transform task [" + transformTask.getTransformId()

View File

@ -35,7 +35,6 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheck
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStoredDoc; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStoredDoc;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.dataframe.DataFrame; import org.elasticsearch.xpack.dataframe.DataFrame;
@ -120,14 +119,14 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTransform params, PersistentTaskState state) { protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTransform params, PersistentTaskState state) {
final String transformId = params.getId(); final String transformId = params.getId();
final DataFrameTransformTask buildTask = (DataFrameTransformTask) task; final DataFrameTransformTask buildTask = (DataFrameTransformTask) task;
final DataFrameTransformState transformPTaskState = (DataFrameTransformState) state; // NOTE: DataFrameTransformPersistentTasksExecutor#createTask pulls in the stored task state from the ClusterState when the object
// is created. DataFrameTransformTask#ctor takes into account setting the task as failed if that is passed in with the
// If the transform is failed then the Persistent Task Service will // persisted state.
// try to restart it on a node restart. Exiting here leaves the // DataFrameTransformPersistentTasksExecutor#startTask will fail as DataFrameTransformTask#start, when force == false, will return
// transform in the failed state and it must be force closed. // a failure indicating that a failed task cannot be started.
if (transformPTaskState != null && transformPTaskState.getTaskState() == DataFrameTransformTaskState.FAILED) { //
return; // We want the rest of the state to be populated in the task when it is loaded on the node so that users can force start it again
} // later if they want.
final DataFrameTransformTask.ClientDataFrameIndexerBuilder indexerBuilder = final DataFrameTransformTask.ClientDataFrameIndexerBuilder indexerBuilder =
new DataFrameTransformTask.ClientDataFrameIndexerBuilder(transformId) new DataFrameTransformTask.ClientDataFrameIndexerBuilder(transformId)
@ -299,6 +298,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
Long previousCheckpoint, Long previousCheckpoint,
ActionListener<StartDataFrameTransformTaskAction.Response> listener) { ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
buildTask.initializeIndexer(indexerBuilder); buildTask.initializeIndexer(indexerBuilder);
// DataFrameTransformTask#start will fail if the task state is FAILED
buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, false, listener); buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, false, listener);
} }

View File

@ -235,7 +235,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
} }
} }
public void setTaskStateStopped() { public synchronized void setTaskStateStopped() {
taskState.set(DataFrameTransformTaskState.STOPPED); taskState.set(DataFrameTransformTaskState.STOPPED);
} }
@ -256,8 +256,16 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
return; return;
} }
if (getIndexer() == null) { if (getIndexer() == null) {
listener.onFailure(new ElasticsearchException("Task for transform [{}] not fully initialized. Try again later", // If our state is failed AND the indexer is null, the user needs to _stop?force=true so that the indexer gets
getTransformId())); // fully initialized.
// If we are NOT failed, then we can assume that `start` was just called early in the process.
String msg = taskState.get() == DataFrameTransformTaskState.FAILED ?
"It failed during the initialization process; force stop to allow reinitialization." :
"Try again later.";
listener.onFailure(new ElasticsearchStatusException("Task for transform [{}] not fully initialized. {}",
RestStatus.CONFLICT,
getTransformId(),
msg));
return; return;
} }
final IndexerState newState = getIndexer().start(); final IndexerState newState = getIndexer().start();
@ -409,6 +417,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
} }
synchronized void markAsFailed(String reason, ActionListener<Void> listener) { synchronized void markAsFailed(String reason, ActionListener<Void> listener) {
// If we are already flagged as failed, this probably means that a second trigger started firing while we were attempting to
// flag the previously triggered indexer as failed. Exit early as we are already flagged as failed.
if (taskState.get() == DataFrameTransformTaskState.FAILED) {
logger.warn("[{}] is already failed but encountered new failure; reason [{}] ", getTransformId(), reason);
listener.onResponse(null);
return;
}
// If the indexer is `STOPPING` this means that `DataFrameTransformTask#stop` was called previously, but something caused // If the indexer is `STOPPING` this means that `DataFrameTransformTask#stop` was called previously, but something caused
// the indexer to fail. Since `ClientDataFrameIndexer#doSaveState` will persist the state to the index once the indexer stops, // the indexer to fail. Since `ClientDataFrameIndexer#doSaveState` will persist the state to the index once the indexer stops,
// it is probably best to NOT change the internal state of the task and allow the normal stopping logic to continue. // it is probably best to NOT change the internal state of the task and allow the normal stopping logic to continue.
@ -425,26 +440,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
listener.onResponse(null); listener.onResponse(null);
return; return;
} }
// If we are already flagged as failed, this probably means that a second trigger started firing while we were attempting to
// flag the previously triggered indexer as failed. Exit early as we are already flagged as failed.
if (taskState.get() == DataFrameTransformTaskState.FAILED) {
logger.warn("[{}] is already failed but encountered new failure; reason [{}] ", getTransformId(), reason);
listener.onResponse(null);
return;
}
auditor.error(transform.getId(), reason); auditor.error(transform.getId(), reason);
// We should not keep retrying. Either the task will be stopped, or started // We should not keep retrying. Either the task will be stopped, or started
// If it is started again, it is registered again. // If it is started again, it is registered again.
deregisterSchedulerJob(); deregisterSchedulerJob();
DataFrameTransformState newState = new DataFrameTransformState(
DataFrameTransformTaskState.FAILED,
getIndexer() == null ? initialIndexerState : getIndexer().getState(),
getIndexer() == null ? initialPosition : getIndexer().getPosition(),
currentCheckpoint.get(),
reason,
getIndexer() == null ? null : getIndexer().getProgress());
taskState.set(DataFrameTransformTaskState.FAILED); taskState.set(DataFrameTransformTaskState.FAILED);
stateReason.set(reason); stateReason.set(reason);
DataFrameTransformState newState = getState();
// Even though the indexer information is persisted to an index, we still need DataFrameTransformTaskState in the clusterstate // Even though the indexer information is persisted to an index, we still need DataFrameTransformTaskState in the clusterstate
// This keeps track of STARTED, FAILED, STOPPED // This keeps track of STARTED, FAILED, STOPPED
// This is because a FAILED state could occur because we failed to read the config from the internal index, which would imply that // This is because a FAILED state could occur because we failed to read the config from the internal index, which would imply that