From 60dca1b4f988e0a839ceb5e54ad85df70cc1f1f9 Mon Sep 17 00:00:00 2001 From: Joel Bernstein Date: Fri, 11 Dec 2015 02:28:25 +0000 Subject: [PATCH] SOLR-8337: Add ReduceOperation and wire it into the ReducerStream git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1719246 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 2 + .../apache/solr/handler/StreamHandler.java | 4 +- .../apache/solr/client/solrj/io/Tuple.java | 8 +- .../client/solrj/io/ops/GroupOperation.java | 129 +++++++++++++++ .../client/solrj/io/ops/ReduceOperation.java | 25 +++ .../client/solrj/io/stream/RankStream.java | 2 +- .../client/solrj/io/stream/ReducerStream.java | 67 +++++--- .../solrj/io/stream/StreamExpressionTest.java | 150 ++++++++++-------- .../StreamExpressionToExpessionTest.java | 12 +- .../client/solrj/io/stream/StreamingTest.java | 85 +++++----- 10 files changed, 350 insertions(+), 134 deletions(-) create mode 100644 solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GroupOperation.java create mode 100644 solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ReduceOperation.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index b38a464f77a..55ec0390a85 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -114,6 +114,8 @@ New Features * SOLR-7669: Add SelectStream and Tuple Operations to the Streaming API and Streaming Expressions (Dennis Gove) +* SOLR-8337: Add ReduceOperation and wire it into the ReducerStream (Joel Bernstein) + Bug Fixes ---------------------- * SOLR-8386: Add field option in the new admin UI schema page loads up even when no schemaFactory has been diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java index 63cca8560d8..6e372f4cfde 100644 --- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java @@ -30,6 +30,7 @@ import java.util.Map.Entry; import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.ops.GroupOperation; import org.apache.solr.client.solrj.io.stream.CloudSolrStream; import org.apache.solr.client.solrj.io.stream.ExceptionStream; import org.apache.solr.client.solrj.io.stream.InnerJoinStream; @@ -101,7 +102,8 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware { .withFunctionName("merge", MergeStream.class) .withFunctionName("unique", UniqueStream.class) .withFunctionName("top", RankStream.class) - .withFunctionName("group", ReducerStream.class) + .withFunctionName("group", GroupOperation.class) + .withFunctionName("reduce", ReducerStream.class) .withFunctionName("parallel", ParallelStream.class) .withFunctionName("rollup", RollupStream.class) .withFunctionName("stats", StatsStream.class) diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java index 22bc5882dac..c8643096fde 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java @@ -119,12 +119,12 @@ public class Tuple implements Cloneable { return this.fields; } - public List getMaps() { - return (List)this.fields.get("_MAPS_"); + public List getMaps(Object key) { + return (List)this.fields.get(key); } - public void setMaps(List maps) { - this.fields.put("_MAPS_", maps); + public void setMaps(Object key, List maps) { + this.fields.put(key, maps); } public Map getMetrics() { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GroupOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GroupOperation.java new file mode 100644 index 00000000000..fff7e94bb2b --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GroupOperation.java @@ -0,0 +1,129 @@ +package org.apache.solr.client.solrj.io.ops; + +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.FieldComparator; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Comparator; +import java.util.List; +import java.util.ArrayList; +import java.util.Locale; +import java.util.Map; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.PriorityQueue; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public class GroupOperation implements ReduceOperation { + + private PriorityQueue priorityQueue; + private Comparator comp; + private StreamComparator streamComparator; + private int size; + + public GroupOperation(StreamExpression expression, StreamFactory factory) throws IOException { + + StreamExpressionNamedParameter nParam = factory.getNamedOperand(expression, "n"); + StreamExpressionNamedParameter sortExpression = factory.getNamedOperand(expression, "sort"); + + StreamComparator streamComparator = factory.constructComparator(((StreamExpressionValue) sortExpression.getParameter()).getValue(), FieldComparator.class); + String nStr = ((StreamExpressionValue)nParam.getParameter()).getValue(); + int nInt = 0; + + try{ + nInt = Integer.parseInt(nStr); + if(nInt <= 0){ + throw new IOException(String.format(Locale.ROOT,"invalid expression %s - topN '%s' must be greater than 0.",expression, nStr)); + } + } catch(NumberFormatException e) { + throw new IOException(String.format(Locale.ROOT,"invalid expression %s - topN '%s' is not a valid integer.",expression, nStr)); + } + + init(streamComparator, nInt); + } + + public GroupOperation(StreamComparator streamComparator, int size) { + init(streamComparator, size); + } + + private void init(StreamComparator streamComparator, int size) { + this.size = size; + this.streamComparator = streamComparator; + this.comp = new ReverseComp(streamComparator); + this.priorityQueue = new PriorityQueue(size, this.comp); + } + + public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException { + StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); + // n + expression.addParameter(new StreamExpressionNamedParameter("n", Integer.toString(size))); + + // sort + expression.addParameter(new StreamExpressionNamedParameter("sort", streamComparator.toExpression(factory))); + return expression; + } + + public Tuple reduce() { + Map map = new HashMap(); + List list = new ArrayList(); + LinkedList ll = new LinkedList(); + while(priorityQueue.size() > 0) { + ll.addFirst(priorityQueue.poll().getMap()); + //This will clear priority queue and so it will be ready for the next group. + } + + list.addAll(ll); + Map groupHead = list.get(0); + map.putAll(groupHead); + map.put("group", list); + return new Tuple(map); + } + + public void operate(Tuple tuple) { + if(priorityQueue.size() >= size) { + Tuple peek = priorityQueue.peek(); + if(streamComparator.compare(tuple, peek) < 0) { + priorityQueue.poll(); + priorityQueue.add(tuple); + } + } else { + priorityQueue.add(tuple); + } + } + + class ReverseComp implements Comparator, Serializable { + private StreamComparator comp; + + public ReverseComp(StreamComparator comp) { + this.comp = comp; + } + + public int compare(Tuple t1, Tuple t2) { + return comp.compare(t1, t2)*(-1); + } + } +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ReduceOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ReduceOperation.java new file mode 100644 index 00000000000..10fa5c6d2b2 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ReduceOperation.java @@ -0,0 +1,25 @@ +package org.apache.solr.client.solrj.io.ops; + +import org.apache.solr.client.solrj.io.Tuple; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +public interface ReduceOperation extends StreamOperation { + public Tuple reduce(); +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java index d5a25cf0502..fb3c304d1ae 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java @@ -161,8 +161,8 @@ public class RankStream extends TupleStream implements Expressible { topList.addLast(tuple); break; } else { - Tuple peek = top.peek(); if(top.size() >= size) { + Tuple peek = top.peek(); if(comp.compare(tuple, peek) < 0) { top.poll(); top.add(tuple); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java index 2f338926f6e..04cb53537fa 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java @@ -31,6 +31,8 @@ import org.apache.solr.client.solrj.io.comp.StreamComparator; import org.apache.solr.client.solrj.io.eq.FieldEqualitor; import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor; import org.apache.solr.client.solrj.io.eq.StreamEqualitor; +import org.apache.solr.client.solrj.io.ops.ReduceOperation; +import org.apache.solr.client.solrj.io.ops.StreamOperation; import org.apache.solr.client.solrj.io.stream.expr.Expressible; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; @@ -58,15 +60,17 @@ public class ReducerStream extends TupleStream implements Expressible { private PushBackStream stream; private StreamEqualitor eq; + private ReduceOperation op; + private boolean needsReduce; private transient Tuple currentGroupHead; - public ReducerStream(TupleStream stream,StreamEqualitor eq) throws IOException { - init(stream,eq); + public ReducerStream(TupleStream stream, StreamEqualitor eq, ReduceOperation op) throws IOException { + init(stream, eq, op); } - public ReducerStream(TupleStream stream,StreamComparator comp) throws IOException { - init(stream, convertToEqualitor(comp)); + public ReducerStream(TupleStream stream, StreamComparator comp, ReduceOperation op) throws IOException { + init(stream, convertToEqualitor(comp), op); } private StreamEqualitor convertToEqualitor(StreamComparator comp){ @@ -88,9 +92,10 @@ public class ReducerStream extends TupleStream implements Expressible { // grab all parameters out List streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class); StreamExpressionNamedParameter byExpression = factory.getNamedOperand(expression, "by"); - + List operationExpressions = factory.getExpressionOperandsRepresentingTypes(expression, ReduceOperation.class); + // validate expression contains only what we want. - if(expression.getParameters().size() != streamExpressions.size() + 1){ + if(expression.getParameters().size() != streamExpressions.size() + 2){ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression)); } @@ -100,15 +105,29 @@ public class ReducerStream extends TupleStream implements Expressible { if(null == byExpression || !(byExpression.getParameter() instanceof StreamExpressionValue)){ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'by' parameter listing fields to group by but didn't find one",expression)); } - + + ReduceOperation reduceOperation = null; + if(operationExpressions != null && operationExpressions.size() == 1) { + StreamExpression ex = operationExpressions.get(0); + StreamOperation operation = factory.constructOperation(ex); + if(operation instanceof ReduceOperation) { + reduceOperation = (ReduceOperation) operation; + } else { + throw new IOException("The ReducerStream requires a ReduceOperation. A StreamOperation was provided."); + } + } else { + throw new IOException("The ReducerStream requires a ReduceOperation."); + } + init(factory.constructStream(streamExpressions.get(0)), - factory.constructEqualitor(((StreamExpressionValue)byExpression.getParameter()).getValue(), FieldEqualitor.class) - ); + factory.constructEqualitor(((StreamExpressionValue) byExpression.getParameter()).getValue(), FieldEqualitor.class), + reduceOperation); } - private void init(TupleStream stream, StreamEqualitor eq) throws IOException{ + private void init(TupleStream stream, StreamEqualitor eq, ReduceOperation op) throws IOException{ this.stream = new PushBackStream(stream); this.eq = eq; + this.op = op; if(!eq.isDerivedFrom(stream.getStreamSort())){ throw new IOException("Invalid ReducerStream - substream comparator (sort) must be a superset of this stream's comparator."); @@ -130,6 +149,12 @@ public class ReducerStream extends TupleStream implements Expressible { else{ throw new IOException("This ReducerStream contains a non-expressible comparator - it cannot be converted to an expression"); } + + if(op instanceof Expressible) { + expression.addParameter(op.toExpression(factory)); + } else { + throw new IOException("This ReducerStream contains a non-expressible operation - it cannot be converted to an expression"); + } return expression; } @@ -154,19 +179,14 @@ public class ReducerStream extends TupleStream implements Expressible { public Tuple read() throws IOException { - List maps = new ArrayList(); while(true) { Tuple t = stream.read(); if(t.EOF) { - if(maps.size() > 0) { + if(needsReduce) { stream.pushBack(t); - Map map1 = maps.get(0); - Map map2 = new HashMap(); - map2.putAll(map1); - Tuple groupHead = new Tuple(map2); - groupHead.setMaps(maps); - return groupHead; + needsReduce = false; + return op.reduce(); } else { return t; } @@ -174,16 +194,17 @@ public class ReducerStream extends TupleStream implements Expressible { if(currentGroupHead == null) { currentGroupHead = t; - maps.add(t.getMap()); + op.operate(t); + needsReduce = true; } else { if(eq.test(currentGroupHead, t)) { - maps.add(t.getMap()); + op.operate(t); + needsReduce = true; } else { - Tuple groupHead = currentGroupHead.clone(); stream.pushBack(t); currentGroupHead = null; - groupHead.setMaps(maps); - return groupHead; + needsReduce = false; + return op.reduce(); } } } diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java index a46a95d21c2..d99c871ebe2 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.ops.GroupOperation; import org.apache.solr.client.solrj.io.ops.ReplaceOperation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; @@ -159,8 +160,8 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { tuples = getTuples(stream); assert(tuples.size() == 5); - assertOrder(tuples, 0,2,1,3,4); - assertLong(tuples.get(0),"a_i", 0); + assertOrder(tuples, 0, 2, 1, 3, 4); + assertLong(tuples.get(0), "a_i", 0); // Basic w/aliases expression = StreamExpressionParser.parse("search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", aliases=\"a_i=alias.a_i, a_s=name\")"); @@ -168,9 +169,9 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { tuples = getTuples(stream); assert(tuples.size() == 5); - assertOrder(tuples, 0,2,1,3,4); - assertLong(tuples.get(0),"alias.a_i", 0); - assertString(tuples.get(0),"name", "hello0"); + assertOrder(tuples, 0, 2, 1, 3, 4); + assertLong(tuples.get(0), "alias.a_i", 0); + assertString(tuples.get(0), "name", "hello0"); // Basic filtered test expression = StreamExpressionParser.parse("search(collection1, q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")"); @@ -178,8 +179,8 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { tuples = getTuples(stream); assert(tuples.size() == 3); - assertOrder(tuples, 0,3,4); - assertLong(tuples.get(1),"a_i", 3); + assertOrder(tuples, 0, 3, 4); + assertLong(tuples.get(1), "a_i", 3); del("*:*"); commit(); @@ -216,7 +217,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { assert(tuples.size() == 5); assertOrder(tuples, 0,2,1,3,4); assertLong(tuples.get(0),"alias.a_i", 0); - assertString(tuples.get(0),"name", "hello0"); + assertString(tuples.get(0), "name", "hello0"); // Basic filtered test expression = StreamExpressionParser.parse("search(collection1, q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", zkHost=" + zkServer.getZkAddress() + ", sort=\"a_f asc, a_i asc\")"); @@ -225,7 +226,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { assert(tuples.size() == 3); assertOrder(tuples, 0,3,4); - assertLong(tuples.get(1),"a_i", 3); + assertLong(tuples.get(1), "a_i", 3); del("*:*"); commit(); @@ -453,54 +454,57 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { List maps0, maps1, maps2; StreamFactory factory = new StreamFactory() - .withCollectionZkHost("collection1", zkServer.getZkAddress()) - .withFunctionName("search", CloudSolrStream.class) - .withFunctionName("unique", UniqueStream.class) - .withFunctionName("top", RankStream.class) - .withFunctionName("group", ReducerStream.class); + .withCollectionZkHost("collection1", zkServer.getZkAddress()) + .withFunctionName("search", CloudSolrStream.class) + .withFunctionName("reduce", ReducerStream.class) + .withFunctionName("group", GroupOperation.class); // basic - expression = StreamExpressionParser.parse("group(" - + "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\")," - + "by=\"a_s\")"); - stream = new ReducerStream(expression, factory); + expression = StreamExpressionParser.parse("reduce(" + + "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\")," + + "by=\"a_s\"," + + "group(sort=\"a_f desc\", n=\"4\"))"); + + stream = factory.constructStream(expression); tuples = getTuples(stream); assert(tuples.size() == 3); - assertOrder(tuples, 0,3,4); t0 = tuples.get(0); - maps0 = t0.getMaps(); - assertMaps(maps0, 0, 2,1, 9); + maps0 = t0.getMaps("group"); + assertMaps(maps0, 9, 1, 2, 0); t1 = tuples.get(1); - maps1 = t1.getMaps(); - assertMaps(maps1, 3, 5, 7, 8); + maps1 = t1.getMaps("group"); + assertMaps(maps1, 8, 7, 5, 3); + t2 = tuples.get(2); - maps2 = t2.getMaps(); - assertMaps(maps2, 4, 6); + maps2 = t2.getMaps("group"); + assertMaps(maps2, 6, 4); // basic w/spaces - expression = StreamExpressionParser.parse("group(" - + "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\")," - + "by=\"a_s\")"); - stream = new ReducerStream(expression, factory); + expression = StreamExpressionParser.parse("reduce(" + + "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\")," + + "by=\"a_s\"," + + "group(sort=\"a_i asc\", n=\"2\"))"); + stream = factory.constructStream(expression); tuples = getTuples(stream); assert(tuples.size() == 3); - assertOrder(tuples, 0,3,4); t0 = tuples.get(0); - maps0 = t0.getMaps(); - assertMaps(maps0, 0, 2,1, 9); + maps0 = t0.getMaps("group"); + assert(maps0.size() == 2); + + assertMaps(maps0, 0, 1); t1 = tuples.get(1); - maps1 = t1.getMaps(); - assertMaps(maps1, 3, 5, 7, 8); + maps1 = t1.getMaps("group"); + assertMaps(maps1, 3, 5); t2 = tuples.get(2); - maps2 = t2.getMaps(); + maps2 = t2.getMaps("group"); assertMaps(maps2, 4, 6); del("*:*"); @@ -748,54 +752,58 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { String zkHost = zkServer.getZkAddress(); StreamFactory streamFactory = new StreamFactory().withCollectionZkHost("collection1", zkServer.getZkAddress()) .withFunctionName("search", CloudSolrStream.class) - .withFunctionName("unique", UniqueStream.class) - .withFunctionName("top", RankStream.class) - .withFunctionName("group", ReducerStream.class) + .withFunctionName("group", GroupOperation.class) + .withFunctionName("reduce", ReducerStream.class) .withFunctionName("parallel", ParallelStream.class); - ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, group(search(collection1, q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc,a_f asc\", partitionKeys=\"a_s\"), by=\"a_s\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s asc\")"); + ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1," + + "reduce(" + + "search(collection1, q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc,a_f asc\", partitionKeys=\"a_s\"), " + + "by=\"a_s\"," + + "group(sort=\"a_i asc\", n=\"5\")), " + + "workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s asc\")"); List tuples = getTuples(pstream); assert(tuples.size() == 3); - assertOrder(tuples, 0,3,4); Tuple t0 = tuples.get(0); - List maps0 = t0.getMaps(); - assertMaps(maps0, 0, 2, 1, 9); + List maps0 = t0.getMaps("group"); + assertMaps(maps0, 0, 1, 2, 9); Tuple t1 = tuples.get(1); - List maps1 = t1.getMaps(); + List maps1 = t1.getMaps("group"); assertMaps(maps1, 3, 5, 7, 8); Tuple t2 = tuples.get(2); - List maps2 = t2.getMaps(); + List maps2 = t2.getMaps("group"); assertMaps(maps2, 4, 6); - //Test Descending with Ascending subsort - pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, group(search(collection1, q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc,a_f asc\", partitionKeys=\"a_s\"), by=\"a_s\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s desc\")"); + pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, " + + "reduce(" + + "search(collection1, q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc,a_f asc\", partitionKeys=\"a_s\"), " + + "by=\"a_s\", " + + "group(sort=\"a_i desc\", n=\"5\")),"+ + "workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s desc\")"); tuples = getTuples(pstream); assert(tuples.size() == 3); - assertOrder(tuples, 4,3,0); t0 = tuples.get(0); - maps0 = t0.getMaps(); - assertMaps(maps0, 4, 6); + maps0 = t0.getMaps("group"); + assertMaps(maps0, 6, 4); t1 = tuples.get(1); - maps1 = t1.getMaps(); - assertMaps(maps1, 3, 5, 7, 8); + maps1 = t1.getMaps("group"); + assertMaps(maps1, 8, 7, 5, 3); t2 = tuples.get(2); - maps2 = t2.getMaps(); - assertMaps(maps2, 0, 2, 1, 9); - - + maps2 = t2.getMaps("group"); + assertMaps(maps2, 9, 2, 1, 0); del("*:*"); commit(); @@ -1246,7 +1254,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { List tuples; StreamFactory factory = new StreamFactory() - .withCollectionZkHost("collection1", zkServer.getZkAddress()) + .withCollectionZkHost("collection1", zkServer.getZkAddress()) .withFunctionName("search", CloudSolrStream.class) .withFunctionName("outerHashJoin", OuterHashJoinStream.class); @@ -1262,9 +1270,9 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { // Basic desc expression = StreamExpressionParser.parse("outerHashJoin(" - + "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\")," - + "hashed=search(collection1, q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\")," - + "on=\"join1_i, join2_s\")"); + + "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\")," + + "hashed=search(collection1, q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\")," + + "on=\"join1_i, join2_s\")"); stream = new OuterHashJoinStream(expression, factory); tuples = getTuples(stream); assert(tuples.size() == 10); @@ -1272,9 +1280,9 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { // Results in both searches, no join matches expression = StreamExpressionParser.parse("outerHashJoin(" - + "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\")," - + "hashed=search(collection1, q=\"side_s:right\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\")," - + "on=\"ident_s\")"); + + "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\")," + + "hashed=search(collection1, q=\"side_s:right\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\")," + + "on=\"ident_s\")"); stream = new OuterHashJoinStream(expression, factory); tuples = getTuples(stream); assert(tuples.size() == 8); @@ -1406,7 +1414,22 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { } return true; } - + + protected boolean assertMapOrder(List tuples, int... ids) throws Exception { + int i = 0; + for(int val : ids) { + Tuple t = tuples.get(i); + List tip = t.getMaps("group"); + int id = (int)tip.get(0).get("id"); + if(id != val) { + throw new Exception("Found value:"+id+" expecting:"+val); + } + ++i; + } + return true; + } + + protected boolean assertFields(List tuples, String ... fields) throws Exception{ for(Tuple tuple : tuples){ for(String field : fields){ @@ -1480,6 +1503,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { return true; } + @Override protected void indexr(Object... fields) throws Exception { SolrInputDocument doc = getDoc(fields); diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java index 973164a5730..dfaf9d18dc5 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExpessionTest.java @@ -18,6 +18,7 @@ package org.apache.solr.client.solrj.io.stream; */ import org.apache.lucene.util.LuceneTestCase; +import org.apache.solr.client.solrj.io.ops.GroupOperation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; import org.apache.solr.client.solrj.io.stream.metrics.CountMetric; @@ -44,8 +45,9 @@ public class StreamExpressionToExpessionTest extends LuceneTestCase { .withFunctionName("merge", MergeStream.class) .withFunctionName("unique", UniqueStream.class) .withFunctionName("top", RankStream.class) - .withFunctionName("group", ReducerStream.class) - .withFunctionName("stats", StatsStream.class) + .withFunctionName("reduce", ReducerStream.class) + .withFunctionName("group", GroupOperation.class) + .withFunctionName("stats", StatsStream.class) .withFunctionName("count", CountMetric.class) .withFunctionName("sum", SumMetric.class) .withFunctionName("min", MinMetric.class) @@ -153,11 +155,11 @@ public class StreamExpressionToExpessionTest extends LuceneTestCase { String expressionString; // Basic test - stream = new ReducerStream(StreamExpressionParser.parse("group(" + stream = new ReducerStream(StreamExpressionParser.parse("reduce(" + "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc, a_f asc\")," - + "by=\"a_s\")"), factory); + + "by=\"a_s\", group(sort=\"a_i desc\", n=\"5\"))"), factory); expressionString = stream.toExpression(factory).toString(); - assertTrue(expressionString.contains("group(search(collection1")); + assertTrue(expressionString.contains("reduce(search(collection1")); assertTrue(expressionString.contains("by=a_s")); } diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java index a09acab9fbe..726d3bf1f6b 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java @@ -33,6 +33,7 @@ import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator; import org.apache.solr.client.solrj.io.comp.FieldComparator; import org.apache.solr.client.solrj.io.comp.StreamComparator; import org.apache.solr.client.solrj.io.eq.FieldEqualitor; +import org.apache.solr.client.solrj.io.ops.GroupOperation; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; import org.apache.solr.client.solrj.io.stream.metrics.Bucket; import org.apache.solr.client.solrj.io.stream.metrics.CountMetric; @@ -117,8 +118,8 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { .withFunctionName("merge", MergeStream.class) .withFunctionName("unique", UniqueStream.class) .withFunctionName("top", RankStream.class) - .withFunctionName("group", ReducerStream.class) - .withFunctionName("count", RecordCountStream.class) + .withFunctionName("reduce", ReducerStream.class) + .withFunctionName("group", GroupOperation.class) .withFunctionName("rollup", RollupStream.class) .withFunctionName("parallel", ParallelStream.class); } @@ -356,47 +357,48 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { //Test with spaces in the parameter lists. Map paramsA = mapParams("q","*:*","fl","id,a_s, a_i, a_f","sort", "a_s asc , a_f asc"); CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA); - ReducerStream rstream = new ReducerStream(stream, new FieldEqualitor("a_s")); + ReducerStream rstream = new ReducerStream(stream, + new FieldEqualitor("a_s"), + new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5)); List tuples = getTuples(rstream); assert(tuples.size() == 3); - assertOrder(tuples, 0,3,4); Tuple t0 = tuples.get(0); - List maps0 = t0.getMaps(); + List maps0 = t0.getMaps("group"); assertMaps(maps0, 0, 2, 1, 9); Tuple t1 = tuples.get(1); - List maps1 = t1.getMaps(); + List maps1 = t1.getMaps("group"); assertMaps(maps1, 3, 5, 7, 8); Tuple t2 = tuples.get(2); - List maps2 = t2.getMaps(); + List maps2 = t2.getMaps("group"); assertMaps(maps2, 4, 6); //Test with spaces in the parameter lists using a comparator paramsA = mapParams("q","*:*","fl","id,a_s, a_i, a_f","sort", "a_s asc , a_f asc"); - stream = new CloudSolrStream(zkHost, "collection1", paramsA); - rstream = new ReducerStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING)); + stream = new CloudSolrStream(zkHost, "collection1", paramsA); + rstream = new ReducerStream(stream, + new FieldComparator("a_s", ComparatorOrder.ASCENDING), + new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5)); tuples = getTuples(rstream); assert(tuples.size() == 3); - assertOrder(tuples, 0,3,4); t0 = tuples.get(0); - maps0 = t0.getMaps(); - assertMaps(maps0, 0, 2, 1, 9); + maps0 = t0.getMaps("group"); + assertMaps(maps0, 9, 1, 2, 0); t1 = tuples.get(1); - maps1 = t1.getMaps(); - assertMaps(maps1, 3, 5, 7, 8); + maps1 = t1.getMaps("group"); + assertMaps(maps1, 8, 7, 5, 3); t2 = tuples.get(2); - maps2 = t2.getMaps(); - assertMaps(maps2, 4, 6); - + maps2 = t2.getMaps("group"); + assertMaps(maps2, 6, 4); del("*:*"); commit(); @@ -424,7 +426,9 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { //Test with spaces in the parameter lists. Map paramsA = mapParams("q", "blah", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc"); CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA); - ReducerStream rstream = new ReducerStream(stream, new FieldEqualitor("a_s")); + ReducerStream rstream = new ReducerStream(stream, + new FieldEqualitor("a_s"), + new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5)); List tuples = getTuples(rstream); @@ -455,53 +459,57 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s"); CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA); - ReducerStream rstream = new ReducerStream(stream, new FieldEqualitor("a_s")); + + ReducerStream rstream = new ReducerStream(stream, + new FieldEqualitor("a_s"), + new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5)); + ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING)); attachStreamFactory(pstream); List tuples = getTuples(pstream); assert(tuples.size() == 3); - assertOrder(tuples, 0,3,4); Tuple t0 = tuples.get(0); - List maps0 = t0.getMaps(); - assertMaps(maps0, 0, 2, 1, 9); + List maps0 = t0.getMaps("group"); + assertMaps(maps0, 9, 1, 2, 0); Tuple t1 = tuples.get(1); - List maps1 = t1.getMaps(); - assertMaps(maps1, 3, 5, 7, 8); + List maps1 = t1.getMaps("group"); + assertMaps(maps1, 8, 7, 5, 3); Tuple t2 = tuples.get(2); - List maps2 = t2.getMaps(); - assertMaps(maps2, 4, 6); + List maps2 = t2.getMaps("group"); + assertMaps(maps2, 6, 4); //Test Descending with Ascending subsort paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s desc,a_f asc", "partitionKeys", "a_s"); stream = new CloudSolrStream(zkHost, "collection1", paramsA); - rstream = new ReducerStream(stream, new FieldEqualitor("a_s")); + + rstream = new ReducerStream(stream, + new FieldEqualitor("a_s"), + new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 3)); + pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.DESCENDING)); attachStreamFactory(pstream); tuples = getTuples(pstream); assert(tuples.size() == 3); - assertOrder(tuples, 4,3,0); t0 = tuples.get(0); - maps0 = t0.getMaps(); + maps0 = t0.getMaps("group"); assertMaps(maps0, 4, 6); - t1 = tuples.get(1); - maps1 = t1.getMaps(); - assertMaps(maps1, 3, 5, 7, 8); - + maps1 = t1.getMaps("group"); + assertMaps(maps1, 3, 5, 7); t2 = tuples.get(2); - maps2 = t2.getMaps(); - assertMaps(maps2, 0, 2, 1, 9); + maps2 = t2.getMaps("group"); + assertMaps(maps2, 0, 2, 1); @@ -1499,7 +1507,10 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { Map paramsA = mapParams("q","blah","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s"); CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA); - ReducerStream rstream = new ReducerStream(stream, new FieldEqualitor("a_s")); + ReducerStream rstream = new ReducerStream(stream, + new FieldEqualitor("a_s"), + new GroupOperation(new FieldComparator("a_s", ComparatorOrder.ASCENDING), 2)); + ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING)); attachStreamFactory(pstream); @@ -1654,7 +1665,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { assertOrder(tuples, 0,1,2,3,4,7,6,8,9); //Test descending - paramsA = mapParams("q","id:(4 1 8 9)","fl","id,a_s,a_i","sort", "a_i desc", "partitionKeys", "a_i"); + paramsA = mapParams("q", "id:(4 1 8 9)", "fl", "id,a_s,a_i", "sort", "a_i desc", "partitionKeys", "a_i"); streamA = new CloudSolrStream(zkHost, "collection1", paramsA); paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i desc", "partitionKeys", "a_i");