From 129a83b198e805392279208e647c23b4659a0ee0 Mon Sep 17 00:00:00 2001 From: Joel Bernstein Date: Tue, 21 Jul 2015 20:28:35 +0000 Subject: [PATCH] SOLR-7441:Improve overall robustness of the Streaming stack: Streaming API, Streaming Expressions, Parallel SQL git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1692193 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/solr/handler/SQLHandler.java | 163 ++++++++++++------ .../apache/solr/handler/StreamHandler.java | 63 ++++++- .../solr/response/SortingResponseWriter.java | 24 ++- .../solr/response/TextResponseWriter.java | 5 +- .../solr/search/ExportQParserPlugin.java | 6 +- .../solr/collection1/conf/solrconfig-sql.xml | 4 - .../apache/solr/handler/TestSQLHandler.java | 96 +++++++---- .../conf/solrconfig.xml | 54 +++--- .../conf/solrconfig.xml | 7 +- .../apache/solr/client/solrj/io/Tuple.java | 4 +- .../solrj/io/stream/CloudSolrStream.java | 5 + .../solrj/io/stream/ExceptionStream.java | 9 +- .../solrj/io/stream/JSONTupleStream.java | 22 +++ .../solrj/io/stream/ParallelStream.java | 10 +- .../client/solrj/io/stream/RollupStream.java | 5 + .../client/solrj/io/stream/SolrStream.java | 9 +- .../client/solrj/io/stream/TupleStream.java | 14 +- .../client/solrj/io/stream/StreamingTest.java | 16 +- 18 files changed, 355 insertions(+), 161 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/handler/SQLHandler.java b/solr/core/src/java/org/apache/solr/handler/SQLHandler.java index 1f64e1f7f9c..7bcd4915349 100644 --- a/solr/core/src/java/org/apache/solr/handler/SQLHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/SQLHandler.java @@ -19,7 +19,6 @@ package org.apache.solr.handler; import java.io.IOException; import java.util.ArrayList; -import java.util.Comparator; import java.util.HashSet; import java.util.Iterator; import java.util.Locale; @@ -43,7 +42,9 @@ import org.apache.solr.client.solrj.io.stream.StreamContext; import org.apache.solr.client.solrj.io.stream.TupleStream; import org.apache.solr.client.solrj.io.stream.ExceptionStream; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.common.SolrException; import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.core.CoreContainer; @@ -63,9 +64,8 @@ import com.facebook.presto.sql.parser.SqlParser; public class SQLHandler extends RequestHandlerBase implements SolrCoreAware { - private Map tableMappings = new HashMap(); - private String defaultZkhost = null; - private String defaultWorkerCollection = null; + private static String defaultZkhost = null; + private static String defaultWorkerCollection = null; private Logger logger = LoggerFactory.getLogger(SQLHandler.class); @@ -77,41 +77,42 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware { defaultZkhost = core.getCoreDescriptor().getCoreContainer().getZkController().getZkServerAddress(); defaultWorkerCollection = core.getCoreDescriptor().getCollectionName(); } - - NamedList tableConf = (NamedList)initArgs.get("tables"); - - for(Entry entry : tableConf) { - String tableName = entry.getKey(); - if(entry.getValue().indexOf("@") > -1) { - String[] parts = entry.getValue().split("@"); - tableMappings.put(tableName, new TableSpec(parts[0], parts[1])); - } else { - String collection = entry.getValue(); - tableMappings.put(tableName, new TableSpec(collection, defaultZkhost)); - } - } } public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception { SolrParams params = req.getParams(); + params = adjustParams(params); + req.setParams(params); String sql = params.get("sql"); int numWorkers = params.getInt("numWorkers", 1); String workerCollection = params.get("workerCollection", defaultWorkerCollection); String workerZkhost = params.get("workerZkhost",defaultZkhost); StreamContext context = new StreamContext(); try { - TupleStream tupleStream = SQLTupleStreamParser.parse(sql, tableMappings, numWorkers, workerCollection, workerZkhost); + + if(sql == null) { + throw new Exception("sql parameter cannot be null"); + } + + TupleStream tupleStream = SQLTupleStreamParser.parse(sql, numWorkers, workerCollection, workerZkhost); context.numWorkers = numWorkers; context.setSolrClientCache(StreamHandler.clientCache); tupleStream.setStreamContext(context); - rsp.add("tuples", new ExceptionStream(tupleStream)); + rsp.add("result-set", new StreamHandler.TimerStream(new ExceptionStream(tupleStream))); } catch(Exception e) { //Catch the SQL parsing and query transformation exceptions. - logger.error("Exception parsing SQL", e); - rsp.add("tuples", new StreamHandler.DummyErrorStream(e)); + SolrException.log(logger, e); + rsp.add("result-set", new StreamHandler.DummyErrorStream(e)); } } + private SolrParams adjustParams(SolrParams params) { + ModifiableSolrParams adjustedParams = new ModifiableSolrParams(); + adjustedParams.add(params); + adjustedParams.add(CommonParams.OMIT_HEADER, "true"); + return adjustedParams; + } + public String getDescription() { return "SQLHandler"; } @@ -123,7 +124,6 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware { public static class SQLTupleStreamParser { public static TupleStream parse(String sql, - Map tableMap, int numWorkers, String workerCollection, String workerZkhost) throws IOException { @@ -137,30 +137,33 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware { TupleStream sqlStream = null; if(sqlVistor.groupByQuery) { - sqlStream = doGroupBy(sqlVistor, tableMap, numWorkers, workerCollection, workerZkhost); + sqlStream = doGroupByWithAggregates(sqlVistor, numWorkers, workerCollection, workerZkhost); } else { - sqlStream = doSelect(sqlVistor, tableMap, numWorkers, workerCollection, workerZkhost); + sqlStream = doSelect(sqlVistor); } return sqlStream; } } - private static TupleStream doGroupBy(SQLVisitor sqlVisitor, - Map tableMap, - int numWorkers, - String workerCollection, - String workerZkHost) throws IOException { + private static TupleStream doGroupByWithAggregates(SQLVisitor sqlVisitor, + int numWorkers, + String workerCollection, + String workerZkHost) throws IOException { Set fieldSet = new HashSet(); Bucket[] buckets = getBuckets(sqlVisitor.groupBy, fieldSet); Metric[] metrics = getMetrics(sqlVisitor.fields, fieldSet); + if(metrics.length == 0) { + throw new IOException("Group by queries must include atleast one aggregate function."); + } String fl = fields(fieldSet); String sortDirection = getSortDirection(sqlVisitor.sorts); String sort = bucketSort(buckets, sortDirection); - TableSpec tableSpec = tableMap.get(sqlVisitor.table); + TableSpec tableSpec = new TableSpec(sqlVisitor.table, defaultZkhost); + String zkHost = tableSpec.zkHost; String collection = tableSpec.collection; Map params = new HashMap(); @@ -229,17 +232,36 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware { return tupleStream; } - private static TupleStream doSelect(SQLVisitor sqlVisitor, - Map tableMap, - int numWorkers, - String workerCollection, - String workerZkHost) throws IOException { + private static TupleStream doSelect(SQLVisitor sqlVisitor) throws IOException { List fields = sqlVisitor.fields; StringBuilder flbuf = new StringBuilder(); boolean comma = false; - for(String field : fields) { - if(comma) { + if(fields.size() == 0) { + throw new IOException("Select columns must be specified."); + } + + boolean score = false; + + for (String field : fields) { + + if(field.contains("(")) { + throw new IOException("Aggregate functions only supported with group by queries."); + } + + if(field.contains("*")) { + throw new IOException("* is not supported for column selection."); + } + + if(field.equals("score")) { + if(sqlVisitor.limit < 0) { + throw new IOException("score is not a valid field for unlimited select queries"); + } else { + score = true; + } + } + + if (comma) { flbuf.append(","); } @@ -254,21 +276,37 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware { StringBuilder siBuf = new StringBuilder(); comma = false; - for(SortItem sortItem : sorts) { - if(comma) { - siBuf.append(","); + + if(sorts != null) { + for (SortItem sortItem : sorts) { + if (comma) { + siBuf.append(","); + } + siBuf.append(stripQuotes(sortItem.getSortKey().toString()) + " " + ascDesc(sortItem.getOrdering().toString())); + } + } else { + if(sqlVisitor.limit < 0) { + throw new IOException("order by is required for unlimited select statements."); + } else { + siBuf.append("score desc"); + if(!score) { + fl = fl+(",score"); + } } - siBuf.append(stripQuotes(sortItem.getSortKey().toString()) + " " + ascDesc(sortItem.getOrdering().toString())); } - TableSpec tableSpec = tableMap.get(sqlVisitor.table); + TableSpec tableSpec = new TableSpec(sqlVisitor.table, defaultZkhost); + String zkHost = tableSpec.zkHost; String collection = tableSpec.collection; Map params = new HashMap(); params.put("fl", fl.toString()); params.put("q", sqlVisitor.query); - params.put("sort", siBuf.toString()); + + if(siBuf.length() > 0) { + params.put("sort", siBuf.toString()); + } if(sqlVisitor.limit > -1) { params.put("rows", Integer.toString(sqlVisitor.limit)); @@ -384,15 +422,15 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware { return buf.toString(); } - private static Metric[] getMetrics(List fields, Set fieldSet) { + private static Metric[] getMetrics(List fields, Set fieldSet) throws IOException { List metrics = new ArrayList(); for(String field : fields) { - if(field.contains("(")) { field = field.substring(0, field.length()-1); String[] parts = field.split("\\("); String function = parts[0]; + validateFunction(function); String column = parts[1]; if(function.equals("min")) { metrics.add(new MinMetric(column)); @@ -414,6 +452,14 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware { return metrics.toArray(new Metric[metrics.size()]); } + private static void validateFunction(String function) throws IOException { + if(function.equals("min") || function.equals("max") || function.equals("sum") || function.equals("avg") || function.equals("count")) { + return; + } else { + throw new IOException("Invalid function: "+function); + } + } + private static Bucket[] getBuckets(List fields, Set fieldSet) { List buckets = new ArrayList(); for(String field : fields) { @@ -466,13 +512,19 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware { } - private class TableSpec { + private static class TableSpec { private String collection; private String zkHost; - public TableSpec(String collection, String zkHost) { - this.collection = collection; - this.zkHost = zkHost; + public TableSpec(String table, String defaultZkHost) { + if(table.contains("@")) { + String[] parts = table.split("@"); + this.collection = parts[0]; + this.zkHost = parts[1]; + } else { + this.collection = table; + this.zkHost = defaultZkHost; + } } } @@ -496,7 +548,14 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware { protected Void visitComparisonExpression(ComparisonExpression node, StringBuilder buf) { String field = node.getLeft().toString(); String value = node.getRight().toString(); - buf.append('(').append(stripQuotes(field) + ":" + stripSingleQuotes(value)).append(')'); + value = stripSingleQuotes(value); + + if(!value.startsWith("(") && !value.startsWith("[")) { + //If no parens default to a phrase search. + value = '"'+value+'"'; + } + + buf.append('(').append(stripQuotes(field) + ":" + value).append(')'); return null; } } @@ -805,9 +864,9 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware { case EQUAL: return td == d; case GREATER_THAN: - return td <= d; + return td > d; case GREATER_THAN_OR_EQUAL: - return td <= d; + return td >= d; default: return false; } diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java index ee801e98074..64192035fdc 100644 --- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java @@ -19,6 +19,7 @@ package org.apache.solr.handler; import java.io.ByteArrayInputStream; import java.io.ObjectInputStream; +import java.io.IOException; import java.net.URLDecoder; import java.util.HashMap; import java.util.List; @@ -45,6 +46,9 @@ import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric; import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric; import org.apache.solr.client.solrj.io.stream.metrics.MinMetric; import org.apache.solr.client.solrj.io.stream.metrics.SumMetric; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.Base64; import org.apache.solr.common.util.NamedList; @@ -129,7 +133,8 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware { public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception { SolrParams params = req.getParams(); - + params = adjustParams(params); + req.setParams(params); boolean objectSerialize = params.getBool("objectSerialize", false); TupleStream tupleStream = null; @@ -146,8 +151,8 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware { } } catch (Exception e) { //Catch exceptions that occur while the stream is being created. This will include streaming expression parse rules. - logger.error("Exception creating TupleStream", e); - rsp.add("tuples", new DummyErrorStream(e)); + SolrException.log(logger, e); + rsp.add("result-set", new DummyErrorStream(e)); return; } @@ -159,7 +164,14 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware { context.numWorkers = numWorkers; context.setSolrClientCache(clientCache); tupleStream.setStreamContext(context); - rsp.add("tuples", new ExceptionStream(tupleStream)); + rsp.add("result-set", new TimerStream(new ExceptionStream(tupleStream))); + } + + private SolrParams adjustParams(SolrParams params) { + ModifiableSolrParams adjustedParams = new ModifiableSolrParams(); + adjustedParams.add(params); + adjustedParams.add(CommonParams.OMIT_HEADER, "true"); + return adjustedParams; } public String getDescription() { @@ -198,8 +210,49 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware { String msg = e.getMessage(); Map m = new HashMap(); m.put("EOF", true); - m.put("_EXCEPTION_", msg); + m.put("EXCEPTION", msg); return new Tuple(m); } } + + public static class TimerStream extends TupleStream { + + private long begin; + private TupleStream tupleStream; + + public TimerStream(TupleStream tupleStream) { + this.tupleStream = tupleStream; + } + + public StreamComparator getStreamSort() { + return this.tupleStream.getStreamSort(); + } + + public void close() throws IOException { + this.tupleStream.close(); + } + + public void open() throws IOException { + this.begin = System.nanoTime(); + this.tupleStream.open(); + } + + public void setStreamContext(StreamContext context) { + this.tupleStream.setStreamContext(context); + } + + public List children() { + return this.tupleStream.children(); + } + + public Tuple read() throws IOException { + Tuple tuple = this.tupleStream.read(); + if(tuple.EOF) { + long totalTime = (System.nanoTime() - begin) / 1000000; + tuple.fields.put("RESPONSE_TIME", totalTime); + } + return tuple; + } + } + } diff --git a/solr/core/src/java/org/apache/solr/response/SortingResponseWriter.java b/solr/core/src/java/org/apache/solr/response/SortingResponseWriter.java index 035e0e4d8c4..a5fc87b8733 100644 --- a/solr/core/src/java/org/apache/solr/response/SortingResponseWriter.java +++ b/solr/core/src/java/org/apache/solr/response/SortingResponseWriter.java @@ -37,6 +37,7 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.CharsRefBuilder; import org.apache.lucene.util.FixedBitSet; import org.apache.lucene.util.LongValues; +import org.apache.solr.common.SolrException; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.request.SolrQueryRequest; @@ -72,7 +73,7 @@ public class SortingResponseWriter implements QueryResponseWriter { Exception e1 = res.getException(); if(e1 != null) { if(!(e1 instanceof IgnoreException)) { - e1.printStackTrace(new PrintWriter(writer)); + writeException(e1, writer, false); } return; } @@ -128,16 +129,15 @@ public class SortingResponseWriter implements QueryResponseWriter { exception = e; } - writer.write("{\"responseHeader\": {\"status\": 0}, \"response\":{\"numFound\":"+totalHits+", \"docs\":["); if(exception != null) { - //We have an exception. Send it back to the client and return. - writeException(exception, writer); - writer.write("]}}"); - writer.flush(); + writeException(exception, writer, true); return; } + writer.write("{\"responseHeader\": {\"status\": 0}, \"response\":{\"numFound\":"+totalHits+", \"docs\":["); + + //Write the data. List leaves = req.getSearcher().getTopReaderContext().leaves(); SortDoc sortDoc = getSortDoc(req.getSearcher(), sort.getSort()); @@ -189,6 +189,7 @@ public class SortingResponseWriter implements QueryResponseWriter { } } catch(Throwable e) { Throwable ex = e; + e.printStackTrace(); while(ex != null) { String m = ex.getMessage(); if(m != null && m.contains("Broken pipe")) { @@ -242,10 +243,16 @@ public class SortingResponseWriter implements QueryResponseWriter { } } - protected void writeException(Exception e, Writer out) throws IOException{ - out.write("{\"_EXCEPTION_\":\""); + protected void writeException(Exception e, Writer out, boolean log) throws IOException{ + out.write("{\"responseHeader\": {\"status\": 400}, \"response\":{\"numFound\":0, \"docs\":["); + out.write("{\"EXCEPTION\":\""); writeStr(e.getMessage(), out); out.write("\"}"); + out.write("]}}"); + out.flush(); + if(log) { + SolrException.log(logger, e); + } } protected FieldWriter[] getFieldWriters(String[] fields, SolrIndexSearcher searcher) throws IOException { @@ -1138,6 +1145,7 @@ public class SortingResponseWriter implements QueryResponseWriter { public void setCurrentValue(int docId) { int ord = currentVals.getOrd(docId); + if(ord < 0) { currentOrd = -1; } else { diff --git a/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java b/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java index 4e51a606b52..f39eaf617d4 100644 --- a/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java +++ b/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java @@ -293,20 +293,21 @@ public abstract class TextResponseWriter { public void writeTupleStream(TupleStream tupleStream) throws IOException { tupleStream.open(); - writeStartDocumentList("response", -1, -1, -1, null); + tupleStream.writeStreamOpen(writer); boolean isFirst = true; while(true) { Tuple tuple = tupleStream.read(); if(!isFirst) { writer.write(","); } + writer.write("\n"); writeMap(null, tuple.fields, false, true); isFirst = false; if(tuple.EOF) { break; } } - writeEndDocumentList(); + tupleStream.writeStreamClose(writer); tupleStream.close(); } diff --git a/solr/core/src/java/org/apache/solr/search/ExportQParserPlugin.java b/solr/core/src/java/org/apache/solr/search/ExportQParserPlugin.java index b159ca5aa54..9a6d74b6177 100644 --- a/solr/core/src/java/org/apache/solr/search/ExportQParserPlugin.java +++ b/solr/core/src/java/org/apache/solr/search/ExportQParserPlugin.java @@ -62,14 +62,12 @@ public class ExportQParserPlugin extends QParserPlugin { public class ExportQuery extends RankQuery { - private int leafCount; private Query mainQuery; private Object id; public RankQuery clone() { ExportQuery clone = new ExportQuery(); clone.id = id; - clone.leafCount = leafCount; return clone; } @@ -98,7 +96,8 @@ public class ExportQParserPlugin extends QParserPlugin { public TopDocsCollector getTopDocsCollector(int len, SolrIndexSearcher.QueryCommand cmd, IndexSearcher searcher) throws IOException { - FixedBitSet[] sets = new FixedBitSet[this.leafCount]; + int leafCount = searcher.getTopReaderContext().leaves().size(); + FixedBitSet[] sets = new FixedBitSet[leafCount]; return new ExportCollector(sets); } @@ -124,7 +123,6 @@ public class ExportQParserPlugin extends QParserPlugin { } public ExportQuery(SolrParams localParams, SolrParams params, SolrQueryRequest request) throws IOException { - this.leafCount = request.getSearcher().getTopReaderContext().leaves().size(); id = new Object(); } } diff --git a/solr/core/src/test-files/solr/collection1/conf/solrconfig-sql.xml b/solr/core/src/test-files/solr/collection1/conf/solrconfig-sql.xml index 4fc231e851f..59f30320979 100644 --- a/solr/core/src/test-files/solr/collection1/conf/solrconfig-sql.xml +++ b/solr/core/src/test-files/solr/collection1/conf/solrconfig-sql.xml @@ -71,10 +71,6 @@ json false - - collection1 - - diff --git a/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java b/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java index f7fa6649a72..9d5cf9252f6 100644 --- a/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java +++ b/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java @@ -102,7 +102,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { SQLHandler.SQLVisitor sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder()); sqlVistor.process(statement, new Integer(0)); - assert(sqlVistor.query.equals("(c:d)")); + assert(sqlVistor.query.equals("(c:\"d\")")); //Add parens parser = new SqlParser(); @@ -111,11 +111,11 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder()); sqlVistor.process(statement, new Integer(0)); - assert(sqlVistor.query.equals("(c:d)")); + assert(sqlVistor.query.equals("(c:\"d\")")); //Phrase parser = new SqlParser(); - sql = "select a from b where (c = '\"d d\"')"; + sql = "select a from b where (c = 'd d')"; statement = parser.createStatement(sql); sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder()); sqlVistor.process(statement, new Integer(0)); @@ -129,7 +129,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder()); sqlVistor.process(statement, new Integer(0)); - assert(sqlVistor.query.equals("((c:d) AND (l:z))")); + assert(sqlVistor.query.equals("((c:\"d\") AND (l:\"z\"))")); // OR @@ -139,7 +139,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder()); sqlVistor.process(statement, new Integer(0)); - assert(sqlVistor.query.equals("((c:d) OR (l:z))")); + assert(sqlVistor.query.equals("((c:\"d\") OR (l:\"z\"))")); // AND NOT @@ -149,7 +149,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder()); sqlVistor.process(statement, new Integer(0)); - assert(sqlVistor.query.equals("((c:d) AND -(l:z))")); + assert(sqlVistor.query.equals("((c:\"d\") AND -(l:\"z\"))")); // NESTED parser = new SqlParser(); @@ -158,7 +158,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder()); sqlVistor.process(statement, new Integer(0)); - assert(sqlVistor.query.equals("((c:d) OR ((l:z) AND (m:j)))")); + assert(sqlVistor.query.equals("((c:\"d\") OR ((l:\"z\") AND (m:\"j\")))")); // NESTED NOT parser = new SqlParser(); @@ -167,33 +167,33 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder()); sqlVistor.process(statement, new Integer(0)); - assert(sqlVistor.query.equals("((c:d) OR ((l:z) AND -(m:j)))")); + assert(sqlVistor.query.equals("((c:\"d\") OR ((l:\"z\") AND -(m:\"j\")))")); // RANGE - Will have to do until SQL BETWEEN is supported. // NESTED parser = new SqlParser(); - sql = "select a from b where ((c = '[0 TO 100]') OR ((l = 'z') AND (m = 'j')))"; + sql = "select a from b where ((c = '[0 TO 100]') OR ((l = '(z)') AND (m = 'j')))"; statement = parser.createStatement(sql); sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder()); sqlVistor.process(statement, new Integer(0)); - assert(sqlVistor.query.equals("((c:[0 TO 100]) OR ((l:z) AND (m:j)))")); + assert(sqlVistor.query.equals("((c:[0 TO 100]) OR ((l:(z)) AND (m:\"j\")))")); // Wildcard parser = new SqlParser(); - sql = "select a from b where ((c = '[0 TO 100]') OR ((l = 'z*') AND (m = 'j')))"; + sql = "select a from b where ((c = '[0 TO 100]') OR ((l = '(z*)') AND (m = 'j')))"; statement = parser.createStatement(sql); sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder()); sqlVistor.process(statement, new Integer(0)); - assert(sqlVistor.query.equals("((c:[0 TO 100]) OR ((l:z*) AND (m:j)))")); + assert(sqlVistor.query.equals("((c:[0 TO 100]) OR ((l:(z*)) AND (m:\"j\")))")); // Complex Lucene/Solr Query parser = new SqlParser(); - sql = "select a from b where ((c = '[0 TO 100]') OR ((l = 'z*') AND (m = '(j OR (k NOT s))')))"; + sql = "select a from b where ((c = '[0 TO 100]') OR ((l = '(z*)') AND (m = '(j OR (k NOT s))')))"; statement = parser.createStatement(sql); sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder()); sqlVistor.process(statement, new Integer(0)); - assert(sqlVistor.query.equals("((c:[0 TO 100]) OR ((l:z*) AND (m:(j OR (k NOT s)))))")); + assert(sqlVistor.query.equals("((c:[0 TO 100]) OR ((l:(z*)) AND (m:(j OR (k NOT s)))))")); } private void testBasicSelect() throws Exception { @@ -216,7 +216,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { commit(); Map params = new HashMap(); params.put(CommonParams.QT, "/sql"); - params.put("sql", "select id, field_i, str_s from mytable where text='XXXX' order by field_i desc"); + params.put("sql", "select id, field_i, str_s from collection1 where text='XXXX' order by field_i desc"); SolrStream solrStream = new SolrStream(jetty.url, params); List tuples = getTuples(solrStream); @@ -267,7 +267,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { params = new HashMap(); params.put(CommonParams.QT, "/sql"); - params.put("sql", "select id, field_i, str_s from mytable where text='XXXX' order by field_i desc limit 1"); + params.put("sql", "select id, field_i, str_s from collection1 where text='XXXX' order by field_i desc limit 1"); solrStream = new SolrStream(jetty.url, params); tuples = getTuples(solrStream); @@ -281,7 +281,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { params = new HashMap(); params.put(CommonParams.QT, "/sql"); - params.put("sql", "select id, field_i, str_s from mytable where text='XXXX' AND id='(1 2 3)' order by field_i desc"); + params.put("sql", "select id, field_i, str_s from collection1 where text='XXXX' AND id='(1 2 3)' order by field_i desc"); solrStream = new SolrStream(jetty.url, params); tuples = getTuples(solrStream); @@ -326,9 +326,10 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { indexr("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50"); indexr("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60"); commit(); + Map params = new HashMap(); params.put(CommonParams.QT, "/sql"); - params.put("sql", "select id, field_i, str_s from mytable where text='XXXX' order by field_iff desc"); + params.put("sql", "select id, field_i, str_s from collection1 where text='XXXX' order by field_iff desc"); SolrStream solrStream = new SolrStream(jetty.url, params); Tuple tuple = getTuple(new ExceptionStream(solrStream)); @@ -339,18 +340,18 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { params = new HashMap(); params.put(CommonParams.QT, "/sql"); - params.put("sql", "select id, field_iff, str_s from mytable where text='XXXX' order by field_iff desc"); + params.put("sql", "select id, field_iff, str_s from collection1 where text='XXXX' order by field_iff desc"); solrStream = new SolrStream(jetty.url, params); tuple = getTuple(new ExceptionStream(solrStream)); assert(tuple.EOF); assert(tuple.EXCEPTION); //An exception not detected by the parser thrown from the /select handler - assert(tuple.getException().contains("An exception has occurred on the server, refer to server log for details")); + assert(tuple.getException().contains("sort param field can't be found:")); params = new HashMap(); params.put(CommonParams.QT, "/sql"); - params.put("sql", "select str_s, count(*), sum(field_iff), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s having ((sum(field_iff) = 19) AND (min(field_i) = 8))"); + params.put("sql", "select str_s, count(*), sum(field_iff), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having ((sum(field_iff) = 19) AND (min(field_i) = 8))"); solrStream = new SolrStream(jetty.url, params); tuple = getTuple(new ExceptionStream(solrStream)); @@ -359,6 +360,27 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { //An exception not detected by the parser thrown from the /export handler assert(tuple.getException().contains("undefined field:")); + params = new HashMap(); + params.put(CommonParams.QT, "/sql"); + params.put("sql", "select str_s, count(*), blah(field_iff), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having ((sum(field_iff) = 19) AND (min(field_i) = 8))"); + + solrStream = new SolrStream(jetty.url, params); + tuple = getTuple(new ExceptionStream(solrStream)); + assert(tuple.EOF); + assert(tuple.EXCEPTION); + //An exception not detected by the parser thrown from the /export handler + assert(tuple.getException().contains("Invalid function: blah")); + + params = new HashMap(); + params.put(CommonParams.QT, "/sql"); + params.put("sql", "select str_s from collection1 where text='XXXX' group by str_s"); + + solrStream = new SolrStream(jetty.url, params); + tuple = getTuple(new ExceptionStream(solrStream)); + assert(tuple.EOF); + assert(tuple.EXCEPTION); + assert(tuple.getException().contains("Group by queries must include atleast one aggregate function.")); + } finally { delete(); } @@ -384,7 +406,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { commit(); Map params = new HashMap(); params.put(CommonParams.QT, "/sql"); - params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s order by sum(field_i) asc limit 2"); + params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s order by sum(field_i) asc limit 2"); SolrStream solrStream = new SolrStream(jetty.url, params); List tuples = getTuples(solrStream); @@ -412,7 +434,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { params = new HashMap(); params.put(CommonParams.QT, "/sql"); - params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where (text='XXXX' AND NOT text='\"XXXX XXX\"') group by str_s order by str_s desc"); + params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where (text='XXXX' AND NOT text='XXXX XXX') group by str_s order by str_s desc"); solrStream = new SolrStream(jetty.url, params); tuples = getTuples(solrStream); @@ -449,7 +471,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { params = new HashMap(); params.put(CommonParams.QT, "/sql"); - params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s having sum(field_i) = 19"); + params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having sum(field_i) = 19"); solrStream = new SolrStream(jetty.url, params); tuples = getTuples(solrStream); @@ -466,7 +488,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { params = new HashMap(); params.put(CommonParams.QT, "/sql"); - params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 8))"); + params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 8))"); solrStream = new SolrStream(jetty.url, params); tuples = getTuples(solrStream); @@ -484,7 +506,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { params = new HashMap(); params.put(CommonParams.QT, "/sql"); - params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 100))"); + params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 100))"); solrStream = new SolrStream(jetty.url, params); tuples = getTuples(solrStream); @@ -518,7 +540,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { Map params = new HashMap(); params.put(CommonParams.QT, "/sql"); params.put("numWorkers", "2"); - params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s order by sum(field_i) asc limit 2"); + params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s order by sum(field_i) asc limit 2"); SolrStream solrStream = new SolrStream(jetty.url, params); List tuples = getTuples(solrStream); @@ -547,7 +569,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { params = new HashMap(); params.put(CommonParams.QT, "/sql"); params.put("numWorkers", "2"); - params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s order by str_s desc"); + params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s order by str_s desc"); solrStream = new SolrStream(jetty.url, params); tuples = getTuples(solrStream); @@ -584,7 +606,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { params = new HashMap(); params.put(CommonParams.QT, "/sql"); params.put("numWorkers", "2"); - params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s having sum(field_i) = 19"); + params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having sum(field_i) = 19"); solrStream = new SolrStream(jetty.url, params); tuples = getTuples(solrStream); @@ -611,7 +633,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { params = new HashMap(); params.put(CommonParams.QT, "/sql"); params.put("numWorkers", "2"); - params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 8))"); + params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 8))"); solrStream = new SolrStream(jetty.url, params); tuples = getTuples(solrStream); @@ -630,7 +652,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { params = new HashMap(); params.put(CommonParams.QT, "/sql"); params.put("numWorkers", "2"); - params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 100))"); + params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 100))"); solrStream = new SolrStream(jetty.url, params); tuples = getTuples(solrStream); @@ -663,7 +685,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { commit(); Map params = new HashMap(); params.put(CommonParams.QT, "/sql"); - params.put("sql", "select year_i, sum(item_i) from mytable group by year_i order by year_i desc"); + params.put("sql", "select year_i, sum(item_i) from collection1 group by year_i order by year_i desc"); SolrStream solrStream = new SolrStream(jetty.url, params); List tuples = getTuples(solrStream); @@ -681,7 +703,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { assert(tuple.getLong("year_i") == 2014); assert(tuple.getDouble("sum(item_i)") == 7); - params.put("sql", "select year_i, month_i, sum(item_i) from mytable group by year_i, month_i order by year_i desc, month_i desc"); + params.put("sql", "select year_i, month_i, sum(item_i) from collection1 group by year_i, month_i order by year_i desc, month_i desc"); solrStream = new SolrStream(jetty.url, params); tuples = getTuples(solrStream); @@ -708,7 +730,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { params = new HashMap(); params.put(CommonParams.QT, "/sql"); - params.put("sql", "select year_i, month_i, day_i, sum(item_i) from mytable group by year_i, month_i, day_i order by year_i desc, month_i desc, day_i desc"); + params.put("sql", "select year_i, month_i, day_i, sum(item_i) from collection1 group by year_i, month_i, day_i order by year_i desc, month_i desc, day_i desc"); solrStream = new SolrStream(jetty.url, params); tuples = getTuples(solrStream); @@ -781,7 +803,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { Map params = new HashMap(); params.put(CommonParams.QT, "/sql"); params.put("numWorkers", 2); - params.put("sql", "select year_i, sum(item_i) from mytable group by year_i order by year_i desc"); + params.put("sql", "select year_i, sum(item_i) from collection1 group by year_i order by year_i desc"); SolrStream solrStream = new SolrStream(jetty.url, params); List tuples = getTuples(solrStream); @@ -802,7 +824,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { new HashMap(); params.put(CommonParams.QT, "/sql"); params.put("numWorkers", 2); - params.put("sql", "select year_i, month_i, sum(item_i) from mytable group by year_i, month_i order by year_i desc, month_i desc"); + params.put("sql", "select year_i, month_i, sum(item_i) from collection1 group by year_i, month_i order by year_i desc, month_i desc"); solrStream = new SolrStream(jetty.url, params); tuples = getTuples(solrStream); @@ -831,7 +853,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { new HashMap(); params.put(CommonParams.QT, "/sql"); params.put("numWorkers", 2); - params.put("sql", "select year_i, month_i, day_i, sum(item_i) from mytable group by year_i, month_i, day_i order by year_i desc, month_i desc, day_i desc"); + params.put("sql", "select year_i, month_i, day_i, sum(item_i) from collection1 group by year_i, month_i, day_i order by year_i desc, month_i desc, day_i desc"); solrStream = new SolrStream(jetty.url, params); tuples = getTuples(solrStream); diff --git a/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml b/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml index 80c638250cd..c3988aeee5a 100644 --- a/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml +++ b/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml @@ -857,29 +857,38 @@ Do not change these defaults. --> - - - {!xport} - xsort - false - + + + {!xport} + xsort + false + - - query - - + + query + + - - - json - false - - + + + json + false + + + + + + + json + false + + + - - - - json - false - - - - - "+this.baseUrl+":"+e.getMessage()); } catch (Exception e) { //The Stream source did not provide an exception in a format that the SolrStream could propagate. - e.printStackTrace(); - throw new IOException(this.baseUrl+": An exception has occurred on the server, refer to server log for details."); + throw new IOException("--> "+this.baseUrl+": An exception has occurred on the server, refer to server log for details."); } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java index 4872578445b..4ea355df3e3 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java @@ -19,12 +19,12 @@ package org.apache.solr.client.solrj.io.stream; import java.io.IOException; import java.io.Serializable; +import java.io.Writer; import java.util.List; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; -import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; -import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; + public abstract class TupleStream implements Serializable { @@ -33,7 +33,15 @@ public abstract class TupleStream implements Serializable { public TupleStream() { } - + + public static void writeStreamOpen(Writer out) throws IOException { + out.write("{\"docs\":["); + } + + public static void writeStreamClose(Writer out) throws IOException { + out.write("]}"); + } + public abstract void setStreamContext(StreamContext context); public abstract List children(); diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java index 4ee9bfe3974..45647f492ad 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java @@ -511,9 +511,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { Tuple t = getTuple(estream); assert(t.EOF); assert(t.EXCEPTION); - //The /select handler does not return exceptions in tuple so the generic exception is returned. - assert(t.getException().contains("An exception has occurred on the server, refer to server log for details.")); - + assert(t.getException().contains("sort param field can't be found: blah")); //Test an error that comes originates from the /export handler paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt","/export"); @@ -553,6 +551,18 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { //ParallelStream requires that partitionKeys be set. assert(t.getException().contains("When numWorkers > 1 partitionKeys must be set.")); + + //Test an error that originates from the /select handler + paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc", "partitionKeys","a_s"); + stream = new CloudSolrStream(zkHost, "collection1", paramsA); + pstream = new ParallelStream(zkHost,"collection1", stream, 2, new FieldComparator("blah", ComparatorOrder.ASCENDING)); + estream = new ExceptionStream(pstream); + t = getTuple(estream); + assert(t.EOF); + assert(t.EXCEPTION); + assert(t.getException().contains("sort param field can't be found: blah")); + + //Test an error that originates from the /export handler paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt","/export", "partitionKeys","a_s"); stream = new CloudSolrStream(zkHost, "collection1", paramsA);