diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index bcf014ac4b4..06ba91a7fb1 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -158,6 +158,9 @@ Bug Fixes * SOLR-9036: Solr slave is doing full replication (entire index) of index after master restart. (Lior Sapir, Mark Miller, shalin) +* SOLR-9058: Makes HashJoinStream and OuterHashJoinStream support different field names in the + incoming streams, eg. fieldA=fieldB. (Dennis Gove, Stephan Osthold) + Optimizations ---------------------- * SOLR-8722: Don't force a full ZkStateReader refresh on every Overseer operation. diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashJoinStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashJoinStream.java index dfb678f63fc..feff591fbf2 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashJoinStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashJoinStream.java @@ -47,7 +47,8 @@ public class HashJoinStream extends TupleStream implements Expressible { protected TupleStream hashStream; protected TupleStream fullStream; - protected List hashOn; + protected List leftHashOn; + protected List rightHashOn; protected HashMap> hashedTuples; protected Tuple workingFullTuple = null; @@ -97,8 +98,25 @@ public class HashJoinStream extends TupleStream implements Expressible { private void init(TupleStream fullStream, TupleStream hashStream, List hashOn) throws IOException { this.fullStream = fullStream; this.hashStream = hashStream; - this.hashOn = hashOn; this.hashedTuples = new HashMap<>(); + this.leftHashOn = new ArrayList<>(); + this.rightHashOn = new ArrayList<>(); + + for(String hasher : hashOn){ + String[] parts = hasher.split("="); + if(1 == parts.length){ + String field = parts[0].trim(); + leftHashOn.add(field); + rightHashOn.add(field); + } + else if(2 == parts.length){ + leftHashOn.add(parts[0].trim()); + rightHashOn.add(parts[1].trim()); + } + else{ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - invalid 'on' parameter - expecting 1 or more instances if 'field' or 'field=hashedField' but found '%s'",hasher)); + } + } } @Override @@ -127,12 +145,24 @@ public class HashJoinStream extends TupleStream implements Expressible { // on StringBuilder sb = new StringBuilder(); - for(String part : hashOn){ + for(int idx = 0; idx < leftHashOn.size(); ++idx){ if(sb.length() > 0){ sb.append(","); } - sb.append(part); + + // we know that left and right hashOns are the same size + String left = leftHashOn.get(idx); + String right = rightHashOn.get(idx); + + if(left.equals(right)){ + sb.append(left); + } + else{ + sb.append(left); + sb.append("="); + sb.append(right); + } } - expression.addParameter(new StreamExpressionNamedParameter("on",sb.toString())); + expression.addParameter(new StreamExpressionNamedParameter("on",sb.toString())); return expression; } @@ -168,7 +198,7 @@ public class HashJoinStream extends TupleStream implements Expressible { Tuple tuple = hashStream.read(); while(!tuple.EOF){ - Integer hash = calculateHash(tuple); + Integer hash = calculateHash(tuple, rightHashOn); if(null != hash){ if(hashedTuples.containsKey(hash)){ hashedTuples.get(hash).add(tuple); @@ -183,7 +213,7 @@ public class HashJoinStream extends TupleStream implements Expressible { } } - protected Integer calculateHash(Tuple tuple){ + protected Integer calculateHash(Tuple tuple, List hashOn){ StringBuilder sb = new StringBuilder(); for(String part : hashOn){ Object obj = tuple.get(part); @@ -191,7 +221,7 @@ public class HashJoinStream extends TupleStream implements Expressible { return null; } sb.append(obj.toString()); - sb.append("::"); // this is here to seperate fields + sb.append("::"); // this is here to separate fields } return sb.toString().hashCode(); @@ -215,7 +245,7 @@ public class HashJoinStream extends TupleStream implements Expressible { // If fullTuple doesn't have a valid hash or if there is no doc to // join with then retry loop - keep going until we find one - Integer fullHash = calculateHash(fullTuple); + Integer fullHash = calculateHash(fullTuple, leftHashOn); if(null == fullHash || !hashedTuples.containsKey(fullHash)){ continue findNextWorkingFullTuple; } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/OuterHashJoinStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/OuterHashJoinStream.java index 23a99a52416..9e8233492dc 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/OuterHashJoinStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/OuterHashJoinStream.java @@ -65,9 +65,21 @@ public class OuterHashJoinStream extends HashJoinStream implements Expressible { // on StringBuilder sb = new StringBuilder(); - for(String part : hashOn){ + for(int idx = 0; idx < leftHashOn.size(); ++idx){ if(sb.length() > 0){ sb.append(","); } - sb.append(part); + + // we know that left and right hashOns are the same size + String left = leftHashOn.get(idx); + String right = rightHashOn.get(idx); + + if(left.equals(right)){ + sb.append(left); + } + else{ + sb.append(left); + sb.append("="); + sb.append(right); + } } expression.addParameter(new StreamExpressionNamedParameter("on",sb.toString())); @@ -87,7 +99,7 @@ public class OuterHashJoinStream extends HashJoinStream implements Expressible { // If fullTuple doesn't have a valid hash or the hash cannot be found in the hashedTuples then // return the tuple from fullStream. // This is an outer join so there is no requirement there be a matching value in the hashed stream - Integer fullHash = calculateHash(fullTuple); + Integer fullHash = calculateHash(fullTuple, leftHashOn); if(null == fullHash || !hashedTuples.containsKey(fullHash)){ return fullTuple.clone(); } diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java index 60187e4a790..4d8e616c671 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java @@ -1452,6 +1452,16 @@ public class StreamExpressionTest extends SolrCloudTestCase { stream = new HashJoinStream(expression, factory); tuples = getTuples(stream); assert(tuples.size() == 0); + + // Basic test with "on" mapping + expression = StreamExpressionParser.parse("hashJoin(" + + "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join3_i,ident_s\", sort=\"join1_i asc, join3_i asc, id asc\")," + + "hashed=search(collection1, q=\"side_s:right\", fl=\"join1_i,join3_i,ident_s\", sort=\"join1_i asc, join3_i asc\")," + + "on=\"join1_i=join3_i\")"); + stream = new HashJoinStream(expression, factory); + tuples = getTuples(stream); + assertEquals(17, tuples.size()); + assertOrder(tuples, 1, 1, 2, 2, 15, 15, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 7); } @@ -1516,6 +1526,15 @@ public class StreamExpressionTest extends SolrCloudTestCase { assert(tuples.size() == 8); assertOrder(tuples, 1,15,2,3,4,5,6,7); + // Basic test + expression = StreamExpressionParser.parse("outerHashJoin(" + + "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\")," + + "hashed=search(collection1, q=\"side_s:right\", fl=\"join3_i,join2_s,ident_s\", sort=\"join2_s asc\")," + + "on=\"join1_i=join3_i, join2_s\")"); + stream = new OuterHashJoinStream(expression, factory); + tuples = getTuples(stream); + assert(tuples.size() == 10); + assertOrder(tuples, 1,1,15,15,2,3,4,5,6,7); } @Test