SOLR-7938: MergeStream now supports merging more than 2 streams together

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1713190 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Dennis Gove 2015-11-07 22:08:56 +00:00
parent 3f21788fbd
commit 8309dc9b32
3 changed files with 125 additions and 54 deletions

View File

@ -83,6 +83,7 @@ New Features
* SOLR-6273: Cross Data Center Replication. Active/passive replication for separate
SolrClouds hosted on separate data centers. (Renaud Delbru, Yonik Seeley via Erick Erickson)
* SOLR-7938: MergeStream now supports merging more than 2 streams together (Dennis Gove)
Optimizations
----------------------

View File

@ -32,21 +32,22 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
/**
* Unions streamA with streamB ordering the Tuples based on a Comparator.
* Both streams must be sorted by the fields being compared.
* Merges two or more streams together ordering the Tuples based on a Comparator.
* All streams must be sorted by the fields being compared - this will be validated on construction.
**/
public class MergeStream extends TupleStream implements Expressible {
private static final long serialVersionUID = 1;
private PushBackStream streamA;
private PushBackStream streamB;
private PushBackStream[] streams;
private StreamComparator comp;
public MergeStream(TupleStream streamA, TupleStream streamB, StreamComparator comp) throws IOException {
init(streamA, streamB, comp);
init(comp, streamA, streamB);
}
public MergeStream(StreamComparator comp, TupleStream ... streams) throws IOException {
init(comp, streams);
}
public MergeStream(StreamExpression expression,StreamFactory factory) throws IOException {
@ -59,29 +60,39 @@ public class MergeStream extends TupleStream implements Expressible {
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
}
if(2 != streamExpressions.size()){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting two streams but found %d (must be PushBackStream types)",expression, streamExpressions.size()));
if(streamExpressions.size() < 2){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting at least two streams but found %d (must be PushBackStream types)",expression, streamExpressions.size()));
}
if(null == onExpression || !(onExpression.getParameter() instanceof StreamExpressionValue)){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'on' parameter listing fields to merge on but didn't find one",expression));
}
init( factory.constructStream(streamExpressions.get(0)),
factory.constructStream(streamExpressions.get(1)),
factory.constructComparator(((StreamExpressionValue)onExpression.getParameter()).getValue(), FieldComparator.class)
TupleStream[] streams = new TupleStream[streamExpressions.size()];
for(int idx = 0; idx < streamExpressions.size(); ++idx){
streams[idx] = factory.constructStream(streamExpressions.get(idx));
}
init( factory.constructComparator(((StreamExpressionValue)onExpression.getParameter()).getValue(), FieldComparator.class),
streams
);
}
private void init(TupleStream streamA, TupleStream streamB, StreamComparator comp) throws IOException {
this.streamA = new PushBackStream(streamA);
this.streamB = new PushBackStream(streamB);
this.comp = comp;
// streamA and streamB must both be sorted so that comp can be derived from
if(!comp.isDerivedFrom(streamA.getStreamSort()) || !comp.isDerivedFrom(streamB.getStreamSort())){
throw new IOException("Invalid MergeStream - both substream comparators (sort) must be a superset of this stream's comparator.");
private void init(StreamComparator comp, TupleStream ... streams) throws IOException {
// All streams must both be sorted so that comp can be derived from
for(TupleStream stream : streams){
if(!comp.isDerivedFrom(stream.getStreamSort())){
throw new IOException("Invalid MergeStream - all substream comparators (sort) must be a superset of this stream's comparator.");
}
}
// Convert to PushBack streams so we can push back tuples
this.streams = new PushBackStream[streams.length];
for(int idx = 0; idx < streams.length; ++idx){
this.streams[idx] = new PushBackStream(streams[idx]);
}
this.comp = comp;
}
@Override
@ -90,8 +101,9 @@ public class MergeStream extends TupleStream implements Expressible {
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// streams
expression.addParameter(streamA.toExpression(factory));
expression.addParameter(streamB.toExpression(factory));
for(PushBackStream stream : streams){
expression.addParameter(stream.toExpression(factory));
}
// on
expression.addParameter(new StreamExpressionNamedParameter("on",comp.toExpression(factory)));
@ -100,54 +112,101 @@ public class MergeStream extends TupleStream implements Expressible {
}
public void setStreamContext(StreamContext context) {
this.streamA.setStreamContext(context);
this.streamB.setStreamContext(context);
for(PushBackStream stream : streams){
stream.setStreamContext(context);
}
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList();
l.add(streamA);
l.add(streamB);
for(PushBackStream stream : streams){
l.add(stream);
}
return l;
}
public void open() throws IOException {
streamA.open();
streamB.open();
for(PushBackStream stream : streams){
stream.open();
}
}
public void close() throws IOException {
streamA.close();
streamB.close();
for(PushBackStream stream : streams){
stream.close();
}
}
public Tuple read() throws IOException {
Tuple a = streamA.read();
Tuple b = streamB.read();
if(a.EOF && b.EOF) {
return a;
// might be able to optimize this by sorting the streams based on the next to read tuple from each.
// if we can ensure the sort of the streams and update it in less than linear time then there would
// be some performance gain. But, assuming the # of streams is kinda small then this might not be
// worth it
Tuple minimum = null;
PushBackStream minimumStream = null;
for(PushBackStream stream : streams){
Tuple current = stream.read();
if(current.EOF){
stream.pushBack(current);
continue;
}
if(null == minimum){
minimum = current;
minimumStream = stream;
continue;
}
if(comp.compare(current, minimum) < 0){
// Push back on its stream
minimumStream.pushBack(minimum);
minimum = current;
minimumStream = stream;
continue;
}
else{
stream.pushBack(current);
}
}
if(a.EOF) {
streamA.pushBack(a);
return b;
}
if(b.EOF) {
streamB.pushBack(b);
return a;
}
int c = comp.compare(a,b);
if(c < 0) {
streamB.pushBack(b);
return a;
} else {
streamA.pushBack(a);
return b;
// If all EOF then min will be null, else min is the current minimum
if(null == minimum){
// return EOF, doesn't matter which cause we're done
return streams[0].read();
}
return minimum;
// Tuple a = streamA.read();
// Tuple b = streamB.read();
//
// if(a.EOF && b.EOF) {
// return a;
// }
//
// if(a.EOF) {
// streamA.pushBack(a);
// return b;
// }
//
// if(b.EOF) {
// streamB.pushBack(b);
// return a;
// }
//
// int c = comp.compare(a,b);
//
// if(c < 0) {
// streamB.pushBack(b);
// return a;
// } else {
// streamA.pushBack(a);
// return b;
// }
}
/** Return the stream sort - ie, the order in which records are returned */

View File

@ -341,6 +341,17 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
assert(tuples.size() == 5);
assertOrder(tuples, 0,2,1,3,4);
// full factory w/multi streams
stream = factory.constructStream("merge("
+ "search(collection1, q=\"id:(0 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ "search(collection1, q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ "search(collection1, q=\"id:(2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ "on=\"a_f asc\")");
tuples = getTuples(stream);
assert(tuples.size() == 4);
assertOrder(tuples, 0,2,1,4);
del("*:*");
commit();
}