SOLR-7584: Adds Inner and LeftOuter Joins to the Streaming API and Streaming Expressions (Dennis Gove, Corey Wu)

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1713753 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Dennis Gove 2015-11-11 01:45:04 +00:00
parent cad6188624
commit 5c5812940a
17 changed files with 881 additions and 49 deletions

View File

@ -93,6 +93,8 @@ New Features
* SOLR-8268: StatsStream now implements the Expressible interface (Dennis Gove)
* SOLR-7584: Adds Inner and LeftOuter Joins to the Streaming API and Streaming Expressions (Dennis Gove, Corey Wu)
Optimizations
----------------------
* SOLR-7876: Speed up queries and operations that use many terms when timeAllowed has not been

View File

@ -276,15 +276,17 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
if(sqlVisitor.sorts != null && sqlVisitor.sorts.size() > 0) {
StreamComparator[] adjustedSorts = adjustSorts(sqlVisitor.sorts, buckets);
// Because of the way adjustSorts works we know that each FieldComparator has a single
// field name. For this reason we can just look at the leftFieldName
FieldEqualitor[] fieldEqualitors = new FieldEqualitor[adjustedSorts.length];
StringBuilder buf = new StringBuilder();
for(int i=0; i<adjustedSorts.length; i++) {
FieldComparator fieldComparator = (FieldComparator)adjustedSorts[i];
fieldEqualitors[i] = new FieldEqualitor(fieldComparator.getFieldName());
fieldEqualitors[i] = new FieldEqualitor(fieldComparator.getLeftFieldName());
if(i>0) {
buf.append(",");
}
buf.append(fieldComparator.getFieldName()).append(" ").append(fieldComparator.getOrder().toString());
buf.append(fieldComparator.getLeftFieldName()).append(" ").append(fieldComparator.getOrder().toString());
}
sort = buf.toString();

View File

@ -31,6 +31,8 @@ import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.ExceptionStream;
import org.apache.solr.client.solrj.io.stream.InnerJoinStream;
import org.apache.solr.client.solrj.io.stream.LeftOuterJoinStream;
import org.apache.solr.client.solrj.io.stream.MergeStream;
import org.apache.solr.client.solrj.io.stream.ParallelStream;
import org.apache.solr.client.solrj.io.stream.RankStream;
@ -100,6 +102,8 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
.withFunctionName("parallel", ParallelStream.class)
.withFunctionName("rollup", RollupStream.class)
.withFunctionName("stats", StatsStream.class)
.withFunctionName("innerJoin", InnerJoinStream.class)
.withFunctionName("leftOuterJoin", LeftOuterJoinStream.class)
// metrics
.withFunctionName("min", MinMetric.class)

View File

@ -141,4 +141,8 @@ public class Tuple implements Cloneable {
Tuple clone = new Tuple(m);
return clone;
}
public void merge(Tuple other){
fields.putAll(other.getMap());
}
}

View File

