From 3e494070bfd3338471258a52899d75875a27ec00 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Wed, 21 Dec 2016 10:57:55 +0000 Subject: [PATCH] Strict parse search parts of schedulerConfig (elastic/elasticsearch#600) * Strict parse search parts of schedulerConfig This commit adds methods to build the typed objects for the search parts of a scheduler config. Those are: query, aggregations and script_fields. As scheduler configs are stored in the cluster state and parsing the search parts requires a SearchRequestParsers object, we cannot store them as typed fields. Instead, they are now stored as BytesReferences. This change is in preparation for switching over to using a client based data extractor. Point summary of changes: - query, aggregations and scriptFields are now stored as BytesReference - adds methods to build the corresponding typed objects - putting a scheduler now builds the search parts to validate that the config is valid Relates to elastic/elasticsearch#478 Original commit: elastic/x-pack-elasticsearch@e6d5a85871adc1886c27b9b8579e313c18a13649 --- .../xpack/prelert/PrelertPlugin.java | 2 +- .../prelert/action/PutSchedulerAction.java | 9 +- .../prelert/job/metadata/PrelertMetadata.java | 9 +- .../writer/AggregatedJsonRecordReader.java | 204 ------------ .../schedulers/RestStartSchedulerAction.java | 7 +- .../prelert/scheduler/ScheduledJobRunner.java | 1 - .../scheduler/ScheduledJobValidator.java | 4 +- .../xpack/prelert/scheduler/Scheduler.java | 2 +- .../prelert/scheduler/SchedulerConfig.java | 303 ++++++++---------- .../http/ElasticsearchQueryBuilder.java | 2 +- .../http/HttpDataExtractorFactory.java | 58 ++-- .../xpack/prelert/utils/ExceptionsHelper.java | 6 + .../GetSchedulersActionResponseTests.java | 20 +- .../StopSchedulerActionRequestTests.java | 6 +- .../xpack/prelert/job/JobTests.java | 4 +- .../job/metadata/PrelertMetadataTests.java | 35 +- .../AggregatedJsonRecordReaderTests.java | 250 --------------- .../RestStartJobSchedulerActionTests.java | 6 +- .../scheduler/ScheduledJobRunnerTests.java | 20 +- .../scheduler/ScheduledJobValidatorTests.java | 31 +- .../scheduler/SchedulerConfigTests.java | 157 +++------ .../prelert/scheduler/SchedulerTests.java | 30 ++ .../http/ElasticsearchDataExtractorTests.java | 2 +- .../http/ElasticsearchQueryBuilderTests.java | 18 +- .../rest-api-spec/test/schedulers_crud.yaml | 14 + 25 files changed, 371 insertions(+), 829 deletions(-) delete mode 100644 elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/AggregatedJsonRecordReader.java delete mode 100644 elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/AggregatedJsonRecordReaderTests.java create mode 100644 elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/SchedulerTests.java diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java index 169327ccd92..f86d4ab1270 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java @@ -196,7 +196,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin { autodetectProcessFactory, normalizerFactory); ScheduledJobRunner scheduledJobRunner = new ScheduledJobRunner(threadPool, client, clusterService, jobProvider, // norelease: we will no longer need to pass the client here after we switch to a client
based data extractor - new HttpDataExtractorFactory(client), + new HttpDataExtractorFactory(client, searchRequestParsers), System::currentTimeMillis); JobLifeCycleService jobLifeCycleService = diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/PutSchedulerAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/PutSchedulerAction.java index bc662aaad33..3cdaa0497a3 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/PutSchedulerAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/PutSchedulerAction.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.SearchRequestParsers; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfig; @@ -173,11 +174,15 @@ public class PutSchedulerAction extends Action { + private final SearchRequestParsers searchRequestParsers; + @Inject public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + SearchRequestParsers searchRequestParsers) { super(settings, PutSchedulerAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, Request::new); + this.searchRequestParsers = searchRequestParsers; } @Override @@ -213,7 +218,7 @@ public class PutSchedulerAction extends Action nestingOrder; - private List nestedValues; - private String latestDocCount; - - /** - * Create a reader that simulates simple records given a hierarchical JSON - * structure where each field is at a progressively deeper level of nesting. - */ - AggregatedJsonRecordReader(JsonParser parser, Map fieldMap, String recordHoldingField, Logger logger, - List nestingOrder) { - super(parser, fieldMap, recordHoldingField, logger); - this.nestingOrder = Objects.requireNonNull(nestingOrder); - if (this.nestingOrder.isEmpty()) { - throw new IllegalArgumentException( - "Expected nesting order for aggregated JSON must not be empty"); - } - nestedValues = new ArrayList<>(); - } - - /** - * Read forwards in the JSON until enough information has been gathered to - * write to the record array. - * - * @param record Read fields are written to this array. 
This array is first filled with empty - * strings and will never contain a null - * @param gotFields boolean array each element is true if that field - * was read - * @return The number of fields in the aggregated hierarchy, or -1 if nothing was read - * because the end of the stream was reached - */ - @Override - public long read(String[] record, boolean[] gotFields) throws IOException { - initArrays(record, gotFields); - latestDocCount = null; - fieldCount = 0; - if (isFirstTime) { - clearNestedLevel(); - consumeToRecordHoldingField(); - isFirstTime = false; - } - - boolean gotInnerValue = false; - JsonToken token = tryNextTokenOrReadToEndOnError(); - while (!(token == JsonToken.END_OBJECT && nestedLevel == 0)) { - if (token == null) { - break; - } - - if (token == JsonToken.START_OBJECT) { - ++nestedLevel; - } else if (token == JsonToken.END_OBJECT) { - if (gotInnerValue) { - completeRecord(record, gotFields); - } - --nestedLevel; - if (nestedLevel % 2 == 0 && !nestedValues.isEmpty()) { - nestedValues.remove(nestedValues.size() - 1); - } - if (gotInnerValue) { - break; - } - } else if (token == JsonToken.FIELD_NAME) { - if (((nestedLevel + 1) / 2) == nestingOrder.size()) { - gotInnerValue = parseFieldValuePair(record, gotFields) || gotInnerValue; - } - // Alternate nesting levels are arbitrary labels that can be - // ignored. - else if (nestedLevel > 0 && nestedLevel % 2 == 0) { - String fieldName = parser.getCurrentName(); - if (fieldName.equals(AGG_KEY)) { - token = tryNextTokenOrReadToEndOnError(); - if (token == null) { - break; - } - nestedValues.add(parser.getText()); - } else if (fieldName.equals(SchedulerConfig.DOC_COUNT)) { - token = tryNextTokenOrReadToEndOnError(); - if (token == null) { - break; - } - latestDocCount = parser.getText(); - } - } - } - - token = tryNextTokenOrReadToEndOnError(); - } - - // null token means EOF; nestedLevel 0 means we've reached the end of - // the aggregations object - if (token == null || nestedLevel == 0) { - return -1; - } - return fieldCount; - } - - @Override - protected void clearNestedLevel() { - nestedLevel = 0; - } - - private boolean parseFieldValuePair(String[] record, boolean[] gotFields) throws IOException { - String fieldName = parser.getCurrentName(); - JsonToken token = tryNextTokenOrReadToEndOnError(); - - if (token == null) { - return false; - } - - if (token == JsonToken.START_OBJECT) { - ++nestedLevel; - return false; - } - - if (token == JsonToken.START_ARRAY) { - // We don't expect arrays at this level of aggregated inputIndex - // (although we do expect arrays at higher levels). Consume the - // whole array but do nothing with it. 
- while (token != JsonToken.END_ARRAY) { - token = tryNextTokenOrReadToEndOnError(); - } - return false; - } - - ++fieldCount; - - if (AGG_VALUE.equals(fieldName)) { - fieldName = nestingOrder.get(nestingOrder.size() - 1); - } - - Integer index = fieldMap.get(fieldName); - if (index == null) { - return false; - } - - String fieldValue = parser.getText(); - record[index] = fieldValue; - gotFields[index] = true; - - return true; - } - - private void completeRecord(String[] record, boolean[] gotFields) throws IOException { - // This loop should do time plus the by/over/partition/influencer fields - int numberOfFields = Math.min(nestingOrder.size() - 1, nestedValues.size()); - if (nestingOrder.size() - 1 != nestedValues.size()) { - logger.warn("Aggregation inputIndex does not match expectation: expected field order: " - + nestingOrder + " actual values: " + nestedValues); - } - fieldCount += numberOfFields; - for (int i = 0; i < numberOfFields; ++i) { - String fieldName = nestingOrder.get(i); - Integer index = fieldMap.get(fieldName); - if (index == null) { - continue; - } - - String fieldValue = nestedValues.get(i); - record[index] = fieldValue; - gotFields[index] = true; - } - - // This adds the summary count field - if (latestDocCount != null) { - ++fieldCount; - Integer index = fieldMap.get(SchedulerConfig.DOC_COUNT); - if (index != null) { - record[index] = latestDocCount; - gotFields[index] = true; - } - } - } -} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartSchedulerAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartSchedulerAction.java index fb9ab99cf0c..9392b66d529 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartSchedulerAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartSchedulerAction.java @@ -21,16 +21,17 @@ import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchRequestParsers; import org.elasticsearch.tasks.LoggingTaskListener; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.prelert.PrelertPlugin; import org.elasticsearch.xpack.prelert.action.StartSchedulerAction; -import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfig; -import org.elasticsearch.xpack.prelert.scheduler.SchedulerStatus; import org.elasticsearch.xpack.prelert.job.messages.Messages; import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata; import org.elasticsearch.xpack.prelert.scheduler.ScheduledJobRunner; +import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfig; +import org.elasticsearch.xpack.prelert.scheduler.SchedulerStatus; import org.elasticsearch.xpack.prelert.utils.SchedulerStatusObserver; import java.io.IOException; @@ -44,7 +45,7 @@ public class RestStartSchedulerAction extends BaseRestHandler { @Inject public RestStartSchedulerAction(Settings settings, RestController controller, ThreadPool threadPool, - ClusterService clusterService) { + ClusterService clusterService, SearchRequestParsers searchRequestParsers) { super(settings); this.clusterService = clusterService; this.schedulerStatusObserver = new SchedulerStatusObserver(threadPool, clusterService); diff --git 
a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunner.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunner.java index 612c02713b0..aef02fdbd0a 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunner.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunner.java @@ -26,7 +26,6 @@ import org.elasticsearch.xpack.prelert.job.audit.Auditor; import org.elasticsearch.xpack.prelert.job.config.DefaultFrequency; import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor; import org.elasticsearch.xpack.prelert.job.extraction.DataExtractorFactory; -import org.elasticsearch.xpack.prelert.job.messages.Messages; import org.elasticsearch.xpack.prelert.job.metadata.Allocation; import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata; import org.elasticsearch.xpack.prelert.job.persistence.BucketsQueryBuilder; diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobValidator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobValidator.java index 9113e67f77e..58e777eb489 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobValidator.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobValidator.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.prelert.scheduler; +import org.elasticsearch.search.SearchRequestParsers; import org.elasticsearch.xpack.prelert.job.AnalysisConfig; import org.elasticsearch.xpack.prelert.job.DataDescription; import org.elasticsearch.xpack.prelert.job.Job; @@ -24,8 +25,7 @@ public final class ScheduledJobValidator { if (analysisConfig.getLatency() != null && analysisConfig.getLatency() > 0) { throw new IllegalArgumentException(Messages.getMessage(Messages.SCHEDULER_DOES_NOT_SUPPORT_JOB_WITH_LATENCY)); } - if (schedulerConfig.getAggregations() != null - && !SchedulerConfig.DOC_COUNT.equals(analysisConfig.getSummaryCountFieldName())) { + if (schedulerConfig.hasAggregations() && !SchedulerConfig.DOC_COUNT.equals(analysisConfig.getSummaryCountFieldName())) { throw new IllegalArgumentException( Messages.getMessage(Messages.SCHEDULER_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD, SchedulerConfig.DOC_COUNT)); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/Scheduler.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/Scheduler.java index 8c688b00618..1121417ba79 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/Scheduler.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/Scheduler.java @@ -29,7 +29,7 @@ public class Scheduler extends AbstractDiffable implements ToXContent public static final ParseField RESULTS_FIELD = new ParseField("schedulers"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("scheduler", - a -> new Scheduler((SchedulerConfig) a[0], (SchedulerStatus) a[1])); + a -> new Scheduler(((SchedulerConfig.Builder) a[0]).build(), (SchedulerStatus) a[1])); static { PARSER.declareObject(ConstructingObjectParser.constructorArg(), SchedulerConfig.PARSER, CONFIG_FIELD); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/SchedulerConfig.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/SchedulerConfig.java index 
28335639b06..568bfb894b9 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/SchedulerConfig.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/SchedulerConfig.java @@ -8,12 +8,25 @@ package org.elasticsearch.xpack.prelert.scheduler; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.support.ToXContentToBytes; import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.ParseFieldMatcherSupplier; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.QueryParseContext; +import org.elasticsearch.indices.query.IndicesQueriesRegistry; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.AggregatorParsers; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.prelert.job.Job; import org.elasticsearch.xpack.prelert.job.messages.Messages; import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper; @@ -22,12 +35,9 @@ import org.elasticsearch.xpack.prelert.utils.PrelertStrings; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.SortedMap; -import java.util.TreeMap; /** * Scheduler configuration options. 
Describes where to proactively pull input @@ -61,7 +71,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { public static final ParseField AGGS = new ParseField("aggs"); public static final ParseField SCRIPT_FIELDS = new ParseField("script_fields"); - public static final ObjectParser PARSER = new ObjectParser<>("schedule_config", Builder::new); + public static final ObjectParser PARSER = new ObjectParser<>("scheduler_config", Builder::new); static { PARSER.declareString(Builder::setId, ID); @@ -70,34 +80,26 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { PARSER.declareStringArray(Builder::setTypes, TYPES); PARSER.declareLong(Builder::setQueryDelay, QUERY_DELAY); PARSER.declareLong(Builder::setFrequency, FREQUENCY); - PARSER.declareObject(Builder::setQuery, (p, c) -> { - try { - return p.map(); - } catch (IOException e) { - throw new RuntimeException(e); - } - }, QUERY); - PARSER.declareObject(Builder::setAggregations, (p, c) -> { - try { - return p.map(); - } catch (IOException e) { - throw new RuntimeException(e); - } - }, AGGREGATIONS); - PARSER.declareObject(Builder::setAggregations, (p, c) -> { - try { - return p.map(); - } catch (IOException e) { - throw new RuntimeException(e); - } - }, AGGS); - PARSER.declareObject(Builder::setScriptFields, (p, c) -> { - try { - return p.map(); - } catch (IOException e) { - throw new RuntimeException(e); - } - }, SCRIPT_FIELDS); + PARSER.declareField((parser, builder, aVoid) -> { + XContentBuilder contentBuilder = XContentBuilder.builder(parser.contentType().xContent()); + XContentHelper.copyCurrentStructure(contentBuilder.generator(), parser); + builder.setQuery(contentBuilder.bytes()); + }, QUERY, ObjectParser.ValueType.OBJECT); + PARSER.declareField((parser, builder, aVoid) -> { + XContentBuilder contentBuilder = XContentBuilder.builder(parser.contentType().xContent()); + XContentHelper.copyCurrentStructure(contentBuilder.generator(), parser); + builder.setAggregations(contentBuilder.bytes()); + }, AGGREGATIONS, ObjectParser.ValueType.OBJECT); + PARSER.declareField((parser, builder, aVoid) -> { + XContentBuilder contentBuilder = XContentBuilder.builder(parser.contentType().xContent()); + XContentHelper.copyCurrentStructure(contentBuilder.generator(), parser); + builder.setAggregations(contentBuilder.bytes()); + }, AGGS, ObjectParser.ValueType.OBJECT); + PARSER.declareField((parser, builder, aVoid) -> { + XContentBuilder contentBuilder = XContentBuilder.builder(parser.contentType().xContent()); + XContentHelper.copyCurrentStructure(contentBuilder.generator(), parser); + builder.setScriptFields(contentBuilder.bytes()); + }, SCRIPT_FIELDS, ObjectParser.ValueType.OBJECT); PARSER.declareInt(Builder::setScrollSize, SCROLL_SIZE); } @@ -116,16 +118,13 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { private final List indexes; private final List types; - // NORELEASE: These 4 fields can be reduced to a single - // SearchSourceBuilder field holding the entire source: - private final Map query; - private final Map aggregations; - private final Map scriptFields; + private final BytesReference query; + private final BytesReference aggregations; + private final BytesReference scriptFields; private final Integer scrollSize; private SchedulerConfig(String id, String jobId, Long queryDelay, Long frequency, List indexes, List types, - Map query, Map aggregations, Map scriptFields, - Integer scrollSize) { + BytesReference query, BytesReference aggregations, 
BytesReference scriptFields, Integer scrollSize) { this.id = id; this.jobId = jobId; this.queryDelay = queryDelay; @@ -153,21 +152,9 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { } else { this.types = null; } - if (in.readBoolean()) { - this.query = in.readMap(); - } else { - this.query = null; - } - if (in.readBoolean()) { - this.aggregations = in.readMap(); - } else { - this.aggregations = null; - } - if (in.readBoolean()) { - this.scriptFields = in.readMap(); - } else { - this.scriptFields = null; - } + this.query = in.readOptionalBytesReference(); + this.aggregations = in.readOptionalBytesReference(); + this.scriptFields = in.readOptionalBytesReference(); this.scrollSize = in.readOptionalVInt(); } @@ -207,82 +194,74 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { return this.types; } - /** - * For the ELASTICSEARCH data source only, the Elasticsearch query DSL - * representing the query to submit to Elasticsearch to get the input data. - * This should not include time bounds, as these are added separately. This - * class does not attempt to interpret the query. The map will be converted - * back to an arbitrary JSON object. - * - * @return The search query, or null if not set. - */ - public Map getQuery() { - return this.query; - } - - /** - * For the ELASTICSEARCH data source only, get the size of documents to be - * retrieved from each shard via a scroll search - * - * @return The size of documents to be retrieved from each shard via a - * scroll search - */ public Integer getScrollSize() { return this.scrollSize; } - /** - * For the ELASTICSEARCH data source only, optional Elasticsearch - * script_fields to add to the search to be submitted to Elasticsearch to - * get the input data. This class does not attempt to interpret the script - * fields. The map will be converted back to an arbitrary JSON object. - * - * @return The script fields, or null if not set. - */ - public Map getScriptFields() { - return this.scriptFields; + Map getQueryAsMap() { + return XContentHelper.convertToMap(query, true).v2(); } - /** - * For the ELASTICSEARCH data source only, optional Elasticsearch - * aggregations to apply to the search to be submitted to Elasticsearch to - * get the input data. This class does not attempt to interpret the - * aggregations. The map will be converted back to an arbitrary JSON object. - * - * @return The aggregations, or null if not set. - */ - public Map getAggregations() { - return this.aggregations; + Map getAggregationsAsMap() { + return XContentHelper.convertToMap(aggregations, true).v2(); } - /** - * Build the list of fields expected in the output from aggregations - * submitted to Elasticsearch. - * - * @return The list of fields, or empty list if there are no aggregations. 
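For reference, the round trip the new storage model supports looks like this: the ObjectParser above captures the raw "query" object as a BytesReference, and buildQuery() below re-parses it strictly into a typed QueryBuilder. A minimal sketch, assuming a SearchRequestParsers instance named searchRequestParsers is in scope; the config values are made up for illustration:

    String json = "{\"id\":\"my-scheduler\",\"job_id\":\"my-job\","
            + "\"indexes\":[\"data\"],\"types\":[\"doc\"],"
            + "\"query\":{\"term\":{\"airline\":\"AAL\"}}}";
    // The query object is copied verbatim into a BytesReference during parsing...
    XContentParser configParser = XContentFactory.xContent(json).createParser(json);
    SchedulerConfig config = SchedulerConfig.PARSER
            .apply(configParser, () -> ParseFieldMatcher.STRICT).build();
    // ...and strictly re-parsed on demand; a malformed query throws
    // ElasticsearchParseException here instead of failing at scheduler start time.
    QueryBuilder queryBuilder = config.buildQuery(searchRequestParsers.queryParsers);
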
- */ - public List<String> buildAggregatedFieldList() { - Map<String, Object> aggs = getAggregations(); - if (aggs == null) { + Map<String, Object> getScriptFieldsAsMap() { + return XContentHelper.convertToMap(scriptFields, true).v2(); + } + + public QueryBuilder buildQuery(IndicesQueriesRegistry queryParsers) { + if (query == null) { + return QueryBuilders.matchAllQuery(); + } + XContentParser parser = createParser(QUERY, query); + QueryParseContext queryParseContext = new QueryParseContext(queryParsers, parser, ParseFieldMatcher.STRICT); + try { + return queryParseContext.parseInnerQueryBuilder().orElse(QueryBuilders.matchAllQuery()); + } catch (IOException e) { + throw ExceptionsHelper.parseException(QUERY, e); + } + } + + public boolean hasAggregations() { + return aggregations != null; + } + + public AggregatorFactories.Builder buildAggregations(IndicesQueriesRegistry queryParsers, AggregatorParsers aggParsers) { + if (!hasAggregations()) { + return null; + } + XContentParser parser = createParser(AGGREGATIONS, aggregations); + QueryParseContext queryParseContext = new QueryParseContext(queryParsers, parser, ParseFieldMatcher.STRICT); + try { + return aggParsers.parseAggregators(queryParseContext); + } catch (IOException e) { + throw ExceptionsHelper.parseException(AGGREGATIONS, e); + } + } + + public List<SearchSourceBuilder.ScriptField> buildScriptFields(IndicesQueriesRegistry queryParsers) { + if (scriptFields == null) { return Collections.emptyList(); } - - SortedMap<Integer, String> orderedFields = new TreeMap<>(); - - scanSubLevel(aggs, 0, orderedFields); - - return new ArrayList<>(orderedFields.values()); + List<SearchSourceBuilder.ScriptField> parsedScriptFields = new ArrayList<>(); + XContentParser parser = createParser(SCRIPT_FIELDS, scriptFields); + try { + QueryParseContext queryParseContext = new QueryParseContext(queryParsers, parser, ParseFieldMatcher.STRICT); + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + parsedScriptFields.add(new SearchSourceBuilder.ScriptField(queryParseContext)); + } + } catch (IOException e) { + throw ExceptionsHelper.parseException(SCRIPT_FIELDS, e); + } + return parsedScriptFields; } - @SuppressWarnings("unchecked") - private void scanSubLevel(Map<String, Object> subLevel, int depth, SortedMap<Integer, String> orderedFields) { - for (Map.Entry<String, Object> entry : subLevel.entrySet()) { - Object value = entry.getValue(); - if (value instanceof Map) { - scanSubLevel((Map<String, Object>) value, depth + 1, orderedFields); - } else if (value instanceof String && FIELD.equals(entry.getKey())) { - orderedFields.put(depth, (String) value); - } + private XContentParser createParser(ParseField parseField, BytesReference bytesReference) { + try { + return XContentFactory.xContent(bytesReference).createParser(bytesReference); + } catch (IOException e) { + throw ExceptionsHelper.parseException(parseField, e); + } } } @@ -304,24 +283,9 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { } else { out.writeBoolean(false); } - if (query != null) { - out.writeBoolean(true); - out.writeMap(query); - } else { - out.writeBoolean(false); - } - if (aggregations != null) { - out.writeBoolean(true); - out.writeMap(aggregations); - } else { - out.writeBoolean(false); - } - if (scriptFields != null) { - out.writeBoolean(true); - out.writeMap(scriptFields); - } else { - out.writeBoolean(false); - } + out.writeOptionalBytesReference(query); + out.writeOptionalBytesReference(aggregations); + out.writeOptionalBytesReference(scriptFields); out.writeOptionalVInt(scrollSize); } @@ -349,13 +313,13 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { builder.field(TYPES.getPreferredName(), types); } if
(query != null) { - builder.field(QUERY.getPreferredName(), query); + builder.field(QUERY.getPreferredName(), getQueryAsMap()); } if (aggregations != null) { - builder.field(AGGREGATIONS.getPreferredName(), aggregations); + builder.field(AGGREGATIONS.getPreferredName(), getAggregationsAsMap()); } if (scriptFields != null) { - builder.field(SCRIPT_FIELDS.getPreferredName(), scriptFields); + builder.field(SCRIPT_FIELDS.getPreferredName(), getScriptFieldsAsMap()); } if (scrollSize != null) { builder.field(SCROLL_SIZE.getPreferredName(), scrollSize); @@ -388,13 +352,13 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { && Objects.equals(this.types, that.types) && Objects.equals(this.query, that.query) && Objects.equals(this.scrollSize, that.scrollSize) - && Objects.equals(this.getAggregations(), that.getAggregations()) + && Objects.equals(this.aggregations, that.aggregations) && Objects.equals(this.scriptFields, that.scriptFields); } @Override public int hashCode() { - return Objects.hash(id, jobId, frequency, queryDelay, indexes, types, query, scrollSize, getAggregations(), scriptFields); + return Objects.hash(id, jobId, frequency, queryDelay, indexes, types, query, scrollSize, aggregations, scriptFields); } public static class Builder { @@ -402,29 +366,18 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { private static final int DEFAULT_SCROLL_SIZE = 1000; private static final long DEFAULT_ELASTICSEARCH_QUERY_DELAY = 60L; - /** - * The default query for elasticsearch searches - */ - private static final String MATCH_ALL_ES_QUERY = "match_all"; - private String id; private String jobId; private Long queryDelay; private Long frequency; private List indexes = Collections.emptyList(); private List types = Collections.emptyList(); - // NORELEASE: use Collections.emptyMap() instead of null as initial - // value: - // NORELEASE: Use SearchSourceBuilder - private Map query = null; - private Map aggregations = null; - private Map scriptFields = null; + private BytesReference query; + private BytesReference aggregations; + private BytesReference scriptFields; private Integer scrollSize; public Builder() { - Map query = new HashMap<>(); - query.put(MATCH_ALL_ES_QUERY, new HashMap()); - setQuery(query); setQueryDelay(DEFAULT_ELASTICSEARCH_QUERY_DELAY); setScrollSize(DEFAULT_SCROLL_SIZE); } @@ -482,21 +435,49 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable { this.frequency = frequency; } - public void setQuery(Map query) { - // NORELEASE: make use of Collections.unmodifiableMap(...) + private void setQuery(BytesReference query) { this.query = Objects.requireNonNull(query); } - public void setAggregations(Map aggregations) { - // NORELEASE: make use of Collections.unmodifiableMap(...) + public void setQuery(QueryBuilder query) { + this.query = xContentToBytes(query); + } + + private BytesReference xContentToBytes(ToXContent value) { + try { + XContentBuilder jsonBuilder = XContentFactory.jsonBuilder(); + return value.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS).bytes(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void setAggregations(BytesReference aggregations) { this.aggregations = Objects.requireNonNull(aggregations); } - public void setScriptFields(Map scriptFields) { - // NORELEASE: make use of Collections.unmodifiableMap(...) 
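Taken together, the typed overloads in this Builder (setQuery above, setAggregations and setScriptFields next) let callers assemble a config programmatically instead of hand-rolling maps. A usage sketch mirroring the test code later in this patch; the ids and field names are illustrative, and setIndexes is assumed to exist alongside the setTypes setter visible in the parser declarations:

    SchedulerConfig.Builder schedulerConfig = new SchedulerConfig.Builder();
    schedulerConfig.setId("my-scheduler");
    schedulerConfig.setJobId("my-job");
    schedulerConfig.setIndexes(Collections.singletonList("data"));
    schedulerConfig.setTypes(Collections.singletonList("doc"));
    schedulerConfig.setQuery(QueryBuilders.termQuery("airline", "AAL"));
    AggregatorFactories.Builder aggs = new AggregatorFactories.Builder();
    aggs.addAggregator(AggregationBuilders.avg("avg_responsetime").field("responsetime"));
    schedulerConfig.setAggregations(aggs);
    schedulerConfig.setScriptFields(Collections.singletonList(new SearchSourceBuilder.ScriptField(
            "doubled", new Script("doc['responsetime'].value * 2"), false)));
    SchedulerConfig config = schedulerConfig.build();
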
+ public void setAggregations(AggregatorFactories.Builder aggregations) { + this.aggregations = xContentToBytes(aggregations); + } + + private void setScriptFields(BytesReference scriptFields) { this.scriptFields = Objects.requireNonNull(scriptFields); } + public void setScriptFields(List scriptFields) { + try { + XContentBuilder jsonBuilder = XContentFactory.jsonBuilder(); + jsonBuilder.startObject(); + for (SearchSourceBuilder.ScriptField scriptField : scriptFields) { + scriptField.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); + } + jsonBuilder.endObject(); + this.scriptFields = jsonBuilder.bytes(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + public void setScrollSize(int scrollSize) { if (scrollSize < 0) { String msg = Messages.getMessage(Messages.SCHEDULER_CONFIG_INVALID_OPTION_VALUE, diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/http/ElasticsearchQueryBuilder.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/http/ElasticsearchQueryBuilder.java index fb0d3d3bb38..11d509a575f 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/http/ElasticsearchQueryBuilder.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/http/ElasticsearchQueryBuilder.java @@ -37,7 +37,7 @@ public class ElasticsearchQueryBuilder { + "\"query\": {" + "\"bool\": {" + "\"filter\": [" - + "{%s}," + + "%s," + "{" + "\"range\": {" + "\"%s\": {" diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/http/HttpDataExtractorFactory.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/http/HttpDataExtractorFactory.java index 39450a91d8e..b5bc2aba1a7 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/http/HttpDataExtractorFactory.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/scheduler/http/HttpDataExtractorFactory.java @@ -11,33 +11,43 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.search.SearchRequestParsers; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.prelert.job.Job; import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor; import org.elasticsearch.xpack.prelert.job.extraction.DataExtractorFactory; import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfig; import java.io.IOException; +import java.util.List; import java.util.Map; +import java.util.Objects; public class HttpDataExtractorFactory implements DataExtractorFactory { private static final Logger LOGGER = Loggers.getLogger(HttpDataExtractorFactory.class); private final Client client; + private final SearchRequestParsers searchRequestParsers; - public HttpDataExtractorFactory(Client client) { - this.client = client; + public HttpDataExtractorFactory(Client client, SearchRequestParsers searchRequestParsers) { + this.client = Objects.requireNonNull(client); + this.searchRequestParsers = Objects.requireNonNull(searchRequestParsers); } @Override public 
DataExtractor newExtractor(SchedulerConfig schedulerConfig, Job job) { String timeField = job.getDataDescription().getTimeField(); ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder( - stringifyElasticsearchQuery(schedulerConfig.getQuery()), - stringifyElasticsearchAggregations(schedulerConfig.getAggregations()), - stringifyElasticsearchScriptFields(schedulerConfig.getScriptFields()), + xContentToJson(schedulerConfig.buildQuery(searchRequestParsers.queryParsers)), + stringifyAggregations(schedulerConfig.buildAggregations(searchRequestParsers.queryParsers, + searchRequestParsers.aggParsers)), + stringifyScriptFields(schedulerConfig.buildScriptFields(searchRequestParsers.queryParsers)), timeField); HttpRequester httpRequester = new HttpRequester(); ElasticsearchUrlBuilder urlBuilder = ElasticsearchUrlBuilder @@ -53,26 +63,38 @@ public class HttpDataExtractorFactory implements DataExtractorFactory { return baseUrl; } - String stringifyElasticsearchQuery(Map queryMap) { - String queryStr = writeMapAsJson(queryMap); - if (queryStr.startsWith("{") && queryStr.endsWith("}")) { - return queryStr.substring(1, queryStr.length() - 1); + private String xContentToJson(ToXContent xContent) { + try { + XContentBuilder jsonBuilder = JsonXContent.contentBuilder(); + xContent.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); + return jsonBuilder.string(); + } catch (IOException e) { + throw new RuntimeException(e); } - return queryStr; } - String stringifyElasticsearchAggregations(Map aggregationsMap) { - if (aggregationsMap != null) { - return writeMapAsJson(aggregationsMap); + String stringifyAggregations(AggregatorFactories.Builder aggregations) { + if (aggregations == null) { + return null; } - return null; + return xContentToJson(aggregations); } - String stringifyElasticsearchScriptFields(Map scriptFieldsMap) { - if (scriptFieldsMap != null) { - return writeMapAsJson(scriptFieldsMap); + String stringifyScriptFields(List scriptFields) { + if (scriptFields.isEmpty()) { + return null; + } + try { + XContentBuilder jsonBuilder = JsonXContent.contentBuilder(); + jsonBuilder.startObject(); + for (SearchSourceBuilder.ScriptField scriptField : scriptFields) { + scriptField.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); + } + jsonBuilder.endObject(); + return jsonBuilder.string(); + } catch (IOException e) { + throw new RuntimeException(e); } - return null; } private static String writeMapAsJson(Map map) { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/utils/ExceptionsHelper.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/utils/ExceptionsHelper.java index dc451ad4b3f..4edb83b39a9 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/utils/ExceptionsHelper.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/utils/ExceptionsHelper.java @@ -6,9 +6,11 @@ package org.elasticsearch.xpack.prelert.utils; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.common.ParseField; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.xpack.prelert.job.messages.Messages; @@ -39,6 +41,10 @@ public class ExceptionsHelper { return new ElasticsearchStatusException(msg, RestStatus.CONFLICT); } + public static ElasticsearchParseException parseException(ParseField parseField, 
Throwable cause) { + throw new ElasticsearchParseException("Failed to parse [" + parseField.getPreferredName() + "]", cause); + } + /** * A more REST-friendly Object.requireNonNull() */ diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetSchedulersActionResponseTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetSchedulersActionResponseTests.java index 7f4c8f9c9e2..a12cab04358 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetSchedulersActionResponseTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetSchedulersActionResponseTests.java @@ -5,6 +5,11 @@ */ package org.elasticsearch.xpack.prelert.action; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.script.Script; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.prelert.action.GetSchedulersAction.Response; import org.elasticsearch.xpack.prelert.job.persistence.QueryPage; import org.elasticsearch.xpack.prelert.scheduler.Scheduler; @@ -14,7 +19,6 @@ import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; public class GetSchedulersActionResponseTests extends AbstractStreamableTestCase { @@ -34,16 +38,24 @@ public class GetSchedulersActionResponseTests extends AbstractStreamableTestCase schedulerConfig.setFrequency(randomPositiveLong()); schedulerConfig.setQueryDelay(randomPositiveLong()); if (randomBoolean()) { - schedulerConfig.setQuery(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10))); + schedulerConfig.setQuery(QueryBuilders.termQuery(randomAsciiOfLength(10), randomAsciiOfLength(10))); } if (randomBoolean()) { - schedulerConfig.setScriptFields(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10))); + int scriptsSize = randomInt(3); + List scriptFields = new ArrayList<>(scriptsSize); + for (int scriptIndex = 0; scriptIndex < scriptsSize; scriptIndex++) { + scriptFields.add(new SearchSourceBuilder.ScriptField(randomAsciiOfLength(10), new Script(randomAsciiOfLength(10)), + randomBoolean())); + } + schedulerConfig.setScriptFields(scriptFields); } if (randomBoolean()) { schedulerConfig.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE)); } if (randomBoolean()) { - schedulerConfig.setAggregations(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10))); + AggregatorFactories.Builder aggsBuilder = new AggregatorFactories.Builder(); + aggsBuilder.addAggregator(AggregationBuilders.avg(randomAsciiOfLength(10))); + schedulerConfig.setAggregations(aggsBuilder); } schedulerList.add(schedulerConfig.build()); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/StopSchedulerActionRequestTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/StopSchedulerActionRequestTests.java index c286e5b1313..f284c963db8 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/StopSchedulerActionRequestTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/StopSchedulerActionRequestTests.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.prelert.action; import org.elasticsearch.ElasticsearchStatusException; import 
org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.search.SearchRequestParsers; import org.elasticsearch.xpack.prelert.action.StopSchedulerAction.Request; import org.elasticsearch.xpack.prelert.job.Job; import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfig; @@ -18,6 +19,7 @@ import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase; import static org.elasticsearch.xpack.prelert.scheduler.ScheduledJobRunnerTests.createScheduledJob; import static org.elasticsearch.xpack.prelert.scheduler.ScheduledJobRunnerTests.createSchedulerConfig; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; public class StopSchedulerActionRequestTests extends AbstractStreamableTestCase { @@ -41,13 +43,13 @@ public class StopSchedulerActionRequestTests extends AbstractStreamableTestCase< SchedulerConfig schedulerConfig = createSchedulerConfig("foo", "foo").build(); PrelertMetadata prelertMetadata2 = new PrelertMetadata.Builder().putJob(job, false) - .putScheduler(schedulerConfig) + .putScheduler(schedulerConfig, mock(SearchRequestParsers.class)) .build(); e = expectThrows(ElasticsearchStatusException.class, () -> StopSchedulerAction.validate("foo", prelertMetadata2)); assertThat(e.getMessage(), equalTo("scheduler already stopped, expected scheduler status [STARTED], but got [STOPPED]")); PrelertMetadata prelertMetadata3 = new PrelertMetadata.Builder().putJob(job, false) - .putScheduler(schedulerConfig) + .putScheduler(schedulerConfig, mock(SearchRequestParsers.class)) .updateSchedulerStatus("foo", SchedulerStatus.STARTED) .build(); StopSchedulerAction.validate("foo", prelertMetadata3); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/JobTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/JobTests.java index 0bc1dac5783..6cee0ed6ff1 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/JobTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/JobTests.java @@ -489,7 +489,9 @@ public class JobTests extends AbstractSerializingTestCase { builder.setAnalysisConfig(analysisConfig); builder.setAnalysisLimits(new AnalysisLimits(randomPositiveLong(), randomPositiveLong())); if (randomBoolean()) { - builder.setDataDescription(new DataDescription.Builder()); + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setFormat(randomFrom(DataDescription.DataFormat.values())); + builder.setDataDescription(dataDescription); } String[] outputs; TransformType[] transformTypes ; diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadataTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadataTests.java index 75f7cf9e384..322a01bdf2b 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadataTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadataTests.java @@ -16,11 +16,13 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchRequestParsers; import org.elasticsearch.xpack.prelert.job.DataDescription; import org.elasticsearch.xpack.prelert.job.Job; import org.elasticsearch.xpack.prelert.job.JobStatus; 
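These test changes reflect the new putScheduler(SchedulerConfig, SearchRequestParsers) signature. The PrelertMetadata.java hunk itself is not shown in this excerpt, but the effect visible from the tests is that putting a scheduler now eagerly builds the typed search parts, so an invalid config is rejected before it reaches the cluster state. Roughly, the validation amounts to something like the following hypothetical helper; this is a sketch, not the actual PrelertMetadata code:

    // Each build* call throws an ElasticsearchParseException on invalid content,
    // which is why the tests can pass a mock (or even null on paths where
    // validation is never reached).
    static void validateSearchParts(SchedulerConfig config, SearchRequestParsers parsers) {
        config.buildQuery(parsers.queryParsers);
        config.buildAggregations(parsers.queryParsers, parsers.aggParsers);
        config.buildScriptFields(parsers.queryParsers);
    }
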
import org.elasticsearch.xpack.prelert.job.JobTests; import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfig; +import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfigTests; import org.elasticsearch.xpack.prelert.scheduler.SchedulerStatus; import org.elasticsearch.xpack.prelert.support.AbstractSerializingTestCase; @@ -33,13 +35,14 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Mockito.mock; public class PrelertMetadataTests extends AbstractSerializingTestCase { @Override protected PrelertMetadata createTestInstance() { PrelertMetadata.Builder builder = new PrelertMetadata.Builder(); - int numJobs = randomIntBetween(0, 4); + int numJobs = randomIntBetween(0, 10); for (int i = 0; i < numJobs; i++) { Job job = JobTests.createRandomizedJob(); builder.putJob(job, false); @@ -52,10 +55,13 @@ public class PrelertMetadataTests extends AbstractSerializingTestCase builder.removeJob(job1.getId())); assertThat(e.status(), equalTo(RestStatus.CONFLICT)); @@ -172,7 +178,7 @@ public class PrelertMetadataTests extends AbstractSerializingTestCase builder.putScheduler(schedulerConfig1)); + expectThrows(ResourceNotFoundException.class, () -> builder.putScheduler(schedulerConfig1, null)); } public void testPutScheduler_failBecauseSchedulerIdIsAlreadyTaken() { @@ -200,9 +206,9 @@ public class PrelertMetadataTests extends AbstractSerializingTestCase builder.putScheduler(schedulerConfig1)); + expectThrows(ResourceAlreadyExistsException.class, () -> builder.putScheduler(schedulerConfig1, null)); } public void testPutScheduler_failBecauseJobAlreadyHasScheduler() { @@ -211,9 +217,10 @@ public class PrelertMetadataTests extends AbstractSerializingTestCase builder.putScheduler(schedulerConfig2)); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, + () -> builder.putScheduler(schedulerConfig2, mock(SearchRequestParsers.class))); assertThat(e.status(), equalTo(RestStatus.CONFLICT)); } @@ -226,7 +233,7 @@ public class PrelertMetadataTests extends AbstractSerializingTestCase builder.putScheduler(schedulerConfig1)); + expectThrows(IllegalArgumentException.class, () -> builder.putScheduler(schedulerConfig1, mock(SearchRequestParsers.class))); } public void testRemoveScheduler_failBecauseSchedulerStarted() { @@ -234,7 +241,7 @@ public class PrelertMetadataTests extends AbstractSerializingTestCase fieldMap = createFieldMapWithNoTermField(); - List nestingOrder = createNestingOrderWithNoTermField(); - - AggregatedJsonRecordReader reader = new AggregatedJsonRecordReader(parser, fieldMap, "aggregations", mock(Logger.class), - nestingOrder); - - String[] record = new String[4]; - boolean[] gotFields = new boolean[4]; - - assertEquals(3, reader.read(record, gotFields)); - assertEquals("649", record[0]); - assertEquals("106.72129514140468", record[1]); - assertEquals("1449446400000", record[2]); - - assertEquals(3, reader.read(record, gotFields)); - assertEquals("627", record[0]); - assertEquals("103.64676252462097", record[1]); - assertEquals("1449450000000", record[2]); - - assertEquals(-1, reader.read(record, gotFields)); - } - - public void testRead_WithOneTermField() throws JsonParseException, IOException { - String data = "{" + "\"took\" : 88," + "\"timed_out\" : false," - + "\"_shards\" : { \"total\" : 5, \"successful\" : 5, \"failed\" : 0 }," - + "\"hits\" : { \"total\" : 86275, \"max_score\" : 0.0, 
\"hits\" : [ ] }," + "\"aggregations\" : {" + "\"time_level\" : {" - + "\"buckets\" : [ {" + "\"key_as_string\" : \"2015-12-07T00:00:00.000Z\", \"key\" : 1449446400000, \"doc_count\" : 649," - + "\"airline_level\" : {" + "\"doc_count_error_upper_bound\" : 0, \"sum_other_doc_count\" : 0," - + "\"buckets\" : [ { \"key\" : \"aal\", \"doc_count\" : 62, \"metric_level\" : { \"value\" : 106.72129514140468 } }," - + "{ \"key\" : \"awe\", \"doc_count\" : 61, \"metric_level\" : { \"value\" : 20.20497368984535 } } ]" + "}" + "}," + "{" - + "\"key_as_string\" : \"2015-12-07T01:00:00.000Z\", \"key\" : 1449450000000, \"doc_count\" : 627," - + "\"airline_level\" : {" + "\"doc_count_error_upper_bound\" : 0, \"sum_other_doc_count\" : 0," - + "\"buckets\" : [ { \"key\" : \"aal\", \"doc_count\" : 59, \"metric_level\" : { \"value\" : 103.64676252462097 } }," - + "{ \"key\" : \"awe\", \"doc_count\" : 56, \"metric_level\" : { \"value\" : 20.047162464686803 } } ]" + "}" + "} ]" + "}" - + "}" + "}"; - JsonParser parser = createParser(data); - Map fieldMap = createFieldMapWithOneTermField(); - List nestingOrder = createNestingOrderWithOneTermField(); - - AggregatedJsonRecordReader reader = new AggregatedJsonRecordReader(parser, fieldMap, "aggregations", mock(Logger.class), - nestingOrder); - - String[] record = new String[4]; - boolean[] gotFields = new boolean[4]; - - assertEquals(4, reader.read(record, gotFields)); - assertEquals("aal", record[0]); - assertEquals("62", record[1]); - assertEquals("106.72129514140468", record[2]); - assertEquals("1449446400000", record[3]); - - assertEquals(4, reader.read(record, gotFields)); - assertEquals("awe", record[0]); - assertEquals("61", record[1]); - assertEquals("20.20497368984535", record[2]); - assertEquals("1449446400000", record[3]); - - assertEquals(4, reader.read(record, gotFields)); - assertEquals("aal", record[0]); - assertEquals("59", record[1]); - assertEquals("103.64676252462097", record[2]); - assertEquals("1449450000000", record[3]); - - assertEquals(4, reader.read(record, gotFields)); - assertEquals("awe", record[0]); - assertEquals("56", record[1]); - assertEquals("20.047162464686803", record[2]); - assertEquals("1449450000000", record[3]); - - assertEquals(-1, reader.read(record, gotFields)); - } - - public void testRead_WithTwoTermFields() throws JsonParseException, IOException { - String data = "{" + "\"took\" : 88," + "\"timed_out\" : false," - + "\"_shards\" : { \"total\" : 5, \"successful\" : 5, \"failed\" : 0 }," - + "\"hits\" : { \"total\" : 86275, \"max_score\" : 0.0, \"hits\" : [ ] }," + "\"aggregations\" : {" + "\"time_level\" : {" - + "\"buckets\" : [ {" + "\"key_as_string\" : \"2015-12-07T00:00:00.000Z\", \"key\" : 1449446400000, \"doc_count\" : 649," - + "\"sourcetype_level\" : {" + "\"doc_count_error_upper_bound\" : 0, \"sum_other_doc_count\" : 0," + "\"buckets\" : [ {" - + "\"key\" : \"farequote\", \"doc_count\" : 649," + "\"airline_level\" : {" - + "\"doc_count_error_upper_bound\" : 0, \"sum_other_doc_count\" : 0," - + "\"buckets\" : [ { \"key\" : \"aal\", \"doc_count\" : 62, \"metric_level\" : { \"value\" : 106.72129514140468 } }," - + "{ \"key\" : \"awe\", \"doc_count\" : 61, \"metric_level\" : { \"value\" : 20.20497368984535 } } ]" + "}" + "} ]" + "}" - + "}," + "{" + "\"key_as_string\" : \"2015-12-07T01:00:00.000Z\", \"key\" : 1449450000000, \"doc_count\" : 627," - + "\"sourcetype_level\" : {" + "\"doc_count_error_upper_bound\" : 0, \"sum_other_doc_count\" : 0," + "\"buckets\" : [ {" - + "\"key\" : \"farequote\", \"doc_count\" : 
627," + "\"airline_level\" : {" - + "\"doc_count_error_upper_bound\" : 0, \"sum_other_doc_count\" : 0," - + "\"buckets\" : [ { \"key\" : \"aal\", \"doc_count\" : 59, \"metric_level\" : { \"value\" : 103.64676252462097 } }," - + "{ \"key\" : \"awe\", \"doc_count\" : 56, \"metric_level\" : { \"value\" : 20.047162464686803 } } ]" + "}" + "} ]" + "}" - + "} ]" + "}" + "}" + "}"; - JsonParser parser = createParser(data); - Map fieldMap = createFieldMapWithTwoTermFields(); - List nestingOrder = createNestingOrderWithTwoTermFields(); - - AggregatedJsonRecordReader reader = new AggregatedJsonRecordReader(parser, fieldMap, "aggregations", mock(Logger.class), - nestingOrder); - - String[] record = new String[5]; - boolean[] gotFields = new boolean[5]; - - assertEquals(5, reader.read(record, gotFields)); - assertEquals("aal", record[0]); - assertEquals("62", record[1]); - assertEquals("106.72129514140468", record[2]); - assertEquals("1449446400000", record[3]); - assertEquals("farequote", record[4]); - - assertEquals(5, reader.read(record, gotFields)); - assertEquals("awe", record[0]); - assertEquals("61", record[1]); - assertEquals("20.20497368984535", record[2]); - assertEquals("1449446400000", record[3]); - assertEquals("farequote", record[4]); - - assertEquals(5, reader.read(record, gotFields)); - assertEquals("aal", record[0]); - assertEquals("59", record[1]); - assertEquals("103.64676252462097", record[2]); - assertEquals("1449450000000", record[3]); - assertEquals("farequote", record[4]); - - assertEquals(5, reader.read(record, gotFields)); - assertEquals("awe", record[0]); - assertEquals("56", record[1]); - assertEquals("20.047162464686803", record[2]); - assertEquals("1449450000000", record[3]); - assertEquals("farequote", record[4]); - - assertEquals(-1, reader.read(record, gotFields)); - } - - public void testConstructor_GivenNoNestingOrder() throws JsonParseException, IOException { - JsonParser parser = createParser(""); - Map fieldMap = createFieldMapWithNoTermField(); - List nestingOrder = Collections.emptyList(); - - ESTestCase.expectThrows(IllegalArgumentException.class, - () -> new AggregatedJsonRecordReader(parser, fieldMap, "aggregations", mock(Logger.class), nestingOrder)); - } - - public void testRead_GivenInvalidJson() throws JsonParseException, IOException { - String data = "{" + "\"took\" : 88," + "\"timed_out\" : false," - + "\"_shards\" : { \"total\" : 5, \"successful\" : 5, \"failed\" : 0 }," - + "\"hits\" : { \"total\" : 86275, \"max_score\" : 0.0, \"hits\" : [ ] }," + "\"aggregations\" : {" + "\"time_level\" : {"; - JsonParser parser = createParser(data); - Map fieldMap = createFieldMapWithNoTermField(); - List nestingOrder = createNestingOrderWithNoTermField(); - - AggregatedJsonRecordReader reader = new AggregatedJsonRecordReader(parser, fieldMap, "aggregations", mock(Logger.class), - nestingOrder); - - String[] record = new String[4]; - boolean[] gotFields = new boolean[4]; - - ESTestCase.expectThrows(ElasticsearchParseException.class, () -> reader.read(record, gotFields)); - } - - private JsonParser createParser(String input) throws JsonParseException, IOException { - ByteArrayInputStream inputStream = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)); - return new JsonFactory().createParser(inputStream); - } - - private Map createFieldMapWithNoTermField() { - Map fieldMap = new HashMap<>(); - fieldMap.put("doc_count", 0); - fieldMap.put("responsetime", 1); - fieldMap.put("time", 2); - return fieldMap; - } - - private List 
createNestingOrderWithNoTermField() { - List nestingOrder = new ArrayList<>(); - nestingOrder.add("time"); - nestingOrder.add("responsetime"); - return nestingOrder; - } - - private Map createFieldMapWithOneTermField() { - Map fieldMap = new HashMap<>(); - fieldMap.put("airline", 0); - fieldMap.put("doc_count", 1); - fieldMap.put("responsetime", 2); - fieldMap.put("time", 3); - return fieldMap; - } - - private List createNestingOrderWithOneTermField() { - List nestingOrder = new ArrayList<>(); - nestingOrder.add("time"); - nestingOrder.add("airline"); - nestingOrder.add("responsetime"); - return nestingOrder; - } - - private Map createFieldMapWithTwoTermFields() { - Map fieldMap = new HashMap<>(); - fieldMap.put("airline", 0); - fieldMap.put("doc_count", 1); - fieldMap.put("responsetime", 2); - fieldMap.put("time", 3); - fieldMap.put("sourcetype", 4); - return fieldMap; - } - - private List createNestingOrderWithTwoTermFields() { - List nestingOrder = new ArrayList<>(); - nestingOrder.add("time"); - nestingOrder.add("sourcetype"); - nestingOrder.add("airline"); - nestingOrder.add("responsetime"); - return nestingOrder; - } - -} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerActionTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerActionTests.java index 8c43e7fd5cb..1c957d2a4f8 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerActionTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/rest/schedulers/RestStartJobSchedulerActionTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.search.SearchRequestParsers; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.rest.FakeRestRequest; import org.elasticsearch.threadpool.ThreadPool; @@ -33,18 +34,19 @@ public class RestStartJobSchedulerActionTests extends ESTestCase { public void testPrepareRequest() throws Exception { ClusterService clusterService = mock(ClusterService.class); + SearchRequestParsers searchRequestParsers = mock(SearchRequestParsers.class); Job.Builder job = ScheduledJobRunnerTests.createScheduledJob(); SchedulerConfig schedulerConfig = ScheduledJobRunnerTests.createSchedulerConfig("foo-scheduler", "foo").build(); PrelertMetadata prelertMetadata = new PrelertMetadata.Builder() .putJob(job.build(), false) - .putScheduler(schedulerConfig) + .putScheduler(schedulerConfig, searchRequestParsers) .updateStatus("foo", JobStatus.OPENED, null) .build(); when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name")) .metaData(MetaData.builder().putCustom(PrelertMetadata.TYPE, prelertMetadata)) .build()); RestStartSchedulerAction action = new RestStartSchedulerAction(Settings.EMPTY, mock(RestController.class), - mock(ThreadPool.class), clusterService); + mock(ThreadPool.class), clusterService, searchRequestParsers); Map params = new HashMap<>(); params.put("start", "not-a-date"); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunnerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunnerTests.java index 9b90b627e8a..0c87d33645e 100644 --- 
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunnerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunnerTests.java
index 9b90b627e8a..0c87d33645e 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunnerTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobRunnerTests.java
@@ -15,6 +15,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.search.SearchRequestParsers;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.prelert.PrelertPlugin;
@@ -99,8 +100,8 @@ public class ScheduledJobRunnerTests extends ESTestCase {
         when(client.execute(same(JobDataAction.INSTANCE), any())).thenReturn(jobDataFuture);
         when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture);

-        scheduledJobRunner =
-                new ScheduledJobRunner(threadPool, client, clusterService,jobProvider, dataExtractorFactory, () -> currentTime);
+        scheduledJobRunner = new ScheduledJobRunner(threadPool, client, clusterService, jobProvider, dataExtractorFactory,
+                () -> currentTime);

         when(jobProvider.audit(anyString())).thenReturn(auditor);
         when(jobProvider.buckets(anyString(), any(BucketsQueryBuilder.BucketsQuery.class))).thenThrow(
@@ -114,7 +115,7 @@
         Job job = jobBuilder.build();
         PrelertMetadata prelertMetadata = new PrelertMetadata.Builder()
                 .putJob(job, false)
-                .putScheduler(schedulerConfig)
+                .putScheduler(schedulerConfig, mock(SearchRequestParsers.class))
                 .updateStatus("foo", JobStatus.OPENED, null)
                 .build();
         when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name"))
@@ -147,7 +148,7 @@
         Job job = jobBuilder.build();
         PrelertMetadata prelertMetadata = new PrelertMetadata.Builder()
                 .putJob(job, false)
-                .putScheduler(schedulerConfig)
+                .putScheduler(schedulerConfig, mock(SearchRequestParsers.class))
                 .updateStatus("foo", JobStatus.OPENED, null)
                 .build();
         when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name"))
@@ -202,21 +203,24 @@
         PrelertMetadata prelertMetadata1 = new PrelertMetadata.Builder()
                 .putJob(job1, false)
                 .build();
-        Exception e = expectThrows(ResourceNotFoundException.class, () -> ScheduledJobRunner.validate("some-scheduler", prelertMetadata1));
+        Exception e = expectThrows(ResourceNotFoundException.class,
+                () -> ScheduledJobRunner.validate("some-scheduler", prelertMetadata1));
         assertThat(e.getMessage(), equalTo("No scheduler with id [some-scheduler] exists"));

         SchedulerConfig schedulerConfig1 = createSchedulerConfig("foo-scheduler", "foo").build();
         PrelertMetadata prelertMetadata2 = new PrelertMetadata.Builder(prelertMetadata1)
-                .putScheduler(schedulerConfig1)
+                .putScheduler(schedulerConfig1, mock(SearchRequestParsers.class))
                 .build();
-        e = expectThrows(ElasticsearchStatusException.class, () -> ScheduledJobRunner.validate("foo-scheduler", prelertMetadata2));
+        e = expectThrows(ElasticsearchStatusException.class,
+                () -> ScheduledJobRunner.validate("foo-scheduler", prelertMetadata2));
         assertThat(e.getMessage(), equalTo("cannot start scheduler, expected job status [OPENED], but got [CLOSED]"));

         PrelertMetadata prelertMetadata3 = new PrelertMetadata.Builder(prelertMetadata2)
                 .updateStatus("foo", JobStatus.OPENED, null)
                 .updateSchedulerStatus("foo-scheduler", SchedulerStatus.STARTED)
                 .build();
-        e = expectThrows(ElasticsearchStatusException.class, () -> ScheduledJobRunner.validate("foo-scheduler", prelertMetadata3));
+        e = expectThrows(ElasticsearchStatusException.class,
+                () -> ScheduledJobRunner.validate("foo-scheduler", prelertMetadata3));
         assertThat(e.getMessage(), equalTo("scheduler already started, expected scheduler status [STOPPED], but got [STARTED]"));
     }
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobValidatorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobValidatorTests.java
index a1000066549..c5944ecd39e 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobValidatorTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/ScheduledJobValidatorTests.java
@@ -5,8 +5,8 @@
  */
 package org.elasticsearch.xpack.prelert.scheduler;

-import org.elasticsearch.common.xcontent.XContentFactory;
-import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.prelert.job.AnalysisConfig;
 import org.elasticsearch.xpack.prelert.job.DataDescription;
@@ -141,32 +141,7 @@ public class ScheduledJobValidatorTests extends ESTestCase {

     private static SchedulerConfig.Builder createValidSchedulerConfigWithAggs() throws IOException {
         SchedulerConfig.Builder schedulerConfig = createValidSchedulerConfig();
-        String aggStr =
-                "{" +
-                    "\"buckets\" : {" +
-                        "\"histogram\" : {" +
-                            "\"field\" : \"time\"," +
-                            "\"interval\" : 3600000" +
-                        "}," +
-                        "\"aggs\" : {" +
-                            "\"byField\" : {" +
-                                "\"terms\" : {" +
-                                    "\"field\" : \"airline\"," +
-                                    "\"size\" : 0" +
-                                "}," +
-                                "\"aggs\" : {" +
-                                    "\"stats\" : {" +
-                                        "\"stats\" : {" +
-                                            "\"field\" : \"responsetime\"" +
-                                        "}" +
-                                    "}" +
-                                "}" +
-                            "}" +
-                        "}" +
-                    "} " +
-                "}";
-        XContentParser parser = XContentFactory.xContent(aggStr).createParser(aggStr);
-        schedulerConfig.setAggregations(parser.map());
+        schedulerConfig.setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("foo")));
         return schedulerConfig;
     }
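The deleted JSON blob in createValidSchedulerConfigWithAggs built a histogram/terms/stats tree as an untyped map; setAggregations now takes typed builders instead. For reference, a sketch of roughly the same tree expressed with the typed API. This is illustrative only: it uses the standard AggregationBuilders factory methods, drops the old "size" : 0 from the terms aggregation, and is not required by the validator in this shape:

    import org.elasticsearch.search.aggregations.AggregationBuilders;
    import org.elasticsearch.search.aggregations.AggregatorFactories;

    // Typed equivalent of the removed JSON: an hourly histogram on "time",
    // split by "airline", with stats over "responsetime" at the leaves.
    AggregatorFactories.Builder aggs = new AggregatorFactories.Builder()
            .addAggregator(AggregationBuilders.histogram("buckets")
                    .field("time")
                    .interval(3600000)
                    .subAggregation(AggregationBuilders.terms("byField")
                            .field("airline")
                            .subAggregation(AggregationBuilders.stats("stats")
                                    .field("responsetime"))));
    schedulerConfig.setAggregations(aggs);

The test itself settles for a single avg aggregation, presumably because the validator only needs some aggregation to be present, not a particular shape.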
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/SchedulerConfigTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/SchedulerConfigTests.java
index d9a18bae8e0..7a9a14a14cb 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/SchedulerConfigTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/SchedulerConfigTests.java
@@ -6,12 +6,17 @@ package org.elasticsearch.xpack.prelert.scheduler;

 import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;

-import org.apache.logging.log4j.Logger;
 import org.elasticsearch.common.ParseFieldMatcher;
 import org.elasticsearch.common.io.stream.Writeable;
-import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.script.Script;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.prelert.job.messages.Messages;
 import org.elasticsearch.xpack.prelert.support.AbstractSerializingTestCase;
@@ -20,29 +25,22 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;

 public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerConfig> {

     @Override
     protected SchedulerConfig createTestInstance() {
-        SchedulerConfig.Builder builder = new SchedulerConfig.Builder(randomValidSchedulerId(), randomAsciiOfLength(10));
+        return createRandomizedSchedulerConfig(randomAsciiOfLength(10));
+    }
+
+    public static SchedulerConfig createRandomizedSchedulerConfig(String jobId) {
+        SchedulerConfig.Builder builder = new SchedulerConfig.Builder(randomValidSchedulerId(), jobId);
         builder.setIndexes(randomStringList(1, 10));
         builder.setTypes(randomStringList(1, 10));
-        if (randomBoolean()) {
-            builder.setQuery(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)));
-        }
-        if (randomBoolean()) {
-            builder.setScriptFields(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)));
-        }
         if (randomBoolean()) {
             builder.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE));
         }
-        if (randomBoolean()) {
-            builder.setAggregations(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)));
-        }
         if (randomBoolean()) {
             builder.setFrequency(randomPositiveLong());
         }
@@ -71,80 +69,43 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerConfig>
     @Override
     protected SchedulerConfig parseInstance(XContentParser parser, ParseFieldMatcher matcher) {
         return SchedulerConfig.PARSER.apply(parser, () -> matcher).build();
     }

-    /**
-     * Test parsing of the opaque {@link SchedulerConfig#getQuery()} object
-     */
-    public void testQueryParsing() throws IOException {
-        Logger logger = Loggers.getLogger(SchedulerConfigTests.class);
+    public void testToXContent_GivenQueryAggsAndScriptFields() throws IOException {
+        SchedulerConfig.Builder builder = new SchedulerConfig.Builder(randomValidSchedulerId(), randomAsciiOfLength(10));
+        builder.setIndexes(randomStringList(1, 10));
+        builder.setTypes(randomStringList(1, 10));
+        builder.setQuery(QueryBuilders.matchAllQuery());

-        String schedulerConfigStr = "{" + "\"scheduler_id\":\"scheduler1\"," + "\"job_id\":\"job1\"," + "\"indexes\":[\"farequote\"],"
-                + "\"types\":[\"farequote\"]," + "\"query\":{\"match_all\":{} }" + "}";
+        int scriptsSize = randomInt(3);
+        List<SearchSourceBuilder.ScriptField> scriptFields = new ArrayList<>(scriptsSize);
+        for (int scriptIndex = 0; scriptIndex < scriptsSize; scriptIndex++) {
+            scriptFields.add(new SearchSourceBuilder.ScriptField(randomAsciiOfLength(10), new Script(randomAsciiOfLength(10)),
+                    randomBoolean()));
+        }
+        builder.setScriptFields(scriptFields);

-        XContentParser parser = XContentFactory.xContent(schedulerConfigStr).createParser(schedulerConfigStr);
-        SchedulerConfig schedulerConfig = SchedulerConfig.PARSER.apply(parser, () -> ParseFieldMatcher.STRICT).build();
-        assertNotNull(schedulerConfig);
+        AggregatorFactories.Builder aggsBuilder = new AggregatorFactories.Builder();
+        aggsBuilder.addAggregator(AggregationBuilders.avg(randomAsciiOfLength(10)));
+        builder.setAggregations(aggsBuilder);

-        Map<String, Object> query = schedulerConfig.getQuery();
-        assertNotNull(query);
+        SchedulerConfig testInstance = builder.build();

-        String queryAsJson = XContentFactory.jsonBuilder().map(query).string();
-        logger.info("Round trip of query is: " + queryAsJson);
-        assertTrue(query.containsKey("match_all"));
-    }
+        for (int runs = 0; runs < NUMBER_OF_TESTQUERIES; runs++) {
+            XContentBuilder xContentBuilder = toXContent(testInstance, randomFrom(XContentType.values()));
+            XContentBuilder shuffled = shuffleXContent(xContentBuilder, shuffleProtectedFields());

-    public void testBuildAggregatedFieldList_GivenNoAggregations() {
-        SchedulerConfig.Builder builder = new SchedulerConfig.Builder("scheduler1", "job1");
-        builder.setIndexes(Arrays.asList("index"));
-        builder.setTypes(Arrays.asList("type"));
-        assertTrue(builder.build().buildAggregatedFieldList().isEmpty());
-    }
+            XContentParser parser = XContentFactory.xContent(shuffled.bytes()).createParser(shuffled.bytes());
+            SchedulerConfig parsedInstance = parseInstance(parser, ParseFieldMatcher.STRICT);

-    public void testAggsParse() throws IOException {
-        Logger logger = Loggers.getLogger(SchedulerConfigTests.class);
-
-        String aggregationsConfig = "{" + "\"scheduler_id\":\"scheduler1\"," + "\"job_id\":\"job1\"," + "\"indexes\":[\"farequote\"],"
-                + "\"types\":[\"farequote\"]," + "\"query\":{\"match_all\":{} }," + "\"aggs\" : {" + "\"top_level_must_be_time\" : {"
-                + "\"histogram\" : {" + "\"field\" : \"@timestamp\"," + "\"interval\" : 3600000" + "}," + "\"aggs\" : {"
-                + "\"by_field_in_the_middle\" : { " + "\"terms\" : {" + "\"field\" : \"airline\"," + "\"size\" : 0" + "}," + "\"aggs\" : {"
-                + "\"stats_last\" : {" + "\"avg\" : {" + "\"field\" : \"responsetime\"" + "}" + "}" + "} " + "}" + "}" + "}" + "}" + "}"
-                + "}";
-
-        String aggsConfig = "{" + "\"scheduler_id\":\"scheduler1\"," + "\"job_id\":\"job1\"," + "\"indexes\":[\"farequote\"],"
-                + "\"types\":[\"farequote\"]," + "\"query\":{\"match_all\":{} }," + "\"aggs\" : {" + "\"top_level_must_be_time\" : {"
-                + "\"histogram\" : {" + "\"field\" : \"@timestamp\"," + "\"interval\" : 3600000" + "}," + "\"aggs\" : {"
-                + "\"by_field_in_the_middle\" : { " + "\"terms\" : {" + "\"field\" : \"airline\"," + "\"size\" : 0" + "}," + "\"aggs\" : {"
-                + "\"stats_last\" : {" + "\"avg\" : {" + "\"field\" : \"responsetime\"" + "}" + "}" + "} " + "}" + "}" + "}" + "}" + "}"
-                + "}";
-
-        XContentParser parser = XContentFactory.xContent(aggregationsConfig).createParser(aggregationsConfig);
-        SchedulerConfig aggregationsSchedulerConfig = SchedulerConfig.PARSER.apply(parser, () -> ParseFieldMatcher.STRICT).build();
-        parser = XContentFactory.xContent(aggsConfig).createParser(aggsConfig);
-        SchedulerConfig aggsSchedulerConfig = SchedulerConfig.PARSER.apply(parser, () -> ParseFieldMatcher.STRICT).build();
-        assertNotNull(aggregationsSchedulerConfig);
-        assertNotNull(aggsSchedulerConfig);
-        assertEquals(aggregationsSchedulerConfig, aggsSchedulerConfig);
-
-        Map<String, Object> aggs = aggsSchedulerConfig.getAggregations();
-        assertNotNull(aggs);
-
-        String aggsAsJson = XContentFactory.jsonBuilder().map(aggs).string();
-        logger.info("Round trip of aggs is: " + aggsAsJson);
-        assertTrue(aggs.containsKey("top_level_must_be_time"));
-
-        List<String> aggregatedFieldList = aggsSchedulerConfig.buildAggregatedFieldList();
-        assertEquals(3, aggregatedFieldList.size());
-        assertEquals("@timestamp", aggregatedFieldList.get(0));
-        assertEquals("airline", aggregatedFieldList.get(1));
-        assertEquals("responsetime", aggregatedFieldList.get(2));
+            assertEquals(testInstance.getQueryAsMap(), parsedInstance.getQueryAsMap());
+            assertEquals(testInstance.getAggregationsAsMap(), parsedInstance.getAggregationsAsMap());
+            assertEquals(testInstance.getScriptFieldsAsMap(), parsedInstance.getScriptFieldsAsMap());
+        }
     }

     public void testFillDefaults() {
         SchedulerConfig.Builder expectedSchedulerConfig = new SchedulerConfig.Builder("scheduler1", "job1");
         expectedSchedulerConfig.setIndexes(Arrays.asList("index"));
         expectedSchedulerConfig.setTypes(Arrays.asList("type"));
-        Map<String, Object> defaultQuery = new HashMap<>();
-        defaultQuery.put("match_all", new HashMap<>());
-        expectedSchedulerConfig.setQuery(defaultQuery);
         expectedSchedulerConfig.setQueryDelay(60L);
         expectedSchedulerConfig.setScrollSize(1000);
         SchedulerConfig.Builder defaultedSchedulerConfig = new SchedulerConfig.Builder("scheduler1", "job1");
@@ -208,8 +169,7 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerConfig>
-        Map<String, Object> emptyQuery = new HashMap<>();
-        b2.setQuery(emptyQuery);
+        b2.setQuery(QueryBuilders.termQuery("foo", "bar"));

         SchedulerConfig sc1 = b1.build();
         SchedulerConfig sc2 = b2.build();
@@ -220,8 +180,7 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerConfig>
-        Map<String, Object> emptyAggs = new HashMap<>();
-        sc2.setAggregations(emptyAggs);
+        sc2.setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.count("foo")));

         assertFalse(sc1.build().equals(sc2.build()));
         assertFalse(sc2.build().equals(sc1.build()));
@@ -233,46 +192,12 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerConfig>
-        Map<String, Object> query = new HashMap<>();
-        query.put("foo", new HashMap<>());
-        sc.setQuery(query);
-        Map<String, Object> aggs = new HashMap<>();
-        aggs.put("bar", new HashMap<>());
-        sc.setAggregations(aggs);
+        sc.setQuery(QueryBuilders.matchAllQuery());
+        sc.setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("foo")));
         sc.setQueryDelay(90L);
         return sc;
     }

-    public void testCheckValid_AllOk() throws IOException {
-        SchedulerConfig.Builder conf = new SchedulerConfig.Builder("scheduler1", "job1");
-        conf.setIndexes(Arrays.asList("myindex"));
-        conf.setTypes(Arrays.asList("mytype"));
-        conf.setQueryDelay(90L);
-        String json = "{ \"match_all\" : {} }";
-        XContentParser parser = XContentFactory.xContent(json).createParser(json);
-        conf.setQuery(parser.map());
-        conf.setScrollSize(2000);
-        conf.build();
-    }
-
-    public void testCheckValid_NoQuery() {
-        SchedulerConfig.Builder conf = new SchedulerConfig.Builder("scheduler1", "job1");
-        conf.setIndexes(Arrays.asList("myindex"));
-        conf.setTypes(Arrays.asList("mytype"));
-        assertEquals(Collections.singletonMap("match_all", new HashMap<>()), conf.build().getQuery());
-    }
-
-    public void testCheckValid_GivenScriptFields() throws IOException {
-        SchedulerConfig.Builder conf = new SchedulerConfig.Builder("scheduler1", "job1");
-        conf.setIndexes(Arrays.asList("myindex"));
-        conf.setTypes(Arrays.asList("mytype"));
-        String json = "{ \"twiceresponsetime\" : { \"script\" : { \"lang\" : \"expression\", "
-                + "\"inline\" : \"doc['responsetime'].value * 2\" } } }";
-        XContentParser parser = XContentFactory.xContent(json).createParser(json);
-        conf.setScriptFields(parser.map());
-        assertEquals(1, conf.build().getScriptFields().size());
-    }
-
     public void testCheckValid_GivenNullIndexes() throws IOException {
         SchedulerConfig.Builder conf = new SchedulerConfig.Builder("scheduler1", "job1");
         expectThrows(IllegalArgumentException.class, () -> conf.setIndexes(null));
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/SchedulerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/SchedulerTests.java
new file mode 100644
index 00000000000..5e684ad5be6
--- /dev/null
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/SchedulerTests.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.prelert.scheduler;
+
+import org.elasticsearch.common.ParseFieldMatcher;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.xpack.prelert.support.AbstractSerializingTestCase;
+
+public class SchedulerTests extends AbstractSerializingTestCase<Scheduler> {
+
+    @Override
+    protected Scheduler createTestInstance() {
+        return new Scheduler(SchedulerConfigTests.createRandomizedSchedulerConfig(randomAsciiOfLength(10)),
+                randomFrom(SchedulerStatus.values()));
+    }
+
+    @Override
+    protected Writeable.Reader<Scheduler> instanceReader() {
+        return Scheduler::new;
+    }
+
+    @Override
+    protected Scheduler parseInstance(XContentParser parser, ParseFieldMatcher matcher) {
+        return Scheduler.PARSER.apply(parser, () -> matcher);
+    }
+}
\ No newline at end of file
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/http/ElasticsearchDataExtractorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/http/ElasticsearchDataExtractorTests.java
index 02da3bd2e9b..3cae9945fe2 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/http/ElasticsearchDataExtractorTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/http/ElasticsearchDataExtractorTests.java
@@ -28,7 +28,7 @@ public class ElasticsearchDataExtractorTests extends ESTestCase {

     private static final List<String> INDEXES = Arrays.asList("index-*");
     private static final List<String> TYPES = Arrays.asList("dataType");
-    private static final String SEARCH = "\"match_all\":{}";
+    private static final String SEARCH = "{\"match_all\":{}}";
     private static final String TIME_FIELD = "time";
     private static final String CLEAR_SCROLL_RESPONSE = "{}";
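The SEARCH constant becomes a complete JSON object because the query handed to the HTTP extractor is now produced by serializing a typed QueryBuilder, which always yields a self-contained object rather than a bare key/value fragment. The snippet below is illustrative only, reusing the constructor signature visible in the tests; presumably the query string is now embedded as "query": %s rather than spliced into "query": {%s}:

    // A full object, parseable on its own; the old "\"match_all\":{}" fragment
    // only made sense once wrapped in enclosing braces by the query builder.
    ElasticsearchQueryBuilder queryBuilder =
            new ElasticsearchQueryBuilder("{\"match_all\":{}}", null, null, "time");
    assertFalse(queryBuilder.isAggregated());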
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/http/ElasticsearchQueryBuilderTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/http/ElasticsearchQueryBuilderTests.java
index 7e579d9372e..7152652d40d 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/http/ElasticsearchQueryBuilderTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/scheduler/http/ElasticsearchQueryBuilderTests.java
@@ -13,8 +13,10 @@ import static org.mockito.Mockito.verify;

 public class ElasticsearchQueryBuilderTests extends ESTestCase {

+    private static final String MATCH_ALL_QUERY = "{\"match_all\":{}}";
+
     public void testCreateSearchBody_GivenQueryOnly() {
-        ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", null, null, "time");
+        ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(MATCH_ALL_QUERY, null, null, "time");

         assertFalse(queryBuilder.isAggregated());
@@ -29,7 +31,7 @@ public class ElasticsearchQueryBuilderTests extends ESTestCase {
     }

     public void testCreateSearchBody_GivenQueryAndScriptFields() {
-        ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", null,
+        ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(MATCH_ALL_QUERY, null,
                 "{\"test1\":{\"script\": \"...\"}}", "@timestamp");

         assertFalse(queryBuilder.isAggregated());
@@ -46,7 +48,7 @@ public class ElasticsearchQueryBuilderTests extends ESTestCase {
     }

     public void testCreateSearchBody_GivenQueryAndAggs() {
-        ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", "{\"my_aggs\":{}}", null, "time");
+        ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(MATCH_ALL_QUERY, "{\"my_aggs\":{}}", null, "time");

         assertTrue(queryBuilder.isAggregated());
@@ -61,7 +63,7 @@ public class ElasticsearchQueryBuilderTests extends ESTestCase {
     }

     public void testCreateDataSummaryQuery_GivenQueryOnly() {
-        ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", null, null, "@timestamp");
+        ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(MATCH_ALL_QUERY, null, null, "@timestamp");

         assertFalse(queryBuilder.isAggregated());
@@ -78,7 +80,7 @@ public class ElasticsearchQueryBuilderTests extends ESTestCase {
     }

     public void testCreateDataSummaryQuery_GivenQueryAndScriptFields() {
-        ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", null,
+        ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(MATCH_ALL_QUERY, null,
                 "{\"test1\":{\"script\": \"...\"}}", "@timestamp");

         assertFalse(queryBuilder.isAggregated());
@@ -96,7 +98,7 @@ public class ElasticsearchQueryBuilderTests extends ESTestCase {
     }

     public void testCreateDataSummaryQuery_GivenQueryAndAggs() {
-        ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", "{\"my_aggs\":{}}", null, "@timestamp");
+        ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(MATCH_ALL_QUERY, "{\"my_aggs\":{}}", null, "@timestamp");

         assertTrue(queryBuilder.isAggregated());
@@ -113,7 +115,7 @@ public class ElasticsearchQueryBuilderTests extends ESTestCase {
     }

     public void testLogQueryInfo_GivenNoAggsNoFields() {
-        ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", null, null, "@timestamp");
+        ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(MATCH_ALL_QUERY, null, null, "@timestamp");

         Logger logger = mock(Logger.class);
         queryBuilder.logQueryInfo(logger);
@@ -122,7 +124,7 @@ public class ElasticsearchQueryBuilderTests extends ESTestCase {
     }

     public void testLogQueryInfo() {
-        ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", "{\"my_aggs\":{ \"foo\": \"bar\" }}",
+        ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(MATCH_ALL_QUERY, "{\"my_aggs\":{ \"foo\": \"bar\" }}",
                 null, "@timestamp");

         Logger logger = mock(Logger.class);
diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/schedulers_crud.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/schedulers_crud.yaml
index f0e350bef2b..b7dd35b111f 100644
--- a/elasticsearch/src/test/resources/rest-api-spec/test/schedulers_crud.yaml
+++ b/elasticsearch/src/test/resources/rest-api-spec/test/schedulers_crud.yaml
@@ -104,6 +104,20 @@ setup:
           "types":["type-bar"]
         }

+---
+"Test put scheduler with invalid query":
+  - do:
+      catch: /parsing_exception/
+      xpack.prelert.put_scheduler:
+        scheduler_id: test-scheduler-1
+        body: >
+          {
+            "job_id":"job-1",
+            "indexes":["index-foo"],
+            "types":["type-bar"],
+            "query":{"match_all_mispelled":{}}
+          }
+
 ---
 "Test delete scheduler with missing id":
   - do:
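The new YAML test shows the point of this change from the REST side: since putting a scheduler now builds the typed search objects, a deliberately misspelled query name fails fast with a parsing_exception instead of being stored opaquely in the cluster state. For comparison, the well-formed counterpart of that body, sketched with the typed builder API used throughout the tests above (the scheduler id, job id, index and type names are the ones from the YAML test):

    import java.util.Arrays;
    import org.elasticsearch.index.query.QueryBuilders;

    // "match_all" is a registered query name, so the strict parse succeeds and
    // the config can be stored; "match_all_mispelled" has no registered parser.
    SchedulerConfig.Builder builder = new SchedulerConfig.Builder("test-scheduler-1", "job-1");
    builder.setIndexes(Arrays.asList("index-foo"));
    builder.setTypes(Arrays.asList("type-bar"));
    builder.setQuery(QueryBuilders.matchAllQuery());
    SchedulerConfig config = builder.build();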