[ML] Flush job even when datafeed retrieves no data (elastic/x-pack-elasticsearch#1786)

In order to detect anomalies due to the absence of data we
need to flush the job even if the datafeed retrieves no data.

relates elastic/x-pack-elasticsearch#1794

Original commit: elastic/x-pack-elasticsearch@645e393976
This commit is contained in:
Dimitris Athanasiou 2017-06-21 10:37:29 +01:00 committed by GitHub
parent a8e394c3b5
commit c661ee0934
2 changed files with 12 additions and 9 deletions

View File

@ -121,7 +121,7 @@ class DatafeedJob {
long end = toIntervalStartEpochMs(nowMinusQueryDelay);
FlushJobAction.Request request = new FlushJobAction.Request(jobId);
request.setCalcInterim(true);
request.setAdvanceTime(String.valueOf(lastEndTimeMs));
request.setAdvanceTime(String.valueOf(end));
run(start, end, request);
return nextRealtimeTimestamp();
}
@ -228,16 +228,16 @@ class DatafeedJob {
throw error;
}
if (recordCount == 0) {
throw new EmptyDataCountException(nextRealtimeTimestamp());
}
// If the datafeed was stopped, then it is possible that by the time
// we call flush the job is closed. Thus, we don't flush unless the
// datafeed is still running.
if (isRunning() && !isIsolated) {
flushJob(flushRequest);
}
if (recordCount == 0) {
throw new EmptyDataCountException(nextRealtimeTimestamp());
}
}
private DataCounts postData(InputStream inputStream, XContentType xContentType)

View File

@ -30,6 +30,7 @@ import java.util.Optional;
import java.util.function.Supplier;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
@ -152,15 +153,17 @@ public class DatafeedJobTests extends ESTestCase {
verify(dataExtractorFactory).newExtractor(1000L + 1L, currentTime - queryDelayMs);
FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id");
flushRequest.setCalcInterim(true);
flushRequest.setAdvanceTime("1000");
flushRequest.setAdvanceTime("59000");
verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest));
}
public void testEmptyDataCount() throws Exception {
public void testEmptyDataCountGivenlookback() throws Exception {
when(dataExtractor.hasNext()).thenReturn(false);
DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1);
expectThrows(DatafeedJob.EmptyDataCountException.class, () -> datafeedJob.runLookBack(0L, 1000L));
verify(client, times(1)).execute(same(FlushJobAction.INSTANCE), any());
assertThat(flushJobRequests.getValue().getAdvanceTime(), is(nullValue()));
}
public void testExtractionProblem() throws Exception {
@ -203,7 +206,7 @@ public class DatafeedJobTests extends ESTestCase {
assertEquals(1000L, startTimeCaptor.getAllValues().get(1).longValue());
assertEquals(1000L, endTimeCaptor.getAllValues().get(0).longValue());
assertEquals(2000L, endTimeCaptor.getAllValues().get(1).longValue());
verify(client, times(0)).execute(same(FlushJobAction.INSTANCE), any());
verify(client, times(1)).execute(same(FlushJobAction.INSTANCE), any());
}
public void testPostAnalysisProblemIsConflict() throws Exception {
@ -226,7 +229,7 @@ public class DatafeedJobTests extends ESTestCase {
assertEquals(1000L, startTimeCaptor.getAllValues().get(1).longValue());
assertEquals(1000L, endTimeCaptor.getAllValues().get(0).longValue());
assertEquals(2000L, endTimeCaptor.getAllValues().get(1).longValue());
verify(client, times(0)).execute(same(FlushJobAction.INSTANCE), any());
verify(client, times(1)).execute(same(FlushJobAction.INSTANCE), any());
}
public void testFlushAnalysisProblem() throws Exception {