diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 68fa78a3bcc..7ab31543551 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -178,6 +178,10 @@ Other Changes
* SOLR-8131: Make ManagedIndexSchemaFactory the default schemaFactory when luceneMatchVersion >= 6
(Uwe Schindler, shalin, Varun Thacker)
+* SOLR-8266: Remove Java Serialization from the Streaming API. The /stream handler now only accepts
+ Streaming Expressions. (Jason Gerlowski, Joel Bernstein)
+
+
======================= 5.5.0 =======================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release
diff --git a/solr/core/src/java/org/apache/solr/handler/SQLHandler.java b/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
index 463bf3dcb53..9078de421bc 100644
--- a/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
@@ -227,7 +227,6 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
.withFunctionName("count", CountMetric.class);
parallelStream.setStreamFactory(factory);
- parallelStream.setObjectSerialize(false);
tupleStream = parallelStream;
}
@@ -358,7 +357,6 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
.withFunctionName("unique", UniqueStream.class);
parallelStream.setStreamFactory(factory);
- parallelStream.setObjectSerialize(false);
tupleStream = parallelStream;
}
diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index 7558521583f..63cca8560d8 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -145,20 +145,10 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
SolrParams params = req.getParams();
params = adjustParams(params);
req.setParams(params);
- boolean objectSerialize = params.getBool("objectSerialize", false);
TupleStream tupleStream = null;
try {
- if (objectSerialize) {
- String encodedStream = params.get("stream");
- encodedStream = URLDecoder.decode(encodedStream, "UTF-8");
- byte[] bytes = Base64.base64ToByteArray(encodedStream);
- ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
- ObjectInputStream objectInputStream = new ObjectInputStream(byteStream);
- tupleStream = (TupleStream) objectInputStream.readObject();
- } else {
- tupleStream = this.streamFactory.constructStream(params.get("stream"));
- }
+ tupleStream = this.streamFactory.constructStream(params.get("stream"));
} catch (Exception e) {
//Catch exceptions that occur while the stream is being created. This will include streaming expression parse rules.
SolrException.log(logger, e);
diff --git a/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml b/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml
index 2d491d48e0e..100b1c3dfbe 100644
--- a/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml
+++ b/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml
@@ -848,20 +848,6 @@
-
diff --git a/solr/server/solr/configsets/sample_techproducts_configs/conf/solrconfig.xml b/solr/server/solr/configsets/sample_techproducts_configs/conf/solrconfig.xml
index 48c652f7f09..642853902ad 100644
--- a/solr/server/solr/configsets/sample_techproducts_configs/conf/solrconfig.xml
+++ b/solr/server/solr/configsets/sample_techproducts_configs/conf/solrconfig.xml
@@ -876,18 +876,6 @@
-
-
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
index 5ff0ccf77e7..6c43a14aea5 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
@@ -58,7 +58,6 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
private TupleStream tupleStream;
private int workers;
- private boolean objectSerialize = true;
private transient StreamFactory streamFactory;
public ParallelStream(String zkHost,
@@ -75,7 +74,6 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
String expressionString,
int workers,
StreamComparator comp) throws IOException {
- objectSerialize = false;
TupleStream tStream = this.streamFactory.constructStream(expressionString);
init(zkHost,collection, tStream, workers,comp);
}
@@ -86,7 +84,6 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
public ParallelStream(StreamExpression expression, StreamFactory factory) throws IOException {
// grab all parameters out
- objectSerialize = false;
String collectionName = factory.getValueOperand(expression, 0);
StreamExpressionNamedParameter workersParam = factory.getNamedOperand(expression, "workers");
List streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
@@ -157,7 +154,7 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
this.tupleStream = tupleStream;
// requires Expressible stream and comparator
- if(!objectSerialize && !(tupleStream instanceof Expressible)){
+ if(! (tupleStream instanceof Expressible)){
throw new IOException("Unable to create ParallelStream with a non-expressible TupleStream.");
}
}
@@ -236,18 +233,7 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
protected void constructStreams() throws IOException {
try {
- Object pushStream = null;
-
- if (objectSerialize) {
- ByteArrayOutputStream bout = new ByteArrayOutputStream();
- ObjectOutputStream out = new ObjectOutputStream(bout);
- out.writeObject(tupleStream);
- byte[] bytes = bout.toByteArray();
- String encoded = Base64.byteArrayToBase64(bytes, 0, bytes.length);
- pushStream = URLEncoder.encode(encoded, "UTF-8");
- } else {
- pushStream = ((Expressible) tupleStream).toExpression(streamFactory);
- }
+ Object pushStream = ((Expressible) tupleStream).toExpression(streamFactory);
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
ClusterState clusterState = zkStateReader.getClusterState();
@@ -273,7 +259,6 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
params.put("workerID", w);
params.put("stream", pushStream);
params.put("qt","/stream");
- params.put("objectSerialize", objectSerialize);
Replica rep = shuffler.get(w);
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
String url = zkProps.getCoreUrl();
@@ -287,12 +272,4 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
throw new IOException(e);
}
}
-
- public void setObjectSerialize(boolean objectSerialize) {
- this.objectSerialize = objectSerialize;
- }
-
- public boolean getObjectSerialize() {
- return objectSerialize;
- }
}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
index f6bccbbcfc6..a09acab9fbe 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
@@ -119,7 +119,8 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
.withFunctionName("top", RankStream.class)
.withFunctionName("group", ReducerStream.class)
.withFunctionName("count", RecordCountStream.class)
- ;
+ .withFunctionName("rollup", RollupStream.class)
+ .withFunctionName("parallel", ParallelStream.class);
}
private void testUniqueStream() throws Exception {
@@ -191,6 +192,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA);
ParallelStream pstream = new ParallelStream(zkHost, "collection1", stream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
+ attachStreamFactory(pstream);
List tuples = getTuples(pstream);
assert(tuples.size() == 20); // Each tuple will be double counted.
@@ -200,9 +202,6 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
}
-
-
-
private void testParallelUniqueStream() throws Exception {
indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
@@ -224,6 +223,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
ParallelStream pstream = new ParallelStream(zkHost, "collection1", ustream, 2, new FieldComparator("a_f",ComparatorOrder.ASCENDING));
+ attachStreamFactory(pstream);
List tuples = getTuples(pstream);
assert(tuples.size() == 5);
assertOrder(tuples, 0,1,3,4,6);
@@ -290,6 +290,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params);
RankStream rstream = new RankStream(stream, 11, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
+ attachStreamFactory(pstream);
List tuples = getTuples(pstream);
assert(tuples.size() == 10);
@@ -457,6 +458,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
ReducerStream rstream = new ReducerStream(stream, new FieldEqualitor("a_s"));
ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING));
+ attachStreamFactory(pstream);
List tuples = getTuples(pstream);
assert(tuples.size() == 3);
@@ -481,6 +483,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
rstream = new ReducerStream(stream, new FieldEqualitor("a_s"));
pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.DESCENDING));
+ attachStreamFactory(pstream);
tuples = getTuples(pstream);
assert(tuples.size() == 3);
@@ -1396,6 +1399,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
ParallelStream parallelStream = new ParallelStream(zkHost, "collection1", rollupStream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+ attachStreamFactory(parallelStream);
List tuples = getTuples(parallelStream);
assert(tuples.size() == 3);
@@ -1498,6 +1502,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
ReducerStream rstream = new ReducerStream(stream, new FieldEqualitor("a_s"));
ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
+ attachStreamFactory(pstream);
List tuples = getTuples(pstream);
assert(tuples.size() == 0);
del("*:*");
@@ -1642,6 +1647,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
ParallelStream pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
+ attachStreamFactory(pstream);
List tuples = getTuples(pstream);
assert(tuples.size() == 9);
@@ -1656,6 +1662,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new FieldComparator("a_i",ComparatorOrder.DESCENDING));
+ attachStreamFactory(pstream);
tuples = getTuples(pstream);
assert(tuples.size() == 8);
@@ -1691,24 +1698,15 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB);
MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
- RecordCountStream cstream = new RecordCountStream(mstream);
- ParallelStream pstream = new ParallelStream(zkHost, "collection1", cstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
+ ParallelStream pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING));
+
+ attachStreamFactory(pstream);
List tuples = getTuples(pstream);
assert(tuples.size() == 9);
Map eofTuples = pstream.getEofTuples();
assert(eofTuples.size() == 2); // There should be an EOF Tuple for each worker.
- long totalCount = 0;
-
- Iterator it = eofTuples.values().iterator();
- while(it.hasNext()) {
- Tuple t = it.next();
- totalCount += t.getLong("count");
- }
-
- assert(tuples.size() == totalCount);
-
del("*:*");
commit();
}
@@ -1897,4 +1895,10 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
SolrInputDocument doc = getDoc(fields);
indexDoc(doc);
}
+
+ private void attachStreamFactory(TupleStream tupleStream) {
+ StreamContext streamContext = new StreamContext();
+ streamContext.setStreamFactory(streamFactory);
+ tupleStream.setStreamContext(streamContext);
+ }
}