SOLR-8924: RollupStream breaks with null values in the group by buckets

This commit is contained in:
jbernste 2016-03-31 11:39:55 -04:00
parent 6e446c0b92
commit 0b2040d61c
2 changed files with 56 additions and 3 deletions

View File

@ -22,6 +22,7 @@ import org.apache.solr.client.solrj.io.Tuple;
public class Bucket implements Serializable {
private static final long serialVersionUID = 1;
private static final String NULL_VALUE = "NULL";
private String bucketKey;
@ -33,8 +34,14 @@ public class Bucket implements Serializable {
this.bucketKey = bucketKey;
}
public Object getBucketValue(Tuple tuple) {
return tuple.get(bucketKey);
public Object getBucketValue(Tuple tuple)
{
Object o = tuple.get(bucketKey);
if(o == null) {
return NULL_VALUE;
} else {
return o;
}
}
public String toString() {

View File

@ -1364,6 +1364,52 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
assertTrue(count.doubleValue() == 2);
//Test will null value in the grouping field
indexr(id, "12", "a_s", null, "a_i", "14", "a_f", "10");
commit();
paramsA = mapParams("q","*:*","fl","a_s,a_i,a_f","sort", "a_s asc", "qt", "/export");
stream = new CloudSolrStream(zkHost, "collection1", paramsA);
Bucket[] buckets1 = {new Bucket("a_s")};
Metric[] metrics1 = {new SumMetric("a_i"),
new SumMetric("a_f"),
new MinMetric("a_i"),
new MinMetric("a_f"),
new MaxMetric("a_i"),
new MaxMetric("a_f"),
new MeanMetric("a_i"),
new MeanMetric("a_f"),
new CountMetric()};
rollupStream = new RollupStream(stream, buckets1, metrics1);
tuples = getTuples(rollupStream);
//Check that we've got the extra NULL bucket
assert(tuples.size() == 4);
tuple = tuples.get(0);
assert(tuple.getString("a_s").equals("NULL"));
sumi = tuple.getDouble("sum(a_i)");
sumf = tuple.getDouble("sum(a_f)");
mini = tuple.getDouble("min(a_i)");
minf = tuple.getDouble("min(a_f)");
maxi = tuple.getDouble("max(a_i)");
maxf = tuple.getDouble("max(a_f)");
avgi = tuple.getDouble("avg(a_i)");
avgf = tuple.getDouble("avg(a_f)");
count = tuple.getDouble("count(*)");
assertTrue(sumi.doubleValue() == 14.0D);
assertTrue(sumf.doubleValue() == 10.0D);
assertTrue(mini.doubleValue() == 14.0D);
assertTrue(minf.doubleValue() == 10.0D);
assertTrue(maxi.doubleValue() == 14.0D);
assertTrue(maxf.doubleValue() == 10.0D);
assertTrue(avgi.doubleValue() == 14.0D);
assertTrue(avgf.doubleValue() == 10.0D);
assertTrue(count.doubleValue() == 1);
del("*:*");
commit();
@ -1577,7 +1623,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
String zkHost = zkServer.getZkAddress();
streamFactory.withCollectionZkHost("collection1", zkHost);
Map paramsA = mapParams("q","blah","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
Map paramsA = mapParams("q", "blah", "fl", "id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
ReducerStream rstream = new ReducerStream(stream,
new FieldEqualitor("a_s"),