diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java index 3b7e192f6e7..966025e1d09 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.ml.action; -import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchStatusException; @@ -16,7 +15,6 @@ import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ValidateActions; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.TransportMasterNodeAction; @@ -25,7 +23,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; @@ -52,10 +49,10 @@ import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedJobValidator; import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; +import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector; import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; -import org.elasticsearch.xpack.ml.job.config.JobTaskStatus; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.AllocatedPersistentTask; @@ -68,7 +65,6 @@ import org.elasticsearch.xpack.persistent.PersistentTasksService; import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener; import java.io.IOException; -import java.util.List; import java.util.Objects; import java.util.function.LongSupplier; import java.util.function.Predicate; @@ -452,8 +448,9 @@ public class StartDatafeedAction @Override public void onFailure(Exception e) { if (e instanceof ResourceAlreadyExistsException) { + logger.debug(e); e = new ElasticsearchStatusException("cannot start datafeed [" + params.getDatafeedId() + - "] because it has already been started", RestStatus.CONFLICT, e); + "] because it has already been started", RestStatus.CONFLICT); } listener.onFailure(e); } @@ -513,7 +510,7 @@ public class StartDatafeedAction @Override public Assignment getAssignment(DatafeedParams params, ClusterState clusterState) { - return selectNode(logger, params.getDatafeedId(), clusterState, resolver); + return new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedId()).selectNode(); } @Override @@ -521,12 +518,7 @@ public class StartDatafeedAction MlMetadata mlMetadata = 
clusterState.metaData().custom(MlMetadata.TYPE); PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); StartDatafeedAction.validate(params.getDatafeedId(), mlMetadata, tasks); - Assignment assignment = selectNode(logger, params.getDatafeedId(), clusterState, resolver); - if (assignment.getExecutorNode() == null) { - String msg = "No node found to start datafeed [" + params.getDatafeedId() - + "], allocation explanation [" + assignment.getExplanation() + "]"; - throw new ElasticsearchException(msg); - } + new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedId()).checkDatafeedTaskCanBeCreated(); } @Override @@ -561,75 +553,9 @@ public class StartDatafeedAction } DatafeedJobValidator.validate(datafeed, job); JobState jobState = MlMetadata.getJobState(datafeed.getJobId(), tasks); - if (jobState != JobState.OPENED) { + if (jobState.isAnyOf(JobState.OPENING, JobState.OPENED) == false) { throw ExceptionsHelper.conflictStatusException("cannot start datafeed [" + datafeedId + "] because job [" + job.getId() + - "] is not open"); + "] is " + jobState); } } - - static Assignment selectNode(Logger logger, String datafeedId, ClusterState clusterState, - IndexNameExpressionResolver resolver) { - MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE); - PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId); - - PersistentTask jobTask = MlMetadata.getJobTask(datafeed.getJobId(), tasks); - if (jobTask == null) { - String reason = "cannot start datafeed [" + datafeed.getId() + "], job task doesn't yet exist"; - logger.debug(reason); - return new Assignment(null, reason); - } - JobTaskStatus taskStatus = (JobTaskStatus) jobTask.getStatus(); - if (taskStatus == null || taskStatus.getState() != JobState.OPENED) { - // lets try again later when the job has been opened: - String taskStatusAsString = taskStatus == null ? 
"null" : taskStatus.getState().toString(); - String reason = "cannot start datafeed [" + datafeed.getId() + "], because job's [" + datafeed.getJobId() + - "] state is [" + taskStatusAsString + "] while state [" + JobState.OPENED + "] is required"; - logger.debug(reason); - return new Assignment(null, reason); - } - if (taskStatus.isStatusStale(jobTask)) { - String reason = "cannot start datafeed [" + datafeed.getId() + "], job [" + datafeed.getJobId() + "] status is stale"; - logger.debug(reason); - return new Assignment(null, reason); - } - String reason = verifyIndicesActive(logger, datafeed, clusterState, resolver); - if (reason != null) { - return new Assignment(null, reason); - } - return new Assignment(jobTask.getExecutorNode(), ""); - } - - private static String verifyIndicesActive(Logger logger, DatafeedConfig datafeed, ClusterState clusterState, - IndexNameExpressionResolver resolver) { - List indices = datafeed.getIndices(); - for (String index : indices) { - String[] concreteIndices; - String reason = "cannot start datafeed [" + datafeed.getId() + "] because index [" - + index + "] does not exist, is closed, or is still initializing."; - - try { - concreteIndices = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), index); - if (concreteIndices.length == 0) { - logger.debug(reason); - return reason; - } - } catch (Exception e) { - logger.debug(reason); - return reason; - } - - for (String concreteIndex : concreteIndices) { - IndexRoutingTable routingTable = clusterState.getRoutingTable().index(concreteIndex); - if (routingTable == null || !routingTable.allPrimaryShardsActive()) { - reason = "cannot start datafeed [" + datafeed.getId() + "] because index [" - + concreteIndex + "] does not have all primary shards active yet."; - logger.debug(reason); - return reason; - } - } - } - return null; - } - } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 6bf621ca582..f674d17b237 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -8,7 +8,9 @@ package org.elasticsearch.xpack.ml.datafeed; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; @@ -25,6 +27,7 @@ import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; @@ -32,15 +35,19 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.Result; import org.elasticsearch.xpack.ml.notifications.Auditor; +import 
org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksService; import java.time.Duration; +import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; @@ -64,6 +71,7 @@ public class DatafeedManager extends AbstractComponent { // Use allocationId as key instead of datafeed id private final ConcurrentMap runningDatafeedsOnThisNode = new ConcurrentHashMap<>(); private volatile boolean isolated; + private final TaskRunner taskRunner = new TaskRunner(); public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, JobProvider jobProvider, Supplier currentTimeSupplier, Auditor auditor, PersistentTasksService persistentTasksService) { @@ -75,6 +83,7 @@ public class DatafeedManager extends AbstractComponent { this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier); this.auditor = Objects.requireNonNull(auditor); this.persistentTasksService = Objects.requireNonNull(persistentTasksService); + clusterService.addListener(taskRunner); } public void run(StartDatafeedAction.DatafeedTask task, Consumer handler) { @@ -99,7 +108,7 @@ public class DatafeedManager extends AbstractComponent { task.updatePersistentStatus(DatafeedState.STARTED, new ActionListener>() { @Override public void onResponse(PersistentTask persistentTask) { - innerRun(holder, task.getDatafeedStartTime(), task.getEndTime()); + taskRunner.runWhenJobIsOpened(task); } @Override @@ -266,6 +275,14 @@ public class DatafeedManager extends AbstractComponent { return DataExtractorFactory.create(client, datafeed, job); } + private String getJobId(StartDatafeedAction.DatafeedTask task) { + return runningDatafeedsOnThisNode.get(task.getAllocationId()).getJobId(); + } + + private JobState getJobState(PersistentTasksCustomMetaData tasks, StartDatafeedAction.DatafeedTask datafeedTask) { + return MlMetadata.getJobState(getJobId(datafeedTask), tasks); + } + private static DataDescription buildDataDescription(Job job) { DataDescription.Builder dataDescription = new DataDescription.Builder(); dataDescription.setFormat(DataDescription.DataFormat.XCONTENT); @@ -335,6 +352,10 @@ public class DatafeedManager extends AbstractComponent { this.handler = handler; } + String getJobId() { + return datafeed.getJobId(); + } + boolean isRunning() { return datafeedJob.isRunning(); } @@ -406,6 +427,14 @@ public class DatafeedManager extends AbstractComponent { } private void closeJob() { + ClusterState clusterState = clusterService.state(); + PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + JobState jobState = MlMetadata.getJobState(getJobId(), tasks); + if (jobState != JobState.OPENED) { + logger.debug("[{}] No need to auto-close job as job state is [{}]", getJobId(), jobState); + return; + } + persistentTasksService.waitForPersistentTaskStatus(taskId, Objects::isNull, TimeValue.timeValueSeconds(20), new WaitForPersistentTaskStatusListener() { @Override @@ -448,4 +477,55 @@ public class DatafeedManager extends AbstractComponent { }); } } + + private class TaskRunner implements 
ClusterStateListener { + + private final List tasksToRun = new CopyOnWriteArrayList<>(); + + private void runWhenJobIsOpened(StartDatafeedAction.DatafeedTask datafeedTask) { + ClusterState clusterState = clusterService.state(); + PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + if (getJobState(tasks, datafeedTask) == JobState.OPENED) { + runTask(datafeedTask); + } else { + logger.info("Datafeed [{}] is waiting for job [{}] to be opened", + datafeedTask.getDatafeedId(), getJobId(datafeedTask)); + tasksToRun.add(datafeedTask); + } + } + + private void runTask(StartDatafeedAction.DatafeedTask task) { + innerRun(runningDatafeedsOnThisNode.get(task.getAllocationId()), task.getDatafeedStartTime(), task.getEndTime()); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (tasksToRun.isEmpty() || event.metaDataChanged() == false) { + return; + } + PersistentTasksCustomMetaData previousTasks = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + PersistentTasksCustomMetaData currentTasks = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + if (Objects.equals(previousTasks, currentTasks)) { + return; + } + + List remainingTasks = new ArrayList<>(); + for (StartDatafeedAction.DatafeedTask datafeedTask : tasksToRun) { + if (runningDatafeedsOnThisNode.containsKey(datafeedTask.getAllocationId()) == false) { + continue; + } + JobState jobState = getJobState(currentTasks, datafeedTask); + if (jobState == JobState.OPENED) { + runTask(datafeedTask); + } else if (jobState == JobState.OPENING) { + remainingTasks.add(datafeedTask); + } else { + logger.warn("Datafeed [{}] is stopping because job [{}] state is [{}]", + datafeedTask.getDatafeedId(), getJobId(datafeedTask), jobState); + datafeedTask.stop("job_never_opened", TimeValue.timeValueSeconds(20)); + } + } + tasksToRun.retainAll(remainingTasks); + } + } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java new file mode 100644 index 00000000000..27da2fcb4e3 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java @@ -0,0 +1,149 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ml.datafeed; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.xpack.ml.MlMetadata; +import org.elasticsearch.xpack.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; + +import java.util.List; +import java.util.Objects; + +public class DatafeedNodeSelector { + + private static final Logger LOGGER = Loggers.getLogger(DatafeedNodeSelector.class); + + private final DatafeedConfig datafeed; + private final PersistentTasksCustomMetaData.PersistentTask<?> jobTask; + private final ClusterState clusterState; + private final IndexNameExpressionResolver resolver; + + public DatafeedNodeSelector(ClusterState clusterState, IndexNameExpressionResolver resolver, String datafeedId) { + MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE); + PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + this.datafeed = mlMetadata.getDatafeed(datafeedId); + this.jobTask = MlMetadata.getJobTask(datafeed.getJobId(), tasks); + this.clusterState = Objects.requireNonNull(clusterState); + this.resolver = Objects.requireNonNull(resolver); + } + + public void checkDatafeedTaskCanBeCreated() { + AssignmentFailure assignmentFailure = checkAssignment(); + if (assignmentFailure != null && assignmentFailure.isCriticalForTaskCreation) { + String msg = "No node found to start datafeed [" + datafeed.getId() + "], allocation explanation [" + assignmentFailure.reason + + "]"; + LOGGER.debug(msg); + throw ExceptionsHelper.conflictStatusException(msg); + } + } + + public PersistentTasksCustomMetaData.Assignment selectNode() { + AssignmentFailure assignmentFailure = checkAssignment(); + if (assignmentFailure == null) { + return new PersistentTasksCustomMetaData.Assignment(jobTask.getExecutorNode(), ""); + } + LOGGER.debug(assignmentFailure.reason); + return new PersistentTasksCustomMetaData.Assignment(null, assignmentFailure.reason); + } + + @Nullable + private AssignmentFailure checkAssignment() { + PriorityFailureCollector priorityFailureCollector = new PriorityFailureCollector(); + priorityFailureCollector.add(verifyIndicesActive(datafeed)); + + JobTaskStatus taskStatus = null; + JobState jobState = JobState.CLOSED; + if (jobTask != null) { + taskStatus = (JobTaskStatus) jobTask.getStatus(); + jobState = taskStatus == null ?
JobState.OPENING : taskStatus.getState(); + } + + if (jobState.isAnyOf(JobState.OPENING, JobState.OPENED) == false) { + // let's try again later when the job has been opened: + String reason = "cannot start datafeed [" + datafeed.getId() + "], because job's [" + datafeed.getJobId() + + "] state is [" + jobState + "] while state [" + JobState.OPENED + "] is required"; + priorityFailureCollector.add(new AssignmentFailure(reason, true)); + } + + if (taskStatus != null && taskStatus.isStatusStale(jobTask)) { + String reason = "cannot start datafeed [" + datafeed.getId() + "], job [" + datafeed.getJobId() + "] status is stale"; + priorityFailureCollector.add(new AssignmentFailure(reason, true)); + } + + return priorityFailureCollector.get(); + } + + @Nullable + private AssignmentFailure verifyIndicesActive(DatafeedConfig datafeed) { + List<String> indices = datafeed.getIndices(); + for (String index : indices) { + String[] concreteIndices; + String reason = "cannot start datafeed [" + datafeed.getId() + "] because index [" + + index + "] does not exist, is closed, or is still initializing."; + + try { + concreteIndices = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), index); + if (concreteIndices.length == 0) { + return new AssignmentFailure(reason, true); + } + } catch (Exception e) { + LOGGER.debug(reason, e); + return new AssignmentFailure(reason, true); + } + + for (String concreteIndex : concreteIndices) { + IndexRoutingTable routingTable = clusterState.getRoutingTable().index(concreteIndex); + if (routingTable == null || !routingTable.allPrimaryShardsActive()) { + reason = "cannot start datafeed [" + datafeed.getId() + "] because index [" + + concreteIndex + "] does not have all primary shards active yet."; + return new AssignmentFailure(reason, false); + } + } + } + return null; + } + + private static class AssignmentFailure { + private final String reason; + private final boolean isCriticalForTaskCreation; + + private AssignmentFailure(String reason, boolean isCriticalForTaskCreation) { + this.reason = reason; + this.isCriticalForTaskCreation = isCriticalForTaskCreation; + } + } + + /** + * Collects the first critical failure if any critical failure is added, + * or the first failure otherwise + */ + private static class PriorityFailureCollector { + private AssignmentFailure failure; + + private void add(@Nullable AssignmentFailure newFailure) { + if (newFailure == null) { + return; + } + if (failure == null || (failure.isCriticalForTaskCreation == false && newFailure.isCriticalForTaskCreation)) { + failure = newFailure; + } + } + + @Nullable + private AssignmentFailure get() { + return failure; + } + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java index 5a8e7dbaf73..57fd108fe48 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java @@ -7,26 +7,6 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.Version; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import
org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; -import org.elasticsearch.cluster.routing.RecoverySource; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.routing.TestShardRouting; -import org.elasticsearch.cluster.routing.UnassignedInfo; -import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.MlMetadata; @@ -35,249 +15,42 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; import org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; -import org.elasticsearch.xpack.ml.job.config.JobTaskStatus; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.Collections; import java.util.Date; -import java.util.List; import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.addJobTask; -import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed; -import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob; import static org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT; import static org.hamcrest.Matchers.equalTo; public class StartDatafeedActionTests extends ESTestCase { - public void testSelectNode() throws Exception { - IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(Settings.EMPTY); - IndexMetaData indexMetaData = IndexMetaData.builder("foo") - .settings(settings(Version.CURRENT)) - .numberOfShards(1) - .numberOfReplicas(0) - .build(); - - MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); - Job job = createScheduledJob("job_id").build(new Date()); - mlMetadata.putJob(job, false); - mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo"))); - - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - JobState jobState = randomFrom(JobState.FAILED, JobState.CLOSED); - addJobTask(job.getId(), "node_id", jobState, tasksBuilder); - PersistentTasksCustomMetaData tasks = tasksBuilder.build(); - - DiscoveryNodes nodes = DiscoveryNodes.builder() - .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), - Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) - .build(); - - ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name")) - .metaData(new MetaData.Builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasks) - .put(indexMetaData, false)) - .nodes(nodes) - .routingTable(generateRoutingTable(indexMetaData)); - - Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver); - 
assertNull(result.getExecutorNode()); - assertEquals("cannot start datafeed [datafeed_id], because job's [job_id] state is [" + jobState + - "] while state [opened] is required", result.getExplanation()); - - tasksBuilder = PersistentTasksCustomMetaData.builder(); - addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); - tasks = tasksBuilder.build(); - cs = ClusterState.builder(new ClusterName("cluster_name")) - .metaData(new MetaData.Builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasks) - .put(indexMetaData, false)) - .nodes(nodes) - .routingTable(generateRoutingTable(indexMetaData)); - result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver); - assertEquals("node_id", result.getExecutorNode()); - } - - public void testShardUnassigned() throws Exception { - IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(Settings.EMPTY); - IndexMetaData indexMetaData = IndexMetaData.builder("foo") - .settings(settings(Version.CURRENT)) - .numberOfShards(1) - .numberOfReplicas(0) - .build(); - - MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); - Job job = createScheduledJob("job_id").build(new Date()); - mlMetadata.putJob(job, false); - - // Using wildcard index name to test for index resolving as well - mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*"))); - - DiscoveryNodes nodes = DiscoveryNodes.builder() - .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), - Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) - .build(); - - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); - PersistentTasksCustomMetaData tasks = tasksBuilder.build(); - - List> states = new ArrayList<>(2); - states.add(new Tuple<>(0, ShardRoutingState.UNASSIGNED)); - - ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name")) - .metaData(new MetaData.Builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasks) - .put(indexMetaData, false)) - .nodes(nodes) - .routingTable(generateRoutingTable(indexMetaData, states)); - - Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver); - assertNull(result.getExecutorNode()); - assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " + - "does not have all primary shards active yet.")); - } - - public void testShardNotAllActive() throws Exception { - IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(Settings.EMPTY); - IndexMetaData indexMetaData = IndexMetaData.builder("foo") - .settings(settings(Version.CURRENT)) - .numberOfShards(2) - .numberOfReplicas(0) - .build(); - - MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); - Job job = createScheduledJob("job_id").build(new Date()); - mlMetadata.putJob(job, false); - - // Using wildcard index name to test for index resolving as well - mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*"))); - - DiscoveryNodes nodes = DiscoveryNodes.builder() - .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), - Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) - .build(); - - 
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); - PersistentTasksCustomMetaData tasks = tasksBuilder.build(); - - List> states = new ArrayList<>(2); - states.add(new Tuple<>(0, ShardRoutingState.STARTED)); - states.add(new Tuple<>(1, ShardRoutingState.INITIALIZING)); - - ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name")) - .metaData(new MetaData.Builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasks) - .put(indexMetaData, false)) - .nodes(nodes) - .routingTable(generateRoutingTable(indexMetaData, states)); - - Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver); - assertNull(result.getExecutorNode()); - assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " + - "does not have all primary shards active yet.")); - } - - public void testIndexDoesntExist() throws Exception { - IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(Settings.EMPTY); - IndexMetaData indexMetaData = IndexMetaData.builder("foo") - .settings(settings(Version.CURRENT)) - .numberOfShards(1) - .numberOfReplicas(0) - .build(); - - MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); - Job job = createScheduledJob("job_id").build(new Date()); - mlMetadata.putJob(job, false); - mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo"))); - - DiscoveryNodes nodes = DiscoveryNodes.builder() - .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), - Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) - .build(); - - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); - PersistentTasksCustomMetaData tasks = tasksBuilder.build(); - - ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name")) - .metaData(new MetaData.Builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasks) - .put(indexMetaData, false)) - .nodes(nodes) - .routingTable(generateRoutingTable(indexMetaData)); - - Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver); - assertNull(result.getExecutorNode()); - assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [not_foo] " + - "does not exist, is closed, or is still initializing.")); - } - - public void testSelectNode_jobTaskStale() { - IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(Settings.EMPTY); - IndexMetaData indexMetaData = IndexMetaData.builder("foo") - .settings(settings(Version.CURRENT)) - .numberOfShards(1) - .numberOfReplicas(0) - .build(); - - MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); - Job job = createScheduledJob("job_id").build(new Date()); - mlMetadata.putJob(job, false); - mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo"))); - - String nodeId = randomBoolean() ? 
"node_id2" : null; - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - addJobTask(job.getId(), nodeId, JobState.OPENED, tasksBuilder); - // Set to lower allocationId, so job task is stale: - tasksBuilder.updateTaskStatus(MlMetadata.jobTaskId(job.getId()), new JobTaskStatus(JobState.OPENED, 0)); - PersistentTasksCustomMetaData tasks = tasksBuilder.build(); - - ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name")) - .metaData(new MetaData.Builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasks) - .put(indexMetaData, false)) - .routingTable(generateRoutingTable(indexMetaData)); - - Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver); - assertNull(result.getExecutorNode()); - assertEquals("cannot start datafeed [datafeed_id], job [job_id] status is stale", - result.getExplanation()); - - tasksBuilder = PersistentTasksCustomMetaData.builder(); - addJobTask(job.getId(), "node_id1", JobState.OPENED, tasksBuilder); - tasks = tasksBuilder.build(); - cs = ClusterState.builder(new ClusterName("cluster_name")) - .metaData(new MetaData.Builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasks) - .put(indexMetaData, false)) - .routingTable(generateRoutingTable(indexMetaData)); - result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver); - assertEquals("node_id1", result.getExecutorNode()); - } - - public void testValidate() { - Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date()); - MlMetadata mlMetadata1 = new MlMetadata.Builder() - .putJob(job1, false) + public void testValidate_GivenDatafeedIsMissing() { + Job job = DatafeedManagerTests.createDatafeedJob().build(new Date()); + MlMetadata mlMetadata = new MlMetadata.Builder() + .putJob(job, false) .build(); Exception e = expectThrows(ResourceNotFoundException.class, - () -> StartDatafeedAction.validate("some-datafeed", mlMetadata1, null)); + () -> StartDatafeedAction.validate("some-datafeed", mlMetadata, null)); assertThat(e.getMessage(), equalTo("No datafeed with id [some-datafeed] exists")); } public void testValidate_jobClosed() { + Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date()); + MlMetadata mlMetadata1 = new MlMetadata.Builder() + .putJob(job1, false) + .build(); + PersistentTasksCustomMetaData tasks = PersistentTasksCustomMetaData.builder().build(); + DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build(); + MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) + .putDatafeed(datafeedConfig1) + .build(); + Exception e = expectThrows(ElasticsearchStatusException.class, + () -> StartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks)); + assertThat(e.getMessage(), equalTo("cannot start datafeed [foo-datafeed] because job [job_id] is closed")); + } + + public void testValidate_jobOpening() { Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date()); MlMetadata mlMetadata1 = new MlMetadata.Builder() .putJob(job1, false) @@ -289,9 +62,24 @@ public class StartDatafeedActionTests extends ESTestCase { MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) .putDatafeed(datafeedConfig1) .build(); - Exception e = expectThrows(ElasticsearchStatusException.class, - () -> StartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks)); - assertThat(e.getMessage(), 
equalTo("cannot start datafeed [foo-datafeed] because job [job_id] is not open")); + + StartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks); + } + + public void testValidate_jobOpened() { + Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date()); + MlMetadata mlMetadata1 = new MlMetadata.Builder() + .putJob(job1, false) + .build(); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id", INITIAL_ASSIGNMENT.getExecutorNode(), JobState.OPENED, tasksBuilder); + PersistentTasksCustomMetaData tasks = tasksBuilder.build(); + DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build(); + MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) + .putDatafeed(datafeedConfig1) + .build(); + + StartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks); } public static StartDatafeedAction.DatafeedTask createDatafeedTask(long id, String type, String action, @@ -302,45 +90,4 @@ public class StartDatafeedActionTests extends ESTestCase { task.datafeedManager = datafeedManager; return task; } - - private RoutingTable generateRoutingTable(IndexMetaData indexMetaData) { - List> states = new ArrayList<>(1); - states.add(new Tuple<>(0, ShardRoutingState.STARTED)); - return generateRoutingTable(indexMetaData, states); - } - - private RoutingTable generateRoutingTable(IndexMetaData indexMetaData, List> states) { - IndexRoutingTable.Builder rtBuilder = IndexRoutingTable.builder(indexMetaData.getIndex()); - - final String index = indexMetaData.getIndex().getName(); - int counter = 0; - for (Tuple state : states) { - ShardId shardId = new ShardId(index, "_na_", counter); - IndexShardRoutingTable.Builder shardRTBuilder = new IndexShardRoutingTable.Builder(shardId); - ShardRouting shardRouting; - - if (state.v2().equals(ShardRoutingState.STARTED)) { - shardRouting = TestShardRouting.newShardRouting(index, shardId.getId(), - "node_" + Integer.toString(state.v1()), null, true, ShardRoutingState.STARTED); - } else if (state.v2().equals(ShardRoutingState.INITIALIZING)) { - shardRouting = TestShardRouting.newShardRouting(index, shardId.getId(), - "node_" + Integer.toString(state.v1()), null, true, ShardRoutingState.INITIALIZING); - } else if (state.v2().equals(ShardRoutingState.RELOCATING)) { - shardRouting = TestShardRouting.newShardRouting(index, shardId.getId(), - "node_" + Integer.toString(state.v1()), "node_" + Integer.toString((state.v1() + 1) % 3), - true, ShardRoutingState.RELOCATING); - } else { - shardRouting = ShardRouting.newUnassigned(shardId, true, - RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE, - new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")); - } - - shardRTBuilder.addShard(shardRouting); - rtBuilder.addIndexShard(shardRTBuilder.build()); - counter += 1; - } - - return new RoutingTable.Builder().add(rtBuilder).build(); - } - } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java index cfb6316f3b3..8c5bab53d94 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java @@ -10,8 +10,10 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import 
org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -55,7 +57,6 @@ import java.io.ByteArrayInputStream; import java.io.InputStream; import java.net.InetAddress; import java.nio.charset.StandardCharsets; -import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.Optional; @@ -90,6 +91,7 @@ public class DatafeedManagerTests extends ESTestCase { private DatafeedManager datafeedManager; private long currentTime = 120000; private Auditor auditor; + private ArgumentCaptor capturedClusterStateListener = ArgumentCaptor.forClass(ClusterStateListener.class); @Before @SuppressWarnings("unchecked") @@ -148,6 +150,7 @@ public class DatafeedManagerTests extends ESTestCase { when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture); PersistentTasksService persistentTasksService = mock(PersistentTasksService.class); + datafeedManager = new DatafeedManager(threadPool, client, clusterService, jobProvider, () -> currentTime, auditor, persistentTasksService) { @Override @@ -156,6 +159,8 @@ public class DatafeedManagerTests extends ESTestCase { } }; + verify(clusterService).addListener(capturedClusterStateListener.capture()); + doAnswer(invocationOnMock -> { @SuppressWarnings("rawtypes") Consumer consumer = (Consumer) invocationOnMock.getArguments()[3]; @@ -202,8 +207,7 @@ public class DatafeedManagerTests extends ESTestCase { verify(client).execute(same(FlushJobAction.INSTANCE), any()); } - private static PostDataAction.Request createExpectedPostDataRequest(String jobId, - byte[] contentBytes, XContentType xContentType) { + private static PostDataAction.Request createExpectedPostDataRequest(String jobId, byte[] contentBytes, XContentType xContentType) { DataDescription.Builder expectedDataDescription = new DataDescription.Builder(); expectedDataDescription.setTimeFormat("epoch_ms"); expectedDataDescription.setFormat(DataDescription.DataFormat.XCONTENT); @@ -354,17 +358,129 @@ public class DatafeedManagerTests extends ESTestCase { } } + public void testDatafeedTaskWaitsUntilJobIsOpened() throws Exception { + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); + ClusterState.Builder cs = ClusterState.builder(clusterService.state()) + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, clusterService.state().getMetaData().custom(MlMetadata.TYPE)) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + when(clusterService.state()).thenReturn(cs.build()); + + DataExtractor dataExtractor = mock(DataExtractor.class); + when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor); + when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); + when(dataExtractor.next()).thenReturn(Optional.empty()); + Consumer handler = mockConsumer(); + DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); + datafeedManager.run(task, handler); + + // Verify datafeed has not started running yet as job is still opening + verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); + + tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id", "node_id", 
JobState.OPENING, tasksBuilder); + addJobTask("another_job", "node_id", JobState.OPENED, tasksBuilder); + ClusterState.Builder anotherJobCs = ClusterState.builder(clusterService.state()) + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, clusterService.state().getMetaData().custom(MlMetadata.TYPE)) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + + capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", anotherJobCs.build(), cs.build())); + + // Still no run + verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); + + tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder); + ClusterState.Builder jobOpenedCs = ClusterState.builder(clusterService.state()) + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, clusterService.state().getMetaData().custom(MlMetadata.TYPE)) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + + capturedClusterStateListener.getValue().clusterChanged( + new ClusterChangedEvent("_source", jobOpenedCs.build(), anotherJobCs.build())); + + // Now it should run as the job state changed to OPENED + verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); + } + + public void testDatafeedTaskStopsBecauseJobFailedWhileOpening() throws Exception { + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); + ClusterState.Builder cs = ClusterState.builder(clusterService.state()) + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, clusterService.state().getMetaData().custom(MlMetadata.TYPE)) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + when(clusterService.state()).thenReturn(cs.build()); + + DataExtractor dataExtractor = mock(DataExtractor.class); + when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor); + when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); + when(dataExtractor.next()).thenReturn(Optional.empty()); + Consumer handler = mockConsumer(); + DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); + datafeedManager.run(task, handler); + + // Verify datafeed has not started running yet as job is still opening + verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); + + tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id", "node_id", JobState.FAILED, tasksBuilder); + ClusterState.Builder updatedCs = ClusterState.builder(clusterService.state()) + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, clusterService.state().getMetaData().custom(MlMetadata.TYPE)) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + + capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", updatedCs.build(), cs.build())); + + // Verify the task never ran and got stopped + verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); + verify(task).stop("job_never_opened", TimeValue.timeValueSeconds(20)); + } + + public void testDatafeedGetsStoppedWhileWaitingForJobToOpen() throws Exception { + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); + ClusterState.Builder cs = ClusterState.builder(clusterService.state()) + .metaData(new
MetaData.Builder().putCustom(MlMetadata.TYPE, clusterService.state().getMetaData().custom(MlMetadata.TYPE)) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + when(clusterService.state()).thenReturn(cs.build()); + + DataExtractor dataExtractor = mock(DataExtractor.class); + when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor); + when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); + when(dataExtractor.next()).thenReturn(Optional.empty()); + Consumer handler = mockConsumer(); + DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); + datafeedManager.run(task, handler); + + // Verify datafeed has not started running yet as job is still opening + verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); + + // Stop the datafeed + datafeedManager.stopDatafeed(task, "test", StopDatafeedAction.DEFAULT_TIMEOUT); + + // Update job state to opened + tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder); + ClusterState.Builder updatedCs = ClusterState.builder(clusterService.state()) + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, clusterService.state().getMetaData().custom(MlMetadata.TYPE)) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + + capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", cs.build(), updatedCs.build())); + + // Verify no datafeed was run + verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); + } + public static DatafeedConfig.Builder createDatafeedConfig(String datafeedId, String jobId) { DatafeedConfig.Builder datafeedConfig = new DatafeedConfig.Builder(datafeedId, jobId); - datafeedConfig.setIndices(Arrays.asList("myIndex")); - datafeedConfig.setTypes(Arrays.asList("myType")); + datafeedConfig.setIndices(Collections.singletonList("myIndex")); + datafeedConfig.setTypes(Collections.singletonList("myType")); return datafeedConfig; } public static Job.Builder createDatafeedJob() { - AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build())); + AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Collections.singletonList( + new Detector.Builder("metric", "field").build())); acBuilder.setBucketSpan(TimeValue.timeValueHours(1)); - acBuilder.setDetectors(Arrays.asList(new Detector.Builder("metric", "field").build())); + acBuilder.setDetectors(Collections.singletonList(new Detector.Builder("metric", "field").build())); Job.Builder builder = new Job.Builder("job_id"); builder.setAnalysisConfig(acBuilder); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java new file mode 100644 index 00000000000..8ce1fc1c983 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java @@ -0,0 +1,342 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ml.datafeed; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.MlMetadata; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; +import org.junit.Before; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; + +import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.addJobTask; +import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed; +import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class DatafeedNodeSelectorTests extends ESTestCase { + + private IndexNameExpressionResolver resolver; + private DiscoveryNodes nodes; + private ClusterState clusterState; + private MlMetadata mlMetadata; + private PersistentTasksCustomMetaData tasks; + + @Before + public void init() { + resolver = new IndexNameExpressionResolver(Settings.EMPTY); + nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), + Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) + .build(); + } + + public void testSelectNode_GivenJobIsOpened() throws Exception { + MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); + Job job = createScheduledJob("job_id").build(new Date()); + mlMetadataBuilder.putJob(job, false); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo"))); + mlMetadata = mlMetadataBuilder.build(); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); + tasks = tasksBuilder.build(); + + givenClusterState("foo", 1, 0); + + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + assertEquals("node_id", result.getExecutorNode()); + new DatafeedNodeSelector(clusterState, resolver, 
"datafeed_id").checkDatafeedTaskCanBeCreated(); + } + + public void testSelectNode_GivenJobIsOpening() throws Exception { + MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); + Job job = createScheduledJob("job_id").build(new Date()); + mlMetadataBuilder.putJob(job, false); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo"))); + mlMetadata = mlMetadataBuilder.build(); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask(job.getId(), "node_id", null, tasksBuilder); + tasks = tasksBuilder.build(); + + givenClusterState("foo", 1, 0); + + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + assertEquals("node_id", result.getExecutorNode()); + new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated(); + } + + public void testNoJobTask() throws Exception { + MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); + Job job = createScheduledJob("job_id").build(new Date()); + mlMetadataBuilder.putJob(job, false); + + // Using wildcard index name to test for index resolving as well + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*"))); + mlMetadata = mlMetadataBuilder.build(); + + tasks = PersistentTasksCustomMetaData.builder().build(); + + givenClusterState("foo", 1, 0); + + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + assertNull(result.getExecutorNode()); + assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id], because job's [job_id] state is " + + "[closed] while state [opened] is required")); + + ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated()); + assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " + + "[cannot start datafeed [datafeed_id], because job's [job_id] state is [closed] while state [opened] is required]")); + } + + public void testSelectNode_GivenJobFailedOrClosed() throws Exception { + MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); + Job job = createScheduledJob("job_id").build(new Date()); + mlMetadataBuilder.putJob(job, false); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo"))); + mlMetadata = mlMetadataBuilder.build(); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + JobState jobState = randomFrom(JobState.FAILED, JobState.CLOSED); + addJobTask(job.getId(), "node_id", jobState, tasksBuilder); + tasks = tasksBuilder.build(); + + givenClusterState("foo", 1, 0); + + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + assertNull(result.getExecutorNode()); + assertEquals("cannot start datafeed [datafeed_id], because job's [job_id] state is [" + jobState + + "] while state [opened] is required", result.getExplanation()); + + ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated()); + assertThat(e.getMessage(), containsString("No node found to start datafeed 
[datafeed_id], allocation explanation " + + "[cannot start datafeed [datafeed_id], because job's [job_id] state is [" + jobState + + "] while state [opened] is required]")); + } + + public void testShardUnassigned() throws Exception { + MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); + Job job = createScheduledJob("job_id").build(new Date()); + mlMetadataBuilder.putJob(job, false); + + // Using wildcard index name to test for index resolving as well + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*"))); + mlMetadata = mlMetadataBuilder.build(); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); + tasks = tasksBuilder.build(); + + List> states = new ArrayList<>(2); + states.add(new Tuple<>(0, ShardRoutingState.UNASSIGNED)); + + givenClusterState("foo", 1, 0, states); + + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + assertNull(result.getExecutorNode()); + assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " + + "does not have all primary shards active yet.")); + + new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated(); + } + + public void testShardNotAllActive() throws Exception { + MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); + Job job = createScheduledJob("job_id").build(new Date()); + mlMetadataBuilder.putJob(job, false); + + // Using wildcard index name to test for index resolving as well + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*"))); + mlMetadata = mlMetadataBuilder.build(); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); + tasks = tasksBuilder.build(); + + List> states = new ArrayList<>(2); + states.add(new Tuple<>(0, ShardRoutingState.STARTED)); + states.add(new Tuple<>(1, ShardRoutingState.INITIALIZING)); + + givenClusterState("foo", 2, 0, states); + + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + assertNull(result.getExecutorNode()); + assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " + + "does not have all primary shards active yet.")); + + new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated(); + } + + public void testIndexDoesntExist() throws Exception { + MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); + Job job = createScheduledJob("job_id").build(new Date()); + mlMetadataBuilder.putJob(job, false); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo"))); + mlMetadata = mlMetadataBuilder.build(); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); + tasks = tasksBuilder.build(); + + givenClusterState("foo", 1, 0); + + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + assertNull(result.getExecutorNode()); + assertThat(result.getExplanation(), equalTo("cannot start datafeed 
[datafeed_id] because index [not_foo] " + + "does not exist, is closed, or is still initializing.")); + + ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated()); + assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " + + "[cannot start datafeed [datafeed_id] because index [not_foo] does not exist, is closed, or is still initializing.]")); + } + + public void testSelectNode_jobTaskStale() { + MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); + Job job = createScheduledJob("job_id").build(new Date()); + mlMetadataBuilder.putJob(job, false); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo"))); + mlMetadata = mlMetadataBuilder.build(); + + String nodeId = randomBoolean() ? "node_id2" : null; + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask(job.getId(), nodeId, JobState.OPENED, tasksBuilder); + // Set to lower allocationId, so job task is stale: + tasksBuilder.updateTaskStatus(MlMetadata.jobTaskId(job.getId()), new JobTaskStatus(JobState.OPENED, 0)); + tasks = tasksBuilder.build(); + + givenClusterState("foo", 1, 0); + + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + assertNull(result.getExecutorNode()); + assertEquals("cannot start datafeed [datafeed_id], job [job_id] status is stale", + result.getExplanation()); + + ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated()); + assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " + + "[cannot start datafeed [datafeed_id], job [job_id] status is stale]")); + + tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask(job.getId(), "node_id1", JobState.OPENED, tasksBuilder); + tasks = tasksBuilder.build(); + givenClusterState("foo", 1, 0); + result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + assertEquals("node_id1", result.getExecutorNode()); + new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated(); + } + + public void testSelectNode_GivenJobOpeningAndIndexDoesNotExist() throws Exception { + // Here we test that when there are 2 problems, the most critical gets reported first. 
+ // In this case job is Opening (non-critical) and the index does not exist (critical) + + MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); + Job job = createScheduledJob("job_id").build(new Date()); + mlMetadataBuilder.putJob(job, false); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo"))); + mlMetadata = mlMetadataBuilder.build(); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask(job.getId(), "node_id", JobState.OPENING, tasksBuilder); + tasks = tasksBuilder.build(); + + givenClusterState("foo", 1, 0); + + ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated()); + assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " + + "[cannot start datafeed [datafeed_id] because index [not_foo] does not exist, is closed, or is still initializing.]")); + } + + private void givenClusterState(String index, int numberOfShards, int numberOfReplicas) { + List> states = new ArrayList<>(1); + states.add(new Tuple<>(0, ShardRoutingState.STARTED)); + givenClusterState(index, numberOfShards, numberOfReplicas, states); + } + + private void givenClusterState(String index, int numberOfShards, int numberOfReplicas, List> states) { + IndexMetaData indexMetaData = IndexMetaData.builder(index) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + .build(); + + clusterState = ClusterState.builder(new ClusterName("cluster_name")) + .metaData(new MetaData.Builder() + .putCustom(MlMetadata.TYPE, mlMetadata) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasks) + .put(indexMetaData, false)) + .nodes(nodes) + .routingTable(generateRoutingTable(indexMetaData, states)) + .build(); + } + + private static RoutingTable generateRoutingTable(IndexMetaData indexMetaData, List> states) { + IndexRoutingTable.Builder rtBuilder = IndexRoutingTable.builder(indexMetaData.getIndex()); + + final String index = indexMetaData.getIndex().getName(); + int counter = 0; + for (Tuple state : states) { + ShardId shardId = new ShardId(index, "_na_", counter); + IndexShardRoutingTable.Builder shardRTBuilder = new IndexShardRoutingTable.Builder(shardId); + ShardRouting shardRouting; + + if (state.v2().equals(ShardRoutingState.STARTED)) { + shardRouting = TestShardRouting.newShardRouting(index, shardId.getId(), + "node_" + Integer.toString(state.v1()), null, true, ShardRoutingState.STARTED); + } else if (state.v2().equals(ShardRoutingState.INITIALIZING)) { + shardRouting = TestShardRouting.newShardRouting(index, shardId.getId(), + "node_" + Integer.toString(state.v1()), null, true, ShardRoutingState.INITIALIZING); + } else if (state.v2().equals(ShardRoutingState.RELOCATING)) { + shardRouting = TestShardRouting.newShardRouting(index, shardId.getId(), + "node_" + Integer.toString(state.v1()), "node_" + Integer.toString((state.v1() + 1) % 3), + true, ShardRoutingState.RELOCATING); + } else { + shardRouting = ShardRouting.newUnassigned(shardId, true, + RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")); + } + + shardRTBuilder.addShard(shardRouting); + rtBuilder.addIndexShard(shardRTBuilder.build()); + counter += 1; + } + + return new RoutingTable.Builder().add(rtBuilder).build(); + } +} \ No newline at end of file 
diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yml b/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yml
index 38c03d33b5b..4d123241f33 100644
--- a/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yml
+++ b/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yml
@@ -110,7 +110,7 @@ setup:
         datafeed_id: "start-stop-datafeed-datafeed-1"
         start: 0
   - do:
-      catch: /cannot start datafeed \[start-stop-datafeed-datafeed-1\] because job \[start-stop-datafeed-job\] is not open/
+      catch: /cannot start datafeed \[start-stop-datafeed-datafeed-1\] because job \[start-stop-datafeed-job\] is closed/
       xpack.ml.start_datafeed:
         datafeed_id: "start-stop-datafeed-datafeed-1"
         start: 0