SOLR-2066: Added support for distributed grouping.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1171970 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Martijn van Groningen 2011-09-17 12:48:27 +00:00
parent ff383452b6
commit 96c4bb7074
39 changed files with 3063 additions and 274 deletions

View File

@ -17,27 +17,13 @@ package org.apache.lucene.search;
* limitations under the License.
*/
import java.io.IOException;
import org.apache.lucene.index.IndexReader.AtomicReaderContext;
import org.apache.lucene.index.values.IndexDocValues;
import org.apache.lucene.index.values.IndexDocValues.Source;
import org.apache.lucene.search.FieldCache.DocTerms;
import org.apache.lucene.search.FieldCache.DocTermsIndex;
import org.apache.lucene.search.cache.ByteValuesCreator;
import org.apache.lucene.search.cache.CachedArray;
import org.apache.lucene.search.cache.CachedArrayCreator;
import org.apache.lucene.search.cache.DoubleValuesCreator;
import org.apache.lucene.search.cache.FloatValuesCreator;
import org.apache.lucene.search.cache.IntValuesCreator;
import org.apache.lucene.search.cache.LongValuesCreator;
import org.apache.lucene.search.cache.ShortValuesCreator;
import org.apache.lucene.search.cache.CachedArray.ByteValues;
import org.apache.lucene.search.cache.CachedArray.DoubleValues;
import org.apache.lucene.search.cache.CachedArray.FloatValues;
import org.apache.lucene.search.cache.CachedArray.IntValues;
import org.apache.lucene.search.cache.CachedArray.LongValues;
import org.apache.lucene.search.cache.CachedArray.ShortValues;
import org.apache.lucene.search.cache.*;
import org.apache.lucene.search.cache.CachedArray.*;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.packed.Direct16;
@ -45,6 +31,8 @@ import org.apache.lucene.util.packed.Direct32;
import org.apache.lucene.util.packed.Direct8;
import org.apache.lucene.util.packed.PackedInts;
import java.io.IOException;
/**
* Expert: a FieldComparator compares hits so as to determine their
* sort order when collecting the top results with {@link
@ -187,7 +175,17 @@ public abstract class FieldComparator<T> {
* if your values may sometimes be null */
@SuppressWarnings("unchecked")
public int compareValues(T first, T second) {
return ((Comparable<T>) first).compareTo(second);
if (first == null) {
if (second == null) {
return 0;
} else {
return -1;
}
} else if (second == null) {
return 1;
} else {
return ((Comparable<T>) first).compareTo(second);
}
}
public static abstract class NumericComparator<T extends CachedArray, U extends Number> extends FieldComparator<U> {

View File

@ -17,10 +17,10 @@ package org.apache.lucene.search;
* limitations under the License.
*/
import java.io.IOException;
import org.apache.lucene.util.PriorityQueue;
import java.io.IOException;
/** Represents hits returned by {@link
* IndexSearcher#search(Query,Filter,int)} and {@link
* IndexSearcher#search(Query,int)}. */
@ -232,11 +232,8 @@ public class TopDocs {
assert queue.size() > 0;
ShardRef ref = queue.pop();
final ScoreDoc hit = shardHits[ref.shardIndex].scoreDocs[ref.hitIndex++];
if (sort == null) {
hits[hitUpto] = new ScoreDoc(hit.doc, hit.score, ref.shardIndex);
} else {
hits[hitUpto] = new FieldDoc(hit.doc, hit.score, ((FieldDoc) hit).fields, ref.shardIndex);
}
hit.shardIndex = ref.shardIndex;
hits[hitUpto] = hit;
//System.out.println(" hitUpto=" + hitUpto);
//System.out.println(" doc=" + hits[hitUpto].doc + " score=" + hits[hitUpto].score);

View File

@ -17,13 +17,12 @@ package org.apache.lucene.search.grouping;
* limitations under the License.
*/
import java.io.IOException;
import java.util.*;
import org.apache.lucene.search.FieldComparator;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.util.BytesRef;
import java.io.IOException;
import java.util.*;
/**
* Represents a group that is found during the first pass search.
@ -47,6 +46,29 @@ public class SearchGroup<GROUP_VALUE_TYPE> {
return("SearchGroup(groupValue=" + groupValue + " sortValues=" + Arrays.toString(sortValues) + ")");
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SearchGroup that = (SearchGroup) o;
if (groupValue == null) {
if (that.groupValue != null) {
return false;
}
} else if (!groupValue.equals(that.groupValue)) {
return false;
}
return true;
}
@Override
public int hashCode() {
return groupValue != null ? groupValue.hashCode() : 0;
}
private static class ShardIter<T> {
public final Iterator<SearchGroup<T>> iter;
public final int shardIndex;

View File

@ -17,13 +17,13 @@ package org.apache.lucene.search.grouping;
* limitations under the License.
*/
import java.io.IOException;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopDocs;
import java.io.IOException;
/** Represents result returned by a grouping search.
*
* @lucene.experimental */
@ -70,8 +70,13 @@ public class TopGroups<GROUP_VALUE_TYPE> {
* same groupSort and docSort, and the top groups passed
* to all second-pass collectors must be the same.
*
* <b>NOTE</b>: this cannot merge totalGroupCount; ie the
* returned TopGroups will have null totalGroupCount.
* <b>NOTE</b>: We can't always compute an exact totalGroupCount.
* Documents belonging to a group may occur on more than
* one shard and thus the merged totalGroupCount can be
* higher than the actual totalGroupCount. In this case the
* totalGroupCount represents a upper bound. If the documents
* of one group do only reside in one shard then the
* totalGroupCount is exact.
*
* <b>NOTE</b>: the topDocs in each GroupDocs is actually
* an instance of TopDocsAndShards
@ -87,6 +92,8 @@ public class TopGroups<GROUP_VALUE_TYPE> {
int totalHitCount = 0;
int totalGroupedHitCount = 0;
// Optionally merge the totalGroupCount.
Integer totalGroupCount = null;
final int numGroups = shardGroups[0].groups.length;
for(TopGroups<T> shard : shardGroups) {
@ -95,6 +102,13 @@ public class TopGroups<GROUP_VALUE_TYPE> {
}
totalHitCount += shard.totalHitCount;
totalGroupedHitCount += shard.totalGroupedHitCount;
if (shard.totalGroupCount != null) {
if (totalGroupCount == null) {
totalGroupCount = 0;
}
totalGroupCount += shard.totalGroupCount;
}
}
@SuppressWarnings("unchecked")
@ -156,10 +170,19 @@ public class TopGroups<GROUP_VALUE_TYPE> {
shardGroups[0].groups[groupIDX].groupSortValues);
}
return new TopGroups<T>(groupSort.getSort(),
docSort == null ? null : docSort.getSort(),
totalHitCount,
totalGroupedHitCount,
mergedGroupDocs);
if (totalGroupCount != null) {
TopGroups<T> result = new TopGroups<T>(groupSort.getSort(),
docSort == null ? null : docSort.getSort(),
totalHitCount,
totalGroupedHitCount,
mergedGroupDocs);
return new TopGroups<T>(result, totalGroupCount);
} else {
return new TopGroups<T>(groupSort.getSort(),
docSort == null ? null : docSort.getSort(),
totalHitCount,
totalGroupedHitCount,
mergedGroupDocs);
}
}
}

View File

@ -336,6 +336,9 @@ New Features
can be specified with a name in solrconfig.xml, and use hl.boundaryScanner=name
parameter to specify the named <boundaryScanner/>. (koji)
* SOLR-2066: Added support for distributed grouping.
(Martijn van Groningen, Jasper van Veghel, Matt Beaumont)
Bug Fixes
----------------------
* SOLR-2748: The CommitTracker used for commitWith or autoCommit by maxTime

View File

@ -17,10 +17,6 @@
package org.apache.solr.handler.component;
import java.io.IOException;
import java.net.URL;
import java.util.*;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexReader.AtomicReaderContext;
@ -28,6 +24,9 @@ import org.apache.lucene.index.IndexReader.ReaderContext;
import org.apache.lucene.index.Term;
import org.apache.lucene.queryparser.classic.ParseException;
import org.apache.lucene.search.*;
import org.apache.lucene.search.grouping.GroupDocs;
import org.apache.lucene.search.grouping.SearchGroup;
import org.apache.lucene.search.grouping.TopGroups;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRef;
import org.apache.lucene.util.ReaderUtil;
@ -50,8 +49,31 @@ import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.FieldType;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.*;
import org.apache.solr.search.grouping.CommandHandler;
import org.apache.solr.search.grouping.GroupingSpecification;
import org.apache.solr.search.grouping.distributed.shardresultserializer.TopGroupsResultTransformer;
import org.apache.solr.search.grouping.endresulttransformer.EndResultTransformer;
import org.apache.solr.search.grouping.distributed.ShardRequestFactory;
import org.apache.solr.search.grouping.distributed.ShardResponseProcessor;
import org.apache.solr.search.grouping.distributed.command.QueryCommand;
import org.apache.solr.search.grouping.distributed.command.SearchGroupsFieldCommand;
import org.apache.solr.search.grouping.distributed.command.TopGroupsFieldCommand;
import org.apache.solr.search.grouping.distributed.requestfactory.SearchGroupsRequestFactory;
import org.apache.solr.search.grouping.distributed.requestfactory.StoredFieldsShardRequestFactory;
import org.apache.solr.search.grouping.distributed.requestfactory.TopGroupsShardRequestFactory;
import org.apache.solr.search.grouping.distributed.responseprocessor.SearchGroupShardResponseProcessor;
import org.apache.solr.search.grouping.distributed.responseprocessor.StoredFieldsShardResponseProcessor;
import org.apache.solr.search.grouping.distributed.responseprocessor.TopGroupsShardResponseProcessor;
import org.apache.solr.search.grouping.distributed.shardresultserializer.SearchGroupsResultTransformer;
import org.apache.solr.search.grouping.endresulttransformer.GroupedEndResultTransformer;
import org.apache.solr.search.grouping.endresulttransformer.MainEndResultTransformer;
import org.apache.solr.search.grouping.endresulttransformer.SimpleEndResultTransformer;
import org.apache.solr.util.SolrPluginUtils;
import java.io.IOException;
import java.net.URL;
import java.util.*;
/**
* TODO!
*
@ -127,6 +149,47 @@ public class QueryComponent extends SearchComponent
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
}
boolean grouping = params.getBool(GroupParams.GROUP, false);
if (!grouping) {
return;
}
SolrIndexSearcher.QueryCommand cmd = rb.getQueryCommand();
SolrIndexSearcher searcher = rb.req.getSearcher();
GroupingSpecification groupingSpec = new GroupingSpecification();
rb.setGroupingSpec(groupingSpec);
//TODO: move weighting of sort
Sort groupSort = searcher.weightSort(cmd.getSort());
// groupSort defaults to sort
String groupSortStr = params.get(GroupParams.GROUP_SORT);
if (groupSort == null) {
groupSort = new Sort();
}
//TODO: move weighting of sort
Sort sortWithinGroup = groupSortStr == null ? groupSort : searcher.weightSort(QueryParsing.parseSort(groupSortStr, req));
groupingSpec.setSortWithinGroup(sortWithinGroup);
groupingSpec.setGroupSort(groupSort);
String formatStr = params.get(GroupParams.GROUP_FORMAT, Grouping.Format.grouped.name());
Grouping.Format responseFormat;
try {
responseFormat = Grouping.Format.valueOf(formatStr);
} catch (IllegalArgumentException e) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, String.format("Illegal %s parameter", GroupParams.GROUP_FORMAT));
}
groupingSpec.setResponseFormat(responseFormat);
groupingSpec.setFields(params.getParams(GroupParams.GROUP_FIELD));
groupingSpec.setQueries(params.getParams(GroupParams.GROUP_QUERY));
groupingSpec.setFunctions(params.getParams(GroupParams.GROUP_FUNC));
groupingSpec.setGroupOffset(params.getInt(GroupParams.GROUP_OFFSET, 0));
groupingSpec.setGroupLimit(params.getInt(GroupParams.GROUP_LIMIT, 1));
groupingSpec.setOffset(rb.getSortSpec().getOffset());
groupingSpec.setLimit(rb.getSortSpec().getCount());
groupingSpec.setIncludeGroupCount(params.getBool(GroupParams.GROUP_TOTAL_COUNT, false));
groupingSpec.setMain(params.getBool(GroupParams.GROUP_MAIN, false));
groupingSpec.setNeedScore((cmd.getFlags() & SolrIndexSearcher.GET_SCORES) != 0);
}
@ -311,65 +374,119 @@ public class QueryComponent extends SearchComponent
//
// grouping / field collapsing
//
boolean doGroup = params.getBool(GroupParams.GROUP, false);
if (doGroup) {
GroupingSpecification groupingSpec = rb.getGroupingSpec();
if (groupingSpec != null) {
try {
int maxDocsPercentageToCache = params.getInt(GroupParams.GROUP_CACHE_PERCENTAGE, 0);
boolean cacheSecondPassSearch = maxDocsPercentageToCache >= 1 && maxDocsPercentageToCache <= 100;
String[] fields = params.getParams(GroupParams.GROUP_FIELD);
String[] funcs = params.getParams(GroupParams.GROUP_FUNC);
String[] queries = params.getParams(GroupParams.GROUP_QUERY);
String groupSortStr = params.get(GroupParams.GROUP_SORT);
boolean main = params.getBool(GroupParams.GROUP_MAIN, false);
boolean truncateGroups = params.getBool(GroupParams.GROUP_TRUNCATE, false);
boolean needScores = (cmd.getFlags() & SolrIndexSearcher.GET_SCORES) != 0;
if (params.getBool("group.distibuted.first", false)) {
CommandHandler.Builder topsGroupsActionBuilder = new CommandHandler.Builder()
.setQueryCommand(cmd)
.setNeedDocSet(false) // Order matters here
.setSearcher(searcher);
String formatStr = params.get(GroupParams.GROUP_FORMAT, Grouping.Format.grouped.name());
Grouping.Format defaultFormat;
try {
defaultFormat = Grouping.Format.valueOf(formatStr);
} catch (IllegalArgumentException e) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, String.format("Illegal %s parameter", GroupParams.GROUP_FORMAT));
for (String field : groupingSpec.getFields()) {
topsGroupsActionBuilder.addCommandField(new SearchGroupsFieldCommand.Builder()
.setField(searcher.getSchema().getField(field))
.setGroupSort(groupingSpec.getGroupSort())
.setTopNGroups(cmd.getOffset() + cmd.getLen())
.build()
);
}
CommandHandler commandHandler = topsGroupsActionBuilder.build();
commandHandler.execute();
SearchGroupsResultTransformer serializer = new SearchGroupsResultTransformer(searcher);
rsp.add("firstPhase", commandHandler.processResult(result, serializer));
rb.setResult(result);
return;
} else if (params.getBool("group.distibuted.second", false)) {
CommandHandler.Builder secondPhaseBuilder = new CommandHandler.Builder()
.setQueryCommand(cmd)
.setSearcher(searcher);
for (String field : groupingSpec.getFields()) {
String[] topGroupsParam = params.getParams("group.topgroups." + field);
if (topGroupsParam == null) {
continue;
}
List<SearchGroup<BytesRef>> topGroups = new ArrayList<SearchGroup<BytesRef>>(topGroupsParam.length);
for (String topGroup : topGroupsParam) {
SearchGroup<BytesRef> searchGroup = new SearchGroup<BytesRef>();
if (!topGroup.equals(TopGroupsShardRequestFactory.GROUP_NULL_VALUE)) {
searchGroup.groupValue = new BytesRef(searcher.getSchema().getField(field).getType().readableToIndexed(topGroup));
}
topGroups.add(searchGroup);
}
secondPhaseBuilder.addCommandField(
new TopGroupsFieldCommand.Builder()
.setField(searcher.getSchema().getField(field))
.setGroupSort(groupingSpec.getGroupSort())
.setSortWithinGroup(groupingSpec.getSortWithinGroup())
.setFirstPhaseGroups(topGroups)
.setMaxDocPerGroup(groupingSpec.getGroupOffset() + groupingSpec.getGroupLimit())
.setNeedScores(needScores)
.setNeedMaxScore(needScores)
.setNeedGroupCount(groupingSpec.isIncludeGroupCount())
.build()
);
}
for (String query : groupingSpec.getQueries()) {
secondPhaseBuilder.addCommandField(new QueryCommand.Builder()
.setDocsToCollect(groupingSpec.getOffset() + groupingSpec.getLimit())
.setSort(groupingSpec.getGroupSort())
.setQuery(query, rb.req)
.setDocSet(searcher)
.build()
);
}
CommandHandler commandHandler = secondPhaseBuilder.build();
commandHandler.execute();
TopGroupsResultTransformer serializer = new TopGroupsResultTransformer(rb);
rsp.add("secondPhase", commandHandler.processResult(result, serializer));
rb.setResult(result);
return;
}
boolean includeTotalGroupCount = params.getBool(GroupParams.GROUP_TOTAL_COUNT, false);
Grouping.TotalCount defaultTotalCount = includeTotalGroupCount ? Grouping.TotalCount.grouped : Grouping.TotalCount.ungrouped;
Sort sort = searcher.weightSort(cmd.getSort());
// groupSort defaults to sort
Sort groupSort = groupSortStr == null ? sort : searcher.weightSort(QueryParsing.parseSort(groupSortStr, req));
int maxDocsPercentageToCache = params.getInt(GroupParams.GROUP_CACHE_PERCENTAGE, 0);
boolean cacheSecondPassSearch = maxDocsPercentageToCache >= 1 && maxDocsPercentageToCache <= 100;
boolean truncateGroups = params.getBool(GroupParams.GROUP_TRUNCATE, false);
Grouping.TotalCount defaultTotalCount = groupingSpec.isIncludeGroupCount() ?
Grouping.TotalCount.grouped : Grouping.TotalCount.ungrouped;
int limitDefault = cmd.getLen(); // this is normally from "rows"
int groupOffsetDefault = params.getInt(GroupParams.GROUP_OFFSET, 0);
int docsPerGroupDefault = params.getInt(GroupParams.GROUP_LIMIT, 1);
Grouping grouping = new Grouping(searcher, result, cmd, cacheSecondPassSearch, maxDocsPercentageToCache, main);
grouping.setSort(sort)
.setGroupSort(groupSort)
.setDefaultFormat(defaultFormat)
Grouping grouping =
new Grouping(searcher, result, cmd, cacheSecondPassSearch, maxDocsPercentageToCache, groupingSpec.isMain());
grouping.setSort(groupingSpec.getGroupSort())
.setGroupSort(groupingSpec.getSortWithinGroup())
.setDefaultFormat(groupingSpec.getResponseFormat())
.setLimitDefault(limitDefault)
.setDefaultTotalCount(defaultTotalCount)
.setDocsPerGroupDefault(docsPerGroupDefault)
.setGroupOffsetDefault(groupOffsetDefault)
.setDocsPerGroupDefault(groupingSpec.getGroupLimit())
.setGroupOffsetDefault(groupingSpec.getGroupOffset())
.setGetGroupedDocSet(truncateGroups);
if (fields != null) {
for (String field : fields) {
if (groupingSpec.getFields() != null) {
for (String field : groupingSpec.getFields()) {
grouping.addFieldCommand(field, rb.req);
}
}
if (funcs != null) {
for (String groupByStr : funcs) {
if (groupingSpec.getFunctions() != null) {
for (String groupByStr : groupingSpec.getFunctions()) {
grouping.addFunctionCommand(groupByStr, rb.req);
}
}
if (queries != null) {
for (String groupByStr : queries) {
if (groupingSpec.getQueries() != null) {
for (String groupByStr : groupingSpec.getQueries()) {
grouping.addQueryCommand(groupByStr, rb.req);
}
}
if (rb.doHighlights || rb.isDebug()) {
if (rb.doHighlights || rb.isDebug() || params.getBool(MoreLikeThisParams.MLT, false)) {
// we need a single list of the returned docs
cmd.setFlags(SolrIndexSearcher.GET_DOCLIST);
}
@ -382,7 +499,6 @@ public class QueryComponent extends SearchComponent
);
}
rb.setResult(result);
rsp.add("grouped", result.groupedResults);
if (grouping.mainResult != null) {
ResultContext ctx = new ResultContext();
@ -391,6 +507,7 @@ public class QueryComponent extends SearchComponent
rsp.add("response", ctx);
rsp.getToLog().add("hits", grouping.mainResult.matches());
} else if (!grouping.getCommands().isEmpty()) { // Can never be empty since grouping.execute() checks for this.
rsp.add("grouped", result.groupedResults);
rsp.getToLog().add("hits", grouping.getCommands().get(0).getMatches());
}
return;
@ -426,7 +543,7 @@ public class QueryComponent extends SearchComponent
if(fsv){
Sort sort = searcher.weightSort(rb.getSortSpec().getSort());
SortField[] sortFields = sort==null ? new SortField[]{SortField.FIELD_SCORE} : sort.getSort();
NamedList sortVals = new NamedList(); // order is important for the sort fields
NamedList<List> sortVals = new NamedList<List>(); // order is important for the sort fields
Field field = new StringField("dummy", ""); // a dummy Field
ReaderContext topReaderContext = searcher.getTopReaderContext();
AtomicReaderContext[] leaves = ReaderUtil.leaves(topReaderContext);
@ -448,7 +565,7 @@ public class QueryComponent extends SearchComponent
FieldType ft = fieldname==null ? null : req.getSchema().getFieldTypeNoEx(fieldname);
DocList docList = rb.getResults().docList;
ArrayList<Object> vals = new ArrayList<Object>(docList.size());
List<Object> vals = new ArrayList<Object>(docList.size());
DocIterator it = rb.getResults().docList.iterator();
int idx = 0;
@ -513,6 +630,48 @@ public class QueryComponent extends SearchComponent
@Override
public int distributedProcess(ResponseBuilder rb) throws IOException {
if (rb.grouping()) {
return groupedDistributedProcess(rb);
} else {
return regularDistributedProcess(rb);
}
}
private int groupedDistributedProcess(ResponseBuilder rb) {
int nextStage = ResponseBuilder.STAGE_DONE;
ShardRequestFactory shardRequestFactory = null;
if (rb.stage < ResponseBuilder.STAGE_PARSE_QUERY) {
nextStage = ResponseBuilder.STAGE_PARSE_QUERY;
} else if (rb.stage == ResponseBuilder.STAGE_PARSE_QUERY) {
createDistributedIdf(rb);
nextStage = ResponseBuilder.STAGE_TOP_GROUPS;
} else if (rb.stage < ResponseBuilder.STAGE_TOP_GROUPS) {
nextStage = ResponseBuilder.STAGE_TOP_GROUPS;
} else if (rb.stage == ResponseBuilder.STAGE_TOP_GROUPS) {
shardRequestFactory = new SearchGroupsRequestFactory();
nextStage = ResponseBuilder.STAGE_EXECUTE_QUERY;
} else if (rb.stage < ResponseBuilder.STAGE_EXECUTE_QUERY) {
nextStage = ResponseBuilder.STAGE_EXECUTE_QUERY;
} else if (rb.stage == ResponseBuilder.STAGE_EXECUTE_QUERY) {
shardRequestFactory = new TopGroupsShardRequestFactory();
nextStage = ResponseBuilder.STAGE_GET_FIELDS;
} else if (rb.stage < ResponseBuilder.STAGE_GET_FIELDS) {
nextStage = ResponseBuilder.STAGE_GET_FIELDS;
} else if (rb.stage == ResponseBuilder.STAGE_GET_FIELDS) {
shardRequestFactory = new StoredFieldsShardRequestFactory();
nextStage = ResponseBuilder.STAGE_DONE;
}
if (shardRequestFactory != null) {
for (ShardRequest shardRequest : shardRequestFactory.constructRequest(rb)) {
rb.addRequest(this, shardRequest);
}
}
return nextStage;
}
private int regularDistributedProcess(ResponseBuilder rb) {
if (rb.stage < ResponseBuilder.STAGE_PARSE_QUERY)
return ResponseBuilder.STAGE_PARSE_QUERY;
if (rb.stage == ResponseBuilder.STAGE_PARSE_QUERY) {
@ -532,35 +691,102 @@ public class QueryComponent extends SearchComponent
return ResponseBuilder.STAGE_DONE;
}
@Override
public void handleResponses(ResponseBuilder rb, ShardRequest sreq) {
if (rb.grouping()) {
handleGroupedResponses(rb, sreq);
} else {
handleRegularResponses(rb, sreq);
}
}
private void handleGroupedResponses(ResponseBuilder rb, ShardRequest sreq) {
ShardResponseProcessor responseProcessor = null;
if ((sreq.purpose & ShardRequest.PURPOSE_GET_TOP_GROUPS) != 0) {
responseProcessor = new SearchGroupShardResponseProcessor();
} else if ((sreq.purpose & ShardRequest.PURPOSE_GET_TOP_IDS) != 0) {
responseProcessor = new TopGroupsShardResponseProcessor();
} else if ((sreq.purpose & ShardRequest.PURPOSE_GET_FIELDS) != 0) {
responseProcessor = new StoredFieldsShardResponseProcessor();
}
if (responseProcessor != null) {
responseProcessor.process(rb, sreq);
}
}
private void handleRegularResponses(ResponseBuilder rb, ShardRequest sreq) {
if ((sreq.purpose & ShardRequest.PURPOSE_GET_TOP_IDS) != 0) {
mergeIds(rb, sreq);
}
if ((sreq.purpose & ShardRequest.PURPOSE_GET_FIELDS) != 0) {
returnFields(rb, sreq);
return;
}
}
@Override
public void finishStage(ResponseBuilder rb) {
if (rb.stage == ResponseBuilder.STAGE_GET_FIELDS) {
// We may not have been able to retrieve all the docs due to an
// index change. Remove any null documents.
for (Iterator<SolrDocument> iter = rb._responseDocs.iterator(); iter.hasNext();) {
if (iter.next() == null) {
iter.remove();
rb._responseDocs.setNumFound(rb._responseDocs.getNumFound()-1);
}
}
rb.rsp.add("response", rb._responseDocs);
if (rb.stage != ResponseBuilder.STAGE_GET_FIELDS) {
return;
}
if (rb.grouping()) {
groupedFinishStage(rb);
} else {
regularFinishStage(rb);
}
}
private static final EndResultTransformer MAIN_END_RESULT_TRANSFORMER = new MainEndResultTransformer();
private static final EndResultTransformer SIMPLE_END_RESULT_TRANSFORMER = new SimpleEndResultTransformer();
@SuppressWarnings("unchecked")
private void groupedFinishStage(final ResponseBuilder rb) {
// To have same response as non-distributed request.
GroupingSpecification groupSpec = rb.getGroupingSpec();
if (rb.mergedTopGroups.isEmpty()) {
for (String field : groupSpec.getFields()) {
rb.mergedTopGroups.put(field, new TopGroups(null, null, 0, 0, new GroupDocs[]{}));
}
rb.resultIds = new HashMap<Object, ShardDoc>();
}
EndResultTransformer.SolrDocumentSource solrDocumentSource = new EndResultTransformer.SolrDocumentSource() {
public SolrDocument retrieve(ScoreDoc doc) {
ShardDoc solrDoc = (ShardDoc) doc;
return rb.retrievedDocuments.get(solrDoc.id);
}
};
EndResultTransformer endResultTransformer;
if (groupSpec.isMain()) {
endResultTransformer = MAIN_END_RESULT_TRANSFORMER;
} else if (Grouping.Format.grouped == groupSpec.getResponseFormat()) {
endResultTransformer = new GroupedEndResultTransformer(rb.req.getSearcher());
} else if (Grouping.Format.simple == groupSpec.getResponseFormat() && !groupSpec.isMain()) {
endResultTransformer = SIMPLE_END_RESULT_TRANSFORMER;
} else {
return;
}
Map<String, Object> combinedMap = new LinkedHashMap<String, Object>();
combinedMap.putAll(rb.mergedTopGroups);
combinedMap.putAll(rb.mergedQueryCommandResults);
endResultTransformer.transform(combinedMap, rb.rsp, rb.getGroupingSpec(), solrDocumentSource);
}
private void regularFinishStage(ResponseBuilder rb) {
// We may not have been able to retrieve all the docs due to an
// index change. Remove any null documents.
for (Iterator<SolrDocument> iter = rb._responseDocs.iterator(); iter.hasNext();) {
if (iter.next() == null) {
iter.remove();
rb._responseDocs.setNumFound(rb._responseDocs.getNumFound()-1);
}
}
rb.rsp.add("response", rb._responseDocs);
}
private void createDistributedIdf(ResponseBuilder rb) {
// TODO
@ -696,7 +922,7 @@ public class QueryComponent extends SearchComponent
Map<Object,ShardDoc> resultIds = new HashMap<Object,ShardDoc>();
for (int i=resultSize-1; i>=0; i--) {
ShardDoc shardDoc = (ShardDoc)queue.pop();
ShardDoc shardDoc = queue.pop();
shardDoc.positionInResponse = i;
// Need the toString() for correlation with other lists that must
// be strings (like keys in highlighting, explain, etc)
@ -802,7 +1028,7 @@ public class QueryComponent extends SearchComponent
}
rb._responseDocs.set(sdoc.positionInResponse, doc);
}
}
}
}
}

View File

@ -18,6 +18,11 @@
package org.apache.solr.handler.component;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.grouping.SearchGroup;
import org.apache.lucene.search.grouping.TopGroups;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.RTimer;
@ -27,9 +32,13 @@ import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.search.DocListAndSet;
import org.apache.solr.search.QParser;
import org.apache.solr.search.SortSpec;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.search.SortSpec;
import org.apache.solr.search.grouping.GroupingSpecification;
import org.apache.solr.search.grouping.distributed.command.QueryCommandResult;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -59,6 +68,7 @@ public class ResponseBuilder
private Query query = null;
private List<Query> filters = null;
private SortSpec sortSpec = null;
private GroupingSpecification groupingSpec;
private DocListAndSet results = null;
private NamedList<Object> debugInfo = null;
@ -100,6 +110,7 @@ public class ResponseBuilder
public static int STAGE_START = 0;
public static int STAGE_PARSE_QUERY = 1000;
public static int STAGE_TOP_GROUPS = 1500;
public static int STAGE_EXECUTE_QUERY = 2000;
public static int STAGE_GET_FIELDS = 3000;
public static int STAGE_DONE = Integer.MAX_VALUE;
@ -137,7 +148,7 @@ public class ResponseBuilder
public GlobalCollectionStat globalCollectionStat;
Map<Object, ShardDoc> resultIds;
public Map<Object, ShardDoc> resultIds;
// Maps uniqueKeyValue to ShardDoc, which may be used to
// determine order of the doc or uniqueKey in the final
// returned sequence.
@ -151,6 +162,13 @@ public class ResponseBuilder
TermsComponent.TermsHelper _termsHelper;
SimpleOrderedMap<List<NamedList<Object>>> _pivots;
// Context fields for grouping
public final Map<String, Collection<SearchGroup<BytesRef>>> mergedSearchGroups = new HashMap<String, Collection<SearchGroup<BytesRef>>>();
public final Map<String, Map<SearchGroup<BytesRef>, String>> searchGroupToShard = new HashMap<String, Map<SearchGroup<BytesRef>, String>>();
public final Map<String, TopGroups<BytesRef>> mergedTopGroups = new HashMap<String, TopGroups<BytesRef>>();
public final Map<String, QueryCommandResult> mergedQueryCommandResults = new HashMap<String, QueryCommandResult>();
public final Map<Object, SolrDocument> retrievedDocuments = new HashMap<Object, SolrDocument>();
/**
* Utility function to add debugging info. This will make sure a valid
* debugInfo exists before adding to it.
@ -315,6 +333,18 @@ public class ResponseBuilder
this.sortSpec = sort;
}
public GroupingSpecification getGroupingSpec() {
return groupingSpec;
}
public void setGroupingSpec(GroupingSpecification groupingSpec) {
this.groupingSpec = groupingSpec;
}
public boolean grouping() {
return groupingSpec != null;
}
public RTimer getTimer() {
return timer;
}

View File

@ -16,19 +16,20 @@
*/
package org.apache.solr.handler.component;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.FieldComparatorSource;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.SortField;
import org.apache.lucene.util.PriorityQueue;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.search.MissingStringLastComparatorSource;
import java.text.Collator;
import java.util.Comparator;
import java.util.Locale;
import java.util.List;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
public class ShardDoc {
public class ShardDoc extends FieldDoc {
public String shard;
public String shardAddress; // TODO
@ -37,7 +38,7 @@ public class ShardDoc {
// to short-circuit comparisons if the shard is equal, and can
// also be used to break ties within the same shard.
Object id;
public Object id;
// this is currently the uniqueKeyField but
// may be replaced with internal docid in a future release.
@ -53,9 +54,36 @@ public class ShardDoc {
// but we shouldn't expose uniqueKey (have a map by it) until the stored-field
// retrieval stage.
int positionInResponse;
public int positionInResponse;
// the ordinal position in the merged response arraylist
public ShardDoc(float score, Object[] fields, Object uniqueId, String shard) {
super(-1, score, fields);
this.id = uniqueId;
this.shard = shard;
}
public ShardDoc() {
super(-1, Float.NaN);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShardDoc shardDoc = (ShardDoc) o;
if (id != null ? !id.equals(shardDoc.id) : shardDoc.id != null) return false;
return true;
}
@Override
public int hashCode() {
return id != null ? id.hashCode() : 0;
}
@Override
public String toString(){
return "id="+id
@ -70,7 +98,7 @@ public class ShardDoc {
// used by distributed search to merge results.
class ShardFieldSortedHitQueue extends PriorityQueue {
class ShardFieldSortedHitQueue extends PriorityQueue<ShardDoc> {
/** Stores a comparator corresponding to each field being sorted by */
protected Comparator[] comparators;
@ -111,10 +139,7 @@ class ShardFieldSortedHitQueue extends PriorityQueue {
}
@Override
protected boolean lessThan(Object objA, Object objB) {
ShardDoc docA = (ShardDoc)objA;
ShardDoc docB = (ShardDoc)objB;
protected boolean lessThan(ShardDoc docA, ShardDoc docB) {
// If these docs are from the same shard, then the relative order
// is how they appeared in the response from that shard.
if (docA.shard == docB.shard) {

View File

@ -16,11 +16,11 @@
*/
package org.apache.solr.handler.component;
import org.apache.solr.common.params.ModifiableSolrParams;
import java.util.ArrayList;
import java.util.List;
import org.apache.solr.common.params.ModifiableSolrParams;
// todo... when finalized make accessors
public class ShardRequest {
@ -37,6 +37,7 @@ public class ShardRequest {
public final static int PURPOSE_GET_DEBUG =0x100;
public final static int PURPOSE_GET_STATS =0x200;
public final static int PURPOSE_GET_TERMS =0x400;
public final static int PURPOSE_GET_TOP_GROUPS =0x800;
public int purpose; // the purpose of this request

View File

@ -1,4 +1,6 @@
/**
package org.apache.solr.search;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -15,12 +17,10 @@
* limitations under the License.
*/
package org.apache.solr.search;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.util.OpenBitSet;
import org.apache.lucene.index.IndexReader.AtomicReaderContext;
import java.io.IOException;
@ -28,7 +28,7 @@ import java.io.IOException;
*
*/
class DocSetCollector extends Collector {
public class DocSetCollector extends Collector {
int pos=0;
OpenBitSet bits;
final int maxDoc;
@ -40,11 +40,12 @@ class DocSetCollector extends Collector {
// in case there are only a few.
final int[] scratch;
DocSetCollector(int smallSetSize, int maxDoc) {
public DocSetCollector(int smallSetSize, int maxDoc) {
this.smallSetSize = smallSetSize;
this.maxDoc = maxDoc;
this.scratch = new int[smallSetSize];
}
@Override
public void collect(int doc) throws IOException {
doc += base;
@ -67,65 +68,6 @@ class DocSetCollector extends Collector {
pos++;
}
public DocSet getDocSet() {
if (pos<=scratch.length) {
// assumes docs were collected in sorted order!
return new SortedIntDocSet(scratch, pos);
} else {
// set the bits for ids that were collected in the array
for (int i=0; i<scratch.length; i++) bits.fastSet(scratch[i]);
return new BitDocSet(bits,pos);
}
}
@Override
public void setScorer(Scorer scorer) throws IOException {
}
@Override
public void setNextReader(AtomicReaderContext context) throws IOException {
this.base = context.docBase;
}
@Override
public boolean acceptsDocsOutOfOrder() {
return false;
}
}
class DocSetDelegateCollector extends DocSetCollector {
final Collector collector;
DocSetDelegateCollector(int smallSetSize, int maxDoc, Collector collector) {
super(smallSetSize, maxDoc);
this.collector = collector;
}
@Override
public void collect(int doc) throws IOException {
collector.collect(doc);
doc += base;
// optimistically collect the first docs in an array
// in case the total number will be small enough to represent
// as a small set like SortedIntDocSet instead...
// Storing in this array will be quicker to convert
// than scanning through a potentially huge bit vector.
// FUTURE: when search methods all start returning docs in order, maybe
// we could have a ListDocSet() and use the collected array directly.
if (pos < scratch.length) {
scratch[pos]=doc;
} else {
// this conditional could be removed if BitSet was preallocated, but that
// would take up more memory, and add more GC time...
if (bits==null) bits = new OpenBitSet(maxDoc);
bits.fastSet(doc);
}
pos++;
}
@Override
public DocSet getDocSet() {
if (pos<=scratch.length) {
// assumes docs were collected in sorted order!
@ -139,12 +81,15 @@ class DocSetDelegateCollector extends DocSetCollector {
@Override
public void setScorer(Scorer scorer) throws IOException {
collector.setScorer(scorer);
}
@Override
public void setNextReader(AtomicReaderContext context) throws IOException {
collector.setNextReader(context);
public void setNextReader(IndexReader.AtomicReaderContext context) throws IOException {
this.base = context.docBase;
}
@Override
public boolean acceptsDocsOutOfOrder() {
return false;
}
}

View File

@ -0,0 +1,67 @@
package org.apache.solr.search;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.util.OpenBitSet;
import java.io.IOException;
/**
*
*/
public class DocSetDelegateCollector extends DocSetCollector {
final Collector collector;
public DocSetDelegateCollector(int smallSetSize, int maxDoc, Collector collector) {
super(smallSetSize, maxDoc);
this.collector = collector;
}
@Override
public void collect(int doc) throws IOException {
collector.collect(doc);
doc += base;
// optimistically collect the first docs in an array
// in case the total number will be small enough to represent
// as a small set like SortedIntDocSet instead...
// Storing in this array will be quicker to convert
// than scanning through a potentially huge bit vector.
// FUTURE: when search methods all start returning docs in order, maybe
// we could have a ListDocSet() and use the collected array directly.
if (pos < scratch.length) {
scratch[pos]=doc;
} else {
// this conditional could be removed if BitSet was preallocated, but that
// would take up more memory, and add more GC time...
if (bits==null) bits = new OpenBitSet(maxDoc);
bits.fastSet(doc);
}
pos++;
}
@Override
public DocSet getDocSet() {
if (pos<=scratch.length) {
// assumes docs were collected in sorted order!
return new SortedIntDocSet(scratch, pos);
} else {
// set the bits for ids that were collected in the array
for (int i=0; i<scratch.length; i++) bits.fastSet(scratch[i]);
return new BitDocSet(bits,pos);
}
}
@Override
public void setScorer(Scorer scorer) throws IOException {
collector.setScorer(scorer);
}
@Override
public void setNextReader(IndexReader.AtomicReaderContext context) throws IOException {
collector.setNextReader(context);
this.base = context.docBase;
}
}

View File

@ -38,6 +38,7 @@ import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.schema.FieldType;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.schema.StrFieldSource;
import org.apache.solr.search.grouping.collector.FilterCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -821,7 +822,7 @@ public class Grouping {
* {@inheritDoc}
*/
protected void finish() throws IOException {
TopDocsCollector topDocsCollector = (TopDocsCollector) collector.collector;
TopDocsCollector topDocsCollector = (TopDocsCollector) collector.getDelegate();
TopDocs topDocs = topDocsCollector.topDocs();
GroupDocs<String> groupDocs = new GroupDocs<String>(topDocs.getMaxScore(), topDocs.totalHits, topDocs.scoreDocs, query.toString(), null);
if (main) {
@ -836,7 +837,7 @@ public class Grouping {
* {@inheritDoc}
*/
public int getMatches() {
return collector.matches;
return collector.getMatches();
}
}
@ -975,42 +976,6 @@ public class Grouping {
}
/**
* A collector that filters incoming doc ids that are not in the filter
*/
static class FilterCollector extends Collector {
final DocSet filter;
final Collector collector;
int docBase;
int matches;
public FilterCollector(DocSet filter, Collector collector) throws IOException {
this.filter = filter;
this.collector = collector;
}
public void setScorer(Scorer scorer) throws IOException {
collector.setScorer(scorer);
}
public void collect(int doc) throws IOException {
matches++;
if (filter.exists(doc + docBase)) {
collector.collect(doc);
}
}
public void setNextReader(AtomicReaderContext context) throws IOException {
this.docBase = context.docBase;
collector.setNextReader(context);
}
public boolean acceptsDocsOutOfOrder() {
return collector.acceptsDocsOutOfOrder();
}
}
static class FunctionFirstPassGroupingCollector extends AbstractFirstPassGroupingCollector<MutableValue> {
private final ValueSource groupByVS;

View File

@ -17,9 +17,11 @@
package org.apache.solr.search;
import org.apache.lucene.search.*;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexReader.AtomicReaderContext;
import org.apache.lucene.search.FieldCache;
import org.apache.lucene.search.FieldComparator;
import org.apache.lucene.search.FieldComparatorSource;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.UnicodeUtil;
import org.apache.lucene.util.packed.Direct16;
@ -56,7 +58,7 @@ public class MissingStringLastComparatorSource extends FieldComparatorSource {
// Copied from Lucene's TermOrdValComparator and modified since the Lucene version couldn't
// be extended.
class TermOrdValComparator_SML extends FieldComparator<BytesRef> {
class TermOrdValComparator_SML extends FieldComparator<Comparable> {
private static final int NULL_ORD = Integer.MAX_VALUE;
private final int[] ords;
@ -102,6 +104,21 @@ class TermOrdValComparator_SML extends FieldComparator<BytesRef> {
throw new UnsupportedOperationException();
}
@Override
public int compareValues(Comparable first, Comparable second) {
if (first == null) {
if (second == null) {
return 0;
} else {
return 1;
}
} else if (second == null) {
return -1;
} else {
return first.compareTo(second);
}
}
@Override
public FieldComparator setNextReader(AtomicReaderContext context) throws IOException {
return TermOrdValComparator_SML.createComparator(context.reader, this);

View File

@ -17,10 +17,10 @@
package org.apache.solr.search;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import java.util.List;
@ -30,7 +30,7 @@ import java.util.List;
public class QueryUtils {
/** return true if this query has no positive components */
static boolean isNegative(Query q) {
public static boolean isNegative(Query q) {
if (!(q instanceof BooleanQuery)) return false;
BooleanQuery bq = (BooleanQuery)q;
List<BooleanClause> clauses = bq.clauses();
@ -51,7 +51,7 @@ public class QueryUtils {
* @param q
* @return
*/
static Query getAbs(Query q) {
public static Query getAbs(Query q) {
if (q instanceof WrappedQuery) {
Query subQ = ((WrappedQuery)q).getWrappedQuery();
Query absSubQ = getAbs(subQ);
@ -95,7 +95,7 @@ public class QueryUtils {
/** Makes negative queries suitable for querying by
* lucene.
*/
static Query makeQueryable(Query q) {
public static Query makeQueryable(Query q) {
if (q instanceof WrappedQuery) {
return makeQueryable(((WrappedQuery)q).getWrappedQuery());
}
@ -105,7 +105,7 @@ public class QueryUtils {
/** Fixes a negative query by adding a MatchAllDocs query clause.
* The query passed in *must* be a negative query.
*/
static Query fixNegativeQuery(Query q) {
public static Query fixNegativeQuery(Query q) {
BooleanQuery newBq = (BooleanQuery)q.clone();
newBq.add(new MatchAllDocsQuery(), BooleanClause.Occur.MUST);
return newBq;

View File

@ -17,53 +17,13 @@
package org.apache.solr.search;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.FieldSelector;
import org.apache.lucene.document.FieldSelectorResult;
import org.apache.lucene.document.FieldSelectorVisitor;
import org.apache.lucene.index.DocsEnum;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.*;
import org.apache.lucene.index.IndexReader.AtomicReaderContext;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.MultiDocsEnum;
import org.apache.lucene.index.MultiFields;
import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Explanation;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TimeLimitingCollector;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopDocsCollector;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.TopScoreDocCollector;
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.*;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Bits;
@ -87,6 +47,11 @@ import org.apache.solr.update.SolrIndexConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URL;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
/**
* SolrIndexSearcher adds schema awareness and caching functionality
@ -625,10 +590,10 @@ public class SolrIndexSearcher extends IndexSearcher implements SolrInfoMBean {
private static Query matchAllDocsQuery = new MatchAllDocsQuery();
static class ProcessedFilter {
DocSet answer; // the answer, if non-null
Filter filter;
DelegatingCollector postFilter;
public static class ProcessedFilter {
public DocSet answer; // the answer, if non-null
public Filter filter;
public DelegatingCollector postFilter;
}
@ -1048,7 +1013,7 @@ public class SolrIndexSearcher extends IndexSearcher implements SolrInfoMBean {
}
static final int NO_CHECK_QCACHE = 0x80000000;
static final int GET_DOCSET = 0x40000000;
public static final int GET_DOCSET = 0x40000000;
static final int NO_CHECK_FILTERCACHE = 0x20000000;
static final int NO_SET_QCACHE = 0x10000000;

View File

@ -0,0 +1,42 @@
package org.apache.solr.search.grouping;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.Sort;
import org.apache.solr.schema.SchemaField;
import java.io.IOException;
import java.util.List;
/**
*
*/
public interface Command<T> {
List<Collector> create() throws IOException;
T result();
String getKey();
Sort getGroupSort();
Sort getSortWithinGroup();
}

View File

@ -0,0 +1,142 @@
package org.apache.solr.search.grouping;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.MultiCollector;
import org.apache.lucene.search.Query;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.search.*;
import org.apache.solr.search.grouping.distributed.shardresultserializer.ShardResultTransformer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
*
*/
public class CommandHandler {
public static class Builder {
private SolrIndexSearcher.QueryCommand queryCommand;
private List<Command> commands = new ArrayList<Command>();
private SolrIndexSearcher searcher;
private boolean needDocSet = false;
public Builder setQueryCommand(SolrIndexSearcher.QueryCommand queryCommand) {
this.queryCommand = queryCommand;
this.needDocSet = (queryCommand.getFlags() & SolrIndexSearcher.GET_DOCSET) != 0;
return this;
}
public Builder addCommandField(Command commandField) {
commands.add(commandField);
return this;
}
public Builder setSearcher(SolrIndexSearcher searcher) {
this.searcher = searcher;
return this;
}
/**
* Sets whether to compute a {@link DocSet}.
* May override the value set by {@link #setQueryCommand(org.apache.solr.search.SolrIndexSearcher.QueryCommand)}.
*
* @param needDocSet Whether to compute a {@link DocSet}
* @return this
*/
public Builder setNeedDocSet(boolean needDocSet) {
this.needDocSet = needDocSet;
return this;
}
public CommandHandler build() {
if (queryCommand == null || searcher == null) {
throw new IllegalStateException("All fields must be set");
}
return new CommandHandler(queryCommand, commands, searcher, needDocSet);
}
}
private final SolrIndexSearcher.QueryCommand queryCommand;
private final List<Command> commands;
private final SolrIndexSearcher searcher;
private final boolean needDocset;
private DocSet docSet;
private CommandHandler(SolrIndexSearcher.QueryCommand queryCommand,
List<Command> commands,
SolrIndexSearcher searcher,
boolean needDocset) {
this.queryCommand = queryCommand;
this.commands = commands;
this.searcher = searcher;
this.needDocset = needDocset;
}
@SuppressWarnings("unchecked")
public void execute() throws IOException {
final int nrOfCommands = commands.size();
List<Collector> collectors = new ArrayList<Collector>(nrOfCommands);
for (Command command : commands) {
collectors.addAll(command.create());
}
SolrIndexSearcher.ProcessedFilter pf = searcher.getProcessedFilter(
queryCommand.getFilter(), queryCommand.getFilterList()
);
Filter luceneFilter = pf.filter;
Query query = QueryUtils.makeQueryable(queryCommand.getQuery());
Collector wrappedCollectors;
if (collectors.isEmpty()) {
wrappedCollectors = null;
} else {
wrappedCollectors = MultiCollector.wrap(collectors.toArray(new Collector[nrOfCommands]));
}
if (wrappedCollectors == null && needDocset) {
int maxDoc = searcher.maxDoc();
DocSetCollector docSetCollector = new DocSetCollector(maxDoc >> 6, maxDoc);
searcher.search(query, luceneFilter, docSetCollector);
docSet = docSetCollector.getDocSet();
} else if (needDocset) {
int maxDoc = searcher.maxDoc();
DocSetCollector docSetCollector = new DocSetDelegateCollector(maxDoc >> 6, maxDoc, wrappedCollectors);
searcher.search(query, luceneFilter, docSetCollector);
docSet = docSetCollector.getDocSet();
} else {
searcher.search(query, luceneFilter, wrappedCollectors);
}
}
@SuppressWarnings("unchecked")
public NamedList processResult(SolrIndexSearcher.QueryResult queryResult, ShardResultTransformer transformer) throws IOException {
if (needDocset) {
queryResult.setDocSet(docSet);
}
return transformer.transform(commands);
}
}

View File

@ -0,0 +1,158 @@
package org.apache.solr.search.grouping;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.search.Sort;
import org.apache.solr.search.Grouping;
/**
* Encapsulates the grouping options like fields group sort and more specified by clients.
*/
public class GroupingSpecification {
private String[] fields = new String[]{};
private String[] queries = new String[]{};
private String[] functions = new String[]{};
private int offset;
private int limit;
private int groupOffset;
private int groupLimit;
private Sort groupSort;
private Sort sortWithinGroup;
private boolean includeGroupCount;
private boolean main;
private Grouping.Format responseFormat;
private boolean needScore;
public String[] getFields() {
return fields;
}
public void setFields(String[] fields) {
if (fields == null) {
return;
}
this.fields = fields;
}
public String[] getQueries() {
return queries;
}
public void setQueries(String[] queries) {
if (queries == null) {
return;
}
this.queries = queries;
}
public String[] getFunctions() {
return functions;
}
public void setFunctions(String[] functions) {
if (functions == null) {
return;
}
this.functions = functions;
}
public int getGroupOffset() {
return groupOffset;
}
public void setGroupOffset(int groupOffset) {
this.groupOffset = groupOffset;
}
public int getGroupLimit() {
return groupLimit;
}
public void setGroupLimit(int groupLimit) {
this.groupLimit = groupLimit;
}
public int getOffset() {
return offset;
}
public void setOffset(int offset) {
this.offset = offset;
}
public int getLimit() {
return limit;
}
public void setLimit(int limit) {
this.limit = limit;
}
public Sort getGroupSort() {
return groupSort;
}
public void setGroupSort(Sort groupSort) {
this.groupSort = groupSort;
}
public Sort getSortWithinGroup() {
return sortWithinGroup;
}
public void setSortWithinGroup(Sort sortWithinGroup) {
this.sortWithinGroup = sortWithinGroup;
}
public boolean isIncludeGroupCount() {
return includeGroupCount;
}
public void setIncludeGroupCount(boolean includeGroupCount) {
this.includeGroupCount = includeGroupCount;
}
public boolean isMain() {
return main;
}
public void setMain(boolean main) {
this.main = main;
}
public Grouping.Format getResponseFormat() {
return responseFormat;
}
public void setResponseFormat(Grouping.Format responseFormat) {
this.responseFormat = responseFormat;
}
public boolean isNeedScore() {
return needScore;
}
public void setNeedScore(boolean needScore) {
this.needScore = needScore;
}
}

View File

@ -0,0 +1,74 @@
package org.apache.solr.search.grouping.collector;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.Scorer;
import org.apache.solr.search.DocSet;
import java.io.IOException;
/**
* A collector that filters incoming doc ids that are not in the filter
*/
public class FilterCollector extends Collector {
private final DocSet filter;
private final Collector delegate;
private int docBase;
private int matches;
public FilterCollector(DocSet filter, Collector delegate) throws IOException {
this.filter = filter;
this.delegate = delegate;
}
public void setScorer(Scorer scorer) throws IOException {
delegate.setScorer(scorer);
}
public void collect(int doc) throws IOException {
matches++;
if (filter.exists(doc + docBase)) {
delegate.collect(doc);
}
}
public void setNextReader(IndexReader.AtomicReaderContext context) throws IOException {
this.docBase = context.docBase;
delegate.setNextReader(context);
}
public boolean acceptsDocsOutOfOrder() {
return delegate.acceptsDocsOutOfOrder();
}
public int getMatches() {
return matches;
}
/**
* Returns the delegate collector
*
* @return the delegate collector
*/
public Collector getDelegate() {
return delegate;
}
}

View File

@ -0,0 +1,37 @@
package org.apache.solr.search.grouping.distributed;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.solr.handler.component.ResponseBuilder;
import org.apache.solr.handler.component.ShardRequest;
/**
* Responsible for creating shard requests to the shards in the cluster to perform distributed grouping.
*/
public interface ShardRequestFactory {
/**
* Returns {@link ShardRequest} instances.
* Never returns <code>null</code>. If no {@link ShardRequest} instances are constructed an empty array is returned.
*
* @param rb The response builder
* @return {@link ShardRequest} instances
*/
ShardRequest[] constructRequest(ResponseBuilder rb);
}

View File

@ -0,0 +1,37 @@
package org.apache.solr.search.grouping.distributed;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.solr.handler.component.ResponseBuilder;
import org.apache.solr.handler.component.ShardRequest;
/**
* Responsible for processing shard responses.
*/
public interface ShardResponseProcessor {
/**
* Processes the responses from the specified shardRequest. The result is put into specific
* fields in the specified rb.
*
* @param rb The ResponseBuilder to put the merge result into
* @param shardRequest The shard request containing the responses from all shards.
*/
void process(ResponseBuilder rb, ShardRequest shardRequest);
}

View File

@ -0,0 +1,152 @@
package org.apache.solr.search.grouping.distributed.command;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.queryparser.classic.ParseException;
import org.apache.lucene.search.*;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.search.DocSet;
import org.apache.solr.search.QParser;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.search.grouping.Command;
import org.apache.solr.search.grouping.collector.FilterCollector;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
/**
*
*/
public class QueryCommand implements Command<QueryCommandResult> {
public static class Builder {
private Sort sort;
private String queryString;
private Query query;
private DocSet docSet;
private Integer docsToCollect;
private boolean needScores;
public Builder setSort(Sort sort) {
this.sort = sort;
return this;
}
public Builder setQuery(Query query) {
this.query = query;
return this;
}
/**
* Sets the group query from the specified groupQueryString.
* The groupQueryString is parsed into a query.
*
* @param groupQueryString The group query string to parse
* @param request The current request
* @return this
* @throws ParseException If parsing the groupQueryString failed
*/
public Builder setQuery(String groupQueryString, SolrQueryRequest request) throws ParseException {
QParser parser = QParser.getParser(groupQueryString, null, request);
this.queryString = groupQueryString;
return setQuery(parser.getQuery());
}
public Builder setDocSet(DocSet docSet) {
this.docSet = docSet;
return this;
}
/**
* Sets the docSet based on the created {@link DocSet}
*
* @param searcher The searcher executing the
* @return
* @throws IOException
*/
public Builder setDocSet(SolrIndexSearcher searcher) throws IOException {
return setDocSet(searcher.getDocSet(query));
}
public Builder setDocsToCollect(int docsToCollect) {
this.docsToCollect = docsToCollect;
return this;
}
public Builder setNeedScores(boolean needScores) {
this.needScores = needScores;
return this;
}
public QueryCommand build() {
if (sort == null || query == null || docSet == null || docsToCollect == null) {
throw new IllegalStateException("All fields must be set");
}
return new QueryCommand(sort, query, docsToCollect, needScores, docSet, queryString);
}
}
private final Sort sort;
private final Query query;
private final DocSet docSet;
private final int docsToCollect;
private final boolean needScores;
private final String queryString;
private TopDocsCollector collector;
private FilterCollector filterCollector;
private QueryCommand(Sort sort, Query query, int docsToCollect, boolean needScores, DocSet docSet, String queryString) {
this.sort = sort;
this.query = query;
this.docsToCollect = docsToCollect;
this.needScores = needScores;
this.docSet = docSet;
this.queryString = queryString;
}
public List<Collector> create() throws IOException {
if (sort == null || sort == Sort.RELEVANCE) {
collector = TopScoreDocCollector.create(docsToCollect, true);
} else {
collector = TopFieldCollector.create(sort, docsToCollect, true, needScores, needScores, true);
}
filterCollector = new FilterCollector(docSet, collector);
return Arrays.asList((Collector) filterCollector);
}
public QueryCommandResult result() {
return new QueryCommandResult(collector.topDocs(), filterCollector.getMatches());
}
public String getKey() {
return queryString != null ? queryString : query.toString();
}
public Sort getGroupSort() {
return sort;
}
public Sort getSortWithinGroup() {
return null;
}
}

View File

@ -0,0 +1,42 @@
package org.apache.solr.search.grouping.distributed.command;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.search.TopDocs;
/**
* Encapsulates {@link TopDocs} and the number of matches.
*/
public class QueryCommandResult {
private final TopDocs topDocs;
private final int matches;
public QueryCommandResult(TopDocs topDocs, int matches) {
this.topDocs = topDocs;
this.matches = matches;
}
public TopDocs getTopDocs() {
return topDocs;
}
public int getMatches() {
return matches;
}
}

View File

@ -0,0 +1,101 @@
package org.apache.solr.search.grouping.distributed.command;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.grouping.SearchGroup;
import org.apache.lucene.search.grouping.TermFirstPassGroupingCollector;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.grouping.Command;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
/**
*
*/
public class SearchGroupsFieldCommand implements Command<Collection<SearchGroup<BytesRef>>> {
public static class Builder {
private SchemaField field;
private Sort groupSort;
private Integer topNGroups;
public Builder setField(SchemaField field) {
this.field = field;
return this;
}
public Builder setGroupSort(Sort groupSort) {
this.groupSort = groupSort;
return this;
}
public Builder setTopNGroups(int topNGroups) {
this.topNGroups = topNGroups;
return this;
}
public SearchGroupsFieldCommand build() {
if (field == null || groupSort == null || topNGroups == null) {
throw new IllegalStateException("All fields must be set");
}
return new SearchGroupsFieldCommand(field, groupSort, topNGroups);
}
}
private final SchemaField field;
private final Sort groupSort;
private final int topNGroups;
private TermFirstPassGroupingCollector collector;
private SearchGroupsFieldCommand(SchemaField field, Sort groupSort, int topNGroups) {
this.field = field;
this.groupSort = groupSort;
this.topNGroups = topNGroups;
}
public List<Collector> create() throws IOException {
collector = new TermFirstPassGroupingCollector(field.getName(), groupSort, topNGroups);
return Arrays.asList((Collector) collector);
}
public Collection<SearchGroup<BytesRef>> result() {
return collector.getTopGroups(0, true);
}
public Sort getSortWithinGroup() {
return null;
}
public Sort getGroupSort() {
return groupSort;
}
public String getKey() {
return field.getName();
}
}

View File

@ -0,0 +1,165 @@
package org.apache.solr.search.grouping.distributed.command;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.grouping.SearchGroup;
import org.apache.lucene.search.grouping.TermAllGroupsCollector;
import org.apache.lucene.search.grouping.TermSecondPassGroupingCollector;
import org.apache.lucene.search.grouping.TopGroups;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.grouping.Command;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
*
*/
public class TopGroupsFieldCommand implements Command<TopGroups<BytesRef>> {
public static class Builder {
private SchemaField field;
private Sort groupSort;
private Sort sortWithinGroup;
private Collection<SearchGroup<BytesRef>> firstPhaseGroups;
private Integer maxDocPerGroup;
private boolean needScores = false;
private boolean needMaxScore = false;
private boolean needGroupCount = false;
public Builder setField(SchemaField field) {
this.field = field;
return this;
}
public Builder setGroupSort(Sort groupSort) {
this.groupSort = groupSort;
return this;
}
public Builder setSortWithinGroup(Sort sortWithinGroup) {
this.sortWithinGroup = sortWithinGroup;
return this;
}
public Builder setFirstPhaseGroups(Collection<SearchGroup<BytesRef>> firstPhaseGroups) {
this.firstPhaseGroups = firstPhaseGroups;
return this;
}
public Builder setMaxDocPerGroup(int maxDocPerGroup) {
this.maxDocPerGroup = maxDocPerGroup;
return this;
}
public Builder setNeedScores(Boolean needScores) {
this.needScores = needScores;
return this;
}
public Builder setNeedMaxScore(Boolean needMaxScore) {
this.needMaxScore = needMaxScore;
return this;
}
public Builder setNeedGroupCount(Boolean needGroupCount) {
this.needGroupCount = needGroupCount;
return this;
}
public TopGroupsFieldCommand build() {
if (field == null || groupSort == null || sortWithinGroup == null || firstPhaseGroups == null ||
maxDocPerGroup == null) {
throw new IllegalStateException("All required fields must be set");
}
return new TopGroupsFieldCommand(field, groupSort, sortWithinGroup, firstPhaseGroups, maxDocPerGroup, needScores, needMaxScore, needGroupCount);
}
}
private final SchemaField field;
private final Sort groupSort;
private final Sort sortWithinGroup;
private final Collection<SearchGroup<BytesRef>> firstPhaseGroups;
private final int maxDocPerGroup;
private final boolean needScores;
private final boolean needMaxScore;
private final boolean needGroupCount;
private TermSecondPassGroupingCollector secondPassCollector;
private TermAllGroupsCollector allGroupsCollector;
private TopGroupsFieldCommand(SchemaField field,
Sort groupSort,
Sort sortWithinGroup,
Collection<SearchGroup<BytesRef>> firstPhaseGroups,
int maxDocPerGroup,
boolean needScores,
boolean needMaxScore,
boolean needGroupCount) {
this.field = field;
this.groupSort = groupSort;
this.sortWithinGroup = sortWithinGroup;
this.firstPhaseGroups = firstPhaseGroups;
this.maxDocPerGroup = maxDocPerGroup;
this.needScores = needScores;
this.needMaxScore = needMaxScore;
this.needGroupCount = needGroupCount;
}
public List<Collector> create() throws IOException {
List<Collector> collectors = new ArrayList<Collector>();
secondPassCollector = new TermSecondPassGroupingCollector(
field.getName(), firstPhaseGroups, groupSort, sortWithinGroup, maxDocPerGroup, needScores, needMaxScore, true
);
collectors.add(secondPassCollector);
if (!needGroupCount) {
return collectors;
}
allGroupsCollector = new TermAllGroupsCollector(field.getName());
collectors.add(allGroupsCollector);
return collectors;
}
public TopGroups<BytesRef> result() {
TopGroups<BytesRef> result = secondPassCollector.getTopGroups(0);
if (allGroupsCollector != null) {
result = new TopGroups<BytesRef>(result, allGroupsCollector.getGroupCount());
}
return result;
}
public String getKey() {
return field.getName();
}
public Sort getGroupSort() {
return groupSort;
}
public Sort getSortWithinGroup() {
return sortWithinGroup;
}
}

View File

@ -0,0 +1,84 @@
package org.apache.solr.search.grouping.distributed.requestfactory;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.handler.component.ResponseBuilder;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.search.grouping.GroupingSpecification;
import org.apache.solr.search.grouping.distributed.ShardRequestFactory;
/**
* Concrete implementation of {@link ShardRequestFactory} that creates {@link ShardRequest} instances for getting the
* search groups from all shards.
*/
public class SearchGroupsRequestFactory implements ShardRequestFactory {
/**
* {@inheritDoc}
*/
public ShardRequest[] constructRequest(ResponseBuilder rb) {
ShardRequest sreq = new ShardRequest();
GroupingSpecification groupingSpecification = rb.getGroupingSpec();
if (groupingSpecification.getFields().length == 0) {
return new ShardRequest[0];
}
sreq.purpose = ShardRequest.PURPOSE_GET_TOP_GROUPS;
sreq.params = new ModifiableSolrParams(rb.req.getParams());
// TODO: base on current params or original params?
// don't pass through any shards param
sreq.params.remove(ShardParams.SHARDS);
// set the start (offset) to 0 for each shard request so we can properly merge
// results from the start.
if(rb.shards_start > -1) {
// if the client set shards.start set this explicitly
sreq.params.set(CommonParams.START,rb.shards_start);
} else {
sreq.params.set(CommonParams.START, "0");
}
// TODO: should we even use the SortSpec? That's obtained from the QParser, and
// perhaps we shouldn't attempt to parse the query at this level?
// Alternate Idea: instead of specifying all these things at the upper level,
// we could just specify that this is a shard request.
if(rb.shards_rows > -1) {
// if the client set shards.rows set this explicity
sreq.params.set(CommonParams.ROWS,rb.shards_rows);
} else {
sreq.params.set(CommonParams.ROWS, rb.getSortSpec().getOffset() + rb.getSortSpec().getCount());
}
// in this first phase, request only the unique key field
// and any fields needed for merging.
sreq.params.set("group.distibuted.first","true");
if ( (rb.getFieldFlags() & SolrIndexSearcher.GET_SCORES)!=0 || rb.getSortSpec().includesScore()) {
sreq.params.set(CommonParams.FL, rb.req.getSchema().getUniqueKeyField().getName() + ",score");
} else {
sreq.params.set(CommonParams.FL, rb.req.getSchema().getUniqueKeyField().getName());
}
return new ShardRequest[] {sreq};
}
}

View File

@ -0,0 +1,100 @@
package org.apache.solr.search.grouping.distributed.requestfactory;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.grouping.GroupDocs;
import org.apache.lucene.search.grouping.TopGroups;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.GroupParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.handler.component.ResponseBuilder;
import org.apache.solr.handler.component.ShardDoc;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.grouping.distributed.ShardRequestFactory;
import org.apache.solr.search.grouping.distributed.command.QueryCommandResult;
import java.util.*;
/**
*
*/
public class StoredFieldsShardRequestFactory implements ShardRequestFactory {
@Override
public ShardRequest[] constructRequest(ResponseBuilder rb) {
HashMap<String, Set<ShardDoc>> shardMap = new HashMap<String,Set<ShardDoc>>();
for (TopGroups<BytesRef> topGroups : rb.mergedTopGroups.values()) {
for (GroupDocs<BytesRef> group : topGroups.groups) {
mapShardToDocs(shardMap, group.scoreDocs);
}
}
for (QueryCommandResult queryCommandResult : rb.mergedQueryCommandResults.values()) {
mapShardToDocs(shardMap, queryCommandResult.getTopDocs().scoreDocs);
}
ShardRequest[] shardRequests = new ShardRequest[shardMap.size()];
SchemaField uniqueField = rb.req.getSchema().getUniqueKeyField();
int i = 0;
for (Collection<ShardDoc> shardDocs : shardMap.values()) {
ShardRequest sreq = new ShardRequest();
sreq.purpose = ShardRequest.PURPOSE_GET_FIELDS;
sreq.shards = new String[] {shardDocs.iterator().next().shard};
sreq.params = new ModifiableSolrParams();
sreq.params.add( rb.req.getParams());
sreq.params.remove(GroupParams.GROUP);
sreq.params.remove(CommonParams.SORT);
sreq.params.remove(ResponseBuilder.FIELD_SORT_VALUES);
String fl = sreq.params.get(CommonParams.FL);
if (fl != null) {
fl = fl.trim();
// currently, "score" is synonymous with "*,score" so
// don't add "id" if the fl is empty or "score" or it would change the meaning.
if (fl.length()!=0 && !"score".equals(fl) && !"*".equals(fl)) {
sreq.params.set(CommonParams.FL, fl+','+uniqueField.getName());
}
}
List<String> ids = new ArrayList<String>(shardDocs.size());
for (ShardDoc shardDoc : shardDocs) {
ids.add(shardDoc.id.toString());
}
sreq.params.add(ShardParams.IDS, StrUtils.join(ids, ','));
shardRequests[i++] = sreq;
}
return shardRequests;
}
private void mapShardToDocs(HashMap<String, Set<ShardDoc>> shardMap, ScoreDoc[] scoreDocs) {
for (ScoreDoc scoreDoc : scoreDocs) {
ShardDoc solrDoc = (ShardDoc) scoreDoc;
Set<ShardDoc> shardDocs = shardMap.get(solrDoc.shard);
if (shardDocs == null) {
shardMap.put(solrDoc.shard, shardDocs = new HashSet<ShardDoc>());
}
shardDocs.add(solrDoc);
}
}
}

View File

@ -0,0 +1,182 @@
package org.apache.solr.search.grouping.distributed.requestfactory;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.analysis.reverse.ReverseStringFilter;
import org.apache.lucene.search.grouping.SearchGroup;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.GroupParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.handler.component.ResponseBuilder;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.schema.FieldType;
import org.apache.solr.search.Grouping;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.search.grouping.distributed.ShardRequestFactory;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* Concrete implementation of {@link ShardRequestFactory} that creates {@link ShardRequest} instances for getting the
* top groups from all shards.
*/
public class TopGroupsShardRequestFactory implements ShardRequestFactory {
/**
* Represents a string value for
*/
public static final String GROUP_NULL_VALUE = "" + ReverseStringFilter.START_OF_HEADING_MARKER;
/**
* {@inheritDoc}
*/
public ShardRequest[] constructRequest(ResponseBuilder rb) {
// If we have a group.query we need to query all shards... Or we move this to the group first phase queries
boolean containsGroupByQuery = rb.getGroupingSpec().getQueries().length > 0;
// TODO: If groups.truncate=true we only have to query the specific shards even faceting and statistics are enabled
if ((rb.getQueryCommand().getFlags() & SolrIndexSearcher.GET_DOCSET) != 0 || containsGroupByQuery) {
// In case we need more results such as faceting and statistics we have to query all shards
return createRequestForAllShards(rb);
} else {
// In case we only need top groups we only have to query the shards that contain these groups.
return createRequestForSpecificShards(rb);
}
}
private ShardRequest[] createRequestForSpecificShards(ResponseBuilder rb) {
// Determine all unique shards to query for TopGroups
Set<String> shards = new HashSet<String>();
for (String command : rb.searchGroupToShard.keySet()) {
Map<SearchGroup<BytesRef>, String> groupsToShard = rb.searchGroupToShard.get(command);
shards.addAll(groupsToShard.values());
}
ShardRequest[] sreqs = new ShardRequest[shards.size()];
int i = 0;
for (String shard : shards) {
ShardRequest sreq = new ShardRequest();
sreq.purpose = ShardRequest.PURPOSE_GET_TOP_IDS;
sreq.actualShards = new String[] {shard};
sreq.params = new ModifiableSolrParams(rb.req.getParams());
// If group.format=simple group.offset doesn't make sense
Grouping.Format responseFormat = rb.getGroupingSpec().getResponseFormat();
if (responseFormat == Grouping.Format.simple || rb.getGroupingSpec().isMain()) {
sreq.params.remove(GroupParams.GROUP_OFFSET);
}
sreq.params.remove(ShardParams.SHARDS);
// set the start (offset) to 0 for each shard request so we can properly merge
// results from the start.
if(rb.shards_start > -1) {
// if the client set shards.start set this explicitly
sreq.params.set(CommonParams.START,rb.shards_start);
} else {
sreq.params.set(CommonParams.START, "0");
}
if(rb.shards_rows > -1) {
// if the client set shards.rows set this explicity
sreq.params.set(CommonParams.ROWS,rb.shards_rows);
} else {
sreq.params.set(CommonParams.ROWS, rb.getSortSpec().getOffset() + rb.getSortSpec().getCount());
}
sreq.params.set("group.distibuted.second","true");
for (Map.Entry<String, Collection<SearchGroup<BytesRef>>> entry : rb.mergedSearchGroups.entrySet()) {
for (SearchGroup<BytesRef> searchGroup : entry.getValue()) {
String groupValue;
if (searchGroup.groupValue != null) {
String rawGroupValue = searchGroup.groupValue.utf8ToString();
FieldType fieldType = rb.req.getSearcher().getSchema().getField(entry.getKey()).getType();
groupValue = fieldType.indexedToReadable(rawGroupValue);
} else {
groupValue = GROUP_NULL_VALUE;
}
sreq.params.add("group.topgroups." + entry.getKey(), groupValue);
}
}
if ((rb.getFieldFlags() & SolrIndexSearcher.GET_SCORES) != 0 || rb.getSortSpec().includesScore()) {
sreq.params.set(CommonParams.FL, rb.req.getSchema().getUniqueKeyField().getName() + ",score");
} else {
sreq.params.set(CommonParams.FL, rb.req.getSchema().getUniqueKeyField().getName());
}
sreqs[i++] = sreq;
}
return sreqs;
}
private ShardRequest[] createRequestForAllShards(ResponseBuilder rb) {
ShardRequest sreq = new ShardRequest();
sreq.purpose = ShardRequest.PURPOSE_GET_TOP_IDS;
sreq.params = new ModifiableSolrParams(rb.req.getParams());
// If group.format=simple group.offset doesn't make sense
Grouping.Format responseFormat = rb.getGroupingSpec().getResponseFormat();
if (responseFormat == Grouping.Format.simple || rb.getGroupingSpec().isMain()) {
sreq.params.remove(GroupParams.GROUP_OFFSET);
}
sreq.params.remove(ShardParams.SHARDS);
// set the start (offset) to 0 for each shard request so we can properly merge
// results from the start.
if(rb.shards_start > -1) {
// if the client set shards.start set this explicitly
sreq.params.set(CommonParams.START,rb.shards_start);
} else {
sreq.params.set(CommonParams.START, "0");
}
if(rb.shards_rows > -1) {
// if the client set shards.rows set this explicity
sreq.params.set(CommonParams.ROWS,rb.shards_rows);
} else {
sreq.params.set(CommonParams.ROWS, rb.getSortSpec().getOffset() + rb.getSortSpec().getCount());
}
sreq.params.set("group.distibuted.second","true");
for (Map.Entry<String, Collection<SearchGroup<BytesRef>>> entry : rb.mergedSearchGroups.entrySet()) {
for (SearchGroup<BytesRef> searchGroup : entry.getValue()) {
String groupValue;
if (searchGroup.groupValue != null) {
String rawGroupValue = searchGroup.groupValue.utf8ToString();
FieldType fieldType = rb.req.getSearcher().getSchema().getField(entry.getKey()).getType();
groupValue = fieldType.indexedToReadable(rawGroupValue);
} else {
groupValue = GROUP_NULL_VALUE;
}
sreq.params.add("group.topgroups." + entry.getKey(), groupValue);
}
}
if ( (rb.getFieldFlags() & SolrIndexSearcher.GET_SCORES)!=0 || rb.getSortSpec().includesScore()) {
sreq.params.set(CommonParams.FL, rb.req.getSchema().getUniqueKeyField().getName() + ",score");
} else {
sreq.params.set(CommonParams.FL, rb.req.getSchema().getUniqueKeyField().getName());
}
return new ShardRequest[] {sreq};
}
}

View File

@ -0,0 +1,93 @@
package org.apache.solr.search.grouping.distributed.responseprocessor;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.grouping.SearchGroup;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.handler.component.ResponseBuilder;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
import org.apache.solr.search.SortSpec;
import org.apache.solr.search.grouping.distributed.ShardResponseProcessor;
import org.apache.solr.search.grouping.distributed.shardresultserializer.SearchGroupsResultTransformer;
import java.io.IOException;
import java.util.*;
/**
* Concrete implementation for merging {@link SearchGroup} instances from shard responses.
*/
public class SearchGroupShardResponseProcessor implements ShardResponseProcessor {
/**
* {@inheritDoc}
*/
public void process(ResponseBuilder rb, ShardRequest shardRequest) {
SortSpec ss = rb.getSortSpec();
Sort groupSort = rb.getGroupingSpec().getGroupSort();
String[] fields = rb.getGroupingSpec().getFields();
Map<String, List<Collection<SearchGroup<BytesRef>>>> commandSearchGroups = new HashMap<String, List<Collection<SearchGroup<BytesRef>>>>();
Map<String, Map<SearchGroup<BytesRef>, String>> tempSearchGroupToShard = new HashMap<String, Map<SearchGroup<BytesRef>, String>>();
for (String field : fields) {
commandSearchGroups.put(field, new ArrayList<Collection<SearchGroup<BytesRef>>>(shardRequest.responses.size()));
tempSearchGroupToShard.put(field, new HashMap<SearchGroup<BytesRef>, String>());
if (!rb.searchGroupToShard.containsKey(field)) {
rb.searchGroupToShard.put(field, new HashMap<SearchGroup<BytesRef>, String>());
}
}
SearchGroupsResultTransformer serializer = new SearchGroupsResultTransformer(rb.req.getSearcher());
try {
for (ShardResponse srsp : shardRequest.responses) {
@SuppressWarnings("unchecked")
NamedList<NamedList> firstPhaseResult = (NamedList<NamedList>) srsp.getSolrResponse().getResponse().get("firstPhase");
Map<String, Collection<SearchGroup<BytesRef>>> result = serializer.transformToNative(firstPhaseResult, groupSort, null, srsp.getShard());
for (String field : commandSearchGroups.keySet()) {
Collection<SearchGroup<BytesRef>> searchGroups = result.get(field);
if (searchGroups == null) {
continue;
}
commandSearchGroups.get(field).add(searchGroups);
for (SearchGroup<BytesRef> searchGroup : searchGroups) {
tempSearchGroupToShard.get(field).put(searchGroup, srsp.getShard());
}
}
}
for (String groupField : commandSearchGroups.keySet()) {
List<Collection<SearchGroup<BytesRef>>> topGroups = commandSearchGroups.get(groupField);
Collection<SearchGroup<BytesRef>> mergedTopGroups = SearchGroup.merge(topGroups, ss.getOffset(), ss.getCount(), groupSort);
if (mergedTopGroups == null) {
continue;
}
rb.mergedSearchGroups.put(groupField, mergedTopGroups);
for (SearchGroup<BytesRef> mergedTopGroup : mergedTopGroups) {
rb.searchGroupToShard.get(groupField).put(mergedTopGroup, tempSearchGroupToShard.get(groupField).get(mergedTopGroup));
}
}
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
}
}

View File

@ -0,0 +1,56 @@
package org.apache.solr.search.grouping.distributed.responseprocessor;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.search.FieldDoc;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.handler.component.ResponseBuilder;
import org.apache.solr.handler.component.ShardDoc;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.search.grouping.distributed.ShardResponseProcessor;
/**
* Concrete implementation for processing the stored field values from shard responses.
*/
public class StoredFieldsShardResponseProcessor implements ShardResponseProcessor {
/**
* {@inheritDoc}
*/
public void process(ResponseBuilder rb, ShardRequest shardRequest) {
boolean returnScores = (rb.getFieldFlags() & SolrIndexSearcher.GET_SCORES) != 0;
ShardResponse srsp = shardRequest.responses.get(0);
SolrDocumentList docs = (SolrDocumentList)srsp.getSolrResponse().getResponse().get("response");
String uniqueIdFieldName = rb.req.getSchema().getUniqueKeyField().getName();
for (SolrDocument doc : docs) {
Object id = doc.getFieldValue(uniqueIdFieldName).toString();
ShardDoc shardDoc = rb.resultIds.get(id);
FieldDoc fieldDoc = (FieldDoc) shardDoc;
if (shardDoc != null) {
if (returnScores && !Float.isNaN(fieldDoc.score)) {
doc.setField("score", fieldDoc.score);
}
rb.retrievedDocuments.put(id, doc);
}
}
}
}

View File

@ -0,0 +1,141 @@
package org.apache.solr.search.grouping.distributed.responseprocessor;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.grouping.GroupDocs;
import org.apache.lucene.search.grouping.TopGroups;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.handler.component.ResponseBuilder;
import org.apache.solr.handler.component.ShardDoc;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
import org.apache.solr.search.Grouping;
import org.apache.solr.search.grouping.distributed.ShardResponseProcessor;
import org.apache.solr.search.grouping.distributed.command.QueryCommandResult;
import org.apache.solr.search.grouping.distributed.shardresultserializer.TopGroupsResultTransformer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Concrete implementation for merging {@link TopGroups} instances from shard responses.
*/
public class TopGroupsShardResponseProcessor implements ShardResponseProcessor {
/**
* {@inheritDoc}
*/
@SuppressWarnings("unchecked")
public void process(ResponseBuilder rb, ShardRequest shardRequest) {
Sort groupSort = rb.getGroupingSpec().getGroupSort();
String[] fields = rb.getGroupingSpec().getFields();
String[] queries = rb.getGroupingSpec().getQueries();
Sort sortWithinGroup = rb.getGroupingSpec().getSortWithinGroup();
// If group.format=simple group.offset doesn't make sense
int groupOffsetDefault;
if (rb.getGroupingSpec().getResponseFormat() == Grouping.Format.simple || rb.getGroupingSpec().isMain()) {
groupOffsetDefault = 0;
} else {
groupOffsetDefault = rb.getGroupingSpec().getGroupOffset();
}
int docsPerGroupDefault = rb.getGroupingSpec().getGroupLimit();
Map<String, List<TopGroups<BytesRef>>> commandTopGroups = new HashMap<String, List<TopGroups<BytesRef>>>();
for (String field : fields) {
commandTopGroups.put(field, new ArrayList<TopGroups<BytesRef>>());
}
Map<String, List<QueryCommandResult>> commandTopDocs = new HashMap<String, List<QueryCommandResult>>();
for (String query : queries) {
commandTopDocs.put(query, new ArrayList<QueryCommandResult>());
}
TopGroupsResultTransformer serializer = new TopGroupsResultTransformer(rb);
for (ShardResponse srsp : shardRequest.responses) {
NamedList<NamedList> secondPhaseResult = (NamedList<NamedList>) srsp.getSolrResponse().getResponse().get("secondPhase");
Map<String, ?> result = serializer.transformToNative(secondPhaseResult, groupSort, sortWithinGroup, srsp.getShard());
for (String field : commandTopGroups.keySet()) {
TopGroups<BytesRef> topGroups = (TopGroups<BytesRef>) result.get(field);
if (topGroups == null) {
continue;
}
commandTopGroups.get(field).add(topGroups);
}
for (String query : queries) {
commandTopDocs.get(query).add((QueryCommandResult) result.get(query));
}
}
try {
for (String groupField : commandTopGroups.keySet()) {
List<TopGroups<BytesRef>> topGroups = commandTopGroups.get(groupField);
if (topGroups.isEmpty()) {
continue;
}
TopGroups<BytesRef>[] topGroupsArr = new TopGroups[topGroups.size()];
rb.mergedTopGroups.put(groupField, TopGroups.merge(topGroups.toArray(topGroupsArr), groupSort, sortWithinGroup, groupOffsetDefault, docsPerGroupDefault));
}
for (String query : commandTopDocs.keySet()) {
List<QueryCommandResult> queryCommandResults = commandTopDocs.get(query);
List<TopDocs> topDocs = new ArrayList<TopDocs>(queryCommandResults.size());
int mergedMatches = 0;
for (QueryCommandResult queryCommandResult : queryCommandResults) {
topDocs.add(queryCommandResult.getTopDocs());
mergedMatches += queryCommandResult.getMatches();
}
int topN = rb.getGroupingSpec().getOffset() + rb.getGroupingSpec().getLimit();
TopDocs mergedTopDocs = TopDocs.merge(sortWithinGroup, topN, topDocs.toArray(new TopDocs[topDocs.size()]));
rb.mergedQueryCommandResults.put(query, new QueryCommandResult(mergedTopDocs, mergedMatches));
}
Map<Object, ShardDoc> resultIds = new HashMap<Object, ShardDoc>();
int i = 0;
for (TopGroups<BytesRef> topGroups : rb.mergedTopGroups.values()) {
for (GroupDocs<BytesRef> group : topGroups.groups) {
for (ScoreDoc scoreDoc : group.scoreDocs) {
ShardDoc solrDoc = (ShardDoc) scoreDoc;
solrDoc.positionInResponse = i++;
resultIds.put(solrDoc.id, solrDoc);
}
}
}
for (QueryCommandResult queryCommandResult : rb.mergedQueryCommandResults.values()) {
for (ScoreDoc scoreDoc : queryCommandResult.getTopDocs().scoreDocs) {
ShardDoc solrDoc = (ShardDoc) scoreDoc;
solrDoc.positionInResponse = i++;
resultIds.put(solrDoc.id, solrDoc);
}
}
rb.resultIds = resultIds;
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
}
}

View File

@ -0,0 +1,117 @@
package org.apache.solr.search.grouping.distributed.shardresultserializer;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.grouping.SearchGroup;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRef;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.schema.FieldType;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.search.grouping.Command;
import org.apache.solr.search.grouping.distributed.command.SearchGroupsFieldCommand;
import java.io.IOException;
import java.util.*;
/**
* Implementation for transforming {@link SearchGroup} into a {@link NamedList} structure and visa versa.
*/
public class SearchGroupsResultTransformer implements ShardResultTransformer<List<Command>, Map<String, Collection<SearchGroup<BytesRef>>>> {
private final SolrIndexSearcher searcher;
public SearchGroupsResultTransformer(SolrIndexSearcher searcher) {
this.searcher = searcher;
}
/**
* {@inheritDoc}
*/
public NamedList transform(List<Command> data) throws IOException {
NamedList<NamedList> result = new NamedList<NamedList>();
for (Command command : data) {
NamedList commandResult;
if (SearchGroupsFieldCommand.class.isInstance(command)) {
SearchGroupsFieldCommand fieldCommand = (SearchGroupsFieldCommand) command;
Collection<SearchGroup<BytesRef>> searchGroups = fieldCommand.result();
if (searchGroups == null) {
continue;
}
commandResult = serializeSearchGroup(searchGroups, fieldCommand.getGroupSort());
} else {
commandResult = null;
}
result.add(command.getKey(), commandResult);
}
return result;
}
/**
* {@inheritDoc}
*/
public Map<String, Collection<SearchGroup<BytesRef>>> transformToNative(NamedList<NamedList> shardResponse, Sort groupSort, Sort sortWithinGroup, String shard) throws IOException {
Map<String, Collection<SearchGroup<BytesRef>>> result = new HashMap<String, Collection<SearchGroup<BytesRef>>>();
for (Map.Entry<String, NamedList> command : shardResponse) {
List<SearchGroup<BytesRef>> searchGroups = new ArrayList<SearchGroup<BytesRef>>();
@SuppressWarnings("unchecked")
NamedList<List<Comparable>> rawSearchGroups = command.getValue();
for (Map.Entry<String, List<Comparable>> rawSearchGroup : rawSearchGroups){
SearchGroup<BytesRef> searchGroup = new SearchGroup<BytesRef>();
searchGroup.groupValue = rawSearchGroup.getKey() != null ? new BytesRef(rawSearchGroup.getKey()) : null;
searchGroup.sortValues = rawSearchGroup.getValue().toArray(new Comparable[rawSearchGroup.getValue().size()]);
searchGroups.add(searchGroup);
}
result.put(command.getKey(), searchGroups);
}
return result;
}
private NamedList serializeSearchGroup(Collection<SearchGroup<BytesRef>> data, Sort groupSort) {
NamedList<Comparable[]> result = new NamedList<Comparable[]>();
CharsRef spare = new CharsRef();
for (SearchGroup<BytesRef> searchGroup : data) {
Comparable[] convertedSortValues = new Comparable[searchGroup.sortValues.length];
for (int i = 0; i < searchGroup.sortValues.length; i++) {
Comparable sortValue = (Comparable) searchGroup.sortValues[i];
SchemaField field = groupSort.getSort()[i].getField() != null ? searcher.getSchema().getFieldOrNull(groupSort.getSort()[i].getField()) : null;
if (field != null) {
FieldType fieldType = field.getType();
if (sortValue instanceof BytesRef) {
String indexedValue = ((BytesRef) sortValue).utf8ToChars(spare).toString();
sortValue = (Comparable) fieldType.toObject(field.createField(fieldType.indexedToReadable(indexedValue), 0.0f));
} else if (sortValue instanceof String) {
sortValue = (Comparable) fieldType.toObject(field.createField(fieldType.indexedToReadable((String) sortValue), 0.0f));
}
}
convertedSortValues[i] = sortValue;
}
String groupValue = searchGroup.groupValue != null ? searchGroup.groupValue.utf8ToString() : null;
result.add(groupValue, convertedSortValues);
}
return result;
}
}

View File

@ -0,0 +1,53 @@
package org.apache.solr.search.grouping.distributed.shardresultserializer;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.search.Sort;
import org.apache.solr.common.util.NamedList;
import java.io.IOException;
/**
* A <code>ShardResultTransformer</code> is responsible for transforming a grouped shard result into group related
* structures (such as {@link org.apache.lucene.search.grouping.TopGroups} and {@link org.apache.lucene.search.grouping.SearchGroup})
* and visa versa.
*/
public interface ShardResultTransformer<T, R> {
/**
* Transforms data to a {@link NamedList} structure for serialization purposes.
*
* @param data The data to be transformed
* @return {@link NamedList} structure
* @throws IOException If I/O related errors occur during transforming
*/
NamedList transform(T data) throws IOException;
/**
* Transforms the specified shard response into native structures.
*
* @param shardResponse The shard response containing data in a {@link NamedList} structure
* @param groupSort The group sort
* @param sortWithinGroup The sort inside a group
* @param shard The shard address where the response originated from
* @return native structure of the data
* @throws IOException If I/O related errors occur during transforming
*/
R transformToNative(NamedList<NamedList> shardResponse, Sort groupSort, Sort sortWithinGroup, String shard) throws IOException;
}

View File

@ -0,0 +1,285 @@
package org.apache.solr.search.grouping.distributed.shardresultserializer;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.document.Document;
import org.apache.lucene.document.FieldSelector;
import org.apache.lucene.document.FieldSelectorResult;
import org.apache.lucene.document.FieldSelectorVisitor;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.grouping.GroupDocs;
import org.apache.lucene.search.grouping.TopGroups;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRef;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.handler.component.ResponseBuilder;
import org.apache.solr.handler.component.ShardDoc;
import org.apache.solr.schema.FieldType;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.grouping.Command;
import org.apache.solr.search.grouping.distributed.command.QueryCommand;
import org.apache.solr.search.grouping.distributed.command.QueryCommandResult;
import org.apache.solr.search.grouping.distributed.command.TopGroupsFieldCommand;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Implementation for transforming {@link TopGroups} and {@link TopDocs} into a {@link NamedList} structure and
* visa versa.
*/
public class TopGroupsResultTransformer implements ShardResultTransformer<List<Command>, Map<String, ?>> {
private final ResponseBuilder rb;
public TopGroupsResultTransformer(ResponseBuilder rb) {
this.rb = rb;
}
/**
* {@inheritDoc}
*/
public NamedList transform(List<Command> data) throws IOException {
NamedList<NamedList> result = new NamedList<NamedList>();
for (Command command : data) {
NamedList commandResult;
if (TopGroupsFieldCommand.class.isInstance(command)) {
TopGroupsFieldCommand fieldCommand = (TopGroupsFieldCommand) command;
SchemaField groupField = rb.req.getSearcher().getSchema().getField(fieldCommand.getKey());
commandResult = serializeTopGroups(fieldCommand.result(), groupField);
} else if (QueryCommand.class.isInstance(command)) {
QueryCommand queryCommand = (QueryCommand) command;
commandResult = serializeTopDocs(queryCommand.result());
} else {
commandResult = null;
}
result.add(command.getKey(), commandResult);
}
return result;
}
/**
* {@inheritDoc}
*/
public Map<String, ?> transformToNative(NamedList<NamedList> shardResponse, Sort groupSort, Sort sortWithinGroup, String shard) {
Map<String, Object> result = new HashMap<String, Object>();
for (Map.Entry<String, NamedList> entry : shardResponse) {
String key = entry.getKey();
NamedList commandResult = entry.getValue();
Integer totalGroupedHitCount = (Integer) commandResult.get("totalGroupedHitCount");
Integer totalHits = (Integer) commandResult.get("totalHits");
if (totalHits != null) {
Integer matches = (Integer) commandResult.get("matches");
Float maxScore = (Float) commandResult.get("maxScore");
if (maxScore == null) {
maxScore = Float.NaN;
}
@SuppressWarnings("unchecked")
List<NamedList<Object>> documents = (List<NamedList<Object>>) commandResult.get("documents");
ScoreDoc[] scoreDocs = new ScoreDoc[documents.size()];
int j = 0;
for (NamedList<Object> document : documents) {
Object uniqueId = document.get("id").toString();
Float score = (Float) document.get("score");
if (score == null) {
score = Float.NaN;
}
Object[] sortValues = ((List) document.get("sortValues")).toArray();
scoreDocs[j++] = new ShardDoc(score, sortValues, uniqueId, shard);
}
result.put(key, new QueryCommandResult(new TopDocs(totalHits, scoreDocs, maxScore), matches));
continue;
}
Integer totalHitCount = (Integer) commandResult.get("totalHitCount");
Integer totalGroupCount = (Integer) commandResult.get("totalGroupCount");
List<GroupDocs<BytesRef>> groupDocs = new ArrayList<GroupDocs<BytesRef>>();
for (int i = totalGroupCount == null ? 2 : 3; i < commandResult.size(); i++) {
String groupValue = commandResult.getName(i);
@SuppressWarnings("unchecked")
NamedList<Object> groupResult = (NamedList<Object>) commandResult.getVal(i);
Integer totalGroupHits = (Integer) groupResult.get("totalHits");
Float maxScore = (Float) groupResult.get("maxScore");
if (maxScore == null) {
maxScore = Float.NaN;
}
@SuppressWarnings("unchecked")
List<NamedList<Object>> documents = (List<NamedList<Object>>) groupResult.get("documents");
ScoreDoc[] scoreDocs = new ScoreDoc[documents.size()];
int j = 0;
for (NamedList<Object> document : documents) {
Object uniqueId = document.get("id").toString();
Float score = (Float) document.get("score");
if (score == null) {
score = Float.NaN;
}
Object[] sortValues = ((List) document.get("sortValues")).toArray();
scoreDocs[j++] = new ShardDoc(score, sortValues, uniqueId, shard);
}
BytesRef groupValueRef = groupValue != null ? new BytesRef(groupValue) : null;
groupDocs.add(new GroupDocs<BytesRef>(maxScore, totalGroupHits, scoreDocs, groupValueRef, null));
}
@SuppressWarnings("unchecked")
GroupDocs<BytesRef>[] groupDocsArr = groupDocs.toArray(new GroupDocs[groupDocs.size()]);
TopGroups<BytesRef> topGroups = new TopGroups<BytesRef>(
groupSort.getSort(), sortWithinGroup.getSort(), totalHitCount, totalGroupedHitCount, groupDocsArr
);
if (totalGroupCount != null) {
topGroups = new TopGroups<BytesRef>(topGroups, totalGroupCount);
}
result.put(key, topGroups);
}
return result;
}
protected NamedList serializeTopGroups(TopGroups<BytesRef> data, SchemaField groupField) throws IOException {
NamedList<Object> result = new NamedList<Object>();
result.add("totalGroupedHitCount", data.totalGroupedHitCount);
result.add("totalHitCount", data.totalHitCount);
if (data.totalGroupCount != null) {
result.add("totalGroupCount", data.totalGroupCount);
}
CharsRef spare = new CharsRef();
SchemaField uniqueField = rb.req.getSearcher().getSchema().getUniqueKeyField();
for (GroupDocs<BytesRef> searchGroup : data.groups) {
NamedList<Object> groupResult = new NamedList<Object>();
groupResult.add("totalHits", searchGroup.totalHits);
if (!Float.isNaN(searchGroup.maxScore)) {
groupResult.add("maxScore", searchGroup.maxScore);
}
List<NamedList<Object>> documents = new ArrayList<NamedList<Object>>();
for (int i = 0; i < searchGroup.scoreDocs.length; i++) {
NamedList<Object> document = new NamedList<Object>();
documents.add(document);
Document doc = retrieveDocument(uniqueField, searchGroup.scoreDocs[i].doc);
document.add("id", uniqueField.getType().toObject(doc.getField(uniqueField.getName())));
if (!Float.isNaN(searchGroup.scoreDocs[i].score)) {
document.add("score", searchGroup.scoreDocs[i].score);
}
if (!(searchGroup.scoreDocs[i] instanceof FieldDoc)) {
continue;
}
FieldDoc fieldDoc = (FieldDoc) searchGroup.scoreDocs[i];
Object[] convertedSortValues = new Object[fieldDoc.fields.length];
for (int j = 0; j < fieldDoc.fields.length; j++) {
Object sortValue = fieldDoc.fields[j];
Sort sortWithinGroup = rb.getGroupingSpec().getSortWithinGroup();
SchemaField field = sortWithinGroup.getSort()[j].getField() != null ? rb.req.getSearcher().getSchema().getFieldOrNull(sortWithinGroup.getSort()[j].getField()) : null;
if (field != null) {
FieldType fieldType = field.getType();
if (sortValue instanceof BytesRef) {
String indexedValue = ((BytesRef) sortValue).utf8ToChars(spare).toString();
sortValue = fieldType.toObject(field.createField(fieldType.indexedToReadable(indexedValue), 0.0f));
} else if (sortValue instanceof String) {
sortValue = fieldType.toObject(field.createField(fieldType.indexedToReadable((String) sortValue), 0.0f));
}
}
convertedSortValues[j] = sortValue;
}
document.add("sortValues", convertedSortValues);
}
groupResult.add("documents", documents);
String groupValue = searchGroup.groupValue != null ? groupField.getType().indexedToReadable(searchGroup.groupValue.utf8ToString()): null;
result.add(groupValue, groupResult);
}
return result;
}
protected NamedList serializeTopDocs(QueryCommandResult result) throws IOException {
NamedList<Object> queryResult = new NamedList<Object>();
queryResult.add("matches", result.getMatches());
queryResult.add("totalHits", result.getTopDocs().totalHits);
if (rb.getGroupingSpec().isNeedScore()) {
queryResult.add("maxScore", result.getTopDocs().getMaxScore());
}
List<NamedList> documents = new ArrayList<NamedList>();
queryResult.add("documents", documents);
SchemaField uniqueField = rb.req.getSearcher().getSchema().getUniqueKeyField();
CharsRef spare = new CharsRef();
for (ScoreDoc scoreDoc : result.getTopDocs().scoreDocs) {
NamedList<Object> document = new NamedList<Object>();
documents.add(document);
Document doc = retrieveDocument(uniqueField, scoreDoc.doc);
document.add("id", uniqueField.getType().toObject(doc.getField(uniqueField.getName())));
if (rb.getGroupingSpec().isNeedScore()) {
document.add("score", scoreDoc.score);
}
if (!FieldDoc.class.isInstance(scoreDoc)) {
continue;
}
FieldDoc fieldDoc = (FieldDoc) scoreDoc;
Object[] convertedSortValues = new Object[fieldDoc.fields.length];
for (int j = 0; j < fieldDoc.fields.length; j++) {
Object sortValue = fieldDoc.fields[j];
Sort groupSort = rb.getGroupingSpec().getGroupSort();
SchemaField field = groupSort.getSort()[j].getField() != null ? rb.req.getSearcher().getSchema().getFieldOrNull(groupSort.getSort()[j].getField()) : null;
if (field != null) {
FieldType fieldType = field.getType();
if (sortValue instanceof BytesRef) {
String indexedValue = ((BytesRef) sortValue).utf8ToChars(spare).toString();
sortValue = fieldType.toObject(field.createField(fieldType.indexedToReadable(indexedValue), 0.0f));
} else if (sortValue instanceof String) {
sortValue = fieldType.toObject(field.createField(fieldType.indexedToReadable((String) sortValue), 0.0f));
}
}
convertedSortValues[j] = sortValue;
}
document.add("sortValues", convertedSortValues);
}
return queryResult;
}
private Document retrieveDocument(final SchemaField uniqueField, int doc) throws IOException {
FieldSelectorVisitor fieldSelectorVisitor = new FieldSelectorVisitor(new FieldSelector() {
public FieldSelectorResult accept(String fieldName) {
if (uniqueField.getName().equals(fieldName)) {
return FieldSelectorResult.LOAD_AND_BREAK;
}
return FieldSelectorResult.NO_LOAD;
}
});
rb.req.getSearcher().doc(doc, fieldSelectorVisitor);
return fieldSelectorVisitor.getDocument();
}
}

View File

@ -0,0 +1,52 @@
package org.apache.solr.search.grouping.endresulttransformer;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.search.ScoreDoc;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.search.grouping.GroupingSpecification;
import java.util.Map;
/**
* Responsible for transforming the grouped result into the final format for displaying purposes.
*/
public interface EndResultTransformer {
/**
* Transforms the specified result into its final form and puts it into the specified response.
*
* @param result The map containing the grouping result (for grouping by field and query)
* @param response The response that will be rendered to the client
* @param groupingSpecification The grouping specification
* @param solrDocumentSource The source of {@link SolrDocument} instances
*/
void transform(Map<String, ?> result, SolrQueryResponse response, GroupingSpecification groupingSpecification, SolrDocumentSource solrDocumentSource);
/**
* Abstracts the source for {@link SolrDocument} instances.
* The source of documents is different for a distributed search than local search
*/
public interface SolrDocumentSource {
SolrDocument retrieve(ScoreDoc doc);
}
}

View File

@ -0,0 +1,108 @@
package org.apache.solr.search.grouping.endresulttransformer;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.grouping.GroupDocs;
import org.apache.lucene.search.grouping.TopGroups;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.FieldType;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.search.grouping.GroupingSpecification;
import org.apache.solr.search.grouping.distributed.command.QueryCommandResult;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
*
*/
public class GroupedEndResultTransformer implements EndResultTransformer {
private final SolrIndexSearcher searcher;
public GroupedEndResultTransformer(SolrIndexSearcher searcher) {
this.searcher = searcher;
}
public void transform(Map<String, ?> result, SolrQueryResponse response, GroupingSpecification groupingSpecification, SolrDocumentSource solrDocumentSource) {
NamedList<Object> commands = new NamedList<Object>();
for (Map.Entry<String, ?> entry : result.entrySet()) {
Object value = entry.getValue();
if (TopGroups.class.isInstance(value)) {
@SuppressWarnings("unchecked")
TopGroups<BytesRef> topGroups = (TopGroups<BytesRef>) value;
NamedList<Object> command = new SimpleOrderedMap<Object>();
command.add("matches", topGroups.totalHitCount);
if (topGroups.totalGroupCount != null) {
command.add("ngroups", topGroups.totalGroupCount);
}
List<NamedList> groups = new ArrayList<NamedList>();
SchemaField groupField = searcher.getSchema().getField(entry.getKey());
FieldType groupFieldType = groupField.getType();
for (GroupDocs<BytesRef> group : topGroups.groups) {
SimpleOrderedMap<Object> groupResult = new SimpleOrderedMap<Object>();
if (group.groupValue != null) {
groupResult.add(
"groupValue", groupFieldType.toObject(groupField.createField(group.groupValue.utf8ToString(), 0.0f))
);
} else {
groupResult.add("groupValue", null);
}
SolrDocumentList docList = new SolrDocumentList();
docList.setNumFound(group.totalHits);
if (!Float.isNaN(group.maxScore)) {
docList.setMaxScore(group.maxScore);
}
docList.setStart(groupingSpecification.getGroupOffset());
for (ScoreDoc scoreDoc : group.scoreDocs) {
docList.add(solrDocumentSource.retrieve(scoreDoc));
}
groupResult.add("doclist", docList);
groups.add(groupResult);
}
command.add("groups", groups);
commands.add(entry.getKey(), command);
} else if (QueryCommandResult.class.isInstance(value)) {
QueryCommandResult queryCommandResult = (QueryCommandResult) value;
NamedList<Object> command = new SimpleOrderedMap<Object>();
command.add("matches", queryCommandResult.getMatches());
SolrDocumentList docList = new SolrDocumentList();
docList.setNumFound(queryCommandResult.getTopDocs().totalHits);
if (!Float.isNaN(queryCommandResult.getTopDocs().getMaxScore())) {
docList.setMaxScore(queryCommandResult.getTopDocs().getMaxScore());
}
docList.setStart(groupingSpecification.getGroupOffset());
for (ScoreDoc scoreDoc :queryCommandResult.getTopDocs().scoreDocs){
docList.add(solrDocumentSource.retrieve(scoreDoc));
}
command.add("doclist", docList);
commands.add(entry.getKey(), command);
}
}
response.add("grouped", commands);
}
}

View File

@ -0,0 +1,59 @@
package org.apache.solr.search.grouping.endresulttransformer;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.grouping.GroupDocs;
import org.apache.lucene.search.grouping.TopGroups;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.search.grouping.GroupingSpecification;
import java.util.Map;
/**
*
*/
public class MainEndResultTransformer implements EndResultTransformer {
public void transform(Map<String, ?> result, SolrQueryResponse response, GroupingSpecification groupingSpecification, SolrDocumentSource solrDocumentSource) {
Object value = result.get(groupingSpecification.getFields()[0]);
if (TopGroups.class.isInstance(value)) {
@SuppressWarnings("unchecked")
TopGroups<BytesRef> topGroups = (TopGroups<BytesRef>) value;
SolrDocumentList docList = new SolrDocumentList();
docList.setStart(groupingSpecification.getOffset());
docList.setNumFound(topGroups.totalHitCount);
Float maxScore = Float.NEGATIVE_INFINITY;
for (GroupDocs<BytesRef> group : topGroups.groups) {
for (ScoreDoc scoreDoc : group.scoreDocs) {
if (maxScore < scoreDoc.score) {
maxScore = scoreDoc.score;
}
docList.add(solrDocumentSource.retrieve(scoreDoc));
}
}
if (maxScore != Float.NEGATIVE_INFINITY) {
docList.setMaxScore(maxScore);
}
response.add("response", docList);
}
}
}

View File

@ -0,0 +1,73 @@
package org.apache.solr.search.grouping.endresulttransformer;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.grouping.GroupDocs;
import org.apache.lucene.search.grouping.TopGroups;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.search.grouping.GroupingSpecification;
import java.util.Map;
/**
*
*/
public class SimpleEndResultTransformer implements EndResultTransformer {
@Override
public void transform(Map<String, ?> result, SolrQueryResponse response, GroupingSpecification groupingSpecification, SolrDocumentSource solrDocumentSource) {
NamedList<Object> commands = new SimpleOrderedMap<Object>();
for (Map.Entry<String, ?> entry : result.entrySet()) {
Object value = entry.getValue();
if (TopGroups.class.isInstance(value)) {
@SuppressWarnings("unchecked")
TopGroups<BytesRef> topGroups = (TopGroups<BytesRef>) value;
NamedList<Object> command = new SimpleOrderedMap<Object>();
command.add("matches", topGroups.totalHitCount);
if (topGroups.totalGroupCount != null) {
command.add("ngroups", topGroups.totalGroupCount);
}
SolrDocumentList docList = new SolrDocumentList();
docList.setStart(groupingSpecification.getOffset());
docList.setNumFound(topGroups.totalHitCount);
Float maxScore = Float.NEGATIVE_INFINITY;
for (GroupDocs<BytesRef> group : topGroups.groups) {
for (ScoreDoc scoreDoc : group.scoreDocs) {
if (maxScore < scoreDoc.score) {
maxScore = scoreDoc.score;
}
docList.add(solrDocumentSource.retrieve(scoreDoc));
}
}
if (maxScore != Float.NEGATIVE_INFINITY) {
docList.setMaxScore(maxScore);
}
command.add("doclist", docList);
commands.add(entry.getKey(), command);
}
}
response.add("grouped", commands);
}
}

View File

@ -0,0 +1,152 @@
package org.apache.solr;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* TODO? perhaps use:
* http://docs.codehaus.org/display/JETTY/ServletTester
* rather then open a real connection?
*
* @since solr 4.0
*/
public class TestDistributedGrouping extends BaseDistributedSearchTestCase {
String t1="a_t";
String i1="a_si";
String s1="a_s";
String tlong = "other_tl1";
String tdate_a = "a_n_tdt";
String tdate_b = "b_n_tdt";
String oddField="oddField_s";
public void doTest() throws Exception {
del("*:*");
commit();
handle.clear();
handle.put("QTime", SKIPVAL);
handle.put("timestamp", SKIPVAL);
// Test distributed grouping with empty indices
query("q", "*:*", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "group.limit", 10, "sort", i1 + " asc, id asc");
query("q", "*:*", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "group.limit", 10, "sort", i1 + " asc, id asc", "hl","true","hl.fl",t1);
query("q", "*:*", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "group.limit", 10, "sort", i1 + " asc, id asc", "facet", "true", "facet.field", t1);
query("q", "*:*", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "group.limit", 10, "sort", i1 + " asc, id asc", "stats", "true", "stats.field", i1);
query("q", "kings", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "group.limit", 10, "sort", i1 + " asc, id asc", "spellcheck", "true", "spellcheck.build", "true", "qt", "spellCheckCompRH");
indexr(id,1, i1, 100, tlong, 100,t1,"now is the time for all good men",
tdate_a, "2010-04-20T11:00:00Z",
tdate_b, "2009-08-20T11:00:00Z",
"foo_f", 1.414f, "foo_b", "true", "foo_d", 1.414d);
indexr(id,2, i1, 50 , tlong, 50,t1,"to come to the aid of their country.",
tdate_a, "2010-05-02T11:00:00Z",
tdate_b, "2009-11-02T11:00:00Z");
indexr(id,3, i1, 2, tlong, 2,t1,"how now brown cow",
tdate_a, "2010-05-03T11:00:00Z");
indexr(id,4, i1, -100 ,tlong, 101,
t1,"the quick fox jumped over the lazy dog",
tdate_a, "2010-05-03T11:00:00Z",
tdate_b, "2010-05-03T11:00:00Z");
indexr(id,5, i1, 500, tlong, 500 ,
t1,"the quick fox jumped way over the lazy dog",
tdate_a, "2010-05-05T11:00:00Z");
indexr(id,6, i1, -600, tlong, 600 ,t1,"humpty dumpy sat on a wall");
indexr(id,7, i1, 123, tlong, 123 ,t1,"humpty dumpy had a great fall");
indexr(id,8, i1, 876, tlong, 876,
tdate_b, "2010-01-05T11:00:00Z",
t1,"all the kings horses and all the kings men");
indexr(id,9, i1, 7, tlong, 7,t1,"couldn't put humpty together again");
indexr(id,10, i1, 4321, tlong, 4321,t1,"this too shall pass");
indexr(id,11, i1, -987, tlong, 987,
t1,"An eye for eye only ends up making the whole world blind.");
indexr(id,12, i1, 379, tlong, 379,
t1,"Great works are performed, not by strength, but by perseverance.");
indexr(id,13, i1, 232, tlong, 232,
t1,"no eggs on wall, lesson learned",
oddField, "odd man out");
indexr(id, 14, "SubjectTerms_mfacet", new String[] {"mathematical models", "mathematical analysis"});
indexr(id, 15, "SubjectTerms_mfacet", new String[] {"test 1", "test 2", "test3"});
indexr(id, 16, "SubjectTerms_mfacet", new String[] {"test 1", "test 2", "test3"});
String[] vals = new String[100];
for (int i=0; i<100; i++) {
vals[i] = "test " + i;
}
indexr(id, 17, "SubjectTerms_mfacet", vals);
for (int i=100; i<150; i++) {
indexr(id, i);
}
int[] values = new int[]{9999, 99999, 999999, 9999999};
for (int shard = 0; shard < clients.size(); shard++) {
int groupValue = values[shard];
for (int i = 500; i < 600; i++) {
index_specific(shard, i1, groupValue, s1, "a", id, i * (shard + 1));
}
}
commit();
// test grouping
// The second sort = id asc . The sorting behaviour is different in dist mode. See TopDocs#merge
// The shard the result came from matters in the order if both document sortvalues are equal
query("q", "*:*", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "group.limit", 10, "sort", i1 + " asc, id asc");
query("q", "*:*", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "group.limit", 10, "sort", "id asc, _docid_ asc");
query("q", "*:*", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "group.limit", 10, "sort", "{!func}add(" + i1 + ",5) asc, id asc");
query("q", "*:*", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "group.limit", 10, "sort", i1 + " asc, id asc", "facet", "true", "facet.field", t1);
query("q", "*:*", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "group.limit", 10, "sort", i1 + " asc, id asc", "stats", "true", "stats.field", i1);
query("q", "kings", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "group.limit", 10, "sort", i1 + " asc, id asc", "spellcheck", "true", "spellcheck.build", "true", "qt", "spellCheckCompRH");
query("q", "*:*", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "group.limit", 10, "sort", i1 + " asc, id asc", "facet", "true", "hl","true","hl.fl",t1);
query("q", "*:*", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "group.limit", 10, "sort", i1 + " asc, id asc", "group.sort", "id desc");
query("q", "*:*", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "group.offset", 5, "group.limit", 5, "sort", i1 + " asc, id asc");
query("q", "*:*", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "offset", 5, "rows", 5, "group.offset", 5, "group.limit", 5, "sort", i1 + " asc, id asc");
query("q", "*:*", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "offset", 5, "rows", 5, "sort", i1 + " asc, id asc", "group.format", "simple");
query("q", "*:*", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "offset", 5, "rows", 5, "sort", i1 + " asc, id asc", "group.main", "true");
query("q", "*:*", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "group.offset", 5, "group.limit", 5, "sort", i1 + " asc, id asc", "group.format", "simple", "offset", 5, "rows", 5);
query("q", "*:*", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "group.offset", 5, "group.limit", 5, "sort", i1 + " asc, id asc", "group.main", "true", "offset", 5, "rows", 5);
query("q", "*:*", "rows", 100, "fl", "id," + i1, "group", "true", "group.query", t1 + ":kings OR " + t1 + ":eggs", "group.limit", 10, "sort", i1 + " asc, id asc");
query("q", "*:*", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "group.query", t1 + ":kings OR " + t1 + ":eggs", "group.limit", 10, "sort", i1 + " asc, id asc");
// In order to validate this we need to make sure that during indexing that all documents of one group only occur on the same shard
query("q", "*:*", "fq", s1 + ":a", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "group.limit", 10, "sort", i1 + " asc, id asc", "group.ngroups", "true");
// We cannot validate distributed grouping with scoring as first sort. since there is no global idf. We can check if no errors occur
simpleQuery("q", "*:*", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "group.limit", 10, "sort", "score desc, _docid_ asc, id asc");
simpleQuery("q", "*:*", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "group.limit", 10);
}
private void simpleQuery(Object... queryParams) throws SolrServerException {
ModifiableSolrParams params = new ModifiableSolrParams();
for (int i = 0; i < queryParams.length; i += 2) {
params.add(queryParams[i].toString(), queryParams[i + 1].toString());
}
queryServer(params);
}
}