Let the scheduler use the data transport action instead of directly using autodetect process manager.

Original commit: elastic/x-pack-elasticsearch@2442e222fd
This commit is contained in:
Martijn van Groningen 2016-12-09 09:40:54 +01:00
parent 65f03a8888
commit 372cb7c964
5 changed files with 95 additions and 49 deletions

View File

@ -168,7 +168,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
AutodetectResultsParser autodetectResultsParser = new AutodetectResultsParser(settings, parseFieldMatcherSupplier); AutodetectResultsParser autodetectResultsParser = new AutodetectResultsParser(settings, parseFieldMatcherSupplier);
DataProcessor dataProcessor = new AutodetectProcessManager(settings, client, threadPool, jobManager, jobProvider, DataProcessor dataProcessor = new AutodetectProcessManager(settings, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, autodetectResultsParser, processFactory, clusterService.getClusterSettings()); jobResultsPersister, jobDataCountsPersister, autodetectResultsParser, processFactory, clusterService.getClusterSettings());
ScheduledJobRunner scheduledJobRunner = new ScheduledJobRunner(threadPool, client, jobProvider, dataProcessor, ScheduledJobRunner scheduledJobRunner = new ScheduledJobRunner(threadPool, client, jobProvider,
// norelease: we will no longer need to pass the client here after we switch to a client based data extractor // norelease: we will no longer need to pass the client here after we switch to a client based data extractor
new HttpDataExtractorFactory(client), new HttpDataExtractorFactory(client),
System::currentTimeMillis); System::currentTimeMillis);

View File

