[ml] Datafeed task improvements

When retrying a datafeed task, only start the datafeed once its job is in the opened state.
Run the datafeed task on the same node as the corresponding job task.
Make the start datafeed API wait until the datafeed state has been set to started.
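As a usage illustration, a transport client could start a datafeed with an explicit wait timeout like this (a minimal sketch based on the request API added in this commit; the datafeed id and timeout value are illustrative):

    // Start the datafeed from the beginning of time and wait up to 30 seconds
    // for its state to become STARTED before the call returns.
    StartDatafeedAction.Request request = new StartDatafeedAction.Request("my-datafeed", 0L);
    request.setTimeout(TimeValue.timeValueSeconds(30));
    client.execute(StartDatafeedAction.INSTANCE, request).actionGet();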

Original commit: elastic/x-pack-elasticsearch@ebf1d3b9aa
Martijn van Groningen 2017-02-10 14:11:21 +01:00
parent dd33fae50f
commit 299f9ab74b
14 changed files with 481 additions and 111 deletions

View File: OpenJobAction.java

@@ -300,7 +300,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
throw new ElasticsearchStatusException("Cannot open job [" + jobId + "] because it has been marked as deleted",
RestStatus.CONFLICT);
}
PersistentTaskInProgress<?> task = MlMetadata.getTask(jobId, tasks);
PersistentTaskInProgress<?> task = MlMetadata.getJobTask(jobId, tasks);
JobState jobState = MlMetadata.getJobState(jobId, tasks);
if (task != null && task.getExecutorNode() != null && jobState == JobState.OPENED) {
if (nodes.nodeExists(task.getExecutorNode()) == false) {

View File: StartDatafeedAction.java

@@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
@@ -14,11 +15,15 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -36,6 +41,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.utils.DatafeedStateObserver;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentActionRegistry;
import org.elasticsearch.xpack.persistent.PersistentActionRequest;
@@ -43,17 +49,18 @@ import org.elasticsearch.xpack.persistent.PersistentActionResponse;
import org.elasticsearch.xpack.persistent.PersistentActionService;
import org.elasticsearch.xpack.persistent.PersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.xpack.persistent.TransportPersistentAction;
import java.io.IOException;
import java.util.Objects;
import java.util.function.Predicate;
public class StartDatafeedAction
extends Action<StartDatafeedAction.Request, PersistentActionResponse, StartDatafeedAction.RequestBuilder> {
public static final ParseField START_TIME = new ParseField("start");
public static final ParseField END_TIME = new ParseField("end");
public static final ParseField TIMEOUT = new ParseField("timeout");
public static final StartDatafeedAction INSTANCE = new StartDatafeedAction();
public static final String NAME = "cluster:admin/ml/datafeeds/start";
@@ -80,6 +87,8 @@ public class StartDatafeedAction
PARSER.declareString((request, datafeedId) -> request.datafeedId = datafeedId, DatafeedConfig.ID);
PARSER.declareLong((request, startTime) -> request.startTime = startTime, START_TIME);
PARSER.declareLong(Request::setEndTime, END_TIME);
PARSER.declareString((request, val) ->
request.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
}
public static Request parseRequest(String datafeedId, XContentParser parser) {
@@ -93,6 +102,7 @@ public class StartDatafeedAction
private String datafeedId;
private long startTime;
private Long endTime;
private TimeValue timeout = TimeValue.timeValueSeconds(20);
public Request(String datafeedId, long startTime) {
this.datafeedId = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName());
@@ -122,6 +132,14 @@ public class StartDatafeedAction
this.endTime = endTime;
}
public TimeValue getTimeout() {
return timeout;
}
public void setTimeout(TimeValue timeout) {
this.timeout = timeout;
}
@Override
public ActionRequestValidationException validate() {
return null;
@@ -138,6 +156,7 @@ public class StartDatafeedAction
datafeedId = in.readString();
startTime = in.readVLong();
endTime = in.readOptionalLong();
timeout = TimeValue.timeValueMillis(in.readVLong());
}
@Override
@@ -146,6 +165,7 @@ public class StartDatafeedAction
out.writeString(datafeedId);
out.writeVLong(startTime);
out.writeOptionalLong(endTime);
out.writeVLong(timeout.millis());
}
@Override
@@ -161,13 +181,14 @@ public class StartDatafeedAction
if (endTime != null) {
builder.field(END_TIME.getPreferredName(), endTime);
}
builder.field(TIMEOUT.getPreferredName(), timeout.getStringRep());
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(datafeedId, startTime, endTime);
return Objects.hash(datafeedId, startTime, endTime, timeout);
}
@Override
@@ -181,7 +202,8 @@ public class StartDatafeedAction
Request other = (Request) obj;
return Objects.equals(datafeedId, other.datafeedId) &&
Objects.equals(startTime, other.startTime) &&
Objects.equals(endTime, other.endTime);
Objects.equals(endTime, other.endTime) &&
Objects.equals(timeout, other.timeout);
}
}
@@ -220,41 +242,59 @@ public class StartDatafeedAction
public static class TransportAction extends TransportPersistentAction<Request> {
private final DatafeedStateObserver observer;
private final ClusterService clusterService;
private final DatafeedJobRunner datafeedJobRunner;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
PersistentActionService persistentActionService, PersistentActionRegistry persistentActionRegistry,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
DatafeedJobRunner datafeedJobRunner) {
ClusterService clusterService, DatafeedJobRunner datafeedJobRunner) {
super(settings, NAME, false, threadPool, transportService, persistentActionService, persistentActionRegistry,
actionFilters, indexNameExpressionResolver, Request::new, ThreadPool.Names.MANAGEMENT);
this.clusterService = clusterService;
this.datafeedJobRunner = datafeedJobRunner;
this.observer = new DatafeedStateObserver(threadPool, clusterService);
}
@Override
protected void doExecute(Request request, ActionListener<PersistentActionResponse> listener) {
ClusterState state = clusterService.state();
StartDatafeedAction.validate(request.datafeedId, state.metaData().custom(MlMetadata.TYPE),
state.custom(PersistentTasksInProgress.TYPE), true);
ActionListener<PersistentActionResponse> finalListener =
ActionListener.wrap(response -> waitForDatafeedStarted(request, response, listener), listener::onFailure);
super.doExecute(request, finalListener);
}
void waitForDatafeedStarted(Request request,
PersistentActionResponse response,
ActionListener<PersistentActionResponse> listener) {
observer.waitForState(request.getDatafeedId(), request.timeout, DatafeedState.STARTED, e -> {
if (e != null) {
listener.onFailure(e);
} else {
listener.onResponse(response);
}
});
}
@Override
public DiscoveryNode executorNode(Request request, ClusterState clusterState) {
return selectNode(logger, request, clusterState);
}
@Override
public void validate(Request request, ClusterState clusterState) {
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
PersistentTasksInProgress tasks = clusterState.custom(PersistentTasksInProgress.TYPE);
StartDatafeedAction.validate(request.getDatafeedId(), mlMetadata, tasks);
PersistentTasksInProgress persistentTasksInProgress = clusterState.custom(PersistentTasksInProgress.TYPE);
if (persistentTasksInProgress == null) {
return;
}
Predicate<PersistentTasksInProgress.PersistentTaskInProgress<?>> predicate = taskInProgress -> {
Request storedRequest = (Request) taskInProgress.getRequest();
return storedRequest.getDatafeedId().equals(request.getDatafeedId());
};
if (persistentTasksInProgress.tasksExist(NAME, predicate)) {
throw new ElasticsearchStatusException("datafeed already started, expected datafeed state [{}], but got [{}]",
RestStatus.CONFLICT, DatafeedState.STOPPED, DatafeedState.STARTED);
}
StartDatafeedAction.validate(request.getDatafeedId(), mlMetadata, tasks, false);
}
@Override
protected void nodeOperation(PersistentTask task, Request request, ActionListener<TransportResponse.Empty> listener) {
DatafeedTask datafeedTask = (DatafeedTask) task;
protected void nodeOperation(PersistentTask persistentTask, Request request, ActionListener<TransportResponse.Empty> listener) {
DatafeedTask datafeedTask = (DatafeedTask) persistentTask;
datafeedJobRunner.run(request.getDatafeedId(), request.getStartTime(), request.getEndTime(),
datafeedTask,
(error) -> {
@@ -268,7 +308,8 @@ public class StartDatafeedAction
}
public static void validate(String datafeedId, MlMetadata mlMetadata, PersistentTasksInProgress tasks) {
static void validate(String datafeedId, MlMetadata mlMetadata, PersistentTasksInProgress tasks,
boolean checkJobState) {
DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
if (datafeed == null) {
throw ExceptionsHelper.missingDatafeedException(datafeedId);
@@ -277,11 +318,46 @@ public class StartDatafeedAction
if (job == null) {
throw ExceptionsHelper.missingJobException(datafeed.getJobId());
}
JobState jobState = MlMetadata.getJobState(datafeed.getJobId(), tasks);
if (jobState != JobState.OPENED) {
throw new ElasticsearchStatusException("cannot start datafeed, expected job state [{}], but got [{}]",
RestStatus.CONFLICT, JobState.OPENED, jobState);
if (tasks == null) {
return;
}
// We only check the job state when the user submits the start datafeed request,
// but not when the persistent task framework validates the request,
// because we don't know whether the job task will be started before the datafeed task,
// so we just wait until the job task is opened, which will happen at some point in time.
if (checkJobState) {
JobState jobState = MlMetadata.getJobState(datafeed.getJobId(), tasks);
if (jobState != JobState.OPENED) {
throw new ElasticsearchStatusException("cannot start datafeed, expected job state [{}], but got [{}]",
RestStatus.CONFLICT, JobState.OPENED, jobState);
}
}
DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeedId, tasks);
if (datafeedState == DatafeedState.STARTED) {
throw new ElasticsearchStatusException("datafeed already started, expected datafeed state [{}], but got [{}]",
RestStatus.CONFLICT, DatafeedState.STOPPED, DatafeedState.STARTED);
}
DatafeedJobValidator.validate(datafeed, job);
}
static DiscoveryNode selectNode(Logger logger, Request request, ClusterState clusterState) {
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
PersistentTasksInProgress tasks = clusterState.custom(PersistentTasksInProgress.TYPE);
DatafeedConfig datafeed = mlMetadata.getDatafeed(request.getDatafeedId());
DiscoveryNodes nodes = clusterState.getNodes();
JobState jobState = MlMetadata.getJobState(datafeed.getJobId(), tasks);
if (jobState == JobState.OPENED) {
PersistentTaskInProgress task = MlMetadata.getJobTask(datafeed.getJobId(), tasks);
return nodes.get(task.getExecutorNode());
} else {
// let's try again later when the job has been opened:
logger.debug("cannot start datafeed, because job's [{}] state is [{}] while state [{}] is required",
datafeed.getJobId(), jobState, JobState.OPENED);
return null;
}
}
}

View File: StopDatafeedAction.java

@@ -147,19 +147,15 @@ public class StopDatafeedAction
validate(datafeedId, mlMetadata);
PersistentTasksInProgress tasks = state.custom(PersistentTasksInProgress.TYPE);
if (tasks != null) {
for (PersistentTaskInProgress<?> task : tasks.findTasks(StartDatafeedAction.NAME, p -> true)) {
StartDatafeedAction.Request storedRequest = (StartDatafeedAction.Request) task.getRequest();
if (storedRequest.getDatafeedId().equals(datafeedId)) {
RemovePersistentTaskAction.Request removeTaskRequest = new RemovePersistentTaskAction.Request();
removeTaskRequest.setTaskId(task.getId());
removePersistentTaskAction.execute(removeTaskRequest, listener);
return;
}
}
PersistentTaskInProgress<?> task = MlMetadata.getDatafeedTask(request.getDatafeedId(), tasks);
if (task != null) {
RemovePersistentTaskAction.Request removeTaskRequest = new RemovePersistentTaskAction.Request();
removeTaskRequest.setTaskId(task.getId());
removePersistentTaskAction.execute(removeTaskRequest, listener);
} else {
listener.onFailure(new ElasticsearchStatusException("datafeed already stopped, expected datafeed state [{}], but got [{}]",
RestStatus.CONFLICT, DatafeedState.STARTED, DatafeedState.STOPPED));
}
listener.onFailure(new ElasticsearchStatusException("datafeed already stopped, expected datafeed state [{}], but got [{}]",
RestStatus.CONFLICT, DatafeedState.STARTED, DatafeedState.STOPPED));
}
@Override

View File: DatafeedJobRunner.java

@@ -6,11 +6,13 @@
package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.threadpool.ThreadPool;
@@ -30,6 +32,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction;
import java.time.Duration;
import java.util.Collections;
@@ -75,7 +78,11 @@ public class DatafeedJobRunner extends AbstractComponent {
latestRecordTimeMs = dataCounts.getLatestRecordTimeStamp().getTime();
}
Holder holder = createJobDatafeed(datafeed, job, latestFinalBucketEndMs, latestRecordTimeMs, handler, task);
innerRun(holder, startTime, endTime);
UpdatePersistentTaskStatusAction.Request updateDatafeedStatus =
new UpdatePersistentTaskStatusAction.Request(task.getPersistentTaskId(), DatafeedState.STARTED);
client.execute(UpdatePersistentTaskStatusAction.INSTANCE, updateDatafeedStatus, ActionListener.wrap(r -> {
innerRun(holder, startTime, endTime);
}, handler));
}, handler);
}
@@ -235,13 +242,29 @@ public class DatafeedJobRunner extends AbstractComponent {
public void stop(String source, Exception e) {
logger.info("[{}] attempt to stop datafeed [{}] for job [{}]", source, datafeed.getId(), datafeed.getJobId());
if (datafeedJob.stop()) {
FutureUtils.cancel(future);
handler.accept(e);
logger.info("[{}] datafeed [{}] for job [{}] has been stopped", source, datafeed.getId(), datafeed.getJobId());
} else {
logger.info("[{}] datafeed [{}] for job [{}] was already stopped", source, datafeed.getId(), datafeed.getJobId());
}
// We need to fork, because:
// 1) We are being called from cluster state update thread and we should return as soon as possible
// 2) We also index into the notification index and that is forbidden from the cluster state update thread:
// (Caused by: java.lang.AssertionError: should not be called by a cluster state applier. reason [the applied
// cluster state is not yet available])
threadPool.executor(ThreadPool.Names.GENERIC).submit(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.warn("failed to stop [{}] datafeed [{}] for job [{}]", source, datafeed.getId(), datafeed.getJobId());
handler.accept(e);
}
@Override
protected void doRun() throws Exception {
if (datafeedJob.stop()) {
FutureUtils.cancel(future);
handler.accept(e);
logger.info("[{}] datafeed [{}] for job [{}] has been stopped", source, datafeed.getId(), datafeed.getJobId());
} else {
logger.info("[{}] datafeed [{}] for job [{}] was already stopped", source, datafeed.getId(), datafeed.getJobId());
}
}
});
}
}

View File: MlMetadata.java

@@ -337,7 +337,9 @@ public class MlMetadata implements MetaData.Custom {
}
public static PersistentTasksInProgress.PersistentTaskInProgress<?> getTask(String jobId, @Nullable PersistentTasksInProgress tasks) {
@Nullable
public static PersistentTasksInProgress.PersistentTaskInProgress<?> getJobTask(String jobId,
@Nullable PersistentTasksInProgress tasks) {
if (tasks != null) {
Predicate<PersistentTasksInProgress.PersistentTaskInProgress<?>> p = t -> {
OpenJobAction.Request storedRequest = (OpenJobAction.Request) t.getRequest();
@@ -350,8 +352,23 @@ public class MlMetadata implements MetaData.Custom {
return null;
}
@Nullable
public static PersistentTasksInProgress.PersistentTaskInProgress<?> getDatafeedTask(String datafeedId,
@Nullable PersistentTasksInProgress tasks) {
if (tasks != null) {
Predicate<PersistentTasksInProgress.PersistentTaskInProgress<?>> p = t -> {
StartDatafeedAction.Request storedRequest = (StartDatafeedAction.Request) t.getRequest();
return storedRequest.getDatafeedId().equals(datafeedId);
};
for (PersistentTasksInProgress.PersistentTaskInProgress<?> task : tasks.findTasks(StartDatafeedAction.NAME, p)) {
return task;
}
}
return null;
}
public static JobState getJobState(String jobId, @Nullable PersistentTasksInProgress tasks) {
PersistentTasksInProgress.PersistentTaskInProgress<?> task = getTask(jobId, tasks);
PersistentTasksInProgress.PersistentTaskInProgress<?> task = getJobTask(jobId, tasks);
if (task != null && task.getStatus() != null) {
return (JobState) task.getStatus();
} else {
@@ -360,4 +377,15 @@ public class MlMetadata implements MetaData.Custom {
}
}
public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksInProgress tasks) {
PersistentTasksInProgress.PersistentTaskInProgress<?> task = getDatafeedTask(datafeedId, tasks);
if (task != null && task.getStatus() != null) {
return (DatafeedState) task.getStatus();
} else {
// If we haven't started a datafeed then there will be no persistent task,
// which is the same as if the datafeed wasn't started
return DatafeedState.STOPPED;
}
}
}
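With these helpers, callers can resolve a datafeed's state straight from the cluster state; a short sketch (assuming a ClusterState instance named clusterState is in scope, and an illustrative datafeed id):

    PersistentTasksInProgress tasks = clusterState.custom(PersistentTasksInProgress.TYPE);
    // Returns STOPPED when no persistent task exists for this datafeed.
    DatafeedState state = MlMetadata.getDatafeedState("my-datafeed", tasks);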

View File: RestStartDatafeedAction.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.rest.datafeeds;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.DateFieldMapper;
@@ -53,6 +54,10 @@ public class RestStartDatafeedAction extends BaseRestHandler {
}
jobDatafeedRequest = new StartDatafeedAction.Request(datafeedId, startTimeMillis);
jobDatafeedRequest.setEndTime(endTimeMillis);
if (restRequest.hasParam("timeout")) {
TimeValue openTimeout = restRequest.paramAsTime("timeout", TimeValue.timeValueSeconds(20));
jobDatafeedRequest.setTimeout(openTimeout);
}
}
return channel -> {
client.execute(StartDatafeedAction.INSTANCE, jobDatafeedRequest,

View File: DatafeedStateObserver.java (new file)

@@ -0,0 +1,68 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.utils;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import java.util.function.Consumer;
import java.util.function.Predicate;
public class DatafeedStateObserver {
private static final Logger LOGGER = Loggers.getLogger(DatafeedStateObserver.class);
private final ThreadPool threadPool;
private final ClusterService clusterService;
public DatafeedStateObserver(ThreadPool threadPool, ClusterService clusterService) {
this.threadPool = threadPool;
this.clusterService = clusterService;
}
public void waitForState(String datafeedId, TimeValue waitTimeout, DatafeedState expectedState, Consumer<Exception> handler) {
ClusterStateObserver observer =
new ClusterStateObserver(clusterService, LOGGER, threadPool.getThreadContext());
Predicate<ClusterState> predicate = (newState) -> {
PersistentTasksInProgress tasks = newState.custom(PersistentTasksInProgress.TYPE);
DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeedId, tasks);
return datafeedState == expectedState;
};
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
handler.accept(null);
}
@Override
public void onClusterServiceClose() {
Exception e = new IllegalArgumentException("Cluster service closed while waiting for datafeed state to change to ["
+ expectedState + "]");
handler.accept(new IllegalStateException(e));
}
@Override
public void onTimeout(TimeValue timeout) {
if (predicate.test(clusterService.state())) {
handler.accept(null);
} else {
Exception e = new IllegalArgumentException("Timeout expired while waiting for datafeed state to change to ["
+ expectedState + "]");
handler.accept(e);
}
}
}, predicate, waitTimeout);
}
}
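A usage sketch for the observer (assuming a ThreadPool and ClusterService are at hand, as in StartDatafeedAction.TransportAction above; the datafeed id is illustrative):

    DatafeedStateObserver observer = new DatafeedStateObserver(threadPool, clusterService);
    // The handler is called with null once the state matches, or with an
    // exception if the cluster service closes or the timeout expires first.
    observer.waitForState("my-datafeed", TimeValue.timeValueSeconds(20), DatafeedState.STARTED, e -> {
        if (e != null) {
            // timed out or cluster service closed before the datafeed started
        }
    });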

View File: DatafeedJobsIT.java

@@ -17,9 +17,6 @@ import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
@@ -32,7 +29,6 @@ import org.junit.Before;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.equalTo;
@@ -176,31 +172,6 @@ public class DatafeedJobsIT extends BaseMlIntegTestCase {
logger.info("Indexed [{}] documents", numDocs);
}
private Job.Builder createScheduledJob(String jobId) {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.JSON);
dataDescription.setTimeFormat("yyyy-MM-dd HH:mm:ss");
Detector.Builder d = new Detector.Builder("count", null);
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()));
Job.Builder builder = new Job.Builder();
builder.setId(jobId);
builder.setAnalysisConfig(analysisConfig);
builder.setDataDescription(dataDescription);
return builder;
}
private DatafeedConfig createDatafeed(String datafeedId, String jobId, List<String> indexes) {
DatafeedConfig.Builder builder = new DatafeedConfig.Builder(datafeedId, jobId);
builder.setQueryDelay(1);
builder.setFrequency(2);
builder.setIndexes(indexes);
builder.setTypes(Collections.singletonList("type"));
return builder.build();
}
private DataCounts getDataCounts(String jobId) {
GetResponse getResponse = client().prepareGet(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
DataCounts.TYPE.getPreferredName(), jobId + "-data-counts").get();

View File: StartDatafeedActionRequestTests.java

@@ -5,21 +5,9 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction.Request;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import java.util.Collections;
import static org.hamcrest.Matchers.equalTo;
public class StartDatafeedActionRequestTests extends AbstractStreamableXContentTestCase<StartDatafeedAction.Request> {
@@ -42,26 +30,4 @@ public class StartDatafeedActionRequestTests extends AbstractStreamableXContentT
return Request.parseRequest(null, parser);
}
public void testValidate() {
Job job1 = DatafeedJobRunnerTests.createDatafeedJob().build();
MlMetadata mlMetadata1 = new MlMetadata.Builder()
.putJob(job1, false)
.build();
Exception e = expectThrows(ResourceNotFoundException.class,
() -> StartDatafeedAction.validate("some-datafeed", mlMetadata1,
new PersistentTasksInProgress(0L, Collections.emptyMap())));
assertThat(e.getMessage(), equalTo("No datafeed with id [some-datafeed] exists"));
PersistentTaskInProgress<OpenJobAction.Request> task =
new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("foo"), null);
PersistentTasksInProgress tasks = new PersistentTasksInProgress(0L, Collections.singletonMap(0L, task));
DatafeedConfig datafeedConfig1 = DatafeedJobRunnerTests.createDatafeedConfig("foo-datafeed", "foo").build();
MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1)
.putDatafeed(datafeedConfig1)
.build();
e = expectThrows(ElasticsearchStatusException.class,
() -> StartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks));
assertThat(e.getMessage(), equalTo("cannot start datafeed, expected job state [opened], but got [closed]"));
}
}

View File: StartDatafeedActionTests.java (new file)

@@ -0,0 +1,120 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import java.net.InetAddress;
import java.util.Collections;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob;
import static org.hamcrest.Matchers.equalTo;
public class StartDatafeedActionTests extends ESTestCase {
public void testSelectNode() throws Exception {
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
Job job = createScheduledJob("job_id").build();
mlMetadata.putJob(job, false);
mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("*")));
PersistentTaskInProgress<OpenJobAction.Request> task =
new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request(job.getId()), "node_id");
task = new PersistentTaskInProgress<OpenJobAction.Request>(task, randomFrom(JobState.FAILED, JobState.CLOSED,
JobState.CLOSING, JobState.OPENING));
PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task));
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build();
ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build()))
.putCustom(PersistentTasksInProgress.TYPE, tasks)
.nodes(nodes);
StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed_id", 0L);
DiscoveryNode node = StartDatafeedAction.selectNode(logger, request, cs.build());
assertNull(node);
task = new PersistentTaskInProgress<OpenJobAction.Request>(task, JobState.OPENED);
tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task));
cs = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build()))
.putCustom(PersistentTasksInProgress.TYPE, tasks)
.nodes(nodes);
node = StartDatafeedAction.selectNode(logger, request, cs.build());
assertEquals("node_id", node.getId());
}
public void testValidate() {
Job job1 = DatafeedJobRunnerTests.createDatafeedJob().build();
MlMetadata mlMetadata1 = new MlMetadata.Builder()
.putJob(job1, false)
.build();
Exception e = expectThrows(ResourceNotFoundException.class,
() -> StartDatafeedAction.validate("some-datafeed", mlMetadata1, null, false));
assertThat(e.getMessage(), equalTo("No datafeed with id [some-datafeed] exists"));
}
public void testValidate_jobClosed() {
Job job1 = DatafeedJobRunnerTests.createDatafeedJob().build();
MlMetadata mlMetadata1 = new MlMetadata.Builder()
.putJob(job1, false)
.build();
PersistentTaskInProgress<OpenJobAction.Request> task =
new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("foo"), null);
PersistentTasksInProgress tasks = new PersistentTasksInProgress(0L, Collections.singletonMap(0L, task));
DatafeedConfig datafeedConfig1 = DatafeedJobRunnerTests.createDatafeedConfig("foo-datafeed", "foo").build();
MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1)
.putDatafeed(datafeedConfig1)
.build();
Exception e = expectThrows(ElasticsearchStatusException.class,
() -> StartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks, true));
assertThat(e.getMessage(), equalTo("cannot start datafeed, expected job state [opened], but got [closed]"));
}
public void testValidate_dataFeedAlreadyStarted() {
Job job1 = createScheduledJob("job_id").build();
DatafeedConfig datafeedConfig = createDatafeed("datafeed_id", "job_id", Collections.singletonList("*"));
MlMetadata mlMetadata1 = new MlMetadata.Builder()
.putJob(job1, false)
.putDatafeed(datafeedConfig)
.build();
PersistentTaskInProgress<StartDatafeedAction.Request> task =
new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L),
"node_id");
task = new PersistentTaskInProgress<>(task, DatafeedState.STARTED);
PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task));
Exception e = expectThrows(ElasticsearchStatusException.class,
() -> StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, false));
assertThat(e.getMessage(), equalTo("datafeed already started, expected datafeed state [stopped], but got [started]"));
}
}

View File: DatafeedJobRunnerTests.java

@@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
@@ -34,6 +35,8 @@ import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction;
import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction.Response;
import org.junit.Before;
import java.io.ByteArrayInputStream;
@@ -93,6 +96,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
return null;
}).when(executorService).submit(any(Runnable.class));
when(threadPool.executor(MlPlugin.DATAFEED_RUNNER_THREAD_POOL_NAME)).thenReturn(executorService);
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService);
when(client.execute(same(PostDataAction.INSTANCE), any())).thenReturn(jobDataFuture);
when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture);
@@ -110,6 +114,13 @@ public class DatafeedJobRunnerTests extends ESTestCase {
consumer.accept(new ResourceNotFoundException("dummy"));
return null;
}).when(jobProvider).buckets(any(), any(), any(), any());
doAnswer(invocationOnMock -> {
@SuppressWarnings("rawtypes")
ActionListener<Response> listener = (ActionListener<Response>) invocationOnMock.getArguments()[2];
listener.onResponse(new Response(true));
return null;
}).when(client).execute(same(UpdatePersistentTaskStatusAction.INSTANCE), any(), any());
}
public void testStart_GivenNewlyCreatedJobLoopBack() throws Exception {

View File: BasicDistributedJobsIT.java

@@ -5,13 +5,20 @@
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.ml.action.OpenJobAction;
import org.elasticsearch.xpack.ml.action.PutDatafeedAction;
import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import java.util.Collections;
public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
public void testFailOverBasics() throws Exception {
@@ -51,4 +58,66 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
cleanupWorkaround(2);
}
public void testFailOverBasics_withDataFeeder() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(4);
ensureStableCluster(4);
Job.Builder job = createScheduledJob("job_id");
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build(true, job.getId()));
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
assertTrue(putJobResponse.isAcknowledged());
DatafeedConfig config = createDatafeed("data_feed_id", job.getId(), Collections.singletonList("*"));
PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(config);
PutDatafeedAction.Response putDatafeedResponse = client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).get();
assertTrue(putDatafeedResponse.isAcknowledged());
ensureGreen();
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());
client().execute(OpenJobAction.INSTANCE, openJobRequest).get();
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState());
});
StartDatafeedAction.Request startDataFeedRequest = new StartDatafeedAction.Request(config.getId(), 0L);
client().execute(StartDatafeedAction.INSTANCE, startDataFeedRequest);
assertBusy(() -> {
GetDatafeedsStatsAction.Response statsResponse =
client().execute(GetDatafeedsStatsAction.INSTANCE, new GetDatafeedsStatsAction.Request(config.getId())).actionGet();
assertEquals(1, statsResponse.getResponse().results().size());
assertEquals(DatafeedState.STARTED, statsResponse.getResponse().results().get(0).getDatafeedState());
});
internalCluster().stopRandomDataNode();
ensureStableCluster(3);
ensureGreen();
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState());
});
assertBusy(() -> {
GetDatafeedsStatsAction.Response statsResponse =
client().execute(GetDatafeedsStatsAction.INSTANCE, new GetDatafeedsStatsAction.Request(config.getId())).actionGet();
assertEquals(1, statsResponse.getResponse().results().size());
assertEquals(DatafeedState.STARTED, statsResponse.getResponse().results().get(0).getDatafeedState());
});
internalCluster().stopRandomDataNode();
ensureStableCluster(2);
ensureGreen();
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState());
});
assertBusy(() -> {
GetDatafeedsStatsAction.Response statsResponse =
client().execute(GetDatafeedsStatsAction.INSTANCE, new GetDatafeedsStatsAction.Request(config.getId())).actionGet();
assertEquals(1, statsResponse.getResponse().results().size());
assertEquals(DatafeedState.STARTED, statsResponse.getResponse().results().get(0).getDatafeedState());
});
cleanupWorkaround(2);
}
}

View File: BaseMlIntegTestCase.java

@@ -17,6 +17,7 @@ import org.elasticsearch.xpack.ml.action.CloseJobAction;
import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction;
import org.elasticsearch.xpack.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
@@ -24,10 +25,12 @@ import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -72,6 +75,31 @@ public abstract class BaseMlIntegTestCase extends SecurityIntegTestCase {
return builder;
}
public static Job.Builder createScheduledJob(String jobId) {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.JSON);
dataDescription.setTimeFormat("yyyy-MM-dd HH:mm:ss");
Detector.Builder d = new Detector.Builder("count", null);
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()));
Job.Builder builder = new Job.Builder();
builder.setId(jobId);
builder.setAnalysisConfig(analysisConfig);
builder.setDataDescription(dataDescription);
return builder;
}
public static DatafeedConfig createDatafeed(String datafeedId, String jobId, List<String> indexes) {
DatafeedConfig.Builder builder = new DatafeedConfig.Builder(datafeedId, jobId);
builder.setQueryDelay(1);
builder.setFrequency(2);
builder.setIndexes(indexes);
builder.setTypes(Collections.singletonList("type"));
return builder.build();
}
// Due to the fact that the ml plugin creates the state, notifications and meta indices automatically,
// when the test framework removes all indices the ml plugin adds them back, causing validation to fail.
// We should move to templates instead, as that will fix the test problem.
@@ -129,6 +157,11 @@ public abstract class BaseMlIntegTestCase extends SecurityIntegTestCase {
throw new RuntimeException(e);
}
}
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(jobId)).actionGet();
assertEquals(JobState.CLOSED, statsResponse.getResponse().results().get(0).getState());
});
DeleteJobAction.Response response =
client.execute(DeleteJobAction.INSTANCE, new DeleteJobAction.Request(jobId)).get();
assertTrue(response.isAcknowledged());

View File: start datafeed REST API spec (JSON)

@@ -21,6 +21,10 @@
"type": "string",
"required": false,
"description": "The end time when the datafeed should stop. When not set, the datafeed continues in real time"
},
"timeout": {
"type": "time",
"description": "Controls the time to wait until a datafeed has started. Default to 20 seconds"
}
}
},
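For example, the timeout could be passed as a URL parameter on a start request (the endpoint path below is an assumption based on the datafeed start action; only the query parameters are defined in this spec):

    POST _xpack/ml/datafeeds/my-datafeed/_start?start=0&timeout=30s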