From d5e5b5e948e151f61cddf1c57eda2940aebcc07d Mon Sep 17 00:00:00 2001 From: Joel Bernstein Date: Tue, 18 Apr 2017 11:21:15 -0400 Subject: [PATCH] SOLR-10504: Add echo Streaming Expression --- .../apache/solr/handler/StreamHandler.java | 1 + .../client/solrj/io/stream/EchoStream.java | 119 ++++++++++++++++++ .../solrj/io/stream/StreamExpressionTest.java | 92 ++++++++++++++ 3 files changed, 212 insertions(+) create mode 100644 solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/EchoStream.java diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java index e66d2346c57..ff228a45b52 100644 --- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java @@ -159,6 +159,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, .withFunctionName("cartesianProduct", CartesianProductStream.class) .withFunctionName("shuffle", ShuffleStream.class) .withFunctionName("eval", CalculatorStream.class) + .withFunctionName("echo", EchoStream.class) // metrics .withFunctionName("min", MinMetric.class) diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/EchoStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/EchoStream.java new file mode 100644 index 00000000000..2dd95fe5c42 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/EchoStream.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.client.solrj.io.stream; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; + +public class EchoStream extends TupleStream implements Expressible { + + private static final long serialVersionUID = 1; + private boolean finished; + private String echo; + + public EchoStream(String echo) throws IOException { + this.echo = stripQuotes(echo); + } + + public EchoStream(StreamExpression expression, StreamFactory factory) throws IOException { + this.echo = stripQuotes(factory.getValueOperand(expression, 0)); + this.echo = echo.replace("\\\"", "\""); + } + + private String stripQuotes(String s){ + if(s.startsWith("\"")) { + return s.substring(1, s.length()-1); + } else { + return s; + } + } + + @Override + public StreamExpression toExpression(StreamFactory factory) throws IOException{ + return toExpression(factory, true); + } + + private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException { + // function name + StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); + expression.addParameter("\""+echo.replace("\"", "\\\"")+"\""); + return expression; + } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + return new StreamExplanation(getStreamNodeId().toString()) + .withFunctionName(factory.getFunctionName(this.getClass())) + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_DECORATOR) + .withExpression(toExpression(factory, false).toString()); + } + + public void setStreamContext(StreamContext context) { + } + + public List children() { + List l = new ArrayList(); + return l; + } + + public void open() throws IOException { + + } + + public void close() throws IOException { + } + + public Tuple read() throws IOException { + + if(finished) { + HashMap m = new HashMap(); + m.put("EOF", true); + Tuple tuple = new Tuple(m); + return tuple; + } else { + HashMap m = new HashMap(); + m.put("echo", echo); + Tuple tuple = new Tuple(m); + finished = true; + return tuple; + } + } + + /** Return the stream sort - ie, the order in which records are returned */ + public StreamComparator getStreamSort(){ + return null; + } + + public int getCost() { + return 0; + } + + +} \ No newline at end of file diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java index 3c651531157..8efc37d8847 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java @@ -4918,6 +4918,98 @@ public class StreamExpressionTest extends SolrCloudTestCase { } } + @Test + public void testEchoStream() throws Exception { + String expr = "echo(hello world)"; + ModifiableSolrParams paramsLoc = new ModifiableSolrParams(); + paramsLoc.set("expr", expr); + paramsLoc.set("qt", "/stream"); + + String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; + TupleStream solrStream = new SolrStream(url, paramsLoc); + + StreamContext context = new StreamContext(); + solrStream.setStreamContext(context); + List tuples = getTuples(solrStream); + assertTrue(tuples.size() == 1); + String s = (String)tuples.get(0).get("echo"); + assertTrue(s.equals("hello world")); + + expr = "echo(\"hello world\")"; + paramsLoc = new ModifiableSolrParams(); + paramsLoc.set("expr", expr); + paramsLoc.set("qt", "/stream"); + + solrStream = new SolrStream(url, paramsLoc); + + solrStream.setStreamContext(context); + tuples = getTuples(solrStream); + assertTrue(tuples.size() == 1); + s = (String)tuples.get(0).get("echo"); + assertTrue(s.equals("hello world")); + + expr = "echo(\"hello, world\")"; + paramsLoc = new ModifiableSolrParams(); + paramsLoc.set("expr", expr); + paramsLoc.set("qt", "/stream"); + + solrStream = new SolrStream(url, paramsLoc); + + solrStream.setStreamContext(context); + tuples = getTuples(solrStream); + assertTrue(tuples.size() == 1); + s = (String)tuples.get(0).get("echo"); + assertTrue(s.equals("hello, world")); + + expr = "echo(\"hello, \\\"t\\\" world\")"; + paramsLoc = new ModifiableSolrParams(); + paramsLoc.set("expr", expr); + paramsLoc.set("qt", "/stream"); + + solrStream = new SolrStream(url, paramsLoc); + + solrStream.setStreamContext(context); + tuples = getTuples(solrStream); + assertTrue(tuples.size() == 1); + s = (String)tuples.get(0).get("echo"); + + assertTrue(s.equals("hello, \"t\" world")); + + expr = "parallel("+COLLECTIONORALIAS+", workers=2, sort=\"echo asc\", echo(\"hello, \\\"t\\\" world\"))"; + paramsLoc = new ModifiableSolrParams(); + paramsLoc.set("expr", expr); + paramsLoc.set("qt", "/stream"); + + solrStream = new SolrStream(url, paramsLoc); + + solrStream.setStreamContext(context); + tuples = getTuples(solrStream); + assertTrue(tuples.size() == 2); + s = (String)tuples.get(0).get("echo"); + assertTrue(s.equals("hello, \"t\" world")); + s = (String)tuples.get(1).get("echo"); + assertTrue(s.equals("hello, \"t\" world")); + + expr = "echo(\"tuytuy iuyiuyi iuyiuyiu iuyiuyiuyiu iuyi iuyiyiuy iuyiuyiu iyiuyiu iyiuyiuyyiyiu yiuyiuyi" + + " yiuyiuyi yiuyiuuyiu yiyiuyiyiu iyiuyiuyiuiuyiu yiuyiuyi yiuyiy yiuiyiuiuy\")"; + paramsLoc = new ModifiableSolrParams(); + paramsLoc.set("expr", expr); + paramsLoc.set("qt", "/stream"); + + solrStream = new SolrStream(url, paramsLoc); + + solrStream.setStreamContext(context); + tuples = getTuples(solrStream); + assertTrue(tuples.size() == 1); + s = (String)tuples.get(0).get("echo"); + + assertTrue(s.equals("tuytuy iuyiuyi iuyiuyiu iuyiuyiuyiu iuyi iuyiyiuy iuyiuyiu iyiuyiu iyiuyiuyyiyiu yiuyiuyi yiuyiuyi " + + "yiuyiuuyiu yiyiuyiyiu iyiuyiuyiuiuyiu yiuyiuyi yiuyiy yiuiyiuiuy")); + + + + } + @Test public void testConvertEvaluator() throws Exception {