SOLR-8337: Add ReduceOperation and wire it into the ReducerStream

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1719246 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Joel Bernstein 2015-12-11 02:28:25 +00:00
parent 72a5850a20
commit 60dca1b4f9
10 changed files with 350 additions and 134 deletions

View File

@ -114,6 +114,8 @@ New Features
* SOLR-7669: Add SelectStream and Tuple Operations to the Streaming API and Streaming Expressions (Dennis Gove)
* SOLR-8337: Add ReduceOperation and wire it into the ReducerStream (Joel Bernstein)
Bug Fixes
----------------------
* SOLR-8386: Add field option in the new admin UI schema page loads up even when no schemaFactory has been

View File

@ -30,6 +30,7 @@ import java.util.Map.Entry;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.ops.GroupOperation;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.ExceptionStream;
import org.apache.solr.client.solrj.io.stream.InnerJoinStream;
@ -101,7 +102,8 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
.withFunctionName("merge", MergeStream.class)
.withFunctionName("unique", UniqueStream.class)
.withFunctionName("top", RankStream.class)
.withFunctionName("group", ReducerStream.class)
.withFunctionName("group", GroupOperation.class)
.withFunctionName("reduce", ReducerStream.class)
.withFunctionName("parallel", ParallelStream.class)
.withFunctionName("rollup", RollupStream.class)
.withFunctionName("stats", StatsStream.class)

View File

