[ML] Also chunk aggregated datafeed by default (elastic/x-pack-elasticsearch#999)

The change applies chunking by default on aggregated datafeeds.
The chunking is set to a manual mode with time_span being
1000 histogram buckets.

The motivation for the change is two-fold:

1. It helps to avoid memory pressure/blowing.
Users may perform a lookback on a very long period of time. In that
case, we may hold a search response for all that time which could
include too many buckets. By chunking, we avoid that situation
as we know we'll only keep results for 1000 buckets at a time.

2. It makes cancellation more responsive.
In elastic/x-pack-elasticsearch#862 we made the processing of a search response cancellable in a
responsive manner. However, the search phase cannot be cancelled at
the moment. Chunking makes the search phase shorter, which will
result to a better user experience when they stop an aggregated
datafeed.

Also note the change sets the default chunking_config on datafeed
creation so the setting is no longer hidden.

Relates to elastic/x-pack-elasticsearch#803

Original commit: elastic/x-pack-elasticsearch@ae8f120f5f
This commit is contained in:
Dimitris Athanasiou 2017-04-10 18:20:48 +01:00 committed by GitHub
parent 0b6ac175da
commit 1e1b5405b3
6 changed files with 60 additions and 12 deletions

View File

@ -212,7 +212,11 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
* The method expects a valid top level aggregation to exist. * The method expects a valid top level aggregation to exist.
*/ */
public long getHistogramIntervalMillis() { public long getHistogramIntervalMillis() {
AggregationBuilder topLevelAgg = getTopLevelAgg(); return getHistogramIntervalMillis(aggregations);
}
private static long getHistogramIntervalMillis(AggregatorFactories.Builder aggregations) {
AggregationBuilder topLevelAgg = getTopLevelAgg(aggregations);
if (topLevelAgg == null) { if (topLevelAgg == null) {
throw new IllegalStateException("No aggregations exist"); throw new IllegalStateException("No aggregations exist");
} }
@ -225,7 +229,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
} }
} }
private AggregationBuilder getTopLevelAgg() { private static AggregationBuilder getTopLevelAgg(AggregatorFactories.Builder aggregations) {
if (aggregations == null || aggregations.getAggregatorFactories().isEmpty()) { if (aggregations == null || aggregations.getAggregatorFactories().isEmpty()) {
return null; return null;
} }
@ -420,6 +424,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
private static final int DEFAULT_SCROLL_SIZE = 1000; private static final int DEFAULT_SCROLL_SIZE = 1000;
private static final TimeValue DEFAULT_QUERY_DELAY = TimeValue.timeValueMinutes(1); private static final TimeValue DEFAULT_QUERY_DELAY = TimeValue.timeValueMinutes(1);
private static final int DEFAULT_AGGREGATION_CHUNKING_BUCKETS = 1000;
private String id; private String id;
private String jobId; private String jobId;
@ -531,6 +536,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
throw invalidOptionValue(TYPES.getPreferredName(), types); throw invalidOptionValue(TYPES.getPreferredName(), types);
} }
validateAggregations(); validateAggregations();
setDefaultChunkingConfig();
return new DatafeedConfig(id, jobId, queryDelay, frequency, indexes, types, query, aggregations, scriptFields, scrollSize, return new DatafeedConfig(id, jobId, queryDelay, frequency, indexes, types, query, aggregations, scriptFields, scrollSize,
source, chunkingConfig); source, chunkingConfig);
} }
@ -560,6 +566,18 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
} }
} }
private void setDefaultChunkingConfig() {
if (chunkingConfig == null) {
if (aggregations == null) {
chunkingConfig = ChunkingConfig.newAuto();
} else {
long histogramIntervalMillis = getHistogramIntervalMillis(aggregations);
chunkingConfig = ChunkingConfig.newManual(TimeValue.timeValueMillis(
DEFAULT_AGGREGATION_CHUNKING_BUCKETS * histogramIntervalMillis));
}
}
}
private static ElasticsearchException invalidOptionValue(String fieldName, Object value) { private static ElasticsearchException invalidOptionValue(String fieldName, Object value) {
String msg = Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, fieldName, value); String msg = Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, fieldName, value);
throw new IllegalArgumentException(msg); throw new IllegalArgumentException(msg);

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.ml.datafeed.extractor; package org.elasticsearch.xpack.ml.datafeed.extractor;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.ml.datafeed.ChunkingConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory;
@ -23,12 +22,7 @@ public interface DataExtractorFactory {
boolean isScrollSearch = datafeedConfig.hasAggregations() == false; boolean isScrollSearch = datafeedConfig.hasAggregations() == false;
DataExtractorFactory dataExtractorFactory = isScrollSearch ? new ScrollDataExtractorFactory(client, datafeedConfig, job) DataExtractorFactory dataExtractorFactory = isScrollSearch ? new ScrollDataExtractorFactory(client, datafeedConfig, job)
: new AggregationDataExtractorFactory(client, datafeedConfig, job); : new AggregationDataExtractorFactory(client, datafeedConfig, job);
ChunkingConfig chunkingConfig = datafeedConfig.getChunkingConfig(); return datafeedConfig.getChunkingConfig().isEnabled() ? new ChunkedDataExtractorFactory(
if (chunkingConfig == null) { client, datafeedConfig, job, dataExtractorFactory) : dataExtractorFactory;
chunkingConfig = isScrollSearch ? ChunkingConfig.newAuto() : ChunkingConfig.newOff();
}
return chunkingConfig.isEnabled() ? new ChunkedDataExtractorFactory(client, datafeedConfig, job, dataExtractorFactory)
: dataExtractorFactory;
} }
} }

