diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 980d20bab08..7187360da05 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -223,6 +223,8 @@ Bug Fixes * SOLR-12200: abandon OverseerExitThread when ZkController is closed. (Mikhail Khludnev) +* SOLR-12355: Fixes hash conflict in HashJoinStream and OuterHashJoinStream (Dennis Gove) + Optimizations ---------------------- diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashJoinStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashJoinStream.java index ffba0ca2c35..5a534df33b7 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashJoinStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashJoinStream.java @@ -50,10 +50,10 @@ public class HashJoinStream extends TupleStream implements Expressible { protected TupleStream fullStream; protected List leftHashOn; protected List rightHashOn; - protected HashMap> hashedTuples; + protected HashMap> hashedTuples; protected Tuple workingFullTuple = null; - protected Integer workingFullHash = null; + protected String workingFullHash = null; protected int workngHashSetIdx = 0; public HashJoinStream(TupleStream fullStream, TupleStream hashStream, List hashOn) throws IOException { @@ -199,7 +199,7 @@ public class HashJoinStream extends TupleStream implements Expressible { Tuple tuple = hashStream.read(); while(!tuple.EOF){ - Integer hash = calculateHash(tuple, rightHashOn); + String hash = computeHash(tuple, rightHashOn); if(null != hash){ if(hashedTuples.containsKey(hash)){ hashedTuples.get(hash).add(tuple); @@ -214,7 +214,7 @@ public class HashJoinStream extends TupleStream implements Expressible { } } - protected Integer calculateHash(Tuple tuple, List hashOn){ + protected String computeHash(Tuple tuple, List hashOn){ StringBuilder sb = new StringBuilder(); for(String part : hashOn){ Object obj = tuple.get(part); @@ -225,7 +225,7 @@ public class HashJoinStream extends TupleStream implements Expressible { sb.append("::"); // this is here to separate fields } - return sb.toString().hashCode(); + return sb.toString(); } public void close() throws IOException { @@ -246,7 +246,7 @@ public class HashJoinStream extends TupleStream implements Expressible { // If fullTuple doesn't have a valid hash or if there is no doc to // join with then retry loop - keep going until we find one - Integer fullHash = calculateHash(fullTuple, leftHashOn); + String fullHash = computeHash(fullTuple, leftHashOn); if(null == fullHash || !hashedTuples.containsKey(fullHash)){ continue findNextWorkingFullTuple; } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/OuterHashJoinStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/OuterHashJoinStream.java index 813dc79c31c..aaec1111581 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/OuterHashJoinStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/OuterHashJoinStream.java @@ -100,7 +100,7 @@ public class OuterHashJoinStream extends HashJoinStream implements Expressible { // If fullTuple doesn't have a valid hash or the hash cannot be found in the hashedTuples then // return the tuple from fullStream. // This is an outer join so there is no requirement there be a matching value in the hashed stream - Integer fullHash = calculateHash(fullTuple, leftHashOn); + String fullHash = computeHash(fullTuple, leftHashOn); if(null == fullHash || !hashedTuples.containsKey(fullHash)){ return fullTuple.clone(); } diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java index 2afc74f5cda..a2412dfa20e 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java @@ -1942,6 +1942,82 @@ public class StreamDecoratorTest extends SolrCloudTestCase { solrClientCache.close(); } } + + @Test + public void testHashJoinStreamWithKnownConflict() throws Exception { + + new UpdateRequest() + .add(id, "1", "type_s","left", "bbid_s", "MG!!00TNH1", "ykey_s", "Mtge") + .add(id, "2", "type_s","right", "bbid_s", "MG!!00TNGP", "ykey_s", "Mtge") + .commit(cluster.getSolrClient(), COLLECTIONORALIAS); + + StreamExpression expression; + TupleStream stream; + List tuples; + StreamContext streamContext = new StreamContext(); + SolrClientCache solrClientCache = new SolrClientCache(); + streamContext.setSolrClientCache(solrClientCache); + + StreamFactory factory = new StreamFactory() + .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress()) + .withFunctionName("search", CloudSolrStream.class) + .withFunctionName("hashJoin", HashJoinStream.class); + try { + // Basic test + expression = StreamExpressionParser.parse("hashJoin(" + + " search(collection1, q=*:*, fl=\"bbid_s,ykey_s\", fq=\"type_s:left\", sort=\"bbid_s asc, ykey_s asc\")," + + " hashed=search(collection1, q=*:*, fl=\"bbid_s,ykey_s\", fq=\"type_s:right\", sort=\"bbid_s asc, ykey_s asc\")," + + " on=\"bbid_s,ykey_s\"" + + ")"); + stream = new HashJoinStream(expression, factory); + stream.setStreamContext(streamContext); + tuples = getTuples(stream); + + assertEquals(0, tuples.size()); + + + } finally { + solrClientCache.close(); + } + } + + @Test + public void testOuterHashJoinStreamWithKnownConflict() throws Exception { + + new UpdateRequest() + .add(id, "1", "type_s","left", "bbid_s", "MG!!00TNH1", "ykey_s", "Mtge") + .add(id, "2", "type_s","right", "bbid_s", "MG!!00TNGP", "ykey_s", "Mtge", "extra_s", "foo") + .commit(cluster.getSolrClient(), COLLECTIONORALIAS); + + StreamExpression expression; + TupleStream stream; + List tuples; + StreamContext streamContext = new StreamContext(); + SolrClientCache solrClientCache = new SolrClientCache(); + streamContext.setSolrClientCache(solrClientCache); + + StreamFactory factory = new StreamFactory() + .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress()) + .withFunctionName("search", CloudSolrStream.class) + .withFunctionName("outerHashJoin", OuterHashJoinStream.class); + try { + // Basic test + expression = StreamExpressionParser.parse("outerHashJoin(" + + " search(collection1, q=*:*, fl=\"bbid_s,ykey_s\", fq=\"type_s:left\", sort=\"bbid_s asc, ykey_s asc\")," + + " hashed=search(collection1, q=*:*, fl=\"bbid_s,ykey_s,extra_s\", fq=\"type_s:right\", sort=\"bbid_s asc, ykey_s asc\")," + + " on=\"bbid_s,ykey_s\"" + + ")"); + stream = new OuterHashJoinStream(expression, factory); + stream.setStreamContext(streamContext); + tuples = getTuples(stream); + + assertEquals(1, tuples.size()); + assertFalse(tuples.get(0).fields.containsKey("extra_s")); + + } finally { + solrClientCache.close(); + } + } @Test public void testOuterHashJoinStream() throws Exception {