[ml] When trying to find a node to open a job, take `max_running_jobs` into account instead of just trying and then failing.

Also, before even creating a persistent task for the job, check whether there are ml nodes available to run it; otherwise fail quickly.
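From the caller's point of view this fail-fast check surfaces as an `ElasticsearchStatusException` with status `TOO_MANY_REQUESTS` and the message "no nodes available to open job [<job id>]", thrown by `OpenJobAction.TransportAction.doExecute` in the diff below. A minimal sketch of handling it on the client side, assuming `client` is any `org.elasticsearch.client.Client` and "my_job" is a placeholder job id (illustrative only, not part of this commit):

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.ml.action.OpenJobAction;

try {
    // With this change the open request is rejected up front when no ml node has a free
    // slot, instead of the job being assigned anyway and failing later.
    client.execute(OpenJobAction.INSTANCE, new OpenJobAction.Request("my_job")).actionGet();
} catch (ElasticsearchStatusException e) {
    if (e.status() == RestStatus.TOO_MANY_REQUESTS) {
        // Every ml node is already at its max_running_jobs limit, or the cluster has no
        // ml nodes at all; close another job or add ml capacity before retrying.
    }
}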

relates elastic/x-pack-elasticsearch#545

Original commit: elastic/x-pack-elasticsearch@63fd72e56a
Martijn van Groningen, 2017-02-15 13:15:30 +01:00
parent e441e0dc4e
commit 54d57f6398
5 changed files with 162 additions and 33 deletions

MachineLearning.java

@@ -190,6 +190,7 @@ public class MachineLearning extends Plugin implements ActionPlugin {
@Override
public Settings additionalSettings() {
Settings.Builder additionalSettings = Settings.builder();
Boolean allocationEnabled = settings.getAsBoolean(ALLOCATION_ENABLED.getKey(), null);
if (allocationEnabled != null) {
if (enabled == false && allocationEnabled) {
@@ -198,14 +199,15 @@ public class MachineLearning extends Plugin implements ActionPlugin {
throw new IllegalArgumentException("Can't specify [" + ALLOCATION_ENABLED.getKey() + "] to true when [" +
XPackSettings.MACHINE_LEARNING_ENABLED.getKey() + "] has been set to false");
}
return super.additionalSettings();
} else {
// Make sure that we explicitly set allocation enabled node attribute if it has been specified in the node
// settings. So we can always rely on it during assigning job tasks to nodes.
return Settings.builder()
.put(ALLOCATION_ENABLED.getKey(), ALLOCATION_ENABLED.get(settings))
.build();
additionalSettings.put(ALLOCATION_ENABLED.getKey(), ALLOCATION_ENABLED.get(settings));
}
// Add the max running jobs limit as a node attribute so that we can use this information when assigning job tasks to nodes
additionalSettings.put("node.attr." + AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(),
AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings));
return additionalSettings.build();
}
@Override

OpenJobAction.java

@@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
@@ -56,6 +57,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE;
public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActionResponse, OpenJobAction.RequestBuilder> {
public static final OpenJobAction INSTANCE = new OpenJobAction();
@@ -237,6 +240,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
public static class TransportAction extends TransportPersistentAction<Request> {
private final JobStateObserver observer;
private final ClusterService clusterService;
private final AutodetectProcessManager autodetectProcessManager;
@Inject
@@ -246,12 +250,21 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
ClusterService clusterService, AutodetectProcessManager autodetectProcessManager) {
super(settings, OpenJobAction.NAME, false, threadPool, transportService, persistentActionService,
persistentActionRegistry, actionFilters, indexNameExpressionResolver, Request::new, ThreadPool.Names.MANAGEMENT);
this.clusterService = clusterService;
this.autodetectProcessManager = autodetectProcessManager;
this.observer = new JobStateObserver(threadPool, clusterService);
}
@Override
protected void doExecute(Request request, ActionListener<PersistentActionResponse> listener) {
// If we already know that we can't find an ml node, because all ml nodes are running at capacity or
// simply because there are no ml nodes in the cluster, then we fail quickly here:
ClusterState clusterState = clusterService.state();
if (selectLeastLoadedMlNode(request.getJobId(), clusterState, logger) == null) {
throw new ElasticsearchStatusException("no nodes available to open job [" + request.getJobId() + "]",
RestStatus.TOO_MANY_REQUESTS);
}
ActionListener<PersistentActionResponse> finalListener =
ActionListener.wrap(response -> waitForJobStarted(request, response, listener), listener::onFailure);
super.doExecute(request, finalListener);
@@ -269,11 +282,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
@Override
public DiscoveryNode executorNode(Request request, ClusterState clusterState) {
return selectLeastLoadedNode(clusterState, node -> {
Map<String, String> nodeAttributes = node.getAttributes();
String allocationEnabled = nodeAttributes.get(MachineLearning.ALLOCATION_ENABLED_ATTR);
return "true".equals(allocationEnabled);
});
return selectLeastLoadedMlNode(request.getJobId(), clusterState, logger);
}
@Override
@@ -337,4 +346,37 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
+ "] or [" + JobState.FAILED + "], but got [" + jobState +"]", RestStatus.CONFLICT);
}
}
static DiscoveryNode selectLeastLoadedMlNode(String jobId, ClusterState clusterState, Logger logger) {
long maxAvailable = Long.MIN_VALUE;
DiscoveryNode leastLoadedNode = null;
PersistentTasksInProgress persistentTasksInProgress = clusterState.custom(PersistentTasksInProgress.TYPE);
for (DiscoveryNode node : clusterState.getNodes()) {
Map<String, String> nodeAttributes = node.getAttributes();
String allocationEnabled = nodeAttributes.get(MachineLearning.ALLOCATION_ENABLED_ATTR);
if ("true".equals(allocationEnabled) == false) {
logger.debug("Not opening job [{}] on node [{}], because this node isn't a ml node.", jobId, node);
continue;
}
long numberOfOpenedJobs;
if (persistentTasksInProgress != null) {
numberOfOpenedJobs = persistentTasksInProgress.getNumberOfTasksOnNode(node.getId(), OpenJobAction.NAME);
} else {
numberOfOpenedJobs = 0;
}
long maxNumberOfOpenJobs = Long.parseLong(node.getAttributes().get(MAX_RUNNING_JOBS_PER_NODE.getKey()));
long available = maxNumberOfOpenJobs - numberOfOpenedJobs;
if (available == 0) {
logger.debug("Not opening job [{}] on node [{}], because this node is full. Number of opened jobs [{}], {} [{}]",
jobId, node, numberOfOpenedJobs, MAX_RUNNING_JOBS_PER_NODE.getKey(), maxNumberOfOpenJobs);
continue;
}
if (maxAvailable < available) {
maxAvailable = available;
leastLoadedNode = node;
}
}
return leastLoadedNode;
}
}

OpenJobActionTests.java

@@ -8,10 +8,13 @@ package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
@@ -20,8 +23,11 @@ import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTa
import java.net.InetAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder;
import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE;
public class OpenJobActionTests extends ESTestCase {
@@ -92,4 +98,79 @@ public class OpenJobActionTests extends ESTestCase {
assertEquals("[job_id] expected state [closed] or [failed], but got [" + jobState +"]", e.getMessage());
}
public void testSelectLeastLoadedMlNode() {
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.ALLOCATION_ENABLED_ATTR, "true");
nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10");
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
nodeAttr, Collections.emptySet(), Version.CURRENT))
.add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301),
nodeAttr, Collections.emptySet(), Version.CURRENT))
.add(new DiscoveryNode("_node_name3", "_node_id3", new TransportAddress(InetAddress.getLoopbackAddress(), 9302),
nodeAttr, Collections.emptySet(), Version.CURRENT))
.build();
Map<Long, PersistentTaskInProgress<?>> taskMap = new HashMap<>();
taskMap.put(0L, new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), "_node_id1"));
taskMap.put(1L, new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id2"), "_node_id1"));
taskMap.put(2L, new PersistentTaskInProgress<>(2L, OpenJobAction.NAME, new OpenJobAction.Request("job_id3"), "_node_id2"));
PersistentTasksInProgress tasks = new PersistentTasksInProgress(3L, taskMap);
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
cs.nodes(nodes);
cs.putCustom(PersistentTasksInProgress.TYPE, tasks);
DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), logger);
assertEquals("_node_id3", result.getId());
}
public void testSelectLeastLoadedMlNode_maxCapacity() {
int numNodes = randomIntBetween(1, 10);
int maxRunningJobsPerNode = randomIntBetween(1, 100);
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.ALLOCATION_ENABLED_ATTR, "true");
nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), String.valueOf(maxRunningJobsPerNode));
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
Map<Long, PersistentTaskInProgress<?>> taskMap = new HashMap<>();
for (int i = 0; i < numNodes; i++) {
String nodeId = "_node_id" + i;
TransportAddress address = new TransportAddress(InetAddress.getLoopbackAddress(), 9300 + i);
nodes.add(new DiscoveryNode("_node_name" + i, nodeId, address, nodeAttr, Collections.emptySet(), Version.CURRENT));
for (int j = 0; j < maxRunningJobsPerNode; j++) {
long id = j + (maxRunningJobsPerNode * i);
taskMap.put(id, new PersistentTaskInProgress<>(id, OpenJobAction.NAME, new OpenJobAction.Request("job_id" + id), nodeId));
}
}
PersistentTasksInProgress tasks = new PersistentTasksInProgress(numNodes * maxRunningJobsPerNode, taskMap);
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
cs.nodes(nodes);
cs.putCustom(PersistentTasksInProgress.TYPE, tasks);
DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), logger);
assertNull(result);
}
public void testSelectLeastLoadedMlNode_noMlNodes() {
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.ALLOCATION_ENABLED_ATTR, "false");
nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10");
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
nodeAttr, Collections.emptySet(), Version.CURRENT))
.add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301),
nodeAttr, Collections.emptySet(), Version.CURRENT))
.build();
PersistentTaskInProgress<OpenJobAction.Request> task =
new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), "_node_id1");
PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task));
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
cs.nodes(nodes);
cs.putCustom(PersistentTasksInProgress.TYPE, tasks);
DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), logger);
assertNull(result);
}
}

BasicDistributedJobsIT.java

@@ -23,6 +23,10 @@ import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE;
public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
@@ -152,7 +156,10 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next();
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
assertEquals(Collections.singletonMap(MachineLearning.ALLOCATION_ENABLED_ATTR, "true"), node.getAttributes());
Map<String, String> expectedNodeAttr = new HashMap<>();
expectedNodeAttr.put(MachineLearning.ALLOCATION_ENABLED_ATTR, "true");
expectedNodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10");
assertEquals(expectedNodeAttr, node.getAttributes());
assertEquals(JobState.OPENED, task.getStatus());
});
@@ -181,7 +188,10 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
assertNotNull(task.getExecutorNode());
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
assertEquals(Collections.singletonMap(MachineLearning.ALLOCATION_ENABLED_ATTR, "true"), node.getAttributes());
Map<String, String> expectedNodeAttr = new HashMap<>();
expectedNodeAttr.put(MachineLearning.ALLOCATION_ENABLED_ATTR, "true");
expectedNodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10");
assertEquals(expectedNodeAttr, node.getAttributes());
assertEquals(JobState.OPENED, task.getStatus());
});
}

TooManyJobsIT.java

@@ -16,7 +16,6 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentActionResponse;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import java.util.concurrent.ExecutionException;
@@ -45,25 +44,19 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
assertTrue(putJobResponse.isAcknowledged());
expectThrows(ElasticsearchStatusException.class,
() -> client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request("2")).actionGet());
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request("2")).actionGet();
assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.FAILED);
});
// close second job:
client().execute(CloseJobAction.INSTANCE, new CloseJobAction.Request("2")).actionGet();
// ensure that we remove persistent task for job 2, so that we stop the persistent task allocation loop:
assertBusy(() -> {
ClusterState state = client().admin().cluster().prepareState().get().getState();
PersistentTasksInProgress tasks = state.custom(PersistentTasksInProgress.TYPE);
assertEquals(1, tasks.taskMap().size());
// now just double check that the first job is still opened:
PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next();
assertEquals(JobState.OPENED, task.getStatus());
OpenJobAction.Request openJobRequest = (OpenJobAction.Request) task.getRequest();
assertEquals("1", openJobRequest.getJobId());
});
// Ensure that the second job didn't even attempt to be opened and we still have 1 job open:
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request("2")).actionGet();
assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.CLOSED);
ClusterState state = client().admin().cluster().prepareState().get().getState();
PersistentTasksInProgress tasks = state.custom(PersistentTasksInProgress.TYPE);
assertEquals(1, tasks.taskMap().size());
// now just double check that the first job is still opened:
PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next();
assertEquals(JobState.OPENED, task.getStatus());
OpenJobAction.Request openJobRequest = (OpenJobAction.Request) task.getRequest();
assertEquals("1", openJobRequest.getJobId());
}
public void testSingleNode() throws Exception {
@@ -83,9 +76,9 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
assertTrue(putJobResponse.isAcknowledged());
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());
try {
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());
PersistentActionResponse openJobResponse = client().execute(OpenJobAction.INSTANCE, openJobRequest).get();
client().execute(OpenJobAction.INSTANCE, openJobRequest).get();
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
@@ -95,13 +88,14 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
} catch (ExecutionException e) {
Exception cause = (Exception) e.getCause();
assertEquals(ElasticsearchStatusException.class, cause.getClass());
assertEquals("[" + i + "] expected state [" + JobState.OPENED + "] but got [" + JobState.FAILED +"]", cause.getMessage());
assertEquals("no nodes available to open job [" + i + "]", cause.getMessage());
logger.info("good news everybody --> reached maximum number of allowed opened jobs, after trying to open the {}th job", i);
// close the first job and check if the latest job gets opened:
CloseJobAction.Request closeRequest = new CloseJobAction.Request("1");
CloseJobAction.Response closeResponse = client().execute(CloseJobAction.INSTANCE, closeRequest).actionGet();
assertTrue(closeResponse.isClosed());
client().execute(OpenJobAction.INSTANCE, openJobRequest).get();
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();