diff --git a/lucene/ivy-versions.properties b/lucene/ivy-versions.properties
index 70e33a22492..172b95bb100 100644
--- a/lucene/ivy-versions.properties
+++ b/lucene/ivy-versions.properties
@@ -17,8 +17,6 @@ com.carrotsearch.randomizedtesting.version = 2.5.0
/com.cybozu.labs/langdetect = 1.1-20120112
/com.drewnoakes/metadata-extractor = 2.8.1
-/com.facebook.presto/presto-parser = 0.122
-
com.fasterxml.jackson.core.version = 2.5.4
/com.fasterxml.jackson.core/jackson-annotations = ${com.fasterxml.jackson.core.version}
/com.fasterxml.jackson.core/jackson-core = ${com.fasterxml.jackson.core.version}
@@ -33,7 +31,7 @@ com.google.inject.guice.version = 3.0
/com.google.inject.extensions/guice-servlet = ${com.google.inject.guice.version}
/com.google.inject/guice = ${com.google.inject.guice.version}
-/com.google.protobuf/protobuf-java = 2.5.0
+/com.google.protobuf/protobuf-java = 3.1.0
/com.googlecode.juniversalchardet/juniversalchardet = 1.0.3
/com.googlecode.mp4parser/isoparser = 1.1.18
/com.healthmarketscience.jackcess/jackcess = 2.1.3
@@ -72,7 +70,6 @@ com.sun.jersey.version = 1.9
/dom4j/dom4j = 1.6.1
/hsqldb/hsqldb = 1.8.0.10
/info.ganglia.gmetric4j/gmetric4j = 1.0.7
-/io.airlift/slice = 0.10
io.dropwizard.metrics.version = 3.1.2
/io.dropwizard.metrics/metrics-core = ${io.dropwizard.metrics.version}
@@ -97,6 +94,7 @@ io.netty.netty-all.version = 4.0.36.Final
/mecab/mecab-naist-jdic = 0.6.3b-20111013
/net.arnx/jsonic = 1.2.7
/net.bytebuddy/byte-buddy = 1.6.2
+/net.hydromatic/eigenbase-properties = 1.1.5
/net.sf.ehcache/ehcache-core = 2.4.4
/net.sf.saxon/Saxon-HE = 9.6.0-2
/net.sourceforge.argparse4j/argparse4j = 0.4.3
@@ -106,6 +104,14 @@ io.netty.netty-all.version = 4.0.36.Final
/org.apache.ant/ant = 1.8.2
/org.apache.avro/avro = 1.7.5
+
+org.apache.calcite.avatica.version = 1.9.0
+/org.apache.calcite.avatica/avatica-core = ${org.apache.calcite.avatica.version}
+
+org.apache.calcite.version = 1.11.0
+/org.apache.calcite/calcite-core = ${org.apache.calcite.version}
+/org.apache.calcite/calcite-linq4j = ${org.apache.calcite.version}
+
/org.apache.commons/commons-compress = 1.11
/org.apache.commons/commons-exec = 1.3
/org.apache.commons/commons-math3 = 3.4.1
@@ -243,6 +249,10 @@ org.codehaus.jackson.version = 1.9.13
/org.codehaus.jackson/jackson-jaxrs = ${org.codehaus.jackson.version}
/org.codehaus.jackson/jackson-mapper-asl = ${org.codehaus.jackson.version}
+org.codehaus.janino.version = 2.7.6
+/org.codehaus.janino/commons-compiler = ${org.codehaus.janino.version}
+/org.codehaus.janino/janino = ${org.codehaus.janino.version}
+
/org.codehaus.woodstox/stax2-api = 3.1.4
/org.codehaus.woodstox/woodstox-core-asl = 4.4.1
/org.easymock/easymock = 3.0
diff --git a/solr/core/ivy.xml b/solr/core/ivy.xml
index 074e35f201f..dc58ad5c17a 100644
--- a/solr/core/ivy.xml
+++ b/solr/core/ivy.xml
@@ -60,8 +60,8 @@
- <dependency org="com.facebook.presto" name="presto-parser" rev="${/com.facebook.presto/presto-parser}" conf="compile"/>
- <dependency org="io.airlift" name="slice" rev="${/io.airlift/slice}" conf="compile"/>
+ <dependency org="org.apache.calcite" name="calcite-core" rev="${/org.apache.calcite/calcite-core}" conf="compile"/>
+ <dependency org="org.apache.calcite" name="calcite-linq4j" rev="${/org.apache.calcite/calcite-linq4j}" conf="compile"/>
@@ -142,10 +142,15 @@
-
-
-
+ <dependency org="org.apache.calcite.avatica" name="avatica-core" rev="${/org.apache.calcite.avatica/avatica-core}" conf="compile"/>
+ <dependency org="net.hydromatic" name="eigenbase-properties" rev="${/net.hydromatic/eigenbase-properties}" conf="compile"/>
+ <dependency org="org.codehaus.janino" name="janino" rev="${/org.codehaus.janino/janino}" conf="compile"/>
+ <dependency org="org.codehaus.janino" name="commons-compiler" rev="${/org.codehaus.janino/commons-compiler}" conf="compile"/>
+ <dependency org="com.google.protobuf" name="protobuf-java" rev="${/com.google.protobuf/protobuf-java}" conf="compile"/>
+
+
+
diff --git a/solr/core/src/java/org/apache/solr/handler/SQLHandler.java b/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
index 3074d9bb59d..d65ea560ab5 100644
--- a/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
@@ -18,65 +18,35 @@ package org.apache.solr.handler;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
+import java.util.HashMap;
import java.util.Iterator;
-import java.util.Locale;
-import java.util.Set;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
-import com.facebook.presto.sql.tree.*;
-import com.google.common.base.Strings;
-import com.google.common.collect.Iterables;
-
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.calcite.config.Lex;
import org.apache.solr.client.solrj.io.Tuple;
-import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
-import org.apache.solr.client.solrj.io.comp.FieldComparator;
-import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
-import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
-import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
-import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
-import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
-import org.apache.solr.client.solrj.io.stream.FacetStream;
-import org.apache.solr.client.solrj.io.stream.ParallelStream;
-import org.apache.solr.client.solrj.io.stream.RankStream;
-import org.apache.solr.client.solrj.io.stream.RollupStream;
-import org.apache.solr.client.solrj.io.stream.SelectStream;
-import org.apache.solr.client.solrj.io.stream.StatsStream;
-import org.apache.solr.client.solrj.io.stream.StreamContext;
-import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.ExceptionStream;
-import org.apache.solr.client.solrj.io.stream.UniqueStream;
-import org.apache.solr.client.solrj.io.stream.expr.Explanation;
-import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
-import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
-import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
-import org.apache.solr.client.solrj.io.stream.metrics.*;
+import org.apache.solr.client.solrj.io.stream.JDBCStream;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.SolrParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.sql.CalciteSolrDriver;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.security.AuthorizationContext;
import org.apache.solr.security.PermissionNameProvider;
import org.apache.solr.util.plugin.SolrCoreAware;
-
-import java.util.List;
-import java.util.Map;
-import java.util.HashMap;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.facebook.presto.sql.parser.SqlParser;
-
-public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , PermissionNameProvider {
+public class SQLHandler extends RequestHandlerBase implements SolrCoreAware, PermissionNameProvider {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -103,19 +73,15 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe
}
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
- SolrParams params = req.getParams();
- params = adjustParams(params);
- req.setParams(params);
+ ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
String sql = params.get("stmt");
- int numWorkers = params.getInt("numWorkers", 1);
- String workerCollection = params.get("workerCollection", defaultWorkerCollection);
- String workerZkhost = params.get("workerZkhost", defaultZkhost);
- String mode = params.get("aggregationMode", "map_reduce");
- StreamContext context = new StreamContext();
-
- // JDBC driver requires metadata from the SQLHandler. Default to false since this adds a new Metadata stream.
- boolean includeMetadata = params.getBool("includeMetadata", false);
+ // Set defaults for parameters
+ params.set("numWorkers", params.getInt("numWorkers", 1));
+ params.set("workerCollection", params.get("workerCollection", defaultWorkerCollection));
+ params.set("workerZkhost", params.get("workerZkhost", defaultZkhost));
+ params.set("aggregationMode", params.get("aggregationMode", "facet"));
+ TupleStream tupleStream = null;
try {
if(!isCloud) {
@@ -126,30 +92,39 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe
throw new Exception("stmt parameter cannot be null");
}
- context.setSolrClientCache(StreamHandler.clientCache);
+ String url = CalciteSolrDriver.CONNECT_STRING_PREFIX;
- TupleStream tupleStream = SQLTupleStreamParser.parse(sql,
- numWorkers,
- workerCollection,
- workerZkhost,
- AggregationMode.getMode(mode),
- includeMetadata,
- context);
+ Properties properties = new Properties();
+ // Add all query parameters
+ Iterator<String> parameterNamesIterator = params.getParameterNamesIterator();
+ while(parameterNamesIterator.hasNext()) {
+ String param = parameterNamesIterator.next();
+ properties.setProperty(param, params.get(param));
+ }
- rsp.add("result-set", new StreamHandler.TimerStream(new ExceptionStream(tupleStream)));
+ // Set these last to ensure that they are set properly
+ properties.setProperty("lex", Lex.MYSQL.toString());
+ properties.setProperty("zk", defaultZkhost);
+
+ String driverClass = CalciteSolrDriver.class.getCanonicalName();
+
+ // JDBC driver requires metadata from the SQLHandler. Default to false since this adds a new Metadata stream.
+ boolean includeMetadata = params.getBool("includeMetadata", false);
+ tupleStream = new SqlHandlerStream(url, sql, null, properties, driverClass, includeMetadata);
+
+ tupleStream = new StreamHandler.TimerStream(new ExceptionStream(tupleStream));
+
+ rsp.add("result-set", tupleStream);
} catch(Exception e) {
//Catch the SQL parsing and query transformation exceptions.
+ if(tupleStream != null) {
+ tupleStream.close();
+ }
SolrException.log(logger, e);
rsp.add("result-set", new StreamHandler.DummyErrorStream(e));
}
}
- private SolrParams adjustParams(SolrParams params) {
- ModifiableSolrParams adjustedParams = new ModifiableSolrParams(params);
- adjustedParams.set(CommonParams.OMIT_HEADER, "true");
- return adjustedParams;
- }
-
public String getDescription() {
return "SQLHandler";
}
@@ -158,1569 +133,51 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe
return null;
}
- public static class SQLTupleStreamParser {
-
- public static TupleStream parse(String sql,
- int numWorkers,
- String workerCollection,
- String workerZkhost,
- AggregationMode aggregationMode,
- boolean includeMetadata,
- StreamContext context) throws IOException {
- SqlParser parser = new SqlParser();
- Statement statement = parser.createStatement(sql);
-
- SQLVisitor sqlVistor = new SQLVisitor(new StringBuilder());
-
- sqlVistor.process(statement, new Integer(0));
- sqlVistor.reverseAliases();
-
- TupleStream sqlStream = null;
-
- if(sqlVistor.table.toUpperCase(Locale.ROOT).contains("_CATALOGS_")) {
- sqlStream = new SelectStream(new CatalogsStream(defaultZkhost), sqlVistor.columnAliases);
- } else if(sqlVistor.table.toUpperCase(Locale.ROOT).contains("_SCHEMAS_")) {
- sqlStream = new SelectStream(new SchemasStream(defaultZkhost), sqlVistor.columnAliases);
- } else if(sqlVistor.table.toUpperCase(Locale.ROOT).contains("_TABLES_")) {
- sqlStream = new SelectStream(new TableStream(defaultZkhost), sqlVistor.columnAliases);
- } else if(sqlVistor.groupByQuery) {
- if(aggregationMode == AggregationMode.FACET) {
- sqlStream = doGroupByWithAggregatesFacets(sqlVistor);
- } else {
- context.numWorkers = numWorkers;
- sqlStream = doGroupByWithAggregates(sqlVistor, numWorkers, workerCollection, workerZkhost);
- }
- } else if(sqlVistor.isDistinct) {
- if(aggregationMode == AggregationMode.FACET) {
- sqlStream = doSelectDistinctFacets(sqlVistor);
- } else {
- context.numWorkers = numWorkers;
- sqlStream = doSelectDistinct(sqlVistor, numWorkers, workerCollection, workerZkhost);
- }
- } else {
- sqlStream = doSelect(sqlVistor);
- }
-
- if(includeMetadata) {
- sqlStream = new MetadataStream(sqlStream, sqlVistor);
- }
-
- sqlStream.setStreamContext(context);
- return sqlStream;
- }
- }
-
- private static TupleStream doGroupByWithAggregates(SQLVisitor sqlVisitor,
- int numWorkers,
- String workerCollection,
- String workerZkHost) throws IOException {
-
- Set<String> fieldSet = new HashSet();
- Bucket[] buckets = getBuckets(sqlVisitor.groupBy, fieldSet);
- Metric[] metrics = getMetrics(sqlVisitor.fields, fieldSet);
- if(metrics.length == 0) {
- throw new IOException("Group by queries must include atleast one aggregate function.");
- }
-
- String fl = fields(fieldSet);
- String sortDirection = getSortDirection(sqlVisitor.sorts);
- String sort = bucketSort(buckets, sortDirection);
-
- TableSpec tableSpec = new TableSpec(sqlVisitor.table, defaultZkhost);
-
- String zkHost = tableSpec.zkHost;
- String collection = tableSpec.collection;
- ModifiableSolrParams params = new ModifiableSolrParams();
-
- params.set(CommonParams.FL, fl);
- params.set(CommonParams.Q, sqlVisitor.query);
- //Always use the /export handler for Group By Queries because it requires exporting full result sets.
- params.set(CommonParams.QT, "/export");
-
- if(numWorkers > 1) {
- params.set("partitionKeys", getPartitionKeys(buckets));
- }
-
- params.set("sort", sort);
-
- TupleStream tupleStream = null;
-
- CloudSolrStream cstream = new CloudSolrStream(zkHost, collection, params);
- tupleStream = new RollupStream(cstream, buckets, metrics);
-
- if(numWorkers > 1) {
- // Do the rollups in parallel
- // Maintain the sort of the Tuples coming from the workers.
- StreamComparator comp = bucketSortComp(buckets, sortDirection);
- ParallelStream parallelStream = new ParallelStream(workerZkHost, workerCollection, tupleStream, numWorkers, comp);
-
- StreamFactory factory = new StreamFactory()
- .withFunctionName("search", CloudSolrStream.class)
- .withFunctionName("parallel", ParallelStream.class)
- .withFunctionName("rollup", RollupStream.class)
- .withFunctionName("sum", SumMetric.class)
- .withFunctionName("min", MinMetric.class)
- .withFunctionName("max", MaxMetric.class)
- .withFunctionName("avg", MeanMetric.class)
- .withFunctionName("count", CountMetric.class);
-
- parallelStream.setStreamFactory(factory);
- tupleStream = parallelStream;
- }
-
- //TODO: This should be done on the workers, but it won't serialize because it relies on Presto classes.
- // Once we make this a Expressionable the problem will be solved.
-
- if(sqlVisitor.havingExpression != null) {
- tupleStream = new HavingStream(tupleStream, sqlVisitor.havingExpression, sqlVisitor.reverseColumnAliases );
- }
-
- if(sqlVisitor.sorts != null && sqlVisitor.sorts.size() > 0) {
- if(!sortsEqual(buckets, sortDirection, sqlVisitor.sorts, sqlVisitor.reverseColumnAliases)) {
- int limit = sqlVisitor.limit == -1 ? 100 : sqlVisitor.limit;
- StreamComparator comp = getComp(sqlVisitor.sorts, sqlVisitor.reverseColumnAliases);
- //Rank the Tuples
- //If parallel stream is used ALL the Rolled up tuples from the workers will be ranked
- //Providing a true Top or Bottom.
- tupleStream = new RankStream(tupleStream, limit, comp);
- } else {
- // Sort is the same as the underlying stream
- // Only need to limit the result, not Rank the result
- if(sqlVisitor.limit > -1) {
- tupleStream = new LimitStream(tupleStream, sqlVisitor.limit);
- }
- }
- }
-
- if(sqlVisitor.hasColumnAliases) {
- tupleStream = new SelectStream(tupleStream, sqlVisitor.columnAliases);
- }
-
- return tupleStream;
- }
-
- private static TupleStream doSelectDistinct(SQLVisitor sqlVisitor,
- int numWorkers,
- String workerCollection,
- String workerZkHost) throws IOException {
-
- Set<String> fieldSet = new HashSet();
- Bucket[] buckets = getBuckets(sqlVisitor.fields, fieldSet);
- Metric[] metrics = getMetrics(sqlVisitor.fields, fieldSet);
-
- if(metrics.length > 0) {
- throw new IOException("Select Distinct queries cannot include aggregate functions.");
- }
-
- String fl = fields(fieldSet);
-
- String sort = null;
- StreamEqualitor ecomp = null;
- StreamComparator comp = null;
-
- if(sqlVisitor.sorts != null && sqlVisitor.sorts.size() > 0) {
- StreamComparator[] adjustedSorts = adjustSorts(sqlVisitor.sorts, buckets, sqlVisitor.reverseColumnAliases);
- // Because of the way adjustSorts works we know that each FieldComparator has a single
- // field name. For this reason we can just look at the leftFieldName
- FieldEqualitor[] fieldEqualitors = new FieldEqualitor[adjustedSorts.length];
- StringBuilder buf = new StringBuilder();
- for(int i=0; i<adjustedSorts.length; i++) {
- FieldComparator fieldComparator = (FieldComparator)adjustedSorts[i];
- fieldEqualitors[i] = new FieldEqualitor(fieldComparator.getLeftFieldName());
- if(i>0) {
- buf.append(",");
- }
- buf.append(fieldComparator.getLeftFieldName()).append(" ").append(fieldComparator.getOrder().toString());
- }
-
- sort = buf.toString();
-
- if(adjustedSorts.length == 1) {
- ecomp = fieldEqualitors[0];
- comp = adjustedSorts[0];
- } else {
- ecomp = new MultipleFieldEqualitor(fieldEqualitors);
- comp = new MultipleFieldComparator(adjustedSorts);
- }
- } else {
- StringBuilder sortBuf = new StringBuilder();
- FieldEqualitor[] equalitors = new FieldEqualitor[buckets.length];
- StreamComparator[] streamComparators = new StreamComparator[buckets.length];
- for(int i=0; i<buckets.length; i++) {
- equalitors[i] = new FieldEqualitor(buckets[i].toString());
- streamComparators[i] = new FieldComparator(buckets[i].toString(), ComparatorOrder.ASCENDING);
- if(i>0) {
- sortBuf.append(',');
- }
- sortBuf.append(buckets[i].toString()).append(" asc");
- }
-
- sort = sortBuf.toString();
-
- if(equalitors.length == 1) {
- ecomp = equalitors[0];
- comp = streamComparators[0];
- } else {
- ecomp = new MultipleFieldEqualitor(equalitors);
- comp = new MultipleFieldComparator(streamComparators);
- }
- }
-
- TableSpec tableSpec = new TableSpec(sqlVisitor.table, defaultZkhost);
-
- String zkHost = tableSpec.zkHost;
- String collection = tableSpec.collection;
- ModifiableSolrParams params = new ModifiableSolrParams();
-
- params.set(CommonParams.FL, fl);
- params.set(CommonParams.Q, sqlVisitor.query);
- //Always use the /export handler for Distinct Queries because it requires exporting full result sets.
- params.set(CommonParams.QT, "/export");
-
- if(numWorkers > 1) {
- params.set("partitionKeys", getPartitionKeys(buckets));
- }
-
- params.set("sort", sort);
-
- TupleStream tupleStream = null;
-
- CloudSolrStream cstream = new CloudSolrStream(zkHost, collection, params);
- tupleStream = new UniqueStream(cstream, ecomp);
-
- if(numWorkers > 1) {
- // Do the unique in parallel
- // Maintain the sort of the Tuples coming from the workers.
- ParallelStream parallelStream = new ParallelStream(workerZkHost, workerCollection, tupleStream, numWorkers, comp);
-
- StreamFactory factory = new StreamFactory()
- .withFunctionName("search", CloudSolrStream.class)
- .withFunctionName("parallel", ParallelStream.class)
- .withFunctionName("unique", UniqueStream.class);
-
- parallelStream.setStreamFactory(factory);
- tupleStream = parallelStream;
- }
-
- if(sqlVisitor.limit > 0) {
- tupleStream = new LimitStream(tupleStream, sqlVisitor.limit);
- }
-
- if(sqlVisitor.hasColumnAliases) {
- tupleStream = new SelectStream(tupleStream, sqlVisitor.columnAliases);
- }
-
- return tupleStream;
- }
-
- private static StreamComparator[] adjustSorts(List<SortItem> sorts, Bucket[] buckets, Map<String, String> reverseColumnAliases) throws IOException {
- List<FieldComparator> adjustedSorts = new ArrayList();
- Set<String> bucketFields = new HashSet();
- Set<String> sortFields = new HashSet();
-
- for(SortItem sortItem : sorts) {
-
- sortFields.add(getSortField(sortItem, reverseColumnAliases));
- adjustedSorts.add(new FieldComparator(getSortField(sortItem, reverseColumnAliases),
- ascDescComp(sortItem.getOrdering().toString())));
- }
-
- for(Bucket bucket : buckets) {
- bucketFields.add(bucket.toString());
- }
-
- for(SortItem sortItem : sorts) {
- String sortField = getSortField(sortItem, reverseColumnAliases);
- if(!bucketFields.contains(sortField)) {
- throw new IOException("All sort fields must be in the field list.");
- }
- }
-
- //Add sort fields if needed
- if(sorts.size() < buckets.length) {
- for(Bucket bucket : buckets) {
- String b = bucket.toString();
- if(!sortFields.contains(b)) {
- adjustedSorts.add(new FieldComparator(bucket.toString(), ComparatorOrder.ASCENDING));
- }
- }
- }
-
- return adjustedSorts.toArray(new FieldComparator[adjustedSorts.size()]);
- }
-
- private static TupleStream doSelectDistinctFacets(SQLVisitor sqlVisitor) throws IOException {
-
- Set<String> fieldSet = new HashSet();
- Bucket[] buckets = getBuckets(sqlVisitor.fields, fieldSet);
- Metric[] metrics = getMetrics(sqlVisitor.fields, fieldSet);
-
- if(metrics.length > 0) {
- throw new IOException("Select Distinct queries cannot include aggregate functions.");
- }
-
- TableSpec tableSpec = new TableSpec(sqlVisitor.table, defaultZkhost);
-
- String zkHost = tableSpec.zkHost;
- String collection = tableSpec.collection;
- ModifiableSolrParams params = new ModifiableSolrParams();
-
- params.set(CommonParams.Q, sqlVisitor.query);
-
- int limit = sqlVisitor.limit > 0 ? sqlVisitor.limit : 100;
-
- FieldComparator[] sorts = null;
-
- if(sqlVisitor.sorts == null) {
- sorts = new FieldComparator[buckets.length];
- for(int i=0; i<buckets.length; i++) {
- sorts[i] = new FieldComparator("index", ComparatorOrder.ASCENDING);
- }
- } else {
- StreamComparator[] comps = adjustSorts(sqlVisitor.sorts, buckets, sqlVisitor.reverseColumnAliases);
- sorts = new FieldComparator[comps.length];
- for(int i=0; i<comps.length; i++) {
- sorts[i] = (FieldComparator)comps[i];
- }
- }
-
- TupleStream tupleStream = new FacetStream(zkHost, collection, params, buckets, metrics, sorts, limit);
-
- if(sqlVisitor.limit > 0) {
- tupleStream = new LimitStream(tupleStream, sqlVisitor.limit);
- }
-
- return new SelectStream(tupleStream, sqlVisitor.columnAliases);
- }
-
- private static TupleStream doGroupByWithAggregatesFacets(SQLVisitor sqlVisitor) throws IOException {
-
- Set<String> fieldSet = new HashSet();
- Bucket[] buckets = getBuckets(sqlVisitor.groupBy, fieldSet);
- Metric[] metrics = getMetrics(sqlVisitor.fields, fieldSet);
- if(metrics.length == 0) {
- throw new IOException("Group by queries must include at least one aggregate function.");
- }
-
- TableSpec tableSpec = new TableSpec(sqlVisitor.table, defaultZkhost);
-
- String zkHost = tableSpec.zkHost;
- String collection = tableSpec.collection;
- ModifiableSolrParams params = new ModifiableSolrParams();
-
- params.set(CommonParams.Q, sqlVisitor.query);
-
- int limit = sqlVisitor.limit > 0 ? sqlVisitor.limit : 100;
-
- FieldComparator[] sorts = null;
-
- if(sqlVisitor.sorts == null) {
- sorts = new FieldComparator[buckets.length];
- for(int i=0; i<buckets.length; i++) {
- sorts[i] = new FieldComparator("index", ComparatorOrder.ASCENDING);
- }
- } else {
- sorts = getComps(sqlVisitor.sorts, sqlVisitor.reverseColumnAliases);
- }
-
- TupleStream tupleStream = new FacetStream(zkHost, collection, params, buckets, metrics, sorts, limit);
-
- if(sqlVisitor.limit > 0)
- {
- tupleStream = new LimitStream(tupleStream, sqlVisitor.limit);
- }
-
- if(sqlVisitor.hasColumnAliases) {
- tupleStream = new SelectStream(tupleStream, sqlVisitor.columnAliases);
- }
-
- return tupleStream;
- }
-
- private static TupleStream doSelect(SQLVisitor sqlVisitor) throws IOException {
- List<String> fields = sqlVisitor.fields;
- Set<String> fieldSet = new HashSet();
- Metric[] metrics = getMetrics(fields, fieldSet);
- if(metrics.length > 0) {
- return doAggregates(sqlVisitor, metrics);
- }
-
- StringBuilder flbuf = new StringBuilder();
- boolean comma = false;
-
- if(fields.size() == 0) {
- throw new IOException("Select columns must be specified.");
- }
-
- TableSpec tableSpec = new TableSpec(sqlVisitor.table, defaultZkhost);
-
- String zkHost = tableSpec.zkHost;
- String collection = tableSpec.collection;
-
- boolean score = false;
-
- for (String field : fields) {
-
- if(field.contains("(")) {
- throw new IOException("Aggregate functions only supported with group by queries.");
- }
-
- if(field.contains("*")) {
- throw new IOException("* is not supported for column selection.");
- }
-
- if(field.equals("score")) {
- if(sqlVisitor.limit < 0) {
- throw new IOException("score is not a valid field for unlimited select queries");
- } else {
- score = true;
- }
- }
-
- if (comma) {
- flbuf.append(",");
- }
-
- comma = true;
- flbuf.append(field);
- }
-
- String fl = flbuf.toString();
-
- List<SortItem> sorts = sqlVisitor.sorts;
-
- StringBuilder siBuf = new StringBuilder();
-
- comma = false;
-
- if(sorts != null) {
- for (SortItem sortItem : sorts) {
- if (comma) {
- siBuf.append(",");
- }
- siBuf.append(getSortField(sortItem, sqlVisitor.reverseColumnAliases) + " " + ascDesc(sortItem.getOrdering().toString()));
- }
- } else {
- if(sqlVisitor.limit < 0) {
- siBuf.append("_version_ desc");
- fl = fl+",_version_";
- } else {
- siBuf.append("score desc");
- if(!score) {
- fl = fl+",score";
- }
- }
- }
-
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set("fl", fl.toString());
- params.set("q", sqlVisitor.query);
-
- if(siBuf.length() > 0) {
- params.set("sort", siBuf.toString());
- }
-
- TupleStream tupleStream;
-
- if(sqlVisitor.limit > -1) {
- params.set("rows", Integer.toString(sqlVisitor.limit));
- tupleStream = new LimitStream(new CloudSolrStream(zkHost, collection, params), sqlVisitor.limit);
- } else {
- //Only use the export handler when no limit is specified.
- params.set(CommonParams.QT, "/export");
- tupleStream = new CloudSolrStream(zkHost, collection, params);
- }
-
- return new SelectStream(tupleStream, sqlVisitor.columnAliases);
- }
-
- private static boolean sortsEqual(Bucket[] buckets, String direction, List<SortItem> sortItems, Map<String, String> reverseColumnAliases) {
- if(buckets.length != sortItems.size()) {
- return false;
- }
-
- for(int i=0; i< buckets.length; i++) {
- Bucket bucket = buckets[i];
- SortItem sortItem = sortItems.get(i);
- if(!bucket.toString().equals(getSortField(sortItem, reverseColumnAliases))) {
- return false;
- }
-
-
- if(!sortItem.getOrdering().toString().toLowerCase(Locale.ROOT).contains(direction.toLowerCase(Locale.ROOT))) {
- return false;
- }
- }
-
- return true;
- }
-
- private static TupleStream doAggregates(SQLVisitor sqlVisitor, Metric[] metrics) throws IOException {
-
- if(metrics.length != sqlVisitor.fields.size()) {
- throw new IOException("Only aggregate functions are allowed when group by is not specified.");
- }
-
- TableSpec tableSpec = new TableSpec(sqlVisitor.table, defaultZkhost);
-
- String zkHost = tableSpec.zkHost;
- String collection = tableSpec.collection;
- ModifiableSolrParams params = new ModifiableSolrParams();
-
- params.set(CommonParams.Q, sqlVisitor.query);
-
- TupleStream tupleStream = new StatsStream(zkHost,
- collection,
- params,
- metrics);
-
- if(sqlVisitor.hasColumnAliases) {
- tupleStream = new SelectStream(tupleStream, sqlVisitor.columnAliases);
- }
-
- return tupleStream;
- }
-
- private static String bucketSort(Bucket[] buckets, String dir) {
- StringBuilder buf = new StringBuilder();
- boolean comma = false;
- for(Bucket bucket : buckets) {
- if(comma) {
- buf.append(",");
- }
- buf.append(bucket.toString()).append(" ").append(dir);
- comma = true;
- }
-
- return buf.toString();
- }
-
- private static String getPartitionKeys(Bucket[] buckets) {
- StringBuilder buf = new StringBuilder();
- boolean comma = false;
- for(Bucket bucket : buckets) {
- if(comma) {
- buf.append(",");
- }
- buf.append(bucket.toString());
- comma = true;
- }
- return buf.toString();
- }
-
- private static String getSortDirection(List<SortItem> sorts) {
- if(sorts != null && sorts.size() > 0) {
- for(SortItem item : sorts) {
- return ascDesc(stripSingleQuotes(stripQuotes(item.getOrdering().toString())));
- }
- }
-
- return "asc";
- }
-
- private static StreamComparator bucketSortComp(Bucket[] buckets, String dir) {
- FieldComparator[] comps = new FieldComparator[buckets.length];
- for(int i=0; i<buckets.length; i++) {
- ComparatorOrder comparatorOrder = ascDescComp(dir);
- String sortKey = buckets[i].toString();
- comps[i] = new FieldComparator(sortKey, comparatorOrder);
- }
-
- if(comps.length == 1) {
- return comps[0];
- } else {
- return new MultipleFieldComparator(comps);
- }
- }
-
- private static StreamComparator getComp(List<SortItem> sortItems, Map<String, String> reverseColumnAliases) {
- FieldComparator[] comps = new FieldComparator[sortItems.size()];
- for(int i=0; i<sortItems.size(); i++) {
- SortItem sortItem = sortItems.get(i);
- String ordering = sortItem.getOrdering().toString();
- ComparatorOrder comparatorOrder = ascDescComp(ordering);
- String sortKey = getSortField(sortItem, reverseColumnAliases);
- comps[i] = new FieldComparator(sortKey, comparatorOrder);
- }
-
- if(comps.length == 1) {
- return comps[0];
- } else {
- return new MultipleFieldComparator(comps);
- }
- }
-
- private static FieldComparator[] getComps(List<SortItem> sortItems, Map<String, String> reverseColumnAliases) {
- FieldComparator[] comps = new FieldComparator[sortItems.size()];
- for(int i=0; i<sortItems.size(); i++) {
- SortItem sortItem = sortItems.get(i);
- String ordering = sortItem.getOrdering().toString();
- ComparatorOrder comparatorOrder = ascDescComp(ordering);
- String sortKey = getSortField(sortItem, reverseColumnAliases);
- comps[i] = new FieldComparator(sortKey, comparatorOrder);
- }
-
- return comps;
- }
-
- private static String fields(Set<String> fieldSet) {
- StringBuilder buf = new StringBuilder();
- boolean comma = false;
- for(String field : fieldSet) {
- if(comma) {
- buf.append(",");
- }
- buf.append(field);
- comma = true;
- }
-
- return buf.toString();
- }
-
- private static Metric[] getMetrics(List<String> fields, Set<String> fieldSet) throws IOException {
- List<Metric> metrics = new ArrayList();
- for(String field : fields) {
- if(field.contains("(")) {
-
- field = field.substring(0, field.length()-1);
- String[] parts = field.split("\\(");
- String function = parts[0];
- validateFunction(function);
- String column = parts[1];
- if(function.equals("min")) {
- metrics.add(new MinMetric(column));
- fieldSet.add(column);
- } else if(function.equals("max")) {
- metrics.add(new MaxMetric(column));
- fieldSet.add(column);
- } else if(function.equals("sum")) {
- metrics.add(new SumMetric(column));
- fieldSet.add(column);
- } else if(function.equals("avg")) {
- metrics.add(new MeanMetric(column));
- fieldSet.add(column);
- } else if(function.equals("count")) {
- metrics.add(new CountMetric());
- }
- }
- }
- return metrics.toArray(new Metric[metrics.size()]);
- }
-
- private static void validateFunction(String function) throws IOException {
- if(function.equals("min") || function.equals("max") || function.equals("sum") || function.equals("avg") || function.equals("count")) {
- return;
- } else {
- throw new IOException("Invalid function: "+function);
- }
- }
-
- private static Bucket[] getBuckets(List<String> fields, Set<String> fieldSet) {
- List<Bucket> buckets = new ArrayList();
- for(String field : fields) {
- String f = stripQuotes(field);
- buckets.add(new Bucket(f));
- fieldSet.add(f);
- }
-
- return buckets.toArray(new Bucket[buckets.size()]);
- }
-
- private static String ascDesc(String s) {
- if(s.toLowerCase(Locale.ROOT).contains("desc")) {
- return "desc";
- } else {
- return "asc";
- }
- }
-
- private static ComparatorOrder ascDescComp(String s) {
- if(s.toLowerCase(Locale.ROOT).contains("desc")) {
- return ComparatorOrder.DESCENDING;
- } else {
- return ComparatorOrder.ASCENDING;
- }
- }
-
- private static String stripQuotes(String s) {
- StringBuilder buf = new StringBuilder();
- for(int i=0; i<s.length(); i++) {
- char c = s.charAt(i);
- if(c != '"') {
- buf.append(c);
- }
- }
-
- return buf.toString();
- }
-
- private static String stripSingleQuotes(String s) {
- StringBuilder buf = new StringBuilder();
- for(int i=0; i<s.length(); i++) {
- char c = s.charAt(i);
- if(c != '\'') {
- buf.append(c);
- }
- }
-
- return buf.toString();
- }
-
- private static class ExpressionVisitor extends AstVisitor<Void, StringBuilder> {
-
- protected Void visitLogicalBinaryExpression(LogicalBinaryExpression node, StringBuilder buf) {
- buf.append("(");
- process(node.getLeft(), buf);
- buf.append(" ").append(node.getType().toString()).append(" ");
- process(node.getRight(), buf);
- buf.append(")");
- return null;
- }
-
- protected Void visitNotExpression(NotExpression node, StringBuilder buf) {
- buf.append("-");
- process(node.getValue(), buf);
- return null;
- }
-
- protected Void visitComparisonExpression(ComparisonExpression node, StringBuilder buf) {
- if (!(node.getLeft() instanceof StringLiteral || node.getLeft() instanceof QualifiedNameReference)) {
- throw new RuntimeException("Left side of comparison must be a literal.");
- }
-
- String field = getPredicateField(node.getLeft());
- String value = node.getRight().toString();
- value = stripSingleQuotes(value);
-
- if(!value.startsWith("(") && !value.startsWith("[")) {
- //If no parens default to a phrase search.
- value = '"'+value+'"';
- }
-
- String lowerBound;
- String upperBound;
- String lowerValue;
- String upperValue;
-
- ComparisonExpression.Type t = node.getType();
- switch(t) {
- case NOT_EQUAL:
- buf.append('(').append('-').append(field).append(":").append(value).append(')');
- return null;
- case EQUAL:
- buf.append('(').append(field).append(":").append(value).append(')');
- return null;
- case LESS_THAN:
- lowerBound = "[";
- upperBound = "}";
- lowerValue = "*";
- upperValue = value;
- buf.append('(').append(field).append(":").append(lowerBound).append(lowerValue).append(" TO ").append(upperValue).append(upperBound).append(')');
- return null;
- case LESS_THAN_OR_EQUAL:
- lowerBound = "[";
- upperBound = "]";
- lowerValue = "*";
- upperValue = value;
- buf.append('(').append(field).append(":").append(lowerBound).append(lowerValue).append(" TO ").append(upperValue).append(upperBound).append(')');
- return null;
- case GREATER_THAN:
- lowerBound = "{";
- upperBound = "]";
- lowerValue = value;
- upperValue = "*";
- buf.append('(').append(field).append(":").append(lowerBound).append(lowerValue).append(" TO ").append(upperValue).append(upperBound).append(')');
- return null;
- case GREATER_THAN_OR_EQUAL:
- lowerBound = "[";
- upperBound = "]";
- lowerValue = value;
- upperValue = "*";
- buf.append('(').append(field).append(":").append(lowerBound).append(lowerValue).append(" TO ").append(upperValue).append(upperBound).append(')');
- return null;
- }
-
- return null;
- }
- }
-
- static class SQLVisitor extends AstVisitor<Void, Integer> {
- private final StringBuilder builder;
- public String table;
- public List<String> fields = new ArrayList();
- public List<String> groupBy = new ArrayList();
- public List<SortItem> sorts;
- public String query ="*:*"; //If no query is specified pull all the records
- public int limit = -1;
- public boolean groupByQuery;
- public Expression havingExpression;
- public boolean isDistinct;
- public boolean hasColumnAliases;
- public Map<String, String> columnAliases = new HashMap();
- public Map<String, String> reverseColumnAliases = new HashMap();
-
- public SQLVisitor(StringBuilder builder) {
- this.builder = builder;
- }
-
- protected Void visitNode(Node node, Integer indent) {
- throw new UnsupportedOperationException("not yet implemented: " + node);
- }
-
- protected void reverseAliases() {
- for(String key : columnAliases.keySet()) {
- reverseColumnAliases.put(columnAliases.get(key), key);
- }
-
- //Handle the group by.
- List<String> newGroups = new ArrayList();
-
- for(String g : groupBy) {
- if (reverseColumnAliases.containsKey(g)) {
- newGroups.add(reverseColumnAliases.get(g));
- } else {
- newGroups.add(g);
- }
- }
-
- groupBy = newGroups;
- }
-
-
-
-
- protected Void visitUnnest(Unnest node, Integer indent) {
- return null;
- }
-
- protected Void visitQuery(Query node, Integer indent) {
- if(node.getWith().isPresent()) {
- With confidence = (With)node.getWith().get();
- this.append(indent.intValue(), "WITH");
- if(confidence.isRecursive()) {
- }
-
- Iterator queries = confidence.getQueries().iterator();
-
- while(queries.hasNext()) {
- WithQuery query = (WithQuery)queries.next();
- this.process(new TableSubquery(query.getQuery()), indent);
- if(queries.hasNext()) {
- }
- }
- }
-
- this.processRelation(node.getQueryBody(), indent);
- if(!node.getOrderBy().isEmpty()) {
- this.sorts = node.getOrderBy();
- }
-
- if(node.getLimit().isPresent()) {
- }
-
- if(node.getApproximate().isPresent()) {
-
- }
-
- return null;
- }
-
- protected Void visitQuerySpecification(QuerySpecification node, Integer indent) {
- this.process(node.getSelect(), indent);
- if(node.getFrom().isPresent()) {
- this.process((Node)node.getFrom().get(), indent);
- }
-
- if(node.getWhere().isPresent()) {
- Expression ex = node.getWhere().get();
- ExpressionVisitor expressionVisitor = new ExpressionVisitor();
- StringBuilder buf = new StringBuilder();
- expressionVisitor.process(ex, buf);
- this.query = buf.toString();
- }
-
- if(!node.getGroupBy().isEmpty()) {
- this.groupByQuery = true;
- List<Expression> groups = node.getGroupBy();
- for(Expression group : groups) {
- groupBy.add(getGroupField(group));
- }
- }
-
- if(node.getHaving().isPresent()) {
- this.havingExpression = node.getHaving().get();
- }
-
- if(!node.getOrderBy().isEmpty()) {
- this.sorts = node.getOrderBy();
- }
-
- if(node.getLimit().isPresent()) {
- this.limit = Integer.parseInt(stripQuotes(node.getLimit().get()));
- }
-
- return null;
- }
-
- protected Void visitComparisonExpression(ComparisonExpression node, Integer index) {
- String field = node.getLeft().toString();
- String value = node.getRight().toString();
- query = stripSingleQuotes(stripQuotes(field))+":"+stripQuotes(value);
- return null;
- }
-
- protected Void visitSelect(Select node, Integer indent) {
- this.append(indent.intValue(), "SELECT");
- if(node.isDistinct()) {
- this.isDistinct = true;
- }
-
- if(node.getSelectItems().size() > 1) {
- boolean first = true;
-
- for(Iterator var4 = node.getSelectItems().iterator(); var4.hasNext(); first = false) {
- SelectItem item = (SelectItem)var4.next();
- this.process(item, indent);
- }
- } else {
- this.process((Node) Iterables.getOnlyElement(node.getSelectItems()), indent);
- }
-
- return null;
- }
-
- protected Void visitSingleColumn(SingleColumn node, Integer indent) {
-
- Expression ex = node.getExpression();
- String field = null;
-
- if(ex instanceof QualifiedNameReference) {
-
- QualifiedNameReference ref = (QualifiedNameReference)ex;
- List<String> parts = ref.getName().getOriginalParts();
- field = parts.get(0);
-
- } else if(ex instanceof FunctionCall) {
-
- FunctionCall functionCall = (FunctionCall)ex;
- List<String> parts = functionCall.getName().getOriginalParts();
- List<Expression> args = functionCall.getArguments();
- String col = null;
-
- if(args.size() > 0 && args.get(0) instanceof QualifiedNameReference) {
- QualifiedNameReference ref = (QualifiedNameReference) args.get(0);
- col = ref.getName().getOriginalParts().get(0);
- field = parts.get(0)+"("+stripSingleQuotes(col)+")";
- } else {
- field = stripSingleQuotes(stripQuotes(functionCall.toString()));
- }
-
- } else if(ex instanceof StringLiteral) {
- StringLiteral stringLiteral = (StringLiteral)ex;
- field = stripSingleQuotes(stringLiteral.toString());
- }
-
- fields.add(field);
-
- if(node.getAlias().isPresent()) {
- String alias = node.getAlias().get();
- columnAliases.put(field, alias);
- hasColumnAliases = true;
- } else {
- columnAliases.put(field, field);
- }
-
- return null;
- }
-
-
-
-
- protected Void visitAllColumns(AllColumns node, Integer context) {
- return null;
- }
-
- protected Void visitTable(Table node, Integer indent) {
- this.table = stripSingleQuotes(node.getName().toString());
- return null;
- }
-
- protected Void visitAliasedRelation(AliasedRelation node, Integer indent) {
- this.process(node.getRelation(), indent);
- return null;
- }
-
- protected Void visitValues(Values node, Integer indent) {
- boolean first = true;
-
- for(Iterator var4 = node.getRows().iterator(); var4.hasNext(); first = false) {
- Expression row = (Expression)var4.next();
-
- }
-
- return null;
- }
-
- private void processRelation(Relation relation, Integer indent) {
- if(relation instanceof Table) {
- } else {
- this.process(relation, indent);
- }
- }
-
- private StringBuilder append(int indent, String value) {
- return this.builder.append(indentString(indent)).append(value);
- }
-
- private static String indentString(int indent) {
- return Strings.repeat(" ", indent);
- }
- }
-
- private static String getSortField(SortItem sortItem, Map<String, String> reverseColumnAliases)
- {
- String field;
- Expression ex = sortItem.getSortKey();
- if(ex instanceof QualifiedNameReference) {
- QualifiedNameReference ref = (QualifiedNameReference)ex;
- List<String> parts = ref.getName().getOriginalParts();
- field = parts.get(0);
- } else if(ex instanceof FunctionCall) {
- FunctionCall functionCall = (FunctionCall)ex;
- List<String> parts = functionCall.getName().getOriginalParts();
- List<Expression> args = functionCall.getArguments();
- String col = null;
-
- if(args.size() > 0 && args.get(0) instanceof QualifiedNameReference) {
- QualifiedNameReference ref = (QualifiedNameReference) args.get(0);
- col = ref.getName().getOriginalParts().get(0);
- field = parts.get(0)+"("+stripSingleQuotes(col)+")";
- } else {
- field = stripSingleQuotes(stripQuotes(functionCall.toString()));
- }
-
- } else {
- StringLiteral stringLiteral = (StringLiteral)ex;
- field = stripSingleQuotes(stringLiteral.toString());
- }
-
- if(reverseColumnAliases.containsKey(field)) {
- field = reverseColumnAliases.get(field);
- }
-
- return field;
- }
-
-
- private static String getHavingField(Expression ex)
- {
- String field;
- if(ex instanceof QualifiedNameReference) {
- QualifiedNameReference ref = (QualifiedNameReference)ex;
- List<String> parts = ref.getName().getOriginalParts();
- field = parts.get(0);
- } else if(ex instanceof FunctionCall) {
- FunctionCall functionCall = (FunctionCall)ex;
- List<String> parts = functionCall.getName().getOriginalParts();
- List<Expression> args = functionCall.getArguments();
- String col = null;
-
- if(args.size() > 0 && args.get(0) instanceof QualifiedNameReference) {
- QualifiedNameReference ref = (QualifiedNameReference) args.get(0);
- col = ref.getName().getOriginalParts().get(0);
- field = parts.get(0)+"("+stripSingleQuotes(col)+")";
- } else {
- field = stripSingleQuotes(stripQuotes(functionCall.toString()));
- }
-
- } else {
- StringLiteral stringLiteral = (StringLiteral)ex;
- field = stripSingleQuotes(stringLiteral.toString());
- }
-
- return field;
- }
-
-
- private static String getPredicateField(Expression ex)
- {
- String field;
- if(ex instanceof QualifiedNameReference) {
- QualifiedNameReference ref = (QualifiedNameReference)ex;
- List<String> parts = ref.getName().getOriginalParts();
- field = parts.get(0);
- } else {
- StringLiteral stringLiteral = (StringLiteral)ex;
- field = stripSingleQuotes(stringLiteral.toString());
- }
-
- return field;
- }
-
- private static String getGroupField(Expression ex)
- {
- String field;
- if(ex instanceof QualifiedNameReference) {
- QualifiedNameReference ref = (QualifiedNameReference)ex;
- List<String> parts = ref.getName().getOriginalParts();
- field = parts.get(0);
- } else {
- StringLiteral stringLiteral = (StringLiteral)ex;
- field = stripSingleQuotes(stringLiteral.toString());
- }
-
- return field;
- }
-
-
- private static class LimitStream extends TupleStream {
-
- private TupleStream stream;
- private int limit;
- private int count;
-
- public LimitStream(TupleStream stream, int limit) {
- this.stream = stream;
- this.limit = limit;
- }
-
- public void open() throws IOException {
- this.stream.open();
- }
-
- public void close() throws IOException {
- this.stream.close();
- }
-
- public List<TupleStream> children() {
- List<TupleStream> children = new ArrayList();
- children.add(stream);
- return children;
- }
-
- public StreamComparator getStreamSort(){
- return stream.getStreamSort();
- }
-
- public void setStreamContext(StreamContext context) {
- stream.setStreamContext(context);
- }
-
- @Override
- public Explanation toExplanation(StreamFactory factory) throws IOException {
-
- return new StreamExplanation(getStreamNodeId().toString())
- .withChildren(new Explanation[]{
- stream.toExplanation(factory)
- })
- .withFunctionName("SQL LIMIT")
- .withExpression("--non-expressible--")
- .withImplementingClass(this.getClass().getName())
- .withExpressionType(ExpressionType.STREAM_DECORATOR);
- }
-
- public Tuple read() throws IOException {
- ++count;
- if(count > limit) {
- Map fields = new HashMap();
- fields.put("EOF", "true");
- return new Tuple(fields);
- }
-
- Tuple tuple = stream.read();
- return tuple;
- }
- }
-
- public static enum AggregationMode {
-
- MAP_REDUCE,
- FACET;
-
- public static AggregationMode getMode(String mode) throws IOException{
- if(mode.equalsIgnoreCase("facet")) {
- return FACET;
- } else if(mode.equalsIgnoreCase("map_reduce")) {
- return MAP_REDUCE;
- } else {
- throw new IOException("Invalid aggregation mode:"+mode);
- }
- }
- }
-
- private static class HavingStream extends TupleStream {
-
- private TupleStream stream;
- private HavingVisitor havingVisitor;
- private Expression havingExpression;
-
- public HavingStream(TupleStream stream, Expression havingExpression, Map<String, String> reverseAliasMap) {
- this.stream = stream;
- this.havingVisitor = new HavingVisitor(reverseAliasMap);
- this.havingExpression = havingExpression;
- }
-
- public void open() throws IOException {
- this.stream.open();
- }
-
- public void close() throws IOException {
- this.stream.close();
- }
-
- public StreamComparator getStreamSort(){
- return stream.getStreamSort();
- }
-
- public List<TupleStream> children() {
- List<TupleStream> children = new ArrayList();
- children.add(stream);
- return children;
- }
-
- @Override
- public Explanation toExplanation(StreamFactory factory) throws IOException {
-
- return new StreamExplanation(getStreamNodeId().toString())
- .withChildren(new Explanation[]{
- stream.toExplanation(factory)
- })
- .withFunctionName("SQL HAVING")
- .withExpression("--non-expressible--")
- .withImplementingClass(this.getClass().getName())
- .withExpressionType(ExpressionType.STREAM_DECORATOR);
- }
-
- public void setStreamContext(StreamContext context) {
- stream.setStreamContext(context);
- }
-
- public Tuple read() throws IOException {
- while (true) {
- Tuple tuple = stream.read();
- if (tuple.EOF) {
- return tuple;
- }
-
- if (havingVisitor.process(havingExpression, tuple)) {
- return tuple;
- }
- }
- }
- }
-
- private static class CatalogsStream extends TupleStream {
- private final String zkHost;
- private StreamContext context;
- private int currentIndex = 0;
- private List<String> catalogs;
-
- CatalogsStream(String zkHost) {
- this.zkHost = zkHost;
- }
-
- public List<TupleStream> children() {
- return new ArrayList<>();
- }
-
- public void open() throws IOException {
- this.catalogs = new ArrayList<>();
- this.catalogs.add(this.zkHost);
- }
-
- @Override
- public Explanation toExplanation(StreamFactory factory) throws IOException {
-
- return new StreamExplanation(getStreamNodeId().toString())
- .withFunctionName("SQL CATALOG")
- .withExpression("--non-expressible--")
- .withImplementingClass(this.getClass().getName())
- .withExpressionType(ExpressionType.STREAM_DECORATOR);
- }
-
- public Tuple read() throws IOException {
- Map<String, String> fields = new HashMap<>();
- if (this.currentIndex < this.catalogs.size()) {
- fields.put("TABLE_CAT", this.catalogs.get(this.currentIndex));
- this.currentIndex += 1;
- } else {
- fields.put("EOF", "true");
- }
- return new Tuple(fields);
- }
-
- public StreamComparator getStreamSort() {
- return null;
- }
-
- public void close() throws IOException {
-
- }
-
- public void setStreamContext(StreamContext context) {
- this.context = context;
- }
- }
-
- private static class SchemasStream extends TupleStream {
- private final String zkHost;
- private StreamContext context;
-
- SchemasStream(String zkHost) {
- this.zkHost = zkHost;
- }
-
- public List<TupleStream> children() {
- return new ArrayList<>();
- }
-
- public void open() throws IOException {
-
- }
-
- @Override
- public Explanation toExplanation(StreamFactory factory) throws IOException {
-
- return new StreamExplanation(getStreamNodeId().toString())
- .withFunctionName("SQL SCHEMA")
- .withExpression("--non-expressible--")
- .withImplementingClass(this.getClass().getName())
- .withExpressionType(ExpressionType.STREAM_DECORATOR);
- }
-
- public Tuple read() throws IOException {
- Map<String, String> fields = new HashMap<>();
- fields.put("EOF", "true");
- return new Tuple(fields);
- }
-
- public StreamComparator getStreamSort() {
- return null;
- }
-
- public void close() throws IOException {
-
- }
-
- public void setStreamContext(StreamContext context) {
- this.context = context;
- }
- }
-
- private static class TableStream extends TupleStream {
- private final String zkHost;
- private StreamContext context;
- private int currentIndex = 0;
- private List<String> tables;
-
- TableStream(String zkHost) {
- this.zkHost = zkHost;
- }
-
- public List<TupleStream> children() {
- return new ArrayList<>();
- }
-
- public void open() throws IOException {
- this.tables = new ArrayList<>();
-
- CloudSolrClient cloudSolrClient = this.context.getSolrClientCache().getCloudSolrClient(this.zkHost);
- cloudSolrClient.connect();
- ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
- Set<String> collections = zkStateReader.getClusterState().getCollectionStates().keySet();
- if (collections.size() != 0) {
- this.tables.addAll(collections);
- }
- Collections.sort(this.tables);
- }
-
- @Override
- public Explanation toExplanation(StreamFactory factory) throws IOException {
-
- return new StreamExplanation(getStreamNodeId().toString())
- .withFunctionName("SQL TABLE")
- .withExpression("--non-expressible--")
- .withImplementingClass(this.getClass().getName())
- .withExpressionType(ExpressionType.STREAM_DECORATOR);
- }
-
- public Tuple read() throws IOException {
- Map<String, String> fields = new HashMap<>();
- if (this.currentIndex < this.tables.size()) {
- fields.put("TABLE_CAT", this.zkHost);
- fields.put("TABLE_SCHEM", null);
- fields.put("TABLE_NAME", this.tables.get(this.currentIndex));
- fields.put("TABLE_TYPE", "TABLE");
- fields.put("REMARKS", null);
- this.currentIndex += 1;
- } else {
- fields.put("EOF", "true");
- }
- return new Tuple(fields);
- }
-
- public StreamComparator getStreamSort() {
- return null;
- }
-
- public void close() throws IOException {
-
- }
-
- public void setStreamContext(StreamContext context) {
- this.context = context;
- }
- }
-
- private static class MetadataStream extends TupleStream {
-
- private final TupleStream stream;
- private final SQLVisitor sqlVisitor;
+ /*
+ * Only necessary for SolrJ JDBC driver since metadata has to be passed back
+ */
+ private class SqlHandlerStream extends JDBCStream {
+ private final boolean includeMetadata;
private boolean firstTuple = true;
- public MetadataStream(TupleStream stream, SQLVisitor sqlVistor) {
- this.stream = stream;
- this.sqlVisitor = sqlVistor;
+ SqlHandlerStream(String connectionUrl, String sqlQuery, StreamComparator definedSort,
+ Properties connectionProperties, String driverClassName, boolean includeMetadata)
+ throws IOException {
+ super(connectionUrl, sqlQuery, definedSort, connectionProperties, driverClassName);
+
+ this.includeMetadata = includeMetadata;
}
- public List<TupleStream> children() {
- return this.stream.children();
- }
-
- public void open() throws IOException {
- this.stream.open();
- }
-
@Override
- public Explanation toExplanation(StreamFactory factory) throws IOException {
-
- return new StreamExplanation(getStreamNodeId().toString())
- .withChildren(new Explanation[]{
- stream.toExplanation(factory)
- })
- .withFunctionName("SQL METADATA")
- .withExpression("--non-expressible--")
- .withImplementingClass(this.getClass().getName())
- .withExpressionType(ExpressionType.STREAM_DECORATOR);
- }
-
- // Return a metadata tuple as the first tuple and then pass through to the underlying stream.
public Tuple read() throws IOException {
- if(firstTuple) {
- firstTuple = false;
+ // Return a metadata tuple as the first tuple and then pass through to the JDBCStream.
+ if(includeMetadata && firstTuple) {
+ try {
+ Map<String, Object> fields = new HashMap<>();
- Map<String, Object> fields = new HashMap<>();
- fields.put("isMetadata", true);
- fields.put("fields", sqlVisitor.fields);
- fields.put("aliases", sqlVisitor.columnAliases);
- return new Tuple(fields);
- }
+ firstTuple = false;
- return this.stream.read();
- }
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
- public StreamComparator getStreamSort() {
- return this.stream.getStreamSort();
- }
+ List<String> metadataFields = new ArrayList<>();
+ Map<String, String> metadataAliases = new HashMap<>();
+ for(int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ String columnName = resultSetMetaData.getColumnName(i);
+ String columnLabel = resultSetMetaData.getColumnLabel(i);
+ metadataFields.add(columnName);
+ metadataAliases.put(columnName, columnLabel);
+ }
- public void close() throws IOException {
- this.stream.close();
- }
-
- public void setStreamContext(StreamContext context) {
- this.stream.setStreamContext(context);
- }
- }
-
- private static class HavingVisitor extends AstVisitor<Boolean, Tuple> {
-
- private Map<String, String> reverseAliasMap;
-
- public HavingVisitor(Map<String, String> reverseAliasMap) {
- this.reverseAliasMap = reverseAliasMap;
- }
-
- protected Boolean visitLogicalBinaryExpression(LogicalBinaryExpression node, Tuple tuple) {
-
- Boolean b = process(node.getLeft(), tuple);
- if(node.getType() == LogicalBinaryExpression.Type.AND) {
- if(!b) {
- //Short circuit
- return false;
- } else {
- return process(node.getRight(), tuple);
+ fields.put("isMetadata", true);
+ fields.put("fields", metadataFields);
+ fields.put("aliases", metadataAliases);
+ return new Tuple(fields);
+ } catch (SQLException e) {
+ throw new IOException(e);
}
} else {
- if(b) {
- //Short circuit
- return true;
- } else {
- return process(node.getRight(), tuple);
- }
- }
- }
-
- protected Boolean visitComparisonExpression(ComparisonExpression node, Tuple tuple) {
- String field = getHavingField(node.getLeft());
-
- if(reverseAliasMap.containsKey(field)) {
- field = reverseAliasMap.get(field);
- }
-
- double d = Double.parseDouble(node.getRight().toString());
- double td = tuple.getDouble(field);
- ComparisonExpression.Type t = node.getType();
-
- switch(t) {
- case LESS_THAN:
- return td < d;
- case LESS_THAN_OR_EQUAL:
- return td <= d;
- case NOT_EQUAL:
- return td != d;
- case EQUAL:
- return td == d;
- case GREATER_THAN:
- return td > d;
- case GREATER_THAN_OR_EQUAL:
- return td >= d;
- default:
- return false;
+ return super.read();
}
}
}
- }
+}
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/CalciteSolrDriver.java b/solr/core/src/java/org/apache/solr/handler/sql/CalciteSolrDriver.java
new file mode 100644
index 00000000000..3a7640de83e
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/sql/CalciteSolrDriver.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.sql;
+
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.jdbc.Driver;
+import org.apache.calcite.schema.SchemaPlus;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * JDBC driver for Calcite Solr.
+ *
+ * <p>It accepts connect strings that start with "jdbc:calcitesolr:".</p>
+ */
+public class CalciteSolrDriver extends Driver {
+ public final static String CONNECT_STRING_PREFIX = "jdbc:calcitesolr:";
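+
+ // A hypothetical usage sketch (the property values below are illustrative only, not from this patch):
+ //   Properties props = new Properties();
+ //   props.setProperty("zk", "localhost:9983"); // required; becomes the schema name in connect() below
+ //   Connection conn = DriverManager.getConnection("jdbc:calcitesolr:", props);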
+
+ private CalciteSolrDriver() {
+ super();
+ }
+
+ static {
+ new CalciteSolrDriver().register();
+ }
+
+ @Override
+ protected String getConnectStringPrefix() {
+ return CONNECT_STRING_PREFIX;
+ }
+
+ @Override
+ public Connection connect(String url, Properties info) throws SQLException {
+ if(!this.acceptsURL(url)) {
+ return null;
+ }
+
+ Connection connection = super.connect(url, info);
+ CalciteConnection calciteConnection = (CalciteConnection) connection;
+ final SchemaPlus rootSchema = calciteConnection.getRootSchema();
+
+ String schemaName = info.getProperty("zk");
+ if(schemaName == null) {
+ throw new SQLException("zk must be set");
+ }
+ rootSchema.add(schemaName, new SolrSchema(info));
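+ // SolrSchema (added elsewhere in this patch) presumably exposes Solr collections as the schema's tables.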
+
+ // Set the default schema
+ calciteConnection.setSchema(schemaName);
+
+ return connection;
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/LimitStream.java b/solr/core/src/java/org/apache/solr/handler/sql/LimitStream.java
new file mode 100644
index 00000000000..0d4bb72adf4
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/sql/LimitStream.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.sql;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.StreamContext;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class LimitStream extends TupleStream {
+
+ private final TupleStream stream;
+ private final int limit;
+ private int count;
+
+ LimitStream(TupleStream stream, int limit) {
+ this.stream = stream;
+ this.limit = limit;
+ }
+
+ public void open() throws IOException {
+ this.stream.open();
+ }
+
+ public void close() throws IOException {
+ this.stream.close();
+ }
+
+ public List<TupleStream> children() {
+ List<TupleStream> children = new ArrayList<>();
+ children.add(stream);
+ return children;
+ }
+
+ public StreamComparator getStreamSort(){
+ return stream.getStreamSort();
+ }
+
+ public void setStreamContext(StreamContext context) {
+ stream.setStreamContext(context);
+ }
+
+ @Override
+ public Explanation toExplanation(StreamFactory factory) throws IOException {
+
+ return new StreamExplanation(getStreamNodeId().toString())
+ .withChildren(new Explanation[]{
+ stream.toExplanation(factory)
+ })
+ .withFunctionName("SQL LIMIT")
+ .withExpression("--non-expressible--")
+ .withImplementingClass(this.getClass().getName())
+ .withExpressionType(Explanation.ExpressionType.STREAM_DECORATOR);
+ }
+
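+ // Passes tuples through until 'limit' tuples have been returned, then emits a synthetic EOF tuple.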
+ public Tuple read() throws IOException {
+ ++count;
+ if(count > limit) {
+ Map<String, String> fields = new HashMap<>();
+ fields.put("EOF", "true");
+ return new Tuple(fields);
+ }
+
+ return stream.read();
+ }
+}
\ No newline at end of file
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrAggregate.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrAggregate.java
new file mode 100644
index 00000000000..983ab7691be
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrAggregate.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.sql;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
+
+import java.util.*;
+
+/**
+ * Implementation of {@link org.apache.calcite.rel.core.Aggregate} relational expression in Solr.
+ */
+class SolrAggregate extends Aggregate implements SolrRel {
+ private static final List<SqlAggFunction> SUPPORTED_AGGREGATIONS = Arrays.asList(
+ SqlStdOperatorTable.COUNT,
+ SqlStdOperatorTable.SUM,
+ SqlStdOperatorTable.SUM0,
+ SqlStdOperatorTable.MIN,
+ SqlStdOperatorTable.MAX,
+ SqlStdOperatorTable.AVG
+ );
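+ // Aggregates outside this list cannot be pushed down to Solr; toSolrMetric below throws an AssertionError for them.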
+
+ SolrAggregate(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ RelNode child,
+ boolean indicator,
+ ImmutableBitSet groupSet,
+ List<ImmutableBitSet> groupSets,
+ List<AggregateCall> aggCalls) {
+ super(cluster, traitSet, child, indicator, groupSet, groupSets, aggCalls);
+ assert getConvention() == SolrRel.CONVENTION;
+ assert getConvention() == child.getConvention();
+ }
+
+ @Override
+ public Aggregate copy(RelTraitSet traitSet, RelNode input,
+ boolean indicator, ImmutableBitSet groupSet,
+ List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+ return new SolrAggregate(getCluster(), traitSet, input, indicator, groupSet, groupSets, aggCalls);
+ }
+
+ public void implement(Implementor implementor) {
+ implementor.visitChild(0, getInput());
+
+ final List<String> inNames = SolrRules.solrFieldNames(getInput().getRowType());
+
+
+ for(Pair<AggregateCall, String> namedAggCall : getNamedAggCalls()) {
+
+
+ AggregateCall aggCall = namedAggCall.getKey();
+
+ Pair<String, String> metric = toSolrMetric(implementor, aggCall, inNames);
+ implementor.addReverseAggMapping(namedAggCall.getValue(), metric.getKey().toLowerCase()+"("+metric.getValue()+")");
+ implementor.addMetricPair(namedAggCall.getValue(), metric.getKey(), metric.getValue());
+ if(aggCall.getName() == null) {
+ implementor.addFieldMapping(namedAggCall.getValue(),
+ aggCall.getAggregation().getName() + "(" + inNames.get(aggCall.getArgList().get(0)) + ")");
+ }
+ }
+
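+ // Each grouped field is registered as a bucket, i.e. the Solr-side grouping dimension.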
+ for(int group : getGroupSet()) {
+ String inName = inNames.get(group);
+ implementor.addBucket(inName);
+ }
+ }
+
+ private Pair<String, String> toSolrMetric(Implementor implementor, AggregateCall aggCall, List<String> inNames) {
+ SqlAggFunction aggregation = aggCall.getAggregation();
+ List<Integer> args = aggCall.getArgList();
+ switch (args.size()) {
+ case 0:
+ if (aggregation.equals(SqlStdOperatorTable.COUNT)) {
+ return new Pair<>(aggregation.getName(), "*");
+ }
+ case 1:
+ String inName = inNames.get(args.get(0));
+ String name = implementor.fieldMappings.getOrDefault(inName, inName);
+ if(SUPPORTED_AGGREGATIONS.contains(aggregation)) {
+ return new Pair<>(aggregation.getName(), name);
+ }
+ default:
+ throw new AssertionError("Invalid aggregation " + aggregation + " with args " + args + " with names" + inNames);
+ }
+ }
+}
+
+// End SolrAggregate.java
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrEnumerator.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrEnumerator.java
new file mode 100644
index 00000000000..6f9dddfbf2f
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrEnumerator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.sql;
+
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+
+/** Enumerator that reads from a Solr collection. */
+class SolrEnumerator implements Enumerator<Object>