parent
b04b3173db
commit
cfc310748d
|
@ -31,6 +31,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
|
|||
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
|
||||
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
|
||||
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
|
||||
import org.elasticsearch.xpack.core.security.user.SystemUser;
|
||||
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
|
||||
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory.BucketWithMissingData;
|
||||
|
@ -189,24 +190,27 @@ class DatafeedJob {
|
|||
long totalRecordsMissing = missingDataBuckets.stream()
|
||||
.mapToLong(BucketWithMissingData::getMissingDocumentCount)
|
||||
.sum();
|
||||
Date endTime = missingDataBuckets.get(missingDataBuckets.size() - 1).getBucket().getTimestamp();
|
||||
Annotation annotation = createAnnotation(missingDataBuckets.get(0).getBucket().getTimestamp(),
|
||||
endTime,
|
||||
totalRecordsMissing);
|
||||
Bucket lastBucket = missingDataBuckets.get(missingDataBuckets.size() - 1).getBucket();
|
||||
// Get the end of the last bucket and make it milliseconds
|
||||
Date endTime = new Date((lastBucket.getEpoch() + lastBucket.getBucketSpan()) * 1000);
|
||||
|
||||
String msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, totalRecordsMissing,
|
||||
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(lastBucket.getTimestamp().getTime()));
|
||||
|
||||
Annotation annotation = createAnnotation(missingDataBuckets.get(0).getBucket().getTimestamp(), endTime, msg);
|
||||
|
||||
// Have we an annotation that covers the same area with the same message?
|
||||
// Cannot use annotation.equals(other) as that checks createTime
|
||||
if (lastDataCheckAnnotation != null
|
||||
&& annotation.getAnnotation().equals(lastDataCheckAnnotation.getAnnotation())
|
||||
&& annotation.getTimestamp().equals(lastDataCheckAnnotation.getTimestamp())
|
||||
&& annotation.getEndTimestamp().equals(lastDataCheckAnnotation.getTimestamp())) {
|
||||
&& annotation.getEndTimestamp().equals(lastDataCheckAnnotation.getEndTimestamp())) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Creating a warning in addition to updating/creating our annotation. This allows the issue to be plainly visible
|
||||
// in the job list page.
|
||||
auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, totalRecordsMissing,
|
||||
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(endTime.getTime())));
|
||||
auditor.warning(jobId, msg);
|
||||
|
||||
if (lastDataCheckAnnotationId != null) {
|
||||
updateAnnotation(annotation);
|
||||
|
@ -217,17 +221,16 @@ class DatafeedJob {
|
|||
}
|
||||
}
|
||||
|
||||
private Annotation createAnnotation(Date startTime, Date endTime, long recordsMissing) {
|
||||
String msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, recordsMissing,
|
||||
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(endTime.getTime()));
|
||||
private Annotation createAnnotation(Date startTime, Date endTime, String msg) {
|
||||
Date currentTime = new Date(currentTimeSupplier.get());
|
||||
return new Annotation(msg,
|
||||
new Date(currentTimeSupplier.get()),
|
||||
currentTime,
|
||||
SystemUser.NAME,
|
||||
startTime,
|
||||
endTime,
|
||||
jobId,
|
||||
null,
|
||||
null,
|
||||
currentTime,
|
||||
SystemUser.NAME,
|
||||
"annotation");
|
||||
}
|
||||
|
||||
|
|
|
@ -45,6 +45,7 @@ import java.io.ByteArrayInputStream;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
@ -226,6 +227,8 @@ public class DatafeedJobTests extends ESTestCase {
|
|||
flushJobResponse = new FlushJobAction.Response(true, new Date(2000));
|
||||
Bucket bucket = mock(Bucket.class);
|
||||
when(bucket.getTimestamp()).thenReturn(new Date(2000));
|
||||
when(bucket.getEpoch()).thenReturn(2L);
|
||||
when(bucket.getBucketSpan()).thenReturn(4L);
|
||||
when(flushJobFuture.actionGet()).thenReturn(flushJobResponse);
|
||||
when(client.execute(same(FlushJobAction.INSTANCE), flushJobRequests.capture())).thenReturn(flushJobFuture);
|
||||
when(delayedDataDetector.detectMissingData(2000))
|
||||
|
@ -270,10 +273,10 @@ public class DatafeedJobTests extends ESTestCase {
|
|||
new Date(currentTime),
|
||||
SystemUser.NAME,
|
||||
bucket.getTimestamp(),
|
||||
bucket.getTimestamp(),
|
||||
new Date((bucket.getEpoch() + bucket.getBucketSpan()) * 1000),
|
||||
jobId,
|
||||
null,
|
||||
null,
|
||||
new Date(currentTime),
|
||||
SystemUser.NAME,
|
||||
"annotation");
|
||||
|
||||
IndexRequest request = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME);
|
||||
|
@ -286,8 +289,13 @@ public class DatafeedJobTests extends ESTestCase {
|
|||
assertThat(request.source(), equalTo(indexRequestArgumentCaptor.getValue().source()));
|
||||
|
||||
// Execute a fourth time, this time we return a new delayedDataDetector response to verify annotation gets updated
|
||||
Bucket bucket2 = mock(Bucket.class);
|
||||
when(bucket2.getTimestamp()).thenReturn(new Date(6000));
|
||||
when(bucket2.getEpoch()).thenReturn(6L);
|
||||
when(bucket2.getBucketSpan()).thenReturn(4L);
|
||||
when(delayedDataDetector.detectMissingData(2000))
|
||||
.thenReturn(Collections.singletonList(BucketWithMissingData.fromMissingAndBucket(15, bucket)));
|
||||
.thenReturn(Arrays.asList(BucketWithMissingData.fromMissingAndBucket(10, bucket),
|
||||
BucketWithMissingData.fromMissingAndBucket(5, bucket2)));
|
||||
currentTime = currentTime + DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS + 1;
|
||||
inputStream = new ByteArrayInputStream(contentBytes);
|
||||
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
|
||||
|
@ -297,7 +305,7 @@ public class DatafeedJobTests extends ESTestCase {
|
|||
|
||||
msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA,
|
||||
15,
|
||||
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(2000));
|
||||
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(6000));
|
||||
// What we expect the updated annotation to be indexed as
|
||||
IndexRequest indexRequest = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME);
|
||||
indexRequest.id(annotationDocId);
|
||||
|
@ -305,6 +313,7 @@ public class DatafeedJobTests extends ESTestCase {
|
|||
updatedAnnotation.setAnnotation(msg);
|
||||
updatedAnnotation.setModifiedTime(new Date(currentTime));
|
||||
updatedAnnotation.setModifiedUsername(SystemUser.NAME);
|
||||
updatedAnnotation.setEndTimestamp(new Date((bucket2.getEpoch() + bucket2.getBucketSpan()) * 1000));
|
||||
try (XContentBuilder xContentBuilder = updatedAnnotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
|
||||
indexRequest.source(xContentBuilder);
|
||||
}
|
||||
|
@ -317,6 +326,17 @@ public class DatafeedJobTests extends ESTestCase {
|
|||
assertThat(indexRequest.source().utf8ToString(),
|
||||
equalTo(updateRequestArgumentCaptor.getValue().source().utf8ToString()));
|
||||
assertThat(updateRequestArgumentCaptor.getValue().opType(), equalTo(DocWriteRequest.OpType.INDEX));
|
||||
|
||||
// Execute a fifth time, no changes should occur as annotation is the same
|
||||
currentTime = currentTime + DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS + 1;
|
||||
inputStream = new ByteArrayInputStream(contentBytes);
|
||||
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
|
||||
when(dataExtractor.next()).thenReturn(Optional.of(inputStream));
|
||||
when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor);
|
||||
datafeedJob.runRealtime();
|
||||
|
||||
// We should not get 3 index requests for the annotations
|
||||
verify(client, atMost(2)).index(any());
|
||||
}
|
||||
|
||||
public void testEmptyDataCountGivenlookback() throws Exception {
|
||||
|
|
Loading…
Reference in New Issue