SOLR-8467: CloudSolrStream and FacetStream should take a SolrParams object rather than a Map<String, String> to allow more complex Solr queries to be specified

This commit is contained in:
Erick Erickson 2016-05-09 12:37:32 -07:00
parent 7571e747c3
commit f4359ff8ff
16 changed files with 717 additions and 670 deletions

View File

@ -273,6 +273,9 @@ Other Changes
* SOLR-8458: Add Streaming Expressions tests for parameter substitution (Joel Bernstein, Cao Manh Dat, Dennis Gove, Kevin Risden)
* SOLR-8467: CloudSolrStream and FacetStream should take a SolrParams object rather than a
Map<String, String> to allow more complex Solr queries to be specified. (Erick Erickson)
================== 6.0.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release

View File

@ -144,9 +144,8 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe
}
private SolrParams adjustParams(SolrParams params) {
ModifiableSolrParams adjustedParams = new ModifiableSolrParams();
adjustedParams.add(params);
adjustedParams.add(CommonParams.OMIT_HEADER, "true");
ModifiableSolrParams adjustedParams = new ModifiableSolrParams(params);
adjustedParams.set(CommonParams.OMIT_HEADER, "true");
return adjustedParams;
}
@ -230,18 +229,18 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe
String zkHost = tableSpec.zkHost;
String collection = tableSpec.collection;
Map<String, String> params = new HashMap();
ModifiableSolrParams params = new ModifiableSolrParams();
params.put(CommonParams.FL, fl);
params.put(CommonParams.Q, sqlVisitor.query);
params.set(CommonParams.FL, fl);
params.set(CommonParams.Q, sqlVisitor.query);
//Always use the /export handler for Group By Queries because it requires exporting full result sets.
params.put(CommonParams.QT, "/export");
params.set(CommonParams.QT, "/export");
if(numWorkers > 1) {
params.put("partitionKeys", getPartitionKeys(buckets));
params.set("partitionKeys", getPartitionKeys(buckets));
}
params.put("sort", sort);
params.set("sort", sort);
TupleStream tupleStream = null;
@ -370,18 +369,18 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe
String zkHost = tableSpec.zkHost;
String collection = tableSpec.collection;
Map<String, String> params = new HashMap();
ModifiableSolrParams params = new ModifiableSolrParams();
params.put(CommonParams.FL, fl);
params.put(CommonParams.Q, sqlVisitor.query);
params.set(CommonParams.FL, fl);
params.set(CommonParams.Q, sqlVisitor.query);
//Always use the /export handler for Distinct Queries because it requires exporting full result sets.
params.put(CommonParams.QT, "/export");
params.set(CommonParams.QT, "/export");
if(numWorkers > 1) {
params.put("partitionKeys", getPartitionKeys(buckets));
params.set("partitionKeys", getPartitionKeys(buckets));
}
params.put("sort", sort);
params.set("sort", sort);
TupleStream tupleStream = null;
@ -463,9 +462,9 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe
String zkHost = tableSpec.zkHost;
String collection = tableSpec.collection;
Map<String, String> params = new HashMap();
ModifiableSolrParams params = new ModifiableSolrParams();
params.put(CommonParams.Q, sqlVisitor.query);
params.set(CommonParams.Q, sqlVisitor.query);
int limit = sqlVisitor.limit > 0 ? sqlVisitor.limit : 100;
@ -512,9 +511,9 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe
String zkHost = tableSpec.zkHost;
String collection = tableSpec.collection;
Map<String, String> params = new HashMap();
ModifiableSolrParams params = new ModifiableSolrParams();
params.put(CommonParams.Q, sqlVisitor.query);
params.set(CommonParams.Q, sqlVisitor.query);
int limit = sqlVisitor.limit > 0 ? sqlVisitor.limit : 100;
@ -628,22 +627,22 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe
}
}
Map<String, String> params = new HashMap();
params.put("fl", fl.toString());
params.put("q", sqlVisitor.query);
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("fl", fl.toString());
params.set("q", sqlVisitor.query);
if(siBuf.length() > 0) {
params.put("sort", siBuf.toString());
params.set("sort", siBuf.toString());
}
TupleStream tupleStream;
if(sqlVisitor.limit > -1) {
params.put("rows", Integer.toString(sqlVisitor.limit));
params.set("rows", Integer.toString(sqlVisitor.limit));
tupleStream = new LimitStream(new CloudSolrStream(zkHost, collection, params), sqlVisitor.limit);
} else {
//Only use the export handler when no limit is specified.
params.put(CommonParams.QT, "/export");
params.set(CommonParams.QT, "/export");
tupleStream = new CloudSolrStream(zkHost, collection, params);
}
@ -681,9 +680,9 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe
String zkHost = tableSpec.zkHost;
String collection = tableSpec.collection;
Map<String, String> params = new HashMap();
ModifiableSolrParams params = new ModifiableSolrParams();
params.put(CommonParams.Q, sqlVisitor.query);
params.set(CommonParams.Q, sqlVisitor.query);
TupleStream tupleStream = new StatsStream(zkHost,
collection,

View File

@ -45,7 +45,10 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParamete
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
public class GatherNodesStream extends TupleStream implements Expressible {
@ -404,7 +407,7 @@ public class GatherNodesStream extends TupleStream implements Expressible {
public List<Tuple> call() {
Map joinParams = new HashMap();
Set<String> flSet = new HashSet();
flSet.add(gather);
flSet.add(traverseTo);
@ -436,10 +439,10 @@ public class GatherNodesStream extends TupleStream implements Expressible {
}
}
joinParams.putAll(queryParams);
joinParams.put("fl", buf.toString());
joinParams.put("qt", "/export");
joinParams.put("sort", gather + " asc,"+traverseTo +" asc");
ModifiableSolrParams joinSParams = new ModifiableSolrParams(SolrParams.toMultiMap(new NamedList(queryParams)));
joinSParams.set("fl", buf.toString());
joinSParams.set("qt", "/export");
joinSParams.set("sort", gather + " asc,"+traverseTo +" asc");
StringBuffer nodeQuery = new StringBuffer();
@ -454,14 +457,14 @@ public class GatherNodesStream extends TupleStream implements Expressible {
if(maxDocFreq > -1) {
String docFreqParam = " maxDocFreq="+maxDocFreq;
joinParams.put("q", "{!graphTerms f=" + traverseTo + docFreqParam + "}" + nodeQuery.toString());
joinSParams.set("q", "{!graphTerms f=" + traverseTo + docFreqParam + "}" + nodeQuery.toString());
} else {
joinParams.put("q", "{!terms f=" + traverseTo+"}" + nodeQuery.toString());
joinSParams.set("q", "{!terms f=" + traverseTo+"}" + nodeQuery.toString());
}
TupleStream stream = null;
try {
stream = new UniqueStream(new CloudSolrStream(zkHost, collection, joinParams), new MultipleFieldEqualitor(new FieldEqualitor(gather), new FieldEqualitor(traverseTo)));
stream = new UniqueStream(new CloudSolrStream(zkHost, collection, joinSParams), new MultipleFieldEqualitor(new FieldEqualitor(gather), new FieldEqualitor(traverseTo)));
stream.setStreamContext(streamContext);
stream.open();
BATCH:

View File

@ -46,6 +46,9 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
@ -65,8 +68,9 @@ public class ShortestPathStream extends TupleStream implements Expressible {
private boolean found;
private StreamContext streamContext;
private int threads;
private Map<String,String> queryParams;
private SolrParams queryParams;
@Deprecated
public ShortestPathStream(String zkHost,
String collection,
String fromNode,
@ -78,6 +82,29 @@ public class ShortestPathStream extends TupleStream implements Expressible {
int threads,
int maxDepth) {
init(zkHost,
collection,
fromNode,
toNode,
fromField,
toField,
new MapSolrParams(queryParams),
joinBatchSize,
threads,
maxDepth);
}
public ShortestPathStream(String zkHost,
String collection,
String fromNode,
String toNode,
String fromField,
String toField,
SolrParams queryParams,
int joinBatchSize,
int threads,
int maxDepth) {
init(zkHost,
collection,
fromNode,
@ -162,7 +189,7 @@ public class ShortestPathStream extends TupleStream implements Expressible {
maxDepth = Integer.parseInt(((StreamExpressionValue) depthExpression.getParameter()).getValue());
}
Map<String,String> params = new HashMap<String,String>();
ModifiableSolrParams params = new ModifiableSolrParams();
for(StreamExpressionNamedParameter namedParam : namedParams){
if(!namedParam.getName().equals("zkHost") &&
!namedParam.getName().equals("to") &&
@ -172,7 +199,7 @@ public class ShortestPathStream extends TupleStream implements Expressible {
!namedParam.getName().equals("threads") &&
!namedParam.getName().equals("partitionSize"))
{
params.put(namedParam.getName(), namedParam.getParameter().toString().trim());
params.set(namedParam.getName(), namedParam.getParameter().toString().trim());
}
}
@ -201,7 +228,7 @@ public class ShortestPathStream extends TupleStream implements Expressible {
String toNode,
String fromField,
String toField,
Map queryParams,
SolrParams queryParams,
int joinBatchSize,
int threads,
int maxDepth) {
@ -225,10 +252,10 @@ public class ShortestPathStream extends TupleStream implements Expressible {
// collection
expression.addParameter(collection);
Set<Map.Entry<String,String>> entries = queryParams.entrySet();
// parameters
for(Map.Entry param : entries){
String value = param.getValue().toString();
ModifiableSolrParams mParams = new ModifiableSolrParams(queryParams);
for(Map.Entry<String, String[]> param : mParams.getMap().entrySet()){
String value = String.join(",", param.getValue());
// SOLR-8409: This is a special case where the params contain a " character
// Do note that in any other BASE streams with parameters where a " might come into play
@ -263,7 +290,8 @@ public class ShortestPathStream extends TupleStream implements Expressible {
child.setFunctionName("solr (graph)");
child.setImplementingClass("Solr/Lucene");
child.setExpressionType(ExpressionType.DATASTORE);
child.setExpression(queryParams.entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
ModifiableSolrParams mParams = new ModifiableSolrParams(queryParams);
child.setExpression(mParams.getMap().entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
explanation.addChild(child);
return explanation;
@ -417,13 +445,12 @@ public class ShortestPathStream extends TupleStream implements Expressible {
public List<Edge> call() {
Map joinParams = new HashMap();
ModifiableSolrParams joinParams = new ModifiableSolrParams(queryParams);
String fl = fromField + "," + toField;
joinParams.putAll(queryParams);
joinParams.put("fl", fl);
joinParams.put("qt", "/export");
joinParams.put("sort", toField + " asc,"+fromField +" asc");
joinParams.set("fl", fl);
joinParams.set("qt", "/export");
joinParams.set("sort", toField + " asc,"+fromField +" asc");
StringBuffer nodeQuery = new StringBuffer();
@ -433,7 +460,7 @@ public class ShortestPathStream extends TupleStream implements Expressible {
String q = fromField + ":(" + nodeQuery.toString().trim() + ")";
joinParams.put("q", q);
joinParams.set("q", q);
TupleStream stream = null;
try {
stream = new UniqueStream(new CloudSolrStream(zkHost, collection, joinParams), new MultipleFieldEqualitor(new FieldEqualitor(toField), new FieldEqualitor(fromField)));

View File

@ -26,8 +26,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.Random;
import org.apache.solr.client.solrj.io.stream.SolrStream;
@ -37,6 +35,7 @@ import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
class StatementImpl implements Statement {
@ -96,11 +95,11 @@ class StatementImpl implements Statement {
Collections.shuffle(shuffler, new Random());
Map<String, String> params = new HashMap<>();
params.put(CommonParams.QT, "/sql");
params.put("stmt", sql);
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CommonParams.QT, "/sql");
params.set("stmt", sql);
for(String propertyName : this.connection.getProperties().stringPropertyNames()) {
params.put(propertyName, this.connection.getProperties().getProperty(propertyName));
params.set(propertyName, this.connection.getProperties().getProperty(propertyName));
}
Replica rep = shuffler.get(0);

View File

@ -36,7 +36,6 @@ import java.util.stream.Collectors;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
@ -56,6 +55,9 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
@ -72,16 +74,11 @@ public class CloudSolrStream extends TupleStream implements Expressible {
protected String zkHost;
protected String collection;
protected Map<String,String> params;
protected SolrParams params;
private Map<String, String> fieldMappings;
protected StreamComparator comp;
private int zkConnectTimeout = 10000;
private int zkClientTimeout = 10000;
private int numWorkers;
private int workerID;
private boolean trace;
protected transient Map<String, Tuple> eofTuples;
protected transient SolrClientCache cache;
protected transient CloudSolrClient cloudSolrClient;
protected transient List<TupleStream> solrStreams;
protected transient TreeSet<TupleWrapper> tuples;
@ -91,7 +88,34 @@ public class CloudSolrStream extends TupleStream implements Expressible {
protected CloudSolrStream(){
}
/**
* @param zkHost Zookeeper ensemble connection string
* @param collectionName Name of the collection to operate on
* @param params Map&lt;String, String&gt; of parameter/value pairs
* @throws IOException Something went wrong
* <p>
* This form does not allow specifying multiple clauses, say "fq" clauses, use the form that
* takes a SolrParams. Transition code can call the preferred method that takes SolrParams
* by calling CloudSolrStream(zkHost, collectionName,
* new ModifiableSolrParams(SolrParams.toMultiMap(new NamedList(Map&lt;String, String&gt;)));
* @deprecated Use the constructor that has a SolrParams obj rather than a Map
*/
@Deprecated
public CloudSolrStream(String zkHost, String collectionName, Map params) throws IOException {
init(collectionName, zkHost, new MapSolrParams(params));
}
/**
* @param zkHost Zookeeper ensemble connection string
* @param collectionName Name of the collection to operate on
* @param params Map&lt;String, String[]&gt; of parameter/value pairs
* @throws IOException Something went wrong
*/
public CloudSolrStream(String zkHost, String collectionName, SolrParams params) throws IOException {
init(collectionName, zkHost, params);
}
@ -117,16 +141,16 @@ public class CloudSolrStream extends TupleStream implements Expressible {
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - at least one named parameter expected. eg. 'q=*:*'",expression));
}
Map<String,String> params = new HashMap<String,String>();
ModifiableSolrParams mParams = new ModifiableSolrParams();
for(StreamExpressionNamedParameter namedParam : namedParams){
if(!namedParam.getName().equals("zkHost") && !namedParam.getName().equals("aliases")){
params.put(namedParam.getName(), namedParam.getParameter().toString().trim());
mParams.add(namedParam.getName(), namedParam.getParameter().toString().trim());
}
}
// Aliases, optional, if provided then need to split
if(null != aliasExpression && aliasExpression.getParameter() instanceof StreamExpressionValue){
fieldMappings = new HashMap<String,String>();
fieldMappings = new HashMap<>();
for(String mapping : ((StreamExpressionValue)aliasExpression.getParameter()).getValue().split(",")){
String[] parts = mapping.trim().split("=");
if(2 == parts.length){
@ -154,7 +178,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
}
// We've got all the required items
init(collectionName, zkHost, params);
init(collectionName, zkHost, mParams);
}
@Override
@ -168,8 +192,10 @@ public class CloudSolrStream extends TupleStream implements Expressible {
expression.addParameter(collection);
// parameters
for(Entry<String,String> param : params.entrySet()){
String value = param.getValue();
ModifiableSolrParams mParams = new ModifiableSolrParams(SolrParams.toMultiMap(params.toNamedList()));
for (Entry<String, String[]> param : mParams.getMap().entrySet()) {
String value = String.join(",", param.getValue());
// SOLR-8409: This is a special case where the params contain a " character
// Do note that in any other BASE streams with parameters where a " might come into play
@ -213,29 +239,34 @@ public class CloudSolrStream extends TupleStream implements Expressible {
child.setFunctionName(String.format(Locale.ROOT, "solr (%s)", collection));
child.setImplementingClass("Solr/Lucene");
child.setExpressionType(ExpressionType.DATASTORE);
if(null != params){
child.setExpression(params.entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
ModifiableSolrParams mParams = new ModifiableSolrParams(params);
child.setExpression(mParams.getMap().entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
}
explanation.addChild(child);
return explanation;
}
private void init(String collectionName, String zkHost, Map params) throws IOException {
private void init(String collectionName, String zkHost, SolrParams params) throws IOException {
this.zkHost = zkHost;
this.collection = collectionName;
this.params = params;
this.params = new ModifiableSolrParams(params);
// If the comparator is null then it was not explicitly set so we will create one using the sort parameter
// of the query. While doing this we will also take into account any aliases such that if we are sorting on
// fieldA but fieldA is aliased to alias.fieldA then the comparater will be against alias.fieldA.
if(!params.containsKey("fl")){
String fls = String.join(",", params.getParams("fl"));
if (fls == null) {
throw new IOException("fl param expected for a stream");
}
if(!params.containsKey("sort")){
String sorts = String.join(",", params.getParams("sort"));
if (sorts == null) {
throw new IOException("sort param expected for a stream");
}
this.comp = parseComp((String)params.get("sort"), (String)params.get("fl"));
this.comp = parseComp(sorts, fls);
}
public void setFieldMappings(Map<String, String> fieldMappings) {
@ -247,9 +278,6 @@ public class CloudSolrStream extends TupleStream implements Expressible {
}
public void setStreamContext(StreamContext context) {
this.numWorkers = context.numWorkers;
this.workerID = context.workerID;
this.cache = context.getSolrClientCache();
this.streamContext = context;
}
@ -261,8 +289,8 @@ public class CloudSolrStream extends TupleStream implements Expressible {
this.tuples = new TreeSet();
this.solrStreams = new ArrayList();
this.eofTuples = Collections.synchronizedMap(new HashMap());
if(this.cache != null) {
this.cloudSolrClient = this.cache.getCloudSolrClient(zkHost);
if (this.streamContext != null && this.streamContext.getSolrClientCache() != null) {
this.cloudSolrClient = this.streamContext.getSolrClientCache().getCloudSolrClient(zkHost);
} else {
this.cloudSolrClient = new Builder()
.withZkHost(zkHost)
@ -345,7 +373,8 @@ public class CloudSolrStream extends TupleStream implements Expressible {
}
}
params.put("distrib","false"); // We are the aggregator.
ModifiableSolrParams mParams = new ModifiableSolrParams(params);
mParams.set("distrib", "false"); // We are the aggregator.
for(Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
@ -359,7 +388,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
Replica rep = shuffler.get(0);
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
String url = zkProps.getCoreUrl();
SolrStream solrStream = new SolrStream(url, params);
SolrStream solrStream = new SolrStream(url, mParams);
if(streamContext != null) {
solrStream.setStreamContext(streamContext);
}
@ -406,7 +435,9 @@ public class CloudSolrStream extends TupleStream implements Expressible {
}
}
if(cache == null && cloudSolrClient != null) {
if ((this.streamContext == null || this.streamContext.getSolrClientCache() == null) &&
cloudSolrClient != null) {
cloudSolrClient.close();
}
}

View File

@ -46,7 +46,9 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.metrics.Bucket;
import org.apache.solr.client.solrj.io.stream.metrics.Metric;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
/**
@ -65,11 +67,16 @@ public class FacetStream extends TupleStream implements Expressible {
private List<Tuple> tuples = new ArrayList<Tuple>();
private int index;
private String zkHost;
private Map<String, String> props;
private SolrParams params;
private String collection;
protected transient SolrClientCache cache;
protected transient CloudSolrClient cloudSolrClient;
/*
*
* @deprecated. Use the form that takes a SolrParams rather than Map&ltString, String&gt;
*/
@Deprecated
public FacetStream(String zkHost,
String collection,
Map<String, String> props,
@ -77,7 +84,17 @@ public class FacetStream extends TupleStream implements Expressible {
Metric[] metrics,
FieldComparator[] bucketSorts,
int bucketSizeLimit) throws IOException {
init(collection, props, buckets, bucketSorts, metrics, bucketSizeLimit, zkHost);
init(collection, new MapSolrParams(props), buckets, bucketSorts, metrics, bucketSizeLimit, zkHost);
}
public FacetStream(String zkHost,
String collection,
SolrParams params,
Bucket[] buckets,
Metric[] metrics,
FieldComparator[] bucketSorts,
int bucketSizeLimit) throws IOException {
init(collection, params, buckets, bucketSorts, metrics, bucketSizeLimit, zkHost);
}
public FacetStream(StreamExpression expression, StreamFactory factory) throws IOException{
@ -106,10 +123,10 @@ public class FacetStream extends TupleStream implements Expressible {
}
// pull out known named params
Map<String,String> params = new HashMap<String,String>();
ModifiableSolrParams params = new ModifiableSolrParams();
for(StreamExpressionNamedParameter namedParam : namedParams){
if(!namedParam.getName().equals("zkHost") && !namedParam.getName().equals("buckets") && !namedParam.getName().equals("bucketSorts") && !namedParam.getName().equals("limit")){
params.put(namedParam.getName(), namedParam.getParameter().toString().trim());
params.add(namedParam.getName(), namedParam.getParameter().toString().trim());
}
}
@ -205,9 +222,9 @@ public class FacetStream extends TupleStream implements Expressible {
return comps;
}
private void init(String collection, Map<String, String> props, Bucket[] buckets, FieldComparator[] bucketSorts, Metric[] metrics, int bucketSizeLimit, String zkHost) throws IOException {
private void init(String collection, SolrParams params, Bucket[] buckets, FieldComparator[] bucketSorts, Metric[] metrics, int bucketSizeLimit, String zkHost) throws IOException {
this.zkHost = zkHost;
this.props = props;
this.params = params;
this.buckets = buckets;
this.metrics = metrics;
this.bucketSizeLimit = bucketSizeLimit;
@ -233,8 +250,11 @@ public class FacetStream extends TupleStream implements Expressible {
expression.addParameter(collection);
// parameters
for(Entry<String,String> param : props.entrySet()){
expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), param.getValue()));
ModifiableSolrParams tmpParams = new ModifiableSolrParams(params);
for (Entry<String, String[]> param : tmpParams.getMap().entrySet()) {
expression.addParameter(new StreamExpressionNamedParameter(param.getKey(),
String.join(",", param.getValue())));
}
// buckets
@ -289,7 +309,9 @@ public class FacetStream extends TupleStream implements Expressible {
child.setImplementingClass("Solr/Lucene");
child.setExpressionType(ExpressionType.DATASTORE);
child.setExpression(props.entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
ModifiableSolrParams tmpParams = new ModifiableSolrParams(SolrParams.toMultiMap(params.toNamedList()));
child.setExpression(tmpParams.getMap().entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
explanation.addChild(child);
@ -301,8 +323,7 @@ public class FacetStream extends TupleStream implements Expressible {
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList();
return l;
return new ArrayList();
}
public void open() throws IOException {
@ -317,11 +338,11 @@ public class FacetStream extends TupleStream implements Expressible {
FieldComparator[] adjustedSorts = adjustSorts(buckets, bucketSorts);
String json = getJsonFacetString(buckets, metrics, adjustedSorts, bucketSizeLimit);
ModifiableSolrParams params = getParams(this.props);
params.add("json.facet", json);
params.add("rows", "0");
ModifiableSolrParams paramsLoc = new ModifiableSolrParams(params);
paramsLoc.set("json.facet", json);
paramsLoc.set("rows", "0");
QueryRequest request = new QueryRequest(params);
QueryRequest request = new QueryRequest(paramsLoc);
try {
NamedList response = cloudSolrClient.request(request, collection);
getTuples(response, buckets, metrics);
@ -350,15 +371,6 @@ public class FacetStream extends TupleStream implements Expressible {
}
}
private ModifiableSolrParams getParams(Map<String, String> props) {
ModifiableSolrParams params = new ModifiableSolrParams();
for(String key : props.keySet()) {
String value = props.get(key);
params.add(key, value);
}
return params;
}
private String getJsonFacetString(Bucket[] _buckets, Metric[] _metrics, FieldComparator[] _sorts, int _limit) {
StringBuilder buf = new StringBuilder();
appendJson(buf, _buckets, _metrics, _sorts, _limit, 0);

View File

@ -16,21 +16,15 @@
*/
package org.apache.solr.client.solrj.io.stream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Set;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import java.util.Random;
import org.apache.solr.client.solrj.io.Tuple;
@ -49,7 +43,7 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Base64;
import org.apache.solr.common.params.ModifiableSolrParams;
/**
* The ParallelStream decorates a TupleStream implementation and pushes it to N workers for parallel execution.
@ -287,16 +281,17 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
Collections.shuffle(shuffler, new Random());
for(int w=0; w<workers; w++) {
HashMap params = new HashMap();
params.put("distrib","false"); // We are the aggregator.
params.put("numWorkers", workers);
params.put("workerID", w);
params.put("expr", pushStream);
params.put("qt","/stream");
ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
paramsLoc.set("distrib","false"); // We are the aggregator.
paramsLoc.set("numWorkers", workers);
paramsLoc.set("workerID", w);
paramsLoc.set("expr", pushStream.toString());
paramsLoc.set("qt","/stream");
Replica rep = shuffler.get(w);
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
String url = zkProps.getCoreUrl();
SolrStream solrStream = new SolrStream(url, params);
SolrStream solrStream = new SolrStream(url, paramsLoc);
solrStreams.add(solrStream);
}

View File

@ -32,6 +32,7 @@ import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.slf4j.Logger;
@ -48,7 +49,7 @@ public class SolrStream extends TupleStream {
private static final long serialVersionUID = 1;
private String baseUrl;
private Map params;
private SolrParams params;
private int numWorkers;
private int workerID;
private boolean trace;
@ -59,7 +60,25 @@ public class SolrStream extends TupleStream {
private String slice;
private long checkpoint = -1;
/**
* @param baseUrl Base URL of the stream.
* @param params Map&lt;String, String&gt; of parameters
* @deprecated, use the form that thakes SolrParams. Existing code can use
* new ModifiableSolrParams(SolrParams.toMultiMap(new NamedList(params)))
* for existing calls that use Map&lt;String, String&gt;
*/
@Deprecated
public SolrStream(String baseUrl, Map params) {
this.baseUrl = baseUrl;
this.params = new ModifiableSolrParams(new MapSolrParams(params));
}
/**
* @param baseUrl Base URL of the stream.
* @param params Map&lt;String, String&gt; of parameters
*/
public SolrStream(String baseUrl, SolrParams params) {
this.baseUrl = baseUrl;
this.params = params;
}
@ -118,9 +137,9 @@ public class SolrStream extends TupleStream {
this.checkpoint = checkpoint;
}
private SolrParams loadParams(Map params) throws IOException {
ModifiableSolrParams solrParams = new ModifiableSolrParams();
if(params.containsKey("partitionKeys")) {
private SolrParams loadParams(SolrParams paramsIn) throws IOException {
ModifiableSolrParams solrParams = new ModifiableSolrParams(paramsIn);
if (params.get("partitionKeys") != null) {
if(!params.get("partitionKeys").equals("none")) {
String partitionFilter = getPartitionFilter();
solrParams.add("fq", partitionFilter);
@ -135,12 +154,6 @@ public class SolrStream extends TupleStream {
solrParams.add("fq", "{!frange cost=100 incl=false l="+checkpoint+"}_version_");
}
Iterator<Map.Entry> it = params.entrySet().iterator();
while(it.hasNext()) {
Map.Entry entry = it.next();
solrParams.add((String)entry.getKey(), entry.getValue().toString());
}
return solrParams;
}

View File

@ -42,7 +42,9 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.metrics.Metric;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
public class StatsStream extends TupleStream implements Expressible {
@ -52,7 +54,7 @@ public class StatsStream extends TupleStream implements Expressible {
private Metric[] metrics;
private String zkHost;
private Tuple tuple;
private Map<String, String> props;
private SolrParams params;
private String collection;
private boolean done;
private long count;
@ -60,16 +62,25 @@ public class StatsStream extends TupleStream implements Expressible {
protected transient SolrClientCache cache;
protected transient CloudSolrClient cloudSolrClient;
// Use StatsStream(String, String, SolrParams, Metric[]
@Deprecated
public StatsStream(String zkHost,
String collection,
Map<String, String> props,
Metric[] metrics) {
init(zkHost, collection, props, metrics);
init(zkHost, collection, new MapSolrParams(props), metrics);
}
private void init(String zkHost, String collection, Map<String, String> props, Metric[] metrics) {
public StatsStream(String zkHost,
String collection,
SolrParams params,
Metric[] metrics) {
init(zkHost, collection, params, metrics);
}
private void init(String zkHost, String collection, SolrParams params, Metric[] metrics) {
this.zkHost = zkHost;
this.props = props;
this.params = params;
this.metrics = metrics;
this.collection = collection;
}
@ -96,10 +107,10 @@ public class StatsStream extends TupleStream implements Expressible {
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - at least one named parameter expected. eg. 'q=*:*'",expression));
}
Map<String,String> params = new HashMap<String,String>();
ModifiableSolrParams params = new ModifiableSolrParams();
for(StreamExpressionNamedParameter namedParam : namedParams){
if(!namedParam.getName().equals("zkHost")){
params.put(namedParam.getName(), namedParam.getParameter().toString().trim());
params.set(namedParam.getName(), namedParam.getParameter().toString().trim());
}
}
@ -139,8 +150,9 @@ public class StatsStream extends TupleStream implements Expressible {
expression.addParameter(collection);
// parameters
for(Entry<String,String> param : props.entrySet()){
expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), param.getValue()));
ModifiableSolrParams mParams = new ModifiableSolrParams(params);
for (Entry<String, String[]> param : mParams.getMap().entrySet()) {
expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), String.join(",", param.getValue())));
}
// zkHost
@ -171,7 +183,8 @@ public class StatsStream extends TupleStream implements Expressible {
child.setImplementingClass("Solr/Lucene");
child.setExpressionType(ExpressionType.DATASTORE);
child.setExpression(props.entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
ModifiableSolrParams mParams = new ModifiableSolrParams(params);
child.setExpression(mParams.getMap().entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
explanation.addChild(child);
return explanation;
@ -195,12 +208,12 @@ public class StatsStream extends TupleStream implements Expressible {
.build();
}
ModifiableSolrParams params = getParams(this.props);
addStats(params, metrics);
params.add("stats", "true");
params.add("rows", "0");
ModifiableSolrParams paramsLoc = new ModifiableSolrParams(this.params);
addStats(paramsLoc, metrics);
paramsLoc.set("stats", "true");
paramsLoc.set("rows", "0");
QueryRequest request = new QueryRequest(params);
QueryRequest request = new QueryRequest(paramsLoc);
try {
NamedList response = cloudSolrClient.request(request, collection);
this.tuple = getTuple(response);
@ -275,15 +288,6 @@ public class StatsStream extends TupleStream implements Expressible {
}
}
private ModifiableSolrParams getParams(Map<String, String> props) {
ModifiableSolrParams params = new ModifiableSolrParams();
for(String key : props.keySet()) {
String value = props.get(key);
params.add(key, value);
}
return params;
}
private Tuple getTuple(NamedList response) {
Map map = new HashMap();

View File

@ -56,6 +56,9 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.slf4j.Logger;
@ -75,6 +78,8 @@ public class TopicStream extends CloudSolrStream implements Expressible {
private Map<String, Long> checkpoints = new HashMap<String, Long>();
private String checkpointCollection;
// Use TopicStream that takes a SolrParams
@Deprecated
public TopicStream(String zkHost,
String checkpointCollection,
String collection,
@ -86,25 +91,42 @@ public class TopicStream extends CloudSolrStream implements Expressible {
collection,
id,
checkpointEvery,
params);
new MapSolrParams(params));
}
public TopicStream(String zkHost,
String checkpointCollection,
String collection,
String id,
long checkpointEvery,
SolrParams params) {
init(zkHost,
checkpointCollection,
collection,
id,
checkpointEvery,
params);
}
private void init(String zkHost,
String checkpointCollection,
String collection,
String id,
long checkpointEvery,
Map<String, String> params) {
SolrParams params) {
this.zkHost = zkHost;
this.params = params;
ModifiableSolrParams mParams = new ModifiableSolrParams(params);
if(mParams.getParams("rows") == null) {
mParams.set("rows", "500");
}
this.params = mParams;
this.collection = collection;
this.checkpointCollection = checkpointCollection;
this.checkpointEvery = checkpointEvery;
this.id = id;
this.comp = new FieldComparator("_version_", ComparatorOrder.ASCENDING);
if(!params.containsKey("rows")) {
params.put("rows", "500");
}
}
public TopicStream(StreamExpression expression, StreamFactory factory) throws IOException{
@ -147,12 +169,12 @@ public class TopicStream extends CloudSolrStream implements Expressible {
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - at least one named parameter expected. eg. 'q=*:*'",expression));
}
Map<String,String> params = new HashMap<String,String>();
ModifiableSolrParams params = new ModifiableSolrParams();
for(StreamExpressionNamedParameter namedParam : namedParams){
if(!namedParam.getName().equals("zkHost") &&
!namedParam.getName().equals("id") &&
!namedParam.getName().equals("checkpointEvery")) {
params.put(namedParam.getName(), namedParam.getParameter().toString().trim());
params.set(namedParam.getName(), namedParam.getParameter().toString().trim());
}
}
@ -189,8 +211,9 @@ public class TopicStream extends CloudSolrStream implements Expressible {
// collection
expression.addParameter(collection);
for(Entry<String,String> param : params.entrySet()) {
String value = param.getValue();
ModifiableSolrParams mParams = new ModifiableSolrParams(params);
for(Entry<String, String[]> param : mParams.getMap().entrySet()) {
String value = String.join(",", param.getValue());
// SOLR-8409: This is a special case where the params contain a " character
// Do note that in any other BASE streams with parameters where a " might come into play
@ -227,7 +250,9 @@ public class TopicStream extends CloudSolrStream implements Expressible {
child.setImplementingClass("Solr/Lucene");
child.setExpressionType(ExpressionType.DATASTORE);
child.setExpression(params.entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
ModifiableSolrParams mParams = new ModifiableSolrParams(params);
child.setExpression(mParams.getMap().entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
explanation.addChild(child);
}
@ -254,8 +279,8 @@ public class TopicStream extends CloudSolrStream implements Expressible {
this.solrStreams = new ArrayList();
this.eofTuples = Collections.synchronizedMap(new HashMap());
if(cache != null) {
cloudSolrClient = cache.getCloudSolrClient(zkHost);
if(streamContext.getSolrClientCache() != null) {
cloudSolrClient = streamContext.getSolrClientCache().getCloudSolrClient(zkHost);
} else {
cloudSolrClient = new Builder()
.withZkHost(zkHost)
@ -313,7 +338,7 @@ public class TopicStream extends CloudSolrStream implements Expressible {
}
}
if (cache == null) {
if (streamContext.getSolrClientCache() == null) {
cloudSolrClient.close();
}
}
@ -369,11 +394,11 @@ public class TopicStream extends CloudSolrStream implements Expressible {
private long getCheckpoint(Slice slice, Set<String> liveNodes) throws IOException {
Collection<Replica> replicas = slice.getReplicas();
long checkpoint = -1;
Map params = new HashMap();
params.put("q","*:*");
params.put("sort", "_version_ desc");
params.put("distrib", "false");
params.put("rows", 1);
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("q","*:*");
params.set("sort", "_version_ desc");
params.set("distrib", "false");
params.set("rows", 1);
for(Replica replica : replicas) {
if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) {
String coreUrl = replica.getCoreUrl();
@ -432,7 +457,7 @@ public class TopicStream extends CloudSolrStream implements Expressible {
if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())){
HttpSolrClient httpClient = cache.getHttpSolrClient(replica.getCoreUrl());
HttpSolrClient httpClient = streamContext.getSolrClientCache().getHttpSolrClient(replica.getCoreUrl());
try {
SolrDocument doc = httpClient.getById(id);
@ -477,20 +502,19 @@ public class TopicStream extends CloudSolrStream implements Expressible {
throw new Exception("Collection not found:" + this.collection);
}
}
params.put("distrib", "false"); // We are the aggregator.
String fl = params.get("fl");
params.put("sort", "_version_ asc");
ModifiableSolrParams mParams = new ModifiableSolrParams(params);
mParams.set("distrib", "false"); // We are the aggregator.
String fl = mParams.get("fl");
mParams.set("sort", "_version_ asc");
if(!fl.contains("_version_")) {
fl += ",_version_";
}
params.put("fl", fl);
mParams.set("fl", fl);
Random random = new Random();
for(Slice slice : slices) {
Map localParams = new HashMap();
localParams.putAll(params);
ModifiableSolrParams localParams = new ModifiableSolrParams(mParams);
long checkpoint = checkpoints.get(slice.getName());
Collection<Replica> replicas = slice.getReplicas();

View File

@ -397,7 +397,6 @@ public class GraphExpressionTest extends SolrCloudTestCase {
.commit(cluster.getSolrClient(), COLLECTION);
List<Tuple> tuples = null;
Set<String> paths = null;
GatherNodesStream stream = null;
StreamContext context = new StreamContext();
SolrClientCache cache = new SolrClientCache();

View File

@ -29,11 +29,13 @@ import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.StreamingTest;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.AbstractDistribZkTestBase;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.params.SolrParams;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@ -100,8 +102,7 @@ public class GraphTest extends SolrCloudTestCase {
SolrClientCache cache = new SolrClientCache();
context.setSolrClientCache(cache);
Map params = new HashMap();
params.put("fq", "predicate_s:knows");
SolrParams sParams = StreamingTest.mapParams("fq", "predicate_s:knows");
stream = new ShortestPathStream(zkHost,
"collection1",
@ -109,7 +110,7 @@ public class GraphTest extends SolrCloudTestCase {
"steve",
"from_s",
"to_s",
params,
sParams,
20,
3,
6);
@ -131,7 +132,7 @@ public class GraphTest extends SolrCloudTestCase {
//Test with batch size of 1
params.put("fq", "predicate_s:knows");
sParams = StreamingTest.mapParams("fq", "predicate_s:knows");
stream = new ShortestPathStream(zkHost,
"collection1",
@ -139,7 +140,7 @@ public class GraphTest extends SolrCloudTestCase {
"steve",
"from_s",
"to_s",
params,
sParams,
1,
3,
6);
@ -159,7 +160,7 @@ public class GraphTest extends SolrCloudTestCase {
//Test with bad predicate
params.put("fq", "predicate_s:crap");
sParams = StreamingTest.mapParams("fq", "predicate_s:crap");
stream = new ShortestPathStream(zkHost,
"collection1",
@ -167,7 +168,7 @@ public class GraphTest extends SolrCloudTestCase {
"steve",
"from_s",
"to_s",
params,
sParams,
1,
3,
6);
@ -180,7 +181,7 @@ public class GraphTest extends SolrCloudTestCase {
//Test with depth 2
params.put("fq", "predicate_s:knows");
sParams = StreamingTest.mapParams("fq", "predicate_s:knows");
stream = new ShortestPathStream(zkHost,
"collection1",
@ -188,7 +189,7 @@ public class GraphTest extends SolrCloudTestCase {
"steve",
"from_s",
"to_s",
params,
sParams,
1,
3,
2);
@ -202,7 +203,7 @@ public class GraphTest extends SolrCloudTestCase {
//Take out alex
params.put("fq", "predicate_s:knows NOT to_s:alex");
sParams = StreamingTest.mapParams("fq", "predicate_s:knows NOT to_s:alex");
stream = new ShortestPathStream(zkHost,
"collection1",
@ -210,7 +211,7 @@ public class GraphTest extends SolrCloudTestCase {
"steve",
"from_s",
"to_s",
params,
sParams,
10,
3,
6);

View File

@ -19,7 +19,6 @@ package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@ -47,6 +46,7 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.AbstractDistribZkTestBase;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@ -175,6 +175,26 @@ public class StreamExpressionTest extends SolrCloudTestCase {
assert(tuples.size() == 3);
assertOrder(tuples, 0, 3, 4);
assertLong(tuples.get(1), "a_i", 3);
// Test a couple of multile field lists.
expression = StreamExpressionParser.parse("search(collection1, fq=\"a_s:hello0\", fq=\"a_s:hello1\", q=\"id:(*)\", " +
"zkHost=" + cluster.getZkServer().getZkAddress()+ ", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
stream = new CloudSolrStream(expression, factory);
tuples = getTuples(stream);
assertEquals("fq clauses should have prevented any docs from coming back", tuples.size(), 0);
expression = StreamExpressionParser.parse("search(collection1, fq=\"a_s:(hello0 OR hello1)\", q=\"id:(*)\", " +
"zkHost=" + cluster.getZkServer().getZkAddress() + ", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")");
stream = new CloudSolrStream(expression, factory);
tuples = getTuples(stream);
assertEquals("Combining an f1 clause should show us 2 docs", tuples.size(), 2);
}
@Test
@ -193,33 +213,33 @@ public class StreamExpressionTest extends SolrCloudTestCase {
TupleStream stream;
// Basic test
Map<String,String> params = new HashMap<>();
params.put("expr","merge("
ModifiableSolrParams sParams = new ModifiableSolrParams();
sParams.set("expr", "merge("
+ "${q1},"
+ "${q2},"
+ "on=${mySort})");
params.put(CommonParams.QT, "/stream");
params.put("q1", "search(" + COLLECTION + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=${mySort})");
params.put("q2", "search(" + COLLECTION + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=${mySort})");
params.put("mySort", "a_f asc");
stream = new SolrStream(url, params);
sParams.set(CommonParams.QT, "/stream");
sParams.set("q1", "search(" + COLLECTION + ", q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=${mySort})");
sParams.set("q2", "search(" + COLLECTION + ", q=\"id:(1)\", fl=\"id,a_s,a_i,a_f\", sort=${mySort})");
sParams.set("mySort", "a_f asc");
stream = new SolrStream(url, sParams);
tuples = getTuples(stream);
assertEquals(4, tuples.size());
assertOrder(tuples, 0,1,3,4);
// Basic test desc
params.put("mySort", "a_f desc");
stream = new SolrStream(url, params);
sParams.set("mySort", "a_f desc");
stream = new SolrStream(url, sParams);
tuples = getTuples(stream);
assertEquals(4, tuples.size());
assertOrder(tuples, 4,3,1,0);
// Basic w/ multi comp
params.put("q2", "search(" + COLLECTION + ", q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=${mySort})");
params.put("mySort", "\"a_f asc, a_s asc\"");
stream = new SolrStream(url, params);
sParams.set("q2", "search(" + COLLECTION + ", q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=${mySort})");
sParams.set("mySort", "\"a_f asc, a_s asc\"");
stream = new SolrStream(url, sParams);
tuples = getTuples(stream);
assertEquals(5, tuples.size());
@ -2677,16 +2697,14 @@ public class StreamExpressionTest extends SolrCloudTestCase {
//Lets sleep long enough for daemon updates to run.
//Lets stop the daemons
Map params = new HashMap();
params.put(CommonParams.QT,"/stream");
params.put("action","list");
ModifiableSolrParams sParams = new ModifiableSolrParams(StreamingTest.mapParams(CommonParams.QT, "/stream", "action", "list"));
int workersComplete = 0;
for(JettySolrRunner jetty : cluster.getJettySolrRunners()) {
int iterations = 0;
INNER:
while(iterations == 0) {
SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/collection1", params);
SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/collection1", sParams);
solrStream.open();
Tuple tupleResponse = solrStream.read();
if (tupleResponse.EOF) {
@ -2714,27 +2732,27 @@ public class StreamExpressionTest extends SolrCloudTestCase {
cluster.getSolrClient().commit("parallelDestinationCollection1");
//Lets stop the daemons
params = new HashMap();
params.put(CommonParams.QT,"/stream");
params.put("action", "stop");
params.put("id", "test");
sParams = new ModifiableSolrParams();
sParams.set(CommonParams.QT, "/stream");
sParams.set("action", "stop");
sParams.set("id", "test");
for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
SolrStream solrStream = new SolrStream(jetty.getBaseUrl() + "/collection1", params);
SolrStream solrStream = new SolrStream(jetty.getBaseUrl() + "/collection1", sParams);
solrStream.open();
Tuple tupleResponse = solrStream.read();
solrStream.close();
}
params = new HashMap();
params.put(CommonParams.QT,"/stream");
params.put("action","list");
sParams = new ModifiableSolrParams();
sParams.set(CommonParams.QT, "/stream");
sParams.set("action", "list");
workersComplete = 0;
for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
long stopTime = 0;
INNER:
while(stopTime == 0) {
SolrStream solrStream = new SolrStream(jetty.getBaseUrl() + "/collection1", params);
SolrStream solrStream = new SolrStream(jetty.getBaseUrl() + "/collection1", sParams);
solrStream.open();
Tuple tupleResponse = solrStream.read();
if (tupleResponse.EOF) {

View File

@ -18,7 +18,6 @@ package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -43,6 +42,8 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.AbstractDistribZkTestBase;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
@ -107,8 +108,8 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
.commit(cluster.getSolrClient(), COLLECTION);
Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, params);
SolrParams sParams = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
List<Tuple> tuples = getTuples(ustream);
assertEquals(4, tuples.size());
@ -119,13 +120,13 @@ public class StreamingTest extends SolrCloudTestCase {
@Test
public void testSpacesInParams() throws Exception {
Map params = mapParams("q", "*:*", "fl", "id , a_s , a_i , a_f", "sort", "a_f asc , a_i asc");
SolrParams sParams = StreamingTest.mapParams("q", "*:*", "fl", "id , a_s , a_i , a_f", "sort", "a_f asc , a_i asc");
//CloudSolrStream compares the values of the sort with the fl field.
//The constructor will throw an exception if the sort fields do not the
//a value in the field list.
CloudSolrStream stream = new CloudSolrStream("", "collection1", params);
CloudSolrStream stream = new CloudSolrStream("", "collection1", sParams);
}
@Test
@ -144,8 +145,8 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTION);
Map paramsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "none");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
SolrParams sParamsA = StreamingTest.mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "none");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, stream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
attachStreamFactory(pstream);
@ -170,8 +171,8 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "8", "a_s", "hello1", "a_i", "13", "a_f", "4")
.commit(cluster.getSolrClient(), COLLECTION);
Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc", "partitionKeys", "a_f");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, params);
SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc", "partitionKeys", "a_f");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, ustream, 2, new FieldComparator("a_f",ComparatorOrder.ASCENDING));
attachStreamFactory(pstream);
@ -186,6 +187,31 @@ public class StreamingTest extends SolrCloudTestCase {
}
@Test
public void testMultipleFqClauses() throws Exception {
new UpdateRequest()
.add(id, "0", "a_ss", "hello0", "a_ss", "hello1", "a_i", "0", "a_f", "0")
.add(id, "2", "a_ss", "hello2", "a_i", "2", "a_f", "0")
.add(id, "3", "a_ss", "hello3", "a_i", "3", "a_f", "3")
.add(id, "4", "a_ss", "hello4", "a_i", "4", "a_f", "4")
.add(id, "1", "a_ss", "hello1", "a_i", "1", "a_f", "1")
.add(id, "5", "a_ss", "hello1", "a_i", "10", "a_f", "1")
.add(id, "6", "a_ss", "hello1", "a_i", "11", "a_f", "5")
.add(id, "7", "a_ss", "hello1", "a_i", "12", "a_f", "5")
.add(id, "8", "a_ss", "hello1", "a_i", "13", "a_f", "4")
.commit(cluster.getSolrClient(), COLLECTION);
streamFactory.withCollectionZkHost(COLLECTION, zkHost);
ModifiableSolrParams params = new ModifiableSolrParams(mapParams("q", "*:*", "fl", "id,a_i",
"sort", "a_i asc", "fq", "a_ss:hello0", "fq", "a_ss:hello1"));
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, params);
List<Tuple> tuples = getTuples(stream);
assertEquals("Multiple fq clauses should have been honored", tuples.size(), 1);
assertEquals("should only have gotten back document 0", tuples.get(0).getString("id"), "0");
}
@Test
public void testRankStream() throws Exception {
@ -198,8 +224,8 @@ public class StreamingTest extends SolrCloudTestCase {
.commit(cluster.getSolrClient(), COLLECTION);
Map params = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, params);
SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
RankStream rstream = new RankStream(stream, 3, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
List<Tuple> tuples = getTuples(rstream);
@ -224,8 +250,8 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "10", "a_s", "hello1", "a_i", "10", "a_f", "1")
.commit(cluster.getSolrClient(), COLLECTION);
Map params = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, params);
SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
RankStream rstream = new RankStream(stream, 11, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, rstream, 2, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
attachStreamFactory(pstream);
@ -253,8 +279,8 @@ public class StreamingTest extends SolrCloudTestCase {
.commit(cluster.getSolrClient(), COLLECTION);
//Test with spaces in the parameter lists.
Map paramsA = mapParams("q","*:*","fl","id,a_s, a_i, a_f","sort", "a_s asc , a_f asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i,a_f", "sort", "a_s asc,a_f asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
stream.setTrace(true);
List<Tuple> tuples = getTuples(stream);
assert(tuples.get(0).get("_COLLECTION_").equals(COLLECTION));
@ -280,8 +306,8 @@ public class StreamingTest extends SolrCloudTestCase {
.commit(cluster.getSolrClient(), COLLECTION);
//Test with spaces in the parameter lists.
Map paramsA = mapParams("q","*:*","fl","id,a_s, a_i, a_f","sort", "a_s asc , a_f asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
ReducerStream rstream = new ReducerStream(stream,
new FieldEqualitor("a_s"),
new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));
@ -303,8 +329,8 @@ public class StreamingTest extends SolrCloudTestCase {
assertMaps(maps2, 4, 6);
//Test with spaces in the parameter lists using a comparator
paramsA = mapParams("q","*:*","fl","id,a_s, a_i, a_f","sort", "a_s asc , a_f asc");
stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
sParamsA = mapParams("q", "*:*", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc");
stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
rstream = new ReducerStream(stream,
new FieldComparator("a_s", ComparatorOrder.ASCENDING),
new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5));
@ -345,8 +371,8 @@ public class StreamingTest extends SolrCloudTestCase {
.commit(cluster.getSolrClient(), COLLECTION);
//Test with spaces in the parameter lists.
Map paramsA = mapParams("q", "blah", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
SolrParams sParamsA = mapParams("q", "blah", "fl", "id,a_s, a_i, a_f", "sort", "a_s asc , a_f asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
ReducerStream rstream = new ReducerStream(stream,
new FieldEqualitor("a_s"),
new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));
@ -373,8 +399,8 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTION);
Map paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
SolrParams sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
ReducerStream rstream = new ReducerStream(stream,
new FieldEqualitor("a_s"),
@ -401,8 +427,8 @@ public class StreamingTest extends SolrCloudTestCase {
//Test Descending with Ascending subsort
paramsA = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_s desc,a_f asc", "partitionKeys", "a_s");
stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
sParamsA = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_s desc,a_f asc", "partitionKeys", "a_s");
stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
rstream = new ReducerStream(stream,
new FieldEqualitor("a_s"),
@ -447,8 +473,8 @@ public class StreamingTest extends SolrCloudTestCase {
.commit(cluster.getSolrClient(), COLLECTION);
//Test an error that comes originates from the /select handler
Map paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
ExceptionStream estream = new ExceptionStream(stream);
Tuple t = getTuple(estream);
assert(t.EOF);
@ -456,8 +482,8 @@ public class StreamingTest extends SolrCloudTestCase {
assert(t.getException().contains("sort param field can't be found: blah"));
//Test an error that comes originates from the /export handler
paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt","/export");
stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt", "/export");
stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
estream = new ExceptionStream(stream);
t = getTuple(estream);
assert(t.EOF);
@ -483,8 +509,8 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTION);
Map paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, stream, 2, new FieldComparator("blah", ComparatorOrder.ASCENDING));
ExceptionStream estream = new ExceptionStream(pstream);
Tuple t = getTuple(estream);
@ -495,8 +521,8 @@ public class StreamingTest extends SolrCloudTestCase {
//Test an error that originates from the /select handler
paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc", "partitionKeys","a_s");
stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc", "partitionKeys", "a_s");
stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
pstream = new ParallelStream(zkHost, COLLECTION, stream, 2, new FieldComparator("blah", ComparatorOrder.ASCENDING));
estream = new ExceptionStream(pstream);
t = getTuple(estream);
@ -506,8 +532,8 @@ public class StreamingTest extends SolrCloudTestCase {
//Test an error that originates from the /export handler
paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt","/export", "partitionKeys","a_s");
stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt", "/export", "partitionKeys", "a_s");
stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
pstream = new ParallelStream(zkHost, COLLECTION, stream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
estream = new ExceptionStream(pstream);
t = getTuple(estream);
@ -533,7 +559,7 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTION);
Map paramsA = mapParams("q", "*:*");
SolrParams sParamsA = mapParams("q", "*:*");
Metric[] metrics = {new SumMetric("a_i"),
new SumMetric("a_f"),
@ -545,7 +571,7 @@ public class StreamingTest extends SolrCloudTestCase {
new MeanMetric("a_f"),
new CountMetric()};
StatsStream statsStream = new StatsStream(zkHost, COLLECTION, paramsA, metrics);
StatsStream statsStream = new StatsStream(zkHost, COLLECTION, sParamsA, metrics);
List<Tuple> tuples = getTuples(statsStream);
@ -593,7 +619,7 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTION);
Map paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
Bucket[] buckets = {new Bucket("a_s")};
@ -610,7 +636,7 @@ public class StreamingTest extends SolrCloudTestCase {
FieldComparator[] sorts = {new FieldComparator("sum(a_i)",
ComparatorOrder.ASCENDING)};
FacetStream facetStream = new FacetStream(zkHost, COLLECTION, paramsA, buckets, metrics, sorts, 100);
FacetStream facetStream = new FacetStream(zkHost, COLLECTION, sParamsA, buckets, metrics, sorts, 100);
List<Tuple> tuples = getTuples(facetStream);
@ -692,7 +718,7 @@ public class StreamingTest extends SolrCloudTestCase {
sorts[0] = new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING);
facetStream = new FacetStream(zkHost, COLLECTION, paramsA, buckets, metrics, sorts, 100);
facetStream = new FacetStream(zkHost, COLLECTION, sParamsA, buckets, metrics, sorts, 100);
tuples = getTuples(facetStream);
@ -775,7 +801,7 @@ public class StreamingTest extends SolrCloudTestCase {
sorts[0] = new FieldComparator("a_s", ComparatorOrder.DESCENDING);
facetStream = new FacetStream(zkHost, COLLECTION, paramsA, buckets, metrics, sorts, 100);
facetStream = new FacetStream(zkHost, COLLECTION, sParamsA, buckets, metrics, sorts, 100);
tuples = getTuples(facetStream);
@ -856,7 +882,7 @@ public class StreamingTest extends SolrCloudTestCase {
sorts[0] = new FieldComparator("a_s", ComparatorOrder.ASCENDING);
facetStream = new FacetStream(zkHost, COLLECTION, paramsA, buckets, metrics, sorts, 100);
facetStream = new FacetStream(zkHost, COLLECTION, sParamsA, buckets, metrics, sorts, 100);
tuples = getTuples(facetStream);
@ -949,7 +975,7 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "9", "level1_s", "hello0", "level2_s", "b", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTION);
Map paramsA = mapParams("q","*:*","fl","a_i,a_f");
SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_i,a_f");
Bucket[] buckets = {new Bucket("level1_s"), new Bucket("level2_s")};
@ -961,7 +987,7 @@ public class StreamingTest extends SolrCloudTestCase {
FacetStream facetStream = new FacetStream(
zkHost,
COLLECTION,
paramsA,
sParamsA,
buckets,
metrics,
sorts,
@ -1041,7 +1067,7 @@ public class StreamingTest extends SolrCloudTestCase {
facetStream = new FacetStream(
zkHost,
COLLECTION,
paramsA,
sParamsA,
buckets,
metrics,
sorts,
@ -1134,8 +1160,8 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTION);
Map paramsA = mapParams("q","*:*","fl","a_s,a_i,a_f","sort", "a_s asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
Bucket[] buckets = {new Bucket("a_s")};
@ -1234,8 +1260,8 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "12", "a_s", null, "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTION);
paramsA = mapParams("q","*:*","fl","a_s,a_i,a_f","sort", "a_s asc", "qt", "/export");
stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "qt", "/export");
stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
Bucket[] buckets1 = {new Bucket("a_s")};
@ -1285,12 +1311,9 @@ public class StreamingTest extends SolrCloudTestCase {
SolrClientCache cache = new SolrClientCache();
context.setSolrClientCache(cache);
Map params = new HashMap();
params.put("q","a_s:hello0");
params.put("rows", "500");
params.put("fl", "id");
SolrParams sParams = mapParams("q", "a_s:hello0", "rows", "500", "fl", "id");
TopicStream topicStream = new TopicStream(zkHost, COLLECTION, COLLECTION, "50000000", 1000000, params);
TopicStream topicStream = new TopicStream(zkHost, COLLECTION, COLLECTION, "50000000", 1000000, sParams);
DaemonStream daemonStream = new DaemonStream(topicStream, "daemon1", 1000, 500);
daemonStream.setStreamContext(context);
@ -1300,13 +1323,11 @@ public class StreamingTest extends SolrCloudTestCase {
// Wait for the checkpoint
JettySolrRunner jetty = cluster.getJettySolrRunners().get(0);
Map params1 = new HashMap();
params1.put("qt","/get");
params1.put("ids","50000000");
params1.put("fl","id");
SolrParams sParams1 = mapParams("qt", "/get", "ids", "50000000", "fl", "id");
int count = 0;
while(count == 0) {
SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/" + COLLECTION, params1);
SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/" + COLLECTION, sParams1);
List<Tuple> tuples = getTuples(solrStream);
count = tuples.size();
if(count > 0) {
@ -1364,8 +1385,8 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTION);
Map paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "partitionKeys", "a_s");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "partitionKeys", "a_s");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
Bucket[] buckets = {new Bucket("a_s")};
@ -1475,8 +1496,8 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTION);
Map paramsA = mapParams("q", "blah", "fl", "id,a_s,a_i,a_f","sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, paramsA);
SolrParams sParamsA = mapParams("q", "blah", "fl", "id,a_s,a_i,a_f", "sort", "a_s asc,a_f asc", "partitionKeys", "a_s");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
ReducerStream rstream = new ReducerStream(stream,
new FieldEqualitor("a_s"),
new GroupOperation(new FieldComparator("a_s", ComparatorOrder.ASCENDING), 2));
@ -1497,8 +1518,8 @@ public class StreamingTest extends SolrCloudTestCase {
"1", "i_multi", "2", "f_multi", "1.2", "f_multi", "1.3")
.commit(cluster.getSolrClient(), COLLECTION);
Map params = mapParams("q","*:*","fl","id,a_s,a_i,a_f,s_multi,i_multi,f_multi","sort", "a_s asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, params);
SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f,s_multi,i_multi,f_multi", "sort", "a_s asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
List<Tuple> tuples = getTuples(stream);
Tuple tuple = tuples.get(0);
@ -1538,11 +1559,11 @@ public class StreamingTest extends SolrCloudTestCase {
.commit(cluster.getSolrClient(), COLLECTION);
//Test ascending
Map paramsA = mapParams("q","id:(4 1)","fl","id,a_s,a_i","sort", "a_i asc");
CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTION, paramsA);
SolrParams sParamsA = mapParams("q", "id:(4 1)", "fl", "id,a_s,a_i", "sort", "a_i asc");
CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
Map paramsB = mapParams("q","id:(0 2 3)","fl","id,a_s,a_i","sort", "a_i asc");
CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTION, paramsB);
SolrParams sParamsB = mapParams("q", "id:(0 2 3)", "fl", "id,a_s,a_i", "sort", "a_i asc");
CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTION, sParamsB);
MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
List<Tuple> tuples = getTuples(mstream);
@ -1551,11 +1572,11 @@ public class StreamingTest extends SolrCloudTestCase {
assertOrder(tuples, 0,1,2,3,4);
//Test descending
paramsA = mapParams("q","id:(4 1)","fl","id,a_s,a_i","sort", "a_i desc");
streamA = new CloudSolrStream(zkHost, COLLECTION, paramsA);
sParamsA = mapParams("q", "id:(4 1)", "fl", "id,a_s,a_i", "sort", "a_i desc");
streamA = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
paramsB = mapParams("q","id:(0 2 3)","fl","id,a_s,a_i","sort", "a_i desc");
streamB = new CloudSolrStream(zkHost, COLLECTION, paramsB);
sParamsB = mapParams("q", "id:(0 2 3)", "fl", "id,a_s,a_i", "sort", "a_i desc");
streamB = new CloudSolrStream(zkHost, COLLECTION, sParamsB);
mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
tuples = getTuples(mstream);
@ -1565,11 +1586,11 @@ public class StreamingTest extends SolrCloudTestCase {
//Test compound sort
paramsA = mapParams("q","id:(2 4 1)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
streamA = new CloudSolrStream(zkHost, COLLECTION, paramsA);
sParamsA = mapParams("q", "id:(2 4 1)", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc");
streamA = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
paramsB = mapParams("q","id:(0 3)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
streamB = new CloudSolrStream(zkHost, COLLECTION, paramsB);
sParamsB = mapParams("q", "id:(0 3)", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc");
streamB = new CloudSolrStream(zkHost, COLLECTION, sParamsB);
mstream = new MergeStream(streamA, streamB, new MultipleFieldComparator(new FieldComparator("a_f",ComparatorOrder.ASCENDING),new FieldComparator("a_i",ComparatorOrder.ASCENDING)));
tuples = getTuples(mstream);
@ -1577,11 +1598,11 @@ public class StreamingTest extends SolrCloudTestCase {
assert(tuples.size() == 5);
assertOrder(tuples, 0,2,1,3,4);
paramsA = mapParams("q","id:(2 4 1)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i desc");
streamA = new CloudSolrStream(zkHost, COLLECTION, paramsA);
sParamsA = mapParams("q", "id:(2 4 1)", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i desc");
streamA = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
paramsB = mapParams("q","id:(0 3)","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i desc");
streamB = new CloudSolrStream(zkHost, COLLECTION, paramsB);
sParamsB = mapParams("q", "id:(0 3)", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i desc");
streamB = new CloudSolrStream(zkHost, COLLECTION, sParamsB);
mstream = new MergeStream(streamA, streamB, new MultipleFieldComparator(new FieldComparator("a_f",ComparatorOrder.ASCENDING),new FieldComparator("a_i",ComparatorOrder.DESCENDING)));
tuples = getTuples(mstream);
@ -1608,11 +1629,11 @@ public class StreamingTest extends SolrCloudTestCase {
.commit(cluster.getSolrClient(), COLLECTION);
//Test ascending
Map paramsA = mapParams("q","id:(4 1 8 7 9)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTION, paramsA);
SolrParams sParamsA = mapParams("q", "id:(4 1 8 7 9)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
Map paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTION, paramsB);
SolrParams sParamsB = mapParams("q", "id:(0 2 3 6)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTION, sParamsB);
MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, mstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
@ -1623,11 +1644,11 @@ public class StreamingTest extends SolrCloudTestCase {
assertOrder(tuples, 0,1,2,3,4,7,6,8,9);
//Test descending
paramsA = mapParams("q", "id:(4 1 8 9)", "fl", "id,a_s,a_i", "sort", "a_i desc", "partitionKeys", "a_i");
streamA = new CloudSolrStream(zkHost, COLLECTION, paramsA);
sParamsA = mapParams("q", "id:(4 1 8 9)", "fl", "id,a_s,a_i", "sort", "a_i desc", "partitionKeys", "a_i");
streamA = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i desc", "partitionKeys", "a_i");
streamB = new CloudSolrStream(zkHost, COLLECTION, paramsB);
sParamsB = mapParams("q", "id:(0 2 3 6)", "fl", "id,a_s,a_i", "sort", "a_i desc", "partitionKeys", "a_i");
streamB = new CloudSolrStream(zkHost, COLLECTION, sParamsB);
mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
pstream = new ParallelStream(zkHost, COLLECTION, mstream, 2, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
@ -1656,11 +1677,11 @@ public class StreamingTest extends SolrCloudTestCase {
.commit(cluster.getSolrClient(), COLLECTION);
//Test ascending
Map paramsA = mapParams("q","id:(4 1 8 7 9)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTION, paramsA);
SolrParams sParamsA = mapParams("q", "id:(4 1 8 7 9)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTION, sParamsA);
Map paramsB = mapParams("q","id:(0 2 3 6)","fl","id,a_s,a_i","sort", "a_i asc", "partitionKeys", "a_i");
CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTION, paramsB);
SolrParams sParamsB = mapParams("q", "id:(0 2 3 6)", "fl", "id,a_s,a_i", "sort", "a_i asc", "partitionKeys", "a_i");
CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTION, sParamsB);
MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
ParallelStream pstream = new ParallelStream(zkHost, COLLECTION, mstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
@ -1685,20 +1706,19 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
.commit(cluster.getSolrClient(), COLLECTION);
Map params = null;
//Basic CloudSolrStream Test with Descending Sort
params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i desc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, params);
SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i desc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
List<Tuple> tuples = getTuples(stream);
assert(tuples.size() == 5);
assertOrder(tuples, 4, 3, 2, 1, 0);
//With Ascending Sort
params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc");
stream = new CloudSolrStream(zkHost, COLLECTION, params);
sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
tuples = getTuples(stream);
assert(tuples.size() == 5);
@ -1706,16 +1726,16 @@ public class StreamingTest extends SolrCloudTestCase {
//Test compound sort
params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i desc");
stream = new CloudSolrStream(zkHost, COLLECTION, params);
sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i desc");
stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
tuples = getTuples(stream);
assert(tuples.size() == 5);
assertOrder(tuples, 2,0,1,3,4);
params = mapParams("q","*:*","fl","id,a_s,a_i,a_f","sort", "a_f asc,a_i asc");
stream = new CloudSolrStream(zkHost, COLLECTION, params);
sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc");
stream = new CloudSolrStream(zkHost, COLLECTION, sParams);
tuples = getTuples(stream);
assert (tuples.size() == 5);
@ -1723,21 +1743,6 @@ public class StreamingTest extends SolrCloudTestCase {
}
protected Map mapParams(String... vals) {
Map params = new HashMap();
String k = null;
for(String val : vals) {
if(k == null) {
k = val;
} else {
params.put(k, val);
k = null;
}
}
return params;
}
protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
tupleStream.open();
List<Tuple> tuples = new ArrayList();
@ -1819,4 +1824,15 @@ public class StreamingTest extends SolrCloudTestCase {
streamContext.setStreamFactory(streamFactory);
tupleStream.setStreamContext(streamContext);
}
public static SolrParams mapParams(String... vals) {
ModifiableSolrParams params = new ModifiableSolrParams();
assertEquals("Parameters passed in here must be in pairs!", 0, (vals.length % 2));
for (int idx = 0; idx < vals.length; idx += 2) {
params.add(vals[idx], vals[idx + 1]);
}
return params;
}
}