Karl Wright 2016-05-06 07:40:52 -04:00
commit a531323b19
6 changed files with 92 additions and 26 deletions

CHANGES.txt

@@ -124,6 +124,8 @@ New Features
* SOLR-9049: RuleBasedAuthorizationPlugin supports regex in param values, e.g. "command" : "REGEX:(?i)create" (noble)
* SOLR-8972: Add GraphHandler and GraphMLResponseWriter to support graph visualizations (Joel Bernstein)
Bug Fixes
----------------------
@@ -187,6 +189,9 @@ Bug Fixes
* SOLR-9036: Solr slave is doing a full replication (entire index) after master restart.
(Lior Sapir, Mark Miller, shalin)
* SOLR-9058: Makes HashJoinStream and OuterHashJoinStream support different field names in the
incoming streams, e.g. fieldA=fieldB. (Dennis Gove, Stephan Osthold)
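As an illustrative sketch only (not part of the changelog entry; the collection and field names below are hypothetical), the new syntax is used the same way the added tests build the stream:

expression = StreamExpressionParser.parse("hashJoin("
  + "search(people, q=\"*:*\", fl=\"personId,name_s\", sort=\"personId asc\"),"
  + "hashed=search(pets, q=\"*:*\", fl=\"ownerId,petName_s\", sort=\"ownerId asc\"),"
  + "on=\"personId=ownerId\")");  // left (full stream) field = right (hashed stream) field
stream = new HashJoinStream(expression, factory);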
Optimizations
----------------------
* SOLR-8722: Don't force a full ZkStateReader refresh on every Overseer operation.

TestReplicationHandler.java

@@ -593,6 +593,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
* the index hasn't changed. See SOLR-9036
*/
@Test
@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-9036")
public void doTestIndexFetchOnMasterRestart() throws Exception {
useFactory(null);
try {

CloudSolrClient.java

@@ -618,14 +618,14 @@ public class CloudSolrClient extends SolrClient {
return null;
}
NamedList<Throwable> exceptions = new NamedList<>();
NamedList<NamedList> shardResponses = new NamedList<>();
Map<String, LBHttpSolrClient.Req> routes = updateRequest.getRoutes(router, col, urlMap, routableParams, this.idField);
if (routes == null) {
return null;
}
final NamedList<Throwable> exceptions = new NamedList<>();
final NamedList<NamedList> shardResponses = new NamedList<>(routes.size()+1); // +1 for deleteQuery
long start = System.nanoTime();
if (parallelUpdates) {

HashJoinStream.java

@@ -47,7 +47,8 @@ public class HashJoinStream extends TupleStream implements Expressible {
protected TupleStream hashStream;
protected TupleStream fullStream;
protected List<String> hashOn;
protected List<String> leftHashOn;
protected List<String> rightHashOn;
protected HashMap<Integer, List<Tuple>> hashedTuples;
protected Tuple workingFullTuple = null;
@@ -97,8 +98,25 @@ public class HashJoinStream extends TupleStream implements Expressible {
private void init(TupleStream fullStream, TupleStream hashStream, List<String> hashOn) throws IOException {
this.fullStream = fullStream;
this.hashStream = hashStream;
this.hashOn = hashOn;
this.hashedTuples = new HashMap<>();
this.leftHashOn = new ArrayList<>();
this.rightHashOn = new ArrayList<>();
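// Each 'on' entry is either "field" (same name on both sides) or "leftField=rightField".
// Illustrative example (hypothetical names): on="personId=ownerId,type_s" yields
// leftHashOn = [personId, type_s] for the full stream and rightHashOn = [ownerId, type_s]
// for the hashed stream.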
for(String hasher : hashOn){
String[] parts = hasher.split("=");
if(1 == parts.length){
String field = parts[0].trim();
leftHashOn.add(field);
rightHashOn.add(field);
}
else if(2 == parts.length){
leftHashOn.add(parts[0].trim());
rightHashOn.add(parts[1].trim());
}
else{
throw new IOException(String.format(Locale.ROOT, "Invalid 'on' parameter - expecting 1 or more instances of 'field' or 'field=hashedField' but found '%s'", hasher));
}
}
}
@Override
@@ -127,12 +145,24 @@ public class HashJoinStream extends TupleStream implements Expressible {
// on
StringBuilder sb = new StringBuilder();
for(String part : hashOn){
for(int idx = 0; idx < leftHashOn.size(); ++idx){
if(sb.length() > 0){ sb.append(","); }
sb.append(part);
// we know that left and right hashOns are the same size
String left = leftHashOn.get(idx);
String right = rightHashOn.get(idx);
if(left.equals(right)){
sb.append(left);
}
else{
sb.append(left);
sb.append("=");
sb.append(right);
}
}
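// e.g. (illustrative names) leftHashOn = [personId, type_s] and rightHashOn = [ownerId, type_s]
// serialize back to on="personId=ownerId,type_s"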
expression.addParameter(new StreamExpressionNamedParameter("on",sb.toString()));
return expression;
}
@@ -168,7 +198,7 @@ public class HashJoinStream extends TupleStream implements Expressible {
Tuple tuple = hashStream.read();
while(!tuple.EOF){
Integer hash = calculateHash(tuple);
Integer hash = calculateHash(tuple, rightHashOn);
if(null != hash){
if(hashedTuples.containsKey(hash)){
hashedTuples.get(hash).add(tuple);
@@ -183,7 +213,7 @@ public class HashJoinStream extends TupleStream implements Expressible {
}
}
protected Integer calculateHash(Tuple tuple){
protected Integer calculateHash(Tuple tuple, List<String> hashOn){
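// hashOn is rightHashOn when hashing tuples read from the hashed stream and leftHashOn
// when hashing tuples read from the full stream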
StringBuilder sb = new StringBuilder();
for(String part : hashOn){
Object obj = tuple.get(part);
@@ -191,7 +221,7 @@ public class HashJoinStream extends TupleStream implements Expressible {
return null;
}
sb.append(obj.toString());
sb.append("::"); // this is here to seperate fields
sb.append("::"); // this is here to separate fields
}
return sb.toString().hashCode();
@@ -215,7 +245,7 @@ public class HashJoinStream extends TupleStream implements Expressible {
// If fullTuple doesn't have a valid hash or if there is no doc to
// join with then retry loop - keep going until we find one
Integer fullHash = calculateHash(fullTuple);
Integer fullHash = calculateHash(fullTuple, leftHashOn);
if(null == fullHash || !hashedTuples.containsKey(fullHash)){
continue findNextWorkingFullTuple;
}

OuterHashJoinStream.java

@@ -65,9 +65,21 @@ public class OuterHashJoinStream extends HashJoinStream implements Expressible {
// on
StringBuilder sb = new StringBuilder();
for(String part : hashOn){
for(int idx = 0; idx < leftHashOn.size(); ++idx){
if(sb.length() > 0){ sb.append(","); }
sb.append(part);
// we know that left and right hashOns are the same size
String left = leftHashOn.get(idx);
String right = rightHashOn.get(idx);
if(left.equals(right)){
sb.append(left);
}
else{
sb.append(left);
sb.append("=");
sb.append(right);
}
}
expression.addParameter(new StreamExpressionNamedParameter("on",sb.toString()));
@@ -87,7 +99,7 @@ public class OuterHashJoinStream extends HashJoinStream implements Expressible {
// If fullTuple doesn't have a valid hash or the hash cannot be found in the hashedTuples then
// return the tuple from fullStream.
// This is an outer join so there is no requirement there be a matching value in the hashed stream
Integer fullHash = calculateHash(fullTuple);
Integer fullHash = calculateHash(fullTuple, leftHashOn);
if(null == fullHash || !hashedTuples.containsKey(fullHash)){
return fullTuple.clone();
}

StreamExpressionTest.java

@@ -482,13 +482,12 @@ public class StreamExpressionTest extends SolrCloudTestCase {
@Test
public void testRandomStream() throws Exception {
new UpdateRequest()
.add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
.add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
.add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
.add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
.add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
.commit(cluster.getSolrClient(), COLLECTION);
UpdateRequest update = new UpdateRequest();
for(int idx = 0; idx < 1000; ++idx){
String idxString = Integer.toString(idx);
update.add(id, idxString, "a_s", "hello" + idxString, "a_i", idxString, "a_f", idxString);
}
update.commit(cluster.getSolrClient(), COLLECTION);
StreamExpression expression;
TupleStream stream;
@@ -503,17 +502,17 @@ public class StreamExpressionTest extends SolrCloudTestCase {
try {
context.setSolrClientCache(cache);
expression = StreamExpressionParser.parse("random(" + COLLECTION + ", q=\"*:*\", rows=\"10\", fl=\"id, a_i\")");
expression = StreamExpressionParser.parse("random(" + COLLECTION + ", q=\"*:*\", rows=\"1000\", fl=\"id, a_i\")");
stream = factory.constructStream(expression);
stream.setStreamContext(context);
List<Tuple> tuples1 = getTuples(stream);
assert (tuples1.size() == 5);
assert (tuples1.size() == 1000);
expression = StreamExpressionParser.parse("random(" + COLLECTION + ", q=\"*:*\", rows=\"10\", fl=\"id, a_i\")");
expression = StreamExpressionParser.parse("random(" + COLLECTION + ", q=\"*:*\", rows=\"1000\", fl=\"id, a_i\")");
stream = factory.constructStream(expression);
stream.setStreamContext(context);
List<Tuple> tuples2 = getTuples(stream);
assert (tuples2.size() == 5);
assert (tuples2.size() == 1000);
boolean different = false;
for (int i = 0; i < tuples1.size(); i++) {
@@ -1453,6 +1452,16 @@ public class StreamExpressionTest extends SolrCloudTestCase {
stream = new HashJoinStream(expression, factory);
tuples = getTuples(stream);
assert(tuples.size() == 0);
// Basic test with "on" mapping
expression = StreamExpressionParser.parse("hashJoin("
+ "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join3_i,ident_s\", sort=\"join1_i asc, join3_i asc, id asc\"),"
+ "hashed=search(collection1, q=\"side_s:right\", fl=\"join1_i,join3_i,ident_s\", sort=\"join1_i asc, join3_i asc\"),"
+ "on=\"join1_i=join3_i\")");
stream = new HashJoinStream(expression, factory);
tuples = getTuples(stream);
assertEquals(17, tuples.size());
assertOrder(tuples, 1, 1, 2, 2, 15, 15, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 7);
}
@@ -1517,6 +1526,15 @@ public class StreamExpressionTest extends SolrCloudTestCase {
assert(tuples.size() == 8);
assertOrder(tuples, 1,15,2,3,4,5,6,7);
// Basic test with "on" mapping
expression = StreamExpressionParser.parse("outerHashJoin("
+ "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\"),"
+ "hashed=search(collection1, q=\"side_s:right\", fl=\"join3_i,join2_s,ident_s\", sort=\"join2_s asc\"),"
+ "on=\"join1_i=join3_i, join2_s\")");
stream = new OuterHashJoinStream(expression, factory);
tuples = getTuples(stream);
assert(tuples.size() == 10);
assertOrder(tuples, 1,1,15,15,2,3,4,5,6,7);
}
@Test