From fad98d784f8b02fc72456829a7758b8a2c9764ee Mon Sep 17 00:00:00 2001
From: Dimitris Athanasiou
Date: Wed, 20 Sep 2017 16:45:06 +0100
Subject: [PATCH] [ML] Align aggregated data extraction to histogram interval
 (elastic/x-pack-elasticsearch#2553)

When the datafeed uses aggregations, an extra bucket is queried at the
beginning of each search in order to accommodate derivatives. To avoid
visiting the same bucket twice, we need to search buckets that are aligned
to the histogram interval. This keeps us clear of partial buckets, and thus
avoids dropping or duplicating data.

relates elastic/x-pack-elasticsearch#2519

Original commit: elastic/x-pack-elasticsearch@e03dde5fea609ddee266ba0f46e65176fe449f23
---
 .../aggregation/AggregationDataExtractor.java |  6 +-
 .../AggregationDataExtractorFactory.java      |  6 +-
 .../AggregationToJsonProcessor.java           |  9 +-
 .../chunked/ChunkedDataExtractor.java         |  7 +-
 .../chunked/ChunkedDataExtractorContext.java  | 10 +-
 .../chunked/ChunkedDataExtractorFactory.java  | 50 +++++++++-
 .../xpack/ml/utils/Intervals.java             | 39 ++++++++
 .../AggregationDataExtractorFactoryTests.java | 71 ++++++++++++++
 .../AggregationToJsonProcessorTests.java      | 30 +-----
 .../ChunkedDataExtractorFactoryTests.java     | 98 +++++++++++++++++++
 .../chunked/ChunkedDataExtractorTests.java    |  3 +-
 .../xpack/ml/utils/IntervalsTests.java        | 41 ++++++++
 12 files changed, 327 insertions(+), 43 deletions(-)
 create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/utils/Intervals.java
 create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactoryTests.java
 create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactoryTests.java
 create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/ml/utils/IntervalsTests.java
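As a quick illustration of the alignment described above (a sketch only; the numbers
are hypothetical and simply mirror the Intervals utility and the factory tests added
below): the requested start is rounded up to the next bucket boundary and the end is
rounded down, so the extractor only ever queries whole histogram buckets.

    // Assuming a 1000 ms histogram interval and an arbitrary [3980, 9200) request.
    long interval = 1000L;
    long alignedStart = Intervals.alignToCeil(3980L, interval);   // -> 4000
    long alignedEnd = Intervals.alignToFloor(9200L, interval);    // -> 9000
    // The search covers [4000, 9000): whole buckets only, so the extra leading
    // bucket fetched for derivatives can be dropped without losing or duplicating data.
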
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java
index 98e2cf90239..6624299fd7f 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java
@@ -106,7 +106,7 @@ class AggregationDataExtractor implements DataExtractor {
 
     private void initAggregationProcessor(Aggregations aggs) throws IOException {
         aggregationToJsonProcessor = new AggregationToJsonProcessor(context.timeField, context.fields, context.includeDocCount,
-                context.start, getHistogramInterval());
+                context.start);
         aggregationToJsonProcessor.process(aggs);
     }
 
@@ -157,4 +157,8 @@ class AggregationDataExtractor implements DataExtractor {
     private long getHistogramInterval() {
         return ExtractorUtils.getHistogramIntervalMillis(context.aggs);
     }
+
+    AggregationDataExtractorContext getContext() {
+        return context;
+    }
 }
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java
index 67546853fa7..db081637624 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java
@@ -10,6 +10,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
 import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
 import org.elasticsearch.xpack.ml.job.config.Job;
+import org.elasticsearch.xpack.ml.utils.Intervals;
 
 import java.util.Objects;
 
@@ -27,6 +28,7 @@ public class AggregationDataExtractorFactory implements DataExtractorFactory {
 
     @Override
     public DataExtractor newExtractor(long start, long end) {
+        long histogramInterval = datafeedConfig.getHistogramIntervalMillis();
         AggregationDataExtractorContext dataExtractorContext = new AggregationDataExtractorContext(
                 job.getId(),
                 job.getDataDescription().getTimeField(),
@@ -35,8 +37,8 @@ public class AggregationDataExtractorFactory implements DataExtractorFactory {
                 datafeedConfig.getTypes(),
                 datafeedConfig.getQuery(),
                 datafeedConfig.getAggregations(),
-                start,
-                end,
+                Intervals.alignToCeil(start, histogramInterval),
+                Intervals.alignToFloor(end, histogramInterval),
                 job.getAnalysisConfig().getSummaryCountFieldName().equals(DatafeedConfig.DOC_COUNT));
         return new AggregationDataExtractor(client, dataExtractorContext);
     }
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java
index 1c4f8bc3587..b46ca924452 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java
@@ -51,7 +51,6 @@ class AggregationToJsonProcessor {
     private long keyValueWrittenCount;
     private final SortedMap<Long, List<Map<String, Object>>> docsByBucketTimestamp;
     private final long startTime;
-    private final long histogramInterval;
 
     /**
      * Constructs a processor that processes aggregations into JSON
@@ -60,9 +59,8 @@ class AggregationToJsonProcessor {
      * @param fields the fields to convert into JSON
      * @param includeDocCount whether to include the doc_count
      * @param startTime buckets with a timestamp before this time are discarded
-     * @param histogramInterval the histogram interval
      */
-    AggregationToJsonProcessor(String timeField, Set<String> fields, boolean includeDocCount, long startTime, long histogramInterval)
+    AggregationToJsonProcessor(String timeField, Set<String> fields, boolean includeDocCount, long startTime)
             throws IOException {
         this.timeField = Objects.requireNonNull(timeField);
         this.fields = Objects.requireNonNull(fields);
@@ -71,7 +69,6 @@ class AggregationToJsonProcessor {
         docsByBucketTimestamp = new TreeMap<>();
         keyValueWrittenCount = 0;
         this.startTime = startTime;
-        this.histogramInterval = histogramInterval;
     }
 
     public void process(Aggregations aggs) throws IOException {
@@ -162,9 +159,9 @@ class AggregationToJsonProcessor {
         for (Histogram.Bucket bucket : agg.getBuckets()) {
             if (checkBucketTime) {
                 long bucketTime = toHistogramKeyToEpoch(bucket.getKey());
-                if (bucketTime + histogramInterval <= startTime) {
+                if (bucketTime < startTime) {
                     // skip buckets outside the required time range
-                    LOGGER.debug("Skipping bucket at [" + bucketTime + "], startTime is [" + startTime + "]");
+                    LOGGER.debug("Skipping bucket at [{}], startTime is [{}]", bucketTime, startTime);
                     continue;
                 } else {
                     checkBucketTime = false;
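With the start of the range now guaranteed to sit on a bucket boundary, the processor
no longer needs the histogram interval to decide whether the leading bucket is partial;
a bucket is either entirely before startTime or entirely inside the range. A minimal
sketch of the old and new checks, with hypothetical numbers (1000 ms buckets,
startTime 2000):

    long startTime = 2000L;
    long interval = 1000L;
    long bucketTime = 1000L;  // key of the extra leading bucket fetched for derivatives

    boolean skippedBefore = bucketTime + interval <= startTime;  // old check: true, skipped
    boolean skippedNow = bucketTime < startTime;                 // new check: true, skipped
    // Both agree here; they only diverge for a partial first bucket (e.g. startTime 1999),
    // which can no longer occur once the start has been aligned to the interval ceiling.
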
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java
index 4014dae6fc9..6f07a31fead 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java
@@ -94,9 +94,10 @@ public class ChunkedDataExtractor implements DataExtractor {
     private void setUpChunkedSearch() throws IOException {
         DataSummary dataSummary = requestDataSummary();
         if (dataSummary.totalHits > 0) {
-            currentStart = dataSummary.earliestTime;
+            currentStart = context.timeAligner.alignToFloor(dataSummary.earliestTime);
             currentEnd = currentStart;
             chunkSpan = context.chunkSpan == null ? dataSummary.estimateChunk() : context.chunkSpan.getMillis();
+            chunkSpan = context.timeAligner.alignToCeil(chunkSpan);
             LOGGER.debug("Chunked search configured: totalHits = {}, dataTimeSpread = {} ms, chunk span = {} ms",
                     dataSummary.totalHits, dataSummary.getDataTimeSpread(), chunkSpan);
         } else {
@@ -212,4 +213,8 @@ public class ChunkedDataExtractor implements DataExtractor {
             return Math.max(estimatedChunk, MIN_CHUNK_SPAN);
         }
     }
+
+    ChunkedDataExtractorContext getContext() {
+        return context;
+    }
 }
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorContext.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorContext.java
index 49720c69900..4c2cdfd1b12 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorContext.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorContext.java
@@ -14,6 +14,11 @@ import java.util.Objects;
 
 class ChunkedDataExtractorContext {
 
+    interface TimeAligner {
+        long alignToFloor(long value);
+        long alignToCeil(long value);
+    }
+
     final String jobId;
     final String timeField;
     final String[] indices;
@@ -23,9 +28,11 @@ class ChunkedDataExtractorContext {
     final long start;
     final long end;
     final TimeValue chunkSpan;
+    final TimeAligner timeAligner;
 
     ChunkedDataExtractorContext(String jobId, String timeField, List<String> indices, List<String> types,
-                                QueryBuilder query, int scrollSize, long start, long end, @Nullable TimeValue chunkSpan) {
+                                QueryBuilder query, int scrollSize, long start, long end, @Nullable TimeValue chunkSpan,
+                                TimeAligner timeAligner) {
         this.jobId = Objects.requireNonNull(jobId);
         this.timeField = Objects.requireNonNull(timeField);
         this.indices = indices.toArray(new String[indices.size()]);
@@ -35,5 +42,6 @@ class ChunkedDataExtractorContext {
         this.start = start;
         this.end = end;
         this.chunkSpan = chunkSpan;
+        this.timeAligner = Objects.requireNonNull(timeAligner);
     }
 }
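The chunked extractor keeps its chunk boundaries on bucket edges in the same way: the
earliest data time is floored to a bucket boundary and the chunk span is rounded up to
a whole number of buckets, so every chunk handed to the sub-extractor starts and ends
on the histogram interval. A small sketch with made-up numbers (1000 ms buckets, a
2500 ms estimated chunk):

    long interval = 1000L;
    long currentStart = Intervals.alignToFloor(4123L, interval);  // earliest data time -> 4000
    long chunkSpan = Intervals.alignToCeil(2500L, interval);      // estimated chunk -> 3000
    // Successive chunks are [4000, 7000), [7000, 10000), ... so the per-chunk
    // searches never split a histogram bucket.
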
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java
index ef20672f6ff..0a60d0cd637 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java
@@ -6,10 +6,12 @@
 package org.elasticsearch.xpack.ml.datafeed.extractor.chunked;
 
 import org.elasticsearch.client.Client;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
 import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
 import org.elasticsearch.xpack.ml.job.config.Job;
+import org.elasticsearch.xpack.ml.utils.Intervals;
 
 import java.util.Objects;
 
@@ -29,6 +31,7 @@ public class ChunkedDataExtractorFactory implements DataExtractorFactory {
 
     @Override
     public DataExtractor newExtractor(long start, long end) {
+        ChunkedDataExtractorContext.TimeAligner timeAligner = newTimeAligner();
         ChunkedDataExtractorContext dataExtractorContext = new ChunkedDataExtractorContext(
                 job.getId(),
                 job.getDataDescription().getTimeField(),
@@ -36,9 +39,50 @@ public class ChunkedDataExtractorFactory implements DataExtractorFactory {
                 datafeedConfig.getTypes(),
                 datafeedConfig.getQuery(),
                 datafeedConfig.getScrollSize(),
-                start,
-                end,
-                datafeedConfig.getChunkingConfig().getTimeSpan());
+                timeAligner.alignToCeil(start),
+                timeAligner.alignToFloor(end),
+                datafeedConfig.getChunkingConfig().getTimeSpan(),
+                timeAligner);
         return new ChunkedDataExtractor(client, dataExtractorFactory, dataExtractorContext);
     }
+
+    private ChunkedDataExtractorContext.TimeAligner newTimeAligner() {
+        if (datafeedConfig.hasAggregations()) {
+            // When the datafeed uses aggregations, an extra bucket is queried at the
+            // beginning of each search in order to accommodate derivatives. To avoid
+            // visiting the same bucket twice, we need to search buckets aligned to the
+            // histogram interval. This keeps us clear of partial buckets, and thus
+            // avoids dropping or duplicating data.
+            return newIntervalTimeAligner(datafeedConfig.getHistogramIntervalMillis());
+        }
+        return newIdentityTimeAligner();
+    }
+
+    static ChunkedDataExtractorContext.TimeAligner newIdentityTimeAligner() {
+        return new ChunkedDataExtractorContext.TimeAligner() {
+            @Override
+            public long alignToFloor(long value) {
+                return value;
+            }
+
+            @Override
+            public long alignToCeil(long value) {
+                return value;
+            }
+        };
+    }
+
+    static ChunkedDataExtractorContext.TimeAligner newIntervalTimeAligner(long interval) {
+        return new ChunkedDataExtractorContext.TimeAligner() {
+            @Override
+            public long alignToFloor(long value) {
+                return Intervals.alignToFloor(value, interval);
+            }
+
+            @Override
+            public long alignToCeil(long value) {
+                return Intervals.alignToCeil(value, interval);
+            }
+        };
+    }
 }
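The TimeAligner indirection keeps ChunkedDataExtractor itself unaware of whether
alignment is needed: scroll-based datafeeds get a pass-through aligner, while
aggregation datafeeds get one bound to the histogram interval. A rough usage sketch
(the 60000 ms interval is just an example):

    ChunkedDataExtractorContext.TimeAligner aligner =
            ChunkedDataExtractorFactory.newIntervalTimeAligner(60_000L);
    long start = aligner.alignToCeil(123_456L);   // -> 180000
    long end = aligner.alignToFloor(654_321L);    // -> 600000

    ChunkedDataExtractorContext.TimeAligner identity =
            ChunkedDataExtractorFactory.newIdentityTimeAligner();
    assert identity.alignToCeil(123_456L) == 123_456L;  // no aggregations: times pass through
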
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/Intervals.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/Intervals.java
new file mode 100644
index 00000000000..5b766e714c0
--- /dev/null
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/Intervals.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.utils;
+
+/**
+ * A collection of utilities related to intervals
+ */
+public class Intervals {
+
+    private Intervals() {}
+
+    /**
+     * Aligns a {@code value} to a multiple of an {@code interval} by rounding down.
+     * @param value the value to align to a multiple of the {@code interval}
+     * @param interval the interval
+     * @return the multiple of the {@code interval} that is less than or equal to the {@code value}
+     */
+    public static long alignToFloor(long value, long interval) {
+        long result = (value / interval) * interval;
+        if (result == value || value >= 0) {
+            return result;
+        }
+        return result - interval;
+    }
+
+    /**
+     * Aligns a {@code value} to a multiple of an {@code interval} by rounding up.
+     * @param value the value to align to a multiple of the {@code interval}
+     * @param interval the interval
+     * @return the multiple of the {@code interval} that is greater than or equal to the {@code value}
+     */
+    public static long alignToCeil(long value, long interval) {
+        long result = alignToFloor(value, interval);
+        return result == value ? result : result + interval;
+    }
+}
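One subtlety worth calling out: Java long division truncates toward zero, so for
negative values (epoch millis before 1970) a plain (value / interval) * interval would
round up rather than down; the extra branch in alignToFloor pushes the result one
interval lower. The values below mirror the IntervalsTests added at the end of the
patch:

    long interval = 10L;
    assert Intervals.alignToFloor(25L, interval) == 20L;    // positives: truncation rounds down
    // -21 / 10 == -2 in Java, so -20 alone would round up; the correction yields -30.
    assert Intervals.alignToFloor(-21L, interval) == -30L;
    assert Intervals.alignToCeil(-21L, interval) == -20L;
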
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactoryTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactoryTests.java
new file mode 100644
index 00000000000..6da9823d921
--- /dev/null
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactoryTests.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
+import org.elasticsearch.xpack.ml.job.config.DataDescription;
+import org.elasticsearch.xpack.ml.job.config.Detector;
+import org.elasticsearch.xpack.ml.job.config.Job;
+import org.junit.Before;
+
+import java.util.Arrays;
+import java.util.Date;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
+
+public class AggregationDataExtractorFactoryTests extends ESTestCase {
+
+    private Client client;
+
+    @Before
+    public void setUpMocks() {
+        client = mock(Client.class);
+    }
+
+    public void testNewExtractor_GivenAlignedTimes() {
+        AggregationDataExtractorFactory factory = createFactory(1000L);
+
+        AggregationDataExtractor dataExtractor = (AggregationDataExtractor) factory.newExtractor(2000, 5000);
+
+        assertThat(dataExtractor.getContext().start, equalTo(2000L));
+        assertThat(dataExtractor.getContext().end, equalTo(5000L));
+    }
+
+    public void testNewExtractor_GivenNonAlignedTimes() {
+        AggregationDataExtractorFactory factory = createFactory(1000L);
+
+        AggregationDataExtractor dataExtractor = (AggregationDataExtractor) factory.newExtractor(3980, 9200);
+
+        assertThat(dataExtractor.getContext().start, equalTo(4000L));
+        assertThat(dataExtractor.getContext().end, equalTo(9000L));
+    }
+
+    private AggregationDataExtractorFactory createFactory(long histogramInterval) {
+        AggregatorFactories.Builder aggs = new AggregatorFactories.Builder().addAggregator(
+                AggregationBuilders.histogram("time").field("time").interval(histogramInterval).subAggregation(
+                        AggregationBuilders.max("time").field("time")));
+        DataDescription.Builder dataDescription = new DataDescription.Builder();
+        dataDescription.setTimeField("time");
+        Detector.Builder detectorBuilder = new Detector.Builder();
+        detectorBuilder.setFunction("sum");
+        detectorBuilder.setFieldName("value");
+        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detectorBuilder.build()));
+        analysisConfig.setSummaryCountFieldName("doc_count");
+        Job.Builder jobBuilder = new Job.Builder("foo");
+        jobBuilder.setDataDescription(dataDescription);
+        jobBuilder.setAnalysisConfig(analysisConfig);
+        DatafeedConfig.Builder datafeedConfigBuilder = new DatafeedConfig.Builder("foo-feed", jobBuilder.getId());
+        datafeedConfigBuilder.setAggregations(aggs);
+        datafeedConfigBuilder.setIndices(Arrays.asList("my_index"));
+        return new AggregationDataExtractorFactory(client, datafeedConfigBuilder.build(), jobBuilder.build(new Date()));
+    }
+}
\ No newline at end of file
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java
index e4d90b3a1b6..ffadcfab43c 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java
@@ -44,7 +44,6 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
     private String timeField = "time";
    private boolean includeDocCount = true;
     private long startTime = 0;
-    private long histogramInterval = 1000;
 
     public void testProcessGivenMultipleDateHistograms() {
         List<Histogram.Bucket> nestedHistogramBuckets = Arrays.asList(
@@ -374,7 +373,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
     public void testBucketAggContainsRequiredAgg() throws IOException {
         Set<String> fields = new HashSet<>();
         fields.add("foo");
-        AggregationToJsonProcessor processor = new AggregationToJsonProcessor("time", fields, false, 0L, 10L);
+        AggregationToJsonProcessor processor = new AggregationToJsonProcessor("time", fields, false, 0L);
         Terms termsAgg = mock(Terms.class);
         when(termsAgg.getBuckets()).thenReturn(Collections.emptyList());
 
@@ -414,7 +413,6 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
         );
 
         startTime = 2000;
-        histogramInterval = 1000;
         String json = aggToString(Sets.newHashSet("my_field"), histogramBuckets);
 
         assertThat(json, equalTo("{\"time\":2000,\"my_field\":2.0,\"doc_count\":7} " +
@@ -435,35 +433,12 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
         );
 
         startTime = 3000;
-        histogramInterval = 1000;
         String json = aggToString(Sets.newHashSet("my_field"), histogramBuckets);
 
         assertThat(json, equalTo("{\"time\":3000,\"my_field\":3.0,\"doc_count\":10} " +
                 "{\"time\":4000,\"my_field\":4.0,\"doc_count\":14}"));
     }
 
-    public void testFirstBucketIsNotPrunedIfItContainsStartTime() throws IOException {
-        List<Histogram.Bucket> histogramBuckets = Arrays.asList(
-                createHistogramBucket(1000L, 4, Arrays.asList(
-                        createMax("time", 1000), createPercentiles("my_field", 1.0))),
-                createHistogramBucket(2000L, 7, Arrays.asList(
-                        createMax("time", 2000), createPercentiles("my_field", 2.0))),
-                createHistogramBucket(3000L, 10, Arrays.asList(
-                        createMax("time", 3000), createPercentiles("my_field", 3.0))),
-                createHistogramBucket(4000L, 14, Arrays.asList(
-                        createMax("time", 4000), createPercentiles("my_field", 4.0)))
-        );
-
-        startTime = 1999;
-        histogramInterval = 1000;
-        String json = aggToString(Sets.newHashSet("my_field"), histogramBuckets);
-
-        assertThat(json, equalTo("{\"time\":1000,\"my_field\":1.0,\"doc_count\":4} " +
-                "{\"time\":2000,\"my_field\":2.0,\"doc_count\":7} " +
-                "{\"time\":3000,\"my_field\":3.0,\"doc_count\":10} " +
-                "{\"time\":4000,\"my_field\":4.0,\"doc_count\":14}"));
-    }
-
     private String aggToString(Set<String> fields, Histogram.Bucket bucket) throws IOException {
         return aggToString(fields, Collections.singletonList(bucket));
     }
@@ -476,8 +451,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
 
     private String aggToString(Set<String> fields, Aggregations aggregations) throws IOException {
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        AggregationToJsonProcessor processor = new AggregationToJsonProcessor(
-                timeField, fields, includeDocCount, startTime, histogramInterval);
+        AggregationToJsonProcessor processor = new AggregationToJsonProcessor(timeField, fields, includeDocCount, startTime);
         processor.process(aggregations);
         processor.writeDocs(10000, outputStream);
         keyValuePairsWritten = processor.getKeyValueCount();
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactoryTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactoryTests.java
new file mode 100644
index 00000000000..66ae570ed51
--- /dev/null
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactoryTests.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.datafeed.extractor.chunked;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
+import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
+import org.elasticsearch.xpack.ml.job.config.DataDescription;
+import org.elasticsearch.xpack.ml.job.config.Detector;
+import org.elasticsearch.xpack.ml.job.config.Job;
+import org.junit.Before;
+
+import java.util.Arrays;
+import java.util.Date;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
+
+public class ChunkedDataExtractorFactoryTests extends ESTestCase {
+
+    private Client client;
+    private DataExtractorFactory dataExtractorFactory;
+
+    @Before
+    public void setUpMocks() {
+        client = mock(Client.class);
+        dataExtractorFactory = mock(DataExtractorFactory.class);
+    }
+
+    public void testNewExtractor_GivenAlignedTimes() {
+        ChunkedDataExtractorFactory factory = createFactory(1000L);
+
+        ChunkedDataExtractor dataExtractor = (ChunkedDataExtractor) factory.newExtractor(2000, 5000);
+
+        assertThat(dataExtractor.getContext().start, equalTo(2000L));
+        assertThat(dataExtractor.getContext().end, equalTo(5000L));
+    }
+
+    public void testNewExtractor_GivenNonAlignedTimes() {
+        ChunkedDataExtractorFactory factory = createFactory(1000L);
+
+        ChunkedDataExtractor dataExtractor = (ChunkedDataExtractor) factory.newExtractor(3980, 9200);
+
+        assertThat(dataExtractor.getContext().start, equalTo(4000L));
+        assertThat(dataExtractor.getContext().end, equalTo(9000L));
+    }
+
+    public void testIntervalTimeAligner() {
+        ChunkedDataExtractorContext.TimeAligner timeAligner = ChunkedDataExtractorFactory.newIntervalTimeAligner(100L);
+        assertThat(timeAligner.alignToFloor(300L), equalTo(300L));
+        assertThat(timeAligner.alignToFloor(301L), equalTo(300L));
+        assertThat(timeAligner.alignToFloor(399L), equalTo(300L));
+        assertThat(timeAligner.alignToFloor(400L), equalTo(400L));
+        assertThat(timeAligner.alignToCeil(300L), equalTo(300L));
+        assertThat(timeAligner.alignToCeil(301L), equalTo(400L));
+        assertThat(timeAligner.alignToCeil(399L), equalTo(400L));
+        assertThat(timeAligner.alignToCeil(400L), equalTo(400L));
+    }
+
+    public void testIdentityTimeAligner() {
+        ChunkedDataExtractorContext.TimeAligner timeAligner = ChunkedDataExtractorFactory.newIdentityTimeAligner();
+        assertThat(timeAligner.alignToFloor(300L), equalTo(300L));
+        assertThat(timeAligner.alignToFloor(301L), equalTo(301L));
+        assertThat(timeAligner.alignToFloor(399L), equalTo(399L));
+        assertThat(timeAligner.alignToFloor(400L), equalTo(400L));
+        assertThat(timeAligner.alignToCeil(300L), equalTo(300L));
+        assertThat(timeAligner.alignToCeil(301L), equalTo(301L));
+        assertThat(timeAligner.alignToCeil(399L), equalTo(399L));
+        assertThat(timeAligner.alignToCeil(400L), equalTo(400L));
+    }
+
+    private ChunkedDataExtractorFactory createFactory(long histogramInterval) {
+        AggregatorFactories.Builder aggs = new AggregatorFactories.Builder().addAggregator(
+                AggregationBuilders.histogram("time").field("time").interval(histogramInterval).subAggregation(
+                        AggregationBuilders.max("time").field("time")));
+        DataDescription.Builder dataDescription = new DataDescription.Builder();
+        dataDescription.setTimeField("time");
+        Detector.Builder detectorBuilder = new Detector.Builder();
+        detectorBuilder.setFunction("sum");
+        detectorBuilder.setFieldName("value");
+        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detectorBuilder.build()));
+        analysisConfig.setSummaryCountFieldName("doc_count");
+        Job.Builder jobBuilder = new Job.Builder("foo");
+        jobBuilder.setDataDescription(dataDescription);
+        jobBuilder.setAnalysisConfig(analysisConfig);
+        DatafeedConfig.Builder datafeedConfigBuilder = new DatafeedConfig.Builder("foo-feed", jobBuilder.getId());
+        datafeedConfigBuilder.setAggregations(aggs);
+        datafeedConfigBuilder.setIndices(Arrays.asList("my_index"));
+        return new ChunkedDataExtractorFactory(client, datafeedConfigBuilder.build(), jobBuilder.build(new Date()), dataExtractorFactory);
+    }
+}
\ No newline at end of file
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java
index dee69494854..f5ebe86d5b4 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java
@@ -445,7 +445,8 @@ public class ChunkedDataExtractorTests extends ESTestCase {
     }
 
     private ChunkedDataExtractorContext createContext(long start, long end) {
-        return new ChunkedDataExtractorContext(jobId, timeField, indices, types, query, scrollSize, start, end, chunkSpan);
+        return new ChunkedDataExtractorContext(jobId, timeField, indices, types, query, scrollSize, start, end, chunkSpan,
+                ChunkedDataExtractorFactory.newIdentityTimeAligner());
     }
 
     private static class StubSubExtractor implements DataExtractor {
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/utils/IntervalsTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/utils/IntervalsTests.java
new file mode 100644
index 00000000000..1052f65047c
--- /dev/null
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/utils/IntervalsTests.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.utils;
+
+import org.elasticsearch.test.ESTestCase;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class IntervalsTests extends ESTestCase {
+
+    public void testAlignToFloor() {
+        assertThat(Intervals.alignToFloor(0, 10), equalTo(0L));
+        assertThat(Intervals.alignToFloor(10, 5), equalTo(10L));
+        assertThat(Intervals.alignToFloor(6, 5), equalTo(5L));
+        assertThat(Intervals.alignToFloor(36, 5), equalTo(35L));
+        assertThat(Intervals.alignToFloor(10, 10), equalTo(10L));
+        assertThat(Intervals.alignToFloor(11, 10), equalTo(10L));
+        assertThat(Intervals.alignToFloor(19, 10), equalTo(10L));
+        assertThat(Intervals.alignToFloor(20, 10), equalTo(20L));
+        assertThat(Intervals.alignToFloor(25, 10), equalTo(20L));
+        assertThat(Intervals.alignToFloor(-20, 10), equalTo(-20L));
+        assertThat(Intervals.alignToFloor(-21, 10), equalTo(-30L));
+    }
+
+    public void testAlignToCeil() {
+        assertThat(Intervals.alignToCeil(0, 10), equalTo(0L));
+        assertThat(Intervals.alignToCeil(10, 5), equalTo(10L));
+        assertThat(Intervals.alignToCeil(6, 5), equalTo(10L));
+        assertThat(Intervals.alignToCeil(36, 5), equalTo(40L));
+        assertThat(Intervals.alignToCeil(10, 10), equalTo(10L));
+        assertThat(Intervals.alignToCeil(11, 10), equalTo(20L));
+        assertThat(Intervals.alignToCeil(19, 10), equalTo(20L));
+        assertThat(Intervals.alignToCeil(20, 10), equalTo(20L));
+        assertThat(Intervals.alignToCeil(25, 10), equalTo(30L));
+        assertThat(Intervals.alignToCeil(-20, 10), equalTo(-20L));
+        assertThat(Intervals.alignToCeil(-21, 10), equalTo(-20L));
+    }
+}
\ No newline at end of file