From 4b1ed5b453155472ac7174de5b56cedb0c15f7e0 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 17 Feb 2017 14:16:33 +0100 Subject: [PATCH] [ML] Add a setting (`xpack.ml.node_concurrent_job_allocations`) to control the maximum concurrent job allocations. A job allocation is either a job task in OPENING state or a job task that has been assigned to an executor node, but the executor node hasn't had the opportunity to set the job task status to OPENING. In order to keep track of restarted tasks, `allocationIdOnLastStatusUpdate` field was added to `PersistentTaskInProgress` class. This will allow persistent task implementors to detect whether the executor node has changed or has been unset since the last status update has occurred. Original commit: elastic/x-pack-elasticsearch@b7b85a827422f7c30f9200738c6f14c6ef83c823 --- .../xpack/ml/MachineLearning.java | 7 +- .../xpack/ml/action/CloseJobAction.java | 8 +- .../xpack/ml/action/OpenJobAction.java | 75 ++++++++--- .../xpack/ml/job/metadata/MlMetadata.java | 10 +- .../xpack/persistent/PersistentTask.java | 1 + .../persistent/PersistentTasksInProgress.java | 43 +++++-- .../xpack/ml/action/CloseJobActionTests.java | 10 +- .../xpack/ml/action/OpenJobActionTests.java | 82 ++++++++++-- .../ml/action/StartDatafeedActionTests.java | 19 +-- .../integration/BasicDistributedJobsIT.java | 119 +++++++++++++++++- .../ml/job/metadata/MlMetadataTests.java | 7 +- .../AutodetectProcessManagerTests.java | 3 +- .../xpack/ml/support/BaseMlIntegTestCase.java | 2 +- 13 files changed, 310 insertions(+), 76 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 33c0a9a80ab..779f429d9b0 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -13,7 +13,6 @@ import org.elasticsearch.cluster.NamedDiff; import 
org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.inject.Module; @@ -24,7 +23,6 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.plugins.ActionPlugin; @@ -122,6 +120,7 @@ import org.elasticsearch.xpack.ml.rest.validate.RestValidateDetectorAction; import org.elasticsearch.xpack.ml.rest.validate.RestValidateJobConfigAction; import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction; +import org.elasticsearch.xpack.persistent.CreatePersistentTaskAction; import org.elasticsearch.xpack.persistent.PersistentActionCoordinator; import org.elasticsearch.xpack.persistent.PersistentActionRegistry; import org.elasticsearch.xpack.persistent.PersistentActionRequest; @@ -129,7 +128,6 @@ import org.elasticsearch.xpack.persistent.PersistentActionService; import org.elasticsearch.xpack.persistent.PersistentTaskClusterService; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction; -import org.elasticsearch.xpack.persistent.CreatePersistentTaskAction; import org.elasticsearch.xpack.persistent.StartPersistentTaskAction; import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction; @@ -157,6 +155,8 @@ public class MachineLearning extends Plugin implements ActionPlugin { 
public static final String ALLOCATION_ENABLED_ATTR = "xpack.ml.allocation_enabled"; public static final Setting ALLOCATION_ENABLED = Setting.boolSetting("node.attr." + ALLOCATION_ENABLED_ATTR, XPackSettings.MACHINE_LEARNING_ENABLED, Setting.Property.NodeScope); + public static final Setting CONCURRENT_JOB_ALLOCATIONS = + Setting.intSetting("xpack.ml.node_concurrent_job_allocations", 2, 0, Property.Dynamic, Property.NodeScope); private final Settings settings; private final Environment env; @@ -179,6 +179,7 @@ public class MachineLearning extends Plugin implements ActionPlugin { return Collections.unmodifiableList( Arrays.asList(USE_NATIVE_PROCESS_OPTION, ALLOCATION_ENABLED, + CONCURRENT_JOB_ALLOCATIONS, ProcessCtrl.DONT_PERSIST_MODEL_STATE_SETTING, ProcessCtrl.MAX_ANOMALY_RECORDS_SETTING, DataCountsReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING, diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java index cd1f09ebc53..a254f3a177f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java @@ -344,11 +344,9 @@ public class CloseJobAction extends Action task = validateAndFindTask(jobId, currentState); PersistentTasksInProgress currentTasks = currentState.getMetaData().custom(PersistentTasksInProgress.TYPE); Map> updatedTasks = new HashMap<>(currentTasks.taskMap()); - for (PersistentTaskInProgress taskInProgress : currentTasks.tasks()) { - if (taskInProgress.getId() == task.getId()) { - updatedTasks.put(taskInProgress.getId(), new PersistentTaskInProgress<>(taskInProgress, JobState.CLOSING)); - } - } + PersistentTaskInProgress taskToUpdate = currentTasks.getTask(task.getId()); + taskToUpdate = new PersistentTaskInProgress<>(taskToUpdate, JobState.CLOSING); + updatedTasks.put(taskToUpdate.getId(), taskToUpdate); PersistentTasksInProgress 
newTasks = new PersistentTasksInProgress(currentTasks.getCurrentId(), updatedTasks); MlMetadata mlMetadata = currentState.metaData().custom(MlMetadata.TYPE); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index ccaefec33e7..81e0914772f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -56,6 +56,8 @@ import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTa import org.elasticsearch.xpack.persistent.TransportPersistentAction; import java.io.IOException; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.function.Consumer; @@ -247,6 +249,8 @@ public class OpenJobAction extends Action reasons = new LinkedList<>(); + DiscoveryNode minLoadedNode = null; PersistentTasksInProgress persistentTasksInProgress = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); for (DiscoveryNode node : clusterState.getNodes()) { Map nodeAttributes = node.getAttributes(); String allocationEnabled = nodeAttributes.get(MachineLearning.ALLOCATION_ENABLED_ATTR); if ("true".equals(allocationEnabled) == false) { - logger.debug("Not opening job [{}] on node [{}], because this node isn't a ml node.", jobId, node); + String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node isn't a ml node."; + logger.debug(reason); + reasons.add(reason); continue; } - long numberOfOpenedJobs; + long numberOfAssignedJobs; + int numberOfAllocatingJobs; if (persistentTasksInProgress != null) { - numberOfOpenedJobs = persistentTasksInProgress.getNumberOfTasksOnNode(node.getId(), OpenJobAction.NAME); + numberOfAssignedJobs = persistentTasksInProgress.getNumberOfTasksOnNode(node.getId(), OpenJobAction.NAME); + numberOfAllocatingJobs = 
persistentTasksInProgress.findTasks(OpenJobAction.NAME, task -> { + if (node.getId().equals(task.getExecutorNode()) == false) { + return false; + } + JobState jobTaskState = (JobState) task.getStatus(); + return jobTaskState == null || // executor node didn't have the chance to set job status to OPENING + jobTaskState == JobState.OPENING || // executor node is busy starting the cpp process + task.isCurrentStatus() == false; // previous executor node failed and + // current executor node didn't have the chance to set job status to OPENING + }).size(); } else { - numberOfOpenedJobs = 0; + numberOfAssignedJobs = 0; + numberOfAllocatingJobs = 0; } - long maxNumberOfOpenJobs = Long.parseLong(node.getAttributes().get(MAX_RUNNING_JOBS_PER_NODE.getKey())); - long available = maxNumberOfOpenJobs - numberOfOpenedJobs; - if (available == 0) { - logger.debug("Not opening job [{}] on node [{}], because this node is full. Number of opened jobs [{}], {} [{}]", - jobId, node, numberOfOpenedJobs, MAX_RUNNING_JOBS_PER_NODE.getKey(), maxNumberOfOpenJobs); + if (numberOfAllocatingJobs >= maxConcurrentJobAllocations) { + String reason = "Not opening job [" + jobId + "] on node [" + node + "], because node exceeds [" + numberOfAllocatingJobs + + "] the maximum number of jobs [" + maxConcurrentJobAllocations + "] in opening state"; + logger.debug(reason); + reasons.add(reason); continue; } + + long maxNumberOfOpenJobs = Long.parseLong(node.getAttributes().get(MAX_RUNNING_JOBS_PER_NODE.getKey())); + long available = maxNumberOfOpenJobs - numberOfAssignedJobs; + if (available == 0) { + String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node is full. 
" + + "Number of opened jobs [" + numberOfAssignedJobs + "], " + MAX_RUNNING_JOBS_PER_NODE.getKey() + + " [" + maxNumberOfOpenJobs + "]"; + logger.debug(reason); + reasons.add(reason); + continue; + } + if (maxAvailable < available) { maxAvailable = available; - leastLoadedNode = node; + minLoadedNode = node; } } - return leastLoadedNode; + if (minLoadedNode != null) { + logger.info("selected node [{}] for job [{}]", minLoadedNode, jobId); + } else { + logger.info("no node selected for job [{}], reasons [{}]", jobId, String.join(",\n", reasons)); + } + return minLoadedNode; } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadata.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadata.java index e876f8d5d08..cc72aa47c89 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadata.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadata.java @@ -403,11 +403,13 @@ public class MlMetadata implements MetaData.Custom { public static JobState getJobState(String jobId, @Nullable PersistentTasksInProgress tasks) { PersistentTasksInProgress.PersistentTaskInProgress task = getJobTask(jobId, tasks); if (task != null && task.getStatus() != null) { - return (JobState) task.getStatus(); - } else { - // If we haven't opened a job than there will be no persistent task, which is the same as if the job was closed - return JobState.CLOSED; + JobState jobTaskState = (JobState) task.getStatus(); + if (jobTaskState != null) { + return jobTaskState; + } } + // If we haven't opened a job than there will be no persistent task, which is the same as if the job was closed + return JobState.CLOSED; } public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksInProgress tasks) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTask.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTask.java index 
e430d35ba02..bf5be288335 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTask.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTask.java @@ -57,4 +57,5 @@ public class PersistentTask extends CancellableTask { public void setPersistentTaskId(long persistentTaskId) { this.persistentTaskId = persistentTaskId; } + } \ No newline at end of file diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgress.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgress.java index 436206bdf9c..a94486e57f0 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgress.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgress.java @@ -93,6 +93,8 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable task, boolean stopped, String newExecutorNode) { this(task.id, task.allocationId + 1L, task.action, task.request, stopped, task.removeOnCompletion, task.status, - newExecutorNode); + newExecutorNode, task.allocationId); } public PersistentTaskInProgress(PersistentTaskInProgress task, Status status) { - this(task.id, task.allocationId, task.action, task.request, task.stopped, task.removeOnCompletion, status, task.executorNode); + this(task.id, task.allocationId, task.action, task.request, task.stopped, task.removeOnCompletion, status, + task.executorNode, task.allocationId); } private PersistentTaskInProgress(long id, long allocationId, String action, Request request, - boolean stopped, boolean removeOnCompletion, Status status, String executorNode) { + boolean stopped, boolean removeOnCompletion, Status status, + String executorNode, Long allocationIdOnLastStatusUpdate) { this.id = id; this.allocationId = allocationId; this.action = action; @@ -220,6 +226,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable setId(long id) { this.id = id; @@ -394,8 
+417,14 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable setAllocationIdOnLastStatusUpdate(Long allocationIdOnLastStatusUpdate) { + this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate; + return this; + } + public PersistentTaskInProgress build() { - return new PersistentTaskInProgress<>(id, allocationId, action, request, stopped, removeOnCompletion, status, executorNode); + return new PersistentTaskInProgress<>(id, allocationId, action, request, stopped, removeOnCompletion, status, + executorNode, allocationIdOnLastStatusUpdate); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java index 42ca1ecebec..e47907d7e98 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTa import java.util.Collections; +import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask; import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder; public class CloseJobActionTests extends ESTestCase { @@ -26,9 +27,7 @@ public class CloseJobActionTests extends ESTestCase { MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); mlBuilder.putJob(buildJobBuilder("job_id").build(), false); PersistentTaskInProgress task = - new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, null); - task = new PersistentTaskInProgress<>(task, randomFrom(JobState.OPENED, JobState.FAILED)); - + createJobTask(1L, "job_id", null, randomFrom(JobState.OPENED, JobState.FAILED)); ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) 
.putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)))); @@ -52,10 +51,7 @@ public class CloseJobActionTests extends ESTestCase { public void testMoveJobToClosingState_unexpectedJobState() { MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); mlBuilder.putJob(buildJobBuilder("job_id").build(), false); - PersistentTaskInProgress task = - new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, null); - task = new PersistentTaskInProgress<>(task, JobState.OPENING); - + PersistentTaskInProgress task = createJobTask(1L, "job_id", null, JobState.OPENING); ClusterState.Builder csBuilder1 = ClusterState.builder(new ClusterName("_name")) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) .putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)))); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java index f5dcf6f9f12..fc52d040940 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java @@ -41,16 +41,14 @@ public class OpenJobActionTests extends ESTestCase { .build(); PersistentTaskInProgress task = - new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, "_node_id"); - task = new PersistentTaskInProgress<>(task, randomFrom(JobState.CLOSED, JobState.FAILED)); + createJobTask(1L, "job_id", "_node_id", randomFrom(JobState.CLOSED, JobState.FAILED)); PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)); OpenJobAction.validate("job_id", mlBuilder.build(), tasks, nodes); OpenJobAction.validate("job_id", mlBuilder.build(), new 
PersistentTasksInProgress(1L, Collections.emptyMap()), nodes); OpenJobAction.validate("job_id", mlBuilder.build(), null, nodes); - task = new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, "_other_node_id"); - task = new PersistentTaskInProgress<>(task, JobState.OPENED); + task = createJobTask(1L, "job_id", "_other_node_id", JobState.OPENED); tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)); OpenJobAction.validate("job_id", mlBuilder.build(), tasks, nodes); } @@ -79,19 +77,16 @@ public class OpenJobActionTests extends ESTestCase { Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) .build(); - PersistentTaskInProgress task = - new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, "_node_id"); JobState jobState = randomFrom(JobState.OPENING, JobState.OPENED, JobState.CLOSING); - task = new PersistentTaskInProgress<>(task, jobState); + PersistentTaskInProgress task = createJobTask(1L, "job_id", "_node_id", jobState); PersistentTasksInProgress tasks1 = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)); Exception e = expectThrows(ElasticsearchStatusException.class, () -> OpenJobAction.validate("job_id", mlBuilder.build(), tasks1, nodes)); assertEquals("[job_id] expected state [closed] or [failed], but got [" + jobState +"]", e.getMessage()); - task = new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, "_other_node_id"); jobState = randomFrom(JobState.OPENING, JobState.CLOSING); - task = new PersistentTaskInProgress<>(task, jobState); + task = createJobTask(1L, "job_id", "_other_node_id", jobState); PersistentTasksInProgress tasks2 = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)); e = expectThrows(ElasticsearchStatusException.class, @@ -124,7 +119,7 @@ public class OpenJobActionTests extends ESTestCase { ClusterState.Builder cs = 
ClusterState.builder(new ClusterName("_name")); cs.nodes(nodes); cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks)); - DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), logger); + DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), 2, logger); assertEquals("_node_id3", result.getId()); } @@ -152,7 +147,7 @@ public class OpenJobActionTests extends ESTestCase { ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); cs.nodes(nodes); cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks)); - DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), logger); + DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, logger); assertNull(result); } @@ -174,8 +169,71 @@ public class OpenJobActionTests extends ESTestCase { ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); cs.nodes(nodes); cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks)); - DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), logger); + DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, logger); assertNull(result); } + public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { + Map nodeAttr = new HashMap<>(); + nodeAttr.put(MachineLearning.ALLOCATION_ENABLED_ATTR, "true"); + nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10"); + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .add(new DiscoveryNode("_node_name3", "_node_id3", new 
TransportAddress(InetAddress.getLoopbackAddress(), 9302), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .build(); + + Map> taskMap = new HashMap<>(); + taskMap.put(0L, createJobTask(0L, "job_id1", "_node_id1", JobState.OPENING)); + taskMap.put(1L, createJobTask(1L, "job_id2", "_node_id1", JobState.OPENING)); + taskMap.put(2L, createJobTask(2L, "job_id3", "_node_id2", JobState.OPENING)); + taskMap.put(3L, createJobTask(3L, "job_id4", "_node_id2", JobState.OPENING)); + taskMap.put(4L, createJobTask(4L, "job_id5", "_node_id3", JobState.OPENING)); + PersistentTasksInProgress tasks = new PersistentTasksInProgress(5L, taskMap); + + ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); + cs.nodes(nodes); + cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks)); + DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id6", cs.build(), 2, logger); + assertEquals("_node_id3", result.getId()); + + PersistentTaskInProgress lastTask = createJobTask(5L, "job_id6", "_node_id3", JobState.OPENING); + taskMap.put(5L, lastTask); + tasks = new PersistentTasksInProgress(6L, taskMap); + + cs = ClusterState.builder(new ClusterName("_name")); + cs.nodes(nodes); + cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks)); + result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs.build(), 2, logger); + assertNull("no node selected, because OPENING state", result); + + taskMap.put(5L, new PersistentTaskInProgress<>(lastTask, false, "_node_id3")); + tasks = new PersistentTasksInProgress(6L, taskMap); + + cs = ClusterState.builder(new ClusterName("_name")); + cs.nodes(nodes); + cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks)); + result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs.build(), 2, logger); + assertNull("no node selected, because stale task", result); + + taskMap.put(5L, new PersistentTaskInProgress<>(lastTask, null)); + tasks = new 
PersistentTasksInProgress(6L, taskMap); + + cs = ClusterState.builder(new ClusterName("_name")); + cs.nodes(nodes); + cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks)); + result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs.build(), 2, logger); + assertNull("no node selected, because null state", result); + } + + public static PersistentTaskInProgress createJobTask(long id, String jobId, String nodeId, JobState jobState) { + PersistentTaskInProgress task = + new PersistentTaskInProgress<>(id, OpenJobAction.NAME, new OpenJobAction.Request(jobId), false, true, nodeId); + task = new PersistentTaskInProgress<>(task, jobState); + return task; + } + } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java index 0ada25abd2d..7cf26b471fd 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java @@ -29,6 +29,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask; import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed; import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob; import static org.hamcrest.Matchers.equalTo; @@ -41,10 +42,8 @@ public class StartDatafeedActionTests extends ESTestCase { mlMetadata.putJob(job, false); mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("*"))); - PersistentTaskInProgress task = - new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request(job.getId()), false, true, "node_id"); - task = new PersistentTaskInProgress<>(task, randomFrom(JobState.FAILED, JobState.CLOSED, - JobState.CLOSING, JobState.OPENING)); + 
JobState jobState = randomFrom(JobState.FAILED, JobState.CLOSED, JobState.CLOSING, JobState.OPENING); + PersistentTaskInProgress task = createJobTask(0L, job.getId(), "node_id", jobState); PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task)); DiscoveryNodes nodes = DiscoveryNodes.builder() @@ -61,7 +60,7 @@ public class StartDatafeedActionTests extends ESTestCase { DiscoveryNode node = StartDatafeedAction.selectNode(logger, request, cs.build()); assertNull(node); - task = new PersistentTaskInProgress<>(task, JobState.OPENED); + task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED); tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task)); cs = ClusterState.builder(new ClusterName("cluster_name")) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build()) @@ -110,10 +109,7 @@ public class StartDatafeedActionTests extends ESTestCase { Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) .build(); - PersistentTaskInProgress jobTask = - new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), - false, true, "node_id"); - jobTask = new PersistentTaskInProgress<>(jobTask, JobState.OPENED); + PersistentTaskInProgress jobTask = createJobTask(0L, "job_id", "node_id", JobState.OPENED); PersistentTaskInProgress datafeedTask = new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L), false, true, "node_id"); @@ -140,10 +136,7 @@ public class StartDatafeedActionTests extends ESTestCase { Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) .build(); - PersistentTaskInProgress jobTask = - new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), - false, true, "node_id2"); - jobTask = new PersistentTaskInProgress<>(jobTask, JobState.OPENED); + PersistentTaskInProgress jobTask = createJobTask(0L, "job_id", "node_id2", 
JobState.OPENED); PersistentTaskInProgress datafeedTask = new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L), false, true, "node_id1"); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java index 7cc5d1d4a3f..b2c12c7b0fd 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -22,10 +22,16 @@ import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; +import java.io.IOException; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE; @@ -155,7 +161,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { assertBusy(() -> { ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); - PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next(); + PersistentTaskInProgress task = tasks.taskMap().values().iterator().next(); DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode()); Map expectedNodeAttr = new HashMap<>(); @@ -172,7 +178,7 @@ public class BasicDistributedJobsIT extends 
BaseMlIntegTestCase { // job should get and remain in a failed state: ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); - PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next(); + PersistentTaskInProgress task = tasks.taskMap().values().iterator().next(); assertNull(task.getExecutorNode()); // The status remains to be opened as from ml we didn't had the chance to set the status to failed: @@ -186,7 +192,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { // job should be re-opened: ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); - PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next(); + PersistentTaskInProgress task = tasks.taskMap().values().iterator().next(); assertNotNull(task.getExecutorNode()); DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode()); @@ -199,4 +205,111 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { cleanupWorkaround(3); } + public void testMaxConcurrentJobAllocations() throws Exception { + int numMlNodes = 2; + internalCluster().ensureAtMostNumDataNodes(0); + // start non ml node, but that will hold the indices + logger.info("Start non ml node:"); + String nonMlNode = internalCluster().startNode(Settings.builder() + .put(MachineLearning.ALLOCATION_ENABLED.getKey(), false)); + logger.info("Starting ml nodes"); + internalCluster().startNodes(numMlNodes, Settings.builder() + .put("node.data", false) + .put("node.master", false) + .put(MachineLearning.ALLOCATION_ENABLED.getKey(), true).build()); + ensureStableCluster(numMlNodes + 1); + + int maxConcurrentJobAllocations = randomIntBetween(1, 4); + 
client().admin().cluster().prepareUpdateSettings() + .setTransientSettings(Settings.builder() + .put(MachineLearning.CONCURRENT_JOB_ALLOCATIONS.getKey(), maxConcurrentJobAllocations)) + .get(); + + // Sample each cs update and keep track each time a node holds more than `maxConcurrentJobAllocations` opening jobs. + List violations = new CopyOnWriteArrayList<>(); + internalCluster().clusterService(nonMlNode).addListener(event -> { + PersistentTasksInProgress tasks = event.state().metaData().custom(PersistentTasksInProgress.TYPE); + if (tasks == null) { + return; + } + + for (DiscoveryNode node : event.state().nodes()) { + Collection> foundTasks = tasks.findTasks(OpenJobAction.NAME, task -> { + return node.getId().equals(task.getExecutorNode()) && + (task.getStatus() == null || task.getStatus() == JobState.OPENING || task.isCurrentStatus() == false); + }); + int count = foundTasks.size(); + if (count > maxConcurrentJobAllocations) { + violations.add("Observed node [" + node.getName() + "] with [" + count + "] opening jobs on cluster state version [" + + event.state().version() + "]"); + } + } + }); + + int numJobs = numMlNodes * 10; + for (int i = 0; i < numJobs; i++) { + Job.Builder job = createJob(Integer.toString(i)); + PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build(true, job.getId())); + PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get(); + assertTrue(putJobResponse.isAcknowledged()); + + OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId()); + client().execute(OpenJobAction.INSTANCE, openJobRequest).get(); + } + + assertBusy(() -> { + ClusterState state = client().admin().cluster().prepareState().get().getState(); + PersistentTasksInProgress tasks = state.metaData().custom(PersistentTasksInProgress.TYPE); + assertEquals(numJobs, tasks.taskMap().size()); + for (PersistentTaskInProgress task : tasks.taskMap().values()) { + assertNotNull(task.getExecutorNode()); + 
assertEquals(JobState.OPENED, task.getStatus()); + } + }); + + logger.info("stopping ml nodes"); + for (int i = 0; i < numMlNodes; i++) { + // fork so stopping all ml nodes proceeds quicker: + Runnable r = () -> { + try { + internalCluster() + .stopRandomNode(settings -> settings.getAsBoolean(MachineLearning.ALLOCATION_ENABLED.getKey(), false)); + } catch (IOException e) { + logger.error("error stopping node", e); + } + }; + new Thread(r).start(); + } + ensureStableCluster(1, nonMlNode); + assertBusy(() -> { + ClusterState state = client(nonMlNode).admin().cluster().prepareState().get().getState(); + PersistentTasksInProgress tasks = state.metaData().custom(PersistentTasksInProgress.TYPE); + assertEquals(numJobs, tasks.taskMap().size()); + for (PersistentTaskInProgress task : tasks.taskMap().values()) { + assertNull(task.getExecutorNode()); + } + }); + + logger.info("re-starting ml nodes"); + internalCluster().startNodes(numMlNodes, Settings.builder() + .put("node.data", false) + .put("node.master", false) + .put(MachineLearning.ALLOCATION_ENABLED.getKey(), true).build()); + + ensureStableCluster(1 + numMlNodes); + assertBusy(() -> { + ClusterState state = client().admin().cluster().prepareState().get().getState(); + PersistentTasksInProgress tasks = state.metaData().custom(PersistentTasksInProgress.TYPE); + assertEquals(numJobs, tasks.taskMap().size()); + for (PersistentTaskInProgress task : tasks.taskMap().values()) { + assertNotNull(task.getExecutorNode()); + assertEquals(JobState.OPENED, task.getStatus()); + } + }, 30, TimeUnit.SECONDS); + + assertEquals("Expected no violations, but got [" + violations + "]", 0, violations.size()); + cleanupWorkaround(numMlNodes + 1); + } + + } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java index 46271c52ce5..36248305f84 100644 --- 
a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTa import java.io.IOException; import java.util.Collections; +import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask; import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedConfig; import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedJob; import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder; @@ -146,11 +147,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { assertThat(result.getJobs().get("1"), sameInstance(job1)); assertThat(result.getDatafeeds().get("1"), nullValue()); - PersistentTaskInProgress task = - new PersistentTaskInProgress<>( - new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("1"), false, true, null), - JobState.CLOSED - ); + PersistentTaskInProgress task = createJobTask(0L, "1", null, JobState.CLOSED); MlMetadata.Builder builder2 = new MlMetadata.Builder(result); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> builder2.deleteJob("1", new PersistentTasksInProgress(0L, Collections.singletonMap(0L, task)))); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 90f16a038e8..b6c30195cf5 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -131,7 +131,8 @@ public class AutodetectProcessManagerTests extends ESTestCase { 
manager.openJob("foo", 1L, false, e -> {}); assertEquals(1, manager.numberOfOpenJobs()); assertTrue(manager.jobHasActiveAutodetectProcess("foo")); - UpdatePersistentTaskStatusAction.Request expectedRequest = new UpdatePersistentTaskStatusAction.Request(1L, JobState.OPENED); + UpdatePersistentTaskStatusAction.Request expectedRequest = + new UpdatePersistentTaskStatusAction.Request(1L, JobState.OPENED); verify(client).execute(eq(UpdatePersistentTaskStatusAction.INSTANCE), eq(expectedRequest), any()); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index 55de9c375ad..8f4574853d9 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -115,7 +115,7 @@ public abstract class BaseMlIntegTestCase extends SecurityIntegTestCase { deleteAllDatafeeds(client()); deleteAllJobs(client()); for (int i = 0; i < numNodes; i++) { - internalCluster().stopRandomDataNode(); + internalCluster().stopRandomNode(settings -> true); } internalCluster().startNode(Settings.builder().put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false)); ensureStableCluster(1);