Aggregation fix: Sampler agg could not be used with Terms agg’s order.

The Sampler agg was not capable of collecting samples for more than one parent bucket.
Added a Junit test case and changed BestDocsDeferringCollector to internally maintain collections per parent bucket.

Closes #10719
This commit is contained in:
markharwood 2015-04-24 14:31:08 +01:00
parent 20279a2556
commit 8c3500a676
8 changed files with 192 additions and 62 deletions

View File

@ -43,6 +43,8 @@ import org.elasticsearch.search.aggregations.bucket.range.RangeBuilder;
import org.elasticsearch.search.aggregations.bucket.range.date.DateRangeBuilder;
import org.elasticsearch.search.aggregations.bucket.range.geodistance.GeoDistanceBuilder;
import org.elasticsearch.search.aggregations.bucket.range.ipv4.IPv4RangeBuilder;
import org.elasticsearch.search.aggregations.bucket.sampler.Sampler;
import org.elasticsearch.search.aggregations.bucket.sampler.SamplerAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
@ -145,6 +147,13 @@ public class AggregationBuilders {
return new FiltersAggregationBuilder(name);
}
/**
* Create a new {@link Sampler} aggregation with the given name.
*/
public static SamplerAggregationBuilder sampler(String name) {
return new SamplerAggregationBuilder(name);
}
/**
* Create a new {@link Global} aggregation with the given name.
*/

View File

@ -27,6 +27,10 @@ import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopDocsCollector;
import org.apache.lucene.search.TopScoreDocCollector;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
@ -46,25 +50,29 @@ import java.util.List;
*
*/
public class BestDocsDeferringCollector extends DeferringBucketCollector {
public class BestDocsDeferringCollector extends DeferringBucketCollector implements Releasable {
final List<PerSegmentCollects> entries = new ArrayList<>();
BucketCollector deferred;
TopDocsCollector<? extends ScoreDoc> tdc;
boolean finished = false;
ObjectArray<PerParentBucketSamples> perBucketSamples;
private int shardSize;
private PerSegmentCollects perSegCollector;
private int matchedDocs;
private final BigArrays bigArrays;
/**
* Sole constructor.
*
* @param shardSize
* The number of top-scoring docs to collect for each bucket
* @param bigArrays
*/
public BestDocsDeferringCollector(int shardSize) {
public BestDocsDeferringCollector(int shardSize, BigArrays bigArrays) {
this.shardSize = shardSize;
this.bigArrays = bigArrays;
perBucketSamples = bigArrays.newObjectArray(1);
}
@Override
public boolean needsScores() {
return true;
@ -73,16 +81,10 @@ public class BestDocsDeferringCollector extends DeferringBucketCollector {
/** Set the deferred collectors. */
public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {
this.deferred = BucketCollector.wrap(deferredCollectors);
try {
tdc = createTopDocsCollector(shardSize);
} catch (IOException e) {
throw new ElasticsearchException("IO error creating collector", e);
}
}
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
// finishLeaf();
perSegCollector = new PerSegmentCollects(ctx);
entries.add(perSegCollector);
@ -95,7 +97,7 @@ public class BestDocsDeferringCollector extends DeferringBucketCollector {
@Override
public void collect(int doc, long bucket) throws IOException {
perSegCollector.collect(doc);
perSegCollector.collect(doc, bucket);
}
};
}
@ -112,50 +114,102 @@ public class BestDocsDeferringCollector extends DeferringBucketCollector {
@Override
public void postCollection() throws IOException {
finished = true;
runDeferredAggs();
}
/**
* Replay the wrapped collector, but only on a selection of buckets.
*/
@Override
public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
if (!finished) {
throw new IllegalStateException("Cannot replay yet, collection is not finished: postCollect() has not been called");
}
if (selectedBuckets.length > 1) {
throw new IllegalStateException("Collection only supported on a single bucket");
}
// no-op - deferred aggs processed in postCollection call
}
private void runDeferredAggs() throws IOException {
deferred.preCollection();
TopDocs topDocs = tdc.topDocs();
ScoreDoc[] sd = topDocs.scoreDocs;
matchedDocs = sd.length;
// Sort the top matches by docID for the benefit of deferred collector
Arrays.sort(sd, new Comparator<ScoreDoc>() {
@Override
public int compare(ScoreDoc o1, ScoreDoc o2) {
return o1.doc - o2.doc;
List<ScoreDoc> allDocs = new ArrayList<>(shardSize);
for (int i = 0; i < perBucketSamples.size(); i++) {
PerParentBucketSamples perBucketSample = perBucketSamples.get(i);
if (perBucketSample == null) {
continue;
}
});
perBucketSample.getMatches(allDocs);
}
// Sort the top matches by docID for the benefit of deferred collector
ScoreDoc[] docsArr = allDocs.toArray(new ScoreDoc[allDocs.size()]);
Arrays.sort(docsArr, new Comparator<ScoreDoc>() {
@Override
public int compare(ScoreDoc o1, ScoreDoc o2) {
if(o1.doc == o2.doc){
return o1.shardIndex - o2.shardIndex;
}
return o1.doc - o2.doc;
}
});
try {
for (PerSegmentCollects perSegDocs : entries) {
perSegDocs.replayRelatedMatches(sd);
perSegDocs.replayRelatedMatches(docsArr);
}
// deferred.postCollection();
} catch (IOException e) {
throw new ElasticsearchException("IOException collecting best scoring results", e);
}
deferred.postCollection();
}
class PerParentBucketSamples {
private LeafCollector currentLeafCollector;
private TopDocsCollector<? extends ScoreDoc> tdc;
private long parentBucket;
private int matchedDocs;
public PerParentBucketSamples(long parentBucket, Scorer scorer, LeafReaderContext readerContext) {
try {
this.parentBucket = parentBucket;
tdc = createTopDocsCollector(shardSize);
currentLeafCollector = tdc.getLeafCollector(readerContext);
setScorer(scorer);
} catch (IOException e) {
throw new ElasticsearchException("IO error creating collector", e);
}
}
public void getMatches(List<ScoreDoc> allDocs) {
TopDocs topDocs = tdc.topDocs();
ScoreDoc[] sd = topDocs.scoreDocs;
matchedDocs = sd.length;
for (ScoreDoc scoreDoc : sd) {
// A bit of a hack to (ab)use shardIndex property here to
// hold a bucket ID but avoids allocating extra data structures
// and users should have bigger concerns if bucket IDs
// exceed int capacity..
scoreDoc.shardIndex = (int) parentBucket;
}
allDocs.addAll(Arrays.asList(sd));
}
public void collect(int doc) throws IOException {
currentLeafCollector.collect(doc);
}
public void setScorer(Scorer scorer) throws IOException {
currentLeafCollector.setScorer(scorer);
}
public void changeSegment(LeafReaderContext readerContext) throws IOException {
currentLeafCollector = tdc.getLeafCollector(readerContext);
}
public int getDocCount() {
return matchedDocs;
}
}
class PerSegmentCollects extends Scorer {
private LeafReaderContext readerContext;
int maxDocId = Integer.MIN_VALUE;
private float currentScore;
private int currentDocId = -1;
private LeafCollector currentLeafCollector;
private Scorer currentScorer;
PerSegmentCollects(LeafReaderContext readerContext) throws IOException {
// The publisher behaviour for Reader/Scorer listeners triggers a
@ -164,12 +218,24 @@ public class BestDocsDeferringCollector extends DeferringBucketCollector {
// However, passing null seems to have no adverse effects here...
super(null);
this.readerContext = readerContext;
currentLeafCollector = tdc.getLeafCollector(readerContext);
for (int i = 0; i < perBucketSamples.size(); i++) {
PerParentBucketSamples perBucketSample = perBucketSamples.get(i);
if (perBucketSample == null) {
continue;
}
perBucketSample.changeSegment(readerContext);
}
}
public void setScorer(Scorer scorer) throws IOException {
currentLeafCollector.setScorer(scorer);
this.currentScorer = scorer;
for (int i = 0; i < perBucketSamples.size(); i++) {
PerParentBucketSamples perBucketSample = perBucketSamples.get(i);
if (perBucketSample == null) {
continue;
}
perBucketSample.setScorer(scorer);
}
}
public void replayRelatedMatches(ScoreDoc[] sd) throws IOException {
@ -188,7 +254,9 @@ public class BestDocsDeferringCollector extends DeferringBucketCollector {
if ((rebased >= 0) && (rebased <= maxDocId)) {
currentScore = scoreDoc.score;
currentDocId = rebased;
leafCollector.collect(rebased, 0);
// We stored the bucket ID in Lucene's shardIndex property
// for convenience.
leafCollector.collect(rebased, scoreDoc.shardIndex);
}
}
@ -224,15 +292,32 @@ public class BestDocsDeferringCollector extends DeferringBucketCollector {
throw new ElasticsearchException("This caching scorer implementation only implements score() and docID()");
}
public void collect(int docId) throws IOException {
currentLeafCollector.collect(docId);
public void collect(int docId, long parentBucket) throws IOException {
perBucketSamples = bigArrays.grow(perBucketSamples, parentBucket + 1);
PerParentBucketSamples sampler = perBucketSamples.get((int) parentBucket);
if (sampler == null) {
sampler = new PerParentBucketSamples(parentBucket, currentScorer, readerContext);
perBucketSamples.set((int) parentBucket, sampler);
}
sampler.collect(docId);
maxDocId = Math.max(maxDocId, docId);
}
}
public int getDocCount() {
return matchedDocs;
public int getDocCount(long parentBucket) {
PerParentBucketSamples sampler = perBucketSamples.get((int) parentBucket);
if (sampler == null) {
// There are conditions where no docs are collected and the aggs
// framework still asks for doc count.
return 0;
}
return sampler.getDocCount();
}
@Override
public void close() throws ElasticsearchException {
Releasables.close(perBucketSamples);
}
}

View File

@ -71,7 +71,7 @@ public class DiversifiedBytesHashSamplerAggregator extends SamplerAggregator {
class DiverseDocsDeferringCollector extends BestDocsDeferringCollector {
public DiverseDocsDeferringCollector() {
super(shardSize);
super(shardSize, context.bigArrays());
}

View File

@ -77,7 +77,7 @@ public class DiversifiedMapSamplerAggregator extends SamplerAggregator {
class DiverseDocsDeferringCollector extends BestDocsDeferringCollector {
public DiverseDocsDeferringCollector() {
super(shardSize);
super(shardSize, context.bigArrays());
}

View File

@ -64,7 +64,7 @@ public class DiversifiedNumericSamplerAggregator extends SamplerAggregator {
*/
class DiverseDocsDeferringCollector extends BestDocsDeferringCollector {
public DiverseDocsDeferringCollector() {
super(shardSize);
super(shardSize, context.bigArrays());
}
@Override

View File

@ -66,7 +66,7 @@ public class DiversifiedOrdinalsSamplerAggregator extends SamplerAggregator {
class DiverseDocsDeferringCollector extends BestDocsDeferringCollector {
public DiverseDocsDeferringCollector() {
super(shardSize);
super(shardSize, context.bigArrays());
}
@Override

View File

@ -20,6 +20,7 @@ package org.elasticsearch.search.aggregations.bucket.sampler;
import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
@ -154,7 +155,7 @@ public class SamplerAggregator extends SingleBucketAggregator {
@Override
public DeferringBucketCollector getDeferringCollector() {
bdd = new BestDocsDeferringCollector(shardSize);
bdd = new BestDocsDeferringCollector(shardSize, context.bigArrays());
return bdd;
}
@ -168,7 +169,8 @@ public class SamplerAggregator extends SingleBucketAggregator {
@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
runDeferredCollections(owningBucketOrdinal);
return new InternalSampler(name, bdd == null ? 0 : bdd.getDocCount(), bucketAggregations(owningBucketOrdinal), pipelineAggregators(),
return new InternalSampler(name, bdd == null ? 0 : bdd.getDocCount(owningBucketOrdinal), bucketAggregations(owningBucketOrdinal),
pipelineAggregators(),
metaData());
}
@ -189,10 +191,6 @@ public class SamplerAggregator extends SingleBucketAggregator {
@Override
public Aggregator createInternal(AggregationContext context, Aggregator parent, boolean collectsFromSingleBucket,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
if (collectsFromSingleBucket == false) {
return asMultiBucketAggregator(this, context, parent);
}
return new SamplerAggregator(name, shardSize, factories, context, parent, pipelineAggregators, metaData);
}
@ -216,11 +214,6 @@ public class SamplerAggregator extends SingleBucketAggregator {
boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
throws IOException {
if (collectsFromSingleBucket == false) {
return asMultiBucketAggregator(this, context, parent);
}
if (valuesSource instanceof ValuesSource.Numeric) {
return new DiversifiedNumericSamplerAggregator(name, shardSize, factories, context, parent, pipelineAggregators, metaData,
(Numeric) valuesSource, maxDocsPerValue);
@ -272,5 +265,11 @@ public class SamplerAggregator extends SingleBucketAggregator {
return bdd.getLeafCollector(ctx);
}
@Override
protected void doClose() {
Releasables.close(bdd);
super.doClose();
}
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.search.aggregations.bucket.sampler.SamplerAggregator;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
@ -35,10 +36,14 @@ import java.util.Collection;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.search.aggregations.AggregationBuilders.max;
import static org.elasticsearch.search.aggregations.AggregationBuilders.sampler;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
/**
@ -58,11 +63,11 @@ public class SamplerTests extends ElasticsearchIntegrationTest {
public void setupSuiteScopeCluster() throws Exception {
assertAcked(prepareCreate("test").setSettings(SETTING_NUMBER_OF_SHARDS, NUM_SHARDS, SETTING_NUMBER_OF_REPLICAS, 0).addMapping(
"book", "author", "type=string,index=not_analyzed", "name", "type=string,index=analyzed", "genre",
"type=string,index=not_analyzed"));
"type=string,index=not_analyzed", "price", "type=float"));
createIndex("idx_unmapped");
// idx_unmapped_author is same as main index but missing author field
assertAcked(prepareCreate("idx_unmapped_author").setSettings(SETTING_NUMBER_OF_SHARDS, NUM_SHARDS, SETTING_NUMBER_OF_REPLICAS, 0)
.addMapping("book", "name", "type=string,index=analyzed", "genre", "type=string,index=not_analyzed"));
.addMapping("book", "name", "type=string,index=analyzed", "genre", "type=string,index=not_analyzed", "price", "type=float"));
ensureGreen();
String data[] = {
@ -70,7 +75,7 @@ public class SamplerTests extends ElasticsearchIntegrationTest {
"0553573403,book,A Game of Thrones,7.99,true,George R.R. Martin,A Song of Ice and Fire,1,fantasy",
"0553579908,book,A Clash of Kings,7.99,true,George R.R. Martin,A Song of Ice and Fire,2,fantasy",
"055357342X,book,A Storm of Swords,7.99,true,George R.R. Martin,A Song of Ice and Fire,3,fantasy",
"0553293354,book,Foundation,7.99,true,Isaac Asimov,Foundation Novels,1,scifi",
"0553293354,book,Foundation,17.99,true,Isaac Asimov,Foundation Novels,1,scifi",
"0812521390,book,The Black Company,6.99,false,Glen Cook,The Chronicles of The Black Company,1,fantasy",
"0812550706,book,Ender's Game,6.99,true,Orson Scott Card,Ender,1,scifi",
"0441385532,book,Jhereg,7.95,false,Steven Brust,Vlad Taltos,1,fantasy",
@ -82,11 +87,43 @@ public class SamplerTests extends ElasticsearchIntegrationTest {
for (int i = 0; i < data.length; i++) {
String[] parts = data[i].split(",");
client().prepareIndex("test", "book", "" + i).setSource("author", parts[5], "name", parts[2], "genre", parts[8]).get();
client().prepareIndex("idx_unmapped_author", "book", "" + i).setSource("name", parts[2], "genre", parts[8]).get();
client().prepareIndex("test", "book", "" + i).setSource("author", parts[5], "name", parts[2], "genre", parts[8], "price",Float.parseFloat(parts[3])).get();
client().prepareIndex("idx_unmapped_author", "book", "" + i).setSource("name", parts[2], "genre", parts[8],"price",Float.parseFloat(parts[3])).get();
}
client().admin().indices().refresh(new RefreshRequest("test")).get();
}
@Test
public void issue10719() throws Exception {
// Tests that we can refer to nested elements under a sample in a path
// statement
boolean asc = randomBoolean();
SearchResponse response = client().prepareSearch("test").setTypes("book").setSearchType(SearchType.QUERY_AND_FETCH)
.addAggregation(terms("genres")
.field("genre")
.order(Terms.Order.aggregation("sample>max_price.value", asc))
.subAggregation(sampler("sample").shardSize(100)
.subAggregation(max("max_price").field("price")))
).execute().actionGet();
assertSearchResponse(response);
Terms genres = response.getAggregations().get("genres");
Collection<Bucket> genreBuckets = genres.getBuckets();
// For this test to be useful we need >1 genre bucket to compare
assertThat(genreBuckets.size(), greaterThan(1));
double lastMaxPrice = asc ? Double.MIN_VALUE : Double.MAX_VALUE;
for (Terms.Bucket genreBucket : genres.getBuckets()) {
Sampler sample = genreBucket.getAggregations().get("sample");
Max maxPriceInGenre = sample.getAggregations().get("max_price");
double price = maxPriceInGenre.getValue();
if (asc) {
assertThat(price, greaterThanOrEqualTo(lastMaxPrice));
} else {
assertThat(price, lessThanOrEqualTo(lastMaxPrice));
}
lastMaxPrice = price;
}
}
@Test
public void noDiversity() throws Exception {