diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 1c73460e59d..8901ada643a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -124,13 +124,13 @@ import org.elasticsearch.xpack.ml.rest.validate.RestValidateDetectorAction; import org.elasticsearch.xpack.ml.rest.validate.RestValidateJobConfigAction; import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction; import org.elasticsearch.xpack.persistent.PersistentTaskParams; -import org.elasticsearch.xpack.persistent.StartPersistentTaskAction; import org.elasticsearch.xpack.persistent.PersistentTasksClusterService; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksExecutorRegistry; import org.elasticsearch.xpack.persistent.PersistentTasksNodeService; import org.elasticsearch.xpack.persistent.PersistentTasksService; import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction; +import org.elasticsearch.xpack.persistent.StartPersistentTaskAction; import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction; import org.elasticsearch.xpack.security.InternalClient; @@ -155,6 +155,7 @@ public class MachineLearning implements ActionPlugin { Setting.boolSetting("xpack.ml.autodetect_process", true, Property.NodeScope); public static final Setting ML_ENABLED = Setting.boolSetting("node.ml", XPackSettings.MACHINE_LEARNING_ENABLED, Setting.Property.NodeScope); + public static final String ML_ENABLED_NODE_ATTR = "ml.enabled"; public static final Setting CONCURRENT_JOB_ALLOCATIONS = Setting.intSetting("xpack.ml.node_concurrent_job_allocations", 2, 0, Property.Dynamic, Property.NodeScope); @@ -198,9 +199,7 @@ public class MachineLearning implements ActionPlugin { Settings.Builder additionalSettings = Settings.builder(); Boolean allocationEnabled = ML_ENABLED.get(settings); if (allocationEnabled != null && allocationEnabled) { - // Copy max_running_jobs setting to node attribute, so that we use this information when assigning job tasks to nodes: - additionalSettings.put("node.attr." + AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), - AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings)); + additionalSettings.put("node.attr." + ML_ENABLED_NODE_ATTR, "true"); } return additionalSettings.build(); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index ca94ce6bc1e..5e3fd5f6559 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -378,6 +378,7 @@ public class OpenJobAction extends Action unavailableIndices = verifyIndicesPrimaryShardsAreActive(jobId, clusterState); if (unavailableIndices.size() != 0) { String reason = "Not opening job [" + jobId + "], because not all primary shards are active for the following indices [" + @@ -471,8 +474,8 @@ public class OpenJobAction extends Action nodeAttributes = node.getAttributes(); - String maxNumberOfOpenJobsStr = nodeAttributes.get(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey()); - if (maxNumberOfOpenJobsStr == null) { + String enabled = nodeAttributes.get(MachineLearning.ML_ENABLED_NODE_ATTR); + if (Boolean.valueOf(enabled) == false) { String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node isn't a ml node."; logger.trace(reason); reasons.add(reason); @@ -504,7 +507,6 @@ public class OpenJobAction extends Action MAX_RUNNING_JOBS_PER_NODE = Setting.intSetting("max_running_jobs", 10, 1, 512, Setting.Property.NodeScope); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/notifications/Auditor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/notifications/Auditor.java index a675c1686a1..dad9c1b7786 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/notifications/Auditor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/notifications/Auditor.java @@ -56,7 +56,7 @@ public class Auditor { @Override public void onFailure(Exception e) { - LOGGER.error(new ParameterizedMessage("Error writing {}", new Object[]{type}, e)); + LOGGER.debug(new ParameterizedMessage("Error writing {}", new Object[]{type}, e)); } }); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java index 6f528cbcbec..44c31b55ad9 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; @@ -44,7 +45,6 @@ import java.util.List; import java.util.Map; import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder; -import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE; public class OpenJobActionTests extends ESTestCase { @@ -79,7 +79,7 @@ public class OpenJobActionTests extends ESTestCase { public void testSelectLeastLoadedMlNode() { Map nodeAttr = new HashMap<>(); - nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10"); + nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true"); DiscoveryNodes nodes = DiscoveryNodes.builder() .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), nodeAttr, Collections.emptySet(), Version.CURRENT)) @@ -103,7 +103,7 @@ public class OpenJobActionTests extends ESTestCase { metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); cs.metaData(metaData); cs.routingTable(routingTable.build()); - Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), 2, logger); + Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), 2, 10, logger); assertEquals("_node_id3", result.getExecutorNode()); } @@ -112,7 +112,7 @@ public class OpenJobActionTests extends ESTestCase { int maxRunningJobsPerNode = randomIntBetween(1, 100); Map nodeAttr = new HashMap<>(); - nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), String.valueOf(maxRunningJobsPerNode)); + nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true"); DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); for (int i = 0; i < numNodes; i++) { @@ -134,7 +134,7 @@ public class OpenJobActionTests extends ESTestCase { metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); cs.metaData(metaData); cs.routingTable(routingTable.build()); - Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, logger); + Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, maxRunningJobsPerNode, logger); assertNull(result.getExecutorNode()); assertTrue(result.getExplanation().contains("because this node is full. Number of opened jobs [" + maxRunningJobsPerNode + "], max_running_jobs [" + maxRunningJobsPerNode + "]")); @@ -160,14 +160,14 @@ public class OpenJobActionTests extends ESTestCase { metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); cs.metaData(metaData); cs.routingTable(routingTable.build()); - Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, logger); + Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, 10, logger); assertTrue(result.getExplanation().contains("because this node isn't a ml node")); assertNull(result.getExecutorNode()); } public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { Map nodeAttr = new HashMap<>(); - nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10"); + nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true"); DiscoveryNodes nodes = DiscoveryNodes.builder() .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), nodeAttr, Collections.emptySet(), Version.CURRENT)) @@ -195,7 +195,7 @@ public class OpenJobActionTests extends ESTestCase { csBuilder.metaData(metaData); ClusterState cs = csBuilder.build(); - Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id6", cs, 2, logger); + Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id6", cs, 2, 10, logger); assertEquals("_node_id3", result.getExecutorNode()); tasksBuilder = PersistentTasksCustomMetaData.builder(tasks); @@ -205,7 +205,7 @@ public class OpenJobActionTests extends ESTestCase { csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, logger); + result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, logger); assertNull("no node selected, because OPENING state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); @@ -216,7 +216,7 @@ public class OpenJobActionTests extends ESTestCase { csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, logger); + result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, logger); assertNull("no node selected, because stale task", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); @@ -227,7 +227,7 @@ public class OpenJobActionTests extends ESTestCase { csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, logger); + result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, logger); assertNull("no node selected, because null state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java index f9d558a6ce6..61218f9ca8b 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -41,8 +41,6 @@ import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; -import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE; - public class BasicDistributedJobsIT extends BaseMlIntegTestCase { public void testFailOverBasics() throws Exception { @@ -173,7 +171,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode()); Map expectedNodeAttr = new HashMap<>(); - expectedNodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10"); + expectedNodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true"); assertEquals(expectedNodeAttr, node.getAttributes()); JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus(); assertNotNull(jobTaskStatus); @@ -379,7 +377,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { assertFalse(task.needsReassignment(clusterState.nodes())); DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode()); Map expectedNodeAttr = new HashMap<>(); - expectedNodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10"); + expectedNodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true"); assertEquals(expectedNodeAttr, node.getAttributes()); JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus(); diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeed_stats.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeed_stats.yaml index d33c449e6d3..f5fd378f390 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeed_stats.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeed_stats.yaml @@ -111,7 +111,7 @@ setup: - match: { datafeeds.0.state: "started"} - is_true: datafeeds.0.node.name - is_true: datafeeds.0.node.transport_address - - match: { datafeeds.0.node.attributes.max_running_jobs: "10"} + - match: { datafeeds.0.node.attributes.ml\.enabled: "true"} --- "Test implicit get all datafeed stats given started datafeeds": diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yaml index d4df12f7362..fcc831a021a 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yaml @@ -79,7 +79,7 @@ setup: - match: { jobs.0.state: opened } - is_true: jobs.0.node.name - is_true: jobs.0.node.transport_address - - match: { jobs.0.node.attributes.max_running_jobs: "10"} + - match: { jobs.0.node.attributes.ml\.enabled: "true"} - is_true: jobs.0.open_time ---