mirror of https://github.com/apache/lucene.git
SOLR-5609 use coreNodeName to compare replicas, CollectionsAPIDistributedZkTest.testCollectionsAPI() randomly switches to legacyCloud=false
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1572449 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d98ee29e8e
commit
58c98f703b
|
@ -305,8 +305,7 @@ public class Overseer {
|
|||
makeMap(
|
||||
ZkStateReader.CORE_NAME_PROP, message.getStr(ZkStateReader.CORE_NAME_PROP),
|
||||
ZkStateReader.BASE_URL_PROP,message.getStr(ZkStateReader.BASE_URL_PROP),
|
||||
ZkStateReader.STATE_PROP,message.getStr(ZkStateReader.STATE_PROP),
|
||||
ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName));
|
||||
ZkStateReader.STATE_PROP,message.getStr(ZkStateReader.STATE_PROP)));
|
||||
sl.getReplicasMap().put(coreNodeName, replica);
|
||||
return clusterState;
|
||||
}
|
||||
|
|
|
@ -74,6 +74,7 @@ import java.util.Collection;
|
|||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
|
@ -1624,6 +1625,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
|||
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully createcollection: " + message.getStr("name"));
|
||||
|
||||
log.info("going to create cores replicas shardNames {} , repFactor : {}", shardNames, repFactor);
|
||||
Map<String ,ShardRequest> coresToCreate = new LinkedHashMap<String, ShardRequest>();
|
||||
for (int i = 1; i <= shardNames.size(); i++) {
|
||||
String sliceName = shardNames.get(i-1);
|
||||
for (int j = 1; j <= repFactor; j++) {
|
||||
|
@ -1662,13 +1664,22 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
|||
ShardRequest sreq = new ShardRequest();
|
||||
params.set("qt", adminPath);
|
||||
sreq.purpose = 1;
|
||||
String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
|
||||
sreq.shards = new String[] {replica};
|
||||
sreq.shards = new String[] {baseUrl};
|
||||
sreq.actualShards = sreq.shards;
|
||||
sreq.params = params;
|
||||
|
||||
shardHandler.submit(sreq, replica, sreq.params);
|
||||
if(isLegacyCloud) shardHandler.submit(sreq, sreq.shards[0], sreq.params);
|
||||
else coresToCreate.put(coreName, sreq);
|
||||
}
|
||||
}
|
||||
|
||||
if(!isLegacyCloud) {
|
||||
//wait for all replica entries to be created
|
||||
Map<String, Replica> replicas = lookupReplicas(collectionName, coresToCreate.keySet());
|
||||
for (Map.Entry<String, ShardRequest> e : coresToCreate.entrySet()) {
|
||||
ShardRequest sreq = e.getValue();
|
||||
sreq.params.set(CoreAdminParams.CORE_NODE_NAME, replicas.get(e.getKey()).getName());
|
||||
shardHandler.submit(sreq, sreq.shards[0], sreq.params);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1690,6 +1701,37 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
|||
}
|
||||
}
|
||||
|
||||
private Map<String, Replica> lookupReplicas(String collectionName, Collection<String> coreNames) throws InterruptedException {
|
||||
Map<String, Replica> result = new HashMap<String, Replica>();
|
||||
long endTime = System.currentTimeMillis() +3000;
|
||||
for(;;) {
|
||||
DocCollection coll = zkStateReader.getClusterState().getCollection(collectionName);
|
||||
for (String coreName : coreNames) {
|
||||
if(result.containsKey(coreName)) continue;
|
||||
for (Slice slice : coll.getSlices()) {
|
||||
for (Replica replica : slice.getReplicas()) {
|
||||
if(coreName.equals(replica.getStr(ZkStateReader.CORE_NAME_PROP))) {
|
||||
result.put(coreName,replica);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(result.size() == coreNames.size()) {
|
||||
return result;
|
||||
}
|
||||
if( System.currentTimeMillis() > endTime) {
|
||||
//time up . throw exception and go out
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to create replica entries in ZK");
|
||||
}
|
||||
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
private void addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
|
||||
String collection = message.getStr(COLLECTION_PROP);
|
||||
String node = message.getStr("node");
|
||||
|
@ -1733,6 +1775,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
|||
}
|
||||
coreName = collection + "_" + shard + "_replica" + replicaNum;
|
||||
}
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
|
||||
if(!Overseer.isLegacy(zkStateReader.getClusterProps())){
|
||||
ZkNodeProps props = new ZkNodeProps(
|
||||
|
@ -1743,6 +1786,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
|||
ZkStateReader.STATE_PROP, ZkStateReader.DOWN,
|
||||
ZkStateReader.BASE_URL_PROP,zkStateReader.getBaseUrlForNodeName(node));
|
||||
Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(props));
|
||||
params.set(CoreAdminParams.CORE_NODE_NAME, lookupReplicas(collection, Collections.singletonList(coreName)).get(coreName).getName());
|
||||
}
|
||||
|
||||
|
||||
|
@ -1751,7 +1795,6 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
|
|||
String dataDir = message.getStr(CoreAdminParams.DATA_DIR);
|
||||
String instanceDir = message.getStr(CoreAdminParams.INSTANCE_DIR);
|
||||
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
|
||||
params.set(CoreAdminParams.NAME, coreName);
|
||||
params.set(COLL_CONF, configName);
|
||||
|
|
|
@ -1262,7 +1262,7 @@ public final class ZkController {
|
|||
String msgNodeName = getNodeName();
|
||||
String msgCore = descriptor.getName();
|
||||
|
||||
if (nodeName.equals(msgNodeName) && core.equals(msgCore)) {
|
||||
if (msgNodeName.equals(nodeName) && core.equals(msgCore)) {
|
||||
descriptor.getCloudDescriptor()
|
||||
.setCoreNodeName(replica.getName());
|
||||
return;
|
||||
|
@ -1354,10 +1354,11 @@ public final class ZkController {
|
|||
|
||||
String coreNodeName = getCoreNodeName(cd);
|
||||
|
||||
checkStateInZk(cd);
|
||||
// before becoming available, make sure we are not live and active
|
||||
// this also gets us our assigned shard id if it was not specified
|
||||
try {
|
||||
checkStateInZk(cd);
|
||||
|
||||
CloudDescriptor cloudDesc = cd.getCloudDescriptor();
|
||||
|
||||
|
||||
|
@ -1385,28 +1386,26 @@ public final class ZkController {
|
|||
|
||||
}
|
||||
|
||||
private void checkStateInZk(CoreDescriptor cd) {
|
||||
if(!Overseer.isLegacy(zkStateReader.getClusterProps())){
|
||||
DocCollection coll = zkStateReader.getClusterState().getCollection(cd.getCollectionName());
|
||||
private void checkStateInZk(CoreDescriptor cd) throws InterruptedException {
|
||||
if (!Overseer.isLegacy(zkStateReader.getClusterProps())) {
|
||||
CloudDescriptor cloudDesc = cd.getCloudDescriptor();
|
||||
if(cloudDesc.getShardId() == null) throw new RuntimeException("No shard id for :"+ cd.toString());
|
||||
Slice slice = coll.getSlice(cloudDesc.getShardId());
|
||||
if(slice == null) throw new RuntimeException("Invalid slice : "+cloudDesc.getShardId());
|
||||
Replica replica = null;
|
||||
if(cloudDesc.getCoreNodeName() !=null){
|
||||
replica = slice.getReplica(cloudDesc.getCoreNodeName());
|
||||
} else {
|
||||
for (Replica r : slice.getReplicas()) {
|
||||
if(cd.getName().equals(r.get(ZkStateReader.CORE_NAME_PROP)) && getBaseUrl().equals(r.get(ZkStateReader.BASE_URL_PROP))){
|
||||
replica = r;
|
||||
break;
|
||||
String coreNodeName = cloudDesc.getCoreNodeName();
|
||||
assert coreNodeName != null;
|
||||
if (cloudDesc.getShardId() == null) throw new SolrException(ErrorCode.SERVER_ERROR ,"No shard id for :" + cd);
|
||||
long endTime = System.currentTimeMillis()+3000;
|
||||
String errMessage= null;
|
||||
for (; System.currentTimeMillis()<endTime; ) {
|
||||
Thread.sleep(100);
|
||||
errMessage = null;
|
||||
Slice slice = zkStateReader.getClusterState().getSlice(cd.getCollectionName(), cloudDesc.getShardId());
|
||||
if (slice == null) {
|
||||
errMessage = "Invalid slice : " + cloudDesc.getShardId();
|
||||
continue;
|
||||
}
|
||||
if (slice.getReplica(coreNodeName) != null) return;
|
||||
}
|
||||
}
|
||||
if(replica == null){
|
||||
throw new RuntimeException(" No such replica in clusterstate "+cd.toString());
|
||||
}
|
||||
|
||||
if(errMessage == null) errMessage = " no_such_replica in clusterstate ,replicaName : " + coreNodeName;
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR,errMessage + "state : "+ zkStateReader.getClusterState().getCollection(cd.getCollectionName()));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.lucene.search.MatchAllDocsQuery;
|
|||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.apache.solr.cloud.CloudDescriptor;
|
||||
import org.apache.solr.cloud.Overseer;
|
||||
import org.apache.solr.cloud.SyncStrategy;
|
||||
import org.apache.solr.cloud.ZkController;
|
||||
import org.apache.solr.common.SolrException;
|
||||
|
@ -489,6 +490,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
|||
protected void handleCreateAction(SolrQueryRequest req, SolrQueryResponse rsp) throws SolrException {
|
||||
|
||||
SolrParams params = req.getParams();
|
||||
log.info("core create command {}", params);
|
||||
CoreDescriptor dcore = buildCoreDescriptor(params, coreContainer);
|
||||
|
||||
if (coreContainer.getAllCoreNames().contains(dcore.getName())) {
|
||||
|
@ -500,6 +502,13 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
|||
// TODO this should be moved into CoreContainer, really...
|
||||
try {
|
||||
if (coreContainer.getZkController() != null) {
|
||||
if(!Overseer.isLegacy(coreContainer.getZkController() .getZkStateReader().getClusterProps())){
|
||||
if(dcore.getCloudDescriptor().getCoreNodeName() ==null) {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR,
|
||||
"non legacy mode coreNodeName missing "+ params);
|
||||
|
||||
}
|
||||
}
|
||||
coreContainer.preRegisterInZk(dcore);
|
||||
}
|
||||
|
||||
|
|
|
@ -631,6 +631,15 @@ public class CollectionsAPIDistributedZkTest extends AbstractFullDistribZkTestBa
|
|||
|
||||
private void testCollectionsAPI() throws Exception {
|
||||
|
||||
boolean disableLegacy = random().nextBoolean();
|
||||
CloudSolrServer client1 = null;
|
||||
|
||||
if(disableLegacy) {
|
||||
log.info("legacyCloud=false");
|
||||
client1 = createCloudClient(null);
|
||||
setClusterProp(client1, ZkStateReader.LEGACY_CLOUD, "false");
|
||||
}
|
||||
|
||||
// TODO: fragile - because we dont pass collection.confName, it will only
|
||||
// find a default if a conf set with a name matching the collection name is found, or
|
||||
// if there is only one conf set. That and the fact that other tests run first in this
|
||||
|
@ -907,6 +916,10 @@ public class CollectionsAPIDistributedZkTest extends AbstractFullDistribZkTestBa
|
|||
checkForCollection(collectionInfos.keySet().iterator().next(), collectionInfos.entrySet().iterator().next().getValue(), createNodeList);
|
||||
|
||||
checkNoTwoShardsUseTheSameIndexDir();
|
||||
if(disableLegacy) {
|
||||
setClusterProp(client1, ZkStateReader.LEGACY_CLOUD, null);
|
||||
client1.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
private void testCollectionsAPIAddRemoveStress() throws Exception {
|
||||
|
|
Loading…
Reference in New Issue