[ml] Added allocation enabled node attribute setting

This node attribute setting defaults to true when ml is enabled and jobs are only opened on nodes where the allocation enabled node attribute is set to true.

Original commit: elastic/x-pack-elasticsearch@fd3f1b3058
This commit is contained in:
Martijn van Groningen 2017-02-14 12:39:12 +01:00
parent 6183015639
commit 6e411c6118
3 changed files with 109 additions and 2 deletions

View File

@ -155,6 +155,10 @@ public class MachineLearning extends Plugin implements ActionPlugin {
public static final Setting<Boolean> USE_NATIVE_PROCESS_OPTION = Setting.boolSetting("useNativeProcess", true, Property.NodeScope,
Property.Deprecated);
public static final String ALLOCATION_ENABLED_ATTR = "xpack.ml.allocation_enabled";
public static final Setting<Boolean> ALLOCATION_ENABLED = Setting.boolSetting("node.attr." + ALLOCATION_ENABLED_ATTR,
XPackSettings.MACHINE_LEARNING_ENABLED, Setting.Property.NodeScope);
private final Settings settings;
private final Environment env;
private boolean enabled;
@ -175,6 +179,7 @@ public class MachineLearning extends Plugin implements ActionPlugin {
public List<Setting<?>> getSettings() {
return Collections.unmodifiableList(
Arrays.asList(USE_NATIVE_PROCESS_OPTION,
ALLOCATION_ENABLED,
ProcessCtrl.DONT_PERSIST_MODEL_STATE_SETTING,
ProcessCtrl.MAX_ANOMALY_RECORDS_SETTING,
DataCountsReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING,
@ -182,6 +187,26 @@ public class MachineLearning extends Plugin implements ActionPlugin {
AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE));
}
@Override
public Settings additionalSettings() {
Boolean allocationEnabled = settings.getAsBoolean(ALLOCATION_ENABLED.getKey(), null);
if (allocationEnabled != null) {
if (enabled == false && allocationEnabled) {
// if the ml plugin has been disabled the ml allocation enabled node attribute shouldn't be set,
// otherwise other nodes will allocate jobs to this node and that will fail, because ml hasn't been loaded.
throw new IllegalArgumentException("Can't specify [" + ALLOCATION_ENABLED.getKey() + "] to true when [" +
XPackSettings.MACHINE_LEARNING_ENABLED.getKey() + "] has been set to false");
}
return super.additionalSettings();
} else {
// Make sure that we explicitly set allocation enabled node attribute if it has been specified in the node
// settings. So we can always rely on it during assigning job tasks to nodes.
return Settings.builder()
.put(ALLOCATION_ENABLED.getKey(), ALLOCATION_ENABLED.get(settings))
.build();
}
}
@Override
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return Arrays.asList(

View File

@ -15,6 +15,7 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
@ -34,6 +35,7 @@ import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
@ -50,6 +52,7 @@ import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTa
import org.elasticsearch.xpack.persistent.TransportPersistentAction;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
@ -264,6 +267,15 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
});
}
@Override
public DiscoveryNode executorNode(Request request, ClusterState clusterState) {
return selectLeastLoadedNode(clusterState, node -> {
Map<String, String> nodeAttributes = node.getAttributes();
String allocationEnabled = nodeAttributes.get(MachineLearning.ALLOCATION_ENABLED_ATTR);
return "true".equals(allocationEnabled);
});
}
@Override
public void validate(Request request, ClusterState clusterState) {
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
@ -308,8 +320,12 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
}
PersistentTaskInProgress<?> task = MlMetadata.getJobTask(jobId, tasks);
JobState jobState = MlMetadata.getJobState(jobId, tasks);
if (task != null && task.getExecutorNode() != null && jobState == JobState.OPENED) {
if (nodes.nodeExists(task.getExecutorNode()) == false) {
if (task != null && jobState == JobState.OPENED) {
if (task.getExecutorNode() == null) {
// We can skip the job state check below, because the task got unassigned after we went into
// opened state on a node that disappeared and we didn't have the opportunity to set the status to failed
return;
} else if (nodes.nodeExists(task.getExecutorNode()) == false) {
// The state is open and the node were running on no longer exists.
// We can skip the job state check below, because when the node
// disappeared we didn't have time to set the status to failed.

View File

@ -5,6 +5,10 @@
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.ml.action.OpenJobAction;
@ -16,6 +20,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import java.util.Collections;
@ -120,4 +125,65 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
cleanupWorkaround(2);
}
public void testDedicatedMlNode() throws Exception {
internalCluster().ensureAtMostNumDataNodes(0);
// start 2 non ml node that will never get a job allocated. (but ml apis are accessable from this node)
internalCluster().startNode(Settings.builder().put(MachineLearning.ALLOCATION_ENABLED.getKey(), false));
internalCluster().startNode(Settings.builder().put(MachineLearning.ALLOCATION_ENABLED.getKey(), false));
// start ml node
if (randomBoolean()) {
internalCluster().startNode(Settings.builder().put(MachineLearning.ALLOCATION_ENABLED.getKey(), true));
} else {
// the default is based on 'xpack.ml.enabled', which is enabled in base test class.
internalCluster().startNode();
}
ensureStableCluster(3);
Job.Builder job = createJob("job_id");
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build(true, job.getId()));
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
assertTrue(putJobResponse.isAcknowledged());
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());
client().execute(OpenJobAction.INSTANCE, openJobRequest).get();
assertBusy(() -> {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
PersistentTasksInProgress tasks = clusterState.custom(PersistentTasksInProgress.TYPE);
PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next();
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
assertEquals(Collections.singletonMap(MachineLearning.ALLOCATION_ENABLED_ATTR, "true"), node.getAttributes());
assertEquals(JobState.OPENED, task.getStatus());
});
// stop the only running ml node
internalCluster().stopRandomNode(settings -> settings.getAsBoolean(MachineLearning.ALLOCATION_ENABLED.getKey(), true));
ensureStableCluster(2);
assertBusy(() -> {
// job should get and remain in a failed state:
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
PersistentTasksInProgress tasks = clusterState.custom(PersistentTasksInProgress.TYPE);
PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next();
assertNull(task.getExecutorNode());
// The status remains to be opened as from ml we didn't had the chance to set the status to failed:
assertEquals(JobState.OPENED, task.getStatus());
});
// start ml node
internalCluster().startNode(Settings.builder().put(MachineLearning.ALLOCATION_ENABLED.getKey(), true));
ensureStableCluster(3);
assertBusy(() -> {
// job should be re-opened:
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
PersistentTasksInProgress tasks = clusterState.custom(PersistentTasksInProgress.TYPE);
PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next();
assertNotNull(task.getExecutorNode());
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
assertEquals(Collections.singletonMap(MachineLearning.ALLOCATION_ENABLED_ATTR, "true"), node.getAttributes());
assertEquals(JobState.OPENED, task.getStatus());
});
}
}