Changed rest start scheduler api to wait with returning a response until the the scheduler status has been set to STARTED and

changed stop scheduler api to wait with returning a response until the scheduler status has been set to STOPPED.

Original commit: elastic/x-pack-elasticsearch@e20fcd1ae9
This commit is contained in:
Martijn van Groningen 2016-12-13 17:00:12 +01:00
parent 3b487a268b
commit 985bdf6cb1
10 changed files with 183 additions and 50 deletions

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.threadpool.ThreadPool;
@ -38,6 +39,7 @@ import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.SchedulerConfig;
import org.elasticsearch.xpack.prelert.job.metadata.Scheduler;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import org.elasticsearch.xpack.prelert.utils.SchedulerStatusObserver;
import java.io.IOException;
import java.util.Objects;
@ -65,6 +67,7 @@ public class StopSchedulerAction
public static class Request extends ActionRequest {
private String schedulerId;
private TimeValue stopTimeout = TimeValue.timeValueSeconds(30);
public Request(String jobId) {
this.schedulerId = ExceptionsHelper.requireNonNull(jobId, SchedulerConfig.ID.getPreferredName());
@ -77,6 +80,10 @@ public class StopSchedulerAction
return schedulerId;
}
public void setStopTimeout(TimeValue stopTimeout) {
this.stopTimeout = stopTimeout;
}
@Override
public ActionRequestValidationException validate() {
return null;
@ -141,16 +148,19 @@ public class StopSchedulerAction
private final ClusterService clusterService;
private final TransportListTasksAction listTasksAction;
private final TransportCancelTasksAction cancelTasksAction;
private final SchedulerStatusObserver schedulerStatusObserver;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService,
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
TransportCancelTasksAction cancelTasksAction, TransportListTasksAction listTasksAction) {
ClusterService clusterService, TransportCancelTasksAction cancelTasksAction,
TransportListTasksAction listTasksAction) {
super(settings, StopSchedulerAction.NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, Request::new);
this.clusterService = clusterService;
this.listTasksAction = listTasksAction;
this.cancelTasksAction = cancelTasksAction;
this.schedulerStatusObserver = new SchedulerStatusObserver(threadPool, clusterService);
}
@Override
@ -173,7 +183,13 @@ public class StopSchedulerAction
cancelTasksAction.execute(cancelTasksRequest, new ActionListener<CancelTasksResponse>() {
@Override
public void onResponse(CancelTasksResponse cancelTasksResponse) {
listener.onResponse(new Response());
schedulerStatusObserver.waitForStatus(schedulerId, request.stopTimeout, SchedulerStatus.STOPPED, e -> {
if (e != null) {
listener.onFailure(e);
} else {
listener.onResponse(new Response());
}
});
}
@Override
@ -193,6 +209,8 @@ public class StopSchedulerAction
}
});
}
}
static void validate(String schedulerId, PrelertMetadata prelertMetadata) {

View File

@ -11,6 +11,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
@ -22,12 +23,15 @@ import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.LoggingTaskListener;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
import org.elasticsearch.xpack.prelert.action.StartSchedulerAction;
import org.elasticsearch.xpack.prelert.job.SchedulerConfig;
import org.elasticsearch.xpack.prelert.job.SchedulerStatus;
import org.elasticsearch.xpack.prelert.job.messages.Messages;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobRunner;
import org.elasticsearch.xpack.prelert.utils.SchedulerStatusObserver;
import java.io.IOException;
@ -36,11 +40,14 @@ public class RestStartSchedulerAction extends BaseRestHandler {
private static final String DEFAULT_START = "0";
private final ClusterService clusterService;
private final SchedulerStatusObserver schedulerStatusObserver;
@Inject
public RestStartSchedulerAction(Settings settings, RestController controller, ClusterService clusterService) {
public RestStartSchedulerAction(Settings settings, RestController controller, ThreadPool threadPool,
ClusterService clusterService) {
super(settings);
this.clusterService = clusterService;
this.schedulerStatusObserver = new SchedulerStatusObserver(threadPool, clusterService);
controller.registerHandler(RestRequest.Method.POST,
PrelertPlugin.BASE_PATH + "schedulers/{" + SchedulerConfig.ID.getPreferredName() + "}/_start", this);
}
@ -71,17 +78,28 @@ public class RestStartSchedulerAction extends BaseRestHandler {
jobSchedulerRequest = new StartSchedulerAction.Request(schedulerId, startTimeMillis);
jobSchedulerRequest.setEndTime(endTimeMillis);
}
return sendTask(client.executeLocally(StartSchedulerAction.INSTANCE, jobSchedulerRequest, LoggingTaskListener.instance()));
}
private RestChannelConsumer sendTask(Task task) throws IOException {
TimeValue startTimeout = restRequest.paramAsTime("start_timeout", TimeValue.timeValueSeconds(30));
return channel -> {
try (XContentBuilder builder = channel.newBuilder()) {
builder.startObject();
builder.field("task", clusterService.localNode().getId() + ":" + task.getId());
builder.endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}
Task task = client.executeLocally(StartSchedulerAction.INSTANCE, jobSchedulerRequest, LoggingTaskListener.instance());
schedulerStatusObserver.waitForStatus(schedulerId, startTimeout, SchedulerStatus.STARTED, e -> {
if (e != null) {
try {
channel.sendResponse(new BytesRestResponse(channel, e));
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
} else {
try (XContentBuilder builder = channel.newBuilder()) {
builder.startObject();
builder.field("task", clusterService.localNode().getId() + ":" + task.getId());
builder.endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
});
};
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.prelert.rest.schedulers;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
@ -35,6 +36,9 @@ public class RestStopSchedulerAction extends BaseRestHandler {
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
StopSchedulerAction.Request jobSchedulerRequest = new StopSchedulerAction.Request(
restRequest.param(SchedulerConfig.ID.getPreferredName()));
if (restRequest.hasParam("stop_timeout")) {
jobSchedulerRequest.setStopTimeout(TimeValue.parseTimeValue(restRequest.param("stop_timeout"), "stop_timeout"));
}
return channel -> transportJobSchedulerAction.execute(jobSchedulerRequest, new AcknowledgedRestListener<>(channel));
}
}

View File

@ -0,0 +1,93 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.utils;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.prelert.job.SchedulerStatus;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.metadata.Scheduler;
import java.util.function.Consumer;
public class SchedulerStatusObserver {
private static final Logger LOGGER = Loggers.getLogger(SchedulerStatusObserver.class);
private final ThreadPool threadPool;
private final ClusterService clusterService;
public SchedulerStatusObserver(ThreadPool threadPool, ClusterService clusterService) {
this.threadPool = threadPool;
this.clusterService = clusterService;
}
public void waitForStatus(String schedulerId, TimeValue waitTimeout, SchedulerStatus expectedStatus, Consumer<Exception> handler) {
ClusterStateObserver observer =
new ClusterStateObserver(clusterService, LOGGER, threadPool.getThreadContext());
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
handler.accept(null);
}
@Override
public void onClusterServiceClose() {
Exception e = new IllegalArgumentException("Cluster service closed while waiting for scheduler status to change to ["
+ expectedStatus + "]");
handler.accept(new IllegalStateException(e));
}
@Override
public void onTimeout(TimeValue timeout) {
Exception e = new IllegalArgumentException("Timeout expired while waiting for scheduler status to change to ["
+ expectedStatus + "]");
handler.accept(e);
}
}, new SchedulerStoppedPredicate(schedulerId, expectedStatus), waitTimeout);
}
private static class SchedulerStoppedPredicate implements ClusterStateObserver.ChangePredicate {
private final String schedulerId;
private final SchedulerStatus expectedStatus;
SchedulerStoppedPredicate(String schedulerId, SchedulerStatus expectedStatus) {
this.schedulerId = schedulerId;
this.expectedStatus = expectedStatus;
}
@Override
public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus, ClusterState newState,
ClusterState.ClusterStateStatus newStatus) {
return apply(newState);
}
@Override
public boolean apply(ClusterChangedEvent changedEvent) {
return apply(changedEvent.state());
}
boolean apply(ClusterState newState) {
PrelertMetadata metadata = newState.getMetaData().custom(PrelertMetadata.TYPE);
if (metadata != null) {
Scheduler scheduler = metadata.getScheduler(schedulerId);
if (scheduler != null) {
return scheduler.getStatus() == expectedStatus;
}
}
return false;
}
}
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.prelert.action.StopSchedulerAction.Request;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.SchedulerConfig;
@ -14,7 +15,6 @@ import org.elasticsearch.xpack.prelert.job.SchedulerStatus;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase;
import static org.elasticsearch.xpack.prelert.job.JobTests.buildJobBuilder;
import static org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobRunnerTests.createScheduledJob;
import static org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobRunnerTests.createSchedulerConfig;
import static org.hamcrest.Matchers.equalTo;
@ -23,7 +23,9 @@ public class StopJobSchedulerActionRequestTests extends AbstractStreamableTestCa
@Override
protected Request createTestInstance() {
return new Request(randomAsciiOfLengthBetween(1, 20));
Request r = new Request(randomAsciiOfLengthBetween(1, 20));
r.setStopTimeout(TimeValue.timeValueSeconds(randomIntBetween(0, 999)));
return r;
}
@Override

View File

@ -6,11 +6,9 @@
package org.elasticsearch.xpack.prelert.integration;
import org.apache.http.entity.StringEntity;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
@ -51,7 +49,6 @@ public class ScheduledJobIT extends ESRestTestCase {
throw new RuntimeException(e);
}
});
waitForSchedulerStoppedState(client(), jobId);
}
public void testStartJobScheduler_GivenRealtime() throws Exception {
@ -69,9 +66,8 @@ public class ScheduledJobIT extends ESRestTestCase {
assertBusy(() -> {
try {
Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats",
Collections.singletonMap("metric", "data_counts,status"));
Collections.singletonMap("metric", "data_counts"));
String responseAsString = responseEntityToString(getJobResponse);
assertThat(responseAsString, containsString("\"status\":\"OPENED\""));
assertThat(responseAsString, containsString("\"input_record_count\":2"));
} catch (Exception e1) {
throw new RuntimeException(e1);
@ -88,7 +84,6 @@ public class ScheduledJobIT extends ESRestTestCase {
response = client().performRequest("post", PrelertPlugin.BASE_PATH + "schedulers/" + schedulerId + "/_stop");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}"));
waitForSchedulerStoppedState(client(), jobId);
client().performRequest("POST", "/_xpack/prelert/anomaly_detectors/" + jobId + "/_close");
@ -139,25 +134,6 @@ public class ScheduledJobIT extends ESRestTestCase {
}
}
private static void waitForSchedulerStoppedState(RestClient client, String jobId) throws Exception {
try {
assertBusy(() -> {
try {
Response getJobResponse = client.performRequest("get",
PrelertPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats",
Collections.singletonMap("metric", "scheduler_status"));
assertThat(responseEntityToString(getJobResponse), containsString("\"scheduler_status\":\"STOPPED\""));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
} catch (AssertionError e) {
Response response = client.performRequest("get", "/_nodes/hotthreads");
Logger logger = Loggers.getLogger(ScheduledJobIT.class);
logger.info("hot_threads: {}", responseEntityToString(response));
}
}
@After
public void clearPrelertState() throws IOException {
clearPrelertMetadata(adminClient());
@ -181,10 +157,8 @@ public class ScheduledJobIT extends ESRestTestCase {
for (Map<String, Object> scheduler : schedulers) {
Map<String, Object> schedulerMap = (Map<String, Object>) scheduler.get("config");
String schedulerId = (String) schedulerMap.get("scheduler_id");
String jobId = (String) schedulerMap.get("job_id");
try {
client.performRequest("POST", "/_xpack/prelert/schedulers/" + schedulerId + "/_stop");
waitForSchedulerStoppedState(client, jobId);
} catch (Exception e) {
// ignore
}

View File

@ -16,6 +16,8 @@ import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerConfig;
@ -42,7 +44,8 @@ public class RestStartJobSchedulerActionTests extends ESTestCase {
when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(PrelertMetadata.TYPE, prelertMetadata))
.build());
RestStartSchedulerAction action = new RestStartSchedulerAction(Settings.EMPTY, mock(RestController.class), clusterService);
RestStartSchedulerAction action = new RestStartSchedulerAction(Settings.EMPTY, mock(RestController.class),
mock(ThreadPool.class), clusterService);
Map<String, String> params = new HashMap<>();
params.put("start", "not-a-date");

View File

@ -21,6 +21,10 @@
"type": "string",
"required": false,
"description": "The end time when the scheduler should stop. When not set, the scheduler continues in real time"
},
"start_timeout": {
"type": "time",
"description": "Controls the time to wait until a scheduler has started. Default to 30 seconds"
}
}
},

View File

@ -1,17 +1,25 @@
{
"xpack.prelert.stop_scheduler": {
"methods": [ "POST" ],
"methods": [
"POST"
],
"url": {
"path": "/_xpack/prelert/schedulers/{scheduler_id}/_stop",
"paths": [ "/_xpack/prelert/schedulers/{scheduler_id}/_stop" ],
"paths": [
"/_xpack/prelert/schedulers/{scheduler_id}/_stop"
],
"parts": {
"scheduler_id": {
"type": "string",
"required": true,
"description": "The ID of the scheduler to stop"
},
"stop_timeout": {
"type": "time",
"description": "Controls the time to wait until a scheduler has stopped. Default to 30 seconds"
}
}
},
"body": null
},
"body": null
}
}
}

View File

@ -33,10 +33,19 @@ setup:
xpack.prelert.start_scheduler:
"scheduler_id": "scheduler-1"
"start": 0
- do:
xpack.prelert.get_jobs:
job_id: "scheduled-job"
metric: "scheduler_status"
- match: { jobs.0.scheduler_status: STARTED }
- do:
xpack.prelert.stop_scheduler:
"scheduler_id": "scheduler-1"
- do:
xpack.prelert.get_jobs:
job_id: "scheduled-job"
metric: "scheduler_status"
- match: { jobs.0.scheduler_status: STOPPED }
---
"Test start non existing scheduler":
- do: