Adding additional tests for agg parsing in datafeedconfig (#36261)

* Adding additional tests for agg parsing in datafeedconfig

* Fixing bug, adding yml test
Benjamin Trent, 2018-12-06 11:19:34 -06:00 (committed by GitHub)
parent 96ee6e0d2a · commit adc8355c5d
3 changed files with 168 additions and 5 deletions

XContentObjectTransformer.java

@@ -46,7 +46,8 @@ public class XContentObjectTransformer<T extends ToXContentObject> {
     public static XContentObjectTransformer<AggregatorFactories.Builder> aggregatorTransformer() {
         return new XContentObjectTransformer<>(searchRegistry, (p) -> {
             // Serializing a map creates an object, need to skip the start object for the aggregation parser
-            assert(XContentParser.Token.START_OBJECT.equals(p.nextToken()));
+            XContentParser.Token token = p.nextToken();
+            assert(XContentParser.Token.START_OBJECT.equals(token));
             return AggregatorFactories.parseAggregators(p);
         });
     }
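
This one-line split is the bug fix of the commit: the JVM elides `assert` statements entirely unless assertions are enabled (`-ea`), so a side effect such as advancing the parser must never live inside the assert expression. Test JVMs typically run with assertions on, which is why the original code passed in tests but would never consume the START_OBJECT token in production before handing the parser to AggregatorFactories.parseAggregators. A minimal standalone sketch of the pitfall (illustrative only, not part of this commit):

import java.util.Iterator;
import java.util.List;

public final class AssertSideEffectDemo {
    public static void main(String[] args) {
        Iterator<String> it = List.of("a", "b").iterator();
        // BAD: the iterator only advances when the JVM runs with -ea,
        // because the whole assert statement is skipped otherwise.
        assert "a".equals(it.next());
        // GOOD (the pattern the commit adopts): perform the side effect
        // unconditionally, then assert on the captured result.
        String next = it.next();
        assert "b".equals(next);
        // Prints "b" under -ea but "a" without it: behavior that depends
        // on a debugging flag is exactly the bug fixed above.
        System.out.println(next);
    }
}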

DatafeedConfigTests.java

@@ -8,28 +8,38 @@ package org.elasticsearch.xpack.core.ml.datafeed;
import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParseException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.PipelineAggregatorBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder.ScriptField;
import org.elasticsearch.test.AbstractSerializingTestCase;
@@ -83,7 +93,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedConfig> {
        if (randomBoolean() && addScriptFields == false) {
            // can only test with a single agg as the xcontent order gets randomized by test base class and then
            // the actual xcontent isn't the same and test fail.
-           // Testing with a single agg is ok as we don't have special list writeable / xconent logic
+           // Testing with a single agg is ok as we don't have special list writeable / xcontent logic
            AggregatorFactories.Builder aggs = new AggregatorFactories.Builder();
            aggHistogramInterval = randomNonNegativeLong();
            aggHistogramInterval = aggHistogramInterval > bucketSpanMillis ? bucketSpanMillis : aggHistogramInterval;
@@ -567,6 +577,98 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedConfig> {
        assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(12)));
    }

    public void testSerializationOfComplexAggs() throws IOException {
        MaxAggregationBuilder maxTime = AggregationBuilders.max("timestamp").field("timestamp");
        AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("bytes_in_avg").field("system.network.in.bytes");
        DerivativePipelineAggregationBuilder derivativePipelineAggregationBuilder =
            PipelineAggregatorBuilders.derivative("bytes_in_derivative", "bytes_in_avg");
        BucketScriptPipelineAggregationBuilder bucketScriptPipelineAggregationBuilder =
            PipelineAggregatorBuilders.bucketScript("non_negative_bytes",
                Collections.singletonMap("bytes", "bytes_in_derivative"),
                new Script("params.bytes > 0 ? params.bytes : null"));
        DateHistogramAggregationBuilder dateHistogram =
            AggregationBuilders.dateHistogram("histogram_buckets")
                .field("timestamp").interval(300000).timeZone(DateTimeZone.UTC)
                .subAggregation(maxTime)
                .subAggregation(avgAggregationBuilder)
                .subAggregation(derivativePipelineAggregationBuilder)
                .subAggregation(bucketScriptPipelineAggregationBuilder);
        DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilderWithDateHistogram(dateHistogram);
        QueryBuilder terms =
            new BoolQueryBuilder().filter(new TermQueryBuilder(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10)));
        datafeedConfigBuilder.setParsedQuery(terms);
        DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
        AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder().addAggregator(dateHistogram);

        XContentType xContentType = XContentType.JSON;
        BytesReference bytes = XContentHelper.toXContent(datafeedConfig, xContentType, false);
        XContentParser parser = XContentHelper.createParser(xContentRegistry(),
            DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
            bytes,
            xContentType);

        DatafeedConfig parsedDatafeedConfig = doParseInstance(parser);
        assertEquals(datafeedConfig, parsedDatafeedConfig);

        // Assert that the parsed versions of our aggs and queries work as well
        assertEquals(aggBuilder, parsedDatafeedConfig.getParsedAggregations());
        assertEquals(terms, parsedDatafeedConfig.getParsedQuery());

        try (BytesStreamOutput output = new BytesStreamOutput()) {
            datafeedConfig.writeTo(output);
            try (StreamInput streamInput = output.bytes().streamInput()) {
                DatafeedConfig streamedDatafeedConfig = new DatafeedConfig(streamInput);
                assertEquals(datafeedConfig, streamedDatafeedConfig);

                // Assert that the parsed versions of our aggs and queries work as well
                assertEquals(aggBuilder, streamedDatafeedConfig.getParsedAggregations());
                assertEquals(terms, streamedDatafeedConfig.getParsedQuery());
            }
        }
    }
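
The getParsedAggregations() and getParsedQuery() assertions are what drive the transformer fixed in the first file: the config keeps the aggregations as a plain map and converts them back to an AggregatorFactories.Builder on demand. A rough sketch of that conversion path, assuming the transformer exposes a fromMap helper and the config stores a Map<String, Object> field named aggregations (the actual DatafeedConfig internals are not shown in this diff):

// Illustrative fragment only, assumed shape rather than the verbatim source.
private static final XContentObjectTransformer<AggregatorFactories.Builder> AGG_TRANSFORMER =
    XContentObjectTransformer.aggregatorTransformer();

public AggregatorFactories.Builder getParsedAggregations() {
    try {
        // Serializes the stored map back to XContent and re-parses it; this is
        // the path that needed the START_OBJECT skip fixed above.
        return AGG_TRANSFORMER.fromMap(aggregations);
    } catch (IOException e) {
        throw new ElasticsearchException("failed to parse aggregations", e);
    }
}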

    public void testSerializationOfComplexAggsBetweenVersions() throws IOException {
        MaxAggregationBuilder maxTime = AggregationBuilders.max("timestamp").field("timestamp");
        AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("bytes_in_avg").field("system.network.in.bytes");
        DerivativePipelineAggregationBuilder derivativePipelineAggregationBuilder =
            PipelineAggregatorBuilders.derivative("bytes_in_derivative", "bytes_in_avg");
        BucketScriptPipelineAggregationBuilder bucketScriptPipelineAggregationBuilder =
            PipelineAggregatorBuilders.bucketScript("non_negative_bytes",
                Collections.singletonMap("bytes", "bytes_in_derivative"),
                new Script("params.bytes > 0 ? params.bytes : null"));
        DateHistogramAggregationBuilder dateHistogram =
            AggregationBuilders.dateHistogram("histogram_buckets")
                .field("timestamp").interval(300000).timeZone(DateTimeZone.UTC)
                .subAggregation(maxTime)
                .subAggregation(avgAggregationBuilder)
                .subAggregation(derivativePipelineAggregationBuilder)
                .subAggregation(bucketScriptPipelineAggregationBuilder);
        DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilderWithDateHistogram(dateHistogram);
        QueryBuilder terms =
            new BoolQueryBuilder().filter(new TermQueryBuilder(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10)));
        datafeedConfigBuilder.setParsedQuery(terms);
        DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();

        SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
        NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(searchModule.getNamedWriteables());

        try (BytesStreamOutput output = new BytesStreamOutput()) {
            output.setVersion(Version.V_6_0_0);
            datafeedConfig.writeTo(output);
            try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry)) {
                in.setVersion(Version.V_6_0_0);
                DatafeedConfig streamedDatafeedConfig = new DatafeedConfig(in);
                assertEquals(datafeedConfig, streamedDatafeedConfig);

                // Assert that the parsed versions of our aggs and queries work as well
                assertEquals(new AggregatorFactories.Builder().addAggregator(dateHistogram),
                    streamedDatafeedConfig.getParsedAggregations());
                assertEquals(terms, streamedDatafeedConfig.getParsedQuery());
            }
        }
    }
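
Pinning both streams to Version.V_6_0_0 exercises the older wire format, where the aggregations travel as parsed objects rather than as a map; that is why the reading side needs a NamedWriteableAwareStreamInput backed by SearchModule's registry, so the aggregation NamedWriteables can be resolved. A hedged sketch of what the version-gated write side can look like (a fragment; the cutoff constant and field name are assumptions for illustration, not the verbatim DatafeedConfig source):

@Override
public void writeTo(StreamOutput out) throws IOException {
    // Hypothetical cutoff: older nodes expect the parsed, named-writeable form...
    if (out.getVersion().before(Version.V_6_6_0)) {
        out.writeOptionalWriteable(getParsedAggregations());
    } else {
        // ...newer nodes receive the raw map and parse it lazily on demand.
        out.writeMap(aggregations);
    }
}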

    public static String randomValidDatafeedId() {
        CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray());
        return generator.ofCodePointsLength(random(), 10, 10);
@@ -590,14 +692,18 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedConfig> {
        return createDatafeedWithDateHistogram(dateHistogram);
    }

-   private static DatafeedConfig createDatafeedWithDateHistogram(DateHistogramAggregationBuilder dateHistogram) {
+   private static DatafeedConfig.Builder createDatafeedBuilderWithDateHistogram(DateHistogramAggregationBuilder dateHistogram) {
        DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1");
        builder.setIndices(Collections.singletonList("myIndex"));
        builder.setTypes(Collections.singletonList("myType"));
        AggregatorFactories.Builder aggs = new AggregatorFactories.Builder().addAggregator(dateHistogram);
        DatafeedConfig.validateAggregations(aggs);
        builder.setParsedAggregations(aggs);
-       return builder.build();
+       return builder;
    }

+   private static DatafeedConfig createDatafeedWithDateHistogram(DateHistogramAggregationBuilder dateHistogram) {
+       return createDatafeedBuilderWithDateHistogram(dateHistogram).build();
+   }
@Override

ml/datafeeds_crud.yml

@@ -11,7 +11,8 @@ setup:
          "job_id":"datafeeds-crud-1",
          "analysis_config" : {
            "bucket_span": "1h",
-           "detectors" :[{"function":"count"}]
+           "detectors" :[{"function":"count"}],
+           "summary_count_field_name": "doc_count"
          },
          "data_description" : {
            "format":"xcontent",

(The setup job now declares summary_count_field_name, which is required for jobs whose datafeed uses aggregations; the new "Test put datafeed with aggregations" test below depends on it.)
@@ -321,6 +322,61 @@ setup:
  - match: { chunking_config.mode: "manual" }
  - match: { chunking_config.time_span: "1h" }

---
"Test put datafeed with aggregations":
- do:
xpack.ml.put_datafeed:
datafeed_id: test-datafeed-aggs-1
body: >
{
"job_id":"datafeeds-crud-1",
"indices":["index-foo"],
"types":["type-bar"],
"aggs": {
"histogram_buckets":{
"date_histogram": {
"field": "@timestamp",
"interval": "5m",
"time_zone": "UTC",
"min_doc_count": 0
},
"aggs": {
"@timestamp": {
"max": {
"field": "@timestamp"
}
},
"bytes_in_avg": {
"avg": {
"field": "system.network.in.bytes"
}
},
"bytes_in_derivative": {
"derivative": {
"buckets_path": "bytes_in_avg"
}
},
"non_negative_bytes": {
"bucket_script": {
"buckets_path": {
"bytes": "bytes_in_derivative"
},
"script": "params.bytes > 0 ? params.bytes : null"
}
}
}
}
}
}
- do:
xpack.ml.get_datafeeds:
datafeed_id: test-datafeed-aggs-1
- match: { datafeeds.0.datafeed_id: "test-datafeed-aggs-1" }
- match: { datafeeds.0.aggregations.histogram_buckets.date_histogram.field: "@timestamp" }
- match: { datafeeds.0.aggregations.histogram_buckets.aggregations.@timestamp.max.field: "@timestamp" }
- match: { datafeeds.0.aggregations.histogram_buckets.aggregations.bytes_in_avg.avg.field: "system.network.in.bytes" }
- match: { datafeeds.0.aggregations.histogram_buckets.aggregations.non_negative_bytes.bucket_script.buckets_path.bytes: "bytes_in_derivative" }

---
"Test delete datafeed":
  - do: