SOLR-9141: Fix ClassCastException when using the /sql handler count() function with single-shard collections

This commit is contained in:
jdyer1 2016-05-26 13:39:37 -05:00
parent dc50b79a14
commit 4d4030350b
3 changed files with 29 additions and 24 deletions

View File

@ -236,6 +236,9 @@ Bug Fixes
* SOLR-9151: Fix SolrCLI so that bin/solr -e cloud example can be run from any CWD (janhoy) * SOLR-9151: Fix SolrCLI so that bin/solr -e cloud example can be run from any CWD (janhoy)
* SOLR-9141: Fix ClassCastException when using the /sql handler count() function with
single-shard collections (Minoru Osuka via James Dyer)
Optimizations Optimizations
---------------------- ----------------------
* SOLR-8722: Don't force a full ZkStateReader refresh on every Overseer operation. * SOLR-8722: Don't force a full ZkStateReader refresh on every Overseer operation.

View File

@ -503,7 +503,7 @@ public class FacetStream extends TupleStream implements Expressible {
t.put(identifier, d); t.put(identifier, d);
++m; ++m;
} else { } else {
long l = (long)bucket.get("count"); long l = ((Number)bucket.get("count")).longValue();
t.put("count(*)", l); t.put("count(*)", l);
} }
} }

View File

