From e48bc2eed71b1808e5d98960976ec60e6e722d78 Mon Sep 17 00:00:00 2001 From: Jim Ferenczi Date: Mon, 23 Jan 2017 16:33:51 +0100 Subject: [PATCH] Add field collapsing for search request (#22337) * Add top hits collapsing to search request The field collapsing is done with a custom top docs collector that "collapses" search hits with the same field value. The distributed aspect is resolved using the two passes that the regular search uses. The first pass "collapses" the top hits, then the coordinating node merges/collapses the top hits from each shard. ``` GET _search { "collapse": { "field": "category" } } ``` This change also adds an ExpandCollapseSearchResponseListener that intercepts the search response and expands collapsed hits using the CollapseBuilder#innerHit options. Each inner_hits section is retrieved by sending a query, filtered by the collapse key, to all shards. ``` GET _search { "collapse": { "field": "category", "inner_hits": { "size": 2 } } } ``` --- .../search/grouping/CollapseTopFieldDocs.java | 215 +++++++++ .../grouping/CollapsingDocValuesSource.java | 211 +++++++++ .../grouping/CollapsingTopDocsCollector.java | 214 +++++++++ .../action/search/SearchPhaseController.java | 39 +- .../action/search/SearchRequestBuilder.java | 8 +- .../elasticsearch/common/lucene/Lucene.java | 258 +++++++---- .../java/org/elasticsearch/node/Node.java | 2 +- .../search/DefaultSearchContext.java | 13 + .../elasticsearch/search/SearchModule.java | 14 +- .../elasticsearch/search/SearchService.java | 6 + .../search/builder/SearchSourceBuilder.java | 34 +- .../search/collapse/CollapseBuilder.java | 179 ++++++++ .../search/collapse/CollapseContext.java | 63 +++ .../ExpandCollapseSearchResponseListener.java | 119 +++++ .../subphase/DocValueFieldsFetchSubPhase.java | 10 + .../internal/FilteredSearchContext.java | 11 + .../search/internal/SearchContext.java | 5 + .../search/internal/SubSearchContext.java | 6 + .../search/query/QueryPhase.java | 45 +-
.../CollapsingTopDocsCollectorTests.java | 426 ++++++++++++++++++ .../search/AbstractSearchTestCase.java | 4 +- .../search/collapse/CollapseBuilderTests.java | 179 ++++++++ docs/build.gradle | 10 + .../search/request/collapse.asciidoc | 72 +++ .../ReindexParallelizationHelperTests.java | 3 +- .../test/search/110_field_collapsing.yaml | 203 +++++++++ .../search/RandomSearchRequestGenerator.java | 9 +- .../elasticsearch/test/TestSearchContext.java | 11 + 28 files changed, 2253 insertions(+), 116 deletions(-) create mode 100644 core/src/main/java/org/apache/lucene/search/grouping/CollapseTopFieldDocs.java create mode 100644 core/src/main/java/org/apache/lucene/search/grouping/CollapsingDocValuesSource.java create mode 100644 core/src/main/java/org/apache/lucene/search/grouping/CollapsingTopDocsCollector.java create mode 100644 core/src/main/java/org/elasticsearch/search/collapse/CollapseBuilder.java create mode 100644 core/src/main/java/org/elasticsearch/search/collapse/CollapseContext.java create mode 100644 core/src/main/java/org/elasticsearch/search/collapse/ExpandCollapseSearchResponseListener.java create mode 100644 core/src/test/java/org/apache/lucene/grouping/CollapsingTopDocsCollectorTests.java create mode 100644 core/src/test/java/org/elasticsearch/search/collapse/CollapseBuilderTests.java create mode 100644 docs/reference/search/request/collapse.asciidoc create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/search/110_field_collapsing.yaml diff --git a/core/src/main/java/org/apache/lucene/search/grouping/CollapseTopFieldDocs.java b/core/src/main/java/org/apache/lucene/search/grouping/CollapseTopFieldDocs.java new file mode 100644 index 00000000000..a101b1d82f9 --- /dev/null +++ b/core/src/main/java/org/apache/lucene/search/grouping/CollapseTopFieldDocs.java @@ -0,0 +1,215 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lucene.search.grouping; + +import org.apache.lucene.search.FieldComparator; +import org.apache.lucene.search.FieldDoc; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.TopFieldDocs; +import org.apache.lucene.util.PriorityQueue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Represents hits returned by {@link CollapsingTopDocsCollector#getTopDocs()}. 
+ */ +public class CollapseTopFieldDocs extends TopFieldDocs { + /** The field used for collapsing **/ + public final String field; + /** The collapse value for each top doc */ + public final Object[] collapseValues; + + public CollapseTopFieldDocs(String field, int totalHits, ScoreDoc[] scoreDocs, + SortField[] sortFields, Object[] values, float maxScore) { + super(totalHits, scoreDocs, sortFields, maxScore); + this.field = field; + this.collapseValues = values; + } + + // Refers to one hit: + private static class ShardRef { + // Which shard (index into shardHits[]): + final int shardIndex; + + // Which hit within the shard: + int hitIndex; + + public ShardRef(int shardIndex) { + this.shardIndex = shardIndex; + } + + @Override + public String toString() { + return "ShardRef(shardIndex=" + shardIndex + " hitIndex=" + hitIndex + ")"; + } + }; + + private static class MergeSortQueue extends PriorityQueue { + // These are really FieldDoc instances: + final ScoreDoc[][] shardHits; + final FieldComparator[] comparators; + final int[] reverseMul; + + public MergeSortQueue(Sort sort, CollapseTopFieldDocs[] shardHits) throws IOException { + super(shardHits.length); + this.shardHits = new ScoreDoc[shardHits.length][]; + for (int shardIDX = 0; shardIDX < shardHits.length; shardIDX++) { + final ScoreDoc[] shard = shardHits[shardIDX].scoreDocs; + if (shard != null) { + this.shardHits[shardIDX] = shard; + // Fail gracefully if API is misused: + for (int hitIDX = 0; hitIDX < shard.length; hitIDX++) { + final ScoreDoc sd = shard[hitIDX]; + final FieldDoc gd = (FieldDoc) sd; + assert gd.fields != null; + } + } + } + + final SortField[] sortFields = sort.getSort(); + comparators = new FieldComparator[sortFields.length]; + reverseMul = new int[sortFields.length]; + for (int compIDX = 0; compIDX < sortFields.length; compIDX++) { + final SortField sortField = sortFields[compIDX]; + comparators[compIDX] = sortField.getComparator(1, compIDX); + reverseMul[compIDX] = 
sortField.getReverse() ? -1 : 1; + } + } + + // Returns true if first is < second + @Override + public boolean lessThan(ShardRef first, ShardRef second) { + assert first != second; + final FieldDoc firstFD = (FieldDoc) shardHits[first.shardIndex][first.hitIndex]; + final FieldDoc secondFD = (FieldDoc) shardHits[second.shardIndex][second.hitIndex]; + + for (int compIDX = 0; compIDX < comparators.length; compIDX++) { + final FieldComparator comp = comparators[compIDX]; + + final int cmp = + reverseMul[compIDX] * comp.compareValues(firstFD.fields[compIDX], secondFD.fields[compIDX]); + + if (cmp != 0) { + return cmp < 0; + } + } + + // Tie break: earlier shard wins + if (first.shardIndex < second.shardIndex) { + return true; + } else if (first.shardIndex > second.shardIndex) { + return false; + } else { + // Tie break in same shard: resolve however the + // shard had resolved it: + assert first.hitIndex != second.hitIndex; + return first.hitIndex < second.hitIndex; + } + } + } + + /** + * Returns a new CollapseTopDocs, containing topN collapsed results across + * the provided CollapseTopDocs, sorting by score. Each {@link CollapseTopFieldDocs} instance must be sorted. 
+ **/ + public static CollapseTopFieldDocs merge(Sort sort, int start, int size, + CollapseTopFieldDocs[] shardHits) throws IOException { + String collapseField = shardHits[0].field; + for (int i = 1; i < shardHits.length; i++) { + if (collapseField.equals(shardHits[i].field) == false) { + throw new IllegalArgumentException("collapse field differ across shards [" + + collapseField + "] != [" + shardHits[i].field + "]"); + } + } + final PriorityQueue queue = new MergeSortQueue(sort, shardHits); + + int totalHitCount = 0; + int availHitCount = 0; + float maxScore = Float.MIN_VALUE; + for(int shardIDX=0;shardIDX 0) { + availHitCount += shard.scoreDocs.length; + queue.add(new ShardRef(shardIDX)); + maxScore = Math.max(maxScore, shard.getMaxScore()); + } + } + + if (availHitCount == 0) { + maxScore = Float.NaN; + } + + final ScoreDoc[] hits; + final Object[] values; + if (availHitCount <= start) { + hits = new ScoreDoc[0]; + values = new Object[0]; + } else { + List hitList = new ArrayList<>(); + List collapseList = new ArrayList<>(); + int requestedResultWindow = start + size; + int numIterOnHits = Math.min(availHitCount, requestedResultWindow); + int hitUpto = 0; + Set seen = new HashSet<>(); + while (hitUpto < numIterOnHits) { + if (queue.size() == 0) { + break; + } + ShardRef ref = queue.top(); + final ScoreDoc hit = shardHits[ref.shardIndex].scoreDocs[ref.hitIndex]; + final Object collapseValue = shardHits[ref.shardIndex].collapseValues[ref.hitIndex++]; + if (seen.contains(collapseValue)) { + if (ref.hitIndex < shardHits[ref.shardIndex].scoreDocs.length) { + queue.updateTop(); + } else { + queue.pop(); + } + continue; + } + seen.add(collapseValue); + hit.shardIndex = ref.shardIndex; + if (hitUpto >= start) { + hitList.add(hit); + collapseList.add(collapseValue); + } + + hitUpto++; + + if (ref.hitIndex < shardHits[ref.shardIndex].scoreDocs.length) { + // Not done with this these TopDocs yet: + queue.updateTop(); + } else { + queue.pop(); + } + } + hits = 
hitList.toArray(new ScoreDoc[0]); + values = collapseList.toArray(new Object[0]); + } + return new CollapseTopFieldDocs(collapseField, totalHitCount, hits, sort.getSort(), values, maxScore); + } +} diff --git a/core/src/main/java/org/apache/lucene/search/grouping/CollapsingDocValuesSource.java b/core/src/main/java/org/apache/lucene/search/grouping/CollapsingDocValuesSource.java new file mode 100644 index 00000000000..7573e98c6f9 --- /dev/null +++ b/core/src/main/java/org/apache/lucene/search/grouping/CollapsingDocValuesSource.java @@ -0,0 +1,211 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.lucene.search.grouping; + +import org.apache.lucene.index.DocValues; +import org.apache.lucene.index.DocValuesType; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; + +import java.io.IOException; + +/** + * Utility class that ensures that a single collapse key is extracted per document. + */ +abstract class CollapsingDocValuesSource { + protected final String field; + + CollapsingDocValuesSource(String field) throws IOException { + this.field = field; + } + + abstract T get(int doc); + + abstract T copy(T value, T reuse); + + abstract void setNextReader(LeafReader reader) throws IOException; + + /** + * Implementation for {@link NumericDocValues} and {@link SortedNumericDocValues}. + * Fails with an {@link IllegalStateException} if a document contains multiple values for the specified field. 
+ */ + static class Numeric extends CollapsingDocValuesSource { + private NumericDocValues values; + private Bits docsWithField; + + public Numeric(String field) throws IOException { + super(field); + } + + @Override + public Long get(int doc) { + if (docsWithField.get(doc)) { + return values.get(doc); + } else { + return null; + } + } + + @Override + public Long copy(Long value, Long reuse) { + return value; + } + + @Override + public void setNextReader(LeafReader reader) throws IOException { + DocValuesType type = getDocValuesType(reader, field); + if (type == null || type == DocValuesType.NONE) { + values = DocValues.emptyNumeric(); + docsWithField = new Bits.MatchNoBits(reader.maxDoc()); + return ; + } + docsWithField = DocValues.getDocsWithField(reader, field); + switch (type) { + case NUMERIC: + values = DocValues.getNumeric(reader, field); + break; + + case SORTED_NUMERIC: + final SortedNumericDocValues sorted = DocValues.getSortedNumeric(reader, field); + values = DocValues.unwrapSingleton(sorted); + if (values == null) { + values = new NumericDocValues() { + @Override + public long get(int docID) { + sorted.setDocument(docID); + assert sorted.count() > 0; + if (sorted.count() > 1) { + throw new IllegalStateException("failed to collapse " + docID + + ", the collapse field must be single valued"); + } + return sorted.valueAt(0); + } + }; + } + break; + + default: + throw new IllegalStateException("unexpected doc values type " + + type + "` for field `" + field + "`"); + } + } + } + + /** + * Implementation for {@link SortedDocValues} and {@link SortedSetDocValues}. + * Fails with an {@link IllegalStateException} if a document contains multiple values for the specified field. 
+ */ + static class Keyword extends CollapsingDocValuesSource { + private Bits docsWithField; + private SortedDocValues values; + + public Keyword(String field) throws IOException { + super(field); + } + + @Override + public BytesRef get(int doc) { + if (docsWithField.get(doc)) { + return values.get(doc); + } else { + return null; + } + } + + @Override + public BytesRef copy(BytesRef value, BytesRef reuse) { + if (value == null) { + return null; + } + if (reuse != null) { + reuse.bytes = ArrayUtil.grow(reuse.bytes, value.length); + reuse.offset = 0; + reuse.length = value.length; + System.arraycopy(value.bytes, value.offset, reuse.bytes, 0, value.length); + return reuse; + } else { + return BytesRef.deepCopyOf(value); + } + } + + @Override + public void setNextReader(LeafReader reader) throws IOException { + DocValuesType type = getDocValuesType(reader, field); + if (type == null || type == DocValuesType.NONE) { + values = DocValues.emptySorted(); + docsWithField = new Bits.MatchNoBits(reader.maxDoc()); + return ; + } + docsWithField = DocValues.getDocsWithField(reader, field); + switch (type) { + case SORTED: + values = DocValues.getSorted(reader, field); + break; + + case SORTED_SET: + final SortedSetDocValues sorted = DocValues.getSortedSet(reader, field); + values = DocValues.unwrapSingleton(sorted); + if (values == null) { + values = new SortedDocValues() { + @Override + public int getOrd(int docID) { + sorted.setDocument(docID); + int ord = (int) sorted.nextOrd(); + if (sorted.nextOrd() != SortedSetDocValues.NO_MORE_ORDS) { + throw new IllegalStateException("failed to collapse " + docID + + ", the collapse field must be single valued"); + } + return ord; + } + + @Override + public BytesRef lookupOrd(int ord) { + return sorted.lookupOrd(ord); + } + + @Override + public int getValueCount() { + return (int) sorted.getValueCount(); + } + }; + } + break; + + default: + throw new IllegalStateException("unexpected doc values type " + + type + "` for field `" + field 
+ "`"); + } + } + } + + private static DocValuesType getDocValuesType(LeafReader in, String field) { + FieldInfo fi = in.getFieldInfos().fieldInfo(field); + if (fi != null) { + return fi.getDocValuesType(); + } + return null; + } +} diff --git a/core/src/main/java/org/apache/lucene/search/grouping/CollapsingTopDocsCollector.java b/core/src/main/java/org/apache/lucene/search/grouping/CollapsingTopDocsCollector.java new file mode 100644 index 00000000000..955a63e5483 --- /dev/null +++ b/core/src/main/java/org/apache/lucene/search/grouping/CollapsingTopDocsCollector.java @@ -0,0 +1,214 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.lucene.search.grouping; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.FieldDoc; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.util.BytesRef; + +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; + +import static org.apache.lucene.search.SortField.Type.SCORE; + +/** + * A collector that groups documents based on field values and returns {@link CollapseTopFieldDocs} + * output. The collapsing is done in a single pass by selecting only the top sorted document per collapse key. + * The value used for the collapse key of each group can be found in {@link CollapseTopFieldDocs#collapseValues}. + */ +public abstract class CollapsingTopDocsCollector extends FirstPassGroupingCollector { + protected final String collapseField; + + protected final Sort sort; + protected Scorer scorer; + + private int totalHitCount; + private float maxScore; + private final boolean trackMaxScore; + + private CollapsingTopDocsCollector(String collapseField, Sort sort, + int topN, boolean trackMaxScore) throws IOException { + super(sort, topN); + this.collapseField = collapseField; + this.trackMaxScore = trackMaxScore; + if (trackMaxScore) { + maxScore = Float.NEGATIVE_INFINITY; + } else { + maxScore = Float.NaN; + } + this.sort = sort; + } + + /** + * Transform {@link FirstPassGroupingCollector#getTopGroups(int, boolean)} output in + * {@link CollapseTopFieldDocs}. The collapsing needs only one pass so we can create the final top docs at the end + * of the first pass. 
+ */ + public CollapseTopFieldDocs getTopDocs() { + Collection> groups = super.getTopGroups(0, true); + if (groups == null) { + return new CollapseTopFieldDocs(collapseField, totalHitCount, new ScoreDoc[0], + sort.getSort(), new Object[0], Float.NaN); + } + FieldDoc[] docs = new FieldDoc[groups.size()]; + Object[] collapseValues = new Object[groups.size()]; + int scorePos = -1; + for (int index = 0; index < sort.getSort().length; index++) { + SortField sortField = sort.getSort()[index]; + if (sortField.getType() == SCORE) { + scorePos = index; + break; + } + } + int pos = 0; + Iterator> it = orderedGroups.iterator(); + for (SearchGroup group : groups) { + assert it.hasNext(); + CollectedSearchGroup col = it.next(); + float score = Float.NaN; + if (scorePos != -1) { + score = (float) group.sortValues[scorePos]; + } + docs[pos] = new FieldDoc(col.topDoc, score, group.sortValues); + collapseValues[pos] = group.groupValue; + pos++; + } + return new CollapseTopFieldDocs(collapseField, totalHitCount, docs, sort.getSort(), + collapseValues, maxScore); + } + + @Override + public boolean needsScores() { + if (super.needsScores() == false) { + return trackMaxScore; + } + return true; + } + + @Override + public void setScorer(Scorer scorer) throws IOException { + super.setScorer(scorer); + this.scorer = scorer; + } + + @Override + public void collect(int doc) throws IOException { + super.collect(doc); + if (trackMaxScore) { + maxScore = Math.max(maxScore, scorer.score()); + } + totalHitCount++; + } + + private static class Numeric extends CollapsingTopDocsCollector { + private final CollapsingDocValuesSource.Numeric source; + + private Numeric(String collapseField, Sort sort, int topN, boolean trackMaxScore) throws IOException { + super(collapseField, sort, topN, trackMaxScore); + source = new CollapsingDocValuesSource.Numeric(collapseField); + } + + @Override + protected void doSetNextReader(LeafReaderContext readerContext) throws IOException { + 
super.doSetNextReader(readerContext); + source.setNextReader(readerContext.reader()); + } + + @Override + protected Long getDocGroupValue(int doc) { + return source.get(doc); + } + + @Override + protected Long copyDocGroupValue(Long groupValue, Long reuse) { + return source.copy(groupValue, reuse); + } + } + + private static class Keyword extends CollapsingTopDocsCollector { + private final CollapsingDocValuesSource.Keyword source; + + private Keyword(String collapseField, Sort sort, int topN, boolean trackMaxScore) throws IOException { + super(collapseField, sort, topN, trackMaxScore); + source = new CollapsingDocValuesSource.Keyword(collapseField); + + } + + @Override + protected void doSetNextReader(LeafReaderContext readerContext) throws IOException { + super.doSetNextReader(readerContext); + source.setNextReader(readerContext.reader()); + } + + @Override + protected BytesRef getDocGroupValue(int doc) { + return source.get(doc); + } + + @Override + protected BytesRef copyDocGroupValue(BytesRef groupValue, BytesRef reuse) { + return source.copy(groupValue, reuse); + } + } + + /** + * Create a collapsing top docs collector on a {@link org.apache.lucene.index.NumericDocValues} field. + * It accepts also {@link org.apache.lucene.index.SortedNumericDocValues} field but + * the collect will fail with an {@link IllegalStateException} if a document contains more than one value for the + * field. + * + * @param collapseField The sort field used to group + * documents. + * @param sort The {@link Sort} used to sort the collapsed hits. + * The collapsing keeps only the top sorted document per collapsed key. + * This must be non-null, ie, if you want to groupSort by relevance + * use Sort.RELEVANCE. + * @param topN How many top groups to keep. 
+ * @throws IOException When I/O related errors occur + */ + public static CollapsingTopDocsCollector createNumeric(String collapseField, Sort sort, + int topN, boolean trackMaxScore) throws IOException { + return new Numeric(collapseField, sort, topN, trackMaxScore); + } + + /** + * Create a collapsing top docs collector on a {@link org.apache.lucene.index.SortedDocValues} field. + * It accepts also {@link org.apache.lucene.index.SortedSetDocValues} field but + * the collect will fail with an {@link IllegalStateException} if a document contains more than one value for the + * field. + * + * @param collapseField The sort field used to group + * documents. + * @param sort The {@link Sort} used to sort the collapsed hits. The collapsing keeps only the top sorted + * document per collapsed key. + * This must be non-null, ie, if you want to groupSort by relevance use Sort.RELEVANCE. + * @param topN How many top groups to keep. + * @throws IOException When I/O related errors occur + */ + public static CollapsingTopDocsCollector createKeyword(String collapseField, Sort sort, + int topN, boolean trackMaxScore) throws IOException { + return new Keyword(collapseField, sort, topN, trackMaxScore); + } +} + diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 0243a872d61..097382c725e 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -30,6 +30,7 @@ import org.apache.lucene.search.SortField; import org.apache.lucene.search.TermStatistics; import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopFieldDocs; +import org.apache.lucene.search.grouping.CollapseTopFieldDocs; import org.elasticsearch.common.collect.HppcMaps; import org.elasticsearch.common.component.AbstractComponent; import 
org.elasticsearch.common.lucene.Lucene; @@ -251,7 +252,26 @@ public class SearchPhaseController extends AbstractComponent { } final TopDocs mergedTopDocs; - if (firstResult.queryResult().topDocs() instanceof TopFieldDocs) { + int numShards = resultsArr.length(); + if (firstResult.queryResult().topDocs() instanceof CollapseTopFieldDocs) { + CollapseTopFieldDocs firstTopDocs = (CollapseTopFieldDocs) firstResult.queryResult().topDocs(); + final Sort sort = new Sort(firstTopDocs.fields); + + final CollapseTopFieldDocs[] shardTopDocs = new CollapseTopFieldDocs[numShards]; + for (AtomicArray.Entry sortedResult : sortedResults) { + TopDocs topDocs = sortedResult.value.queryResult().topDocs(); + // the 'index' field is the position in the resultsArr atomic array + shardTopDocs[sortedResult.index] = (CollapseTopFieldDocs) topDocs; + } + // TopDocs#merge can't deal with null shard TopDocs + for (int i = 0; i < shardTopDocs.length; ++i) { + if (shardTopDocs[i] == null) { + shardTopDocs[i] = new CollapseTopFieldDocs(firstTopDocs.field, 0, new FieldDoc[0], + sort.getSort(), new Object[0], Float.NaN); + } + } + mergedTopDocs = CollapseTopFieldDocs.merge(sort, from, topN, shardTopDocs); + } else if (firstResult.queryResult().topDocs() instanceof TopFieldDocs) { TopFieldDocs firstTopDocs = (TopFieldDocs) firstResult.queryResult().topDocs(); final Sort sort = new Sort(firstTopDocs.fields); @@ -331,6 +351,8 @@ public class SearchPhaseController extends AbstractComponent { } // from is always zero as when we use scroll, we ignore from long size = Math.min(fetchHits, topN(queryResults)); + // with collapsing we can have more hits than sorted docs + size = Math.min(sortedScoreDocs.length, size); for (int sortedDocsIndex = 0; sortedDocsIndex < size; sortedDocsIndex++) { ScoreDoc scoreDoc = sortedScoreDocs[sortedDocsIndex]; lastEmittedDocPerShard[scoreDoc.shardIndex] = scoreDoc; @@ -377,11 +399,16 @@ public class SearchPhaseController extends AbstractComponent { boolean sorted = false; 
int sortScoreIndex = -1; if (firstResult.topDocs() instanceof TopFieldDocs) { - sorted = true; TopFieldDocs fieldDocs = (TopFieldDocs) firstResult.queryResult().topDocs(); - for (int i = 0; i < fieldDocs.fields.length; i++) { - if (fieldDocs.fields[i].getType() == SortField.Type.SCORE) { - sortScoreIndex = i; + if (fieldDocs instanceof CollapseTopFieldDocs && + fieldDocs.fields.length == 1 && fieldDocs.fields[0].getType() == SortField.Type.SCORE) { + sorted = false; + } else { + sorted = true; + for (int i = 0; i < fieldDocs.fields.length; i++) { + if (fieldDocs.fields[i].getType() == SortField.Type.SCORE) { + sortScoreIndex = i; + } } } } @@ -420,6 +447,8 @@ public class SearchPhaseController extends AbstractComponent { } int from = ignoreFrom ? 0 : firstResult.queryResult().from(); int numSearchHits = (int) Math.min(fetchHits - from, topN(queryResults)); + // with collapsing we can have more fetch hits than sorted docs + numSearchHits = Math.min(sortedDocs.length, numSearchHits); // merge hits List hits = new ArrayList<>(); if (!fetchResults.isEmpty()) { diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java index 3c320447fe8..865cf01430f 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java @@ -26,13 +26,14 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.script.Script; +import org.elasticsearch.search.collapse.CollapseBuilder; import org.elasticsearch.search.Scroll; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.PipelineAggregationBuilder; -import org.elasticsearch.search.slice.SliceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; 
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder; import org.elasticsearch.search.rescore.RescoreBuilder; +import org.elasticsearch.search.slice.SliceBuilder; import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.suggest.SuggestBuilder; @@ -503,6 +504,11 @@ public class SearchRequestBuilder extends ActionRequestBuilder GEO_DISTANCE_SORT_TYPE_CLASS = LatLonDocValuesField.newDistanceSort("some_geo_field", 0, 0).getClass(); public static void writeTopDocs(StreamOutput out, TopDocs topDocs) throws IOException { - if (topDocs instanceof TopFieldDocs) { - out.writeBoolean(true); + if (topDocs instanceof CollapseTopFieldDocs) { + out.writeByte((byte) 2); + CollapseTopFieldDocs collapseDocs = (CollapseTopFieldDocs) topDocs; + + out.writeVInt(topDocs.totalHits); + out.writeFloat(topDocs.getMaxScore()); + + out.writeString(collapseDocs.field); + + out.writeVInt(collapseDocs.fields.length); + for (SortField sortField : collapseDocs.fields) { + writeSortField(out, sortField); + } + + out.writeVInt(topDocs.scoreDocs.length); + for (int i = 0; i < topDocs.scoreDocs.length; i++) { + ScoreDoc doc = collapseDocs.scoreDocs[i]; + writeFieldDoc(out, (FieldDoc) doc); + writeSortValue(out, collapseDocs.collapseValues[i]); + } + } else if (topDocs instanceof TopFieldDocs) { + out.writeByte((byte) 1); TopFieldDocs topFieldDocs = (TopFieldDocs) topDocs; out.writeVInt(topDocs.totalHits); @@ -363,31 +421,7 @@ public class Lucene { out.writeVInt(topFieldDocs.fields.length); for (SortField sortField : topFieldDocs.fields) { - if (sortField.getClass() == GEO_DISTANCE_SORT_TYPE_CLASS) { - // for geo sorting, we replace the SortField with a SortField that assumes a double field. 
- // this works since the SortField is only used for merging top docs - SortField newSortField = new SortField(sortField.getField(), SortField.Type.DOUBLE); - newSortField.setMissingValue(sortField.getMissingValue()); - sortField = newSortField; - } - if (sortField.getClass() != SortField.class) { - throw new IllegalArgumentException("Cannot serialize SortField impl [" + sortField + "]"); - } - if (sortField.getField() == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - out.writeString(sortField.getField()); - } - if (sortField.getComparatorSource() != null) { - IndexFieldData.XFieldComparatorSource comparatorSource = (IndexFieldData.XFieldComparatorSource) sortField.getComparatorSource(); - writeSortType(out, comparatorSource.reducedType()); - writeMissingValue(out, comparatorSource.missingValue(sortField.getReverse())); - } else { - writeSortType(out, sortField.getType()); - writeMissingValue(out, sortField.getMissingValue()); - } - out.writeBoolean(sortField.getReverse()); + writeSortField(out, sortField); } out.writeVInt(topDocs.scoreDocs.length); @@ -395,7 +429,7 @@ public class Lucene { writeFieldDoc(out, (FieldDoc) doc); } } else { - out.writeBoolean(false); + out.writeByte((byte) 0); out.writeVInt(topDocs.totalHits); out.writeFloat(topDocs.getMaxScore()); @@ -431,44 +465,49 @@ public class Lucene { } } + + private static void writeSortValue(StreamOutput out, Object field) throws IOException { + if (field == null) { + out.writeByte((byte) 0); + } else { + Class type = field.getClass(); + if (type == String.class) { + out.writeByte((byte) 1); + out.writeString((String) field); + } else if (type == Integer.class) { + out.writeByte((byte) 2); + out.writeInt((Integer) field); + } else if (type == Long.class) { + out.writeByte((byte) 3); + out.writeLong((Long) field); + } else if (type == Float.class) { + out.writeByte((byte) 4); + out.writeFloat((Float) field); + } else if (type == Double.class) { + out.writeByte((byte) 5); + 
out.writeDouble((Double) field); + } else if (type == Byte.class) { + out.writeByte((byte) 6); + out.writeByte((Byte) field); + } else if (type == Short.class) { + out.writeByte((byte) 7); + out.writeShort((Short) field); + } else if (type == Boolean.class) { + out.writeByte((byte) 8); + out.writeBoolean((Boolean) field); + } else if (type == BytesRef.class) { + out.writeByte((byte) 9); + out.writeBytesRef((BytesRef) field); + } else { + throw new IOException("Can't handle sort field value of type [" + type + "]"); + } + } + } + public static void writeFieldDoc(StreamOutput out, FieldDoc fieldDoc) throws IOException { out.writeVInt(fieldDoc.fields.length); for (Object field : fieldDoc.fields) { - if (field == null) { - out.writeByte((byte) 0); - } else { - Class type = field.getClass(); - if (type == String.class) { - out.writeByte((byte) 1); - out.writeString((String) field); - } else if (type == Integer.class) { - out.writeByte((byte) 2); - out.writeInt((Integer) field); - } else if (type == Long.class) { - out.writeByte((byte) 3); - out.writeLong((Long) field); - } else if (type == Float.class) { - out.writeByte((byte) 4); - out.writeFloat((Float) field); - } else if (type == Double.class) { - out.writeByte((byte) 5); - out.writeDouble((Double) field); - } else if (type == Byte.class) { - out.writeByte((byte) 6); - out.writeByte((Byte) field); - } else if (type == Short.class) { - out.writeByte((byte) 7); - out.writeShort((Short) field); - } else if (type == Boolean.class) { - out.writeByte((byte) 8); - out.writeBoolean((Boolean) field); - } else if (type == BytesRef.class) { - out.writeByte((byte) 9); - out.writeBytesRef((BytesRef) field); - } else { - throw new IOException("Can't handle sort field value of type [" + type + "]"); - } - } + writeSortValue(out, field); } out.writeVInt(fieldDoc.doc); out.writeFloat(fieldDoc.score); @@ -487,10 +526,53 @@ public class Lucene { return SortField.Type.values()[in.readVInt()]; } + public static SortField 
readSortField(StreamInput in) throws IOException { + String field = null; + if (in.readBoolean()) { + field = in.readString(); + } + SortField.Type sortType = readSortType(in); + Object missingValue = readMissingValue(in); + boolean reverse = in.readBoolean(); + SortField sortField = new SortField(field, sortType, reverse); + if (missingValue != null) { + sortField.setMissingValue(missingValue); + } + return sortField; + } + public static void writeSortType(StreamOutput out, SortField.Type sortType) throws IOException { out.writeVInt(sortType.ordinal()); } + public static void writeSortField(StreamOutput out, SortField sortField) throws IOException { + if (sortField.getClass() == GEO_DISTANCE_SORT_TYPE_CLASS) { + // for geo sorting, we replace the SortField with a SortField that assumes a double field. + // this works since the SortField is only used for merging top docs + SortField newSortField = new SortField(sortField.getField(), SortField.Type.DOUBLE); + newSortField.setMissingValue(sortField.getMissingValue()); + sortField = newSortField; + } + if (sortField.getClass() != SortField.class) { + throw new IllegalArgumentException("Cannot serialize SortField impl [" + sortField + "]"); + } + if (sortField.getField() == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeString(sortField.getField()); + } + if (sortField.getComparatorSource() != null) { + IndexFieldData.XFieldComparatorSource comparatorSource = (IndexFieldData.XFieldComparatorSource) sortField.getComparatorSource(); + writeSortType(out, comparatorSource.reducedType()); + writeMissingValue(out, comparatorSource.missingValue(sortField.getReverse())); + } else { + writeSortType(out, sortField.getType()); + writeMissingValue(out, sortField.getMissingValue()); + } + out.writeBoolean(sortField.getReverse()); + } + public static Explanation readExplanation(StreamInput in) throws IOException { boolean match = in.readBoolean(); String description = in.readString(); diff --git 
a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 7ec78e84699..9279451d470 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -350,7 +350,7 @@ public class Node implements Closeable { IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class)); modules.add(indicesModule); - SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class)); + SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class), client); CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(), settingsModule.getClusterSettings()); resourcesToClose.add(circuitBreakerService); diff --git a/core/src/main/java/org/elasticsearch/search/DefaultSearchContext.java b/core/src/main/java/org/elasticsearch/search/DefaultSearchContext.java index 571584caaef..d981f4e68df 100644 --- a/core/src/main/java/org/elasticsearch/search/DefaultSearchContext.java +++ b/core/src/main/java/org/elasticsearch/search/DefaultSearchContext.java @@ -52,6 +52,7 @@ import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.search.aggregations.SearchContextAggregations; +import org.elasticsearch.search.collapse.CollapseContext; import org.elasticsearch.search.dfs.DfsSearchResult; import org.elasticsearch.search.fetch.FetchPhase; import org.elasticsearch.search.fetch.FetchSearchResult; @@ -113,6 +114,7 @@ final class DefaultSearchContext extends SearchContext { private Float minimumScore; private boolean trackScores = false; // when sorting, track scores as well... 
private FieldDoc searchAfter; + private CollapseContext collapse; private boolean lowLevelCancellation; // filter for sliced scroll private SliceBuilder sliceBuilder; @@ -581,6 +583,17 @@ final class DefaultSearchContext extends SearchContext { return searchAfter; } + @Override + public SearchContext collapse(CollapseContext collapse) { + this.collapse = collapse; + return this; + } + + @Override + public CollapseContext collapse() { + return collapse; + } + public SearchContext sliceBuilder(SliceBuilder sliceBuilder) { this.sliceBuilder = sliceBuilder; return this; diff --git a/core/src/main/java/org/elasticsearch/search/SearchModule.java b/core/src/main/java/org/elasticsearch/search/SearchModule.java index 550729d0042..5c4c246e129 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/core/src/main/java/org/elasticsearch/search/SearchModule.java @@ -22,6 +22,7 @@ package org.elasticsearch.search; import org.apache.lucene.search.BooleanQuery; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; import org.elasticsearch.common.NamedRegistry; import org.elasticsearch.common.geo.ShapesAvailability; import org.elasticsearch.common.geo.builders.ShapeBuilders; @@ -222,6 +223,7 @@ import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel; import org.elasticsearch.search.aggregations.pipeline.movavg.models.SimpleModel; import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregator; +import org.elasticsearch.search.collapse.ExpandCollapseSearchResponseListener; import org.elasticsearch.search.fetch.FetchPhase; import org.elasticsearch.search.fetch.FetchSubPhase; import org.elasticsearch.search.fetch.subphase.DocValueFieldsFetchSubPhase; @@ -256,7 +258,6 @@ import java.util.ArrayList; import 
java.util.List; import java.util.Map; import java.util.function.BiConsumer; -import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -285,6 +286,10 @@ public class SearchModule { private final List namedXContents = new ArrayList<>(); public SearchModule(Settings settings, boolean transportClient, List plugins) { + this(settings, transportClient, plugins, null); + } + + public SearchModule(Settings settings, boolean transportClient, List plugins, Client client) { this.settings = settings; this.transportClient = transportClient; registerSuggesters(plugins); @@ -301,7 +306,7 @@ public class SearchModule { registerFetchSubPhases(plugins); registerSearchExts(plugins); if (false == transportClient) { - registerSearchResponseListeners(plugins); + registerSearchResponseListeners(client, plugins); } registerShapes(); } @@ -694,7 +699,10 @@ public class SearchModule { registerFromPlugin(plugins, p -> p.getFetchSubPhases(context), this::registerFetchSubPhase); } - private void registerSearchResponseListeners(List plugins) { + private void registerSearchResponseListeners(Client client, List plugins) { + if (client != null) { + registerSearchResponseListener(new ExpandCollapseSearchResponseListener(client)); + } registerFromPlugin(plugins, p -> p.getSearchResponseListeners(), this::registerSearchResponseListener); } diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index e228c055430..3f2fd3b31a4 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -40,6 +40,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.search.collapse.CollapseContext; import 
org.elasticsearch.index.query.InnerHitBuilder; import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.index.shard.IndexEventListener; @@ -792,6 +793,11 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv } context.storedFieldsContext(source.storedFields()); } + + if (source.collapse() != null) { + final CollapseContext collapseContext = source.collapse().build(context); + context.collapse(collapseContext); + } } /** diff --git a/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java b/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java index c1d9cd115af..857c6b25a1e 100644 --- a/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java @@ -19,6 +19,7 @@ package org.elasticsearch.search.builder; +import org.elasticsearch.Version; import org.elasticsearch.action.support.ToXContentToBytes; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; @@ -37,6 +38,7 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryParseContext; import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.script.Script; +import org.elasticsearch.search.collapse.CollapseBuilder; import org.elasticsearch.search.SearchExtBuilder; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregatorFactories; @@ -99,6 +101,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ public static final ParseField EXT_FIELD = new ParseField("ext"); public static final ParseField PROFILE_FIELD = new ParseField("profile"); public static final ParseField SEARCH_AFTER = new ParseField("search_after"); + public static final ParseField COLLAPSE = new ParseField("collapse"); public static final ParseField SLICE = new 
ParseField("slice"); public static final ParseField ALL_FIELDS_FIELDS = new ParseField("all_fields"); @@ -168,6 +171,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ private boolean profile = false; + private CollapseBuilder collapse = null; /** * Constructs a new search source builder. @@ -216,6 +220,9 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ profile = in.readBoolean(); searchAfterBuilder = in.readOptionalWriteable(SearchAfterBuilder::new); sliceBuilder = in.readOptionalWriteable(SliceBuilder::new); + if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + collapse = in.readOptionalWriteable(CollapseBuilder::new); + } } @Override @@ -264,6 +271,9 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ out.writeBoolean(profile); out.writeOptionalWriteable(searchAfterBuilder); out.writeOptionalWriteable(sliceBuilder); + if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + out.writeOptionalWriteable(collapse); + } } /** @@ -513,6 +523,16 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ return sliceBuilder; } + + public CollapseBuilder collapse() { + return collapse; + } + + public SearchSourceBuilder collapse(CollapseBuilder collapse) { + this.collapse = collapse; + return this; + } + /** * Add an aggregation to perform as part of the search. */ @@ -873,7 +893,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ } /** - * Create a shallow copy of this source replaced {@link #queryBuilder}, {@link #postQueryBuilder}, and {@linkplain slice}. Used by + * Create a shallow copy of this source replaced {@link #queryBuilder}, {@link #postQueryBuilder}, and {@link #sliceBuilder}. Used by * {@link #rewrite(QueryShardContext)} and {@link #copyWithNewSlice(SliceBuilder)}. 
*/ private SearchSourceBuilder shallowCopy(QueryBuilder queryBuilder, QueryBuilder postQueryBuilder, SliceBuilder slice) { @@ -903,6 +923,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ rewrittenBuilder.timeout = timeout; rewrittenBuilder.trackScores = trackScores; rewrittenBuilder.version = version; + rewrittenBuilder.collapse = collapse; return rewrittenBuilder; } @@ -1011,6 +1032,8 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ } } else if (SLICE.match(currentFieldName)) { sliceBuilder = SliceBuilder.fromXContent(context); + } else if (COLLAPSE.match(currentFieldName)) { + collapse = CollapseBuilder.fromXContent(context); } else { throw new ParsingException(parser.getTokenLocation(), "Unknown key for a " + token + " in [" + currentFieldName + "].", parser.getTokenLocation()); @@ -1202,6 +1225,10 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ } builder.endObject(); } + + if (collapse != null) { + builder.field(COLLAPSE.getPreferredName(), collapse); + } } public static class IndexBoost implements Writeable, ToXContent { @@ -1406,7 +1433,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ return Objects.hash(aggregations, explain, fetchSourceContext, docValueFields, storedFieldsContext, from, highlightBuilder, indexBoosts, minScore, postQueryBuilder, queryBuilder, rescoreBuilders, scriptFields, size, sorts, searchAfterBuilder, sliceBuilder, stats, suggestBuilder, terminateAfter, timeout, trackScores, version, - profile, extBuilders); + profile, extBuilders, collapse); } @Override @@ -1442,6 +1469,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ && Objects.equals(trackScores, other.trackScores) && Objects.equals(version, other.version) && Objects.equals(profile, other.profile) - && Objects.equals(extBuilders, other.extBuilders); + && Objects.equals(extBuilders, other.extBuilders) + && 
Objects.equals(collapse, other.collapse); } } diff --git a/core/src/main/java/org/elasticsearch/search/collapse/CollapseBuilder.java b/core/src/main/java/org/elasticsearch/search/collapse/CollapseBuilder.java new file mode 100644 index 00000000000..6a292104c05 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/collapse/CollapseBuilder.java @@ -0,0 +1,179 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.search.collapse; + +import org.apache.lucene.index.IndexOptions; +import org.elasticsearch.action.support.ToXContentToBytes; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.mapper.KeywordFieldMapper; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.index.query.InnerHitBuilder; +import org.elasticsearch.index.query.QueryParseContext; +import org.elasticsearch.search.SearchContextException; +import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.sort.SortMode; + +import java.io.IOException; +import java.util.Objects; + +/** + * A builder that enables field collapsing on search request. 
+ */ +public class CollapseBuilder extends ToXContentToBytes implements Writeable { + public static final ParseField FIELD_FIELD = new ParseField("field"); + public static final ParseField INNER_HITS_FIELD = new ParseField("inner_hits"); + private static final ObjectParser PARSER = + new ObjectParser<>("collapse", CollapseBuilder::new); + + static { + PARSER.declareString(CollapseBuilder::setField, FIELD_FIELD); + PARSER.declareObject(CollapseBuilder::setInnerHits, + (p, c) -> InnerHitBuilder.fromXContent(c), INNER_HITS_FIELD); + } + + private String field; + private SortMode multiValueMode; + private InnerHitBuilder innerHit; + + private CollapseBuilder() {} + + /** + * Public constructor + * @param field The name of the field to collapse on + */ + public CollapseBuilder(String field) { + Objects.requireNonNull(field, "field must be non-null"); + this.field = field; + } + + public CollapseBuilder(StreamInput in) throws IOException { + this.field = in.readString(); + this.multiValueMode = in.readOptionalWriteable(SortMode::readFromStream); + this.innerHit = in.readOptionalWriteable(InnerHitBuilder::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(field); + out.writeOptionalWriteable(multiValueMode); + out.writeOptionalWriteable(innerHit); + } + + public static CollapseBuilder fromXContent(QueryParseContext context) throws IOException { + CollapseBuilder builder = PARSER.parse(context.parser(), new CollapseBuilder(), context); + return builder; + } + + private CollapseBuilder setField(String field) { + if (Strings.isEmpty(field)) { + throw new IllegalArgumentException("field name is null or empty"); + } + this.field = field; + return this; + } + + public CollapseBuilder setInnerHits(InnerHitBuilder innerHit) { + this.innerHit = innerHit; + return this; + } + + /** + * The name of the field to collapse against + */ + public String getField() { + return this.field; + } + + /** + * The inner hit options to expand 
the collapsed results + */ + public InnerHitBuilder getInnerHit() { + return this.innerHit; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(); + innerToXContent(builder); + builder.endObject(); + return builder; + } + + private void innerToXContent(XContentBuilder builder) throws IOException { + builder.field(FIELD_FIELD.getPreferredName(), field); + if (innerHit != null) { + builder.field(INNER_HITS_FIELD.getPreferredName(), innerHit); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + CollapseBuilder that = (CollapseBuilder) o; + + if (field != null ? !field.equals(that.field) : that.field != null) return false; + return innerHit != null ? innerHit.equals(that.innerHit) : that.innerHit == null; + } + + @Override + public int hashCode() { + return Objects.hash(this.field, this.multiValueMode, this.innerHit); + } + + public CollapseContext build(SearchContext context) { + if (context.scrollContext() != null) { + throw new SearchContextException(context, "cannot use `collapse` in a scroll context"); + } + if (context.searchAfter() != null) { + throw new SearchContextException(context, "cannot use `collapse` in conjunction with `search_after`"); + } + if (context.rescore() != null && context.rescore().isEmpty() == false) { + throw new SearchContextException(context, "cannot use `collapse` in conjunction with `rescore`"); + } + + MappedFieldType fieldType = context.getQueryShardContext().fieldMapper(field); + if (fieldType == null) { + throw new SearchContextException(context, "no mapping found for `" + field + "` in order to collapse on"); + } + if (fieldType instanceof KeywordFieldMapper.KeywordFieldType == false && + fieldType instanceof NumberFieldMapper.NumberFieldType == false) { + throw new SearchContextException(context, "unknown type for collapse field `" + 
field + + "`, only keywords and numbers are accepted"); + } + + if (fieldType.hasDocValues() == false) { + throw new SearchContextException(context, "cannot collapse on field `" + field + "` without `doc_values`"); + } + if (fieldType.indexOptions() == IndexOptions.NONE && innerHit != null) { + throw new SearchContextException(context, "cannot expand `inner_hits` for collapse field `" + + field + "`, " + "only indexed field can retrieve `inner_hits`"); + } + return new CollapseContext(fieldType, innerHit); + } +} diff --git a/core/src/main/java/org/elasticsearch/search/collapse/CollapseContext.java b/core/src/main/java/org/elasticsearch/search/collapse/CollapseContext.java new file mode 100644 index 00000000000..d0ea2154ab3 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/collapse/CollapseContext.java @@ -0,0 +1,63 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.search.collapse; + +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.grouping.CollapsingTopDocsCollector; +import org.elasticsearch.index.mapper.KeywordFieldMapper; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.index.query.InnerHitBuilder; + +import java.io.IOException; + +/** + * Context used for field collapsing + */ +public class CollapseContext { + private final MappedFieldType fieldType; + private final InnerHitBuilder innerHit; + + public CollapseContext(MappedFieldType fieldType, InnerHitBuilder innerHit) { + this.fieldType = fieldType; + this.innerHit = innerHit; + } + + /** The field type used for collapsing **/ + public MappedFieldType getFieldType() { + return fieldType; + } + + + /** The inner hit options to expand the collapsed results **/ + public InnerHitBuilder getInnerHit() { + return innerHit; + } + + public CollapsingTopDocsCollector createTopDocs(Sort sort, int topN, boolean trackMaxScore) throws IOException { + if (fieldType instanceof KeywordFieldMapper.KeywordFieldType) { + return CollapsingTopDocsCollector.createKeyword(fieldType.name(), sort, topN, trackMaxScore); + } else if (fieldType instanceof NumberFieldMapper.NumberFieldType) { + return CollapsingTopDocsCollector.createNumeric(fieldType.name(), sort, topN, trackMaxScore); + } else { + throw new IllegalStateException("unknown type for collapse field " + fieldType.name() + + ", only keywords and numbers are accepted"); + } + } +} diff --git a/core/src/main/java/org/elasticsearch/search/collapse/ExpandCollapseSearchResponseListener.java b/core/src/main/java/org/elasticsearch/search/collapse/ExpandCollapseSearchResponseListener.java new file mode 100644 index 00000000000..9a45b7a9fec --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/collapse/ExpandCollapseSearchResponseListener.java @@ -0,0 +1,119 @@ +/* + * Licensed to 
Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.collapse; + +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.InnerHitBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.internal.InternalSearchHit; + +import java.util.HashMap; +import java.util.Objects; +import java.util.function.BiConsumer; + +/** + * A search response listener that intercepts the search response and expands collapsed hits + * using the {@link CollapseBuilder#innerHit} options. 
+ */ +public class ExpandCollapseSearchResponseListener implements BiConsumer { + private final Client client; + + public ExpandCollapseSearchResponseListener(Client client) { + this.client = Objects.requireNonNull(client); + } + + @Override + public void accept(SearchRequest searchRequest, SearchResponse searchResponse) { + if (searchRequest.source() == null) { + return ; + } + CollapseBuilder collapseBuilder = searchRequest.source().collapse(); + if (collapseBuilder == null || collapseBuilder.getInnerHit() == null) { + return ; + } + for (SearchHit hit : searchResponse.getHits()) { + InternalSearchHit internalHit = (InternalSearchHit) hit; + BoolQueryBuilder groupQuery = new BoolQueryBuilder(); + Object collapseValue = internalHit.field(collapseBuilder.getField()).getValue(); + if (collapseValue != null) { + groupQuery.filter(QueryBuilders.matchQuery(collapseBuilder.getField(), collapseValue)); + } else { + groupQuery.mustNot(QueryBuilders.existsQuery(collapseBuilder.getField())); + } + QueryBuilder origQuery = searchRequest.source().query(); + if (origQuery != null) { + groupQuery.must(origQuery); + } + SearchSourceBuilder sourceBuilder = createGroupSearchBuilder(collapseBuilder.getInnerHit()) + .query(groupQuery); + SearchRequest groupRequest = new SearchRequest(searchRequest.indices()) + .types(searchRequest.types()) + .source(sourceBuilder); + SearchResponse groupResponse = client.search(groupRequest).actionGet(); + SearchHits innerHits = groupResponse.getHits(); + if (internalHit.getInnerHits() == null) { + internalHit.setInnerHits(new HashMap<>(1)); + } + internalHit.getInnerHits().put(collapseBuilder.getInnerHit().getName(), innerHits); + } + } + + private SearchSourceBuilder createGroupSearchBuilder(InnerHitBuilder options) { + SearchSourceBuilder groupSource = new SearchSourceBuilder(); + groupSource.from(options.getFrom()); + groupSource.size(options.getSize()); + if (options.getSorts() != null) { + options.getSorts().forEach(groupSource::sort); + } + 
if (options.getFetchSourceContext() != null) { + if (options.getFetchSourceContext().includes() == null && options.getFetchSourceContext().excludes() == null) { + groupSource.fetchSource(options.getFetchSourceContext().fetchSource()); + } else { + groupSource.fetchSource(options.getFetchSourceContext().includes(), + options.getFetchSourceContext().excludes()); + } + } + if (options.getDocValueFields() != null) { + options.getDocValueFields().forEach(groupSource::docValueField); + } + if (options.getStoredFieldsContext() != null && options.getStoredFieldsContext().fieldNames() != null) { + options.getStoredFieldsContext().fieldNames().forEach(groupSource::storedField); + } + if (options.getScriptFields() != null) { + for (SearchSourceBuilder.ScriptField field : options.getScriptFields()) { + groupSource.scriptField(field.fieldName(), field.script()); + } + } + if (options.getHighlightBuilder() != null) { + groupSource.highlighter(options.getHighlightBuilder()); + } + groupSource.explain(options.isExplain()); + groupSource.trackScores(options.isTrackScores()); + return groupSource; + } + +} diff --git a/core/src/main/java/org/elasticsearch/search/fetch/subphase/DocValueFieldsFetchSubPhase.java b/core/src/main/java/org/elasticsearch/search/fetch/subphase/DocValueFieldsFetchSubPhase.java index befce94a9e4..9444f38d80e 100644 --- a/core/src/main/java/org/elasticsearch/search/fetch/subphase/DocValueFieldsFetchSubPhase.java +++ b/core/src/main/java/org/elasticsearch/search/fetch/subphase/DocValueFieldsFetchSubPhase.java @@ -27,6 +27,7 @@ import org.elasticsearch.search.internal.InternalSearchHitField; import org.elasticsearch.search.internal.SearchContext; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; /** @@ -38,6 +39,15 @@ public final class DocValueFieldsFetchSubPhase implements FetchSubPhase { @Override public void hitExecute(SearchContext context, HitContext hitContext) { + if (context.collapse() != null) { + // retrieve the 
`doc_value` associated with the collapse field + String name = context.collapse().getFieldType().name(); + if (context.docValueFieldsContext() == null) { + context.docValueFieldsContext(new DocValueFieldsContext(Collections.singletonList(name))); + } else if (context.docValueFieldsContext().fields().contains(name) == false) { + context.docValueFieldsContext().fields().add(name); + } + } if (context.docValueFieldsContext() == null) { return; } diff --git a/core/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java b/core/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java index fb1e2132dee..adc528728e0 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java +++ b/core/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java @@ -39,6 +39,7 @@ import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.search.SearchExtBuilder; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.aggregations.SearchContextAggregations; +import org.elasticsearch.search.collapse.CollapseContext; import org.elasticsearch.search.dfs.DfsSearchResult; import org.elasticsearch.search.fetch.FetchPhase; import org.elasticsearch.search.fetch.FetchSearchResult; @@ -528,4 +529,14 @@ public abstract class FilteredSearchContext extends SearchContext { public boolean isCancelled() { return in.isCancelled(); } + + @Override + public SearchContext collapse(CollapseContext collapse) { + return in.collapse(collapse); + } + + @Override + public CollapseContext collapse() { + return in.collapse(); + } } diff --git a/core/src/main/java/org/elasticsearch/search/internal/SearchContext.java b/core/src/main/java/org/elasticsearch/search/internal/SearchContext.java index 697226f5a8b..4201dc0180b 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/SearchContext.java +++ b/core/src/main/java/org/elasticsearch/search/internal/SearchContext.java @@ 
-38,6 +38,7 @@ import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.ObjectMapper; +import org.elasticsearch.search.collapse.CollapseContext; import org.elasticsearch.index.query.ParsedQuery; import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.index.shard.IndexShard; @@ -241,6 +242,10 @@ public abstract class SearchContext extends AbstractRefCounted implements Releas public abstract FieldDoc searchAfter(); + public abstract SearchContext collapse(CollapseContext collapse); + + public abstract CollapseContext collapse(); + public abstract SearchContext parsedPostFilter(ParsedQuery postFilter); public abstract ParsedQuery parsedPostFilter(); diff --git a/core/src/main/java/org/elasticsearch/search/internal/SubSearchContext.java b/core/src/main/java/org/elasticsearch/search/internal/SubSearchContext.java index 26d47a44f80..ebdcd2d72b7 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/SubSearchContext.java +++ b/core/src/main/java/org/elasticsearch/search/internal/SubSearchContext.java @@ -23,6 +23,7 @@ import org.apache.lucene.util.Counter; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.ParsedQuery; import org.elasticsearch.search.aggregations.SearchContextAggregations; +import org.elasticsearch.search.collapse.CollapseContext; import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.StoredFieldsContext; import org.elasticsearch.search.fetch.subphase.DocValueFieldsContext; @@ -316,6 +317,11 @@ public class SubSearchContext extends FilteredSearchContext { return this; } + @Override + public CollapseContext collapse() { + return null; + } + @Override public void accessed(long accessTime) { throw new UnsupportedOperationException("Not supported"); diff --git 
a/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java index 5579e55826e..4a67b11a8d2 100644 --- a/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -41,11 +41,13 @@ import org.apache.lucene.search.TopFieldCollector; import org.apache.lucene.search.TopScoreDocCollector; import org.apache.lucene.search.TotalHitCountCollector; import org.apache.lucene.search.Weight; +import org.apache.lucene.search.grouping.CollapsingTopDocsCollector; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.MinimumScoreCollector; import org.elasticsearch.common.lucene.search.FilteredCollector; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.search.collapse.CollapseContext; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.SearchPhase; import org.elasticsearch.search.SearchService; @@ -173,7 +175,7 @@ public class QueryPhase implements SearchPhase { // Perhaps have a dedicated scroll phase? 
final ScrollContext scrollContext = searchContext.scrollContext(); assert (scrollContext != null) == (searchContext.request().scroll() != null); - final TopDocsCollector topDocsCollector; + final Collector topDocsCollector; ScoreDoc after = null; if (searchContext.request().scroll() != null) { numDocs = Math.min(searchContext.size(), totalNumDocs); @@ -206,17 +208,31 @@ public class QueryPhase implements SearchPhase { numDocs = 1; } assert numDocs > 0; - if (searchContext.sort() != null) { - SortAndFormats sf = searchContext.sort(); - topDocsCollector = TopFieldCollector.create(sf.sort, numDocs, + if (searchContext.collapse() == null) { + if (searchContext.sort() != null) { + SortAndFormats sf = searchContext.sort(); + topDocsCollector = TopFieldCollector.create(sf.sort, numDocs, (FieldDoc) after, true, searchContext.trackScores(), searchContext.trackScores()); - sortValueFormats = sf.formats; - } else { - rescore = !searchContext.rescore().isEmpty(); - for (RescoreSearchContext rescoreContext : searchContext.rescore()) { - numDocs = Math.max(rescoreContext.window(), numDocs); + sortValueFormats = sf.formats; + } else { + rescore = !searchContext.rescore().isEmpty(); + for (RescoreSearchContext rescoreContext : searchContext.rescore()) { + numDocs = Math.max(rescoreContext.window(), numDocs); + } + topDocsCollector = TopScoreDocCollector.create(numDocs, after); + } + } else { + Sort sort = Sort.RELEVANCE; + if (searchContext.sort() != null) { + sort = searchContext.sort().sort; + } + CollapseContext collapse = searchContext.collapse(); + topDocsCollector = collapse.createTopDocs(sort, numDocs, searchContext.trackScores()); + if (searchContext.sort() == null) { + sortValueFormats = new DocValueFormat[] {DocValueFormat.RAW}; + } else { + sortValueFormats = searchContext.sort().formats; } - topDocsCollector = TopScoreDocCollector.create(numDocs, after); } collector = topDocsCollector; if (doProfile) { @@ -225,7 +241,14 @@ public class QueryPhase implements SearchPhase 
{ topDocsCallable = new Callable() { @Override public TopDocs call() throws Exception { - TopDocs topDocs = topDocsCollector.topDocs(); + final TopDocs topDocs; + if (topDocsCollector instanceof TopDocsCollector) { + topDocs = ((TopDocsCollector) topDocsCollector).topDocs(); + } else if (topDocsCollector instanceof CollapsingTopDocsCollector) { + topDocs = ((CollapsingTopDocsCollector) topDocsCollector).getTopDocs(); + } else { + throw new IllegalStateException("Unknown top docs collector " + topDocsCollector.getClass().getName()); + } if (scrollContext != null) { if (scrollContext.totalHits == -1) { // first round diff --git a/core/src/test/java/org/apache/lucene/grouping/CollapsingTopDocsCollectorTests.java b/core/src/test/java/org/apache/lucene/grouping/CollapsingTopDocsCollectorTests.java new file mode 100644 index 00000000000..926510ef120 --- /dev/null +++ b/core/src/test/java/org/apache/lucene/grouping/CollapsingTopDocsCollectorTests.java @@ -0,0 +1,426 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.lucene.grouping; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.SortedDocValuesField; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.document.SortedSetDocValuesField; +import org.apache.lucene.index.CompositeReaderContext; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexReaderContext; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.FieldDoc; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.SortedNumericSortField; +import org.apache.lucene.search.SortedSetSortField; +import org.apache.lucene.search.TopFieldCollector; +import org.apache.lucene.search.TopFieldDocs; +import org.apache.lucene.search.Weight; +import org.apache.lucene.search.grouping.CollapseTopFieldDocs; +import org.apache.lucene.search.grouping.CollapsingTopDocsCollector; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.NumericUtils; +import org.apache.lucene.util.TestUtil; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class CollapsingTopDocsCollectorTests extends ESTestCase { + private static class SegmentSearcher extends IndexSearcher { + private final List ctx; + + public SegmentSearcher(LeafReaderContext ctx, IndexReaderContext parent) { + super(parent); + this.ctx = Collections.singletonList(ctx); + } + + public void search(Weight weight, Collector collector) throws IOException { + 
search(ctx, weight, collector); + } + + @Override + public String toString() { + return "ShardSearcher(" + ctx.get(0) + ")"; + } + } + + interface CollapsingDocValuesProducer { + T randomGroup(int maxGroup); + + void add(Document doc, T value, boolean multivalued); + + SortField sortField(boolean multivalued); + } + + void assertSearchCollapse(CollapsingDocValuesProducer dvProducers, boolean numeric) throws IOException { + assertSearchCollapse(dvProducers, numeric, true); + assertSearchCollapse(dvProducers, numeric, false); + } + + private void assertSearchCollapse(CollapsingDocValuesProducer dvProducers, + boolean numeric, boolean multivalued) throws IOException { + final int numDocs = randomIntBetween(1000, 2000); + int maxGroup = randomIntBetween(2, 500); + final Directory dir = newDirectory(); + final RandomIndexWriter w = new RandomIndexWriter(random(), dir); + Set values = new HashSet<>(); + int totalHits = 0; + for (int i = 0; i < numDocs; i++) { + final T value = dvProducers.randomGroup(maxGroup); + values.add(value); + Document doc = new Document(); + dvProducers.add(doc, value, multivalued); + doc.add(new NumericDocValuesField("sort1", randomIntBetween(0, 10))); + doc.add(new NumericDocValuesField("sort2", randomLong())); + w.addDocument(doc); + totalHits++; + } + List valueList = new ArrayList<>(values); + Collections.sort(valueList); + final IndexReader reader = w.getReader(); + final IndexSearcher searcher = newSearcher(reader); + final SortField collapseField = dvProducers.sortField(multivalued); + final SortField sort1 = new SortField("sort1", SortField.Type.INT); + final SortField sort2 = new SortField("sort2", SortField.Type.LONG); + Sort sort = new Sort(sort1, sort2, collapseField); + + int expectedNumGroups = values.size(); + + final CollapsingTopDocsCollector collapsingCollector; + if (numeric) { + collapsingCollector = + CollapsingTopDocsCollector.createNumeric(collapseField.getField(), sort, expectedNumGroups, false); + } else { + 
collapsingCollector = + CollapsingTopDocsCollector.createKeyword(collapseField.getField(), sort, expectedNumGroups, false); + } + + TopFieldCollector topFieldCollector = + TopFieldCollector.create(sort, totalHits, true, false, false); + + searcher.search(new MatchAllDocsQuery(), collapsingCollector); + searcher.search(new MatchAllDocsQuery(), topFieldCollector); + CollapseTopFieldDocs collapseTopFieldDocs = collapsingCollector.getTopDocs(); + TopFieldDocs topDocs = topFieldCollector.topDocs(); + assertEquals(collapseField.getField(), collapseTopFieldDocs.field); + assertEquals(expectedNumGroups, collapseTopFieldDocs.scoreDocs.length); + assertEquals(totalHits, collapseTopFieldDocs.totalHits); + assertEquals(totalHits, topDocs.scoreDocs.length); + assertEquals(totalHits, topDocs.totalHits); + + Set seen = new HashSet<>(); + // collapse field is the last sort + int collapseIndex = sort.getSort().length - 1; + int topDocsIndex = 0; + for (int i = 0; i < expectedNumGroups; i++) { + FieldDoc fieldDoc = null; + for (; topDocsIndex < totalHits; topDocsIndex++) { + fieldDoc = (FieldDoc) topDocs.scoreDocs[topDocsIndex]; + if (seen.contains(fieldDoc.fields[collapseIndex]) == false) { + break; + } + } + FieldDoc collapseFieldDoc = (FieldDoc) collapseTopFieldDocs.scoreDocs[i]; + assertNotNull(fieldDoc); + assertEquals(collapseFieldDoc.doc, fieldDoc.doc); + assertArrayEquals(collapseFieldDoc.fields, fieldDoc.fields); + seen.add(fieldDoc.fields[fieldDoc.fields.length - 1]); + } + for (; topDocsIndex < totalHits; topDocsIndex++) { + FieldDoc fieldDoc = (FieldDoc) topDocs.scoreDocs[topDocsIndex]; + assertTrue(seen.contains(fieldDoc.fields[collapseIndex])); + } + + + // check merge + final IndexReaderContext ctx = searcher.getTopReaderContext(); + final SegmentSearcher[] subSearchers; + final int[] docStarts; + + if (ctx instanceof LeafReaderContext) { + subSearchers = new SegmentSearcher[1]; + docStarts = new int[1]; + subSearchers[0] = new SegmentSearcher((LeafReaderContext) ctx, 
ctx); + docStarts[0] = 0; + } else { + final CompositeReaderContext compCTX = (CompositeReaderContext) ctx; + final int size = compCTX.leaves().size(); + subSearchers = new SegmentSearcher[size]; + docStarts = new int[size]; + int docBase = 0; + for (int searcherIDX = 0; searcherIDX < subSearchers.length; searcherIDX++) { + final LeafReaderContext leave = compCTX.leaves().get(searcherIDX); + subSearchers[searcherIDX] = new SegmentSearcher(leave, compCTX); + docStarts[searcherIDX] = docBase; + docBase += leave.reader().maxDoc(); + } + } + + final CollapseTopFieldDocs[] shardHits = new CollapseTopFieldDocs[subSearchers.length]; + final Weight weight = searcher.createNormalizedWeight(new MatchAllDocsQuery(), false); + for (int shardIDX = 0; shardIDX < subSearchers.length; shardIDX++) { + final SegmentSearcher subSearcher = subSearchers[shardIDX]; + final CollapsingTopDocsCollector c; + if (numeric) { + c = CollapsingTopDocsCollector.createNumeric(collapseField.getField(), sort, expectedNumGroups, false); + } else { + c = CollapsingTopDocsCollector.createKeyword(collapseField.getField(), sort, expectedNumGroups, false); + } + subSearcher.search(weight, c); + shardHits[shardIDX] = c.getTopDocs(); + } + CollapseTopFieldDocs mergedFieldDocs = CollapseTopFieldDocs.merge(sort, 0, expectedNumGroups, shardHits); + assertTopDocsEquals(mergedFieldDocs, collapseTopFieldDocs); + w.close(); + reader.close(); + dir.close(); + } + + private static void assertTopDocsEquals(CollapseTopFieldDocs topDocs1, CollapseTopFieldDocs topDocs2) { + TestUtil.assertEquals(topDocs1, topDocs2); + assertArrayEquals(topDocs1.collapseValues, topDocs2.collapseValues); + } + + public void testCollapseLong() throws Exception { + CollapsingDocValuesProducer producer = new CollapsingDocValuesProducer() { + @Override + public Long randomGroup(int maxGroup) { + return randomNonNegativeLong() % maxGroup; + } + + @Override + public void add(Document doc, Long value, boolean multivalued) { + if (multivalued) { 
+ doc.add(new SortedNumericDocValuesField("field", value)); + } else { + doc.add(new NumericDocValuesField("field", value)); + } + } + + @Override + public SortField sortField(boolean multivalued) { + if (multivalued) { + return new SortedNumericSortField("field", SortField.Type.LONG); + } else { + return new SortField("field", SortField.Type.LONG); + } + } + }; + assertSearchCollapse(producer, true); + } + + public void testCollapseInt() throws Exception { + CollapsingDocValuesProducer producer = new CollapsingDocValuesProducer() { + @Override + public Integer randomGroup(int maxGroup) { + return randomIntBetween(0, maxGroup - 1); + } + + @Override + public void add(Document doc, Integer value, boolean multivalued) { + if (multivalued) { + doc.add(new SortedNumericDocValuesField("field", value)); + } else { + doc.add(new NumericDocValuesField("field", value)); + } + } + + @Override + public SortField sortField(boolean multivalued) { + if (multivalued) { + return new SortedNumericSortField("field", SortField.Type.INT); + } else { + return new SortField("field", SortField.Type.INT); + } + } + }; + assertSearchCollapse(producer, true); + } + + public void testCollapseFloat() throws Exception { + CollapsingDocValuesProducer producer = new CollapsingDocValuesProducer() { + @Override + public Float randomGroup(int maxGroup) { + return new Float(randomIntBetween(0, maxGroup - 1)); + } + + @Override + public void add(Document doc, Float value, boolean multivalued) { + if (multivalued) { + doc.add(new SortedNumericDocValuesField("field", NumericUtils.floatToSortableInt(value))); + } else { + doc.add(new NumericDocValuesField("field", Float.floatToIntBits(value))); + } + } + + @Override + public SortField sortField(boolean multivalued) { + if (multivalued) { + return new SortedNumericSortField("field", SortField.Type.FLOAT); + } else { + return new SortField("field", SortField.Type.FLOAT); + } + } + }; + assertSearchCollapse(producer, true); + } + + public void 
testCollapseDouble() throws Exception { + CollapsingDocValuesProducer producer = new CollapsingDocValuesProducer() { + @Override + public Double randomGroup(int maxGroup) { + return new Double(randomIntBetween(0, maxGroup - 1)); + } + + @Override + public void add(Document doc, Double value, boolean multivalued) { + if (multivalued) { + doc.add(new SortedNumericDocValuesField("field", NumericUtils.doubleToSortableLong(value))); + } else { + doc.add(new NumericDocValuesField("field", Double.doubleToLongBits(value))); + } + } + + @Override + public SortField sortField(boolean multivalued) { + if (multivalued) { + return new SortedNumericSortField("field", SortField.Type.DOUBLE); + } else { + return new SortField("field", SortField.Type.DOUBLE); + } + } + }; + assertSearchCollapse(producer, true); + } + + public void testCollapseString() throws Exception { + CollapsingDocValuesProducer producer = new CollapsingDocValuesProducer() { + @Override + public BytesRef randomGroup(int maxGroup) { + return new BytesRef(Integer.toString(randomIntBetween(0, maxGroup - 1))); + } + + @Override + public void add(Document doc, BytesRef value, boolean multivalued) { + if (multivalued) { + doc.add(new SortedSetDocValuesField("field", value)); + } else { + doc.add(new SortedDocValuesField("field", value)); + } + } + + @Override + public SortField sortField(boolean multivalued) { + if (multivalued) { + return new SortedSetSortField("field", false); + } else { + return new SortField("field", SortField.Type.STRING_VAL); + } + } + }; + assertSearchCollapse(producer, false); + } + + public void testEmptyNumericSegment() throws Exception { + final Directory dir = newDirectory(); + final RandomIndexWriter w = new RandomIndexWriter(random(), dir); + Document doc = new Document(); + doc.add(new NumericDocValuesField("group", 0)); + w.addDocument(doc); + doc.clear(); + doc.add(new NumericDocValuesField("group", 1)); + w.addDocument(doc); + w.commit(); + doc.clear(); + doc.add(new 
NumericDocValuesField("group", 10)); + w.addDocument(doc); + w.commit(); + doc.clear(); + doc.add(new NumericDocValuesField("category", 0)); + w.addDocument(doc); + w.commit(); + final IndexReader reader = w.getReader(); + final IndexSearcher searcher = newSearcher(reader); + SortField sortField = new SortField("group", SortField.Type.LONG); + sortField.setMissingValue(Long.MAX_VALUE); + Sort sort = new Sort(sortField); + final CollapsingTopDocsCollector collapsingCollector = + CollapsingTopDocsCollector.createNumeric("group", sort, 10, false); + searcher.search(new MatchAllDocsQuery(), collapsingCollector); + CollapseTopFieldDocs collapseTopFieldDocs = collapsingCollector.getTopDocs(); + assertEquals(4, collapseTopFieldDocs.scoreDocs.length); + assertEquals(4, collapseTopFieldDocs.collapseValues.length); + assertEquals(0L, collapseTopFieldDocs.collapseValues[0]); + assertEquals(1L, collapseTopFieldDocs.collapseValues[1]); + assertEquals(10L, collapseTopFieldDocs.collapseValues[2]); + assertNull(collapseTopFieldDocs.collapseValues[3]); + w.close(); + reader.close(); + dir.close(); + } + + public void testEmptySortedSegment() throws Exception { + final Directory dir = newDirectory(); + final RandomIndexWriter w = new RandomIndexWriter(random(), dir); + Document doc = new Document(); + doc.add(new SortedDocValuesField("group", new BytesRef("0"))); + w.addDocument(doc); + doc.clear(); + doc.add(new SortedDocValuesField("group", new BytesRef("1"))); + w.addDocument(doc); + w.commit(); + doc.clear(); + doc.add(new SortedDocValuesField("group", new BytesRef("10"))); + w.addDocument(doc); + w.commit(); + doc.clear(); + doc.add(new NumericDocValuesField("category", 0)); + w.addDocument(doc); + w.commit(); + final IndexReader reader = w.getReader(); + final IndexSearcher searcher = newSearcher(reader); + Sort sort = new Sort(new SortField("group", SortField.Type.STRING_VAL)); + final CollapsingTopDocsCollector collapsingCollector = + 
CollapsingTopDocsCollector.createKeyword("group", sort, 10, false); + searcher.search(new MatchAllDocsQuery(), collapsingCollector); + CollapseTopFieldDocs collapseTopFieldDocs = collapsingCollector.getTopDocs(); + assertEquals(4, collapseTopFieldDocs.scoreDocs.length); + assertEquals(4, collapseTopFieldDocs.collapseValues.length); + assertNull(collapseTopFieldDocs.collapseValues[0]); + assertEquals(new BytesRef("0"), collapseTopFieldDocs.collapseValues[1]); + assertEquals(new BytesRef("1"), collapseTopFieldDocs.collapseValues[2]); + assertEquals(new BytesRef("10"), collapseTopFieldDocs.collapseValues[3]); + w.close(); + reader.close(); + dir.close(); + } +} diff --git a/core/src/test/java/org/elasticsearch/search/AbstractSearchTestCase.java b/core/src/test/java/org/elasticsearch/search/AbstractSearchTestCase.java index 0163c98692e..016f4242e90 100644 --- a/core/src/test/java/org/elasticsearch/search/AbstractSearchTestCase.java +++ b/core/src/test/java/org/elasticsearch/search/AbstractSearchTestCase.java @@ -33,6 +33,7 @@ import org.elasticsearch.indices.IndicesModule; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.SearchPlugin; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.collapse.CollapseBuilderTests; import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilderTests; import org.elasticsearch.search.rescore.QueryRescoreBuilderTests; import org.elasticsearch.search.suggest.SuggestBuilderTests; @@ -90,7 +91,8 @@ public abstract class AbstractSearchTestCase extends ESTestCase { HighlightBuilderTests::randomHighlighterBuilder, SuggestBuilderTests::randomSuggestBuilder, QueryRescoreBuilderTests::randomRescoreBuilder, - randomExtBuilders); + randomExtBuilders, + CollapseBuilderTests::randomCollapseBuilder); } protected SearchRequest createSearchRequest() throws IOException { diff --git a/core/src/test/java/org/elasticsearch/search/collapse/CollapseBuilderTests.java 
b/core/src/test/java/org/elasticsearch/search/collapse/CollapseBuilderTests.java new file mode 100644 index 00000000000..955b85deb82 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/search/collapse/CollapseBuilderTests.java @@ -0,0 +1,179 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.search.collapse; + +import org.apache.lucene.analysis.MockAnalyzer; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexOptions; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.search.Query; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.RAMDirectory; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.index.mapper.KeywordFieldMapper; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.index.query.InnerHitBuilder; +import org.elasticsearch.index.query.InnerHitBuilderTests; +import org.elasticsearch.index.query.QueryShardContext; +import org.elasticsearch.search.SearchContextException; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.IOException; + +import static java.util.Collections.emptyList; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class CollapseBuilderTests extends AbstractWireSerializingTestCase { + private static NamedWriteableRegistry namedWriteableRegistry; + private static NamedXContentRegistry xContentRegistry; + + @BeforeClass + public static void init() { + SearchModule searchModule = new SearchModule(Settings.EMPTY, false, emptyList()); + namedWriteableRegistry = new NamedWriteableRegistry(searchModule.getNamedWriteables()); + xContentRegistry = new NamedXContentRegistry(searchModule.getNamedXContents()); + } + + @AfterClass + public static void 
afterClass() throws Exception { + namedWriteableRegistry = null; + xContentRegistry = null; + } + + public static CollapseBuilder randomCollapseBuilder() { + CollapseBuilder builder = new CollapseBuilder(randomAsciiOfLength(10)); + if (randomBoolean()) { + InnerHitBuilder innerHit = InnerHitBuilderTests.randomInnerHits(false, false); + builder.setInnerHits(innerHit); + } + return builder; + } + + @Override + protected Writeable createTestInstance() { + return randomCollapseBuilder(); + } + + @Override + protected Writeable.Reader instanceReader() { + return CollapseBuilder::new; + } + + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + return namedWriteableRegistry; + } + + @Override + protected NamedXContentRegistry xContentRegistry() { + return xContentRegistry; + } + + private SearchContext mockSearchContext() { + SearchContext context = mock(SearchContext.class); + QueryShardContext shardContext = mock(QueryShardContext.class); + when(context.getQueryShardContext()).thenReturn(shardContext); + when(context.scrollContext()).thenReturn(null); + when(context.rescore()).thenReturn(null); + when(context.searchAfter()).thenReturn(null); + return context; + } + + public void testBuild() throws IOException { + Directory dir = new RAMDirectory(); + try (IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())))) { + writer.commit(); + } + SearchContext searchContext = mockSearchContext(); + try (IndexReader reader = DirectoryReader.open(dir)) { + when(searchContext.getQueryShardContext().getIndexReader()).thenReturn(reader); + MappedFieldType numberFieldType = + new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG); + MappedFieldType keywordFieldType = + new KeywordFieldMapper.KeywordFieldType(); + for (MappedFieldType fieldType : new MappedFieldType[] {numberFieldType, keywordFieldType}) { + fieldType.setName("field"); + fieldType.setHasDocValues(true); + 
when(searchContext.getQueryShardContext().fieldMapper("field")).thenReturn(fieldType); + CollapseBuilder builder = new CollapseBuilder("field"); + CollapseContext collapseContext = builder.build(searchContext); + assertEquals(collapseContext.getFieldType(), fieldType); + + fieldType.setIndexOptions(IndexOptions.NONE); + collapseContext = builder.build(searchContext); + assertEquals(collapseContext.getFieldType(), fieldType); + + fieldType.setHasDocValues(false); + SearchContextException exc = expectThrows(SearchContextException.class, () -> builder.build(searchContext)); + assertEquals(exc.getMessage(), "cannot collapse on field `field` without `doc_values`"); + + fieldType.setHasDocValues(true); + builder.setInnerHits(new InnerHitBuilder()); + exc = expectThrows(SearchContextException.class, () -> builder.build(searchContext)); + assertEquals(exc.getMessage(), + "cannot expand `inner_hits` for collapse field `field`, " + + "only indexed field can retrieve `inner_hits`"); + } + } + } + + public void testBuildWithSearchContextExceptions() throws IOException { + SearchContext context = mockSearchContext(); + { + CollapseBuilder builder = new CollapseBuilder("unknown_field"); + SearchContextException exc = expectThrows(SearchContextException.class, () -> builder.build(context)); + assertEquals(exc.getMessage(), "no mapping found for `unknown_field` in order to collapse on"); + } + + { + MappedFieldType fieldType = new MappedFieldType() { + @Override + public MappedFieldType clone() { + return null; + } + + @Override + public String typeName() { + return null; + } + + @Override + public Query termQuery(Object value, QueryShardContext context) { + return null; + } + }; + fieldType.setName("field"); + fieldType.setHasDocValues(true); + when(context.getQueryShardContext().fieldMapper("field")).thenReturn(fieldType); + CollapseBuilder builder = new CollapseBuilder("field"); + SearchContextException exc = expectThrows(SearchContextException.class, () -> 
builder.build(context)); + assertEquals(exc.getMessage(), "unknown type for collapse field `field`, only keywords and numbers are accepted"); + } + } +} diff --git a/docs/build.gradle b/docs/build.gradle index 4239cc62caa..98c79887b74 100644 --- a/docs/build.gradle +++ b/docs/build.gradle @@ -210,6 +210,16 @@ Closure setupTwitter = { String name, int count -> settings: number_of_shards: 1 number_of_replicas: 1 + mappings: + tweet: + properties: + user: + type: keyword + doc_values: true + date: + type: date + likes: + type: long - do: bulk: index: twitter diff --git a/docs/reference/search/request/collapse.asciidoc b/docs/reference/search/request/collapse.asciidoc new file mode 100644 index 00000000000..de5ba40ad20 --- /dev/null +++ b/docs/reference/search/request/collapse.asciidoc @@ -0,0 +1,72 @@ +[[search-request-collapse]] +== Collapse + +Allows collapsing search results based on field values. +The collapsing is done by selecting only the top sorted document per collapse key. +For instance the query below retrieves the best tweet for each user and sorts them by number of likes. + +[source,js] +-------------------------------------------------- +GET /twitter/tweet/_search +{ + "query": { + "match": { + "message": "elasticsearch" + } + }, + "collapse" : { + "field" : "user" <1> + }, + "sort": ["likes"], <2> + "from": 10 <3> +} +-------------------------------------------------- +// CONSOLE +// TEST[setup:twitter] +<1> collapse the result set using the "user" field +<2> sort the top docs by number of likes +<3> define the offset of the first collapsed result + +WARNING: The total number of hits in the response indicates the number of matching documents without collapsing. +The total number of distinct groups is unknown. + +The field used for collapsing must be a single valued <<keyword, `keyword`>> or <<number, `numeric`>> field with <<doc-values, `doc_values`>> activated. + +NOTE: The collapsing is applied to the top hits only and does not affect aggregations.
+ + +=== Expand collapse results + +It is also possible to expand each collapsed top hit with the `inner_hits` option. + +[source,js] +-------------------------------------------------- +GET /twitter/tweet/_search +{ + "query": { + "match": { + "message": "elasticsearch" + } + }, + "collapse" : { + "field" : "user", <1> + "inner_hits": { + "name": "last_tweets", <2> + "size": 5, <3> + "sort": [{ "date": "asc" }] <4> + } + }, + "sort": ["likes"] +} +-------------------------------------------------- +// CONSOLE +// TEST[setup:twitter] +<1> collapse the result set using the "user" field +<2> the name used for the inner hit section in the response +<3> the number of inner_hits to retrieve per collapse key +<4> how to sort the documents inside each group + +See <<search-request-inner-hits, inner hits>> for the complete list of supported options and the format of the response. + +WARNING: `collapse` cannot be used in conjunction with <<search-request-scroll, scroll>>, +<<search-request-rescore, rescore>> or <<search-request-search-after, search after>>. diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexParallelizationHelperTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexParallelizationHelperTests.java index f1effa70ca8..02cd400d31f 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexParallelizationHelperTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexParallelizationHelperTests.java @@ -37,7 +37,8 @@ public class ReindexParallelizationHelperTests extends ESTestCase { () -> null, () -> null, () -> null, - () -> emptyList())); + () -> emptyList(), + () -> null)); if (searchRequest.source() != null) { // Clear the slice builder if there is one set. We can't call sliceIntoSubRequests if it is.
searchRequest.source().slice(null); diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search/110_field_collapsing.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/search/110_field_collapsing.yaml new file mode 100644 index 00000000000..c99b6fc6d3f --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search/110_field_collapsing.yaml @@ -0,0 +1,203 @@ +setup: + - do: + indices.create: + index: test + - do: + index: + index: test + type: test + id: 1 + body: { numeric_group: 1, sort: 10 } + - do: + index: + index: test + type: test + id: 2 + body: { numeric_group: 1, sort: 6 } + - do: + index: + index: test + type: test + id: 3 + body: { numeric_group: 1, sort: 24 } + - do: + index: + index: test + type: test + id: 4 + body: { numeric_group: 25, sort: 10 } + - do: + index: + index: test + type: test + id: 5 + body: { numeric_group: 25, sort: 5 } + - do: + index: + index: test + type: test + id: 6 + body: { numeric_group: 3, sort: 36 } + - do: + indices.refresh: + index: test + +--- +"field collapsing": + + - skip: + version: " - 5.99.99" + reason: this uses a new API that has been added in 6.0 + + - do: + search: + index: test + type: test + body: + collapse: { field: numeric_group } + sort: [{ sort: desc }] + + - match: {hits.total: 6 } + - length: {hits.hits: 3 } + - match: {hits.hits.0._index: test } + - match: {hits.hits.0._type: test } + - match: {hits.hits.0.fields.numeric_group: [3] } + - match: {hits.hits.0.sort: [36] } + - match: {hits.hits.0._id: "6" } + - is_false: hits.hits.0.inner_hits + - match: {hits.hits.1._index: test } + - match: {hits.hits.1._type: test } + - match: {hits.hits.1.fields.numeric_group: [1] } + - match: {hits.hits.1.sort: [24] } + - match: {hits.hits.1._id: "3" } + - is_false: hits.hits.1.inner_hits + - match: {hits.hits.2._index: test } + - match: {hits.hits.2._type: test } + - match: {hits.hits.2.fields.numeric_group: [25] } + - match: {hits.hits.2.sort: [10] } + - match: {hits.hits.2._id: 
"4" } + - is_false: hits.hits.2.inner_hits + +--- +"field collapsing and from": + + - skip: + version: " - 5.99.99" + reason: this uses a new API that has been added in 6.0 + + - do: + search: + index: test + type: test + body: + from: 2 + collapse: { field: numeric_group } + sort: [{ sort: desc }] + + - match: {hits.total: 6 } + - length: {hits.hits: 1 } + - match: {hits.hits.0._index: test } + - match: {hits.hits.0._type: test } + - match: {hits.hits.0.fields.numeric_group: [25]} + - match: {hits.hits.0.sort: [10] } + - match: {hits.hits.0._id: "4" } + - is_false: hits.hits.0.inner_hits + +--- +"field collapsing and inner_hits": + + - skip: + version: " - 5.99.99" + reason: this uses a new API that has been added in 6.0 + + - do: + search: + index: test + type: test + body: + collapse: { field: numeric_group, inner_hits: { name: sub_hits, size: 2, sort: [{ sort: asc }] } } + sort: [{ sort: desc }] + + - match: { hits.total: 6 } + - length: { hits.hits: 3 } + - match: { hits.hits.0._index: test } + - match: { hits.hits.0._type: test } + - match: { hits.hits.0.fields.numeric_group: [3] } + - match: { hits.hits.0.sort: [36] } + - match: { hits.hits.0._id: "6" } + - match: { hits.hits.0.inner_hits.sub_hits.hits.total: 1 } + - length: { hits.hits.0.inner_hits.sub_hits.hits.hits: 1 } + - match: { hits.hits.0.inner_hits.sub_hits.hits.hits.0._id: "6" } + - match: { hits.hits.1._index: test } + - match: { hits.hits.1._type: test } + - match: { hits.hits.1.fields.numeric_group: [1] } + - match: { hits.hits.1.sort: [24] } + - match: { hits.hits.1._id: "3" } + - match: { hits.hits.1.inner_hits.sub_hits.hits.total: 3 } + - length: { hits.hits.1.inner_hits.sub_hits.hits.hits: 2 } + - match: { hits.hits.1.inner_hits.sub_hits.hits.hits.0._id: "2" } + - match: { hits.hits.1.inner_hits.sub_hits.hits.hits.1._id: "1" } + - match: { hits.hits.2._index: test } + - match: { hits.hits.2._type: test } + - match: { hits.hits.2.fields.numeric_group: [25] } + - match: { hits.hits.2.sort: 
[10] } + - match: { hits.hits.2._id: "4" } + - match: { hits.hits.2.inner_hits.sub_hits.hits.total: 2 } + - length: { hits.hits.2.inner_hits.sub_hits.hits.hits: 2 } + - match: { hits.hits.2.inner_hits.sub_hits.hits.hits.0._id: "5" } + - match: { hits.hits.2.inner_hits.sub_hits.hits.hits.1._id: "4" } + +--- +"field collapsing and scroll": + + - skip: + version: " - 5.99.99" + reason: this uses a new API that has been added in 6.0 + + - do: + catch: /cannot use \`collapse\` in a scroll context/ + search: + index: test + type: test + scroll: 1s + body: + collapse: { field: numeric_group } + +--- +"field collapsing and search_after": + + - skip: + version: " - 5.99.99" + reason: this uses a new API that has been added in 6.0 + + - do: + catch: /cannot use \`collapse\` in conjunction with \`search_after\`/ + search: + index: test + type: test + body: + collapse: { field: numeric_group } + search_after: [6] + sort: [{ sort: desc }] + +--- +"field collapsing and rescore": + + - skip: + version: " - 5.99.99" + reason: this uses a new API that has been added in 6.0 + + - do: + catch: /cannot use \`collapse\` in conjunction with \`rescore\`/ + search: + index: test + type: test + body: + collapse: { field: numeric_group } + rescore: + window_size: 20 + query: + rescore_query: + match_all: {} + query_weight: 1 + rescore_query_weight: 2 diff --git a/test/framework/src/main/java/org/elasticsearch/search/RandomSearchRequestGenerator.java b/test/framework/src/main/java/org/elasticsearch/search/RandomSearchRequestGenerator.java index fc240e6b555..f49cd174b7f 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/RandomSearchRequestGenerator.java +++ b/test/framework/src/main/java/org/elasticsearch/search/RandomSearchRequestGenerator.java @@ -33,6 +33,7 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.script.Script; import org.elasticsearch.search.aggregations.AggregationBuilders; import 
org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.collapse.CollapseBuilder; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder; import org.elasticsearch.search.rescore.RescoreBuilder; @@ -74,7 +75,7 @@ public class RandomSearchRequestGenerator { * Build a random search request. * * @param randomSearchSourceBuilder builds a random {@link SearchSourceBuilder}. You can use - * {@link #randomSearchSourceBuilder(Supplier, Supplier, Supplier, Supplier)}. + * {@link #randomSearchSourceBuilder(Supplier, Supplier, Supplier, Supplier, Supplier)}. */ public static SearchRequest randomSearchRequest(Supplier randomSearchSourceBuilder) throws IOException { SearchRequest searchRequest = new SearchRequest(); @@ -112,7 +113,8 @@ public class RandomSearchRequestGenerator { Supplier randomHighlightBuilder, Supplier randomSuggestBuilder, Supplier> randomRescoreBuilder, - Supplier> randomExtBuilders) { + Supplier> randomExtBuilders, + Supplier randomCollapseBuilder) { SearchSourceBuilder builder = new SearchSourceBuilder(); if (randomBoolean()) { builder.from(randomIntBetween(0, 10000)); @@ -335,6 +337,9 @@ public class RandomSearchRequestGenerator { builder.slice(new SliceBuilder(field, id, max)); } } + if (randomBoolean()) { + builder.collapse(randomCollapseBuilder.get()); + } return builder; } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java b/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java index 611e27ed057..bec80036230 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java +++ b/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java @@ -40,6 +40,7 @@ import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.search.SearchExtBuilder; import org.elasticsearch.search.SearchShardTarget; import 
org.elasticsearch.search.aggregations.SearchContextAggregations; +import org.elasticsearch.search.collapse.CollapseContext; import org.elasticsearch.search.dfs.DfsSearchResult; import org.elasticsearch.search.fetch.FetchPhase; import org.elasticsearch.search.fetch.FetchSearchResult; @@ -363,6 +364,16 @@ public class TestSearchContext extends SearchContext { return null; } + @Override + public SearchContext collapse(CollapseContext collapse) { + return null; + } + + @Override + public CollapseContext collapse() { + return null; + } + @Override public SearchContext parsedPostFilter(ParsedQuery postFilter) { this.postFilter = postFilter;