Adds a new auto-interval date histogram (#28993)

* Adds a new auto-interval date histogram

This change adds a new type of histogram aggregation called `auto_date_histogram` where you can specify the target number of buckets you require and it will find an appropriate interval for the returned buckets. The aggregation works by first collecting documents in buckets at second interval, when it has created more than the target number of buckets it merges these buckets into minute interval bucket and continues collecting until it reaches the target number of buckets again. It will keep merging buckets when it exceeds the target until either collection is finished or the highest interval (currently years) is reached. A similar process happens at reduce time.

This aggregation intentionally does not support min_doc_count, offest and extended_bounds to keep the already complex logic from becoming more complex. The aggregation accepts sub-aggregations but will always operate in `breadth_first` mode deferring the computation of sub-aggregations until the final buckets from the shard are known. min_doc_count is effectively hard-coded to zero meaning that we will insert empty buckets where necessary.

Closes #9572

* Adds documentation

* Added sub aggregator test

* Fixes failing docs test

* Brings branch up to date with master changes

* trying to get tests to pass again

* Fixes multiBucketConsumer accounting

* Collects more buckets than needed on shards

This gives us more options at reduce time in terms of how we do the
final merge of the buckeets to produce the final result

* Revert "Collects more buckets than needed on shards"

This reverts commit 993c782d117892af9a3c86a51921cdee630a3ac5.

* Adds ability to merge within a rounding

* Fixes nonn-timezone doc test failure

* Fix time zone tests

* iterates on tests

* Adds test case and documentation changes

Added some notes in the documentation about the intervals that can bbe
returned.

Also added a test case that utilises the merging of conseecutive buckets

* Fixes performance bug

The bug meant that getAppropriate rounding look a huge amount of time
if the range of the data was large but also sparsely populated. In
these situations the rounding would be very low so iterating through
the rounding values from the min key to the max keey look a long time
(~120 seconds in one test).

The solution is to add a rough estimate first which chooses the
rounding based just on the long values of the min and max keeys alone
but selects the rounding one lower than the one it thinks is
appropriate so the accurate method can choose the final rounding taking
into account the fact that intervals are not always fixed length.

Thee commit also adds more tests

* Changes to only do complex reduction on final reduce

* merge latest with master

* correct tests and add a new test case for 10k buckets

* refactor to perform bucket number check in innerBuild

* correctly derive bucket setting, update tests to increase bucket threshold

* fix checkstyle

* address code review comments

* add documentation for default buckets

* fix typo
This commit is contained in:
Colin Goodheart-Smithe 2018-07-13 18:08:35 +01:00 committed by Paul Sanwald
parent 2c3ea43f45
commit 0edb096eb4
20 changed files with 3263 additions and 6 deletions

View File

@ -85,8 +85,10 @@ import org.elasticsearch.search.aggregations.bucket.geogrid.GeoGridAggregationBu
import org.elasticsearch.search.aggregations.bucket.geogrid.ParsedGeoHashGrid;
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.global.ParsedGlobal;
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.ParsedAutoDateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.ParsedDateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.ParsedHistogram;
import org.elasticsearch.search.aggregations.bucket.missing.MissingAggregationBuilder;
@ -1004,6 +1006,7 @@ public class RestHighLevelClient implements Closeable {
map.put(GeoCentroidAggregationBuilder.NAME, (p, c) -> ParsedGeoCentroid.fromXContent(p, (String) c));
map.put(HistogramAggregationBuilder.NAME, (p, c) -> ParsedHistogram.fromXContent(p, (String) c));
map.put(DateHistogramAggregationBuilder.NAME, (p, c) -> ParsedDateHistogram.fromXContent(p, (String) c));
map.put(AutoDateHistogramAggregationBuilder.NAME, (p, c) -> ParsedAutoDateHistogram.fromXContent(p, (String) c));
map.put(StringTerms.NAME, (p, c) -> ParsedStringTerms.fromXContent(p, (String) c));
map.put(LongTerms.NAME, (p, c) -> ParsedLongTerms.fromXContent(p, (String) c));
map.put(DoubleTerms.NAME, (p, c) -> ParsedDoubleTerms.fromXContent(p, (String) c));

View File

@ -19,6 +19,8 @@ the limit will fail with an exception.
include::bucket/adjacency-matrix-aggregation.asciidoc[]
include::bucket/autodatehistogram-aggregation.asciidoc[]
include::bucket/children-aggregation.asciidoc[]
include::bucket/composite-aggregation.asciidoc[]

View File

@ -0,0 +1,283 @@
[[search-aggregations-bucket-autodatehistogram-aggregation]]
=== Auto-interval Date Histogram Aggregation
A multi-bucket aggregation similar to the <<search-aggregations-bucket-datehistogram-aggregation>> except
instead of providing an interval to use as the width of each bucket, a target number of buckets is provided
indicating the number of buckets needed and the interval of the buckets is automatically chosen to best achieve
that target. The number of buckets returned will always be less than or equal to this target number.
The buckets field is optional, and will default to 10 buckets if not specified.
Requesting a target of 10 buckets.
[source,js]
--------------------------------------------------
POST /sales/_search?size=0
{
"aggs" : {
"sales_over_time" : {
"auto_date_histogram" : {
"field" : "date",
"buckets" : 10
}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:sales]
==== Keys
Internally, a date is represented as a 64 bit number representing a timestamp
in milliseconds-since-the-epoch. These timestamps are returned as the bucket
++key++s. The `key_as_string` is the same timestamp converted to a formatted
date string using the format specified with the `format` parameter:
TIP: If no `format` is specified, then it will use the first date
<<mapping-date-format,format>> specified in the field mapping.
[source,js]
--------------------------------------------------
POST /sales/_search?size=0
{
"aggs" : {
"sales_over_time" : {
"auto_date_histogram" : {
"field" : "date",
"buckets" : 5,
"format" : "yyyy-MM-dd" <1>
}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:sales]
<1> Supports expressive date <<date-format-pattern,format pattern>>
Response:
[source,js]
--------------------------------------------------
{
...
"aggregations": {
"sales_over_time": {
"buckets": [
{
"key_as_string": "2015-01-01",
"key": 1420070400000,
"doc_count": 3
},
{
"key_as_string": "2015-02-01",
"key": 1422748800000,
"doc_count": 2
},
{
"key_as_string": "2015-03-01",
"key": 1425168000000,
"doc_count": 2
}
]
}
}
}
--------------------------------------------------
// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]
=== Intervals
The interval of the returned buckets is selected based on the data collected by the
aggregation so that the number of buckets returned is less than or equal to the number
requested. The possible intervals returned are:
[horizontal]
seconds:: In multiples of 1, 5, 10 and 30
minutes:: In multiples of 1, 5, 10 and 30
hours:: In multiples of 1, 3 and 12
days:: In multiples of 1, and 7
months:: In multiples of 1, and 3
years:: In multiples of 1, 5, 10, 20, 50 and 100
In the worst case, where the number of daily buckets are too many for the requested
number of buckets, the number of buckets returned will be 1/7th of the number of
buckets requested.
==== Time Zone
Date-times are stored in Elasticsearch in UTC. By default, all bucketing and
rounding is also done in UTC. The `time_zone` parameter can be used to indicate
that bucketing should use a different time zone.
Time zones may either be specified as an ISO 8601 UTC offset (e.g. `+01:00` or
`-08:00`) or as a timezone id, an identifier used in the TZ database like
`America/Los_Angeles`.
Consider the following example:
[source,js]
---------------------------------
PUT my_index/log/1?refresh
{
"date": "2015-10-01T00:30:00Z"
}
PUT my_index/log/2?refresh
{
"date": "2015-10-01T01:30:00Z"
}
PUT my_index/log/3?refresh
{
"date": "2015-10-01T02:30:00Z"
}
GET my_index/_search?size=0
{
"aggs": {
"by_day": {
"auto_date_histogram": {
"field": "date",
"buckets" : 3
}
}
}
}
---------------------------------
// CONSOLE
UTC is used if no time zone is specified, three 1-hour buckets are returned
starting at midnight UTC on 1 October 2015:
[source,js]
---------------------------------
{
...
"aggregations": {
"by_day": {
"buckets": [
{
"key_as_string": "2015-10-01T00:00:00.000Z",
"key": 1443657600000,
"doc_count": 1
},
{
"key_as_string": "2015-10-01T01:00:00.000Z",
"key": 1443661200000,
"doc_count": 1
},
{
"key_as_string": "2015-10-01T02:00:00.000Z",
"key": 1443664800000,
"doc_count": 1
}
]
}
}
}
---------------------------------
// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]
If a `time_zone` of `-01:00` is specified, then midnight starts at one hour before
midnight UTC:
[source,js]
---------------------------------
GET my_index/_search?size=0
{
"aggs": {
"by_day": {
"auto_date_histogram": {
"field": "date",
"buckets" : 3,
"time_zone": "-01:00"
}
}
}
}
---------------------------------
// CONSOLE
// TEST[continued]
Now three 1-hour buckets are still returned but the first bucket starts at
11:00pm on 30 September 2015 since that is the local time for the bucket in
the specified time zone.
[source,js]
---------------------------------
{
...
"aggregations": {
"by_day": {
"buckets": [
{
"key_as_string": "2015-09-30T23:00:00.000-01:00",
"key": 1443657600000,
"doc_count": 1
},
{
"key_as_string": "2015-10-01T00:00:00.000-01:00",
"key": 1443661200000,
"doc_count": 1
},
{
"key_as_string": "2015-10-01T01:00:00.000-01:00",
"key": 1443664800000,
"doc_count": 1
}
]
}
}
}
---------------------------------
// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]
<1> The `key_as_string` value represents midnight on each day
in the specified time zone.
WARNING: When using time zones that follow DST (daylight savings time) changes,
buckets close to the moment when those changes happen can have slightly different
sizes than neighbouring buckets.
For example, consider a DST start in the `CET` time zone: on 27 March 2016 at 2am,
clocks were turned forward 1 hour to 3am local time. If the result of the aggregation
was daily buckets, the bucket covering that day will only hold data for 23 hours
instead of the usual 24 hours for other buckets. The same is true for shorter intervals
like e.g. 12h. Here, we will have only a 11h bucket on the morning of 27 March when the
DST shift happens.
==== Scripts
Like with the normal <<search-aggregations-bucket-datehistogram-aggregation, `date_histogram`>>, both document level
scripts and value level scripts are supported. This aggregation does not however, support the `min_doc_count`,
`extended_bounds` and `order` parameters.
==== Missing value
The `missing` parameter defines how documents that are missing a value should be treated.
By default they will be ignored but it is also possible to treat them as if they
had a value.
[source,js]
--------------------------------------------------
POST /sales/_search?size=0
{
"aggs" : {
"sale_date" : {
"auto_date_histogram" : {
"field" : "date",
"buckets": 10,
"missing": "2000/01/01" <1>
}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:sales]
<1> Documents without a value in the `publish_date` field will fall into the same bucket as documents that have the value `2000-01-01`.

View File

@ -109,8 +109,10 @@ import org.elasticsearch.search.aggregations.bucket.geogrid.GeoGridAggregationBu
import org.elasticsearch.search.aggregations.bucket.geogrid.InternalGeoHashGrid;
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.global.InternalGlobal;
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalAutoDateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
import org.elasticsearch.search.aggregations.bucket.missing.InternalMissing;
@ -395,6 +397,8 @@ public class SearchModule {
HistogramAggregationBuilder::parse).addResultReader(InternalHistogram::new));
registerAggregation(new AggregationSpec(DateHistogramAggregationBuilder.NAME, DateHistogramAggregationBuilder::new,
DateHistogramAggregationBuilder::parse).addResultReader(InternalDateHistogram::new));
registerAggregation(new AggregationSpec(AutoDateHistogramAggregationBuilder.NAME, AutoDateHistogramAggregationBuilder::new,
AutoDateHistogramAggregationBuilder::parse).addResultReader(InternalAutoDateHistogram::new));
registerAggregation(new AggregationSpec(GeoDistanceAggregationBuilder.NAME, GeoDistanceAggregationBuilder::new,
GeoDistanceAggregationBuilder::parse).addResultReader(InternalGeoDistance::new));
registerAggregation(new AggregationSpec(GeoGridAggregationBuilder.NAME, GeoGridAggregationBuilder::new,

View File

@ -84,6 +84,19 @@ public abstract class BucketsAggregator extends AggregatorBase {
subCollector.collect(doc, bucketOrd);
}
public final void mergeBuckets(long[] mergeMap, long newNumBuckets) {
try (IntArray oldDocCounts = docCounts) {
docCounts = bigArrays.newIntArray(newNumBuckets, true);
docCounts.fill(0, newNumBuckets, 0);
for (int i = 0; i < oldDocCounts.size(); i++) {
int docCount = oldDocCounts.get(i);
if (docCount != 0) {
docCounts.increment(mergeMap[i], docCount);
}
}
}
}
public IntArray getDocCounts() {
return docCounts;
}

View File

@ -0,0 +1,236 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.packed.PackedInts;
import org.apache.lucene.util.packed.PackedLongValues;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* A specialization of {@link DeferringBucketCollector} that collects all
* matches and then is able to replay a given subset of buckets. Exposes
* mergeBuckets, which can be invoked by the aggregator when increasing the
* rounding interval.
*/
public class MergingBucketsDeferringCollector extends DeferringBucketCollector {
List<Entry> entries = new ArrayList<>();
BucketCollector collector;
final SearchContext searchContext;
LeafReaderContext context;
PackedLongValues.Builder docDeltas;
PackedLongValues.Builder buckets;
long maxBucket = -1;
boolean finished = false;
LongHash selectedBuckets;
public MergingBucketsDeferringCollector(SearchContext context) {
this.searchContext = context;
}
@Override
public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {
this.collector = BucketCollector.wrap(deferredCollectors);
}
@Override
public boolean needsScores() {
if (collector == null) {
throw new IllegalStateException();
}
return collector.needsScores();
}
@Override
public void preCollection() throws IOException {
collector.preCollection();
}
private void finishLeaf() {
if (context != null) {
entries.add(new Entry(context, docDeltas.build(), buckets.build()));
}
context = null;
docDeltas = null;
buckets = null;
}
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
finishLeaf();
context = ctx;
docDeltas = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
buckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
return new LeafBucketCollector() {
int lastDoc = 0;
@Override
public void collect(int doc, long bucket) {
docDeltas.add(doc - lastDoc);
buckets.add(bucket);
lastDoc = doc;
maxBucket = Math.max(maxBucket, bucket);
}
};
}
public void mergeBuckets(long[] mergeMap) {
List<Entry> newEntries = new ArrayList<>(entries.size());
for (Entry sourceEntry : entries) {
PackedLongValues.Builder newBuckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
for (PackedLongValues.Iterator itr = sourceEntry.buckets.iterator(); itr.hasNext();) {
long bucket = itr.next();
newBuckets.add(mergeMap[Math.toIntExact(bucket)]);
}
newEntries.add(new Entry(sourceEntry.context, sourceEntry.docDeltas, newBuckets.build()));
}
entries = newEntries;
// if there are buckets that have been collected in the current segment
// we need to update the bucket ordinals there too
if (buckets.size() > 0) {
PackedLongValues currentBuckets = buckets.build();
PackedLongValues.Builder newBuckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
for (PackedLongValues.Iterator itr = currentBuckets.iterator(); itr.hasNext();) {
long bucket = itr.next();
newBuckets.add(mergeMap[Math.toIntExact(bucket)]);
}
buckets = newBuckets;
}
}
@Override
public void postCollection() {
finishLeaf();
finished = true;
}
/**
* Replay the wrapped collector, but only on a selection of buckets.
*/
@Override
public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
if (finished == false) {
throw new IllegalStateException("Cannot replay yet, collection is not finished: postCollect() has not been called");
}
if (this.selectedBuckets != null) {
throw new IllegalStateException("Already been replayed");
}
final LongHash hash = new LongHash(selectedBuckets.length, BigArrays.NON_RECYCLING_INSTANCE);
for (long bucket : selectedBuckets) {
hash.add(bucket);
}
this.selectedBuckets = hash;
boolean needsScores = collector.needsScores();
Weight weight = null;
if (needsScores) {
weight = searchContext.searcher().createNormalizedWeight(searchContext.query(), true);
}
for (Entry entry : entries) {
final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.context);
DocIdSetIterator docIt = null;
if (needsScores && entry.docDeltas.size() > 0) {
Scorer scorer = weight.scorer(entry.context);
// We don't need to check if the scorer is null
// since we are sure that there are documents to replay
// (entry.docDeltas it not empty).
docIt = scorer.iterator();
leafCollector.setScorer(scorer);
}
final PackedLongValues.Iterator docDeltaIterator = entry.docDeltas.iterator();
final PackedLongValues.Iterator buckets = entry.buckets.iterator();
int doc = 0;
for (long i = 0, end = entry.docDeltas.size(); i < end; ++i) {
doc += docDeltaIterator.next();
final long bucket = buckets.next();
final long rebasedBucket = hash.find(bucket);
if (rebasedBucket != -1) {
if (needsScores) {
if (docIt.docID() < doc) {
docIt.advance(doc);
}
// aggregations should only be replayed on matching
// documents
assert docIt.docID() == doc;
}
leafCollector.collect(doc, rebasedBucket);
}
}
}
collector.postCollection();
}
/**
* Wrap the provided aggregator so that it behaves (almost) as if it had
* been collected directly.
*/
@Override
public Aggregator wrap(final Aggregator in) {
return new WrappedAggregator(in) {
@Override
public InternalAggregation buildAggregation(long bucket) throws IOException {
if (selectedBuckets == null) {
throw new IllegalStateException("Collection has not been replayed yet.");
}
final long rebasedBucket = selectedBuckets.find(bucket);
if (rebasedBucket == -1) {
throw new IllegalStateException("Cannot build for a bucket which has not been collected [" + bucket + "]");
}
return in.buildAggregation(rebasedBucket);
}
};
}
private static class Entry {
final LeafReaderContext context;
final PackedLongValues docDeltas;
final PackedLongValues buckets;
Entry(LeafReaderContext context, PackedLongValues docDeltas, PackedLongValues buckets) {
this.context = context;
this.docDeltas = docDeltas;
this.buckets = buckets;
}
}
}

View File

@ -0,0 +1,218 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.histogram;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.rounding.DateTimeUnit;
import org.elasticsearch.common.rounding.Rounding;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.ValuesSourceParserHelper;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
public class AutoDateHistogramAggregationBuilder
extends ValuesSourceAggregationBuilder<ValuesSource.Numeric, AutoDateHistogramAggregationBuilder> {
public static final String NAME = "auto_date_histogram";
public static final ParseField NUM_BUCKETS_FIELD = new ParseField("buckets");
private static final ObjectParser<AutoDateHistogramAggregationBuilder, Void> PARSER;
static {
PARSER = new ObjectParser<>(AutoDateHistogramAggregationBuilder.NAME);
ValuesSourceParserHelper.declareNumericFields(PARSER, true, true, true);
PARSER.declareInt(AutoDateHistogramAggregationBuilder::setNumBuckets, NUM_BUCKETS_FIELD);
}
public static AutoDateHistogramAggregationBuilder parse(String aggregationName, XContentParser parser) throws IOException {
return PARSER.parse(parser, new AutoDateHistogramAggregationBuilder(aggregationName), null);
}
private int numBuckets = 10;
/** Create a new builder with the given name. */
public AutoDateHistogramAggregationBuilder(String name) {
super(name, ValuesSourceType.NUMERIC, ValueType.DATE);
}
/** Read from a stream, for internal use only. */
public AutoDateHistogramAggregationBuilder(StreamInput in) throws IOException {
super(in, ValuesSourceType.NUMERIC, ValueType.DATE);
numBuckets = in.readVInt();
}
protected AutoDateHistogramAggregationBuilder(AutoDateHistogramAggregationBuilder clone, Builder factoriesBuilder,
Map<String, Object> metaData) {
super(clone, factoriesBuilder, metaData);
this.numBuckets = clone.numBuckets;
}
@Override
protected AggregationBuilder shallowCopy(Builder factoriesBuilder, Map<String, Object> metaData) {
return new AutoDateHistogramAggregationBuilder(this, factoriesBuilder, metaData);
}
@Override
protected void innerWriteTo(StreamOutput out) throws IOException {
out.writeVInt(numBuckets);
}
@Override
public String getType() {
return NAME;
}
public AutoDateHistogramAggregationBuilder setNumBuckets(int numBuckets) {
if (numBuckets <= 0) {
throw new IllegalArgumentException(NUM_BUCKETS_FIELD.getPreferredName() + " must be greater than 0 for [" + name + "]");
}
this.numBuckets = numBuckets;
return this;
}
public int getNumBuckets() {
return numBuckets;
}
@Override
protected ValuesSourceAggregatorFactory<Numeric, ?> innerBuild(SearchContext context, ValuesSourceConfig<Numeric> config,
AggregatorFactory<?> parent, Builder subFactoriesBuilder) throws IOException {
RoundingInfo[] roundings = new RoundingInfo[6];
roundings[0] = new RoundingInfo(createRounding(DateTimeUnit.SECOND_OF_MINUTE), 1000L, 1, 5, 10, 30);
roundings[1] = new RoundingInfo(createRounding(DateTimeUnit.MINUTES_OF_HOUR), 60 * 1000L, 1, 5, 10, 30);
roundings[2] = new RoundingInfo(createRounding(DateTimeUnit.HOUR_OF_DAY), 60 * 60 * 1000L, 1, 3, 12);
roundings[3] = new RoundingInfo(createRounding(DateTimeUnit.DAY_OF_MONTH), 24 * 60 * 60 * 1000L, 1, 7);
roundings[4] = new RoundingInfo(createRounding(DateTimeUnit.MONTH_OF_YEAR), 30 * 24 * 60 * 60 * 1000L, 1, 3);
roundings[5] = new RoundingInfo(createRounding(DateTimeUnit.YEAR_OF_CENTURY), 365 * 24 * 60 * 60 * 1000L, 1, 5, 10, 20, 50, 100);
int maxRoundingInterval = Arrays.stream(roundings,0, roundings.length-1)
.map(rounding -> rounding.innerIntervals)
.flatMapToInt(Arrays::stream)
.boxed()
.reduce(Integer::max).get();
Settings settings = context.getQueryShardContext().getIndexSettings().getNodeSettings();
int maxBuckets = MultiBucketConsumerService.MAX_BUCKET_SETTING.get(settings);
int bucketCeiling = maxBuckets / maxRoundingInterval;
if (numBuckets > bucketCeiling) {
throw new IllegalArgumentException(NUM_BUCKETS_FIELD.getPreferredName()+
" must be less than " + bucketCeiling);
}
return new AutoDateHistogramAggregatorFactory(name, config, numBuckets, roundings, context, parent, subFactoriesBuilder, metaData);
}
private Rounding createRounding(DateTimeUnit interval) {
Rounding.Builder tzRoundingBuilder = Rounding.builder(interval);
if (timeZone() != null) {
tzRoundingBuilder.timeZone(timeZone());
}
Rounding rounding = tzRoundingBuilder.build();
return rounding;
}
@Override
protected XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(NUM_BUCKETS_FIELD.getPreferredName(), numBuckets);
return builder;
}
@Override
protected int innerHashCode() {
return Objects.hash(numBuckets);
}
@Override
protected boolean innerEquals(Object obj) {
AutoDateHistogramAggregationBuilder other = (AutoDateHistogramAggregationBuilder) obj;
return Objects.equals(numBuckets, other.numBuckets);
}
public static class RoundingInfo implements Writeable {
final Rounding rounding;
final int[] innerIntervals;
final long roughEstimateDurationMillis;
public RoundingInfo(Rounding rounding, long roughEstimateDurationMillis, int... innerIntervals) {
this.rounding = rounding;
this.roughEstimateDurationMillis = roughEstimateDurationMillis;
this.innerIntervals = innerIntervals;
}
public RoundingInfo(StreamInput in) throws IOException {
rounding = Rounding.Streams.read(in);
roughEstimateDurationMillis = in.readVLong();
innerIntervals = in.readIntArray();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
Rounding.Streams.write(rounding, out);
out.writeVLong(roughEstimateDurationMillis);
out.writeIntArray(innerIntervals);
}
public int getMaximumInnerInterval() {
return innerIntervals[innerIntervals.length - 1];
}
public long getRoughEstimateDurationMillis() {
return roughEstimateDurationMillis;
}
@Override
public int hashCode() {
return Objects.hash(rounding, Arrays.hashCode(innerIntervals));
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj.getClass() != getClass()) {
return false;
}
RoundingInfo other = (RoundingInfo) obj;
return Objects.equals(rounding, other.rounding) &&
Objects.deepEquals(innerIntervals, other.innerIntervals);
}
}
}

View File

@ -0,0 +1,199 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.histogram;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.rounding.Rounding;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.DeferableBucketAggregator;
import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector;
import org.elasticsearch.search.aggregations.bucket.MergingBucketsDeferringCollector;
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder.RoundingInfo;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* An aggregator for date values. Every date is rounded down using a configured
* {@link Rounding}.
*
* @see Rounding
*/
class AutoDateHistogramAggregator extends DeferableBucketAggregator {
private final ValuesSource.Numeric valuesSource;
private final DocValueFormat formatter;
private final RoundingInfo[] roundingInfos;
private int roundingIdx = 0;
private LongHash bucketOrds;
private int targetBuckets;
private MergingBucketsDeferringCollector deferringCollector;
AutoDateHistogramAggregator(String name, AggregatorFactories factories, int numBuckets, RoundingInfo[] roundingInfos,
@Nullable ValuesSource.Numeric valuesSource, DocValueFormat formatter, SearchContext aggregationContext, Aggregator parent,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
super(name, factories, aggregationContext, parent, pipelineAggregators, metaData);
this.targetBuckets = numBuckets;
this.valuesSource = valuesSource;
this.formatter = formatter;
this.roundingInfos = roundingInfos;
bucketOrds = new LongHash(1, aggregationContext.bigArrays());
}
@Override
public boolean needsScores() {
return (valuesSource != null && valuesSource.needsScores()) || super.needsScores();
}
@Override
protected boolean shouldDefer(Aggregator aggregator) {
return true;
}
@Override
public DeferringBucketCollector getDeferringCollector() {
deferringCollector = new MergingBucketsDeferringCollector(context);
return deferringCollector;
}
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
if (valuesSource == null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
}
final SortedNumericDocValues values = valuesSource.longValues(ctx);
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
assert bucket == 0;
if (values.advanceExact(doc)) {
final int valuesCount = values.docValueCount();
long previousRounded = Long.MIN_VALUE;
for (int i = 0; i < valuesCount; ++i) {
long value = values.nextValue();
long rounded = roundingInfos[roundingIdx].rounding.round(value);
assert rounded >= previousRounded;
if (rounded == previousRounded) {
continue;
}
long bucketOrd = bucketOrds.add(rounded);
if (bucketOrd < 0) { // already seen
bucketOrd = -1 - bucketOrd;
collectExistingBucket(sub, doc, bucketOrd);
} else {
collectBucket(sub, doc, bucketOrd);
while (roundingIdx < roundingInfos.length - 1
&& bucketOrds.size() > (targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval())) {
increaseRounding();
}
}
previousRounded = rounded;
}
}
}
private void increaseRounding() {
try (LongHash oldBucketOrds = bucketOrds) {
LongHash newBucketOrds = new LongHash(1, context.bigArrays());
long[] mergeMap = new long[(int) oldBucketOrds.size()];
Rounding newRounding = roundingInfos[++roundingIdx].rounding;
for (int i = 0; i < oldBucketOrds.size(); i++) {
long oldKey = oldBucketOrds.get(i);
long newKey = newRounding.round(oldKey);
long newBucketOrd = newBucketOrds.add(newKey);
if (newBucketOrd >= 0) {
mergeMap[i] = newBucketOrd;
} else {
mergeMap[i] = -1 - newBucketOrd;
}
}
mergeBuckets(mergeMap, newBucketOrds.size());
if (deferringCollector != null) {
deferringCollector.mergeBuckets(mergeMap);
}
bucketOrds = newBucketOrds;
}
}
};
}
@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
assert owningBucketOrdinal == 0;
consumeBucketsAndMaybeBreak((int) bucketOrds.size());
long[] bucketOrdArray = new long[(int) bucketOrds.size()];
for (int i = 0; i < bucketOrds.size(); i++) {
bucketOrdArray[i] = i;
}
runDeferredCollections(bucketOrdArray);
List<InternalAutoDateHistogram.Bucket> buckets = new ArrayList<>((int) bucketOrds.size());
for (long i = 0; i < bucketOrds.size(); i++) {
buckets.add(new InternalAutoDateHistogram.Bucket(bucketOrds.get(i), bucketDocCount(i), formatter, bucketAggregations(i)));
}
// the contract of the histogram aggregation is that shards must return
// buckets ordered by key in ascending order
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator(this));
// value source will be null for unmapped fields
InternalAutoDateHistogram.BucketInfo emptyBucketInfo = new InternalAutoDateHistogram.BucketInfo(roundingInfos, roundingIdx,
buildEmptySubAggregations());
return new InternalAutoDateHistogram(name, buckets, targetBuckets, emptyBucketInfo, formatter, pipelineAggregators(), metaData());
}
@Override
public InternalAggregation buildEmptyAggregation() {
InternalAutoDateHistogram.BucketInfo emptyBucketInfo = new InternalAutoDateHistogram.BucketInfo(roundingInfos, roundingIdx,
buildEmptySubAggregations());
return new InternalAutoDateHistogram(name, Collections.emptyList(), targetBuckets, emptyBucketInfo, formatter,
pipelineAggregators(), metaData());
}
@Override
public void doClose() {
Releasables.close(bucketOrds);
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.histogram;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder.RoundingInfo;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public final class AutoDateHistogramAggregatorFactory
extends ValuesSourceAggregatorFactory<ValuesSource.Numeric, AutoDateHistogramAggregatorFactory> {
private final int numBuckets;
private RoundingInfo[] roundingInfos;
public AutoDateHistogramAggregatorFactory(String name, ValuesSourceConfig<Numeric> config, int numBuckets, RoundingInfo[] roundingInfos,
SearchContext context, AggregatorFactory<?> parent, AggregatorFactories.Builder subFactoriesBuilder,
Map<String, Object> metaData) throws IOException {
super(name, config, context, parent, subFactoriesBuilder, metaData);
this.numBuckets = numBuckets;
this.roundingInfos = roundingInfos;
}
@Override
protected Aggregator doCreateInternal(ValuesSource.Numeric valuesSource, Aggregator parent, boolean collectsFromSingleBucket,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
if (collectsFromSingleBucket == false) {
return asMultiBucketAggregator(this, context, parent);
}
return createAggregator(valuesSource, parent, pipelineAggregators, metaData);
}
private Aggregator createAggregator(ValuesSource.Numeric valuesSource, Aggregator parent, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
return new AutoDateHistogramAggregator(name, factories, numBuckets, roundingInfos, valuesSource, config.format(), context, parent,
pipelineAggregators,
metaData);
}
@Override
protected Aggregator createUnmapped(Aggregator parent, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
throws IOException {
return createAggregator(null, parent, pipelineAggregators, metaData);
}
}

View File

@ -28,13 +28,13 @@ import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalOrder;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalOrder;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;

View File

@ -0,0 +1,601 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.histogram;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.rounding.Rounding;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.KeyComparable;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder.RoundingInfo;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
/**
* Implementation of {@link Histogram}.
*/
public final class InternalAutoDateHistogram extends
InternalMultiBucketAggregation<InternalAutoDateHistogram, InternalAutoDateHistogram.Bucket> implements Histogram, HistogramFactory {
public static class Bucket extends InternalMultiBucketAggregation.InternalBucket implements Histogram.Bucket, KeyComparable<Bucket> {
final long key;
final long docCount;
final InternalAggregations aggregations;
protected final transient DocValueFormat format;
public Bucket(long key, long docCount, DocValueFormat format,
InternalAggregations aggregations) {
this.format = format;
this.key = key;
this.docCount = docCount;
this.aggregations = aggregations;
}
/**
* Read from a stream.
*/
public Bucket(StreamInput in, DocValueFormat format) throws IOException {
this.format = format;
key = in.readLong();
docCount = in.readVLong();
aggregations = InternalAggregations.readAggregations(in);
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != InternalAutoDateHistogram.Bucket.class) {
return false;
}
InternalAutoDateHistogram.Bucket that = (InternalAutoDateHistogram.Bucket) obj;
// No need to take the keyed and format parameters into account,
// they are already stored and tested on the InternalDateHistogram object
return key == that.key
&& docCount == that.docCount
&& Objects.equals(aggregations, that.aggregations);
}
@Override
public int hashCode() {
return Objects.hash(getClass(), key, docCount, aggregations);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(key);
out.writeVLong(docCount);
aggregations.writeTo(out);
}
@Override
public String getKeyAsString() {
return format.format(key).toString();
}
@Override
public Object getKey() {
return new DateTime(key, DateTimeZone.UTC);
}
@Override
public long getDocCount() {
return docCount;
}
@Override
public Aggregations getAggregations() {
return aggregations;
}
Bucket reduce(List<Bucket> buckets, Rounding rounding, ReduceContext context) {
List<InternalAggregations> aggregations = new ArrayList<>(buckets.size());
long docCount = 0;
for (Bucket bucket : buckets) {
docCount += bucket.docCount;
aggregations.add((InternalAggregations) bucket.getAggregations());
}
InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
return new InternalAutoDateHistogram.Bucket(rounding.round(key), docCount, format, aggs);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
String keyAsString = format.format(key).toString();
builder.startObject();
if (format != DocValueFormat.RAW) {
builder.field(CommonFields.KEY_AS_STRING.getPreferredName(), keyAsString);
}
builder.field(CommonFields.KEY.getPreferredName(), key);
builder.field(CommonFields.DOC_COUNT.getPreferredName(), docCount);
aggregations.toXContentInternal(builder, params);
builder.endObject();
return builder;
}
@Override
public int compareKey(Bucket other) {
return Long.compare(key, other.key);
}
public DocValueFormat getFormatter() {
return format;
}
}
static class BucketInfo {
final RoundingInfo[] roundingInfos;
final int roundingIdx;
final InternalAggregations emptySubAggregations;
BucketInfo(RoundingInfo[] roundings, int roundingIdx, InternalAggregations subAggregations) {
this.roundingInfos = roundings;
this.roundingIdx = roundingIdx;
this.emptySubAggregations = subAggregations;
}
BucketInfo(StreamInput in) throws IOException {
int size = in.readVInt();
roundingInfos = new RoundingInfo[size];
for (int i = 0; i < size; i++) {
roundingInfos[i] = new RoundingInfo(in);
}
roundingIdx = in.readVInt();
emptySubAggregations = InternalAggregations.readAggregations(in);
}
void writeTo(StreamOutput out) throws IOException {
out.writeVInt(roundingInfos.length);
for (RoundingInfo roundingInfo : roundingInfos) {
roundingInfo.writeTo(out);
}
out.writeVInt(roundingIdx);
emptySubAggregations.writeTo(out);
}
@Override
public boolean equals(Object obj) {
if (obj == null || getClass() != obj.getClass()) {
return false;
}
BucketInfo that = (BucketInfo) obj;
return Objects.deepEquals(roundingInfos, that.roundingInfos)
&& Objects.equals(roundingIdx, that.roundingIdx)
&& Objects.equals(emptySubAggregations, that.emptySubAggregations);
}
@Override
public int hashCode() {
return Objects.hash(getClass(), Arrays.hashCode(roundingInfos), roundingIdx, emptySubAggregations);
}
}
private final List<Bucket> buckets;
private final DocValueFormat format;
private final BucketInfo bucketInfo;
private final int targetBuckets;
InternalAutoDateHistogram(String name, List<Bucket> buckets, int targetBuckets, BucketInfo emptyBucketInfo, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);
this.buckets = buckets;
this.bucketInfo = emptyBucketInfo;
this.format = formatter;
this.targetBuckets = targetBuckets;
}
/**
* Stream from a stream.
*/
public InternalAutoDateHistogram(StreamInput in) throws IOException {
super(in);
bucketInfo = new BucketInfo(in);
format = in.readNamedWriteable(DocValueFormat.class);
buckets = in.readList(stream -> new Bucket(stream, format));
this.targetBuckets = in.readVInt();
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
bucketInfo.writeTo(out);
out.writeNamedWriteable(format);
out.writeList(buckets);
out.writeVInt(targetBuckets);
}
@Override
public String getWriteableName() {
return AutoDateHistogramAggregationBuilder.NAME;
}
@Override
public List<InternalAutoDateHistogram.Bucket> getBuckets() {
return Collections.unmodifiableList(buckets);
}
DocValueFormat getFormatter() {
return format;
}
public int getTargetBuckets() {
return targetBuckets;
}
public BucketInfo getBucketInfo() {
return bucketInfo;
}
@Override
public InternalAutoDateHistogram create(List<Bucket> buckets) {
return new InternalAutoDateHistogram(name, buckets, targetBuckets, bucketInfo, format, pipelineAggregators(), metaData);
}
@Override
public Bucket createBucket(InternalAggregations aggregations, Bucket prototype) {
return new Bucket(prototype.key, prototype.docCount, prototype.format, aggregations);
}
private static class IteratorAndCurrent {
private final Iterator<Bucket> iterator;
private Bucket current;
IteratorAndCurrent(Iterator<Bucket> iterator) {
this.iterator = iterator;
current = iterator.next();
}
}
/**
* This method works almost exactly the same as
* InternalDateHistogram#reduceBuckets(List, ReduceContext), the different
* here is that we need to round all the keys we see using the highest level
* rounding returned across all the shards so the resolution of the buckets
* is the same and they can be reduced together.
*/
private BucketReduceResult reduceBuckets(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
// First we need to find the highest level rounding used across all the
// shards
int reduceRoundingIdx = 0;
for (InternalAggregation aggregation : aggregations) {
int aggRoundingIdx = ((InternalAutoDateHistogram) aggregation).bucketInfo.roundingIdx;
if (aggRoundingIdx > reduceRoundingIdx) {
reduceRoundingIdx = aggRoundingIdx;
}
}
// This rounding will be used to reduce all the buckets
RoundingInfo reduceRoundingInfo = bucketInfo.roundingInfos[reduceRoundingIdx];
Rounding reduceRounding = reduceRoundingInfo.rounding;
final PriorityQueue<IteratorAndCurrent> pq = new PriorityQueue<IteratorAndCurrent>(aggregations.size()) {
@Override
protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
return a.current.key < b.current.key;
}
};
for (InternalAggregation aggregation : aggregations) {
InternalAutoDateHistogram histogram = (InternalAutoDateHistogram) aggregation;
if (histogram.buckets.isEmpty() == false) {
pq.add(new IteratorAndCurrent(histogram.buckets.iterator()));
}
}
List<Bucket> reducedBuckets = new ArrayList<>();
if (pq.size() > 0) {
// list of buckets coming from different shards that have the same key
List<Bucket> currentBuckets = new ArrayList<>();
double key = reduceRounding.round(pq.top().current.key);
do {
final IteratorAndCurrent top = pq.top();
if (reduceRounding.round(top.current.key) != key) {
// the key changes, reduce what we already buffered and reset the buffer for current buckets
final Bucket reduced = currentBuckets.get(0).reduce(currentBuckets, reduceRounding, reduceContext);
reduceContext.consumeBucketsAndMaybeBreak(1);
reducedBuckets.add(reduced);
currentBuckets.clear();
key = reduceRounding.round(top.current.key);
}
currentBuckets.add(top.current);
if (top.iterator.hasNext()) {
final Bucket next = top.iterator.next();
assert next.key > top.current.key : "shards must return data sorted by key";
top.current = next;
pq.updateTop();
} else {
pq.pop();
}
} while (pq.size() > 0);
if (currentBuckets.isEmpty() == false) {
final Bucket reduced = currentBuckets.get(0).reduce(currentBuckets, reduceRounding, reduceContext);
reduceContext.consumeBucketsAndMaybeBreak(1);
reducedBuckets.add(reduced);
}
}
return mergeBucketsIfNeeded(reducedBuckets, reduceRoundingIdx, reduceRoundingInfo, reduceContext);
}
private BucketReduceResult mergeBucketsIfNeeded(List<Bucket> reducedBuckets, int reduceRoundingIdx, RoundingInfo reduceRoundingInfo,
ReduceContext reduceContext) {
while (reducedBuckets.size() > (targetBuckets * reduceRoundingInfo.getMaximumInnerInterval())
&& reduceRoundingIdx < bucketInfo.roundingInfos.length - 1) {
reduceRoundingIdx++;
reduceRoundingInfo = bucketInfo.roundingInfos[reduceRoundingIdx];
reducedBuckets = mergeBuckets(reducedBuckets, reduceRoundingInfo.rounding, reduceContext);
}
return new BucketReduceResult(reducedBuckets, reduceRoundingInfo, reduceRoundingIdx);
}
private List<Bucket> mergeBuckets(List<Bucket> reducedBuckets, Rounding reduceRounding, ReduceContext reduceContext) {
List<Bucket> mergedBuckets = new ArrayList<>();
List<Bucket> sameKeyedBuckets = new ArrayList<>();
double key = Double.NaN;
for (Bucket bucket : reducedBuckets) {
long roundedBucketKey = reduceRounding.round(bucket.key);
if (Double.isNaN(key)) {
key = roundedBucketKey;
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
} else if (roundedBucketKey == key) {
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
} else {
reduceContext.consumeBucketsAndMaybeBreak(1);
mergedBuckets.add(sameKeyedBuckets.get(0).reduce(sameKeyedBuckets, reduceRounding, reduceContext));
sameKeyedBuckets.clear();
key = roundedBucketKey;
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
}
}
if (sameKeyedBuckets.isEmpty() == false) {
reduceContext.consumeBucketsAndMaybeBreak(1);
mergedBuckets.add(sameKeyedBuckets.get(0).reduce(sameKeyedBuckets, reduceRounding, reduceContext));
}
reducedBuckets = mergedBuckets;
return reducedBuckets;
}
private static class BucketReduceResult {
List<Bucket> buckets;
RoundingInfo roundingInfo;
int roundingIdx;
BucketReduceResult(List<Bucket> buckets, RoundingInfo roundingInfo, int roundingIdx) {
this.buckets = buckets;
this.roundingInfo = roundingInfo;
this.roundingIdx = roundingIdx;
}
}
private BucketReduceResult addEmptyBuckets(BucketReduceResult currentResult, ReduceContext reduceContext) {
List<Bucket> list = currentResult.buckets;
if (list.isEmpty()) {
return currentResult;
}
int roundingIdx = getAppropriateRounding(list.get(0).key, list.get(list.size() - 1).key, currentResult.roundingIdx,
bucketInfo.roundingInfos);
RoundingInfo roundingInfo = bucketInfo.roundingInfos[roundingIdx];
Rounding rounding = roundingInfo.rounding;
// merge buckets using the new rounding
list = mergeBuckets(list, rounding, reduceContext);
Bucket lastBucket = null;
ListIterator<Bucket> iter = list.listIterator();
InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(Collections.singletonList(bucketInfo.emptySubAggregations),
reduceContext);
// Add the empty buckets within the data,
// e.g. if the data series is [1,2,3,7] there're 3 empty buckets that will be created for 4,5,6
while (iter.hasNext()) {
Bucket nextBucket = list.get(iter.nextIndex());
if (lastBucket != null) {
long key = rounding.nextRoundingValue(lastBucket.key);
while (key < nextBucket.key) {
reduceContext.consumeBucketsAndMaybeBreak(1);
iter.add(new InternalAutoDateHistogram.Bucket(key, 0, format, reducedEmptySubAggs));
key = rounding.nextRoundingValue(key);
}
assert key == nextBucket.key : "key: " + key + ", nextBucket.key: " + nextBucket.key;
}
lastBucket = iter.next();
}
return new BucketReduceResult(list, roundingInfo, roundingIdx);
}
private int getAppropriateRounding(long minKey, long maxKey, int roundingIdx, RoundingInfo[] roundings) {
if (roundingIdx == roundings.length - 1) {
return roundingIdx;
}
int currentRoundingIdx = roundingIdx;
// Getting the accurate number of required buckets can be slow for large
// ranges at low roundings so get a rough estimate of the rounding first
// so we are at most 1 away from the correct rounding and then get the
// accurate rounding value
for (int i = currentRoundingIdx + 1; i < roundings.length; i++) {
long dataDuration = maxKey - minKey;
long roughEstimateRequiredBuckets = dataDuration / roundings[i].getRoughEstimateDurationMillis();
if (roughEstimateRequiredBuckets < targetBuckets * roundings[i].getMaximumInnerInterval()) {
currentRoundingIdx = i - 1;
break;
} else if (i == roundingIdx - 1) {
currentRoundingIdx = i;
break;
}
}
int requiredBuckets = 0;
do {
Rounding currentRounding = roundings[currentRoundingIdx].rounding;
long currentKey = minKey;
requiredBuckets = 0;
while (currentKey < maxKey) {
requiredBuckets++;
currentKey = currentRounding.nextRoundingValue(currentKey);
}
currentRoundingIdx++;
} while (requiredBuckets > (targetBuckets * roundings[roundingIdx].getMaximumInnerInterval())
&& currentRoundingIdx < roundings.length);
// The loop will increase past the correct rounding index here so we
// need to subtract one to get the rounding index we need
return currentRoundingIdx - 1;
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
BucketReduceResult reducedBucketsResult = reduceBuckets(aggregations, reduceContext);
if (reduceContext.isFinalReduce()) {
// adding empty buckets if needed
reducedBucketsResult = addEmptyBuckets(reducedBucketsResult, reduceContext);
// Adding empty buckets may have tipped us over the target so merge the buckets again if needed
reducedBucketsResult = mergeBucketsIfNeeded(reducedBucketsResult.buckets, reducedBucketsResult.roundingIdx,
reducedBucketsResult.roundingInfo, reduceContext);
// Now finally see if we need to merge consecutive buckets together to make a coarser interval at the same rounding
reducedBucketsResult = maybeMergeConsecutiveBuckets(reducedBucketsResult, reduceContext);
}
BucketInfo bucketInfo = new BucketInfo(this.bucketInfo.roundingInfos, reducedBucketsResult.roundingIdx,
this.bucketInfo.emptySubAggregations);
return new InternalAutoDateHistogram(getName(), reducedBucketsResult.buckets, targetBuckets, bucketInfo, format,
pipelineAggregators(), getMetaData());
}
private BucketReduceResult maybeMergeConsecutiveBuckets(BucketReduceResult reducedBucketsResult, ReduceContext reduceContext) {
List<Bucket> buckets = reducedBucketsResult.buckets;
RoundingInfo roundingInfo = reducedBucketsResult.roundingInfo;
int roundingIdx = reducedBucketsResult.roundingIdx;
if (buckets.size() > targetBuckets) {
for (int interval : roundingInfo.innerIntervals) {
int resultingBuckets = buckets.size() / interval;
if (resultingBuckets <= targetBuckets) {
return mergeConsecutiveBuckets(buckets, interval, roundingIdx, roundingInfo, reduceContext);
}
}
}
return reducedBucketsResult;
}
private BucketReduceResult mergeConsecutiveBuckets(List<Bucket> reducedBuckets, int mergeInterval, int roundingIdx,
RoundingInfo roundingInfo, ReduceContext reduceContext) {
List<Bucket> mergedBuckets = new ArrayList<>();
List<Bucket> sameKeyedBuckets = new ArrayList<>();
double key = roundingInfo.rounding.round(reducedBuckets.get(0).key);
for (int i = 0; i < reducedBuckets.size(); i++) {
Bucket bucket = reducedBuckets.get(i);
if (i % mergeInterval == 0 && sameKeyedBuckets.isEmpty() == false) {
reduceContext.consumeBucketsAndMaybeBreak(1);
mergedBuckets.add(sameKeyedBuckets.get(0).reduce(sameKeyedBuckets, roundingInfo.rounding, reduceContext));
sameKeyedBuckets.clear();
key = roundingInfo.rounding.round(bucket.key);
}
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
}
if (sameKeyedBuckets.isEmpty() == false) {
reduceContext.consumeBucketsAndMaybeBreak(1);
mergedBuckets.add(sameKeyedBuckets.get(0).reduce(sameKeyedBuckets, roundingInfo.rounding, reduceContext));
}
return new BucketReduceResult(mergedBuckets, roundingInfo, roundingIdx);
}
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.startArray(CommonFields.BUCKETS.getPreferredName());
for (Bucket bucket : buckets) {
bucket.toXContent(builder, params);
}
builder.endArray();
return builder;
}
// HistogramFactory method impls
@Override
public Number getKey(MultiBucketsAggregation.Bucket bucket) {
return ((Bucket) bucket).key;
}
@Override
public Number nextKey(Number key) {
return bucketInfo.roundingInfos[bucketInfo.roundingIdx].rounding.nextRoundingValue(key.longValue());
}
@Override
public InternalAggregation createAggregation(List<MultiBucketsAggregation.Bucket> buckets) {
// convert buckets to the right type
List<Bucket> buckets2 = new ArrayList<>(buckets.size());
for (Object b : buckets) {
buckets2.add((Bucket) b);
}
buckets2 = Collections.unmodifiableList(buckets2);
return new InternalAutoDateHistogram(name, buckets2, targetBuckets, bucketInfo, format, pipelineAggregators(), getMetaData());
}
@Override
public Bucket createBucket(Number key, long docCount, InternalAggregations aggregations) {
return new Bucket(key.longValue(), docCount, format, aggregations);
}
@Override
protected boolean doEquals(Object obj) {
InternalAutoDateHistogram that = (InternalAutoDateHistogram) obj;
return Objects.equals(buckets, that.buckets)
&& Objects.equals(format, that.format)
&& Objects.equals(bucketInfo, that.bucketInfo);
}
@Override
protected int doHashCode() {
return Objects.hash(buckets, format, bucketInfo);
}
}

View File

@ -424,7 +424,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
key = nextKey(key).longValue();
}
assert key == nextBucket.key;
assert key == nextBucket.key : "key: " + key + ", nextBucket.key: " + nextBucket.key;
}
lastBucket = iter.next();
}

View File

@ -0,0 +1,91 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.histogram;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.io.IOException;
import java.util.List;
public class ParsedAutoDateHistogram extends ParsedMultiBucketAggregation<ParsedAutoDateHistogram.ParsedBucket> implements Histogram {
@Override
public String getType() {
return AutoDateHistogramAggregationBuilder.NAME;
}
@Override
public List<? extends Histogram.Bucket> getBuckets() {
return buckets;
}
private static ObjectParser<ParsedAutoDateHistogram, Void> PARSER =
new ObjectParser<>(ParsedAutoDateHistogram.class.getSimpleName(), true, ParsedAutoDateHistogram::new);
static {
declareMultiBucketAggregationFields(PARSER,
parser -> ParsedBucket.fromXContent(parser, false),
parser -> ParsedBucket.fromXContent(parser, true));
}
public static ParsedAutoDateHistogram fromXContent(XContentParser parser, String name) throws IOException {
ParsedAutoDateHistogram aggregation = PARSER.parse(parser, null);
aggregation.setName(name);
return aggregation;
}
public static class ParsedBucket extends ParsedMultiBucketAggregation.ParsedBucket implements Histogram.Bucket {
private Long key;
@Override
public Object getKey() {
if (key != null) {
return new DateTime(key, DateTimeZone.UTC);
}
return null;
}
@Override
public String getKeyAsString() {
String keyAsString = super.getKeyAsString();
if (keyAsString != null) {
return keyAsString;
}
if (key != null) {
return Long.toString(key);
}
return null;
}
@Override
protected XContentBuilder keyToXContent(XContentBuilder builder) throws IOException {
return builder.field(CommonFields.KEY.getPreferredName(), key);
}
static ParsedBucket fromXContent(XContentParser parser, boolean keyed) throws IOException {
return parseXContent(parser, keyed, ParsedBucket::new, (p, bucket) -> bucket.key = p.longValue());
}
}
}

