Aggregations: added pagination support to `top_hits` aggregation by adding `from` option.

Closes #6299
Martijn van Groningen 2014-05-26 17:56:07 +02:00
parent 35755cd8a4
commit aab38fb2e6
8 changed files with 76 additions and 8 deletions

View File

@@ -13,6 +13,7 @@ This aggregator can't hold any sub-aggregators and therefor can only be used as
 ==== Options

+* `from` - The offset from the first result you want to fetch.
 * `size` - The maximum number of top matching hits to return per bucket. By default the top three matching hits are returned.
 * `sort` - How the top matching hits should be sorted. By default the hits are sorted by the score of the main query.
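With `from: 10` and `size: 3`, each bucket returns its 11th through 13th best matching hits. A minimal sketch of the Java API this commit adds (index, field, and aggregation names are invented for illustration):

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;

import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.search.aggregations.AggregationBuilders.topHits;

// Sketch: page to the 11th..13th hits of every "group" bucket.
public class TopHitsPagingSketch {
    public static SearchResponse pagedTopHits(Client client) {
        return client.prepareSearch("idx")
                .addAggregation(terms("groups").field("group")
                        .subAggregation(topHits("top")
                                .setFrom(10)   // skip the ten best hits per bucket
                                .setSize(3)))  // then return the next three
                .get();
    }
}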

View File

@@ -54,6 +54,7 @@ public class InternalTopHits extends InternalAggregation implements TopHits, ToXContent
         AggregationStreams.registerStream(STREAM, TYPE.stream());
     }

+    private int from;
     private int size;
     private Sort sort;
     private TopDocs topDocs;
@@ -62,8 +63,9 @@ public class InternalTopHits extends InternalAggregation implements TopHits, ToXContent
     InternalTopHits() {
     }

-    public InternalTopHits(String name, int size, Sort sort, TopDocs topDocs, InternalSearchHits searchHits) {
+    public InternalTopHits(String name, int from, int size, Sort sort, TopDocs topDocs, InternalSearchHits searchHits) {
         this.name = name;
+        this.from = from;
         this.size = size;
         this.sort = sort;
         this.topDocs = topDocs;
@@ -104,7 +106,7 @@ public class InternalTopHits extends InternalAggregation implements TopHits, ToXContent
         try {
             int[] tracker = new int[shardHits.length];
-            TopDocs reducedTopDocs = TopDocs.merge(sort, size, shardDocs);
+            TopDocs reducedTopDocs = TopDocs.merge(sort, from, size, shardDocs);
             InternalSearchHit[] hits = new InternalSearchHit[reducedTopDocs.scoreDocs.length];
             for (int i = 0; i < reducedTopDocs.scoreDocs.length; i++) {
                 ScoreDoc scoreDoc = reducedTopDocs.scoreDocs[i];
@@ -119,6 +121,7 @@ public class InternalTopHits extends InternalAggregation implements TopHits, ToXContent
     @Override
     public void readFrom(StreamInput in) throws IOException {
         name = in.readString();
+        from = in.readVInt();
         size = in.readVInt();
         topDocs = Lucene.readTopDocs(in);
         if (topDocs instanceof TopFieldDocs) {
@@ -130,6 +133,7 @@ public class InternalTopHits extends InternalAggregation implements TopHits, ToXContent
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeString(name);
+        out.writeVInt(from);
         out.writeVInt(size);
         Lucene.writeTopDocs(out, topDocs, 0);
         searchHits.writeTo(out);
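The reduce change above relies on Lucene's `TopDocs.merge(Sort, int start, int size, TopDocs[])`: the offset is applied while the per-shard top docs are combined, so the skipped hits are the globally best ones rather than each shard's own. A self-contained sketch of that behavior (doc ids and scores are made up; a null `Sort` merges by score):

import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;

public class TopDocsMergeSketch {
    public static void main(String[] args) {
        TopDocs shard0 = new TopDocs(2, new ScoreDoc[] {
                new ScoreDoc(0, 3.0f), new ScoreDoc(1, 1.0f) }, 3.0f);
        TopDocs shard1 = new TopDocs(2, new ScoreDoc[] {
                new ScoreDoc(0, 2.5f), new ScoreDoc(1, 0.5f) }, 2.5f);
        // start=1 skips the single best doc across both shards (score 3.0),
        // size=2 keeps the next two (scores 2.5 and 1.0).
        TopDocs merged = TopDocs.merge(null, 1, 2, new TopDocs[] { shard0, shard1 });
        for (ScoreDoc doc : merged.scoreDocs) {
            System.out.println("shard=" + doc.shardIndex
                    + " doc=" + doc.doc + " score=" + doc.score);
        }
    }
}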

View File

@@ -22,9 +22,13 @@ import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.aggregations.Aggregation;

 /**
+ * Accumulation of the most relevant hits for a bucket this aggregation falls into.
  */
 public interface TopHits extends Aggregation {

+    /**
+     * @return The top matching hits for the bucket
+     */
     SearchHits getHits();
 }
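Callers read the paged hits back through this interface; a small fragment mirroring the integration test further down (the "terms"/"hits" names come from that test, and the package locations are assumptions based on this codebase):

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.tophits.TopHits;

public class ReadTopHitsSketch {
    public static void printBucketHits(SearchResponse response) {
        Terms terms = response.getAggregations().get("terms");
        TopHits topHits = terms.getBucketByKey("val0").getAggregations().get("hits");
        for (SearchHit hit : topHits.getHits()) {   // SearchHits is iterable
            System.out.println(hit.getId() + " score=" + hit.getScore());
        }
    }
}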

View File

@@ -90,7 +90,7 @@ public class TopHitsAggregator extends BucketsAggregator implements ScorerAware
                 searchHitFields.sortValues(fieldDoc.fields);
             }
         }
-        return new InternalTopHits(name, topHitsContext.size(), topHitsContext.sort(), topDocs, fetchResult.hits());
+        return new InternalTopHits(name, topHitsContext.from(), topHitsContext.size(), topHitsContext.sort(), topDocs, fetchResult.hits());
     }
 }
@@ -104,10 +104,10 @@ public class TopHitsAggregator extends BucketsAggregator implements ScorerAware
         TopDocsCollector topDocsCollector = topDocsCollectors.get(bucketOrdinal);
         if (topDocsCollector == null) {
             Sort sort = topHitsContext.sort();
-            int size = topHitsContext.size();
+            int topN = topHitsContext.from() + topHitsContext.size();
             topDocsCollectors.put(
                     bucketOrdinal,
-                    topDocsCollector = sort != null ? TopFieldCollector.create(sort, size, true, topHitsContext.trackScores(), true, false) : TopScoreDocCollector.create(size, false)
+                    topDocsCollector = sort != null ? TopFieldCollector.create(sort, topN, true, topHitsContext.trackScores(), true, false) : TopScoreDocCollector.create(topN, false)
             );
             topDocsCollector.setNextReader(currentContext);
             topDocsCollector.setScorer(currentScorer);
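Sizing the shard-level collector to `from + size` rather than `size` is what makes the offset correct in a distributed search: a shard cannot drop its own top `from` hits locally, because hits from other shards may displace them at reduce time. A self-contained Lucene sketch of the same idea (the `Version` constant, field name, and documents are assumptions):

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopScoreDocCollector;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Version;

public class CollectorSizingSketch {
    public static void main(String[] args) throws Exception {
        RAMDirectory dir = new RAMDirectory();
        IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(
                Version.LUCENE_47, new StandardAnalyzer(Version.LUCENE_47)));
        for (int i = 0; i < 10; i++) {
            Document doc = new Document();
            doc.add(new TextField("body", "foo", Field.Store.NO));
            writer.addDocument(doc);
        }
        writer.close();

        int from = 5, size = 3;
        IndexSearcher searcher = new IndexSearcher(DirectoryReader.open(dir));
        // Collect from + size hits: none of the top `from` hits can be
        // discarded yet, since the global offset is only known at reduce time.
        TopScoreDocCollector collector = TopScoreDocCollector.create(from + size, false);
        searcher.search(new TermQuery(new Term("body", "foo")), collector);
        // On a single shard the offset could be applied locally like this;
        // the aggregator instead ships all from + size docs to the
        // coordinating node and applies the offset in TopDocs.merge.
        TopDocs page = collector.topDocs(from, size);
        System.out.println(page.scoreDocs.length + " hits on this page"); // 3
    }
}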

View File

@@ -40,6 +40,15 @@ public class TopHitsBuilder extends AbstractAggregationBuilder {
         super(name, InternalTopHits.TYPE.name());
     }

+    /**
+     * The index to start to return hits from. Defaults to <tt>0</tt>.
+     */
+    public TopHitsBuilder setFrom(int from) {
+        sourceBuilder().from(from);
+        return this;
+    }
+
     /**
      * The number of search hits to return. Defaults to <tt>10</tt>.
      */

View File

@@ -71,6 +71,7 @@ public class TopHitsContext extends SearchContext {
     // the to hits are returned per bucket.
     private final static int DEFAULT_SIZE = 3;

+    private int from;
     private int size = DEFAULT_SIZE;
     private Sort sort;
@@ -440,12 +441,13 @@ public class TopHitsContext extends SearchContext {
     @Override
     public int from() {
-        return context.from();
+        return from;
     }

     @Override
     public SearchContext from(int from) {
-        throw new UnsupportedOperationException("Not supported");
+        this.from = from;
+        return this;
     }

     @Override

View File

@@ -72,6 +72,9 @@ public class TopHitsParser implements Aggregator.Parser {
                 currentFieldName = parser.currentName();
             } else if (token.isValue()) {
                 switch (currentFieldName) {
+                    case "from":
+                        topHitsContext.from(parser.intValue());
+                        break;
                     case "size":
                         topHitsContext.size(parser.intValue());
                         break;
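With this parser change, a `from` key is accepted alongside `size` in the request body. A hypothetical `top_hits` body, written as a Java string literal to keep the examples in one language:

// Hypothetical body the updated parser accepts: skip the ten best
// hits per bucket, then return the next three.
String topHitsBody = "{ \"top_hits\": { \"from\": 10, \"size\": 3 } }";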

View File

@@ -23,6 +23,7 @@ import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchPhaseExecutionException;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchType;
+import org.elasticsearch.index.query.FilterBuilders;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHitField;
 import org.elasticsearch.search.SearchHits;
@@ -76,7 +77,6 @@ public class TopHitsTests extends ElasticsearchIntegrationTest {
                     .endObject()));
         }
-        // Use routing to make sure all docs are in the same shard for consistent scoring
         builders.add(client().prepareIndex("idx", "field-collapsing", "1").setSource(jsonBuilder()
                 .startObject()
                     .field("group", "a")
@@ -169,6 +169,51 @@ public class TopHitsTests extends ElasticsearchIntegrationTest {
         }
     }

+    @Test
+    public void testPagination() throws Exception {
+        int size = randomIntBetween(0, 10);
+        int from = randomIntBetween(0, 10);
+        SearchResponse response = client().prepareSearch("idx").setTypes("type")
+                .addAggregation(terms("terms")
+                        .executionHint(randomExecutionHint())
+                        .field(TERMS_AGGS_FIELD)
+                        .subAggregation(
+                                topHits("hits").addSort(SortBuilders.fieldSort(SORT_FIELD).order(SortOrder.DESC))
+                                        .setFrom(from)
+                                        .setSize(size)
+                        )
+                )
+                .get();
+        assertSearchResponse(response);
+
+        SearchResponse control = client().prepareSearch("idx")
+                .setTypes("type")
+                .setFrom(from)
+                .setSize(size)
+                .setPostFilter(FilterBuilders.termFilter(TERMS_AGGS_FIELD, "val0"))
+                .addSort(SORT_FIELD, SortOrder.DESC)
+                .get();
+        assertSearchResponse(control);
+        SearchHits controlHits = control.getHits();
+
+        Terms terms = response.getAggregations().get("terms");
+        assertThat(terms, notNullValue());
+        assertThat(terms.getName(), equalTo("terms"));
+        assertThat(terms.getBuckets().size(), equalTo(5));
+        Terms.Bucket bucket = terms.getBucketByKey("val0");
+        assertThat(bucket, notNullValue());
+        assertThat(bucket.getDocCount(), equalTo(10l));
+        TopHits topHits = bucket.getAggregations().get("hits");
+        SearchHits hits = topHits.getHits();
+        assertThat(hits.totalHits(), equalTo(controlHits.totalHits()));
+        assertThat(hits.getHits().length, equalTo(controlHits.getHits().length));
+        for (int i = 0; i < hits.getHits().length; i++) {
+            assertThat(hits.getAt(i).id(), equalTo(controlHits.getAt(i).id()));
+            assertThat(hits.getAt(i).sortValues()[0], equalTo(controlHits.getAt(i).sortValues()[0]));
+        }
+    }
+
     @Test
     public void testSortByBucket() throws Exception {
         SearchResponse response = client().prepareSearch("idx").setTypes("type")