From 58c98f703b2faaf510bd390dc63698fd059de8b2 Mon Sep 17 00:00:00 2001 From: Noble Paul Date: Thu, 27 Feb 2014 06:50:22 +0000 Subject: [PATCH] SOLR-5609 use coreNodeName to compare replicas, CollectionsAPIDistributedZkTest.testCollectionsAPI() randomly switches to legacyCloud=false git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1572449 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/solr/cloud/Overseer.java | 3 +- .../cloud/OverseerCollectionProcessor.java | 51 +++++++++++++++++-- .../org/apache/solr/cloud/ZkController.java | 41 ++++++++------- .../solr/handler/admin/CoreAdminHandler.java | 9 ++++ .../CollectionsAPIDistributedZkTest.java | 15 +++++- 5 files changed, 91 insertions(+), 28 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index 48348581a09..ff38e0a6b54 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -305,8 +305,7 @@ public class Overseer { makeMap( ZkStateReader.CORE_NAME_PROP, message.getStr(ZkStateReader.CORE_NAME_PROP), ZkStateReader.BASE_URL_PROP,message.getStr(ZkStateReader.BASE_URL_PROP), - ZkStateReader.STATE_PROP,message.getStr(ZkStateReader.STATE_PROP), - ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName)); + ZkStateReader.STATE_PROP,message.getStr(ZkStateReader.STATE_PROP))); sl.getReplicasMap().put(coreNodeName, replica); return clusterState; } diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java index 069854919dc..f15a5b2bd95 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java @@ -74,6 +74,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashMap; import java.util.List; @@ -1624,6 +1625,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread { throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully createcollection: " + message.getStr("name")); log.info("going to create cores replicas shardNames {} , repFactor : {}", shardNames, repFactor); + Map coresToCreate = new LinkedHashMap(); for (int i = 1; i <= shardNames.size(); i++) { String sliceName = shardNames.get(i-1); for (int j = 1; j <= repFactor; j++) { @@ -1662,13 +1664,22 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread { ShardRequest sreq = new ShardRequest(); params.set("qt", adminPath); sreq.purpose = 1; - String replica = zkStateReader.getBaseUrlForNodeName(nodeName); - sreq.shards = new String[] {replica}; + sreq.shards = new String[] {baseUrl}; sreq.actualShards = sreq.shards; sreq.params = params; - shardHandler.submit(sreq, replica, sreq.params); + if(isLegacyCloud) shardHandler.submit(sreq, sreq.shards[0], sreq.params); + else coresToCreate.put(coreName, sreq); + } + } + if(!isLegacyCloud) { + //wait for all replica entries to be created + Map replicas = lookupReplicas(collectionName, coresToCreate.keySet()); + for (Map.Entry e : coresToCreate.entrySet()) { + ShardRequest sreq = e.getValue(); + sreq.params.set(CoreAdminParams.CORE_NODE_NAME, replicas.get(e.getKey()).getName()); + shardHandler.submit(sreq, sreq.shards[0], sreq.params); } } @@ -1690,6 +1701,37 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread { } } + private Map lookupReplicas(String collectionName, Collection coreNames) throws InterruptedException { + Map result = new HashMap(); + long endTime = System.currentTimeMillis() +3000; + for(;;) { + DocCollection coll = zkStateReader.getClusterState().getCollection(collectionName); + for (String coreName : coreNames) { + if(result.containsKey(coreName)) continue; + for (Slice slice : coll.getSlices()) { + for (Replica replica : slice.getReplicas()) { + if(coreName.equals(replica.getStr(ZkStateReader.CORE_NAME_PROP))) { + result.put(coreName,replica); + break; + } + } + } + } + + if(result.size() == coreNames.size()) { + return result; + } + if( System.currentTimeMillis() > endTime) { + //time up . throw exception and go out + throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to create replica entries in ZK"); + } + + Thread.sleep(100); + } + + } + + private void addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException { String collection = message.getStr(COLLECTION_PROP); String node = message.getStr("node"); @@ -1733,6 +1775,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread { } coreName = collection + "_" + shard + "_replica" + replicaNum; } + ModifiableSolrParams params = new ModifiableSolrParams(); if(!Overseer.isLegacy(zkStateReader.getClusterProps())){ ZkNodeProps props = new ZkNodeProps( @@ -1743,6 +1786,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread { ZkStateReader.STATE_PROP, ZkStateReader.DOWN, ZkStateReader.BASE_URL_PROP,zkStateReader.getBaseUrlForNodeName(node)); Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(props)); + params.set(CoreAdminParams.CORE_NODE_NAME, lookupReplicas(collection, Collections.singletonList(coreName)).get(coreName).getName()); } @@ -1751,7 +1795,6 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread { String dataDir = message.getStr(CoreAdminParams.DATA_DIR); String instanceDir = message.getStr(CoreAdminParams.INSTANCE_DIR); - ModifiableSolrParams params = new ModifiableSolrParams(); params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString()); params.set(CoreAdminParams.NAME, coreName); params.set(COLL_CONF, configName); diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index debf0c080b8..4a747779923 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -1262,7 +1262,7 @@ public final class ZkController { String msgNodeName = getNodeName(); String msgCore = descriptor.getName(); - if (nodeName.equals(msgNodeName) && core.equals(msgCore)) { + if (msgNodeName.equals(nodeName) && core.equals(msgCore)) { descriptor.getCloudDescriptor() .setCoreNodeName(replica.getName()); return; @@ -1354,10 +1354,11 @@ public final class ZkController { String coreNodeName = getCoreNodeName(cd); - checkStateInZk(cd); // before becoming available, make sure we are not live and active // this also gets us our assigned shard id if it was not specified try { + checkStateInZk(cd); + CloudDescriptor cloudDesc = cd.getCloudDescriptor(); @@ -1385,28 +1386,26 @@ public final class ZkController { } - private void checkStateInZk(CoreDescriptor cd) { - if(!Overseer.isLegacy(zkStateReader.getClusterProps())){ - DocCollection coll = zkStateReader.getClusterState().getCollection(cd.getCollectionName()); + private void checkStateInZk(CoreDescriptor cd) throws InterruptedException { + if (!Overseer.isLegacy(zkStateReader.getClusterProps())) { CloudDescriptor cloudDesc = cd.getCloudDescriptor(); - if(cloudDesc.getShardId() == null) throw new RuntimeException("No shard id for :"+ cd.toString()); - Slice slice = coll.getSlice(cloudDesc.getShardId()); - if(slice == null) throw new RuntimeException("Invalid slice : "+cloudDesc.getShardId()); - Replica replica = null; - if(cloudDesc.getCoreNodeName() !=null){ - replica = slice.getReplica(cloudDesc.getCoreNodeName()); - } else { - for (Replica r : slice.getReplicas()) { - if(cd.getName().equals(r.get(ZkStateReader.CORE_NAME_PROP)) && getBaseUrl().equals(r.get(ZkStateReader.BASE_URL_PROP))){ - replica = r; - break; - } + String coreNodeName = cloudDesc.getCoreNodeName(); + assert coreNodeName != null; + if (cloudDesc.getShardId() == null) throw new SolrException(ErrorCode.SERVER_ERROR ,"No shard id for :" + cd); + long endTime = System.currentTimeMillis()+3000; + String errMessage= null; + for (; System.currentTimeMillis()