diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 262ca790d12..aaf15baddec 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -101,6 +101,8 @@ New Features * SOLR-7584: Adds Inner and LeftOuter Joins to the Streaming API and Streaming Expressions (Dennis Gove, Corey Wu) +* SOLR-8188: Adds Hash and OuterHash Joins to the Streaming API and Streaming Expressions (Dennis Gove) + Optimizations ---------------------- * SOLR-7876: Speed up queries and operations that use many terms when timeAllowed has not been diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java index bf19046ac6f..c2b89734817 100644 --- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java @@ -33,7 +33,9 @@ import org.apache.solr.client.solrj.io.stream.CloudSolrStream; import org.apache.solr.client.solrj.io.stream.ExceptionStream; import org.apache.solr.client.solrj.io.stream.InnerJoinStream; import org.apache.solr.client.solrj.io.stream.LeftOuterJoinStream; +import org.apache.solr.client.solrj.io.stream.HashJoinStream; import org.apache.solr.client.solrj.io.stream.MergeStream; +import org.apache.solr.client.solrj.io.stream.OuterHashJoinStream; import org.apache.solr.client.solrj.io.stream.ParallelStream; import org.apache.solr.client.solrj.io.stream.RankStream; import org.apache.solr.client.solrj.io.stream.ReducerStream; @@ -103,8 +105,10 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware { .withFunctionName("rollup", RollupStream.class) .withFunctionName("stats", StatsStream.class) .withFunctionName("innerJoin", InnerJoinStream.class) - .withFunctionName("leftOuterJoin", LeftOuterJoinStream.class) - + .withFunctionName("leftOuterJoin", LeftOuterJoinStream.class) + .withFunctionName("hashJoin", HashJoinStream.class) + .withFunctionName("outerHashJoin", OuterHashJoinStream.class) + // metrics .withFunctionName("min", MinMetric.class) .withFunctionName("max", MaxMetric.class) @@ -112,7 +116,6 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware { .withFunctionName("sum", SumMetric.class) .withFunctionName("count", CountMetric.class) ; - // This pulls all the overrides and additions from the config Object functionMappingsObj = initArgs.get("streamFunctions"); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashJoinStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashJoinStream.java new file mode 100644 index 00000000000..068a0916684 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashJoinStream.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.io.stream; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; + +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; + +/** + * Takes two streams (fullStream and hashStream) and joins them similar to an InnerJoinStream. The difference + * in a HashJoinStream is that the tuples in the hashStream will all be read and hashed when this stream is + * opened. This provides a few optimizations iff the hashStream has a relatively small number of documents. + * You are expected to provide a set of fields for which the hash will be calculated from. If a tuple does + * not contain a value (ie, null) for one of the fields the hash is being computed on then that tuple will + * not be considered a match to anything. Ie, all fields which are part of the hash must have a non-null value. +**/ +public class HashJoinStream extends TupleStream implements Expressible { + + private static final long serialVersionUID = 1L; + + protected TupleStream hashStream; + protected TupleStream fullStream; + protected List hashOn; + protected HashMap> hashedTuples; + + protected Tuple workingFullTuple = null; + protected Integer workingFullHash = null; + protected int workngHashSetIdx = 0; + + public HashJoinStream(TupleStream fullStream, TupleStream hashStream, List hashOn) throws IOException { + init(fullStream, hashStream, hashOn); + } + + public HashJoinStream(StreamExpression expression,StreamFactory factory) throws IOException { + // grab all parameters out + List streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class); + StreamExpressionNamedParameter hashStreamExpression = factory.getNamedOperand(expression, "hashed"); + StreamExpressionNamedParameter onExpression = factory.getNamedOperand(expression, "on"); + + // validate expression contains only what we want. + if(expression.getParameters().size() != streamExpressions.size() + 2){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression)); + } + + if(1 != streamExpressions.size()){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting two streams but found %d",expression, streamExpressions.size())); + } + + if(null == hashStreamExpression || !(hashStreamExpression.getParameter() instanceof StreamExpression)){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'hashed' parameter containing the stream to hash but didn't find one",expression)); + } + + if(null == onExpression || !(onExpression.getParameter() instanceof StreamExpressionValue)){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'on' parameter listing fields to hash on but didn't find one",expression)); + } + + String hashOnValue = ((StreamExpressionValue)onExpression.getParameter()).getValue(); + String[] parts = hashOnValue.split(","); + List hashOn = new ArrayList(parts.length); + for(String part : parts){ + hashOn.add(part.trim()); + } + + init( factory.constructStream(streamExpressions.get(0)), + factory.constructStream((StreamExpression)hashStreamExpression.getParameter()), + hashOn + ); + } + + private void init(TupleStream fullStream, TupleStream hashStream, List hashOn) throws IOException { + this.fullStream = fullStream; + this.hashStream = hashStream; + this.hashOn = hashOn; + this.hashedTuples = new HashMap<>(); + } + + @Override + public StreamExpression toExpression(StreamFactory factory) throws IOException { + // function name + StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); + + // streams + if(hashStream instanceof Expressible && fullStream instanceof Expressible){ + expression.addParameter(((Expressible)fullStream).toExpression(factory)); + expression.addParameter(new StreamExpressionNamedParameter("hashed", ((Expressible)hashStream).toExpression(factory))); + } + else{ + throw new IOException("This HashJoinStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } + + // on + StringBuilder sb = new StringBuilder(); + for(String part : hashOn){ + if(sb.length() > 0){ sb.append(","); } + sb.append(part); + } + expression.addParameter(new StreamExpressionNamedParameter("on",sb.toString())); + + return expression; + } + + public void setStreamContext(StreamContext context) { + this.hashStream.setStreamContext(context); + this.fullStream.setStreamContext(context); + } + + public List children() { + List l = new ArrayList(); + l.add(hashStream); + l.add(fullStream); + return l; + } + + public void open() throws IOException { + hashStream.open(); + fullStream.open(); + + Tuple tuple = hashStream.read(); + while(!tuple.EOF){ + Integer hash = calculateHash(tuple); + if(null != hash){ + if(hashedTuples.containsKey(hash)){ + hashedTuples.get(hash).add(tuple); + } + else{ + ArrayList set = new ArrayList(); + set.add(tuple); + hashedTuples.put(hash, set); + } + } + tuple = hashStream.read(); + } + } + + protected Integer calculateHash(Tuple tuple){ + StringBuilder sb = new StringBuilder(); + for(String part : hashOn){ + Object obj = tuple.get(part); + if(null == obj){ + return null; + } + sb.append(obj.toString()); + sb.append("::"); // this is here to seperate fields + } + + return sb.toString().hashCode(); + } + + public void close() throws IOException { + hashStream.close(); + fullStream.close(); + } + + public Tuple read() throws IOException { + + findNextWorkingFullTuple: + while(null == workingFullTuple){ + Tuple fullTuple = fullStream.read(); + + // We're at the end of the line + if(fullTuple.EOF){ + return fullTuple; + } + + // If fullTuple doesn't have a valid hash or if there is no doc to + // join with then retry loop - keep going until we find one + Integer fullHash = calculateHash(fullTuple); + if(null == fullHash || !hashedTuples.containsKey(fullHash)){ + continue findNextWorkingFullTuple; + } + + workingFullTuple = fullTuple; + workingFullHash = fullHash; + workngHashSetIdx = 0; + } + + // At this point we know we have at least one doc to match on + // Due to the check at the end, before returning, we know we have at least one to match with left + List matches = hashedTuples.get(workingFullHash); + Tuple returnTuple = workingFullTuple.clone(); + returnTuple.merge(matches.get(workngHashSetIdx)); + + // Increment this so the next time we hit the next matching tuple + workngHashSetIdx++; + + if(workngHashSetIdx >= matches.size()){ + // well, now we've reached all the matches, clear it all out + workingFullTuple = null; + workingFullHash = null; + workngHashSetIdx = 0; + } + + return returnTuple; + + } + + @Override + public StreamComparator getStreamSort() { + return fullStream.getStreamSort(); + } + + public int getCost() { + return 0; + } +} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/OuterHashJoinStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/OuterHashJoinStream.java new file mode 100644 index 00000000000..dfb83bfe049 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/OuterHashJoinStream.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.io.stream; + +import java.io.IOException; +import java.util.List; + +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; + +/** + * Takes two streams (fullStream and hashStream) and joins them similar to an LeftOuterJoinStream. The difference + * in a OuterHashJoinStream is that the tuples in the hashStream will all be read and hashed when this stream is + * opened. This provides a few optimizations iff the hashStream has a relatively small number of documents. + * The difference between this and a HashJoinStream is that a tuple in the fullStream will be returned even + * if it doesn't have any matching tuples in the hashStream. + * You are expected to provide a set of fields for which the hash will be calculated from. If a tuple from the + * hashStream does not contain a value (ie, null) for one of the fields the hash is being computed on then that + * tuple will not be considered a match to anything. If a tuple from the fullStream does not contain a value (ie, null) + * for one of the fields the hash is being computed on then that tuple will be returned without any joined tuples + * from the hashStream +**/ +public class OuterHashJoinStream extends HashJoinStream implements Expressible { + + private static final long serialVersionUID = 1L; + + public OuterHashJoinStream(TupleStream fullStream, TupleStream hashStream, List hashOn) throws IOException { + super(fullStream, hashStream, hashOn); + } + + public OuterHashJoinStream(StreamExpression expression,StreamFactory factory) throws IOException { + super(expression, factory); + } + + @Override + public StreamExpression toExpression(StreamFactory factory) throws IOException { + // function name + StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); + + // streams + if(hashStream instanceof Expressible && fullStream instanceof Expressible){ + expression.addParameter(((Expressible)fullStream).toExpression(factory)); + expression.addParameter(new StreamExpressionNamedParameter("hashed", ((Expressible)hashStream).toExpression(factory))); + } + else{ + throw new IOException("This OuterHashJoinStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } + + // on + StringBuilder sb = new StringBuilder(); + for(String part : hashOn){ + if(sb.length() > 0){ sb.append(","); } + sb.append(part); + } + expression.addParameter(new StreamExpressionNamedParameter("on",sb.toString())); + + return expression; + } + + public Tuple read() throws IOException { + + if(null == workingFullTuple){ + Tuple fullTuple = fullStream.read(); + + // We're at the end of the line + if(fullTuple.EOF){ + return fullTuple; + } + + // If fullTuple doesn't have a valid hash or the hash cannot be found in the hashedTuples then + // return the tuple from fullStream. + // This is an outer join so there is no requirement there be a matching value in the hashed stream + Integer fullHash = calculateHash(fullTuple); + if(null == fullHash || !hashedTuples.containsKey(fullHash)){ + return fullTuple.clone(); + } + + workingFullTuple = fullTuple; + workingFullHash = fullHash; + workngHashSetIdx = 0; + } + + // At this point we know we have at least one doc to match on + // Due to the check at the end, before returning, we know we have at least one to match with left + List matches = hashedTuples.get(workingFullHash); + Tuple returnTuple = workingFullTuple.clone(); + returnTuple.merge(matches.get(workngHashSetIdx)); + + // Increment this so the next time we hit the next matching tuple + workngHashSetIdx++; + + if(workngHashSetIdx >= matches.size()){ + // well, now we've reached all the matches, clear it all out + workingFullTuple = null; + workingFullHash = null; + workngHashSetIdx = 0; + } + + return returnTuple; + + } + +} \ No newline at end of file diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java index ea483121c30..02354e7d830 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java @@ -132,6 +132,8 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { testParallelRollupStream(); testInnerJoinStream(); testLeftOuterJoinStream(); + testHashJoinStream(); + testOuterHashJoinStream(); } private void testCloudSolrStream() throws Exception { @@ -1153,6 +1155,131 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { del("*:*"); commit(); } + + private void testHashJoinStream() throws Exception { + + indexr(id, "1", "side_s", "left", "join1_i", "0", "join2_s", "a", "ident_s", "left_1"); // 8, 9 + indexr(id, "15", "side_s", "left", "join1_i", "0", "join2_s", "a", "ident_s", "left_1"); // 8, 9 + indexr(id, "2", "side_s", "left", "join1_i", "0", "join2_s", "b", "ident_s", "left_2"); + indexr(id, "3", "side_s", "left", "join1_i", "1", "join2_s", "a", "ident_s", "left_3"); // 10 + indexr(id, "4", "side_s", "left", "join1_i", "1", "join2_s", "b", "ident_s", "left_4"); // 11 + indexr(id, "5", "side_s", "left", "join1_i", "1", "join2_s", "c", "ident_s", "left_5"); // 12 + indexr(id, "6", "side_s", "left", "join1_i", "2", "join2_s", "d", "ident_s", "left_6"); + indexr(id, "7", "side_s", "left", "join1_i", "3", "join2_s", "e", "ident_s", "left_7"); // 14 + + indexr(id, "8", "side_s", "right", "join1_i", "0", "join2_s", "a", "ident_s", "right_1", "join3_i", "0"); // 1,15 + indexr(id, "9", "side_s", "right", "join1_i", "0", "join2_s", "a", "ident_s", "right_2", "join3_i", "0"); // 1,15 + indexr(id, "10", "side_s", "right", "join1_i", "1", "join2_s", "a", "ident_s", "right_3", "join3_i", "1"); // 3 + indexr(id, "11", "side_s", "right", "join1_i", "1", "join2_s", "b", "ident_s", "right_4", "join3_i", "1"); // 4 + indexr(id, "12", "side_s", "right", "join1_i", "1", "join2_s", "c", "ident_s", "right_5", "join3_i", "1"); // 5 + indexr(id, "13", "side_s", "right", "join1_i", "2", "join2_s", "dad", "ident_s", "right_6", "join3_i", "2"); + indexr(id, "14", "side_s", "right", "join1_i", "3", "join2_s", "e", "ident_s", "right_7", "join3_i", "3"); // 7 + commit(); + + StreamExpression expression; + TupleStream stream; + List tuples; + + StreamFactory factory = new StreamFactory() + .withCollectionZkHost("collection1", zkServer.getZkAddress()) + .withFunctionName("search", CloudSolrStream.class) + .withFunctionName("hashJoin", HashJoinStream.class); + + // Basic test + expression = StreamExpressionParser.parse("hashJoin(" + + "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\")," + + "hashed=search(collection1, q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc\")," + + "on=\"join1_i, join2_s\")"); + stream = new HashJoinStream(expression, factory); + tuples = getTuples(stream); + assert(tuples.size() == 8); + assertOrder(tuples, 1,1,15,15,3,4,5,7); + + // Basic desc + expression = StreamExpressionParser.parse("hashJoin(" + + "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\")," + + "hashed=search(collection1, q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\")," + + "on=\"join1_i, join2_s\")"); + stream = new HashJoinStream(expression, factory); + tuples = getTuples(stream); + assert(tuples.size() == 8); + assertOrder(tuples, 7,3,4,5,1,1,15,15); + + // Results in both searches, no join matches + expression = StreamExpressionParser.parse("hashJoin(" + + "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\")," + + "hashed=search(collection1, q=\"side_s:right\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\")," + + "on=\"ident_s\")"); + stream = new HashJoinStream(expression, factory); + tuples = getTuples(stream); + assert(tuples.size() == 0); + + del("*:*"); + commit(); + } + + private void testOuterHashJoinStream() throws Exception { + + indexr(id, "1", "side_s", "left", "join1_i", "0", "join2_s", "a", "ident_s", "left_1"); // 8, 9 + indexr(id, "15", "side_s", "left", "join1_i", "0", "join2_s", "a", "ident_s", "left_1"); // 8, 9 + indexr(id, "2", "side_s", "left", "join1_i", "0", "join2_s", "b", "ident_s", "left_2"); + indexr(id, "3", "side_s", "left", "join1_i", "1", "join2_s", "a", "ident_s", "left_3"); // 10 + indexr(id, "4", "side_s", "left", "join1_i", "1", "join2_s", "b", "ident_s", "left_4"); // 11 + indexr(id, "5", "side_s", "left", "join1_i", "1", "join2_s", "c", "ident_s", "left_5"); // 12 + indexr(id, "6", "side_s", "left", "join1_i", "2", "join2_s", "d", "ident_s", "left_6"); + indexr(id, "7", "side_s", "left", "join1_i", "3", "join2_s", "e", "ident_s", "left_7"); // 14 + + indexr(id, "8", "side_s", "right", "join1_i", "0", "join2_s", "a", "ident_s", "right_1", "join3_i", "0"); // 1,15 + indexr(id, "9", "side_s", "right", "join1_i", "0", "join2_s", "a", "ident_s", "right_2", "join3_i", "0"); // 1,15 + indexr(id, "10", "side_s", "right", "join1_i", "1", "join2_s", "a", "ident_s", "right_3", "join3_i", "1"); // 3 + indexr(id, "11", "side_s", "right", "join1_i", "1", "join2_s", "b", "ident_s", "right_4", "join3_i", "1"); // 4 + indexr(id, "12", "side_s", "right", "join1_i", "1", "join2_s", "c", "ident_s", "right_5", "join3_i", "1"); // 5 + indexr(id, "13", "side_s", "right", "join1_i", "2", "join2_s", "dad", "ident_s", "right_6", "join3_i", "2"); + indexr(id, "14", "side_s", "right", "join1_i", "3", "join2_s", "e", "ident_s", "right_7", "join3_i", "3"); // 7 + commit(); + + StreamExpression expression; + TupleStream stream; + List tuples; + + StreamFactory factory = new StreamFactory() + .withCollectionZkHost("collection1", zkServer.getZkAddress()) + .withFunctionName("search", CloudSolrStream.class) + .withFunctionName("outerHashJoin", OuterHashJoinStream.class); + + // Basic test + expression = StreamExpressionParser.parse("outerHashJoin(" + + "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\")," + + "hashed=search(collection1, q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc\")," + + "on=\"join1_i, join2_s\")"); + stream = new OuterHashJoinStream(expression, factory); + tuples = getTuples(stream); + assert(tuples.size() == 10); + assertOrder(tuples, 1,1,15,15,2,3,4,5,6,7); + + // Basic desc + expression = StreamExpressionParser.parse("outerHashJoin(" + + "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\")," + + "hashed=search(collection1, q=\"side_s:right\", fl=\"join1_i,join2_s,ident_s\", sort=\"join1_i desc, join2_s asc\")," + + "on=\"join1_i, join2_s\")"); + stream = new OuterHashJoinStream(expression, factory); + tuples = getTuples(stream); + assert(tuples.size() == 10); + assertOrder(tuples, 7,6,3,4,5,1,1,15,15,2); + + // Results in both searches, no join matches + expression = StreamExpressionParser.parse("outerHashJoin(" + + "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\")," + + "hashed=search(collection1, q=\"side_s:right\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"ident_s asc\")," + + "on=\"ident_s\")"); + stream = new OuterHashJoinStream(expression, factory); + tuples = getTuples(stream); + assert(tuples.size() == 8); + assertOrder(tuples, 1,15,2,3,4,5,6,7); + + del("*:*"); + commit(); + } protected List getTuples(TupleStream tupleStream) throws IOException { tupleStream.open();