SOLR-8198: Change ReducerStream to use StreamEqualitor instead of StreamComparator

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1713204 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Dennis Gove 2015-11-08 03:05:13 +00:00
parent 8309dc9b32
commit 4f00eb8300
5 changed files with 67 additions and 21 deletions

View File

@ -85,6 +85,8 @@ New Features
* SOLR-7938: MergeStream now supports merging more than 2 streams together (Dennis Gove) * SOLR-7938: MergeStream now supports merging more than 2 streams together (Dennis Gove)
* SOLR-8198: Change ReducerStream to use StreamEqualitor instead of StreamComparator (Dennis Gove)
Optimizations Optimizations
---------------------- ----------------------
* SOLR-7876: Speed up queries and operations that use many terms when timeAllowed has not been * SOLR-7876: Speed up queries and operations that use many terms when timeAllowed has not been

View File

@ -26,7 +26,11 @@ import java.util.Map;
import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.FieldComparator; import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator; import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
import org.apache.solr.client.solrj.io.stream.expr.Expressible; import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
@ -53,14 +57,33 @@ public class ReducerStream extends TupleStream implements Expressible {
private static final long serialVersionUID = 1; private static final long serialVersionUID = 1;
private PushBackStream stream; private PushBackStream stream;
private StreamComparator comp; private StreamEqualitor eq;
private transient Tuple currentGroupHead; private transient Tuple currentGroupHead;
public ReducerStream(TupleStream stream,StreamEqualitor eq) throws IOException {
init(stream,eq);
}
public ReducerStream(TupleStream stream,StreamComparator comp) throws IOException { public ReducerStream(TupleStream stream,StreamComparator comp) throws IOException {
init(stream,comp); init(stream, convertToEqualitor(comp));
} }
private StreamEqualitor convertToEqualitor(StreamComparator comp){
if(comp instanceof MultipleFieldComparator){
MultipleFieldComparator mComp = (MultipleFieldComparator)comp;
StreamEqualitor[] eqs = new StreamEqualitor[mComp.getComps().length];
for(int idx = 0; idx < mComp.getComps().length; ++idx){
eqs[idx] = convertToEqualitor(mComp.getComps()[idx]);
}
return new MultipleFieldEqualitor(eqs);
}
else{
FieldComparator fComp = (FieldComparator)comp;
return new FieldEqualitor(fComp.getFieldName());
}
}
public ReducerStream(StreamExpression expression, StreamFactory factory) throws IOException{ public ReducerStream(StreamExpression expression, StreamFactory factory) throws IOException{
// grab all parameters out // grab all parameters out
List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class); List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
@ -81,15 +104,15 @@ public class ReducerStream extends TupleStream implements Expressible {
// Reducing is always done over equality, so always use an EqualTo comparator // Reducing is always done over equality, so always use an EqualTo comparator
init(factory.constructStream(streamExpressions.get(0)), init(factory.constructStream(streamExpressions.get(0)),
factory.constructComparator(((StreamExpressionValue)byExpression.getParameter()).getValue(), FieldComparator.class) factory.constructEqualitor(((StreamExpressionValue)byExpression.getParameter()).getValue(), FieldEqualitor.class)
); );
} }
private void init(TupleStream stream, StreamComparator comp) throws IOException{ private void init(TupleStream stream, StreamEqualitor eq) throws IOException{
this.stream = new PushBackStream(stream); this.stream = new PushBackStream(stream);
this.comp = comp; this.eq = eq;
if(!comp.isDerivedFrom(stream.getStreamSort())){ if(!eq.isDerivedFrom(stream.getStreamSort())){
throw new IOException("Invalid ReducerStream - substream comparator (sort) must be a superset of this stream's comparator."); throw new IOException("Invalid ReducerStream - substream comparator (sort) must be a superset of this stream's comparator.");
} }
} }
@ -103,8 +126,8 @@ public class ReducerStream extends TupleStream implements Expressible {
expression.addParameter(stream.toExpression(factory)); expression.addParameter(stream.toExpression(factory));
// over // over
if(comp instanceof Expressible){ if(eq instanceof Expressible){
expression.addParameter(new StreamExpressionNamedParameter("by",((Expressible)comp).toExpression(factory))); expression.addParameter(new StreamExpressionNamedParameter("by",((Expressible)eq).toExpression(factory)));
} }
else{ else{
throw new IOException("This ReducerStream contains a non-expressible comparator - it cannot be converted to an expression"); throw new IOException("This ReducerStream contains a non-expressible comparator - it cannot be converted to an expression");
@ -155,7 +178,7 @@ public class ReducerStream extends TupleStream implements Expressible {
currentGroupHead = t; currentGroupHead = t;
maps.add(t.getMap()); maps.add(t.getMap());
} else { } else {
if(comp.compare(currentGroupHead, t) == 0) { if(eq.test(currentGroupHead, t)) {
maps.add(t.getMap()); maps.add(t.getMap());
} else { } else {
Tuple groupHead = currentGroupHead.clone(); Tuple groupHead = currentGroupHead.clone();
@ -170,7 +193,7 @@ public class ReducerStream extends TupleStream implements Expressible {
/** Return the stream sort - ie, the order in which records are returned */ /** Return the stream sort - ie, the order in which records are returned */
public StreamComparator getStreamSort(){ public StreamComparator getStreamSort(){
return comp; return stream.getStreamSort();
} }
public int getCost() { public int getCost() {

View File

@ -444,7 +444,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
// basic // basic
expression = StreamExpressionParser.parse("group(" expression = StreamExpressionParser.parse("group("
+ "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\")," + "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\"),"
+ "by=\"a_s asc\")"); + "by=\"a_s\")");
stream = new ReducerStream(expression, factory); stream = new ReducerStream(expression, factory);
tuples = getTuples(stream); tuples = getTuples(stream);
@ -466,7 +466,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
// basic w/spaces // basic w/spaces
expression = StreamExpressionParser.parse("group(" expression = StreamExpressionParser.parse("group("
+ "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\")," + "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc, a_f asc\"),"
+ "by=\"a_s asc\")"); + "by=\"a_s\")");
stream = new ReducerStream(expression, factory); stream = new ReducerStream(expression, factory);
tuples = getTuples(stream); tuples = getTuples(stream);
@ -672,7 +672,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
.withFunctionName("group", ReducerStream.class) .withFunctionName("group", ReducerStream.class)
.withFunctionName("parallel", ParallelStream.class); .withFunctionName("parallel", ParallelStream.class);
ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, group(search(collection1, q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc,a_f asc\", partitionKeys=\"a_s\"), by=\"a_s asc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s asc\")"); ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, group(search(collection1, q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc,a_f asc\", partitionKeys=\"a_s\"), by=\"a_s\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s asc\")");
List<Tuple> tuples = getTuples(pstream); List<Tuple> tuples = getTuples(pstream);
@ -693,7 +693,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
//Test Descending with Ascending subsort //Test Descending with Ascending subsort
pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, group(search(collection1, q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc,a_f asc\", partitionKeys=\"a_s\"), by=\"a_s desc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s desc\")"); pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, group(search(collection1, q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc,a_f asc\", partitionKeys=\"a_s\"), by=\"a_s\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_s desc\")");
tuples = getTuples(pstream); tuples = getTuples(pstream);

View File

@ -133,10 +133,10 @@ public class StreamExpressionToExpessionTest extends LuceneTestCase {
// Basic test // Basic test
stream = new ReducerStream(StreamExpressionParser.parse("group(" stream = new ReducerStream(StreamExpressionParser.parse("group("
+ "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc, a_f asc\")," + "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc, a_f asc\"),"
+ "by=\"a_s desc\")"), factory); + "by=\"a_s\")"), factory);
expressionString = stream.toExpression(factory).toString(); expressionString = stream.toExpression(factory).toString();
assertTrue(expressionString.contains("group(search(collection1")); assertTrue(expressionString.contains("group(search(collection1"));
assertTrue(expressionString.contains("by=\"a_s desc\"")); assertTrue(expressionString.contains("by=a_s"));
} }
@Test @Test

View File

@ -355,7 +355,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
//Test with spaces in the parameter lists. //Test with spaces in the parameter lists.
Map paramsA = mapParams("q","*:*","fl","id,a_s, a_i, a_f","sort", "a_s asc , a_f asc"); Map paramsA = mapParams("q","*:*","fl","id,a_s, a_i, a_f","sort", "a_s asc , a_f asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA); CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s",ComparatorOrder.ASCENDING)); ReducerStream rstream = new ReducerStream(stream, new FieldEqualitor("a_s"));
List<Tuple> tuples = getTuples(rstream); List<Tuple> tuples = getTuples(rstream);
@ -374,6 +374,27 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
List<Map> maps2 = t2.getMaps(); List<Map> maps2 = t2.getMaps();
assertMaps(maps2, 4, 6); assertMaps(maps2, 4, 6);
//Test with spaces in the parameter lists using a comparator
paramsA = mapParams("q","*:*","fl","id,a_s, a_i, a_f","sort", "a_s asc , a_f asc");
stream = new CloudSolrStream(zkHost, "collection1", paramsA);
rstream = new ReducerStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
tuples = getTuples(rstream);
assert(tuples.size() == 3);
assertOrder(tuples, 0,3,4);
t0 = tuples.get(0);
maps0 = t0.getMaps();
assertMaps(maps0, 0, 2, 1, 9);
t1 = tuples.get(1);
maps1 = t1.getMaps();
assertMaps(maps1, 3, 5, 7, 8);
t2 = tuples.get(2);
maps2 = t2.getMaps();
assertMaps(maps2, 4, 6);
del("*:*"); del("*:*");
@ -402,7 +423,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
//Test with spaces in the parameter lists. //Test with spaces in the parameter lists.
Map paramsA = mapParams("q", "blah", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc"); Map paramsA = mapParams("q", "blah", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA); CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING)); ReducerStream rstream = new ReducerStream(stream, new FieldEqualitor("a_s"));
List<Tuple> tuples = getTuples(rstream); List<Tuple> tuples = getTuples(rstream);
@ -433,7 +454,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s"); Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA); CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s",ComparatorOrder.ASCENDING)); ReducerStream rstream = new ReducerStream(stream, new FieldEqualitor("a_s"));
ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING)); ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
List<Tuple> tuples = getTuples(pstream); List<Tuple> tuples = getTuples(pstream);
@ -457,7 +478,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s desc,a_f asc", "partitionKeys", "a_s"); paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s desc,a_f asc", "partitionKeys", "a_s");
stream = new CloudSolrStream(zkHost, "collection1", paramsA); stream = new CloudSolrStream(zkHost, "collection1", paramsA);
rstream = new ReducerStream(stream, new FieldComparator("a_s",ComparatorOrder.DESCENDING)); rstream = new ReducerStream(stream, new FieldEqualitor("a_s"));
pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.DESCENDING)); pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.DESCENDING));
tuples = getTuples(pstream); tuples = getTuples(pstream);
@ -1474,7 +1495,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
Map paramsA = mapParams("q","blah","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s"); Map paramsA = mapParams("q","blah","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA); CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING)); ReducerStream rstream = new ReducerStream(stream, new FieldEqualitor("a_s"));
ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING)); ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
List<Tuple> tuples = getTuples(pstream); List<Tuple> tuples = getTuples(pstream);