From d3c589c33d826b319f328550b62e5613bf5afdc2 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 18 Jan 2017 11:42:24 +0100 Subject: [PATCH] Moved waiting for scheduler started logic into StartSchedulerAction.TransportAction and moved the logic that was original there to a new action named InternalStartSchedulerAction. This change prepares for elastic/elasticsearch/elastic/elasticsearch#22575, where we don't have ClusterService available in rest actions. Original commit: elastic/x-pack-elasticsearch@87658c7fe8916a6925ce3298bae9991660c55a04 --- .../org/elasticsearch/xpack/ml/MlPlugin.java | 2 + .../action/InternalStartSchedulerAction.java | 129 ++++++++++++++++++ .../xpack/ml/action/StartSchedulerAction.java | 114 ++++++++++------ .../xpack/ml/action/StopSchedulerAction.java | 6 +- .../schedulers/RestStartSchedulerAction.java | 52 +------ .../ml/scheduler/ScheduledJobRunner.java | 6 +- .../xpack/ml/action/ScheduledJobsIT.java | 19 +-- .../xpack/ml/integration/ScheduledJobIT.java | 4 +- .../RestStartJobSchedulerActionTests.java | 20 +-- .../ml/scheduler/ScheduledJobRunnerTests.java | 9 +- 10 files changed, 227 insertions(+), 134 deletions(-) create mode 100644 elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/InternalStartSchedulerAction.java diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java index fbb37897ea2..d83889dda1c 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java @@ -43,6 +43,7 @@ import org.elasticsearch.xpack.ml.action.GetModelSnapshotsAction; import org.elasticsearch.xpack.ml.action.GetRecordsAction; import org.elasticsearch.xpack.ml.action.GetSchedulersAction; import org.elasticsearch.xpack.ml.action.GetSchedulersStatsAction; +import org.elasticsearch.xpack.ml.action.InternalStartSchedulerAction; import org.elasticsearch.xpack.ml.action.OpenJobAction; import org.elasticsearch.xpack.ml.action.PostDataAction; import org.elasticsearch.xpack.ml.action.PutJobAction; @@ -293,6 +294,7 @@ public class MlPlugin extends Plugin implements ActionPlugin { new ActionHandler<>(PutSchedulerAction.INSTANCE, PutSchedulerAction.TransportAction.class), new ActionHandler<>(DeleteSchedulerAction.INSTANCE, DeleteSchedulerAction.TransportAction.class), new ActionHandler<>(StartSchedulerAction.INSTANCE, StartSchedulerAction.TransportAction.class), + new ActionHandler<>(InternalStartSchedulerAction.INSTANCE, InternalStartSchedulerAction.TransportAction.class), new ActionHandler<>(StopSchedulerAction.INSTANCE, StopSchedulerAction.TransportAction.class), new ActionHandler<>(DeleteModelSnapshotAction.INSTANCE, DeleteModelSnapshotAction.TransportAction.class) ); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/InternalStartSchedulerAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/InternalStartSchedulerAction.java new file mode 100644 index 00000000000..08caef57b14 --- /dev/null +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/InternalStartSchedulerAction.java @@ -0,0 +1,129 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ml.scheduler.ScheduledJobRunner; + +public class InternalStartSchedulerAction extends + Action { + + public static final InternalStartSchedulerAction INSTANCE = new InternalStartSchedulerAction(); + public static final String NAME = "cluster:admin/ml/scheduler/internal_start"; + + private InternalStartSchedulerAction() { + super(NAME); + } + + @Override + public RequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new RequestBuilder(client, this); + } + + @Override + public Response newResponse() { + return new Response(); + } + + public static class Request extends StartSchedulerAction.Request { + + Request(String schedulerId, long startTime) { + super(schedulerId, startTime); + } + + Request() { + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId) { + return new SchedulerTask(id, type, action, parentTaskId, getSchedulerId()); + } + } + + static class RequestBuilder extends ActionRequestBuilder { + + public RequestBuilder(ElasticsearchClient client, InternalStartSchedulerAction action) { + super(client, action, new Request()); + } + } + + public static class Response extends ActionResponse { + + Response() { + } + + } + + public static class SchedulerTask extends CancellableTask { + + private volatile ScheduledJobRunner.Holder holder; + + public SchedulerTask(long id, String type, String action, TaskId parentTaskId, String schedulerId) { + super(id, type, action, "scheduler-" + schedulerId, parentTaskId); + } + + public void setHolder(ScheduledJobRunner.Holder holder) { + this.holder = holder; + } + + @Override + protected void onCancelled() { + stop(); + } + + /* public for testing */ + public void stop() { + if (holder != null) { + holder.stop(null); + } + } + } + + public static class TransportAction extends HandledTransportAction { + + private final ScheduledJobRunner scheduledJobRunner; + + @Inject + public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + ScheduledJobRunner scheduledJobRunner) { + super(settings, InternalStartSchedulerAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, + Request::new); + this.scheduledJobRunner = scheduledJobRunner; + } + + @Override + protected void doExecute(Task task, Request request, ActionListener listener) { + SchedulerTask schedulerTask = (SchedulerTask) task; + scheduledJobRunner.run(request.getSchedulerId(), request.getStartTime(), request.getEndTime(), schedulerTask, (error) -> { + if (error != null) { + listener.onFailure(error); + } else { + listener.onResponse(new Response()); + } + }); + } + + @Override + protected void doExecute(Request request, ActionListener listener) { + throw new UnsupportedOperationException("the task parameter is required"); + } + } +} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/StartSchedulerAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/StartSchedulerAction.java index f9c6cfa448b..88e92a7926a 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/StartSchedulerAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/StartSchedulerAction.java @@ -15,23 +15,27 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.tasks.CancellableTask; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.LoggingTaskListener; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; import org.elasticsearch.xpack.ml.scheduler.ScheduledJobRunner; import org.elasticsearch.xpack.ml.scheduler.SchedulerConfig; +import org.elasticsearch.xpack.ml.scheduler.SchedulerStatus; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.utils.SchedulerStatusObserver; import java.io.IOException; import java.util.Objects; @@ -41,6 +45,7 @@ public class StartSchedulerAction public static final ParseField START_TIME = new ParseField("start"); public static final ParseField END_TIME = new ParseField("end"); + public static final ParseField START_TIMEOUT = new ParseField("start_timeout"); public static final StartSchedulerAction INSTANCE = new StartSchedulerAction(); public static final String NAME = "cluster:admin/ml/scheduler/start"; @@ -67,6 +72,8 @@ public class StartSchedulerAction PARSER.declareString((request, schedulerId) -> request.schedulerId = schedulerId, SchedulerConfig.ID); PARSER.declareLong((request, startTime) -> request.startTime = startTime, START_TIME); PARSER.declareLong(Request::setEndTime, END_TIME); + PARSER.declareString((request, val) -> request.setStartTimeout(TimeValue.parseTimeValue(val, + START_TIME.getPreferredName())), START_TIMEOUT); } public static Request parseRequest(String schedulerId, XContentParser parser) { @@ -80,6 +87,7 @@ public class StartSchedulerAction private String schedulerId; private long startTime; private Long endTime; + private TimeValue startTimeout = TimeValue.timeValueSeconds(30); public Request(String schedulerId, long startTime) { this.schedulerId = ExceptionsHelper.requireNonNull(schedulerId, SchedulerConfig.ID.getPreferredName()); @@ -105,14 +113,17 @@ public class StartSchedulerAction this.endTime = endTime; } - @Override - public ActionRequestValidationException validate() { - return null; + public TimeValue getStartTimeout() { + return startTimeout; + } + + public void setStartTimeout(TimeValue startTimeout) { + this.startTimeout = startTimeout; } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new SchedulerTask(id, type, action, parentTaskId, schedulerId); + public ActionRequestValidationException validate() { + return null; } @Override @@ -121,6 +132,7 @@ public class StartSchedulerAction schedulerId = in.readString(); startTime = in.readVLong(); endTime = in.readOptionalLong(); + startTimeout = new TimeValue(in.readVLong()); } @Override @@ -129,6 +141,7 @@ public class StartSchedulerAction out.writeString(schedulerId); out.writeVLong(startTime); out.writeOptionalLong(endTime); + out.writeVLong(startTimeout.millis()); } @Override @@ -170,66 +183,79 @@ public class StartSchedulerAction } } - public static class Response extends ActionResponse { + public static class Response extends ActionResponse implements ToXContentObject { + + private boolean started; + + Response(boolean started) { + this.started = started; + } Response() { } - } - - public static class SchedulerTask extends CancellableTask { - - private volatile ScheduledJobRunner.Holder holder; - - public SchedulerTask(long id, String type, String action, TaskId parentTaskId, String schedulerId) { - super(id, type, action, "scheduler-" + schedulerId, parentTaskId); - } - - public void setHolder(ScheduledJobRunner.Holder holder) { - this.holder = holder; + public boolean isStarted() { + return started; } @Override - protected void onCancelled() { - stop(); + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + started = in.readBoolean(); } - /* public for testing */ - public void stop() { - if (holder != null) { - holder.stop(null); - } + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(started); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("started", started); + builder.endObject(); + return builder; } } public static class TransportAction extends HandledTransportAction { - private final ScheduledJobRunner scheduledJobRunner; + private final ClusterService clusterService; + private final SchedulerStatusObserver schedulerStatusObserver; + private final InternalStartSchedulerAction.TransportAction transportAction; @Inject public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - ScheduledJobRunner scheduledJobRunner) { + ClusterService clusterService, InternalStartSchedulerAction.TransportAction transportAction) { super(settings, StartSchedulerAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new); - this.scheduledJobRunner = scheduledJobRunner; - } - - @Override - protected void doExecute(Task task, Request request, ActionListener listener) { - SchedulerTask schedulerTask = (SchedulerTask) task; - scheduledJobRunner.run(request.getSchedulerId(), request.getStartTime(), request.getEndTime(), schedulerTask, (error) -> { - if (error != null) { - listener.onFailure(error); - } else { - listener.onResponse(new Response()); - } - }); + this.clusterService = clusterService; + this.schedulerStatusObserver = new SchedulerStatusObserver(threadPool, clusterService); + this.transportAction = transportAction; } @Override protected void doExecute(Request request, ActionListener listener) { - throw new UnsupportedOperationException("the task parameter is required"); + // This validation happens also in ScheduledJobRunner, the reason we do it here too is that if it fails there + // we are unable to provide the user immediate feedback. We would create the task and the validation would fail + // in the background, whereas now the validation failure is part of the response being returned. + MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE); + ScheduledJobRunner.validate(request.schedulerId, mlMetadata); + + InternalStartSchedulerAction.Request internalRequest = + new InternalStartSchedulerAction.Request(request.schedulerId, request.startTime); + internalRequest.setEndTime(request.endTime); + transportAction.execute(internalRequest, LoggingTaskListener.instance()); + schedulerStatusObserver.waitForStatus(request.schedulerId, request.startTimeout, SchedulerStatus.STARTED, e -> { + if (e != null) { + listener.onFailure(e); + } else { + listener.onResponse(new Response(true)); + } + + }); } } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/StopSchedulerAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/StopSchedulerAction.java index 9e08dc0272d..40058d4dadb 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/StopSchedulerAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/StopSchedulerAction.java @@ -33,11 +33,11 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.ml.scheduler.SchedulerStatus; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; -import org.elasticsearch.xpack.ml.scheduler.SchedulerConfig; import org.elasticsearch.xpack.ml.scheduler.Scheduler; +import org.elasticsearch.xpack.ml.scheduler.SchedulerConfig; +import org.elasticsearch.xpack.ml.scheduler.SchedulerStatus; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.SchedulerStatusObserver; @@ -170,7 +170,7 @@ public class StopSchedulerAction validate(schedulerId, mlMetadata); ListTasksRequest listTasksRequest = new ListTasksRequest(); - listTasksRequest.setActions(StartSchedulerAction.NAME); + listTasksRequest.setActions(InternalStartSchedulerAction.NAME); listTasksRequest.setDetailed(true); listTasksAction.execute(listTasksRequest, new ActionListener() { @Override diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/rest/schedulers/RestStartSchedulerAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/rest/schedulers/RestStartSchedulerAction.java index 6e45ec9ad6f..76bb570ad0e 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/rest/schedulers/RestStartSchedulerAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/rest/schedulers/RestStartSchedulerAction.java @@ -7,29 +7,19 @@ package org.elasticsearch.xpack.ml.rest.schedulers; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.client.node.NodeClient; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.rest.BaseRestHandler; -import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.tasks.LoggingTaskListener; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xpack.ml.MlPlugin; import org.elasticsearch.xpack.ml.action.StartSchedulerAction; import org.elasticsearch.xpack.ml.job.messages.Messages; -import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; -import org.elasticsearch.xpack.ml.scheduler.ScheduledJobRunner; import org.elasticsearch.xpack.ml.scheduler.SchedulerConfig; -import org.elasticsearch.xpack.ml.scheduler.SchedulerStatus; -import org.elasticsearch.xpack.ml.utils.SchedulerStatusObserver; import java.io.IOException; @@ -37,15 +27,9 @@ public class RestStartSchedulerAction extends BaseRestHandler { private static final String DEFAULT_START = "0"; - private final ClusterService clusterService; - private final SchedulerStatusObserver schedulerStatusObserver; - @Inject - public RestStartSchedulerAction(Settings settings, RestController controller, ThreadPool threadPool, - ClusterService clusterService) { + public RestStartSchedulerAction(Settings settings, RestController controller) { super(settings); - this.clusterService = clusterService; - this.schedulerStatusObserver = new SchedulerStatusObserver(threadPool, clusterService); controller.registerHandler(RestRequest.Method.POST, MlPlugin.BASE_PATH + "schedulers/{" + SchedulerConfig.ID.getPreferredName() + "}/_start", this); } @@ -53,13 +37,6 @@ public class RestStartSchedulerAction extends BaseRestHandler { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { String schedulerId = restRequest.param(SchedulerConfig.ID.getPreferredName()); - - // This validation happens also in ScheduledJobRunner, the reason we do it here too is that if it fails there - // we are unable to provide the user immediate feedback. We would create the task and the validation would fail - // in the background, whereas now the validation failure is part of the response being returned. - MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE); - ScheduledJobRunner.validate(schedulerId, mlMetadata); - StartSchedulerAction.Request jobSchedulerRequest; if (restRequest.hasContentOrSourceParam()) { XContentParser parser = restRequest.contentOrSourceParamParser(); @@ -74,29 +51,12 @@ public class RestStartSchedulerAction extends BaseRestHandler { } jobSchedulerRequest = new StartSchedulerAction.Request(schedulerId, startTimeMillis); jobSchedulerRequest.setEndTime(endTimeMillis); + TimeValue startTimeout = restRequest.paramAsTime(StartSchedulerAction.START_TIMEOUT.getPreferredName(), + TimeValue.timeValueSeconds(30)); + jobSchedulerRequest.setStartTimeout(startTimeout); } - TimeValue startTimeout = restRequest.paramAsTime("start_timeout", TimeValue.timeValueSeconds(30)); return channel -> { - Task task = client.executeLocally(StartSchedulerAction.INSTANCE, jobSchedulerRequest, LoggingTaskListener.instance()); - schedulerStatusObserver.waitForStatus(schedulerId, startTimeout, SchedulerStatus.STARTED, e -> { - if (e != null) { - try { - channel.sendResponse(new BytesRestResponse(channel, e)); - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - } else { - try (XContentBuilder builder = channel.newBuilder()) { - builder.startObject(); - builder.field("task", clusterService.localNode().getId() + ":" + task.getId()); - builder.endObject(); - channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - } - - }); + client.execute(StartSchedulerAction.INSTANCE, jobSchedulerRequest, new RestToXContentListener<>(channel)); }; } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunner.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunner.java index bd43cf97700..c85edb73bda 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunner.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunner.java @@ -17,7 +17,7 @@ import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.MlPlugin; -import org.elasticsearch.xpack.ml.action.StartSchedulerAction; +import org.elasticsearch.xpack.ml.action.InternalStartSchedulerAction; import org.elasticsearch.xpack.ml.action.UpdateSchedulerStatusAction; import org.elasticsearch.xpack.ml.job.DataCounts; import org.elasticsearch.xpack.ml.job.DataDescription; @@ -61,7 +61,7 @@ public class ScheduledJobRunner extends AbstractComponent { this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier); } - public void run(String schedulerId, long startTime, Long endTime, StartSchedulerAction.SchedulerTask task, + public void run(String schedulerId, long startTime, Long endTime, InternalStartSchedulerAction.SchedulerTask task, Consumer handler) { MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE); validate(schedulerId, mlMetadata); @@ -185,7 +185,7 @@ public class ScheduledJobRunner extends AbstractComponent { } private Holder createJobScheduler(Scheduler scheduler, Job job, long finalBucketEndMs, long latestRecordTimeMs, - Consumer handler, StartSchedulerAction.SchedulerTask task) { + Consumer handler, InternalStartSchedulerAction.SchedulerTask task) { Auditor auditor = jobProvider.audit(job.getId()); Duration frequency = getFrequencyOrDefault(scheduler, job); Duration queryDelay = Duration.ofSeconds(scheduler.getConfig().getQueryDelay()); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/ScheduledJobsIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/ScheduledJobsIT.java index bdd8a51f668..6947c6ad3ef 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/ScheduledJobsIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/ScheduledJobsIT.java @@ -39,12 +39,10 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.xpack.ml.integration.TooManyJobsIT.ensureClusterStateConsistencyWorkAround; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.nullValue; @ESIntegTestCase.ClusterScope(numDataNodes = 1) public class ScheduledJobsIT extends ESIntegTestCase { @@ -94,7 +92,9 @@ public class ScheduledJobsIT extends ESIntegTestCase { StartSchedulerAction.Request startSchedulerRequest = new StartSchedulerAction.Request(schedulerConfig.getId(), 0L); startSchedulerRequest.setEndTime(now); - client().execute(StartSchedulerAction.INSTANCE, startSchedulerRequest).get(); + StartSchedulerAction.Response startSchedulerResponse = + client().execute(StartSchedulerAction.INSTANCE, startSchedulerRequest).get(); + assertTrue(startSchedulerResponse.isStarted()); assertBusy(() -> { DataCounts dataCounts = getDataCounts(job.getId()); assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs + numDocs2)); @@ -127,16 +127,10 @@ public class ScheduledJobsIT extends ESIntegTestCase { PutSchedulerAction.Response putSchedulerResponse = client().execute(PutSchedulerAction.INSTANCE, putSchedulerRequest).get(); assertTrue(putSchedulerResponse.isAcknowledged()); - AtomicReference errorHolder = new AtomicReference<>(); - Thread t = new Thread(() -> { - try { - StartSchedulerAction.Request startSchedulerRequest = new StartSchedulerAction.Request(schedulerConfig.getId(), 0L); + StartSchedulerAction.Request startSchedulerRequest = new StartSchedulerAction.Request(schedulerConfig.getId(), 0L); + StartSchedulerAction.Response startSchedulerResponse = client().execute(StartSchedulerAction.INSTANCE, startSchedulerRequest).get(); - } catch (Exception | AssertionError e) { - errorHolder.set(e); - } - }); - t.start(); + assertTrue(startSchedulerResponse.isStarted()); assertBusy(() -> { DataCounts dataCounts = getDataCounts(job.getId()); assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1)); @@ -160,7 +154,6 @@ public class ScheduledJobsIT extends ESIntegTestCase { .getState().metaData().custom(MlMetadata.TYPE); assertThat(mlMetadata.getScheduler(schedulerConfig.getId()).get().getStatus(), equalTo(SchedulerStatus.STOPPED)); }); - assertThat(errorHolder.get(), nullValue()); } private void indexDocs(String index, long numDocs, long start, long end) { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/ScheduledJobIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/ScheduledJobIT.java index f8220a4e787..8f4e938def7 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/ScheduledJobIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/ScheduledJobIT.java @@ -150,7 +150,7 @@ public class ScheduledJobIT extends ESRestTestCase { Response response = client().performRequest("post", MlPlugin.BASE_PATH + "schedulers/" + schedulerId + "/_start?start=2016-06-01T00:00:00Z"); assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); - assertThat(responseEntityToString(response), containsString("{\"task\":\"")); + assertThat(responseEntityToString(response), equalTo("{\"started\":true}")); assertBusy(() -> { try { Response getJobResponse = client().performRequest("get", @@ -246,7 +246,7 @@ public class ScheduledJobIT extends ESRestTestCase { Response startSchedulerRequest = client().performRequest("post", MlPlugin.BASE_PATH + "schedulers/" + schedulerId + "/_start?start=2016-06-01T00:00:00Z&end=2016-06-02T00:00:00Z"); assertThat(startSchedulerRequest.getStatusLine().getStatusCode(), equalTo(200)); - assertThat(responseEntityToString(startSchedulerRequest), containsString("{\"task\":\"")); + assertThat(responseEntityToString(startSchedulerRequest), equalTo("{\"started\":true}")); assertBusy(() -> { try { Response schedulerStatsResponse = client().performRequest("get", diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/rest/schedulers/RestStartJobSchedulerActionTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/rest/schedulers/RestStartJobSchedulerActionTests.java index e833d6868f9..e86e5ae28ee 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/rest/schedulers/RestStartJobSchedulerActionTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/rest/schedulers/RestStartJobSchedulerActionTests.java @@ -7,20 +7,13 @@ package org.elasticsearch.xpack.ml.rest.schedulers; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.client.node.NodeClient; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.rest.FakeRestRequest; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.job.Job; -import org.elasticsearch.xpack.ml.job.JobStatus; -import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; import org.elasticsearch.xpack.ml.scheduler.ScheduledJobRunnerTests; import org.elasticsearch.xpack.ml.scheduler.SchedulerConfig; @@ -28,24 +21,13 @@ import java.util.HashMap; import java.util.Map; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class RestStartJobSchedulerActionTests extends ESTestCase { public void testPrepareRequest() throws Exception { - ClusterService clusterService = mock(ClusterService.class); Job.Builder job = ScheduledJobRunnerTests.createScheduledJob(); SchedulerConfig schedulerConfig = ScheduledJobRunnerTests.createSchedulerConfig("foo-scheduler", "foo").build(); - MlMetadata mlMetadata = new MlMetadata.Builder() - .putJob(job.build(), false) - .putScheduler(schedulerConfig) - .updateStatus("foo", JobStatus.OPENED, null) - .build(); - when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata)) - .build()); - RestStartSchedulerAction action = new RestStartSchedulerAction(Settings.EMPTY, mock(RestController.class), - mock(ThreadPool.class), clusterService); + RestStartSchedulerAction action = new RestStartSchedulerAction(Settings.EMPTY, mock(RestController.class)); Map params = new HashMap<>(); params.put("start", "not-a-date"); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunnerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunnerTests.java index c3b7798346b..4e1ff3a7a55 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunnerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/scheduler/ScheduledJobRunnerTests.java @@ -20,8 +20,8 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.MlPlugin; import org.elasticsearch.xpack.ml.action.FlushJobAction; +import org.elasticsearch.xpack.ml.action.InternalStartSchedulerAction; import org.elasticsearch.xpack.ml.action.PostDataAction; -import org.elasticsearch.xpack.ml.action.StartSchedulerAction; import org.elasticsearch.xpack.ml.action.UpdateSchedulerStatusAction; import org.elasticsearch.xpack.ml.job.AnalysisConfig; import org.elasticsearch.xpack.ml.job.DataCounts; @@ -141,7 +141,7 @@ public class ScheduledJobRunnerTests extends ESTestCase { when(dataExtractor.next()).thenReturn(Optional.of(in)); when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts)); Consumer handler = mockConsumer(); - StartSchedulerAction.SchedulerTask task = mock(StartSchedulerAction.SchedulerTask.class); + InternalStartSchedulerAction.SchedulerTask task = mock(InternalStartSchedulerAction.SchedulerTask.class); scheduledJobRunner.run("scheduler1", 0L, 60000L, task, handler); verify(threadPool, times(1)).executor(MlPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME); @@ -181,7 +181,7 @@ public class ScheduledJobRunnerTests extends ESTestCase { when(dataExtractor.next()).thenThrow(new RuntimeException("dummy")); when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts)); Consumer handler = mockConsumer(); - StartSchedulerAction.SchedulerTask task = mock(StartSchedulerAction.SchedulerTask.class); + InternalStartSchedulerAction.SchedulerTask task = mock(InternalStartSchedulerAction.SchedulerTask.class); scheduledJobRunner.run("scheduler1", 0L, 60000L, task, handler); verify(threadPool, times(1)).executor(MlPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME); @@ -214,7 +214,8 @@ public class ScheduledJobRunnerTests extends ESTestCase { when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts)); Consumer handler = mockConsumer(); boolean cancelled = randomBoolean(); - StartSchedulerAction.SchedulerTask task = new StartSchedulerAction.SchedulerTask(1, "type", "action", null, "scheduler1"); + InternalStartSchedulerAction.SchedulerTask task = + new InternalStartSchedulerAction.SchedulerTask(1, "type", "action", null, "scheduler1"); scheduledJobRunner.run("scheduler1", 0L, null, task, handler); verify(threadPool, times(1)).executor(MlPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME);