mirror of https://github.com/apache/lucene.git
SOLR-5246: Shard splitting now supports collections configured with router.field
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1526151 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
627ee9eba6
commit
8dae16c6d9
|
@ -71,6 +71,9 @@ New Features
|
|||
* SOLR-5167: Add support for AnalyzingInfixSuggester (AnalyzingInfixLookupFactory).
|
||||
(Areek Zillur, Varun Thacker via Robert Muir)
|
||||
|
||||
* SOLR-5246: Shard splitting now supports collections configured with router.field.
|
||||
(shalin)
|
||||
|
||||
Security
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -542,7 +542,6 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
|||
params.set(CoreAdminParams.SHARD, subSlice);
|
||||
params.set(CoreAdminParams.SHARD_RANGE, subRange.toString());
|
||||
params.set(CoreAdminParams.SHARD_STATE, Slice.CONSTRUCTION);
|
||||
//params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices); todo: is it necessary, we're not creating collections?
|
||||
|
||||
sendShardRequest(nodeName, params);
|
||||
}
|
||||
|
|
|
@ -77,6 +77,8 @@ import java.util.Map;
|
|||
import java.util.Properties;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
|
||||
|
||||
/**
|
||||
*
|
||||
* @since solr 1.3
|
||||
|
@ -248,6 +250,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
|||
int partitions = pathsArr != null ? pathsArr.length : newCoreNames.length;
|
||||
|
||||
DocRouter router = null;
|
||||
String routeFieldName = null;
|
||||
if (coreContainer.isZooKeeperAware()) {
|
||||
ClusterState clusterState = coreContainer.getZkController().getClusterState();
|
||||
String collectionName = req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName();
|
||||
|
@ -257,6 +260,10 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
|||
DocRouter.Range currentRange = slice.getRange();
|
||||
router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
|
||||
ranges = currentRange != null ? router.partitionRange(partitions, currentRange) : null;
|
||||
Map m = (Map) collection.get(DOC_ROUTER);
|
||||
if (m != null) {
|
||||
routeFieldName = (String) m.get("field");
|
||||
}
|
||||
}
|
||||
|
||||
if (pathsArr == null) {
|
||||
|
@ -274,7 +281,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
|||
}
|
||||
|
||||
|
||||
SplitIndexCommand cmd = new SplitIndexCommand(req, paths, newCores, ranges, router);
|
||||
SplitIndexCommand cmd = new SplitIndexCommand(req, paths, newCores, ranges, router, routeFieldName);
|
||||
core.getUpdateHandler().split(cmd);
|
||||
|
||||
// After the split has completed, someone (here?) should start the process of replaying the buffered updates.
|
||||
|
|
|
@ -33,10 +33,8 @@ import org.apache.lucene.util.IOUtils;
|
|||
import org.apache.lucene.util.OpenBitSet;
|
||||
import org.apache.solr.common.cloud.DocRouter;
|
||||
import org.apache.solr.common.cloud.HashBasedRouter;
|
||||
import org.apache.solr.common.util.Hash;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.schema.SchemaField;
|
||||
import org.apache.solr.schema.StrField;
|
||||
import org.apache.solr.search.SolrIndexSearcher;
|
||||
import org.apache.solr.util.RefCounted;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -59,10 +57,10 @@ public class SolrIndexSplitter {
|
|||
HashBasedRouter hashRouter;
|
||||
int numPieces;
|
||||
int currPartition = 0;
|
||||
String routeFieldName;
|
||||
|
||||
public SolrIndexSplitter(SplitIndexCommand cmd) {
|
||||
searcher = cmd.getReq().getSearcher();
|
||||
field = searcher.getSchema().getUniqueKeyField();
|
||||
ranges = cmd.ranges;
|
||||
paths = cmd.paths;
|
||||
cores = cmd.cores;
|
||||
|
@ -75,6 +73,12 @@ public class SolrIndexSplitter {
|
|||
numPieces = ranges.size();
|
||||
rangesArr = ranges.toArray(new DocRouter.Range[ranges.size()]);
|
||||
}
|
||||
routeFieldName = cmd.routeFieldName;
|
||||
if (routeFieldName == null) {
|
||||
field = searcher.getSchema().getUniqueKeyField();
|
||||
} else {
|
||||
field = searcher.getSchema().getField(routeFieldName);
|
||||
}
|
||||
}
|
||||
|
||||
public void split() throws IOException {
|
||||
|
|
|
@ -24,7 +24,7 @@ import org.apache.solr.request.SolrQueryRequest;
|
|||
import java.util.List;
|
||||
|
||||
/**
|
||||
* A merge indexes command encapsulated in an object.
|
||||
* A split index command encapsulated in an object.
|
||||
*
|
||||
* @since solr 1.4
|
||||
*
|
||||
|
@ -35,13 +35,15 @@ public class SplitIndexCommand extends UpdateCommand {
|
|||
public List<SolrCore> cores; // either paths or cores should be specified
|
||||
public List<DocRouter.Range> ranges;
|
||||
public DocRouter router;
|
||||
public String routeFieldName;
|
||||
|
||||
public SplitIndexCommand(SolrQueryRequest req, List<String> paths, List<SolrCore> cores, List<DocRouter.Range> ranges, DocRouter router) {
|
||||
public SplitIndexCommand(SolrQueryRequest req, List<String> paths, List<SolrCore> cores, List<DocRouter.Range> ranges, DocRouter router, String routeFieldName) {
|
||||
super(req);
|
||||
this.paths = paths;
|
||||
this.cores = cores;
|
||||
this.ranges = ranges;
|
||||
this.router = router;
|
||||
this.routeFieldName = routeFieldName;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -135,7 +135,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
|
|||
killerThread.start();
|
||||
killCounter.incrementAndGet();
|
||||
|
||||
splitShard(SHARD1);
|
||||
splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1);
|
||||
|
||||
log.info("Layout after split: \n");
|
||||
printLayout();
|
||||
|
|
|
@ -201,7 +201,7 @@ public class CustomCollectionTest extends AbstractFullDistribZkTestBase {
|
|||
List<Integer> list = entry.getValue();
|
||||
checkForCollection(collection, list, null);
|
||||
|
||||
String url = getUrlFromZk(collection);
|
||||
String url = getUrlFromZk(getCommonCloudSolrServer().getZkStateReader().getClusterState(), collection);
|
||||
|
||||
HttpSolrServer collectionClient = new HttpSolrServer(url);
|
||||
|
||||
|
@ -226,7 +226,7 @@ public class CustomCollectionTest extends AbstractFullDistribZkTestBase {
|
|||
|
||||
String collectionName = collectionNameList.get(random().nextInt(collectionNameList.size()));
|
||||
|
||||
String url = getUrlFromZk(collectionName);
|
||||
String url = getUrlFromZk(getCommonCloudSolrServer().getZkStateReader().getClusterState(), collectionName);
|
||||
|
||||
HttpSolrServer collectionClient = new HttpSolrServer(url);
|
||||
|
||||
|
@ -325,7 +325,7 @@ public class CustomCollectionTest extends AbstractFullDistribZkTestBase {
|
|||
checkForCollection(collectionName, list, null);
|
||||
|
||||
|
||||
url = getUrlFromZk(collectionName);
|
||||
url = getUrlFromZk(getCommonCloudSolrServer().getZkStateReader().getClusterState(), collectionName);
|
||||
|
||||
collectionClient = new HttpSolrServer(url);
|
||||
|
||||
|
@ -386,7 +386,7 @@ public class CustomCollectionTest extends AbstractFullDistribZkTestBase {
|
|||
checkForCollection(collectionName, list, null);
|
||||
|
||||
|
||||
String url = getUrlFromZk(collectionName);
|
||||
String url = getUrlFromZk(getCommonCloudSolrServer().getZkStateReader().getClusterState(), collectionName);
|
||||
|
||||
HttpSolrServer collectionClient = new HttpSolrServer(url);
|
||||
|
||||
|
@ -420,8 +420,7 @@ public class CustomCollectionTest extends AbstractFullDistribZkTestBase {
|
|||
|
||||
|
||||
|
||||
private String getUrlFromZk(String collection) {
|
||||
ClusterState clusterState = getCommonCloudSolrServer().getZkStateReader().getClusterState();
|
||||
public static String getUrlFromZk(ClusterState clusterState, String collection) {
|
||||
Map<String,Slice> slices = clusterState.getCollectionStates().get(collection).getSlicesMap();
|
||||
|
||||
if (slices == null) {
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.solr.cloud;
|
|||
*/
|
||||
|
||||
import org.apache.http.params.CoreConnectionPNames;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.solr.client.solrj.SolrQuery;
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.SolrServer;
|
||||
|
@ -33,6 +34,7 @@ import org.apache.solr.common.cloud.HashBasedRouter;
|
|||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkCoreNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CollectionParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
|
@ -50,6 +52,12 @@ import java.util.Map;
|
|||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
|
||||
|
||||
@Slow
|
||||
public class ShardSplitTest extends BasicDistributedZkTest {
|
||||
|
||||
public static final String SHARD1_0 = SHARD1 + "_0";
|
||||
|
@ -100,6 +108,17 @@ public class ShardSplitTest extends BasicDistributedZkTest {
|
|||
public void doTest() throws Exception {
|
||||
waitForThingsToLevelOut(15);
|
||||
|
||||
splitByUniqueKeyTest();
|
||||
splitByRouteFieldTest();
|
||||
|
||||
// todo can't call waitForThingsToLevelOut because it looks for jettys of all shards
|
||||
// and the new sub-shards don't have any.
|
||||
waitForRecoveriesToFinish(true);
|
||||
//waitForThingsToLevelOut(15);
|
||||
|
||||
}
|
||||
|
||||
private void splitByUniqueKeyTest() throws Exception {
|
||||
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
|
||||
final DocRouter router = clusterState.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION).getRouter();
|
||||
Slice shard1 = clusterState.getSlice(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1);
|
||||
|
@ -148,7 +167,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
|
|||
try {
|
||||
for (int i = 0; i < 3; i++) {
|
||||
try {
|
||||
splitShard(SHARD1);
|
||||
splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1);
|
||||
log.info("Layout after split: \n");
|
||||
printLayout();
|
||||
break;
|
||||
|
@ -171,11 +190,83 @@ public class ShardSplitTest extends BasicDistributedZkTest {
|
|||
}
|
||||
|
||||
checkDocCountsAndShardStates(docCounts, numReplicas);
|
||||
}
|
||||
|
||||
// todo can't call waitForThingsToLevelOut because it looks for jettys of all shards
|
||||
// and the new sub-shards don't have any.
|
||||
waitForRecoveriesToFinish(true);
|
||||
//waitForThingsToLevelOut(15);
|
||||
|
||||
public void splitByRouteFieldTest() throws Exception {
|
||||
log.info("Starting testSplitWithRouteField");
|
||||
String collectionName = "routeFieldColl";
|
||||
int numShards = 4;
|
||||
int replicationFactor = 2;
|
||||
int maxShardsPerNode = (((numShards * replicationFactor) / getCommonCloudSolrServer()
|
||||
.getZkStateReader().getClusterState().getLiveNodes().size())) + 1;
|
||||
|
||||
HashMap<String, List<Integer>> collectionInfos = new HashMap<String, List<Integer>>();
|
||||
CloudSolrServer client = null;
|
||||
String shard_fld = "shard_s";
|
||||
try {
|
||||
client = createCloudClient(null);
|
||||
Map<String, Object> props = ZkNodeProps.makeMap(
|
||||
REPLICATION_FACTOR, replicationFactor,
|
||||
MAX_SHARDS_PER_NODE, maxShardsPerNode,
|
||||
NUM_SLICES, numShards,
|
||||
"router.field", shard_fld);
|
||||
|
||||
createCollection(collectionInfos, collectionName,props,client);
|
||||
} finally {
|
||||
if (client != null) client.shutdown();
|
||||
}
|
||||
|
||||
List<Integer> list = collectionInfos.get(collectionName);
|
||||
checkForCollection(collectionName, list, null);
|
||||
|
||||
waitForRecoveriesToFinish(false);
|
||||
|
||||
String url = CustomCollectionTest.getUrlFromZk(getCommonCloudSolrServer().getZkStateReader().getClusterState(), collectionName);
|
||||
|
||||
HttpSolrServer collectionClient = new HttpSolrServer(url);
|
||||
|
||||
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
|
||||
final DocRouter router = clusterState.getCollection(collectionName).getRouter();
|
||||
Slice shard1 = clusterState.getSlice(collectionName, SHARD1);
|
||||
DocRouter.Range shard1Range = shard1.getRange() != null ? shard1.getRange() : router.fullRange();
|
||||
final List<DocRouter.Range> ranges = router.partitionRange(2, shard1Range);
|
||||
final int[] docCounts = new int[ranges.size()];
|
||||
|
||||
for (int i = 100; i <= 200; i++) {
|
||||
String shardKey = "" + (char)('a' + (i % 26)); // See comment in ShardRoutingTest for hash distribution
|
||||
|
||||
collectionClient.add(getDoc(id, i, "n_ti", i, shard_fld, shardKey));
|
||||
int idx = getHashRangeIdx(router, ranges, shardKey);
|
||||
if (idx != -1) {
|
||||
docCounts[idx]++;
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < docCounts.length; i++) {
|
||||
int docCount = docCounts[i];
|
||||
log.info("Shard {} docCount = {}", "shard1_" + i, docCount);
|
||||
}
|
||||
|
||||
collectionClient.commit();
|
||||
|
||||
for (int i = 0; i < 3; i++) {
|
||||
try {
|
||||
splitShard(collectionName, SHARD1);
|
||||
break;
|
||||
} catch (HttpSolrServer.RemoteSolrException e) {
|
||||
if (e.code() != 500) {
|
||||
throw e;
|
||||
}
|
||||
log.error("SPLITSHARD failed. " + (i < 2 ? " Retring split" : ""), e);
|
||||
if (i == 2) {
|
||||
fail("SPLITSHARD was not successful even after three tries");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assertEquals(docCounts[0], collectionClient.query(new SolrQuery("*:*").setParam("shards", "shard1_0")).getResults().getNumFound());
|
||||
assertEquals(docCounts[1], collectionClient.query(new SolrQuery("*:*").setParam("shards", "shard1_1")).getResults().getNumFound());
|
||||
}
|
||||
|
||||
protected void checkDocCountsAndShardStates(int[] docCounts, int numReplicas) throws Exception {
|
||||
|
@ -248,10 +339,10 @@ public class ShardSplitTest extends BasicDistributedZkTest {
|
|||
}
|
||||
}
|
||||
|
||||
protected void splitShard(String shardId) throws SolrServerException, IOException {
|
||||
protected void splitShard(String collection, String shardId) throws SolrServerException, IOException {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set("action", CollectionParams.CollectionAction.SPLITSHARD.toString());
|
||||
params.set("collection", "collection1");
|
||||
params.set("collection", collection);
|
||||
params.set("shard", shardId);
|
||||
SolrRequest request = new QueryRequest(params);
|
||||
request.setPath("/admin/collections");
|
||||
|
@ -269,7 +360,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
|
|||
protected void indexAndUpdateCount(DocRouter router, List<DocRouter.Range> ranges, int[] docCounts, String id, int n) throws Exception {
|
||||
index("id", id, "n_ti", n);
|
||||
|
||||
int idx = getHashRangeIdx(router, ranges, docCounts, id);
|
||||
int idx = getHashRangeIdx(router, ranges, id);
|
||||
if (idx != -1) {
|
||||
docCounts[idx]++;
|
||||
}
|
||||
|
@ -279,13 +370,13 @@ public class ShardSplitTest extends BasicDistributedZkTest {
|
|||
controlClient.deleteById(id);
|
||||
cloudClient.deleteById(id);
|
||||
|
||||
int idx = getHashRangeIdx(router, ranges, docCounts, id);
|
||||
int idx = getHashRangeIdx(router, ranges, id);
|
||||
if (idx != -1) {
|
||||
docCounts[idx]--;
|
||||
}
|
||||
}
|
||||
|
||||
private int getHashRangeIdx(DocRouter router, List<DocRouter.Range> ranges, int[] docCounts, String id) {
|
||||
public static int getHashRangeIdx(DocRouter router, List<DocRouter.Range> ranges, String id) {
|
||||
int hash = 0;
|
||||
if (router instanceof HashBasedRouter) {
|
||||
HashBasedRouter hashBasedRouter = (HashBasedRouter) router;
|
||||
|
|
|
@ -96,7 +96,7 @@ public class SolrIndexSplitterTest extends SolrTestCaseJ4 {
|
|||
request = lrf.makeRequest("q", "dummy");
|
||||
|
||||
SplitIndexCommand command = new SplitIndexCommand(request,
|
||||
Lists.newArrayList(indexDir1.getAbsolutePath(), indexDir2.getAbsolutePath()), null, ranges, new PlainIdRouter());
|
||||
Lists.newArrayList(indexDir1.getAbsolutePath(), indexDir2.getAbsolutePath()), null, ranges, new PlainIdRouter(), null);
|
||||
new SolrIndexSplitter(command).split();
|
||||
|
||||
Directory directory = h.getCore().getDirectoryFactory().get(indexDir1.getAbsolutePath(),
|
||||
|
@ -141,7 +141,7 @@ public class SolrIndexSplitterTest extends SolrTestCaseJ4 {
|
|||
request = lrf.makeRequest("q", "dummy");
|
||||
|
||||
SplitIndexCommand command = new SplitIndexCommand(request,
|
||||
Lists.newArrayList(indexDir1.getAbsolutePath(), indexDir2.getAbsolutePath()), null, ranges, new PlainIdRouter());
|
||||
Lists.newArrayList(indexDir1.getAbsolutePath(), indexDir2.getAbsolutePath()), null, ranges, new PlainIdRouter(), null);
|
||||
new SolrIndexSplitter(command).split();
|
||||
|
||||
Directory directory = h.getCore().getDirectoryFactory().get(indexDir1.getAbsolutePath(),
|
||||
|
@ -198,7 +198,7 @@ public class SolrIndexSplitterTest extends SolrTestCaseJ4 {
|
|||
try {
|
||||
request = lrf.makeRequest("q", "dummy");
|
||||
|
||||
SplitIndexCommand command = new SplitIndexCommand(request, null, Lists.newArrayList(core1, core2), ranges, new PlainIdRouter());
|
||||
SplitIndexCommand command = new SplitIndexCommand(request, null, Lists.newArrayList(core1, core2), ranges, new PlainIdRouter(), null);
|
||||
new SolrIndexSplitter(command).split();
|
||||
} finally {
|
||||
if (request != null) request.close();
|
||||
|
@ -235,7 +235,7 @@ public class SolrIndexSplitterTest extends SolrTestCaseJ4 {
|
|||
request = lrf.makeRequest("q", "dummy");
|
||||
|
||||
SplitIndexCommand command = new SplitIndexCommand(request,
|
||||
Lists.newArrayList(indexDir1.getAbsolutePath(), indexDir2.getAbsolutePath(), indexDir3.getAbsolutePath()), null, null, new PlainIdRouter());
|
||||
Lists.newArrayList(indexDir1.getAbsolutePath(), indexDir2.getAbsolutePath(), indexDir3.getAbsolutePath()), null, null, new PlainIdRouter(), null);
|
||||
new SolrIndexSplitter(command).split();
|
||||
|
||||
directory = h.getCore().getDirectoryFactory().get(indexDir1.getAbsolutePath(),
|
||||
|
|
Loading…
Reference in New Issue