From cd02dd7d4afcbd1dbbe20b1e5c700d4b7c8a8302 Mon Sep 17 00:00:00 2001
From: Joel Bernstein
Date: Mon, 24 Apr 2017 17:27:37 -0400
Subject: [PATCH] SOLR-10559: Add let and get Streaming Expressions

---
 .../apache/solr/handler/StreamHandler.java    |   3 +-
 .../client/solrj/io/stream/CellStream.java    |   5 +
 .../client/solrj/io/stream/GetStream.java     | 143 ++++++++++++++++
 .../client/solrj/io/stream/LetStream.java     | 174 ++++++++++++++++++
 .../client/solrj/io/stream/StreamContext.java |   8 +
 .../solrj/io/stream/StreamExpressionTest.java |  76 +++++++++
 6 files changed, 408 insertions(+), 1 deletion(-)
 create mode 100644 solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/GetStream.java
 create mode 100644 solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java

diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index 155933d8374..a1f799374a4 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -163,7 +163,8 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
       .withFunctionName("echo", EchoStream.class)
       .withFunctionName("cell", CellStream.class)
       .withFunctionName("list", ListStream.class)
-
+      .withFunctionName("let", LetStream.class)
+      .withFunctionName("get", GetStream.class)
       // metrics
       .withFunctionName("min", MinMetric.class)
       .withFunctionName("max", MaxMetric.class)
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CellStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CellStream.java
index aad99f63baf..fd33737c29d 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CellStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CellStream.java
@@ -56,6 +56,11 @@ public class CellStream extends TupleStream implements Expressible {
     init(name, tupleStream);
   }
 
