aggs: Allow aggregation sorting via nested aggregation.

The nested aggregator now buffers all bucket ords per parent document and
emits all bucket ords for a parent document's nested document once. This way
the nested documents document DocIdSetIterator gets used once per bucket
instead of wrapping the nested aggregator inside a multi bucket aggregator,
which was the current solution upto now. This allows sorting by buckets
under a nested bucket.

Closes #16838
This commit is contained in:
Martijn van Groningen 2017-09-18 07:41:18 +02:00
parent a1c766c75c
commit 61849a1150
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
4 changed files with 321 additions and 27 deletions

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.search.aggregations.bucket.nested;
import com.carrotsearch.hppc.LongArrayList;
import org.apache.lucene.index.IndexReaderContext;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.ReaderUtil;
@ -51,14 +52,19 @@ class NestedAggregator extends BucketsAggregator implements SingleBucketAggregat
private final BitSetProducer parentFilter;
private final Query childFilter;
private final boolean collectsFromSingleBucket;
private BufferingNestedLeafBucketCollector bufferingNestedLeafBucketCollector;
NestedAggregator(String name, AggregatorFactories factories, ObjectMapper parentObjectMapper, ObjectMapper childObjectMapper,
SearchContext context, Aggregator parentAggregator,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
SearchContext context, Aggregator parentAggregator,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData,
boolean collectsFromSingleBucket) throws IOException {
super(name, factories, context, parentAggregator, pipelineAggregators, metaData);
Query parentFilter = parentObjectMapper != null ? parentObjectMapper.nestedTypeFilter() : Queries.newNonNestedFilter();
this.parentFilter = context.bitsetFilterCache().getBitSetProducer(parentFilter);
this.childFilter = childObjectMapper.nestedTypeFilter();
this.collectsFromSingleBucket = collectsFromSingleBucket;
}
@Override
@ -71,26 +77,38 @@ class NestedAggregator extends BucketsAggregator implements SingleBucketAggregat
final BitSet parentDocs = parentFilter.getBitSet(ctx);
final DocIdSetIterator childDocs = childDocsScorer != null ? childDocsScorer.iterator() : null;
return new LeafBucketCollectorBase(sub, null) {
@Override
public void collect(int parentDoc, long bucket) throws IOException {
// if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent
// doc), so we can skip:
if (parentDoc == 0 || parentDocs == null || childDocs == null) {
return;
}
if (collectsFromSingleBucket) {
return new LeafBucketCollectorBase(sub, null) {
@Override
public void collect(int parentDoc, long bucket) throws IOException {
// if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent
// doc), so we can skip:
if (parentDoc == 0 || parentDocs == null || childDocs == null) {
return;
}
final int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
int childDocId = childDocs.docID();
if (childDocId <= prevParentDoc) {
childDocId = childDocs.advance(prevParentDoc + 1);
}
final int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
int childDocId = childDocs.docID();
if (childDocId <= prevParentDoc) {
childDocId = childDocs.advance(prevParentDoc + 1);
}
for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) {
collectBucket(sub, childDocId, bucket);
for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) {
collectBucket(sub, childDocId, bucket);
}
}
}
};
};
} else {
doPostCollection();
return bufferingNestedLeafBucketCollector = new BufferingNestedLeafBucketCollector(sub, parentDocs, childDocs);
}
}
@Override
protected void doPostCollection() throws IOException {
if (bufferingNestedLeafBucketCollector != null) {
bufferingNestedLeafBucketCollector.postCollect();
}
}
@Override
@ -104,4 +122,63 @@ class NestedAggregator extends BucketsAggregator implements SingleBucketAggregat
return new InternalNested(name, 0, buildEmptySubAggregations(), pipelineAggregators(), metaData());
}
class BufferingNestedLeafBucketCollector extends LeafBucketCollectorBase {
final BitSet parentDocs;
final LeafBucketCollector sub;
final DocIdSetIterator childDocs;
final LongArrayList bucketBuffer = new LongArrayList();
int currentParentDoc = -1;
BufferingNestedLeafBucketCollector(LeafBucketCollector sub, BitSet parentDocs, DocIdSetIterator childDocs) {
super(sub, null);
this.sub = sub;
this.parentDocs = parentDocs;
this.childDocs = childDocs;
}
@Override
public void collect(int parentDoc, long bucket) throws IOException {
// if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent
// doc), so we can skip:
if (parentDoc == 0 || parentDocs == null || childDocs == null) {
return;
}
if (currentParentDoc != parentDoc) {
processChildBuckets(currentParentDoc, bucketBuffer);
currentParentDoc = parentDoc;
}
bucketBuffer.add(bucket);
}
void processChildBuckets(int parentDoc, LongArrayList buckets) throws IOException {
if (bucketBuffer.isEmpty()) {
return;
}
final int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
int childDocId = childDocs.docID();
if (childDocId <= prevParentDoc) {
childDocId = childDocs.advance(prevParentDoc + 1);
}
for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) {
final long[] buffer = buckets.buffer;
final int size = buckets.size();
for (int i = 0; i < size; i++) {
collectBucket(sub, childDocId, buffer[i]);
}
}
bucketBuffer.clear();
}
void postCollect() throws IOException {
processChildBuckets(currentParentDoc, bucketBuffer);
}
}
}

