SOLR-9058: Makes HashJoinStream and OuterHashJoinStream support different field names in the incoming streams, eg. fieldA=fieldB

This commit is contained in:
Dennis Gove 2016-05-05 20:30:39 -04:00
parent 06a675ce2c
commit d95a91a9cc
4 changed files with 76 additions and 12 deletions

View File

@ -158,6 +158,9 @@ Bug Fixes
* SOLR-9036: Solr slave is doing full replication (entire index) of index after master restart.
(Lior Sapir, Mark Miller, shalin)
* SOLR-9058: Makes HashJoinStream and OuterHashJoinStream support different field names in the
incoming streams, eg. fieldA=fieldB. (Dennis Gove, Stephan Osthold)
Optimizations
----------------------
* SOLR-8722: Don't force a full ZkStateReader refresh on every Overseer operation.

View File

@ -47,7 +47,8 @@ public class HashJoinStream extends TupleStream implements Expressible {
protected TupleStream hashStream;
protected TupleStream fullStream;
protected List<String> hashOn;
protected List<String> leftHashOn;
protected List<String> rightHashOn;
protected HashMap<Integer, List<Tuple>> hashedTuples;
protected Tuple workingFullTuple = null;
@ -97,8 +98,25 @@ public class HashJoinStream extends TupleStream implements Expressible {
private void init(TupleStream fullStream, TupleStream hashStream, List<String> hashOn) throws IOException {
this.fullStream = fullStream;
this.hashStream = hashStream;
this.hashOn = hashOn;
this.hashedTuples = new HashMap<>();
this.leftHashOn = new ArrayList<>();
this.rightHashOn = new ArrayList<>();
for(String hasher : hashOn){
String[] parts = hasher.split("=");
if(1 == parts.length){
String field = parts[0].trim();
leftHashOn.add(field);
rightHashOn.add(field);
}
else if(2 == parts.length){
leftHashOn.add(parts[0].trim());
rightHashOn.add(parts[1].trim());
}
else{
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - invalid 'on' parameter - expecting 1 or more instances if 'field' or 'field=hashedField' but found '%s'",hasher));
}
}
}
@Override
@ -127,12 +145,24 @@ public class HashJoinStream extends TupleStream implements Expressible {
// on
StringBuilder sb = new StringBuilder();
for(String part : hashOn){
for(int idx = 0; idx < leftHashOn.size(); ++idx){
if(sb.length() > 0){ sb.append(","); }
sb.append(part);
// we know that left and right hashOns are the same size
String left = leftHashOn.get(idx);
String right = rightHashOn.get(idx);
if(left.equals(right)){
sb.append(left);
}
else{
sb.append(left);
sb.append("=");
sb.append(right);
}
}
expression.addParameter(new StreamExpressionNamedParameter("on",sb.toString()));
expression.addParameter(new StreamExpressionNamedParameter("on",sb.toString()));
return expression;
}
@ -168,7 +198,7 @@ public class HashJoinStream extends TupleStream implements Expressible {
Tuple tuple = hashStream.read();
while(!tuple.EOF){
Integer hash = calculateHash(tuple);
Integer hash = calculateHash(tuple, rightHashOn);
if(null != hash){
if(hashedTuples.containsKey(hash)){
hashedTuples.get(hash).add(tuple);
@ -183,7 +213,7 @@ public class HashJoinStream extends TupleStream implements Expressible {
}
}
protected Integer calculateHash(Tuple tuple){
protected Integer calculateHash(Tuple tuple, List<String> hashOn){
StringBuilder sb = new StringBuilder();
for(String part : hashOn){
Object obj = tuple.get(part);
@ -191,7 +221,7 @@ public class HashJoinStream extends TupleStream implements Expressible {
return null;
}
sb.append(obj.toString());
sb.append("::"); // this is here to seperate fields
sb.append("::"); // this is here to separate fields
}
return sb.toString().hashCode();
@ -215,7 +245,7 @@ public class HashJoinStream extends TupleStream implements Expressible {
// If fullTuple doesn't have a valid hash or if there is no doc to
// join with then retry loop - keep going until we find one
Integer fullHash = calculateHash(fullTuple);
Integer fullHash = calculateHash(fullTuple, leftHashOn);
if(null == fullHash || !hashedTuples.containsKey(fullHash)){
continue findNextWorkingFullTuple;
}

View File

@ -65,9 +65,21 @@ public class OuterHashJoinStream extends HashJoinStream implements Expressible {
// on
StringBuilder sb = new StringBuilder();
for(String part : hashOn){
for(int idx = 0; idx < leftHashOn.size(); ++idx){
if(sb.length() > 0){ sb.append(","); }
sb.append(part);
// we know that left and right hashOns are the same size
String left = leftHashOn.get(idx);
String right = rightHashOn.get(idx);
if(left.equals(right)){
sb.append(left);
}
else{
sb.append(left);
sb.append("=");
sb.append(right);
}
}
expression.addParameter(new StreamExpressionNamedParameter("on",sb.toString()));
@ -87,7 +99,7 @@ public class OuterHashJoinStream extends HashJoinStream implements Expressible {
// If fullTuple doesn't have a valid hash or the hash cannot be found in the hashedTuples then
// return the tuple from fullStream.
// This is an outer join so there is no requirement there be a matching value in the hashed stream
Integer fullHash = calculateHash(fullTuple);
Integer fullHash = calculateHash(fullTuple, leftHashOn);
if(null == fullHash || !hashedTuples.containsKey(fullHash)){
return fullTuple.clone();
}

View File

@ -1452,6 +1452,16 @@ public class StreamExpressionTest extends SolrCloudTestCase {
stream = new HashJoinStream(expression, factory);
tuples = getTuples(stream);
assert(tuples.size() == 0);
// Basic test with "on" mapping
expression = StreamExpressionParser.parse("hashJoin("
+ "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join3_i,ident_s\", sort=\"join1_i asc, join3_i asc, id asc\"),"
+ "hashed=search(collection1, q=\"side_s:right\", fl=\"join1_i,join3_i,ident_s\", sort=\"join1_i asc, join3_i asc\"),"
+ "on=\"join1_i=join3_i\")");
stream = new HashJoinStream(expression, factory);
tuples = getTuples(stream);
assertEquals(17, tuples.size());
assertOrder(tuples, 1, 1, 2, 2, 15, 15, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 7);
}
@ -1516,6 +1526,15 @@ public class StreamExpressionTest extends SolrCloudTestCase {
assert(tuples.size() == 8);
assertOrder(tuples, 1,15,2,3,4,5,6,7);
// Basic test
expression = StreamExpressionParser.parse("outerHashJoin("
+ "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
+ "hashed=search(collection1, q=\"side_s:right\", fl=\"join3_i,join2_s,ident_s\", sort=\"join2_s asc\"),"
+ "on=\"join1_i=join3_i, join2_s\")");
stream = new OuterHashJoinStream(expression, factory);
tuples = getTuples(stream);
assert(tuples.size() == 10);
assertOrder(tuples, 1,1,15,15,2,3,4,5,6,7);
}
@Test