[ML] Add lazy parsing for DatafeedConfig:Aggs,Query (#36117)

* Lazily parsing aggs and query in DatafeedConfigs

* Adding parser tests

* Fixing exception types && unnecessary checked ex

* Adding semi-aggregation parser

* Adding tests, fixing up semi-parser

* Reverting semi-parsing

* Moving agg validations

* Making bad configs throw badRequestException
Benjamin Trent 2018-12-04 09:41:47 -06:00 committed by GitHub
parent 6e1ff31222
commit 166d9a94d4
20 changed files with 295 additions and 79 deletions
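
In short: instead of parsing query and aggregations eagerly at construction time, DatafeedConfig now stores them as Map<String, Object> and parses them on first access through a memoizing supplier. A minimal sketch of that pattern in plain Java (an illustrative stand-in, not the actual org.elasticsearch.common.util.CachedSupplier):

import java.util.function.Supplier;

// Illustrative memoizing supplier: the expensive parse runs at most once,
// on first access, and the result is reused afterwards.
final class LazyParsed<T> implements Supplier<T> {
    private final Supplier<T> parser; // e.g. () -> lazyQueryParser.apply(queryMap, id)
    private volatile T cached;

    LazyParsed(Supplier<T> parser) {
        this.parser = parser;
    }

    @Override
    public T get() {
        if (cached == null) {                  // cheap fast path after the first call
            synchronized (this) {
                if (cached == null) {
                    cached = parser.get();     // parse failures surface here, not at build time
                }
            }
        }
        return cached;
    }
}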

View File

@@ -13,9 +13,11 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CachedSupplier;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParseException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.AbstractQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
@@ -31,6 +33,7 @@ import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.MlStrings;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import org.elasticsearch.xpack.core.ml.utils.XContentObjectTransformer;
import org.elasticsearch.xpack.core.ml.utils.time.TimeUtils;
import java.io.IOException;
@@ -43,6 +46,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
/**
* Datafeed configuration options. Describes where to proactively pull input
@@ -60,6 +64,45 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
private static final int TWO_MINS_SECONDS = 2 * SECONDS_IN_MINUTE;
private static final int TWENTY_MINS_SECONDS = 20 * SECONDS_IN_MINUTE;
private static final int HALF_DAY_SECONDS = 12 * 60 * SECONDS_IN_MINUTE;
static final XContentObjectTransformer<QueryBuilder> QUERY_TRANSFORMER = XContentObjectTransformer.queryBuilderTransformer();
private static final BiFunction<Map<String, Object>, String, QueryBuilder> lazyQueryParser = (objectMap, id) -> {
try {
return QUERY_TRANSFORMER.fromMap(objectMap);
} catch (IOException | XContentParseException exception) {
// Certain exceptions wrap the real IllegalArgumentException, making it hard for the user to determine the cause
if (exception.getCause() instanceof IllegalArgumentException) {
throw ExceptionsHelper.badRequestException(
Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT,
id,
exception.getCause().getMessage()),
exception.getCause());
} else {
throw ExceptionsHelper.badRequestException(
Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT, id, exception.getMessage()),
exception);
}
}
};
static final XContentObjectTransformer<AggregatorFactories.Builder> AGG_TRANSFORMER = XContentObjectTransformer.aggregatorTransformer();
private static final BiFunction<Map<String, Object>, String, AggregatorFactories.Builder> lazyAggParser = (objectMap, id) -> {
try {
return AGG_TRANSFORMER.fromMap(objectMap);
} catch (IOException | XContentParseException exception) {
// Certain exceptions wrap the real IllegalArgumentException, making it hard for the user to determine the cause
if (exception.getCause() instanceof IllegalArgumentException) {
throw ExceptionsHelper.badRequestException(
Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT,
id,
exception.getCause().getMessage()),
exception.getCause());
} else {
throw ExceptionsHelper.badRequestException(
Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, id, exception.getMessage()),
exception);
}
}
};
// Used for QueryPage
public static final ParseField RESULTS_FIELD = new ParseField("datafeeds");
@@ -90,6 +133,21 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
public static final ObjectParser<Builder, Void> LENIENT_PARSER = createParser(true);
public static final ObjectParser<Builder, Void> STRICT_PARSER = createParser(false);
public static void validateAggregations(AggregatorFactories.Builder aggregations) {
if (aggregations == null) {
return;
}
Collection<AggregationBuilder> aggregatorFactories = aggregations.getAggregatorFactories();
if (aggregatorFactories.isEmpty()) {
throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM);
}
AggregationBuilder histogramAggregation = ExtractorUtils.getHistogramAggregation(aggregatorFactories);
Builder.checkNoMoreHistogramAggregations(histogramAggregation.getSubAggregations());
Builder.checkHistogramAggregationHasChildMaxTimeAgg(histogramAggregation);
Builder.checkHistogramIntervalIsPositive(histogramAggregation);
}
private static ObjectParser<Builder, Void> createParser(boolean ignoreUnknownFields) {
ObjectParser<Builder, Void> parser = new ObjectParser<>("datafeed_config", ignoreUnknownFields, Builder::new);
@@ -102,9 +160,15 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
builder.setQueryDelay(TimeValue.parseTimeValue(val, QUERY_DELAY.getPreferredName())), QUERY_DELAY);
parser.declareString((builder, val) ->
builder.setFrequency(TimeValue.parseTimeValue(val, FREQUENCY.getPreferredName())), FREQUENCY);
parser.declareObject(Builder::setQuery, (p, c) -> AbstractQueryBuilder.parseInnerQueryBuilder(p), QUERY);
parser.declareObject(Builder::setAggregations, (p, c) -> AggregatorFactories.parseAggregators(p), AGGREGATIONS);
parser.declareObject(Builder::setAggregations, (p, c) -> AggregatorFactories.parseAggregators(p), AGGS);
if (ignoreUnknownFields) {
parser.declareObject(Builder::setQuery, (p, c) -> p.map(), QUERY);
parser.declareObject(Builder::setAggregations, (p, c) -> p.map(), AGGREGATIONS);
parser.declareObject(Builder::setAggregations, (p, c) -> p.map(), AGGS);
} else {
parser.declareObject(Builder::setParsedQuery, (p, c) -> AbstractQueryBuilder.parseInnerQueryBuilder(p), QUERY);
parser.declareObject(Builder::setParsedAggregations, (p, c) -> AggregatorFactories.parseAggregators(p), AGGREGATIONS);
parser.declareObject(Builder::setParsedAggregations, (p, c) -> AggregatorFactories.parseAggregators(p), AGGS);
}
parser.declareObject(Builder::setScriptFields, (p, c) -> {
List<SearchSourceBuilder.ScriptField> parsedScriptFields = new ArrayList<>();
while (p.nextToken() != XContentParser.Token.END_OBJECT) {
@@ -146,16 +210,18 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
private final List<String> indices;
private final List<String> types;
private final QueryBuilder query;
private final AggregatorFactories.Builder aggregations;
private final Map<String, Object> query;
private final Map<String, Object> aggregations;
private final List<SearchSourceBuilder.ScriptField> scriptFields;
private final Integer scrollSize;
private final ChunkingConfig chunkingConfig;
private final Map<String, String> headers;
private final DelayedDataCheckConfig delayedDataCheckConfig;
private final CachedSupplier<QueryBuilder> querySupplier;
private final CachedSupplier<AggregatorFactories.Builder> aggSupplier;
private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, List<String> types,
QueryBuilder query, AggregatorFactories.Builder aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
Map<String, Object> query, Map<String, Object> aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
Integer scrollSize, ChunkingConfig chunkingConfig, Map<String, String> headers,
DelayedDataCheckConfig delayedDataCheckConfig) {
this.id = id;
@@ -171,6 +237,8 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
this.chunkingConfig = chunkingConfig;
this.headers = Collections.unmodifiableMap(headers);
this.delayedDataCheckConfig = delayedDataCheckConfig;
this.querySupplier = new CachedSupplier<>(() -> lazyQueryParser.apply(query, id));
this.aggSupplier = new CachedSupplier<>(() -> lazyAggParser.apply(aggregations, id));
}
public DatafeedConfig(StreamInput in) throws IOException {
@@ -188,8 +256,17 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
} else {
this.types = null;
}
this.query = in.readNamedWriteable(QueryBuilder.class);
this.aggregations = in.readOptionalWriteable(AggregatorFactories.Builder::new);
if (in.getVersion().before(Version.CURRENT)) {
this.query = QUERY_TRANSFORMER.toMap(in.readNamedWriteable(QueryBuilder.class));
this.aggregations = AGG_TRANSFORMER.toMap(in.readOptionalWriteable(AggregatorFactories.Builder::new));
} else {
this.query = in.readMap();
if (in.readBoolean()) {
this.aggregations = in.readMap();
} else {
this.aggregations = null;
}
}
if (in.readBoolean()) {
this.scriptFields = Collections.unmodifiableList(in.readList(SearchSourceBuilder.ScriptField::new));
} else {
@@ -207,6 +284,8 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
} else {
delayedDataCheckConfig = DelayedDataCheckConfig.defaultDelayedDataCheckConfig();
}
this.querySupplier = new CachedSupplier<>(() -> lazyQueryParser.apply(query, id));
this.aggSupplier = new CachedSupplier<>(() -> lazyAggParser.apply(aggregations, id));
}
public String getId() {
@@ -237,11 +316,19 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
return scrollSize;
}
public QueryBuilder getQuery() {
public QueryBuilder getParsedQuery() {
return querySupplier.get();
}
public Map<String, Object> getQuery() {
return query;
}
public AggregatorFactories.Builder getAggregations() {
public AggregatorFactories.Builder getParsedAggregations() {
return aggSupplier.get();
}
public Map<String, Object> getAggregations() {
return aggregations;
}
@@ -249,14 +336,14 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
* Returns the histogram's interval as epoch millis.
*/
public long getHistogramIntervalMillis() {
return ExtractorUtils.getHistogramIntervalMillis(aggregations);
return ExtractorUtils.getHistogramIntervalMillis(getParsedAggregations());
}
/**
* @return {@code true} when there are non-empty aggregations, {@code false} otherwise
*/
public boolean hasAggregations() {
return aggregations != null && aggregations.count() > 0;
return aggregations != null && aggregations.size() > 0;
}
public List<SearchSourceBuilder.ScriptField> getScriptFields() {
@@ -293,8 +380,16 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
} else {
out.writeBoolean(false);
}
out.writeNamedWriteable(query);
out.writeOptionalWriteable(aggregations);
if (out.getVersion().before(Version.CURRENT)) {
out.writeNamedWriteable(getParsedQuery());
out.writeOptionalWriteable(getParsedAggregations());
} else {
out.writeMap(query);
out.writeBoolean(aggregations != null);
if (aggregations != null) {
out.writeMap(aggregations);
}
}
if (scriptFields != null) {
out.writeBoolean(true);
out.writeList(scriptFields);
@@ -454,15 +549,20 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
private TimeValue frequency;
private List<String> indices = Collections.emptyList();
private List<String> types = Collections.emptyList();
private QueryBuilder query = QueryBuilders.matchAllQuery();
private AggregatorFactories.Builder aggregations;
private Map<String, Object> query;
private Map<String, Object> aggregations;
private List<SearchSourceBuilder.ScriptField> scriptFields;
private Integer scrollSize = DEFAULT_SCROLL_SIZE;
private ChunkingConfig chunkingConfig;
private Map<String, String> headers = Collections.emptyMap();
private DelayedDataCheckConfig delayedDataCheckConfig = DelayedDataCheckConfig.defaultDelayedDataCheckConfig();
public Builder() {
try {
this.query = QUERY_TRANSFORMER.toMap(QueryBuilders.matchAllQuery());
} catch (IOException ex) { /*Should never happen*/ }
}
public Builder(String id, String jobId) {
@@ -517,11 +617,47 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
this.frequency = frequency;
}
public void setQuery(QueryBuilder query) {
public void setParsedQuery(QueryBuilder query) {
try {
setQuery(QUERY_TRANSFORMER.toMap(ExceptionsHelper.requireNonNull(query, QUERY.getPreferredName())));
} catch (IOException | XContentParseException exception) {
if (exception.getCause() instanceof IllegalArgumentException) {
// Certain exceptions wrap the real IllegalArgumentException, making it hard for the user to determine the cause
throw ExceptionsHelper.badRequestException(
Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT,
id,
exception.getCause().getMessage()),
exception.getCause());
} else {
throw ExceptionsHelper.badRequestException(
Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT, id, exception.getMessage()), exception);
}
}
}
void setQuery(Map<String, Object> query) {
this.query = ExceptionsHelper.requireNonNull(query, QUERY.getPreferredName());
}
public void setAggregations(AggregatorFactories.Builder aggregations) {
public void setParsedAggregations(AggregatorFactories.Builder aggregations) {
try {
setAggregations(AGG_TRANSFORMER.toMap(aggregations));
} catch (IOException | XContentParseException exception) {
// Certain exceptions wrap the real IllegalArgumentException, making it hard for the user to determine the cause
if (exception.getCause() instanceof IllegalArgumentException) {
throw ExceptionsHelper.badRequestException(
Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT,
id,
exception.getCause().getMessage()),
exception.getCause());
} else {
throw ExceptionsHelper.badRequestException(
Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, id, exception.getMessage()), exception);
}
}
}
void setAggregations(Map<String, Object> aggregations) {
this.aggregations = aggregations;
}
@@ -564,30 +700,22 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
throw invalidOptionValue(TYPES.getPreferredName(), types);
}
validateAggregations();
validateScriptFields();
setDefaultChunkingConfig();
setDefaultQueryDelay();
return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize,
chunkingConfig, headers, delayedDataCheckConfig);
}
void validateAggregations() {
void validateScriptFields() {
if (aggregations == null) {
return;
}
if (scriptFields != null && !scriptFields.isEmpty()) {
throw ExceptionsHelper.badRequestException(
Messages.getMessage(Messages.DATAFEED_CONFIG_CANNOT_USE_SCRIPT_FIELDS_WITH_AGGS));
}
Collection<AggregationBuilder> aggregatorFactories = aggregations.getAggregatorFactories();
if (aggregatorFactories.isEmpty()) {
throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM);
}
AggregationBuilder histogramAggregation = ExtractorUtils.getHistogramAggregation(aggregatorFactories);
checkNoMoreHistogramAggregations(histogramAggregation.getSubAggregations());
checkHistogramAggregationHasChildMaxTimeAgg(histogramAggregation);
checkHistogramIntervalIsPositive(histogramAggregation);
}
private static void checkNoMoreHistogramAggregations(Collection<AggregationBuilder> aggregations) {
@@ -630,7 +758,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
if (aggregations == null) {
chunkingConfig = ChunkingConfig.newAuto();
} else {
long histogramIntervalMillis = ExtractorUtils.getHistogramIntervalMillis(aggregations);
long histogramIntervalMillis = ExtractorUtils.getHistogramIntervalMillis(lazyAggParser.apply(aggregations, id));
chunkingConfig = ChunkingConfig.newManual(TimeValue.timeValueMillis(
DEFAULT_AGGREGATION_CHUNKING_BUCKETS * histogramIntervalMillis));
}
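
Taken together, the DatafeedConfig changes above mean that a syntactically well-formed but semantically invalid query no longer fails when read by the lenient parser; the failure is deferred to the first call that needs the parsed form. A hedged sketch of that behavior, mirroring the new tests further down (assumes the x-pack classes from this diff are on the classpath):

import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;

// Sketch: the lenient parser accepts a query using pre-6.x syntax and defers
// the failure to getParsedQuery(); the strict parser would have rejected the
// same JSON while parsing the "query" field itself.
public class LazyParseDemo {
    public static void main(String[] args) throws Exception {
        String json = "{\"datafeed_id\":\"df\",\"job_id\":\"j\",\"indices\":[\"idx\"],"
                + "\"query\":{\"match\":{\"query\":\"f\",\"type\":\"phrase\"}}}";
        try (XContentParser p = XContentFactory.xContent(XContentType.JSON).createParser(
                NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json)) {
            DatafeedConfig config = DatafeedConfig.LENIENT_PARSER.apply(p, null).build(); // builds fine
            config.getParsedQuery(); // throws a 400 bad-request exception here instead
        }
    }
}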

View File

@@ -295,10 +295,11 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
builder.setTypes(types);
}
if (query != null) {
builder.setQuery(query);
builder.setParsedQuery(query);
}
if (aggregations != null) {
builder.setAggregations(aggregations);
DatafeedConfig.validateAggregations(aggregations);
builder.setParsedAggregations(aggregations);
}
if (scriptFields != null) {
builder.setScriptFields(scriptFields);
@@ -371,9 +372,9 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
&& (queryDelay == null || Objects.equals(queryDelay, datafeed.getQueryDelay()))
&& (indices == null || Objects.equals(indices, datafeed.getIndices()))
&& (types == null || Objects.equals(types, datafeed.getTypes()))
&& (query == null || Objects.equals(query, datafeed.getQuery()))
&& (query == null || Objects.equals(query, datafeed.getParsedQuery()))
&& (scrollSize == null || Objects.equals(scrollSize, datafeed.getScrollSize()))
&& (aggregations == null || Objects.equals(aggregations, datafeed.getAggregations()))
&& (aggregations == null || Objects.equals(aggregations, datafeed.getParsedAggregations()))
&& (scriptFields == null || Objects.equals(scriptFields, datafeed.getScriptFields()))
&& (delayedDataCheckConfig == null || Objects.equals(delayedDataCheckConfig, datafeed.getDelayedDataCheckConfig()))
&& (chunkingConfig == null || Objects.equals(chunkingConfig, datafeed.getChunkingConfig()));

View File

@@ -26,6 +26,8 @@ public final class Messages {
"delayed_data_check_config: check_window [{0}] must be greater than the bucket_span [{1}]";
public static final String DATAFEED_CONFIG_DELAYED_DATA_CHECK_SPANS_TOO_MANY_BUCKETS =
"delayed_data_check_config: check_window [{0}] must be less than 10,000x the bucket_span [{1}]";
public static final String DATAFEED_CONFIG_QUERY_BAD_FORMAT = "Datafeed [{0}] query is not parsable: {1}";
public static final String DATAFEED_CONFIG_AGG_BAD_FORMAT = "Datafeed [{0}] aggregations are not parsable: {1}";
public static final String DATAFEED_DOES_NOT_SUPPORT_JOB_WITH_LATENCY = "A job configured with datafeed cannot support latency";
public static final String DATAFEED_NOT_FOUND = "No datafeed with id [{0}] exists";
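
The two new constants are java.text.MessageFormat patterns, so argument order matters: {0} is the datafeed id and {1} the underlying parse error, which is why the lambdas in DatafeedConfig pass the id first. A standalone check of the expected rendering (plain MessageFormat; Messages.getMessage is assumed to format the same way):

import java.text.MessageFormat;

// {0} = datafeed id, {1} = reason; swapping the arguments would garble the
// user-facing message.
public class MessageOrderCheck {
    public static void main(String[] args) {
        String pattern = "Datafeed [{0}] aggregations are not parsable: {1}";
        System.out.println(MessageFormat.format(pattern,
                "farequote-datafeed", "[size] must be greater than 0. Found [0] in [airline]"));
        // Datafeed [farequote-datafeed] aggregations are not parsable:
        //   [size] must be greater than 0. Found [0] in [airline]
    }
}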

View File

@@ -61,6 +61,9 @@ public class XContentObjectTransformer<T extends ToXContentObject> {
}
public T fromMap(Map<String, Object> stringObjectMap) throws IOException {
if (stringObjectMap == null) {
return null;
}
LoggingDeprecationAccumulationHandler deprecationLogger = new LoggingDeprecationAccumulationHandler();
try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().map(stringObjectMap);
XContentParser parser = XContentType.JSON
@@ -74,6 +77,9 @@ public class XContentObjectTransformer<T extends ToXContentObject> {
}
public Map<String, Object> toMap(T object) throws IOException {
if (object == null) {
return null;
}
try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) {
XContentBuilder content = object.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
return XContentHelper.convertToMap(BytesReference.bytes(content), true, XContentType.JSON).v2();
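
The two null guards above make the transformer tolerant in both directions, which the lazy suppliers rely on when a datafeed has no aggregations. A small usage sketch (again assuming these x-pack classes on the classpath):

import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.xpack.core.ml.utils.XContentObjectTransformer;

// Usage sketch of the null-tolerant contract: both directions now return
// null for null input instead of throwing.
public class TransformerNullContract {
    public static void main(String[] args) throws java.io.IOException {
        XContentObjectTransformer<QueryBuilder> t = XContentObjectTransformer.queryBuilderTransformer();
        System.out.println(t.fromMap(null)); // null, no parse attempted
        System.out.println(t.toMap(null));   // null, nothing serialized
    }
}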

View File

@@ -67,7 +67,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
builder.setIndices(randomStringList(1, 10));
builder.setTypes(randomStringList(0, 10));
if (randomBoolean()) {
builder.setQuery(QueryBuilders.termQuery(randomAlphaOfLength(10), randomAlphaOfLength(10)));
builder.setParsedQuery(QueryBuilders.termQuery(randomAlphaOfLength(10), randomAlphaOfLength(10)));
}
boolean addScriptFields = randomBoolean();
if (addScriptFields) {
@@ -91,7 +91,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
aggs.addAggregator(AggregationBuilders.dateHistogram("buckets")
.interval(aggHistogramInterval).subAggregation(maxTime).field("time"));
builder.setAggregations(aggs);
builder.setParsedAggregations(aggs);
}
if (randomBoolean()) {
builder.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE));
@@ -155,6 +155,43 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
" \"scroll_size\": 1234\n" +
"}";
private static final String ANACHRONISTIC_QUERY_DATAFEED = "{\n" +
" \"datafeed_id\": \"farequote-datafeed\",\n" +
" \"job_id\": \"farequote\",\n" +
" \"frequency\": \"1h\",\n" +
" \"indices\": [\"farequote1\", \"farequote2\"],\n" +
//query:match:type stopped being supported in 6.x
" \"query\": {\"match\" : {\"query\":\"fieldName\", \"type\": \"phrase\"}},\n" +
" \"scroll_size\": 1234\n" +
"}";
private static final String ANACHRONISTIC_AGG_DATAFEED = "{\n" +
" \"datafeed_id\": \"farequote-datafeed\",\n" +
" \"job_id\": \"farequote\",\n" +
" \"frequency\": \"1h\",\n" +
" \"indices\": [\"farequote1\", \"farequote2\"],\n" +
" \"aggregations\": {\n" +
" \"buckets\": {\n" +
" \"date_histogram\": {\n" +
" \"field\": \"time\",\n" +
" \"interval\": \"360s\",\n" +
" \"time_zone\": \"UTC\"\n" +
" },\n" +
" \"aggregations\": {\n" +
" \"time\": {\n" +
" \"max\": {\"field\": \"time\"}\n" +
" },\n" +
" \"airline\": {\n" +
" \"terms\": {\n" +
" \"field\": \"airline\",\n" +
" \"size\": 0\n" + //size: 0 stopped being supported in 6.x
" }\n" +
" }\n" +
" }\n" +
" }\n" +
" }\n" +
"}";
public void testFutureConfigParse() throws IOException {
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, FUTURE_DATAFEED);
@@ -163,6 +200,44 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
assertEquals("[6:5] [datafeed_config] unknown field [tomorrows_technology_today], parser not found", e.getMessage());
}
public void testPastQueryConfigParse() throws IOException {
try(XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, ANACHRONISTIC_QUERY_DATAFEED)) {
DatafeedConfig config = DatafeedConfig.LENIENT_PARSER.apply(parser, null).build();
ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> config.getParsedQuery());
assertEquals("[match] query doesn't support multiple fields, found [query] and [type]", e.getMessage());
}
try(XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, ANACHRONISTIC_QUERY_DATAFEED)) {
XContentParseException e = expectThrows(XContentParseException.class,
() -> DatafeedConfig.STRICT_PARSER.apply(parser, null).build());
assertEquals("[6:25] [datafeed_config] failed to parse field [query]", e.getMessage());
}
}
public void testPastAggConfigParse() throws IOException {
try(XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, ANACHRONISTIC_AGG_DATAFEED)) {
DatafeedConfig.Builder configBuilder = DatafeedConfig.LENIENT_PARSER.apply(parser, null);
ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> configBuilder.build());
assertEquals(
"Datafeed [farequote-datafeed] aggregations are not parsable: [size] must be greater than 0. Found [0] in [airline]",
e.getMessage());
}
try(XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, ANACHRONISTIC_AGG_DATAFEED)) {
XContentParseException e = expectThrows(XContentParseException.class,
() -> DatafeedConfig.STRICT_PARSER.apply(parser, null).build());
assertEquals("[8:25] [datafeed_config] failed to parse field [aggregations]", e.getMessage());
}
}
public void testFutureMetadataParse() throws IOException {
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, FUTURE_DATAFEED);
@@ -274,7 +349,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
datafeed.setTypes(Collections.singletonList("my_type"));
datafeed.setScriptFields(Collections.singletonList(new SearchSourceBuilder.ScriptField(randomAlphaOfLength(10),
mockScript(randomAlphaOfLength(10)), randomBoolean())));
datafeed.setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("foo")));
datafeed.setParsedAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("foo")));
ElasticsearchException e = expectThrows(ElasticsearchException.class, datafeed::build);
@@ -295,7 +370,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
builder.setIndices(Collections.singletonList("myIndex"));
builder.setTypes(Collections.singletonList("myType"));
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
builder.setAggregations(new AggregatorFactories.Builder().addAggregator(
builder.setParsedAggregations(new AggregatorFactories.Builder().addAggregator(
AggregationBuilders.dateHistogram("time").interval(300000).subAggregation(maxTime).field("time")));
DatafeedConfig datafeedConfig = builder.build();
@@ -306,7 +381,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1");
builder.setIndices(Collections.singletonList("myIndex"));
builder.setTypes(Collections.singletonList("myType"));
builder.setAggregations(new AggregatorFactories.Builder());
builder.setParsedAggregations(new AggregatorFactories.Builder());
ElasticsearchException e = expectThrows(ElasticsearchException.class, builder::build);
@@ -318,13 +393,13 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
builder.setIndices(Collections.singletonList("myIndex"));
builder.setTypes(Collections.singletonList("myType"));
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
builder.setAggregations(new AggregatorFactories.Builder().addAggregator(
builder.setParsedAggregations(new AggregatorFactories.Builder().addAggregator(
AggregationBuilders.histogram("time").subAggregation(maxTime).field("time"))
);
ElasticsearchException e = expectThrows(ElasticsearchException.class, builder::build);
assertThat(e.getMessage(), equalTo("Aggregation interval must be greater than 0"));
assertThat(e.getMessage(), containsString("[interval] must be >0 for histogram aggregation [time]"));
}
public void testBuild_GivenDateHistogramWithInvalidTimeZone() {
@@ -341,7 +416,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> createDatafeedWithDateHistogram((String) null));
assertThat(e.getMessage(), equalTo("Aggregation interval must be greater than 0"));
assertThat(e.getMessage(), containsString("Aggregation interval must be greater than 0"));
}
public void testBuild_GivenValidDateHistogram() {
@@ -402,9 +477,8 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
TermsAggregationBuilder toplevelTerms = AggregationBuilders.terms("top_level");
toplevelTerms.subAggregation(dateHistogram);
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("foo", "bar");
builder.setAggregations(new AggregatorFactories.Builder().addAggregator(toplevelTerms));
ElasticsearchException e = expectThrows(ElasticsearchException.class, builder::validateAggregations);
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> DatafeedConfig.validateAggregations(new AggregatorFactories.Builder().addAggregator(toplevelTerms)));
assertEquals("Aggregations can only have 1 date_histogram or histogram aggregation", e.getMessage());
}
@@ -520,7 +594,9 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1");
builder.setIndices(Collections.singletonList("myIndex"));
builder.setTypes(Collections.singletonList("myType"));
builder.setAggregations(new AggregatorFactories.Builder().addAggregator(dateHistogram));
AggregatorFactories.Builder aggs = new AggregatorFactories.Builder().addAggregator(dateHistogram);
DatafeedConfig.validateAggregations(aggs);
builder.setParsedAggregations(aggs);
return builder.build();
}
@@ -556,11 +632,11 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
break;
case 6:
BoolQueryBuilder query = new BoolQueryBuilder();
if (instance.getQuery() != null) {
query.must(instance.getQuery());
if (instance.getParsedQuery() != null) {
query.must(instance.getParsedQuery());
}
query.filter(new TermQueryBuilder(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10)));
builder.setQuery(query);
builder.setParsedQuery(query);
break;
case 7:
if (instance.hasAggregations()) {
@@ -571,7 +647,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
aggBuilder
.addAggregator(new DateHistogramAggregationBuilder(timeField).field(timeField).interval(between(10000, 3600000))
.subAggregation(new MaxAggregationBuilder(timeField).field(timeField)));
builder.setAggregations(aggBuilder);
builder.setParsedAggregations(aggBuilder);
if (instance.getScriptFields().isEmpty() == false) {
builder.setScriptFields(Collections.emptyList());
}

View File

@@ -167,7 +167,7 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
assertThat(updatedDatafeed.getTypes(), equalTo(Collections.singletonList("t_2")));
assertThat(updatedDatafeed.getQueryDelay(), equalTo(TimeValue.timeValueSeconds(42)));
assertThat(updatedDatafeed.getFrequency(), equalTo(TimeValue.timeValueSeconds(142)));
assertThat(updatedDatafeed.getQuery(), equalTo(QueryBuilders.termQuery("a", "b")));
assertThat(updatedDatafeed.getParsedQuery(), equalTo(QueryBuilders.termQuery("a", "b")));
assertThat(updatedDatafeed.hasAggregations(), is(false));
assertThat(updatedDatafeed.getScriptFields(),
equalTo(Collections.singletonList(new SearchSourceBuilder.ScriptField("a", mockScript("b"), false))));
@@ -192,7 +192,7 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
assertThat(updatedDatafeed.getIndices(), equalTo(Collections.singletonList("i_1")));
assertThat(updatedDatafeed.getTypes(), equalTo(Collections.singletonList("t_1")));
assertThat(updatedDatafeed.getAggregations(),
assertThat(updatedDatafeed.getParsedAggregations(),
equalTo(new AggregatorFactories.Builder().addAggregator(
AggregationBuilders.histogram("a").interval(300000).field("time").subAggregation(maxTime))));
}

View File

@@ -153,13 +153,13 @@ public class DelayedDataDetectorIT extends MlNativeAutodetectIntegTestCase {
DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilder(job.getId() + "-datafeed",
job.getId(),
Collections.singletonList(index));
datafeedConfigBuilder.setAggregations(new AggregatorFactories.Builder().addAggregator(
datafeedConfigBuilder.setParsedAggregations(new AggregatorFactories.Builder().addAggregator(
AggregationBuilders.histogram("time")
.subAggregation(maxTime)
.subAggregation(avgAggregationBuilder)
.field("time")
.interval(TimeValue.timeValueMinutes(5).millis())));
datafeedConfigBuilder.setQuery(new RangeQueryBuilder("value").gte(numDocs/2));
datafeedConfigBuilder.setParsedQuery(new RangeQueryBuilder("value").gte(numDocs/2));
datafeedConfigBuilder.setFrequency(TimeValue.timeValueMinutes(5));
datafeedConfigBuilder.setDelayedDataCheckConfig(DelayedDataCheckConfig.enabledDelayedDataCheckConfig(TimeValue.timeValueHours(12)));

View File

@@ -33,6 +33,7 @@ import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.rollup.action.GetRollupIndexCapsAction;
import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction;
import org.elasticsearch.xpack.core.security.SecurityContext;
@@ -154,6 +155,7 @@ public class TransportPutDatafeedAction extends TransportMasterNodeAction<PutDat
private void putDatafeed(PutDatafeedAction.Request request, Map<String, String> headers,
ActionListener<PutDatafeedAction.Response> listener) {
DatafeedConfig.validateAggregations(request.getDatafeed().getParsedAggregations());
clusterService.submitStateUpdateTask(
"put-datafeed-" + request.getDatafeed().getId(),
new AckedClusterStateUpdateTask<PutDatafeedAction.Response>(request, listener) {

View File

@@ -90,6 +90,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
throw ExceptionsHelper.missingJobException(datafeed.getJobId());
}
DatafeedJobValidator.validate(datafeed, job);
DatafeedConfig.validateAggregations(datafeed.getParsedAggregations());
JobState jobState = MlTasks.getJobState(datafeed.getJobId(), tasks);
if (jobState.isAnyOf(JobState.OPENING, JobState.OPENED) == false) {
throw ExceptionsHelper.conflictStatusException("cannot start datafeed [" + datafeedId + "] because job [" + job.getId() +

View File

@@ -44,7 +44,7 @@ public class DelayedDataDetectorFactory {
window,
job.getId(),
job.getDataDescription().getTimeField(),
datafeedConfig.getQuery(),
datafeedConfig.getParsedQuery(),
datafeedConfig.getIndices().toArray(new String[0]),
client);
} else {

View File

@@ -35,8 +35,8 @@ public class AggregationDataExtractorFactory implements DataExtractorFactory {
job.getAnalysisConfig().analysisFields(),
datafeedConfig.getIndices(),
datafeedConfig.getTypes(),
datafeedConfig.getQuery(),
datafeedConfig.getAggregations(),
datafeedConfig.getParsedQuery(),
datafeedConfig.getParsedAggregations(),
Intervals.alignToCeil(start, histogramInterval),
Intervals.alignToFloor(end, histogramInterval),
job.getAnalysisConfig().getSummaryCountFieldName().equals(DatafeedConfig.DOC_COUNT),

View File

@@ -57,8 +57,8 @@ public class RollupDataExtractorFactory implements DataExtractorFactory {
job.getAnalysisConfig().analysisFields(),
datafeedConfig.getIndices(),
datafeedConfig.getTypes(),
datafeedConfig.getQuery(),
datafeedConfig.getAggregations(),
datafeedConfig.getParsedQuery(),
datafeedConfig.getParsedAggregations(),
Intervals.alignToCeil(start, histogramInterval),
Intervals.alignToFloor(end, histogramInterval),
job.getAnalysisConfig().getSummaryCountFieldName().equals(DatafeedConfig.DOC_COUNT),
@@ -73,7 +73,7 @@ public class RollupDataExtractorFactory implements DataExtractorFactory {
ActionListener<DataExtractorFactory> listener) {
final AggregationBuilder datafeedHistogramAggregation = getHistogramAggregation(
datafeed.getAggregations().getAggregatorFactories());
datafeed.getParsedAggregations().getAggregatorFactories());
if ((datafeedHistogramAggregation instanceof DateHistogramAggregationBuilder) == false) {
listener.onFailure(
new IllegalArgumentException("Rollup requires that the datafeed configuration use a [date_histogram] aggregation," +
@@ -104,7 +104,7 @@ public class RollupDataExtractorFactory implements DataExtractorFactory {
return;
}
final List<ValuesSourceAggregationBuilder> flattenedAggs = new ArrayList<>();
flattenAggregations(datafeed.getAggregations().getAggregatorFactories(), datafeedHistogramAggregation, flattenedAggs);
flattenAggregations(datafeed.getParsedAggregations().getAggregatorFactories(), datafeedHistogramAggregation, flattenedAggs);
if (validIntervalCaps.stream().noneMatch(rollupJobConfig -> hasAggregations(rollupJobConfig, flattenedAggs))) {
listener.onFailure(

View File

@@ -36,7 +36,7 @@ public class ChunkedDataExtractorFactory implements DataExtractorFactory {
job.getDataDescription().getTimeField(),
datafeedConfig.getIndices(),
datafeedConfig.getTypes(),
datafeedConfig.getQuery(),
datafeedConfig.getParsedQuery(),
datafeedConfig.getScrollSize(),
timeAligner.alignToCeil(start),
timeAligner.alignToFloor(end),

View File

@@ -44,7 +44,7 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory {
extractedFields,
datafeedConfig.getIndices(),
datafeedConfig.getTypes(),
datafeedConfig.getQuery(),
datafeedConfig.getParsedQuery(),
datafeedConfig.getScriptFields(),
datafeedConfig.getScrollSize(),
start,

View File

@@ -81,7 +81,7 @@ public class TransportPreviewDatafeedActionTests extends ESTestCase {
DatafeedConfig.Builder datafeed = new DatafeedConfig.Builder("no_aggs_feed", "job_foo");
datafeed.setIndices(Collections.singletonList("my_index"));
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
datafeed.setAggregations(AggregatorFactories.builder().addAggregator(
datafeed.setParsedAggregations(AggregatorFactories.builder().addAggregator(
AggregationBuilders.histogram("time").interval(300000).subAggregation(maxTime).field("time")));
datafeed.setChunkingConfig(ChunkingConfig.newManual(TimeValue.timeValueHours(1)));

View File

@@ -222,7 +222,7 @@ public class DatafeedJobValidatorTests extends ESTestCase {
HistogramAggregationBuilder histogram =
AggregationBuilders.histogram("time").interval(interval).field("time").subAggregation(maxTime);
DatafeedConfig.Builder datafeedConfig = createValidDatafeedConfig();
datafeedConfig.setAggregations(new AggregatorFactories.Builder().addAggregator(histogram));
datafeedConfig.setParsedAggregations(new AggregatorFactories.Builder().addAggregator(histogram));
return datafeedConfig;
}

View File

@@ -143,7 +143,7 @@ public class DataExtractorFactoryTests extends ESTestCase {
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo");
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(
datafeedConfig.setParsedAggregations(AggregatorFactories.builder().addAggregator(
AggregationBuilders.histogram("time").interval(300000).subAggregation(maxTime).field("time")));
ActionListener<DataExtractorFactory> listener = ActionListener.wrap(
@@ -162,7 +162,7 @@ public class DataExtractorFactoryTests extends ESTestCase {
DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setChunkingConfig(ChunkingConfig.newOff());
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(
datafeedConfig.setParsedAggregations(AggregatorFactories.builder().addAggregator(
AggregationBuilders.histogram("time").interval(300000).subAggregation(maxTime).field("time")));
ActionListener<DataExtractorFactory> listener = ActionListener.wrap(
@@ -180,7 +180,7 @@ public class DataExtractorFactoryTests extends ESTestCase {
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo");
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(
datafeedConfig.setParsedAggregations(AggregatorFactories.builder().addAggregator(
AggregationBuilders.histogram("time").interval(300000).subAggregation(maxTime).field("time")));
datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto());
@@ -203,7 +203,7 @@ public class DataExtractorFactoryTests extends ESTestCase {
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
MaxAggregationBuilder myField = AggregationBuilders.max("myField").field("myField");
TermsAggregationBuilder myTerm = AggregationBuilders.terms("termAgg").field("termField").subAggregation(myField);
datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(
datafeedConfig.setParsedAggregations(AggregatorFactories.builder().addAggregator(
AggregationBuilders.dateHistogram("time").interval(600_000).subAggregation(maxTime).subAggregation(myTerm).field("time")));
ActionListener<DataExtractorFactory> listener = ActionListener.wrap(
dataExtractorFactory -> assertThat(dataExtractorFactory, instanceOf(RollupDataExtractorFactory.class)),
@@ -223,7 +223,7 @@ public class DataExtractorFactoryTests extends ESTestCase {
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
MaxAggregationBuilder myField = AggregationBuilders.max("myField").field("myField");
TermsAggregationBuilder myTerm = AggregationBuilders.terms("termAgg").field("termField").subAggregation(myField);
datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(
datafeedConfig.setParsedAggregations(AggregatorFactories.builder().addAggregator(
AggregationBuilders.dateHistogram("time").interval(600_000).subAggregation(maxTime).subAggregation(myTerm).field("time")));
ActionListener<DataExtractorFactory> listener = ActionListener.wrap(
dataExtractorFactory -> assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class)),
@@ -263,7 +263,7 @@ public class DataExtractorFactoryTests extends ESTestCase {
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
MaxAggregationBuilder myField = AggregationBuilders.max("myField").field("myField");
TermsAggregationBuilder myTerm = AggregationBuilders.terms("termAgg").field("termField").subAggregation(myField);
datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(
datafeedConfig.setParsedAggregations(AggregatorFactories.builder().addAggregator(
AggregationBuilders.dateHistogram("time").interval(600_000).subAggregation(maxTime).subAggregation(myTerm).field("time")));
ActionListener<DataExtractorFactory> listener = ActionListener.wrap(
dataExtractorFactory -> fail(),
@@ -288,7 +288,7 @@ public class DataExtractorFactoryTests extends ESTestCase {
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
MaxAggregationBuilder myField = AggregationBuilders.max("myField").field("myField");
TermsAggregationBuilder myTerm = AggregationBuilders.terms("termAgg").field("termField").subAggregation(myField);
datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(
datafeedConfig.setParsedAggregations(AggregatorFactories.builder().addAggregator(
AggregationBuilders.dateHistogram("time").interval(600_000).subAggregation(maxTime).subAggregation(myTerm).field("time")));
ActionListener<DataExtractorFactory> listener = ActionListener.wrap(
dataExtractorFactory -> fail(),
@@ -312,7 +312,7 @@ public class DataExtractorFactoryTests extends ESTestCase {
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
MaxAggregationBuilder myField = AggregationBuilders.max("myField").field("otherField");
TermsAggregationBuilder myTerm = AggregationBuilders.terms("termAgg").field("termField").subAggregation(myField);
datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(
datafeedConfig.setParsedAggregations(AggregatorFactories.builder().addAggregator(
AggregationBuilders.dateHistogram("time").interval(600_000).subAggregation(maxTime).subAggregation(myTerm).field("time")));
ActionListener<DataExtractorFactory> listener = ActionListener.wrap(
dataExtractorFactory -> fail(),

View File

@@ -64,8 +64,8 @@ public class AggregationDataExtractorFactoryTests extends ESTestCase {
jobBuilder.setDataDescription(dataDescription);
jobBuilder.setAnalysisConfig(analysisConfig);
DatafeedConfig.Builder datafeedConfigBuilder = new DatafeedConfig.Builder("foo-feed", jobBuilder.getId());
datafeedConfigBuilder.setAggregations(aggs);
datafeedConfigBuilder.setParsedAggregations(aggs);
datafeedConfigBuilder.setIndices(Arrays.asList("my_index"));
return new AggregationDataExtractorFactory(client, datafeedConfigBuilder.build(), jobBuilder.build(new Date()));
}
}

View File

@@ -91,8 +91,8 @@ public class ChunkedDataExtractorFactoryTests extends ESTestCase {
jobBuilder.setDataDescription(dataDescription);
jobBuilder.setAnalysisConfig(analysisConfig);
DatafeedConfig.Builder datafeedConfigBuilder = new DatafeedConfig.Builder("foo-feed", jobBuilder.getId());
datafeedConfigBuilder.setAggregations(aggs);
datafeedConfigBuilder.setParsedAggregations(aggs);
datafeedConfigBuilder.setIndices(Arrays.asList("my_index"));
return new ChunkedDataExtractorFactory(client, datafeedConfigBuilder.build(), jobBuilder.build(new Date()), dataExtractorFactory);
}
}

View File

@@ -98,7 +98,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
HistogramAggregationBuilder histogramAggregation = AggregationBuilders.histogram("time").interval(60000)
.subAggregation(maxAggregation).field("time");
configBuilder.setAggregations(AggregatorFactories.builder().addAggregator(histogramAggregation));
configBuilder.setParsedAggregations(AggregatorFactories.builder().addAggregator(histogramAggregation));
configBuilder.setFrequency(TimeValue.timeValueMinutes(2));
DatafeedConfig config = configBuilder.build();
PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(config);