diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/Bucket.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/Bucket.java index 12ee2815e5d..8e2293c1801 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/Bucket.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/Bucket.java @@ -22,6 +22,7 @@ import org.apache.solr.client.solrj.io.Tuple; public class Bucket implements Serializable { private static final long serialVersionUID = 1; + private static final String NULL_VALUE = "NULL"; private String bucketKey; @@ -33,8 +34,14 @@ public class Bucket implements Serializable { this.bucketKey = bucketKey; } - public Object getBucketValue(Tuple tuple) { - return tuple.get(bucketKey); + public Object getBucketValue(Tuple tuple) + { + Object o = tuple.get(bucketKey); + if(o == null) { + return NULL_VALUE; + } else { + return o; + } } public String toString() { diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java index 3091a19083f..61253e1d9ee 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java @@ -1364,6 +1364,52 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { assertTrue(count.doubleValue() == 2); + //Test will null value in the grouping field + indexr(id, "12", "a_s", null, "a_i", "14", "a_f", "10"); + commit(); + + paramsA = mapParams("q","*:*","fl","a_s,a_i,a_f","sort", "a_s asc", "qt", "/export"); + stream = new CloudSolrStream(zkHost, "collection1", paramsA); + + Bucket[] buckets1 = {new Bucket("a_s")}; + + Metric[] metrics1 = {new SumMetric("a_i"), + new SumMetric("a_f"), + new MinMetric("a_i"), + new MinMetric("a_f"), + new MaxMetric("a_i"), + new MaxMetric("a_f"), + new MeanMetric("a_i"), + new MeanMetric("a_f"), + new CountMetric()}; + + rollupStream = new RollupStream(stream, buckets1, metrics1); + tuples = getTuples(rollupStream); + //Check that we've got the extra NULL bucket + assert(tuples.size() == 4); + tuple = tuples.get(0); + assert(tuple.getString("a_s").equals("NULL")); + + sumi = tuple.getDouble("sum(a_i)"); + sumf = tuple.getDouble("sum(a_f)"); + mini = tuple.getDouble("min(a_i)"); + minf = tuple.getDouble("min(a_f)"); + maxi = tuple.getDouble("max(a_i)"); + maxf = tuple.getDouble("max(a_f)"); + avgi = tuple.getDouble("avg(a_i)"); + avgf = tuple.getDouble("avg(a_f)"); + count = tuple.getDouble("count(*)"); + + assertTrue(sumi.doubleValue() == 14.0D); + assertTrue(sumf.doubleValue() == 10.0D); + assertTrue(mini.doubleValue() == 14.0D); + assertTrue(minf.doubleValue() == 10.0D); + assertTrue(maxi.doubleValue() == 14.0D); + assertTrue(maxf.doubleValue() == 10.0D); + assertTrue(avgi.doubleValue() == 14.0D); + assertTrue(avgf.doubleValue() == 10.0D); + assertTrue(count.doubleValue() == 1); + del("*:*"); commit(); @@ -1577,7 +1623,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { String zkHost = zkServer.getZkAddress(); streamFactory.withCollectionZkHost("collection1", zkHost); - Map paramsA = mapParams("q","blah","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s"); + Map paramsA = mapParams("q", "blah", "fl", "id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s"); CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA); ReducerStream rstream = new ReducerStream(stream, new FieldEqualitor("a_s"),