SOLR-8266: Remove Java Serialization from the Streaming API

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1718947 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Joel Bernstein 2015-12-09 19:52:28 +00:00
parent 40486d3220
commit e04981edc7
7 changed files with 27 additions and 83 deletions

View File

@ -178,6 +178,10 @@ Other Changes
* SOLR-8131: Make ManagedIndexSchemaFactory the default schemaFactory when luceneMatchVersion >= 6
(Uwe Schindler, shalin, Varun Thacker)
* SOLR-8266: Remove Java Serialization from the Streaming API. The /stream handler now only accepts
Streaming Expressions. (Jason Gerlowski, Joel Bernstein)
======================= 5.5.0 =======================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release

View File

@ -227,7 +227,6 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
.withFunctionName("count", CountMetric.class);
parallelStream.setStreamFactory(factory);
parallelStream.setObjectSerialize(false);
tupleStream = parallelStream;
}
@ -358,7 +357,6 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
.withFunctionName("unique", UniqueStream.class);
parallelStream.setStreamFactory(factory);
parallelStream.setObjectSerialize(false);
tupleStream = parallelStream;
}

View File

@ -145,20 +145,10 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
SolrParams params = req.getParams();
params = adjustParams(params);
req.setParams(params);
boolean objectSerialize = params.getBool("objectSerialize", false);
TupleStream tupleStream = null;
try {
if (objectSerialize) {
String encodedStream = params.get("stream");
encodedStream = URLDecoder.decode(encodedStream, "UTF-8");
byte[] bytes = Base64.base64ToByteArray(encodedStream);
ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
ObjectInputStream objectInputStream = new ObjectInputStream(byteStream);
tupleStream = (TupleStream) objectInputStream.readObject();
} else {
tupleStream = this.streamFactory.constructStream(params.get("stream"));
}
tupleStream = this.streamFactory.constructStream(params.get("stream"));
} catch (Exception e) {
//Catch exceptions that occur while the stream is being created. This will include streaming expression parse rules.
SolrException.log(logger, e);

View File

@ -848,20 +848,6 @@
</requestHandler>
<!--
Uncomment for distributed Stream processing (also required by the /sql handler in map_reduce mode).
SECURTIY WARNING: This feature uses Java Serialization for RPC (Remote Procedure Calls) to send executable
Java Objects to Worker nodes.
Solr also currently has apache commons-collections in it's classpath.
This makes Solr vulnerable to this security exploit:
https://issues.apache.org/jira/browse/COLLECTIONS-580.
<requestHandler name="/stream" class="solr.StreamHandler">
<lst name="invariants">
<str name="wt">json</str>
@ -869,7 +855,6 @@
</lst>
</requestHandler>
-->
<requestHandler name="/sql" class="solr.SQLHandler">
<lst name="invariants">

View File

@ -876,18 +876,6 @@
</requestHandler>
<!--
Uncomment for distributed Stream processing (also required by the /sql handler in map_reduce mode).
SECURTIY WARNING: This feature uses Java Serialization for RPC (Remote Procedure Calls) to send executable
Java Objects to Worker nodes.
Solr also currently has apache commons-collections in it's classpath.
This makes Solr vulnerable to this security exploit:
https://issues.apache.org/jira/browse/COLLECTIONS-580.
<requestHandler name="/stream" class="solr.StreamHandler">
@ -897,8 +885,6 @@
</lst>
</requestHandler>
-->
<requestHandler name="/sql" class="solr.SQLHandler">
<lst name="invariants">

View File

@ -58,7 +58,6 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
private TupleStream tupleStream;
private int workers;
private boolean objectSerialize = true;
private transient StreamFactory streamFactory;
public ParallelStream(String zkHost,
@ -75,7 +74,6 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
String expressionString,
int workers,
StreamComparator comp) throws IOException {
objectSerialize = false;
TupleStream tStream = this.streamFactory.constructStream(expressionString);
init(zkHost,collection, tStream, workers,comp);
}
@ -86,7 +84,6 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
public ParallelStream(StreamExpression expression, StreamFactory factory) throws IOException {
// grab all parameters out
objectSerialize = false;
String collectionName = factory.getValueOperand(expression, 0);
StreamExpressionNamedParameter workersParam = factory.getNamedOperand(expression, "workers");
List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
@ -157,7 +154,7 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
this.tupleStream = tupleStream;
// requires Expressible stream and comparator
if(!objectSerialize && !(tupleStream instanceof Expressible)){
if(! (tupleStream instanceof Expressible)){
throw new IOException("Unable to create ParallelStream with a non-expressible TupleStream.");
}
}
@ -236,18 +233,7 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
protected void constructStreams() throws IOException {
try {
Object pushStream = null;
if (objectSerialize) {
ByteArrayOutputStream bout = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(bout);
out.writeObject(tupleStream);
byte[] bytes = bout.toByteArray();
String encoded = Base64.byteArrayToBase64(bytes, 0, bytes.length);
pushStream = URLEncoder.encode(encoded, "UTF-8");
} else {
pushStream = ((Expressible) tupleStream).toExpression(streamFactory);
}
Object pushStream = ((Expressible) tupleStream).toExpression(streamFactory);
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
ClusterState clusterState = zkStateReader.getClusterState();
@ -273,7 +259,6 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
params.put("workerID", w);
params.put("stream", pushStream);
params.put("qt","/stream");
params.put("objectSerialize", objectSerialize);
Replica rep = shuffler.get(w);
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
String url = zkProps.getCoreUrl();
@ -287,12 +272,4 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
throw new IOException(e);
}
}
public void setObjectSerialize(boolean objectSerialize) {
this.objectSerialize = objectSerialize;
}
public boolean getObjectSerialize() {
return objectSerialize;
}
}

View File

@ -119,7 +119,8 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
.withFunctionName("top", RankStream.class)
.withFunctionName("group", ReducerStream.class)
.withFunctionName("count", RecordCountStream.class)
;
.withFunctionName("rollup", RollupStream.class)
.withFunctionName("parallel", ParallelStream.class);
}
private void testUniqueStream() throws Exception {
@ -191,6 +192,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
ParallelStream pstream = new ParallelStream(zkHost, "collection1", stream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
attachStreamFactory(pstream);
List<Tuple> tuples = getTuples(pstream);
assert(tuples.size() == 20); // Each tuple will be double counted.
@ -200,9 +202,6 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
}
private void testParallelUniqueStream() throws Exception {
indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
@ -224,6 +223,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
ParallelStream pstream = new ParallelStream(zkHost, "collection1", ustream, 2, new FieldComparator("a_f",ComparatorOrder.ASCENDING));
attachStreamFactory(pstream);
List<Tuple> tuples = getTuples(pstream);
assert(tuples.size() == 5);
assertOrder(tuples, 0,1,3,4,6);
@ -290,6 +290,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
RankStream rstream = new RankStream(stream, 11, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
attachStreamFactory(pstream);
List<Tuple> tuples = getTuples(pstream);
assert(tuples.size() == 10);
@ -457,6 +458,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
ReducerStream rstream = new ReducerStream(stream, new FieldEqualitor("a_s"));
ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
attachStreamFactory(pstream);
List<Tuple> tuples = getTuples(pstream);
assert(tuples.size() == 3);
@ -481,6 +483,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
rstream = new ReducerStream(stream, new FieldEqualitor("a_s"));
pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.DESCENDING));
attachStreamFactory(pstream);
tuples = getTuples(pstream);
assert(tuples.size() == 3);
@ -1396,6 +1399,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
ParallelStream parallelStream = new ParallelStream(zkHost, "collection1", rollupStream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
attachStreamFactory(parallelStream);
List<Tuple> tuples = getTuples(parallelStream);
assert(tuples.size() == 3);
@ -1498,6 +1502,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
ReducerStream rstream = new ReducerStream(stream, new FieldEqualitor("a_s"));
ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
attachStreamFactory(pstream);
List<Tuple> tuples = getTuples(pstream);
assert(tuples.size() == 0);
del("*:*");
@ -1642,6 +1647,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
ParallelStream pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
attachStreamFactory(pstream);
List<Tuple> tuples = getTuples(pstream);
assert(tuples.size() == 9);
@ -1656,6 +1662,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
attachStreamFactory(pstream);
tuples = getTuples(pstream);
assert(tuples.size() == 8);
@ -1691,24 +1698,15 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
RecordCountStream cstream = new RecordCountStream(mstream);
ParallelStream pstream = new ParallelStream(zkHost, "collection1", cstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
ParallelStream pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
attachStreamFactory(pstream);
List<Tuple> tuples = getTuples(pstream);
assert(tuples.size() == 9);
Map<String, Tuple> eofTuples = pstream.getEofTuples();
assert(eofTuples.size() == 2); // There should be an EOF Tuple for each worker.
long totalCount = 0;
Iterator<Tuple> it = eofTuples.values().iterator();
while(it.hasNext()) {
Tuple t = it.next();
totalCount += t.getLong("count");
}
assert(tuples.size() == totalCount);
del("*:*");
commit();
}
@ -1897,4 +1895,10 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
SolrInputDocument doc = getDoc(fields);
indexDoc(doc);
}
private void attachStreamFactory(TupleStream tupleStream) {
StreamContext streamContext = new StreamContext();
streamContext.setStreamFactory(streamFactory);
tupleStream.setStreamContext(streamContext);
}
}