From e04981edc7995280b488197611847f9a7b201e2e Mon Sep 17 00:00:00 2001 From: Joel Bernstein Date: Wed, 9 Dec 2015 19:52:28 +0000 Subject: [PATCH] SOLR-8266: Remove Java Serialization from the Streaming API git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1718947 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 4 +++ .../org/apache/solr/handler/SQLHandler.java | 2 -- .../apache/solr/handler/StreamHandler.java | 12 +------ .../conf/solrconfig.xml | 15 -------- .../conf/solrconfig.xml | 14 -------- .../solrj/io/stream/ParallelStream.java | 27 ++------------ .../client/solrj/io/stream/StreamingTest.java | 36 ++++++++++--------- 7 files changed, 27 insertions(+), 83 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 68fa78a3bcc..7ab31543551 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -178,6 +178,10 @@ Other Changes * SOLR-8131: Make ManagedIndexSchemaFactory the default schemaFactory when luceneMatchVersion >= 6 (Uwe Schindler, shalin, Varun Thacker) +* SOLR-8266: Remove Java Serialization from the Streaming API. The /stream handler now only accepts + Streaming Expressions. (Jason Gerlowski, Joel Bernstein) + + ======================= 5.5.0 ======================= Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release diff --git a/solr/core/src/java/org/apache/solr/handler/SQLHandler.java b/solr/core/src/java/org/apache/solr/handler/SQLHandler.java index 463bf3dcb53..9078de421bc 100644 --- a/solr/core/src/java/org/apache/solr/handler/SQLHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/SQLHandler.java @@ -227,7 +227,6 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware { .withFunctionName("count", CountMetric.class); parallelStream.setStreamFactory(factory); - parallelStream.setObjectSerialize(false); tupleStream = parallelStream; } @@ -358,7 +357,6 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware { .withFunctionName("unique", UniqueStream.class); parallelStream.setStreamFactory(factory); - parallelStream.setObjectSerialize(false); tupleStream = parallelStream; } diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java index 7558521583f..63cca8560d8 100644 --- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java @@ -145,20 +145,10 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware { SolrParams params = req.getParams(); params = adjustParams(params); req.setParams(params); - boolean objectSerialize = params.getBool("objectSerialize", false); TupleStream tupleStream = null; try { - if (objectSerialize) { - String encodedStream = params.get("stream"); - encodedStream = URLDecoder.decode(encodedStream, "UTF-8"); - byte[] bytes = Base64.base64ToByteArray(encodedStream); - ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes); - ObjectInputStream objectInputStream = new ObjectInputStream(byteStream); - tupleStream = (TupleStream) objectInputStream.readObject(); - } else { - tupleStream = this.streamFactory.constructStream(params.get("stream")); - } + tupleStream = this.streamFactory.constructStream(params.get("stream")); } catch (Exception e) { //Catch exceptions that occur while the stream is being created. This will include streaming expression parse rules. SolrException.log(logger, e); diff --git a/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml b/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml index 2d491d48e0e..100b1c3dfbe 100644 --- a/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml +++ b/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml @@ -848,20 +848,6 @@ - diff --git a/solr/server/solr/configsets/sample_techproducts_configs/conf/solrconfig.xml b/solr/server/solr/configsets/sample_techproducts_configs/conf/solrconfig.xml index 48c652f7f09..642853902ad 100644 --- a/solr/server/solr/configsets/sample_techproducts_configs/conf/solrconfig.xml +++ b/solr/server/solr/configsets/sample_techproducts_configs/conf/solrconfig.xml @@ -876,18 +876,6 @@ - - diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java index 5ff0ccf77e7..6c43a14aea5 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java @@ -58,7 +58,6 @@ public class ParallelStream extends CloudSolrStream implements Expressible { private TupleStream tupleStream; private int workers; - private boolean objectSerialize = true; private transient StreamFactory streamFactory; public ParallelStream(String zkHost, @@ -75,7 +74,6 @@ public class ParallelStream extends CloudSolrStream implements Expressible { String expressionString, int workers, StreamComparator comp) throws IOException { - objectSerialize = false; TupleStream tStream = this.streamFactory.constructStream(expressionString); init(zkHost,collection, tStream, workers,comp); } @@ -86,7 +84,6 @@ public class ParallelStream extends CloudSolrStream implements Expressible { public ParallelStream(StreamExpression expression, StreamFactory factory) throws IOException { // grab all parameters out - objectSerialize = false; String collectionName = factory.getValueOperand(expression, 0); StreamExpressionNamedParameter workersParam = factory.getNamedOperand(expression, "workers"); List streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class); @@ -157,7 +154,7 @@ public class ParallelStream extends CloudSolrStream implements Expressible { this.tupleStream = tupleStream; // requires Expressible stream and comparator - if(!objectSerialize && !(tupleStream instanceof Expressible)){ + if(! (tupleStream instanceof Expressible)){ throw new IOException("Unable to create ParallelStream with a non-expressible TupleStream."); } } @@ -236,18 +233,7 @@ public class ParallelStream extends CloudSolrStream implements Expressible { protected void constructStreams() throws IOException { try { - Object pushStream = null; - - if (objectSerialize) { - ByteArrayOutputStream bout = new ByteArrayOutputStream(); - ObjectOutputStream out = new ObjectOutputStream(bout); - out.writeObject(tupleStream); - byte[] bytes = bout.toByteArray(); - String encoded = Base64.byteArrayToBase64(bytes, 0, bytes.length); - pushStream = URLEncoder.encode(encoded, "UTF-8"); - } else { - pushStream = ((Expressible) tupleStream).toExpression(streamFactory); - } + Object pushStream = ((Expressible) tupleStream).toExpression(streamFactory); ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader(); ClusterState clusterState = zkStateReader.getClusterState(); @@ -273,7 +259,6 @@ public class ParallelStream extends CloudSolrStream implements Expressible { params.put("workerID", w); params.put("stream", pushStream); params.put("qt","/stream"); - params.put("objectSerialize", objectSerialize); Replica rep = shuffler.get(w); ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep); String url = zkProps.getCoreUrl(); @@ -287,12 +272,4 @@ public class ParallelStream extends CloudSolrStream implements Expressible { throw new IOException(e); } } - - public void setObjectSerialize(boolean objectSerialize) { - this.objectSerialize = objectSerialize; - } - - public boolean getObjectSerialize() { - return objectSerialize; - } } diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java index f6bccbbcfc6..a09acab9fbe 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java @@ -119,7 +119,8 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { .withFunctionName("top", RankStream.class) .withFunctionName("group", ReducerStream.class) .withFunctionName("count", RecordCountStream.class) - ; + .withFunctionName("rollup", RollupStream.class) + .withFunctionName("parallel", ParallelStream.class); } private void testUniqueStream() throws Exception { @@ -191,6 +192,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", paramsA); ParallelStream pstream = new ParallelStream(zkHost, "collection1", stream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING)); + attachStreamFactory(pstream); List tuples = getTuples(pstream); assert(tuples.size() == 20); // Each tuple will be double counted. @@ -200,9 +202,6 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { } - - - private void testParallelUniqueStream() throws Exception { indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0"); @@ -224,6 +223,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params); UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f")); ParallelStream pstream = new ParallelStream(zkHost, "collection1", ustream, 2, new FieldComparator("a_f",ComparatorOrder.ASCENDING)); + attachStreamFactory(pstream); List tuples = getTuples(pstream); assert(tuples.size() == 5); assertOrder(tuples, 0,1,3,4,6); @@ -290,6 +290,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params); RankStream rstream = new RankStream(stream, 11, new FieldComparator("a_i",ComparatorOrder.DESCENDING)); ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_i",ComparatorOrder.DESCENDING)); + attachStreamFactory(pstream); List tuples = getTuples(pstream); assert(tuples.size() == 10); @@ -457,6 +458,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { ReducerStream rstream = new ReducerStream(stream, new FieldEqualitor("a_s")); ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.ASCENDING)); + attachStreamFactory(pstream); List tuples = getTuples(pstream); assert(tuples.size() == 3); @@ -481,6 +483,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { rstream = new ReducerStream(stream, new FieldEqualitor("a_s")); pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s",ComparatorOrder.DESCENDING)); + attachStreamFactory(pstream); tuples = getTuples(pstream); assert(tuples.size() == 3); @@ -1396,6 +1399,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { RollupStream rollupStream = new RollupStream(stream, buckets, metrics); ParallelStream parallelStream = new ParallelStream(zkHost, "collection1", rollupStream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING)); + attachStreamFactory(parallelStream); List tuples = getTuples(parallelStream); assert(tuples.size() == 3); @@ -1498,6 +1502,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { ReducerStream rstream = new ReducerStream(stream, new FieldEqualitor("a_s")); ParallelStream pstream = new ParallelStream(zkHost, "collection1", rstream, 2, new FieldComparator("a_s", ComparatorOrder.ASCENDING)); + attachStreamFactory(pstream); List tuples = getTuples(pstream); assert(tuples.size() == 0); del("*:*"); @@ -1642,6 +1647,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING)); ParallelStream pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING)); + attachStreamFactory(pstream); List tuples = getTuples(pstream); assert(tuples.size() == 9); @@ -1656,6 +1662,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.DESCENDING)); pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new FieldComparator("a_i",ComparatorOrder.DESCENDING)); + attachStreamFactory(pstream); tuples = getTuples(pstream); assert(tuples.size() == 8); @@ -1691,24 +1698,15 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB); MergeStream mstream = new MergeStream(streamA, streamB, new FieldComparator("a_i",ComparatorOrder.ASCENDING)); - RecordCountStream cstream = new RecordCountStream(mstream); - ParallelStream pstream = new ParallelStream(zkHost, "collection1", cstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING)); + ParallelStream pstream = new ParallelStream(zkHost, "collection1", mstream, 2, new FieldComparator("a_i",ComparatorOrder.ASCENDING)); + + attachStreamFactory(pstream); List tuples = getTuples(pstream); assert(tuples.size() == 9); Map eofTuples = pstream.getEofTuples(); assert(eofTuples.size() == 2); // There should be an EOF Tuple for each worker. - long totalCount = 0; - - Iterator it = eofTuples.values().iterator(); - while(it.hasNext()) { - Tuple t = it.next(); - totalCount += t.getLong("count"); - } - - assert(tuples.size() == totalCount); - del("*:*"); commit(); } @@ -1897,4 +1895,10 @@ public class StreamingTest extends AbstractFullDistribZkTestBase { SolrInputDocument doc = getDoc(fields); indexDoc(doc); } + + private void attachStreamFactory(TupleStream tupleStream) { + StreamContext streamContext = new StreamContext(); + streamContext.setStreamFactory(streamFactory); + tupleStream.setStreamContext(streamContext); + } }