Save memory when string terms are not on top (#57758) (#57876)

This reworks string flavored implementations of the `terms` aggregation
to save memory when it is under another bucket by dropping the usage of
`asMultiBucketAggregator`.
This commit is contained in:
Nik Everett 2020-06-09 10:26:29 -04:00 committed by GitHub
parent b5d3565214
commit e7cc2448d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 886 additions and 265 deletions

View File

@ -45,6 +45,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.notNullValue;
@ESIntegTestCase.SuiteScopeTestCase
@ -59,7 +60,12 @@ public class AggregationProfilerIT extends ESIntegTestCase {
private static final String TOTAL_BUCKETS = "total_buckets";
private static final String WRAPPED = "wrapped_in_multi_bucket_aggregator";
private static final Object DEFERRED = "deferred_aggregators";
private static final String DEFERRED = "deferred_aggregators";
private static final String COLLECTION_STRAT = "collection_strategy";
private static final String RESULT_STRAT = "result_strategy";
private static final String HAS_FILTER = "has_filter";
private static final String SEGMENTS_WITH_SINGLE = "segments_with_single_valued_ords";
private static final String SEGMENTS_WITH_MULTI = "segments_with_multi_valued_ords";
private static final String NUMBER_FIELD = "number";
private static final String TAG_FIELD = "tag";
@ -73,6 +79,7 @@ public class AggregationProfilerIT extends ESIntegTestCase {
@Override
protected void setupSuiteScopeCluster() throws Exception {
assertAcked(client().admin().indices().prepareCreate("idx")
.setSettings(org.elasticsearch.common.collect.Map.of("number_of_shards", 1, "number_of_replicas", 0))
.addMapping("type", STRING_FIELD, "type=keyword", NUMBER_FIELD, "type=integer", TAG_FIELD, "type=keyword").get());
List<IndexRequestBuilder> builders = new ArrayList<>();
@ -90,7 +97,7 @@ public class AggregationProfilerIT extends ESIntegTestCase {
.endObject()));
}
indexRandom(true, builders);
indexRandom(true, false, builders);
createIndex("idx_unmapped");
}
@ -184,7 +191,7 @@ public class AggregationProfilerIT extends ESIntegTestCase {
assertThat(termsBreakdown.get(COLLECT), greaterThan(0L));
assertThat(termsBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(termsBreakdown.get(REDUCE), equalTo(0L));
assertThat(termsAggResult.getDebugInfo(), equalTo(org.elasticsearch.common.collect.Map.of(WRAPPED, true)));
assertRemapTermsDebugInfo(termsAggResult);
assertThat(termsAggResult.getProfiledChildren().size(), equalTo(1));
ProfileResult avgAggResult = termsAggResult.getProfiledChildren().get(0);
@ -204,6 +211,18 @@ public class AggregationProfilerIT extends ESIntegTestCase {
}
}
private void assertRemapTermsDebugInfo(ProfileResult termsAggResult) {
assertThat(termsAggResult.getDebugInfo(), hasEntry(COLLECTION_STRAT, "remap"));
assertThat(termsAggResult.getDebugInfo(), hasEntry(RESULT_STRAT, "terms"));
assertThat(termsAggResult.getDebugInfo(), hasEntry(HAS_FILTER, false));
// TODO we only index single valued docs but the ordinals ends up with multi valued sometimes
assertThat(
termsAggResult.getDebugInfo().toString(),
(int) termsAggResult.getDebugInfo().get(SEGMENTS_WITH_SINGLE) + (int) termsAggResult.getDebugInfo().get(SEGMENTS_WITH_MULTI),
greaterThan(0)
);
}
public void testMultiLevelProfileBreadthFirst() {
SearchResponse response = client().prepareSearch("idx").setProfile(true)
.addAggregation(histogram("histo").field(NUMBER_FIELD).interval(1L).subAggregation(terms("terms")
@ -251,7 +270,7 @@ public class AggregationProfilerIT extends ESIntegTestCase {
assertThat(termsBreakdown.get(COLLECT), greaterThan(0L));
assertThat(termsBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(termsBreakdown.get(REDUCE), equalTo(0L));
assertThat(termsAggResult.getDebugInfo(), equalTo(org.elasticsearch.common.collect.Map.of(WRAPPED, true)));
assertRemapTermsDebugInfo(termsAggResult);
assertThat(termsAggResult.getProfiledChildren().size(), equalTo(1));
ProfileResult avgAggResult = termsAggResult.getProfiledChildren().get(0);
@ -378,7 +397,7 @@ public class AggregationProfilerIT extends ESIntegTestCase {
assertThat(tagsBreakdown.get(COLLECT), greaterThan(0L));
assertThat(tagsBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(tagsBreakdown.get(REDUCE), equalTo(0L));
assertThat(tagsAggResult.getDebugInfo(), equalTo(org.elasticsearch.common.collect.Map.of(WRAPPED, true)));
assertRemapTermsDebugInfo(tagsAggResult);
assertThat(tagsAggResult.getProfiledChildren().size(), equalTo(2));
Map<String, ProfileResult> tagsAggResultSubAggregations = tagsAggResult.getProfiledChildren().stream()
@ -423,7 +442,7 @@ public class AggregationProfilerIT extends ESIntegTestCase {
assertThat(stringsBreakdown.get(COLLECT), greaterThan(0L));
assertThat(stringsBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(stringsBreakdown.get(REDUCE), equalTo(0L));
assertThat(stringsAggResult.getDebugInfo(), equalTo(org.elasticsearch.common.collect.Map.of(WRAPPED, true)));
assertRemapTermsDebugInfo(stringsAggResult);
assertThat(stringsAggResult.getProfiledChildren().size(), equalTo(3));
Map<String, ProfileResult> stringsAggResultSubAggregations = stringsAggResult.getProfiledChildren().stream()
@ -469,7 +488,7 @@ public class AggregationProfilerIT extends ESIntegTestCase {
assertThat(tagsBreakdown.get(COLLECT), greaterThan(0L));
assertThat(tagsBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(tagsBreakdown.get(REDUCE), equalTo(0L));
assertThat(tagsAggResult.getDebugInfo(), equalTo(org.elasticsearch.common.collect.Map.of(WRAPPED, true)));
assertRemapTermsDebugInfo(tagsAggResult);
assertThat(tagsAggResult.getProfiledChildren().size(), equalTo(2));
tagsAggResultSubAggregations = tagsAggResult.getProfiledChildren().stream()

View File

@ -180,7 +180,7 @@ public abstract class Aggregator extends BucketCollector implements Releasable {
public abstract InternalAggregation buildEmptyAggregation();
/**
* Collect debug information to add to the profiling results.. This will
* Collect debug information to add to the profiling results. This will
* only be called if the aggregation is being profiled.
* <p>
* Well behaved implementations will always call the superclass

View File

@ -0,0 +1,220 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.terms;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BytesRefHash;
/**
* Maps {@link BytesRef} bucket keys to bucket ordinals.
*/
public abstract class BytesKeyedBucketOrds implements Releasable {
/**
* Build a {@link LongKeyedBucketOrds}.
*/
public static BytesKeyedBucketOrds build(BigArrays bigArrays, boolean collectsFromSingleBucket) {
return collectsFromSingleBucket ? new FromSingle(bigArrays) : new FromMany(bigArrays);
}
private BytesKeyedBucketOrds() {}
/**
* Add the {@code owningBucketOrd, value} pair. Return the ord for
* their bucket if they have yet to be added, or {@code -1-ord}
* if they were already present.
*/
public abstract long add(long owningBucketOrd, BytesRef value);
/**
* Count the buckets in {@code owningBucketOrd}.
*/
public abstract long bucketsInOrd(long owningBucketOrd);
/**
* The number of collected buckets.
*/
public abstract long size();
/**
* Build an iterator for buckets inside {@code owningBucketOrd} in order
* of increasing ord.
* <p>
* When this is first returns it is "unpositioned" and you must call
* {@link BucketOrdsEnum#next()} to move it to the first value.
*/
public abstract BucketOrdsEnum ordsEnum(long owningBucketOrd);
/**
* An iterator for buckets inside a particular {@code owningBucketOrd}.
*/
public interface BucketOrdsEnum {
/**
* Advance to the next value.
* @return {@code true} if there *is* a next value,
* {@code false} if there isn't
*/
boolean next();
/**
* The ordinal of the current value.
*/
long ord();
/**
* Read the current value.
*/
void readValue(BytesRef dest);
/**
* An {@linkplain BucketOrdsEnum} that is empty.
*/
BucketOrdsEnum EMPTY = new BucketOrdsEnum() {
@Override
public boolean next() {
return false;
}
@Override
public long ord() {
return 0;
}
@Override
public void readValue(BytesRef dest) {}
};
}
/**
* Implementation that only works if it is collecting from a single bucket.
*/
private static class FromSingle extends BytesKeyedBucketOrds {
private final BytesRefHash ords;
private FromSingle(BigArrays bigArrays) {
ords = new BytesRefHash(1, bigArrays);
}
@Override
public long add(long owningBucketOrd, BytesRef value) {
assert owningBucketOrd == 0;
return ords.add(value);
}
@Override
public long bucketsInOrd(long owningBucketOrd) {
return ords.size();
}
@Override
public long size() {
return ords.size();
}
@Override
public BucketOrdsEnum ordsEnum(long owningBucketOrd) {
return new BucketOrdsEnum() {
private int ord = -1;
@Override
public boolean next() {
ord++;
return ord < ords.size();
}
@Override
public long ord() {
return ord;
}
@Override
public void readValue(BytesRef dest) {
ords.get(ord, dest);
}
};
}
@Override
public void close() {
ords.close();
}
}
/**
* Implementation that works properly when collecting from many buckets.
*/
private static class FromMany extends BytesKeyedBucketOrds {
// TODO we can almost certainly do better here by building something fit for purpose rather than trying to lego together stuff
private final BytesRefHash bytesToLong;
private final LongKeyedBucketOrds longToBucketOrds;
private FromMany(BigArrays bigArrays) {
bytesToLong = new BytesRefHash(1, bigArrays);
longToBucketOrds = LongKeyedBucketOrds.build(bigArrays, false);
}
@Override
public long add(long owningBucketOrd, BytesRef value) {
long l = bytesToLong.add(value);
if (l < 0) {
l = -1 - l;
}
return longToBucketOrds.add(owningBucketOrd, l);
}
@Override
public long bucketsInOrd(long owningBucketOrd) {
return longToBucketOrds.bucketsInOrd(owningBucketOrd);
}
@Override
public long size() {
return longToBucketOrds.size();
}
@Override
public BucketOrdsEnum ordsEnum(long owningBucketOrd) {
LongKeyedBucketOrds.BucketOrdsEnum delegate = longToBucketOrds.ordsEnum(owningBucketOrd);
return new BucketOrdsEnum() {
@Override
public boolean next() {
return delegate.next();
}
@Override
public long ord() {
return delegate.ord();
}
@Override
public void readValue(BytesRef dest) {
bytesToLong.get(delegate.value(), dest);
}
};
}
@Override
public void close() {
Releasables.close(bytesToLong, longToBucketOrds);
}
}
}

View File

@ -31,9 +31,11 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.IntArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BucketOrder;
@ -93,6 +95,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
boolean remapGlobalOrds,
SubAggCollectionMode collectionMode,
boolean showTermDocCountError,
boolean collectsFromSingleBucket,
Map<String, Object> metadata
) throws IOException {
super(name, factories, context, parent, order, format, bucketCountThresholds, collectionMode, showTermDocCountError, metadata);
@ -104,7 +107,14 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
this.valueCount = values.getValueCount();
this.lookupGlobalOrd = values::lookupOrd;
this.acceptedGlobalOrdinals = includeExclude == null ? ALWAYS_TRUE : includeExclude.acceptedGlobalOrdinals(values)::get;
this.collectionStrategy = remapGlobalOrds ? new RemapGlobalOrds() : new DenseGlobalOrds();
if (remapGlobalOrds) {
this.collectionStrategy = new RemapGlobalOrds(collectsFromSingleBucket);
} else {
if (false == collectsFromSingleBucket) {
throw new AggregationExecutionException("Dense ords don't know how to collect from many buckets");
}
this.collectionStrategy = new DenseGlobalOrds();
}
}
String descriptCollectionStrategy() {
@ -126,19 +136,17 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, globalOrds) {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
assert owningBucketOrd == 0;
if (false == singleValues.advanceExact(doc)) {
return;
}
int globalOrd = singleValues.ordValue();
collectionStrategy.collectGlobalOrd(doc, globalOrd, sub);
collectionStrategy.collectGlobalOrd(owningBucketOrd, doc, globalOrd, sub);
}
});
}
return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, globalOrds) {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
assert owningBucketOrd == 0;
if (false == singleValues.advanceExact(doc)) {
return;
}
@ -146,7 +154,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
if (false == acceptedGlobalOrdinals.test(globalOrd)) {
return;
}
collectionStrategy.collectGlobalOrd(doc, globalOrd, sub);
collectionStrategy.collectGlobalOrd(owningBucketOrd, doc, globalOrd, sub);
}
});
}
@ -159,12 +167,11 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, globalOrds) {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
assert owningBucketOrd == 0;
if (false == globalOrds.advanceExact(doc)) {
return;
}
for (long globalOrd = globalOrds.nextOrd(); globalOrd != NO_MORE_ORDS; globalOrd = globalOrds.nextOrd()) {
collectionStrategy.collectGlobalOrd(doc, globalOrd, sub);
collectionStrategy.collectGlobalOrd(owningBucketOrd, doc, globalOrd, sub);
}
}
});
@ -172,7 +179,6 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, globalOrds) {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
assert owningBucketOrd == 0;
if (false == globalOrds.advanceExact(doc)) {
return;
}
@ -180,7 +186,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
if (false == acceptedGlobalOrdinals.test(globalOrd)) {
continue;
}
collectionStrategy.collectGlobalOrd(doc, globalOrd, sub);
collectionStrategy.collectGlobalOrd(owningBucketOrd, doc, globalOrd, sub);
}
}
});
@ -200,6 +206,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
public void collectDebugInfo(BiConsumer<String, Object> add) {
super.collectDebugInfo(add);
add.accept("collection_strategy", collectionStrategy.describe());
collectionStrategy.collectDebugInfo(add);
add.accept("result_strategy", resultStrategy.describe());
add.accept("segments_with_single_valued_ords", segmentsWithSingleValuedOrds);
add.accept("segments_with_multi_valued_ords", segmentsWithMultiValuedOrds);
@ -276,13 +283,13 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
BucketCountThresholds bucketCountThresholds,
SearchContext context,
Aggregator parent,
boolean forceDenseMode,
boolean remapGlobalOrds,
SubAggCollectionMode collectionMode,
boolean showTermDocCountError,
Map<String, Object> metadata
) throws IOException {
super(name, factories, a -> a.new StandardTermsResults(), valuesSource, order, format, bucketCountThresholds, null,
context, parent, forceDenseMode, collectionMode, showTermDocCountError, metadata);
context, parent, remapGlobalOrds, collectionMode, showTermDocCountError, true, metadata);
assert factories == null || factories.countAggregators() == 0;
this.segmentDocCounts = context.bigArrays().newIntArray(1, true);
}
@ -350,7 +357,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
}
long ord = i - 1; // remember we do +1 when counting
long globalOrd = mapping.applyAsLong(ord);
incrementBucketDocCount(collectionStrategy.globalOrdToBucketOrd(globalOrd), inc);
incrementBucketDocCount(collectionStrategy.globalOrdToBucketOrd(0, globalOrd), inc);
}
}
}
@ -360,10 +367,9 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
* <p>
* The {@link GlobalOrdinalsStringTermsAggregator} uses one of these
* to collect the global ordinals by calling
* {@link CollectionStrategy#collectGlobalOrd(int, long, LeafBucketCollector)}
* for each global ordinal that it hits and then calling
* {@link CollectionStrategy#forEach(BucketInfoConsumer)} once to iterate on
* the results.
* {@link CollectionStrategy#collectGlobalOrd} for each global ordinal
* that it hits and then calling {@link CollectionStrategy#forEach}
* once to iterate on the results.
*/
abstract class CollectionStrategy implements Releasable {
/**
@ -371,6 +377,11 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
* output to help with debugging.
*/
abstract String describe();
/**
* Collect debug information to add to the profiling results. This will
* only be called if the aggregation is being profiled.
*/
abstract void collectDebugInfo(BiConsumer<String, Object> add);
/**
* Called when the global ordinals are ready.
*/
@ -379,31 +390,31 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
* Called once per unique document, global ordinal combination to
* collect the bucket.
*
* @param owningBucketOrd the ordinal of the bucket that owns this collection
* @param doc the doc id in to collect
* @param globalOrd the global ordinal to collect
* @param sub the sub-aggregators that that will collect the bucket data
*/
abstract void collectGlobalOrd(int doc, long globalOrd, LeafBucketCollector sub) throws IOException;
abstract void collectGlobalOrd(long owningBucketOrd, int doc, long globalOrd, LeafBucketCollector sub) throws IOException;
/**
* Convert a global ordinal into a bucket ordinal.
*/
abstract long globalOrdToBucketOrd(long globalOrd);
abstract long globalOrdToBucketOrd(long owningBucketOrd, long globalOrd);
/**
* Iterate all of the buckets. Implementations take into account
* the {@link BucketCountThresholds}. In particular,
* if the {@link BucketCountThresholds#getMinDocCount()} is 0 then
* they'll make sure to iterate a bucket even if it was never
* {{@link #collectGlobalOrd(int, long, LeafBucketCollector) collected}.
* {{@link #collectGlobalOrd collected}.
* If {@link BucketCountThresholds#getMinDocCount()} is not 0 then
* they'll skip all global ords that weren't collected.
*/
abstract void forEach(BucketInfoConsumer consumer) throws IOException;
abstract void forEach(long owningBucketOrd, BucketInfoConsumer consumer) throws IOException;
}
interface BucketInfoConsumer {
void accept(long globalOrd, long bucketOrd, long docCount) throws IOException;
}
/**
* {@linkplain CollectionStrategy} that just uses the global ordinal as the
* bucket ordinal.
@ -414,23 +425,29 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
return "dense";
}
@Override
void collectDebugInfo(BiConsumer<String, Object> add) {}
@Override
void globalOrdsReady(SortedSetDocValues globalOrds) {
grow(globalOrds.getValueCount());
}
@Override
void collectGlobalOrd(int doc, long globalOrd, LeafBucketCollector sub) throws IOException {
void collectGlobalOrd(long owningBucketOrd, int doc, long globalOrd, LeafBucketCollector sub) throws IOException {
assert owningBucketOrd == 0;
collectExistingBucket(sub, doc, globalOrd);
}
@Override
long globalOrdToBucketOrd(long globalOrd) {
long globalOrdToBucketOrd(long owningBucketOrd, long globalOrd) {
assert owningBucketOrd == 0;
return globalOrd;
}
@Override
void forEach(BucketInfoConsumer consumer) throws IOException {
void forEach(long owningBucketOrd, BucketInfoConsumer consumer) throws IOException {
assert owningBucketOrd == 0;
for (long globalOrd = 0; globalOrd < valueCount; globalOrd++) {
if (false == acceptedGlobalOrdinals.test(globalOrd)) {
continue;
@ -452,20 +469,29 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
* {@link DenseGlobalOrds} when collecting every ordinal, but significantly
* less when collecting only a few.
*/
class RemapGlobalOrds extends CollectionStrategy {
private final LongHash bucketOrds = new LongHash(1, context.bigArrays());
private class RemapGlobalOrds extends CollectionStrategy {
private final LongKeyedBucketOrds bucketOrds;
private RemapGlobalOrds(boolean collectsFromSingleBucket) {
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket);
}
@Override
String describe() {
return "remap";
}
@Override
void collectDebugInfo(BiConsumer<String, Object> add) {
add.accept("total_buckets", bucketOrds.size());
}
@Override
void globalOrdsReady(SortedSetDocValues globalOrds) {}
@Override
void collectGlobalOrd(int doc, long globalOrd, LeafBucketCollector sub) throws IOException {
long bucketOrd = bucketOrds.add(globalOrd);
void collectGlobalOrd(long owningBucketOrd, int doc, long globalOrd, LeafBucketCollector sub) throws IOException {
long bucketOrd = bucketOrds.add(owningBucketOrd, globalOrd);
if (bucketOrd < 0) {
bucketOrd = -1 - bucketOrd;
collectExistingBucket(sub, doc, bucketOrd);
@ -475,33 +501,32 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
}
@Override
long globalOrdToBucketOrd(long globalOrd) {
return bucketOrds.find(globalOrd);
long globalOrdToBucketOrd(long owningBucketOrd, long globalOrd) {
return bucketOrds.find(owningBucketOrd, globalOrd);
}
@Override
void forEach(BucketInfoConsumer consumer) throws IOException {
void forEach(long owningBucketOrd, BucketInfoConsumer consumer) throws IOException {
if (bucketCountThresholds.getMinDocCount() == 0) {
for (long globalOrd = 0; globalOrd < valueCount; globalOrd++) {
if (false == acceptedGlobalOrdinals.test(globalOrd)) {
continue;
}
long bucketOrd = bucketOrds.find(globalOrd);
long bucketOrd = bucketOrds.find(owningBucketOrd, globalOrd);
long docCount = bucketOrd < 0 ? 0 : bucketDocCount(bucketOrd);
consumer.accept(globalOrd, bucketOrd, docCount);
}
} else {
for (long bucketOrd = 0; bucketOrd < bucketOrds.size(); bucketOrd++) {
long globalOrd = bucketOrds.get(bucketOrd);
if (false == acceptedGlobalOrdinals.test(globalOrd)) {
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd);
while (ordsEnum.next()) {
if (false == acceptedGlobalOrdinals.test(ordsEnum.value())) {
continue;
}
consumer.accept(globalOrd, bucketOrd, bucketDocCount(bucketOrd));
consumer.accept(ordsEnum.value(), ordsEnum.ord(), bucketDocCount(ordsEnum.ord()));
}
}
}
@Override
public void close() {
bucketOrds.close();
@ -517,47 +542,58 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
TB extends InternalMultiBucketAggregation.InternalBucket> implements Releasable {
private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
if (valueCount == 0) { // no context in this reader
return new InternalAggregation[] {buildEmptyAggregation()};
}
final int size;
if (bucketCountThresholds.getMinDocCount() == 0) {
// if minDocCount == 0 then we can end up with more buckets then maxBucketOrd() returns
size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize());
} else {
size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize());
}
long[] otherDocCount = new long[1];
PriorityQueue<TB> ordered = buildPriorityQueue(size);
collectionStrategy.forEach(new BucketInfoConsumer() {
TB spare = null;
@Override
public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException {
otherDocCount[0] += docCount;
if (docCount >= bucketCountThresholds.getShardMinDocCount()) {
if (spare == null) {
spare = buildEmptyTemporaryBucket();
}
updateBucket(spare, globalOrd, bucketOrd, docCount);
spare = ordered.insertWithOverflow(spare);
}
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
results[ordIdx] = buildNoValuesResult(owningBucketOrds[ordIdx]);
}
});
// Get the top buckets
B[] topBuckets = buildBuckets(ordered.size());
for (int i = ordered.size() - 1; i >= 0; --i) {
topBuckets[i] = convertTempBucketToRealBucket(ordered.pop());
otherDocCount[0] -= topBuckets[i].getDocCount();
return results;
}
buildSubAggs(topBuckets);
return new InternalAggregation[] {
buildResult(topBuckets, otherDocCount[0])
};
B[][] topBucketsPreOrd = buildTopBucketsPerOrd(owningBucketOrds.length);
long[] otherDocCount = new long[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
final int size;
if (bucketCountThresholds.getMinDocCount() == 0) {
// if minDocCount == 0 then we can end up with more buckets then maxBucketOrd() returns
size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize());
} else {
size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize());
}
PriorityQueue<TB> ordered = buildPriorityQueue(size);
final int finalOrdIdx = ordIdx;
BucketUpdater<TB> updater = bucketUpdater(owningBucketOrds[ordIdx]);
collectionStrategy.forEach(owningBucketOrds[ordIdx], new BucketInfoConsumer() {
TB spare = null;
@Override
public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException {
otherDocCount[finalOrdIdx] += docCount;
if (docCount >= bucketCountThresholds.getShardMinDocCount()) {
if (spare == null) {
spare = buildEmptyTemporaryBucket();
}
updater.updateBucket(spare, globalOrd, bucketOrd, docCount);
spare = ordered.insertWithOverflow(spare);
}
}
});
// Get the top buckets
topBucketsPreOrd[ordIdx] = buildBuckets(ordered.size());
for (int i = ordered.size() - 1; i >= 0; --i) {
topBucketsPreOrd[ordIdx][i] = convertTempBucketToRealBucket(ordered.pop());
otherDocCount[ordIdx] -= topBucketsPreOrd[ordIdx][i].getDocCount();
}
}
buildSubAggs(topBucketsPreOrd);
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
results[ordIdx] = buildResult(owningBucketOrds[ordIdx], otherDocCount[ordIdx], topBucketsPreOrd[ordIdx]);
}
return results;
}
/**
@ -581,7 +617,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
* Update fields in {@code spare} to reflect information collected for
* this bucket ordinal.
*/
abstract void updateBucket(TB spare, long globalOrd, long bucketOrd, long docCount) throws IOException;
abstract BucketUpdater<TB> bucketUpdater(long owningBucketOrd) throws IOException;
/**
* Build a {@link PriorityQueue} to sort the buckets. After we've
@ -589,6 +625,11 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
*/
abstract PriorityQueue<TB> buildPriorityQueue(int size);
/**
* Build an array to hold the "top" buckets for each ordinal.
*/
abstract B[][] buildTopBucketsPerOrd(int size);
/**
* Build an array of buckets for a particular ordinal to collect the
* results. The populated list is passed to {@link #buildResult}.
@ -604,18 +645,27 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
* Build the sub-aggregations into the buckets. This will usually
* delegate to {@link #buildSubAggsForAllBuckets}.
*/
abstract void buildSubAggs(B[] topBuckets) throws IOException;
abstract void buildSubAggs(B[][] topBucketsPreOrd) throws IOException;
/**
* Turn the buckets into an aggregation result.
*/
abstract R buildResult(B[] topBuckets, long otherDocCount);
abstract R buildResult(long owningBucketOrd, long otherDocCount, B[] topBuckets);
/**
* Build an "empty" result. Only called if there isn't any data on this
* shard.
*/
abstract R buildEmptyResult();
/**
* Build an "empty" result for a particular bucket ordinal. Called when
* there aren't any values for the field on this shard.
*/
abstract R buildNoValuesResult(long owningBucketOrdinal);
}
interface BucketUpdater<TB extends InternalMultiBucketAggregation.InternalBucket> {
void updateBucket(TB spare, long globalOrd, long bucketOrd, long docCount) throws IOException;
}
/**
@ -632,6 +682,11 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
return primary;
}
@Override
StringTerms.Bucket[][] buildTopBucketsPerOrd(int size) {
return new StringTerms.Bucket[size][];
}
@Override
StringTerms.Bucket[] buildBuckets(int size) {
return new StringTerms.Bucket[size];
@ -643,10 +698,12 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
}
@Override
void updateBucket(OrdBucket spare, long globalOrd, long bucketOrd, long docCount) throws IOException {
spare.globalOrd = globalOrd;
spare.bucketOrd = bucketOrd;
spare.docCount = docCount;
BucketUpdater<OrdBucket> bucketUpdater(long owningBucketOrd) throws IOException {
return (spare, globalOrd, bucketOrd, docCount) -> {
spare.globalOrd = globalOrd;
spare.bucketOrd = bucketOrd;
spare.docCount = docCount;
};
}
@Override
@ -663,12 +720,12 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
}
@Override
void buildSubAggs(StringTerms.Bucket[] topBuckets) throws IOException {
buildSubAggsForBuckets(topBuckets, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
void buildSubAggs(StringTerms.Bucket[][] topBucketsPreOrd) throws IOException {
buildSubAggsForAllBuckets(topBucketsPreOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
}
@Override
StringTerms buildResult(StringTerms.Bucket[] topBuckets, long otherDocCount) {
StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bucket[] topBuckets) {
return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError,
otherDocCount, Arrays.asList(topBuckets), 0);
@ -679,6 +736,11 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
return buildEmptyTermsAggregation();
}
@Override
StringTerms buildNoValuesResult(long owningBucketOrdinal) {
return buildEmptyResult();
}
@Override
public void close() {}
}
@ -695,7 +757,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
private final SignificantTermsAggregatorFactory termsAggFactory;
private final SignificanceHeuristic significanceHeuristic;
private long subsetSize = 0;
private LongArray subsetSizes = context.bigArrays().newLongArray(1, true);
SignificantTermsResults(SignificantTermsAggregatorFactory termsAggFactory, SignificanceHeuristic significanceHeuristic) {
this.termsAggFactory = termsAggFactory;
@ -713,11 +775,17 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
super.collect(doc, owningBucketOrd);
subsetSize++;
subsetSizes = context.bigArrays().grow(subsetSizes, owningBucketOrd + 1);
subsetSizes.increment(owningBucketOrd, 1);
}
};
}
@Override
SignificantStringTerms.Bucket[][] buildTopBucketsPerOrd(int size) {
return new SignificantStringTerms.Bucket[size][];
}
@Override
SignificantStringTerms.Bucket[] buildBuckets(int size) {
return new SignificantStringTerms.Bucket[size];
@ -729,19 +797,22 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
}
@Override
void updateBucket(SignificantStringTerms.Bucket spare, long globalOrd, long bucketOrd, long docCount) throws IOException {
spare.bucketOrd = bucketOrd;
oversizedCopy(lookupGlobalOrd.apply(globalOrd), spare.termBytes);
spare.subsetDf = docCount;
spare.subsetSize = subsetSize;
spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes);
spare.supersetSize = termsAggFactory.getSupersetNumDocs();
/*
* During shard-local down-selection we use subset/superset stats
* that are for this shard only. Back at the central reducer these
* properties will be updated with global stats.
*/
spare.updateScore(significanceHeuristic);
BucketUpdater<SignificantStringTerms.Bucket> bucketUpdater(long owningBucketOrd) throws IOException {
long subsetSize = subsetSizes.get(owningBucketOrd);
return (spare, globalOrd, bucketOrd, docCount) -> {
spare.bucketOrd = bucketOrd;
oversizedCopy(lookupGlobalOrd.apply(globalOrd), spare.termBytes);
spare.subsetDf = docCount;
spare.subsetSize = subsetSize;
spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes);
spare.supersetSize = termsAggFactory.getSupersetNumDocs();
/*
* During shard-local down-selection we use subset/superset stats
* that are for this shard only. Back at the central reducer these
* properties will be updated with global stats.
*/
spare.updateScore(significanceHeuristic);
};
}
@Override
@ -755,24 +826,38 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
}
@Override
void buildSubAggs(SignificantStringTerms.Bucket[] topBuckets) throws IOException {
buildSubAggsForBuckets(topBuckets, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
void buildSubAggs(SignificantStringTerms.Bucket[][] topBucketsPreOrd) throws IOException {
buildSubAggsForAllBuckets(topBucketsPreOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
}
@Override
SignificantStringTerms buildResult(SignificantStringTerms.Bucket[] topBuckets, long otherDocCount) {
return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
metadata(), format, subsetSize, termsAggFactory.getSupersetNumDocs(), significanceHeuristic, Arrays.asList(topBuckets));
SignificantStringTerms buildResult(long owningBucketOrd, long otherDocCount, SignificantStringTerms.Bucket[] topBuckets) {
return new SignificantStringTerms(
name,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(),
format,
subsetSizes.get(owningBucketOrd),
termsAggFactory.getSupersetNumDocs(),
significanceHeuristic,
Arrays.asList(topBuckets)
);
}
@Override
SignificantStringTerms buildEmptyResult() {
return buildEmptySignificantTermsAggregation(subsetSize, significanceHeuristic);
return buildEmptySignificantTermsAggregation(0, significanceHeuristic);
}
@Override
SignificantStringTerms buildNoValuesResult(long owningBucketOrdinal) {
return buildEmptySignificantTermsAggregation(subsetSizes.get(owningBucketOrdinal), significanceHeuristic);
}
@Override
public void close() {
termsAggFactory.close();
Releasables.close(termsAggFactory, subsetSizes);
}
/**

View File

@ -40,12 +40,18 @@ public abstract class LongKeyedBucketOrds implements Releasable {
private LongKeyedBucketOrds() {}
/**
* Add the {@code owningBucketOrd, term} pair. Return the ord for
* Add the {@code owningBucketOrd, value} pair. Return the ord for
* their bucket if they have yet to be added, or {@code -1-ord}
* if they were already present.
*/
public abstract long add(long owningBucketOrd, long value);
/**
* Find the {@code owningBucketOrd, value} pair. Return the ord for
* their bucket if they have been added or {@code -1} if they haven't.
*/
public abstract long find(long owningBucketOrd, long value);
/**
* Count the buckets in {@code owningBucketOrd}.
*/
@ -96,7 +102,6 @@ public abstract class LongKeyedBucketOrds implements Releasable {
};
}
/**
* Implementation that only works if it is collecting from a single bucket.
*/
@ -113,6 +118,12 @@ public abstract class LongKeyedBucketOrds implements Releasable {
return ords.add(value);
}
@Override
public long find(long owningBucketOrd, long value) {
assert owningBucketOrd == 0;
return ords.find(value);
}
@Override
public long bucketsInOrd(long owningBucketOrd) {
assert owningBucketOrd == 0;
@ -217,6 +228,22 @@ public abstract class LongKeyedBucketOrds implements Releasable {
return buckets;
}
@Override
public long find(long owningBucketOrd, long value) {
if (owningBucketOrd >= owningOrdToBuckets.size()) {
return -1;
}
Buckets buckets = owningOrdToBuckets.get(owningBucketOrd);
if (buckets == null) {
return -1;
}
long thisBucketOrd = buckets.valueToThisBucketOrd.find(value);
if (thisBucketOrd < 0) {
return -1;
}
return buckets.thisBucketOrdToGlobalOrd.get(thisBucketOrd);
}
@Override
public long bucketsInOrd(long owningBucketOrd) {
if (owningBucketOrd >= owningOrdToBuckets.size()) {

View File

@ -25,7 +25,7 @@ import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BytesRefHash;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
@ -45,6 +45,7 @@ import java.util.Arrays;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* An aggregator of string values that hashes the strings on the fly rather
@ -53,7 +54,7 @@ import java.util.function.Function;
public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
private final ResultStrategy<?, ?> resultStrategy;
private final ValuesSource valuesSource;
private final BytesRefHash bucketOrds;
private final BytesKeyedBucketOrds bucketOrds;
private final IncludeExclude.StringFilter includeExclude;
public MapStringTermsAggregator(
@ -69,13 +70,14 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
Aggregator parent,
SubAggCollectionMode collectionMode,
boolean showTermDocCountError,
boolean collectsFromSingleBucket,
Map<String, Object> metadata
) throws IOException {
super(name, factories, context, parent, order, format, bucketCountThresholds, collectionMode, showTermDocCountError, metadata);
this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job.
this.valuesSource = valuesSource;
this.includeExclude = includeExclude;
bucketOrds = new BytesRefHash(1, context.bigArrays());
bucketOrds = BytesKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket);
}
@Override
@ -94,31 +96,31 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
final BytesRefBuilder previous = new BytesRefBuilder();
@Override
public void collect(int doc, long bucket) throws IOException {
assert bucket == 0;
if (values.advanceExact(doc)) {
final int valuesCount = values.docValueCount();
public void collect(int doc, long owningBucketOrd) throws IOException {
if (false == values.advanceExact(doc)) {
return;
}
int valuesCount = values.docValueCount();
// SortedBinaryDocValues don't guarantee uniqueness so we
// need to take care of dups
previous.clear();
for (int i = 0; i < valuesCount; ++i) {
final BytesRef bytes = values.nextValue();
if (includeExclude != null && false == includeExclude.accept(bytes)) {
continue;
}
if (i > 0 && previous.get().equals(bytes)) {
continue;
}
long bucketOrdinal = bucketOrds.add(bytes);
if (bucketOrdinal < 0) { // already seen
bucketOrdinal = -1 - bucketOrdinal;
collectExistingBucket(sub, doc, bucketOrdinal);
} else {
collectBucket(sub, doc, bucketOrdinal);
}
previous.copyBytes(bytes);
// SortedBinaryDocValues don't guarantee uniqueness so we
// need to take care of dups
previous.clear();
for (int i = 0; i < valuesCount; ++i) {
final BytesRef bytes = values.nextValue();
if (includeExclude != null && false == includeExclude.accept(bytes)) {
continue;
}
if (i > 0 && previous.get().equals(bytes)) {
continue;
}
long bucketOrdinal = bucketOrds.add(owningBucketOrd, bytes);
if (bucketOrdinal < 0) { // already seen
bucketOrdinal = -1 - bucketOrdinal;
collectExistingBucket(sub, doc, bucketOrdinal);
} else {
collectBucket(sub, doc, bucketOrdinal);
}
previous.copyBytes(bytes);
}
}
});
@ -131,12 +133,13 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
@Override
public InternalAggregation buildEmptyAggregation() {
return buildEmptyTermsAggregation();
return resultStrategy.buildEmptyResult();
}
@Override
public void collectDebugInfo(BiConsumer<String, Object> add) {
super.collectDebugInfo(add);
add.accept("total_buckets", bucketOrds.size());
add.accept("result_strategy", resultStrategy.describe());
}
@ -153,39 +156,43 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
Releasable {
private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
B[][] topBucketsPerOrd = buildTopBucketsPerOrd(owningBucketOrds.length);
long[] otherDocCounts = new long[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
collectZeroDocEntriesIfNeeded(owningBucketOrds[ordIdx]);
int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
collectZeroDocEntriesIfNeeded();
int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
long otherDocCount = 0;
PriorityQueue<B> ordered = buildPriorityQueue(size);
B spare = null;
for (int bucketOrd = 0; bucketOrd < bucketOrds.size(); bucketOrd++) {
long docCount = bucketDocCount(bucketOrd);
otherDocCount += docCount;
if (docCount < bucketCountThresholds.getShardMinDocCount()) {
continue;
PriorityQueue<B> ordered = buildPriorityQueue(size);
B spare = null;
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
Supplier<B> emptyBucketBuilder = emptyBucketBuilder(owningBucketOrds[ordIdx]);
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts[ordIdx] += docCount;
if (docCount < bucketCountThresholds.getShardMinDocCount()) {
continue;
}
if (spare == null) {
spare = emptyBucketBuilder.get();
}
updateBucket(spare, ordsEnum, docCount);
spare = ordered.insertWithOverflow(spare);
}
if (spare == null) {
spare = buildEmptyBucket();
topBucketsPerOrd[ordIdx] = buildBuckets(ordered.size());
for (int i = ordered.size() - 1; i >= 0; --i) {
topBucketsPerOrd[ordIdx][i] = ordered.pop();
otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][i].getDocCount();
finalizeBucket(topBucketsPerOrd[ordIdx][i]);
}
updateBucket(spare, bucketOrd, docCount);
spare = ordered.insertWithOverflow(spare);
}
B[] topBuckets = buildBuckets(ordered.size());
for (int i = ordered.size() - 1; i >= 0; --i) {
topBuckets[i] = ordered.pop();
otherDocCount -= topBuckets[i].getDocCount();
finalizeBucket(topBuckets[i]);
buildSubAggs(topBucketsPerOrd);
InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
result[ordIdx] = buildResult(owningBucketOrds[ordIdx], otherDocCounts[ordIdx], topBucketsPerOrd[ordIdx]);
}
buildSubAggs(topBuckets);
return new InternalAggregation[] {
buildResult(topBuckets, otherDocCount)
};
return result;
}
/**
@ -204,12 +211,12 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
* Collect extra entries for "zero" hit documents if they were requested
* and required.
*/
abstract void collectZeroDocEntriesIfNeeded() throws IOException;
abstract void collectZeroDocEntriesIfNeeded(long owningBucketOrd) throws IOException;
/**
* Build an empty temporary bucket.
*/
abstract B buildEmptyBucket();
abstract Supplier<B> emptyBucketBuilder(long owningBucketOrd);
/**
* Build a {@link PriorityQueue} to sort the buckets. After we've
@ -221,7 +228,12 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
* Update fields in {@code spare} to reflect information collected for
* this bucket ordinal.
*/
abstract void updateBucket(B spare, long bucketOrd, long docCount) throws IOException;
abstract void updateBucket(B spare, BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum, long docCount) throws IOException;
/**
* Build an array to hold the "top" buckets for each ordinal.
*/
abstract B[][] buildTopBucketsPerOrd(int size);
/**
* Build an array of buckets for a particular ordinal to collect the
@ -239,12 +251,12 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
* Build the sub-aggregations into the buckets. This will usually
* delegate to {@link #buildSubAggsForAllBuckets}.
*/
abstract void buildSubAggs(B[] topBuckets) throws IOException;
abstract void buildSubAggs(B[][] topBucketsPerOrd) throws IOException;
/**
* Turn the buckets into an aggregation result.
*/
abstract R buildResult(B[] topBuckets, long otherDocCount);
abstract R buildResult(long owningBucketOrd, long otherDocCount, B[] topBuckets);
/**
* Build an "empty" result. Only called if there isn't any data on this
@ -268,11 +280,11 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
}
@Override
void collectZeroDocEntriesIfNeeded() throws IOException {
void collectZeroDocEntriesIfNeeded(long owningBucketOrd) throws IOException {
if (bucketCountThresholds.getMinDocCount() != 0) {
return;
}
if (InternalOrder.isCountDesc(order) && bucketOrds.size() >= bucketCountThresholds.getRequiredSize()) {
if (InternalOrder.isCountDesc(order) && bucketOrds.bucketsInOrd(owningBucketOrd) >= bucketCountThresholds.getRequiredSize()) {
return;
}
// we need to fill-in the blanks
@ -285,7 +297,7 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
for (int i = 0; i < valueCount; ++i) {
BytesRef term = values.nextValue();
if (includeExclude == null || includeExclude.accept(term)) {
bucketOrds.add(term);
bucketOrds.add(owningBucketOrd, term);
}
}
}
@ -294,8 +306,8 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
}
@Override
StringTerms.Bucket buildEmptyBucket() {
return new StringTerms.Bucket(new BytesRef(), 0, null, showTermDocCountError, 0, format);
Supplier<StringTerms.Bucket> emptyBucketBuilder(long owningBucketOrd) {
return () -> new StringTerms.Bucket(new BytesRef(), 0, null, showTermDocCountError, 0, format);
}
@Override
@ -304,10 +316,15 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
}
@Override
void updateBucket(StringTerms.Bucket spare, long bucketOrd, long docCount) throws IOException {
bucketOrds.get(bucketOrd, spare.termBytes);
void updateBucket(StringTerms.Bucket spare, BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum, long docCount) throws IOException {
ordsEnum.readValue(spare.termBytes);
spare.docCount = docCount;
spare.bucketOrd = bucketOrd;
spare.bucketOrd = ordsEnum.ord();
}
@Override
StringTerms.Bucket[][] buildTopBucketsPerOrd(int size) {
return new StringTerms.Bucket[size][];
}
@Override
@ -326,12 +343,12 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
}
@Override
void buildSubAggs(StringTerms.Bucket[] topBuckets) throws IOException {
buildSubAggsForBuckets(topBuckets, b -> b.bucketOrd, (b, a) -> b.aggregations = a);
void buildSubAggs(StringTerms.Bucket[][] topBucketsPerOrd) throws IOException {
buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a);
}
@Override
StringTerms buildResult(StringTerms.Bucket[] topBuckets, long otherDocCount) {
StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bucket[] topBuckets) {
return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount,
Arrays.asList(topBuckets), 0);
@ -354,7 +371,7 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
private final SignificantTermsAggregatorFactory termsAggFactory;
private final SignificanceHeuristic significanceHeuristic;
private long subsetSize = 0;
private LongArray subsetSizes = context.bigArrays().newLongArray(1, true);
SignificantTermsResults(SignificantTermsAggregatorFactory termsAggFactory, SignificanceHeuristic significanceHeuristic) {
this.termsAggFactory = termsAggFactory;
@ -372,17 +389,19 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
super.collect(doc, owningBucketOrd);
subsetSize++;
subsetSizes = context.bigArrays().grow(subsetSizes, owningBucketOrd + 1);
subsetSizes.increment(owningBucketOrd, 1);
}
};
}
@Override
void collectZeroDocEntriesIfNeeded() throws IOException {}
void collectZeroDocEntriesIfNeeded(long owningBucketOrd) throws IOException {}
@Override
SignificantStringTerms.Bucket buildEmptyBucket() {
return new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format, 0);
Supplier<SignificantStringTerms.Bucket> emptyBucketBuilder(long owningBucketOrd) {
long subsetSize = subsetSizes.get(owningBucketOrd);
return () -> new SignificantStringTerms.Bucket(new BytesRef(), 0, subsetSize, 0, 0, null, format, 0);
}
@Override
@ -391,11 +410,12 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
}
@Override
void updateBucket(SignificantStringTerms.Bucket spare, long bucketOrd, long docCount) throws IOException {
bucketOrds.get(bucketOrd, spare.termBytes);
void updateBucket(SignificantStringTerms.Bucket spare, BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum, long docCount)
throws IOException {
ordsEnum.readValue(spare.termBytes);
spare.bucketOrd = ordsEnum.ord();
spare.subsetDf = docCount;
spare.bucketOrd = bucketOrd;
spare.subsetSize = subsetSize;
spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes);
spare.supersetSize = termsAggFactory.getSupersetNumDocs();
/*
@ -406,6 +426,11 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
spare.updateScore(significanceHeuristic);
}
@Override
SignificantStringTerms.Bucket[][] buildTopBucketsPerOrd(int size) {
return new SignificantStringTerms.Bucket[size][];
}
@Override
SignificantStringTerms.Bucket[] buildBuckets(int size) {
return new SignificantStringTerms.Bucket[size];
@ -422,25 +447,33 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
}
@Override
void buildSubAggs(SignificantStringTerms.Bucket[] topBuckets) throws IOException {
buildSubAggsForBuckets(topBuckets, b -> b.bucketOrd, (b, a) -> b.aggregations = a);
void buildSubAggs(SignificantStringTerms.Bucket[][] topBucketsPerOrd) throws IOException {
buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a);
}
@Override
SignificantStringTerms buildResult(SignificantStringTerms.Bucket[] topBuckets, long otherDocCount) {
return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(),
SignificantStringTerms buildResult(long owningBucketOrd, long otherDocCount, SignificantStringTerms.Bucket[] topBuckets) {
return new SignificantStringTerms(
name,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(), format, subsetSize, termsAggFactory.getSupersetNumDocs(), significanceHeuristic, Arrays.asList(topBuckets));
metadata(),
format,
subsetSizes.get(owningBucketOrd),
termsAggFactory.getSupersetNumDocs(),
significanceHeuristic,
Arrays.asList(topBuckets)
);
}
@Override
SignificantStringTerms buildEmptyResult() {
return buildEmptySignificantTermsAggregation(subsetSize, significanceHeuristic);
return buildEmptySignificantTermsAggregation(0, significanceHeuristic);
}
@Override
public void close() {
termsAggFactory.close();
Releasables.close(termsAggFactory, subsetSizes);
}
}
}

View File

@ -251,7 +251,7 @@ public class NumericTermsAggregator extends TermsAggregator {
* Collect extra entries for "zero" hit documents if they were requested
* and required.
*/
abstract void collectZeroDocEntriesIfNeeded(long ord) throws IOException;
abstract void collectZeroDocEntriesIfNeeded(long owningBucketOrd) throws IOException;
/**
* Turn the buckets into an aggregation result.
@ -296,11 +296,11 @@ public class NumericTermsAggregator extends TermsAggregator {
abstract B buildEmptyBucket();
@Override
final void collectZeroDocEntriesIfNeeded(long ord) throws IOException {
final void collectZeroDocEntriesIfNeeded(long owningBucketOrd) throws IOException {
if (bucketCountThresholds.getMinDocCount() != 0) {
return;
}
if (InternalOrder.isCountDesc(order) && bucketOrds.bucketsInOrd(ord) >= bucketCountThresholds.getRequiredSize()) {
if (InternalOrder.isCountDesc(order) && bucketOrds.bucketsInOrd(owningBucketOrd) >= bucketCountThresholds.getRequiredSize()) {
return;
}
// we need to fill-in the blanks
@ -312,7 +312,7 @@ public class NumericTermsAggregator extends TermsAggregator {
for (int v = 0; v < valueCount; ++v) {
long value = values.nextValue();
if (longFilter == null || longFilter.accept(value)) {
bucketOrds.add(ord, value);
bucketOrds.add(owningBucketOrd, value);
}
}
}
@ -546,10 +546,10 @@ public class NumericTermsAggregator extends TermsAggregator {
}
@Override
void collectZeroDocEntriesIfNeeded(long ord) throws IOException {}
void collectZeroDocEntriesIfNeeded(long owningBucketOrd) throws IOException {}
@Override
SignificantLongTerms buildResult(long owningBucketOrd, long otherDocCounts, SignificantLongTerms.Bucket[] topBuckets) {
SignificantLongTerms buildResult(long owningBucketOrd, long otherDocCoun, SignificantLongTerms.Bucket[] topBuckets) {
return new SignificantLongTerms(
name,
bucketCountThresholds.getRequiredSize(),

View File

@ -106,7 +106,6 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
boolean collectsFromSingleBucket,
Map<String, Object> metadata) throws IOException {
assert collectsFromSingleBucket;
ExecutionMode execution = null;
if (executionHint != null) {
execution = ExecutionMode.fromString(executionHint, deprecationLogger);
@ -125,12 +124,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
}
return execution.create(name, factories, valuesSource, format, bucketCountThresholds, includeExclude, context, parent,
significanceHeuristic, sigTermsFactory, metadata);
}
@Override
public boolean needsToCollectFromSingleBucket() {
return true;
significanceHeuristic, sigTermsFactory, collectsFromSingleBucket, metadata);
}
};
}
@ -177,11 +171,6 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
numericValuesSource, format, null, bucketCountThresholds, context, parent, SubAggCollectionMode.BREADTH_FIRST,
longFilter, collectsFromSingleBucket, metadata);
}
@Override
public boolean needsToCollectFromSingleBucket() {
return false;
}
};
}
@ -316,9 +305,6 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
aggregatorSupplier.getClass().toString() + "]");
}
SignificantTermsAggregatorSupplier sigTermsAggregatorSupplier = (SignificantTermsAggregatorSupplier) aggregatorSupplier;
if (collectsFromSingleBucket == false && sigTermsAggregatorSupplier.needsToCollectFromSingleBucket()) {
return asMultiBucketAggregator(this, searchContext, parent);
}
numberOfAggregatorsCreated++;
BucketCountThresholds bucketCountThresholds = new BucketCountThresholds(this.bucketCountThresholds);
@ -359,6 +345,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
Aggregator parent,
SignificanceHeuristic significanceHeuristic,
SignificantTermsAggregatorFactory termsAggregatorFactory,
boolean collectsFromSingleBucket,
Map<String, Object> metadata) throws IOException {
final IncludeExclude.StringFilter filter = includeExclude == null ? null : includeExclude.convertToStringFilter(format);
@ -375,6 +362,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
parent,
SubAggCollectionMode.BREADTH_FIRST,
false,
collectsFromSingleBucket,
metadata
);
@ -394,6 +382,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
Aggregator parent,
SignificanceHeuristic significanceHeuristic,
SignificantTermsAggregatorFactory termsAggregatorFactory,
boolean collectsFromSingleBucket,
Map<String, Object> metadata) throws IOException {
final IncludeExclude.OrdinalsFilter filter = includeExclude == null ? null : includeExclude.convertToOrdinalsFilter(format);
@ -424,6 +413,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
remapGlobalOrd,
SubAggCollectionMode.BREADTH_FIRST,
false,
collectsFromSingleBucket,
metadata
);
}
@ -458,6 +448,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
Aggregator parent,
SignificanceHeuristic significanceHeuristic,
SignificantTermsAggregatorFactory termsAggregatorFactory,
boolean collectsFromSingleBucket,
Map<String, Object> metadata) throws IOException;
@Override

View File

@ -43,6 +43,4 @@ interface SignificantTermsAggregatorSupplier extends AggregatorSupplier {
SignificantTermsAggregatorFactory sigTermsFactory,
boolean collectsFromSingleBucket,
Map<String, Object> metadata) throws IOException;
boolean needsToCollectFromSingleBucket();
}

View File

@ -83,8 +83,6 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
boolean showTermDocCountError,
boolean collectsFromSingleBucket,
Map<String, Object> metadata) throws IOException {
assert collectsFromSingleBucket;
ExecutionMode execution = null;
if (executionHint != null) {
execution = ExecutionMode.fromString(executionHint);
@ -110,14 +108,9 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
// TODO: [Zach] we might want refactor and remove ExecutionMode#create(), moving that logic outside the enum
return execution.create(name, factories, valuesSource, order, format, bucketCountThresholds, includeExclude,
context, parent, subAggCollectMode, showTermDocCountError, metadata);
context, parent, subAggCollectMode, showTermDocCountError, collectsFromSingleBucket, metadata);
}
@Override
public boolean needsToCollectFromSingleBucket() {
return true;
}
};
}
@ -170,11 +163,6 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
return new NumericTermsAggregator(name, factories, resultStrategy, numericValuesSource, format, order,
bucketCountThresholds, context, parent, subAggCollectMode, longFilter, collectsFromSingleBucket, metadata);
}
@Override
public boolean needsToCollectFromSingleBucket() {
return false;
}
};
}
@ -248,10 +236,6 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
}
TermsAggregatorSupplier termsAggregatorSupplier = (TermsAggregatorSupplier) aggregatorSupplier;
if (collectsFromSingleBucket == false && termsAggregatorSupplier.needsToCollectFromSingleBucket()) {
return asMultiBucketAggregator(this, searchContext, parent);
}
BucketCountThresholds bucketCountThresholds = new BucketCountThresholds(this.bucketCountThresholds);
if (InternalOrder.isKeyOrder(order) == false
&& bucketCountThresholds.getShardSize() == TermsAggregationBuilder.DEFAULT_BUCKET_COUNT_THRESHOLDS.getShardSize()) {
@ -322,6 +306,7 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
Aggregator parent,
SubAggCollectionMode subAggCollectMode,
boolean showTermDocCountError,
boolean collectsFromSingleBucket,
Map<String, Object> metadata) throws IOException {
final IncludeExclude.StringFilter filter = includeExclude == null ? null : includeExclude.convertToStringFilter(format);
return new MapStringTermsAggregator(
@ -337,6 +322,7 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
parent,
subAggCollectMode,
showTermDocCountError,
collectsFromSingleBucket,
metadata
);
}
@ -354,6 +340,7 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
SearchContext context, Aggregator parent,
SubAggCollectionMode subAggCollectMode,
boolean showTermDocCountError,
boolean collectsFromSingleBucket,
Map<String, Object> metadata) throws IOException {
final long maxOrd = getMaxOrd(valuesSource, context.searcher());
@ -385,8 +372,13 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
}
final IncludeExclude.OrdinalsFilter filter = includeExclude == null ? null : includeExclude.convertToOrdinalsFilter(format);
boolean remapGlobalOrds;
if (REMAP_GLOBAL_ORDS != null) {
// We use REMAP_GLOBAL_ORDS to allow tests to force specific optimizations
if (collectsFromSingleBucket && REMAP_GLOBAL_ORDS != null) {
/*
* We use REMAP_GLOBAL_ORDS to allow tests to force
* specific optimizations but this particular one
* is only possible if we're collecting from a single
* bucket.
*/
remapGlobalOrds = REMAP_GLOBAL_ORDS.booleanValue();
} else {
remapGlobalOrds = true;
@ -418,6 +410,7 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
remapGlobalOrds,
subAggCollectMode,
showTermDocCountError,
collectsFromSingleBucket,
metadata
);
}
@ -451,6 +444,7 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
Aggregator parent,
SubAggCollectionMode subAggCollectMode,
boolean showTermDocCountError,
boolean collectsFromSingleBucket,
Map<String, Object> metadata) throws IOException;
@Override

View File

@ -44,6 +44,4 @@ interface TermsAggregatorSupplier extends AggregatorSupplier {
boolean showTermDocCountError,
boolean collectsFromSingleBucket,
Map<String, Object> metadata) throws IOException;
boolean needsToCollectFromSingleBucket();
}

View File

@ -0,0 +1,200 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.terms;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import static org.hamcrest.Matchers.equalTo;
public class BytesKeyedBucketOrdsTests extends ESTestCase {
private static final BytesRef SHIP_1 = new BytesRef("Just Read The Instructions");
private static final BytesRef SHIP_2 = new BytesRef("Of Course I Still Love You");
private final MockBigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
public void testExplicitCollectsFromSingleBucket() {
collectsFromSingleBucketCase(BytesKeyedBucketOrds.build(bigArrays, true));
}
public void testSurpriseCollectsFromSingleBucket() {
collectsFromSingleBucketCase(BytesKeyedBucketOrds.build(bigArrays, false));
}
private void collectsFromSingleBucketCase(BytesKeyedBucketOrds ords) {
try {
// Test a few explicit values
assertThat(ords.add(0, SHIP_1), equalTo(0L));
assertThat(ords.add(0, SHIP_2), equalTo(1L));
assertThat(ords.add(0, SHIP_1), equalTo(-1L));
assertThat(ords.add(0, SHIP_2), equalTo(-2L));
// And some random values
Set<BytesRef> seen = new HashSet<>();
seen.add(SHIP_1);
seen.add(SHIP_2);
assertThat(ords.size(), equalTo(2L));
BytesRef[] values = new BytesRef[scaledRandomIntBetween(1, 10000)];
for (int i = 0; i < values.length; i++) {
values[i] = randomValueOtherThanMany(seen::contains, () -> new BytesRef(Long.toString(randomLong())));
seen.add(values[i]);
}
for (int i = 0; i < values.length; i++) {
assertThat(ords.add(0, values[i]), equalTo(i + 2L));
assertThat(ords.size(), equalTo(i + 3L));
if (randomBoolean()) {
assertThat(ords.add(0, SHIP_1), equalTo(-1L));
}
}
for (int i = 0; i < values.length; i++) {
assertThat(ords.add(0, values[i]), equalTo(-1 - (i + 2L)));
}
// And the explicit values are still ok
assertThat(ords.add(0, SHIP_1), equalTo(-1L));
assertThat(ords.add(0, SHIP_2), equalTo(-2L));
// Check counting values
assertThat(ords.bucketsInOrd(0), equalTo(values.length + 2L));
// Check iteration
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = ords.ordsEnum(0);
BytesRef scratch = new BytesRef();
assertTrue(ordsEnum.next());
assertThat(ordsEnum.ord(), equalTo(0L));
ordsEnum.readValue(scratch);
assertThat(scratch, equalTo(SHIP_1));
assertTrue(ordsEnum.next());
assertThat(ordsEnum.ord(), equalTo(1L));
ordsEnum.readValue(scratch);
assertThat(scratch, equalTo(SHIP_2));
for (int i = 0; i < values.length; i++) {
assertTrue(ordsEnum.next());
assertThat(ordsEnum.ord(), equalTo(i + 2L));
ordsEnum.readValue(scratch);
assertThat(scratch, equalTo(values[i]));
}
assertFalse(ordsEnum.next());
} finally {
ords.close();
}
}
public void testCollectsFromManyBuckets() {
try (BytesKeyedBucketOrds ords = BytesKeyedBucketOrds.build(bigArrays, false)) {
// Test a few explicit values
assertThat(ords.add(0, SHIP_1), equalTo(0L));
assertThat(ords.add(1, SHIP_1), equalTo(1L));
assertThat(ords.add(0, SHIP_1), equalTo(-1L));
assertThat(ords.add(1, SHIP_1), equalTo(-2L));
assertThat(ords.size(), equalTo(2L));
// And some random values
Set<OwningBucketOrdAndValue> seen = new HashSet<>();
seen.add(new OwningBucketOrdAndValue(0, SHIP_1));
seen.add(new OwningBucketOrdAndValue(1, SHIP_1));
OwningBucketOrdAndValue[] values = new OwningBucketOrdAndValue[scaledRandomIntBetween(1, 10000)];
long maxOwningBucketOrd = scaledRandomIntBetween(0, values.length);
for (int i = 0; i < values.length; i++) {
values[i] = randomValueOtherThanMany(seen::contains, () ->
new OwningBucketOrdAndValue(randomLongBetween(0, maxOwningBucketOrd), new BytesRef(Long.toString(randomLong()))));
seen.add(values[i]);
}
for (int i = 0; i < values.length; i++) {
assertThat(ords.add(values[i].owningBucketOrd, values[i].value), equalTo(i + 2L));
assertThat(ords.size(), equalTo(i + 3L));
if (randomBoolean()) {
assertThat(ords.add(0, SHIP_1), equalTo(-1L));
}
}
for (int i = 0; i < values.length; i++) {
assertThat(ords.add(values[i].owningBucketOrd, values[i].value), equalTo(-1 - (i + 2L)));
}
// And the explicit values are still ok
assertThat(ords.add(0, SHIP_1), equalTo(-1L));
assertThat(ords.add(1, SHIP_1), equalTo(-2L));
BytesRef scratch = new BytesRef();
for (long owningBucketOrd = 0; owningBucketOrd <= maxOwningBucketOrd; owningBucketOrd++) {
long expectedCount = 0;
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = ords.ordsEnum(owningBucketOrd);
if (owningBucketOrd <= 1) {
expectedCount++;
assertTrue(ordsEnum.next());
assertThat(ordsEnum.ord(), equalTo(owningBucketOrd));
ordsEnum.readValue(scratch);
assertThat(scratch, equalTo(SHIP_1));
}
for (int i = 0; i < values.length; i++) {
if (values[i].owningBucketOrd == owningBucketOrd) {
expectedCount++;
assertTrue(ordsEnum.next());
assertThat(ordsEnum.ord(), equalTo(i + 2L));
ordsEnum.readValue(scratch);
assertThat(scratch, equalTo(values[i].value));
}
}
assertFalse(ordsEnum.next());
assertThat(ords.bucketsInOrd(owningBucketOrd), equalTo(expectedCount));
}
assertFalse(ords.ordsEnum(randomLongBetween(maxOwningBucketOrd + 1, Long.MAX_VALUE)).next());
assertThat(ords.bucketsInOrd(randomLongBetween(maxOwningBucketOrd + 1, Long.MAX_VALUE)), equalTo(0L));
}
}
private class OwningBucketOrdAndValue {
private final long owningBucketOrd;
private final BytesRef value;
OwningBucketOrdAndValue(long owningBucketOrd, BytesRef value) {
this.owningBucketOrd = owningBucketOrd;
this.value = value;
}
@Override
public String toString() {
return owningBucketOrd + "/" + value;
}
@Override
public boolean equals(Object obj) {
if (obj == null || getClass() != obj.getClass()) {
return false;
}
OwningBucketOrdAndValue other = (OwningBucketOrdAndValue) obj;
return owningBucketOrd == other.owningBucketOrd && value == other.value;
}
@Override
public int hashCode() {
return Objects.hash(owningBucketOrd, value);
}
}
}

View File

@ -49,6 +49,8 @@ public class LongKeyedBucketOrdsTests extends ESTestCase {
assertThat(ords.add(0, 1000), equalTo(1L));
assertThat(ords.add(0, 0), equalTo(-1L));
assertThat(ords.add(0, 1000), equalTo(-2L));
assertThat(ords.find(0, 0), equalTo(0L));
assertThat(ords.find(0, 1000), equalTo(1L));
// And some random values
Set<Long> seen = new HashSet<>();
@ -61,7 +63,9 @@ public class LongKeyedBucketOrdsTests extends ESTestCase {
seen.add(values[i]);
}
for (int i = 0; i < values.length; i++) {
assertThat(ords.find(0, values[i]), equalTo(-1L));
assertThat(ords.add(0, values[i]), equalTo(i + 2L));
assertThat(ords.find(0, values[i]), equalTo(i + 2L));
assertThat(ords.size(), equalTo(i + 3L));
if (randomBoolean()) {
assertThat(ords.add(0, 0), equalTo(-1L));
@ -79,19 +83,19 @@ public class LongKeyedBucketOrdsTests extends ESTestCase {
assertThat(ords.bucketsInOrd(0), equalTo(values.length + 2L));
// Check iteration
LongKeyedBucketOrds.BucketOrdsEnum ordEnum = ords.ordsEnum(0);
assertTrue(ordEnum.next());
assertThat(ordEnum.ord(), equalTo(0L));
assertThat(ordEnum.value(), equalTo(0L));
assertTrue(ordEnum.next());
assertThat(ordEnum.ord(), equalTo(1L));
assertThat(ordEnum.value(), equalTo(1000L));
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = ords.ordsEnum(0);
assertTrue(ordsEnum.next());
assertThat(ordsEnum.ord(), equalTo(0L));
assertThat(ordsEnum.value(), equalTo(0L));
assertTrue(ordsEnum.next());
assertThat(ordsEnum.ord(), equalTo(1L));
assertThat(ordsEnum.value(), equalTo(1000L));
for (int i = 0; i < values.length; i++) {
assertTrue(ordEnum.next());
assertThat(ordEnum.ord(), equalTo(i + 2L));
assertThat(ordEnum.value(), equalTo(values[i]));
assertTrue(ordsEnum.next());
assertThat(ordsEnum.ord(), equalTo(i + 2L));
assertThat(ordsEnum.value(), equalTo(values[i]));
}
assertFalse(ordEnum.next());
assertFalse(ordsEnum.next());
} finally {
ords.close();
}
@ -105,6 +109,8 @@ public class LongKeyedBucketOrdsTests extends ESTestCase {
assertThat(ords.add(0, 0), equalTo(-1L));
assertThat(ords.add(1, 0), equalTo(-2L));
assertThat(ords.size(), equalTo(2L));
assertThat(ords.find(0, 0), equalTo(0L));
assertThat(ords.find(1, 0), equalTo(1L));
// And some random values
Set<OwningBucketOrdAndValue> seen = new HashSet<>();
@ -118,7 +124,9 @@ public class LongKeyedBucketOrdsTests extends ESTestCase {
seen.add(values[i]);
}
for (int i = 0; i < values.length; i++) {
assertThat(ords.find(values[i].owningBucketOrd, values[i].value), equalTo(-1L));
assertThat(ords.add(values[i].owningBucketOrd, values[i].value), equalTo(i + 2L));
assertThat(ords.find(values[i].owningBucketOrd, values[i].value), equalTo(i + 2L));
assertThat(ords.size(), equalTo(i + 3L));
if (randomBoolean()) {
assertThat(ords.add(0, 0), equalTo(-1L));
@ -135,22 +143,22 @@ public class LongKeyedBucketOrdsTests extends ESTestCase {
for (long owningBucketOrd = 0; owningBucketOrd <= maxOwningBucketOrd; owningBucketOrd++) {
long expectedCount = 0;
LongKeyedBucketOrds.BucketOrdsEnum ordEnum = ords.ordsEnum(owningBucketOrd);
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = ords.ordsEnum(owningBucketOrd);
if (owningBucketOrd <= 1) {
expectedCount++;
assertTrue(ordEnum.next());
assertThat(ordEnum.ord(), equalTo(owningBucketOrd));
assertThat(ordEnum.value(), equalTo(0L));
assertTrue(ordsEnum.next());
assertThat(ordsEnum.ord(), equalTo(owningBucketOrd));
assertThat(ordsEnum.value(), equalTo(0L));
}
for (int i = 0; i < values.length; i++) {
if (values[i].owningBucketOrd == owningBucketOrd) {
expectedCount++;
assertTrue(ordEnum.next());
assertThat(ordEnum.ord(), equalTo(i + 2L));
assertThat(ordEnum.value(), equalTo(values[i].value));
assertTrue(ordsEnum.next());
assertThat(ordsEnum.ord(), equalTo(i + 2L));
assertThat(ordsEnum.value(), equalTo(values[i].value));
}
}
assertFalse(ordEnum.next());
assertFalse(ordsEnum.next());
assertThat(ords.bucketsInOrd(owningBucketOrd), equalTo(expectedCount));
}

View File

@ -1307,6 +1307,54 @@ public class TermsAggregatorTests extends AggregatorTestCase {
}, fieldType);
}
public void testThreeLayerStringViaGlobalOrds() throws IOException {
threeLayerStringTestCase("global_ordinals");
}
public void testThreeLayerStringViaMap() throws IOException {
threeLayerStringTestCase("map");
}
private void threeLayerStringTestCase(String executionHint) throws IOException {
try (Directory dir = newDirectory()) {
try (RandomIndexWriter writer = new RandomIndexWriter(random(), dir)) {
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
for (int k = 0; k < 10; k++) {
Document d = new Document();
d.add(new SortedDocValuesField("i", new BytesRef(Integer.toString(i))));
d.add(new SortedDocValuesField("j", new BytesRef(Integer.toString(j))));
d.add(new SortedDocValuesField("k", new BytesRef(Integer.toString(k))));
writer.addDocument(d);
}
}
}
try (IndexReader reader = maybeWrapReaderEs(writer.getReader())) {
IndexSearcher searcher = newIndexSearcher(reader);
TermsAggregationBuilder request = new TermsAggregationBuilder("i").field("i").executionHint(executionHint)
.subAggregation(new TermsAggregationBuilder("j").field("j").executionHint(executionHint)
.subAggregation(new TermsAggregationBuilder("k").field("k").executionHint(executionHint)));
StringTerms result = search(searcher, new MatchAllDocsQuery(), request,
keywordField("i"), keywordField("j"), keywordField("k"));
for (int i = 0; i < 10; i++) {
StringTerms.Bucket iBucket = result.getBucketByKey(Integer.toString(i));
assertThat(iBucket.getDocCount(), equalTo(100L));
StringTerms jAgg = iBucket.getAggregations().get("j");
for (int j = 0; j < 10; j++) {
StringTerms.Bucket jBucket = jAgg.getBucketByKey(Integer.toString(j));
assertThat(jBucket.getDocCount(), equalTo(10L));
StringTerms kAgg = jBucket.getAggregations().get("k");
for (int k = 0; k < 10; k++) {
StringTerms.Bucket kBucket = kAgg.getBucketByKey(Integer.toString(k));
assertThat(kBucket.getDocCount(), equalTo(1L));
}
}
}
}
}
}
}
public void testThreeLayerLong() throws IOException {
try (Directory dir = newDirectory()) {
try (RandomIndexWriter writer = new RandomIndexWriter(random(), dir)) {