SOLR-10747: Allow /stream handler to execute Stream Evaluators directly

This commit is contained in:
Joel Bernstein 2017-05-25 13:52:04 -04:00
parent 1e4d2052e6
commit b3ee2d03db
2 changed files with 14 additions and 34 deletions

View File

@ -45,6 +45,9 @@ import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
@ -298,7 +301,14 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
TupleStream tupleStream;
try {
tupleStream = this.streamFactory.constructStream(params.get("expr"));
StreamExpression streamExpression = StreamExpressionParser.parse(params.get("expr"));
if(this.streamFactory.isEvaluator(streamExpression)) {
StreamExpression tupleExpression = new StreamExpression("tuple");
tupleExpression.addParameter(new StreamExpressionNamedParameter("out", streamExpression));
tupleStream = this.streamFactory.constructStream(tupleExpression);
} else {
tupleStream = this.streamFactory.constructStream(streamExpression);
}
} catch (Exception e) {
//Catch exceptions that occur while the stream is being created. This will include streaming expression parse rules.
SolrException.log(logger, e);

View File

@ -5207,8 +5207,8 @@ public class StreamExpressionTest extends SolrCloudTestCase {
@Test
public void testSequence() throws Exception {
String expr = "tuple(seq=sequence(20, 0, 1))";
public void testEvaluatorOnly() throws Exception {
String expr = "sequence(20, 0, 1)";
ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", expr);
paramsLoc.set("qt", "/stream");
@ -5220,41 +5220,11 @@ public class StreamExpressionTest extends SolrCloudTestCase {
solrStream.setStreamContext(context);
List<Tuple> tuples = getTuples(solrStream);
assertTrue(tuples.size() == 1);
List<Number> sequence = (List<Number>)tuples.get(0).get("seq");
List<Number> sequence = (List<Number>)tuples.get(0).get("out");
assertTrue(sequence.size() == 20);
for(int i=0; i<sequence.size(); i++) {
assertTrue(sequence.get(i).intValue() == i);
}
//Change the size, stride
expr = "tuple(seq=sequence(100, 0, 4))";
paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", expr);
paramsLoc.set("qt", "/stream");
solrStream = new SolrStream(url, paramsLoc);
tuples = getTuples(solrStream);
assertTrue(tuples.size() == 1);
sequence = (List<Number>)tuples.get(0).get("seq");
assertTrue(sequence.size() == 100);
for(int i=0; i<sequence.size(); i++) {
assertTrue(sequence.get(i).intValue() == (i*4));
}
//Change the start
expr = "tuple(seq=sequence(100, 10, 1))";
paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", expr);
paramsLoc.set("qt", "/stream");
solrStream = new SolrStream(url, paramsLoc);
tuples = getTuples(solrStream);
assertTrue(tuples.size() == 1);
sequence = (List<Number>)tuples.get(0).get("seq");
assertTrue(sequence.size() == 100);
for(int i=0; i<sequence.size(); i++) {
assertTrue(sequence.get(i).intValue() == (i+10));
}
}