Mirror of https://github.com/honeymoose/OpenSearch.git, synced 2025-03-24 17:09:48 +00:00
This builds an `auto_date_histogram` aggregator that natively aggregates from many buckets and uses it in the cases where `auto_date_histogram` used to rely on `asMultiBucketAggregator`, which should save a significant amount of memory. In particular, this happens when `auto_date_histogram` is a sub-aggregator of a multi-bucketing aggregator like `terms`, `histogram`, or `filters`. For the most part we preserve the original implementation when `auto_date_histogram` only collects from a single bucket.

It isn't possible to "just port the aggregator" without taking a pretty significant performance hit because we used to rewrite all of the buckets every time we switched to a coarser rounding configuration. Without some major surgery to how we delay sub-aggs we'd end up rewriting the delay list zillions of times if there are many buckets. So the multi-bucket version of the aggregator has a "budget" of "wasted" buckets and only rewrites all of the buckets when we exceed that budget.

Now that we don't rebucket every time we increase the rounding we can no longer get an accurate count of the number of buckets! So instead the aggregator uses an estimate of the number of buckets to trigger switching to a coarser rounding. This estimate is likely to be *terrible* when buckets are far apart compared to the rounding, so the aggregator also uses the difference between the first and last bucket to trigger the switch, which covers for the shortcomings of the bucket estimation technique pretty well. It also causes the aggregator to emit fewer buckets in cases where they'd be reduced together on the coordinating node anyway. This is wonderful! But probably fairly rare.

All of that buys us some speed improvements when the aggregator is a child of a multi-bucket aggregator:

- Without metrics or time zone: 25% faster
- With metrics: 15% faster
- With time zone: 22% faster

Relates to #56487
parent 69338b03d7
commit ab2c6d9696
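To make the two promotion triggers from the commit message concrete, here is a minimal, self-contained Java sketch of the idea, written for this page rather than taken from the commit. The class name, the hard-coded rounding table, and the bare `targetBuckets` threshold are illustrative assumptions; the real aggregator works against `RoundingInfo[]`, `LongKeyedBucketOrds`, and per-owning-bucket state.

import java.util.concurrent.TimeUnit;

public class RoundingPromotionSketch {
    // Rough span of each candidate rounding in millis, finest first (assumed values for illustration).
    private static final long[] ROUGH_MILLIS = {
        TimeUnit.SECONDS.toMillis(1),
        TimeUnit.MINUTES.toMillis(1),
        TimeUnit.HOURS.toMillis(1),
        TimeUnit.DAYS.toMillis(1),
        TimeUnit.DAYS.toMillis(30),
        TimeUnit.DAYS.toMillis(365)
    };

    /**
     * Pick a coarser rounding index using the two triggers the commit message
     * describes: an *estimated* bucket count that exceeds the target, or a
     * min-to-max key span wider than the current rounding can reasonably cover.
     * Instead of rebucketing, the surviving bucket count is only estimated.
     */
    static int promote(int roundingIdx, long estimatedBuckets, long minKey, long maxKey, int targetBuckets) {
        while (roundingIdx < ROUGH_MILLIS.length - 1
                && (estimatedBuckets > targetBuckets
                    || maxKey - minKey > targetBuckets * ROUGH_MILLIS[roundingIdx])) {
            // Estimate how many buckets survive the coarser rounding rather than rebucketing now.
            double survivingRatio = (double) ROUGH_MILLIS[roundingIdx] / ROUGH_MILLIS[roundingIdx + 1];
            estimatedBuckets = (long) Math.ceil(estimatedBuckets * survivingRatio);
            roundingIdx++;
        }
        return roundingIdx;
    }

    public static void main(String[] args) {
        // 10_000 estimated second-level buckets spread over ~30 days with a target of
        // 10 buckets: both triggers fire and we land on a much coarser rounding.
        long now = System.currentTimeMillis();
        int idx = promote(0, 10_000, now, now + TimeUnit.DAYS.toMillis(30), 10);
        System.out.println("promoted to rounding index " + idx);
    }
}

The real `FromMany` implementation in the diff below additionally multiplies the target by each rounding's maximum inner interval and tracks the min/max per owning bucket, but the shape of the check is the same.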
@@ -4,6 +4,7 @@ setup:
          index: test
          body:
            settings:
              number_of_shards: 1
              number_of_replicas: 0
            mappings:
              properties:
@@ -76,3 +77,25 @@ setup:
  - match: { aggregations.histo.buckets.1.doc_count: 2 }
  - match: { aggregations.histo.buckets.1.v.value: 7 }
  - match: { aggregations.histo_avg_v.value: 5 }

---
"profile at top level":
  - skip:
      version: " - 7.99.99"
      reason: Debug information added in 8.0.0 (to be backported to 7.9.0)

  - do:
      search:
        body:
          profile: true
          size: 0
          aggs:
            histo:
              auto_date_histogram:
                field: date
                buckets: 2

  - match: { hits.total.value: 4 }
  - length: { aggregations.histo.buckets: 2 }
  - match: { profile.shards.0.aggregations.0.type: AutoDateHistogramAggregator.FromSingle }
  - match: { profile.shards.0.aggregations.0.debug.surviving_buckets: 4 }

@ -30,6 +30,7 @@ import org.elasticsearch.search.aggregations.AggregatorFactory;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregations;
|
||||
import org.elasticsearch.search.aggregations.LeafBucketCollector;
|
||||
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregator;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
|
||||
import org.elasticsearch.search.aggregations.support.AggregationPath;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
@ -329,7 +330,7 @@ public abstract class BucketsAggregator extends AggregatorBase {
|
||||
* @param bucketOrds hash of values to the bucket ordinal
|
||||
*/
|
||||
protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(long[] owningBucketOrds, LongKeyedBucketOrds bucketOrds,
|
||||
BucketBuilderForVariable<B> bucketBuilder, Function<List<B>, InternalAggregation> resultBuilder) throws IOException {
|
||||
BucketBuilderForVariable<B> bucketBuilder, ResultBuilderForVariable<B> resultBuilder) throws IOException {
|
||||
long totalOrdsToCollect = 0;
|
||||
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
|
||||
totalOrdsToCollect += bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);
|
||||
@ -360,7 +361,7 @@ public abstract class BucketsAggregator extends AggregatorBase {
|
||||
}
|
||||
buckets.add(bucketBuilder.build(ordsEnum.value(), bucketDocCount(ordsEnum.ord()), subAggregationResults[b++]));
|
||||
}
|
||||
results[ordIdx] = resultBuilder.apply(buckets);
|
||||
results[ordIdx] = resultBuilder.build(owningBucketOrds[ordIdx], buckets);
|
||||
}
|
||||
return results;
|
||||
}
|
||||
@ -368,6 +369,10 @@ public abstract class BucketsAggregator extends AggregatorBase {
|
||||
protected interface BucketBuilderForVariable<B> {
|
||||
B build(long bucketValue, int docCount, InternalAggregations subAggregationResults);
|
||||
}
|
||||
@FunctionalInterface
|
||||
protected interface ResultBuilderForVariable<B> {
|
||||
InternalAggregation build(long owningBucketOrd, List<B> buckets);
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility method to build empty aggregations of the sub aggregators.
|
||||
@ -407,4 +412,15 @@ public abstract class BucketsAggregator extends AggregatorBase {
|
||||
"Either drop the key (a la \"" + name() + "\") or change it to \"doc_count\" (a la \"" + name() +
|
||||
".doc_count\") or \"key\".");
|
||||
}
|
||||
|
||||
public static boolean descendsFromGlobalAggregator(Aggregator parent) {
|
||||
while (parent != null) {
|
||||
if (parent.getClass() == GlobalAggregator.class) {
|
||||
return true;
|
||||
}
|
||||
parent = parent.parent();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -23,7 +23,6 @@ import org.elasticsearch.search.aggregations.Aggregator;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactories;
|
||||
import org.elasticsearch.search.aggregations.BucketCollector;
|
||||
import org.elasticsearch.search.aggregations.MultiBucketCollector;
|
||||
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregator;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -70,16 +69,6 @@ public abstract class DeferableBucketAggregator extends BucketsAggregator {
|
||||
collectableSubAggregators = MultiBucketCollector.wrap(collectors);
|
||||
}
|
||||
|
||||
public static boolean descendsFromGlobalAggregator(Aggregator parent) {
|
||||
while (parent != null) {
|
||||
if (parent.getClass() == GlobalAggregator.class) {
|
||||
return true;
|
||||
}
|
||||
parent = parent.parent();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public DeferringBucketCollector getDeferringCollector() {
|
||||
// Default impl is a collector that selects the best buckets
|
||||
// but an alternative defer policy may be based on best docs.
|
||||
|
@ -90,7 +90,7 @@ public abstract class AbstractHistogramAggregator extends BucketsAggregator {
|
||||
double roundKey = Double.longBitsToDouble(bucketValue);
|
||||
double key = roundKey * interval + offset;
|
||||
return new InternalHistogram.Bucket(key, docCount, keyed, formatter, subAggregationResults);
|
||||
}, buckets -> {
|
||||
}, (owningBucketOrd, buckets) -> {
|
||||
// the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
|
||||
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
|
||||
|
||||
|
@ -302,6 +302,10 @@ public class AutoDateHistogramAggregationBuilder
|
||||
return roughEstimateDurationMillis;
|
||||
}
|
||||
|
||||
public long getMaximumRoughEstimateDurationMillis() {
|
||||
return getRoughEstimateDurationMillis() * getMaximumInnerInterval();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(rounding, Arrays.hashCode(innerIntervals), dateTimeUnit);
|
||||
|
@ -23,8 +23,11 @@ import org.apache.lucene.index.SortedNumericDocValues;
|
||||
import org.apache.lucene.search.ScoreMode;
|
||||
import org.apache.lucene.util.CollectionUtil;
|
||||
import org.elasticsearch.common.Rounding;
|
||||
import org.elasticsearch.common.Rounding.Prepared;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.common.util.LongHash;
|
||||
import org.elasticsearch.common.util.ByteArray;
|
||||
import org.elasticsearch.common.util.IntArray;
|
||||
import org.elasticsearch.common.util.LongArray;
|
||||
import org.elasticsearch.search.DocValueFormat;
|
||||
import org.elasticsearch.search.aggregations.Aggregator;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactories;
|
||||
@ -36,6 +39,7 @@ import org.elasticsearch.search.aggregations.bucket.DeferableBucketAggregator;
|
||||
import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector;
|
||||
import org.elasticsearch.search.aggregations.bucket.MergingBucketsDeferringCollector;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder.RoundingInfo;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSource;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
@ -43,30 +47,76 @@ import org.elasticsearch.search.internal.SearchContext;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.LongToIntFunction;
|
||||
|
||||
/**
|
||||
* An aggregator for date values that attempts to return a specific number of
|
||||
* buckets, reconfiguring how it rounds dates to buckets on the fly as new
|
||||
* data arrives.
|
||||
* <p>
|
||||
* This class is abstract because there is a simple implementation for when the
|
||||
* aggregator only collects from a single bucket and a more complex
|
||||
* implementation when it doesn't. This ain't great from a test coverage
|
||||
* standpoint but the simpler implementation is between 7% and 15% faster
|
||||
* when you can use it. This is an important aggregation and we need that
|
||||
* performance.
|
||||
*/
|
||||
class AutoDateHistogramAggregator extends DeferableBucketAggregator {
|
||||
abstract class AutoDateHistogramAggregator extends DeferableBucketAggregator {
|
||||
static AutoDateHistogramAggregator build(
|
||||
String name,
|
||||
AggregatorFactories factories,
|
||||
int targetBuckets,
|
||||
RoundingInfo[] roundingInfos,
|
||||
Function<Rounding, Rounding.Prepared> roundingPreparer,
|
||||
ValuesSourceConfig valuesSourceConfig,
|
||||
SearchContext aggregationContext,
|
||||
Aggregator parent,
|
||||
boolean collectsFromSingleBucket,
|
||||
Map<String, Object> metadata
|
||||
) throws IOException {
|
||||
return collectsFromSingleBucket
|
||||
? new FromSingle(
|
||||
name,
|
||||
factories,
|
||||
targetBuckets,
|
||||
roundingInfos,
|
||||
roundingPreparer,
|
||||
valuesSourceConfig,
|
||||
aggregationContext,
|
||||
parent,
|
||||
metadata
|
||||
)
|
||||
: new FromMany(
|
||||
name,
|
||||
factories,
|
||||
targetBuckets,
|
||||
roundingInfos,
|
||||
roundingPreparer,
|
||||
valuesSourceConfig,
|
||||
aggregationContext,
|
||||
parent,
|
||||
metadata
|
||||
);
|
||||
}
|
||||
|
||||
private final ValuesSource.Numeric valuesSource;
|
||||
private final DocValueFormat formatter;
|
||||
private final RoundingInfo[] roundingInfos;
|
||||
private final Function<Rounding, Rounding.Prepared> roundingPreparer;
|
||||
private int roundingIdx = 0;
|
||||
private Rounding.Prepared preparedRounding;
|
||||
|
||||
private LongHash bucketOrds;
|
||||
private int targetBuckets;
|
||||
/**
|
||||
* A reference to the collector so we can
|
||||
* {@link MergingBucketsDeferringCollector#mergeBuckets(long[])}.
|
||||
*/
|
||||
private MergingBucketsDeferringCollector deferringCollector;
|
||||
|
||||
AutoDateHistogramAggregator(
|
||||
protected final RoundingInfo[] roundingInfos;
|
||||
protected final int targetBuckets;
|
||||
|
||||
private AutoDateHistogramAggregator(
|
||||
String name,
|
||||
AggregatorFactories factories,
|
||||
int numBuckets,
|
||||
int targetBuckets,
|
||||
RoundingInfo[] roundingInfos,
|
||||
Function<Rounding, Rounding.Prepared> roundingPreparer,
|
||||
ValuesSourceConfig valuesSourceConfig,
|
||||
@ -76,20 +126,16 @@ class AutoDateHistogramAggregator extends DeferableBucketAggregator {
|
||||
) throws IOException {
|
||||
|
||||
super(name, factories, aggregationContext, parent, metadata);
|
||||
this.targetBuckets = numBuckets;
|
||||
this.targetBuckets = targetBuckets;
|
||||
// TODO: Remove null usage here, by using a different aggregator for create
|
||||
this.valuesSource = valuesSourceConfig.hasValues() ? (ValuesSource.Numeric) valuesSourceConfig.getValuesSource() : null;
|
||||
this.formatter = valuesSourceConfig.format();
|
||||
this.roundingInfos = roundingInfos;
|
||||
this.roundingPreparer = roundingPreparer;
|
||||
preparedRounding = roundingPreparer.apply(roundingInfos[roundingIdx].rounding);
|
||||
|
||||
bucketOrds = new LongHash(1, aggregationContext.bigArrays());
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScoreMode scoreMode() {
|
||||
public final ScoreMode scoreMode() {
|
||||
if (valuesSource != null && valuesSource.needsScores()) {
|
||||
return ScoreMode.COMPLETE;
|
||||
}
|
||||
@ -97,29 +143,145 @@ class AutoDateHistogramAggregator extends DeferableBucketAggregator {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean shouldDefer(Aggregator aggregator) {
|
||||
protected final boolean shouldDefer(Aggregator aggregator) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeferringBucketCollector getDeferringCollector() {
|
||||
public final DeferringBucketCollector getDeferringCollector() {
|
||||
deferringCollector = new MergingBucketsDeferringCollector(context, descendsFromGlobalAggregator(parent()));
|
||||
return deferringCollector;
|
||||
}
|
||||
|
||||
protected abstract LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException;
|
||||
|
||||
@Override
|
||||
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
|
||||
final LeafBucketCollector sub) throws IOException {
|
||||
public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
|
||||
if (valuesSource == null) {
|
||||
return LeafBucketCollector.NO_OP_COLLECTOR;
|
||||
}
|
||||
final SortedNumericDocValues values = valuesSource.longValues(ctx);
|
||||
return new LeafBucketCollectorBase(sub, values) {
|
||||
@Override
|
||||
public void collect(int doc, long bucket) throws IOException {
|
||||
assert bucket == 0;
|
||||
if (values.advanceExact(doc)) {
|
||||
final int valuesCount = values.docValueCount();
|
||||
return getLeafCollector(valuesSource.longValues(ctx), sub);
|
||||
}
|
||||
|
||||
protected final InternalAggregation[] buildAggregations(
|
||||
LongKeyedBucketOrds bucketOrds,
|
||||
LongToIntFunction roundingIndexFor,
|
||||
long[] owningBucketOrds
|
||||
) throws IOException {
|
||||
return buildAggregationsForVariableBuckets(
|
||||
owningBucketOrds,
|
||||
bucketOrds,
|
||||
(bucketValue, docCount, subAggregationResults) -> new InternalAutoDateHistogram.Bucket(
|
||||
bucketValue,
|
||||
docCount,
|
||||
formatter,
|
||||
subAggregationResults
|
||||
),
|
||||
(owningBucketOrd, buckets) -> {
|
||||
// the contract of the histogram aggregation is that shards must return
|
||||
// buckets ordered by key in ascending order
|
||||
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
|
||||
|
||||
// value source will be null for unmapped fields
|
||||
InternalAutoDateHistogram.BucketInfo emptyBucketInfo = new InternalAutoDateHistogram.BucketInfo(
|
||||
roundingInfos,
|
||||
roundingIndexFor.applyAsInt(owningBucketOrd),
|
||||
buildEmptySubAggregations()
|
||||
);
|
||||
|
||||
return new InternalAutoDateHistogram(name, buckets, targetBuckets, emptyBucketInfo, formatter, metadata(), 1);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final InternalAggregation buildEmptyAggregation() {
|
||||
InternalAutoDateHistogram.BucketInfo emptyBucketInfo = new InternalAutoDateHistogram.BucketInfo(
|
||||
roundingInfos,
|
||||
0,
|
||||
buildEmptySubAggregations()
|
||||
);
|
||||
return new InternalAutoDateHistogram(name, Collections.emptyList(), targetBuckets, emptyBucketInfo, formatter, metadata(), 1);
|
||||
}
|
||||
|
||||
protected final Rounding.Prepared prepareRounding(int index) {
|
||||
return roundingPreparer.apply(roundingInfos[index].rounding);
|
||||
}
|
||||
|
||||
protected final void merge(long[] mergeMap, long newNumBuckets) {
|
||||
mergeBuckets(mergeMap, newNumBuckets);
|
||||
if (deferringCollector != null) {
|
||||
deferringCollector.mergeBuckets(mergeMap);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initially it uses the most fine grained rounding configuration possible
|
||||
* but as more data arrives it rebuckets the data until it "fits" in the
|
||||
* aggregation rounding. Similar to {@link FromMany} this checks both the
|
||||
* bucket count and range of the aggregation, but unlike
|
||||
* {@linkplain FromMany} it keeps an accurate count of the buckets and it
|
||||
* doesn't delay rebucketing.
|
||||
* <p>
|
||||
* Rebucketing is roughly {@code O(number_of_hits_collected_so_far)} but we
|
||||
* rebucket roughly {@code O(log number_of_hits_collected_so_far)} because
|
||||
* the "shape" of the roundings is <strong>roughly</strong>
|
||||
* logarithmically increasing.
|
||||
*/
|
||||
private static class FromSingle extends AutoDateHistogramAggregator {
|
||||
private int roundingIdx;
|
||||
private Rounding.Prepared preparedRounding;
|
||||
/**
|
||||
* Map from value to bucket ordinals.
|
||||
* <p>
|
||||
* It is important that this is the exact subtype of
|
||||
* {@link LongKeyedBucketOrds} so that the JVM can make a monomorphic
|
||||
* call to {@link LongKeyedBucketOrds#add(long, long)} in the tight
|
||||
* inner loop of {@link LeafBucketCollector#collect(int, long)}. You'd
|
||||
* think that it wouldn't matter, but it's seriously a 7%-15% performance
|
||||
* difference for the aggregation. Yikes.
|
||||
*/
|
||||
private LongKeyedBucketOrds.FromSingle bucketOrds;
|
||||
private long min = Long.MAX_VALUE;
|
||||
private long max = Long.MIN_VALUE;
|
||||
|
||||
FromSingle(
|
||||
String name,
|
||||
AggregatorFactories factories,
|
||||
int targetBuckets,
|
||||
RoundingInfo[] roundingInfos,
|
||||
Function<Rounding, Prepared> roundingPreparer,
|
||||
ValuesSourceConfig valuesSourceConfig,
|
||||
SearchContext aggregationContext,
|
||||
Aggregator parent,
|
||||
Map<String, Object> metadata
|
||||
) throws IOException {
|
||||
super(
|
||||
name,
|
||||
factories,
|
||||
targetBuckets,
|
||||
roundingInfos,
|
||||
roundingPreparer,
|
||||
valuesSourceConfig,
|
||||
aggregationContext,
|
||||
parent,
|
||||
metadata
|
||||
);
|
||||
|
||||
preparedRounding = prepareRounding(0);
|
||||
bucketOrds = new LongKeyedBucketOrds.FromSingle(context.bigArrays());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException {
|
||||
return new LeafBucketCollectorBase(sub, values) {
|
||||
@Override
|
||||
public void collect(int doc, long owningBucketOrd) throws IOException {
|
||||
assert owningBucketOrd == 0;
|
||||
if (false == values.advanceExact(doc)) {
|
||||
return;
|
||||
}
|
||||
int valuesCount = values.docValueCount();
|
||||
|
||||
long previousRounded = Long.MIN_VALUE;
|
||||
for (int i = 0; i < valuesCount; ++i) {
|
||||
@ -129,74 +291,355 @@ class AutoDateHistogramAggregator extends DeferableBucketAggregator {
|
||||
if (rounded == previousRounded) {
|
||||
continue;
|
||||
}
|
||||
long bucketOrd = bucketOrds.add(rounded);
|
||||
if (bucketOrd < 0) { // already seen
|
||||
bucketOrd = -1 - bucketOrd;
|
||||
collectExistingBucket(sub, doc, bucketOrd);
|
||||
} else {
|
||||
collectBucket(sub, doc, bucketOrd);
|
||||
while (roundingIdx < roundingInfos.length - 1
|
||||
&& bucketOrds.size() > (targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval())) {
|
||||
increaseRounding();
|
||||
}
|
||||
}
|
||||
collectValue(doc, rounded);
|
||||
previousRounded = rounded;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void increaseRounding() {
|
||||
try (LongHash oldBucketOrds = bucketOrds) {
|
||||
LongHash newBucketOrds = new LongHash(1, context.bigArrays());
|
||||
long[] mergeMap = new long[(int) oldBucketOrds.size()];
|
||||
preparedRounding = roundingPreparer.apply(roundingInfos[++roundingIdx].rounding);
|
||||
for (int i = 0; i < oldBucketOrds.size(); i++) {
|
||||
long oldKey = oldBucketOrds.get(i);
|
||||
long newKey = preparedRounding.round(oldKey);
|
||||
long newBucketOrd = newBucketOrds.add(newKey);
|
||||
if (newBucketOrd >= 0) {
|
||||
mergeMap[i] = newBucketOrd;
|
||||
} else {
|
||||
mergeMap[i] = -1 - newBucketOrd;
|
||||
}
|
||||
private void collectValue(int doc, long rounded) throws IOException {
|
||||
long bucketOrd = bucketOrds.add(0, rounded);
|
||||
if (bucketOrd < 0) { // already seen
|
||||
bucketOrd = -1 - bucketOrd;
|
||||
collectExistingBucket(sub, doc, bucketOrd);
|
||||
return;
|
||||
}
|
||||
mergeBuckets(mergeMap, newBucketOrds.size());
|
||||
if (deferringCollector != null) {
|
||||
deferringCollector.mergeBuckets(mergeMap);
|
||||
}
|
||||
bucketOrds = newBucketOrds;
|
||||
collectBucket(sub, doc, bucketOrd);
|
||||
increaseRoundingIfNeeded(rounded);
|
||||
}
|
||||
|
||||
private void increaseRoundingIfNeeded(long rounded) {
|
||||
if (roundingIdx >= roundingInfos.length - 1) {
|
||||
return;
|
||||
}
|
||||
min = Math.min(min, rounded);
|
||||
max = Math.max(max, rounded);
|
||||
if (bucketOrds.size() <= targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval()
|
||||
&& max - min <= targetBuckets * roundingInfos[roundingIdx].getMaximumRoughEstimateDurationMillis()) {
|
||||
return;
|
||||
}
|
||||
do {
|
||||
try (LongKeyedBucketOrds oldOrds = bucketOrds) {
|
||||
preparedRounding = prepareRounding(++roundingIdx);
|
||||
long[] mergeMap = new long[Math.toIntExact(oldOrds.size())];
|
||||
bucketOrds = new LongKeyedBucketOrds.FromSingle(context.bigArrays());
|
||||
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = oldOrds.ordsEnum(0);
|
||||
while (ordsEnum.next()) {
|
||||
long oldKey = ordsEnum.value();
|
||||
long newKey = preparedRounding.round(oldKey);
|
||||
long newBucketOrd = bucketOrds.add(0, newKey);
|
||||
mergeMap[(int) ordsEnum.ord()] = newBucketOrd >= 0 ? newBucketOrd : -1 - newBucketOrd;
|
||||
}
|
||||
merge(mergeMap, bucketOrds.size());
|
||||
}
|
||||
} while (roundingIdx < roundingInfos.length - 1
|
||||
&& (bucketOrds.size() > targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval()
|
||||
|| max - min > targetBuckets * roundingInfos[roundingIdx].getMaximumRoughEstimateDurationMillis()));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
|
||||
return buildAggregations(bucketOrds, l -> roundingIdx, owningBucketOrds);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void collectDebugInfo(BiConsumer<String, Object> add) {
|
||||
super.collectDebugInfo(add);
|
||||
add.accept("surviving_buckets", bucketOrds.size());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() {
|
||||
Releasables.close(bucketOrds);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initially it uses the most fine grained rounding configuration possible but
|
||||
* as more data arrives it uses two heuristics to shift to coarser and coarser
|
||||
* rounding. The first heuristic is the number of buckets, specifically,
|
||||
* when there are more buckets than can "fit" in the current rounding it shifts
|
||||
* to the next rounding. Instead of redoing the rounding, it estimates the
|
||||
* number of buckets that will "survive" at the new rounding and uses
|
||||
* <strong>that</strong> as the initial value for the bucket count that it
|
||||
* increments in order to trigger another promotion to another coarser
|
||||
* rounding. This works fairly well at containing the number of buckets, but
|
||||
* the estimate of the number of buckets will be wrong if the buckets are
|
||||
* quite spread out compared to the rounding.
|
||||
* <p>
|
||||
* The second heuristic it uses to trigger promotion to a coarser rounding is
|
||||
* the distance between the min and max bucket. When that distance is greater
|
||||
* than what the current rounding supports it promotes. This heuristic
|
||||
* isn't good at limiting the number of buckets but is great when the buckets
|
||||
* are spread out compared to the rounding. So it should complement the first
|
||||
* heuristic.
|
||||
* <p>
|
||||
* When promoting a rounding we keep the old buckets around because it is
|
||||
* expensive to call {@link MergingBucketsDeferringCollector#mergeBuckets}.
|
||||
* In particular it is {@code O(number_of_hits_collected_so_far)}. So if we
|
||||
* called it frequently we'd end up in {@code O(n^2)} territory. Bad news for
|
||||
* aggregations! Instead, we keep a "budget" of buckets that we're ok
|
||||
* "wasting". When we promote the rounding and our estimate of the number of
|
||||
* "dead" buckets that have data but have yet to be merged into the buckets
|
||||
* that are valid for the current rounding exceeds the budget then we rebucket
|
||||
* the entire aggregation and double the budget.
|
||||
* <p>
|
||||
* Once we're done collecting and we know exactly which buckets we'll be
|
||||
* returning we <strong>finally</strong> perform a "real", "perfect bucketing",
|
||||
* rounding all of the keys for {@code owningBucketOrd} that we're going to
|
||||
* collect and picking the rounding based on a real, accurate count and the
|
||||
* min and max.
|
||||
*/
|
||||
private static class FromMany extends AutoDateHistogramAggregator {
|
||||
/**
|
||||
* An array of prepared roundings in the same order as
|
||||
* {@link #roundingInfos}. The 0th entry is prepared initially,
|
||||
* and other entries are null until first needed.
|
||||
*/
|
||||
private final Rounding.Prepared[] preparedRoundings;
|
||||
/**
|
||||
* Map from value to bucket ordinals.
|
||||
* <p>
|
||||
* It is important that this is the exact subtype of
|
||||
* {@link LongKeyedBucketOrds} so that the JVM can make a monomorphic
|
||||
* call to {@link LongKeyedBucketOrds#add(long, long)} in the tight
|
||||
* inner loop of {@link LeafBucketCollector#collect(int, long)}.
|
||||
*/
|
||||
private LongKeyedBucketOrds.FromMany bucketOrds;
|
||||
/**
|
||||
* The index of the rounding that each {@code owningBucketOrd} is
|
||||
* currently using.
|
||||
* <p>
|
||||
* During collection we use overestimates for how many buckets are saved
|
||||
* by bumping to the next rounding index. So we end up bumping less
|
||||
* aggressively than a "perfect" algorithm. That is fine because we
|
||||
* correct the error when we merge the buckets together all the way
|
||||
* up in {@link InternalAutoDateHistogram#reduceBucket}. In particular,
|
||||
* on final reduce we bump the rounding until we appropriately
|
||||
* cover the date range across all of the results returned by all of
|
||||
* the {@link AutoDateHistogramAggregator}s.
|
||||
*/
|
||||
private ByteArray roundingIndices;
|
||||
/**
|
||||
* The minimum key per {@code owningBucketOrd}.
|
||||
*/
|
||||
private LongArray mins;
|
||||
/**
|
||||
* The max key per {@code owningBucketOrd}.
|
||||
*/
|
||||
private LongArray maxes;
|
||||
|
||||
/**
|
||||
* An underestimate of the number of buckets that are "live" in the
|
||||
* current rounding for each {@code owningBucketOrdinal}.
|
||||
*/
|
||||
private IntArray liveBucketCountUnderestimate;
|
||||
/**
|
||||
* An overestimate of the number of wasted buckets. When this gets
|
||||
* too high we {@link #rebucket} which sets it to 0.
|
||||
*/
|
||||
private long wastedBucketsOverestimate = 0;
|
||||
/**
|
||||
* The next {@link #wastedBucketsOverestimate} that will trigger a
|
||||
* {@link #rebucket() rebucketing}.
|
||||
*/
|
||||
private long nextRebucketAt = 1000; // TODO this could almost certainly start higher when asMultiBucketAggregator is gone
|
||||
/**
|
||||
* The number of times the aggregator had to {@link #rebucket()} the
|
||||
* results. We keep this just to report to the profiler.
|
||||
*/
|
||||
private int rebucketCount = 0;
|
||||
|
||||
FromMany(
|
||||
String name,
|
||||
AggregatorFactories factories,
|
||||
int targetBuckets,
|
||||
RoundingInfo[] roundingInfos,
|
||||
Function<Rounding, Rounding.Prepared> roundingPreparer,
|
||||
ValuesSourceConfig valuesSourceConfig,
|
||||
SearchContext aggregationContext,
|
||||
Aggregator parent,
|
||||
Map<String, Object> metadata
|
||||
) throws IOException {
|
||||
|
||||
super(
|
||||
name,
|
||||
factories,
|
||||
targetBuckets,
|
||||
roundingInfos,
|
||||
roundingPreparer,
|
||||
valuesSourceConfig,
|
||||
aggregationContext,
|
||||
parent,
|
||||
metadata
|
||||
);
|
||||
assert roundingInfos.length < 127 : "Rounding must fit in a signed byte";
|
||||
roundingIndices = context.bigArrays().newByteArray(1, true);
|
||||
mins = context.bigArrays().newLongArray(1, false);
|
||||
mins.set(0, Long.MAX_VALUE);
|
||||
maxes = context.bigArrays().newLongArray(1, false);
|
||||
maxes.set(0, Long.MIN_VALUE);
|
||||
preparedRoundings = new Rounding.Prepared[roundingInfos.length];
|
||||
// Prepare the first rounding because we know we'll need it.
|
||||
preparedRoundings[0] = roundingPreparer.apply(roundingInfos[0].rounding);
|
||||
bucketOrds = new LongKeyedBucketOrds.FromMany(context.bigArrays());
|
||||
liveBucketCountUnderestimate = context.bigArrays().newIntArray(1, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException {
|
||||
return new LeafBucketCollectorBase(sub, values) {
|
||||
@Override
|
||||
public void collect(int doc, long owningBucketOrd) throws IOException {
|
||||
if (false == values.advanceExact(doc)) {
|
||||
return;
|
||||
}
|
||||
int valuesCount = values.docValueCount();
|
||||
|
||||
long previousRounded = Long.MIN_VALUE;
|
||||
int roundingIdx = roundingIndexFor(owningBucketOrd);
|
||||
for (int i = 0; i < valuesCount; ++i) {
|
||||
long value = values.nextValue();
|
||||
long rounded = preparedRoundings[roundingIdx].round(value);
|
||||
assert rounded >= previousRounded;
|
||||
if (rounded == previousRounded) {
|
||||
continue;
|
||||
}
|
||||
roundingIdx = collectValue(owningBucketOrd, roundingIdx, doc, rounded);
|
||||
previousRounded = rounded;
|
||||
}
|
||||
}
|
||||
|
||||
private int collectValue(long owningBucketOrd, int roundingIdx, int doc, long rounded) throws IOException {
|
||||
long bucketOrd = bucketOrds.add(owningBucketOrd, rounded);
|
||||
if (bucketOrd < 0) { // already seen
|
||||
bucketOrd = -1 - bucketOrd;
|
||||
collectExistingBucket(sub, doc, bucketOrd);
|
||||
return roundingIdx;
|
||||
}
|
||||
collectBucket(sub, doc, bucketOrd);
|
||||
liveBucketCountUnderestimate = context.bigArrays().grow(liveBucketCountUnderestimate, owningBucketOrd + 1);
|
||||
int estimatedBucketCount = liveBucketCountUnderestimate.increment(owningBucketOrd, 1);
|
||||
return increaseRoundingIfNeeded(owningBucketOrd, estimatedBucketCount, rounded, roundingIdx);
|
||||
}
|
||||
|
||||
/**
|
||||
* Increase the rounding of {@code owningBucketOrd} using
|
||||
* estimated bucket counts, {@link #rebucket() rebucketing} all the
|
||||
* buckets if the estimated number of wasted buckets is too high.
|
||||
*/
|
||||
private int increaseRoundingIfNeeded(long owningBucketOrd, int oldEstimatedBucketCount, long newKey, int oldRounding) {
|
||||
if (oldRounding >= roundingInfos.length - 1) {
|
||||
return oldRounding;
|
||||
}
|
||||
if (mins.size() < owningBucketOrd + 1) {
|
||||
long oldSize = mins.size();
|
||||
mins = context.bigArrays().grow(mins, owningBucketOrd + 1);
|
||||
mins.fill(oldSize, mins.size(), Long.MAX_VALUE);
|
||||
}
|
||||
if (maxes.size() < owningBucketOrd + 1) {
|
||||
long oldSize = maxes.size();
|
||||
maxes = context.bigArrays().grow(maxes, owningBucketOrd + 1);
|
||||
maxes.fill(oldSize, maxes.size(), Long.MIN_VALUE);
|
||||
}
|
||||
|
||||
long min = Math.min(mins.get(owningBucketOrd), newKey);
|
||||
mins.set(owningBucketOrd, min);
|
||||
long max = Math.max(maxes.get(owningBucketOrd), newKey);
|
||||
maxes.set(owningBucketOrd, max);
|
||||
if (oldEstimatedBucketCount <= targetBuckets * roundingInfos[oldRounding].getMaximumInnerInterval()
|
||||
&& max - min <= targetBuckets * roundingInfos[oldRounding].getMaximumRoughEstimateDurationMillis()) {
|
||||
return oldRounding;
|
||||
}
|
||||
long oldRoughDuration = roundingInfos[oldRounding].roughEstimateDurationMillis;
|
||||
int newRounding = oldRounding;
|
||||
int newEstimatedBucketCount;
|
||||
do {
|
||||
newRounding++;
|
||||
double ratio = (double) oldRoughDuration / (double) roundingInfos[newRounding].getRoughEstimateDurationMillis();
|
||||
newEstimatedBucketCount = (int) Math.ceil(oldEstimatedBucketCount * ratio);
|
||||
} while (newRounding < roundingInfos.length - 1
|
||||
&& (newEstimatedBucketCount > targetBuckets * roundingInfos[newRounding].getMaximumInnerInterval()
|
||||
|| max - min > targetBuckets * roundingInfos[newRounding].getMaximumRoughEstimateDurationMillis()));
|
||||
setRounding(owningBucketOrd, newRounding);
|
||||
mins.set(owningBucketOrd, preparedRoundings[newRounding].round(mins.get(owningBucketOrd)));
|
||||
maxes.set(owningBucketOrd, preparedRoundings[newRounding].round(maxes.get(owningBucketOrd)));
|
||||
wastedBucketsOverestimate += oldEstimatedBucketCount - newEstimatedBucketCount;
|
||||
if (wastedBucketsOverestimate > nextRebucketAt) {
|
||||
rebucket();
|
||||
// Bump the threshold for the next rebucketing
|
||||
wastedBucketsOverestimate = 0;
|
||||
nextRebucketAt *= 2;
|
||||
} else {
|
||||
liveBucketCountUnderestimate.set(owningBucketOrd, newEstimatedBucketCount);
|
||||
}
|
||||
return newRounding;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private void rebucket() {
|
||||
rebucketCount++;
|
||||
try (LongKeyedBucketOrds oldOrds = bucketOrds) {
|
||||
long[] mergeMap = new long[Math.toIntExact(oldOrds.size())];
|
||||
bucketOrds = new LongKeyedBucketOrds.FromMany(context.bigArrays());
|
||||
for (long owningBucketOrd = 0; owningBucketOrd <= oldOrds.maxOwningBucketOrd(); owningBucketOrd++) {
|
||||
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = oldOrds.ordsEnum(owningBucketOrd);
|
||||
Rounding.Prepared preparedRounding = preparedRoundings[roundingIndexFor(owningBucketOrd)];
|
||||
while (ordsEnum.next()) {
|
||||
long oldKey = ordsEnum.value();
|
||||
long newKey = preparedRounding.round(oldKey);
|
||||
long newBucketOrd = bucketOrds.add(owningBucketOrd, newKey);
|
||||
mergeMap[(int) ordsEnum.ord()] = newBucketOrd >= 0 ? newBucketOrd : -1 - newBucketOrd;
|
||||
}
|
||||
liveBucketCountUnderestimate = context.bigArrays().grow(liveBucketCountUnderestimate, owningBucketOrd + 1);
|
||||
liveBucketCountUnderestimate.set(owningBucketOrd, Math.toIntExact(bucketOrds.bucketsInOrd(owningBucketOrd)));
|
||||
}
|
||||
merge(mergeMap, bucketOrds.size());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
|
||||
/*
|
||||
* Rebucket before building the aggregation to build as small a result
|
||||
* as possible.
|
||||
*
|
||||
* TODO it'd be faster if we could apply the merging on the fly as we
|
||||
* replay the hits and build the buckets. How much faster is not clear,
|
||||
* but it does have the advantage of only touching the buckets that we
|
||||
* want to collect.
|
||||
*/
|
||||
rebucket();
|
||||
return buildAggregations(bucketOrds, this::roundingIndexFor, owningBucketOrds);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void collectDebugInfo(BiConsumer<String, Object> add) {
|
||||
super.collectDebugInfo(add);
|
||||
add.accept("surviving_buckets", bucketOrds.size());
|
||||
add.accept("wasted_buckets_overestimate", wastedBucketsOverestimate);
|
||||
add.accept("next_rebucket_at", nextRebucketAt);
|
||||
add.accept("rebucket_count", rebucketCount);
|
||||
}
|
||||
|
||||
private void setRounding(long owningBucketOrd, int newRounding) {
|
||||
roundingIndices = context.bigArrays().grow(roundingIndices, owningBucketOrd + 1);
|
||||
roundingIndices.set(owningBucketOrd, (byte) newRounding);
|
||||
if (preparedRoundings[newRounding] == null) {
|
||||
preparedRoundings[newRounding] = prepareRounding(newRounding);
|
||||
}
|
||||
}
|
||||
|
||||
private int roundingIndexFor(long owningBucketOrd) {
|
||||
return owningBucketOrd < roundingIndices.size() ? roundingIndices.get(owningBucketOrd) : 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doClose() {
|
||||
Releasables.close(bucketOrds, roundingIndices, mins, maxes, liveBucketCountUnderestimate);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
|
||||
return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds,
|
||||
(bucketValue, docCount, subAggregationResults) ->
|
||||
new InternalAutoDateHistogram.Bucket(bucketValue, docCount, formatter, subAggregationResults),
|
||||
buckets -> {
|
||||
// the contract of the histogram aggregation is that shards must return
|
||||
// buckets ordered by key in ascending order
|
||||
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
|
||||
|
||||
// value source will be null for unmapped fields
|
||||
InternalAutoDateHistogram.BucketInfo emptyBucketInfo = new InternalAutoDateHistogram.BucketInfo(roundingInfos,
|
||||
roundingIdx, buildEmptySubAggregations());
|
||||
|
||||
return new InternalAutoDateHistogram(name, buckets, targetBuckets, emptyBucketInfo, formatter, metadata(), 1);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public InternalAggregation buildEmptyAggregation() {
|
||||
InternalAutoDateHistogram.BucketInfo emptyBucketInfo = new InternalAutoDateHistogram.BucketInfo(roundingInfos, roundingIdx,
|
||||
buildEmptySubAggregations());
|
||||
return new InternalAutoDateHistogram(name, Collections.emptyList(), targetBuckets, emptyBucketInfo, formatter, metadata(), 1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doClose() {
|
||||
Releasables.close(bucketOrds);
|
||||
}
|
||||
}
|
||||
|
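One editorial aside before the factory changes: the "budget of wasted buckets" policy described in the `FromMany` javadoc above boils down to a small amount of bookkeeping. The sketch below is written for this page and is not code from the commit; the field and method names are simplified stand-ins for the aggregator's BigArrays-backed state, and the starting budget of 1000 simply mirrors the `nextRebucketAt` default in the diff.

public class WastedBucketBudgetSketch {
    private long wastedOverestimate = 0;
    private long nextRebucketAt = 1000; // starting budget, doubled after every full rebucket
    private int rebucketCount = 0;

    // Called whenever promoting to a coarser rounding strands some already-collected buckets.
    void onPromotion(long oldEstimatedBuckets, long newEstimatedBuckets) {
        wastedOverestimate += oldEstimatedBuckets - newEstimatedBuckets;
        if (wastedOverestimate > nextRebucketAt) {
            rebucket();             // the expensive O(hits collected so far) pass
            wastedOverestimate = 0; // everything is merged, so nothing is wasted any more
            nextRebucketAt *= 2;    // tolerate more waste before the next pass
        }
    }

    private void rebucket() {
        rebucketCount++;
        // The real aggregator re-rounds every key with the current per-owning-bucket
        // rounding and merges bucket ordinals here; that part is omitted in this sketch.
    }

    public static void main(String[] args) {
        WastedBucketBudgetSketch sketch = new WastedBucketBudgetSketch();
        // Each promotion strands roughly 600 buckets: the first rebucket happens once the
        // budget of 1000 is exceeded, the second only after the larger budget of 2000 is.
        for (int i = 0; i < 10; i++) {
            sketch.onPromotion(1000, 400);
        }
        System.out.println("rebucketed " + sketch.rebucketCount + " times");
    }
}

Because the budget doubles after every pass, full rebucketing passes become progressively rarer as waste accumulates, which keeps the aggregator out of the O(n^2) territory the javadoc warns about while still allowing one final "perfect" rebucket in `buildAggregations`.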
@ -43,7 +43,7 @@ public final class AutoDateHistogramAggregatorFactory extends ValuesSourceAggreg
|
||||
public static void registerAggregators(ValuesSourceRegistry.Builder builder) {
|
||||
builder.register(AutoDateHistogramAggregationBuilder.NAME,
|
||||
Arrays.asList(CoreValuesSourceType.DATE, CoreValuesSourceType.NUMERIC, CoreValuesSourceType.BOOLEAN),
|
||||
(AutoDateHistogramAggregatorSupplier) AutoDateHistogramAggregator::new);
|
||||
(AutoDateHistogramAggregatorSupplier) AutoDateHistogramAggregator::build);
|
||||
}
|
||||
|
||||
private final int numBuckets;
|
||||
@ -67,9 +67,6 @@ public final class AutoDateHistogramAggregatorFactory extends ValuesSourceAggreg
|
||||
Aggregator parent,
|
||||
boolean collectsFromSingleBucket,
|
||||
Map<String, Object> metadata) throws IOException {
|
||||
if (collectsFromSingleBucket == false) {
|
||||
return asMultiBucketAggregator(this, searchContext, parent);
|
||||
}
|
||||
AggregatorSupplier aggregatorSupplier = queryShardContext.getValuesSourceRegistry().getAggregator(config,
|
||||
AutoDateHistogramAggregationBuilder.NAME);
|
||||
if (aggregatorSupplier instanceof AutoDateHistogramAggregatorSupplier == false) {
|
||||
@ -79,14 +76,24 @@ public final class AutoDateHistogramAggregatorFactory extends ValuesSourceAggreg
|
||||
Function<Rounding, Rounding.Prepared> roundingPreparer =
|
||||
config.getValuesSource().roundingPreparer(searchContext.getQueryShardContext().getIndexReader());
|
||||
return ((AutoDateHistogramAggregatorSupplier) aggregatorSupplier).build(name, factories, numBuckets, roundingInfos,
|
||||
roundingPreparer, config, searchContext, parent, metadata);
|
||||
roundingPreparer, config, searchContext, parent, collectsFromSingleBucket, metadata);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Aggregator createUnmapped(SearchContext searchContext,
|
||||
Aggregator parent,
|
||||
Map<String, Object> metadata) throws IOException {
|
||||
return new AutoDateHistogramAggregator(name, factories, numBuckets, roundingInfos, Rounding::prepareForUnknown,
|
||||
config, searchContext, parent, metadata);
|
||||
return AutoDateHistogramAggregator.build(
|
||||
name,
|
||||
factories,
|
||||
numBuckets,
|
||||
roundingInfos,
|
||||
Rounding::prepareForUnknown,
|
||||
config,
|
||||
searchContext,
|
||||
parent,
|
||||
false,
|
||||
metadata
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -43,6 +43,7 @@ public interface AutoDateHistogramAggregatorSupplier extends AggregatorSupplier
|
||||
ValuesSourceConfig valuesSourceConfig,
|
||||
SearchContext aggregationContext,
|
||||
Aggregator parent,
|
||||
boolean collectsFromSingleBucket,
|
||||
Map<String, Object> metadata
|
||||
) throws IOException;
|
||||
}
|
||||
|
@ -144,7 +144,7 @@ class DateHistogramAggregator extends BucketsAggregator {
|
||||
return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds,
|
||||
(bucketValue, docCount, subAggregationResults) -> {
|
||||
return new InternalDateHistogram.Bucket(bucketValue, docCount, keyed, formatter, subAggregationResults);
|
||||
}, buckets -> {
|
||||
}, (owningBucketOrd, buckets) -> {
|
||||
// the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
|
||||
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
|
||||
|
||||
|
@ -171,7 +171,7 @@ class DateRangeHistogramAggregator extends BucketsAggregator {
|
||||
return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds,
|
||||
(bucketValue, docCount, subAggregationResults) ->
|
||||
new InternalDateHistogram.Bucket(bucketValue, docCount, keyed, formatter, subAggregationResults),
|
||||
buckets -> {
|
||||
(owningBucketOrd, buckets) -> {
|
||||
// the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
|
||||
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
|
||||
|
||||
|
@ -46,22 +46,30 @@ public abstract class LongKeyedBucketOrds implements Releasable {
|
||||
*/
|
||||
public abstract long add(long owningBucketOrd, long value);
|
||||
|
||||
/**
|
||||
* Count the buckets in {@code owningBucketOrd}.
|
||||
* <p>
|
||||
* Some aggregations expect this to be fast but most wouldn't
|
||||
* mind particularly if it weren't.
|
||||
*/
|
||||
public abstract long bucketsInOrd(long owningBucketOrd);
|
||||
|
||||
/**
|
||||
* Find the {@code owningBucketOrd, value} pair. Return the ord for
|
||||
* their bucket if they have been added or {@code -1} if they haven't.
|
||||
*/
|
||||
public abstract long find(long owningBucketOrd, long value);
|
||||
|
||||
/**
|
||||
* Count the buckets in {@code owningBucketOrd}.
|
||||
*/
|
||||
public abstract long bucketsInOrd(long owningBucketOrd);
|
||||
public abstract long find(long owningBucketOrd, long value);
|
||||
|
||||
/**
|
||||
* The number of collected buckets.
|
||||
*/
|
||||
public abstract long size();
|
||||
|
||||
/**
|
||||
* The maximum possible used {@code owningBucketOrd}.
|
||||
*/
|
||||
public abstract long maxOwningBucketOrd();
|
||||
|
||||
/**
|
||||
* Build an iterator for buckets inside {@code owningBucketOrd} in order
|
||||
* of increasing ord.
|
||||
@ -105,10 +113,10 @@ public abstract class LongKeyedBucketOrds implements Releasable {
|
||||
/**
|
||||
* Implementation that only works if it is collecting from a single bucket.
|
||||
*/
|
||||
private static class FromSingle extends LongKeyedBucketOrds {
|
||||
public static class FromSingle extends LongKeyedBucketOrds {
|
||||
private final LongHash ords;
|
||||
|
||||
FromSingle(BigArrays bigArrays) {
|
||||
public FromSingle(BigArrays bigArrays) {
|
||||
ords = new LongHash(1, bigArrays);
|
||||
}
|
||||
|
||||
@ -135,6 +143,11 @@ public abstract class LongKeyedBucketOrds implements Releasable {
|
||||
return ords.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long maxOwningBucketOrd() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BucketOrdsEnum ordsEnum(long owningBucketOrd) {
|
||||
assert owningBucketOrd == 0;
|
||||
@ -173,7 +186,7 @@ public abstract class LongKeyedBucketOrds implements Releasable {
|
||||
/**
|
||||
* Implementation that works properly when collecting from many buckets.
|
||||
*/
|
||||
private static class FromMany extends LongKeyedBucketOrds {
|
||||
public static class FromMany extends LongKeyedBucketOrds {
|
||||
// TODO we can almost certainly do better here by building something fit for purpose rather than trying to lego together stuff
|
||||
private static class Buckets implements Releasable {
|
||||
private final LongHash valueToThisBucketOrd;
|
||||
@ -193,7 +206,7 @@ public abstract class LongKeyedBucketOrds implements Releasable {
|
||||
private ObjectArray<Buckets> owningOrdToBuckets;
|
||||
private long lastGlobalOrd = -1;
|
||||
|
||||
FromMany(BigArrays bigArrays) {
|
||||
public FromMany(BigArrays bigArrays) {
|
||||
this.bigArrays = bigArrays;
|
||||
owningOrdToBuckets = bigArrays.newObjectArray(1);
|
||||
}
|
||||
@ -261,6 +274,11 @@ public abstract class LongKeyedBucketOrds implements Releasable {
|
||||
return lastGlobalOrd + 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long maxOwningBucketOrd() {
|
||||
return owningOrdToBuckets.size() - 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BucketOrdsEnum ordsEnum(long owningBucketOrd) {
|
||||
if (owningBucketOrd >= owningOrdToBuckets.size()) {
|
||||
|
@ -41,6 +41,10 @@ public class InternalAggregationProfileTree extends AbstractInternalProfileTree<
|
||||
if (element instanceof MultiBucketAggregatorWrapper) {
|
||||
return ((MultiBucketAggregatorWrapper) element).getWrappedClass().getSimpleName();
|
||||
}
|
||||
Class<?> enclosing = element.getClass().getEnclosingClass();
|
||||
if (enclosing != null) {
|
||||
return enclosing.getSimpleName() + "." + element.getClass().getSimpleName();
|
||||
}
|
||||
return element.getClass().getSimpleName();
|
||||
}
|
||||
|
||||
|
@ -22,34 +22,41 @@ package org.elasticsearch.search.aggregations.bucket.histogram;
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.document.LongPoint;
|
||||
import org.apache.lucene.document.SortedNumericDocValuesField;
|
||||
import org.apache.lucene.document.SortedSetDocValuesField;
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.IndexableField;
|
||||
import org.apache.lucene.index.RandomIndexWriter;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.MatchAllDocsQuery;
|
||||
import org.apache.lucene.search.MatchNoDocsQuery;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.common.CheckedBiConsumer;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.time.DateFormatter;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.mapper.DateFieldMapper;
|
||||
import org.elasticsearch.index.mapper.MappedFieldType;
|
||||
import org.elasticsearch.index.mapper.NumberFieldMapper;
|
||||
import org.elasticsearch.search.aggregations.Aggregation;
|
||||
import org.elasticsearch.search.aggregations.AggregationBuilder;
|
||||
import org.elasticsearch.search.aggregations.AggregationBuilders;
|
||||
import org.elasticsearch.search.aggregations.AggregatorTestCase;
|
||||
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
|
||||
import org.elasticsearch.search.aggregations.metrics.InternalMax;
|
||||
import org.elasticsearch.search.aggregations.metrics.InternalStats;
|
||||
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
|
||||
import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder;
|
||||
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
|
||||
import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
|
||||
import org.hamcrest.Matchers;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDate;
|
||||
import java.time.YearMonth;
|
||||
import java.time.ZoneOffset;
|
||||
@ -58,14 +65,20 @@ import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.hamcrest.Matchers.either;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasEntry;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
|
||||
public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
|
||||
public class AutoDateHistogramAggregatorTests extends DateHistogramAggregatorTestCase {
|
||||
private static final String DATE_FIELD = "date";
|
||||
private static final String INSTANT_FIELD = "instant";
|
||||
private static final String NUMERIC_FIELD = "numeric";
|
||||
@ -95,19 +108,22 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
|
||||
}
|
||||
|
||||
public void testMatchAllDocs() throws IOException {
|
||||
Map<String, Integer> expectedDocCount = new TreeMap<>();
|
||||
expectedDocCount.put("2010-01-01T00:00:00.000Z", 2);
|
||||
expectedDocCount.put("2012-01-01T00:00:00.000Z", 1);
|
||||
expectedDocCount.put("2013-01-01T00:00:00.000Z", 2);
|
||||
expectedDocCount.put("2015-01-01T00:00:00.000Z", 3);
|
||||
expectedDocCount.put("2016-01-01T00:00:00.000Z", 1);
|
||||
expectedDocCount.put("2017-01-01T00:00:00.000Z", 1);
|
||||
testSearchCase(DEFAULT_QUERY, DATES_WITH_TIME,
|
||||
aggregation -> aggregation.setNumBuckets(6).field(DATE_FIELD),
|
||||
histogram -> {
|
||||
assertEquals(10, histogram.getBuckets().size());
|
||||
assertTrue(AggregationInspectionHelper.hasValue(histogram));
|
||||
}
|
||||
aggregation -> aggregation.setNumBuckets(8).field(DATE_FIELD),
|
||||
result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount))
|
||||
);
|
||||
expectedDocCount.put("2011-01-01T00:00:00.000Z", 0);
|
||||
expectedDocCount.put("2014-01-01T00:00:00.000Z", 0);
|
||||
testSearchAndReduceCase(DEFAULT_QUERY, DATES_WITH_TIME,
|
||||
aggregation -> aggregation.setNumBuckets(8).field(DATE_FIELD),
|
||||
histogram -> {
|
||||
assertEquals(8, histogram.getBuckets().size());
|
||||
assertTrue(AggregationInspectionHelper.hasValue(histogram));
|
||||
}
|
||||
result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount))
|
||||
);
|
||||
}
|
||||
|
||||
@ -194,6 +210,177 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
|
||||
});
|
||||
}
|
||||
|
||||
public void testAsSubAgg() throws IOException {
|
||||
AggregationBuilder builder = new TermsAggregationBuilder("k1").field("k1").subAggregation(
|
||||
new AutoDateHistogramAggregationBuilder("dh").field(AGGREGABLE_DATE).setNumBuckets(3).subAggregation(
|
||||
new MaxAggregationBuilder("max").field("n")));
|
||||
asSubAggTestCase(builder, (StringTerms terms) -> {
|
||||
StringTerms.Bucket a = terms.getBucketByKey("a");
|
||||
InternalAutoDateHistogram adh = a.getAggregations().get("dh");
|
||||
Map<String, Integer> expectedDocCount = new TreeMap<>();
|
||||
expectedDocCount.put("2020-01-01T00:00:00.000Z", 2);
|
||||
expectedDocCount.put("2021-01-01T00:00:00.000Z", 2);
|
||||
assertThat(bucketCountsAsMap(adh), equalTo(expectedDocCount));
|
||||
Map<String, Double> expectedMax = new TreeMap<>();
|
||||
expectedMax.put("2020-01-01T00:00:00.000Z", 2.0);
|
||||
expectedMax.put("2021-01-01T00:00:00.000Z", 4.0);
|
||||
assertThat(maxAsMap(adh), equalTo(expectedMax));
|
||||
|
||||
StringTerms.Bucket b = terms.getBucketByKey("b");
|
||||
InternalAutoDateHistogram bdh = b.getAggregations().get("dh");
|
||||
expectedDocCount.clear();
|
||||
expectedDocCount.put("2020-02-01T00:00:00.000Z", 1);
|
||||
assertThat(bucketCountsAsMap(bdh), equalTo(expectedDocCount));
|
||||
expectedMax.clear();
|
||||
expectedMax.put("2020-02-01T00:00:00.000Z", 5.0);
|
||||
assertThat(maxAsMap(bdh), equalTo(expectedMax));
|
||||
});
|
||||
builder = new TermsAggregationBuilder("k2").field("k2").subAggregation(builder);
|
||||
asSubAggTestCase(builder, (StringTerms terms) -> {
|
||||
StringTerms.Bucket a = terms.getBucketByKey("a");
|
||||
StringTerms ak1 = a.getAggregations().get("k1");
|
||||
StringTerms.Bucket ak1a = ak1.getBucketByKey("a");
|
||||
InternalAutoDateHistogram ak1adh = ak1a.getAggregations().get("dh");
|
||||
Map<String, Integer> expectedDocCount = new TreeMap<>();
|
||||
expectedDocCount.put("2020-01-01T00:00:00.000Z", 2);
|
||||
expectedDocCount.put("2021-01-01T00:00:00.000Z", 1);
|
||||
assertThat(bucketCountsAsMap(ak1adh), equalTo(expectedDocCount));
|
||||
Map<String, Double> expectedMax = new TreeMap<>();
|
||||
expectedMax.put("2020-01-01T00:00:00.000Z", 2.0);
|
||||
expectedMax.put("2021-01-01T00:00:00.000Z", 3.0);
|
||||
assertThat(maxAsMap(ak1adh), equalTo(expectedMax));
|
||||
|
||||
StringTerms.Bucket b = terms.getBucketByKey("b");
|
||||
StringTerms bk1 = b.getAggregations().get("k1");
|
||||
StringTerms.Bucket bk1a = bk1.getBucketByKey("a");
|
||||
InternalAutoDateHistogram bk1adh = bk1a.getAggregations().get("dh");
|
||||
expectedDocCount.clear();
|
||||
expectedDocCount.put("2021-03-01T00:00:00.000Z", 1);
|
||||
assertThat(bucketCountsAsMap(bk1adh), equalTo(expectedDocCount));
|
||||
expectedMax.clear();
|
||||
expectedMax.put("2021-03-01T00:00:00.000Z", 4.0);
|
||||
assertThat(maxAsMap(bk1adh), equalTo(expectedMax));
|
||||
StringTerms.Bucket bk1b = bk1.getBucketByKey("b");
|
||||
InternalAutoDateHistogram bk1bdh = bk1b.getAggregations().get("dh");
|
||||
expectedDocCount.clear();
|
||||
expectedDocCount.put("2020-02-01T00:00:00.000Z", 1);
|
||||
assertThat(bucketCountsAsMap(bk1bdh), equalTo(expectedDocCount));
|
||||
expectedMax.clear();
|
||||
expectedMax.put("2020-02-01T00:00:00.000Z", 5.0);
|
||||
assertThat(maxAsMap(bk1bdh), equalTo(expectedMax));
|
||||
});
|
||||
}
|
||||
|
||||
public void testAsSubAggWithIncreasedRounding() throws IOException {
CheckedBiConsumer<RandomIndexWriter, DateFieldMapper.DateFieldType, IOException> buildIndex = (iw, dft) -> {
long start = dft.parse("2020-01-01T00:00:00Z");
long end = dft.parse("2021-01-01T00:00:00Z");
long useC = dft.parse("2020-07-01T00:00Z");
long anHour = dft.resolution().convert(Instant.ofEpochSecond(TimeUnit.HOURS.toSeconds(1)));
List<List<IndexableField>> docs = new ArrayList<>();
BytesRef aBytes = new BytesRef("a");
BytesRef bBytes = new BytesRef("b");
BytesRef cBytes = new BytesRef("c");
int n = 0;
for (long d = start; d < end; d += anHour) {
docs.add(org.elasticsearch.common.collect.List.of(
new SortedNumericDocValuesField(AGGREGABLE_DATE, d),
new SortedSetDocValuesField("k1", aBytes),
new SortedSetDocValuesField("k1", d < useC ? bBytes : cBytes),
new SortedNumericDocValuesField("n", n++)
));
}
/*
 * Intentionally add all documents at once to put them on the
 * same shard to make the reduce behavior consistent.
 */
iw.addDocuments(docs);
};
AggregationBuilder builder = new TermsAggregationBuilder("k1").field("k1").subAggregation(
new AutoDateHistogramAggregationBuilder("dh").field(AGGREGABLE_DATE).setNumBuckets(4).subAggregation(
new MaxAggregationBuilder("max").field("n")));
asSubAggTestCase(builder, buildIndex, (StringTerms terms) -> {
StringTerms.Bucket a = terms.getBucketByKey("a");
InternalAutoDateHistogram adh = a.getAggregations().get("dh");
Map<String, Integer> expectedDocCount = new TreeMap<>();
expectedDocCount.put("2020-01-01T00:00:00.000Z", 2184);
expectedDocCount.put("2020-04-01T00:00:00.000Z", 2184);
expectedDocCount.put("2020-07-01T00:00:00.000Z", 2208);
expectedDocCount.put("2020-10-01T00:00:00.000Z", 2208);
assertThat(bucketCountsAsMap(adh), equalTo(expectedDocCount));
Map<String, Double> expectedMax = new TreeMap<>();
expectedMax.put("2020-01-01T00:00:00.000Z", 2183.0);
expectedMax.put("2020-04-01T00:00:00.000Z", 4367.0);
expectedMax.put("2020-07-01T00:00:00.000Z", 6575.0);
expectedMax.put("2020-10-01T00:00:00.000Z", 8783.0);
assertThat(maxAsMap(adh), equalTo(expectedMax));

StringTerms.Bucket b = terms.getBucketByKey("b");
InternalAutoDateHistogram bdh = b.getAggregations().get("dh");
expectedDocCount.clear();
expectedDocCount.put("2020-01-01T00:00:00.000Z", 2184);
expectedDocCount.put("2020-04-01T00:00:00.000Z", 2184);
assertThat(bucketCountsAsMap(bdh), equalTo(expectedDocCount));
expectedMax.clear();
expectedMax.put("2020-01-01T00:00:00.000Z", 2183.0);
expectedMax.put("2020-04-01T00:00:00.000Z", 4367.0);
assertThat(maxAsMap(bdh), equalTo(expectedMax));

StringTerms.Bucket c = terms.getBucketByKey("c");
InternalAutoDateHistogram cdh = c.getAggregations().get("dh");
expectedDocCount.clear();
expectedDocCount.put("2020-07-01T00:00:00.000Z", 2208);
expectedDocCount.put("2020-10-01T00:00:00.000Z", 2208);
assertThat(bucketCountsAsMap(cdh), equalTo(expectedDocCount));
expectedMax.clear();
expectedMax.put("2020-07-01T00:00:00.000Z", 6575.0);
expectedMax.put("2020-10-01T00:00:00.000Z", 8783.0);
assertThat(maxAsMap(cdh), equalTo(expectedMax));
});
}

public void testAsSubAggInManyBuckets() throws IOException {
CheckedBiConsumer<RandomIndexWriter, DateFieldMapper.DateFieldType, IOException> buildIndex = (iw, dft) -> {
long start = dft.parse("2020-01-01T00:00:00Z");
long end = dft.parse("2021-01-01T00:00:00Z");
long anHour = dft.resolution().convert(Instant.ofEpochSecond(TimeUnit.HOURS.toSeconds(1)));
List<List<IndexableField>> docs = new ArrayList<>();
int n = 0;
for (long d = start; d < end; d += anHour) {
docs.add(org.elasticsearch.common.collect.List.of(
new SortedNumericDocValuesField(AGGREGABLE_DATE, d),
new SortedNumericDocValuesField("n", n % 100)
));
n++;
}
/*
 * Intentionally add all documents at once to put them on the
 * same shard to make the reduce behavior consistent.
 */
iw.addDocuments(docs);
};
AggregationBuilder builder = new HistogramAggregationBuilder("n").field("n").interval(1).subAggregation(
new AutoDateHistogramAggregationBuilder("dh").field(AGGREGABLE_DATE).setNumBuckets(4).subAggregation(
new MaxAggregationBuilder("max").field("n")));
asSubAggTestCase(builder, buildIndex, (InternalHistogram histo) -> {
assertThat(histo.getBuckets(), hasSize(100));
for (int n = 0; n < 100; n ++) {
InternalHistogram.Bucket b = histo.getBuckets().get(n);
InternalAutoDateHistogram dh = b.getAggregations().get("dh");
assertThat(bucketCountsAsMap(dh), hasEntry(equalTo("2020-01-01T00:00:00.000Z"), either(equalTo(21)).or(equalTo(22))));
assertThat(bucketCountsAsMap(dh), hasEntry(equalTo("2020-04-01T00:00:00.000Z"), either(equalTo(21)).or(equalTo(22))));
assertThat(bucketCountsAsMap(dh), hasEntry(equalTo("2020-07-01T00:00:00.000Z"), either(equalTo(22)).or(equalTo(23))));
assertThat(bucketCountsAsMap(dh), hasEntry(equalTo("2020-10-01T00:00:00.000Z"), either(equalTo(22)).or(equalTo(23))));
Map<String, Double> expectedMax = new TreeMap<>();
expectedMax.put("2020-01-01T00:00:00.000Z", (double) n);
expectedMax.put("2020-04-01T00:00:00.000Z", (double) n);
expectedMax.put("2020-07-01T00:00:00.000Z", (double) n);
expectedMax.put("2020-10-01T00:00:00.000Z", (double) n);
assertThat(maxAsMap(dh), equalTo(expectedMax));
}
});
}

public void testNoDocs() throws IOException {
final List<ZonedDateTime> dates = Collections.emptyList();
final Consumer<AutoDateHistogramAggregationBuilder> aggregation = agg -> agg.setNumBuckets(10).field(DATE_FIELD);
@ -244,20 +431,7 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
final long start = LocalDate.of(2015, 1, 1).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
final long end = LocalDate.of(2017, 12, 31).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
final Query rangeQuery = LongPoint.newRangeQuery(INSTANT_FIELD, start, end);
testSearchCase(rangeQuery, DATES_WITH_TIME,
aggregation -> aggregation.setNumBuckets(4).field(DATE_FIELD),
histogram -> {
final List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
assertEquals(5, buckets.size());
for (int i = 0; i < buckets.size(); i++) {
final Histogram.Bucket bucket = buckets.get(i);
assertEquals(DATES_WITH_TIME.get(5 + i), bucket.getKey());
assertEquals(1, bucket.getDocCount());
}
assertTrue(AggregationInspectionHelper.hasValue(histogram));
}
);
testSearchAndReduceCase(rangeQuery, DATES_WITH_TIME,
testBothCases(rangeQuery, DATES_WITH_TIME,
aggregation -> aggregation.setNumBuckets(4).field(DATE_FIELD),
histogram -> {
final ZonedDateTime startDate = ZonedDateTime.of(2015, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);
@ -282,29 +456,13 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
ZonedDateTime.of(2017, 3, 4, 0, 0, 0, 0, ZoneOffset.UTC),
ZonedDateTime.of(2017, 3, 5, 0, 0, 0, 0, ZoneOffset.UTC),
ZonedDateTime.of(2017, 3, 6, 0, 0, 0, 0, ZoneOffset.UTC));
testSearchCase(DEFAULT_QUERY, datesForMonthInterval,
aggregation -> aggregation.setNumBuckets(4).field(DATE_FIELD), histogram -> {
final List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
assertEquals(datesForMonthInterval.size(), buckets.size());
for (int i = 0; i < buckets.size(); i++) {
final Histogram.Bucket bucket = buckets.get(i);
assertEquals(datesForMonthInterval.get(i), bucket.getKey());
assertEquals(1, bucket.getDocCount());
}
});
testSearchAndReduceCase(DEFAULT_QUERY, datesForMonthInterval,
Map<String, Integer> expectedDocCount = new TreeMap<>();
expectedDocCount.put("2017-01-01T00:00:00.000Z", 1);
expectedDocCount.put("2017-02-01T00:00:00.000Z", 2);
expectedDocCount.put("2017-03-01T00:00:00.000Z", 3);
testBothCases(DEFAULT_QUERY, datesForMonthInterval,
aggregation -> aggregation.setNumBuckets(4).field(DATE_FIELD),
histogram -> {
final Map<ZonedDateTime, Integer> expectedDocCount = new HashMap<>();
expectedDocCount.put(datesForMonthInterval.get(0).withDayOfMonth(1), 1);
expectedDocCount.put(datesForMonthInterval.get(1).withDayOfMonth(1), 2);
expectedDocCount.put(datesForMonthInterval.get(3).withDayOfMonth(1), 3);
final List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
assertEquals(expectedDocCount.size(), buckets.size());
buckets.forEach(bucket ->
assertEquals(expectedDocCount.getOrDefault(bucket.getKey(), 0).longValue(), bucket.getDocCount()));
assertTrue(AggregationInspectionHelper.hasValue(histogram));
}
result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount))
);
}

@ -327,28 +485,19 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
ZonedDateTime.of(2017, 2, 3, 0, 0, 0, 0, ZoneOffset.UTC),
ZonedDateTime.of(2017, 2, 3, 0, 0, 0, 0, ZoneOffset.UTC),
ZonedDateTime.of(2017, 2, 5, 0, 0, 0, 0, ZoneOffset.UTC));
final Map<ZonedDateTime, Integer> expectedDocCount = new HashMap<>();
expectedDocCount.put(datesForDayInterval.get(0), 1);
expectedDocCount.put(datesForDayInterval.get(1), 2);
expectedDocCount.put(datesForDayInterval.get(3), 3);
expectedDocCount.put(datesForDayInterval.get(6), 1);

Map<String, Integer> expectedDocCount = new TreeMap<>();
expectedDocCount.put("2017-02-01T00:00:00.000Z", 1);
expectedDocCount.put("2017-02-02T00:00:00.000Z", 2);
expectedDocCount.put("2017-02-03T00:00:00.000Z", 3);
expectedDocCount.put("2017-02-05T00:00:00.000Z", 1);
testSearchCase(DEFAULT_QUERY, datesForDayInterval,
aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD), histogram -> {
final List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
assertEquals(expectedDocCount.size(), buckets.size());
buckets.forEach(bucket ->
assertEquals(expectedDocCount.getOrDefault(bucket.getKey(), 0).longValue(), bucket.getDocCount()));
});
aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD),
result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount))
);
expectedDocCount.put("2017-02-04T00:00:00.000Z", 0);
testSearchAndReduceCase(DEFAULT_QUERY, datesForDayInterval,
aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD),
histogram -> {
final List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
assertEquals(5, buckets.size());
buckets.forEach(bucket ->
assertEquals(expectedDocCount.getOrDefault(bucket.getKey(), 0).longValue(), bucket.getDocCount()));
assertTrue(AggregationInspectionHelper.hasValue(histogram));
}
result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount))
);
}

@ -361,32 +510,20 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
ZonedDateTime.of(2017, 2, 3, 0, 0, 0, 0, ZoneOffset.UTC),
ZonedDateTime.of(2017, 2, 3, 0, 0, 0, 0, ZoneOffset.UTC),
ZonedDateTime.of(2017, 2, 5, 0, 0, 0, 0, ZoneOffset.UTC));
Map<String, Integer> expectedDocCount = new TreeMap<>();
expectedDocCount.put("2017-01-31T00:00:00.000-01:00", 1);
expectedDocCount.put("2017-02-01T00:00:00.000-01:00", 2);
expectedDocCount.put("2017-02-02T00:00:00.000-01:00", 3);
expectedDocCount.put("2017-02-04T00:00:00.000-01:00", 1);
testSearchCase(DEFAULT_QUERY, datesForDayInterval,
aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD).timeZone(ZoneOffset.ofHours(-1)), histogram -> {
final Map<String, Integer> expectedDocCount = new HashMap<>();
expectedDocCount.put("2017-01-31T23:00:00.000-01:00", 1);
expectedDocCount.put("2017-02-01T23:00:00.000-01:00", 2);
expectedDocCount.put("2017-02-02T23:00:00.000-01:00", 3);
expectedDocCount.put("2017-02-04T23:00:00.000-01:00", 1);
final List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
assertEquals(expectedDocCount.size(), buckets.size());
buckets.forEach(bucket ->
assertEquals(expectedDocCount.getOrDefault(bucket.getKeyAsString(), 0).longValue(), bucket.getDocCount()));
assertTrue(AggregationInspectionHelper.hasValue(histogram));
});
aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD).timeZone(ZoneOffset.ofHours(-1)),
result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount))
);
expectedDocCount.put("2017-02-03T00:00:00.000-01:00", 0);
testSearchAndReduceCase(DEFAULT_QUERY, datesForDayInterval,
aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD).timeZone(ZoneOffset.ofHours(-1)), histogram -> {
final Map<String, Integer> expectedDocCount = new HashMap<>();
expectedDocCount.put("2017-01-31T00:00:00.000-01:00", 1);
expectedDocCount.put("2017-02-01T00:00:00.000-01:00", 2);
expectedDocCount.put("2017-02-02T00:00:00.000-01:00", 3);
expectedDocCount.put("2017-02-04T00:00:00.000-01:00", 1);
final List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
assertEquals(5, buckets.size());
buckets.forEach(bucket ->
assertEquals(expectedDocCount.getOrDefault(bucket.getKeyAsString(), 0).longValue(), bucket.getDocCount()));
assertTrue(AggregationInspectionHelper.hasValue(histogram));
});
aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD).timeZone(ZoneOffset.ofHours(-1)),
result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount))
);
}

public void testIntervalHour() throws IOException {
@ -401,51 +538,36 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
ZonedDateTime.of(2017, 2, 1, 16, 6, 0, 0, ZoneOffset.UTC),
ZonedDateTime.of(2017, 2, 1, 16, 48, 0, 0, ZoneOffset.UTC),
ZonedDateTime.of(2017, 2, 1, 16, 59, 0, 0, ZoneOffset.UTC));
Map<String, Integer> expectedDocCount = new TreeMap<>();
expectedDocCount.put("2017-02-01T09:00:00.000Z", 2);
expectedDocCount.put("2017-02-01T10:00:00.000Z", 1);
expectedDocCount.put("2017-02-01T13:00:00.000Z", 1);
expectedDocCount.put("2017-02-01T14:00:00.000Z", 2);
expectedDocCount.put("2017-02-01T15:00:00.000Z", 1);
expectedDocCount.put("2017-02-01T16:00:00.000Z", 3);
testSearchCase(DEFAULT_QUERY, datesForHourInterval,
aggregation -> aggregation.setNumBuckets(8).field(DATE_FIELD),
histogram -> {
final List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
assertEquals(datesForHourInterval.size(), buckets.size());
for (int i = 0; i < buckets.size(); i++) {
final Histogram.Bucket bucket = buckets.get(i);
assertEquals(datesForHourInterval.get(i), bucket.getKey());
assertEquals(1, bucket.getDocCount());
}
}
result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount))
);
expectedDocCount.put("2017-02-01T11:00:00.000Z", 0);
expectedDocCount.put("2017-02-01T12:00:00.000Z", 0);
testSearchAndReduceCase(DEFAULT_QUERY, datesForHourInterval,
aggregation -> aggregation.setNumBuckets(10).field(DATE_FIELD),
histogram -> {
final Map<ZonedDateTime, Integer> expectedDocCount = new HashMap<>();
expectedDocCount.put(datesForHourInterval.get(0).withMinute(0), 2);
expectedDocCount.put(datesForHourInterval.get(2).withMinute(0), 1);
expectedDocCount.put(datesForHourInterval.get(3).withMinute(0), 1);
expectedDocCount.put(datesForHourInterval.get(4).withMinute(0), 2);
expectedDocCount.put(datesForHourInterval.get(6).withMinute(0), 1);
expectedDocCount.put(datesForHourInterval.get(7).withMinute(0), 3);
final List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
assertEquals(8, buckets.size());
buckets.forEach(bucket ->
assertEquals(expectedDocCount.getOrDefault(bucket.getKey(), 0).longValue(), bucket.getDocCount()));
}
result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount))
);
expectedDocCount.clear();
expectedDocCount.put("2017-02-01T09:00:00.000Z", 3);
expectedDocCount.put("2017-02-01T12:00:00.000Z", 3);
expectedDocCount.put("2017-02-01T15:00:00.000Z", 4);
testSearchAndReduceCase(DEFAULT_QUERY, datesForHourInterval,
aggregation -> aggregation.setNumBuckets(6).field(DATE_FIELD),
histogram -> {
final Map<ZonedDateTime, Integer> expectedDocCount = new HashMap<>();
expectedDocCount.put(datesForHourInterval.get(0).withMinute(0), 3);
expectedDocCount.put(datesForHourInterval.get(0).plusHours(3).withMinute(0), 3);
expectedDocCount.put(datesForHourInterval.get(0).plusHours(6).withMinute(0), 4);
final List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
assertEquals(expectedDocCount.size(), buckets.size());
buckets.forEach(bucket ->
assertEquals(expectedDocCount.getOrDefault(bucket.getKey(), 0).longValue(), bucket.getDocCount()));
}
result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount))
);
}

public void testIntervalHourWithTZ() throws IOException {
final List<ZonedDateTime> datesForHourInterval = Arrays.asList(
List<ZonedDateTime> datesForHourInterval = Arrays.asList(
ZonedDateTime.of(2017, 2, 1, 9, 2, 0, 0, ZoneOffset.UTC),
ZonedDateTime.of(2017, 2, 1, 9, 35, 0, 0, ZoneOffset.UTC),
ZonedDateTime.of(2017, 2, 1, 10, 15, 0, 0, ZoneOffset.UTC),
@ -456,36 +578,22 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
ZonedDateTime.of(2017, 2, 1, 16, 6, 0, 0, ZoneOffset.UTC),
ZonedDateTime.of(2017, 2, 1, 16, 48, 0, 0, ZoneOffset.UTC),
ZonedDateTime.of(2017, 2, 1, 16, 59, 0, 0, ZoneOffset.UTC));
Map<String, Integer> expectedDocCount = new TreeMap<>();
expectedDocCount.put("2017-02-01T08:00:00.000-01:00", 2);
expectedDocCount.put("2017-02-01T09:00:00.000-01:00", 1);
expectedDocCount.put("2017-02-01T12:00:00.000-01:00", 1);
expectedDocCount.put("2017-02-01T13:00:00.000-01:00", 2);
expectedDocCount.put("2017-02-01T14:00:00.000-01:00", 1);
expectedDocCount.put("2017-02-01T15:00:00.000-01:00", 3);
testSearchCase(DEFAULT_QUERY, datesForHourInterval,
aggregation -> aggregation.setNumBuckets(8).field(DATE_FIELD).timeZone(ZoneOffset.ofHours(-1)),
histogram -> {
final List<String> dateStrings = datesForHourInterval.stream()
.map(dateTime -> DateFormatter.forPattern("strict_date_time")
.format(dateTime.withZoneSameInstant(ZoneOffset.ofHours(-1)))).collect(Collectors.toList());
final List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
assertEquals(datesForHourInterval.size(), buckets.size());
for (int i = 0; i < buckets.size(); i++) {
final Histogram.Bucket bucket = buckets.get(i);
assertEquals(dateStrings.get(i), bucket.getKeyAsString());
assertEquals(1, bucket.getDocCount());
}
}
result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount))
);
expectedDocCount.put("2017-02-01T10:00:00.000-01:00", 0);
expectedDocCount.put("2017-02-01T11:00:00.000-01:00", 0);
testSearchAndReduceCase(DEFAULT_QUERY, datesForHourInterval,
aggregation -> aggregation.setNumBuckets(10).field(DATE_FIELD).timeZone(ZoneOffset.ofHours(-1)),
histogram -> {
final Map<String, Integer> expectedDocCount = new HashMap<>();
expectedDocCount.put("2017-02-01T08:00:00.000-01:00", 2);
expectedDocCount.put("2017-02-01T09:00:00.000-01:00", 1);
expectedDocCount.put("2017-02-01T12:00:00.000-01:00", 1);
expectedDocCount.put("2017-02-01T13:00:00.000-01:00", 2);
expectedDocCount.put("2017-02-01T14:00:00.000-01:00", 1);
expectedDocCount.put("2017-02-01T15:00:00.000-01:00", 3);
final List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
assertEquals(8, buckets.size());
buckets.forEach(bucket ->
assertEquals(expectedDocCount.getOrDefault(bucket.getKeyAsString(), 0).longValue(), bucket.getDocCount()));
}
result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount))
);
}

@ -685,31 +793,35 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
ZonedDateTime.of(2017, 2, 1, 9, 15, 37, 0, ZoneOffset.UTC),
ZonedDateTime.of(2017, 2, 1, 9, 16, 4, 0, ZoneOffset.UTC),
ZonedDateTime.of(2017, 2, 1, 9, 16, 42, 0, ZoneOffset.UTC));

Map<String, Integer> skeletonDocCount = new TreeMap<>();
skeletonDocCount.put("2017-02-01T09:02:00.000Z", 2);
skeletonDocCount.put("2017-02-01T09:15:00.000Z", 1);
skeletonDocCount.put("2017-02-01T09:16:00.000Z", 2);
testSearchCase(DEFAULT_QUERY, datesForMinuteInterval,
aggregation -> aggregation.setNumBuckets(4).field(DATE_FIELD),
histogram -> {
final List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
assertEquals(datesForMinuteInterval.size(), buckets.size());
for (int i = 0; i < buckets.size(); i++) {
final Histogram.Bucket bucket = buckets.get(i);
assertEquals(datesForMinuteInterval.get(i), bucket.getKey());
assertEquals(1, bucket.getDocCount());
}
}
result -> assertThat(bucketCountsAsMap(result), equalTo(skeletonDocCount))
);
Map<String, Integer> fullDocCount = new TreeMap<>();
fullDocCount.put("2017-02-01T09:02:00.000Z", 2);
fullDocCount.put("2017-02-01T09:07:00.000Z", 0);
fullDocCount.put("2017-02-01T09:12:00.000Z", 3);
testSearchAndReduceCase(DEFAULT_QUERY, datesForMinuteInterval,
aggregation -> aggregation.setNumBuckets(4).field(DATE_FIELD),
result -> assertThat(bucketCountsAsMap(result), equalTo(fullDocCount))
);

testSearchCase(DEFAULT_QUERY, datesForMinuteInterval,
aggregation -> aggregation.setNumBuckets(15).field(DATE_FIELD),
result -> assertThat(bucketCountsAsMap(result), equalTo(skeletonDocCount))
);
fullDocCount.clear();
fullDocCount.putAll(skeletonDocCount);
for (int minute = 3; minute < 15; minute++) {
fullDocCount.put(String.format(Locale.ROOT, "2017-02-01T09:%02d:00.000Z", minute), 0);
}
testSearchAndReduceCase(DEFAULT_QUERY, datesForMinuteInterval,
aggregation -> aggregation.setNumBuckets(15).field(DATE_FIELD),
histogram -> {
final Map<ZonedDateTime, Integer> expectedDocCount = new HashMap<>();
expectedDocCount.put(datesForMinuteInterval.get(0).withSecond(0), 2);
expectedDocCount.put(datesForMinuteInterval.get(2).withSecond(0), 1);
expectedDocCount.put(datesForMinuteInterval.get(3).withSecond(0), 2);
final List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
assertEquals(15, buckets.size());
buckets.forEach(bucket ->
assertEquals(expectedDocCount.getOrDefault(bucket.getKey(), 0).longValue(), bucket.getDocCount()));
}
result -> assertThat(bucketCountsAsMap(result), equalTo(fullDocCount))
);
}

@ -721,27 +833,21 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
ZonedDateTime.of(2017, 2, 1, 0, 0, 11, 688, ZoneOffset.UTC),
ZonedDateTime.of(2017, 2, 1, 0, 0, 11, 210, ZoneOffset.UTC),
ZonedDateTime.of(2017, 2, 1, 0, 0, 11, 380, ZoneOffset.UTC));
final ZonedDateTime startDate = datesForSecondInterval.get(0).withNano(0);
final Map<ZonedDateTime, Integer> expectedDocCount = new HashMap<>();
expectedDocCount.put(startDate, 1);
expectedDocCount.put(startDate.plusSeconds(2), 2);
expectedDocCount.put(startDate.plusSeconds(6), 3);

Map<String, Integer> expectedDocCount = new TreeMap<>();
expectedDocCount.put("2017-02-01T00:00:05.000Z", 1);
expectedDocCount.put("2017-02-01T00:00:07.000Z", 2);
expectedDocCount.put("2017-02-01T00:00:11.000Z", 3);
testSearchCase(DEFAULT_QUERY, datesForSecondInterval,
aggregation -> aggregation.setNumBuckets(7).field(DATE_FIELD), histogram -> {
final List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
assertEquals(expectedDocCount.size(), buckets.size());
buckets.forEach(bucket ->
assertEquals(expectedDocCount.getOrDefault(bucket.getKey(), 0).longValue(), bucket.getDocCount()));
});
aggregation -> aggregation.setNumBuckets(7).field(DATE_FIELD),
result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount))
);
expectedDocCount.put("2017-02-01T00:00:06.000Z", 0);
expectedDocCount.put("2017-02-01T00:00:08.000Z", 0);
expectedDocCount.put("2017-02-01T00:00:09.000Z", 0);
expectedDocCount.put("2017-02-01T00:00:10.000Z", 0);
testSearchAndReduceCase(DEFAULT_QUERY, datesForSecondInterval,
aggregation -> aggregation.setNumBuckets(7).field(DATE_FIELD),
histogram -> {
final List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
assertEquals(7, buckets.size());
buckets.forEach(bucket ->
assertEquals(expectedDocCount.getOrDefault(bucket.getKey(), 0).longValue(), bucket.getDocCount()));
}
result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount))
);
}

@ -859,6 +965,25 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
}
}

private Map<String, Integer> bucketCountsAsMap(InternalAutoDateHistogram result) {
LinkedHashMap<String, Integer> map = new LinkedHashMap<>(result.getBuckets().size());
result.getBuckets().stream().forEach(b -> {
Object old = map.put(b.getKeyAsString(), Math.toIntExact(b.getDocCount()));
assertNull(old);
});
return map;
}

private Map<String, Double> maxAsMap(InternalAutoDateHistogram result) {
LinkedHashMap<String, Double> map = new LinkedHashMap<>(result.getBuckets().size());
result.getBuckets().stream().forEach(b -> {
InternalMax max = b.getAggregations().get("max");
Object old = map.put(b.getKeyAsString(), max.getValue());
assertNull(old);
});
return map;
}

@Override
public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) {
/*

@ -0,0 +1,104 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.search.aggregations.bucket.histogram;

import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.mapper.NumberFieldMapper.NumberType;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.InternalAggregation;

import java.io.IOException;
import java.util.Collections;
import java.util.function.Consumer;

public abstract class DateHistogramAggregatorTestCase extends AggregatorTestCase {
/**
 * A date that is always "aggregable" because it has doc values but may or
 * may not have a search index. If it doesn't then we can't use our fancy
 * date rounding mechanism that needs to know the minimum and maximum dates
 * it is going to round because it reads *that* out of the search index.
 */
protected static final String AGGREGABLE_DATE = "aggregable_date";

protected final <R extends InternalAggregation> void asSubAggTestCase(AggregationBuilder builder, Consumer<R> verify)
throws IOException {
CheckedBiConsumer<RandomIndexWriter, DateFieldMapper.DateFieldType, IOException> buildIndex = (iw, dft) -> {
iw.addDocument(org.elasticsearch.common.collect.List.of(
new SortedNumericDocValuesField(AGGREGABLE_DATE, dft.parse("2020-02-01T00:00:00Z")),
new SortedSetDocValuesField("k1", new BytesRef("a")),
new SortedSetDocValuesField("k2", new BytesRef("a")),
new SortedNumericDocValuesField("n", 1)
));
iw.addDocument(org.elasticsearch.common.collect.List.of(
new SortedNumericDocValuesField(AGGREGABLE_DATE, dft.parse("2020-03-01T00:00:00Z")),
new SortedSetDocValuesField("k1", new BytesRef("a")),
new SortedSetDocValuesField("k2", new BytesRef("a")),
new SortedNumericDocValuesField("n", 2)
));
iw.addDocument(org.elasticsearch.common.collect.List.of(
new SortedNumericDocValuesField(AGGREGABLE_DATE, dft.parse("2021-02-01T00:00:00Z")),
new SortedSetDocValuesField("k1", new BytesRef("a")),
new SortedSetDocValuesField("k2", new BytesRef("a")),
new SortedNumericDocValuesField("n", 3)
));
iw.addDocument(org.elasticsearch.common.collect.List.of(
new SortedNumericDocValuesField(AGGREGABLE_DATE, dft.parse("2021-03-01T00:00:00Z")),
new SortedSetDocValuesField("k1", new BytesRef("a")),
new SortedSetDocValuesField("k2", new BytesRef("b")),
new SortedNumericDocValuesField("n", 4)
));
iw.addDocument(org.elasticsearch.common.collect.List.of(
new SortedNumericDocValuesField(AGGREGABLE_DATE, dft.parse("2020-02-01T00:00:00Z")),
new SortedSetDocValuesField("k1", new BytesRef("b")),
new SortedSetDocValuesField("k2", new BytesRef("b")),
new SortedNumericDocValuesField("n", 5)
));
};
asSubAggTestCase(builder, buildIndex, verify);
}

protected final <R extends InternalAggregation> void asSubAggTestCase(
AggregationBuilder builder,
CheckedBiConsumer<RandomIndexWriter, DateFieldMapper.DateFieldType, IOException> buildIndex,
Consumer<R> verify
) throws IOException {
KeywordFieldMapper.KeywordFieldType k1ft = new KeywordFieldMapper.KeywordFieldType("k1");
KeywordFieldMapper.KeywordFieldType k2ft = new KeywordFieldMapper.KeywordFieldType("k2");
NumberFieldMapper.NumberFieldType nft = new NumberFieldMapper.NumberFieldType("n", NumberType.LONG);
DateFieldMapper.DateFieldType dft = aggregableDateFieldType(false, randomBoolean());
testCase(builder, new MatchAllDocsQuery(), iw -> buildIndex.accept(iw, dft), verify, k1ft, k2ft, nft, dft);
}

protected final DateFieldMapper.DateFieldType aggregableDateFieldType(boolean useNanosecondResolution, boolean isSearchable) {
return new DateFieldMapper.DateFieldType(AGGREGABLE_DATE, isSearchable, true,
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER,
useNanosecondResolution ? DateFieldMapper.Resolution.NANOSECONDS : DateFieldMapper.Resolution.MILLISECONDS,
Collections.emptyMap());
}
}

@ -22,7 +22,6 @@ package org.elasticsearch.search.aggregations.bucket.histogram;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.RandomIndexWriter;
@ -31,13 +30,9 @@ import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.MatchNoDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.time.DateFormatters;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
@ -53,15 +48,7 @@ import java.util.function.Consumer;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.Matchers.equalTo;

public class DateHistogramAggregatorTests extends AggregatorTestCase {

/**
 * A date that is always "aggregable" because it has doc values but may or
 * may not have a search index. If it doesn't then we can't use our fancy
 * date rounding mechanism that needs to know the minimum and maximum dates
 * it is going to round because it reads *that* out of the search index.
 */
private static final String AGGREGABLE_DATE = "aggregable_date";
public class DateHistogramAggregatorTests extends DateHistogramAggregatorTestCase {
/**
 * A date that is always "searchable" because it is indexed.
 */
@ -169,39 +156,9 @@ public class DateHistogramAggregatorTests extends AggregatorTestCase {
}

public void testAsSubAgg() throws IOException {
KeywordFieldMapper.KeywordFieldType k1ft = new KeywordFieldMapper.KeywordFieldType("k1");
KeywordFieldMapper.KeywordFieldType k2ft = new KeywordFieldMapper.KeywordFieldType("k2");
DateFieldMapper.DateFieldType dft = aggregableDateFieldType(false, randomBoolean());
CheckedConsumer<RandomIndexWriter, IOException> buildIndex = iw -> {
iw.addDocument(org.elasticsearch.common.collect.List.of(
new SortedNumericDocValuesField(AGGREGABLE_DATE, dft.parse("2020-02-01T00:00:00Z")),
new SortedSetDocValuesField("k1", new BytesRef("a")),
new SortedSetDocValuesField("k2", new BytesRef("a"))
));
iw.addDocument(org.elasticsearch.common.collect.List.of(
new SortedNumericDocValuesField(AGGREGABLE_DATE, dft.parse("2020-03-01T00:00:00Z")),
new SortedSetDocValuesField("k1", new BytesRef("a")),
new SortedSetDocValuesField("k2", new BytesRef("a"))
));
iw.addDocument(org.elasticsearch.common.collect.List.of(
new SortedNumericDocValuesField(AGGREGABLE_DATE, dft.parse("2021-02-01T00:00:00Z")),
new SortedSetDocValuesField("k1", new BytesRef("a")),
new SortedSetDocValuesField("k2", new BytesRef("a"))
));
iw.addDocument(org.elasticsearch.common.collect.List.of(
new SortedNumericDocValuesField(AGGREGABLE_DATE, dft.parse("2021-03-01T00:00:00Z")),
new SortedSetDocValuesField("k1", new BytesRef("a")),
new SortedSetDocValuesField("k2", new BytesRef("b"))
));
iw.addDocument(org.elasticsearch.common.collect.List.of(
new SortedNumericDocValuesField(AGGREGABLE_DATE, dft.parse("2020-02-01T00:00:00Z")),
new SortedSetDocValuesField("k1", new BytesRef("b")),
new SortedSetDocValuesField("k2", new BytesRef("b"))
));
};
AggregationBuilder builder = new TermsAggregationBuilder("k1").field("k1").subAggregation(
new DateHistogramAggregationBuilder("dh").field(AGGREGABLE_DATE).calendarInterval(DateHistogramInterval.YEAR));
testCase(builder, new MatchAllDocsQuery(), buildIndex, (StringTerms terms) -> {
new DateHistogramAggregationBuilder("dh").field(AGGREGABLE_DATE).calendarInterval(DateHistogramInterval.YEAR));
asSubAggTestCase(builder, (StringTerms terms) -> {
StringTerms.Bucket a = terms.getBucketByKey("a");
InternalDateHistogram adh = a.getAggregations().get("dh");
assertThat(adh.getBuckets().stream().map(bucket -> bucket.getKey().toString()).collect(toList()),
@ -210,33 +167,37 @@ public class DateHistogramAggregatorTests extends AggregatorTestCase {

StringTerms.Bucket b = terms.getBucketByKey("b");
InternalDateHistogram bdh = b.getAggregations().get("dh");
assertThat(bdh.getBuckets().stream().map(bucket -> bucket.getKey().toString()).collect(toList()),
equalTo(org.elasticsearch.common.collect.List.of("2020-01-01T00:00Z")
));
}, k1ft, dft);
assertThat(
bdh.getBuckets().stream().map(bucket -> bucket.getKey().toString()).collect(toList()),
equalTo(org.elasticsearch.common.collect.List.of("2020-01-01T00:00Z"))
);
});
builder = new TermsAggregationBuilder("k2").field("k2").subAggregation(builder);
testCase(builder, new MatchAllDocsQuery(), buildIndex, (StringTerms terms) -> {
asSubAggTestCase(builder, (StringTerms terms) -> {
StringTerms.Bucket a = terms.getBucketByKey("a");
StringTerms ak1 = a.getAggregations().get("k1");
StringTerms.Bucket ak1a = ak1.getBucketByKey("a");
InternalDateHistogram ak1adh = ak1a.getAggregations().get("dh");
assertThat(ak1adh.getBuckets().stream().map(bucket -> bucket.getKey().toString()).collect(toList()),
equalTo(org.elasticsearch.common.collect.List.of("2020-01-01T00:00Z", "2021-01-01T00:00Z")
));
assertThat(
ak1adh.getBuckets().stream().map(bucket -> bucket.getKey().toString()).collect(toList()),
equalTo(org.elasticsearch.common.collect.List.of("2020-01-01T00:00Z", "2021-01-01T00:00Z"))
);

StringTerms.Bucket b = terms.getBucketByKey("b");
StringTerms bk1 = b.getAggregations().get("k1");
StringTerms.Bucket bk1a = bk1.getBucketByKey("a");
InternalDateHistogram bk1adh = bk1a.getAggregations().get("dh");
assertThat(bk1adh.getBuckets().stream().map(bucket -> bucket.getKey().toString()).collect(toList()),
equalTo(org.elasticsearch.common.collect.List.of("2021-01-01T00:00Z")
));
assertThat(
bk1adh.getBuckets().stream().map(bucket -> bucket.getKey().toString()).collect(toList()),
equalTo(org.elasticsearch.common.collect.List.of("2021-01-01T00:00Z"))
);
StringTerms.Bucket bk1b = bk1.getBucketByKey("b");
InternalDateHistogram bk1bdh = bk1b.getAggregations().get("dh");
assertThat(bk1bdh.getBuckets().stream().map(bucket -> bucket.getKey().toString()).collect(toList()),
equalTo(org.elasticsearch.common.collect.List.of("2020-01-01T00:00Z")
));
}, k1ft, k2ft, dft);
assertThat(
bk1bdh.getBuckets().stream().map(bucket -> bucket.getKey().toString()).collect(toList()),
equalTo(org.elasticsearch.common.collect.List.of("2020-01-01T00:00Z"))
);
});
}

public void testNoDocsDeprecatedInterval() throws IOException {
@ -1274,13 +1235,6 @@ public class DateHistogramAggregatorTests extends AggregatorTestCase {
}
}

private DateFieldMapper.DateFieldType aggregableDateFieldType(boolean useNanosecondResolution, boolean isSearchable) {
return new DateFieldMapper.DateFieldType(AGGREGABLE_DATE, isSearchable, true,
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER,
useNanosecondResolution ? DateFieldMapper.Resolution.NANOSECONDS : DateFieldMapper.Resolution.MILLISECONDS,
Collections.emptyMap());
}

private static long asLong(String dateTime) {
return DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(dateTime)).toInstant().toEpochMilli();
}

@ -30,6 +30,7 @@ import java.util.Objects;
import java.util.Set;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

public class LongKeyedBucketOrdsTests extends ESTestCase {
private final MockBigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
@ -96,6 +97,8 @@ public class LongKeyedBucketOrdsTests extends ESTestCase {
assertThat(ordsEnum.value(), equalTo(values[i]));
}
assertFalse(ordsEnum.next());

assertThat(ords.maxOwningBucketOrd(), equalTo(0L));
} finally {
ords.close();
}
@ -164,6 +167,8 @@ public class LongKeyedBucketOrdsTests extends ESTestCase {
}
assertFalse(ords.ordsEnum(randomLongBetween(maxOwningBucketOrd + 1, Long.MAX_VALUE)).next());
assertThat(ords.bucketsInOrd(randomLongBetween(maxOwningBucketOrd + 1, Long.MAX_VALUE)), equalTo(0L));

assertThat(ords.maxOwningBucketOrd(), greaterThanOrEqualTo(maxOwningBucketOrd));
}
}