@ -74,13 +74,18 @@ public class StreamingTest extends SolrCloudTestCase {
private static String zkHost; private static String zkHost;
private static int numShards;
private static int numWorkers;
@BeforeClass @BeforeClass
public static void configureCluster() throws Exception { public static void configureCluster() throws Exception {
configureCluster(2) numShards = random().nextInt(2) + 1; //1 - 3
numWorkers = numShards > 2 ? random().nextInt(numShards - 1) + 1 : numShards;
configureCluster(numShards)
.addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf")) .addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
.configure(); .configure();
CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient()); CollectionAdminRequest.createCollection(COLLECTION, "conf", numShards, 1).process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(), false, true, TIMEOUT); AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(), false, true, TIMEOUT);
zkHost = cluster.getZkServer().getZkAddress(); zkHost = cluster.getZkServer().getZkAddress();
@ -147,12 +152,11 @@ public class StreamingTest extends SolrCloudTestCase {
SolrParams sParamsA = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "none"); SolrParams sParamsA = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "none");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA); CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, stream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING)); ParallelStream pstream = parallelStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
attachStreamFactory(pstream); attachStreamFactory(pstream);
List<Tuple> tuples = getTuples(pstream); List<Tuple> tuples = getTuples(pstream);
assert(tuples.size() == 20); // Each tuple will be double counted. assert(tuples.size() == (10 * numWorkers)); // Each tuple will be double counted.
} }
@ -174,7 +178,7 @@ public class StreamingTest extends SolrCloudTestCase {
SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc", "partitionKeys", "a_f"); SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc", "partitionKeys", "a_f");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParams); CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f")); UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, ustream, 2, new FieldComparator("a_f",ComparatorOrder.ASCENDING)); ParallelStream pstream = parallelStream(ustream, new FieldComparator("a_f", ComparatorOrder.ASCENDING));
attachStreamFactory(pstream); attachStreamFactory(pstream);
List<Tuple> tuples = getTuples(pstream); List<Tuple> tuples = getTuples(pstream);
assert(tuples.size() == 5); assert(tuples.size() == 5);
@ -183,7 +187,7 @@ public class StreamingTest extends SolrCloudTestCase {
//Test the eofTuples //Test the eofTuples
Map<String,Tuple> eofTuples = pstream.getEofTuples(); Map<String,Tuple> eofTuples = pstream.getEofTuples();
assert(eofTuples.size() == 2); //There should be an EOF tuple for each worker. assert(eofTuples.size() == numWorkers); //There should be an EOF tuple for each worker.
} }
@ -253,7 +257,7 @@ public class StreamingTest extends SolrCloudTestCase {
SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i"); SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParams); CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
RankStream rstream = new RankStream(stream, 11, new FieldComparator("a_i",ComparatorOrder.DESCENDING)); RankStream rstream = new RankStream(stream, 11, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, rstream, 2, new FieldComparator("a_i",ComparatorOrder.DESCENDING)); ParallelStream pstream = parallelStream(rstream, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
attachStreamFactory(pstream); attachStreamFactory(pstream);
List<Tuple> tuples = getTuples(pstream); List<Tuple> tuples = getTuples(pstream);
@ -405,9 +409,7 @@ public class StreamingTest extends SolrCloudTestCase {
ReducerStream rstream = new ReducerStream(stream, ReducerStream rstream = new ReducerStream(stream,
new FieldEqualitor("a_s"), new FieldEqualitor("a_s"),
new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5)); new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5));
ParallelStream pstream = parallelStream(rstream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, rstream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
attachStreamFactory(pstream); attachStreamFactory(pstream);
List<Tuple> tuples = getTuples(pstream); List<Tuple> tuples = getTuples(pstream);
@ -433,9 +435,7 @@ public class StreamingTest extends SolrCloudTestCase {
rstream = new ReducerStream(stream, rstream = new ReducerStream(stream,
new FieldEqualitor("a_s"), new FieldEqualitor("a_s"),
new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 3)); new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 3));
pstream = parallelStream(rstream, new FieldComparator("a_s", ComparatorOrder.DESCENDING));
pstream = new ParallelStream(zkHost, COLLECTION, rstream, 2, new FieldComparator("a_s",ComparatorOrder.DESCENDING));
attachStreamFactory(pstream); attachStreamFactory(pstream);
tuples = getTuples(pstream); tuples = getTuples(pstream);
@ -1401,7 +1401,7 @@ public class StreamingTest extends SolrCloudTestCase {
new CountMetric()}; new CountMetric()};
RollupStream rollupStream = new RollupStream(stream, buckets, metrics); RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
ParallelStream parallelStream = new ParallelStream(zkHost, COLLECTION, rollupStream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING)); ParallelStream parallelStream = parallelStream(rollupStream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
attachStreamFactory(parallelStream); attachStreamFactory(parallelStream);
List<Tuple> tuples = getTuples(parallelStream); List<Tuple> tuples = getTuples(parallelStream);
@ -1501,9 +1501,7 @@ public class StreamingTest extends SolrCloudTestCase {
ReducerStream rstream = new ReducerStream(stream, ReducerStream rstream = new ReducerStream(stream,
new FieldEqualitor("a_s"), new FieldEqualitor("a_s"),
new GroupOperation(new FieldComparator("a_s", ComparatorOrder.ASCENDING), 2)); new GroupOperation(new FieldComparator("a_s", ComparatorOrder.ASCENDING), 2));
ParallelStream pstream = parallelStream(rstream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, rstream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
attachStreamFactory(pstream); attachStreamFactory(pstream);
List<Tuple> tuples = getTuples(pstream); List<Tuple> tuples = getTuples(pstream);
assert(tuples.size() == 0); assert(tuples.size() == 0);
@ -1636,7 +1634,7 @@ public class StreamingTest extends SolrCloudTestCase {
CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTION, sParamsB); CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTION, sParamsB);
MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING)); MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, mstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING)); ParallelStream pstream = parallelStream(mstream, new FieldComparator("a_i", ComparatorOrder.ASCENDING));
attachStreamFactory(pstream); attachStreamFactory(pstream);
List<Tuple> tuples = getTuples(pstream); List<Tuple> tuples = getTuples(pstream);
@ -1651,7 +1649,7 @@ public class StreamingTest extends SolrCloudTestCase {
streamB = new CloudSolrStream(zkHost, COLLECTION, sParamsB); streamB = new CloudSolrStream(zkHost, COLLECTION, sParamsB);
mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.DESCENDING)); mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
pstream = new ParallelStream(zkHost, COLLECTION, mstream, 2, new FieldComparator("a_i",ComparatorOrder.DESCENDING)); pstream = parallelStream(mstream, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
attachStreamFactory(pstream); attachStreamFactory(pstream);
tuples = getTuples(pstream); tuples = getTuples(pstream);
@ -1684,14 +1682,13 @@ public class StreamingTest extends SolrCloudTestCase {
CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTION, sParamsB); CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTION, sParamsB);
MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING)); MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, mstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING)); ParallelStream pstream = parallelStream(mstream, new FieldComparator("a_i", ComparatorOrder.ASCENDING));
attachStreamFactory(pstream); attachStreamFactory(pstream);
List<Tuple> tuples = getTuples(pstream); List<Tuple> tuples = getTuples(pstream);
assert(tuples.size() == 9); assert(tuples.size() == 9);
Map<String, Tuple> eofTuples = pstream.getEofTuples(); Map<String, Tuple> eofTuples = pstream.getEofTuples();
assert(eofTuples.size() == 2); // There should be an EOF Tuple for each worker. assert(eofTuples.size() == numWorkers); // There should be an EOF Tuple for each worker.
} }
@ -1835,4 +1832,9 @@ public class StreamingTest extends SolrCloudTestCase {
return params; return params;
} }
private ParallelStream parallelStream(TupleStream stream, FieldComparator comparator) throws IOException {
ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, stream, numWorkers, comparator);
return pstream;
}
} }