ML refactor DatafeedsConfig(Update) so defaults are not populated in queries or aggs (#38822) (#39119)
* ML refactor DatafeedsConfig(Update) so defaults are not populated in queries or aggs * Addressing pr feedback
This commit is contained in:
parent
80540a2fcc
commit
109b6451fd
|
@ -5,6 +5,8 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.core.ml.datafeed;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.AbstractDiffable;
|
||||
|
@ -18,11 +20,9 @@ import org.elasticsearch.common.util.CachedSupplier;
|
|||
import org.elasticsearch.common.xcontent.ObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParseException;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.index.query.AbstractQueryBuilder;
|
||||
import org.elasticsearch.index.query.MatchAllQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.aggregations.AggregationBuilder;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactories;
|
||||
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
|
||||
|
@ -71,19 +71,12 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
|
|||
(objectMap, id, warnings) -> {
|
||||
try {
|
||||
return QUERY_TRANSFORMER.fromMap(objectMap, warnings);
|
||||
} catch (IOException | XContentParseException exception) {
|
||||
} catch (Exception exception) {
|
||||
// Certain thrown exceptions wrap up the real Illegal argument making it hard to determine cause for the user
|
||||
if (exception.getCause() instanceof IllegalArgumentException) {
|
||||
throw ExceptionsHelper.badRequestException(
|
||||
Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT,
|
||||
id,
|
||||
exception.getCause().getMessage()),
|
||||
exception.getCause());
|
||||
} else {
|
||||
throw ExceptionsHelper.badRequestException(
|
||||
Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT, exception, id),
|
||||
exception);
|
||||
exception = (Exception)exception.getCause();
|
||||
}
|
||||
throw ExceptionsHelper.badRequestException(Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT, id), exception);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -92,22 +85,17 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
|
|||
(objectMap, id, warnings) -> {
|
||||
try {
|
||||
return AGG_TRANSFORMER.fromMap(objectMap, warnings);
|
||||
} catch (IOException | XContentParseException exception) {
|
||||
} catch (Exception exception) {
|
||||
// Certain thrown exceptions wrap up the real Illegal argument making it hard to determine cause for the user
|
||||
if (exception.getCause() instanceof IllegalArgumentException) {
|
||||
throw ExceptionsHelper.badRequestException(
|
||||
Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT,
|
||||
id,
|
||||
exception.getCause().getMessage()),
|
||||
exception.getCause());
|
||||
} else {
|
||||
throw ExceptionsHelper.badRequestException(
|
||||
Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, exception.getMessage(), id),
|
||||
exception);
|
||||
exception = (Exception)exception.getCause();
|
||||
}
|
||||
throw ExceptionsHelper.badRequestException(Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, id), exception);
|
||||
}
|
||||
};
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(DatafeedConfig.class);
|
||||
|
||||
// Used for QueryPage
|
||||
public static final ParseField RESULTS_FIELD = new ParseField("datafeeds");
|
||||
public static String TYPE = "datafeed";
|
||||
|
@ -164,15 +152,11 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
|
|||
builder.setQueryDelay(TimeValue.parseTimeValue(val, QUERY_DELAY.getPreferredName())), QUERY_DELAY);
|
||||
parser.declareString((builder, val) ->
|
||||
builder.setFrequency(TimeValue.parseTimeValue(val, FREQUENCY.getPreferredName())), FREQUENCY);
|
||||
if (ignoreUnknownFields) {
|
||||
parser.declareObject(Builder::setQuery, (p, c) -> p.mapOrdered(), QUERY);
|
||||
parser.declareObject(Builder::setAggregations, (p, c) -> p.mapOrdered(), AGGREGATIONS);
|
||||
parser.declareObject(Builder::setAggregations, (p, c) -> p.mapOrdered(), AGGS);
|
||||
} else {
|
||||
parser.declareObject(Builder::setParsedQuery, (p, c) -> AbstractQueryBuilder.parseInnerQueryBuilder(p), QUERY);
|
||||
parser.declareObject(Builder::setParsedAggregations, (p, c) -> AggregatorFactories.parseAggregators(p), AGGREGATIONS);
|
||||
parser.declareObject(Builder::setParsedAggregations, (p, c) -> AggregatorFactories.parseAggregators(p), AGGS);
|
||||
}
|
||||
parser.declareObject((builder, val) -> builder.setQuery(val, ignoreUnknownFields), (p, c) -> p.mapOrdered(), QUERY);
|
||||
parser.declareObject((builder, val) -> builder.setAggregationsSafe(val, ignoreUnknownFields), (p, c) -> p.mapOrdered(),
|
||||
AGGREGATIONS);
|
||||
parser.declareObject((builder, val) -> builder.setAggregationsSafe(val, ignoreUnknownFields), (p, c) -> p.mapOrdered(),
|
||||
AGGS);
|
||||
parser.declareObject(Builder::setScriptFields, (p, c) -> {
|
||||
List<SearchSourceBuilder.ScriptField> parsedScriptFields = new ArrayList<>();
|
||||
while (p.nextToken() != XContentParser.Token.END_OBJECT) {
|
||||
|
@ -582,7 +566,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
|
|||
private TimeValue queryDelay;
|
||||
private TimeValue frequency;
|
||||
private List<String> indices = Collections.emptyList();
|
||||
private Map<String, Object> query;
|
||||
private Map<String, Object> query = Collections.singletonMap(MatchAllQueryBuilder.NAME, Collections.emptyMap());
|
||||
private Map<String, Object> aggregations;
|
||||
private List<SearchSourceBuilder.ScriptField> scriptFields;
|
||||
private Integer scrollSize = DEFAULT_SCROLL_SIZE;
|
||||
|
@ -590,11 +574,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
|
|||
private Map<String, String> headers = Collections.emptyMap();
|
||||
private DelayedDataCheckConfig delayedDataCheckConfig = DelayedDataCheckConfig.defaultDelayedDataCheckConfig();
|
||||
|
||||
public Builder() {
|
||||
try {
|
||||
this.query = QUERY_TRANSFORMER.toMap(QueryBuilders.matchAllQuery());
|
||||
} catch (IOException ex) { /*Should never happen*/ }
|
||||
}
|
||||
public Builder() { }
|
||||
|
||||
public Builder(String id, String jobId) {
|
||||
this();
|
||||
|
@ -647,48 +627,74 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
|
|||
this.frequency = frequency;
|
||||
}
|
||||
|
||||
public void setParsedQuery(QueryBuilder query) {
|
||||
public void setQuery(Map<String, Object> query) {
|
||||
setQuery(query, true);
|
||||
}
|
||||
|
||||
public void setQuery(Map<String, Object> query, boolean lenient) {
|
||||
this.query = ExceptionsHelper.requireNonNull(query, QUERY.getPreferredName());
|
||||
try {
|
||||
setQuery(QUERY_TRANSFORMER.toMap(ExceptionsHelper.requireNonNull(query, QUERY.getPreferredName())));
|
||||
} catch (IOException | XContentParseException exception) {
|
||||
if (exception.getCause() instanceof IllegalArgumentException) {
|
||||
// Certain thrown exceptions wrap up the real Illegal argument making it hard to determine cause for the user
|
||||
throw ExceptionsHelper.badRequestException(
|
||||
Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT,
|
||||
id,
|
||||
exception.getCause().getMessage()),
|
||||
exception.getCause());
|
||||
QUERY_TRANSFORMER.fromMap(query);
|
||||
} catch(Exception ex) {
|
||||
String msg = Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT, id);
|
||||
|
||||
if (ex.getCause() instanceof IllegalArgumentException) {
|
||||
ex = (Exception)ex.getCause();
|
||||
}
|
||||
|
||||
if (lenient) {
|
||||
logger.warn(msg, ex);
|
||||
} else {
|
||||
throw ExceptionsHelper.badRequestException(
|
||||
Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT, id, exception.getMessage()), exception);
|
||||
throw ExceptionsHelper.badRequestException(msg, ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void setQuery(Map<String, Object> query) {
|
||||
this.query = ExceptionsHelper.requireNonNull(query, QUERY.getPreferredName());
|
||||
}
|
||||
|
||||
// Kept for easier testing
|
||||
public void setParsedAggregations(AggregatorFactories.Builder aggregations) {
|
||||
try {
|
||||
setAggregations(AGG_TRANSFORMER.toMap(aggregations));
|
||||
} catch (IOException | XContentParseException exception) {
|
||||
} catch (Exception exception) {
|
||||
// Certain thrown exceptions wrap up the real Illegal argument making it hard to determine cause for the user
|
||||
if (exception.getCause() instanceof IllegalArgumentException) {
|
||||
throw ExceptionsHelper.badRequestException(
|
||||
Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT,
|
||||
id,
|
||||
exception.getCause().getMessage()),
|
||||
exception.getCause());
|
||||
} else {
|
||||
throw ExceptionsHelper.badRequestException(
|
||||
Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, id, exception.getMessage()), exception);
|
||||
exception = (Exception)exception.getCause();
|
||||
}
|
||||
throw ExceptionsHelper.badRequestException(
|
||||
Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, id), exception);
|
||||
}
|
||||
}
|
||||
|
||||
private void setAggregationsSafe(Map<String, Object> aggregations, boolean lenient) {
|
||||
if (this.aggregations != null) {
|
||||
throw ExceptionsHelper.badRequestException("Found two aggregation definitions: [aggs] and [aggregations]");
|
||||
}
|
||||
setAggregations(aggregations, lenient);
|
||||
}
|
||||
|
||||
void setAggregations(Map<String, Object> aggregations) {
|
||||
setAggregations(aggregations, true);
|
||||
}
|
||||
|
||||
void setAggregations(Map<String, Object> aggregations, boolean lenient) {
|
||||
this.aggregations = aggregations;
|
||||
try {
|
||||
if (aggregations != null && aggregations.isEmpty()) {
|
||||
throw new Exception("[aggregations] are empty");
|
||||
}
|
||||
AGG_TRANSFORMER.fromMap(aggregations);
|
||||
} catch (Exception ex) {
|
||||
String msg = Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, id);
|
||||
|
||||
if (ex.getCause() instanceof IllegalArgumentException) {
|
||||
ex = (Exception)ex.getCause();
|
||||
}
|
||||
|
||||
if (lenient) {
|
||||
logger.warn(msg, ex);
|
||||
} else {
|
||||
throw ExceptionsHelper.badRequestException(msg, ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void setScriptFields(List<SearchSourceBuilder.ScriptField> scriptFields) {
|
||||
|
|
|
@ -16,13 +16,12 @@ import org.elasticsearch.common.xcontent.ObjectParser;
|
|||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.index.query.AbstractQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactories;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -34,6 +33,11 @@ import java.util.Map;
|
|||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig.AGG_TRANSFORMER;
|
||||
import static org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig.QUERY_TRANSFORMER;
|
||||
import static org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig.lazyAggParser;
|
||||
import static org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig.lazyQueryParser;
|
||||
|
||||
/**
|
||||
* A datafeed update contains partial properties to update a {@link DatafeedConfig}.
|
||||
* The main difference between this class and {@link DatafeedConfig} is that here all
|
||||
|
@ -52,12 +56,9 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
|
|||
TimeValue.parseTimeValue(val, DatafeedConfig.QUERY_DELAY.getPreferredName())), DatafeedConfig.QUERY_DELAY);
|
||||
PARSER.declareString((builder, val) -> builder.setFrequency(
|
||||
TimeValue.parseTimeValue(val, DatafeedConfig.FREQUENCY.getPreferredName())), DatafeedConfig.FREQUENCY);
|
||||
PARSER.declareObject(Builder::setQuery,
|
||||
(p, c) -> AbstractQueryBuilder.parseInnerQueryBuilder(p), DatafeedConfig.QUERY);
|
||||
PARSER.declareObject(Builder::setAggregations, (p, c) -> AggregatorFactories.parseAggregators(p),
|
||||
DatafeedConfig.AGGREGATIONS);
|
||||
PARSER.declareObject(Builder::setAggregations,(p, c) -> AggregatorFactories.parseAggregators(p),
|
||||
DatafeedConfig.AGGS);
|
||||
PARSER.declareObject(Builder::setQuery, (p, c) -> p.mapOrdered(), DatafeedConfig.QUERY);
|
||||
PARSER.declareObject(Builder::setAggregationsSafe, (p, c) -> p.mapOrdered(), DatafeedConfig.AGGREGATIONS);
|
||||
PARSER.declareObject(Builder::setAggregationsSafe,(p, c) -> p.mapOrdered(), DatafeedConfig.AGGS);
|
||||
PARSER.declareObject(Builder::setScriptFields, (p, c) -> {
|
||||
List<SearchSourceBuilder.ScriptField> parsedScriptFields = new ArrayList<>();
|
||||
while (p.nextToken() != XContentParser.Token.END_OBJECT) {
|
||||
|
@ -78,16 +79,16 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
|
|||
private final TimeValue queryDelay;
|
||||
private final TimeValue frequency;
|
||||
private final List<String> indices;
|
||||
private final QueryBuilder query;
|
||||
private final AggregatorFactories.Builder aggregations;
|
||||
private final Map<String, Object> query;
|
||||
private final Map<String, Object> aggregations;
|
||||
private final List<SearchSourceBuilder.ScriptField> scriptFields;
|
||||
private final Integer scrollSize;
|
||||
private final ChunkingConfig chunkingConfig;
|
||||
private final DelayedDataCheckConfig delayedDataCheckConfig;
|
||||
|
||||
private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, QueryBuilder query,
|
||||
AggregatorFactories.Builder aggregations, List<SearchSourceBuilder.ScriptField> scriptFields, Integer scrollSize,
|
||||
ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig) {
|
||||
private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices,
|
||||
Map<String, Object> query, Map<String, Object> aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
|
||||
Integer scrollSize, ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig) {
|
||||
this.id = id;
|
||||
this.jobId = jobId;
|
||||
this.queryDelay = queryDelay;
|
||||
|
@ -117,8 +118,17 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
|
|||
in.readStringList();
|
||||
}
|
||||
}
|
||||
this.query = in.readOptionalNamedWriteable(QueryBuilder.class);
|
||||
this.aggregations = in.readOptionalWriteable(AggregatorFactories.Builder::new);
|
||||
if (in.getVersion().before(Version.V_7_1_0)) {
|
||||
this.query = QUERY_TRANSFORMER.toMap(in.readOptionalNamedWriteable(QueryBuilder.class));
|
||||
this.aggregations = AGG_TRANSFORMER.toMap(in.readOptionalWriteable(AggregatorFactories.Builder::new));
|
||||
} else {
|
||||
this.query = in.readMap();
|
||||
if (in.readBoolean()) {
|
||||
this.aggregations = in.readMap();
|
||||
} else {
|
||||
this.aggregations = null;
|
||||
}
|
||||
}
|
||||
if (in.readBoolean()) {
|
||||
this.scriptFields = in.readList(SearchSourceBuilder.ScriptField::new);
|
||||
} else {
|
||||
|
@ -158,8 +168,16 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
|
|||
out.writeBoolean(true);
|
||||
out.writeStringCollection(Collections.emptyList());
|
||||
}
|
||||
out.writeOptionalNamedWriteable(query);
|
||||
out.writeOptionalWriteable(aggregations);
|
||||
if (out.getVersion().before(Version.V_7_1_0)) {
|
||||
out.writeOptionalNamedWriteable(lazyQueryParser.apply(query, id, new ArrayList<>()));
|
||||
out.writeOptionalWriteable(lazyAggParser.apply(aggregations, id, new ArrayList<>()));
|
||||
} else {
|
||||
out.writeMap(query);
|
||||
out.writeBoolean(aggregations != null);
|
||||
if (aggregations != null) {
|
||||
out.writeMap(aggregations);
|
||||
}
|
||||
}
|
||||
if (scriptFields != null) {
|
||||
out.writeBoolean(true);
|
||||
out.writeList(scriptFields);
|
||||
|
@ -227,27 +245,20 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
|
|||
return scrollSize;
|
||||
}
|
||||
|
||||
QueryBuilder getQuery() {
|
||||
Map<String, Object> getQuery() {
|
||||
return query;
|
||||
}
|
||||
|
||||
AggregatorFactories.Builder getAggregations() {
|
||||
Map<String, Object> getAggregations() {
|
||||
return aggregations;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the histogram's interval as epoch millis.
|
||||
*/
|
||||
long getHistogramIntervalMillis() {
|
||||
return ExtractorUtils.getHistogramIntervalMillis(aggregations);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return {@code true} when there are non-empty aggregations, {@code false}
|
||||
* otherwise
|
||||
*/
|
||||
boolean hasAggregations() {
|
||||
return aggregations != null && aggregations.count() > 0;
|
||||
return aggregations != null && aggregations.size() > 0;
|
||||
}
|
||||
|
||||
List<SearchSourceBuilder.ScriptField> getScriptFields() {
|
||||
|
@ -285,11 +296,11 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
|
|||
builder.setIndices(indices);
|
||||
}
|
||||
if (query != null) {
|
||||
builder.setParsedQuery(query);
|
||||
builder.setQuery(query);
|
||||
}
|
||||
if (aggregations != null) {
|
||||
DatafeedConfig.validateAggregations(aggregations);
|
||||
builder.setParsedAggregations(aggregations);
|
||||
DatafeedConfig.validateAggregations(lazyAggParser.apply(aggregations, id, new ArrayList<>()));
|
||||
builder.setAggregations(aggregations);
|
||||
}
|
||||
if (scriptFields != null) {
|
||||
builder.setScriptFields(scriptFields);
|
||||
|
@ -360,9 +371,9 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
|
|||
return (frequency == null || Objects.equals(frequency, datafeed.getFrequency()))
|
||||
&& (queryDelay == null || Objects.equals(queryDelay, datafeed.getQueryDelay()))
|
||||
&& (indices == null || Objects.equals(indices, datafeed.getIndices()))
|
||||
&& (query == null || Objects.equals(query, datafeed.getParsedQuery()))
|
||||
&& (query == null || Objects.equals(query, datafeed.getQuery()))
|
||||
&& (scrollSize == null || Objects.equals(scrollSize, datafeed.getQueryDelay()))
|
||||
&& (aggregations == null || Objects.equals(aggregations, datafeed.getParsedAggregations()))
|
||||
&& (aggregations == null || Objects.equals(aggregations, datafeed.getAggregations()))
|
||||
&& (scriptFields == null || Objects.equals(scriptFields, datafeed.getScriptFields()))
|
||||
&& (delayedDataCheckConfig == null || Objects.equals(delayedDataCheckConfig, datafeed.getDelayedDataCheckConfig()))
|
||||
&& (chunkingConfig == null || Objects.equals(chunkingConfig, datafeed.getChunkingConfig()));
|
||||
|
@ -375,8 +386,8 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
|
|||
private TimeValue queryDelay;
|
||||
private TimeValue frequency;
|
||||
private List<String> indices;
|
||||
private QueryBuilder query;
|
||||
private AggregatorFactories.Builder aggregations;
|
||||
private Map<String, Object> query;
|
||||
private Map<String, Object> aggregations;
|
||||
private List<SearchSourceBuilder.ScriptField> scriptFields;
|
||||
private Integer scrollSize;
|
||||
private ChunkingConfig chunkingConfig;
|
||||
|
@ -423,12 +434,42 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
|
|||
this.frequency = frequency;
|
||||
}
|
||||
|
||||
public void setQuery(QueryBuilder query) {
|
||||
public void setQuery(Map<String, Object> query) {
|
||||
this.query = query;
|
||||
try {
|
||||
QUERY_TRANSFORMER.fromMap(query);
|
||||
} catch(Exception ex) {
|
||||
String msg = Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT, id);
|
||||
|
||||
if (ex.getCause() instanceof IllegalArgumentException) {
|
||||
ex = (Exception)ex.getCause();
|
||||
}
|
||||
throw ExceptionsHelper.badRequestException(msg, ex);
|
||||
}
|
||||
}
|
||||
|
||||
public void setAggregations(AggregatorFactories.Builder aggregations) {
|
||||
private void setAggregationsSafe(Map<String, Object> aggregations) {
|
||||
if (this.aggregations != null) {
|
||||
throw ExceptionsHelper.badRequestException("Found two aggregation definitions: [aggs] and [aggregations]");
|
||||
}
|
||||
setAggregations(aggregations);
|
||||
}
|
||||
|
||||
public void setAggregations(Map<String, Object> aggregations) {
|
||||
this.aggregations = aggregations;
|
||||
try {
|
||||
if (aggregations != null && aggregations.isEmpty()) {
|
||||
throw new Exception("[aggregations] are empty");
|
||||
}
|
||||
AGG_TRANSFORMER.fromMap(aggregations);
|
||||
} catch(Exception ex) {
|
||||
String msg = Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, id);
|
||||
|
||||
if (ex.getCause() instanceof IllegalArgumentException) {
|
||||
ex = (Exception)ex.getCause();
|
||||
}
|
||||
throw ExceptionsHelper.badRequestException(msg, ex);
|
||||
}
|
||||
}
|
||||
|
||||
public void setScriptFields(List<SearchSourceBuilder.ScriptField> scriptFields) {
|
||||
|
|
|
@ -26,8 +26,8 @@ public final class Messages {
|
|||
"delayed_data_check_config: check_window [{0}] must be greater than the bucket_span [{1}]";
|
||||
public static final String DATAFEED_CONFIG_DELAYED_DATA_CHECK_SPANS_TOO_MANY_BUCKETS =
|
||||
"delayed_data_check_config: check_window [{0}] must be less than 10,000x the bucket_span [{1}]";
|
||||
public static final String DATAFEED_CONFIG_QUERY_BAD_FORMAT = "Datafeed [{0}] query is not parsable: {1}";
|
||||
public static final String DATAFEED_CONFIG_AGG_BAD_FORMAT = "Datafeed [{0}] aggregations are not parsable: {1}";
|
||||
public static final String DATAFEED_CONFIG_QUERY_BAD_FORMAT = "Datafeed [{0}] query is not parsable";
|
||||
public static final String DATAFEED_CONFIG_AGG_BAD_FORMAT = "Datafeed [{0}] aggregations are not parsable";
|
||||
|
||||
public static final String DATAFEED_DOES_NOT_SUPPORT_JOB_WITH_LATENCY = "A job configured with datafeed cannot support latency";
|
||||
public static final String DATAFEED_NOT_FOUND = "No datafeed with id [{0}] exists";
|
||||
|
|
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.ml.datafeed;
|
|||
|
||||
import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
|
@ -26,9 +27,8 @@ import org.elasticsearch.common.xcontent.XContentParseException;
|
|||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.index.query.TermQueryBuilder;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.script.Script;
|
||||
import org.elasticsearch.search.SearchModule;
|
||||
import org.elasticsearch.search.aggregations.AggregationBuilders;
|
||||
|
@ -58,6 +58,8 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig.QUERY_TRANSFORMER;
|
||||
import static org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig.lazyQueryParser;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
|
@ -89,7 +91,8 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
|
|||
DatafeedConfig.Builder builder = new DatafeedConfig.Builder(randomValidDatafeedId(), jobId);
|
||||
builder.setIndices(randomStringList(1, 10));
|
||||
if (randomBoolean()) {
|
||||
builder.setParsedQuery(QueryBuilders.termQuery(randomAlphaOfLength(10), randomAlphaOfLength(10)));
|
||||
builder.setQuery(Collections.singletonMap(TermQueryBuilder.NAME,
|
||||
Collections.singletonMap(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10))));
|
||||
}
|
||||
boolean addScriptFields = randomBoolean();
|
||||
if (addScriptFields) {
|
||||
|
@ -214,6 +217,41 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
|
|||
" }\n" +
|
||||
"}";
|
||||
|
||||
private static final String MULTIPLE_AGG_DEF_DATAFEED = "{\n" +
|
||||
" \"datafeed_id\": \"farequote-datafeed\",\n" +
|
||||
" \"job_id\": \"farequote\",\n" +
|
||||
" \"frequency\": \"1h\",\n" +
|
||||
" \"indices\": [\"farequote1\", \"farequote2\"],\n" +
|
||||
" \"aggregations\": {\n" +
|
||||
" \"buckets\": {\n" +
|
||||
" \"date_histogram\": {\n" +
|
||||
" \"field\": \"time\",\n" +
|
||||
" \"interval\": \"360s\",\n" +
|
||||
" \"time_zone\": \"UTC\"\n" +
|
||||
" },\n" +
|
||||
" \"aggregations\": {\n" +
|
||||
" \"time\": {\n" +
|
||||
" \"max\": {\"field\": \"time\"}\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" }," +
|
||||
" \"aggs\": {\n" +
|
||||
" \"buckets2\": {\n" +
|
||||
" \"date_histogram\": {\n" +
|
||||
" \"field\": \"time\",\n" +
|
||||
" \"interval\": \"360s\",\n" +
|
||||
" \"time_zone\": \"UTC\"\n" +
|
||||
" },\n" +
|
||||
" \"aggregations\": {\n" +
|
||||
" \"time\": {\n" +
|
||||
" \"max\": {\"field\": \"time\"}\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
"}";
|
||||
|
||||
public void testFutureConfigParse() throws IOException {
|
||||
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
|
||||
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, FUTURE_DATAFEED);
|
||||
|
@ -228,7 +266,8 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
|
|||
|
||||
DatafeedConfig config = DatafeedConfig.LENIENT_PARSER.apply(parser, null).build();
|
||||
ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> config.getParsedQuery());
|
||||
assertEquals("[match] query doesn't support multiple fields, found [query] and [type]", e.getMessage());
|
||||
assertNotNull(e.getCause());
|
||||
assertEquals("[match] query doesn't support multiple fields, found [query] and [type]", e.getCause().getMessage());
|
||||
}
|
||||
|
||||
try(XContentParser parser = XContentFactory.xContent(XContentType.JSON)
|
||||
|
@ -236,7 +275,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
|
|||
|
||||
XContentParseException e = expectThrows(XContentParseException.class,
|
||||
() -> DatafeedConfig.STRICT_PARSER.apply(parser, null).build());
|
||||
assertEquals("[6:25] [datafeed_config] failed to parse field [query]", e.getMessage());
|
||||
assertEquals("[6:64] [datafeed_config] failed to parse field [query]", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -246,9 +285,10 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
|
|||
|
||||
DatafeedConfig.Builder configBuilder = DatafeedConfig.LENIENT_PARSER.apply(parser, null);
|
||||
ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> configBuilder.build());
|
||||
assertNotNull(e.getCause());
|
||||
assertEquals(
|
||||
"Datafeed [farequote-datafeed] aggregations are not parsable: [size] must be greater than 0. Found [0] in [airline]",
|
||||
e.getMessage());
|
||||
"[size] must be greater than 0. Found [0] in [airline]",
|
||||
e.getCause().getMessage());
|
||||
}
|
||||
|
||||
try(XContentParser parser = XContentFactory.xContent(XContentType.JSON)
|
||||
|
@ -256,7 +296,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
|
|||
|
||||
XContentParseException e = expectThrows(XContentParseException.class,
|
||||
() -> DatafeedConfig.STRICT_PARSER.apply(parser, null).build());
|
||||
assertEquals("[8:25] [datafeed_config] failed to parse field [aggregations]", e.getMessage());
|
||||
assertEquals("[25:3] [datafeed_config] failed to parse field [aggregations]", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -267,6 +307,25 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
|
|||
assertNotNull(DatafeedConfig.LENIENT_PARSER.apply(parser, null).build());
|
||||
}
|
||||
|
||||
public void testMultipleDefinedAggParse() throws IOException {
|
||||
try(XContentParser parser = XContentFactory.xContent(XContentType.JSON)
|
||||
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, MULTIPLE_AGG_DEF_DATAFEED)) {
|
||||
XContentParseException ex = expectThrows(XContentParseException.class,
|
||||
() -> DatafeedConfig.LENIENT_PARSER.apply(parser, null));
|
||||
assertThat(ex.getMessage(), equalTo("[32:3] [datafeed_config] failed to parse field [aggs]"));
|
||||
assertNotNull(ex.getCause());
|
||||
assertThat(ex.getCause().getMessage(), equalTo("Found two aggregation definitions: [aggs] and [aggregations]"));
|
||||
}
|
||||
try(XContentParser parser = XContentFactory.xContent(XContentType.JSON)
|
||||
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, MULTIPLE_AGG_DEF_DATAFEED)) {
|
||||
XContentParseException ex = expectThrows(XContentParseException.class,
|
||||
() -> DatafeedConfig.STRICT_PARSER.apply(parser, null));
|
||||
assertThat(ex.getMessage(), equalTo("[32:3] [datafeed_config] failed to parse field [aggs]"));
|
||||
assertNotNull(ex.getCause());
|
||||
assertThat(ex.getCause().getMessage(), equalTo("Found two aggregation definitions: [aggs] and [aggregations]"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testToXContentForInternalStorage() throws IOException {
|
||||
DatafeedConfig.Builder builder = createRandomizedDatafeedConfigBuilder("foo", 300);
|
||||
|
||||
|
@ -443,7 +502,8 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
|
|||
|
||||
ElasticsearchException e = expectThrows(ElasticsearchException.class, builder::build);
|
||||
|
||||
assertThat(e.getMessage(), containsString("[interval] must be >0 for histogram aggregation [time]"));
|
||||
assertNotNull(e.getCause());
|
||||
assertThat(e.getCause().getMessage(), containsString("[interval] must be >0 for histogram aggregation [time]"));
|
||||
}
|
||||
|
||||
public void testBuild_GivenDateHistogramWithInvalidTimeZone() {
|
||||
|
@ -636,7 +696,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
|
|||
|
||||
DatafeedConfig spiedConfig = spy(datafeed);
|
||||
spiedConfig.getQueryDeprecations();
|
||||
verify(spiedConfig).getQueryDeprecations(DatafeedConfig.lazyQueryParser);
|
||||
verify(spiedConfig).getQueryDeprecations(lazyQueryParser);
|
||||
}
|
||||
|
||||
public void testSerializationOfComplexAggs() throws IOException {
|
||||
|
@ -656,9 +716,11 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
|
|||
.subAggregation(derivativePipelineAggregationBuilder)
|
||||
.subAggregation(bucketScriptPipelineAggregationBuilder);
|
||||
DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilderWithDateHistogram(dateHistogram);
|
||||
QueryBuilder terms =
|
||||
new BoolQueryBuilder().filter(new TermQueryBuilder(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10)));
|
||||
datafeedConfigBuilder.setParsedQuery(terms);
|
||||
Map<String, Object> terms = Collections.singletonMap(BoolQueryBuilder.NAME,
|
||||
Collections.singletonMap("filter",
|
||||
Collections.singletonMap(TermQueryBuilder.NAME,
|
||||
Collections.singletonMap(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10)))));
|
||||
datafeedConfigBuilder.setQuery(terms);
|
||||
DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
|
||||
AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder().addAggregator(dateHistogram);
|
||||
|
||||
|
@ -675,7 +737,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
|
|||
|
||||
// Assert that the parsed versions of our aggs and queries work as well
|
||||
assertEquals(aggBuilder, parsedDatafeedConfig.getParsedAggregations());
|
||||
assertEquals(terms, parsedDatafeedConfig.getParsedQuery());
|
||||
assertEquals(terms, parsedDatafeedConfig.getQuery());
|
||||
|
||||
try(BytesStreamOutput output = new BytesStreamOutput()) {
|
||||
datafeedConfig.writeTo(output);
|
||||
|
@ -685,7 +747,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
|
|||
|
||||
// Assert that the parsed versions of our aggs and queries work as well
|
||||
assertEquals(aggBuilder, streamedDatafeedConfig.getParsedAggregations());
|
||||
assertEquals(terms, streamedDatafeedConfig.getParsedQuery());
|
||||
assertEquals(terms, streamedDatafeedConfig.getQuery());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -707,9 +769,15 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
|
|||
.subAggregation(derivativePipelineAggregationBuilder)
|
||||
.subAggregation(bucketScriptPipelineAggregationBuilder);
|
||||
DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilderWithDateHistogram(dateHistogram);
|
||||
QueryBuilder terms =
|
||||
new BoolQueryBuilder().filter(new TermQueryBuilder(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10)));
|
||||
datafeedConfigBuilder.setParsedQuery(terms);
|
||||
Map<String, Object> terms = Collections.singletonMap(BoolQueryBuilder.NAME,
|
||||
Collections.singletonMap("filter",
|
||||
Collections.singletonList(
|
||||
Collections.singletonMap(TermQueryBuilder.NAME,
|
||||
Collections.singletonMap(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10))))));
|
||||
// So equality check between the streamed and current passes
|
||||
// Streamed DatafeedConfigs when they are before 6.6.0 require a parsed object for aggs and queries, consequently all the default
|
||||
// values are added between them
|
||||
datafeedConfigBuilder.setQuery(QUERY_TRANSFORMER.toMap(QUERY_TRANSFORMER.fromMap(terms)));
|
||||
DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
|
||||
|
||||
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
|
||||
|
@ -726,7 +794,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
|
|||
// Assert that the parsed versions of our aggs and queries work as well
|
||||
assertEquals(new AggregatorFactories.Builder().addAggregator(dateHistogram),
|
||||
streamedDatafeedConfig.getParsedAggregations());
|
||||
assertEquals(terms, streamedDatafeedConfig.getParsedQuery());
|
||||
assertEquals(datafeedConfig.getParsedQuery(), streamedDatafeedConfig.getParsedQuery());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -738,6 +806,22 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
|
|||
}
|
||||
}
|
||||
|
||||
public void testEmptyQueryMap() {
|
||||
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("empty_query_map", "job1");
|
||||
ElasticsearchStatusException ex = expectThrows(ElasticsearchStatusException.class,
|
||||
() -> builder.setQuery(Collections.emptyMap(), false));
|
||||
assertThat(ex.status(), equalTo(RestStatus.BAD_REQUEST));
|
||||
assertThat(ex.getMessage(), equalTo("Datafeed [empty_query_map] query is not parsable"));
|
||||
}
|
||||
|
||||
public void testEmptyAggMap() {
|
||||
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("empty_agg_map", "job1");
|
||||
ElasticsearchStatusException ex = expectThrows(ElasticsearchStatusException.class,
|
||||
() -> builder.setAggregations(Collections.emptyMap(), false));
|
||||
assertThat(ex.status(), equalTo(RestStatus.BAD_REQUEST));
|
||||
assertThat(ex.getMessage(), equalTo("Datafeed [empty_agg_map] aggregations are not parsable"));
|
||||
}
|
||||
|
||||
public static String randomValidDatafeedId() {
|
||||
CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray());
|
||||
return generator.ofCodePointsLength(random(), 10, 10);
|
||||
|
@ -800,12 +884,14 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
|
|||
builder.setIndices(indices);
|
||||
break;
|
||||
case 5:
|
||||
BoolQueryBuilder query = new BoolQueryBuilder();
|
||||
if (instance.getParsedQuery() != null) {
|
||||
query.must(instance.getParsedQuery());
|
||||
Map<String, Object> query = new HashMap<>();
|
||||
if (instance.getQuery() != null) {
|
||||
query.put("must", instance.getQuery());
|
||||
}
|
||||
query.filter(new TermQueryBuilder(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10)));
|
||||
builder.setParsedQuery(query);
|
||||
query.put("filter", Collections.singletonList(
|
||||
Collections.singletonMap(TermQueryBuilder.NAME,
|
||||
Collections.singletonMap(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10)))));
|
||||
builder.setQuery(query);
|
||||
break;
|
||||
case 6:
|
||||
if (instance.hasAggregations()) {
|
||||
|
|
|
@ -5,31 +5,34 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.core.ml.datafeed;
|
||||
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.DeprecationHandler;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParseException;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.query.TermQueryBuilder;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.script.Script;
|
||||
import org.elasticsearch.search.SearchModule;
|
||||
import org.elasticsearch.search.aggregations.AggregationBuilders;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactories;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
|
||||
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder.ScriptField;
|
||||
import org.elasticsearch.test.AbstractSerializingTestCase;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig.Mode;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTests;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
@ -61,7 +64,8 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
|
|||
builder.setIndices(DatafeedConfigTests.randomStringList(1, 10));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
builder.setQuery(QueryBuilders.termQuery(randomAlphaOfLength(10), randomAlphaOfLength(10)));
|
||||
builder.setQuery(Collections.singletonMap(TermQueryBuilder.NAME,
|
||||
Collections.singletonMap(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10))));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
int scriptsSize = randomInt(3);
|
||||
|
@ -75,10 +79,9 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
|
|||
if (randomBoolean() && datafeed == null) {
|
||||
// can only test with a single agg as the xcontent order gets randomized by test base class and then
|
||||
// the actual xcontent isn't the same and test fail.
|
||||
// Testing with a single agg is ok as we don't have special list writeable / xconent logic
|
||||
AggregatorFactories.Builder aggs = new AggregatorFactories.Builder();
|
||||
aggs.addAggregator(AggregationBuilders.avg(randomAlphaOfLength(10)).field(randomAlphaOfLength(10)));
|
||||
builder.setAggregations(aggs);
|
||||
// Testing with a single agg is ok as we don't have special list writeable / xcontent logic
|
||||
builder.setAggregations(Collections.singletonMap(randomAlphaOfLength(10),
|
||||
Collections.singletonMap("avg", Collections.singletonMap("field", randomAlphaOfLength(10)))));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
builder.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE));
|
||||
|
@ -114,6 +117,52 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
|
|||
return new NamedXContentRegistry(searchModule.getNamedXContents());
|
||||
}
|
||||
|
||||
private static final String MULTIPLE_AGG_DEF_DATAFEED = "{\n" +
|
||||
" \"datafeed_id\": \"farequote-datafeed\",\n" +
|
||||
" \"job_id\": \"farequote\",\n" +
|
||||
" \"frequency\": \"1h\",\n" +
|
||||
" \"indices\": [\"farequote1\", \"farequote2\"],\n" +
|
||||
" \"aggregations\": {\n" +
|
||||
" \"buckets\": {\n" +
|
||||
" \"date_histogram\": {\n" +
|
||||
" \"field\": \"time\",\n" +
|
||||
" \"interval\": \"360s\",\n" +
|
||||
" \"time_zone\": \"UTC\"\n" +
|
||||
" },\n" +
|
||||
" \"aggregations\": {\n" +
|
||||
" \"time\": {\n" +
|
||||
" \"max\": {\"field\": \"time\"}\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" }," +
|
||||
" \"aggs\": {\n" +
|
||||
" \"buckets2\": {\n" +
|
||||
" \"date_histogram\": {\n" +
|
||||
" \"field\": \"time\",\n" +
|
||||
" \"interval\": \"360s\",\n" +
|
||||
" \"time_zone\": \"UTC\"\n" +
|
||||
" },\n" +
|
||||
" \"aggregations\": {\n" +
|
||||
" \"time\": {\n" +
|
||||
" \"max\": {\"field\": \"time\"}\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
"}";
|
||||
|
||||
public void testMultipleDefinedAggParse() throws IOException {
|
||||
try(XContentParser parser = XContentFactory.xContent(XContentType.JSON)
|
||||
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, MULTIPLE_AGG_DEF_DATAFEED)) {
|
||||
XContentParseException ex = expectThrows(XContentParseException.class,
|
||||
() -> DatafeedUpdate.PARSER.apply(parser, null));
|
||||
assertThat(ex.getMessage(), equalTo("[32:3] [datafeed_update] failed to parse field [aggs]"));
|
||||
assertNotNull(ex.getCause());
|
||||
assertThat(ex.getCause().getMessage(), equalTo("Found two aggregation definitions: [aggs] and [aggregations]"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testApply_failBecauseTargetDatafeedHasDifferentId() {
|
||||
DatafeedConfig datafeed = DatafeedConfigTests.createRandomizedDatafeedConfig("foo");
|
||||
expectThrows(IllegalArgumentException.class, () -> createRandomized(datafeed.getId() + "_2").apply(datafeed, null));
|
||||
|
@ -149,7 +198,7 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
|
|||
update.setIndices(Collections.singletonList("i_2"));
|
||||
update.setQueryDelay(TimeValue.timeValueSeconds(42));
|
||||
update.setFrequency(TimeValue.timeValueSeconds(142));
|
||||
update.setQuery(QueryBuilders.termQuery("a", "b"));
|
||||
update.setQuery(Collections.singletonMap(TermQueryBuilder.NAME, Collections.singletonMap("a", "b")));
|
||||
update.setScriptFields(Collections.singletonList(new SearchSourceBuilder.ScriptField("a", mockScript("b"), false)));
|
||||
update.setScrollSize(8000);
|
||||
update.setChunkingConfig(ChunkingConfig.newManual(TimeValue.timeValueHours(1)));
|
||||
|
@ -161,7 +210,8 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
|
|||
assertThat(updatedDatafeed.getIndices(), equalTo(Collections.singletonList("i_2")));
|
||||
assertThat(updatedDatafeed.getQueryDelay(), equalTo(TimeValue.timeValueSeconds(42)));
|
||||
assertThat(updatedDatafeed.getFrequency(), equalTo(TimeValue.timeValueSeconds(142)));
|
||||
assertThat(updatedDatafeed.getParsedQuery(), equalTo(QueryBuilders.termQuery("a", "b")));
|
||||
assertThat(updatedDatafeed.getQuery(),
|
||||
equalTo(Collections.singletonMap(TermQueryBuilder.NAME, Collections.singletonMap("a", "b"))));
|
||||
assertThat(updatedDatafeed.hasAggregations(), is(false));
|
||||
assertThat(updatedDatafeed.getScriptFields(),
|
||||
equalTo(Collections.singletonList(new SearchSourceBuilder.ScriptField("a", mockScript("b"), false))));
|
||||
|
@ -177,16 +227,21 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
|
|||
DatafeedConfig datafeed = datafeedBuilder.build();
|
||||
|
||||
DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeed.getId());
|
||||
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
|
||||
update.setAggregations(new AggregatorFactories.Builder().addAggregator(
|
||||
AggregationBuilders.histogram("a").interval(300000).field("time").subAggregation(maxTime)));
|
||||
Map<String, Object> maxTime = Collections.singletonMap("time",
|
||||
Collections.singletonMap("max", Collections.singletonMap("field", "time")));
|
||||
Map<String, Object> histoDefinition = new HashMap<>();
|
||||
histoDefinition.put("interval", 300000);
|
||||
histoDefinition.put("field", "time");
|
||||
Map<String, Object> aggBody = new HashMap<>();
|
||||
aggBody.put("histogram", histoDefinition);
|
||||
aggBody.put("aggs", maxTime);
|
||||
Map<String, Object> aggMap = Collections.singletonMap("a", aggBody);
|
||||
update.setAggregations(aggMap);
|
||||
|
||||
DatafeedConfig updatedDatafeed = update.build().apply(datafeed, Collections.emptyMap());
|
||||
|
||||
assertThat(updatedDatafeed.getIndices(), equalTo(Collections.singletonList("i_1")));
|
||||
assertThat(updatedDatafeed.getParsedAggregations(),
|
||||
equalTo(new AggregatorFactories.Builder().addAggregator(
|
||||
AggregationBuilders.histogram("a").interval(300000).field("time").subAggregation(maxTime))));
|
||||
assertThat(updatedDatafeed.getAggregations(), equalTo(aggMap));
|
||||
}
|
||||
|
||||
public void testApply_GivenRandomUpdates_AssertImmutability() {
|
||||
|
@ -208,6 +263,22 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
|
|||
}
|
||||
}
|
||||
|
||||
public void testEmptyQueryMap() {
|
||||
DatafeedUpdate.Builder builder = new DatafeedUpdate.Builder("empty_query_map");
|
||||
ElasticsearchStatusException ex = expectThrows(ElasticsearchStatusException.class,
|
||||
() -> builder.setQuery(Collections.emptyMap()));
|
||||
assertThat(ex.status(), equalTo(RestStatus.BAD_REQUEST));
|
||||
assertThat(ex.getMessage(), equalTo("Datafeed [empty_query_map] query is not parsable"));
|
||||
}
|
||||
|
||||
public void testEmptyAggMap() {
|
||||
DatafeedUpdate.Builder builder = new DatafeedUpdate.Builder("empty_agg_map");
|
||||
ElasticsearchStatusException ex = expectThrows(ElasticsearchStatusException.class,
|
||||
() -> builder.setAggregations(Collections.emptyMap()));
|
||||
assertThat(ex.status(), equalTo(RestStatus.BAD_REQUEST));
|
||||
assertThat(ex.getMessage(), equalTo("Datafeed [empty_agg_map] aggregations are not parsable"));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DatafeedUpdate mutateInstance(DatafeedUpdate instance) {
|
||||
DatafeedUpdate.Builder builder = new DatafeedUpdate.Builder(instance);
|
||||
|
@ -243,22 +314,31 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
|
|||
builder.setIndices(indices);
|
||||
break;
|
||||
case 5:
|
||||
BoolQueryBuilder query = new BoolQueryBuilder();
|
||||
Map<String, Object> boolQuery = new HashMap<>();
|
||||
if (instance.getQuery() != null) {
|
||||
query.must(instance.getQuery());
|
||||
boolQuery.put("must", instance.getQuery());
|
||||
}
|
||||
query.filter(new TermQueryBuilder(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10)));
|
||||
builder.setQuery(query);
|
||||
boolQuery.put("filter",
|
||||
Collections.singletonList(
|
||||
Collections.singletonMap(TermQueryBuilder.NAME,
|
||||
Collections.singletonMap(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10)))));
|
||||
builder.setQuery(Collections.singletonMap("bool", boolQuery));
|
||||
break;
|
||||
case 6:
|
||||
if (instance.hasAggregations()) {
|
||||
builder.setAggregations(null);
|
||||
} else {
|
||||
AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder();
|
||||
String timeField = randomAlphaOfLength(10);
|
||||
aggBuilder.addAggregator(new DateHistogramAggregationBuilder(timeField).field(timeField).interval(between(10000, 3600000))
|
||||
.subAggregation(new MaxAggregationBuilder(timeField).field(timeField)));
|
||||
builder.setAggregations(aggBuilder);
|
||||
Map<String, Object> maxTime = Collections.singletonMap(timeField,
|
||||
Collections.singletonMap("max", Collections.singletonMap("field", timeField)));
|
||||
Map<String, Object> histoDefinition = new HashMap<>();
|
||||
histoDefinition.put("interval", between(10000, 3600000));
|
||||
histoDefinition.put("field", timeField);
|
||||
Map<String, Object> aggBody = new HashMap<>();
|
||||
aggBody.put("aggs", maxTime);
|
||||
aggBody.put("date_histogram", histoDefinition);
|
||||
Map<String, Object> aggMap = Collections.singletonMap(timeField, aggBody);
|
||||
builder.setAggregations(aggMap);
|
||||
if (instance.getScriptFields().isEmpty() == false) {
|
||||
builder.setScriptFields(Collections.emptyList());
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ public class MlDeprecationChecksTests extends ESTestCase {
|
|||
public void testCheckDataFeedQuery() {
|
||||
DatafeedConfig.Builder goodDatafeed = new DatafeedConfig.Builder("good-df", "job-id");
|
||||
goodDatafeed.setIndices(Collections.singletonList("some-index"));
|
||||
goodDatafeed.setParsedQuery(new TermQueryBuilder("foo", "bar"));
|
||||
goodDatafeed.setQuery(Collections.singletonMap(TermQueryBuilder.NAME, Collections.singletonMap("foo", "bar")));
|
||||
assertNull(MlDeprecationChecks.checkDataFeedQuery(goodDatafeed.build()));
|
||||
|
||||
DatafeedConfig.Builder deprecatedDatafeed = new DatafeedConfig.Builder("df-with-deprecated-query", "job-id");
|
||||
|
|
|
@ -159,7 +159,9 @@ public class DelayedDataDetectorIT extends MlNativeAutodetectIntegTestCase {
|
|||
.subAggregation(avgAggregationBuilder)
|
||||
.field("time")
|
||||
.interval(TimeValue.timeValueMinutes(5).millis())));
|
||||
datafeedConfigBuilder.setParsedQuery(new RangeQueryBuilder("value").gte(numDocs/2));
|
||||
datafeedConfigBuilder.setQuery(Collections.singletonMap(RangeQueryBuilder.NAME,
|
||||
Collections.singletonMap("value",
|
||||
Collections.singletonMap(RangeQueryBuilder.GTE_FIELD.getPreferredName(), numDocs/2))));
|
||||
datafeedConfigBuilder.setFrequency(TimeValue.timeValueMinutes(5));
|
||||
datafeedConfigBuilder.setDelayedDataCheckConfig(DelayedDataCheckConfig.enabledDelayedDataCheckConfig(TimeValue.timeValueHours(12)));
|
||||
|
||||
|
|
|
@ -356,9 +356,9 @@ setup:
|
|||
datafeed_id: test-datafeed-aggs-1
|
||||
- match: { datafeeds.0.datafeed_id: "test-datafeed-aggs-1" }
|
||||
- match: { datafeeds.0.aggregations.histogram_buckets.date_histogram.field: "@timestamp" }
|
||||
- match: { datafeeds.0.aggregations.histogram_buckets.aggregations.@timestamp.max.field: "@timestamp" }
|
||||
- match: { datafeeds.0.aggregations.histogram_buckets.aggregations.bytes_in_avg.avg.field: "system.network.in.bytes" }
|
||||
- match: { datafeeds.0.aggregations.histogram_buckets.aggregations.non_negative_bytes.bucket_script.buckets_path.bytes: "bytes_in_derivative" }
|
||||
- match: { datafeeds.0.aggregations.histogram_buckets.aggs.@timestamp.max.field: "@timestamp" }
|
||||
- match: { datafeeds.0.aggregations.histogram_buckets.aggs.bytes_in_avg.avg.field: "system.network.in.bytes" }
|
||||
- match: { datafeeds.0.aggregations.histogram_buckets.aggs.non_negative_bytes.bucket_script.buckets_path.bytes: "bytes_in_derivative" }
|
||||
|
||||
---
|
||||
"Test delete datafeed":
|
||||
|
|
Loading…
Reference in New Issue