SOLR-7441, SOLR-7647: Improve overall robustness of the Streaming stack: Streaming API, Streaming Expressions, Parallel SQL

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1689045 13f79535-47bb-0310-9956-ffa450edef68
Joel Bernstein 2015-07-03 16:32:07 +00:00
parent 07f8c4ec5b
commit a4e7ab3796
11 changed files with 511 additions and 57 deletions
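In short: instead of letting exceptions escape the handlers, errors are now returned in-band as a final tuple carrying an _EXCEPTION_ field alongside EOF (via the new ExceptionStream and DummyErrorStream classes below). A minimal client-side sketch of consuming such a stream — the class and method names are taken from this diff, while the construction of the inner stream is assumed:

import java.io.IOException;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.ExceptionStream;
import org.apache.solr.client.solrj.io.stream.TupleStream;

public class ErrorTupleSketch {
  // Drains a stream, reporting an in-band error tuple if one arrives.
  static void drain(TupleStream inner) throws IOException {
    TupleStream stream = new ExceptionStream(inner);
    stream.open();
    try {
      while (true) {
        Tuple t = stream.read();
        if (t.EXCEPTION) {                 // tuple carries the _EXCEPTION_ field
          System.err.println("Stream error: " + t.getException());
          break;                           // error tuples are also marked EOF
        }
        if (t.EOF) {
          break;
        }
        // process the tuple here ...
      }
    } finally {
      stream.close();
    }
  }
}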

View File

@@ -41,6 +41,7 @@ import org.apache.solr.client.solrj.io.stream.RankStream;
import org.apache.solr.client.solrj.io.stream.RollupStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.ExceptionStream;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
@@ -52,8 +53,10 @@ import org.apache.solr.util.plugin.SolrCoreAware;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.Optional;
import org.apache.solr.client.solrj.io.stream.metrics.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.facebook.presto.sql.parser.SqlParser;
@@ -63,6 +66,8 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
private String defaultZkhost = null;
private String defaultWorkerCollection = null;
private Logger logger = LoggerFactory.getLogger(SQLHandler.class);
public void inform(SolrCore core) {
CoreContainer coreContainer = core.getCoreDescriptor().getCoreContainer();
@@ -93,11 +98,17 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
String workerCollection = params.get("workerCollection", defaultWorkerCollection);
String workerZkhost = params.get("workerZkhost",defaultZkhost);
StreamContext context = new StreamContext();
TupleStream tupleStream = SQLTupleStreamParser.parse(sql, tableMappings, numWorkers, workerCollection, workerZkhost);
context.numWorkers = numWorkers;
context.setSolrClientCache(StreamHandler.clientCache);
tupleStream.setStreamContext(context);
rsp.add("tuples", tupleStream);
try {
TupleStream tupleStream = SQLTupleStreamParser.parse(sql, tableMappings, numWorkers, workerCollection, workerZkhost);
context.numWorkers = numWorkers;
context.setSolrClientCache(StreamHandler.clientCache);
tupleStream.setStreamContext(context);
rsp.add("tuples", new ExceptionStream(tupleStream));
} catch(Exception e) {
//Catch the SQL parsing and query transformation exceptions.
logger.error("Exception parsing SQL", e);
rsp.add("tuples", new StreamHandler.DummyErrorStream(e));
}
}
public String getDescription() {

View File

@@ -20,10 +20,16 @@ package org.apache.solr.handler;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.net.URLDecoder;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.ExceptionStream;
import org.apache.solr.client.solrj.io.stream.MergeStream;
import org.apache.solr.client.solrj.io.stream.ParallelStream;
import org.apache.solr.client.solrj.io.stream.RankStream;
@@ -42,12 +48,15 @@ import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.util.plugin.SolrCoreAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
static SolrClientCache clientCache = new SolrClientCache();
private StreamFactory streamFactory = new StreamFactory();
private Logger logger = LoggerFactory.getLogger(StreamHandler.class);
public void inform(SolrCore core) {
/* The stream factory will always contain the zkUrl for the given collection
@@ -108,15 +117,23 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
boolean objectSerialize = params.getBool("objectSerialize", false);
TupleStream tupleStream = null;
if(objectSerialize) {
String encodedStream = params.get("stream");
encodedStream = URLDecoder.decode(encodedStream, "UTF-8");
byte[] bytes = Base64.base64ToByteArray(encodedStream);
ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
ObjectInputStream objectInputStream = new ObjectInputStream(byteStream);
tupleStream = (TupleStream)objectInputStream.readObject();
} else {
tupleStream = this.streamFactory.constructStream(params.get("stream"));
try {
if (objectSerialize) {
String encodedStream = params.get("stream");
encodedStream = URLDecoder.decode(encodedStream, "UTF-8");
byte[] bytes = Base64.base64ToByteArray(encodedStream);
ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
ObjectInputStream objectInputStream = new ObjectInputStream(byteStream);
tupleStream = (TupleStream) objectInputStream.readObject();
} else {
tupleStream = this.streamFactory.constructStream(params.get("stream"));
}
} catch (Exception e) {
//Catch exceptions that occur while the stream is being created. This includes streaming expression parse errors.
logger.error("Exception creating TupleStream", e);
rsp.add("tuples", new DummyErrorStream(e));
return;
}
int worker = params.getInt("workerID", 0);
@@ -126,7 +143,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
context.numWorkers = numWorkers;
context.setSolrClientCache(clientCache);
tupleStream.setStreamContext(context);
rsp.add("tuples", tupleStream);
rsp.add("tuples", new ExceptionStream(tupleStream));
}
public String getDescription() {
@@ -136,4 +153,37 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
public String getSource() {
return null;
}
public static class DummyErrorStream extends TupleStream {
private Exception e;
public DummyErrorStream(Exception e) {
this.e = e;
}
public StreamComparator getStreamSort() {
return null;
}
public void close() {
}
public void open() {
}
public void setStreamContext(StreamContext context) {
}
public List<TupleStream> children() {
return null;
}
public Tuple read() {
String msg = e.getMessage();
Map m = new HashMap();
m.put("EOF", true);
m.put("_EXCEPTION_", msg);
return new Tuple(m);
}
}
}

View File

@@ -76,29 +76,31 @@ public class SortingResponseWriter implements QueryResponseWriter {
}
return;
}
SolrRequestInfo info = SolrRequestInfo.getRequestInfo();
SortSpec sortSpec = info.getResponseBuilder().getSortSpec();
Exception exception = null;
if(sortSpec == null) {
throw new IOException(new SyntaxError("No sort criteria was provided."));
exception = new IOException(new SyntaxError("No sort criteria was provided."));
}
SolrIndexSearcher searcher = req.getSearcher();
Sort sort = searcher.weightSort(sortSpec.getSort());
if(sort == null) {
throw new IOException(new SyntaxError("No sort criteria was provided."));
exception = new IOException(new SyntaxError("No sort criteria was provided."));
}
if(sort.needsScores()) {
throw new IOException(new SyntaxError("Scoring is not currently supported with xsort."));
exception = new IOException(new SyntaxError("Scoring is not currently supported with xsort."));
}
FixedBitSet[] sets = (FixedBitSet[])req.getContext().get("export");
Integer th = (Integer)req.getContext().get("totalHits");
if(sets == null) {
throw new IOException(new SyntaxError("xport RankQuery is required for xsort: rq={!xport}"));
exception = new IOException(new SyntaxError("xport RankQuery is required for xsort: rq={!xport}"));
}
int totalHits = th.intValue();
@@ -106,20 +108,36 @@ public class SortingResponseWriter implements QueryResponseWriter {
String fl = params.get("fl");
if(fl == null) {
throw new IOException(new SyntaxError("export field list (fl) must be specified."));
exception = new IOException(new SyntaxError("export field list (fl) must be specified."));
}
String[] fields = fl.split(",");
for(int i=0;i<fields.length; i++) {
if(fields[i].trim().equals("score")) {
throw new IOException(new SyntaxError("Scoring is not currently supported with xsort."));
exception = new IOException(new SyntaxError("Scoring is not currently supported with xsort."));
break;
}
}
FieldWriter[] fieldWriters = getFieldWriters(fields, req.getSearcher());
FieldWriter[] fieldWriters = null;
try {
fieldWriters = getFieldWriters(fields, req.getSearcher());
}catch(Exception e) {
exception = e;
}
writer.write("{\"responseHeader\": {\"status\": 0}, \"response\":{\"numFound\":"+totalHits+", \"docs\":[");
if(exception != null) {
//We have an exception. Send it back to the client and return.
writeException(exception, writer);
writer.write("]}}");
writer.flush();
return;
}
//Write the data.
List<LeafReaderContext> leaves = req.getSearcher().getTopReaderContext().leaves();
SortDoc sortDoc = getSortDoc(req.getSearcher(), sort.getSort());
@@ -224,6 +242,12 @@ public class SortingResponseWriter implements QueryResponseWriter {
}
}
protected void writeException(Exception e, Writer out) throws IOException{
out.write("{\"_EXCEPTION_\":\"");
writeStr(e.getMessage(), out);
out.write("\"}");
}
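// For reference (illustrative only): combined with the header and footer writes
// above, an error response from the /export handler takes the shape:
//   {"responseHeader": {"status": 0}, "response":{"numFound":0, "docs":[
//     {"_EXCEPTION_":"export field list (fl) must be specified."}]}}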
protected FieldWriter[] getFieldWriters(String[] fields, SolrIndexSearcher searcher) throws IOException {
IndexSchema schema = searcher.getSchema();
FieldWriter[] writers = new FieldWriter[fields.length];
@@ -1209,7 +1233,7 @@ public class SortingResponseWriter implements QueryResponseWriter {
out.write('"');
}
out.write(cref.toString());
writeStr(cref.toString(), out);
if(!numeric) {
out.write('"');
@@ -1293,11 +1317,55 @@ public class SortingResponseWriter implements QueryResponseWriter {
out.write('"');
out.write(":");
out.write('"');
out.write(cref.toString());
writeStr(cref.toString(), out);
out.write('"');
}
}
private void writeStr(String val, Writer writer) throws IOException {
for (int i=0; i<val.length(); i++) {
char ch = val.charAt(i);
if ((ch > '#' && ch != '\\' && ch < '\u2028') || ch == ' ') { // fast path
writer.write(ch);
continue;
}
switch(ch) {
case '"':
case '\\':
writer.write('\\');
writer.write(ch);
break;
case '\r': writer.write('\\'); writer.write('r'); break;
case '\n': writer.write('\\'); writer.write('n'); break;
case '\t': writer.write('\\'); writer.write('t'); break;
case '\b': writer.write('\\'); writer.write('b'); break;
case '\f': writer.write('\\'); writer.write('f'); break;
case '\u2028': // fallthrough
case '\u2029':
unicodeEscape(writer,ch);
break;
// case '/':
default: {
if (ch <= 0x1F) {
unicodeEscape(writer,ch);
} else {
writer.write(ch);
}
}
}
}
}
private static char[] hexdigits = {'0','1','2','3','4','5','6','7','8','9','a','b','c','d','e','f'};
protected static void unicodeEscape(Appendable out, int ch) throws IOException {
out.append('\\');
out.append('u');
out.append(hexdigits[(ch>>>12) ]);
out.append(hexdigits[(ch>>>8) & 0xf]);
out.append(hexdigits[(ch>>>4) & 0xf]);
out.append(hexdigits[(ch) & 0xf]);
}
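// Illustrative examples of the escaping implemented by writeStr/unicodeEscape above:
//   manchester "city"   ->  manchester \"city\"   (quote and backslash get a backslash prefix)
//   a literal newline   ->  \n                    (\r \t \b \f likewise use short escapes)
//   U+2028 and U+2029   ->  \u2028 and \u2029     (JavaScript-unsafe separators)
//   other chars <= 0x1F ->  \u001f etc.           (remaining control characters)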
public abstract class PriorityQueue<T> {
protected int size = 0;
protected final int maxSize;

View File

@@ -25,6 +25,8 @@ import org.apache.lucene.index.*;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.common.params.SolrParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
@@ -33,6 +35,8 @@ import java.util.Set;
public class ExportQParserPlugin extends QParserPlugin {
public static final String NAME = "xport";
Logger logger = LoggerFactory.getLogger(ExportQParserPlugin.class);
public void init(NamedList namedList) {
}
@@ -62,11 +66,10 @@ public class ExportQParserPlugin extends QParserPlugin {
private Query mainQuery;
private Object id;
public Query clone() {
public RankQuery clone() {
ExportQuery clone = new ExportQuery();
clone.id = id;
clone.leafCount = leafCount;
clone.mainQuery = mainQuery;
return clone;
}
@@ -79,12 +82,17 @@ public class ExportQParserPlugin extends QParserPlugin {
return null;
}
public Weight createWeight(IndexSearcher searcher) throws IOException {
public Weight createWeight(IndexSearcher searcher, boolean needsScores) throws IOException{
return mainQuery.createWeight(searcher, true);
}
public Query rewrite(IndexReader reader) throws IOException {
return this.mainQuery.rewrite(reader);
Query q = mainQuery.rewrite(reader);
if(q == mainQuery) {
return this;
} else {
return clone().wrap(q);
}
}
public TopDocsCollector getTopDocsCollector(int len,
@@ -157,7 +165,11 @@ public class ExportQParserPlugin extends QParserPlugin {
}
public TopDocs topDocs(int start, int howMany) {
assert(sets != null);
SolrRequestInfo info = SolrRequestInfo.getRequestInfo();
SolrQueryRequest req = null;
if(info != null && ((req = info.getReq()) != null)) {
Map context = req.getContext();
@@ -175,4 +187,5 @@ public class ExportQParserPlugin extends QParserPlugin {
return true; // TODO: is this the case?
}
}
}

View File

@@ -17,17 +17,16 @@ package org.apache.solr.handler;
* limitations under the License.
*/
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.facebook.presto.sql.parser.SqlParser;
import com.facebook.presto.sql.tree.Statement;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.ExceptionStream;
import org.apache.solr.client.solrj.io.stream.SolrStream;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
@@ -89,6 +88,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
testPredicate();
testBasicSelect();
testBasicGrouping();
testSQLException();
testTimeSeriesGrouping();
testParallelBasicGrouping();
testParallelTimeSeriesGrouping();
@@ -308,6 +308,62 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
}
}
private void testSQLException() throws Exception {
try {
CloudJettyRunner jetty = this.cloudJettys.get(0);
del("*:*");
commit();
indexr("id", "1", "text", "XXXX XXXX", "str_s", "a", "field_i", "7");
indexr("id", "2", "text", "XXXX XXXX", "str_s", "b", "field_i", "8");
indexr("id", "3", "text", "XXXX XXXX", "str_s", "a", "field_i", "20");
indexr("id", "4", "text", "XXXX XXXX", "str_s", "b", "field_i", "11");
indexr("id", "5", "text", "XXXX XXXX", "str_s", "c", "field_i", "30");
indexr("id", "6", "text", "XXXX XXXX", "str_s", "c", "field_i", "40");
indexr("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50");
indexr("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60");
commit();
Map params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("sql", "select id, field_i, str_s from mytable where text='XXXX' order by field_iff desc");
SolrStream solrStream = new SolrStream(jetty.url, params);
Tuple tuple = getTuple(new ExceptionStream(solrStream));
assert(tuple.EOF);
assert(tuple.EXCEPTION);
//A parse exception detected before the query is sent to the search engine
assert(tuple.getException().contains("Fields in the sort spec must be included in the field list"));
params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("sql", "select id, field_iff, str_s from mytable where text='XXXX' order by field_iff desc");
solrStream = new SolrStream(jetty.url, params);
tuple = getTuple(new ExceptionStream(solrStream));
assert(tuple.EOF);
assert(tuple.EXCEPTION);
//An exception not detected by the parser, thrown from the /select handler
assert(tuple.getException().contains("An exception has occurred on the server, refer to server log for details"));
params = new HashMap();
params.put(CommonParams.QT, "/sql");
params.put("sql", "select str_s, count(*), sum(field_iff), min(field_i), max(field_i), avg(field_i) from mytable where text='XXXX' group by str_s having ((sum(field_iff) = 19) AND (min(field_i) = 8))");
solrStream = new SolrStream(jetty.url, params);
tuple = getTuple(new ExceptionStream(solrStream));
assert(tuple.EOF);
assert(tuple.EXCEPTION);
//An exception not detected by the parser, thrown from the /export handler
assert(tuple.getException().contains("undefined field:"));
} finally {
delete();
}
}
private void testBasicGrouping() throws Exception {
try {
@@ -840,4 +896,12 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
tupleStream.close();
return tuples;
}
protected Tuple getTuple(TupleStream tupleStream) throws IOException {
tupleStream.open();
Tuple t = tupleStream.read();
tupleStream.close();
return t;
}
}

View File

@@ -25,6 +25,7 @@ import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
public class TestSortingResponseWriter extends SolrTestCaseJ4 {
@BeforeClass
public static void beforeClass() throws Exception {
System.setProperty("export.test", "true");
initCore("solrconfig-sortingresponse.xml","schema-sortingresponse.xml");
createIndex();
}
@@ -44,7 +45,7 @@ public class TestSortingResponseWriter extends SolrTestCaseJ4 {
"doubledv_m", "23232.2",
"longdv_m", "43434343434",
"longdv_m", "343332",
"stringdv_m", "manchester city",
"stringdv_m", "manchester \"city\"",
"stringdv_m", "liverpool",
"stringdv_m", "Everton"));
@@ -77,7 +78,25 @@ public class TestSortingResponseWriter extends SolrTestCaseJ4 {
"doubledv_m", "23232.2",
"longdv_m", "43434343434",
"longdv_m", "343332",
"stringdv_m", "manchester city",
"stringdv_m", "manchester \"city\"",
"stringdv_m", "liverpool",
"stringdv_m", "everton"));
assertU(commit());
assertU(adoc("id","8",
"floatdv","2.1",
"intdv", "10000000",
"stringdv", "chello \"world\"",
"longdv", "323223232323",
"doubledv","2344.346",
"intdv_m","100",
"intdv_m","250",
"floatdv_m", "123.321",
"floatdv_m", "345.123",
"doubledv_m", "3444.222",
"doubledv_m", "23232.2",
"longdv_m", "43434343434",
"longdv_m", "343332",
"stringdv_m", "manchester \"city\"",
"stringdv_m", "liverpool",
"stringdv_m", "everton"));
assertU(commit());
@@ -90,15 +109,16 @@ public class TestSortingResponseWriter extends SolrTestCaseJ4 {
//Test single value DocValue output
String s = h.query(req("q", "id:1", "qt", "/export", "fl", "floatdv,intdv,stringdv,longdv,doubledv", "sort", "intdv asc"));
assertEquals(s, "{\"responseHeader\": {\"status\": 0}, \"response\":{\"numFound\":1, \"docs\":[{\"floatdv\":2.1,\"intdv\":1,\"stringdv\":\"hello world\",\"longdv\":323223232323,\"doubledv\":2344.345}]}}");
//Test null value string:
s = h.query(req("q", "id:7", "qt", "/export", "fl", "floatdv,intdv,stringdv,longdv,doubledv", "sort", "intdv asc"));
s = h.query(req("q", "id:7", "qt", "/export", "fl", "floatdv,intdv,stringdv,longdv,doubledv", "sort", "intdv asc"));
assertEquals(s, "{\"responseHeader\": {\"status\": 0}, \"response\":{\"numFound\":1, \"docs\":[{\"floatdv\":2.1,\"intdv\":7,\"stringdv\":\"\",\"longdv\":323223232323,\"doubledv\":2344.345}]}}");
//Test multiValue docValues output
s = h.query(req("q", "id:1", "qt", "/export", "fl", "intdv_m,floatdv_m,doubledv_m,longdv_m,stringdv_m", "sort", "intdv asc"));
assertEquals(s, "{\"responseHeader\": {\"status\": 0}, \"response\":{\"numFound\":1, \"docs\":[{\"intdv_m\":[100,250],\"floatdv_m\":[123.321,345.123],\"doubledv_m\":[3444.222,23232.2],\"longdv_m\":[343332,43434343434],\"stringdv_m\":[\"Everton\",\"liverpool\",\"manchester city\"]}]}}");
s = h.query(req("q", "id:1", "qt", "/export", "fl", "intdv_m,floatdv_m,doubledv_m,longdv_m,stringdv_m", "sort", "intdv asc"));
assertEquals(s, "{\"responseHeader\": {\"status\": 0}, \"response\":{\"numFound\":1, \"docs\":[{\"intdv_m\":[100,250],\"floatdv_m\":[123.321,345.123],\"doubledv_m\":[3444.222,23232.2],\"longdv_m\":[343332,43434343434],\"stringdv_m\":[\"Everton\",\"liverpool\",\"manchester \\\"city\\\"\"]}]}}");
//Test multiValues docValues output with nulls
s = h.query(req("q", "id:7", "qt", "/export", "fl", "intdv_m,floatdv_m,doubledv_m,longdv_m,stringdv_m", "sort", "intdv asc"));
@@ -142,5 +162,15 @@ public class TestSortingResponseWriter extends SolrTestCaseJ4 {
s = h.query(req("q", "id:(1 2 3)", "qt", "/export", "fl", "intdv", "sort", "doubledv desc"));
assertEquals(s, "{\"responseHeader\": {\"status\": 0}, \"response\":{\"numFound\":3, \"docs\":[{\"intdv\":3},{\"intdv\":1},{\"intdv\":2}]}}");
s = h.query(req("q", "intdv:[2 TO 1000]", "qt", "/export", "fl", "intdv", "sort", "doubledv desc"));
assertEquals(s, "{\"responseHeader\": {\"status\": 0}, \"response\":{\"numFound\":3, \"docs\":[{\"intdv\":3},{\"intdv\":7},{\"intdv\":2}]}}");
s = h.query(req("q", "stringdv:blah", "qt", "/export", "fl", "intdv", "sort", "doubledv desc"));
assertEquals(s, "{\"responseHeader\": {\"status\": 0}, \"response\":{\"numFound\":0, \"docs\":[]}}");
s = h.query(req("q", "id:8", "qt", "/export", "fl", "stringdv", "sort", "intdv asc"));
assertEquals(s, "{\"responseHeader\": {\"status\": 0}, \"response\":{\"numFound\":1, \"docs\":[{\"stringdv\":\"chello \\\"world\\\"\"}]}}");
}
}

View File

@@ -38,7 +38,9 @@ public class Tuple implements Cloneable {
* metrics/aggregates gathered by underlying streams.
* */
public boolean EOF;
public boolean EXCEPTION;
public Map fields = new HashMap();
public Tuple(Map fields) {
@@ -46,6 +48,10 @@ public class Tuple implements Cloneable {
EOF = true;
}
if(fields.containsKey("_EXCEPTION_")){
EXCEPTION = true;
}
this.fields.putAll(fields);
}
@@ -61,6 +67,8 @@ public class Tuple implements Cloneable {
return this.fields.get(key).toString();
}
public String getException(){ return (String)this.fields.get("_EXCEPTION_"); }
public Long getLong(Object key) {
Object o = this.fields.get(key);
if(o instanceof Long) {

View File

@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ExceptionStream extends TupleStream {
private TupleStream stream;
private Exception openException;
private Logger log = LoggerFactory.getLogger(ExceptionStream.class);
public ExceptionStream(TupleStream stream) {
this.stream = stream;
}
public List<TupleStream> children() {
return null;
}
public void open() {
try {
stream.open();
} catch (Exception e) {
this.openException = e;
}
}
public Tuple read() {
if(openException != null) {
//There was an exception during the open.
Map fields = new HashMap();
fields.put("_EXCEPTION_", openException.getMessage());
fields.put("EOF", true);
log.error("Error while opening Stream", openException);
return new Tuple(fields);
}
try {
return stream.read();
} catch (Exception e) {
Map fields = new HashMap();
fields.put("_EXCEPTION_", e.getMessage());
fields.put("EOF", true);
log.error("Error while reading Stream:" + e);
return new Tuple(fields);
}
}
public StreamComparator getStreamSort() {
return this.stream.getStreamSort();
}
public void close() throws IOException {
stream.close();
}
public void setStreamContext(StreamContext context) {
this.stream.setStreamContext(context);
}
}
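To make the contract above concrete, a self-contained sketch: wrapping a stream whose open() fails yields a single error tuple rather than a thrown exception. The failing inner stream here is hypothetical, written only for illustration; the abstract methods it implements are the same ones DummyErrorStream implements in this diff:

import java.io.IOException;
import java.util.List;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;

public class ExceptionStreamSketch {
  public static void main(String[] args) throws IOException {
    // Hypothetical inner stream whose open() always fails.
    TupleStream failing = new TupleStream() {
      public void setStreamContext(StreamContext context) {}
      public List<TupleStream> children() { return null; }
      public void open() throws IOException { throw new IOException("boom"); }
      public void close() {}
      public Tuple read() { return null; }
      public StreamComparator getStreamSort() { return null; }
    };

    ExceptionStream stream = new ExceptionStream(failing);
    stream.open();              // the IOException is captured, not thrown
    Tuple t = stream.read();    // first read() returns the error tuple
    System.out.println(t.EOF + " " + t.EXCEPTION + " " + t.getException());
    // prints: true true boom
    stream.close();
  }
}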

View File

@@ -30,7 +30,6 @@ import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
/**
* Queries a single Solr instance and maps SolrDocs to a Stream of Tuples.
**/
@@ -135,7 +134,11 @@ public class SolrStream extends TupleStream {
* */
public void close() throws IOException {
jsonTupleStream.close();
if(jsonTupleStream != null) {
jsonTupleStream.close();
}
if(cache == null) {
client.close();
}
@@ -146,22 +149,43 @@ public class SolrStream extends TupleStream {
**/
public Tuple read() throws IOException {
Map fields = jsonTupleStream.next();
try {
Map fields = jsonTupleStream.next();
if(trace) {
fields.put("_CORE_", this.baseUrl);
}
if (fields == null) {
//Return the EOF tuple.
Map m = new HashMap();
m.put("EOF", true);
return new Tuple(m);
} else {
if(fields == null) {
//Return the EOF tuple.
Map m = new HashMap();
m.put("EOF", true);
return new Tuple(m);
} else {
if(fieldMappings != null) {
fields = mapFields(fields, fieldMappings);
String msg = (String) fields.get("_EXCEPTION_");
if (msg != null) {
HandledException ioException = new HandledException(this.baseUrl + ":" + msg);
throw ioException;
}
if (trace) {
fields.put("_CORE_", this.baseUrl);
}
if (fieldMappings != null) {
fields = mapFields(fields, fieldMappings);
}
return new Tuple(fields);
}
return new Tuple(fields);
} catch (HandledException e) {
throw e;
} catch (Exception e) {
//The Stream source did not provide an exception in a format that the SolrStream could propagate.
e.printStackTrace();
throw new IOException(this.baseUrl+": An exception has occurred on the server, refer to server log for details.");
}
}
public static class HandledException extends IOException {
public HandledException(String msg) {
super(msg);
}
}

View File

@@ -34,6 +34,19 @@
</updateLog>
</updateHandler>
<requestHandler name="/export" class="solr.SearchHandler">
<lst name="invariants">
<str name="rq">{!xport}</str>
<str name="wt">xsort</str>
<str name="distrib">false</str>
</lst>
<arr name="components">
<str>query</str>
</arr>
</requestHandler>
<!--
Distributed Stream processing.
-->

View File

@@ -96,7 +96,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
super.setUp();
// we expect this type of exception as shards go up and down...
//ignoreException(".*");
//System.setProperty("export.test", "true");
System.setProperty("numShards", Integer.toString(sliceCount));
}
@@ -167,7 +167,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
String zkHost = zkServer.getZkAddress();
streamFactory.withCollectionZkHost("collection1", zkHost);
Map params = mapParams("q","*:*","fl","id , a_s , a_i , a_f","sort", "a_f asc , a_i asc");
Map params = mapParams("q", "*:*", "fl", "id , a_s , a_i , a_f", "sort", "a_f asc , a_i asc");
//CloudSolrStream compares the values of the sort with the fl field.
//The constructor will throw an exception if the sort fields do not match the fl fields.
@@ -508,6 +508,86 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
commit();
}
private void testExceptionStream() throws Exception {
indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
commit();
String zkHost = zkServer.getZkAddress();
//Test an error that originates from the /select handler
Map paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
ExceptionStream estream = new ExceptionStream(stream);
Tuple t = getTuple(estream);
assert(t.EOF);
assert(t.EXCEPTION);
//The /select handler does not return exceptions in tuple format, so the generic exception is returned.
assert(t.getException().contains("An exception has occurred on the server, refer to server log for details."));
//Test an error that originates from the /export handler
paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt","/export");
stream = new CloudSolrStream(zkHost, "collection1", paramsA);
estream = new ExceptionStream(stream);
t = getTuple(estream);
assert(t.EOF);
assert(t.EXCEPTION);
//The /export handler will pass through a real exception.
assert(t.getException().contains("undefined field:"));
}
private void testParallelExceptionStream() throws Exception {
indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
commit();
String zkHost = zkServer.getZkAddress();
Map paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
ParallelStream pstream = new ParallelStream(zkHost,"collection1", stream, 2, new FieldComparator("blah", ComparatorOrder.ASCENDING));
ExceptionStream estream = new ExceptionStream(pstream);
Tuple t = getTuple(estream);
assert(t.EOF);
assert(t.EXCEPTION);
//ParallelStream requires that partitionKeys be set.
assert(t.getException().contains("When numWorkers > 1 partitionKeys must be set."));
//Test an error that originates from the /export handler
paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f,score", "sort", "a_s asc", "qt","/export", "partitionKeys","a_s");
stream = new CloudSolrStream(zkHost, "collection1", paramsA);
pstream = new ParallelStream(zkHost,"collection1", stream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
estream = new ExceptionStream(pstream);
t = getTuple(estream);
assert(t.EOF);
assert(t.EXCEPTION);
//The /export handler will pass through a real exception.
assert(t.getException().contains("undefined field:"));
}
private void testRollupStream() throws Exception {
indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
@@ -524,7 +604,6 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
commit();
String zkHost = zkServer.getZkAddress();
streamFactory.withCollectionZkHost("collection1", zkHost);
Map paramsA = mapParams("q","*:*","fl","a_s,a_i,a_f","sort", "a_s asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
@@ -647,7 +726,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
String zkHost = zkServer.getZkAddress();
streamFactory.withCollectionZkHost("collection1", zkHost);
Map paramsA = mapParams("q","*:*","fl","a_s,a_i,a_f","sort", "a_s asc", "partitionKeys", "a_s");
Map paramsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "partitionKeys", "a_s");
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
Bucket[] buckets = {new Bucket("a_s")};
@@ -1084,12 +1163,14 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
testReducerStream();
testRollupStream();
testZeroReducerStream();
testExceptionStream();
testParallelEOF();
testParallelUniqueStream();
testParallelRankStream();
testParallelMergeStream();
testParallelRollupStream();
testParallelReducerStream();
testParallelExceptionStream();
testZeroParallelReducerStream();
}
@@ -1123,6 +1204,14 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
return tuples;
}
protected Tuple getTuple(TupleStream tupleStream) throws IOException {
tupleStream.open();
Tuple t = tupleStream.read();
tupleStream.close();
return t;
}
protected boolean assertOrder(List<Tuple> tuples, int... ids) throws Exception {
int i = 0;
for(int val : ids) {