[ML] Rename `xpack.ml.allocation_enabled` boolean setting to `node.ml`.

Remove the `node.attr.xpack.ml.allocation_enabled` node attribute and instead use the `node.attr.max_running_jobs` attribute to indicate that a node is an ml node.
(in addition to indicating how many jobs can be hosted on a node)

Original commit: elastic/x-pack-elasticsearch@7979bc55b4
This commit is contained in:
Martijn van Groningen 2017-02-20 14:25:45 +01:00
parent 165c0d0e4f
commit 9100fa6333
4 changed files with 26 additions and 43 deletions

View File

@ -149,9 +149,8 @@ public class MachineLearning extends Plugin implements ActionPlugin {
public static final Setting<Boolean> USE_NATIVE_PROCESS_OPTION = Setting.boolSetting("useNativeProcess", true, Property.NodeScope, public static final Setting<Boolean> USE_NATIVE_PROCESS_OPTION = Setting.boolSetting("useNativeProcess", true, Property.NodeScope,
Property.Deprecated); Property.Deprecated);
public static final String ALLOCATION_ENABLED_ATTR = "xpack.ml.allocation_enabled"; public static final Setting<Boolean> ML_ENABLED =
public static final Setting<Boolean> ALLOCATION_ENABLED = Setting.boolSetting("node.attr." + ALLOCATION_ENABLED_ATTR, Setting.boolSetting("node.ml", XPackSettings.MACHINE_LEARNING_ENABLED, Setting.Property.NodeScope);
XPackSettings.MACHINE_LEARNING_ENABLED, Setting.Property.NodeScope);
public static final Setting<Integer> CONCURRENT_JOB_ALLOCATIONS = public static final Setting<Integer> CONCURRENT_JOB_ALLOCATIONS =
Setting.intSetting("xpack.ml.node_concurrent_job_allocations", 2, 0, Property.Dynamic, Property.NodeScope); Setting.intSetting("xpack.ml.node_concurrent_job_allocations", 2, 0, Property.Dynamic, Property.NodeScope);
@ -175,7 +174,7 @@ public class MachineLearning extends Plugin implements ActionPlugin {
public List<Setting<?>> getSettings() { public List<Setting<?>> getSettings() {
return Collections.unmodifiableList( return Collections.unmodifiableList(
Arrays.asList(USE_NATIVE_PROCESS_OPTION, Arrays.asList(USE_NATIVE_PROCESS_OPTION,
ALLOCATION_ENABLED, ML_ENABLED,
CONCURRENT_JOB_ALLOCATIONS, CONCURRENT_JOB_ALLOCATIONS,
ProcessCtrl.DONT_PERSIST_MODEL_STATE_SETTING, ProcessCtrl.DONT_PERSIST_MODEL_STATE_SETTING,
ProcessCtrl.MAX_ANOMALY_RECORDS_SETTING, ProcessCtrl.MAX_ANOMALY_RECORDS_SETTING,
@ -186,25 +185,18 @@ public class MachineLearning extends Plugin implements ActionPlugin {
@Override @Override
public Settings additionalSettings() { public Settings additionalSettings() {
Settings.Builder additionalSettings = Settings.builder(); if (enabled == false || this.transportClientMode) {
Boolean allocationEnabled = settings.getAsBoolean(ALLOCATION_ENABLED.getKey(), null);
if (enabled == false) {
if (allocationEnabled != null) {
// if the ml plugin has been disabled the ml allocation enabled node attribute shouldn't be set,
// otherwise other nodes will allocate jobs to this node and that will fail, because ml hasn't been loaded.
throw new IllegalArgumentException("Can't specify [" + ALLOCATION_ENABLED.getKey() + "] to true when [" +
XPackSettings.MACHINE_LEARNING_ENABLED.getKey() + "] has been set to false");
}
return super.additionalSettings(); return super.additionalSettings();
} }
if (allocationEnabled == null) {
// Make sure that we explicitly set allocation enabled node attribute if it has been specified in the node Settings.Builder additionalSettings = Settings.builder();
// settings. So we can always rely on it during assigning job tasks to nodes. additionalSettings.put(super.additionalSettings());
additionalSettings.put(ALLOCATION_ENABLED.getKey(), ALLOCATION_ENABLED.get(settings)); Boolean allocationEnabled = ML_ENABLED.get(settings);
if (allocationEnabled != null && allocationEnabled) {
// Copy max_running_jobs setting to node attribute, so that we use this information when assigning job tasks to nodes:
additionalSettings.put("node.attr." + AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(),
AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings));
} }
// Add max running job limit as node attribute so that we use this information assigning job tasks to nodes
additionalSettings.put("node.attr." + AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(),
AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings));
return additionalSettings.build(); return additionalSettings.build();
} }

View File

@ -376,8 +376,8 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
PersistentTasksInProgress persistentTasksInProgress = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); PersistentTasksInProgress persistentTasksInProgress = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
for (DiscoveryNode node : clusterState.getNodes()) { for (DiscoveryNode node : clusterState.getNodes()) {
Map<String, String> nodeAttributes = node.getAttributes(); Map<String, String> nodeAttributes = node.getAttributes();
String allocationEnabled = nodeAttributes.get(MachineLearning.ALLOCATION_ENABLED_ATTR); String maxNumberOfOpenJobsStr = nodeAttributes.get(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey());
if ("true".equals(allocationEnabled) == false) { if (maxNumberOfOpenJobsStr == null) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node isn't a ml node."; String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node isn't a ml node.";
logger.debug(reason); logger.debug(reason);
reasons.add(reason); reasons.add(reason);
@ -410,7 +410,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
continue; continue;
} }
long maxNumberOfOpenJobs = Long.parseLong(node.getAttributes().get(MAX_RUNNING_JOBS_PER_NODE.getKey())); long maxNumberOfOpenJobs = Long.parseLong(maxNumberOfOpenJobsStr);
long available = maxNumberOfOpenJobs - numberOfAssignedJobs; long available = maxNumberOfOpenJobs - numberOfAssignedJobs;
if (available == 0) { if (available == 0) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node is full. " + String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node is full. " +

View File

@ -15,7 +15,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.MlMetadata;
@ -96,7 +95,6 @@ public class OpenJobActionTests extends ESTestCase {
public void testSelectLeastLoadedMlNode() { public void testSelectLeastLoadedMlNode() {
Map<String, String> nodeAttr = new HashMap<>(); Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.ALLOCATION_ENABLED_ATTR, "true");
nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10"); nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10");
DiscoveryNodes nodes = DiscoveryNodes.builder() DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
@ -128,7 +126,6 @@ public class OpenJobActionTests extends ESTestCase {
int maxRunningJobsPerNode = randomIntBetween(1, 100); int maxRunningJobsPerNode = randomIntBetween(1, 100);
Map<String, String> nodeAttr = new HashMap<>(); Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.ALLOCATION_ENABLED_ATTR, "true");
nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), String.valueOf(maxRunningJobsPerNode)); nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), String.valueOf(maxRunningJobsPerNode));
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(); DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
Map<Long, PersistentTaskInProgress<?>> taskMap = new HashMap<>(); Map<Long, PersistentTaskInProgress<?>> taskMap = new HashMap<>();
@ -152,14 +149,11 @@ public class OpenJobActionTests extends ESTestCase {
} }
public void testSelectLeastLoadedMlNode_noMlNodes() { public void testSelectLeastLoadedMlNode_noMlNodes() {
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.ALLOCATION_ENABLED_ATTR, "false");
nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10");
DiscoveryNodes nodes = DiscoveryNodes.builder() DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
nodeAttr, Collections.emptySet(), Version.CURRENT)) Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), .add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301),
nodeAttr, Collections.emptySet(), Version.CURRENT)) Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build(); .build();
PersistentTaskInProgress<OpenJobAction.Request> task = PersistentTaskInProgress<OpenJobAction.Request> task =
@ -175,7 +169,6 @@ public class OpenJobActionTests extends ESTestCase {
public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() {
Map<String, String> nodeAttr = new HashMap<>(); Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.ALLOCATION_ENABLED_ATTR, "true");
nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10"); nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10");
DiscoveryNodes nodes = DiscoveryNodes.builder() DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),

