SOLR-7669: Add SelectStream and Tuple Operations to the Streaming API and Streaming Expressions

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1713967 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Dennis Gove 2015-11-12 04:37:37 +00:00
parent 19715d10ef
commit 1161f2d018
17 changed files with 1002 additions and 88 deletions

View File

@ -103,6 +103,8 @@ New Features
* SOLR-8188: Adds Hash and OuterHash Joins to the Streaming API and Streaming Expressions (Dennis Gove)
* SOLR-7669: Add SelectStream and Tuple Operations to the Streaming API and Streaming Expressions (Dennis Gove)
Optimizations
----------------------
* SOLR-7876: Speed up queries and operations that use many terms when timeAllowed has not been

View File

@ -28,6 +28,7 @@ import com.facebook.presto.sql.ExpressionFormatter;
import com.facebook.presto.sql.tree.*;
import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
@ -40,8 +41,8 @@ import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.FacetStream;
import org.apache.solr.client.solrj.io.stream.ParallelStream;
import org.apache.solr.client.solrj.io.stream.RankStream;
import org.apache.solr.client.solrj.io.stream.EditStream;
import org.apache.solr.client.solrj.io.stream.RollupStream;
import org.apache.solr.client.solrj.io.stream.SelectStream;
import org.apache.solr.client.solrj.io.stream.StatsStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
@ -58,13 +59,14 @@ import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.util.plugin.SolrCoreAware;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.facebook.presto.sql.parser.SqlParser;
public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
@ -449,7 +451,7 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware {
tupleStream = new LimitStream(tupleStream, sqlVisitor.limit);
}
return new EditStream(tupleStream, remove);
return new SelectStream(tupleStream, sqlVisitor.fields);
}
private static TupleStream doGroupByWithAggregatesFacets(SQLVisitor sqlVisitor) throws IOException {

View File

@ -62,8 +62,8 @@ public class Tuple implements Cloneable {
/** Stores {@code value} under {@code key} in this tuple's field map. */
public void put(Object key, Object value) {
  fields.put(key, value);
}
public void remove(Object key) {
public void remove(Object key){
this.fields.remove(key);
}

View File

@ -17,6 +17,8 @@
package org.apache.solr.client.solrj.io.comp;
import java.util.Map;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
@ -74,7 +76,7 @@ public class FieldComparator implements StreamComparator {
StringBuilder sb = new StringBuilder();
sb.append(leftFieldName);
if(!leftFieldName.equals(rightFieldName)){
if(hasDifferentFieldNames()){
sb.append("=");
sb.append(rightFieldName);
}
@ -149,4 +151,13 @@ public class FieldComparator implements StreamComparator {
return false;
}
/**
 * Creates a comparator equivalent to this one but expressed over aliased field names.
 *
 * @param aliases map of current field name to the name it should be replaced with; a field name
 *                that has no entry in the map is carried over unchanged
 * @return a new FieldComparator using the (possibly renamed) left and right fields and the same order
 */
@Override
public FieldComparator copyAliased(Map<String,String> aliases){
  String aliasedLeft = aliases.getOrDefault(leftFieldName, leftFieldName);
  String aliasedRight = aliases.getOrDefault(rightFieldName, rightFieldName);
  return new FieldComparator(aliasedLeft, aliasedRight, order);
}
}

View File

@ -18,8 +18,8 @@
package org.apache.solr.client.solrj.io.comp;
import java.io.IOException;
import java.io.Serializable;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
@ -92,4 +92,15 @@ public class MultipleFieldComparator implements StreamComparator {
return false;
}
/**
 * Creates a comparator whose member comparators have each been re-expressed over aliased field names.
 *
 * @param aliases map handed unchanged to every member comparator's own copyAliased
 * @return a new MultipleFieldComparator holding the aliased copies in the same order as the originals
 */
@Override
public MultipleFieldComparator copyAliased(Map<String,String> aliases){
  StreamComparator[] renamed = new StreamComparator[comps.length];
  int slot = 0;
  for(StreamComparator comp : comps){
    renamed[slot++] = comp.copyAliased(aliases);
  }
  return new MultipleFieldComparator(renamed);
}
}

View File

@ -19,14 +19,13 @@ package org.apache.solr.client.solrj.io.comp;
import java.io.Serializable;
import java.util.Comparator;
import java.util.Map;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
/**
 * Defines a comparator we can use with TupleStreams. Besides comparing tuples, a StreamComparator
 * can be turned into a streaming expression (Expressible) and is Serializable.
 */
public interface StreamComparator extends Comparator<Tuple>, Expressible, Serializable {

  /**
   * Reports whether this comparator is derived from {@code base}.
   * NOTE(review): the precise meaning of "derived" is defined by the implementations - confirm there.
   */
  boolean isDerivedFrom(StreamComparator base);

  /**
   * Returns a copy of this comparator in which field names are replaced according to {@code aliases}
   * (current field name to new field name).
   */
  StreamComparator copyAliased(Map<String,String> aliases);
}

View File

@ -16,6 +16,6 @@
*/
/**
* Primary APIs for communicating with a Solr Server from a Java client.
* Equalitors for the Streaming Aggregation API
**/
package org.apache.solr.client.solrj.io.eq;

View File

@ -0,0 +1,84 @@
package org.apache.solr.client.solrj.io.ops;
import java.io.IOException;
import java.util.Locale;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
 * Replaces a tuple value with another one. The replacement is either a fixed value (withValue) or the
 * value of another field of the same tuple (withField). Supported expression forms:
 *  replace(fieldA, 0, withValue=100)  // if fieldA equals 0 then set it to 100
 *  replace(fieldA, null, withValue=0)  // if fieldA is null then set it to 0
 *  replace(fieldA, null, withField=fieldB)  // if fieldA is null then set it to the value of fieldB (null if fieldB is null)
 *  replace(fieldA, 0, withField=fieldB)  // if fieldA is 0 then set it to the value of fieldB (0 if fieldB is 0)
 *  replace(fieldA, "Izzy and Kayden", withValue="my kids")
 *
 * The field name may be left out of the expression, in which case it has to be supplied at construction
 * time. That is mostly useful for metric calculation, where the metric already names the field:
 *  sum(fieldA, replace(null, withValue=0))  // performs the replacement on fieldA
 *
 * Equality is determined by the standard type .equals() functions.
 *
 * This class only decides which concrete implementation applies (ReplaceWithValueOperation or
 * ReplaceWithFieldOperation) and forwards every call to it.
 */
public class ReplaceOperation implements StreamOperation {

  private static final long serialVersionUID = 1;

  // the concrete replace implementation selected in the constructor
  private StreamOperation replacer;

  /** Builds the operation from an expression that carries the field name itself. */
  public ReplaceOperation(StreamExpression expression, StreamFactory factory) throws IOException {
    this(null, expression, factory);
  }

  /**
   * Builds the operation, choosing the implementation from the named parameter present in the expression.
   *
   * @param forField field to operate on when the expression does not name one
   * @param expression the replace(...) expression
   * @param factory factory used to read the expression's operands
   * @throws IOException if the expression holds both or neither of withValue / withField
   */
  public ReplaceOperation(String forField, StreamExpression expression, StreamFactory factory) throws IOException {
    boolean hasValue = null != factory.getNamedOperand(expression, "withValue");
    boolean hasField = null != factory.getNamedOperand(expression, "withField");

    if(hasValue && hasField){
      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting either withValue or withField parameter but found both", expression));
    }
    if(!hasValue && !hasField){
      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting withValue or withField parameter but found neither", expression));
    }

    // exactly one of the two is present at this point
    if(hasValue){
      replacer = new ReplaceWithValueOperation(forField, expression, factory);
    }
    else{
      replacer = new ReplaceWithFieldOperation(forField, expression, factory);
    }
  }

  /** Applies the selected replace implementation to the tuple. */
  @Override
  public void operate(Tuple tuple) {
    replacer.operate(tuple);
  }

  /** Returns the expression form produced by the selected replace implementation. */
  @Override
  public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
    return replacer.toExpression(factory);
  }
}

View File

@ -0,0 +1,112 @@
package org.apache.solr.client.solrj.io.ops;
import java.io.IOException;
import java.util.Locale;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
 * Implementation of replace(...., withField=fieldName): when the watched field equals the configured
 * original value, it is overwritten with the value of another field of the same tuple.
 * See ReplaceOperation for the supported expression forms.
 */
public class ReplaceWithFieldOperation implements StreamOperation {

  private static final long serialVersionUID = 1;

  // true when the expression itself named the field, so toExpression() has to emit it again
  private boolean wasBuiltWithFieldName;
  private String originalFieldName;
  private Object originalValue;
  private String replacementFieldName;

  /**
   * @param forField field to operate on when the expression has only two parameters
   * @param expression replace expression with 2 parameters (value, withField) or 3 (field, value, withField)
   * @param factory factory used to read operands and to convert the value operand to an object
   * @throws IOException on an unexpected parameter count or a missing / non-value withField parameter
   */
  public ReplaceWithFieldOperation(String forField, StreamExpression expression, StreamFactory factory) throws IOException {
    switch(expression.getParameters().size()){
      case 2:
        // field name comes from the caller, operand 0 is the value to look for
        wasBuiltWithFieldName = false;
        this.originalFieldName = forField;
        this.originalValue = factory.constructPrimitiveObject(factory.getValueOperand(expression, 0));
        break;
      case 3:
        // operand 0 is the field name, operand 1 the value to look for
        wasBuiltWithFieldName = true;
        this.originalFieldName = factory.getValueOperand(expression, 0);
        this.originalValue = factory.constructPrimitiveObject(factory.getValueOperand(expression, 1));
        break;
      default:
        throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
    }

    StreamExpressionNamedParameter withField = factory.getNamedOperand(expression, "withField");
    if(null == withField){
      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a parameter named 'withField' but didn't find one.", expression));
    }

    StreamExpressionParameter target = withField.getParameter();
    if(!(target instanceof StreamExpressionValue)){
      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting parameter named 'withField' to be a field name.", expression));
    }
    this.replacementFieldName = ((StreamExpressionValue)target).getValue();
  }

  /** Overwrites the watched field with the replacement field's value when the watched field matches. */
  @Override
  public void operate(Tuple tuple) {
    if(!matchesOriginal(tuple)){
      return;
    }
    replace(tuple);
  }

  // A null field value only matches a null original; otherwise equality is originalValue.equals(value).
  private boolean matchesOriginal(Tuple tuple){
    Object current = tuple.get(originalFieldName);
    if(null == current){
      return null == originalValue;
    }
    return null != originalValue && originalValue.equals(current);
  }

  // Copies the replacement field's value (whatever tuple.get returns for it) into the watched field.
  private void replace(Tuple tuple){
    Object substitute = tuple.get(replacementFieldName);
    tuple.put(originalFieldName, substitute);
  }

  /** Rebuilds the expression; the field name is emitted only if the source expression carried it. */
  @Override
  public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));

    if(wasBuiltWithFieldName){
      expression.addParameter(originalFieldName);
    }

    String originalText = "null";
    if(null != originalValue){
      originalText = originalValue.toString();
    }
    expression.addParameter(originalText);
    expression.addParameter(new StreamExpressionNamedParameter("withField", replacementFieldName));

    return expression;
  }
}

