[ML] Add max open jobs per node as a node attribute (elastic/x-pack-elasticsearch#2203)

This commit copies the value of the max_running_jobs setting from
elasticsearch.yml into a node attribute called ml.max_open_jobs.
Previously there was an assumption that max_running_jobs would be the
same for all nodes in the cluster.  However, during a rolling cluster
restart in which the value of the setting is being changed this clearly
cannot be the case, and the assumption would cause unexpected/unpredictable
limits to be used during the period when different nodes have different
settings.

For backwards compatibility, if another node in the cluster has not added
its setting for max_running_jobs to the cluster state (because it is
running an older version) then the old (flawed but better than nothing)
approach is applied, i.e. the remote node's setting for max_running_jobs
is assumed to be equal to that of the node deciding the job allocation.

Relates elastic/x-pack-elasticsearch#2185

Original commit: elastic/x-pack-elasticsearch@1e62b89183
This commit is contained in:
David Roberts 2017-08-08 16:16:27 +01:00 committed by GitHub
parent 2864078da2
commit 22da5cf89e
3 changed files with 33 additions and 7 deletions

View File

@ -155,8 +155,9 @@ public class MachineLearning implements ActionPlugin {
public static final Setting<Boolean> AUTODETECT_PROCESS =
Setting.boolSetting("xpack.ml.autodetect_process", true, Property.NodeScope);
public static final Setting<Boolean> ML_ENABLED =
Setting.boolSetting("node.ml", XPackSettings.MACHINE_LEARNING_ENABLED, Setting.Property.NodeScope);
Setting.boolSetting("node.ml", XPackSettings.MACHINE_LEARNING_ENABLED, Property.NodeScope);
public static final String ML_ENABLED_NODE_ATTR = "ml.enabled";
public static final String MAX_OPEN_JOBS_NODE_ATTR = "ml.max_open_jobs";
public static final Setting<Integer> CONCURRENT_JOB_ALLOCATIONS =
Setting.intSetting("xpack.ml.node_concurrent_job_allocations", 2, 0, Property.Dynamic, Property.NodeScope);
@ -200,7 +201,10 @@ public class MachineLearning implements ActionPlugin {
Settings.Builder additionalSettings = Settings.builder();
Boolean allocationEnabled = ML_ENABLED.get(settings);
if (allocationEnabled != null && allocationEnabled) {
// TODO: the simple true/false flag will not be required once all supported versions have the number - consider removing in 7.0
additionalSettings.put("node.attr." + ML_ENABLED_NODE_ATTR, "true");
additionalSettings.put("node.attr." + MAX_OPEN_JOBS_NODE_ATTR,
AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings));
}
return additionalSettings.build();
}

View File

@ -42,7 +42,6 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
@ -566,14 +565,20 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
private final AutodetectProcessManager autodetectProcessManager;
private final int maxNumberOfOpenJobs;
/**
* The maximum number of open jobs can be different on each node. However, nodes on older versions
* won't add their setting to the cluster state, so for backwards compatibility with these nodes we
* assume the older node's setting is the same as that of the node running this code.
* TODO: remove this member in 7.0
*/
private final int fallbackMaxNumberOfOpenJobs;
private volatile int maxConcurrentJobAllocations;
public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterService,
AutodetectProcessManager autodetectProcessManager) {
super(settings, TASK_NAME, MachineLearning.UTILITY_THREAD_POOL_NAME);
this.autodetectProcessManager = autodetectProcessManager;
this.maxNumberOfOpenJobs = AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings);
this.fallbackMaxNumberOfOpenJobs = AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings);
this.maxConcurrentJobAllocations = MachineLearning.CONCURRENT_JOB_ALLOCATIONS.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, this::setMaxConcurrentJobAllocations);
@ -581,7 +586,8 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
@Override
public Assignment getAssignment(JobParams params, ClusterState clusterState) {
return selectLeastLoadedMlNode(params.getJobId(), clusterState, maxConcurrentJobAllocations, maxNumberOfOpenJobs, logger);
return selectLeastLoadedMlNode(params.getJobId(), clusterState, maxConcurrentJobAllocations, fallbackMaxNumberOfOpenJobs,
logger);
}
@Override
@ -591,7 +597,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
OpenJobAction.validate(params.getJobId(), mlMetadata);
Assignment assignment = selectLeastLoadedMlNode(params.getJobId(), clusterState, maxConcurrentJobAllocations,
maxNumberOfOpenJobs, logger);
fallbackMaxNumberOfOpenJobs, logger);
if (assignment.getExecutorNode() == null) {
String msg = "Could not open job because no suitable nodes were found, allocation explanation ["
+ assignment.getExplanation() + "]";
@ -649,7 +655,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
}
static Assignment selectLeastLoadedMlNode(String jobId, ClusterState clusterState, int maxConcurrentJobAllocations,
long maxNumberOfOpenJobs, Logger logger) {
int fallbackMaxNumberOfOpenJobs, Logger logger) {
List<String> unavailableIndices = verifyIndicesPrimaryShardsAreActive(jobId, clusterState);
if (unavailableIndices.size() != 0) {
String reason = "Not opening job [" + jobId + "], because not all primary shards are active for the following indices [" +
@ -716,6 +722,20 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
continue;
}
String maxNumberOfOpenJobsStr = nodeAttributes.get(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR);
int maxNumberOfOpenJobs = fallbackMaxNumberOfOpenJobs;
// TODO: remove leniency and reject the node if the attribute is null in 7.0
if (maxNumberOfOpenJobsStr != null) {
try {
maxNumberOfOpenJobs = Integer.parseInt(maxNumberOfOpenJobsStr);
} catch (NumberFormatException e) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because " +
MachineLearning.MAX_OPEN_JOBS_NODE_ATTR + " attribute [" + maxNumberOfOpenJobsStr + "] is not an integer";
logger.trace(reason);
reasons.add(reason);
continue;
}
}
long available = maxNumberOfOpenJobs - numberOfAssignedJobs;
if (available == 0) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node is full. " +

View File

@ -215,6 +215,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
Map<String, String> expectedNodeAttr = new HashMap<>();
expectedNodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
expectedNodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10");
assertEquals(expectedNodeAttr, node.getAttributes());
JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus();
assertNotNull(jobTaskStatus);
@ -402,6 +403,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
Map<String, String> expectedNodeAttr = new HashMap<>();
expectedNodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
expectedNodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10");
assertEquals(expectedNodeAttr, node.getAttributes());
JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus();