[ML] Made `max_running_jobs` a cluster wide setting and

remove `node.attr.max_running_jobs` node attribute and use `node.attr.ml.enabled` node attribute instead to know whether a node is a ml node or not.

Also renamed `max_running_jobs` setting to `xpack.ml.max_running_jobs`.

Original commit: elastic/x-pack-elasticsearch@798732886b
This commit is contained in:
Martijn van Groningen 2017-04-13 10:46:24 +02:00
parent 2385619158
commit 911cfc9623
8 changed files with 31 additions and 28 deletions

View File

@ -124,13 +124,13 @@ import org.elasticsearch.xpack.ml.rest.validate.RestValidateDetectorAction;
import org.elasticsearch.xpack.ml.rest.validate.RestValidateJobConfigAction;
import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction;
import org.elasticsearch.xpack.persistent.PersistentTaskParams;
import org.elasticsearch.xpack.persistent.StartPersistentTaskAction;
import org.elasticsearch.xpack.persistent.PersistentTasksClusterService;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksExecutorRegistry;
import org.elasticsearch.xpack.persistent.PersistentTasksNodeService;
import org.elasticsearch.xpack.persistent.PersistentTasksService;
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
import org.elasticsearch.xpack.persistent.StartPersistentTaskAction;
import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction;
import org.elasticsearch.xpack.security.InternalClient;
@ -155,6 +155,7 @@ public class MachineLearning implements ActionPlugin {
Setting.boolSetting("xpack.ml.autodetect_process", true, Property.NodeScope);
public static final Setting<Boolean> ML_ENABLED =
Setting.boolSetting("node.ml", XPackSettings.MACHINE_LEARNING_ENABLED, Setting.Property.NodeScope);
public static final String ML_ENABLED_NODE_ATTR = "ml.enabled";
public static final Setting<Integer> CONCURRENT_JOB_ALLOCATIONS =
Setting.intSetting("xpack.ml.node_concurrent_job_allocations", 2, 0, Property.Dynamic, Property.NodeScope);
@ -198,9 +199,7 @@ public class MachineLearning implements ActionPlugin {
Settings.Builder additionalSettings = Settings.builder();
Boolean allocationEnabled = ML_ENABLED.get(settings);
if (allocationEnabled != null && allocationEnabled) {
// Copy max_running_jobs setting to node attribute, so that we use this information when assigning job tasks to nodes:
additionalSettings.put("node.attr." + AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(),
AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings));
additionalSettings.put("node.attr." + ML_ENABLED_NODE_ATTR, "true");
}
return additionalSettings.build();
}

View File

