diff --git a/docs/en/rest-api/rollup/rollup-search.asciidoc b/docs/en/rest-api/rollup/rollup-search.asciidoc
index e10e2c4e35d..5d033759e57 100644
--- a/docs/en/rest-api/rollup/rollup-search.asciidoc
+++ b/docs/en/rest-api/rollup/rollup-search.asciidoc
@@ -103,6 +103,7 @@ aggregation has been used on the `temperature` field, yielding the following res
 {
   "took" : 102,
   "timed_out" : false,
+  "terminated_early" : false,
   "_shards" : ... ,
   "hits" : {
     "total" : 0,
@@ -142,7 +143,7 @@ GET sensor_rollup/_rollup_search
 --------------------------------------------------
 // CONSOLE
 // TEST[continued]
-// TEST[catch:bad_request]
+// TEST[catch:/illegal_argument_exception/]
 
 [source,js]
 ----
@@ -151,12 +152,12 @@ GET sensor_rollup/_rollup_search
     "root_cause" : [
         {
             "type" : "illegal_argument_exception",
-            "reason" : "There is not a [avg] agg with name [temperature] configured in selected rollup indices, cannot translate aggregation.",
+            "reason" : "There is not a rollup job that has a [avg] agg with name [avg_temperature] which also satisfies all requirements of query.",
             "stack_trace": ...
         }
     ],
     "type" : "illegal_argument_exception",
-    "reason" : "There is not a [avg] agg with name [temperature] configured in selected rollup indices, cannot translate aggregation.",
+    "reason" : "There is not a rollup job that has a [avg] agg with name [avg_temperature] which also satisfies all requirements of query.",
     "stack_trace": ...
 },
 "status": 400
@@ -204,6 +205,7 @@ The response to the above query will look as expected, despite spanning rollup a
 {
   "took" : 102,
   "timed_out" : false,
+  "terminated_early" : false,
   "_shards" : ... ,
   "hits" : {
     "total" : 0,
diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/RollupField.java b/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/RollupField.java
index 663f6d500cb..9efca1da16c 100644
--- a/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/RollupField.java
+++ b/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/RollupField.java
@@ -20,6 +20,7 @@ public class RollupField {
     public static final String TIMESTAMP = "timestamp";
     public static final String FILTER = "filter";
     public static final String NAME = "rollup";
+    public static final String AGG = "agg";
 
     /**
      * Format to the appropriate Rollup field name convention
@@ -59,6 +60,15 @@ public class RollupField {
         return field + "." + RollupField.COUNT_FIELD;
     }
 
+    /**
+     * Format to the appropriate Rollup convention for agg names that
+     * might conflict with empty buckets. `value` is appended to agg name.
+     * E.g. used for averages
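+     * (an agg named "the_avg" is stored as "the_avg.value", for example)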
+     */
+    public static String formatValueAggName(String field) {
+        return field + "." + RollupField.VALUE;
+    }
+
     /**
      * Format into the convention for computed field lookups
      */
diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupJobCaps.java b/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupJobCaps.java
index 48f60749aec..f37a351c013 100644
--- a/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupJobCaps.java
+++ b/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupJobCaps.java
@@ -9,8 +9,13 @@ import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
+import org.elasticsearch.xpack.core.rollup.RollupField;
 import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig;
 
 import java.io.IOException;
@@ -20,7 +25,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.BiConsumer;
 
+/**
+ * Represents the Rollup capabilities for a specific job on a single rollup index
+ */
 public class RollupJobCaps implements Writeable, ToXContentObject {
     private static ParseField JOB_ID = new ParseField("job_id");
     private static ParseField ROLLUP_INDEX = new ParseField("rollup_index");
diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/DateHistoGroupConfig.java b/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/DateHistoGroupConfig.java
index 41bb6c024ea..31722f09784 100644
--- a/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/DateHistoGroupConfig.java
+++ b/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/DateHistoGroupConfig.java
@@ -264,11 +264,14 @@ public class DateHistoGroupConfig implements Writeable, ToXContentFragment {
 
         public DateHistoGroupConfig build() {
             if (field == null || field.isEmpty()) {
-                throw new IllegalArgumentException("Parameter [" + FIELD + "] is mandatory.");
+                throw new IllegalArgumentException("Parameter [" + FIELD.getPreferredName() + "] is mandatory.");
             }
             if (timeZone == null) {
                 timeZone = DateTimeZone.UTC;
             }
+            if (interval == null) {
+                throw new IllegalArgumentException("Parameter [" + INTERVAL.getPreferredName() + "] is mandatory.");
+            }
             // validate interval
             createRounding(interval.toString(), timeZone, INTERVAL.getPreferredName());
             if (delay != null) {
diff --git a/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java b/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java
index 50c664a8658..dbc1f9767af 100644
--- a/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java
+++ b/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java
@@ -29,6 +29,10 @@ import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestHandler;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.persistent.PersistentTasksExecutor;
+import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder;
+import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
+import org.elasticsearch.search.aggregations.metrics.min.MinAggregationBuilder;
+import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder;
 import org.elasticsearch.threadpool.ExecutorBuilder;
 import org.elasticsearch.threadpool.FixedExecutorBuilder;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -96,6 +100,9 @@ public class Rollup extends Plugin implements ActionPlugin, PersistentTaskPlugin
     public static final Set<String> HEADER_FILTERS =
             new HashSet<>(Arrays.asList("es-security-runas-user", "_xpack_security_authentication"));
 
+    public static final List<String> SUPPORTED_METRICS = Arrays.asList(MaxAggregationBuilder.NAME, MinAggregationBuilder.NAME,
+            SumAggregationBuilder.NAME, AvgAggregationBuilder.NAME);
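+    // Note: avg is in this list, but at search time it is rewritten into its sum and count
+    // parts (see RollupRequestTranslator#translateVSLeaf) and recombined when unrolling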
+
     private final Settings settings;
     private final boolean enabled;
 
diff --git a/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupJobIdentifierUtils.java b/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupJobIdentifierUtils.java
new file mode 100644
index 00000000000..403f8fb3c72
--- /dev/null
+++ b/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupJobIdentifierUtils.java
@@ -0,0 +1,333 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.rollup;
+
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.search.aggregations.AggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
+import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
+import org.elasticsearch.xpack.core.rollup.RollupField;
+import org.elasticsearch.xpack.core.rollup.action.RollupJobCaps;
+import org.elasticsearch.xpack.core.rollup.job.DateHistoGroupConfig;
+import org.joda.time.DateTimeZone;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class contains utilities to identify which jobs are the "best" for a given aggregation tree.
+ * It allows the caller to pass in a set of possible rollup job capabilities and get in return
+ * a smaller (but not guaranteed minimal) set of valid jobs that can be searched.
+ */
+public class RollupJobIdentifierUtils {
+
+    private static final Comparator<RollupJobCaps> COMPARATOR = RollupJobIdentifierUtils.getComparator();
+
+    /**
+     * Given the aggregation tree and a list of available job capabilities, this method will return a set
+     * of the "best" jobs that should be searched.
+     *
+     * It does this by recursively descending through the aggregation tree and independently pruning the
+     * list of valid job caps in each branch. When a leaf node is reached in the branch, the remaining
+     * jobs are sorted by "best'ness" (see {@link #getComparator()} for the implementation)
+     * and added to a global set of "best jobs".
+     *
+     * Once all branches have been evaluated, the final set is returned to the calling code.
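+     *
+     * An illustrative (hypothetical) invocation, given an {@code AggregationBuilder} from the
+     * search request and the full set of caps for the targeted rollup indices:
+     * <pre>{@code
+     * Set<RollupJobCaps> bestJobs = RollupJobIdentifierUtils.findBestJobs(agg, allJobCaps);
+     * }</pre>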
+     *
+     * Job "best'ness" is, briefly, the job(s) that have
+     *  - The larger compatible date interval
+     *  - Fewer and larger interval histograms
+     *  - Fewer terms groups
+     *
+     * Note: the final set of "best" jobs is not guaranteed to be minimal, there may be redundant effort
+     * due to independent branches choosing jobs that are subsets of other branches.
+     *
+     * @param source The source aggregation that we are trying to find jobs for
+     * @param jobCaps The total set of available job caps on the index/indices
+     * @return A set of the "best" jobs out of the total job caps
+     */
+    public static Set<RollupJobCaps> findBestJobs(AggregationBuilder source, Set<RollupJobCaps> jobCaps) {
+        // TODO there is an opportunity to optimize the returned caps to find the minimal set of required caps.
+        // For example, one leaf may have equally good jobs [A,B], while another leaf finds only job [B] to be best.
+        // If job A is a subset of job B, we could simply search job B in isolation and get the same results.
+        //
+        // We can't do that today, because we don't (yet) have a way of determining if one job is a sub/super set of another.
+        Set<RollupJobCaps> bestCaps = new HashSet<>();
+        doFindBestJobs(source, new ArrayList<>(jobCaps), bestCaps);
+        return bestCaps;
+    }
+
+    private static void doFindBestJobs(AggregationBuilder source, List<RollupJobCaps> jobCaps, Set<RollupJobCaps> bestCaps) {
+        if (source.getWriteableName().equals(DateHistogramAggregationBuilder.NAME)) {
+            checkDateHisto((DateHistogramAggregationBuilder) source, jobCaps, bestCaps);
+        } else if (source.getWriteableName().equals(HistogramAggregationBuilder.NAME)) {
+            checkHisto((HistogramAggregationBuilder) source, jobCaps, bestCaps);
+        } else if (Rollup.SUPPORTED_METRICS.contains(source.getWriteableName())) {
+            checkVSLeaf((ValuesSourceAggregationBuilder.LeafOnly) source, jobCaps, bestCaps);
+        } else if (source.getWriteableName().equals(TermsAggregationBuilder.NAME)) {
+            checkTerms((TermsAggregationBuilder) source, jobCaps, bestCaps);
+        } else {
+            throw new IllegalArgumentException("Unable to translate aggregation tree into Rollup. Aggregation ["
+                    + source.getName() + "] is of type [" + source.getClass().getSimpleName() + "] which is "
+                    + "currently unsupported.");
+        }
+    }
+
+    /**
+     * Find the set of date_histo's with the largest granularity interval
+     */
+    private static void checkDateHisto(DateHistogramAggregationBuilder source, List<RollupJobCaps> jobCaps,
+                                       Set<RollupJobCaps> bestCaps) {
+        ArrayList<RollupJobCaps> localCaps = new ArrayList<>();
+        for (RollupJobCaps cap : jobCaps) {
+            RollupJobCaps.RollupFieldCaps fieldCaps = cap.getFieldCaps().get(source.field());
+            if (fieldCaps != null) {
+                for (Map<String, Object> agg : fieldCaps.getAggs()) {
+                    if (agg.get(RollupField.AGG).equals(DateHistogramAggregationBuilder.NAME)) {
+                        TimeValue interval = TimeValue.parseTimeValue((String) agg.get(RollupField.INTERVAL), "date_histogram.interval");
+                        String thisTimezone = (String) agg.get(DateHistoGroupConfig.TIME_ZONE.getPreferredName());
+                        String sourceTimeZone = source.timeZone() == null ? DateTimeZone.UTC.toString() : source.timeZone().toString();
+
+                        // Ensure we are working on the same timezone
+                        if (thisTimezone.equalsIgnoreCase(sourceTimeZone) == false) {
+                            continue;
+                        }
+                        if (source.dateHistogramInterval() != null) {
+                            TimeValue sourceInterval = TimeValue.parseTimeValue(source.dateHistogramInterval().toString(),
+                                    "source.date_histogram.interval");
+                            //TODO should be divisor of interval
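+                            // (e.g. a job rolled up at 7h passes the check below for an 8h request,
+                            // even though 8h buckets cannot be cleanly composed from 7h buckets)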
+                            if (interval.compareTo(sourceInterval) <= 0) {
+                                localCaps.add(cap);
+                            }
+                        } else {
+                            if (interval.getMillis() <= source.interval()) {
+                                localCaps.add(cap);
+                            }
+                        }
+                        break;
+                    }
+                }
+            }
+        }
+
+        if (localCaps.isEmpty()) {
+            throw new IllegalArgumentException("There is not a rollup job that has a [" + source.getWriteableName() + "] agg on field ["
+                    + source.field() + "] which also satisfies all requirements of query.");
+        }
+
+        // We are a leaf, save our best caps
+        if (source.getSubAggregations().size() == 0) {
+            bestCaps.add(getTopEqualCaps(localCaps));
+        } else {
+            // otherwise keep working down the tree
+            source.getSubAggregations().forEach(sub -> doFindBestJobs(sub, localCaps, bestCaps));
+        }
+    }
+
+    /**
+     * Find the set of histo's with the largest interval
+     */
+    private static void checkHisto(HistogramAggregationBuilder source, List<RollupJobCaps> jobCaps, Set<RollupJobCaps> bestCaps) {
+        ArrayList<RollupJobCaps> localCaps = new ArrayList<>();
+        for (RollupJobCaps cap : jobCaps) {
+            RollupJobCaps.RollupFieldCaps fieldCaps = cap.getFieldCaps().get(source.field());
+            if (fieldCaps != null) {
+                for (Map<String, Object> agg : fieldCaps.getAggs()) {
+                    if (agg.get(RollupField.AGG).equals(HistogramAggregationBuilder.NAME)) {
+                        Long interval = (long) agg.get(RollupField.INTERVAL);
+                        // TODO should be divisor of interval
+                        if (interval <= source.interval()) {
+                            localCaps.add(cap);
+                        }
+                        break;
+                    }
+                }
+            }
+        }
+
+        if (localCaps.isEmpty()) {
+            throw new IllegalArgumentException("There is not a rollup job that has a [" + source.getWriteableName() + "] agg on field ["
+                    + source.field() + "] which also satisfies all requirements of query.");
+        }
+
+        // We are a leaf, save our best caps
+        if (source.getSubAggregations().size() == 0) {
+            bestCaps.add(getTopEqualCaps(localCaps));
+        } else {
+            // otherwise keep working down the tree
+            source.getSubAggregations().forEach(sub -> doFindBestJobs(sub, localCaps, bestCaps));
+        }
+    }
+
+    /**
+     * Ensure that the terms aggregation is supported by one or more job caps. There is no notion of "best"
+     * caps for terms, it is either supported or not.
+     */
+    private static void checkTerms(TermsAggregationBuilder source, List<RollupJobCaps> jobCaps, Set<RollupJobCaps> bestCaps) {
+        ArrayList<RollupJobCaps> localCaps = new ArrayList<>();
+        for (RollupJobCaps cap : jobCaps) {
+            RollupJobCaps.RollupFieldCaps fieldCaps = cap.getFieldCaps().get(source.field());
+            if (fieldCaps != null) {
+                for (Map<String, Object> agg : fieldCaps.getAggs()) {
+                    if (agg.get(RollupField.AGG).equals(TermsAggregationBuilder.NAME)) {
+                        localCaps.add(cap);
+                        break;
+                    }
+                }
+            }
+        }
+
+        if (localCaps.isEmpty()) {
+            throw new IllegalArgumentException("There is not a rollup job that has a [" + source.getWriteableName() + "] agg on field ["
+                    + source.field() + "] which also satisfies all requirements of query.");
+        }
+
+        // We are a leaf, save our best caps
+        if (source.getSubAggregations().size() == 0) {
+            bestCaps.add(getTopEqualCaps(localCaps));
+        } else {
+            // otherwise keep working down the tree
+            source.getSubAggregations().forEach(sub -> doFindBestJobs(sub, localCaps, bestCaps));
+        }
+    }
+
+    /**
+     * Ensure that the metrics are supported by one or more job caps. There is no notion of "best"
+     * caps for metrics, it is either supported or not.
+     */
+    private static void checkVSLeaf(ValuesSourceAggregationBuilder.LeafOnly source, List<RollupJobCaps> jobCaps,
+                                    Set<RollupJobCaps> bestCaps) {
+        ArrayList<RollupJobCaps> localCaps = new ArrayList<>();
+        for (RollupJobCaps cap : jobCaps) {
+            RollupJobCaps.RollupFieldCaps fieldCaps = cap.getFieldCaps().get(source.field());
+            if (fieldCaps != null) {
+                for (Map<String, Object> agg : fieldCaps.getAggs()) {
+                    if (agg.get(RollupField.AGG).equals(source.getWriteableName())) {
+                        localCaps.add(cap);
+                        break;
+                    }
+                }
+            }
+        }
+
+        if (localCaps.isEmpty()) {
+            throw new IllegalArgumentException("There is not a rollup job that has a [" + source.getWriteableName() + "] agg with name ["
+                    + source.getName() + "] which also satisfies all requirements of query.");
+        }
+
+        // Metrics are always leaves so go ahead and add to best caps
+        bestCaps.add(getTopEqualCaps(localCaps));
+    }
+
+    private static RollupJobCaps getTopEqualCaps(List<RollupJobCaps> caps) {
+        assert caps.isEmpty() == false;
+        caps.sort(COMPARATOR);
+        return caps.get(0);
+    }
+
+    private static Comparator<RollupJobCaps> getComparator() {
+        return (o1, o2) -> {
+            if (o1 == null) {
+                throw new NullPointerException("RollupJobCap [o1] cannot be null");
+            }
+            if (o2 == null) {
+                throw new NullPointerException("RollupJobCap [o2] cannot be null");
+            }
+
+            if (o1.equals(o2)) {
+                return 0;
+            }
+
+            TimeValue thisTime = null;
+            TimeValue thatTime = null;
+
+            // histogram intervals are averaged and compared, with the idea that
+            // a larger average == better, because it will generate fewer documents
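+            // (e.g. a single histo at interval 100 averages to 100 and is preferred over two
+            // histos at intervals 10 and 20, which average to 15)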
+            float thisHistoWeights = 0;
+            float thatHistoWeights = 0;
+            long counter = 0;
+
+            // Similarly, fewer terms groups will generate fewer documents, so
+            // we count the number of terms groups
+            long thisTermsWeights = 0;
+            long thatTermsWeights = 0;
+
+            // Iterate over the first Caps and collect the various stats
+            for (RollupJobCaps.RollupFieldCaps fieldCaps : o1.getFieldCaps().values()) {
+                for (Map<String, Object> agg : fieldCaps.getAggs()) {
+                    if (agg.get(RollupField.AGG).equals(DateHistogramAggregationBuilder.NAME)) {
+                        thisTime = TimeValue.parseTimeValue((String) agg.get(RollupField.INTERVAL), RollupField.INTERVAL);
+                    } else if (agg.get(RollupField.AGG).equals(HistogramAggregationBuilder.NAME)) {
+                        thisHistoWeights += (long) agg.get(RollupField.INTERVAL);
+                        counter += 1;
+                    } else if (agg.get(RollupField.AGG).equals(TermsAggregationBuilder.NAME)) {
+                        thisTermsWeights += 1;
+                    }
+                }
+            }
+            thisHistoWeights = counter == 0 ? 0 : thisHistoWeights / counter;
+
+            // Iterate over the second Cap and collect the same stats
+            counter = 0;
+            for (RollupJobCaps.RollupFieldCaps fieldCaps : o2.getFieldCaps().values()) {
+                for (Map<String, Object> agg : fieldCaps.getAggs()) {
+                    if (agg.get(RollupField.AGG).equals(DateHistogramAggregationBuilder.NAME)) {
+                        thatTime = TimeValue.parseTimeValue((String) agg.get(RollupField.INTERVAL), RollupField.INTERVAL);
+                    } else if (agg.get(RollupField.AGG).equals(HistogramAggregationBuilder.NAME)) {
+                        thatHistoWeights += (long) agg.get(RollupField.INTERVAL);
+                        counter += 1;
+                    } else if (agg.get(RollupField.AGG).equals(TermsAggregationBuilder.NAME)) {
+                        thatTermsWeights += 1;
+                    }
+                }
+            }
+            thatHistoWeights = counter == 0 ? 0 : thatHistoWeights / counter;
+
+            // DateHistos are mandatory so these should always be present no matter what
+            assert thisTime != null;
+            assert thatTime != null;
+
+            // Compare on date interval first
+            // The "smaller" job is the one with the larger interval
+            int timeCompare = thisTime.compareTo(thatTime);
+            if (timeCompare != 0) {
+                return -timeCompare;
+            }
+
+            // If dates are the same, the "smaller" job is the one with a larger avg histo weight.
+            // Not bullet proof, but heuristically we prefer:
+            //  - one job with interval 100 (avg 100) over one job with interval 10 (avg 10)
+            //  - one job with interval 100 (avg 100) over one job with ten histos @ interval 10 (avg 10)
+            // because in both cases the larger intervals likely generate fewer documents
+            //
+            // The exception is if one of the jobs had no histo (avg 0), then we prefer that
+            int histoCompare = Float.compare(thisHistoWeights, thatHistoWeights);
+            if (histoCompare != 0) {
+                if (thisHistoWeights == 0) {
+                    return -1;
+                } else if (thatHistoWeights == 0) {
+                    return 1;
+                }
+                return -histoCompare;
+            }
+
+            // If dates and histo are the same, the "smaller" job is the one with fewer terms aggs,
+            // since it will hopefully generate fewer docs
+            return Long.compare(thisTermsWeights, thatTermsWeights);
+
+            // Ignoring metrics for now, since the "best job" resolution doesn't take those into account
+            // and we rely on the msearch functionality to merge away any duplicates.
+            // Could potentially optimize there in the future to choose jobs with more metric
+            // coverage
+        };
+    }
+}
diff --git a/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupRequestTranslator.java b/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupRequestTranslator.java
index 16b2127fc45..816618c8f0b 100644
--- a/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupRequestTranslator.java
+++ b/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupRequestTranslator.java
@@ -10,7 +10,6 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.TermQueryBuilder;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
@@ -21,13 +20,12 @@ import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder;
 import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
 import org.elasticsearch.xpack.core.rollup.RollupField;
-import org.elasticsearch.xpack.core.rollup.action.RollupJobCaps;
 import org.elasticsearch.xpack.core.rollup.job.DateHistoGroupConfig;
+import org.joda.time.DateTimeZone;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.function.Supplier;
 
@@ -52,7 +50,7 @@ import java.util.function.Supplier;
  * }
  * </pre>
  *
- * The only publicly "consumable" API is {@link #translateAggregation(AggregationBuilder, List, NamedWriteableRegistry, List)}.
+ * The only publicly "consumable" API is {@link #translateAggregation(AggregationBuilder, List, NamedWriteableRegistry)}.
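+ *
+ * An illustrative (hypothetical) invocation; the filter list is populated with any extra query
+ * conditions the translated aggs require:
+ * <pre>{@code
+ * List<QueryBuilder> filterConditions = new ArrayList<>();
+ * List<AggregationBuilder> rolled =
+ *         RollupRequestTranslator.translateAggregation(source, filterConditions, registry);
+ * }</pre>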
  */
 public class RollupRequestTranslator {
 
@@ -133,22 +131,18 @@ public class RollupRequestTranslator {
      */
     public static List<AggregationBuilder> translateAggregation(AggregationBuilder source,
                                                                 List<QueryBuilder> filterConditions,
-                                                                NamedWriteableRegistry registry,
-                                                                List<RollupJobCaps> jobCaps) {
+                                                                NamedWriteableRegistry registry) {
+
         if (source.getWriteableName().equals(DateHistogramAggregationBuilder.NAME)) {
-            validateAgg(((DateHistogramAggregationBuilder) source).field(), jobCaps, source.getWriteableName());
-            return translateDateHistogram((DateHistogramAggregationBuilder) source, filterConditions, registry, jobCaps);
+            return translateDateHistogram((DateHistogramAggregationBuilder) source, filterConditions, registry);
         } else if (source.getWriteableName().equals(HistogramAggregationBuilder.NAME)) {
-            validateAgg(((HistogramAggregationBuilder) source).field(), jobCaps, source.getWriteableName());
-            return translateHistogram((HistogramAggregationBuilder) source, filterConditions, registry, jobCaps);
-        } else if (source instanceof ValuesSourceAggregationBuilder.LeafOnly) {
-            validateAgg(((ValuesSourceAggregationBuilder.LeafOnly) source).field(), jobCaps, source.getWriteableName());
+            return translateHistogram((HistogramAggregationBuilder) source, filterConditions, registry);
+        } else if (Rollup.SUPPORTED_METRICS.contains(source.getWriteableName())) {
             return translateVSLeaf((ValuesSourceAggregationBuilder.LeafOnly) source, registry);
         } else if (source.getWriteableName().equals(TermsAggregationBuilder.NAME)) {
-            validateAgg(((TermsAggregationBuilder) source).field(), jobCaps, source.getWriteableName());
-            return translateTerms((TermsAggregationBuilder) source, filterConditions, registry, jobCaps);
+            return translateTerms((TermsAggregationBuilder) source, filterConditions, registry);
         } else {
-            throw new RuntimeException("Unable to translate aggregation tree into Rollup. Aggregation ["
+            throw new IllegalArgumentException("Unable to translate aggregation tree into Rollup. Aggregation ["
                     + source.getName() + "] is of type [" + source.getClass().getSimpleName() + "] which is "
                     + "currently unsupported.");
         }
@@ -227,10 +221,9 @@ public class RollupRequestTranslator {
      */
     private static List<AggregationBuilder> translateDateHistogram(DateHistogramAggregationBuilder source,
                                                                    List<QueryBuilder> filterConditions,
-                                                                   NamedWriteableRegistry registry,
-                                                                   List<RollupJobCaps> jobCaps) {
+                                                                   NamedWriteableRegistry registry) {
 
-        return translateVSAggBuilder(source, filterConditions, registry, jobCaps, () -> {
+        return translateVSAggBuilder(source, filterConditions, registry, () -> {
             DateHistogramAggregationBuilder rolledDateHisto
                     = new DateHistogramAggregationBuilder(source.getName());
 
@@ -240,46 +233,9 @@ public class RollupRequestTranslator {
                 rolledDateHisto.interval(source.interval());
             }
 
-            TimeValue bestInterval = null;
-            String bestJob = null;
-            String bestTZ = null;
-            for (RollupJobCaps cap : jobCaps) {
-                RollupJobCaps.RollupFieldCaps fieldCaps = cap.getFieldCaps().get(source.field());
-                if (fieldCaps != null) {
-                    for (Map<String, Object> agg : fieldCaps.getAggs()) {
-                        if (agg.get("agg").equals(DateHistogramAggregationBuilder.NAME)) {
-                            TimeValue interval = TimeValue.parseTimeValue((String) agg.get(RollupField.INTERVAL), "date_histogram.interval");
-                            if (bestInterval == null || interval.compareTo(bestInterval) < 0) {
-                                bestInterval = interval;
-                                bestJob = cap.getJobID();
-                                bestTZ = (String) agg.get(DateHistoGroupConfig.TIME_ZONE.getPreferredName());
-                            }
-                            break;
-                        }
-                    }
-                }
-            }
-
-            // Even though rollups only use TimeValue, the user can still pass millis as the interval in a query, so we
-            // need to check to see what we're dealing with here.
-            if (source.dateHistogramInterval() != null) {
-                TimeValue sourceInterval = TimeValue.parseTimeValue(source.dateHistogramInterval().toString(),
-                        "source.date_histogram.interval");
-                if (bestInterval == null || bestInterval.compareTo(sourceInterval) > 0) {
-                    throw new IllegalArgumentException("Could not find a rolled date_histogram configuration that satisfies the interval ["
-                            + source.dateHistogramInterval() + "]");
-                }
-            } else {
-                if (bestInterval == null || bestInterval.getMillis() > source.interval()) {
-                    throw new IllegalArgumentException("Could not find a rolled date_histogram configuration that satisfies the interval ["
-                            + source.interval() + "]");
-                }
-            }
-
-            filterConditions.add(new TermQueryBuilder(RollupField.formatFieldName(source, RollupField.INTERVAL), bestInterval.toString()));
+            String timezone = source.timeZone() == null ? DateTimeZone.UTC.toString() : source.timeZone().toString();
             filterConditions.add(new TermQueryBuilder(RollupField.formatFieldName(source,
-                    DateHistoGroupConfig.TIME_ZONE.getPreferredName()), bestTZ));
-            filterConditions.add(new TermQueryBuilder(RollupField.formatMetaField(RollupField.ID.getPreferredName()), bestJob));
+                    DateHistoGroupConfig.TIME_ZONE.getPreferredName()), timezone));
 
             rolledDateHisto.offset(source.offset());
             if (source.extendedBounds() != null) {
@@ -299,43 +255,17 @@ public class RollupRequestTranslator {
      * Notably, it adds a Sum metric to calculate the doc_count in each bucket.
      *
      * Conventions are identical to a date_histogram (excepting date-specific details), so see
-     * {@link #translateDateHistogram(DateHistogramAggregationBuilder, List, NamedWriteableRegistry, List)} for
+     * {@link #translateDateHistogram(DateHistogramAggregationBuilder, List, NamedWriteableRegistry)} for
      * a complete list of conventions, examples, etc
      */
     private static List<AggregationBuilder> translateHistogram(HistogramAggregationBuilder source,
                                                                List<QueryBuilder> filterConditions,
-                                                               NamedWriteableRegistry registry,
-                                                               List<RollupJobCaps> jobCaps) {
+                                                               NamedWriteableRegistry registry) {
 
-        return translateVSAggBuilder(source, filterConditions, registry, jobCaps, () -> {
+        return translateVSAggBuilder(source, filterConditions, registry, () -> {
             HistogramAggregationBuilder rolledHisto
                     = new HistogramAggregationBuilder(source.getName());
 
-            long bestInterval = Long.MAX_VALUE;
-            String bestJob = null;
-            for (RollupJobCaps cap : jobCaps) {
-                RollupJobCaps.RollupFieldCaps fieldCaps = cap.getFieldCaps().get(source.field());
-                if (fieldCaps != null) {
-                    for (Map<String, Object> agg : fieldCaps.getAggs()) {
-                        if (agg.get("agg").equals(HistogramAggregationBuilder.NAME)) {
-                            if ((long) agg.get(RollupField.INTERVAL) < bestInterval) {
-                                bestInterval = (long) agg.get(RollupField.INTERVAL);
-                                bestJob = cap.getJobID();
-                            }
-                            break;
-                        }
-                    }
-                }
-            }
-
-            if (bestInterval == Long.MAX_VALUE || bestInterval > source.interval()) {
-                throw new IllegalArgumentException("Could not find a rolled histogram configuration that satisfies the interval ["
-                        + source.interval() + "]");
-            }
-
-            filterConditions.add(new TermQueryBuilder(RollupField.formatFieldName(source, RollupField.INTERVAL), bestInterval));
-            filterConditions.add(new TermQueryBuilder(RollupField.formatMetaField(RollupField.ID.getPreferredName()), bestJob));
-
             rolledHisto.interval(source.interval());
             rolledHisto.offset(source.offset());
             if (Double.isFinite(source.minBound()) && Double.isFinite(source.maxBound())) {
@@ -414,10 +344,9 @@ public class RollupRequestTranslator {
      */
     private static List<AggregationBuilder> translateTerms(TermsAggregationBuilder source,
                                                            List<QueryBuilder> filterConditions,
-                                                           NamedWriteableRegistry registry,
-                                                           List<RollupJobCaps> jobCaps) {
+                                                           NamedWriteableRegistry registry) {
 
-        return translateVSAggBuilder(source, filterConditions, registry, jobCaps, () -> {
+        return translateVSAggBuilder(source, filterConditions, registry, () -> {
             TermsAggregationBuilder rolledTerms
                     = new TermsAggregationBuilder(source.getName(), source.valueType());
             rolledTerms.field(RollupField.formatFieldName(source, RollupField.VALUE));
@@ -435,7 +364,7 @@ public class RollupRequestTranslator {
                 rolledTerms.shardSize(source.shardSize());
             }
             rolledTerms.showTermDocCountError(source.showTermDocCountError());
-            //rolledTerms.size(termsAgg.size()); // TODO fix in core
+            rolledTerms.size(source.size());
             return rolledTerms;
         });
     }
@@ -458,7 +387,7 @@ public class RollupRequestTranslator {
      */
     private static <T extends ValuesSourceAggregationBuilder> List<AggregationBuilder>
             translateVSAggBuilder(ValuesSourceAggregationBuilder source, List<QueryBuilder> filterConditions,
-                                  NamedWriteableRegistry registry, List<RollupJobCaps> jobCaps, Supplier<T> factory) {
+                                  NamedWriteableRegistry registry, Supplier<T> factory) {
 
         T rolled = factory.get();
 
@@ -470,7 +399,7 @@ public class RollupRequestTranslator {
         // Translate all subaggs and add to the newly translated agg
         // NOTE: using for loop instead of stream because compiler explodes with a bug :/
         for (AggregationBuilder subAgg : source.getSubAggregations()) {
-            List<AggregationBuilder> translated = translateAggregation(subAgg, filterConditions, registry, jobCaps);
+            List<AggregationBuilder> translated = translateAggregation(subAgg, filterConditions, registry);
             for (AggregationBuilder t : translated) {
                 rolled.subAggregation(t);
             }
@@ -505,7 +434,8 @@ public class RollupRequestTranslator {
      *
      * However, for `avg` metrics (and potentially others in the future), the agg is translated into
      * a sum + sum aggs; one for count and one for sum. When unrolling these will be combined back into
-     * a single avg. E.g. for an `avg` agg:
+     * a single avg. Note that we also have to rename the avg agg name to distinguish it from empty
+     * buckets. E.g. for an `avg` agg:
      *
      * <pre>{@code
      * {
@@ -518,7 +448,7 @@ public class RollupRequestTranslator {
      * <pre>{@code
      * [
      *   {
-     *    "the_avg": {
+     *    "the_avg.value": {
      *      "sum" : { "field" : "some_field.avg.value" }}
      *   },
      *   {
@@ -545,6 +475,7 @@ public class RollupRequestTranslator {
      * IF the agg is an AvgAgg, the following additional conventions are added:
      *