+  /** Returns this cell's name, as set by {@code init}. */
+  public String getName() {
+    return this.name;
+  }
+
   private void init(String name, TupleStream tupleStream) {
     this.name = name;
     this.stream = tupleStream;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/GetStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/GetStream.java
new file mode 100644
index 00000000000..5a89f0ff2da
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/GetStream.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+ * Streams copies of the tuples that {@link LetStream} stored in the
+ * {@link StreamContext} under a variable name.
+ */
+public class GetStream extends TupleStream implements Expressible {
+
+  private static final long serialVersionUID = 1;
+
+  private StreamContext streamContext;
+  private String name;
+  private Iterator<Tuple> tupleIterator;
+
+  /**
+   * @param name the variable whose stored tuples are looked up in {@link #open()}
+   */
+  public GetStream(String name) throws IOException {
+    init(name);
+  }
+
+  /**
+   * Builds the stream from an expression whose first value operand is the variable name.
+   */
+  public GetStream(StreamExpression expression, StreamFactory factory) throws IOException {
+    String name = factory.getValueOperand(expression, 0);
+    init(name);
+  }
+
+  private void init(String name) {
+    this.name = name;
+  }
+
+  @Override
+  public StreamExpression toExpression(StreamFactory factory) throws IOException{
+    return toExpression(factory, true);
+  }
+
+  private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
+    // function name
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+    expression.addParameter(name);
+    return expression;
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+
+    StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
+    explanation.setFunctionName(factory.getFunctionName(this.getClass()));
+    explanation.setImplementingClass(this.getClass().getName());
+    explanation.setExpressionType(ExpressionType.STREAM_SOURCE);
+    explanation.setExpression(toExpression(factory, false).toString());
+    return explanation;
+  }
+
+  public void setStreamContext(StreamContext context) {
+    this.streamContext = context;
+  }
+
+  /** Returns an empty list; this stream keeps no reference to any other stream. */
+  public List<TupleStream> children() {
+    List<TupleStream> l = new ArrayList<>();
+    return l;
+  }
+
+  /**
+   * Returns a new tuple holding a copy of the next stored tuple's field map, or a tuple
+   * containing only {@code EOF} once the stored list is exhausted. Only the map is
+   * copied (the field values are shared), which keeps renames and added fields made
+   * downstream from showing up in the stored tuples.
+   */
+  public Tuple read() throws IOException {
+    Map map = new HashMap();
+    if(tupleIterator.hasNext()) {
+      Tuple t = tupleIterator.next();
+      map.putAll(t.fields);
+      return new Tuple(map);
+    } else {
+      map.put("EOF", true);
+      return new Tuple(map);
+    }
+  }
+
+  public void close() throws IOException {
+  }
+
+  /**
+   * Looks up the tuples stored in the context under this stream's variable name.
+   *
+   * @throws IOException if nothing is stored under that name
+   */
+  public void open() throws IOException {
+    Map<String, List<Tuple>> lets = streamContext.getLets();
+    List<Tuple> tuples = lets.get(name);
+    if(tuples == null) {
+      // Fail with a readable message instead of a NullPointerException on the next line.
+      throw new IOException("No tuples are stored under the variable name: " + name);
+    }
+    tupleIterator = tuples.iterator();
+  }
+
+  /** Return the stream sort - ie, the order in which records are returned */
+  public StreamComparator getStreamSort(){
+    return null;
+  }
+
+  public int getCost() {
+    return 0;
+  }
+}
\ No newline at end of file
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java
new file mode 100644
index 00000000000..3a17211c5ef
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+ * Evaluates each {@link CellStream} operand, stores the list it produced in the
+ * {@link StreamContext} under the cell's name, and then streams the tuples of the one
+ * remaining stream, which can read the stored lists back with {@link GetStream}.
+ */
+public class LetStream extends TupleStream implements Expressible {
+
+  private static final long serialVersionUID = 1;
+  private TupleStream stream;
+  private List<CellStream> cellStreams;
+  private StreamContext streamContext;
+
+  public LetStream(TupleStream stream, List<CellStream> cellStreams) throws IOException {
+    init(stream, cellStreams);
+  }
+
+  /**
+   * Builds the stream from an expression. Every operand that constructs a
+   * {@link CellStream} becomes a variable; the one other operand is the stream read.
+   *
+   * @throws IOException if there are fewer than two stream operands, or if the number
+   *         of operands that are not a {@link CellStream} is not exactly one
+   */
+  public LetStream(StreamExpression expression, StreamFactory factory) throws IOException {
+    List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
+
+    if(streamExpressions.size() < 2){
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting atleast 2 streams but found %d",expression, streamExpressions.size()));
+    }
+
+    TupleStream stream = null;
+    List<CellStream> cellStreams = new ArrayList<>();
+
+    for(StreamExpression streamExpression : streamExpressions) {
+      TupleStream s = factory.constructStream(streamExpression);
+      if(s instanceof CellStream) {
+        cellStreams.add((CellStream)s);
+      } else {
+        if(stream == null) {
+          stream = s;
+        } else {
+          throw new IOException("Found more then one stream that was not a CellStream");
+        }
+      }
+    }
+
+    if(stream == null) {
+      // Every operand was a cell; fail here rather than with a NullPointerException later.
+      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting one stream that is not a CellStream but found none",expression));
+    }
+
+    init(stream, cellStreams);
+  }
+
+  private void init(TupleStream _stream, List<CellStream> _cellStreams) {
+    this.stream = _stream;
+    this.cellStreams = _cellStreams;
+  }
+
+  @Override
+  public StreamExpression toExpression(StreamFactory factory) throws IOException{
+    return toExpression(factory, true);
+  }
+
+  private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
+    // function name
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+    expression.addParameter(((Expressible) stream).toExpression(factory));
+    for(CellStream cellStream : cellStreams) {
+      expression.addParameter(((Expressible)cellStream).toExpression(factory));
+    }
+
+    return expression;
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+
+    StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
+    explanation.setFunctionName(factory.getFunctionName(this.getClass()));
+    explanation.setImplementingClass(this.getClass().getName());
+    explanation.setExpressionType(ExpressionType.STREAM_DECORATOR);
+    explanation.setExpression(toExpression(factory, false).toString());
+    explanation.addChild(stream.toExplanation(factory));
+
+    return explanation;
+  }
+
+  public void setStreamContext(StreamContext context) {
+    this.streamContext = context;
+    this.stream.setStreamContext(context);
+  }
+
+  public List<TupleStream> children() {
+    List<TupleStream> l = new ArrayList<>();
+    l.add(stream);
+
+    return l;
+  }
+
+  public Tuple read() throws IOException {
+    return stream.read();
+  }
+
+  public void close() throws IOException {
+    stream.close();
+  }
+
+  /**
+   * Opens each cell, reads one tuple from it, stores the list found under the cell's name
+   * in the context and closes the cell; the wrapped stream is opened only after all cells.
+   */
+  public void open() throws IOException {
+    Map<String, List<Tuple>> lets = streamContext.getLets();
+    for(CellStream cellStream : cellStreams) {
+      try {
+        cellStream.setStreamContext(streamContext);
+        cellStream.open();
+        Tuple tup = cellStream.read();
+        String name = cellStream.getName();
+        // NOTE(review): assumes the cell's tuple holds a List of tuples under its name - confirm in CellStream.
+        List<Tuple> tuples = (List<Tuple>)tup.get(name);
+        lets.put(name, tuples);
+      } finally {
+        cellStream.close();
+      }
+    }
+    stream.open();
+  }
+
+  /** Return the stream sort - ie, the order in which records are returned */
+  public StreamComparator getStreamSort(){
+    return null;
+  }
+
+  public int getCost() {
+    return 0;
+  }
+
+
+}
\ No newline at end of file
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
index 60a92749e10..5dcc7b32e86 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
@@ -19,8 +19,10 @@ package org.apache.solr.client.solrj.io.stream;
 import java.io.Serializable;
 import java.util.Map;
 import java.util.HashMap;
+import java.util.List;
 
 import org.apache.solr.client.solrj.io.ModelCache;
