SOLR-7441: Improve overall robustness of the Streaming stack: Streaming API, Streaming Expressions, Parallel SQL

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1692193 13f79535-47bb-0310-9956-ffa450edef68
Joel Bernstein 2015-07-21 20:28:35 +00:00
parent 9e61daf0fa
commit 129a83b198
18 changed files with 355 additions and 161 deletions
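Taken together, the changes below standardize how results and errors flow back to streaming clients: handlers wrap their streams in ExceptionStream and TimerStream, failures surface as tuples keyed by "EXCEPTION" (formerly "_EXCEPTION_"), and results arrive under "result-set". A rough client-side sketch of the resulting contract (the base URL and query are illustrative, not part of this commit):

import java.util.HashMap;
import java.util.Map;

import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.SolrStream;
import org.apache.solr.common.params.CommonParams;

public class SqlClientSketch {
  public static void main(String[] args) throws Exception {
    Map params = new HashMap();
    params.put(CommonParams.QT, "/sql");
    params.put("sql", "select id, field_i from collection1 where text='XXXX' order by field_i desc limit 10");

    SolrStream stream = new SolrStream("http://localhost:8983/solr/collection1", params);
    try {
      stream.open();
      for (;;) {
        Tuple tuple = stream.read();
        if (tuple.EXCEPTION) {
          // Server-side failures now arrive as a tuple instead of a broken response.
          System.err.println(tuple.getException());
          break;
        }
        if (tuple.EOF) {
          // TimerStream stamps the EOF tuple with the elapsed time in milliseconds.
          System.out.println("RESPONSE_TIME: " + tuple.getLong("RESPONSE_TIME"));
          break;
        }
        System.out.println(tuple.getString("id"));
      }
    } finally {
      stream.close();
    }
  }
}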

View File

@ -19,7 +19,6 @@ package org.apache.solr.handler;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Locale;
@ -43,7 +42,9 @@ import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.ExceptionStream;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreContainer;
@ -63,9 +64,8 @@ import com.facebook.presto.sql.parser.SqlParser;
public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
private Map<String, TableSpec> tableMappings = new HashMap();
private String defaultZkhost = null;
private String defaultWorkerCollection = null;
private static String defaultZkhost = null;
private static String defaultWorkerCollection = null;
private Logger logger = LoggerFactory.getLogger(SQLHandler.class);
@ -77,41 +77,42 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
defaultZkhost = core.getCoreDescriptor().getCoreContainer().getZkController().getZkServerAddress();
defaultWorkerCollection = core.getCoreDescriptor().getCollectionName();
}
NamedList<String> tableConf = (NamedList<String>)initArgs.get("tables");
for(Entry<String,String> entry : tableConf) {
String tableName = entry.getKey();
if(entry.getValue().indexOf("@") > -1) {
String[] parts = entry.getValue().split("@");
tableMappings.put(tableName, new TableSpec(parts[0], parts[1]));
} else {
String collection = entry.getValue();
tableMappings.put(tableName, new TableSpec(collection, defaultZkhost));
}
}
}
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
SolrParams params = req.getParams();
params = adjustParams(params);
req.setParams(params);
String sql = params.get("sql");
int numWorkers = params.getInt("numWorkers", 1);
String workerCollection = params.get("workerCollection", defaultWorkerCollection);
String workerZkhost = params.get("workerZkhost",defaultZkhost);
StreamContext context = new StreamContext();
try {
TupleStream tupleStream = SQLTupleStreamParser.parse(sql, tableMappings, numWorkers, workerCollection, workerZkhost);
if(sql == null) {
throw new Exception("sql parameter cannot be null");
}
TupleStream tupleStream = SQLTupleStreamParser.parse(sql, numWorkers, workerCollection, workerZkhost);
context.numWorkers = numWorkers;
context.setSolrClientCache(StreamHandler.clientCache);
tupleStream.setStreamContext(context);
rsp.add("tuples", new ExceptionStream(tupleStream));
rsp.add("result-set", new StreamHandler.TimerStream(new ExceptionStream(tupleStream)));
} catch(Exception e) {
//Catch the SQL parsing and query transformation exceptions.
logger.error("Exception parsing SQL", e);
rsp.add("tuples", new StreamHandler.DummyErrorStream(e));
SolrException.log(logger, e);
rsp.add("result-set", new StreamHandler.DummyErrorStream(e));
}
}
private SolrParams adjustParams(SolrParams params) {
ModifiableSolrParams adjustedParams = new ModifiableSolrParams();
adjustedParams.add(params);
adjustedParams.add(CommonParams.OMIT_HEADER, "true");
return adjustedParams;
}
public String getDescription() {
return "SQLHandler";
}
@ -123,7 +124,6 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
public static class SQLTupleStreamParser {
public static TupleStream parse(String sql,
Map<String, TableSpec> tableMap,
int numWorkers,
String workerCollection,
String workerZkhost) throws IOException {
@ -137,30 +137,33 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
TupleStream sqlStream = null;
if(sqlVistor.groupByQuery) {
sqlStream = doGroupBy(sqlVistor, tableMap, numWorkers, workerCollection, workerZkhost);
sqlStream = doGroupByWithAggregates(sqlVistor, numWorkers, workerCollection, workerZkhost);
} else {
sqlStream = doSelect(sqlVistor, tableMap, numWorkers, workerCollection, workerZkhost);
sqlStream = doSelect(sqlVistor);
}
return sqlStream;
}
}
private static TupleStream doGroupBy(SQLVisitor sqlVisitor,
Map<String, TableSpec> tableMap,
int numWorkers,
String workerCollection,
String workerZkHost) throws IOException {
private static TupleStream doGroupByWithAggregates(SQLVisitor sqlVisitor,
int numWorkers,
String workerCollection,
String workerZkHost) throws IOException {
Set<String> fieldSet = new HashSet();
Bucket[] buckets = getBuckets(sqlVisitor.groupBy, fieldSet);
Metric[] metrics = getMetrics(sqlVisitor.fields, fieldSet);
if(metrics.length == 0) {
throw new IOException("Group by queries must include atleast one aggregate function.");
}
String fl = fields(fieldSet);
String sortDirection = getSortDirection(sqlVisitor.sorts);
String sort = bucketSort(buckets, sortDirection);
TableSpec tableSpec = tableMap.get(sqlVisitor.table);
TableSpec tableSpec = new TableSpec(sqlVisitor.table, defaultZkhost);
String zkHost = tableSpec.zkHost;
String collection = tableSpec.collection;
Map<String, String> params = new HashMap();
@ -229,17 +232,36 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
return tupleStream;
}
private static TupleStream doSelect(SQLVisitor sqlVisitor,
Map<String, TableSpec> tableMap,
int numWorkers,
String workerCollection,
String workerZkHost) throws IOException {
private static TupleStream doSelect(SQLVisitor sqlVisitor) throws IOException {
List<String> fields = sqlVisitor.fields;
StringBuilder flbuf = new StringBuilder();
boolean comma = false;
for(String field : fields) {
if(comma) {
if(fields.size() == 0) {
throw new IOException("Select columns must be specified.");
}
boolean score = false;
for (String field : fields) {
if(field.contains("(")) {
throw new IOException("Aggregate functions only supported with group by queries.");
}
if(field.contains("*")) {
throw new IOException("* is not supported for column selection.");
}
if(field.equals("score")) {
if(sqlVisitor.limit < 0) {
throw new IOException("score is not a valid field for unlimited select queries");
} else {
score = true;
}
}
if (comma) {
flbuf.append(",");
}
@ -254,21 +276,37 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
StringBuilder siBuf = new StringBuilder();
comma = false;
for(SortItem sortItem : sorts) {
if(comma) {
siBuf.append(",");
if(sorts != null) {
for (SortItem sortItem : sorts) {
if (comma) {
siBuf.append(",");
}
siBuf.append(stripQuotes(sortItem.getSortKey().toString()) + " " + ascDesc(sortItem.getOrdering().toString()));
}
} else {
if(sqlVisitor.limit < 0) {
throw new IOException("order by is required for unlimited select statements.");
} else {
siBuf.append("score desc");
if(!score) {
fl = fl+(",score");
}
}
siBuf.append(stripQuotes(sortItem.getSortKey().toString()) + " " + ascDesc(sortItem.getOrdering().toString()));
}
TableSpec tableSpec = tableMap.get(sqlVisitor.table);
TableSpec tableSpec = new TableSpec(sqlVisitor.table, defaultZkhost);
String zkHost = tableSpec.zkHost;
String collection = tableSpec.collection;
Map<String, String> params = new HashMap();
params.put("fl", fl.toString());
params.put("q", sqlVisitor.query);
params.put("sort", siBuf.toString());
if(siBuf.length() > 0) {
params.put("sort", siBuf.toString());
}
if(sqlVisitor.limit > -1) {
params.put("rows", Integer.toString(sqlVisitor.limit));
@ -384,15 +422,15 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
return buf.toString();
}
private static Metric[] getMetrics(List<String> fields, Set<String> fieldSet) {
private static Metric[] getMetrics(List<String> fields, Set<String> fieldSet) throws IOException {
List<Metric> metrics = new ArrayList();
for(String field : fields) {
if(field.contains("(")) {
field = field.substring(0, field.length()-1);
String[] parts = field.split("\\(");
String function = parts[0];
validateFunction(function);
String column = parts[1];
if(function.equals("min")) {
metrics.add(new MinMetric(column));
@ -414,6 +452,14 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
return metrics.toArray(new Metric[metrics.size()]);
}
private static void validateFunction(String function) throws IOException {
if(function.equals("min") || function.equals("max") || function.equals("sum") || function.equals("avg") || function.equals("count")) {
return;
} else {
throw new IOException("Invalid function: "+function);
}
}
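The whitelist above is the entire set of aggregate functions the handler accepts; anything else now fails fast with "Invalid function" instead of producing an undefined metric. Illustrative queries (field names are placeholders):

// Passes validation (group by queries only):
String good = "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) "
            + "from collection1 where text='XXXX' group by str_s";
// Fails validation, returned to the client as an EXCEPTION tuple: "Invalid function: blah"
String bad = "select str_s, blah(field_i) from collection1 where text='XXXX' group by str_s";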
private static Bucket[] getBuckets(List<String> fields, Set<String> fieldSet) {
List<Bucket> buckets = new ArrayList();
for(String field : fields) {
@ -466,13 +512,19 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
}
private class TableSpec {
private static class TableSpec {
private String collection;
private String zkHost;
public TableSpec(String collection, String zkHost) {
this.collection = collection;
this.zkHost = zkHost;
public TableSpec(String table, String defaultZkHost) {
if(table.contains("@")) {
String[] parts = table.split("@");
this.collection = parts[0];
this.zkHost = parts[1];
} else {
this.collection = table;
this.zkHost = defaultZkHost;
}
}
}
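With the tables configuration gone, the table name itself carries the routing: a bare name is a collection on this cluster's default zkHost, while name@zkhost targets another SolrCloud. A quick sketch of the parsing (TableSpec is a private class, so this is illustrative only, and the zkHost value is hypothetical):

TableSpec local = new TableSpec("collection1", defaultZkhost);
// local.collection == "collection1", local.zkHost == defaultZkhost

TableSpec remote = new TableSpec("logs@zk1.example.com:2181", defaultZkhost);
// remote.collection == "logs", remote.zkHost == "zk1.example.com:2181"
// In SQL: select a from logs@zk1.example.com:2181 where ...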
@ -496,7 +548,14 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
protected Void visitComparisonExpression(ComparisonExpression node, StringBuilder buf) {
String field = node.getLeft().toString();
String value = node.getRight().toString();
buf.append('(').append(stripQuotes(field) + ":" + stripSingleQuotes(value)).append(')');
value = stripSingleQuotes(value);
if(!value.startsWith("(") && !value.startsWith("[")) {
//If the value has no parens or brackets, default to a phrase search.
value = '"'+value+'"';
}
buf.append('(').append(stripQuotes(field) + ":" + value).append(')');
return null;
}
}
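Mirroring the updated tests, the comparison rewrite now maps SQL predicates to Solr queries as follows: quoted values become phrase queries unless they lead with '(' or '[', which pass raw Lucene/Solr syntax through unchanged. A sketch in the test suite's own style:

SqlParser parser = new SqlParser();
Statement statement = parser.createStatement("select a from b where c = 'd d'");
SQLHandler.SQLVisitor sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
sqlVistor.process(statement, new Integer(0));
assert(sqlVistor.query.equals("(c:\"d d\")"));  // quoted value -> phrase query
// c = '(z*)'       -> (c:(z*))         parens: raw query syntax passes through
// c = '[0 TO 100]' -> (c:[0 TO 100])   brackets: range syntax passes through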
@ -805,9 +864,9 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
case EQUAL:
return td == d;
case GREATER_THAN:
return td <= d;
return td > d;
case GREATER_THAN_OR_EQUAL:
return td <= d;
return td >= d;
default:
return false;
}

View File

@ -19,6 +19,7 @@ package org.apache.solr.handler;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.io.IOException;
import java.net.URLDecoder;
import java.util.HashMap;
import java.util.List;
@ -45,6 +46,9 @@ import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.Base64;
import org.apache.solr.common.util.NamedList;
@ -129,7 +133,8 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
SolrParams params = req.getParams();
params = adjustParams(params);
req.setParams(params);
boolean objectSerialize = params.getBool("objectSerialize", false);
TupleStream tupleStream = null;
@ -146,8 +151,8 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
}
} catch (Exception e) {
//Catch exceptions that occur while the stream is being created. This will include streaming expression parse errors.
logger.error("Exception creating TupleStream", e);
rsp.add("tuples", new DummyErrorStream(e));
SolrException.log(logger, e);
rsp.add("result-set", new DummyErrorStream(e));
return;
}
@ -159,7 +164,14 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
context.numWorkers = numWorkers;
context.setSolrClientCache(clientCache);
tupleStream.setStreamContext(context);
rsp.add("tuples", new ExceptionStream(tupleStream));
rsp.add("result-set", new TimerStream(new ExceptionStream(tupleStream)));
}
private SolrParams adjustParams(SolrParams params) {
ModifiableSolrParams adjustedParams = new ModifiableSolrParams();
adjustedParams.add(params);
adjustedParams.add(CommonParams.OMIT_HEADER, "true");
return adjustedParams;
}
public String getDescription() {
@ -198,8 +210,49 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
String msg = e.getMessage();
Map m = new HashMap();
m.put("EOF", true);
m.put("_EXCEPTION_", msg);
m.put("EXCEPTION", msg);
return new Tuple(m);
}
}
public static class TimerStream extends TupleStream {
private long begin;
private TupleStream tupleStream;
public TimerStream(TupleStream tupleStream) {
this.tupleStream = tupleStream;
}
public StreamComparator getStreamSort() {
return this.tupleStream.getStreamSort();
}
public void close() throws IOException {
this.tupleStream.close();
}
public void open() throws IOException {
this.begin = System.nanoTime();
this.tupleStream.open();
}
public void setStreamContext(StreamContext context) {
this.tupleStream.setStreamContext(context);
}
public List<TupleStream> children() {
return this.tupleStream.children();
}
public Tuple read() throws IOException {
Tuple tuple = this.tupleStream.read();
if(tuple.EOF) {
long totalTime = (System.nanoTime() - begin) / 1000000;
tuple.fields.put("RESPONSE_TIME", totalTime);
}
return tuple;
}
}
}
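TimerStream is purely a decorator: it records System.nanoTime() on open and folds the elapsed milliseconds into the EOF tuple on the way out. A minimal usage sketch, given some tupleStream already in scope:

TupleStream wrapped = new StreamHandler.TimerStream(new ExceptionStream(tupleStream));
wrapped.open();
Tuple t;
do {
  t = wrapped.read();
} while (!t.EOF);
long millis = t.getLong("RESPONSE_TIME");  // stamped onto the EOF tuple
wrapped.close();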

View File

@ -37,6 +37,7 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRefBuilder;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.LongValues;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.request.SolrQueryRequest;
@ -72,7 +73,7 @@ public class SortingResponseWriter implements QueryResponseWriter {
Exception e1 = res.getException();
if(e1 != null) {
if(!(e1 instanceof IgnoreException)) {
e1.printStackTrace(new PrintWriter(writer));
writeException(e1, writer, false);
}
return;
}
@ -128,16 +129,15 @@ public class SortingResponseWriter implements QueryResponseWriter {
exception = e;
}
writer.write("{\"responseHeader\": {\"status\": 0}, \"response\":{\"numFound\":"+totalHits+", \"docs\":[");
if(exception != null) {
//We have an exception. Send it back to the client and return.
writeException(exception, writer);
writer.write("]}}");
writer.flush();
writeException(exception, writer, true);
return;
}
writer.write("{\"responseHeader\": {\"status\": 0}, \"response\":{\"numFound\":"+totalHits+", \"docs\":[");
//Write the data.
List<LeafReaderContext> leaves = req.getSearcher().getTopReaderContext().leaves();
SortDoc sortDoc = getSortDoc(req.getSearcher(), sort.getSort());
@ -189,6 +189,7 @@ public class SortingResponseWriter implements QueryResponseWriter {
}
} catch(Throwable e) {
Throwable ex = e;
e.printStackTrace();
while(ex != null) {
String m = ex.getMessage();
if(m != null && m.contains("Broken pipe")) {
@ -242,10 +243,16 @@ public class SortingResponseWriter implements QueryResponseWriter {
}
}
protected void writeException(Exception e, Writer out) throws IOException{
out.write("{\"_EXCEPTION_\":\"");
protected void writeException(Exception e, Writer out, boolean log) throws IOException{
out.write("{\"responseHeader\": {\"status\": 400}, \"response\":{\"numFound\":0, \"docs\":[");
out.write("{\"EXCEPTION\":\"");
writeStr(e.getMessage(), out);
out.write("\"}");
out.write("]}}");
out.flush();
if(log) {
SolrException.log(logger, e);
}
}
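On failure the /export handler now emits a complete, parseable envelope rather than a bare stack trace, so streaming clients can surface the message as an EXCEPTION tuple. Roughly, with a hypothetical message:

String body =
    "{\"responseHeader\": {\"status\": 400}, "
  + "\"response\":{\"numFound\":0, \"docs\":["
  + "{\"EXCEPTION\":\"undefined field: foo\"}"
  + "]}}";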
protected FieldWriter[] getFieldWriters(String[] fields, SolrIndexSearcher searcher) throws IOException {
@ -1138,6 +1145,7 @@ public class SortingResponseWriter implements QueryResponseWriter {
public void setCurrentValue(int docId) {
int ord = currentVals.getOrd(docId);
if(ord < 0) {
currentOrd = -1;
} else {

View File

@ -293,20 +293,21 @@ public abstract class TextResponseWriter {
public void writeTupleStream(TupleStream tupleStream) throws IOException {
tupleStream.open();
writeStartDocumentList("response", -1, -1, -1, null);
tupleStream.writeStreamOpen(writer);
boolean isFirst = true;
while(true) {
Tuple tuple = tupleStream.read();
if(!isFirst) {
writer.write(",");
}
writer.write("\n");
writeMap(null, tuple.fields, false, true);
isFirst = false;
if(tuple.EOF) {
break;
}
}
writeEndDocumentList();
tupleStream.writeStreamClose(writer);
tupleStream.close();
}

View File

@ -62,14 +62,12 @@ public class ExportQParserPlugin extends QParserPlugin {
public class ExportQuery extends RankQuery {
private int leafCount;
private Query mainQuery;
private Object id;
public RankQuery clone() {
ExportQuery clone = new ExportQuery();
clone.id = id;
clone.leafCount = leafCount;
return clone;
}
@ -98,7 +96,8 @@ public class ExportQParserPlugin extends QParserPlugin {
public TopDocsCollector getTopDocsCollector(int len,
SolrIndexSearcher.QueryCommand cmd,
IndexSearcher searcher) throws IOException {
FixedBitSet[] sets = new FixedBitSet[this.leafCount];
int leafCount = searcher.getTopReaderContext().leaves().size();
FixedBitSet[] sets = new FixedBitSet[leafCount];
return new ExportCollector(sets);
}
@ -124,7 +123,6 @@ public class ExportQParserPlugin extends QParserPlugin {
}
public ExportQuery(SolrParams localParams, SolrParams params, SolrQueryRequest request) throws IOException {
this.leafCount = request.getSearcher().getTopReaderContext().leaves().size();
id = new Object();
}
}

View File

@ -71,10 +71,6 @@
<str name="wt">json</str>
<str name="distrib">false</str>
</lst>
<lst name="tables">
<str name="mytable">collection1</str>
</lst>
</requestHandler>

View File

@ -102,7 +102,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
SQLHandler.SQLVisitor sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
sqlVistor.process(statement, new Integer(0));
assert(sqlVistor.query.equals("(c:d)"));
assert(sqlVistor.query.equals("(c:\"d\")"));
//Add parens
parser = new SqlParser();
@ -111,11 +111,11 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
sqlVistor.process(statement, new Integer(0));
assert(sqlVistor.query.equals("(c:d)"));
assert(sqlVistor.query.equals("(c:\"d\")"));
//Phrase
parser = new SqlParser();
sql = "select a from b where (c = '\"d d\"')";
sql = "select a from b where (c = 'd d')";
statement = parser.createStatement(sql);
sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
sqlVistor.process(statement, new Integer(0));
@ -129,7 +129,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
sqlVistor.process(statement, new Integer(0));
assert(sqlVistor.query.equals("((c:d) AND (l:z))"));
assert(sqlVistor.query.equals("((c:\"d\") AND (l:\"z\"))"));
// OR
@ -139,7 +139,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
sqlVistor.process(statement, new Integer(0));
assert(sqlVistor.query.equals("((c:d) OR (l:z))"));
assert(sqlVistor.query.equals("((c:\"d\") OR (l:\"z\"))"));
// AND NOT
@ -149,7 +149,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
sqlVistor.process(statement, new Integer(0));
assert(sqlVistor.query.equals("((c:d) AND -(l:z))"));
assert(sqlVistor.query.equals("((c:\"d\") AND -(l:\"z\"))"));
// NESTED
parser = new SqlParser();
@ -158,7 +158,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
sqlVistor.process(statement, new Integer(0));
assert(sqlVistor.query.equals("((c:d) OR ((l:z) AND (m:j)))"));
assert(sqlVistor.query.equals("((c:\"d\") OR ((l:\"z\") AND (m:\"j\")))"));
// NESTED NOT
parser = new SqlParser();
@ -167,33 +167,33 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
sqlVistor.process(statement, new Integer(0));
assert(sqlVistor.query.equals("((c:d) OR ((l:z) AND -(m:j)))"));
assert(sqlVistor.query.equals("((c:\"d\") OR ((l:\"z\") AND -(m:\"j\")))"));
// RANGE - Will have to do until SQL BETWEEN is supported.
// NESTED
parser = new SqlParser();
sql = "select a from b where ((c = '[0 TO 100]') OR ((l = 'z') AND (m = 'j')))";
sql = "select a from b where ((c = '[0 TO 100]') OR ((l = '(z)') AND (m = 'j')))";
statement = parser.createStatement(sql);
sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
sqlVistor.process(statement, new Integer(0));
assert(sqlVistor.query.equals("((c:[0 TO 100]) OR ((l:z) AND (m:j)))"));
assert(sqlVistor.query.equals("((c:[0 TO 100]) OR ((l:(z)) AND (m:\"j\")))"));
// Wildcard
parser = new SqlParser();
sql = "select a from b where ((c = '[0 TO 100]') OR ((l = 'z*') AND (m = 'j')))";
sql = "select a from b where ((c = '[0 TO 100]') OR ((l = '(z*)') AND (m = 'j')))";
statement = parser.createStatement(sql);
sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
sqlVistor.process(statement, new Integer(0));
assert(sqlVistor.query.equals("((c:[0 TO 100]) OR ((l:z*) AND (m:j)))"));
assert(sqlVistor.query.equals("((c:[0 TO 100]) OR ((l:(z*)) AND (m:\"j\")))"));
// Complex Lucene/Solr Query
parser = new SqlParser();
sql = "select a from b where ((c = '[0 TO 100]') OR ((l = 'z*') AND (m = '(j OR (k NOT s))')))";
sql = "select a from b where ((c = '[0 TO 100]') OR ((l = '(z*)') AND (m = '(j OR (k NOT s))')))";
statement = parser.createStatement(sql);
sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
sqlVistor.process(statement, new Integer(0));
assert(sqlVistor.query.equals("((c:[0 TO 100]) OR ((l:z*) AND (m:(j OR (k NOT s)))))"));
assert(sqlVistor.query.equals("((c:[0 TO 100]) OR ((l:(z*)) AND (m:(j OR (k NOT s)))))"));
}
private void testBasicSelect() throws Exception {
@ -216,7 +216,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
commit();
Map params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("sql", "select id, field_i, str_s from mytable where text='XXXX' order by field_i desc");
params.put("sql", "select id, field_i, str_s from collection1 where text='XXXX' order by field_i desc");
SolrStream solrStream = new SolrStream(jetty.url, params);
List<Tuple> tuples = getTuples(solrStream);
@ -267,7 +267,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("sql", "select id, field_i, str_s from mytable where text='XXXX' order by field_i desc limit 1");
params.put("sql", "select id, field_i, str_s from collection1 where text='XXXX' order by field_i desc limit 1");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
@ -281,7 +281,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("sql", "select id, field_i, str_s from mytable where text='XXXX' AND id='(1 2 3)' order by field_i desc");
params.put("sql", "select id, field_i, str_s from collection1 where text='XXXX' AND id='(1 2 3)' order by field_i desc");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
@ -326,9 +326,10 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
indexr("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50");
indexr("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60");
commit();
Map params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("sql", "select id, field_i, str_s from mytable where text='XXXX' order by field_iff desc");
params.put("sql", "select id, field_i, str_s from collection1 where text='XXXX' order by field_iff desc");
SolrStream solrStream = new SolrStream(jetty.url, params);
Tuple tuple = getTuple(new ExceptionStream(solrStream));
@ -339,18 +340,18 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("sql", "select id, field_iff, str_s from mytable where text='XXXX' order by field_iff desc");
params.put("sql", "select id, field_iff, str_s from collection1 where text='XXXX' order by field_iff desc");
solrStream = new SolrStream(jetty.url, params);
tuple = getTuple(new ExceptionStream(solrStream));
assert(tuple.EOF);
assert(tuple.EXCEPTION);
//An exception not detected by the parser, thrown from the /select handler
assert(tuple.getException().contains("An exception has occurred on the server, refer to server log for details"));
assert(tuple.getException().contains("sort param field can't be found:"));
params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("sql", "select str_s, count(*), sum(field_iff), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s having ((sum(field_iff) = 19) AND (min(field_i) = 8))");
params.put("sql", "select str_s, count(*), sum(field_iff), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having ((sum(field_iff) = 19) AND (min(field_i) = 8))");
solrStream = new SolrStream(jetty.url, params);
tuple = getTuple(new ExceptionStream(solrStream));
@ -359,6 +360,27 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
//An exception not detected by the parser, thrown from the /export handler
assert(tuple.getException().contains("undefined field:"));
params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("sql", "select str_s, count(*), blah(field_iff), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having ((sum(field_iff) = 19) AND (min(field_i) = 8))");
solrStream = new SolrStream(jetty.url, params);
tuple = getTuple(new ExceptionStream(solrStream));
assert(tuple.EOF);
assert(tuple.EXCEPTION);
//An exception thrown by the SQLHandler's aggregate function validation
assert(tuple.getException().contains("Invalid function: blah"));
params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("sql", "select str_s from collection1 where text='XXXX' group by str_s");
solrStream = new SolrStream(jetty.url, params);
tuple = getTuple(new ExceptionStream(solrStream));
assert(tuple.EOF);
assert(tuple.EXCEPTION);
assert(tuple.getException().contains("Group by queries must include atleast one aggregate function."));
} finally {
delete();
}
@ -384,7 +406,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
commit();
Map params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s order by sum(field_i) asc limit 2");
params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s order by sum(field_i) asc limit 2");
SolrStream solrStream = new SolrStream(jetty.url, params);
List<Tuple> tuples = getTuples(solrStream);
@ -412,7 +434,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where (text='XXXX' AND NOT text='\"XXXX XXX\"') group by str_s order by str_s desc");
params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where (text='XXXX' AND NOT text='XXXX XXX') group by str_s order by str_s desc");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
@ -449,7 +471,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s having sum(field_i) = 19");
params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having sum(field_i) = 19");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
@ -466,7 +488,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 8))");
params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 8))");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
@ -484,7 +506,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 100))");
params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 100))");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
@ -518,7 +540,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
Map params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("numWorkers", "2");
params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s order by sum(field_i) asc limit 2");
params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s order by sum(field_i) asc limit 2");
SolrStream solrStream = new SolrStream(jetty.url, params);
List<Tuple> tuples = getTuples(solrStream);
@ -547,7 +569,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("numWorkers", "2");
params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s order by str_s desc");
params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s order by str_s desc");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
@ -584,7 +606,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("numWorkers", "2");
params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s having sum(field_i) = 19");
params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having sum(field_i) = 19");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
@ -611,7 +633,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("numWorkers", "2");
params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 8))");
params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 8))");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
@ -630,7 +652,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("numWorkers", "2");
params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 100))");
params.put("sql", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), avg(field_i) from collection1 where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 100))");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
@ -663,7 +685,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
commit();
Map params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("sql", "select year_i, sum(item_i) from mytable group by year_i order by year_i desc");
params.put("sql", "select year_i, sum(item_i) from collection1 group by year_i order by year_i desc");
SolrStream solrStream = new SolrStream(jetty.url, params);
List<Tuple> tuples = getTuples(solrStream);
@ -681,7 +703,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getLong("year_i") == 2014);
assert(tuple.getDouble("sum(item_i)") == 7);
params.put("sql", "select year_i, month_i, sum(item_i) from mytable group by year_i, month_i order by year_i desc, month_i desc");
params.put("sql", "select year_i, month_i, sum(item_i) from collection1 group by year_i, month_i order by year_i desc, month_i desc");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
@ -708,7 +730,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("sql", "select year_i, month_i, day_i, sum(item_i) from mytable group by year_i, month_i, day_i order by year_i desc, month_i desc, day_i desc");
params.put("sql", "select year_i, month_i, day_i, sum(item_i) from collection1 group by year_i, month_i, day_i order by year_i desc, month_i desc, day_i desc");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
@ -781,7 +803,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
Map params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("numWorkers", 2);
params.put("sql", "select year_i, sum(item_i) from mytable group by year_i order by year_i desc");
params.put("sql", "select year_i, sum(item_i) from collection1 group by year_i order by year_i desc");
SolrStream solrStream = new SolrStream(jetty.url, params);
List<Tuple> tuples = getTuples(solrStream);
@ -802,7 +824,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("numWorkers", 2);
params.put("sql", "select year_i, month_i, sum(item_i) from mytable group by year_i, month_i order by year_i desc, month_i desc");
params.put("sql", "select year_i, month_i, sum(item_i) from collection1 group by year_i, month_i order by year_i desc, month_i desc");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
@ -831,7 +853,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("numWorkers", 2);
params.put("sql", "select year_i, month_i, day_i, sum(item_i) from mytable group by year_i, month_i, day_i order by year_i desc, month_i desc, day_i desc");
params.put("sql", "select year_i, month_i, day_i, sum(item_i) from collection1 group by year_i, month_i, day_i order by year_i desc, month_i desc, day_i desc");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);

View File

@ -857,29 +857,38 @@
Do not change these defaults.
-->
<requestHandler name="/export" class="solr.SearchHandler">
<lst name="invariants">
<str name="rq">{!xport}</str>
<str name="wt">xsort</str>
<str name="distrib">false</str>
</lst>
<requestHandler name="/export" class="solr.SearchHandler">
<lst name="invariants">
<str name="rq">{!xport}</str>
<str name="wt">xsort</str>
<str name="distrib">false</str>
</lst>
<arr name="components">
<str>query</str>
</arr>
</requestHandler>
<arr name="components">
<str>query</str>
</arr>
</requestHandler>
<!--
Distributed Stream processing.
-->
<requestHandler name="/stream" class="solr.StreamHandler">
<lst name="invariants">
<str name="wt">json</str>
<str name="distrib">false</str>
</lst>
</requestHandler>
<requestHandler name="/stream" class="solr.StreamHandler">
<lst name="invariants">
<str name="wt">json</str>
<str name="distrib">false</str>
</lst>
</requestHandler>
<requestHandler name="/sql" class="solr.SQLHandler">
<lst name="invariants">
<str name="wt">json</str>
<str name="distrib">false</str>
</lst>
</requestHandler>
<!-- A Robust Example
@ -942,19 +951,6 @@
</requestHandler>
<!--
Distributed Stream processing.
-->
<requestHandler name="/stream" class="solr.StreamHandler">
<lst name="invariants">
<str name="wt">json</str>
<str name="distrib">false</str>
</lst>
</requestHandler>
<!-- Field Analysis Request Handler
RequestHandler that provides much the same functionality as

View File

@ -910,7 +910,12 @@
</requestHandler>
<requestHandler name="/sql" class="solr.SQLHandler">
<lst name="invariants">
<str name="wt">json</str>
<str name="distrib">false</str>
</lst>
</requestHandler>

View File

@ -48,7 +48,7 @@ public class Tuple implements Cloneable {
EOF = true;
}
if(fields.containsKey("_EXCEPTION_")){
if(fields.containsKey("EXCEPTION")){
EXCEPTION = true;
}
@ -67,7 +67,7 @@ public class Tuple implements Cloneable {
return this.fields.get(key).toString();
}
public String getException(){ return (String)this.fields.get("_EXCEPTION_"); }
public String getException(){ return (String)this.fields.get("EXCEPTION"); }
public Long getLong(Object key) {
Object o = this.fields.get(key);
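The rename from "_EXCEPTION_" to "EXCEPTION" is visible anywhere a Tuple is built by hand. A small sketch of the new contract:

Map m = new HashMap();
m.put("EOF", true);
m.put("EXCEPTION", "sort param field can't be found: blah");
Tuple t = new Tuple(m);
// t.EOF == true, t.EXCEPTION == true
// t.getException() -> "sort param field can't be found: blah"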

View File

@ -289,6 +289,11 @@ public class CloudSolrStream extends TupleStream implements Expressible {
//System.out.println("Connected to zk and got cluster state.");
Collection<Slice> slices = clusterState.getActiveSlices(this.collection);
if(slices == null) {
throw new Exception("Collection not found:"+this.collection);
}
long time = System.currentTimeMillis();
params.put("distrib","false"); // We are the aggregator.
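A missing collection now fails fast with a clear message instead of surfacing later as an opaque failure. Sketch, given a zkHost and params map in scope (the collection name is hypothetical):

CloudSolrStream stream = new CloudSolrStream(zkHost, "no_such_collection", params);
try {
  stream.open();  // now throws: "Collection not found:no_such_collection"
} catch (Exception e) {
  System.err.println(e.getMessage());
}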

View File

@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.common.SolrException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -53,9 +54,9 @@ public class ExceptionStream extends TupleStream {
if(openException != null) {
//There was an exception during the open.
Map fields = new HashMap();
fields.put("_EXCEPTION_", openException.getMessage());
fields.put("EXCEPTION", openException.getMessage());
fields.put("EOF", true);
log.error("Error while opening Stream", openException);
SolrException.log(log, openException);
return new Tuple(fields);
}
@ -63,9 +64,9 @@ public class ExceptionStream extends TupleStream {
return stream.read();
} catch (Exception e) {
Map fields = new HashMap();
fields.put("_EXCEPTION_", e.getMessage());
fields.put("EXCEPTION", e.getMessage());
fields.put("EOF", true);
log.error("Error while reading Stream:" + e);
SolrException.log(log, e);
return new Tuple(fields);
}
}
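ExceptionStream is the trap at the top of the stack: read() never propagates, converting both open-time and read-time failures into a terminal tuple. Usage sketch:

ExceptionStream estream = new ExceptionStream(innerStream);
estream.open();
Tuple t = estream.read();  // failures come back as a tuple, not a throw
if (t.EXCEPTION) {
  System.err.println(t.getException());  // also logged server-side via SolrException.log
}
estream.close();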

View File

@ -114,6 +114,8 @@ public class JSONTupleStream {
String val = parser.getString();
if (key.equals(val)) {
return true;
} else if("error".equals(val)) {
handleError();
}
}
break;
@ -136,6 +138,26 @@ public class JSONTupleStream {
}
}
private void handleError() throws IOException {
for (;;) {
int event = parser.nextEvent();
if(event == JSONParser.STRING) {
String val = parser.getString();
if("msg".equals(val)) {
event = parser.nextEvent();
if(event == JSONParser.STRING) {
String msg = parser.getString();
if(msg != null) {
throw new SolrStream.HandledException(msg);
}
}
}
} else if (event == JSONParser.OBJECT_END) {
throw new IOException("");
}
}
}
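handleError walks the standard Solr error envelope ({"error":{"msg":...,"code":...}}) looking for msg, so /select-style errors now reach the client verbatim. A standalone sketch of the same scan, assuming noggit's JSONParser (which Solr ships):

import java.io.IOException;
import org.noggit.JSONParser;

public class ErrorScanSketch {
  public static void main(String[] args) throws IOException {
    String err = "{\"error\":{\"msg\":\"sort param field can't be found: blah\",\"code\":400}}";
    JSONParser p = new JSONParser(err);
    for (int ev = p.nextEvent(); ev != JSONParser.EOF; ev = p.nextEvent()) {
      if (ev == JSONParser.STRING && "msg".equals(p.getString())) {
        if (p.nextEvent() == JSONParser.STRING) {
          System.out.println(p.getString());  // sort param field can't be found: blah
        }
      }
    }
  }
}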
private void skipArray(String key, boolean deepSearch) throws IOException {
for (;;) {
int event = parser.nextEvent();

View File

@ -205,14 +205,20 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
m.put("EOF", true);
Tuple t = new Tuple(m);
/*
Map<String, Map> metrics = new HashMap();
Iterator<Entry<String,Tuple>> it = this.eofTuples.entrySet().iterator();
while(it.hasNext()) {
Map.Entry<String, Tuple> entry = it.next();
metrics.put(entry.getKey(), entry.getValue().fields);
if(entry.getValue().fields.size() > 1) {
metrics.put(entry.getKey(), entry.getValue().fields);
}
}
t.setMetrics(metrics);
if(metrics.size() > 0) {
t.setMetrics(metrics);
}
*/
return t;
}

View File

@ -170,6 +170,11 @@ public class RollupStream extends TupleStream implements Expressible {
Tuple tuple = tupleStream.read();
if(tuple.EOF) {
if(!finished) {
if(currentMetrics == null) {
return tuple;
}
Map map = new HashMap();
for(Metric metric : currentMetrics) {
map.put(metric.getIdentifier(), metric.getValue());

View File

@ -159,9 +159,9 @@ public class SolrStream extends TupleStream {
return new Tuple(m);
} else {
String msg = (String) fields.get("_EXCEPTION_");
String msg = (String) fields.get("EXCEPTION");
if (msg != null) {
HandledException ioException = new HandledException(this.baseUrl + ":" + msg);
HandledException ioException = new HandledException(msg);
throw ioException;
}
@ -175,11 +175,10 @@ public class SolrStream extends TupleStream {
return new Tuple(fields);
}
} catch (HandledException e) {
throw e;
throw new IOException("--> "+this.baseUrl+":"+e.getMessage());
} catch (Exception e) {
//The Stream source did not provide an exception in a format that the SolrStream could propagate.
e.printStackTrace();
throw new IOException(this.baseUrl+": An exception has occurred on the server, refer to server log for details.");
throw new IOException("--> "+this.baseUrl+": An exception has occurred on the server, refer to server log for details.");
}
}
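From the caller's perspective, a handled server error now reads as a targeted IOException rather than a generic one. Roughly:

try {
  Tuple t = solrStream.read();
} catch (IOException e) {
  // Handled errors:   --> http://host:8983/solr/collection1: sort param field can't be found: blah
  // Unhandled errors: --> http://host:8983/solr/collection1: An exception has occurred on the server, ...
}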

View File

@ -19,12 +19,12 @@ package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.io.Serializable;
import java.io.Writer;
import java.util.List;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public abstract class TupleStream implements Serializable {
@ -33,7 +33,15 @@ public abstract class TupleStream implements Serializable {
public TupleStream() {
}
public static void writeStreamOpen(Writer out) throws IOException {
out.write("{\"docs\":[");
}
public static void writeStreamClose(Writer out) throws IOException {
out.write("]}");
}
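These two statics define the wire framing that TextResponseWriter.writeTupleStream now delegates to. A quick sketch of the framing, assuming java.io.StringWriter:

StringWriter w = new StringWriter();
TupleStream.writeStreamOpen(w);   // writes {"docs":[
w.write("{\"id\":\"1\"},{\"EOF\":true}");
TupleStream.writeStreamClose(w);  // writes ]}
// w.toString() -> {"docs":[{"id":"1"},{"EOF":true}]}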
public abstract void setStreamContext(StreamContext context);
public abstract List<TupleStream> children();

View File

@ -511,9 +511,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
Tuple t = getTuple(estream);
assert(t.EOF);
assert(t.EXCEPTION);
//The /select handler does not return exceptions in the tuple, so the generic exception is returned.
assert(t.getException().contains("An exception has occurred on the server, refer to server log for details."));
assert(t.getException().contains("sort param field can't be found: blah"));
//Test an error that comes originates from the /export handler
paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt","/export");
@ -553,6 +551,18 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
//ParallelStream requires that partitionKeys be set.
assert(t.getException().contains("When numWorkers > 1 partitionKeys must be set."));
//Test an error that originates from the /select handler
paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc", "partitionKeys","a_s");
stream = new CloudSolrStream(zkHost, "collection1", paramsA);
pstream = new ParallelStream(zkHost,"collection1", stream, 2, new FieldComparator("blah", ComparatorOrder.ASCENDING));
estream = new ExceptionStream(pstream);
t = getTuple(estream);
assert(t.EOF);
assert(t.EXCEPTION);
assert(t.getException().contains("sort param field can't be found: blah"));
//Test an error that originates from the /export handler
paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt","/export", "partitionKeys","a_s");
stream = new CloudSolrStream(zkHost, "collection1", paramsA);