Refactor children aggregator into a generic ParentJoinAggregator (#34845)

This commit adds a new ParentJoinAggregator that implements a join using global ordinals
in a way that can be reused by the `children` and the upcoming `parent` aggregation.
This new aggregator is a refactor of the existing ParentToChildrenAggregator with two main changes:
* It uses a dense bit array instead of a long array when the aggregation does not have any parent.
* It uses a single aggregator per bucket if it is nested under another aggregation.
For the latter case we use a `MultiBucketAggregatorWrapper` in the factory in order to ensure that each
instance of the aggregator handles a single bucket. This is more in line with the strategy we use for other
aggregations, such as the `terms` aggregation, since the number of buckets to handle should be low (thanks to the breadth_first strategy).
This change is also required for #34210 which adds the `parent` aggregation in the parent-join module.

Relates #34508
This commit is contained in:
Jim Ferenczi 2018-10-26 16:26:45 +02:00 committed by GitHub
parent 5c2c1f44c8
commit 1b879ea8ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 209 additions and 162 deletions

View File

@ -35,39 +35,49 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
public class ChildrenAggregatorFactory
extends ValuesSourceAggregatorFactory<WithOrdinals, ChildrenAggregatorFactory> {
public class ChildrenAggregatorFactory extends ValuesSourceAggregatorFactory<WithOrdinals, ChildrenAggregatorFactory> {
private final Query parentFilter;
private final Query childFilter;
public ChildrenAggregatorFactory(String name, ValuesSourceConfig<WithOrdinals> config,
Query childFilter, Query parentFilter, SearchContext context, AggregatorFactory<?> parent,
AggregatorFactories.Builder subFactoriesBuilder, Map<String, Object> metaData) throws IOException {
public ChildrenAggregatorFactory(String name,
ValuesSourceConfig<WithOrdinals> config,
Query childFilter,
Query parentFilter,
SearchContext context,
AggregatorFactory<?> parent,
AggregatorFactories.Builder subFactoriesBuilder,
Map<String, Object> metaData) throws IOException {
super(name, config, context, parent, subFactoriesBuilder, metaData);
this.childFilter = childFilter;
this.parentFilter = parentFilter;
}
@Override
protected Aggregator createUnmapped(Aggregator parent, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
throws IOException {
protected Aggregator createUnmapped(Aggregator parent,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
return new NonCollectingAggregator(name, context, parent, pipelineAggregators, metaData) {
@Override
public InternalAggregation buildEmptyAggregation() {
return new InternalChildren(name, 0, buildEmptySubAggregations(), pipelineAggregators(), metaData());
}
};
}
@Override
protected Aggregator doCreateInternal(WithOrdinals valuesSource, Aggregator parent,
boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
throws IOException {
protected Aggregator doCreateInternal(WithOrdinals valuesSource,
Aggregator parent,
boolean collectsFromSingleBucket,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
long maxOrd = valuesSource.globalMaxOrd(context.searcher());
return new ParentToChildrenAggregator(name, factories, context, parent, childFilter,
parentFilter, valuesSource, maxOrd, pipelineAggregators, metaData);
if (collectsFromSingleBucket) {
return new ParentToChildrenAggregator(name, factories, context, parent, childFilter,
parentFilter, valuesSource, maxOrd, pipelineAggregators, metaData);
} else {
return asMultiBucketAggregator(this, context, parent);
}
}
}

View File

@ -0,0 +1,173 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.join.aggregations;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.Bits;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
 * An aggregator that joins documents based on global ordinals.
 * Global ordinals that match the main query and the <code>inFilter</code> query are replayed
 * with documents matching the <code>outFilter</code> query.
 *
 * <p>The join runs in two phases: {@link #getLeafCollector} records the global ordinal of every
 * collected document that matches <code>inFilter</code>, and {@link #doPostCollection} then walks
 * the documents matching <code>outFilter</code> and collects those whose global ordinal was recorded.
 */
public abstract class ParentJoinAggregator extends BucketsAggregator implements SingleBucketAggregator {
    private final Weight inFilter;
    private final Weight outFilter;
    private final ValuesSource.Bytes.WithOrdinals valuesSource;
    /** <code>true</code> when this aggregator was created without a parent aggregator. */
    private final boolean singleAggregator;

    /**
     * If this aggregator is nested under another aggregator we allocate a long hash per bucket.
     * <code>null</code> when {@link #singleAggregator} is <code>true</code>.
     */
    private final LongHash ordsHash;
    /**
     * Otherwise we use a dense bit array to record the global ordinals.
     * <code>null</code> when {@link #singleAggregator} is <code>false</code>.
     */
    private final BitArray ordsBit;

    /**
     * Creates the aggregator and the structure used to record global ordinals.
     *
     * @param inFilter  query selecting the documents whose global ordinals are recorded during collection
     * @param outFilter query selecting the documents that are replayed in {@link #doPostCollection}
     * @param maxOrd    the maximum global ordinal; must not exceed {@link Integer#MAX_VALUE}
     * @throws IllegalStateException if <code>maxOrd</code> is greater than {@link Integer#MAX_VALUE}
     * @throws IOException           if rewriting the filters or creating their weights fails
     */
    public ParentJoinAggregator(String name,
                                AggregatorFactories factories,
                                SearchContext context,
                                Aggregator parent,
                                Query inFilter,
                                Query outFilter,
                                ValuesSource.Bytes.WithOrdinals valuesSource,
                                long maxOrd,
                                List<PipelineAggregator> pipelineAggregators,
                                Map<String, Object> metaData) throws IOException {
        super(name, factories, context, parent, pipelineAggregators, metaData);

        // Global ordinals are narrowed to int below, so reject anything that would not fit.
        if (maxOrd > Integer.MAX_VALUE) {
            throw new IllegalStateException("the number of parent [" + maxOrd + "] is greater than the allowed limit " +
                "for this aggregation: " + Integer.MAX_VALUE);
        }

        // these two filters are cached in the parser
        this.inFilter = context.searcher().createWeight(context.searcher().rewrite(inFilter), ScoreMode.COMPLETE_NO_SCORES, 1f);
        this.outFilter = context.searcher().createWeight(context.searcher().rewrite(outFilter), ScoreMode.COMPLETE_NO_SCORES, 1f);
        this.valuesSource = valuesSource;
        this.singleAggregator = parent == null;
        // Exactly one of the two structures is allocated; the other stays null.
        this.ordsBit = singleAggregator ? new BitArray((int) maxOrd, context.bigArrays()) : null;
        this.ordsHash = singleAggregator ? null : new LongHash(1, context.bigArrays());
    }

    /** Records <code>globalOrdinal</code> in whichever structure this instance allocated. */
    private void addGlobalOrdinal(int globalOrdinal) {
        if (singleAggregator) {
            ordsBit.set(globalOrdinal);
        } else {
            ordsHash.add(globalOrdinal);
        }
    }

    /** Returns <code>true</code> if <code>globalOrdinal</code> was previously recorded. */
    private boolean existsGlobalOrdinal(int globalOrdinal) {
        return singleAggregator ? ordsBit.get(globalOrdinal): ordsHash.find(globalOrdinal) >= 0;
    }

    /**
     * Returns a collector that records the global ordinal of each collected document that matches
     * <code>inFilter</code> and has a value. Nothing is collected into sub-aggregators here; that
     * happens in {@link #doPostCollection}.
     */
    @Override
    public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
                                                      final LeafBucketCollector sub) throws IOException {
        if (valuesSource == null) {
            return LeafBucketCollector.NO_OP_COLLECTOR;
        }
        final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx);
        final Bits parentDocs = Lucene.asSequentialAccessBits(ctx.reader().maxDoc(), inFilter.scorerSupplier(ctx));
        return new LeafBucketCollector() {
            @Override
            public void collect(int docId, long bucket) throws IOException {
                // Each instance is expected to handle a single bucket.
                assert bucket == 0;
                if (parentDocs.get(docId) && globalOrdinals.advanceExact(docId)) {
                    int globalOrdinal = (int) globalOrdinals.nextOrd();
                    // A document is expected to carry exactly one ordinal.
                    assert globalOrdinal != -1 && globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS;
                    addGlobalOrdinal(globalOrdinal);
                }
            }
        };
    }

    /**
     * Replays, for every leaf of the index reader, the live documents matching <code>outFilter</code>
     * and collects into bucket <code>0</code> those whose global ordinal was recorded during collection.
     */
    @Override
    protected final void doPostCollection() throws IOException {
        IndexReader indexReader = context().searcher().getIndexReader();
        for (LeafReaderContext ctx : indexReader.leaves()) {
            Scorer childDocsScorer = outFilter.scorer(ctx);
            if (childDocsScorer == null) {
                // No document matches the out filter in this segment.
                continue;
            }
            DocIdSetIterator childDocsIter = childDocsScorer.iterator();

            final LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(ctx);

            final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx);
            // Set the scorer, since we now replay only the child docIds
            sub.setScorer(new Scorable() {
                @Override
                public float score() {
                    return 1f;
                }

                @Override
                public int docID() {
                    return childDocsIter.docID();
                }
            });

            final Bits liveDocs = ctx.reader().getLiveDocs();
            for (int docId = childDocsIter.nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = childDocsIter.nextDoc()) {
                // A null liveDocs means the segment has no deletions.
                if (liveDocs != null && liveDocs.get(docId) == false) {
                    continue;
                }
                if (globalOrdinals.advanceExact(docId)) {
                    int globalOrdinal = (int) globalOrdinals.nextOrd();
                    assert globalOrdinal != -1 && globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS;
                    if (existsGlobalOrdinal(globalOrdinal)) {
                        collectBucket(sub, docId, 0);
                    }
                }
            }
        }
    }

    /** Releases the ordinal-recording structure; the one that was not allocated is passed as <code>null</code>. */
    @Override
    protected void doClose() {
        Releasables.close(ordsBit, ordsHash);
    }
}

View File

@ -18,73 +18,28 @@
*/
package org.elasticsearch.join.aggregations;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.Bits;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.LongObjectPagedHashMap;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
// The RecordingPerReaderBucketCollector assumes per segment recording which isn't the case for this
// aggregation, for this reason that collector can't be used
public class ParentToChildrenAggregator extends BucketsAggregator implements SingleBucketAggregator {
public class ParentToChildrenAggregator extends ParentJoinAggregator {
static final ParseField TYPE_FIELD = new ParseField("type");
private final Weight childFilter;
private final Weight parentFilter;
private final ValuesSource.Bytes.WithOrdinals valuesSource;
// Maybe use PagedGrowableWriter? This will be less wasteful than LongArray,
// but then we don't have the reuse feature of BigArrays.
// Also if we know the highest possible value that a parent agg will create
// then we store multiple values into one slot
private final LongArray parentOrdToBuckets;
// Only pay the extra storage price if the a parentOrd has multiple buckets
// Most of the times a parent doesn't have multiple buckets, since there is
// only one document per parent ord,
// only in the case of terms agg if a parent doc has multiple terms per
// field this is needed:
private final LongObjectPagedHashMap<long[]> parentOrdToOtherBuckets;
private boolean multipleBucketsPerParentOrd = false;
public ParentToChildrenAggregator(String name, AggregatorFactories factories,
SearchContext context, Aggregator parent, Query childFilter,
Query parentFilter, ValuesSource.Bytes.WithOrdinals valuesSource,
long maxOrd, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
throws IOException {
super(name, factories, context, parent, pipelineAggregators, metaData);
// these two filters are cached in the parser
this.childFilter = context.searcher().createWeight(context.searcher().rewrite(childFilter), ScoreMode.COMPLETE_NO_SCORES, 1f);
this.parentFilter = context.searcher().createWeight(context.searcher().rewrite(parentFilter), ScoreMode.COMPLETE_NO_SCORES, 1f);
this.parentOrdToBuckets = context.bigArrays().newLongArray(maxOrd, false);
this.parentOrdToBuckets.fill(0, maxOrd, -1);
this.parentOrdToOtherBuckets = new LongObjectPagedHashMap<>(context.bigArrays());
this.valuesSource = valuesSource;
long maxOrd, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
super(name, factories, context, parent, parentFilter, childFilter, valuesSource, maxOrd, pipelineAggregators, metaData);
}
@Override
@ -99,96 +54,4 @@ public class ParentToChildrenAggregator extends BucketsAggregator implements Sin
metaData());
}
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
if (valuesSource == null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
}
final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx);
final Bits parentDocs = Lucene.asSequentialAccessBits(ctx.reader().maxDoc(), parentFilter.scorerSupplier(ctx));
return new LeafBucketCollector() {
@Override
public void collect(int docId, long bucket) throws IOException {
if (parentDocs.get(docId) && globalOrdinals.advanceExact(docId)) {
long globalOrdinal = globalOrdinals.nextOrd();
assert globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS;
if (globalOrdinal != -1) {
if (parentOrdToBuckets.get(globalOrdinal) == -1) {
parentOrdToBuckets.set(globalOrdinal, bucket);
} else {
long[] bucketOrds = parentOrdToOtherBuckets.get(globalOrdinal);
if (bucketOrds != null) {
bucketOrds = Arrays.copyOf(bucketOrds, bucketOrds.length + 1);
bucketOrds[bucketOrds.length - 1] = bucket;
parentOrdToOtherBuckets.put(globalOrdinal, bucketOrds);
} else {
parentOrdToOtherBuckets.put(globalOrdinal, new long[] { bucket });
}
multipleBucketsPerParentOrd = true;
}
}
}
}
};
}
@Override
protected void doPostCollection() throws IOException {
IndexReader indexReader = context().searcher().getIndexReader();
for (LeafReaderContext ctx : indexReader.leaves()) {
Scorer childDocsScorer = childFilter.scorer(ctx);
if (childDocsScorer == null) {
continue;
}
DocIdSetIterator childDocsIter = childDocsScorer.iterator();
final LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(ctx);
final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx);
// Set the scorer, since we now replay only the child docIds
sub.setScorer(new Scorable() {
@Override
public float score() {
return 1f;
}
@Override
public int docID() {
return childDocsIter.docID();
}
});
final Bits liveDocs = ctx.reader().getLiveDocs();
for (int docId = childDocsIter
.nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = childDocsIter
.nextDoc()) {
if (liveDocs != null && liveDocs.get(docId) == false) {
continue;
}
if (globalOrdinals.advanceExact(docId)) {
long globalOrdinal = globalOrdinals.nextOrd();
assert globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS;
long bucketOrd = parentOrdToBuckets.get(globalOrdinal);
if (bucketOrd != -1) {
collectBucket(sub, docId, bucketOrd);
if (multipleBucketsPerParentOrd) {
long[] otherBucketOrds = parentOrdToOtherBuckets.get(globalOrdinal);
if (otherBucketOrds != null) {
for (long otherBucketOrd : otherBucketOrds) {
collectBucket(sub, docId, otherBucketOrd);
}
}
}
}
}
}
}
}
@Override
protected void doClose() {
Releasables.close(parentOrdToBuckets, parentOrdToOtherBuckets);
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.composite;
package org.elasticsearch.common.util;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
@ -30,11 +30,11 @@ import org.elasticsearch.common.util.LongArray;
* The underlying long array grows lazily based on the biggest index
* that needs to be set.
*/
final class BitArray implements Releasable {
public final class BitArray implements Releasable {
private final BigArrays bigArrays;
private LongArray bits;
BitArray(BigArrays bigArrays, int initialSize) {
public BitArray(int initialSize, BigArrays bigArrays) {
this.bigArrays = bigArrays;
this.bits = bigArrays.newLongArray(initialSize, true);
}

View File

@ -25,6 +25,7 @@ import org.apache.lucene.search.Query;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.index.mapper.MappedFieldType;
@ -48,7 +49,7 @@ class DoubleValuesSource extends SingleDimensionValuesSource<Double> {
DocValueFormat format, boolean missingBucket, int size, int reverseMul) {
super(bigArrays, format, fieldType, missingBucket, size, reverseMul);
this.docValuesFunc = docValuesFunc;
this.bits = missingBucket ? new BitArray(bigArrays, 100) : null;
this.bits = missingBucket ? new BitArray(100, bigArrays) : null;
this.values = bigArrays.newDoubleArray(Math.min(size, 100), false);
}

View File

@ -30,6 +30,7 @@ import org.apache.lucene.search.Query;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
@ -61,7 +62,7 @@ class LongValuesSource extends SingleDimensionValuesSource<Long> {
this.bigArrays = bigArrays;
this.docValuesFunc = docValuesFunc;
this.rounding = rounding;
this.bits = missingBucket ? new BitArray(bigArrays, Math.min(size, 100)) : null;
this.bits = missingBucket ? new BitArray(Math.min(size, 100), bigArrays) : null;
this.values = bigArrays.newLongArray(Math.min(size, 100), false);
}

View File

@ -17,9 +17,8 @@
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.composite;
package org.elasticsearch.common.util;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
@ -28,7 +27,7 @@ import java.util.List;
public class BitArrayTests extends ESTestCase {
public void testRandom() {
try (BitArray bitArray = new BitArray(BigArrays.NON_RECYCLING_INSTANCE, 1)) {
try (BitArray bitArray = new BitArray(1, BigArrays.NON_RECYCLING_INSTANCE)) {
int numBits = randomIntBetween(1000, 10000);
for (int step = 0; step < 3; step++) {
boolean[] bits = new boolean[numBits];