SOLR-12355: Fixes hash conflict in HashJoinStream and OuterHashJoinStream

This commit is contained in:
Dennis Gove 2018-05-15 10:23:25 -04:00
parent 4da0d68981
commit f506bc9cb7
4 changed files with 85 additions and 7 deletions

View File

@ -223,6 +223,8 @@ Bug Fixes
* SOLR-12200: abandon OverseerExitThread when ZkController is closed. (Mikhail Khludnev)
* SOLR-12355: Fixes hash conflict in HashJoinStream and OuterHashJoinStream (Dennis Gove)
Optimizations
----------------------

View File

@ -50,10 +50,10 @@ public class HashJoinStream extends TupleStream implements Expressible {
protected TupleStream fullStream;
protected List<String> leftHashOn;
protected List<String> rightHashOn;
protected HashMap<Integer, List<Tuple>> hashedTuples;
protected HashMap<String, List<Tuple>> hashedTuples;
protected Tuple workingFullTuple = null;
protected Integer workingFullHash = null;
protected String workingFullHash = null;
protected int workngHashSetIdx = 0;
public HashJoinStream(TupleStream fullStream, TupleStream hashStream, List<String> hashOn) throws IOException {
@ -199,7 +199,7 @@ public class HashJoinStream extends TupleStream implements Expressible {
Tuple tuple = hashStream.read();
while(!tuple.EOF){
Integer hash = calculateHash(tuple, rightHashOn);
String hash = computeHash(tuple, rightHashOn);
if(null != hash){
if(hashedTuples.containsKey(hash)){
hashedTuples.get(hash).add(tuple);
@ -214,7 +214,7 @@ public class HashJoinStream extends TupleStream implements Expressible {
}
}
protected Integer calculateHash(Tuple tuple, List<String> hashOn){
protected String computeHash(Tuple tuple, List<String> hashOn){
StringBuilder sb = new StringBuilder();
for(String part : hashOn){
Object obj = tuple.get(part);
@ -225,7 +225,7 @@ public class HashJoinStream extends TupleStream implements Expressible {
sb.append("::"); // this is here to separate fields
}
return sb.toString().hashCode();
return sb.toString();
}
public void close() throws IOException {
@ -246,7 +246,7 @@ public class HashJoinStream extends TupleStream implements Expressible {
// If fullTuple doesn't have a valid hash or if there is no doc to
// join with then retry loop - keep going until we find one
Integer fullHash = calculateHash(fullTuple, leftHashOn);
String fullHash = computeHash(fullTuple, leftHashOn);
if(null == fullHash || !hashedTuples.containsKey(fullHash)){
continue findNextWorkingFullTuple;
}

View File

@ -100,7 +100,7 @@ public class OuterHashJoinStream extends HashJoinStream implements Expressible {
// If fullTuple doesn't have a valid hash or the hash cannot be found in the hashedTuples then
// return the tuple from fullStream.
// This is an outer join so there is no requirement there be a matching value in the hashed stream
Integer fullHash = calculateHash(fullTuple, leftHashOn);
String fullHash = computeHash(fullTuple, leftHashOn);
if(null == fullHash || !hashedTuples.containsKey(fullHash)){
return fullTuple.clone();
}

View File

@ -1942,6 +1942,82 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
solrClientCache.close();
}
}
@Test
public void testHashJoinStreamWithKnownConflict() throws Exception {
new UpdateRequest()
.add(id, "1", "type_s","left", "bbid_s", "MG!!00TNH1", "ykey_s", "Mtge")
.add(id, "2", "type_s","right", "bbid_s", "MG!!00TNGP", "ykey_s", "Mtge")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamExpression expression;
TupleStream stream;
List<Tuple> tuples;
StreamContext streamContext = new StreamContext();
SolrClientCache solrClientCache = new SolrClientCache();
streamContext.setSolrClientCache(solrClientCache);
StreamFactory factory = new StreamFactory()
.withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("hashJoin", HashJoinStream.class);
try {
// Basic test
expression = StreamExpressionParser.parse("hashJoin("
+ " search(collection1, q=*:*, fl=\"bbid_s,ykey_s\", fq=\"type_s:left\", sort=\"bbid_s asc, ykey_s asc\"),"
+ " hashed=search(collection1, q=*:*, fl=\"bbid_s,ykey_s\", fq=\"type_s:right\", sort=\"bbid_s asc, ykey_s asc\"),"
+ " on=\"bbid_s,ykey_s\""
+ ")");
stream = new HashJoinStream(expression, factory);
stream.setStreamContext(streamContext);
tuples = getTuples(stream);
assertEquals(0, tuples.size());
} finally {
solrClientCache.close();
}
}
@Test
public void testOuterHashJoinStreamWithKnownConflict() throws Exception {
new UpdateRequest()
.add(id, "1", "type_s","left", "bbid_s", "MG!!00TNH1", "ykey_s", "Mtge")
.add(id, "2", "type_s","right", "bbid_s", "MG!!00TNGP", "ykey_s", "Mtge", "extra_s", "foo")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamExpression expression;
TupleStream stream;
List<Tuple> tuples;
StreamContext streamContext = new StreamContext();
SolrClientCache solrClientCache = new SolrClientCache();
streamContext.setSolrClientCache(solrClientCache);
StreamFactory factory = new StreamFactory()
.withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("outerHashJoin", OuterHashJoinStream.class);
try {
// Basic test
expression = StreamExpressionParser.parse("outerHashJoin("
+ " search(collection1, q=*:*, fl=\"bbid_s,ykey_s\", fq=\"type_s:left\", sort=\"bbid_s asc, ykey_s asc\"),"
+ " hashed=search(collection1, q=*:*, fl=\"bbid_s,ykey_s,extra_s\", fq=\"type_s:right\", sort=\"bbid_s asc, ykey_s asc\"),"
+ " on=\"bbid_s,ykey_s\""
+ ")");
stream = new OuterHashJoinStream(expression, factory);
stream.setStreamContext(streamContext);
tuples = getTuples(stream);
assertEquals(1, tuples.size());
assertFalse(tuples.get(0).fields.containsKey("extra_s"));
} finally {
solrClientCache.close();
}
}
@Test
public void testOuterHashJoinStream() throws Exception {