[ML] Split persistent job params from action request

Introduced separate task names to register the persistent tasks executors and params.

Also renamed start and stop datafeed action names to be singular in order to be consistent with open and close action names.

Original commit: elastic/x-pack-elasticsearch@21f7b242cf
This commit is contained in:
Martijn van Groningen 2017-04-13 20:52:48 +02:00
parent 8e2299f994
commit 8f1d11df18
23 changed files with 360 additions and 232 deletions

View File

@ -215,8 +215,10 @@ public class MachineLearning implements ActionPlugin {
PersistentTasksCustomMetaData::readDiffFrom),
// Persistent action requests
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, StartDatafeedAction.NAME, StartDatafeedAction.Request::new),
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.NAME, OpenJobAction.Request::new),
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, StartDatafeedAction.TASK_NAME,
StartDatafeedAction.DatafeedParams::new),
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.TASK_NAME,
OpenJobAction.JobParams::new),
// Task statuses
new NamedWriteableRegistry.Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME,
@ -235,10 +237,10 @@ public class MachineLearning implements ActionPlugin {
PersistentTasksCustomMetaData::fromXContent),
// Persistent action requests
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(StartDatafeedAction.NAME),
StartDatafeedAction.Request::fromXContent),
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(OpenJobAction.NAME),
OpenJobAction.Request::fromXContent),
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(StartDatafeedAction.TASK_NAME),
StartDatafeedAction.DatafeedParams::fromXContent),
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(OpenJobAction.TASK_NAME),
OpenJobAction.JobParams::fromXContent),
// Task statuses
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(DatafeedState.NAME), DatafeedState::fromXContent),

View File

