When failures happen during the scheduler's lookback run, make sure we either continue or stop.

Relates to elastic/elasticsearch#628

Original commit: elastic/x-pack-elasticsearch@c5726bd8e4
Martijn van Groningen 2017-01-04 13:25:24 +01:00
parent a816ee27bb
commit 5371cc8454
2 changed files with 48 additions and 9 deletions
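
Before the diffs themselves, here is a minimal, self-contained sketch of the continue-or-stop decision this change introduces for lookback failures. The class and method names below are illustrative placeholders, not the actual x-pack types, and the recoverable-failure handling is simplified (the real change also checks the empty-data counter). The idea: a recoverable failure only reschedules a real-time run when no end time was given (i.e. real time was requested); otherwise the scheduler stops.

// Illustrative sketch only; names are placeholders, not the x-pack classes.
public class LookbackDecisionSketch {

    // Stand-in for the recoverable exceptions that carry nextDelayInMsSinceEpoch.
    static class RecoverableLookbackFailure extends RuntimeException {
        final long nextDelayInMsSinceEpoch;
        RecoverableLookbackFailure(long nextDelayInMsSinceEpoch) {
            this.nextDelayInMsSinceEpoch = nextDelayInMsSinceEpoch;
        }
    }

    interface Lookback {
        Long run() throws RecoverableLookbackFailure;
    }

    // Returns the epoch millis at which real-time scheduling should resume,
    // or null when the scheduler must stop. A null endTime means real time
    // was requested, so a recoverable failure still yields a next run time.
    static Long runLookback(Long endTime, Lookback lookback) {
        Long next = null;
        try {
            next = lookback.run();
        } catch (RecoverableLookbackFailure e) {
            if (endTime == null) {
                next = e.nextDelayInMsSinceEpoch; // continue into real time
            }
            // else: next stays null -> stop after the lookback-only run
        }
        return next;
    }

    public static void main(String[] args) {
        Lookback failing = () -> { throw new RecoverableLookbackFailure(1000L); };
        // Lookback-only run (endTime given): a failure means stop (prints null).
        System.out.println(runLookback(60000L, failing));
        // Real-time run (endTime == null): the same failure reschedules (prints 1000).
        System.out.println(runLookback(null, failing));
    }
}

In the real change below, the same next-or-null result then drives doScheduleRealtime versus holder.stop.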


@@ -77,26 +77,33 @@ public class ScheduledJobRunner extends AbstractComponent {
             Holder holder = createJobScheduler(scheduler, job, handler);
             task.setHolder(holder);
             holder.future = threadPool.executor(PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME).submit(() -> {
+                Long next = null;
                 try {
-                    Long next = holder.scheduledJob.runLookBack(startTime, endTime);
-                    if (next != null) {
-                        doScheduleRealtime(next, job.getId(), holder);
-                    } else {
-                        holder.stop(null);
-                    }
+                    next = holder.scheduledJob.runLookBack(startTime, endTime);
                 } catch (ScheduledJob.ExtractionProblemException e) {
+                    if (endTime == null) {
+                        next = e.nextDelayInMsSinceEpoch;
+                    }
                     holder.problemTracker.reportExtractionProblem(e.getCause().getMessage());
                 } catch (ScheduledJob.AnalysisProblemException e) {
+                    if (endTime == null) {
+                        next = e.nextDelayInMsSinceEpoch;
+                    }
                     holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
                 } catch (ScheduledJob.EmptyDataCountException e) {
-                    if (holder.problemTracker.updateEmptyDataCount(true)) {
-                        holder.stop(e);
+                    if (endTime == null && holder.problemTracker.updateEmptyDataCount(true) == false) {
+                        next = e.nextDelayInMsSinceEpoch;
                     }
                 } catch (Exception e) {
                     logger.error("Failed lookback import for job [" + job.getId() + "]", e);
                     holder.stop(e);
                 }
-                holder.problemTracker.finishReport();
+                if (next != null) {
+                    doScheduleRealtime(next, job.getId(), holder);
+                } else {
+                    holder.stop(null);
+                    holder.problemTracker.finishReport();
+                }
             });
         });
     }


@@ -141,6 +141,38 @@ public class ScheduledJobRunnerTests extends ESTestCase {
         verify(client).execute(same(INSTANCE), eq(new Request("scheduler1", SchedulerStatus.STOPPED)), any());
     }
 
+    public void testStart_extractionProblem() throws Exception {
+        Job.Builder jobBuilder = createScheduledJob();
+        SchedulerConfig schedulerConfig = createSchedulerConfig("scheduler1", "foo").build();
+        DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0));
+        Job job = jobBuilder.build();
+        PrelertMetadata prelertMetadata = new PrelertMetadata.Builder()
+                .putJob(job, false)
+                .putScheduler(schedulerConfig, mock(SearchRequestParsers.class))
+                .updateStatus("foo", JobStatus.OPENED, null)
+                .build();
+        when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name"))
+                .metaData(MetaData.builder().putCustom(PrelertMetadata.TYPE, prelertMetadata))
+                .build());
+
+        DataExtractor dataExtractor = mock(DataExtractor.class);
+        when(dataExtractorFactory.newExtractor(schedulerConfig, job)).thenReturn(dataExtractor);
+        when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
+        when(dataExtractor.next()).thenThrow(new RuntimeException("dummy"));
+        when(jobDataFuture.get()).thenReturn(new JobDataAction.Response(dataCounts));
+        Consumer<Exception> handler = mockConsumer();
+        StartSchedulerAction.SchedulerTask task = mock(StartSchedulerAction.SchedulerTask.class);
+        scheduledJobRunner.run("scheduler1", 0L, 60000L, task, handler);
+
+        verify(dataExtractor).newSearch(eq(0L), eq(60000L), any());
+        verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME);
+        verify(threadPool, never()).schedule(any(), any(), any());
+        verify(client, never()).execute(same(JobDataAction.INSTANCE), eq(new JobDataAction.Request("foo")));
+        verify(client, never()).execute(same(FlushJobAction.INSTANCE), any());
+        verify(client).execute(same(INSTANCE), eq(new Request("scheduler1", SchedulerStatus.STARTED)), any());
+        verify(client).execute(same(INSTANCE), eq(new Request("scheduler1", SchedulerStatus.STOPPED)), any());
+    }
+
     public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws Exception {
         Job.Builder jobBuilder = createScheduledJob();
         SchedulerConfig schedulerConfig = createSchedulerConfig("scheduler1", "foo").build();