[ML] Lookback one extra bucket for histograms (elastic/x-pack-elasticsearch#2084)

Original commit: elastic/x-pack-elasticsearch@b9b4d3977f
David Kyle 2017-07-28 16:14:27 +01:00 committed by GitHub
parent 3a265de458
commit f9104b7127
8 changed files with 315 additions and 165 deletions

View File

@@ -29,6 +29,7 @@ import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.ml.MlParserType;
import org.elasticsearch.xpack.ml.datafeed.extractor.ExtractorUtils;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
@@ -224,97 +225,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
* Returns the histogram's interval as epoch millis.
*/
public long getHistogramIntervalMillis() {
AggregationBuilder histogramAggregation = getHistogramAggregation(aggregations.getAggregatorFactories());
return getHistogramIntervalMillis(histogramAggregation);
}
private static long getHistogramIntervalMillis(AggregationBuilder histogramAggregation) {
if (histogramAggregation instanceof HistogramAggregationBuilder) {
return (long) ((HistogramAggregationBuilder) histogramAggregation).interval();
} else if (histogramAggregation instanceof DateHistogramAggregationBuilder) {
return validateAndGetDateHistogramInterval((DateHistogramAggregationBuilder) histogramAggregation);
} else {
throw new IllegalStateException("Invalid histogram aggregation [" + histogramAggregation.getName() + "]");
}
}
static AggregationBuilder getHistogramAggregation(List<AggregationBuilder> aggregations) {
if (aggregations.isEmpty()) {
throw ExceptionsHelper.badRequestException(Messages.getMessage(Messages.DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM));
}
if (aggregations.size() != 1) {
throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM_NO_SIBLINGS);
}
AggregationBuilder agg = aggregations.get(0);
if (isHistogram(agg)) {
return agg;
} else {
return getHistogramAggregation(agg.getSubAggregations());
}
}
private static boolean isHistogram(AggregationBuilder aggregationBuilder) {
return aggregationBuilder instanceof HistogramAggregationBuilder
|| aggregationBuilder instanceof DateHistogramAggregationBuilder;
}
/**
* Returns the date histogram interval as epoch millis if valid, or throws
* an {@link ElasticsearchException} with the validation error
*/
private static long validateAndGetDateHistogramInterval(DateHistogramAggregationBuilder dateHistogram) {
if (dateHistogram.timeZone() != null && dateHistogram.timeZone().equals(DateTimeZone.UTC) == false) {
throw ExceptionsHelper.badRequestException("ML requires date_histogram.time_zone to be UTC");
}
if (dateHistogram.dateHistogramInterval() != null) {
return validateAndGetCalendarInterval(dateHistogram.dateHistogramInterval().toString());
} else {
return dateHistogram.interval();
}
}
private static long validateAndGetCalendarInterval(String calendarInterval) {
TimeValue interval;
DateTimeUnit dateTimeUnit = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(calendarInterval);
if (dateTimeUnit != null) {
switch (dateTimeUnit) {
case WEEK_OF_WEEKYEAR:
interval = new TimeValue(7, TimeUnit.DAYS);
break;
case DAY_OF_MONTH:
interval = new TimeValue(1, TimeUnit.DAYS);
break;
case HOUR_OF_DAY:
interval = new TimeValue(1, TimeUnit.HOURS);
break;
case MINUTES_OF_HOUR:
interval = new TimeValue(1, TimeUnit.MINUTES);
break;
case SECOND_OF_MINUTE:
interval = new TimeValue(1, TimeUnit.SECONDS);
break;
case MONTH_OF_YEAR:
case YEAR_OF_CENTURY:
case QUARTER:
throw ExceptionsHelper.badRequestException(invalidDateHistogramCalendarIntervalMessage(calendarInterval));
default:
throw ExceptionsHelper.badRequestException("Unexpected dateTimeUnit [" + dateTimeUnit + "]");
}
} else {
interval = TimeValue.parseTimeValue(calendarInterval, "date_histogram.interval");
}
if (interval.days() > 7) {
throw ExceptionsHelper.badRequestException(invalidDateHistogramCalendarIntervalMessage(calendarInterval));
}
return interval.millis();
}
private static String invalidDateHistogramCalendarIntervalMessage(String interval) {
throw ExceptionsHelper.badRequestException("When specifying a date_histogram calendar interval ["
+ interval + "], ML does not accept intervals longer than a week because of " +
"variable lengths of periods greater than a week");
return ExtractorUtils.getHistogramIntervalMillis(aggregations);
}
/**
@@ -570,7 +481,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM);
}
AggregationBuilder histogramAggregation = getHistogramAggregation(aggregatorFactories);
AggregationBuilder histogramAggregation = ExtractorUtils.getHistogramAggregation(aggregatorFactories);
checkNoMoreHistogramAggregations(histogramAggregation.getSubAggregations());
checkHistogramAggregationHasChildMaxTimeAgg(histogramAggregation);
checkHistogramIntervalIsPositive(histogramAggregation);
@@ -578,7 +489,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
private static void checkNoMoreHistogramAggregations(List<AggregationBuilder> aggregations) {
for (AggregationBuilder agg : aggregations) {
if (isHistogram(agg)) {
if (ExtractorUtils.isHistogram(agg)) {
throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_MAX_ONE_DATE_HISTOGRAM);
}
checkNoMoreHistogramAggregations(agg.getSubAggregations());
@@ -605,7 +516,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
}
private static void checkHistogramIntervalIsPositive(AggregationBuilder histogramAggregation) {
long interval = getHistogramIntervalMillis(histogramAggregation);
long interval = ExtractorUtils.getHistogramIntervalMillis(histogramAggregation);
if (interval <= 0) {
throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_INTERVAL_MUST_BE_GREATER_THAN_ZERO);
}
@@ -616,8 +527,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
if (aggregations == null) {
chunkingConfig = ChunkingConfig.newAuto();
} else {
AggregationBuilder histogramAggregation = getHistogramAggregation(aggregations.getAggregatorFactories());
long histogramIntervalMillis = getHistogramIntervalMillis(histogramAggregation);
long histogramIntervalMillis = ExtractorUtils.getHistogramIntervalMillis(aggregations);
chunkingConfig = ChunkingConfig.newManual(TimeValue.timeValueMillis(
DEFAULT_AGGREGATION_CHUNKING_BUCKETS * histogramIntervalMillis));
}
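Net effect of this hunk: DatafeedConfig no longer walks the aggregation tree itself; the histogram helpers move to ExtractorUtils (next file) and DatafeedConfig delegates to them. Reconstructed from the added lines above, the surviving accessor reduces to:

    public long getHistogramIntervalMillis() {
        return ExtractorUtils.getHistogramIntervalMillis(aggregations);
    }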

View File

@@ -6,17 +6,28 @@
package org.elasticsearch.xpack.ml.datafeed.extractor;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.rounding.DateTimeUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.joda.time.DateTimeZone;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* Collects common utility methods needed by various {@link DataExtractor} implementations
@@ -54,4 +65,120 @@ public final class ExtractorUtils {
throw new IOException("[" + jobId + "] Search request encountered [" + unavailableShards + "] unavailable shards");
}
}
/**
* Find the (date) histogram in {@code aggFactory} and extract its interval.
* Throws if there is no (date) histogram or if the histogram has sibling
* aggregations.
* @param aggFactory Aggregations factory
* @return The histogram interval
*/
public static long getHistogramIntervalMillis(AggregatorFactories.Builder aggFactory) {
AggregationBuilder histogram = getHistogramAggregation(aggFactory.getAggregatorFactories());
return getHistogramIntervalMillis(histogram);
}
/**
* Find and return the (date) histogram in {@code aggregations}
* @param aggregations List of aggregations
* @return A {@link HistogramAggregationBuilder} or a {@link DateHistogramAggregationBuilder}
*/
public static AggregationBuilder getHistogramAggregation(List<AggregationBuilder> aggregations) {
if (aggregations.isEmpty()) {
throw ExceptionsHelper.badRequestException(Messages.getMessage(Messages.DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM));
}
if (aggregations.size() != 1) {
throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM_NO_SIBLINGS);
}
AggregationBuilder agg = aggregations.get(0);
if (isHistogram(agg)) {
return agg;
} else {
return getHistogramAggregation(agg.getSubAggregations());
}
}
public static boolean isHistogram(AggregationBuilder aggregationBuilder) {
return aggregationBuilder instanceof HistogramAggregationBuilder
|| aggregationBuilder instanceof DateHistogramAggregationBuilder;
}
/**
* Get the interval from {@code histogramAggregation} or throw an {@code IllegalStateException}
* if {@code histogramAggregation} is not a {@link HistogramAggregationBuilder} or a
* {@link DateHistogramAggregationBuilder}
*
* @param histogramAggregation Must be a {@link HistogramAggregationBuilder} or a
* {@link DateHistogramAggregationBuilder}
* @return The histogram interval
*/
public static long getHistogramIntervalMillis(AggregationBuilder histogramAggregation) {
if (histogramAggregation instanceof HistogramAggregationBuilder) {
return (long) ((HistogramAggregationBuilder) histogramAggregation).interval();
} else if (histogramAggregation instanceof DateHistogramAggregationBuilder) {
return validateAndGetDateHistogramInterval((DateHistogramAggregationBuilder) histogramAggregation);
} else {
throw new IllegalStateException("Invalid histogram aggregation [" + histogramAggregation.getName() + "]");
}
}
/**
* Returns the date histogram interval as epoch millis if valid, or throws
* an {@link ElasticsearchException} with the validation error
*/
private static long validateAndGetDateHistogramInterval(DateHistogramAggregationBuilder dateHistogram) {
if (dateHistogram.timeZone() != null && dateHistogram.timeZone().equals(DateTimeZone.UTC) == false) {
throw ExceptionsHelper.badRequestException("ML requires date_histogram.time_zone to be UTC");
}
if (dateHistogram.dateHistogramInterval() != null) {
return validateAndGetCalendarInterval(dateHistogram.dateHistogramInterval().toString());
} else {
return dateHistogram.interval();
}
}
static long validateAndGetCalendarInterval(String calendarInterval) {
TimeValue interval;
DateTimeUnit dateTimeUnit = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(calendarInterval);
if (dateTimeUnit != null) {
switch (dateTimeUnit) {
case WEEK_OF_WEEKYEAR:
interval = new TimeValue(7, TimeUnit.DAYS);
break;
case DAY_OF_MONTH:
interval = new TimeValue(1, TimeUnit.DAYS);
break;
case HOUR_OF_DAY:
interval = new TimeValue(1, TimeUnit.HOURS);
break;
case MINUTES_OF_HOUR:
interval = new TimeValue(1, TimeUnit.MINUTES);
break;
case SECOND_OF_MINUTE:
interval = new TimeValue(1, TimeUnit.SECONDS);
break;
case MONTH_OF_YEAR:
case YEAR_OF_CENTURY:
case QUARTER:
throw ExceptionsHelper.badRequestException(invalidDateHistogramCalendarIntervalMessage(calendarInterval));
default:
throw ExceptionsHelper.badRequestException("Unexpected dateTimeUnit [" + dateTimeUnit + "]");
}
} else {
interval = TimeValue.parseTimeValue(calendarInterval, "date_histogram.interval");
}
if (interval.days() > 7) {
throw ExceptionsHelper.badRequestException(invalidDateHistogramCalendarIntervalMessage(calendarInterval));
}
return interval.millis();
}
private static String invalidDateHistogramCalendarIntervalMessage(String interval) {
throw ExceptionsHelper.badRequestException("When specifying a date_histogram calendar interval ["
+ interval + "], ML does not accept intervals longer than a week because of " +
"variable lengths of periods greater than a week");
}
}
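A minimal usage sketch of the relocated helpers (aggregation and field names are illustrative; AggregationBuilders and DateHistogramInterval are the standard builder classes from the org.elasticsearch.search.aggregations packages):

    AggregatorFactories.Builder aggs = new AggregatorFactories.Builder()
            .addAggregator(AggregationBuilders.dateHistogram("buckets")
                    .field("time")
                    .dateHistogramInterval(new DateHistogramInterval("5m"))
                    .subAggregation(AggregationBuilders.max("time").field("time")));
    long intervalMillis = ExtractorUtils.getHistogramIntervalMillis(aggs); // 300000

    // Plain numeric histograms work too; the double interval is cast to a long,
    // so ML implicitly treats the histogram field as epoch milliseconds:
    HistogramAggregationBuilder plain = AggregationBuilders.histogram("buckets")
            .field("timestamp").interval(60_000);
    long plainIntervalMillis = ExtractorUtils.getHistogramIntervalMillis(plain); // 60000

"5m" is not a key in DATE_FIELD_UNITS, so it falls through to TimeValue.parseTimeValue and yields 300,000 ms, matching testValidateAndGetCalendarInterval in the new test file below.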

View File

@@ -105,7 +105,8 @@ class AggregationDataExtractor implements DataExtractor {
}
private void initAggregationProcessor(Aggregations aggs) throws IOException {
aggregationToJsonProcessor = new AggregationToJsonProcessor(context.timeField, context.fields, context.includeDocCount);
aggregationToJsonProcessor = new AggregationToJsonProcessor(context.timeField, context.fields, context.includeDocCount,
context.start);
aggregationToJsonProcessor.process(aggs);
}
@@ -114,11 +115,19 @@ }
}
private SearchRequestBuilder buildSearchRequest() {
long histogramSearchStartTime = context.start;
if (context.aggs.getPipelineAggregatorFactories().isEmpty() == false) {
// For derivative aggregations the first bucket will always be null
// so query one extra histogram bucket back and hope there is data
// in that bucket
histogramSearchStartTime = Math.max(0, context.start - getHistogramInterval());
}
SearchRequestBuilder searchRequestBuilder = SearchAction.INSTANCE.newRequestBuilder(client)
.setIndices(context.indices)
.setTypes(context.types)
.setSize(0)
.setQuery(ExtractorUtils.wrapInTimeRangeQuery(context.query, context.timeField, context.start, context.end));
.setQuery(ExtractorUtils.wrapInTimeRangeQuery(context.query, context.timeField, histogramSearchStartTime, context.end));
context.aggs.getAggregatorFactories().forEach(searchRequestBuilder::addAggregation);
context.aggs.getPipelineAggregatorFactories().forEach(searchRequestBuilder::addAggregation);
@@ -147,4 +156,8 @@ class AggregationDataExtractor implements DataExtractor {
hasNext = aggregationToJsonProcessor.writeDocs(BATCH_KEY_VALUE_PAIRS, outputStream);
return new ByteArrayInputStream(outputStream.toByteArray());
}
private long getHistogramInterval() {
return ExtractorUtils.getHistogramIntervalMillis(context.aggs);
}
}
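Worked example of the lookback logic above, with hypothetical values: given a derivative (pipeline) aggregation and a 5-minute (300,000 ms) histogram interval, a lookback requested from epoch 1,500,000 ms actually searches from 1,200,000 ms, so the derivative's always-null first bucket falls before the requested range rather than inside it:

    long start = 1_500_000L;    // context.start
    long interval = 300_000L;   // ExtractorUtils.getHistogramIntervalMillis(context.aggs)
    long histogramSearchStartTime = Math.max(0, start - interval); // 1_200_000

The AggregationToJsonProcessor change in the next file then discards buckets with a timestamp before context.start, so the extra bucket never leaks into the extracted data.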

View File

@@ -18,6 +18,7 @@ import org.elasticsearch.search.aggregations.metrics.percentiles.Percentile;
import org.elasticsearch.search.aggregations.metrics.percentiles.Percentiles;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.joda.time.DateTime;
import java.io.IOException;
import java.io.OutputStream;
@@ -45,6 +46,7 @@ class AggregationToJsonProcessor {
private final LinkedHashMap<String, Object> keyValuePairs;
private long keyValueWrittenCount;
private SortedMap<Long, List<Map<String, Object>>> docsByBucketTimestamp;
private long startTime;
/**
* Constructs a processor that processes aggregations into JSON
@@ -52,8 +54,9 @@ class AggregationToJsonProcessor {
* @param timeField the time field
* @param fields the fields to convert into JSON
* @param includeDocCount whether to include the doc_count
* @param startTime buckets with a timestamp before this time are discarded
*/
AggregationToJsonProcessor(String timeField, Set<String> fields, boolean includeDocCount)
AggregationToJsonProcessor(String timeField, Set<String> fields, boolean includeDocCount, long startTime)
throws IOException {
this.timeField = Objects.requireNonNull(timeField);
this.fields = Objects.requireNonNull(fields);
@@ -61,6 +64,7 @@ class AggregationToJsonProcessor {
keyValuePairs = new LinkedHashMap<>();
docsByBucketTimestamp = new TreeMap<>();
keyValueWrittenCount = 0;
this.startTime = startTime;
}
public void process(Aggregations aggs) throws IOException {
@@ -145,14 +149,41 @@ class AggregationToJsonProcessor {
"[" + agg.getName() + "] is another instance of a Date histogram");
}
// Buckets are ordered by time; once we reach a bucket at or after the
// start time we no longer need to check the time.
boolean checkBucketTime = true;
for (Histogram.Bucket bucket : agg.getBuckets()) {
List<Aggregation> childAggs = bucket.getAggregations().asList();
if (checkBucketTime) {
if (toHistogramKeyToEpoch(bucket.getKey()) < startTime) {
// skip buckets outside the required time range
continue;
} else {
checkBucketTime = false;
}
}
List<Aggregation> childAggs = bucket.getAggregations().asList();
processAggs(bucket.getDocCount(), childAggs);
keyValuePairs.remove(timeField);
}
}
/*
* Date Histograms have a {@link DateTime} object as the key, while
* Histograms have either a Double or a Long.
*/
private long toHistogramKeyToEpoch(Object key) {
if (key instanceof DateTime) {
return ((DateTime)key).getMillis();
} else if (key instanceof Double) {
return ((Double)key).longValue();
} else if (key instanceof Long) {
return (Long)key;
} else {
throw new IllegalStateException("Histogram key [" + key + "] cannot be converted to a timestamp");
}
}
private void processTimeField(Aggregation agg) {
if (agg instanceof Max == false) {
throw new IllegalArgumentException(Messages.getMessage(Messages.DATAFEED_MISSING_MAX_AGGREGATION_FOR_TIME_FIELD, timeField));
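The pruning loop above can be read in isolation; this sketch uses the bucket keys from the pruning test added later in this commit (buckets at 1000-4000, startTime 2000):

    long startTime = 2000L;
    boolean checkBucketTime = true;
    for (long bucketKey : new long[] {1000L, 2000L, 3000L, 4000L}) {
        if (checkBucketTime) {
            if (bucketKey < startTime) {
                continue; // bucket is before the requested range: skip it
            }
            checkBucketTime = false; // buckets are time-ordered; no further checks needed
        }
        // process the bucket's sub-aggregations here
    }
    // the 1000 bucket is discarded; 2000, 3000 and 4000 are processed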

View File

@@ -353,75 +353,20 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
TermsAggregationBuilder nestedTerms = AggregationBuilders.terms("nested_terms");
DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("time");
AggregationBuilder histogramAggregationBuilder = DatafeedConfig.getHistogramAggregation(
new AggregatorFactories.Builder().addAggregator(dateHistogram).getAggregatorFactories());
assertEquals(dateHistogram, histogramAggregationBuilder);
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
dateHistogram.subAggregation(avg).subAggregation(nestedTerms).subAggregation(maxTime).field("time");
histogramAggregationBuilder = DatafeedConfig.getHistogramAggregation(
new AggregatorFactories.Builder().addAggregator(dateHistogram).getAggregatorFactories());
assertEquals(dateHistogram, histogramAggregationBuilder);
TermsAggregationBuilder toplevelTerms = AggregationBuilders.terms("top_level");
toplevelTerms.subAggregation(dateHistogram);
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("foo", "bar");
builder.setAggregations(new AggregatorFactories.Builder().addAggregator(toplevelTerms));
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> builder.validateAggregations());
ElasticsearchException e = expectThrows(ElasticsearchException.class, builder::validateAggregations);
assertEquals("Aggregations can only have 1 date_histogram or histogram aggregation", e.getMessage());
}
public void testGetHistogramAggregation_MissingHistogramAgg() {
TermsAggregationBuilder terms = AggregationBuilders.terms("top_level");
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> DatafeedConfig.getHistogramAggregation(
new AggregatorFactories.Builder().addAggregator(terms).getAggregatorFactories()));
assertEquals("A date_histogram (or histogram) aggregation is required", e.getMessage());
}
public void testGetHistogramAggregation_DateHistogramHasSibling() {
AvgAggregationBuilder avg = AggregationBuilders.avg("avg");
DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("time");
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> DatafeedConfig.getHistogramAggregation(
new AggregatorFactories.Builder().addAggregator(avg).addAggregator(dateHistogram).getAggregatorFactories()));
assertEquals("The date_histogram (or histogram) aggregation cannot have sibling aggregations", e.getMessage());
TermsAggregationBuilder terms = AggregationBuilders.terms("terms");
terms.subAggregation(dateHistogram);
terms.subAggregation(avg);
e = expectThrows(ElasticsearchException.class,
() -> DatafeedConfig.getHistogramAggregation(
new AggregatorFactories.Builder().addAggregator(terms).getAggregatorFactories()));
assertEquals("The date_histogram (or histogram) aggregation cannot have sibling aggregations", e.getMessage());
}
public void testGetHistogramAggregation() {
AvgAggregationBuilder avg = AggregationBuilders.avg("avg");
TermsAggregationBuilder nestedTerms = AggregationBuilders.terms("nested_terms");
DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("time");
AggregationBuilder histogramAggregationBuilder = DatafeedConfig.getHistogramAggregation(
new AggregatorFactories.Builder().addAggregator(dateHistogram).getAggregatorFactories());
assertEquals(dateHistogram, histogramAggregationBuilder);
dateHistogram.subAggregation(avg).subAggregation(nestedTerms);
histogramAggregationBuilder = DatafeedConfig.getHistogramAggregation(
new AggregatorFactories.Builder().addAggregator(dateHistogram).getAggregatorFactories());
assertEquals(dateHistogram, histogramAggregationBuilder);
TermsAggregationBuilder toplevelTerms = AggregationBuilders.terms("top_level");
toplevelTerms.subAggregation(dateHistogram);
histogramAggregationBuilder = DatafeedConfig.getHistogramAggregation(
new AggregatorFactories.Builder().addAggregator(toplevelTerms).getAggregatorFactories());
assertEquals(dateHistogram, histogramAggregationBuilder);
}
public static String randomValidDatafeedId() {
CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray());
return generator.ofCodePointsLength(random(), 10, 10);

View File

@@ -0,0 +1,99 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed.extractor;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
import org.elasticsearch.test.ESTestCase;
import org.joda.time.DateTimeZone;
import java.util.TimeZone;
import static org.hamcrest.Matchers.equalTo;
public class ExtractorUtilsTests extends ESTestCase {
public void testGetHistogramAggregation_DateHistogramHasSibling() {
AvgAggregationBuilder avg = AggregationBuilders.avg("avg");
DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("time");
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> ExtractorUtils.getHistogramAggregation(
new AggregatorFactories.Builder().addAggregator(avg).addAggregator(dateHistogram).getAggregatorFactories()));
assertEquals("The date_histogram (or histogram) aggregation cannot have sibling aggregations", e.getMessage());
TermsAggregationBuilder terms = AggregationBuilders.terms("terms");
terms.subAggregation(dateHistogram);
terms.subAggregation(avg);
e = expectThrows(ElasticsearchException.class,
() -> ExtractorUtils.getHistogramAggregation(
new AggregatorFactories.Builder().addAggregator(terms).getAggregatorFactories()));
assertEquals("The date_histogram (or histogram) aggregation cannot have sibling aggregations", e.getMessage());
}
public void testGetHistogramAggregation() {
AvgAggregationBuilder avg = AggregationBuilders.avg("avg");
TermsAggregationBuilder nestedTerms = AggregationBuilders.terms("nested_terms");
DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("time");
AggregationBuilder histogramAggregationBuilder = ExtractorUtils.getHistogramAggregation(
new AggregatorFactories.Builder().addAggregator(dateHistogram).getAggregatorFactories());
assertEquals(dateHistogram, histogramAggregationBuilder);
dateHistogram.subAggregation(avg).subAggregation(nestedTerms);
histogramAggregationBuilder = ExtractorUtils.getHistogramAggregation(
new AggregatorFactories.Builder().addAggregator(dateHistogram).getAggregatorFactories());
assertEquals(dateHistogram, histogramAggregationBuilder);
TermsAggregationBuilder toplevelTerms = AggregationBuilders.terms("top_level");
toplevelTerms.subAggregation(dateHistogram);
histogramAggregationBuilder = ExtractorUtils.getHistogramAggregation(
new AggregatorFactories.Builder().addAggregator(toplevelTerms).getAggregatorFactories());
assertEquals(dateHistogram, histogramAggregationBuilder);
}
public void testGetHistogramAggregation_MissingHistogramAgg() {
TermsAggregationBuilder terms = AggregationBuilders.terms("top_level");
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> ExtractorUtils.getHistogramAggregation(
new AggregatorFactories.Builder().addAggregator(terms).getAggregatorFactories()));
assertEquals("A date_histogram (or histogram) aggregation is required", e.getMessage());
}
public void testGetHistogramIntervalMillis_GivenDateHistogramWithInvalidTimeZone() {
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("bucket").field("time")
.interval(300000L).timeZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("EST"))).subAggregation(maxTime);
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> ExtractorUtils.getHistogramIntervalMillis(dateHistogram));
assertThat(e.getMessage(), equalTo("ML requires date_histogram.time_zone to be UTC"));
}
public void testIsHistogram() {
assertTrue(ExtractorUtils.isHistogram(AggregationBuilders.dateHistogram("time")));
assertTrue(ExtractorUtils.isHistogram(AggregationBuilders.histogram("time")));
assertFalse(ExtractorUtils.isHistogram(AggregationBuilders.max("time")));
}
public void testValidateAndGetCalendarInterval() {
assertEquals(300 * 1000L, ExtractorUtils.validateAndGetCalendarInterval("5m"));
assertEquals(7200 * 1000L, ExtractorUtils.validateAndGetCalendarInterval("2h"));
assertEquals(86400L * 1000L, ExtractorUtils.validateAndGetCalendarInterval("1d"));
}
public void testValidateAndGetCalendarInterval_intervalIsLongerThanAWeek() {
expectThrows(ElasticsearchException.class,
() -> ExtractorUtils.validateAndGetCalendarInterval("8d"));
}
}
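Two further assertions that should hold under the rules implemented above (they are not part of the original test file): a fixed "7d" interval parses to exactly one week and is accepted, while the calendar unit "1M" maps to MONTH_OF_YEAR and is rejected because months vary in length.

    assertEquals(604_800_000L, ExtractorUtils.validateAndGetCalendarInterval("7d"));
    expectThrows(ElasticsearchException.class,
            () -> ExtractorUtils.validateAndGetCalendarInterval("1M"));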

View File

@@ -7,7 +7,6 @@ package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;

View File

@@ -100,7 +100,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
createHistogramBucket(2000L, 5, Collections.singletonList(createMax("time", 2000)))
);
String json = aggToString("time", Collections.emptySet(), false, histogramBuckets);
String json = aggToString("time", Collections.emptySet(), false, histogramBuckets, 0L);
assertThat(json, equalTo("{\"time\":1000} {\"time\":2000}"));
assertThat(keyValuePairsWritten, equalTo(2L));
@@ -246,7 +246,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
createTerms("my_field", new Term("c", 4, c4NumericAggs), new Term("b", 3, b4NumericAggs))))
);
String json = aggToString("time", Sets.newHashSet("my_field", "my_value", "my_value2"), false, histogramBuckets);
String json = aggToString("time", Sets.newHashSet("my_field", "my_value", "my_value2"), false, histogramBuckets, 0L);
assertThat(json, equalTo("{\"time\":1000,\"my_field\":\"a\",\"my_value\":111.0,\"my_value2\":112.0} " +
"{\"time\":1000,\"my_field\":\"b\",\"my_value2\":122.0} " +
@@ -364,10 +364,11 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
assertThat(e.getMessage(), containsString("Multi-percentile aggregation [my_field] is not supported"));
}
@SuppressWarnings("unchecked")
public void testBucketAggContainsRequiredAgg() throws IOException {
Set<String> fields = new HashSet<>();
fields.add("foo");
AggregationToJsonProcessor processor = new AggregationToJsonProcessor("time", fields, false);
AggregationToJsonProcessor processor = new AggregationToJsonProcessor("time", fields, false, 0L);
Terms termsAgg = mock(Terms.class);
when(termsAgg.getBuckets()).thenReturn(Collections.emptyList());
@@ -394,27 +395,52 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
assertTrue(processor.bucketAggContainsRequiredAgg(termsAgg));
}
public void testBucketsBeforeStartArePruned() throws IOException {
List<Histogram.Bucket> histogramBuckets = Arrays.asList(
createHistogramBucket(1000L, 4, Arrays.asList(
createMax("time", 1000), createPercentiles("my_field", 1.0))),
createHistogramBucket(2000L, 7, Arrays.asList(
createMax("time", 2000), createPercentiles("my_field", 2.0))),
createHistogramBucket(3000L, 10, Arrays.asList(
createMax("time", 3000), createPercentiles("my_field", 3.0))),
createHistogramBucket(4000L, 14, Arrays.asList(
createMax("time", 4000), createPercentiles("my_field", 4.0)))
);
String json = aggToString("time", Sets.newHashSet("my_field"), true, histogramBuckets, 2000L);
assertThat(json, equalTo("{\"time\":2000,\"my_field\":2.0,\"doc_count\":7} " +
"{\"time\":3000,\"my_field\":3.0,\"doc_count\":10} " +
"{\"time\":4000,\"my_field\":4.0,\"doc_count\":14}"));
}
private String aggToString(String timeField, Set<String> fields, Histogram.Bucket bucket) throws IOException {
return aggToString(timeField, fields, true, Collections.singletonList(bucket));
return aggToString(timeField, fields, true, Collections.singletonList(bucket), 0L);
}
private String aggToString(String timeField, Set<String> fields, List<Histogram.Bucket> buckets) throws IOException {
return aggToString(timeField, fields, true, buckets);
return aggToString(timeField, fields, true, buckets, 0L);
}
private String aggToString(String timeField, Set<String> fields, boolean includeDocCount, List<Histogram.Bucket> buckets)
private String aggToString(String timeField, Set<String> fields, boolean includeDocCount, List<Histogram.Bucket> buckets,
long startTime)
throws IOException {
Histogram histogram = createHistogramAggregation("buckets", buckets);
return aggToString(timeField, fields, includeDocCount, createAggs(Collections.singletonList(histogram)));
return aggToString(timeField, fields, includeDocCount, createAggs(Collections.singletonList(histogram)), startTime);
}
private String aggToString(String timeField, Set<String> fields, boolean includeDocCount, Aggregations aggregations)
throws IOException {
return aggToString(timeField, fields, includeDocCount, aggregations, 0L);
}
private String aggToString(String timeField, Set<String> fields, boolean includeDocCount, Aggregations aggregations, long startTime)
throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
AggregationToJsonProcessor processor = new AggregationToJsonProcessor(timeField, fields, includeDocCount);
AggregationToJsonProcessor processor = new AggregationToJsonProcessor(timeField, fields, includeDocCount, startTime);
processor.process(aggregations);
processor.writeDocs(10000, outputStream);
keyValuePairsWritten = processor.getKeyValueCount();