[ML] Fix datafeed skipping first bucket after lookback when aggs are … (#39859) (#39958)

The problem here was that `DatafeedJob` was updating the last end time searched
based on `now`, even though, when there are aggregations, the extractor only
searches up to the floor of `now` against the histogram interval.
This commit fixes the issue by using the end time as calculated by the extractor.
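
To make the off-by-one-bucket effect concrete, here is a minimal sketch of the alignment arithmetic; the class, variable names, and values are illustrative, not the actual `DatafeedJob` fields:

public class BucketAlignmentSketch {
    public static void main(String[] args) {
        long histogramIntervalMs = 1000L;
        long now = 5_432L; // wall-clock millis when the real-time search is issued

        // With aggregations, the extractor only searches up to the floor of
        // `now` against the histogram interval:
        long extractorEnd = (now / histogramIntervalMs) * histogramIntervalMs; // 5000

        // Before the fix: the last end time was advanced based on now - 1 (5431),
        // so the next search started at 5432 and the [5000, 6000) bucket was skipped.
        long buggyNextStart = (now - 1) + 1;          // 5432
        // After the fix: it is advanced to extractorEnd - 1 (4999), so the next
        // search starts at 5000 and that bucket is searched.
        long fixedNextStart = (extractorEnd - 1) + 1; // 5000

        System.out.println("buggy next start: " + buggyNextStart);
        System.out.println("fixed next start: " + fixedNextStart);
    }
}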

It also adds an integration test that uses aggregations. This test would fail
before this fix. Unfortunately, the test is slow, as we need to wait for the
datafeed to work in real time.

Closes #39842
Dimitris Athanasiou, 2019-03-13 09:09:07 +02:00 (committed by GitHub)
parent b8733eab00
commit 79e414df86
8 changed files with 174 additions and 6 deletions


@@ -33,4 +33,9 @@ public interface DataExtractor {
      * Cancel the current search.
      */
     void cancel();
+
+    /**
+     * @return the end time to which this extractor will search
+     */
+    long getEndTime();
 }


@@ -0,0 +1,139 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.junit.After;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

public class DatafeedWithAggsIT extends MlNativeAutodetectIntegTestCase {

    @After
    public void cleanup() {
        cleanUp();
    }

    public void testRealtime() throws Exception {
        String dataIndex = "datafeed-with-aggs-rt-data";

        // A job with a bucket_span of 2s
        String jobId = "datafeed-with-aggs-rt-job";
        DataDescription.Builder dataDescription = new DataDescription.Builder();
        Detector.Builder d = new Detector.Builder("count", null);
        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()));
        analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(2));
        analysisConfig.setSummaryCountFieldName("doc_count");
        Job.Builder jobBuilder = new Job.Builder();
        jobBuilder.setId(jobId);
        jobBuilder.setAnalysisConfig(analysisConfig);
        jobBuilder.setDataDescription(dataDescription);

        // Datafeed with aggs
        String datafeedId = jobId + "-feed";
        DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder(datafeedId, jobId);
        datafeedBuilder.setQueryDelay(TimeValue.timeValueMillis(100));
        datafeedBuilder.setFrequency(TimeValue.timeValueSeconds(1));
        datafeedBuilder.setIndices(Collections.singletonList(dataIndex));
        AggregatorFactories.Builder aggs = new AggregatorFactories.Builder();
        aggs.addAggregator(AggregationBuilders.dateHistogram("time").field("time").interval(1000)
            .subAggregation(AggregationBuilders.max("time").field("time")));
        datafeedBuilder.setParsedAggregations(aggs);
        DatafeedConfig datafeed = datafeedBuilder.build();

        // Create stuff and open job
        registerJob(jobBuilder);
        putJob(jobBuilder);
        registerDatafeed(datafeed);
        putDatafeed(datafeed);
        openJob(jobId);

        // Now let's index the data
        client().admin().indices().prepareCreate(dataIndex)
            .addMapping("type", "time", "type=date")
            .get();

        // Index a doc per second from a minute ago to a minute later
        long now = System.currentTimeMillis();
        long aMinuteAgo = now - TimeValue.timeValueMinutes(1).millis();
        long aMinuteLater = now + TimeValue.timeValueMinutes(1).millis();
        long curTime = aMinuteAgo;
        BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
        while (curTime < aMinuteLater) {
            IndexRequest indexRequest = new IndexRequest(dataIndex);
            indexRequest.source("time", curTime);
            bulkRequestBuilder.add(indexRequest);
            curTime += TimeValue.timeValueSeconds(1).millis();
        }
        BulkResponse bulkResponse = bulkRequestBuilder
            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
            .get();
        if (bulkResponse.hasFailures()) {
            fail("Failed to index docs: " + bulkResponse.buildFailureMessage());
        }

        // And start datafeed in real-time mode
        startDatafeed(datafeedId, 0L, null);

        // Wait until we finalize a bucket after now
        assertBusy(() -> {
            GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(jobId);
            getBucketsRequest.setExcludeInterim(true);
            getBucketsRequest.setSort("timestamp");
            getBucketsRequest.setDescending(true);
            List<Bucket> buckets = getBuckets(getBucketsRequest);
            assertThat(buckets.size(), greaterThanOrEqualTo(1));
            assertThat(buckets.get(0).getTimestamp().getTime(), greaterThan(now));
        }, 30, TimeUnit.SECONDS);

        // Wrap up
        StopDatafeedAction.Response stopJobResponse = stopDatafeed(datafeedId);
        assertTrue(stopJobResponse.isStopped());
        assertBusy(() -> {
            GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedId);
            GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet();
            assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED));
        });
        closeJob(jobId);

        // Assert we have not dropped any data - final buckets should contain 2 events each
        GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(jobId);
        getBucketsRequest.setExcludeInterim(true);
        List<Bucket> buckets = getBuckets(getBucketsRequest);
        for (Bucket bucket : buckets) {
            if (bucket.getEventCount() != 2) {
                fail("Bucket [" + bucket.getTimestamp().getTime() + "] has [" + bucket.getEventCount() + "] when 2 were expected");
            }
        }
    }
}


@@ -380,7 +380,7 @@ class DatafeedJob {
             }
         }
 
-        lastEndTimeMs = Math.max(lastEndTimeMs == null ? 0 : lastEndTimeMs, end - 1);
+        lastEndTimeMs = Math.max(lastEndTimeMs == null ? 0 : lastEndTimeMs, dataExtractor.getEndTime() - 1);
         LOGGER.debug("[{}] Complete iterating data extractor [{}], [{}], [{}], [{}], [{}]", jobId, error, recordCount,
                 lastEndTimeMs, isRunning(), dataExtractor.isCancelled());
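
The `- 1` in the new line matters: `lastEndTimeMs` records the last millisecond already covered, and the next real-time search begins at `lastEndTimeMs + 1`, since the extractor's end timestamp is exclusive. A sketch of the invariant, with illustrative values rather than actual `DatafeedJob` state:

class EndTimeInvariantSketch {
    public static void main(String[] args) {
        long extractorEnd = 5_000L;               // exclusive end the extractor searched up to
        long lastEndTimeMs = extractorEnd - 1;    // 4999: last millisecond actually covered
        long nextSearchStart = lastEndTimeMs + 1; // 5000: the previously skipped bucket is searched next
        System.out.println(nextSearchStart == extractorEnd); // true: no gap, no overlap
    }
}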


@@ -80,6 +80,11 @@ abstract class AbstractAggregationDataExtractor<T extends ActionRequestBuilder<S
         hasNext = false;
     }
 
+    @Override
+    public long getEndTime() {
+        return context.end;
+    }
+
     @Override
     public Optional<InputStream> next() throws IOException {
         if (!hasNext()) {


@@ -170,6 +170,11 @@ public class ChunkedDataExtractor implements DataExtractor {
         isCancelled = true;
     }
 
+    @Override
+    public long getEndTime() {
+        return context.end;
+    }
+
     ChunkedDataExtractorContext getContext() {
         return context;
     }


@@ -77,6 +77,11 @@ class ScrollDataExtractor implements DataExtractor {
         isCancelled = true;
     }
 
+    @Override
+    public long getEndTime() {
+        return context.end;
+    }
+
     @Override
     public Optional<InputStream> next() throws IOException {
         if (!hasNext()) {


@@ -370,7 +370,7 @@ public class DatafeedJobTests extends ESTestCase {
         verify(client, never()).execute(same(PersistJobAction.INSTANCE), any());
     }
 
-    public void testPostAnalysisProblem() throws Exception {
+    public void testPostAnalysisProblem() {
         client = mock(Client.class);
         ThreadPool threadPool = mock(ThreadPool.class);
         when(client.threadPool()).thenReturn(threadPool);
@@ -378,6 +378,8 @@
         when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture);
         when(client.execute(same(PostDataAction.INSTANCE), any())).thenThrow(new RuntimeException());
 
+        when(dataExtractor.getEndTime()).thenReturn(1000L);
+
         DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1);
         DatafeedJob.AnalysisProblemException analysisProblemException =
                 expectThrows(DatafeedJob.AnalysisProblemException.class, () -> datafeedJob.runLookBack(0L, 1000L));
@@ -397,7 +399,7 @@
         verify(client, never()).execute(same(PersistJobAction.INSTANCE), any());
     }
 
-    public void testPostAnalysisProblemIsConflict() throws Exception {
+    public void testPostAnalysisProblemIsConflict() {
         client = mock(Client.class);
         ThreadPool threadPool = mock(ThreadPool.class);
         when(client.threadPool()).thenReturn(threadPool);
@@ -405,6 +407,8 @@
         when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture);
         when(client.execute(same(PostDataAction.INSTANCE), any())).thenThrow(ExceptionsHelper.conflictStatusException("conflict"));
 
+        when(dataExtractor.getEndTime()).thenReturn(1000L);
+
         DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1);
         DatafeedJob.AnalysisProblemException analysisProblemException =
                 expectThrows(DatafeedJob.AnalysisProblemException.class, () -> datafeedJob.runLookBack(0L, 1000L));
@@ -424,7 +428,7 @@
         verify(client, never()).execute(same(PersistJobAction.INSTANCE), any());
     }
 
-    public void testFlushAnalysisProblem() throws Exception {
+    public void testFlushAnalysisProblem() {
         when(client.execute(same(FlushJobAction.INSTANCE), any())).thenThrow(new RuntimeException());
 
         currentTime = 60000L;
@@ -436,7 +440,7 @@
         assertThat(analysisProblemException.shouldStop, is(false));
     }
 
-    public void testFlushAnalysisProblemIsConflict() throws Exception {
+    public void testFlushAnalysisProblemIsConflict() {
         when(client.execute(same(FlushJobAction.INSTANCE), any())).thenThrow(ExceptionsHelper.conflictStatusException("conflict"));
 
         currentTime = 60000L;


@@ -596,5 +596,10 @@ public class ChunkedDataExtractorTests extends ESTestCase {
         public void cancel() {
             // do nothing
         }
+
+        @Override
+        public long getEndTime() {
+            return 0;
+        }
     }
 }