diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index a1f129e5c4e..dd79ab5469f 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -132,6 +132,8 @@ New Features * SOLR-8002: Add column alias support to the Parallel SQL Interface (Joel Bernstein) +* SOLR-7525: Add ComplementStream and IntersectStream to the Streaming API and Streaming Expressions + (Dennis Gove, Jason Gerlowski, Joel Bernstein) Bug Fixes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java index 034d48e2ad2..2ee3d6d69d9 100644 --- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java @@ -17,11 +17,8 @@ package org.apache.solr.handler; -import java.io.ByteArrayInputStream; -import java.io.ObjectInputStream; -import java.lang.invoke.MethodHandles; import java.io.IOException; -import java.net.URLDecoder; +import java.lang.invoke.MethodHandles; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -30,14 +27,18 @@ import java.util.Map.Entry; import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.ops.DistinctOperation; import org.apache.solr.client.solrj.io.ops.GroupOperation; +import org.apache.solr.client.solrj.io.ops.ReplaceOperation; import org.apache.solr.client.solrj.io.stream.CloudSolrStream; +import org.apache.solr.client.solrj.io.stream.ComplementStream; import org.apache.solr.client.solrj.io.stream.ExceptionStream; import org.apache.solr.client.solrj.io.stream.FacetStream; +import org.apache.solr.client.solrj.io.stream.HashJoinStream; import org.apache.solr.client.solrj.io.stream.InnerJoinStream; +import org.apache.solr.client.solrj.io.stream.IntersectStream; import org.apache.solr.client.solrj.io.stream.JDBCStream; import org.apache.solr.client.solrj.io.stream.LeftOuterJoinStream; -import org.apache.solr.client.solrj.io.stream.HashJoinStream; import org.apache.solr.client.solrj.io.stream.MergeStream; import org.apache.solr.client.solrj.io.stream.OuterHashJoinStream; import org.apache.solr.client.solrj.io.stream.ParallelStream; @@ -60,7 +61,6 @@ import org.apache.solr.common.SolrException; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; -import org.apache.solr.common.util.Base64; import org.apache.solr.common.util.NamedList; import org.apache.solr.core.CloseHook; import org.apache.solr.core.CoreContainer; @@ -120,6 +120,8 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware { .withFunctionName("facet", FacetStream.class) .withFunctionName("update", UpdateStream.class) .withFunctionName("jdbc", JDBCStream.class) + .withFunctionName("intersect", IntersectStream.class) + .withFunctionName("complement", ComplementStream.class) // metrics .withFunctionName("min", MinMetric.class) @@ -127,6 +129,13 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware { .withFunctionName("avg", MeanMetric.class) .withFunctionName("sum", SumMetric.class) .withFunctionName("count", CountMetric.class) + + // tuple manipulation operations + .withFunctionName("replace", ReplaceOperation.class) + + // stream reduction operations + .withFunctionName("group", GroupOperation.class) + .withFunctionName("distinct", DistinctOperation.class) ; // This pulls all the overrides and additions from the config diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/DistinctOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/DistinctOperation.java new file mode 100644 index 00000000000..67a5ac2e77e --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/DistinctOperation.java @@ -0,0 +1,79 @@ +package org.apache.solr.client.solrj.io.ops; + +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.FieldComparator; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.eq.FieldEqualitor; +import org.apache.solr.client.solrj.io.eq.StreamEqualitor; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Comparator; +import java.util.List; +import java.util.ArrayList; +import java.util.Locale; +import java.util.Map; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.PriorityQueue; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public class DistinctOperation implements ReduceOperation { + + private static final long serialVersionUID = 1L; + private Tuple current; + + public DistinctOperation(StreamExpression expression, StreamFactory factory) throws IOException { + init(); + } + + public DistinctOperation() { + init(); + } + + private void init() { + } + + public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException { + StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); + return expression; + } + + public Tuple reduce() { + // Return the tuple after setting current to null. This will ensure the next call to + // operate stores that tuple + Tuple toReturn = current; + current = null; + + return toReturn; + } + + public void operate(Tuple tuple) { + // we only care about the first one seen. Drop all but the first + if(null == current){ + current = tuple; + } + } +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ComplementStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ComplementStream.java new file mode 100644 index 00000000000..3b42d7e21cc --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ComplementStream.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.io.stream; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; + +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.FieldComparator; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.eq.FieldEqualitor; +import org.apache.solr.client.solrj.io.eq.StreamEqualitor; +import org.apache.solr.client.solrj.io.ops.DistinctOperation; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; + +/** +* Emits tuples from streamA which do not exist in streamB. Resulting tuples are ordered +* the same as they were in streamA. Both streams must be sorted by the fields being compared. +**/ + +public class ComplementStream extends TupleStream implements Expressible { + + private static final long serialVersionUID = 1; + + private PushBackStream streamA; + private PushBackStream streamB; + private TupleStream originalStreamB; + private StreamEqualitor eq; + + public ComplementStream(TupleStream streamA, TupleStream streamB, StreamEqualitor eq) throws IOException { + init(streamA, streamB, eq); + } + + public ComplementStream(StreamExpression expression,StreamFactory factory) throws IOException { + // grab all parameters out + List streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class); + StreamExpressionNamedParameter onExpression = factory.getNamedOperand(expression, "on"); + + // validate expression contains only what we want. + if(expression.getParameters().size() != streamExpressions.size() + 1){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression)); + } + + if(2 != streamExpressions.size()){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting two streams but found %d (must be TupleStream types)",expression, streamExpressions.size())); + } + + if(null == onExpression || !(onExpression.getParameter() instanceof StreamExpressionValue)){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'on' parameter listing fields to merge on but didn't find one",expression)); + } + + init( factory.constructStream(streamExpressions.get(0)), + factory.constructStream(streamExpressions.get(1)), + factory.constructEqualitor(((StreamExpressionValue)onExpression.getParameter()).getValue(), FieldEqualitor.class) + ); + } + + private void init(TupleStream streamA, TupleStream streamB, StreamEqualitor eq) throws IOException { + this.streamA = new PushBackStream(streamA); + this.streamB = new PushBackStream(new UniqueStream(streamB, eq)); + this.originalStreamB = streamB; // hold onto this for toExpression + this.eq = eq; + + // streamA and streamB must both be sorted so that comp can be derived from + if(!eq.isDerivedFrom(streamA.getStreamSort()) || !eq.isDerivedFrom(streamB.getStreamSort())){ + throw new IOException("Invalid ComplementStream - both substream comparators (sort) must be a superset of this stream's equalitor."); + } + } + + @Override + public StreamExpression toExpression(StreamFactory factory) throws IOException { + // function name + StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); + + // streams + if(streamA instanceof Expressible){ + expression.addParameter(((Expressible)streamA).toExpression(factory)); + } + else{ + throw new IOException("This IntersectionStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } + + if(originalStreamB instanceof Expressible){ + expression.addParameter(((Expressible)originalStreamB).toExpression(factory)); + } + else{ + throw new IOException("This IntersectStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } + + // on + expression.addParameter(new StreamExpressionNamedParameter("on",eq.toExpression(factory))); + + return expression; + } + + public void setStreamContext(StreamContext context) { + this.streamA.setStreamContext(context); + this.streamB.setStreamContext(context); + } + + public List children() { + List l = new ArrayList(); + l.add(streamA); + l.add(streamB); + return l; + } + + public void open() throws IOException { + streamA.open(); + streamB.open(); + } + + public void close() throws IOException { + streamA.close(); + streamB.close(); + } + + public Tuple read() throws IOException { + + while(true){ + Tuple a = streamA.read(); + Tuple b = streamB.read(); + + // if a is EOF then we're done + if(a.EOF){ return a; } + + // if b is EOF then return a + if(b.EOF){ + streamB.pushBack(b); + return a; + } + + // if a != b && a < b then we know there is no b which a might equal so return a + if(!eq.test(a, b) && streamA.getStreamSort().compare(a, b) < 0){ + streamB.pushBack(b); + return a; + } + + // if a == b then ignore a cause it exists in b + // else we know that b < a so we can ignore b + if(eq.test(a, b)){ + streamB.pushBack(b); + } + else{ + streamA.pushBack(a); + } + } + } + + /** Return the stream sort - ie, the order in which records are returned */ + public StreamComparator getStreamSort(){ + return streamA.getStreamSort(); + } + + + public int getCost() { + return 0; + } +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/IntersectStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/IntersectStream.java new file mode 100644 index 00000000000..230083d7621 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/IntersectStream.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.io.stream; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; + +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.FieldComparator; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.eq.FieldEqualitor; +import org.apache.solr.client.solrj.io.eq.StreamEqualitor; +import org.apache.solr.client.solrj.io.ops.DistinctOperation; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; + +/** +* Emits tuples from streamA which also exist in streamB. Resulting tuples are ordered +* the same as they were in streamA. Both streams must be sorted by the fields being compared. +**/ + +public class IntersectStream extends TupleStream implements Expressible { + + private static final long serialVersionUID = 1; + + private PushBackStream streamA; + private PushBackStream streamB; + private TupleStream originalStreamB; + private StreamEqualitor eq; + + public IntersectStream(TupleStream streamA, TupleStream streamB, StreamEqualitor eq) throws IOException { + init(streamA, streamB, eq); + } + + public IntersectStream(StreamExpression expression,StreamFactory factory) throws IOException { + // grab all parameters out + List streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class); + StreamExpressionNamedParameter onExpression = factory.getNamedOperand(expression, "on"); + + // validate expression contains only what we want. + if(expression.getParameters().size() != streamExpressions.size() + 1){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression)); + } + + if(2 != streamExpressions.size()){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting two streams but found %d (must be TupleStream types)",expression, streamExpressions.size())); + } + + if(null == onExpression || !(onExpression.getParameter() instanceof StreamExpressionValue)){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'on' parameter listing fields to merge on but didn't find one",expression)); + } + + init( factory.constructStream(streamExpressions.get(0)), + factory.constructStream(streamExpressions.get(1)), + factory.constructEqualitor(((StreamExpressionValue)onExpression.getParameter()).getValue(), FieldEqualitor.class) + ); + } + + private void init(TupleStream streamA, TupleStream streamB, StreamEqualitor eq) throws IOException { + this.streamA = new PushBackStream(streamA); + this.streamB = new PushBackStream(new UniqueStream(streamB, eq)); + this.originalStreamB = streamB; // hold onto this for toExpression + this.eq = eq; + + // streamA and streamB must both be sorted so that comp can be derived from + if(!eq.isDerivedFrom(streamA.getStreamSort()) || !eq.isDerivedFrom(streamB.getStreamSort())){ + throw new IOException("Invalid IntersectStream - both substream comparators (sort) must be a superset of this stream's equalitor."); + } + } + + @Override + public StreamExpression toExpression(StreamFactory factory) throws IOException { + // function name + StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); + + // streams + if(streamA instanceof Expressible){ + expression.addParameter(((Expressible)streamA).toExpression(factory)); + } + else{ + throw new IOException("This IntersectStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } + + if(originalStreamB instanceof Expressible){ + expression.addParameter(((Expressible)originalStreamB).toExpression(factory)); + } + else{ + throw new IOException("This IntersectStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } + + // on + expression.addParameter(new StreamExpressionNamedParameter("on",eq.toExpression(factory))); + + return expression; + } + + public void setStreamContext(StreamContext context) { + this.streamA.setStreamContext(context); + this.streamB.setStreamContext(context); + } + + public List children() { + List l = new ArrayList(); + l.add(streamA); + l.add(streamB); + return l; + } + + public void open() throws IOException { + streamA.open(); + streamB.open(); + } + + public void close() throws IOException { + streamA.close(); + streamB.close(); + } + + public Tuple read() throws IOException { + + while(true){ + Tuple a = streamA.read(); + Tuple b = streamB.read(); + + // if either are EOF then we're done + if(a.EOF){ return a; } + if(b.EOF){ return b; } + + if(eq.test(a, b)){ + streamB.pushBack(b); + return a; + } + + // We're not at the end and they're not equal. We now need to decide which we can + // throw away. This is accomplished by checking which is less than the other. The + // one that is less (determined by the sort) can be tossed. The other should + // be pushed back and the loop continued. We don't have to worry about an == 0 + // result because we already know tuples a and b are not equal. And because eq + // is derived from the sorts of both streamA and streamB we can rest assured that + // equality is not a possibility. + int aComp = streamA.getStreamSort().compare(a, b); + if(aComp < 0){ streamB.pushBack(b); } + else{ streamA.pushBack(a); } + } + } + + /** Return the stream sort - ie, the order in which records are returned */ + public StreamComparator getStreamSort(){ + return streamA.getStreamSort(); + } + + + public int getCost() { + return 0; + } +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java index 73c799c1958..82002328fe7 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java @@ -19,16 +19,14 @@ package org.apache.solr.client.solrj.io.stream; import java.io.IOException; import java.util.ArrayList; -import java.util.Comparator; import java.util.List; import java.util.Locale; import org.apache.solr.client.solrj.io.Tuple; -import org.apache.solr.client.solrj.io.comp.FieldComparator; import org.apache.solr.client.solrj.io.comp.StreamComparator; -import org.apache.solr.client.solrj.io.eq.Equalitor; import org.apache.solr.client.solrj.io.eq.FieldEqualitor; import org.apache.solr.client.solrj.io.eq.StreamEqualitor; +import org.apache.solr.client.solrj.io.ops.DistinctOperation; import org.apache.solr.client.solrj.io.stream.expr.Expressible; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; @@ -46,9 +44,10 @@ public class UniqueStream extends TupleStream implements Expressible { private static final long serialVersionUID = 1; - private TupleStream stream; - private Equalitor eq; - private transient Tuple currentTuple; + private TupleStream originalStream; + private StreamEqualitor originalEqualitor; + + private ReducerStream reducerStream; public UniqueStream(TupleStream stream, StreamEqualitor eq) throws IOException { init(stream,eq); @@ -76,8 +75,10 @@ public class UniqueStream extends TupleStream implements Expressible { } private void init(TupleStream stream, StreamEqualitor eq) throws IOException{ - this.stream = stream; - this.eq = eq; + this.originalStream = stream; + this.originalEqualitor = eq; + + this.reducerStream = new ReducerStream(stream, eq, new DistinctOperation()); if(!eq.isDerivedFrom(stream.getStreamSort())){ throw new IOException("Invalid UniqueStream - substream comparator (sort) must be a superset of this stream's equalitor."); @@ -90,16 +91,16 @@ public class UniqueStream extends TupleStream implements Expressible { StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); // streams - if(stream instanceof Expressible){ - expression.addParameter(((Expressible)stream).toExpression(factory)); + if(originalStream instanceof Expressible){ + expression.addParameter(((Expressible)originalStream).toExpression(factory)); } else{ throw new IOException("This UniqueStream contains a non-expressible TupleStream - it cannot be converted to an expression"); } // over - if(eq instanceof Expressible){ - expression.addParameter(new StreamExpressionNamedParameter("over",((Expressible)eq).toExpression(factory))); + if(originalEqualitor instanceof Expressible){ + expression.addParameter(new StreamExpressionNamedParameter("over",((Expressible)originalEqualitor).toExpression(factory))); } else{ throw new IOException("This UniqueStream contains a non-expressible equalitor - it cannot be converted to an expression"); @@ -109,52 +110,33 @@ public class UniqueStream extends TupleStream implements Expressible { } public void setStreamContext(StreamContext context) { - this.stream.setStreamContext(context); + this.originalStream.setStreamContext(context); + this.reducerStream.setStreamContext(context); } public List children() { List l = new ArrayList(); - l.add(stream); + l.add(originalStream); return l; } public void open() throws IOException { - stream.open(); + reducerStream.open(); + // opens originalStream as well } public void close() throws IOException { - stream.close(); + reducerStream.close(); + // closes originalStream as well } public Tuple read() throws IOException { - Tuple tuple = stream.read(); - if(tuple.EOF) { - return tuple; - } - - if(currentTuple == null) { - currentTuple = tuple; - return tuple; - } else { - while(true) { - if(eq.test(currentTuple, tuple)){ - //We have duplicate tuple so read the next tuple from the stream. - tuple = stream.read(); - if(tuple.EOF) { - return tuple; - } - } else { - //We have a non duplicate - this.currentTuple = tuple; - return tuple; - } - } - } + return reducerStream.read(); } /** Return the stream sort - ie, the order in which records are returned */ public StreamComparator getStreamSort(){ - return stream.getStreamSort(); + return reducerStream.getStreamSort(); } public int getCost() { diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java index 7ddda3af65e..a003661ff3a 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java @@ -147,6 +147,10 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { testSubFacetStream(); testUpdateStream(); testParallelUpdateStream(); + testIntersectStream(); + testParallelIntersectStream(); + testComplementStream(); + testParallelComplementStream(); } private void testCloudSolrStream() throws Exception { @@ -2178,6 +2182,155 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { commit(); } + private void testIntersectStream() throws Exception{ + indexr(id, "0", "a_s", "setA", "a_i", "0"); + indexr(id, "2", "a_s", "setA", "a_i", "1"); + indexr(id, "3", "a_s", "setA", "a_i", "2"); + indexr(id, "4", "a_s", "setA", "a_i", "3"); + + indexr(id, "5", "a_s", "setB", "a_i", "2"); + indexr(id, "6", "a_s", "setB", "a_i", "3"); + + indexr(id, "7", "a_s", "setAB", "a_i", "0"); + indexr(id, "8", "a_s", "setAB", "a_i", "6"); + commit(); + + StreamExpression expression; + TupleStream stream; + List tuples; + + StreamFactory factory = new StreamFactory() + .withCollectionZkHost("collection1", zkServer.getZkAddress()) + .withFunctionName("search", CloudSolrStream.class) + .withFunctionName("intersect", IntersectStream.class); + + // basic + expression = StreamExpressionParser.parse("intersect(" + + "search(collection1, q=a_s:(setA || setAB), fl=\"id,a_s,a_i\", sort=\"a_i asc, a_s asc\")," + + "search(collection1, q=a_s:(setB || setAB), fl=\"id,a_s,a_i\", sort=\"a_i asc\")," + + "on=\"a_i\")"); + stream = new IntersectStream(expression, factory); + tuples = getTuples(stream); + + assert(tuples.size() == 5); + assertOrder(tuples, 0,7,3,4,8); + + del("*:*"); + commit(); + } + + private void testParallelIntersectStream() throws Exception { + indexr(id, "0", "a_s", "setA", "a_i", "0"); + indexr(id, "2", "a_s", "setA", "a_i", "1"); + indexr(id, "3", "a_s", "setA", "a_i", "2"); + indexr(id, "4", "a_s", "setA", "a_i", "3"); + + indexr(id, "5", "a_s", "setB", "a_i", "2"); + indexr(id, "6", "a_s", "setB", "a_i", "3"); + + indexr(id, "7", "a_s", "setAB", "a_i", "0"); + indexr(id, "8", "a_s", "setAB", "a_i", "6"); + commit(); + + StreamFactory streamFactory = new StreamFactory() + .withCollectionZkHost("collection1", zkServer.getZkAddress()) + .withFunctionName("search", CloudSolrStream.class) + .withFunctionName("intersect", IntersectStream.class) + .withFunctionName("parallel", ParallelStream.class); + // basic + + String zkHost = zkServer.getZkAddress(); + final TupleStream stream = streamFactory.constructStream("parallel(" + + "collection1, " + + "intersect(" + + "search(collection1, q=a_s:(setA || setAB), fl=\"id,a_s,a_i\", sort=\"a_i asc, a_s asc\", partitionKeys=\"a_i\")," + + "search(collection1, q=a_s:(setB || setAB), fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\")," + + "on=\"a_i\")," + + "workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_i asc\")"); + final List tuples = getTuples(stream); + + assert(tuples.size() == 5); + assertOrder(tuples, 0,7,3,4,8); + + del("*:*"); + commit(); + } + + private void testComplementStream() throws Exception{ + indexr(id, "0", "a_s", "setA", "a_i", "0"); + indexr(id, "2", "a_s", "setA", "a_i", "1"); + indexr(id, "3", "a_s", "setA", "a_i", "2"); + indexr(id, "4", "a_s", "setA", "a_i", "3"); + + indexr(id, "5", "a_s", "setB", "a_i", "2"); + indexr(id, "6", "a_s", "setB", "a_i", "3"); + indexr(id, "9", "a_s", "setB", "a_i", "5"); + + indexr(id, "7", "a_s", "setAB", "a_i", "0"); + indexr(id, "8", "a_s", "setAB", "a_i", "6"); + commit(); + + StreamExpression expression; + TupleStream stream; + List tuples; + + StreamFactory factory = new StreamFactory() + .withCollectionZkHost("collection1", zkServer.getZkAddress()) + .withFunctionName("search", CloudSolrStream.class) + .withFunctionName("complement", ComplementStream.class); + + // basic + expression = StreamExpressionParser.parse("complement(" + + "search(collection1, q=a_s:(setA || setAB), fl=\"id,a_s,a_i\", sort=\"a_i asc, a_s asc\")," + + "search(collection1, q=a_s:(setB || setAB), fl=\"id,a_s,a_i\", sort=\"a_i asc\")," + + "on=\"a_i\")"); + stream = new ComplementStream(expression, factory); + tuples = getTuples(stream); + + assert(tuples.size() == 1); + assertOrder(tuples, 2); + + del("*:*"); + commit(); + } + + private void testParallelComplementStream() throws Exception { + indexr(id, "0", "a_s", "setA", "a_i", "0"); + indexr(id, "2", "a_s", "setA", "a_i", "1"); + indexr(id, "3", "a_s", "setA", "a_i", "2"); + indexr(id, "4", "a_s", "setA", "a_i", "3"); + + indexr(id, "5", "a_s", "setB", "a_i", "2"); + indexr(id, "6", "a_s", "setB", "a_i", "3"); + indexr(id, "9", "a_s", "setB", "a_i", "5"); + + indexr(id, "7", "a_s", "setAB", "a_i", "0"); + indexr(id, "8", "a_s", "setAB", "a_i", "6"); + commit(); + + StreamFactory streamFactory = new StreamFactory() + .withCollectionZkHost("collection1", zkServer.getZkAddress()) + .withFunctionName("search", CloudSolrStream.class) + .withFunctionName("complement", ComplementStream.class) + .withFunctionName("parallel", ParallelStream.class); + + final String zkHost = zkServer.getZkAddress(); + final TupleStream stream = streamFactory.constructStream("parallel(" + + "collection1, " + + "complement(" + + "search(collection1, q=a_s:(setA || setAB), fl=\"id,a_s,a_i\", sort=\"a_i asc, a_s asc\", partitionKeys=\"a_i\")," + + "search(collection1, q=a_s:(setB || setAB), fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\")," + + "on=\"a_i\")," + + "workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_i asc\")"); + final List tuples = getTuples(stream); + + assert(tuples.size() == 1); + assertOrder(tuples, 2); + + del("*:*"); + commit(); + } + protected List getTuples(TupleStream tupleStream) throws IOException { tupleStream.open(); List tuples = new ArrayList(); diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java index 70f173877fb..2a2bcd83e60 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java @@ -54,6 +54,8 @@ public class StreamExpressionToExpessionTest extends LuceneTestCase { .withFunctionName("stats", StatsStream.class) .withFunctionName("facet", FacetStream.class) .withFunctionName("jdbc", JDBCStream.class) + .withFunctionName("intersect", IntersectStream.class) + .withFunctionName("complement", ComplementStream.class) .withFunctionName("count", CountMetric.class) .withFunctionName("sum", SumMetric.class) .withFunctionName("min", MinMetric.class) @@ -254,6 +256,37 @@ public class StreamExpressionToExpessionTest extends LuceneTestCase { assertTrue(expressionString.contains("sort=\"ID asc\"")); } + @Test + public void testIntersectStream() throws Exception { + IntersectStream stream; + String expressionString; + + // Basic test + stream = new IntersectStream(StreamExpressionParser.parse("intersect(" + + "search(collection1, q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\")," + + "search(collection1, q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\")," + + "on=\"a_f, a_s\")"), factory); + expressionString = stream.toExpression(factory).toString(); + assertTrue(expressionString.contains("q=\"id:(0 3 4)\"")); + assertTrue(expressionString.contains("q=\"id:(1 2)\"")); + assertTrue(expressionString.contains("on=\"a_f,a_s\"")); + } + + @Test + public void testComplementStream() throws Exception { + ComplementStream stream; + String expressionString; + + // Basic test + stream = new ComplementStream(StreamExpressionParser.parse("complement(" + + "search(collection1, q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\")," + + "search(collection1, q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\")," + + "on=\"a_f, a_s\")"), factory); + expressionString = stream.toExpression(factory).toString(); + assertTrue(expressionString.contains("q=\"id:(0 3 4)\"")); + assertTrue(expressionString.contains("q=\"id:(1 2)\"")); + assertTrue(expressionString.contains("on=\"a_f,a_s\"")); + } @Test public void testCountMetric() throws Exception {