@ -6,18 +6,20 @@
package org.elasticsearch.xpack.prelert.job.scheduler; package org.elasticsearch.xpack.prelert.job.scheduler;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.xpack.prelert.action.FlushJobAction;
import org.elasticsearch.xpack.prelert.action.JobDataAction;
import org.elasticsearch.xpack.prelert.job.DataCounts; import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.job.audit.Auditor; import org.elasticsearch.xpack.prelert.job.audit.Auditor;
import org.elasticsearch.xpack.prelert.job.data.DataProcessor;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor; import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor;
import org.elasticsearch.xpack.prelert.job.messages.Messages; import org.elasticsearch.xpack.prelert.job.messages.Messages;
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.InterimResultsParams;
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.TimeRange;
import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Optional; import java.util.Optional;
@ -25,7 +27,6 @@ import java.util.function.Supplier;
class ScheduledJob { class ScheduledJob {
private static final DataLoadParams DATA_LOAD_PARAMS = new DataLoadParams(TimeRange.builder().build());
private static final int NEXT_TASK_DELAY_MS = 100; private static final int NEXT_TASK_DELAY_MS = 100;
private final Logger logger; private final Logger logger;
@ -33,8 +34,8 @@ class ScheduledJob {
private final String jobId; private final String jobId;
private final long frequencyMs; private final long frequencyMs;
private final long queryDelayMs; private final long queryDelayMs;
private final Client client;
private final DataExtractor dataExtractor; private final DataExtractor dataExtractor;
private final DataProcessor dataProcessor;
private final Supplier<Long> currentTimeSupplier; private final Supplier<Long> currentTimeSupplier;
private volatile long lookbackStartTimeMs; private volatile long lookbackStartTimeMs;
@ -42,14 +43,14 @@ class ScheduledJob {
private volatile boolean running = true; private volatile boolean running = true;
ScheduledJob(String jobId, long frequencyMs, long queryDelayMs, DataExtractor dataExtractor, ScheduledJob(String jobId, long frequencyMs, long queryDelayMs, DataExtractor dataExtractor,
DataProcessor dataProcessor, Auditor auditor, Supplier<Long> currentTimeSupplier, Client client, Auditor auditor, Supplier<Long> currentTimeSupplier,
long latestFinalBucketEndTimeMs, long latestRecordTimeMs) { long latestFinalBucketEndTimeMs, long latestRecordTimeMs) {
this.logger = Loggers.getLogger(jobId); this.logger = Loggers.getLogger(jobId);
this.jobId = jobId; this.jobId = jobId;
this.frequencyMs = frequencyMs; this.frequencyMs = frequencyMs;
this.queryDelayMs = queryDelayMs; this.queryDelayMs = queryDelayMs;
this.dataExtractor = dataExtractor; this.dataExtractor = dataExtractor;
this.dataProcessor = dataProcessor; this.client = client;
this.auditor = auditor; this.auditor = auditor;
this.currentTimeSupplier = currentTimeSupplier; this.currentTimeSupplier = currentTimeSupplier;
@ -80,7 +81,9 @@ class ScheduledJob {
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(lookbackEnd)); DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(lookbackEnd));
auditor.info(msg); auditor.info(msg);
run(lookbackStartTimeMs, lookbackEnd, InterimResultsParams.builder().calcInterim(true).build()); FlushJobAction.Request request = new FlushJobAction.Request(jobId);
request.setCalcInterim(true);
run(lookbackStartTimeMs, lookbackEnd, request);
auditor.info(Messages.getMessage(Messages.JOB_AUDIT_SCHEDULER_LOOKBACK_COMPLETED)); auditor.info(Messages.getMessage(Messages.JOB_AUDIT_SCHEDULER_LOOKBACK_COMPLETED));
logger.info("Lookback has finished"); logger.info("Lookback has finished");
if (isLookbackOnly) { if (isLookbackOnly) {
@ -95,10 +98,10 @@ class ScheduledJob {
long start = lastEndTimeMs == null ? lookbackStartTimeMs : lastEndTimeMs + 1; long start = lastEndTimeMs == null ? lookbackStartTimeMs : lastEndTimeMs + 1;
long nowMinusQueryDelay = currentTimeSupplier.get() - queryDelayMs; long nowMinusQueryDelay = currentTimeSupplier.get() - queryDelayMs;
long end = toIntervalStartEpochMs(nowMinusQueryDelay); long end = toIntervalStartEpochMs(nowMinusQueryDelay);
InterimResultsParams.Builder flushParams = InterimResultsParams.builder() FlushJobAction.Request request = new FlushJobAction.Request(jobId);
.calcInterim(true) request.setCalcInterim(true);
.advanceTime(String.valueOf(lastEndTimeMs)); request.setAdvanceTime(String.valueOf(lastEndTimeMs));
run(start, end, flushParams.build()); run(start, end, request);
return nextRealtimeTimestamp(); return nextRealtimeTimestamp();
} }
@ -112,7 +115,7 @@ class ScheduledJob {
return running; return running;
} }
private void run(long start, long end, InterimResultsParams flushParams) throws IOException { private void run(long start, long end, FlushJobAction.Request flushRequest) throws IOException {
if (end <= start) { if (end <= start) {
return; return;
} }
@ -132,9 +135,17 @@ class ScheduledJob {
} }
if (extractedData.isPresent()) { if (extractedData.isPresent()) {
DataCounts counts; DataCounts counts;
try { try (InputStream in = extractedData.get()) {
counts = dataProcessor.processData(jobId, extractedData.get(), DATA_LOAD_PARAMS, () -> false); JobDataAction.Request request = new JobDataAction.Request(jobId);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Streams.copy(in, outputStream);
request.setContent(new BytesArray(outputStream.toByteArray()));
JobDataAction.Response response = client.execute(JobDataAction.INSTANCE, request).get();
counts = response.getDataCounts();
} catch (Exception e) { } catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
error = new AnalysisProblemException(e); error = new AnalysisProblemException(e);
break; break;
} }
@ -157,7 +168,14 @@ class ScheduledJob {
throw new EmptyDataCountException(); throw new EmptyDataCountException();
} }
dataProcessor.flushJob(jobId, flushParams); try {
client.execute(FlushJobAction.INSTANCE, flushRequest).get();
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new RuntimeException(e);
}
} }
private long nextRealtimeTimestamp() { private long nextRealtimeTimestamp() {

View File

@ -25,7 +25,6 @@ import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.job.audit.Auditor; import org.elasticsearch.xpack.prelert.job.audit.Auditor;
import org.elasticsearch.xpack.prelert.job.config.DefaultFrequency; import org.elasticsearch.xpack.prelert.job.config.DefaultFrequency;
import org.elasticsearch.xpack.prelert.job.data.DataProcessor;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor; import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractorFactory; import org.elasticsearch.xpack.prelert.job.extraction.DataExtractorFactory;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation; import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
@ -44,18 +43,16 @@ public class ScheduledJobRunner extends AbstractComponent {
private final Client client; private final Client client;
private final JobProvider jobProvider; private final JobProvider jobProvider;
private final DataProcessor dataProcessor;
private final DataExtractorFactory dataExtractorFactory; private final DataExtractorFactory dataExtractorFactory;
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final Supplier<Long> currentTimeSupplier; private final Supplier<Long> currentTimeSupplier;
public ScheduledJobRunner(ThreadPool threadPool, Client client, JobProvider jobProvider, DataProcessor dataProcessor, public ScheduledJobRunner(ThreadPool threadPool, Client client, JobProvider jobProvider, DataExtractorFactory dataExtractorFactory,
DataExtractorFactory dataExtractorFactory, Supplier<Long> currentTimeSupplier) { Supplier<Long> currentTimeSupplier) {
super(Settings.EMPTY); super(Settings.EMPTY);
this.threadPool = threadPool; this.threadPool = threadPool;
this.client = Objects.requireNonNull(client); this.client = Objects.requireNonNull(client);
this.jobProvider = Objects.requireNonNull(jobProvider); this.jobProvider = Objects.requireNonNull(jobProvider);
this.dataProcessor = Objects.requireNonNull(dataProcessor);
this.dataExtractorFactory = Objects.requireNonNull(dataExtractorFactory); this.dataExtractorFactory = Objects.requireNonNull(dataExtractorFactory);
this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier); this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier);
} }
@ -150,7 +147,7 @@ public class ScheduledJobRunner extends AbstractComponent {
Duration queryDelay = Duration.ofSeconds(job.getSchedulerConfig().getQueryDelay()); Duration queryDelay = Duration.ofSeconds(job.getSchedulerConfig().getQueryDelay());
DataExtractor dataExtractor = dataExtractorFactory.newExtractor(job); DataExtractor dataExtractor = dataExtractorFactory.newExtractor(job);
ScheduledJob scheduledJob = new ScheduledJob(job.getId(), frequency.toMillis(), queryDelay.toMillis(), ScheduledJob scheduledJob = new ScheduledJob(job.getId(), frequency.toMillis(), queryDelay.toMillis(),
dataExtractor, dataProcessor, auditor, currentTimeSupplier, getLatestFinalBucketEndTimeMs(job), dataExtractor, client, auditor, currentTimeSupplier, getLatestFinalBucketEndTimeMs(job),
getLatestRecordTimestamp(job.getId())); getLatestRecordTimestamp(job.getId()));
return new Holder(job, scheduledJob, new ProblemTracker(() -> auditor), handler); return new Holder(job, scheduledJob, new ProblemTracker(() -> auditor), handler);
} }

View File

@ -6,12 +6,15 @@
package org.elasticsearch.xpack.prelert.job.scheduler; package org.elasticsearch.xpack.prelert.job.scheduler;
import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.prelert.PrelertPlugin; import org.elasticsearch.xpack.prelert.PrelertPlugin;
import org.elasticsearch.xpack.prelert.action.FlushJobAction;
import org.elasticsearch.xpack.prelert.action.JobDataAction;
import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction; import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction;
import org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction; import org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction;
import org.elasticsearch.xpack.prelert.job.AnalysisConfig; import org.elasticsearch.xpack.prelert.job.AnalysisConfig;
@ -62,25 +65,28 @@ import static org.mockito.Mockito.when;
public class ScheduledJobRunnerTests extends ESTestCase { public class ScheduledJobRunnerTests extends ESTestCase {
private Client client; private Client client;
private ActionFuture<JobDataAction.Response> jobDataFuture;
private ActionFuture<FlushJobAction.Response> flushJobFuture;
private ThreadPool threadPool; private ThreadPool threadPool;
private DataProcessor dataProcessor;
private DataExtractorFactory dataExtractorFactory; private DataExtractorFactory dataExtractorFactory;
private ScheduledJobRunner scheduledJobRunner; private ScheduledJobRunner scheduledJobRunner;
private long currentTime = 120000; private long currentTime = 120000;
@Before @Before
@SuppressWarnings("unchecked")
public void setUpTests() { public void setUpTests() {
client = mock(Client.class); client = mock(Client.class);
jobDataFuture = mock(ActionFuture.class);
flushJobFuture = mock(ActionFuture.class);
doAnswer(invocation -> { doAnswer(invocation -> {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
ActionListener<Object> actionListener = (ActionListener) invocation.getArguments()[2]; ActionListener<Object> actionListener = (ActionListener) invocation.getArguments()[2];
actionListener.onResponse(new UpdateJobSchedulerStatusAction.Response()); actionListener.onResponse(new UpdateJobSchedulerStatusAction.Response());
return null; return null;
}).when(client).execute(eq(UpdateJobSchedulerStatusAction.INSTANCE), any(), any()); }).when(client).execute(same(UpdateJobSchedulerStatusAction.INSTANCE), any(), any());
JobProvider jobProvider = mock(JobProvider.class); JobProvider jobProvider = mock(JobProvider.class);
when(jobProvider.dataCounts(anyString())).thenReturn(new DataCounts("foo")); when(jobProvider.dataCounts(anyString())).thenReturn(new DataCounts("foo"));
dataProcessor = mock(DataProcessor.class);
dataExtractorFactory = mock(DataExtractorFactory.class); dataExtractorFactory = mock(DataExtractorFactory.class);
Auditor auditor = mock(Auditor.class); Auditor auditor = mock(Auditor.class);
threadPool = mock(ThreadPool.class); threadPool = mock(ThreadPool.class);
@ -90,16 +96,18 @@ public class ScheduledJobRunnerTests extends ESTestCase {
return null; return null;
}).when(executorService).submit(any(Runnable.class)); }).when(executorService).submit(any(Runnable.class));
when(threadPool.executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME)).thenReturn(executorService); when(threadPool.executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME)).thenReturn(executorService);
when(client.execute(same(JobDataAction.INSTANCE), any())).thenReturn(jobDataFuture);
when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture);
scheduledJobRunner = scheduledJobRunner =
new ScheduledJobRunner(threadPool, client, jobProvider, dataProcessor, dataExtractorFactory, () -> currentTime); new ScheduledJobRunner(threadPool, client, jobProvider, dataExtractorFactory, () -> currentTime);
when(jobProvider.audit(anyString())).thenReturn(auditor); when(jobProvider.audit(anyString())).thenReturn(auditor);
when(jobProvider.buckets(anyString(), any(BucketsQueryBuilder.BucketsQuery.class))).thenThrow( when(jobProvider.buckets(anyString(), any(BucketsQueryBuilder.BucketsQuery.class))).thenThrow(
QueryPage.emptyQueryPage(Job.RESULTS_FIELD)); QueryPage.emptyQueryPage(Job.RESULTS_FIELD));
} }
public void testStart_GivenNewlyCreatedJobLoopBack() throws IOException { public void testStart_GivenNewlyCreatedJobLoopBack() throws Exception {
Job.Builder builder = createScheduledJob(); Job.Builder builder = createScheduledJob();
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STOPPED, 0L, 60000L); SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STOPPED, 0L, 60000L);
DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0));
@ -112,7 +120,7 @@ public class ScheduledJobRunnerTests extends ESTestCase {
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8"))); InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8")));
when(dataExtractor.next()).thenReturn(Optional.of(in)); when(dataExtractor.next()).thenReturn(Optional.of(in));
when(dataProcessor.processData(anyString(), eq(in), any(), any())).thenReturn(dataCounts); when(jobDataFuture.get()).thenReturn(new JobDataAction.Response(dataCounts));
Consumer<Exception> handler = mockConsumer(); Consumer<Exception> handler = mockConsumer();
StartJobSchedulerAction.SchedulerTask task = mock(StartJobSchedulerAction.SchedulerTask.class); StartJobSchedulerAction.SchedulerTask task = mock(StartJobSchedulerAction.SchedulerTask.class);
scheduledJobRunner.run(job, schedulerState, allocation, task, handler); scheduledJobRunner.run(job, schedulerState, allocation, task, handler);
@ -120,11 +128,13 @@ public class ScheduledJobRunnerTests extends ESTestCase {
verify(dataExtractor).newSearch(eq(0L), eq(60000L), any()); verify(dataExtractor).newSearch(eq(0L), eq(60000L), any());
verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME); verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME);
verify(threadPool, never()).schedule(any(), any(), any()); verify(threadPool, never()).schedule(any(), any(), any());
verify(client).execute(same(JobDataAction.INSTANCE), eq(new JobDataAction.Request("foo")));
verify(client).execute(same(FlushJobAction.INSTANCE), any());
verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STARTED)), any()); verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STARTED)), any());
verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPED)), any()); verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPED)), any());
} }
public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws IOException { public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws Exception {
Job.Builder builder = createScheduledJob(); Job.Builder builder = createScheduledJob();
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STOPPED, 0L, null); SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STOPPED, 0L, null);
DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0));
@ -137,7 +147,7 @@ public class ScheduledJobRunnerTests extends ESTestCase {
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8"))); InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8")));
when(dataExtractor.next()).thenReturn(Optional.of(in)); when(dataExtractor.next()).thenReturn(Optional.of(in));
when(dataProcessor.processData(anyString(), eq(in), any(), any())).thenReturn(dataCounts); when(jobDataFuture.get()).thenReturn(new JobDataAction.Response(dataCounts));
Consumer<Exception> handler = mockConsumer(); Consumer<Exception> handler = mockConsumer();
boolean cancelled = randomBoolean(); boolean cancelled = randomBoolean();
StartJobSchedulerAction.SchedulerTask task = new StartJobSchedulerAction.SchedulerTask(1, "type", "action", null, "foo"); StartJobSchedulerAction.SchedulerTask task = new StartJobSchedulerAction.SchedulerTask(1, "type", "action", null, "foo");
@ -149,6 +159,8 @@ public class ScheduledJobRunnerTests extends ESTestCase {
task.stop(); task.stop();
verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPED)), any()); verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPED)), any());
} else { } else {
verify(client).execute(same(JobDataAction.INSTANCE), eq(new JobDataAction.Request("foo")));
verify(client).execute(same(FlushJobAction.INSTANCE), any());
verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME), any()); verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME), any());
} }
} }

