SOLR-10426: Add shuffle Streaming Expression

This commit is contained in:
Joel Bernstein 2017-04-05 17:57:11 -04:00
parent dbd22a6ada
commit 37b6c60548
4 changed files with 199 additions and 42 deletions

View File

@ -74,43 +74,7 @@ import org.apache.solr.client.solrj.io.ops.ConcatOperation;
import org.apache.solr.client.solrj.io.ops.DistinctOperation; import org.apache.solr.client.solrj.io.ops.DistinctOperation;
import org.apache.solr.client.solrj.io.ops.GroupOperation; import org.apache.solr.client.solrj.io.ops.GroupOperation;
import org.apache.solr.client.solrj.io.ops.ReplaceOperation; import org.apache.solr.client.solrj.io.ops.ReplaceOperation;
import org.apache.solr.client.solrj.io.stream.CartesianProductStream; import org.apache.solr.client.solrj.io.stream.*;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.CommitStream;
import org.apache.solr.client.solrj.io.stream.ComplementStream;
import org.apache.solr.client.solrj.io.stream.DaemonStream;
import org.apache.solr.client.solrj.io.stream.ExceptionStream;
import org.apache.solr.client.solrj.io.stream.ExecutorStream;
import org.apache.solr.client.solrj.io.stream.FacetStream;
import org.apache.solr.client.solrj.io.stream.FeaturesSelectionStream;
import org.apache.solr.client.solrj.io.stream.FetchStream;
import org.apache.solr.client.solrj.io.stream.HashJoinStream;
import org.apache.solr.client.solrj.io.stream.HavingStream;
import org.apache.solr.client.solrj.io.stream.InnerJoinStream;
import org.apache.solr.client.solrj.io.stream.IntersectStream;
import org.apache.solr.client.solrj.io.stream.JDBCStream;
import org.apache.solr.client.solrj.io.stream.LeftOuterJoinStream;
import org.apache.solr.client.solrj.io.stream.MergeStream;
import org.apache.solr.client.solrj.io.stream.ModelStream;
import org.apache.solr.client.solrj.io.stream.NullStream;
import org.apache.solr.client.solrj.io.stream.OuterHashJoinStream;
import org.apache.solr.client.solrj.io.stream.ParallelStream;
import org.apache.solr.client.solrj.io.stream.PriorityStream;
import org.apache.solr.client.solrj.io.stream.RandomStream;
import org.apache.solr.client.solrj.io.stream.RankStream;
import org.apache.solr.client.solrj.io.stream.ReducerStream;
import org.apache.solr.client.solrj.io.stream.RollupStream;
import org.apache.solr.client.solrj.io.stream.ScoreNodesStream;
import org.apache.solr.client.solrj.io.stream.SelectStream;
import org.apache.solr.client.solrj.io.stream.SignificantTermsStream;
import org.apache.solr.client.solrj.io.stream.SortStream;
import org.apache.solr.client.solrj.io.stream.StatsStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TextLogitStream;
import org.apache.solr.client.solrj.io.stream.TopicStream;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.UniqueStream;
import org.apache.solr.client.solrj.io.stream.UpdateStream;
import org.apache.solr.client.solrj.io.stream.expr.Explanation; import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible; import org.apache.solr.client.solrj.io.stream.expr.Expressible;
@ -223,6 +187,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
.withFunctionName("priority", PriorityStream.class) .withFunctionName("priority", PriorityStream.class)
.withFunctionName("significantTerms", SignificantTermsStream.class) .withFunctionName("significantTerms", SignificantTermsStream.class)
.withFunctionName("cartesianProduct", CartesianProductStream.class) .withFunctionName("cartesianProduct", CartesianProductStream.class)
.withFunctionName("shuffle", ShuffleStream.class)
// metrics // metrics
.withFunctionName("min", MinMetric.class) .withFunctionName("min", MinMetric.class)

View File

@ -80,7 +80,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
protected String zkHost; protected String zkHost;
protected String collection; protected String collection;
protected SolrParams params; protected SolrParams params;
private Map<String, String> fieldMappings; protected Map<String, String> fieldMappings;
protected StreamComparator comp; protected StreamComparator comp;
private boolean trace; private boolean trace;
protected transient Map<String, Tuple> eofTuples; protected transient Map<String, Tuple> eofTuples;
@ -191,7 +191,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
// functionName(collectionName, param1, param2, ..., paramN, sort="comp", [aliases="field=alias,..."]) // functionName(collectionName, param1, param2, ..., paramN, sort="comp", [aliases="field=alias,..."])
// function name // function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); StreamExpression expression = new StreamExpression(factory.getFunctionName(getClass()));
// collection // collection
expression.addParameter(collection); expression.addParameter(collection);
@ -254,7 +254,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
return explanation; return explanation;
} }
private void init(String collectionName, String zkHost, SolrParams params) throws IOException { protected void init(String collectionName, String zkHost, SolrParams params) throws IOException {
this.zkHost = zkHost; this.zkHost = zkHost;
this.collection = collectionName; this.collection = collectionName;
this.params = new ModifiableSolrParams(params); this.params = new ModifiableSolrParams(params);
@ -406,6 +406,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, true); Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, true);
ModifiableSolrParams mParams = new ModifiableSolrParams(params); ModifiableSolrParams mParams = new ModifiableSolrParams(params);
mParams = adjustParams(mParams);
mParams.set(DISTRIB, "false"); // We are the aggregator. mParams.set(DISTRIB, "false"); // We are the aggregator.
Set<String> liveNodes = clusterState.getLiveNodes(); Set<String> liveNodes = clusterState.getLiveNodes();
@ -571,4 +572,8 @@ public class CloudSolrStream extends TupleStream implements Expressible {
} }
} }
} }
protected ModifiableSolrParams adjustParams(ModifiableSolrParams params) {
return params;
}
} }

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
public class ShuffleStream extends CloudSolrStream implements Expressible {
public ShuffleStream(StreamExpression expression, StreamFactory factory) throws IOException {
// grab all parameters out
String collectionName = factory.getValueOperand(expression, 0);
List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
StreamExpressionNamedParameter aliasExpression = factory.getNamedOperand(expression, "aliases");
StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
// Collection Name
if(null == collectionName){
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression));
}
// Validate there are no unknown parameters - zkHost and alias are namedParameter so we don't need to count it twice
if(expression.getParameters().size() != 1 + namedParams.size()){
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - unknown operands found",expression));
}
// Named parameters - passed directly to solr as solrparams
if(0 == namedParams.size()){
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - at least one named parameter expected. eg. 'q=*:*'",expression));
}
ModifiableSolrParams mParams = new ModifiableSolrParams();
for(StreamExpressionNamedParameter namedParam : namedParams){
if(!namedParam.getName().equals("zkHost") && !namedParam.getName().equals("aliases")){
mParams.add(namedParam.getName(), namedParam.getParameter().toString().trim());
}
}
// Aliases, optional, if provided then need to split
if(null != aliasExpression && aliasExpression.getParameter() instanceof StreamExpressionValue){
fieldMappings = new HashMap<>();
for(String mapping : ((StreamExpressionValue)aliasExpression.getParameter()).getValue().split(",")){
String[] parts = mapping.trim().split("=");
if(2 == parts.length){
fieldMappings.put(parts[0], parts[1]);
}
else{
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - alias expected of the format origName=newName",expression));
}
}
}
// zkHost, optional - if not provided then will look into factory list to get
String zkHost = null;
if(null == zkHostExpression){
zkHost = factory.getCollectionZkHost(collectionName);
if(zkHost == null) {
zkHost = factory.getDefaultZkHost();
}
}
else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
}
if(null == zkHost){
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
}
// We've got all the required items
init(collectionName, zkHost, mParams);
}
public ModifiableSolrParams adjustParams(ModifiableSolrParams mParams) {
mParams.set(CommonParams.QT, "/export");
return mParams;
}
}

View File

@ -1096,7 +1096,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
assertTrue("blah blah blah 9".equals(t.getString("subject"))); assertTrue("blah blah blah 9".equals(t.getString("subject")));
//Change the batch size //Change the batch size
stream = factory.constructStream("fetch("+ COLLECTIONORALIAS +", search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\")"); stream = factory.constructStream("fetch(" + COLLECTIONORALIAS + ", search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\")");
context = new StreamContext(); context = new StreamContext();
context.setSolrClientCache(solrClientCache); context.setSolrClientCache(solrClientCache);
stream.setStreamContext(context); stream.setStreamContext(context);
@ -1602,6 +1602,90 @@ public class StreamExpressionTest extends SolrCloudTestCase {
} }
@Test
public void testParallelShuffleStream() throws Exception {
new UpdateRequest()
.add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
.add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
.add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
.add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
.add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
.add(id, "5", "a_s", "hello1", "a_i", "10", "a_f", "1")
.add(id, "6", "a_s", "hello1", "a_i", "11", "a_f", "5")
.add(id, "7", "a_s", "hello1", "a_i", "12", "a_f", "5")
.add(id, "8", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "9", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "10", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "11", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "12", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "13", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "14", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "15", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "16", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "17", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "18", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "19", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "20", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "21", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "22", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "23", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "24", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "25", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "26", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "27", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "28", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "29", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "30", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "31", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "32", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "33", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "34", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "35", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "36", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "37", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "38", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "39", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "40", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "41", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "42", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "43", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "44", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "45", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "46", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "47", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "48", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "49", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "50", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "51", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "52", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "53", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "54", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "55", "a_s", "hello1", "a_i", "13", "a_f", "4")
.add(id, "56", "a_s", "hello1", "a_i", "13", "a_f", "1000")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
String zkHost = cluster.getZkServer().getZkAddress();
StreamFactory streamFactory = new StreamFactory().withCollectionZkHost(COLLECTIONORALIAS, zkHost)
.withFunctionName("shuffle", ShuffleStream.class)
.withFunctionName("unique", UniqueStream.class)
.withFunctionName("parallel", ParallelStream.class);
ParallelStream pstream = (ParallelStream)streamFactory.constructStream("parallel(" + COLLECTIONORALIAS + ", unique(shuffle(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\"), over=\"a_f\"), workers=\"2\", zkHost=\""+zkHost+"\", sort=\"a_f asc\")");
List<Tuple> tuples = getTuples(pstream);
assert(tuples.size() == 6);
assertOrder(tuples, 0, 1, 3, 4, 6, 56);
//Test the eofTuples
Map<String,Tuple> eofTuples = pstream.getEofTuples();
assert(eofTuples.size() == 2); //There should be an EOF tuple for each worker.
assert(pstream.toExpression(streamFactory).toString().contains("shuffle"));
}
@Test @Test
public void testParallelReducerStream() throws Exception { public void testParallelReducerStream() throws Exception {