From a627e6621c891906027af13135b025daf47491ed Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 24 Nov 2016 21:50:31 +0100 Subject: [PATCH] Re-enabled disabled scheduler tests. Main issues seemed to be that the state was sometimes set back from STOPPED to STOPPING and stop scheduler would close the job without updating the status. Also when executing lookback / realtime searches keep `Future` instances around, so that we can cancel the opertion when a job gets closed. Closes elastic/elasticsearch#381 Original commit: elastic/x-pack-elasticsearch@9773ff38107543680863bb0c5b9726efcc5a1674 --- .../BlackHoleAutodetectProcess.java | 3 +- .../job/scheduler/ScheduledJobService.java | 34 +++-- .../prelert/integration/ScheduledJobIT.java | 119 ++++++++++-------- .../scheduler/ScheduledJobServiceTests.java | 8 +- 4 files changed, 87 insertions(+), 77 deletions(-) diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/BlackHoleAutodetectProcess.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/BlackHoleAutodetectProcess.java index 9fff7c4e642..84c7d6de765 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/BlackHoleAutodetectProcess.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/BlackHoleAutodetectProcess.java @@ -14,7 +14,6 @@ import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadPar import org.elasticsearch.xpack.prelert.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.prelert.job.results.AutodetectResult; -import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.io.PipedInputStream; @@ -29,7 +28,7 @@ import java.time.ZonedDateTime; * message is expected on the {@link #getProcessOutStream()} stream. This class writes the flush * acknowledgement immediately. */ -public class BlackHoleAutodetectProcess implements AutodetectProcess, Closeable { +public class BlackHoleAutodetectProcess implements AutodetectProcess { private static final Logger LOGGER = Loggers.getLogger(BlackHoleAutodetectProcess.class); private static final String FLUSH_ID = "flush-1"; diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobService.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobService.java index d58993ffaeb..5b5f107a7b3 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobService.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobService.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.prelert.PrelertPlugin; import org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction; @@ -31,9 +32,9 @@ import org.elasticsearch.xpack.prelert.job.persistence.QueryPage; import org.elasticsearch.xpack.prelert.job.results.Bucket; import java.time.Duration; -import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Future; import java.util.function.Supplier; public class ScheduledJobService extends AbstractComponent { @@ -77,7 +78,7 @@ public class ScheduledJobService extends AbstractComponent { Holder holder = createJobScheduler(job); registry.put(job.getId(), holder); - threadPool.executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME).execute(() -> { + holder.future = threadPool.executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME).submit(() -> { try { Long next = holder.scheduledJob.runLookBack(allocation.getSchedulerState()); if (next != null) { @@ -113,30 +114,22 @@ public class ScheduledJobService extends AbstractComponent { schedulerState.getStatus() + "] instead"); } - if (registry.containsKey(allocation.getJobId()) == false) { + Holder holder = registry.remove(allocation.getJobId()); + if (holder == null) { throw new IllegalStateException("job [" + allocation.getJobId() + "] has not been started"); } - logger.info("Stopping scheduler for job [{}]", allocation.getJobId()); - Holder holder = registry.remove(allocation.getJobId()); - holder.scheduledJob.stop(); - dataProcessor.closeJob(allocation.getJobId()); + holder.stop(); + // Don't close the job directly without going via the close data api to change the job status: +// dataProcessor.closeJob(allocation.getJobId()); setJobSchedulerStatus(allocation.getJobId(), JobSchedulerStatus.STOPPED); } - public void stopAllJobs() { - for (Map.Entry entry : registry.entrySet()) { - entry.getValue().scheduledJob.stop(); - dataProcessor.closeJob(entry.getKey()); - } - registry.clear(); - } - private void doScheduleRealtime(long delayInMsSinceEpoch, String jobId, Holder holder) { if (holder.scheduledJob.isRunning()) { TimeValue delay = computeNextDelay(delayInMsSinceEpoch); logger.debug("Waiting [{}] before executing next realtime import for job [{}]", delay, jobId); - threadPool.schedule(delay, PrelertPlugin.SCHEDULER_THREAD_POOL_NAME, () -> { + holder.future = threadPool.schedule(delay, PrelertPlugin.SCHEDULER_THREAD_POOL_NAME, () -> { long nextDelayInMsSinceEpoch; try { nextDelayInMsSinceEpoch = holder.scheduledJob.runRealtime(); @@ -161,8 +154,6 @@ public class ScheduledJobService extends AbstractComponent { holder.problemTracker.finishReport(); doScheduleRealtime(nextDelayInMsSinceEpoch, jobId, holder); }); - } else { - requestStopping(jobId); } } @@ -243,10 +234,17 @@ public class ScheduledJobService extends AbstractComponent { private final ScheduledJob scheduledJob; private final ProblemTracker problemTracker; + volatile Future future; private Holder(ScheduledJob scheduledJob, ProblemTracker problemTracker) { this.scheduledJob = scheduledJob; this.problemTracker = problemTracker; } + + void stop() { + scheduledJob.stop(); + FutureUtils.cancel(future); + } + } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java index 54e649085ec..ae16b6dbc47 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/ScheduledJobIT.java @@ -7,11 +7,11 @@ package org.elasticsearch.xpack.prelert.integration; import org.apache.http.HttpHost; import org.apache.http.entity.StringEntity; -import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.ElasticsearchStatusException; +import org.apache.logging.log4j.Logger; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xpack.prelert.PrelertPlugin; @@ -24,7 +24,6 @@ import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.hamcrest.Matchers.containsString; @@ -32,81 +31,71 @@ import static org.hamcrest.Matchers.equalTo; public class ScheduledJobIT extends ESRestTestCase { - @AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/381") public void testStartJobScheduler_GivenMissingJob() { ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest("post", PrelertPlugin.BASE_PATH + "schedulers/invalid-job/_start")); assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/381") public void testStartJobScheduler_GivenNonScheduledJob() throws Exception { - createNonScheduledJob(); + String jobId = "_id1"; + createNonScheduledJob(jobId); ResponseException e = expectThrows(ResponseException.class, - () -> client().performRequest("post", PrelertPlugin.BASE_PATH + "schedulers/non-scheduled/_start")); + () -> client().performRequest("post", PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_start")); assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400)); String responseAsString = responseEntityToString(e.getResponse()); - assertThat(responseAsString, containsString("\"reason\":\"There is no job 'non-scheduled' with a scheduler configured\"")); + assertThat(responseAsString, containsString("\"reason\":\"There is no job '" + jobId + "' with a scheduler configured\"")); } - @AwaitsFix(bugUrl = "The lookback is sometimes too quick and then we fail to see that the scheduler_state to see is STARTED. " + - "We need to find a different way to assert this.") public void testStartJobScheduler_GivenLookbackOnly() throws Exception { + String jobId = "_id2"; createAirlineDataIndex(); - createScheduledJob(); + createScheduledJob(jobId); - Response response = client().performRequest("post", - PrelertPlugin.BASE_PATH + "schedulers/scheduled/_start?start=2016-06-01T00:00:00Z&end=2016-06-02T00:00:00Z"); - assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); - assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}")); + Response startSchedulerRequest = client().performRequest("post", + PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_start?start=2016-06-01T00:00:00Z&end=2016-06-02T00:00:00Z"); + assertThat(startSchedulerRequest.getStatusLine().getStatusCode(), equalTo(200)); + assertThat(responseEntityToString(startSchedulerRequest), equalTo("{\"acknowledged\":true}")); + waitForSchedulerStartedState(jobId); assertBusy(() -> { try { - Response response2 = client().performRequest("get", "/_cluster/state", - Collections.singletonMap("filter_path", "metadata.prelert.allocations.scheduler_state")); - assertThat(responseEntityToString(response2), containsString("\"status\":\"STARTED\"")); + Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/" + jobId, + Collections.singletonMap("metric", "data_counts")); + assertThat(responseEntityToString(getJobResponse), containsString("\"input_record_count\":2")); } catch (Exception e) { throw new RuntimeException(e); } }); - waitForSchedulerToBeStopped(); + waitForSchedulerStoppedState(client(), jobId); } - @AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/381") public void testStartJobScheduler_GivenRealtime() throws Exception { + String jobId = "_id3"; createAirlineDataIndex(); - createScheduledJob(); + createScheduledJob(jobId); Response response = client().performRequest("post", - PrelertPlugin.BASE_PATH + "schedulers/scheduled/_start?start=2016-06-01T00:00:00Z"); + PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_start?start=2016-06-01T00:00:00Z"); assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}")); - - assertBusy(() -> { - try { - Response response2 = client().performRequest("get", "/_cluster/state", - Collections.singletonMap("filter_path", "metadata.prelert.allocations.scheduler_state")); - assertThat(responseEntityToString(response2), containsString("\"status\":\"STARTED\"")); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); + waitForSchedulerStartedState(jobId); ResponseException e = expectThrows(ResponseException.class, - () -> client().performRequest("delete", PrelertPlugin.BASE_PATH + "jobs/scheduled")); + () -> client().performRequest("delete", PrelertPlugin.BASE_PATH + "jobs/" + jobId)); response = e.getResponse(); assertThat(response.getStatusLine().getStatusCode(), equalTo(409)); - assertThat(responseEntityToString(response), containsString("Cannot delete job 'scheduled' while the scheduler is running")); + assertThat(responseEntityToString(response), containsString("Cannot delete job '" + jobId + "' while the scheduler is running")); - response = client().performRequest("post", PrelertPlugin.BASE_PATH + "schedulers/scheduled/_stop"); + response = client().performRequest("post", PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_stop"); assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}")); - waitForSchedulerToBeStopped(); + waitForSchedulerStoppedState(client(), jobId); - response = client().performRequest("delete", PrelertPlugin.BASE_PATH + "jobs/scheduled"); + response = client().performRequest("delete", PrelertPlugin.BASE_PATH + "jobs/" + jobId); assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}")); } @@ -118,15 +107,15 @@ public class ScheduledJobIT extends ESRestTestCase { client().performRequest("put", "airline-data", Collections.emptyMap(), new StringEntity(airlineDataMappings)); client().performRequest("put", "airline-data/response/1", Collections.emptyMap(), - new StringEntity("{\"time\":\"2016-10-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}")); + new StringEntity("{\"time\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}")); client().performRequest("put", "airline-data/response/2", Collections.emptyMap(), - new StringEntity("{\"time\":\"2016-10-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}")); + new StringEntity("{\"time\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}")); client().performRequest("post", "airline-data/_refresh"); } - private Response createNonScheduledJob() throws Exception { - String job = "{\n" + " \"jobId\":\"non-scheduled\",\n" + " \"description\":\"Analysis of response time by airline\",\n" + private Response createNonScheduledJob(String id) throws Exception { + String job = "{\n" + " \"jobId\":\"" + id + "\",\n" + " \"description\":\"Analysis of response time by airline\",\n" + " \"analysisConfig\" : {\n" + " \"bucketSpan\":3600,\n" + " \"detectors\" :[{\"function\":\"mean\",\"fieldName\":\"responsetime\",\"byFieldName\":\"airline\"}]\n" + " },\n" + " \"dataDescription\" : {\n" + " \"fieldDelimiter\":\",\",\n" + " \"timeField\":\"time\",\n" @@ -135,9 +124,9 @@ public class ScheduledJobIT extends ESRestTestCase { return client().performRequest("put", PrelertPlugin.BASE_PATH + "jobs", Collections.emptyMap(), new StringEntity(job)); } - private Response createScheduledJob() throws Exception { + private Response createScheduledJob(String id) throws Exception { HttpHost httpHost = getClusterHosts().get(0); - String job = "{\n" + " \"jobId\":\"scheduled\",\n" + " \"description\":\"Analysis of response time by airline\",\n" + String job = "{\n" + " \"jobId\":\"" + id + "\",\n" + " \"description\":\"Analysis of response time by airline\",\n" + " \"analysisConfig\" : {\n" + " \"bucketSpan\":3600,\n" + " \"detectors\" :[{\"function\":\"mean\",\"fieldName\":\"responsetime\",\"byFieldName\":\"airline\"}]\n" + " },\n" + " \"dataDescription\" : {\n" + " \"format\":\"ELASTICSEARCH\",\n" @@ -155,16 +144,39 @@ public class ScheduledJobIT extends ESRestTestCase { } } - private void waitForSchedulerToBeStopped() throws Exception { - assertBusy(() -> { - try { - Response response = client().performRequest("get", "/_cluster/state", - Collections.singletonMap("filter_path", "metadata.prelert.allocations.scheduler_state")); - assertThat(responseEntityToString(response), containsString("\"status\":\"STOPPED\"")); - } catch (Exception e) { - fail(); - } - }, 1500, TimeUnit.MILLISECONDS); + private static void waitForSchedulerStoppedState(RestClient client, String jobId) throws Exception { + try { + assertBusy(() -> { + try { + Response getJobResponse = client.performRequest("get", PrelertPlugin.BASE_PATH + "jobs/" + jobId, + Collections.singletonMap("metric", "scheduler_state")); + assertThat(responseEntityToString(getJobResponse), containsString("\"status\":\"STOPPED\"")); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } catch (AssertionError e) { + Response response = client.performRequest("get", "/_nodes/hotthreads"); + Logger logger = Loggers.getLogger(ScheduledJobIT.class); + logger.info("hot_threads: {}", responseEntityToString(response)); + } + } + + private void waitForSchedulerStartedState(String jobId) throws Exception { + try { + assertBusy(() -> { + try { + Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/" + jobId, + Collections.singletonMap("metric", "scheduler_state")); + assertThat(responseEntityToString(getJobResponse), containsString("\"status\":\"STARTED\"")); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } catch (AssertionError e) { + Response response = client().performRequest("get", "/_nodes/hotthreads"); + logger.info("hot_threads: {}", responseEntityToString(response)); + } } @After @@ -186,6 +198,7 @@ public class ScheduledJobIT extends ESRestTestCase { String jobId = (String) jobConfig.get("jobId"); try { client.performRequest("POST", "/_xpack/prelert/schedulers/" + jobId + "/_stop"); + waitForSchedulerStoppedState(client, jobId); } catch (Exception e) { // ignore } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobServiceTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobServiceTests.java index 1861bcab6e1..348812089c0 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobServiceTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobServiceTests.java @@ -80,7 +80,7 @@ public class ScheduledJobServiceTests extends ESTestCase { doAnswer(invocation -> { ((Runnable) invocation.getArguments()[0]).run(); return null; - }).when(executorService).execute(any(Runnable.class)); + }).when(executorService).submit(any(Runnable.class)); when(threadPool.executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME)).thenReturn(executorService); scheduledJobService = @@ -136,7 +136,7 @@ public class ScheduledJobServiceTests extends ESTestCase { allocation = new Allocation(allocation.getNodeId(), allocation.getJobId(), allocation.getStatus(), new SchedulerState(JobSchedulerStatus.STOPPING, 0L, 60000L)); scheduledJobService.stop(allocation); - verify(dataProcessor).closeJob("foo"); + verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPED)), any()); } public void testStop_GivenNonScheduledJob() { @@ -166,8 +166,8 @@ public class ScheduledJobServiceTests extends ESTestCase { scheduledJobService.stop(allocation2); // We stopped twice but the first time should have been ignored. We can assert that indirectly - // by verifying that the job was closed only once. - verify(dataProcessor, times(1)).closeJob("foo"); + // by verifying that the scheduler status was set to STOPPED. + verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPED)), any()); } private static Job.Builder createScheduledJob() {