View File

@ -140,11 +140,11 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
public void testDedicatedMlNode() throws Exception { public void testDedicatedMlNode() throws Exception {
internalCluster().ensureAtMostNumDataNodes(0); internalCluster().ensureAtMostNumDataNodes(0);
// start 2 non ml node that will never get a job allocated. (but ml apis are accessable from this node) // start 2 non ml node that will never get a job allocated. (but ml apis are accessable from this node)
internalCluster().startNode(Settings.builder().put(MachineLearning.ALLOCATION_ENABLED.getKey(), false)); internalCluster().startNode(Settings.builder().put(MachineLearning.ML_ENABLED.getKey(), false));
internalCluster().startNode(Settings.builder().put(MachineLearning.ALLOCATION_ENABLED.getKey(), false)); internalCluster().startNode(Settings.builder().put(MachineLearning.ML_ENABLED.getKey(), false));
// start ml node // start ml node
if (randomBoolean()) { if (randomBoolean()) {
internalCluster().startNode(Settings.builder().put(MachineLearning.ALLOCATION_ENABLED.getKey(), true)); internalCluster().startNode(Settings.builder().put(MachineLearning.ML_ENABLED.getKey(), true));
} else { } else {
// the default is based on 'xpack.ml.enabled', which is enabled in base test class. // the default is based on 'xpack.ml.enabled', which is enabled in base test class.
internalCluster().startNode(); internalCluster().startNode();
@ -165,14 +165,13 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode()); DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
Map<String, String> expectedNodeAttr = new HashMap<>(); Map<String, String> expectedNodeAttr = new HashMap<>();
expectedNodeAttr.put(MachineLearning.ALLOCATION_ENABLED_ATTR, "true");
expectedNodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10"); expectedNodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10");
assertEquals(expectedNodeAttr, node.getAttributes()); assertEquals(expectedNodeAttr, node.getAttributes());
assertEquals(JobState.OPENED, task.getStatus()); assertEquals(JobState.OPENED, task.getStatus());
}); });
logger.info("stop the only running ml node"); logger.info("stop the only running ml node");
internalCluster().stopRandomNode(settings -> settings.getAsBoolean(MachineLearning.ALLOCATION_ENABLED.getKey(), true)); internalCluster().stopRandomNode(settings -> settings.getAsBoolean(MachineLearning.ML_ENABLED.getKey(), true));
ensureStableCluster(2); ensureStableCluster(2);
assertBusy(() -> { assertBusy(() -> {
// job should get and remain in a failed state: // job should get and remain in a failed state:
@ -186,7 +185,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
}); });
logger.info("start ml node"); logger.info("start ml node");
internalCluster().startNode(Settings.builder().put(MachineLearning.ALLOCATION_ENABLED.getKey(), true)); internalCluster().startNode(Settings.builder().put(MachineLearning.ML_ENABLED.getKey(), true));
ensureStableCluster(3); ensureStableCluster(3);
assertBusy(() -> { assertBusy(() -> {
// job should be re-opened: // job should be re-opened:
@ -197,7 +196,6 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
assertNotNull(task.getExecutorNode()); assertNotNull(task.getExecutorNode());
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode()); DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
Map<String, String> expectedNodeAttr = new HashMap<>(); Map<String, String> expectedNodeAttr = new HashMap<>();
expectedNodeAttr.put(MachineLearning.ALLOCATION_ENABLED_ATTR, "true");
expectedNodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10"); expectedNodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10");
assertEquals(expectedNodeAttr, node.getAttributes()); assertEquals(expectedNodeAttr, node.getAttributes());
assertEquals(JobState.OPENED, task.getStatus()); assertEquals(JobState.OPENED, task.getStatus());
@ -211,12 +209,12 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
// start non ml node, but that will hold the indices // start non ml node, but that will hold the indices
logger.info("Start non ml node:"); logger.info("Start non ml node:");
String nonMlNode = internalCluster().startNode(Settings.builder() String nonMlNode = internalCluster().startNode(Settings.builder()
.put(MachineLearning.ALLOCATION_ENABLED.getKey(), false)); .put(MachineLearning.ML_ENABLED.getKey(), false));
logger.info("Starting ml nodes"); logger.info("Starting ml nodes");
internalCluster().startNodes(numMlNodes, Settings.builder() internalCluster().startNodes(numMlNodes, Settings.builder()
.put("node.data", false) .put("node.data", false)
.put("node.master", false) .put("node.master", false)
.put(MachineLearning.ALLOCATION_ENABLED.getKey(), true).build()); .put(MachineLearning.ML_ENABLED.getKey(), true).build());
ensureStableCluster(numMlNodes + 1); ensureStableCluster(numMlNodes + 1);
int maxConcurrentJobAllocations = randomIntBetween(1, 4); int maxConcurrentJobAllocations = randomIntBetween(1, 4);
@ -273,7 +271,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
Runnable r = () -> { Runnable r = () -> {
try { try {
internalCluster() internalCluster()
.stopRandomNode(settings -> settings.getAsBoolean(MachineLearning.ALLOCATION_ENABLED.getKey(), false)); .stopRandomNode(settings -> settings.getAsBoolean(MachineLearning.ML_ENABLED.getKey(), false));
} catch (IOException e) { } catch (IOException e) {
logger.error("error stopping node", e); logger.error("error stopping node", e);
} }
@ -294,7 +292,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
internalCluster().startNodes(numMlNodes, Settings.builder() internalCluster().startNodes(numMlNodes, Settings.builder()
.put("node.data", false) .put("node.data", false)
.put("node.master", false) .put("node.master", false)
.put(MachineLearning.ALLOCATION_ENABLED.getKey(), true).build()); .put(MachineLearning.ML_ENABLED.getKey(), true).build());
ensureStableCluster(1 + numMlNodes); ensureStableCluster(1 + numMlNodes);
assertBusy(() -> { assertBusy(() -> {