From 4b531c4dbb660902a7006f9ee08b1451b6ef2e45 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 3 Aug 2017 11:58:58 +0100 Subject: [PATCH] [ML] Check histogram interval is a divisor of bucketspan (elastic/x-pack-elasticsearch#2153) Original commit: elastic/x-pack-elasticsearch@356dfa719c37aa8be463b62a0d81f2e3d571ef71 --- .../xpack/ml/datafeed/DatafeedJobValidator.java | 8 ++++++++ .../aggregation/AggregationDataExtractor.java | 11 ++++------- .../xpack/ml/job/messages/Messages.java | 2 ++ .../ml/datafeed/DatafeedJobValidatorTests.java | 17 +++++++++++++++++ .../ml/integration/DatafeedJobsRestIT.java | 13 ++++++++++++- 5 files changed, 43 insertions(+), 8 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidator.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidator.java index 85cec627bfc..0e4dc939a33 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidator.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidator.java @@ -48,5 +48,13 @@ public final class DatafeedJobValidator { TimeValue.timeValueMillis(histogramIntervalMillis).getStringRep(), TimeValue.timeValueMillis(bucketSpanMillis).getStringRep())); } + + if (bucketSpanMillis % histogramIntervalMillis != 0) { + throw ExceptionsHelper.badRequestException(Messages.getMessage( + Messages.DATAFEED_AGGREGATIONS_INTERVAL_MUST_BE_DIVISOR_OF_BUCKET_SPAN, + TimeValue.timeValueMillis(histogramIntervalMillis).getStringRep(), + TimeValue.timeValueMillis(bucketSpanMillis).getStringRep())); + } + } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java index da732fb6e5f..fb608ef7b83 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java @@ -115,13 +115,10 @@ class AggregationDataExtractor implements DataExtractor { } private SearchRequestBuilder buildSearchRequest() { - long histogramSearchStartTime = context.start; - if (context.aggs.getPipelineAggregatorFactories().isEmpty() == false) { - // For derivative aggregations the first bucket will always be null - // so query one extra histogram bucket back and hope there is data - // in that bucket - histogramSearchStartTime = Math.max(0, context.start - getHistogramInterval()); - } + // For derivative aggregations the first bucket will always be null + // so query one extra histogram bucket back and hope there is data + // in that bucket + long histogramSearchStartTime = Math.max(0, context.start - getHistogramInterval()); SearchRequestBuilder searchRequestBuilder = SearchAction.INSTANCE.newRequestBuilder(client) .setIndices(context.indices) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java index a105113ef77..fa549d0b602 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java @@ -30,6 +30,8 @@ public final class Messages { "The date_histogram (or histogram) aggregation cannot have sibling aggregations"; public static final String DATAFEED_AGGREGATIONS_INTERVAL_MUST_BE_GREATER_THAN_ZERO = "Aggregation interval must be greater than 0"; + public static final String DATAFEED_AGGREGATIONS_INTERVAL_MUST_BE_DIVISOR_OF_BUCKET_SPAN = + "Aggregation interval [{0}] must be a divisor of the bucket_span [{1}]"; public static final String DATAFEED_AGGREGATIONS_INTERVAL_MUST_LESS_OR_EQUAL_TO_BUCKET_SPAN = "Aggregation interval [{0}] must be less than or equal to the bucket_span [{1}]"; public static final String DATAFEED_DATA_HISTOGRAM_MUST_HAVE_NESTED_MAX_AGGREGATION = diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidatorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidatorTests.java index cf2e121298b..bdc21fa95b4 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidatorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidatorTests.java @@ -123,6 +123,23 @@ public class DatafeedJobValidatorTests extends ESTestCase { assertEquals("Aggregation interval [1800001ms] must be less than or equal to the bucket_span [1800000ms]", e.getMessage()); } + public void testVerify_HistogramIntervalIsDivisorOfBucketSpan() throws IOException { + Job.Builder builder = buildJobBuilder("foo"); + AnalysisConfig.Builder ac = createAnalysisConfig(); + ac.setSummaryCountFieldName("some_count"); + ac.setBucketSpan(TimeValue.timeValueMinutes(5)); + builder.setAnalysisConfig(ac); + Job job = builder.build(new Date()); + DatafeedConfig datafeedConfig = createValidDatafeedConfigWithAggs(37 * 1000).build(); + + ElasticsearchStatusException e = ESTestCase.expectThrows(ElasticsearchStatusException.class, + () -> DatafeedJobValidator.validate(datafeedConfig, job)); + assertEquals("Aggregation interval [37000ms] must be a divisor of the bucket_span [300000ms]", e.getMessage()); + + DatafeedConfig goodDatafeedConfig = createValidDatafeedConfigWithAggs(60 * 1000).build(); + DatafeedJobValidator.validate(goodDatafeedConfig, job); + } + private static Job.Builder buildJobBuilder(String id) { Job.Builder builder = new Job.Builder(id); AnalysisConfig.Builder ac = createAnalysisConfig(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java index 78cd0223e1a..52127ab895f 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java @@ -442,7 +442,10 @@ public class DatafeedJobsRestIT extends ESRestTestCase { + "\"aggs\": {\"timestamp\":{\"max\":{\"field\":\"timestamp\"}}," + "\"bytes-delta\":{\"derivative\":{\"buckets_path\":\"avg_bytes_out\"}}," + "\"avg_bytes_out\":{\"avg\":{\"field\":\"network_bytes_out\"}} }}}}}"; - new DatafeedBuilder(datafeedId, jobId, "network-data", "doc").setAggregations(aggregations).build(); + new DatafeedBuilder(datafeedId, jobId, "network-data", "doc") + .setAggregations(aggregations) + .setChunkingTimespan("300s") + .build(); openJob(client(), jobId); @@ -696,6 +699,7 @@ public class DatafeedJobsRestIT extends ESRestTestCase { String scriptedFields; String aggregations; String authHeader = BASIC_AUTH_VALUE_SUPER_USER; + String chunkingTimespan; DatafeedBuilder(String datafeedId, String jobId, String index, String type) { this.datafeedId = datafeedId; @@ -724,12 +728,19 @@ public class DatafeedJobsRestIT extends ESRestTestCase { return this; } + DatafeedBuilder setChunkingTimespan(String timespan) { + chunkingTimespan = timespan; + return this; + } + Response build() throws IOException { String datafeedConfig = "{" + "\"job_id\": \"" + jobId + "\",\"indexes\":[\"" + index + "\"],\"types\":[\"" + type + "\"]" + (source ? ",\"_source\":true" : "") + (scriptedFields == null ? "" : ",\"script_fields\":" + scriptedFields) + (aggregations == null ? "" : ",\"aggs\":" + aggregations) + + (chunkingTimespan == null ? "" : + ",\"chunking_config\":{\"mode\":\"MANUAL\",\"time_span\":\"" + chunkingTimespan + "\"}") + "}"; return client().performRequest("put", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId, Collections.emptyMap(), new StringEntity(datafeedConfig, ContentType.APPLICATION_JSON),