diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index e0586dfbdba..cf444f6f50b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -149,9 +149,8 @@ public class MachineLearning extends Plugin implements ActionPlugin { public static final Setting USE_NATIVE_PROCESS_OPTION = Setting.boolSetting("useNativeProcess", true, Property.NodeScope, Property.Deprecated); - public static final String ALLOCATION_ENABLED_ATTR = "xpack.ml.allocation_enabled"; - public static final Setting ALLOCATION_ENABLED = Setting.boolSetting("node.attr." + ALLOCATION_ENABLED_ATTR, - XPackSettings.MACHINE_LEARNING_ENABLED, Setting.Property.NodeScope); + public static final Setting ML_ENABLED = + Setting.boolSetting("node.ml", XPackSettings.MACHINE_LEARNING_ENABLED, Setting.Property.NodeScope); public static final Setting CONCURRENT_JOB_ALLOCATIONS = Setting.intSetting("xpack.ml.node_concurrent_job_allocations", 2, 0, Property.Dynamic, Property.NodeScope); @@ -175,7 +174,7 @@ public class MachineLearning extends Plugin implements ActionPlugin { public List> getSettings() { return Collections.unmodifiableList( Arrays.asList(USE_NATIVE_PROCESS_OPTION, - ALLOCATION_ENABLED, + ML_ENABLED, CONCURRENT_JOB_ALLOCATIONS, ProcessCtrl.DONT_PERSIST_MODEL_STATE_SETTING, ProcessCtrl.MAX_ANOMALY_RECORDS_SETTING, @@ -186,25 +185,18 @@ public class MachineLearning extends Plugin implements ActionPlugin { @Override public Settings additionalSettings() { - Settings.Builder additionalSettings = Settings.builder(); - Boolean allocationEnabled = settings.getAsBoolean(ALLOCATION_ENABLED.getKey(), null); - if (enabled == false) { - if (allocationEnabled != null) { - // if the ml plugin has been disabled the ml allocation enabled node attribute shouldn't be set, - // otherwise other nodes will allocate jobs to this node and that will fail, because ml hasn't been loaded. - throw new IllegalArgumentException("Can't specify [" + ALLOCATION_ENABLED.getKey() + "] to true when [" + - XPackSettings.MACHINE_LEARNING_ENABLED.getKey() + "] has been set to false"); - } + if (enabled == false || this.transportClientMode) { return super.additionalSettings(); } - if (allocationEnabled == null) { - // Make sure that we explicitly set allocation enabled node attribute if it has been specified in the node - // settings. So we can always rely on it during assigning job tasks to nodes. - additionalSettings.put(ALLOCATION_ENABLED.getKey(), ALLOCATION_ENABLED.get(settings)); + + Settings.Builder additionalSettings = Settings.builder(); + additionalSettings.put(super.additionalSettings()); + Boolean allocationEnabled = ML_ENABLED.get(settings); + if (allocationEnabled != null && allocationEnabled) { + // Copy max_running_jobs setting to node attribute, so that we use this information when assigning job tasks to nodes: + additionalSettings.put("node.attr." + AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), + AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings)); } - // Add max running job limit as node attribute so that we use this information assigning job tasks to nodes - additionalSettings.put("node.attr." + AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), - AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings)); return additionalSettings.build(); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index 6720b233cc0..7b0f2c8e051 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -376,8 +376,8 @@ public class OpenJobAction extends Action nodeAttributes = node.getAttributes(); - String allocationEnabled = nodeAttributes.get(MachineLearning.ALLOCATION_ENABLED_ATTR); - if ("true".equals(allocationEnabled) == false) { + String maxNumberOfOpenJobsStr = nodeAttributes.get(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey()); + if (maxNumberOfOpenJobsStr == null) { String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node isn't a ml node."; logger.debug(reason); reasons.add(reason); @@ -410,7 +410,7 @@ public class OpenJobAction extends Action nodeAttr = new HashMap<>(); - nodeAttr.put(MachineLearning.ALLOCATION_ENABLED_ATTR, "true"); nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10"); DiscoveryNodes nodes = DiscoveryNodes.builder() .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), @@ -128,7 +126,6 @@ public class OpenJobActionTests extends ESTestCase { int maxRunningJobsPerNode = randomIntBetween(1, 100); Map nodeAttr = new HashMap<>(); - nodeAttr.put(MachineLearning.ALLOCATION_ENABLED_ATTR, "true"); nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), String.valueOf(maxRunningJobsPerNode)); DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(); Map> taskMap = new HashMap<>(); @@ -152,14 +149,11 @@ public class OpenJobActionTests extends ESTestCase { } public void testSelectLeastLoadedMlNode_noMlNodes() { - Map nodeAttr = new HashMap<>(); - nodeAttr.put(MachineLearning.ALLOCATION_ENABLED_ATTR, "false"); - nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10"); DiscoveryNodes nodes = DiscoveryNodes.builder() .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), - nodeAttr, Collections.emptySet(), Version.CURRENT)) + Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) .add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), - nodeAttr, Collections.emptySet(), Version.CURRENT)) + Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) .build(); PersistentTaskInProgress task = @@ -175,7 +169,6 @@ public class OpenJobActionTests extends ESTestCase { public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { Map nodeAttr = new HashMap<>(); - nodeAttr.put(MachineLearning.ALLOCATION_ENABLED_ATTR, "true"); nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10"); DiscoveryNodes nodes = DiscoveryNodes.builder() .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java index b2c12c7b0fd..2528603f28b 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -140,11 +140,11 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { public void testDedicatedMlNode() throws Exception { internalCluster().ensureAtMostNumDataNodes(0); // start 2 non ml node that will never get a job allocated. (but ml apis are accessable from this node) - internalCluster().startNode(Settings.builder().put(MachineLearning.ALLOCATION_ENABLED.getKey(), false)); - internalCluster().startNode(Settings.builder().put(MachineLearning.ALLOCATION_ENABLED.getKey(), false)); + internalCluster().startNode(Settings.builder().put(MachineLearning.ML_ENABLED.getKey(), false)); + internalCluster().startNode(Settings.builder().put(MachineLearning.ML_ENABLED.getKey(), false)); // start ml node if (randomBoolean()) { - internalCluster().startNode(Settings.builder().put(MachineLearning.ALLOCATION_ENABLED.getKey(), true)); + internalCluster().startNode(Settings.builder().put(MachineLearning.ML_ENABLED.getKey(), true)); } else { // the default is based on 'xpack.ml.enabled', which is enabled in base test class. internalCluster().startNode(); @@ -165,14 +165,13 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode()); Map expectedNodeAttr = new HashMap<>(); - expectedNodeAttr.put(MachineLearning.ALLOCATION_ENABLED_ATTR, "true"); expectedNodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10"); assertEquals(expectedNodeAttr, node.getAttributes()); assertEquals(JobState.OPENED, task.getStatus()); }); logger.info("stop the only running ml node"); - internalCluster().stopRandomNode(settings -> settings.getAsBoolean(MachineLearning.ALLOCATION_ENABLED.getKey(), true)); + internalCluster().stopRandomNode(settings -> settings.getAsBoolean(MachineLearning.ML_ENABLED.getKey(), true)); ensureStableCluster(2); assertBusy(() -> { // job should get and remain in a failed state: @@ -186,7 +185,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { }); logger.info("start ml node"); - internalCluster().startNode(Settings.builder().put(MachineLearning.ALLOCATION_ENABLED.getKey(), true)); + internalCluster().startNode(Settings.builder().put(MachineLearning.ML_ENABLED.getKey(), true)); ensureStableCluster(3); assertBusy(() -> { // job should be re-opened: @@ -197,7 +196,6 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { assertNotNull(task.getExecutorNode()); DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode()); Map expectedNodeAttr = new HashMap<>(); - expectedNodeAttr.put(MachineLearning.ALLOCATION_ENABLED_ATTR, "true"); expectedNodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10"); assertEquals(expectedNodeAttr, node.getAttributes()); assertEquals(JobState.OPENED, task.getStatus()); @@ -211,12 +209,12 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { // start non ml node, but that will hold the indices logger.info("Start non ml node:"); String nonMlNode = internalCluster().startNode(Settings.builder() - .put(MachineLearning.ALLOCATION_ENABLED.getKey(), false)); + .put(MachineLearning.ML_ENABLED.getKey(), false)); logger.info("Starting ml nodes"); internalCluster().startNodes(numMlNodes, Settings.builder() .put("node.data", false) .put("node.master", false) - .put(MachineLearning.ALLOCATION_ENABLED.getKey(), true).build()); + .put(MachineLearning.ML_ENABLED.getKey(), true).build()); ensureStableCluster(numMlNodes + 1); int maxConcurrentJobAllocations = randomIntBetween(1, 4); @@ -273,7 +271,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { Runnable r = () -> { try { internalCluster() - .stopRandomNode(settings -> settings.getAsBoolean(MachineLearning.ALLOCATION_ENABLED.getKey(), false)); + .stopRandomNode(settings -> settings.getAsBoolean(MachineLearning.ML_ENABLED.getKey(), false)); } catch (IOException e) { logger.error("error stopping node", e); } @@ -294,7 +292,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { internalCluster().startNodes(numMlNodes, Settings.builder() .put("node.data", false) .put("node.master", false) - .put(MachineLearning.ALLOCATION_ENABLED.getKey(), true).build()); + .put(MachineLearning.ML_ENABLED.getKey(), true).build()); ensureStableCluster(1 + numMlNodes); assertBusy(() -> {