[ML] Don't treat stale FAILED jobs as OPENING in job allocation (#31800)

Job persistent tasks with stale allocation IDs used to always be
considered as OPENING jobs in the ML job node allocation decision.
However, FAILED jobs are not relocated to other nodes, which leads
to them blocking up the nodes they failed on after node restarts.
FAILED jobs should not restrict how many other jobs can open on a
node, regardless of whether they are stale or not.

Closes #31794
This commit is contained in:
David Roberts 2018-07-05 13:26:17 +01:00 committed by GitHub
parent 9c11bf1e12
commit 92de94c237
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 69 additions and 10 deletions

View File

@ -210,16 +210,27 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
for (PersistentTasksCustomMetaData.PersistentTask<?> assignedTask : assignedTasks) {
JobTaskState jobTaskState = (JobTaskState) assignedTask.getState();
JobState jobState;
if (jobTaskState == null || // executor node didn't have the chance to set job status to OPENING
// previous executor node failed and current executor node didn't have the chance to set job status to OPENING
jobTaskState.isStatusStale(assignedTask)) {
if (jobTaskState == null) {
// executor node didn't have the chance to set job status to OPENING
++numberOfAllocatingJobs;
jobState = JobState.OPENING;
} else {
jobState = jobTaskState.getState();
if (jobTaskState.isStatusStale(assignedTask)) {
if (jobState == JobState.CLOSING) {
// previous executor node failed while the job was closing - it won't
// be reopened, so consider it CLOSED for resource usage purposes
jobState = JobState.CLOSED;
} else if (jobState != JobState.FAILED) {
// previous executor node failed and current executor node didn't
// have the chance to set job status to OPENING
++numberOfAllocatingJobs;
jobState = JobState.OPENING;
}
}
}
// Don't count FAILED jobs, as they don't consume native memory
if (jobState != JobState.FAILED) {
// Don't count CLOSED or FAILED jobs, as they don't consume native memory
if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) {
++numberOfAssignedJobs;
String assignedJobId = ((OpenJobAction.JobParams) assignedTask.getParams()).getJobId();
Job assignedJob = mlMetadata.getJobs().get(assignedJobId);

View File

@ -55,7 +55,6 @@ import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@ -285,7 +284,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
nodeAttr, Collections.emptySet(), Version.CURRENT))
.build();
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id1", "_node_id1", null, tasksBuilder);
addJobTask("job_id2", "_node_id1", null, tasksBuilder);
addJobTask("job_id3", "_node_id2", null, tasksBuilder);
@ -340,6 +339,55 @@ public class TransportOpenJobActionTests extends ESTestCase {
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
}
/**
 * Checks that a FAILED job whose task state is stale is not treated as an opening job
 * when a node is chosen for a new job.
 *
 * Three ML-enabled nodes each get two job tasks. On the first node one of those tasks is
 * a failed job that is reassigned after its state was recorded, so only that node should
 * still have room for another opening job. Once that slot is taken, a further selection
 * is expected to return no node.
 */
public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob() {
// Three nodes, all carrying the ML-enabled node attribute
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
nodeAttr, Collections.emptySet(), Version.CURRENT))
.add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301),
nodeAttr, Collections.emptySet(), Version.CURRENT))
.add(new DiscoveryNode("_node_name3", "_node_id3", new TransportAddress(InetAddress.getLoopbackAddress(), 9302),
nodeAttr, Collections.emptySet(), Version.CURRENT))
.build();
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
// job_id1 is recorded as failed on the first node before the reassignment below
addJobTask("job_id1", "_node_id1", JobState.fromString("failed"), tasksBuilder);
// This will make the allocation stale for job_id1
tasksBuilder.reassignTask(MlMetadata.jobTaskId("job_id1"), new Assignment("_node_id1", "test assignment"));
// Remaining tasks are added with a null job state
// NOTE(review): presumably a null state leaves the task without a JobTaskState, which
// the allocation code counts as OPENING - confirm against addJobTask
addJobTask("job_id2", "_node_id1", null, tasksBuilder);
addJobTask("job_id3", "_node_id2", null, tasksBuilder);
addJobTask("job_id4", "_node_id2", null, tasksBuilder);
addJobTask("job_id5", "_node_id3", null, tasksBuilder);
addJobTask("job_id6", "_node_id3", null, tasksBuilder);
PersistentTasksCustomMetaData tasks = tasksBuilder.build();

// Assemble the cluster state: nodes, routing table and the persistent tasks custom metadata
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
csBuilder.nodes(nodes);
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
// job_id7 and job_id8 are included because node selection is attempted for them below
addJobAndIndices(metaData, routingTable, "job_id1", "job_id2", "job_id3", "job_id4", "job_id5", "job_id6", "job_id7", "job_id8");
csBuilder.routingTable(routingTable.build());
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
csBuilder.metaData(metaData);

ClusterState cs = csBuilder.build();
// Allocation won't be possible if the stale failed job is treated as opening
// NOTE(review): the first numeric argument (2) appears to be the per-node limit on
// opening jobs, matching the "[2]" in the explanation asserted below - confirm
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, 30, logger);
assertEquals("_node_id1", result.getExecutorNode());

// Second phase: place job_id7 on the first node as well (null state), rebuilding the
// cluster state on top of the previous tasks and metadata
tasksBuilder = PersistentTasksCustomMetaData.builder(tasks);
addJobTask("job_id7", "_node_id1", null, tasksBuilder);
tasks = tasksBuilder.build();

csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
cs = csBuilder.build();
// Every node now has two tasks added with a null state, so no node should be selected for job_id8
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", cs, 2, 10, 30, logger);
assertNull("no node selected, because OPENING state", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
}
public void testSelectLeastLoadedMlNode_noCompatibleJobTypeNodes() {
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
@ -710,13 +758,13 @@ public class TransportOpenJobActionTests extends ESTestCase {
private static Function<String, Job> jobWithRulesCreator() {
return jobId -> {
DetectionRule rule = new DetectionRule.Builder(Arrays.asList(
DetectionRule rule = new DetectionRule.Builder(Collections.singletonList(
new RuleCondition(RuleCondition.AppliesTo.TYPICAL, Operator.LT, 100.0)
)).build();
Detector.Builder detector = new Detector.Builder("count", null);
detector.setRules(Arrays.asList(rule));
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build()));
detector.setRules(Collections.singletonList(rule));
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
DataDescription.Builder dataDescription = new DataDescription.Builder();
Job.Builder job = new Job.Builder(jobId);
job.setAnalysisConfig(analysisConfig);