mirror of https://github.com/apache/lucene.git
SOLR-5308: Handle route keys with bit separators. Route docs to target shard leader directly.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1549821 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a1461ad9b1
commit
4b8d5f0d51
|
@ -53,6 +53,7 @@ import org.apache.solr.common.util.StrUtils;
|
||||||
import org.apache.solr.handler.component.ShardHandler;
|
import org.apache.solr.handler.component.ShardHandler;
|
||||||
import org.apache.solr.handler.component.ShardRequest;
|
import org.apache.solr.handler.component.ShardRequest;
|
||||||
import org.apache.solr.handler.component.ShardResponse;
|
import org.apache.solr.handler.component.ShardResponse;
|
||||||
|
import org.apache.solr.update.SolrIndexSplitter;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -1145,7 +1146,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
||||||
Overseer.QUEUE_OPERATION, Overseer.ADD_ROUTING_RULE,
|
Overseer.QUEUE_OPERATION, Overseer.ADD_ROUTING_RULE,
|
||||||
COLLECTION_PROP, sourceCollection.getName(),
|
COLLECTION_PROP, sourceCollection.getName(),
|
||||||
SHARD_ID_PROP, sourceSlice.getName(),
|
SHARD_ID_PROP, sourceSlice.getName(),
|
||||||
"routeKey", splitKey,
|
"routeKey", SolrIndexSplitter.getRouteKey(splitKey) + "!",
|
||||||
"range", splitRange.toString(),
|
"range", splitRange.toString(),
|
||||||
"targetCollection", targetCollection.getName(),
|
"targetCollection", targetCollection.getName(),
|
||||||
"expireAt", String.valueOf(System.currentTimeMillis() + timeout));
|
"expireAt", String.valueOf(System.currentTimeMillis() + timeout));
|
||||||
|
@ -1161,7 +1162,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
Map<String, RoutingRule> rules = zkStateReader.getClusterState().getSlice(sourceCollection.getName(), sourceSlice.getName()).getRoutingRules();
|
Map<String, RoutingRule> rules = zkStateReader.getClusterState().getSlice(sourceCollection.getName(), sourceSlice.getName()).getRoutingRules();
|
||||||
if (rules != null) {
|
if (rules != null) {
|
||||||
RoutingRule rule = rules.get(splitKey);
|
RoutingRule rule = rules.get(SolrIndexSplitter.getRouteKey(splitKey) + "!");
|
||||||
if (rule != null && rule.getRouteRanges().contains(splitRange)) {
|
if (rule != null && rule.getRouteRanges().contains(splitRange)) {
|
||||||
added = true;
|
added = true;
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -396,13 +396,12 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
||||||
for (DocRouter.Range range : ranges) {
|
for (DocRouter.Range range : ranges) {
|
||||||
if (range.includes(hash)) {
|
if (range.includes(hash)) {
|
||||||
if (nodes == null) nodes = new ArrayList<Node>();
|
if (nodes == null) nodes = new ArrayList<Node>();
|
||||||
Collection<Slice> activeSlices = cstate.getActiveSlices(rule.getTargetCollectionName());
|
DocCollection targetColl = cstate.getCollection(rule.getTargetCollectionName());
|
||||||
|
Collection<Slice> activeSlices = targetColl.getRouter().getSearchSlicesSingle(id, null, targetColl);
|
||||||
if (activeSlices == null || activeSlices.isEmpty()) {
|
if (activeSlices == null || activeSlices.isEmpty()) {
|
||||||
throw new SolrException(ErrorCode.SERVER_ERROR,
|
throw new SolrException(ErrorCode.SERVER_ERROR,
|
||||||
"No active slices found for target collection: " + rule.getTargetCollectionName());
|
"No active slices serving " + id + " found for target collection: " + rule.getTargetCollectionName());
|
||||||
}
|
}
|
||||||
// it doesn't matter where we forward it so just choose the first one
|
|
||||||
// todo this can be optimized
|
|
||||||
Replica targetLeader = cstate.getLeader(rule.getTargetCollectionName(), activeSlices.iterator().next().getName());
|
Replica targetLeader = cstate.getLeader(rule.getTargetCollectionName(), activeSlices.iterator().next().getName());
|
||||||
nodes.add(new StdNode(new ZkCoreNodeProps(targetLeader)));
|
nodes.add(new StdNode(new ZkCoreNodeProps(targetLeader)));
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.solr.common.cloud.ZkNodeProps;
|
||||||
import org.apache.solr.common.params.CollectionParams;
|
import org.apache.solr.common.params.CollectionParams;
|
||||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||||
import org.apache.solr.update.DirectUpdateHandler2;
|
import org.apache.solr.update.DirectUpdateHandler2;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
|
@ -89,21 +90,57 @@ public class MigrateRouteKeyTest extends BasicDistributedZkTest {
|
||||||
public void doTest() throws Exception {
|
public void doTest() throws Exception {
|
||||||
waitForThingsToLevelOut(15);
|
waitForThingsToLevelOut(15);
|
||||||
|
|
||||||
final String splitKey = "a!";
|
multipleShardMigrateTest();
|
||||||
final int[] splitKeyCount = new int[1];
|
printLayout();
|
||||||
for (int id = 0; id < 26*3; id++) {
|
}
|
||||||
String shardKey = "" + (char) ('a' + (id % 26)); // See comment in ShardRoutingTest for hash distribution
|
|
||||||
|
private boolean waitForRuleToExpire(String splitKey, long finishTime) throws KeeperException, InterruptedException, SolrServerException, IOException {
|
||||||
|
ClusterState state;Slice slice;
|
||||||
|
boolean ruleRemoved = false;
|
||||||
|
while (System.currentTimeMillis() - finishTime < 60000) {
|
||||||
|
getCommonCloudSolrServer().getZkStateReader().updateClusterState(true);
|
||||||
|
state = getCommonCloudSolrServer().getZkStateReader().getClusterState();
|
||||||
|
slice = state.getSlice(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD2);
|
||||||
|
Map<String,RoutingRule> routingRules = slice.getRoutingRules();
|
||||||
|
if (routingRules == null || routingRules.isEmpty() || !routingRules.containsKey(splitKey)) {
|
||||||
|
ruleRemoved = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
SolrInputDocument doc = new SolrInputDocument();
|
SolrInputDocument doc = new SolrInputDocument();
|
||||||
doc.addField("id", shardKey + "!" + id);
|
doc.addField("id", splitKey + System.currentTimeMillis());
|
||||||
doc.addField("n_ti", id);
|
|
||||||
cloudClient.add(doc);
|
cloudClient.add(doc);
|
||||||
if (splitKey.equals(shardKey + "!"))
|
Thread.sleep(1000);
|
||||||
splitKeyCount[0]++;
|
|
||||||
}
|
}
|
||||||
assertTrue(splitKeyCount[0] > 0);
|
return ruleRemoved;
|
||||||
|
}
|
||||||
|
|
||||||
String targetCollection = "migrate_routekey_test_targetCollection";
|
private void invokeMigrateApi(String sourceCollection, String splitKey, String targetCollection) throws SolrServerException, IOException {
|
||||||
|
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||||
|
params.set("action", CollectionParams.CollectionAction.MIGRATE.toString());
|
||||||
|
params.set("collection", sourceCollection);
|
||||||
|
params.set("target.collection", targetCollection);
|
||||||
|
params.set("split.key", splitKey);
|
||||||
|
params.set("forward.timeout", 45);
|
||||||
|
|
||||||
|
invoke(params);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void invoke(ModifiableSolrParams params) throws SolrServerException, IOException {
|
||||||
|
SolrRequest request = new QueryRequest(params);
|
||||||
|
request.setPath("/admin/collections");
|
||||||
|
|
||||||
|
String baseUrl = ((HttpSolrServer) shardToJetty.get(SHARD1).get(0).client.solrClient)
|
||||||
|
.getBaseURL();
|
||||||
|
baseUrl = baseUrl.substring(0, baseUrl.length() - "collection1".length());
|
||||||
|
|
||||||
|
HttpSolrServer baseServer = new HttpSolrServer(baseUrl);
|
||||||
|
baseServer.setConnectionTimeout(15000);
|
||||||
|
baseServer.setSoTimeout(60000 * 5);
|
||||||
|
baseServer.request(request);
|
||||||
|
baseServer.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createCollection(String targetCollection) throws Exception {
|
||||||
HashMap<String, List<Integer>> collectionInfos = new HashMap<String, List<Integer>>();
|
HashMap<String, List<Integer>> collectionInfos = new HashMap<String, List<Integer>>();
|
||||||
CloudSolrServer client = null;
|
CloudSolrServer client = null;
|
||||||
try {
|
try {
|
||||||
|
@ -122,38 +159,34 @@ public class MigrateRouteKeyTest extends BasicDistributedZkTest {
|
||||||
checkForCollection(targetCollection, list, null);
|
checkForCollection(targetCollection, list, null);
|
||||||
|
|
||||||
waitForRecoveriesToFinish(targetCollection, false);
|
waitForRecoveriesToFinish(targetCollection, false);
|
||||||
|
}
|
||||||
|
|
||||||
class Indexer extends Thread {
|
private void multipleShardMigrateTest() throws Exception {
|
||||||
final int seconds;
|
del("*:*");
|
||||||
|
commit();
|
||||||
public Indexer(int seconds) {
|
assertTrue(cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound() == 0);
|
||||||
this.seconds = seconds;
|
final String splitKey = "a";
|
||||||
}
|
final int BIT_SEP = 1;
|
||||||
|
final int[] splitKeyCount = new int[1];
|
||||||
@Override
|
for (int id = 0; id < 26*3; id++) {
|
||||||
public void run() {
|
String shardKey = "" + (char) ('a' + (id % 26)); // See comment in ShardRoutingTest for hash distribution
|
||||||
long start = System.currentTimeMillis();
|
String key = shardKey;
|
||||||
for (int id = 26*3; id < 500 && System.currentTimeMillis() - start <= seconds*1000; id++) {
|
if (splitKey.equals(shardKey)) {
|
||||||
String shardKey = "" + (char) ('a' + (id % 26)); // See comment in ShardRoutingTest for hash distribution
|
key += "/" + BIT_SEP; // spread it over half the collection
|
||||||
SolrInputDocument doc = new SolrInputDocument();
|
|
||||||
doc.addField("id", shardKey + "!" + id);
|
|
||||||
doc.addField("n_ti", id);
|
|
||||||
try {
|
|
||||||
cloudClient.add(doc);
|
|
||||||
if (splitKey.equals(shardKey + "!"))
|
|
||||||
splitKeyCount[0]++;
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("Exception while adding document id: " + doc.getField("id"), e);
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
Thread.sleep(50);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
SolrInputDocument doc = new SolrInputDocument();
|
||||||
|
doc.addField("id", key + "!" + id);
|
||||||
|
doc.addField("n_ti", id);
|
||||||
|
cloudClient.add(doc);
|
||||||
|
if (splitKey.equals(shardKey))
|
||||||
|
splitKeyCount[0]++;
|
||||||
}
|
}
|
||||||
Thread indexer = new Indexer(30);
|
assertTrue(splitKeyCount[0] > 0);
|
||||||
|
|
||||||
|
String targetCollection = "migrate_multipleshardtest_targetCollection";
|
||||||
|
createCollection(targetCollection);
|
||||||
|
|
||||||
|
Indexer indexer = new Indexer(cloudClient, splitKey, 1, 30);
|
||||||
indexer.start();
|
indexer.start();
|
||||||
|
|
||||||
String url = CustomCollectionTest.getUrlFromZk(getCommonCloudSolrServer().getZkStateReader().getClusterState(), targetCollection);
|
String url = CustomCollectionTest.getUrlFromZk(getCommonCloudSolrServer().getZkStateReader().getClusterState(), targetCollection);
|
||||||
|
@ -162,68 +195,76 @@ public class MigrateRouteKeyTest extends BasicDistributedZkTest {
|
||||||
SolrQuery solrQuery = new SolrQuery("*:*");
|
SolrQuery solrQuery = new SolrQuery("*:*");
|
||||||
assertEquals("DocCount on target collection does not match", 0, collectionClient.query(solrQuery).getResults().getNumFound());
|
assertEquals("DocCount on target collection does not match", 0, collectionClient.query(solrQuery).getResults().getNumFound());
|
||||||
|
|
||||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
invokeMigrateApi(AbstractDistribZkTestBase.DEFAULT_COLLECTION, splitKey + "/" + BIT_SEP + "!", targetCollection);
|
||||||
params.set("action", CollectionParams.CollectionAction.MIGRATE.toString());
|
|
||||||
params.set("collection", AbstractDistribZkTestBase.DEFAULT_COLLECTION);
|
|
||||||
params.set("target.collection", targetCollection);
|
|
||||||
params.set("split.key", splitKey);
|
|
||||||
params.set("forward.timeout", 45);
|
|
||||||
|
|
||||||
SolrRequest request = new QueryRequest(params);
|
|
||||||
request.setPath("/admin/collections");
|
|
||||||
|
|
||||||
String baseUrl = ((HttpSolrServer) shardToJetty.get(SHARD1).get(0).client.solrClient)
|
|
||||||
.getBaseURL();
|
|
||||||
baseUrl = baseUrl.substring(0, baseUrl.length() - "collection1".length());
|
|
||||||
|
|
||||||
HttpSolrServer baseServer = new HttpSolrServer(baseUrl);
|
|
||||||
baseServer.setConnectionTimeout(15000);
|
|
||||||
baseServer.setSoTimeout(60000 * 5);
|
|
||||||
baseServer.request(request);
|
|
||||||
baseServer.shutdown();
|
|
||||||
long finishTime = System.currentTimeMillis();
|
long finishTime = System.currentTimeMillis();
|
||||||
|
|
||||||
indexer.join();
|
indexer.join();
|
||||||
|
splitKeyCount[0] += indexer.getSplitKeyCount();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
cloudClient.deleteById("a!104");
|
cloudClient.deleteById("a/" + BIT_SEP + "!104");
|
||||||
splitKeyCount[0]--;
|
splitKeyCount[0]--;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.warn("Error deleting document a!104", e);
|
log.warn("Error deleting document a/" + BIT_SEP + "!104", e);
|
||||||
}
|
}
|
||||||
cloudClient.commit();
|
cloudClient.commit();
|
||||||
collectionClient.commit();
|
collectionClient.commit();
|
||||||
|
|
||||||
|
solrQuery = new SolrQuery("*:*").setRows(1000);
|
||||||
|
QueryResponse response = collectionClient.query(solrQuery);
|
||||||
|
log.info("Response from target collection: " + response);
|
||||||
|
assertEquals("DocCount on target collection does not match", splitKeyCount[0], response.getResults().getNumFound());
|
||||||
|
|
||||||
getCommonCloudSolrServer().getZkStateReader().updateClusterState(true);
|
getCommonCloudSolrServer().getZkStateReader().updateClusterState(true);
|
||||||
ClusterState state = getCommonCloudSolrServer().getZkStateReader().getClusterState();
|
ClusterState state = getCommonCloudSolrServer().getZkStateReader().getClusterState();
|
||||||
Slice slice = state.getSlice(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD2);
|
Slice slice = state.getSlice(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD2);
|
||||||
assertNotNull("Routing rule map is null", slice.getRoutingRules());
|
assertNotNull("Routing rule map is null", slice.getRoutingRules());
|
||||||
assertFalse("Routing rule map is empty", slice.getRoutingRules().isEmpty());
|
assertFalse("Routing rule map is empty", slice.getRoutingRules().isEmpty());
|
||||||
assertNotNull("No routing rule exists for route key: " + splitKey, slice.getRoutingRules().get(splitKey));
|
assertNotNull("No routing rule exists for route key: " + splitKey, slice.getRoutingRules().get(splitKey + "!"));
|
||||||
|
|
||||||
boolean ruleRemoved = false;
|
boolean ruleRemoved = waitForRuleToExpire(splitKey, finishTime);
|
||||||
while (System.currentTimeMillis() - finishTime < 60000) {
|
assertTrue("Routing rule was not expired", ruleRemoved);
|
||||||
getCommonCloudSolrServer().getZkStateReader().updateClusterState(true);
|
}
|
||||||
state = getCommonCloudSolrServer().getZkStateReader().getClusterState();
|
|
||||||
slice = state.getSlice(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD2);
|
static class Indexer extends Thread {
|
||||||
Map<String,RoutingRule> routingRules = slice.getRoutingRules();
|
final int seconds;
|
||||||
if (routingRules == null || routingRules.isEmpty() || !routingRules.containsKey(splitKey)) {
|
final CloudSolrServer cloudClient;
|
||||||
ruleRemoved = true;
|
final String splitKey;
|
||||||
break;
|
int splitKeyCount = 0;
|
||||||
}
|
final int bitSep;
|
||||||
SolrInputDocument doc = new SolrInputDocument();
|
|
||||||
doc.addField("id", splitKey + System.currentTimeMillis());
|
public Indexer(CloudSolrServer cloudClient, String splitKey, int bitSep, int seconds) {
|
||||||
cloudClient.add(doc);
|
this.seconds = seconds;
|
||||||
Thread.sleep(1000);
|
this.cloudClient = cloudClient;
|
||||||
|
this.splitKey = splitKey;
|
||||||
|
this.bitSep = bitSep;
|
||||||
}
|
}
|
||||||
|
|
||||||
assertTrue("Routing rule was not expired", ruleRemoved);
|
@Override
|
||||||
|
public void run() {
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
for (int id = 26*3; id < 500 && System.currentTimeMillis() - start <= seconds*1000; id++) {
|
||||||
|
String shardKey = "" + (char) ('a' + (id % 26)); // See comment in ShardRoutingTest for hash distribution
|
||||||
|
SolrInputDocument doc = new SolrInputDocument();
|
||||||
|
doc.addField("id", shardKey + (bitSep != -1 ? "/" + bitSep : "") + "!" + id);
|
||||||
|
doc.addField("n_ti", id);
|
||||||
|
try {
|
||||||
|
cloudClient.add(doc);
|
||||||
|
if (splitKey.equals(shardKey))
|
||||||
|
splitKeyCount++;
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Exception while adding document id: " + doc.getField("id"), e);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Thread.sleep(50);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
solrQuery = new SolrQuery("*:*").setRows(1000);
|
public int getSplitKeyCount() {
|
||||||
QueryResponse response = collectionClient.query(solrQuery);
|
return splitKeyCount;
|
||||||
log.info("Response from target collection: " + response);
|
}
|
||||||
assertEquals("DocCount on shard1_0 does not match", splitKeyCount[0], response.getResults().getNumFound());
|
|
||||||
|
|
||||||
printLayout();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue