From 30b745f8469b526695972a876fcc1f4d95e31751 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Mon, 4 Dec 2017 15:22:56 +0000 Subject: [PATCH] [ML] Frequency in datafeeds with aggs must be multiple of hist interval (elastic/x-pack-elasticsearch#3205) relates elastic/x-pack-elasticsearch#3204 Original commit: elastic/x-pack-elasticsearch@0bbd9addd4ca988296d3c230843cb94e43aa4266 --- .../xpack/ml/datafeed/DatafeedConfig.java | 52 +++++++++ .../xpack/ml/datafeed/DatafeedJob.java | 6 +- .../xpack/ml/datafeed/DatafeedJobBuilder.java | 16 +-- .../ml/datafeed/DatafeedJobValidator.java | 13 +++ .../xpack/ml/datafeed/DefaultFrequency.java | 55 ---------- .../xpack/ml/job/messages/Messages.java | 4 +- .../ml/datafeed/DatafeedConfigTests.java | 102 ++++++++++++++++-- .../datafeed/DatafeedJobValidatorTests.java | 33 ++++++ .../ml/datafeed/DefaultFrequencyTests.java | 43 -------- .../integration/BasicDistributedJobsIT.java | 2 +- 10 files changed, 210 insertions(+), 116 deletions(-) delete mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DefaultFrequency.java delete mode 100644 plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DefaultFrequencyTests.java diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java index d3207499512..9f85fd24568 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java @@ -54,6 +54,11 @@ import java.util.concurrent.TimeUnit; */ public class DatafeedConfig extends AbstractDiffable implements ToXContentObject { + private static final int SECONDS_IN_MINUTE = 60; + private static final int TWO_MINS_SECONDS = 2 * SECONDS_IN_MINUTE; + private static final int TWENTY_MINS_SECONDS = 20 * SECONDS_IN_MINUTE; + private static final int HALF_DAY_SECONDS = 12 * 60 * SECONDS_IN_MINUTE; + // Used for QueryPage public static final ParseField RESULTS_FIELD = new ParseField("datafeeds"); @@ -350,6 +355,53 @@ public class DatafeedConfig extends AbstractDiffable implements return Strings.toString(this); } + /** + * Calculates a sensible default frequency for a given bucket span. + *

+ * The default depends on the bucket span: + *

    + *
  • <= 2 mins -> 1 min
  • + *
  • <= 20 mins -> bucket span / 2
  • + *
  • <= 12 hours -> 10 mins
  • + *
  • > 12 hours -> 1 hour
  • + *
+ * + * If the datafeed has aggregations, the default frequency is the + * closest multiple of the histogram interval based on the rules above. + * + * @param bucketSpan the bucket span + * @return the default frequency + */ + public TimeValue defaultFrequency(TimeValue bucketSpan) { + TimeValue defaultFrequency = defaultFrequencyTarget(bucketSpan); + if (hasAggregations()) { + long histogramIntervalMillis = getHistogramIntervalMillis(); + long targetFrequencyMillis = defaultFrequency.millis(); + long defaultFrequencyMillis = histogramIntervalMillis > targetFrequencyMillis ? histogramIntervalMillis + : (targetFrequencyMillis / histogramIntervalMillis) * histogramIntervalMillis; + defaultFrequency = TimeValue.timeValueMillis(defaultFrequencyMillis); + } + return defaultFrequency; + } + + private TimeValue defaultFrequencyTarget(TimeValue bucketSpan) { + long bucketSpanSeconds = bucketSpan.seconds(); + if (bucketSpanSeconds <= 0) { + throw new IllegalArgumentException("Bucket span has to be > 0"); + } + + if (bucketSpanSeconds <= TWO_MINS_SECONDS) { + return TimeValue.timeValueSeconds(SECONDS_IN_MINUTE); + } + if (bucketSpanSeconds <= TWENTY_MINS_SECONDS) { + return TimeValue.timeValueSeconds(bucketSpanSeconds / 2); + } + if (bucketSpanSeconds <= HALF_DAY_SECONDS) { + return TimeValue.timeValueMinutes(10); + } + return TimeValue.timeValueHours(1); + } + public static class Builder { private static final int DEFAULT_SCROLL_SIZE = 1000; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index 72e7c56016c..227c8615a78 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -11,6 +11,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.mapper.DateFieldMapper; @@ -96,9 +97,10 @@ class DatafeedJob { String msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STARTED_FROM_TO, DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(lookbackStartTimeMs), - endTime == null ? "real-time" : DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(lookbackEnd)); + endTime == null ? "real-time" : DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(lookbackEnd), + TimeValue.timeValueMillis(frequencyMs).getStringRep()); auditor.info(jobId, msg); - LOGGER.info("[" + jobId + "] " + msg); + LOGGER.info("[{}] {}", jobId, msg); FlushJobAction.Request request = new FlushJobAction.Request(jobId); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java index 1a423a4d9da..181634411c1 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java @@ -20,7 +20,6 @@ import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.Result; import org.elasticsearch.xpack.ml.notifications.Auditor; -import java.time.Duration; import java.util.Collections; import java.util.Objects; import java.util.function.Consumer; @@ -47,9 +46,9 @@ public class DatafeedJobBuilder { // Step 5. Build datafeed job object Consumer contextHanlder = context -> { - Duration frequency = getFrequencyOrDefault(datafeed, job); - Duration queryDelay = Duration.ofMillis(datafeed.getQueryDelay().millis()); - DatafeedJob datafeedJob = new DatafeedJob(job.getId(), buildDataDescription(job), frequency.toMillis(), queryDelay.toMillis(), + TimeValue frequency = getFrequencyOrDefault(datafeed, job); + TimeValue queryDelay = datafeed.getQueryDelay(); + DatafeedJob datafeedJob = new DatafeedJob(job.getId(), buildDataDescription(job), frequency.millis(), queryDelay.millis(), context.dataExtractorFactory, client, auditor, currentTimeSupplier, context.latestFinalBucketEndMs, context.latestRecordTimeMs); listener.onResponse(datafeedJob); @@ -100,10 +99,13 @@ public class DatafeedJobBuilder { }); } - private static Duration getFrequencyOrDefault(DatafeedConfig datafeed, Job job) { + private static TimeValue getFrequencyOrDefault(DatafeedConfig datafeed, Job job) { TimeValue frequency = datafeed.getFrequency(); - TimeValue bucketSpan = job.getAnalysisConfig().getBucketSpan(); - return frequency == null ? DefaultFrequency.ofBucketSpan(bucketSpan.seconds()) : Duration.ofSeconds(frequency.seconds()); + if (frequency == null) { + TimeValue bucketSpan = job.getAnalysisConfig().getBucketSpan(); + return datafeed.defaultFrequency(bucketSpan); + } + return frequency; } private static DataDescription buildDataDescription(Job job) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidator.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidator.java index 0e4dc939a33..e4e23764b0c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidator.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidator.java @@ -29,6 +29,7 @@ public final class DatafeedJobValidator { if (datafeedConfig.hasAggregations()) { checkSummaryCountFieldNameIsSet(analysisConfig); checkValidHistogramInterval(datafeedConfig, analysisConfig); + checkFrequencyIsMultipleOfHistogramInterval(datafeedConfig); } } @@ -55,6 +56,18 @@ public final class DatafeedJobValidator { TimeValue.timeValueMillis(histogramIntervalMillis).getStringRep(), TimeValue.timeValueMillis(bucketSpanMillis).getStringRep())); } + } + private static void checkFrequencyIsMultipleOfHistogramInterval(DatafeedConfig datafeedConfig) { + TimeValue frequency = datafeedConfig.getFrequency(); + if (frequency != null) { + long histogramIntervalMillis = datafeedConfig.getHistogramIntervalMillis(); + long frequencyMillis = frequency.millis(); + if (frequencyMillis % histogramIntervalMillis != 0) { + throw ExceptionsHelper.badRequestException(Messages.getMessage( + Messages.DATAFEED_FREQUENCY_MUST_BE_MULTIPLE_OF_AGGREGATIONS_INTERVAL, + frequency, TimeValue.timeValueMillis(histogramIntervalMillis).getStringRep())); + } + } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DefaultFrequency.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DefaultFrequency.java deleted file mode 100644 index c94c0207af8..00000000000 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DefaultFrequency.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.datafeed; - -import java.time.Duration; - -/** - * Factory methods for a sensible default for the datafeed frequency - */ -public final class DefaultFrequency { - private static final int SECONDS_IN_MINUTE = 60; - private static final int TWO_MINS_SECONDS = 2 * SECONDS_IN_MINUTE; - private static final int TWENTY_MINS_SECONDS = 20 * SECONDS_IN_MINUTE; - private static final int HALF_DAY_SECONDS = 12 * 60 * SECONDS_IN_MINUTE; - private static final Duration TEN_MINUTES = Duration.ofMinutes(10); - private static final Duration ONE_HOUR = Duration.ofHours(1); - - private DefaultFrequency() { - // Do nothing - } - - /** - * Creates a sensible default frequency for a given bucket span. - *

- * The default depends on the bucket span: - *

    - *
  • <= 2 mins -> 1 min
  • - *
  • <= 20 mins -> bucket span / 2
  • - *
  • <= 12 hours -> 10 mins
  • - *
  • > 12 hours -> 1 hour
  • - *
- * - * @param bucketSpanSeconds the bucket span in seconds - * @return the default frequency - */ - public static Duration ofBucketSpan(long bucketSpanSeconds) { - if (bucketSpanSeconds <= 0) { - throw new IllegalArgumentException("Bucket span has to be > 0"); - } - - if (bucketSpanSeconds <= TWO_MINS_SECONDS) { - return Duration.ofSeconds(SECONDS_IN_MINUTE); - } - if (bucketSpanSeconds <= TWENTY_MINS_SECONDS) { - return Duration.ofSeconds(bucketSpanSeconds / 2); - } - if (bucketSpanSeconds <= HALF_DAY_SECONDS) { - return TEN_MINUTES; - } - return ONE_HOUR; - } -} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java index 255a0ec865b..c7cd36e13ad 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java @@ -39,6 +39,8 @@ public final class Messages { public static final String DATAFEED_DATA_HISTOGRAM_MUST_HAVE_NESTED_MAX_AGGREGATION = "Date histogram must have nested max aggregation for time_field [{0}]"; public static final String DATAFEED_MISSING_MAX_AGGREGATION_FOR_TIME_FIELD = "Missing max aggregation for time_field [{0}]"; + public static final String DATAFEED_FREQUENCY_MUST_BE_MULTIPLE_OF_AGGREGATIONS_INTERVAL = + "Datafeed frequency [{0}] must be a multiple of the aggregation interval [{1}]"; public static final String INCONSISTENT_ID = "Inconsistent {0}; ''{1}'' specified in the body differs from ''{2}'' specified as a URL argument"; @@ -58,7 +60,7 @@ public final class Messages { public static final String JOB_AUDIT_DATAFEED_LOOKBACK_NO_DATA = "Datafeed lookback retrieved no data"; public static final String JOB_AUDIT_DATAFEED_NO_DATA = "Datafeed has been retrieving no data for a while"; public static final String JOB_AUDIT_DATAFEED_RECOVERED = "Datafeed has recovered data extraction and analysis"; - public static final String JOB_AUDIT_DATAFEED_STARTED_FROM_TO = "Datafeed started (from: {0} to: {1})"; + public static final String JOB_AUDIT_DATAFEED_STARTED_FROM_TO = "Datafeed started (from: {0} to: {1}) with frequency [{2}]"; public static final String JOB_AUDIT_DATAFEED_STARTED_REALTIME = "Datafeed started in real-time"; public static final String JOB_AUDIT_DATAFEED_STOPPED = "Datafeed stopped"; public static final String JOB_AUDIT_DELETED = "Job deleted"; diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigTests.java index 57be39f7612..5bd0455b6ae 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigTests.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.ml.datafeed; import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator; - import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; @@ -33,7 +32,6 @@ import org.elasticsearch.search.builder.SearchSourceBuilder.ScriptField; import org.elasticsearch.test.AbstractSerializingTestCase; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.datafeed.ChunkingConfig.Mode; -import org.elasticsearch.xpack.ml.job.config.JobTests; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.joda.time.DateTimeZone; @@ -79,23 +77,29 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase bucketSpanMillis ? bucketSpanMillis : interval; - interval = interval <= 0 ? 1 : interval; + aggHistogramInterval = randomNonNegativeLong(); + aggHistogramInterval = aggHistogramInterval> bucketSpanMillis ? bucketSpanMillis : aggHistogramInterval; + aggHistogramInterval = aggHistogramInterval <= 0 ? 1 : aggHistogramInterval; MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); - aggs.addAggregator(AggregationBuilders.dateHistogram("buckets").interval(interval).subAggregation(maxTime).field("time")); + aggs.addAggregator(AggregationBuilders.dateHistogram("buckets") + .interval(aggHistogramInterval).subAggregation(maxTime).field("time")); builder.setAggregations(aggs); } if (randomBoolean()) { builder.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE)); } if (randomBoolean()) { - builder.setFrequency(TimeValue.timeValueSeconds(randomIntBetween(1, 1_000_000))); + if (aggHistogramInterval == null) { + builder.setFrequency(TimeValue.timeValueSeconds(randomIntBetween(1, 1_000_000))); + } else { + builder.setFrequency(TimeValue.timeValueMillis(randomIntBetween(1, 5) * aggHistogramInterval)); + } } if (randomBoolean()) { builder.setQueryDelay(TimeValue.timeValueMillis(randomIntBetween(1, 1_000_000))); @@ -398,6 +402,90 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase datafeed.defaultFrequency(TimeValue.timeValueSeconds(-1))); + } + + public void testDefaultFrequency_GivenNoAggregations() { + DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder("feed", "job"); + datafeedBuilder.setIndices(Arrays.asList("my_index")); + DatafeedConfig datafeed = datafeedBuilder.build(); + + assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(1))); + assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(30))); + assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(60))); + assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(90))); + assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(120))); + assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(121))); + + assertEquals(TimeValue.timeValueSeconds(61), datafeed.defaultFrequency(TimeValue.timeValueSeconds(122))); + assertEquals(TimeValue.timeValueSeconds(75), datafeed.defaultFrequency(TimeValue.timeValueSeconds(150))); + assertEquals(TimeValue.timeValueSeconds(150), datafeed.defaultFrequency(TimeValue.timeValueSeconds(300))); + assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueSeconds(1200))); + + assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueSeconds(1201))); + assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueSeconds(1800))); + assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueHours(1))); + assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueHours(2))); + assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueHours(12))); + + assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(12 * 3600 + 1))); + assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(13))); + assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(24))); + assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(48))); + } + + public void testDefaultFrequency_GivenAggregationsWithHistogramInterval_1_Second() { + DatafeedConfig datafeed = createDatafeedWithDateHistogram("1s"); + + assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(60))); + assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(90))); + assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(120))); + assertEquals(TimeValue.timeValueSeconds(125), datafeed.defaultFrequency(TimeValue.timeValueSeconds(250))); + assertEquals(TimeValue.timeValueSeconds(250), datafeed.defaultFrequency(TimeValue.timeValueSeconds(500))); + + assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueHours(1))); + assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(13))); + } + + public void testDefaultFrequency_GivenAggregationsWithHistogramInterval_1_Minute() { + DatafeedConfig datafeed = createDatafeedWithDateHistogram("1m"); + + assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(60))); + assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(90))); + assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(120))); + assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(180))); + assertEquals(TimeValue.timeValueMinutes(2), datafeed.defaultFrequency(TimeValue.timeValueSeconds(240))); + assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueMinutes(20))); + + assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueSeconds(20 * 60 + 1))); + assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueHours(6))); + assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueHours(12))); + + assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(13))); + assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(72))); + } + + public void testDefaultFrequency_GivenAggregationsWithHistogramInterval_10_Minutes() { + DatafeedConfig datafeed = createDatafeedWithDateHistogram("10m"); + + assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueMinutes(10))); + assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueMinutes(20))); + assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueMinutes(30))); + assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueMinutes(12 * 60))); + assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueMinutes(13 * 60))); + } + + public void testDefaultFrequency_GivenAggregationsWithHistogramInterval_1_Hour() { + DatafeedConfig datafeed = createDatafeedWithDateHistogram("1h"); + + assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(1))); + assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(3601))); + assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(2))); + assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(12))); + } + public static String randomValidDatafeedId() { CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray()); return generator.ofCodePointsLength(random(), 10, 10); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidatorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidatorTests.java index 92aaf1597ad..149896539de 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidatorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidatorTests.java @@ -141,6 +141,39 @@ public class DatafeedJobValidatorTests extends ESTestCase { DatafeedJobValidator.validate(goodDatafeedConfig, job); } + public void testVerify_FrequencyIsMultipleOfHistogramInterval() throws IOException { + Job.Builder builder = buildJobBuilder("foo"); + AnalysisConfig.Builder ac = createAnalysisConfig(); + ac.setSummaryCountFieldName("some_count"); + ac.setBucketSpan(TimeValue.timeValueMinutes(5)); + builder.setAnalysisConfig(ac); + Job job = builder.build(new Date()); + DatafeedConfig.Builder datafeedBuilder = createValidDatafeedConfigWithAggs(60 * 1000); + + // Check with multiples + datafeedBuilder.setFrequency(TimeValue.timeValueSeconds(60)); + DatafeedJobValidator.validate(datafeedBuilder.build(), job); + datafeedBuilder.setFrequency(TimeValue.timeValueSeconds(120)); + DatafeedJobValidator.validate(datafeedBuilder.build(), job); + datafeedBuilder.setFrequency(TimeValue.timeValueSeconds(180)); + DatafeedJobValidator.validate(datafeedBuilder.build(), job); + datafeedBuilder.setFrequency(TimeValue.timeValueSeconds(240)); + DatafeedJobValidator.validate(datafeedBuilder.build(), job); + datafeedBuilder.setFrequency(TimeValue.timeValueHours(1)); + DatafeedJobValidator.validate(datafeedBuilder.build(), job); + + // Now non-multiples + datafeedBuilder.setFrequency(TimeValue.timeValueSeconds(30)); + ElasticsearchStatusException e = ESTestCase.expectThrows(ElasticsearchStatusException.class, + () -> DatafeedJobValidator.validate(datafeedBuilder.build(), job)); + assertEquals("Datafeed frequency [30s] must be a multiple of the aggregation interval [60000ms]", e.getMessage()); + + datafeedBuilder.setFrequency(TimeValue.timeValueSeconds(90)); + e = ESTestCase.expectThrows(ElasticsearchStatusException.class, + () -> DatafeedJobValidator.validate(datafeedBuilder.build(), job)); + assertEquals("Datafeed frequency [1.5m] must be a multiple of the aggregation interval [60000ms]", e.getMessage()); + } + private static Job.Builder buildJobBuilder(String id) { Job.Builder builder = new Job.Builder(id); AnalysisConfig.Builder ac = createAnalysisConfig(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DefaultFrequencyTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DefaultFrequencyTests.java deleted file mode 100644 index 29f5fc4a812..00000000000 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DefaultFrequencyTests.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.datafeed; - -import org.elasticsearch.test.ESTestCase; - -import java.time.Duration; - -public class DefaultFrequencyTests extends ESTestCase { - - public void testCalc_GivenNegative() { - ESTestCase.expectThrows(IllegalArgumentException.class, () -> DefaultFrequency.ofBucketSpan(-1)); - } - - - public void testCalc() { - assertEquals(Duration.ofMinutes(1), DefaultFrequency.ofBucketSpan(1)); - assertEquals(Duration.ofMinutes(1), DefaultFrequency.ofBucketSpan(30)); - assertEquals(Duration.ofMinutes(1), DefaultFrequency.ofBucketSpan(60)); - assertEquals(Duration.ofMinutes(1), DefaultFrequency.ofBucketSpan(90)); - assertEquals(Duration.ofMinutes(1), DefaultFrequency.ofBucketSpan(120)); - assertEquals(Duration.ofMinutes(1), DefaultFrequency.ofBucketSpan(121)); - - assertEquals(Duration.ofSeconds(61), DefaultFrequency.ofBucketSpan(122)); - assertEquals(Duration.ofSeconds(75), DefaultFrequency.ofBucketSpan(150)); - assertEquals(Duration.ofSeconds(150), DefaultFrequency.ofBucketSpan(300)); - assertEquals(Duration.ofMinutes(10), DefaultFrequency.ofBucketSpan(1200)); - - assertEquals(Duration.ofMinutes(10), DefaultFrequency.ofBucketSpan(1201)); - assertEquals(Duration.ofMinutes(10), DefaultFrequency.ofBucketSpan(1800)); - assertEquals(Duration.ofMinutes(10), DefaultFrequency.ofBucketSpan(3600)); - assertEquals(Duration.ofMinutes(10), DefaultFrequency.ofBucketSpan(7200)); - assertEquals(Duration.ofMinutes(10), DefaultFrequency.ofBucketSpan(12 * 3600)); - - assertEquals(Duration.ofHours(1), DefaultFrequency.ofBucketSpan(12 * 3600 + 1)); - assertEquals(Duration.ofHours(1), DefaultFrequency.ofBucketSpan(13 * 3600)); - assertEquals(Duration.ofHours(1), DefaultFrequency.ofBucketSpan(24 * 3600)); - assertEquals(Duration.ofHours(1), DefaultFrequency.ofBucketSpan(48 * 3600)); - } -} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java index 0c6635bf71c..738c18c8999 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -96,7 +96,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { DatafeedConfig.Builder configBuilder = createDatafeedBuilder("data_feed_id", job.getId(), Collections.singletonList("*")); MaxAggregationBuilder maxAggregation = AggregationBuilders.max("time").field("time"); - HistogramAggregationBuilder histogramAggregation = AggregationBuilders.histogram("time").interval(300000) + HistogramAggregationBuilder histogramAggregation = AggregationBuilders.histogram("time").interval(60000) .subAggregation(maxAggregation).field("time"); configBuilder.setAggregations(AggregatorFactories.builder().addAggregator(histogramAggregation));