[ML] Introduce a "starting" datafeed state for lazy jobs (#54065)

It is possible for ML jobs to open lazily if the "allow_lazy_open"
option in the job config is set to true.  Such jobs wait in the
"opening" state until a node has sufficient capacity to run them.

This commit fixes the bug that prevented datafeeds for jobs lazily
waiting for assignment from being started.  The state of such datafeeds
is "starting", and they can be stopped by the stop datafeed API
while in this state with or without force.

Backport of #53918
This commit is contained in:
David Roberts 2020-03-24 13:00:04 +00:00 committed by GitHub
parent 30105a5ab5
commit 1421471556
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 97 additions and 24 deletions

View File

@ -1387,7 +1387,10 @@ tag::state-datafeed[]
The status of the {dfeed}, which can be one of the following values: The status of the {dfeed}, which can be one of the following values:
+ +
-- --
* `starting`: The {dfeed} has been requested to start but has not yet started.
* `started`: The {dfeed} is actively receiving data. * `started`: The {dfeed} is actively receiving data.
* `stopping`: The {dfeed} has been requested to stop gracefully and is
completing its final action.
* `stopped`: The {dfeed} is stopped and will not receive data until it is * `stopped`: The {dfeed} is stopped and will not receive data until it is
re-started. re-started.
-- --

View File

@ -132,14 +132,18 @@ public final class MlTasks {
public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksCustomMetaData tasks) { public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksCustomMetaData tasks) {
PersistentTasksCustomMetaData.PersistentTask<?> task = getDatafeedTask(datafeedId, tasks); PersistentTasksCustomMetaData.PersistentTask<?> task = getDatafeedTask(datafeedId, tasks);
// TODO: report (task != null && task.getState() == null) as STARTING in version 8, and fix side effects if (task == null) {
if (task != null && task.getState() != null) {
return (DatafeedState) task.getState();
} else {
// If we haven't started a datafeed then there will be no persistent task, // If we haven't started a datafeed then there will be no persistent task,
// which is the same as if the datafeed wasn't started // which is the same as if the datafeed wasn't started
return DatafeedState.STOPPED; return DatafeedState.STOPPED;
} }
DatafeedState taskState = (DatafeedState) task.getState();
if (taskState == null) {
// If we haven't set a state yet then the task has never been assigned, so
// report that it's starting
return DatafeedState.STARTING;
}
return taskState;
} }
public static DataFrameAnalyticsState getDataFrameAnalyticsState(String analyticsId, @Nullable PersistentTasksCustomMetaData tasks) { public static DataFrameAnalyticsState getDataFrameAnalyticsState(String analyticsId, @Nullable PersistentTasksCustomMetaData tasks) {

View File

@ -55,7 +55,8 @@ public class MlTasksTests extends ESTestCase {
tasksBuilder.addTask(MlTasks.datafeedTaskId("foo"), MlTasks.DATAFEED_TASK_NAME, tasksBuilder.addTask(MlTasks.datafeedTaskId("foo"), MlTasks.DATAFEED_TASK_NAME,
new StartDatafeedAction.DatafeedParams("foo", 0L), new StartDatafeedAction.DatafeedParams("foo", 0L),
new PersistentTasksCustomMetaData.Assignment("bar", "test assignment")); new PersistentTasksCustomMetaData.Assignment("bar", "test assignment"));
assertEquals(DatafeedState.STOPPED, MlTasks.getDatafeedState("foo", tasksBuilder.build())); // A task with no state means the datafeed is starting
assertEquals(DatafeedState.STARTING, MlTasks.getDatafeedState("foo", tasksBuilder.build()));
tasksBuilder.updateTaskState(MlTasks.datafeedTaskId("foo"), DatafeedState.STARTED); tasksBuilder.updateTaskState(MlTasks.datafeedTaskId("foo"), DatafeedState.STARTED);
assertEquals(DatafeedState.STARTED, MlTasks.getDatafeedState("foo", tasksBuilder.build())); assertEquals(DatafeedState.STARTED, MlTasks.getDatafeedState("foo", tasksBuilder.build()));

View File

@ -287,7 +287,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
listener.onFailure(predicate.exception); listener.onFailure(predicate.exception);
} }
} else { } else {
listener.onResponse(new AcknowledgedResponse(predicate.opened)); listener.onResponse(new AcknowledgedResponse(true));
} }
} }
@ -535,9 +535,8 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
* Important: the methods of this class must NOT throw exceptions. If they did then the callers * Important: the methods of this class must NOT throw exceptions. If they did then the callers
* of endpoints waiting for a condition tested by this predicate would never get a response. * of endpoints waiting for a condition tested by this predicate would never get a response.
*/ */
private class JobPredicate implements Predicate<PersistentTasksCustomMetaData.PersistentTask<?>> { private static class JobPredicate implements Predicate<PersistentTasksCustomMetaData.PersistentTask<?>> {
private volatile boolean opened;
private volatile Exception exception; private volatile Exception exception;
private volatile boolean shouldCancel; private volatile boolean shouldCancel;
@ -584,7 +583,6 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
case CLOSED: case CLOSED:
return false; return false;
case OPENED: case OPENED:
opened = true;
return true; return true;
case CLOSING: case CLOSING:
exception = ExceptionsHelper.conflictStatusException("The job has been " + JobState.CLOSED + " while waiting to be " exception = ExceptionsHelper.conflictStatusException("The job has been " + JobState.CLOSED + " while waiting to be "

View File

@ -476,7 +476,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
* Important: the methods of this class must NOT throw exceptions. If they did then the callers * Important: the methods of this class must NOT throw exceptions. If they did then the callers
* of endpoints waiting for a condition tested by this predicate would never get a response. * of endpoints waiting for a condition tested by this predicate would never get a response.
*/ */
private class DatafeedPredicate implements Predicate<PersistentTasksCustomMetaData.PersistentTask<?>> { private static class DatafeedPredicate implements Predicate<PersistentTasksCustomMetaData.PersistentTask<?>> {
private volatile Exception exception; private volatile Exception exception;
@ -486,13 +486,18 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
return false; return false;
} }
PersistentTasksCustomMetaData.Assignment assignment = persistentTask.getAssignment(); PersistentTasksCustomMetaData.Assignment assignment = persistentTask.getAssignment();
if (assignment != null && assignment.equals(PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT) == false && if (assignment != null) {
assignment.isAssigned() == false) { // This means we are awaiting the datafeed's job to be assigned to a node
if (assignment.equals(DatafeedNodeSelector.AWAITING_JOB_ASSIGNMENT)) {
return true;
}
if (assignment.equals(PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT) == false && assignment.isAssigned() == false) {
// Assignment has failed despite passing our "fast fail" validation // Assignment has failed despite passing our "fast fail" validation
exception = new ElasticsearchStatusException("Could not start datafeed, allocation explanation [" + exception = new ElasticsearchStatusException("Could not start datafeed, allocation explanation [" +
assignment.getExplanation() + "]", RestStatus.TOO_MANY_REQUESTS); assignment.getExplanation() + "]", RestStatus.TOO_MANY_REQUESTS);
return true; return true;
} }
}
DatafeedState datafeedState = (DatafeedState) persistentTask.getState(); DatafeedState datafeedState = (DatafeedState) persistentTask.getState();
return datafeedState == DatafeedState.STARTED; return datafeedState == DatafeedState.STARTED;
} }

View File

@ -93,14 +93,8 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
List<String> stoppingDatafeedIds, List<String> stoppingDatafeedIds,
List<String> notStoppedDatafeedIds) { List<String> notStoppedDatafeedIds) {
switch (datafeedState) { switch (datafeedState) {
// Treat STARTING like STARTED for stop API behaviour.
case STARTING: case STARTING:
// The STARTING state is not used anywhere at present, so this should never happen.
// At present datafeeds that have a persistent task that hasn't yet been assigned
// a state are reported as STOPPED (which is not great). It could be considered a
// breaking change to introduce the STARTING state though, so let's aim to do it in
// version 8. Also consider treating STARTING like STARTED for stop API behaviour.
notStoppedDatafeedIds.add(datafeedId);
break;
case STARTED: case STARTED:
startedDatafeedIds.add(datafeedId); startedDatafeedIds.add(datafeedId);
notStoppedDatafeedIds.add(datafeedId); notStoppedDatafeedIds.add(datafeedId);

View File

@ -32,6 +32,9 @@ public class DatafeedNodeSelector {
private static final Logger LOGGER = LogManager.getLogger(DatafeedNodeSelector.class); private static final Logger LOGGER = LogManager.getLogger(DatafeedNodeSelector.class);
public static final PersistentTasksCustomMetaData.Assignment AWAITING_JOB_ASSIGNMENT =
new PersistentTasksCustomMetaData.Assignment(null, "datafeed awaiting job assignment.");
private final String datafeedId; private final String datafeedId;
private final String jobId; private final String jobId;
private final List<String> datafeedIndices; private final List<String> datafeedIndices;
@ -76,9 +79,14 @@ public class DatafeedNodeSelector {
AssignmentFailure assignmentFailure = checkAssignment(); AssignmentFailure assignmentFailure = checkAssignment();
if (assignmentFailure == null) { if (assignmentFailure == null) {
return new PersistentTasksCustomMetaData.Assignment(jobTask.getExecutorNode(), ""); String jobNode = jobTask.getExecutorNode();
if (jobNode == null) {
return AWAITING_JOB_ASSIGNMENT;
}
return new PersistentTasksCustomMetaData.Assignment(jobNode, "");
} }
LOGGER.debug(assignmentFailure.reason); LOGGER.debug(assignmentFailure.reason);
assert assignmentFailure.reason.isEmpty() == false;
return new PersistentTasksCustomMetaData.Assignment(null, assignmentFailure.reason); return new PersistentTasksCustomMetaData.Assignment(null, assignmentFailure.reason);
} }

View File

@ -32,6 +32,7 @@ import org.elasticsearch.xpack.core.ml.action.PostDataAction;
import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
@ -424,6 +425,60 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
assertBusy(() -> assertJobTask(jobId, JobState.OPENED, true)); assertBusy(() -> assertJobTask(jobId, JobState.OPENED, true));
} }
public void testCloseUnassignedLazyJobAndDatafeed() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(3);
ensureStableCluster(3);
String jobId = "test-lazy-stop";
String datafeedId = jobId + "-datafeed";
// Assume the test machine won't have space to assign a 2TB job
Job.Builder job = createJob(jobId, new ByteSizeValue(2, ByteSizeUnit.TB), true);
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
client().admin().indices().prepareCreate("data").addMapping("type", "time", "type=date").get();
DatafeedConfig config = createDatafeed(datafeedId, jobId, Collections.singletonList("data"));
PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(config);
client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).actionGet();
ensureYellow(); // at least the primary shards of the indices a job uses should be started
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(jobId);
client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet();
// Job state should be opening because it won't fit anyway, but is allowed to open lazily
GetJobsStatsAction.Request jobStatsRequest = new GetJobsStatsAction.Request(jobId);
GetJobsStatsAction.Response jobStatsResponse = client().execute(GetJobsStatsAction.INSTANCE, jobStatsRequest).actionGet();
assertEquals(JobState.OPENING, jobStatsResponse.getResponse().results().get(0).getState());
StartDatafeedAction.Request startDataFeedRequest = new StartDatafeedAction.Request(config.getId(), 0L);
client().execute(StartDatafeedAction.INSTANCE, startDataFeedRequest).actionGet();
// Datafeed state should be starting while it waits for job assignment
GetDatafeedsStatsAction.Request datafeedStatsRequest = new GetDatafeedsStatsAction.Request(datafeedId);
GetDatafeedsStatsAction.Response datafeedStatsResponse =
client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet();
assertEquals(DatafeedState.STARTING, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState());
// A starting datafeed can be stopped normally or by force
StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedId);
stopDatafeedRequest.setForce(randomBoolean());
StopDatafeedAction.Response stopDatafeedResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet();
assertTrue(stopDatafeedResponse.isStopped());
datafeedStatsResponse = client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet();
assertEquals(DatafeedState.STOPPED, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState());
// An opening job can also be stopped normally or by force
CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(jobId);
closeJobRequest.setForce(randomBoolean());
CloseJobAction.Response closeJobResponse = client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet();
assertTrue(closeJobResponse.isClosed());
jobStatsResponse = client().execute(GetJobsStatsAction.INSTANCE, jobStatsRequest).actionGet();
assertEquals(JobState.CLOSED, jobStatsResponse.getResponse().results().get(0).getState());
}
private void assertJobTask(String jobId, JobState expectedState, boolean hasExecutorNode) { private void assertJobTask(String jobId, JobState expectedState, boolean hasExecutorNode) {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);

View File

@ -146,6 +146,10 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
} }
protected Job.Builder createJob(String id, ByteSizeValue modelMemoryLimit) { protected Job.Builder createJob(String id, ByteSizeValue modelMemoryLimit) {
return createJob(id, modelMemoryLimit, false);
}
protected Job.Builder createJob(String id, ByteSizeValue modelMemoryLimit, boolean allowLazyOpen) {
DataDescription.Builder dataDescription = new DataDescription.Builder(); DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.XCONTENT); dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
dataDescription.setTimeFormat(DataDescription.EPOCH_MS); dataDescription.setTimeFormat(DataDescription.EPOCH_MS);
@ -160,6 +164,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
} }
builder.setAnalysisConfig(analysisConfig); builder.setAnalysisConfig(analysisConfig);
builder.setDataDescription(dataDescription); builder.setDataDescription(dataDescription);
builder.setAllowLazyOpen(allowLazyOpen);
return builder; return builder;
} }

View File

@ -142,7 +142,7 @@
- do: - do:
ml.open_job: ml.open_job:
job_id: job-model-memory-limit-as-string job_id: job-model-memory-limit-as-string
- match: { opened: false } - match: { opened: true }
- do: - do:
headers: headers: