diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java index 1f194e44f17..07e6cb16200 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.Random; import java.util.TreeSet; import java.util.concurrent.Callable; @@ -294,7 +295,7 @@ public class CloudSolrStream extends TupleStream implements Expressible { ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader(); ClusterState clusterState = zkStateReader.getClusterState(); - + Set liveNodes = clusterState.getLiveNodes(); //System.out.println("Connected to zk an got cluster state."); Collection slices = clusterState.getActiveSlices(this.collection); @@ -302,7 +303,7 @@ public class CloudSolrStream extends TupleStream implements Expressible { if(slices == null) { //Try case insensitive match for(String col : clusterState.getCollections()) { - if(col.equalsIgnoreCase(this.collection)) { + if(col.equalsIgnoreCase(collection)) { slices = clusterState.getActiveSlices(col); break; } @@ -319,6 +320,7 @@ public class CloudSolrStream extends TupleStream implements Expressible { Collection replicas = slice.getReplicas(); List shuffler = new ArrayList(); for(Replica replica : replicas) { + if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) shuffler.add(replica); } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java index c75738fe02e..28b1c6e7f05 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Set; import java.util.Iterator; import java.util.List; import java.util.Locale; @@ -239,11 +240,13 @@ public class ParallelStream extends CloudSolrStream implements Expressible { ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader(); ClusterState clusterState = zkStateReader.getClusterState(); + Set liveNodes = clusterState.getLiveNodes(); Collection slices = clusterState.getActiveSlices(this.collection); List shuffler = new ArrayList(); for(Slice slice : slices) { Collection replicas = slice.getReplicas(); for (Replica replica : replicas) { + if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) shuffler.add(replica); } }