SOLR-13088: Add zplot Stream Evaluator to plot math expressions in Apache Zeppelin

This commit is contained in:
Joel Bernstein 2018-12-27 14:42:03 -05:00
parent 106d300052
commit d018cd18f4
4 changed files with 290 additions and 2 deletions

View File

@ -92,6 +92,8 @@ public class Lang {
.withFunctionName("tuple", TupStream.class)
.withFunctionName("sql", SqlStream.class)
.withFunctionName("plist", ParallelListStream.class)
.withFunctionName("zplot", ZplotStream.class)
// metrics
.withFunctionName("min", MinMetric.class)

View File

@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.eval.StreamEvaluator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class ZplotStream extends TupleStream implements Expressible {
private static final long serialVersionUID = 1;
private StreamContext streamContext;
private Map letParams = new LinkedHashMap();
private Iterator<Tuple> out;
public ZplotStream(StreamExpression expression, StreamFactory factory) throws IOException {
List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
//Get all the named params
for(StreamExpressionParameter np : namedParams) {
String name = ((StreamExpressionNamedParameter)np).getName();
StreamExpressionParameter param = ((StreamExpressionNamedParameter)np).getParameter();
if(param instanceof StreamExpressionValue) {
String paramValue = ((StreamExpressionValue) param).getValue();
letParams.put(name, factory.constructPrimitiveObject(paramValue));
} else if(factory.isEvaluator((StreamExpression)param)) {
StreamEvaluator evaluator = factory.constructEvaluator((StreamExpression) param);
letParams.put(name, evaluator);
}
}
}
@Override
public StreamExpression toExpression(StreamFactory factory) throws IOException{
return toExpression(factory, true);
}
private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
explanation.setFunctionName(factory.getFunctionName(this.getClass()));
explanation.setImplementingClass(this.getClass().getName());
explanation.setExpressionType(ExpressionType.STREAM_DECORATOR);
explanation.setExpression(toExpression(factory, false).toString());
return explanation;
}
public void setStreamContext(StreamContext context) {
this.streamContext = context;
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList<TupleStream>();
return l;
}
public Tuple read() throws IOException {
if(out.hasNext()) {
return out.next();
} else {
Map m = new HashMap();
m.put("EOF", true);
Tuple t = new Tuple(m);
return t;
}
}
public void close() throws IOException {
}
public void open() throws IOException {
Map<String, Object> lets = streamContext.getLets();
Set<Map.Entry<String, Object>> entries = letParams.entrySet();
Map<String, Object> evaluated = new HashMap();
//Load up the StreamContext with the data created by the letParams.
int numTuples = -1;
int columns = 0;
boolean table = false;
for(Map.Entry<String, Object> entry : entries) {
++columns;
String name = entry.getKey();
if(name.equals("table")) {
table = true;
}
Object o = entry.getValue();
if(o instanceof StreamEvaluator) {
Tuple eTuple = new Tuple(lets);
StreamEvaluator evaluator = (StreamEvaluator)o;
evaluator.setStreamContext(streamContext);
Object eo = evaluator.evaluate(eTuple);
if(eo instanceof List) {
List l = (List)eo;
if(numTuples == -1) {
numTuples = l.size();
} else {
if(l.size() != numTuples) {
throw new IOException("All lists provided to the zplot function must be the same length.");
}
}
evaluated.put(name, l);
} else if (eo instanceof Tuple) {
evaluated.put(name, eo);
}
} else {
Object eval = lets.get(o);
if(eval instanceof List) {
List l = (List)eval;
if(numTuples == -1) {
numTuples = l.size();
} else {
if(l.size() != numTuples) {
throw new IOException("All lists provided to the zplot function must be the same length.");
}
}
evaluated.put(name, l);
} else if(eval instanceof Tuple) {
evaluated.put(name, eval);
}
}
}
if(columns > 1 && table) {
throw new IOException("If the table parameter is set there can only be one parameter.");
}
//Load the values into tuples
List<Tuple> outTuples = new ArrayList();
if(!table) {
//Handle the vectors
for (int i = 0; i < numTuples; i++) {
Tuple tuple = new Tuple(new HashMap());
for (String key : evaluated.keySet()) {
List l = (List) evaluated.get(key);
tuple.put(key, l.get(i));
}
outTuples.add(tuple);
}
} else {
//Handle the Tuple and List of Tuples
Object o = evaluated.get("table");
if(o instanceof List) {
List<Tuple> tuples = (List<Tuple>)o;
outTuples.addAll(tuples);
} else if(o instanceof Tuple) {
outTuples.add((Tuple)o);
}
}
this.out = outTuples.iterator();
}
/** Return the stream sort - ie, the order in which records are returned */
public StreamComparator getStreamSort(){
return null;
}
public int getCost() {
return 0;
}
}

