LUCENE-5666: remove insanity during distributed grouping

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/branches/lucene5666@1594452 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Muir 2014-05-14 04:22:07 +00:00
parent cce7c4db9c
commit b3a556bb32
4 changed files with 231 additions and 15 deletions

View File

@ -19,8 +19,10 @@ package org.apache.solr.search.grouping;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.lucene.queries.function.ValueSource;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.MultiCollector;
@ -28,8 +30,12 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.search.TimeLimitingCollector;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.search.grouping.AbstractAllGroupHeadsCollector;
import org.apache.lucene.search.grouping.function.FunctionAllGroupHeadsCollector;
import org.apache.lucene.search.grouping.function.FunctionAllGroupsCollector;
import org.apache.lucene.search.grouping.term.TermAllGroupHeadsCollector;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.schema.FieldType;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.BitDocSet;
import org.apache.solr.search.DocSet;
import org.apache.solr.search.DocSetCollector;
@ -157,16 +163,25 @@ public class CommandHandler {
private DocSet computeGroupedDocSet(Query query, ProcessedFilter filter, List<Collector> collectors) throws IOException {
Command firstCommand = commands.get(0);
AbstractAllGroupHeadsCollector termAllGroupHeadsCollector =
TermAllGroupHeadsCollector.create(firstCommand.getKey(), firstCommand.getSortWithinGroup());
if (collectors.isEmpty()) {
searchWithTimeLimiter(query, filter, termAllGroupHeadsCollector);
String field = firstCommand.getKey();
SchemaField sf = searcher.getSchema().getField(field);
FieldType fieldType = sf.getType();
final AbstractAllGroupHeadsCollector allGroupHeadsCollector;
if (fieldType.getNumericType() != null) {
ValueSource vs = fieldType.getValueSource(sf, null);
allGroupHeadsCollector = new FunctionAllGroupHeadsCollector(vs, new HashMap<Object,Object>(), firstCommand.getSortWithinGroup());
} else {
collectors.add(termAllGroupHeadsCollector);
allGroupHeadsCollector = TermAllGroupHeadsCollector.create(firstCommand.getKey(), firstCommand.getSortWithinGroup());
}
if (collectors.isEmpty()) {
searchWithTimeLimiter(query, filter, allGroupHeadsCollector);
} else {
collectors.add(allGroupHeadsCollector);
searchWithTimeLimiter(query, filter, MultiCollector.wrap(collectors.toArray(new Collector[collectors.size()])));
}
return new BitDocSet(termAllGroupHeadsCollector.retrieveGroupHeads(searcher.maxDoc()));
return new BitDocSet(allGroupHeadsCollector.retrieveGroupHeads(searcher.maxDoc()));
}
private DocSet computeDocSet(Query query, ProcessedFilter filter, List<Collector> collectors) throws IOException {

View File

@ -0,0 +1,160 @@
package org.apache.solr.search.grouping.distributed.command;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import org.apache.lucene.search.grouping.GroupDocs;
import org.apache.lucene.search.grouping.SearchGroup;
import org.apache.lucene.search.grouping.TopGroups;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.mutable.MutableValue;
import org.apache.lucene.util.mutable.MutableValueDate;
import org.apache.lucene.util.mutable.MutableValueDouble;
import org.apache.lucene.util.mutable.MutableValueFloat;
import org.apache.lucene.util.mutable.MutableValueInt;
import org.apache.lucene.util.mutable.MutableValueLong;
import org.apache.solr.schema.FieldType;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.schema.TrieField;
/**
* this is a transition class: for numeric types we use function-based distributed grouping,
* otherwise term-based. so for now we internally use function-based but pretend like we did
* it all with bytes, to not change any wire serialization etc.
*/
class GroupConverter {
static Collection<SearchGroup<BytesRef>> fromMutable(SchemaField field, Collection<SearchGroup<MutableValue>> values) {
if (values == null) {
return null;
}
FieldType fieldType = field.getType();
List<SearchGroup<BytesRef>> result = new ArrayList<>(values.size());
for (SearchGroup<MutableValue> original : values) {
SearchGroup<BytesRef> converted = new SearchGroup<BytesRef>();
converted.sortValues = original.sortValues;
if (original.groupValue.exists) {
BytesRef binary = new BytesRef();
fieldType.readableToIndexed(original.groupValue.toString(), binary);
converted.groupValue = binary;
} else {
converted.groupValue = null;
}
result.add(converted);
}
return result;
}
static Collection<SearchGroup<MutableValue>> toMutable(SchemaField field, Collection<SearchGroup<BytesRef>> values) {
FieldType fieldType = field.getType();
List<SearchGroup<MutableValue>> result = new ArrayList<>(values.size());
for (SearchGroup<BytesRef> original : values) {
SearchGroup<MutableValue> converted = new SearchGroup<MutableValue>();
converted.sortValues = original.sortValues; // ?
TrieField.TrieTypes type = ((TrieField)fieldType).getType();
final MutableValue v;
switch (type) {
case INTEGER:
MutableValueInt mutableInt = new MutableValueInt();
if (original.groupValue == null) {
mutableInt.value = 0;
mutableInt.exists = false;
} else {
mutableInt.value = (Integer) fieldType.toObject(field, original.groupValue);
}
v = mutableInt;
break;
case FLOAT:
MutableValueFloat mutableFloat = new MutableValueFloat();
if (original.groupValue == null) {
mutableFloat.value = 0;
mutableFloat.exists = false;
} else {
mutableFloat.value = (Float) fieldType.toObject(field, original.groupValue);
}
v = mutableFloat;
break;
case DOUBLE:
MutableValueDouble mutableDouble = new MutableValueDouble();
if (original.groupValue == null) {
mutableDouble.value = 0;
mutableDouble.exists = false;
} else {
mutableDouble.value = (Double) fieldType.toObject(field, original.groupValue);
}
v = mutableDouble;
break;
case LONG:
MutableValueLong mutableLong = new MutableValueLong();
if (original.groupValue == null) {
mutableLong.value = 0;
mutableLong.exists = false;
} else {
mutableLong.value = (Long) fieldType.toObject(field, original.groupValue);
}
v = mutableLong;
break;
case DATE:
MutableValueDate mutableDate = new MutableValueDate();
if (original.groupValue == null) {
mutableDate.value = 0;
mutableDate.exists = false;
} else {
mutableDate.value = ((Date)fieldType.toObject(field, original.groupValue)).getTime();
}
v = mutableDate;
break;
default:
throw new AssertionError();
}
converted.groupValue = v;
result.add(converted);
}
return result;
}
static TopGroups<BytesRef> fromMutable(SchemaField field, TopGroups<MutableValue> values) {
if (values == null) {
return null;
}
FieldType fieldType = field.getType();
@SuppressWarnings("unchecked")
GroupDocs<BytesRef> groupDocs[] = new GroupDocs[values.groups.length];
for (int i = 0; i < values.groups.length; i++) {
GroupDocs<MutableValue> original = values.groups[i];
final BytesRef groupValue;
if (original.groupValue.exists) {
BytesRef binary = new BytesRef();
fieldType.readableToIndexed(original.groupValue.toString(), binary);
groupValue = binary;
} else {
groupValue = null;
}
groupDocs[i] = new GroupDocs<BytesRef>(original.score, original.maxScore, original.totalHits, original.scoreDocs, groupValue, original.groupSortValues);
}
return new TopGroups<BytesRef>(values.groupSort, values.withinGroupSort, values.totalHitCount, values.totalGroupedHitCount, groupDocs, values.maxScore);
}
}

View File

@ -17,12 +17,18 @@ package org.apache.solr.search.grouping.distributed.command;
* limitations under the License.
*/
import org.apache.lucene.queries.function.ValueSource;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.grouping.AbstractAllGroupsCollector;
import org.apache.lucene.search.grouping.AbstractFirstPassGroupingCollector;
import org.apache.lucene.search.grouping.SearchGroup;
import org.apache.lucene.search.grouping.function.FunctionAllGroupsCollector;
import org.apache.lucene.search.grouping.function.FunctionFirstPassGroupingCollector;
import org.apache.lucene.search.grouping.term.TermAllGroupsCollector;
import org.apache.lucene.search.grouping.term.TermFirstPassGroupingCollector;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.schema.FieldType;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.grouping.Command;
@ -76,8 +82,8 @@ public class SearchGroupsFieldCommand implements Command<Pair<Integer, Collectio
private final int topNGroups;
private final boolean includeGroupCount;
private TermFirstPassGroupingCollector firstPassGroupingCollector;
private TermAllGroupsCollector allGroupsCollector;
private AbstractFirstPassGroupingCollector firstPassGroupingCollector;
private AbstractAllGroupsCollector allGroupsCollector;
private SearchGroupsFieldCommand(SchemaField field, Sort groupSort, int topNGroups, boolean includeGroupCount) {
this.field = field;
@ -89,12 +95,23 @@ public class SearchGroupsFieldCommand implements Command<Pair<Integer, Collectio
@Override
public List<Collector> create() throws IOException {
List<Collector> collectors = new ArrayList<>();
FieldType fieldType = field.getType();
if (topNGroups > 0) {
firstPassGroupingCollector = new TermFirstPassGroupingCollector(field.getName(), groupSort, topNGroups);
if (fieldType.getNumericType() != null) {
ValueSource vs = fieldType.getValueSource(field, null);
firstPassGroupingCollector = new FunctionFirstPassGroupingCollector(vs, new HashMap<Object,Object>(), groupSort, topNGroups);
} else {
firstPassGroupingCollector = new TermFirstPassGroupingCollector(field.getName(), groupSort, topNGroups);
}
collectors.add(firstPassGroupingCollector);
}
if (includeGroupCount) {
allGroupsCollector = new TermAllGroupsCollector(field.getName());
if (fieldType.getNumericType() != null) {
ValueSource vs = fieldType.getValueSource(field, null);
allGroupsCollector = new FunctionAllGroupsCollector(vs, new HashMap<Object,Object>());
} else {
allGroupsCollector = new TermAllGroupsCollector(field.getName());
}
collectors.add(allGroupsCollector);
}
return collectors;
@ -104,7 +121,11 @@ public class SearchGroupsFieldCommand implements Command<Pair<Integer, Collectio
public Pair<Integer, Collection<SearchGroup<BytesRef>>> result() {
final Collection<SearchGroup<BytesRef>> topGroups;
if (topNGroups > 0) {
topGroups = firstPassGroupingCollector.getTopGroups(0, true);
if (field.getType().getNumericType() != null) {
topGroups = GroupConverter.fromMutable(field, firstPassGroupingCollector.getTopGroups(0, true));
} else {
topGroups = firstPassGroupingCollector.getTopGroups(0, true);
}
} else {
topGroups = Collections.emptyList();
}

View File

@ -17,13 +17,18 @@ package org.apache.solr.search.grouping.distributed.command;
* limitations under the License.
*/
import org.apache.lucene.queries.function.ValueSource;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.grouping.AbstractSecondPassGroupingCollector;
import org.apache.lucene.search.grouping.GroupDocs;
import org.apache.lucene.search.grouping.SearchGroup;
import org.apache.lucene.search.grouping.TopGroups;
import org.apache.lucene.search.grouping.function.FunctionSecondPassGroupingCollector;
import org.apache.lucene.search.grouping.term.TermSecondPassGroupingCollector;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.mutable.MutableValue;
import org.apache.solr.schema.FieldType;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.grouping.Command;
@ -31,6 +36,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
/**
@ -101,7 +107,7 @@ public class TopGroupsFieldCommand implements Command<TopGroups<BytesRef>> {
private final int maxDocPerGroup;
private final boolean needScores;
private final boolean needMaxScore;
private TermSecondPassGroupingCollector secondPassCollector;
private AbstractSecondPassGroupingCollector secondPassCollector;
private TopGroupsFieldCommand(SchemaField field,
Sort groupSort,
@ -126,9 +132,18 @@ public class TopGroupsFieldCommand implements Command<TopGroups<BytesRef>> {
}
List<Collector> collectors = new ArrayList<>();
secondPassCollector = new TermSecondPassGroupingCollector(
FieldType fieldType = field.getType();
if (fieldType.getNumericType() != null) {
ValueSource vs = fieldType.getValueSource(field, null);
Collection<SearchGroup<MutableValue>> v = GroupConverter.toMutable(field, firstPhaseGroups);
secondPassCollector = new FunctionSecondPassGroupingCollector(
v, groupSort, sortWithinGroup, maxDocPerGroup, needScores, needMaxScore, true, vs, new HashMap<Object,Object>()
);
} else {
secondPassCollector = new TermSecondPassGroupingCollector(
field.getName(), firstPhaseGroups, groupSort, sortWithinGroup, maxDocPerGroup, needScores, needMaxScore, true
);
);
}
collectors.add(secondPassCollector);
return collectors;
}
@ -140,7 +155,12 @@ public class TopGroupsFieldCommand implements Command<TopGroups<BytesRef>> {
return new TopGroups<>(groupSort.getSort(), sortWithinGroup.getSort(), 0, 0, new GroupDocs[0], Float.NaN);
}
return secondPassCollector.getTopGroups(0);
FieldType fieldType = field.getType();
if (fieldType.getNumericType() != null) {
return GroupConverter.fromMutable(field, secondPassCollector.getTopGroups(0));
} else {
return secondPassCollector.getTopGroups(0);
}
}
@Override