[ML] Unclutter failed job assignment explanations (elastic/x-pack-elasticsearch#4179)

Unclutter failed job assignment explanations

Original commit: elastic/x-pack-elasticsearch@1c3deebaac
This commit is contained in:
David Kyle 2018-03-22 17:45:57 +00:00 committed by GitHub
parent cd4a073bb5
commit 179090c840
2 changed files with 74 additions and 14 deletions

View File

@ -28,6 +28,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -154,7 +155,8 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
Map<String, String> nodeAttributes = node.getAttributes();
String enabled = nodeAttributes.get(MachineLearning.ML_ENABLED_NODE_ATTR);
if (Boolean.valueOf(enabled) == false) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node isn't a ml node.";
String reason = "Not opening job [" + jobId + "] on node [" + nodeNameOrId(node)
+ "], because this node isn't a ml node.";
logger.trace(reason);
reasons.add(reason);
continue;
@ -164,15 +166,15 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
Job job = mlMetadata.getJobs().get(jobId);
Set<String> compatibleJobTypes = Job.getCompatibleJobTypes(node.getVersion());
if (compatibleJobTypes.contains(job.getJobType()) == false) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node does not support jobs of type ["
+ job.getJobType() + "]";
String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndVersion(node) +
"], because this node does not support jobs of type [" + job.getJobType() + "]";
logger.trace(reason);
reasons.add(reason);
continue;
}
if (nodeSupportsJobVersion(node.getVersion()) == false) {
String reason = "Not opening job [" + jobId + "] on node [" + node
String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndVersion(node)
+ "], because this node does not support jobs of version [" + job.getJobVersion() + "]";
logger.trace(reason);
reasons.add(reason);
@ -180,8 +182,9 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
}
if (nodeSupportsModelSnapshotVersion(node, job) == false) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because the job's model snapshot requires " +
"a node of version [" + job.getModelSnapshotMinVersion() + "] or higher";
String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndVersion(node)
+ "], because the job's model snapshot requires a node of version ["
+ job.getModelSnapshotMinVersion() + "] or higher";
logger.trace(reason);
reasons.add(reason);
continue;
@ -210,7 +213,8 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
}
}
if (numberOfAllocatingJobs >= maxConcurrentJobAllocations) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because node exceeds [" + numberOfAllocatingJobs +
String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
+ "], because node exceeds [" + numberOfAllocatingJobs +
"] the maximum number of jobs [" + maxConcurrentJobAllocations + "] in opening state";
logger.trace(reason);
reasons.add(reason);
@ -224,7 +228,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
try {
maxNumberOfOpenJobs = Integer.parseInt(maxNumberOfOpenJobsStr);
} catch (NumberFormatException e) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because " +
String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because " +
MachineLearning.MAX_OPEN_JOBS_NODE_ATTR + " attribute [" + maxNumberOfOpenJobsStr + "] is not an integer";
logger.trace(reason);
reasons.add(reason);
@ -233,9 +237,9 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
}
long availableCount = maxNumberOfOpenJobs - numberOfAssignedJobs;
if (availableCount == 0) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node is full. " +
"Number of opened jobs [" + numberOfAssignedJobs + "], " + MAX_OPEN_JOBS_PER_NODE.getKey() +
" [" + maxNumberOfOpenJobs + "]";
String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
+ "], because this node is full. Number of opened jobs [" + numberOfAssignedJobs
+ "], " + MAX_OPEN_JOBS_PER_NODE.getKey() + " [" + maxNumberOfOpenJobs + "]";
logger.trace(reason);
reasons.add(reason);
continue;
@ -253,7 +257,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
try {
machineMemory = Long.parseLong(machineMemoryStr);
} catch (NumberFormatException e) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because " +
String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because " +
MachineLearning.MACHINE_MEMORY_NODE_ATTR + " attribute [" + machineMemoryStr + "] is not a long";
logger.trace(reason);
reasons.add(reason);
@ -267,7 +271,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
long estimatedMemoryFootprint = job.estimateMemoryFootprint();
long availableMemory = maxMlMemory - assignedJobMemory;
if (estimatedMemoryFootprint > availableMemory) {
String reason = "Not opening job [" + jobId + "] on node [" + node +
String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) +
"], because this node has insufficient available memory. Available memory for ML [" + maxMlMemory +
"], memory required by existing jobs [" + assignedJobMemory +
"], estimated memory required for this job [" + estimatedMemoryFootprint + "]";
@ -285,7 +289,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
// the cluster, fall back to simply allocating by job count
allocateByMemory = false;
logger.debug("Falling back to allocating job [{}] by job counts because machine memory was not available for node [{}]",
jobId, node);
jobId, nodeNameAndMlAttributes(node));
}
}
}
@ -300,6 +304,33 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
}
}
/**
 * Picks a short identifier for a node to use in assignment explanations.
 *
 * @param node the node to describe
 * @return the node's name, or the node's id when the name is null or empty
 */