View File

@ -0,0 +1,117 @@
package org.apache.solr.client.solrj.io.ops;
import java.io.IOException;
import java.util.Locale;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
 * Implementation of replace(...., withValue="some value"): when the watched field equals the configured
 * original value, it is overwritten with a fixed replacement value (or removed when the replacement is null).
 * See ReplaceOperation for the supported expression forms.
 */
public class ReplaceWithValueOperation implements StreamOperation {

  private static final long serialVersionUID = 1;

  // true when the expression itself named the field, so toExpression() has to emit it again
  private boolean wasBuiltWithFieldName;
  private String fieldName;
  private Object original;
  private Object replacement;

  /**
   * @param forField field to operate on when the expression has only two parameters
   * @param expression replace expression with 2 parameters (value, withValue) or 3 (field, value, withValue)
   * @param factory factory used to read operands and to convert value operands to objects
   * @throws IOException on an unexpected parameter count or a missing / non-value withValue parameter
   */
  public ReplaceWithValueOperation(String forField, StreamExpression expression, StreamFactory factory) throws IOException {
    switch(expression.getParameters().size()){
      case 2:
        // field name comes from the caller, operand 0 is the value to look for
        wasBuiltWithFieldName = false;
        this.fieldName = forField;
        this.original = factory.constructPrimitiveObject(factory.getValueOperand(expression, 0));
        break;
      case 3:
        // operand 0 is the field name, operand 1 the value to look for
        wasBuiltWithFieldName = true;
        this.fieldName = factory.getValueOperand(expression, 0);
        this.original = factory.constructPrimitiveObject(factory.getValueOperand(expression, 1));
        break;
      default:
        throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
    }

    StreamExpressionNamedParameter withValue = factory.getNamedOperand(expression, "withValue");
    if(null == withValue){
      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a parameter named 'withValue' but didn't find one.", expression));
    }

    StreamExpressionParameter rawReplacement = withValue.getParameter();
    if(!(rawReplacement instanceof StreamExpressionValue)){
      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting parameter named 'withValue' to be a primitive type.", expression));
    }
    this.replacement = factory.constructPrimitiveObject(((StreamExpressionValue)rawReplacement).getValue());
  }

  /** Substitutes the replacement value into the watched field when the watched field matches. */
  @Override
  public void operate(Tuple tuple) {
    if(!matchesOriginal(tuple)){
      return;
    }
    replace(tuple);
  }

  // A null field value only matches a null original; otherwise equality is original.equals(value).
  private boolean matchesOriginal(Tuple tuple){
    Object current = tuple.get(fieldName);
    if(null == current){
      return null == original;
    }
    return null != original && original.equals(current);
  }

  // A null replacement drops the field from the tuple instead of storing a null value.
  private void replace(Tuple tuple){
    if(null != replacement){
      tuple.put(fieldName, replacement);
      return;
    }
    tuple.remove(fieldName);
  }

  /** Rebuilds the expression; the field name is emitted only if the source expression carried it. */
  @Override
  public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));

    if(wasBuiltWithFieldName){
      expression.addParameter(fieldName);
    }

    String originalText = "null";
    if(null != original){
      originalText = original.toString();
    }
    expression.addParameter(originalText);

    String replacementText = "null";
    if(null != replacement){
      replacementText = replacement.toString();
    }
    expression.addParameter(new StreamExpressionNamedParameter("withValue", replacementText));

    return expression;
  }
}

View File

@ -0,0 +1,33 @@
package org.apache.solr.client.solrj.io.ops;
import java.io.IOException;
import java.io.Serializable;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
 * Interface for any operation one can perform on a tuple in a TupleStream.
 * Implementations are also Expressible and Serializable.
 */
public interface StreamOperation extends Expressible, Serializable {

  /** Performs this operation on the given tuple. */
  void operate(Tuple tuple);
}

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Operations for the Streaming Aggregation API
**/
package org.apache.solr.client.solrj.io.ops;

View File

@ -1,76 +0,0 @@
package org.apache.solr.client.solrj.io.stream;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
/**
 * Wraps another TupleStream and removes a fixed list of fields from every tuple read from it.
 * Opening, closing, stream context and sort are all delegated to the wrapped stream.
 */
public class EditStream extends TupleStream {

  private static final long serialVersionUID = 1;

  private TupleStream stream;
  // names of the fields removed from each non-EOF tuple
  private List<String> remove;

  public EditStream(TupleStream stream, List<String> remove) {
    this.stream = stream;
    this.remove = remove;
  }

  public void setStreamContext(StreamContext context) {
    stream.setStreamContext(context);
  }

  /** Returns a new list holding only the wrapped stream. */
  public List<TupleStream> children() {
    List<TupleStream> wrapped = new ArrayList<>();
    wrapped.add(stream);
    return wrapped;
  }

  public void open() throws IOException {
    stream.open();
  }

  public void close() throws IOException {
    stream.close();
  }

  /** Reads the next tuple from the wrapped stream; the EOF tuple is passed through untouched. */
  public Tuple read() throws IOException {
    Tuple tuple = stream.read();
    if(!tuple.EOF) {
      for(String fieldName : remove) {
        tuple.remove(fieldName);
      }
    }
    return tuple;
  }

  public StreamComparator getStreamSort() {
    return stream.getStreamSort();
  }

  public int getCost() {
    return 0;
  }
}

View File

@ -0,0 +1,191 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.BiConsumer;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.ops.StreamOperation;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
/**
* Selects fields from the incoming stream and applies optional field renaming.
* Does not reorder the outgoing stream.
**/
public class SelectStream extends TupleStream implements Expressible {
private static final long serialVersionUID = 1;
private TupleStream stream;
private Map<String,String> selectedFields;
private List<StreamOperation> operations;
public SelectStream(TupleStream stream, List<String> selectedFields) throws IOException {
this.stream = stream;
this.selectedFields = new HashMap<>();
for(String selectedField : selectedFields){
this.selectedFields.put(selectedField, selectedField);
}
operations = new ArrayList<>();
}
public SelectStream(TupleStream stream, Map<String,String> selectedFields) throws IOException {
this.stream = stream;
this.selectedFields = selectedFields;
operations = new ArrayList<>();
}
public SelectStream(StreamExpression expression,StreamFactory factory) throws IOException {
// grab all parameters out
List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
List<StreamExpressionParameter> selectFieldsExpressions = factory.getOperandsOfType(expression, StreamExpressionValue.class);
List<StreamExpression> operationExpressions = factory.getExpressionOperandsRepresentingTypes(expression, StreamOperation.class);
// validate expression contains only what we want.
if(expression.getParameters().size() != streamExpressions.size() + selectFieldsExpressions.size() + operationExpressions.size()){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
}
if(1 != streamExpressions.size()){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting single stream but found %d (must be TupleStream types)",expression, streamExpressions.size()));
}
if(0 == selectFieldsExpressions.size()){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting at least one select field but found %d",expression, streamExpressions.size()));
}
stream = factory.constructStream(streamExpressions.get(0));
selectedFields = new HashMap<String,String>(selectFieldsExpressions.size());
for(StreamExpressionParameter parameter : selectFieldsExpressions){
StreamExpressionValue selectField = (StreamExpressionValue)parameter;
String value = selectField.getValue().trim().toLowerCase(Locale.ROOT);
if(value.contains(" as ")){
String[] parts = value.split(" as ");
if(2 != parts.length){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting select field of form 'fieldA' or 'fieldA as alias' but found %s",expression, value));
}
selectedFields.put(parts[0].trim(), parts[1].trim());
}
else{
selectedFields.put(value,value);
}
}
operations = new ArrayList<>();
for(StreamExpression expr : operationExpressions){
operations.add(factory.constructOperation(expr));
}
}
@Override
public StreamExpression toExpression(StreamFactory factory) throws IOException {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// stream
if(stream instanceof Expressible){
expression.addParameter(((Expressible)stream).toExpression(factory));
}
else{
throw new IOException("This SelectStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
// selects
for(Map.Entry<String, String> selectField : selectedFields.entrySet()) {
if(selectField.getKey().equals(selectField.getValue())){
expression.addParameter(selectField.getKey());
}
else{
expression.addParameter(String.format(Locale.ROOT, "%s as %s", selectField.getKey(), selectField.getValue()));
}
}
for(StreamOperation operation : operations){
expression.addParameter(operation.toExpression(factory));
}
return expression;
}
public void setStreamContext(StreamContext context) {
this.stream.setStreamContext(context);
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList();
l.add(stream);
return l;
}
public void open() throws IOException {
stream.open();
}
public void close() throws IOException {
stream.close();
}
public Tuple read() throws IOException {
Tuple original = stream.read();
if(original.EOF){
return original;
}
// create a copy with the limited set of fields
Tuple working = new Tuple(new HashMap<>());
for(Object fieldName : original.fields.keySet()){
if(selectedFields.containsKey(fieldName)){
working.put(selectedFields.get(fieldName), original.get(fieldName));
}
}
// apply all operations
for(StreamOperation operation : operations){
operation.operate(working);
}
return working;
}
/** Return the stream sort - ie, the order in which records are returned */
public StreamComparator getStreamSort(){
// apply aliasing to comparator
return stream.getStreamSort().copyAliased(selectedFields);
}
public int getCost() {
return 0;
}
}

View File

@ -16,6 +16,7 @@ import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
import org.apache.solr.client.solrj.io.ops.StreamOperation;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.metrics.Metric;
@ -281,6 +282,22 @@ public class StreamFactory implements Serializable {
return (StreamEqualitor)createInstance(equalitorType, new Class[]{ String.class, String.class }, new Object[]{ leftFieldName, rightFieldName });
}
}
/**
 * Parses {@code expressionClause} with StreamExpressionParser and passes the result to constructMetric.
 *
 * NOTE(review): despite its name this overload returns a Metric and delegates to constructMetric(...)
 * rather than to constructOperation(StreamExpression). That looks like a copy/paste slip - confirm the
 * intended return type (StreamOperation?) before relying on this method to build operations.
 *
 * @param expressionClause textual expression to parse
 * @throws IOException declared by this method; presumably raised when parsing or construction fails - verify
 */
public Metric constructOperation(String expressionClause) throws IOException {
return constructMetric(StreamExpressionParser.parse(expressionClause));
}
/**
 * Instantiates the StreamOperation registered under the expression's function name.
 * The class is created through its (StreamExpression, StreamFactory) constructor.
 *
 * @param expression expression whose function name selects the operation class
 * @return the newly created operation
 * @throws IOException if the function name is not registered, or is registered to a class that is
 *                     not both Expressible and a StreamOperation
 */
public StreamOperation constructOperation(StreamExpression expression) throws IOException{
  String functionName = expression.getFunctionName();

  if(functionNames.containsKey(functionName)){
    Class operationClass = functionNames.get(functionName);
    boolean isOperation = Expressible.class.isAssignableFrom(operationClass)
        && StreamOperation.class.isAssignableFrom(operationClass);
    if(isOperation){
      Class[] ctorTypes = new Class[]{ StreamExpression.class, StreamFactory.class };
      Object[] ctorArgs = new Object[]{ expression, this };
      return (StreamOperation)createInstance(operationClass, ctorTypes, ctorArgs);
    }
  }

  throw new IOException(String.format(Locale.ROOT,"Invalid operation expression %s - function '%s' is unknown (not mapped to a valid StreamOperation)", expression, functionName));
}
public <T> T createInstance(Class<T> clazz, Class<?>[] paramTypes, Object[] params) throws IOException{
Constructor<T> ctor;
@ -307,4 +324,17 @@ public class StreamFactory implements Serializable {
throw new IOException(String.format(Locale.ROOT, "Unable to find function name for class '%s'", clazz.getName()));
}
/**
 * Converts the textual form of a value into the most specific object it can represent.
 * Surrounding whitespace is ignored when deciding the type. Checked in this order:
 *   "null" (any case)                      -&gt; null
 *   "true" / "false" (any case)            -&gt; Boolean
 *   parsable by Long.valueOf               -&gt; Long
 *   1-8 characters, parsable as a float    -&gt; Float
 *   1-17 characters, parsable as a double  -&gt; Double
 *   anything else                          -&gt; the original string, returned unchanged
 *
 * @param original text to convert; a null reference is treated like the text "null"
 * @return a Boolean, Long, Float, Double or the original String, or null
 */
public Object constructPrimitiveObject(String original){
  if(null == original){
    // previously failed with a NullPointerException
    return null;
  }

  String trimmed = original.trim();
  String lower = trimmed.toLowerCase(Locale.ROOT);

  if("null".equals(lower)){ return null; }
  if("true".equals(lower) || "false".equals(lower)){ return Boolean.parseBoolean(lower); }

  // Numeric parsing works on the trimmed text, the same text used for the keyword checks above.
  // Long.valueOf rejects surrounding whitespace while Float.valueOf accepts it, so parsing the
  // untrimmed text turned a padded integer such as " 5" into the Float 5.0.
  try{
    return Long.valueOf(trimmed);
  }
  catch(NumberFormatException ignored){
    // not an integer - try the floating point forms
  }

  int length = trimmed.length();
  try{
    if(length >= 1 && length <= 8){ return Float.valueOf(trimmed); }
  }
  catch(NumberFormatException ignored){
    // not a float - try double
  }

  try{
    if(length >= 1 && length <= 17){ return Double.valueOf(trimmed); }
  }
  catch(NumberFormatException ignored){
    // not a number at all
  }

  // is a string
  return original;
}
}

View File

@ -21,11 +21,13 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.ops.ReplaceOperation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
@ -134,6 +136,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
testLeftOuterJoinStream();
testHashJoinStream();
testOuterHashJoinStream();
testSelectStream();
}
private void testCloudSolrStream() throws Exception {
@ -1281,6 +1284,104 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
commit();
}
/**
 * Exercises the select(...) streaming expression in four shapes: plain field renaming via
 * "as", renaming combined with replace(...) operations, select used as the inputs of an
 * innerJoin, and select wrapping an innerJoin. Indexes 15 documents (8 with side_s=left,
 * 7 with side_s=right) up front and deletes everything again at the end.
 */
private void testSelectStream() throws Exception {
indexr(id, "1", "side_s", "left", "join1_i", "0", "join2_s", "a", "ident_s", "left_1"); // 8, 9
indexr(id, "15", "side_s", "left", "join1_i", "0", "join2_s", "a", "ident_s", "left_1"); // 8, 9
indexr(id, "2", "side_s", "left", "join1_i", "0", "join2_s", "b", "ident_s", "left_2");
indexr(id, "3", "side_s", "left", "join1_i", "1", "join2_s", "a", "ident_s", "left_3"); // 10
indexr(id, "4", "side_s", "left", "join1_i", "1", "join2_s", "b", "ident_s", "left_4"); // 11
indexr(id, "5", "side_s", "left", "join1_i", "1", "join2_s", "c", "ident_s", "left_5"); // 12
indexr(id, "6", "side_s", "left", "join1_i", "2", "join2_s", "d", "ident_s", "left_6");
indexr(id, "7", "side_s", "left", "join1_i", "3", "join2_s", "e", "ident_s", "left_7"); // 14
indexr(id, "8", "side_s", "right", "join1_i", "0", "join2_s", "a", "ident_s", "right_1", "join3_i", "0"); // 1,15
indexr(id, "9", "side_s", "right", "join1_i", "0", "join2_s", "a", "ident_s", "right_2", "join3_i", "0"); // 1,15
indexr(id, "10", "side_s", "right", "join1_i", "1", "join2_s", "a", "ident_s", "right_3", "join3_i", "1"); // 3
indexr(id, "11", "side_s", "right", "join1_i", "1", "join2_s", "b", "ident_s", "right_4", "join3_i", "1"); // 4
indexr(id, "12", "side_s", "right", "join1_i", "1", "join2_s", "c", "ident_s", "right_5", "join3_i", "1"); // 5
indexr(id, "13", "side_s", "right", "join1_i", "2", "join2_s", "dad", "ident_s", "right_6", "join3_i", "2");
indexr(id, "14", "side_s", "right", "join1_i", "3", "join2_s", "e", "ident_s", "right_7", "join3_i", "3"); // 7
commit();
String clause;
TupleStream stream;
List<Tuple> tuples;
// Only the functions used by the expressions below are registered with the factory.
StreamFactory factory = new StreamFactory()
.withCollectionZkHost("collection1", zkServer.getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("innerJoin", InnerJoinStream.class)
.withFunctionName("select", SelectStream.class)
.withFunctionName("replace", ReplaceOperation.class);
// Basic test
// Every renamed field must appear only under its new name; "id" is selected without a rename.
clause = "select("
+ "id, join1_i as join1, join2_s as join2, ident_s as identity,"
+ "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\")"
+ ")";
stream = factory.constructStream(clause);
tuples = getTuples(stream);
assertFields(tuples, "id", "join1", "join2", "identity");
assertNotFields(tuples, "join1_i", "join2_s", "ident_s");
// Basic with replacements test
// The replace(...) operations reference the renamed field "join1", not the source field "join1_i".
clause = "select("
+ "id, join1_i as join1, join2_s as join2, ident_s as identity,"
+ "replace(join1, 0, withValue=12), replace(join1, 3, withValue=12), replace(join1, 2, withField=join2),"
+ "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\")"
+ ")";
stream = factory.constructStream(clause);
tuples = getTuples(stream);
assertFields(tuples, "id", "join1", "join2", "identity");
assertNotFields(tuples, "join1_i", "join2_s", "ident_s");
// Sorted by join1_i asc, positions 0-2 are the three left docs indexed with join1_i=0 and
// position 7 is the single doc indexed with join1_i=3; both values are replaced by 12.
assertLong(tuples.get(0), "join1", 12);
assertLong(tuples.get(1), "join1", 12);
assertLong(tuples.get(2), "join1", 12);
assertLong(tuples.get(7), "join1", 12);
// Position 6 is the single left doc indexed with join1_i=2, which takes the value of its join2 field ("d").
assertString(tuples.get(6), "join1", "d");
// Inner stream test
// Both sides are renamed into "left.*" / "right.*" names so the join can be expressed on the renamed fields.
// NOTE(review): the right-hand select(...) ends its search(...) parameter with a trailing comma
// before the closing ")" - confirm the expression parser is meant to tolerate that.
clause = "innerJoin("
+ "select("
+ "id, join1_i as left.join1, join2_s as left.join2, ident_s as left.ident,"
+ "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\")"
+ "),"
+ "select("
+ "join3_i as right.join1, join2_s as right.join2, ident_s as right.ident,"
+ "search(collection1, q=\"side_s:right\", fl=\"join3_i,join2_s,ident_s\", sort=\"join3_i asc, join2_s asc\"),"
+ "),"
+ "on=\"left.join1=right.join1, left.join2=right.join2\""
+ ")";
stream = factory.constructStream(clause);
tuples = getTuples(stream);
assertFields(tuples, "id", "left.join1", "left.join2", "left.ident", "right.join1", "right.join2", "right.ident");
// Wrapped select test
// Same join as above, wrapped in an outer select that keeps only id and the two ident fields.
// NOTE(review): the same trailing comma appears in the right-hand select(...) here.
clause = "select("
+ "id, left.ident, right.ident,"
+ "innerJoin("
+ "select("
+ "id, join1_i as left.join1, join2_s as left.join2, ident_s as left.ident,"
+ "search(collection1, q=\"side_s:left\", fl=\"id,join1_i,join2_s,ident_s\", sort=\"join1_i asc, join2_s asc, id asc\")"
+ "),"
+ "select("
+ "join3_i as right.join1, join2_s as right.join2, ident_s as right.ident,"
+ "search(collection1, q=\"side_s:right\", fl=\"join3_i,join2_s,ident_s\", sort=\"join3_i asc, join2_s asc\"),"
+ "),"
+ "on=\"left.join1=right.join1, left.join2=right.join2\""
+ ")"
+ ")";
stream = factory.constructStream(clause);
tuples = getTuples(stream);
assertFields(tuples, "id", "left.ident", "right.ident");
assertNotFields(tuples, "left.join1", "left.join2", "right.join1", "right.join2");
// Remove everything this test indexed.
del("*:*");
commit();
}
protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
tupleStream.open();
List<Tuple> tuples = new ArrayList<Tuple>();
@ -1305,6 +1406,27 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
}
return true;
}
/**
 * Verifies that every tuple carries every one of the given field names.
 *
 * @param tuples the tuples to inspect
 * @param fields the field names each tuple must contain
 * @return {@code true} when no tuple is missing any of the fields
 * @throws Exception naming the first missing field encountered (tuples are checked in order,
 *         and fields in the order given for each tuple)
 */
protected boolean assertFields(List<Tuple> tuples, String ... fields) throws Exception{
  for(Tuple current : tuples){
    for(String expected : fields){
      if(current.fields.containsKey(expected)){
        continue;
      }
      throw new Exception(String.format(Locale.ROOT, "Expected field '%s' not found", expected));
    }
  }
  return true;
}
/**
 * Verifies that none of the tuples carries any of the given field names.
 *
 * @param tuples the tuples to inspect
 * @param fields the field names no tuple may contain
 * @return {@code true} when none of the fields is present on any tuple
 * @throws Exception naming the first unwanted field encountered (tuples are checked in order,
 *         and fields in the order given for each tuple)
 */
protected boolean assertNotFields(List<Tuple> tuples, String ... fields) throws Exception{
  for(Tuple current : tuples){
    for(String forbidden : fields){
      boolean present = current.fields.containsKey(forbidden);
      if(!present){
        continue;
      }
      throw new Exception(String.format(Locale.ROOT, "Unexpected field '%s' found", forbidden));
    }
  }
  return true;
}
protected boolean assertGroupOrder(Tuple tuple, int... ids) throws Exception {
List<?> group = (List<?>)tuple.get("tuples");

View File

@ -0,0 +1,251 @@
package org.apache.solr.client.solrj.io.stream.ops;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.HashMap;
import java.util.Map;
import junit.framework.Assert;
import org.apache.commons.collections.map.HashedMap;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.ops.ReplaceOperation;
import org.apache.solr.client.solrj.io.ops.StreamOperation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.junit.Test;
/**
 * Unit tests for {@link ReplaceOperation} applied to the field "fieldA".
 *
 * <p>Every test builds the operation from a {@code replace(null, ...)} expression and checks two
 * things: a tuple without "fieldA" receives the replacement (a literal given through
 * {@code withValue} or the value of another field given through {@code withField}), and a tuple
 * that already has a "fieldA" value is left unchanged.
 **/
public class OperationsTest extends LuceneTestCase {

  StreamFactory factory;
  Map<String, Object> values;

  public OperationsTest() {
    super();

    factory = new StreamFactory()
        .withFunctionName("replace", ReplaceOperation.class);
    // A typed stdlib map; the field is declared as Map<String, Object> so no raw type is needed.
    values = new HashMap<String, Object>();
  }

  /** Builds a ReplaceOperation targeting "fieldA" from the given replace(...) expression. */
  private StreamOperation replaceOnFieldA(String expression) throws Exception {
    return new ReplaceOperation("fieldA", StreamExpressionParser.parse(expression), factory);
  }

  /** Resets the shared {@link #values} map to the given key/value pairs and wraps it in a Tuple. */
  private Tuple newTuple(Object... keyValuePairs) {
    values.clear();
    for (int idx = 0; idx < keyValuePairs.length; idx += 2) {
      values.put((String) keyValuePairs[idx], keyValuePairs[idx + 1]);
    }
    return new Tuple(values);
  }

  /** Checks that a tuple which already has a "fieldA" value keeps it after the operation runs. */
  private void assertExistingValueKept(StreamOperation operation) throws Exception {
    Tuple tuple = newTuple("fieldA", "exists", "fieldB", "bar", "fieldC", 123);
    operation.operate(tuple);

    Assert.assertNotNull(tuple.get("fieldA"));
    Assert.assertEquals("exists", tuple.get("fieldA"));
  }

  @Test
  public void replaceValueNullWithString() throws Exception{
    StreamOperation operation = replaceOnFieldA("replace(null, withValue=foo)");

    // replace
    Tuple tuple = newTuple("fieldB", "bar", "fieldC", 123);
    operation.operate(tuple);

    Assert.assertNotNull(tuple.get("fieldA"));
    Assert.assertEquals("foo", tuple.get("fieldA"));

    // don't replace
    assertExistingValueKept(operation);
  }

  @Test
  public void replaceValueNullWithInt() throws Exception{
    StreamOperation operation = replaceOnFieldA("replace(null, withValue=123)");

    // replace
    Tuple tuple = newTuple("fieldB", "bar", "fieldC", (long)123);
    operation.operate(tuple);

    // the integer literal is expected back as a Long
    Assert.assertNotNull(tuple.get("fieldA"));
    Assert.assertEquals((long)123, tuple.get("fieldA"));

    // don't replace
    assertExistingValueKept(operation);
  }

  @Test
  public void replaceValueNullWithFloat() throws Exception{
    StreamOperation operation = replaceOnFieldA("replace(null, withValue=123.45678)");

    // replace
    Tuple tuple = newTuple("fieldB", "bar", "fieldC", 123);
    operation.operate(tuple);

    // the expected value is a double literal, so the stored value must be a Double
    Assert.assertNotNull(tuple.get("fieldA"));
    Assert.assertEquals(123.45678, tuple.get("fieldA"));

    // don't replace
    assertExistingValueKept(operation);
  }

  @Test
  public void replaceValueNullWithDouble() throws Exception{
    StreamOperation operation = replaceOnFieldA("replace(null, withValue=123.45678912345)");

    // replace
    Tuple tuple = newTuple("fieldB", "bar", "fieldC", 123);
    operation.operate(tuple);

    Assert.assertNotNull(tuple.get("fieldA"));
    Assert.assertEquals(123.45678912345, tuple.get("fieldA"));

    // don't replace
    assertExistingValueKept(operation);
  }

  @Test
  public void replaceFieldNullWithString() throws Exception{
    StreamOperation operation = replaceOnFieldA("replace(null, withField=fieldB)");

    // replace
    Tuple tuple = newTuple("fieldB", "bar", "fieldC", 123);
    operation.operate(tuple);

    Assert.assertNotNull(tuple.get("fieldA"));
    Assert.assertEquals("bar", tuple.get("fieldA"));

    // don't replace
    assertExistingValueKept(operation);
  }

  @Test
  public void replaceFieldNullWithInt() throws Exception{
    StreamOperation operation = replaceOnFieldA("replace(null, withField=fieldC)");

    // replace
    Tuple tuple = newTuple("fieldB", "bar", "fieldC", 123);
    operation.operate(tuple);

    // the value is copied from fieldC as-is, so it stays an Integer
    Assert.assertNotNull(tuple.get("fieldA"));
    Assert.assertEquals(123, tuple.get("fieldA"));

    // don't replace
    assertExistingValueKept(operation);
  }

  @Test
  public void replaceFieldNullWithNonExistantField() throws Exception{
    StreamOperation operation = replaceOnFieldA("replace(null, withField=fieldD)");

    // replace: the source field does not exist, so fieldA is expected to stay null
    Tuple tuple = newTuple("fieldB", "bar", "fieldC", 123);
    operation.operate(tuple);

    Assert.assertNull(tuple.get("fieldA"));

    // don't replace
    assertExistingValueKept(operation);
  }
}