SOLR-3109: Fixed numerous redundant shard requests when using distributed grouping.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1243375 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Martijn van Groningen 2012-02-13 00:13:32 +00:00
parent 463a6bd527
commit d6cb99523a
7 changed files with 84 additions and 96 deletions

View File

@ -1,4 +1,3 @@
Apache Solr Release Notes
Introduction
@ -592,6 +591,8 @@ Bug Fixes
* SOLR-3084: Fixed initialiazation error when using
<queryResponseWriter default="true" ... /> (Bernd Fehling and hossman)
* SOLR-3109: Fixed numerous redundant shard requests when using distributed grouping.
(rblack via Martijn van Groningen)
Other Changes
----------------------

View File

@ -416,7 +416,7 @@ public class QueryComponent extends SearchComponent
if (groupingSpec != null) {
try {
boolean needScores = (cmd.getFlags() & SolrIndexSearcher.GET_SCORES) != 0;
if (params.getBool("group.distibuted.first", false)) {
if (params.getBool("group.distributed.first", false)) {
CommandHandler.Builder topsGroupsActionBuilder = new CommandHandler.Builder()
.setQueryCommand(cmd)
.setNeedDocSet(false) // Order matters here
@ -437,7 +437,7 @@ public class QueryComponent extends SearchComponent
rsp.add("firstPhase", commandHandler.processResult(result, serializer));
rb.setResult(result);
return;
} else if (params.getBool("group.distibuted.second", false)) {
} else if (params.getBool("group.distributed.second", false)) {
CommandHandler.Builder secondPhaseBuilder = new CommandHandler.Builder()
.setQueryCommand(cmd)
.setTruncateGroups(groupingSpec.isTruncateGroups() && groupingSpec.getFields().length > 0)

View File

@ -19,7 +19,6 @@ package org.apache.solr.handler.component;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.grouping.SearchGroup;
import org.apache.lucene.search.grouping.TopGroups;
import org.apache.lucene.util.BytesRef;
@ -42,6 +41,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* This class is experimental and will be changing in the future.
@ -168,7 +168,7 @@ public class ResponseBuilder
// Context fields for grouping
public final Map<String, Collection<SearchGroup<BytesRef>>> mergedSearchGroups = new HashMap<String, Collection<SearchGroup<BytesRef>>>();
public final Map<String, Map<SearchGroup<BytesRef>, String>> searchGroupToShard = new HashMap<String, Map<SearchGroup<BytesRef>, String>>();
public final Map<String, Map<SearchGroup<BytesRef>, Set<String>>> searchGroupToShards = new HashMap<String, Map<SearchGroup<BytesRef>, Set<String>>>();
public final Map<String, TopGroups<BytesRef>> mergedTopGroups = new HashMap<String, TopGroups<BytesRef>>();
public final Map<String, QueryCommandResult> mergedQueryCommandResults = new HashMap<String, QueryCommandResult>();
public final Map<Object, SolrDocument> retrievedDocuments = new HashMap<Object, SolrDocument>();

View File

@ -71,7 +71,7 @@ public class SearchGroupsRequestFactory implements ShardRequestFactory {
// in this first phase, request only the unique key field
// and any fields needed for merging.
sreq.params.set("group.distibuted.first","true");
sreq.params.set("group.distributed.first","true");
if ( (rb.getFieldFlags() & SolrIndexSearcher.GET_SCORES)!=0 || rb.getSortSpec().includesScore()) {
sreq.params.set(CommonParams.FL, rb.req.getSchema().getUniqueKeyField().getName() + ",score");

View File

@ -65,102 +65,57 @@ public class TopGroupsShardRequestFactory implements ShardRequestFactory {
private ShardRequest[] createRequestForSpecificShards(ResponseBuilder rb) {
// Determine all unique shards to query for TopGroups
Set<String> shards = new HashSet<String>();
for (String command : rb.searchGroupToShard.keySet()) {
Map<SearchGroup<BytesRef>, String> groupsToShard = rb.searchGroupToShard.get(command);
shards.addAll(groupsToShard.values());
Set<String> uniqueShards = new HashSet<String>();
for (String command : rb.searchGroupToShards.keySet()) {
Map<SearchGroup<BytesRef>, Set<String>> groupsToShard = rb.searchGroupToShards.get(command);
for (Set<String> shards : groupsToShard.values()) {
uniqueShards.addAll(shards);
}
}
ShardRequest[] sreqs = new ShardRequest[shards.size()];
int i = 0;
for (String shard : shards) {
ShardRequest sreq = new ShardRequest();
sreq.purpose = ShardRequest.PURPOSE_GET_TOP_IDS;
sreq.actualShards = new String[] {shard};
sreq.params = new ModifiableSolrParams(rb.req.getParams());
// If group.format=simple group.offset doesn't make sense
Grouping.Format responseFormat = rb.getGroupingSpec().getResponseFormat();
if (responseFormat == Grouping.Format.simple || rb.getGroupingSpec().isMain()) {
sreq.params.remove(GroupParams.GROUP_OFFSET);
}
sreq.params.remove(ShardParams.SHARDS);
// set the start (offset) to 0 for each shard request so we can properly merge
// results from the start.
if(rb.shards_start > -1) {
// if the client set shards.start set this explicitly
sreq.params.set(CommonParams.START,rb.shards_start);
} else {
sreq.params.set(CommonParams.START, "0");
}
if(rb.shards_rows > -1) {
// if the client set shards.rows set this explicity
sreq.params.set(CommonParams.ROWS,rb.shards_rows);
} else {
sreq.params.set(CommonParams.ROWS, rb.getSortSpec().getOffset() + rb.getSortSpec().getCount());
}
sreq.params.set("group.distibuted.second","true");
for (Map.Entry<String, Collection<SearchGroup<BytesRef>>> entry : rb.mergedSearchGroups.entrySet()) {
for (SearchGroup<BytesRef> searchGroup : entry.getValue()) {
String groupValue;
if (searchGroup.groupValue != null) {
String rawGroupValue = searchGroup.groupValue.utf8ToString();
FieldType fieldType = rb.req.getSearcher().getSchema().getField(entry.getKey()).getType();
groupValue = fieldType.indexedToReadable(rawGroupValue);
} else {
groupValue = GROUP_NULL_VALUE;
}
sreq.params.add("group.topgroups." + entry.getKey(), groupValue);
}
}
if ((rb.getFieldFlags() & SolrIndexSearcher.GET_SCORES) != 0 || rb.getSortSpec().includesScore()) {
sreq.params.set(CommonParams.FL, rb.req.getSchema().getUniqueKeyField().getName() + ",score");
} else {
sreq.params.set(CommonParams.FL, rb.req.getSchema().getUniqueKeyField().getName());
}
sreqs[i++] = sreq;
}
return sreqs;
return createRequest(rb, uniqueShards.toArray(new String[uniqueShards.size()]));
}
private ShardRequest[] createRequestForAllShards(ResponseBuilder rb) {
ShardRequest sreq = new ShardRequest();
sreq.purpose = ShardRequest.PURPOSE_GET_TOP_IDS;
return createRequest(rb, ShardRequest.ALL_SHARDS);
}
private ShardRequest[] createRequest(ResponseBuilder rb, String[] shards)
{
ShardRequest sreq = new ShardRequest();
sreq.shards = shards;
sreq.purpose = ShardRequest.PURPOSE_GET_TOP_IDS;
sreq.params = new ModifiableSolrParams(rb.req.getParams());
// If group.format=simple group.offset doesn't make sense
Grouping.Format responseFormat = rb.getGroupingSpec().getResponseFormat();
if (responseFormat == Grouping.Format.simple || rb.getGroupingSpec().isMain()) {
sreq.params.remove(GroupParams.GROUP_OFFSET);
}
sreq.params.remove(ShardParams.SHARDS);
// set the start (offset) to 0 for each shard request so we can properly merge
// results from the start.
if(rb.shards_start > -1) {
if (rb.shards_start > -1) {
// if the client set shards.start set this explicitly
sreq.params.set(CommonParams.START,rb.shards_start);
sreq.params.set(CommonParams.START, rb.shards_start);
} else {
sreq.params.set(CommonParams.START, "0");
}
if(rb.shards_rows > -1) {
if (rb.shards_rows > -1) {
// if the client set shards.rows set this explicity
sreq.params.set(CommonParams.ROWS,rb.shards_rows);
sreq.params.set(CommonParams.ROWS, rb.shards_rows);
} else {
sreq.params.set(CommonParams.ROWS, rb.getSortSpec().getOffset() + rb.getSortSpec().getCount());
}
sreq.params.set("group.distibuted.second","true");
sreq.params.set("group.distributed.second", "true");
for (Map.Entry<String, Collection<SearchGroup<BytesRef>>> entry : rb.mergedSearchGroups.entrySet()) {
for (SearchGroup<BytesRef> searchGroup : entry.getValue()) {
String groupValue;
if (searchGroup.groupValue != null) {
String rawGroupValue = searchGroup.groupValue.utf8ToString();
String rawGroupValue = searchGroup.groupValue.utf8ToString();
FieldType fieldType = rb.req.getSearcher().getSchema().getField(entry.getKey()).getType();
groupValue = fieldType.indexedToReadable(rawGroupValue);
} else {
@ -170,7 +125,7 @@ public class TopGroupsShardRequestFactory implements ShardRequestFactory {
}
}
if ( (rb.getFieldFlags() & SolrIndexSearcher.GET_SCORES)!=0 || rb.getSortSpec().includesScore()) {
if ((rb.getFieldFlags() & SolrIndexSearcher.GET_SCORES) != 0 || rb.getSortSpec().includesScore()) {
sreq.params.set(CommonParams.FL, rb.req.getSchema().getUniqueKeyField().getName() + ",score");
} else {
sreq.params.set(CommonParams.FL, rb.req.getSchema().getUniqueKeyField().getName());

View File

@ -46,12 +46,12 @@ public class SearchGroupShardResponseProcessor implements ShardResponseProcessor
String[] fields = rb.getGroupingSpec().getFields();
Map<String, List<Collection<SearchGroup<BytesRef>>>> commandSearchGroups = new HashMap<String, List<Collection<SearchGroup<BytesRef>>>>();
Map<String, Map<SearchGroup<BytesRef>, String>> tempSearchGroupToShard = new HashMap<String, Map<SearchGroup<BytesRef>, String>>();
Map<String, Map<SearchGroup<BytesRef>, Set<String>>> tempSearchGroupToShards = new HashMap<String, Map<SearchGroup<BytesRef>, Set<String>>>();
for (String field : fields) {
commandSearchGroups.put(field, new ArrayList<Collection<SearchGroup<BytesRef>>>(shardRequest.responses.size()));
tempSearchGroupToShard.put(field, new HashMap<SearchGroup<BytesRef>, String>());
if (!rb.searchGroupToShard.containsKey(field)) {
rb.searchGroupToShard.put(field, new HashMap<SearchGroup<BytesRef>, String>());
tempSearchGroupToShards.put(field, new HashMap<SearchGroup<BytesRef>, Set<String>>());
if (!rb.searchGroupToShards.containsKey(field)) {
rb.searchGroupToShards.put(field, new HashMap<SearchGroup<BytesRef>, Set<String>>());
}
}
@ -69,7 +69,13 @@ public class SearchGroupShardResponseProcessor implements ShardResponseProcessor
commandSearchGroups.get(field).add(searchGroups);
for (SearchGroup<BytesRef> searchGroup : searchGroups) {
tempSearchGroupToShard.get(field).put(searchGroup, srsp.getShard());
Map<SearchGroup<BytesRef>, java.util.Set<String>> map = tempSearchGroupToShards.get(field);
Set<String> shards = map.get(searchGroup);
if (shards == null) {
shards = new HashSet<String>();
map.put(searchGroup, shards);
}
shards.add(srsp.getShard());
}
}
}
@ -82,7 +88,7 @@ public class SearchGroupShardResponseProcessor implements ShardResponseProcessor
rb.mergedSearchGroups.put(groupField, mergedTopGroups);
for (SearchGroup<BytesRef> mergedTopGroup : mergedTopGroups) {
rb.searchGroupToShard.get(groupField).put(mergedTopGroup, tempSearchGroupToShard.get(groupField).get(mergedTopGroup));
rb.searchGroupToShards.get(groupField).put(mergedTopGroup, tempSearchGroupToShards.get(groupField).get(mergedTopGroup));
}
}
} catch (IOException e) {
@ -90,4 +96,4 @@ public class SearchGroupShardResponseProcessor implements ShardResponseProcessor
}
}
}
}

View File

@ -18,14 +18,8 @@ package org.apache.solr;
*/
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* TODO? perhaps use:
* http://docs.codehaus.org/display/JETTY/ServletTester
@ -62,17 +56,17 @@ public class TestDistributedGrouping extends BaseDistributedSearchTestCase {
tdate_a, "2010-04-20T11:00:00Z",
tdate_b, "2009-08-20T11:00:00Z",
"foo_f", 1.414f, "foo_b", "true", "foo_d", 1.414d);
indexr(id,2, i1, 50 , tlong, 50,t1,"to come to the aid of their country.",
indexr(id,2, i1, 50 , tlong, 50,t1,"to come to the aid of their country.",
tdate_a, "2010-05-02T11:00:00Z",
tdate_b, "2009-11-02T11:00:00Z");
indexr(id,3, i1, 2, tlong, 2,t1,"how now brown cow",
indexr(id,3, i1, 2, tlong, 2,t1,"how now brown cow",
tdate_a, "2010-05-03T11:00:00Z");
indexr(id,4, i1, -100 ,tlong, 101,
t1,"the quick fox jumped over the lazy dog",
t1,"the quick fox jumped over the lazy dog",
tdate_a, "2010-05-03T11:00:00Z",
tdate_b, "2010-05-03T11:00:00Z");
indexr(id,5, i1, 500, tlong, 500 ,
t1,"the quick fox jumped way over the lazy dog",
t1,"the quick fox jumped way over the lazy dog",
tdate_a, "2010-05-05T11:00:00Z");
indexr(id,6, i1, -600, tlong, 600 ,t1,"humpty dumpy sat on a wall");
indexr(id,7, i1, 123, tlong, 123 ,t1,"humpty dumpy had a great fall");
@ -85,9 +79,6 @@ public class TestDistributedGrouping extends BaseDistributedSearchTestCase {
t1,"An eye for eye only ends up making the whole world blind.");
indexr(id,12, i1, 379, tlong, 379,
t1,"Great works are performed, not by strength, but by perseverance.");
indexr(id,13, i1, 232, tlong, 232,
t1,"no eggs on wall, lesson learned",
oddField, "odd man out");
indexr(id, 14, "SubjectTerms_mfacet", new String[] {"mathematical models", "mathematical analysis"});
indexr(id, 15, "SubjectTerms_mfacet", new String[] {"test 1", "test 2", "test3"});
@ -98,8 +89,39 @@ public class TestDistributedGrouping extends BaseDistributedSearchTestCase {
}
indexr(id, 17, "SubjectTerms_mfacet", vals);
for (int i=100; i<150; i++) {
indexr(id, i);
indexr(
id, 18, i1, 232, tlong, 332,
t1,"no eggs on wall, lesson learned",
oddField, "odd man out"
);
indexr(
id, 19, i1, 232, tlong, 432,
t1, "many eggs on wall",
oddField, "odd man in"
);
indexr(
id, 20, i1, 232, tlong, 532,
t1, "some eggs on wall",
oddField, "odd man between"
);
indexr(
id, 21, i1, 232, tlong, 632,
t1, "a few eggs on wall",
oddField, "odd man under"
);
indexr(
id, 22, i1, 232, tlong, 732,
t1, "any eggs on wall",
oddField, "odd man above"
);
indexr(
id, 23, i1, 233, tlong, 734,
t1, "dirty eggs",
oddField, "odd eggs"
);
for (int i = 100; i < 150; i++) {
indexr(id, i);
}
int[] values = new int[]{9999, 99999, 999999, 9999999};
@ -134,6 +156,10 @@ public class TestDistributedGrouping extends BaseDistributedSearchTestCase {
query("q", "*:*", "rows", 100, "fl", "id," + i1, "group", "true", "group.query", t1 + ":kings OR " + t1 + ":eggs", "group.limit", 10, "sort", i1 + " asc, id asc");
query("q", "*:*", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "group.query", t1 + ":kings OR " + t1 + ":eggs", "group.limit", 10, "sort", i1 + " asc, id asc");
// SOLR-3109
query("q", t1 + ":eggs", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "group.limit", 10, "sort", tlong + " asc, id asc");
query("q", i1 + ":232", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "group.limit", 10, "sort", tlong + " asc, id asc");
// In order to validate this we need to make sure that during indexing that all documents of one group only occur on the same shard
query("q", "*:*", "fq", s1 + ":a", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "group.limit", 10, "sort", i1 + " asc, id asc", "group.ngroups", "true");
query("q", "*:*", "fq", s1 + ":a", "rows", 100, "fl", "id," + i1, "group", "true", "group.field", i1, "group.limit", 10, "sort", i1 + " asc, id asc", "group.truncate", "true");