@ -17,11 +17,7 @@
package org.apache.solr.client.solrj.io.comp;
import java.io.Serializable;
import java.util.Comparator;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
@ -33,28 +29,55 @@ public class FieldComparator implements StreamComparator {
private static final long serialVersionUID = 1;
private String fieldName;
private String leftFieldName;
private String rightFieldName;
private final ComparatorOrder order;
private ComparatorLambda comparator;
public FieldComparator(String fieldName, ComparatorOrder order) {
this.fieldName = fieldName;
public FieldComparator(String fieldName, ComparatorOrder order){
leftFieldName = fieldName;
rightFieldName = fieldName;
this.order = order;
assignComparator();
}
public String getFieldName(){
return fieldName;
public FieldComparator(String leftFieldName, String rightFieldName, ComparatorOrder order) {
this.leftFieldName = leftFieldName;
this.rightFieldName = rightFieldName;
this.order = order;
assignComparator();
}
public void setLeftFieldName(String leftFieldName){
this.leftFieldName = leftFieldName;
}
public String getLeftFieldName(){
return leftFieldName;
}
public void setRightFieldName(String rightFieldName){
this.rightFieldName = rightFieldName;
}
public String getRightFieldName(){
return rightFieldName;
}
public ComparatorOrder getOrder(){
return order;
}
public boolean hasDifferentFieldNames(){
return !leftFieldName.equals(rightFieldName);
}
public StreamExpressionParameter toExpression(StreamFactory factory){
StringBuilder sb = new StringBuilder();
sb.append(fieldName);
sb.append(leftFieldName);
if(!leftFieldName.equals(rightFieldName)){
sb.append("=");
sb.append(rightFieldName);
}
sb.append(" ");
sb.append(order);
@ -76,8 +99,8 @@ public class FieldComparator implements StreamComparator {
comparator = new ComparatorLambda() {
@Override
public int compare(Tuple leftTuple, Tuple rightTuple) {
Comparable leftComp = (Comparable)leftTuple.get(fieldName);
Comparable rightComp = (Comparable)rightTuple.get(fieldName);
Comparable leftComp = (Comparable)leftTuple.get(leftFieldName);
Comparable rightComp = (Comparable)rightTuple.get(rightFieldName);
if(leftComp == rightComp){ return 0; } // if both null then they are equal. if both are same ref then are equal
if(null == leftComp){ return 1; }
@ -92,8 +115,8 @@ public class FieldComparator implements StreamComparator {
comparator = new ComparatorLambda() {
@Override
public int compare(Tuple leftTuple, Tuple rightTuple) {
Comparable leftComp = (Comparable)leftTuple.get(fieldName);
Comparable rightComp = (Comparable)rightTuple.get(fieldName);
Comparable leftComp = (Comparable)leftTuple.get(leftFieldName);
Comparable rightComp = (Comparable)rightTuple.get(rightFieldName);
if(leftComp == rightComp){ return 0; } // if both null then they are equal. if both are same ref then are equal
if(null == leftComp){ return -1; }
@ -114,7 +137,7 @@ public class FieldComparator implements StreamComparator {
if(null == base){ return false; }
if(base instanceof FieldComparator){
FieldComparator baseComp = (FieldComparator)base;
return fieldName.equals(baseComp.fieldName) && order == baseComp.order;
return (leftFieldName.equals(baseComp.leftFieldName) || rightFieldName.equals(baseComp.rightFieldName)) && order == baseComp.order;
}
else if(base instanceof MultipleFieldComparator){
// must equal the first one

View File

@ -27,4 +27,5 @@ package org.apache.solr.client.solrj.io.eq;
*/
public interface Equalitor<T> {
public boolean test(T left, T right);
}

View File

@ -76,6 +76,14 @@ public class FieldEqualitor implements StreamEqualitor {
return 0 == leftComp.compareTo(rightComp);
}
public String getLeftFieldName(){
return leftFieldName;
}
public String getRightFieldName(){
return rightFieldName;
}
@Override
public boolean isDerivedFrom(StreamEqualitor base){
if(null == base){ return false; }
@ -99,7 +107,7 @@ public class FieldEqualitor implements StreamEqualitor {
if(null == base){ return false; }
if(base instanceof FieldComparator){
FieldComparator baseComp = (FieldComparator)base;
return leftFieldName.equals(baseComp.getFieldName()) && rightFieldName.equals(baseComp.getFieldName());
return leftFieldName.equals(baseComp.getLeftFieldName()) || rightFieldName.equals(baseComp.getRightFieldName());
}
else if(base instanceof MultipleFieldComparator){
// must equal the first one
@ -111,11 +119,4 @@ public class FieldEqualitor implements StreamEqualitor {
return false;
}
public String getLeftFieldName(){
return leftFieldName;
}
public String getRightFieldName(){
return rightFieldName;
}
}

View File

@ -46,16 +46,6 @@ public class MultipleFieldEqualitor implements StreamEqualitor {
return eqs;
}
public boolean test(Tuple t1, Tuple t2) {
for(Equalitor<Tuple> eq : eqs) {
if(!eq.test(t1, t2)){
return false;
}
}
return true;
}
@Override
public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
StringBuilder sb = new StringBuilder();
@ -108,5 +98,16 @@ public class MultipleFieldEqualitor implements StreamEqualitor {
}
return false;
}
public boolean test(Tuple t1, Tuple t2) {
for(Equalitor<Tuple> eq : eqs) {
if(!eq.test(t1, t2)){
return false;
}
}
return true;
}
}

View File

@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.eq.Equalitor;
import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
/**
* Joins leftStream with rightStream based on a Equalitor. Both streams must be sorted by the fields being joined on.
* Resulting stream is sorted by the equalitor.
**/
public abstract class BiJoinStream extends JoinStream implements Expressible {
protected PushBackStream leftStream;
protected PushBackStream rightStream;
// This is used to determine whether we should iterate the left or right side (depending on stream order).
// It is built from the incoming equalitor and streams' comparators.
protected StreamComparator iterationComparator;
protected StreamComparator leftStreamComparator, rightStreamComparator;
public BiJoinStream(TupleStream leftStream, TupleStream rightStream, StreamEqualitor eq) throws IOException {
super(eq, leftStream, rightStream);
init();
}
public BiJoinStream(StreamExpression expression, StreamFactory factory) throws IOException {
super(expression, factory);
init();
}
private void init() throws IOException {
// Validates all incoming streams for tuple order
validateTupleOrder();
leftStream = getStream(0);
rightStream = getStream(1);
// iterationComparator is a combination of the equalitor and the comp from each stream. This can easily be done by
// grabbing the first N parts of each comp where N is the number of parts in the equalitor. Because we've already
// validated tuple order (the comps) then we know this can be done.
iterationComparator = createIterationComparator(eq, leftStream.getStreamSort());
leftStreamComparator = createSideComparator(eq, leftStream.getStreamSort());
rightStreamComparator = createSideComparator(eq, rightStream.getStreamSort());
}
protected void validateTupleOrder() throws IOException {
if (!isValidTupleOrder()) {
throw new IOException(
"Invalid JoinStream - all incoming stream comparators (sort) must be a superset of this stream's equalitor.");
}
}
private StreamComparator createIterationComparator(StreamEqualitor eq, StreamComparator comp) throws IOException {
if (eq instanceof MultipleFieldEqualitor && comp instanceof MultipleFieldComparator) {
// we know the comp is at least as long as the eq because we've already validated the tuple order
StreamComparator[] compoundComps = new StreamComparator[((MultipleFieldEqualitor) eq).getEqs().length];
for (int idx = 0; idx < compoundComps.length; ++idx) {
StreamEqualitor sourceEqualitor = ((MultipleFieldEqualitor) eq).getEqs()[idx];
StreamComparator sourceComparator = ((MultipleFieldComparator) comp).getComps()[idx];
if (sourceEqualitor instanceof FieldEqualitor && sourceComparator instanceof FieldComparator) {
FieldEqualitor fieldEqualitor = (FieldEqualitor) sourceEqualitor;
FieldComparator fieldComparator = (FieldComparator) sourceComparator;
compoundComps[idx] = new FieldComparator(fieldEqualitor.getLeftFieldName(),
fieldEqualitor.getRightFieldName(), fieldComparator.getOrder());
} else {
throw new IOException("Failed to create an iteration comparator");
}
}
return new MultipleFieldComparator(compoundComps);
} else if (comp instanceof MultipleFieldComparator) {
StreamEqualitor sourceEqualitor = eq;
StreamComparator sourceComparator = ((MultipleFieldComparator) comp).getComps()[0];
if (sourceEqualitor instanceof FieldEqualitor && sourceComparator instanceof FieldComparator) {
FieldEqualitor fieldEqualitor = (FieldEqualitor) sourceEqualitor;
FieldComparator fieldComparator = (FieldComparator) sourceComparator;
return new FieldComparator(fieldEqualitor.getLeftFieldName(), fieldEqualitor.getRightFieldName(),
fieldComparator.getOrder());
} else {
throw new IOException("Failed to create an iteration comparator");
}
} else {
StreamEqualitor sourceEqualitor = eq;
StreamComparator sourceComparator = comp;
if (sourceEqualitor instanceof FieldEqualitor && sourceComparator instanceof FieldComparator) {
FieldEqualitor fieldEqualitor = (FieldEqualitor) sourceEqualitor;
FieldComparator fieldComparator = (FieldComparator) sourceComparator;
return new FieldComparator(fieldEqualitor.getLeftFieldName(), fieldEqualitor.getRightFieldName(),
fieldComparator.getOrder());
} else {
throw new IOException("Failed to create an iteration comparator");
}
}
}
private StreamComparator createSideComparator(StreamEqualitor eq, StreamComparator comp) throws IOException {
if (eq instanceof MultipleFieldEqualitor && comp instanceof MultipleFieldComparator) {
// we know the comp is at least as long as the eq because we've already validated the tuple order
StreamComparator[] compoundComps = new StreamComparator[((MultipleFieldEqualitor) eq).getEqs().length];
for (int idx = 0; idx < compoundComps.length; ++idx) {
StreamComparator sourceComparator = ((MultipleFieldComparator) comp).getComps()[idx];
if (sourceComparator instanceof FieldComparator) {
FieldComparator fieldComparator = (FieldComparator) sourceComparator;
compoundComps[idx] = new FieldComparator(fieldComparator.getLeftFieldName(),
fieldComparator.getRightFieldName(), fieldComparator.getOrder());
} else {
throw new IOException("Failed to create an side comparator");
}
}
return new MultipleFieldComparator(compoundComps);
} else if (comp instanceof MultipleFieldComparator) {
StreamComparator sourceComparator = ((MultipleFieldComparator) comp).getComps()[0];
if (sourceComparator instanceof FieldComparator) {
FieldComparator fieldComparator = (FieldComparator) sourceComparator;
return new FieldComparator(fieldComparator.getLeftFieldName(), fieldComparator.getRightFieldName(),
fieldComparator.getOrder());
} else {
throw new IOException("Failed to create an side comparator");
}
} else {
StreamComparator sourceComparator = comp;
if (sourceComparator instanceof FieldComparator) {
FieldComparator fieldComparator = (FieldComparator) sourceComparator;
return new FieldComparator(fieldComparator.getLeftFieldName(), fieldComparator.getRightFieldName(),
fieldComparator.getOrder());
} else {
throw new IOException("Failed to create an side comparator");
}
}
}
}

View File

@ -407,9 +407,9 @@ public class CloudSolrStream extends TupleStream implements Expressible {
protected class TupleWrapper implements Comparable<TupleWrapper> {
private Tuple tuple;
private SolrStream stream;
private Comparator comp;
private StreamComparator comp;
public TupleWrapper(SolrStream stream, Comparator comp) {
public TupleWrapper(SolrStream stream, StreamComparator comp) {
this.stream = stream;
this.comp = comp;
}
@ -449,9 +449,9 @@ public class CloudSolrStream extends TupleStream implements Expressible {
protected class StreamOpener implements Callable<TupleWrapper> {
private SolrStream stream;
private Comparator<Tuple> comp;
private StreamComparator comp;
public StreamOpener(SolrStream stream, Comparator<Tuple> comp) {
public StreamOpener(SolrStream stream, StreamComparator comp) {
this.stream = stream;
this.comp = comp;
}

View File

@ -62,7 +62,7 @@ public class FacetStream extends TupleStream {
Bucket[] buckets,
Metric[] metrics,
FieldComparator[] sorts,
int limit) {
int limit) throws IOException {
this.zkHost = zkHost;
this.props = props;
this.buckets = buckets;
@ -70,6 +70,15 @@ public class FacetStream extends TupleStream {
this.limit = limit;
this.collection = collection;
this.sorts = sorts;
// In a facet world it only makes sense to have the same field name in all of the sorters
// Because FieldComparator allows for left and right field names we will need to validate
// that they are the same
for(FieldComparator sort : sorts){
if(sort.hasDifferentFieldNames()){
throw new IOException("Invalid FacetStream - all sorts must be constructed with a single field name.");
}
}
}
public void setStreamContext(StreamContext context) {
@ -144,7 +153,7 @@ public class FacetStream extends TupleStream {
return _sorts;
} else if(_sorts.length == 1) {
FieldComparator[] adjustedSorts = new FieldComparator[_buckets.length];
if (_sorts[0].getFieldName().contains("(")) {
if (_sorts[0].getLeftFieldName().contains("(")) {
//Its a metric sort so apply the same sort criteria at each level.
for (int i = 0; i < adjustedSorts.length; i++) {
adjustedSorts[i] = _sorts[0];
@ -174,7 +183,7 @@ public class FacetStream extends TupleStream {
buf.append("\"type\":\"terms\"");
buf.append(",\"field\":\""+_buckets[level].toString()+"\"");
buf.append(",\"limit\":"+_limit);
buf.append(",\"sort\":{\""+getFacetSort(_sorts[level].getFieldName(), _metrics)+"\":\""+_sorts[level].getOrder()+"\"}");
buf.append(",\"sort\":{\""+getFacetSort(_sorts[level].getLeftFieldName(), _metrics)+"\":\""+_sorts[level].getOrder()+"\"}");
buf.append(",\"facet\":{");
int metricCount = 0;

View File

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.util.LinkedList;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
/**
* Joins leftStream with rightStream based on a Equalitor. Both streams must be sorted by the fields being joined on.
* Resulting stream is sorted by the equalitor.
**/
public class InnerJoinStream extends BiJoinStream implements Expressible {
private LinkedList<Tuple> joinedTuples = new LinkedList<Tuple>();
private LinkedList<Tuple> leftTupleGroup = new LinkedList<Tuple>();
private LinkedList<Tuple> rightTupleGroup = new LinkedList<Tuple>();
public InnerJoinStream(TupleStream leftStream, TupleStream rightStream, StreamEqualitor eq) throws IOException {
super(leftStream, rightStream, eq);
}
public InnerJoinStream(StreamExpression expression, StreamFactory factory) throws IOException {
super(expression, factory);
}
public Tuple read() throws IOException {
// if we've already figured out the next joined tuple then just return it
if (joinedTuples.size() > 0) {
return joinedTuples.removeFirst();
}
// keep going until we find something to return or (left or right) are empty
while (true) {
if (0 == leftTupleGroup.size()) {
Tuple firstMember = loadEqualTupleGroup(leftStream, leftTupleGroup, leftStreamComparator);
// if first member of group is EOF then we're done
if (firstMember.EOF) {
return firstMember;
}
}
if (0 == rightTupleGroup.size()) {
Tuple firstMember = loadEqualTupleGroup(rightStream, rightTupleGroup, rightStreamComparator);
// if first member of group is EOF then we're done
if (firstMember.EOF) {
return firstMember;
}
}
// At this point we know both left and right groups have at least 1 member
if (eq.test(leftTupleGroup.get(0), rightTupleGroup.get(0))) {
// The groups are equal. Join em together and build the joinedTuples
for (Tuple left : leftTupleGroup) {
for (Tuple right : rightTupleGroup) {
Tuple clone = left.clone();
clone.merge(right);
joinedTuples.add(clone);
}
}
// Cause each to advance next time we need to look
leftTupleGroup.clear();
rightTupleGroup.clear();
return joinedTuples.removeFirst();
} else {
int c = iterationComparator.compare(leftTupleGroup.get(0), rightTupleGroup.get(0));
if (c < 0) {
// advance left
leftTupleGroup.clear();
} else {
// advance right
rightTupleGroup.clear();
}
}
}
}
@Override
public StreamComparator getStreamSort() {
return iterationComparator;
}
}

View File

@ -0,0 +1,196 @@
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/** Defines a JoinStream which can hold N streams, all joined with the same equalitor */
public abstract class JoinStream extends TupleStream implements Expressible {
private static final long serialVersionUID = 1;
private List<PushBackStream> streams;
protected StreamEqualitor eq;
public JoinStream(StreamEqualitor eq, TupleStream first, TupleStream second, TupleStream... others) {
this.streams = new ArrayList<PushBackStream>();
this.eq = eq;
this.streams.add(new PushBackStream(first));
this.streams.add(new PushBackStream(second));
for (TupleStream other : others) {
this.streams.add(new PushBackStream(other));
}
}
protected abstract void validateTupleOrder() throws IOException;
public JoinStream(StreamExpression expression, StreamFactory factory) throws IOException {
// grab all parameters out
List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression,
Expressible.class, TupleStream.class);
StreamExpressionNamedParameter onExpression = factory.getNamedOperand(expression, "on");
// validate expression contains only what we want.
if (expression.getParameters().size() != streamExpressions.size() + 1) {
throw new IOException(String.format(Locale.ROOT, "Invalid expression %s - unknown operands found", expression));
}
if (streamExpressions.size() < 2) {
throw new IOException(String.format(Locale.ROOT,
"Invalid expression %s - expecting at least two streams but found %d (must be PushBackStream types)",
expression, streamExpressions.size()));
}
this.streams = new ArrayList<PushBackStream>();
for (StreamExpression streamExpression : streamExpressions) {
this.streams.add(new PushBackStream(factory.constructStream(streamExpression)));
}
if (null == onExpression || !(onExpression.getParameter() instanceof StreamExpressionValue)) {
throw new IOException(String.format(Locale.ROOT,
"Invalid expression %s - expecting single 'on' parameter listing fields to join on but didn't find one",
expression));
}
this.eq = factory.constructEqualitor(((StreamExpressionValue) onExpression.getParameter()).getValue(),
FieldEqualitor.class);
}
@Override
public StreamExpression toExpression(StreamFactory factory) throws IOException {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// streams
for (PushBackStream stream : streams) {
expression.addParameter(stream.toExpression(factory));
}
// on
if (eq instanceof Expressible) {
expression.addParameter(new StreamExpressionNamedParameter("on", ((Expressible) eq).toExpression(factory)));
} else {
throw new IOException(
"This JoinStream contains a non-expressible equalitor - it cannot be converted to an expression");
}
return expression;
}
public void setStreamContext(StreamContext context) {
for (PushBackStream stream : streams) {
stream.setStreamContext(context);
}
}
public void open() throws IOException {
for (PushBackStream stream : streams) {
stream.open();
}
}
public void close() throws IOException {
for (PushBackStream stream : streams) {
stream.close();
}
}
public List<TupleStream> children() {
List<TupleStream> list = new ArrayList<TupleStream>();
for (TupleStream stream : streams) {
list.add(stream);
}
return list;
}
public PushBackStream getStream(int idx) {
if (streams.size() > idx) {
return streams.get(idx);
}
throw new IllegalArgumentException(String.format(Locale.ROOT,"Stream idx=%d doesn't exist. Number of streams is %d", idx,
streams.size()));
}
protected boolean isValidTupleOrder() {
// Validate that the equalitor is derivable from the comparator in each stream. If it is, then we know all stream
// comparators are
// derivable with each other stream
for (TupleStream stream : streams) {
if (!eq.isDerivedFrom(stream.getStreamSort())) {
return false;
}
}
return true;
}
/**
* Given the stream, start from beginning and load group with all tuples that are equal to the first in stream
* (including the first one in the stream). All matched tuples are removed from the stream. Result is at least one
* tuple will be read from the stream and 0 or more tuples will exist in the group. If the first tuple is EOF then the
* group will have 0 items. Else it will have at least one item. The first group member is returned.
*
* @param group
* - should be empty
*/
protected Tuple loadEqualTupleGroup(PushBackStream stream, LinkedList<Tuple> group, StreamComparator groupComparator)
throws IOException {
// Find next set of same tuples from the stream
Tuple firstMember = stream.read();
if (!firstMember.EOF) {
// first in group, implicitly a member
group.add(firstMember);
BREAKPOINT: while (true) {
Tuple nMember = stream.read();
if (!nMember.EOF && 0 == groupComparator.compare(firstMember, nMember)) {
// they are in same group
group.add(nMember);
} else {
stream.pushBack(nMember);
break BREAKPOINT;
}
}
}
return firstMember;
}
public int getCost() {
return 0;
}
}

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.util.LinkedList;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
/**
* Joins leftStream with rightStream based on a Equalitor. Both streams must be sorted by the fields being joined on.
* Resulting stream is sorted by the equalitor.
**/
public class LeftOuterJoinStream extends BiJoinStream implements Expressible {
private LinkedList<Tuple> joinedTuples = new LinkedList<Tuple>();
private LinkedList<Tuple> leftTupleGroup = new LinkedList<Tuple>();
private LinkedList<Tuple> rightTupleGroup = new LinkedList<Tuple>();
public LeftOuterJoinStream(TupleStream leftStream, TupleStream rightStream, StreamEqualitor eq) throws IOException {
super(leftStream, rightStream, eq);
}
public LeftOuterJoinStream(StreamExpression expression, StreamFactory factory) throws IOException {
super(expression, factory);
}
public Tuple read() throws IOException {
// if we've already figured out the next joined tuple then just return it
if (joinedTuples.size() > 0) {
return joinedTuples.removeFirst();
}
// keep going until we find something to return or left stream is empty
while (true) {
if (0 == leftTupleGroup.size()) {
Tuple firstMember = loadEqualTupleGroup(leftStream, leftTupleGroup, leftStreamComparator);
// if first member of group is EOF then we're done
if (firstMember.EOF) {
return firstMember;
}
}
if (0 == rightTupleGroup.size()) {
// Load the right tuple group, but don't end if it's EOF
loadEqualTupleGroup(rightStream, rightTupleGroup, rightStreamComparator);
}
// If the right stream is at the EOF, we just return the next element from the left stream
if (0 == rightTupleGroup.size() || rightTupleGroup.get(0).EOF) {
return leftTupleGroup.removeFirst();
}
// At this point we know both left and right groups have at least 1 member
if (eq.test(leftTupleGroup.get(0), rightTupleGroup.get(0))) {
// The groups are equal. Join em together and build the joinedTuples
for (Tuple left : leftTupleGroup) {
for (Tuple right : rightTupleGroup) {
Tuple clone = left.clone();
clone.merge(right);
joinedTuples.add(clone);
}
}
// Cause each to advance next time we need to look
leftTupleGroup.clear();
rightTupleGroup.clear();
return joinedTuples.removeFirst();
} else {
int c = iterationComparator.compare(leftTupleGroup.get(0), rightTupleGroup.get(0));
if (c < 0) {
// If there's no match, we still advance the left stream while returning every element.
// Because it's a left-outer join we still return the left tuple if no match on right.
return leftTupleGroup.removeFirst();
} else {
// advance right
rightTupleGroup.clear();
}
}
}
}
@Override
public StreamComparator getStreamSort() {
return iterationComparator;
}
}

View File

@ -80,7 +80,7 @@ public class ReducerStream extends TupleStream implements Expressible {
}
else{
FieldComparator fComp = (FieldComparator)comp;
return new FieldEqualitor(fComp.getFieldName());
return new FieldEqualitor(fComp.getLeftFieldName(), fComp.getRightFieldName());
}
}
@ -101,8 +101,6 @@ public class ReducerStream extends TupleStream implements Expressible {
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'by' parameter listing fields to group by but didn't find one",expression));
}
// Reducing is always done over equality, so always use an EqualTo comparator
init(factory.constructStream(streamExpressions.get(0)),
factory.constructEqualitor(((StreamExpressionValue)byExpression.getParameter()).getValue(), FieldEqualitor.class)
);

View File

@ -204,10 +204,45 @@ public class StreamFactory implements Serializable {
}
return new MultipleFieldComparator(comps);
}
else if(comparatorString.contains("=")){
// expected format is "left=right order"
String[] parts = comparatorString.split("[ =]");
if(parts.length < 3){
throw new IOException(String.format(Locale.ROOT,"Invalid comparator expression %s - expecting 'left=right order'",comparatorString));
}
String leftFieldName = null;
String rightFieldName = null;
String order = null;
for(String part : parts){
// skip empty
if(null == part || 0 == part.trim().length()){ continue; }
// assign each in order
if(null == leftFieldName){
leftFieldName = part.trim();
}
else if(null == rightFieldName){
rightFieldName = part.trim();
}
else if(null == order){
order = part.trim();
break; // we're done, stop looping
}
}
if(null == leftFieldName || null == rightFieldName || null == order){
throw new IOException(String.format(Locale.ROOT,"Invalid comparator expression %s - expecting 'left=right order'",comparatorString));
}
return (StreamComparator)createInstance(comparatorType, new Class[]{ String.class, String.class, ComparatorOrder.class }, new Object[]{ leftFieldName, rightFieldName, ComparatorOrder.fromString(order) });
}
else{
// expected format is "field order"
String[] parts = comparatorString.split(" ");
if(2 != parts.length){
throw new IOException(String.format(Locale.ROOT,"Invalid comparator expression %s - expecting fieldName and order",comparatorString));
throw new IOException(String.format(Locale.ROOT,"Invalid comparator expression %s - expecting 'field order'",comparatorString));
}
String fieldName = parts[0].trim();
@ -254,7 +289,12 @@ public class StreamFactory implements Serializable {
return ctor.newInstance(params);
} catch (NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
throw new IOException(String.format(Locale.ROOT,"Unable to construct instance of %s", clazz.getName()),e);
if(null != e.getMessage()){
throw new IOException(String.format(Locale.ROOT,"Unable to construct instance of %s caused by %s", clazz.getName(), e.getMessage()),e);
}
else{
throw new IOException(String.format(Locale.ROOT,"Unable to construct instance of %s", clazz.getName()),e);
}
}
}

View File

@ -29,11 +29,9 @@ import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.metrics.Bucket;
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
import org.apache.solr.client.solrj.io.stream.metrics.Metric;
import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
@ -132,6 +130,8 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
testParallelRankStream();
testParallelMergeStream();
testParallelRollupStream();
testInnerJoinStream();
testLeftOuterJoinStream();
}
private void testCloudSolrStream() throws Exception {
@ -411,6 +411,18 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
assert(tuples.size() == 4);
assertOrder(tuples, 0,1,3,4);
// full factory, switch order
stream = factory.constructStream("top("
+ "n=4,"
+ "unique("
+ "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc, a_i desc\"),"
+ "over=\"a_f\"),"
+ "sort=\"a_f asc\")");
tuples = getTuples(stream);
assert(tuples.size() == 4);
assertOrder(tuples, 2,1,3,4);
del("*:*");
commit();
@ -996,6 +1008,152 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
commit();
}
private void testInnerJoinStream() throws Exception {
indexr(id, "1", "side_s", "left", "join1_i", "0", "join2_s", "a", "ident_s", "left_1"); // 8, 9
indexr(id, "15", "side_s", "left", "join1_i", "0", "join2_s", "a", "ident_s", "left_1"); // 8, 9
indexr(id, "2", "side_s", "left", "join1_i", "0", "join2_s", "b", "ident_s", "left_2");
indexr(id, "3", "side_s", "left", "join1_i", "1", "join2_s", "a", "ident_s", "left_3"); // 10
indexr(id, "4", "side_s", "left", "join1_i", "1", "join2_s", "b", "ident_s", "left_4"); // 11
indexr(id, "5", "side_s", "left", "join1_i", "1", "join2_s", "c", "ident_s", "left_5"); // 12
indexr(id, "6", "side_s", "left", "join1_i", "2", "join2_s", "d", "ident_s", "left_6");
indexr(id, "7", "side_s", "left", "join1_i", "3", "join2_s", "e", "ident_s", "left_7"); // 14
indexr(id, "8", "side_s", "right", "join1_i", "0", "join2_s", "a", "ident_s", "right_1", "join3_i", "0"); // 1,15
indexr(id, "9", "side_s", "right", "join1_i", "0", "join2_s", "a", "ident_s", "right_2", "join3_i", "0"); // 1,15
indexr(id, "10", "side_s", "right", "join1_i", "1", "join2_s", "a", "ident_s", "right_3", "join3_i", "1"); // 3
indexr(id, "11", "side_s", "right", "join1_i", "1", "join2_s", "b", "ident_s", "right_4", "join3_i", "1"); // 4
indexr(id, "12", "side_s", "right", "join1_i", "1", "join2_s", "c", "ident_s", "right_5", "join3_i", "1"); // 5
indexr(id, "13", "side_s", "right", "join1_i", "2", "join2_s", "dad", "ident_s", "right_6", "join3_i", "2");
indexr(id, "14", "side_s", "right", "join1_i", "3", "join2_s", "e", "ident_s", "right_7", "join3_i", "3"); // 7
commit();
StreamExpression expression;
TupleStream stream;
List<Tuple> tuples;
StreamFactory factory = new StreamFactory()
.withCollectionZkHost("collection1", zkServer.getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("innerJoin", InnerJoinStream.class);
// Basic test
expression = StreamExpressionParser.parse("innerJoin("
+ "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
+ "search(collection1, q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc\"),"
+ "on=\"join1_i=join1_i, join2_s=join2_s\")");
stream = new InnerJoinStream(expression, factory);
tuples = getTuples(stream);
assert(tuples.size() == 8);
assertOrder(tuples, 1,1,15,15,3,4,5,7);
// Basic desc
expression = StreamExpressionParser.parse("innerJoin("
+ "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
+ "search(collection1, q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
+ "on=\"join1_i=join1_i, join2_s=join2_s\")");
stream = new InnerJoinStream(expression, factory);
tuples = getTuples(stream);
assert(tuples.size() == 8);
assertOrder(tuples, 7,3,4,5,1,1,15,15);
// Results in both searches, no join matches
expression = StreamExpressionParser.parse("innerJoin("
+ "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\"),"
+ "search(collection1, q=\"side_s:right\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\", aliases=\"id=right.id, join1_i=right.join1_i, join2_s=right.join2_s, ident_s=right.ident_s\"),"
+ "on=\"ident_s=right.ident_s\")");
stream = new InnerJoinStream(expression, factory);
tuples = getTuples(stream);
assert(tuples.size() == 0);
// Differing field names
expression = StreamExpressionParser.parse("innerJoin("
+ "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
+ "search(collection1, q=\"side_s:right\", fl=\"join3_i,join2_s,ident_s\", sort=\"join3_i asc, join2_s asc\", aliases=\"join3_i=aliasesField\"),"
+ "on=\"join1_i=aliasesField, join2_s=join2_s\")");
stream = new InnerJoinStream(expression, factory);
tuples = getTuples(stream);
assert(tuples.size() == 8);
assertOrder(tuples, 1,1,15,15,3,4,5,7);
del("*:*");
commit();
}
private void testLeftOuterJoinStream() throws Exception {
indexr(id, "1", "side_s", "left", "join1_i", "0", "join2_s", "a", "ident_s", "left_1"); // 8, 9
indexr(id, "15", "side_s", "left", "join1_i", "0", "join2_s", "a", "ident_s", "left_1"); // 8, 9
indexr(id, "2", "side_s", "left", "join1_i", "0", "join2_s", "b", "ident_s", "left_2");
indexr(id, "3", "side_s", "left", "join1_i", "1", "join2_s", "a", "ident_s", "left_3"); // 10
indexr(id, "4", "side_s", "left", "join1_i", "1", "join2_s", "b", "ident_s", "left_4"); // 11
indexr(id, "5", "side_s", "left", "join1_i", "1", "join2_s", "c", "ident_s", "left_5"); // 12
indexr(id, "6", "side_s", "left", "join1_i", "2", "join2_s", "d", "ident_s", "left_6");
indexr(id, "7", "side_s", "left", "join1_i", "3", "join2_s", "e", "ident_s", "left_7"); // 14
indexr(id, "8", "side_s", "right", "join1_i", "0", "join2_s", "a", "ident_s", "right_1", "join3_i", "0"); // 1,15
indexr(id, "9", "side_s", "right", "join1_i", "0", "join2_s", "a", "ident_s", "right_2", "join3_i", "0"); // 1,15
indexr(id, "10", "side_s", "right", "join1_i", "1", "join2_s", "a", "ident_s", "right_3", "join3_i", "1"); // 3
indexr(id, "11", "side_s", "right", "join1_i", "1", "join2_s", "b", "ident_s", "right_4", "join3_i", "1"); // 4
indexr(id, "12", "side_s", "right", "join1_i", "1", "join2_s", "c", "ident_s", "right_5", "join3_i", "1"); // 5
indexr(id, "13", "side_s", "right", "join1_i", "2", "join2_s", "dad", "ident_s", "right_6", "join3_i", "2");
indexr(id, "14", "side_s", "right", "join1_i", "3", "join2_s", "e", "ident_s", "right_7", "join3_i", "3"); // 7
commit();
StreamExpression expression;
TupleStream stream;
List<Tuple> tuples;
StreamFactory factory = new StreamFactory()
.withCollectionZkHost("collection1", zkServer.getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("leftOuterJoin", LeftOuterJoinStream.class);
// Basic test
expression = StreamExpressionParser.parse("leftOuterJoin("
+ "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
+ "search(collection1, q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc\"),"
+ "on=\"join1_i=join1_i, join2_s=join2_s\")");
stream = new LeftOuterJoinStream(expression, factory);
tuples = getTuples(stream);
assert(tuples.size() == 10);
assertOrder(tuples, 1,1,15,15,2,3,4,5,6,7);
// Basic desc
expression = StreamExpressionParser.parse("leftOuterJoin("
+ "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
+ "search(collection1, q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\"),"
+ "on=\"join1_i=join1_i, join2_s=join2_s\")");
stream = new LeftOuterJoinStream(expression, factory);
tuples = getTuples(stream);
assert(tuples.size() == 10);
assertOrder(tuples, 7,6,3,4,5,1,1,15,15,2);
// Results in both searches, no join matches
expression = StreamExpressionParser.parse("leftOuterJoin("
+ "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\"),"
+ "search(collection1, q=\"side_s:right\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\", aliases=\"id=right.id, join1_i=right.join1_i, join2_s=right.join2_s, ident_s=right.ident_s\"),"
+ "on=\"ident_s=right.ident_s\")");
stream = new LeftOuterJoinStream(expression, factory);
tuples = getTuples(stream);
assert(tuples.size() == 8);
assertOrder(tuples, 1,15,2,3,4,5,6,7);
// Differing field names
expression = StreamExpressionParser.parse("leftOuterJoin("
+ "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
+ "search(collection1, q=\"side_s:right\", fl=\"join3_i,join2_s,ident_s\", sort=\"join3_i asc, join2_s asc\", aliases=\"join3_i=aliasesField\"),"
+ "on=\"join1_i=aliasesField, join2_s=join2_s\")");
stream = new LeftOuterJoinStream(expression, factory);
tuples = getTuples(stream);
assert(tuples.size() == 10);
assertOrder(tuples, 1,1,15,15,2,3,4,5,6,7);
del("*:*");
commit();
}
protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
tupleStream.open();
List<Tuple> tuples = new ArrayList<Tuple>();