diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/profile/aggregation/AggregationProfilerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/profile/aggregation/AggregationProfilerIT.java index 3ef1737a120..6221769d5a6 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/profile/aggregation/AggregationProfilerIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/profile/aggregation/AggregationProfilerIT.java @@ -45,6 +45,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.notNullValue; @ESIntegTestCase.SuiteScopeTestCase @@ -59,7 +60,12 @@ public class AggregationProfilerIT extends ESIntegTestCase { private static final String TOTAL_BUCKETS = "total_buckets"; private static final String WRAPPED = "wrapped_in_multi_bucket_aggregator"; - private static final Object DEFERRED = "deferred_aggregators"; + private static final String DEFERRED = "deferred_aggregators"; + private static final String COLLECTION_STRAT = "collection_strategy"; + private static final String RESULT_STRAT = "result_strategy"; + private static final String HAS_FILTER = "has_filter"; + private static final String SEGMENTS_WITH_SINGLE = "segments_with_single_valued_ords"; + private static final String SEGMENTS_WITH_MULTI = "segments_with_multi_valued_ords"; private static final String NUMBER_FIELD = "number"; private static final String TAG_FIELD = "tag"; @@ -73,6 +79,7 @@ public class AggregationProfilerIT extends ESIntegTestCase { @Override protected void setupSuiteScopeCluster() throws Exception { assertAcked(client().admin().indices().prepareCreate("idx") + .setSettings(org.elasticsearch.common.collect.Map.of("number_of_shards", 1, "number_of_replicas", 0)) .addMapping("type", STRING_FIELD, "type=keyword", NUMBER_FIELD, "type=integer", TAG_FIELD, "type=keyword").get()); List<IndexRequestBuilder> builders = new ArrayList<>(); @@ -90,7 +97,7 @@ public class AggregationProfilerIT extends ESIntegTestCase { .endObject())); } - indexRandom(true, builders); + indexRandom(true, false, builders); createIndex("idx_unmapped"); } @@ -184,7 +191,7 @@ public class AggregationProfilerIT extends ESIntegTestCase { assertThat(termsBreakdown.get(COLLECT), greaterThan(0L)); assertThat(termsBreakdown.get(BUILD_AGGREGATION), greaterThan(0L)); assertThat(termsBreakdown.get(REDUCE), equalTo(0L)); - assertThat(termsAggResult.getDebugInfo(), equalTo(org.elasticsearch.common.collect.Map.of(WRAPPED, true))); + assertRemapTermsDebugInfo(termsAggResult); assertThat(termsAggResult.getProfiledChildren().size(), equalTo(1)); ProfileResult avgAggResult = termsAggResult.getProfiledChildren().get(0); @@ -204,6 +211,18 @@ public class AggregationProfilerIT extends ESIntegTestCase { } } + private void assertRemapTermsDebugInfo(ProfileResult termsAggResult) { + assertThat(termsAggResult.getDebugInfo(), hasEntry(COLLECTION_STRAT, "remap")); + assertThat(termsAggResult.getDebugInfo(), hasEntry(RESULT_STRAT, "terms")); + assertThat(termsAggResult.getDebugInfo(), hasEntry(HAS_FILTER, false)); + // TODO we only index single valued docs but the ordinals end up multi valued sometimes + assertThat( + termsAggResult.getDebugInfo().toString(), + (int) 
termsAggResult.getDebugInfo().get(SEGMENTS_WITH_SINGLE) + (int) termsAggResult.getDebugInfo().get(SEGMENTS_WITH_MULTI), + greaterThan(0) + ); + } + public void testMultiLevelProfileBreadthFirst() { SearchResponse response = client().prepareSearch("idx").setProfile(true) .addAggregation(histogram("histo").field(NUMBER_FIELD).interval(1L).subAggregation(terms("terms") @@ -251,7 +270,7 @@ public class AggregationProfilerIT extends ESIntegTestCase { assertThat(termsBreakdown.get(COLLECT), greaterThan(0L)); assertThat(termsBreakdown.get(BUILD_AGGREGATION), greaterThan(0L)); assertThat(termsBreakdown.get(REDUCE), equalTo(0L)); - assertThat(termsAggResult.getDebugInfo(), equalTo(org.elasticsearch.common.collect.Map.of(WRAPPED, true))); + assertRemapTermsDebugInfo(termsAggResult); assertThat(termsAggResult.getProfiledChildren().size(), equalTo(1)); ProfileResult avgAggResult = termsAggResult.getProfiledChildren().get(0); @@ -378,7 +397,7 @@ public class AggregationProfilerIT extends ESIntegTestCase { assertThat(tagsBreakdown.get(COLLECT), greaterThan(0L)); assertThat(tagsBreakdown.get(BUILD_AGGREGATION), greaterThan(0L)); assertThat(tagsBreakdown.get(REDUCE), equalTo(0L)); - assertThat(tagsAggResult.getDebugInfo(), equalTo(org.elasticsearch.common.collect.Map.of(WRAPPED, true))); + assertRemapTermsDebugInfo(tagsAggResult); assertThat(tagsAggResult.getProfiledChildren().size(), equalTo(2)); Map<String, ProfileResult> tagsAggResultSubAggregations = tagsAggResult.getProfiledChildren().stream() @@ -423,7 +442,7 @@ public class AggregationProfilerIT extends ESIntegTestCase { assertThat(stringsBreakdown.get(COLLECT), greaterThan(0L)); assertThat(stringsBreakdown.get(BUILD_AGGREGATION), greaterThan(0L)); assertThat(stringsBreakdown.get(REDUCE), equalTo(0L)); - assertThat(stringsAggResult.getDebugInfo(), equalTo(org.elasticsearch.common.collect.Map.of(WRAPPED, true))); + assertRemapTermsDebugInfo(stringsAggResult); assertThat(stringsAggResult.getProfiledChildren().size(), equalTo(3)); Map<String, ProfileResult> stringsAggResultSubAggregations = stringsAggResult.getProfiledChildren().stream() @@ -469,7 +488,7 @@ public class AggregationProfilerIT extends ESIntegTestCase { assertThat(tagsBreakdown.get(COLLECT), greaterThan(0L)); assertThat(tagsBreakdown.get(BUILD_AGGREGATION), greaterThan(0L)); assertThat(tagsBreakdown.get(REDUCE), equalTo(0L)); - assertThat(tagsAggResult.getDebugInfo(), equalTo(org.elasticsearch.common.collect.Map.of(WRAPPED, true))); + assertRemapTermsDebugInfo(tagsAggResult); assertThat(tagsAggResult.getProfiledChildren().size(), equalTo(2)); tagsAggResultSubAggregations = tagsAggResult.getProfiledChildren().stream() diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java index 2ed53216844..aefba9bac33 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java @@ -180,7 +180,7 @@ public abstract class Aggregator extends BucketCollector implements Releasable { public abstract InternalAggregation buildEmptyAggregation(); /** - * Collect debug information to add to the profiling results.. This will + * Collect debug information to add to the profiling results. This will * only be called if the aggregation is being profiled. *
<p>
* Well behaved implementations will always call the superclass diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/BytesKeyedBucketOrds.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/BytesKeyedBucketOrds.java new file mode 100644 index 00000000000..c25a757d810 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/BytesKeyedBucketOrds.java @@ -0,0 +1,220 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.terms; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.BytesRefHash; + +/** + * Maps {@link BytesRef} bucket keys to bucket ordinals. + */ +public abstract class BytesKeyedBucketOrds implements Releasable { + /** + * Build a {@link BytesKeyedBucketOrds}. + */ + public static BytesKeyedBucketOrds build(BigArrays bigArrays, boolean collectsFromSingleBucket) { + return collectsFromSingleBucket ? new FromSingle(bigArrays) : new FromMany(bigArrays); + } + + private BytesKeyedBucketOrds() {} + + /** + * Add the {@code owningBucketOrd, value} pair. Return the ord for + * their bucket if they have yet to be added, or {@code -1-ord} + * if they were already present. + */ + public abstract long add(long owningBucketOrd, BytesRef value); + + /** + * Count the buckets in {@code owningBucketOrd}. + */ + public abstract long bucketsInOrd(long owningBucketOrd); + + /** + * The number of collected buckets. + */ + public abstract long size(); + + /** + * Build an iterator for buckets inside {@code owningBucketOrd} in order + * of increasing ord. + *
<p>
+ * When first returned it is "unpositioned" and you must call + * {@link BucketOrdsEnum#next()} to move it to the first value. + */ + public abstract BucketOrdsEnum ordsEnum(long owningBucketOrd); + + /** + * An iterator for buckets inside a particular {@code owningBucketOrd}. + */ + public interface BucketOrdsEnum { + /** + * Advance to the next value. + * @return {@code true} if there *is* a next value, + * {@code false} if there isn't + */ + boolean next(); + + /** + * The ordinal of the current value. + */ + long ord(); + + /** + * Read the current value. + */ + void readValue(BytesRef dest); + + /** + * A {@linkplain BucketOrdsEnum} that is empty. + */ + BucketOrdsEnum EMPTY = new BucketOrdsEnum() { + @Override + public boolean next() { + return false; + } + + @Override + public long ord() { + return 0; + } + + @Override + public void readValue(BytesRef dest) {} + }; + } + + /** + * Implementation that only works if it is collecting from a single bucket. + */ + private static class FromSingle extends BytesKeyedBucketOrds { + private final BytesRefHash ords; + + private FromSingle(BigArrays bigArrays) { + ords = new BytesRefHash(1, bigArrays); + } + + @Override + public long add(long owningBucketOrd, BytesRef value) { + assert owningBucketOrd == 0; + return ords.add(value); + } + + @Override + public long bucketsInOrd(long owningBucketOrd) { + return ords.size(); + } + + @Override + public long size() { + return ords.size(); + } + + @Override + public BucketOrdsEnum ordsEnum(long owningBucketOrd) { + return new BucketOrdsEnum() { + private int ord = -1; + + @Override + public boolean next() { + ord++; + return ord < ords.size(); + } + + @Override + public long ord() { + return ord; + } + + @Override + public void readValue(BytesRef dest) { + ords.get(ord, dest); + } + }; + } + + @Override + public void close() { + ords.close(); + } + } + + /** + * Implementation that works properly when collecting from many buckets. 
+ */ + private static class FromMany extends BytesKeyedBucketOrds { + // TODO we can almost certainly do better here by building something fit for purpose rather than trying to lego together stuff + private final BytesRefHash bytesToLong; + private final LongKeyedBucketOrds longToBucketOrds; + + private FromMany(BigArrays bigArrays) { + bytesToLong = new BytesRefHash(1, bigArrays); + longToBucketOrds = LongKeyedBucketOrds.build(bigArrays, false); + } + + @Override + public long add(long owningBucketOrd, BytesRef value) { + long l = bytesToLong.add(value); + if (l < 0) { + l = -1 - l; + } + return longToBucketOrds.add(owningBucketOrd, l); + } + + @Override + public long bucketsInOrd(long owningBucketOrd) { + return longToBucketOrds.bucketsInOrd(owningBucketOrd); + } + + @Override + public long size() { + return longToBucketOrds.size(); + } + + @Override + public BucketOrdsEnum ordsEnum(long owningBucketOrd) { + LongKeyedBucketOrds.BucketOrdsEnum delegate = longToBucketOrds.ordsEnum(owningBucketOrd); + return new BucketOrdsEnum() { + @Override + public boolean next() { + return delegate.next(); + } + + @Override + public long ord() { + return delegate.ord(); + } + + @Override + public void readValue(BytesRef dest) { + bytesToLong.get(delegate.value(), dest); + } + }; + } + + @Override + public void close() { + Releasables.close(bytesToLong, longToBucketOrds); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java index e9c0c7efccd..ebacf62d7ad 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java @@ -31,9 +31,11 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.util.IntArray; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.LongHash; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.AggregationExecutionException; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.BucketOrder; @@ -93,6 +95,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr boolean remapGlobalOrds, SubAggCollectionMode collectionMode, boolean showTermDocCountError, + boolean collectsFromSingleBucket, Map metadata ) throws IOException { super(name, factories, context, parent, order, format, bucketCountThresholds, collectionMode, showTermDocCountError, metadata); @@ -104,7 +107,14 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr this.valueCount = values.getValueCount(); this.lookupGlobalOrd = values::lookupOrd; this.acceptedGlobalOrdinals = includeExclude == null ? ALWAYS_TRUE : includeExclude.acceptedGlobalOrdinals(values)::get; - this.collectionStrategy = remapGlobalOrds ? 
new RemapGlobalOrds() : new DenseGlobalOrds(); + if (remapGlobalOrds) { + this.collectionStrategy = new RemapGlobalOrds(collectsFromSingleBucket); + } else { + if (false == collectsFromSingleBucket) { + throw new AggregationExecutionException("Dense ords don't know how to collect from many buckets"); + } + this.collectionStrategy = new DenseGlobalOrds(); + } } String descriptCollectionStrategy() { @@ -126,19 +136,17 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, globalOrds) { @Override public void collect(int doc, long owningBucketOrd) throws IOException { - assert owningBucketOrd == 0; if (false == singleValues.advanceExact(doc)) { return; } int globalOrd = singleValues.ordValue(); - collectionStrategy.collectGlobalOrd(doc, globalOrd, sub); + collectionStrategy.collectGlobalOrd(owningBucketOrd, doc, globalOrd, sub); } }); } return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, globalOrds) { @Override public void collect(int doc, long owningBucketOrd) throws IOException { - assert owningBucketOrd == 0; if (false == singleValues.advanceExact(doc)) { return; } @@ -146,7 +154,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr if (false == acceptedGlobalOrdinals.test(globalOrd)) { return; } - collectionStrategy.collectGlobalOrd(doc, globalOrd, sub); + collectionStrategy.collectGlobalOrd(owningBucketOrd, doc, globalOrd, sub); } }); } @@ -159,12 +167,11 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, globalOrds) { @Override public void collect(int doc, long owningBucketOrd) throws IOException { - assert owningBucketOrd == 0; if (false == globalOrds.advanceExact(doc)) { return; } for (long globalOrd = globalOrds.nextOrd(); globalOrd != NO_MORE_ORDS; globalOrd = globalOrds.nextOrd()) { - collectionStrategy.collectGlobalOrd(doc, globalOrd, sub); + collectionStrategy.collectGlobalOrd(owningBucketOrd, doc, globalOrd, sub); } } }); @@ -172,7 +179,6 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, globalOrds) { @Override public void collect(int doc, long owningBucketOrd) throws IOException { - assert owningBucketOrd == 0; if (false == globalOrds.advanceExact(doc)) { return; } @@ -180,7 +186,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr if (false == acceptedGlobalOrdinals.test(globalOrd)) { continue; } - collectionStrategy.collectGlobalOrd(doc, globalOrd, sub); + collectionStrategy.collectGlobalOrd(owningBucketOrd, doc, globalOrd, sub); } } }); @@ -200,6 +206,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr public void collectDebugInfo(BiConsumer add) { super.collectDebugInfo(add); add.accept("collection_strategy", collectionStrategy.describe()); + collectionStrategy.collectDebugInfo(add); add.accept("result_strategy", resultStrategy.describe()); add.accept("segments_with_single_valued_ords", segmentsWithSingleValuedOrds); add.accept("segments_with_multi_valued_ords", segmentsWithMultiValuedOrds); @@ -276,13 +283,13 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr BucketCountThresholds bucketCountThresholds, SearchContext context, Aggregator parent, - boolean forceDenseMode, + boolean remapGlobalOrds, SubAggCollectionMode 
collectionMode, boolean showTermDocCountError, Map metadata ) throws IOException { super(name, factories, a -> a.new StandardTermsResults(), valuesSource, order, format, bucketCountThresholds, null, - context, parent, forceDenseMode, collectionMode, showTermDocCountError, metadata); + context, parent, remapGlobalOrds, collectionMode, showTermDocCountError, true, metadata); assert factories == null || factories.countAggregators() == 0; this.segmentDocCounts = context.bigArrays().newIntArray(1, true); } @@ -350,7 +357,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr } long ord = i - 1; // remember we do +1 when counting long globalOrd = mapping.applyAsLong(ord); - incrementBucketDocCount(collectionStrategy.globalOrdToBucketOrd(globalOrd), inc); + incrementBucketDocCount(collectionStrategy.globalOrdToBucketOrd(0, globalOrd), inc); } } } @@ -360,10 +367,9 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr *
<p>
* The {@link GlobalOrdinalsStringTermsAggregator} uses one of these * to collect the global ordinals by calling - * {@link CollectionStrategy#collectGlobalOrd(int, long, LeafBucketCollector)} - * for each global ordinal that it hits and then calling - * {@link CollectionStrategy#forEach(BucketInfoConsumer)} once to iterate on - * the results. + * {@link CollectionStrategy#collectGlobalOrd} for each global ordinal + * that it hits and then calling {@link CollectionStrategy#forEach} + * once to iterate on the results. */ abstract class CollectionStrategy implements Releasable { /** @@ -371,6 +377,11 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr * output to help with debugging. */ abstract String describe(); + /** + * Collect debug information to add to the profiling results. This will + * only be called if the aggregation is being profiled. + */ + abstract void collectDebugInfo(BiConsumer<String, Object> add); /** * Called when the global ordinals are ready. */ @@ -379,31 +390,31 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr * Called once per unique document, global ordinal combination to * collect the bucket. * + * @param owningBucketOrd the ordinal of the bucket that owns this collection * @param doc the doc id to collect * @param globalOrd the global ordinal to collect * @param sub the sub-aggregators that will collect the bucket data */ - abstract void collectGlobalOrd(int doc, long globalOrd, LeafBucketCollector sub) throws IOException; + abstract void collectGlobalOrd(long owningBucketOrd, int doc, long globalOrd, LeafBucketCollector sub) throws IOException; /** * Convert a global ordinal into a bucket ordinal. */ - abstract long globalOrdToBucketOrd(long globalOrd); + abstract long globalOrdToBucketOrd(long owningBucketOrd, long globalOrd); /** * Iterate all of the buckets. Implementations take into account * the {@link BucketCountThresholds}. In particular, * if the {@link BucketCountThresholds#getMinDocCount()} is 0 then * they'll make sure to iterate a bucket even if it was never - * {{@link #collectGlobalOrd(int, long, LeafBucketCollector) collected}. + * {@link #collectGlobalOrd collected}. * If {@link BucketCountThresholds#getMinDocCount()} is not 0 then * they'll skip all global ords that weren't collected. */ - abstract void forEach(BucketInfoConsumer consumer) throws IOException; + abstract void forEach(long owningBucketOrd, BucketInfoConsumer consumer) throws IOException; } interface BucketInfoConsumer { void accept(long globalOrd, long bucketOrd, long docCount) throws IOException; } - /** * {@linkplain CollectionStrategy} that just uses the global ordinal as the * bucket ordinal. 
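
A note for reviewers (not part of the patch): the collect-then-iterate contract described above leans on the -1-ord convention shared by LongKeyedBucketOrds#add and BytesKeyedBucketOrds#add — a first sighting returns a fresh bucket ord, a repeat returns -1-ord — which is what lets collectGlobalOrd choose between collectBucket and collectExistingBucket without a second lookup. A minimal, self-contained sketch of that convention, with a plain HashMap standing in for the hash structures (all names below are hypothetical):

import java.util.HashMap;
import java.util.Map;

class BucketOrdsSketch {
    private final Map<Long, Long> ords = new HashMap<>();

    /** First sighting returns a fresh ord; a repeat returns -1 - existingOrd. */
    long add(long globalOrd) {
        Long existing = ords.get(globalOrd);
        if (existing != null) {
            return -1 - existing; // already seen: caller runs collectExistingBucket(-1 - result)
        }
        long ord = ords.size();
        ords.put(globalOrd, ord); // first sighting: caller runs collectBucket(result)
        return ord;
    }

    public static void main(String[] args) {
        BucketOrdsSketch sketch = new BucketOrdsSketch();
        System.out.println(sketch.add(42)); // 0  -> new bucket
        System.out.println(sketch.add(42)); // -1 -> existing bucket 0
    }
}
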
@@ -414,23 +425,29 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr return "dense"; } + @Override + void collectDebugInfo(BiConsumer add) {} + @Override void globalOrdsReady(SortedSetDocValues globalOrds) { grow(globalOrds.getValueCount()); } @Override - void collectGlobalOrd(int doc, long globalOrd, LeafBucketCollector sub) throws IOException { + void collectGlobalOrd(long owningBucketOrd, int doc, long globalOrd, LeafBucketCollector sub) throws IOException { + assert owningBucketOrd == 0; collectExistingBucket(sub, doc, globalOrd); } @Override - long globalOrdToBucketOrd(long globalOrd) { + long globalOrdToBucketOrd(long owningBucketOrd, long globalOrd) { + assert owningBucketOrd == 0; return globalOrd; } @Override - void forEach(BucketInfoConsumer consumer) throws IOException { + void forEach(long owningBucketOrd, BucketInfoConsumer consumer) throws IOException { + assert owningBucketOrd == 0; for (long globalOrd = 0; globalOrd < valueCount; globalOrd++) { if (false == acceptedGlobalOrdinals.test(globalOrd)) { continue; @@ -452,20 +469,29 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr * {@link DenseGlobalOrds} when collecting every ordinal, but significantly * less when collecting only a few. */ - class RemapGlobalOrds extends CollectionStrategy { - private final LongHash bucketOrds = new LongHash(1, context.bigArrays()); + private class RemapGlobalOrds extends CollectionStrategy { + private final LongKeyedBucketOrds bucketOrds; + + private RemapGlobalOrds(boolean collectsFromSingleBucket) { + bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket); + } @Override String describe() { return "remap"; } + @Override + void collectDebugInfo(BiConsumer add) { + add.accept("total_buckets", bucketOrds.size()); + } + @Override void globalOrdsReady(SortedSetDocValues globalOrds) {} @Override - void collectGlobalOrd(int doc, long globalOrd, LeafBucketCollector sub) throws IOException { - long bucketOrd = bucketOrds.add(globalOrd); + void collectGlobalOrd(long owningBucketOrd, int doc, long globalOrd, LeafBucketCollector sub) throws IOException { + long bucketOrd = bucketOrds.add(owningBucketOrd, globalOrd); if (bucketOrd < 0) { bucketOrd = -1 - bucketOrd; collectExistingBucket(sub, doc, bucketOrd); @@ -475,33 +501,32 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr } @Override - long globalOrdToBucketOrd(long globalOrd) { - return bucketOrds.find(globalOrd); + long globalOrdToBucketOrd(long owningBucketOrd, long globalOrd) { + return bucketOrds.find(owningBucketOrd, globalOrd); } @Override - void forEach(BucketInfoConsumer consumer) throws IOException { + void forEach(long owningBucketOrd, BucketInfoConsumer consumer) throws IOException { if (bucketCountThresholds.getMinDocCount() == 0) { for (long globalOrd = 0; globalOrd < valueCount; globalOrd++) { if (false == acceptedGlobalOrdinals.test(globalOrd)) { continue; } - long bucketOrd = bucketOrds.find(globalOrd); + long bucketOrd = bucketOrds.find(owningBucketOrd, globalOrd); long docCount = bucketOrd < 0 ? 
0 : bucketDocCount(bucketOrd); consumer.accept(globalOrd, bucketOrd, docCount); } } else { - for (long bucketOrd = 0; bucketOrd < bucketOrds.size(); bucketOrd++) { - long globalOrd = bucketOrds.get(bucketOrd); - if (false == acceptedGlobalOrdinals.test(globalOrd)) { + LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd); + while (ordsEnum.next()) { + if (false == acceptedGlobalOrdinals.test(ordsEnum.value())) { continue; } - consumer.accept(globalOrd, bucketOrd, bucketDocCount(bucketOrd)); + consumer.accept(ordsEnum.value(), ordsEnum.ord(), bucketDocCount(ordsEnum.ord())); } } } - @Override public void close() { bucketOrds.close(); @@ -517,47 +542,58 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr TB extends InternalMultiBucketAggregation.InternalBucket> implements Releasable { private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { - assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0; if (valueCount == 0) { // no context in this reader - return new InternalAggregation[] {buildEmptyAggregation()}; - } - - final int size; - if (bucketCountThresholds.getMinDocCount() == 0) { - // if minDocCount == 0 then we can end up with more buckets then maxBucketOrd() returns - size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize()); - } else { - size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize()); - } - long[] otherDocCount = new long[1]; - PriorityQueue ordered = buildPriorityQueue(size); - collectionStrategy.forEach(new BucketInfoConsumer() { - TB spare = null; - - @Override - public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException { - otherDocCount[0] += docCount; - if (docCount >= bucketCountThresholds.getShardMinDocCount()) { - if (spare == null) { - spare = buildEmptyTemporaryBucket(); - } - updateBucket(spare, globalOrd, bucketOrd, docCount); - spare = ordered.insertWithOverflow(spare); - } + InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length]; + for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { + results[ordIdx] = buildNoValuesResult(owningBucketOrds[ordIdx]); } - }); - - // Get the top buckets - B[] topBuckets = buildBuckets(ordered.size()); - for (int i = ordered.size() - 1; i >= 0; --i) { - topBuckets[i] = convertTempBucketToRealBucket(ordered.pop()); - otherDocCount[0] -= topBuckets[i].getDocCount(); + return results; } - buildSubAggs(topBuckets); - return new InternalAggregation[] { - buildResult(topBuckets, otherDocCount[0]) - }; + B[][] topBucketsPreOrd = buildTopBucketsPerOrd(owningBucketOrds.length); + long[] otherDocCount = new long[owningBucketOrds.length]; + for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { + final int size; + if (bucketCountThresholds.getMinDocCount() == 0) { + // if minDocCount == 0 then we can end up with more buckets then maxBucketOrd() returns + size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize()); + } else { + size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize()); + } + PriorityQueue ordered = buildPriorityQueue(size); + final int finalOrdIdx = ordIdx; + BucketUpdater updater = bucketUpdater(owningBucketOrds[ordIdx]); + collectionStrategy.forEach(owningBucketOrds[ordIdx], new BucketInfoConsumer() { + TB spare = null; + + @Override + public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException { + otherDocCount[finalOrdIdx] += docCount; + if (docCount >= 
bucketCountThresholds.getShardMinDocCount()) { + if (spare == null) { + spare = buildEmptyTemporaryBucket(); + } + updater.updateBucket(spare, globalOrd, bucketOrd, docCount); + spare = ordered.insertWithOverflow(spare); + } + } + }); + + // Get the top buckets + topBucketsPreOrd[ordIdx] = buildBuckets(ordered.size()); + for (int i = ordered.size() - 1; i >= 0; --i) { + topBucketsPreOrd[ordIdx][i] = convertTempBucketToRealBucket(ordered.pop()); + otherDocCount[ordIdx] -= topBucketsPreOrd[ordIdx][i].getDocCount(); + } + } + + buildSubAggs(topBucketsPreOrd); + + InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length]; + for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { + results[ordIdx] = buildResult(owningBucketOrds[ordIdx], otherDocCount[ordIdx], topBucketsPreOrd[ordIdx]); + } + return results; } /** @@ -581,7 +617,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr * Update fields in {@code spare} to reflect information collected for * this bucket ordinal. */ - abstract void updateBucket(TB spare, long globalOrd, long bucketOrd, long docCount) throws IOException; + abstract BucketUpdater bucketUpdater(long owningBucketOrd) throws IOException; /** * Build a {@link PriorityQueue} to sort the buckets. After we've @@ -589,6 +625,11 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr */ abstract PriorityQueue buildPriorityQueue(int size); + /** + * Build an array to hold the "top" buckets for each ordinal. + */ + abstract B[][] buildTopBucketsPerOrd(int size); + /** * Build an array of buckets for a particular ordinal to collect the * results. The populated list is passed to {@link #buildResult}. @@ -604,18 +645,27 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr * Build the sub-aggregations into the buckets. This will usually * delegate to {@link #buildSubAggsForAllBuckets}. */ - abstract void buildSubAggs(B[] topBuckets) throws IOException; + abstract void buildSubAggs(B[][] topBucketsPreOrd) throws IOException; /** * Turn the buckets into an aggregation result. */ - abstract R buildResult(B[] topBuckets, long otherDocCount); + abstract R buildResult(long owningBucketOrd, long otherDocCount, B[] topBuckets); /** * Build an "empty" result. Only called if there isn't any data on this * shard. */ abstract R buildEmptyResult(); + + /** + * Build an "empty" result for a particular bucket ordinal. Called when + * there aren't any values for the field on this shard. 
+ */ + abstract R buildNoValuesResult(long owningBucketOrdinal); + } + interface BucketUpdater { + void updateBucket(TB spare, long globalOrd, long bucketOrd, long docCount) throws IOException; } /** @@ -632,6 +682,11 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr return primary; } + @Override + StringTerms.Bucket[][] buildTopBucketsPerOrd(int size) { + return new StringTerms.Bucket[size][]; + } + @Override StringTerms.Bucket[] buildBuckets(int size) { return new StringTerms.Bucket[size]; @@ -643,10 +698,12 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr } @Override - void updateBucket(OrdBucket spare, long globalOrd, long bucketOrd, long docCount) throws IOException { - spare.globalOrd = globalOrd; - spare.bucketOrd = bucketOrd; - spare.docCount = docCount; + BucketUpdater bucketUpdater(long owningBucketOrd) throws IOException { + return (spare, globalOrd, bucketOrd, docCount) -> { + spare.globalOrd = globalOrd; + spare.bucketOrd = bucketOrd; + spare.docCount = docCount; + }; } @Override @@ -663,12 +720,12 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr } @Override - void buildSubAggs(StringTerms.Bucket[] topBuckets) throws IOException { - buildSubAggsForBuckets(topBuckets, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); + void buildSubAggs(StringTerms.Bucket[][] topBucketsPreOrd) throws IOException { + buildSubAggsForAllBuckets(topBucketsPreOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); } @Override - StringTerms buildResult(StringTerms.Bucket[] topBuckets, long otherDocCount) { + StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bucket[] topBuckets) { return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(), metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount, Arrays.asList(topBuckets), 0); @@ -679,6 +736,11 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr return buildEmptyTermsAggregation(); } + @Override + StringTerms buildNoValuesResult(long owningBucketOrdinal) { + return buildEmptyResult(); + } + @Override public void close() {} } @@ -695,7 +757,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr private final SignificantTermsAggregatorFactory termsAggFactory; private final SignificanceHeuristic significanceHeuristic; - private long subsetSize = 0; + private LongArray subsetSizes = context.bigArrays().newLongArray(1, true); SignificantTermsResults(SignificantTermsAggregatorFactory termsAggFactory, SignificanceHeuristic significanceHeuristic) { this.termsAggFactory = termsAggFactory; @@ -713,11 +775,17 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr @Override public void collect(int doc, long owningBucketOrd) throws IOException { super.collect(doc, owningBucketOrd); - subsetSize++; + subsetSizes = context.bigArrays().grow(subsetSizes, owningBucketOrd + 1); + subsetSizes.increment(owningBucketOrd, 1); } }; } + @Override + SignificantStringTerms.Bucket[][] buildTopBucketsPerOrd(int size) { + return new SignificantStringTerms.Bucket[size][]; + } + @Override SignificantStringTerms.Bucket[] buildBuckets(int size) { return new SignificantStringTerms.Bucket[size]; @@ -729,19 +797,22 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr } @Override - void updateBucket(SignificantStringTerms.Bucket spare, 
long globalOrd, long bucketOrd, long docCount) throws IOException { - spare.bucketOrd = bucketOrd; - oversizedCopy(lookupGlobalOrd.apply(globalOrd), spare.termBytes); - spare.subsetDf = docCount; - spare.subsetSize = subsetSize; - spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes); - spare.supersetSize = termsAggFactory.getSupersetNumDocs(); - /* - * During shard-local down-selection we use subset/superset stats - * that are for this shard only. Back at the central reducer these - * properties will be updated with global stats. - */ - spare.updateScore(significanceHeuristic); + BucketUpdater bucketUpdater(long owningBucketOrd) throws IOException { + long subsetSize = subsetSizes.get(owningBucketOrd); + return (spare, globalOrd, bucketOrd, docCount) -> { + spare.bucketOrd = bucketOrd; + oversizedCopy(lookupGlobalOrd.apply(globalOrd), spare.termBytes); + spare.subsetDf = docCount; + spare.subsetSize = subsetSize; + spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes); + spare.supersetSize = termsAggFactory.getSupersetNumDocs(); + /* + * During shard-local down-selection we use subset/superset stats + * that are for this shard only. Back at the central reducer these + * properties will be updated with global stats. + */ + spare.updateScore(significanceHeuristic); + }; } @Override @@ -755,24 +826,38 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr } @Override - void buildSubAggs(SignificantStringTerms.Bucket[] topBuckets) throws IOException { - buildSubAggsForBuckets(topBuckets, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); + void buildSubAggs(SignificantStringTerms.Bucket[][] topBucketsPreOrd) throws IOException { + buildSubAggsForAllBuckets(topBucketsPreOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); } @Override - SignificantStringTerms buildResult(SignificantStringTerms.Bucket[] topBuckets, long otherDocCount) { - return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(), - metadata(), format, subsetSize, termsAggFactory.getSupersetNumDocs(), significanceHeuristic, Arrays.asList(topBuckets)); + SignificantStringTerms buildResult(long owningBucketOrd, long otherDocCount, SignificantStringTerms.Bucket[] topBuckets) { + return new SignificantStringTerms( + name, + bucketCountThresholds.getRequiredSize(), + bucketCountThresholds.getMinDocCount(), + metadata(), + format, + subsetSizes.get(owningBucketOrd), + termsAggFactory.getSupersetNumDocs(), + significanceHeuristic, + Arrays.asList(topBuckets) + ); } @Override SignificantStringTerms buildEmptyResult() { - return buildEmptySignificantTermsAggregation(subsetSize, significanceHeuristic); + return buildEmptySignificantTermsAggregation(0, significanceHeuristic); + } + + @Override + SignificantStringTerms buildNoValuesResult(long owningBucketOrdinal) { + return buildEmptySignificantTermsAggregation(subsetSizes.get(owningBucketOrdinal), significanceHeuristic); } @Override public void close() { - termsAggFactory.close(); + Releasables.close(termsAggFactory, subsetSizes); } /** diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongKeyedBucketOrds.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongKeyedBucketOrds.java index cd49e47afde..aacf102eb9f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongKeyedBucketOrds.java +++ 
b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongKeyedBucketOrds.java @@ -40,12 +40,18 @@ public abstract class LongKeyedBucketOrds implements Releasable { private LongKeyedBucketOrds() {} /** - * Add the {@code owningBucketOrd, term} pair. Return the ord for + * Add the {@code owningBucketOrd, value} pair. Return the ord for * their bucket if they have yet to be added, or {@code -1-ord} * if they were already present. */ public abstract long add(long owningBucketOrd, long value); + /** + * Find the {@code owningBucketOrd, value} pair. Return the ord for + * their bucket if they have been added or {@code -1} if they haven't. + */ + public abstract long find(long owningBucketOrd, long value); + /** * Count the buckets in {@code owningBucketOrd}. */ @@ -96,7 +102,6 @@ public abstract class LongKeyedBucketOrds implements Releasable { }; } - /** * Implementation that only works if it is collecting from a single bucket. */ @@ -113,6 +118,12 @@ public abstract class LongKeyedBucketOrds implements Releasable { return ords.add(value); } + @Override + public long find(long owningBucketOrd, long value) { + assert owningBucketOrd == 0; + return ords.find(value); + } + @Override public long bucketsInOrd(long owningBucketOrd) { assert owningBucketOrd == 0; @@ -217,6 +228,22 @@ public abstract class LongKeyedBucketOrds implements Releasable { return buckets; } + @Override + public long find(long owningBucketOrd, long value) { + if (owningBucketOrd >= owningOrdToBuckets.size()) { + return -1; + } + Buckets buckets = owningOrdToBuckets.get(owningBucketOrd); + if (buckets == null) { + return -1; + } + long thisBucketOrd = buckets.valueToThisBucketOrd.find(value); + if (thisBucketOrd < 0) { + return -1; + } + return buckets.thisBucketOrdToGlobalOrd.get(thisBucketOrd); + } + @Override public long bucketsInOrd(long owningBucketOrd) { if (owningBucketOrd >= owningOrdToBuckets.size()) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java index 98d83335999..6925ec85668 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java @@ -25,7 +25,7 @@ import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.PriorityQueue; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; -import org.elasticsearch.common.util.BytesRefHash; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.index.fielddata.SortedBinaryDocValues; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.Aggregator; @@ -45,6 +45,7 @@ import java.util.Arrays; import java.util.Map; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.Supplier; /** * An aggregator of string values that hashes the strings on the fly rather @@ -53,7 +54,7 @@ import java.util.function.Function; public class MapStringTermsAggregator extends AbstractStringTermsAggregator { private final ResultStrategy resultStrategy; private final ValuesSource valuesSource; - private final BytesRefHash bucketOrds; + private final BytesKeyedBucketOrds bucketOrds; private final IncludeExclude.StringFilter includeExclude; public MapStringTermsAggregator( @@ -69,13 +70,14 @@ 
public class MapStringTermsAggregator extends AbstractStringTermsAggregator { Aggregator parent, SubAggCollectionMode collectionMode, boolean showTermDocCountError, + boolean collectsFromSingleBucket, Map metadata ) throws IOException { super(name, factories, context, parent, order, format, bucketCountThresholds, collectionMode, showTermDocCountError, metadata); this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job. this.valuesSource = valuesSource; this.includeExclude = includeExclude; - bucketOrds = new BytesRefHash(1, context.bigArrays()); + bucketOrds = BytesKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket); } @Override @@ -94,31 +96,31 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator { final BytesRefBuilder previous = new BytesRefBuilder(); @Override - public void collect(int doc, long bucket) throws IOException { - assert bucket == 0; - if (values.advanceExact(doc)) { - final int valuesCount = values.docValueCount(); + public void collect(int doc, long owningBucketOrd) throws IOException { + if (false == values.advanceExact(doc)) { + return; + } + int valuesCount = values.docValueCount(); - // SortedBinaryDocValues don't guarantee uniqueness so we - // need to take care of dups - previous.clear(); - for (int i = 0; i < valuesCount; ++i) { - final BytesRef bytes = values.nextValue(); - if (includeExclude != null && false == includeExclude.accept(bytes)) { - continue; - } - if (i > 0 && previous.get().equals(bytes)) { - continue; - } - long bucketOrdinal = bucketOrds.add(bytes); - if (bucketOrdinal < 0) { // already seen - bucketOrdinal = -1 - bucketOrdinal; - collectExistingBucket(sub, doc, bucketOrdinal); - } else { - collectBucket(sub, doc, bucketOrdinal); - } - previous.copyBytes(bytes); + // SortedBinaryDocValues don't guarantee uniqueness so we + // need to take care of dups + previous.clear(); + for (int i = 0; i < valuesCount; ++i) { + final BytesRef bytes = values.nextValue(); + if (includeExclude != null && false == includeExclude.accept(bytes)) { + continue; } + if (i > 0 && previous.get().equals(bytes)) { + continue; + } + long bucketOrdinal = bucketOrds.add(owningBucketOrd, bytes); + if (bucketOrdinal < 0) { // already seen + bucketOrdinal = -1 - bucketOrdinal; + collectExistingBucket(sub, doc, bucketOrdinal); + } else { + collectBucket(sub, doc, bucketOrdinal); + } + previous.copyBytes(bytes); } } }); @@ -131,12 +133,13 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator { @Override public InternalAggregation buildEmptyAggregation() { - return buildEmptyTermsAggregation(); + return resultStrategy.buildEmptyResult(); } @Override public void collectDebugInfo(BiConsumer add) { super.collectDebugInfo(add); + add.accept("total_buckets", bucketOrds.size()); add.accept("result_strategy", resultStrategy.describe()); } @@ -153,39 +156,43 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator { Releasable { private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { - assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0; + B[][] topBucketsPerOrd = buildTopBucketsPerOrd(owningBucketOrds.length); + long[] otherDocCounts = new long[owningBucketOrds.length]; + for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { + collectZeroDocEntriesIfNeeded(owningBucketOrds[ordIdx]); + int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize()); - 
collectZeroDocEntriesIfNeeded(); - - int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize()); - - long otherDocCount = 0; - PriorityQueue ordered = buildPriorityQueue(size); - B spare = null; - for (int bucketOrd = 0; bucketOrd < bucketOrds.size(); bucketOrd++) { - long docCount = bucketDocCount(bucketOrd); - otherDocCount += docCount; - if (docCount < bucketCountThresholds.getShardMinDocCount()) { - continue; + PriorityQueue ordered = buildPriorityQueue(size); + B spare = null; + BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]); + Supplier emptyBucketBuilder = emptyBucketBuilder(owningBucketOrds[ordIdx]); + while (ordsEnum.next()) { + long docCount = bucketDocCount(ordsEnum.ord()); + otherDocCounts[ordIdx] += docCount; + if (docCount < bucketCountThresholds.getShardMinDocCount()) { + continue; + } + if (spare == null) { + spare = emptyBucketBuilder.get(); + } + updateBucket(spare, ordsEnum, docCount); + spare = ordered.insertWithOverflow(spare); } - if (spare == null) { - spare = buildEmptyBucket(); + + topBucketsPerOrd[ordIdx] = buildBuckets(ordered.size()); + for (int i = ordered.size() - 1; i >= 0; --i) { + topBucketsPerOrd[ordIdx][i] = ordered.pop(); + otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][i].getDocCount(); + finalizeBucket(topBucketsPerOrd[ordIdx][i]); } - updateBucket(spare, bucketOrd, docCount); - spare = ordered.insertWithOverflow(spare); } - B[] topBuckets = buildBuckets(ordered.size()); - for (int i = ordered.size() - 1; i >= 0; --i) { - topBuckets[i] = ordered.pop(); - otherDocCount -= topBuckets[i].getDocCount(); - finalizeBucket(topBuckets[i]); + buildSubAggs(topBucketsPerOrd); + InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length]; + for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { + result[ordIdx] = buildResult(owningBucketOrds[ordIdx], otherDocCounts[ordIdx], topBucketsPerOrd[ordIdx]); } - - buildSubAggs(topBuckets); - return new InternalAggregation[] { - buildResult(topBuckets, otherDocCount) - }; + return result; } /** @@ -204,12 +211,12 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator { * Collect extra entries for "zero" hit documents if they were requested * and required. */ - abstract void collectZeroDocEntriesIfNeeded() throws IOException; + abstract void collectZeroDocEntriesIfNeeded(long owningBucketOrd) throws IOException; /** * Build an empty temporary bucket. */ - abstract B buildEmptyBucket(); + abstract Supplier emptyBucketBuilder(long owningBucketOrd); /** * Build a {@link PriorityQueue} to sort the buckets. After we've @@ -221,7 +228,12 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator { * Update fields in {@code spare} to reflect information collected for * this bucket ordinal. */ - abstract void updateBucket(B spare, long bucketOrd, long docCount) throws IOException; + abstract void updateBucket(B spare, BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum, long docCount) throws IOException; + + /** + * Build an array to hold the "top" buckets for each ordinal. + */ + abstract B[][] buildTopBucketsPerOrd(int size); /** * Build an array of buckets for a particular ordinal to collect the @@ -239,12 +251,12 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator { * Build the sub-aggregations into the buckets. This will usually * delegate to {@link #buildSubAggsForAllBuckets}. 
*/ - abstract void buildSubAggs(B[] topBuckets) throws IOException; + abstract void buildSubAggs(B[][] topBucketsPerOrd) throws IOException; /** * Turn the buckets into an aggregation result. */ - abstract R buildResult(B[] topBuckets, long otherDocCount); + abstract R buildResult(long owningBucketOrd, long otherDocCount, B[] topBuckets); /** * Build an "empty" result. Only called if there isn't any data on this @@ -268,11 +280,11 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator { } @Override - void collectZeroDocEntriesIfNeeded() throws IOException { + void collectZeroDocEntriesIfNeeded(long owningBucketOrd) throws IOException { if (bucketCountThresholds.getMinDocCount() != 0) { return; } - if (InternalOrder.isCountDesc(order) && bucketOrds.size() >= bucketCountThresholds.getRequiredSize()) { + if (InternalOrder.isCountDesc(order) && bucketOrds.bucketsInOrd(owningBucketOrd) >= bucketCountThresholds.getRequiredSize()) { return; } // we need to fill-in the blanks @@ -285,7 +297,7 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator { for (int i = 0; i < valueCount; ++i) { BytesRef term = values.nextValue(); if (includeExclude == null || includeExclude.accept(term)) { - bucketOrds.add(term); + bucketOrds.add(owningBucketOrd, term); } } } @@ -294,8 +306,8 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator { } @Override - StringTerms.Bucket buildEmptyBucket() { - return new StringTerms.Bucket(new BytesRef(), 0, null, showTermDocCountError, 0, format); + Supplier emptyBucketBuilder(long owningBucketOrd) { + return () -> new StringTerms.Bucket(new BytesRef(), 0, null, showTermDocCountError, 0, format); } @Override @@ -304,10 +316,15 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator { } @Override - void updateBucket(StringTerms.Bucket spare, long bucketOrd, long docCount) throws IOException { - bucketOrds.get(bucketOrd, spare.termBytes); + void updateBucket(StringTerms.Bucket spare, BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum, long docCount) throws IOException { + ordsEnum.readValue(spare.termBytes); spare.docCount = docCount; - spare.bucketOrd = bucketOrd; + spare.bucketOrd = ordsEnum.ord(); + } + + @Override + StringTerms.Bucket[][] buildTopBucketsPerOrd(int size) { + return new StringTerms.Bucket[size][]; } @Override @@ -326,12 +343,12 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator { } @Override - void buildSubAggs(StringTerms.Bucket[] topBuckets) throws IOException { - buildSubAggsForBuckets(topBuckets, b -> b.bucketOrd, (b, a) -> b.aggregations = a); + void buildSubAggs(StringTerms.Bucket[][] topBucketsPerOrd) throws IOException { + buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a); } @Override - StringTerms buildResult(StringTerms.Bucket[] topBuckets, long otherDocCount) { + StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bucket[] topBuckets) { return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(), metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount, Arrays.asList(topBuckets), 0); @@ -354,7 +371,7 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator { private final SignificantTermsAggregatorFactory termsAggFactory; private final SignificanceHeuristic significanceHeuristic; - private long subsetSize = 0; + private LongArray subsetSizes = 
context.bigArrays().newLongArray(1, true); SignificantTermsResults(SignificantTermsAggregatorFactory termsAggFactory, SignificanceHeuristic significanceHeuristic) { this.termsAggFactory = termsAggFactory; @@ -372,17 +389,19 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator { @Override public void collect(int doc, long owningBucketOrd) throws IOException { super.collect(doc, owningBucketOrd); - subsetSize++; + subsetSizes = context.bigArrays().grow(subsetSizes, owningBucketOrd + 1); + subsetSizes.increment(owningBucketOrd, 1); } }; } @Override - void collectZeroDocEntriesIfNeeded() throws IOException {} + void collectZeroDocEntriesIfNeeded(long owningBucketOrd) throws IOException {} @Override - SignificantStringTerms.Bucket buildEmptyBucket() { - return new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format, 0); + Supplier emptyBucketBuilder(long owningBucketOrd) { + long subsetSize = subsetSizes.get(owningBucketOrd); + return () -> new SignificantStringTerms.Bucket(new BytesRef(), 0, subsetSize, 0, 0, null, format, 0); } @Override @@ -391,11 +410,12 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator { } @Override - void updateBucket(SignificantStringTerms.Bucket spare, long bucketOrd, long docCount) throws IOException { - bucketOrds.get(bucketOrd, spare.termBytes); + void updateBucket(SignificantStringTerms.Bucket spare, BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum, long docCount) + throws IOException { + + ordsEnum.readValue(spare.termBytes); + spare.bucketOrd = ordsEnum.ord(); spare.subsetDf = docCount; - spare.bucketOrd = bucketOrd; - spare.subsetSize = subsetSize; spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes); spare.supersetSize = termsAggFactory.getSupersetNumDocs(); /* @@ -406,6 +426,11 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator { spare.updateScore(significanceHeuristic); } + @Override + SignificantStringTerms.Bucket[][] buildTopBucketsPerOrd(int size) { + return new SignificantStringTerms.Bucket[size][]; + } + @Override SignificantStringTerms.Bucket[] buildBuckets(int size) { return new SignificantStringTerms.Bucket[size]; @@ -422,25 +447,33 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator { } @Override - void buildSubAggs(SignificantStringTerms.Bucket[] topBuckets) throws IOException { - buildSubAggsForBuckets(topBuckets, b -> b.bucketOrd, (b, a) -> b.aggregations = a); + void buildSubAggs(SignificantStringTerms.Bucket[][] topBucketsPerOrd) throws IOException { + buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a); } @Override - SignificantStringTerms buildResult(SignificantStringTerms.Bucket[] topBuckets, long otherDocCount) { - return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), + SignificantStringTerms buildResult(long owningBucketOrd, long otherDocCount, SignificantStringTerms.Bucket[] topBuckets) { + return new SignificantStringTerms( + name, + bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(), - metadata(), format, subsetSize, termsAggFactory.getSupersetNumDocs(), significanceHeuristic, Arrays.asList(topBuckets)); + metadata(), + format, + subsetSizes.get(owningBucketOrd), + termsAggFactory.getSupersetNumDocs(), + significanceHeuristic, + Arrays.asList(topBuckets) + ); } @Override SignificantStringTerms buildEmptyResult() { - return buildEmptySignificantTermsAggregation(subsetSize, significanceHeuristic); + 
return buildEmptySignificantTermsAggregation(0, significanceHeuristic); } @Override public void close() { - termsAggFactory.close(); + Releasables.close(termsAggFactory, subsetSizes); } } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java index 50e43d99e15..0cfc5d993bc 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java @@ -251,7 +251,7 @@ public class NumericTermsAggregator extends TermsAggregator { * Collect extra entries for "zero" hit documents if they were requested * and required. */ - abstract void collectZeroDocEntriesIfNeeded(long ord) throws IOException; + abstract void collectZeroDocEntriesIfNeeded(long owningBucketOrd) throws IOException; /** * Turn the buckets into an aggregation result. @@ -296,11 +296,11 @@ public class NumericTermsAggregator extends TermsAggregator { abstract B buildEmptyBucket(); @Override - final void collectZeroDocEntriesIfNeeded(long ord) throws IOException { + final void collectZeroDocEntriesIfNeeded(long owningBucketOrd) throws IOException { if (bucketCountThresholds.getMinDocCount() != 0) { return; } - if (InternalOrder.isCountDesc(order) && bucketOrds.bucketsInOrd(ord) >= bucketCountThresholds.getRequiredSize()) { + if (InternalOrder.isCountDesc(order) && bucketOrds.bucketsInOrd(owningBucketOrd) >= bucketCountThresholds.getRequiredSize()) { return; } // we need to fill-in the blanks @@ -312,7 +312,7 @@ public class NumericTermsAggregator extends TermsAggregator { for (int v = 0; v < valueCount; ++v) { long value = values.nextValue(); if (longFilter == null || longFilter.accept(value)) { - bucketOrds.add(ord, value); + bucketOrds.add(owningBucketOrd, value); } } } @@ -546,10 +546,10 @@ public class NumericTermsAggregator extends TermsAggregator { } @Override - void collectZeroDocEntriesIfNeeded(long ord) throws IOException {} + void collectZeroDocEntriesIfNeeded(long owningBucketOrd) throws IOException {} @Override - SignificantLongTerms buildResult(long owningBucketOrd, long otherDocCounts, SignificantLongTerms.Bucket[] topBuckets) { + SignificantLongTerms buildResult(long owningBucketOrd, long otherDocCount, SignificantLongTerms.Bucket[] topBuckets) { return new SignificantLongTerms( name, bucketCountThresholds.getRequiredSize(), diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java index 5542fec6bd0..6e89707b26b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java @@ -106,7 +106,6 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac boolean collectsFromSingleBucket, Map metadata) throws IOException { - assert collectsFromSingleBucket; ExecutionMode execution = null; if (executionHint != null) { execution = ExecutionMode.fromString(executionHint, deprecationLogger); } @@ -125,12 +124,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac } return execution.create(name, factories, valuesSource, 
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java
index 5542fec6bd0..6e89707b26b 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java
@@ -106,7 +106,6 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
                                               boolean collectsFromSingleBucket,
                                               Map<String, Object> metadata) throws IOException {

-                assert collectsFromSingleBucket;
                 ExecutionMode execution = null;
                 if (executionHint != null) {
                     execution = ExecutionMode.fromString(executionHint, deprecationLogger);
@@ -125,12 +124,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
                 }
                 return execution.create(name, factories, valuesSource,
                     format, bucketCountThresholds, includeExclude, context, parent,
-                    significanceHeuristic, sigTermsFactory, metadata);
-            }
-
-            @Override
-            public boolean needsToCollectFromSingleBucket() {
-                return true;
+                    significanceHeuristic, sigTermsFactory, collectsFromSingleBucket, metadata);
             }
         };
     }
@@ -177,11 +171,6 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
                     numericValuesSource, format, null, bucketCountThresholds, context, parent,
                     SubAggCollectionMode.BREADTH_FIRST, longFilter, collectsFromSingleBucket, metadata);
             }
-
-            @Override
-            public boolean needsToCollectFromSingleBucket() {
-                return false;
-            }
         };
     }
@@ -316,9 +305,6 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
                 aggregatorSupplier.getClass().toString() + "]");
         }
         SignificantTermsAggregatorSupplier sigTermsAggregatorSupplier = (SignificantTermsAggregatorSupplier) aggregatorSupplier;
-        if (collectsFromSingleBucket == false && sigTermsAggregatorSupplier.needsToCollectFromSingleBucket()) {
-            return asMultiBucketAggregator(this, searchContext, parent);
-        }
         numberOfAggregatorsCreated++;

         BucketCountThresholds bucketCountThresholds = new BucketCountThresholds(this.bucketCountThresholds);
@@ -359,6 +345,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
                                           Aggregator parent,
                                           SignificanceHeuristic significanceHeuristic,
                                           SignificantTermsAggregatorFactory termsAggregatorFactory,
+                                          boolean collectsFromSingleBucket,
                                           Map<String, Object> metadata) throws IOException {
             final IncludeExclude.StringFilter filter = includeExclude == null ? null : includeExclude.convertToStringFilter(format);
@@ -375,6 +362,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
                 parent,
                 SubAggCollectionMode.BREADTH_FIRST,
                 false,
+                collectsFromSingleBucket,
                 metadata
             );
@@ -394,6 +382,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
                                           Aggregator parent,
                                           SignificanceHeuristic significanceHeuristic,
                                           SignificantTermsAggregatorFactory termsAggregatorFactory,
+                                          boolean collectsFromSingleBucket,
                                           Map<String, Object> metadata) throws IOException {
             final IncludeExclude.OrdinalsFilter filter = includeExclude == null ?
                 null : includeExclude.convertToOrdinalsFilter(format);
@@ -424,6 +413,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
                 remapGlobalOrd,
                 SubAggCollectionMode.BREADTH_FIRST,
                 false,
+                collectsFromSingleBucket,
                 metadata
             );
         }
@@ -458,6 +448,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
                                  Aggregator parent,
                                  SignificanceHeuristic significanceHeuristic,
                                  SignificantTermsAggregatorFactory termsAggregatorFactory,
+                                 boolean collectsFromSingleBucket,
                                  Map<String, Object> metadata) throws IOException;

         @Override
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorSupplier.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorSupplier.java
index 24f630ca4b6..69455f884af 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorSupplier.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorSupplier.java
@@ -43,6 +43,4 @@ interface SignificantTermsAggregatorSupplier extends AggregatorSupplier {
                        SignificantTermsAggregatorFactory sigTermsFactory,
                        boolean collectsFromSingleBucket,
                        Map<String, Object> metadata) throws IOException;
-
-    boolean needsToCollectFromSingleBucket();
 }
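The factory changes above and below remove the `needsToCollectFromSingleBucket()` escape hatch: rather than wrapping the aggregator once per owning bucket via `asMultiBucketAggregator`, the `collectsFromSingleBucket` flag is now threaded down to the aggregator itself, which sizes its data structures accordingly. A rough sketch of the shape of that change, under hypothetical names (not the actual Elasticsearch classes):

    // Rough shape sketch, hypothetical names. Before: an aggregator that could only
    // collect from bucket 0 was wrapped once per owning bucket. After: the flag is
    // passed down and one instance handles all owning buckets.
    interface BucketCollector {
        void collect(int doc, long owningBucketOrd);
    }

    class TermsLikeAggregator implements BucketCollector {
        private final boolean collectsFromSingleBucket;

        TermsLikeAggregator(boolean collectsFromSingleBucket) {
            this.collectsFromSingleBucket = collectsFromSingleBucket;
        }

        @Override
        public void collect(int doc, long owningBucketOrd) {
            // When collectsFromSingleBucket is true the only legal owning ordinal is 0;
            // otherwise ords are keyed by (owningBucketOrd, term) internally.
            assert collectsFromSingleBucket == false || owningBucketOrd == 0;
        }
    }

    class Factory {
        BucketCollector create(boolean collectsFromSingleBucket) {
            // Removed pattern:
            //   if (collectsFromSingleBucket == false && supplier.needsToCollectFromSingleBucket()) {
            //       return asMultiBucketAggregator(...); // one wrapped instance per owning bucket
            //   }
            return new TermsLikeAggregator(collectsFromSingleBucket);
        }
    }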
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java
index bf2789ffd59..e7d9e5b5a8e 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java
@@ -83,8 +83,6 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
                                           boolean showTermDocCountError,
                                           boolean collectsFromSingleBucket,
                                           Map<String, Object> metadata) throws IOException {
-                assert collectsFromSingleBucket;
-
                 ExecutionMode execution = null;
                 if (executionHint != null) {
                     execution = ExecutionMode.fromString(executionHint);
@@ -110,14 +108,9 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
                 // TODO: [Zach] we might want refactor and remove ExecutionMode#create(), moving that logic outside the enum
                 return execution.create(name, factories, valuesSource, order, format, bucketCountThresholds, includeExclude,
-                    context, parent, subAggCollectMode, showTermDocCountError, metadata);
+                    context, parent, subAggCollectMode, showTermDocCountError, collectsFromSingleBucket, metadata);
             }
-
-            @Override
-            public boolean needsToCollectFromSingleBucket() {
-                return true;
-            }
         };
     }
@@ -170,11 +163,6 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
                 return new NumericTermsAggregator(name, factories, resultStrategy, numericValuesSource, format, order,
                     bucketCountThresholds, context, parent, subAggCollectMode, longFilter, collectsFromSingleBucket, metadata);
             }
-
-            @Override
-            public boolean needsToCollectFromSingleBucket() {
-                return false;
-            }
         };
     }
@@ -248,10 +236,6 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
         }
         TermsAggregatorSupplier termsAggregatorSupplier = (TermsAggregatorSupplier) aggregatorSupplier;

-        if (collectsFromSingleBucket == false && termsAggregatorSupplier.needsToCollectFromSingleBucket()) {
-            return asMultiBucketAggregator(this, searchContext, parent);
-        }
-
         BucketCountThresholds bucketCountThresholds = new BucketCountThresholds(this.bucketCountThresholds);
         if (InternalOrder.isKeyOrder(order) == false
             && bucketCountThresholds.getShardSize() == TermsAggregationBuilder.DEFAULT_BUCKET_COUNT_THRESHOLDS.getShardSize()) {
@@ -322,6 +306,7 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
                                  Aggregator parent,
                                  SubAggCollectionMode subAggCollectMode,
                                  boolean showTermDocCountError,
+                                 boolean collectsFromSingleBucket,
                                  Map<String, Object> metadata) throws IOException {
             final IncludeExclude.StringFilter filter = includeExclude == null ? null : includeExclude.convertToStringFilter(format);
             return new MapStringTermsAggregator(
@@ -337,6 +322,7 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
                 parent,
                 subAggCollectMode,
                 showTermDocCountError,
+                collectsFromSingleBucket,
                 metadata
             );
         }
@@ -354,6 +340,7 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
                                  SearchContext context, Aggregator parent,
                                  SubAggCollectionMode subAggCollectMode,
                                  boolean showTermDocCountError,
+                                 boolean collectsFromSingleBucket,
                                  Map<String, Object> metadata) throws IOException {

             final long maxOrd = getMaxOrd(valuesSource, context.searcher());
@@ -385,8 +372,13 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
             }
             final IncludeExclude.OrdinalsFilter filter = includeExclude == null ? null : includeExclude.convertToOrdinalsFilter(format);
             boolean remapGlobalOrds;
-            if (REMAP_GLOBAL_ORDS != null) {
-                // We use REMAP_GLOBAL_ORDS to allow tests to force specific optimizations
+            if (collectsFromSingleBucket && REMAP_GLOBAL_ORDS != null) {
+                /*
+                 * We use REMAP_GLOBAL_ORDS to allow tests to force
+                 * specific optimizations but this particular one
+                 * is only possible if we're collecting from a single
+                 * bucket.
+                 */
                 remapGlobalOrds = REMAP_GLOBAL_ORDS.booleanValue();
             } else {
                 remapGlobalOrds = true;
@@ -418,6 +410,7 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
                 remapGlobalOrds,
                 subAggCollectMode,
                 showTermDocCountError,
+                collectsFromSingleBucket,
                 metadata
             );
         }
@@ -451,6 +444,7 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
                        Aggregator parent,
                        SubAggCollectionMode subAggCollectMode,
                        boolean showTermDocCountError,
+                       boolean collectsFromSingleBucket,
                        Map<String, Object> metadata) throws IOException;

         @Override
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorSupplier.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorSupplier.java
index e862e8f4fd5..8f91178dbe3 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorSupplier.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorSupplier.java
@@ -44,6 +44,4 @@ interface TermsAggregatorSupplier extends AggregatorSupplier {
                     boolean showTermDocCountError,
                     boolean collectsFromSingleBucket,
                     Map<String, Object> metadata) throws IOException;
-
-    boolean needsToCollectFromSingleBucket();
 }
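The new test file below pins down the `BytesKeyedBucketOrds#add` contract: the first insertion of a `(owningBucketOrd, value)` pair is assigned the next free ordinal, and re-inserting the same pair returns `-1 - existingOrd`. A toy model of that contract, with a `HashMap` standing in for the BigArrays-backed hash (illustration only, hypothetical names):

    // Toy model of the "add" contract exercised by the test below: first insertion
    // of a (owningBucketOrd, value) pair assigns the next ordinal; re-insertion
    // returns -1 - existingOrd.
    import java.util.HashMap;
    import java.util.Map;

    class ToyBucketOrds {
        private final Map<String, Long> ords = new HashMap<>();

        long add(long owningBucketOrd, String value) {
            String key = owningBucketOrd + "/" + value;
            Long existing = ords.get(key);
            if (existing != null) {
                return -1 - existing; // already present: encode the old ordinal
            }
            long ord = ords.size();
            ords.put(key, ord);
            return ord;
        }

        public static void main(String[] args) {
            ToyBucketOrds ords = new ToyBucketOrds();
            System.out.println(ords.add(0, "Just Read The Instructions")); // 0
            System.out.println(ords.add(0, "Of Course I Still Love You")); // 1
            System.out.println(ords.add(0, "Just Read The Instructions")); // -1
            System.out.println(ords.add(0, "Of Course I Still Love You")); // -2
        }
    }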
diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/BytesKeyedBucketOrdsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/BytesKeyedBucketOrdsTests.java
new file mode 100644
index 00000000000..b2dd644598d
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/BytesKeyedBucketOrdsTests.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.terms;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.MockBigArrays;
+import org.elasticsearch.common.util.MockPageCacheRecycler;
+import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class BytesKeyedBucketOrdsTests extends ESTestCase {
+    private static final BytesRef SHIP_1 = new BytesRef("Just Read The Instructions");
+    private static final BytesRef SHIP_2 = new BytesRef("Of Course I Still Love You");
+
+    private final MockBigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
+
+    public void testExplicitCollectsFromSingleBucket() {
+        collectsFromSingleBucketCase(BytesKeyedBucketOrds.build(bigArrays, true));
+    }
+
+    public void testSurpriseCollectsFromSingleBucket() {
+        collectsFromSingleBucketCase(BytesKeyedBucketOrds.build(bigArrays, false));
+    }
+
+    private void collectsFromSingleBucketCase(BytesKeyedBucketOrds ords) {
+        try {
+            // Test a few explicit values
+            assertThat(ords.add(0, SHIP_1), equalTo(0L));
+            assertThat(ords.add(0, SHIP_2), equalTo(1L));
+            assertThat(ords.add(0, SHIP_1), equalTo(-1L));
+            assertThat(ords.add(0, SHIP_2), equalTo(-2L));
+
+            // And some random values
+            Set<BytesRef> seen = new HashSet<>();
+            seen.add(SHIP_1);
+            seen.add(SHIP_2);
+            assertThat(ords.size(), equalTo(2L));
+            BytesRef[] values = new BytesRef[scaledRandomIntBetween(1, 10000)];
+            for (int i = 0; i < values.length; i++) {
+                values[i] = randomValueOtherThanMany(seen::contains, () -> new BytesRef(Long.toString(randomLong())));
+                seen.add(values[i]);
+            }
+            for (int i = 0; i < values.length; i++) {
+                assertThat(ords.add(0, values[i]), equalTo(i + 2L));
+                assertThat(ords.size(), equalTo(i + 3L));
+                if (randomBoolean()) {
+                    assertThat(ords.add(0, SHIP_1), equalTo(-1L));
+                }
+            }
+            for (int i = 0; i < values.length; i++) {
+                assertThat(ords.add(0, values[i]), equalTo(-1 - (i + 2L)));
+            }
+
+            // And the explicit values are still ok
+            assertThat(ords.add(0, SHIP_1), equalTo(-1L));
+            assertThat(ords.add(0, SHIP_2), equalTo(-2L));
+
+            // Check counting values
+            assertThat(ords.bucketsInOrd(0), equalTo(values.length + 2L));
+
+            // Check iteration
+            BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = ords.ordsEnum(0);
+            BytesRef scratch = new BytesRef();
+            assertTrue(ordsEnum.next());
+            assertThat(ordsEnum.ord(), equalTo(0L));
+            ordsEnum.readValue(scratch);
+            assertThat(scratch, equalTo(SHIP_1));
+            assertTrue(ordsEnum.next());
+            assertThat(ordsEnum.ord(), equalTo(1L));
+            ordsEnum.readValue(scratch);
+            assertThat(scratch, equalTo(SHIP_2));
+            for (int i = 0; i < values.length; i++) {
+                assertTrue(ordsEnum.next());
+                assertThat(ordsEnum.ord(), equalTo(i + 2L));
+                ordsEnum.readValue(scratch);
+                assertThat(scratch, equalTo(values[i]));
+            }
+            assertFalse(ordsEnum.next());
+        } finally {
+            ords.close();
+        }
+    }
+
+    public void testCollectsFromManyBuckets() {
+        try (BytesKeyedBucketOrds ords = BytesKeyedBucketOrds.build(bigArrays, false)) {
+            // Test a few explicit values
+            assertThat(ords.add(0, SHIP_1), equalTo(0L));
+            assertThat(ords.add(1, SHIP_1), equalTo(1L));
+            assertThat(ords.add(0, SHIP_1), equalTo(-1L));
+            assertThat(ords.add(1, SHIP_1), equalTo(-2L));
+            assertThat(ords.size(), equalTo(2L));
+
+            // And some random values
+            Set<OwningBucketOrdAndValue> seen = new HashSet<>();
+            seen.add(new OwningBucketOrdAndValue(0, SHIP_1));
+            seen.add(new OwningBucketOrdAndValue(1, SHIP_1));
+            OwningBucketOrdAndValue[] values = new OwningBucketOrdAndValue[scaledRandomIntBetween(1, 10000)];
+            long maxOwningBucketOrd = scaledRandomIntBetween(0, values.length);
+            for (int i = 0; i < values.length; i++) {
+                values[i] = randomValueOtherThanMany(seen::contains, () ->
+                    new OwningBucketOrdAndValue(randomLongBetween(0, maxOwningBucketOrd), new BytesRef(Long.toString(randomLong()))));
+                seen.add(values[i]);
+            }
+            for (int i = 0; i < values.length; i++) {
+                assertThat(ords.add(values[i].owningBucketOrd, values[i].value), equalTo(i + 2L));
+                assertThat(ords.size(), equalTo(i + 3L));
+                if (randomBoolean()) {
+                    assertThat(ords.add(0, SHIP_1), equalTo(-1L));
+                }
+            }
+            for (int i = 0; i < values.length; i++) {
+                assertThat(ords.add(values[i].owningBucketOrd, values[i].value), equalTo(-1 - (i + 2L)));
+            }
+
+            // And the explicit values are still ok
+            assertThat(ords.add(0, SHIP_1), equalTo(-1L));
+            assertThat(ords.add(1, SHIP_1), equalTo(-2L));
+
+            BytesRef scratch = new BytesRef();
+            for (long owningBucketOrd = 0; owningBucketOrd <= maxOwningBucketOrd; owningBucketOrd++) {
+                long expectedCount = 0;
+                BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = ords.ordsEnum(owningBucketOrd);
+                if (owningBucketOrd <= 1) {
+                    expectedCount++;
+                    assertTrue(ordsEnum.next());
+                    assertThat(ordsEnum.ord(), equalTo(owningBucketOrd));
+                    ordsEnum.readValue(scratch);
+                    assertThat(scratch, equalTo(SHIP_1));
+                }
+                for (int i = 0; i < values.length; i++) {
+                    if (values[i].owningBucketOrd == owningBucketOrd) {
+                        expectedCount++;
+                        assertTrue(ordsEnum.next());
+                        assertThat(ordsEnum.ord(), equalTo(i + 2L));
+                        ordsEnum.readValue(scratch);
+                        assertThat(scratch, equalTo(values[i].value));
+                    }
+                }
+                assertFalse(ordsEnum.next());
+
+                assertThat(ords.bucketsInOrd(owningBucketOrd), equalTo(expectedCount));
+            }
+            assertFalse(ords.ordsEnum(randomLongBetween(maxOwningBucketOrd + 1, Long.MAX_VALUE)).next());
+            assertThat(ords.bucketsInOrd(randomLongBetween(maxOwningBucketOrd + 1, Long.MAX_VALUE)), equalTo(0L));
+        }
+    }
+
+    private class OwningBucketOrdAndValue {
+        private final long owningBucketOrd;
+        private final BytesRef value;
+
+        OwningBucketOrdAndValue(long owningBucketOrd, BytesRef value) {
+            this.owningBucketOrd = owningBucketOrd;
+            this.value = value;
+        }
+
+        @Override
+        public String toString() {
+            return owningBucketOrd + "/" + value;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            OwningBucketOrdAndValue other = (OwningBucketOrdAndValue) obj;
+            return owningBucketOrd == other.owningBucketOrd && value.equals(other.value);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(owningBucketOrd, value);
+        }
+    }
+}
diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/LongKeyedBucketOrdsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/LongKeyedBucketOrdsTests.java
index 99cab4dda80..3863689c4f5 100644
--- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/LongKeyedBucketOrdsTests.java
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/LongKeyedBucketOrdsTests.java
@@ -49,6 +49,8 @@ public class LongKeyedBucketOrdsTests extends ESTestCase {
         assertThat(ords.add(0, 1000), equalTo(1L));
         assertThat(ords.add(0, 0), equalTo(-1L));
         assertThat(ords.add(0, 1000), equalTo(-2L));
+        assertThat(ords.find(0, 0), equalTo(0L));
+        assertThat(ords.find(0, 1000), equalTo(1L));

         // And some random values
         Set<Long> seen = new HashSet<>();
@@ -61,7 +63,9 @@ public class LongKeyedBucketOrdsTests extends ESTestCase {
             seen.add(values[i]);
         }
         for (int i = 0; i < values.length; i++) {
+            assertThat(ords.find(0, values[i]), equalTo(-1L));
             assertThat(ords.add(0, values[i]), equalTo(i + 2L));
+            assertThat(ords.find(0, values[i]), equalTo(i + 2L));
             assertThat(ords.size(), equalTo(i + 3L));
             if (randomBoolean()) {
                 assertThat(ords.add(0, 0), equalTo(-1L));
@@ -79,19 +83,19 @@ public class LongKeyedBucketOrdsTests extends ESTestCase {
         assertThat(ords.bucketsInOrd(0), equalTo(values.length + 2L));

         // Check iteration
-        LongKeyedBucketOrds.BucketOrdsEnum ordEnum = ords.ordsEnum(0);
-        assertTrue(ordEnum.next());
-        assertThat(ordEnum.ord(), equalTo(0L));
-        assertThat(ordEnum.value(), equalTo(0L));
-        assertTrue(ordEnum.next());
-        assertThat(ordEnum.ord(), equalTo(1L));
-        assertThat(ordEnum.value(), equalTo(1000L));
+        LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = ords.ordsEnum(0);
+        assertTrue(ordsEnum.next());
+        assertThat(ordsEnum.ord(), equalTo(0L));
+        assertThat(ordsEnum.value(), equalTo(0L));
+        assertTrue(ordsEnum.next());
+        assertThat(ordsEnum.ord(), equalTo(1L));
+        assertThat(ordsEnum.value(), equalTo(1000L));
         for (int i = 0; i < values.length; i++) {
-            assertTrue(ordEnum.next());
-            assertThat(ordEnum.ord(), equalTo(i + 2L));
-            assertThat(ordEnum.value(), equalTo(values[i]));
+            assertTrue(ordsEnum.next());
+            assertThat(ordsEnum.ord(), equalTo(i + 2L));
+            assertThat(ordsEnum.value(), equalTo(values[i]));
         }
-        assertFalse(ordEnum.next());
+        assertFalse(ordsEnum.next());
     } finally {
         ords.close();
     }
@@ -105,6 +109,8 @@ public class LongKeyedBucketOrdsTests extends ESTestCase {
         assertThat(ords.add(0, 0), equalTo(-1L));
         assertThat(ords.add(1, 0), equalTo(-2L));
         assertThat(ords.size(), equalTo(2L));
+        assertThat(ords.find(0, 0), equalTo(0L));
+        assertThat(ords.find(1, 0), equalTo(1L));

         // And some random values
         Set<OwningBucketOrdAndValue> seen = new HashSet<>();
@@ -118,7 +124,9 @@ public class LongKeyedBucketOrdsTests extends ESTestCase {
             seen.add(values[i]);
         }
         for (int i = 0; i < values.length; i++) {
+            assertThat(ords.find(values[i].owningBucketOrd, values[i].value), equalTo(-1L));
             assertThat(ords.add(values[i].owningBucketOrd, values[i].value), equalTo(i + 2L));
+            assertThat(ords.find(values[i].owningBucketOrd, values[i].value), equalTo(i + 2L));
             assertThat(ords.size(), equalTo(i + 3L));
             if (randomBoolean()) {
                 assertThat(ords.add(0, 0), equalTo(-1L));
@@ -135,22 +143,22 @@ public class LongKeyedBucketOrdsTests extends ESTestCase {
         for (long owningBucketOrd = 0; owningBucketOrd <= maxOwningBucketOrd; owningBucketOrd++) {
             long expectedCount = 0;
-            LongKeyedBucketOrds.BucketOrdsEnum ordEnum = ords.ordsEnum(owningBucketOrd);
+            LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = ords.ordsEnum(owningBucketOrd);
             if (owningBucketOrd <= 1) {
                 expectedCount++;
-                assertTrue(ordEnum.next());
-                assertThat(ordEnum.ord(), equalTo(owningBucketOrd));
-                assertThat(ordEnum.value(), equalTo(0L));
+                assertTrue(ordsEnum.next());
+                assertThat(ordsEnum.ord(), equalTo(owningBucketOrd));
+                assertThat(ordsEnum.value(), equalTo(0L));
             }
             for (int i = 0; i < values.length; i++) {
                 if (values[i].owningBucketOrd == owningBucketOrd) {
                     expectedCount++;
-                    assertTrue(ordEnum.next());
-                    assertThat(ordEnum.ord(), equalTo(i + 2L));
-                    assertThat(ordEnum.value(), equalTo(values[i].value));
+                    assertTrue(ordsEnum.next());
+                    assertThat(ordsEnum.ord(), equalTo(i + 2L));
+                    assertThat(ordsEnum.value(), equalTo(values[i].value));
                 }
             }
-            assertFalse(ordEnum.next());
+            assertFalse(ordsEnum.next());

             assertThat(ords.bucketsInOrd(owningBucketOrd), equalTo(expectedCount));
         }
diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java
index d7235c08f28..a372afc54b5 100644
--- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java
@@ -1307,6 +1307,54 @@ public class TermsAggregatorTests extends AggregatorTestCase {
         }, fieldType);
     }

+    public void testThreeLayerStringViaGlobalOrds() throws IOException {
+        threeLayerStringTestCase("global_ordinals");
+    }
+
+    public void testThreeLayerStringViaMap() throws IOException {
+        threeLayerStringTestCase("map");
+    }
+
+    private void threeLayerStringTestCase(String executionHint) throws IOException {
+        try (Directory dir = newDirectory()) {
+            try (RandomIndexWriter writer = new RandomIndexWriter(random(), dir)) {
+                for (int i = 0; i < 10; i++) {
+                    for (int j = 0; j < 10; j++) {
+                        for (int k = 0; k < 10; k++) {
+                            Document d = new Document();
+                            d.add(new SortedDocValuesField("i", new BytesRef(Integer.toString(i))));
+                            d.add(new SortedDocValuesField("j", new BytesRef(Integer.toString(j))));
+                            d.add(new SortedDocValuesField("k", new BytesRef(Integer.toString(k))));
+                            writer.addDocument(d);
+                        }
+                    }
+                }
+                try (IndexReader reader = maybeWrapReaderEs(writer.getReader())) {
+                    IndexSearcher searcher = newIndexSearcher(reader);
+                    TermsAggregationBuilder request = new TermsAggregationBuilder("i").field("i").executionHint(executionHint)
+                        .subAggregation(new TermsAggregationBuilder("j").field("j").executionHint(executionHint)
+                            .subAggregation(new TermsAggregationBuilder("k").field("k").executionHint(executionHint)));
+                    StringTerms result = search(searcher, new MatchAllDocsQuery(), request,
+                        keywordField("i"), keywordField("j"), keywordField("k"));
+                    for (int i = 0; i < 10; i++) {
+                        StringTerms.Bucket iBucket = result.getBucketByKey(Integer.toString(i));
+                        assertThat(iBucket.getDocCount(), equalTo(100L));
+                        StringTerms jAgg = iBucket.getAggregations().get("j");
+                        for (int j = 0; j < 10; j++) {
+                            StringTerms.Bucket jBucket = jAgg.getBucketByKey(Integer.toString(j));
+                            assertThat(jBucket.getDocCount(), equalTo(10L));
+                            StringTerms kAgg = jBucket.getAggregations().get("k");
+                            for (int k = 0; k < 10; k++) {
+                                StringTerms.Bucket kBucket = kAgg.getBucketByKey(Integer.toString(k));
+                                assertThat(kBucket.getDocCount(), equalTo(1L));
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+
     public void testThreeLayerLong() throws IOException {
         try (Directory dir = newDirectory()) {
             try (RandomIndexWriter writer = new RandomIndexWriter(random(), dir)) {
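The doc counts asserted in `threeLayerStringTestCase` above follow from the indexing loop: one document per `(i, j, k)` triple with each axis in `[0, 10)`, so every `i` bucket holds 100 docs, every `(i, j)` bucket 10, and every `(i, j, k)` bucket exactly 1. A quick sanity check of that arithmetic:

    // Quick check of the doc-count arithmetic asserted above: 10 * 10 * 10 docs,
    // one per (i, j, k) triple.
    class ThreeLayerCounts {
        public static void main(String[] args) {
            int perAxis = 10;
            long total = (long) perAxis * perAxis * perAxis;
            System.out.println(total);                                 // 1000 docs in the index
            System.out.println(total / perAxis);                       // 100 docs per i bucket
            System.out.println(total / (perAxis * perAxis));           // 10 docs per (i, j) bucket
            System.out.println(total / (perAxis * perAxis * perAxis)); // 1 doc per (i, j, k) bucket
        }
    }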