SOLR-7225: ParallelStream chooses workers incorrectly

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1665565 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Joel Bernstein 2015-03-10 14:25:53 +00:00
parent ca8e4be1d3
commit ce47eee2f4
2 changed files with 49 additions and 14 deletions

View File

@ -129,30 +129,36 @@ public class ParallelStream extends CloudSolrStream {
ClusterState clusterState = zkStateReader.getClusterState();
Collection<Slice> slices = clusterState.getActiveSlices(this.collection);
long time = System.currentTimeMillis();
int workerNum = 0;
List<Replica> shuffler = new ArrayList();
for(Slice slice : slices) {
HashMap params = new HashMap();
params.put("distrib","false"); // We are the aggregator.
params.put("numWorkers", workers);
params.put("workerID", workerNum);
params.put("stream", this.encoded);
params.put("qt","/stream");
Collection<Replica> replicas = slice.getReplicas();
List<Replica> shuffler = new ArrayList();
for(Replica replica : replicas) {
for (Replica replica : replicas) {
shuffler.add(replica);
}
}
Collections.shuffle(shuffler, new Random(time));
Replica rep = shuffler.get(0);
if(workers > shuffler.size()) {
throw new IOException("Number of workers exceeds nodes in the worker collection");
}
Collections.shuffle(shuffler, new Random(time));
for(int w=0; w<workers; w++) {
HashMap params = new HashMap();
params.put("distrib","false"); // We are the aggregator.
params.put("numWorkers", workers);
params.put("workerID", w);
params.put("stream", this.encoded);
params.put("qt","/stream");
Replica rep = shuffler.get(w);
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
String url = zkProps.getCoreUrl();
SolrStream solrStream = new SolrStream(url, params);
solrStreams.add(solrStream);
++workerNum;
}
assert(solrStreams.size() == workers);
} catch (Exception e) {
throw new IOException(e);
}

View File

@ -907,6 +907,34 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
commit();
}
private void testParallelStreamSingleWorker() throws Exception {
indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
commit();
String zkHost = zkServer.getZkAddress();
Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_s asc", "partitionKeys","a_s");
CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
Map paramsB = mapParams("q","id:(0 2)","fl","a_s","sort", "a_s asc", "partitionKeys","a_s");
CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
FilterStream fstream = new FilterStream(streamA, streamB, new AscFieldComp("a_s"));
ParallelStream pstream = new ParallelStream(zkHost,"collection1", fstream, 1, new AscFieldComp("a_s"));
List<Tuple> tuples = getTuples(pstream);
assert(tuples.size() == 2);
assertOrder(tuples, 0,2);
del("*:*");
commit();
}
private void testParallelHashJoinStream() {
@ -1119,6 +1147,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
testHashJoinStream();
testMergeJoinStream();
testMergeStream();
testParallelStreamSingleWorker();
testParallelStream();
testParallelRollupStream();
testParallelMetricStream();