diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 63dd27b99a0..a556b3326a8 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -91,6 +91,8 @@ New Features * SOLR-8198: Change ReducerStream to use StreamEqualitor instead of StreamComparator (Dennis Gove) +* SOLR-8268: StatsStream now implements the Expressible interface (Dennis Gove) + Optimizations ---------------------- * SOLR-7876: Speed up queries and operations that use many terms when timeAllowed has not been diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java index 64192035fdc..68b40aecb4d 100644 --- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java @@ -36,6 +36,7 @@ import org.apache.solr.client.solrj.io.stream.ParallelStream; import org.apache.solr.client.solrj.io.stream.RankStream; import org.apache.solr.client.solrj.io.stream.ReducerStream; import org.apache.solr.client.solrj.io.stream.RollupStream; +import org.apache.solr.client.solrj.io.stream.StatsStream; import org.apache.solr.client.solrj.io.stream.StreamContext; import org.apache.solr.client.solrj.io.stream.TupleStream; import org.apache.solr.client.solrj.io.stream.UniqueStream; @@ -98,6 +99,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware { .withFunctionName("group", ReducerStream.class) .withFunctionName("parallel", ParallelStream.class) .withFunctionName("rollup", RollupStream.class) + .withFunctionName("stats", StatsStream.class) // metrics .withFunctionName("min", MinMetric.class) diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java index 27580316894..4fba9c4d280 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java +++ 
b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java @@ -18,21 +18,30 @@ package org.apache.solr.client.solrj.io.stream; */ import java.io.IOException; -import java.util.HashMap; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.Map.Entry; + import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; import org.apache.solr.client.solrj.io.stream.metrics.Metric; import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.util.NamedList; -public class StatsStream extends TupleStream { +public class StatsStream extends TupleStream implements Expressible { private static final long serialVersionUID = 1; @@ -51,11 +60,92 @@ public class StatsStream extends TupleStream { String collection, Map props, Metric[] metrics) { + init(zkHost, collection, props, metrics); + } + + private void init(String zkHost, String collection, Map props, Metric[] metrics) { this.zkHost = zkHost; this.props = props; this.metrics = metrics; this.collection = collection; } + + public StatsStream(StreamExpression expression, StreamFactory factory) throws IOException{ + // grab all parameters out + String collectionName = factory.getValueOperand(expression, 
0); + List namedParams = factory.getNamedOperands(expression); + List metricExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Metric.class); + StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost"); + + // Validate there are no unknown parameters - zkHost is itself a named parameter, so it is already counted in namedParams and must not be counted twice + if(expression.getParameters().size() != 1 + namedParams.size() + metricExpressions.size()){ + throw new IOException(String.format(Locale.ROOT,"invalid expression %s - unknown operands found",expression)); + } + + // Collection Name + if(null == collectionName){ + throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression)); + } + + // Named parameters - passed directly to solr as solrparams + if(0 == namedParams.size()){ + throw new IOException(String.format(Locale.ROOT,"invalid expression %s - at least one named parameter expected. eg. 'q=*:*'",expression)); + } + + Map params = new HashMap(); + for(StreamExpressionNamedParameter namedParam : namedParams){ + if(!namedParam.getName().equals("zkHost")){ + params.put(namedParam.getName(), namedParam.getParameter().toString().trim()); + } + } + + // zkHost, optional - if not provided, it is looked up in the factory by collection name + String zkHost = null; + if(null == zkHostExpression){ + zkHost = factory.getCollectionZkHost(collectionName); + } + else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){ + zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue(); + } + if(null == zkHost){ + throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName)); + } + + // metrics, optional - if not provided then why are you using this? 
+ Metric[] metrics = new Metric[metricExpressions.size()]; + for(int idx = 0; idx < metricExpressions.size(); ++idx){ + metrics[idx] = factory.constructMetric(metricExpressions.get(idx)); + } + + // We've got all the required items + init(zkHost, collectionName, params, metrics); + } + + @Override + public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException { + // functionName(collectionName, param1, param2, ..., paramN, zkHost="host", sum(fieldA), avg(fieldB)) + + // function name + StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); + + // collection + expression.addParameter(collection); + + // parameters + for(Entry param : props.entrySet()){ + expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), param.getValue())); + } + + // zkHost + expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost)); + + // metrics + for(Metric metric : metrics){ + expression.addParameter(metric.toExpression(factory)); + } + + return expression; + } public void setStreamContext(StreamContext context) { cache = context.getSolrClientCache(); diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java index 98d24a3e6f0..f19875a9ea3 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java @@ -126,6 +126,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { testReducerStream(); testUniqueStream(); testRollupStream(); + testStatsStream(); testParallelUniqueStream(); testParallelReducerStream(); testParallelRankStream(); @@ -611,6 +612,69 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { commit(); } + private void testStatsStream() throws Exception { + + indexr(id, "0", "a_s", 
"hello0", "a_i", "0", "a_f", "1"); + indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2"); + indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3"); + indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4"); + indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5"); + indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6"); + indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7"); + indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8"); + indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9"); + indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10"); + + commit(); + + StreamFactory factory = new StreamFactory() + .withCollectionZkHost("collection1", zkServer.getZkAddress()) + .withFunctionName("stats", StatsStream.class) + .withFunctionName("sum", SumMetric.class) + .withFunctionName("min", MinMetric.class) + .withFunctionName("max", MaxMetric.class) + .withFunctionName("avg", MeanMetric.class) + .withFunctionName("count", CountMetric.class); + + StreamExpression expression; + TupleStream stream; + List tuples; + + expression = StreamExpressionParser.parse("stats(collection1, q=*:*, sum(a_i), sum(a_f), min(a_i), min(a_f), max(a_i), max(a_f), avg(a_i), avg(a_f), count(*))"); + stream = factory.constructStream(expression); + + tuples = getTuples(stream); + + assert(tuples.size() == 1); + + //Test Long and Double Sums + + Tuple tuple = tuples.get(0); + + Double sumi = tuple.getDouble("sum(a_i)"); + Double sumf = tuple.getDouble("sum(a_f)"); + Double mini = tuple.getDouble("min(a_i)"); + Double minf = tuple.getDouble("min(a_f)"); + Double maxi = tuple.getDouble("max(a_i)"); + Double maxf = tuple.getDouble("max(a_f)"); + Double avgi = tuple.getDouble("avg(a_i)"); + Double avgf = tuple.getDouble("avg(a_f)"); + Double count = tuple.getDouble("count(*)"); + + assertTrue(sumi.longValue() == 70); + assertTrue(sumf.doubleValue() == 55.0D); + assertTrue(mini.doubleValue() == 0.0D); + assertTrue(minf.doubleValue() == 1.0D); + 
assertTrue(maxi.doubleValue() == 14.0D); + assertTrue(maxf.doubleValue() == 10.0D); + assertTrue(avgi.doubleValue() == 7.0D); + assertTrue(avgf.doubleValue() == 5.5D); + assertTrue(count.doubleValue() == 10); + + del("*:*"); + commit(); + } + private void testParallelUniqueStream() throws Exception { indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0"); diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java index 8b814caa35d..973164a5730 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java @@ -45,6 +45,7 @@ public class StreamExpressionToExpessionTest extends LuceneTestCase { .withFunctionName("unique", UniqueStream.class) .withFunctionName("top", RankStream.class) .withFunctionName("group", ReducerStream.class) + .withFunctionName("stats", StatsStream.class) .withFunctionName("count", CountMetric.class) .withFunctionName("sum", SumMetric.class) .withFunctionName("min", MinMetric.class) @@ -75,6 +76,27 @@ public class StreamExpressionToExpessionTest extends LuceneTestCase { assertTrue(expressionString.contains("a_s=kayden")); } + + @Test + public void testStatsStream() throws Exception { + + StatsStream stream; + String expressionString; + + // Basic test + stream = new StatsStream(StreamExpressionParser.parse("stats(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", sum(a_i), avg(a_i), count(*), min(a_i), max(a_i))"), factory); + expressionString = stream.toExpression(factory).toString(); + assertTrue(expressionString.contains("stats(collection1,")); + assertTrue(expressionString.contains("q=\"*:*\"")); + assertTrue(expressionString.contains("fl=\"id,a_s,a_i,a_f\"")); + assertTrue(expressionString.contains("sort=\"a_f asc, a_i asc\"")); 
+ assertTrue(expressionString.contains("min(a_i)")); + assertTrue(expressionString.contains("max(a_i)")); + assertTrue(expressionString.contains("avg(a_i)")); + assertTrue(expressionString.contains("count(*)")); + assertTrue(expressionString.contains("sum(a_i)")); + + } @Test public void testUniqueStream() throws Exception {