mirror of https://github.com/apache/lucene.git
SOLR-2656: distrib/cloud support for realtime get
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1306018 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b658af7876
commit
0827fffde6
|
@ -21,12 +21,22 @@ import org.apache.lucene.document.Document;
|
||||||
import org.apache.lucene.index.IndexableField;
|
import org.apache.lucene.index.IndexableField;
|
||||||
import org.apache.lucene.index.Term;
|
import org.apache.lucene.index.Term;
|
||||||
import org.apache.lucene.util.BytesRef;
|
import org.apache.lucene.util.BytesRef;
|
||||||
|
import org.apache.solr.client.solrj.SolrResponse;
|
||||||
|
import org.apache.solr.client.solrj.util.ClientUtils;
|
||||||
|
import org.apache.solr.cloud.CloudDescriptor;
|
||||||
|
import org.apache.solr.cloud.ZkController;
|
||||||
import org.apache.solr.common.SolrDocument;
|
import org.apache.solr.common.SolrDocument;
|
||||||
import org.apache.solr.common.SolrDocumentList;
|
import org.apache.solr.common.SolrDocumentList;
|
||||||
import org.apache.solr.common.SolrException;
|
import org.apache.solr.common.SolrException;
|
||||||
import org.apache.solr.common.SolrInputDocument;
|
import org.apache.solr.common.SolrInputDocument;
|
||||||
|
import org.apache.solr.common.cloud.*;
|
||||||
|
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||||
|
import org.apache.solr.common.params.ShardParams;
|
||||||
import org.apache.solr.common.params.SolrParams;
|
import org.apache.solr.common.params.SolrParams;
|
||||||
|
import org.apache.solr.common.util.Hash;
|
||||||
|
import org.apache.solr.common.util.NamedList;
|
||||||
import org.apache.solr.common.util.StrUtils;
|
import org.apache.solr.common.util.StrUtils;
|
||||||
|
import org.apache.solr.core.CoreDescriptor;
|
||||||
import org.apache.solr.core.SolrCore;
|
import org.apache.solr.core.SolrCore;
|
||||||
import org.apache.solr.request.SolrQueryRequest;
|
import org.apache.solr.request.SolrQueryRequest;
|
||||||
import org.apache.solr.response.SolrQueryResponse;
|
import org.apache.solr.response.SolrQueryResponse;
|
||||||
|
@ -44,11 +54,9 @@ import org.apache.solr.util.RefCounted;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import javax.xml.transform.Transformer;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.util.ArrayList;
|
import java.util.*;
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
|
|
||||||
public class RealTimeGetComponent extends SearchComponent
|
public class RealTimeGetComponent extends SearchComponent
|
||||||
|
@ -228,6 +236,156 @@ public class RealTimeGetComponent extends SearchComponent
|
||||||
return toSolrDoc(out, schema);
|
return toSolrDoc(out, schema);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int distributedProcess(ResponseBuilder rb) throws IOException {
|
||||||
|
if (rb.stage < ResponseBuilder.STAGE_GET_FIELDS)
|
||||||
|
return ResponseBuilder.STAGE_GET_FIELDS;
|
||||||
|
if (rb.stage == ResponseBuilder.STAGE_GET_FIELDS) {
|
||||||
|
return createSubRequests(rb);
|
||||||
|
}
|
||||||
|
return ResponseBuilder.STAGE_DONE;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int createSubRequests(ResponseBuilder rb) throws IOException {
|
||||||
|
SolrParams params = rb.req.getParams();
|
||||||
|
String id1[] = params.getParams("id");
|
||||||
|
String ids[] = params.getParams("ids");
|
||||||
|
|
||||||
|
if (id1 == null && ids == null) {
|
||||||
|
return ResponseBuilder.STAGE_DONE;
|
||||||
|
}
|
||||||
|
|
||||||
|
List<String> allIds = new ArrayList<String>();
|
||||||
|
if (id1 != null) {
|
||||||
|
for (String s : id1) {
|
||||||
|
allIds.add(s);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (ids != null) {
|
||||||
|
for (String s : ids) {
|
||||||
|
allIds.addAll( StrUtils.splitSmart(s, ",", true) );
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: handle collection=...?
|
||||||
|
|
||||||
|
ZkController zkController = rb.req.getCore().getCoreDescriptor().getCoreContainer().getZkController();
|
||||||
|
|
||||||
|
// if shards=... then use that
|
||||||
|
if (zkController != null && params.get("shards") == null) {
|
||||||
|
SchemaField sf = rb.req.getSchema().getUniqueKeyField();
|
||||||
|
|
||||||
|
CloudDescriptor cloudDescriptor = rb.req.getCore().getCoreDescriptor().getCloudDescriptor();
|
||||||
|
|
||||||
|
String collection = cloudDescriptor.getCollectionName();
|
||||||
|
|
||||||
|
CloudState cloudState = zkController.getCloudState();
|
||||||
|
|
||||||
|
Map<String, List<String>> shardToId = new HashMap<String, List<String>>();
|
||||||
|
for (String id : allIds) {
|
||||||
|
BytesRef br = new BytesRef();
|
||||||
|
sf.getType().readableToIndexed(id, br);
|
||||||
|
int hash = Hash.murmurhash3_x86_32(br.bytes, br.offset, br.length, 0);
|
||||||
|
String shard = cloudState.getShard(hash, collection);
|
||||||
|
|
||||||
|
List<String> idsForShard = shardToId.get(shard);
|
||||||
|
if (idsForShard == null) {
|
||||||
|
idsForShard = new ArrayList<String>(2);
|
||||||
|
shardToId.put(shard, idsForShard);
|
||||||
|
}
|
||||||
|
idsForShard.add(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Map.Entry<String,List<String>> entry : shardToId.entrySet()) {
|
||||||
|
String shard = entry.getKey();
|
||||||
|
String shardIdList = StrUtils.join(entry.getValue(), ',');
|
||||||
|
|
||||||
|
ShardRequest sreq = new ShardRequest();
|
||||||
|
|
||||||
|
sreq.purpose = 1;
|
||||||
|
// sreq.shards = new String[]{shard}; // TODO: would be nice if this would work...
|
||||||
|
sreq.shards = sliceToShards(rb, collection, shard);
|
||||||
|
sreq.actualShards = sreq.shards;
|
||||||
|
sreq.params = new ModifiableSolrParams();
|
||||||
|
sreq.params.set("shards.qt","/get"); // TODO: how to avoid hardcoding this and hit the same handler?
|
||||||
|
sreq.params.set("distrib",false);
|
||||||
|
sreq.params.set("ids", shardIdList);
|
||||||
|
|
||||||
|
rb.addRequest(this, sreq);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
String shardIdList = StrUtils.join(allIds, ',');
|
||||||
|
ShardRequest sreq = new ShardRequest();
|
||||||
|
|
||||||
|
sreq.purpose = 1;
|
||||||
|
sreq.shards = null; // ALL
|
||||||
|
sreq.actualShards = sreq.shards;
|
||||||
|
sreq.params = new ModifiableSolrParams();
|
||||||
|
sreq.params.set("shards.qt","/get"); // TODO: how to avoid hardcoding this and hit the same handler?
|
||||||
|
sreq.params.set("distrib",false);
|
||||||
|
sreq.params.set("ids", shardIdList);
|
||||||
|
|
||||||
|
rb.addRequest(this, sreq);
|
||||||
|
}
|
||||||
|
|
||||||
|
return ResponseBuilder.STAGE_DONE;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String[] sliceToShards(ResponseBuilder rb, String collection, String slice) {
|
||||||
|
String lookup = collection + '_' + slice; // seems either form may be filled in rb.slices?
|
||||||
|
|
||||||
|
// We use this since the shard handler already filled in the slice to shards mapping.
|
||||||
|
// A better approach would be to avoid filling out every slice each time, or to cache
|
||||||
|
// the mappings.
|
||||||
|
|
||||||
|
for (int i=0; i<rb.slices.length; i++) {
|
||||||
|
log.info("LOOKUP_SLICE:" + rb.slices[i] + "=" + rb.shards[i]);
|
||||||
|
if (lookup.equals(rb.slices[i]) || slice.equals(rb.slices[i])) {
|
||||||
|
return new String[]{rb.shards[i]};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can't find shard '" + lookup + "'");
|
||||||
|
}
|
||||||
|
|
||||||
|
/***
|
||||||
|
private void handleRegularResponses(ResponseBuilder rb, ShardRequest sreq) {
|
||||||
|
}
|
||||||
|
***/
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void finishStage(ResponseBuilder rb) {
|
||||||
|
if (rb.stage != ResponseBuilder.STAGE_GET_FIELDS) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
mergeResponses(rb);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void mergeResponses(ResponseBuilder rb) {
|
||||||
|
SolrDocumentList docList = new SolrDocumentList();
|
||||||
|
|
||||||
|
for (ShardRequest sreq : rb.finished) {
|
||||||
|
// if shards=shard1,shard2 was used, then we query both shards for each id and
|
||||||
|
// can get more than one response
|
||||||
|
for (ShardResponse srsp : sreq.responses) {
|
||||||
|
SolrResponse sr = srsp.getSolrResponse();
|
||||||
|
NamedList nl = sr.getResponse();
|
||||||
|
SolrDocumentList subList = (SolrDocumentList)nl.get("response");
|
||||||
|
docList.addAll(subList);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (docList.size() <= 1 && rb.req.getParams().getParams("ids")==null) {
|
||||||
|
// if the doc was not found, then use a value of null.
|
||||||
|
rb.rsp.add("doc", docList.size() > 0 ? docList.get(0) : null);
|
||||||
|
} else {
|
||||||
|
docList.setNumFound(docList.size());
|
||||||
|
rb.rsp.add("response", docList);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
////////////////////////////////////////////
|
////////////////////////////////////////////
|
||||||
|
|
|
@ -817,8 +817,23 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkQueries() throws Exception {
|
private void checkQueries() throws Exception {
|
||||||
|
|
||||||
|
handle.put("_version_", SKIPVAL);
|
||||||
|
|
||||||
query("q", "*:*", "sort", "n_tl1 desc");
|
query("q", "*:*", "sort", "n_tl1 desc");
|
||||||
|
|
||||||
|
handle.put("response", UNORDERED); // get?ids=a,b,c requests are unordered
|
||||||
|
String ids = "987654";
|
||||||
|
for (int i=0; i<20; i++) {
|
||||||
|
query("qt","/get", "id",Integer.toString(i));
|
||||||
|
query("qt","/get", "ids",Integer.toString(i));
|
||||||
|
ids = ids + ',' + Integer.toString(i);
|
||||||
|
query("qt","/get", "ids",ids);
|
||||||
|
}
|
||||||
|
handle.remove("response");
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// random value sort
|
// random value sort
|
||||||
for (String f : fieldNames) {
|
for (String f : fieldNames) {
|
||||||
query("q", "*:*", "sort", f + " desc");
|
query("q", "*:*", "sort", f + " desc");
|
||||||
|
|
|
@ -643,11 +643,16 @@ public abstract class BaseDistributedSearchTestCase extends SolrTestCaseJ4 {
|
||||||
if (System.getProperty("remove.version.field") != null) {
|
if (System.getProperty("remove.version.field") != null) {
|
||||||
// we don't care if one has a version and the other doesnt -
|
// we don't care if one has a version and the other doesnt -
|
||||||
// control vs distrib
|
// control vs distrib
|
||||||
for (SolrDocument doc : a.getResults()) {
|
// TODO: this should prob be done by adding an ignore on _version_ rather than mutating the responses?
|
||||||
doc.removeFields("_version_");
|
if (a.getResults() != null) {
|
||||||
|
for (SolrDocument doc : a.getResults()) {
|
||||||
|
doc.removeFields("_version_");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for (SolrDocument doc : b.getResults()) {
|
if (b.getResults() != null) {
|
||||||
doc.removeFields("_version_");
|
for (SolrDocument doc : b.getResults()) {
|
||||||
|
doc.removeFields("_version_");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
cmp = compare(a.getResponse(), b.getResponse(), flags, handle);
|
cmp = compare(a.getResponse(), b.getResponse(), flags, handle);
|
||||||
|
|
Loading…
Reference in New Issue