SOLR-13057: Allow search, facet and timeseries Streaming Expressions to accept a comma delimited list of collections

This commit is contained in:
Joel Bernstein 2018-12-12 09:15:41 -05:00
parent ce9a8012c0
commit 7e4555a2fd
6 changed files with 155 additions and 8 deletions

View File

@ -170,7 +170,11 @@ public class CloudSolrStream extends TupleStream implements Expressible {
StreamExpression expression = new StreamExpression("search"); StreamExpression expression = new StreamExpression("search");
// collection // collection
expression.addParameter(collection); if(collection.indexOf(',') > -1) {
expression.addParameter("\""+collection+"\"");
} else {
expression.addParameter(collection);
}
for (Entry<String, String[]> param : params.getMap().entrySet()) { for (Entry<String, String[]> param : params.getMap().entrySet()) {
for (String val : param.getValue()) { for (String val : param.getValue()) {
@ -334,11 +338,18 @@ public class CloudSolrStream extends TupleStream implements Expressible {
// which is something already supported in other parts of Solr // which is something already supported in other parts of Solr
// check for alias or collection // check for alias or collection
List<String> collections = checkAlias
? zkStateReader.getAliases().resolveAliases(collectionName) // if not an alias, returns collectionName List<String> allCollections = new ArrayList();
: Collections.singletonList(collectionName); String[] collectionNames = collectionName.split(",");
for(String col : collectionNames) {
List<String> collections = checkAlias
? zkStateReader.getAliases().resolveAliases(col) // if not an alias, returns collectionName
: Collections.singletonList(collectionName);
allCollections.addAll(collections);
}
// Lookup all actives slices for these collections // Lookup all actives slices for these collections
List<Slice> slices = collections.stream() List<Slice> slices = allCollections.stream()
.map(collectionsMap::get) .map(collectionsMap::get)
.filter(Objects::nonNull) .filter(Objects::nonNull)
.flatMap(docCol -> Arrays.stream(docCol.getActiveSlicesArr())) .flatMap(docCol -> Arrays.stream(docCol.getActiveSlicesArr()))

View File

@ -100,6 +100,11 @@ public class FacetStream extends TupleStream implements Expressible {
public FacetStream(StreamExpression expression, StreamFactory factory) throws IOException{ public FacetStream(StreamExpression expression, StreamFactory factory) throws IOException{
// grab all parameters out // grab all parameters out
String collectionName = factory.getValueOperand(expression, 0); String collectionName = factory.getValueOperand(expression, 0);
if(collectionName.indexOf('"') > -1) {
collectionName = collectionName.replaceAll("\"", "").replaceAll(" ", "");
}
List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression); List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
StreamExpressionNamedParameter bucketExpression = factory.getNamedOperand(expression, "buckets"); StreamExpressionNamedParameter bucketExpression = factory.getNamedOperand(expression, "buckets");
StreamExpressionNamedParameter bucketSortExpression = factory.getNamedOperand(expression, "bucketSorts"); StreamExpressionNamedParameter bucketSortExpression = factory.getNamedOperand(expression, "bucketSorts");
@ -378,7 +383,11 @@ public class FacetStream extends TupleStream implements Expressible {
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// collection // collection
expression.addParameter(collection); if(collection.indexOf(',') > -1) {
expression.addParameter("\""+collection+"\"");
} else {
expression.addParameter(collection);
}
// parameters // parameters

View File

@ -48,6 +48,12 @@ public class SearchFacadeStream extends TupleStream implements Expressible {
public SearchFacadeStream(StreamExpression expression, StreamFactory factory) throws IOException{ public SearchFacadeStream(StreamExpression expression, StreamFactory factory) throws IOException{
// grab all parameters out // grab all parameters out
String collectionName = factory.getValueOperand(expression, 0); String collectionName = factory.getValueOperand(expression, 0);
//Handle comma delimited list of collections.
if(collectionName.indexOf('"') > -1) {
collectionName = collectionName.replaceAll("\"", "").replaceAll(" ", "");
}
List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression); List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost"); StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");

View File

@ -128,7 +128,11 @@ public class SearchStream extends TupleStream implements Expressible {
StreamExpression expression = new StreamExpression("search"); StreamExpression expression = new StreamExpression("search");
// collection // collection
expression.addParameter(collection); if(collection.indexOf(',') > -1) {
expression.addParameter("\""+collection+"\"");
} else {
expression.addParameter(collection);
}
for (Entry<String, String[]> param : params.getMap().entrySet()) { for (Entry<String, String[]> param : params.getMap().entrySet()) {
for (String val : param.getValue()) { for (String val : param.getValue()) {

View File

@ -89,6 +89,11 @@ public class TimeSeriesStream extends TupleStream implements Expressible {
public TimeSeriesStream(StreamExpression expression, StreamFactory factory) throws IOException{ public TimeSeriesStream(StreamExpression expression, StreamFactory factory) throws IOException{
// grab all parameters out // grab all parameters out
String collectionName = factory.getValueOperand(expression, 0); String collectionName = factory.getValueOperand(expression, 0);
if(collectionName.indexOf('"') > -1) {
collectionName = collectionName.replaceAll("\"", "").replaceAll(" ", "");
}
List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression); List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
StreamExpressionNamedParameter startExpression = factory.getNamedOperand(expression, "start"); StreamExpressionNamedParameter startExpression = factory.getNamedOperand(expression, "start");
StreamExpressionNamedParameter endExpression = factory.getNamedOperand(expression, "end"); StreamExpressionNamedParameter endExpression = factory.getNamedOperand(expression, "end");
@ -212,7 +217,11 @@ public class TimeSeriesStream extends TupleStream implements Expressible {
// function name // function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// collection // collection
expression.addParameter(collection); if(collection.indexOf(',') > -1) {
expression.addParameter("\""+collection+"\"");
} else {
expression.addParameter(collection);
}
// parameters // parameters
ModifiableSolrParams tmpParams = new ModifiableSolrParams(params); ModifiableSolrParams tmpParams = new ModifiableSolrParams(params);

View File

@ -1399,6 +1399,114 @@ public class StreamExpressionTest extends SolrCloudTestCase {
} }
@Test
public void testMultiCollection() throws Exception {
CollectionAdminRequest.createCollection("collection2", "conf", 2, 1).process(cluster.getSolrClient());
cluster.waitForActiveCollection("collection2", 2, 2);
new UpdateRequest()
.add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "0", "s_multi", "aaaa", "test_dt", getDateString("2016", "5", "1"), "i_multi", "4", "i_multi", "7")
.add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "0", "s_multi", "aaaa1", "test_dt", getDateString("2016", "5", "1"), "i_multi", "44", "i_multi", "77")
.add(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3", "s_multi", "aaaa2", "test_dt", getDateString("2016", "5", "1"), "i_multi", "444", "i_multi", "777")
.add(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4", "s_multi", "aaaa3", "test_dt", getDateString("2016", "5", "1"), "i_multi", "4444", "i_multi", "7777")
.add(id, "1", "a_s", "hello", "a_i", "1", "a_f", "1", "s_multi", "aaaa4", "test_dt", getDateString("2016", "5", "1"), "i_multi", "44444", "i_multi", "77777")
.commit(cluster.getSolrClient(), "collection1");
new UpdateRequest()
.add(id, "10", "a_s", "hello", "a_i", "10", "a_f", "0", "s_multi", "aaaa", "test_dt", getDateString("2016", "5", "1"), "i_multi", "4", "i_multi", "7")
.add(id, "12", "a_s", "hello", "a_i", "12", "a_f", "0", "s_multi", "aaaa1", "test_dt", getDateString("2016", "5", "1"), "i_multi", "44", "i_multi", "77")
.add(id, "13", "a_s", "hello", "a_i", "13", "a_f", "3", "s_multi", "aaaa2", "test_dt", getDateString("2016", "5", "1"), "i_multi", "444", "i_multi", "777")
.add(id, "14", "a_s", "hello", "a_i", "14", "a_f", "4", "s_multi", "aaaa3", "test_dt", getDateString("2016", "5", "1"), "i_multi", "4444", "i_multi", "7777")
.add(id, "11", "a_s", "hello", "a_i", "11", "a_f", "1", "s_multi", "aaaa4", "test_dt", getDateString("2016", "5", "1"), "i_multi", "44444", "i_multi", "77777")
.commit(cluster.getSolrClient(), "collection2");
List<Tuple> tuples;
StreamContext streamContext = new StreamContext();
SolrClientCache solrClientCache = new SolrClientCache();
streamContext.setSolrClientCache(solrClientCache);
List<String> shardUrls = TupleStream.getShards(cluster.getZkServer().getZkAddress(), COLLECTIONORALIAS, streamContext);
try {
StringBuilder buf = new StringBuilder();
for (String shardUrl : shardUrls) {
if (buf.length() > 0) {
buf.append(",");
}
buf.append(shardUrl);
}
ModifiableSolrParams solrParams = new ModifiableSolrParams();
solrParams.add("qt", "/stream");
solrParams.add("expr", "search(\"collection1, collection2\", q=\"*:*\", fl=\"id, a_i\", rows=50, sort=\"a_i asc\")");
SolrStream solrStream = new SolrStream(shardUrls.get(0), solrParams);
solrStream.setStreamContext(streamContext);
tuples = getTuples(solrStream);
assert (tuples.size() == 10);
assertOrder(tuples, 0, 1, 2, 3, 4,10,11,12,13,14);
//Test with export handler, different code path.
solrParams = new ModifiableSolrParams();
solrParams.add("qt", "/stream");
solrParams.add("expr", "search(\"collection1, collection2\", q=\"*:*\", fl=\"id, a_i\", sort=\"a_i asc\", qt=\"/export\")");
solrStream = new SolrStream(shardUrls.get(0), solrParams);
solrStream.setStreamContext(streamContext);
tuples = getTuples(solrStream);
assert (tuples.size() == 10);
assertOrder(tuples, 0, 1, 2, 3, 4,10,11,12,13,14);
solrParams = new ModifiableSolrParams();
solrParams.add("qt", "/stream");
solrParams.add("expr", "facet(\"collection1, collection2\", q=\"*:*\", buckets=\"a_s\", bucketSorts=\"count(*) asc\", count(*))");
solrStream = new SolrStream(shardUrls.get(0), solrParams);
solrStream.setStreamContext(streamContext);
tuples = getTuples(solrStream);
assert (tuples.size() == 1);
Tuple tuple = tuples.get(0);
assertEquals(tuple.getString("a_s"), "hello");
assertEquals(tuple.getLong("count(*)").longValue(), 10);
String expr = "timeseries(\"collection1, collection2\", q=\"*:*\", " +
"start=\"2016-01-01T01:00:00.000Z\", " +
"end=\"2016-12-01T01:00:00.000Z\", " +
"gap=\"+1YEAR\", " +
"field=\"test_dt\", " +
"format=\"yyyy\","+
"count(*))";
solrParams = new ModifiableSolrParams();
solrParams.add("qt", "/stream");
solrParams.add("expr", expr);
solrStream = new SolrStream(shardUrls.get(0), solrParams);
solrStream.setStreamContext(streamContext);
tuples = getTuples(solrStream);
assert (tuples.size() == 1);
tuple = tuples.get(0);
assertEquals(tuple.getString("test_dt"), "2016");
assertEquals(tuple.getLong("count(*)").longValue(), 10);
//Test parallel
solrParams = new ModifiableSolrParams();
solrParams.add("qt", "/stream");
solrParams.add("expr", "parallel(collection1, sort=\"a_i asc\", workers=2, search(\"collection1, collection2\", q=\"*:*\", fl=\"id, a_i\", sort=\"a_i asc\", qt=\"/export\", partitionKeys=\"a_s\"))");
solrStream = new SolrStream(shardUrls.get(0), solrParams);
solrStream.setStreamContext(streamContext);
tuples = getTuples(solrStream);
assert (tuples.size() == 10);
assertOrder(tuples, 0, 1, 2, 3, 4,10,11,12,13,14);
} finally {
CollectionAdminRequest.deleteCollection("collection2").process(cluster.getSolrClient());
solrClientCache.close();
}
}
@Test @Test
public void testSubFacetStream() throws Exception { public void testSubFacetStream() throws Exception {