View File

@ -37,6 +37,7 @@ import org.elasticsearch.search.aggregations.bucket.filter.InternalFilterTests;
import org.elasticsearch.search.aggregations.bucket.filter.InternalFiltersTests;
import org.elasticsearch.search.aggregations.bucket.geogrid.InternalGeoHashGridTests;
import org.elasticsearch.search.aggregations.bucket.global.InternalGlobalTests;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalAutoDateHistogramTests;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogramTests;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogramTests;
import org.elasticsearch.search.aggregations.bucket.missing.InternalMissingTests;
@ -125,6 +126,7 @@ public class AggregationsTests extends ESTestCase {
aggsTests.add(new InternalGeoCentroidTests());
aggsTests.add(new InternalHistogramTests());
aggsTests.add(new InternalDateHistogramTests());
aggsTests.add(new InternalAutoDateHistogramTests());
aggsTests.add(new LongTermsTests());
aggsTests.add(new DoubleTermsTests());
aggsTests.add(new StringTermsTests());

View File

@ -0,0 +1,44 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket;
import org.elasticsearch.search.aggregations.BaseAggregationTestCase;
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder;
public class AutoDateHistogramTests extends BaseAggregationTestCase<AutoDateHistogramAggregationBuilder> {
@Override
protected AutoDateHistogramAggregationBuilder createTestAggregatorBuilder() {
AutoDateHistogramAggregationBuilder builder = new AutoDateHistogramAggregationBuilder(randomAlphaOfLengthBetween(1, 10));
builder.field(INT_FIELD_NAME);
builder.setNumBuckets(randomIntBetween(1, 100000));
if (randomBoolean()) {
builder.format("###.##");
}
if (randomBoolean()) {
builder.missing(randomIntBetween(0, 10));
}
if (randomBoolean()) {
builder.timeZone(randomDateTimeZone());
}
return builder;
}
}

View File

@ -0,0 +1,154 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.histogram;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.rounding.DateTimeUnit;
import org.elasticsearch.common.rounding.Rounding;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder.RoundingInfo;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalAutoDateHistogram.BucketInfo;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
import org.joda.time.DateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import static org.elasticsearch.common.unit.TimeValue.timeValueHours;
import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
public class InternalAutoDateHistogramTests extends InternalMultiBucketAggregationTestCase<InternalAutoDateHistogram> {
private DocValueFormat format;
private RoundingInfo[] roundingInfos;
@Override
public void setUp() throws Exception {
super.setUp();
format = randomNumericDocValueFormat();
roundingInfos = new RoundingInfo[6];
roundingInfos[0] = new RoundingInfo(Rounding.builder(DateTimeUnit.SECOND_OF_MINUTE).build(), 1, 5, 10, 30);
roundingInfos[1] = new RoundingInfo(Rounding.builder(DateTimeUnit.MINUTES_OF_HOUR).build(), 1, 5, 10, 30);
roundingInfos[2] = new RoundingInfo(Rounding.builder(DateTimeUnit.HOUR_OF_DAY).build(), 1, 3, 12);
roundingInfos[3] = new RoundingInfo(Rounding.builder(DateTimeUnit.DAY_OF_MONTH).build(), 1, 7);
roundingInfos[4] = new RoundingInfo(Rounding.builder(DateTimeUnit.MONTH_OF_YEAR).build(), 1, 3);
roundingInfos[5] = new RoundingInfo(Rounding.builder(DateTimeUnit.YEAR_OF_CENTURY).build(), 1, 10, 20, 50, 100);
}
@Override
protected InternalAutoDateHistogram createTestInstance(String name,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData,
InternalAggregations aggregations) {
int nbBuckets = randomNumberOfBuckets();
int targetBuckets = randomIntBetween(1, nbBuckets * 2 + 1);
List<InternalAutoDateHistogram.Bucket> buckets = new ArrayList<>(nbBuckets);
long startingDate = System.currentTimeMillis();
long interval = randomIntBetween(1, 3);
long intervalMillis = randomFrom(timeValueSeconds(interval), timeValueMinutes(interval), timeValueHours(interval)).getMillis();
for (int i = 0; i < nbBuckets; i++) {
long key = startingDate + (intervalMillis * i);
buckets.add(i, new InternalAutoDateHistogram.Bucket(key, randomIntBetween(1, 100), format, aggregations));
}
InternalAggregations subAggregations = new InternalAggregations(Collections.emptyList());
BucketInfo bucketInfo = new BucketInfo(roundingInfos, randomIntBetween(0, roundingInfos.length - 1), subAggregations);
return new InternalAutoDateHistogram(name, buckets, targetBuckets, bucketInfo, format, pipelineAggregators, metaData);
}
@Override
protected void assertReduced(InternalAutoDateHistogram reduced, List<InternalAutoDateHistogram> inputs) {
int roundingIdx = 0;
for (InternalAutoDateHistogram histogram : inputs) {
if (histogram.getBucketInfo().roundingIdx > roundingIdx) {
roundingIdx = histogram.getBucketInfo().roundingIdx;
}
}
Map<Long, Long> expectedCounts = new TreeMap<>();
for (Histogram histogram : inputs) {
for (Histogram.Bucket bucket : histogram.getBuckets()) {
expectedCounts.compute(roundingInfos[roundingIdx].rounding.round(((DateTime) bucket.getKey()).getMillis()),
(key, oldValue) -> (oldValue == null ? 0 : oldValue) + bucket.getDocCount());
}
}
Map<Long, Long> actualCounts = new TreeMap<>();
for (Histogram.Bucket bucket : reduced.getBuckets()) {
actualCounts.compute(((DateTime) bucket.getKey()).getMillis(),
(key, oldValue) -> (oldValue == null ? 0 : oldValue) + bucket.getDocCount());
}
assertEquals(expectedCounts, actualCounts);
}
@Override
protected Writeable.Reader<InternalAutoDateHistogram> instanceReader() {
return InternalAutoDateHistogram::new;
}
@Override
protected Class<? extends ParsedMultiBucketAggregation> implementationClass() {
return ParsedAutoDateHistogram.class;
}
@Override
protected InternalAutoDateHistogram mutateInstance(InternalAutoDateHistogram instance) {
String name = instance.getName();
List<InternalAutoDateHistogram.Bucket> buckets = instance.getBuckets();
int targetBuckets = instance.getTargetBuckets();
BucketInfo bucketInfo = instance.getBucketInfo();
List<PipelineAggregator> pipelineAggregators = instance.pipelineAggregators();
Map<String, Object> metaData = instance.getMetaData();
switch (between(0, 3)) {
case 0:
name += randomAlphaOfLength(5);
break;
case 1:
buckets = new ArrayList<>(buckets);
buckets.add(new InternalAutoDateHistogram.Bucket(randomNonNegativeLong(), randomIntBetween(1, 100), format,
InternalAggregations.EMPTY));
break;
case 2:
int roundingIdx = bucketInfo.roundingIdx == bucketInfo.roundingInfos.length - 1 ? 0 : bucketInfo.roundingIdx + 1;
bucketInfo = new BucketInfo(bucketInfo.roundingInfos, roundingIdx, bucketInfo.emptySubAggregations);
break;
case 3:
if (metaData == null) {
metaData = new HashMap<>(1);
} else {
metaData = new HashMap<>(instance.getMetaData());
}
metaData.put(randomAlphaOfLength(15), randomInt());
break;
default:
throw new AssertionError("Illegal randomisation branch");
}
return new InternalAutoDateHistogram(name, buckets, targetBuckets, bucketInfo, format, pipelineAggregators, metaData);
}
}

View File

@ -87,7 +87,6 @@ import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.elasticsearch.test.InternalAggregationTestCase.DEFAULT_MAX_BUCKETS;
/**
* Base class for testing {@link Aggregator} implementations.
@ -229,7 +228,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
});
when(searchContext.bitsetFilterCache()).thenReturn(new BitsetFilterCache(indexSettings, mock(Listener.class)));
doAnswer(invocation -> {
/* Store the releasables so we can release them at the end of the test case. This is important because aggregations don't
/* Store the release-ables so we can release them at the end of the test case. This is important because aggregations don't
* close their sub-aggregations. This is fairly similar to what the production code does. */
releasables.add((Releasable) invocation.getArguments()[0]);
return null;

View File

@ -53,8 +53,10 @@ import org.elasticsearch.search.aggregations.bucket.geogrid.GeoGridAggregationBu
import org.elasticsearch.search.aggregations.bucket.geogrid.ParsedGeoHashGrid;
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.global.ParsedGlobal;
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.ParsedAutoDateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.ParsedDateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.ParsedHistogram;
import org.elasticsearch.search.aggregations.bucket.missing.MissingAggregationBuilder;
@ -181,6 +183,7 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
map.put(GeoCentroidAggregationBuilder.NAME, (p, c) -> ParsedGeoCentroid.fromXContent(p, (String) c));
map.put(HistogramAggregationBuilder.NAME, (p, c) -> ParsedHistogram.fromXContent(p, (String) c));
map.put(DateHistogramAggregationBuilder.NAME, (p, c) -> ParsedDateHistogram.fromXContent(p, (String) c));
map.put(AutoDateHistogramAggregationBuilder.NAME, (p, c) -> ParsedAutoDateHistogram.fromXContent(p, (String) c));
map.put(StringTerms.NAME, (p, c) -> ParsedStringTerms.fromXContent(p, (String) c));
map.put(LongTerms.NAME, (p, c) -> ParsedLongTerms.fromXContent(p, (String) c));
map.put(DoubleTerms.NAME, (p, c) -> ParsedDoubleTerms.fromXContent(p, (String) c));

View File

@ -149,7 +149,8 @@ public abstract class InternalMultiBucketAggregationTestCase<T extends InternalA
protected void assertMultiBucketsAggregation(MultiBucketsAggregation expected, MultiBucketsAggregation actual, boolean checkOrder) {
Class<? extends ParsedMultiBucketAggregation> parsedClass = implementationClass();
assertNotNull("Parsed aggregation class must not be null", parsedClass);
assertTrue(parsedClass.isInstance(actual));
assertTrue("Unexpected parsed class, expected instance of: " + actual + ", but was: " + parsedClass,
parsedClass.isInstance(actual));
assertTrue(expected instanceof InternalAggregation);
assertEquals(expected.getName(), actual.getName());