diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java index fdf44c98ee6..d82c8640191 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java @@ -48,6 +48,10 @@ public class Tuple implements Cloneable, MapWriter { public List fieldNames; public Map fieldLabels; + public Tuple(){ + // just an empty tuple + } + public Tuple(Map fields) { if(fields.containsKey("EOF")) { EOF = true; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/SingleValueComparator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/SingleValueComparator.java new file mode 100644 index 00000000000..c429072f439 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/SingleValueComparator.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.client.solrj.io.comp; + +import java.io.IOException; +import java.util.Map; +import java.util.UUID; + +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; + +/** + * An equality Comparator to be used when a stream will only ever return a single field, + * ie, it has no sorted order + **/ +public class SingleValueComparator implements StreamComparator { + + private static final long serialVersionUID = 1; + private UUID comparatorNodeId = UUID.randomUUID(); + + public StreamExpressionParameter toExpression(StreamFactory factory){ + return null; + } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + return null; + } + + public int compare(Tuple leftTuple, Tuple rightTuple) { + return -1; // whatever, just keep everything in same order + } + + @Override + public boolean isDerivedFrom(StreamComparator base){ + // this doesn't sort, so everything else is a match + return true; + } + + @Override + public SingleValueComparator copyAliased(Map aliases){ + return this; + } + + @Override + public StreamComparator append(StreamComparator other){ + return other; + } +} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/ComplexEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/ComplexEvaluator.java index 59a4653ef5e..ea4c88c85d4 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/ComplexEvaluator.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/ComplexEvaluator.java @@ -101,5 +101,12 @@ public abstract class ComplexEvaluator implements StreamEvaluator { public void setStreamContext(StreamContext context) { this.streamContext = context; + + for(StreamEvaluator subEvaluator : subEvaluators){ + subEvaluator.setStreamContext(context); + } + } + public StreamContext getStreamContext(){ + return streamContext; } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/FieldEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/FieldEvaluator.java index 32514982a84..501e9d54639 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/FieldEvaluator.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/FieldEvaluator.java @@ -44,9 +44,24 @@ public class FieldEvaluator extends SimpleEvaluator { } @Override - public Object evaluate(Tuple tuple) { + public Object evaluate(Tuple tuple) throws IOException { Object value = tuple.get(fieldName); + // This is somewhat radical. + // Here, we allow for the use of the context to provide alternative values + // when they are not available in the provided tuple. This means that all + // evaluators can evaluate over both a stream's tuple and the context, and + // can even evaluate over fields from both of them in the same evaluation + if(null == value && null != getStreamContext()){ + value = getStreamContext().getLets().get(fieldName); + + // If what's contained in the context is itself an evaluator then + // we need to evaluate it + if(value instanceof StreamEvaluator){ + value = ((StreamEvaluator)value).evaluate(tuple); + } + } + // if we have an array then convert to an ArrayList // if we have an iterable that is not a list then convert to ArrayList // lists are good to go diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/SimpleEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/SimpleEvaluator.java index 5ee17150106..4a095f88e6a 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/SimpleEvaluator.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/SimpleEvaluator.java @@ -32,5 +32,8 @@ public abstract class SimpleEvaluator implements StreamEvaluator { public void setStreamContext(StreamContext streamContext) { this.streamContext = streamContext; } + public StreamContext getStreamContext(){ + return streamContext; + } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/StreamEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/StreamEvaluator.java index 1774c46e286..e82d5d37b73 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/StreamEvaluator.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/StreamEvaluator.java @@ -27,6 +27,30 @@ import org.apache.solr.client.solrj.io.stream.StreamContext; import org.apache.solr.client.solrj.io.stream.expr.Expressible; public interface StreamEvaluator extends Expressible, Serializable { - Object evaluate(final Tuple tuple) throws IOException; void setStreamContext(StreamContext streamContext); + StreamContext getStreamContext(); + + Object evaluate(final Tuple tuple) throws IOException; + + /** + * Execute the evaluator over lets stored within the StreamContext. This allows + * evaluators to be executed over values calculated elsewhere in the pipeline + * and stored in the {@link StreamContext#getLets() streamContext.lets} + * + * Default implementation just creates a tuple out of all values in the context + * and passes that to {@link StreamEvaluator#evaluate(Tuple)}. + * + * @return Evaluated value + * @throws IOException throw on error during evaluation + */ + default Object evaluateOverContext() throws IOException{ + StreamContext context = getStreamContext(); + if(null != context){ + Tuple contextTuple = new Tuple(context.getLets()); + return evaluate(contextTuple); + } + + return null; + } + } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java index 57c7b76fb17..8a71ae63202 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java @@ -20,10 +20,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; -import java.util.Set; +import java.util.Map.Entry; import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.SingleValueComparator; import org.apache.solr.client.solrj.io.comp.StreamComparator; import org.apache.solr.client.solrj.io.eval.StreamEvaluator; import org.apache.solr.client.solrj.io.stream.expr.Explanation; @@ -40,27 +42,34 @@ public class TupStream extends TupleStream implements Expressible { private static final long serialVersionUID = 1; private StreamContext streamContext; - private Map tupleParams = new HashMap(); + + private Map stringParams = new HashMap<>(); + private Map evaluatorParams = new HashMap<>(); + private Map streamParams = new HashMap<>(); + private boolean finished; public TupStream(StreamExpression expression, StreamFactory factory) throws IOException { List namedParams = factory.getNamedOperands(expression); //Get all the named params - for(StreamExpressionParameter np : namedParams) { - String name = ((StreamExpressionNamedParameter)np).getName(); - StreamExpressionParameter param = ((StreamExpressionNamedParameter)np).getParameter(); + for(StreamExpressionNamedParameter np : namedParams) { + String name = np.getName(); + StreamExpressionParameter param = np.getParameter(); + // we're going to split these up here so we only make the choice once + // order of these in read() doesn't matter if(param instanceof StreamExpressionValue) { - tupleParams.put(name, ((StreamExpressionValue)param).getValue()); - } else { - if (factory.isEvaluator((StreamExpression) param)) { - StreamEvaluator evaluator = factory.constructEvaluator((StreamExpression) param); - tupleParams.put(name, evaluator); - } else { - TupleStream tupleStream = factory.constructStream((StreamExpression) param); - tupleParams.put(name, tupleStream); - } + stringParams.put(name, ((StreamExpressionValue)param).getValue()); + } else if (factory.isEvaluator((StreamExpression) param)) { + StreamEvaluator evaluator = factory.constructEvaluator((StreamExpression) param); + evaluatorParams.put(name, evaluator); + } else if(factory.isStream((StreamExpression)param)) { + TupleStream tupleStream = factory.constructStream((StreamExpression) param); + streamParams.put(name, tupleStream); + } + else{ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - only string, evaluator, or stream named parameters are supported, but param %d is none of those",expression, name)); } } } @@ -73,6 +82,26 @@ public class TupStream extends TupleStream implements Expressible { private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException { // function name StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); + + // add string based params + for(Entry param : stringParams.entrySet()){ + expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), param.getValue())); + } + + // add evaluator based params + for(Entry param : evaluatorParams.entrySet()){ + expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), param.getValue().toExpression(factory))); + } + + // add stream based params + for(Entry param : streamParams.entrySet()){ + if(includeStreams){ + expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), ((Expressible)param.getValue()).toExpression(factory))); + } + else{ + expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), "")); + } + } return expression; } @@ -91,6 +120,15 @@ public class TupStream extends TupleStream implements Expressible { public void setStreamContext(StreamContext context) { this.streamContext = context; + + // also set in evalators and streams + for(StreamEvaluator evaluator : evaluatorParams.values()){ + evaluator.setStreamContext(context); + } + + for(TupleStream stream : streamParams.values()){ + stream.setStreamContext(context); + } } public List children() { @@ -101,59 +139,68 @@ public class TupStream extends TupleStream implements Expressible { public Tuple read() throws IOException { if(finished) { - Map m = new HashMap(); + Map m = new HashMap<>(); m.put("EOF", true); return new Tuple(m); } else { finished = true; - Map map = new HashMap(); - Set> entries = tupleParams.entrySet(); - - for (Map.Entry entry : entries) { - String name = entry.getKey(); - Object o = entry.getValue(); - if (o instanceof TupleStream) { - List tuples = new ArrayList(); - TupleStream tStream = (TupleStream) o; - tStream.setStreamContext(streamContext); - try { - tStream.open(); - TUPLES: - while (true) { - Tuple tuple = tStream.read(); - if (tuple.EOF) { - break TUPLES; - } else { - tuples.add(tuple); - } - } - map.put(name, tuples); - } finally { - tStream.close(); - } - } else if ((o instanceof StreamEvaluator)) { - Tuple eTuple = new Tuple(streamContext.getLets()); - StreamEvaluator evaluator = (StreamEvaluator) o; - Object eo = evaluator.evaluate(eTuple); - map.put(name, eo); - } else { - map.put(name, streamContext.getLets().get(o.toString())); + Map values = new HashMap<>(); + + // add all string based params + // these could come from the context, or they will just be treated as straight strings + for(Entry param : stringParams.entrySet()){ + if(streamContext.getLets().containsKey(param.getValue())){ + values.put(param.getKey(), streamContext.getLets().get(param.getValue())); + } + else{ + values.put(param.getKey(), param.getValue()); } } - return new Tuple(map); + + // add all evaluators + for(Entry param : evaluatorParams.entrySet()){ + values.put(param.getKey(), param.getValue().evaluateOverContext()); + } + + // Add all streams + for(Entry param : streamParams.entrySet()){ + + try{ + List streamTuples = new ArrayList(); + + // open the stream, closed in finally block + param.getValue().open(); + + // read all values from stream (memory expensive) + Tuple streamTuple = param.getValue().read(); + while(!streamTuple.EOF){ + streamTuples.add(streamTuple); + streamTuple = param.getValue().read(); + } + + values.put(param.getKey(), streamTuples); + } + finally{ + // safely close the stream + param.getValue().close(); + } + } + + return new Tuple(values); } } public void close() throws IOException { + // Nothing to do here } public void open() throws IOException { - + // nothing to do here } /** Return the stream sort - ie, the order in which records are returned */ public StreamComparator getStreamSort(){ - return null; + return new SingleValueComparator(); } public int getCost() { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java index f03bf489c8b..703acf4118d 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java @@ -377,6 +377,18 @@ public class StreamFactory implements Serializable { throw new IOException(String.format(Locale.ROOT,"Invalid evaluator expression %s - function '%s' is unknown (not mapped to a valid StreamEvaluator)", expression, expression.getFunctionName())); } + public boolean isStream(StreamExpression expression) throws IOException{ + String function = expression.getFunctionName(); + if(functionNames.containsKey(function)){ + Class clazz = functionNames.get(function); + if(Expressible.class.isAssignableFrom(clazz) && TupleStream.class.isAssignableFrom(clazz)){ + return true; + } + } + + return false; + } + public boolean isEvaluator(StreamExpression expression) throws IOException{ String function = expression.getFunctionName(); if(functionNames.containsKey(function)){ diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/AbsoluteValueEvaluatorTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/AbsoluteValueEvaluatorTest.java index ff2384c6d92..370a3e8bb52 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/AbsoluteValueEvaluatorTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/eval/AbsoluteValueEvaluatorTest.java @@ -23,7 +23,9 @@ import java.util.Map; import org.apache.lucene.util.LuceneTestCase; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.eval.AbsoluteValueEvaluator; +import org.apache.solr.client.solrj.io.eval.AddEvaluator; import org.apache.solr.client.solrj.io.eval.StreamEvaluator; +import org.apache.solr.client.solrj.io.stream.StreamContext; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; import org.junit.Test; @@ -38,7 +40,8 @@ public class AbsoluteValueEvaluatorTest extends LuceneTestCase { super(); factory = new StreamFactory() - .withFunctionName("abs", AbsoluteValueEvaluator.class); + .withFunctionName("abs", AbsoluteValueEvaluator.class) + .withFunctionName("add", AddEvaluator.class); values = new HashMap(); } @@ -65,6 +68,34 @@ public class AbsoluteValueEvaluatorTest extends LuceneTestCase { Assert.assertTrue(result instanceof Double); Assert.assertEquals(1.1D, result); } + + @Test + public void absoluteValueFromContext() throws Exception{ + StreamEvaluator evaluator = factory.constructEvaluator("abs(a)"); + StreamContext context = new StreamContext(); + evaluator.setStreamContext(context); + Object result; + + context.getLets().put("a", 1); + result = evaluator.evaluate(new Tuple()); + Assert.assertTrue(result instanceof Long); + Assert.assertEquals(1L, result); + + context.getLets().put("a", 1.1); + result = evaluator.evaluate(new Tuple()); + Assert.assertTrue(result instanceof Double); + Assert.assertEquals(1.1D, result); + + context.getLets().put("a", -1.1); + result = evaluator.evaluate(new Tuple()); + Assert.assertTrue(result instanceof Double); + Assert.assertEquals(1.1D, result); + + context.getLets().put("a", factory.constructEvaluator("add(4,-6,34,-56)")); + result = evaluator.evaluate(new Tuple()); + Assert.assertTrue(result instanceof Long); + Assert.assertEquals(24L, result); + } @Test(expected = IOException.class) public void absNoField() throws Exception{