This commit is contained in:
Karl Wright 2016-04-15 03:01:34 -04:00
commit a0bc1e6427
4 changed files with 231 additions and 9 deletions

View File

@ -93,6 +93,9 @@ New Features
* SOLR-8976: Add SolrJ support for REBALANCELEADERS Collections API (Anshum Gupta) * SOLR-8976: Add SolrJ support for REBALANCELEADERS Collections API (Anshum Gupta)
* SOLR-8962: Add sort Streaming Expression. The expression takes a single input stream and a
comparator and outputs tuples in stable order of the comparator. (Dennis Gove)
Bug Fixes Bug Fixes
---------------------- ----------------------

View File

@ -95,8 +95,14 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
} }
streamFactory streamFactory
// streams // source streams
.withFunctionName("search", CloudSolrStream.class) .withFunctionName("search", CloudSolrStream.class)
.withFunctionName("facet", FacetStream.class)
.withFunctionName("update", UpdateStream.class)
.withFunctionName("jdbc", JDBCStream.class)
.withFunctionName("topic", TopicStream.class)
// decorator streams
.withFunctionName("merge", MergeStream.class) .withFunctionName("merge", MergeStream.class)
.withFunctionName("unique", UniqueStream.class) .withFunctionName("unique", UniqueStream.class)
.withFunctionName("top", RankStream.class) .withFunctionName("top", RankStream.class)
@ -109,17 +115,15 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
.withFunctionName("leftOuterJoin", LeftOuterJoinStream.class) .withFunctionName("leftOuterJoin", LeftOuterJoinStream.class)
.withFunctionName("hashJoin", HashJoinStream.class) .withFunctionName("hashJoin", HashJoinStream.class)
.withFunctionName("outerHashJoin", OuterHashJoinStream.class) .withFunctionName("outerHashJoin", OuterHashJoinStream.class)
.withFunctionName("facet", FacetStream.class)
.withFunctionName("update", UpdateStream.class)
.withFunctionName("jdbc", JDBCStream.class)
.withFunctionName("intersect", IntersectStream.class) .withFunctionName("intersect", IntersectStream.class)
.withFunctionName("complement", ComplementStream.class) .withFunctionName("complement", ComplementStream.class)
.withFunctionName("daemon", DaemonStream.class) .withFunctionName("daemon", DaemonStream.class)
.withFunctionName("topic", TopicStream.class) .withFunctionName("sort", SortStream.class)
.withFunctionName("shortestPath", ShortestPathStream.class)
// graph streams
.withFunctionName("shortestPath", ShortestPathStream.class)
// metrics
// metrics
.withFunctionName("min", MinMetric.class) .withFunctionName("min", MinMetric.class)
.withFunctionName("max", MaxMetric.class) .withFunctionName("max", MaxMetric.class)
.withFunctionName("avg", MeanMetric.class) .withFunctionName("avg", MeanMetric.class)

View File

@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
/**
* The SortStream emits a stream of Tuples sorted by a Comparator.
**/
public class SortStream extends TupleStream implements Expressible {
private static final long serialVersionUID = 1;
private TupleStream stream;
private StreamComparator comparator;
private Worker worker;
public SortStream(TupleStream stream, StreamComparator comp) throws IOException {
init(stream,comp);
}
public SortStream(StreamExpression expression,StreamFactory factory) throws IOException {
// grab all parameters out
List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
StreamExpressionNamedParameter byExpression = factory.getNamedOperand(expression, "by");
// validate expression contains only what we want.
if(expression.getParameters().size() != streamExpressions.size() + 1){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
}
if(1 != streamExpressions.size()){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
}
if(null == byExpression || !(byExpression.getParameter() instanceof StreamExpressionValue)){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'by' parameter listing fields to sort over but didn't find one",expression));
}
init(
factory.constructStream(streamExpressions.get(0)),
factory.constructComparator(((StreamExpressionValue)byExpression.getParameter()).getValue(), FieldComparator.class)
);
}
private void init(TupleStream stream, StreamComparator comp) throws IOException{
this.stream = stream;
this.comparator = comp;
// standard java modified merge sort
worker = new Worker() {
private LinkedList<Tuple> tuples = new LinkedList<Tuple>();
private Tuple eofTuple;
public void readStream(TupleStream stream) throws IOException {
Tuple tuple = stream.read();
while(!tuple.EOF){
tuples.add(tuple);
tuple = stream.read();
}
eofTuple = tuple;
}
public void sort() {
tuples.sort(comparator);
}
public Tuple read() {
if(tuples.isEmpty()){
return eofTuple;
}
return tuples.removeFirst();
}
};
}
@Override
public StreamExpression toExpression(StreamFactory factory) throws IOException {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// streams
if(stream instanceof Expressible){
expression.addParameter(((Expressible)stream).toExpression(factory));
}
else{
throw new IOException("This SortStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
// by
if(comparator instanceof Expressible){
expression.addParameter(new StreamExpressionNamedParameter("by",((Expressible)comparator).toExpression(factory)));
}
else{
throw new IOException("This SortStream contains a non-expressible equalitor - it cannot be converted to an expression");
}
return expression;
}
public void setStreamContext(StreamContext context) {
this.stream.setStreamContext(context);
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList<TupleStream>();
l.add(stream);
return l;
}
public void open() throws IOException {
stream.open();
worker.readStream(stream);
worker.sort();
}
public void close() throws IOException {
stream.close();
}
public Tuple read() throws IOException {
// return next from sorted order
return worker.read();
}
/** Return the stream sort - ie, the order in which records are returned */
public StreamComparator getStreamSort(){
return comparator;
}
public int getCost() {
return 0;
}
private interface Worker {
public void readStream(TupleStream stream) throws IOException;
public void sort();
public Tuple read();
}
}

View File

@ -134,6 +134,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
testRankStream(); testRankStream();
testReducerStream(); testReducerStream();
testUniqueStream(); testUniqueStream();
testSortStream();
testRollupStream(); testRollupStream();
testStatsStream(); testStatsStream();
testNulls(); testNulls();
@ -306,6 +307,47 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
commit(); commit();
} }
private void testSortStream() throws Exception {
indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
indexr(id, "5", "a_s", "hello1", "a_i", "1", "a_f", "2");
commit();
StreamExpression expression;
TupleStream stream;
List<Tuple> tuples;
StreamFactory factory = new StreamFactory()
.withCollectionZkHost("collection1", zkServer.getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("sort", SortStream.class);
// Basic test
stream = factory.constructStream("sort(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc\")");
tuples = getTuples(stream);
assert(tuples.size() == 6);
assertOrder(tuples, 0,1,5,2,3,4);
// Basic test desc
stream = factory.constructStream("sort(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i desc\")");
tuples = getTuples(stream);
assert(tuples.size() == 6);
assertOrder(tuples, 4,3,2,1,5,0);
// Basic w/multi comp
stream = factory.constructStream("sort(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc, a_f desc\")");
tuples = getTuples(stream);
assert(tuples.size() == 6);
assertOrder(tuples, 0,5,1,2,3,4);
del("*:*");
commit();
}
private void testNulls() throws Exception { private void testNulls() throws Exception {