SOLR-13625: Add CsvStream, TsvStream Streaming Expressions and supporting Stream Evaluators

This commit is contained in:
Joel Bernstein 2019-07-25 10:57:30 -04:00
parent d1706b36ba
commit d0674866ed
8 changed files with 603 additions and 1 deletions

View File

@ -292,6 +292,11 @@ public class Lang {
.withFunctionName("isNull", IsNullEvaluator.class) .withFunctionName("isNull", IsNullEvaluator.class)
.withFunctionName("matches", MatchesEvaluator.class) .withFunctionName("matches", MatchesEvaluator.class)
.withFunctionName("projectToBorder", ProjectToBorderEvaluator.class) .withFunctionName("projectToBorder", ProjectToBorderEvaluator.class)
.withFunctionName("parseCSV", CsvStream.class)
.withFunctionName("parseTSV", TsvStream.class)
.withFunctionName("double", DoubleEvaluator.class)
.withFunctionName("long", LongEvaluator.class)
.withFunctionName("dateTime", DateEvaluator.class)
// Boolean Stream Evaluators // Boolean Stream Evaluators

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.eval;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;
import java.util.Locale;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class DateEvaluator extends RecursiveObjectEvaluator implements ManyValueWorker {
protected static final long serialVersionUID = 1L;
private static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.US);
private SimpleDateFormat parseFormat;
static {
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
}
public DateEvaluator(StreamExpression expression, StreamFactory factory) throws IOException{
super(expression, factory);
}
@Override
public Object doWork(Object values[]) throws IOException {
String sdate = values[0].toString();
String template = values[1].toString();
if(sdate.startsWith("\"")) {
sdate =sdate.replace("\"", "");
}
if(template.startsWith("\"")) {
template =template.replace("\"", "");
}
if(parseFormat == null) {
String timeZone = "UTC";
if(values.length == 3) {
timeZone = values[2].toString();
}
parseFormat = new SimpleDateFormat(template, Locale.US);
parseFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
}
try {
Date date = parseFormat.parse(sdate);
return dateFormat.format(date);
} catch(Exception e) {
throw new IOException(e);
}
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.eval;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class DoubleEvaluator extends RecursiveObjectEvaluator implements OneValueWorker {
protected static final long serialVersionUID = 1L;
public DoubleEvaluator(StreamExpression expression, StreamFactory factory) throws IOException{
super(expression, factory);
if(1 != containedEvaluators.size()){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting exactly 1 value but found %d",expression,containedEvaluators.size()));
}
}
@Override
public Object doWork(Object value){
if(null == value){
return null;
}
else if(value instanceof List){
return ((List<?>)value).stream().map(innerValue -> doWork(innerValue)).collect(Collectors.toList());
}
else{
return Double.valueOf(value.toString());
}
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.eval;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class LongEvaluator extends RecursiveObjectEvaluator implements OneValueWorker {
protected static final long serialVersionUID = 1L;
public LongEvaluator(StreamExpression expression, StreamFactory factory) throws IOException{
super(expression, factory);
if(1 != containedEvaluators.size()){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting exactly 1 value but found %d",expression,containedEvaluators.size()));
}
}
@Override
public Object doWork(Object value){
if(null == value){
return null;
}
else if(value instanceof List){
return ((List<?>)value).stream().map(innerValue -> doWork(innerValue)).collect(Collectors.toList());
}
else{
return Long.valueOf(value.toString());
}
}
}

View File

@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class CsvStream extends TupleStream implements Expressible {
private static final long serialVersionUID = 1;
private String[] headers;
private String currentFile;
private int lineNumber;
protected TupleStream originalStream;
public CsvStream(StreamExpression expression,StreamFactory factory) throws IOException {
// grab all parameters out
List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
// validate expression contains only what we want.
if(expression.getParameters().size() != streamExpressions.size()){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
}
if(1 != streamExpressions.size()){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
}
init(factory.constructStream(streamExpressions.get(0)));
}
private void init(TupleStream stream) throws IOException{
this.originalStream = stream;
}
@Override
public StreamExpression toExpression(StreamFactory factory) throws IOException{
return toExpression(factory, true);
}
private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
if(includeStreams){
// streams
if(originalStream instanceof Expressible){
expression.addParameter(((Expressible)originalStream).toExpression(factory));
}
else{
throw new IOException("This CsvStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
}
else{
expression.addParameter("<stream>");
}
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withChildren(new Explanation[] {
originalStream.toExplanation(factory)
// we're not including that this is wrapped with a ReducerStream stream because that's just an implementation detail
})
.withFunctionName(factory.getFunctionName(this.getClass()))
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR)
.withExpression(toExpression(factory, false).toString());
}
public void setStreamContext(StreamContext context) {
this.originalStream.setStreamContext(context);
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList<TupleStream>();
l.add(originalStream);
return l;
}
public void open() throws IOException {
originalStream.open();
}
public void close() throws IOException {
originalStream.close();
}
public Tuple read() throws IOException {
Tuple tuple = originalStream.read();
++lineNumber;
if(tuple.EOF) {
return tuple;
} else {
String file = tuple.getString("file");
String line = tuple.getString("line");
if (file.equals(currentFile)) {
String[] fields = split(line);
if(fields.length != headers.length) {
throw new IOException("Headers and lines must have the same number of fields [file:"+file+" line number:"+lineNumber+"]");
}
Tuple out = new Tuple(new HashMap());
for(int i=0; i<headers.length; i++) {
if(fields[i] != null && fields[i].length() > 0) {
out.put(headers[i], fields[i]);
}
}
return out;
} else {
this.currentFile = file;
this.headers = split(line);
this.lineNumber = 1; //New file so reset the lineNumber
return read();
}
}
}
protected String[] split(String line) {
String[] fields = line.split(",(?=([^\"]|\"[^\"]*\")*$)",-1);
for(int i=0; i<fields.length; i++) {
String f = fields[i];
if(f.startsWith("\"") && f.endsWith("\"")) {
f = f.substring(1, f.length()-1);
fields[i] = f;
}
}
return fields;
}
/** Return the stream sort - ie, the order in which records are returned */
public StreamComparator getStreamSort(){
return originalStream.getStreamSort();
}
public int getCost() {
return 0;
}
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class TsvStream extends CsvStream implements Expressible {
private static final long serialVersionUID = 1;
public TsvStream(StreamExpression expression,StreamFactory factory) throws IOException {
super(expression, factory);
}
@Override
public StreamExpression toExpression(StreamFactory factory) throws IOException{
return toExpression(factory, true);
}
private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
if(includeStreams){
// streams
if(originalStream instanceof Expressible){
expression.addParameter(((Expressible)originalStream).toExpression(factory));
}
else{
throw new IOException("This TsvStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
}
else{
expression.addParameter("<stream>");
}
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withChildren(new Explanation[] {
originalStream.toExplanation(factory)
// we're not including that this is wrapped with a ReducerStream stream because that's just an implementation detail
})
.withFunctionName(factory.getFunctionName(this.getClass()))
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR)
.withExpression(toExpression(factory, false).toString());
}
protected String[] split(String line) {
String[] parts = line.split("\\t", -1);
for(String s : parts) {
System.out.println("part:"+s+":"+line.length()+":"+line+":");
}
return parts;
}
}

View File

@ -76,7 +76,7 @@ public class TestLang extends SolrTestCase {
"getAmplitude", "getPhase", "getAngularFrequency", "enclosingDisk", "getCenter", "getRadius", "getAmplitude", "getPhase", "getAngularFrequency", "enclosingDisk", "getCenter", "getRadius",
"getSupportPoints", "pairSort", "log10", "plist", "recip", "pivot", "ltrim", "rtrim", "export", "getSupportPoints", "pairSort", "log10", "plist", "recip", "pivot", "ltrim", "rtrim", "export",
"zplot", "natural", "repeat", "movingMAD", "hashRollup", "noop", "var", "stddev", "recNum", "isNull", "zplot", "natural", "repeat", "movingMAD", "hashRollup", "noop", "var", "stddev", "recNum", "isNull",
"notNull", "matches", "projectToBorder"}; "notNull", "matches", "projectToBorder", "double", "long", "parseCSV", "parseTSV", "dateTime"};
@Test @Test
public void testLang() { public void testLang() {

View File

@ -3151,6 +3151,175 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
} }
} }
@Test
public void testParseCSV() throws Exception {
String expr = "parseCSV(list(tuple(file=\"file1\", line=\"a,b,c\"), " +
" tuple(file=\"file1\", line=\"1,2,3\")," +
" tuple(file=\"file1\", line=\"\\\"hello, world\\\",9000,20\")," +
" tuple(file=\"file2\", line=\"field_1,field_2,field_3\"), "+
" tuple(file=\"file2\", line=\"8,9,\")))";
ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", expr);
paramsLoc.set("qt", "/stream");
String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS;
TupleStream solrStream = new SolrStream(url, paramsLoc);
StreamContext context = new StreamContext();
solrStream.setStreamContext(context);
List<Tuple> tuples = getTuples(solrStream);
assertEquals(tuples.size(), 3);
assertEquals(tuples.get(0).getString("a"), "1");
assertEquals(tuples.get(0).getString("b"), "2");
assertEquals(tuples.get(0).getString("c"), "3");
assertEquals(tuples.get(1).getString("a"), "hello, world");
assertEquals(tuples.get(1).getString("b"), "9000");
assertEquals(tuples.get(1).getString("c"), "20");
assertEquals(tuples.get(2).getString("field_1"), "8");
assertEquals(tuples.get(2).getString("field_2"), "9");
assertNull(tuples.get(2).get("field_3"));
}
@Test
public void testParseTSV() throws Exception {
String expr = "parseTSV(list(tuple(file=\"file1\", line=\"a\tb\tc\"), " +
" tuple(file=\"file1\", line=\"1\t2\t3\")," +
" tuple(file=\"file1\", line=\"hello, world\t9000\t20\")," +
" tuple(file=\"file2\", line=\"field_1\tfield_2\tfield_3\"), "+
" tuple(file=\"file2\", line=\"8\t\t9\")))";
ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", expr);
paramsLoc.set("qt", "/stream");
String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS;
TupleStream solrStream = new SolrStream(url, paramsLoc);
StreamContext context = new StreamContext();
solrStream.setStreamContext(context);
List<Tuple> tuples = getTuples(solrStream);
assertEquals(tuples.size(), 3);
assertEquals(tuples.get(0).getString("a"), "1");
assertEquals(tuples.get(0).getString("b"), "2");
assertEquals(tuples.get(0).getString("c"), "3");
assertEquals(tuples.get(1).getString("a"), "hello, world");
assertEquals(tuples.get(1).getString("b"), "9000");
assertEquals(tuples.get(1).getString("c"), "20");
assertEquals(tuples.get(2).getString("field_1"), "8");
assertNull(tuples.get(2).get("field_2"));
assertEquals(tuples.get(2).getString("field_3"), "9");
}
@Test
public void testDateTime() throws Exception {
String expr = "select(list(tuple(a=20001011:10:11:01), tuple(a=20071011:14:30:20)), dateTime(a, yyyyMMdd:kk:mm:ss) as date)";
ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", expr);
paramsLoc.set("qt", "/stream");
String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS;
TupleStream solrStream = new SolrStream(url, paramsLoc);
StreamContext context = new StreamContext();
solrStream.setStreamContext(context);
List<Tuple> tuples = getTuples(solrStream);
String date = (String)tuples.get(0).get("date");
assertEquals(date, "2000-10-11T10:11:01Z");
date = (String)tuples.get(1).get("date");
assertEquals(date, "2007-10-11T14:30:20Z");
}
@Test
public void testDateTimeTZ() throws Exception {
String expr = "select(list(tuple(a=20001011), tuple(a=20071011)), dateTime(a, yyyyMMdd, UTC) as date, dateTime(a, yyyyMMdd, EST) as date1, dateTime(a, yyyyMMdd) as date2)";
ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", expr);
paramsLoc.set("qt", "/stream");
String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS;
TupleStream solrStream = new SolrStream(url, paramsLoc);
StreamContext context = new StreamContext();
solrStream.setStreamContext(context);
List<Tuple> tuples = getTuples(solrStream);
String date = (String)tuples.get(0).get("date");
String date1 = (String)tuples.get(0).get("date1");
String date2 = (String)tuples.get(0).get("date2");
assertEquals(date, "2000-10-11T00:00:00Z");
assertEquals(date1, "2000-10-11T05:00:00Z");
assertEquals(date2, "2000-10-11T00:00:00Z");
date = (String)tuples.get(1).get("date");
date1 = (String)tuples.get(1).get("date1");
date2 = (String)tuples.get(1).get("date2");
assertEquals(date, "2007-10-11T00:00:00Z");
assertEquals(date1, "2007-10-11T05:00:00Z");
assertEquals(date2, "2007-10-11T00:00:00Z");
}
@Test
public void testDoubleLong() throws Exception {
String expr = "select(tuple(d=\"1.1\", l=\"5000\"), double(d) as d, long(l) as l)";
ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", expr);
paramsLoc.set("qt", "/stream");
String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS;
TupleStream solrStream = new SolrStream(url, paramsLoc);
StreamContext context = new StreamContext();
solrStream.setStreamContext(context);
List<Tuple> tuples = getTuples(solrStream);
assertEquals(tuples.size(), 1);
assertTrue(tuples.get(0).get("d") instanceof Double);
assertTrue(tuples.get(0).get("l") instanceof Long);
assertEquals(tuples.get(0).getDouble("d"), 1.1D, 0);
assertEquals(tuples.get(0).getLong("l").longValue(), 5000L);
}
public void testDoubleLongArray() throws Exception {
String expr = "let(a=list(tuple(d=\"1.1\", l=\"5000\"), tuple(d=\"1.3\", l=\"7000\"))," +
" b=col(a, d)," +
" c=col(a, l)," +
" tuple(doubles=double(b)," +
" longs=long(c)))";
ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", expr);
paramsLoc.set("qt", "/stream");
String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS;
TupleStream solrStream = new SolrStream(url, paramsLoc);
StreamContext context = new StreamContext();
solrStream.setStreamContext(context);
List<Tuple> tuples = getTuples(solrStream);
assertEquals(tuples.size(), 1);
List<Double> doubles = (List<Double>)tuples.get(0).get("doubles");
List<Long> longs = (List<Long>)tuples.get(0).get("longs");
assertEquals(doubles.get(0), 1.1, 0);
assertEquals(doubles.get(1), 1.3, 0);
assertEquals(longs.get(0).longValue(), 5000L);
assertEquals(longs.get(1).longValue(), 7000L);
}
@Test @Test
public void testCommitStream() throws Exception { public void testCommitStream() throws Exception {