mirror of https://github.com/apache/lucene.git
SOLR-5308: Use source collection's configName to create temp collection. Fixed NPE routing rule wait loop. Wait for temp collection leader to be active before splitting the source index.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1544414 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0971bc8f21
commit
911ee1dcd3
|
@ -1163,7 +1163,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
||||||
Map<String, RoutingRule> rules = zkStateReader.getClusterState().getSlice(sourceCollection.getName(), sourceSlice.getName()).getRoutingRules();
|
Map<String, RoutingRule> rules = zkStateReader.getClusterState().getSlice(sourceCollection.getName(), sourceSlice.getName()).getRoutingRules();
|
||||||
if (rules != null) {
|
if (rules != null) {
|
||||||
RoutingRule rule = rules.get(splitKey);
|
RoutingRule rule = rules.get(splitKey);
|
||||||
if (rule.getRouteRanges().contains(splitRange)) {
|
if (rule != null && rule.getRouteRanges().contains(splitRange)) {
|
||||||
added = true;
|
added = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1179,13 +1179,13 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
||||||
Replica sourceLeader = sourceSlice.getLeader();
|
Replica sourceLeader = sourceSlice.getLeader();
|
||||||
|
|
||||||
// create a temporary collection with just one node on the shard leader
|
// create a temporary collection with just one node on the shard leader
|
||||||
String sourceLeaderUrl = zkStateReader.getZkClient().getBaseUrlForNodeName(sourceLeader.getNodeName());
|
String configName = zkStateReader.readConfigName(sourceCollection.getName());
|
||||||
if (sourceLeaderUrl.startsWith("http://")) sourceLeaderUrl = sourceLeaderUrl.substring(7);
|
|
||||||
Map<String, Object> props = ZkNodeProps.makeMap(
|
Map<String, Object> props = ZkNodeProps.makeMap(
|
||||||
QUEUE_OPERATION, CREATECOLLECTION,
|
QUEUE_OPERATION, CREATECOLLECTION,
|
||||||
"name", tempSourceCollectionName,
|
"name", tempSourceCollectionName,
|
||||||
REPLICATION_FACTOR, 1,
|
REPLICATION_FACTOR, 1,
|
||||||
NUM_SLICES, 1,
|
NUM_SLICES, 1,
|
||||||
|
COLL_CONF, configName,
|
||||||
CREATE_NODE_SET, sourceLeader.getNodeName());
|
CREATE_NODE_SET, sourceLeader.getNodeName());
|
||||||
log.info("Creating temporary collection: " + props);
|
log.info("Creating temporary collection: " + props);
|
||||||
createCollection(clusterState, new ZkNodeProps(props), results);
|
createCollection(clusterState, new ZkNodeProps(props), results);
|
||||||
|
@ -1194,6 +1194,23 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
||||||
Slice tempSourceSlice = clusterState.getCollection(tempSourceCollectionName).getSlices().iterator().next();
|
Slice tempSourceSlice = clusterState.getCollection(tempSourceCollectionName).getSlices().iterator().next();
|
||||||
Replica tempSourceLeader = clusterState.getLeader(tempSourceCollectionName, tempSourceSlice.getName());
|
Replica tempSourceLeader = clusterState.getLeader(tempSourceCollectionName, tempSourceSlice.getName());
|
||||||
|
|
||||||
|
String tempCollectionReplica1 = tempSourceCollectionName + "_" + tempSourceSlice.getName() + "_replica1";
|
||||||
|
String coreNodeName = waitForCoreNodeName(clusterState.getCollection(tempSourceCollectionName),
|
||||||
|
zkStateReader.getZkClient().getBaseUrlForNodeName(sourceLeader.getNodeName()), tempCollectionReplica1);
|
||||||
|
// wait for the replicas to be seen as active on temp source leader
|
||||||
|
log.info("Asking source leader to wait for: " + tempCollectionReplica1 + " to be alive on: " + sourceLeader.getNodeName());
|
||||||
|
CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
|
||||||
|
cmd.setCoreName(tempCollectionReplica1);
|
||||||
|
cmd.setNodeName(sourceLeader.getNodeName());
|
||||||
|
cmd.setCoreNodeName(coreNodeName);
|
||||||
|
cmd.setState(ZkStateReader.ACTIVE);
|
||||||
|
cmd.setCheckLive(true);
|
||||||
|
cmd.setOnlyIfLeader(true);
|
||||||
|
sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()));
|
||||||
|
|
||||||
|
collectShardResponses(results, true,
|
||||||
|
"MIGRATE failed to create temp collection leader or timed out waiting for it to come up");
|
||||||
|
|
||||||
log.info("Asking source leader to split index");
|
log.info("Asking source leader to split index");
|
||||||
params = new ModifiableSolrParams();
|
params = new ModifiableSolrParams();
|
||||||
params.set(CoreAdminParams.ACTION, CoreAdminAction.SPLIT.toString());
|
params.set(CoreAdminParams.ACTION, CoreAdminAction.SPLIT.toString());
|
||||||
|
@ -1215,11 +1232,11 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
||||||
params.set(CoreAdminParams.SHARD, tempSourceSlice.getName());
|
params.set(CoreAdminParams.SHARD, tempSourceSlice.getName());
|
||||||
sendShardRequest(targetLeader.getNodeName(), params);
|
sendShardRequest(targetLeader.getNodeName(), params);
|
||||||
|
|
||||||
String coreNodeName = waitForCoreNodeName(clusterState.getCollection(tempSourceCollectionName),
|
coreNodeName = waitForCoreNodeName(clusterState.getCollection(tempSourceCollectionName),
|
||||||
zkStateReader.getZkClient().getBaseUrlForNodeName(targetLeader.getNodeName()), tempCollectionReplica2);
|
zkStateReader.getZkClient().getBaseUrlForNodeName(targetLeader.getNodeName()), tempCollectionReplica2);
|
||||||
// wait for the replicas to be seen as active on temp source leader
|
// wait for the replicas to be seen as active on temp source leader
|
||||||
log.info("Asking temp source leader to wait for: " + tempCollectionReplica2 + " to be alive on: " + targetLeader.getNodeName());
|
log.info("Asking temp source leader to wait for: " + tempCollectionReplica2 + " to be alive on: " + targetLeader.getNodeName());
|
||||||
CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
|
cmd = new CoreAdminRequest.WaitForState();
|
||||||
cmd.setCoreName(tempSourceLeader.getStr("core"));
|
cmd.setCoreName(tempSourceLeader.getStr("core"));
|
||||||
cmd.setNodeName(targetLeader.getNodeName());
|
cmd.setNodeName(targetLeader.getNodeName());
|
||||||
cmd.setCoreNodeName(coreNodeName);
|
cmd.setCoreNodeName(coreNodeName);
|
||||||
|
|
|
@ -733,35 +733,6 @@ public final class ZkController {
|
||||||
return zkClient.exists(path, true);
|
return zkClient.exists(path, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns config value
|
|
||||||
*/
|
|
||||||
public String readConfigName(String collection) throws KeeperException,
|
|
||||||
InterruptedException {
|
|
||||||
|
|
||||||
String configName = null;
|
|
||||||
|
|
||||||
String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
|
|
||||||
if (log.isInfoEnabled()) {
|
|
||||||
log.info("Load collection config from:" + path);
|
|
||||||
}
|
|
||||||
byte[] data = zkClient.getData(path, null, null, true);
|
|
||||||
|
|
||||||
if(data != null) {
|
|
||||||
ZkNodeProps props = ZkNodeProps.load(data);
|
|
||||||
configName = props.getStr(CONFIGNAME_PROP);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (configName != null && !zkClient.exists(CONFIGS_ZKNODE + "/" + configName, true)) {
|
|
||||||
log.error("Specified config does not exist in ZooKeeper:" + configName);
|
|
||||||
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
|
|
||||||
"Specified config does not exist in ZooKeeper:" + configName);
|
|
||||||
}
|
|
||||||
|
|
||||||
return configName;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register shard with ZooKeeper.
|
* Register shard with ZooKeeper.
|
||||||
|
|
|
@ -676,7 +676,7 @@ public class CoreContainer {
|
||||||
String collection = cd.getCloudDescriptor().getCollectionName();
|
String collection = cd.getCloudDescriptor().getCollectionName();
|
||||||
zkSys.getZkController().createCollectionZkNode(cd.getCloudDescriptor());
|
zkSys.getZkController().createCollectionZkNode(cd.getCloudDescriptor());
|
||||||
|
|
||||||
String zkConfigName = zkSys.getZkController().readConfigName(collection);
|
String zkConfigName = zkSys.getZkController().getZkStateReader().readConfigName(collection);
|
||||||
if (zkConfigName == null) {
|
if (zkConfigName == null) {
|
||||||
log.error("Could not find config name for collection:" + collection);
|
log.error("Could not find config name for collection:" + collection);
|
||||||
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
|
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
|
||||||
|
|
|
@ -220,7 +220,7 @@ public class ZkContainer {
|
||||||
String collection = dcore.getCloudDescriptor().getCollectionName();
|
String collection = dcore.getCloudDescriptor().getCollectionName();
|
||||||
zkController.createCollectionZkNode(dcore.getCloudDescriptor());
|
zkController.createCollectionZkNode(dcore.getCloudDescriptor());
|
||||||
|
|
||||||
zkConfigName = zkController.readConfigName(collection);
|
zkConfigName = zkController.getZkStateReader().readConfigName(collection);
|
||||||
if (zkConfigName == null) {
|
if (zkConfigName == null) {
|
||||||
log.error("Could not find config name for collection:" + collection);
|
log.error("Could not find config name for collection:" + collection);
|
||||||
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
|
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
|
||||||
|
|
|
@ -208,7 +208,7 @@ public class QueryElevationComponent extends SearchComponent implements SolrCore
|
||||||
ZkController zkController = core.getCoreDescriptor().getCoreContainer().getZkController();
|
ZkController zkController = core.getCoreDescriptor().getCoreContainer().getZkController();
|
||||||
if (zkController != null) {
|
if (zkController != null) {
|
||||||
// TODO : shouldn't have to keep reading the config name when it has been read before
|
// TODO : shouldn't have to keep reading the config name when it has been read before
|
||||||
exists = zkController.configFileExists(zkController.readConfigName(core.getCoreDescriptor().getCloudDescriptor().getCollectionName()), f);
|
exists = zkController.configFileExists(zkController.getZkStateReader().readConfigName(core.getCoreDescriptor().getCloudDescriptor().getCollectionName()), f);
|
||||||
} else {
|
} else {
|
||||||
File fC = new File(core.getResourceLoader().getConfigDir(), f);
|
File fC = new File(core.getResourceLoader().getConfigDir(), f);
|
||||||
File fD = new File(core.getDataDir(), f);
|
File fD = new File(core.getDataDir(), f);
|
||||||
|
|
|
@ -188,7 +188,7 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
try {
|
try {
|
||||||
String configName = zkController.readConfigName(COLLECTION_NAME);
|
String configName = zkController.getZkStateReader().readConfigName(COLLECTION_NAME);
|
||||||
assertEquals(configName, actualConfigName);
|
assertEquals(configName, actualConfigName);
|
||||||
} finally {
|
} finally {
|
||||||
zkController.close();
|
zkController.close();
|
||||||
|
|
|
@ -75,6 +75,9 @@ public class ZkStateReader {
|
||||||
public static final String DOWN = "down";
|
public static final String DOWN = "down";
|
||||||
public static final String SYNC = "sync";
|
public static final String SYNC = "sync";
|
||||||
|
|
||||||
|
public static final String CONFIGS_ZKNODE = "/configs";
|
||||||
|
public final static String CONFIGNAME_PROP="configName";
|
||||||
|
|
||||||
private volatile ClusterState clusterState;
|
private volatile ClusterState clusterState;
|
||||||
|
|
||||||
private static final long SOLRCLOUD_UPDATE_DELAY = Long.parseLong(System.getProperty("solrcloud.update.delay", "5000"));
|
private static final long SOLRCLOUD_UPDATE_DELAY = Long.parseLong(System.getProperty("solrcloud.update.delay", "5000"));
|
||||||
|
@ -115,6 +118,35 @@ public class ZkStateReader {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns config value
|
||||||
|
* @param collection
|
||||||
|
*/
|
||||||
|
public String readConfigName(String collection) throws KeeperException,
|
||||||
|
InterruptedException {
|
||||||
|
|
||||||
|
String configName = null;
|
||||||
|
|
||||||
|
String path = COLLECTIONS_ZKNODE + "/" + collection;
|
||||||
|
if (log.isInfoEnabled()) {
|
||||||
|
log.info("Load collection config from:" + path);
|
||||||
|
}
|
||||||
|
byte[] data = zkClient.getData(path, null, null, true);
|
||||||
|
|
||||||
|
if(data != null) {
|
||||||
|
ZkNodeProps props = ZkNodeProps.load(data);
|
||||||
|
configName = props.getStr(CONFIGNAME_PROP);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (configName != null && !zkClient.exists(CONFIGS_ZKNODE + "/" + configName, true)) {
|
||||||
|
log.error("Specified config does not exist in ZooKeeper:" + configName);
|
||||||
|
throw new ZooKeeperException(ErrorCode.SERVER_ERROR,
|
||||||
|
"Specified config does not exist in ZooKeeper:" + configName);
|
||||||
|
}
|
||||||
|
|
||||||
|
return configName;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private static class ZKTF implements ThreadFactory {
|
private static class ZKTF implements ThreadFactory {
|
||||||
private static ThreadGroup tg = new ThreadGroup("ZkStateReader");
|
private static ThreadGroup tg = new ThreadGroup("ZkStateReader");
|
||||||
|
|
Loading…
Reference in New Issue