Merged Cao Manh Dat changes

This commit is contained in:
Kevin Risden 2016-11-12 13:15:52 -06:00
parent 99a6746011
commit 6da7e7b0b1
13 changed files with 248 additions and 269 deletions

View File

@ -70,16 +70,13 @@ class SolrEnumerator implements Enumerator<Object> {
private Object getter(Tuple tuple, Map.Entry<String, Class> field) {
Object val = tuple.get(field.getKey());
Class clazz = field.getValue();
if(clazz.equals(Double.class)) {
return val == null ? 0D : val;
}
if(clazz.equals(Long.class)) {
if(val == null) {
return 0L;
return null;
}
Class clazz = field.getValue();
if(clazz.equals(Long.class)) {
if(val instanceof Double) {
return this.getRealVal(val);
}

View File

@ -88,11 +88,14 @@ class SolrFilter extends Filter implements SolrRel {
}
private String translateMatch2(RexNode node) {
Pair<String, RexLiteral> binaryTranslated = translateBinary((RexCall) node);
Pair<String, RexLiteral> binaryTranslated = null;
if (((RexCall) node).getOperands().size() == 2) {
binaryTranslated = translateBinary((RexCall) node);
}
switch (node.getKind()) {
// case NOT:
// return translateBinary("-", "-", (RexCall) node);
case NOT:
return "-"+translateMatch2(((RexCall) node).getOperands().get(0));
case EQUALS:
return binaryTranslated.getKey() + ":" + binaryTranslated.getValue().getValue2();
case NOT_EQUALS:

View File

@ -68,9 +68,9 @@ interface SolrRel extends RelNode {
column = this.fieldMappings.getOrDefault(column, column);
this.metricPairs.add(new Pair<>(metric, column));
String metricIdentifier = metric + "(" + column + ")";
String metricIdentifier = metric.toLowerCase(Locale.ROOT) + "(" + column + ")";
if(outName != null) {
this.addFieldMapping(outName, metricIdentifier.toLowerCase(Locale.ROOT));
this.addFieldMapping(outName, metricIdentifier);
}
}

View File

@ -47,10 +47,10 @@ import java.util.function.Predicate;
*/
class SolrRules {
static final RelOptRule[] RULES = {
SolrSortRule.SORT_RULE,
SolrFilterRule.FILTER_RULE,
SolrProjectRule.PROJECT_RULE,
SolrSortRule.SORT_RULE,
// SolrAggregateRule.AGGREGATE_RULE,
SolrAggregateRule.AGGREGATE_RULE,
};
static List<String> solrFieldNames(final RelDataType rowType) {
@ -105,7 +105,7 @@ class SolrRules {
/** Base class for planner rules that convert a relational expression to Solr calling convention. */
abstract static class SolrConverterRule extends ConverterRule {
final Convention out;
final Convention out = SolrRel.CONVENTION;
SolrConverterRule(Class<? extends RelNode> clazz, String description) {
this(clazz, relNode -> true, description);
@ -113,7 +113,6 @@ class SolrRules {
<R extends RelNode> SolrConverterRule(Class<R> clazz, Predicate<RelNode> predicate, String description) {
super(clazz, predicate::test, Convention.NONE, SolrRel.CONVENTION, description);
this.out = SolrRel.CONVENTION;
}
}
@ -121,13 +120,22 @@ class SolrRules {
* Rule to convert a {@link LogicalFilter} to a {@link SolrFilter}.
*/
private static class SolrFilterRule extends SolrConverterRule {
private static boolean isNotFilterByExpr(List<RexNode> rexNodes, List<String> fieldNames) {
// We dont have a way to filter by result of aggregator now
boolean result = true;
for (RexNode rexNode : rexNodes) {
if (rexNode instanceof RexCall) {
result = result && isNotFilterByExpr(((RexCall) rexNode).getOperands(), fieldNames);
} else if (rexNode instanceof RexInputRef) {
result = result && !fieldNames.get(((RexInputRef) rexNode).getIndex()).startsWith("EXPR$");
}
}
return result;
}
private static final Predicate<RelNode> FILTER_PREDICATE = relNode -> {
List<RexNode> filterOperands = ((RexCall) ((LogicalFilter) relNode).getCondition()).getOperands();
return filterOperands.size() == 2 &&
((!filterOperands.get(0).getKind().equals(SqlKind.LITERAL)
&& filterOperands.get(1).getKind().equals(SqlKind.LITERAL))
|| (filterOperands.get(0).getKind().equals(SqlKind.LITERAL)
&& !filterOperands.get(1).getKind().equals(SqlKind.LITERAL)));
return isNotFilterByExpr(filterOperands, SolrRules.solrFieldNames(relNode.getRowType()));
};
private static final SolrFilterRule FILTER_RULE = new SolrFilterRule();
@ -159,27 +167,28 @@ class SolrRules {
public RelNode convert(RelNode rel) {
final LogicalProject project = (LogicalProject) rel;
final RelNode converted = convert(project.getInput(), out);
final RelTraitSet traitSet = project.getTraitSet().replace(out);
return new SolrProject(
rel.getCluster(),
traitSet,
convert(project.getInput(), out),
converted,
project.getProjects(),
project.getRowType());
}
}
/**
* Rule to convert a {@link Sort} to a {@link SolrSort}.
* Rule to convert a {@link LogicalSort} to a {@link SolrSort}.
*/
private static class SolrSortRule extends SolrConverterRule {
static final SolrSortRule SORT_RULE = new SolrSortRule(LogicalSort.class, "SolrSortRule");
static final SolrSortRule SORT_RULE = new SolrSortRule();
private SolrSortRule() {
super(LogicalSort.class, relNode -> true, "SolrSortRule");
SolrSortRule(Class<? extends RelNode> clazz, String description) {
super(clazz, description);
}
@Override
public RelNode convert(RelNode rel) {
final Sort sort = (Sort) rel;
final RelTraitSet traitSet = sort.getTraitSet().replace(out).replace(sort.getCollation());
@ -188,6 +197,7 @@ class SolrRules {
traitSet,
convert(sort.getInput(), traitSet.replace(RelCollations.EMPTY)),
sort.getCollation(),
sort.offset,
sort.fetch);
}
}
@ -203,9 +213,10 @@ class SolrRules {
private static final RelOptRule AGGREGATE_RULE = new SolrAggregateRule();
private SolrAggregateRule() {
super(LogicalAggregate.class, AGGREGATE_PREDICTE, "SolrAggregateRule");
super(LogicalAggregate.class, "SolrAggregateRule");
}
@Override
public RelNode convert(RelNode rel) {
final LogicalAggregate agg = (LogicalAggregate) rel;
final RelTraitSet traitSet = agg.getTraitSet().replace(out);

View File

@ -36,8 +36,9 @@ import java.util.List;
*/
class SolrSort extends Sort implements SolrRel {
SolrSort(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, RelCollation collation, RexNode fetch) {
super(cluster, traitSet, child, collation, null, fetch);
SolrSort(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, RelCollation collation, RexNode offset,
RexNode fetch) {
super(cluster, traitSet, child, collation, offset, fetch);
assert getConvention() == SolrRel.CONVENTION;
assert getConvention() == child.getConvention();
@ -50,7 +51,7 @@ class SolrSort extends Sort implements SolrRel {
@Override
public Sort copy(RelTraitSet traitSet, RelNode input, RelCollation newCollation, RexNode offset, RexNode fetch) {
return new SolrSort(getCluster(), traitSet, input, collation, fetch);
return new SolrSort(getCluster(), traitSet, input, collation, offset, fetch);
}
public void implement(Implementor implementor) {

View File

@ -98,28 +98,29 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
// List<String> doesn't have add so must make a new ArrayList
List<String> fieldsList = new ArrayList<>(fields.size());
fieldsList.addAll(fields.stream().map(Map.Entry::getKey).collect(Collectors.toList()));
List<Pair<String, String>> ordersList = new ArrayList<>(orders);
LinkedHashMap<String,String> ordersMap = new LinkedHashMap<>();
for (Pair<String,String> order : orders) {
ordersMap.put(order.getKey(), order.getValue());
}
List<Metric> metrics = buildMetrics(metricPairs);
List<Bucket> bucketsList = buckets.stream().map(Bucket::new).collect(Collectors.toList());
for(int i = buckets.size()-1; i >= 0; i--) {
ordersList.add(0, new Pair<>(buckets.get(i), "asc"));
if (!ordersMap.containsKey(buckets.get(i))) {
ordersMap.put(buckets.get(i), "asc");
}
}
boolean isReOrder = false;
for(Metric metric : metrics) {
String metricIdentifier = metric.getIdentifier();
List<Pair<String, String>> newOrders= new ArrayList<>();
for(Pair<String, String> order : ordersList) {
String column = order.getKey();
if(!column.startsWith(metricIdentifier)) {
newOrders.add(order);
}
}
ordersList = newOrders;
ordersMap.remove(metricIdentifier);
if(fieldsList.contains(metricIdentifier)) {
fieldsList.remove(metricIdentifier);
isReOrder = true;
}
for(String column : metric.getColumns()) {
@ -127,23 +128,24 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
fieldsList.add(column);
}
Pair<String, String> order = new Pair<>(column, "asc");
if(!ordersList.contains(order)) {
ordersList.add(order);
if (!ordersMap.containsKey(column)) {
ordersMap.put(column, "asc");
}
}
}
ordersList.add(new Pair<>(DEFAULT_VERSION_FIELD, "desc"));
if (ordersMap.size() < 4) {
ordersMap.put(DEFAULT_VERSION_FIELD, "desc");
// Make sure the default sort field is in the field list
if (!fieldsList.contains(DEFAULT_VERSION_FIELD)) {
fieldsList.add(DEFAULT_VERSION_FIELD);
}
}
if(!ordersList.isEmpty()) {
List<String> orderList = new ArrayList<>(ordersList.size());
for(Pair<String, String> order : ordersList) {
if(!ordersMap.isEmpty()) {
List<String> orderList = new ArrayList<>(ordersMap.size());
for(Map.Entry<String, String> order : ordersMap.entrySet()) {
String column = order.getKey();
if(!fieldsList.contains(column)) {
fieldsList.add(column);
@ -162,13 +164,13 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
TupleStream tupleStream;
String zk = properties.getProperty("zk");
try {
if (metrics.isEmpty()) {
if (limit == null) {
if (metrics.isEmpty() && bucketsList.isEmpty()) {
solrParams.add(CommonParams.QT, "/export");
tupleStream = new CloudSolrStream(zk, collection, solrParams);
} else {
if (limit != null) {
solrParams.add(CommonParams.ROWS, limit);
tupleStream = new LimitStream(new CloudSolrStream(zk, collection, solrParams), Integer.parseInt(limit));
} else {
tupleStream = new CloudSolrStream(zk, collection, solrParams);
}
} else {
Metric[] metricsArray = metrics.toArray(new Metric[metrics.size()]);
@ -178,18 +180,20 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
tupleStream = new StatsStream(zk, collection, solrParams, metricsArray);
} else {
solrParams.add(CommonParams.QT, "/export");
int numWorkers = Integer.parseInt(properties.getProperty("numWorkers", "1"));
if (numWorkers > 1) solrParams.add("partitionKeys",String.join(",", buckets));
tupleStream = new CloudSolrStream(zk, collection, solrParams);
tupleStream = new RollupStream(tupleStream, bucketsList.toArray(new Bucket[bucketsList.size()]), metricsArray);
String sortDirection = getSortDirection(ordersList);
int numWorkers = Integer.parseInt(properties.getProperty("numWorkers", "1"));
if(numWorkers > 1) {
String workerZkHost = properties.getProperty("workerZkhost");
String workerCollection = properties.getProperty("workerCollection");
// Do the rollups in parallel
// Maintain the sort of the Tuples coming from the workers.
StreamComparator comp = bucketSortComp(bucketsList, sortDirection);
StreamComparator comp = bucketSortComp(bucketsList, ordersMap);
ParallelStream parallelStream = new ParallelStream(workerZkHost, workerCollection, tupleStream, numWorkers, comp);
StreamFactory factory = new StreamFactory()
@ -204,14 +208,16 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
parallelStream.setStreamFactory(factory);
tupleStream = parallelStream;
isReOrder = true;
}
if (!sortsEqual(bucketsList, sortDirection, ordersList)) {
if (isReOrder) {
int limitVal = limit == null ? 100 : Integer.parseInt(limit);
StreamComparator comp = getComp(ordersList);
//Rank the Tuples
//If parallel stream is used ALL the Rolled up tuples from the workers will be ranked
//Providing a true Top or Bottom.
StreamComparator comp = getComp(orders);
if (orders.isEmpty() && !ordersMap.isEmpty()) {
// default order
comp = getComp(new ArrayList<>(ordersMap.entrySet()));
}
tupleStream = new RankStream(tupleStream, limitVal, comp);
} else {
// Sort is the same as the same as the underlying stream
@ -237,10 +243,10 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
};
}
private static StreamComparator bucketSortComp(List<Bucket> buckets, String dir) {
private static StreamComparator bucketSortComp(List<Bucket> buckets, Map<String,String> dirs) {
FieldComparator[] comps = new FieldComparator[buckets.size()];
for(int i=0; i<buckets.size(); i++) {
ComparatorOrder comparatorOrder = ComparatorOrder.fromString(dir);
ComparatorOrder comparatorOrder = ComparatorOrder.fromString(dirs.get(buckets.get(i).toString()));
String sortKey = buckets.get(i).toString();
comps[i] = new FieldComparator(sortKey, comparatorOrder);
}
@ -252,52 +258,18 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
}
}
private boolean sortsEqual(List<Bucket> buckets, String direction, List<Pair<String, String>> orders) {
if(buckets.size() != orders.size()) {
return false;
}
for(int i=0; i< buckets.size(); i++) {
Bucket bucket = buckets.get(i);
Pair<String, String> order = orders.get(i);
if(!bucket.toString().equals(getSortField(order))) {
return false;
}
if(!getSortDirection(order).equalsIgnoreCase(direction)) {
return false;
}
}
return true;
}
private String getSortDirection(List<Pair<String, String>> orders) {
for(Pair<String, String> order : orders) {
return getSortDirection(order);
}
return "asc";
}
private String getSortField(Pair<String, String> order) {
return order.getKey();
}
private String getSortDirection(Pair<String, String> order) {
private String getSortDirection(Map.Entry<String, String> order) {
String direction = order.getValue();
return direction == null ? "asc" : direction;
}
private StreamComparator getComp(List<Pair<String, String>> orders) {
private StreamComparator getComp(List<? extends Map.Entry<String, String>> orders) {
FieldComparator[] comps = new FieldComparator[orders.size()];
for(int i = 0; i < orders.size(); i++) {
Pair<String, String> order = orders.get(i);
Map.Entry<String, String> order = orders.get(i);
String direction = getSortDirection(order);
ComparatorOrder comparatorOrder = ComparatorOrder.fromString(direction);
String sortKey = getSortField(order);
String sortKey = order.getKey();
comps[i] = new FieldComparator(sortKey, comparatorOrder);
}

View File

@ -19,6 +19,7 @@ package org.apache.solr.handler.sql;
import org.apache.calcite.plan.*;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import java.util.List;
@ -49,6 +50,11 @@ class SolrTableScan extends TableScan implements SolrRel {
assert getConvention() == SolrRel.CONVENTION;
}
@Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
final float f = projectRowType == null ? 1f : (float) projectRowType.getFieldCount() / 100f;
return super.computeSelfCost(planner, mq).multiplyBy(.1 * f);
}
@Override
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
assert inputs.isEmpty();

View File

@ -70,20 +70,21 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
@Test
public void doTest() throws Exception {
waitForRecoveriesToFinish(false);
testBasicSelect();
// testWhere();
// testMixedCaseFields();
testWhere();
testMixedCaseFields();
testBasicGrouping();
testBasicGroupingFacets(); // TODO push down facets
// testSelectDistinct(); // TODO fails due to sort asc by default missing
// testSelectDistinctFacets(); // TODO push down facets and fails due to sort asc by default missing
testBasicGroupingFacets();
testSelectDistinct();
testSelectDistinctFacets();
testAggregatesWithoutGrouping();
// testSQLException(); // TODO fix exception checking
// testTimeSeriesGrouping();
// testTimeSeriesGroupingFacet(); // TODO push down facets
testSQLException();
testTimeSeriesGrouping();
testTimeSeriesGroupingFacet();
testParallelBasicGrouping();
// testParallelSelectDistinct();
// testParallelTimeSeriesGrouping();
testParallelSelectDistinct();
testParallelTimeSeriesGrouping();
}
private void testBasicSelect() throws Exception {
@ -112,7 +113,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuples.size() == 8);
Tuple tuple = null;
Tuple tuple;
tuple = tuples.get(0);
assert(tuple.getLong("id") == 8);
@ -373,29 +374,30 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
tuple = tuples.get(6);
assertEquals(8L, tuple.get("id"));
// TODO requires different Calcite SQL conformance level
// Not Equals !=
sParams = mapParams(CommonParams.QT, "/sql",
"stmt", "select id from collection1 where id != 1 order by id asc limit 10");
solrStream = new SolrStream(jetty.url, sParams);
tuples = getTuples(solrStream);
assertEquals(7, tuples.size());
tuple = tuples.get(0);
assertEquals(2L, tuple.get("id"));
tuple = tuples.get(1);
assertEquals(3L, tuple.get("id"));
tuple = tuples.get(2);
assertEquals(4L, tuple.get("id"));
tuple = tuples.get(3);
assertEquals(5L, tuple.get("id"));
tuple = tuples.get(4);
assertEquals(6L, tuple.get("id"));
tuple = tuples.get(5);
assertEquals(7L, tuple.get("id"));
tuple = tuples.get(6);
assertEquals(8L, tuple.get("id"));
// sParams = mapParams(CommonParams.QT, "/sql",
// "stmt", "select id from collection1 where id != 1 order by id asc limit 10");
//
// solrStream = new SolrStream(jetty.url, sParams);
// tuples = getTuples(solrStream);
//
// assertEquals(7, tuples.size());
//
// tuple = tuples.get(0);
// assertEquals(2L, tuple.get("id"));
// tuple = tuples.get(1);
// assertEquals(3L, tuple.get("id"));
// tuple = tuples.get(2);
// assertEquals(4L, tuple.get("id"));
// tuple = tuples.get(3);
// assertEquals(5L, tuple.get("id"));
// tuple = tuples.get(4);
// assertEquals(6L, tuple.get("id"));
// tuple = tuples.get(5);
// assertEquals(7L, tuple.get("id"));
// tuple = tuples.get(6);
// assertEquals(8L, tuple.get("id"));
// Less than
sParams = mapParams(CommonParams.QT, "/sql",
@ -474,14 +476,14 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
commit();
SolrParams sParams = mapParams(CommonParams.QT, "/sql",
"stmt", "select id, Field_i, Str_s from Collection1 where Text_t='XXXX' order by Field_i desc");
"stmt", "select id, Field_i, Str_s from collection1 where Text_t='XXXX' order by Field_i desc");
SolrStream solrStream = new SolrStream(jetty.url, sParams);
List<Tuple> tuples = getTuples(solrStream);
assert(tuples.size() == 8);
Tuple tuple = null;
Tuple tuple;
tuple = tuples.get(0);
assert(tuple.getLong("id") == 8);
@ -523,8 +525,9 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getLong("Field_i") == 7);
assert(tuple.get("Str_s").equals("a"));
// TODO get sum(Field_i) as named one
sParams = mapParams(CommonParams.QT, "/sql",
"stmt", "select Str_s, sum(Field_i) as `sum(Field_i)` from Collection1 where 'id'='(1 8)' group by Str_s having (sum(Field_i) = 7 OR sum(Field_i) = 60) order by sum(Field_i) desc");
"stmt", "select Str_s, sum(Field_i) from collection1 where id='(1 8)' group by Str_s having (sum(Field_i) = 7 OR sum(Field_i) = 60) order by sum(Field_i) desc");
solrStream = new SolrStream(jetty.url, sParams);
tuples = getTuples(solrStream);
@ -533,14 +536,14 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
tuple = tuples.get(0);
assert(tuple.get("Str_s").equals("c"));
assert(tuple.getDouble("sum(Field_i)") == 60);
assert(tuple.getDouble("EXPR$1") == 60);
tuple = tuples.get(1);
assert(tuple.get("Str_s").equals("a"));
assert(tuple.getDouble("sum(Field_i)") == 7);
assert(tuple.getDouble("EXPR$1") == 7);
sParams = mapParams(CommonParams.QT, "/sql",
"stmt", "select Str_s, sum(Field_i) as `sum(Field_i)` from Collection1 where id='(1 8)' group by Str_s having (sum(Field_i) = 7 OR sum(Field_i) = 60) order by sum(Field_i) desc");
"stmt", "select Str_s, sum(Field_i) from collection1 where id='(1 8)' group by Str_s having (sum(Field_i) = 7 OR sum(Field_i) = 60) order by sum(Field_i) desc");
solrStream = new SolrStream(jetty.url, sParams);
tuples = getTuples(solrStream);
@ -549,11 +552,11 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
tuple = tuples.get(0);
assert(tuple.get("Str_s").equals("c"));
assert(tuple.getDouble("sum(Field_i)") == 60);
assert(tuple.getDouble("EXPR$1") == 60);
tuple = tuples.get(1);
assert(tuple.get("Str_s").equals("a"));
assert(tuple.getDouble("sum(Field_i)") == 7);
assert(tuple.getDouble("EXPR$1") == 7);
} finally {
delete();
}
@ -579,14 +582,13 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
commit();
SolrParams sParams = mapParams(CommonParams.QT, "/sql",
"stmt", "select id, field_i, str_s from collection1 where text='XXXX' order by field_iff desc");
"stmt", "select id, str_s from collection1 where text='XXXX' order by field_iff desc");
SolrStream solrStream = new SolrStream(jetty.url, sParams);
Tuple tuple = getTuple(new ExceptionStream(solrStream));
assert(tuple.EOF);
assert(tuple.EXCEPTION);
//A parse exception detected before being sent to the search engine
assert(tuple.getException().contains("Fields in the sort spec must be included in the field list"));
assert(tuple.getException().contains("Column 'field_iff' not found in any table"));
sParams = mapParams(CommonParams.QT, "/sql",
"stmt", "select id, field_iff, str_s from collection1 where text='XXXX' order by field_iff desc");
@ -595,8 +597,8 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
tuple = getTuple(new ExceptionStream(solrStream));
assert(tuple.EOF);
assert(tuple.EXCEPTION);
//An exception not detected by the parser thrown from the /select handler
assert(tuple.getException().contains("sort param field can't be found:"));
assert(tuple.getException().contains("Column 'field_iff' not found in any table"));
sParams = mapParams(CommonParams.QT, "/sql",
"stmt", "select str_s, count(*), sum(field_iff), min(field_i), max(field_i), cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s having ((sum(field_iff) = 19) AND (min(field_i) = 8))");
@ -605,28 +607,16 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
tuple = getTuple(new ExceptionStream(solrStream));
assert(tuple.EOF);
assert(tuple.EXCEPTION);
//An exception not detected by the parser thrown from the /export handler
assert(tuple.getException().contains("undefined field:"));
assert(tuple.getException().contains("Column 'field_iff' not found in any table"));
sParams = mapParams(CommonParams.QT, "/sql",
"stmt", "select str_s, count(*), blah(field_iff), min(field_i), max(field_i), cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s having ((sum(field_iff) = 19) AND (min(field_i) = 8))");
"stmt", "select str_s, count(*), blah(field_i), min(field_i), max(field_i), cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 8))");
solrStream = new SolrStream(jetty.url, sParams);
tuple = getTuple(new ExceptionStream(solrStream));
assert(tuple.EOF);
assert(tuple.EXCEPTION);
//An exception not detected by the parser thrown from the /export handler
assert(tuple.getException().contains("Invalid function: blah"));
sParams = mapParams(CommonParams.QT, "/sql",
"stmt", "select str_s from collection1 where text='XXXX' group by str_s");
solrStream = new SolrStream(jetty.url, sParams);
tuple = getTuple(new ExceptionStream(solrStream));
assert(tuple.EOF);
assert(tuple.EXCEPTION);
assert(tuple.getException().contains("Group by queries must include atleast one aggregate function."));
assert(tuple.getException().contains("No match found for function signature blah"));
} finally {
delete();
}
@ -649,6 +639,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
indexr("id", "6", "text", "XXXX XXXX", "str_s", "c", "field_i", "40");
indexr("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50");
indexr("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60");
indexr("id", "9", "text", "XXXX XXXY", "str_s", "d", "field_i", "70");
commit();
SolrParams sParams = mapParams(CommonParams.QT, "/sql",
@ -660,7 +651,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
//Only two results because of the limit.
assert(tuples.size() == 2);
Tuple tuple = null;
Tuple tuple;
tuple = tuples.get(0);
assert(tuple.get("str_s").equals("b"));
@ -703,11 +694,9 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getDouble("EXPR$4") == 20); //max(field_i)
assert(tuple.getDouble("EXPR$5") == 13.5D); //avg(field_i)
// TODO fix test - Cannot apply 'NOT' to arguments of type 'NOT<JAVATYPE(CLASS JAVA.LANG.STRING)>'. Supported form(s): 'NOT<BOOLEAN>'
/*
sParams = mapParams(CommonParams.QT, "/sql",
"stmt", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), "
+ "cast(avg(1.0 * field_i) as float) from collection1 where (text='XXXX' AND NOT (text='XXXX XXX')) "
+ "cast(avg(1.0 * field_i) as float) from collection1 where (text='XXXX' AND NOT (text='XXXY')) "
+ "group by str_s order by str_s desc");
solrStream = new SolrStream(jetty.url, sParams);
@ -740,14 +729,13 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getDouble("EXPR$2") == 27); //sum(field_i)
assert(tuple.getDouble("EXPR$3") == 7); //min(field_i)
assert(tuple.getDouble("EXPR$4") == 20); //max(field_i)
assert(tuple.getDouble("avg(field_i)") == 13.5D); //avg(field_i)
// TODO fix test - Cannot apply 'NOT' to arguments of type 'NOT<JAVATYPE(CLASS JAVA.LANG.STRING)>'. Supported form(s): 'NOT<BOOLEAN>'
/*
assert(tuple.getDouble("EXPR$5") == 13.5D); //avg(field_i)
sParams = mapParams(CommonParams.QT, "/sql",
"stmt", "select str_s as myString, count(*) as myCount, sum(field_i) as mySum, min(field_i) as myMin, "
+ "max(field_i) as myMax, cast(avg(1.0 * field_i) as float) as myAvg from collection1 "
+ "where (text='XXXX' AND NOT (text='XXXX XXX')) group by str_s order by str_s desc");
+ "where (text='XXXX' AND NOT (text='XXXY')) group by str_s order by str_s desc");
solrStream = new SolrStream(jetty.url, sParams);
tuples = getTuples(solrStream);
@ -780,7 +768,6 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getDouble("myMin") == 7);
assert(tuple.getDouble("myMax") == 20);
assert(tuple.getDouble("myAvg") == 13.5D);
*/
sParams = mapParams(CommonParams.QT, "/sql",
"stmt", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), cast(avg(1.0 * field_i) as float) " +
@ -874,7 +861,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuples.size() == 6);
Tuple tuple = null;
Tuple tuple;
tuple = tuples.get(0);
assert(tuple.get("str_s").equals("a"));
@ -991,8 +978,6 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
// Test without a sort. Sort should be asc by default.
/*
// TODO figure out what should be sort asc by default (version?)
sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "facet",
"stmt", "select distinct str_s, field_i from collection1");
@ -1024,7 +1009,6 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
tuple = tuples.get(5);
assert(tuple.get("str_s").equals("c"));
assert(tuple.getLong("field_i") == 60);
*/
// Test with a predicate.
sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "facet",
@ -1069,14 +1053,12 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
SolrParams sParams = mapParams(CommonParams.QT, "/sql",
"stmt", "select distinct str_s, field_i from collection1 order by str_s asc, field_i asc");
SolrStream solrStream = new SolrStream(jetty.url, sParams);
TupleStream solrStream = new SolrStream(jetty.url, sParams);
List<Tuple> tuples = getTuples(solrStream);
assert(tuples.size() == 6);
Tuple tuple = null;
tuple = tuples.get(0);
Tuple tuple = tuples.get(0);
assert(tuple.get("str_s").equals("a"));
assert(tuple.getLong("field_i") == 1);
@ -1269,7 +1251,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuples.size() == 6);
Tuple tuple = null;
Tuple tuple;
tuple = tuples.get(0);
assert(tuple.get("str_s").equals("a"));
@ -1455,6 +1437,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
indexr("id", "6", "text", "XXXX XXXX", "str_s", "c", "field_i", "40");
indexr("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50");
indexr("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60");
indexr("id", "9", "text", "XXXX XXXY", "str_s", "d", "field_i", "70");
commit();
SolrParams sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "facet",
@ -1468,7 +1451,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
//Only two results because of the limit.
assert(tuples.size() == 2);
Tuple tuple = null;
Tuple tuple;
tuple = tuples.get(0);
assert(tuple.get("str_s").equals("b"));
@ -1486,11 +1469,9 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getDouble("EXPR$4") == 20); //max(field_i)
assert(tuple.getDouble("EXPR$5") == 13.5D); //avg(field_i)
// TODO fix test - Cannot apply 'NOT' to arguments of type 'NOT<JAVATYPE(CLASS JAVA.LANG.STRING)>'. Supported form(s): 'NOT<BOOLEAN>'
/*
sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "facet",
"stmt", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), "
+ "cast(avg(1.0 * field_i) as float) from collection1 where (text='XXXX' AND NOT (text='XXXX XXX')) "
+ "cast(avg(1.0 * field_i) as float) from collection1 where (text='XXXX' AND NOT (text='XXXY')) "
+ "group by str_s order by str_s desc");
solrStream = new SolrStream(jetty.url, sParams);
@ -1525,10 +1506,9 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getDouble("EXPR$4") == 20); //max(field_i)
assert(tuple.getDouble("EXPR$5") == 13.5D); //avg(field_i)
// TODO fix test - Cannot apply 'NOT' to arguments of type 'NOT<JAVATYPE(CLASS JAVA.LANG.STRING)>'. Supported form(s): 'NOT<BOOLEAN>'
sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "facet",
"stmt", "select str_s as myString, count(*), sum(field_i) as mySum, min(field_i), max(field_i), "
+ "cast(avg(1.0 * field_i) as float) from collection1 where (text='XXXX' AND NOT (text='XXXX XXX')) "
+ "cast(avg(1.0 * field_i) as float) from collection1 where (text='XXXX' AND NOT (text='XXXY')) "
+ "group by str_s order by myString desc");
solrStream = new SolrStream(jetty.url, sParams);
@ -1562,7 +1542,6 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getDouble("EXPR$3") == 7); //min(field_i)
assert(tuple.getDouble("EXPR$4") == 20); //max(field_i)
assert(tuple.getDouble("EXPR$5") == 13.5D); //avg(field_i)
*/
sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "facet",
"stmt", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), " +
@ -1660,7 +1639,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
//Only two results because of the limit.
assert(tuples.size() == 2);
Tuple tuple = null;
Tuple tuple;
tuple = tuples.get(0);
assert(tuple.get("str_s").equals("b"));
@ -2015,7 +1994,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuples.size() == 2);
Tuple tuple = null;
Tuple tuple;
tuple = tuples.get(0);
assert(tuple.getLong("year_i") == 2015);
@ -2034,8 +2013,6 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuples.size() == 3);
tuple = null;
tuple = tuples.get(0);
assert(tuple.getLong("year_i") == 2015);
assert(tuple.getLong("month_i") == 11);
@ -2060,8 +2037,6 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuples.size() == 6);
tuple = null;
tuple = tuples.get(0);
assert(tuple.getLong("year_i") == 2015);
assert(tuple.getLong("month_i") == 11);
@ -2131,7 +2106,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuples.size() == 2);
Tuple tuple = null;
Tuple tuple;
tuple = tuples.get(0);
assert(tuple.getLong("year_i") == 2015);
@ -2242,7 +2217,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuples.size() == 2);
Tuple tuple = null;
Tuple tuple;
tuple = tuples.get(0);
assert(tuple.getLong("year_i") == 2015);
@ -2262,8 +2237,6 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuples.size() == 3);
tuple = null;
tuple = tuples.get(0);
assert(tuple.getLong("year_i") == 2015);
assert(tuple.getLong("month_i") == 11);
@ -2291,8 +2264,6 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuples.size() == 6);
tuple = null;
tuple = tuples.get(0);
assert(tuple.getLong("year_i") == 2015);
assert(tuple.getLong("month_i") == 11);

View File

@ -206,7 +206,7 @@ public class JDBCStream extends TupleStream implements Expressible {
try{
resultSet = statement.executeQuery(sqlQuery);
} catch (SQLException e) {
throw new IOException(String.format(Locale.ROOT, "Failed to execute sqlQuery '%s' against JDBC connection '%s'", sqlQuery, connectionUrl), e);
throw new IOException(String.format(Locale.ROOT, "Failed to execute sqlQuery '%s' against JDBC connection '%s'.\n"+ e.getMessage(), sqlQuery, connectionUrl), e);
}
try{

View File

@ -72,9 +72,7 @@ public class RollupStream extends TupleStream implements Expressible {
if(1 != streamExpressions.size()){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
}
if(0 == metricExpressions.size()){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting at least 1 metric but found %d",expression, metricExpressions.size()));
}
if(null == overExpression || !(overExpression.getParameter() instanceof StreamExpressionValue)){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single 'over' parameter listing fields to rollup by but didn't find one",expression));
}
@ -247,13 +245,15 @@ public class RollupStream extends TupleStream implements Expressible {
t = new Tuple(map);
}
currentMetrics = new Metric[metrics.length];
currentKey = hashKey;
if (metrics != null) {
currentMetrics = new Metric[metrics.length];
for(int i=0; i<metrics.length; i++) {
Metric bucketMetric = metrics[i].newInstance();
bucketMetric.update(tuple);
currentMetrics[i] = bucketMetric;
}
}
if(t != null) {
return t;

View File

@ -57,7 +57,6 @@ public class StatsStream extends TupleStream implements Expressible {
private SolrParams params;
private String collection;
private boolean done;
private long count;
private boolean doCount;
protected transient SolrClientCache cache;
protected transient CloudSolrClient cloudSolrClient;
@ -195,8 +194,7 @@ public class StatsStream extends TupleStream implements Expressible {
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList();
return l;
return new ArrayList<>();
}
public void open() throws IOException {
@ -233,10 +231,9 @@ public class StatsStream extends TupleStream implements Expressible {
done = true;
return tuple;
} else {
Map fields = new HashMap();
Map<String, Object> fields = new HashMap<>();
fields.put("EOF", true);
Tuple tuple = new Tuple(fields);
return tuple;
return new Tuple(fields);
}
}
@ -245,7 +242,7 @@ public class StatsStream extends TupleStream implements Expressible {
}
private void addStats(ModifiableSolrParams params, Metric[] _metrics) {
Map<String, List<String>> m = new HashMap();
Map<String, List<String>> m = new HashMap<>();
for(Metric metric : _metrics) {
String metricId = metric.getIdentifier();
if(metricId.contains("(")) {
@ -255,8 +252,11 @@ public class StatsStream extends TupleStream implements Expressible {
String column = parts[1];
List<String> stats = m.get(column);
if(stats == null && !column.equals("*")) {
stats = new ArrayList();
if(stats == null) {
stats = new ArrayList<>();
}
if(!column.equals("*")) {
m.put(column, stats);
}
@ -290,14 +290,16 @@ public class StatsStream extends TupleStream implements Expressible {
private Tuple getTuple(NamedList response) {
Map map = new HashMap();
Map<String, Object> map = new HashMap<>();
SolrDocumentList solrDocumentList = (SolrDocumentList) response.get("response");
long count = solrDocumentList.getNumFound();
if(doCount) {
SolrDocumentList solrDocumentList = (SolrDocumentList) response.get("response");
this.count = solrDocumentList.getNumFound();
map.put("count(*)", this.count);
map.put("count(*)", count);
}
if(count != 0) {
NamedList stats = (NamedList)response.get("stats");
NamedList statsFields = (NamedList)stats.get("stats_fields");
@ -308,16 +310,16 @@ public class StatsStream extends TupleStream implements Expressible {
addStat(map, field, theStats.getName(s), theStats.getVal(s));
}
}
}
Tuple tuple = new Tuple(map);
return tuple;
return new Tuple(map);
}
public int getCost() {
return 0;
}
private void addStat(Map map, String field, String stat, Object val) {
private void addStat(Map<String, Object> map, String field, String stat, Object val) {
if(stat.equals("mean")) {
map.put("avg("+field+")", val);
} else {

View File

@ -816,39 +816,38 @@ public class JdbcTest extends SolrCloudTestCase {
assertEquals(9, rs.getByte(4), 0);
assertFalse(rs.wasNull());
// TODO figure out null checks?
// assertEquals(null, rs.getObject("testnull_i"));
// assertTrue(rs.wasNull());
// assertEquals(null, rs.getObject(5));
// assertTrue(rs.wasNull());
// assertEquals(null, rs.getString("testnull_i"));
// assertTrue(rs.wasNull());
// assertEquals(null, rs.getString(5));
// assertTrue(rs.wasNull());
// assertEquals(0D, rs.getDouble("testnull_i"), 0);
// assertTrue(rs.wasNull());
// assertEquals(0D, rs.getDouble(5), 0);
// assertTrue(rs.wasNull());
// assertEquals(0F, rs.getFloat("testnull_i"), 0);
// assertTrue(rs.wasNull());
// assertEquals(0F, rs.getFloat(5), 0);
// assertTrue(rs.wasNull());
// assertEquals(0, rs.getInt("testnull_i"));
// assertTrue(rs.wasNull());
// assertEquals(0, rs.getInt(5));
// assertTrue(rs.wasNull());
// assertEquals(0L, rs.getLong("testnull_i"));
// assertTrue(rs.wasNull());
// assertEquals(0L, rs.getLong(5));
// assertTrue(rs.wasNull());
// assertEquals(0, rs.getShort("testnull_i"));
// assertTrue(rs.wasNull());
// assertEquals(0, rs.getShort(5));
// assertTrue(rs.wasNull());
// assertEquals(0, rs.getByte("testnull_i"));
// assertTrue(rs.wasNull());
// assertEquals(0, rs.getByte(5));
// assertTrue(rs.wasNull());
assertEquals(null, rs.getObject("testnull_i"));
assertTrue(rs.wasNull());
assertEquals(null, rs.getObject(5));
assertTrue(rs.wasNull());
assertEquals(null, rs.getString("testnull_i"));
assertTrue(rs.wasNull());
assertEquals(null, rs.getString(5));
assertTrue(rs.wasNull());
assertEquals(0D, rs.getDouble("testnull_i"), 0);
assertTrue(rs.wasNull());
assertEquals(0D, rs.getDouble(5), 0);
assertTrue(rs.wasNull());
assertEquals(0F, rs.getFloat("testnull_i"), 0);
assertTrue(rs.wasNull());
assertEquals(0F, rs.getFloat(5), 0);
assertTrue(rs.wasNull());
assertEquals(0, rs.getInt("testnull_i"));
assertTrue(rs.wasNull());
assertEquals(0, rs.getInt(5));
assertTrue(rs.wasNull());
assertEquals(0L, rs.getLong("testnull_i"));
assertTrue(rs.wasNull());
assertEquals(0L, rs.getLong(5));
assertTrue(rs.wasNull());
assertEquals(0, rs.getShort("testnull_i"));
assertTrue(rs.wasNull());
assertEquals(0, rs.getShort(5));
assertTrue(rs.wasNull());
assertEquals(0, rs.getByte("testnull_i"));
assertTrue(rs.wasNull());
assertEquals(0, rs.getByte(5));
assertTrue(rs.wasNull());
assertFalse(rs.next());
}

View File

@ -1500,6 +1500,23 @@ public class StreamingTest extends SolrCloudTestCase {
assertEquals(5.5, avgf.doubleValue(), 0.01);
assertEquals(2, count.doubleValue(), 0.01);
// Test will null metrics
rollupStream = new RollupStream(stream, buckets, metrics);
tuples = getTuples(rollupStream);
assert(tuples.size() == 3);
tuple = tuples.get(0);
bucket = tuple.getString("a_s");
assertTrue(bucket.equals("hello0"));
tuple = tuples.get(1);
bucket = tuple.getString("a_s");
assertTrue(bucket.equals("hello3"));
tuple = tuples.get(2);
bucket = tuple.getString("a_s");
assertTrue(bucket.equals("hello4"));
//Test will null value in the grouping field
new UpdateRequest()