SOLR-10559: Updates TupStream and enhances evaluators to work over values in the StreamContext

This commit is contained in:
Dennis Gove 2017-04-28 21:45:56 -04:00
parent 7f6f68c7f4
commit 460b3b36e9
9 changed files with 261 additions and 53 deletions

View File

@ -48,6 +48,10 @@ public class Tuple implements Cloneable, MapWriter {
public List<String> fieldNames; public List<String> fieldNames;
public Map<String, String> fieldLabels; public Map<String, String> fieldLabels;
/**
 * Creates a tuple without reading anything from a field map. The body is
 * intentionally empty, so only the class's own field initializers apply.
 */
public Tuple(){
// just an empty tuple
}
public Tuple(Map fields) { public Tuple(Map fields) {
if(fields.containsKey("EOF")) { if(fields.containsKey("EOF")) {
EOF = true; EOF = true;

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.comp;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
/**
 * An equality Comparator to be used when a stream will only ever return a single value,
 * ie, it has no sorted order.
 *
 * <p>Every method is a deliberate no-op style answer: nothing is expressed, nothing is
 * explained, every base comparator is accepted as a match, and appending another
 * comparator simply yields that other comparator.</p>
 **/
public class SingleValueComparator implements StreamComparator {

  private static final long serialVersionUID = 1;

  // Not read anywhere inside this class; kept so the class's fields stay as they were.
  private UUID comparatorNodeId = UUID.randomUUID();

  /**
   * This comparator has no expression form.
   *
   * @param factory unused
   * @return always {@code null}
   */
  @Override
  public StreamExpressionParameter toExpression(StreamFactory factory){
    return null;
  }

  /**
   * This comparator has no explanation.
   *
   * @param factory unused
   * @return always {@code null}
   * @throws IOException never thrown by this implementation
   */
  @Override
  public Explanation toExplanation(StreamFactory factory) throws IOException {
    return null;
  }

  /**
   * Reports the left tuple as ordered before the right tuple, regardless of content.
   *
   * <p>NOTE(review): a constant {@code -1} is not antisymmetric, so this does not satisfy
   * the {@link java.util.Comparator} contract. It is left unchanged because existing
   * callers may depend on it - confirm before handing this comparator to a sort routine.</p>
   *
   * @param leftTuple ignored
   * @param rightTuple ignored
   * @return always {@code -1}
   */
  @Override
  public int compare(Tuple leftTuple, Tuple rightTuple) {
    return -1; // whatever, just keep everything in same order
  }

  /**
   * @param base ignored
   * @return always {@code true}
   */
  @Override
  public boolean isDerivedFrom(StreamComparator base){
    // this doesn't sort, so everything else is a match
    return true;
  }

  /**
   * @param aliases ignored, there are no field names to alias
   * @return this same instance
   */
  @Override
  public SingleValueComparator copyAliased(Map<String,String> aliases){
    return this;
  }

  /**
   * @param other the comparator being appended
   * @return {@code other} unchanged; this comparator contributes nothing to the result
   */
  @Override
  public StreamComparator append(StreamComparator other){
    return other;
  }
}

View File

@ -101,5 +101,12 @@ public abstract class ComplexEvaluator implements StreamEvaluator {
public void setStreamContext(StreamContext context) { public void setStreamContext(StreamContext context) {
this.streamContext = context; this.streamContext = context;
for(StreamEvaluator subEvaluator : subEvaluators){
subEvaluator.setStreamContext(context);
}
}
public StreamContext getStreamContext(){
return streamContext;
} }
} }

View File

@ -44,9 +44,24 @@ public class FieldEvaluator extends SimpleEvaluator {
} }
@Override @Override
public Object evaluate(Tuple tuple) { public Object evaluate(Tuple tuple) throws IOException {
Object value = tuple.get(fieldName); Object value = tuple.get(fieldName);
// This is somewhat radical.
// Here, we allow for the use of the context to provide alternative values
// when they are not available in the provided tuple. This means that all
// evaluators can evaluate over both a stream's tuple and the context, and
// can even evaluate over fields from both of them in the same evaluation
if(null == value && null != getStreamContext()){
value = getStreamContext().getLets().get(fieldName);
// If what's contained in the context is itself an evaluator then
// we need to evaluate it
if(value instanceof StreamEvaluator){
value = ((StreamEvaluator)value).evaluate(tuple);
}
}
// if we have an array then convert to an ArrayList // if we have an array then convert to an ArrayList
// if we have an iterable that is not a list then convert to ArrayList // if we have an iterable that is not a list then convert to ArrayList
// lists are good to go // lists are good to go

View File

@ -32,5 +32,8 @@ public abstract class SimpleEvaluator implements StreamEvaluator {
public void setStreamContext(StreamContext streamContext) { public void setStreamContext(StreamContext streamContext) {
this.streamContext = streamContext; this.streamContext = streamContext;
} }
/**
 * Returns the stream context most recently assigned through setStreamContext.
 * NOTE(review): presumably null until a context has been assigned - confirm
 * against the field declaration.
 *
 * @return the stream context held by this evaluator
 */
public StreamContext getStreamContext(){
return streamContext;
}
} }

View File

@ -27,6 +27,30 @@ import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.expr.Expressible; import org.apache.solr.client.solrj.io.stream.expr.Expressible;
public interface StreamEvaluator extends Expressible, Serializable { public interface StreamEvaluator extends Expressible, Serializable {
Object evaluate(final Tuple tuple) throws IOException;
void setStreamContext(StreamContext streamContext); void setStreamContext(StreamContext streamContext);
StreamContext getStreamContext();
Object evaluate(final Tuple tuple) throws IOException;
/**
 * Runs this evaluator against the lets stored within the StreamContext, so that
 * values calculated elsewhere in the pipeline and kept in
 * {@link StreamContext#getLets() streamContext.lets} can be evaluated over.
 *
 * The default implementation wraps every value in the context into a single tuple
 * and hands that tuple to {@link StreamEvaluator#evaluate(Tuple)}. When no context
 * is available there is nothing to evaluate and {@code null} is returned.
 *
 * @return Evaluated value, or null when there is no stream context
 * @throws IOException thrown on error during evaluation
 */
default Object evaluateOverContext() throws IOException{
  final StreamContext availableContext = getStreamContext();
  if(null == availableContext){
    // without a context there are no lets to build a tuple from
    return null;
  }

  return evaluate(new Tuple(availableContext.getLets()));
}
} }

View File

@ -20,10 +20,12 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Map.Entry;
import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.SingleValueComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator; import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.eval.StreamEvaluator; import org.apache.solr.client.solrj.io.eval.StreamEvaluator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation; import org.apache.solr.client.solrj.io.stream.expr.Explanation;
@ -40,27 +42,34 @@ public class TupStream extends TupleStream implements Expressible {
private static final long serialVersionUID = 1; private static final long serialVersionUID = 1;
private StreamContext streamContext; private StreamContext streamContext;
private Map tupleParams = new HashMap();
private Map<String,String> stringParams = new HashMap<>();
private Map<String,StreamEvaluator> evaluatorParams = new HashMap<>();
private Map<String,TupleStream> streamParams = new HashMap<>();
private boolean finished; private boolean finished;
public TupStream(StreamExpression expression, StreamFactory factory) throws IOException { public TupStream(StreamExpression expression, StreamFactory factory) throws IOException {
List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression); List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
//Get all the named params //Get all the named params
for(StreamExpressionParameter np : namedParams) { for(StreamExpressionNamedParameter np : namedParams) {
String name = ((StreamExpressionNamedParameter)np).getName(); String name = np.getName();
StreamExpressionParameter param = ((StreamExpressionNamedParameter)np).getParameter(); StreamExpressionParameter param = np.getParameter();
// we're going to split these up here so we only make the choice once
// order of these in read() doesn't matter
if(param instanceof StreamExpressionValue) { if(param instanceof StreamExpressionValue) {
tupleParams.put(name, ((StreamExpressionValue)param).getValue()); stringParams.put(name, ((StreamExpressionValue)param).getValue());
} else { } else if (factory.isEvaluator((StreamExpression) param)) {
if (factory.isEvaluator((StreamExpression) param)) { StreamEvaluator evaluator = factory.constructEvaluator((StreamExpression) param);
StreamEvaluator evaluator = factory.constructEvaluator((StreamExpression) param); evaluatorParams.put(name, evaluator);
tupleParams.put(name, evaluator); } else if(factory.isStream((StreamExpression)param)) {
} else { TupleStream tupleStream = factory.constructStream((StreamExpression) param);
TupleStream tupleStream = factory.constructStream((StreamExpression) param); streamParams.put(name, tupleStream);
tupleParams.put(name, tupleStream); }
} else{
// name is a String, so it must be rendered with %s; %d would make String.format throw IllegalFormatConversionException and hide this IOException
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - only string, evaluator, or stream named parameters are supported, but param %s is none of those",expression, name));
} }
} }
} }
@ -74,6 +83,26 @@ public class TupStream extends TupleStream implements Expressible {
// function name // function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// add string based params
for(Entry<String,String> param : stringParams.entrySet()){
expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), param.getValue()));
}
// add evaluator based params
for(Entry<String,StreamEvaluator> param : evaluatorParams.entrySet()){
expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), param.getValue().toExpression(factory)));
}
// add stream based params
for(Entry<String,TupleStream> param : streamParams.entrySet()){
if(includeStreams){
expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), ((Expressible)param.getValue()).toExpression(factory)));
}
else{
expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), "<stream>"));
}
}
return expression; return expression;
} }
@ -91,6 +120,15 @@ public class TupStream extends TupleStream implements Expressible {
public void setStreamContext(StreamContext context) { public void setStreamContext(StreamContext context) {
this.streamContext = context; this.streamContext = context;
// also set in evaluators and streams
for(StreamEvaluator evaluator : evaluatorParams.values()){
evaluator.setStreamContext(context);
}
for(TupleStream stream : streamParams.values()){
stream.setStreamContext(context);
}
} }
public List<TupleStream> children() { public List<TupleStream> children() {
@ -101,59 +139,68 @@ public class TupStream extends TupleStream implements Expressible {
public Tuple read() throws IOException { public Tuple read() throws IOException {
if(finished) { if(finished) {
Map m = new HashMap(); Map<String,Object> m = new HashMap<>();
m.put("EOF", true); m.put("EOF", true);
return new Tuple(m); return new Tuple(m);
} else { } else {
finished = true; finished = true;
Map<String, Object> map = new HashMap(); Map<String, Object> values = new HashMap<>();
Set<Map.Entry<String, Object>> entries = tupleParams.entrySet();
for (Map.Entry<String, Object> entry : entries) { // add all string based params
String name = entry.getKey(); // these could come from the context, or they will just be treated as straight strings
Object o = entry.getValue(); for(Entry<String,String> param : stringParams.entrySet()){
if (o instanceof TupleStream) { if(streamContext.getLets().containsKey(param.getValue())){
List<Tuple> tuples = new ArrayList(); values.put(param.getKey(), streamContext.getLets().get(param.getValue()));
TupleStream tStream = (TupleStream) o; }
tStream.setStreamContext(streamContext); else{
try { values.put(param.getKey(), param.getValue());
tStream.open();
TUPLES:
while (true) {
Tuple tuple = tStream.read();
if (tuple.EOF) {
break TUPLES;
} else {
tuples.add(tuple);
}
}
map.put(name, tuples);
} finally {
tStream.close();
}
} else if ((o instanceof StreamEvaluator)) {
Tuple eTuple = new Tuple(streamContext.getLets());
StreamEvaluator evaluator = (StreamEvaluator) o;
Object eo = evaluator.evaluate(eTuple);
map.put(name, eo);
} else {
map.put(name, streamContext.getLets().get(o.toString()));
} }
} }
return new Tuple(map);
// add all evaluators
for(Entry<String,StreamEvaluator> param : evaluatorParams.entrySet()){
values.put(param.getKey(), param.getValue().evaluateOverContext());
}
// Add all streams
for(Entry<String,TupleStream> param : streamParams.entrySet()){
try{
List<Tuple> streamTuples = new ArrayList<Tuple>();
// open the stream, closed in finally block
param.getValue().open();
// read all values from stream (memory expensive)
Tuple streamTuple = param.getValue().read();
while(!streamTuple.EOF){
streamTuples.add(streamTuple);
streamTuple = param.getValue().read();
}
values.put(param.getKey(), streamTuples);
}
finally{
// safely close the stream
param.getValue().close();
}
}
return new Tuple(values);
} }
} }
public void close() throws IOException { public void close() throws IOException {
// Nothing to do here
} }
public void open() throws IOException { public void open() throws IOException {
// nothing to do here
} }
/** Return the stream sort - ie, the order in which records are returned */ /** Return the stream sort - ie, the order in which records are returned */
public StreamComparator getStreamSort(){ public StreamComparator getStreamSort(){
return null; return new SingleValueComparator();
} }
public int getCost() { public int getCost() {

View File

@ -377,6 +377,18 @@ public class StreamFactory implements Serializable {
throw new IOException(String.format(Locale.ROOT,"Invalid evaluator expression %s - function '%s' is unknown (not mapped to a valid StreamEvaluator)", expression, expression.getFunctionName())); throw new IOException(String.format(Locale.ROOT,"Invalid evaluator expression %s - function '%s' is unknown (not mapped to a valid StreamEvaluator)", expression, expression.getFunctionName()));
} }
/**
 * Tells whether the function named by the expression is registered with this
 * factory under a class that is a TupleStream.
 *
 * @param expression expression whose function name is looked up
 * @return true when the function name is registered and maps to a class assignable
 *         to both Expressible and TupleStream, false otherwise
 * @throws IOException declared for parity with isEvaluator; not thrown by this body
 */
public boolean isStream(StreamExpression expression) throws IOException{
  String functionName = expression.getFunctionName();
  if(!functionNames.containsKey(functionName)){
    // unknown function names can never be streams
    return false;
  }

  Class<? extends Expressible> mappedClass = functionNames.get(functionName);
  return Expressible.class.isAssignableFrom(mappedClass) && TupleStream.class.isAssignableFrom(mappedClass);
}
public boolean isEvaluator(StreamExpression expression) throws IOException{ public boolean isEvaluator(StreamExpression expression) throws IOException{
String function = expression.getFunctionName(); String function = expression.getFunctionName();
if(functionNames.containsKey(function)){ if(functionNames.containsKey(function)){

View File

@ -23,7 +23,9 @@ import java.util.Map;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.eval.AbsoluteValueEvaluator; import org.apache.solr.client.solrj.io.eval.AbsoluteValueEvaluator;
import org.apache.solr.client.solrj.io.eval.AddEvaluator;
import org.apache.solr.client.solrj.io.eval.StreamEvaluator; import org.apache.solr.client.solrj.io.eval.StreamEvaluator;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.junit.Test; import org.junit.Test;
@ -38,7 +40,8 @@ public class AbsoluteValueEvaluatorTest extends LuceneTestCase {
super(); super();
factory = new StreamFactory() factory = new StreamFactory()
.withFunctionName("abs", AbsoluteValueEvaluator.class); .withFunctionName("abs", AbsoluteValueEvaluator.class)
.withFunctionName("add", AddEvaluator.class);
values = new HashMap<String,Object>(); values = new HashMap<String,Object>();
} }
@ -66,6 +69,34 @@ public class AbsoluteValueEvaluatorTest extends LuceneTestCase {
Assert.assertEquals(1.1D, result); Assert.assertEquals(1.1D, result);
} }
/**
 * Checks that abs(a) resolves "a" from the StreamContext lets when each tuple
 * handed to evaluate() is created with the no-argument Tuple constructor.
 * The same context is reused throughout, so each step overwrites the previous
 * value of "a" and the steps must run in this order.
 */
@Test
public void absoluteValueFromContext() throws Exception{
StreamEvaluator evaluator = factory.constructEvaluator("abs(a)");
StreamContext context = new StreamContext();
evaluator.setStreamContext(context);
Object result;
// an int placed in the lets is expected back as a Long
context.getLets().put("a", 1);
result = evaluator.evaluate(new Tuple());
Assert.assertTrue(result instanceof Long);
Assert.assertEquals(1L, result);
// a positive double is expected back unchanged as a Double
context.getLets().put("a", 1.1);
result = evaluator.evaluate(new Tuple());
Assert.assertTrue(result instanceof Double);
Assert.assertEquals(1.1D, result);
// a negative double is expected back with its sign dropped
context.getLets().put("a", -1.1);
result = evaluator.evaluate(new Tuple());
Assert.assertTrue(result instanceof Double);
Assert.assertEquals(1.1D, result);
// the let is itself an evaluator here: 4 - 6 + 34 - 56 = -24, so abs is expected to give 24
context.getLets().put("a", factory.constructEvaluator("add(4,-6,34,-56)"));
result = evaluator.evaluate(new Tuple());
Assert.assertTrue(result instanceof Long);
Assert.assertEquals(24L, result);
}
@Test(expected = IOException.class) @Test(expected = IOException.class)
public void absNoField() throws Exception{ public void absNoField() throws Exception{
factory.constructEvaluator("abs()"); factory.constructEvaluator("abs()");