From 22da5cf89efd6edfd44ced802fe550d91ad3c643 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Tue, 8 Aug 2017 16:16:27 +0100 Subject: [PATCH] [ML] Add max open jobs per node as a node attribute (elastic/x-pack-elasticsearch#2203) This commit adds the max_running_jobs setting from elasticsearch.yml into a node attribute called ml.max_open_jobs. Previously there was an assumption that max_running_jobs would be the same for all nodes in the cluster. However, during a rolling cluster restart where the value of the setting is being changed this clearly cannot be the case, and would cause unexpected/unpredictable limits to be used during the period when different nodes had different settings. For backwards compatibility, if another node in the cluster has not added its setting for max_running_jobs to the cluster state then the old (flawed but better than nothing) approach is applied, i.e. assume the remote node's setting for max_running_jobs is equal to that of the node deciding the job allocation. Relates elastic/x-pack-elasticsearch#2185 Original commit: elastic/x-pack-elasticsearch@1e62b89183c0726b1496f52dff102033076cdab1 --- .../xpack/ml/MachineLearning.java | 6 +++- .../xpack/ml/action/OpenJobAction.java | 32 +++++++++++++++---- .../integration/BasicDistributedJobsIT.java | 2 ++ 3 files changed, 33 insertions(+), 7 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 9e2e2413886..362b3afdf5f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -155,8 +155,9 @@ public class MachineLearning implements ActionPlugin { public static final Setting AUTODETECT_PROCESS = Setting.boolSetting("xpack.ml.autodetect_process", true, Property.NodeScope); public static final Setting ML_ENABLED = - Setting.boolSetting("node.ml", XPackSettings.MACHINE_LEARNING_ENABLED, Setting.Property.NodeScope); + Setting.boolSetting("node.ml", XPackSettings.MACHINE_LEARNING_ENABLED, Property.NodeScope); public static final String ML_ENABLED_NODE_ATTR = "ml.enabled"; + public static final String MAX_OPEN_JOBS_NODE_ATTR = "ml.max_open_jobs"; public static final Setting CONCURRENT_JOB_ALLOCATIONS = Setting.intSetting("xpack.ml.node_concurrent_job_allocations", 2, 0, Property.Dynamic, Property.NodeScope); @@ -200,7 +201,10 @@ public class MachineLearning implements ActionPlugin { Settings.Builder additionalSettings = Settings.builder(); Boolean allocationEnabled = ML_ENABLED.get(settings); if (allocationEnabled != null && allocationEnabled) { + // TODO: the simple true/false flag will not be required once all supported versions have the number - consider removing in 7.0 additionalSettings.put("node.attr." + ML_ENABLED_NODE_ATTR, "true"); + additionalSettings.put("node.attr." + MAX_OPEN_JOBS_NODE_ATTR, + AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings)); } return additionalSettings.build(); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index 369f77319bd..6427de84d55 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -42,7 +42,6 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ObjectParser; -import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; @@ -566,14 +565,20 @@ public class OpenJobAction extends Action unavailableIndices = verifyIndicesPrimaryShardsAreActive(jobId, clusterState); if (unavailableIndices.size() != 0) { String reason = "Not opening job [" + jobId + "], because not all primary shards are active for the following indices [" + @@ -716,6 +722,20 @@ public class OpenJobAction extends Action expectedNodeAttr = new HashMap<>(); expectedNodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true"); + expectedNodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10"); assertEquals(expectedNodeAttr, node.getAttributes()); JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus(); assertNotNull(jobTaskStatus); @@ -402,6 +403,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode()); Map expectedNodeAttr = new HashMap<>(); expectedNodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true"); + expectedNodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10"); assertEquals(expectedNodeAttr, node.getAttributes()); JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus();