diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java index 98e2cf90239..6624299fd7f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java @@ -106,7 +106,7 @@ class AggregationDataExtractor implements DataExtractor { private void initAggregationProcessor(Aggregations aggs) throws IOException { aggregationToJsonProcessor = new AggregationToJsonProcessor(context.timeField, context.fields, context.includeDocCount, - context.start, getHistogramInterval()); + context.start); aggregationToJsonProcessor.process(aggs); } @@ -157,4 +157,8 @@ class AggregationDataExtractor implements DataExtractor { private long getHistogramInterval() { return ExtractorUtils.getHistogramIntervalMillis(context.aggs); } + + AggregationDataExtractorContext getContext() { + return context; + } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java index 67546853fa7..db081637624 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java @@ -10,6 +10,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.utils.Intervals; import 
java.util.Objects; @@ -27,6 +28,7 @@ public class AggregationDataExtractorFactory implements DataExtractorFactory { @Override public DataExtractor newExtractor(long start, long end) { + long histogramInterval = datafeedConfig.getHistogramIntervalMillis(); AggregationDataExtractorContext dataExtractorContext = new AggregationDataExtractorContext( job.getId(), job.getDataDescription().getTimeField(), @@ -35,8 +37,8 @@ public class AggregationDataExtractorFactory implements DataExtractorFactory { datafeedConfig.getTypes(), datafeedConfig.getQuery(), datafeedConfig.getAggregations(), - start, - end, + Intervals.alignToCeil(start, histogramInterval), + Intervals.alignToFloor(end, histogramInterval), job.getAnalysisConfig().getSummaryCountFieldName().equals(DatafeedConfig.DOC_COUNT)); return new AggregationDataExtractor(client, dataExtractorContext); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java index 1c4f8bc3587..b46ca924452 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java @@ -51,7 +51,6 @@ class AggregationToJsonProcessor { private long keyValueWrittenCount; private final SortedMap>> docsByBucketTimestamp; private final long startTime; - private final long histogramInterval; /** * Constructs a processor that processes aggregations into JSON @@ -60,9 +59,8 @@ class AggregationToJsonProcessor { * @param fields the fields to convert into JSON * @param includeDocCount whether to include the doc_count * @param startTime buckets with a timestamp before this time are discarded - * @param histogramInterval the histogram interval */ - AggregationToJsonProcessor(String timeField, Set fields, boolean 
includeDocCount, long startTime, long histogramInterval) + AggregationToJsonProcessor(String timeField, Set fields, boolean includeDocCount, long startTime) throws IOException { this.timeField = Objects.requireNonNull(timeField); this.fields = Objects.requireNonNull(fields); @@ -71,7 +69,6 @@ class AggregationToJsonProcessor { docsByBucketTimestamp = new TreeMap<>(); keyValueWrittenCount = 0; this.startTime = startTime; - this.histogramInterval = histogramInterval; } public void process(Aggregations aggs) throws IOException { @@ -162,9 +159,9 @@ class AggregationToJsonProcessor { for (Histogram.Bucket bucket : agg.getBuckets()) { if (checkBucketTime) { long bucketTime = toHistogramKeyToEpoch(bucket.getKey()); - if (bucketTime + histogramInterval <= startTime) { + if (bucketTime < startTime) { // skip buckets outside the required time range - LOGGER.debug("Skipping bucket at [" + bucketTime + "], startTime is [" + startTime + "]"); + LOGGER.debug("Skipping bucket at [{}], startTime is [{}]", bucketTime, startTime); continue; } else { checkBucketTime = false; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java index 4014dae6fc9..6f07a31fead 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java @@ -94,9 +94,10 @@ public class ChunkedDataExtractor implements DataExtractor { private void setUpChunkedSearch() throws IOException { DataSummary dataSummary = requestDataSummary(); if (dataSummary.totalHits > 0) { - currentStart = dataSummary.earliestTime; + currentStart = context.timeAligner.alignToFloor(dataSummary.earliestTime); currentEnd = currentStart; chunkSpan = context.chunkSpan == null ? 
dataSummary.estimateChunk() : context.chunkSpan.getMillis(); + chunkSpan = context.timeAligner.alignToCeil(chunkSpan); LOGGER.debug("Chunked search configured: totalHits = {}, dataTimeSpread = {} ms, chunk span = {} ms", dataSummary.totalHits, dataSummary.getDataTimeSpread(), chunkSpan); } else { @@ -212,4 +213,8 @@ public class ChunkedDataExtractor implements DataExtractor { return Math.max(estimatedChunk, MIN_CHUNK_SPAN); } } + + ChunkedDataExtractorContext getContext() { + return context; + } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorContext.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorContext.java index 49720c69900..4c2cdfd1b12 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorContext.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorContext.java @@ -14,6 +14,11 @@ import java.util.Objects; class ChunkedDataExtractorContext { + interface TimeAligner { + long alignToFloor(long value); + long alignToCeil(long value); + } + final String jobId; final String timeField; final String[] indices; @@ -23,9 +28,11 @@ class ChunkedDataExtractorContext { final long start; final long end; final TimeValue chunkSpan; + final TimeAligner timeAligner; ChunkedDataExtractorContext(String jobId, String timeField, List indices, List types, - QueryBuilder query, int scrollSize, long start, long end, @Nullable TimeValue chunkSpan) { + QueryBuilder query, int scrollSize, long start, long end, @Nullable TimeValue chunkSpan, + TimeAligner timeAligner) { this.jobId = Objects.requireNonNull(jobId); this.timeField = Objects.requireNonNull(timeField); this.indices = indices.toArray(new String[indices.size()]); @@ -35,5 +42,6 @@ class ChunkedDataExtractorContext { this.start = start; this.end = end; this.chunkSpan = chunkSpan; + this.timeAligner = 
Objects.requireNonNull(timeAligner); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java index ef20672f6ff..0a60d0cd637 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java @@ -6,10 +6,12 @@ package org.elasticsearch.xpack.ml.datafeed.extractor.chunked; import org.elasticsearch.client.Client; +import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.utils.Intervals; import java.util.Objects; @@ -29,6 +31,7 @@ public class ChunkedDataExtractorFactory implements DataExtractorFactory { @Override public DataExtractor newExtractor(long start, long end) { + ChunkedDataExtractorContext.TimeAligner timeAligner = newTimeAligner(); ChunkedDataExtractorContext dataExtractorContext = new ChunkedDataExtractorContext( job.getId(), job.getDataDescription().getTimeField(), @@ -36,9 +39,50 @@ public class ChunkedDataExtractorFactory implements DataExtractorFactory { datafeedConfig.getTypes(), datafeedConfig.getQuery(), datafeedConfig.getScrollSize(), - start, - end, - datafeedConfig.getChunkingConfig().getTimeSpan()); + timeAligner.alignToCeil(start), + timeAligner.alignToFloor(end), + datafeedConfig.getChunkingConfig().getTimeSpan(), + timeAligner); return new ChunkedDataExtractor(client, dataExtractorFactory, dataExtractorContext); } + + private ChunkedDataExtractorContext.TimeAligner newTimeAligner() { + if 
(datafeedConfig.hasAggregations()) { + // When the datafeed uses aggregations and in order to accommodate derivatives, + // an extra bucket is queried at the beginning of each search. In order to avoid visiting + // the same bucket twice, we need to search buckets aligned to the histogram interval. + // This allows us to steer away from partial buckets, and thus avoid the problem of + // dropping or duplicating data. + return newIntervalTimeAligner(datafeedConfig.getHistogramIntervalMillis()); + } + return newIdentityTimeAligner(); + } + + static ChunkedDataExtractorContext.TimeAligner newIdentityTimeAligner() { + return new ChunkedDataExtractorContext.TimeAligner() { + @Override + public long alignToFloor(long value) { + return value; + } + + @Override + public long alignToCeil(long value) { + return value; + } + }; + } + + static ChunkedDataExtractorContext.TimeAligner newIntervalTimeAligner(long interval) { + return new ChunkedDataExtractorContext.TimeAligner() { + @Override + public long alignToFloor(long value) { + return Intervals.alignToFloor(value, interval); + } + + @Override + public long alignToCeil(long value) { + return Intervals.alignToCeil(value, interval); + } + }; + } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/Intervals.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/Intervals.java new file mode 100644 index 00000000000..5b766e714c0 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/Intervals.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
/**
 * A collection of utilities related to intervals.
 *
 * <p>All methods are static and side-effect free; the class cannot be instantiated.
 */
public class Intervals {

    private Intervals() {}

    /**
     * Aligns a {@code value} to a multiple of an {@code interval} by rounding down.
     * Negative values round towards negative infinity, e.g. {@code alignToFloor(-21, 10) == -30}.
     *
     * @param value the value to align to a multiple of the {@code interval}
     * @param interval the interval; must be strictly positive
     * @return the multiple of the {@code interval} that is less than or equal to the {@code value}
     * @throws IllegalArgumentException if {@code interval} is zero or negative
     * @throws ArithmeticException if the aligned value does not fit in a {@code long}
     */
    public static long alignToFloor(long value, long interval) {
        requirePositive(interval);
        // For a positive interval, floorMod is always in [0, interval), also for negative
        // values, so subtracting it rounds towards negative infinity without special-casing.
        // subtractExact reports overflow near Long.MIN_VALUE instead of silently wrapping
        // around to a positive number.
        return Math.subtractExact(value, Math.floorMod(value, interval));
    }

    /**
     * Aligns a {@code value} to a multiple of an {@code interval} by rounding up.
     *
     * @param value the value to align to a multiple of the {@code interval}
     * @param interval the interval; must be strictly positive
     * @return the multiple of the {@code interval} that is greater than or equal to the {@code value}
     * @throws IllegalArgumentException if {@code interval} is zero or negative
     * @throws ArithmeticException if the aligned value does not fit in a {@code long}
     */
    public static long alignToCeil(long value, long interval) {
        long floor = alignToFloor(value, interval);
        if (floor == value) {
            // Already aligned: the ceiling is the value itself.
            return floor;
        }
        // addExact reports overflow near Long.MAX_VALUE instead of wrapping to a negative number.
        return Math.addExact(floor, interval);
    }

    /** Rejects intervals for which alignment is undefined (zero) or meaningless (negative). */
    private static void requirePositive(long interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive but was [" + interval + "]");
        }
    }
}
+ */ +package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation; + +import org.elasticsearch.client.Client; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.ml.job.config.DataDescription; +import org.elasticsearch.xpack.ml.job.config.Detector; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.junit.Before; + +import java.util.Arrays; +import java.util.Date; + +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; + +public class AggregationDataExtractorFactoryTests extends ESTestCase { + + private Client client; + + @Before + public void setUpMocks() { + client = mock(Client.class); + } + + public void testNewExtractor_GivenAlignedTimes() { + AggregationDataExtractorFactory factory = createFactory(1000L); + + AggregationDataExtractor dataExtractor = (AggregationDataExtractor) factory.newExtractor(2000, 5000); + + assertThat(dataExtractor.getContext().start, equalTo(2000L)); + assertThat(dataExtractor.getContext().end, equalTo(5000L)); + } + + public void testNewExtractor_GivenNonAlignedTimes() { + AggregationDataExtractorFactory factory = createFactory(1000L); + + AggregationDataExtractor dataExtractor = (AggregationDataExtractor) factory.newExtractor(3980, 9200); + + assertThat(dataExtractor.getContext().start, equalTo(4000L)); + assertThat(dataExtractor.getContext().end, equalTo(9000L)); + } + + private AggregationDataExtractorFactory createFactory(long histogramInterval) { + AggregatorFactories.Builder aggs = new AggregatorFactories.Builder().addAggregator( + AggregationBuilders.histogram("time").field("time").interval(histogramInterval).subAggregation( + AggregationBuilders.max("time").field("time"))); + DataDescription.Builder 
dataDescription = new DataDescription.Builder(); + dataDescription.setTimeField("time"); + Detector.Builder detectorBuilder = new Detector.Builder(); + detectorBuilder.setFunction("sum"); + detectorBuilder.setFieldName("value"); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detectorBuilder.build())); + analysisConfig.setSummaryCountFieldName("doc_count"); + Job.Builder jobBuilder = new Job.Builder("foo"); + jobBuilder.setDataDescription(dataDescription); + jobBuilder.setAnalysisConfig(analysisConfig); + DatafeedConfig.Builder datafeedConfigBuilder = new DatafeedConfig.Builder("foo-feed", jobBuilder.getId()); + datafeedConfigBuilder.setAggregations(aggs); + datafeedConfigBuilder.setIndices(Arrays.asList("my_index")); + return new AggregationDataExtractorFactory(client, datafeedConfigBuilder.build(), jobBuilder.build(new Date())); + } +} \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java index e4d90b3a1b6..ffadcfab43c 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java @@ -44,7 +44,6 @@ public class AggregationToJsonProcessorTests extends ESTestCase { private String timeField = "time"; private boolean includeDocCount = true; private long startTime = 0; - private long histogramInterval = 1000; public void testProcessGivenMultipleDateHistograms() { List nestedHistogramBuckets = Arrays.asList( @@ -374,7 +373,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase { public void testBucketAggContainsRequiredAgg() throws IOException { Set fields = new HashSet<>(); fields.add("foo"); - 
AggregationToJsonProcessor processor = new AggregationToJsonProcessor("time", fields, false, 0L, 10L); + AggregationToJsonProcessor processor = new AggregationToJsonProcessor("time", fields, false, 0L); Terms termsAgg = mock(Terms.class); when(termsAgg.getBuckets()).thenReturn(Collections.emptyList()); @@ -414,7 +413,6 @@ public class AggregationToJsonProcessorTests extends ESTestCase { ); startTime = 2000; - histogramInterval = 1000; String json = aggToString(Sets.newHashSet("my_field"), histogramBuckets); assertThat(json, equalTo("{\"time\":2000,\"my_field\":2.0,\"doc_count\":7} " + @@ -435,35 +433,12 @@ public class AggregationToJsonProcessorTests extends ESTestCase { ); startTime = 3000; - histogramInterval = 1000; String json = aggToString(Sets.newHashSet("my_field"), histogramBuckets); assertThat(json, equalTo("{\"time\":3000,\"my_field\":3.0,\"doc_count\":10} " + "{\"time\":4000,\"my_field\":4.0,\"doc_count\":14}")); } - public void testFirstBucketIsNotPrunedIfItContainsStartTime() throws IOException { - List histogramBuckets = Arrays.asList( - createHistogramBucket(1000L, 4, Arrays.asList( - createMax("time", 1000), createPercentiles("my_field", 1.0))), - createHistogramBucket(2000L, 7, Arrays.asList( - createMax("time", 2000), createPercentiles("my_field", 2.0))), - createHistogramBucket(3000L, 10, Arrays.asList( - createMax("time", 3000), createPercentiles("my_field", 3.0))), - createHistogramBucket(4000L, 14, Arrays.asList( - createMax("time", 4000), createPercentiles("my_field", 4.0))) - ); - - startTime = 1999; - histogramInterval = 1000; - String json = aggToString(Sets.newHashSet("my_field"), histogramBuckets); - - assertThat(json, equalTo("{\"time\":1000,\"my_field\":1.0,\"doc_count\":4} " + - "{\"time\":2000,\"my_field\":2.0,\"doc_count\":7} " + - "{\"time\":3000,\"my_field\":3.0,\"doc_count\":10} " + - "{\"time\":4000,\"my_field\":4.0,\"doc_count\":14}")); - } - private String aggToString(Set fields, Histogram.Bucket bucket) throws IOException { 
return aggToString(fields, Collections.singletonList(bucket)); } @@ -476,8 +451,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase { private String aggToString(Set fields, Aggregations aggregations) throws IOException { ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - AggregationToJsonProcessor processor = new AggregationToJsonProcessor( - timeField, fields, includeDocCount, startTime, histogramInterval); + AggregationToJsonProcessor processor = new AggregationToJsonProcessor(timeField, fields, includeDocCount, startTime); processor.process(aggregations); processor.writeDocs(10000, outputStream); keyValuePairsWritten = processor.getKeyValueCount(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactoryTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactoryTests.java new file mode 100644 index 00000000000..66ae570ed51 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactoryTests.java @@ -0,0 +1,98 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ml.datafeed.extractor.chunked; + +import org.elasticsearch.client.Client; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; +import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.ml.job.config.DataDescription; +import org.elasticsearch.xpack.ml.job.config.Detector; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.junit.Before; + +import java.util.Arrays; +import java.util.Date; + +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; + +public class ChunkedDataExtractorFactoryTests extends ESTestCase { + + private Client client; + private DataExtractorFactory dataExtractorFactory; + + @Before + public void setUpMocks() { + client = mock(Client.class); + dataExtractorFactory = mock(DataExtractorFactory.class); + } + + public void testNewExtractor_GivenAlignedTimes() { + ChunkedDataExtractorFactory factory = createFactory(1000L); + + ChunkedDataExtractor dataExtractor = (ChunkedDataExtractor) factory.newExtractor(2000, 5000); + + assertThat(dataExtractor.getContext().start, equalTo(2000L)); + assertThat(dataExtractor.getContext().end, equalTo(5000L)); + } + + public void testNewExtractor_GivenNonAlignedTimes() { + ChunkedDataExtractorFactory factory = createFactory(1000L); + + ChunkedDataExtractor dataExtractor = (ChunkedDataExtractor) factory.newExtractor(3980, 9200); + + assertThat(dataExtractor.getContext().start, equalTo(4000L)); + assertThat(dataExtractor.getContext().end, equalTo(9000L)); + } + + public void testIntervalTimeAligner() { + ChunkedDataExtractorContext.TimeAligner timeAligner = ChunkedDataExtractorFactory.newIntervalTimeAligner(100L); + 
assertThat(timeAligner.alignToFloor(300L), equalTo(300L)); + assertThat(timeAligner.alignToFloor(301L), equalTo(300L)); + assertThat(timeAligner.alignToFloor(399L), equalTo(300L)); + assertThat(timeAligner.alignToFloor(400L), equalTo(400L)); + assertThat(timeAligner.alignToCeil(300L), equalTo(300L)); + assertThat(timeAligner.alignToCeil(301L), equalTo(400L)); + assertThat(timeAligner.alignToCeil(399L), equalTo(400L)); + assertThat(timeAligner.alignToCeil(400L), equalTo(400L)); + } + + public void testIdentityTimeAligner() { + ChunkedDataExtractorContext.TimeAligner timeAligner = ChunkedDataExtractorFactory.newIdentityTimeAligner(); + assertThat(timeAligner.alignToFloor(300L), equalTo(300L)); + assertThat(timeAligner.alignToFloor(301L), equalTo(301L)); + assertThat(timeAligner.alignToFloor(399L), equalTo(399L)); + assertThat(timeAligner.alignToFloor(400L), equalTo(400L)); + assertThat(timeAligner.alignToCeil(300L), equalTo(300L)); + assertThat(timeAligner.alignToCeil(301L), equalTo(301L)); + assertThat(timeAligner.alignToCeil(399L), equalTo(399L)); + assertThat(timeAligner.alignToCeil(400L), equalTo(400L)); + } + + private ChunkedDataExtractorFactory createFactory(long histogramInterval) { + AggregatorFactories.Builder aggs = new AggregatorFactories.Builder().addAggregator( + AggregationBuilders.histogram("time").field("time").interval(histogramInterval).subAggregation( + AggregationBuilders.max("time").field("time"))); + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setTimeField("time"); + Detector.Builder detectorBuilder = new Detector.Builder(); + detectorBuilder.setFunction("sum"); + detectorBuilder.setFieldName("value"); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detectorBuilder.build())); + analysisConfig.setSummaryCountFieldName("doc_count"); + Job.Builder jobBuilder = new Job.Builder("foo"); + jobBuilder.setDataDescription(dataDescription); + 
jobBuilder.setAnalysisConfig(analysisConfig); + DatafeedConfig.Builder datafeedConfigBuilder = new DatafeedConfig.Builder("foo-feed", jobBuilder.getId()); + datafeedConfigBuilder.setAggregations(aggs); + datafeedConfigBuilder.setIndices(Arrays.asList("my_index")); + return new ChunkedDataExtractorFactory(client, datafeedConfigBuilder.build(), jobBuilder.build(new Date()), dataExtractorFactory); + } +} \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java index dee69494854..f5ebe86d5b4 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java @@ -445,7 +445,8 @@ public class ChunkedDataExtractorTests extends ESTestCase { } private ChunkedDataExtractorContext createContext(long start, long end) { - return new ChunkedDataExtractorContext(jobId, timeField, indices, types, query, scrollSize, start, end, chunkSpan); + return new ChunkedDataExtractorContext(jobId, timeField, indices, types, query, scrollSize, start, end, chunkSpan, + ChunkedDataExtractorFactory.newIdentityTimeAligner()); } private static class StubSubExtractor implements DataExtractor { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/utils/IntervalsTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/utils/IntervalsTests.java new file mode 100644 index 00000000000..1052f65047c --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/utils/IntervalsTests.java @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.utils; + +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; + +public class IntervalsTests extends ESTestCase { + + public void testAlignToFloor() { + assertThat(Intervals.alignToFloor(0, 10), equalTo(0L)); + assertThat(Intervals.alignToFloor(10, 5), equalTo(10L)); + assertThat(Intervals.alignToFloor(6, 5), equalTo(5L)); + assertThat(Intervals.alignToFloor(36, 5), equalTo(35L)); + assertThat(Intervals.alignToFloor(10, 10), equalTo(10L)); + assertThat(Intervals.alignToFloor(11, 10), equalTo(10L)); + assertThat(Intervals.alignToFloor(19, 10), equalTo(10L)); + assertThat(Intervals.alignToFloor(20, 10), equalTo(20L)); + assertThat(Intervals.alignToFloor(25, 10), equalTo(20L)); + assertThat(Intervals.alignToFloor(-20, 10), equalTo(-20L)); + assertThat(Intervals.alignToFloor(-21, 10), equalTo(-30L)); + } + + public void testAlignToCeil() { + assertThat(Intervals.alignToCeil(0, 10), equalTo(0L)); + assertThat(Intervals.alignToCeil(10, 5), equalTo(10L)); + assertThat(Intervals.alignToCeil(6, 5), equalTo(10L)); + assertThat(Intervals.alignToCeil(36, 5), equalTo(40L)); + assertThat(Intervals.alignToCeil(10, 10), equalTo(10L)); + assertThat(Intervals.alignToCeil(11, 10), equalTo(20L)); + assertThat(Intervals.alignToCeil(19, 10), equalTo(20L)); + assertThat(Intervals.alignToCeil(20, 10), equalTo(20L)); + assertThat(Intervals.alignToCeil(25, 10), equalTo(30L)); + assertThat(Intervals.alignToCeil(-20, 10), equalTo(-20L)); + assertThat(Intervals.alignToCeil(-21, 10), equalTo(-20L)); + } +} \ No newline at end of file