diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java index 824301166ee..f96eb319a5d 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java @@ -125,7 +125,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin { public static final String NAME = "prelert"; public static final String BASE_PATH = "/_xpack/prelert/"; public static final String THREAD_POOL_NAME = NAME; - public static final String SCHEDULER_THREAD_POOL_NAME = NAME + "_scheduler"; + public static final String SCHEDULED_RUNNER_THREAD_POOL_NAME = NAME + "_scheduled_runner"; public static final String AUTODETECT_PROCESS_THREAD_POOL_NAME = NAME + "_autodetect_process"; // NORELEASE - temporary solution @@ -313,7 +313,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin { // TODO: if scheduled and non scheduled jobs are considered more equal and the scheduler and // autodetect process are created at the same time then these two different TPs can merge. - FixedExecutorBuilder scheduler = new FixedExecutorBuilder(settings, SCHEDULER_THREAD_POOL_NAME, + FixedExecutorBuilder scheduler = new FixedExecutorBuilder(settings, SCHEDULED_RUNNER_THREAD_POOL_NAME, maxNumberOfJobs, 1, "xpack.prelert.scheduler_thread_pool"); return Arrays.asList(prelert, autoDetect, scheduler); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/StartSchedulerAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/StartSchedulerAction.java index 842b116a719..7ed54d4a84b 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/StartSchedulerAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/StartSchedulerAction.java @@ -198,7 +198,7 @@ public class StartSchedulerAction /* public for testing */ public void stop() { if (holder != null) { - holder.stop(); + holder.stop(null); } } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunner.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunner.java index aef02fdbd0a..dc24cba1080 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunner.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunner.java @@ -66,18 +66,23 @@ public class ScheduledJobRunner extends AbstractComponent { validate(schedulerId, prelertMetadata); setJobSchedulerStatus(schedulerId, SchedulerStatus.STARTED, error -> { + if (error != null) { + handler.accept(error); + return; + } + Scheduler scheduler = prelertMetadata.getScheduler(schedulerId); logger.info("Starting scheduler [{}] for job [{}]", schedulerId, scheduler.getJobId()); Job job = prelertMetadata.getJobs().get(scheduler.getJobId()); Holder holder = createJobScheduler(scheduler, job, handler); task.setHolder(holder); - holder.future = threadPool.executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME).submit(() -> { + holder.future = threadPool.executor(PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME).submit(() -> { try { Long next = holder.scheduledJob.runLookBack(startTime, endTime); if (next != null) { doScheduleRealtime(next, job.getId(), holder); } else { - holder.stop(); + holder.stop(null); } } catch (ScheduledJob.ExtractionProblemException e) { holder.problemTracker.reportExtractionProblem(e.getCause().getMessage()); @@ -85,11 +90,11 @@ public class ScheduledJobRunner extends AbstractComponent { holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage()); } catch (ScheduledJob.EmptyDataCountException e) { if (holder.problemTracker.updateEmptyDataCount(true)) { - holder.stop(); + holder.stop(e); } } catch (Exception e) { logger.error("Failed lookback import for job [" + job.getId() + "]", e); - holder.stop(); + holder.stop(e); } holder.problemTracker.finishReport(); }); @@ -100,7 +105,7 @@ public class ScheduledJobRunner extends AbstractComponent { if (holder.isRunning()) { TimeValue delay = computeNextDelay(delayInMsSinceEpoch); logger.debug("Waiting [{}] before executing next realtime import for job [{}]", delay, jobId); - holder.future = threadPool.schedule(delay, PrelertPlugin.SCHEDULER_THREAD_POOL_NAME, () -> { + holder.future = threadPool.schedule(delay, PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME, () -> { long nextDelayInMsSinceEpoch; try { nextDelayInMsSinceEpoch = holder.scheduledJob.runRealtime(); @@ -114,19 +119,19 @@ public class ScheduledJobRunner extends AbstractComponent { nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch; if (holder.problemTracker.updateEmptyDataCount(true)) { holder.problemTracker.finishReport(); - holder.stop(); + holder.stop(e); return; } } catch (Exception e) { logger.error("Unexpected scheduler failure for job [" + jobId + "] stopping...", e); - holder.stop(); + holder.stop(e); return; } holder.problemTracker.finishReport(); doScheduleRealtime(nextDelayInMsSinceEpoch, jobId, holder); }); } else { - holder.stop(); + holder.stop(null); } } @@ -245,11 +250,11 @@ public class ScheduledJobRunner extends AbstractComponent { return scheduledJob.isRunning(); } - public void stop() { + public void stop(Exception e) { logger.info("Stopping scheduler [{}] for job [{}]", scheduler.getId(), scheduler.getJobId()); scheduledJob.stop(); FutureUtils.cancel(future); - setJobSchedulerStatus(scheduler.getId(), SchedulerStatus.STOPPED, error -> handler.accept(null)); + setJobSchedulerStatus(scheduler.getId(), SchedulerStatus.STOPPED, error -> handler.accept(e)); } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersisterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersisterTests.java index 2e5a436e2a8..c8d08bced65 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersisterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersisterTests.java @@ -32,7 +32,6 @@ import static org.mockito.Mockito.mock; public class JobResultsPersisterTests extends ESTestCase { - private static final String CLUSTER_NAME = "myCluster"; private static final String JOB_ID = "foo"; public void testPersistBucket_OneRecord() throws IOException { @@ -168,7 +167,7 @@ public class JobResultsPersisterTests extends ESTestCase { private Client mockClient(AtomicReference reference) { Client client = mock(Client.class); doAnswer(invocationOnMock -> { - reference.set((BulkRequest) invocationOnMock.getArguments()[1]); + reference.set(invocationOnMock.getArguments()[1]); ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; listener.onResponse(new BulkResponse(new BulkItemResponse[0], 0L)); return null; diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunnerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunnerTests.java index 0c87d33645e..d1dcf419dea 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunnerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunnerTests.java @@ -96,7 +96,7 @@ public class ScheduledJobRunnerTests extends ESTestCase { ((Runnable) invocation.getArguments()[0]).run(); return null; }).when(executorService).submit(any(Runnable.class)); - when(threadPool.executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME)).thenReturn(executorService); + when(threadPool.executor(PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME)).thenReturn(executorService); when(client.execute(same(JobDataAction.INSTANCE), any())).thenReturn(jobDataFuture); when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture); @@ -133,7 +133,7 @@ public class ScheduledJobRunnerTests extends ESTestCase { scheduledJobRunner.run("scheduler1", 0L, 60000L, task, handler); verify(dataExtractor).newSearch(eq(0L), eq(60000L), any()); - verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME); + verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME); verify(threadPool, never()).schedule(any(), any(), any()); verify(client).execute(same(JobDataAction.INSTANCE), eq(new JobDataAction.Request("foo"))); verify(client).execute(same(FlushJobAction.INSTANCE), any()); @@ -167,14 +167,14 @@ public class ScheduledJobRunnerTests extends ESTestCase { scheduledJobRunner.run("scheduler1", 0L, null, task, handler); verify(dataExtractor).newSearch(eq(0L), eq(60000L), any()); - verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME); + verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME); if (cancelled) { task.stop(); verify(client).execute(same(INSTANCE), eq(new Request("scheduler1", SchedulerStatus.STOPPED)), any()); } else { verify(client).execute(same(JobDataAction.INSTANCE), eq(new JobDataAction.Request("foo"))); verify(client).execute(same(FlushJobAction.INSTANCE), any()); - verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME), any()); + verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME), any()); } }