diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java index 68ed988d119..9d427dca8a0 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java @@ -53,6 +53,7 @@ import org.apache.solr.common.util.StrUtils; import org.apache.solr.handler.component.ShardHandler; import org.apache.solr.handler.component.ShardRequest; import org.apache.solr.handler.component.ShardResponse; +import org.apache.solr.update.SolrIndexSplitter; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1145,7 +1146,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread { Overseer.QUEUE_OPERATION, Overseer.ADD_ROUTING_RULE, COLLECTION_PROP, sourceCollection.getName(), SHARD_ID_PROP, sourceSlice.getName(), - "routeKey", splitKey, + "routeKey", SolrIndexSplitter.getRouteKey(splitKey) + "!", "range", splitRange.toString(), "targetCollection", targetCollection.getName(), "expireAt", String.valueOf(System.currentTimeMillis() + timeout)); @@ -1161,7 +1162,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread { Thread.sleep(100); Map rules = zkStateReader.getClusterState().getSlice(sourceCollection.getName(), sourceSlice.getName()).getRoutingRules(); if (rules != null) { - RoutingRule rule = rules.get(splitKey); + RoutingRule rule = rules.get(SolrIndexSplitter.getRouteKey(splitKey) + "!"); if (rule != null && rule.getRouteRanges().contains(splitRange)) { added = true; break; diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java index e2db6812b9c..de2b0885262 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java @@ -396,13 +396,12 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { for (DocRouter.Range range : ranges) { if (range.includes(hash)) { if (nodes == null) nodes = new ArrayList(); - Collection activeSlices = cstate.getActiveSlices(rule.getTargetCollectionName()); + DocCollection targetColl = cstate.getCollection(rule.getTargetCollectionName()); + Collection activeSlices = targetColl.getRouter().getSearchSlicesSingle(id, null, targetColl); if (activeSlices == null || activeSlices.isEmpty()) { throw new SolrException(ErrorCode.SERVER_ERROR, - "No active slices found for target collection: " + rule.getTargetCollectionName()); + "No active slices serving " + id + " found for target collection: " + rule.getTargetCollectionName()); } - // it doesn't matter where we forward it so just choose the first one - // todo this can be optimized Replica targetLeader = cstate.getLeader(rule.getTargetCollectionName(), activeSlices.iterator().next().getName()); nodes.add(new StdNode(new ZkCoreNodeProps(targetLeader))); break; diff --git a/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java b/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java index 7774a759ec2..71d48d25e7b 100644 --- a/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java @@ -32,6 +32,7 @@ import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.params.CollectionParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.update.DirectUpdateHandler2; +import org.apache.zookeeper.KeeperException; import org.junit.After; import org.junit.Before; @@ -89,21 +90,57 @@ public class MigrateRouteKeyTest extends BasicDistributedZkTest { public void doTest() throws Exception { waitForThingsToLevelOut(15); - final String splitKey = "a!"; - final int[] splitKeyCount = new int[1]; - for (int id = 0; id < 26*3; id++) { - String shardKey = "" + (char) ('a' + (id % 26)); // See comment in ShardRoutingTest for hash distribution + multipleShardMigrateTest(); + printLayout(); + } + + private boolean waitForRuleToExpire(String splitKey, long finishTime) throws KeeperException, InterruptedException, SolrServerException, IOException { + ClusterState state;Slice slice; + boolean ruleRemoved = false; + while (System.currentTimeMillis() - finishTime < 60000) { + getCommonCloudSolrServer().getZkStateReader().updateClusterState(true); + state = getCommonCloudSolrServer().getZkStateReader().getClusterState(); + slice = state.getSlice(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD2); + Map routingRules = slice.getRoutingRules(); + if (routingRules == null || routingRules.isEmpty() || !routingRules.containsKey(splitKey)) { + ruleRemoved = true; + break; + } SolrInputDocument doc = new SolrInputDocument(); - doc.addField("id", shardKey + "!" + id); - doc.addField("n_ti", id); + doc.addField("id", splitKey + System.currentTimeMillis()); cloudClient.add(doc); - if (splitKey.equals(shardKey + "!")) - splitKeyCount[0]++; + Thread.sleep(1000); } - assertTrue(splitKeyCount[0] > 0); + return ruleRemoved; + } - String targetCollection = "migrate_routekey_test_targetCollection"; + private void invokeMigrateApi(String sourceCollection, String splitKey, String targetCollection) throws SolrServerException, IOException { + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set("action", CollectionParams.CollectionAction.MIGRATE.toString()); + params.set("collection", sourceCollection); + params.set("target.collection", targetCollection); + params.set("split.key", splitKey); + params.set("forward.timeout", 45); + invoke(params); + } + + private void invoke(ModifiableSolrParams params) throws SolrServerException, IOException { + SolrRequest request = new QueryRequest(params); + request.setPath("/admin/collections"); + + String baseUrl = ((HttpSolrServer) shardToJetty.get(SHARD1).get(0).client.solrClient) + .getBaseURL(); + baseUrl = baseUrl.substring(0, baseUrl.length() - "collection1".length()); + + HttpSolrServer baseServer = new HttpSolrServer(baseUrl); + baseServer.setConnectionTimeout(15000); + baseServer.setSoTimeout(60000 * 5); + baseServer.request(request); + baseServer.shutdown(); + } + + private void createCollection(String targetCollection) throws Exception { HashMap> collectionInfos = new HashMap>(); CloudSolrServer client = null; try { @@ -122,38 +159,34 @@ public class MigrateRouteKeyTest extends BasicDistributedZkTest { checkForCollection(targetCollection, list, null); waitForRecoveriesToFinish(targetCollection, false); + } - class Indexer extends Thread { - final int seconds; - - public Indexer(int seconds) { - this.seconds = seconds; - } - - @Override - public void run() { - long start = System.currentTimeMillis(); - for (int id = 26*3; id < 500 && System.currentTimeMillis() - start <= seconds*1000; id++) { - String shardKey = "" + (char) ('a' + (id % 26)); // See comment in ShardRoutingTest for hash distribution - SolrInputDocument doc = new SolrInputDocument(); - doc.addField("id", shardKey + "!" + id); - doc.addField("n_ti", id); - try { - cloudClient.add(doc); - if (splitKey.equals(shardKey + "!")) - splitKeyCount[0]++; - } catch (Exception e) { - log.error("Exception while adding document id: " + doc.getField("id"), e); - } - try { - Thread.sleep(50); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } + private void multipleShardMigrateTest() throws Exception { + del("*:*"); + commit(); + assertTrue(cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound() == 0); + final String splitKey = "a"; + final int BIT_SEP = 1; + final int[] splitKeyCount = new int[1]; + for (int id = 0; id < 26*3; id++) { + String shardKey = "" + (char) ('a' + (id % 26)); // See comment in ShardRoutingTest for hash distribution + String key = shardKey; + if (splitKey.equals(shardKey)) { + key += "/" + BIT_SEP; // spread it over half the collection } + SolrInputDocument doc = new SolrInputDocument(); + doc.addField("id", key + "!" + id); + doc.addField("n_ti", id); + cloudClient.add(doc); + if (splitKey.equals(shardKey)) + splitKeyCount[0]++; } - Thread indexer = new Indexer(30); + assertTrue(splitKeyCount[0] > 0); + + String targetCollection = "migrate_multipleshardtest_targetCollection"; + createCollection(targetCollection); + + Indexer indexer = new Indexer(cloudClient, splitKey, 1, 30); indexer.start(); String url = CustomCollectionTest.getUrlFromZk(getCommonCloudSolrServer().getZkStateReader().getClusterState(), targetCollection); @@ -162,68 +195,76 @@ public class MigrateRouteKeyTest extends BasicDistributedZkTest { SolrQuery solrQuery = new SolrQuery("*:*"); assertEquals("DocCount on target collection does not match", 0, collectionClient.query(solrQuery).getResults().getNumFound()); - ModifiableSolrParams params = new ModifiableSolrParams(); - params.set("action", CollectionParams.CollectionAction.MIGRATE.toString()); - params.set("collection", AbstractDistribZkTestBase.DEFAULT_COLLECTION); - params.set("target.collection", targetCollection); - params.set("split.key", splitKey); - params.set("forward.timeout", 45); - - SolrRequest request = new QueryRequest(params); - request.setPath("/admin/collections"); - - String baseUrl = ((HttpSolrServer) shardToJetty.get(SHARD1).get(0).client.solrClient) - .getBaseURL(); - baseUrl = baseUrl.substring(0, baseUrl.length() - "collection1".length()); - - HttpSolrServer baseServer = new HttpSolrServer(baseUrl); - baseServer.setConnectionTimeout(15000); - baseServer.setSoTimeout(60000 * 5); - baseServer.request(request); - baseServer.shutdown(); + invokeMigrateApi(AbstractDistribZkTestBase.DEFAULT_COLLECTION, splitKey + "/" + BIT_SEP + "!", targetCollection); long finishTime = System.currentTimeMillis(); indexer.join(); + splitKeyCount[0] += indexer.getSplitKeyCount(); try { - cloudClient.deleteById("a!104"); + cloudClient.deleteById("a/" + BIT_SEP + "!104"); splitKeyCount[0]--; } catch (Exception e) { - log.warn("Error deleting document a!104", e); + log.warn("Error deleting document a/" + BIT_SEP + "!104", e); } cloudClient.commit(); collectionClient.commit(); + solrQuery = new SolrQuery("*:*").setRows(1000); + QueryResponse response = collectionClient.query(solrQuery); + log.info("Response from target collection: " + response); + assertEquals("DocCount on target collection does not match", splitKeyCount[0], response.getResults().getNumFound()); + getCommonCloudSolrServer().getZkStateReader().updateClusterState(true); ClusterState state = getCommonCloudSolrServer().getZkStateReader().getClusterState(); Slice slice = state.getSlice(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD2); assertNotNull("Routing rule map is null", slice.getRoutingRules()); assertFalse("Routing rule map is empty", slice.getRoutingRules().isEmpty()); - assertNotNull("No routing rule exists for route key: " + splitKey, slice.getRoutingRules().get(splitKey)); + assertNotNull("No routing rule exists for route key: " + splitKey, slice.getRoutingRules().get(splitKey + "!")); - boolean ruleRemoved = false; - while (System.currentTimeMillis() - finishTime < 60000) { - getCommonCloudSolrServer().getZkStateReader().updateClusterState(true); - state = getCommonCloudSolrServer().getZkStateReader().getClusterState(); - slice = state.getSlice(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD2); - Map routingRules = slice.getRoutingRules(); - if (routingRules == null || routingRules.isEmpty() || !routingRules.containsKey(splitKey)) { - ruleRemoved = true; - break; - } - SolrInputDocument doc = new SolrInputDocument(); - doc.addField("id", splitKey + System.currentTimeMillis()); - cloudClient.add(doc); - Thread.sleep(1000); + boolean ruleRemoved = waitForRuleToExpire(splitKey, finishTime); + assertTrue("Routing rule was not expired", ruleRemoved); + } + + static class Indexer extends Thread { + final int seconds; + final CloudSolrServer cloudClient; + final String splitKey; + int splitKeyCount = 0; + final int bitSep; + + public Indexer(CloudSolrServer cloudClient, String splitKey, int bitSep, int seconds) { + this.seconds = seconds; + this.cloudClient = cloudClient; + this.splitKey = splitKey; + this.bitSep = bitSep; } - assertTrue("Routing rule was not expired", ruleRemoved); + @Override + public void run() { + long start = System.currentTimeMillis(); + for (int id = 26*3; id < 500 && System.currentTimeMillis() - start <= seconds*1000; id++) { + String shardKey = "" + (char) ('a' + (id % 26)); // See comment in ShardRoutingTest for hash distribution + SolrInputDocument doc = new SolrInputDocument(); + doc.addField("id", shardKey + (bitSep != -1 ? "/" + bitSep : "") + "!" + id); + doc.addField("n_ti", id); + try { + cloudClient.add(doc); + if (splitKey.equals(shardKey)) + splitKeyCount++; + } catch (Exception e) { + log.error("Exception while adding document id: " + doc.getField("id"), e); + } + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } - solrQuery = new SolrQuery("*:*").setRows(1000); - QueryResponse response = collectionClient.query(solrQuery); - log.info("Response from target collection: " + response); - assertEquals("DocCount on shard1_0 does not match", splitKeyCount[0], response.getResults().getNumFound()); - - printLayout(); + public int getSplitKeyCount() { + return splitKeyCount; + } } }