mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-25 09:28:27 +00:00
When the `terms` aggregation operates on non-numeric data it can collect it via global ordinals. It actually has two separate collection strategies for this, one "dense" and one "remapping". Each of *those* strategies has two "iteration" strategies that it uses to build buckets, depending on whether or not we need buckets with `0` docs in them. Previously this was done with several `null` checks and never really explained. This change replaces those checks with two `CollectionStrategy` classes which have good stuff like documentation.
This commit is contained in:
parent
24d605e41e
commit
b9fe10866e
@ -822,6 +822,7 @@ setup:
|
||||
- match: { profile.shards.0.aggregations.0.description: str_terms }
|
||||
- match: { profile.shards.0.aggregations.0.breakdown.collect_count: 4 }
|
||||
- match: { profile.shards.0.aggregations.0.debug.deferred_aggregators: [ max_number ] }
|
||||
- match: { profile.shards.0.aggregations.0.debug.collection_strategy: dense }
|
||||
- match: { profile.shards.0.aggregations.0.children.0.type: MaxAggregator }
|
||||
- match: { profile.shards.0.aggregations.0.children.0.description: max_number }
|
||||
|
||||
|
@ -96,50 +96,33 @@ public class GlobalOrdinalsSignificantTermsAggregator extends GlobalOrdinalsStri
|
||||
long subsetSize = numCollectedDocs;
|
||||
|
||||
BucketSignificancePriorityQueue<SignificantStringTerms.Bucket> ordered = new BucketSignificancePriorityQueue<>(size);
|
||||
SignificantStringTerms.Bucket spare = null;
|
||||
final boolean needsFullScan = bucketOrds == null || bucketCountThresholds.getMinDocCount() == 0;
|
||||
final long maxId = needsFullScan ? valueCount : bucketOrds.size();
|
||||
for (long ord = 0; ord < maxId; ord++) {
|
||||
final long globalOrd;
|
||||
final long bucketOrd;
|
||||
if (needsFullScan) {
|
||||
bucketOrd = bucketOrds == null ? ord : bucketOrds.find(ord);
|
||||
globalOrd = ord;
|
||||
} else {
|
||||
assert bucketOrds != null;
|
||||
bucketOrd = ord;
|
||||
globalOrd = bucketOrds.get(ord);
|
||||
}
|
||||
if (includeExclude != null && !acceptedGlobalOrdinals.get(globalOrd)) {
|
||||
continue;
|
||||
}
|
||||
final int bucketDocCount = bucketOrd < 0 ? 0 : bucketDocCount(bucketOrd);
|
||||
if (bucketCountThresholds.getMinDocCount() > 0 && bucketDocCount == 0) {
|
||||
continue;
|
||||
}
|
||||
if (bucketDocCount < bucketCountThresholds.getShardMinDocCount()) {
|
||||
continue;
|
||||
}
|
||||
collectionStrategy.forEach(new BucketInfoConsumer() {
|
||||
SignificantStringTerms.Bucket spare = null;
|
||||
|
||||
if (spare == null) {
|
||||
spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format, 0);
|
||||
@Override
|
||||
public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException {
|
||||
if (docCount >= bucketCountThresholds.getShardMinDocCount()) {
|
||||
if (spare == null) {
|
||||
spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format, 0);
|
||||
}
|
||||
spare.bucketOrd = bucketOrd;
|
||||
copy(lookupGlobalOrd.apply(globalOrd), spare.termBytes);
|
||||
spare.subsetDf = docCount;
|
||||
spare.subsetSize = subsetSize;
|
||||
spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes);
|
||||
spare.supersetSize = supersetSize;
|
||||
// During shard-local down-selection we use subset/superset stats
|
||||
// that are for this shard only
|
||||
// Back at the central reducer these properties will be updated with
|
||||
// global stats
|
||||
spare.updateScore(significanceHeuristic);
|
||||
spare = ordered.insertWithOverflow(spare);
|
||||
if (spare == null) {
|
||||
consumeBucketsAndMaybeBreak(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
spare.bucketOrd = bucketOrd;
|
||||
copy(lookupGlobalOrd.apply(globalOrd), spare.termBytes);
|
||||
spare.subsetDf = bucketDocCount;
|
||||
spare.subsetSize = subsetSize;
|
||||
spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes);
|
||||
spare.supersetSize = supersetSize;
|
||||
// During shard-local down-selection we use subset/superset stats
|
||||
// that are for this shard only
|
||||
// Back at the central reducer these properties will be updated with
|
||||
// global stats
|
||||
spare.updateScore(significanceHeuristic);
|
||||
spare = ordered.insertWithOverflow(spare);
|
||||
if (spare == null) {
|
||||
consumeBucketsAndMaybeBreak(1);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];
|
||||
for (int i = ordered.size() - 1; i >= 0; i--) {
|
||||
|
@ -26,8 +26,8 @@ import org.apache.lucene.index.SortedDocValues;
|
||||
import org.apache.lucene.index.SortedSetDocValues;
|
||||
import org.apache.lucene.util.ArrayUtil;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.LongBitSet;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.common.util.IntArray;
|
||||
import org.elasticsearch.common.util.LongHash;
|
||||
@ -38,7 +38,6 @@ import org.elasticsearch.search.aggregations.Aggregator;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactories;
|
||||
import org.elasticsearch.search.aggregations.BucketOrder;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregations;
|
||||
import org.elasticsearch.search.aggregations.LeafBucketCollector;
|
||||
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSource;
|
||||
@ -47,6 +46,8 @@ import org.elasticsearch.search.internal.SearchContext;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.LongPredicate;
|
||||
import java.util.function.LongUnaryOperator;
|
||||
|
||||
import static org.apache.lucene.index.SortedSetDocValues.NO_MORE_ORDS;
|
||||
@ -57,7 +58,6 @@ import static org.apache.lucene.index.SortedSetDocValues.NO_MORE_ORDS;
|
||||
public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggregator {
|
||||
|
||||
protected final ValuesSource.Bytes.WithOrdinals valuesSource;
|
||||
protected final IncludeExclude.OrdinalsFilter includeExclude;
|
||||
|
||||
// TODO: cache the acceptedglobalValues per aggregation definition.
|
||||
// We can't cache this yet in ValuesSource, since ValuesSource is reused per field for aggs during the execution.
|
||||
@ -65,11 +65,10 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
|
||||
// first defined one.
|
||||
// So currently for each instance of this aggregator the acceptedglobalValues will be computed, this is unnecessary
|
||||
// especially if this agg is on a second layer or deeper.
|
||||
protected final LongBitSet acceptedGlobalOrdinals;
|
||||
protected final LongPredicate acceptedGlobalOrdinals;
|
||||
protected final long valueCount;
|
||||
protected final GlobalOrdLookupFunction lookupGlobalOrd;
|
||||
|
||||
protected final LongHash bucketOrds;
|
||||
protected final CollectionStrategy collectionStrategy;
|
||||
|
||||
public interface GlobalOrdLookupFunction {
|
||||
BytesRef apply(long ord) throws IOException;
|
||||
@ -89,32 +88,17 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
|
||||
Map<String, Object> metadata) throws IOException {
|
||||
super(name, factories, context, parent, order, format, bucketCountThresholds, collectionMode, showTermDocCountError, metadata);
|
||||
this.valuesSource = valuesSource;
|
||||
this.includeExclude = includeExclude;
|
||||
final IndexReader reader = context.searcher().getIndexReader();
|
||||
final SortedSetDocValues values = reader.leaves().size() > 0 ?
|
||||
valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0)) : DocValues.emptySortedSet();
|
||||
this.valueCount = values.getValueCount();
|
||||
this.lookupGlobalOrd = values::lookupOrd;
|
||||
this.acceptedGlobalOrdinals = includeExclude != null ? includeExclude.acceptedGlobalOrdinals(values) : null;
|
||||
this.bucketOrds = remapGlobalOrds ? new LongHash(1, context.bigArrays()) : null;
|
||||
this.acceptedGlobalOrdinals = includeExclude == null ? l -> true : includeExclude.acceptedGlobalOrdinals(values)::get;
|
||||
this.collectionStrategy = remapGlobalOrds ? new RemapGlobalOrds() : new DenseGlobalOrds();
|
||||
}
|
||||
|
||||
boolean remapGlobalOrds() {
|
||||
return bucketOrds != null;
|
||||
}
|
||||
|
||||
private void collectGlobalOrd(int doc, long globalOrd, LeafBucketCollector sub) throws IOException {
|
||||
if (bucketOrds == null) {
|
||||
collectExistingBucket(sub, doc, globalOrd);
|
||||
} else {
|
||||
long bucketOrd = bucketOrds.add(globalOrd);
|
||||
if (bucketOrd < 0) {
|
||||
bucketOrd = -1 - bucketOrd;
|
||||
collectExistingBucket(sub, doc, bucketOrd);
|
||||
} else {
|
||||
collectBucket(sub, doc, bucketOrd);
|
||||
}
|
||||
}
|
||||
String descriptCollectionStrategy() {
|
||||
return collectionStrategy.describe();
|
||||
}
|
||||
|
||||
private SortedSetDocValues getGlobalOrds(LeafReaderContext ctx) throws IOException {
|
||||
@ -124,35 +108,32 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
|
||||
|
||||
@Override
|
||||
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
|
||||
final SortedSetDocValues globalOrds = getGlobalOrds(ctx);
|
||||
if (bucketOrds == null) {
|
||||
grow(globalOrds.getValueCount());
|
||||
}
|
||||
final SortedDocValues singleValues = DocValues.unwrapSingleton(globalOrds);
|
||||
SortedSetDocValues globalOrds = getGlobalOrds(ctx);
|
||||
collectionStrategy.globalOrdsReady(globalOrds);
|
||||
SortedDocValues singleValues = DocValues.unwrapSingleton(globalOrds);
|
||||
if (singleValues != null) {
|
||||
return new LeafBucketCollectorBase(sub, globalOrds) {
|
||||
@Override
|
||||
public void collect(int doc, long bucket) throws IOException {
|
||||
assert bucket == 0;
|
||||
if (singleValues.advanceExact(doc)) {
|
||||
final int ord = singleValues.ordValue();
|
||||
collectGlobalOrd(doc, ord, sub);
|
||||
}
|
||||
}
|
||||
};
|
||||
} else {
|
||||
return new LeafBucketCollectorBase(sub, globalOrds) {
|
||||
@Override
|
||||
public void collect(int doc, long bucket) throws IOException {
|
||||
assert bucket == 0;
|
||||
if (globalOrds.advanceExact(doc)) {
|
||||
for (long globalOrd = globalOrds.nextOrd(); globalOrd != NO_MORE_ORDS; globalOrd = globalOrds.nextOrd()) {
|
||||
collectGlobalOrd(doc, globalOrd, sub);
|
||||
}
|
||||
int ord = singleValues.ordValue();
|
||||
collectionStrategy.collectGlobalOrd(doc, ord, sub);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
return new LeafBucketCollectorBase(sub, globalOrds) {
|
||||
@Override
|
||||
public void collect(int doc, long bucket) throws IOException {
|
||||
assert bucket == 0;
|
||||
if (globalOrds.advanceExact(doc)) {
|
||||
for (long globalOrd = globalOrds.nextOrd(); globalOrd != NO_MORE_ORDS; globalOrd = globalOrds.nextOrd()) {
|
||||
collectionStrategy.collectGlobalOrd(doc, globalOrd, sub);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
protected static void copy(BytesRef from, BytesRef to) {
|
||||
@ -178,41 +159,28 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
|
||||
} else {
|
||||
size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize());
|
||||
}
|
||||
long otherDocCount = 0;
|
||||
long[] otherDocCount = new long[1];
|
||||
BucketPriorityQueue<OrdBucket> ordered = new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);
|
||||
OrdBucket spare = new OrdBucket(-1, 0, null, showTermDocCountError, 0);
|
||||
final boolean needsFullScan = bucketOrds == null || bucketCountThresholds.getMinDocCount() == 0;
|
||||
final long maxId = needsFullScan ? valueCount : bucketOrds.size();
|
||||
for (long ord = 0; ord < maxId; ord++) {
|
||||
final long globalOrd;
|
||||
final long bucketOrd;
|
||||
if (needsFullScan) {
|
||||
bucketOrd = bucketOrds == null ? ord : bucketOrds.find(ord);
|
||||
globalOrd = ord;
|
||||
} else {
|
||||
assert bucketOrds != null;
|
||||
bucketOrd = ord;
|
||||
globalOrd = bucketOrds.get(ord);
|
||||
}
|
||||
if (includeExclude != null && !acceptedGlobalOrdinals.get(globalOrd)) {
|
||||
continue;
|
||||
}
|
||||
final int bucketDocCount = bucketOrd < 0 ? 0 : bucketDocCount(bucketOrd);
|
||||
if (bucketCountThresholds.getMinDocCount() > 0 && bucketDocCount == 0) {
|
||||
continue;
|
||||
}
|
||||
otherDocCount += bucketDocCount;
|
||||
spare.globalOrd = globalOrd;
|
||||
spare.bucketOrd = bucketOrd;
|
||||
spare.docCount = bucketDocCount;
|
||||
if (bucketCountThresholds.getShardMinDocCount() <= spare.docCount) {
|
||||
spare = ordered.insertWithOverflow(spare);
|
||||
if (spare == null) {
|
||||
consumeBucketsAndMaybeBreak(1);
|
||||
spare = new OrdBucket(-1, 0, null, showTermDocCountError, 0);
|
||||
collectionStrategy.forEach(new BucketInfoConsumer() {
|
||||
OrdBucket spare = null;
|
||||
|
||||
@Override
|
||||
public void accept(long globalOrd, long bucketOrd, long docCount) {
|
||||
otherDocCount[0] += docCount;
|
||||
if (docCount >= bucketCountThresholds.getShardMinDocCount()) {
|
||||
if (spare == null) {
|
||||
spare = new OrdBucket(showTermDocCountError, format);
|
||||
}
|
||||
spare.globalOrd = globalOrd;
|
||||
spare.bucketOrd = bucketOrd;
|
||||
spare.docCount = docCount;
|
||||
spare = ordered.insertWithOverflow(spare);
|
||||
if (spare == null) {
|
||||
consumeBucketsAndMaybeBreak(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Get the top buckets
|
||||
final StringTerms.Bucket[] list = new StringTerms.Bucket[ordered.size()];
|
||||
@ -222,7 +190,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
|
||||
copy(lookupGlobalOrd.apply(bucket.globalOrd), scratch);
|
||||
list[i] = new StringTerms.Bucket(scratch, bucket.docCount, null, showTermDocCountError, 0, format);
|
||||
list[i].bucketOrd = bucket.bucketOrd;
|
||||
otherDocCount -= list[i].docCount;
|
||||
otherDocCount[0] -= list[i].docCount;
|
||||
list[i].docCountError = 0;
|
||||
}
|
||||
buildSubAggsForBuckets(list, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
|
||||
@ -230,19 +198,24 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
|
||||
return new InternalAggregation[] {
|
||||
new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
|
||||
metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError,
|
||||
otherDocCount, Arrays.asList(list), 0)
|
||||
otherDocCount[0], Arrays.asList(list), 0)
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public void collectDebugInfo(BiConsumer<String, Object> add) {
|
||||
super.collectDebugInfo(add);
|
||||
add.accept("collection_strategy", collectionStrategy.describe());
|
||||
}
|
||||
|
||||
/**
|
||||
* This is used internally only, just for comparing by global ordinal instead of term bytes in the priority queue
|
||||
*/
|
||||
static class OrdBucket extends InternalTerms.Bucket<OrdBucket> {
|
||||
long globalOrd;
|
||||
|
||||
OrdBucket(long globalOrd, long docCount, InternalAggregations aggregations, boolean showDocCountError, long docCountError) {
|
||||
super(docCount, aggregations, showDocCountError, docCountError, null);
|
||||
this.globalOrd = globalOrd;
|
||||
OrdBucket(boolean showDocCountError, DocValueFormat format) {
|
||||
super(0, null, showDocCountError, 0, format);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -278,7 +251,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
|
||||
|
||||
@Override
|
||||
protected void doClose() {
|
||||
Releasables.close(bucketOrds);
|
||||
Releasables.close(collectionStrategy);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -363,14 +336,13 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
|
||||
for (long i = 1; i < segmentDocCounts.size(); i++) {
|
||||
// We use set(...) here, because we need to reset the slot to 0.
|
||||
// segmentDocCounts get reused over the segments and otherwise counts would be too high.
|
||||
final int inc = segmentDocCounts.set(i, 0);
|
||||
int inc = segmentDocCounts.set(i, 0);
|
||||
if (inc == 0) {
|
||||
continue;
|
||||
}
|
||||
final long ord = i - 1; // remember we do +1 when counting
|
||||
final long globalOrd = mapping.applyAsLong(ord);
|
||||
long bucketOrd = bucketOrds == null ? globalOrd : bucketOrds.find(globalOrd);
|
||||
incrementBucketDocCount(bucketOrd, inc);
|
||||
long ord = i - 1; // remember we do +1 when counting
|
||||
long globalOrd = mapping.applyAsLong(ord);
|
||||
incrementBucketDocCount(collectionStrategy.globalOrdToBucketOrd(globalOrd), inc);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -378,9 +350,9 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
|
||||
private static final class FilteredOrdinals extends AbstractSortedSetDocValues {
|
||||
|
||||
private final SortedSetDocValues inner;
|
||||
private final LongBitSet accepted;
|
||||
private final LongPredicate accepted;
|
||||
|
||||
private FilteredOrdinals(SortedSetDocValues inner, LongBitSet accepted) {
|
||||
private FilteredOrdinals(SortedSetDocValues inner, LongPredicate accepted) {
|
||||
this.inner = inner;
|
||||
this.accepted = accepted;
|
||||
}
|
||||
@ -398,7 +370,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
|
||||
@Override
|
||||
public long nextOrd() throws IOException {
|
||||
for (long ord = inner.nextOrd(); ord != NO_MORE_ORDS; ord = inner.nextOrd()) {
|
||||
if (accepted.get(ord)) {
|
||||
if (accepted.test(ord)) {
|
||||
return ord;
|
||||
}
|
||||
}
|
||||
@ -409,7 +381,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
|
||||
public boolean advanceExact(int target) throws IOException {
|
||||
if (inner.advanceExact(target)) {
|
||||
for (long ord = inner.nextOrd(); ord != NO_MORE_ORDS; ord = inner.nextOrd()) {
|
||||
if (accepted.get(ord)) {
|
||||
if (accepted.test(ord)) {
|
||||
// reset the iterator
|
||||
boolean advanced = inner.advanceExact(target);
|
||||
assert advanced;
|
||||
@ -420,4 +392,157 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Strategy for collecting global ordinals.
|
||||
* <p>
|
||||
* The {@link GlobalOrdinalsSignificantTermsAggregator} uses one of these
|
||||
* to collect the global ordinals by calling
|
||||
* {@link CollectionStrategy#collectGlobalOrd(int, long, LeafBucketCollector)}
|
||||
* for each global ordinal that it hits and then calling
|
||||
* {@link CollectionStrategy#forEach(BucketInfoConsumer)} once to iterate on
|
||||
* the results.
|
||||
*/
|
||||
abstract class CollectionStrategy implements Releasable {
|
||||
/**
|
||||
* Short description of the collection mechanism added to the profile
|
||||
* output to help with debugging.
|
||||
*/
|
||||
abstract String describe();
|
||||
/**
|
||||
* Called when the global ordinals are ready.
|
||||
*/
|
||||
abstract void globalOrdsReady(SortedSetDocValues globalOrds);
|
||||
/**
|
||||
* Called once per unique document, global ordinal combination to
|
||||
* collect the bucket.
|
||||
*
|
||||
* @param doc the doc id to collect
|
||||
* @param globalOrd the global ordinal to collect
|
||||
* @param sub the sub-aggregators that will collect the bucket data
|
||||
*/
|
||||
abstract void collectGlobalOrd(int doc, long globalOrd, LeafBucketCollector sub) throws IOException;
|
||||
/**
|
||||
* Convert a global ordinal into a bucket ordinal.
|
||||
*/
|
||||
abstract long globalOrdToBucketOrd(long globalOrd);
|
||||
/**
|
||||
* Iterate all of the buckets. Implementations take into account
|
||||
* the {@link BucketCountThresholds}. In particular,
|
||||
* if the {@link BucketCountThresholds#getMinDocCount()} is 0 then
|
||||
* they'll make sure to iterate a bucket even if it was never
|
||||
* {@link #collectGlobalOrd(int, long, LeafBucketCollector) collected}.
|
||||
* If {@link BucketCountThresholds#getMinDocCount()} is not 0 then
|
||||
* they'll skip all global ords that weren't collected.
|
||||
*/
|
||||
abstract void forEach(BucketInfoConsumer consumer) throws IOException;
|
||||
}
|
||||
interface BucketInfoConsumer {
|
||||
void accept(long globalOrd, long bucketOrd, long docCount) throws IOException;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* {@linkplain CollectionStrategy} that just uses the global ordinal as the
|
||||
* bucket ordinal.
|
||||
*/
|
||||
class DenseGlobalOrds extends CollectionStrategy {
|
||||
@Override
|
||||
String describe() {
|
||||
return "dense";
|
||||
}
|
||||
|
||||
@Override
|
||||
void globalOrdsReady(SortedSetDocValues globalOrds) {
|
||||
grow(globalOrds.getValueCount());
|
||||
}
|
||||
|
||||
@Override
|
||||
void collectGlobalOrd(int doc, long globalOrd, LeafBucketCollector sub) throws IOException {
|
||||
collectExistingBucket(sub, doc, globalOrd);
|
||||
}
|
||||
|
||||
@Override
|
||||
long globalOrdToBucketOrd(long globalOrd) {
|
||||
return globalOrd;
|
||||
}
|
||||
|
||||
@Override
|
||||
void forEach(BucketInfoConsumer consumer) throws IOException {
|
||||
for (long globalOrd = 0; globalOrd < valueCount; globalOrd++) {
|
||||
if (false == acceptedGlobalOrdinals.test(globalOrd)) {
|
||||
continue;
|
||||
}
|
||||
long docCount = bucketDocCount(globalOrd);
|
||||
if (bucketCountThresholds.getMinDocCount() == 0 || docCount > 0) {
|
||||
consumer.accept(globalOrd, globalOrd, docCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@linkplain CollectionStrategy} that uses a {@link LongHash} to map the
|
||||
* global ordinal into bucket ordinals. This uses more memory than
|
||||
* {@link DenseGlobalOrds} when collecting every ordinal, but significantly
|
||||
* less when collecting only a few.
|
||||
*/
|
||||
class RemapGlobalOrds extends CollectionStrategy {
|
||||
private final LongHash bucketOrds = new LongHash(1, context.bigArrays());
|
||||
|
||||
@Override
|
||||
String describe() {
|
||||
return "remap";
|
||||
}
|
||||
|
||||
@Override
|
||||
void globalOrdsReady(SortedSetDocValues globalOrds) {}
|
||||
|
||||
@Override
|
||||
void collectGlobalOrd(int doc, long globalOrd, LeafBucketCollector sub) throws IOException {
|
||||
long bucketOrd = bucketOrds.add(globalOrd);
|
||||
if (bucketOrd < 0) {
|
||||
bucketOrd = -1 - bucketOrd;
|
||||
collectExistingBucket(sub, doc, bucketOrd);
|
||||
} else {
|
||||
collectBucket(sub, doc, bucketOrd);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
long globalOrdToBucketOrd(long globalOrd) {
|
||||
return bucketOrds.find(globalOrd);
|
||||
}
|
||||
|
||||
@Override
|
||||
void forEach(BucketInfoConsumer consumer) throws IOException {
|
||||
if (bucketCountThresholds.getMinDocCount() == 0) {
|
||||
for (long globalOrd = 0; globalOrd < valueCount; globalOrd++) {
|
||||
if (false == acceptedGlobalOrdinals.test(globalOrd)) {
|
||||
continue;
|
||||
}
|
||||
long bucketOrd = bucketOrds.find(globalOrd);
|
||||
long docCount = bucketOrd < 0 ? 0 : bucketDocCount(bucketOrd);
|
||||
consumer.accept(globalOrd, bucketOrd, docCount);
|
||||
}
|
||||
} else {
|
||||
for (long bucketOrd = 0; bucketOrd < bucketOrds.size(); bucketOrd++) {
|
||||
long globalOrd = bucketOrds.get(bucketOrd);
|
||||
if (false == acceptedGlobalOrdinals.test(globalOrd)) {
|
||||
continue;
|
||||
}
|
||||
consumer.accept(globalOrd, bucketOrd, bucketDocCount(bucketOrd));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
bucketOrds.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -201,7 +201,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
|
||||
TermsAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType);
|
||||
assertThat(aggregator, instanceOf(GlobalOrdinalsStringTermsAggregator.class));
|
||||
GlobalOrdinalsStringTermsAggregator globalAgg = (GlobalOrdinalsStringTermsAggregator) aggregator;
|
||||
assertFalse(globalAgg.remapGlobalOrds());
|
||||
assertThat(globalAgg.descriptCollectionStrategy(), equalTo("dense"));
|
||||
|
||||
// Infers depth_first because the maxOrd is 0 which is less than the size
|
||||
aggregationBuilder
|
||||
@ -210,7 +210,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
|
||||
assertThat(aggregator, instanceOf(GlobalOrdinalsStringTermsAggregator.class));
|
||||
globalAgg = (GlobalOrdinalsStringTermsAggregator) aggregator;
|
||||
assertThat(globalAgg.collectMode, equalTo(Aggregator.SubAggCollectionMode.DEPTH_FIRST));
|
||||
assertTrue(globalAgg.remapGlobalOrds());
|
||||
assertThat(globalAgg.descriptCollectionStrategy(), equalTo("remap"));
|
||||
|
||||
aggregationBuilder
|
||||
.collectMode(Aggregator.SubAggCollectionMode.DEPTH_FIRST);
|
||||
@ -218,7 +218,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
|
||||
assertThat(aggregator, instanceOf(GlobalOrdinalsStringTermsAggregator.class));
|
||||
globalAgg = (GlobalOrdinalsStringTermsAggregator) aggregator;
|
||||
assertThat(globalAgg.collectMode, equalTo(Aggregator.SubAggCollectionMode.DEPTH_FIRST));
|
||||
assertTrue(globalAgg.remapGlobalOrds());
|
||||
assertThat(globalAgg.descriptCollectionStrategy(), equalTo("remap"));
|
||||
|
||||
aggregationBuilder
|
||||
.collectMode(Aggregator.SubAggCollectionMode.BREADTH_FIRST);
|
||||
@ -226,14 +226,14 @@ public class TermsAggregatorTests extends AggregatorTestCase {
|
||||
assertThat(aggregator, instanceOf(GlobalOrdinalsStringTermsAggregator.class));
|
||||
globalAgg = (GlobalOrdinalsStringTermsAggregator) aggregator;
|
||||
assertThat(globalAgg.collectMode, equalTo(Aggregator.SubAggCollectionMode.BREADTH_FIRST));
|
||||
assertFalse(globalAgg.remapGlobalOrds());
|
||||
assertThat(globalAgg.descriptCollectionStrategy(), equalTo("dense"));
|
||||
|
||||
aggregationBuilder
|
||||
.order(BucketOrder.aggregation("card", true));
|
||||
aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType);
|
||||
assertThat(aggregator, instanceOf(GlobalOrdinalsStringTermsAggregator.class));
|
||||
globalAgg = (GlobalOrdinalsStringTermsAggregator) aggregator;
|
||||
assertTrue(globalAgg.remapGlobalOrds());
|
||||
assertThat(globalAgg.descriptCollectionStrategy(), equalTo("remap"));
|
||||
|
||||
indexReader.close();
|
||||
directory.close();
|
||||
|
Loading…
x
Reference in New Issue
Block a user