diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/extractor/DataExtractor.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/extractor/DataExtractor.java index 20968b22425..ff276383361 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/extractor/DataExtractor.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/extractor/DataExtractor.java @@ -33,4 +33,9 @@ public interface DataExtractor { * Cancel the current search. */ void cancel(); + + /** + * @return the end time to which this extractor will search + */ + long getEndTime(); } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedWithAggsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedWithAggsIT.java new file mode 100644 index 00000000000..3e89da92b6f --- /dev/null +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedWithAggsIT.java @@ -0,0 +1,139 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.xpack.core.ml.action.GetBucketsAction; +import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; +import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; +import org.elasticsearch.xpack.core.ml.job.config.Detector; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.results.Bucket; +import org.junit.After; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; + +public class DatafeedWithAggsIT extends MlNativeAutodetectIntegTestCase { + + @After + public void cleanup(){ + cleanUp(); + } + + public void testRealtime() throws Exception { + String dataIndex = "datafeed-with-aggs-rt-data"; + + // A job with a bucket_span of 2s + String jobId = "datafeed-with-aggs-rt-job"; + DataDescription.Builder dataDescription = new DataDescription.Builder(); + + Detector.Builder d = new Detector.Builder("count", null); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build())); + analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(2)); + analysisConfig.setSummaryCountFieldName("doc_count"); + + Job.Builder jobBuilder = new Job.Builder(); + jobBuilder.setId(jobId); + + jobBuilder.setAnalysisConfig(analysisConfig); + jobBuilder.setDataDescription(dataDescription); + + // Datafeed with aggs + String datafeedId = jobId + "-feed"; + DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder(datafeedId, jobId); + datafeedBuilder.setQueryDelay(TimeValue.timeValueMillis(100)); + datafeedBuilder.setFrequency(TimeValue.timeValueSeconds(1)); + datafeedBuilder.setIndices(Collections.singletonList(dataIndex)); + + AggregatorFactories.Builder aggs = new AggregatorFactories.Builder(); + aggs.addAggregator(AggregationBuilders.dateHistogram("time").field("time").interval(1000) + .subAggregation(AggregationBuilders.max("time").field("time"))); + datafeedBuilder.setParsedAggregations(aggs); + + DatafeedConfig datafeed = datafeedBuilder.build(); + + // Create stuff and open job + registerJob(jobBuilder); + putJob(jobBuilder); + registerDatafeed(datafeed); + putDatafeed(datafeed); + openJob(jobId); + + // Now let's index the data + client().admin().indices().prepareCreate(dataIndex) + .addMapping("type", "time", "type=date") + .get(); + + // Index a doc per second from a minute ago to a minute later + long now = System.currentTimeMillis(); + long aMinuteAgo = now - TimeValue.timeValueMinutes(1).millis(); + long aMinuteLater = now + TimeValue.timeValueMinutes(1).millis(); + long curTime = aMinuteAgo; + BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); + while (curTime < aMinuteLater) { + IndexRequest indexRequest = new IndexRequest(dataIndex); + indexRequest.source("time", curTime); + bulkRequestBuilder.add(indexRequest); + curTime += TimeValue.timeValueSeconds(1).millis(); + } + BulkResponse bulkResponse = bulkRequestBuilder + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index docs: " + bulkResponse.buildFailureMessage()); + } + + // And start datafeed in real-time mode + startDatafeed(datafeedId, 0L, null); + + // Wait until we finalize a bucket after now + assertBusy(() -> { + GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(jobId); + getBucketsRequest.setExcludeInterim(true); + getBucketsRequest.setSort("timestamp"); + getBucketsRequest.setDescending(true); + List buckets = getBuckets(getBucketsRequest); + assertThat(buckets.size(), greaterThanOrEqualTo(1)); + assertThat(buckets.get(0).getTimestamp().getTime(), greaterThan(now)); + }, 30, TimeUnit.SECONDS); + + // Wrap up + StopDatafeedAction.Response stopJobResponse = stopDatafeed(datafeedId); + assertTrue(stopJobResponse.isStopped()); + assertBusy(() -> { + GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedId); + GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet(); + assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED)); + }); + closeJob(jobId); + + // Assert we have not dropped any data - final buckets should contain 2 events each + GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(jobId); + getBucketsRequest.setExcludeInterim(true); + List buckets = getBuckets(getBucketsRequest); + for (Bucket bucket : buckets) { + if (bucket.getEventCount() != 2) { + fail("Bucket [" + bucket.getTimestamp().getTime() + "] has [" + bucket.getEventCount() + "] when 2 were expected"); + } + } + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index 85f2489e6b0..4e5e2070d43 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -380,7 +380,7 @@ class DatafeedJob { } } - lastEndTimeMs = Math.max(lastEndTimeMs == null ? 0 : lastEndTimeMs, end - 1); + lastEndTimeMs = Math.max(lastEndTimeMs == null ? 0 : lastEndTimeMs, dataExtractor.getEndTime() - 1); LOGGER.debug("[{}] Complete iterating data extractor [{}], [{}], [{}], [{}], [{}]", jobId, error, recordCount, lastEndTimeMs, isRunning(), dataExtractor.isCancelled()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AbstractAggregationDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AbstractAggregationDataExtractor.java index df858f45c82..aa5c7ed6314 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AbstractAggregationDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AbstractAggregationDataExtractor.java @@ -80,6 +80,11 @@ abstract class AbstractAggregationDataExtractor next() throws IOException { if (!hasNext()) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java index 618ae6ee9a3..f1e1fe2a10a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java @@ -114,7 +114,7 @@ public class ChunkedDataExtractor implements DataExtractor { currentEnd = currentStart; chunkSpan = context.chunkSpan == null ? dataSummary.estimateChunk() : context.chunkSpan.getMillis(); chunkSpan = context.timeAligner.alignToCeil(chunkSpan); - LOGGER.debug("[{}]Chunked search configured: kind = {}, dataTimeSpread = {} ms, chunk span = {} ms", + LOGGER.debug("[{}] Chunked search configured: kind = {}, dataTimeSpread = {} ms, chunk span = {} ms", context.jobId, dataSummary.getClass().getSimpleName(), dataSummary.getDataTimeSpread(), chunkSpan); } else { // search is over @@ -170,6 +170,11 @@ public class ChunkedDataExtractor implements DataExtractor { isCancelled = true; } + @Override + public long getEndTime() { + return context.end; + } + ChunkedDataExtractorContext getContext() { return context; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java index 5e6eb96637d..b848eba948c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java @@ -77,6 +77,11 @@ class ScrollDataExtractor implements DataExtractor { isCancelled = true; } + @Override + public long getEndTime() { + return context.end; + } + @Override public Optional next() throws IOException { if (!hasNext()) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java index 2540ab8cde8..8d8bd84a97c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java @@ -370,7 +370,7 @@ public class DatafeedJobTests extends ESTestCase { verify(client, never()).execute(same(PersistJobAction.INSTANCE), any()); } - public void testPostAnalysisProblem() throws Exception { + public void testPostAnalysisProblem() { client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); when(client.threadPool()).thenReturn(threadPool); @@ -378,6 +378,8 @@ public class DatafeedJobTests extends ESTestCase { when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture); when(client.execute(same(PostDataAction.INSTANCE), any())).thenThrow(new RuntimeException()); + when(dataExtractor.getEndTime()).thenReturn(1000L); + DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1); DatafeedJob.AnalysisProblemException analysisProblemException = expectThrows(DatafeedJob.AnalysisProblemException.class, () -> datafeedJob.runLookBack(0L, 1000L)); @@ -397,7 +399,7 @@ public class DatafeedJobTests extends ESTestCase { verify(client, never()).execute(same(PersistJobAction.INSTANCE), any()); } - public void testPostAnalysisProblemIsConflict() throws Exception { + public void testPostAnalysisProblemIsConflict() { client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); when(client.threadPool()).thenReturn(threadPool); @@ -405,6 +407,8 @@ public class DatafeedJobTests extends ESTestCase { when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture); when(client.execute(same(PostDataAction.INSTANCE), any())).thenThrow(ExceptionsHelper.conflictStatusException("conflict")); + when(dataExtractor.getEndTime()).thenReturn(1000L); + DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1); DatafeedJob.AnalysisProblemException analysisProblemException = expectThrows(DatafeedJob.AnalysisProblemException.class, () -> datafeedJob.runLookBack(0L, 1000L)); @@ -424,7 +428,7 @@ public class DatafeedJobTests extends ESTestCase { verify(client, never()).execute(same(PersistJobAction.INSTANCE), any()); } - public void testFlushAnalysisProblem() throws Exception { + public void testFlushAnalysisProblem() { when(client.execute(same(FlushJobAction.INSTANCE), any())).thenThrow(new RuntimeException()); currentTime = 60000L; @@ -436,7 +440,7 @@ public class DatafeedJobTests extends ESTestCase { assertThat(analysisProblemException.shouldStop, is(false)); } - public void testFlushAnalysisProblemIsConflict() throws Exception { + public void testFlushAnalysisProblemIsConflict() { when(client.execute(same(FlushJobAction.INSTANCE), any())).thenThrow(ExceptionsHelper.conflictStatusException("conflict")); currentTime = 60000L; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java index c8e53dfcf7d..406f1a5fa90 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java @@ -596,5 +596,10 @@ public class ChunkedDataExtractorTests extends ESTestCase { public void cancel() { // do nothing } + + @Override + public long getEndTime() { + return 0; + } } }