SOLR-8593: Make SQL handler friendlier out of the box

This commit is contained in:
Joel Bernstein 2017-02-13 11:46:08 -05:00
parent de512d7402
commit ec6ee96ae6
7 changed files with 269 additions and 88 deletions

View File

@ -79,7 +79,7 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware, Per
params.set("numWorkers", params.getInt("numWorkers", 1));
params.set("workerCollection", params.get("workerCollection", defaultWorkerCollection));
params.set("workerZkhost", params.get("workerZkhost", defaultZkhost));
params.set("aggregationMode", params.get("aggregationMode", "map_reduce"));
params.set("aggregationMode", params.get("aggregationMode", "facet"));
TupleStream tupleStream = null;
try {

View File

@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
/** Enumerator that reads from a Solr collection. */
@ -34,6 +35,7 @@ class SolrEnumerator implements Enumerator<Object> {
private final TupleStream tupleStream;
private final List<Map.Entry<String, Class>> fields;
private Tuple current;
private char sep = 31;
/** Creates a SolrEnumerator.
*
@ -84,6 +86,17 @@ class SolrEnumerator implements Enumerator<Object> {
return val;
}
if(val instanceof ArrayList) {
ArrayList arrayList = (ArrayList) val;
StringBuilder buf = new StringBuilder();
for(Object o : arrayList) {
buf.append(sep);
buf.append(o.toString());
}
val = buf.toString();
}
return val;
}

View File

@ -132,7 +132,8 @@ class SolrFilter extends Filter implements SolrRel {
case NOT:
return "-" + translateComparison(((RexCall) node).getOperands().get(0));
case EQUALS:
String terms = binaryTranslated.getValue().getValue2().toString().trim();
String terms = binaryTranslated.getValue().toString().trim();
terms = terms.replace("'","");
if (!terms.startsWith("(") && !terms.startsWith("[") && !terms.startsWith("{")) {
terms = "\"" + terms + "\"";
}
@ -141,19 +142,19 @@ class SolrFilter extends Filter implements SolrRel {
this.negativeQuery = false;
return clause;
case NOT_EQUALS:
return "-(" + binaryTranslated.getKey() + ":" + binaryTranslated.getValue().getValue2() + ")";
return "-(" + binaryTranslated.getKey() + ":" + binaryTranslated.getValue() + ")";
case LESS_THAN:
this.negativeQuery = false;
return "(" + binaryTranslated.getKey() + ": [ * TO " + binaryTranslated.getValue().getValue2() + " })";
return "(" + binaryTranslated.getKey() + ": [ * TO " + binaryTranslated.getValue() + " })";
case LESS_THAN_OR_EQUAL:
this.negativeQuery = false;
return "(" + binaryTranslated.getKey() + ": [ * TO " + binaryTranslated.getValue().getValue2() + " ])";
return "(" + binaryTranslated.getKey() + ": [ * TO " + binaryTranslated.getValue() + " ])";
case GREATER_THAN:
this.negativeQuery = false;
return "(" + binaryTranslated.getKey() + ": { " + binaryTranslated.getValue().getValue2() + " TO * ])";
return "(" + binaryTranslated.getKey() + ": { " + binaryTranslated.getValue() + " TO * ])";
case GREATER_THAN_OR_EQUAL:
this.negativeQuery = false;
return "(" + binaryTranslated.getKey() + ": [ " + binaryTranslated.getValue().getValue2() + " TO * ])";
return "(" + binaryTranslated.getKey() + ": [ " + binaryTranslated.getValue() + " TO * ])";
default:
throw new AssertionError("cannot translate " + node);
}
@ -305,21 +306,20 @@ class SolrFilter extends Filter implements SolrRel {
}
switch (node.getKind()) {
case EQUALS:
String terms = binaryTranslated.getValue().getValue2().toString().trim();
String terms = binaryTranslated.getValue().toString().trim();
String clause = "eq(" + binaryTranslated.getKey() + "," + terms + ")";
return clause;
case NOT_EQUALS:
return "not(eq(" + binaryTranslated.getKey() + "," + binaryTranslated.getValue().getValue2() + "))";
return "not(eq(" + binaryTranslated.getKey() + "," + binaryTranslated.getValue() + "))";
case LESS_THAN:
return "lt(" + binaryTranslated.getKey() + "," + binaryTranslated.getValue().getValue2() + ")";
return "lt(" + binaryTranslated.getKey() + "," + binaryTranslated.getValue() + ")";
case LESS_THAN_OR_EQUAL:
return "lteq(" + binaryTranslated.getKey() + "," + binaryTranslated.getValue().getValue2() + ")";
return "lteq(" + binaryTranslated.getKey() + "," + binaryTranslated.getValue() + ")";
case GREATER_THAN:
return "gt(" + binaryTranslated.getKey() + "," + binaryTranslated.getValue().getValue2() + ")";
return "gt(" + binaryTranslated.getKey() + "," + binaryTranslated.getValue() + ")";
case GREATER_THAN_OR_EQUAL:
return "gteq(" + binaryTranslated.getKey() + "," + binaryTranslated.getValue().getValue2() + ")";
return "gteq(" + binaryTranslated.getKey() + "," + binaryTranslated.getValue() + ")";
default:
throw new AssertionError("cannot translate " + node);
}

View File

@ -90,6 +90,7 @@ class SolrSchema extends AbstractSchema {
final RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
final RelDataTypeFactory.FieldInfoBuilder fieldInfo = typeFactory.builder();
Map<String, LukeResponse.FieldInfo> luceneFieldInfoMap = getFieldInfo(collection);
for(Map.Entry<String, LukeResponse.FieldInfo> entry : luceneFieldInfoMap.entrySet()) {
LukeResponse.FieldInfo luceneFieldInfo = entry.getValue();
@ -110,13 +111,17 @@ class SolrSchema extends AbstractSchema {
type = typeFactory.createJavaType(String.class);
}
EnumSet<FieldFlag> flags = luceneFieldInfo.getFlags();
EnumSet<FieldFlag> flags = luceneFieldInfo.parseFlags(luceneFieldInfo.getSchema());
/*
if(flags != null && flags.contains(FieldFlag.MULTI_VALUED)) {
type = typeFactory.createArrayType(type, -1);
}
*/
fieldInfo.add(entry.getKey(), type).nullable(true);
}
fieldInfo.add("_query_",typeFactory.createJavaType(String.class));
fieldInfo.add("score",typeFactory.createJavaType(Double.class));
return RelDataTypeImpl.proto(fieldInfo.build());
}

