From 79e414df8672939b8549e435c6e19217615249b3 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Wed, 13 Mar 2019 09:09:07 +0200 Subject: [PATCH] =?UTF-8?q?[ML]=20Fix=20datafeed=20skipping=20first=20buck?= =?UTF-8?q?et=20after=20lookback=20when=20aggs=20are=20=E2=80=A6=20(#39859?= =?UTF-8?q?)=20(#39958)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The problem here was that `DatafeedJob` was updating the last end time searched based on the `now` even though when there are aggregations, the extactor will only search up to the floor of `now` against the histogram interval. This commit fixes the issue by using the end time as calculated by the extractor. It also adds an integration test that uses aggregations. This test would fail before this fix. Unfortunately the test is slow as we need to wait for the datafeed to work in real time. Closes #39842 --- .../ml/datafeed/extractor/DataExtractor.java | 5 + .../ml/integration/DatafeedWithAggsIT.java | 139 ++++++++++++++++++ .../xpack/ml/datafeed/DatafeedJob.java | 2 +- .../AbstractAggregationDataExtractor.java | 5 + .../chunked/ChunkedDataExtractor.java | 7 +- .../extractor/scroll/ScrollDataExtractor.java | 5 + .../xpack/ml/datafeed/DatafeedJobTests.java | 12 +- .../chunked/ChunkedDataExtractorTests.java | 5 + 8 files changed, 174 insertions(+), 6 deletions(-) create mode 100644 x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedWithAggsIT.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/extractor/DataExtractor.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/extractor/DataExtractor.java index 20968b22425..ff276383361 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/extractor/DataExtractor.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/extractor/DataExtractor.java @@ -33,4 +33,9 @@ public interface DataExtractor { * Cancel the current search. */ void cancel(); + + /** + * @return the end time to which this extractor will search + */ + long getEndTime(); } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedWithAggsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedWithAggsIT.java new file mode 100644 index 00000000000..3e89da92b6f --- /dev/null +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedWithAggsIT.java @@ -0,0 +1,139 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.xpack.core.ml.action.GetBucketsAction; +import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; +import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; +import org.elasticsearch.xpack.core.ml.job.config.Detector; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.results.Bucket; +import org.junit.After; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; + +public class DatafeedWithAggsIT extends MlNativeAutodetectIntegTestCase { + + @After + public void cleanup(){ + cleanUp(); + } + + public void testRealtime() throws Exception { + String dataIndex = "datafeed-with-aggs-rt-data"; + + // A job with a bucket_span of 2s + String jobId = "datafeed-with-aggs-rt-job"; + DataDescription.Builder dataDescription = new DataDescription.Builder(); + + Detector.Builder d = new Detector.Builder("count", null); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build())); + analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(2)); + analysisConfig.setSummaryCountFieldName("doc_count"); + + Job.Builder jobBuilder = new Job.Builder(); + jobBuilder.setId(jobId); + + jobBuilder.setAnalysisConfig(analysisConfig); + jobBuilder.setDataDescription(dataDescription); + + // Datafeed with aggs + String datafeedId = jobId + "-feed"; + DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder(datafeedId, jobId); + datafeedBuilder.setQueryDelay(TimeValue.timeValueMillis(100)); + datafeedBuilder.setFrequency(TimeValue.timeValueSeconds(1)); + datafeedBuilder.setIndices(Collections.singletonList(dataIndex)); + + AggregatorFactories.Builder aggs = new AggregatorFactories.Builder(); + aggs.addAggregator(AggregationBuilders.dateHistogram("time").field("time").interval(1000) + .subAggregation(AggregationBuilders.max("time").field("time"))); + datafeedBuilder.setParsedAggregations(aggs); + + DatafeedConfig datafeed = datafeedBuilder.build(); + + // Create stuff and open job + registerJob(jobBuilder); + putJob(jobBuilder); + registerDatafeed(datafeed); + putDatafeed(datafeed); + openJob(jobId); + + // Now let's index the data + client().admin().indices().prepareCreate(dataIndex) + .addMapping("type", "time", "type=date") + .get(); + + // Index a doc per second from a minute ago to a minute later + long now = System.currentTimeMillis(); + long aMinuteAgo = now - TimeValue.timeValueMinutes(1).millis(); + long aMinuteLater = now + TimeValue.timeValueMinutes(1).millis(); + long curTime = aMinuteAgo; + BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); + while (curTime < aMinuteLater) { + IndexRequest indexRequest = new IndexRequest(dataIndex); + indexRequest.source("time", curTime); + bulkRequestBuilder.add(indexRequest); + curTime += TimeValue.timeValueSeconds(1).millis(); + } + BulkResponse bulkResponse = bulkRequestBuilder + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index docs: " + bulkResponse.buildFailureMessage()); + } + + // And start datafeed in real-time mode + startDatafeed(datafeedId, 0L, null); + + // Wait until we finalize a bucket after now + assertBusy(() -> { + GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(jobId); + getBucketsRequest.setExcludeInterim(true); + getBucketsRequest.setSort("timestamp"); + getBucketsRequest.setDescending(true); + List buckets = getBuckets(getBucketsRequest); + assertThat(buckets.size(), greaterThanOrEqualTo(1)); + assertThat(buckets.get(0).getTimestamp().getTime(), greaterThan(now)); + }, 30, TimeUnit.SECONDS); + + // Wrap up + StopDatafeedAction.Response stopJobResponse = stopDatafeed(datafeedId); + assertTrue(stopJobResponse.isStopped()); + assertBusy(() -> { + GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedId); + GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet(); + assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED)); + }); + closeJob(jobId); + + // Assert we have not dropped any data - final buckets should contain 2 events each + GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(jobId); + getBucketsRequest.setExcludeInterim(true); + List buckets = getBuckets(getBucketsRequest); + for (Bucket bucket : buckets) { + if (bucket.getEventCount() != 2) { + fail("Bucket [" + bucket.getTimestamp().getTime() + "] has [" + bucket.getEventCount() + "] when 2 were expected"); + } + } + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index 85f2489e6b0..4e5e2070d43 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -380,7 +380,7 @@ class DatafeedJob { } } - lastEndTimeMs = Math.max(lastEndTimeMs == null ? 0 : lastEndTimeMs, end - 1); + lastEndTimeMs = Math.max(lastEndTimeMs == null ? 0 : lastEndTimeMs, dataExtractor.getEndTime() - 1); LOGGER.debug("[{}] Complete iterating data extractor [{}], [{}], [{}], [{}], [{}]", jobId, error, recordCount, lastEndTimeMs, isRunning(), dataExtractor.isCancelled()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AbstractAggregationDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AbstractAggregationDataExtractor.java index df858f45c82..aa5c7ed6314 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AbstractAggregationDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AbstractAggregationDataExtractor.java @@ -80,6 +80,11 @@ abstract class AbstractAggregationDataExtractor next() throws IOException { if (!hasNext()) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java index 618ae6ee9a3..f1e1fe2a10a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java @@ -114,7 +114,7 @@ public class ChunkedDataExtractor implements DataExtractor { currentEnd = currentStart; chunkSpan = context.chunkSpan == null ? dataSummary.estimateChunk() : context.chunkSpan.getMillis(); chunkSpan = context.timeAligner.alignToCeil(chunkSpan); - LOGGER.debug("[{}]Chunked search configured: kind = {}, dataTimeSpread = {} ms, chunk span = {} ms", + LOGGER.debug("[{}] Chunked search configured: kind = {}, dataTimeSpread = {} ms, chunk span = {} ms", context.jobId, dataSummary.getClass().getSimpleName(), dataSummary.getDataTimeSpread(), chunkSpan); } else { // search is over @@ -170,6 +170,11 @@ public class ChunkedDataExtractor implements DataExtractor { isCancelled = true; } + @Override + public long getEndTime() { + return context.end; + } + ChunkedDataExtractorContext getContext() { return context; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java index 5e6eb96637d..b848eba948c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java @@ -77,6 +77,11 @@ class ScrollDataExtractor implements DataExtractor { isCancelled = true; } + @Override + public long getEndTime() { + return context.end; + } + @Override public Optional next() throws IOException { if (!hasNext()) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java index 2540ab8cde8..8d8bd84a97c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java @@ -370,7 +370,7 @@ public class DatafeedJobTests extends ESTestCase { verify(client, never()).execute(same(PersistJobAction.INSTANCE), any()); } - public void testPostAnalysisProblem() throws Exception { + public void testPostAnalysisProblem() { client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); when(client.threadPool()).thenReturn(threadPool); @@ -378,6 +378,8 @@ public class DatafeedJobTests extends ESTestCase { when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture); when(client.execute(same(PostDataAction.INSTANCE), any())).thenThrow(new RuntimeException()); + when(dataExtractor.getEndTime()).thenReturn(1000L); + DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1); DatafeedJob.AnalysisProblemException analysisProblemException = expectThrows(DatafeedJob.AnalysisProblemException.class, () -> datafeedJob.runLookBack(0L, 1000L)); @@ -397,7 +399,7 @@ public class DatafeedJobTests extends ESTestCase { verify(client, never()).execute(same(PersistJobAction.INSTANCE), any()); } - public void testPostAnalysisProblemIsConflict() throws Exception { + public void testPostAnalysisProblemIsConflict() { client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); when(client.threadPool()).thenReturn(threadPool); @@ -405,6 +407,8 @@ public class DatafeedJobTests extends ESTestCase { when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture); when(client.execute(same(PostDataAction.INSTANCE), any())).thenThrow(ExceptionsHelper.conflictStatusException("conflict")); + when(dataExtractor.getEndTime()).thenReturn(1000L); + DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1); DatafeedJob.AnalysisProblemException analysisProblemException = expectThrows(DatafeedJob.AnalysisProblemException.class, () -> datafeedJob.runLookBack(0L, 1000L)); @@ -424,7 +428,7 @@ public class DatafeedJobTests extends ESTestCase { verify(client, never()).execute(same(PersistJobAction.INSTANCE), any()); } - public void testFlushAnalysisProblem() throws Exception { + public void testFlushAnalysisProblem() { when(client.execute(same(FlushJobAction.INSTANCE), any())).thenThrow(new RuntimeException()); currentTime = 60000L; @@ -436,7 +440,7 @@ public class DatafeedJobTests extends ESTestCase { assertThat(analysisProblemException.shouldStop, is(false)); } - public void testFlushAnalysisProblemIsConflict() throws Exception { + public void testFlushAnalysisProblemIsConflict() { when(client.execute(same(FlushJobAction.INSTANCE), any())).thenThrow(ExceptionsHelper.conflictStatusException("conflict")); currentTime = 60000L; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java index c8e53dfcf7d..406f1a5fa90 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java @@ -596,5 +596,10 @@ public class ChunkedDataExtractorTests extends ESTestCase { public void cancel() { // do nothing } + + @Override + public long getEndTime() { + return 0; + } } }