@ -78,16 +78,16 @@ public class MlAssignmentNotifier extends AbstractComponent implements ClusterSt
if (Objects.equals(currentAssignment, previousAssignment)) {
continue;
}
if (OpenJobAction.NAME.equals(currentTask.getTaskName())) {
String jobId = ((OpenJobAction.Request) currentTask.getParams()).getJobId();
if (OpenJobAction.TASK_NAME.equals(currentTask.getTaskName())) {
String jobId = ((OpenJobAction.JobParams) currentTask.getParams()).getJobId();
if (currentAssignment.getExecutorNode() == null) {
auditor.warning(jobId, "No node found to open job. Reasons [" + currentAssignment.getExplanation() + "]");
} else {
DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode());
auditor.info(jobId, "Opening job on node [" + node.toString() + "]");
}
} else if (StartDatafeedAction.NAME.equals(currentTask.getTaskName())) {
String datafeedId = ((StartDatafeedAction.Request) currentTask.getParams()).getDatafeedId();
} else if (StartDatafeedAction.TASK_NAME.equals(currentTask.getTaskName())) {
String datafeedId = ((StartDatafeedAction.DatafeedParams) currentTask.getParams()).getDatafeedId();
MlMetadata mlMetadata = event.state().getMetaData().custom(MlMetadata.TYPE);
DatafeedConfig datafeedConfig = mlMetadata.getDatafeed(datafeedId);
if (currentAssignment.getExecutorNode() == null) {

View File

@ -26,12 +26,12 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.license.LicenseUtils;
@ -72,6 +72,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
public static final OpenJobAction INSTANCE = new OpenJobAction();
public static final String NAME = "cluster:admin/xpack/ml/job/open";
public static final String TASK_NAME = "xpack/ml/job";
private OpenJobAction() {
super(NAME);
@ -87,29 +88,110 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
return new Response();
}
public static class Request extends ActionRequest implements PersistentTaskParams {
public static final ParseField IGNORE_DOWNTIME = new ParseField("ignore_downtime");
public static final ParseField TIMEOUT = new ParseField("timeout");
public static ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);
static {
PARSER.declareString(Request::setJobId, Job.ID);
PARSER.declareBoolean(Request::setIgnoreDowntime, IGNORE_DOWNTIME);
PARSER.declareString((request, val) ->
request.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
}
public static class Request extends ActionRequest implements ToXContent {
public static Request fromXContent(XContentParser parser) {
return parseRequest(null, parser);
}
public static Request parseRequest(String jobId, XContentParser parser) {
Request request = PARSER.apply(parser, null);
JobParams jobParams = JobParams.PARSER.apply(parser, null);
if (jobId != null) {
request.jobId = jobId;
jobParams.jobId = jobId;
}
return request;
return new Request(jobParams);
}
private JobParams jobParams;
public Request(JobParams jobParams) {
this.jobParams = jobParams;
}
public Request(String jobId) {
this.jobParams = new JobParams(jobId);
}
public Request(StreamInput in) throws IOException {
readFrom(in);
}
Request() {
}
public JobParams getJobParams() {
return jobParams;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobParams = new JobParams(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
jobParams.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
jobParams.toXContent(builder, params);
return builder;
}
@Override
public int hashCode() {
return Objects.hash(jobParams);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || obj.getClass() != getClass()) {
return false;
}
OpenJobAction.Request other = (OpenJobAction.Request) obj;
return Objects.equals(jobParams, other.jobParams);
}
@Override
public String toString() {
return Strings.toString(this);
}
}
public static class JobParams implements PersistentTaskParams {
public static final ParseField IGNORE_DOWNTIME = new ParseField("ignore_downtime");
public static final ParseField TIMEOUT = new ParseField("timeout");
public static ObjectParser<JobParams, Void> PARSER = new ObjectParser<>(TASK_NAME, JobParams::new);
static {
PARSER.declareString(JobParams::setJobId, Job.ID);
PARSER.declareBoolean(JobParams::setIgnoreDowntime, IGNORE_DOWNTIME);
PARSER.declareString((params, val) ->
params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
}
public static JobParams fromXContent(XContentParser parser) {
return parseRequest(null, parser);
}
public static JobParams parseRequest(String jobId, XContentParser parser) {
JobParams params = PARSER.apply(parser, null);
if (jobId != null) {
params.jobId = jobId;
}
return params;
}
private String jobId;
@ -118,15 +200,17 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
// changes here should be reflected there too.
private TimeValue timeout = MachineLearning.STATE_PERSIST_RESTORE_TIMEOUT;
public Request(String jobId) {
JobParams() {
}
public JobParams(String jobId) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
}
public Request(StreamInput in) throws IOException {
readFrom(in);
}
Request() {
public JobParams(StreamInput in) throws IOException {
jobId = in.readString();
ignoreDowntime = in.readBoolean();
timeout = TimeValue.timeValueMillis(in.readVLong());
}
public String getJobId() {
@ -154,21 +238,12 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = in.readString();
ignoreDowntime = in.readBoolean();
timeout = TimeValue.timeValueMillis(in.readVLong());
public String getWriteableName() {
return TASK_NAME;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
out.writeBoolean(ignoreDowntime);
out.writeVLong(timeout.millis());
@ -184,11 +259,6 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
return builder;
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public int hashCode() {
return Objects.hash(jobId, ignoreDowntime, timeout);
@ -202,7 +272,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
if (obj == null || obj.getClass() != getClass()) {
return false;
}
OpenJobAction.Request other = (OpenJobAction.Request) obj;
OpenJobAction.JobParams other = (OpenJobAction.JobParams) obj;
return Objects.equals(jobId, other.jobId) &&
Objects.equals(ignoreDowntime, other.ignoreDowntime) &&
Objects.equals(timeout, other.timeout);
@ -302,34 +372,35 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
JobParams jobParams = request.getJobParams();
if (licenseState.isMachineLearningAllowed()) {
ActionListener<PersistentTask<Request>> finalListener = new ActionListener<PersistentTask<Request>>() {
ActionListener<PersistentTask<JobParams>> finalListener = new ActionListener<PersistentTask<JobParams>>() {
@Override
public void onResponse(PersistentTask<Request> task) {
waitForJobStarted(task.getId(), request, listener);
public void onResponse(PersistentTask<JobParams> task) {
waitForJobStarted(task.getId(), jobParams, listener);
}
@Override
public void onFailure(Exception e) {
if (e instanceof ResourceAlreadyExistsException) {
e = new ElasticsearchStatusException("Cannot open job [" + request.getJobId() +
e = new ElasticsearchStatusException("Cannot open job [" + jobParams.getJobId() +
"] because it has already been opened", RestStatus.CONFLICT, e);
}
listener.onFailure(e);
}
};
persistentTasksService.startPersistentTask(MlMetadata.jobTaskId(request.jobId), NAME, request, finalListener);
persistentTasksService.startPersistentTask(MlMetadata.jobTaskId(jobParams.jobId), TASK_NAME, jobParams, finalListener);
} else {
listener.onFailure(LicenseUtils.newComplianceException(XPackPlugin.MACHINE_LEARNING));
}
}
void waitForJobStarted(String taskId, Request request, ActionListener<Response> listener) {
void waitForJobStarted(String taskId, JobParams jobParams, ActionListener<Response> listener) {
JobPredicate predicate = new JobPredicate();
persistentTasksService.waitForPersistentTaskStatus(taskId, predicate, request.timeout,
new WaitForPersistentTaskStatusListener<Request>() {
persistentTasksService.waitForPersistentTaskStatus(taskId, predicate, jobParams.timeout,
new WaitForPersistentTaskStatusListener<JobParams>() {
@Override
public void onResponse(PersistentTask<Request> persistentTask) {
public void onResponse(PersistentTask<JobParams> persistentTask) {
listener.onResponse(new Response(predicate.opened));
}
@ -341,7 +412,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(new ElasticsearchException("Opening job ["
+ request.getJobId() + "] timed out after [" + timeout + "]"));
+ jobParams.getJobId() + "] timed out after [" + timeout + "]"));
}
});
}
@ -373,7 +444,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
}
}
public static class OpenJobPersistentTasksExecutor extends PersistentTasksExecutor<Request> {
public static class OpenJobPersistentTasksExecutor extends PersistentTasksExecutor<JobParams> {
private final AutodetectProcessManager autodetectProcessManager;
private final XPackLicenseState licenseState;
@ -383,7 +454,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
public OpenJobPersistentTasksExecutor(Settings settings, XPackLicenseState licenseState,
ClusterService clusterService, AutodetectProcessManager autodetectProcessManager) {
super(settings, NAME, ThreadPool.Names.MANAGEMENT);
super(settings, TASK_NAME, ThreadPool.Names.MANAGEMENT);
this.licenseState = licenseState;
this.autodetectProcessManager = autodetectProcessManager;
this.maxNumberOfOpenJobs = AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings);
@ -393,24 +464,23 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
}
@Override
public Assignment getAssignment(Request request, ClusterState clusterState) {
return selectLeastLoadedMlNode(request.getJobId(), clusterState, maxConcurrentJobAllocations, maxNumberOfOpenJobs, logger);
public Assignment getAssignment(JobParams params, ClusterState clusterState) {
return selectLeastLoadedMlNode(params.getJobId(), clusterState, maxConcurrentJobAllocations, maxNumberOfOpenJobs, logger);
}
@Override
public void validate(Request request, ClusterState clusterState) {
public void validate(JobParams params, ClusterState clusterState) {
if (licenseState.isMachineLearningAllowed()) {
// If we already know that we can't find an ml node because all ml nodes are running at capacity or
// simply because there are no ml nodes in the cluster then we fail quickly here:
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
OpenJobAction.validate(request.getJobId(), mlMetadata, tasks);
Assignment assignment = selectLeastLoadedMlNode(request.getJobId(), clusterState, maxConcurrentJobAllocations,
OpenJobAction.validate(params.getJobId(), mlMetadata);
Assignment assignment = selectLeastLoadedMlNode(params.getJobId(), clusterState, maxConcurrentJobAllocations,
maxNumberOfOpenJobs, logger);
if (assignment.getExecutorNode() == null) {
String msg = "Could not open job because no suitable nodes were found, allocation explanation ["
+ assignment.getExplanation() + "]";
logger.warn("[{}] {}", request.getJobId(), msg);
logger.warn("[{}] {}", params.getJobId(), msg);
throw new ElasticsearchStatusException(msg, RestStatus.TOO_MANY_REQUESTS);
}
} else {
@ -419,10 +489,10 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
}
@Override
protected void nodeOperation(AllocatedPersistentTask task, Request request) {
protected void nodeOperation(AllocatedPersistentTask task, JobParams params) {
JobTask jobTask = (JobTask) task;
jobTask.autodetectProcessManager = autodetectProcessManager;
autodetectProcessManager.openJob(request.getJobId(), jobTask, request.isIgnoreDowntime(), e2 -> {
autodetectProcessManager.openJob(params.getJobId(), jobTask, params.isIgnoreDowntime(), e2 -> {
if (e2 == null) {
task.markAsCompleted();
} else {
@ -433,7 +503,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
@Override
protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId,
PersistentTask<Request> persistentTask) {
PersistentTask<JobParams> persistentTask) {
return new JobTask(persistentTask.getParams().getJobId(), id, type, action, parentTaskId);
}
@ -448,7 +518,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
* Fail fast before trying to update the job state on master node if the job doesn't exist or its state
* is not what it should be.
*/
static void validate(String jobId, MlMetadata mlMetadata, @Nullable PersistentTasksCustomMetaData tasks) {
static void validate(String jobId, MlMetadata mlMetadata) {
Job job = mlMetadata.getJobs().get(jobId);
if (job == null) {
throw ExceptionsHelper.missingJobException(jobId);
@ -485,8 +555,8 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
long numberOfAssignedJobs;
int numberOfAllocatingJobs;
if (persistentTasks != null) {
numberOfAssignedJobs = persistentTasks.getNumberOfTasksOnNode(node.getId(), OpenJobAction.NAME);
numberOfAllocatingJobs = persistentTasks.findTasks(OpenJobAction.NAME, task -> {
numberOfAssignedJobs = persistentTasks.getNumberOfTasksOnNode(node.getId(), OpenJobAction.TASK_NAME);
numberOfAllocatingJobs = persistentTasks.findTasks(OpenJobAction.TASK_NAME, task -> {
if (node.getId().equals(task.getExecutorNode()) == false) {
return false;
}

View File

@ -41,7 +41,6 @@ import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -79,7 +78,8 @@ public class StartDatafeedAction
public static final ParseField TIMEOUT = new ParseField("timeout");
public static final StartDatafeedAction INSTANCE = new StartDatafeedAction();
public static final String NAME = "cluster:admin/xpack/ml/datafeeds/start";
public static final String NAME = "cluster:admin/xpack/ml/datafeed/start";
public static final String TASK_NAME = "xpack/ml/datafeed";
private StartDatafeedAction() {
super(NAME);
@ -95,17 +95,103 @@ public class StartDatafeedAction
return new Response();
}
public static class Request extends ActionRequest implements PersistentTaskParams, ToXContent {
public static class Request extends ActionRequest implements ToXContent {
public static ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);
public static Request fromXContent(XContentParser parser) {
return parseRequest(null, parser);
}
public static Request parseRequest(String datafeedId, XContentParser parser) {
DatafeedParams params = DatafeedParams.PARSER.apply(parser, null);
if (datafeedId != null) {
params.datafeedId = datafeedId;
}
return new Request(params);
}
private DatafeedParams params;
public Request(String datafeedId, long startTime) {
this.params = new DatafeedParams(datafeedId, startTime);
}
public Request(String datafeedId, String startTime) {
this.params = new DatafeedParams(datafeedId, startTime);
}
public Request(DatafeedParams params) {
this.params = params;
}
public Request(StreamInput in) throws IOException {
readFrom(in);
}
Request() {
}
public DatafeedParams getParams() {
return params;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException e = null;
if (params.endTime != null && params.endTime <= params.startTime) {
e = ValidateActions.addValidationError(START_TIME.getPreferredName() + " ["
+ params.startTime + "] must be earlier than " + END_TIME.getPreferredName()
+ " [" + params.endTime + "]", e);
}
return e;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
params = new DatafeedParams(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
params.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
this.params.toXContent(builder, params);
return builder;
}
@Override
public int hashCode() {
return Objects.hash(params);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Request other = (Request) obj;
return Objects.equals(params, other.params);
}
}
public static class DatafeedParams implements PersistentTaskParams {
public static ObjectParser<DatafeedParams, Void> PARSER = new ObjectParser<>(TASK_NAME, DatafeedParams::new);
static {
PARSER.declareString((request, datafeedId) -> request.datafeedId = datafeedId, DatafeedConfig.ID);
PARSER.declareString((request, startTime) -> request.startTime = parseDateOrThrow(
PARSER.declareString((params, datafeedId) -> params.datafeedId = datafeedId, DatafeedConfig.ID);
PARSER.declareString((params, startTime) -> params.startTime = parseDateOrThrow(
startTime, START_TIME, System::currentTimeMillis), START_TIME);
PARSER.declareString(Request::setEndTime, END_TIME);
PARSER.declareString((request, val) ->
request.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
PARSER.declareString(DatafeedParams::setEndTime, END_TIME);
PARSER.declareString((params, val) ->
params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
}
static long parseDateOrThrow(String date, ParseField paramName, LongSupplier now) {
@ -119,16 +205,35 @@ public class StartDatafeedAction
}
}
public static Request fromXContent(XContentParser parser) {
public static DatafeedParams fromXContent(XContentParser parser) {
return parseRequest(null, parser);
}
public static Request parseRequest(String datafeedId, XContentParser parser) {
Request request = PARSER.apply(parser, null);
public static DatafeedParams parseRequest(String datafeedId, XContentParser parser) {
DatafeedParams params = PARSER.apply(parser, null);
if (datafeedId != null) {
request.datafeedId = datafeedId;
params.datafeedId = datafeedId;
}
return request;
return params;
}
public DatafeedParams(String datafeedId, long startTime) {
this.datafeedId = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName());
this.startTime = startTime;
}
public DatafeedParams(String datafeedId, String startTime) {
this(datafeedId, parseDateOrThrow(startTime, START_TIME, System::currentTimeMillis));
}
public DatafeedParams(StreamInput in) throws IOException {
datafeedId = in.readString();
startTime = in.readVLong();
endTime = in.readOptionalLong();
timeout = TimeValue.timeValueMillis(in.readVLong());
}
DatafeedParams() {
}
private String datafeedId;
@ -136,22 +241,6 @@ public class StartDatafeedAction
private Long endTime;
private TimeValue timeout = TimeValue.timeValueSeconds(20);
public Request(String datafeedId, long startTime) {
this.datafeedId = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName());
this.startTime = startTime;
}
public Request(String datafeedId, String startTime) {
this(datafeedId, parseDateOrThrow(startTime, START_TIME, System::currentTimeMillis));
}
public Request(StreamInput in) throws IOException {
readFrom(in);
}
Request() {
}
public String getDatafeedId() {
return datafeedId;
}
@ -181,39 +270,18 @@ public class StartDatafeedAction
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException e = null;
if (endTime != null && endTime <= startTime) {
e = ValidateActions.addValidationError(START_TIME.getPreferredName() + " ["
+ startTime + "] must be earlier than " + END_TIME.getPreferredName()
+ " [" + endTime + "]", e);
}
return e;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
datafeedId = in.readString();
startTime = in.readVLong();
endTime = in.readOptionalLong();
timeout = TimeValue.timeValueMillis(in.readVLong());
public String getWriteableName() {
return TASK_NAME;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(datafeedId);
out.writeVLong(startTime);
out.writeOptionalLong(endTime);
out.writeVLong(timeout.millis());
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -240,7 +308,7 @@ public class StartDatafeedAction
if (getClass() != obj.getClass()) {
return false;
}
Request other = (Request) obj;
DatafeedParams other = (DatafeedParams) obj;
return Objects.equals(datafeedId, other.datafeedId) &&
Objects.equals(startTime, other.startTime) &&
Objects.equals(endTime, other.endTime) &&
@ -297,11 +365,11 @@ public class StartDatafeedAction
/* only pck protected for testing */
volatile DatafeedManager datafeedManager;
DatafeedTask(long id, String type, String action, TaskId parentTaskId, Request request) {
super(id, type, action, "datafeed-" + request.getDatafeedId(), parentTaskId);
this.datafeedId = request.getDatafeedId();
this.startTime = request.getStartTime();
this.endTime = request.getEndTime();
DatafeedTask(long id, String type, String action, TaskId parentTaskId, DatafeedParams params) {
super(id, type, action, "datafeed-" + params.getDatafeedId(), parentTaskId);
this.datafeedId = params.getDatafeedId();
this.startTime = params.getStartTime();
this.endTime = params.getEndTime();
}
public String getDatafeedId() {
@ -351,29 +419,30 @@ public class StartDatafeedAction
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
DatafeedParams params = request.params;
if (licenseState.isMachineLearningAllowed()) {
ActionListener<PersistentTask<Request>> finalListener = new ActionListener<PersistentTask<Request>>() {
ActionListener<PersistentTask<DatafeedParams>> finalListener = new ActionListener<PersistentTask<DatafeedParams>>() {
@Override
public void onResponse(PersistentTask<Request> persistentTask) {
waitForDatafeedStarted(persistentTask.getId(), request, listener);
public void onResponse(PersistentTask<DatafeedParams> persistentTask) {
waitForDatafeedStarted(persistentTask.getId(), params, listener);
}
@Override
public void onFailure(Exception e) {
if (e instanceof ResourceAlreadyExistsException) {
e = new ElasticsearchStatusException("cannot start datafeed [" + request.getDatafeedId() +
e = new ElasticsearchStatusException("cannot start datafeed [" + params.getDatafeedId() +
"] because it has already been started", RestStatus.CONFLICT, e);
}
listener.onFailure(e);
}
};
persistentTasksService.startPersistentTask(MlMetadata.datafeedTaskId(request.datafeedId), NAME, request, finalListener);
persistentTasksService.startPersistentTask(MlMetadata.datafeedTaskId(params.datafeedId), TASK_NAME, params, finalListener);
} else {
listener.onFailure(LicenseUtils.newComplianceException(XPackPlugin.MACHINE_LEARNING));
}
}
void waitForDatafeedStarted(String taskId, Request request, ActionListener<Response> listener) {
void waitForDatafeedStarted(String taskId, DatafeedParams params, ActionListener<Response> listener) {
Predicate<PersistentTask<?>> predicate = persistentTask -> {
if (persistentTask == null) {
return false;
@ -381,10 +450,10 @@ public class StartDatafeedAction
DatafeedState datafeedState = (DatafeedState) persistentTask.getStatus();
return datafeedState == DatafeedState.STARTED;
};
persistentTasksService.waitForPersistentTaskStatus(taskId, predicate, request.timeout,
new WaitForPersistentTaskStatusListener<Request>() {
persistentTasksService.waitForPersistentTaskStatus(taskId, predicate, params.timeout,
new WaitForPersistentTaskStatusListener<DatafeedParams>() {
@Override
public void onResponse(PersistentTask<Request> task) {
public void onResponse(PersistentTask<DatafeedParams> task) {
listener.onResponse(new Response(true));
}
@ -396,38 +465,38 @@ public class StartDatafeedAction
@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(new ElasticsearchException("Starting datafeed ["
+ request.getDatafeedId() + "] timed out after [" + timeout + "]"));
+ params.getDatafeedId() + "] timed out after [" + timeout + "]"));
}
});
}
}
public static class StartDatafeedPersistentTasksExecutor extends PersistentTasksExecutor<Request> {
public static class StartDatafeedPersistentTasksExecutor extends PersistentTasksExecutor<DatafeedParams> {
private final DatafeedManager datafeedManager;
private final XPackLicenseState licenseState;
private final IndexNameExpressionResolver resolver;
public StartDatafeedPersistentTasksExecutor(Settings settings, XPackLicenseState licenseState, DatafeedManager datafeedManager) {
super(settings, NAME, ThreadPool.Names.MANAGEMENT);
super(settings, TASK_NAME, ThreadPool.Names.MANAGEMENT);
this.licenseState = licenseState;
this.datafeedManager = datafeedManager;
this.resolver = new IndexNameExpressionResolver(settings);
}
@Override
public Assignment getAssignment(Request request, ClusterState clusterState) {
return selectNode(logger, request.getDatafeedId(), clusterState, resolver);
public Assignment getAssignment(DatafeedParams params, ClusterState clusterState) {
return selectNode(logger, params.getDatafeedId(), clusterState, resolver);
}
@Override
public void validate(Request request, ClusterState clusterState) {
public void validate(DatafeedParams params, ClusterState clusterState) {
if (licenseState.isMachineLearningAllowed()) {
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
StartDatafeedAction.validate(request.getDatafeedId(), mlMetadata, tasks);
Assignment assignment = selectNode(logger, request.getDatafeedId(), clusterState, resolver);
StartDatafeedAction.validate(params.getDatafeedId(), mlMetadata, tasks);
Assignment assignment = selectNode(logger, params.getDatafeedId(), clusterState, resolver);
if (assignment.getExecutorNode() == null) {
String msg = "No node found to start datafeed [" + request.getDatafeedId()
String msg = "No node found to start datafeed [" + params.getDatafeedId()
+ "], allocation explanation [" + assignment.getExplanation() + "]";
throw new ElasticsearchException(msg);
}
@ -437,7 +506,7 @@ public class StartDatafeedAction
}
@Override
protected void nodeOperation(AllocatedPersistentTask allocatedPersistentTask, Request request) {
protected void nodeOperation(AllocatedPersistentTask allocatedPersistentTask, DatafeedParams params) {
DatafeedTask datafeedTask = (DatafeedTask) allocatedPersistentTask;
datafeedTask.datafeedManager = datafeedManager;
datafeedManager.run(datafeedTask,
@ -452,7 +521,7 @@ public class StartDatafeedAction
@Override
protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId,
PersistentTask<Request> persistentTask) {
PersistentTask<DatafeedParams> persistentTask) {
return new DatafeedTask(id, type, action, parentTaskId, persistentTask.getParams());
}
}

View File

@ -62,7 +62,7 @@ public class StopDatafeedAction
extends Action<StopDatafeedAction.Request, StopDatafeedAction.Response, StopDatafeedAction.RequestBuilder> {
public static final StopDatafeedAction INSTANCE = new StopDatafeedAction();
public static final String NAME = "cluster:admin/xpack/ml/datafeeds/stop";
public static final String NAME = "cluster:admin/xpack/ml/datafeed/stop";
public static final ParseField TIMEOUT = new ParseField("timeout");
public static final ParseField FORCE = new ParseField("force");
public static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueMinutes(5);
@ -112,7 +112,6 @@ public class StopDatafeedAction
public Request(String datafeedId) {
this.datafeedId = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName());
this.resolvedDatafeedIds = new String[] { datafeedId };
setActions(StartDatafeedAction.NAME);
}
Request() {

View File

@ -32,7 +32,6 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTasksService;
@ -376,9 +375,9 @@ public class DatafeedManager extends AbstractComponent {
private void closeJob() {
persistentTasksService.waitForPersistentTaskStatus(taskId, Objects::isNull, TimeValue.timeValueSeconds(20),
new WaitForPersistentTaskStatusListener<StartDatafeedAction.Request>() {
new WaitForPersistentTaskStatusListener<StartDatafeedAction.DatafeedParams>() {
@Override
public void onResponse(PersistentTask<StartDatafeedAction.Request> PersistentTask) {
public void onResponse(PersistentTask<StartDatafeedAction.DatafeedParams> PersistentTask) {
CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(datafeed.getJobId());
client.execute(CloseJobAction.INSTANCE, closeJobRequest, new ActionListener<CloseJobAction.Response>() {

View File

@ -23,7 +23,7 @@ public enum DatafeedState implements Task.Status {
STARTED, STOPPED;
public static final String NAME = StartDatafeedAction.NAME;
public static final String NAME = StartDatafeedAction.TASK_NAME;
private static final ConstructingObjectParser<DatafeedState, Void> PARSER =
new ConstructingObjectParser<>(NAME, args -> fromString((String) args[0]));

View File

@ -23,7 +23,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constru
public class JobTaskStatus implements Task.Status {
public static final String NAME = OpenJobAction.NAME;
public static final String NAME = OpenJobAction.TASK_NAME;
private static ParseField STATE = new ParseField("state");
private static ParseField ALLOCATION_ID = new ParseField("allocation_id");

View File

@ -42,15 +42,16 @@ public class RestStartDatafeedAction extends BaseRestHandler {
jobDatafeedRequest = StartDatafeedAction.Request.parseRequest(datafeedId, parser);
} else {
String startTime = restRequest.param(StartDatafeedAction.START_TIME.getPreferredName(), DEFAULT_START);
jobDatafeedRequest = new StartDatafeedAction.Request(datafeedId, startTime);
StartDatafeedAction.DatafeedParams datafeedParams = new StartDatafeedAction.DatafeedParams(datafeedId, startTime);
if (restRequest.hasParam(StartDatafeedAction.END_TIME.getPreferredName())) {
jobDatafeedRequest.setEndTime(restRequest.param(StartDatafeedAction.END_TIME.getPreferredName()));
datafeedParams.setEndTime(restRequest.param(StartDatafeedAction.END_TIME.getPreferredName()));
}
if (restRequest.hasParam(StartDatafeedAction.TIMEOUT.getPreferredName())) {
TimeValue openTimeout = restRequest.paramAsTime(
StartDatafeedAction.TIMEOUT.getPreferredName(), TimeValue.timeValueSeconds(20));
jobDatafeedRequest.setTimeout(openTimeout);
datafeedParams.setTimeout(openTimeout);
}
jobDatafeedRequest = new StartDatafeedAction.Request(datafeedParams);
}
return channel -> {
client.execute(StartDatafeedAction.INSTANCE, jobDatafeedRequest,

View File

@ -36,12 +36,14 @@ public class RestOpenJobAction extends BaseRestHandler {
if (restRequest.hasContentOrSourceParam()) {
request = OpenJobAction.Request.parseRequest(restRequest.param(Job.ID.getPreferredName()), restRequest.contentParser());
} else {
request = new OpenJobAction.Request(restRequest.param(Job.ID.getPreferredName()));
request.setIgnoreDowntime(restRequest.paramAsBoolean(OpenJobAction.Request.IGNORE_DOWNTIME.getPreferredName(), true));
if (restRequest.hasParam("timeout")) {
TimeValue openTimeout = restRequest.paramAsTime("timeout", TimeValue.timeValueSeconds(20));
request.setTimeout(openTimeout);
OpenJobAction.JobParams jobParams = new OpenJobAction.JobParams(restRequest.param(Job.ID.getPreferredName()));
jobParams.setIgnoreDowntime(restRequest.paramAsBoolean(OpenJobAction.JobParams.IGNORE_DOWNTIME.getPreferredName(), true));
if (restRequest.hasParam(OpenJobAction.JobParams.TIMEOUT.getPreferredName())) {
TimeValue openTimeout = restRequest.paramAsTime(OpenJobAction.JobParams.TIMEOUT.getPreferredName(),
TimeValue.timeValueSeconds(20));
jobParams.setTimeout(openTimeout);
}
request = new OpenJobAction.Request(jobParams);
}
return channel -> {
client.execute(OpenJobAction.INSTANCE, request, new RestBuilderListener<OpenJobAction.Response>(channel) {

View File

@ -271,8 +271,8 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
MlMetadata beforeMetadata = builder.build();
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
StartDatafeedAction.Request request = new StartDatafeedAction.Request(datafeedConfig1.getId(), 0L);
tasksBuilder.addTask(MlMetadata.datafeedTaskId("datafeed1"), StartDatafeedAction.NAME, request, INITIAL_ASSIGNMENT);
StartDatafeedAction.DatafeedParams params = new StartDatafeedAction.DatafeedParams(datafeedConfig1.getId(), 0L);
tasksBuilder.addTask(MlMetadata.datafeedTaskId("datafeed1"), StartDatafeedAction.TASK_NAME, params, INITIAL_ASSIGNMENT);
PersistentTasksCustomMetaData tasksInProgress = tasksBuilder.build();
DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId());
@ -332,8 +332,8 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
assertThat(result.getDatafeeds().get("datafeed1"), sameInstance(datafeedConfig1));
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed1", 0L);
tasksBuilder.addTask(MlMetadata.datafeedTaskId("datafeed1"), StartDatafeedAction.NAME, request, INITIAL_ASSIGNMENT);
StartDatafeedAction.DatafeedParams params = new StartDatafeedAction.DatafeedParams("datafeed1", 0L);
tasksBuilder.addTask(MlMetadata.datafeedTaskId("datafeed1"), StartDatafeedAction.TASK_NAME, params, INITIAL_ASSIGNMENT);
PersistentTasksCustomMetaData tasksInProgress = tasksBuilder.build();
MlMetadata.Builder builder2 = new MlMetadata.Builder(result);

View File

@ -113,8 +113,8 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
public static void addTask(String datafeedId, long startTime, String nodeId, DatafeedState state,
PersistentTasksCustomMetaData.Builder tasks) {
tasks.addTask(MlMetadata.datafeedTaskId(datafeedId), StartDatafeedAction.NAME,
new StartDatafeedAction.Request(datafeedId, startTime), new Assignment(nodeId, "test assignment"));
tasks.addTask(MlMetadata.datafeedTaskId(datafeedId), StartDatafeedAction.TASK_NAME,
new StartDatafeedAction.DatafeedParams(datafeedId, startTime), new Assignment(nodeId, "test assignment"));
tasks.updateTaskStatus(MlMetadata.datafeedTaskId(datafeedId), state);
}

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
@ -74,9 +73,9 @@ public class DatafeedJobsIT extends SecurityIntegTestCase {
entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new));
entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, PersistentTasksCustomMetaData.TYPE,
PersistentTasksCustomMetaData::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, StartDatafeedAction.NAME,
StartDatafeedAction.Request::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.NAME, OpenJobAction.Request::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, StartDatafeedAction.TASK_NAME,
StartDatafeedAction.DatafeedParams::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.TASK_NAME, OpenJobAction.JobParams::new));
entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME,
PersistentTasksNodeService.Status::new));
entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, JobTaskStatus.NAME, JobTaskStatus::new));
@ -155,7 +154,7 @@ public class DatafeedJobsIT extends SecurityIntegTestCase {
assertTrue(putDatafeedResponse.isAcknowledged());
StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request(datafeedConfig.getId(), 0L);
startDatafeedRequest.setEndTime(now);
startDatafeedRequest.getParams().setEndTime(now);
client().execute(StartDatafeedAction.INSTANCE, startDatafeedRequest).get();
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());

View File

@ -14,14 +14,14 @@ public class OpenJobActionRequestTests extends AbstractStreamableXContentTestCas
@Override
protected Request createTestInstance() {
Request request = new Request(randomAlphaOfLengthBetween(1, 20));
OpenJobAction.JobParams params = new OpenJobAction.JobParams(randomAlphaOfLengthBetween(1, 20));
if (randomBoolean()) {
request.setTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
params.setTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
}
if (randomBoolean()) {
request.setIgnoreDowntime(randomBoolean());
params.setIgnoreDowntime(randomBoolean());
}
return request;
return new Request(params);
}
@Override

View File

@ -48,23 +48,10 @@ import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder;
public class OpenJobActionTests extends ESTestCase {
public void testValidate() {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(buildJobBuilder("job_id").build(), false);
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id2", "_node_id", randomFrom(JobState.CLOSED, JobState.FAILED), tasksBuilder);
PersistentTasksCustomMetaData tasks = tasksBuilder.build();
OpenJobAction.validate("job_id", mlBuilder.build(), tasks);
OpenJobAction.validate("job_id", mlBuilder.build(), new PersistentTasksCustomMetaData(1L, Collections.emptyMap()));
OpenJobAction.validate("job_id", mlBuilder.build(), null);
}
public void testValidate_jobMissing() {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(buildJobBuilder("job_id1").build(), false);
expectThrows(ResourceNotFoundException.class, () -> OpenJobAction.validate("job_id2", mlBuilder.build(), null));
expectThrows(ResourceNotFoundException.class, () -> OpenJobAction.validate("job_id2", mlBuilder.build()));
}
public void testValidate_jobMarkedAsDeleted() {
@ -73,7 +60,7 @@ public class OpenJobActionTests extends ESTestCase {
jobBuilder.setDeleted(true);
mlBuilder.putJob(jobBuilder.build(), false);
Exception e = expectThrows(ElasticsearchStatusException.class,
() -> OpenJobAction.validate("job_id", mlBuilder.build(), null));
() -> OpenJobAction.validate("job_id", mlBuilder.build()));
assertEquals("Cannot open job [job_id] because it has been marked as deleted", e.getMessage());
}
@ -268,7 +255,7 @@ public class OpenJobActionTests extends ESTestCase {
}
public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetaData.Builder builder) {
builder.addTask(MlMetadata.jobTaskId(jobId), OpenJobAction.NAME, new OpenJobAction.Request(jobId),
builder.addTask(MlMetadata.jobTaskId(jobId), OpenJobAction.TASK_NAME, new OpenJobAction.JobParams(jobId),
new Assignment(nodeId, "test assignment"));
if (jobState != null) {
builder.updateTaskStatus(MlMetadata.jobTaskId(jobId), new JobTaskStatus(jobState, builder.getLastAllocationId()));

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction.DatafeedParams;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction.Request;
import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase;
@ -17,14 +18,14 @@ public class StartDatafeedActionRequestTests extends AbstractStreamableXContentT
@Override
protected Request createTestInstance() {
Request request = new Request(randomAlphaOfLength(10), randomNonNegativeLong());
DatafeedParams params = new DatafeedParams(randomAlphaOfLength(10), randomNonNegativeLong());
if (randomBoolean()) {
request.setEndTime(randomNonNegativeLong());
params.setEndTime(randomNonNegativeLong());
}
if (randomBoolean()) {
request.setTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
params.setTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
}
return request;
return new Request(params);
}
@Override
@ -38,15 +39,15 @@ public class StartDatafeedActionRequestTests extends AbstractStreamableXContentT
}
public void testParseDateOrThrow() {
assertEquals(0L, StartDatafeedAction.Request.parseDateOrThrow("0",
assertEquals(0L, StartDatafeedAction.DatafeedParams.parseDateOrThrow("0",
StartDatafeedAction.START_TIME, () -> System.currentTimeMillis()));
assertEquals(0L, StartDatafeedAction.Request.parseDateOrThrow("1970-01-01T00:00:00Z",
assertEquals(0L, StartDatafeedAction.DatafeedParams.parseDateOrThrow("1970-01-01T00:00:00Z",
StartDatafeedAction.START_TIME, () -> System.currentTimeMillis()));
assertThat(StartDatafeedAction.Request.parseDateOrThrow("now",
assertThat(StartDatafeedAction.DatafeedParams.parseDateOrThrow("now",
StartDatafeedAction.START_TIME, () -> 123456789L), equalTo(123456789L));
Exception e = expectThrows(ElasticsearchParseException.class,
() -> StartDatafeedAction.Request.parseDateOrThrow("not-a-date",
() -> StartDatafeedAction.DatafeedParams.parseDateOrThrow("not-a-date",
StartDatafeedAction.START_TIME, () -> System.currentTimeMillis()));
assertEquals("Query param 'start' with value 'not-a-date' cannot be parsed as a date or converted to a number (epoch).",
e.getMessage());

View File

@ -300,9 +300,9 @@ public class StartDatafeedActionTests extends ESTestCase {
public static StartDatafeedAction.DatafeedTask createDatafeedTask(long id, String type, String action,
TaskId parentTaskId,
StartDatafeedAction.Request request,
StartDatafeedAction.DatafeedParams params,
DatafeedManager datafeedManager) {
StartDatafeedAction.DatafeedTask task = new StartDatafeedAction.DatafeedTask(id, type, action, parentTaskId, request);
StartDatafeedAction.DatafeedTask task = new StartDatafeedAction.DatafeedTask(id, type, action, parentTaskId, params);
task.datafeedManager = datafeedManager;
return task;
}

View File

@ -52,8 +52,8 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
public void testValidate() {
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
tasksBuilder.addTask(MlMetadata.datafeedTaskId("foo"), StartDatafeedAction.NAME,
new StartDatafeedAction.Request("foo", 0L), new Assignment("node_id", ""));
tasksBuilder.addTask(MlMetadata.datafeedTaskId("foo"), StartDatafeedAction.TASK_NAME,
new StartDatafeedAction.DatafeedParams("foo", 0L), new Assignment("node_id", ""));
tasksBuilder.updateTaskStatus(MlMetadata.datafeedTaskId("foo"), DatafeedState.STARTED);
PersistentTasksCustomMetaData tasks = tasksBuilder.build();
@ -74,8 +74,8 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
PersistentTasksCustomMetaData tasks;
if (randomBoolean()) {
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
tasksBuilder.addTask(MlMetadata.datafeedTaskId("foo2"), StartDatafeedAction.NAME,
new StartDatafeedAction.Request("foo2", 0L), new Assignment("node_id", ""));
tasksBuilder.addTask(MlMetadata.datafeedTaskId("foo2"), StartDatafeedAction.TASK_NAME,
new StartDatafeedAction.DatafeedParams("foo2", 0L), new Assignment("node_id", ""));
tasks = tasksBuilder.build();
} else {
tasks = randomBoolean() ? null : new PersistentTasksCustomMetaData(0L, Collections.emptyMap());
@ -95,22 +95,22 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
public void testResolveAll() {
Builder mlMetadataBuilder = new MlMetadata.Builder();
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
tasksBuilder.addTask(MlMetadata.datafeedTaskId("datafeed_1"), StartDatafeedAction.NAME,
new StartDatafeedAction.Request("datafeed_1", 0L), new Assignment("node_id", ""));
tasksBuilder.addTask(MlMetadata.datafeedTaskId("datafeed_1"), StartDatafeedAction.TASK_NAME,
new StartDatafeedAction.DatafeedParams("datafeed_1", 0L), new Assignment("node_id", ""));
tasksBuilder.updateTaskStatus(MlMetadata.datafeedTaskId("datafeed_1"), DatafeedState.STARTED);
Job job = BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date());
DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed_1", "job_id_1").build();
mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig);
tasksBuilder.addTask(MlMetadata.datafeedTaskId("datafeed_2"), StartDatafeedAction.NAME,
new StartDatafeedAction.Request("datafeed_1", 0L), new Assignment("node_id", ""));
tasksBuilder.addTask(MlMetadata.datafeedTaskId("datafeed_2"), StartDatafeedAction.TASK_NAME,
new StartDatafeedAction.DatafeedParams("datafeed_1", 0L), new Assignment("node_id", ""));
tasksBuilder.updateTaskStatus(MlMetadata.datafeedTaskId("datafeed_2"), DatafeedState.STOPPED);
job = BaseMlIntegTestCase.createScheduledJob("job_id_2").build(new Date());
datafeedConfig = createDatafeedConfig("datafeed_2", "job_id_2").build();
mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig);
tasksBuilder.addTask(MlMetadata.datafeedTaskId("datafeed_3"), StartDatafeedAction.NAME,
new StartDatafeedAction.Request("datafeed_3", 0L), new Assignment("node_id", ""));
tasksBuilder.addTask(MlMetadata.datafeedTaskId("datafeed_3"), StartDatafeedAction.TASK_NAME,
new StartDatafeedAction.DatafeedParams("datafeed_3", 0L), new Assignment("node_id", ""));
tasksBuilder.updateTaskStatus(MlMetadata.datafeedTaskId("datafeed_3"), DatafeedState.STARTED);
job = BaseMlIntegTestCase.createScheduledJob("job_id_3").build(new Date());
datafeedConfig = createDatafeedConfig("datafeed_3", "job_id_3").build();

View File

@ -281,9 +281,9 @@ public class DatafeedManagerTests extends ESTestCase {
new Date(0), new Date(0), new Date(0), new Date(0), new Date(0));
when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts));
Consumer<Exception> handler = mockConsumer();
StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request("datafeed_id", 0L);
StartDatafeedAction.DatafeedParams params = new StartDatafeedAction.DatafeedParams("datafeed_id", 0L);
DatafeedTask task = StartDatafeedActionTests.createDatafeedTask(1, "type", "action", null,
startDatafeedRequest, datafeedManager);
params, datafeedManager);
task = spyDatafeedTask(task);
datafeedManager.run(task, handler);
@ -310,9 +310,9 @@ public class DatafeedManagerTests extends ESTestCase {
new Date(0), new Date(0), new Date(0), new Date(0), new Date(0));
when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts));
Consumer<Exception> handler = mockConsumer();
StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request("datafeed_id", 0L);
StartDatafeedAction.DatafeedParams params = new StartDatafeedAction.DatafeedParams("datafeed_id", 0L);
DatafeedTask task = StartDatafeedActionTests.createDatafeedTask(1, "type", "action", null,
startDatafeedRequest, datafeedManager);
params, datafeedManager);
task = spyDatafeedTask(task);
datafeedManager.run(task, handler);
@ -333,9 +333,9 @@ public class DatafeedManagerTests extends ESTestCase {
when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts));
Consumer<Exception> handler = mockConsumer();
boolean cancelled = randomBoolean();
StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request("datafeed_id", 0L);
StartDatafeedAction.DatafeedParams params = new StartDatafeedAction.DatafeedParams("datafeed_id", 0L);
DatafeedTask task = StartDatafeedActionTests.createDatafeedTask(1, "type", "action", null,
startDatafeedRequest, datafeedManager);
params, datafeedManager);
task = spyDatafeedTask(task);
datafeedManager.run(task, handler);

View File

@ -225,7 +225,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
}
for (DiscoveryNode node : event.state().nodes()) {
Collection<PersistentTask<?>> foundTasks = tasks.findTasks(OpenJobAction.NAME, task -> {
Collection<PersistentTask<?>> foundTasks = tasks.findTasks(OpenJobAction.TASK_NAME, task -> {
JobTaskStatus jobTaskState = (JobTaskStatus) task.getStatus();
return node.getId().equals(task.getExecutorNode()) &&
(jobTaskState == null || jobTaskState.staleStatus(task));

View File

@ -36,7 +36,6 @@ import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.security.Security;
import org.junit.After;
import java.io.IOException;
import java.util.ArrayList;
@ -147,7 +146,7 @@ abstract class MlNativeAutodetectIntegTestCase extends SecurityIntegTestCase {
protected void startDatafeed(String datafeedId, long start, long end) throws Exception {
StartDatafeedAction.Request request = new StartDatafeedAction.Request(datafeedId, start);
request.setEndTime(end);
request.getParams().setEndTime(end);
client().execute(StartDatafeedAction.INSTANCE, request).get();
}

View File

@ -132,8 +132,8 @@ cluster:admin/xpack/ml/job/model_snapshots/update
cluster:admin/xpack/ml/job/flush
cluster:admin/xpack/ml/filters/delete
cluster:monitor/xpack/ml/datafeeds/stats/get
cluster:admin/xpack/ml/datafeeds/stop
cluster:admin/xpack/ml/datafeeds/start
cluster:admin/xpack/ml/datafeed/stop
cluster:admin/xpack/ml/datafeed/start
cluster:admin/xpack/ml/job/open
cluster:admin/xpack/ml/job/update
indices:internal/data/write/xpackdeletebyquery

View File

@ -339,7 +339,7 @@
cluster.state:
metric: [ metadata ]
filter_path: metadata.persistent_tasks
- match: {"metadata.persistent_tasks.tasks.0.task.cluster:admin/xpack/ml/job/open.status.state": opened}
- match: {"metadata.persistent_tasks.tasks.0.task.xpack/ml/job.status.state": opened}
- do:
xpack.ml.close_job:
@ -396,7 +396,7 @@
cluster.state:
metric: [ metadata ]
filter_path: metadata.persistent_tasks
- match: {"metadata.persistent_tasks.tasks.0.task.cluster:admin/xpack/ml/job/open.status.state": opened}
- match: {"metadata.persistent_tasks.tasks.0.task.xpack/ml/job.status.state": opened}
- do:
xpack.ml.close_job: