From 8309dc9b3231979846558165401f1c4d55515d0a Mon Sep 17 00:00:00 2001 From: Dennis Gove Date: Sat, 7 Nov 2015 22:08:56 +0000 Subject: [PATCH] SOLR-7938: MergeStream now supports merging more than 2 streams together git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1713190 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 1 + .../client/solrj/io/stream/MergeStream.java | 167 ++++++++++++------ .../solrj/io/stream/StreamExpressionTest.java | 11 ++ 3 files changed, 125 insertions(+), 54 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 5399031f36c..dacafac8887 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -83,6 +83,7 @@ New Features * SOLR-6273: Cross Data Center Replication. Active/passive replication for separate SolrClouds hosted on separate data centers. (Renaud Delbru, Yonik Seeley via Erick Erickson) +* SOLR-7938: MergeStream now supports merging more than 2 streams together (Dennis Gove) Optimizations ---------------------- diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java index 5814c0cc855..ee776483e52 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java @@ -32,21 +32,22 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; /** -* Unions streamA with streamB ordering the Tuples based on a Comparator. -* Both streams must be sorted by the fields being compared. +* Merges two or more streams together ordering the Tuples based on a Comparator. +* All streams must be sorted by the fields being compared - this will be validated on construction. **/ - - public class MergeStream extends TupleStream implements Expressible { private static final long serialVersionUID = 1; - private PushBackStream streamA; - private PushBackStream streamB; + private PushBackStream[] streams; private StreamComparator comp; public MergeStream(TupleStream streamA, TupleStream streamB, StreamComparator comp) throws IOException { - init(streamA, streamB, comp); + init(comp, streamA, streamB); + } + + public MergeStream(StreamComparator comp, TupleStream ... streams) throws IOException { + init(comp, streams); } public MergeStream(StreamExpression expression,StreamFactory factory) throws IOException { @@ -59,29 +60,39 @@ public class MergeStream extends TupleStream implements Expressible { throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression)); } - if(2 != streamExpressions.size()){ - throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting two streams but found %d (must be PushBackStream types)",expression, streamExpressions.size())); + if(streamExpressions.size() < 2){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting at least two streams but found %d (must be PushBackStream types)",expression, streamExpressions.size())); } if(null == onExpression || !(onExpression.getParameter() instanceof StreamExpressionValue)){ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'on' parameter listing fields to merge on but didn't find one",expression)); } - init( factory.constructStream(streamExpressions.get(0)), - factory.constructStream(streamExpressions.get(1)), - factory.constructComparator(((StreamExpressionValue)onExpression.getParameter()).getValue(), FieldComparator.class) + TupleStream[] streams = new TupleStream[streamExpressions.size()]; + for(int idx = 0; idx < streamExpressions.size(); ++idx){ + streams[idx] = factory.constructStream(streamExpressions.get(idx)); + } + + init( factory.constructComparator(((StreamExpressionValue)onExpression.getParameter()).getValue(), FieldComparator.class), + streams ); } - private void init(TupleStream streamA, TupleStream streamB, StreamComparator comp) throws IOException { - this.streamA = new PushBackStream(streamA); - this.streamB = new PushBackStream(streamB); - this.comp = comp; - - // streamA and streamB must both be sorted so that comp can be derived from - if(!comp.isDerivedFrom(streamA.getStreamSort()) || !comp.isDerivedFrom(streamB.getStreamSort())){ - throw new IOException("Invalid MergeStream - both substream comparators (sort) must be a superset of this stream's comparator."); + private void init(StreamComparator comp, TupleStream ... streams) throws IOException { + + // All streams must both be sorted so that comp can be derived from + for(TupleStream stream : streams){ + if(!comp.isDerivedFrom(stream.getStreamSort())){ + throw new IOException("Invalid MergeStream - all substream comparators (sort) must be a superset of this stream's comparator."); + } } + + // Convert to PushBack streams so we can push back tuples + this.streams = new PushBackStream[streams.length]; + for(int idx = 0; idx < streams.length; ++idx){ + this.streams[idx] = new PushBackStream(streams[idx]); + } + this.comp = comp; } @Override @@ -90,8 +101,9 @@ public class MergeStream extends TupleStream implements Expressible { StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); // streams - expression.addParameter(streamA.toExpression(factory)); - expression.addParameter(streamB.toExpression(factory)); + for(PushBackStream stream : streams){ + expression.addParameter(stream.toExpression(factory)); + } // on expression.addParameter(new StreamExpressionNamedParameter("on",comp.toExpression(factory))); @@ -100,54 +112,101 @@ public class MergeStream extends TupleStream implements Expressible { } public void setStreamContext(StreamContext context) { - this.streamA.setStreamContext(context); - this.streamB.setStreamContext(context); + for(PushBackStream stream : streams){ + stream.setStreamContext(context); + } } public List children() { List l = new ArrayList(); - l.add(streamA); - l.add(streamB); + for(PushBackStream stream : streams){ + l.add(stream); + } return l; } public void open() throws IOException { - streamA.open(); - streamB.open(); + for(PushBackStream stream : streams){ + stream.open(); + } } public void close() throws IOException { - streamA.close(); - streamB.close(); + for(PushBackStream stream : streams){ + stream.close(); + } } public Tuple read() throws IOException { - Tuple a = streamA.read(); - Tuple b = streamB.read(); - - if(a.EOF && b.EOF) { - return a; + + // might be able to optimize this by sorting the streams based on the next to read tuple from each. + // if we can ensure the sort of the streams and update it in less than linear time then there would + // be some performance gain. But, assuming the # of streams is kinda small then this might not be + // worth it + + Tuple minimum = null; + PushBackStream minimumStream = null; + for(PushBackStream stream : streams){ + Tuple current = stream.read(); + + if(current.EOF){ + stream.pushBack(current); + continue; + } + + if(null == minimum){ + minimum = current; + minimumStream = stream; + continue; + } + + if(comp.compare(current, minimum) < 0){ + // Push back on its stream + minimumStream.pushBack(minimum); + + minimum = current; + minimumStream = stream; + continue; + } + else{ + stream.pushBack(current); + } } - - if(a.EOF) { - streamA.pushBack(a); - return b; - } - - if(b.EOF) { - streamB.pushBack(b); - return a; - } - - int c = comp.compare(a,b); - - if(c < 0) { - streamB.pushBack(b); - return a; - } else { - streamA.pushBack(a); - return b; + + // If all EOF then min will be null, else min is the current minimum + if(null == minimum){ + // return EOF, doesn't matter which cause we're done + return streams[0].read(); } + + return minimum; + +// Tuple a = streamA.read(); +// Tuple b = streamB.read(); +// +// if(a.EOF && b.EOF) { +// return a; +// } +// +// if(a.EOF) { +// streamA.pushBack(a); +// return b; +// } +// +// if(b.EOF) { +// streamB.pushBack(b); +// return a; +// } +// +// int c = comp.compare(a,b); +// +// if(c < 0) { +// streamB.pushBack(b); +// return a; +// } else { +// streamA.pushBack(a); +// return b; +// } } /** Return the stream sort - ie, the order in which records are returned */ diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java index 32644709a11..39c3ee964e6 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java @@ -341,6 +341,17 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { assert(tuples.size() == 5); assertOrder(tuples, 0,2,1,3,4); + // full factory w/multi streams + stream = factory.constructStream("merge(" + + "search(collection1, q=\"id:(0 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\")," + + "search(collection1, q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\")," + + "search(collection1, q=\"id:(2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\")," + + "on=\"a_f asc\")"); + tuples = getTuples(stream); + + assert(tuples.size() == 4); + assertOrder(tuples, 0,2,1,4); + del("*:*"); commit(); }