SOLR-10559: Cleaner syntax

This commit is contained in:
Joel Bernstein 2017-04-27 16:30:46 -04:00
parent 64caf176ba
commit e57fab17c0
10 changed files with 370 additions and 97 deletions

View File

@ -166,6 +166,8 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
.withFunctionName("let", LetStream.class)
.withFunctionName("get", GetStream.class)
.withFunctionName("timeseries", TimeSeriesStream.class)
.withFunctionName("tuple", TupStream.class)
.withFunctionName("col", ColumnEvaluator.class)
// metrics
.withFunctionName("min", MinMetric.class)
.withFunctionName("max", MaxMetric.class)

View File

@ -34,8 +34,8 @@ public class AddEvaluator extends NumberEvaluator {
public AddEvaluator(StreamExpression expression, StreamFactory factory) throws IOException{
super(expression, factory);
if(subEvaluators.size() < 2){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting at least two values but found %d",expression,subEvaluators.size()));
if(subEvaluators.size() < 1){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting at least 1 value but found %d",expression,subEvaluators.size()));
}
}

View File

@ -50,6 +50,19 @@ public abstract class NumberEvaluator extends ComplexEvaluator {
}
else if(result instanceof Number){
results.add(new BigDecimal(result.toString()));
} else if(result instanceof List) {
List l = (List) result;
for(Object o : l) {
if(o instanceof Number) {
results.add(new BigDecimal(o.toString()));
} else {
String message = String.format(Locale.ROOT,"Failed to evaluate to a numeric value - evaluator '%s' resulted in type '%s' and value '%s'",
subEvaluator.toExpression(constructingFactory),
o.getClass().getName(),
o.toString());
throw new IOException(message);
}
}
}
else{
String message = String.format(Locale.ROOT,"Failed to evaluate to a numeric value - evaluator '%s' resulted in type '%s' and value '%s'",

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.eval.SimpleEvaluator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class ColumnEvaluator extends SimpleEvaluator implements Expressible {
private static final long serialVersionUID = 1;
private String name;
private String fieldName;
;
public ColumnEvaluator(StreamExpression expression, StreamFactory factory) throws IOException {
String name = factory.getValueOperand(expression, 0);
String fieldName = factory.getValueOperand(expression, 1);
init(name, fieldName);
}
private void init(String name, String fieldName) {
this.name = name;
this.fieldName = fieldName;
}
public List<Number> evaluate(Tuple tuple) throws IOException {
List<Tuple> tuples = (List<Tuple>)tuple.get(name);
List<Number> column = new ArrayList(tuples.size());
for(Tuple t : tuples) {
System.out.println("###### Field:"+fieldName);
Object o = t.get(fieldName);
if(o instanceof Number) {
column.add((Number)o);
} else {
throw new IOException("Found non-numeric in column:"+o.toString());
}
}
return column;
}
@Override
public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
StreamExpression expression = new StreamExpression(factory.getFunctionName(getClass()));
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new Explanation(nodeId.toString())
.withExpressionType(ExpressionType.EVALUATOR)
.withFunctionName(factory.getFunctionName(getClass()))
.withImplementingClass(getClass().getName())
.withExpression(toExpression(factory).toString());
}
}

View File

@ -18,60 +18,55 @@ package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.eval.StreamEvaluator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class LetStream extends TupleStream implements Expressible {
private static final long serialVersionUID = 1;
private TupleStream stream;
private List<CellStream> cellStreams;
private StreamContext streamContext;
public LetStream(TupleStream stream, List<CellStream> cellStreams) throws IOException {
init(stream, cellStreams);
}
private Map letParams = new HashMap();
public LetStream(StreamExpression expression, StreamFactory factory) throws IOException {
List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
if(streamExpressions.size() < 2){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting atleast 2 streams but found %d",expression, streamExpressions.size()));
}
TupleStream stream = null;
List<CellStream> cellStreams = new ArrayList();
for(StreamExpression streamExpression : streamExpressions) {
TupleStream s = factory.constructStream(streamExpression);
if(s instanceof CellStream) {
cellStreams.add((CellStream)s);
List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
//Get all the named params
for(StreamExpressionParameter np : namedParams) {
String name = ((StreamExpressionNamedParameter)np).getName();
StreamExpressionParameter param = ((StreamExpressionNamedParameter)np).getParameter();
if(factory.isEvaluator((StreamExpression)param)) {
StreamEvaluator evaluator = factory.constructEvaluator((StreamExpression) param);
letParams.put(name, evaluator);
} else {
if(stream == null) {
stream = s;
} else {
throw new IOException("Found more then one stream that was not a CellStream");
}
TupleStream tupleStream = factory.constructStream((StreamExpression) param);
letParams.put(name, tupleStream);
}
}
init(stream, cellStreams);
if(streamExpressions.size() != 1){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting 1 stream but found %d",expression, streamExpressions.size()));
}
stream = factory.constructStream(streamExpressions.get(0));
}
private void init(TupleStream _stream, List<CellStream> _cellStreams) {
this.stream = _stream;
this.cellStreams = _cellStreams;
}
@Override
public StreamExpression toExpression(StreamFactory factory) throws IOException{
@ -82,9 +77,6 @@ public class LetStream extends TupleStream implements Expressible {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
expression.addParameter(((Expressible) stream).toExpression(factory));
for(CellStream cellStream : cellStreams) {
expression.addParameter(((Expressible)cellStream).toExpression(factory));
}
return expression;
}
@ -123,17 +115,40 @@ public class LetStream extends TupleStream implements Expressible {
}
public void open() throws IOException {
Map<String, List<Tuple>> lets = streamContext.getLets();
for(CellStream cellStream : cellStreams) {
try {
cellStream.setStreamContext(streamContext);
cellStream.open();
Tuple tup = cellStream.read();
String name = cellStream.getName();
List<Tuple> tuples = (List<Tuple>)tup.get(name);
lets.put(name, tuples);
} finally {
cellStream.close();
Map<String, Object> lets = streamContext.getLets();
Set<Map.Entry<String, Object>> entries = letParams.entrySet();
//Load up the StreamContext with the data created by the letParams.
for(Map.Entry<String, Object> entry : entries) {
String name = entry.getKey();
Object o = entry.getValue();
if(o instanceof TupleStream) {
List<Tuple> tuples = new ArrayList();
TupleStream tStream = (TupleStream)o;
tStream.setStreamContext(streamContext);
try {
tStream.open();
TUPLES:
while(true) {
Tuple tuple = tStream.read();
if (tuple.EOF) {
break TUPLES;
} else {
tuples.add(tuple);
}
}
lets.put(name, tuples);
} finally {
tStream.close();
}
} else {
//Add the data from the StreamContext to a tuple.
//Let the evaluator work from this tuple.
//This will allow columns to be created from tuples already in the StreamContext.
Tuple eTuple = new Tuple(lets);
StreamEvaluator evaluator = (StreamEvaluator)o;
Object eo = evaluator.evaluate(eTuple);
lets.put(name, eo);
}
}
stream.open();

View File

@ -39,14 +39,14 @@ public class StreamContext implements Serializable{
private Map entries = new HashMap();
private Map tupleContext = new HashMap();
private Map<String, List<Tuple>> lets = new HashMap();
private Map<String, Object> lets = new HashMap();
public int workerID;
public int numWorkers;
private SolrClientCache clientCache;
private ModelCache modelCache;
private StreamFactory streamFactory;
public Map<String, List<Tuple>> getLets(){
public Map<String, Object> getLets(){
return lets;
}

View File

@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.eval.StreamEvaluator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class TupStream extends TupleStream implements Expressible {
private static final long serialVersionUID = 1;
private StreamContext streamContext;
private Map tupleParams = new HashMap();
private boolean finished;
public TupStream(StreamExpression expression, StreamFactory factory) throws IOException {
List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
//Get all the named params
for(StreamExpressionParameter np : namedParams) {
String name = ((StreamExpressionNamedParameter)np).getName();
StreamExpressionParameter param = ((StreamExpressionNamedParameter)np).getParameter();
if(param instanceof StreamExpressionValue) {
tupleParams.put(name, ((StreamExpressionValue)param).getValue());
} else {
if (factory.isEvaluator((StreamExpression) param)) {
StreamEvaluator evaluator = factory.constructEvaluator((StreamExpression) param);
tupleParams.put(name, evaluator);
} else {
TupleStream tupleStream = factory.constructStream((StreamExpression) param);
tupleParams.put(name, tupleStream);
}
}
}
}
@Override
public StreamExpression toExpression(StreamFactory factory) throws IOException{
return toExpression(factory, true);
}
private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
explanation.setFunctionName(factory.getFunctionName(this.getClass()));
explanation.setImplementingClass(this.getClass().getName());
explanation.setExpressionType(ExpressionType.STREAM_DECORATOR);
explanation.setExpression(toExpression(factory, false).toString());
return explanation;
}
public void setStreamContext(StreamContext context) {
this.streamContext = context;
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList<TupleStream>();
return l;
}
public Tuple read() throws IOException {
if(finished) {
Map m = new HashMap();
m.put("EOF", true);
return new Tuple(m);
} else {
finished = true;
Map<String, Object> map = new HashMap();
Set<Map.Entry<String, Object>> entries = tupleParams.entrySet();
for (Map.Entry<String, Object> entry : entries) {
String name = entry.getKey();
Object o = entry.getValue();
if (o instanceof TupleStream) {
List<Tuple> tuples = new ArrayList();
TupleStream tStream = (TupleStream) o;
tStream.setStreamContext(streamContext);
try {
tStream.open();
TUPLES:
while (true) {
Tuple tuple = tStream.read();
if (tuple.EOF) {
break TUPLES;
} else {
tuples.add(tuple);
}
}
map.put(name, tuples);
} finally {
tStream.close();
}
} else if ((o instanceof StreamEvaluator)) {
Tuple eTuple = new Tuple(streamContext.getLets());
StreamEvaluator evaluator = (StreamEvaluator) o;
Object eo = evaluator.evaluate(eTuple);
map.put(name, eo);
} else {
map.put(name, streamContext.getLets().get(o.toString()));
}
}
return new Tuple(map);
}
}
public void close() throws IOException {
}
public void open() throws IOException {
}
/** Return the stream sort - ie, the order in which records are returned */
public StreamComparator getStreamSort(){
return null;
}
public int getCost() {
return 0;
}
}

View File

@ -90,7 +90,7 @@ public class StreamFactory implements Serializable {
}
public List<String> getValueOperands(StreamExpression expression){
return getOperandsOfType(expression, StreamExpressionValue.class).stream().map(item -> ((StreamExpressionValue)item).getValue()).collect(Collectors.toList());
return getOperandsOfType(expression, StreamExpressionValue.class).stream().map(item -> ((StreamExpressionValue) item).getValue()).collect(Collectors.toList());
}
/** Given an expression, will return the value parameter at the given index, or null if doesn't exist */
@ -377,6 +377,17 @@ public class StreamFactory implements Serializable {
throw new IOException(String.format(Locale.ROOT,"Invalid evaluator expression %s - function '%s' is unknown (not mapped to a valid StreamEvaluator)", expression, expression.getFunctionName()));
}
public boolean isEvaluator(StreamExpression expression) throws IOException{
String function = expression.getFunctionName();
if(functionNames.containsKey(function)){
Class<? extends Expressible> clazz = functionNames.get(function);
if(Expressible.class.isAssignableFrom(clazz) && StreamEvaluator.class.isAssignableFrom(clazz)){
return true;
}
}
return false;
}
public <T> T createInstance(Class<T> clazz, Class<?>[] paramTypes, Object[] params) throws IOException{
Constructor<T> ctor;

View File

@ -5111,6 +5111,8 @@ public class StreamExpressionTest extends SolrCloudTestCase {
@Test
public void testListStream() throws Exception {
UpdateRequest updateRequest = new UpdateRequest();
@ -5174,15 +5176,16 @@ public class StreamExpressionTest extends SolrCloudTestCase {
}
@Test
public void testLetGetStream() throws Exception {
public void testTupleStream() throws Exception {
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.add(id, "hello", "test_t", "l b c d c e");
updateRequest.add(id, "hello1", "test_t", "l b c d c");
updateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
String expr = "search("+COLLECTIONORALIAS+", q=\"*:*\", fl=\"id,test_t\", sort=\"id desc\")";
String cat = "let(cell(results,"+expr+"), get(results))";
//Add a Stream and an Evaluator to the Tuple.
String cat = "tuple(results="+expr+", sum=add(1,1))";
ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", cat);
paramsLoc.set("qt", "/stream");
@ -5193,60 +5196,51 @@ public class StreamExpressionTest extends SolrCloudTestCase {
StreamContext context = new StreamContext();
solrStream.setStreamContext(context);
List<Tuple> tuples = getTuples(solrStream);
assertTrue(tuples.size() == 2);
assertTrue(tuples.get(0).get("id").equals("hello1"));
assertTrue(tuples.get(0).get("test_t").equals("l b c d c"));
assertTrue(tuples.get(1).get("id").equals("hello"));
assertTrue(tuples.get(1).get("test_t").equals("l b c d c e"));
assertTrue(tuples.size() == 1);
List<Map> results = (List<Map>)tuples.get(0).get("results");
assertTrue(results.get(0).get("id").equals("hello1"));
assertTrue(results.get(0).get("test_t").equals("l b c d c"));
assertTrue(results.get(1).get("id").equals("hello"));
assertTrue(results.get(1).get("test_t").equals("l b c d c e"));
assertTrue(tuples.get(0).getLong("sum").equals(2L));
//Test there are no side effects when transforming tuples.
expr = "search("+COLLECTIONORALIAS+", q=\"*:*\", fl=\"id,test_t\", sort=\"id desc\")";
cat = "let(cell(results,"+expr+"), list(select(get(results), id as newid, test_t), get(results)))";
paramsLoc = new ModifiableSolrParams();
}
@Test
public void testLetStream() throws Exception {
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.add(id, "hello", "test_t", "l b c d c e", "test_i", "5");
updateRequest.add(id, "hello1", "test_t", "l b c d c", "test_i", "4");
updateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
String expr = "search("+COLLECTIONORALIAS+", q=\"*:*\", fl=\"id,test_t, test_i\", sort=\"id desc\")";
String cat = "let(a="+expr+", b=add(1,3), c=col(a, test_i), tuple(test=add(1,1), test1=b, results=a, test2=add(c)))";
ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", cat);
paramsLoc.set("qt", "/stream");
solrStream = new SolrStream(url, paramsLoc);
String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS;
TupleStream solrStream = new SolrStream(url, paramsLoc);
context = new StreamContext();
StreamContext context = new StreamContext();
solrStream.setStreamContext(context);
tuples = getTuples(solrStream);
assertTrue(tuples.size() == 4);
assertTrue(tuples.get(0).get("newid").equals("hello1"));
assertTrue(tuples.get(0).get("test_t").equals("l b c d c"));
assertTrue(tuples.get(1).get("newid").equals("hello"));
assertTrue(tuples.get(1).get("test_t").equals("l b c d c e"));
assertTrue(tuples.get(2).get("id").equals("hello1"));
assertTrue(tuples.get(2).get("test_t").equals("l b c d c"));
assertTrue(tuples.get(3).get("id").equals("hello"));
assertTrue(tuples.get(3).get("test_t").equals("l b c d c e"));
List<Tuple> tuples = getTuples(solrStream);
assertTrue(tuples.size() == 1);
Tuple tuple1 = tuples.get(0);
List<Map> results = (List<Map>)tuple1.get("results");
assertTrue(results.size() == 2);
assertTrue(results.get(0).get("id").equals("hello1"));
assertTrue(results.get(0).get("test_t").equals("l b c d c"));
assertTrue(results.get(1).get("id").equals("hello"));
assertTrue(results.get(1).get("test_t").equals("l b c d c e"));
//Test multiple lets
assertTrue(tuple1.getLong("test").equals(2L));
assertTrue(tuple1.getLong("test1").equals(4L));
assertTrue(tuple1.getLong("test2").equals(9L));
//Test there are no side effects when transforming tuples.
expr = "search("+COLLECTIONORALIAS+", q=\"*:*\", fl=\"id,test_t\", sort=\"id desc\")";
String expr1 = "search("+COLLECTIONORALIAS+", q=\"*:*\", fl=\"id,test_t\", sort=\"id asc\")";
cat = "let(cell(results,"+expr+"), cell(results1,"+expr1+"), list(select(get(results), id as newid, test_t), get(results1)))";
paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", cat);
paramsLoc.set("qt", "/stream");
solrStream = new SolrStream(url, paramsLoc);
context = new StreamContext();
solrStream.setStreamContext(context);
tuples = getTuples(solrStream);
assertTrue(tuples.size() == 4);
assertTrue(tuples.get(0).get("newid").equals("hello1"));
assertTrue(tuples.get(0).get("test_t").equals("l b c d c"));
assertTrue(tuples.get(1).get("newid").equals("hello"));
assertTrue(tuples.get(1).get("test_t").equals("l b c d c e"));
assertTrue(tuples.get(2).get("id").equals("hello"));
assertTrue(tuples.get(2).get("test_t").equals("l b c d c e"));
assertTrue(tuples.get(3).get("id").equals("hello1"));
assertTrue(tuples.get(3).get("test_t").equals("l b c d c"));
}
@Test

View File

@ -68,11 +68,6 @@ public class AddEvaluatorTest extends LuceneTestCase {
Assert.assertTrue(result instanceof Double);
Assert.assertEquals(3.2D, result);
}
@Test(expected = IOException.class)
public void addOneField() throws Exception{
factory.constructEvaluator("add(a)");
}
@Test
public void addTwoFieldWithNulls() throws Exception{