View File

@ -48,13 +48,11 @@ class NestedAggregatorFactory extends AggregatorFactory<NestedAggregatorFactory>
@Override
public Aggregator createInternal(Aggregator parent, boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
if (collectsFromSingleBucket == false) {
return asMultiBucketAggregator(this, context, parent);
}
if (childObjectMapper == null) {
return new Unmapped(name, context, parent, pipelineAggregators, metaData);
}
return new NestedAggregator(name, factories, parentObjectMapper, childObjectMapper, context, parent, pipelineAggregators, metaData);
return new NestedAggregator(name, factories, parentObjectMapper, childObjectMapper, context, parent,
pipelineAggregators, metaData, collectsFromSingleBucket);
}
private static final class Unmapped extends NonCollectingAggregator {

View File

@ -91,10 +91,6 @@ public class TopHitsAggregator extends MetricsAggregator {
public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
for (LongObjectPagedHashMap.Cursor<TopDocsAndLeafCollector> cursor : topDocsCollectors) {
cursor.value.leafCollector = cursor.value.topLevelCollector.getLeafCollector(ctx);
}
return new LeafBucketCollectorBase(sub, null) {
Scorer scorer;
@ -103,6 +99,11 @@ public class TopHitsAggregator extends MetricsAggregator {
public void setScorer(Scorer scorer) throws IOException {
this.scorer = scorer;
for (LongObjectPagedHashMap.Cursor<TopDocsAndLeafCollector> cursor : topDocsCollectors) {
// Instantiate the leaf collector not in the getLeafCollector(...) method or in the constructor of this
// anonymous class. Otherwise in the case this leaf bucket collector gets invoked with post collection
// then we already have moved on to the next reader and then we may encounter assertion errors or
// incorrect results.
cursor.value.leafCollector = cursor.value.topLevelCollector.getLeafCollector(ctx);
cursor.value.leafCollector.setScorer(scorer);
}
super.setScorer(scorer);

View File

@ -22,6 +22,7 @@ package org.elasticsearch.search.aggregations.bucket.nested;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriterConfig;
@ -34,21 +35,33 @@ import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.mapper.TypeFieldMapper;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.min.Min;
import org.elasticsearch.search.aggregations.metrics.min.MinAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.InternalSum;
import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValueType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.DoubleStream;
public class NestedAggregatorTests extends AggregatorTestCase {
@ -314,6 +327,189 @@ public class NestedAggregatorTests extends AggregatorTestCase {
}
}
public void testNestedOrdering() throws IOException {
try (Directory directory = newDirectory()) {
try (RandomIndexWriter iw = new RandomIndexWriter(random(), directory)) {
iw.addDocuments(generateBook("1", new String[]{"a"}, new int[]{12, 13, 14}));
iw.addDocuments(generateBook("2", new String[]{"b"}, new int[]{5, 50}));
iw.addDocuments(generateBook("3", new String[]{"c"}, new int[]{39, 19}));
iw.addDocuments(generateBook("4", new String[]{"d"}, new int[]{2, 1, 3}));
iw.addDocuments(generateBook("5", new String[]{"a"}, new int[]{70, 10}));
iw.addDocuments(generateBook("6", new String[]{"e"}, new int[]{23, 21}));
iw.addDocuments(generateBook("7", new String[]{"e", "a"}, new int[]{8, 8}));
iw.addDocuments(generateBook("8", new String[]{"f"}, new int[]{12, 14}));
iw.addDocuments(generateBook("9", new String[]{"g", "c", "e"}, new int[]{18, 8}));
}
try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
MappedFieldType fieldType1 = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
fieldType1.setName("num_pages");
MappedFieldType fieldType2 = new KeywordFieldMapper.KeywordFieldType();
fieldType2.setHasDocValues(true);
fieldType2.setName("author");
TermsAggregationBuilder termsBuilder = new TermsAggregationBuilder("authors", ValueType.STRING)
.field("author").order(BucketOrder.aggregation("chapters>num_pages.value", true));
NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder("chapters", "nested_chapters");
MaxAggregationBuilder maxAgg = new MaxAggregationBuilder("num_pages").field("num_pages");
nestedBuilder.subAggregation(maxAgg);
termsBuilder.subAggregation(nestedBuilder);
Terms terms = search(newSearcher(indexReader, false, true),
new MatchAllDocsQuery(), termsBuilder, fieldType1, fieldType2);
assertEquals(7, terms.getBuckets().size());
assertEquals("authors", terms.getName());
Terms.Bucket bucket = terms.getBuckets().get(0);
assertEquals("d", bucket.getKeyAsString());
Max numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages");
assertEquals(3, (int) numPages.getValue());
bucket = terms.getBuckets().get(1);
assertEquals("f", bucket.getKeyAsString());
numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages");
assertEquals(14, (int) numPages.getValue());
bucket = terms.getBuckets().get(2);
assertEquals("g", bucket.getKeyAsString());
numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages");
assertEquals(18, (int) numPages.getValue());
bucket = terms.getBuckets().get(3);
assertEquals("e", bucket.getKeyAsString());
numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages");
assertEquals(23, (int) numPages.getValue());
bucket = terms.getBuckets().get(4);
assertEquals("c", bucket.getKeyAsString());
numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages");
assertEquals(39, (int) numPages.getValue());
bucket = terms.getBuckets().get(5);
assertEquals("b", bucket.getKeyAsString());
numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages");
assertEquals(50, (int) numPages.getValue());
bucket = terms.getBuckets().get(6);
assertEquals("a", bucket.getKeyAsString());
numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages");
assertEquals(70, (int) numPages.getValue());
// reverse order:
termsBuilder = new TermsAggregationBuilder("authors", ValueType.STRING)
.field("author").order(BucketOrder.aggregation("chapters>num_pages.value", false));
nestedBuilder = new NestedAggregationBuilder("chapters", "nested_chapters");
maxAgg = new MaxAggregationBuilder("num_pages").field("num_pages");
nestedBuilder.subAggregation(maxAgg);
termsBuilder.subAggregation(nestedBuilder);
terms = search(newSearcher(indexReader, false, true), new MatchAllDocsQuery(), termsBuilder, fieldType1, fieldType2);
assertEquals(7, terms.getBuckets().size());
assertEquals("authors", terms.getName());
bucket = terms.getBuckets().get(0);
assertEquals("a", bucket.getKeyAsString());
numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages");
assertEquals(70, (int) numPages.getValue());
bucket = terms.getBuckets().get(1);
assertEquals("b", bucket.getKeyAsString());
numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages");
assertEquals(50, (int) numPages.getValue());
bucket = terms.getBuckets().get(2);
assertEquals("c", bucket.getKeyAsString());
numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages");
assertEquals(39, (int) numPages.getValue());
bucket = terms.getBuckets().get(3);
assertEquals("e", bucket.getKeyAsString());
numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages");
assertEquals(23, (int) numPages.getValue());
bucket = terms.getBuckets().get(4);
assertEquals("g", bucket.getKeyAsString());
numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages");
assertEquals(18, (int) numPages.getValue());
bucket = terms.getBuckets().get(5);
assertEquals("f", bucket.getKeyAsString());
numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages");
assertEquals(14, (int) numPages.getValue());
bucket = terms.getBuckets().get(6);
assertEquals("d", bucket.getKeyAsString());
numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages");
assertEquals(3, (int) numPages.getValue());
}
}
}
public void testNestedOrdering_random() throws IOException {
int numBooks = randomIntBetween(32, 512);
List<Tuple<String, int[]>> books = new ArrayList<>();
for (int i = 0; i < numBooks; i++) {
int numChapters = randomIntBetween(1, 8);
int[] chapters = new int[numChapters];
for (int j = 0; j < numChapters; j++) {
chapters[j] = randomIntBetween(2, 64);
}
books.add(Tuple.tuple(String.format(Locale.ROOT, "%03d", i), chapters));
}
try (Directory directory = newDirectory()) {
try (RandomIndexWriter iw = new RandomIndexWriter(random(), directory)) {
int id = 0;
for (Tuple<String, int[]> book : books) {
iw.addDocuments(generateBook(
String.format(Locale.ROOT, "%03d", id), new String[]{book.v1()}, book.v2())
);
id++;
}
}
for (Tuple<String, int[]> book : books) {
Arrays.sort(book.v2());
}
books.sort((o1, o2) -> {
int cmp = Integer.compare(o1.v2()[0], o2.v2()[0]);
if (cmp == 0) {
return o1.v1().compareTo(o2.v1());
} else {
return cmp;
}
});
try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
MappedFieldType fieldType1 = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
fieldType1.setName("num_pages");
MappedFieldType fieldType2 = new KeywordFieldMapper.KeywordFieldType();
fieldType2.setHasDocValues(true);
fieldType2.setName("author");
TermsAggregationBuilder termsBuilder = new TermsAggregationBuilder("authors", ValueType.STRING)
.size(books.size()).field("author")
.order(BucketOrder.compound(BucketOrder.aggregation("chapters>num_pages.value", true), BucketOrder.key(true)));
NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder("chapters", "nested_chapters");
MinAggregationBuilder minAgg = new MinAggregationBuilder("num_pages").field("num_pages");
nestedBuilder.subAggregation(minAgg);
termsBuilder.subAggregation(nestedBuilder);
Terms terms = search(newSearcher(indexReader, false, true),
new MatchAllDocsQuery(), termsBuilder, fieldType1, fieldType2);
assertEquals(books.size(), terms.getBuckets().size());
assertEquals("authors", terms.getName());
for (int i = 0; i < books.size(); i++) {
Tuple<String, int[]> book = books.get(i);
Terms.Bucket bucket = terms.getBuckets().get(i);
assertEquals(book.v1(), bucket.getKeyAsString());
Min numPages = ((Nested) bucket.getAggregations().get("chapters")).getAggregations().get("num_pages");
assertEquals(book.v2()[0], (int) numPages.getValue());
}
}
}
}
private double generateMaxDocs(List<Document> documents, int numNestedDocs, int id, String path, String fieldName) {
return DoubleStream.of(generateDocuments(documents, numNestedDocs, id, path, fieldName))
.max().orElse(Double.NEGATIVE_INFINITY);
@ -340,4 +536,26 @@ public class NestedAggregatorTests extends AggregatorTestCase {
return values;
}
private List<Document> generateBook(String id, String[] authors, int[] numPages) {
List<Document> documents = new ArrayList<>();
for (int numPage : numPages) {
Document document = new Document();
document.add(new Field(UidFieldMapper.NAME, "book#" + id, UidFieldMapper.Defaults.NESTED_FIELD_TYPE));
document.add(new Field(TypeFieldMapper.NAME, "__nested_chapters", TypeFieldMapper.Defaults.FIELD_TYPE));
document.add(new SortedNumericDocValuesField("num_pages", numPage));
documents.add(document);
}
Document document = new Document();
document.add(new Field(UidFieldMapper.NAME, "book#" + id, UidFieldMapper.Defaults.FIELD_TYPE));
document.add(new Field(TypeFieldMapper.NAME, "book", TypeFieldMapper.Defaults.FIELD_TYPE));
for (String author : authors) {
document.add(new SortedSetDocValuesField("author", new BytesRef(author)));
}
documents.add(document);
return documents;
}
}