diff --git a/solr/core/src/java/org/apache/solr/cloud/AssignShard.java b/solr/core/src/java/org/apache/solr/cloud/AssignShard.java index 8818a809a47..9c862456e53 100644 --- a/solr/core/src/java/org/apache/solr/cloud/AssignShard.java +++ b/solr/core/src/java/org/apache/solr/cloud/AssignShard.java @@ -41,6 +41,9 @@ public class AssignShard { String returnShardId = null; Map sliceMap = state.getSlicesMap(collection); + + // TODO: now that we create shards ahead of time, is this code needed? Esp since hash ranges aren't assigned when creating via this method? + if (sliceMap == null) { return "shard1"; } @@ -51,6 +54,8 @@ public class AssignShard { return "shard" + (shardIdNames.size() + 1); } + // TODO: don't need to sort to find shard with fewest replicas! + // else figure out which shard needs more replicas final Map map = new HashMap(); for (String shardId : shardIdNames) { diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index 698af23792c..4d3c4a557e6 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -29,6 +29,7 @@ import org.apache.solr.common.cloud.ClosableThread; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocRouter; import org.apache.solr.common.cloud.DocRouter; +import org.apache.solr.common.cloud.ImplicitDocRouter; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.SolrZkClient; @@ -285,10 +286,9 @@ public class Overseer { private ClusterState createCollection(ClusterState state, String collectionName, int numShards) { log.info("Create collection {} with numShards {}", collectionName, numShards); - - DocRouter hp = new DocRouter(); - List ranges = hp.partitionRange(numShards, hp.fullRange()); + DocRouter router = DocRouter.DEFAULT; + List ranges = router.partitionRange(numShards, router.fullRange()); Map newCollections = new LinkedHashMap(); @@ -306,7 +306,6 @@ public class Overseer { // TODO: fill in with collection properties read from the /collections/ node Map collectionProps = defaultCollectionProps(); - DocRouter router = DocRouter.DEFAULT; DocCollection newCollection = new DocCollection(collectionName, newSlices, collectionProps, router); @@ -343,10 +342,12 @@ public class Overseer { DocRouter router; if (coll == null) { - // TODO: is updateSlice really called on a collection that doesn't exist? + // when updateSlice is called on a collection that doesn't exist, it's currently when a core is publishing itself + // without explicitly creating a collection. In this current case, we assume custom sharding with an "implicit" router. slices = new HashMap(1); - props = defaultCollectionProps(); - router = DocRouter.DEFAULT; + props = new HashMap(1); + props.put(DocCollection.DOC_ROUTER, ImplicitDocRouter.NAME); + router = new ImplicitDocRouter(); } else { props = coll.getProperties(); router = coll.getRouter(); diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java index d9cf0166906..545b66c35df 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java @@ -293,7 +293,7 @@ public class CoreAdminHandler extends RequestHandlerBase { // TODO (cloud): get from the current core DocRouter.Range currentRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE); - DocRouter hp = new DocRouter(); + DocRouter hp = DocRouter.DEFAULT; // TODO: get actual doc router for collection if available ranges = hp.partitionRange(partitions, currentRange); if (pathsArr == null) { diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java index 90ddc1ea27e..85c14362780 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java @@ -177,115 +177,43 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { } } - private List setupRequest(int hash) { - List nodes = null; - // if we are in zk mode... - if (zkEnabled) { - // set num nodes - numNodes = zkController.getClusterState().getLiveNodes().size(); - - String shardId = getShard(hash, collection, zkController.getClusterState()); // get the right shard based on the hash... - - try { - ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(zkController.getZkStateReader().getLeaderProps( - collection, shardId)); - - String leaderNodeName = leaderProps.getCoreNodeName(); - String coreName = req.getCore().getName(); - String coreNodeName = zkController.getNodeName() + "_" + coreName; - isLeader = coreNodeName.equals(leaderNodeName); - - DistribPhase phase = - DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)); - - doDefensiveChecks(shardId, phase); - - - if (DistribPhase.FROMLEADER == phase) { - // we are coming from the leader, just go local - add no urls - forwardToLeader = false; - } else if (isLeader) { - // that means I want to forward onto my replicas... - // so get the replicas... - forwardToLeader = false; - List replicaProps = zkController.getZkStateReader() - .getReplicaProps(collection, shardId, zkController.getNodeName(), - coreName, null, ZkStateReader.DOWN); - if (replicaProps != null) { - nodes = new ArrayList(replicaProps.size()); - // check for test param that lets us miss replicas - String[] skipList = req.getParams().getParams("test.distrib.skip.servers"); - Set skipListSet = null; - if (skipList != null) { - skipListSet = new HashSet(skipList.length); - skipListSet.addAll(Arrays.asList(skipList)); - } - - for (ZkCoreNodeProps props : replicaProps) { - if (skipList != null) { - if (!skipListSet.contains(props.getCoreUrl())) { - nodes.add(new StdNode(props)); - } - } else { - nodes.add(new StdNode(props)); - } - } - } - - } else { - // I need to forward onto the leader... - nodes = new ArrayList(1); - nodes.add(new RetryNode(leaderProps, zkController.getZkStateReader(), collection, shardId)); - forwardToLeader = true; - } - - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", - e); - } - } - - return nodes; - } - - - // trying to incrementally get things to pass - private List setupRequest2(int hash, String id, SolrInputDocument doc) { + private List setupRequest(String id, SolrInputDocument doc) { List nodes = null; // if we are in zk mode... if (zkEnabled) { -////// + String coreName = req.getCore().getName(); + String coreNodeName = zkController.getNodeName() + "_" + coreName; + ClusterState cstate = zkController.getClusterState(); numNodes = cstate.getLiveNodes().size(); DocCollection coll = cstate.getCollection(collection); Slice slice = coll.getRouter().getTargetShard(id, doc, req.getParams(), coll); - Replica leader = slice.getLeader(); + if (slice == null) { + // No slice found. Most strict routers will have already thrown an exception, so a null return is + // a signal to use the slice of this core. + // TODO: what if this core is not in the targeted collection? + String shardId = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId(); + slice = coll.getSlice(shardId); + if (slice == null) { + throw new SolrException(ErrorCode.BAD_REQUEST, "No shard " + shardId + " in " + coll); + } + } - -////// - - // set num nodes - numNodes = cstate.getLiveNodes().size(); - - String shardId = getShard(hash, collection, zkController.getClusterState()); // get the right shard based on the hash... - -if (shardId != slice.getName()) { - System.out.println("######################## shardId="+shardId + " slice="+slice + " cstate=" + cstate); -} + String shardId = slice.getName(); try { + // Not equivalent to getLeaderProps, which does retries to find a leader. + // Replica leader = slice.getLeader(); + ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(zkController.getZkStateReader().getLeaderProps( collection, shardId)); String leaderNodeName = leaderProps.getCoreNodeName(); - String coreName = req.getCore().getName(); - String coreNodeName = zkController.getNodeName() + "_" + coreName; isLeader = coreNodeName.equals(leaderNodeName); DistribPhase phase = @@ -343,80 +271,6 @@ if (shardId != slice.getName()) { } - // use old code for now - private List setupRequest(String id, SolrInputDocument doc) { -// return setupRequest2(DocRouter.DEFAULT.shardHash(id, null, null), id, doc); - return setupRequest(DocRouter.DEFAULT.shardHash(id, null, null)); - } - - - private List setupRequestX(String id, SolrInputDocument doc) { - List nodes = null; - - // if we are in zk mode... - if (zkEnabled) { - // set num nodes - ClusterState cstate = zkController.getClusterState(); - numNodes = cstate.getLiveNodes().size(); - DocCollection coll = cstate.getCollection(collection); - Slice slice = coll.getRouter().getTargetShard(id, doc, req.getParams(), coll); - - Replica leader = slice.getLeader(); - - String coreName = req.getCore().getName(); - String coreNodeName = zkController.getNodeName() + "_" + coreName; - isLeader = coreNodeName.equals(leader.getName()); // is this me? - - DistribPhase phase = - DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)); - - doDefensiveChecks(slice.getName(), phase); - - - if (DistribPhase.FROMLEADER == phase) { - // we are coming from the leader, just go local - add no urls - forwardToLeader = false; - } else if (isLeader) { - // that means I want to forward onto my replicas... - // so get the replicas... - forwardToLeader = false; - List replicaProps = zkController.getZkStateReader() - .getReplicaProps(collection, slice.getName(), zkController.getNodeName(), - coreName, null, ZkStateReader.DOWN); - if (replicaProps != null) { - nodes = new ArrayList(replicaProps.size()); - // check for test param that lets us miss replicas - String[] skipList = req.getParams().getParams("test.distrib.skip.servers"); - Set skipListSet = null; - if (skipList != null) { - skipListSet = new HashSet(skipList.length); - skipListSet.addAll(Arrays.asList(skipList)); - } - - for (ZkCoreNodeProps props : replicaProps) { - if (skipList != null) { - if (!skipListSet.contains(props.getCoreUrl())) { - nodes.add(new StdNode(props)); - } - } else { - nodes.add(new StdNode(props)); - } - } - } - - } else { - // I need to forward onto the leader... - nodes = new ArrayList(1); - nodes.add(new RetryNode(new ZkCoreNodeProps(leader), zkController.getZkStateReader(), collection, leader.getName())); - forwardToLeader = true; - } - - } - - return nodes; - } - - private void doDefensiveChecks(String shardId, DistribPhase phase) { String from = req.getParams().get("distrib.from"); boolean logReplay = req.getParams().getBool(LOG_REPLAY, false); diff --git a/solr/core/src/test/org/apache/solr/cloud/TestHashPartitioner.java b/solr/core/src/test/org/apache/solr/cloud/TestHashPartitioner.java index 8f0e6b7e48d..7ffd15f5c7a 100644 --- a/solr/core/src/test/org/apache/solr/cloud/TestHashPartitioner.java +++ b/solr/core/src/test/org/apache/solr/cloud/TestHashPartitioner.java @@ -27,7 +27,7 @@ import org.apache.solr.common.cloud.DocRouter.Range; public class TestHashPartitioner extends SolrTestCaseJ4 { public void testMapHashes() throws Exception { - DocRouter hp = new DocRouter(); + DocRouter hp = DocRouter.DEFAULT; List ranges; // make sure the partitioner uses the "natural" boundaries and doesn't suffer from an off-by-one diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java index bebcd432016..f8c6b69465c 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java @@ -51,9 +51,6 @@ public class ClusterState implements JSONWriter.Writable { private final Set liveNodes; private final Map rangeInfos = new HashMap(); - private final Map> leaders = new HashMap>(); - - /** * Use this constr when ClusterState is meant for publication. @@ -80,9 +77,9 @@ public class ClusterState implements JSONWriter.Writable { /** - * Get properties of a shard/slice leader for specific collection. + * Get properties of a shard/slice leader for specific collection, or null if one currently doesn't exist. */ - public ZkNodeProps getLeader(String collection, String sliceName) { + public Replica getLeader(String collection, String sliceName) { DocCollection coll = collectionStates.get(collection); if (coll == null) return null; Slice slice = coll.getSlice(sliceName); @@ -283,7 +280,7 @@ public class ClusterState implements JSONWriter.Writable { private static DocCollection collectionFromObjects(String name, Map objs) { Map props = (Map)objs.get(DocCollection.PROPERTIES); if (props == null) props = Collections.emptyMap(); - DocRouter router = getRouter(props.get(DocCollection.DOC_ROUTER)); + DocRouter router = DocRouter.getDocRouter(props.get(DocCollection.DOC_ROUTER)); Map slices = makeSlices(objs); return new DocCollection(name, slices, props, router); } @@ -306,19 +303,6 @@ public class ClusterState implements JSONWriter.Writable { return result; } - - private static DocRouter getRouter(Object routerSpec) { - if (routerSpec == null) return new PlainIdRouter(); // back compat with 4.0 - - if (DocRouter.DEFAULT_NAME.equals(routerSpec)) { - return DocRouter.DEFAULT; - } - - // TODO: how to instantiate custom routers? - - throw new SolrException(ErrorCode.SERVER_ERROR, "Unknown document router '"+ routerSpec + "'"); - } - @Override public void write(JSONWriter jsonWriter) { jsonWriter.write(collectionStates); diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocRouter.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocRouter.java index 884bce9ae98..96ae4cb0252 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocRouter.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocRouter.java @@ -25,15 +25,37 @@ import org.apache.solr.common.util.Hash; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * Class to partition int range into n ranges. * @lucene.experimental */ -public class DocRouter { - public static final String DEFAULT_NAME = "compositeId"; - public static final DocRouter DEFAULT = new CompositeIdRouter(); +public abstract class DocRouter { + public static final String DEFAULT_NAME = CompositeIdRouter.NAME; + public static final DocRouter DEFAULT = new CompositeIdRouter(); + + public static DocRouter getDocRouter(Object routerSpec) { + DocRouter router = routerMap.get(routerSpec); + if (router != null) return router; + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown document router '"+ routerSpec + "'"); + } + + // currently just an implementation detail... + private final static Map routerMap; + static { + routerMap = new HashMap(); + PlainIdRouter plain = new PlainIdRouter(); + // instead of doing back compat this way, we could always convert the clusterstate on first read to "plain" if it doesn't have any properties. + routerMap.put(null, plain); // back compat with 4.0 + routerMap.put(PlainIdRouter.NAME, plain); + routerMap.put(CompositeIdRouter.NAME, DEFAULT_NAME.equals(CompositeIdRouter.NAME) ? DEFAULT : new CompositeIdRouter()); + routerMap.put(ImplicitDocRouter.NAME, new ImplicitDocRouter()); + // NOTE: careful that the map keys (the static .NAME members) are filled in by making them final + } + // Hash ranges can't currently "wrap" - i.e. max must be greater or equal to min. // TODO: ranges may not be all contiguous in the future (either that or we will @@ -122,47 +144,57 @@ public class DocRouter { } - public int shardHash(String id, SolrInputDocument sdoc, SolrParams params) { - return Hash.murmurhash3_x86_32(id, 0, id.length(), 0); - } + public abstract Slice getTargetShard(String id, SolrInputDocument sdoc, SolrParams params, DocCollection collection); - public String getId(SolrInputDocument sdoc, SolrParams params) { - Object idObj = sdoc.getFieldValue("id"); // blech - String id = idObj != null ? idObj.toString() : "null"; // should only happen on client side - return id; - } + /* + List shardQuery(String id, SolrParams params, ClusterState state) + List shardQuery(SolrParams params, ClusterState state) + */ + + + +} + +abstract class HashBasedRouter extends DocRouter { + + @Override public Slice getTargetShard(String id, SolrInputDocument sdoc, SolrParams params, DocCollection collection) { if (id == null) id = getId(sdoc, params); int hash = shardHash(id, sdoc, params); return hashToSlice(hash, collection); } + protected int shardHash(String id, SolrInputDocument sdoc, SolrParams params) { + return Hash.murmurhash3_x86_32(id, 0, id.length(), 0); + } + + protected String getId(SolrInputDocument sdoc, SolrParams params) { + Object idObj = sdoc.getFieldValue("id"); // blech + String id = idObj != null ? idObj.toString() : "null"; // should only happen on client side + return id; + } + protected Slice hashToSlice(int hash, DocCollection collection) { for (Slice slice : collection.getSlices()) { DocRouter.Range range = slice.getRange(); if (range != null && range.includes(hash)) return slice; } - // return null or throw exception? throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No slice servicing hash code " + Integer.toHexString(hash) + " in " + collection); } - - /* - List shardQuery(String id, SolrParams params, ClusterState state) - List shardQuery(SolrParams params, ClusterState state) - */ } - -class PlainIdRouter extends DocRouter { - +class PlainIdRouter extends HashBasedRouter { + public static final String NAME = "plain"; } // // user!uniqueid // user,4!uniqueid // -class CompositeIdRouter extends DocRouter { +class CompositeIdRouter extends HashBasedRouter { + public static final String NAME = "compositeId"; + private int separator = '!'; private int bits = 16; private int mask1 = 0xffff0000; @@ -185,7 +217,7 @@ class CompositeIdRouter extends DocRouter { } @Override - public int shardHash(String id, SolrInputDocument doc, SolrParams params) { + protected int shardHash(String id, SolrInputDocument doc, SolrParams params) { int idx = id.indexOf(separator); if (idx < 0) { return Hash.murmurhash3_x86_32(id, 0, id.length(), 0); @@ -212,4 +244,4 @@ class CompositeIdRouter extends DocRouter { return (hash1 & m1) | (hash2 & m2); } -} \ No newline at end of file +} diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ImplicitDocRouter.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ImplicitDocRouter.java new file mode 100644 index 00000000000..040505c1e5e --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ImplicitDocRouter.java @@ -0,0 +1,53 @@ +package org.apache.solr.common.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.SolrParams; + +/** This document router is for custom sharding + */ +public class ImplicitDocRouter extends DocRouter { + public static final String NAME = "implicit"; + + @Override + public Slice getTargetShard(String id, SolrInputDocument sdoc, SolrParams params, DocCollection collection) { + String shard = null; + if (sdoc != null) { + Object o = sdoc.getFieldValue("_shard_"); + if (o != null) { + shard = o.toString(); + } + } + + if (shard == null) { + shard = params.get("_shard_"); + } + + if (shard != null) { + Slice slice = collection.getSlice(shard); + if (slice == null) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No _shard_=" + shard + " in " + collection); + } + } + + return null; // no shard specified... use default. + } + +} diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java index 6f8a0b22713..2a0b59ba656 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java @@ -24,11 +24,12 @@ import java.util.Map; public class Replica extends ZkNodeProps { private final String name; + private final String nodeName; public Replica(String name, Map propMap) { super(propMap); this.name = name; - String nodeName = (String)propMap.get(ZkStateReader.NODE_NAME_PROP); + nodeName = (String)propMap.get(ZkStateReader.NODE_NAME_PROP); assert nodeName == null || name.startsWith(nodeName); } @@ -36,10 +37,13 @@ public class Replica extends ZkNodeProps { return name; } + /** The name of the node this replica resides on */ + public String getNodeName() { + return nodeName; + } + @Override public String toString() { return name + ':' + JSONUtil.toJSON(propMap, -1); // small enough, keep it on one line (i.e. no indent) } - - // TODO: should we have a pointer back to the slice the replica belongs to? } diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java index 7577a601396..60b13652ad4 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java @@ -53,11 +53,17 @@ public class Slice extends ZkNodeProps { if (rangeObj instanceof DocRouter.Range) { tmpRange = (DocRouter.Range)rangeObj; } else if (rangeObj != null) { - DocRouter hp = new DocRouter(); - tmpRange = hp.fromString(rangeObj.toString()); + // Doesn't support custom implementations of Range, but currently not needed. + tmpRange = DocRouter.DEFAULT.fromString(rangeObj.toString()); } range = tmpRange; + /** debugging. this isn't an error condition for custom sharding. + if (range == null) { + System.out.println("###### NO RANGE for " + name + " props=" + props); + } + **/ + replicationFactor = null; // future // add the replicas *after* the other properties (for aesthetics, so it's easy to find slice properties in the JSON output) diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index 2b323da49fe..8b2e88e02a9 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -397,19 +397,22 @@ public class ZkStateReader { } /** - * Get shard leader properties. + * Get shard leader properties, with retry if none exist. */ - public ZkNodeProps getLeaderProps(String collection, String shard) throws InterruptedException { + public Replica getLeaderProps(String collection, String shard) throws InterruptedException { return getLeaderProps(collection, shard, 1000); } - - public ZkNodeProps getLeaderProps(String collection, String shard, int timeout) throws InterruptedException { + + /** + * Get shard leader properties, with retry if none exist. + */ + public Replica getLeaderProps(String collection, String shard, int timeout) throws InterruptedException { long timeoutAt = System.currentTimeMillis() + timeout; while (System.currentTimeMillis() < timeoutAt) { if (clusterState != null) { - final ZkNodeProps nodeProps = clusterState.getLeader(collection, shard); - if (nodeProps != null && getClusterState().liveNodesContain((String) nodeProps.get(ZkStateReader.NODE_NAME_PROP))) { - return nodeProps; + Replica replica = clusterState.getLeader(collection, shard); + if (replica != null && getClusterState().liveNodesContain(replica.getNodeName())) { + return replica; } } Thread.sleep(50);