[ML] State check doesn't need to know if task is stale now that task validation is only done at create time.

Original commit: elastic/x-pack-elasticsearch@d19858240b
This commit is contained in:
Martijn van Groningen 2017-03-23 14:40:20 +01:00
parent df86125a7d
commit 93d7b8c14b
4 changed files with 5 additions and 66 deletions

View File

@ -52,13 +52,12 @@ import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.utils.JobStateObserver; import org.elasticsearch.xpack.ml.utils.JobStateObserver;
import org.elasticsearch.xpack.persistent.NodePersistentTask; import org.elasticsearch.xpack.persistent.NodePersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTasksExecutor;
import org.elasticsearch.xpack.persistent.PersistentTaskRequest; import org.elasticsearch.xpack.persistent.PersistentTaskRequest;
import org.elasticsearch.xpack.persistent.PersistentTasksService;
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksExecutor;
import org.elasticsearch.xpack.persistent.PersistentTasksService;
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -452,20 +451,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
throw new ElasticsearchStatusException("Cannot open job [" + jobId + "] because it has been marked as deleted", throw new ElasticsearchStatusException("Cannot open job [" + jobId + "] because it has been marked as deleted",
RestStatus.CONFLICT); RestStatus.CONFLICT);
} }
PersistentTask<?> task = MlMetadata.getJobTask(jobId, tasks);
JobState jobState = MlMetadata.getJobState(jobId, tasks); JobState jobState = MlMetadata.getJobState(jobId, tasks);
if (task != null && jobState == JobState.OPENED) {
if (task.isAssigned() == false) {
// We can skip the job state check below, because the task got unassigned after we went into
// opened state on a node that disappeared and we didn't have the opportunity to set the status to failed
return;
} else if (nodes.nodeExists(task.getExecutorNode()) == false) {
// The state is open and the node were running on no longer exists.
// We can skip the job state check below, because when the node
// disappeared we didn't have time to set the status to failed.
return;
}
}
if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) { if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) {
throw new ElasticsearchStatusException("[" + jobId + "] expected state [" + JobState.CLOSED throw new ElasticsearchStatusException("[" + jobId + "] expected state [" + JobState.CLOSED
+ "] or [" + JobState.FAILED + "], but got [" + jobState + "]", RestStatus.CONFLICT); + "] or [" + JobState.FAILED + "], but got [" + jobState + "]", RestStatus.CONFLICT);

View File

@ -58,12 +58,12 @@ import org.elasticsearch.xpack.ml.utils.DatafeedStateObserver;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.NodePersistentTask; import org.elasticsearch.xpack.persistent.NodePersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTaskRequest; import org.elasticsearch.xpack.persistent.PersistentTaskRequest;
import org.elasticsearch.xpack.persistent.PersistentTasksService;
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTasksExecutor; import org.elasticsearch.xpack.persistent.PersistentTasksExecutor;
import org.elasticsearch.xpack.persistent.PersistentTasksService;
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
import java.io.IOException; import java.io.IOException;
import java.util.Objects; import java.util.Objects;
@ -478,20 +478,7 @@ public class StartDatafeedAction
RestStatus.CONFLICT, JobState.OPENED, jobState); RestStatus.CONFLICT, JobState.OPENED, jobState);
} }
PersistentTask<?> datafeedTask = MlMetadata.getDatafeedTask(datafeedId, tasks);
DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeedId, tasks); DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeedId, tasks);
if (datafeedTask != null && datafeedState == DatafeedState.STARTED) {
if (datafeedTask.isAssigned() == false) {
// We can skip the datafeed state check below, because the task got unassigned after we went into
// started state on a node that disappeared and we didn't have the opportunity to set the status to stopped
return;
} else if (nodes.nodeExists(datafeedTask.getExecutorNode()) == false) {
// The state is started and the node were running on no longer exists.
// We can skip the datafeed state check below, because when the node
// disappeared we didn't have time to set the state to stopped.
return;
}
}
if (datafeedState == DatafeedState.STARTED) { if (datafeedState == DatafeedState.STARTED) {
throw new ElasticsearchStatusException("datafeed [{}] already started, expected datafeed state [{}], but got [{}]", throw new ElasticsearchStatusException("datafeed [{}] already started, expected datafeed state [{}], but got [{}]",
RestStatus.CONFLICT, datafeedId, DatafeedState.STOPPED, DatafeedState.STARTED); RestStatus.CONFLICT, datafeedId, DatafeedState.STOPPED, DatafeedState.STARTED);

View File

@ -62,10 +62,6 @@ public class OpenJobActionTests extends ESTestCase {
OpenJobAction.validate("job_id", mlBuilder.build(), tasks, nodes); OpenJobAction.validate("job_id", mlBuilder.build(), tasks, nodes);
OpenJobAction.validate("job_id", mlBuilder.build(), new PersistentTasksCustomMetaData(1L, Collections.emptyMap()), nodes); OpenJobAction.validate("job_id", mlBuilder.build(), new PersistentTasksCustomMetaData(1L, Collections.emptyMap()), nodes);
OpenJobAction.validate("job_id", mlBuilder.build(), null, nodes); OpenJobAction.validate("job_id", mlBuilder.build(), null, nodes);
task = createJobTask(1L, "job_id", "_other_node_id", JobState.OPENED);
tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task));
OpenJobAction.validate("job_id", mlBuilder.build(), tasks, nodes);
} }
public void testValidate_jobMissing() { public void testValidate_jobMissing() {

View File

@ -166,36 +166,6 @@ public class StartDatafeedActionTests extends ESTestCase {
"but got [started]")); "but got [started]"));
} }
public void testValidate_staleTask() {
Job job1 = createScheduledJob("job_id").build();
DatafeedConfig datafeedConfig = createDatafeed("datafeed_id", "job_id", Collections.singletonList("*"));
MlMetadata mlMetadata1 = new MlMetadata.Builder()
.putJob(job1, false)
.putDatafeed(datafeedConfig)
.build();
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node_name", "node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build();
PersistentTask<OpenJobAction.Request> jobTask = createJobTask(0L, "job_id", "node_id2", JobState.OPENED);
PersistentTask<StartDatafeedAction.Request> datafeedTask =
new PersistentTask<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L),
false, true, new Assignment("node_id1", "test assignment"));
datafeedTask = new PersistentTask<>(datafeedTask, DatafeedState.STARTED);
Map<Long, PersistentTask<?>> taskMap = new HashMap<>();
taskMap.put(0L, jobTask);
taskMap.put(1L, datafeedTask);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(2L, taskMap);
StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, nodes);
datafeedTask = new PersistentTask<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L),
false, true, INITIAL_ASSIGNMENT);
datafeedTask = new PersistentTask<>(datafeedTask, DatafeedState.STARTED);
taskMap.put(1L, datafeedTask);
StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, nodes);
}
public static StartDatafeedAction.DatafeedTask createDatafeedTask(long id, String type, String action, public static StartDatafeedAction.DatafeedTask createDatafeedTask(long id, String type, String action,
TaskId parentTaskId, TaskId parentTaskId,
StartDatafeedAction.Request request, StartDatafeedAction.Request request,