Moved waiting for scheduler started logic into StartSchedulerAction.TransportAction and moved the logic that was original there to a new action named InternalStartSchedulerAction.

This change prepares for elastic/elasticsearch/elastic/elasticsearch#22575, where we don't have ClusterService available in rest actions.

Original commit: elastic/x-pack-elasticsearch@87658c7fe8
This commit is contained in:
Martijn van Groningen 2017-01-18 11:42:24 +01:00
parent c33f26976d
commit d3c589c33d
10 changed files with 227 additions and 134 deletions

View File

@ -43,6 +43,7 @@ import org.elasticsearch.xpack.ml.action.GetModelSnapshotsAction;
import org.elasticsearch.xpack.ml.action.GetRecordsAction; import org.elasticsearch.xpack.ml.action.GetRecordsAction;
import org.elasticsearch.xpack.ml.action.GetSchedulersAction; import org.elasticsearch.xpack.ml.action.GetSchedulersAction;
import org.elasticsearch.xpack.ml.action.GetSchedulersStatsAction; import org.elasticsearch.xpack.ml.action.GetSchedulersStatsAction;
import org.elasticsearch.xpack.ml.action.InternalStartSchedulerAction;
import org.elasticsearch.xpack.ml.action.OpenJobAction; import org.elasticsearch.xpack.ml.action.OpenJobAction;
import org.elasticsearch.xpack.ml.action.PostDataAction; import org.elasticsearch.xpack.ml.action.PostDataAction;
import org.elasticsearch.xpack.ml.action.PutJobAction; import org.elasticsearch.xpack.ml.action.PutJobAction;
@ -293,6 +294,7 @@ public class MlPlugin extends Plugin implements ActionPlugin {
new ActionHandler<>(PutSchedulerAction.INSTANCE, PutSchedulerAction.TransportAction.class), new ActionHandler<>(PutSchedulerAction.INSTANCE, PutSchedulerAction.TransportAction.class),
new ActionHandler<>(DeleteSchedulerAction.INSTANCE, DeleteSchedulerAction.TransportAction.class), new ActionHandler<>(DeleteSchedulerAction.INSTANCE, DeleteSchedulerAction.TransportAction.class),
new ActionHandler<>(StartSchedulerAction.INSTANCE, StartSchedulerAction.TransportAction.class), new ActionHandler<>(StartSchedulerAction.INSTANCE, StartSchedulerAction.TransportAction.class),
new ActionHandler<>(InternalStartSchedulerAction.INSTANCE, InternalStartSchedulerAction.TransportAction.class),
new ActionHandler<>(StopSchedulerAction.INSTANCE, StopSchedulerAction.TransportAction.class), new ActionHandler<>(StopSchedulerAction.INSTANCE, StopSchedulerAction.TransportAction.class),
new ActionHandler<>(DeleteModelSnapshotAction.INSTANCE, DeleteModelSnapshotAction.TransportAction.class) new ActionHandler<>(DeleteModelSnapshotAction.INSTANCE, DeleteModelSnapshotAction.TransportAction.class)
); );

View File

@ -0,0 +1,129 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.scheduler.ScheduledJobRunner;
public class InternalStartSchedulerAction extends
Action<InternalStartSchedulerAction.Request, InternalStartSchedulerAction.Response, InternalStartSchedulerAction.RequestBuilder> {
public static final InternalStartSchedulerAction INSTANCE = new InternalStartSchedulerAction();
public static final String NAME = "cluster:admin/ml/scheduler/internal_start";
private InternalStartSchedulerAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends StartSchedulerAction.Request {
Request(String schedulerId, long startTime) {
super(schedulerId, startTime);
}
Request() {
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new SchedulerTask(id, type, action, parentTaskId, getSchedulerId());
}
}
static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client, InternalStartSchedulerAction action) {
super(client, action, new Request());
}
}
public static class Response extends ActionResponse {
Response() {
}
}
public static class SchedulerTask extends CancellableTask {
private volatile ScheduledJobRunner.Holder holder;
public SchedulerTask(long id, String type, String action, TaskId parentTaskId, String schedulerId) {
super(id, type, action, "scheduler-" + schedulerId, parentTaskId);
}
public void setHolder(ScheduledJobRunner.Holder holder) {
this.holder = holder;
}
@Override
protected void onCancelled() {
stop();
}
/* public for testing */
public void stop() {
if (holder != null) {
holder.stop(null);
}
}
}
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final ScheduledJobRunner scheduledJobRunner;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ScheduledJobRunner scheduledJobRunner) {
super(settings, InternalStartSchedulerAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
Request::new);
this.scheduledJobRunner = scheduledJobRunner;
}
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
SchedulerTask schedulerTask = (SchedulerTask) task;
scheduledJobRunner.run(request.getSchedulerId(), request.getStartTime(), request.getEndTime(), schedulerTask, (error) -> {
if (error != null) {
listener.onFailure(error);
} else {
listener.onResponse(new Response());
}
});
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
throw new UnsupportedOperationException("the task parameter is required");
}
}
}