View File

@ -5,19 +5,23 @@
*/ */
package org.elasticsearch.xpack.prelert.job.scheduler; package org.elasticsearch.xpack.prelert.job.scheduler;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.action.FlushJobAction;
import org.elasticsearch.xpack.prelert.action.JobDataAction;
import org.elasticsearch.xpack.prelert.job.DataCounts; import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerState; import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.job.audit.Auditor; import org.elasticsearch.xpack.prelert.job.audit.Auditor;
import org.elasticsearch.xpack.prelert.job.data.DataProcessor;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor; import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor;
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.InterimResultsParams;
import org.junit.Before; import org.junit.Before;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Date; import java.util.Date;
import java.util.Optional; import java.util.Optional;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -34,22 +38,28 @@ public class ScheduledJobTests extends ESTestCase {
private Auditor auditor; private Auditor auditor;
private DataExtractor dataExtractor; private DataExtractor dataExtractor;
private DataProcessor dataProcessor; private Client client;
private ActionFuture<FlushJobAction.Response> flushJobFuture;
private long currentTime; private long currentTime;
@Before @Before
@SuppressWarnings("unchecked")
public void setup() throws Exception { public void setup() throws Exception {
auditor = mock(Auditor.class); auditor = mock(Auditor.class);
dataExtractor = mock(DataExtractor.class); dataExtractor = mock(DataExtractor.class);
dataProcessor = mock(DataProcessor.class); client = mock(Client.class);
ActionFuture<JobDataAction.Response> jobDataFuture = mock(ActionFuture.class);
flushJobFuture = mock(ActionFuture.class);
currentTime = 0; currentTime = 0;
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
InputStream inputStream = mock(InputStream.class); InputStream inputStream = new ByteArrayInputStream("content".getBytes(StandardCharsets.UTF_8));
when(dataExtractor.next()).thenReturn(Optional.of(inputStream)); when(dataExtractor.next()).thenReturn(Optional.of(inputStream));
DataCounts dataCounts = new DataCounts("_job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); DataCounts dataCounts = new DataCounts("_job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0));
when(dataProcessor.processData(eq("_job_id"), same(inputStream), any(), any())).thenReturn(dataCounts); when(client.execute(same(JobDataAction.INSTANCE), eq(new JobDataAction.Request("_job_id")))).thenReturn(jobDataFuture);
when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture);
when(jobDataFuture.get()).thenReturn(new JobDataAction.Response(dataCounts));
} }
public void testLookBackRunWithEndTime() throws Exception { public void testLookBackRunWithEndTime() throws Exception {
@ -58,7 +68,9 @@ public class ScheduledJobTests extends ESTestCase {
assertNull(scheduledJob.runLookBack(schedulerState)); assertNull(scheduledJob.runLookBack(schedulerState));
verify(dataExtractor).newSearch(eq(0L), eq(1000L), any()); verify(dataExtractor).newSearch(eq(0L), eq(1000L), any());
verify(dataProcessor).flushJob(eq("_job_id"), any()); FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id");
flushRequest.setCalcInterim(true);
verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest));
} }
public void testLookBackRunWithNoEndTime() throws Exception { public void testLookBackRunWithNoEndTime() throws Exception {
@ -71,8 +83,9 @@ public class ScheduledJobTests extends ESTestCase {
assertEquals(2000 + frequencyMs + 100, next); assertEquals(2000 + frequencyMs + 100, next);
verify(dataExtractor).newSearch(eq(0L), eq(1500L), any()); verify(dataExtractor).newSearch(eq(0L), eq(1500L), any());
InterimResultsParams expectedParams = InterimResultsParams.builder().calcInterim(true).build(); FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id");
verify(dataProcessor).flushJob(eq("_job_id"), eq(expectedParams)); flushRequest.setCalcInterim(true);
verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest));
} }
public void testLookBackRunWithOverrideStartTime() throws Exception { public void testLookBackRunWithOverrideStartTime() throws Exception {
@ -93,8 +106,9 @@ public class ScheduledJobTests extends ESTestCase {
assertEquals(10000 + frequencyMs + 100, next); assertEquals(10000 + frequencyMs + 100, next);
verify(dataExtractor).newSearch(eq(5000 + 1L), eq(currentTime - queryDelayMs), any()); verify(dataExtractor).newSearch(eq(5000 + 1L), eq(currentTime - queryDelayMs), any());
InterimResultsParams expectedParams = InterimResultsParams.builder().calcInterim(true).build(); FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id");
verify(dataProcessor).flushJob(eq("_job_id"), eq(expectedParams)); flushRequest.setCalcInterim(true);
verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest));
} }
public void testRealtimeRun() throws Exception { public void testRealtimeRun() throws Exception {
@ -106,7 +120,10 @@ public class ScheduledJobTests extends ESTestCase {
assertEquals(currentTime + frequencyMs + 100, next); assertEquals(currentTime + frequencyMs + 100, next);
verify(dataExtractor).newSearch(eq(1000L + 1L), eq(currentTime - queryDelayMs), any()); verify(dataExtractor).newSearch(eq(1000L + 1L), eq(currentTime - queryDelayMs), any());
verify(dataProcessor).flushJob(eq("_job_id"), any()); FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id");
flushRequest.setCalcInterim(true);
flushRequest.setAdvanceTime("1000");
verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest));
} }
public void testEmptyDataCount() throws Exception { public void testEmptyDataCount() throws Exception {
@ -128,7 +145,7 @@ public class ScheduledJobTests extends ESTestCase {
expectThrows(ScheduledJob.ExtractionProblemException.class, () -> scheduledJob.runLookBack(schedulerState)); expectThrows(ScheduledJob.ExtractionProblemException.class, () -> scheduledJob.runLookBack(schedulerState));
currentTime = 3001; currentTime = 3001;
expectThrows(ScheduledJob.ExtractionProblemException.class, () -> scheduledJob.runRealtime()); expectThrows(ScheduledJob.ExtractionProblemException.class, scheduledJob::runRealtime);
ArgumentCaptor<Long> startTimeCaptor = ArgumentCaptor.forClass(Long.class); ArgumentCaptor<Long> startTimeCaptor = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Long> endTimeCaptor = ArgumentCaptor.forClass(Long.class); ArgumentCaptor<Long> endTimeCaptor = ArgumentCaptor.forClass(Long.class);
@ -140,15 +157,16 @@ public class ScheduledJobTests extends ESTestCase {
} }
public void testAnalysisProblem() throws Exception { public void testAnalysisProblem() throws Exception {
dataProcessor = mock(DataProcessor.class); client = mock(Client.class);
when(dataProcessor.processData(eq("_job_id"), any(), any(), any())).thenThrow(new RuntimeException()); when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture);
when(client.execute(same(JobDataAction.INSTANCE), eq(new JobDataAction.Request("_job_id")))).thenThrow(new RuntimeException());
ScheduledJob scheduledJob = createScheduledJob(1000, 500, -1, -1); ScheduledJob scheduledJob = createScheduledJob(1000, 500, -1, -1);
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, 1000L); SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, 1000L);
expectThrows(ScheduledJob.AnalysisProblemException.class, () -> scheduledJob.runLookBack(schedulerState)); expectThrows(ScheduledJob.AnalysisProblemException.class, () -> scheduledJob.runLookBack(schedulerState));
currentTime = 3001; currentTime = 3001;
expectThrows(ScheduledJob.EmptyDataCountException.class, () -> scheduledJob.runRealtime()); expectThrows(ScheduledJob.EmptyDataCountException.class, scheduledJob::runRealtime);
ArgumentCaptor<Long> startTimeCaptor = ArgumentCaptor.forClass(Long.class); ArgumentCaptor<Long> startTimeCaptor = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Long> endTimeCaptor = ArgumentCaptor.forClass(Long.class); ArgumentCaptor<Long> endTimeCaptor = ArgumentCaptor.forClass(Long.class);
@ -157,12 +175,13 @@ public class ScheduledJobTests extends ESTestCase {
assertEquals(1000L, startTimeCaptor.getAllValues().get(1).longValue()); assertEquals(1000L, startTimeCaptor.getAllValues().get(1).longValue());
assertEquals(1000L, endTimeCaptor.getAllValues().get(0).longValue()); assertEquals(1000L, endTimeCaptor.getAllValues().get(0).longValue());
assertEquals(2000L, endTimeCaptor.getAllValues().get(1).longValue()); assertEquals(2000L, endTimeCaptor.getAllValues().get(1).longValue());
verify(client, times(0)).execute(same(FlushJobAction.INSTANCE), any());
} }
private ScheduledJob createScheduledJob(long frequencyMs, long queryDelayMs, long latestFinalBucketEndTimeMs, private ScheduledJob createScheduledJob(long frequencyMs, long queryDelayMs, long latestFinalBucketEndTimeMs,
long latestRecordTimeMs) { long latestRecordTimeMs) {
Supplier<Long> currentTimeSupplier = () -> currentTime; Supplier<Long> currentTimeSupplier = () -> currentTime;
return new ScheduledJob("_job_id", frequencyMs, queryDelayMs, dataExtractor, dataProcessor, auditor, return new ScheduledJob("_job_id", frequencyMs, queryDelayMs, dataExtractor, client, auditor,
currentTimeSupplier, latestFinalBucketEndTimeMs, latestRecordTimeMs); currentTimeSupplier, latestFinalBucketEndTimeMs, latestRecordTimeMs);
} }