View File

@ -32,6 +32,9 @@ import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
import org.apache.solr.client.solrj.io.ops.AndOperation;
import org.apache.solr.client.solrj.io.ops.BooleanOperation;
import org.apache.solr.client.solrj.io.ops.EqualsOperation;
@ -216,10 +219,10 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
}
}
private List<Metric> buildMetrics(List<Pair<String, String>> metricPairs) {
private List<Metric> buildMetrics(List<Pair<String, String>> metricPairs, boolean ifEmptyCount) {
List<Metric> metrics = new ArrayList<>(metricPairs.size());
metrics.addAll(metricPairs.stream().map(this::getMetric).collect(Collectors.toList()));
if(metrics.size() == 0) {
if(metrics.size() == 0 && ifEmptyCount) {
metrics.add(new CountMetric());
}
return metrics;
@ -253,15 +256,35 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
ModifiableSolrParams params = new ModifiableSolrParams();
params.add(CommonParams.Q, query);
//Validate the fields
for(Map.Entry<String, Class> entry : fields) {
String fname = entry.getKey();
if(limit == null && "score".equals(fname)) {
throw new IOException("score is not a valid field for unlimited queries.");
}
if(fname.contains("*")) {
throw new IOException("* is not supported for column selection.");
}
}
String fl = getFields(fields);
if(orders.size() > 0) {
params.add(CommonParams.SORT, getSort(orders));
} else {
params.add(CommonParams.SORT, "_version_ desc");
if(limit == null) {
params.add(CommonParams.SORT, "_version_ desc");
fl = fl+",_version_";
} else {
params.add(CommonParams.SORT, "score desc");
if(fl.indexOf("score") == -1) {
fl = fl + ",score";
}
}
}
if(fields.size() > 0) {
params.add(CommonParams.FL, getFields(fields));
}
params.add(CommonParams.FL, fl);
if (limit != null) {
params.add(CommonParams.ROWS, limit);
@ -284,26 +307,23 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
return buf.toString();
}
/**
 * Renders one ORDER BY entry as a sort clause: the pair's key, a single space,
 * then the pair's value.
 */
private String getSingleSort(Pair<String, String> order) {
  return order.getKey() + " " + order.getValue();
}
/**
 * Builds a comma separated field list from the keys of {@code fields}.
 * When none of the keys is {@code _version_}, {@code ",_version_"} is appended
 * to the end of the list.
 */
private String getFields(List<Map.Entry<String, Class>> fields) {
  final StringBuilder fieldList = new StringBuilder();
  boolean versionSeen = false;

  for (Map.Entry<String, Class> entry : fields) {
    final String name = entry.getKey();

    // Separate from whatever has been written so far.
    if (fieldList.length() != 0) {
      fieldList.append(",");
    }

    if (name.equals("_version_")) {
      versionSeen = true;
    }

    fieldList.append(name);
  }

  if (!versionSeen) {
    fieldList.append(",_version_");
  }

  return fieldList.toString();
}
@ -420,7 +440,11 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
int numWorkers = Integer.parseInt(properties.getProperty("numWorkers", "1"));
Bucket[] buckets = buildBuckets(_buckets, fields);
Metric[] metrics = buildMetrics(metricPairs).toArray(new Metric[0]);
Metric[] metrics = buildMetrics(metricPairs, false).toArray(new Metric[0]);
if(metrics.length == 0) {
return handleSelectDistinctMapReduce(zk, collection, properties, fields, query, orders, buckets, limit);
}
Set<String> fieldSet = getFieldSet(metrics, fields);
@ -527,7 +551,6 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
return bucketsArray;
}
private TupleStream handleGroupByFacet(String zkHost,
String collection,
final List<Map.Entry<String, Class>> fields,
@ -542,13 +565,13 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
solrParams.add(CommonParams.Q, query);
Bucket[] buckets = buildBuckets(bucketFields, fields);
Metric[] metrics = buildMetrics(metricPairs).toArray(new Metric[0]);
Metric[] metrics = buildMetrics(metricPairs, true).toArray(new Metric[0]);
if(metrics.length == 0) {
metrics = new Metric[1];
metrics[0] = new CountMetric();
}
int limit = lim != null ? Integer.parseInt(lim) : 100;
int limit = lim != null ? Integer.parseInt(lim) : 1000;
FieldComparator[] sorts = null;
@ -561,13 +584,15 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
sorts = getComps(orders);
}
int overfetch = (int)(limit * 1.25);
TupleStream tupleStream = new FacetStream(zkHost,
collection,
solrParams,
buckets,
metrics,
sorts,
limit);
overfetch);
@ -602,30 +627,144 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
return tupleStream;
}
private TupleStream handleSelectDistinctMapReduce(final Properties properties,
private TupleStream handleSelectDistinctMapReduce(final String zkHost,
final String collection,
final Properties properties,
final List<Map.Entry<String, Class>> fields,
final String query,
final List<Pair<String, String>> orders,
final List<String> buckets,
final List<Pair<String, String>> metricPairs,
final String limit) {
final Bucket[] buckets,
final String limit) throws IOException{
int numWorkers = Integer.parseInt(properties.getProperty("numWorkers", "1"));
String fl = getFields(fields);
String sort = null;
StreamEqualitor ecomp = null;
StreamComparator comp = null;
if(orders != null && orders.size() > 0) {
StreamComparator[] adjustedSorts = adjustSorts(orders, buckets);
// Because of the way adjustSorts works we know that each FieldComparator has a single
// field name. For this reason we can just look at the leftFieldName
FieldEqualitor[] fieldEqualitors = new FieldEqualitor[adjustedSorts.length];
StringBuilder buf = new StringBuilder();
for(int i=0; i<adjustedSorts.length; i++) {
FieldComparator fieldComparator = (FieldComparator)adjustedSorts[i];
fieldEqualitors[i] = new FieldEqualitor(fieldComparator.getLeftFieldName());
if(i>0) {
buf.append(",");
}
buf.append(fieldComparator.getLeftFieldName()).append(" ").append(fieldComparator.getOrder().toString());
}
sort = buf.toString();
return null;
if(adjustedSorts.length == 1) {
ecomp = fieldEqualitors[0];
comp = adjustedSorts[0];
} else {
ecomp = new MultipleFieldEqualitor(fieldEqualitors);
comp = new MultipleFieldComparator(adjustedSorts);
}
} else {
StringBuilder sortBuf = new StringBuilder();
FieldEqualitor[] equalitors = new FieldEqualitor[buckets.length];
StreamComparator[] streamComparators = new StreamComparator[buckets.length];
for(int i=0; i<buckets.length; i++) {
equalitors[i] = new FieldEqualitor(buckets[i].toString());
streamComparators[i] = new FieldComparator(buckets[i].toString(), ComparatorOrder.ASCENDING);
if(i>0) {
sortBuf.append(',');
}
sortBuf.append(buckets[i].toString()).append(" asc");
}
sort = sortBuf.toString();
if(equalitors.length == 1) {
ecomp = equalitors[0];
comp = streamComparators[0];
} else {
ecomp = new MultipleFieldEqualitor(equalitors);
comp = new MultipleFieldComparator(streamComparators);
}
}
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CommonParams.FL, fl);
params.set(CommonParams.Q, query);
//Always use the /export handler for Distinct Queries because it requires exporting full result sets.
params.set(CommonParams.QT, "/export");
if(numWorkers > 1) {
params.set("partitionKeys", getPartitionKeys(buckets));
}
params.set("sort", sort);
TupleStream tupleStream = null;
CloudSolrStream cstream = new CloudSolrStream(zkHost, collection, params);
tupleStream = new UniqueStream(cstream, ecomp);
if(numWorkers > 1) {
// Do the unique in parallel
// Maintain the sort of the Tuples coming from the workers.
ParallelStream parallelStream = new ParallelStream(zkHost, collection, tupleStream, numWorkers, comp);
StreamFactory factory = new StreamFactory()
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("parallel", ParallelStream.class)
.withFunctionName("unique", UniqueStream.class);
parallelStream.setStreamFactory(factory);
tupleStream = parallelStream;
}
if(limit != null) {
tupleStream = new LimitStream(tupleStream, Integer.parseInt(limit));
}
return tupleStream;
}
private TupleStream handleSelectDistinctFacet(final Properties properties,
final List<Map.Entry<String, Class>> fields,
final String query,
final List<Pair<String, String>> orders,
final List<String> buckets,
final List<Pair<String, String>> metricPairs,
final String limit) {
return null;
/**
 * Produces the full sort specification for a DISTINCT query.
 *
 * Every requested order becomes a FieldComparator, in request order. Any bucket
 * that was not named in {@code orders} is appended afterwards, using the
 * direction of the last requested order (ascending when {@code orders} is empty).
 *
 * @param orders  requested ORDER BY pairs (field name, direction)
 * @param buckets the distinct fields; each one is identified by its toString()
 * @return one FieldComparator per sort field
 * @throws IOException if a requested sort field is not one of the buckets
 */
private StreamComparator[] adjustSorts(List<Pair<String, String>> orders, Bucket[] buckets) throws IOException {
  List<FieldComparator> adjustedSorts = new ArrayList<>();
  Set<String> bucketFields = new HashSet<>();
  Set<String> sortFields = new HashSet<>();

  // Ends up holding the direction of the last requested order.
  ComparatorOrder comparatorOrder = ComparatorOrder.ASCENDING;

  for(Pair<String, String> order : orders) {
    sortFields.add(order.getKey());
    comparatorOrder = ascDescComp(order.getValue());
    adjustedSorts.add(new FieldComparator(order.getKey(), comparatorOrder));
  }

  for(Bucket bucket : buckets) {
    bucketFields.add(bucket.toString());
  }

  for(String sf : sortFields) {
    if(!bucketFields.contains(sf)) {
      throw new IOException("All sort fields must be in the field list.");
    }
  }

  //Add sort fields if needed
  if(sortFields.size() < buckets.length) {
    for(Bucket bucket : buckets) {
      String b = bucket.toString();
      if(!sortFields.contains(b)) {
        adjustedSorts.add(new FieldComparator(b, comparatorOrder));
      }
    }
  }

  return adjustedSorts.toArray(new FieldComparator[adjustedSorts.size()]);
}
private TupleStream handleStats(String zk,
@ -636,7 +775,7 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
ModifiableSolrParams solrParams = new ModifiableSolrParams();
solrParams.add(CommonParams.Q, query);
Metric[] metrics = buildMetrics(metricPairs).toArray(new Metric[0]);
Metric[] metrics = buildMetrics(metricPairs, false).toArray(new Metric[0]);
return new StatsStream(zk, collection, solrParams, metrics);
}

View File

@ -115,7 +115,6 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
List<Tuple> tuples = getTuples(solrStream);
assert(tuples.size() == 8);
Tuple tuple;
tuple = tuples.get(0);
@ -478,7 +477,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
indexDoc(sdoc("id", "8", "Text_t", "XXXX XXXX", "Str_s", "c", "Field_i", "60"));
commit();
SolrParams sParams = mapParams(CommonParams.QT, "/sql",
SolrParams sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
"stmt", "select id, Field_i, Str_s from collection1 where Text_t='XXXX' order by Field_i desc");
SolrStream solrStream = new SolrStream(jetty.url, sParams);
@ -545,7 +544,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.get("Str_s").equals("a"));
assert(tuple.getDouble("EXPR$1") == 7);
sParams = mapParams(CommonParams.QT, "/sql",
sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
"stmt", "select Str_s, sum(Field_i) from collection1 where id='(1 8)' group by Str_s having (sum(Field_i) = 7 OR sum(Field_i) = 60) order by sum(Field_i) desc");
solrStream = new SolrStream(jetty.url, sParams);
@ -584,7 +583,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
indexDoc(sdoc("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60"));
commit();
SolrParams sParams = mapParams(CommonParams.QT, "/sql",
SolrParams sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
"stmt", "select id, str_s from collection1 where text='XXXX' order by field_iff desc");
SolrStream solrStream = new SolrStream(jetty.url, sParams);
@ -603,7 +602,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getException().contains("Column 'field_iff' not found in any table"));
sParams = mapParams(CommonParams.QT, "/sql",
sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
"stmt", "select str_s, count(*), sum(field_iff), min(field_i), max(field_i), cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s having ((sum(field_iff) = 19) AND (min(field_i) = 8))");
solrStream = new SolrStream(jetty.url, sParams);
@ -612,7 +611,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.EXCEPTION);
assert(tuple.getException().contains("Column 'field_iff' not found in any table"));
sParams = mapParams(CommonParams.QT, "/sql",
sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
"stmt", "select str_s, count(*), blah(field_i), min(field_i), max(field_i), cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 8))");
solrStream = new SolrStream(jetty.url, sParams);
@ -645,7 +644,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
indexr("id", "9", "text", "XXXX XXXY", "str_s", "d", "field_i", "70");
commit();
SolrParams sParams = mapParams(CommonParams.QT, "/sql",
SolrParams sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
"stmt", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s order by sum(field_i) asc limit 2");
SolrStream solrStream = new SolrStream(jetty.url, sParams);
@ -653,7 +652,6 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
//Only two results because of the limit.
assert(tuples.size() == 2);
Tuple tuple;
tuple = tuples.get(0);
@ -672,7 +670,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getDouble("EXPR$4") == 20); //max(field_i)
assert(tuple.getDouble("EXPR$5") == 13.5D); //avg(field_i)
sParams = mapParams(CommonParams.QT, "/sql",
sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
"stmt", "select str_s as myString, count(*), sum(field_i) as mySum, min(field_i), max(field_i), cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s order by mySum asc limit 2");
solrStream = new SolrStream(jetty.url, sParams);
@ -697,7 +695,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getDouble("EXPR$4") == 20); //max(field_i)
assert(tuple.getDouble("EXPR$5") == 13.5D); //avg(field_i)
sParams = mapParams(CommonParams.QT, "/sql",
sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
"stmt", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), "
+ "cast(avg(1.0 * field_i) as float) from collection1 where (text='XXXX' AND NOT ((text='XXXY') AND (text='XXXY' OR text='XXXY'))) "
+ "group by str_s order by str_s desc");
@ -735,7 +733,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getDouble("EXPR$5") == 13.5D); //avg(field_i)
sParams = mapParams(CommonParams.QT, "/sql",
sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
"stmt", "select str_s as myString, count(*) as myCount, sum(field_i) as mySum, min(field_i) as myMin, "
+ "max(field_i) as myMax, cast(avg(1.0 * field_i) as float) as myAvg from collection1 "
+ "where (text='XXXX' AND NOT (text='XXXY')) group by str_s order by str_s desc");
@ -772,7 +770,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getDouble("myMax") == 20);
assert(tuple.getDouble("myAvg") == 13.5D);
sParams = mapParams(CommonParams.QT, "/sql",
sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
"stmt", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), cast(avg(1.0 * field_i) as float) " +
"from collection1 where text='XXXX' group by str_s having sum(field_i) = 19");
@ -789,7 +787,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getDouble("EXPR$4") == 11); //max(field_i)
assert(tuple.getDouble("EXPR$5") == 9.5D); //avg(field_i)
sParams = mapParams(CommonParams.QT, "/sql",
sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
"stmt", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), cast(avg(1.0 * field_i) as float) " +
"from collection1 where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 8))");
@ -806,7 +804,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getDouble("EXPR$4") == 11); //max(field_i)
assert(tuple.getDouble("EXPR$5") == 9.5D); //avg(field_i)
sParams = mapParams(CommonParams.QT, "/sql",
sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
"stmt", "select str_s, count(*), sum(field_i) as mySum, min(field_i), max(field_i), " +
"cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s " +
"having ((sum(field_i) = 19) AND (min(field_i) = 8))");
@ -824,7 +822,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getDouble("EXPR$4") == 11); //max(field_i)
assert(tuple.getDouble("EXPR$5") == 9.5D); //avg(field_i)
sParams = mapParams(CommonParams.QT, "/sql",
sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
"stmt", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), " +
"cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s " +
"having ((sum(field_i) = 19) AND (min(field_i) = 100))");
@ -1063,7 +1061,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
indexr("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60");
commit();
SolrParams sParams = mapParams(CommonParams.QT, "/sql",
SolrParams sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
"stmt", "select distinct str_s, field_i from collection1 order by str_s asc, field_i asc");
System.out.println("##################### testSelectDistinct()");
@ -1071,8 +1069,8 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
TupleStream solrStream = new SolrStream(jetty.url, sParams);
List<Tuple> tuples = getTuples(solrStream);
assert(tuples.size() == 6);
assert(tuples.size() == 6);
Tuple tuple = tuples.get(0);
assert(tuple.get("str_s").equals("a"));
assert(tuple.getLong("field_i") == 1);
@ -1099,7 +1097,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
//reverse the sort
sParams = mapParams(CommonParams.QT, "/sql",
sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
"stmt", "select distinct str_s, field_i from collection1 order by str_s desc, field_i desc");
solrStream = new SolrStream(jetty.url, sParams);
@ -1134,7 +1132,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getLong("field_i") == 1);
sParams = mapParams(CommonParams.QT, "/sql",
sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
"stmt", "select distinct str_s as myString, field_i from collection1 order by myString desc, field_i desc");
solrStream = new SolrStream(jetty.url, sParams);
@ -1170,7 +1168,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
//test with limit
sParams = mapParams(CommonParams.QT, "/sql",
sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
"stmt", "select distinct str_s, field_i from collection1 order by str_s desc, field_i desc limit 2");
solrStream = new SolrStream(jetty.url, sParams);
@ -1188,7 +1186,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
// Test without a sort. Sort should be asc by default.
sParams = mapParams(CommonParams.QT, "/sql",
sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
"stmt", "select distinct str_s, field_i from collection1");
solrStream = new SolrStream(jetty.url, sParams);
@ -1221,7 +1219,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getLong("field_i") == 60);
// Test with a predicate.
sParams = mapParams(CommonParams.QT, "/sql",
sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
"stmt", "select distinct str_s, field_i from collection1 where str_s = 'a'");
solrStream = new SolrStream(jetty.url, sParams);
@ -1258,7 +1256,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
indexr("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50");
indexr("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60");
commit();
SolrParams sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
SolrParams sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
"stmt", "select distinct str_s, field_i from collection1 order by str_s asc, field_i asc");
SolrStream solrStream = new SolrStream(jetty.url, sParams);
@ -1294,7 +1292,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
//reverse the sort
sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
"stmt", "select distinct str_s, field_i from collection1 order by str_s desc, field_i desc");
solrStream = new SolrStream(jetty.url, sParams);
@ -1328,7 +1326,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
//reverse the sort
sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
"stmt", "select distinct str_s as myString, field_i from collection1 order by myString desc, field_i desc");
solrStream = new SolrStream(jetty.url, sParams);
@ -1364,7 +1362,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
//test with limit
sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
"stmt", "select distinct str_s, field_i from collection1 order by str_s desc, field_i desc limit 2");
solrStream = new SolrStream(jetty.url, sParams);
@ -1382,7 +1380,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
// Test without a sort. Sort should be asc by default.
sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
"stmt", "select distinct str_s, field_i from collection1");
solrStream = new SolrStream(jetty.url, sParams);
@ -1415,7 +1413,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getLong("field_i") == 60);
// Test with a predicate.
sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
"stmt", "select distinct str_s, field_i from collection1 where str_s = 'a'");
solrStream = new SolrStream(jetty.url, sParams);
@ -1643,7 +1641,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
indexr("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60");
commit();
SolrParams sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
SolrParams sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
"stmt", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), " +
"cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s " +
"order by sum(field_i) asc limit 2");
@ -1673,7 +1671,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getDouble("EXPR$5") == 13.5D); //avg(field_i)
sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
"stmt", "select str_s, count(*), sum(field_i) as mySum, min(field_i), max(field_i), " +
"cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s order by mySum asc limit 2");
@ -1700,7 +1698,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getDouble("EXPR$5") == 13.5D); //avg(field_i)
sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
"stmt", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), " +
"cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s order by str_s desc");
@ -1737,7 +1735,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getDouble("EXPR$5") == 13.5D); //avg(field_i)
sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
"stmt", "select str_s as myString, count(*), sum(field_i), min(field_i), max(field_i), " +
"cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s order by myString desc");
@ -1774,7 +1772,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getDouble("EXPR$5") == 13.5D); //avg(field_i)
sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
"stmt", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), " +
"cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s having sum(field_i) = 19");
@ -1791,7 +1789,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getDouble("EXPR$4") == 11); //max(field_i)
assert(tuple.getDouble("EXPR$5") == 9.5D); //avg(field_i)
sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
"stmt", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), " +
"cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s " +
"having ((sum(field_i) = 19) AND (min(field_i) = 8))");
@ -1809,7 +1807,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getDouble("EXPR$4") == 11); //max(field_i)
assert(tuple.getDouble("EXPR$5") == 9.5D); //avg(field_i)
sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
"stmt", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), " +
"cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s " +
"having ((sum(field_i) = 19) AND (min(field_i) = 100))");
@ -2224,7 +2222,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
indexr("id", "8", "year_i", "2014", "month_i", "4", "day_i", "2", "item_i", "1");
commit();
SolrParams sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
SolrParams sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
"stmt", "select year_i, sum(item_i) from collection1 group by year_i order by year_i desc");
SolrStream solrStream = new SolrStream(jetty.url, sParams);
@ -2243,7 +2241,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getLong("year_i") == 2014);
assert(tuple.getDouble("EXPR$1") == 7); //sum(item_i)
sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
"stmt", "select year_i, month_i, sum(item_i) from collection1 group by year_i, month_i " +
"order by year_i desc, month_i desc");
@ -2270,7 +2268,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getDouble("EXPR$2") == 7); //sum(item_i)
sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
"stmt", "select year_i, month_i, day_i, sum(item_i) from collection1 group by year_i, month_i, day_i " +
"order by year_i desc, month_i desc, day_i desc");

