diff --git a/solr/core/src/java/org/apache/solr/handler/SQLHandler.java b/solr/core/src/java/org/apache/solr/handler/SQLHandler.java index 277511696cb..9b19c5c820b 100644 --- a/solr/core/src/java/org/apache/solr/handler/SQLHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/SQLHandler.java @@ -41,6 +41,7 @@ import org.apache.solr.client.solrj.io.stream.RankStream; import org.apache.solr.client.solrj.io.stream.RollupStream; import org.apache.solr.client.solrj.io.stream.StreamContext; import org.apache.solr.client.solrj.io.stream.TupleStream; +import org.apache.solr.client.solrj.io.stream.ExceptionStream; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.NamedList; @@ -52,8 +53,10 @@ import org.apache.solr.util.plugin.SolrCoreAware; import java.util.List; import java.util.Map; import java.util.HashMap; -import java.util.Optional; import org.apache.solr.client.solrj.io.stream.metrics.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.facebook.presto.sql.parser.SqlParser; @@ -63,6 +66,8 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware { private String defaultZkhost = null; private String defaultWorkerCollection = null; + private Logger logger = LoggerFactory.getLogger(SQLHandler.class); + public void inform(SolrCore core) { CoreContainer coreContainer = core.getCoreDescriptor().getCoreContainer(); @@ -93,11 +98,17 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware { String workerCollection = params.get("workerCollection", defaultWorkerCollection); String workerZkhost = params.get("workerZkhost",defaultZkhost); StreamContext context = new StreamContext(); - TupleStream tupleStream = SQLTupleStreamParser.parse(sql, tableMappings, numWorkers, workerCollection, workerZkhost); - context.numWorkers = numWorkers; - context.setSolrClientCache(StreamHandler.clientCache); - tupleStream.setStreamContext(context); - rsp.add("tuples", tupleStream); + try { + TupleStream tupleStream = SQLTupleStreamParser.parse(sql, tableMappings, numWorkers, workerCollection, workerZkhost); + context.numWorkers = numWorkers; + context.setSolrClientCache(StreamHandler.clientCache); + tupleStream.setStreamContext(context); + rsp.add("tuples", new ExceptionStream(tupleStream)); + } catch(Exception e) { + //Catch the SQL parsing and query transformation exceptions. + logger.error("Exception parsing SQL", e); + rsp.add("tuples", new StreamHandler.DummyErrorStream(e)); + } } public String getDescription() { diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java index 84169a63c8c..3a0fe7a9a7d 100644 --- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java @@ -20,10 +20,16 @@ package org.apache.solr.handler; import java.io.ByteArrayInputStream; import java.io.ObjectInputStream; import java.net.URLDecoder; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; import org.apache.solr.client.solrj.io.SolrClientCache; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; import org.apache.solr.client.solrj.io.stream.CloudSolrStream; +import org.apache.solr.client.solrj.io.stream.ExceptionStream; import org.apache.solr.client.solrj.io.stream.MergeStream; import org.apache.solr.client.solrj.io.stream.ParallelStream; import org.apache.solr.client.solrj.io.stream.RankStream; @@ -42,12 +48,15 @@ import org.apache.solr.core.SolrCore; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.response.SolrQueryResponse; import org.apache.solr.util.plugin.SolrCoreAware; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class StreamHandler extends RequestHandlerBase implements SolrCoreAware { static SolrClientCache clientCache = new SolrClientCache(); private StreamFactory streamFactory = new StreamFactory(); - + private Logger logger = LoggerFactory.getLogger(StreamHandler.class); + public void inform(SolrCore core) { /* The stream factory will always contain the zkUrl for the given collection @@ -108,15 +117,23 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware { boolean objectSerialize = params.getBool("objectSerialize", false); TupleStream tupleStream = null; - if(objectSerialize) { - String encodedStream = params.get("stream"); - encodedStream = URLDecoder.decode(encodedStream, "UTF-8"); - byte[] bytes = Base64.base64ToByteArray(encodedStream); - ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes); - ObjectInputStream objectInputStream = new ObjectInputStream(byteStream); - tupleStream = (TupleStream)objectInputStream.readObject(); - } else { - tupleStream = this.streamFactory.constructStream(params.get("stream")); + try { + if (objectSerialize) { + String encodedStream = params.get("stream"); + encodedStream = URLDecoder.decode(encodedStream, "UTF-8"); + byte[] bytes = Base64.base64ToByteArray(encodedStream); + ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes); + ObjectInputStream objectInputStream = new ObjectInputStream(byteStream); + tupleStream = (TupleStream) objectInputStream.readObject(); + } else { + tupleStream = this.streamFactory.constructStream(params.get("stream")); + } + } catch (Exception e) { + //Catch exceptions that occur while the stream is being created. This will include streaming expression parse rules. + logger.error("Exception creating TupleStream", e); + rsp.add("tuples", new DummyErrorStream(e)); + + return; } int worker = params.getInt("workerID", 0); @@ -126,7 +143,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware { context.numWorkers = numWorkers; context.setSolrClientCache(clientCache); tupleStream.setStreamContext(context); - rsp.add("tuples", tupleStream); + rsp.add("tuples", new ExceptionStream(tupleStream)); } public String getDescription() { @@ -136,4 +153,37 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware { public String getSource() { return null; } + + + public static class DummyErrorStream extends TupleStream { + private Exception e; + + public DummyErrorStream(Exception e) { + this.e = e; + } + public StreamComparator getStreamSort() { + return null; + } + + public void close() { + } + + public void open() { + } + + public void setStreamContext(StreamContext context) { + } + + public List children() { + return null; + } + + public Tuple read() { + String msg = e.getMessage(); + Map m = new HashMap(); + m.put("EOF", true); + m.put("_EXCEPTION_", msg); + return new Tuple(m); + } + } } diff --git a/solr/core/src/java/org/apache/solr/response/SortingResponseWriter.java b/solr/core/src/java/org/apache/solr/response/SortingResponseWriter.java index 662068385e0..035e0e4d8c4 100644 --- a/solr/core/src/java/org/apache/solr/response/SortingResponseWriter.java +++ b/solr/core/src/java/org/apache/solr/response/SortingResponseWriter.java @@ -76,29 +76,31 @@ public class SortingResponseWriter implements QueryResponseWriter { } return; } + SolrRequestInfo info = SolrRequestInfo.getRequestInfo(); SortSpec sortSpec = info.getResponseBuilder().getSortSpec(); + Exception exception = null; if(sortSpec == null) { - throw new IOException(new SyntaxError("No sort criteria was provided.")); + exception = new IOException(new SyntaxError("No sort criteria was provided.")); } SolrIndexSearcher searcher = req.getSearcher(); Sort sort = searcher.weightSort(sortSpec.getSort()); if(sort == null) { - throw new IOException(new SyntaxError("No sort criteria was provided.")); + exception = new IOException(new SyntaxError("No sort criteria was provided.")); } if(sort.needsScores()) { - throw new IOException(new SyntaxError("Scoring is not currently supported with xsort.")); + exception = new IOException(new SyntaxError("Scoring is not currently supported with xsort.")); } FixedBitSet[] sets = (FixedBitSet[])req.getContext().get("export"); Integer th = (Integer)req.getContext().get("totalHits"); if(sets == null) { - throw new IOException(new SyntaxError("xport RankQuery is required for xsort: rq={!xport}")); + exception = new IOException(new SyntaxError("xport RankQuery is required for xsort: rq={!xport}")); } int totalHits = th.intValue(); @@ -106,20 +108,36 @@ public class SortingResponseWriter implements QueryResponseWriter { String fl = params.get("fl"); if(fl == null) { - throw new IOException(new SyntaxError("export field list (fl) must be specified.")); + exception = new IOException(new SyntaxError("export field list (fl) must be specified.")); } String[] fields = fl.split(","); for(int i=0;i leaves = req.getSearcher().getTopReaderContext().leaves(); SortDoc sortDoc = getSortDoc(req.getSearcher(), sort.getSort()); @@ -224,6 +242,12 @@ public class SortingResponseWriter implements QueryResponseWriter { } } + protected void writeException(Exception e, Writer out) throws IOException{ + out.write("{\"_EXCEPTION_\":\""); + writeStr(e.getMessage(), out); + out.write("\"}"); + } + protected FieldWriter[] getFieldWriters(String[] fields, SolrIndexSearcher searcher) throws IOException { IndexSchema schema = searcher.getSchema(); FieldWriter[] writers = new FieldWriter[fields.length]; @@ -1209,7 +1233,7 @@ public class SortingResponseWriter implements QueryResponseWriter { out.write('"'); } - out.write(cref.toString()); + writeStr(cref.toString(), out); if(!numeric) { out.write('"'); @@ -1293,11 +1317,55 @@ public class SortingResponseWriter implements QueryResponseWriter { out.write('"'); out.write(":"); out.write('"'); - out.write(cref.toString()); + writeStr(cref.toString(), out); out.write('"'); } } + private void writeStr(String val, Writer writer) throws IOException { + for (int i=0; i '#' && ch != '\\' && ch < '\u2028') || ch == ' ') { // fast path + writer.write(ch); + continue; + } + switch(ch) { + case '"': + case '\\': + writer.write('\\'); + writer.write(ch); + break; + case '\r': writer.write('\\'); writer.write('r'); break; + case '\n': writer.write('\\'); writer.write('n'); break; + case '\t': writer.write('\\'); writer.write('t'); break; + case '\b': writer.write('\\'); writer.write('b'); break; + case '\f': writer.write('\\'); writer.write('f'); break; + case '\u2028': // fallthrough + case '\u2029': + unicodeEscape(writer,ch); + break; + // case '/': + default: { + if (ch <= 0x1F) { + unicodeEscape(writer,ch); + } else { + writer.write(ch); + } + } + } + } + } + + private static char[] hexdigits = {'0','1','2','3','4','5','6','7','8','9','a','b','c','d','e','f'}; + protected static void unicodeEscape(Appendable out, int ch) throws IOException { + out.append('\\'); + out.append('u'); + out.append(hexdigits[(ch>>>12) ]); + out.append(hexdigits[(ch>>>8) & 0xf]); + out.append(hexdigits[(ch>>>4) & 0xf]); + out.append(hexdigits[(ch) & 0xf]); + } + public abstract class PriorityQueue { protected int size = 0; protected final int maxSize; diff --git a/solr/core/src/java/org/apache/solr/search/ExportQParserPlugin.java b/solr/core/src/java/org/apache/solr/search/ExportQParserPlugin.java index 1b4ddca2113..b159ca5aa54 100644 --- a/solr/core/src/java/org/apache/solr/search/ExportQParserPlugin.java +++ b/solr/core/src/java/org/apache/solr/search/ExportQParserPlugin.java @@ -25,6 +25,8 @@ import org.apache.lucene.index.*; import org.apache.solr.common.util.NamedList; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.common.params.SolrParams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Map; @@ -33,6 +35,8 @@ import java.util.Set; public class ExportQParserPlugin extends QParserPlugin { public static final String NAME = "xport"; + + Logger logger = LoggerFactory.getLogger(ExportQParserPlugin.class); public void init(NamedList namedList) { } @@ -62,11 +66,10 @@ public class ExportQParserPlugin extends QParserPlugin { private Query mainQuery; private Object id; - public Query clone() { + public RankQuery clone() { ExportQuery clone = new ExportQuery(); clone.id = id; clone.leafCount = leafCount; - clone.mainQuery = mainQuery; return clone; } @@ -79,12 +82,17 @@ public class ExportQParserPlugin extends QParserPlugin { return null; } - public Weight createWeight(IndexSearcher searcher) throws IOException { + public Weight createWeight(IndexSearcher searcher, boolean needsScores) throws IOException{ return mainQuery.createWeight(searcher, true); } public Query rewrite(IndexReader reader) throws IOException { - return this.mainQuery.rewrite(reader); + Query q = mainQuery.rewrite(reader); + if(q == mainQuery) { + return this; + } else { + return clone().wrap(q); + } } public TopDocsCollector getTopDocsCollector(int len, @@ -157,7 +165,11 @@ public class ExportQParserPlugin extends QParserPlugin { } public TopDocs topDocs(int start, int howMany) { + + assert(sets != null); + SolrRequestInfo info = SolrRequestInfo.getRequestInfo(); + SolrQueryRequest req = null; if(info != null && ((req = info.getReq()) != null)) { Map context = req.getContext(); @@ -175,4 +187,5 @@ public class ExportQParserPlugin extends QParserPlugin { return true; // TODO: is this the case? } } + } \ No newline at end of file diff --git a/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java b/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java index 5470c2d7f19..f7fa6649a72 100644 --- a/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java +++ b/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java @@ -17,17 +17,16 @@ package org.apache.solr.handler; * limitations under the License. */ -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; - import com.facebook.presto.sql.parser.SqlParser; import com.facebook.presto.sql.tree.Statement; import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.stream.ExceptionStream; import org.apache.solr.client.solrj.io.stream.SolrStream; import org.apache.solr.client.solrj.io.stream.TupleStream; import org.apache.solr.cloud.AbstractFullDistribZkTestBase; @@ -89,6 +88,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { testPredicate(); testBasicSelect(); testBasicGrouping(); + testSQLException(); testTimeSeriesGrouping(); testParallelBasicGrouping(); testParallelTimeSeriesGrouping(); @@ -308,6 +308,62 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { } } + private void testSQLException() throws Exception { + try { + + CloudJettyRunner jetty = this.cloudJettys.get(0); + + del("*:*"); + + commit(); + + indexr("id", "1", "text", "XXXX XXXX", "str_s", "a", "field_i", "7"); + indexr("id", "2", "text", "XXXX XXXX", "str_s", "b", "field_i", "8"); + indexr("id", "3", "text", "XXXX XXXX", "str_s", "a", "field_i", "20"); + indexr("id", "4", "text", "XXXX XXXX", "str_s", "b", "field_i", "11"); + indexr("id", "5", "text", "XXXX XXXX", "str_s", "c", "field_i", "30"); + indexr("id", "6", "text", "XXXX XXXX", "str_s", "c", "field_i", "40"); + indexr("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50"); + indexr("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60"); + commit(); + Map params = new HashMap(); + params.put(CommonParams.QT, "/sql"); + params.put("sql", "select id, field_i, str_s from mytable where text='XXXX' order by field_iff desc"); + + SolrStream solrStream = new SolrStream(jetty.url, params); + Tuple tuple = getTuple(new ExceptionStream(solrStream)); + assert(tuple.EOF); + assert(tuple.EXCEPTION); + //A parse exception detected before being sent to the search engine + assert(tuple.getException().contains("Fields in the sort spec must be included in the field list")); + + params = new HashMap(); + params.put(CommonParams.QT, "/sql"); + params.put("sql", "select id, field_iff, str_s from mytable where text='XXXX' order by field_iff desc"); + + solrStream = new SolrStream(jetty.url, params); + tuple = getTuple(new ExceptionStream(solrStream)); + assert(tuple.EOF); + assert(tuple.EXCEPTION); + //An exception not detected by the parser thrown from the /select handler + assert(tuple.getException().contains("An exception has occurred on the server, refer to server log for details")); + + params = new HashMap(); + params.put(CommonParams.QT, "/sql"); + params.put("sql", "select str_s, count(*), sum(field_iff), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s having ((sum(field_iff) = 19) AND (min(field_i) = 8))"); + + solrStream = new SolrStream(jetty.url, params); + tuple = getTuple(new ExceptionStream(solrStream)); + assert(tuple.EOF); + assert(tuple.EXCEPTION); + //An exception not detected by the parser thrown from the /export handler + assert(tuple.getException().contains("undefined field:")); + + } finally { + delete(); + } + } + private void testBasicGrouping() throws Exception { try { @@ -840,4 +896,12 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase { tupleStream.close(); return tuples; } + + + protected Tuple getTuple(TupleStream tupleStream) throws IOException { + tupleStream.open(); + Tuple t = tupleStream.read(); + tupleStream.close(); + return t; + } } diff --git a/solr/core/src/test/org/apache/solr/response/TestSortingResponseWriter.java b/solr/core/src/test/org/apache/solr/response/TestSortingResponseWriter.java index 663a47d5d39..0e7f9b1fefe 100644 --- a/solr/core/src/test/org/apache/solr/response/TestSortingResponseWriter.java +++ b/solr/core/src/test/org/apache/solr/response/TestSortingResponseWriter.java @@ -25,6 +25,7 @@ import org.apache.lucene.util.LuceneTestCase.SuppressCodecs; public class TestSortingResponseWriter extends SolrTestCaseJ4 { @BeforeClass public static void beforeClass() throws Exception { + System.setProperty("export.test", "true"); initCore("solrconfig-sortingresponse.xml","schema-sortingresponse.xml"); createIndex(); } @@ -44,7 +45,7 @@ public class TestSortingResponseWriter extends SolrTestCaseJ4 { "doubledv_m", "23232.2", "longdv_m", "43434343434", "longdv_m", "343332", - "stringdv_m", "manchester city", + "stringdv_m", "manchester \"city\"", "stringdv_m", "liverpool", "stringdv_m", "Everton")); @@ -77,7 +78,25 @@ public class TestSortingResponseWriter extends SolrTestCaseJ4 { "doubledv_m", "23232.2", "longdv_m", "43434343434", "longdv_m", "343332", - "stringdv_m", "manchester city", + "stringdv_m", "manchester \"city\"", + "stringdv_m", "liverpool", + "stringdv_m", "everton")); + assertU(commit()); + assertU(adoc("id","8", + "floatdv","2.1", + "intdv", "10000000", + "stringdv", "chello \"world\"", + "longdv", "323223232323", + "doubledv","2344.346", + "intdv_m","100", + "intdv_m","250", + "floatdv_m", "123.321", + "floatdv_m", "345.123", + "doubledv_m", "3444.222", + "doubledv_m", "23232.2", + "longdv_m", "43434343434", + "longdv_m", "343332", + "stringdv_m", "manchester \"city\"", "stringdv_m", "liverpool", "stringdv_m", "everton")); assertU(commit()); @@ -90,15 +109,16 @@ public class TestSortingResponseWriter extends SolrTestCaseJ4 { //Test single value DocValue output String s = h.query(req("q", "id:1", "qt", "/export", "fl", "floatdv,intdv,stringdv,longdv,doubledv", "sort", "intdv asc")); + assertEquals(s, "{\"responseHeader\": {\"status\": 0}, \"response\":{\"numFound\":1, \"docs\":[{\"floatdv\":2.1,\"intdv\":1,\"stringdv\":\"hello world\",\"longdv\":323223232323,\"doubledv\":2344.345}]}}"); //Test null value string: - s = h.query(req("q", "id:7", "qt", "/export", "fl", "floatdv,intdv,stringdv,longdv,doubledv", "sort", "intdv asc")); + s = h.query(req("q", "id:7", "qt", "/export", "fl", "floatdv,intdv,stringdv,longdv,doubledv", "sort", "intdv asc")); assertEquals(s, "{\"responseHeader\": {\"status\": 0}, \"response\":{\"numFound\":1, \"docs\":[{\"floatdv\":2.1,\"intdv\":7,\"stringdv\":\"\",\"longdv\":323223232323,\"doubledv\":2344.345}]}}"); //Test multiValue docValues output - s = h.query(req("q", "id:1", "qt", "/export", "fl", "intdv_m,floatdv_m,doubledv_m,longdv_m,stringdv_m", "sort", "intdv asc")); - assertEquals(s, "{\"responseHeader\": {\"status\": 0}, \"response\":{\"numFound\":1, \"docs\":[{\"intdv_m\":[100,250],\"floatdv_m\":[123.321,345.123],\"doubledv_m\":[3444.222,23232.2],\"longdv_m\":[343332,43434343434],\"stringdv_m\":[\"Everton\",\"liverpool\",\"manchester city\"]}]}}"); + s = h.query(req("q", "id:1", "qt", "/export", "fl", "intdv_m,floatdv_m,doubledv_m,longdv_m,stringdv_m", "sort", "intdv asc")); + assertEquals(s, "{\"responseHeader\": {\"status\": 0}, \"response\":{\"numFound\":1, \"docs\":[{\"intdv_m\":[100,250],\"floatdv_m\":[123.321,345.123],\"doubledv_m\":[3444.222,23232.2],\"longdv_m\":[343332,43434343434],\"stringdv_m\":[\"Everton\",\"liverpool\",\"manchester \\\"city\\\"\"]}]}}"); //Test multiValues docValues output with nulls s = h.query(req("q", "id:7", "qt", "/export", "fl", "intdv_m,floatdv_m,doubledv_m,longdv_m,stringdv_m", "sort", "intdv asc")); @@ -142,5 +162,15 @@ public class TestSortingResponseWriter extends SolrTestCaseJ4 { s = h.query(req("q", "id:(1 2 3)", "qt", "/export", "fl", "intdv", "sort", "doubledv desc")); assertEquals(s, "{\"responseHeader\": {\"status\": 0}, \"response\":{\"numFound\":3, \"docs\":[{\"intdv\":3},{\"intdv\":1},{\"intdv\":2}]}}"); + s = h.query(req("q", "intdv:[2 TO 1000]", "qt", "/export", "fl", "intdv", "sort", "doubledv desc")); + assertEquals(s, "{\"responseHeader\": {\"status\": 0}, \"response\":{\"numFound\":3, \"docs\":[{\"intdv\":3},{\"intdv\":7},{\"intdv\":2}]}}"); + + s = h.query(req("q", "stringdv:blah", "qt", "/export", "fl", "intdv", "sort", "doubledv desc")); + assertEquals(s, "{\"responseHeader\": {\"status\": 0}, \"response\":{\"numFound\":0, \"docs\":[]}}"); + + s = h.query(req("q", "id:8", "qt", "/export", "fl", "stringdv", "sort", "intdv asc")); + assertEquals(s, "{\"responseHeader\": {\"status\": 0}, \"response\":{\"numFound\":1, \"docs\":[{\"stringdv\":\"chello \\\"world\\\"\"}]}}"); + + } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java index 8baa8daa87a..cef647a12cd 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java @@ -38,7 +38,9 @@ public class Tuple implements Cloneable { * metrics/aggregates gathered by underlying streams. * */ - public boolean EOF; + public boolean EOF; + public boolean EXCEPTION; + public Map fields = new HashMap(); public Tuple(Map fields) { @@ -46,6 +48,10 @@ public class Tuple implements Cloneable { EOF = true; } + if(fields.containsKey("_EXCEPTION_")){ + EXCEPTION = true; + } + this.fields.putAll(fields); } @@ -61,6 +67,8 @@ public class Tuple implements Cloneable { return this.fields.get(key).toString(); } + public String getException(){ return (String)this.fields.get("_EXCEPTION_"); } + public Long getLong(Object key) { Object o = this.fields.get(key); if(o instanceof Long) { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExceptionStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExceptionStream.java new file mode 100644 index 00000000000..414e415a5d6 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExceptionStream.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.io.stream; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ExceptionStream extends TupleStream { + + private TupleStream stream; + private Exception openException; + private Logger log = LoggerFactory.getLogger(ExceptionStream.class); + + public ExceptionStream(TupleStream stream) { + this.stream = stream; + } + + public List children() { + return null; + } + + public void open() { + try { + stream.open(); + } catch (Exception e) { + this.openException = e; + } + } + + public Tuple read() { + if(openException != null) { + //There was an exception during the open. + Map fields = new HashMap(); + fields.put("_EXCEPTION_", openException.getMessage()); + fields.put("EOF", true); + log.error("Error while opening Stream", openException); + return new Tuple(fields); + } + + try { + return stream.read(); + } catch (Exception e) { + Map fields = new HashMap(); + fields.put("_EXCEPTION_", e.getMessage()); + fields.put("EOF", true); + log.error("Error while reading Stream:" + e); + return new Tuple(fields); + } + } + + public StreamComparator getStreamSort() { + return this.stream.getStreamSort(); + } + + public void close() throws IOException { + stream.close(); + } + + public void setStreamContext(StreamContext context) { + this.stream.setStreamContext(context); + } +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java index 13e0493a230..8e9de6ca730 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java @@ -30,7 +30,6 @@ import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; - /** * Queries a single Solr instance and maps SolrDocs to a Stream of Tuples. **/ @@ -135,7 +134,11 @@ public class SolrStream extends TupleStream { * */ public void close() throws IOException { - jsonTupleStream.close(); + + if(jsonTupleStream != null) { + jsonTupleStream.close(); + } + if(cache == null) { client.close(); } @@ -146,22 +149,43 @@ public class SolrStream extends TupleStream { **/ public Tuple read() throws IOException { - Map fields = jsonTupleStream.next(); + try { + Map fields = jsonTupleStream.next(); - if(trace) { - fields.put("_CORE_", this.baseUrl); - } + if (fields == null) { + //Return the EOF tuple. + Map m = new HashMap(); + m.put("EOF", true); + return new Tuple(m); + } else { - if(fields == null) { - //Return the EOF tuple. - Map m = new HashMap(); - m.put("EOF", true); - return new Tuple(m); - } else { - if(fieldMappings != null) { - fields = mapFields(fields, fieldMappings); + String msg = (String) fields.get("_EXCEPTION_"); + if (msg != null) { + HandledException ioException = new HandledException(this.baseUrl + ":" + msg); + throw ioException; + } + + if (trace) { + fields.put("_CORE_", this.baseUrl); + } + + if (fieldMappings != null) { + fields = mapFields(fields, fieldMappings); + } + return new Tuple(fields); } - return new Tuple(fields); + } catch (HandledException e) { + throw e; + } catch (Exception e) { + //The Stream source did not provide an exception in a format that the SolrStream could propagate. + e.printStackTrace(); + throw new IOException(this.baseUrl+": An exception has occurred on the server, refer to server log for details."); + } + } + + public static class HandledException extends IOException { + public HandledException(String msg) { + super(msg); } } diff --git a/solr/solrj/src/test-files/solrj/solr/collection1/conf/solrconfig-streaming.xml b/solr/solrj/src/test-files/solrj/solr/collection1/conf/solrconfig-streaming.xml index 2d3d2e64917..0b0c84fc572 100644 --- a/solr/solrj/src/test-files/solrj/solr/collection1/conf/solrconfig-streaming.xml +++ b/solr/solrj/src/test-files/solrj/solr/collection1/conf/solrconfig-streaming.xml @@ -34,6 +34,19 @@ + + + + {!xport} + xsort + false + + + + query + + + diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java index 8ccc3148b73..63a86103d08 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java @@ -96,7 +96,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { super.setUp(); // we expect this time of exception as shards go up and down... //ignoreException(".*"); - + //System.setProperty("export.test", "true"); System.setProperty("numShards", Integer.toString(sliceCount)); } @@ -167,7 +167,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { String zkHost = zkServer.getZkAddress(); streamFactory.withCollectionZkHost("collection1", zkHost); - Map params = mapParams("q","*:*","fl","id , a_s , a_i , a_f","sort", "a_f asc , a_i asc"); + Map params = mapParams("q", "*:*", "fl", "id , a_s , a_i , a_f", "sort", "a_f asc , a_i asc"); //CloudSolrStream compares the values of the sort with the fl field. //The constructor will throw an exception if the sort fields do not the @@ -508,6 +508,86 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { commit(); } + + private void testExceptionStream() throws Exception { + + indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1"); + indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2"); + indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3"); + indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4"); + indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5"); + indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6"); + indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7"); + indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8"); + indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9"); + indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10"); + + commit(); + + String zkHost = zkServer.getZkAddress(); + + + //Test an error that comes originates from the /select handler + Map paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc"); + CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA); + ExceptionStream estream = new ExceptionStream(stream); + Tuple t = getTuple(estream); + assert(t.EOF); + assert(t.EXCEPTION); + //The /select handler does not return exceptions in tuple so the generic exception is returned. + assert(t.getException().contains("An exception has occurred on the server, refer to server log for details.")); + + + //Test an error that comes originates from the /export handler + paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt","/export"); + stream = new CloudSolrStream(zkHost, "collection1", paramsA); + estream = new ExceptionStream(stream); + t = getTuple(estream); + assert(t.EOF); + assert(t.EXCEPTION); + //The /export handler will pass through a real exception. + assert(t.getException().contains("undefined field:")); + } + + private void testParallelExceptionStream() throws Exception { + + indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1"); + indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2"); + indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3"); + indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4"); + indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5"); + indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6"); + indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7"); + indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8"); + indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9"); + indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10"); + + commit(); + + String zkHost = zkServer.getZkAddress(); + + Map paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc"); + CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA); + ParallelStream pstream = new ParallelStream(zkHost,"collection1", stream, 2, new FieldComparator("blah", ComparatorOrder.ASCENDING)); + ExceptionStream estream = new ExceptionStream(pstream); + Tuple t = getTuple(estream); + assert(t.EOF); + assert(t.EXCEPTION); + //ParallelStream requires that partitionKeys be set. + assert(t.getException().contains("When numWorkers > 1 partitionKeys must be set.")); + + //Test an error that originates from the /export handler + paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt","/export", "partitionKeys","a_s"); + stream = new CloudSolrStream(zkHost, "collection1", paramsA); + pstream = new ParallelStream(zkHost,"collection1", stream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING)); + estream = new ExceptionStream(pstream); + t = getTuple(estream); + assert(t.EOF); + assert(t.EXCEPTION); + //The /export handler will pass through a real exception. + assert(t.getException().contains("undefined field:")); + } + private void testRollupStream() throws Exception { indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1"); @@ -524,7 +604,6 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { commit(); String zkHost = zkServer.getZkAddress(); - streamFactory.withCollectionZkHost("collection1", zkHost); Map paramsA = mapParams("q","*:*","fl","a_s,a_i,a_f","sort", "a_s asc"); CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA); @@ -647,7 +726,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { String zkHost = zkServer.getZkAddress(); streamFactory.withCollectionZkHost("collection1", zkHost); - Map paramsA = mapParams("q","*:*","fl","a_s,a_i,a_f","sort", "a_s asc", "partitionKeys", "a_s"); + Map paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "partitionKeys", "a_s"); CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA); Bucket[] buckets = {new Bucket("a_s")}; @@ -1084,12 +1163,14 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { testReducerStream(); testRollupStream(); testZeroReducerStream(); + testExceptionStream(); testParallelEOF(); testParallelUniqueStream(); testParallelRankStream(); testParallelMergeStream(); testParallelRollupStream(); testParallelReducerStream(); + testParallelExceptionStream(); testZeroParallelReducerStream(); } @@ -1123,6 +1204,14 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { return tuples; } + protected Tuple getTuple(TupleStream tupleStream) throws IOException { + tupleStream.open(); + Tuple t = tupleStream.read(); + tupleStream.close(); + return t; + } + + protected boolean assertOrder(List tuples, int... ids) throws Exception { int i = 0; for(int val : ids) {