diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java index 8b72575ca8e..4ef2dfc222a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.action.CloseJobAction; import org.elasticsearch.xpack.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.ml.action.util.QueryPage; @@ -28,7 +29,6 @@ import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.DefaultFrequency; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.messages.Messages; -import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; @@ -110,7 +110,8 @@ public class DatafeedJobRunner extends AbstractComponent { } holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage()); } catch (DatafeedJob.EmptyDataCountException e) { - if (endTime == null && holder.problemTracker.updateEmptyDataCount(true) == false) { + if (endTime == null) { + holder.problemTracker.reportEmptyDataCount(); next = e.nextDelayInMsSinceEpoch; } } catch (Exception e) { @@ -127,7 +128,7 @@ public class DatafeedJobRunner extends AbstractComponent { }); } - private void doDatafeedRealtime(long delayInMsSinceEpoch, String jobId, Holder holder) { + void doDatafeedRealtime(long delayInMsSinceEpoch, String jobId, Holder holder) { if (holder.isRunning()) { TimeValue delay = computeNextDelay(delayInMsSinceEpoch); logger.debug("Waiting [{}] before executing next realtime import for job [{}]", delay, jobId); @@ -135,6 +136,7 @@ public class DatafeedJobRunner extends AbstractComponent { long nextDelayInMsSinceEpoch; try { nextDelayInMsSinceEpoch = holder.datafeedJob.runRealtime(); + holder.problemTracker.reportNoneEmptyCount(); } catch (DatafeedJob.ExtractionProblemException e) { nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch; holder.problemTracker.reportExtractionProblem(e.getCause().getMessage()); @@ -143,11 +145,7 @@ public class DatafeedJobRunner extends AbstractComponent { holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage()); } catch (DatafeedJob.EmptyDataCountException e) { nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch; - if (holder.problemTracker.updateEmptyDataCount(true)) { - holder.problemTracker.finishReport(); - holder.stop("empty_data", e); - return; - } + holder.problemTracker.reportEmptyDataCount(); } catch (Exception e) { logger.error("Unexpected datafeed failure for job [" + jobId + "] stopping...", e); holder.stop("general_realtime_error", e); @@ -159,7 +157,7 @@ public class DatafeedJobRunner extends AbstractComponent { } } - private Holder createJobDatafeed(DatafeedConfig datafeed, Job job, long finalBucketEndMs, long latestRecordTimeMs, + Holder createJobDatafeed(DatafeedConfig datafeed, Job job, long finalBucketEndMs, long latestRecordTimeMs, Consumer handler, StartDatafeedAction.DatafeedTask task) { Auditor auditor = jobProvider.audit(job.getId()); Duration frequency = getFrequencyOrDefault(datafeed, job); @@ -232,7 +230,7 @@ public class DatafeedJobRunner extends AbstractComponent { private final Consumer handler; volatile Future future; - private Holder(DatafeedConfig datafeed, DatafeedJob datafeedJob, boolean autoCloseJob, ProblemTracker problemTracker, + Holder(DatafeedConfig datafeed, DatafeedJob datafeedJob, boolean autoCloseJob, ProblemTracker problemTracker, Consumer handler) { this.datafeed = datafeed; this.datafeedJob = datafeedJob; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/ProblemTracker.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/ProblemTracker.java index a2072d3f15f..79dc342815a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/ProblemTracker.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/ProblemTracker.java @@ -5,8 +5,8 @@ */ package org.elasticsearch.xpack.ml.datafeed; -import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.job.messages.Messages; +import org.elasticsearch.xpack.ml.notifications.Auditor; import java.util.Objects; import java.util.function.Supplier; @@ -72,26 +72,22 @@ class ProblemTracker { /** * Updates the tracking of empty data cycles. If the number of consecutive empty data - * cycles reaches {@code EMPTY_DATA_WARN_COUNT}, a warning is reported. If non-empty - * is reported and a warning was issued previously, a recovery info is reported. - * - * @param empty Whether data was seen since last report - * @return {@code true} if an empty data warning was issued, {@code false} otherwise + * cycles reaches {@code EMPTY_DATA_WARN_COUNT}, a warning is reported. */ - public boolean updateEmptyDataCount(boolean empty) { - if (empty && emptyDataCount < EMPTY_DATA_WARN_COUNT) { + public void reportEmptyDataCount() { + if (emptyDataCount < EMPTY_DATA_WARN_COUNT) { emptyDataCount++; if (emptyDataCount == EMPTY_DATA_WARN_COUNT) { auditor.get().warning(Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_NO_DATA)); - return true; } - } else if (!empty) { - if (emptyDataCount >= EMPTY_DATA_WARN_COUNT) { - auditor.get().info(Messages.getMessage(Messages.JOB_AUDIR_DATAFEED_DATA_SEEN_AGAIN)); - } - emptyDataCount = 0; } - return false; + } + + public void reportNoneEmptyCount() { + if (emptyDataCount >= EMPTY_DATA_WARN_COUNT) { + auditor.get().info(Messages.getMessage(Messages.JOB_AUDIR_DATAFEED_DATA_SEEN_AGAIN)); + } + emptyDataCount = 0; } public boolean hasProblems() { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java index bf644ba99dc..7f426b2b2d8 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java @@ -47,10 +47,12 @@ import java.util.Arrays; import java.util.Date; import java.util.Optional; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.function.Consumer; import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.same; import static org.mockito.Mockito.doAnswer; @@ -64,6 +66,7 @@ import static org.mockito.Mockito.when; public class DatafeedJobRunnerTests extends ESTestCase { private Client client; + private Auditor auditor; private ActionFuture jobDataFuture; private ActionFuture flushJobFuture; private ClusterService clusterService; @@ -89,7 +92,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { return null; }).when(jobProvider).dataCounts(any(), any(), any()); dataExtractorFactory = mock(DataExtractorFactory.class); - Auditor auditor = mock(Auditor.class); + auditor = mock(Auditor.class); threadPool = mock(ThreadPool.class); ExecutorService executorService = mock(ExecutorService.class); doAnswer(invocation -> { @@ -190,6 +193,42 @@ public class DatafeedJobRunnerTests extends ESTestCase { verify(client, never()).execute(same(FlushJobAction.INSTANCE), any()); } + public void testStart_emptyDataCountException() throws Exception { + currentTime = 6000000; + Job.Builder jobBuilder = createDatafeedJob(); + DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", "foo").build(); + Job job = jobBuilder.build(); + MlMetadata mlMetadata = new MlMetadata.Builder() + .putJob(job, false) + .putDatafeed(datafeedConfig) + .build(); + when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata)) + .build()); + int[] counter = new int[] {0}; + doAnswer(invocationOnMock -> { + if (counter[0]++ < 10) { + Runnable r = (Runnable) invocationOnMock.getArguments()[2]; + currentTime += 600000; + r.run(); + } + return mock(ScheduledFuture.class); + }).when(threadPool).schedule(any(), any(), any()); + + DataExtractor dataExtractor = mock(DataExtractor.class); + when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor); + when(dataExtractor.hasNext()).thenReturn(false); + Consumer handler = mockConsumer(); + StartDatafeedAction.DatafeedTask task = createDatafeedTask("datafeed1", 0L, null); + DatafeedJobRunner.Holder holder = datafeedJobRunner.createJobDatafeed(datafeedConfig, job, 100, 100, handler, task); + datafeedJobRunner.doDatafeedRealtime(10L, "foo", holder); + + verify(threadPool, times(11)).schedule(any(), eq(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME), any()); + verify(auditor, times(1)).warning(anyString()); + verify(client, never()).execute(same(PostDataAction.INSTANCE), any()); + verify(client, never()).execute(same(FlushJobAction.INSTANCE), any()); + } + public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws Exception { Job.Builder jobBuilder = createDatafeedJob(); DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", "foo").build(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/ProblemTrackerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/ProblemTrackerTests.java index c91c259b1fa..fb2eea13975 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/ProblemTrackerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/ProblemTrackerTests.java @@ -59,7 +59,7 @@ public class ProblemTrackerTests extends ESTestCase { public void testUpdateEmptyDataCount_GivenEmptyNineTimes() { for (int i = 0; i < 9; i++) { - problemTracker.updateEmptyDataCount(true); + problemTracker.reportEmptyDataCount(); } Mockito.verifyNoMoreInteractions(auditor); @@ -67,7 +67,7 @@ public class ProblemTrackerTests extends ESTestCase { public void testUpdateEmptyDataCount_GivenEmptyTenTimes() { for (int i = 0; i < 10; i++) { - problemTracker.updateEmptyDataCount(true); + problemTracker.reportEmptyDataCount(); } verify(auditor).warning("Datafeed has been retrieving no data for a while"); @@ -75,7 +75,7 @@ public class ProblemTrackerTests extends ESTestCase { public void testUpdateEmptyDataCount_GivenEmptyElevenTimes() { for (int i = 0; i < 11; i++) { - problemTracker.updateEmptyDataCount(true); + problemTracker.reportEmptyDataCount(); } verify(auditor, times(1)).warning("Datafeed has been retrieving no data for a while"); @@ -83,18 +83,18 @@ public class ProblemTrackerTests extends ESTestCase { public void testUpdateEmptyDataCount_GivenNonEmptyAfterNineEmpty() { for (int i = 0; i < 9; i++) { - problemTracker.updateEmptyDataCount(true); + problemTracker.reportEmptyDataCount(); } - problemTracker.updateEmptyDataCount(false); + problemTracker.reportNoneEmptyCount(); Mockito.verifyNoMoreInteractions(auditor); } public void testUpdateEmptyDataCount_GivenNonEmptyAfterTenEmpty() { for (int i = 0; i < 10; i++) { - problemTracker.updateEmptyDataCount(true); + problemTracker.reportEmptyDataCount(); } - problemTracker.updateEmptyDataCount(false); + problemTracker.reportNoneEmptyCount(); verify(auditor).warning("Datafeed has been retrieving no data for a while"); verify(auditor).info("Datafeed has started retrieving data again");