From 0827fffde65009c9203400827f470aa85088ca10 Mon Sep 17 00:00:00 2001 From: Yonik Seeley Date: Tue, 27 Mar 2012 20:58:34 +0000 Subject: [PATCH] SOLR-2656: distrib/cloud support for realtime get git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1306018 13f79535-47bb-0310-9956-ffa450edef68 --- .../component/RealTimeGetComponent.java | 164 +++++++++++++++++- .../apache/solr/cloud/FullSolrCloudTest.java | 17 +- .../solr/BaseDistributedSearchTestCase.java | 13 +- 3 files changed, 186 insertions(+), 8 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java index a3f7f48a314..014e408a8d9 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java +++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java @@ -21,12 +21,22 @@ import org.apache.lucene.document.Document; import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.Term; import org.apache.lucene.util.BytesRef; +import org.apache.solr.client.solrj.SolrResponse; +import org.apache.solr.client.solrj.util.ClientUtils; +import org.apache.solr.cloud.CloudDescriptor; +import org.apache.solr.cloud.ZkController; import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.*; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.ShardParams; import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.Hash; +import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.StrUtils; +import org.apache.solr.core.CoreDescriptor; import org.apache.solr.core.SolrCore; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.response.SolrQueryResponse; @@ -44,11 +54,9 @@ import org.apache.solr.util.RefCounted; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.xml.transform.Transformer; import java.io.IOException; import java.net.URL; -import java.util.ArrayList; -import java.util.List; +import java.util.*; public class RealTimeGetComponent extends SearchComponent @@ -228,6 +236,156 @@ public class RealTimeGetComponent extends SearchComponent return toSolrDoc(out, schema); } + @Override + public int distributedProcess(ResponseBuilder rb) throws IOException { + if (rb.stage < ResponseBuilder.STAGE_GET_FIELDS) + return ResponseBuilder.STAGE_GET_FIELDS; + if (rb.stage == ResponseBuilder.STAGE_GET_FIELDS) { + return createSubRequests(rb); + } + return ResponseBuilder.STAGE_DONE; + } + + public int createSubRequests(ResponseBuilder rb) throws IOException { + SolrParams params = rb.req.getParams(); + String id1[] = params.getParams("id"); + String ids[] = params.getParams("ids"); + + if (id1 == null && ids == null) { + return ResponseBuilder.STAGE_DONE; + } + + List allIds = new ArrayList(); + if (id1 != null) { + for (String s : id1) { + allIds.add(s); + } + } + if (ids != null) { + for (String s : ids) { + allIds.addAll( StrUtils.splitSmart(s, ",", true) ); + } + } + + // TODO: handle collection=...? + + ZkController zkController = rb.req.getCore().getCoreDescriptor().getCoreContainer().getZkController(); + + // if shards=... then use that + if (zkController != null && params.get("shards") == null) { + SchemaField sf = rb.req.getSchema().getUniqueKeyField(); + + CloudDescriptor cloudDescriptor = rb.req.getCore().getCoreDescriptor().getCloudDescriptor(); + + String collection = cloudDescriptor.getCollectionName(); + + CloudState cloudState = zkController.getCloudState(); + + Map> shardToId = new HashMap>(); + for (String id : allIds) { + BytesRef br = new BytesRef(); + sf.getType().readableToIndexed(id, br); + int hash = Hash.murmurhash3_x86_32(br.bytes, br.offset, br.length, 0); + String shard = cloudState.getShard(hash, collection); + + List idsForShard = shardToId.get(shard); + if (idsForShard == null) { + idsForShard = new ArrayList(2); + shardToId.put(shard, idsForShard); + } + idsForShard.add(id); + } + + for (Map.Entry> entry : shardToId.entrySet()) { + String shard = entry.getKey(); + String shardIdList = StrUtils.join(entry.getValue(), ','); + + ShardRequest sreq = new ShardRequest(); + + sreq.purpose = 1; + // sreq.shards = new String[]{shard}; // TODO: would be nice if this would work... + sreq.shards = sliceToShards(rb, collection, shard); + sreq.actualShards = sreq.shards; + sreq.params = new ModifiableSolrParams(); + sreq.params.set("shards.qt","/get"); // TODO: how to avoid hardcoding this and hit the same handler? + sreq.params.set("distrib",false); + sreq.params.set("ids", shardIdList); + + rb.addRequest(this, sreq); + } + } else { + String shardIdList = StrUtils.join(allIds, ','); + ShardRequest sreq = new ShardRequest(); + + sreq.purpose = 1; + sreq.shards = null; // ALL + sreq.actualShards = sreq.shards; + sreq.params = new ModifiableSolrParams(); + sreq.params.set("shards.qt","/get"); // TODO: how to avoid hardcoding this and hit the same handler? + sreq.params.set("distrib",false); + sreq.params.set("ids", shardIdList); + + rb.addRequest(this, sreq); + } + + return ResponseBuilder.STAGE_DONE; + } + + private String[] sliceToShards(ResponseBuilder rb, String collection, String slice) { + String lookup = collection + '_' + slice; // seems either form may be filled in rb.slices? + + // We use this since the shard handler already filled in the slice to shards mapping. + // A better approach would be to avoid filling out every slice each time, or to cache + // the mappings. + + for (int i=0; i 0 ? docList.get(0) : null); + } else { + docList.setNumFound(docList.size()); + rb.rsp.add("response", docList); + } + } + //////////////////////////////////////////// diff --git a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java index 3df1ee76ffc..729316641ea 100644 --- a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java @@ -817,8 +817,23 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase { } private void checkQueries() throws Exception { + + handle.put("_version_", SKIPVAL); + query("q", "*:*", "sort", "n_tl1 desc"); - + + handle.put("response", UNORDERED); // get?ids=a,b,c requests are unordered + String ids = "987654"; + for (int i=0; i<20; i++) { + query("qt","/get", "id",Integer.toString(i)); + query("qt","/get", "ids",Integer.toString(i)); + ids = ids + ',' + Integer.toString(i); + query("qt","/get", "ids",ids); + } + handle.remove("response"); + + + // random value sort for (String f : fieldNames) { query("q", "*:*", "sort", f + " desc"); diff --git a/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java b/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java index 0664445c732..55b50680357 100644 --- a/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java +++ b/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java @@ -643,11 +643,16 @@ public abstract class BaseDistributedSearchTestCase extends SolrTestCaseJ4 { if (System.getProperty("remove.version.field") != null) { // we don't care if one has a version and the other doesnt - // control vs distrib - for (SolrDocument doc : a.getResults()) { - doc.removeFields("_version_"); + // TODO: this should prob be done by adding an ignore on _version_ rather than mutating the responses? + if (a.getResults() != null) { + for (SolrDocument doc : a.getResults()) { + doc.removeFields("_version_"); + } } - for (SolrDocument doc : b.getResults()) { - doc.removeFields("_version_"); + if (b.getResults() != null) { + for (SolrDocument doc : b.getResults()) { + doc.removeFields("_version_"); + } } } cmp = compare(a.getResponse(), b.getResponse(), flags, handle);