SOLR-7352: Synchronize CloudSolrStream EOF Tuple Map

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1671547 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Joel Bernstein 2015-04-06 15:12:44 +00:00
parent 19c70cab54
commit 69fdcfe306
2 changed files with 66 additions and 4 deletions

View File

@ -108,7 +108,7 @@ public class CloudSolrStream extends TupleStream {
public void open() throws IOException {
this.tuples = new TreeSet();
this.solrStreams = new ArrayList();
this.eofTuples = new HashMap();
this.eofTuples = Collections.synchronizedMap(new HashMap());
if(this.cache != null) {
this.cloudSolrClient = this.cache.getCloudSolrClient(zkHost);
} else {

View File

@ -228,7 +228,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
String zkHost = zkServer.getZkAddress();
Map params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc");
Map params = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
RankStream rstream = new RankStream(stream, 3, new DescFieldComp("a_i"));
List<Tuple> tuples = getTuples(rstream);
@ -259,7 +259,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
String zkHost = zkServer.getZkAddress();
Map params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
Map params = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
RankStream rstream = new RankStream(stream, 11, new DescFieldComp("a_i"));
ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new DescFieldComp("a_i"));
@ -335,7 +335,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
Tuple t0 = tuples.get(0);
List<Map> maps0 = t0.getMaps();
assertMaps(maps0, 0, 2,1, 9);
assertMaps(maps0, 0, 2, 1, 9);
Tuple t1 = tuples.get(1);
List<Map> maps1 = t1.getMaps();
@ -351,6 +351,37 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
commit();
}
private void testZeroReducerStream() throws Exception {
//Gracefully handle zero results
indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
commit();
String zkHost = zkServer.getZkAddress();
//Test with spaces in the parameter lists.
Map paramsA = mapParams("q", "blah", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
ReducerStream rstream = new ReducerStream(stream, new AscFieldComp("a_s"));
List<Tuple> tuples = getTuples(rstream);
assert(tuples.size() == 0);
del("*:*");
commit();
}
private void testParallelReducerStream() throws Exception {
@ -423,6 +454,35 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
commit();
}
private void testZeroParallelReducerStream() throws Exception {
indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
commit();
String zkHost = zkServer.getZkAddress();
Map paramsA = mapParams("q","blah","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
ReducerStream rstream = new ReducerStream(stream, new AscFieldComp("a_s"));
ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new AscFieldComp("a_s"));
List<Tuple> tuples = getTuples(pstream);
assert(tuples.size() == 0);
del("*:*");
commit();
}
private void testTuple() throws Exception {
indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "5.1", "s_multi", "a", "s_multi", "b", "i_multi", "1", "i_multi", "2", "f_multi", "1.2", "f_multi", "1.3");
@ -698,11 +758,13 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
testRankStream();
testMergeStream();
testReducerStream();
testZeroReducerStream();
testParallelEOF();
testParallelUniqueStream();
testParallelRankStream();
testParallelMergeStream();
testParallelReducerStream();
testZeroParallelReducerStream();
}
protected Map mapParams(String... vals) {