diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index fd71c1c026c..60647b2bc3e 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -64,6 +64,9 @@ New Features * SOLR-8976: Add SolrJ support for REBALANCELEADERS Collections API (Anshum Gupta) +* SOLR-8962: Add sort Streaming Expression. The expression takes a single input stream and a + comparator and outputs tuples in stable order of the comparator. (Dennis Gove) + Bug Fixes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java index 226058ee0c0..5ddd3129313 100644 --- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java @@ -95,8 +95,14 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, } streamFactory - // streams + // source streams .withFunctionName("search", CloudSolrStream.class) + .withFunctionName("facet", FacetStream.class) + .withFunctionName("update", UpdateStream.class) + .withFunctionName("jdbc", JDBCStream.class) + .withFunctionName("topic", TopicStream.class) + + // decorator streams .withFunctionName("merge", MergeStream.class) .withFunctionName("unique", UniqueStream.class) .withFunctionName("top", RankStream.class) @@ -109,17 +115,15 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, .withFunctionName("leftOuterJoin", LeftOuterJoinStream.class) .withFunctionName("hashJoin", HashJoinStream.class) .withFunctionName("outerHashJoin", OuterHashJoinStream.class) - .withFunctionName("facet", FacetStream.class) - .withFunctionName("update", UpdateStream.class) - .withFunctionName("jdbc", JDBCStream.class) .withFunctionName("intersect", IntersectStream.class) .withFunctionName("complement", ComplementStream.class) - .withFunctionName("daemon", DaemonStream.class) - .withFunctionName("topic", TopicStream.class) - .withFunctionName("shortestPath", ShortestPathStream.class) + .withFunctionName("daemon", DaemonStream.class) + .withFunctionName("sort", SortStream.class) + + // graph streams + .withFunctionName("shortestPath", ShortestPathStream.class) - - // metrics + // metrics .withFunctionName("min", MinMetric.class) .withFunctionName("max", MaxMetric.class) .withFunctionName("avg", MeanMetric.class) diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SortStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SortStream.java new file mode 100644 index 00000000000..d9a8526059f --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SortStream.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.client.solrj.io.stream; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; + +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.FieldComparator; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; + + +/** + * The SortStream emits a stream of Tuples sorted by a Comparator. + **/ + +public class SortStream extends TupleStream implements Expressible { + + private static final long serialVersionUID = 1; + + private TupleStream stream; + private StreamComparator comparator; + private Worker worker; + + public SortStream(TupleStream stream, StreamComparator comp) throws IOException { + init(stream,comp); + } + + public SortStream(StreamExpression expression,StreamFactory factory) throws IOException { + // grab all parameters out + List streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class); + StreamExpressionNamedParameter byExpression = factory.getNamedOperand(expression, "by"); + + // validate expression contains only what we want. + if(expression.getParameters().size() != streamExpressions.size() + 1){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression)); + } + + if(1 != streamExpressions.size()){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size())); + } + + if(null == byExpression || !(byExpression.getParameter() instanceof StreamExpressionValue)){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'by' parameter listing fields to sort over but didn't find one",expression)); + } + + init( + factory.constructStream(streamExpressions.get(0)), + factory.constructComparator(((StreamExpressionValue)byExpression.getParameter()).getValue(), FieldComparator.class) + ); + } + + private void init(TupleStream stream, StreamComparator comp) throws IOException{ + this.stream = stream; + this.comparator = comp; + + // standard java modified merge sort + worker = new Worker() { + + private LinkedList tuples = new LinkedList(); + private Tuple eofTuple; + + public void readStream(TupleStream stream) throws IOException { + Tuple tuple = stream.read(); + while(!tuple.EOF){ + tuples.add(tuple); + tuple = stream.read(); + } + eofTuple = tuple; + } + + public void sort() { + tuples.sort(comparator); + } + + public Tuple read() { + if(tuples.isEmpty()){ + return eofTuple; + } + return tuples.removeFirst(); + } + }; + + } + + @Override + public StreamExpression toExpression(StreamFactory factory) throws IOException { + // function name + StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); + + // streams + if(stream instanceof Expressible){ + expression.addParameter(((Expressible)stream).toExpression(factory)); + } + else{ + throw new IOException("This SortStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } + + // by + if(comparator instanceof Expressible){ + expression.addParameter(new StreamExpressionNamedParameter("by",((Expressible)comparator).toExpression(factory))); + } + else{ + throw new IOException("This SortStream contains a non-expressible equalitor - it cannot be converted to an expression"); + } + + return expression; + } + + public void setStreamContext(StreamContext context) { + this.stream.setStreamContext(context); + } + + public List children() { + List l = new ArrayList(); + l.add(stream); + return l; + } + + public void open() throws IOException { + stream.open(); + + worker.readStream(stream); + worker.sort(); + } + + public void close() throws IOException { + stream.close(); + } + + public Tuple read() throws IOException { + // return next from sorted order + return worker.read(); + } + + /** Return the stream sort - ie, the order in which records are returned */ + public StreamComparator getStreamSort(){ + return comparator; + } + + public int getCost() { + return 0; + } + + private interface Worker { + public void readStream(TupleStream stream) throws IOException; + public void sort(); + public Tuple read(); + } + +} \ No newline at end of file diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java index e7f57c102b6..9ae6761ea29 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java @@ -134,6 +134,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { testRankStream(); testReducerStream(); testUniqueStream(); + testSortStream(); testRollupStream(); testStatsStream(); testNulls(); @@ -306,6 +307,47 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { commit(); } + private void testSortStream() throws Exception { + + indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0"); + indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0"); + indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3"); + indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4"); + indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1"); + indexr(id, "5", "a_s", "hello1", "a_i", "1", "a_f", "2"); + commit(); + + StreamExpression expression; + TupleStream stream; + List tuples; + + StreamFactory factory = new StreamFactory() + .withCollectionZkHost("collection1", zkServer.getZkAddress()) + .withFunctionName("search", CloudSolrStream.class) + .withFunctionName("sort", SortStream.class); + + // Basic test + stream = factory.constructStream("sort(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc\")"); + tuples = getTuples(stream); + assert(tuples.size() == 6); + assertOrder(tuples, 0,1,5,2,3,4); + + // Basic test desc + stream = factory.constructStream("sort(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i desc\")"); + tuples = getTuples(stream); + assert(tuples.size() == 6); + assertOrder(tuples, 4,3,2,1,5,0); + + // Basic w/multi comp + stream = factory.constructStream("sort(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), by=\"a_i asc, a_f desc\")"); + tuples = getTuples(stream); + assert(tuples.size() == 6); + assertOrder(tuples, 0,5,1,2,3,4); + + del("*:*"); + commit(); + } + private void testNulls() throws Exception {