View File

@ -73,7 +73,8 @@ public class TestLang extends LuceneTestCase {
"outliers", "stream", "getCache", "putCache", "listCache", "removeCache", "zscores", "latlonVectors",
"convexHull", "getVertices", "getBaryCenter", "getArea", "getBoundarySize","oscillate",
"getAmplitude", "getPhase", "getAngularFrequency", "enclosingDisk", "getCenter", "getRadius",
"getSupportPoints", "pairSort", "log10", "plist", "recip", "pivot", "ltrim", "rtrim", "export"};
"getSupportPoints", "pairSort", "log10", "plist", "recip", "pivot", "ltrim", "rtrim", "export",
"zplot"};
@Test
public void testLang() {

View File

@ -1356,6 +1356,83 @@ public class MathExpressionTest extends SolrCloudTestCase {
assertTrue(tuples.get(0).getLong("i")== 2);
}
@Test
public void testZplot() throws Exception {
String cexpr = "let(c=tuple(a=add(1,2), b=add(2,3))," +
" zplot(table=c))";
ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", cexpr);
paramsLoc.set("qt", "/stream");
String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS;
TupleStream solrStream = new SolrStream(url, paramsLoc);
StreamContext context = new StreamContext();
solrStream.setStreamContext(context);
List<Tuple> tuples = getTuples(solrStream);
assertTrue(tuples.size() == 1);
Tuple out = tuples.get(0);
assertEquals(out.getDouble("a").doubleValue(), 3.0, 0.0);
assertEquals(out.getDouble("b").doubleValue(), 5.0, 0.0);
cexpr = "let(c=list(tuple(a=add(1,2), b=add(2,3)), tuple(a=add(1,3), b=add(2,4)))," +
" zplot(table=c))";
paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", cexpr);
paramsLoc.set("qt", "/stream");
solrStream = new SolrStream(url, paramsLoc);
context = new StreamContext();
solrStream.setStreamContext(context);
tuples = getTuples(solrStream);
assertTrue(tuples.size() == 2);
out = tuples.get(0);
assertEquals(out.getDouble("a").doubleValue(), 3.0, 0.0);
assertEquals(out.getDouble("b").doubleValue(), 5.0, 0.0);
out = tuples.get(1);
assertEquals(out.getDouble("a").doubleValue(), 4.0, 0.0);
assertEquals(out.getDouble("b").doubleValue(), 6.0, 0.0);
cexpr = "let(a=array(1,2,3,4)," +
" b=array(10,11,12,13),"+
" zplot(x=a, y=b))";
paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", cexpr);
paramsLoc.set("qt", "/stream");
solrStream = new SolrStream(url, paramsLoc);
context = new StreamContext();
solrStream.setStreamContext(context);
tuples = getTuples(solrStream);
assertTrue(tuples.size() == 4);
out = tuples.get(0);
assertEquals(out.getDouble("x").doubleValue(), 1.0, 0.0);
assertEquals(out.getDouble("y").doubleValue(), 10.0, 0.0);
out = tuples.get(1);
assertEquals(out.getDouble("x").doubleValue(), 2.0, 0.0);
assertEquals(out.getDouble("y").doubleValue(), 11.0, 0.0);
out = tuples.get(2);
assertEquals(out.getDouble("x").doubleValue(), 3.0, 0.0);
assertEquals(out.getDouble("y").doubleValue(), 12.0, 0.0);
out = tuples.get(3);
assertEquals(out.getDouble("x").doubleValue(), 4.0, 0.0);
assertEquals(out.getDouble("y").doubleValue(), 13.0, 0.0);
}
@Test
public void testMatrixMath() throws Exception {
String cexpr = "let(echo=true, a=matrix(array(1.5, 2.5, 3.5), array(4.5,5.5,6.5)), " +