mirror of https://github.com/apache/lucene.git
SOLR-7554: Add checks in Streams for incoming stream order
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1687258 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
eb1542030e
commit
33a4099b98
|
@ -32,8 +32,9 @@ import com.google.common.base.Strings;
|
|||
import com.google.common.collect.Iterables;
|
||||
import org.apache.solr.client.solrj.io.Tuple;
|
||||
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
|
||||
import org.apache.solr.client.solrj.io.comp.FieldComparator;
|
||||
import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
|
||||
import org.apache.solr.client.solrj.io.comp.StreamComparator;
|
||||
import org.apache.solr.client.solrj.io.comp.MultiComp;
|
||||
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
|
||||
import org.apache.solr.client.solrj.io.stream.ParallelStream;
|
||||
import org.apache.solr.client.solrj.io.stream.RankStream;
|
||||
|
@ -171,7 +172,7 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
|
|||
if(numWorkers > 1) {
|
||||
// Do the rollups in parallel
|
||||
// Maintain the sort of the Tuples coming from the workers.
|
||||
Comparator<Tuple> comp = bucketSortComp(buckets, sortDirection);
|
||||
StreamComparator comp = bucketSortComp(buckets, sortDirection);
|
||||
tupleStream = new ParallelStream(workerZkHost, workerCollection, tupleStream, numWorkers, comp);
|
||||
}
|
||||
|
||||
|
@ -185,7 +186,7 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
|
|||
if(sqlVisitor.sorts != null && sqlVisitor.sorts.size() > 0) {
|
||||
if(!sortsEqual(buckets, sortDirection, sqlVisitor.sorts)) {
|
||||
int limit = sqlVisitor.limit == -1 ? 100 : sqlVisitor.limit;
|
||||
Comparator<Tuple> comp = getComp(sqlVisitor.sorts);
|
||||
StreamComparator comp = getComp(sqlVisitor.sorts);
|
||||
//Rank the Tuples
|
||||
//If parallel stream is used ALL the Rolled up tuples from the workers will be ranked
|
||||
//Providing a true Top or Bottom.
|
||||
|
@ -311,35 +312,35 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
|
|||
return "asc";
|
||||
}
|
||||
|
||||
private static Comparator<Tuple> bucketSortComp(Bucket[] buckets, String dir) {
|
||||
Comparator<Tuple>[] comps = new Comparator[buckets.length];
|
||||
private static StreamComparator bucketSortComp(Bucket[] buckets, String dir) {
|
||||
FieldComparator[] comps = new FieldComparator[buckets.length];
|
||||
for(int i=0; i<buckets.length; i++) {
|
||||
ComparatorOrder comparatorOrder = ascDescComp(dir);
|
||||
String sortKey = buckets[i].toString();
|
||||
comps[i] = new StreamComparator(stripQuotes(sortKey), comparatorOrder);
|
||||
comps[i] = new FieldComparator(stripQuotes(sortKey), comparatorOrder);
|
||||
}
|
||||
|
||||
if(comps.length == 1) {
|
||||
return comps[0];
|
||||
} else {
|
||||
return new MultiComp(comps);
|
||||
return new MultipleFieldComparator(comps);
|
||||
}
|
||||
}
|
||||
|
||||
private static Comparator<Tuple> getComp(List<SortItem> sortItems) {
|
||||
Comparator<Tuple>[] comps = new Comparator[sortItems.size()];
|
||||
private static StreamComparator getComp(List<SortItem> sortItems) {
|
||||
FieldComparator[] comps = new FieldComparator[sortItems.size()];
|
||||
for(int i=0; i<sortItems.size(); i++) {
|
||||
SortItem sortItem = sortItems.get(i);
|
||||
String ordering = sortItem.getOrdering().toString();
|
||||
ComparatorOrder comparatorOrder = ascDescComp(ordering);
|
||||
String sortKey = sortItem.getSortKey().toString();
|
||||
comps[i] = new StreamComparator(stripQuotes(sortKey), comparatorOrder);
|
||||
comps[i] = new FieldComparator(stripQuotes(sortKey), comparatorOrder);
|
||||
}
|
||||
|
||||
if(comps.length == 1) {
|
||||
return comps[0];
|
||||
} else {
|
||||
return new MultiComp(comps);
|
||||
return new MultipleFieldComparator(comps);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -671,6 +672,10 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
|
|||
return children;
|
||||
}
|
||||
|
||||
public StreamComparator getStreamSort(){
|
||||
return stream.getStreamSort();
|
||||
}
|
||||
|
||||
public void setStreamContext(StreamContext context) {
|
||||
stream.setStreamContext(context);
|
||||
}
|
||||
|
@ -708,6 +713,10 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
|
|||
this.stream.close();
|
||||
}
|
||||
|
||||
public StreamComparator getStreamSort(){
|
||||
return stream.getStreamSort();
|
||||
}
|
||||
|
||||
public List<TupleStream> children() {
|
||||
List<TupleStream> children = new ArrayList();
|
||||
children.add(stream);
|
||||
|
|
|
@ -0,0 +1,129 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.client.solrj.io.comp;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Comparator;
|
||||
|
||||
import org.apache.solr.client.solrj.io.Tuple;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
|
||||
|
||||
/**
|
||||
* An equality field Comparator which compares a field of two Tuples and determines sort order.
|
||||
**/
|
||||
public class FieldComparator implements StreamComparator {
|
||||
|
||||
private static final long serialVersionUID = 1;
|
||||
|
||||
private String fieldName;
|
||||
private final ComparatorOrder order;
|
||||
private ComparatorLambda comparator;
|
||||
|
||||
public FieldComparator(String fieldName, ComparatorOrder order) {
|
||||
this.fieldName = fieldName;
|
||||
this.order = order;
|
||||
assignComparator();
|
||||
}
|
||||
|
||||
public String getFieldName(){
|
||||
return fieldName;
|
||||
}
|
||||
|
||||
public ComparatorOrder getOrder(){
|
||||
return order;
|
||||
}
|
||||
|
||||
public StreamExpressionParameter toExpression(StreamFactory factory){
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
||||
sb.append(fieldName);
|
||||
sb.append(" ");
|
||||
sb.append(order);
|
||||
|
||||
return new StreamExpressionValue(sb.toString());
|
||||
}
|
||||
|
||||
/*
|
||||
* What're we doing here messing around with lambdas for the comparator logic?
|
||||
* We want the compare(...) function to run as fast as possible because it will be called many many
|
||||
* times over the lifetime of this object. For that reason we want to limit the number of comparisons
|
||||
* taking place in the compare(...) function. Because this class supports both ascending and
|
||||
* descending comparisons and the logic for each is slightly different, we want to do the
|
||||
* if(ascending){ compare like this } else { compare like this }
|
||||
* check only once - we can do that in the constructor of this class, create a lambda, and then execute
|
||||
* that lambda in the compare function. A little bit of branch prediction savings right here.
|
||||
*/
|
||||
private void assignComparator(){
|
||||
if(ComparatorOrder.DESCENDING == order){
|
||||
comparator = new ComparatorLambda() {
|
||||
@Override
|
||||
public int compare(Tuple leftTuple, Tuple rightTuple) {
|
||||
Comparable leftComp = (Comparable)leftTuple.get(fieldName);
|
||||
Comparable rightComp = (Comparable)rightTuple.get(fieldName);
|
||||
|
||||
if(leftComp == rightComp){ return 0; } // if both null then they are equal. if both are same ref then are equal
|
||||
if(null == leftComp){ return 1; }
|
||||
if(null == rightComp){ return -1; }
|
||||
|
||||
return rightComp.compareTo(leftComp);
|
||||
}
|
||||
};
|
||||
}
|
||||
else{
|
||||
// See above for black magic reasoning.
|
||||
comparator = new ComparatorLambda() {
|
||||
@Override
|
||||
public int compare(Tuple leftTuple, Tuple rightTuple) {
|
||||
Comparable leftComp = (Comparable)leftTuple.get(fieldName);
|
||||
Comparable rightComp = (Comparable)rightTuple.get(fieldName);
|
||||
|
||||
if(leftComp == rightComp){ return 0; } // if both null then they are equal. if both are same ref then are equal
|
||||
if(null == leftComp){ return -1; }
|
||||
if(null == rightComp){ return 1; }
|
||||
|
||||
return leftComp.compareTo(rightComp);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
public int compare(Tuple leftTuple, Tuple rightTuple) {
|
||||
return comparator.compare(leftTuple, rightTuple);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDerivedFrom(StreamComparator base){
|
||||
if(null == base){ return false; }
|
||||
if(base instanceof FieldComparator){
|
||||
FieldComparator baseComp = (FieldComparator)base;
|
||||
return fieldName.equals(baseComp.fieldName) && order == baseComp.order;
|
||||
}
|
||||
else if(base instanceof MultipleFieldComparator){
|
||||
// must equal the first one
|
||||
MultipleFieldComparator baseComps = (MultipleFieldComparator)base;
|
||||
if(baseComps.getComps().length > 0){
|
||||
return isDerivedFrom(baseComps.getComps()[0]);
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -32,18 +32,22 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
|
|||
* Wraps multiple Comparators to provide sub-sorting.
|
||||
**/
|
||||
|
||||
public class MultiComp implements Comparator<Tuple>, Expressible, Serializable {
|
||||
public class MultipleFieldComparator implements StreamComparator {
|
||||
|
||||
private static final long serialVersionUID = 1;
|
||||
|
||||
private Comparator<Tuple>[] comps;
|
||||
private StreamComparator[] comps;
|
||||
|
||||
public MultiComp(Comparator<Tuple>... comps) {
|
||||
public MultipleFieldComparator(StreamComparator... comps) {
|
||||
this.comps = comps;
|
||||
}
|
||||
|
||||
public StreamComparator[] getComps(){
|
||||
return comps;
|
||||
}
|
||||
|
||||
public int compare(Tuple t1, Tuple t2) {
|
||||
for(Comparator<Tuple> comp : comps) {
|
||||
for(StreamComparator comp : comps) {
|
||||
int i = comp.compare(t1, t2);
|
||||
if(i != 0) {
|
||||
return i;
|
||||
|
@ -56,7 +60,7 @@ public class MultiComp implements Comparator<Tuple>, Expressible, Serializable {
|
|||
@Override
|
||||
public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for(Comparator<Tuple> comp : comps){
|
||||
for(StreamComparator comp : comps){
|
||||
if(comp instanceof Expressible){
|
||||
if(sb.length() > 0){ sb.append(","); }
|
||||
sb.append(((Expressible)comp).toExpression(factory));
|
||||
|
@ -68,4 +72,24 @@ public class MultiComp implements Comparator<Tuple>, Expressible, Serializable {
|
|||
|
||||
return new StreamExpressionValue(sb.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDerivedFrom(StreamComparator base){
|
||||
if(null == base){ return false; }
|
||||
if(base instanceof MultipleFieldComparator){
|
||||
MultipleFieldComparator baseComp = (MultipleFieldComparator)base;
|
||||
|
||||
if(baseComp.comps.length >= comps.length){
|
||||
for(int idx = 0; idx < comps.length; ++idx){
|
||||
if(!comps[idx].isDerivedFrom(baseComp.comps[idx])){
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -26,92 +26,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
|
|||
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
|
||||
|
||||
/**
|
||||
* An equality field Comparator which compares a field of two Tuples and determines sort order.
|
||||
**/
|
||||
public class StreamComparator implements Comparator<Tuple>, Expressible, Serializable {
|
||||
|
||||
private static final long serialVersionUID = 1;
|
||||
|
||||
private String leftField;
|
||||
private String rightField;
|
||||
private final ComparatorOrder order;
|
||||
private ComparatorLambda comparator;
|
||||
|
||||
public StreamComparator(String field, ComparatorOrder order) {
|
||||
this.leftField = field;
|
||||
this.rightField = field;
|
||||
this.order = order;
|
||||
assignComparator();
|
||||
}
|
||||
public StreamComparator(String leftField, String rightField, ComparatorOrder order){
|
||||
this.leftField = leftField;
|
||||
this.rightField = rightField;
|
||||
this.order = order;
|
||||
assignComparator();
|
||||
}
|
||||
|
||||
public StreamExpressionParameter toExpression(StreamFactory factory){
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
||||
sb.append(leftField);
|
||||
|
||||
if(!leftField.equals(rightField)){
|
||||
sb.append("=");
|
||||
sb.append(rightField);
|
||||
}
|
||||
|
||||
sb.append(" ");
|
||||
sb.append(order);
|
||||
|
||||
return new StreamExpressionValue(sb.toString());
|
||||
}
|
||||
|
||||
/*
|
||||
* What're we doing here messing around with lambdas for the comparator logic?
|
||||
* We want the compare(...) function to run as fast as possible because it will be called many many
|
||||
* times over the lifetime of this object. For that reason we want to limit the number of comparisons
|
||||
* taking place in the compare(...) function. Because this class supports both ascending and
|
||||
* descending comparisons and the logic for each is slightly different, we want to do the
|
||||
* if(ascending){ compare like this } else { compare like this }
|
||||
* check only once - we can do that in the constructor of this class, create a lambda, and then execute
|
||||
* that lambda in the compare function. A little bit of branch prediction savings right here.
|
||||
*/
|
||||
private void assignComparator(){
|
||||
if(ComparatorOrder.DESCENDING == order){
|
||||
comparator = new ComparatorLambda() {
|
||||
@Override
|
||||
public int compare(Tuple leftTuple, Tuple rightTuple) {
|
||||
Comparable leftComp = (Comparable)leftTuple.get(leftField);
|
||||
Comparable rightComp = (Comparable)rightTuple.get(rightField);
|
||||
|
||||
if(leftComp == rightComp){ return 0; } // if both null then they are equal. if both are same ref then are equal
|
||||
if(null == leftComp){ return 1; }
|
||||
if(null == rightComp){ return -1; }
|
||||
|
||||
return rightComp.compareTo(leftComp);
|
||||
}
|
||||
};
|
||||
}
|
||||
else{
|
||||
// See above for black magic reasoning.
|
||||
comparator = new ComparatorLambda() {
|
||||
@Override
|
||||
public int compare(Tuple leftTuple, Tuple rightTuple) {
|
||||
Comparable leftComp = (Comparable)leftTuple.get(leftField);
|
||||
Comparable rightComp = (Comparable)rightTuple.get(rightField);
|
||||
|
||||
if(leftComp == rightComp){ return 0; } // if both null then they are equal. if both are same ref then are equal
|
||||
if(null == leftComp){ return -1; }
|
||||
if(null == rightComp){ return 1; }
|
||||
|
||||
return leftComp.compareTo(rightComp);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
public int compare(Tuple leftTuple, Tuple rightTuple) {
|
||||
return comparator.compare(leftTuple, rightTuple);
|
||||
}
|
||||
/** Defines a comparator we can use with TupleStreams */
|
||||
public interface StreamComparator extends Comparator<Tuple>, Expressible, Serializable {
|
||||
public boolean isDerivedFrom(StreamComparator base);
|
||||
}
|
|
@ -0,0 +1,109 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.client.solrj.io.eq;
|
||||
|
||||
import org.apache.solr.client.solrj.io.Tuple;
|
||||
import org.apache.solr.client.solrj.io.comp.FieldComparator;
|
||||
import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
|
||||
import org.apache.solr.client.solrj.io.comp.StreamComparator;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
|
||||
|
||||
/**
|
||||
* An equality field Equalitor which compares a field of two Tuples and determines if they are equal.
|
||||
**/
|
||||
public class FieldEqualitor implements StreamEqualitor {
|
||||
|
||||
private static final long serialVersionUID = 1;
|
||||
|
||||
private String leftFieldName;
|
||||
private String rightFieldName;
|
||||
|
||||
public FieldEqualitor(String fieldName) {
|
||||
init(fieldName, fieldName);
|
||||
}
|
||||
public FieldEqualitor(String leftFieldName, String rightFieldName){
|
||||
init(leftFieldName, rightFieldName);
|
||||
}
|
||||
|
||||
private void init(String leftFieldName, String rightFieldName){
|
||||
this.leftFieldName = leftFieldName;
|
||||
this.rightFieldName = rightFieldName;
|
||||
}
|
||||
|
||||
public StreamExpressionParameter toExpression(StreamFactory factory){
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
||||
sb.append(leftFieldName);
|
||||
|
||||
if(!leftFieldName.equals(rightFieldName)){
|
||||
sb.append("=");
|
||||
sb.append(rightFieldName);
|
||||
}
|
||||
|
||||
return new StreamExpressionValue(sb.toString());
|
||||
}
|
||||
|
||||
public boolean test(Tuple leftTuple, Tuple rightTuple) {
|
||||
|
||||
Comparable leftComp = (Comparable)leftTuple.get(leftFieldName);
|
||||
Comparable rightComp = (Comparable)rightTuple.get(rightFieldName);
|
||||
|
||||
if(leftComp == rightComp){ return true; } // if both null then they are equal. if both are same ref then are equal
|
||||
if(null == leftComp || null == rightComp){ return false; }
|
||||
|
||||
return 0 == leftComp.compareTo(rightComp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDerivedFrom(StreamEqualitor base){
|
||||
if(null == base){ return false; }
|
||||
if(base instanceof FieldEqualitor){
|
||||
FieldEqualitor baseEq = (FieldEqualitor)base;
|
||||
return leftFieldName.equals(baseEq.leftFieldName) && rightFieldName.equals(baseEq.rightFieldName);
|
||||
}
|
||||
else if(base instanceof MultipleFieldEqualitor){
|
||||
// must equal the first one
|
||||
MultipleFieldEqualitor baseEqs = (MultipleFieldEqualitor)base;
|
||||
if(baseEqs.getEqs().length > 0){
|
||||
return isDerivedFrom(baseEqs.getEqs()[0]);
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDerivedFrom(StreamComparator base){
|
||||
if(null == base){ return false; }
|
||||
if(base instanceof FieldComparator){
|
||||
FieldComparator baseComp = (FieldComparator)base;
|
||||
return leftFieldName.equals(baseComp.getFieldName()) && rightFieldName.equals(baseComp.getFieldName());
|
||||
}
|
||||
else if(base instanceof MultipleFieldComparator){
|
||||
// must equal the first one
|
||||
MultipleFieldComparator baseComps = (MultipleFieldComparator)base;
|
||||
if(baseComps.getComps().length > 0){
|
||||
return isDerivedFrom(baseComps.getComps()[0]);
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -18,10 +18,10 @@
|
|||
package org.apache.solr.client.solrj.io.eq;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.util.Comparator;
|
||||
|
||||
import org.apache.solr.client.solrj.io.Tuple;
|
||||
import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
|
||||
import org.apache.solr.client.solrj.io.comp.StreamComparator;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
|
||||
|
@ -32,15 +32,19 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
|
|||
* Wraps multiple Equalitors.
|
||||
**/
|
||||
|
||||
public class MultiEqualitor implements Equalitor<Tuple>, Expressible, Serializable {
|
||||
public class MultipleFieldEqualitor implements StreamEqualitor {
|
||||
|
||||
private static final long serialVersionUID = 1;
|
||||
|
||||
private Equalitor<Tuple>[] eqs;
|
||||
private StreamEqualitor[] eqs;
|
||||
|
||||
public MultiEqualitor(Equalitor<Tuple>... eqs) {
|
||||
public MultipleFieldEqualitor(StreamEqualitor... eqs) {
|
||||
this.eqs = eqs;
|
||||
}
|
||||
|
||||
public StreamEqualitor[] getEqs(){
|
||||
return eqs;
|
||||
}
|
||||
|
||||
public boolean test(Tuple t1, Tuple t2) {
|
||||
for(Equalitor<Tuple> eq : eqs) {
|
||||
|
@ -67,4 +71,42 @@ public class MultiEqualitor implements Equalitor<Tuple>, Expressible, Serializab
|
|||
|
||||
return new StreamExpressionValue(sb.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDerivedFrom(StreamEqualitor base){
|
||||
if(null == base){ return false; }
|
||||
if(base instanceof MultipleFieldEqualitor){
|
||||
MultipleFieldEqualitor baseEq = (MultipleFieldEqualitor)base;
|
||||
|
||||
if(baseEq.eqs.length >= eqs.length){
|
||||
for(int idx = 0; idx < eqs.length; ++idx){
|
||||
if(!eqs[idx].isDerivedFrom(baseEq.eqs[idx])){
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDerivedFrom(StreamComparator base){
|
||||
if(null == base){ return false; }
|
||||
if(base instanceof StreamComparator){
|
||||
MultipleFieldComparator baseComps = (MultipleFieldComparator)base;
|
||||
|
||||
if(baseComps.getComps().length >= eqs.length){
|
||||
for(int idx = 0; idx < eqs.length; ++idx){
|
||||
if(!eqs[idx].isDerivedFrom(baseComps.getComps()[idx])){
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -18,54 +18,13 @@
|
|||
package org.apache.solr.client.solrj.io.eq;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Comparator;
|
||||
|
||||
import org.apache.solr.client.solrj.io.Tuple;
|
||||
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
|
||||
import org.apache.solr.client.solrj.io.comp.StreamComparator;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
|
||||
|
||||
/**
|
||||
* An equality field Equalitor which compares a field of two Tuples and determines if they are equal.
|
||||
**/
|
||||
public class StreamEqualitor implements Equalitor<Tuple>, Expressible, Serializable {
|
||||
|
||||
private static final long serialVersionUID = 1;
|
||||
|
||||
private String leftFieldName;
|
||||
private String rightFieldName;
|
||||
private StreamComparator comparator;
|
||||
|
||||
public StreamEqualitor(String fieldName) {
|
||||
init(fieldName, fieldName);
|
||||
}
|
||||
public StreamEqualitor(String leftFieldName, String rightFieldName){
|
||||
init(leftFieldName, rightFieldName);
|
||||
}
|
||||
|
||||
private void init(String leftFieldName, String rightFieldName){
|
||||
this.leftFieldName = leftFieldName;
|
||||
this.rightFieldName = rightFieldName;
|
||||
this.comparator = new StreamComparator(leftFieldName, rightFieldName, ComparatorOrder.ASCENDING);
|
||||
}
|
||||
|
||||
public StreamExpressionParameter toExpression(StreamFactory factory){
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
||||
sb.append(leftFieldName);
|
||||
|
||||
if(!leftFieldName.equals(rightFieldName)){
|
||||
sb.append("=");
|
||||
sb.append(rightFieldName);
|
||||
}
|
||||
|
||||
return new StreamExpressionValue(sb.toString());
|
||||
}
|
||||
|
||||
public boolean test(Tuple leftTuple, Tuple rightTuple) {
|
||||
return 0 == comparator.compare(leftTuple, rightTuple);
|
||||
}
|
||||
/** Defines a comparator we can use with TupleStreams */
|
||||
public interface StreamEqualitor extends Equalitor<Tuple>, Expressible, Serializable {
|
||||
public boolean isDerivedFrom(StreamEqualitor base);
|
||||
public boolean isDerivedFrom(StreamComparator base);
|
||||
}
|
|
@ -38,7 +38,8 @@ import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
|||
import org.apache.solr.client.solrj.io.SolrClientCache;
|
||||
import org.apache.solr.client.solrj.io.Tuple;
|
||||
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
|
||||
import org.apache.solr.client.solrj.io.comp.MultiComp;
|
||||
import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
|
||||
import org.apache.solr.client.solrj.io.comp.FieldComparator;
|
||||
import org.apache.solr.client.solrj.io.comp.StreamComparator;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
|
||||
|
@ -69,7 +70,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
|
|||
protected String collection;
|
||||
protected Map<String,String> params;
|
||||
private Map<String, String> fieldMappings;
|
||||
protected Comparator<Tuple> comp;
|
||||
protected StreamComparator comp;
|
||||
private int zkConnectTimeout = 10000;
|
||||
private int zkClientTimeout = 10000;
|
||||
private int numWorkers;
|
||||
|
@ -242,7 +243,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
|
|||
return solrStreams;
|
||||
}
|
||||
|
||||
private Comparator<Tuple> parseComp(String sort, String fl) throws IOException {
|
||||
private StreamComparator parseComp(String sort, String fl) throws IOException {
|
||||
|
||||
String[] fls = fl.split(",");
|
||||
HashSet fieldSet = new HashSet();
|
||||
|
@ -251,7 +252,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
|
|||
}
|
||||
|
||||
String[] sorts = sort.split(",");
|
||||
Comparator[] comps = new Comparator[sorts.length];
|
||||
StreamComparator[] comps = new StreamComparator[sorts.length];
|
||||
for(int i=0; i<sorts.length; i++) {
|
||||
String s = sorts[i];
|
||||
|
||||
|
@ -269,11 +270,11 @@ public class CloudSolrStream extends TupleStream implements Expressible {
|
|||
fieldName = fieldMappings.get(fieldName);
|
||||
}
|
||||
|
||||
comps[i] = new StreamComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING);
|
||||
comps[i] = new FieldComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING);
|
||||
}
|
||||
|
||||
if(comps.length > 1) {
|
||||
return new MultiComp(comps);
|
||||
return new MultipleFieldComparator(comps);
|
||||
} else {
|
||||
return comps[0];
|
||||
}
|
||||
|
@ -351,6 +352,11 @@ public class CloudSolrStream extends TupleStream implements Expressible {
|
|||
cloudSolrClient.close();
|
||||
}
|
||||
}
|
||||
|
||||
/** Return the stream sort - ie, the order in which records are returned */
|
||||
public StreamComparator getStreamSort(){
|
||||
return comp;
|
||||
}
|
||||
|
||||
public Tuple read() throws IOException {
|
||||
return _read();
|
||||
|
|
|
@ -19,11 +19,11 @@ package org.apache.solr.client.solrj.io.stream;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
|
||||
import org.apache.solr.client.solrj.io.Tuple;
|
||||
import org.apache.solr.client.solrj.io.comp.FieldComparator;
|
||||
import org.apache.solr.client.solrj.io.comp.StreamComparator;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
|
||||
|
@ -43,12 +43,10 @@ public class MergeStream extends TupleStream implements Expressible {
|
|||
|
||||
private PushBackStream streamA;
|
||||
private PushBackStream streamB;
|
||||
private Comparator<Tuple> comp;
|
||||
private StreamComparator comp;
|
||||
|
||||
public MergeStream(TupleStream streamA, TupleStream streamB, Comparator<Tuple> comp) {
|
||||
this.streamA = new PushBackStream(streamA);
|
||||
this.streamB = new PushBackStream(streamB);
|
||||
this.comp = comp;
|
||||
public MergeStream(TupleStream streamA, TupleStream streamB, StreamComparator comp) throws IOException {
|
||||
init(streamA, streamB, comp);
|
||||
}
|
||||
|
||||
public MergeStream(StreamExpression expression,StreamFactory factory) throws IOException {
|
||||
|
@ -64,15 +62,26 @@ public class MergeStream extends TupleStream implements Expressible {
|
|||
if(2 != streamExpressions.size()){
|
||||
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting two streams but found %d (must be PushBackStream types)",expression, streamExpressions.size()));
|
||||
}
|
||||
this.streamA = new PushBackStream(factory.constructStream(streamExpressions.get(0)));
|
||||
this.streamB = new PushBackStream(factory.constructStream(streamExpressions.get(1)));
|
||||
|
||||
|
||||
if(null == onExpression || !(onExpression.getParameter() instanceof StreamExpressionValue)){
|
||||
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'on' parameter listing fields to merge on but didn't find one",expression));
|
||||
}
|
||||
|
||||
// Merge is always done over equality, so always use an EqualTo comparator
|
||||
this.comp = factory.constructComparator(((StreamExpressionValue)onExpression.getParameter()).getValue(), StreamComparator.class);
|
||||
init( factory.constructStream(streamExpressions.get(0)),
|
||||
factory.constructStream(streamExpressions.get(1)),
|
||||
factory.constructComparator(((StreamExpressionValue)onExpression.getParameter()).getValue(), FieldComparator.class)
|
||||
);
|
||||
}
|
||||
|
||||
private void init(TupleStream streamA, TupleStream streamB, StreamComparator comp) throws IOException {
|
||||
this.streamA = new PushBackStream(streamA);
|
||||
this.streamB = new PushBackStream(streamB);
|
||||
this.comp = comp;
|
||||
|
||||
// streamA and streamB must both be sorted so that comp can be derived from
|
||||
if(!comp.isDerivedFrom(streamA.getStreamSort()) || !comp.isDerivedFrom(streamB.getStreamSort())){
|
||||
throw new IOException("Invalid MergeStream - both substream comparators (sort) must be a superset of this stream's comparator.");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -85,12 +94,7 @@ public class MergeStream extends TupleStream implements Expressible {
|
|||
expression.addParameter(streamB.toExpression(factory));
|
||||
|
||||
// on
|
||||
if(comp instanceof Expressible){
|
||||
expression.addParameter(new StreamExpressionNamedParameter("on",((Expressible)comp).toExpression(factory)));
|
||||
}
|
||||
else{
|
||||
throw new IOException("This MergeStream contains a non-expressible comparator - it cannot be converted to an expression");
|
||||
}
|
||||
expression.addParameter(new StreamExpressionNamedParameter("on",comp.toExpression(factory)));
|
||||
|
||||
return expression;
|
||||
}
|
||||
|
@ -145,6 +149,12 @@ public class MergeStream extends TupleStream implements Expressible {
|
|||
return b;
|
||||
}
|
||||
}
|
||||
|
||||
/** Return the stream sort - ie, the order in which records are returned */
|
||||
public StreamComparator getStreamSort(){
|
||||
return comp;
|
||||
}
|
||||
|
||||
|
||||
public int getCost() {
|
||||
return 0;
|
||||
|
|
|
@ -24,7 +24,6 @@ import java.net.URLEncoder;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
@ -34,6 +33,7 @@ import java.util.Map.Entry;
|
|||
import java.util.Random;
|
||||
|
||||
import org.apache.solr.client.solrj.io.Tuple;
|
||||
import org.apache.solr.client.solrj.io.comp.FieldComparator;
|
||||
import org.apache.solr.client.solrj.io.comp.StreamComparator;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
|
||||
|
@ -65,7 +65,7 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
|
|||
String collection,
|
||||
TupleStream tupleStream,
|
||||
int workers,
|
||||
Comparator<Tuple> comp) throws IOException {
|
||||
StreamComparator comp) throws IOException {
|
||||
init(zkHost,collection,tupleStream,workers,comp);
|
||||
}
|
||||
|
||||
|
@ -74,7 +74,7 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
|
|||
String collection,
|
||||
String expressionString,
|
||||
int workers,
|
||||
Comparator<Tuple> comp) throws IOException {
|
||||
StreamComparator comp) throws IOException {
|
||||
objectSerialize = false;
|
||||
TupleStream tStream = this.streamFactory.constructStream(expressionString);
|
||||
init(zkHost,collection, tStream, workers,comp);
|
||||
|
@ -140,12 +140,12 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
|
|||
|
||||
// We've got all the required items
|
||||
TupleStream stream = factory.constructStream(streamExpressions.get(0));
|
||||
Comparator<Tuple> comp = factory.constructComparator(((StreamExpressionValue)sortExpression.getParameter()).getValue(), StreamComparator.class);
|
||||
StreamComparator comp = factory.constructComparator(((StreamExpressionValue)sortExpression.getParameter()).getValue(), FieldComparator.class);
|
||||
streamFactory = factory;
|
||||
init(zkHost,collectionName,stream,workersInt,comp);
|
||||
}
|
||||
|
||||
private void init(String zkHost,String collection,TupleStream tupleStream,int workers,Comparator<Tuple> comp) throws IOException{
|
||||
private void init(String zkHost,String collection,TupleStream tupleStream,int workers,StreamComparator comp) throws IOException{
|
||||
this.zkHost = zkHost;
|
||||
this.collection = collection;
|
||||
this.workers = workers;
|
||||
|
@ -179,12 +179,7 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
|
|||
}
|
||||
|
||||
// sort
|
||||
if(comp instanceof Expressible){
|
||||
expression.addParameter(new StreamExpressionNamedParameter("sort",((Expressible)comp).toExpression(factory)));
|
||||
}
|
||||
else{
|
||||
throw new IOException("This ParallelStream contains a non-expressible comparator - it cannot be converted to an expression");
|
||||
}
|
||||
expression.addParameter(new StreamExpressionNamedParameter("sort",comp.toExpression(factory)));
|
||||
|
||||
// zkHost
|
||||
expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.util.ArrayList;
|
|||
import java.util.List;
|
||||
|
||||
import org.apache.solr.client.solrj.io.Tuple;
|
||||
import org.apache.solr.client.solrj.io.comp.StreamComparator;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
|
||||
|
@ -82,6 +83,13 @@ public class PushBackStream extends TupleStream {
|
|||
return stream.read();
|
||||
}
|
||||
}
|
||||
|
||||
/** Return the stream sort - ie, the order in which records are returned
|
||||
* This returns the streamSort of the substream */
|
||||
public StreamComparator getStreamSort(){
|
||||
return stream.getStreamSort();
|
||||
}
|
||||
|
||||
|
||||
public int getCost() {
|
||||
return 0;
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.util.Locale;
|
|||
import java.util.PriorityQueue;
|
||||
|
||||
import org.apache.solr.client.solrj.io.Tuple;
|
||||
import org.apache.solr.client.solrj.io.comp.FieldComparator;
|
||||
import org.apache.solr.client.solrj.io.comp.StreamComparator;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
|
||||
|
@ -43,14 +44,14 @@ public class RankStream extends TupleStream implements Expressible {
|
|||
|
||||
private static final long serialVersionUID = 1;
|
||||
|
||||
private TupleStream tupleStream;
|
||||
private Comparator<Tuple> comp;
|
||||
private TupleStream stream;
|
||||
private StreamComparator comp;
|
||||
private int size;
|
||||
private transient PriorityQueue<Tuple> top;
|
||||
private transient boolean finished = false;
|
||||
private transient LinkedList<Tuple> topList;
|
||||
|
||||
public RankStream(TupleStream tupleStream, int size, Comparator<Tuple> comp) {
|
||||
public RankStream(TupleStream tupleStream, int size, StreamComparator comp) throws IOException {
|
||||
init(tupleStream,size,comp);
|
||||
}
|
||||
|
||||
|
@ -87,15 +88,17 @@ public class RankStream extends TupleStream implements Expressible {
|
|||
}
|
||||
|
||||
TupleStream stream = factory.constructStream(streamExpressions.get(0));
|
||||
Comparator<Tuple> comp = factory.constructComparator(((StreamExpressionValue)sortExpression.getParameter()).getValue(), StreamComparator.class);
|
||||
StreamComparator comp = factory.constructComparator(((StreamExpressionValue)sortExpression.getParameter()).getValue(), FieldComparator.class);
|
||||
|
||||
init(stream,nInt,comp);
|
||||
}
|
||||
|
||||
private void init(TupleStream tupleStream, int size, Comparator<Tuple> comp){
|
||||
this.tupleStream = tupleStream;
|
||||
private void init(TupleStream tupleStream, int size, StreamComparator comp) throws IOException{
|
||||
this.stream = tupleStream;
|
||||
this.comp = comp;
|
||||
this.size = size;
|
||||
|
||||
// Rank stream does not demand that its order is derivable from the order of the incoming stream. No derivation check required
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -107,52 +110,47 @@ public class RankStream extends TupleStream implements Expressible {
|
|||
expression.addParameter(new StreamExpressionNamedParameter("n", Integer.toString(size)));
|
||||
|
||||
// stream
|
||||
if(tupleStream instanceof Expressible){
|
||||
expression.addParameter(((Expressible)tupleStream).toExpression(factory));
|
||||
if(stream instanceof Expressible){
|
||||
expression.addParameter(((Expressible)stream).toExpression(factory));
|
||||
}
|
||||
else{
|
||||
throw new IOException("This RankStream contains a non-expressible TupleStream - it cannot be converted to an expression");
|
||||
}
|
||||
|
||||
// sort
|
||||
if(comp instanceof Expressible){
|
||||
expression.addParameter(new StreamExpressionNamedParameter("sort",((Expressible)comp).toExpression(factory)));
|
||||
}
|
||||
else{
|
||||
throw new IOException("This RankStream contains a non-expressible comparator - it cannot be converted to an expression");
|
||||
}
|
||||
expression.addParameter(new StreamExpressionNamedParameter("sort",comp.toExpression(factory)));
|
||||
|
||||
return expression;
|
||||
}
|
||||
|
||||
public void setStreamContext(StreamContext context) {
|
||||
this.tupleStream.setStreamContext(context);
|
||||
this.stream.setStreamContext(context);
|
||||
}
|
||||
|
||||
public List<TupleStream> children() {
|
||||
List<TupleStream> l = new ArrayList();
|
||||
l.add(tupleStream);
|
||||
l.add(stream);
|
||||
return l;
|
||||
}
|
||||
|
||||
public void open() throws IOException {
|
||||
this.top = new PriorityQueue(size, new ReverseComp(comp));
|
||||
this.topList = new LinkedList();
|
||||
tupleStream.open();
|
||||
stream.open();
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
tupleStream.close();
|
||||
stream.close();
|
||||
}
|
||||
|
||||
public Comparator<Tuple> getComparator(){
|
||||
public StreamComparator getComparator(){
|
||||
return this.comp;
|
||||
}
|
||||
|
||||
public Tuple read() throws IOException {
|
||||
if(!finished) {
|
||||
while(true) {
|
||||
Tuple tuple = tupleStream.read();
|
||||
Tuple tuple = stream.read();
|
||||
if(tuple.EOF) {
|
||||
finished = true;
|
||||
int s = top.size();
|
||||
|
@ -178,6 +176,11 @@ public class RankStream extends TupleStream implements Expressible {
|
|||
|
||||
return topList.pollFirst();
|
||||
}
|
||||
|
||||
/** Return the stream sort - ie, the order in which records are returned */
|
||||
public StreamComparator getStreamSort(){
|
||||
return comp;
|
||||
}
|
||||
|
||||
public int getCost() {
|
||||
return 0;
|
||||
|
@ -185,14 +188,16 @@ public class RankStream extends TupleStream implements Expressible {
|
|||
|
||||
class ReverseComp implements Comparator<Tuple>, Serializable {
|
||||
|
||||
private Comparator<Tuple> comp;
|
||||
private StreamComparator comp;
|
||||
|
||||
public ReverseComp(Comparator<Tuple> comp) {
|
||||
public ReverseComp(StreamComparator comp) {
|
||||
this.comp = comp;
|
||||
}
|
||||
|
||||
public int compare(Tuple t1, Tuple t2) {
|
||||
return comp.compare(t1, t2)*(-1);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
|
@ -19,13 +19,13 @@ package org.apache.solr.client.solrj.io.stream;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.solr.client.solrj.io.Tuple;
|
||||
import org.apache.solr.client.solrj.io.comp.FieldComparator;
|
||||
import org.apache.solr.client.solrj.io.comp.StreamComparator;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
|
||||
|
@ -52,15 +52,13 @@ public class ReducerStream extends TupleStream implements Expressible {
|
|||
|
||||
private static final long serialVersionUID = 1;
|
||||
|
||||
private PushBackStream tupleStream;
|
||||
private Comparator<Tuple> comp;
|
||||
private PushBackStream stream;
|
||||
private StreamComparator comp;
|
||||
|
||||
private transient Tuple currentGroupHead;
|
||||
|
||||
public ReducerStream(TupleStream tupleStream,
|
||||
Comparator<Tuple> comp) {
|
||||
this.tupleStream = new PushBackStream(tupleStream);
|
||||
this.comp = comp;
|
||||
public ReducerStream(TupleStream stream,StreamComparator comp) throws IOException {
|
||||
init(stream,comp);
|
||||
}
|
||||
|
||||
public ReducerStream(StreamExpression expression, StreamFactory factory) throws IOException{
|
||||
|
@ -75,15 +73,25 @@ public class ReducerStream extends TupleStream implements Expressible {
|
|||
|
||||
if(1 != streamExpressions.size()){
|
||||
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
|
||||
}
|
||||
this.tupleStream = new PushBackStream(factory.constructStream(streamExpressions.get(0)));
|
||||
|
||||
}
|
||||
if(null == byExpression || !(byExpression.getParameter() instanceof StreamExpressionValue)){
|
||||
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'by' parameter listing fields to group by but didn't find one",expression));
|
||||
}
|
||||
|
||||
// Reducing is always done over equality, so always use an EqualTo comparator
|
||||
this.comp = factory.constructComparator(((StreamExpressionValue)byExpression.getParameter()).getValue(), StreamComparator.class);
|
||||
|
||||
init(factory.constructStream(streamExpressions.get(0)),
|
||||
factory.constructComparator(((StreamExpressionValue)byExpression.getParameter()).getValue(), FieldComparator.class)
|
||||
);
|
||||
}
|
||||
|
||||
private void init(TupleStream stream, StreamComparator comp) throws IOException{
|
||||
this.stream = new PushBackStream(stream);
|
||||
this.comp = comp;
|
||||
|
||||
if(!comp.isDerivedFrom(stream.getStreamSort())){
|
||||
throw new IOException("Invalid ReducerStream - substream comparator (sort) must be a superset of this stream's comparator.");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -92,7 +100,7 @@ public class ReducerStream extends TupleStream implements Expressible {
|
|||
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
|
||||
|
||||
// stream
|
||||
expression.addParameter(tupleStream.toExpression(factory));
|
||||
expression.addParameter(stream.toExpression(factory));
|
||||
|
||||
// over
|
||||
if(comp instanceof Expressible){
|
||||
|
@ -106,32 +114,32 @@ public class ReducerStream extends TupleStream implements Expressible {
|
|||
}
|
||||
|
||||
public void setStreamContext(StreamContext context) {
|
||||
this.tupleStream.setStreamContext(context);
|
||||
this.stream.setStreamContext(context);
|
||||
}
|
||||
|
||||
public List<TupleStream> children() {
|
||||
List<TupleStream> l = new ArrayList();
|
||||
l.add(tupleStream);
|
||||
l.add(stream);
|
||||
return l;
|
||||
}
|
||||
|
||||
public void open() throws IOException {
|
||||
tupleStream.open();
|
||||
stream.open();
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
tupleStream.close();
|
||||
stream.close();
|
||||
}
|
||||
|
||||
public Tuple read() throws IOException {
|
||||
|
||||
List<Map> maps = new ArrayList();
|
||||
while(true) {
|
||||
Tuple t = tupleStream.read();
|
||||
Tuple t = stream.read();
|
||||
|
||||
if(t.EOF) {
|
||||
if(maps.size() > 0) {
|
||||
tupleStream.pushBack(t);
|
||||
stream.pushBack(t);
|
||||
Map map1 = maps.get(0);
|
||||
Map map2 = new HashMap();
|
||||
map2.putAll(map1);
|
||||
|
@ -151,7 +159,7 @@ public class ReducerStream extends TupleStream implements Expressible {
|
|||
maps.add(t.getMap());
|
||||
} else {
|
||||
Tuple groupHead = currentGroupHead.clone();
|
||||
tupleStream.pushBack(t);
|
||||
stream.pushBack(t);
|
||||
currentGroupHead = null;
|
||||
groupHead.setMaps(maps);
|
||||
return groupHead;
|
||||
|
@ -159,6 +167,11 @@ public class ReducerStream extends TupleStream implements Expressible {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Return the stream sort - ie, the order in which records are returned */
|
||||
public StreamComparator getStreamSort(){
|
||||
return comp;
|
||||
}
|
||||
|
||||
public int getCost() {
|
||||
return 0;
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.util.ArrayList;
|
|||
|
||||
import org.apache.solr.client.solrj.io.Tuple;
|
||||
import org.apache.solr.client.solrj.io.comp.HashKey;
|
||||
import org.apache.solr.client.solrj.io.comp.StreamComparator;
|
||||
import org.apache.solr.client.solrj.io.stream.metrics.Bucket;
|
||||
import org.apache.solr.client.solrj.io.stream.metrics.Metric;
|
||||
|
||||
|
@ -66,6 +67,10 @@ public class RollupStream extends TupleStream {
|
|||
tupleStream.close();
|
||||
}
|
||||
|
||||
public StreamComparator getStreamSort(){
|
||||
return tupleStream.getStreamSort();
|
||||
}
|
||||
|
||||
public Tuple read() throws IOException {
|
||||
|
||||
while(true) {
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.util.Iterator;
|
|||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.io.SolrClientCache;
|
||||
import org.apache.solr.client.solrj.io.Tuple;
|
||||
import org.apache.solr.client.solrj.io.comp.StreamComparator;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
|
||||
|
@ -163,6 +164,11 @@ public class SolrStream extends TupleStream {
|
|||
return new Tuple(fields);
|
||||
}
|
||||
}
|
||||
|
||||
/** There is no known sort applied to a SolrStream */
|
||||
public StreamComparator getStreamSort(){
|
||||
return null;
|
||||
}
|
||||
|
||||
private Map mapFields(Map fields, Map<String,String> mappings) {
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.io.Serializable;
|
|||
import java.util.List;
|
||||
|
||||
import org.apache.solr.client.solrj.io.Tuple;
|
||||
import org.apache.solr.client.solrj.io.comp.StreamComparator;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
|
||||
|
||||
|
@ -43,7 +44,10 @@ public abstract class TupleStream implements Serializable {
|
|||
|
||||
public abstract Tuple read() throws IOException;
|
||||
|
||||
public abstract StreamComparator getStreamSort();
|
||||
|
||||
public int getCost() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
}
|
|
@ -24,8 +24,10 @@ import java.util.List;
|
|||
import java.util.Locale;
|
||||
|
||||
import org.apache.solr.client.solrj.io.Tuple;
|
||||
import org.apache.solr.client.solrj.io.comp.FieldComparator;
|
||||
import org.apache.solr.client.solrj.io.comp.StreamComparator;
|
||||
import org.apache.solr.client.solrj.io.eq.Equalitor;
|
||||
import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
|
||||
import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
|
||||
|
@ -44,13 +46,12 @@ public class UniqueStream extends TupleStream implements Expressible {
|
|||
|
||||
private static final long serialVersionUID = 1;
|
||||
|
||||
private TupleStream tupleStream;
|
||||
private TupleStream stream;
|
||||
private Equalitor<Tuple> eq;
|
||||
private transient Tuple currentTuple;
|
||||
|
||||
public UniqueStream(TupleStream tupleStream, Equalitor<Tuple> eq) {
|
||||
this.tupleStream = tupleStream;
|
||||
this.eq = eq;
|
||||
public UniqueStream(TupleStream stream, StreamEqualitor eq) throws IOException {
|
||||
init(stream,eq);
|
||||
}
|
||||
|
||||
public UniqueStream(StreamExpression expression,StreamFactory factory) throws IOException {
|
||||
|
@ -66,14 +67,21 @@ public class UniqueStream extends TupleStream implements Expressible {
|
|||
if(1 != streamExpressions.size()){
|
||||
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
|
||||
}
|
||||
this.tupleStream = factory.constructStream(streamExpressions.get(0));
|
||||
|
||||
if(null == overExpression || !(overExpression.getParameter() instanceof StreamExpressionValue)){
|
||||
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'over' parameter listing fields to unique over but didn't find one",expression));
|
||||
}
|
||||
|
||||
// Uniqueness is always done over equality, so always use an EqualTo comparator
|
||||
this.eq = factory.constructEqualitor(((StreamExpressionValue)overExpression.getParameter()).getValue(), StreamEqualitor.class);
|
||||
init(factory.constructStream(streamExpressions.get(0)), factory.constructEqualitor(((StreamExpressionValue)overExpression.getParameter()).getValue(), FieldEqualitor.class));
|
||||
}
|
||||
|
||||
private void init(TupleStream stream, StreamEqualitor eq) throws IOException{
|
||||
this.stream = stream;
|
||||
this.eq = eq;
|
||||
|
||||
if(!eq.isDerivedFrom(stream.getStreamSort())){
|
||||
throw new IOException("Invalid UniqueStream - substream comparator (sort) must be a superset of this stream's equalitor.");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -82,8 +90,8 @@ public class UniqueStream extends TupleStream implements Expressible {
|
|||
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
|
||||
|
||||
// streams
|
||||
if(tupleStream instanceof Expressible){
|
||||
expression.addParameter(((Expressible)tupleStream).toExpression(factory));
|
||||
if(stream instanceof Expressible){
|
||||
expression.addParameter(((Expressible)stream).toExpression(factory));
|
||||
}
|
||||
else{
|
||||
throw new IOException("This UniqueStream contains a non-expressible TupleStream - it cannot be converted to an expression");
|
||||
|
@ -101,25 +109,25 @@ public class UniqueStream extends TupleStream implements Expressible {
|
|||
}
|
||||
|
||||
public void setStreamContext(StreamContext context) {
|
||||
this.tupleStream.setStreamContext(context);
|
||||
this.stream.setStreamContext(context);
|
||||
}
|
||||
|
||||
public List<TupleStream> children() {
|
||||
List<TupleStream> l = new ArrayList<TupleStream>();
|
||||
l.add(tupleStream);
|
||||
l.add(stream);
|
||||
return l;
|
||||
}
|
||||
|
||||
public void open() throws IOException {
|
||||
tupleStream.open();
|
||||
stream.open();
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
tupleStream.close();
|
||||
stream.close();
|
||||
}
|
||||
|
||||
public Tuple read() throws IOException {
|
||||
Tuple tuple = tupleStream.read();
|
||||
Tuple tuple = stream.read();
|
||||
if(tuple.EOF) {
|
||||
return tuple;
|
||||
}
|
||||
|
@ -131,7 +139,7 @@ public class UniqueStream extends TupleStream implements Expressible {
|
|||
while(true) {
|
||||
if(eq.test(currentTuple, tuple)){
|
||||
//We have duplicate tuple so read the next tuple from the stream.
|
||||
tuple = tupleStream.read();
|
||||
tuple = stream.read();
|
||||
if(tuple.EOF) {
|
||||
return tuple;
|
||||
}
|
||||
|
@ -144,6 +152,11 @@ public class UniqueStream extends TupleStream implements Expressible {
|
|||
}
|
||||
}
|
||||
|
||||
/** Return the stream sort - ie, the order in which records are returned */
|
||||
public StreamComparator getStreamSort(){
|
||||
return stream.getStreamSort();
|
||||
}
|
||||
|
||||
public int getCost() {
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -5,7 +5,6 @@ import java.io.Serializable;
|
|||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
|
@ -14,9 +13,11 @@ import java.util.Map.Entry;
|
|||
|
||||
import org.apache.solr.client.solrj.io.Tuple;
|
||||
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
|
||||
import org.apache.solr.client.solrj.io.comp.MultiComp;
|
||||
import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
|
||||
import org.apache.solr.client.solrj.io.comp.StreamComparator;
|
||||
import org.apache.solr.client.solrj.io.eq.Equalitor;
|
||||
import org.apache.solr.client.solrj.io.eq.MultiEqualitor;
|
||||
import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
|
||||
import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
|
||||
import org.apache.solr.client.solrj.io.stream.TupleStream;
|
||||
|
||||
/*
|
||||
|
@ -179,14 +180,14 @@ public class StreamFactory implements Serializable {
|
|||
throw new IOException(String.format(Locale.ROOT,"Invalid stream expression %s - function '%s' is unknown (not mapped to a valid TupleStream)", expression, expression.getFunctionName()));
|
||||
}
|
||||
|
||||
public Comparator<Tuple> constructComparator(String comparatorString, Class comparatorType) throws IOException {
|
||||
public StreamComparator constructComparator(String comparatorString, Class comparatorType) throws IOException {
|
||||
if(comparatorString.contains(",")){
|
||||
String[] parts = comparatorString.split(",");
|
||||
Comparator[] comps = new Comparator[parts.length];
|
||||
StreamComparator[] comps = new StreamComparator[parts.length];
|
||||
for(int idx = 0; idx < parts.length; ++idx){
|
||||
comps[idx] = constructComparator(parts[idx].trim(), comparatorType);
|
||||
}
|
||||
return new MultiComp(comps);
|
||||
return new MultipleFieldComparator(comps);
|
||||
}
|
||||
else{
|
||||
String[] parts = comparatorString.split(" ");
|
||||
|
@ -197,18 +198,18 @@ public class StreamFactory implements Serializable {
|
|||
String fieldName = parts[0].trim();
|
||||
String order = parts[1].trim();
|
||||
|
||||
return (Comparator)createInstance(comparatorType, new Class[]{ String.class, ComparatorOrder.class }, new Object[]{ fieldName, ComparatorOrder.fromString(order) });
|
||||
return (StreamComparator)createInstance(comparatorType, new Class[]{ String.class, ComparatorOrder.class }, new Object[]{ fieldName, ComparatorOrder.fromString(order) });
|
||||
}
|
||||
}
|
||||
|
||||
public Equalitor<Tuple> constructEqualitor(String equalitorString, Class equalitorType) throws IOException {
|
||||
public StreamEqualitor constructEqualitor(String equalitorString, Class equalitorType) throws IOException {
|
||||
if(equalitorString.contains(",")){
|
||||
String[] parts = equalitorString.split(",");
|
||||
Equalitor[] eqs = new Equalitor[parts.length];
|
||||
StreamEqualitor[] eqs = new StreamEqualitor[parts.length];
|
||||
for(int idx = 0; idx < parts.length; ++idx){
|
||||
eqs[idx] = constructEqualitor(parts[idx].trim(), equalitorType);
|
||||
}
|
||||
return new MultiEqualitor(eqs);
|
||||
return new MultipleFieldEqualitor(eqs);
|
||||
}
|
||||
else{
|
||||
String leftFieldName;
|
||||
|
@ -227,7 +228,7 @@ public class StreamFactory implements Serializable {
|
|||
leftFieldName = rightFieldName = equalitorString.trim();
|
||||
}
|
||||
|
||||
return (Equalitor)createInstance(equalitorType, new Class[]{ String.class, String.class }, new Object[]{ leftFieldName, rightFieldName });
|
||||
return (StreamEqualitor)createInstance(equalitorType, new Class[]{ String.class, String.class }, new Object[]{ leftFieldName, rightFieldName });
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.List;
|
|||
import java.util.Locale;
|
||||
|
||||
import org.apache.solr.client.solrj.io.Tuple;
|
||||
import org.apache.solr.client.solrj.io.comp.StreamComparator;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
|
||||
|
@ -96,4 +97,9 @@ public class CountStream extends TupleStream implements Expressible, Serializabl
|
|||
return t;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamComparator getStreamSort() {
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -601,7 +601,12 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
|
|||
.withStreamFunction("group", ReducerStream.class)
|
||||
.withStreamFunction("parallel", ParallelStream.class);
|
||||
|
||||
ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(collection1, top(search(collection1, q=\"*:*\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), n=\"11\", sort=\"a_i desc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_i desc\")");
|
||||
ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel("
|
||||
+ "collection1, "
|
||||
+ "top("
|
||||
+ "search(collection1, q=\"*:*\", fl=\"id,a_s,a_i\", sort=\"a_i asc\", partitionKeys=\"a_i\"), "
|
||||
+ "n=\"11\", "
|
||||
+ "sort=\"a_i desc\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_i desc\")");
|
||||
|
||||
List<Tuple> tuples = getTuples(pstream);
|
||||
|
||||
|
|
|
@ -78,10 +78,10 @@ public class StreamExpressionToExpessionTest extends LuceneTestCase {
|
|||
String expressionString;
|
||||
|
||||
// Basic test
|
||||
stream = new UniqueStream(StreamExpressionParser.parse("unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f asc\")"), factory);
|
||||
stream = new UniqueStream(StreamExpressionParser.parse("unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f\")"), factory);
|
||||
expressionString = stream.toExpression(factory).toString();
|
||||
assertTrue(expressionString.contains("unique(search(collection1"));
|
||||
assertTrue(expressionString.contains("over=\"a_f asc\""));
|
||||
assertTrue(expressionString.contains("over=a_f"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -29,9 +29,9 @@ import org.apache.lucene.util.LuceneTestCase;
|
|||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
import org.apache.solr.client.solrj.io.Tuple;
|
||||
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
|
||||
import org.apache.solr.client.solrj.io.comp.MultiComp;
|
||||
import org.apache.solr.client.solrj.io.comp.StreamComparator;
|
||||
import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
|
||||
import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
|
||||
import org.apache.solr.client.solrj.io.comp.FieldComparator;
|
||||
import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
|
||||
import org.apache.solr.client.solrj.io.stream.metrics.Bucket;
|
||||
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
|
||||
|
@ -139,11 +139,23 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
|
|||
|
||||
Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
|
||||
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
|
||||
UniqueStream ustream = new UniqueStream(stream, new StreamEqualitor("a_f"));
|
||||
UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
|
||||
List<Tuple> tuples = getTuples(ustream);
|
||||
assert(tuples.size() == 4);
|
||||
assertOrder(tuples, 0,1,3,4);
|
||||
|
||||
|
||||
try {
|
||||
params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
|
||||
stream = new CloudSolrStream(zkHost, "collection1", params);
|
||||
ustream = new UniqueStream(stream, new FieldEqualitor("a_i"));
|
||||
throw new Exception("Equalitors did not match but no excepion was thrown");
|
||||
} catch(Exception e) {
|
||||
if(!e.getMessage().equals("Invalid UniqueStream - substream comparator (sort) must be a superset of this stream's equalitor.")) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
del("*:*");
|
||||
commit();
|
||||
|
||||
|
@ -188,7 +200,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
|
|||
|
||||
Map paramsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "none");
|
||||
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
|
||||
ParallelStream pstream = new ParallelStream(zkHost, "collection1", stream, 2, new StreamComparator("a_s",ComparatorOrder.ASCENDING));
|
||||
ParallelStream pstream = new ParallelStream(zkHost, "collection1", stream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
|
||||
|
||||
List<Tuple> tuples = getTuples(pstream);
|
||||
|
||||
|
@ -221,8 +233,8 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
|
|||
|
||||
Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc", "partitionKeys", "a_f");
|
||||
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
|
||||
UniqueStream ustream = new UniqueStream(stream, new StreamEqualitor("a_f"));
|
||||
ParallelStream pstream = new ParallelStream(zkHost, "collection1", ustream, 2, new StreamComparator("a_f",ComparatorOrder.ASCENDING));
|
||||
UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
|
||||
ParallelStream pstream = new ParallelStream(zkHost, "collection1", ustream, 2, new FieldComparator("a_f",ComparatorOrder.ASCENDING));
|
||||
List<Tuple> tuples = getTuples(pstream);
|
||||
assert(tuples.size() == 5);
|
||||
assertOrder(tuples, 0,1,3,4,6);
|
||||
|
@ -255,7 +267,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
|
|||
|
||||
Map params = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
|
||||
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
|
||||
RankStream rstream = new RankStream(stream, 3, new StreamComparator("a_i",ComparatorOrder.DESCENDING));
|
||||
RankStream rstream = new RankStream(stream, 3, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
|
||||
List<Tuple> tuples = getTuples(rstream);
|
||||
|
||||
|
||||
|
@ -287,8 +299,8 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
|
|||
|
||||
Map params = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
|
||||
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
|
||||
RankStream rstream = new RankStream(stream, 11, new StreamComparator("a_i",ComparatorOrder.DESCENDING));
|
||||
ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new StreamComparator("a_i",ComparatorOrder.DESCENDING));
|
||||
RankStream rstream = new RankStream(stream, 11, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
|
||||
ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
|
||||
List<Tuple> tuples = getTuples(pstream);
|
||||
|
||||
assert(tuples.size() == 10);
|
||||
|
@ -354,7 +366,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
|
|||
//Test with spaces in the parameter lists.
|
||||
Map paramsA = mapParams("q","*:*","fl","id,a_s, a_i, a_f","sort", "a_s asc , a_f asc");
|
||||
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
|
||||
ReducerStream rstream = new ReducerStream(stream, new StreamComparator("a_s",ComparatorOrder.ASCENDING));
|
||||
ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
|
||||
|
||||
List<Tuple> tuples = getTuples(rstream);
|
||||
|
||||
|
@ -373,6 +385,18 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
|
|||
List<Map> maps2 = t2.getMaps();
|
||||
assertMaps(maps2, 4, 6);
|
||||
|
||||
try {
|
||||
|
||||
paramsA = mapParams("q","*:*","fl","id,a_s, a_i, a_f","sort", "a_i asc , a_f asc");
|
||||
stream = new CloudSolrStream(zkHost, "collection1", paramsA);
|
||||
rstream = new ReducerStream(stream, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
|
||||
throw new Exception("Sorts did not match up and Exception was not not thrown.");
|
||||
} catch (Exception e) {
|
||||
if(!e.getMessage().equals("Invalid ReducerStream - substream comparator (sort) must be a superset of this stream's comparator.")) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
del("*:*");
|
||||
|
@ -401,7 +425,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
|
|||
//Test with spaces in the parameter lists.
|
||||
Map paramsA = mapParams("q", "blah", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc");
|
||||
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
|
||||
ReducerStream rstream = new ReducerStream(stream, new StreamComparator("a_s", ComparatorOrder.ASCENDING));
|
||||
ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
|
||||
|
||||
List<Tuple> tuples = getTuples(rstream);
|
||||
|
||||
|
@ -432,8 +456,8 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
|
|||
|
||||
Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
|
||||
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
|
||||
ReducerStream rstream = new ReducerStream(stream, new StreamComparator("a_s",ComparatorOrder.ASCENDING));
|
||||
ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new StreamComparator("a_s",ComparatorOrder.ASCENDING));
|
||||
ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
|
||||
ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
|
||||
|
||||
List<Tuple> tuples = getTuples(pstream);
|
||||
|
||||
|
@ -456,8 +480,8 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
|
|||
|
||||
paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s desc,a_f asc", "partitionKeys", "a_s");
|
||||
stream = new CloudSolrStream(zkHost, "collection1", paramsA);
|
||||
rstream = new ReducerStream(stream, new StreamComparator("a_s",ComparatorOrder.DESCENDING));
|
||||
pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new StreamComparator("a_s",ComparatorOrder.DESCENDING));
|
||||
rstream = new ReducerStream(stream, new FieldComparator("a_s",ComparatorOrder.DESCENDING));
|
||||
pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.DESCENDING));
|
||||
|
||||
tuples = getTuples(pstream);
|
||||
|
||||
|
@ -639,7 +663,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
|
|||
new CountMetric()};
|
||||
|
||||
RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
|
||||
ParallelStream parallelStream = new ParallelStream(zkHost, "collection1", rollupStream, 2, new StreamComparator("a_s", ComparatorOrder.ASCENDING));
|
||||
ParallelStream parallelStream = new ParallelStream(zkHost, "collection1", rollupStream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
|
||||
List<Tuple> tuples = getTuples(parallelStream);
|
||||
|
||||
assert(tuples.size() == 3);
|
||||
|
@ -739,8 +763,8 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
|
|||
|
||||
Map paramsA = mapParams("q","blah","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
|
||||
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
|
||||
ReducerStream rstream = new ReducerStream(stream, new StreamComparator("a_s", ComparatorOrder.ASCENDING));
|
||||
ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new StreamComparator("a_s", ComparatorOrder.ASCENDING));
|
||||
ReducerStream rstream = new ReducerStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
|
||||
ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
|
||||
|
||||
List<Tuple> tuples = getTuples(pstream);
|
||||
assert(tuples.size() == 0);
|
||||
|
@ -809,7 +833,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
|
|||
Map paramsB = mapParams("q","id:(0 2 3)","fl","id,a_s,a_i","sort", "a_i asc");
|
||||
CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
|
||||
|
||||
MergeStream mstream = new MergeStream(streamA, streamB, new StreamComparator("a_i",ComparatorOrder.ASCENDING));
|
||||
MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
|
||||
List<Tuple> tuples = getTuples(mstream);
|
||||
|
||||
assert(tuples.size() == 5);
|
||||
|
@ -822,7 +846,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
|
|||
paramsB = mapParams("q","id:(0 2 3)","fl","id,a_s,a_i","sort", "a_i desc");
|
||||
streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
|
||||
|
||||
mstream = new MergeStream(streamA, streamB, new StreamComparator("a_i",ComparatorOrder.DESCENDING));
|
||||
mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
|
||||
tuples = getTuples(mstream);
|
||||
|
||||
assert(tuples.size() == 5);
|
||||
|
@ -836,7 +860,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
|
|||
paramsB = mapParams("q","id:(0 3)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
|
||||
streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
|
||||
|
||||
mstream = new MergeStream(streamA, streamB, new MultiComp(new StreamComparator("a_f",ComparatorOrder.ASCENDING),new StreamComparator("a_i",ComparatorOrder.ASCENDING)));
|
||||
mstream = new MergeStream(streamA, streamB, new MultipleFieldComparator(new FieldComparator("a_f",ComparatorOrder.ASCENDING),new FieldComparator("a_i",ComparatorOrder.ASCENDING)));
|
||||
tuples = getTuples(mstream);
|
||||
|
||||
assert(tuples.size() == 5);
|
||||
|
@ -848,12 +872,41 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
|
|||
paramsB = mapParams("q","id:(0 3)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i desc");
|
||||
streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
|
||||
|
||||
mstream = new MergeStream(streamA, streamB, new MultiComp(new StreamComparator("a_f",ComparatorOrder.ASCENDING),new StreamComparator("a_i",ComparatorOrder.DESCENDING)));
|
||||
mstream = new MergeStream(streamA, streamB, new MultipleFieldComparator(new FieldComparator("a_f",ComparatorOrder.ASCENDING),new FieldComparator("a_i",ComparatorOrder.DESCENDING)));
|
||||
tuples = getTuples(mstream);
|
||||
|
||||
assert(tuples.size() == 5);
|
||||
assertOrder(tuples, 2,0,1,3,4);
|
||||
|
||||
try {
|
||||
paramsA = mapParams("q","id:(2 4 1)","fl","id,a_s,a_i,a_f","sort", "a_f desc,a_i desc");
|
||||
streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
|
||||
|
||||
paramsB = mapParams("q","id:(0 3)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i desc");
|
||||
streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
|
||||
mstream = new MergeStream(streamA, streamB, new MultipleFieldComparator(new FieldComparator("a_f",ComparatorOrder.ASCENDING),new FieldComparator("a_i",ComparatorOrder.DESCENDING)));
|
||||
throw new Exception("Sorts did not match up and Exception was not not thrown.");
|
||||
} catch(Exception e) {
|
||||
if(!e.getMessage().equals("Invalid MergeStream - both substream comparators (sort) must be a superset of this stream's comparator.")) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
paramsA = mapParams("q","id:(2 4 1)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i desc");
|
||||
streamA = new CloudSolrStream(zkHost, "collection1", paramsA);
|
||||
|
||||
paramsB = mapParams("q","id:(0 3)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
|
||||
streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
|
||||
mstream = new MergeStream(streamA, streamB, new MultipleFieldComparator(new FieldComparator("a_f",ComparatorOrder.ASCENDING),new FieldComparator("a_i",ComparatorOrder.DESCENDING)));
|
||||
throw new Exception("Sorts did not match up and Exception was not not thrown.");
|
||||
} catch(Exception e) {
|
||||
if(!e.getMessage().equals("Invalid MergeStream - both substream comparators (sort) must be a superset of this stream's comparator.")) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
del("*:*");
|
||||
commit();
|
||||
}
|
||||
|
@ -884,8 +937,8 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
|
|||
Map paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
|
||||
CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
|
||||
|
||||
MergeStream mstream = new MergeStream(streamA, streamB, new StreamComparator("a_i",ComparatorOrder.ASCENDING));
|
||||
ParallelStream pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new StreamComparator("a_i",ComparatorOrder.ASCENDING));
|
||||
MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
|
||||
ParallelStream pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
|
||||
List<Tuple> tuples = getTuples(pstream);
|
||||
|
||||
assert(tuples.size() == 9);
|
||||
|
@ -898,8 +951,8 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
|
|||
paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i desc", "partitionKeys", "a_i");
|
||||
streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
|
||||
|
||||
mstream = new MergeStream(streamA, streamB, new StreamComparator("a_i",ComparatorOrder.DESCENDING));
|
||||
pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new StreamComparator("a_i",ComparatorOrder.DESCENDING));
|
||||
mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
|
||||
pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
|
||||
tuples = getTuples(pstream);
|
||||
|
||||
assert(tuples.size() == 8);
|
||||
|
@ -934,9 +987,9 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
|
|||
Map paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
|
||||
CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
|
||||
|
||||
MergeStream mstream = new MergeStream(streamA, streamB, new StreamComparator("a_i",ComparatorOrder.ASCENDING));
|
||||
MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
|
||||
CountStream cstream = new CountStream(mstream);
|
||||
ParallelStream pstream = new ParallelStream(zkHost, "collection1", cstream, 2, new StreamComparator("a_i",ComparatorOrder.ASCENDING));
|
||||
ParallelStream pstream = new ParallelStream(zkHost, "collection1", cstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
|
||||
List<Tuple> tuples = getTuples(pstream);
|
||||
|
||||
assert(tuples.size() == 9);
|
||||
|
|
Loading…
Reference in New Issue