Strict parse search parts of schedulerConfig (elastic/elasticsearch#600)

* Strict parse search parts of schedulerConfig

This commit adds methods to build the typed objects
for the search parts of a scheduler config. Those are:
query, aggregations and script_fields.

As scheduler configs are stored in the cluster state and parsing
the search parts requires a SearchRequestParsers object, we cannot
store them as typed fields. Instead, they are now stored as
BytesReferences.

This change is in preparation for switching over to using a
client based data extractor.

Point summary of changes:

- query, aggregations and scriptFields are now stored as BytesReference
- adds methods to build the corresponding typed objects
- putting a scheduler now builds the search parts to to validate that
the config is valid

Relates to elastic/elasticsearch#478



Original commit: elastic/x-pack-elasticsearch@e6d5a85871
This commit is contained in:
Dimitris Athanasiou 2016-12-21 10:57:55 +00:00 committed by GitHub
parent 4033c93a13
commit 3e494070bf
25 changed files with 371 additions and 829 deletions

View File

@ -196,7 +196,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
autodetectProcessFactory, normalizerFactory);
ScheduledJobRunner scheduledJobRunner = new ScheduledJobRunner(threadPool, client, clusterService, jobProvider,
// norelease: we will no longer need to pass the client here after we switch to a client based data extractor
new HttpDataExtractorFactory(client),
new HttpDataExtractorFactory(client, searchRequestParsers),
System::currentTimeMillis);
JobLifeCycleService jobLifeCycleService =

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchRequestParsers;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfig;
@ -173,11 +174,15 @@ public class PutSchedulerAction extends Action<PutSchedulerAction.Request, PutSc
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
private final SearchRequestParsers searchRequestParsers;
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
SearchRequestParsers searchRequestParsers) {
super(settings, PutSchedulerAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
this.searchRequestParsers = searchRequestParsers;
}
@Override
@ -213,7 +218,7 @@ public class PutSchedulerAction extends Action<PutSchedulerAction.Request, PutSc
private ClusterState putScheduler(Request request, ClusterState clusterState) {
PrelertMetadata currentMetadata = clusterState.getMetaData().custom(PrelertMetadata.TYPE);
PrelertMetadata newMetadata = new PrelertMetadata.Builder(currentMetadata)
.putScheduler(request.getScheduler()).build();
.putScheduler(request.getScheduler(), searchRequestParsers).build();
return ClusterState.builder(clusterState).metaData(
MetaData.builder(clusterState.getMetaData()).putCustom(PrelertMetadata.TYPE, newMetadata).build())
.build();

View File

@ -21,6 +21,7 @@ import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchRequestParsers;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.scheduler.Scheduler;
@ -272,7 +273,7 @@ public class PrelertMetadata implements MetaData.Custom {
return this;
}
public Builder putScheduler(SchedulerConfig schedulerConfig) {
public Builder putScheduler(SchedulerConfig schedulerConfig, SearchRequestParsers searchRequestParsers) {
if (schedulers.containsKey(schedulerConfig.getId())) {
throw new ResourceAlreadyExistsException("A scheduler with id [" + schedulerConfig.getId() + "] already exists");
}
@ -287,6 +288,12 @@ public class PrelertMetadata implements MetaData.Custom {
+ "] already exists for job [" + jobId + "]");
}
ScheduledJobValidator.validate(schedulerConfig, job);
// Check the query, aggregations and script_fields can be built
schedulerConfig.buildQuery(searchRequestParsers.queryParsers);
schedulerConfig.buildAggregations(searchRequestParsers.queryParsers, searchRequestParsers.aggParsers);
schedulerConfig.buildScriptFields(searchRequestParsers.queryParsers);
return putScheduler(new Scheduler(schedulerConfig, SchedulerStatus.STOPPED));
}

View File

@ -1,204 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.autodetect.writer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.logging.log4j.Logger;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfig;
/**
* Reads highly hierarchical JSON structures. Currently very much geared to the
* outputIndex of Elasticsearch's aggregations. Could be made more generic in the
* future if another slightly different hierarchical JSON structure needs to be
* parsed.
*/
class AggregatedJsonRecordReader extends AbstractJsonRecordReader {
private static final String AGG_KEY = "key";
private static final String AGG_VALUE = "value";
private boolean isFirstTime = true;
private final List<String> nestingOrder;
private List<String> nestedValues;
private String latestDocCount;
/**
* Create a reader that simulates simple records given a hierarchical JSON
* structure where each field is at a progressively deeper level of nesting.
*/
AggregatedJsonRecordReader(JsonParser parser, Map<String, Integer> fieldMap, String recordHoldingField, Logger logger,
List<String> nestingOrder) {
super(parser, fieldMap, recordHoldingField, logger);
this.nestingOrder = Objects.requireNonNull(nestingOrder);
if (this.nestingOrder.isEmpty()) {
throw new IllegalArgumentException(
"Expected nesting order for aggregated JSON must not be empty");
}
nestedValues = new ArrayList<>();
}
/**
* Read forwards in the JSON until enough information has been gathered to
* write to the record array.
*
* @param record Read fields are written to this array. This array is first filled with empty
* strings and will never contain a <code>null</code>
* @param gotFields boolean array each element is true if that field
* was read
* @return The number of fields in the aggregated hierarchy, or -1 if nothing was read
* because the end of the stream was reached
*/
@Override
public long read(String[] record, boolean[] gotFields) throws IOException {
initArrays(record, gotFields);
latestDocCount = null;
fieldCount = 0;
if (isFirstTime) {
clearNestedLevel();
consumeToRecordHoldingField();
isFirstTime = false;
}
boolean gotInnerValue = false;
JsonToken token = tryNextTokenOrReadToEndOnError();
while (!(token == JsonToken.END_OBJECT && nestedLevel == 0)) {
if (token == null) {
break;
}
if (token == JsonToken.START_OBJECT) {
++nestedLevel;
} else if (token == JsonToken.END_OBJECT) {
if (gotInnerValue) {
completeRecord(record, gotFields);
}
--nestedLevel;
if (nestedLevel % 2 == 0 && !nestedValues.isEmpty()) {
nestedValues.remove(nestedValues.size() - 1);
}
if (gotInnerValue) {
break;
}
} else if (token == JsonToken.FIELD_NAME) {
if (((nestedLevel + 1) / 2) == nestingOrder.size()) {
gotInnerValue = parseFieldValuePair(record, gotFields) || gotInnerValue;
}
// Alternate nesting levels are arbitrary labels that can be
// ignored.
else if (nestedLevel > 0 && nestedLevel % 2 == 0) {
String fieldName = parser.getCurrentName();
if (fieldName.equals(AGG_KEY)) {
token = tryNextTokenOrReadToEndOnError();
if (token == null) {
break;
}
nestedValues.add(parser.getText());
} else if (fieldName.equals(SchedulerConfig.DOC_COUNT)) {
token = tryNextTokenOrReadToEndOnError();
if (token == null) {
break;
}
latestDocCount = parser.getText();
}
}
}
token = tryNextTokenOrReadToEndOnError();
}
// null token means EOF; nestedLevel 0 means we've reached the end of
// the aggregations object
if (token == null || nestedLevel == 0) {
return -1;
}
return fieldCount;
}
@Override
protected void clearNestedLevel() {
nestedLevel = 0;
}
private boolean parseFieldValuePair(String[] record, boolean[] gotFields) throws IOException {
String fieldName = parser.getCurrentName();
JsonToken token = tryNextTokenOrReadToEndOnError();
if (token == null) {
return false;
}
if (token == JsonToken.START_OBJECT) {
++nestedLevel;
return false;
}
if (token == JsonToken.START_ARRAY) {
// We don't expect arrays at this level of aggregated inputIndex
// (although we do expect arrays at higher levels). Consume the
// whole array but do nothing with it.
while (token != JsonToken.END_ARRAY) {
token = tryNextTokenOrReadToEndOnError();
}
return false;
}
++fieldCount;
if (AGG_VALUE.equals(fieldName)) {
fieldName = nestingOrder.get(nestingOrder.size() - 1);
}
Integer index = fieldMap.get(fieldName);
if (index == null) {
return false;
}
String fieldValue = parser.getText();
record[index] = fieldValue;
gotFields[index] = true;
return true;
}
private void completeRecord(String[] record, boolean[] gotFields) throws IOException {
// This loop should do time plus the by/over/partition/influencer fields
int numberOfFields = Math.min(nestingOrder.size() - 1, nestedValues.size());
if (nestingOrder.size() - 1 != nestedValues.size()) {
logger.warn("Aggregation inputIndex does not match expectation: expected field order: "
+ nestingOrder + " actual values: " + nestedValues);
}
fieldCount += numberOfFields;
for (int i = 0; i < numberOfFields; ++i) {
String fieldName = nestingOrder.get(i);
Integer index = fieldMap.get(fieldName);
if (index == null) {
continue;
}
String fieldValue = nestedValues.get(i);
record[index] = fieldValue;
gotFields[index] = true;
}
// This adds the summary count field
if (latestDocCount != null) {
++fieldCount;
Integer index = fieldMap.get(SchedulerConfig.DOC_COUNT);
if (index != null) {
record[index] = latestDocCount;
gotFields[index] = true;
}
}
}
}

View File

@ -21,16 +21,17 @@ import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchRequestParsers;
import org.elasticsearch.tasks.LoggingTaskListener;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
import org.elasticsearch.xpack.prelert.action.StartSchedulerAction;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfig;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerStatus;
import org.elasticsearch.xpack.prelert.job.messages.Messages;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.scheduler.ScheduledJobRunner;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfig;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerStatus;
import org.elasticsearch.xpack.prelert.utils.SchedulerStatusObserver;
import java.io.IOException;
@ -44,7 +45,7 @@ public class RestStartSchedulerAction extends BaseRestHandler {
@Inject
public RestStartSchedulerAction(Settings settings, RestController controller, ThreadPool threadPool,
ClusterService clusterService) {
ClusterService clusterService, SearchRequestParsers searchRequestParsers) {
super(settings);
this.clusterService = clusterService;
this.schedulerStatusObserver = new SchedulerStatusObserver(threadPool, clusterService);

View File

@ -26,7 +26,6 @@ import org.elasticsearch.xpack.prelert.job.audit.Auditor;
import org.elasticsearch.xpack.prelert.job.config.DefaultFrequency;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractorFactory;
import org.elasticsearch.xpack.prelert.job.messages.Messages;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.persistence.BucketsQueryBuilder;

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.prelert.scheduler;
import org.elasticsearch.search.SearchRequestParsers;
import org.elasticsearch.xpack.prelert.job.AnalysisConfig;
import org.elasticsearch.xpack.prelert.job.DataDescription;
import org.elasticsearch.xpack.prelert.job.Job;
@ -24,8 +25,7 @@ public final class ScheduledJobValidator {
if (analysisConfig.getLatency() != null && analysisConfig.getLatency() > 0) {
throw new IllegalArgumentException(Messages.getMessage(Messages.SCHEDULER_DOES_NOT_SUPPORT_JOB_WITH_LATENCY));
}
if (schedulerConfig.getAggregations() != null
&& !SchedulerConfig.DOC_COUNT.equals(analysisConfig.getSummaryCountFieldName())) {
if (schedulerConfig.hasAggregations() && !SchedulerConfig.DOC_COUNT.equals(analysisConfig.getSummaryCountFieldName())) {
throw new IllegalArgumentException(
Messages.getMessage(Messages.SCHEDULER_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD, SchedulerConfig.DOC_COUNT));
}

View File

@ -29,7 +29,7 @@ public class Scheduler extends AbstractDiffable<Scheduler> implements ToXContent
public static final ParseField RESULTS_FIELD = new ParseField("schedulers");
public static final ConstructingObjectParser<Scheduler, ParseFieldMatcherSupplier> PARSER = new ConstructingObjectParser<>("scheduler",
a -> new Scheduler((SchedulerConfig) a[0], (SchedulerStatus) a[1]));
a -> new Scheduler(((SchedulerConfig.Builder) a[0]).build(), (SchedulerStatus) a[1]));
static {
PARSER.declareObject(ConstructingObjectParser.constructorArg(), SchedulerConfig.PARSER, CONFIG_FIELD);

View File

@ -8,12 +8,25 @@ package org.elasticsearch.xpack.prelert.scheduler;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.support.ToXContentToBytes;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorParsers;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.messages.Messages;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
@ -22,12 +35,9 @@ import org.elasticsearch.xpack.prelert.utils.PrelertStrings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
/**
* Scheduler configuration options. Describes where to proactively pull input
@ -61,7 +71,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
public static final ParseField AGGS = new ParseField("aggs");
public static final ParseField SCRIPT_FIELDS = new ParseField("script_fields");
public static final ObjectParser<Builder, ParseFieldMatcherSupplier> PARSER = new ObjectParser<>("schedule_config", Builder::new);
public static final ObjectParser<Builder, ParseFieldMatcherSupplier> PARSER = new ObjectParser<>("scheduler_config", Builder::new);
static {
PARSER.declareString(Builder::setId, ID);
@ -70,34 +80,26 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
PARSER.declareStringArray(Builder::setTypes, TYPES);
PARSER.declareLong(Builder::setQueryDelay, QUERY_DELAY);
PARSER.declareLong(Builder::setFrequency, FREQUENCY);
PARSER.declareObject(Builder::setQuery, (p, c) -> {
try {
return p.map();
} catch (IOException e) {
throw new RuntimeException(e);
}
}, QUERY);
PARSER.declareObject(Builder::setAggregations, (p, c) -> {
try {
return p.map();
} catch (IOException e) {
throw new RuntimeException(e);
}
}, AGGREGATIONS);
PARSER.declareObject(Builder::setAggregations, (p, c) -> {
try {
return p.map();
} catch (IOException e) {
throw new RuntimeException(e);
}
}, AGGS);
PARSER.declareObject(Builder::setScriptFields, (p, c) -> {
try {
return p.map();
} catch (IOException e) {
throw new RuntimeException(e);
}
}, SCRIPT_FIELDS);
PARSER.declareField((parser, builder, aVoid) -> {
XContentBuilder contentBuilder = XContentBuilder.builder(parser.contentType().xContent());
XContentHelper.copyCurrentStructure(contentBuilder.generator(), parser);
builder.setQuery(contentBuilder.bytes());
}, QUERY, ObjectParser.ValueType.OBJECT);
PARSER.declareField((parser, builder, aVoid) -> {
XContentBuilder contentBuilder = XContentBuilder.builder(parser.contentType().xContent());
XContentHelper.copyCurrentStructure(contentBuilder.generator(), parser);
builder.setAggregations(contentBuilder.bytes());
}, AGGREGATIONS, ObjectParser.ValueType.OBJECT);
PARSER.declareField((parser, builder, aVoid) -> {
XContentBuilder contentBuilder = XContentBuilder.builder(parser.contentType().xContent());
XContentHelper.copyCurrentStructure(contentBuilder.generator(), parser);
builder.setAggregations(contentBuilder.bytes());
}, AGGS, ObjectParser.ValueType.OBJECT);
PARSER.declareField((parser, builder, aVoid) -> {
XContentBuilder contentBuilder = XContentBuilder.builder(parser.contentType().xContent());
XContentHelper.copyCurrentStructure(contentBuilder.generator(), parser);
builder.setScriptFields(contentBuilder.bytes());
}, SCRIPT_FIELDS, ObjectParser.ValueType.OBJECT);
PARSER.declareInt(Builder::setScrollSize, SCROLL_SIZE);
}
@ -116,16 +118,13 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
private final List<String> indexes;
private final List<String> types;
// NORELEASE: These 4 fields can be reduced to a single
// SearchSourceBuilder field holding the entire source:
private final Map<String, Object> query;
private final Map<String, Object> aggregations;
private final Map<String, Object> scriptFields;
private final BytesReference query;
private final BytesReference aggregations;
private final BytesReference scriptFields;
private final Integer scrollSize;
private SchedulerConfig(String id, String jobId, Long queryDelay, Long frequency, List<String> indexes, List<String> types,
Map<String, Object> query, Map<String, Object> aggregations, Map<String, Object> scriptFields,
Integer scrollSize) {
BytesReference query, BytesReference aggregations, BytesReference scriptFields, Integer scrollSize) {
this.id = id;
this.jobId = jobId;
this.queryDelay = queryDelay;
@ -153,21 +152,9 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
} else {
this.types = null;
}
if (in.readBoolean()) {
this.query = in.readMap();
} else {
this.query = null;
}
if (in.readBoolean()) {
this.aggregations = in.readMap();
} else {
this.aggregations = null;
}
if (in.readBoolean()) {
this.scriptFields = in.readMap();
} else {
this.scriptFields = null;
}
this.query = in.readOptionalBytesReference();
this.aggregations = in.readOptionalBytesReference();
this.scriptFields = in.readOptionalBytesReference();
this.scrollSize = in.readOptionalVInt();
}
@ -207,82 +194,74 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
return this.types;
}
/**
* For the ELASTICSEARCH data source only, the Elasticsearch query DSL
* representing the query to submit to Elasticsearch to get the input data.
* This should not include time bounds, as these are added separately. This
* class does not attempt to interpret the query. The map will be converted
* back to an arbitrary JSON object.
*
* @return The search query, or <code>null</code> if not set.
*/
public Map<String, Object> getQuery() {
return this.query;
}
/**
* For the ELASTICSEARCH data source only, get the size of documents to be
* retrieved from each shard via a scroll search
*
* @return The size of documents to be retrieved from each shard via a
* scroll search
*/
public Integer getScrollSize() {
return this.scrollSize;
}
/**
* For the ELASTICSEARCH data source only, optional Elasticsearch
* script_fields to add to the search to be submitted to Elasticsearch to
* get the input data. This class does not attempt to interpret the script
* fields. The map will be converted back to an arbitrary JSON object.
*
* @return The script fields, or <code>null</code> if not set.
*/
public Map<String, Object> getScriptFields() {
return this.scriptFields;
Map<String, Object> getQueryAsMap() {
return XContentHelper.convertToMap(query, true).v2();
}
/**
* For the ELASTICSEARCH data source only, optional Elasticsearch
* aggregations to apply to the search to be submitted to Elasticsearch to
* get the input data. This class does not attempt to interpret the
* aggregations. The map will be converted back to an arbitrary JSON object.
*
* @return The aggregations, or <code>null</code> if not set.
*/
public Map<String, Object> getAggregations() {
return this.aggregations;
Map<String, Object> getAggregationsAsMap() {
return XContentHelper.convertToMap(aggregations, true).v2();
}
/**
* Build the list of fields expected in the output from aggregations
* submitted to Elasticsearch.
*
* @return The list of fields, or empty list if there are no aggregations.
*/
public List<String> buildAggregatedFieldList() {
Map<String, Object> aggs = getAggregations();
if (aggs == null) {
Map<String, Object> getScriptFieldsAsMap() {
return XContentHelper.convertToMap(scriptFields, true).v2();
}
public QueryBuilder buildQuery(IndicesQueriesRegistry queryParsers) {
if (query == null) {
return QueryBuilders.matchAllQuery();
}
XContentParser parser = createParser(QUERY, query);
QueryParseContext queryParseContext = new QueryParseContext(queryParsers, parser, ParseFieldMatcher.STRICT);
try {
return queryParseContext.parseInnerQueryBuilder().orElse(QueryBuilders.matchAllQuery());
} catch (IOException e) {
throw ExceptionsHelper.parseException(QUERY, e);
}
}
public boolean hasAggregations() {
return aggregations != null;
}
public AggregatorFactories.Builder buildAggregations(IndicesQueriesRegistry queryParsers, AggregatorParsers aggParsers) {
if (!hasAggregations()) {
return null;
}
XContentParser parser = createParser(AGGREGATIONS, aggregations);
QueryParseContext queryParseContext = new QueryParseContext(queryParsers, parser, ParseFieldMatcher.STRICT);
try {
return aggParsers.parseAggregators(queryParseContext);
} catch (IOException e) {
throw ExceptionsHelper.parseException(AGGREGATIONS, e);
}
}
public List<SearchSourceBuilder.ScriptField> buildScriptFields(IndicesQueriesRegistry queryParsers) {
if (scriptFields == null) {
return Collections.emptyList();
}
SortedMap<Integer, String> orderedFields = new TreeMap<>();
scanSubLevel(aggs, 0, orderedFields);
return new ArrayList<>(orderedFields.values());
List<SearchSourceBuilder.ScriptField> parsedScriptFields = new ArrayList<>();
XContentParser parser = createParser(SCRIPT_FIELDS, scriptFields);
try {
QueryParseContext queryParseContext = new QueryParseContext(queryParsers, parser, ParseFieldMatcher.STRICT);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
parsedScriptFields.add(new SearchSourceBuilder.ScriptField(queryParseContext));
}
} catch (IOException e) {
throw ExceptionsHelper.parseException(SCRIPT_FIELDS, e);
}
return parsedScriptFields;
}
@SuppressWarnings("unchecked")
private void scanSubLevel(Map<String, Object> subLevel, int depth, SortedMap<Integer, String> orderedFields) {
for (Map.Entry<String, Object> entry : subLevel.entrySet()) {
Object value = entry.getValue();
if (value instanceof Map<?, ?>) {
scanSubLevel((Map<String, Object>) value, depth + 1, orderedFields);
} else if (value instanceof String && FIELD.equals(entry.getKey())) {
orderedFields.put(depth, (String) value);
}
private XContentParser createParser(ParseField parseField, BytesReference bytesReference) {
try {
return XContentFactory.xContent(query).createParser(query);
} catch (IOException e) {
throw ExceptionsHelper.parseException(parseField, e);
}
}
@ -304,24 +283,9 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
} else {
out.writeBoolean(false);
}
if (query != null) {
out.writeBoolean(true);
out.writeMap(query);
} else {
out.writeBoolean(false);
}
if (aggregations != null) {
out.writeBoolean(true);
out.writeMap(aggregations);
} else {
out.writeBoolean(false);
}
if (scriptFields != null) {
out.writeBoolean(true);
out.writeMap(scriptFields);
} else {
out.writeBoolean(false);
}
out.writeOptionalBytesReference(query);
out.writeOptionalBytesReference(aggregations);
out.writeOptionalBytesReference(scriptFields);
out.writeOptionalVInt(scrollSize);
}
@ -349,13 +313,13 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
builder.field(TYPES.getPreferredName(), types);
}
if (query != null) {
builder.field(QUERY.getPreferredName(), query);
builder.field(QUERY.getPreferredName(), getQueryAsMap());
}
if (aggregations != null) {
builder.field(AGGREGATIONS.getPreferredName(), aggregations);
builder.field(AGGREGATIONS.getPreferredName(), getAggregationsAsMap());
}
if (scriptFields != null) {
builder.field(SCRIPT_FIELDS.getPreferredName(), scriptFields);
builder.field(SCRIPT_FIELDS.getPreferredName(), getScriptFieldsAsMap());
}
if (scrollSize != null) {
builder.field(SCROLL_SIZE.getPreferredName(), scrollSize);
@ -388,13 +352,13 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
&& Objects.equals(this.types, that.types)
&& Objects.equals(this.query, that.query)
&& Objects.equals(this.scrollSize, that.scrollSize)
&& Objects.equals(this.getAggregations(), that.getAggregations())
&& Objects.equals(this.aggregations, that.aggregations)
&& Objects.equals(this.scriptFields, that.scriptFields);
}
@Override
public int hashCode() {
return Objects.hash(id, jobId, frequency, queryDelay, indexes, types, query, scrollSize, getAggregations(), scriptFields);
return Objects.hash(id, jobId, frequency, queryDelay, indexes, types, query, scrollSize, aggregations, scriptFields);
}
public static class Builder {
@ -402,29 +366,18 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
private static final int DEFAULT_SCROLL_SIZE = 1000;
private static final long DEFAULT_ELASTICSEARCH_QUERY_DELAY = 60L;
/**
* The default query for elasticsearch searches
*/
private static final String MATCH_ALL_ES_QUERY = "match_all";
private String id;
private String jobId;
private Long queryDelay;
private Long frequency;
private List<String> indexes = Collections.emptyList();
private List<String> types = Collections.emptyList();
// NORELEASE: use Collections.emptyMap() instead of null as initial
// value:
// NORELEASE: Use SearchSourceBuilder
private Map<String, Object> query = null;
private Map<String, Object> aggregations = null;
private Map<String, Object> scriptFields = null;
private BytesReference query;
private BytesReference aggregations;
private BytesReference scriptFields;
private Integer scrollSize;
public Builder() {
Map<String, Object> query = new HashMap<>();
query.put(MATCH_ALL_ES_QUERY, new HashMap<String, Object>());
setQuery(query);
setQueryDelay(DEFAULT_ELASTICSEARCH_QUERY_DELAY);
setScrollSize(DEFAULT_SCROLL_SIZE);
}
@ -482,21 +435,49 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
this.frequency = frequency;
}
public void setQuery(Map<String, Object> query) {
// NORELEASE: make use of Collections.unmodifiableMap(...)
private void setQuery(BytesReference query) {
this.query = Objects.requireNonNull(query);
}
public void setAggregations(Map<String, Object> aggregations) {
// NORELEASE: make use of Collections.unmodifiableMap(...)
public void setQuery(QueryBuilder query) {
this.query = xContentToBytes(query);
}
private BytesReference xContentToBytes(ToXContent value) {
try {
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
return value.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS).bytes();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private void setAggregations(BytesReference aggregations) {
this.aggregations = Objects.requireNonNull(aggregations);
}
public void setScriptFields(Map<String, Object> scriptFields) {
// NORELEASE: make use of Collections.unmodifiableMap(...)
public void setAggregations(AggregatorFactories.Builder aggregations) {
this.aggregations = xContentToBytes(aggregations);
}
private void setScriptFields(BytesReference scriptFields) {
this.scriptFields = Objects.requireNonNull(scriptFields);
}
public void setScriptFields(List<SearchSourceBuilder.ScriptField> scriptFields) {
try {
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
jsonBuilder.startObject();
for (SearchSourceBuilder.ScriptField scriptField : scriptFields) {
scriptField.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
}
jsonBuilder.endObject();
this.scriptFields = jsonBuilder.bytes();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void setScrollSize(int scrollSize) {
if (scrollSize < 0) {
String msg = Messages.getMessage(Messages.SCHEDULER_CONFIG_INVALID_OPTION_VALUE,

View File

@ -37,7 +37,7 @@ public class ElasticsearchQueryBuilder {
+ "\"query\": {"
+ "\"bool\": {"
+ "\"filter\": ["
+ "{%s},"
+ "%s,"
+ "{"
+ "\"range\": {"
+ "\"%s\": {"

View File

@ -11,33 +11,43 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.search.SearchRequestParsers;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractorFactory;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfig;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class HttpDataExtractorFactory implements DataExtractorFactory {
private static final Logger LOGGER = Loggers.getLogger(HttpDataExtractorFactory.class);
private final Client client;
private final SearchRequestParsers searchRequestParsers;
public HttpDataExtractorFactory(Client client) {
this.client = client;
public HttpDataExtractorFactory(Client client, SearchRequestParsers searchRequestParsers) {
this.client = Objects.requireNonNull(client);
this.searchRequestParsers = Objects.requireNonNull(searchRequestParsers);
}
@Override
public DataExtractor newExtractor(SchedulerConfig schedulerConfig, Job job) {
String timeField = job.getDataDescription().getTimeField();
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(
stringifyElasticsearchQuery(schedulerConfig.getQuery()),
stringifyElasticsearchAggregations(schedulerConfig.getAggregations()),
stringifyElasticsearchScriptFields(schedulerConfig.getScriptFields()),
xContentToJson(schedulerConfig.buildQuery(searchRequestParsers.queryParsers)),
stringifyAggregations(schedulerConfig.buildAggregations(searchRequestParsers.queryParsers,
searchRequestParsers.aggParsers)),
stringifyScriptFields(schedulerConfig.buildScriptFields(searchRequestParsers.queryParsers)),
timeField);
HttpRequester httpRequester = new HttpRequester();
ElasticsearchUrlBuilder urlBuilder = ElasticsearchUrlBuilder
@ -53,26 +63,38 @@ public class HttpDataExtractorFactory implements DataExtractorFactory {
return baseUrl;
}
String stringifyElasticsearchQuery(Map<String, Object> queryMap) {
String queryStr = writeMapAsJson(queryMap);
if (queryStr.startsWith("{") && queryStr.endsWith("}")) {
return queryStr.substring(1, queryStr.length() - 1);
private String xContentToJson(ToXContent xContent) {
try {
XContentBuilder jsonBuilder = JsonXContent.contentBuilder();
xContent.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
return jsonBuilder.string();
} catch (IOException e) {
throw new RuntimeException(e);
}
return queryStr;
}
String stringifyElasticsearchAggregations(Map<String, Object> aggregationsMap) {
if (aggregationsMap != null) {
return writeMapAsJson(aggregationsMap);
String stringifyAggregations(AggregatorFactories.Builder aggregations) {
if (aggregations == null) {
return null;
}
return null;
return xContentToJson(aggregations);
}
String stringifyElasticsearchScriptFields(Map<String, Object> scriptFieldsMap) {
if (scriptFieldsMap != null) {
return writeMapAsJson(scriptFieldsMap);
String stringifyScriptFields(List<SearchSourceBuilder.ScriptField> scriptFields) {
if (scriptFields.isEmpty()) {
return null;
}
try {
XContentBuilder jsonBuilder = JsonXContent.contentBuilder();
jsonBuilder.startObject();
for (SearchSourceBuilder.ScriptField scriptField : scriptFields) {
scriptField.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
}
jsonBuilder.endObject();
return jsonBuilder.string();
} catch (IOException e) {
throw new RuntimeException(e);
}
return null;
}
private static String writeMapAsJson(Map<String, Object> map) {

View File

@ -6,9 +6,11 @@
package org.elasticsearch.xpack.prelert.utils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.prelert.job.messages.Messages;
@ -39,6 +41,10 @@ public class ExceptionsHelper {
return new ElasticsearchStatusException(msg, RestStatus.CONFLICT);
}
public static ElasticsearchParseException parseException(ParseField parseField, Throwable cause) {
throw new ElasticsearchParseException("Failed to parse [" + parseField.getPreferredName() + "]", cause);
}
/**
* A more REST-friendly Object.requireNonNull()
*/

View File

@ -5,6 +5,11 @@
*/
package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.prelert.action.GetSchedulersAction.Response;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.scheduler.Scheduler;
@ -14,7 +19,6 @@ import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
public class GetSchedulersActionResponseTests extends AbstractStreamableTestCase<Response> {
@ -34,16 +38,24 @@ public class GetSchedulersActionResponseTests extends AbstractStreamableTestCase
schedulerConfig.setFrequency(randomPositiveLong());
schedulerConfig.setQueryDelay(randomPositiveLong());
if (randomBoolean()) {
schedulerConfig.setQuery(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)));
schedulerConfig.setQuery(QueryBuilders.termQuery(randomAsciiOfLength(10), randomAsciiOfLength(10)));
}
if (randomBoolean()) {
schedulerConfig.setScriptFields(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)));
int scriptsSize = randomInt(3);
List<SearchSourceBuilder.ScriptField> scriptFields = new ArrayList<>(scriptsSize);
for (int scriptIndex = 0; scriptIndex < scriptsSize; scriptIndex++) {
scriptFields.add(new SearchSourceBuilder.ScriptField(randomAsciiOfLength(10), new Script(randomAsciiOfLength(10)),
randomBoolean()));
}
schedulerConfig.setScriptFields(scriptFields);
}
if (randomBoolean()) {
schedulerConfig.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE));
}
if (randomBoolean()) {
schedulerConfig.setAggregations(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)));
AggregatorFactories.Builder aggsBuilder = new AggregatorFactories.Builder();
aggsBuilder.addAggregator(AggregationBuilders.avg(randomAsciiOfLength(10)));
schedulerConfig.setAggregations(aggsBuilder);
}
schedulerList.add(schedulerConfig.build());

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchRequestParsers;
import org.elasticsearch.xpack.prelert.action.StopSchedulerAction.Request;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfig;
@ -18,6 +19,7 @@ import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase;
import static org.elasticsearch.xpack.prelert.scheduler.ScheduledJobRunnerTests.createScheduledJob;
import static org.elasticsearch.xpack.prelert.scheduler.ScheduledJobRunnerTests.createSchedulerConfig;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
public class StopSchedulerActionRequestTests extends AbstractStreamableTestCase<StopSchedulerAction.Request> {
@ -41,13 +43,13 @@ public class StopSchedulerActionRequestTests extends AbstractStreamableTestCase<
SchedulerConfig schedulerConfig = createSchedulerConfig("foo", "foo").build();
PrelertMetadata prelertMetadata2 = new PrelertMetadata.Builder().putJob(job, false)
.putScheduler(schedulerConfig)
.putScheduler(schedulerConfig, mock(SearchRequestParsers.class))
.build();
e = expectThrows(ElasticsearchStatusException.class, () -> StopSchedulerAction.validate("foo", prelertMetadata2));
assertThat(e.getMessage(), equalTo("scheduler already stopped, expected scheduler status [STARTED], but got [STOPPED]"));
PrelertMetadata prelertMetadata3 = new PrelertMetadata.Builder().putJob(job, false)
.putScheduler(schedulerConfig)
.putScheduler(schedulerConfig, mock(SearchRequestParsers.class))
.updateSchedulerStatus("foo", SchedulerStatus.STARTED)
.build();
StopSchedulerAction.validate("foo", prelertMetadata3);

View File

@ -489,7 +489,9 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
builder.setAnalysisConfig(analysisConfig);
builder.setAnalysisLimits(new AnalysisLimits(randomPositiveLong(), randomPositiveLong()));
if (randomBoolean()) {
builder.setDataDescription(new DataDescription.Builder());
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(randomFrom(DataDescription.DataFormat.values()));
builder.setDataDescription(dataDescription);
}
String[] outputs;
TransformType[] transformTypes ;

View File

@ -16,11 +16,13 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchRequestParsers;
import org.elasticsearch.xpack.prelert.job.DataDescription;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.JobTests;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfig;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfigTests;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerStatus;
import org.elasticsearch.xpack.prelert.support.AbstractSerializingTestCase;
@ -33,13 +35,14 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock;
public class PrelertMetadataTests extends AbstractSerializingTestCase<PrelertMetadata> {
@Override
protected PrelertMetadata createTestInstance() {
PrelertMetadata.Builder builder = new PrelertMetadata.Builder();
int numJobs = randomIntBetween(0, 4);
int numJobs = randomIntBetween(0, 10);
for (int i = 0; i < numJobs; i++) {
Job job = JobTests.createRandomizedJob();
builder.putJob(job, false);
@ -52,10 +55,13 @@ public class PrelertMetadataTests extends AbstractSerializingTestCase<PrelertMet
}
}
}
// NORELEASE TODO: randomize scheduler status:
// if (randomBoolean()) {
// builder.updateSchedulerStatus(job.getId(), SchedulerStatus.STARTED);
// }
if (job.getDataDescription() != null && job.getDataDescription().getFormat() == DataDescription.DataFormat.ELASTICSEARCH) {
SchedulerConfig schedulerConfig = SchedulerConfigTests.createRandomizedSchedulerConfig(job.getId());
builder.putScheduler(schedulerConfig, mock(SearchRequestParsers.class));
if (randomBoolean()) {
builder.updateSchedulerStatus(schedulerConfig.getId(), SchedulerStatus.STARTED);
}
}
}
return builder.build();
}
@ -154,7 +160,7 @@ public class PrelertMetadataTests extends AbstractSerializingTestCase<PrelertMet
SchedulerConfig schedulerConfig1 = createSchedulerConfig("scheduler1", job1.getId()).build();
PrelertMetadata.Builder builder = new PrelertMetadata.Builder();
builder.putJob(job1, false);
builder.putScheduler(schedulerConfig1);
builder.putScheduler(schedulerConfig1, mock(SearchRequestParsers.class));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> builder.removeJob(job1.getId()));
assertThat(e.status(), equalTo(RestStatus.CONFLICT));
@ -172,7 +178,7 @@ public class PrelertMetadataTests extends AbstractSerializingTestCase<PrelertMet
SchedulerConfig schedulerConfig1 = createSchedulerConfig("scheduler1", job1.getId()).build();
PrelertMetadata.Builder builder = new PrelertMetadata.Builder();
builder.putJob(job1, false);
builder.putScheduler(schedulerConfig1);
builder.putScheduler(schedulerConfig1, mock(SearchRequestParsers.class));
PrelertMetadata result = builder.build();
assertThat(result.getJobs().get("foo"), sameInstance(job1));
@ -192,7 +198,7 @@ public class PrelertMetadataTests extends AbstractSerializingTestCase<PrelertMet
SchedulerConfig schedulerConfig1 = createSchedulerConfig("scheduler1", "missing-job").build();
PrelertMetadata.Builder builder = new PrelertMetadata.Builder();
expectThrows(ResourceNotFoundException.class, () -> builder.putScheduler(schedulerConfig1));
expectThrows(ResourceNotFoundException.class, () -> builder.putScheduler(schedulerConfig1, null));
}
public void testPutScheduler_failBecauseSchedulerIdIsAlreadyTaken() {
@ -200,9 +206,9 @@ public class PrelertMetadataTests extends AbstractSerializingTestCase<PrelertMet
SchedulerConfig schedulerConfig1 = createSchedulerConfig("scheduler1", job1.getId()).build();
PrelertMetadata.Builder builder = new PrelertMetadata.Builder();
builder.putJob(job1, false);
builder.putScheduler(schedulerConfig1);
builder.putScheduler(schedulerConfig1, mock(SearchRequestParsers.class));
expectThrows(ResourceAlreadyExistsException.class, () -> builder.putScheduler(schedulerConfig1));
expectThrows(ResourceAlreadyExistsException.class, () -> builder.putScheduler(schedulerConfig1, null));
}
public void testPutScheduler_failBecauseJobAlreadyHasScheduler() {
@ -211,9 +217,10 @@ public class PrelertMetadataTests extends AbstractSerializingTestCase<PrelertMet
SchedulerConfig schedulerConfig2 = createSchedulerConfig("scheduler2", job1.getId()).build();
PrelertMetadata.Builder builder = new PrelertMetadata.Builder();
builder.putJob(job1, false);
builder.putScheduler(schedulerConfig1);
builder.putScheduler(schedulerConfig1, mock(SearchRequestParsers.class));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> builder.putScheduler(schedulerConfig2));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> builder.putScheduler(schedulerConfig2, mock(SearchRequestParsers.class)));
assertThat(e.status(), equalTo(RestStatus.CONFLICT));
}
@ -226,7 +233,7 @@ public class PrelertMetadataTests extends AbstractSerializingTestCase<PrelertMet
PrelertMetadata.Builder builder = new PrelertMetadata.Builder();
builder.putJob(job1.build(), false);
expectThrows(IllegalArgumentException.class, () -> builder.putScheduler(schedulerConfig1));
expectThrows(IllegalArgumentException.class, () -> builder.putScheduler(schedulerConfig1, mock(SearchRequestParsers.class)));
}
public void testRemoveScheduler_failBecauseSchedulerStarted() {
@ -234,7 +241,7 @@ public class PrelertMetadataTests extends AbstractSerializingTestCase<PrelertMet
SchedulerConfig schedulerConfig1 = createSchedulerConfig("scheduler1", job1.getId()).build();
PrelertMetadata.Builder builder = new PrelertMetadata.Builder();
builder.putJob(job1, false);
builder.putScheduler(schedulerConfig1);
builder.putScheduler(schedulerConfig1, mock(SearchRequestParsers.class));
builder.updateStatus("foo", JobStatus.OPENING, null);
builder.updateStatus("foo", JobStatus.OPENED, null);
builder.updateSchedulerStatus("scheduler1", SchedulerStatus.STARTED);

View File

@ -1,250 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.autodetect.writer;
import static org.mockito.Mockito.mock;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.logging.log4j.Logger;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.test.ESTestCase;
public class AggregatedJsonRecordReaderTests extends ESTestCase {
public void testRead_WithNoTermField() throws IOException {
String data = "{" + "\"took\" : 88," + "\"timed_out\" : false,"
+ "\"_shards\" : { \"total\" : 5, \"successful\" : 5, \"failed\" : 0 },"
+ "\"hits\" : { \"total\" : 86275, \"max_score\" : 0.0, \"hits\" : [ ] }," + "\"aggregations\" : {" + "\"time_level\" : {"
+ "\"buckets\" : [ {" + "\"key_as_string\" : \"2015-12-07T00:00:00.000Z\", \"key\" : 1449446400000, \"doc_count\" : 649,"
+ "\"metric_level\" : { \"value\" : 106.72129514140468 }" + "}," + "{"
+ "\"key_as_string\" : \"2015-12-07T01:00:00.000Z\", \"key\" : 1449450000000, \"doc_count\" : 627,"
+ "\"metric_level\" : { \"value\" : 103.64676252462097 }" + "} ]" + "}" + "}" + "}";
JsonParser parser = createParser(data);
Map<String, Integer> fieldMap = createFieldMapWithNoTermField();
List<String> nestingOrder = createNestingOrderWithNoTermField();
AggregatedJsonRecordReader reader = new AggregatedJsonRecordReader(parser, fieldMap, "aggregations", mock(Logger.class),
nestingOrder);
String[] record = new String[4];
boolean[] gotFields = new boolean[4];
assertEquals(3, reader.read(record, gotFields));
assertEquals("649", record[0]);
assertEquals("106.72129514140468", record[1]);
assertEquals("1449446400000", record[2]);
assertEquals(3, reader.read(record, gotFields));
assertEquals("627", record[0]);
assertEquals("103.64676252462097", record[1]);
assertEquals("1449450000000", record[2]);
assertEquals(-1, reader.read(record, gotFields));
}
public void testRead_WithOneTermField() throws JsonParseException, IOException {
String data = "{" + "\"took\" : 88," + "\"timed_out\" : false,"
+ "\"_shards\" : { \"total\" : 5, \"successful\" : 5, \"failed\" : 0 },"
+ "\"hits\" : { \"total\" : 86275, \"max_score\" : 0.0, \"hits\" : [ ] }," + "\"aggregations\" : {" + "\"time_level\" : {"
+ "\"buckets\" : [ {" + "\"key_as_string\" : \"2015-12-07T00:00:00.000Z\", \"key\" : 1449446400000, \"doc_count\" : 649,"
+ "\"airline_level\" : {" + "\"doc_count_error_upper_bound\" : 0, \"sum_other_doc_count\" : 0,"
+ "\"buckets\" : [ { \"key\" : \"aal\", \"doc_count\" : 62, \"metric_level\" : { \"value\" : 106.72129514140468 } },"
+ "{ \"key\" : \"awe\", \"doc_count\" : 61, \"metric_level\" : { \"value\" : 20.20497368984535 } } ]" + "}" + "}," + "{"
+ "\"key_as_string\" : \"2015-12-07T01:00:00.000Z\", \"key\" : 1449450000000, \"doc_count\" : 627,"
+ "\"airline_level\" : {" + "\"doc_count_error_upper_bound\" : 0, \"sum_other_doc_count\" : 0,"
+ "\"buckets\" : [ { \"key\" : \"aal\", \"doc_count\" : 59, \"metric_level\" : { \"value\" : 103.64676252462097 } },"
+ "{ \"key\" : \"awe\", \"doc_count\" : 56, \"metric_level\" : { \"value\" : 20.047162464686803 } } ]" + "}" + "} ]" + "}"
+ "}" + "}";
JsonParser parser = createParser(data);
Map<String, Integer> fieldMap = createFieldMapWithOneTermField();
List<String> nestingOrder = createNestingOrderWithOneTermField();
AggregatedJsonRecordReader reader = new AggregatedJsonRecordReader(parser, fieldMap, "aggregations", mock(Logger.class),
nestingOrder);
String[] record = new String[4];
boolean[] gotFields = new boolean[4];
assertEquals(4, reader.read(record, gotFields));
assertEquals("aal", record[0]);
assertEquals("62", record[1]);
assertEquals("106.72129514140468", record[2]);
assertEquals("1449446400000", record[3]);
assertEquals(4, reader.read(record, gotFields));
assertEquals("awe", record[0]);
assertEquals("61", record[1]);
assertEquals("20.20497368984535", record[2]);
assertEquals("1449446400000", record[3]);
assertEquals(4, reader.read(record, gotFields));
assertEquals("aal", record[0]);
assertEquals("59", record[1]);
assertEquals("103.64676252462097", record[2]);
assertEquals("1449450000000", record[3]);
assertEquals(4, reader.read(record, gotFields));
assertEquals("awe", record[0]);
assertEquals("56", record[1]);
assertEquals("20.047162464686803", record[2]);
assertEquals("1449450000000", record[3]);
assertEquals(-1, reader.read(record, gotFields));
}
public void testRead_WithTwoTermFields() throws JsonParseException, IOException {
String data = "{" + "\"took\" : 88," + "\"timed_out\" : false,"
+ "\"_shards\" : { \"total\" : 5, \"successful\" : 5, \"failed\" : 0 },"
+ "\"hits\" : { \"total\" : 86275, \"max_score\" : 0.0, \"hits\" : [ ] }," + "\"aggregations\" : {" + "\"time_level\" : {"
+ "\"buckets\" : [ {" + "\"key_as_string\" : \"2015-12-07T00:00:00.000Z\", \"key\" : 1449446400000, \"doc_count\" : 649,"
+ "\"sourcetype_level\" : {" + "\"doc_count_error_upper_bound\" : 0, \"sum_other_doc_count\" : 0," + "\"buckets\" : [ {"
+ "\"key\" : \"farequote\", \"doc_count\" : 649," + "\"airline_level\" : {"
+ "\"doc_count_error_upper_bound\" : 0, \"sum_other_doc_count\" : 0,"
+ "\"buckets\" : [ { \"key\" : \"aal\", \"doc_count\" : 62, \"metric_level\" : { \"value\" : 106.72129514140468 } },"
+ "{ \"key\" : \"awe\", \"doc_count\" : 61, \"metric_level\" : { \"value\" : 20.20497368984535 } } ]" + "}" + "} ]" + "}"
+ "}," + "{" + "\"key_as_string\" : \"2015-12-07T01:00:00.000Z\", \"key\" : 1449450000000, \"doc_count\" : 627,"
+ "\"sourcetype_level\" : {" + "\"doc_count_error_upper_bound\" : 0, \"sum_other_doc_count\" : 0," + "\"buckets\" : [ {"
+ "\"key\" : \"farequote\", \"doc_count\" : 627," + "\"airline_level\" : {"
+ "\"doc_count_error_upper_bound\" : 0, \"sum_other_doc_count\" : 0,"
+ "\"buckets\" : [ { \"key\" : \"aal\", \"doc_count\" : 59, \"metric_level\" : { \"value\" : 103.64676252462097 } },"
+ "{ \"key\" : \"awe\", \"doc_count\" : 56, \"metric_level\" : { \"value\" : 20.047162464686803 } } ]" + "}" + "} ]" + "}"
+ "} ]" + "}" + "}" + "}";
JsonParser parser = createParser(data);
Map<String, Integer> fieldMap = createFieldMapWithTwoTermFields();
List<String> nestingOrder = createNestingOrderWithTwoTermFields();
AggregatedJsonRecordReader reader = new AggregatedJsonRecordReader(parser, fieldMap, "aggregations", mock(Logger.class),
nestingOrder);
String[] record = new String[5];
boolean[] gotFields = new boolean[5];
assertEquals(5, reader.read(record, gotFields));
assertEquals("aal", record[0]);
assertEquals("62", record[1]);
assertEquals("106.72129514140468", record[2]);
assertEquals("1449446400000", record[3]);
assertEquals("farequote", record[4]);
assertEquals(5, reader.read(record, gotFields));
assertEquals("awe", record[0]);
assertEquals("61", record[1]);
assertEquals("20.20497368984535", record[2]);
assertEquals("1449446400000", record[3]);
assertEquals("farequote", record[4]);
assertEquals(5, reader.read(record, gotFields));
assertEquals("aal", record[0]);
assertEquals("59", record[1]);
assertEquals("103.64676252462097", record[2]);
assertEquals("1449450000000", record[3]);
assertEquals("farequote", record[4]);
assertEquals(5, reader.read(record, gotFields));
assertEquals("awe", record[0]);
assertEquals("56", record[1]);
assertEquals("20.047162464686803", record[2]);
assertEquals("1449450000000", record[3]);
assertEquals("farequote", record[4]);
assertEquals(-1, reader.read(record, gotFields));
}
public void testConstructor_GivenNoNestingOrder() throws JsonParseException, IOException {
JsonParser parser = createParser("");
Map<String, Integer> fieldMap = createFieldMapWithNoTermField();
List<String> nestingOrder = Collections.emptyList();
ESTestCase.expectThrows(IllegalArgumentException.class,
() -> new AggregatedJsonRecordReader(parser, fieldMap, "aggregations", mock(Logger.class), nestingOrder));
}
public void testRead_GivenInvalidJson() throws JsonParseException, IOException {
String data = "{" + "\"took\" : 88," + "\"timed_out\" : false,"
+ "\"_shards\" : { \"total\" : 5, \"successful\" : 5, \"failed\" : 0 },"
+ "\"hits\" : { \"total\" : 86275, \"max_score\" : 0.0, \"hits\" : [ ] }," + "\"aggregations\" : {" + "\"time_level\" : {";
JsonParser parser = createParser(data);
Map<String, Integer> fieldMap = createFieldMapWithNoTermField();
List<String> nestingOrder = createNestingOrderWithNoTermField();
AggregatedJsonRecordReader reader = new AggregatedJsonRecordReader(parser, fieldMap, "aggregations", mock(Logger.class),
nestingOrder);
String[] record = new String[4];
boolean[] gotFields = new boolean[4];
ESTestCase.expectThrows(ElasticsearchParseException.class, () -> reader.read(record, gotFields));
}
private JsonParser createParser(String input) throws JsonParseException, IOException {
ByteArrayInputStream inputStream = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8));
return new JsonFactory().createParser(inputStream);
}
private Map<String, Integer> createFieldMapWithNoTermField() {
Map<String, Integer> fieldMap = new HashMap<>();
fieldMap.put("doc_count", 0);
fieldMap.put("responsetime", 1);
fieldMap.put("time", 2);
return fieldMap;
}
private List<String> createNestingOrderWithNoTermField() {
List<String> nestingOrder = new ArrayList<>();
nestingOrder.add("time");
nestingOrder.add("responsetime");
return nestingOrder;
}
private Map<String, Integer> createFieldMapWithOneTermField() {
Map<String, Integer> fieldMap = new HashMap<>();
fieldMap.put("airline", 0);
fieldMap.put("doc_count", 1);
fieldMap.put("responsetime", 2);
fieldMap.put("time", 3);
return fieldMap;
}
private List<String> createNestingOrderWithOneTermField() {
List<String> nestingOrder = new ArrayList<>();
nestingOrder.add("time");
nestingOrder.add("airline");
nestingOrder.add("responsetime");
return nestingOrder;
}
private Map<String, Integer> createFieldMapWithTwoTermFields() {
Map<String, Integer> fieldMap = new HashMap<>();
fieldMap.put("airline", 0);
fieldMap.put("doc_count", 1);
fieldMap.put("responsetime", 2);
fieldMap.put("time", 3);
fieldMap.put("sourcetype", 4);
return fieldMap;
}
private List<String> createNestingOrderWithTwoTermFields() {
List<String> nestingOrder = new ArrayList<>();
nestingOrder.add("time");
nestingOrder.add("sourcetype");
nestingOrder.add("airline");
nestingOrder.add("responsetime");
return nestingOrder;
}
}

View File

@ -14,6 +14,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.search.SearchRequestParsers;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.threadpool.ThreadPool;
@ -33,18 +34,19 @@ public class RestStartJobSchedulerActionTests extends ESTestCase {
public void testPrepareRequest() throws Exception {
ClusterService clusterService = mock(ClusterService.class);
SearchRequestParsers searchRequestParsers = mock(SearchRequestParsers.class);
Job.Builder job = ScheduledJobRunnerTests.createScheduledJob();
SchedulerConfig schedulerConfig = ScheduledJobRunnerTests.createSchedulerConfig("foo-scheduler", "foo").build();
PrelertMetadata prelertMetadata = new PrelertMetadata.Builder()
.putJob(job.build(), false)
.putScheduler(schedulerConfig)
.putScheduler(schedulerConfig, searchRequestParsers)
.updateStatus("foo", JobStatus.OPENED, null)
.build();
when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(PrelertMetadata.TYPE, prelertMetadata))
.build());
RestStartSchedulerAction action = new RestStartSchedulerAction(Settings.EMPTY, mock(RestController.class),
mock(ThreadPool.class), clusterService);
mock(ThreadPool.class), clusterService, searchRequestParsers);
Map<String, String> params = new HashMap<>();
params.put("start", "not-a-date");

View File

@ -15,6 +15,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchRequestParsers;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
@ -99,8 +100,8 @@ public class ScheduledJobRunnerTests extends ESTestCase {
when(client.execute(same(JobDataAction.INSTANCE), any())).thenReturn(jobDataFuture);
when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture);
scheduledJobRunner =
new ScheduledJobRunner(threadPool, client, clusterService,jobProvider, dataExtractorFactory, () -> currentTime);
scheduledJobRunner = new ScheduledJobRunner(threadPool, client, clusterService, jobProvider, dataExtractorFactory,
() -> currentTime);
when(jobProvider.audit(anyString())).thenReturn(auditor);
when(jobProvider.buckets(anyString(), any(BucketsQueryBuilder.BucketsQuery.class))).thenThrow(
@ -114,7 +115,7 @@ public class ScheduledJobRunnerTests extends ESTestCase {
Job job = jobBuilder.build();
PrelertMetadata prelertMetadata = new PrelertMetadata.Builder()
.putJob(job, false)
.putScheduler(schedulerConfig)
.putScheduler(schedulerConfig, mock(SearchRequestParsers.class))
.updateStatus("foo", JobStatus.OPENED, null)
.build();
when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name"))
@ -147,7 +148,7 @@ public class ScheduledJobRunnerTests extends ESTestCase {
Job job = jobBuilder.build();
PrelertMetadata prelertMetadata = new PrelertMetadata.Builder()
.putJob(job, false)
.putScheduler(schedulerConfig)
.putScheduler(schedulerConfig, mock(SearchRequestParsers.class))
.updateStatus("foo", JobStatus.OPENED, null)
.build();
when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("_name"))
@ -202,21 +203,24 @@ public class ScheduledJobRunnerTests extends ESTestCase {
PrelertMetadata prelertMetadata1 = new PrelertMetadata.Builder()
.putJob(job1, false)
.build();
Exception e = expectThrows(ResourceNotFoundException.class, () -> ScheduledJobRunner.validate("some-scheduler", prelertMetadata1));
Exception e = expectThrows(ResourceNotFoundException.class,
() -> ScheduledJobRunner.validate("some-scheduler", prelertMetadata1));
assertThat(e.getMessage(), equalTo("No scheduler with id [some-scheduler] exists"));
SchedulerConfig schedulerConfig1 = createSchedulerConfig("foo-scheduler", "foo").build();
PrelertMetadata prelertMetadata2 = new PrelertMetadata.Builder(prelertMetadata1)
.putScheduler(schedulerConfig1)
.putScheduler(schedulerConfig1, mock(SearchRequestParsers.class))
.build();
e = expectThrows(ElasticsearchStatusException.class, () -> ScheduledJobRunner.validate("foo-scheduler", prelertMetadata2));
e = expectThrows(ElasticsearchStatusException.class,
() -> ScheduledJobRunner.validate("foo-scheduler", prelertMetadata2));
assertThat(e.getMessage(), equalTo("cannot start scheduler, expected job status [OPENED], but got [CLOSED]"));
PrelertMetadata prelertMetadata3 = new PrelertMetadata.Builder(prelertMetadata2)
.updateStatus("foo", JobStatus.OPENED, null)
.updateSchedulerStatus("foo-scheduler", SchedulerStatus.STARTED)
.build();
e = expectThrows(ElasticsearchStatusException.class, () -> ScheduledJobRunner.validate("foo-scheduler", prelertMetadata3));
e = expectThrows(ElasticsearchStatusException.class,
() -> ScheduledJobRunner.validate("foo-scheduler", prelertMetadata3));
assertThat(e.getMessage(), equalTo("scheduler already started, expected scheduler status [STOPPED], but got [STARTED]"));
}

View File

@ -5,8 +5,8 @@
*/
package org.elasticsearch.xpack.prelert.scheduler;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.AnalysisConfig;
import org.elasticsearch.xpack.prelert.job.DataDescription;
@ -141,32 +141,7 @@ public class ScheduledJobValidatorTests extends ESTestCase {
private static SchedulerConfig.Builder createValidSchedulerConfigWithAggs() throws IOException {
SchedulerConfig.Builder schedulerConfig = createValidSchedulerConfig();
String aggStr =
"{" +
"\"buckets\" : {" +
"\"histogram\" : {" +
"\"field\" : \"time\"," +
"\"interval\" : 3600000" +
"}," +
"\"aggs\" : {" +
"\"byField\" : {" +
"\"terms\" : {" +
"\"field\" : \"airline\"," +
"\"size\" : 0" +
"}," +
"\"aggs\" : {" +
"\"stats\" : {" +
"\"stats\" : {" +
"\"field\" : \"responsetime\"" +
"}" +
"}" +
"}" +
"}" +
"}" +
"} " +
"}";
XContentParser parser = XContentFactory.xContent(aggStr).createParser(aggStr);
schedulerConfig.setAggregations(parser.map());
schedulerConfig.setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("foo")));
return schedulerConfig;
}

View File

@ -6,12 +6,17 @@
package org.elasticsearch.xpack.prelert.scheduler;
import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.messages.Messages;
import org.elasticsearch.xpack.prelert.support.AbstractSerializingTestCase;
@ -20,29 +25,22 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerConfig> {
@Override
protected SchedulerConfig createTestInstance() {
SchedulerConfig.Builder builder = new SchedulerConfig.Builder(randomValidSchedulerId(), randomAsciiOfLength(10));
return createRandomizedSchedulerConfig(randomAsciiOfLength(10));
}
public static SchedulerConfig createRandomizedSchedulerConfig(String jobId) {
SchedulerConfig.Builder builder = new SchedulerConfig.Builder(randomValidSchedulerId(), jobId);
builder.setIndexes(randomStringList(1, 10));
builder.setTypes(randomStringList(1, 10));
if (randomBoolean()) {
builder.setQuery(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)));
}
if (randomBoolean()) {
builder.setScriptFields(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)));
}
if (randomBoolean()) {
builder.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE));
}
if (randomBoolean()) {
builder.setAggregations(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)));
}
if (randomBoolean()) {
builder.setFrequency(randomPositiveLong());
}
@ -71,80 +69,43 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerC
return SchedulerConfig.PARSER.apply(parser, () -> matcher).build();
}
/**
* Test parsing of the opaque {@link SchedulerConfig#getQuery()} object
*/
public void testQueryParsing() throws IOException {
Logger logger = Loggers.getLogger(SchedulerConfigTests.class);
public void testToXContent_GivenQueryAggsAndScriptFields() throws IOException {
SchedulerConfig.Builder builder = new SchedulerConfig.Builder(randomValidSchedulerId(), randomAsciiOfLength(10));
builder.setIndexes(randomStringList(1, 10));
builder.setTypes(randomStringList(1, 10));
builder.setQuery(QueryBuilders.matchAllQuery());
String schedulerConfigStr = "{" + "\"scheduler_id\":\"scheduler1\"," + "\"job_id\":\"job1\"," + "\"indexes\":[\"farequote\"],"
+ "\"types\":[\"farequote\"]," + "\"query\":{\"match_all\":{} }" + "}";
int scriptsSize = randomInt(3);
List<SearchSourceBuilder.ScriptField> scriptFields = new ArrayList<>(scriptsSize);
for (int scriptIndex = 0; scriptIndex < scriptsSize; scriptIndex++) {
scriptFields.add(new SearchSourceBuilder.ScriptField(randomAsciiOfLength(10), new Script(randomAsciiOfLength(10)),
randomBoolean()));
}
builder.setScriptFields(scriptFields);
XContentParser parser = XContentFactory.xContent(schedulerConfigStr).createParser(schedulerConfigStr);
SchedulerConfig schedulerConfig = SchedulerConfig.PARSER.apply(parser, () -> ParseFieldMatcher.STRICT).build();
assertNotNull(schedulerConfig);
AggregatorFactories.Builder aggsBuilder = new AggregatorFactories.Builder();
aggsBuilder.addAggregator(AggregationBuilders.avg(randomAsciiOfLength(10)));
builder.setAggregations(aggsBuilder);
Map<String, Object> query = schedulerConfig.getQuery();
assertNotNull(query);
SchedulerConfig testInstance = builder.build();
String queryAsJson = XContentFactory.jsonBuilder().map(query).string();
logger.info("Round trip of query is: " + queryAsJson);
assertTrue(query.containsKey("match_all"));
}
for (int runs = 0; runs < NUMBER_OF_TESTQUERIES; runs++) {
XContentBuilder xContentBuilder = toXContent(testInstance, randomFrom(XContentType.values()));
XContentBuilder shuffled = shuffleXContent(xContentBuilder, shuffleProtectedFields());
public void testBuildAggregatedFieldList_GivenNoAggregations() {
SchedulerConfig.Builder builder = new SchedulerConfig.Builder("scheduler1", "job1");
builder.setIndexes(Arrays.asList("index"));
builder.setTypes(Arrays.asList("type"));
assertTrue(builder.build().buildAggregatedFieldList().isEmpty());
}
XContentParser parser = XContentFactory.xContent(shuffled.bytes()).createParser(shuffled.bytes());
SchedulerConfig parsedInstance = parseInstance(parser, ParseFieldMatcher.STRICT);
public void testAggsParse() throws IOException {
Logger logger = Loggers.getLogger(SchedulerConfigTests.class);
String aggregationsConfig = "{" + "\"scheduler_id\":\"scheduler1\"," + "\"job_id\":\"job1\"," + "\"indexes\":[\"farequote\"],"
+ "\"types\":[\"farequote\"]," + "\"query\":{\"match_all\":{} }," + "\"aggs\" : {" + "\"top_level_must_be_time\" : {"
+ "\"histogram\" : {" + "\"field\" : \"@timestamp\"," + "\"interval\" : 3600000" + "}," + "\"aggs\" : {"
+ "\"by_field_in_the_middle\" : { " + "\"terms\" : {" + "\"field\" : \"airline\"," + "\"size\" : 0" + "}," + "\"aggs\" : {"
+ "\"stats_last\" : {" + "\"avg\" : {" + "\"field\" : \"responsetime\"" + "}" + "}" + "} " + "}" + "}" + "}" + "}" + "}"
+ "}";
String aggsConfig = "{" + "\"scheduler_id\":\"scheduler1\"," + "\"job_id\":\"job1\"," + "\"indexes\":[\"farequote\"],"
+ "\"types\":[\"farequote\"]," + "\"query\":{\"match_all\":{} }," + "\"aggs\" : {" + "\"top_level_must_be_time\" : {"
+ "\"histogram\" : {" + "\"field\" : \"@timestamp\"," + "\"interval\" : 3600000" + "}," + "\"aggs\" : {"
+ "\"by_field_in_the_middle\" : { " + "\"terms\" : {" + "\"field\" : \"airline\"," + "\"size\" : 0" + "}," + "\"aggs\" : {"
+ "\"stats_last\" : {" + "\"avg\" : {" + "\"field\" : \"responsetime\"" + "}" + "}" + "} " + "}" + "}" + "}" + "}" + "}"
+ "}";
XContentParser parser = XContentFactory.xContent(aggregationsConfig).createParser(aggregationsConfig);
SchedulerConfig aggregationsSchedulerConfig = SchedulerConfig.PARSER.apply(parser, () -> ParseFieldMatcher.STRICT).build();
parser = XContentFactory.xContent(aggsConfig).createParser(aggsConfig);
SchedulerConfig aggsSchedulerConfig = SchedulerConfig.PARSER.apply(parser, () -> ParseFieldMatcher.STRICT).build();
assertNotNull(aggregationsSchedulerConfig);
assertNotNull(aggsSchedulerConfig);
assertEquals(aggregationsSchedulerConfig, aggsSchedulerConfig);
Map<String, Object> aggs = aggsSchedulerConfig.getAggregations();
assertNotNull(aggs);
String aggsAsJson = XContentFactory.jsonBuilder().map(aggs).string();
logger.info("Round trip of aggs is: " + aggsAsJson);
assertTrue(aggs.containsKey("top_level_must_be_time"));
List<String> aggregatedFieldList = aggsSchedulerConfig.buildAggregatedFieldList();
assertEquals(3, aggregatedFieldList.size());
assertEquals("@timestamp", aggregatedFieldList.get(0));
assertEquals("airline", aggregatedFieldList.get(1));
assertEquals("responsetime", aggregatedFieldList.get(2));
assertEquals(testInstance.getQueryAsMap(), parsedInstance.getQueryAsMap());
assertEquals(testInstance.getAggregationsAsMap(), parsedInstance.getAggregationsAsMap());
assertEquals(testInstance.getScriptFieldsAsMap(), parsedInstance.getScriptFieldsAsMap());
}
}
public void testFillDefaults() {
SchedulerConfig.Builder expectedSchedulerConfig = new SchedulerConfig.Builder("scheduler1", "job1");
expectedSchedulerConfig.setIndexes(Arrays.asList("index"));
expectedSchedulerConfig.setTypes(Arrays.asList("type"));
Map<String, Object> defaultQuery = new HashMap<>();
defaultQuery.put("match_all", new HashMap<String, Object>());
expectedSchedulerConfig.setQuery(defaultQuery);
expectedSchedulerConfig.setQueryDelay(60L);
expectedSchedulerConfig.setScrollSize(1000);
SchedulerConfig.Builder defaultedSchedulerConfig = new SchedulerConfig.Builder("scheduler1", "job1");
@ -208,8 +169,7 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerC
public void testEquals_GivenDifferentQuery() {
SchedulerConfig.Builder b1 = createFullyPopulated();
SchedulerConfig.Builder b2 = createFullyPopulated();
Map<String, Object> emptyQuery = new HashMap<>();
b2.setQuery(emptyQuery);
b2.setQuery(QueryBuilders.termQuery("foo", "bar"));
SchedulerConfig sc1 = b1.build();
SchedulerConfig sc2 = b2.build();
@ -220,8 +180,7 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerC
public void testEquals_GivenDifferentAggregations() {
SchedulerConfig.Builder sc1 = createFullyPopulated();
SchedulerConfig.Builder sc2 = createFullyPopulated();
Map<String, Object> emptyAggs = new HashMap<>();
sc2.setAggregations(emptyAggs);
sc2.setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.count("foo")));
assertFalse(sc1.build().equals(sc2.build()));
assertFalse(sc2.build().equals(sc1.build()));
@ -233,46 +192,12 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerC
sc.setTypes(Arrays.asList("myType1", "myType2"));
sc.setFrequency(60L);
sc.setScrollSize(5000);
Map<String, Object> query = new HashMap<>();
query.put("foo", new HashMap<>());
sc.setQuery(query);
Map<String, Object> aggs = new HashMap<>();
aggs.put("bar", new HashMap<>());
sc.setAggregations(aggs);
sc.setQuery(QueryBuilders.matchAllQuery());
sc.setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("foo")));
sc.setQueryDelay(90L);
return sc;
}
public void testCheckValid_AllOk() throws IOException {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder("scheduler1", "job1");
conf.setIndexes(Arrays.asList("myindex"));
conf.setTypes(Arrays.asList("mytype"));
conf.setQueryDelay(90L);
String json = "{ \"match_all\" : {} }";
XContentParser parser = XContentFactory.xContent(json).createParser(json);
conf.setQuery(parser.map());
conf.setScrollSize(2000);
conf.build();
}
public void testCheckValid_NoQuery() {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder("scheduler1", "job1");
conf.setIndexes(Arrays.asList("myindex"));
conf.setTypes(Arrays.asList("mytype"));
assertEquals(Collections.singletonMap("match_all", new HashMap<>()), conf.build().getQuery());
}
public void testCheckValid_GivenScriptFields() throws IOException {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder("scheduler1", "job1");
conf.setIndexes(Arrays.asList("myindex"));
conf.setTypes(Arrays.asList("mytype"));
String json = "{ \"twiceresponsetime\" : { \"script\" : { \"lang\" : \"expression\", "
+ "\"inline\" : \"doc['responsetime'].value * 2\" } } }";
XContentParser parser = XContentFactory.xContent(json).createParser(json);
conf.setScriptFields(parser.map());
assertEquals(1, conf.build().getScriptFields().size());
}
public void testCheckValid_GivenNullIndexes() throws IOException {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder("scheduler1", "job1");
expectThrows(IllegalArgumentException.class, () -> conf.setIndexes(null));

View File

@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.scheduler;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.prelert.support.AbstractSerializingTestCase;
public class SchedulerTests extends AbstractSerializingTestCase<Scheduler> {
@Override
protected Scheduler createTestInstance() {
return new Scheduler(SchedulerConfigTests.createRandomizedSchedulerConfig(randomAsciiOfLength(10)),
randomFrom(SchedulerStatus.values()));
}
@Override
protected Writeable.Reader<Scheduler> instanceReader() {
return Scheduler::new;
}
@Override
protected Scheduler parseInstance(XContentParser parser, ParseFieldMatcher matcher) {
return Scheduler.PARSER.apply(parser, () -> matcher);
}
}

View File

@ -28,7 +28,7 @@ public class ElasticsearchDataExtractorTests extends ESTestCase {
private static final List<String> INDEXES = Arrays.asList("index-*");
private static final List<String> TYPES = Arrays.asList("dataType");
private static final String SEARCH = "\"match_all\":{}";
private static final String SEARCH = "{\"match_all\":{}}";
private static final String TIME_FIELD = "time";
private static final String CLEAR_SCROLL_RESPONSE = "{}";

View File

@ -13,8 +13,10 @@ import static org.mockito.Mockito.verify;
public class ElasticsearchQueryBuilderTests extends ESTestCase {
private static final String MATCH_ALL_QUERY = "{\"match_all\":{}}";
public void testCreateSearchBody_GivenQueryOnly() {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", null, null, "time");
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(MATCH_ALL_QUERY, null, null, "time");
assertFalse(queryBuilder.isAggregated());
@ -29,7 +31,7 @@ public class ElasticsearchQueryBuilderTests extends ESTestCase {
}
public void testCreateSearchBody_GivenQueryAndScriptFields() {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", null,
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(MATCH_ALL_QUERY, null,
"{\"test1\":{\"script\": \"...\"}}", "@timestamp");
assertFalse(queryBuilder.isAggregated());
@ -46,7 +48,7 @@ public class ElasticsearchQueryBuilderTests extends ESTestCase {
}
public void testCreateSearchBody_GivenQueryAndAggs() {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", "{\"my_aggs\":{}}", null, "time");
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(MATCH_ALL_QUERY, "{\"my_aggs\":{}}", null, "time");
assertTrue(queryBuilder.isAggregated());
@ -61,7 +63,7 @@ public class ElasticsearchQueryBuilderTests extends ESTestCase {
}
public void testCreateDataSummaryQuery_GivenQueryOnly() {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", null, null, "@timestamp");
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(MATCH_ALL_QUERY, null, null, "@timestamp");
assertFalse(queryBuilder.isAggregated());
@ -78,7 +80,7 @@ public class ElasticsearchQueryBuilderTests extends ESTestCase {
}
public void testCreateDataSummaryQuery_GivenQueryAndScriptFields() {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", null,
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(MATCH_ALL_QUERY, null,
"{\"test1\":{\"script\": \"...\"}}", "@timestamp");
assertFalse(queryBuilder.isAggregated());
@ -96,7 +98,7 @@ public class ElasticsearchQueryBuilderTests extends ESTestCase {
}
public void testCreateDataSummaryQuery_GivenQueryAndAggs() {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", "{\"my_aggs\":{}}", null, "@timestamp");
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(MATCH_ALL_QUERY, "{\"my_aggs\":{}}", null, "@timestamp");
assertTrue(queryBuilder.isAggregated());
@ -113,7 +115,7 @@ public class ElasticsearchQueryBuilderTests extends ESTestCase {
}
public void testLogQueryInfo_GivenNoAggsNoFields() {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", null, null, "@timestamp");
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(MATCH_ALL_QUERY, null, null, "@timestamp");
Logger logger = mock(Logger.class);
queryBuilder.logQueryInfo(logger);
@ -122,7 +124,7 @@ public class ElasticsearchQueryBuilderTests extends ESTestCase {
}
public void testLogQueryInfo() {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", "{\"my_aggs\":{ \"foo\": \"bar\" }}",
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(MATCH_ALL_QUERY, "{\"my_aggs\":{ \"foo\": \"bar\" }}",
null, "@timestamp");
Logger logger = mock(Logger.class);

View File

@ -104,6 +104,20 @@ setup:
"types":["type-bar"]
}
---
"Test put scheduler with invalid query":
- do:
catch: /parsing_exception/
xpack.prelert.put_scheduler:
scheduler_id: test-scheduler-1
body: >
{
"job_id":"job-1",
"indexes":["index-foo"],
"types":["type-bar"],
"query":{"match_all_mispelled":{}}
}
---
"Test delete scheduler with missing id":
- do: