mirror of https://github.com/apache/lucene.git
SOLR-8593: Push down the HAVING clause
This commit is contained in:
parent
a9cf1503b5
commit
de512d7402
|
@ -69,8 +69,11 @@ class SolrAggregate extends Aggregate implements SolrRel {
|
|||
|
||||
for(Pair<AggregateCall, String> namedAggCall : getNamedAggCalls()) {
|
||||
|
||||
|
||||
AggregateCall aggCall = namedAggCall.getKey();
|
||||
|
||||
Pair<String, String> metric = toSolrMetric(implementor, aggCall, inNames);
|
||||
implementor.addReverseAggMapping(namedAggCall.getValue(), metric.getKey().toLowerCase()+"("+metric.getValue()+")");
|
||||
implementor.addMetricPair(namedAggCall.getValue(), metric.getKey(), metric.getValue());
|
||||
if(aggCall.getName() == null) {
|
||||
implementor.addFieldMapping(namedAggCall.getValue(),
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.calcite.util.Pair;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Implementation of a {@link org.apache.calcite.rel.core.Filter} relational expression in Solr.
|
||||
|
@ -54,13 +55,18 @@ class SolrFilter extends Filter implements SolrRel {
|
|||
|
||||
public void implement(Implementor implementor) {
|
||||
implementor.visitChild(0, getInput());
|
||||
Translator translator = new Translator(SolrRules.solrFieldNames(getRowType()));
|
||||
String query = translator.translateMatch(condition);
|
||||
implementor.addQuery(query);
|
||||
implementor.setNegativeQuery(translator.negativeQuery);
|
||||
if(getInput() instanceof SolrAggregate) {
|
||||
HavingTranslator translator = new HavingTranslator(SolrRules.solrFieldNames(getRowType()), implementor.reverseAggMappings);
|
||||
String havingPredicate = translator.translateMatch(condition);
|
||||
implementor.setHavingPredicate(havingPredicate);
|
||||
} else {
|
||||
Translator translator = new Translator(SolrRules.solrFieldNames(getRowType()));
|
||||
String query = translator.translateMatch(condition);
|
||||
implementor.addQuery(query);
|
||||
implementor.setNegativeQuery(translator.negativeQuery);
|
||||
}
|
||||
}
|
||||
|
||||
/** Translates {@link RexNode} expressions into Solr query strings. */
|
||||
private static class Translator {
|
||||
|
||||
private final List<String> fieldNames;
|
||||
|
@ -71,11 +77,11 @@ class SolrFilter extends Filter implements SolrRel {
|
|||
}
|
||||
|
||||
private String translateMatch(RexNode condition) {
|
||||
if(condition.getKind().belongsTo(SqlKind.COMPARISON)) {
|
||||
if (condition.getKind().belongsTo(SqlKind.COMPARISON)) {
|
||||
return translateComparison(condition);
|
||||
} else if(condition.isA(SqlKind.AND)) {
|
||||
return "("+translateAnd(condition)+")";
|
||||
} else if(condition.isA(SqlKind.OR)) {
|
||||
} else if (condition.isA(SqlKind.AND)) {
|
||||
return "(" + translateAnd(condition) + ")";
|
||||
} else if (condition.isA(SqlKind.OR)) {
|
||||
return "(" + translateOr(condition) + ")";
|
||||
} else {
|
||||
return null;
|
||||
|
@ -90,8 +96,6 @@ class SolrFilter extends Filter implements SolrRel {
|
|||
return String.join(" OR ", ors);
|
||||
}
|
||||
|
||||
|
||||
|
||||
private String translateAnd(RexNode node0) {
|
||||
List<String> andStrings = new ArrayList();
|
||||
List<String> notStrings = new ArrayList();
|
||||
|
@ -101,18 +105,18 @@ class SolrFilter extends Filter implements SolrRel {
|
|||
RelOptUtil.decomposeConjunction(node0, ands, nots);
|
||||
|
||||
|
||||
for(RexNode node: ands) {
|
||||
for (RexNode node : ands) {
|
||||
andStrings.add(translateMatch(node));
|
||||
}
|
||||
|
||||
String andString = String.join(" AND ", andStrings);
|
||||
|
||||
if(nots.size() > 0) {
|
||||
for(RexNode node: nots) {
|
||||
if (nots.size() > 0) {
|
||||
for (RexNode node : nots) {
|
||||
notStrings.add(translateMatch(node));
|
||||
}
|
||||
String notString = String.join(" NOT ", notStrings);
|
||||
return "("+ andString +") NOT ("+notString+")";
|
||||
return "(" + andString + ") NOT (" + notString + ")";
|
||||
} else {
|
||||
return andString;
|
||||
}
|
||||
|
@ -126,39 +130,41 @@ class SolrFilter extends Filter implements SolrRel {
|
|||
|
||||
switch (node.getKind()) {
|
||||
case NOT:
|
||||
return "-"+translateComparison(((RexCall) node).getOperands().get(0));
|
||||
return "-" + translateComparison(((RexCall) node).getOperands().get(0));
|
||||
case EQUALS:
|
||||
String terms = binaryTranslated.getValue().getValue2().toString().trim();
|
||||
if(!terms.startsWith("(") && !terms.startsWith("[") && !terms.startsWith("{")){
|
||||
terms = "\""+terms+"\"";
|
||||
if (!terms.startsWith("(") && !terms.startsWith("[") && !terms.startsWith("{")) {
|
||||
terms = "\"" + terms + "\"";
|
||||
}
|
||||
|
||||
String clause = binaryTranslated.getKey() + ":" + terms;
|
||||
this.negativeQuery = false;
|
||||
return clause;
|
||||
case NOT_EQUALS:
|
||||
return "-(" + binaryTranslated.getKey() + ":" + binaryTranslated.getValue().getValue2()+")";
|
||||
return "-(" + binaryTranslated.getKey() + ":" + binaryTranslated.getValue().getValue2() + ")";
|
||||
case LESS_THAN:
|
||||
this.negativeQuery = false;
|
||||
return "("+binaryTranslated.getKey() + ": [ * TO " + binaryTranslated.getValue().getValue2() + " })";
|
||||
return "(" + binaryTranslated.getKey() + ": [ * TO " + binaryTranslated.getValue().getValue2() + " })";
|
||||
case LESS_THAN_OR_EQUAL:
|
||||
this.negativeQuery = false;
|
||||
return "("+binaryTranslated.getKey() + ": [ * TO " + binaryTranslated.getValue().getValue2() + " ])";
|
||||
return "(" + binaryTranslated.getKey() + ": [ * TO " + binaryTranslated.getValue().getValue2() + " ])";
|
||||
case GREATER_THAN:
|
||||
this.negativeQuery = false;
|
||||
return "("+binaryTranslated.getKey() + ": { " + binaryTranslated.getValue().getValue2() + " TO * ])";
|
||||
return "(" + binaryTranslated.getKey() + ": { " + binaryTranslated.getValue().getValue2() + " TO * ])";
|
||||
case GREATER_THAN_OR_EQUAL:
|
||||
this.negativeQuery = false;
|
||||
return "("+binaryTranslated.getKey() + ": [ " + binaryTranslated.getValue().getValue2() + " TO * ])";
|
||||
return "(" + binaryTranslated.getKey() + ": [ " + binaryTranslated.getValue().getValue2() + " TO * ])";
|
||||
default:
|
||||
throw new AssertionError("cannot translate " + node);
|
||||
}
|
||||
}
|
||||
|
||||
/** Translates a call to a binary operator, reversing arguments if necessary. */
|
||||
/**
|
||||
* Translates a call to a binary operator, reversing arguments if necessary.
|
||||
*/
|
||||
private Pair<String, RexLiteral> translateBinary(RexCall call) {
|
||||
List<RexNode> operands = call.getOperands();
|
||||
if(operands.size() != 2) {
|
||||
if (operands.size() != 2) {
|
||||
throw new AssertionError("Invalid number of arguments - " + operands.size());
|
||||
}
|
||||
final RexNode left = operands.get(0);
|
||||
|
@ -174,7 +180,9 @@ class SolrFilter extends Filter implements SolrRel {
|
|||
throw new AssertionError("cannot translate call " + call);
|
||||
}
|
||||
|
||||
/** Translates a call to a binary operator. Returns whether successful. */
|
||||
/**
|
||||
* Translates a call to a binary operator. Returns whether successful.
|
||||
*/
|
||||
private Pair<String, RexLiteral> translateBinary2(RexNode left, RexNode right) {
|
||||
switch (right.getKind()) {
|
||||
case LITERAL:
|
||||
|
@ -194,6 +202,177 @@ class SolrFilter extends Filter implements SolrRel {
|
|||
// String itemName = SolrRules.isItem((RexCall) left);
|
||||
// if (itemName != null) {
|
||||
// return translateOp2(op, itemName, rightLiteral);
|
||||
// }
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class HavingTranslator {
|
||||
|
||||
private final List<String> fieldNames;
|
||||
private Map<String,String> reverseAggMappings;
|
||||
|
||||
HavingTranslator(List<String> fieldNames, Map<String, String> reverseAggMappings) {
|
||||
this.fieldNames = fieldNames;
|
||||
this.reverseAggMappings = reverseAggMappings;
|
||||
}
|
||||
|
||||
private String translateMatch(RexNode condition) {
|
||||
if (condition.getKind().belongsTo(SqlKind.COMPARISON)) {
|
||||
return translateComparison(condition);
|
||||
} else if (condition.isA(SqlKind.AND)) {
|
||||
return translateAnd(condition);
|
||||
} else if (condition.isA(SqlKind.OR)) {
|
||||
return translateOr(condition);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private String translateOr(RexNode condition) {
|
||||
List<String> ors = new ArrayList<>();
|
||||
for (RexNode node : RelOptUtil.disjunctions(condition)) {
|
||||
ors.add(translateMatch(node));
|
||||
}
|
||||
StringBuilder builder = new StringBuilder();
|
||||
|
||||
builder.append("or(");
|
||||
int i = 0;
|
||||
for (i = 0; i < ors.size(); i++) {
|
||||
if (i > 0) {
|
||||
builder.append(",");
|
||||
}
|
||||
|
||||
builder.append(ors.get(i));
|
||||
}
|
||||
builder.append(")");
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
private String translateAnd(RexNode node0) {
|
||||
List<String> andStrings = new ArrayList();
|
||||
List<String> notStrings = new ArrayList();
|
||||
|
||||
List<RexNode> ands = new ArrayList();
|
||||
List<RexNode> nots = new ArrayList();
|
||||
|
||||
RelOptUtil.decomposeConjunction(node0, ands, nots);
|
||||
|
||||
for (RexNode node : ands) {
|
||||
andStrings.add(translateMatch(node));
|
||||
}
|
||||
|
||||
StringBuilder builder = new StringBuilder();
|
||||
|
||||
builder.append("and(");
|
||||
for (int i = 0; i < andStrings.size(); i++) {
|
||||
if (i > 0) {
|
||||
builder.append(",");
|
||||
}
|
||||
|
||||
builder.append(andStrings.get(i));
|
||||
}
|
||||
builder.append(")");
|
||||
|
||||
|
||||
if (nots.size() > 0) {
|
||||
for (RexNode node : nots) {
|
||||
notStrings.add(translateMatch(node));
|
||||
}
|
||||
|
||||
StringBuilder notBuilder = new StringBuilder();
|
||||
for(int i=0; i< notStrings.size(); i++) {
|
||||
if(i > 0) {
|
||||
notBuilder.append(",");
|
||||
}
|
||||
notBuilder.append("not(");
|
||||
notBuilder.append(notStrings.get(i));
|
||||
notBuilder.append(")");
|
||||
}
|
||||
|
||||
return "and(" + builder.toString() + ","+ notBuilder.toString()+")";
|
||||
} else {
|
||||
return builder.toString();
|
||||
}
|
||||
}
|
||||
|
||||
private String translateComparison(RexNode node) {
|
||||
Pair<String, RexLiteral> binaryTranslated = null;
|
||||
if (((RexCall) node).getOperands().size() == 2) {
|
||||
binaryTranslated = translateBinary((RexCall) node);
|
||||
}
|
||||
|
||||
switch (node.getKind()) {
|
||||
|
||||
case EQUALS:
|
||||
String terms = binaryTranslated.getValue().getValue2().toString().trim();
|
||||
String clause = "eq(" + binaryTranslated.getKey() + "," + terms + ")";
|
||||
return clause;
|
||||
case NOT_EQUALS:
|
||||
return "not(eq(" + binaryTranslated.getKey() + "," + binaryTranslated.getValue().getValue2() + "))";
|
||||
case LESS_THAN:
|
||||
return "lt(" + binaryTranslated.getKey() + "," + binaryTranslated.getValue().getValue2() + ")";
|
||||
case LESS_THAN_OR_EQUAL:
|
||||
return "lteq(" + binaryTranslated.getKey() + "," + binaryTranslated.getValue().getValue2() + ")";
|
||||
case GREATER_THAN:
|
||||
return "gt(" + binaryTranslated.getKey() + "," + binaryTranslated.getValue().getValue2() + ")";
|
||||
case GREATER_THAN_OR_EQUAL:
|
||||
return "gteq(" + binaryTranslated.getKey() + "," + binaryTranslated.getValue().getValue2() + ")";
|
||||
default:
|
||||
throw new AssertionError("cannot translate " + node);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Translates a call to a binary operator, reversing arguments if necessary.
|
||||
*/
|
||||
private Pair<String, RexLiteral> translateBinary(RexCall call) {
|
||||
List<RexNode> operands = call.getOperands();
|
||||
if (operands.size() != 2) {
|
||||
throw new AssertionError("Invalid number of arguments - " + operands.size());
|
||||
}
|
||||
final RexNode left = operands.get(0);
|
||||
final RexNode right = operands.get(1);
|
||||
final Pair<String, RexLiteral> a = translateBinary2(left, right);
|
||||
|
||||
if (a != null) {
|
||||
if(reverseAggMappings.containsKey(a.getKey())) {
|
||||
return new Pair<String, RexLiteral>(reverseAggMappings.get(a.getKey()),a.getValue());
|
||||
}
|
||||
return a;
|
||||
}
|
||||
final Pair<String, RexLiteral> b = translateBinary2(right, left);
|
||||
if (b != null) {
|
||||
return b;
|
||||
}
|
||||
throw new AssertionError("cannot translate call " + call);
|
||||
}
|
||||
|
||||
/**
|
||||
* Translates a call to a binary operator. Returns whether successful.
|
||||
*/
|
||||
private Pair<String, RexLiteral> translateBinary2(RexNode left, RexNode right) {
|
||||
switch (right.getKind()) {
|
||||
case LITERAL:
|
||||
break;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
|
||||
final RexLiteral rightLiteral = (RexLiteral) right;
|
||||
switch (left.getKind()) {
|
||||
case INPUT_REF:
|
||||
final RexInputRef left1 = (RexInputRef) left;
|
||||
String name = fieldNames.get(left1.getIndex());
|
||||
return new Pair<>(name, rightLiteral);
|
||||
case CAST:
|
||||
return translateBinary2(((RexCall) left).operands.get(0), right);
|
||||
// case OTHER_FUNCTION:
|
||||
// String itemName = SolrRules.isItem((RexCall) left);
|
||||
// if (itemName != null) {
|
||||
// return translateOp2(op, itemName, rightLiteral);
|
||||
// }
|
||||
default:
|
||||
return null;
|
||||
|
|
|
@ -33,6 +33,7 @@ enum SolrMethod {
|
|||
List.class,
|
||||
List.class,
|
||||
String.class,
|
||||
String.class,
|
||||
String.class);
|
||||
|
||||
public final Method method;
|
||||
|
|
|
@ -20,6 +20,7 @@ import org.apache.calcite.plan.Convention;
|
|||
import org.apache.calcite.plan.RelOptTable;
|
||||
import org.apache.calcite.rel.RelNode;
|
||||
import org.apache.calcite.util.Pair;
|
||||
import org.apache.solr.client.solrj.io.ops.BooleanOperation;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
|
@ -35,7 +36,9 @@ interface SolrRel extends RelNode {
|
|||
/** Callback for the implementation process that converts a tree of {@link SolrRel} nodes into a Solr query. */
|
||||
class Implementor {
|
||||
final Map<String, String> fieldMappings = new HashMap<>();
|
||||
final Map<String, String> reverseAggMappings = new HashMap<>();
|
||||
String query = null;
|
||||
String havingPredicate;
|
||||
boolean negativeQuery;
|
||||
String limitValue = null;
|
||||
final List<Pair<String, String>> orders = new ArrayList<>();
|
||||
|
@ -51,6 +54,12 @@ interface SolrRel extends RelNode {
|
|||
}
|
||||
}
|
||||
|
||||
void addReverseAggMapping(String key, String val) {
|
||||
if(key != null && !reverseAggMappings.containsKey(key)) {
|
||||
this.reverseAggMappings.put(key, val);
|
||||
}
|
||||
}
|
||||
|
||||
void addQuery(String query) {
|
||||
this.query = query;
|
||||
}
|
||||
|
@ -79,6 +88,11 @@ interface SolrRel extends RelNode {
|
|||
}
|
||||
}
|
||||
|
||||
void setHavingPredicate(String havingPredicate) {
|
||||
this.havingPredicate = havingPredicate;
|
||||
}
|
||||
|
||||
|
||||
void setLimit(String limit) {
|
||||
limitValue = limit;
|
||||
}
|
||||
|
|
|
@ -111,7 +111,7 @@ class SolrRules {
|
|||
}
|
||||
|
||||
<R extends RelNode> SolrConverterRule(Class<R> clazz, Predicate<RelNode> predicate, String description) {
|
||||
super(clazz, predicate::test, Convention.NONE, SolrRel.CONVENTION, description);
|
||||
super(clazz, Convention.NONE, SolrRel.CONVENTION, description);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -120,8 +120,10 @@ class SolrRules {
|
|||
*/
|
||||
private static class SolrFilterRule extends SolrConverterRule {
|
||||
private static boolean isNotFilterByExpr(List<RexNode> rexNodes, List<String> fieldNames) {
|
||||
|
||||
// We dont have a way to filter by result of aggregator now
|
||||
boolean result = true;
|
||||
|
||||
for (RexNode rexNode : rexNodes) {
|
||||
if (rexNode instanceof RexCall) {
|
||||
result = result && isNotFilterByExpr(((RexCall) rexNode).getOperands(), fieldNames);
|
||||
|
|
|
@ -32,7 +32,17 @@ import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
|
|||
import org.apache.solr.client.solrj.io.comp.FieldComparator;
|
||||
import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
|
||||
import org.apache.solr.client.solrj.io.comp.StreamComparator;
|
||||
import org.apache.solr.client.solrj.io.ops.AndOperation;
|
||||
import org.apache.solr.client.solrj.io.ops.BooleanOperation;
|
||||
import org.apache.solr.client.solrj.io.ops.EqualsOperation;
|
||||
import org.apache.solr.client.solrj.io.ops.GreaterThanEqualToOperation;
|
||||
import org.apache.solr.client.solrj.io.ops.GreaterThanOperation;
|
||||
import org.apache.solr.client.solrj.io.ops.LessThanEqualToOperation;
|
||||
import org.apache.solr.client.solrj.io.ops.LessThanOperation;
|
||||
import org.apache.solr.client.solrj.io.ops.NotOperation;
|
||||
import org.apache.solr.client.solrj.io.ops.OrOperation;
|
||||
import org.apache.solr.client.solrj.io.stream.*;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
|
||||
import org.apache.solr.client.solrj.io.stream.metrics.*;
|
||||
import org.apache.solr.common.params.CommonParams;
|
||||
|
@ -72,7 +82,7 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
|
|||
|
||||
private Enumerable<Object> query(final Properties properties) {
|
||||
return query(properties, Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(),
|
||||
Collections.emptyList(), null, null);
|
||||
Collections.emptyList(), null, null, null);
|
||||
}
|
||||
|
||||
/** Executes a Solr query on the underlying table.
|
||||
|
@ -89,7 +99,8 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
|
|||
final List<String> buckets,
|
||||
final List<Pair<String, String>> metricPairs,
|
||||
final String limit,
|
||||
final String negativeQuery) {
|
||||
final String negativeQuery,
|
||||
final String havingPredicate) {
|
||||
// SolrParams should be a ModifiableParams instead of a map
|
||||
boolean mapReduce = "map_reduce".equals(properties.getProperty("aggregationMode"));
|
||||
boolean negative = Boolean.parseBoolean(negativeQuery);
|
||||
|
@ -106,8 +117,6 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
|
|||
}
|
||||
}
|
||||
|
||||
System.out.println("####### Limit:"+limit);
|
||||
|
||||
TupleStream tupleStream;
|
||||
String zk = properties.getProperty("zk");
|
||||
try {
|
||||
|
@ -126,7 +135,8 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
|
|||
orders,
|
||||
buckets,
|
||||
metricPairs,
|
||||
limit);
|
||||
limit,
|
||||
havingPredicate);
|
||||
} else {
|
||||
tupleStream = handleGroupByFacet(zk,
|
||||
collection,
|
||||
|
@ -135,7 +145,8 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
|
|||
orders,
|
||||
buckets,
|
||||
metricPairs,
|
||||
limit);
|
||||
limit,
|
||||
havingPredicate);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -403,7 +414,8 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
|
|||
final List<Pair<String, String>> orders,
|
||||
final List<String> _buckets,
|
||||
final List<Pair<String, String>> metricPairs,
|
||||
final String limit) throws IOException {
|
||||
final String limit,
|
||||
final String havingPredicate) throws IOException {
|
||||
|
||||
int numWorkers = Integer.parseInt(properties.getProperty("numWorkers", "1"));
|
||||
|
||||
|
@ -438,21 +450,36 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
|
|||
CloudSolrStream cstream = new CloudSolrStream(zk, collection, params);
|
||||
tupleStream = new RollupStream(cstream, buckets, metrics);
|
||||
|
||||
StreamFactory factory = new StreamFactory()
|
||||
.withFunctionName("search", CloudSolrStream.class)
|
||||
.withFunctionName("parallel", ParallelStream.class)
|
||||
.withFunctionName("rollup", RollupStream.class)
|
||||
.withFunctionName("sum", SumMetric.class)
|
||||
.withFunctionName("min", MinMetric.class)
|
||||
.withFunctionName("max", MaxMetric.class)
|
||||
.withFunctionName("avg", MeanMetric.class)
|
||||
.withFunctionName("count", CountMetric.class)
|
||||
.withFunctionName("and", AndOperation.class)
|
||||
.withFunctionName("or", OrOperation.class)
|
||||
.withFunctionName("not", NotOperation.class)
|
||||
.withFunctionName("eq", EqualsOperation.class)
|
||||
.withFunctionName("gt", GreaterThanOperation.class)
|
||||
.withFunctionName("lt", LessThanOperation.class)
|
||||
.withFunctionName("lteq", LessThanEqualToOperation.class)
|
||||
.withFunctionName("having", HavingStream.class)
|
||||
.withFunctionName("gteq", GreaterThanEqualToOperation.class);
|
||||
|
||||
if(havingPredicate != null) {
|
||||
BooleanOperation booleanOperation = (BooleanOperation)factory.constructOperation(StreamExpressionParser.parse(havingPredicate));
|
||||
tupleStream = new HavingStream(tupleStream, booleanOperation);
|
||||
}
|
||||
|
||||
if(numWorkers > 1) {
|
||||
// Do the rollups in parallel
|
||||
// Maintain the sort of the Tuples coming from the workers.
|
||||
StreamComparator comp = bucketSortComp(buckets, sortDirection);
|
||||
ParallelStream parallelStream = new ParallelStream(zk, collection, tupleStream, numWorkers, comp);
|
||||
|
||||
StreamFactory factory = new StreamFactory()
|
||||
.withFunctionName("search", CloudSolrStream.class)
|
||||
.withFunctionName("parallel", ParallelStream.class)
|
||||
.withFunctionName("rollup", RollupStream.class)
|
||||
.withFunctionName("sum", SumMetric.class)
|
||||
.withFunctionName("min", MinMetric.class)
|
||||
.withFunctionName("max", MaxMetric.class)
|
||||
.withFunctionName("avg", MeanMetric.class)
|
||||
.withFunctionName("count", CountMetric.class);
|
||||
|
||||
parallelStream.setStreamFactory(factory);
|
||||
tupleStream = parallelStream;
|
||||
|
@ -508,7 +535,8 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
|
|||
final List<Pair<String, String>> orders,
|
||||
final List<String> bucketFields,
|
||||
final List<Pair<String, String>> metricPairs,
|
||||
final String lim) throws IOException {
|
||||
final String lim,
|
||||
final String havingPredicate) throws IOException {
|
||||
|
||||
ModifiableSolrParams solrParams = new ModifiableSolrParams();
|
||||
solrParams.add(CommonParams.Q, query);
|
||||
|
@ -542,6 +570,30 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
|
|||
limit);
|
||||
|
||||
|
||||
|
||||
StreamFactory factory = new StreamFactory()
|
||||
.withFunctionName("search", CloudSolrStream.class)
|
||||
.withFunctionName("parallel", ParallelStream.class)
|
||||
.withFunctionName("rollup", RollupStream.class)
|
||||
.withFunctionName("sum", SumMetric.class)
|
||||
.withFunctionName("min", MinMetric.class)
|
||||
.withFunctionName("max", MaxMetric.class)
|
||||
.withFunctionName("avg", MeanMetric.class)
|
||||
.withFunctionName("count", CountMetric.class)
|
||||
.withFunctionName("and", AndOperation.class)
|
||||
.withFunctionName("or", OrOperation.class)
|
||||
.withFunctionName("not", NotOperation.class)
|
||||
.withFunctionName("eq", EqualsOperation.class)
|
||||
.withFunctionName("gt", GreaterThanOperation.class)
|
||||
.withFunctionName("lt", LessThanOperation.class)
|
||||
.withFunctionName("lteq", LessThanEqualToOperation.class)
|
||||
.withFunctionName("gteq", GreaterThanEqualToOperation.class);
|
||||
|
||||
if(havingPredicate != null) {
|
||||
BooleanOperation booleanOperation = (BooleanOperation)factory.constructOperation(StreamExpressionParser.parse(havingPredicate));
|
||||
tupleStream = new HavingStream(tupleStream, booleanOperation);
|
||||
}
|
||||
|
||||
if(lim != null)
|
||||
{
|
||||
tupleStream = new LimitStream(tupleStream, limit);
|
||||
|
@ -623,8 +675,8 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
|
|||
*/
|
||||
@SuppressWarnings("UnusedDeclaration")
|
||||
public Enumerable<Object> query(List<Map.Entry<String, Class>> fields, String query, List<Pair<String, String>> order,
|
||||
List<String> buckets, List<Pair<String, String>> metricPairs, String limit, String negativeQuery) {
|
||||
return getTable().query(getProperties(), fields, query, order, buckets, metricPairs, limit, negativeQuery);
|
||||
List<String> buckets, List<Pair<String, String>> metricPairs, String limit, String negativeQuery, String havingPredicate) {
|
||||
return getTable().query(getProperties(), fields, query, order, buckets, metricPairs, limit, negativeQuery, havingPredicate);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -84,8 +84,9 @@ class SolrToEnumerableConverter extends ConverterImpl implements EnumerableRel {
|
|||
final Expression metricPairs = list.append("metricPairs", constantArrayList(solrImplementor.metricPairs, Pair.class));
|
||||
final Expression limit = list.append("limit", Expressions.constant(solrImplementor.limitValue));
|
||||
final Expression negativeQuery = list.append("negativeQuery", Expressions.constant(Boolean.toString(solrImplementor.negativeQuery), String.class));
|
||||
final Expression havingPredicate = list.append("havingTest", Expressions.constant(solrImplementor.havingPredicate, String.class));
|
||||
Expression enumerable = list.append("enumerable", Expressions.call(table, SolrMethod.SOLR_QUERYABLE_QUERY.method,
|
||||
fields, query, orders, buckets, metricPairs, limit, negativeQuery));
|
||||
fields, query, orders, buckets, metricPairs, limit, negativeQuery, havingPredicate));
|
||||
Hook.QUERY_PLAN.run(query);
|
||||
list.add(Expressions.return_(null, enumerable));
|
||||
return implementor.result(physType, list.toBlock());
|
||||
|
|
|
@ -18,12 +18,12 @@ package org.apache.solr.client.solrj.io.ops;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.solr.client.solrj.io.Tuple;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
|
||||
|
@ -33,60 +33,47 @@ public class AndOperation implements BooleanOperation {
|
|||
private static final long serialVersionUID = 1;
|
||||
private UUID operationNodeId = UUID.randomUUID();
|
||||
|
||||
protected BooleanOperation leftOperand;
|
||||
protected BooleanOperation rightOperand;
|
||||
private List<BooleanOperation> booleanOperations = new ArrayList();
|
||||
|
||||
public void operate(Tuple tuple) {
|
||||
leftOperand.operate(tuple);
|
||||
rightOperand.operate(tuple);
|
||||
for(BooleanOperation booleanOperation : booleanOperations) {
|
||||
booleanOperation.operate(tuple);
|
||||
}
|
||||
}
|
||||
|
||||
public AndOperation(BooleanOperation leftOperand, BooleanOperation rightOperand) {
|
||||
this.leftOperand = leftOperand;
|
||||
this.rightOperand = rightOperand;
|
||||
public AndOperation(List<BooleanOperation> booleanOperations) {
|
||||
this.booleanOperations = booleanOperations;
|
||||
}
|
||||
|
||||
public AndOperation(StreamExpression expression, StreamFactory factory) throws IOException {
|
||||
List<StreamExpression> operationExpressions = factory.getExpressionOperandsRepresentingTypes(expression, BooleanOperation.class);
|
||||
if(operationExpressions != null && operationExpressions.size() == 2) {
|
||||
StreamExpression left = operationExpressions.get(0);
|
||||
StreamOperation leftOp = factory.constructOperation(left);
|
||||
if(leftOp instanceof BooleanOperation) {
|
||||
leftOperand = (BooleanOperation) leftOp;
|
||||
} else {
|
||||
throw new IOException("The And/Or Operation requires a BooleanOperation.");
|
||||
}
|
||||
|
||||
StreamExpression right = operationExpressions.get(1);
|
||||
StreamOperation rightOp = factory.constructOperation(right);
|
||||
if(rightOp instanceof BooleanOperation) {
|
||||
rightOperand = (BooleanOperation) rightOp;
|
||||
} else {
|
||||
throw new IOException("The And/Or Operation requires a BooleanOperation.");
|
||||
}
|
||||
List<StreamExpression> operationExpressions = factory.getExpressionOperandsRepresentingTypes(expression, BooleanOperation.class);
|
||||
for(StreamExpression se : operationExpressions) {
|
||||
StreamOperation op = factory.constructOperation(se);
|
||||
if(op instanceof BooleanOperation) {
|
||||
booleanOperations.add((BooleanOperation)op);
|
||||
} else {
|
||||
throw new IOException("The And/Or Operation requires a BooleanOperations.");
|
||||
throw new IOException("AndOperation requires BooleanOperation parameters");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public boolean evaluate() {
|
||||
return leftOperand.evaluate() && rightOperand.evaluate();
|
||||
for(BooleanOperation booleanOperation : booleanOperations) {
|
||||
if(!booleanOperation.evaluate()) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
|
||||
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
|
||||
if(leftOperand instanceof Expressible) {
|
||||
expression.addParameter(leftOperand.toExpression(factory));
|
||||
} else {
|
||||
throw new IOException("This left operand of the AndOperation contains a non-expressible operation - it cannot be converted to an expression");
|
||||
|
||||
for(BooleanOperation booleanOperation : booleanOperations) {
|
||||
expression.addParameter(booleanOperation.toExpression(factory));
|
||||
}
|
||||
|
||||
if(rightOperand instanceof Expressible) {
|
||||
expression.addParameter(rightOperand.toExpression(factory));
|
||||
} else {
|
||||
throw new IOException("This the right operand of the AndOperation contains a non-expressible operation - it cannot be converted to an expression");
|
||||
}
|
||||
return expression;
|
||||
}
|
||||
|
||||
|
|
|
@ -17,46 +17,63 @@
|
|||
package org.apache.solr.client.solrj.io.ops;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.solr.client.solrj.io.Tuple;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
|
||||
|
||||
public class OrOperation extends AndOperation {
|
||||
public class OrOperation implements BooleanOperation {
|
||||
|
||||
private static final long serialVersionUID = 1;
|
||||
private UUID operationNodeId = UUID.randomUUID();
|
||||
|
||||
public OrOperation(BooleanOperation leftOperand, BooleanOperation rightOperand) {
|
||||
super(leftOperand, rightOperand);
|
||||
private List<BooleanOperation> booleanOperations = new ArrayList();
|
||||
|
||||
public void operate(Tuple tuple) {
|
||||
for(BooleanOperation booleanOperation : booleanOperations) {
|
||||
booleanOperation.operate(tuple);
|
||||
}
|
||||
}
|
||||
|
||||
public OrOperation(List<BooleanOperation> booleanOperations) {
|
||||
this.booleanOperations = booleanOperations;
|
||||
}
|
||||
|
||||
public OrOperation(StreamExpression expression, StreamFactory factory) throws IOException {
|
||||
super(expression, factory);
|
||||
List<StreamExpression> operationExpressions = factory.getExpressionOperandsRepresentingTypes(expression, BooleanOperation.class);
|
||||
for(StreamExpression se : operationExpressions) {
|
||||
StreamOperation op = factory.constructOperation(se);
|
||||
if(op instanceof BooleanOperation) {
|
||||
booleanOperations.add((BooleanOperation)op);
|
||||
} else {
|
||||
throw new IOException("AndOperation requires BooleanOperation parameters");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public boolean evaluate() {
|
||||
return leftOperand.evaluate() || rightOperand.evaluate();
|
||||
for(BooleanOperation booleanOperation : booleanOperations) {
|
||||
if(booleanOperation.evaluate()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
|
||||
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
|
||||
if(leftOperand instanceof Expressible) {
|
||||
expression.addParameter(leftOperand.toExpression(factory));
|
||||
} else {
|
||||
throw new IOException("This left operand of the OrOperation contains a non-expressible operation - it cannot be converted to an expression");
|
||||
|
||||
for(BooleanOperation booleanOperation : booleanOperations) {
|
||||
expression.addParameter(booleanOperation.toExpression(factory));
|
||||
}
|
||||
|
||||
if(rightOperand instanceof Expressible) {
|
||||
expression.addParameter(rightOperand.toExpression(factory));
|
||||
} else {
|
||||
throw new IOException("This the right operand of the OrOperation contains a non-expressible operation - it cannot be converted to an expression");
|
||||
}
|
||||
return expression;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue