From 9cc33f4e292309368ea871de802bf8a1da3cfbf5 Mon Sep 17 00:00:00 2001
From: Zachary Tong
Date: Tue, 27 Mar 2018 10:33:59 -0700
Subject: [PATCH] [Rollup] Select best jobs then execute msearch-per-job
 (elastic/x-pack-elasticsearch#4152)

If there are multiple jobs that are all the "best" (e.g. they share the best interval), we have no way of knowing which is actually the best. Unfortunately, we cannot just filter for all the jobs in a single search because their doc_counts can potentially overlap. To solve this, we execute an msearch-per-job so that the results stay isolated.

When rewriting the response, we iteratively unroll and reduce the independent msearch responses into a single "working tree". This allows us to intervene if there are overlapping buckets and manually choose a doc_count.

Jobs are selected by recursively descending through the aggregation tree and independently pruning the list of valid job caps in each branch. When a leaf node is reached in a branch, the remaining jobs are sorted by "best'ness" (see the comparator in RollupJobIdentifierUtils for the implementation) and added to a global set of "best jobs". Once all branches have been evaluated, the final set is returned to the calling code.

Job "best'ness" is, briefly, the job(s) that have:

- The largest compatible date interval
- Fewer and larger-interval histograms
- Fewer terms groups

(A rough sketch of this ordering follows the change list below.)

Note: the final set of "best" jobs is not guaranteed to be minimal; there may be redundant effort due to independent branches choosing jobs that are subsets of other branches.

Related changes:

- We have to include the job's ID in the rollup doc's hash, so that different jobs don't overwrite the same summary document.
- Now that we iteratively reduce the agg tree, the agg framework injects empty buckets while we're working. In most cases this is harmless, but for `avg` aggs the empty bucket is a SumAgg while any unrolled versions are converted into AvgAggs, causing a cast exception. To get around this, avgs are renamed to `{source_name}.value` to prevent the conflict.
- The job filtering has been pushed up into a query filter, since it applies to the entire msearch rather than just individual agg components.
- We no longer add a filter agg clause about the date_histo's interval, because that is handled by the job validation and pruning.
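As a rough illustration of that ordering (not the shipped code: JobSummary is a hypothetical stand-in for RollupJobCaps, and the real comparator in RollupJobIdentifierUtils also special-cases jobs that have no histogram groups at all):

    import java.util.Comparator;
    import java.util.List;

    class BestJobSketch {
        // Hypothetical summary of the parts of a job's caps that the ordering looks at.
        record JobSummary(long dateIntervalMillis, double avgHistoInterval, long termsGroupCount) {}

        // "Best" sorts first: larger date interval, then larger average histogram
        // interval, then fewer terms groups.
        static final Comparator<JobSummary> BEST_FIRST =
                Comparator.comparingLong(JobSummary::dateIntervalMillis).reversed()
                        .thenComparing(Comparator.comparingDouble(JobSummary::avgHistoInterval).reversed())
                        .thenComparingLong(JobSummary::termsGroupCount);

        public static void main(String[] args) {
            List<JobSummary> candidates = List.of(
                    new JobSummary(3_600_000L, 100, 2),  // 1h date interval, coarser histos
                    new JobSummary(3_600_000L, 10, 1),   // 1h date interval, finer histos
                    new JobSummary(60_000L, 100, 0));    // 1m date interval
            // min() under BEST_FIRST picks the 1h job with the larger average histo interval.
            System.out.println(candidates.stream().min(BEST_FIRST).orElseThrow());
        }
    }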
Original commit: elastic/x-pack-elasticsearch@995be2a039912ba55ee4b5d21f25eaae54b76b68 --- .../en/rest-api/rollup/rollup-search.asciidoc | 8 +- .../xpack/core/rollup/RollupField.java | 10 + .../core/rollup/action/RollupJobCaps.java | 9 + .../core/rollup/job/DateHistoGroupConfig.java | 5 +- .../elasticsearch/xpack/rollup/Rollup.java | 7 + .../rollup/RollupJobIdentifierUtils.java | 333 +++++++++ .../xpack/rollup/RollupRequestTranslator.java | 149 +--- .../rollup/RollupResponseTranslator.java | 283 +++++--- .../action/TransportRollupSearchAction.java | 208 +++--- .../action/TransportStartRollupAction.java | 2 +- .../action/TransportStopRollupAction.java | 2 +- .../xpack/rollup/job/IndexerUtils.java | 4 +- .../rollup/RollupJobIdentifierUtilTests.java | 530 ++++++++++++++ .../rollup/RollupRequestTranslationTests.java | 593 ++-------------- .../RollupResponseTranslationTests.java | 491 +++++++++++-- .../rollup/action/RollupIndexCapsTests.java | 2 +- .../rollup/action/SearchActionTests.java | 306 ++++++-- .../xpack/rollup/config/ConfigTests.java | 8 + .../xpack/rollup/job/IndexerUtilsTests.java | 2 +- .../api/xpack.rollup.rollup_search.json | 26 + .../test/rollup/rollup_search.yml | 658 ++++++++++++++++++ 21 files changed, 2669 insertions(+), 967 deletions(-) create mode 100644 plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupJobIdentifierUtils.java create mode 100644 plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupJobIdentifierUtilTests.java create mode 100644 plugin/src/test/resources/rest-api-spec/api/xpack.rollup.rollup_search.json create mode 100644 plugin/src/test/resources/rest-api-spec/test/rollup/rollup_search.yml diff --git a/docs/en/rest-api/rollup/rollup-search.asciidoc b/docs/en/rest-api/rollup/rollup-search.asciidoc index e10e2c4e35d..5d033759e57 100644 --- a/docs/en/rest-api/rollup/rollup-search.asciidoc +++ b/docs/en/rest-api/rollup/rollup-search.asciidoc @@ -103,6 +103,7 @@ aggregation has been used on the `temperature` field, yielding the following res { "took" : 102, "timed_out" : false, + "terminated_early" : false, "_shards" : ... , "hits" : { "total" : 0, @@ -142,7 +143,7 @@ GET sensor_rollup/_rollup_search -------------------------------------------------- // CONSOLE // TEST[continued] -// TEST[catch:bad_request] +// TEST[catch:/illegal_argument_exception/] [source,js] ---- @@ -151,12 +152,12 @@ GET sensor_rollup/_rollup_search "root_cause" : [ { "type" : "illegal_argument_exception", - "reason" : "There is not a [avg] agg with name [temperature] configured in selected rollup indices, cannot translate aggregation.", + "reason" : "There is not a rollup job that has a [avg] agg with name [avg_temperature] which also satisfies all requirements of query.", "stack_trace": ... } ], "type" : "illegal_argument_exception", - "reason" : "There is not a [avg] agg with name [temperature] configured in selected rollup indices, cannot translate aggregation.", + "reason" : "There is not a rollup job that has a [avg] agg with name [avg_temperature] which also satisfies all requirements of query.", "stack_trace": ... }, "status": 400 @@ -204,6 +205,7 @@ The response to the above query will look as expected, despite spanning rollup a { "took" : 102, "timed_out" : false, + "terminated_early" : false, "_shards" : ... 
, "hits" : { "total" : 0, diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/RollupField.java b/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/RollupField.java index 663f6d500cb..9efca1da16c 100644 --- a/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/RollupField.java +++ b/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/RollupField.java @@ -20,6 +20,7 @@ public class RollupField { public static final String TIMESTAMP = "timestamp"; public static final String FILTER = "filter"; public static final String NAME = "rollup"; + public static final String AGG = "agg"; /** * Format to the appropriate Rollup field name convention @@ -59,6 +60,15 @@ public class RollupField { return field + "." + RollupField.COUNT_FIELD; } + /** + * Format to the appropriate Rollup convention for agg names that + * might conflict with empty buckets. `value` is appended to agg name. + * E.g. used for averages + */ + public static String formatValueAggName(String field) { + return field + "." + RollupField.VALUE; + } + /** * Format into the convention for computed field lookups */ diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupJobCaps.java b/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupJobCaps.java index 48f60749aec..f37a351c013 100644 --- a/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupJobCaps.java +++ b/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupJobCaps.java @@ -9,8 +9,13 @@ import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.xpack.core.rollup.RollupField; import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig; import java.io.IOException; @@ -20,7 +25,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.BiConsumer; +/** + * Represents the Rollup capabilities for a specific job on a single rollup index + */ public class RollupJobCaps implements Writeable, ToXContentObject { private static ParseField JOB_ID = new ParseField("job_id"); private static ParseField ROLLUP_INDEX = new ParseField("rollup_index"); diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/DateHistoGroupConfig.java b/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/DateHistoGroupConfig.java index 41bb6c024ea..31722f09784 100644 --- a/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/DateHistoGroupConfig.java +++ b/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/DateHistoGroupConfig.java @@ -264,11 +264,14 @@ public class DateHistoGroupConfig implements Writeable, ToXContentFragment { public DateHistoGroupConfig build() { if (field == null || field.isEmpty()) { - throw new IllegalArgumentException("Parameter [" + FIELD + "] is mandatory."); + throw new IllegalArgumentException("Parameter 
[" + FIELD.getPreferredName() + "] is mandatory."); } if (timeZone == null) { timeZone = DateTimeZone.UTC; } + if (interval == null) { + throw new IllegalArgumentException("Parameter [" + INTERVAL.getPreferredName() + "] is mandatory."); + } // validate interval createRounding(interval.toString(), timeZone, INTERVAL.getPreferredName()); if (delay != null) { diff --git a/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java b/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java index 50c664a8658..dbc1f9767af 100644 --- a/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java +++ b/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java @@ -29,6 +29,10 @@ import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; import org.elasticsearch.script.ScriptService; import org.elasticsearch.persistent.PersistentTasksExecutor; +import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.min.MinAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder; import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; @@ -96,6 +100,9 @@ public class Rollup extends Plugin implements ActionPlugin, PersistentTaskPlugin public static final Set HEADER_FILTERS = new HashSet<>(Arrays.asList("es-security-runas-user", "_xpack_security_authentication")); + public static final List SUPPORTED_METRICS = Arrays.asList(MaxAggregationBuilder.NAME, MinAggregationBuilder.NAME, + SumAggregationBuilder.NAME, AvgAggregationBuilder.NAME); + private final Settings settings; private final boolean enabled; diff --git a/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupJobIdentifierUtils.java b/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupJobIdentifierUtils.java new file mode 100644 index 00000000000..403f8fb3c72 --- /dev/null +++ b/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupJobIdentifierUtils.java @@ -0,0 +1,333 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.rollup; + +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder; +import org.elasticsearch.xpack.core.rollup.RollupField; +import org.elasticsearch.xpack.core.rollup.action.RollupJobCaps; +import org.elasticsearch.xpack.core.rollup.job.DateHistoGroupConfig; +import org.joda.time.DateTimeZone; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * This class contains utilities to identify which jobs are the "best" for a given aggregation tree. 
+ * It allows the caller to pass in a set of possible rollup job capabilities and get in return + * a smaller (but not guaranteed minimal) set of valid jobs that can be searched. + */ +public class RollupJobIdentifierUtils { + + private static final Comparator COMPARATOR = RollupJobIdentifierUtils.getComparator(); + + /** + * Given the aggregation tree and a list of available job capabilities, this method will return a set + * of the "best" jobs that should be searched. + * + * It does this by recursively descending through the aggregation tree and independently pruning the + * list of valid job caps in each branch. When a leaf node is reached in the branch, the remaining + * jobs are sorted by "best'ness" (see {@link #getComparator()} for the implementation) + * and added to a global set of "best jobs". + * + * Once all branches have been evaluated, the final set is returned to the calling code. + * + * Job "best'ness" is, briefly, the job(s) that have + * - The larger compatible date interval + * - Fewer and larger interval histograms + * - Fewer terms groups + * + * Note: the final set of "best" jobs is not guaranteed to be minimal, there may be redundant effort + * due to independent branches choosing jobs that are subsets of other branches. + * + * @param source The source aggregation that we are trying to find jobs for + * @param jobCaps The total set of available job caps on the index/indices + * @return A set of the "best" jobs out of the total job caps + */ + public static Set findBestJobs(AggregationBuilder source, Set jobCaps) { + // TODO there is an opportunity to optimize the returned caps to find the minimal set of required caps. + // For example, one leaf may have equally good jobs [A,B], while another leaf finds only job [B] to be best. + // If job A is a subset of job B, we could simply search job B in isolation and get the same results + // + // We can't do that today, because we don't (yet) have way of determining if one job is a sub/super set of another + Set bestCaps = new HashSet<>(); + doFindBestJobs(source, new ArrayList<>(jobCaps), bestCaps); + return bestCaps; + } + + private static void doFindBestJobs(AggregationBuilder source, List jobCaps, Set bestCaps) { + if (source.getWriteableName().equals(DateHistogramAggregationBuilder.NAME)) { + checkDateHisto((DateHistogramAggregationBuilder) source, jobCaps, bestCaps); + } else if (source.getWriteableName().equals(HistogramAggregationBuilder.NAME)) { + checkHisto((HistogramAggregationBuilder) source, jobCaps, bestCaps); + } else if (Rollup.SUPPORTED_METRICS.contains(source.getWriteableName())) { + checkVSLeaf((ValuesSourceAggregationBuilder.LeafOnly) source, jobCaps, bestCaps); + } else if (source.getWriteableName().equals(TermsAggregationBuilder.NAME)) { + checkTerms((TermsAggregationBuilder)source, jobCaps, bestCaps); + } else { + throw new IllegalArgumentException("Unable to translate aggregation tree into Rollup. 
Aggregation [" + + source.getName() + "] is of type [" + source.getClass().getSimpleName() + "] which is " + + "currently unsupported."); + } + } + + /** + * Find the set of date_histo's with the largest granularity interval + */ + private static void checkDateHisto(DateHistogramAggregationBuilder source, List jobCaps, + Set bestCaps) { + ArrayList localCaps = new ArrayList<>(); + for (RollupJobCaps cap : jobCaps) { + RollupJobCaps.RollupFieldCaps fieldCaps = cap.getFieldCaps().get(source.field()); + if (fieldCaps != null) { + for (Map agg : fieldCaps.getAggs()) { + if (agg.get(RollupField.AGG).equals(DateHistogramAggregationBuilder.NAME)) { + TimeValue interval = TimeValue.parseTimeValue((String)agg.get(RollupField.INTERVAL), "date_histogram.interval"); + String thisTimezone = (String)agg.get(DateHistoGroupConfig.TIME_ZONE.getPreferredName()); + String sourceTimeZone = source.timeZone() == null ? DateTimeZone.UTC.toString() : source.timeZone().toString(); + + // Ensure we are working on the same timezone + if (thisTimezone.equalsIgnoreCase(sourceTimeZone) == false) { + continue; + } + if (source.dateHistogramInterval() != null) { + TimeValue sourceInterval = TimeValue.parseTimeValue(source.dateHistogramInterval().toString(), + "source.date_histogram.interval"); + //TODO should be divisor of interval + if (interval.compareTo(sourceInterval) <= 0) { + localCaps.add(cap); + } + } else { + if (interval.getMillis() <= source.interval()) { + localCaps.add(cap); + } + } + break; + } + } + } + } + + if (localCaps.isEmpty()) { + throw new IllegalArgumentException("There is not a rollup job that has a [" + source.getWriteableName() + "] agg on field [" + + source.field() + "] which also satisfies all requirements of query."); + } + + // We are a leaf, save our best caps + if (source.getSubAggregations().size() == 0) { + bestCaps.add(getTopEqualCaps(localCaps)); + } else { + // otherwise keep working down the tree + source.getSubAggregations().forEach(sub -> doFindBestJobs(sub, localCaps, bestCaps)); + } + } + + /** + * Find the set of histo's with the largest interval + */ + private static void checkHisto(HistogramAggregationBuilder source, List jobCaps, Set bestCaps) { + ArrayList localCaps = new ArrayList<>(); + for (RollupJobCaps cap : jobCaps) { + RollupJobCaps.RollupFieldCaps fieldCaps = cap.getFieldCaps().get(source.field()); + if (fieldCaps != null) { + for (Map agg : fieldCaps.getAggs()) { + if (agg.get(RollupField.AGG).equals(HistogramAggregationBuilder.NAME)) { + Long interval = (long)agg.get(RollupField.INTERVAL); + // TODO should be divisor of interval + if (interval <= source.interval()) { + localCaps.add(cap); + } + break; + } + } + } + } + + if (localCaps.isEmpty()) { + throw new IllegalArgumentException("There is not a rollup job that has a [" + source.getWriteableName() + "] agg on field [" + + source.field() + "] which also satisfies all requirements of query."); + } + + // We are a leaf, save our best caps + if (source.getSubAggregations().size() == 0) { + bestCaps.add(getTopEqualCaps(localCaps)); + } else { + // otherwise keep working down the tree + source.getSubAggregations().forEach(sub -> doFindBestJobs(sub, localCaps, bestCaps)); + } + } + + /** + * Ensure that the terms aggregation is supported by one or more job caps. There is no notion of "best" + * caps for terms, it is either supported or not. 
+ */ + private static void checkTerms(TermsAggregationBuilder source, List jobCaps, Set bestCaps) { + ArrayList localCaps = new ArrayList<>(); + for (RollupJobCaps cap : jobCaps) { + RollupJobCaps.RollupFieldCaps fieldCaps = cap.getFieldCaps().get(source.field()); + if (fieldCaps != null) { + for (Map agg : fieldCaps.getAggs()) { + if (agg.get(RollupField.AGG).equals(TermsAggregationBuilder.NAME)) { + localCaps.add(cap); + break; + } + } + } + } + + if (localCaps.isEmpty()) { + throw new IllegalArgumentException("There is not a rollup job that has a [" + source.getWriteableName() + "] agg on field [" + + source.field() + "] which also satisfies all requirements of query."); + } + + // We are a leaf, save our best caps + if (source.getSubAggregations().size() == 0) { + bestCaps.add(getTopEqualCaps(localCaps)); + } else { + // otherwise keep working down the tree + source.getSubAggregations().forEach(sub -> doFindBestJobs(sub, localCaps, bestCaps)); + } + } + + /** + * Ensure that the metrics are supported by one or more job caps. There is no notion of "best" + * caps for metrics, it is either supported or not. + */ + private static void checkVSLeaf(ValuesSourceAggregationBuilder.LeafOnly source, List jobCaps, + Set bestCaps) { + ArrayList localCaps = new ArrayList<>(); + for (RollupJobCaps cap : jobCaps) { + RollupJobCaps.RollupFieldCaps fieldCaps = cap.getFieldCaps().get(source.field()); + if (fieldCaps != null) { + for (Map agg : fieldCaps.getAggs()) { + if (agg.get(RollupField.AGG).equals(source.getWriteableName())) { + localCaps.add(cap); + break; + } + } + } + } + + if (localCaps.isEmpty()) { + throw new IllegalArgumentException("There is not a rollup job that has a [" + source.getWriteableName() + "] agg with name [" + + source.getName() + "] which also satisfies all requirements of query."); + } + + // Metrics are always leaves so go ahead and add to best caps + bestCaps.add(getTopEqualCaps(localCaps)); + } + + private static RollupJobCaps getTopEqualCaps(List caps) { + assert caps.isEmpty() == false; + caps.sort(COMPARATOR); + return caps.get(0); + } + + private static Comparator getComparator() { + return (o1, o2) -> { + if (o1 == null) { + throw new NullPointerException("RollupJobCap [o1] cannot be null"); + } + if (o2 == null) { + throw new NullPointerException("RollupJobCap [o2] cannot be null"); + } + + if (o1.equals(o2)) { + return 0; + } + + TimeValue thisTime = null; + TimeValue thatTime = null; + + // histogram intervals are averaged and compared, with the idea that + // a larger average == better, because it will generate fewer documents + float thisHistoWeights = 0; + float thatHistoWeights = 0; + long counter = 0; + + // Similarly, fewer terms groups will generate fewer documents, so + // we count the number of terms groups + long thisTermsWeights = 0; + long thatTermsWeights = 0; + + // Iterate over the first Caps and collect the various stats + for (RollupJobCaps.RollupFieldCaps fieldCaps : o1.getFieldCaps().values()) { + for (Map agg : fieldCaps.getAggs()) { + if (agg.get(RollupField.AGG).equals(DateHistogramAggregationBuilder.NAME)) { + thisTime = TimeValue.parseTimeValue((String) agg.get(RollupField.INTERVAL), RollupField.INTERVAL); + } else if (agg.get(RollupField.AGG).equals(HistogramAggregationBuilder.NAME)) { + thisHistoWeights += (long) agg.get(RollupField.INTERVAL); + counter += 1; + } else if (agg.get(RollupField.AGG).equals(TermsAggregationBuilder.NAME)) { + thisTermsWeights += 1; + } + } + } + thisHistoWeights = counter == 0 ? 
0 : thisHistoWeights / counter; + + // Iterate over the second Cap and collect the same stats + counter = 0; + for (RollupJobCaps.RollupFieldCaps fieldCaps : o2.getFieldCaps().values()) { + for (Map agg : fieldCaps.getAggs()) { + if (agg.get(RollupField.AGG).equals(DateHistogramAggregationBuilder.NAME)) { + thatTime = TimeValue.parseTimeValue((String) agg.get(RollupField.INTERVAL), RollupField.INTERVAL); + } else if (agg.get(RollupField.AGG).equals(HistogramAggregationBuilder.NAME)) { + thatHistoWeights += (long) agg.get(RollupField.INTERVAL); + counter += 1; + } else if (agg.get(RollupField.AGG).equals(TermsAggregationBuilder.NAME)) { + thatTermsWeights += 1; + } + } + } + thatHistoWeights = counter == 0 ? 0 : thatHistoWeights / counter; + + // DateHistos are mandatory so these should always be present no matter what + assert thisTime != null; + assert thatTime != null; + + // Compare on date interval first + // The "smaller" job is the one with the larger interval + int timeCompare = thisTime.compareTo(thatTime); + if (timeCompare != 0) { + return -timeCompare; + } + + // If dates are the same, the "smaller" job is the one with a larger histo avg histo weight. + // Not bullet proof, but heuristically we prefer: + // - one job with interval 100 (avg 100) over one job with interval 10 (avg 10) + // - one job with interval 100 (avg 100) over one job with ten histos @ interval 10 (avg 10) + // because in both cases the larger intervals likely generate fewer documents + // + // The exception is if one of jobs had no histo (avg 0) then we prefer that + int histoCompare = Float.compare(thisHistoWeights, thatHistoWeights); + if (histoCompare != 0) { + if (thisHistoWeights == 0) { + return -1; + } else if (thatHistoWeights == 0) { + return 1; + } + return -histoCompare; + } + + // If dates and histo are same, the "smaller" job is the one with fewer terms aggs since + // hopefully will generate fewer docs + return Long.compare(thisTermsWeights, thatTermsWeights); + + // Ignoring metrics for now, since the "best job" resolution doesn't take those into account + // and we rely on the msearch functionality to merge away and duplicates + // Could potentially optimize there in the future to choose jobs with more metric + // coverage + }; + } +} diff --git a/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupRequestTranslator.java b/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupRequestTranslator.java index 16b2127fc45..816618c8f0b 100644 --- a/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupRequestTranslator.java +++ b/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupRequestTranslator.java @@ -10,7 +10,6 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.search.aggregations.AggregationBuilder; @@ -21,13 +20,12 @@ import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder; import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder; import org.elasticsearch.xpack.core.rollup.RollupField; -import 
org.elasticsearch.xpack.core.rollup.action.RollupJobCaps; import org.elasticsearch.xpack.core.rollup.job.DateHistoGroupConfig; +import org.joda.time.DateTimeZone; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.function.Supplier; @@ -52,7 +50,7 @@ import java.util.function.Supplier; * } * * - * The only publicly "consumable" API is {@link #translateAggregation(AggregationBuilder, List, NamedWriteableRegistry, List)}. + * The only publicly "consumable" API is {@link #translateAggregation(AggregationBuilder, List, NamedWriteableRegistry)}. */ public class RollupRequestTranslator { @@ -133,22 +131,18 @@ public class RollupRequestTranslator { */ public static List translateAggregation(AggregationBuilder source, List filterConditions, - NamedWriteableRegistry registry, - List jobCaps) { + NamedWriteableRegistry registry) { + if (source.getWriteableName().equals(DateHistogramAggregationBuilder.NAME)) { - validateAgg(((DateHistogramAggregationBuilder)source).field(), jobCaps, source.getWriteableName()); - return translateDateHistogram((DateHistogramAggregationBuilder) source, filterConditions, registry, jobCaps); + return translateDateHistogram((DateHistogramAggregationBuilder) source, filterConditions, registry); } else if (source.getWriteableName().equals(HistogramAggregationBuilder.NAME)) { - validateAgg(((HistogramAggregationBuilder)source).field(), jobCaps, source.getWriteableName()); - return translateHistogram((HistogramAggregationBuilder) source, filterConditions, registry, jobCaps); - } else if (source instanceof ValuesSourceAggregationBuilder.LeafOnly) { - validateAgg(((ValuesSourceAggregationBuilder.LeafOnly)source).field(), jobCaps, source.getWriteableName()); + return translateHistogram((HistogramAggregationBuilder) source, filterConditions, registry); + } else if (Rollup.SUPPORTED_METRICS.contains(source.getWriteableName())) { return translateVSLeaf((ValuesSourceAggregationBuilder.LeafOnly)source, registry); } else if (source.getWriteableName().equals(TermsAggregationBuilder.NAME)) { - validateAgg(((TermsAggregationBuilder)source).field(), jobCaps, source.getWriteableName()); - return translateTerms((TermsAggregationBuilder)source, filterConditions, registry, jobCaps); + return translateTerms((TermsAggregationBuilder)source, filterConditions, registry); } else { - throw new RuntimeException("Unable to translate aggregation tree into Rollup. Aggregation [" + throw new IllegalArgumentException("Unable to translate aggregation tree into Rollup. 
Aggregation [" + source.getName() + "] is of type [" + source.getClass().getSimpleName() + "] which is " + "currently unsupported."); } @@ -227,10 +221,9 @@ public class RollupRequestTranslator { */ private static List translateDateHistogram(DateHistogramAggregationBuilder source, List filterConditions, - NamedWriteableRegistry registry, - List jobCaps) { + NamedWriteableRegistry registry) { - return translateVSAggBuilder(source, filterConditions, registry, jobCaps, () -> { + return translateVSAggBuilder(source, filterConditions, registry, () -> { DateHistogramAggregationBuilder rolledDateHisto = new DateHistogramAggregationBuilder(source.getName()); @@ -240,46 +233,9 @@ public class RollupRequestTranslator { rolledDateHisto.interval(source.interval()); } - TimeValue bestInterval = null; - String bestJob = null; - String bestTZ = null; - for (RollupJobCaps cap : jobCaps) { - RollupJobCaps.RollupFieldCaps fieldCaps = cap.getFieldCaps().get(source.field()); - if (fieldCaps != null) { - for (Map agg : fieldCaps.getAggs()) { - if (agg.get("agg").equals(DateHistogramAggregationBuilder.NAME)) { - TimeValue interval = TimeValue.parseTimeValue((String)agg.get(RollupField.INTERVAL), "date_histogram.interval"); - if (bestInterval == null || interval.compareTo(bestInterval) < 0) { - bestInterval = interval; - bestJob = cap.getJobID(); - bestTZ = (String)agg.get(DateHistoGroupConfig.TIME_ZONE.getPreferredName()); - } - break; - } - } - } - } - - // Even though rollups only use TimeValue, the user can still pass millis as the interval in a query, so we - // need to check to see what we're dealing with here. - if (source.dateHistogramInterval() != null) { - TimeValue sourceInterval = TimeValue.parseTimeValue(source.dateHistogramInterval().toString(), - "source.date_histogram.interval"); - if (bestInterval == null || bestInterval.compareTo(sourceInterval) > 0) { - throw new IllegalArgumentException("Could not find a rolled date_histogram configuration that satisfies the interval [" - + source.dateHistogramInterval() + "]"); - } - } else { - if (bestInterval == null || bestInterval.getMillis() > source.interval()) { - throw new IllegalArgumentException("Could not find a rolled date_histogram configuration that satisfies the interval [" - + source.interval() + "]"); - } - } - - filterConditions.add(new TermQueryBuilder(RollupField.formatFieldName(source, RollupField.INTERVAL), bestInterval.toString())); + String timezone = source.timeZone() == null ? DateTimeZone.UTC.toString() : source.timeZone().toString(); filterConditions.add(new TermQueryBuilder(RollupField.formatFieldName(source, - DateHistoGroupConfig.TIME_ZONE.getPreferredName()), bestTZ)); - filterConditions.add(new TermQueryBuilder(RollupField.formatMetaField(RollupField.ID.getPreferredName()), bestJob)); + DateHistoGroupConfig.TIME_ZONE.getPreferredName()), timezone)); rolledDateHisto.offset(source.offset()); if (source.extendedBounds() != null) { @@ -299,43 +255,17 @@ public class RollupRequestTranslator { * Notably, it adds a Sum metric to calculate the doc_count in each bucket. 
* * Conventions are identical to a date_histogram (excepting date-specific details), so see - * {@link #translateDateHistogram(DateHistogramAggregationBuilder, List, NamedWriteableRegistry, List)} for + * {@link #translateDateHistogram(DateHistogramAggregationBuilder, List, NamedWriteableRegistry)} for * a complete list of conventions, examples, etc */ private static List translateHistogram(HistogramAggregationBuilder source, List filterConditions, - NamedWriteableRegistry registry, - List jobCaps) { + NamedWriteableRegistry registry) { - return translateVSAggBuilder(source, filterConditions, registry, jobCaps, () -> { + return translateVSAggBuilder(source, filterConditions, registry, () -> { HistogramAggregationBuilder rolledHisto = new HistogramAggregationBuilder(source.getName()); - long bestInterval = Long.MAX_VALUE; - String bestJob = null; - for (RollupJobCaps cap : jobCaps) { - RollupJobCaps.RollupFieldCaps fieldCaps = cap.getFieldCaps().get(source.field()); - if (fieldCaps != null) { - for (Map agg : fieldCaps.getAggs()) { - if (agg.get("agg").equals(HistogramAggregationBuilder.NAME)) { - if ((long)agg.get(RollupField.INTERVAL) < bestInterval) { - bestInterval = (long)agg.get(RollupField.INTERVAL); - bestJob = cap.getJobID(); - } - break; - } - } - } - } - - if (bestInterval == Long.MAX_VALUE || bestInterval > source.interval()) { - throw new IllegalArgumentException("Could not find a rolled histogram configuration that satisfies the interval [" - + source.interval() + "]"); - } - - filterConditions.add(new TermQueryBuilder(RollupField.formatFieldName(source, RollupField.INTERVAL), bestInterval)); - filterConditions.add(new TermQueryBuilder(RollupField.formatMetaField(RollupField.ID.getPreferredName()), bestJob)); - rolledHisto.interval(source.interval()); rolledHisto.offset(source.offset()); if (Double.isFinite(source.minBound()) && Double.isFinite(source.maxBound())) { @@ -414,10 +344,9 @@ public class RollupRequestTranslator { */ private static List translateTerms(TermsAggregationBuilder source, List filterConditions, - NamedWriteableRegistry registry, - List jobCaps) { + NamedWriteableRegistry registry) { - return translateVSAggBuilder(source, filterConditions, registry, jobCaps, () -> { + return translateVSAggBuilder(source, filterConditions, registry, () -> { TermsAggregationBuilder rolledTerms = new TermsAggregationBuilder(source.getName(), source.valueType()); rolledTerms.field(RollupField.formatFieldName(source, RollupField.VALUE)); @@ -435,7 +364,7 @@ public class RollupRequestTranslator { rolledTerms.shardSize(source.shardSize()); } rolledTerms.showTermDocCountError(source.showTermDocCountError()); - //rolledTerms.size(termsAgg.size()); // TODO fix in core + rolledTerms.size(source.size()); return rolledTerms; }); } @@ -458,7 +387,7 @@ public class RollupRequestTranslator { */ private static List translateVSAggBuilder(ValuesSourceAggregationBuilder source, List filterConditions, - NamedWriteableRegistry registry, List jobCaps, Supplier factory) { + NamedWriteableRegistry registry, Supplier factory) { T rolled = factory.get(); @@ -470,7 +399,7 @@ public class RollupRequestTranslator { // Translate all subaggs and add to the newly translated agg // NOTE: using for loop instead of stream because compiler explodes with a bug :/ for (AggregationBuilder subAgg : source.getSubAggregations()) { - List translated = translateAggregation(subAgg, filterConditions, registry, jobCaps); + List translated = translateAggregation(subAgg, filterConditions, registry); for 
(AggregationBuilder t : translated) { rolled.subAggregation(t); } @@ -505,7 +434,8 @@ public class RollupRequestTranslator { * * However, for `avg` metrics (and potentially others in the future), the agg is translated into * a sum + sum aggs; one for count and one for sum. When unrolling these will be combined back into - * a single avg. E.g. for an `avg` agg: + * a single avg. Note that we also have to rename the avg agg name to distinguish it from empty + * buckets. E.g. for an `avg` agg: * *
{@code
      * {
@@ -518,7 +448,7 @@ public class RollupRequestTranslator {
      * 
{@code
      * [
      *   {
-     *    "the_avg": {
+     *    "the_avg.value": {
      *      "sum" : { "field" : "some_field.avg.value" }}
      *   },
      *   {
@@ -545,6 +475,7 @@ public class RollupRequestTranslator {
      * IF the agg is an AvgAgg, the following additional conventions are added:
      *