[ML] Use JobTaskStatus#staleTask(...) instead of PersistentTask#needsReassignement(...) for checking whether a job task is stale when allocation a datafeed to a node.

Original commit: elastic/x-pack-elasticsearch@0952c455fe
This commit is contained in:
Martijn van Groningen 2017-04-24 14:53:30 +02:00
parent 0b267242f1
commit a98d593576
5 changed files with 12 additions and 19 deletions

View File

@ -585,7 +585,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
}
JobTaskStatus jobTaskState = (JobTaskStatus) task.getStatus();
return jobTaskState == null || // executor node didn't have the chance to set job status to OPENING
jobTaskState.staleStatus(task); // previous executor node failed and
jobTaskState.isStatusStale(task); // previous executor node failed and
// current executor node didn't have the chance to set job status to OPENING
}).size();
} else {

View File

@ -25,7 +25,6 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
@ -572,7 +571,6 @@ public class StartDatafeedAction
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
DiscoveryNodes nodes = clusterState.getNodes();
PersistentTask<?> jobTask = MlMetadata.getJobTask(datafeed.getJobId(), tasks);
if (jobTask == null) {
@ -580,12 +578,6 @@ public class StartDatafeedAction
logger.debug(reason);
return new Assignment(null, reason);
}
if (jobTask.needsReassignment(nodes)) {
String reason = "cannot start datafeed [" + datafeed.getId() + "], job [" + datafeed.getJobId() +
"] is unassigned or unassigned to a non existing node";
logger.debug(reason);
return new Assignment(null, reason);
}
JobTaskStatus taskStatus = (JobTaskStatus) jobTask.getStatus();
if (taskStatus == null || taskStatus.getState() != JobState.OPENED) {
// lets try again later when the job has been opened:
@ -595,6 +587,11 @@ public class StartDatafeedAction
logger.debug(reason);
return new Assignment(null, reason);
}
if (taskStatus.isStatusStale(jobTask)) {
String reason = "cannot start datafeed [" + datafeed.getId() + "], job [" + datafeed.getJobId() + "] status is stale";
logger.debug(reason);
return new Assignment(null, reason);
}
String reason = verifyIndicesActive(logger, datafeed, clusterState, resolver);
if (reason != null) {
return new Assignment(null, reason);

View File

@ -67,7 +67,7 @@ public class JobTaskStatus implements Task.Status {
return state;
}
public boolean staleStatus(PersistentTask<?> task) {
public boolean isStatusStale(PersistentTask<?> task) {
return allocationId != task.getAllocationId();
}

View File

@ -35,6 +35,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
@ -237,24 +238,20 @@ public class StartDatafeedActionTests extends ESTestCase {
String nodeId = randomBoolean() ? "node_id2" : null;
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask(job.getId(), nodeId, JobState.OPENED, tasksBuilder);
// Set to lower allocationId, so job task is stale:
tasksBuilder.updateTaskStatus(MlMetadata.jobTaskId(job.getId()), new JobTaskStatus(JobState.OPENED, 0));
PersistentTasksCustomMetaData tasks = tasksBuilder.build();
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node_name", "node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build();
ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(new MetaData.Builder()
.putCustom(MlMetadata.TYPE, mlMetadata.build())
.putCustom(PersistentTasksCustomMetaData.TYPE, tasks)
.put(indexMetaData, false))
.nodes(nodes)
.routingTable(generateRoutingTable(indexMetaData));
Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver);
assertNull(result.getExecutorNode());
assertEquals("cannot start datafeed [datafeed_id], job [job_id] is unassigned or unassigned to a non existing node",
assertEquals("cannot start datafeed [datafeed_id], job [job_id] status is stale",
result.getExplanation());
tasksBuilder = PersistentTasksCustomMetaData.builder();
@ -265,7 +262,6 @@ public class StartDatafeedActionTests extends ESTestCase {
.putCustom(MlMetadata.TYPE, mlMetadata.build())
.putCustom(PersistentTasksCustomMetaData.TYPE, tasks)
.put(indexMetaData, false))
.nodes(nodes)
.routingTable(generateRoutingTable(indexMetaData));
result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver);
assertEquals("node_id1", result.getExecutorNode());

View File

@ -273,7 +273,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
Collection<PersistentTask<?>> foundTasks = tasks.findTasks(OpenJobAction.TASK_NAME, task -> {
JobTaskStatus jobTaskState = (JobTaskStatus) task.getStatus();
return node.getId().equals(task.getExecutorNode()) &&
(jobTaskState == null || jobTaskState.staleStatus(task));
(jobTaskState == null || jobTaskState.isStatusStale(task));
});
int count = foundTasks.size();
if (count > maxConcurrentJobAllocations) {