static String nodeNameOrId(DiscoveryNode node) {
    final String name = node.getName();
    return Strings.isNullOrEmpty(name) ? node.getId() : name;
}
/**
 * Describes a node as <code>{nameOrId}{version=V}</code>, where the first part comes
 * from {@link #nodeNameOrId(DiscoveryNode)} and V is the node's version.
 *
 * @param node the node to describe
 * @return the node's name (or id) followed by its version, each wrapped in braces
 */
static String nodeNameAndVersion(DiscoveryNode node) {
    return "{" + nodeNameOrId(node) + "}" + "{version=" + node.getVersion() + "}";
}
/**
 * Describes a node as <code>{nameOrId}</code> followed by one <code>{key=value}</code>
 * group for every node attribute whose key starts with <code>ml.</code> or equals
 * <code>node.ml</code>. All other attributes are left out.
 *
 * The attribute groups are emitted sorted by key so the resulting text does not
 * depend on the iteration order of the node's attribute map.
 *
 * @param node the node to describe
 * @return the node's name (or id) followed by its ML related attributes
 */
static String nodeNameAndMlAttributes(DiscoveryNode node) {
    StringBuilder builder = new StringBuilder("{").append(nodeNameOrId(node)).append('}');
    node.getAttributes().entrySet().stream()
        .filter(attribute -> attribute.getKey().startsWith("ml.") || attribute.getKey().equals("node.ml"))
        // Sort so the explanation is stable whatever map implementation backs the attributes
        .sorted(Map.Entry.comparingByKey())
        // Format key and value explicitly rather than relying on Map.Entry#toString()
        .forEachOrdered(attribute ->
            builder.append('{').append(attribute.getKey()).append('=').append(attribute.getValue()).append('}'));
    return builder.toString();
}
static String[] indicesOfInterest(ClusterState clusterState, String job) {
String jobResultIndex = AnomalyDetectorsIndex.getPhysicalIndexFromState(clusterState, job);
return new String[]{AnomalyDetectorsIndex.jobStateIndexName(), jobResultIndex, MlMetaIndex.INDEX_NAME};

View File

@ -54,6 +54,8 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Function;
import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder;
@ -496,6 +498,33 @@ public class TransportOpenJobActionTests extends ESTestCase {
assertArrayEquals(indices, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger));
}
/**
 * Checks that nodeNameAndVersion renders the node name and version only;
 * the node's other attributes must not appear in the output.
 */
public void testNodeNameAndVersion() {
    Map<String, String> nodeAttributes = new HashMap<>();
    nodeAttributes.put("unrelated", "attribute");
    TransportAddress address = new TransportAddress(InetAddress.getLoopbackAddress(), 9300);
    DiscoveryNode node =
        new DiscoveryNode("_node_name1", "_node_id1", address, nodeAttributes, Collections.emptySet(), Version.CURRENT);

    String expected = "{_node_name1}{version=" + node.getVersion() + "}";
    assertEquals(expected, TransportOpenJobAction.nodeNameAndVersion(node));
}
/**
 * Checks that nodeNameAndMlAttributes lists only the "ml." prefixed and "node.ml"
 * attributes after the node name, and uses the node id when the name is null.
 */
public void testNodeNameAndMlAttributes() {
    TransportAddress address = new TransportAddress(InetAddress.getLoopbackAddress(), 9300);
    // A sorted map keeps the attribute order in the expected strings predictable
    SortedMap<String, String> nodeAttributes = new TreeMap<>();
    // Builds a fresh node from the attributes as they are at the time of the call
    Function<String, DiscoveryNode> nodeNamed = name ->
        new DiscoveryNode(name, "_node_id1", address, nodeAttributes, Collections.emptySet(), Version.CURRENT);

    // An attribute that is not ML related is not reported
    nodeAttributes.put("unrelated", "attribute");
    assertEquals("{_node_name1}", TransportOpenJobAction.nodeNameAndMlAttributes(nodeNamed.apply("_node_name1")));

    // An "ml." prefixed attribute is reported
    nodeAttributes.put("ml.machine_memory", "5");
    assertEquals("{_node_name1}{ml.machine_memory=5}",
        TransportOpenJobAction.nodeNameAndMlAttributes(nodeNamed.apply("_node_name1")));

    // Without a node name the node id is used instead
    assertEquals("{_node_id1}{ml.machine_memory=5}",
        TransportOpenJobAction.nodeNameAndMlAttributes(nodeNamed.apply(null)));

    // The "node.ml" attribute is reported as well
    nodeAttributes.put("node.ml", "true");
    assertEquals("{_node_name1}{ml.machine_memory=5}{node.ml=true}",
        TransportOpenJobAction.nodeNameAndMlAttributes(nodeNamed.apply("_node_name1")));
}
public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetaData.Builder builder) {
builder.addTask(MlMetadata.jobTaskId(jobId), OpenJobAction.TASK_NAME, new OpenJobAction.JobParams(jobId),
new Assignment(nodeId, "test assignment"));