diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java index 515a90bfab8..155933d8374 100644 --- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java @@ -161,8 +161,10 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, .withFunctionName("calc", CalculatorStream.class) .withFunctionName("eval",EvalStream.class) .withFunctionName("echo", EchoStream.class) + .withFunctionName("cell", CellStream.class) + .withFunctionName("list", ListStream.class) - // metrics + // metrics .withFunctionName("min", MinMetric.class) .withFunctionName("max", MaxMetric.class) .withFunctionName("avg", MeanMetric.class) diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CellStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CellStream.java new file mode 100644 index 00000000000..1696a1aa704 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CellStream.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.client.solrj.io.stream; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; + +public class CellStream extends TupleStream implements Expressible { + + private static final long serialVersionUID = 1; + private TupleStream stream; + private String name; + private Tuple tuple; + private Tuple EOFTuple; + + public CellStream(String name, TupleStream stream) throws IOException { + init(name, stream); + } + + public CellStream(StreamExpression expression, StreamFactory factory) throws IOException { + String name = factory.getValueOperand(expression, 0); + List streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class); + + if(streamExpressions.size() != 1){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting 1 stream but found %d",expression, streamExpressions.size())); + } + + TupleStream tupleStream = factory.constructStream(streamExpressions.get(0)); + init(name, tupleStream); + } + + private void init(String name, TupleStream tupleStream) { + this.name = name; + this.stream = tupleStream; + } + + @Override + public StreamExpression toExpression(StreamFactory factory) throws IOException{ + return toExpression(factory, true); + } + + private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException { + // function name + StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); + if(includeStreams) { + expression.addParameter(((Expressible)stream).toExpression(factory)); + } + return expression; + } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString()); + explanation.setFunctionName(factory.getFunctionName(this.getClass())); + explanation.setImplementingClass(this.getClass().getName()); + explanation.setExpressionType(ExpressionType.STREAM_DECORATOR); + explanation.setExpression(toExpression(factory, false).toString()); + explanation.addChild(stream.toExplanation(factory)); + + return explanation; + } + + public void setStreamContext(StreamContext context) { + this.stream.setStreamContext(context); + } + + public List children() { + List l = new ArrayList(); + l.add(stream); + + return l; + } + + public Tuple read() throws IOException { + if(tuple.EOF) { + return tuple; + } else { + Tuple t = tuple; + tuple = EOFTuple; + return t; + } + } + + public void close() throws IOException { + } + + public void open() throws IOException { + try { + stream.open(); + List list = new ArrayList(); + while(true) { + Tuple tuple = stream.read(); + if(tuple.EOF) { + EOFTuple = tuple; + break; + } else { + list.add(tuple); + } + } + + Map map = new HashMap(); + map.put(name, list); + tuple = new Tuple(map); + } finally { + stream.close(); + } + } + + /** Return the stream sort - ie, the order in which records are returned */ + public StreamComparator getStreamSort(){ + return null; + } + + public int getCost() { + return 0; + } + + +} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ListStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ListStream.java new file mode 100644 index 00000000000..e295a581597 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ListStream.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.client.solrj.io.stream; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; + +public class ListStream extends TupleStream implements Expressible { + + private static final long serialVersionUID = 1; + private TupleStream[] streams; + private TupleStream currentStream; + private int streamIndex; + + public ListStream(TupleStream... streams) throws IOException { + init(streams); + } + + public ListStream(StreamExpression expression, StreamFactory factory) throws IOException { + List streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class); + TupleStream[] streams = new TupleStream[streamExpressions.size()]; + for(int idx = 0; idx < streamExpressions.size(); ++idx){ + streams[idx] = factory.constructStream(streamExpressions.get(idx)); + } + + init(streams); + } + + private void init(TupleStream ... tupleStreams) { + this.streams = tupleStreams; + } + + @Override + public StreamExpression toExpression(StreamFactory factory) throws IOException{ + return toExpression(factory, true); + } + + private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException { + // function name + StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); + if(includeStreams) { + for(TupleStream stream : streams) { + expression.addParameter(((Expressible)stream).toExpression(factory)); + } + } + return expression; + } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString()); + explanation.setFunctionName(factory.getFunctionName(this.getClass())); + explanation.setImplementingClass(this.getClass().getName()); + explanation.setExpressionType(ExpressionType.STREAM_DECORATOR); + explanation.setExpression(toExpression(factory, false).toString()); + for(TupleStream stream : streams) { + explanation.addChild(stream.toExplanation(factory)); + } + + return explanation; + } + + public void setStreamContext(StreamContext context) { + for(TupleStream stream : streams) { + stream.setStreamContext(context); + } + } + + public List children() { + List l = new ArrayList(); + for(TupleStream stream : streams) { + l.add(stream); + } + return l; + } + + public Tuple read() throws IOException { + while(true) { + if (currentStream == null) { + if (streamIndex < streams.length) { + currentStream = streams[streamIndex]; + currentStream.open(); + } else { + HashMap map = new HashMap(); + map.put("EOF", true); + return new Tuple(map); + } + } + + Tuple tuple = currentStream.read(); + if (tuple.EOF) { + currentStream.close(); + currentStream = null; + ++streamIndex; + } else { + return tuple; + } + } + } + + public void close() throws IOException { + } + + public void open() throws IOException { + + + } + + /** Return the stream sort - ie, the order in which records are returned */ + public StreamComparator getStreamSort(){ + return null; + } + + public int getCost() { + return 0; + } + + +} \ No newline at end of file diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java index ce0a4ce692c..250563957e3 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java @@ -242,6 +242,9 @@ public class StreamExpressionTest extends SolrCloudTestCase { } } + + + @Test public void testCloudSolrStreamWithZkHost() throws Exception { @@ -5034,6 +5037,58 @@ public class StreamExpressionTest extends SolrCloudTestCase { } + @Test + public void testListStream() throws Exception { + UpdateRequest updateRequest = new UpdateRequest(); + updateRequest.add(id, "hello", "test_t", "l b c d c"); + updateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS); + + String expr = "search("+COLLECTIONORALIAS+", q=\"*:*\", fl=id, sort=\"id desc\")"; + String cat = "list("+expr+","+expr+")"; + ModifiableSolrParams paramsLoc = new ModifiableSolrParams(); + paramsLoc.set("expr", cat); + paramsLoc.set("qt", "/stream"); + + String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; + TupleStream solrStream = new SolrStream(url, paramsLoc); + + StreamContext context = new StreamContext(); + solrStream.setStreamContext(context); + List tuples = getTuples(solrStream); + assertTrue(tuples.size() == 2); + String s = (String)tuples.get(0).get("id"); + assertTrue(s.equals("hello")); + s = (String)tuples.get(1).get("id"); + assertTrue(s.equals("hello")); + + } + + @Test + public void testCellStream() throws Exception { + UpdateRequest updateRequest = new UpdateRequest(); + updateRequest.add(id, "hello", "test_t", "l b c d c"); + updateRequest.add(id, "hello1", "test_t", "l b c d c"); + + updateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS); + + String expr = "search("+COLLECTIONORALIAS+", q=\"*:*\", fl=id, sort=\"id desc\")"; + String cat = "cell(results,"+expr+")"; + ModifiableSolrParams paramsLoc = new ModifiableSolrParams(); + paramsLoc.set("expr", cat); + paramsLoc.set("qt", "/stream"); + + String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; + TupleStream solrStream = new SolrStream(url, paramsLoc); + + StreamContext context = new StreamContext(); + solrStream.setStreamContext(context); + List tuples = getTuples(solrStream); + assertTrue(tuples.size() == 1); + List results = (List)tuples.get(0).get("results"); + assertTrue(results.get(0).get("id").equals("hello1")); + assertTrue(results.get(1).get("id").equals("hello")); + } + @Test public void testConvertEvaluator() throws Exception {