diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 2d65d0552f3..3be27816ff1 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -190,6 +190,7 @@ public class MachineLearning extends Plugin implements ActionPlugin { @Override public Settings additionalSettings() { + Settings.Builder addtionalSettings = Settings.builder(); Boolean allocationEnabled = settings.getAsBoolean(ALLOCATION_ENABLED.getKey(), null); if (allocationEnabled != null) { if (enabled == false && allocationEnabled) { @@ -198,14 +199,15 @@ public class MachineLearning extends Plugin implements ActionPlugin { throw new IllegalArgumentException("Can't specify [" + ALLOCATION_ENABLED.getKey() + "] to true when [" + XPackSettings.MACHINE_LEARNING_ENABLED.getKey() + "] has been set to false"); } - return super.additionalSettings(); } else { // Make sure that we explicitly set allocation enabled node attribute if it has been specified in the node // settings. So we can always rely on it during assigning job tasks to nodes. - return Settings.builder() - .put(ALLOCATION_ENABLED.getKey(), ALLOCATION_ENABLED.get(settings)) - .build(); + addtionalSettings.put(ALLOCATION_ENABLED.getKey(), ALLOCATION_ENABLED.get(settings)); } + // Add max running job limit as node attribute so that we use this information assigning job tasks to nodes + addtionalSettings.put("node.attr." + AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), + AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings)); + return addtionalSettings.build(); } @Override diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index 934a0a9e044..ef74d2753a4 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.action; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; @@ -56,6 +57,8 @@ import java.util.Map; import java.util.Objects; import java.util.function.Consumer; +import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE; + public class OpenJobAction extends Action { public static final OpenJobAction INSTANCE = new OpenJobAction(); @@ -237,6 +240,7 @@ public class OpenJobAction extends Action { private final JobStateObserver observer; + private final ClusterService clusterService; private final AutodetectProcessManager autodetectProcessManager; @Inject @@ -246,12 +250,21 @@ public class OpenJobAction extends Action listener) { + // If we already know that we can't find an ml node because all ml nodes are running at capacity or + // simply because there are no ml nodes in the cluster then we fail quickly here: + ClusterState clusterState = clusterService.state(); + if (selectLeastLoadedMlNode(request.getJobId(), clusterState, logger) == null) { + throw new ElasticsearchStatusException("no nodes available to open job [" + request.getJobId() + "]", + RestStatus.TOO_MANY_REQUESTS); + } + ActionListener finalListener = ActionListener.wrap(response -> waitForJobStarted(request, response, listener), listener::onFailure); super.doExecute(request, finalListener); @@ -269,11 +282,7 @@ public class OpenJobAction extends Action { - Map nodeAttributes = node.getAttributes(); - String allocationEnabled = nodeAttributes.get(MachineLearning.ALLOCATION_ENABLED_ATTR); - return "true".equals(allocationEnabled); - }); + return selectLeastLoadedMlNode(request.getJobId(), clusterState, logger); } @Override @@ -337,4 +346,37 @@ public class OpenJobAction extends Action nodeAttributes = node.getAttributes(); + String allocationEnabled = nodeAttributes.get(MachineLearning.ALLOCATION_ENABLED_ATTR); + if ("true".equals(allocationEnabled) == false) { + logger.debug("Not opening job [{}] on node [{}], because this node isn't a ml node.", jobId, node); + continue; + } + + long numberOfOpenedJobs; + if (persistentTasksInProgress != null) { + numberOfOpenedJobs = persistentTasksInProgress.getNumberOfTasksOnNode(node.getId(), OpenJobAction.NAME); + } else { + numberOfOpenedJobs = 0; + } + long maxNumberOfOpenJobs = Long.parseLong(node.getAttributes().get(MAX_RUNNING_JOBS_PER_NODE.getKey())); + long available = maxNumberOfOpenJobs - numberOfOpenedJobs; + if (available == 0) { + logger.debug("Not opening job [{}] on node [{}], because this node is full. Number of opened jobs [{}], {} [{}]", + jobId, node, numberOfOpenedJobs, MAX_RUNNING_JOBS_PER_NODE.getKey(), maxNumberOfOpenJobs); + continue; + } + if (maxAvailable < available) { + maxAvailable = available; + leastLoadedNode = node; + } + } + return leastLoadedNode; + } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java index a90a484264d..49db4674dbf 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java @@ -8,10 +8,13 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; @@ -20,8 +23,11 @@ import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTa import java.net.InetAddress; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder; +import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE; public class OpenJobActionTests extends ESTestCase { @@ -92,4 +98,79 @@ public class OpenJobActionTests extends ESTestCase { assertEquals("[job_id] expected state [closed] or [failed], but got [" + jobState +"]", e.getMessage()); } + public void testSelectLeastLoadedMlNode() { + Map nodeAttr = new HashMap<>(); + nodeAttr.put(MachineLearning.ALLOCATION_ENABLED_ATTR, "true"); + nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10"); + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .add(new DiscoveryNode("_node_name3", "_node_id3", new TransportAddress(InetAddress.getLoopbackAddress(), 9302), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .build(); + + Map> taskMap = new HashMap<>(); + taskMap.put(0L, new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), "_node_id1")); + taskMap.put(1L, new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id2"), "_node_id1")); + taskMap.put(2L, new PersistentTaskInProgress<>(2L, OpenJobAction.NAME, new OpenJobAction.Request("job_id3"), "_node_id2")); + PersistentTasksInProgress tasks = new PersistentTasksInProgress(3L, taskMap); + + ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); + cs.nodes(nodes); + cs.putCustom(PersistentTasksInProgress.TYPE, tasks); + DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), logger); + assertEquals("_node_id3", result.getId()); + } + + public void testSelectLeastLoadedMlNode_maxCapacity() { + int numNodes = randomIntBetween(1, 10); + int maxRunningJobsPerNode = randomIntBetween(1, 100); + + Map nodeAttr = new HashMap<>(); + nodeAttr.put(MachineLearning.ALLOCATION_ENABLED_ATTR, "true"); + nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), String.valueOf(maxRunningJobsPerNode)); + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(); + Map> taskMap = new HashMap<>(); + for (int i = 0; i < numNodes; i++) { + String nodeId = "_node_id" + i; + TransportAddress address = new TransportAddress(InetAddress.getLoopbackAddress(), 9300 + i); + nodes.add(new DiscoveryNode("_node_name" + i, nodeId, address, nodeAttr, Collections.emptySet(), Version.CURRENT)); + for (int j = 0; j < maxRunningJobsPerNode; j++) { + long id = j + (maxRunningJobsPerNode * i); + taskMap.put(id, new PersistentTaskInProgress<>(id, OpenJobAction.NAME, new OpenJobAction.Request("job_id" + id), nodeId)); + } + } + PersistentTasksInProgress tasks = new PersistentTasksInProgress(numNodes * maxRunningJobsPerNode, taskMap); + + ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); + cs.nodes(nodes); + cs.putCustom(PersistentTasksInProgress.TYPE, tasks); + DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), logger); + assertNull(result); + } + + public void testSelectLeastLoadedMlNode_noMlNodes() { + Map nodeAttr = new HashMap<>(); + nodeAttr.put(MachineLearning.ALLOCATION_ENABLED_ATTR, "false"); + nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10"); + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .build(); + + PersistentTaskInProgress task = + new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), "_node_id1"); + PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)); + + ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); + cs.nodes(nodes); + cs.putCustom(PersistentTasksInProgress.TYPE, tasks); + DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), logger); + assertNull(result); + } + } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java index ec625c4962b..ed71da70bf6 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -23,6 +23,10 @@ import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE; public class BasicDistributedJobsIT extends BaseMlIntegTestCase { @@ -152,7 +156,10 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next(); DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode()); - assertEquals(Collections.singletonMap(MachineLearning.ALLOCATION_ENABLED_ATTR, "true"), node.getAttributes()); + Map expectedNodeAttr = new HashMap<>(); + expectedNodeAttr.put(MachineLearning.ALLOCATION_ENABLED_ATTR, "true"); + expectedNodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10"); + assertEquals(expectedNodeAttr, node.getAttributes()); assertEquals(JobState.OPENED, task.getStatus()); }); @@ -181,7 +188,10 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { assertNotNull(task.getExecutorNode()); DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode()); - assertEquals(Collections.singletonMap(MachineLearning.ALLOCATION_ENABLED_ATTR, "true"), node.getAttributes()); + Map expectedNodeAttr = new HashMap<>(); + expectedNodeAttr.put(MachineLearning.ALLOCATION_ENABLED_ATTR, "true"); + expectedNodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10"); + assertEquals(expectedNodeAttr, node.getAttributes()); assertEquals(JobState.OPENED, task.getStatus()); }); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java index ac2aee473ca..9684115fdd4 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java @@ -16,7 +16,6 @@ import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; -import org.elasticsearch.xpack.persistent.PersistentActionResponse; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; import java.util.concurrent.ExecutionException; @@ -45,25 +44,19 @@ public class TooManyJobsIT extends BaseMlIntegTestCase { assertTrue(putJobResponse.isAcknowledged()); expectThrows(ElasticsearchStatusException.class, () -> client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request("2")).actionGet()); - assertBusy(() -> { - GetJobsStatsAction.Response statsResponse = - client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request("2")).actionGet(); - assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.FAILED); - }); - // close second job: - client().execute(CloseJobAction.INSTANCE, new CloseJobAction.Request("2")).actionGet(); - // ensure that we remove persistent task for job 2, so that we stop the persistent task allocation loop: - assertBusy(() -> { - ClusterState state = client().admin().cluster().prepareState().get().getState(); - PersistentTasksInProgress tasks = state.custom(PersistentTasksInProgress.TYPE); - assertEquals(1, tasks.taskMap().size()); - // now just double check that the first job is still opened: - PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next(); - assertEquals(JobState.OPENED, task.getStatus()); - OpenJobAction.Request openJobRequest = (OpenJobAction.Request) task.getRequest(); - assertEquals("1", openJobRequest.getJobId()); - }); + // Ensure that the second job didn't even attempt to be opened and we still have 1 job open: + GetJobsStatsAction.Response statsResponse = + client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request("2")).actionGet(); + assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.CLOSED); + ClusterState state = client().admin().cluster().prepareState().get().getState(); + PersistentTasksInProgress tasks = state.custom(PersistentTasksInProgress.TYPE); + assertEquals(1, tasks.taskMap().size()); + // now just double check that the first job is still opened: + PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next(); + assertEquals(JobState.OPENED, task.getStatus()); + OpenJobAction.Request openJobRequest = (OpenJobAction.Request) task.getRequest(); + assertEquals("1", openJobRequest.getJobId()); } public void testSingleNode() throws Exception { @@ -83,9 +76,9 @@ public class TooManyJobsIT extends BaseMlIntegTestCase { PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get(); assertTrue(putJobResponse.isAcknowledged()); + OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId()); try { - OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId()); - PersistentActionResponse openJobResponse = client().execute(OpenJobAction.INSTANCE, openJobRequest).get(); + client().execute(OpenJobAction.INSTANCE, openJobRequest).get(); assertBusy(() -> { GetJobsStatsAction.Response statsResponse = client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet(); @@ -95,13 +88,14 @@ public class TooManyJobsIT extends BaseMlIntegTestCase { } catch (ExecutionException e) { Exception cause = (Exception) e.getCause(); assertEquals(ElasticsearchStatusException.class, cause.getClass()); - assertEquals("[" + i + "] expected state [" + JobState.OPENED + "] but got [" + JobState.FAILED +"]", cause.getMessage()); + assertEquals("no nodes available to open job [" + i + "]", cause.getMessage()); logger.info("good news everybody --> reached maximum number of allowed opened jobs, after trying to open the {}th job", i); // close the first job and check if the latest job gets opened: CloseJobAction.Request closeRequest = new CloseJobAction.Request("1"); CloseJobAction.Response closeResponse = client().execute(CloseJobAction.INSTANCE, closeRequest).actionGet(); assertTrue(closeResponse.isClosed()); + client().execute(OpenJobAction.INSTANCE, openJobRequest).get(); assertBusy(() -> { GetJobsStatsAction.Response statsResponse = client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();