mirror of https://github.com/apache/lucene.git
SOLR-8461: CloudSolrStream and ParallelStream can choose replicas that are not active
This commit is contained in:
parent
83ebd1bb71
commit
8eb58cc000
|
@ -26,6 +26,7 @@ import java.util.List;
|
|||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.Random;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.Callable;
|
||||
|
@ -294,7 +295,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
|
|||
|
||||
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
|
||||
ClusterState clusterState = zkStateReader.getClusterState();
|
||||
|
||||
Set<String> liveNodes = clusterState.getLiveNodes();
|
||||
//System.out.println("Connected to zk an got cluster state.");
|
||||
|
||||
Collection<Slice> slices = clusterState.getActiveSlices(this.collection);
|
||||
|
@ -302,7 +303,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
|
|||
if(slices == null) {
|
||||
//Try case insensitive match
|
||||
for(String col : clusterState.getCollections()) {
|
||||
if(col.equalsIgnoreCase(this.collection)) {
|
||||
if(col.equalsIgnoreCase(collection)) {
|
||||
slices = clusterState.getActiveSlices(col);
|
||||
break;
|
||||
}
|
||||
|
@ -319,6 +320,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
|
|||
Collection<Replica> replicas = slice.getReplicas();
|
||||
List<Replica> shuffler = new ArrayList();
|
||||
for(Replica replica : replicas) {
|
||||
if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
|
||||
shuffler.add(replica);
|
||||
}
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.util.ArrayList;
|
|||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Set;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
|
@ -239,11 +240,13 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
|
|||
|
||||
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
|
||||
ClusterState clusterState = zkStateReader.getClusterState();
|
||||
Set<String> liveNodes = clusterState.getLiveNodes();
|
||||
Collection<Slice> slices = clusterState.getActiveSlices(this.collection);
|
||||
List<Replica> shuffler = new ArrayList();
|
||||
for(Slice slice : slices) {
|
||||
Collection<Replica> replicas = slice.getReplicas();
|
||||
for (Replica replica : replicas) {
|
||||
if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
|
||||
shuffler.add(replica);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue