diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java index e6836e2f8a2..cc63528649e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java @@ -212,7 +212,11 @@ public class DatafeedConfig extends AbstractDiffable implements * The method expects a valid top level aggregation to exist. */ public long getHistogramIntervalMillis() { - AggregationBuilder topLevelAgg = getTopLevelAgg(); + return getHistogramIntervalMillis(aggregations); + } + + private static long getHistogramIntervalMillis(AggregatorFactories.Builder aggregations) { + AggregationBuilder topLevelAgg = getTopLevelAgg(aggregations); if (topLevelAgg == null) { throw new IllegalStateException("No aggregations exist"); } @@ -225,7 +229,7 @@ public class DatafeedConfig extends AbstractDiffable implements } } - private AggregationBuilder getTopLevelAgg() { + private static AggregationBuilder getTopLevelAgg(AggregatorFactories.Builder aggregations) { if (aggregations == null || aggregations.getAggregatorFactories().isEmpty()) { return null; } @@ -420,6 +424,7 @@ public class DatafeedConfig extends AbstractDiffable implements private static final int DEFAULT_SCROLL_SIZE = 1000; private static final TimeValue DEFAULT_QUERY_DELAY = TimeValue.timeValueMinutes(1); + private static final int DEFAULT_AGGREGATION_CHUNKING_BUCKETS = 1000; private String id; private String jobId; @@ -531,6 +536,7 @@ public class DatafeedConfig extends AbstractDiffable implements throw invalidOptionValue(TYPES.getPreferredName(), types); } validateAggregations(); + setDefaultChunkingConfig(); return new DatafeedConfig(id, jobId, queryDelay, frequency, indexes, types, query, aggregations, scriptFields, scrollSize, source, chunkingConfig); } @@ -560,6 +566,18 @@ public class DatafeedConfig extends AbstractDiffable implements } } + private void setDefaultChunkingConfig() { + if (chunkingConfig == null) { + if (aggregations == null) { + chunkingConfig = ChunkingConfig.newAuto(); + } else { + long histogramIntervalMillis = getHistogramIntervalMillis(aggregations); + chunkingConfig = ChunkingConfig.newManual(TimeValue.timeValueMillis( + DEFAULT_AGGREGATION_CHUNKING_BUCKETS * histogramIntervalMillis)); + } + } + } + private static ElasticsearchException invalidOptionValue(String fieldName, Object value) { String msg = Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, fieldName, value); throw new IllegalArgumentException(msg); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java index dd4849df6e6..79429c64d94 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.ml.datafeed.extractor; import org.elasticsearch.client.Client; -import org.elasticsearch.xpack.ml.datafeed.ChunkingConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory; @@ -23,12 +22,7 @@ public interface DataExtractorFactory { boolean isScrollSearch = datafeedConfig.hasAggregations() == false; DataExtractorFactory dataExtractorFactory = isScrollSearch ? new ScrollDataExtractorFactory(client, datafeedConfig, job) : new AggregationDataExtractorFactory(client, datafeedConfig, job); - ChunkingConfig chunkingConfig = datafeedConfig.getChunkingConfig(); - if (chunkingConfig == null) { - chunkingConfig = isScrollSearch ? ChunkingConfig.newAuto() : ChunkingConfig.newOff(); - } - - return chunkingConfig.isEnabled() ? new ChunkedDataExtractorFactory(client, datafeedConfig, job, dataExtractorFactory) - : dataExtractorFactory; + return datafeedConfig.getChunkingConfig().isEnabled() ? new ChunkedDataExtractorFactory( + client, datafeedConfig, job, dataExtractorFactory) : dataExtractorFactory; } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java index 20417edb22d..17208b201b3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java @@ -38,7 +38,7 @@ public class ChunkedDataExtractorFactory implements DataExtractorFactory { datafeedConfig.getScrollSize(), start, end, - datafeedConfig.getChunkingConfig() == null ? null : datafeedConfig.getChunkingConfig().getTimeSpan()); + datafeedConfig.getChunkingConfig().getTimeSpan()); return new ChunkedDataExtractor(client, dataExtractorFactory, dataExtractorContext); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigTests.java index edcec740638..137a3763336 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigTests.java @@ -292,6 +292,20 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase