diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java index 1c7bc6f2f1a..95edef21b6e 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java @@ -168,7 +168,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin { AutodetectResultsParser autodetectResultsParser = new AutodetectResultsParser(settings, parseFieldMatcherSupplier); DataProcessor dataProcessor = new AutodetectProcessManager(settings, client, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectResultsParser, processFactory, clusterService.getClusterSettings()); - ScheduledJobRunner scheduledJobRunner = new ScheduledJobRunner(threadPool, client, jobProvider, dataProcessor, + ScheduledJobRunner scheduledJobRunner = new ScheduledJobRunner(threadPool, client, jobProvider, // norelease: we will no longer need to pass the client here after we switch to a client based data extractor new HttpDataExtractorFactory(client), System::currentTimeMillis); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJob.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJob.java index e8d64967a10..d8896385d47 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJob.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJob.java @@ -6,18 +6,20 @@ package org.elasticsearch.xpack.prelert.job.scheduler; import org.apache.logging.log4j.Logger; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.xpack.prelert.action.FlushJobAction; +import org.elasticsearch.xpack.prelert.action.JobDataAction; import org.elasticsearch.xpack.prelert.job.DataCounts; import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.job.audit.Auditor; -import org.elasticsearch.xpack.prelert.job.data.DataProcessor; import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor; import org.elasticsearch.xpack.prelert.job.messages.Messages; -import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadParams; -import org.elasticsearch.xpack.prelert.job.process.autodetect.params.InterimResultsParams; -import org.elasticsearch.xpack.prelert.job.process.autodetect.params.TimeRange; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.Optional; @@ -25,7 +27,6 @@ import java.util.function.Supplier; class ScheduledJob { - private static final DataLoadParams DATA_LOAD_PARAMS = new DataLoadParams(TimeRange.builder().build()); private static final int NEXT_TASK_DELAY_MS = 100; private final Logger logger; @@ -33,8 +34,8 @@ class ScheduledJob { private final String jobId; private final long frequencyMs; private final long queryDelayMs; + private final Client client; private final DataExtractor dataExtractor; - private final DataProcessor dataProcessor; private final Supplier currentTimeSupplier; private volatile long lookbackStartTimeMs; @@ -42,14 +43,14 @@ class ScheduledJob { private volatile boolean running = true; ScheduledJob(String jobId, long frequencyMs, long queryDelayMs, DataExtractor dataExtractor, - DataProcessor dataProcessor, Auditor auditor, Supplier currentTimeSupplier, + Client client, Auditor auditor, Supplier currentTimeSupplier, long latestFinalBucketEndTimeMs, long latestRecordTimeMs) { this.logger = Loggers.getLogger(jobId); this.jobId = jobId; this.frequencyMs = frequencyMs; this.queryDelayMs = queryDelayMs; this.dataExtractor = dataExtractor; - this.dataProcessor = dataProcessor; + this.client = client; this.auditor = auditor; this.currentTimeSupplier = currentTimeSupplier; @@ -80,7 +81,9 @@ class ScheduledJob { DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(lookbackEnd)); auditor.info(msg); - run(lookbackStartTimeMs, lookbackEnd, InterimResultsParams.builder().calcInterim(true).build()); + FlushJobAction.Request request = new FlushJobAction.Request(jobId); + request.setCalcInterim(true); + run(lookbackStartTimeMs, lookbackEnd, request); auditor.info(Messages.getMessage(Messages.JOB_AUDIT_SCHEDULER_LOOKBACK_COMPLETED)); logger.info("Lookback has finished"); if (isLookbackOnly) { @@ -95,10 +98,10 @@ class ScheduledJob { long start = lastEndTimeMs == null ? lookbackStartTimeMs : lastEndTimeMs + 1; long nowMinusQueryDelay = currentTimeSupplier.get() - queryDelayMs; long end = toIntervalStartEpochMs(nowMinusQueryDelay); - InterimResultsParams.Builder flushParams = InterimResultsParams.builder() - .calcInterim(true) - .advanceTime(String.valueOf(lastEndTimeMs)); - run(start, end, flushParams.build()); + FlushJobAction.Request request = new FlushJobAction.Request(jobId); + request.setCalcInterim(true); + request.setAdvanceTime(String.valueOf(lastEndTimeMs)); + run(start, end, request); return nextRealtimeTimestamp(); } @@ -112,7 +115,7 @@ class ScheduledJob { return running; } - private void run(long start, long end, InterimResultsParams flushParams) throws IOException { + private void run(long start, long end, FlushJobAction.Request flushRequest) throws IOException { if (end <= start) { return; } @@ -132,9 +135,17 @@ class ScheduledJob { } if (extractedData.isPresent()) { DataCounts counts; - try { - counts = dataProcessor.processData(jobId, extractedData.get(), DATA_LOAD_PARAMS, () -> false); + try (InputStream in = extractedData.get()) { + JobDataAction.Request request = new JobDataAction.Request(jobId); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + Streams.copy(in, outputStream); + request.setContent(new BytesArray(outputStream.toByteArray())); + JobDataAction.Response response = client.execute(JobDataAction.INSTANCE, request).get(); + counts = response.getDataCounts(); } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } error = new AnalysisProblemException(e); break; } @@ -157,7 +168,14 @@ class ScheduledJob { throw new EmptyDataCountException(); } - dataProcessor.flushJob(jobId, flushParams); + try { + client.execute(FlushJobAction.INSTANCE, flushRequest).get(); + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw new RuntimeException(e); + } } private long nextRealtimeTimestamp() { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunner.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunner.java index 6f3b8c467a3..a1d1b16266d 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunner.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunner.java @@ -25,7 +25,6 @@ import org.elasticsearch.xpack.prelert.job.JobStatus; import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.job.audit.Auditor; import org.elasticsearch.xpack.prelert.job.config.DefaultFrequency; -import org.elasticsearch.xpack.prelert.job.data.DataProcessor; import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor; import org.elasticsearch.xpack.prelert.job.extraction.DataExtractorFactory; import org.elasticsearch.xpack.prelert.job.metadata.Allocation; @@ -44,18 +43,16 @@ public class ScheduledJobRunner extends AbstractComponent { private final Client client; private final JobProvider jobProvider; - private final DataProcessor dataProcessor; private final DataExtractorFactory dataExtractorFactory; private final ThreadPool threadPool; private final Supplier currentTimeSupplier; - public ScheduledJobRunner(ThreadPool threadPool, Client client, JobProvider jobProvider, DataProcessor dataProcessor, - DataExtractorFactory dataExtractorFactory, Supplier currentTimeSupplier) { + public ScheduledJobRunner(ThreadPool threadPool, Client client, JobProvider jobProvider, DataExtractorFactory dataExtractorFactory, + Supplier currentTimeSupplier) { super(Settings.EMPTY); this.threadPool = threadPool; this.client = Objects.requireNonNull(client); this.jobProvider = Objects.requireNonNull(jobProvider); - this.dataProcessor = Objects.requireNonNull(dataProcessor); this.dataExtractorFactory = Objects.requireNonNull(dataExtractorFactory); this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier); } @@ -150,7 +147,7 @@ public class ScheduledJobRunner extends AbstractComponent { Duration queryDelay = Duration.ofSeconds(job.getSchedulerConfig().getQueryDelay()); DataExtractor dataExtractor = dataExtractorFactory.newExtractor(job); ScheduledJob scheduledJob = new ScheduledJob(job.getId(), frequency.toMillis(), queryDelay.toMillis(), - dataExtractor, dataProcessor, auditor, currentTimeSupplier, getLatestFinalBucketEndTimeMs(job), + dataExtractor, client, auditor, currentTimeSupplier, getLatestFinalBucketEndTimeMs(job), getLatestRecordTimestamp(job.getId())); return new Holder(job, scheduledJob, new ProblemTracker(() -> auditor), handler); } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunnerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunnerTests.java index 84b2ff82a86..804162c9bd7 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunnerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobRunnerTests.java @@ -6,12 +6,15 @@ package org.elasticsearch.xpack.prelert.job.scheduler; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.prelert.PrelertPlugin; +import org.elasticsearch.xpack.prelert.action.FlushJobAction; +import org.elasticsearch.xpack.prelert.action.JobDataAction; import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction; import org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction; import org.elasticsearch.xpack.prelert.job.AnalysisConfig; @@ -62,25 +65,28 @@ import static org.mockito.Mockito.when; public class ScheduledJobRunnerTests extends ESTestCase { private Client client; + private ActionFuture jobDataFuture; + private ActionFuture flushJobFuture; private ThreadPool threadPool; - private DataProcessor dataProcessor; private DataExtractorFactory dataExtractorFactory; private ScheduledJobRunner scheduledJobRunner; private long currentTime = 120000; @Before + @SuppressWarnings("unchecked") public void setUpTests() { client = mock(Client.class); + jobDataFuture = mock(ActionFuture.class); + flushJobFuture = mock(ActionFuture.class); doAnswer(invocation -> { @SuppressWarnings("unchecked") ActionListener actionListener = (ActionListener) invocation.getArguments()[2]; actionListener.onResponse(new UpdateJobSchedulerStatusAction.Response()); return null; - }).when(client).execute(eq(UpdateJobSchedulerStatusAction.INSTANCE), any(), any()); + }).when(client).execute(same(UpdateJobSchedulerStatusAction.INSTANCE), any(), any()); JobProvider jobProvider = mock(JobProvider.class); when(jobProvider.dataCounts(anyString())).thenReturn(new DataCounts("foo")); - dataProcessor = mock(DataProcessor.class); dataExtractorFactory = mock(DataExtractorFactory.class); Auditor auditor = mock(Auditor.class); threadPool = mock(ThreadPool.class); @@ -90,16 +96,18 @@ public class ScheduledJobRunnerTests extends ESTestCase { return null; }).when(executorService).submit(any(Runnable.class)); when(threadPool.executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME)).thenReturn(executorService); + when(client.execute(same(JobDataAction.INSTANCE), any())).thenReturn(jobDataFuture); + when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture); scheduledJobRunner = - new ScheduledJobRunner(threadPool, client, jobProvider, dataProcessor, dataExtractorFactory, () -> currentTime); + new ScheduledJobRunner(threadPool, client, jobProvider, dataExtractorFactory, () -> currentTime); when(jobProvider.audit(anyString())).thenReturn(auditor); when(jobProvider.buckets(anyString(), any(BucketsQueryBuilder.BucketsQuery.class))).thenThrow( QueryPage.emptyQueryPage(Job.RESULTS_FIELD)); } - public void testStart_GivenNewlyCreatedJobLoopBack() throws IOException { + public void testStart_GivenNewlyCreatedJobLoopBack() throws Exception { Job.Builder builder = createScheduledJob(); SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STOPPED, 0L, 60000L); DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); @@ -112,7 +120,7 @@ public class ScheduledJobRunnerTests extends ESTestCase { when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8"))); when(dataExtractor.next()).thenReturn(Optional.of(in)); - when(dataProcessor.processData(anyString(), eq(in), any(), any())).thenReturn(dataCounts); + when(jobDataFuture.get()).thenReturn(new JobDataAction.Response(dataCounts)); Consumer handler = mockConsumer(); StartJobSchedulerAction.SchedulerTask task = mock(StartJobSchedulerAction.SchedulerTask.class); scheduledJobRunner.run(job, schedulerState, allocation, task, handler); @@ -120,11 +128,13 @@ public class ScheduledJobRunnerTests extends ESTestCase { verify(dataExtractor).newSearch(eq(0L), eq(60000L), any()); verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME); verify(threadPool, never()).schedule(any(), any(), any()); + verify(client).execute(same(JobDataAction.INSTANCE), eq(new JobDataAction.Request("foo"))); + verify(client).execute(same(FlushJobAction.INSTANCE), any()); verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STARTED)), any()); verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPED)), any()); } - public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws IOException { + public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws Exception { Job.Builder builder = createScheduledJob(); SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STOPPED, 0L, null); DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); @@ -137,7 +147,7 @@ public class ScheduledJobRunnerTests extends ESTestCase { when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8"))); when(dataExtractor.next()).thenReturn(Optional.of(in)); - when(dataProcessor.processData(anyString(), eq(in), any(), any())).thenReturn(dataCounts); + when(jobDataFuture.get()).thenReturn(new JobDataAction.Response(dataCounts)); Consumer handler = mockConsumer(); boolean cancelled = randomBoolean(); StartJobSchedulerAction.SchedulerTask task = new StartJobSchedulerAction.SchedulerTask(1, "type", "action", null, "foo"); @@ -149,6 +159,8 @@ public class ScheduledJobRunnerTests extends ESTestCase { task.stop(); verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPED)), any()); } else { + verify(client).execute(same(JobDataAction.INSTANCE), eq(new JobDataAction.Request("foo"))); + verify(client).execute(same(FlushJobAction.INSTANCE), any()); verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME), any()); } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobTests.java index c60904d6995..2edbb318cf4 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobTests.java @@ -5,19 +5,23 @@ */ package org.elasticsearch.xpack.prelert.job.scheduler; +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.client.Client; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.prelert.action.FlushJobAction; +import org.elasticsearch.xpack.prelert.action.JobDataAction; import org.elasticsearch.xpack.prelert.job.DataCounts; import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.job.audit.Auditor; -import org.elasticsearch.xpack.prelert.job.data.DataProcessor; import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor; -import org.elasticsearch.xpack.prelert.job.process.autodetect.params.InterimResultsParams; import org.junit.Before; import org.mockito.ArgumentCaptor; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.Date; import java.util.Optional; import java.util.function.Supplier; @@ -34,22 +38,28 @@ public class ScheduledJobTests extends ESTestCase { private Auditor auditor; private DataExtractor dataExtractor; - private DataProcessor dataProcessor; + private Client client; + private ActionFuture flushJobFuture; private long currentTime; @Before + @SuppressWarnings("unchecked") public void setup() throws Exception { auditor = mock(Auditor.class); dataExtractor = mock(DataExtractor.class); - dataProcessor = mock(DataProcessor.class); + client = mock(Client.class); + ActionFuture jobDataFuture = mock(ActionFuture.class); + flushJobFuture = mock(ActionFuture.class); currentTime = 0; when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); - InputStream inputStream = mock(InputStream.class); + InputStream inputStream = new ByteArrayInputStream("content".getBytes(StandardCharsets.UTF_8)); when(dataExtractor.next()).thenReturn(Optional.of(inputStream)); DataCounts dataCounts = new DataCounts("_job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); - when(dataProcessor.processData(eq("_job_id"), same(inputStream), any(), any())).thenReturn(dataCounts); + when(client.execute(same(JobDataAction.INSTANCE), eq(new JobDataAction.Request("_job_id")))).thenReturn(jobDataFuture); + when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture); + when(jobDataFuture.get()).thenReturn(new JobDataAction.Response(dataCounts)); } public void testLookBackRunWithEndTime() throws Exception { @@ -58,7 +68,9 @@ public class ScheduledJobTests extends ESTestCase { assertNull(scheduledJob.runLookBack(schedulerState)); verify(dataExtractor).newSearch(eq(0L), eq(1000L), any()); - verify(dataProcessor).flushJob(eq("_job_id"), any()); + FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id"); + flushRequest.setCalcInterim(true); + verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest)); } public void testLookBackRunWithNoEndTime() throws Exception { @@ -71,8 +83,9 @@ public class ScheduledJobTests extends ESTestCase { assertEquals(2000 + frequencyMs + 100, next); verify(dataExtractor).newSearch(eq(0L), eq(1500L), any()); - InterimResultsParams expectedParams = InterimResultsParams.builder().calcInterim(true).build(); - verify(dataProcessor).flushJob(eq("_job_id"), eq(expectedParams)); + FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id"); + flushRequest.setCalcInterim(true); + verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest)); } public void testLookBackRunWithOverrideStartTime() throws Exception { @@ -93,8 +106,9 @@ public class ScheduledJobTests extends ESTestCase { assertEquals(10000 + frequencyMs + 100, next); verify(dataExtractor).newSearch(eq(5000 + 1L), eq(currentTime - queryDelayMs), any()); - InterimResultsParams expectedParams = InterimResultsParams.builder().calcInterim(true).build(); - verify(dataProcessor).flushJob(eq("_job_id"), eq(expectedParams)); + FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id"); + flushRequest.setCalcInterim(true); + verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest)); } public void testRealtimeRun() throws Exception { @@ -106,7 +120,10 @@ public class ScheduledJobTests extends ESTestCase { assertEquals(currentTime + frequencyMs + 100, next); verify(dataExtractor).newSearch(eq(1000L + 1L), eq(currentTime - queryDelayMs), any()); - verify(dataProcessor).flushJob(eq("_job_id"), any()); + FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id"); + flushRequest.setCalcInterim(true); + flushRequest.setAdvanceTime("1000"); + verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest)); } public void testEmptyDataCount() throws Exception { @@ -128,7 +145,7 @@ public class ScheduledJobTests extends ESTestCase { expectThrows(ScheduledJob.ExtractionProblemException.class, () -> scheduledJob.runLookBack(schedulerState)); currentTime = 3001; - expectThrows(ScheduledJob.ExtractionProblemException.class, () -> scheduledJob.runRealtime()); + expectThrows(ScheduledJob.ExtractionProblemException.class, scheduledJob::runRealtime); ArgumentCaptor startTimeCaptor = ArgumentCaptor.forClass(Long.class); ArgumentCaptor endTimeCaptor = ArgumentCaptor.forClass(Long.class); @@ -140,15 +157,16 @@ public class ScheduledJobTests extends ESTestCase { } public void testAnalysisProblem() throws Exception { - dataProcessor = mock(DataProcessor.class); - when(dataProcessor.processData(eq("_job_id"), any(), any(), any())).thenThrow(new RuntimeException()); + client = mock(Client.class); + when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture); + when(client.execute(same(JobDataAction.INSTANCE), eq(new JobDataAction.Request("_job_id")))).thenThrow(new RuntimeException()); ScheduledJob scheduledJob = createScheduledJob(1000, 500, -1, -1); SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, 1000L); expectThrows(ScheduledJob.AnalysisProblemException.class, () -> scheduledJob.runLookBack(schedulerState)); currentTime = 3001; - expectThrows(ScheduledJob.EmptyDataCountException.class, () -> scheduledJob.runRealtime()); + expectThrows(ScheduledJob.EmptyDataCountException.class, scheduledJob::runRealtime); ArgumentCaptor startTimeCaptor = ArgumentCaptor.forClass(Long.class); ArgumentCaptor endTimeCaptor = ArgumentCaptor.forClass(Long.class); @@ -157,12 +175,13 @@ public class ScheduledJobTests extends ESTestCase { assertEquals(1000L, startTimeCaptor.getAllValues().get(1).longValue()); assertEquals(1000L, endTimeCaptor.getAllValues().get(0).longValue()); assertEquals(2000L, endTimeCaptor.getAllValues().get(1).longValue()); + verify(client, times(0)).execute(same(FlushJobAction.INSTANCE), any()); } private ScheduledJob createScheduledJob(long frequencyMs, long queryDelayMs, long latestFinalBucketEndTimeMs, long latestRecordTimeMs) { Supplier currentTimeSupplier = () -> currentTime; - return new ScheduledJob("_job_id", frequencyMs, queryDelayMs, dataExtractor, dataProcessor, auditor, + return new ScheduledJob("_job_id", frequencyMs, queryDelayMs, dataExtractor, client, auditor, currentTimeSupplier, latestFinalBucketEndTimeMs, latestRecordTimeMs); }