+import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 
@@ -37,12 +39,18 @@ public class StreamContext implements Serializable{
 
   private Map entries = new HashMap();
   private Map tupleContext = new HashMap();
+  private Map<String, List<Tuple>> lets = new HashMap<>();
   public int workerID;
   public int numWorkers;
   private SolrClientCache clientCache;
   private ModelCache modelCache;
   private StreamFactory streamFactory;
 
+  /** Returns the live map of variable name to tuple list; LetStream fills it and GetStream reads it. */
+  public Map<String, List<Tuple>> getLets(){
+    return lets;
+  }
+
   public Object get(Object key) {
     return entries.get(key);
   }
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index
bb771b6f237..51c5301976d 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -5099,6 +5099,82 @@ public class StreamExpressionTest extends SolrCloudTestCase {
   }
+
+  @Test
+  public void testLetGetStream() throws Exception {
+    UpdateRequest updateRequest = new UpdateRequest();
+    updateRequest.add(id, "hello", "test_t", "l b c d c e");
+    updateRequest.add(id, "hello1", "test_t", "l b c d c");
+
+    updateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    String expr = "search("+COLLECTIONORALIAS+", q=\"*:*\", fl=\"id,test_t\", sort=\"id desc\")";
+    String cat = "let(cell(results,"+expr+"), get(results))";
+    ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
+    paramsLoc.set("expr", cat);
+    paramsLoc.set("qt", "/stream");
+
+    String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS;
+    TupleStream solrStream = new SolrStream(url, paramsLoc);
+
+    StreamContext context = new StreamContext();
+    solrStream.setStreamContext(context);
+    List<Tuple> tuples = getTuples(solrStream);
+    assertEquals(2, tuples.size());
+    assertEquals("hello1", tuples.get(0).get("id"));
+    assertEquals("l b c d c", tuples.get(0).get("test_t"));
+    assertEquals("hello", tuples.get(1).get("id"));
+    assertEquals("l b c d c e", tuples.get(1).get("test_t"));
+
+
+    //Test there are no side effects when transforming tuples: select renames id for the first get(results), the second get(results) must still return id.
+    expr = "search("+COLLECTIONORALIAS+", q=\"*:*\", fl=\"id,test_t\", sort=\"id desc\")";
+    cat = "let(cell(results,"+expr+"), list(select(get(results), id as newid, test_t), get(results)))";
+    paramsLoc = new ModifiableSolrParams();
+    paramsLoc.set("expr", cat);
+    paramsLoc.set("qt", "/stream");
+
+    solrStream = new SolrStream(url, paramsLoc);
+
+    context = new StreamContext();
+    solrStream.setStreamContext(context);
+    tuples = getTuples(solrStream);
+    assertEquals(4, tuples.size());
+    assertEquals("hello1", tuples.get(0).get("newid"));
+    assertEquals("l b c d c", tuples.get(0).get("test_t"));
+    assertEquals("hello", tuples.get(1).get("newid"));
+    assertEquals("l b c d c e", tuples.get(1).get("test_t"));
+    assertEquals("hello1", tuples.get(2).get("id"));
+    assertEquals("l b c d c", tuples.get(2).get("test_t"));
+    assertEquals("hello", tuples.get(3).get("id"));
+    assertEquals("l b c d c e", tuples.get(3).get("test_t"));
+
+    //Test multiple variables in one let
+
+    //results is sorted by id descending and results1 by id ascending, so get(results1) must return hello before hello1.
+    expr = "search("+COLLECTIONORALIAS+", q=\"*:*\", fl=\"id,test_t\", sort=\"id desc\")";
+    String expr1 = "search("+COLLECTIONORALIAS+", q=\"*:*\", fl=\"id,test_t\", sort=\"id asc\")";
+
+    cat = "let(cell(results,"+expr+"), cell(results1,"+expr1+"), list(select(get(results), id as newid, test_t), get(results1)))";
+    paramsLoc = new ModifiableSolrParams();
+    paramsLoc.set("expr", cat);
+    paramsLoc.set("qt", "/stream");
+
+    solrStream = new SolrStream(url, paramsLoc);
+
+    context = new StreamContext();
+    solrStream.setStreamContext(context);
+    tuples = getTuples(solrStream);
+    assertEquals(4, tuples.size());
+    assertEquals("hello1", tuples.get(0).get("newid"));
+    assertEquals("l b c d c", tuples.get(0).get("test_t"));
+    assertEquals("hello", tuples.get(1).get("newid"));
+    assertEquals("l b c d c e", tuples.get(1).get("test_t"));
+    assertEquals("hello", tuples.get(2).get("id"));
+    assertEquals("l b c d c e", tuples.get(2).get("test_t"));
+    assertEquals("hello1", tuples.get(3).get("id"));
+    assertEquals("l b c d c", tuples.get(3).get("test_t"));
+  }
+
   @Test
   public void testConvertEvaluator() throws Exception {