SOLR-8086: Add support for SELECT DISTINCT queries to the SQL interface

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1707819 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Joel Bernstein 2015-10-09 21:34:43 +00:00
parent 0a4b0833a2
commit 6085c0a4c2
6 changed files with 954 additions and 42 deletions

View File

@ -71,6 +71,8 @@ New Features
* SOLR-8038: Add the StatsStream to the Streaming API and wire it into the SQLHandler (Joel Bernstein)
* SOLR-8086: Add support for SELECT DISTINCT queries to the SQL interface (Joel Bernstein)
* SOLR-7543: Basic graph traversal query
Example: {!graph from="node_id" to="edge_id"}id:doc_1
(Kevin Watters, yonik)

View File

@ -33,15 +33,20 @@ import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.FacetStream;
import org.apache.solr.client.solrj.io.stream.ParallelStream;
import org.apache.solr.client.solrj.io.stream.RankStream;
import org.apache.solr.client.solrj.io.stream.EditStream;
import org.apache.solr.client.solrj.io.stream.RollupStream;
import org.apache.solr.client.solrj.io.stream.StatsStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.ExceptionStream;
import org.apache.solr.client.solrj.io.stream.UniqueStream;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.metrics.*;
import org.apache.solr.common.SolrException;
@ -66,6 +71,12 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
private static String defaultZkhost = null;
private static String defaultWorkerCollection = null;
private static List<String> remove;
static {
remove = new ArrayList();
remove.add("count(*)");
}
private Logger logger = LoggerFactory.getLogger(SQLHandler.class);
@ -144,6 +155,12 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
} else {
sqlStream = doGroupByWithAggregates(sqlVistor, numWorkers, workerCollection, workerZkhost);
}
} else if(sqlVistor.isDistinct) {
if(aggregationMode == AggregationMode.FACET) {
sqlStream = doSelectDistinctFacets(sqlVistor);
} else {
sqlStream = doSelectDistinct(sqlVistor, numWorkers, workerCollection, workerZkhost);
}
} else {
sqlStream = doSelect(sqlVistor);
}
@ -238,6 +255,200 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
return tupleStream;
}
private static TupleStream doSelectDistinct(SQLVisitor sqlVisitor,
int numWorkers,
String workerCollection,
String workerZkHost) throws IOException {
Set<String> fieldSet = new HashSet();
Bucket[] buckets = getBuckets(sqlVisitor.fields, fieldSet);
Metric[] metrics = getMetrics(sqlVisitor.fields, fieldSet);
if(metrics.length > 0) {
throw new IOException("Select Distinct queries cannot include aggregate functions.");
}
String fl = fields(fieldSet);
String sort = null;
StreamEqualitor ecomp = null;
StreamComparator comp = null;
if(sqlVisitor.sorts != null && sqlVisitor.sorts.size() > 0) {
StreamComparator[] adjustedSorts = adjustSorts(sqlVisitor.sorts, buckets);
FieldEqualitor[] fieldEqualitors = new FieldEqualitor[adjustedSorts.length];
StringBuilder buf = new StringBuilder();
for(int i=0; i<adjustedSorts.length; i++) {
FieldComparator fieldComparator = (FieldComparator)adjustedSorts[i];
fieldEqualitors[i] = new FieldEqualitor(fieldComparator.getFieldName());
if(i>0) {
buf.append(",");
}
buf.append(fieldComparator.getFieldName()).append(" ").append(fieldComparator.getOrder().toString());
}
sort = buf.toString();
if(adjustedSorts.length == 1) {
ecomp = fieldEqualitors[0];
comp = adjustedSorts[0];
} else {
ecomp = new MultipleFieldEqualitor(fieldEqualitors);
comp = new MultipleFieldComparator(adjustedSorts);
}
} else {
StringBuilder sortBuf = new StringBuilder();
FieldEqualitor[] equalitors = new FieldEqualitor[buckets.length];
StreamComparator[] streamComparators = new StreamComparator[buckets.length];
for(int i=0; i<buckets.length; i++) {
equalitors[i] = new FieldEqualitor(buckets[i].toString());
streamComparators[i] = new FieldComparator(buckets[i].toString(), ComparatorOrder.ASCENDING);
if(i>0) {
sortBuf.append(',');
}
sortBuf.append(buckets[i].toString()).append(" asc");
}
sort = sortBuf.toString();
if(equalitors.length == 1) {
ecomp = equalitors[0];
comp = streamComparators[0];
} else {
ecomp = new MultipleFieldEqualitor(equalitors);
comp = new MultipleFieldComparator(streamComparators);
}
}
TableSpec tableSpec = new TableSpec(sqlVisitor.table, defaultZkhost);
String zkHost = tableSpec.zkHost;
String collection = tableSpec.collection;
Map<String, String> params = new HashMap();
params.put(CommonParams.FL, fl);
params.put(CommonParams.Q, sqlVisitor.query);
//Always use the /export handler for Distinct Queries because it requires exporting full result sets.
params.put(CommonParams.QT, "/export");
if(numWorkers > 1) {
params.put("partitionKeys", getPartitionKeys(buckets));
}
params.put("sort", sort);
TupleStream tupleStream = null;
CloudSolrStream cstream = new CloudSolrStream(zkHost, collection, params);
tupleStream = new UniqueStream(cstream, ecomp);
if(numWorkers > 1) {
// Do the unique in parallel
// Maintain the sort of the Tuples coming from the workers.
ParallelStream parallelStream = new ParallelStream(workerZkHost, workerCollection, tupleStream, numWorkers, comp);
StreamFactory factory = new StreamFactory()
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("parallel", ParallelStream.class)
.withFunctionName("unique", UniqueStream.class);
parallelStream.setStreamFactory(factory);
parallelStream.setObjectSerialize(false);
tupleStream = parallelStream;
}
if(sqlVisitor.limit > 0) {
tupleStream = new LimitStream(tupleStream, sqlVisitor.limit);
}
return tupleStream;
}
private static StreamComparator[] adjustSorts(List<SortItem> sorts, Bucket[] buckets) throws IOException {
List<FieldComparator> adjustedSorts = new ArrayList();
Set<String> bucketFields = new HashSet();
Set<String> sortFields = new HashSet();
for(SortItem sortItem : sorts) {
sortFields.add(stripSingleQuotes(stripQuotes(sortItem.getSortKey().toString())));
adjustedSorts.add(new FieldComparator(stripSingleQuotes(stripQuotes(sortItem.getSortKey().toString())),
ascDescComp(sortItem.getOrdering().toString())));
}
for(Bucket bucket : buckets) {
bucketFields.add(bucket.toString());
}
for(SortItem sortItem : sorts) {
String sortField = stripSingleQuotes(stripQuotes(sortItem.getSortKey().toString()));
if(!bucketFields.contains(sortField)) {
throw new IOException("All sort fields must be in the field list.");
}
}
//Add sort fields if needed
if(sorts.size() < buckets.length) {
for(Bucket bucket : buckets) {
String b = bucket.toString();
if(!sortFields.contains(b)) {
adjustedSorts.add(new FieldComparator(bucket.toString(), ComparatorOrder.ASCENDING));
}
}
}
return adjustedSorts.toArray(new FieldComparator[adjustedSorts.size()]);
}
private static TupleStream doSelectDistinctFacets(SQLVisitor sqlVisitor) throws IOException {
Set<String> fieldSet = new HashSet();
Bucket[] buckets = getBuckets(sqlVisitor.fields, fieldSet);
Metric[] metrics = getMetrics(sqlVisitor.fields, fieldSet);
if(metrics.length > 0) {
throw new IOException("Select Distinct queries cannot include aggregate functions.");
}
TableSpec tableSpec = new TableSpec(sqlVisitor.table, defaultZkhost);
String zkHost = tableSpec.zkHost;
String collection = tableSpec.collection;
Map<String, String> params = new HashMap();
params.put(CommonParams.Q, sqlVisitor.query);
int limit = sqlVisitor.limit > 0 ? sqlVisitor.limit : 100;
FieldComparator[] sorts = null;
if(sqlVisitor.sorts == null) {
sorts = new FieldComparator[buckets.length];
for(int i=0; i<sorts.length; i++) {
sorts[i] = new FieldComparator("index", ComparatorOrder.ASCENDING);
}
} else {
StreamComparator[] comps = adjustSorts(sqlVisitor.sorts, buckets);
sorts = new FieldComparator[comps.length];
for(int i=0; i<comps.length; i++) {
sorts[i] = (FieldComparator)comps[i];
}
}
TupleStream tupleStream = new FacetStream(zkHost,
collection,
params,
buckets,
metrics,
sorts,
limit);
if(sqlVisitor.limit > 0) {
tupleStream = new LimitStream(tupleStream, sqlVisitor.limit);
}
return new EditStream(tupleStream, remove);
}
private static TupleStream doGroupByWithAggregatesFacets(SQLVisitor sqlVisitor) throws IOException {
Set<String> fieldSet = new HashSet();
@ -344,7 +555,7 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
if (comma) {
siBuf.append(",");
}
siBuf.append(stripQuotes(sortItem.getSortKey().toString()) + " " + ascDesc(sortItem.getOrdering().toString()));
siBuf.append(stripSingleQuotes(stripQuotes(sortItem.getSortKey().toString())) + " " + ascDesc(sortItem.getOrdering().toString()));
}
} else {
if(sqlVisitor.limit < 0) {
@ -388,7 +599,7 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
for(int i=0; i< buckets.length; i++) {
Bucket bucket = buckets[i];
SortItem sortItem = sortItems.get(i);
if(!bucket.toString().equals(stripQuotes(sortItem.getSortKey().toString()))) {
if(!bucket.toString().equals(stripSingleQuotes(stripQuotes(sortItem.getSortKey().toString())))) {
return false;
}
@ -453,7 +664,7 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
public static String getSortDirection(List<SortItem> sorts) {
if(sorts != null && sorts.size() > 0) {
for(SortItem item : sorts) {
return ascDesc(stripQuotes(item.getOrdering().toString()));
return ascDesc(stripSingleQuotes(stripQuotes(item.getOrdering().toString())));
}
}
@ -482,7 +693,7 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
String ordering = sortItem.getOrdering().toString();
ComparatorOrder comparatorOrder = ascDescComp(ordering);
String sortKey = sortItem.getSortKey().toString();
comps[i] = new FieldComparator(stripQuotes(sortKey), comparatorOrder);
comps[i] = new FieldComparator(stripSingleQuotes(stripQuotes(sortKey)), comparatorOrder);
}
if(comps.length == 1) {
@ -499,7 +710,7 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
String ordering = sortItem.getOrdering().toString();
ComparatorOrder comparatorOrder = ascDescComp(ordering);
String sortKey = sortItem.getSortKey().toString();
comps[i] = new FieldComparator(stripQuotes(sortKey), comparatorOrder);
comps[i] = new FieldComparator(stripSingleQuotes(stripQuotes(sortKey)), comparatorOrder);
}
return comps;
@ -653,7 +864,7 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
value = '"'+value+'"';
}
buf.append('(').append(stripQuotes(field) + ":" + value).append(')');
buf.append('(').append(stripQuotes(stripSingleQuotes(field)) + ":" + value).append(')');
return null;
}
}
@ -668,6 +879,7 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
public int limit = -1;
public boolean groupByQuery;
public Expression havingExpression;
public boolean isDistinct;
public SQLVisitor(StringBuilder builder) {
this.builder = builder;
@ -731,8 +943,7 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
this.groupByQuery = true;
List<Expression> groups = node.getGroupBy();
for(Expression group : groups) {
groupBy.add(stripQuotes(group.toString()));
groupBy.add(stripSingleQuotes(stripQuotes(group.toString())));
}
}
@ -756,14 +967,14 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
protected Void visitComparisonExpression(ComparisonExpression node, Integer index) {
String field = node.getLeft().toString();
String value = node.getRight().toString();
query = stripQuotes(field)+":"+stripQuotes(value);
query = stripSingleQuotes(stripQuotes(field))+":"+stripQuotes(value);
return null;
}
protected Void visitSelect(Select node, Integer indent) {
this.append(indent.intValue(), "SELECT");
if(node.isDistinct()) {
this.isDistinct = true;
}
if(node.getSelectItems().size() > 1) {
@ -781,7 +992,7 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
}
protected Void visitSingleColumn(SingleColumn node, Integer indent) {
fields.add(stripQuotes(ExpressionFormatter.formatExpression(node.getExpression())));
fields.add(stripSingleQuotes(stripQuotes(ExpressionFormatter.formatExpression(node.getExpression()))));
if(node.getAlias().isPresent()) {
}
@ -794,7 +1005,7 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
}
protected Void visitTable(Table node, Integer indent) {
this.table = node.getName().toString();
this.table = stripSingleQuotes(node.getName().toString());
return null;
}
@ -892,7 +1103,7 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
}
}
public static class HavingStream extends TupleStream {
private static class HavingStream extends TupleStream {
private TupleStream stream;
private HavingVisitor havingVisitor;

View File

@ -28,14 +28,7 @@ import com.facebook.presto.sql.tree.Statement;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.ExceptionStream;
import org.apache.solr.client.solrj.io.stream.SolrStream;
import org.apache.solr.client.solrj.io.stream.StatsStream;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
import org.apache.solr.client.solrj.io.stream.metrics.Metric;
import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
import org.apache.solr.common.params.CommonParams;
import org.junit.After;
@ -95,13 +88,17 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
waitForRecoveriesToFinish(false);
testPredicate();
testBasicSelect();
testStringLiteralFields();
testBasicGrouping();
testBasicGroupingFacets();
testSelectDistinct();
testSelectDistinctFacets();
testAggregatesWithoutGrouping();
testSQLException();
testTimeSeriesGrouping();
testTimeSeriesGroupingFacet();
testParallelBasicGrouping();
testParallelSelectDistinct();
testParallelTimeSeriesGrouping();
}
@ -124,6 +121,15 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(sqlVistor.query.equals("(c:\"d\")"));
//Upper case
parser = new SqlParser();
sql = "select a from b where ('CcC' = 'D')";
statement = parser.createStatement(sql);
sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
sqlVistor.process(statement, new Integer(0));
assert(sqlVistor.query.equals("(CcC:\"D\")"));
//Phrase
parser = new SqlParser();
sql = "select a from b where (c = 'd d')";
@ -200,11 +206,11 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
// Complex Lucene/Solr Query
parser = new SqlParser();
sql = "select a from b where ((c = '[0 TO 100]') OR ((l = '(z*)') AND (m = '(j OR (k NOT s))')))";
sql = "select a from b where (('c' = '[0 TO 100]') OR ((l = '(z*)') AND ('M' = '(j OR (k NOT s))')))";
statement = parser.createStatement(sql);
sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
sqlVistor.process(statement, new Integer(0));
assert(sqlVistor.query.equals("((c:[0 TO 100]) OR ((l:(z*)) AND (m:(j OR (k NOT s)))))"));
assert(sqlVistor.query.equals("((c:[0 TO 100]) OR ((l:(z*)) AND (M:(j OR (k NOT s)))))"));
}
private void testBasicSelect() throws Exception {
@ -216,18 +222,18 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
commit();
indexr("id", "1", "text", "XXXX XXXX", "str_s", "a", "field_i", "7");
indexr("id", "2", "text", "XXXX XXXX", "str_s", "b", "field_i", "8");
indexr("id", "3", "text", "XXXX XXXX", "str_s", "a", "field_i", "20");
indexr("id", "4", "text", "XXXX XXXX", "str_s", "b", "field_i", "11");
indexr("id", "5", "text", "XXXX XXXX", "str_s", "c", "field_i", "30");
indexr("id", "6", "text", "XXXX XXXX", "str_s", "c", "field_i", "40");
indexr("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50");
indexr("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60");
indexDoc(sdoc("id", "1", "text", "XXXX XXXX", "str_s", "a", "field_i", "7"));
indexDoc(sdoc("id", "2", "text", "XXXX XXXX", "str_s", "b", "field_i", "8"));
indexDoc(sdoc("id", "3", "text", "XXXX XXXX", "str_s", "a", "field_i", "20"));
indexDoc(sdoc("id", "4", "text", "XXXX XXXX", "str_s", "b", "field_i", "11"));
indexDoc(sdoc("id", "5", "text", "XXXX XXXX", "str_s", "c", "field_i", "30"));
indexDoc(sdoc("id", "6", "text", "XXXX XXXX", "str_s", "c", "field_i", "40"));
indexDoc(sdoc("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50"));
indexDoc(sdoc("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60"));
commit();
Map params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("sql", "select id, field_i, str_s from collection1 where text='XXXX' order by field_i desc");
params.put("sql", "select 'id', field_i, str_s from collection1 where 'text'='XXXX' order by field_i desc");
SolrStream solrStream = new SolrStream(jetty.url, params);
List<Tuple> tuples = getTuples(solrStream);
@ -319,6 +325,81 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
}
}
private void testStringLiteralFields() throws Exception {
try {
CloudJettyRunner jetty = this.cloudJettys.get(0);
del("*:*");
commit();
indexDoc(sdoc("id", "1", "Text_t", "XXXX XXXX", "str_s", "a", "Field_i", "7"));
indexDoc(sdoc("id", "2", "Text_t", "XXXX XXXX", "str_s", "b", "Field_i", "8"));
indexDoc(sdoc("id", "3", "Text_t", "XXXX XXXX", "str_s", "a", "Field_i", "20"));
indexDoc(sdoc("id", "4", "Text_t", "XXXX XXXX", "str_s", "b", "Field_i", "11"));
indexDoc(sdoc("id", "5", "Text_t", "XXXX XXXX", "str_s", "c", "Field_i", "30"));
indexDoc(sdoc("id", "6", "Text_t", "XXXX XXXX", "str_s", "c", "Field_i", "40"));
indexDoc(sdoc("id", "7", "Text_t", "XXXX XXXX", "str_s", "c", "Field_i", "50"));
indexDoc(sdoc("id", "8", "Text_t", "XXXX XXXX", "str_s", "c", "Field_i", "60"));
commit();
Map params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("sql", "select id, 'Field_i', str_s from Collection1 where 'Text_t'='XXXX' order by 'Field_i' desc");
SolrStream solrStream = new SolrStream(jetty.url, params);
List<Tuple> tuples = getTuples(solrStream);
assert(tuples.size() == 8);
Tuple tuple = null;
tuple = tuples.get(0);
assert(tuple.getLong("id") == 8);
assert(tuple.getLong("Field_i") == 60);
assert(tuple.get("str_s").equals("c"));
tuple = tuples.get(1);
assert(tuple.getLong("id") == 7);
assert(tuple.getLong("Field_i") == 50);
assert(tuple.get("str_s").equals("c"));
tuple = tuples.get(2);
assert(tuple.getLong("id") == 6);
assert(tuple.getLong("Field_i") == 40);
assert(tuple.get("str_s").equals("c"));
tuple = tuples.get(3);
assert(tuple.getLong("id") == 5);
assert(tuple.getLong("Field_i") == 30);
assert(tuple.get("str_s").equals("c"));
tuple = tuples.get(4);
assert(tuple.getLong("id") == 3);
assert(tuple.getLong("Field_i") == 20);
assert(tuple.get("str_s").equals("a"));
tuple = tuples.get(5);
assert(tuple.getLong("id") == 4);
assert(tuple.getLong("Field_i") == 11);
assert(tuple.get("str_s").equals("b"));
tuple = tuples.get(6);
assert(tuple.getLong("id") == 2);
assert(tuple.getLong("Field_i") == 8);
assert(tuple.get("str_s").equals("b"));
tuple = tuples.get(7);
assert(tuple.getLong("id") == 1);
assert(tuple.getLong("Field_i") == 7);
assert(tuple.get("str_s").equals("a"));
} finally {
delete();
}
}
private void testSQLException() throws Exception {
try {
@ -328,14 +409,14 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
commit();
indexr("id", "1", "text", "XXXX XXXX", "str_s", "a", "field_i", "7");
indexr("id", "2", "text", "XXXX XXXX", "str_s", "b", "field_i", "8");
indexr("id", "3", "text", "XXXX XXXX", "str_s", "a", "field_i", "20");
indexr("id", "4", "text", "XXXX XXXX", "str_s", "b", "field_i", "11");
indexr("id", "5", "text", "XXXX XXXX", "str_s", "c", "field_i", "30");
indexr("id", "6", "text", "XXXX XXXX", "str_s", "c", "field_i", "40");
indexr("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50");
indexr("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60");
indexDoc(sdoc("id", "1", "text", "XXXX XXXX", "str_s", "a", "field_i", "7"));
indexDoc(sdoc("id", "2", "text", "XXXX XXXX", "str_s", "b", "field_i", "8"));
indexDoc(sdoc("id", "3", "text", "XXXX XXXX", "str_s", "a", "field_i", "20"));
indexDoc(sdoc("id", "4", "text", "XXXX XXXX", "str_s", "b", "field_i", "11"));
indexDoc(sdoc("id", "5", "text", "XXXX XXXX", "str_s", "c", "field_i", "30"));
indexDoc(sdoc("id", "6", "text", "XXXX XXXX", "str_s", "c", "field_i", "40"));
indexDoc(sdoc("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50"));
indexDoc(sdoc("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60"));
commit();
Map params = new HashMap();
@ -417,7 +498,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
commit();
Map params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s order by sum(field_i) asc limit 2");
params.put("sql", "select str_s, 'count(*)', sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by 'str_s' order by 'sum(field_i)' asc limit 2");
SolrStream solrStream = new SolrStream(jetty.url, params);
List<Tuple> tuples = getTuples(solrStream);
@ -530,6 +611,530 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
}
}
private void testSelectDistinctFacets() throws Exception {
try {
CloudJettyRunner jetty = this.cloudJettys.get(0);
del("*:*");
commit();
indexr("id", "1", "text", "XXXX XXXX", "str_s", "a", "field_i", "1");
indexr("id", "2", "text", "XXXX XXXX", "str_s", "b", "field_i", "2");
indexr("id", "3", "text", "XXXX XXXX", "str_s", "a", "field_i", "20");
indexr("id", "4", "text", "XXXX XXXX", "str_s", "b", "field_i", "2");
indexr("id", "5", "text", "XXXX XXXX", "str_s", "c", "field_i", "30");
indexr("id", "6", "text", "XXXX XXXX", "str_s", "c", "field_i", "30");
indexr("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50");
indexr("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60");
commit();
Map params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("aggregationMode", "facet");
params.put("sql", "select distinct 'str_s', 'field_i' from collection1 order by 'str_s' asc, 'field_i' asc");
SolrStream solrStream = new SolrStream(jetty.url, params);
List<Tuple> tuples = getTuples(solrStream);
assert(tuples.size() == 6);
Tuple tuple = null;
tuple = tuples.get(0);
assert(tuple.get("str_s").equals("a"));
assert(tuple.getLong("field_i") == 1);
tuple = tuples.get(1);
assert(tuple.get("str_s").equals("a"));
assert(tuple.getLong("field_i") == 20);
tuple = tuples.get(2);
assert(tuple.get("str_s").equals("b"));
assert(tuple.getLong("field_i") == 2);
tuple = tuples.get(3);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 30);
tuple = tuples.get(4);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 50);
tuple = tuples.get(5);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 60);
//reverse the sort
params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("aggregationMode", "facet");
params.put("sql", "select distinct str_s, field_i from collection1 order by str_s desc, field_i desc");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
assert(tuples.size() == 6);
tuple = tuples.get(0);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 60);
tuple = tuples.get(1);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 50);
tuple = tuples.get(2);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 30);
tuple = tuples.get(3);
assert(tuple.get("str_s").equals("b"));
assert(tuple.getLong("field_i") == 2);
tuple = tuples.get(4);
assert(tuple.get("str_s").equals("a"));
assert(tuple.getLong("field_i") == 20);
tuple = tuples.get(5);
assert(tuple.get("str_s").equals("a"));
assert(tuple.getLong("field_i") == 1);
//test with limit
params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("aggregationMode", "facet");
params.put("sql", "select distinct str_s, field_i from collection1 order by str_s desc, field_i desc limit 2");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
assert(tuples.size() == 2);
tuple = tuples.get(0);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 60);
tuple = tuples.get(1);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 50);
// Test without a sort. Sort should be asc by default.
new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("aggregationMode", "facet");
params.put("sql", "select distinct str_s, field_i from collection1");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
assert(tuples.size() == 6);
tuple = tuples.get(0);
assert(tuple.get("str_s").equals("a"));
assert(tuple.getLong("field_i") == 1);
tuple = tuples.get(1);
assert(tuple.get("str_s").equals("a"));
assert(tuple.getLong("field_i") == 20);
tuple = tuples.get(2);
assert(tuple.get("str_s").equals("b"));
assert(tuple.getLong("field_i") == 2);
tuple = tuples.get(3);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 30);
tuple = tuples.get(4);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 50);
tuple = tuples.get(5);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 60);
// Test with a predicate.
new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("aggregationMode", "facet");
params.put("sql", "select distinct str_s, field_i from collection1 where str_s = 'a'");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
assert(tuples.size() == 2);
tuple = tuples.get(0);
assert(tuple.get("str_s").equals("a"));
assert (tuple.getLong("field_i") == 1);
tuple = tuples.get(1);
assert(tuple.get("str_s").equals("a"));
assert(tuple.getLong("field_i") == 20);
} finally {
delete();
}
}
private void testSelectDistinct() throws Exception {
try {
CloudJettyRunner jetty = this.cloudJettys.get(0);
del("*:*");
commit();
indexr("id", "1", "text", "XXXX XXXX", "str_s", "a", "field_i", "1");
indexr("id", "2", "text", "XXXX XXXX", "str_s", "b", "field_i", "2");
indexr("id", "3", "text", "XXXX XXXX", "str_s", "a", "field_i", "20");
indexr("id", "4", "text", "XXXX XXXX", "str_s", "b", "field_i", "2");
indexr("id", "5", "text", "XXXX XXXX", "str_s", "c", "field_i", "30");
indexr("id", "6", "text", "XXXX XXXX", "str_s", "c", "field_i", "30");
indexr("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50");
indexr("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60");
commit();
Map params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("sql", "select distinct 'str_s', 'field_i' from collection1 order by 'str_s' asc, 'field_i' asc");
SolrStream solrStream = new SolrStream(jetty.url, params);
List<Tuple> tuples = getTuples(solrStream);
assert(tuples.size() == 6);
Tuple tuple = null;
tuple = tuples.get(0);
assert(tuple.get("str_s").equals("a"));
assert(tuple.getLong("field_i") == 1);
tuple = tuples.get(1);
assert(tuple.get("str_s").equals("a"));
assert(tuple.getLong("field_i") == 20);
tuple = tuples.get(2);
assert(tuple.get("str_s").equals("b"));
assert(tuple.getLong("field_i") == 2);
tuple = tuples.get(3);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 30);
tuple = tuples.get(4);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 50);
tuple = tuples.get(5);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 60);
//reverse the sort
params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("sql", "select distinct str_s, field_i from collection1 order by str_s desc, field_i desc");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
assert(tuples.size() == 6);
tuple = tuples.get(0);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 60);
tuple = tuples.get(1);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 50);
tuple = tuples.get(2);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 30);
tuple = tuples.get(3);
assert(tuple.get("str_s").equals("b"));
assert(tuple.getLong("field_i") == 2);
tuple = tuples.get(4);
assert(tuple.get("str_s").equals("a"));
assert(tuple.getLong("field_i") == 20);
tuple = tuples.get(5);
assert(tuple.get("str_s").equals("a"));
assert(tuple.getLong("field_i") == 1);
//test with limit
params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("sql", "select distinct str_s, field_i from collection1 order by str_s desc, field_i desc limit 2");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
assert(tuples.size() == 2);
tuple = tuples.get(0);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 60);
tuple = tuples.get(1);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 50);
// Test without a sort. Sort should be asc by default.
new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("sql", "select distinct str_s, field_i from collection1");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
assert(tuples.size() == 6);
tuple = tuples.get(0);
assert(tuple.get("str_s").equals("a"));
assert(tuple.getLong("field_i") == 1);
tuple = tuples.get(1);
assert(tuple.get("str_s").equals("a"));
assert(tuple.getLong("field_i") == 20);
tuple = tuples.get(2);
assert(tuple.get("str_s").equals("b"));
assert(tuple.getLong("field_i") == 2);
tuple = tuples.get(3);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 30);
tuple = tuples.get(4);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 50);
tuple = tuples.get(5);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 60);
// Test with a predicate.
new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("sql", "select distinct str_s, field_i from collection1 where str_s = 'a'");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
assert(tuples.size() == 2);
tuple = tuples.get(0);
assert(tuple.get("str_s").equals("a"));
assert(tuple.getLong("field_i") == 1);
tuple = tuples.get(1);
assert(tuple.get("str_s").equals("a"));
assert(tuple.getLong("field_i") == 20);
} finally {
delete();
}
}
private void testParallelSelectDistinct() throws Exception {
try {
CloudJettyRunner jetty = this.cloudJettys.get(0);
del("*:*");
commit();
indexr("id", "1", "text", "XXXX XXXX", "str_s", "a", "field_i", "1");
indexr("id", "2", "text", "XXXX XXXX", "str_s", "b", "field_i", "2");
indexr("id", "3", "text", "XXXX XXXX", "str_s", "a", "field_i", "20");
indexr("id", "4", "text", "XXXX XXXX", "str_s", "b", "field_i", "2");
indexr("id", "5", "text", "XXXX XXXX", "str_s", "c", "field_i", "30");
indexr("id", "6", "text", "XXXX XXXX", "str_s", "c", "field_i", "30");
indexr("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50");
indexr("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60");
commit();
Map params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("numWorkers", "2");
params.put("sql", "select distinct str_s, field_i from collection1 order by str_s asc, field_i asc");
SolrStream solrStream = new SolrStream(jetty.url, params);
List<Tuple> tuples = getTuples(solrStream);
assert(tuples.size() == 6);
Tuple tuple = null;
tuple = tuples.get(0);
assert(tuple.get("str_s").equals("a"));
assert(tuple.getLong("field_i") == 1);
tuple = tuples.get(1);
assert(tuple.get("str_s").equals("a"));
assert(tuple.getLong("field_i") == 20);
tuple = tuples.get(2);
assert(tuple.get("str_s").equals("b"));
assert(tuple.getLong("field_i") == 2);
tuple = tuples.get(3);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 30);
tuple = tuples.get(4);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 50);
tuple = tuples.get(5);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 60);
//reverse the sort
params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("numWorkers", "2");
params.put("sql", "select distinct str_s, field_i from collection1 order by str_s desc, field_i desc");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
assert(tuples.size() == 6);
tuple = tuples.get(0);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 60);
tuple = tuples.get(1);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 50);
tuple = tuples.get(2);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 30);
tuple = tuples.get(3);
assert(tuple.get("str_s").equals("b"));
assert(tuple.getLong("field_i") == 2);
tuple = tuples.get(4);
assert(tuple.get("str_s").equals("a"));
assert(tuple.getLong("field_i") == 20);
tuple = tuples.get(5);
assert(tuple.get("str_s").equals("a"));
assert(tuple.getLong("field_i") == 1);
//test with limit
params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("numWorkers", "2");
params.put("sql", "select distinct str_s, field_i from collection1 order by str_s desc, field_i desc limit 2");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
assert(tuples.size() == 2);
tuple = tuples.get(0);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 60);
tuple = tuples.get(1);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 50);
// Test without a sort. Sort should be asc by default.
new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("numWorkers", "2");
params.put("sql", "select distinct str_s, field_i from collection1");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
assert(tuples.size() == 6);
tuple = tuples.get(0);
assert(tuple.get("str_s").equals("a"));
assert(tuple.getLong("field_i") == 1);
tuple = tuples.get(1);
assert(tuple.get("str_s").equals("a"));
assert(tuple.getLong("field_i") == 20);
tuple = tuples.get(2);
assert(tuple.get("str_s").equals("b"));
assert(tuple.getLong("field_i") == 2);
tuple = tuples.get(3);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 30);
tuple = tuples.get(4);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 50);
tuple = tuples.get(5);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 60);
// Test with a predicate.
new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("numWorkers", "2");
params.put("sql", "select distinct str_s, field_i from collection1 where str_s = 'a'");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
assert(tuples.size() == 2);
tuple = tuples.get(0);
assert(tuple.get("str_s").equals("a"));
assert(tuple.getLong("field_i") == 1);
tuple = tuples.get(1);
assert(tuple.get("str_s").equals("a"));
assert(tuple.getLong("field_i") == 20);
} finally {
delete();
}
}
private void testBasicGroupingFacets() throws Exception {
try {
@ -551,7 +1156,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
Map params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("aggregationMode", "facet");
params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s order by sum(field_i) asc limit 2");
params.put("sql", "select 'str_s', 'count(*)', sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by 'str_s' order by 'sum(field_i)' asc limit 2");
SolrStream solrStream = new SolrStream(jetty.url, params);
List<Tuple> tuples = getTuples(solrStream);
@ -719,6 +1324,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getDouble("max(field_i)") == 20);
assert(tuple.getDouble("avg(field_i)") == 13.5D);
params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("numWorkers", "2");

View File

@ -63,6 +63,10 @@ public class Tuple implements Cloneable {
this.fields.put(key, value);
}
public void remove(Object key) {
this.fields.remove(key);
}
public String getString(Object key) {
return this.fields.get(key).toString();
}

View File

@ -286,13 +286,26 @@ public class CloudSolrStream extends TupleStream implements Expressible {
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
ClusterState clusterState = zkStateReader.getClusterState();
//System.out.println("Connected to zk an got cluster state.");
Collection<Slice> slices = clusterState.getActiveSlices(this.collection);
if(slices == null) {
String colLower = this.collection.toLowerCase(Locale.getDefault());
//Try case insensitive match
for(String col : clusterState.getCollections()) {
if(col.toLowerCase(Locale.getDefault()).equals(colLower)) {
slices = clusterState.getActiveSlices(col);
break;
}
}
if(slices == null) {
throw new Exception("Collection not found:" + this.collection);
}
}
params.put("distrib","false"); // We are the aggregator.

View File

@ -0,0 +1,76 @@
package org.apache.solr.client.solrj.io.stream;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
public class EditStream extends TupleStream {
private static final long serialVersionUID = 1;
private TupleStream stream;
private List<String> remove;
public EditStream(TupleStream stream, List<String> remove) {
this.stream = stream;
this.remove = remove;
}
public void setStreamContext(StreamContext context) {
this.stream.setStreamContext(context);
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList();
l.add(stream);
return l;
}
public void open() throws IOException {
stream.open();
}
public void close() throws IOException {
stream.close();
}
public Tuple read() throws IOException {
Tuple tuple = stream.read();
if(tuple.EOF) {
return tuple;
} else {
for(String key : remove) {
tuple.remove(key);
}
return tuple;
}
}
public StreamComparator getStreamSort() {
return stream.getStreamSort();
}
public int getCost() {
return 0;
}
}