diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java
index df674ea898e..b9e41b87932 100644
--- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java
@@ -85,8 +85,10 @@ import org.elasticsearch.search.aggregations.bucket.geogrid.GeoGridAggregationBu
 import org.elasticsearch.search.aggregations.bucket.geogrid.ParsedGeoHashGrid;
 import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.global.ParsedGlobal;
+import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.histogram.ParsedAutoDateHistogram;
 import org.elasticsearch.search.aggregations.bucket.histogram.ParsedDateHistogram;
 import org.elasticsearch.search.aggregations.bucket.histogram.ParsedHistogram;
 import org.elasticsearch.search.aggregations.bucket.missing.MissingAggregationBuilder;
@@ -1004,6 +1006,7 @@ public class RestHighLevelClient implements Closeable {
         map.put(GeoCentroidAggregationBuilder.NAME, (p, c) -> ParsedGeoCentroid.fromXContent(p, (String) c));
         map.put(HistogramAggregationBuilder.NAME, (p, c) -> ParsedHistogram.fromXContent(p, (String) c));
         map.put(DateHistogramAggregationBuilder.NAME, (p, c) -> ParsedDateHistogram.fromXContent(p, (String) c));
+        map.put(AutoDateHistogramAggregationBuilder.NAME, (p, c) -> ParsedAutoDateHistogram.fromXContent(p, (String) c));
         map.put(StringTerms.NAME, (p, c) -> ParsedStringTerms.fromXContent(p, (String) c));
         map.put(LongTerms.NAME, (p, c) -> ParsedLongTerms.fromXContent(p, (String) c));
         map.put(DoubleTerms.NAME, (p, c) -> ParsedDoubleTerms.fromXContent(p, (String) c));
diff --git a/docs/reference/aggregations/bucket.asciidoc b/docs/reference/aggregations/bucket.asciidoc
index 1233e0d9b73..ddb55e8d34c 100644
--- a/docs/reference/aggregations/bucket.asciidoc
+++ b/docs/reference/aggregations/bucket.asciidoc
@@ -19,6 +19,8 @@ the limit will fail with an exception.
 
 include::bucket/adjacency-matrix-aggregation.asciidoc[]
 
+include::bucket/autodatehistogram-aggregation.asciidoc[]
+
 include::bucket/children-aggregation.asciidoc[]
 
 include::bucket/composite-aggregation.asciidoc[]
diff --git a/docs/reference/aggregations/bucket/autodatehistogram-aggregation.asciidoc b/docs/reference/aggregations/bucket/autodatehistogram-aggregation.asciidoc
new file mode 100644
index 00000000000..28cb65ce6cc
--- /dev/null
+++ b/docs/reference/aggregations/bucket/autodatehistogram-aggregation.asciidoc
@@ -0,0 +1,283 @@
+[[search-aggregations-bucket-autodatehistogram-aggregation]]
+=== Auto-interval Date Histogram Aggregation
+
+A multi-bucket aggregation similar to the <> except that,
+instead of providing an interval to use as the width of each bucket, a target number of buckets
+is provided. The interval of the buckets is then chosen automatically to best achieve that
+target. The number of buckets returned will always be less than or equal to this target number.
+
+The `buckets` parameter is optional and defaults to 10 buckets if not specified.
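Because `ParsedAutoDateHistogram` is registered with the high-level REST client above, the aggregation can also be built and parsed from Java. The following is a minimal sketch, not part of this change: it assumes an already-configured `RestHighLevelClient` (the exact `search(...)` overload depends on the client version) and builds the same request as the console example that follows.

[source,java]
--------------------------------------------------
import java.io.IOException;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class AutoDateHistogramClientSketch {

    // Build the equivalent of the console request: size=0 with a single
    // auto_date_histogram aggregation targeting 10 buckets on the "date" field.
    static SearchRequest buildRequest() {
        AutoDateHistogramAggregationBuilder agg =
                new AutoDateHistogramAggregationBuilder("sales_over_time")
                        .field("date")
                        .setNumBuckets(10);
        SearchSourceBuilder source = new SearchSourceBuilder()
                .size(0)            // only the aggregation is needed, not the hits
                .aggregation(agg);
        return new SearchRequest("sales").source(source);
    }

    // The parsed buckets implement the generic Histogram interface, so they are
    // read the same way as date_histogram buckets.
    static void printBuckets(RestHighLevelClient client) throws IOException {
        SearchResponse response = client.search(buildRequest());
        Histogram histogram = response.getAggregations().get("sales_over_time");
        for (Histogram.Bucket bucket : histogram.getBuckets()) {
            System.out.println(bucket.getKeyAsString() + " -> " + bucket.getDocCount());
        }
    }
}
--------------------------------------------------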
+
+Requesting a target of 10 buckets:
+
+[source,js]
+--------------------------------------------------
+POST /sales/_search?size=0
+{
+    "aggs" : {
+        "sales_over_time" : {
+            "auto_date_histogram" : {
+                "field" : "date",
+                "buckets" : 10
+            }
+        }
+    }
+}
+--------------------------------------------------
+// CONSOLE
+// TEST[setup:sales]
+
+==== Keys
+
+Internally, a date is represented as a 64 bit number representing a timestamp
+in milliseconds-since-the-epoch. These timestamps are returned as the bucket
+++key++s. The `key_as_string` is the same timestamp converted to a formatted
+date string using the format specified with the `format` parameter:
+
+TIP: If no `format` is specified, then it will use the first date
+<> specified in the field mapping.
+
+[source,js]
+--------------------------------------------------
+POST /sales/_search?size=0
+{
+    "aggs" : {
+        "sales_over_time" : {
+            "auto_date_histogram" : {
+                "field" : "date",
+                "buckets" : 5,
+                "format" : "yyyy-MM-dd" <1>
+            }
+        }
+    }
+}
+--------------------------------------------------
+// CONSOLE
+// TEST[setup:sales]
+
+<1> Supports expressive date <>
+
+Response:
+
+[source,js]
+--------------------------------------------------
+{
+    ...
+    "aggregations": {
+        "sales_over_time": {
+            "buckets": [
+                {
+                    "key_as_string": "2015-01-01",
+                    "key": 1420070400000,
+                    "doc_count": 3
+                },
+                {
+                    "key_as_string": "2015-02-01",
+                    "key": 1422748800000,
+                    "doc_count": 2
+                },
+                {
+                    "key_as_string": "2015-03-01",
+                    "key": 1425168000000,
+                    "doc_count": 2
+                }
+            ]
+        }
+    }
+}
+--------------------------------------------------
+// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]
+
+==== Intervals
+
+The interval of the returned buckets is selected based on the data collected by the
+aggregation so that the number of buckets returned is less than or equal to the number
+requested. The possible intervals returned are:
+
+[horizontal]
+seconds:: In multiples of 1, 5, 10 and 30
+minutes:: In multiples of 1, 5, 10 and 30
+hours:: In multiples of 1, 3 and 12
+days:: In multiples of 1 and 7
+months:: In multiples of 1 and 3
+years:: In multiples of 1, 5, 10, 20, 50 and 100
+
+In the worst case, where the number of daily buckets is too many for the requested
+number of buckets, the number of buckets returned will be 1/7th of the number of
+buckets requested.
+
+==== Time Zone
+
+Date-times are stored in Elasticsearch in UTC. By default, all bucketing and
+rounding is also done in UTC. The `time_zone` parameter can be used to indicate
+that bucketing should use a different time zone.
+
+Time zones may either be specified as an ISO 8601 UTC offset (e.g. `+01:00` or
+`-08:00`) or as a timezone id, an identifier used in the TZ database like
+`America/Los_Angeles`.
+
+Consider the following example:
+
+[source,js]
+---------------------------------
+PUT my_index/log/1?refresh
+{
+  "date": "2015-10-01T00:30:00Z"
+}
+
+PUT my_index/log/2?refresh
+{
+  "date": "2015-10-01T01:30:00Z"
+}
+
+PUT my_index/log/3?refresh
+{
+  "date": "2015-10-01T02:30:00Z"
+}
+
+GET my_index/_search?size=0
+{
+  "aggs": {
+    "by_day": {
+      "auto_date_histogram": {
+        "field": "date",
+        "buckets" : 3
+      }
+    }
+  }
+}
+---------------------------------
+// CONSOLE
+
+If no time zone is specified, UTC is used, and three 1-hour buckets are returned
+starting at midnight UTC on 1 October 2015:
+
+[source,js]
+---------------------------------
+{
+    ...
+    "aggregations": {
+        "by_day": {
+            "buckets": [
+                {
+                    "key_as_string": "2015-10-01T00:00:00.000Z",
+                    "key": 1443657600000,
+                    "doc_count": 1
+                },
+                {
+                    "key_as_string": "2015-10-01T01:00:00.000Z",
+                    "key": 1443661200000,
+                    "doc_count": 1
+                },
+                {
+                    "key_as_string": "2015-10-01T02:00:00.000Z",
+                    "key": 1443664800000,
+                    "doc_count": 1
+                }
+            ]
+        }
+    }
+}
+---------------------------------
+// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]
+
+If a `time_zone` of `-01:00` is specified, then midnight starts at one hour before
+midnight UTC:
+
+[source,js]
+---------------------------------
+GET my_index/_search?size=0
+{
+  "aggs": {
+    "by_day": {
+      "auto_date_histogram": {
+        "field": "date",
+        "buckets" : 3,
+        "time_zone": "-01:00"
+      }
+    }
+  }
+}
+---------------------------------
+// CONSOLE
+// TEST[continued]
+
+
+Now three 1-hour buckets are still returned, but the first bucket starts at
+11:00pm on 30 September 2015 since that is the local time for the bucket in
+the specified time zone.
+
+[source,js]
+---------------------------------
+{
+    ...
+    "aggregations": {
+        "by_day": {
+            "buckets": [
+                {
+                    "key_as_string": "2015-09-30T23:00:00.000-01:00",
+                    "key": 1443657600000,
+                    "doc_count": 1
+                },
+                {
+                    "key_as_string": "2015-10-01T00:00:00.000-01:00",
+                    "key": 1443661200000,
+                    "doc_count": 1
+                },
+                {
+                    "key_as_string": "2015-10-01T01:00:00.000-01:00",
+                    "key": 1443664800000,
+                    "doc_count": 1
+                }
+            ]
+        }
+    }
+}
+---------------------------------
+// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]
+
+WARNING: When using time zones that follow DST (daylight savings time) changes,
+buckets close to the moment when those changes happen can have slightly different
+sizes than neighbouring buckets.
+For example, consider a DST start in the `CET` time zone: on 27 March 2016 at 2am,
+clocks were turned forward 1 hour to 3am local time. If the result of the aggregation
+was daily buckets, the bucket covering that day will only hold data for 23 hours
+instead of the usual 24 hours for other buckets. The same is true for shorter intervals
+like 12h: here we will have only an 11h bucket on the morning of 27 March when the
+DST shift happens.
+
+==== Scripts
+
+As with the normal <>, both document-level
+scripts and value-level scripts are supported. This aggregation does not, however, support the `min_doc_count`,
+`extended_bounds` and `order` parameters.
+
+==== Missing value
+
+The `missing` parameter defines how documents that are missing a value should be treated.
+By default they will be ignored, but it is also possible to treat them as if they
+had a value.
+
+[source,js]
+--------------------------------------------------
+POST /sales/_search?size=0
+{
+    "aggs" : {
+        "sale_date" : {
+            "auto_date_histogram" : {
+                "field" : "date",
+                "buckets": 10,
+                "missing": "2000/01/01" <1>
+            }
+        }
+    }
+}
+--------------------------------------------------
+// CONSOLE
+// TEST[setup:sales]
+
+<1> Documents without a value in the `date` field will fall into the same bucket as documents that have the value `2000-01-01`.
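To make the interval ladder from the Intervals section above concrete, here is a small, illustrative chooser. It is not the server implementation (the aggregator works with `Rounding` objects and refines its choice while collecting, and the reduce phase can coarsen it further); it simply walks the documented units and multiples and returns the first interval that keeps the bucket count within the target.

[source,java]
--------------------------------------------------
import java.util.Arrays;
import java.util.List;

// Illustrative only: a simplified version of the interval selection described
// in the Intervals section. Durations for months and years are rough estimates,
// much like the RoundingInfo setup in the builder.
public class IntervalLadderSketch {

    private static final long SECOND = 1000L;
    private static final long MINUTE = 60 * SECOND;
    private static final long HOUR = 60 * MINUTE;
    private static final long DAY = 24 * HOUR;
    private static final long MONTH = 30 * DAY;
    private static final long YEAR = 365 * DAY;

    // Each row: the unit in milliseconds followed by its allowed multiples.
    private static final List<long[]> LADDER = Arrays.asList(
            new long[] { SECOND, 1, 5, 10, 30 },
            new long[] { MINUTE, 1, 5, 10, 30 },
            new long[] { HOUR, 1, 3, 12 },
            new long[] { DAY, 1, 7 },
            new long[] { MONTH, 1, 3 },
            new long[] { YEAR, 1, 5, 10, 20, 50, 100 });

    /** Returns a rough interval in milliseconds yielding at most targetBuckets buckets. */
    static long chooseInterval(long dataDurationMillis, int targetBuckets) {
        for (long[] row : LADDER) {
            long unitMillis = row[0];
            for (int i = 1; i < row.length; i++) {
                long interval = unitMillis * row[i];
                if (dataDurationMillis / interval + 1 <= targetBuckets) {
                    return interval;
                }
            }
        }
        return 100 * YEAR; // coarsest interval in the ladder
    }

    public static void main(String[] args) {
        // Roughly three days of data with a target of 10 buckets -> 12h (43200000 ms).
        System.out.println(chooseInterval(3 * DAY, 10));
    }
}
--------------------------------------------------

The documented 1/7th worst case corresponds to the jump from 1-day to 7-day buckets, which is the largest single step in this ladder.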
+ diff --git a/server/src/main/java/org/elasticsearch/search/SearchModule.java b/server/src/main/java/org/elasticsearch/search/SearchModule.java index 199d2278bf7..efef1aeb04f 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/server/src/main/java/org/elasticsearch/search/SearchModule.java @@ -109,8 +109,10 @@ import org.elasticsearch.search.aggregations.bucket.geogrid.GeoGridAggregationBu import org.elasticsearch.search.aggregations.bucket.geogrid.InternalGeoHashGrid; import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.global.InternalGlobal; +import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.InternalAutoDateHistogram; import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram; import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram; import org.elasticsearch.search.aggregations.bucket.missing.InternalMissing; @@ -395,6 +397,8 @@ public class SearchModule { HistogramAggregationBuilder::parse).addResultReader(InternalHistogram::new)); registerAggregation(new AggregationSpec(DateHistogramAggregationBuilder.NAME, DateHistogramAggregationBuilder::new, DateHistogramAggregationBuilder::parse).addResultReader(InternalDateHistogram::new)); + registerAggregation(new AggregationSpec(AutoDateHistogramAggregationBuilder.NAME, AutoDateHistogramAggregationBuilder::new, + AutoDateHistogramAggregationBuilder::parse).addResultReader(InternalAutoDateHistogram::new)); registerAggregation(new AggregationSpec(GeoDistanceAggregationBuilder.NAME, GeoDistanceAggregationBuilder::new, GeoDistanceAggregationBuilder::parse).addResultReader(InternalGeoDistance::new)); registerAggregation(new AggregationSpec(GeoGridAggregationBuilder.NAME, GeoGridAggregationBuilder::new, diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java index 504758e7a4e..7b09ac9d618 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java @@ -84,6 +84,19 @@ public abstract class BucketsAggregator extends AggregatorBase { subCollector.collect(doc, bucketOrd); } + public final void mergeBuckets(long[] mergeMap, long newNumBuckets) { + try (IntArray oldDocCounts = docCounts) { + docCounts = bigArrays.newIntArray(newNumBuckets, true); + docCounts.fill(0, newNumBuckets, 0); + for (int i = 0; i < oldDocCounts.size(); i++) { + int docCount = oldDocCounts.get(i); + if (docCount != 0) { + docCounts.increment(mergeMap[i], docCount); + } + } + } + } + public IntArray getDocCounts() { return docCounts; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java new file mode 100644 index 00000000000..f357e9d286f --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java @@ -0,0 +1,236 @@ +/* + * Licensed 
to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.Weight; +import org.apache.lucene.util.packed.PackedInts; +import org.apache.lucene.util.packed.PackedLongValues; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.LongHash; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.BucketCollector; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * A specialization of {@link DeferringBucketCollector} that collects all + * matches and then is able to replay a given subset of buckets. Exposes + * mergeBuckets, which can be invoked by the aggregator when increasing the + * rounding interval. 
+ */ +public class MergingBucketsDeferringCollector extends DeferringBucketCollector { + + List entries = new ArrayList<>(); + BucketCollector collector; + final SearchContext searchContext; + LeafReaderContext context; + PackedLongValues.Builder docDeltas; + PackedLongValues.Builder buckets; + long maxBucket = -1; + boolean finished = false; + LongHash selectedBuckets; + + public MergingBucketsDeferringCollector(SearchContext context) { + this.searchContext = context; + } + + @Override + public void setDeferredCollector(Iterable deferredCollectors) { + this.collector = BucketCollector.wrap(deferredCollectors); + } + + @Override + public boolean needsScores() { + if (collector == null) { + throw new IllegalStateException(); + } + return collector.needsScores(); + } + + @Override + public void preCollection() throws IOException { + collector.preCollection(); + } + + private void finishLeaf() { + if (context != null) { + entries.add(new Entry(context, docDeltas.build(), buckets.build())); + } + context = null; + docDeltas = null; + buckets = null; + } + + @Override + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException { + finishLeaf(); + + context = ctx; + docDeltas = PackedLongValues.packedBuilder(PackedInts.DEFAULT); + buckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT); + + return new LeafBucketCollector() { + int lastDoc = 0; + + @Override + public void collect(int doc, long bucket) { + docDeltas.add(doc - lastDoc); + buckets.add(bucket); + lastDoc = doc; + maxBucket = Math.max(maxBucket, bucket); + } + }; + } + + public void mergeBuckets(long[] mergeMap) { + + List newEntries = new ArrayList<>(entries.size()); + for (Entry sourceEntry : entries) { + PackedLongValues.Builder newBuckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT); + for (PackedLongValues.Iterator itr = sourceEntry.buckets.iterator(); itr.hasNext();) { + long bucket = itr.next(); + newBuckets.add(mergeMap[Math.toIntExact(bucket)]); + } + newEntries.add(new Entry(sourceEntry.context, sourceEntry.docDeltas, newBuckets.build())); + } + entries = newEntries; + + // if there are buckets that have been collected in the current segment + // we need to update the bucket ordinals there too + if (buckets.size() > 0) { + PackedLongValues currentBuckets = buckets.build(); + PackedLongValues.Builder newBuckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT); + for (PackedLongValues.Iterator itr = currentBuckets.iterator(); itr.hasNext();) { + long bucket = itr.next(); + newBuckets.add(mergeMap[Math.toIntExact(bucket)]); + } + buckets = newBuckets; + } + } + + @Override + public void postCollection() { + finishLeaf(); + finished = true; + } + + /** + * Replay the wrapped collector, but only on a selection of buckets. + */ + @Override + public void prepareSelectedBuckets(long... 
selectedBuckets) throws IOException { + if (finished == false) { + throw new IllegalStateException("Cannot replay yet, collection is not finished: postCollect() has not been called"); + } + if (this.selectedBuckets != null) { + throw new IllegalStateException("Already been replayed"); + } + + final LongHash hash = new LongHash(selectedBuckets.length, BigArrays.NON_RECYCLING_INSTANCE); + for (long bucket : selectedBuckets) { + hash.add(bucket); + } + this.selectedBuckets = hash; + + boolean needsScores = collector.needsScores(); + Weight weight = null; + if (needsScores) { + weight = searchContext.searcher().createNormalizedWeight(searchContext.query(), true); + } + for (Entry entry : entries) { + final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.context); + DocIdSetIterator docIt = null; + if (needsScores && entry.docDeltas.size() > 0) { + Scorer scorer = weight.scorer(entry.context); + // We don't need to check if the scorer is null + // since we are sure that there are documents to replay + // (entry.docDeltas it not empty). + docIt = scorer.iterator(); + leafCollector.setScorer(scorer); + } + final PackedLongValues.Iterator docDeltaIterator = entry.docDeltas.iterator(); + final PackedLongValues.Iterator buckets = entry.buckets.iterator(); + int doc = 0; + for (long i = 0, end = entry.docDeltas.size(); i < end; ++i) { + doc += docDeltaIterator.next(); + final long bucket = buckets.next(); + final long rebasedBucket = hash.find(bucket); + if (rebasedBucket != -1) { + if (needsScores) { + if (docIt.docID() < doc) { + docIt.advance(doc); + } + // aggregations should only be replayed on matching + // documents + assert docIt.docID() == doc; + } + leafCollector.collect(doc, rebasedBucket); + } + } + } + + collector.postCollection(); + } + + /** + * Wrap the provided aggregator so that it behaves (almost) as if it had + * been collected directly. + */ + @Override + public Aggregator wrap(final Aggregator in) { + + return new WrappedAggregator(in) { + + @Override + public InternalAggregation buildAggregation(long bucket) throws IOException { + if (selectedBuckets == null) { + throw new IllegalStateException("Collection has not been replayed yet."); + } + final long rebasedBucket = selectedBuckets.find(bucket); + if (rebasedBucket == -1) { + throw new IllegalStateException("Cannot build for a bucket which has not been collected [" + bucket + "]"); + } + return in.buildAggregation(rebasedBucket); + } + + }; + } + + private static class Entry { + final LeafReaderContext context; + final PackedLongValues docDeltas; + final PackedLongValues buckets; + + Entry(LeafReaderContext context, PackedLongValues docDeltas, PackedLongValues buckets) { + this.context = context; + this.docDeltas = docDeltas; + this.buckets = buckets; + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregationBuilder.java new file mode 100644 index 00000000000..366060835d8 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregationBuilder.java @@ -0,0 +1,218 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.histogram; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.rounding.DateTimeUnit; +import org.elasticsearch.common.rounding.Rounding; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregatorFactories.Builder; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.MultiBucketConsumerService; +import org.elasticsearch.search.aggregations.support.ValueType; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.aggregations.support.ValuesSourceParserHelper; +import org.elasticsearch.search.aggregations.support.ValuesSourceType; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.Objects; + +public class AutoDateHistogramAggregationBuilder + extends ValuesSourceAggregationBuilder { + + public static final String NAME = "auto_date_histogram"; + + public static final ParseField NUM_BUCKETS_FIELD = new ParseField("buckets"); + + private static final ObjectParser PARSER; + static { + PARSER = new ObjectParser<>(AutoDateHistogramAggregationBuilder.NAME); + ValuesSourceParserHelper.declareNumericFields(PARSER, true, true, true); + + PARSER.declareInt(AutoDateHistogramAggregationBuilder::setNumBuckets, NUM_BUCKETS_FIELD); + } + + public static AutoDateHistogramAggregationBuilder parse(String aggregationName, XContentParser parser) throws IOException { + return PARSER.parse(parser, new AutoDateHistogramAggregationBuilder(aggregationName), null); + } + + private int numBuckets = 10; + + /** Create a new builder with the given name. */ + public AutoDateHistogramAggregationBuilder(String name) { + super(name, ValuesSourceType.NUMERIC, ValueType.DATE); + } + + /** Read from a stream, for internal use only. 
*/ + public AutoDateHistogramAggregationBuilder(StreamInput in) throws IOException { + super(in, ValuesSourceType.NUMERIC, ValueType.DATE); + numBuckets = in.readVInt(); + } + + protected AutoDateHistogramAggregationBuilder(AutoDateHistogramAggregationBuilder clone, Builder factoriesBuilder, + Map metaData) { + super(clone, factoriesBuilder, metaData); + this.numBuckets = clone.numBuckets; + } + + @Override + protected AggregationBuilder shallowCopy(Builder factoriesBuilder, Map metaData) { + return new AutoDateHistogramAggregationBuilder(this, factoriesBuilder, metaData); + } + + @Override + protected void innerWriteTo(StreamOutput out) throws IOException { + out.writeVInt(numBuckets); + } + + @Override + public String getType() { + return NAME; + } + + public AutoDateHistogramAggregationBuilder setNumBuckets(int numBuckets) { + if (numBuckets <= 0) { + throw new IllegalArgumentException(NUM_BUCKETS_FIELD.getPreferredName() + " must be greater than 0 for [" + name + "]"); + } + this.numBuckets = numBuckets; + return this; + } + + public int getNumBuckets() { + return numBuckets; + } + + @Override + protected ValuesSourceAggregatorFactory innerBuild(SearchContext context, ValuesSourceConfig config, + AggregatorFactory parent, Builder subFactoriesBuilder) throws IOException { + RoundingInfo[] roundings = new RoundingInfo[6]; + roundings[0] = new RoundingInfo(createRounding(DateTimeUnit.SECOND_OF_MINUTE), 1000L, 1, 5, 10, 30); + roundings[1] = new RoundingInfo(createRounding(DateTimeUnit.MINUTES_OF_HOUR), 60 * 1000L, 1, 5, 10, 30); + roundings[2] = new RoundingInfo(createRounding(DateTimeUnit.HOUR_OF_DAY), 60 * 60 * 1000L, 1, 3, 12); + roundings[3] = new RoundingInfo(createRounding(DateTimeUnit.DAY_OF_MONTH), 24 * 60 * 60 * 1000L, 1, 7); + roundings[4] = new RoundingInfo(createRounding(DateTimeUnit.MONTH_OF_YEAR), 30 * 24 * 60 * 60 * 1000L, 1, 3); + roundings[5] = new RoundingInfo(createRounding(DateTimeUnit.YEAR_OF_CENTURY), 365 * 24 * 60 * 60 * 1000L, 1, 5, 10, 20, 50, 100); + + int maxRoundingInterval = Arrays.stream(roundings,0, roundings.length-1) + .map(rounding -> rounding.innerIntervals) + .flatMapToInt(Arrays::stream) + .boxed() + .reduce(Integer::max).get(); + Settings settings = context.getQueryShardContext().getIndexSettings().getNodeSettings(); + int maxBuckets = MultiBucketConsumerService.MAX_BUCKET_SETTING.get(settings); + int bucketCeiling = maxBuckets / maxRoundingInterval; + if (numBuckets > bucketCeiling) { + throw new IllegalArgumentException(NUM_BUCKETS_FIELD.getPreferredName()+ + " must be less than " + bucketCeiling); + } + return new AutoDateHistogramAggregatorFactory(name, config, numBuckets, roundings, context, parent, subFactoriesBuilder, metaData); + } + + private Rounding createRounding(DateTimeUnit interval) { + Rounding.Builder tzRoundingBuilder = Rounding.builder(interval); + if (timeZone() != null) { + tzRoundingBuilder.timeZone(timeZone()); + } + Rounding rounding = tzRoundingBuilder.build(); + return rounding; + } + + @Override + protected XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { + builder.field(NUM_BUCKETS_FIELD.getPreferredName(), numBuckets); + return builder; + } + + @Override + protected int innerHashCode() { + return Objects.hash(numBuckets); + } + + @Override + protected boolean innerEquals(Object obj) { + AutoDateHistogramAggregationBuilder other = (AutoDateHistogramAggregationBuilder) obj; + return Objects.equals(numBuckets, other.numBuckets); + } + + public static class RoundingInfo implements 
Writeable { + final Rounding rounding; + final int[] innerIntervals; + final long roughEstimateDurationMillis; + + public RoundingInfo(Rounding rounding, long roughEstimateDurationMillis, int... innerIntervals) { + this.rounding = rounding; + this.roughEstimateDurationMillis = roughEstimateDurationMillis; + this.innerIntervals = innerIntervals; + } + + public RoundingInfo(StreamInput in) throws IOException { + rounding = Rounding.Streams.read(in); + roughEstimateDurationMillis = in.readVLong(); + innerIntervals = in.readIntArray(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + Rounding.Streams.write(rounding, out); + out.writeVLong(roughEstimateDurationMillis); + out.writeIntArray(innerIntervals); + } + + public int getMaximumInnerInterval() { + return innerIntervals[innerIntervals.length - 1]; + } + + public long getRoughEstimateDurationMillis() { + return roughEstimateDurationMillis; + } + + @Override + public int hashCode() { + return Objects.hash(rounding, Arrays.hashCode(innerIntervals)); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (obj.getClass() != getClass()) { + return false; + } + RoundingInfo other = (RoundingInfo) obj; + return Objects.equals(rounding, other.rounding) && + Objects.deepEquals(innerIntervals, other.innerIntervals); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java new file mode 100644 index 00000000000..f86145386f1 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -0,0 +1,199 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.search.aggregations.bucket.histogram; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.util.CollectionUtil; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.rounding.Rounding; +import org.elasticsearch.common.util.LongHash; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.BucketOrder; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.bucket.DeferableBucketAggregator; +import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector; +import org.elasticsearch.search.aggregations.bucket.MergingBucketsDeferringCollector; +import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder.RoundingInfo; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * An aggregator for date values. Every date is rounded down using a configured + * {@link Rounding}. + * + * @see Rounding + */ +class AutoDateHistogramAggregator extends DeferableBucketAggregator { + + private final ValuesSource.Numeric valuesSource; + private final DocValueFormat formatter; + private final RoundingInfo[] roundingInfos; + private int roundingIdx = 0; + + private LongHash bucketOrds; + private int targetBuckets; + private MergingBucketsDeferringCollector deferringCollector; + + AutoDateHistogramAggregator(String name, AggregatorFactories factories, int numBuckets, RoundingInfo[] roundingInfos, + @Nullable ValuesSource.Numeric valuesSource, DocValueFormat formatter, SearchContext aggregationContext, Aggregator parent, + List pipelineAggregators, Map metaData) throws IOException { + + super(name, factories, aggregationContext, parent, pipelineAggregators, metaData); + this.targetBuckets = numBuckets; + this.valuesSource = valuesSource; + this.formatter = formatter; + this.roundingInfos = roundingInfos; + + bucketOrds = new LongHash(1, aggregationContext.bigArrays()); + + } + + @Override + public boolean needsScores() { + return (valuesSource != null && valuesSource.needsScores()) || super.needsScores(); + } + + @Override + protected boolean shouldDefer(Aggregator aggregator) { + return true; + } + + @Override + public DeferringBucketCollector getDeferringCollector() { + deferringCollector = new MergingBucketsDeferringCollector(context); + return deferringCollector; + } + + @Override + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, + final LeafBucketCollector sub) throws IOException { + if (valuesSource == null) { + return LeafBucketCollector.NO_OP_COLLECTOR; + } + final SortedNumericDocValues values = valuesSource.longValues(ctx); + return new LeafBucketCollectorBase(sub, values) { + @Override + public void collect(int doc, long bucket) throws IOException { + assert bucket == 0; + if (values.advanceExact(doc)) { + final 
int valuesCount = values.docValueCount(); + + long previousRounded = Long.MIN_VALUE; + for (int i = 0; i < valuesCount; ++i) { + long value = values.nextValue(); + long rounded = roundingInfos[roundingIdx].rounding.round(value); + assert rounded >= previousRounded; + if (rounded == previousRounded) { + continue; + } + long bucketOrd = bucketOrds.add(rounded); + if (bucketOrd < 0) { // already seen + bucketOrd = -1 - bucketOrd; + collectExistingBucket(sub, doc, bucketOrd); + } else { + collectBucket(sub, doc, bucketOrd); + while (roundingIdx < roundingInfos.length - 1 + && bucketOrds.size() > (targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval())) { + increaseRounding(); + } + } + previousRounded = rounded; + } + } + } + + private void increaseRounding() { + try (LongHash oldBucketOrds = bucketOrds) { + LongHash newBucketOrds = new LongHash(1, context.bigArrays()); + long[] mergeMap = new long[(int) oldBucketOrds.size()]; + Rounding newRounding = roundingInfos[++roundingIdx].rounding; + for (int i = 0; i < oldBucketOrds.size(); i++) { + long oldKey = oldBucketOrds.get(i); + long newKey = newRounding.round(oldKey); + long newBucketOrd = newBucketOrds.add(newKey); + if (newBucketOrd >= 0) { + mergeMap[i] = newBucketOrd; + } else { + mergeMap[i] = -1 - newBucketOrd; + } + } + mergeBuckets(mergeMap, newBucketOrds.size()); + if (deferringCollector != null) { + deferringCollector.mergeBuckets(mergeMap); + } + bucketOrds = newBucketOrds; + } + } + }; + } + + @Override + public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException { + assert owningBucketOrdinal == 0; + consumeBucketsAndMaybeBreak((int) bucketOrds.size()); + + long[] bucketOrdArray = new long[(int) bucketOrds.size()]; + for (int i = 0; i < bucketOrds.size(); i++) { + bucketOrdArray[i] = i; + } + + runDeferredCollections(bucketOrdArray); + + List buckets = new ArrayList<>((int) bucketOrds.size()); + for (long i = 0; i < bucketOrds.size(); i++) { + buckets.add(new InternalAutoDateHistogram.Bucket(bucketOrds.get(i), bucketDocCount(i), formatter, bucketAggregations(i))); + } + + // the contract of the histogram aggregation is that shards must return + // buckets ordered by key in ascending order + CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator(this)); + + // value source will be null for unmapped fields + InternalAutoDateHistogram.BucketInfo emptyBucketInfo = new InternalAutoDateHistogram.BucketInfo(roundingInfos, roundingIdx, + buildEmptySubAggregations()); + + return new InternalAutoDateHistogram(name, buckets, targetBuckets, emptyBucketInfo, formatter, pipelineAggregators(), metaData()); + } + + @Override + public InternalAggregation buildEmptyAggregation() { + InternalAutoDateHistogram.BucketInfo emptyBucketInfo = new InternalAutoDateHistogram.BucketInfo(roundingInfos, roundingIdx, + buildEmptySubAggregations()); + return new InternalAutoDateHistogram(name, Collections.emptyList(), targetBuckets, emptyBucketInfo, formatter, + pipelineAggregators(), metaData()); + } + + @Override + public void doClose() { + Releasables.close(bucketOrds); + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorFactory.java new file mode 100644 index 00000000000..051f2f9f6e7 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorFactory.java 
@@ -0,0 +1,72 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.histogram; + +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder.RoundingInfo; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public final class AutoDateHistogramAggregatorFactory + extends ValuesSourceAggregatorFactory { + + private final int numBuckets; + private RoundingInfo[] roundingInfos; + + public AutoDateHistogramAggregatorFactory(String name, ValuesSourceConfig config, int numBuckets, RoundingInfo[] roundingInfos, + SearchContext context, AggregatorFactory parent, AggregatorFactories.Builder subFactoriesBuilder, + Map metaData) throws IOException { + super(name, config, context, parent, subFactoriesBuilder, metaData); + this.numBuckets = numBuckets; + this.roundingInfos = roundingInfos; + } + + @Override + protected Aggregator doCreateInternal(ValuesSource.Numeric valuesSource, Aggregator parent, boolean collectsFromSingleBucket, + List pipelineAggregators, Map metaData) throws IOException { + if (collectsFromSingleBucket == false) { + return asMultiBucketAggregator(this, context, parent); + } + return createAggregator(valuesSource, parent, pipelineAggregators, metaData); + } + + private Aggregator createAggregator(ValuesSource.Numeric valuesSource, Aggregator parent, List pipelineAggregators, + Map metaData) throws IOException { + return new AutoDateHistogramAggregator(name, factories, numBuckets, roundingInfos, valuesSource, config.format(), context, parent, + pipelineAggregators, + metaData); + } + + @Override + protected Aggregator createUnmapped(Aggregator parent, List pipelineAggregators, Map metaData) + throws IOException { + return createAggregator(null, parent, pipelineAggregators, metaData); + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index e0b64d2cd5b..8b1f0c46421 100644 --- 
a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -28,13 +28,13 @@ import org.elasticsearch.common.util.LongHash; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalOrder; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.aggregations.BucketOrder; -import org.elasticsearch.search.aggregations.InternalOrder; import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.internal.SearchContext; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java new file mode 100644 index 00000000000..27c195cbdae --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java @@ -0,0 +1,601 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.search.aggregations.bucket.histogram; + +import org.apache.lucene.util.PriorityQueue; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.rounding.Rounding; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; +import org.elasticsearch.search.aggregations.KeyComparable; +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; +import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder.RoundingInfo; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Objects; + +/** + * Implementation of {@link Histogram}. + */ +public final class InternalAutoDateHistogram extends + InternalMultiBucketAggregation implements Histogram, HistogramFactory { + + public static class Bucket extends InternalMultiBucketAggregation.InternalBucket implements Histogram.Bucket, KeyComparable { + + final long key; + final long docCount; + final InternalAggregations aggregations; + protected final transient DocValueFormat format; + + public Bucket(long key, long docCount, DocValueFormat format, + InternalAggregations aggregations) { + this.format = format; + this.key = key; + this.docCount = docCount; + this.aggregations = aggregations; + } + + /** + * Read from a stream. 
+ */ + public Bucket(StreamInput in, DocValueFormat format) throws IOException { + this.format = format; + key = in.readLong(); + docCount = in.readVLong(); + aggregations = InternalAggregations.readAggregations(in); + } + + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != InternalAutoDateHistogram.Bucket.class) { + return false; + } + InternalAutoDateHistogram.Bucket that = (InternalAutoDateHistogram.Bucket) obj; + // No need to take the keyed and format parameters into account, + // they are already stored and tested on the InternalDateHistogram object + return key == that.key + && docCount == that.docCount + && Objects.equals(aggregations, that.aggregations); + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), key, docCount, aggregations); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(key); + out.writeVLong(docCount); + aggregations.writeTo(out); + } + + @Override + public String getKeyAsString() { + return format.format(key).toString(); + } + + @Override + public Object getKey() { + return new DateTime(key, DateTimeZone.UTC); + } + + @Override + public long getDocCount() { + return docCount; + } + + @Override + public Aggregations getAggregations() { + return aggregations; + } + + Bucket reduce(List buckets, Rounding rounding, ReduceContext context) { + List aggregations = new ArrayList<>(buckets.size()); + long docCount = 0; + for (Bucket bucket : buckets) { + docCount += bucket.docCount; + aggregations.add((InternalAggregations) bucket.getAggregations()); + } + InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); + return new InternalAutoDateHistogram.Bucket(rounding.round(key), docCount, format, aggs); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + String keyAsString = format.format(key).toString(); + builder.startObject(); + if (format != DocValueFormat.RAW) { + builder.field(CommonFields.KEY_AS_STRING.getPreferredName(), keyAsString); + } + builder.field(CommonFields.KEY.getPreferredName(), key); + builder.field(CommonFields.DOC_COUNT.getPreferredName(), docCount); + aggregations.toXContentInternal(builder, params); + builder.endObject(); + return builder; + } + + @Override + public int compareKey(Bucket other) { + return Long.compare(key, other.key); + } + + public DocValueFormat getFormatter() { + return format; + } + } + + static class BucketInfo { + + final RoundingInfo[] roundingInfos; + final int roundingIdx; + final InternalAggregations emptySubAggregations; + + BucketInfo(RoundingInfo[] roundings, int roundingIdx, InternalAggregations subAggregations) { + this.roundingInfos = roundings; + this.roundingIdx = roundingIdx; + this.emptySubAggregations = subAggregations; + } + + BucketInfo(StreamInput in) throws IOException { + int size = in.readVInt(); + roundingInfos = new RoundingInfo[size]; + for (int i = 0; i < size; i++) { + roundingInfos[i] = new RoundingInfo(in); + } + roundingIdx = in.readVInt(); + emptySubAggregations = InternalAggregations.readAggregations(in); + } + + void writeTo(StreamOutput out) throws IOException { + out.writeVInt(roundingInfos.length); + for (RoundingInfo roundingInfo : roundingInfos) { + roundingInfo.writeTo(out); + } + out.writeVInt(roundingIdx); + emptySubAggregations.writeTo(out); + } + + @Override + public boolean equals(Object obj) { + if (obj == null || getClass() != obj.getClass()) { + return false; + } + BucketInfo that = 
(BucketInfo) obj; + return Objects.deepEquals(roundingInfos, that.roundingInfos) + && Objects.equals(roundingIdx, that.roundingIdx) + && Objects.equals(emptySubAggregations, that.emptySubAggregations); + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), Arrays.hashCode(roundingInfos), roundingIdx, emptySubAggregations); + } + } + + private final List buckets; + private final DocValueFormat format; + private final BucketInfo bucketInfo; + private final int targetBuckets; + + + InternalAutoDateHistogram(String name, List buckets, int targetBuckets, BucketInfo emptyBucketInfo, DocValueFormat formatter, + List pipelineAggregators, Map metaData) { + super(name, pipelineAggregators, metaData); + this.buckets = buckets; + this.bucketInfo = emptyBucketInfo; + this.format = formatter; + this.targetBuckets = targetBuckets; + } + + /** + * Stream from a stream. + */ + public InternalAutoDateHistogram(StreamInput in) throws IOException { + super(in); + bucketInfo = new BucketInfo(in); + format = in.readNamedWriteable(DocValueFormat.class); + buckets = in.readList(stream -> new Bucket(stream, format)); + this.targetBuckets = in.readVInt(); + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + bucketInfo.writeTo(out); + out.writeNamedWriteable(format); + out.writeList(buckets); + out.writeVInt(targetBuckets); + } + + @Override + public String getWriteableName() { + return AutoDateHistogramAggregationBuilder.NAME; + } + + @Override + public List getBuckets() { + return Collections.unmodifiableList(buckets); + } + + DocValueFormat getFormatter() { + return format; + } + + public int getTargetBuckets() { + return targetBuckets; + } + + public BucketInfo getBucketInfo() { + return bucketInfo; + } + + @Override + public InternalAutoDateHistogram create(List buckets) { + return new InternalAutoDateHistogram(name, buckets, targetBuckets, bucketInfo, format, pipelineAggregators(), metaData); + } + + @Override + public Bucket createBucket(InternalAggregations aggregations, Bucket prototype) { + return new Bucket(prototype.key, prototype.docCount, prototype.format, aggregations); + } + + private static class IteratorAndCurrent { + + private final Iterator iterator; + private Bucket current; + + IteratorAndCurrent(Iterator iterator) { + this.iterator = iterator; + current = iterator.next(); + } + + } + + /** + * This method works almost exactly the same as + * InternalDateHistogram#reduceBuckets(List, ReduceContext), the different + * here is that we need to round all the keys we see using the highest level + * rounding returned across all the shards so the resolution of the buckets + * is the same and they can be reduced together. 
+ */ + private BucketReduceResult reduceBuckets(List aggregations, ReduceContext reduceContext) { + + // First we need to find the highest level rounding used across all the + // shards + int reduceRoundingIdx = 0; + for (InternalAggregation aggregation : aggregations) { + int aggRoundingIdx = ((InternalAutoDateHistogram) aggregation).bucketInfo.roundingIdx; + if (aggRoundingIdx > reduceRoundingIdx) { + reduceRoundingIdx = aggRoundingIdx; + } + } + // This rounding will be used to reduce all the buckets + RoundingInfo reduceRoundingInfo = bucketInfo.roundingInfos[reduceRoundingIdx]; + Rounding reduceRounding = reduceRoundingInfo.rounding; + + final PriorityQueue pq = new PriorityQueue(aggregations.size()) { + @Override + protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { + return a.current.key < b.current.key; + } + }; + for (InternalAggregation aggregation : aggregations) { + InternalAutoDateHistogram histogram = (InternalAutoDateHistogram) aggregation; + if (histogram.buckets.isEmpty() == false) { + pq.add(new IteratorAndCurrent(histogram.buckets.iterator())); + } + } + + List reducedBuckets = new ArrayList<>(); + if (pq.size() > 0) { + // list of buckets coming from different shards that have the same key + List currentBuckets = new ArrayList<>(); + double key = reduceRounding.round(pq.top().current.key); + + do { + final IteratorAndCurrent top = pq.top(); + + if (reduceRounding.round(top.current.key) != key) { + // the key changes, reduce what we already buffered and reset the buffer for current buckets + final Bucket reduced = currentBuckets.get(0).reduce(currentBuckets, reduceRounding, reduceContext); + reduceContext.consumeBucketsAndMaybeBreak(1); + reducedBuckets.add(reduced); + currentBuckets.clear(); + key = reduceRounding.round(top.current.key); + } + + currentBuckets.add(top.current); + + if (top.iterator.hasNext()) { + final Bucket next = top.iterator.next(); + assert next.key > top.current.key : "shards must return data sorted by key"; + top.current = next; + pq.updateTop(); + } else { + pq.pop(); + } + } while (pq.size() > 0); + + if (currentBuckets.isEmpty() == false) { + final Bucket reduced = currentBuckets.get(0).reduce(currentBuckets, reduceRounding, reduceContext); + reduceContext.consumeBucketsAndMaybeBreak(1); + reducedBuckets.add(reduced); + } + } + + return mergeBucketsIfNeeded(reducedBuckets, reduceRoundingIdx, reduceRoundingInfo, reduceContext); + } + + private BucketReduceResult mergeBucketsIfNeeded(List reducedBuckets, int reduceRoundingIdx, RoundingInfo reduceRoundingInfo, + ReduceContext reduceContext) { + while (reducedBuckets.size() > (targetBuckets * reduceRoundingInfo.getMaximumInnerInterval()) + && reduceRoundingIdx < bucketInfo.roundingInfos.length - 1) { + reduceRoundingIdx++; + reduceRoundingInfo = bucketInfo.roundingInfos[reduceRoundingIdx]; + reducedBuckets = mergeBuckets(reducedBuckets, reduceRoundingInfo.rounding, reduceContext); + } + return new BucketReduceResult(reducedBuckets, reduceRoundingInfo, reduceRoundingIdx); + } + + private List mergeBuckets(List reducedBuckets, Rounding reduceRounding, ReduceContext reduceContext) { + List mergedBuckets = new ArrayList<>(); + + List sameKeyedBuckets = new ArrayList<>(); + double key = Double.NaN; + for (Bucket bucket : reducedBuckets) { + long roundedBucketKey = reduceRounding.round(bucket.key); + if (Double.isNaN(key)) { + key = roundedBucketKey; + reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1); + sameKeyedBuckets.add(createBucket(key, bucket.docCount, 
bucket.aggregations)); + } else if (roundedBucketKey == key) { + reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1); + sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations)); + } else { + reduceContext.consumeBucketsAndMaybeBreak(1); + mergedBuckets.add(sameKeyedBuckets.get(0).reduce(sameKeyedBuckets, reduceRounding, reduceContext)); + sameKeyedBuckets.clear(); + key = roundedBucketKey; + reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1); + sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations)); + } + } + if (sameKeyedBuckets.isEmpty() == false) { + reduceContext.consumeBucketsAndMaybeBreak(1); + mergedBuckets.add(sameKeyedBuckets.get(0).reduce(sameKeyedBuckets, reduceRounding, reduceContext)); + } + reducedBuckets = mergedBuckets; + return reducedBuckets; + } + + private static class BucketReduceResult { + List buckets; + RoundingInfo roundingInfo; + int roundingIdx; + + BucketReduceResult(List buckets, RoundingInfo roundingInfo, int roundingIdx) { + this.buckets = buckets; + this.roundingInfo = roundingInfo; + this.roundingIdx = roundingIdx; + + } + } + + private BucketReduceResult addEmptyBuckets(BucketReduceResult currentResult, ReduceContext reduceContext) { + List list = currentResult.buckets; + if (list.isEmpty()) { + return currentResult; + } + int roundingIdx = getAppropriateRounding(list.get(0).key, list.get(list.size() - 1).key, currentResult.roundingIdx, + bucketInfo.roundingInfos); + RoundingInfo roundingInfo = bucketInfo.roundingInfos[roundingIdx]; + Rounding rounding = roundingInfo.rounding; + // merge buckets using the new rounding + list = mergeBuckets(list, rounding, reduceContext); + + Bucket lastBucket = null; + ListIterator iter = list.listIterator(); + InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(Collections.singletonList(bucketInfo.emptySubAggregations), + reduceContext); + + // Add the empty buckets within the data, + // e.g. 
if the data series is [1,2,3,7] there're 3 empty buckets that will be created for 4,5,6 + while (iter.hasNext()) { + Bucket nextBucket = list.get(iter.nextIndex()); + if (lastBucket != null) { + long key = rounding.nextRoundingValue(lastBucket.key); + while (key < nextBucket.key) { + reduceContext.consumeBucketsAndMaybeBreak(1); + iter.add(new InternalAutoDateHistogram.Bucket(key, 0, format, reducedEmptySubAggs)); + key = rounding.nextRoundingValue(key); + } + assert key == nextBucket.key : "key: " + key + ", nextBucket.key: " + nextBucket.key; + } + lastBucket = iter.next(); + } + return new BucketReduceResult(list, roundingInfo, roundingIdx); + } + + private int getAppropriateRounding(long minKey, long maxKey, int roundingIdx, RoundingInfo[] roundings) { + if (roundingIdx == roundings.length - 1) { + return roundingIdx; + } + int currentRoundingIdx = roundingIdx; + + // Getting the accurate number of required buckets can be slow for large + // ranges at low roundings so get a rough estimate of the rounding first + // so we are at most 1 away from the correct rounding and then get the + // accurate rounding value + for (int i = currentRoundingIdx + 1; i < roundings.length; i++) { + long dataDuration = maxKey - minKey; + long roughEstimateRequiredBuckets = dataDuration / roundings[i].getRoughEstimateDurationMillis(); + if (roughEstimateRequiredBuckets < targetBuckets * roundings[i].getMaximumInnerInterval()) { + currentRoundingIdx = i - 1; + break; + } else if (i == roundingIdx - 1) { + currentRoundingIdx = i; + break; + } + } + + int requiredBuckets = 0; + do { + Rounding currentRounding = roundings[currentRoundingIdx].rounding; + long currentKey = minKey; + requiredBuckets = 0; + while (currentKey < maxKey) { + requiredBuckets++; + currentKey = currentRounding.nextRoundingValue(currentKey); + } + currentRoundingIdx++; + } while (requiredBuckets > (targetBuckets * roundings[roundingIdx].getMaximumInnerInterval()) + && currentRoundingIdx < roundings.length); + // The loop will increase past the correct rounding index here so we + // need to subtract one to get the rounding index we need + return currentRoundingIdx - 1; + } + + @Override + public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + BucketReduceResult reducedBucketsResult = reduceBuckets(aggregations, reduceContext); + + if (reduceContext.isFinalReduce()) { + // adding empty buckets if needed + reducedBucketsResult = addEmptyBuckets(reducedBucketsResult, reduceContext); + + // Adding empty buckets may have tipped us over the target so merge the buckets again if needed + reducedBucketsResult = mergeBucketsIfNeeded(reducedBucketsResult.buckets, reducedBucketsResult.roundingIdx, + reducedBucketsResult.roundingInfo, reduceContext); + + // Now finally see if we need to merge consecutive buckets together to make a coarser interval at the same rounding + reducedBucketsResult = maybeMergeConsecutiveBuckets(reducedBucketsResult, reduceContext); + } + + BucketInfo bucketInfo = new BucketInfo(this.bucketInfo.roundingInfos, reducedBucketsResult.roundingIdx, + this.bucketInfo.emptySubAggregations); + + return new InternalAutoDateHistogram(getName(), reducedBucketsResult.buckets, targetBuckets, bucketInfo, format, + pipelineAggregators(), getMetaData()); + } + + private BucketReduceResult maybeMergeConsecutiveBuckets(BucketReduceResult reducedBucketsResult, ReduceContext reduceContext) { + List buckets = reducedBucketsResult.buckets; + RoundingInfo roundingInfo = reducedBucketsResult.roundingInfo; + int 
roundingIdx = reducedBucketsResult.roundingIdx; + if (buckets.size() > targetBuckets) { + for (int interval : roundingInfo.innerIntervals) { + int resultingBuckets = buckets.size() / interval; + if (resultingBuckets <= targetBuckets) { + return mergeConsecutiveBuckets(buckets, interval, roundingIdx, roundingInfo, reduceContext); + } + } + } + return reducedBucketsResult; + } + + private BucketReduceResult mergeConsecutiveBuckets(List reducedBuckets, int mergeInterval, int roundingIdx, + RoundingInfo roundingInfo, ReduceContext reduceContext) { + List mergedBuckets = new ArrayList<>(); + List sameKeyedBuckets = new ArrayList<>(); + + double key = roundingInfo.rounding.round(reducedBuckets.get(0).key); + for (int i = 0; i < reducedBuckets.size(); i++) { + Bucket bucket = reducedBuckets.get(i); + if (i % mergeInterval == 0 && sameKeyedBuckets.isEmpty() == false) { + reduceContext.consumeBucketsAndMaybeBreak(1); + mergedBuckets.add(sameKeyedBuckets.get(0).reduce(sameKeyedBuckets, roundingInfo.rounding, reduceContext)); + sameKeyedBuckets.clear(); + key = roundingInfo.rounding.round(bucket.key); + } + reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1); + sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations)); + } + if (sameKeyedBuckets.isEmpty() == false) { + reduceContext.consumeBucketsAndMaybeBreak(1); + mergedBuckets.add(sameKeyedBuckets.get(0).reduce(sameKeyedBuckets, roundingInfo.rounding, reduceContext)); + } + return new BucketReduceResult(mergedBuckets, roundingInfo, roundingIdx); + } + + @Override + public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { + builder.startArray(CommonFields.BUCKETS.getPreferredName()); + for (Bucket bucket : buckets) { + bucket.toXContent(builder, params); + } + builder.endArray(); + return builder; + } + + // HistogramFactory method impls + + @Override + public Number getKey(MultiBucketsAggregation.Bucket bucket) { + return ((Bucket) bucket).key; + } + + @Override + public Number nextKey(Number key) { + return bucketInfo.roundingInfos[bucketInfo.roundingIdx].rounding.nextRoundingValue(key.longValue()); + } + + @Override + public InternalAggregation createAggregation(List buckets) { + // convert buckets to the right type + List buckets2 = new ArrayList<>(buckets.size()); + for (Object b : buckets) { + buckets2.add((Bucket) b); + } + buckets2 = Collections.unmodifiableList(buckets2); + return new InternalAutoDateHistogram(name, buckets2, targetBuckets, bucketInfo, format, pipelineAggregators(), getMetaData()); + } + + @Override + public Bucket createBucket(Number key, long docCount, InternalAggregations aggregations) { + return new Bucket(key.longValue(), docCount, format, aggregations); + } + + @Override + protected boolean doEquals(Object obj) { + InternalAutoDateHistogram that = (InternalAutoDateHistogram) obj; + return Objects.equals(buckets, that.buckets) + && Objects.equals(format, that.format) + && Objects.equals(bucketInfo, that.bucketInfo); + } + + @Override + protected int doHashCode() { + return Objects.hash(buckets, format, bucketInfo); + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java index 84dec2c983e..669bda5574d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java +++ 
b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java @@ -424,7 +424,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation< iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs)); key = nextKey(key).longValue(); } - assert key == nextBucket.key; + assert key == nextBucket.key : "key: " + key + ", nextBucket.key: " + nextBucket.key; } lastBucket = iter.next(); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/ParsedAutoDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/ParsedAutoDateHistogram.java new file mode 100644 index 00000000000..caca44f9f2e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/ParsedAutoDateHistogram.java @@ -0,0 +1,91 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.histogram; + +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; + +import java.io.IOException; +import java.util.List; + +public class ParsedAutoDateHistogram extends ParsedMultiBucketAggregation implements Histogram { + + @Override + public String getType() { + return AutoDateHistogramAggregationBuilder.NAME; + } + + @Override + public List getBuckets() { + return buckets; + } + + private static ObjectParser PARSER = + new ObjectParser<>(ParsedAutoDateHistogram.class.getSimpleName(), true, ParsedAutoDateHistogram::new); + static { + declareMultiBucketAggregationFields(PARSER, + parser -> ParsedBucket.fromXContent(parser, false), + parser -> ParsedBucket.fromXContent(parser, true)); + } + + public static ParsedAutoDateHistogram fromXContent(XContentParser parser, String name) throws IOException { + ParsedAutoDateHistogram aggregation = PARSER.parse(parser, null); + aggregation.setName(name); + return aggregation; + } + + public static class ParsedBucket extends ParsedMultiBucketAggregation.ParsedBucket implements Histogram.Bucket { + + private Long key; + + @Override + public Object getKey() { + if (key != null) { + return new DateTime(key, DateTimeZone.UTC); + } + return null; + } + + @Override + public String getKeyAsString() { + String keyAsString = super.getKeyAsString(); + if (keyAsString != null) { + return keyAsString; + } + if (key != null) { + return Long.toString(key); + } + return null; + } + + @Override + protected XContentBuilder keyToXContent(XContentBuilder builder) throws IOException { + 
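+            // write the bucket key as its raw epoch-millis value; key_as_string, when present, is emitted by the parent class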
return builder.field(CommonFields.KEY.getPreferredName(), key); + } + + static ParsedBucket fromXContent(XContentParser parser, boolean keyed) throws IOException { + return parseXContent(parser, keyed, ParsedBucket::new, (p, bucket) -> bucket.key = p.longValue()); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/AggregationsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/AggregationsTests.java index 79984f58949..fcafce3936e 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/AggregationsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/AggregationsTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.search.aggregations.bucket.filter.InternalFilterTests; import org.elasticsearch.search.aggregations.bucket.filter.InternalFiltersTests; import org.elasticsearch.search.aggregations.bucket.geogrid.InternalGeoHashGridTests; import org.elasticsearch.search.aggregations.bucket.global.InternalGlobalTests; +import org.elasticsearch.search.aggregations.bucket.histogram.InternalAutoDateHistogramTests; import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogramTests; import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogramTests; import org.elasticsearch.search.aggregations.bucket.missing.InternalMissingTests; @@ -125,6 +126,7 @@ public class AggregationsTests extends ESTestCase { aggsTests.add(new InternalGeoCentroidTests()); aggsTests.add(new InternalHistogramTests()); aggsTests.add(new InternalDateHistogramTests()); + aggsTests.add(new InternalAutoDateHistogramTests()); aggsTests.add(new LongTermsTests()); aggsTests.add(new DoubleTermsTests()); aggsTests.add(new StringTermsTests()); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/AutoDateHistogramTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/AutoDateHistogramTests.java new file mode 100644 index 00000000000..3a10edf1833 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/AutoDateHistogramTests.java @@ -0,0 +1,44 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.bucket; + +import org.elasticsearch.search.aggregations.BaseAggregationTestCase; +import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder; + +public class AutoDateHistogramTests extends BaseAggregationTestCase { + + @Override + protected AutoDateHistogramAggregationBuilder createTestAggregatorBuilder() { + AutoDateHistogramAggregationBuilder builder = new AutoDateHistogramAggregationBuilder(randomAlphaOfLengthBetween(1, 10)); + builder.field(INT_FIELD_NAME); + builder.setNumBuckets(randomIntBetween(1, 100000)); + if (randomBoolean()) { + builder.format("###.##"); + } + if (randomBoolean()) { + builder.missing(randomIntBetween(0, 10)); + } + if (randomBoolean()) { + builder.timeZone(randomDateTimeZone()); + } + return builder; + } + +} diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java new file mode 100644 index 00000000000..7cf29e3aa9c --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java @@ -0,0 +1,1332 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.bucket.histogram; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.MatchNoDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.store.Directory; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.search.aggregations.MultiBucketConsumerService; +import org.elasticsearch.search.aggregations.metrics.stats.Stats; +import org.hamcrest.Matchers; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.chrono.ISOChronology; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; + +import static org.hamcrest.Matchers.containsString; + +public class AutoDateHistogramAggregatorTests extends AggregatorTestCase { + + private static final String DATE_FIELD = "date"; + private static final String INSTANT_FIELD = "instant"; + + private static final List dataset = Arrays.asList( + "2010-03-12T01:07:45", + "2010-04-27T03:43:34", + "2012-05-18T04:11:00", + "2013-05-29T05:11:31", + "2013-10-31T08:24:05", + "2015-02-13T13:09:32", + "2015-06-24T13:47:43", + "2015-11-13T16:14:34", + "2016-03-04T17:09:50", + "2017-12-12T22:55:46"); + + public void testMatchNoDocs() throws IOException { + testBothCases(new MatchNoDocsQuery(), dataset, + aggregation -> aggregation.setNumBuckets(10).field(DATE_FIELD), + histogram -> assertEquals(0, histogram.getBuckets().size()) + ); + } + + public void testMatchAllDocs() throws IOException { + Query query = new MatchAllDocsQuery(); + + testSearchCase(query, dataset, + aggregation -> aggregation.setNumBuckets(6).field(DATE_FIELD), + histogram -> assertEquals(10, histogram.getBuckets().size()) + ); + testSearchAndReduceCase(query, dataset, + aggregation -> aggregation.setNumBuckets(8).field(DATE_FIELD), + histogram -> assertEquals(8, histogram.getBuckets().size()) + ); + } + + public void testSubAggregations() throws IOException { + Query query = new MatchAllDocsQuery(); + testSearchAndReduceCase(query, dataset, + aggregation -> aggregation.setNumBuckets(8).field(DATE_FIELD) + .subAggregation(AggregationBuilders.stats("stats").field(DATE_FIELD)), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(8, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2010-01-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + Stats stats = bucket.getAggregations().get("stats"); + assertEquals("2010-03-12T01:07:45.000Z", stats.getMinAsString()); + assertEquals("2010-04-27T03:43:34.000Z", stats.getMaxAsString()); + assertEquals(2L, stats.getCount()); + + bucket = 
buckets.get(1); + assertEquals("2011-01-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + stats = bucket.getAggregations().get("stats"); + assertTrue(Double.isInfinite(stats.getMin())); + assertTrue(Double.isInfinite(stats.getMax())); + assertEquals(0L, stats.getCount()); + + bucket = buckets.get(2); + assertEquals("2012-01-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + stats = bucket.getAggregations().get("stats"); + assertEquals("2012-05-18T04:11:00.000Z", stats.getMinAsString()); + assertEquals("2012-05-18T04:11:00.000Z", stats.getMaxAsString()); + assertEquals(1L, stats.getCount()); + + bucket = buckets.get(3); + assertEquals("2013-01-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + stats = bucket.getAggregations().get("stats"); + assertEquals("2013-05-29T05:11:31.000Z", stats.getMinAsString()); + assertEquals("2013-10-31T08:24:05.000Z", stats.getMaxAsString()); + assertEquals(2L, stats.getCount()); + + bucket = buckets.get(4); + assertEquals("2014-01-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + stats = bucket.getAggregations().get("stats"); + assertTrue(Double.isInfinite(stats.getMin())); + assertTrue(Double.isInfinite(stats.getMax())); + assertEquals(0L, stats.getCount()); + + bucket = buckets.get(5); + assertEquals("2015-01-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(3, bucket.getDocCount()); + stats = bucket.getAggregations().get("stats"); + assertEquals("2015-02-13T13:09:32.000Z", stats.getMinAsString()); + assertEquals("2015-11-13T16:14:34.000Z", stats.getMaxAsString()); + assertEquals(3L, stats.getCount()); + + bucket = buckets.get(6); + assertEquals("2016-01-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + stats = bucket.getAggregations().get("stats"); + assertEquals("2016-03-04T17:09:50.000Z", stats.getMinAsString()); + assertEquals("2016-03-04T17:09:50.000Z", stats.getMaxAsString()); + assertEquals(1L, stats.getCount()); + + bucket = buckets.get(7); + assertEquals("2017-01-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + stats = bucket.getAggregations().get("stats"); + assertEquals("2017-12-12T22:55:46.000Z", stats.getMinAsString()); + assertEquals("2017-12-12T22:55:46.000Z", stats.getMaxAsString()); + assertEquals(1L, stats.getCount()); + }); + } + + public void testNoDocs() throws IOException { + Query query = new MatchNoDocsQuery(); + List dates = Collections.emptyList(); + Consumer aggregation = agg -> agg.setNumBuckets(10).field(DATE_FIELD); + + testSearchCase(query, dates, aggregation, + histogram -> assertEquals(0, histogram.getBuckets().size()) + ); + testSearchAndReduceCase(query, dates, aggregation, + histogram -> assertNull(histogram) + ); + } + + public void testAggregateWrongField() throws IOException { + testBothCases(new MatchAllDocsQuery(), dataset, + aggregation -> aggregation.setNumBuckets(10).field("wrong_field"), + histogram -> assertEquals(0, histogram.getBuckets().size()) + ); + } + + public void testIntervalYear() throws IOException { + testSearchCase(LongPoint.newRangeQuery(INSTANT_FIELD, asLong("2015-01-01"), asLong("2017-12-31")), dataset, + aggregation -> aggregation.setNumBuckets(4).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(5, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2015-02-13T13:09:32.000Z", 
bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2015-06-24T13:47:43.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2015-11-13T16:14:34.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(3); + assertEquals("2016-03-04T17:09:50.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(4); + assertEquals("2017-12-12T22:55:46.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + } + ); + testSearchAndReduceCase(LongPoint.newRangeQuery(INSTANT_FIELD, asLong("2015-01-01"), asLong("2017-12-31")), dataset, + aggregation -> aggregation.setNumBuckets(4).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(3, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2015-01-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(3, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2016-01-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-01-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + } + ); + } + + public void testIntervalMonth() throws IOException { + testSearchCase(new MatchAllDocsQuery(), + Arrays.asList("2017-01-01", "2017-02-02", "2017-02-03", "2017-03-04", "2017-03-05", "2017-03-06"), + aggregation -> aggregation.setNumBuckets(4).field(DATE_FIELD), histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(6, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-01-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-02T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-02-03T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(3); + assertEquals("2017-03-04T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(4); + assertEquals("2017-03-05T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(5); + assertEquals("2017-03-06T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + }); + testSearchAndReduceCase(new MatchAllDocsQuery(), + Arrays.asList("2017-01-01", "2017-02-02", "2017-02-03", "2017-03-04", "2017-03-05", "2017-03-06"), + aggregation -> aggregation.setNumBuckets(4).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(3, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-01-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-03-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(3, bucket.getDocCount()); + } + ); + } + + public void testWithLargeNumberOfBuckets() { + Query query = new MatchAllDocsQuery(); + IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, + () -> testSearchCase(query, dataset, + aggregation 
-> aggregation.setNumBuckets(MultiBucketConsumerService.DEFAULT_MAX_BUCKETS+1).field(DATE_FIELD), + // since an exception is thrown, this assertion won't be invoked. + histogram -> assertTrue(false) + )); + assertThat(exception.getMessage(), containsString("must be less than")); + } + + public void testIntervalDay() throws IOException { + testSearchCase(new MatchAllDocsQuery(), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), + aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD), histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(4, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-02T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-02-03T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(3, bucket.getDocCount()); + + bucket = buckets.get(3); + assertEquals("2017-02-05T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + }); + testSearchAndReduceCase(new MatchAllDocsQuery(), + Arrays.asList( + "2017-02-01", + "2017-02-02", + "2017-02-02", + "2017-02-03", + "2017-02-03", + "2017-02-03", + "2017-02-05" + ), + aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(5, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-02T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-02-03T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(3, bucket.getDocCount()); + + bucket = buckets.get(3); + assertEquals("2017-02-04T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(4); + assertEquals("2017-02-05T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + } + ); + } + + public void testIntervalDayWithTZ() throws IOException { + testSearchCase(new MatchAllDocsQuery(), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), + aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD).timeZone(DateTimeZone.forOffsetHours(-1)), histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(4, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-01-31T23:00:00.000-01:00", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-01T23:00:00.000-01:00", bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-02-02T23:00:00.000-01:00", bucket.getKeyAsString()); + assertEquals(3, bucket.getDocCount()); + + bucket = buckets.get(3); + assertEquals("2017-02-04T23:00:00.000-01:00", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + }); + testSearchAndReduceCase(new MatchAllDocsQuery(), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), + aggregation -> 
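+                // shard-level (unreduced) result: buckets are keyed by the distinct document timestamps, rendered in the -01:00 zone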
aggregation.setNumBuckets(5).field(DATE_FIELD).timeZone(DateTimeZone.forOffsetHours(-1)), histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(5, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-01-31T00:00:00.000-01:00", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-01T00:00:00.000-01:00", bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-02-02T00:00:00.000-01:00", bucket.getKeyAsString()); + assertEquals(3, bucket.getDocCount()); + + bucket = buckets.get(3); + assertEquals("2017-02-03T00:00:00.000-01:00", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(4); + assertEquals("2017-02-04T00:00:00.000-01:00", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + }); + } + + public void testIntervalHour() throws IOException { + testSearchCase(new MatchAllDocsQuery(), + Arrays.asList( + "2017-02-01T09:02:00.000Z", + "2017-02-01T09:35:00.000Z", + "2017-02-01T10:15:00.000Z", + "2017-02-01T13:06:00.000Z", + "2017-02-01T14:04:00.000Z", + "2017-02-01T14:05:00.000Z", + "2017-02-01T15:59:00.000Z", + "2017-02-01T16:06:00.000Z", + "2017-02-01T16:48:00.000Z", + "2017-02-01T16:59:00.000Z" + ), + aggregation -> aggregation.setNumBuckets(8).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(10, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T09:02:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-01T09:35:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-02-01T10:15:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(3); + assertEquals("2017-02-01T13:06:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(4); + assertEquals("2017-02-01T14:04:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(5); + assertEquals("2017-02-01T14:05:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(6); + assertEquals("2017-02-01T15:59:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(7); + assertEquals("2017-02-01T16:06:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(8); + assertEquals("2017-02-01T16:48:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(9); + assertEquals("2017-02-01T16:59:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + } + ); + testSearchAndReduceCase(new MatchAllDocsQuery(), + Arrays.asList( + "2017-02-01T09:02:00.000Z", + "2017-02-01T09:35:00.000Z", + "2017-02-01T10:15:00.000Z", + "2017-02-01T13:06:00.000Z", + "2017-02-01T14:04:00.000Z", + "2017-02-01T14:05:00.000Z", + "2017-02-01T15:59:00.000Z", + "2017-02-01T16:06:00.000Z", + "2017-02-01T16:48:00.000Z", + "2017-02-01T16:59:00.000Z" + ), + aggregation -> aggregation.setNumBuckets(10).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(8, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T09:00:00.000Z", 
bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-01T10:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-02-01T11:00:00.000Z", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(3); + assertEquals("2017-02-01T12:00:00.000Z", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(4); + assertEquals("2017-02-01T13:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(5); + assertEquals("2017-02-01T14:00:00.000Z", bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + + bucket = buckets.get(6); + assertEquals("2017-02-01T15:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(7); + assertEquals("2017-02-01T16:00:00.000Z", bucket.getKeyAsString()); + assertEquals(3, bucket.getDocCount()); + } + ); + } + + public void testIntervalHourWithTZ() throws IOException { + testSearchCase(new MatchAllDocsQuery(), + Arrays.asList( + "2017-02-01T09:02:00.000Z", + "2017-02-01T09:35:00.000Z", + "2017-02-01T10:15:00.000Z", + "2017-02-01T13:06:00.000Z", + "2017-02-01T14:04:00.000Z", + "2017-02-01T14:05:00.000Z", + "2017-02-01T15:59:00.000Z", + "2017-02-01T16:06:00.000Z", + "2017-02-01T16:48:00.000Z", + "2017-02-01T16:59:00.000Z" + ), + aggregation -> aggregation.setNumBuckets(8).field(DATE_FIELD).timeZone(DateTimeZone.forOffsetHours(-1)), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(10, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T08:02:00.000-01:00", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-01T08:35:00.000-01:00", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-02-01T09:15:00.000-01:00", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(3); + assertEquals("2017-02-01T12:06:00.000-01:00", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(4); + assertEquals("2017-02-01T13:04:00.000-01:00", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(5); + assertEquals("2017-02-01T13:05:00.000-01:00", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(6); + assertEquals("2017-02-01T14:59:00.000-01:00", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(7); + assertEquals("2017-02-01T15:06:00.000-01:00", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(8); + assertEquals("2017-02-01T15:48:00.000-01:00", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(9); + assertEquals("2017-02-01T15:59:00.000-01:00", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + } + ); + testSearchAndReduceCase(new MatchAllDocsQuery(), + Arrays.asList( + "2017-02-01T09:02:00.000Z", + "2017-02-01T09:35:00.000Z", + "2017-02-01T10:15:00.000Z", + "2017-02-01T13:06:00.000Z", + "2017-02-01T14:04:00.000Z", + "2017-02-01T14:05:00.000Z", + "2017-02-01T15:59:00.000Z", + "2017-02-01T16:06:00.000Z", + "2017-02-01T16:48:00.000Z", + "2017-02-01T16:59:00.000Z" + ), + aggregation -> 
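+                // after the final reduce the same data rolls up into hourly buckets, still in the -01:00 zone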
aggregation.setNumBuckets(10).field(DATE_FIELD).timeZone(DateTimeZone.forOffsetHours(-1)), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(8, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T08:00:00.000-01:00", bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-01T09:00:00.000-01:00", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-02-01T10:00:00.000-01:00", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(3); + assertEquals("2017-02-01T11:00:00.000-01:00", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(4); + assertEquals("2017-02-01T12:00:00.000-01:00", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(5); + assertEquals("2017-02-01T13:00:00.000-01:00", bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + + bucket = buckets.get(6); + assertEquals("2017-02-01T14:00:00.000-01:00", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(7); + assertEquals("2017-02-01T15:00:00.000-01:00", bucket.getKeyAsString()); + assertEquals(3, bucket.getDocCount()); + } + ); + } + + public void testAllSecondIntervals() throws IOException { + DateTimeFormatter format = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + List dataset = new ArrayList<>(); + DateTime startDate = new DateTime(2017, 01, 01, 00, 00, 00, ISOChronology.getInstanceUTC()); + for (int i = 0; i < 600; i++) { + DateTime date = startDate.plusSeconds(i); + dataset.add(format.print(date)); + } + + testSearchAndReduceCase(new MatchAllDocsQuery(), dataset, + aggregation -> aggregation.setNumBuckets(600).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(600, buckets.size()); + for (int i = 0; i < 600; i++) { + Histogram.Bucket bucket = buckets.get(i); + assertEquals(startDate.plusSeconds(i), bucket.getKey()); + assertEquals(1, bucket.getDocCount()); + } + }); + + testSearchAndReduceCase(new MatchAllDocsQuery(), dataset, + aggregation -> aggregation.setNumBuckets(300).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(120, buckets.size()); + for (int i = 0; i < 120; i++) { + Histogram.Bucket bucket = buckets.get(i); + assertEquals(startDate.plusSeconds(i * 5), bucket.getKey()); + assertEquals(5, bucket.getDocCount()); + } + }); + testSearchAndReduceCase(new MatchAllDocsQuery(), dataset, + aggregation -> aggregation.setNumBuckets(100).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(60, buckets.size()); + for (int i = 0; i < 60; i++) { + Histogram.Bucket bucket = buckets.get(i); + assertEquals(startDate.plusSeconds(i * 10), bucket.getKey()); + assertEquals(10, bucket.getDocCount()); + } + }); + testSearchAndReduceCase(new MatchAllDocsQuery(), dataset, + aggregation -> aggregation.setNumBuckets(50).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(20, buckets.size()); + for (int i = 0; i < 20; i++) { + Histogram.Bucket bucket = buckets.get(i); + assertEquals(startDate.plusSeconds(i * 30), bucket.getKey()); + assertEquals(30, bucket.getDocCount()); + } + }); + testSearchAndReduceCase(new MatchAllDocsQuery(), dataset, + aggregation -> aggregation.setNumBuckets(15).field(DATE_FIELD), + histogram 
-> { + List buckets = histogram.getBuckets(); + assertEquals(10, buckets.size()); + for (int i = 0; i < 10; i++) { + Histogram.Bucket bucket = buckets.get(i); + assertEquals(startDate.plusMinutes(i), bucket.getKey()); + assertEquals(60, bucket.getDocCount()); + } + }); + } + + public void testAllMinuteIntervals() throws IOException { + DateTimeFormatter format = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + List dataset = new ArrayList<>(); + DateTime startDate = new DateTime(2017, 01, 01, 00, 00, 00, ISOChronology.getInstanceUTC()); + for (int i = 0; i < 600; i++) { + DateTime date = startDate.plusMinutes(i); + dataset.add(format.print(date)); + } + testSearchAndReduceCase(new MatchAllDocsQuery(), dataset, + aggregation -> aggregation.setNumBuckets(600).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(600, buckets.size()); + for (int i = 0; i < 600; i++) { + Histogram.Bucket bucket = buckets.get(i); + assertEquals(startDate.plusMinutes(i), bucket.getKey()); + assertEquals(1, bucket.getDocCount()); + } + }); + testSearchAndReduceCase(new MatchAllDocsQuery(), dataset, + aggregation -> aggregation.setNumBuckets(300).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(120, buckets.size()); + for (int i = 0; i < 120; i++) { + Histogram.Bucket bucket = buckets.get(i); + assertEquals(startDate.plusMinutes(i * 5), bucket.getKey()); + assertEquals(5, bucket.getDocCount()); + } + }); + testSearchAndReduceCase(new MatchAllDocsQuery(), dataset, + aggregation -> aggregation.setNumBuckets(100).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(60, buckets.size()); + for (int i = 0; i < 60; i++) { + Histogram.Bucket bucket = buckets.get(i); + assertEquals(startDate.plusMinutes(i * 10), bucket.getKey()); + assertEquals(10, bucket.getDocCount()); + } + }); + testSearchAndReduceCase(new MatchAllDocsQuery(), dataset, + aggregation -> aggregation.setNumBuckets(50).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(20, buckets.size()); + for (int i = 0; i < 20; i++) { + Histogram.Bucket bucket = buckets.get(i); + assertEquals(startDate.plusMinutes(i * 30), bucket.getKey()); + assertEquals(30, bucket.getDocCount()); + } + }); + testSearchAndReduceCase(new MatchAllDocsQuery(), dataset, + aggregation -> aggregation.setNumBuckets(15).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(10, buckets.size()); + for (int i = 0; i < 10; i++) { + Histogram.Bucket bucket = buckets.get(i); + assertEquals(startDate.plusHours(i), bucket.getKey()); + assertEquals(60, bucket.getDocCount()); + } + }); + } + + public void testAllHourIntervals() throws IOException { + DateTimeFormatter format = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + List dataset = new ArrayList<>(); + DateTime startDate = new DateTime(2017, 01, 01, 00, 00, 00, ISOChronology.getInstanceUTC()); + for (int i = 0; i < 600; i++) { + DateTime date = startDate.plusHours(i); + dataset.add(format.print(date)); + } + testSearchAndReduceCase(new MatchAllDocsQuery(), dataset, + aggregation -> aggregation.setNumBuckets(600).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(600, buckets.size()); + for (int i = 0; i < 600; i++) { + Histogram.Bucket bucket = buckets.get(i); + assertEquals(startDate.plusHours(i), bucket.getKey()); + assertEquals(1, bucket.getDocCount()); + } + }); + 
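+        // 600 hourly documents cannot fit a target of 300 buckets, so the next inner interval (3 hours) is used, giving 200 buckets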
testSearchAndReduceCase(new MatchAllDocsQuery(), dataset, + aggregation -> aggregation.setNumBuckets(300).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(200, buckets.size()); + for (int i = 0; i < 200; i++) { + Histogram.Bucket bucket = buckets.get(i); + assertEquals(startDate.plusHours(i * 3), bucket.getKey()); + assertEquals(3, bucket.getDocCount()); + } + }); + testSearchAndReduceCase(new MatchAllDocsQuery(), dataset, + aggregation -> aggregation.setNumBuckets(100).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(50, buckets.size()); + for (int i = 0; i < 50; i++) { + Histogram.Bucket bucket = buckets.get(i); + assertEquals(startDate.plusHours(i * 12), bucket.getKey()); + assertEquals(12, bucket.getDocCount()); + } + }); + testSearchAndReduceCase(new MatchAllDocsQuery(), dataset, + aggregation -> aggregation.setNumBuckets(30).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(25, buckets.size()); + for (int i = 0; i < 25; i++) { + Histogram.Bucket bucket = buckets.get(i); + assertEquals(startDate.plusDays(i), bucket.getKey()); + assertEquals(24, bucket.getDocCount()); + } + }); + } + + public void testAllDayIntervals() throws IOException { + DateTimeFormatter format = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + List dataset = new ArrayList<>(); + DateTime startDate = new DateTime(2017, 01, 01, 00, 00, 00, ISOChronology.getInstanceUTC()); + for (int i = 0; i < 700; i++) { + DateTime date = startDate.plusDays(i); + dataset.add(format.print(date)); + } + testSearchAndReduceCase(new MatchAllDocsQuery(), dataset, + aggregation -> aggregation.setNumBuckets(700).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(700, buckets.size()); + for (int i = 0; i < 700; i++) { + Histogram.Bucket bucket = buckets.get(i); + assertEquals(startDate.plusDays(i), bucket.getKey()); + assertEquals(1, bucket.getDocCount()); + } + }); + testSearchAndReduceCase(new MatchAllDocsQuery(), dataset, + aggregation -> aggregation.setNumBuckets(300).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(100, buckets.size()); + for (int i = 0; i < 100; i++) { + Histogram.Bucket bucket = buckets.get(i); + assertEquals(startDate.plusDays(i * 7), bucket.getKey()); + assertEquals(7, bucket.getDocCount()); + } + }); + testSearchAndReduceCase(new MatchAllDocsQuery(), dataset, + aggregation -> aggregation.setNumBuckets(30).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(24, buckets.size()); + for (int i = 0; i < 24; i++) { + Histogram.Bucket bucket = buckets.get(i); + assertEquals(startDate.plusMonths(i), bucket.getKey()); + assertThat(bucket.getDocCount(), Matchers.lessThanOrEqualTo(31L)); + } + }); + } + + public void testAllMonthIntervals() throws IOException { + DateTimeFormatter format = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + List dataset = new ArrayList<>(); + DateTime startDate = new DateTime(2017, 01, 01, 00, 00, 00, ISOChronology.getInstanceUTC()); + for (int i = 0; i < 600; i++) { + DateTime date = startDate.plusMonths(i); + dataset.add(format.print(date)); + } + testSearchAndReduceCase(new MatchAllDocsQuery(), dataset, + aggregation -> aggregation.setNumBuckets(600).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(600, buckets.size()); + for (int i = 0; i < 600; i++) { + Histogram.Bucket bucket = 
buckets.get(i); + assertEquals(startDate.plusMonths(i), bucket.getKey()); + assertEquals(1, bucket.getDocCount()); + } + }); + testSearchAndReduceCase(new MatchAllDocsQuery(), dataset, + aggregation -> aggregation.setNumBuckets(300).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(200, buckets.size()); + for (int i = 0; i < 200; i++) { + Histogram.Bucket bucket = buckets.get(i); + assertEquals(startDate.plusMonths(i * 3), bucket.getKey()); + assertEquals(3, bucket.getDocCount()); + } + }); + testSearchAndReduceCase(new MatchAllDocsQuery(), dataset, + aggregation -> aggregation.setNumBuckets(60).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(50, buckets.size()); + for (int i = 0; i < 50; i++) { + Histogram.Bucket bucket = buckets.get(i); + assertEquals(startDate.plusYears(i), bucket.getKey()); + assertEquals(12, bucket.getDocCount()); + } + }); + } + + public void testAllYearIntervals() throws IOException { + DateTimeFormatter format = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + List dataset = new ArrayList<>(); + DateTime startDate = new DateTime(2017, 01, 01, 00, 00, 00, ISOChronology.getInstanceUTC()); + for (int i = 0; i < 600; i++) { + DateTime date = startDate.plusYears(i); + dataset.add(format.print(date)); + } + testSearchAndReduceCase(new MatchAllDocsQuery(), dataset, aggregation -> aggregation.setNumBuckets(600).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(600, buckets.size()); + for (int i = 0; i < 600; i++) { + Histogram.Bucket bucket = buckets.get(i); + assertEquals(startDate.plusYears(i), bucket.getKey()); + assertEquals(1, bucket.getDocCount()); + } + }); + testSearchAndReduceCase(new MatchAllDocsQuery(), dataset, aggregation -> aggregation.setNumBuckets(300).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(120, buckets.size()); + for (int i = 0; i < 120; i++) { + Histogram.Bucket bucket = buckets.get(i); + assertEquals(startDate.plusYears(i * 5), bucket.getKey()); + assertEquals(5, bucket.getDocCount()); + } + }); + testSearchAndReduceCase(new MatchAllDocsQuery(), dataset, aggregation -> aggregation.setNumBuckets(100).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(60, buckets.size()); + for (int i = 0; i < 60; i++) { + Histogram.Bucket bucket = buckets.get(i); + assertEquals(startDate.plusYears(i * 10), bucket.getKey()); + assertEquals(10, bucket.getDocCount()); + } + }); + testSearchAndReduceCase(new MatchAllDocsQuery(), dataset, aggregation -> aggregation.setNumBuckets(50).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(30, buckets.size()); + for (int i = 0; i < 30; i++) { + Histogram.Bucket bucket = buckets.get(i); + assertEquals(startDate.plusYears(i * 20), bucket.getKey()); + assertEquals(20, bucket.getDocCount()); + } + }); + testSearchAndReduceCase(new MatchAllDocsQuery(), dataset, aggregation -> aggregation.setNumBuckets(20).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(12, buckets.size()); + for (int i = 0; i < 12; i++) { + Histogram.Bucket bucket = buckets.get(i); + assertEquals(startDate.plusYears(i * 50), bucket.getKey()); + assertEquals(50, bucket.getDocCount()); + } + }); + testSearchAndReduceCase(new MatchAllDocsQuery(), dataset, aggregation -> aggregation.setNumBuckets(10).field(DATE_FIELD), + histogram -> { + List buckets = 
histogram.getBuckets(); + assertEquals(6, buckets.size()); + for (int i = 0; i < 6; i++) { + Histogram.Bucket bucket = buckets.get(i); + assertEquals(startDate.plusYears(i * 100), bucket.getKey()); + assertEquals(100, bucket.getDocCount()); + } + }); + } + + public void testInterval3Hour() throws IOException { + testSearchCase(new MatchAllDocsQuery(), + Arrays.asList( + "2017-02-01T09:02:00.000Z", + "2017-02-01T09:35:00.000Z", + "2017-02-01T10:15:00.000Z", + "2017-02-01T13:06:00.000Z", + "2017-02-01T14:04:00.000Z", + "2017-02-01T14:05:00.000Z", + "2017-02-01T15:59:00.000Z", + "2017-02-01T16:06:00.000Z", + "2017-02-01T16:48:00.000Z", + "2017-02-01T16:59:00.000Z" + ), + aggregation -> aggregation.setNumBuckets(8).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(10, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T09:02:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-01T09:35:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-02-01T10:15:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(3); + assertEquals("2017-02-01T13:06:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(4); + assertEquals("2017-02-01T14:04:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(5); + assertEquals("2017-02-01T14:05:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(6); + assertEquals("2017-02-01T15:59:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(7); + assertEquals("2017-02-01T16:06:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(8); + assertEquals("2017-02-01T16:48:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(9); + assertEquals("2017-02-01T16:59:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + } + ); + testSearchAndReduceCase(new MatchAllDocsQuery(), + Arrays.asList( + "2017-02-01T09:02:00.000Z", + "2017-02-01T09:35:00.000Z", + "2017-02-01T10:15:00.000Z", + "2017-02-01T13:06:00.000Z", + "2017-02-01T14:04:00.000Z", + "2017-02-01T14:05:00.000Z", + "2017-02-01T15:59:00.000Z", + "2017-02-01T16:06:00.000Z", + "2017-02-01T16:48:00.000Z", + "2017-02-01T16:59:00.000Z" + ), + aggregation -> aggregation.setNumBuckets(6).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(3, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T09:00:00.000Z", bucket.getKeyAsString()); + assertEquals(3, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-01T12:00:00.000Z", bucket.getKeyAsString()); + assertEquals(3, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-02-01T15:00:00.000Z", bucket.getKeyAsString()); + assertEquals(4, bucket.getDocCount()); + } + ); + } + + public void testIntervalMinute() throws IOException { + testSearchCase(new MatchAllDocsQuery(), + Arrays.asList( + "2017-02-01T09:02:35.000Z", + "2017-02-01T09:02:59.000Z", + "2017-02-01T09:15:37.000Z", + "2017-02-01T09:16:04.000Z", + "2017-02-01T09:16:42.000Z" + ), + aggregation -> 
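+                // unreduced shard result keeps one bucket per distinct timestamp; the reduced case below rounds to one-minute buckets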
aggregation.setNumBuckets(4).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(5, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T09:02:35.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-01T09:02:59.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-02-01T09:15:37.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(3); + assertEquals("2017-02-01T09:16:04.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(4); + assertEquals("2017-02-01T09:16:42.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + } + ); + testSearchAndReduceCase(new MatchAllDocsQuery(), + Arrays.asList( + "2017-02-01T09:02:35.000Z", + "2017-02-01T09:02:59.000Z", + "2017-02-01T09:15:37.000Z", + "2017-02-01T09:16:04.000Z", + "2017-02-01T09:16:42.000Z" + ), + aggregation -> aggregation.setNumBuckets(15).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(15, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T09:02:00.000Z", bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-01T09:03:00.000Z", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-02-01T09:04:00.000Z", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(3); + assertEquals("2017-02-01T09:05:00.000Z", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(4); + assertEquals("2017-02-01T09:06:00.000Z", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(5); + assertEquals("2017-02-01T09:07:00.000Z", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(6); + assertEquals("2017-02-01T09:08:00.000Z", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(7); + assertEquals("2017-02-01T09:09:00.000Z", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(8); + assertEquals("2017-02-01T09:10:00.000Z", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(9); + assertEquals("2017-02-01T09:11:00.000Z", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(10); + assertEquals("2017-02-01T09:12:00.000Z", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(11); + assertEquals("2017-02-01T09:13:00.000Z", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(12); + assertEquals("2017-02-01T09:14:00.000Z", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(13); + assertEquals("2017-02-01T09:15:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(14); + assertEquals("2017-02-01T09:16:00.000Z", bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + } + ); + } + + public void testIntervalSecond() throws IOException { + testSearchCase(new MatchAllDocsQuery(), + Arrays.asList("2017-02-01T00:00:05.015Z", "2017-02-01T00:00:07.299Z", 
"2017-02-01T00:00:07.074Z", + "2017-02-01T00:00:11.688Z", "2017-02-01T00:00:11.210Z", "2017-02-01T00:00:11.380Z"), + aggregation -> aggregation.setNumBuckets(7).field(DATE_FIELD), histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(3, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T00:00:05.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-01T00:00:07.000Z", bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-02-01T00:00:11.000Z", bucket.getKeyAsString()); + assertEquals(3, bucket.getDocCount()); + }); + testSearchAndReduceCase(new MatchAllDocsQuery(), + Arrays.asList( + "2017-02-01T00:00:05.015Z", + "2017-02-01T00:00:07.299Z", + "2017-02-01T00:00:07.074Z", + "2017-02-01T00:00:11.688Z", + "2017-02-01T00:00:11.210Z", + "2017-02-01T00:00:11.380Z" + ), + aggregation -> aggregation.setNumBuckets(7).field(DATE_FIELD), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(7, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T00:00:05.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-01T00:00:06.000Z", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-02-01T00:00:07.000Z", bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + + bucket = buckets.get(3); + assertEquals("2017-02-01T00:00:08.000Z", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(4); + assertEquals("2017-02-01T00:00:09.000Z", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(5); + assertEquals("2017-02-01T00:00:10.000Z", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(6); + assertEquals("2017-02-01T00:00:11.000Z", bucket.getKeyAsString()); + assertEquals(3, bucket.getDocCount()); + } + ); + } + + private void testSearchCase(Query query, List dataset, + Consumer configure, + Consumer verify) throws IOException { + executeTestCase(false, query, dataset, configure, verify); + } + + private void testSearchAndReduceCase(Query query, List dataset, + Consumer configure, + Consumer verify) throws IOException { + executeTestCase(true, query, dataset, configure, verify); + } + + private void testBothCases(Query query, List dataset, + Consumer configure, + Consumer verify) throws IOException { + testSearchCase(query, dataset, configure, verify); + testSearchAndReduceCase(query, dataset, configure, verify); + } + + @Override + protected IndexSettings createIndexSettings() { + Settings nodeSettings = Settings.builder() + .put("search.max_buckets", 100000).build(); + return new IndexSettings( + IndexMetaData.builder("_index").settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .creationDate(System.currentTimeMillis()) + .build(), + nodeSettings + ); + } + + private void executeTestCase(boolean reduced, Query query, List dataset, + Consumer configure, + Consumer verify) throws IOException { + + try (Directory directory = newDirectory()) { + try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { + Document document = new Document(); + for (String date : dataset) { + if (frequently()) { + 
+    private void executeTestCase(boolean reduced, Query query, List<String> dataset,
+                                 Consumer<AutoDateHistogramAggregationBuilder> configure,
+                                 Consumer<Histogram> verify) throws IOException {
+
+        try (Directory directory = newDirectory()) {
+            try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
+                Document document = new Document();
+                for (String date : dataset) {
+                    if (frequently()) {
+                        indexWriter.commit();
+                    }
+
+                    long instant = asLong(date);
+                    document.add(new SortedNumericDocValuesField(DATE_FIELD, instant));
+                    document.add(new LongPoint(INSTANT_FIELD, instant));
+                    indexWriter.addDocument(document);
+                    document.clear();
+                }
+            }
+
+            try (IndexReader indexReader = DirectoryReader.open(directory)) {
+                IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
+
+                AutoDateHistogramAggregationBuilder aggregationBuilder = new AutoDateHistogramAggregationBuilder("_name");
+                if (configure != null) {
+                    configure.accept(aggregationBuilder);
+                }
+
+                DateFieldMapper.Builder builder = new DateFieldMapper.Builder("_name");
+                DateFieldMapper.DateFieldType fieldType = builder.fieldType();
+                fieldType.setHasDocValues(true);
+                fieldType.setName(aggregationBuilder.field());
+
+                InternalAutoDateHistogram histogram;
+                if (reduced) {
+                    histogram = searchAndReduce(indexSearcher, query, aggregationBuilder, fieldType);
+                } else {
+                    histogram = search(indexSearcher, query, aggregationBuilder, fieldType);
+                }
+                verify.accept(histogram);
+            }
+        }
+    }
+
+    private static long asLong(String dateTime) {
+        return DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parser().parseDateTime(dateTime).getMillis();
+    }
+}
diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java
new file mode 100644
index 00000000000..389371efd79
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.histogram;
+
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.rounding.DateTimeUnit;
+import org.elasticsearch.common.rounding.Rounding;
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.InternalAggregations;
+import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
+import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder.RoundingInfo;
+import org.elasticsearch.search.aggregations.bucket.histogram.InternalAutoDateHistogram.BucketInfo;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
+import org.joda.time.DateTime;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.elasticsearch.common.unit.TimeValue.timeValueHours;
+import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
+import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
+
+public class InternalAutoDateHistogramTests extends
+    InternalMultiBucketAggregationTestCase<InternalAutoDateHistogram> {
+
+    private DocValueFormat format;
+    private RoundingInfo[] roundingInfos;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        format = randomNumericDocValueFormat();
+
+        roundingInfos = new RoundingInfo[6];
+        roundingInfos[0] = new RoundingInfo(Rounding.builder(DateTimeUnit.SECOND_OF_MINUTE).build(), 1, 5, 10, 30);
+        roundingInfos[1] = new RoundingInfo(Rounding.builder(DateTimeUnit.MINUTES_OF_HOUR).build(), 1, 5, 10, 30);
+        roundingInfos[2] = new RoundingInfo(Rounding.builder(DateTimeUnit.HOUR_OF_DAY).build(), 1, 3, 12);
+        roundingInfos[3] = new RoundingInfo(Rounding.builder(DateTimeUnit.DAY_OF_MONTH).build(), 1, 7);
+        roundingInfos[4] = new RoundingInfo(Rounding.builder(DateTimeUnit.MONTH_OF_YEAR).build(), 1, 3);
+        roundingInfos[5] = new RoundingInfo(Rounding.builder(DateTimeUnit.YEAR_OF_CENTURY).build(), 1, 10, 20, 50, 100);
+    }
+
+    @Override
+    protected InternalAutoDateHistogram createTestInstance(String name,
+                                                           List<PipelineAggregator> pipelineAggregators,
+                                                           Map<String, Object> metaData,
+                                                           InternalAggregations aggregations) {
+        int nbBuckets = randomNumberOfBuckets();
+        int targetBuckets = randomIntBetween(1, nbBuckets * 2 + 1);
+        List<InternalAutoDateHistogram.Bucket> buckets = new ArrayList<>(nbBuckets);
+        long startingDate = System.currentTimeMillis();
+
+        long interval = randomIntBetween(1, 3);
+        long intervalMillis = randomFrom(timeValueSeconds(interval), timeValueMinutes(interval), timeValueHours(interval)).getMillis();
+
+        for (int i = 0; i < nbBuckets; i++) {
+            long key = startingDate + (intervalMillis * i);
+            buckets.add(i, new InternalAutoDateHistogram.Bucket(key, randomIntBetween(1, 100), format, aggregations));
+        }
+        InternalAggregations subAggregations = new InternalAggregations(Collections.emptyList());
+        BucketInfo bucketInfo = new BucketInfo(roundingInfos, randomIntBetween(0, roundingInfos.length - 1), subAggregations);
+
+        return new InternalAutoDateHistogram(name, buckets, targetBuckets, bucketInfo, format, pipelineAggregators, metaData);
+    }
+
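+    // Recompute the expected doc counts by re-rounding every input bucket key with the coarsest rounding
+    // used by any of the input histograms, then compare those totals with the doc counts of the reduced
+    // histogram.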
+    @Override
+    protected void assertReduced(InternalAutoDateHistogram reduced, List<InternalAutoDateHistogram> inputs) {
+        int roundingIdx = 0;
+        for (InternalAutoDateHistogram histogram : inputs) {
+            if (histogram.getBucketInfo().roundingIdx > roundingIdx) {
+                roundingIdx = histogram.getBucketInfo().roundingIdx;
+            }
+        }
+        Map<Long, Long> expectedCounts = new TreeMap<>();
+        for (Histogram histogram : inputs) {
+            for (Histogram.Bucket bucket : histogram.getBuckets()) {
+                expectedCounts.compute(roundingInfos[roundingIdx].rounding.round(((DateTime) bucket.getKey()).getMillis()),
+                    (key, oldValue) -> (oldValue == null ? 0 : oldValue) + bucket.getDocCount());
+            }
+        }
+        Map<Long, Long> actualCounts = new TreeMap<>();
+        for (Histogram.Bucket bucket : reduced.getBuckets()) {
+            actualCounts.compute(((DateTime) bucket.getKey()).getMillis(),
+                (key, oldValue) -> (oldValue == null ? 0 : oldValue) + bucket.getDocCount());
+        }
+        assertEquals(expectedCounts, actualCounts);
+    }
+
+    @Override
+    protected Writeable.Reader<InternalAutoDateHistogram> instanceReader() {
+        return InternalAutoDateHistogram::new;
+    }
+
+    @Override
+    protected Class<? extends ParsedMultiBucketAggregation> implementationClass() {
+        return ParsedAutoDateHistogram.class;
+    }
+
+    @Override
+    protected InternalAutoDateHistogram mutateInstance(InternalAutoDateHistogram instance) {
+        String name = instance.getName();
+        List<InternalAutoDateHistogram.Bucket> buckets = instance.getBuckets();
+        int targetBuckets = instance.getTargetBuckets();
+        BucketInfo bucketInfo = instance.getBucketInfo();
+        List<PipelineAggregator> pipelineAggregators = instance.pipelineAggregators();
+        Map<String, Object> metaData = instance.getMetaData();
+        switch (between(0, 3)) {
+        case 0:
+            name += randomAlphaOfLength(5);
+            break;
+        case 1:
+            buckets = new ArrayList<>(buckets);
+            buckets.add(new InternalAutoDateHistogram.Bucket(randomNonNegativeLong(), randomIntBetween(1, 100), format,
+                InternalAggregations.EMPTY));
+            break;
+        case 2:
+            int roundingIdx = bucketInfo.roundingIdx == bucketInfo.roundingInfos.length - 1 ? 0 : bucketInfo.roundingIdx + 1;
+            bucketInfo = new BucketInfo(bucketInfo.roundingInfos, roundingIdx, bucketInfo.emptySubAggregations);
+            break;
+        case 3:
+            if (metaData == null) {
+                metaData = new HashMap<>(1);
+            } else {
+                metaData = new HashMap<>(instance.getMetaData());
+            }
+            metaData.put(randomAlphaOfLength(15), randomInt());
+            break;
+        default:
+            throw new AssertionError("Illegal randomisation branch");
+        }
+        return new InternalAutoDateHistogram(name, buckets, targetBuckets, bucketInfo, format, pipelineAggregators, metaData);
+    }
+}
diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java
index 07f25986655..e84f2a99a11 100644
--- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java
@@ -87,7 +87,6 @@ import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
-import static org.elasticsearch.test.InternalAggregationTestCase.DEFAULT_MAX_BUCKETS;
 
 /**
  * Base class for testing {@link Aggregator} implementations.
@@ -229,7 +228,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
         });
         when(searchContext.bitsetFilterCache()).thenReturn(new BitsetFilterCache(indexSettings, mock(Listener.class)));
         doAnswer(invocation -> {
-            /* Store the releasables so we can release them at the end of the test case. This is important because aggregations don't
+            /* Store the release-ables so we can release them at the end of the test case. This is important because aggregations don't
             * close their sub-aggregations. This is fairly similar to what the production code does.
             */
            releasables.add((Releasable) invocation.getArguments()[0]);
            return null;
diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java
index 838b0e315ea..15e44853a97 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java
@@ -53,8 +53,10 @@ import org.elasticsearch.search.aggregations.bucket.geogrid.GeoGridAggregationBu
 import org.elasticsearch.search.aggregations.bucket.geogrid.ParsedGeoHashGrid;
 import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.global.ParsedGlobal;
+import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.histogram.ParsedAutoDateHistogram;
 import org.elasticsearch.search.aggregations.bucket.histogram.ParsedDateHistogram;
 import org.elasticsearch.search.aggregations.bucket.histogram.ParsedHistogram;
 import org.elasticsearch.search.aggregations.bucket.missing.MissingAggregationBuilder;
@@ -181,6 +183,7 @@ public abstract class InternalAggregationTestCase
         map.put(GeoCentroidAggregationBuilder.NAME, (p, c) -> ParsedGeoCentroid.fromXContent(p, (String) c));
         map.put(HistogramAggregationBuilder.NAME, (p, c) -> ParsedHistogram.fromXContent(p, (String) c));
         map.put(DateHistogramAggregationBuilder.NAME, (p, c) -> ParsedDateHistogram.fromXContent(p, (String) c));
+        map.put(AutoDateHistogramAggregationBuilder.NAME, (p, c) -> ParsedAutoDateHistogram.fromXContent(p, (String) c));
         map.put(StringTerms.NAME, (p, c) -> ParsedStringTerms.fromXContent(p, (String) c));
         map.put(LongTerms.NAME, (p, c) -> ParsedLongTerms.fromXContent(p, (String) c));
         map.put(DoubleTerms.NAME, (p, c) -> ParsedDoubleTerms.fromXContent(p, (String) c));
diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java
index 952b6c02794..6f0aebe2396 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java
@@ -149,7 +149,8 @@ public abstract class InternalMultiBucketAggregationTestCase
         Class<? extends ParsedMultiBucketAggregation> parsedClass = implementationClass();
         assertNotNull("Parsed aggregation class must not be null", parsedClass);
-        assertTrue(parsedClass.isInstance(actual));
+        assertTrue("Unexpected parsed class, expected instance of: " + actual + ", but was: " + parsedClass,
+            parsedClass.isInstance(actual));
         assertTrue(expected instanceof InternalAggregation);
         assertEquals(expected.getName(), actual.getName());