View File

@ -15,23 +15,27 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.LoggingTaskListener;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.scheduler.ScheduledJobRunner; import org.elasticsearch.xpack.ml.scheduler.ScheduledJobRunner;
import org.elasticsearch.xpack.ml.scheduler.SchedulerConfig; import org.elasticsearch.xpack.ml.scheduler.SchedulerConfig;
import org.elasticsearch.xpack.ml.scheduler.SchedulerStatus;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.utils.SchedulerStatusObserver;
import java.io.IOException; import java.io.IOException;
import java.util.Objects; import java.util.Objects;
@ -41,6 +45,7 @@ public class StartSchedulerAction
public static final ParseField START_TIME = new ParseField("start"); public static final ParseField START_TIME = new ParseField("start");
public static final ParseField END_TIME = new ParseField("end"); public static final ParseField END_TIME = new ParseField("end");
public static final ParseField START_TIMEOUT = new ParseField("start_timeout");
public static final StartSchedulerAction INSTANCE = new StartSchedulerAction(); public static final StartSchedulerAction INSTANCE = new StartSchedulerAction();
public static final String NAME = "cluster:admin/ml/scheduler/start"; public static final String NAME = "cluster:admin/ml/scheduler/start";
@ -67,6 +72,8 @@ public class StartSchedulerAction
PARSER.declareString((request, schedulerId) -> request.schedulerId = schedulerId, SchedulerConfig.ID); PARSER.declareString((request, schedulerId) -> request.schedulerId = schedulerId, SchedulerConfig.ID);
PARSER.declareLong((request, startTime) -> request.startTime = startTime, START_TIME); PARSER.declareLong((request, startTime) -> request.startTime = startTime, START_TIME);
PARSER.declareLong(Request::setEndTime, END_TIME); PARSER.declareLong(Request::setEndTime, END_TIME);
PARSER.declareString((request, val) -> request.setStartTimeout(TimeValue.parseTimeValue(val,
START_TIME.getPreferredName())), START_TIMEOUT);
} }
public static Request parseRequest(String schedulerId, XContentParser parser) { public static Request parseRequest(String schedulerId, XContentParser parser) {
@ -80,6 +87,7 @@ public class StartSchedulerAction
private String schedulerId; private String schedulerId;
private long startTime; private long startTime;
private Long endTime; private Long endTime;
private TimeValue startTimeout = TimeValue.timeValueSeconds(30);
public Request(String schedulerId, long startTime) { public Request(String schedulerId, long startTime) {
this.schedulerId = ExceptionsHelper.requireNonNull(schedulerId, SchedulerConfig.ID.getPreferredName()); this.schedulerId = ExceptionsHelper.requireNonNull(schedulerId, SchedulerConfig.ID.getPreferredName());
@ -105,14 +113,17 @@ public class StartSchedulerAction
this.endTime = endTime; this.endTime = endTime;
} }
@Override public TimeValue getStartTimeout() {
public ActionRequestValidationException validate() { return startTimeout;
return null; }
public void setStartTimeout(TimeValue startTimeout) {
this.startTimeout = startTimeout;
} }
@Override @Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) { public ActionRequestValidationException validate() {
return new SchedulerTask(id, type, action, parentTaskId, schedulerId); return null;
} }
@Override @Override
@ -121,6 +132,7 @@ public class StartSchedulerAction
schedulerId = in.readString(); schedulerId = in.readString();
startTime = in.readVLong(); startTime = in.readVLong();
endTime = in.readOptionalLong(); endTime = in.readOptionalLong();
startTimeout = new TimeValue(in.readVLong());
} }
@Override @Override
@ -129,6 +141,7 @@ public class StartSchedulerAction
out.writeString(schedulerId); out.writeString(schedulerId);
out.writeVLong(startTime); out.writeVLong(startTime);
out.writeOptionalLong(endTime); out.writeOptionalLong(endTime);
out.writeVLong(startTimeout.millis());
} }
@Override @Override
@ -170,66 +183,79 @@ public class StartSchedulerAction
} }
} }
public static class Response extends ActionResponse { public static class Response extends ActionResponse implements ToXContentObject {
private boolean started;
Response(boolean started) {
this.started = started;
}
Response() { Response() {
} }
} public boolean isStarted() {
return started;
public static class SchedulerTask extends CancellableTask {
private volatile ScheduledJobRunner.Holder holder;
public SchedulerTask(long id, String type, String action, TaskId parentTaskId, String schedulerId) {
super(id, type, action, "scheduler-" + schedulerId, parentTaskId);
}
public void setHolder(ScheduledJobRunner.Holder holder) {
this.holder = holder;
} }
@Override @Override
protected void onCancelled() { public void readFrom(StreamInput in) throws IOException {
stop(); super.readFrom(in);
started = in.readBoolean();
} }
/* public for testing */ @Override
public void stop() { public void writeTo(StreamOutput out) throws IOException {
if (holder != null) { super.writeTo(out);
holder.stop(null); out.writeBoolean(started);
} }
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("started", started);
builder.endObject();
return builder;
} }
} }
public static class TransportAction extends HandledTransportAction<Request, Response> { public static class TransportAction extends HandledTransportAction<Request, Response> {
private final ScheduledJobRunner scheduledJobRunner; private final ClusterService clusterService;
private final SchedulerStatusObserver schedulerStatusObserver;
private final InternalStartSchedulerAction.TransportAction transportAction;
@Inject @Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ScheduledJobRunner scheduledJobRunner) { ClusterService clusterService, InternalStartSchedulerAction.TransportAction transportAction) {
super(settings, StartSchedulerAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, super(settings, StartSchedulerAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
Request::new); Request::new);
this.scheduledJobRunner = scheduledJobRunner; this.clusterService = clusterService;
} this.schedulerStatusObserver = new SchedulerStatusObserver(threadPool, clusterService);
this.transportAction = transportAction;
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
SchedulerTask schedulerTask = (SchedulerTask) task;
scheduledJobRunner.run(request.getSchedulerId(), request.getStartTime(), request.getEndTime(), schedulerTask, (error) -> {
if (error != null) {
listener.onFailure(error);
} else {
listener.onResponse(new Response());
}
});
} }
@Override @Override
protected void doExecute(Request request, ActionListener<Response> listener) { protected void doExecute(Request request, ActionListener<Response> listener) {
throw new UnsupportedOperationException("the task parameter is required"); // This validation happens also in ScheduledJobRunner, the reason we do it here too is that if it fails there
// we are unable to provide the user immediate feedback. We would create the task and the validation would fail
// in the background, whereas now the validation failure is part of the response being returned.
MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE);
ScheduledJobRunner.validate(request.schedulerId, mlMetadata);
InternalStartSchedulerAction.Request internalRequest =
new InternalStartSchedulerAction.Request(request.schedulerId, request.startTime);
internalRequest.setEndTime(request.endTime);
transportAction.execute(internalRequest, LoggingTaskListener.instance());
schedulerStatusObserver.waitForStatus(request.schedulerId, request.startTimeout, SchedulerStatus.STARTED, e -> {
if (e != null) {
listener.onFailure(e);
} else {
listener.onResponse(new Response(true));
}
});
} }
} }
} }