@ -119,12 +119,12 @@ public class Tuple implements Cloneable {
return this.fields;
}
public List<Map> getMaps() {
return (List<Map>)this.fields.get("_MAPS_");
public List<Map> getMaps(Object key) {
return (List<Map>)this.fields.get(key);
}
public void setMaps(List<Map> maps) {
this.fields.put("_MAPS_", maps);
public void setMaps(Object key, List<Map> maps) {
this.fields.put(key, maps);
}
public Map<String,Map> getMetrics() {

View File

@ -0,0 +1,129 @@
package org.apache.solr.client.solrj.io.ops;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import java.io.IOException;
import java.io.Serializable;
import java.util.Comparator;
import java.util.List;
import java.util.ArrayList;
import java.util.Locale;
import java.util.Map;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.PriorityQueue;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
 * A {@link ReduceOperation} that collects the top {@code n} tuples of each group,
 * ranked by the supplied sort comparator, and emits them as a single group Tuple.
 *
 * <p>Tuples are fed in one at a time via {@link #operate(Tuple)}; when the
 * ReducerStream hits a group boundary it calls {@link #reduce()}, which drains
 * the accumulated tuples into a list stored under the "group" key of the
 * returned Tuple (whose top-level fields are copied from the group head).
 * Draining the queue also resets this operation for the next group.
 */
public class GroupOperation implements ReduceOperation {

  // Bounded buffer of the current group's best tuples. Ordered by the REVERSE
  // of streamComparator so the worst retained tuple sits at the head and can
  // be evicted in O(log n) when a better tuple arrives.
  private PriorityQueue<Tuple> priorityQueue;
  private Comparator<Tuple> comp;
  private StreamComparator streamComparator;
  private int size;

  /**
   * Expression-based constructor, e.g. {@code group(sort="a_f desc", n="4")}.
   *
   * @throws IOException if the required {@code n} or {@code sort} named
   *         parameters are missing or invalid.
   */
  public GroupOperation(StreamExpression expression, StreamFactory factory) throws IOException {
    StreamExpressionNamedParameter nParam = factory.getNamedOperand(expression, "n");
    StreamExpressionNamedParameter sortExpression = factory.getNamedOperand(expression, "sort");

    // Validate up front so a missing parameter reports a clear error rather than an NPE.
    if(nParam == null || !(nParam.getParameter() instanceof StreamExpressionValue)) {
      throw new IOException(String.format(Locale.ROOT, "invalid expression %s - expecting a single 'n' parameter", expression));
    }
    if(sortExpression == null || !(sortExpression.getParameter() instanceof StreamExpressionValue)) {
      throw new IOException(String.format(Locale.ROOT, "invalid expression %s - expecting a single 'sort' parameter", expression));
    }

    StreamComparator streamComparator = factory.constructComparator(((StreamExpressionValue) sortExpression.getParameter()).getValue(), FieldComparator.class);
    String nStr = ((StreamExpressionValue)nParam.getParameter()).getValue();
    int nInt = 0;

    try{
      nInt = Integer.parseInt(nStr);
      if(nInt <= 0){
        throw new IOException(String.format(Locale.ROOT,"invalid expression %s - topN '%s' must be greater than 0.",expression, nStr));
      }
    } catch(NumberFormatException e) {
      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - topN '%s' is not a valid integer.",expression, nStr));
    }

    init(streamComparator, nInt);
  }

  /**
   * @param streamComparator ranking order for tuples within a group
   * @param size maximum number of tuples retained per group
   */
  public GroupOperation(StreamComparator streamComparator, int size) {
    init(streamComparator, size);
  }

  private void init(StreamComparator streamComparator, int size) {
    this.size = size;
    this.streamComparator = streamComparator;
    this.comp = new ReverseComp(streamComparator);
    this.priorityQueue = new PriorityQueue<>(size, this.comp);
  }

  /** Serializes this operation back to expression form, e.g. {@code group(n=4, sort="a_f desc")}. */
  public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
    // n
    expression.addParameter(new StreamExpressionNamedParameter("n", Integer.toString(size)));
    // sort
    expression.addParameter(new StreamExpressionNamedParameter("sort", streamComparator.toExpression(factory)));
    return expression;
  }

  /**
   * Collapses the accumulated tuples into a single group Tuple and resets this
   * operation for the next group. Must be called only after at least one
   * {@link #operate(Tuple)} call, otherwise the group head lookup fails.
   */
  public Tuple reduce() {
    // Draining via poll() empties the priority queue, leaving it ready for the
    // next group. addFirst reverses the heap's worst-first order so the list
    // ends up sorted best-first per streamComparator.
    LinkedList<Map> ll = new LinkedList<>();
    while(priorityQueue.size() > 0) {
      ll.addFirst(priorityQueue.poll().getMap());
    }

    List<Map> list = new ArrayList<>(ll);
    Map groupHead = list.get(0);

    Map map = new HashMap();
    map.putAll(groupHead);   // flatten the group head's fields into the result tuple
    map.put("group", list);  // full top-N list is exposed under the "group" key
    return new Tuple(map);
  }

  /**
   * Offers a tuple to the current group, keeping only the top {@code size}
   * tuples according to the sort comparator.
   */
  public void operate(Tuple tuple) {
    if(priorityQueue.size() >= size) {
      // Queue is full: replace the current worst tuple only if this one ranks higher.
      Tuple peek = priorityQueue.peek();
      if(streamComparator.compare(tuple, peek) < 0) {
        priorityQueue.poll();
        priorityQueue.add(tuple);
      }
    } else {
      priorityQueue.add(tuple);
    }
  }

  /** Inverts the stream comparator so the priority queue keeps its worst tuple at the head. */
  class ReverseComp implements Comparator<Tuple>, Serializable {

    private static final long serialVersionUID = 1L;

    private StreamComparator comp;

    public ReverseComp(StreamComparator comp) {
      this.comp = comp;
    }

    public int compare(Tuple t1, Tuple t2) {
      return comp.compare(t1, t2) * (-1);
    }
  }
}

View File

@ -0,0 +1,25 @@
package org.apache.solr.client.solrj.io.ops;
import org.apache.solr.client.solrj.io.Tuple;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
 * A {@link StreamOperation} that accumulates tuples (via the inherited
 * {@code operate(Tuple)}) and collapses them into a single result Tuple.
 *
 * <p>Used by the ReducerStream: {@code operate} is invoked for each tuple in a
 * group, and {@code reduce} is invoked at each group boundary to emit the
 * reduced Tuple and reset the operation's internal state for the next group.
 */
public interface ReduceOperation extends StreamOperation {

  /**
   * Returns the reduced Tuple for the tuples accumulated since the last call,
   * resetting internal state so the operation is ready for the next group.
   */
  Tuple reduce();
}

View File

@ -161,8 +161,8 @@ public class RankStream extends TupleStream implements Expressible {
topList.addLast(tuple);
break;
} else {
Tuple peek = top.peek();
if(top.size() >= size) {
Tuple peek = top.peek();
if(comp.compare(tuple, peek) < 0) {
top.poll();
top.add(tuple);

View File

@ -31,6 +31,8 @@ import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
import org.apache.solr.client.solrj.io.ops.ReduceOperation;
import org.apache.solr.client.solrj.io.ops.StreamOperation;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
@ -58,15 +60,17 @@ public class ReducerStream extends TupleStream implements Expressible {
private PushBackStream stream;
private StreamEqualitor eq;
private ReduceOperation op;
private boolean needsReduce;
private transient Tuple currentGroupHead;
public ReducerStream(TupleStream stream,StreamEqualitor eq) throws IOException {
init(stream,eq);
public ReducerStream(TupleStream stream, StreamEqualitor eq, ReduceOperation op) throws IOException {
init(stream, eq, op);
}
public ReducerStream(TupleStream stream,StreamComparator comp) throws IOException {
init(stream, convertToEqualitor(comp));
public ReducerStream(TupleStream stream, StreamComparator comp, ReduceOperation op) throws IOException {
init(stream, convertToEqualitor(comp), op);
}
private StreamEqualitor convertToEqualitor(StreamComparator comp){
@ -88,9 +92,10 @@ public class ReducerStream extends TupleStream implements Expressible {
// grab all parameters out
List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
StreamExpressionNamedParameter byExpression = factory.getNamedOperand(expression, "by");
List<StreamExpression> operationExpressions = factory.getExpressionOperandsRepresentingTypes(expression, ReduceOperation.class);
// validate expression contains only what we want.
if(expression.getParameters().size() != streamExpressions.size() + 1){
if(expression.getParameters().size() != streamExpressions.size() + 2){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
}
@ -100,15 +105,29 @@ public class ReducerStream extends TupleStream implements Expressible {
if(null == byExpression || !(byExpression.getParameter() instanceof StreamExpressionValue)){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'by' parameter listing fields to group by but didn't find one",expression));
}
ReduceOperation reduceOperation = null;
if(operationExpressions != null && operationExpressions.size() == 1) {
StreamExpression ex = operationExpressions.get(0);
StreamOperation operation = factory.constructOperation(ex);
if(operation instanceof ReduceOperation) {
reduceOperation = (ReduceOperation) operation;
} else {
throw new IOException("The ReducerStream requires a ReduceOperation. A StreamOperation was provided.");
}
} else {
throw new IOException("The ReducerStream requires a ReduceOperation.");
}
init(factory.constructStream(streamExpressions.get(0)),
factory.constructEqualitor(((StreamExpressionValue)byExpression.getParameter()).getValue(), FieldEqualitor.class)
);
factory.constructEqualitor(((StreamExpressionValue) byExpression.getParameter()).getValue(), FieldEqualitor.class),
reduceOperation);
}
private void init(TupleStream stream, StreamEqualitor eq) throws IOException{
private void init(TupleStream stream, StreamEqualitor eq, ReduceOperation op) throws IOException{
this.stream = new PushBackStream(stream);
this.eq = eq;
this.op = op;
if(!eq.isDerivedFrom(stream.getStreamSort())){
throw new IOException("Invalid ReducerStream - substream comparator (sort) must be a superset of this stream's comparator.");
@ -130,6 +149,12 @@ public class ReducerStream extends TupleStream implements Expressible {
else{
throw new IOException("This ReducerStream contains a non-expressible comparator - it cannot be converted to an expression");
}
if(op instanceof Expressible) {
expression.addParameter(op.toExpression(factory));
} else {
throw new IOException("This ReducerStream contains a non-expressible operation - it cannot be converted to an expression");
}
return expression;
}
@ -154,19 +179,14 @@ public class ReducerStream extends TupleStream implements Expressible {
public Tuple read() throws IOException {
List<Map> maps = new ArrayList();
while(true) {
Tuple t = stream.read();
if(t.EOF) {
if(maps.size() > 0) {
if(needsReduce) {
stream.pushBack(t);
Map map1 = maps.get(0);
Map map2 = new HashMap();
map2.putAll(map1);
Tuple groupHead = new Tuple(map2);
groupHead.setMaps(maps);
return groupHead;
needsReduce = false;
return op.reduce();
} else {
return t;
}
@ -174,16 +194,17 @@ public class ReducerStream extends TupleStream implements Expressible {
if(currentGroupHead == null) {
currentGroupHead = t;
maps.add(t.getMap());
op.operate(t);
needsReduce = true;
} else {
if(eq.test(currentGroupHead, t)) {
maps.add(t.getMap());
op.operate(t);
needsReduce = true;
} else {
Tuple groupHead = currentGroupHead.clone();
stream.pushBack(t);
currentGroupHead = null;
groupHead.setMaps(maps);
return groupHead;
needsReduce = false;
return op.reduce();
}
}
}

View File

@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.ops.GroupOperation;
import org.apache.solr.client.solrj.io.ops.ReplaceOperation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
@ -159,8 +160,8 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
tuples = getTuples(stream);
assert(tuples.size() == 5);
assertOrder(tuples, 0,2,1,3,4);
assertLong(tuples.get(0),"a_i", 0);
assertOrder(tuples, 0, 2, 1, 3, 4);
assertLong(tuples.get(0), "a_i", 0);
// Basic w/aliases
expression = StreamExpressionParser.parse("search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", aliases=\"a_i=alias.a_i, a_s=name\")");
@ -168,9 +169,9 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
tuples = getTuples(stream);
assert(tuples.size() == 5);
assertOrder(tuples, 0,2,1,3,4);
assertLong(tuples.get(0),"alias.a_i", 0);
assertString(tuples.get(0),"name", "hello0");
assertOrder(tuples, 0, 2, 1, 3, 4);
assertLong(tuples.get(0), "alias.a_i", 0);
assertString(tuples.get(0), "name", "hello0");
// Basic filtered test
expression = StreamExpressionParser.parse("search(collection1, q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
@ -178,8 +179,8 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
tuples = getTuples(stream);
assert(tuples.size() == 3);
assertOrder(tuples, 0,3,4);
assertLong(tuples.get(1),"a_i", 3);
assertOrder(tuples, 0, 3, 4);
assertLong(tuples.get(1), "a_i", 3);
del("*:*");
commit();
@ -216,7 +217,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
assert(tuples.size() == 5);
assertOrder(tuples, 0,2,1,3,4);
assertLong(tuples.get(0),"alias.a_i", 0);
assertString(tuples.get(0),"name", "hello0");
assertString(tuples.get(0), "name", "hello0");
// Basic filtered test
expression = StreamExpressionParser.parse("search(collection1, q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", zkHost=" + zkServer.getZkAddress() + ", sort=\"a_f asc, a_i asc\")");
@ -225,7 +226,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
assert(tuples.size() == 3);
assertOrder(tuples, 0,3,4);
assertLong(tuples.get(1),"a_i", 3);
assertLong(tuples.get(1), "a_i", 3);
del("*:*");
commit();
@ -453,54 +454,57 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
List<Map> maps0, maps1, maps2;
StreamFactory factory = new StreamFactory()
.withCollectionZkHost("collection1", zkServer.getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("unique", UniqueStream.class)
.withFunctionName("top", RankStream.class)
.withFunctionName("group", ReducerStream.class);
.withCollectionZkHost("collection1", zkServer.getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("reduce", ReducerStream.class)
.withFunctionName("group", GroupOperation.class);
// basic
expression = StreamExpressionParser.parse("group("
+ "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\"),"
+ "by=\"a_s\")");
stream = new ReducerStream(expression, factory);
expression = StreamExpressionParser.parse("reduce("
+ "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\"),"
+ "by=\"a_s\","
+ "group(sort=\"a_f desc\", n=\"4\"))");
stream = factory.constructStream(expression);
tuples = getTuples(stream);
assert(tuples.size() == 3);
assertOrder(tuples, 0,3,4);
t0 = tuples.get(0);
maps0 = t0.getMaps();
assertMaps(maps0, 0, 2,1, 9);
maps0 = t0.getMaps("group");
assertMaps(maps0, 9, 1, 2, 0);
t1 = tuples.get(1);
maps1 = t1.getMaps();
assertMaps(maps1, 3, 5, 7, 8);
maps1 = t1.getMaps("group");
assertMaps(maps1, 8, 7, 5, 3);
t2 = tuples.get(2);
maps2 = t2.getMaps();
assertMaps(maps2, 4, 6);
maps2 = t2.getMaps("group");
assertMaps(maps2, 6, 4);
// basic w/spaces
expression = StreamExpressionParser.parse("group("
+ "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\"),"
+ "by=\"a_s\")");
stream = new ReducerStream(expression, factory);
expression = StreamExpressionParser.parse("reduce("
+ "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\"),"
+ "by=\"a_s\"," +
"group(sort=\"a_i asc\", n=\"2\"))");
stream = factory.constructStream(expression);
tuples = getTuples(stream);
assert(tuples.size() == 3);
assertOrder(tuples, 0,3,4);
t0 = tuples.get(0);
maps0 = t0.getMaps();
assertMaps(maps0, 0, 2,1, 9);
maps0 = t0.getMaps("group");
assert(maps0.size() == 2);
assertMaps(maps0, 0, 1);
t1 = tuples.get(1);
maps1 = t1.getMaps();
assertMaps(maps1, 3, 5, 7, 8);
maps1 = t1.getMaps("group");
assertMaps(maps1, 3, 5);
t2 = tuples.get(2);
maps2 = t2.getMaps();
maps2 = t2.getMaps("group");
assertMaps(maps2, 4, 6);
del("*:*");
@ -748,54 +752,58 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
String zkHost = zkServer.getZkAddress();
StreamFactory streamFactory = new StreamFactory().withCollectionZkHost("collection1", zkServer.getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("unique", UniqueStream.class)
.withFunctionName("top", RankStream.class)
.withFunctionName("group", ReducerStream.class)
.withFunctionName("group", GroupOperation.class)
.withFunctionName("reduce", ReducerStream.class)
.withFunctionName("parallel", ParallelStream.class);
ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, group(search(collection1, q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc,a_f asc\", partitionKeys=\"a_s\"), by=\"a_s\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s asc\")");
ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1," +
"reduce(" +
"search(collection1, q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc,a_f asc\", partitionKeys=\"a_s\"), " +
"by=\"a_s\"," +
"group(sort=\"a_i asc\", n=\"5\")), " +
"workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s asc\")");
List<Tuple> tuples = getTuples(pstream);
assert(tuples.size() == 3);
assertOrder(tuples, 0,3,4);
Tuple t0 = tuples.get(0);
List<Map> maps0 = t0.getMaps();
assertMaps(maps0, 0, 2, 1, 9);
List<Map> maps0 = t0.getMaps("group");
assertMaps(maps0, 0, 1, 2, 9);
Tuple t1 = tuples.get(1);
List<Map> maps1 = t1.getMaps();
List<Map> maps1 = t1.getMaps("group");
assertMaps(maps1, 3, 5, 7, 8);
Tuple t2 = tuples.get(2);
List<Map> maps2 = t2.getMaps();
List<Map> maps2 = t2.getMaps("group");
assertMaps(maps2, 4, 6);
//Test Descending with Ascending subsort
pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, group(search(collection1, q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc,a_f asc\", partitionKeys=\"a_s\"), by=\"a_s\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s desc\")");
pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, " +
"reduce(" +
"search(collection1, q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc,a_f asc\", partitionKeys=\"a_s\"), " +
"by=\"a_s\", " +
"group(sort=\"a_i desc\", n=\"5\")),"+
"workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s desc\")");
tuples = getTuples(pstream);
assert(tuples.size() == 3);
assertOrder(tuples, 4,3,0);
t0 = tuples.get(0);
maps0 = t0.getMaps();
assertMaps(maps0, 4, 6);
maps0 = t0.getMaps("group");
assertMaps(maps0, 6, 4);
t1 = tuples.get(1);
maps1 = t1.getMaps();
assertMaps(maps1, 3, 5, 7, 8);
maps1 = t1.getMaps("group");
assertMaps(maps1, 8, 7, 5, 3);
t2 = tuples.get(2);
maps2 = t2.getMaps();
assertMaps(maps2, 0, 2, 1, 9);
maps2 = t2.getMaps("group");
assertMaps(maps2, 9, 2, 1, 0);
del("*:*");
commit();
@ -1246,7 +1254,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
List<Tuple> tuples;
StreamFactory factory = new StreamFactory()
.withCollectionZkHost("collection1", zkServer.getZkAddress())
.withCollectionZkHost("collection1", zkServer.getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("outerHashJoin", OuterHashJoinStream.class);
@ -1262,9 +1270,9 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
// Basic desc
expression = StreamExpressionParser.parse("outerHashJoin("
+ "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
+ "hashed=search(collection1, q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
+ "on=\"join1_i, join2_s\")");
+ "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
+ "hashed=search(collection1, q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
+ "on=\"join1_i, join2_s\")");
stream = new OuterHashJoinStream(expression, factory);
tuples = getTuples(stream);
assert(tuples.size() == 10);
@ -1272,9 +1280,9 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
// Results in both searches, no join matches
expression = StreamExpressionParser.parse("outerHashJoin("
+ "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\"),"
+ "hashed=search(collection1, q=\"side_s:right\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\"),"
+ "on=\"ident_s\")");
+ "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\"),"
+ "hashed=search(collection1, q=\"side_s:right\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\"),"
+ "on=\"ident_s\")");
stream = new OuterHashJoinStream(expression, factory);
tuples = getTuples(stream);
assert(tuples.size() == 8);
@ -1406,7 +1414,22 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
}
return true;
}
protected boolean assertMapOrder(List<Tuple> tuples, int... ids) throws Exception {
int i = 0;
for(int val : ids) {
Tuple t = tuples.get(i);
List<Map> tip = t.getMaps("group");
int id = (int)tip.get(0).get("id");
if(id != val) {
throw new Exception("Found value:"+id+" expecting:"+val);
}
++i;
}
return true;
}
protected boolean assertFields(List<Tuple> tuples, String ... fields) throws Exception{
for(Tuple tuple : tuples){
for(String field : fields){
@ -1480,6 +1503,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
return true;
}
@Override
protected void indexr(Object... fields) throws Exception {
SolrInputDocument doc = getDoc(fields);

View File

@ -18,6 +18,7 @@ package org.apache.solr.client.solrj.io.stream;
*/
import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.client.solrj.io.ops.GroupOperation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
@ -44,8 +45,9 @@ public class StreamExpressionToExpessionTest extends LuceneTestCase {
.withFunctionName("merge", MergeStream.class)
.withFunctionName("unique", UniqueStream.class)
.withFunctionName("top", RankStream.class)
.withFunctionName("group", ReducerStream.class)
.withFunctionName("stats", StatsStream.class)
.withFunctionName("reduce", ReducerStream.class)
.withFunctionName("group", GroupOperation.class)
.withFunctionName("stats", StatsStream.class)
.withFunctionName("count", CountMetric.class)
.withFunctionName("sum", SumMetric.class)
.withFunctionName("min", MinMetric.class)
@ -153,11 +155,11 @@ public class StreamExpressionToExpessionTest extends LuceneTestCase {
String expressionString;
// Basic test
stream = new ReducerStream(StreamExpressionParser.parse("group("
stream = new ReducerStream(StreamExpressionParser.parse("reduce("
+ "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc, a_f asc\"),"
+ "by=\"a_s\")"), factory);
+ "by=\"a_s\", group(sort=\"a_i desc\", n=\"5\"))"), factory);
expressionString = stream.toExpression(factory).toString();
assertTrue(expressionString.contains("group(search(collection1"));
assertTrue(expressionString.contains("reduce(search(collection1"));
assertTrue(expressionString.contains("by=a_s"));
}

View File

@ -33,6 +33,7 @@ import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
import org.apache.solr.client.solrj.io.ops.GroupOperation;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.metrics.Bucket;
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
@ -117,8 +118,8 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
.withFunctionName("merge", MergeStream.class)
.withFunctionName("unique", UniqueStream.class)
.withFunctionName("top", RankStream.class)
.withFunctionName("group", ReducerStream.class)
.withFunctionName("count", RecordCountStream.class)
.withFunctionName("reduce", ReducerStream.class)
.withFunctionName("group", GroupOperation.class)
.withFunctionName("rollup", RollupStream.class)
.withFunctionName("parallel", ParallelStream.class);
}
@ -356,47 +357,48 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
//Test with spaces in the parameter lists.
Map paramsA = mapParams("q","*:*","fl","id,a_s, a_i, a_f","sort", "a_s asc , a_f asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
ReducerStream rstream = new ReducerStream(stream, new FieldEqualitor("a_s"));
ReducerStream rstream = new ReducerStream(stream,
new FieldEqualitor("a_s"),
new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));
List<Tuple> tuples = getTuples(rstream);
assert(tuples.size() == 3);
assertOrder(tuples, 0,3,4);
Tuple t0 = tuples.get(0);
List<Map> maps0 = t0.getMaps();
List<Map> maps0 = t0.getMaps("group");
assertMaps(maps0, 0, 2, 1, 9);
Tuple t1 = tuples.get(1);
List<Map> maps1 = t1.getMaps();
List<Map> maps1 = t1.getMaps("group");
assertMaps(maps1, 3, 5, 7, 8);
Tuple t2 = tuples.get(2);
List<Map> maps2 = t2.getMaps();
List<Map> maps2 = t2.getMaps("group");
assertMaps(maps2, 4, 6);
//Test with spaces in the parameter lists using a comparator
paramsA = mapParams("q","*:*","fl","id,a_s, a_i, a_f","sort", "a_s asc , a_f asc");
stream = new CloudSolrStream(zkHost, "collection1", paramsA);
rstream = new ReducerStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
stream = new CloudSolrStream(zkHost, "collection1", paramsA);
rstream = new ReducerStream(stream,
new FieldComparator("a_s", ComparatorOrder.ASCENDING),
new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5));
tuples = getTuples(rstream);
assert(tuples.size() == 3);
assertOrder(tuples, 0,3,4);
t0 = tuples.get(0);
maps0 = t0.getMaps();
assertMaps(maps0, 0, 2, 1, 9);
maps0 = t0.getMaps("group");
assertMaps(maps0, 9, 1, 2, 0);
t1 = tuples.get(1);
maps1 = t1.getMaps();
assertMaps(maps1, 3, 5, 7, 8);
maps1 = t1.getMaps("group");
assertMaps(maps1, 8, 7, 5, 3);
t2 = tuples.get(2);
maps2 = t2.getMaps();
assertMaps(maps2, 4, 6);
maps2 = t2.getMaps("group");
assertMaps(maps2, 6, 4);
del("*:*");
commit();
@ -424,7 +426,9 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
//Test with spaces in the parameter lists.
Map paramsA = mapParams("q", "blah", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
ReducerStream rstream = new ReducerStream(stream, new FieldEqualitor("a_s"));
ReducerStream rstream = new ReducerStream(stream,
new FieldEqualitor("a_s"),
new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));
List<Tuple> tuples = getTuples(rstream);
@ -455,53 +459,57 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
ReducerStream rstream = new ReducerStream(stream, new FieldEqualitor("a_s"));
ReducerStream rstream = new ReducerStream(stream,
new FieldEqualitor("a_s"),
new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5));
ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
attachStreamFactory(pstream);
List<Tuple> tuples = getTuples(pstream);
assert(tuples.size() == 3);
assertOrder(tuples, 0,3,4);
Tuple t0 = tuples.get(0);
List<Map> maps0 = t0.getMaps();
assertMaps(maps0, 0, 2, 1, 9);
List<Map> maps0 = t0.getMaps("group");
assertMaps(maps0, 9, 1, 2, 0);
Tuple t1 = tuples.get(1);
List<Map> maps1 = t1.getMaps();
assertMaps(maps1, 3, 5, 7, 8);
List<Map> maps1 = t1.getMaps("group");
assertMaps(maps1, 8, 7, 5, 3);
Tuple t2 = tuples.get(2);
List<Map> maps2 = t2.getMaps();
assertMaps(maps2, 4, 6);
List<Map> maps2 = t2.getMaps("group");
assertMaps(maps2, 6, 4);
//Test Descending with Ascending subsort
paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s desc,a_f asc", "partitionKeys", "a_s");
stream = new CloudSolrStream(zkHost, "collection1", paramsA);
rstream = new ReducerStream(stream, new FieldEqualitor("a_s"));
rstream = new ReducerStream(stream,
new FieldEqualitor("a_s"),
new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 3));
pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.DESCENDING));
attachStreamFactory(pstream);
tuples = getTuples(pstream);
assert(tuples.size() == 3);
assertOrder(tuples, 4,3,0);
t0 = tuples.get(0);
maps0 = t0.getMaps();
maps0 = t0.getMaps("group");
assertMaps(maps0, 4, 6);
t1 = tuples.get(1);
maps1 = t1.getMaps();
assertMaps(maps1, 3, 5, 7, 8);
maps1 = t1.getMaps("group");
assertMaps(maps1, 3, 5, 7);
t2 = tuples.get(2);
maps2 = t2.getMaps();
assertMaps(maps2, 0, 2, 1, 9);
maps2 = t2.getMaps("group");
assertMaps(maps2, 0, 2, 1);
@ -1499,7 +1507,10 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
Map paramsA = mapParams("q","blah","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
ReducerStream rstream = new ReducerStream(stream, new FieldEqualitor("a_s"));
ReducerStream rstream = new ReducerStream(stream,
new FieldEqualitor("a_s"),
new GroupOperation(new FieldComparator("a_s", ComparatorOrder.ASCENDING), 2));
ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
attachStreamFactory(pstream);
@ -1654,7 +1665,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
assertOrder(tuples, 0,1,2,3,4,7,6,8,9);
//Test descending
paramsA = mapParams("q","id:(4 1 8 9)","fl","id,a_s,a_i","sort", "a_i desc", "partitionKeys", "a_i");
paramsA = mapParams("q", "id:(4 1 8 9)", "fl", "id,a_s,a_i", "sort", "a_i desc", "partitionKeys", "a_i");
streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i desc", "partitionKeys", "a_i");