[ML] Align aggregated data extraction to histogram interval (elastic/x-pack-elasticsearch#2553)
When the datafeed uses aggregations, an extra bucket is queried at the beginning of each search in order to accommodate derivatives. To avoid visiting the same bucket twice, we need to search buckets that are aligned to the histogram interval. This steers the search away from partial buckets, and thus avoids dropping or duplicating data.

Relates elastic/x-pack-elasticsearch#2519

Original commit: elastic/x-pack-elasticsearch@e03dde5fea
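To make the alignment concrete, here is a short sketch of the new behaviour (the values are borrowed from the tests added in this commit; the snippet itself is illustrative and not part of the diff):

    // A datafeed with a 1s histogram interval asked to search [3980, 9200)
    long histogramInterval = 1000L;
    long start = 3980L;
    long end = 9200L;
    // Shrink the range to whole buckets: ceil the start, floor the end.
    long alignedStart = Intervals.alignToCeil(start, histogramInterval);  // 4000
    long alignedEnd = Intervals.alignToFloor(end, histogramInterval);     // 9000
    // The search now covers only complete buckets, so the extra leading
    // bucket queried for derivatives is never visited twice.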
parent 3c517902f2
commit fad98d784f
AggregationDataExtractor.java
@@ -106,7 +106,7 @@ class AggregationDataExtractor implements DataExtractor {

     private void initAggregationProcessor(Aggregations aggs) throws IOException {
         aggregationToJsonProcessor = new AggregationToJsonProcessor(context.timeField, context.fields, context.includeDocCount,
-                context.start, getHistogramInterval());
+                context.start);
         aggregationToJsonProcessor.process(aggs);
     }
@@ -157,4 +157,8 @@ class AggregationDataExtractor implements DataExtractor {
     private long getHistogramInterval() {
         return ExtractorUtils.getHistogramIntervalMillis(context.aggs);
     }
+
+    AggregationDataExtractorContext getContext() {
+        return context;
+    }
 }
AggregationDataExtractorFactory.java
@@ -10,6 +10,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
 import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
 import org.elasticsearch.xpack.ml.job.config.Job;
+import org.elasticsearch.xpack.ml.utils.Intervals;

 import java.util.Objects;
@@ -27,6 +28,7 @@ public class AggregationDataExtractorFactory implements DataExtractorFactory {

     @Override
     public DataExtractor newExtractor(long start, long end) {
+        long histogramInterval = datafeedConfig.getHistogramIntervalMillis();
         AggregationDataExtractorContext dataExtractorContext = new AggregationDataExtractorContext(
                 job.getId(),
                 job.getDataDescription().getTimeField(),
@@ -35,8 +37,8 @@ public class AggregationDataExtractorFactory implements DataExtractorFactory {
                 datafeedConfig.getTypes(),
                 datafeedConfig.getQuery(),
                 datafeedConfig.getAggregations(),
-                start,
-                end,
+                Intervals.alignToCeil(start, histogramInterval),
+                Intervals.alignToFloor(end, histogramInterval),
                 job.getAnalysisConfig().getSummaryCountFieldName().equals(DatafeedConfig.DOC_COUNT));
         return new AggregationDataExtractor(client, dataExtractorContext);
     }
AggregationToJsonProcessor.java
@@ -51,7 +51,6 @@ class AggregationToJsonProcessor {
     private long keyValueWrittenCount;
     private final SortedMap<Long, List<Map<String, Object>>> docsByBucketTimestamp;
     private final long startTime;
-    private final long histogramInterval;

     /**
      * Constructs a processor that processes aggregations into JSON
@@ -60,9 +59,8 @@ class AggregationToJsonProcessor {
      * @param fields the fields to convert into JSON
      * @param includeDocCount whether to include the doc_count
      * @param startTime buckets with a timestamp before this time are discarded
-     * @param histogramInterval the histogram interval
      */
-    AggregationToJsonProcessor(String timeField, Set<String> fields, boolean includeDocCount, long startTime, long histogramInterval)
+    AggregationToJsonProcessor(String timeField, Set<String> fields, boolean includeDocCount, long startTime)
             throws IOException {
         this.timeField = Objects.requireNonNull(timeField);
         this.fields = Objects.requireNonNull(fields);
@@ -71,7 +69,6 @@ class AggregationToJsonProcessor {
         docsByBucketTimestamp = new TreeMap<>();
         keyValueWrittenCount = 0;
         this.startTime = startTime;
-        this.histogramInterval = histogramInterval;
     }

     public void process(Aggregations aggs) throws IOException {
@@ -162,9 +159,9 @@ class AggregationToJsonProcessor {
         for (Histogram.Bucket bucket : agg.getBuckets()) {
             if (checkBucketTime) {
                 long bucketTime = toHistogramKeyToEpoch(bucket.getKey());
-                if (bucketTime + histogramInterval <= startTime) {
+                if (bucketTime < startTime) {
                     // skip buckets outside the required time range
-                    LOGGER.debug("Skipping bucket at [" + bucketTime + "], startTime is [" + startTime + "]");
+                    LOGGER.debug("Skipping bucket at [{}], startTime is [{}]", bucketTime, startTime);
                     continue;
                 } else {
                     checkBucketTime = false;
ChunkedDataExtractor.java
@@ -94,9 +94,10 @@ public class ChunkedDataExtractor implements DataExtractor {
     private void setUpChunkedSearch() throws IOException {
         DataSummary dataSummary = requestDataSummary();
         if (dataSummary.totalHits > 0) {
-            currentStart = dataSummary.earliestTime;
+            currentStart = context.timeAligner.alignToFloor(dataSummary.earliestTime);
             currentEnd = currentStart;
             chunkSpan = context.chunkSpan == null ? dataSummary.estimateChunk() : context.chunkSpan.getMillis();
+            chunkSpan = context.timeAligner.alignToCeil(chunkSpan);
             LOGGER.debug("Chunked search configured: totalHits = {}, dataTimeSpread = {} ms, chunk span = {} ms",
                     dataSummary.totalHits, dataSummary.getDataTimeSpread(), chunkSpan);
         } else {
@@ -212,4 +213,8 @@ public class ChunkedDataExtractor implements DataExtractor {
             return Math.max(estimatedChunk, MIN_CHUNK_SPAN);
         }
     }
+
+    ChunkedDataExtractorContext getContext() {
+        return context;
+    }
 }
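One step worth spelling out: because currentStart is floored to the interval and chunkSpan is ceiled to it, every chunk boundary of the form currentStart + k * chunkSpan stays on a bucket edge, so no chunk splits a bucket. A minimal sketch (illustrative values of mine, using the Intervals utility added below rather than the TimeAligner wrapper):

    long interval = 1000L;
    long currentStart = Intervals.alignToFloor(1234L, interval); // 1000
    long chunkSpan = Intervals.alignToCeil(2500L, interval);     // 3000
    for (int k = 0; k < 3; k++) {
        long boundary = currentStart + k * chunkSpan;            // 1000, 4000, 7000
        assert boundary % interval == 0;                         // always a bucket edge
    }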
ChunkedDataExtractorContext.java
@@ -14,6 +14,11 @@ import java.util.Objects;

 class ChunkedDataExtractorContext {

+    interface TimeAligner {
+        long alignToFloor(long value);
+        long alignToCeil(long value);
+    }
+
     final String jobId;
     final String timeField;
     final String[] indices;
@@ -23,9 +28,11 @@ class ChunkedDataExtractorContext {
     final long start;
     final long end;
     final TimeValue chunkSpan;
+    final TimeAligner timeAligner;

     ChunkedDataExtractorContext(String jobId, String timeField, List<String> indices, List<String> types,
-                                QueryBuilder query, int scrollSize, long start, long end, @Nullable TimeValue chunkSpan) {
+                                QueryBuilder query, int scrollSize, long start, long end, @Nullable TimeValue chunkSpan,
+                                TimeAligner timeAligner) {
         this.jobId = Objects.requireNonNull(jobId);
         this.timeField = Objects.requireNonNull(timeField);
         this.indices = indices.toArray(new String[indices.size()]);
@@ -35,5 +42,6 @@ class ChunkedDataExtractorContext {
         this.start = start;
         this.end = end;
         this.chunkSpan = chunkSpan;
+        this.timeAligner = Objects.requireNonNull(timeAligner);
     }
 }
ChunkedDataExtractorFactory.java
@@ -6,10 +6,12 @@
 package org.elasticsearch.xpack.ml.datafeed.extractor.chunked;

 import org.elasticsearch.client.Client;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
 import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
 import org.elasticsearch.xpack.ml.job.config.Job;
+import org.elasticsearch.xpack.ml.utils.Intervals;

 import java.util.Objects;
@@ -29,6 +31,7 @@ public class ChunkedDataExtractorFactory implements DataExtractorFactory {

     @Override
     public DataExtractor newExtractor(long start, long end) {
+        ChunkedDataExtractorContext.TimeAligner timeAligner = newTimeAligner();
         ChunkedDataExtractorContext dataExtractorContext = new ChunkedDataExtractorContext(
                 job.getId(),
                 job.getDataDescription().getTimeField(),
@@ -36,9 +39,50 @@ public class ChunkedDataExtractorFactory implements DataExtractorFactory {
                 datafeedConfig.getTypes(),
                 datafeedConfig.getQuery(),
                 datafeedConfig.getScrollSize(),
-                start,
-                end,
-                datafeedConfig.getChunkingConfig().getTimeSpan());
+                timeAligner.alignToCeil(start),
+                timeAligner.alignToFloor(end),
+                datafeedConfig.getChunkingConfig().getTimeSpan(),
+                timeAligner);
         return new ChunkedDataExtractor(client, dataExtractorFactory, dataExtractorContext);
     }
+
+    private ChunkedDataExtractorContext.TimeAligner newTimeAligner() {
+        if (datafeedConfig.hasAggregations()) {
+            // When the datafeed uses aggregations and in order to accommodate derivatives,
+            // an extra bucket is queried at the beginning of each search. In order to avoid visiting
+            // the same bucket twice, we need to search buckets aligned to the histogram interval.
+            // This allows us to steer away from partial buckets, and thus avoid the problem of
+            // dropping or duplicating data.
+            return newIntervalTimeAligner(datafeedConfig.getHistogramIntervalMillis());
+        }
+        return newIdentityTimeAligner();
+    }
+
+    static ChunkedDataExtractorContext.TimeAligner newIdentityTimeAligner() {
+        return new ChunkedDataExtractorContext.TimeAligner() {
+            @Override
+            public long alignToFloor(long value) {
+                return value;
+            }
+
+            @Override
+            public long alignToCeil(long value) {
+                return value;
+            }
+        };
+    }
+
+    static ChunkedDataExtractorContext.TimeAligner newIntervalTimeAligner(long interval) {
+        return new ChunkedDataExtractorContext.TimeAligner() {
+            @Override
+            public long alignToFloor(long value) {
+                return Intervals.alignToFloor(value, interval);
+            }
+
+            @Override
+            public long alignToCeil(long value) {
+                return Intervals.alignToCeil(value, interval);
+            }
+        };
+    }
 }
Intervals.java (new file)
@@ -0,0 +1,39 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.utils;
+
+/**
+ * A collection of utilities related to intervals
+ */
+public class Intervals {
+
+    private Intervals() {}
+
+    /**
+     * Aligns a {@code value} to a multiple of an {@code interval} by rounding down.
+     * @param value the value to align to a multiple of the {@code interval}
+     * @param interval the interval
+     * @return the greatest multiple of the {@code interval} that is less than or equal to the {@code value}
+     */
+    public static long alignToFloor(long value, long interval) {
+        long result = (value / interval) * interval;
+        if (result == value || value >= 0) {
+            return result;
+        }
+        return result - interval;
+    }
+
+    /**
+     * Aligns a {@code value} to a multiple of an {@code interval} by rounding up.
+     * @param value the value to align to a multiple of the {@code interval}
+     * @param interval the interval
+     * @return the smallest multiple of the {@code interval} that is greater than or equal to the {@code value}
+     */
+    public static long alignToCeil(long value, long interval) {
+        long result = alignToFloor(value, interval);
+        return result == value ? result : result + interval;
+    }
+}
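A note on the negative branch in alignToFloor: Java long division truncates toward zero, so the naive (value / interval) * interval rounds up, not down, for negative values. A small sketch of the case the branch handles (the values mirror IntervalsTests below):

    long value = -21L, interval = 10L;
    long truncated = (value / interval) * interval; // (-2) * 10 = -20: rounded toward zero, not down
    long floor = truncated - interval;              // -30: the multiple of 10 that is <= -21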
AggregationDataExtractorFactoryTests.java (new file)
@@ -0,0 +1,71 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
+import org.elasticsearch.xpack.ml.job.config.DataDescription;
+import org.elasticsearch.xpack.ml.job.config.Detector;
+import org.elasticsearch.xpack.ml.job.config.Job;
+import org.junit.Before;
+
+import java.util.Arrays;
+import java.util.Date;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
+
+public class AggregationDataExtractorFactoryTests extends ESTestCase {
+
+    private Client client;
+
+    @Before
+    public void setUpMocks() {
+        client = mock(Client.class);
+    }
+
+    public void testNewExtractor_GivenAlignedTimes() {
+        AggregationDataExtractorFactory factory = createFactory(1000L);
+
+        AggregationDataExtractor dataExtractor = (AggregationDataExtractor) factory.newExtractor(2000, 5000);
+
+        assertThat(dataExtractor.getContext().start, equalTo(2000L));
+        assertThat(dataExtractor.getContext().end, equalTo(5000L));
+    }
+
+    public void testNewExtractor_GivenNonAlignedTimes() {
+        AggregationDataExtractorFactory factory = createFactory(1000L);
+
+        AggregationDataExtractor dataExtractor = (AggregationDataExtractor) factory.newExtractor(3980, 9200);
+
+        assertThat(dataExtractor.getContext().start, equalTo(4000L));
+        assertThat(dataExtractor.getContext().end, equalTo(9000L));
+    }
+
+    private AggregationDataExtractorFactory createFactory(long histogramInterval) {
+        AggregatorFactories.Builder aggs = new AggregatorFactories.Builder().addAggregator(
+                AggregationBuilders.histogram("time").field("time").interval(histogramInterval).subAggregation(
+                        AggregationBuilders.max("time").field("time")));
+        DataDescription.Builder dataDescription = new DataDescription.Builder();
+        dataDescription.setTimeField("time");
+        Detector.Builder detectorBuilder = new Detector.Builder();
+        detectorBuilder.setFunction("sum");
+        detectorBuilder.setFieldName("value");
+        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detectorBuilder.build()));
+        analysisConfig.setSummaryCountFieldName("doc_count");
+        Job.Builder jobBuilder = new Job.Builder("foo");
+        jobBuilder.setDataDescription(dataDescription);
+        jobBuilder.setAnalysisConfig(analysisConfig);
+        DatafeedConfig.Builder datafeedConfigBuilder = new DatafeedConfig.Builder("foo-feed", jobBuilder.getId());
+        datafeedConfigBuilder.setAggregations(aggs);
+        datafeedConfigBuilder.setIndices(Arrays.asList("my_index"));
+        return new AggregationDataExtractorFactory(client, datafeedConfigBuilder.build(), jobBuilder.build(new Date()));
+    }
+}
AggregationToJsonProcessorTests.java
@@ -44,7 +44,6 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
     private String timeField = "time";
     private boolean includeDocCount = true;
     private long startTime = 0;
-    private long histogramInterval = 1000;

     public void testProcessGivenMultipleDateHistograms() {
         List<Histogram.Bucket> nestedHistogramBuckets = Arrays.asList(
@@ -374,7 +373,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
     public void testBucketAggContainsRequiredAgg() throws IOException {
         Set<String> fields = new HashSet<>();
         fields.add("foo");
-        AggregationToJsonProcessor processor = new AggregationToJsonProcessor("time", fields, false, 0L, 10L);
+        AggregationToJsonProcessor processor = new AggregationToJsonProcessor("time", fields, false, 0L);

         Terms termsAgg = mock(Terms.class);
         when(termsAgg.getBuckets()).thenReturn(Collections.emptyList());
@@ -414,7 +413,6 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
         );

         startTime = 2000;
-        histogramInterval = 1000;
         String json = aggToString(Sets.newHashSet("my_field"), histogramBuckets);

         assertThat(json, equalTo("{\"time\":2000,\"my_field\":2.0,\"doc_count\":7} " +
@@ -435,35 +433,12 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
         );

         startTime = 3000;
-        histogramInterval = 1000;
         String json = aggToString(Sets.newHashSet("my_field"), histogramBuckets);

         assertThat(json, equalTo("{\"time\":3000,\"my_field\":3.0,\"doc_count\":10} " +
                 "{\"time\":4000,\"my_field\":4.0,\"doc_count\":14}"));
     }

-    public void testFirstBucketIsNotPrunedIfItContainsStartTime() throws IOException {
-        List<Histogram.Bucket> histogramBuckets = Arrays.asList(
-                createHistogramBucket(1000L, 4, Arrays.asList(
-                        createMax("time", 1000), createPercentiles("my_field", 1.0))),
-                createHistogramBucket(2000L, 7, Arrays.asList(
-                        createMax("time", 2000), createPercentiles("my_field", 2.0))),
-                createHistogramBucket(3000L, 10, Arrays.asList(
-                        createMax("time", 3000), createPercentiles("my_field", 3.0))),
-                createHistogramBucket(4000L, 14, Arrays.asList(
-                        createMax("time", 4000), createPercentiles("my_field", 4.0)))
-        );
-
-        startTime = 1999;
-        histogramInterval = 1000;
-        String json = aggToString(Sets.newHashSet("my_field"), histogramBuckets);
-
-        assertThat(json, equalTo("{\"time\":1000,\"my_field\":1.0,\"doc_count\":4} " +
-                "{\"time\":2000,\"my_field\":2.0,\"doc_count\":7} " +
-                "{\"time\":3000,\"my_field\":3.0,\"doc_count\":10} " +
-                "{\"time\":4000,\"my_field\":4.0,\"doc_count\":14}"));
-    }
-
     private String aggToString(Set<String> fields, Histogram.Bucket bucket) throws IOException {
         return aggToString(fields, Collections.singletonList(bucket));
     }
@@ -476,8 +451,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
     private String aggToString(Set<String> fields, Aggregations aggregations) throws IOException {
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

-        AggregationToJsonProcessor processor = new AggregationToJsonProcessor(
-                timeField, fields, includeDocCount, startTime, histogramInterval);
+        AggregationToJsonProcessor processor = new AggregationToJsonProcessor(timeField, fields, includeDocCount, startTime);
         processor.process(aggregations);
         processor.writeDocs(10000, outputStream);
         keyValuePairsWritten = processor.getKeyValueCount();
ChunkedDataExtractorFactoryTests.java (new file)
@@ -0,0 +1,98 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.datafeed.extractor.chunked;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
+import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
+import org.elasticsearch.xpack.ml.job.config.DataDescription;
+import org.elasticsearch.xpack.ml.job.config.Detector;
+import org.elasticsearch.xpack.ml.job.config.Job;
+import org.junit.Before;
+
+import java.util.Arrays;
+import java.util.Date;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
+
+public class ChunkedDataExtractorFactoryTests extends ESTestCase {
+
+    private Client client;
+    private DataExtractorFactory dataExtractorFactory;
+
+    @Before
+    public void setUpMocks() {
+        client = mock(Client.class);
+        dataExtractorFactory = mock(DataExtractorFactory.class);
+    }
+
+    public void testNewExtractor_GivenAlignedTimes() {
+        ChunkedDataExtractorFactory factory = createFactory(1000L);
+
+        ChunkedDataExtractor dataExtractor = (ChunkedDataExtractor) factory.newExtractor(2000, 5000);
+
+        assertThat(dataExtractor.getContext().start, equalTo(2000L));
+        assertThat(dataExtractor.getContext().end, equalTo(5000L));
+    }
+
+    public void testNewExtractor_GivenNonAlignedTimes() {
+        ChunkedDataExtractorFactory factory = createFactory(1000L);
+
+        ChunkedDataExtractor dataExtractor = (ChunkedDataExtractor) factory.newExtractor(3980, 9200);
+
+        assertThat(dataExtractor.getContext().start, equalTo(4000L));
+        assertThat(dataExtractor.getContext().end, equalTo(9000L));
+    }
+
+    public void testIntervalTimeAligner() {
+        ChunkedDataExtractorContext.TimeAligner timeAligner = ChunkedDataExtractorFactory.newIntervalTimeAligner(100L);
+        assertThat(timeAligner.alignToFloor(300L), equalTo(300L));
+        assertThat(timeAligner.alignToFloor(301L), equalTo(300L));
+        assertThat(timeAligner.alignToFloor(399L), equalTo(300L));
+        assertThat(timeAligner.alignToFloor(400L), equalTo(400L));
+        assertThat(timeAligner.alignToCeil(300L), equalTo(300L));
+        assertThat(timeAligner.alignToCeil(301L), equalTo(400L));
+        assertThat(timeAligner.alignToCeil(399L), equalTo(400L));
+        assertThat(timeAligner.alignToCeil(400L), equalTo(400L));
+    }
+
+    public void testIdentityTimeAligner() {
+        ChunkedDataExtractorContext.TimeAligner timeAligner = ChunkedDataExtractorFactory.newIdentityTimeAligner();
+        assertThat(timeAligner.alignToFloor(300L), equalTo(300L));
+        assertThat(timeAligner.alignToFloor(301L), equalTo(301L));
+        assertThat(timeAligner.alignToFloor(399L), equalTo(399L));
+        assertThat(timeAligner.alignToFloor(400L), equalTo(400L));
+        assertThat(timeAligner.alignToCeil(300L), equalTo(300L));
+        assertThat(timeAligner.alignToCeil(301L), equalTo(301L));
+        assertThat(timeAligner.alignToCeil(399L), equalTo(399L));
+        assertThat(timeAligner.alignToCeil(400L), equalTo(400L));
+    }
+
+    private ChunkedDataExtractorFactory createFactory(long histogramInterval) {
+        AggregatorFactories.Builder aggs = new AggregatorFactories.Builder().addAggregator(
+                AggregationBuilders.histogram("time").field("time").interval(histogramInterval).subAggregation(
+                        AggregationBuilders.max("time").field("time")));
+        DataDescription.Builder dataDescription = new DataDescription.Builder();
+        dataDescription.setTimeField("time");
+        Detector.Builder detectorBuilder = new Detector.Builder();
+        detectorBuilder.setFunction("sum");
+        detectorBuilder.setFieldName("value");
+        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detectorBuilder.build()));
+        analysisConfig.setSummaryCountFieldName("doc_count");
+        Job.Builder jobBuilder = new Job.Builder("foo");
+        jobBuilder.setDataDescription(dataDescription);
+        jobBuilder.setAnalysisConfig(analysisConfig);
+        DatafeedConfig.Builder datafeedConfigBuilder = new DatafeedConfig.Builder("foo-feed", jobBuilder.getId());
+        datafeedConfigBuilder.setAggregations(aggs);
+        datafeedConfigBuilder.setIndices(Arrays.asList("my_index"));
+        return new ChunkedDataExtractorFactory(client, datafeedConfigBuilder.build(), jobBuilder.build(new Date()), dataExtractorFactory);
+    }
+}
ChunkedDataExtractorTests.java
@@ -445,7 +445,8 @@ public class ChunkedDataExtractorTests extends ESTestCase {
     }

     private ChunkedDataExtractorContext createContext(long start, long end) {
-        return new ChunkedDataExtractorContext(jobId, timeField, indices, types, query, scrollSize, start, end, chunkSpan);
+        return new ChunkedDataExtractorContext(jobId, timeField, indices, types, query, scrollSize, start, end, chunkSpan,
+                ChunkedDataExtractorFactory.newIdentityTimeAligner());
     }

     private static class StubSubExtractor implements DataExtractor {
IntervalsTests.java (new file)
@@ -0,0 +1,41 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.utils;
+
+import org.elasticsearch.test.ESTestCase;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class IntervalsTests extends ESTestCase {
+
+    public void testAlignToFloor() {
+        assertThat(Intervals.alignToFloor(0, 10), equalTo(0L));
+        assertThat(Intervals.alignToFloor(10, 5), equalTo(10L));
+        assertThat(Intervals.alignToFloor(6, 5), equalTo(5L));
+        assertThat(Intervals.alignToFloor(36, 5), equalTo(35L));
+        assertThat(Intervals.alignToFloor(10, 10), equalTo(10L));
+        assertThat(Intervals.alignToFloor(11, 10), equalTo(10L));
+        assertThat(Intervals.alignToFloor(19, 10), equalTo(10L));
+        assertThat(Intervals.alignToFloor(20, 10), equalTo(20L));
+        assertThat(Intervals.alignToFloor(25, 10), equalTo(20L));
+        assertThat(Intervals.alignToFloor(-20, 10), equalTo(-20L));
+        assertThat(Intervals.alignToFloor(-21, 10), equalTo(-30L));
+    }
+
+    public void testAlignToCeil() {
+        assertThat(Intervals.alignToCeil(0, 10), equalTo(0L));
+        assertThat(Intervals.alignToCeil(10, 5), equalTo(10L));
+        assertThat(Intervals.alignToCeil(6, 5), equalTo(10L));
+        assertThat(Intervals.alignToCeil(36, 5), equalTo(40L));
+        assertThat(Intervals.alignToCeil(10, 10), equalTo(10L));
+        assertThat(Intervals.alignToCeil(11, 10), equalTo(20L));
+        assertThat(Intervals.alignToCeil(19, 10), equalTo(20L));
+        assertThat(Intervals.alignToCeil(20, 10), equalTo(20L));
+        assertThat(Intervals.alignToCeil(25, 10), equalTo(30L));
+        assertThat(Intervals.alignToCeil(-20, 10), equalTo(-20L));
+        assertThat(Intervals.alignToCeil(-21, 10), equalTo(-20L));
+    }
+}