diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index d301024aa46..536fe160259 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -17,32 +17,79 @@
package org.apache.solr.handler;
+import java.util.Map.Entry;
+import java.net.URLDecoder;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
-import java.net.URLDecoder;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.solr.client.solrj.io.SolrClientCache;
-import org.apache.solr.client.solrj.io.TupleStream;
-import org.apache.solr.client.solrj.io.StreamContext;
-import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
+import org.apache.solr.client.solrj.io.stream.ExpressibleStream;
+import org.apache.solr.client.solrj.io.stream.MergeStream;
+import org.apache.solr.client.solrj.io.stream.ParallelStream;
+import org.apache.solr.client.solrj.io.stream.RankStream;
+import org.apache.solr.client.solrj.io.stream.ReducerStream;
+import org.apache.solr.client.solrj.io.stream.StreamContext;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
+import org.apache.solr.client.solrj.io.stream.UniqueStream;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CloseHook;
+import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
-import org.apache.solr.common.params.SolrParams;
import org.apache.solr.util.plugin.SolrCoreAware;
import org.apache.solr.common.util.Base64;
-
public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
private SolrClientCache clientCache = new SolrClientCache();
-
+ private StreamFactory streamFactory = new StreamFactory();
+
public void inform(SolrCore core) {
+
+ /* The stream factory will always contain the zkUrl for the given collection
+ * Adds default streams with their corresponding function names. These
+ * defaults can be overridden or added to in the solrConfig in the stream
+ * RequestHandler def. Example config override
+ *
+ * org.apache.solr.client.solrj.io.stream.ReducerStream
+ * org.apache.solr.client.solrj.io.stream.CountStream
+ *
+ * */
- core.addCloseHook( new CloseHook() {
+ String defaultCollection = null;
+ String defaultZkhost = null;
+ CoreContainer coreContainer = core.getCoreDescriptor().getCoreContainer();
+
+ if(coreContainer.isZooKeeperAware()) {
+ defaultCollection = core.getCoreDescriptor().getCollectionName();
+ defaultZkhost = core.getCoreDescriptor().getCoreContainer().getZkController().getZkServerAddress();
+ streamFactory.withCollectionZkHost(defaultCollection, defaultZkhost);
+ }
+
+ streamFactory
+ .withStreamFunction("search", CloudSolrStream.class)
+ .withStreamFunction("merge", MergeStream.class)
+ .withStreamFunction("unique", UniqueStream.class)
+ .withStreamFunction("top", RankStream.class)
+ .withStreamFunction("group", ReducerStream.class)
+ .withStreamFunction("parallel", ParallelStream.class);
+
+
+ // This pulls all the overrides and additions from the config
+ Object functionMappingsObj = initArgs.get("streamFunctions");
+ if(null != functionMappingsObj){
+ NamedList> functionMappings = (NamedList>)functionMappingsObj;
+ for(Entry functionMapping : functionMappings){
+ Class> clazz = core.getResourceLoader().findClass((String)functionMapping.getValue(), ExpressibleStream.class);
+ streamFactory.withStreamFunction(functionMapping.getKey(), clazz);
+ }
+ }
+
+ core.addCloseHook(new CloseHook() {
@Override
public void preClose(SolrCore core) {
//To change body of implemented methods use File | Settings | File Templates.
@@ -57,15 +104,23 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
SolrParams params = req.getParams();
- String encodedStream = params.get("stream");
- encodedStream = URLDecoder.decode(encodedStream, "UTF-8");
- byte[] bytes = Base64.base64ToByteArray(encodedStream);
- ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
- ObjectInputStream objectInputStream = new ObjectInputStream(byteStream);
- TupleStream tupleStream = (TupleStream)objectInputStream.readObject();
- int worker = params.getInt("workerID");
- int numWorkers = params.getInt("numWorkers");
+ boolean objectSerialize = params.getBool("objectSerialize", false);
+ TupleStream tupleStream = null;
+
+ if(objectSerialize) {
+ String encodedStream = params.get("stream");
+ encodedStream = URLDecoder.decode(encodedStream, "UTF-8");
+ byte[] bytes = Base64.base64ToByteArray(encodedStream);
+ ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
+ ObjectInputStream objectInputStream = new ObjectInputStream(byteStream);
+ tupleStream = (TupleStream)objectInputStream.readObject();
+ } else {
+ tupleStream = this.streamFactory.constructStream(params.get("stream"));
+ }
+
+ int worker = params.getInt("workerID", 0);
+ int numWorkers = params.getInt("numWorkers", 1);
StreamContext context = new StreamContext();
context.workerID = worker;
context.numWorkers = numWorkers;
@@ -81,4 +136,4 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
public String getSource() {
return null;
}
-}
\ No newline at end of file
+}
diff --git a/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java b/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java
index e52941f5eed..308b07688ad 100644
--- a/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java
+++ b/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java
@@ -21,7 +21,7 @@ import java.io.IOException;
import java.io.Writer;
import java.util.*;
-import org.apache.solr.client.solrj.io.TupleStream;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.lucene.index.StorableField;
import org.apache.lucene.index.StoredDocument;
diff --git a/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml b/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml
index 5602aa19c9e..7bf719c153b 100644
--- a/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml
+++ b/solr/server/solr/configsets/data_driven_schema_configs/conf/solrconfig.xml
@@ -854,6 +854,37 @@
+
+
+
+
+
+ {!xport}
+ xsort
+ false
+
+
+
+ query
+
+
+
+
+
+
+
+
+ json
+ false
+
+
+
+