mirror of https://github.com/apache/lucene.git
SOLR-6101: Shard splitting doesn't work when legacyCloud=false is set in cluster properties
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1596828 13f79535-47bb-0310-9956-ffa450edef68
parent 2af3939769
commit 75b9821eb4
@@ -128,6 +128,9 @@ Bug Fixes
 * SOLR-6104: The 'addreplica' Collection API does not support 'async' parameter.
   (shalin)
 
+* SOLR-6101: Shard splitting doesn't work when legacyCloud=false is set in
+  cluster properties. (shalin)
+
 Other Changes
 ---------------------
 
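For context, legacyCloud is a cluster-wide property kept in /clusterprops.json. Roughly speaking, with legacyCloud=false a core may no longer register itself into cluster state; the Overseer has to publish the replica first, and that is the assumption the old split code violated. Below is a minimal SolrJ sketch of flipping the property through the CLUSTERPROP Collections API; the class name, method name, and the cloudClient handle are illustrative assumptions, not part of this commit (the test change at the end of this commit does the same thing via CollectionsAPIDistributedZkTest.setClusterProp).

import java.io.IOException;

import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.ModifiableSolrParams;

public class LegacyCloudPropSketch {
  // Set the cluster-wide legacyCloud property via the CLUSTERPROP Collections API.
  static void setLegacyCloud(CloudSolrServer cloudClient, boolean value)
      throws SolrServerException, IOException {
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("action", CollectionAction.CLUSTERPROP.toString());
    params.set("name", "legacyCloud");            // property key stored in /clusterprops.json
    params.set("val", Boolean.toString(value));   // omitting 'val' unsets the property
    SolrRequest request = new QueryRequest(params);
    request.setPath("/admin/collections");        // route to the Collections API handler
    cloudClient.request(request);
  }
}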
@@ -1321,7 +1321,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
       String subShardName = subShardNames.get(i);
       DocRouter.Range subRange = subRanges.get(i);
 
-      log.info("Creating shard " + subShardName + " as part of slice "
+      log.info("Creating slice "
           + subSlice + " of collection " + collectionName + " on "
           + nodeName);
 
@@ -1332,19 +1332,35 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
       propMap.put(ZkStateReader.SHARD_RANGE_PROP, subRange.toString());
       propMap.put(ZkStateReader.SHARD_STATE_PROP, Slice.CONSTRUCTION);
       propMap.put(ZkStateReader.SHARD_PARENT_PROP, parentSlice.getName());
-      ZkNodeProps m = new ZkNodeProps(propMap);
       DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
-      inQueue.offer(ZkStateReader.toJSON(m));
+      inQueue.offer(ZkStateReader.toJSON(new ZkNodeProps(propMap)));
 
-      ModifiableSolrParams params = new ModifiableSolrParams();
-      params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
+      // wait until we are able to see the new shard in cluster state
+      waitForNewShard(collectionName, subSlice);
 
-      params.set(CoreAdminParams.NAME, subShardName);
-      params.set(CoreAdminParams.COLLECTION, collectionName);
-      params.set(CoreAdminParams.SHARD, subSlice);
-      setupAsyncRequest(asyncId, requestMap, params, nodeName);
-      addPropertyParams(message, params);
-      sendShardRequest(nodeName, params, shardHandler);
+      // refresh cluster state
+      clusterState = zkStateReader.getClusterState();
+
+      log.info("Adding replica " + subShardName + " as part of slice "
+          + subSlice + " of collection " + collectionName + " on "
+          + nodeName);
+      propMap = new HashMap<>();
+      propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
+      propMap.put(COLLECTION_PROP, collectionName);
+      propMap.put(SHARD_ID_PROP, subSlice);
+      propMap.put("node", nodeName);
+      propMap.put(CoreAdminParams.NAME, subShardName);
+      // copy over property params:
+      for (String key : message.keySet()) {
+        if (key.startsWith(COLL_PROP_PREFIX)) {
+          propMap.put(key, message.getStr(key));
+        }
+      }
+      // add async param
+      if(asyncId != null) {
+        propMap.put(ASYNC, asyncId);
+      }
+      addReplica(clusterState, new ZkNodeProps(propMap), results);
     }
 
     collectShardResponses(results, true,
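This hunk is the core of the fix. The old code sent a raw CoreAdmin CREATE to the target node and relied on the new core registering itself in cluster state, which is not allowed when legacyCloud=false. The new code first offers the sub-slice to the Overseer queue, waits with waitForNewShard until the slice is visible, and then creates the sub-shard leader through the same addReplica path that serves the ADDREPLICA Collection API. As a rough sketch, the internal addReplica(...) call corresponds to the following public Collections API request; the wrapper class, method, and example values are assumptions for illustration only.

import java.io.IOException;

import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;

public class AddReplicaSketch {
  // Ask the Overseer to register and then create one replica of 'shard' on 'node'.
  static NamedList<Object> addReplica(CloudSolrServer cloudClient, String collection,
                                      String shard, String node)
      throws SolrServerException, IOException {
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("action", CollectionAction.ADDREPLICA.toString());
    params.set("collection", collection);   // e.g. "collection1"
    params.set("shard", shard);             // e.g. the new sub-shard "shard1_0"
    params.set("node", node);               // target node, e.g. "127.0.0.1:8983_solr"
    SolrRequest request = new QueryRequest(params);
    request.setPath("/admin/collections");
    return cloudClient.request(request);
  }
}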
@@ -1355,7 +1371,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
     for (String subShardName : subShardNames) {
       // wait for parent leader to acknowledge the sub-shard core
       log.info("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName);
-      String coreNodeName = waitForCoreNodeName(collection, nodeName, subShardName);
+      String coreNodeName = waitForCoreNodeName(collectionName, nodeName, subShardName);
       CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
       cmd.setCoreName(subShardName);
       cmd.setNodeName(nodeName);
@@ -1463,29 +1479,25 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
             + sliceName + " of collection " + collectionName + " on "
             + subShardNodeName);
 
-        // Need to create new params for each request
-        params = new ModifiableSolrParams();
-        params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
-
-        params.set(CoreAdminParams.NAME, shardName);
-        params.set(CoreAdminParams.COLLECTION, collectionName);
-        params.set(CoreAdminParams.SHARD, sliceName);
-        if(asyncId != null) {
-          String requestId = asyncId + Math.abs(System.nanoTime());
-          params.set(ASYNC, requestId);
-          requestMap.put(subShardNodeName, requestId);
-        }
-
-        addPropertyParams(message, params);
-        // TODO: Figure the config used by the parent shard and use it.
-        //params.set("collection.configName", configName);
-
-        //Not using this property. Do we really need to use it?
-        //params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
-
-        sendShardRequest(subShardNodeName, params, shardHandler);
+        HashMap<String, Object> propMap = new HashMap<>();
+        propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
+        propMap.put(COLLECTION_PROP, collectionName);
+        propMap.put(SHARD_ID_PROP, sliceName);
+        propMap.put("node", subShardNodeName);
+        propMap.put(CoreAdminParams.NAME, shardName);
+        // copy over property params:
+        for (String key : message.keySet()) {
+          if (key.startsWith(COLL_PROP_PREFIX)) {
+            propMap.put(key, message.getStr(key));
+          }
+        }
+        // add async param
+        if(asyncId != null) {
+          propMap.put(ASYNC, asyncId);
+        }
+        addReplica(clusterState, new ZkNodeProps(propMap), results);
 
-        String coreNodeName = waitForCoreNodeName(collection, subShardNodeName, shardName);
+        String coreNodeName = waitForCoreNodeName(collectionName, subShardNodeName, shardName);
         // wait for the replicas to be seen as active on sub shard leader
         log.info("Asking sub shard leader to wait for: " + shardName + " to be alive on: " + subShardNodeName);
         CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
@@ -1582,11 +1594,11 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
       }
     }
 
-  private String waitForCoreNodeName(DocCollection collection, String msgNodeName, String msgCore) {
+  private String waitForCoreNodeName(String collectionName, String msgNodeName, String msgCore) {
     int retryCount = 320;
     while (retryCount-- > 0) {
       Map<String,Slice> slicesMap = zkStateReader.getClusterState()
-          .getSlicesMap(collection.getName());
+          .getSlicesMap(collectionName);
       if (slicesMap != null) {
 
         for (Slice slice : slicesMap.values()) {
@@ -1611,6 +1623,31 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
     throw new SolrException(ErrorCode.SERVER_ERROR, "Could not find coreNodeName");
   }
 
+  private void waitForNewShard(String collectionName, String sliceName) throws KeeperException, InterruptedException {
+    log.info("Waiting for slice {} of collection {} to be available", sliceName, collectionName);
+    long startTime = System.currentTimeMillis();
+    int retryCount = 320;
+    while (retryCount-- > 0) {
+      DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
+      if (collection == null) {
+        throw new SolrException(ErrorCode.SERVER_ERROR,
+            "Unable to find collection: " + collectionName + " in clusterstate");
+      }
+      Slice slice = collection.getSlice(sliceName);
+      if (slice != null) {
+        log.info("Waited for {} seconds for slice {} of collection {} to be available",
+            (System.currentTimeMillis() - startTime) / 1000, sliceName, collectionName);
+        return;
+      }
+      Thread.sleep(1000);
+      zkStateReader.updateClusterState(true);
+    }
+    throw new SolrException(ErrorCode.SERVER_ERROR,
+        "Could not find new slice " + sliceName + " in collection " + collectionName
+        + " even after waiting for " + (System.currentTimeMillis() - startTime) / 1000 + " seconds"
+    );
+  }
+
   private void collectShardResponses(NamedList results, boolean abortOnError,
                                      String msgOnError,
                                      ShardHandler shardHandler) {
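For scale: the new waitForNewShard loop sleeps one second per attempt with retryCount = 320, so it gives up after roughly 320 × 1 s ≈ 5 minutes 20 seconds (plus the time spent refreshing cluster state), the same retry budget waitForCoreNodeName already uses.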
@@ -1851,7 +1888,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
     Replica tempSourceLeader = zkStateReader.getLeaderRetry(tempSourceCollectionName, tempSourceSlice.getName(), 120000);
 
     String tempCollectionReplica1 = tempSourceCollectionName + "_" + tempSourceSlice.getName() + "_replica1";
-    String coreNodeName = waitForCoreNodeName(clusterState.getCollection(tempSourceCollectionName),
+    String coreNodeName = waitForCoreNodeName(tempSourceCollectionName,
         sourceLeader.getNodeName(), tempCollectionReplica1);
     // wait for the replicas to be seen as active on temp source leader
     log.info("Asking source leader to wait for: " + tempCollectionReplica1 + " to be alive on: " + sourceLeader.getNodeName());
@@ -1901,7 +1938,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
 
     completeAsyncRequest(asyncId, requestMap, results);
 
-    coreNodeName = waitForCoreNodeName(clusterState.getCollection(tempSourceCollectionName),
+    coreNodeName = waitForCoreNodeName(tempSourceCollectionName,
         targetLeader.getNodeName(), tempCollectionReplica2);
     // wait for the replicas to be seen as active on temp source leader
     log.info("Asking temp source leader to wait for: " + tempCollectionReplica2 + " to be alive on: " + targetLeader.getNodeName());
@@ -96,6 +96,10 @@ public class ShardSplitTest extends BasicDistributedZkTest {
   public void doTest() throws Exception {
     waitForThingsToLevelOut(15);
 
+    if (usually()) {
+      log.info("Using legacyCloud=false for cluster");
+      CollectionsAPIDistributedZkTest.setClusterProp(cloudClient, "legacyCloud", "false");
+    }
     incompleteOrOverlappingCustomRangeTest();
     splitByUniqueKeyTest();
     splitByRouteFieldTest();