@ -378,6 +378,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
private final AutodetectProcessManager autodetectProcessManager;
private final XPackLicenseState licenseState;
private final int maxNumberOfOpenJobs;
private volatile int maxConcurrentJobAllocations;
public OpenJobPersistentTasksExecutor(Settings settings, XPackLicenseState licenseState,
@ -385,6 +386,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
super(settings, NAME, ThreadPool.Names.MANAGEMENT);
this.licenseState = licenseState;
this.autodetectProcessManager = autodetectProcessManager;
this.maxNumberOfOpenJobs = AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings);
this.maxConcurrentJobAllocations = MachineLearning.CONCURRENT_JOB_ALLOCATIONS.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, this::setMaxConcurrentJobAllocations);
@ -392,7 +394,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
@Override
public Assignment getAssignment(Request request, ClusterState clusterState) {
return selectLeastLoadedMlNode(request.getJobId(), clusterState, maxConcurrentJobAllocations, logger);
return selectLeastLoadedMlNode(request.getJobId(), clusterState, maxConcurrentJobAllocations, maxNumberOfOpenJobs, logger);
}
@Override
@ -403,7 +405,8 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
OpenJobAction.validate(request.getJobId(), mlMetadata, tasks);
Assignment assignment = selectLeastLoadedMlNode(request.getJobId(), clusterState, maxConcurrentJobAllocations, logger);
Assignment assignment = selectLeastLoadedMlNode(request.getJobId(), clusterState, maxConcurrentJobAllocations,
maxNumberOfOpenJobs, logger);
if (assignment.getExecutorNode() == null) {
String msg = "Could not open job because no suitable nodes were found, allocation explanation ["
+ assignment.getExplanation() + "]";
@ -456,7 +459,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
}
static Assignment selectLeastLoadedMlNode(String jobId, ClusterState clusterState, int maxConcurrentJobAllocations,
Logger logger) {
long maxNumberOfOpenJobs, Logger logger) {
List<String> unavailableIndices = verifyIndicesPrimaryShardsAreActive(jobId, clusterState);
if (unavailableIndices.size() != 0) {
String reason = "Not opening job [" + jobId + "], because not all primary shards are active for the following indices [" +
@ -471,8 +474,8 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
PersistentTasksCustomMetaData persistentTasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
for (DiscoveryNode node : clusterState.getNodes()) {
Map<String, String> nodeAttributes = node.getAttributes();
String maxNumberOfOpenJobsStr = nodeAttributes.get(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey());
if (maxNumberOfOpenJobsStr == null) {
String enabled = nodeAttributes.get(MachineLearning.ML_ENABLED_NODE_ATTR);
if (Boolean.valueOf(enabled) == false) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node isn't a ml node.";
logger.trace(reason);
reasons.add(reason);
@ -504,7 +507,6 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
continue;
}
long maxNumberOfOpenJobs = Long.parseLong(maxNumberOfOpenJobsStr);
long available = maxNumberOfOpenJobs - numberOfAssignedJobs;
if (available == 0) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node is full. " +

View File

@ -77,6 +77,10 @@ public class AutodetectProcessManager extends AbstractComponent {
// and if we know that then we can prior to assigning a job to a node fail based on the
// available resources on that node: https://github.com/elastic/x-pack-elasticsearch/issues/546
// Note: on small instances on cloud, this setting will be set to: 1
// WARNING: This setting cannot be made DYNAMIC, because it is tied to several threadpools
// and a threadpool's size can't be changed at runtime.
// See MachineLearning#getExecutorBuilders(...)
public static final Setting<Integer> MAX_RUNNING_JOBS_PER_NODE =
Setting.intSetting("max_running_jobs", 10, 1, 512, Setting.Property.NodeScope);

View File

@ -56,7 +56,7 @@ public class Auditor {
@Override
public void onFailure(Exception e) {
LOGGER.error(new ParameterizedMessage("Error writing {}", new Object[]{type}, e));
LOGGER.debug(new ParameterizedMessage("Error writing {}", new Object[]{type}, e));
}
});
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
@ -44,7 +45,6 @@ import java.util.List;
import java.util.Map;
import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder;
import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE;
public class OpenJobActionTests extends ESTestCase {
@ -79,7 +79,7 @@ public class OpenJobActionTests extends ESTestCase {
public void testSelectLeastLoadedMlNode() {
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10");
nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
nodeAttr, Collections.emptySet(), Version.CURRENT))
@ -103,7 +103,7 @@ public class OpenJobActionTests extends ESTestCase {
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
cs.metaData(metaData);
cs.routingTable(routingTable.build());
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), 2, logger);
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), 2, 10, logger);
assertEquals("_node_id3", result.getExecutorNode());
}
@ -112,7 +112,7 @@ public class OpenJobActionTests extends ESTestCase {
int maxRunningJobsPerNode = randomIntBetween(1, 100);
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), String.valueOf(maxRunningJobsPerNode));
nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
for (int i = 0; i < numNodes; i++) {
@ -134,7 +134,7 @@ public class OpenJobActionTests extends ESTestCase {
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
cs.metaData(metaData);
cs.routingTable(routingTable.build());
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, logger);
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, maxRunningJobsPerNode, logger);
assertNull(result.getExecutorNode());
assertTrue(result.getExplanation().contains("because this node is full. Number of opened jobs [" + maxRunningJobsPerNode
+ "], max_running_jobs [" + maxRunningJobsPerNode + "]"));
@ -160,14 +160,14 @@ public class OpenJobActionTests extends ESTestCase {
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
cs.metaData(metaData);
cs.routingTable(routingTable.build());
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, logger);
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, 10, logger);
assertTrue(result.getExplanation().contains("because this node isn't a ml node"));
assertNull(result.getExecutorNode());
}
public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() {
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10");
nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
nodeAttr, Collections.emptySet(), Version.CURRENT))
@ -195,7 +195,7 @@ public class OpenJobActionTests extends ESTestCase {
csBuilder.metaData(metaData);
ClusterState cs = csBuilder.build();
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id6", cs, 2, logger);
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id6", cs, 2, 10, logger);
assertEquals("_node_id3", result.getExecutorNode());
tasksBuilder = PersistentTasksCustomMetaData.builder(tasks);
@ -205,7 +205,7 @@ public class OpenJobActionTests extends ESTestCase {
csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
cs = csBuilder.build();
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, logger);
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, logger);
assertNull("no node selected, because OPENING state", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
@ -216,7 +216,7 @@ public class OpenJobActionTests extends ESTestCase {
csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
cs = csBuilder.build();
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, logger);
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, logger);
assertNull("no node selected, because stale task", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
@ -227,7 +227,7 @@ public class OpenJobActionTests extends ESTestCase {
csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
cs = csBuilder.build();
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, logger);
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, logger);
assertNull("no node selected, because null state", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
}

View File

@ -41,8 +41,6 @@ import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE;
public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
public void testFailOverBasics() throws Exception {
@ -173,7 +171,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
Map<String, String> expectedNodeAttr = new HashMap<>();
expectedNodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10");
expectedNodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
assertEquals(expectedNodeAttr, node.getAttributes());
JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus();
assertNotNull(jobTaskStatus);
@ -379,7 +377,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
assertFalse(task.needsReassignment(clusterState.nodes()));
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
Map<String, String> expectedNodeAttr = new HashMap<>();
expectedNodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10");
expectedNodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
assertEquals(expectedNodeAttr, node.getAttributes());
JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus();

View File

@ -111,7 +111,7 @@ setup:
- match: { datafeeds.0.state: "started"}
- is_true: datafeeds.0.node.name
- is_true: datafeeds.0.node.transport_address
- match: { datafeeds.0.node.attributes.max_running_jobs: "10"}
- match: { datafeeds.0.node.attributes.ml\.enabled: "true"}
---
"Test implicit get all datafeed stats given started datafeeds":

View File

@ -79,7 +79,7 @@ setup:
- match: { jobs.0.state: opened }
- is_true: jobs.0.node.name
- is_true: jobs.0.node.transport_address
- match: { jobs.0.node.attributes.max_running_jobs: "10"}
- match: { jobs.0.node.attributes.ml\.enabled: "true"}
- is_true: jobs.0.open_time
---