From 5371cc8454a2a14217175848794535f156ed4d98 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 4 Jan 2017 13:25:24 +0100 Subject: [PATCH] When failures happen during the scheduler's loopback run then make sure we either continue or stop. Relates to elastic/elasticsearch#628 Original commit: elastic/x-pack-elasticsearch@c5726bd8e4112734e872a675d08fbd93359efbf9 --- .../prelert/scheduler/ScheduledJobRunner.java | 25 +++++++++------ .../scheduler/ScheduledJobRunnerTests.java | 32 +++++++++++++++++++ 2 files changed, 48 insertions(+), 9 deletions(-) diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunner.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunner.java index dc24cba1080..4c9669b85d1 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunner.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunner.java @@ -77,26 +77,33 @@ public class ScheduledJobRunner extends AbstractComponent { Holder holder = createJobScheduler(scheduler, job, handler); task.setHolder(holder); holder.future = threadPool.executor(PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME).submit(() -> { + Long next = null; try { - Long next = holder.scheduledJob.runLookBack(startTime, endTime); - if (next != null) { - doScheduleRealtime(next, job.getId(), holder); - } else { - holder.stop(null); - } + next = holder.scheduledJob.runLookBack(startTime, endTime); } catch (ScheduledJob.ExtractionProblemException e) { + if (endTime == null) { + next = e.nextDelayInMsSinceEpoch; + } holder.problemTracker.reportExtractionProblem(e.getCause().getMessage()); } catch (ScheduledJob.AnalysisProblemException e) { + if (endTime == null) { + next = e.nextDelayInMsSinceEpoch; + } holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage()); } catch (ScheduledJob.EmptyDataCountException e) { - if (holder.problemTracker.updateEmptyDataCount(true)) { - holder.stop(e); + if (endTime == null && holder.problemTracker.updateEmptyDataCount(true) == false) { + next = e.nextDelayInMsSinceEpoch; } } catch (Exception e) { logger.error("Failed lookback import for job [" + job.getId() + "]", e); holder.stop(e); } - holder.problemTracker.finishReport(); + if (next != null) { + doScheduleRealtime(next, job.getId(), holder); + } else { + holder.stop(null); + holder.problemTracker.finishReport(); + } }); }); } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunnerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunnerTests.java index d1dcf419dea..456c5c38865 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunnerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunnerTests.java @@ -141,6 +141,38 @@ public class ScheduledJobRunnerTests extends ESTestCase { verify(client).execute(same(INSTANCE), eq(new Request("scheduler1", SchedulerStatus.STOPPED)), any()); } + public void testStart_extractionProblem() throws Exception { + Job.Builder jobBuilder = createScheduledJob(); + SchedulerConfig schedulerConfig = createSchedulerConfig("scheduler1", "foo").build(); + DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); + Job job = jobBuilder.build(); + PrelertMetadata prelertMetadata = new PrelertMetadata.Builder() + .putJob(job, false) + .putScheduler(schedulerConfig, mock(SearchRequestParsers.class)) + .updateStatus("foo", JobStatus.OPENED, null) + .build(); + when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(PrelertMetadata.TYPE, prelertMetadata)) + .build()); + + DataExtractor dataExtractor = mock(DataExtractor.class); + when(dataExtractorFactory.newExtractor(schedulerConfig, job)).thenReturn(dataExtractor); + when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); + when(dataExtractor.next()).thenThrow(new RuntimeException("dummy")); + when(jobDataFuture.get()).thenReturn(new JobDataAction.Response(dataCounts)); + Consumer handler = mockConsumer(); + StartSchedulerAction.SchedulerTask task = mock(StartSchedulerAction.SchedulerTask.class); + scheduledJobRunner.run("scheduler1", 0L, 60000L, task, handler); + + verify(dataExtractor).newSearch(eq(0L), eq(60000L), any()); + verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME); + verify(threadPool, never()).schedule(any(), any(), any()); + verify(client, never()).execute(same(JobDataAction.INSTANCE), eq(new JobDataAction.Request("foo"))); + verify(client, never()).execute(same(FlushJobAction.INSTANCE), any()); + verify(client).execute(same(INSTANCE), eq(new Request("scheduler1", SchedulerStatus.STARTED)), any()); + verify(client).execute(same(INSTANCE), eq(new Request("scheduler1", SchedulerStatus.STOPPED)), any()); + } + public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws Exception { Job.Builder jobBuilder = createScheduledJob(); SchedulerConfig schedulerConfig = createSchedulerConfig("scheduler1", "foo").build();