[ML] Respect requested start time when datafeed is resumed (elastic/x-pack-elasticsearch#1822)

This commit fixes a bug where if a datafeed is resumed with `now`
as the `start` time, the datafeed still pulls all data from last
seen record.

relates elastic/x-pack-elasticsearch#1821

Original commit: elastic/x-pack-elasticsearch@ff066c0e04
This commit is contained in:
Dimitris Athanasiou 2017-06-22 12:13:42 +01:00 committed by GitHub
parent bd973aaadb
commit 0399be6406
2 changed files with 29 additions and 5 deletions

View File

@ -116,7 +116,7 @@ class DatafeedJob {
}
long runRealtime() throws Exception {
long start = lastEndTimeMs == null ? lookbackStartTimeMs : lastEndTimeMs + 1;
long start = lastEndTimeMs == null ? lookbackStartTimeMs : Math.max(lookbackStartTimeMs, lastEndTimeMs + 1);
long nowMinusQueryDelay = currentTimeSupplier.get() - queryDelayMs;
long end = toIntervalStartEpochMs(nowMinusQueryDelay);
FlushJobAction.Request request = new FlushJobAction.Request(jobId);

View File

@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.action.FlushJobAction;
import org.elasticsearch.xpack.ml.action.PostDataAction;
@ -48,6 +49,7 @@ public class DatafeedJobTests extends ESTestCase {
private DataExtractor dataExtractor;
private Client client;
private DataDescription.Builder dataDescription;
ActionFuture<PostDataAction.Response> postDataFuture;
private ActionFuture<FlushJobAction.Response> flushJobFuture;
private ArgumentCaptor<FlushJobAction.Request> flushJobRequests;
@ -64,7 +66,7 @@ public class DatafeedJobTests extends ESTestCase {
client = mock(Client.class);
dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
ActionFuture<PostDataAction.Response> jobDataFuture = mock(ActionFuture.class);
postDataFuture = mock(ActionFuture.class);
flushJobFuture = mock(ActionFuture.class);
currentTime = 0;
xContentType = XContentType.JSON;
@ -79,8 +81,8 @@ public class DatafeedJobTests extends ESTestCase {
PostDataAction.Request expectedRequest = new PostDataAction.Request("_job_id");
expectedRequest.setDataDescription(dataDescription.build());
expectedRequest.setContent(new BytesArray(contentBytes), xContentType);
when(client.execute(same(PostDataAction.INSTANCE), eq(expectedRequest))).thenReturn(jobDataFuture);
when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts));
when(client.execute(same(PostDataAction.INSTANCE), eq(expectedRequest))).thenReturn(postDataFuture);
when(postDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts));
flushJobRequests = ArgumentCaptor.forClass(FlushJobAction.Request.class);
when(client.execute(same(FlushJobAction.INSTANCE), flushJobRequests.capture())).thenReturn(flushJobFuture);
@ -120,7 +122,7 @@ public class DatafeedJobTests extends ESTestCase {
verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest));
}
public void testLookBackRunWithOverrideStartTime() throws Exception {
public void testLookBackRunWithStartTimeEarlierThanResumePoint() throws Exception {
currentTime = 10000L;
long latestFinalBucketEndTimeMs = -1;
long latestRecordTimeMs = -1;
@ -142,6 +144,28 @@ public class DatafeedJobTests extends ESTestCase {
verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest));
}
public void testContinueFromNow() throws Exception {
// We need to return empty counts so that the lookback doesn't update the last end time
when(postDataFuture.actionGet()).thenReturn(new PostDataAction.Response(new DataCounts("_job_id")));
currentTime = 10000L;
long latestFinalBucketEndTimeMs = 5000;
long latestRecordTimeMs = 5000;
long frequencyMs = 1000;
long queryDelayMs = 500;
DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, latestFinalBucketEndTimeMs, latestRecordTimeMs);
datafeedJob.runLookBack(10000L, null);
// advance time
currentTime = 12000L;
expectThrows(DatafeedJob.EmptyDataCountException.class, () -> datafeedJob.runRealtime());
verify(dataExtractorFactory, times(1)).newExtractor(10000L, 11000L);
Mockito.verifyNoMoreInteractions(dataExtractorFactory);
}
public void testRealtimeRun() throws Exception {
currentTime = 60000L;
long frequencyMs = 100;