[ML] Frequency in datafeeds with aggs must be multiple of hist interval (elastic/x-pack-elasticsearch#3205)
relates elastic/x-pack-elasticsearch#3204 Original commit: elastic/x-pack-elasticsearch@0bbd9addd4
This commit is contained in:
parent
d39c8b76db
commit
30b745f846
|
@ -54,6 +54,11 @@ import java.util.concurrent.TimeUnit;
|
||||||
*/
|
*/
|
||||||
public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements ToXContentObject {
|
public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements ToXContentObject {
|
||||||
|
|
||||||
|
private static final int SECONDS_IN_MINUTE = 60;
|
||||||
|
private static final int TWO_MINS_SECONDS = 2 * SECONDS_IN_MINUTE;
|
||||||
|
private static final int TWENTY_MINS_SECONDS = 20 * SECONDS_IN_MINUTE;
|
||||||
|
private static final int HALF_DAY_SECONDS = 12 * 60 * SECONDS_IN_MINUTE;
|
||||||
|
|
||||||
// Used for QueryPage
|
// Used for QueryPage
|
||||||
public static final ParseField RESULTS_FIELD = new ParseField("datafeeds");
|
public static final ParseField RESULTS_FIELD = new ParseField("datafeeds");
|
||||||
|
|
||||||
|
@ -350,6 +355,53 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
|
||||||
return Strings.toString(this);
|
return Strings.toString(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calculates a sensible default frequency for a given bucket span.
|
||||||
|
* <p>
|
||||||
|
* The default depends on the bucket span:
|
||||||
|
* <ul>
|
||||||
|
* <li> <= 2 mins -> 1 min</li>
|
||||||
|
* <li> <= 20 mins -> bucket span / 2</li>
|
||||||
|
* <li> <= 12 hours -> 10 mins</li>
|
||||||
|
* <li> > 12 hours -> 1 hour</li>
|
||||||
|
* </ul>
|
||||||
|
*
|
||||||
|
* If the datafeed has aggregations, the default frequency is the
|
||||||
|
* closest multiple of the histogram interval based on the rules above.
|
||||||
|
*
|
||||||
|
* @param bucketSpan the bucket span
|
||||||
|
* @return the default frequency
|
||||||
|
*/
|
||||||
|
public TimeValue defaultFrequency(TimeValue bucketSpan) {
|
||||||
|
TimeValue defaultFrequency = defaultFrequencyTarget(bucketSpan);
|
||||||
|
if (hasAggregations()) {
|
||||||
|
long histogramIntervalMillis = getHistogramIntervalMillis();
|
||||||
|
long targetFrequencyMillis = defaultFrequency.millis();
|
||||||
|
long defaultFrequencyMillis = histogramIntervalMillis > targetFrequencyMillis ? histogramIntervalMillis
|
||||||
|
: (targetFrequencyMillis / histogramIntervalMillis) * histogramIntervalMillis;
|
||||||
|
defaultFrequency = TimeValue.timeValueMillis(defaultFrequencyMillis);
|
||||||
|
}
|
||||||
|
return defaultFrequency;
|
||||||
|
}
|
||||||
|
|
||||||
|
private TimeValue defaultFrequencyTarget(TimeValue bucketSpan) {
|
||||||
|
long bucketSpanSeconds = bucketSpan.seconds();
|
||||||
|
if (bucketSpanSeconds <= 0) {
|
||||||
|
throw new IllegalArgumentException("Bucket span has to be > 0");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (bucketSpanSeconds <= TWO_MINS_SECONDS) {
|
||||||
|
return TimeValue.timeValueSeconds(SECONDS_IN_MINUTE);
|
||||||
|
}
|
||||||
|
if (bucketSpanSeconds <= TWENTY_MINS_SECONDS) {
|
||||||
|
return TimeValue.timeValueSeconds(bucketSpanSeconds / 2);
|
||||||
|
}
|
||||||
|
if (bucketSpanSeconds <= HALF_DAY_SECONDS) {
|
||||||
|
return TimeValue.timeValueMinutes(10);
|
||||||
|
}
|
||||||
|
return TimeValue.timeValueHours(1);
|
||||||
|
}
|
||||||
|
|
||||||
public static class Builder {
|
public static class Builder {
|
||||||
|
|
||||||
private static final int DEFAULT_SCROLL_SIZE = 1000;
|
private static final int DEFAULT_SCROLL_SIZE = 1000;
|
||||||
|
|
|
@ -11,6 +11,7 @@ import org.elasticsearch.client.Client;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.io.Streams;
|
import org.elasticsearch.common.io.Streams;
|
||||||
import org.elasticsearch.common.logging.Loggers;
|
import org.elasticsearch.common.logging.Loggers;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
import org.elasticsearch.index.mapper.DateFieldMapper;
|
import org.elasticsearch.index.mapper.DateFieldMapper;
|
||||||
|
@ -96,9 +97,10 @@ class DatafeedJob {
|
||||||
|
|
||||||
String msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STARTED_FROM_TO,
|
String msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STARTED_FROM_TO,
|
||||||
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(lookbackStartTimeMs),
|
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(lookbackStartTimeMs),
|
||||||
endTime == null ? "real-time" : DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(lookbackEnd));
|
endTime == null ? "real-time" : DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(lookbackEnd),
|
||||||
|
TimeValue.timeValueMillis(frequencyMs).getStringRep());
|
||||||
auditor.info(jobId, msg);
|
auditor.info(jobId, msg);
|
||||||
LOGGER.info("[" + jobId + "] " + msg);
|
LOGGER.info("[{}] {}", jobId, msg);
|
||||||
|
|
||||||
|
|
||||||
FlushJobAction.Request request = new FlushJobAction.Request(jobId);
|
FlushJobAction.Request request = new FlushJobAction.Request(jobId);
|
||||||
|
|
|
@ -20,7 +20,6 @@ import org.elasticsearch.xpack.ml.job.results.Bucket;
|
||||||
import org.elasticsearch.xpack.ml.job.results.Result;
|
import org.elasticsearch.xpack.ml.job.results.Result;
|
||||||
import org.elasticsearch.xpack.ml.notifications.Auditor;
|
import org.elasticsearch.xpack.ml.notifications.Auditor;
|
||||||
|
|
||||||
import java.time.Duration;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
@ -47,9 +46,9 @@ public class DatafeedJobBuilder {
|
||||||
|
|
||||||
// Step 5. Build datafeed job object
|
// Step 5. Build datafeed job object
|
||||||
Consumer<Context> contextHanlder = context -> {
|
Consumer<Context> contextHanlder = context -> {
|
||||||
Duration frequency = getFrequencyOrDefault(datafeed, job);
|
TimeValue frequency = getFrequencyOrDefault(datafeed, job);
|
||||||
Duration queryDelay = Duration.ofMillis(datafeed.getQueryDelay().millis());
|
TimeValue queryDelay = datafeed.getQueryDelay();
|
||||||
DatafeedJob datafeedJob = new DatafeedJob(job.getId(), buildDataDescription(job), frequency.toMillis(), queryDelay.toMillis(),
|
DatafeedJob datafeedJob = new DatafeedJob(job.getId(), buildDataDescription(job), frequency.millis(), queryDelay.millis(),
|
||||||
context.dataExtractorFactory, client, auditor, currentTimeSupplier,
|
context.dataExtractorFactory, client, auditor, currentTimeSupplier,
|
||||||
context.latestFinalBucketEndMs, context.latestRecordTimeMs);
|
context.latestFinalBucketEndMs, context.latestRecordTimeMs);
|
||||||
listener.onResponse(datafeedJob);
|
listener.onResponse(datafeedJob);
|
||||||
|
@ -100,10 +99,13 @@ public class DatafeedJobBuilder {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Duration getFrequencyOrDefault(DatafeedConfig datafeed, Job job) {
|
private static TimeValue getFrequencyOrDefault(DatafeedConfig datafeed, Job job) {
|
||||||
TimeValue frequency = datafeed.getFrequency();
|
TimeValue frequency = datafeed.getFrequency();
|
||||||
|
if (frequency == null) {
|
||||||
TimeValue bucketSpan = job.getAnalysisConfig().getBucketSpan();
|
TimeValue bucketSpan = job.getAnalysisConfig().getBucketSpan();
|
||||||
return frequency == null ? DefaultFrequency.ofBucketSpan(bucketSpan.seconds()) : Duration.ofSeconds(frequency.seconds());
|
return datafeed.defaultFrequency(bucketSpan);
|
||||||
|
}
|
||||||
|
return frequency;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static DataDescription buildDataDescription(Job job) {
|
private static DataDescription buildDataDescription(Job job) {
|
||||||
|
|
|
@ -29,6 +29,7 @@ public final class DatafeedJobValidator {
|
||||||
if (datafeedConfig.hasAggregations()) {
|
if (datafeedConfig.hasAggregations()) {
|
||||||
checkSummaryCountFieldNameIsSet(analysisConfig);
|
checkSummaryCountFieldNameIsSet(analysisConfig);
|
||||||
checkValidHistogramInterval(datafeedConfig, analysisConfig);
|
checkValidHistogramInterval(datafeedConfig, analysisConfig);
|
||||||
|
checkFrequencyIsMultipleOfHistogramInterval(datafeedConfig);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -55,6 +56,18 @@ public final class DatafeedJobValidator {
|
||||||
TimeValue.timeValueMillis(histogramIntervalMillis).getStringRep(),
|
TimeValue.timeValueMillis(histogramIntervalMillis).getStringRep(),
|
||||||
TimeValue.timeValueMillis(bucketSpanMillis).getStringRep()));
|
TimeValue.timeValueMillis(bucketSpanMillis).getStringRep()));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void checkFrequencyIsMultipleOfHistogramInterval(DatafeedConfig datafeedConfig) {
|
||||||
|
TimeValue frequency = datafeedConfig.getFrequency();
|
||||||
|
if (frequency != null) {
|
||||||
|
long histogramIntervalMillis = datafeedConfig.getHistogramIntervalMillis();
|
||||||
|
long frequencyMillis = frequency.millis();
|
||||||
|
if (frequencyMillis % histogramIntervalMillis != 0) {
|
||||||
|
throw ExceptionsHelper.badRequestException(Messages.getMessage(
|
||||||
|
Messages.DATAFEED_FREQUENCY_MUST_BE_MULTIPLE_OF_AGGREGATIONS_INTERVAL,
|
||||||
|
frequency, TimeValue.timeValueMillis(histogramIntervalMillis).getStringRep()));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,55 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
|
||||||
* or more contributor license agreements. Licensed under the Elastic License;
|
|
||||||
* you may not use this file except in compliance with the Elastic License.
|
|
||||||
*/
|
|
||||||
package org.elasticsearch.xpack.ml.datafeed;
|
|
||||||
|
|
||||||
import java.time.Duration;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Factory methods for a sensible default for the datafeed frequency
|
|
||||||
*/
|
|
||||||
public final class DefaultFrequency {
|
|
||||||
private static final int SECONDS_IN_MINUTE = 60;
|
|
||||||
private static final int TWO_MINS_SECONDS = 2 * SECONDS_IN_MINUTE;
|
|
||||||
private static final int TWENTY_MINS_SECONDS = 20 * SECONDS_IN_MINUTE;
|
|
||||||
private static final int HALF_DAY_SECONDS = 12 * 60 * SECONDS_IN_MINUTE;
|
|
||||||
private static final Duration TEN_MINUTES = Duration.ofMinutes(10);
|
|
||||||
private static final Duration ONE_HOUR = Duration.ofHours(1);
|
|
||||||
|
|
||||||
private DefaultFrequency() {
|
|
||||||
// Do nothing
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a sensible default frequency for a given bucket span.
|
|
||||||
* <p>
|
|
||||||
* The default depends on the bucket span:
|
|
||||||
* <ul>
|
|
||||||
* <li> <= 2 mins -> 1 min</li>
|
|
||||||
* <li> <= 20 mins -> bucket span / 2</li>
|
|
||||||
* <li> <= 12 hours -> 10 mins</li>
|
|
||||||
* <li> > 12 hours -> 1 hour</li>
|
|
||||||
* </ul>
|
|
||||||
*
|
|
||||||
* @param bucketSpanSeconds the bucket span in seconds
|
|
||||||
* @return the default frequency
|
|
||||||
*/
|
|
||||||
public static Duration ofBucketSpan(long bucketSpanSeconds) {
|
|
||||||
if (bucketSpanSeconds <= 0) {
|
|
||||||
throw new IllegalArgumentException("Bucket span has to be > 0");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (bucketSpanSeconds <= TWO_MINS_SECONDS) {
|
|
||||||
return Duration.ofSeconds(SECONDS_IN_MINUTE);
|
|
||||||
}
|
|
||||||
if (bucketSpanSeconds <= TWENTY_MINS_SECONDS) {
|
|
||||||
return Duration.ofSeconds(bucketSpanSeconds / 2);
|
|
||||||
}
|
|
||||||
if (bucketSpanSeconds <= HALF_DAY_SECONDS) {
|
|
||||||
return TEN_MINUTES;
|
|
||||||
}
|
|
||||||
return ONE_HOUR;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -39,6 +39,8 @@ public final class Messages {
|
||||||
public static final String DATAFEED_DATA_HISTOGRAM_MUST_HAVE_NESTED_MAX_AGGREGATION =
|
public static final String DATAFEED_DATA_HISTOGRAM_MUST_HAVE_NESTED_MAX_AGGREGATION =
|
||||||
"Date histogram must have nested max aggregation for time_field [{0}]";
|
"Date histogram must have nested max aggregation for time_field [{0}]";
|
||||||
public static final String DATAFEED_MISSING_MAX_AGGREGATION_FOR_TIME_FIELD = "Missing max aggregation for time_field [{0}]";
|
public static final String DATAFEED_MISSING_MAX_AGGREGATION_FOR_TIME_FIELD = "Missing max aggregation for time_field [{0}]";
|
||||||
|
public static final String DATAFEED_FREQUENCY_MUST_BE_MULTIPLE_OF_AGGREGATIONS_INTERVAL =
|
||||||
|
"Datafeed frequency [{0}] must be a multiple of the aggregation interval [{1}]";
|
||||||
|
|
||||||
public static final String INCONSISTENT_ID =
|
public static final String INCONSISTENT_ID =
|
||||||
"Inconsistent {0}; ''{1}'' specified in the body differs from ''{2}'' specified as a URL argument";
|
"Inconsistent {0}; ''{1}'' specified in the body differs from ''{2}'' specified as a URL argument";
|
||||||
|
@ -58,7 +60,7 @@ public final class Messages {
|
||||||
public static final String JOB_AUDIT_DATAFEED_LOOKBACK_NO_DATA = "Datafeed lookback retrieved no data";
|
public static final String JOB_AUDIT_DATAFEED_LOOKBACK_NO_DATA = "Datafeed lookback retrieved no data";
|
||||||
public static final String JOB_AUDIT_DATAFEED_NO_DATA = "Datafeed has been retrieving no data for a while";
|
public static final String JOB_AUDIT_DATAFEED_NO_DATA = "Datafeed has been retrieving no data for a while";
|
||||||
public static final String JOB_AUDIT_DATAFEED_RECOVERED = "Datafeed has recovered data extraction and analysis";
|
public static final String JOB_AUDIT_DATAFEED_RECOVERED = "Datafeed has recovered data extraction and analysis";
|
||||||
public static final String JOB_AUDIT_DATAFEED_STARTED_FROM_TO = "Datafeed started (from: {0} to: {1})";
|
public static final String JOB_AUDIT_DATAFEED_STARTED_FROM_TO = "Datafeed started (from: {0} to: {1}) with frequency [{2}]";
|
||||||
public static final String JOB_AUDIT_DATAFEED_STARTED_REALTIME = "Datafeed started in real-time";
|
public static final String JOB_AUDIT_DATAFEED_STARTED_REALTIME = "Datafeed started in real-time";
|
||||||
public static final String JOB_AUDIT_DATAFEED_STOPPED = "Datafeed stopped";
|
public static final String JOB_AUDIT_DATAFEED_STOPPED = "Datafeed stopped";
|
||||||
public static final String JOB_AUDIT_DELETED = "Job deleted";
|
public static final String JOB_AUDIT_DELETED = "Job deleted";
|
||||||
|
|
|
@ -6,7 +6,6 @@
|
||||||
package org.elasticsearch.xpack.ml.datafeed;
|
package org.elasticsearch.xpack.ml.datafeed;
|
||||||
|
|
||||||
import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;
|
import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;
|
||||||
|
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||||
import org.elasticsearch.common.io.stream.Writeable;
|
import org.elasticsearch.common.io.stream.Writeable;
|
||||||
|
@ -33,7 +32,6 @@ import org.elasticsearch.search.builder.SearchSourceBuilder.ScriptField;
|
||||||
import org.elasticsearch.test.AbstractSerializingTestCase;
|
import org.elasticsearch.test.AbstractSerializingTestCase;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.xpack.ml.datafeed.ChunkingConfig.Mode;
|
import org.elasticsearch.xpack.ml.datafeed.ChunkingConfig.Mode;
|
||||||
import org.elasticsearch.xpack.ml.job.config.JobTests;
|
|
||||||
import org.elasticsearch.xpack.ml.job.messages.Messages;
|
import org.elasticsearch.xpack.ml.job.messages.Messages;
|
||||||
import org.joda.time.DateTimeZone;
|
import org.joda.time.DateTimeZone;
|
||||||
|
|
||||||
|
@ -79,23 +77,29 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
|
||||||
}
|
}
|
||||||
builder.setScriptFields(scriptFields);
|
builder.setScriptFields(scriptFields);
|
||||||
}
|
}
|
||||||
|
Long aggHistogramInterval = null;
|
||||||
if (randomBoolean() && addScriptFields == false) {
|
if (randomBoolean() && addScriptFields == false) {
|
||||||
// can only test with a single agg as the xcontent order gets randomized by test base class and then
|
// can only test with a single agg as the xcontent order gets randomized by test base class and then
|
||||||
// the actual xcontent isn't the same and test fail.
|
// the actual xcontent isn't the same and test fail.
|
||||||
// Testing with a single agg is ok as we don't have special list writeable / xconent logic
|
// Testing with a single agg is ok as we don't have special list writeable / xconent logic
|
||||||
AggregatorFactories.Builder aggs = new AggregatorFactories.Builder();
|
AggregatorFactories.Builder aggs = new AggregatorFactories.Builder();
|
||||||
long interval = randomNonNegativeLong();
|
aggHistogramInterval = randomNonNegativeLong();
|
||||||
interval = interval > bucketSpanMillis ? bucketSpanMillis : interval;
|
aggHistogramInterval = aggHistogramInterval> bucketSpanMillis ? bucketSpanMillis : aggHistogramInterval;
|
||||||
interval = interval <= 0 ? 1 : interval;
|
aggHistogramInterval = aggHistogramInterval <= 0 ? 1 : aggHistogramInterval;
|
||||||
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
|
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
|
||||||
aggs.addAggregator(AggregationBuilders.dateHistogram("buckets").interval(interval).subAggregation(maxTime).field("time"));
|
aggs.addAggregator(AggregationBuilders.dateHistogram("buckets")
|
||||||
|
.interval(aggHistogramInterval).subAggregation(maxTime).field("time"));
|
||||||
builder.setAggregations(aggs);
|
builder.setAggregations(aggs);
|
||||||
}
|
}
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
builder.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE));
|
builder.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE));
|
||||||
}
|
}
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
|
if (aggHistogramInterval == null) {
|
||||||
builder.setFrequency(TimeValue.timeValueSeconds(randomIntBetween(1, 1_000_000)));
|
builder.setFrequency(TimeValue.timeValueSeconds(randomIntBetween(1, 1_000_000)));
|
||||||
|
} else {
|
||||||
|
builder.setFrequency(TimeValue.timeValueMillis(randomIntBetween(1, 5) * aggHistogramInterval));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
builder.setQueryDelay(TimeValue.timeValueMillis(randomIntBetween(1, 1_000_000)));
|
builder.setQueryDelay(TimeValue.timeValueMillis(randomIntBetween(1, 1_000_000)));
|
||||||
|
@ -398,6 +402,90 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
|
||||||
assertEquals("Aggregations can only have 1 date_histogram or histogram aggregation", e.getMessage());
|
assertEquals("Aggregations can only have 1 date_histogram or histogram aggregation", e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testDefaultFrequency_GivenNegative() {
|
||||||
|
DatafeedConfig datafeed = createTestInstance();
|
||||||
|
ESTestCase.expectThrows(IllegalArgumentException.class, () -> datafeed.defaultFrequency(TimeValue.timeValueSeconds(-1)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testDefaultFrequency_GivenNoAggregations() {
|
||||||
|
DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder("feed", "job");
|
||||||
|
datafeedBuilder.setIndices(Arrays.asList("my_index"));
|
||||||
|
DatafeedConfig datafeed = datafeedBuilder.build();
|
||||||
|
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(1)));
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(30)));
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(60)));
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(90)));
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(120)));
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(121)));
|
||||||
|
|
||||||
|
assertEquals(TimeValue.timeValueSeconds(61), datafeed.defaultFrequency(TimeValue.timeValueSeconds(122)));
|
||||||
|
assertEquals(TimeValue.timeValueSeconds(75), datafeed.defaultFrequency(TimeValue.timeValueSeconds(150)));
|
||||||
|
assertEquals(TimeValue.timeValueSeconds(150), datafeed.defaultFrequency(TimeValue.timeValueSeconds(300)));
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueSeconds(1200)));
|
||||||
|
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueSeconds(1201)));
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueSeconds(1800)));
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueHours(1)));
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueHours(2)));
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueHours(12)));
|
||||||
|
|
||||||
|
assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(12 * 3600 + 1)));
|
||||||
|
assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(13)));
|
||||||
|
assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(24)));
|
||||||
|
assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(48)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testDefaultFrequency_GivenAggregationsWithHistogramInterval_1_Second() {
|
||||||
|
DatafeedConfig datafeed = createDatafeedWithDateHistogram("1s");
|
||||||
|
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(60)));
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(90)));
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(120)));
|
||||||
|
assertEquals(TimeValue.timeValueSeconds(125), datafeed.defaultFrequency(TimeValue.timeValueSeconds(250)));
|
||||||
|
assertEquals(TimeValue.timeValueSeconds(250), datafeed.defaultFrequency(TimeValue.timeValueSeconds(500)));
|
||||||
|
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueHours(1)));
|
||||||
|
assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(13)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testDefaultFrequency_GivenAggregationsWithHistogramInterval_1_Minute() {
|
||||||
|
DatafeedConfig datafeed = createDatafeedWithDateHistogram("1m");
|
||||||
|
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(60)));
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(90)));
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(120)));
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(180)));
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(2), datafeed.defaultFrequency(TimeValue.timeValueSeconds(240)));
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueMinutes(20)));
|
||||||
|
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueSeconds(20 * 60 + 1)));
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueHours(6)));
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueHours(12)));
|
||||||
|
|
||||||
|
assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(13)));
|
||||||
|
assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(72)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testDefaultFrequency_GivenAggregationsWithHistogramInterval_10_Minutes() {
|
||||||
|
DatafeedConfig datafeed = createDatafeedWithDateHistogram("10m");
|
||||||
|
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueMinutes(10)));
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueMinutes(20)));
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueMinutes(30)));
|
||||||
|
assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueMinutes(12 * 60)));
|
||||||
|
assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueMinutes(13 * 60)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testDefaultFrequency_GivenAggregationsWithHistogramInterval_1_Hour() {
|
||||||
|
DatafeedConfig datafeed = createDatafeedWithDateHistogram("1h");
|
||||||
|
|
||||||
|
assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(1)));
|
||||||
|
assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(3601)));
|
||||||
|
assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(2)));
|
||||||
|
assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(12)));
|
||||||
|
}
|
||||||
|
|
||||||
public static String randomValidDatafeedId() {
|
public static String randomValidDatafeedId() {
|
||||||
CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray());
|
CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray());
|
||||||
return generator.ofCodePointsLength(random(), 10, 10);
|
return generator.ofCodePointsLength(random(), 10, 10);
|
||||||
|
|
|
@ -141,6 +141,39 @@ public class DatafeedJobValidatorTests extends ESTestCase {
|
||||||
DatafeedJobValidator.validate(goodDatafeedConfig, job);
|
DatafeedJobValidator.validate(goodDatafeedConfig, job);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testVerify_FrequencyIsMultipleOfHistogramInterval() throws IOException {
|
||||||
|
Job.Builder builder = buildJobBuilder("foo");
|
||||||
|
AnalysisConfig.Builder ac = createAnalysisConfig();
|
||||||
|
ac.setSummaryCountFieldName("some_count");
|
||||||
|
ac.setBucketSpan(TimeValue.timeValueMinutes(5));
|
||||||
|
builder.setAnalysisConfig(ac);
|
||||||
|
Job job = builder.build(new Date());
|
||||||
|
DatafeedConfig.Builder datafeedBuilder = createValidDatafeedConfigWithAggs(60 * 1000);
|
||||||
|
|
||||||
|
// Check with multiples
|
||||||
|
datafeedBuilder.setFrequency(TimeValue.timeValueSeconds(60));
|
||||||
|
DatafeedJobValidator.validate(datafeedBuilder.build(), job);
|
||||||
|
datafeedBuilder.setFrequency(TimeValue.timeValueSeconds(120));
|
||||||
|
DatafeedJobValidator.validate(datafeedBuilder.build(), job);
|
||||||
|
datafeedBuilder.setFrequency(TimeValue.timeValueSeconds(180));
|
||||||
|
DatafeedJobValidator.validate(datafeedBuilder.build(), job);
|
||||||
|
datafeedBuilder.setFrequency(TimeValue.timeValueSeconds(240));
|
||||||
|
DatafeedJobValidator.validate(datafeedBuilder.build(), job);
|
||||||
|
datafeedBuilder.setFrequency(TimeValue.timeValueHours(1));
|
||||||
|
DatafeedJobValidator.validate(datafeedBuilder.build(), job);
|
||||||
|
|
||||||
|
// Now non-multiples
|
||||||
|
datafeedBuilder.setFrequency(TimeValue.timeValueSeconds(30));
|
||||||
|
ElasticsearchStatusException e = ESTestCase.expectThrows(ElasticsearchStatusException.class,
|
||||||
|
() -> DatafeedJobValidator.validate(datafeedBuilder.build(), job));
|
||||||
|
assertEquals("Datafeed frequency [30s] must be a multiple of the aggregation interval [60000ms]", e.getMessage());
|
||||||
|
|
||||||
|
datafeedBuilder.setFrequency(TimeValue.timeValueSeconds(90));
|
||||||
|
e = ESTestCase.expectThrows(ElasticsearchStatusException.class,
|
||||||
|
() -> DatafeedJobValidator.validate(datafeedBuilder.build(), job));
|
||||||
|
assertEquals("Datafeed frequency [1.5m] must be a multiple of the aggregation interval [60000ms]", e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
private static Job.Builder buildJobBuilder(String id) {
|
private static Job.Builder buildJobBuilder(String id) {
|
||||||
Job.Builder builder = new Job.Builder(id);
|
Job.Builder builder = new Job.Builder(id);
|
||||||
AnalysisConfig.Builder ac = createAnalysisConfig();
|
AnalysisConfig.Builder ac = createAnalysisConfig();
|
||||||
|
|
|
@ -1,43 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
|
||||||
* or more contributor license agreements. Licensed under the Elastic License;
|
|
||||||
* you may not use this file except in compliance with the Elastic License.
|
|
||||||
*/
|
|
||||||
package org.elasticsearch.xpack.ml.datafeed;
|
|
||||||
|
|
||||||
import org.elasticsearch.test.ESTestCase;
|
|
||||||
|
|
||||||
import java.time.Duration;
|
|
||||||
|
|
||||||
public class DefaultFrequencyTests extends ESTestCase {
|
|
||||||
|
|
||||||
public void testCalc_GivenNegative() {
|
|
||||||
ESTestCase.expectThrows(IllegalArgumentException.class, () -> DefaultFrequency.ofBucketSpan(-1));
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public void testCalc() {
|
|
||||||
assertEquals(Duration.ofMinutes(1), DefaultFrequency.ofBucketSpan(1));
|
|
||||||
assertEquals(Duration.ofMinutes(1), DefaultFrequency.ofBucketSpan(30));
|
|
||||||
assertEquals(Duration.ofMinutes(1), DefaultFrequency.ofBucketSpan(60));
|
|
||||||
assertEquals(Duration.ofMinutes(1), DefaultFrequency.ofBucketSpan(90));
|
|
||||||
assertEquals(Duration.ofMinutes(1), DefaultFrequency.ofBucketSpan(120));
|
|
||||||
assertEquals(Duration.ofMinutes(1), DefaultFrequency.ofBucketSpan(121));
|
|
||||||
|
|
||||||
assertEquals(Duration.ofSeconds(61), DefaultFrequency.ofBucketSpan(122));
|
|
||||||
assertEquals(Duration.ofSeconds(75), DefaultFrequency.ofBucketSpan(150));
|
|
||||||
assertEquals(Duration.ofSeconds(150), DefaultFrequency.ofBucketSpan(300));
|
|
||||||
assertEquals(Duration.ofMinutes(10), DefaultFrequency.ofBucketSpan(1200));
|
|
||||||
|
|
||||||
assertEquals(Duration.ofMinutes(10), DefaultFrequency.ofBucketSpan(1201));
|
|
||||||
assertEquals(Duration.ofMinutes(10), DefaultFrequency.ofBucketSpan(1800));
|
|
||||||
assertEquals(Duration.ofMinutes(10), DefaultFrequency.ofBucketSpan(3600));
|
|
||||||
assertEquals(Duration.ofMinutes(10), DefaultFrequency.ofBucketSpan(7200));
|
|
||||||
assertEquals(Duration.ofMinutes(10), DefaultFrequency.ofBucketSpan(12 * 3600));
|
|
||||||
|
|
||||||
assertEquals(Duration.ofHours(1), DefaultFrequency.ofBucketSpan(12 * 3600 + 1));
|
|
||||||
assertEquals(Duration.ofHours(1), DefaultFrequency.ofBucketSpan(13 * 3600));
|
|
||||||
assertEquals(Duration.ofHours(1), DefaultFrequency.ofBucketSpan(24 * 3600));
|
|
||||||
assertEquals(Duration.ofHours(1), DefaultFrequency.ofBucketSpan(48 * 3600));
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -96,7 +96,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
|
||||||
DatafeedConfig.Builder configBuilder = createDatafeedBuilder("data_feed_id", job.getId(), Collections.singletonList("*"));
|
DatafeedConfig.Builder configBuilder = createDatafeedBuilder("data_feed_id", job.getId(), Collections.singletonList("*"));
|
||||||
|
|
||||||
MaxAggregationBuilder maxAggregation = AggregationBuilders.max("time").field("time");
|
MaxAggregationBuilder maxAggregation = AggregationBuilders.max("time").field("time");
|
||||||
HistogramAggregationBuilder histogramAggregation = AggregationBuilders.histogram("time").interval(300000)
|
HistogramAggregationBuilder histogramAggregation = AggregationBuilders.histogram("time").interval(60000)
|
||||||
.subAggregation(maxAggregation).field("time");
|
.subAggregation(maxAggregation).field("time");
|
||||||
|
|
||||||
configBuilder.setAggregations(AggregatorFactories.builder().addAggregator(histogramAggregation));
|
configBuilder.setAggregations(AggregatorFactories.builder().addAggregator(histogramAggregation));
|
||||||
|
|
Loading…
Reference in New Issue