[ML] Improve validations for datafeed with aggregations (elastic/x-pack-elasticsearch#917)

Adds the following validations:

- aggregations must contain a date_histogram or histogram at the top level
- a date_histogram must have its time_zone set to UTC (or left unset,
  which defaults to UTC)
- a date_histogram supports calendar intervals only up to 1 week, to
  avoid the length variability of longer intervals
- the aggregation interval must be greater than zero
- the aggregation interval must be less than or equal to the bucket_span

Original commit: elastic/x-pack-elasticsearch@404496a886
Dimitris Athanasiou 2017-04-03 17:55:26 +01:00 committed by GitHub
parent 84f08ee02c
commit 758b689f51
8 changed files with 285 additions and 117 deletions
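
Taken together, these rules mean an aggregated datafeed must lead with a positive-interval histogram no wider than the job's bucket_span. A minimal sketch of a configuration that passes the new checks — the IDs ("my-datafeed", "my-job", "my-index", "my-type") are hypothetical, and the builder calls mirror those used in the tests below:

    // Hypothetical names; imports as in DatafeedConfigTests below.
    DatafeedConfig.Builder builder = new DatafeedConfig.Builder("my-datafeed", "my-job");
    builder.setIndexes(Arrays.asList("my-index"));
    builder.setTypes(Arrays.asList("my-type"));
    // Top level date_histogram with a fixed 5 minute interval;
    // time_zone is left unset, which defaults to UTC.
    builder.setAggregations(new AggregatorFactories.Builder().addAggregator(
            AggregationBuilders.dateHistogram("time").interval(300000)));
    DatafeedConfig datafeed = builder.build(); // build() now runs validateAggregations()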

DatafeedConfig.java

@@ -11,6 +11,7 @@ import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.rounding.DateTimeUnit;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.ObjectParser;
 import org.elasticsearch.common.xcontent.ToXContent;
@@ -19,13 +20,17 @@ import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.QueryParseContext;
+import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.xpack.ml.job.config.Job;
 import org.elasticsearch.xpack.ml.job.messages.Messages;
 import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.ml.utils.MlStrings;
 import org.elasticsearch.xpack.ml.utils.time.TimeUtils;
+import org.joda.time.DateTimeZone;

 import java.io.IOException;
 import java.util.ArrayList;
@@ -202,6 +207,89 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
        return aggregations;
    }

+    /**
+     * Returns the top level histogram's interval as epoch millis.
+     * The method expects a valid top level aggregation to exist.
+     */
+    public long getHistogramIntervalMillis() {
+        AggregationBuilder topLevelAgg = getTopLevelAgg();
+        if (topLevelAgg == null) {
+            throw new IllegalStateException("No aggregations exist");
+        }
+        if (topLevelAgg instanceof HistogramAggregationBuilder) {
+            return (long) ((HistogramAggregationBuilder) topLevelAgg).interval();
+        } else if (topLevelAgg instanceof DateHistogramAggregationBuilder) {
+            return validateAndGetDateHistogramInterval((DateHistogramAggregationBuilder) topLevelAgg);
+        } else {
+            throw new IllegalStateException("Invalid top level aggregation [" + topLevelAgg.getName() + "]");
+        }
+    }
+
+    private AggregationBuilder getTopLevelAgg() {
+        if (aggregations == null || aggregations.getAggregatorFactories().isEmpty()) {
+            return null;
+        }
+        return aggregations.getAggregatorFactories().get(0);
+    }
+
+    /**
+     * Returns the date histogram interval as epoch millis if valid, or throws
+     * an {@link IllegalArgumentException} with the validation error
+     */
+    private static long validateAndGetDateHistogramInterval(DateHistogramAggregationBuilder dateHistogram) {
+        if (dateHistogram.timeZone() != null && dateHistogram.timeZone().equals(DateTimeZone.UTC) == false) {
+            throw new IllegalArgumentException("ML requires date_histogram.time_zone to be UTC");
+        }
+        if (dateHistogram.dateHistogramInterval() != null) {
+            return validateAndGetCalendarInterval(dateHistogram.dateHistogramInterval().toString());
+        } else {
+            return (long) dateHistogram.interval();
+        }
+    }
+
+    private static long validateAndGetCalendarInterval(String calendarInterval) {
+        TimeValue interval;
+        DateTimeUnit dateTimeUnit = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(calendarInterval);
+        if (dateTimeUnit != null) {
+            switch (dateTimeUnit) {
+                case WEEK_OF_WEEKYEAR:
+                    interval = new TimeValue(7, TimeUnit.DAYS);
+                    break;
+                case DAY_OF_MONTH:
+                    interval = new TimeValue(1, TimeUnit.DAYS);
+                    break;
+                case HOUR_OF_DAY:
+                    interval = new TimeValue(1, TimeUnit.HOURS);
+                    break;
+                case MINUTES_OF_HOUR:
+                    interval = new TimeValue(1, TimeUnit.MINUTES);
+                    break;
+                case SECOND_OF_MINUTE:
+                    interval = new TimeValue(1, TimeUnit.SECONDS);
+                    break;
+                case MONTH_OF_YEAR:
+                case YEAR_OF_CENTURY:
+                case QUARTER:
+                    throw new IllegalArgumentException(invalidDateHistogramCalendarIntervalMessage(calendarInterval));
+                default:
+                    throw new IllegalArgumentException("Unexpected dateTimeUnit [" + dateTimeUnit + "]");
+            }
+        } else {
+            interval = TimeValue.parseTimeValue(calendarInterval, "date_histogram.interval");
+        }
+        if (interval.days() > 7) {
+            throw new IllegalArgumentException(invalidDateHistogramCalendarIntervalMessage(calendarInterval));
+        }
+        return interval.millis();
+    }
+
+    private static String invalidDateHistogramCalendarIntervalMessage(String interval) {
+        throw new IllegalArgumentException("When specifying a date_histogram calendar interval ["
+                + interval + "], ML does not accept intervals longer than a week because of "
+                + "variable lengths of periods greater than a week");
+    }
+
    /**
     * @return {@code true} when there are non-empty aggregations, {@code false} otherwise
     */
@@ -442,17 +530,39 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
            if (types == null || types.isEmpty() || types.contains(null) || types.contains("")) {
                throw invalidOptionValue(TYPES.getPreferredName(), types);
            }
-           if (aggregations != null && (scriptFields != null && !scriptFields.isEmpty())) {
-               throw new IllegalArgumentException(Messages.getMessage(Messages.DATAFEED_CONFIG_CANNOT_USE_SCRIPT_FIELDS_WITH_AGGS));
-           }
+           validateAggregations();
            return new DatafeedConfig(id, jobId, queryDelay, frequency, indexes, types, query, aggregations, scriptFields, scrollSize,
                    source, chunkingConfig);
        }

+       private void validateAggregations() {
+           if (aggregations == null) {
+               return;
+           }
+           if (scriptFields != null && !scriptFields.isEmpty()) {
+               throw new IllegalArgumentException(Messages.getMessage(Messages.DATAFEED_CONFIG_CANNOT_USE_SCRIPT_FIELDS_WITH_AGGS));
+           }
+           List<AggregationBuilder> aggregatorFactories = aggregations.getAggregatorFactories();
+           if (aggregatorFactories.isEmpty()) {
+               throw new IllegalArgumentException(Messages.DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM);
+           }
+           AggregationBuilder topLevelAgg = aggregatorFactories.get(0);
+           if (topLevelAgg instanceof HistogramAggregationBuilder) {
+               if (((HistogramAggregationBuilder) topLevelAgg).interval() <= 0) {
+                   throw new IllegalArgumentException(Messages.DATAFEED_AGGREGATIONS_INTERVAL_MUST_BE_GREATER_THAN_ZERO);
+               }
+           } else if (topLevelAgg instanceof DateHistogramAggregationBuilder) {
+               if (validateAndGetDateHistogramInterval((DateHistogramAggregationBuilder) topLevelAgg) <= 0) {
+                   throw new IllegalArgumentException(Messages.DATAFEED_AGGREGATIONS_INTERVAL_MUST_BE_GREATER_THAN_ZERO);
+               }
+           } else {
+               throw new IllegalArgumentException(Messages.DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM);
+           }
+       }
+
        private static ElasticsearchException invalidOptionValue(String fieldName, Object value) {
            String msg = Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, fieldName, value);
            throw new IllegalArgumentException(msg);
        }
    }
}
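
For illustration, here is how the interval resolution above behaves — a sketch inferred from the switch statement and the tests further down, not part of the diff itself:

    // Calendar intervals map to fixed TimeValues and are capped at one week:
    //   "1s" -> 1000, "1m" -> 60000, "1h" -> 3600000, "1d" -> 86400000, "1w"/"7d" -> 604800000
    // Month-or-longer calendar units ("1M", "1q", "1y") and anything parsing to more than
    // seven days (e.g. "8d") throw IllegalArgumentException, because their lengths vary.
    // Fixed intervals set via interval(long) are not capped at a week.
    long millis = datafeedConfig.getHistogramIntervalMillis();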

DatafeedJobValidator.java

@@ -6,6 +6,7 @@
 package org.elasticsearch.xpack.ml.datafeed;

 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
 import org.elasticsearch.xpack.ml.job.config.Job;
 import org.elasticsearch.xpack.ml.job.messages.Messages;
@@ -24,9 +25,27 @@ public final class DatafeedJobValidator {
        if (analysisConfig.getLatency() != null && analysisConfig.getLatency().seconds() > 0) {
            throw new IllegalArgumentException(Messages.getMessage(Messages.DATAFEED_DOES_NOT_SUPPORT_JOB_WITH_LATENCY));
        }
-       if (datafeedConfig.hasAggregations() && Strings.isNullOrEmpty(analysisConfig.getSummaryCountFieldName())) {
-           throw new IllegalArgumentException(
-                   Messages.getMessage(Messages.DATAFEED_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD, DatafeedConfig.DOC_COUNT));
+       if (datafeedConfig.hasAggregations()) {
+           checkSummaryCountFieldNameIsSet(analysisConfig);
+           checkValidHistogramInterval(datafeedConfig, analysisConfig);
        }
    }

+   private static void checkSummaryCountFieldNameIsSet(AnalysisConfig analysisConfig) {
+       if (Strings.isNullOrEmpty(analysisConfig.getSummaryCountFieldName())) {
+           throw new IllegalArgumentException(Messages.getMessage(
+                   Messages.DATAFEED_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD));
+       }
+   }
+
+   private static void checkValidHistogramInterval(DatafeedConfig datafeedConfig, AnalysisConfig analysisConfig) {
+       long histogramIntervalMillis = datafeedConfig.getHistogramIntervalMillis();
+       long bucketSpanMillis = analysisConfig.getBucketSpan().millis();
+       if (histogramIntervalMillis > bucketSpanMillis) {
+           throw new IllegalArgumentException(Messages.getMessage(
+                   Messages.DATAFEED_AGGREGATIONS_INTERVAL_MUST_LESS_OR_EQUAL_TO_BUCKET_SPAN,
+                   TimeValue.timeValueMillis(histogramIntervalMillis).getStringRep(),
+                   TimeValue.timeValueMillis(bucketSpanMillis).getStringRep()));
+       }
+   }
}
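
In effect, once a datafeed defines aggregations, the job must set summary_count_field_name and the histogram interval must fit within the bucket_span. A usage sketch with hypothetical values, mirroring DatafeedJobValidatorTests below:

    // bucket_span = 1800000ms, histogram interval = 900000ms -> passes
    DatafeedJobValidator.validate(datafeedConfig, job);
    // With a histogram interval of 1800001ms, validate() throws:
    // "Aggregation interval [1800001ms] must be less than or equal to the bucket_span [1800000ms]"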

Messages.java

@@ -22,6 +22,12 @@ public final class Messages {
    public static final String DATAFEED_CONFIG_INVALID_OPTION_VALUE = "Invalid {0} value ''{1}'' in datafeed configuration";
    public static final String DATAFEED_DOES_NOT_SUPPORT_JOB_WITH_LATENCY = "A job configured with datafeed cannot support latency";
    public static final String DATAFEED_NOT_FOUND = "No datafeed with id [{0}] exists";
+   public static final String DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM =
+           "A top level date_histogram (or histogram) aggregation is required";
+   public static final String DATAFEED_AGGREGATIONS_INTERVAL_MUST_BE_GREATER_THAN_ZERO =
+           "Aggregation interval must be greater than 0";
+   public static final String DATAFEED_AGGREGATIONS_INTERVAL_MUST_LESS_OR_EQUAL_TO_BUCKET_SPAN =
+           "Aggregation interval [{0}] must be less than or equal to the bucket_span [{1}]";

    public static final String INCONSISTENT_ID =
            "Inconsistent {0}; ''{1}'' specified in the body differs from ''{2}'' specified as a URL argument";
@@ -118,11 +124,6 @@ public final class Messages {
    public static final String JOB_CONFIG_DETECTOR_COUNT_DISALLOWED =
            "''count'' is not a permitted value for {0}";

-   public static final String JOB_DATA_CONCURRENT_USE_CLOSE = "Cannot close job {0} while the job is processing another request";
-   public static final String JOB_DATA_CONCURRENT_USE_FLUSH = "Cannot flush job {0} while the job is processing another request";
-   public static final String JOB_DATA_CONCURRENT_USE_UPDATE = "Cannot update job {0} while the job is processing another request";
-   public static final String JOB_DATA_CONCURRENT_USE_UPLOAD = "Cannot write to job {0} while the job is processing another request";
-
    public static final String JOB_UNKNOWN_ID = "No known job with id ''{0}''";

    public static final String REST_CANNOT_DELETE_HIGHEST_PRIORITY =

MlMetadataTests.java

@@ -52,7 +52,8 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
        if (randomBoolean()) {
            AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(job.getAnalysisConfig());
            analysisConfig.setLatency(null);
-           DatafeedConfig datafeedConfig = DatafeedConfigTests.createRandomizedDatafeedConfig(job.getId());
+           DatafeedConfig datafeedConfig = DatafeedConfigTests.createRandomizedDatafeedConfig(
+                   job.getId(), job.getAnalysisConfig().getBucketSpan().millis());
            if (datafeedConfig.hasAggregations()) {
                analysisConfig.setSummaryCountFieldName("doc_count");
            }

DatafeedConfigTests.java

@@ -13,17 +13,22 @@ import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.script.Script;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.ml.job.messages.Messages;
 import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase;
+import org.joda.time.DateTimeZone;

 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.TimeZone;

+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -35,6 +40,10 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
    }

    public static DatafeedConfig createRandomizedDatafeedConfig(String jobId) {
+       return createRandomizedDatafeedConfig(jobId, 3600000);
+   }
+
+   public static DatafeedConfig createRandomizedDatafeedConfig(String jobId, long bucketSpanMillis) {
        DatafeedConfig.Builder builder = new DatafeedConfig.Builder(randomValidDatafeedId(), jobId);
        builder.setIndexes(randomStringList(1, 10));
        builder.setTypes(randomStringList(1, 10));
@@ -56,7 +65,10 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
            // the actual xcontent isn't the same and test fail.
            // Testing with a single agg is ok as we don't have special list writeable / xconent logic
            AggregatorFactories.Builder aggs = new AggregatorFactories.Builder();
-           aggs.addAggregator(AggregationBuilders.avg(randomAsciiOfLength(10)).field(randomAsciiOfLength(10)));
+           long interval = randomNonNegativeLong();
+           interval = interval > bucketSpanMillis ? bucketSpanMillis : interval;
+           interval = interval <= 0 ? 1 : interval;
+           aggs.addAggregator(AggregationBuilders.dateHistogram("time").interval(interval));
            builder.setAggregations(aggs);
        }
        if (randomBoolean()) {
@@ -117,89 +129,6 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
        assertEquals(expectedDatafeedConfig.build(), defaultedDatafeedConfig.build());
    }

-   public void testEquals_GivenDifferentQueryDelay() {
-       DatafeedConfig.Builder b1 = createFullyPopulated();
-       DatafeedConfig.Builder b2 = createFullyPopulated();
-       b2.setQueryDelay(TimeValue.timeValueMinutes(2));
-       DatafeedConfig sc1 = b1.build();
-       DatafeedConfig sc2 = b2.build();
-       assertFalse(sc1.equals(sc2));
-       assertFalse(sc2.equals(sc1));
-   }
-
-   public void testEquals_GivenDifferentScrollSize() {
-       DatafeedConfig.Builder b1 = createFullyPopulated();
-       DatafeedConfig.Builder b2 = createFullyPopulated();
-       b2.setScrollSize(1);
-       DatafeedConfig sc1 = b1.build();
-       DatafeedConfig sc2 = b2.build();
-       assertFalse(sc1.equals(sc2));
-       assertFalse(sc2.equals(sc1));
-   }
-
-   public void testEquals_GivenDifferentFrequency() {
-       DatafeedConfig.Builder b1 = createFullyPopulated();
-       DatafeedConfig.Builder b2 = createFullyPopulated();
-       b2.setFrequency(TimeValue.timeValueSeconds(90));
-       DatafeedConfig sc1 = b1.build();
-       DatafeedConfig sc2 = b2.build();
-       assertFalse(sc1.equals(sc2));
-       assertFalse(sc2.equals(sc1));
-   }
-
-   public void testEquals_GivenDifferentIndexes() {
-       DatafeedConfig.Builder sc1 = createFullyPopulated();
-       DatafeedConfig.Builder sc2 = createFullyPopulated();
-       sc2.setIndexes(Arrays.asList("blah", "di", "blah"));
-       assertFalse(sc1.build().equals(sc2.build()));
-       assertFalse(sc2.build().equals(sc1.build()));
-   }
-
-   public void testEquals_GivenDifferentTypes() {
-       DatafeedConfig.Builder sc1 = createFullyPopulated();
-       DatafeedConfig.Builder sc2 = createFullyPopulated();
-       sc2.setTypes(Arrays.asList("blah", "di", "blah"));
-       assertFalse(sc1.build().equals(sc2.build()));
-       assertFalse(sc2.build().equals(sc1.build()));
-   }
-
-   public void testEquals_GivenDifferentQuery() {
-       DatafeedConfig.Builder b1 = createFullyPopulated();
-       DatafeedConfig.Builder b2 = createFullyPopulated();
-       b2.setQuery(QueryBuilders.termQuery("foo", "bar"));
-       DatafeedConfig sc1 = b1.build();
-       DatafeedConfig sc2 = b2.build();
-       assertFalse(sc1.equals(sc2));
-       assertFalse(sc2.equals(sc1));
-   }
-
-   public void testEquals_GivenDifferentAggregations() {
-       DatafeedConfig.Builder sc1 = createFullyPopulated();
-       DatafeedConfig.Builder sc2 = createFullyPopulated();
-       sc2.setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.count("foo")));
-       assertFalse(sc1.build().equals(sc2.build()));
-       assertFalse(sc2.build().equals(sc1.build()));
-   }
-
-   private static DatafeedConfig.Builder createFullyPopulated() {
-       DatafeedConfig.Builder sc = new DatafeedConfig.Builder("datafeed1", "job1");
-       sc.setIndexes(Arrays.asList("myIndex"));
-       sc.setTypes(Arrays.asList("myType1", "myType2"));
-       sc.setFrequency(TimeValue.timeValueSeconds(60));
-       sc.setScrollSize(5000);
-       sc.setQuery(QueryBuilders.matchAllQuery());
-       sc.setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("foo")));
-       sc.setQueryDelay(TimeValue.timeValueMillis(900));
-       return sc;
-   }
-
    public void testCheckValid_GivenNullIndexes() throws IOException {
        DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1");
        expectThrows(IllegalArgumentException.class, () -> conf.setIndexes(null));
@@ -280,28 +209,115 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
        assertThat(datafeedConfig.hasAggregations(), is(false));
    }

-   public void testHasAggregations_GivenEmpty() {
-       DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1");
-       builder.setIndexes(Arrays.asList("myIndex"));
-       builder.setTypes(Arrays.asList("myType"));
-       builder.setAggregations(new AggregatorFactories.Builder());
-       DatafeedConfig datafeedConfig = builder.build();
-       assertThat(datafeedConfig.hasAggregations(), is(false));
-   }
-
    public void testHasAggregations_NonEmpty() {
        DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1");
        builder.setIndexes(Arrays.asList("myIndex"));
        builder.setTypes(Arrays.asList("myType"));
-       builder.setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("foo")));
+       builder.setAggregations(new AggregatorFactories.Builder().addAggregator(
+               AggregationBuilders.dateHistogram("time").interval(300000)));
        DatafeedConfig datafeedConfig = builder.build();
        assertThat(datafeedConfig.hasAggregations(), is(true));
    }

+   public void testBuild_GivenEmptyAggregations() {
+       DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1");
+       builder.setIndexes(Arrays.asList("myIndex"));
+       builder.setTypes(Arrays.asList("myType"));
+       builder.setAggregations(new AggregatorFactories.Builder());
+       IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> builder.build());
+       assertThat(e.getMessage(), equalTo("A top level date_histogram (or histogram) aggregation is required"));
+   }
+
+   public void testBuild_GivenTopLevelAggIsTerms() {
+       DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1");
+       builder.setIndexes(Arrays.asList("myIndex"));
+       builder.setTypes(Arrays.asList("myType"));
+       builder.setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.terms("foo")));
+       IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> builder.build());
+       assertThat(e.getMessage(), equalTo("A top level date_histogram (or histogram) aggregation is required"));
+   }
+
+   public void testBuild_GivenHistogramWithDefaultInterval() {
+       DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1");
+       builder.setIndexes(Arrays.asList("myIndex"));
+       builder.setTypes(Arrays.asList("myType"));
+       builder.setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.histogram("time")));
+       IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> builder.build());
+       assertThat(e.getMessage(), equalTo("Aggregation interval must be greater than 0"));
+   }
+
+   public void testBuild_GivenDateHistogramWithInvalidTimeZone() {
+       DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("time")
+               .interval(300000L).timeZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("EST")));
+       IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
+               () -> createDatafeedWithDateHistogram(dateHistogram));
+       assertThat(e.getMessage(), equalTo("ML requires date_histogram.time_zone to be UTC"));
+   }
+
+   public void testBuild_GivenDateHistogramWithDefaultInterval() {
+       IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
+               () -> createDatafeedWithDateHistogram((String) null));
+       assertThat(e.getMessage(), equalTo("Aggregation interval must be greater than 0"));
+   }
+
+   public void testBuild_GivenValidDateHistogram() {
+       long millisInDay = 24 * 3600000L;
+       assertThat(createDatafeedWithDateHistogram("1s").getHistogramIntervalMillis(), equalTo(1000L));
+       assertThat(createDatafeedWithDateHistogram("2s").getHistogramIntervalMillis(), equalTo(2000L));
+       assertThat(createDatafeedWithDateHistogram("1m").getHistogramIntervalMillis(), equalTo(60000L));
+       assertThat(createDatafeedWithDateHistogram("2m").getHistogramIntervalMillis(), equalTo(120000L));
+       assertThat(createDatafeedWithDateHistogram("1h").getHistogramIntervalMillis(), equalTo(3600000L));
+       assertThat(createDatafeedWithDateHistogram("2h").getHistogramIntervalMillis(), equalTo(7200000L));
+       assertThat(createDatafeedWithDateHistogram("1d").getHistogramIntervalMillis(), equalTo(millisInDay));
+       assertThat(createDatafeedWithDateHistogram("7d").getHistogramIntervalMillis(), equalTo(7 * millisInDay));
+       assertThat(createDatafeedWithDateHistogram(7 * millisInDay + 1).getHistogramIntervalMillis(),
+               equalTo(7 * millisInDay + 1));
+   }
+
+   public void testBuild_GivenDateHistogramWithMoreThanCalendarWeek() {
+       IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
+               () -> createDatafeedWithDateHistogram("8d"));
+       assertThat(e.getMessage(), containsString("When specifying a date_histogram calendar interval [8d]"));
+   }
+
    public static String randomValidDatafeedId() {
        CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray());
        return generator.ofCodePointsLength(random(), 10, 10);
    }
+
+   private static DatafeedConfig createDatafeedWithDateHistogram(String interval) {
+       DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("time");
+       if (interval != null) {
+           dateHistogram.dateHistogramInterval(new DateHistogramInterval(interval));
+       }
+       return createDatafeedWithDateHistogram(dateHistogram);
+   }
+
+   private static DatafeedConfig createDatafeedWithDateHistogram(Long interval) {
+       DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("time");
+       if (interval != null) {
+           dateHistogram.interval(interval);
+       }
+       return createDatafeedWithDateHistogram(dateHistogram);
+   }
+
+   private static DatafeedConfig createDatafeedWithDateHistogram(DateHistogramAggregationBuilder dateHistogram) {
+       DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1");
+       builder.setIndexes(Arrays.asList("myIndex"));
+       builder.setTypes(Arrays.asList("myType"));
+       builder.setAggregations(new AggregatorFactories.Builder().addAggregator(dateHistogram));
+       return builder.build();
+   }
}

DatafeedJobValidatorTests.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.datafeed;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
 import org.elasticsearch.xpack.ml.job.config.Detector;
@@ -70,7 +71,7 @@ public class DatafeedJobValidatorTests extends ESTestCase {
        ac.setBucketSpan(TimeValue.timeValueSeconds(1800));
        builder.setAnalysisConfig(ac);
        Job job = builder.build();
-       DatafeedConfig datafeedConfig = createValidDatafeedConfigWithAggs().build();
+       DatafeedConfig datafeedConfig = createValidDatafeedConfigWithAggs(1800.0).build();

        IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class,
                () -> DatafeedJobValidator.validate(datafeedConfig, job));
@@ -87,7 +88,7 @@ public class DatafeedJobValidatorTests extends ESTestCase {
        ac.setBucketSpan(TimeValue.timeValueSeconds(1800));
        builder.setAnalysisConfig(ac);
        Job job = builder.build();
-       DatafeedConfig datafeedConfig = createValidDatafeedConfigWithAggs().build();
+       DatafeedConfig datafeedConfig = createValidDatafeedConfigWithAggs(1800.0).build();

        IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class,
                () -> DatafeedJobValidator.validate(datafeedConfig, job));
@@ -102,11 +103,26 @@ public class DatafeedJobValidatorTests extends ESTestCase {
        ac.setBucketSpan(TimeValue.timeValueSeconds(1800));
        builder.setAnalysisConfig(ac);
        Job job = builder.build();
-       DatafeedConfig datafeedConfig = createValidDatafeedConfigWithAggs().build();
+       DatafeedConfig datafeedConfig = createValidDatafeedConfigWithAggs(900.0).build();

        DatafeedJobValidator.validate(datafeedConfig, job);
    }

+   public void testVerify_GivenHistogramIntervalGreaterThanBucketSpan() throws IOException {
+       Job.Builder builder = buildJobBuilder("foo");
+       AnalysisConfig.Builder ac = createAnalysisConfig();
+       ac.setSummaryCountFieldName("some_count");
+       ac.setBucketSpan(TimeValue.timeValueSeconds(1800));
+       builder.setAnalysisConfig(ac);
+       Job job = builder.build();
+       DatafeedConfig datafeedConfig = createValidDatafeedConfigWithAggs(1800001.0).build();
+
+       IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class,
+               () -> DatafeedJobValidator.validate(datafeedConfig, job));
+
+       assertEquals("Aggregation interval [1800001ms] must be less than or equal to the bucket_span [1800000ms]", e.getMessage());
+   }
+
    public static Job.Builder buildJobBuilder(String id) {
        Job.Builder builder = new Job.Builder(id);
        builder.setCreateTime(new Date());
@@ -123,9 +139,10 @@ public class DatafeedJobValidatorTests extends ESTestCase {
        return ac;
    }

-   private static DatafeedConfig.Builder createValidDatafeedConfigWithAggs() throws IOException {
+   private static DatafeedConfig.Builder createValidDatafeedConfigWithAggs(double interval) throws IOException {
+       HistogramAggregationBuilder histogram = AggregationBuilders.histogram("time").interval(interval);
        DatafeedConfig.Builder datafeedConfig = createValidDatafeedConfig();
-       datafeedConfig.setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("foo")));
+       datafeedConfig.setAggregations(new AggregatorFactories.Builder().addAggregator(histogram));
        return datafeedConfig;
    }

DatafeedUpdateTests.java

@@ -154,13 +154,15 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
        DatafeedConfig datafeed = datafeedBuilder.build();
        DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeed.getId());
-       update.setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("a")));
+       update.setAggregations(new AggregatorFactories.Builder().addAggregator(
+               AggregationBuilders.histogram("a").interval(300000)));

        DatafeedConfig updatedDatafeed = update.build().apply(datafeed);

        assertThat(updatedDatafeed.getIndexes(), equalTo(Arrays.asList("i_1")));
        assertThat(updatedDatafeed.getTypes(), equalTo(Arrays.asList("t_1")));
        assertThat(updatedDatafeed.getAggregations(),
-               equalTo(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("a"))));
+               equalTo(new AggregatorFactories.Builder().addAggregator(
+                       AggregationBuilders.histogram("a").interval(300000))));
    }
}

DataExtractorFactoryTests.java

@@ -75,7 +75,8 @@ public class DataExtractorFactoryTests extends ESTestCase {
        Job.Builder jobBuilder = DatafeedJobRunnerTests.createDatafeedJob();
        jobBuilder.setDataDescription(dataDescription);
        DatafeedConfig.Builder datafeedConfig = DatafeedJobRunnerTests.createDatafeedConfig("datafeed1", "foo");
-       datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(AggregationBuilders.avg("a")));
+       datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(
+               AggregationBuilders.histogram("time").interval(300000)));

        DataExtractorFactory dataExtractorFactory = DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build());
@@ -88,7 +89,8 @@ public class DataExtractorFactoryTests extends ESTestCase {
        Job.Builder jobBuilder = DatafeedJobRunnerTests.createDatafeedJob();
        jobBuilder.setDataDescription(dataDescription);
        DatafeedConfig.Builder datafeedConfig = DatafeedJobRunnerTests.createDatafeedConfig("datafeed1", "foo");
-       datafeedConfig.setAggregations(AggregatorFactories.builder());
+       datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(
+               AggregationBuilders.histogram("time").interval(300000)));
        datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto());

        DataExtractorFactory dataExtractorFactory = DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build());