View File

@ -33,11 +33,11 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.scheduler.SchedulerStatus;
import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.scheduler.SchedulerConfig;
import org.elasticsearch.xpack.ml.scheduler.Scheduler; import org.elasticsearch.xpack.ml.scheduler.Scheduler;
import org.elasticsearch.xpack.ml.scheduler.SchedulerConfig;
import org.elasticsearch.xpack.ml.scheduler.SchedulerStatus;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.utils.SchedulerStatusObserver; import org.elasticsearch.xpack.ml.utils.SchedulerStatusObserver;
@ -170,7 +170,7 @@ public class StopSchedulerAction
validate(schedulerId, mlMetadata); validate(schedulerId, mlMetadata);
ListTasksRequest listTasksRequest = new ListTasksRequest(); ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions(StartSchedulerAction.NAME); listTasksRequest.setActions(InternalStartSchedulerAction.NAME);
listTasksRequest.setDetailed(true); listTasksRequest.setDetailed(true);
listTasksAction.execute(listTasksRequest, new ActionListener<ListTasksResponse>() { listTasksAction.execute(listTasksRequest, new ActionListener<ListTasksResponse>() {
@Override @Override

View File

@ -7,29 +7,19 @@ package org.elasticsearch.xpack.ml.rest.schedulers;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.tasks.LoggingTaskListener;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MlPlugin; import org.elasticsearch.xpack.ml.MlPlugin;
import org.elasticsearch.xpack.ml.action.StartSchedulerAction; import org.elasticsearch.xpack.ml.action.StartSchedulerAction;
import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.scheduler.ScheduledJobRunner;
import org.elasticsearch.xpack.ml.scheduler.SchedulerConfig; import org.elasticsearch.xpack.ml.scheduler.SchedulerConfig;
import org.elasticsearch.xpack.ml.scheduler.SchedulerStatus;
import org.elasticsearch.xpack.ml.utils.SchedulerStatusObserver;
import java.io.IOException; import java.io.IOException;
@ -37,15 +27,9 @@ public class RestStartSchedulerAction extends BaseRestHandler {
private static final String DEFAULT_START = "0"; private static final String DEFAULT_START = "0";
private final ClusterService clusterService;
private final SchedulerStatusObserver schedulerStatusObserver;
@Inject @Inject
public RestStartSchedulerAction(Settings settings, RestController controller, ThreadPool threadPool, public RestStartSchedulerAction(Settings settings, RestController controller) {
ClusterService clusterService) {
super(settings); super(settings);
this.clusterService = clusterService;
this.schedulerStatusObserver = new SchedulerStatusObserver(threadPool, clusterService);
controller.registerHandler(RestRequest.Method.POST, controller.registerHandler(RestRequest.Method.POST,
MlPlugin.BASE_PATH + "schedulers/{" + SchedulerConfig.ID.getPreferredName() + "}/_start", this); MlPlugin.BASE_PATH + "schedulers/{" + SchedulerConfig.ID.getPreferredName() + "}/_start", this);
} }
@ -53,13 +37,6 @@ public class RestStartSchedulerAction extends BaseRestHandler {
@Override @Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String schedulerId = restRequest.param(SchedulerConfig.ID.getPreferredName()); String schedulerId = restRequest.param(SchedulerConfig.ID.getPreferredName());
// This validation happens also in ScheduledJobRunner, the reason we do it here too is that if it fails there
// we are unable to provide the user immediate feedback. We would create the task and the validation would fail
// in the background, whereas now the validation failure is part of the response being returned.
MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE);
ScheduledJobRunner.validate(schedulerId, mlMetadata);
StartSchedulerAction.Request jobSchedulerRequest; StartSchedulerAction.Request jobSchedulerRequest;
if (restRequest.hasContentOrSourceParam()) { if (restRequest.hasContentOrSourceParam()) {
XContentParser parser = restRequest.contentOrSourceParamParser(); XContentParser parser = restRequest.contentOrSourceParamParser();
@ -74,29 +51,12 @@ public class RestStartSchedulerAction extends BaseRestHandler {
} }
jobSchedulerRequest = new StartSchedulerAction.Request(schedulerId, startTimeMillis); jobSchedulerRequest = new StartSchedulerAction.Request(schedulerId, startTimeMillis);
jobSchedulerRequest.setEndTime(endTimeMillis); jobSchedulerRequest.setEndTime(endTimeMillis);
TimeValue startTimeout = restRequest.paramAsTime(StartSchedulerAction.START_TIMEOUT.getPreferredName(),
TimeValue.timeValueSeconds(30));
jobSchedulerRequest.setStartTimeout(startTimeout);
} }
TimeValue startTimeout = restRequest.paramAsTime("start_timeout", TimeValue.timeValueSeconds(30));
return channel -> { return channel -> {
Task task = client.executeLocally(StartSchedulerAction.INSTANCE, jobSchedulerRequest, LoggingTaskListener.instance()); client.execute(StartSchedulerAction.INSTANCE, jobSchedulerRequest, new RestToXContentListener<>(channel));
schedulerStatusObserver.waitForStatus(schedulerId, startTimeout, SchedulerStatus.STARTED, e -> {
if (e != null) {
try {
channel.sendResponse(new BytesRestResponse(channel, e));
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
} else {
try (XContentBuilder builder = channel.newBuilder()) {
builder.startObject();
builder.field("task", clusterService.localNode().getId() + ":" + task.getId());
builder.endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
});
}; };
} }

View File

@ -17,7 +17,7 @@ import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MlPlugin; import org.elasticsearch.xpack.ml.MlPlugin;
import org.elasticsearch.xpack.ml.action.StartSchedulerAction; import org.elasticsearch.xpack.ml.action.InternalStartSchedulerAction;
import org.elasticsearch.xpack.ml.action.UpdateSchedulerStatusAction; import org.elasticsearch.xpack.ml.action.UpdateSchedulerStatusAction;
import org.elasticsearch.xpack.ml.job.DataCounts; import org.elasticsearch.xpack.ml.job.DataCounts;
import org.elasticsearch.xpack.ml.job.DataDescription; import org.elasticsearch.xpack.ml.job.DataDescription;
@ -61,7 +61,7 @@ public class ScheduledJobRunner extends AbstractComponent {
this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier); this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier);
} }
public void run(String schedulerId, long startTime, Long endTime, StartSchedulerAction.SchedulerTask task, public void run(String schedulerId, long startTime, Long endTime, InternalStartSchedulerAction.SchedulerTask task,
Consumer<Exception> handler) { Consumer<Exception> handler) {
MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE); MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE);
validate(schedulerId, mlMetadata); validate(schedulerId, mlMetadata);
@ -185,7 +185,7 @@ public class ScheduledJobRunner extends AbstractComponent {
} }
private Holder createJobScheduler(Scheduler scheduler, Job job, long finalBucketEndMs, long latestRecordTimeMs, private Holder createJobScheduler(Scheduler scheduler, Job job, long finalBucketEndMs, long latestRecordTimeMs,
Consumer<Exception> handler, StartSchedulerAction.SchedulerTask task) { Consumer<Exception> handler, InternalStartSchedulerAction.SchedulerTask task) {
Auditor auditor = jobProvider.audit(job.getId()); Auditor auditor = jobProvider.audit(job.getId());
Duration frequency = getFrequencyOrDefault(scheduler, job); Duration frequency = getFrequencyOrDefault(scheduler, job);
Duration queryDelay = Duration.ofSeconds(scheduler.getConfig().getQueryDelay()); Duration queryDelay = Duration.ofSeconds(scheduler.getConfig().getQueryDelay());

View File

@ -39,12 +39,10 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.xpack.ml.integration.TooManyJobsIT.ensureClusterStateConsistencyWorkAround; import static org.elasticsearch.xpack.ml.integration.TooManyJobsIT.ensureClusterStateConsistencyWorkAround;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
@ESIntegTestCase.ClusterScope(numDataNodes = 1) @ESIntegTestCase.ClusterScope(numDataNodes = 1)
public class ScheduledJobsIT extends ESIntegTestCase { public class ScheduledJobsIT extends ESIntegTestCase {
@ -94,7 +92,9 @@ public class ScheduledJobsIT extends ESIntegTestCase {
StartSchedulerAction.Request startSchedulerRequest = new StartSchedulerAction.Request(schedulerConfig.getId(), 0L); StartSchedulerAction.Request startSchedulerRequest = new StartSchedulerAction.Request(schedulerConfig.getId(), 0L);
startSchedulerRequest.setEndTime(now); startSchedulerRequest.setEndTime(now);
client().execute(StartSchedulerAction.INSTANCE, startSchedulerRequest).get(); StartSchedulerAction.Response startSchedulerResponse =
client().execute(StartSchedulerAction.INSTANCE, startSchedulerRequest).get();
assertTrue(startSchedulerResponse.isStarted());
assertBusy(() -> { assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId()); DataCounts dataCounts = getDataCounts(job.getId());
assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs + numDocs2)); assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs + numDocs2));
@ -127,16 +127,10 @@ public class ScheduledJobsIT extends ESIntegTestCase {
PutSchedulerAction.Response putSchedulerResponse = client().execute(PutSchedulerAction.INSTANCE, putSchedulerRequest).get(); PutSchedulerAction.Response putSchedulerResponse = client().execute(PutSchedulerAction.INSTANCE, putSchedulerRequest).get();
assertTrue(putSchedulerResponse.isAcknowledged()); assertTrue(putSchedulerResponse.isAcknowledged());
AtomicReference<Throwable> errorHolder = new AtomicReference<>(); StartSchedulerAction.Request startSchedulerRequest = new StartSchedulerAction.Request(schedulerConfig.getId(), 0L);
Thread t = new Thread(() -> { StartSchedulerAction.Response startSchedulerResponse =
try {
StartSchedulerAction.Request startSchedulerRequest = new StartSchedulerAction.Request(schedulerConfig.getId(), 0L);
client().execute(StartSchedulerAction.INSTANCE, startSchedulerRequest).get(); client().execute(StartSchedulerAction.INSTANCE, startSchedulerRequest).get();
} catch (Exception | AssertionError e) { assertTrue(startSchedulerResponse.isStarted());
errorHolder.set(e);
}
});
t.start();
assertBusy(() -> { assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId()); DataCounts dataCounts = getDataCounts(job.getId());
assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1)); assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1));
@ -160,7 +154,6 @@ public class ScheduledJobsIT extends ESIntegTestCase {
.getState().metaData().custom(MlMetadata.TYPE); .getState().metaData().custom(MlMetadata.TYPE);
assertThat(mlMetadata.getScheduler(schedulerConfig.getId()).get().getStatus(), equalTo(SchedulerStatus.STOPPED)); assertThat(mlMetadata.getScheduler(schedulerConfig.getId()).get().getStatus(), equalTo(SchedulerStatus.STOPPED));
}); });
assertThat(errorHolder.get(), nullValue());
} }
private void indexDocs(String index, long numDocs, long start, long end) { private void indexDocs(String index, long numDocs, long start, long end) {

View File

@ -150,7 +150,7 @@ public class ScheduledJobIT extends ESRestTestCase {
Response response = client().performRequest("post", Response response = client().performRequest("post",
MlPlugin.BASE_PATH + "schedulers/" + schedulerId + "/_start?start=2016-06-01T00:00:00Z"); MlPlugin.BASE_PATH + "schedulers/" + schedulerId + "/_start?start=2016-06-01T00:00:00Z");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
assertThat(responseEntityToString(response), containsString("{\"task\":\"")); assertThat(responseEntityToString(response), equalTo("{\"started\":true}"));
assertBusy(() -> { assertBusy(() -> {
try { try {
Response getJobResponse = client().performRequest("get", Response getJobResponse = client().performRequest("get",
@ -246,7 +246,7 @@ public class ScheduledJobIT extends ESRestTestCase {
Response startSchedulerRequest = client().performRequest("post", Response startSchedulerRequest = client().performRequest("post",
MlPlugin.BASE_PATH + "schedulers/" + schedulerId + "/_start?start=2016-06-01T00:00:00Z&end=2016-06-02T00:00:00Z"); MlPlugin.BASE_PATH + "schedulers/" + schedulerId + "/_start?start=2016-06-01T00:00:00Z&end=2016-06-02T00:00:00Z");
assertThat(startSchedulerRequest.getStatusLine().getStatusCode(), equalTo(200)); assertThat(startSchedulerRequest.getStatusLine().getStatusCode(), equalTo(200));
assertThat(responseEntityToString(startSchedulerRequest), containsString("{\"task\":\"")); assertThat(responseEntityToString(startSchedulerRequest), equalTo("{\"started\":true}"));
assertBusy(() -> { assertBusy(() -> {
try { try {
Response schedulerStatsResponse = client().performRequest("get", Response schedulerStatsResponse = client().performRequest("get",

View File

@ -7,20 +7,13 @@ package org.elasticsearch.xpack.ml.rest.schedulers;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.FakeRestRequest; import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.job.Job; import org.elasticsearch.xpack.ml.job.Job;
import org.elasticsearch.xpack.ml.job.JobStatus;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.scheduler.ScheduledJobRunnerTests; import org.elasticsearch.xpack.ml.scheduler.ScheduledJobRunnerTests;
import org.elasticsearch.xpack.ml.scheduler.SchedulerConfig; import org.elasticsearch.xpack.ml.scheduler.SchedulerConfig;
@ -28,24 +21,13 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class RestStartJobSchedulerActionTests extends ESTestCase { public class RestStartJobSchedulerActionTests extends ESTestCase {
public void testPrepareRequest() throws Exception { public void testPrepareRequest() throws Exception {
ClusterService clusterService = mock(ClusterService.class);
Job.Builder job = ScheduledJobRunnerTests.createScheduledJob(); Job.Builder job = ScheduledJobRunnerTests.createScheduledJob();
SchedulerConfig schedulerConfig = ScheduledJobRunnerTests.createSchedulerConfig("foo-scheduler", "foo").build(); SchedulerConfig schedulerConfig = ScheduledJobRunnerTests.createSchedulerConfig("foo-scheduler", "foo").build();
MlMetadata mlMetadata = new MlMetadata.Builder() RestStartSchedulerAction action = new RestStartSchedulerAction(Settings.EMPTY, mock(RestController.class));
.putJob(job.build(), false)
.putScheduler(schedulerConfig)
.updateStatus("foo", JobStatus.OPENED, null)
.build();
when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata))
.build());
RestStartSchedulerAction action = new RestStartSchedulerAction(Settings.EMPTY, mock(RestController.class),
mock(ThreadPool.class), clusterService);
Map<String, String> params = new HashMap<>(); Map<String, String> params = new HashMap<>();
params.put("start", "not-a-date"); params.put("start", "not-a-date");

View File

@ -20,8 +20,8 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MlPlugin; import org.elasticsearch.xpack.ml.MlPlugin;
import org.elasticsearch.xpack.ml.action.FlushJobAction; import org.elasticsearch.xpack.ml.action.FlushJobAction;
import org.elasticsearch.xpack.ml.action.InternalStartSchedulerAction;
import org.elasticsearch.xpack.ml.action.PostDataAction; import org.elasticsearch.xpack.ml.action.PostDataAction;
import org.elasticsearch.xpack.ml.action.StartSchedulerAction;
import org.elasticsearch.xpack.ml.action.UpdateSchedulerStatusAction; import org.elasticsearch.xpack.ml.action.UpdateSchedulerStatusAction;
import org.elasticsearch.xpack.ml.job.AnalysisConfig; import org.elasticsearch.xpack.ml.job.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.DataCounts; import org.elasticsearch.xpack.ml.job.DataCounts;
@ -141,7 +141,7 @@ public class ScheduledJobRunnerTests extends ESTestCase {
when(dataExtractor.next()).thenReturn(Optional.of(in)); when(dataExtractor.next()).thenReturn(Optional.of(in));
when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts)); when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts));
Consumer<Exception> handler = mockConsumer(); Consumer<Exception> handler = mockConsumer();
StartSchedulerAction.SchedulerTask task = mock(StartSchedulerAction.SchedulerTask.class); InternalStartSchedulerAction.SchedulerTask task = mock(InternalStartSchedulerAction.SchedulerTask.class);
scheduledJobRunner.run("scheduler1", 0L, 60000L, task, handler); scheduledJobRunner.run("scheduler1", 0L, 60000L, task, handler);
verify(threadPool, times(1)).executor(MlPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME); verify(threadPool, times(1)).executor(MlPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME);
@ -181,7 +181,7 @@ public class ScheduledJobRunnerTests extends ESTestCase {
when(dataExtractor.next()).thenThrow(new RuntimeException("dummy")); when(dataExtractor.next()).thenThrow(new RuntimeException("dummy"));
when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts)); when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts));
Consumer<Exception> handler = mockConsumer(); Consumer<Exception> handler = mockConsumer();
StartSchedulerAction.SchedulerTask task = mock(StartSchedulerAction.SchedulerTask.class); InternalStartSchedulerAction.SchedulerTask task = mock(InternalStartSchedulerAction.SchedulerTask.class);
scheduledJobRunner.run("scheduler1", 0L, 60000L, task, handler); scheduledJobRunner.run("scheduler1", 0L, 60000L, task, handler);
verify(threadPool, times(1)).executor(MlPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME); verify(threadPool, times(1)).executor(MlPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME);
@ -214,7 +214,8 @@ public class ScheduledJobRunnerTests extends ESTestCase {
when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts)); when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts));
Consumer<Exception> handler = mockConsumer(); Consumer<Exception> handler = mockConsumer();
boolean cancelled = randomBoolean(); boolean cancelled = randomBoolean();
StartSchedulerAction.SchedulerTask task = new StartSchedulerAction.SchedulerTask(1, "type", "action", null, "scheduler1"); InternalStartSchedulerAction.SchedulerTask task =
new InternalStartSchedulerAction.SchedulerTask(1, "type", "action", null, "scheduler1");
scheduledJobRunner.run("scheduler1", 0L, null, task, handler); scheduledJobRunner.run("scheduler1", 0L, null, task, handler);
verify(threadPool, times(1)).executor(MlPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME); verify(threadPool, times(1)).executor(MlPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME);