diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index aaf15baddec..c5640a408d5 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -103,6 +103,8 @@ New Features * SOLR-8188: Adds Hash and OuterHash Joins to the Streaming API and Streaming Expressions (Dennis Gove) +* SOLR-7669: Add SelectStream and Tuple Operations to the Streaming API and Streaming Expressions (Dennis Gove) + Optimizations ---------------------- * SOLR-7876: Speed up queries and operations that use many terms when timeAllowed has not been diff --git a/solr/core/src/java/org/apache/solr/handler/SQLHandler.java b/solr/core/src/java/org/apache/solr/handler/SQLHandler.java index da5480d0e2f..cb709a26365 100644 --- a/solr/core/src/java/org/apache/solr/handler/SQLHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/SQLHandler.java @@ -28,6 +28,7 @@ import com.facebook.presto.sql.ExpressionFormatter; import com.facebook.presto.sql.tree.*; import com.google.common.base.Strings; import com.google.common.collect.Iterables; + import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.ComparatorOrder; import org.apache.solr.client.solrj.io.comp.FieldComparator; @@ -40,8 +41,8 @@ import org.apache.solr.client.solrj.io.stream.CloudSolrStream; import org.apache.solr.client.solrj.io.stream.FacetStream; import org.apache.solr.client.solrj.io.stream.ParallelStream; import org.apache.solr.client.solrj.io.stream.RankStream; -import org.apache.solr.client.solrj.io.stream.EditStream; import org.apache.solr.client.solrj.io.stream.RollupStream; +import org.apache.solr.client.solrj.io.stream.SelectStream; import org.apache.solr.client.solrj.io.stream.StatsStream; import org.apache.solr.client.solrj.io.stream.StreamContext; import org.apache.solr.client.solrj.io.stream.TupleStream; @@ -58,13 +59,14 @@ import org.apache.solr.core.SolrCore; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.response.SolrQueryResponse; import org.apache.solr.util.plugin.SolrCoreAware; + import java.util.List; import java.util.Map; import java.util.HashMap; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import com.facebook.presto.sql.parser.SqlParser; public class SQLHandler extends RequestHandlerBase implements SolrCoreAware { @@ -449,7 +451,7 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware { tupleStream = new LimitStream(tupleStream, sqlVisitor.limit); } - return new EditStream(tupleStream, remove); + return new SelectStream(tupleStream, sqlVisitor.fields); } private static TupleStream doGroupByWithAggregatesFacets(SQLVisitor sqlVisitor) throws IOException { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java index 48cb68340cd..22bc5882dac 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java @@ -62,8 +62,8 @@ public class Tuple implements Cloneable { public void put(Object key, Object value) { this.fields.put(key, value); } - - public void remove(Object key) { + + public void remove(Object key){ this.fields.remove(key); } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java index 4695829ef95..695b935613c 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java @@ -17,6 +17,8 @@ package org.apache.solr.client.solrj.io.comp; +import java.util.Map; + import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; @@ -74,7 +76,7 @@ public class FieldComparator implements StreamComparator { StringBuilder sb = new StringBuilder(); sb.append(leftFieldName); - if(!leftFieldName.equals(rightFieldName)){ + if(hasDifferentFieldNames()){ sb.append("="); sb.append(rightFieldName); } @@ -149,4 +151,13 @@ public class FieldComparator implements StreamComparator { return false; } + + @Override + public FieldComparator copyAliased(Map aliases){ + return new FieldComparator( + aliases.containsKey(leftFieldName) ? aliases.get(leftFieldName) : leftFieldName, + aliases.containsKey(rightFieldName) ? aliases.get(rightFieldName) : rightFieldName, + order + ); + } } \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultipleFieldComparator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultipleFieldComparator.java index d81d053d92a..5dd3e8d3cbf 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultipleFieldComparator.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultipleFieldComparator.java @@ -18,8 +18,8 @@ package org.apache.solr.client.solrj.io.comp; import java.io.IOException; -import java.io.Serializable; -import java.util.Comparator; +import java.util.List; +import java.util.Map; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.stream.expr.Expressible; @@ -92,4 +92,15 @@ public class MultipleFieldComparator implements StreamComparator { return false; } + + @Override + public MultipleFieldComparator copyAliased(Map aliases){ + StreamComparator[] aliasedComps = new StreamComparator[comps.length]; + + for(int idx = 0; idx < comps.length; ++idx){ + aliasedComps[idx] = comps[idx].copyAliased(aliases); + } + + return new MultipleFieldComparator(aliasedComps); + } } \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java index e3e4620ffd9..c5fe11b741b 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/StreamComparator.java @@ -19,14 +19,13 @@ package org.apache.solr.client.solrj.io.comp; import java.io.Serializable; import java.util.Comparator; +import java.util.Map; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.stream.expr.Expressible; -import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; -import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; -import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; /** Defines a comparator we can use with TupleStreams */ public interface StreamComparator extends Comparator, Expressible, Serializable { public boolean isDerivedFrom(StreamComparator base); + public StreamComparator copyAliased(Map aliases); } \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/package-info.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/package-info.java index a28e5104a25..fd525ded618 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/package-info.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/package-info.java @@ -16,6 +16,6 @@ */ /** - * Primary APIs for communicating with a Solr Server from a Java client. + * Equalitors for the Streaming Aggregation API **/ package org.apache.solr.client.solrj.io.eq; \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ReplaceOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ReplaceOperation.java new file mode 100644 index 00000000000..7ccd9da5b3d --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ReplaceOperation.java @@ -0,0 +1,84 @@ +package org.apache.solr.client.solrj.io.ops; + +import java.io.IOException; +import java.util.Locale; + +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Replaces some tuple value with another. The replacement value can be either a given value or the + * value of another field in the tuple. The expression for a replace operation can be of multiple forms: + * replace(fieldA, 0, withValue=100) // for fieldA if equals 0 then set to 100 + * replace(fieldA, null, withValue=0) // for fieldA if null then set to 0 + * replace(fieldA, null, withField=fieldB) // for fieldA if null then set to the value of fieldB (if fieldB is null then fieldA will end up as null) + * replace(fieldA, 0, withField=fieldB) // for fieldA if 0 then set to the value of fieldB (if fieldB is 0 then fieldA will end up as 0) + * replace(fieldA, "Izzy and Kayden", withValue="my kids") + * + * You can also construct these without the field name in the expression but that does require that you provide the field name during construction. + * This is most useful during metric calculation because when calculating a metric you have already provided a field name in the metric so there + * is no reason to have to provide the field name again in the operation + * sum(fieldA, replace(null, withValue=0)) // performs the replacement on fieldA + * + * Equality is determined by the standard type .equals() functions. + */ +public class ReplaceOperation implements StreamOperation { + + private static final long serialVersionUID = 1; + private StreamOperation replacer; + + public ReplaceOperation(StreamExpression expression, StreamFactory factory) throws IOException { + this(null, expression, factory); + } + + public ReplaceOperation(String forField, StreamExpression expression, StreamFactory factory) throws IOException { + + StreamExpressionNamedParameter withValue = factory.getNamedOperand(expression, "withValue"); + StreamExpressionNamedParameter withField = factory.getNamedOperand(expression, "withField"); + + if(null != withValue && null == withField){ + replacer = new ReplaceWithValueOperation(forField, expression, factory); + } + else if(null != withField && null == withValue){ + replacer = new ReplaceWithFieldOperation(forField, expression, factory); + } + else if(null != withValue && null != withField){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting either withValue or withField parameter but found both", expression)); + } + else{ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting withValue or withField parameter but found neither", expression)); + } + + } + + @Override + public void operate(Tuple tuple) { + replacer.operate(tuple); + } + + @Override + public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException { + return replacer.toExpression(factory); + } + +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ReplaceWithFieldOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ReplaceWithFieldOperation.java new file mode 100644 index 00000000000..4b2cc0569e9 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ReplaceWithFieldOperation.java @@ -0,0 +1,112 @@ +package org.apache.solr.client.solrj.io.ops; + +import java.io.IOException; +import java.util.Locale; + +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implementation of replace(...., withField=fieldName) + * See ReplaceOperation for description. + */ +public class ReplaceWithFieldOperation implements StreamOperation { + + private static final long serialVersionUID = 1; + + private boolean wasBuiltWithFieldName; + private String originalFieldName; + private Object originalValue; + private String replacementFieldName; + + public ReplaceWithFieldOperation(String forField, StreamExpression expression, StreamFactory factory) throws IOException { + + if(2 == expression.getParameters().size()){ + wasBuiltWithFieldName = false; + + this.originalFieldName = forField; + this.originalValue = factory.constructPrimitiveObject(factory.getValueOperand(expression, 0)); + + } + else if(3 == expression.getParameters().size()){ + wasBuiltWithFieldName = true; + + this.originalFieldName = factory.getValueOperand(expression, 0); + this.originalValue = factory.constructPrimitiveObject(factory.getValueOperand(expression, 1)); + } + else{ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression)); + } + + StreamExpressionNamedParameter replacementParameter = factory.getNamedOperand(expression, "withField"); + if(null == replacementParameter){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a parameter named 'withField' but didn't find one.", expression)); + } + if(!(replacementParameter.getParameter() instanceof StreamExpressionValue)){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting parameter named 'withField' to be a field name.", expression)); + } + + this.replacementFieldName = ((StreamExpressionValue)replacementParameter.getParameter()).getValue(); + } + + @Override + public void operate(Tuple tuple) { + if(matchesOriginal(tuple)){ + replace(tuple); + } + } + + private boolean matchesOriginal(Tuple tuple){ + Object value = tuple.get(originalFieldName); + + if(null == value){ + return null == originalValue; + } + else if(null != originalValue){ + return originalValue.equals(value); + } + + return false; + } + + private void replace(Tuple tuple){ + tuple.put(originalFieldName, tuple.get(replacementFieldName)); + } + + @Override + public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException { + // function name + StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); + + if(wasBuiltWithFieldName){ + expression.addParameter(originalFieldName); + } + + expression.addParameter(null == originalValue ? "null" : originalValue.toString()); + expression.addParameter(new StreamExpressionNamedParameter("withField", replacementFieldName)); + + return expression; + } + +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ReplaceWithValueOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ReplaceWithValueOperation.java new file mode 100644 index 00000000000..c3dc18474cd --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ReplaceWithValueOperation.java @@ -0,0 +1,117 @@ +package org.apache.solr.client.solrj.io.ops; + +import java.io.IOException; +import java.util.Locale; + +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implementation of replace(...., withValue="some value") + * See ReplaceOperation for description. + */ +public class ReplaceWithValueOperation implements StreamOperation { + + private static final long serialVersionUID = 1; + + private boolean wasBuiltWithFieldName; + private String fieldName; + private Object original; + private Object replacement; + + public ReplaceWithValueOperation(String forField, StreamExpression expression, StreamFactory factory) throws IOException { + + if(2 == expression.getParameters().size()){ + wasBuiltWithFieldName = false; + + this.fieldName = forField; + this.original = factory.constructPrimitiveObject(factory.getValueOperand(expression, 0)); + + } + else if(3 == expression.getParameters().size()){ + wasBuiltWithFieldName = true; + + this.fieldName = factory.getValueOperand(expression, 0); + this.original = factory.constructPrimitiveObject(factory.getValueOperand(expression, 1)); + } + else{ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression)); + } + + StreamExpressionNamedParameter replacementParameter = factory.getNamedOperand(expression, "withValue"); + if(null == replacementParameter){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a parameter named 'withValue' but didn't find one.", expression)); + } + if(!(replacementParameter.getParameter() instanceof StreamExpressionValue)){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting parameter named 'withValue' to be a primitive type.", expression)); + } + + this.replacement = factory.constructPrimitiveObject(((StreamExpressionValue)replacementParameter.getParameter()).getValue()); + } + + @Override + public void operate(Tuple tuple) { + if(matchesOriginal(tuple)){ + replace(tuple); + } + } + + private boolean matchesOriginal(Tuple tuple){ + Object value = tuple.get(fieldName); + + if(null == value){ + return null == original; + } + else if(null != original){ + return original.equals(value); + } + + return false; + } + + private void replace(Tuple tuple){ + if(null == replacement){ + tuple.remove(fieldName); + } + else{ + tuple.put(fieldName, replacement); + } + } + + @Override + public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException { + // function name + StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); + + if(wasBuiltWithFieldName){ + expression.addParameter(fieldName); + } + + expression.addParameter(null == original ? "null" : original.toString()); + expression.addParameter(new StreamExpressionNamedParameter("withValue", null == replacement ? "null" : replacement.toString())); + + return expression; + } + +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/StreamOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/StreamOperation.java new file mode 100644 index 00000000000..a9d381ee896 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/StreamOperation.java @@ -0,0 +1,33 @@ +package org.apache.solr.client.solrj.io.ops; + +import java.io.IOException; +import java.io.Serializable; + +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Interface for any operation one can perform on a tuple in a TupleStream + */ +public interface StreamOperation extends Expressible, Serializable { + public void operate(Tuple tuple); +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/package-info.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/package-info.java new file mode 100644 index 00000000000..0e9ffd08129 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/package-info.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Operations for the Streaming Aggregation API + **/ +package org.apache.solr.client.solrj.io.ops; + + + + diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/EditStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/EditStream.java deleted file mode 100644 index d0aa8415512..00000000000 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/EditStream.java +++ /dev/null @@ -1,76 +0,0 @@ -package org.apache.solr.client.solrj.io.stream; - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.solr.client.solrj.io.Tuple; -import org.apache.solr.client.solrj.io.comp.StreamComparator; - -public class EditStream extends TupleStream { - - private static final long serialVersionUID = 1; - private TupleStream stream; - private List remove; - - public EditStream(TupleStream stream, List remove) { - this.stream = stream; - this.remove = remove; - } - - public void setStreamContext(StreamContext context) { - this.stream.setStreamContext(context); - } - - public List children() { - List l = new ArrayList(); - l.add(stream); - return l; - } - - public void open() throws IOException { - stream.open(); - } - - public void close() throws IOException { - stream.close(); - } - - public Tuple read() throws IOException { - Tuple tuple = stream.read(); - if(tuple.EOF) { - return tuple; - } else { - for(String key : remove) { - tuple.remove(key); - } - - return tuple; - } - } - - public StreamComparator getStreamSort() { - return stream.getStreamSort(); - } - - public int getCost() { - return 0; - } -} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SelectStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SelectStream.java new file mode 100644 index 00000000000..ac8d68e751f --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SelectStream.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.io.stream; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.function.BiConsumer; + +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.FieldComparator; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.ops.StreamOperation; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; + +/** +* Selects fields from the incoming stream and applies optional field renaming. +* Does not reorder the outgoing stream. +**/ + + +public class SelectStream extends TupleStream implements Expressible { + + private static final long serialVersionUID = 1; + + private TupleStream stream; + private Map selectedFields; + private List operations; + + public SelectStream(TupleStream stream, List selectedFields) throws IOException { + this.stream = stream; + this.selectedFields = new HashMap<>(); + for(String selectedField : selectedFields){ + this.selectedFields.put(selectedField, selectedField); + } + operations = new ArrayList<>(); + } + + public SelectStream(TupleStream stream, Map selectedFields) throws IOException { + this.stream = stream; + this.selectedFields = selectedFields; + operations = new ArrayList<>(); + } + + public SelectStream(StreamExpression expression,StreamFactory factory) throws IOException { + // grab all parameters out + List streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class); + List selectFieldsExpressions = factory.getOperandsOfType(expression, StreamExpressionValue.class); + List operationExpressions = factory.getExpressionOperandsRepresentingTypes(expression, StreamOperation.class); + + // validate expression contains only what we want. + if(expression.getParameters().size() != streamExpressions.size() + selectFieldsExpressions.size() + operationExpressions.size()){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression)); + } + + if(1 != streamExpressions.size()){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single stream but found %d (must be TupleStream types)",expression, streamExpressions.size())); + } + + if(0 == selectFieldsExpressions.size()){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting at least one select field but found %d",expression, streamExpressions.size())); + } + + stream = factory.constructStream(streamExpressions.get(0)); + + selectedFields = new HashMap(selectFieldsExpressions.size()); + for(StreamExpressionParameter parameter : selectFieldsExpressions){ + StreamExpressionValue selectField = (StreamExpressionValue)parameter; + String value = selectField.getValue().trim().toLowerCase(Locale.ROOT); + if(value.contains(" as ")){ + String[] parts = value.split(" as "); + if(2 != parts.length){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting select field of form 'fieldA' or 'fieldA as alias' but found %s",expression, value)); + } + selectedFields.put(parts[0].trim(), parts[1].trim()); + } + else{ + selectedFields.put(value,value); + } + } + + operations = new ArrayList<>(); + for(StreamExpression expr : operationExpressions){ + operations.add(factory.constructOperation(expr)); + } + } + + @Override + public StreamExpression toExpression(StreamFactory factory) throws IOException { + // function name + StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); + + // stream + if(stream instanceof Expressible){ + expression.addParameter(((Expressible)stream).toExpression(factory)); + } + else{ + throw new IOException("This SelectStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } + + // selects + for(Map.Entry selectField : selectedFields.entrySet()) { + if(selectField.getKey().equals(selectField.getValue())){ + expression.addParameter(selectField.getKey()); + } + else{ + expression.addParameter(String.format(Locale.ROOT, "%s as %s", selectField.getKey(), selectField.getValue())); + } + } + + for(StreamOperation operation : operations){ + expression.addParameter(operation.toExpression(factory)); + } + + return expression; + } + + public void setStreamContext(StreamContext context) { + this.stream.setStreamContext(context); + } + + public List children() { + List l = new ArrayList(); + l.add(stream); + return l; + } + + public void open() throws IOException { + stream.open(); + } + + public void close() throws IOException { + stream.close(); + } + + public Tuple read() throws IOException { + Tuple original = stream.read(); + + if(original.EOF){ + return original; + } + + // create a copy with the limited set of fields + Tuple working = new Tuple(new HashMap<>()); + for(Object fieldName : original.fields.keySet()){ + if(selectedFields.containsKey(fieldName)){ + working.put(selectedFields.get(fieldName), original.get(fieldName)); + } + } + + // apply all operations + for(StreamOperation operation : operations){ + operation.operate(working); + } + + return working; + } + + /** Return the stream sort - ie, the order in which records are returned */ + public StreamComparator getStreamSort(){ + // apply aliasing to comparator + return stream.getStreamSort().copyAliased(selectedFields); + } + + public int getCost() { + return 0; + } +} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java index 26064ce24f8..425fe8a8c96 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java @@ -16,6 +16,7 @@ import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator; import org.apache.solr.client.solrj.io.comp.StreamComparator; import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor; import org.apache.solr.client.solrj.io.eq.StreamEqualitor; +import org.apache.solr.client.solrj.io.ops.StreamOperation; import org.apache.solr.client.solrj.io.stream.TupleStream; import org.apache.solr.client.solrj.io.stream.metrics.Metric; @@ -281,6 +282,22 @@ public class StreamFactory implements Serializable { return (StreamEqualitor)createInstance(equalitorType, new Class[]{ String.class, String.class }, new Object[]{ leftFieldName, rightFieldName }); } } + + public Metric constructOperation(String expressionClause) throws IOException { + return constructMetric(StreamExpressionParser.parse(expressionClause)); + } + public StreamOperation constructOperation(StreamExpression expression) throws IOException{ + String function = expression.getFunctionName(); + if(functionNames.containsKey(function)){ + Class clazz = functionNames.get(function); + if(Expressible.class.isAssignableFrom(clazz) && StreamOperation.class.isAssignableFrom(clazz)){ + return (StreamOperation)createInstance(functionNames.get(function), new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this}); + } + } + + throw new IOException(String.format(Locale.ROOT,"Invalid operation expression %s - function '%s' is unknown (not mapped to a valid StreamOperation)", expression, expression.getFunctionName())); + } + public T createInstance(Class clazz, Class[] paramTypes, Object[] params) throws IOException{ Constructor ctor; @@ -307,4 +324,17 @@ public class StreamFactory implements Serializable { throw new IOException(String.format(Locale.ROOT, "Unable to find function name for class '%s'", clazz.getName())); } + + public Object constructPrimitiveObject(String original){ + String lower = original.trim().toLowerCase(Locale.ROOT); + + if("null".equals(lower)){ return null; } + if("true".equals(lower) || "false".equals(lower)){ return Boolean.parseBoolean(lower); } + try{ return Long.valueOf(original); } catch(Exception e){}; + try{ if (original.matches(".{1,8}")){ return Float.valueOf(original); }} catch(Exception e){}; + try{ if (original.matches(".{1,17}")){ return Double.valueOf(original); }} catch(Exception e){}; + + // is a string + return original; + } } diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java index 02354e7d830..a46a95d21c2 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java @@ -21,11 +21,13 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Locale; import java.util.Map; import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.ops.ReplaceOperation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; @@ -134,6 +136,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { testLeftOuterJoinStream(); testHashJoinStream(); testOuterHashJoinStream(); + testSelectStream(); } private void testCloudSolrStream() throws Exception { @@ -1281,6 +1284,104 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { commit(); } + private void testSelectStream() throws Exception { + + indexr(id, "1", "side_s", "left", "join1_i", "0", "join2_s", "a", "ident_s", "left_1"); // 8, 9 + indexr(id, "15", "side_s", "left", "join1_i", "0", "join2_s", "a", "ident_s", "left_1"); // 8, 9 + indexr(id, "2", "side_s", "left", "join1_i", "0", "join2_s", "b", "ident_s", "left_2"); + indexr(id, "3", "side_s", "left", "join1_i", "1", "join2_s", "a", "ident_s", "left_3"); // 10 + indexr(id, "4", "side_s", "left", "join1_i", "1", "join2_s", "b", "ident_s", "left_4"); // 11 + indexr(id, "5", "side_s", "left", "join1_i", "1", "join2_s", "c", "ident_s", "left_5"); // 12 + indexr(id, "6", "side_s", "left", "join1_i", "2", "join2_s", "d", "ident_s", "left_6"); + indexr(id, "7", "side_s", "left", "join1_i", "3", "join2_s", "e", "ident_s", "left_7"); // 14 + + indexr(id, "8", "side_s", "right", "join1_i", "0", "join2_s", "a", "ident_s", "right_1", "join3_i", "0"); // 1,15 + indexr(id, "9", "side_s", "right", "join1_i", "0", "join2_s", "a", "ident_s", "right_2", "join3_i", "0"); // 1,15 + indexr(id, "10", "side_s", "right", "join1_i", "1", "join2_s", "a", "ident_s", "right_3", "join3_i", "1"); // 3 + indexr(id, "11", "side_s", "right", "join1_i", "1", "join2_s", "b", "ident_s", "right_4", "join3_i", "1"); // 4 + indexr(id, "12", "side_s", "right", "join1_i", "1", "join2_s", "c", "ident_s", "right_5", "join3_i", "1"); // 5 + indexr(id, "13", "side_s", "right", "join1_i", "2", "join2_s", "dad", "ident_s", "right_6", "join3_i", "2"); + indexr(id, "14", "side_s", "right", "join1_i", "3", "join2_s", "e", "ident_s", "right_7", "join3_i", "3"); // 7 + commit(); + + String clause; + TupleStream stream; + List tuples; + + StreamFactory factory = new StreamFactory() + .withCollectionZkHost("collection1", zkServer.getZkAddress()) + .withFunctionName("search", CloudSolrStream.class) + .withFunctionName("innerJoin", InnerJoinStream.class) + .withFunctionName("select", SelectStream.class) + .withFunctionName("replace", ReplaceOperation.class); + + // Basic test + clause = "select(" + + "id, join1_i as join1, join2_s as join2, ident_s as identity," + + "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\")" + + ")"; + stream = factory.constructStream(clause); + tuples = getTuples(stream); + assertFields(tuples, "id", "join1", "join2", "identity"); + assertNotFields(tuples, "join1_i", "join2_s", "ident_s"); + + // Basic with replacements test + clause = "select(" + + "id, join1_i as join1, join2_s as join2, ident_s as identity," + + "replace(join1, 0, withValue=12), replace(join1, 3, withValue=12), replace(join1, 2, withField=join2)," + + "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\")" + + ")"; + stream = factory.constructStream(clause); + tuples = getTuples(stream); + assertFields(tuples, "id", "join1", "join2", "identity"); + assertNotFields(tuples, "join1_i", "join2_s", "ident_s"); + assertLong(tuples.get(0), "join1", 12); + assertLong(tuples.get(1), "join1", 12); + assertLong(tuples.get(2), "join1", 12); + assertLong(tuples.get(7), "join1", 12); + assertString(tuples.get(6), "join1", "d"); + + + // Inner stream test + clause = "innerJoin(" + + "select(" + + "id, join1_i as left.join1, join2_s as left.join2, ident_s as left.ident," + + "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\")" + + ")," + + "select(" + + "join3_i as right.join1, join2_s as right.join2, ident_s as right.ident," + + "search(collection1, q=\"side_s:right\", fl=\"join3_i,join2_s,ident_s\", sort=\"join3_i asc, join2_s asc\")," + + ")," + + "on=\"left.join1=right.join1, left.join2=right.join2\"" + + ")"; + stream = factory.constructStream(clause); + tuples = getTuples(stream); + assertFields(tuples, "id", "left.join1", "left.join2", "left.ident", "right.join1", "right.join2", "right.ident"); + + // Wrapped select test + clause = "select(" + + "id, left.ident, right.ident," + + "innerJoin(" + + "select(" + + "id, join1_i as left.join1, join2_s as left.join2, ident_s as left.ident," + + "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\")" + + ")," + + "select(" + + "join3_i as right.join1, join2_s as right.join2, ident_s as right.ident," + + "search(collection1, q=\"side_s:right\", fl=\"join3_i,join2_s,ident_s\", sort=\"join3_i asc, join2_s asc\")," + + ")," + + "on=\"left.join1=right.join1, left.join2=right.join2\"" + + ")" + + ")"; + stream = factory.constructStream(clause); + tuples = getTuples(stream); + assertFields(tuples, "id", "left.ident", "right.ident"); + assertNotFields(tuples, "left.join1", "left.join2", "right.join1", "right.join2"); + + del("*:*"); + commit(); + } + protected List getTuples(TupleStream tupleStream) throws IOException { tupleStream.open(); List tuples = new ArrayList(); @@ -1305,6 +1406,27 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase { } return true; } + + protected boolean assertFields(List tuples, String ... fields) throws Exception{ + for(Tuple tuple : tuples){ + for(String field : fields){ + if(!tuple.fields.containsKey(field)){ + throw new Exception(String.format(Locale.ROOT, "Expected field '%s' not found", field)); + } + } + } + return true; + } + protected boolean assertNotFields(List tuples, String ... fields) throws Exception{ + for(Tuple tuple : tuples){ + for(String field : fields){ + if(tuple.fields.containsKey(field)){ + throw new Exception(String.format(Locale.ROOT, "Unexpected field '%s' found", field)); + } + } + } + return true; + } protected boolean assertGroupOrder(Tuple tuple, int... ids) throws Exception { List group = (List)tuple.get("tuples"); diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/ops/OperationsTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/ops/OperationsTest.java new file mode 100644 index 00000000000..48a83aec77b --- /dev/null +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/ops/OperationsTest.java @@ -0,0 +1,251 @@ +package org.apache.solr.client.solrj.io.stream.ops; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.Map; + +import junit.framework.Assert; + +import org.apache.commons.collections.map.HashedMap; +import org.apache.lucene.util.LuceneTestCase; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.ops.ReplaceOperation; +import org.apache.solr.client.solrj.io.ops.StreamOperation; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.junit.Test; + +/** + **/ + +public class OperationsTest extends LuceneTestCase { + + StreamFactory factory; + Map values; + + public OperationsTest() { + super(); + + factory = new StreamFactory() + .withFunctionName("replace", ReplaceOperation.class); + values = new HashedMap(); + } + + @Test + public void replaceValueNullWithString() throws Exception{ + Tuple tuple; + StreamOperation operation; + + operation = new ReplaceOperation("fieldA", StreamExpressionParser.parse("replace(null, withValue=foo)"), factory); + + // replace + values.clear(); + values.put("fieldB", "bar"); + values.put("fieldC", 123); + tuple = new Tuple(values); + operation.operate(tuple); + + Assert.assertNotNull(tuple.get("fieldA")); + Assert.assertEquals("foo", tuple.get("fieldA")); + + // don't replace + values.clear(); + values.put("fieldA", "exists"); + values.put("fieldB", "bar"); + values.put("fieldC", 123); + tuple = new Tuple(values); + operation.operate(tuple); + + Assert.assertNotNull(tuple.get("fieldA")); + Assert.assertEquals("exists", tuple.get("fieldA")); + } + + @Test + public void replaceValueNullWithInt() throws Exception{ + Tuple tuple; + StreamOperation operation; + + operation = new ReplaceOperation("fieldA", StreamExpressionParser.parse("replace(null, withValue=123)"), factory); + + // replace + values.clear(); + values.put("fieldB", "bar"); + values.put("fieldC", (long)123); + tuple = new Tuple(values); + operation.operate(tuple); + + Assert.assertNotNull(tuple.get("fieldA")); + Assert.assertEquals((long)123, tuple.get("fieldA")); + + // don't replace + values.clear(); + values.put("fieldA", "exists"); + values.put("fieldB", "bar"); + values.put("fieldC", 123); + tuple = new Tuple(values); + operation.operate(tuple); + + Assert.assertNotNull(tuple.get("fieldA")); + Assert.assertEquals("exists", tuple.get("fieldA")); + } + + @Test + public void replaceValueNullWithFloat() throws Exception{ + Tuple tuple; + StreamOperation operation; + + operation = new ReplaceOperation("fieldA", StreamExpressionParser.parse("replace(null, withValue=123.45678)"), factory); + + // replace + values.clear(); + values.put("fieldB", "bar"); + values.put("fieldC", 123); + tuple = new Tuple(values); + operation.operate(tuple); + + Assert.assertNotNull(tuple.get("fieldA")); + Assert.assertEquals(123.45678, tuple.get("fieldA")); + + // don't replace + values.clear(); + values.put("fieldA", "exists"); + values.put("fieldB", "bar"); + values.put("fieldC", 123); + tuple = new Tuple(values); + operation.operate(tuple); + + Assert.assertNotNull(tuple.get("fieldA")); + Assert.assertEquals("exists", tuple.get("fieldA")); + } + + @Test + public void replaceValueNullWithDouble() throws Exception{ + Tuple tuple; + StreamOperation operation; + + operation = new ReplaceOperation("fieldA", StreamExpressionParser.parse("replace(null, withValue=123.45678912345)"), factory); + + // replace + values.clear(); + values.put("fieldB", "bar"); + values.put("fieldC", 123); + tuple = new Tuple(values); + operation.operate(tuple); + + Assert.assertNotNull(tuple.get("fieldA")); + Assert.assertEquals(123.45678912345, tuple.get("fieldA")); + + // don't replace + values.clear(); + values.put("fieldA", "exists"); + values.put("fieldB", "bar"); + values.put("fieldC", 123); + tuple = new Tuple(values); + operation.operate(tuple); + + Assert.assertNotNull(tuple.get("fieldA")); + Assert.assertEquals("exists", tuple.get("fieldA")); + } + + @Test + public void replaceFieldNullWithString() throws Exception{ + Tuple tuple; + StreamOperation operation; + + operation = new ReplaceOperation("fieldA", StreamExpressionParser.parse("replace(null, withField=fieldB)"), factory); + + // replace + values.clear(); + values.put("fieldB", "bar"); + values.put("fieldC", 123); + tuple = new Tuple(values); + operation.operate(tuple); + + Assert.assertNotNull(tuple.get("fieldA")); + Assert.assertEquals("bar", tuple.get("fieldA")); + + // don't replace + values.clear(); + values.put("fieldA", "exists"); + values.put("fieldB", "bar"); + values.put("fieldC", 123); + tuple = new Tuple(values); + operation.operate(tuple); + + Assert.assertNotNull(tuple.get("fieldA")); + Assert.assertEquals("exists", tuple.get("fieldA")); + } + + @Test + public void replaceFieldNullWithInt() throws Exception{ + Tuple tuple; + StreamOperation operation; + + operation = new ReplaceOperation("fieldA", StreamExpressionParser.parse("replace(null, withField=fieldC)"), factory); + + // replace + values.clear(); + values.put("fieldB", "bar"); + values.put("fieldC", 123); + tuple = new Tuple(values); + operation.operate(tuple); + + Assert.assertNotNull(tuple.get("fieldA")); + Assert.assertEquals(123, tuple.get("fieldA")); + + // don't replace + values.clear(); + values.put("fieldA", "exists"); + values.put("fieldB", "bar"); + values.put("fieldC", 123); + tuple = new Tuple(values); + operation.operate(tuple); + + Assert.assertNotNull(tuple.get("fieldA")); + Assert.assertEquals("exists", tuple.get("fieldA")); + } + + @Test + public void replaceFieldNullWithNonExistantField() throws Exception{ + Tuple tuple; + StreamOperation operation; + + operation = new ReplaceOperation("fieldA", StreamExpressionParser.parse("replace(null, withField=fieldD)"), factory); + + // replace + values.clear(); + values.put("fieldB", "bar"); + values.put("fieldC", 123); + tuple = new Tuple(values); + operation.operate(tuple); + + Assert.assertNull(tuple.get("fieldA")); + + // don't replace + values.clear(); + values.put("fieldA", "exists"); + values.put("fieldB", "bar"); + values.put("fieldC", 123); + tuple = new Tuple(values); + operation.operate(tuple); + + Assert.assertNotNull(tuple.get("fieldA")); + Assert.assertEquals("exists", tuple.get("fieldA")); + } + +}