[ML] Allow datafeed to start when job is opening (elastic/x-pack-elasticsearch#1611)
The goal of this change is to allow datafeeds to start while the job is in the opening state. This makes the API more async and allows clients like the ML UI to open a job and start its datafeed without having to manage the complexity of timeouts caused by a job taking a long time to open, for example while restoring a large state. In order to achieve this, this commit does a number of things:

- accepts a start datafeed request when the job is opening
- adds logic to the DatafeedManager to wait before running the datafeed task until the job is opened
- refactors the datafeed node selection logic into its own class
- splits selection issues into critical and non-critical with regard to creating the datafeed task
- refactors the unit tests to make them simpler to write & understand
- adds unit tests for added and modified functionality
- changes the response when the datafeed cannot be started to a conflict exception

relates elastic/x-pack-elasticsearch#1535

Original commit: elastic/x-pack-elasticsearch@c83196155d
This commit is contained in:
parent
fe33d8eba4
commit
15e53280dc
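Before the diff, here is a minimal, dependency-free sketch of the waiting pattern this commit introduces: datafeed tasks that arrive while their job is still OPENING are parked, and a listener later either runs them (the job reached OPENED) or drops them (the job failed). All names in this sketch (StateWatcher, onStateChange, and this simplified JobState enum) are illustrative stand-ins, not the actual x-pack API; the real implementation below hooks into Elasticsearch's ClusterStateListener and persistent tasks metadata instead.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

enum JobState { OPENING, OPENED, FAILED }

class StateWatcher {
    private final List<Runnable> waitingDatafeeds = new CopyOnWriteArrayList<>();

    // Start-datafeed request: run immediately if the job is already open,
    // otherwise park the task until a later state change decides its fate.
    void runWhenOpened(JobState currentState, Runnable datafeed) {
        if (currentState == JobState.OPENED) {
            datafeed.run();
        } else {
            waitingDatafeeds.add(datafeed);
        }
    }

    // Invoked on every job state change, mirroring clusterChanged() below.
    void onStateChange(JobState newState) {
        if (newState == JobState.OPENING) {
            return; // job is still opening; keep waiting
        }
        List<Runnable> decided = new ArrayList<>(waitingDatafeeds);
        waitingDatafeeds.removeAll(decided);
        for (Runnable datafeed : decided) {
            if (newState == JobState.OPENED) {
                datafeed.run(); // job opened: start the parked datafeed
            }
            // any other state (e.g. FAILED): drop the task; the real code
            // stops it with the reason "job_never_opened"
        }
    }
}

This is why the start datafeed API can now accept a request while the job is opening and return without the client having to poll for the job to finish restoring its state.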
@@ -5,7 +5,6 @@
 */
package org.elasticsearch.xpack.ml.action;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ElasticsearchStatusException;
@@ -16,7 +15,6 @@ import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
@@ -25,7 +23,6 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
@@ -52,10 +49,10 @@ import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobValidator;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.AllocatedPersistentTask;
@@ -68,7 +65,6 @@ import org.elasticsearch.xpack.persistent.PersistentTasksService;
import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener;

import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
@@ -452,8 +448,9 @@ public class StartDatafeedAction
    @Override
    public void onFailure(Exception e) {
        if (e instanceof ResourceAlreadyExistsException) {
            logger.debug(e);
            e = new ElasticsearchStatusException("cannot start datafeed [" + params.getDatafeedId() +
                    "] because it has already been started", RestStatus.CONFLICT, e);
                    "] because it has already been started", RestStatus.CONFLICT);
        }
        listener.onFailure(e);
    }
@@ -513,7 +510,7 @@ public class StartDatafeedAction

    @Override
    public Assignment getAssignment(DatafeedParams params, ClusterState clusterState) {
        return selectNode(logger, params.getDatafeedId(), clusterState, resolver);
        return new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedId()).selectNode();
    }

    @Override
@@ -521,12 +518,7 @@ public class StartDatafeedAction
        MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
        PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
        StartDatafeedAction.validate(params.getDatafeedId(), mlMetadata, tasks);
        Assignment assignment = selectNode(logger, params.getDatafeedId(), clusterState, resolver);
        if (assignment.getExecutorNode() == null) {
            String msg = "No node found to start datafeed [" + params.getDatafeedId()
                    + "], allocation explanation [" + assignment.getExplanation() + "]";
            throw new ElasticsearchException(msg);
        }
        new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedId()).checkDatafeedTaskCanBeCreated();
    }

    @Override
@@ -561,75 +553,9 @@ public class StartDatafeedAction
        }
        DatafeedJobValidator.validate(datafeed, job);
        JobState jobState = MlMetadata.getJobState(datafeed.getJobId(), tasks);
        if (jobState != JobState.OPENED) {
        if (jobState.isAnyOf(JobState.OPENING, JobState.OPENED) == false) {
            throw ExceptionsHelper.conflictStatusException("cannot start datafeed [" + datafeedId + "] because job [" + job.getId() +
                    "] is not open");
                    "] is " + jobState);
        }
    }

    static Assignment selectNode(Logger logger, String datafeedId, ClusterState clusterState,
                                 IndexNameExpressionResolver resolver) {
        MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
        PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
        DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);

        PersistentTask<?> jobTask = MlMetadata.getJobTask(datafeed.getJobId(), tasks);
        if (jobTask == null) {
            String reason = "cannot start datafeed [" + datafeed.getId() + "], job task doesn't yet exist";
            logger.debug(reason);
            return new Assignment(null, reason);
        }
        JobTaskStatus taskStatus = (JobTaskStatus) jobTask.getStatus();
        if (taskStatus == null || taskStatus.getState() != JobState.OPENED) {
            // lets try again later when the job has been opened:
            String taskStatusAsString = taskStatus == null ? "null" : taskStatus.getState().toString();
            String reason = "cannot start datafeed [" + datafeed.getId() + "], because job's [" + datafeed.getJobId() +
                    "] state is [" + taskStatusAsString + "] while state [" + JobState.OPENED + "] is required";
            logger.debug(reason);
            return new Assignment(null, reason);
        }
        if (taskStatus.isStatusStale(jobTask)) {
            String reason = "cannot start datafeed [" + datafeed.getId() + "], job [" + datafeed.getJobId() + "] status is stale";
            logger.debug(reason);
            return new Assignment(null, reason);
        }
        String reason = verifyIndicesActive(logger, datafeed, clusterState, resolver);
        if (reason != null) {
            return new Assignment(null, reason);
        }
        return new Assignment(jobTask.getExecutorNode(), "");
    }

    private static String verifyIndicesActive(Logger logger, DatafeedConfig datafeed, ClusterState clusterState,
                                              IndexNameExpressionResolver resolver) {
        List<String> indices = datafeed.getIndices();
        for (String index : indices) {
            String[] concreteIndices;
            String reason = "cannot start datafeed [" + datafeed.getId() + "] because index ["
                    + index + "] does not exist, is closed, or is still initializing.";

            try {
                concreteIndices = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), index);
                if (concreteIndices.length == 0) {
                    logger.debug(reason);
                    return reason;
                }
            } catch (Exception e) {
                logger.debug(reason);
                return reason;
            }

            for (String concreteIndex : concreteIndices) {
                IndexRoutingTable routingTable = clusterState.getRoutingTable().index(concreteIndex);
                if (routingTable == null || !routingTable.allPrimaryShardsActive()) {
                    reason = "cannot start datafeed [" + datafeed.getId() + "] because index ["
                            + concreteIndex + "] does not have all primary shards active yet.";
                    logger.debug(reason);
                    return reason;
                }
            }
        }
        return null;
    }

}

@@ -8,7 +8,9 @@ package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
@@ -25,6 +27,7 @@ import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
@@ -32,15 +35,19 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTasksService;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@@ -64,6 +71,7 @@ public class DatafeedManager extends AbstractComponent {
    // Use allocationId as key instead of datafeed id
    private final ConcurrentMap<Long, Holder> runningDatafeedsOnThisNode = new ConcurrentHashMap<>();
    private volatile boolean isolated;
    private final TaskRunner taskRunner = new TaskRunner();

    public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, JobProvider jobProvider,
                           Supplier<Long> currentTimeSupplier, Auditor auditor, PersistentTasksService persistentTasksService) {
@@ -75,6 +83,7 @@ public class DatafeedManager extends AbstractComponent {
        this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier);
        this.auditor = Objects.requireNonNull(auditor);
        this.persistentTasksService = Objects.requireNonNull(persistentTasksService);
        clusterService.addListener(taskRunner);
    }

    public void run(StartDatafeedAction.DatafeedTask task, Consumer<Exception> handler) {
@@ -99,7 +108,7 @@ public class DatafeedManager extends AbstractComponent {
        task.updatePersistentStatus(DatafeedState.STARTED, new ActionListener<PersistentTask<?>>() {
            @Override
            public void onResponse(PersistentTask<?> persistentTask) {
                innerRun(holder, task.getDatafeedStartTime(), task.getEndTime());
                taskRunner.runWhenJobIsOpened(task);
            }

            @Override
@@ -266,6 +275,14 @@ public class DatafeedManager extends AbstractComponent {
        return DataExtractorFactory.create(client, datafeed, job);
    }

    private String getJobId(StartDatafeedAction.DatafeedTask task) {
        return runningDatafeedsOnThisNode.get(task.getAllocationId()).getJobId();
    }

    private JobState getJobState(PersistentTasksCustomMetaData tasks, StartDatafeedAction.DatafeedTask datafeedTask) {
        return MlMetadata.getJobState(getJobId(datafeedTask), tasks);
    }

    private static DataDescription buildDataDescription(Job job) {
        DataDescription.Builder dataDescription = new DataDescription.Builder();
        dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
@@ -335,6 +352,10 @@ public class DatafeedManager extends AbstractComponent {
            this.handler = handler;
        }

        String getJobId() {
            return datafeed.getJobId();
        }

        boolean isRunning() {
            return datafeedJob.isRunning();
        }
@@ -406,6 +427,14 @@ public class DatafeedManager extends AbstractComponent {
        }

        private void closeJob() {
            ClusterState clusterState = clusterService.state();
            PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
            JobState jobState = MlMetadata.getJobState(getJobId(), tasks);
            if (jobState != JobState.OPENED) {
                logger.debug("[{}] No need to auto-close job as job state is [{}]", getJobId(), jobState);
                return;
            }

            persistentTasksService.waitForPersistentTaskStatus(taskId, Objects::isNull, TimeValue.timeValueSeconds(20),
                    new WaitForPersistentTaskStatusListener<StartDatafeedAction.DatafeedParams>() {
                @Override
@@ -448,4 +477,55 @@ public class DatafeedManager extends AbstractComponent {
            });
        }
    }

    private class TaskRunner implements ClusterStateListener {

        private final List<StartDatafeedAction.DatafeedTask> tasksToRun = new CopyOnWriteArrayList<>();

        private void runWhenJobIsOpened(StartDatafeedAction.DatafeedTask datafeedTask) {
            ClusterState clusterState = clusterService.state();
            PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
            if (getJobState(tasks, datafeedTask) == JobState.OPENED) {
                runTask(datafeedTask);
            } else {
                logger.info("Datafeed [{}] is waiting for job [{}] to be opened",
                        datafeedTask.getDatafeedId(), getJobId(datafeedTask));
                tasksToRun.add(datafeedTask);
            }
        }

        private void runTask(StartDatafeedAction.DatafeedTask task) {
            innerRun(runningDatafeedsOnThisNode.get(task.getAllocationId()), task.getDatafeedStartTime(), task.getEndTime());
        }

        @Override
        public void clusterChanged(ClusterChangedEvent event) {
            if (tasksToRun.isEmpty() || event.metaDataChanged() == false) {
                return;
            }
            PersistentTasksCustomMetaData previousTasks = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
            PersistentTasksCustomMetaData currentTasks = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
            if (Objects.equals(previousTasks, currentTasks)) {
                return;
            }

            List<StartDatafeedAction.DatafeedTask> remainingTasks = new ArrayList<>();
            for (StartDatafeedAction.DatafeedTask datafeedTask : tasksToRun) {
                if (runningDatafeedsOnThisNode.containsKey(datafeedTask.getAllocationId()) == false) {
                    continue;
                }
                JobState jobState = getJobState(currentTasks, datafeedTask);
                if (jobState == JobState.OPENED) {
                    runTask(datafeedTask);
                } else if (jobState == JobState.OPENING) {
                    remainingTasks.add(datafeedTask);
                } else {
                    logger.warn("Datafeed [{}] is stopping because job [{}] state is [{}]",
                            datafeedTask.getDatafeedId(), getJobId(datafeedTask), jobState);
                    datafeedTask.stop("job_never_opened", TimeValue.timeValueSeconds(20));
                }
            }
            tasksToRun.retainAll(remainingTasks);
        }
    }
}

@@ -0,0 +1,149 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.ml.datafeed;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;

import java.util.List;
import java.util.Objects;

public class DatafeedNodeSelector {

    private static final Logger LOGGER = Loggers.getLogger(DatafeedNodeSelector.class);

    private final DatafeedConfig datafeed;
    private final PersistentTasksCustomMetaData.PersistentTask<?> jobTask;
    private final ClusterState clusterState;
    private final IndexNameExpressionResolver resolver;

    public DatafeedNodeSelector(ClusterState clusterState, IndexNameExpressionResolver resolver, String datafeedId) {
        MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
        PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
        this.datafeed = mlMetadata.getDatafeed(datafeedId);
        this.jobTask = MlMetadata.getJobTask(datafeed.getJobId(), tasks);
        this.clusterState = Objects.requireNonNull(clusterState);
        this.resolver = Objects.requireNonNull(resolver);
    }

    public void checkDatafeedTaskCanBeCreated() {
        AssignmentFailure assignmentFailure = checkAssignment();
        if (assignmentFailure != null && assignmentFailure.isCriticalForTaskCreation) {
            String msg = "No node found to start datafeed [" + datafeed.getId() + "], allocation explanation [" + assignmentFailure.reason
                    + "]";
            LOGGER.debug(msg);
            throw ExceptionsHelper.conflictStatusException(msg);
        }
    }

    public PersistentTasksCustomMetaData.Assignment selectNode() {
        AssignmentFailure assignmentFailure = checkAssignment();
        if (assignmentFailure == null) {
            return new PersistentTasksCustomMetaData.Assignment(jobTask.getExecutorNode(), "");
        }
        LOGGER.debug(assignmentFailure.reason);
        return new PersistentTasksCustomMetaData.Assignment(null, assignmentFailure.reason);
    }

    @Nullable
    private AssignmentFailure checkAssignment() {
        PriorityFailureCollector priorityFailureCollector = new PriorityFailureCollector();
        priorityFailureCollector.add(verifyIndicesActive(datafeed));

        JobTaskStatus taskStatus = null;
        JobState jobState = JobState.CLOSED;
        if (jobTask != null) {
            taskStatus = (JobTaskStatus) jobTask.getStatus();
            jobState = taskStatus == null ? JobState.OPENING : taskStatus.getState();
        }

        if (jobState.isAnyOf(JobState.OPENING, JobState.OPENED) == false) {
            // lets try again later when the job has been opened:
            String reason = "cannot start datafeed [" + datafeed.getId() + "], because job's [" + datafeed.getJobId() +
                    "] state is [" + jobState + "] while state [" + JobState.OPENED + "] is required";
            priorityFailureCollector.add(new AssignmentFailure(reason, true));
        }

        if (taskStatus != null && taskStatus.isStatusStale(jobTask)) {
            String reason = "cannot start datafeed [" + datafeed.getId() + "], job [" + datafeed.getJobId() + "] status is stale";
            priorityFailureCollector.add(new AssignmentFailure(reason, true));
        }

        return priorityFailureCollector.get();
    }

    @Nullable
    private AssignmentFailure verifyIndicesActive(DatafeedConfig datafeed) {
        List<String> indices = datafeed.getIndices();
        for (String index : indices) {
            String[] concreteIndices;
            String reason = "cannot start datafeed [" + datafeed.getId() + "] because index ["
                    + index + "] does not exist, is closed, or is still initializing.";

            try {
                concreteIndices = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), index);
                if (concreteIndices.length == 0) {
                    return new AssignmentFailure(reason, true);
                }
            } catch (Exception e) {
                LOGGER.debug(reason, e);
                return new AssignmentFailure(reason, true);
            }

            for (String concreteIndex : concreteIndices) {
                IndexRoutingTable routingTable = clusterState.getRoutingTable().index(concreteIndex);
                if (routingTable == null || !routingTable.allPrimaryShardsActive()) {
                    reason = "cannot start datafeed [" + datafeed.getId() + "] because index ["
                            + concreteIndex + "] does not have all primary shards active yet.";
                    return new AssignmentFailure(reason, false);
                }
            }
        }
        return null;
    }

    private static class AssignmentFailure {
        private final String reason;
        private final boolean isCriticalForTaskCreation;

        private AssignmentFailure(String reason, boolean isCriticalForTaskCreation) {
            this.reason = reason;
            this.isCriticalForTaskCreation = isCriticalForTaskCreation;
        }
    }

    /**
     * Collects the first critical failure if any critical failure is added
     * or the first failure otherwise
     */
    private static class PriorityFailureCollector {
        private AssignmentFailure failure;

        private void add(@Nullable AssignmentFailure newFailure) {
            if (newFailure == null) {
                return;
            }
            if (failure == null || (failure.isCriticalForTaskCreation == false && newFailure.isCriticalForTaskCreation)) {
                failure = newFailure;
            }
        }

        @Nullable
        private AssignmentFailure get() {
            return failure;
        }
    }
}

@@ -7,26 +7,6 @@ package org.elasticsearch.xpack.ml.action;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.MlMetadata;
@@ -35,249 +15,42 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;

import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.addJobTask;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob;
import static org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT;
import static org.hamcrest.Matchers.equalTo;

public class StartDatafeedActionTests extends ESTestCase {

    public void testSelectNode() throws Exception {
        IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(Settings.EMPTY);
        IndexMetaData indexMetaData = IndexMetaData.builder("foo")
                .settings(settings(Version.CURRENT))
                .numberOfShards(1)
                .numberOfReplicas(0)
                .build();

        MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
        Job job = createScheduledJob("job_id").build(new Date());
        mlMetadata.putJob(job, false);
        mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")));

        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
        JobState jobState = randomFrom(JobState.FAILED, JobState.CLOSED);
        addJobTask(job.getId(), "node_id", jobState, tasksBuilder);
        PersistentTasksCustomMetaData tasks = tasksBuilder.build();

        DiscoveryNodes nodes = DiscoveryNodes.builder()
                .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
                        Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
                .build();

        ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name"))
                .metaData(new MetaData.Builder()
                        .putCustom(MlMetadata.TYPE, mlMetadata.build())
                        .putCustom(PersistentTasksCustomMetaData.TYPE, tasks)
                        .put(indexMetaData, false))
                .nodes(nodes)
                .routingTable(generateRoutingTable(indexMetaData));

        Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver);
        assertNull(result.getExecutorNode());
        assertEquals("cannot start datafeed [datafeed_id], because job's [job_id] state is [" + jobState +
                "] while state [opened] is required", result.getExplanation());

        tasksBuilder = PersistentTasksCustomMetaData.builder();
        addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder);
        tasks = tasksBuilder.build();
        cs = ClusterState.builder(new ClusterName("cluster_name"))
                .metaData(new MetaData.Builder()
                        .putCustom(MlMetadata.TYPE, mlMetadata.build())
                        .putCustom(PersistentTasksCustomMetaData.TYPE, tasks)
                        .put(indexMetaData, false))
                .nodes(nodes)
                .routingTable(generateRoutingTable(indexMetaData));
        result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver);
        assertEquals("node_id", result.getExecutorNode());
    }

    public void testShardUnassigned() throws Exception {
        IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(Settings.EMPTY);
        IndexMetaData indexMetaData = IndexMetaData.builder("foo")
                .settings(settings(Version.CURRENT))
                .numberOfShards(1)
                .numberOfReplicas(0)
                .build();

        MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
        Job job = createScheduledJob("job_id").build(new Date());
        mlMetadata.putJob(job, false);

        // Using wildcard index name to test for index resolving as well
        mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")));

        DiscoveryNodes nodes = DiscoveryNodes.builder()
                .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
                        Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
                .build();

        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
        addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder);
        PersistentTasksCustomMetaData tasks = tasksBuilder.build();

        List<Tuple<Integer, ShardRoutingState>> states = new ArrayList<>(2);
        states.add(new Tuple<>(0, ShardRoutingState.UNASSIGNED));

        ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name"))
                .metaData(new MetaData.Builder()
                        .putCustom(MlMetadata.TYPE, mlMetadata.build())
                        .putCustom(PersistentTasksCustomMetaData.TYPE, tasks)
                        .put(indexMetaData, false))
                .nodes(nodes)
                .routingTable(generateRoutingTable(indexMetaData, states));

        Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver);
        assertNull(result.getExecutorNode());
        assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " +
                "does not have all primary shards active yet."));
    }

    public void testShardNotAllActive() throws Exception {
        IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(Settings.EMPTY);
        IndexMetaData indexMetaData = IndexMetaData.builder("foo")
                .settings(settings(Version.CURRENT))
                .numberOfShards(2)
                .numberOfReplicas(0)
                .build();

        MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
        Job job = createScheduledJob("job_id").build(new Date());
        mlMetadata.putJob(job, false);

        // Using wildcard index name to test for index resolving as well
        mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")));

        DiscoveryNodes nodes = DiscoveryNodes.builder()
                .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
                        Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
                .build();

        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
        addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder);
        PersistentTasksCustomMetaData tasks = tasksBuilder.build();

        List<Tuple<Integer, ShardRoutingState>> states = new ArrayList<>(2);
        states.add(new Tuple<>(0, ShardRoutingState.STARTED));
        states.add(new Tuple<>(1, ShardRoutingState.INITIALIZING));

        ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name"))
                .metaData(new MetaData.Builder()
                        .putCustom(MlMetadata.TYPE, mlMetadata.build())
                        .putCustom(PersistentTasksCustomMetaData.TYPE, tasks)
                        .put(indexMetaData, false))
                .nodes(nodes)
                .routingTable(generateRoutingTable(indexMetaData, states));

        Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver);
        assertNull(result.getExecutorNode());
        assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " +
                "does not have all primary shards active yet."));
    }

    public void testIndexDoesntExist() throws Exception {
        IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(Settings.EMPTY);
        IndexMetaData indexMetaData = IndexMetaData.builder("foo")
                .settings(settings(Version.CURRENT))
                .numberOfShards(1)
                .numberOfReplicas(0)
                .build();

        MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
        Job job = createScheduledJob("job_id").build(new Date());
        mlMetadata.putJob(job, false);
        mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo")));

        DiscoveryNodes nodes = DiscoveryNodes.builder()
                .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
                        Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
                .build();

        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
        addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder);
        PersistentTasksCustomMetaData tasks = tasksBuilder.build();

        ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name"))
                .metaData(new MetaData.Builder()
                        .putCustom(MlMetadata.TYPE, mlMetadata.build())
                        .putCustom(PersistentTasksCustomMetaData.TYPE, tasks)
                        .put(indexMetaData, false))
                .nodes(nodes)
                .routingTable(generateRoutingTable(indexMetaData));

        Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver);
        assertNull(result.getExecutorNode());
        assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [not_foo] " +
                "does not exist, is closed, or is still initializing."));
    }

    public void testSelectNode_jobTaskStale() {
        IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(Settings.EMPTY);
        IndexMetaData indexMetaData = IndexMetaData.builder("foo")
                .settings(settings(Version.CURRENT))
                .numberOfShards(1)
                .numberOfReplicas(0)
                .build();

        MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
        Job job = createScheduledJob("job_id").build(new Date());
        mlMetadata.putJob(job, false);
        mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")));

        String nodeId = randomBoolean() ? "node_id2" : null;
        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
        addJobTask(job.getId(), nodeId, JobState.OPENED, tasksBuilder);
        // Set to lower allocationId, so job task is stale:
        tasksBuilder.updateTaskStatus(MlMetadata.jobTaskId(job.getId()), new JobTaskStatus(JobState.OPENED, 0));
        PersistentTasksCustomMetaData tasks = tasksBuilder.build();

        ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name"))
                .metaData(new MetaData.Builder()
                        .putCustom(MlMetadata.TYPE, mlMetadata.build())
                        .putCustom(PersistentTasksCustomMetaData.TYPE, tasks)
                        .put(indexMetaData, false))
                .routingTable(generateRoutingTable(indexMetaData));

        Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver);
        assertNull(result.getExecutorNode());
        assertEquals("cannot start datafeed [datafeed_id], job [job_id] status is stale",
                result.getExplanation());

        tasksBuilder = PersistentTasksCustomMetaData.builder();
        addJobTask(job.getId(), "node_id1", JobState.OPENED, tasksBuilder);
        tasks = tasksBuilder.build();
        cs = ClusterState.builder(new ClusterName("cluster_name"))
                .metaData(new MetaData.Builder()
                        .putCustom(MlMetadata.TYPE, mlMetadata.build())
                        .putCustom(PersistentTasksCustomMetaData.TYPE, tasks)
                        .put(indexMetaData, false))
                .routingTable(generateRoutingTable(indexMetaData));
        result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build(), resolver);
        assertEquals("node_id1", result.getExecutorNode());
    }

    public void testValidate() {
        Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date());
        MlMetadata mlMetadata1 = new MlMetadata.Builder()
                .putJob(job1, false)
    public void testValidate_GivenDatafeedIsMissing() {
        Job job = DatafeedManagerTests.createDatafeedJob().build(new Date());
        MlMetadata mlMetadata = new MlMetadata.Builder()
                .putJob(job, false)
                .build();
        Exception e = expectThrows(ResourceNotFoundException.class,
                () -> StartDatafeedAction.validate("some-datafeed", mlMetadata1, null));
                () -> StartDatafeedAction.validate("some-datafeed", mlMetadata, null));
        assertThat(e.getMessage(), equalTo("No datafeed with id [some-datafeed] exists"));
    }

    public void testValidate_jobClosed() {
        Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date());
        MlMetadata mlMetadata1 = new MlMetadata.Builder()
                .putJob(job1, false)
                .build();
        PersistentTasksCustomMetaData tasks = PersistentTasksCustomMetaData.builder().build();
        DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build();
        MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1)
                .putDatafeed(datafeedConfig1)
                .build();
        Exception e = expectThrows(ElasticsearchStatusException.class,
                () -> StartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks));
        assertThat(e.getMessage(), equalTo("cannot start datafeed [foo-datafeed] because job [job_id] is closed"));
    }

    public void testValidate_jobOpening() {
        Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date());
        MlMetadata mlMetadata1 = new MlMetadata.Builder()
                .putJob(job1, false)
@@ -289,9 +62,24 @@ public class StartDatafeedActionTests extends ESTestCase {
        MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1)
                .putDatafeed(datafeedConfig1)
                .build();
        Exception e = expectThrows(ElasticsearchStatusException.class,
                () -> StartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks));
        assertThat(e.getMessage(), equalTo("cannot start datafeed [foo-datafeed] because job [job_id] is not open"));

        StartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks);
    }

    public void testValidate_jobOpened() {
        Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date());
        MlMetadata mlMetadata1 = new MlMetadata.Builder()
                .putJob(job1, false)
                .build();
        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
        addJobTask("job_id", INITIAL_ASSIGNMENT.getExecutorNode(), JobState.OPENED, tasksBuilder);
        PersistentTasksCustomMetaData tasks = tasksBuilder.build();
        DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build();
        MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1)
                .putDatafeed(datafeedConfig1)
                .build();

        StartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks);
    }

    public static StartDatafeedAction.DatafeedTask createDatafeedTask(long id, String type, String action,
@@ -302,45 +90,4 @@ public class StartDatafeedActionTests extends ESTestCase {
        task.datafeedManager = datafeedManager;
        return task;
    }

    private RoutingTable generateRoutingTable(IndexMetaData indexMetaData) {
        List<Tuple<Integer, ShardRoutingState>> states = new ArrayList<>(1);
        states.add(new Tuple<>(0, ShardRoutingState.STARTED));
        return generateRoutingTable(indexMetaData, states);
    }

    private RoutingTable generateRoutingTable(IndexMetaData indexMetaData, List<Tuple<Integer, ShardRoutingState>> states) {
        IndexRoutingTable.Builder rtBuilder = IndexRoutingTable.builder(indexMetaData.getIndex());

        final String index = indexMetaData.getIndex().getName();
        int counter = 0;
        for (Tuple<Integer, ShardRoutingState> state : states) {
            ShardId shardId = new ShardId(index, "_na_", counter);
            IndexShardRoutingTable.Builder shardRTBuilder = new IndexShardRoutingTable.Builder(shardId);
            ShardRouting shardRouting;

            if (state.v2().equals(ShardRoutingState.STARTED)) {
                shardRouting = TestShardRouting.newShardRouting(index, shardId.getId(),
                        "node_" + Integer.toString(state.v1()), null, true, ShardRoutingState.STARTED);
            } else if (state.v2().equals(ShardRoutingState.INITIALIZING)) {
                shardRouting = TestShardRouting.newShardRouting(index, shardId.getId(),
                        "node_" + Integer.toString(state.v1()), null, true, ShardRoutingState.INITIALIZING);
            } else if (state.v2().equals(ShardRoutingState.RELOCATING)) {
                shardRouting = TestShardRouting.newShardRouting(index, shardId.getId(),
                        "node_" + Integer.toString(state.v1()), "node_" + Integer.toString((state.v1() + 1) % 3),
                        true, ShardRoutingState.RELOCATING);
            } else {
                shardRouting = ShardRouting.newUnassigned(shardId, true,
                        RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE,
                        new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""));
            }

            shardRTBuilder.addShard(shardRouting);
            rtBuilder.addIndexShard(shardRTBuilder.build());
            counter += 1;
        }

        return new RoutingTable.Builder().add(rtBuilder).build();
    }

}

@@ -10,8 +10,10 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -55,7 +57,6 @@ import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.Optional;
@@ -90,6 +91,7 @@ public class DatafeedManagerTests extends ESTestCase {
    private DatafeedManager datafeedManager;
    private long currentTime = 120000;
    private Auditor auditor;
    private ArgumentCaptor<ClusterStateListener> capturedClusterStateListener = ArgumentCaptor.forClass(ClusterStateListener.class);

    @Before
    @SuppressWarnings("unchecked")
@@ -148,6 +150,7 @@ public class DatafeedManagerTests extends ESTestCase {
        when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture);

        PersistentTasksService persistentTasksService = mock(PersistentTasksService.class);

        datafeedManager = new DatafeedManager(threadPool, client, clusterService, jobProvider, () -> currentTime, auditor,
                persistentTasksService) {
            @Override
@@ -156,6 +159,8 @@ public class DatafeedManagerTests extends ESTestCase {
            }
        };

        verify(clusterService).addListener(capturedClusterStateListener.capture());

        doAnswer(invocationOnMock -> {
            @SuppressWarnings("rawtypes")
            Consumer consumer = (Consumer) invocationOnMock.getArguments()[3];
@@ -202,8 +207,7 @@ public class DatafeedManagerTests extends ESTestCase {
        verify(client).execute(same(FlushJobAction.INSTANCE), any());
    }

    private static PostDataAction.Request createExpectedPostDataRequest(String jobId,
                                                                        byte[] contentBytes, XContentType xContentType) {
    private static PostDataAction.Request createExpectedPostDataRequest(String jobId, byte[] contentBytes, XContentType xContentType) {
        DataDescription.Builder expectedDataDescription = new DataDescription.Builder();
        expectedDataDescription.setTimeFormat("epoch_ms");
        expectedDataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
@@ -354,17 +358,129 @@ public class DatafeedManagerTests extends ESTestCase {
        }
    }

    public void testDatafeedTaskWaitsUntilJobIsOpened() throws Exception {
        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
        addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder);
        ClusterState.Builder cs = ClusterState.builder(clusterService.state())
                .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, clusterService.state().getMetaData().custom(MlMetadata.TYPE))
                        .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
        when(clusterService.state()).thenReturn(cs.build());

        DataExtractor dataExtractor = mock(DataExtractor.class);
        when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor);
        when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
        when(dataExtractor.next()).thenReturn(Optional.empty());
        Consumer<Exception> handler = mockConsumer();
        DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
        datafeedManager.run(task, handler);

        // Verify datafeed has not started running yet as job is still opening
        verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);

        tasksBuilder = PersistentTasksCustomMetaData.builder();
        addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder);
        addJobTask("another_job", "node_id", JobState.OPENED, tasksBuilder);
        ClusterState.Builder anotherJobCs = ClusterState.builder(clusterService.state())
                .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, clusterService.state().getMetaData().custom(MlMetadata.TYPE))
                        .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));

        capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", anotherJobCs.build(), cs.build()));

        // Still no run
        verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);

        tasksBuilder = PersistentTasksCustomMetaData.builder();
        addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder);
        ClusterState.Builder jobOpenedCs = ClusterState.builder(clusterService.state())
                .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, clusterService.state().getMetaData().custom(MlMetadata.TYPE))
                        .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));

        capturedClusterStateListener.getValue().clusterChanged(
                new ClusterChangedEvent("_source", jobOpenedCs.build(), anotherJobCs.build()));

        // Now it should run as the job state changed to OPENED
        verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
    }

    public void testDatafeedTaskStopsBecauseJobFailedWhileOpening() throws Exception {
        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
        addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder);
        ClusterState.Builder cs = ClusterState.builder(clusterService.state())
                .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, clusterService.state().getMetaData().custom(MlMetadata.TYPE))
                        .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
        when(clusterService.state()).thenReturn(cs.build());

        DataExtractor dataExtractor = mock(DataExtractor.class);
        when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor);
        when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
        when(dataExtractor.next()).thenReturn(Optional.empty());
        Consumer<Exception> handler = mockConsumer();
        DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
        datafeedManager.run(task, handler);

        // Verify datafeed has not started running yet as job is still opening
        verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);

        tasksBuilder = PersistentTasksCustomMetaData.builder();
        addJobTask("job_id", "node_id", JobState.FAILED, tasksBuilder);
        ClusterState.Builder updatedCs = ClusterState.builder(clusterService.state())
                .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, clusterService.state().getMetaData().custom(MlMetadata.TYPE))
                        .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));

        capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", updatedCs.build(), cs.build()));

        // Verify task never ran and got stopped
        verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
        verify(task).stop("job_never_opened", TimeValue.timeValueSeconds(20));
    }

    public void testDatafeedGetsStoppedWhileWaitingForJobToOpen() throws Exception {
        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
        addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder);
        ClusterState.Builder cs = ClusterState.builder(clusterService.state())
                .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, clusterService.state().getMetaData().custom(MlMetadata.TYPE))
                        .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
        when(clusterService.state()).thenReturn(cs.build());

        DataExtractor dataExtractor = mock(DataExtractor.class);
        when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor);
        when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
        when(dataExtractor.next()).thenReturn(Optional.empty());
        Consumer<Exception> handler = mockConsumer();
        DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
        datafeedManager.run(task, handler);

        // Verify datafeed has not started running yet as job is still opening
        verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);

        // Stop the datafeed
        datafeedManager.stopDatafeed(task, "test", StopDatafeedAction.DEFAULT_TIMEOUT);

        // Update job state to opened
        tasksBuilder = PersistentTasksCustomMetaData.builder();
        addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder);
        ClusterState.Builder updatedCs = ClusterState.builder(clusterService.state())
                .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, clusterService.state().getMetaData().custom(MlMetadata.TYPE))
                        .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));

        capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", cs.build(), updatedCs.build()));

        // Verify no datafeed was run
        verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
    }

    public static DatafeedConfig.Builder createDatafeedConfig(String datafeedId, String jobId) {
        DatafeedConfig.Builder datafeedConfig = new DatafeedConfig.Builder(datafeedId, jobId);
        datafeedConfig.setIndices(Arrays.asList("myIndex"));
        datafeedConfig.setTypes(Arrays.asList("myType"));
        datafeedConfig.setIndices(Collections.singletonList("myIndex"));
        datafeedConfig.setTypes(Collections.singletonList("myType"));
        return datafeedConfig;
    }

    public static Job.Builder createDatafeedJob() {
        AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build()));
        AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Collections.singletonList(
                new Detector.Builder("metric", "field").build()));
        acBuilder.setBucketSpan(TimeValue.timeValueHours(1));
        acBuilder.setDetectors(Arrays.asList(new Detector.Builder("metric", "field").build()));
        acBuilder.setDetectors(Collections.singletonList(new Detector.Builder("metric", "field").build()));

        Job.Builder builder = new Job.Builder("job_id");
        builder.setAnalysisConfig(acBuilder);

@ -0,0 +1,342 @@
|
|||
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.ml.datafeed;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.junit.Before;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;

import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.addJobTask;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;

public class DatafeedNodeSelectorTests extends ESTestCase {

    private IndexNameExpressionResolver resolver;
    private DiscoveryNodes nodes;
    private ClusterState clusterState;
    private MlMetadata mlMetadata;
    private PersistentTasksCustomMetaData tasks;

    @Before
    public void init() {
        resolver = new IndexNameExpressionResolver(Settings.EMPTY);
        nodes = DiscoveryNodes.builder()
                .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
                        Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
                .build();
    }

    public void testSelectNode_GivenJobIsOpened() throws Exception {
        MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder();
        Job job = createScheduledJob("job_id").build(new Date());
        mlMetadataBuilder.putJob(job, false);
        mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")));
        mlMetadata = mlMetadataBuilder.build();

        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
        addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder);
        tasks = tasksBuilder.build();

        givenClusterState("foo", 1, 0);

        PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode();
        assertEquals("node_id", result.getExecutorNode());
        new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated();
    }

    public void testSelectNode_GivenJobIsOpening() throws Exception {
        MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder();
        Job job = createScheduledJob("job_id").build(new Date());
        mlMetadataBuilder.putJob(job, false);
        mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")));
        mlMetadata = mlMetadataBuilder.build();

        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
        // A job task with a null status is treated as an opening job
        addJobTask(job.getId(), "node_id", null, tasksBuilder);
        tasks = tasksBuilder.build();

        givenClusterState("foo", 1, 0);

        PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode();
        assertEquals("node_id", result.getExecutorNode());
        new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated();
    }

    public void testNoJobTask() throws Exception {
        MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder();
        Job job = createScheduledJob("job_id").build(new Date());
        mlMetadataBuilder.putJob(job, false);

        // Using wildcard index name to test for index resolving as well
        mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")));
        mlMetadata = mlMetadataBuilder.build();

        tasks = PersistentTasksCustomMetaData.builder().build();

        givenClusterState("foo", 1, 0);

        PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode();
        assertNull(result.getExecutorNode());
        assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id], because job's [job_id] state is " +
                "[closed] while state [opened] is required"));

        ElasticsearchException e = expectThrows(ElasticsearchException.class,
                () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated());
        assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation "
                + "[cannot start datafeed [datafeed_id], because job's [job_id] state is [closed] while state [opened] is required]"));
    }

    public void testSelectNode_GivenJobFailedOrClosed() throws Exception {
        MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder();
        Job job = createScheduledJob("job_id").build(new Date());
        mlMetadataBuilder.putJob(job, false);
        mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")));
        mlMetadata = mlMetadataBuilder.build();

        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
        JobState jobState = randomFrom(JobState.FAILED, JobState.CLOSED);
        addJobTask(job.getId(), "node_id", jobState, tasksBuilder);
        tasks = tasksBuilder.build();

        givenClusterState("foo", 1, 0);

        PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode();
        assertNull(result.getExecutorNode());
        assertEquals("cannot start datafeed [datafeed_id], because job's [job_id] state is [" + jobState +
                "] while state [opened] is required", result.getExplanation());

        ElasticsearchException e = expectThrows(ElasticsearchException.class,
                () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated());
        assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation "
                + "[cannot start datafeed [datafeed_id], because job's [job_id] state is [" + jobState
                + "] while state [opened] is required]"));
    }

    public void testShardUnassigned() throws Exception {
        MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder();
        Job job = createScheduledJob("job_id").build(new Date());
        mlMetadataBuilder.putJob(job, false);

        // Using wildcard index name to test for index resolving as well
        mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")));
        mlMetadata = mlMetadataBuilder.build();

        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
        addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder);
        tasks = tasksBuilder.build();

        List<Tuple<Integer, ShardRoutingState>> states = new ArrayList<>(2);
        states.add(new Tuple<>(0, ShardRoutingState.UNASSIGNED));

        givenClusterState("foo", 1, 0, states);

        PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode();
        assertNull(result.getExecutorNode());
        assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " +
                "does not have all primary shards active yet."));

        // Inactive primary shards are a non-critical issue: the task may still be created
        new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated();
    }

    public void testShardNotAllActive() throws Exception {
        MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder();
        Job job = createScheduledJob("job_id").build(new Date());
        mlMetadataBuilder.putJob(job, false);

        // Using wildcard index name to test for index resolving as well
        mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")));
        mlMetadata = mlMetadataBuilder.build();

        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
        addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder);
        tasks = tasksBuilder.build();

        List<Tuple<Integer, ShardRoutingState>> states = new ArrayList<>(2);
        states.add(new Tuple<>(0, ShardRoutingState.STARTED));
        states.add(new Tuple<>(1, ShardRoutingState.INITIALIZING));

        givenClusterState("foo", 2, 0, states);

        PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode();
        assertNull(result.getExecutorNode());
        assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " +
                "does not have all primary shards active yet."));

        new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated();
    }

    public void testIndexDoesntExist() throws Exception {
        MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder();
        Job job = createScheduledJob("job_id").build(new Date());
        mlMetadataBuilder.putJob(job, false);
        mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo")));
        mlMetadata = mlMetadataBuilder.build();

        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
        addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder);
        tasks = tasksBuilder.build();

        givenClusterState("foo", 1, 0);

        PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode();
        assertNull(result.getExecutorNode());
        assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [not_foo] " +
                "does not exist, is closed, or is still initializing."));

        ElasticsearchException e = expectThrows(ElasticsearchException.class,
                () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated());
        assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation "
                + "[cannot start datafeed [datafeed_id] because index [not_foo] does not exist, is closed, or is still initializing.]"));
    }

    public void testSelectNode_jobTaskStale() {
        MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder();
        Job job = createScheduledJob("job_id").build(new Date());
        mlMetadataBuilder.putJob(job, false);
        mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")));
        mlMetadata = mlMetadataBuilder.build();

        String nodeId = randomBoolean() ? "node_id2" : null;
        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
        addJobTask(job.getId(), nodeId, JobState.OPENED, tasksBuilder);
        // Set to lower allocationId, so job task is stale:
        tasksBuilder.updateTaskStatus(MlMetadata.jobTaskId(job.getId()), new JobTaskStatus(JobState.OPENED, 0));
        tasks = tasksBuilder.build();

        givenClusterState("foo", 1, 0);

        PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode();
        assertNull(result.getExecutorNode());
        assertEquals("cannot start datafeed [datafeed_id], job [job_id] status is stale", result.getExplanation());

        ElasticsearchException e = expectThrows(ElasticsearchException.class,
                () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated());
        assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation "
                + "[cannot start datafeed [datafeed_id], job [job_id] status is stale]"));

        // Once the job task status is fresh again, selection succeeds
        tasksBuilder = PersistentTasksCustomMetaData.builder();
        addJobTask(job.getId(), "node_id1", JobState.OPENED, tasksBuilder);
        tasks = tasksBuilder.build();
        givenClusterState("foo", 1, 0);
        result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode();
        assertEquals("node_id1", result.getExecutorNode());
        new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated();
    }

    public void testSelectNode_GivenJobOpeningAndIndexDoesNotExist() throws Exception {
        // Here we test that when there are 2 problems, the most critical gets reported first.
        // In this case the job is opening (non-critical) and the index does not exist (critical)

        MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder();
        Job job = createScheduledJob("job_id").build(new Date());
        mlMetadataBuilder.putJob(job, false);
        mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo")));
        mlMetadata = mlMetadataBuilder.build();

        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
        addJobTask(job.getId(), "node_id", JobState.OPENING, tasksBuilder);
        tasks = tasksBuilder.build();

        givenClusterState("foo", 1, 0);

        ElasticsearchException e = expectThrows(ElasticsearchException.class,
                () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated());
        assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation "
                + "[cannot start datafeed [datafeed_id] because index [not_foo] does not exist, is closed, or is still initializing.]"));
    }

    private void givenClusterState(String index, int numberOfShards, int numberOfReplicas) {
        List<Tuple<Integer, ShardRoutingState>> states = new ArrayList<>(1);
        states.add(new Tuple<>(0, ShardRoutingState.STARTED));
        givenClusterState(index, numberOfShards, numberOfReplicas, states);
    }

    private void givenClusterState(String index, int numberOfShards, int numberOfReplicas, List<Tuple<Integer, ShardRoutingState>> states) {
        IndexMetaData indexMetaData = IndexMetaData.builder(index)
                .settings(settings(Version.CURRENT))
                .numberOfShards(numberOfShards)
                .numberOfReplicas(numberOfReplicas)
                .build();

        clusterState = ClusterState.builder(new ClusterName("cluster_name"))
                .metaData(new MetaData.Builder()
                        .putCustom(MlMetadata.TYPE, mlMetadata)
                        .putCustom(PersistentTasksCustomMetaData.TYPE, tasks)
                        .put(indexMetaData, false))
                .nodes(nodes)
                .routingTable(generateRoutingTable(indexMetaData, states))
                .build();
    }

    private static RoutingTable generateRoutingTable(IndexMetaData indexMetaData, List<Tuple<Integer, ShardRoutingState>> states) {
        IndexRoutingTable.Builder rtBuilder = IndexRoutingTable.builder(indexMetaData.getIndex());

        final String index = indexMetaData.getIndex().getName();
        int counter = 0;
        for (Tuple<Integer, ShardRoutingState> state : states) {
            ShardId shardId = new ShardId(index, "_na_", counter);
            IndexShardRoutingTable.Builder shardRTBuilder = new IndexShardRoutingTable.Builder(shardId);
            ShardRouting shardRouting;

            if (state.v2().equals(ShardRoutingState.STARTED)) {
                shardRouting = TestShardRouting.newShardRouting(index, shardId.getId(),
                        "node_" + Integer.toString(state.v1()), null, true, ShardRoutingState.STARTED);
            } else if (state.v2().equals(ShardRoutingState.INITIALIZING)) {
                shardRouting = TestShardRouting.newShardRouting(index, shardId.getId(),
                        "node_" + Integer.toString(state.v1()), null, true, ShardRoutingState.INITIALIZING);
            } else if (state.v2().equals(ShardRoutingState.RELOCATING)) {
                shardRouting = TestShardRouting.newShardRouting(index, shardId.getId(),
                        "node_" + Integer.toString(state.v1()), "node_" + Integer.toString((state.v1() + 1) % 3),
                        true, ShardRoutingState.RELOCATING);
            } else {
                shardRouting = ShardRouting.newUnassigned(shardId, true,
                        RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE,
                        new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""));
            }

            shardRTBuilder.addShard(shardRouting);
            rtBuilder.addIndexShard(shardRTBuilder.build());
            counter += 1;
        }

        return new RoutingTable.Builder().add(rtBuilder).build();
    }
}
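The class above exercises the split this commit introduces between non-critical selection issues, which only delay node assignment, and critical ones, which make checkDatafeedTaskCanBeCreated() throw so the action layer can answer with a conflict. A minimal standalone sketch of that pattern (the Issue type and its fields are illustrative, not the real DatafeedNodeSelector internals):

import java.util.ArrayList;
import java.util.List;

class NodeSelectionSketch {

    static final class Issue {
        final String explanation;
        final boolean critical; // true: creating the datafeed task can never succeed

        Issue(String explanation, boolean critical) {
            this.explanation = explanation;
            this.critical = critical;
        }
    }

    private final List<Issue> issues = new ArrayList<>();

    // Non-critical issues (e.g. primary shards not yet active) only prevent
    // assignment: no executor node is returned, but the task may still be created
    // and will wait until the issue resolves.
    String selectNode() {
        return issues.isEmpty() ? "node_id" : null;
    }

    // Critical issues (missing index, closed or failed job, stale job task)
    // abort task creation outright.
    void checkDatafeedTaskCanBeCreated() {
        for (Issue issue : issues) {
            if (issue.critical) {
                throw new IllegalStateException("No node found to start datafeed, "
                        + "allocation explanation [" + issue.explanation + "]");
            }
        }
    }
}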
@@ -110,7 +110,7 @@ setup:
         datafeed_id: "start-stop-datafeed-datafeed-1"
         start: 0
   - do:
-      catch: /cannot start datafeed \[start-stop-datafeed-datafeed-1\] because job \[start-stop-datafeed-job\] is not open/
+      catch: /cannot start datafeed \[start-stop-datafeed-datafeed-1\] because job \[start-stop-datafeed-job\] is closed/
       xpack.ml.start_datafeed:
         datafeed_id: "start-stop-datafeed-datafeed-1"
         start: 0
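At the REST layer this means a start request against a closed job now fails fast with a conflict, while a request against an opening job is accepted and the datafeed waits for the job. A rough sketch of the exchange (endpoint shown in the 5.x _xpack layout; the exact response wording may differ):

POST _xpack/ml/datafeeds/start-stop-datafeed-datafeed-1/_start
{ "start": 0 }

# job closed -> HTTP 409 Conflict with an explanation like:
# cannot start datafeed [start-stop-datafeed-datafeed-1] because job [start-stop-datafeed-job] is closed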