View File

@ -17,6 +17,7 @@
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.sql.Array;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
@ -88,6 +89,7 @@ public class JDBCStream extends TupleStream implements Expressible {
private ResultSetValueSelector[] valueSelectors;
protected ResultSet resultSet;
protected transient StreamContext streamContext;
protected String sep = Character.toString((char)31);
public JDBCStream(String connectionUrl, String sqlQuery, StreamComparator definedSort) throws IOException {
this(connectionUrl, sqlQuery, definedSort, null, null);
@ -231,12 +233,20 @@ public class JDBCStream extends TupleStream implements Expressible {
final String columnName = metadata.getColumnLabel(columnNumber);
String className = metadata.getColumnClassName(columnNumber);
String typeName = metadata.getColumnTypeName(columnNumber);
if(directSupportedTypes.contains(className)){
valueSelectors[columnIdx] = new ResultSetValueSelector() {
/**
 * Reads the column value from the current row.
 *
 * Returns null when the column was SQL NULL. A String that begins with the
 * separator is treated as a separator-prefixed list: the leading separator is
 * dropped and the remainder is split on the separator into a String[]. Any
 * other value is returned unchanged.
 */
public Object selectValue(ResultSet resultSet) throws SQLException {
  Object obj = resultSet.getObject(columnNumber);
  if(resultSet.wasNull()){ return null; }
  if(obj instanceof String) {
    String s = (String)obj;
    // Only strip and split when the separator is the leading character; a separator
    // elsewhere in the value must not cost the string its first character.
    if(s.startsWith(sep)) {
      return s.substring(sep.length()).split(sep);
    }
  }

  return obj;
}
public String getColumnName() {
@ -276,6 +286,22 @@ public class JDBCStream extends TupleStream implements Expressible {
return columnName;
}
};
} else if(Array.class.getName().equals(className)) {
// Selector for columns whose reported java class is java.sql.Array.
valueSelectors[columnIdx] = new ResultSetValueSelector() {
  public Object selectValue(ResultSet resultSet) throws SQLException {
    final Object value = resultSet.getObject(columnNumber);
    if (resultSet.wasNull()) {
      return null;
    }
    if (!(value instanceof Array)) {
      // Not actually an Array instance: hand the raw object back.
      return value;
    }
    return ((Array) value).getArray();
  }

  public String getColumnName() {
    return columnName;
  }
};
} else {
throw new SQLException(String.format(Locale.ROOT,
"Unable to determine the valueSelector for column '%s' (col #%d) of java class '%s' and type '%s'",