diff --git a/solr/core/src/java/org/apache/solr/handler/SQLHandler.java b/solr/core/src/java/org/apache/solr/handler/SQLHandler.java index 8edd44e0253..f326b98eec2 100644 --- a/solr/core/src/java/org/apache/solr/handler/SQLHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/SQLHandler.java @@ -32,7 +32,7 @@ import com.google.common.base.Strings; import com.google.common.collect.Iterables; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.ComparatorOrder; -import org.apache.solr.client.solrj.io.comp.FieldComparator; +import org.apache.solr.client.solrj.io.comp.StreamComparator; import org.apache.solr.client.solrj.io.comp.MultiComp; import org.apache.solr.client.solrj.io.stream.CloudSolrStream; import org.apache.solr.client.solrj.io.stream.ParallelStream; @@ -316,7 +316,7 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware { for(int i=0; i functionMappings = (NamedList)functionMappingsObj; for(Entry functionMapping : functionMappings){ - Class clazz = core.getResourceLoader().findClass((String)functionMapping.getValue(), ExpressibleStream.class); + Class clazz = core.getResourceLoader().findClass((String)functionMapping.getValue(), Expressible.class); streamFactory.withStreamFunction(functionMapping.getKey(), clazz); } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java deleted file mode 100644 index 8b63ec2dfb6..00000000000 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.solr.client.solrj.io.comp; - -import java.io.Serializable; -import java.util.Comparator; - -import org.apache.solr.client.solrj.io.Tuple; -import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; -import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; -import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; - -/** - * An equality field Comparator which compares a field of two Tuples and determines sort order. - **/ -public class FieldComparator extends StreamComparator implements Comparator, ExpressibleComparator, Serializable { - - private static final long serialVersionUID = 1; - private ComparatorLambda comparator; - - public FieldComparator(String field, ComparatorOrder order) { - super(field, order); - assignComparator(); - } - public FieldComparator(String leftField, String rightField, ComparatorOrder order){ - super(leftField,rightField,order); - assignComparator(); - } - - public StreamExpressionParameter toExpression(StreamFactory factory){ - StringBuilder sb = new StringBuilder(); - - sb.append(leftField); - - if(!leftField.equals(rightField)){ - sb.append("="); - sb.append(rightField); - } - - sb.append(" "); - sb.append(order); - - return new StreamExpressionValue(sb.toString()); - } - - /* - * What're we doing here messing around with lambdas for the comparator logic? - * We want the compare(...) function to run as fast as possible because it will be called many many - * times over the lifetime of this object. For that reason we want to limit the number of comparisons - * taking place in the compare(...) function. Because this class supports both ascending and - * descending comparisons and the logic for each is slightly different, we want to do the - * if(ascending){ compare like this } else { compare like this } - * check only once - we can do that in the constructor of this class, create a lambda, and then execute - * that lambda in the compare function. A little bit of branch prediction savings right here. - */ - private void assignComparator(){ - if(ComparatorOrder.DESCENDING == order){ - comparator = new ComparatorLambda() { - public int compare(Tuple leftTuple, Tuple rightTuple) { - Comparable leftComp = (Comparable)leftTuple.get(leftField); - Comparable rightComp = (Comparable)rightTuple.get(rightField); - return rightComp.compareTo(leftComp); - } - }; - } - else{ - comparator = new ComparatorLambda() { - public int compare(Tuple leftTuple, Tuple rightTuple) { - Comparable leftComp = (Comparable)leftTuple.get(leftField); - Comparable rightComp = (Comparable)rightTuple.get(rightField); - return leftComp.compareTo(rightComp); - } - }; - } - } - - public int compare(Tuple leftTuple, Tuple rightTuple) { - return comparator.compare(leftTuple, rightTuple); - } -} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultiComp.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultiComp.java index ef37eaeac47..3b3bc112134 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultiComp.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultiComp.java @@ -22,6 +22,7 @@ import java.io.Serializable; import java.util.Comparator; import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; @@ -31,7 +32,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; * Wraps multiple Comparators to provide sub-sorting. **/ -public class MultiComp implements Comparator, ExpressibleComparator, Serializable { +public class MultiComp implements Comparator, Expressible, Serializable { private static final long serialVersionUID = 1; @@ -56,9 +57,9 @@ public class MultiComp implements Comparator, ExpressibleComparator, Seri public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException { StringBuilder sb = new StringBuilder(); for(Comparator comp : comps){ - if(comp instanceof ExpressibleComparator){ + if(comp instanceof Expressible){ if(sb.length() > 0){ sb.append(","); } - sb.append(((ExpressibleComparator)comp).toExpression(factory)); + sb.append(((Expressible)comp).toExpression(factory)); } else{ throw new IOException("This MultiComp contains a non-expressible comparator - it cannot be converted to an expression"); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java index 66c35d322f1..0bf7e3a2eaa 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java @@ -1,12 +1,3 @@ -package org.apache.solr.client.solrj.io.comp; - -import java.io.Serializable; -import java.util.Comparator; - -import org.apache.solr.client.solrj.io.Tuple; -import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; -import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -24,22 +15,103 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; * limitations under the License. */ -/** - * Defines a comparator that can be expressed in an expression - */ -public abstract class StreamComparator implements Comparator, Serializable { - protected String leftField; - protected String rightField; - protected final ComparatorOrder order; +package org.apache.solr.client.solrj.io.comp; +import java.io.Serializable; +import java.util.Comparator; + +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; + +/** + * An equality field Comparator which compares a field of two Tuples and determines sort order. + **/ +public class StreamComparator implements Comparator, Expressible, Serializable { + + private static final long serialVersionUID = 1; + + private String leftField; + private String rightField; + private final ComparatorOrder order; + private ComparatorLambda comparator; + public StreamComparator(String field, ComparatorOrder order) { this.leftField = field; this.rightField = field; this.order = order; + assignComparator(); } public StreamComparator(String leftField, String rightField, ComparatorOrder order){ this.leftField = leftField; this.rightField = rightField; this.order = order; + assignComparator(); } -} + + public StreamExpressionParameter toExpression(StreamFactory factory){ + StringBuilder sb = new StringBuilder(); + + sb.append(leftField); + + if(!leftField.equals(rightField)){ + sb.append("="); + sb.append(rightField); + } + + sb.append(" "); + sb.append(order); + + return new StreamExpressionValue(sb.toString()); + } + + /* + * What're we doing here messing around with lambdas for the comparator logic? + * We want the compare(...) function to run as fast as possible because it will be called many many + * times over the lifetime of this object. For that reason we want to limit the number of comparisons + * taking place in the compare(...) function. Because this class supports both ascending and + * descending comparisons and the logic for each is slightly different, we want to do the + * if(ascending){ compare like this } else { compare like this } + * check only once - we can do that in the constructor of this class, create a lambda, and then execute + * that lambda in the compare function. A little bit of branch prediction savings right here. + */ + private void assignComparator(){ + if(ComparatorOrder.DESCENDING == order){ + comparator = new ComparatorLambda() { + @Override + public int compare(Tuple leftTuple, Tuple rightTuple) { + Comparable leftComp = (Comparable)leftTuple.get(leftField); + Comparable rightComp = (Comparable)rightTuple.get(rightField); + + if(leftComp == rightComp){ return 0; } // if both null then they are equal. if both are same ref then are equal + if(null == leftComp){ return 1; } + if(null == rightComp){ return -1; } + + return rightComp.compareTo(leftComp); + } + }; + } + else{ + // See above for black magic reasoning. + comparator = new ComparatorLambda() { + @Override + public int compare(Tuple leftTuple, Tuple rightTuple) { + Comparable leftComp = (Comparable)leftTuple.get(leftField); + Comparable rightComp = (Comparable)rightTuple.get(rightField); + + if(leftComp == rightComp){ return 0; } // if both null then they are equal. if both are same ref then are equal + if(null == leftComp){ return -1; } + if(null == rightComp){ return 1; } + + return leftComp.compareTo(rightComp); + } + }; + } + } + + public int compare(Tuple leftTuple, Tuple rightTuple) { + return comparator.compare(leftTuple, rightTuple); + } +} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ExpressibleComparator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/Equalitor.java similarity index 61% rename from solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ExpressibleComparator.java rename to solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/Equalitor.java index 57a697c5fc1..8d2c2888a4d 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/ExpressibleComparator.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/Equalitor.java @@ -1,12 +1,5 @@ -package org.apache.solr.client.solrj.io.comp; +package org.apache.solr.client.solrj.io.eq; -import java.io.IOException; -import java.io.Serializable; -import java.util.Comparator; - -import org.apache.solr.client.solrj.io.Tuple; -import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; -import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; /* * Licensed to the Apache Software Foundation (ASF) under one or more @@ -26,8 +19,12 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; */ /** - * Defines a comparator that can be expressed in an expression + * Interface defining a way to determine if two items are equal + * + * This borrows from Java 8's BiPredicate interface but to keep Java 7 compatible + * we will not use that interface directory. We will use the test method, however, + * so that future refactoring for Java 8 is simplified. */ -public interface ExpressibleComparator { - StreamExpressionParameter toExpression(StreamFactory factory) throws IOException; +public interface Equalitor { + public boolean test(T left, T right); } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/MultiEqualitor.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/MultiEqualitor.java new file mode 100644 index 00000000000..0791cbcbaba --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/MultiEqualitor.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.io.eq; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Comparator; + +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; + + +/** + * Wraps multiple Equalitors. + **/ + +public class MultiEqualitor implements Equalitor, Expressible, Serializable { + + private static final long serialVersionUID = 1; + + private Equalitor[] eqs; + + public MultiEqualitor(Equalitor... eqs) { + this.eqs = eqs; + } + + public boolean test(Tuple t1, Tuple t2) { + for(Equalitor eq : eqs) { + if(!eq.test(t1, t2)){ + return false; + } + } + + return true; + } + + @Override + public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException { + StringBuilder sb = new StringBuilder(); + for(Equalitor eq : eqs){ + if(eq instanceof Expressible){ + if(sb.length() > 0){ sb.append(","); } + sb.append(((Expressible)eq).toExpression(factory)); + } + else{ + throw new IOException("This MultiEqualitor contains a non-expressible equalitor - it cannot be converted to an expression"); + } + } + + return new StreamExpressionValue(sb.toString()); + } +} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/StreamEqualitor.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/StreamEqualitor.java new file mode 100644 index 00000000000..4b495b0178c --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/StreamEqualitor.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.io.eq; + +import java.io.Serializable; +import java.util.Comparator; + +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.ComparatorOrder; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; + +/** + * An equality field Equalitor which compares a field of two Tuples and determines if they are equal. + **/ +public class StreamEqualitor implements Equalitor, Expressible, Serializable { + + private static final long serialVersionUID = 1; + + private String leftFieldName; + private String rightFieldName; + private StreamComparator comparator; + + public StreamEqualitor(String fieldName) { + init(fieldName, fieldName); + } + public StreamEqualitor(String leftFieldName, String rightFieldName){ + init(leftFieldName, rightFieldName); + } + + private void init(String leftFieldName, String rightFieldName){ + this.leftFieldName = leftFieldName; + this.rightFieldName = rightFieldName; + this.comparator = new StreamComparator(leftFieldName, rightFieldName, ComparatorOrder.ASCENDING); + } + + public StreamExpressionParameter toExpression(StreamFactory factory){ + StringBuilder sb = new StringBuilder(); + + sb.append(leftFieldName); + + if(!leftFieldName.equals(rightFieldName)){ + sb.append("="); + sb.append(rightFieldName); + } + + return new StreamExpressionValue(sb.toString()); + } + + public boolean test(Tuple leftTuple, Tuple rightTuple) { + return 0 == comparator.compare(leftTuple, rightTuple); + } +} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/package-info.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/package-info.java new file mode 100644 index 00000000000..a28e5104a25 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Primary APIs for communicating with a Solr Server from a Java client. + **/ +package org.apache.solr.client.solrj.io.eq; \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java index 63b5aa006d5..dd87a7b9dbd 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java @@ -38,8 +38,9 @@ import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.ComparatorOrder; -import org.apache.solr.client.solrj.io.comp.FieldComparator; import org.apache.solr.client.solrj.io.comp.MultiComp; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; @@ -60,7 +61,7 @@ import org.apache.solr.common.util.SolrjNamedThreadFactory; * to iterate and merge Tuples from each SolrStream. **/ -public class CloudSolrStream extends TupleStream implements ExpressibleStream { +public class CloudSolrStream extends TupleStream implements Expressible { private static final long serialVersionUID = 1; @@ -268,7 +269,7 @@ public class CloudSolrStream extends TupleStream implements ExpressibleStream { fieldName = fieldMappings.get(fieldName); } - comps[i] = new FieldComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING); + comps[i] = new StreamComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING); } if(comps.length > 1) { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java index acb41fdf980..04ef3f9576f 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java @@ -18,15 +18,14 @@ package org.apache.solr.client.solrj.io.stream; import java.io.IOException; +import java.util.ArrayList; import java.util.Comparator; import java.util.List; -import java.util.ArrayList; import java.util.Locale; -import java.util.Map.Entry; import org.apache.solr.client.solrj.io.Tuple; -import org.apache.solr.client.solrj.io.comp.FieldComparator; -import org.apache.solr.client.solrj.io.comp.ExpressibleComparator; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; @@ -38,7 +37,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; **/ -public class MergeStream extends TupleStream implements ExpressibleStream { +public class MergeStream extends TupleStream implements Expressible { private static final long serialVersionUID = 1; @@ -54,7 +53,7 @@ public class MergeStream extends TupleStream implements ExpressibleStream { public MergeStream(StreamExpression expression,StreamFactory factory) throws IOException { // grab all parameters out - List streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, ExpressibleStream.class, TupleStream.class); + List streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class); StreamExpressionNamedParameter onExpression = factory.getNamedOperand(expression, "on"); // validate expression contains only what we want. @@ -73,7 +72,7 @@ public class MergeStream extends TupleStream implements ExpressibleStream { } // Merge is always done over equality, so always use an EqualTo comparator - this.comp = factory.constructComparator(((StreamExpressionValue)onExpression.getParameter()).getValue(), FieldComparator.class); + this.comp = factory.constructComparator(((StreamExpressionValue)onExpression.getParameter()).getValue(), StreamComparator.class); } @Override @@ -86,8 +85,8 @@ public class MergeStream extends TupleStream implements ExpressibleStream { expression.addParameter(streamB.toExpression(factory)); // on - if(comp instanceof ExpressibleComparator){ - expression.addParameter(new StreamExpressionNamedParameter("on",((ExpressibleComparator)comp).toExpression(factory))); + if(comp instanceof Expressible){ + expression.addParameter(new StreamExpressionNamedParameter("on",((Expressible)comp).toExpression(factory))); } else{ throw new IOException("This MergeStream contains a non-expressible comparator - it cannot be converted to an expression"); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java index d607a6a68dc..2db6b3ece26 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java @@ -17,25 +17,25 @@ package org.apache.solr.client.solrj.io.stream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; import java.net.URLEncoder; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; +import java.util.Iterator; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Map.Entry; -import java.io.ByteArrayOutputStream; import java.util.Random; import org.apache.solr.client.solrj.io.Tuple; -import org.apache.solr.client.solrj.io.comp.FieldComparator; -import org.apache.solr.client.solrj.io.comp.ExpressibleComparator; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; @@ -54,7 +54,7 @@ import org.apache.solr.common.util.Base64; **/ -public class ParallelStream extends CloudSolrStream implements ExpressibleStream { +public class ParallelStream extends CloudSolrStream implements Expressible { private TupleStream tupleStream; private int workers; @@ -85,7 +85,7 @@ public class ParallelStream extends CloudSolrStream implements ExpressibleStream objectSerialize = false; String collectionName = factory.getValueOperand(expression, 0); StreamExpressionNamedParameter workersParam = factory.getNamedOperand(expression, "workers"); - List streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, ExpressibleStream.class, TupleStream.class); + List streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class); StreamExpressionNamedParameter sortExpression = factory.getNamedOperand(expression, "sort"); StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost"); @@ -140,7 +140,7 @@ public class ParallelStream extends CloudSolrStream implements ExpressibleStream // We've got all the required items TupleStream stream = factory.constructStream(streamExpressions.get(0)); - Comparator comp = factory.constructComparator(((StreamExpressionValue)sortExpression.getParameter()).getValue(), FieldComparator.class); + Comparator comp = factory.constructComparator(((StreamExpressionValue)sortExpression.getParameter()).getValue(), StreamComparator.class); streamFactory = factory; init(zkHost,collectionName,stream,workersInt,comp); } @@ -153,7 +153,7 @@ public class ParallelStream extends CloudSolrStream implements ExpressibleStream this.tupleStream = tupleStream; // requires Expressible stream and comparator - if(!objectSerialize && !(tupleStream instanceof ExpressibleStream)){ + if(!objectSerialize && !(tupleStream instanceof Expressible)){ throw new IOException("Unable to create ParallelStream with a non-expressible TupleStream."); } } @@ -171,16 +171,16 @@ public class ParallelStream extends CloudSolrStream implements ExpressibleStream expression.addParameter(new StreamExpressionNamedParameter("workers", Integer.toString(workers))); // stream - if(tupleStream instanceof ExpressibleStream){ - expression.addParameter(((ExpressibleStream)tupleStream).toExpression(factory)); + if(tupleStream instanceof Expressible){ + expression.addParameter(((Expressible)tupleStream).toExpression(factory)); } else{ throw new IOException("This ParallelStream contains a non-expressible TupleStream - it cannot be converted to an expression"); } // sort - if(comp instanceof ExpressibleComparator){ - expression.addParameter(new StreamExpressionNamedParameter("sort",((ExpressibleComparator)comp).toExpression(factory))); + if(comp instanceof Expressible){ + expression.addParameter(new StreamExpressionNamedParameter("sort",((Expressible)comp).toExpression(factory))); } else{ throw new IOException("This ParallelStream contains a non-expressible comparator - it cannot be converted to an expression"); @@ -241,7 +241,7 @@ public class ParallelStream extends CloudSolrStream implements ExpressibleStream String encoded = Base64.byteArrayToBase64(bytes, 0, bytes.length); pushStream = URLEncoder.encode(encoded, "UTF-8"); } else { - pushStream = ((ExpressibleStream) tupleStream).toExpression(streamFactory); + pushStream = ((Expressible) tupleStream).toExpression(streamFactory); } ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader(); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java index f3dc9eaa9ae..3a21842b84a 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java @@ -22,7 +22,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.solr.client.solrj.io.Tuple; -import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; @@ -44,8 +44,8 @@ public class PushBackStream extends TupleStream { } public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException{ - if(stream instanceof ExpressibleStream){ - return ((ExpressibleStream)stream).toExpression(factory); + if(stream instanceof Expressible){ + return ((Expressible)stream).toExpression(factory); } throw new IOException("This PushBackStream contains a non-expressible TupleStream - it cannot be converted to an expression"); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java index e613492b865..114525027bf 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java @@ -27,8 +27,8 @@ import java.util.Locale; import java.util.PriorityQueue; import org.apache.solr.client.solrj.io.Tuple; -import org.apache.solr.client.solrj.io.comp.FieldComparator; -import org.apache.solr.client.solrj.io.comp.ExpressibleComparator; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; @@ -39,7 +39,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; * Iterates over a TupleStream and Ranks the topN tuples based on a Comparator. **/ -public class RankStream extends TupleStream implements ExpressibleStream { +public class RankStream extends TupleStream implements Expressible { private static final long serialVersionUID = 1; @@ -56,7 +56,7 @@ public class RankStream extends TupleStream implements ExpressibleStream { public RankStream(StreamExpression expression, StreamFactory factory) throws IOException { // grab all parameters out - List streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, ExpressibleStream.class, TupleStream.class); + List streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class); StreamExpressionNamedParameter nParam = factory.getNamedOperand(expression, "n"); StreamExpressionNamedParameter sortExpression = factory.getNamedOperand(expression, "sort"); @@ -87,7 +87,7 @@ public class RankStream extends TupleStream implements ExpressibleStream { } TupleStream stream = factory.constructStream(streamExpressions.get(0)); - Comparator comp = factory.constructComparator(((StreamExpressionValue)sortExpression.getParameter()).getValue(), FieldComparator.class); + Comparator comp = factory.constructComparator(((StreamExpressionValue)sortExpression.getParameter()).getValue(), StreamComparator.class); init(stream,nInt,comp); } @@ -107,16 +107,16 @@ public class RankStream extends TupleStream implements ExpressibleStream { expression.addParameter(new StreamExpressionNamedParameter("n", Integer.toString(size))); // stream - if(tupleStream instanceof ExpressibleStream){ - expression.addParameter(((ExpressibleStream)tupleStream).toExpression(factory)); + if(tupleStream instanceof Expressible){ + expression.addParameter(((Expressible)tupleStream).toExpression(factory)); } else{ throw new IOException("This RankStream contains a non-expressible TupleStream - it cannot be converted to an expression"); } // sort - if(comp instanceof ExpressibleComparator){ - expression.addParameter(new StreamExpressionNamedParameter("sort",((ExpressibleComparator)comp).toExpression(factory))); + if(comp instanceof Expressible){ + expression.addParameter(new StreamExpressionNamedParameter("sort",((Expressible)comp).toExpression(factory))); } else{ throw new IOException("This RankStream contains a non-expressible comparator - it cannot be converted to an expression"); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java index bd95633015f..2bb39ea31fa 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java @@ -18,16 +18,16 @@ package org.apache.solr.client.solrj.io.stream; import java.io.IOException; -import java.util.List; import java.util.ArrayList; import java.util.Comparator; +import java.util.HashMap; +import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.HashMap; import org.apache.solr.client.solrj.io.Tuple; -import org.apache.solr.client.solrj.io.comp.FieldComparator; -import org.apache.solr.client.solrj.io.comp.ExpressibleComparator; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; @@ -48,7 +48,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; * **/ -public class ReducerStream extends TupleStream implements ExpressibleStream { +public class ReducerStream extends TupleStream implements Expressible { private static final long serialVersionUID = 1; @@ -65,7 +65,7 @@ public class ReducerStream extends TupleStream implements ExpressibleStream { public ReducerStream(StreamExpression expression, StreamFactory factory) throws IOException{ // grab all parameters out - List streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, ExpressibleStream.class, TupleStream.class); + List streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class); StreamExpressionNamedParameter byExpression = factory.getNamedOperand(expression, "by"); // validate expression contains only what we want. @@ -83,7 +83,7 @@ public class ReducerStream extends TupleStream implements ExpressibleStream { } // Reducing is always done over equality, so always use an EqualTo comparator - this.comp = factory.constructComparator(((StreamExpressionValue)byExpression.getParameter()).getValue(), FieldComparator.class); + this.comp = factory.constructComparator(((StreamExpressionValue)byExpression.getParameter()).getValue(), StreamComparator.class); } @Override @@ -95,8 +95,8 @@ public class ReducerStream extends TupleStream implements ExpressibleStream { expression.addParameter(tupleStream.toExpression(factory)); // over - if(comp instanceof ExpressibleComparator){ - expression.addParameter(new StreamExpressionNamedParameter("by",((ExpressibleComparator)comp).toExpression(factory))); + if(comp instanceof Expressible){ + expression.addParameter(new StreamExpressionNamedParameter("by",((Expressible)comp).toExpression(factory))); } else{ throw new IOException("This ReducerStream contains a non-expressible comparator - it cannot be converted to an expression"); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java index 132a2e4c653..bd728eae6cf 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java @@ -24,8 +24,10 @@ import java.util.List; import java.util.Locale; import org.apache.solr.client.solrj.io.Tuple; -import org.apache.solr.client.solrj.io.comp.FieldComparator; -import org.apache.solr.client.solrj.io.comp.ExpressibleComparator; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.eq.Equalitor; +import org.apache.solr.client.solrj.io.eq.StreamEqualitor; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; @@ -38,22 +40,22 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; * Note: The sort order of the underlying stream must match the Comparator. **/ -public class UniqueStream extends TupleStream implements ExpressibleStream { +public class UniqueStream extends TupleStream implements Expressible { private static final long serialVersionUID = 1; private TupleStream tupleStream; - private Comparator comp; + private Equalitor eq; private transient Tuple currentTuple; - public UniqueStream(TupleStream tupleStream, Comparator comp) { + public UniqueStream(TupleStream tupleStream, Equalitor eq) { this.tupleStream = tupleStream; - this.comp = comp; + this.eq = eq; } public UniqueStream(StreamExpression expression,StreamFactory factory) throws IOException { // grab all parameters out - List streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, ExpressibleStream.class, TupleStream.class); + List streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class); StreamExpressionNamedParameter overExpression = factory.getNamedOperand(expression, "over"); // validate expression contains only what we want. @@ -71,7 +73,7 @@ public class UniqueStream extends TupleStream implements ExpressibleStream { } // Uniqueness is always done over equality, so always use an EqualTo comparator - this.comp = factory.constructComparator(((StreamExpressionValue)overExpression.getParameter()).getValue(), FieldComparator.class); + this.eq = factory.constructEqualitor(((StreamExpressionValue)overExpression.getParameter()).getValue(), StreamEqualitor.class); } @Override @@ -80,28 +82,24 @@ public class UniqueStream extends TupleStream implements ExpressibleStream { StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); // streams - if(tupleStream instanceof ExpressibleStream){ - expression.addParameter(((ExpressibleStream)tupleStream).toExpression(factory)); + if(tupleStream instanceof Expressible){ + expression.addParameter(((Expressible)tupleStream).toExpression(factory)); } else{ throw new IOException("This UniqueStream contains a non-expressible TupleStream - it cannot be converted to an expression"); } // over - if(comp instanceof ExpressibleComparator){ - expression.addParameter(new StreamExpressionNamedParameter("over",((ExpressibleComparator)comp).toExpression(factory))); + if(eq instanceof Expressible){ + expression.addParameter(new StreamExpressionNamedParameter("over",((Expressible)eq).toExpression(factory))); } else{ - throw new IOException("This UniqueStream contains a non-expressible comparator - it cannot be converted to an expression"); + throw new IOException("This UniqueStream contains a non-expressible equalitor - it cannot be converted to an expression"); } return expression; } - - public void setComp(Comparator comp) { - this.comp = comp; - } - + public void setStreamContext(StreamContext context) { this.tupleStream.setStreamContext(context); } @@ -131,8 +129,7 @@ public class UniqueStream extends TupleStream implements ExpressibleStream { return tuple; } else { while(true) { - int i = comp.compare(currentTuple, tuple); - if(i == 0) { + if(eq.test(currentTuple, tuple)){ //We have duplicate tuple so read the next tuple from the stream. tuple = tupleStream.read(); if(tuple.EOF) { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExpressibleStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/Expressible.java similarity index 81% rename from solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExpressibleStream.java rename to solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/Expressible.java index d6b9bc62ccb..1543d2c34e4 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExpressibleStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/Expressible.java @@ -1,10 +1,7 @@ -package org.apache.solr.client.solrj.io.stream; +package org.apache.solr.client.solrj.io.stream.expr; import java.io.IOException; -import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; -import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -25,6 +22,6 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; /** * Defines a stream that can be expressed in an expression */ -public interface ExpressibleStream { +public interface Expressible { StreamExpressionParameter toExpression(StreamFactory factory) throws IOException; } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java index ba16740a925..f4dec674518 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java @@ -14,9 +14,9 @@ import java.util.Map.Entry; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.ComparatorOrder; -import org.apache.solr.client.solrj.io.comp.ExpressibleComparator; import org.apache.solr.client.solrj.io.comp.MultiComp; -import org.apache.solr.client.solrj.io.stream.ExpressibleStream; +import org.apache.solr.client.solrj.io.eq.Equalitor; +import org.apache.solr.client.solrj.io.eq.MultiEqualitor; import org.apache.solr.client.solrj.io.stream.TupleStream; /* @@ -170,7 +170,7 @@ public class StreamFactory implements Serializable { String function = expression.getFunctionName(); if(streamFunctions.containsKey(function)){ Class clazz = streamFunctions.get(function); - if(ExpressibleStream.class.isAssignableFrom(clazz) && TupleStream.class.isAssignableFrom(clazz)){ + if(Expressible.class.isAssignableFrom(clazz) && TupleStream.class.isAssignableFrom(clazz)){ TupleStream stream = (TupleStream)createInstance(streamFunctions.get(function), new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this}); return stream; } @@ -201,6 +201,36 @@ public class StreamFactory implements Serializable { } } + public Equalitor constructEqualitor(String equalitorString, Class equalitorType) throws IOException { + if(equalitorString.contains(",")){ + String[] parts = equalitorString.split(","); + Equalitor[] eqs = new Equalitor[parts.length]; + for(int idx = 0; idx < parts.length; ++idx){ + eqs[idx] = constructEqualitor(parts[idx].trim(), equalitorType); + } + return new MultiEqualitor(eqs); + } + else{ + String leftFieldName; + String rightFieldName; + + if(equalitorString.contains("=")){ + String[] parts = equalitorString.split("="); + if(2 != parts.length){ + throw new IOException(String.format(Locale.ROOT,"Invalid equalitor expression %s - expecting fieldName=fieldName",equalitorString)); + } + + leftFieldName = parts[0].trim(); + rightFieldName = parts[1].trim(); + } + else{ + leftFieldName = rightFieldName = equalitorString.trim(); + } + + return (Equalitor)createInstance(equalitorType, new Class[]{ String.class, String.class }, new Object[]{ leftFieldName, rightFieldName }); + } + } + public T createInstance(Class clazz, Class[] paramTypes, Object[] params) throws IOException{ // This should use SolrResourceLoader - TODO // This is adding a restriction that the class has a public constructor - we may not want to do that diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/package-info.java b/solr/solrj/src/java/org/apache/solr/client/solrj/package-info.java index b88918ed56c..70a83dd76e2 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/package-info.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/package-info.java @@ -15,9 +15,9 @@ * limitations under the License. */ -/** +/** * Primary APIs for communicating with a Solr Server from a Java client. - */ + **/ package org.apache.solr.client.solrj; diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CountStream.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CountStream.java index 58da40727ca..f5cfbaec924 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CountStream.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CountStream.java @@ -23,11 +23,12 @@ import java.util.List; import java.util.Locale; import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; -public class CountStream extends TupleStream implements ExpressibleStream, Serializable { +public class CountStream extends TupleStream implements Expressible, Serializable { private TupleStream stream; private int count; @@ -37,7 +38,7 @@ public class CountStream extends TupleStream implements ExpressibleStream, Seria } public CountStream(StreamExpression expression, StreamFactory factory) throws IOException{ - List streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, ExpressibleStream.class, TupleStream.class); + List streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class); // validate expression contains only what we want. if(expression.getParameters().size() != streamExpressions.size()){ @@ -57,8 +58,8 @@ public class CountStream extends TupleStream implements ExpressibleStream, Seria StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); // stream - if(stream instanceof ExpressibleStream){ - expression.addParameter(((ExpressibleStream)stream).toExpression(factory)); + if(stream instanceof Expressible){ + expression.addParameter(((Expressible)stream).toExpression(factory)); } else{ throw new IOException("This CountStream contains a non-expressible TupleStream - it cannot be converted to an expression"); diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java index bcd46c0d806..8b714e2f622 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java @@ -236,7 +236,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { .withStreamFunction("unique", UniqueStream.class); // Basic test - expression = StreamExpressionParser.parse("unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f asc\")"); + expression = StreamExpressionParser.parse("unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f\")"); stream = new UniqueStream(expression, factory); tuples = getTuples(stream); @@ -244,7 +244,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { assertOrder(tuples, 0, 1, 3, 4); // Basic test desc - expression = StreamExpressionParser.parse("unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc, a_i desc\"), over=\"a_f desc\")"); + expression = StreamExpressionParser.parse("unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc, a_i desc\"), over=\"a_f\")"); stream = new UniqueStream(expression, factory); tuples = getTuples(stream); @@ -252,7 +252,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { assertOrder(tuples, 4,3,1,2); // Basic w/multi comp - expression = StreamExpressionParser.parse("unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f asc, a_i asc\")"); + expression = StreamExpressionParser.parse("unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f, a_i\")"); stream = new UniqueStream(expression, factory); tuples = getTuples(stream); @@ -260,7 +260,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { assertOrder(tuples, 0,2,1,3,4); // full factory w/multi comp - stream = factory.constructStream("unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f asc, a_i asc\")"); + stream = factory.constructStream("unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f, a_i\")"); tuples = getTuples(stream); assert(tuples.size() == 5); @@ -371,7 +371,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { + "n=2," + "unique(" + "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\")," - + "over=\"a_f desc\")," + + "over=\"a_f\")," + "sort=\"a_f desc\")"); stream = new RankStream(expression, factory); tuples = getTuples(stream); @@ -384,7 +384,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { + "n=4," + "unique(" + "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")," - + "over=\"a_f asc\")," + + "over=\"a_f\")," + "sort=\"a_f asc\")"); tuples = getTuples(stream); @@ -491,7 +491,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { .withStreamFunction("group", ReducerStream.class) .withStreamFunction("parallel", ParallelStream.class); - ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"), over=\"a_f asc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_f asc\")"); + ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"), over=\"a_f\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_f asc\")"); List tuples = getTuples(pstream); assert(tuples.size() == 5); diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java index 4a0408c0c9e..35ef764448d 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java @@ -19,25 +19,19 @@ package org.apache.solr.client.solrj.io.stream; import java.io.File; import java.io.IOException; -import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.HashMap; import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.ComparatorOrder; -import org.apache.solr.client.solrj.io.comp.FieldComparator; import org.apache.solr.client.solrj.io.comp.MultiComp; -import org.apache.solr.client.solrj.io.stream.CloudSolrStream; -import org.apache.solr.client.solrj.io.stream.MergeStream; -import org.apache.solr.client.solrj.io.stream.ParallelStream; -import org.apache.solr.client.solrj.io.stream.RankStream; -import org.apache.solr.client.solrj.io.stream.ReducerStream; -import org.apache.solr.client.solrj.io.stream.TupleStream; -import org.apache.solr.client.solrj.io.stream.UniqueStream; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.eq.StreamEqualitor; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; import org.apache.solr.client.solrj.io.stream.metrics.Bucket; import org.apache.solr.client.solrj.io.stream.metrics.CountMetric; @@ -55,9 +49,6 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import java.util.List; -import java.util.ArrayList; - /** * All base tests will be done with CloudSolrStream. Under the covers CloudSolrStream uses SolrStream so * SolrStream will get fully exercised through these tests. @@ -148,7 +139,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc"); CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params); - UniqueStream ustream = new UniqueStream(stream, new FieldComparator("a_f",ComparatorOrder.ASCENDING)); + UniqueStream ustream = new UniqueStream(stream, new StreamEqualitor("a_f")); List tuples = getTuples(ustream); assert(tuples.size() == 4); assertOrder(tuples, 0,1,3,4); @@ -197,7 +188,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { Map paramsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "none"); CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA); - ParallelStream pstream = new ParallelStream(zkHost, "collection1", stream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING)); + ParallelStream pstream = new ParallelStream(zkHost, "collection1", stream, 2, new StreamComparator("a_s",ComparatorOrder.ASCENDING)); List tuples = getTuples(pstream); @@ -230,8 +221,8 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc", "partitionKeys", "a_f"); CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params); - UniqueStream ustream = new UniqueStream(stream, new FieldComparator("a_f",ComparatorOrder.ASCENDING)); - ParallelStream pstream = new ParallelStream(zkHost, "collection1", ustream, 2, new FieldComparator("a_f",ComparatorOrder.ASCENDING)); + UniqueStream ustream = new UniqueStream(stream, new StreamEqualitor("a_f")); + ParallelStream pstream = new ParallelStream(zkHost, "collection1", ustream, 2, new StreamComparator("a_f",ComparatorOrder.ASCENDING)); List tuples = getTuples(pstream); assert(tuples.size() == 5); assertOrder(tuples, 0,1,3,4,6); @@ -264,7 +255,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { Map params = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc"); CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params); - RankStream rstream = new RankStream(stream, 3, new FieldComparator("a_i",ComparatorOrder.DESCENDING)); + RankStream rstream = new RankStream(stream, 3, new StreamComparator("a_i",ComparatorOrder.DESCENDING)); List tuples = getTuples(rstream); @@ -296,8 +287,8 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { Map params = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i"); CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params); - RankStream rstream = new RankStream(stream, 11, new FieldComparator("a_i",ComparatorOrder.DESCENDING)); - ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_i",ComparatorOrder.DESCENDING)); + RankStream rstream = new RankStream(stream, 11, new StreamComparator("a_i",ComparatorOrder.DESCENDING)); + ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new StreamComparator("a_i",ComparatorOrder.DESCENDING)); List tuples = getTuples(pstream); assert(tuples.size() == 10); @@ -363,7 +354,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { //Test with spaces in the parameter lists. Map paramsA = mapParams("q","*:*","fl","id,a_s, a_i, a_f","sort", "a_s asc , a_f asc"); CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA); - ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s",ComparatorOrder.ASCENDING)); + ReducerStream rstream = new ReducerStream(stream, new StreamComparator("a_s",ComparatorOrder.ASCENDING)); List tuples = getTuples(rstream); @@ -410,7 +401,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { //Test with spaces in the parameter lists. Map paramsA = mapParams("q", "blah", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc"); CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA); - ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING)); + ReducerStream rstream = new ReducerStream(stream, new StreamComparator("a_s", ComparatorOrder.ASCENDING)); List tuples = getTuples(rstream); @@ -441,8 +432,8 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s"); CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA); - ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s",ComparatorOrder.ASCENDING)); - ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING)); + ReducerStream rstream = new ReducerStream(stream, new StreamComparator("a_s",ComparatorOrder.ASCENDING)); + ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new StreamComparator("a_s",ComparatorOrder.ASCENDING)); List tuples = getTuples(pstream); @@ -465,8 +456,8 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s desc,a_f asc", "partitionKeys", "a_s"); stream = new CloudSolrStream(zkHost, "collection1", paramsA); - rstream = new ReducerStream(stream, new FieldComparator("a_s",ComparatorOrder.DESCENDING)); - pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.DESCENDING)); + rstream = new ReducerStream(stream, new StreamComparator("a_s",ComparatorOrder.DESCENDING)); + pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new StreamComparator("a_s",ComparatorOrder.DESCENDING)); tuples = getTuples(pstream); @@ -648,7 +639,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { new CountMetric()}; RollupStream rollupStream = new RollupStream(stream, buckets, metrics); - ParallelStream parallelStream = new ParallelStream(zkHost, "collection1", rollupStream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING)); + ParallelStream parallelStream = new ParallelStream(zkHost, "collection1", rollupStream, 2, new StreamComparator("a_s", ComparatorOrder.ASCENDING)); List tuples = getTuples(parallelStream); assert(tuples.size() == 3); @@ -748,8 +739,8 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { Map paramsA = mapParams("q","blah","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s"); CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA); - ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING)); - ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING)); + ReducerStream rstream = new ReducerStream(stream, new StreamComparator("a_s", ComparatorOrder.ASCENDING)); + ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new StreamComparator("a_s", ComparatorOrder.ASCENDING)); List tuples = getTuples(pstream); assert(tuples.size() == 0); @@ -818,7 +809,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { Map paramsB = mapParams("q","id:(0 2 3)","fl","id,a_s,a_i","sort", "a_i asc"); CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB); - MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING)); + MergeStream mstream = new MergeStream(streamA, streamB, new StreamComparator("a_i",ComparatorOrder.ASCENDING)); List tuples = getTuples(mstream); assert(tuples.size() == 5); @@ -831,7 +822,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { paramsB = mapParams("q","id:(0 2 3)","fl","id,a_s,a_i","sort", "a_i desc"); streamB = new CloudSolrStream(zkHost, "collection1", paramsB); - mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.DESCENDING)); + mstream = new MergeStream(streamA, streamB, new StreamComparator("a_i",ComparatorOrder.DESCENDING)); tuples = getTuples(mstream); assert(tuples.size() == 5); @@ -845,7 +836,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { paramsB = mapParams("q","id:(0 3)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc"); streamB = new CloudSolrStream(zkHost, "collection1", paramsB); - mstream = new MergeStream(streamA, streamB, new MultiComp(new FieldComparator("a_f",ComparatorOrder.ASCENDING),new FieldComparator("a_i",ComparatorOrder.ASCENDING))); + mstream = new MergeStream(streamA, streamB, new MultiComp(new StreamComparator("a_f",ComparatorOrder.ASCENDING),new StreamComparator("a_i",ComparatorOrder.ASCENDING))); tuples = getTuples(mstream); assert(tuples.size() == 5); @@ -857,7 +848,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { paramsB = mapParams("q","id:(0 3)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i desc"); streamB = new CloudSolrStream(zkHost, "collection1", paramsB); - mstream = new MergeStream(streamA, streamB, new MultiComp(new FieldComparator("a_f",ComparatorOrder.ASCENDING),new FieldComparator("a_i",ComparatorOrder.DESCENDING))); + mstream = new MergeStream(streamA, streamB, new MultiComp(new StreamComparator("a_f",ComparatorOrder.ASCENDING),new StreamComparator("a_i",ComparatorOrder.DESCENDING))); tuples = getTuples(mstream); assert(tuples.size() == 5); @@ -893,8 +884,8 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { Map paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i"); CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB); - MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING)); - ParallelStream pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING)); + MergeStream mstream = new MergeStream(streamA, streamB, new StreamComparator("a_i",ComparatorOrder.ASCENDING)); + ParallelStream pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new StreamComparator("a_i",ComparatorOrder.ASCENDING)); List tuples = getTuples(pstream); assert(tuples.size() == 9); @@ -907,8 +898,8 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i desc", "partitionKeys", "a_i"); streamB = new CloudSolrStream(zkHost, "collection1", paramsB); - mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.DESCENDING)); - pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new FieldComparator("a_i",ComparatorOrder.DESCENDING)); + mstream = new MergeStream(streamA, streamB, new StreamComparator("a_i",ComparatorOrder.DESCENDING)); + pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new StreamComparator("a_i",ComparatorOrder.DESCENDING)); tuples = getTuples(pstream); assert(tuples.size() == 8); @@ -943,9 +934,9 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { Map paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i"); CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB); - MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING)); + MergeStream mstream = new MergeStream(streamA, streamB, new StreamComparator("a_i",ComparatorOrder.ASCENDING)); CountStream cstream = new CountStream(mstream); - ParallelStream pstream = new ParallelStream(zkHost, "collection1", cstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING)); + ParallelStream pstream = new ParallelStream(zkHost, "collection1", cstream, 2, new StreamComparator("a_i",ComparatorOrder.ASCENDING)); List tuples = getTuples(pstream); assert(tuples.size() == 9);