Re-enabled disabled scheduler tests. The main issues seemed to be that the state was sometimes set back from STOPPED to STOPPING, and that stopping the scheduler would close the job without updating the status.

Also, when executing lookback / realtime searches, keep `Future` instances around, so that we can cancel the operation when a job gets closed.

Closes elastic/elasticsearch#381

Original commit: elastic/x-pack-elasticsearch@9773ff3810
This commit is contained in:
Martijn van Groningen 2016-11-24 21:50:31 +01:00
parent 04968d3ee6
commit a627e6621c
4 changed files with 87 additions and 77 deletions

View File

@ -14,7 +14,6 @@ import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadPar
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.InterimResultsParams;
import org.elasticsearch.xpack.prelert.job.results.AutodetectResult;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
@ -29,7 +28,7 @@ import java.time.ZonedDateTime;
* message is expected on the {@link #getProcessOutStream()} stream. This class writes the flush
* acknowledgement immediately.
*/
public class BlackHoleAutodetectProcess implements AutodetectProcess, Closeable {
public class BlackHoleAutodetectProcess implements AutodetectProcess {
private static final Logger LOGGER = Loggers.getLogger(BlackHoleAutodetectProcess.class);
private static final String FLUSH_ID = "flush-1";

View File

@ -12,6 +12,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
import org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction;
@ -31,9 +32,9 @@ import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.function.Supplier;
public class ScheduledJobService extends AbstractComponent {
@ -77,7 +78,7 @@ public class ScheduledJobService extends AbstractComponent {
Holder holder = createJobScheduler(job);
registry.put(job.getId(), holder);
threadPool.executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME).execute(() -> {
holder.future = threadPool.executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME).submit(() -> {
try {
Long next = holder.scheduledJob.runLookBack(allocation.getSchedulerState());
if (next != null) {
@ -113,30 +114,22 @@ public class ScheduledJobService extends AbstractComponent {
schedulerState.getStatus() + "] instead");
}
if (registry.containsKey(allocation.getJobId()) == false) {
Holder holder = registry.remove(allocation.getJobId());
if (holder == null) {
throw new IllegalStateException("job [" + allocation.getJobId() + "] has not been started");
}
logger.info("Stopping scheduler for job [{}]", allocation.getJobId());
Holder holder = registry.remove(allocation.getJobId());
holder.scheduledJob.stop();
dataProcessor.closeJob(allocation.getJobId());
holder.stop();
// Don't close the job directly without going via the close data api to change the job status:
// dataProcessor.closeJob(allocation.getJobId());
setJobSchedulerStatus(allocation.getJobId(), JobSchedulerStatus.STOPPED);
}
public void stopAllJobs() {
for (Map.Entry<String, Holder> entry : registry.entrySet()) {
entry.getValue().scheduledJob.stop();
dataProcessor.closeJob(entry.getKey());
}
registry.clear();
}
private void doScheduleRealtime(long delayInMsSinceEpoch, String jobId, Holder holder) {
if (holder.scheduledJob.isRunning()) {
TimeValue delay = computeNextDelay(delayInMsSinceEpoch);
logger.debug("Waiting [{}] before executing next realtime import for job [{}]", delay, jobId);
threadPool.schedule(delay, PrelertPlugin.SCHEDULER_THREAD_POOL_NAME, () -> {
holder.future = threadPool.schedule(delay, PrelertPlugin.SCHEDULER_THREAD_POOL_NAME, () -> {
long nextDelayInMsSinceEpoch;
try {
nextDelayInMsSinceEpoch = holder.scheduledJob.runRealtime();
@ -161,8 +154,6 @@ public class ScheduledJobService extends AbstractComponent {
holder.problemTracker.finishReport();
doScheduleRealtime(nextDelayInMsSinceEpoch, jobId, holder);
});
} else {
requestStopping(jobId);
}
}
@ -243,10 +234,17 @@ public class ScheduledJobService extends AbstractComponent {
private final ScheduledJob scheduledJob;
private final ProblemTracker problemTracker;
volatile Future<?> future;
private Holder(ScheduledJob scheduledJob, ProblemTracker problemTracker) {
this.scheduledJob = scheduledJob;
this.problemTracker = problemTracker;
}
void stop() {
scheduledJob.stop();
FutureUtils.cancel(future);
}
}
}

View File

@ -7,11 +7,11 @@ package org.elasticsearch.xpack.prelert.integration;
import org.apache.http.HttpHost;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ElasticsearchStatusException;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
@ -24,7 +24,6 @@ import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.containsString;
@ -32,81 +31,71 @@ import static org.hamcrest.Matchers.equalTo;
public class ScheduledJobIT extends ESRestTestCase {
@AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/381")
public void testStartJobScheduler_GivenMissingJob() {
ResponseException e = expectThrows(ResponseException.class,
() -> client().performRequest("post", PrelertPlugin.BASE_PATH + "schedulers/invalid-job/_start"));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404));
}
@AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/381")
public void testStartJobScheduler_GivenNonScheduledJob() throws Exception {
createNonScheduledJob();
String jobId = "_id1";
createNonScheduledJob(jobId);
ResponseException e = expectThrows(ResponseException.class,
() -> client().performRequest("post", PrelertPlugin.BASE_PATH + "schedulers/non-scheduled/_start"));
() -> client().performRequest("post", PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_start"));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400));
String responseAsString = responseEntityToString(e.getResponse());
assertThat(responseAsString, containsString("\"reason\":\"There is no job 'non-scheduled' with a scheduler configured\""));
assertThat(responseAsString, containsString("\"reason\":\"There is no job '" + jobId + "' with a scheduler configured\""));
}
@AwaitsFix(bugUrl = "The lookback is sometimes too quick and then we fail to see that the scheduler_state to see is STARTED. " +
"We need to find a different way to assert this.")
public void testStartJobScheduler_GivenLookbackOnly() throws Exception {
String jobId = "_id2";
createAirlineDataIndex();
createScheduledJob();
createScheduledJob(jobId);
Response response = client().performRequest("post",
PrelertPlugin.BASE_PATH + "schedulers/scheduled/_start?start=2016-06-01T00:00:00Z&end=2016-06-02T00:00:00Z");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}"));
Response startSchedulerRequest = client().performRequest("post",
PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_start?start=2016-06-01T00:00:00Z&end=2016-06-02T00:00:00Z");
assertThat(startSchedulerRequest.getStatusLine().getStatusCode(), equalTo(200));
assertThat(responseEntityToString(startSchedulerRequest), equalTo("{\"acknowledged\":true}"));
waitForSchedulerStartedState(jobId);
assertBusy(() -> {
try {
Response response2 = client().performRequest("get", "/_cluster/state",
Collections.singletonMap("filter_path", "metadata.prelert.allocations.scheduler_state"));
assertThat(responseEntityToString(response2), containsString("\"status\":\"STARTED\""));
Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/" + jobId,
Collections.singletonMap("metric", "data_counts"));
assertThat(responseEntityToString(getJobResponse), containsString("\"input_record_count\":2"));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
waitForSchedulerToBeStopped();
waitForSchedulerStoppedState(client(), jobId);
}
@AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/381")
public void testStartJobScheduler_GivenRealtime() throws Exception {
String jobId = "_id3";
createAirlineDataIndex();
createScheduledJob();
createScheduledJob(jobId);
Response response = client().performRequest("post",
PrelertPlugin.BASE_PATH + "schedulers/scheduled/_start?start=2016-06-01T00:00:00Z");
PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_start?start=2016-06-01T00:00:00Z");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}"));
assertBusy(() -> {
try {
Response response2 = client().performRequest("get", "/_cluster/state",
Collections.singletonMap("filter_path", "metadata.prelert.allocations.scheduler_state"));
assertThat(responseEntityToString(response2), containsString("\"status\":\"STARTED\""));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
waitForSchedulerStartedState(jobId);
ResponseException e = expectThrows(ResponseException.class,
() -> client().performRequest("delete", PrelertPlugin.BASE_PATH + "jobs/scheduled"));
() -> client().performRequest("delete", PrelertPlugin.BASE_PATH + "jobs/" + jobId));
response = e.getResponse();
assertThat(response.getStatusLine().getStatusCode(), equalTo(409));
assertThat(responseEntityToString(response), containsString("Cannot delete job 'scheduled' while the scheduler is running"));
assertThat(responseEntityToString(response), containsString("Cannot delete job '" + jobId + "' while the scheduler is running"));
response = client().performRequest("post", PrelertPlugin.BASE_PATH + "schedulers/scheduled/_stop");
response = client().performRequest("post", PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_stop");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}"));
waitForSchedulerToBeStopped();
waitForSchedulerStoppedState(client(), jobId);
response = client().performRequest("delete", PrelertPlugin.BASE_PATH + "jobs/scheduled");
response = client().performRequest("delete", PrelertPlugin.BASE_PATH + "jobs/" + jobId);
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}"));
}
@ -118,15 +107,15 @@ public class ScheduledJobIT extends ESRestTestCase {
client().performRequest("put", "airline-data", Collections.emptyMap(), new StringEntity(airlineDataMappings));
client().performRequest("put", "airline-data/response/1", Collections.emptyMap(),
new StringEntity("{\"time\":\"2016-10-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}"));
new StringEntity("{\"time\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}"));
client().performRequest("put", "airline-data/response/2", Collections.emptyMap(),
new StringEntity("{\"time\":\"2016-10-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}"));
new StringEntity("{\"time\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}"));
client().performRequest("post", "airline-data/_refresh");
}
private Response createNonScheduledJob() throws Exception {
String job = "{\n" + " \"jobId\":\"non-scheduled\",\n" + " \"description\":\"Analysis of response time by airline\",\n"
private Response createNonScheduledJob(String id) throws Exception {
String job = "{\n" + " \"jobId\":\"" + id + "\",\n" + " \"description\":\"Analysis of response time by airline\",\n"
+ " \"analysisConfig\" : {\n" + " \"bucketSpan\":3600,\n"
+ " \"detectors\" :[{\"function\":\"mean\",\"fieldName\":\"responsetime\",\"byFieldName\":\"airline\"}]\n"
+ " },\n" + " \"dataDescription\" : {\n" + " \"fieldDelimiter\":\",\",\n" + " \"timeField\":\"time\",\n"
@ -135,9 +124,9 @@ public class ScheduledJobIT extends ESRestTestCase {
return client().performRequest("put", PrelertPlugin.BASE_PATH + "jobs", Collections.emptyMap(), new StringEntity(job));
}
private Response createScheduledJob() throws Exception {
private Response createScheduledJob(String id) throws Exception {
HttpHost httpHost = getClusterHosts().get(0);
String job = "{\n" + " \"jobId\":\"scheduled\",\n" + " \"description\":\"Analysis of response time by airline\",\n"
String job = "{\n" + " \"jobId\":\"" + id + "\",\n" + " \"description\":\"Analysis of response time by airline\",\n"
+ " \"analysisConfig\" : {\n" + " \"bucketSpan\":3600,\n"
+ " \"detectors\" :[{\"function\":\"mean\",\"fieldName\":\"responsetime\",\"byFieldName\":\"airline\"}]\n"
+ " },\n" + " \"dataDescription\" : {\n" + " \"format\":\"ELASTICSEARCH\",\n"
@ -155,16 +144,39 @@ public class ScheduledJobIT extends ESRestTestCase {
}
}
private void waitForSchedulerToBeStopped() throws Exception {
private static void waitForSchedulerStoppedState(RestClient client, String jobId) throws Exception {
try {
assertBusy(() -> {
try {
Response response = client().performRequest("get", "/_cluster/state",
Collections.singletonMap("filter_path", "metadata.prelert.allocations.scheduler_state"));
assertThat(responseEntityToString(response), containsString("\"status\":\"STOPPED\""));
Response getJobResponse = client.performRequest("get", PrelertPlugin.BASE_PATH + "jobs/" + jobId,
Collections.singletonMap("metric", "scheduler_state"));
assertThat(responseEntityToString(getJobResponse), containsString("\"status\":\"STOPPED\""));
} catch (Exception e) {
fail();
throw new RuntimeException(e);
}
});
} catch (AssertionError e) {
Response response = client.performRequest("get", "/_nodes/hotthreads");
Logger logger = Loggers.getLogger(ScheduledJobIT.class);
logger.info("hot_threads: {}", responseEntityToString(response));
}
}
private void waitForSchedulerStartedState(String jobId) throws Exception {
try {
assertBusy(() -> {
try {
Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/" + jobId,
Collections.singletonMap("metric", "scheduler_state"));
assertThat(responseEntityToString(getJobResponse), containsString("\"status\":\"STARTED\""));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
} catch (AssertionError e) {
Response response = client().performRequest("get", "/_nodes/hotthreads");
logger.info("hot_threads: {}", responseEntityToString(response));
}
}, 1500, TimeUnit.MILLISECONDS);
}
@After
@ -186,6 +198,7 @@ public class ScheduledJobIT extends ESRestTestCase {
String jobId = (String) jobConfig.get("jobId");
try {
client.performRequest("POST", "/_xpack/prelert/schedulers/" + jobId + "/_stop");
waitForSchedulerStoppedState(client, jobId);
} catch (Exception e) {
// ignore
}

View File

@ -80,7 +80,7 @@ public class ScheduledJobServiceTests extends ESTestCase {
doAnswer(invocation -> {
((Runnable) invocation.getArguments()[0]).run();
return null;
}).when(executorService).execute(any(Runnable.class));
}).when(executorService).submit(any(Runnable.class));
when(threadPool.executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME)).thenReturn(executorService);
scheduledJobService =
@ -136,7 +136,7 @@ public class ScheduledJobServiceTests extends ESTestCase {
allocation = new Allocation(allocation.getNodeId(), allocation.getJobId(), allocation.getStatus(),
new SchedulerState(JobSchedulerStatus.STOPPING, 0L, 60000L));
scheduledJobService.stop(allocation);
verify(dataProcessor).closeJob("foo");
verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPED)), any());
}
public void testStop_GivenNonScheduledJob() {
@ -166,8 +166,8 @@ public class ScheduledJobServiceTests extends ESTestCase {
scheduledJobService.stop(allocation2);
// We stopped twice but the first time should have been ignored. We can assert that indirectly
// by verifying that the job was closed only once.
verify(dataProcessor, times(1)).closeJob("foo");
// by verifying that the scheduler status was set to STOPPED.
verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPED)), any());
}
private static Job.Builder createScheduledJob() {