mirror of https://github.com/apache/lucene.git
SOLR-4797: Shard splitting creates sub shards which have the wrong hash range in cluster state. This happens when numShards is not a power of two and router is compositeId
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1480478 13f79535-47bb-0310-9956-ffa450edef68
parent 84d08b269c
commit 5ef9e9eff4

```diff
@@ -104,6 +104,9 @@ Bug Fixes
   may be placed in the wrong shard when the default compositeId router
   is used in conjunction with IDs containing "!". (yonik)
 
+* SOLR-4797: Shard splitting creates sub shards which have the wrong hash
+  range in cluster state. This happens when numShards is not a power of two
+  and router is compositeId. (shalin)
 
 
 Other Changes
```
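The heart of the change sits in OverseerCollectionProcessor.splitShard, shown in the hunks below: the parent slice's hash range is now partitioned with the collection's own DocRouter instead of a freshly constructed PlainIdRouter. A minimal sketch of that logic, condensed from the diff (the `SubRangeSketch` class and method name are hypothetical; the individual calls are the ones visible in the hunks):

```java
import java.util.List;

import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.PlainIdRouter;
import org.apache.solr.common.cloud.Slice;

// Hypothetical helper, not part of the commit: condenses the overseer-side fix.
class SubRangeSketch {

  /** Split a parent slice's hash range in two, using the collection's configured router. */
  static List<DocRouter.Range> subRanges(DocCollection collection, Slice parentSlice) {
    DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
    DocRouter.Range range = parentSlice.getRange();
    if (range == null) {
      // No range recorded for the parent slice: fall back to the full hash space.
      range = new PlainIdRouter().fullRange();
    }
    // Before the fix this was new PlainIdRouter().partitionRange(2, range), which (per the
    // commit message) yields wrong sub-ranges for compositeId collections whose numShards
    // is not a power of two.
    return router.partitionRange(2, range);
  }
}
```

Presumably the compositeId router partitions a range differently from PlainIdRouter when the shard layout is not power-of-two aligned, so delegating the split to the collection's router keeps the sub-ranges written to cluster state consistent with how documents are actually routed.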
```diff
@@ -320,21 +320,22 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
 
   private boolean splitShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
     log.info("Split shard invoked");
-    String collection = message.getStr("collection");
+    String collectionName = message.getStr("collection");
     String slice = message.getStr(ZkStateReader.SHARD_ID_PROP);
-    Slice parentSlice = clusterState.getSlice(collection, slice);
+    Slice parentSlice = clusterState.getSlice(collectionName, slice);
 
     if (parentSlice == null) {
-      if(clusterState.getCollections().contains(collection)) {
+      if(clusterState.getCollections().contains(collectionName)) {
         throw new SolrException(ErrorCode.BAD_REQUEST, "No shard with the specified name exists: " + slice);
       } else {
-        throw new SolrException(ErrorCode.BAD_REQUEST, "No collection with the specified name exists: " + collection);
+        throw new SolrException(ErrorCode.BAD_REQUEST, "No collection with the specified name exists: " + collectionName);
       }
     }
 
     // find the leader for the shard
-    Replica parentShardLeader = clusterState.getLeader(collection, slice);
+    Replica parentShardLeader = clusterState.getLeader(collectionName, slice);
+    DocCollection collection = clusterState.getCollection(collectionName);
     DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
     DocRouter.Range range = parentSlice.getRange();
     if (range == null) {
       range = new PlainIdRouter().fullRange();
```
```diff
@@ -342,8 +343,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
 
     // todo: fixed to two partitions?
     // todo: accept the range as a param to api?
     // todo: handle randomizing subshard name in case a shard with the same name already exists.
-    List<DocRouter.Range> subRanges = new PlainIdRouter().partitionRange(2, range);
+    List<DocRouter.Range> subRanges = router.partitionRange(2, range);
     try {
       List<String> subSlices = new ArrayList<String>(subRanges.size());
       List<String> subShardNames = new ArrayList<String>(subRanges.size());
```
```diff
@@ -351,10 +351,10 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
       for (int i = 0; i < subRanges.size(); i++) {
         String subSlice = slice + "_" + i;
         subSlices.add(subSlice);
-        String subShardName = collection + "_" + subSlice + "_replica1";
+        String subShardName = collectionName + "_" + subSlice + "_replica1";
         subShardNames.add(subShardName);
 
-        Slice oSlice = clusterState.getSlice(collection, subSlice);
+        Slice oSlice = clusterState.getSlice(collectionName, subSlice);
         if (oSlice != null) {
           if (Slice.ACTIVE.equals(oSlice.getState())) {
             throw new SolrException(ErrorCode.BAD_REQUEST, "Sub-shard: " + subSlice + " exists in active state. Aborting split shard.");
```
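For reference, the naming scheme in the hunk above expands as follows (a throwaway sketch, not code from the commit; `collection1` and `shard1` are purely illustrative values):

```java
// Example values only: how sub-slice and sub-shard core names are derived.
public class SubShardNaming {
  public static void main(String[] args) {
    String collectionName = "collection1";  // illustrative
    String slice = "shard1";                // illustrative
    for (int i = 0; i < 2; i++) {
      String subSlice = slice + "_" + i;
      String subShardName = collectionName + "_" + subSlice + "_replica1";
      System.out.println(subSlice + " -> " + subShardName);
      // prints: shard1_0 -> collection1_shard1_0_replica1
      //         shard1_1 -> collection1_shard1_1_replica1
    }
  }
}
```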
```diff
@@ -386,14 +386,14 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
         DocRouter.Range subRange = subRanges.get(i);
 
         log.info("Creating shard " + subShardName + " as part of slice "
-            + subSlice + " of collection " + collection + " on "
+            + subSlice + " of collection " + collectionName + " on "
             + nodeName);
 
         ModifiableSolrParams params = new ModifiableSolrParams();
         params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
 
         params.set(CoreAdminParams.NAME, subShardName);
-        params.set(CoreAdminParams.COLLECTION, collection);
+        params.set(CoreAdminParams.COLLECTION, collectionName);
         params.set(CoreAdminParams.SHARD, subSlice);
         params.set(CoreAdminParams.SHARD_RANGE, subRange.toString());
         params.set(CoreAdminParams.SHARD_STATE, Slice.CONSTRUCTION);
```
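Each sub-shard replica1 core is created through a CoreAdmin CREATE call that carries the slice name, its hash range, and the CONSTRUCTION state. A condensed sketch of how those parameters are assembled (the helper class and method are hypothetical; the parameter constants are the ones used in the hunk above):

```java
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;

// Hypothetical helper, not part of the commit.
class SubShardCoreParams {

  /** Parameters for creating one sub-shard core for the given sub-slice. */
  static ModifiableSolrParams buildCreateParams(String collectionName, String subSlice,
                                                String subShardName, DocRouter.Range subRange) {
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
    params.set(CoreAdminParams.NAME, subShardName);
    params.set(CoreAdminParams.COLLECTION, collectionName);
    params.set(CoreAdminParams.SHARD, subSlice);
    params.set(CoreAdminParams.SHARD_RANGE, subRange.toString()); // sub-range computed by the router
    params.set(CoreAdminParams.SHARD_STATE, Slice.CONSTRUCTION);  // sub-shard starts out not yet active
    return params;
  }
}
```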
```diff
@@ -421,10 +421,10 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
       } while (srsp != null);
 
       log.info("Successfully created all sub-shards for collection "
-          + collection + " parent shard: " + slice + " on: " + parentShardLeader);
+          + collectionName + " parent shard: " + slice + " on: " + parentShardLeader);
 
       log.info("Splitting shard " + parentShardLeader.getName() + " as part of slice "
-          + slice + " of collection " + collection + " on "
+          + slice + " of collection " + collectionName + " on "
          + parentShardLeader);
 
       ModifiableSolrParams params = new ModifiableSolrParams();
```
```diff
@@ -474,7 +474,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
 
       // TODO: Have replication factor decided in some other way instead of numShards for the parent
 
-      int repFactor = clusterState.getSlice(collection, slice).getReplicas().size();
+      int repFactor = clusterState.getSlice(collectionName, slice).getReplicas().size();
 
       // we need to look at every node and see how many cores it serves
       // add our new cores to existing nodes serving the least number of cores
```
```diff
@@ -501,10 +501,10 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
         String sliceName = subSlices.get(i - 1);
         for (int j = 2; j <= repFactor; j++) {
           String subShardNodeName = nodeList.get((repFactor * (i - 1) + (j - 2)) % nodeList.size());
-          String shardName = collection + "_" + sliceName + "_replica" + (j);
+          String shardName = collectionName + "_" + sliceName + "_replica" + (j);
 
           log.info("Creating replica shard " + shardName + " as part of slice "
-              + sliceName + " of collection " + collection + " on "
+              + sliceName + " of collection " + collectionName + " on "
              + subShardNodeName);
 
           // Need to create new params for each request
```
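The index expression above round-robins the additional replicas (replica2 and up; replica1 of each sub-shard was created earlier, apparently on the parent shard leader's node) across the candidate nodes. A small worked example, assuming as the surrounding code suggests that `i` counts sub-slices from 1 and `j` counts replicas from 2:

```java
// Worked example, not part of the commit: repFactor = 2 spread across 3 candidate nodes.
public class ReplicaPlacementExample {
  public static void main(String[] args) {
    int repFactor = 2;
    int numNodes = 3;
    for (int i = 1; i <= 2; i++) {           // two sub-slices
      for (int j = 2; j <= repFactor; j++) { // replicas beyond the leader-hosted replica1
        int nodeIndex = (repFactor * (i - 1) + (j - 2)) % numNodes;
        System.out.println("sub-slice " + i + ", replica " + j + " -> node " + nodeIndex);
      }
    }
    // Prints: sub-slice 1, replica 2 -> node 0
    //         sub-slice 2, replica 2 -> node 2
  }
}
```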
```diff
@@ -512,7 +512,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
           params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
 
           params.set(CoreAdminParams.NAME, shardName);
-          params.set(CoreAdminParams.COLLECTION, collection);
+          params.set(CoreAdminParams.COLLECTION, collectionName);
           params.set(CoreAdminParams.SHARD, sliceName);
           // TODO: Figure the config used by the parent shard and use it.
           //params.set("collection.configName", configName);
```
```diff
@@ -552,7 +552,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
       for (String subSlice : subSlices) {
         propMap.put(subSlice, Slice.ACTIVE);
       }
-      propMap.put(ZkStateReader.COLLECTION_PROP, collection);
+      propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
       ZkNodeProps m = new ZkNodeProps(propMap);
       inQueue.offer(ZkStateReader.toJSON(m));
 
```
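Once the sub-shard cores and their replicas are up, the overseer marks the new slices ACTIVE by offering a cluster-state update message to its queue. A condensed sketch of the message construction visible above (hypothetical helper class; `propMap` is initialized above this hunk, presumably with the overseer operation to perform, so this shows only the visible part):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;

// Hypothetical helper, not part of the commit.
class SliceStateMessageSketch {

  /** Serialized overseer message marking the given sub-slices ACTIVE for a collection. */
  static byte[] activateSubSlices(String collectionName, List<String> subSlices) {
    Map<String, Object> propMap = new HashMap<String, Object>();
    for (String subSlice : subSlices) {
      propMap.put(subSlice, Slice.ACTIVE);
    }
    propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
    ZkNodeProps m = new ZkNodeProps(propMap);
    return ZkStateReader.toJSON(m); // offered to the overseer's state-update queue by the caller
  }
}
```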
```diff
@@ -560,7 +560,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
     } catch (SolrException e) {
       throw e;
     } catch (Exception e) {
-      log.error("Error executing split operation for collection: " + collection + " parent shard: " + slice, e);
+      log.error("Error executing split operation for collection: " + collectionName + " parent shard: " + slice, e);
       throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
     }
   }
```
```diff
@@ -63,7 +63,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
     waitForThingsToLevelOut(15);
 
     ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
-    DocRouter router = clusterState.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION).getRouter();
+    final DocRouter router = clusterState.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION).getRouter();
     Slice shard1 = clusterState.getSlice(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1);
     DocRouter.Range shard1Range = shard1.getRange() != null ? shard1.getRange() : router.fullRange();
     final List<DocRouter.Range> ranges = router.partitionRange(2, shard1Range);
```
```diff
@@ -78,7 +78,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
     try {
       del("*:*");
       for (int id = 0; id < 100; id++) {
-        indexAndUpdateCount(ranges, docCounts, id);
+        indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id));
       }
       commit();
 
```
```diff
@@ -88,7 +88,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
       int max = atLeast(401);
       for (int id = 101; id < max; id++) {
         try {
-          indexAndUpdateCount(ranges, docCounts, id);
+          indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id));
           Thread.sleep(atLeast(25));
         } catch (Exception e) {
           log.error("Exception while adding doc", e);
```
```diff
@@ -28,13 +28,15 @@ import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CompositeIdRouter;
 import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.HashBasedRouter;
 import org.apache.solr.common.cloud.PlainIdRouter;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.Hash;
 import org.apache.solr.handler.admin.CollectionsHandler;
 import org.apache.solr.update.DirectUpdateHandler2;
 import org.apache.zookeeper.KeeperException;
```
```diff
@@ -52,6 +54,10 @@ public class ShardSplitTest extends BasicDistributedZkTest {
   public static final String SHARD1_0 = SHARD1 + "_0";
   public static final String SHARD1_1 = SHARD1 + "_1";
 
+  public ShardSplitTest() {
+    schemaString = "schema15.xml"; // we need a string id
+  }
+
   @Override
   @Before
   public void setUp() throws Exception {
```
```diff
@@ -94,7 +100,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     waitForThingsToLevelOut(15);
 
     ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
-    DocRouter router = clusterState.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION).getRouter();
+    final DocRouter router = clusterState.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION).getRouter();
     Slice shard1 = clusterState.getSlice(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1);
     DocRouter.Range shard1Range = shard1.getRange() != null ? shard1.getRange() : router.fullRange();
     final List<DocRouter.Range> ranges = router.partitionRange(2, shard1Range);
```
```diff
@@ -102,8 +108,9 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     int numReplicas = shard1.getReplicas().size();
 
     del("*:*");
-    for (int id = 0; id < 100; id++) {
-      indexAndUpdateCount(ranges, docCounts, id);
+    for (int id = 0; id <= 100; id++) {
+      String shardKey = "" + (char)('a' + (id % 26)); // See comment in ShardRoutingTest for hash distribution
+      indexAndUpdateCount(router, ranges, docCounts, shardKey + "!" + String.valueOf(id));
     }
     commit();
 
```
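The test ids now exercise compositeId routing: instead of bare integers, the loop above indexes values of the form shardKey!id, where "!" is the separator the compositeId router keys on. Spelled out in plain Java (illustrative only, not code from the commit):

```java
// Example of the ids the updated loop produces: "a!0", "b!1", ..., "z!25", then "a!26", ...
public class CompositeIdExample {
  public static void main(String[] args) {
    for (int id = 0; id <= 5; id++) {
      String shardKey = "" + (char) ('a' + (id % 26));
      System.out.println(shardKey + "!" + id);  // prints a!0  b!1  c!2  d!3  e!4  f!5
    }
  }
}
```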
```diff
@@ -113,7 +120,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
       int max = atLeast(401);
       for (int id = 101; id < max; id++) {
         try {
-          indexAndUpdateCount(ranges, docCounts, id);
+          indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id));
           Thread.sleep(atLeast(25));
         } catch (Exception e) {
           log.error("Exception while adding doc", e);
```
```diff
@@ -201,12 +208,14 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     baseServer.request(request);
   }
 
-  protected void indexAndUpdateCount(List<DocRouter.Range> ranges, int[] docCounts, int id) throws Exception {
-    indexr("id", id);
+  protected void indexAndUpdateCount(DocRouter router, List<DocRouter.Range> ranges, int[] docCounts, String id) throws Exception {
+    index("id", id);
 
-    // todo - hook in custom hashing
-    byte[] bytes = String.valueOf(id).getBytes("UTF-8");
-    int hash = Hash.murmurhash3_x86_32(bytes, 0, bytes.length, 0);
+    int hash = 0;
+    if (router instanceof HashBasedRouter) {
+      HashBasedRouter hashBasedRouter = (HashBasedRouter) router;
+      hash = hashBasedRouter.sliceHash(id, null, null);
+    }
     for (int i = 0; i < ranges.size(); i++) {
       DocRouter.Range range = ranges.get(i);
       if (range.includes(hash))
```
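The reworked helper above is the test-side mirror of the fix: rather than hashing the raw id with murmurhash3, it asks the collection's router for the slice hash, so composite ids such as "a!1" are counted against the sub-range their shard key maps to. A minimal sketch of the same lookup (hypothetical class and method name; sliceHash and Range.includes are the calls used in the hunk):

```java
import java.util.List;

import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.HashBasedRouter;

// Hypothetical helper, not part of the commit.
class RangeLookupSketch {

  /** Index of the sub-range that would receive the given id, or -1 for non-hash routers. */
  static int subRangeFor(DocRouter router, List<DocRouter.Range> ranges, String id) {
    if (!(router instanceof HashBasedRouter)) {
      return -1;
    }
    int hash = ((HashBasedRouter) router).sliceHash(id, null, null);
    for (int i = 0; i < ranges.size(); i++) {
      if (ranges.get(i).includes(hash)) {
        return i;
      }
    }
    return -1;
  }
}
```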