diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index d164cb720a2..19cefab02a4 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -121,7 +121,7 @@ class DatafeedJob { long end = toIntervalStartEpochMs(nowMinusQueryDelay); FlushJobAction.Request request = new FlushJobAction.Request(jobId); request.setCalcInterim(true); - request.setAdvanceTime(String.valueOf(lastEndTimeMs)); + request.setAdvanceTime(String.valueOf(end)); run(start, end, request); return nextRealtimeTimestamp(); } @@ -228,16 +228,16 @@ class DatafeedJob { throw error; } - if (recordCount == 0) { - throw new EmptyDataCountException(nextRealtimeTimestamp()); - } - // If the datafeed was stopped, then it is possible that by the time // we call flush the job is closed. Thus, we don't flush unless the // datafeed is still running. if (isRunning() && !isIsolated) { flushJob(flushRequest); } + + if (recordCount == 0) { + throw new EmptyDataCountException(nextRealtimeTimestamp()); + } } private DataCounts postData(InputStream inputStream, XContentType xContentType) diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java index a6085427bc5..412b2d09edb 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java @@ -30,6 +30,7 @@ import java.util.Optional; import java.util.function.Supplier; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; @@ -152,15 +153,17 @@ public class DatafeedJobTests extends ESTestCase { verify(dataExtractorFactory).newExtractor(1000L + 1L, currentTime - queryDelayMs); FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id"); flushRequest.setCalcInterim(true); - flushRequest.setAdvanceTime("1000"); + flushRequest.setAdvanceTime("59000"); verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest)); } - public void testEmptyDataCount() throws Exception { + public void testEmptyDataCountGivenlookback() throws Exception { when(dataExtractor.hasNext()).thenReturn(false); DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1); expectThrows(DatafeedJob.EmptyDataCountException.class, () -> datafeedJob.runLookBack(0L, 1000L)); + verify(client, times(1)).execute(same(FlushJobAction.INSTANCE), any()); + assertThat(flushJobRequests.getValue().getAdvanceTime(), is(nullValue())); } public void testExtractionProblem() throws Exception { @@ -203,7 +206,7 @@ public class DatafeedJobTests extends ESTestCase { assertEquals(1000L, startTimeCaptor.getAllValues().get(1).longValue()); assertEquals(1000L, endTimeCaptor.getAllValues().get(0).longValue()); assertEquals(2000L, endTimeCaptor.getAllValues().get(1).longValue()); - verify(client, times(0)).execute(same(FlushJobAction.INSTANCE), any()); + verify(client, times(1)).execute(same(FlushJobAction.INSTANCE), any()); } public void testPostAnalysisProblemIsConflict() throws Exception { @@ -226,7 +229,7 @@ public class DatafeedJobTests extends ESTestCase { assertEquals(1000L, startTimeCaptor.getAllValues().get(1).longValue()); assertEquals(1000L, endTimeCaptor.getAllValues().get(0).longValue()); assertEquals(2000L, endTimeCaptor.getAllValues().get(1).longValue()); - verify(client, times(0)).execute(same(FlushJobAction.INSTANCE), any()); + verify(client, times(1)).execute(same(FlushJobAction.INSTANCE), any()); } public void testFlushAnalysisProblem() throws Exception {