View File

@ -38,7 +38,7 @@ public class ChunkedDataExtractorFactory implements DataExtractorFactory {
datafeedConfig.getScrollSize(), datafeedConfig.getScrollSize(),
start, start,
end, end,
datafeedConfig.getChunkingConfig() == null ? null : datafeedConfig.getChunkingConfig().getTimeSpan()); datafeedConfig.getChunkingConfig().getTimeSpan());
return new ChunkedDataExtractor(client, dataExtractorFactory, dataExtractorContext); return new ChunkedDataExtractor(client, dataExtractorFactory, dataExtractorContext);
} }
} }

View File

@ -292,6 +292,20 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
assertThat(e.getMessage(), containsString("When specifying a date_histogram calendar interval [8d]")); assertThat(e.getMessage(), containsString("When specifying a date_histogram calendar interval [8d]"));
} }
public void testDefaultChunkingConfig_GivenAggregations() {
assertThat(createDatafeedWithDateHistogram("1s").getChunkingConfig(),
equalTo(ChunkingConfig.newManual(TimeValue.timeValueSeconds(1000))));
assertThat(createDatafeedWithDateHistogram("2h").getChunkingConfig(),
equalTo(ChunkingConfig.newManual(TimeValue.timeValueHours(2000))));
}
public void testChunkingConfig_GivenExplicitSetting() {
DatafeedConfig.Builder builder = new DatafeedConfig.Builder(createDatafeedWithDateHistogram("30s"));
builder.setChunkingConfig(ChunkingConfig.newAuto());
assertThat(builder.build().getChunkingConfig(), equalTo(ChunkingConfig.newAuto()));
}
public static String randomValidDatafeedId() { public static String randomValidDatafeedId() {
CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray()); CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray());
return generator.ofCodePointsLength(random(), 10, 10); return generator.ofCodePointsLength(random(), 10, 10);

View File

@ -74,7 +74,7 @@ public class DataExtractorFactoryTests extends ESTestCase {
assertThat(dataExtractorFactory, instanceOf(ScrollDataExtractorFactory.class)); assertThat(dataExtractorFactory, instanceOf(ScrollDataExtractorFactory.class));
} }
public void testCreateDataExtractorFactoryGivenAggregation() { public void testCreateDataExtractorFactoryGivenDefaultAggregation() {
DataDescription.Builder dataDescription = new DataDescription.Builder(); DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time"); dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob(); Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob();
@ -86,6 +86,22 @@ public class DataExtractorFactoryTests extends ESTestCase {
DataExtractorFactory dataExtractorFactory = DataExtractorFactory dataExtractorFactory =
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date())); DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()));
assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class));
}
public void testCreateDataExtractorFactoryGivenAggregationWithOffChunk() {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setChunkingConfig(ChunkingConfig.newOff());
datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(
AggregationBuilders.histogram("time").interval(300000)));
DataExtractorFactory dataExtractorFactory =
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()));
assertThat(dataExtractorFactory, instanceOf(AggregationDataExtractorFactory.class)); assertThat(dataExtractorFactory, instanceOf(AggregationDataExtractorFactory.class));
} }

View File

@ -70,6 +70,12 @@ setup:
"types":["type-bar"] "types":["type-bar"]
} }
- match: { datafeed_id: "test-datafeed-1" } - match: { datafeed_id: "test-datafeed-1" }
- match: { job_id: "job-1" }
- match: { indexes: ["index-foo"] }
- match: { types: ["type-bar"] }
- match: { scroll_size: 1000 }
- is_true: query.match_all
- match: { chunking_config: { mode: "auto" }}
--- ---
"Test put datafeed whose id is already taken": "Test put datafeed whose id is already taken":