diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/XContentObjectTransformer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/XContentObjectTransformer.java
index 5d25b9d71e6..9749f565660 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/XContentObjectTransformer.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/XContentObjectTransformer.java
@@ -46,7 +46,8 @@ public class XContentObjectTransformer<T extends ToXContentObject> {
     public static XContentObjectTransformer<AggregatorFactories.Builder> aggregatorTransformer() {
         return new XContentObjectTransformer<>(searchRegistry, (p) -> {
             // Serializing a map creates an object, need to skip the start object for the aggregation parser
-            assert(XContentParser.Token.START_OBJECT.equals(p.nextToken()));
+            XContentParser.Token token = p.nextToken();
+            assert(XContentParser.Token.START_OBJECT.equals(token));
             return AggregatorFactories.parseAggregators(p);
         });
     }
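Why this hunk matters: the parser side effect originally lived inside the assert statement, so with assertions disabled (the JVM default outside of tests and CI) p.nextToken() was never executed and aggregation parsing started on the wrong token. Hoisting the call out makes it run unconditionally. A minimal standalone sketch of the pitfall (illustration only, not part of the patch; the class and method names are invented):

    public class AssertSideEffectDemo {
        private static int tokensConsumed = 0;

        private static boolean advance() {
            tokensConsumed++; // the side effect we depend on
            return true;
        }

        public static void main(String[] args) {
            // Under plain `java AssertSideEffectDemo` (assertions disabled by
            // default) advance() never runs and tokensConsumed stays 0; only
            // under `java -ea AssertSideEffectDemo` does it execute. This is
            // the same failure mode the patch fixes for p.nextToken().
            assert advance();
            System.out.println("tokensConsumed = " + tokensConsumed);
        }
    }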
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java
index 2787f67952a..554fd131c1e 100644
--- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java
@@ -8,28 +8,38 @@ package org.elasticsearch.xpack.core.ml.datafeed;
 
 import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;
 
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.Version;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.DeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentParseException;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.TermQueryBuilder;
 import org.elasticsearch.script.Script;
 import org.elasticsearch.search.SearchModule;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.PipelineAggregatorBuilders;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
+import org.elasticsearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder;
+import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder.ScriptField;
 import org.elasticsearch.test.AbstractSerializingTestCase;
@@ -83,7 +93,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedConfig> {
             aggHistogramInterval = aggHistogramInterval > bucketSpanMillis ? bucketSpanMillis : aggHistogramInterval;
@@ -567,6 +577,98 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedConfig> {
         }
     }
 
+    public void testSerializationOfComplexAggs() throws IOException {
+        MaxAggregationBuilder maxTime = AggregationBuilders.max("timestamp").field("timestamp");
+        AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("bytes_in_avg").field("system.network.in.bytes");
+        DerivativePipelineAggregationBuilder derivativePipelineAggregationBuilder =
+            PipelineAggregatorBuilders.derivative("bytes_in_derivative", "bytes_in_avg");
+        BucketScriptPipelineAggregationBuilder bucketScriptPipelineAggregationBuilder =
+            PipelineAggregatorBuilders.bucketScript("non_negative_bytes",
+                Collections.singletonMap("bytes", "bytes_in_derivative"),
+                new Script("params.bytes > 0 ? params.bytes : null"));
+        DateHistogramAggregationBuilder dateHistogram =
+            AggregationBuilders.dateHistogram("histogram_buckets")
+                .field("timestamp").interval(300000).timeZone(DateTimeZone.UTC)
+                .subAggregation(maxTime)
+                .subAggregation(avgAggregationBuilder)
+                .subAggregation(derivativePipelineAggregationBuilder)
+                .subAggregation(bucketScriptPipelineAggregationBuilder);
+        DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilderWithDateHistogram(dateHistogram);
+        QueryBuilder terms =
+            new BoolQueryBuilder().filter(new TermQueryBuilder(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10)));
+        datafeedConfigBuilder.setParsedQuery(terms);
+        DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
+        AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder().addAggregator(dateHistogram);
+
+        XContentType xContentType = XContentType.JSON;
+        BytesReference bytes = XContentHelper.toXContent(datafeedConfig, xContentType, false);
+        XContentParser parser = XContentHelper.createParser(xContentRegistry(),
+            DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
+            bytes,
+            xContentType);
+
+        DatafeedConfig parsedDatafeedConfig = doParseInstance(parser);
+        assertEquals(datafeedConfig, parsedDatafeedConfig);
+
+        // Assert that the parsed versions of our aggs and queries work as well
+        assertEquals(aggBuilder, parsedDatafeedConfig.getParsedAggregations());
+        assertEquals(terms, parsedDatafeedConfig.getParsedQuery());
+
+        try (BytesStreamOutput output = new BytesStreamOutput()) {
+            datafeedConfig.writeTo(output);
+            try (StreamInput streamInput = output.bytes().streamInput()) {
+                DatafeedConfig streamedDatafeedConfig = new DatafeedConfig(streamInput);
+                assertEquals(datafeedConfig, streamedDatafeedConfig);
+
+                // Assert that the parsed versions of our aggs and queries work as well
+                assertEquals(aggBuilder, streamedDatafeedConfig.getParsedAggregations());
+                assertEquals(terms, streamedDatafeedConfig.getParsedQuery());
+            }
+        }
+    }
+
+    public void testSerializationOfComplexAggsBetweenVersions() throws IOException {
+        MaxAggregationBuilder maxTime = AggregationBuilders.max("timestamp").field("timestamp");
+        AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("bytes_in_avg").field("system.network.in.bytes");
+        DerivativePipelineAggregationBuilder derivativePipelineAggregationBuilder =
+            PipelineAggregatorBuilders.derivative("bytes_in_derivative", "bytes_in_avg");
+        BucketScriptPipelineAggregationBuilder bucketScriptPipelineAggregationBuilder =
+            PipelineAggregatorBuilders.bucketScript("non_negative_bytes",
+                Collections.singletonMap("bytes", "bytes_in_derivative"),
+                new Script("params.bytes > 0 ? params.bytes : null"));
+        DateHistogramAggregationBuilder dateHistogram =
+            AggregationBuilders.dateHistogram("histogram_buckets")
+                .field("timestamp").interval(300000).timeZone(DateTimeZone.UTC)
+                .subAggregation(maxTime)
+                .subAggregation(avgAggregationBuilder)
+                .subAggregation(derivativePipelineAggregationBuilder)
+                .subAggregation(bucketScriptPipelineAggregationBuilder);
+        DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilderWithDateHistogram(dateHistogram);
+        QueryBuilder terms =
+            new BoolQueryBuilder().filter(new TermQueryBuilder(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10)));
+        datafeedConfigBuilder.setParsedQuery(terms);
+        DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
+
+        SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
+        NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(searchModule.getNamedWriteables());
+
+        try (BytesStreamOutput output = new BytesStreamOutput()) {
+            output.setVersion(Version.V_6_0_0);
+            datafeedConfig.writeTo(output);
+            try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry)) {
+                in.setVersion(Version.V_6_0_0);
+                DatafeedConfig streamedDatafeedConfig = new DatafeedConfig(in);
+                assertEquals(datafeedConfig, streamedDatafeedConfig);
+
+                // Assert that the parsed versions of our aggs and queries work as well
+                assertEquals(new AggregatorFactories.Builder().addAggregator(dateHistogram),
+                    streamedDatafeedConfig.getParsedAggregations());
+                assertEquals(terms, streamedDatafeedConfig.getParsedQuery());
+            }
+        }
+    }
+
     public static String randomValidDatafeedId() {
         CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray());
         return generator.ofCodePointsLength(random(), 10, 10);
@@ -590,14 +692,18 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedConfig> {
         return createDatafeedWithDateHistogram(dateHistogram);
     }
 
     private static DatafeedConfig createDatafeedWithDateHistogram(DateHistogramAggregationBuilder dateHistogram) {
+        return createDatafeedBuilderWithDateHistogram(dateHistogram).build();
+    }
+
+    private static DatafeedConfig.Builder createDatafeedBuilderWithDateHistogram(DateHistogramAggregationBuilder dateHistogram) {
         DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1");
         builder.setIndices(Collections.singletonList("myIndex"));
         builder.setTypes(Collections.singletonList("myType"));
         builder.setParsedAggregations(new AggregatorFactories.Builder().addAggregator(dateHistogram));
-        return builder.build();
+        return builder;
     }
 
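On testSerializationOfComplexAggsBetweenVersions above: output.setVersion(Version.V_6_0_0) asks DatafeedConfig to write itself in the wire format a 6.0.0 node would expect, and NamedWriteableAwareStreamInput supplies the registry needed to resolve aggregation builders by name when reading back. A self-contained sketch of the version-gated Writeable pattern this kind of BWC test guards (illustration only; VersionedConfig, its fields, and the 6.1.0 cutoff are invented for the demo):

    import java.io.IOException;

    import org.elasticsearch.Version;
    import org.elasticsearch.common.io.stream.StreamInput;
    import org.elasticsearch.common.io.stream.StreamOutput;
    import org.elasticsearch.common.io.stream.Writeable;

    // Invented example type: "name" exists in all versions, while
    // "description" (for the sake of the demo) only shipped in 6.1.0.
    public class VersionedConfig implements Writeable {

        private final String name;
        private final String description;

        public VersionedConfig(String name, String description) {
            this.name = name;
            this.description = description;
        }

        public VersionedConfig(StreamInput in) throws IOException {
            name = in.readString();
            // Only read the field if the sending node was new enough to write it.
            description = in.getVersion().onOrAfter(Version.V_6_1_0) ? in.readOptionalString() : null;
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            out.writeString(name);
            // Skip the field when targeting a node that predates it, mirroring
            // what output.setVersion(...) simulates in the test above.
            if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
                out.writeOptionalString(description);
            }
        }
    }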
diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml
--- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml
+++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml
+---
+"Test put datafeed with aggregations":
+  - do:
+      xpack.ml.put_datafeed:
+        datafeed_id: test-datafeed-aggs-1
+        body:  >
+          {
+            "job_id":"datafeeds-crud-1",
+            "indices":["index-foo"],
+            "types":["type-bar"],
+            "aggs": {
+              "histogram_buckets":{
+                "date_histogram": {
+                  "field": "@timestamp",
+                  "interval": "5m",
+                  "time_zone": "UTC",
+                  "min_doc_count": 0
+                },
+                "aggs": {
+                  "@timestamp": {
+                    "max": {
+                      "field": "@timestamp"
+                    }
+                  },
+                  "bytes_in_avg": {
+                    "avg": {
+                      "field": "system.network.in.bytes"
+                    }
+                  },
+                  "bytes_in_derivative": {
+                    "derivative": {
+                      "buckets_path": "bytes_in_avg"
+                    }
+                  },
+                  "non_negative_bytes": {
+                    "bucket_script": {
+                      "buckets_path": {
+                        "bytes": "bytes_in_derivative"
+                      },
+                      "script": "params.bytes > 0 ? params.bytes : null"
+                    }
+                  }
+                }
+              }
+            }
+          }
+
+  - do:
+      xpack.ml.get_datafeeds:
+        datafeed_id: test-datafeed-aggs-1
+  - match: { datafeeds.0.datafeed_id: "test-datafeed-aggs-1" }
+  - match: { datafeeds.0.aggregations.histogram_buckets.date_histogram.field: "@timestamp" }
+  - match: { datafeeds.0.aggregations.histogram_buckets.aggregations.@timestamp.max.field: "@timestamp" }
+  - match: { datafeeds.0.aggregations.histogram_buckets.aggregations.bytes_in_avg.avg.field: "system.network.in.bytes" }
+  - match: { datafeeds.0.aggregations.histogram_buckets.aggregations.non_negative_bytes.bucket_script.buckets_path.bytes: "bytes_in_derivative" }
+
 ---
 "Test delete datafeed":
   - do: