From 1e1b5405b3daadcb4fe3aa0004ae11350d16f03d Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Mon, 10 Apr 2017 18:20:48 +0100 Subject: [PATCH] [ML] Also chunk aggregated datafeed by default (elastic/x-pack-elasticsearch#999) The change applies chunking by default on aggregated datafeeds. The chunking is set to a manual mode with time_span being 1000 histogram buckets. The motivation for the change is two-fold: 1. It helps to avoid memory pressure/blowing. Users may perform a lookback on a very long period of time. In that case, we may hold a search response for all that time which could include too many buckets. By chunking, we avoid that situation as we know we'll only keep results for 1000 buckets at a time. 2. It makes cancellation more responsive. In elastic/x-pack-elasticsearch#862 we made the processing of a search response cancellable in a responsive manner. However, the search phase cannot be cancelled at the moment. Chunking makes the search phase shorter, which will result to a better user experience when they stop an aggregated datafeed. Also note the change sets the default chunking_config on datafeed creation so the setting is no longer hidden. Relates to elastic/x-pack-elasticsearch#803 Original commit: elastic/x-pack-elasticsearch@ae8f120f5f8dcf72b24cd116458597398bfda3c4 --- .../xpack/ml/datafeed/DatafeedConfig.java | 22 +++++++++++++++++-- .../extractor/DataExtractorFactory.java | 10 ++------- .../chunked/ChunkedDataExtractorFactory.java | 2 +- .../ml/datafeed/DatafeedConfigTests.java | 14 ++++++++++++ .../extractor/DataExtractorFactoryTests.java | 18 ++++++++++++++- .../rest-api-spec/test/ml/datafeeds_crud.yaml | 6 +++++ 6 files changed, 60 insertions(+), 12 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java index e6836e2f8a2..cc63528649e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java @@ -212,7 +212,11 @@ public class DatafeedConfig extends AbstractDiffable implements * The method expects a valid top level aggregation to exist. */ public long getHistogramIntervalMillis() { - AggregationBuilder topLevelAgg = getTopLevelAgg(); + return getHistogramIntervalMillis(aggregations); + } + + private static long getHistogramIntervalMillis(AggregatorFactories.Builder aggregations) { + AggregationBuilder topLevelAgg = getTopLevelAgg(aggregations); if (topLevelAgg == null) { throw new IllegalStateException("No aggregations exist"); } @@ -225,7 +229,7 @@ public class DatafeedConfig extends AbstractDiffable implements } } - private AggregationBuilder getTopLevelAgg() { + private static AggregationBuilder getTopLevelAgg(AggregatorFactories.Builder aggregations) { if (aggregations == null || aggregations.getAggregatorFactories().isEmpty()) { return null; } @@ -420,6 +424,7 @@ public class DatafeedConfig extends AbstractDiffable implements private static final int DEFAULT_SCROLL_SIZE = 1000; private static final TimeValue DEFAULT_QUERY_DELAY = TimeValue.timeValueMinutes(1); + private static final int DEFAULT_AGGREGATION_CHUNKING_BUCKETS = 1000; private String id; private String jobId; @@ -531,6 +536,7 @@ public class DatafeedConfig extends AbstractDiffable implements throw invalidOptionValue(TYPES.getPreferredName(), types); } validateAggregations(); + setDefaultChunkingConfig(); return new DatafeedConfig(id, jobId, queryDelay, frequency, indexes, types, query, aggregations, scriptFields, scrollSize, source, chunkingConfig); } @@ -560,6 +566,18 @@ public class DatafeedConfig extends AbstractDiffable implements } } + private void setDefaultChunkingConfig() { + if (chunkingConfig == null) { + if (aggregations == null) { + chunkingConfig = ChunkingConfig.newAuto(); + } else { + long histogramIntervalMillis = getHistogramIntervalMillis(aggregations); + chunkingConfig = ChunkingConfig.newManual(TimeValue.timeValueMillis( + DEFAULT_AGGREGATION_CHUNKING_BUCKETS * histogramIntervalMillis)); + } + } + } + private static ElasticsearchException invalidOptionValue(String fieldName, Object value) { String msg = Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, fieldName, value); throw new IllegalArgumentException(msg); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java index dd4849df6e6..79429c64d94 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.ml.datafeed.extractor; import org.elasticsearch.client.Client; -import org.elasticsearch.xpack.ml.datafeed.ChunkingConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory; @@ -23,12 +22,7 @@ public interface DataExtractorFactory { boolean isScrollSearch = datafeedConfig.hasAggregations() == false; DataExtractorFactory dataExtractorFactory = isScrollSearch ? new ScrollDataExtractorFactory(client, datafeedConfig, job) : new AggregationDataExtractorFactory(client, datafeedConfig, job); - ChunkingConfig chunkingConfig = datafeedConfig.getChunkingConfig(); - if (chunkingConfig == null) { - chunkingConfig = isScrollSearch ? ChunkingConfig.newAuto() : ChunkingConfig.newOff(); - } - - return chunkingConfig.isEnabled() ? new ChunkedDataExtractorFactory(client, datafeedConfig, job, dataExtractorFactory) - : dataExtractorFactory; + return datafeedConfig.getChunkingConfig().isEnabled() ? new ChunkedDataExtractorFactory( + client, datafeedConfig, job, dataExtractorFactory) : dataExtractorFactory; } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java index 20417edb22d..17208b201b3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java @@ -38,7 +38,7 @@ public class ChunkedDataExtractorFactory implements DataExtractorFactory { datafeedConfig.getScrollSize(), start, end, - datafeedConfig.getChunkingConfig() == null ? null : datafeedConfig.getChunkingConfig().getTimeSpan()); + datafeedConfig.getChunkingConfig().getTimeSpan()); return new ChunkedDataExtractor(client, dataExtractorFactory, dataExtractorContext); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigTests.java index edcec740638..137a3763336 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigTests.java @@ -292,6 +292,20 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase