diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java index d64e320ca7d..60ce437c945 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java @@ -292,6 +292,11 @@ public class Lang { .withFunctionName("isNull", IsNullEvaluator.class) .withFunctionName("matches", MatchesEvaluator.class) .withFunctionName("projectToBorder", ProjectToBorderEvaluator.class) + .withFunctionName("parseCSV", CsvStream.class) + .withFunctionName("parseTSV", TsvStream.class) + .withFunctionName("double", DoubleEvaluator.class) + .withFunctionName("long", LongEvaluator.class) + .withFunctionName("dateTime", DateEvaluator.class) // Boolean Stream Evaluators diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/DateEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/DateEvaluator.java new file mode 100644 index 00000000000..582e3b6851a --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/DateEvaluator.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.solr.client.solrj.io.eval; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.TimeZone; +import java.util.Locale; + +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; + +public class DateEvaluator extends RecursiveObjectEvaluator implements ManyValueWorker { + protected static final long serialVersionUID = 1L; + + private static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.US); + private SimpleDateFormat parseFormat; + + + static { + dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + } + + public DateEvaluator(StreamExpression expression, StreamFactory factory) throws IOException{ + super(expression, factory); + } + + @Override + public Object doWork(Object values[]) throws IOException { + String sdate = values[0].toString(); + String template = values[1].toString(); + + if(sdate.startsWith("\"")) { + sdate =sdate.replace("\"", ""); + } + + if(template.startsWith("\"")) { + template =template.replace("\"", ""); + } + + + if(parseFormat == null) { + String timeZone = "UTC"; + if(values.length == 3) { + timeZone = values[2].toString(); + } + parseFormat = new SimpleDateFormat(template, Locale.US); + parseFormat.setTimeZone(TimeZone.getTimeZone(timeZone)); + } + + try { + Date date = parseFormat.parse(sdate); + return dateFormat.format(date); + } catch(Exception e) { + throw new IOException(e); + } + } +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/DoubleEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/DoubleEvaluator.java new file mode 100644 index 00000000000..7fce45f0f86 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/DoubleEvaluator.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.client.solrj.io.eval; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; +import java.util.stream.Collectors; + +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; + +public class DoubleEvaluator extends RecursiveObjectEvaluator implements OneValueWorker { + protected static final long serialVersionUID = 1L; + + public DoubleEvaluator(StreamExpression expression, StreamFactory factory) throws IOException{ + super(expression, factory); + + if(1 != containedEvaluators.size()){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting exactly 1 value but found %d",expression,containedEvaluators.size())); + } + } + + @Override + public Object doWork(Object value){ + if(null == value){ + return null; + } + else if(value instanceof List){ + return ((List)value).stream().map(innerValue -> doWork(innerValue)).collect(Collectors.toList()); + } + else{ + return Double.valueOf(value.toString()); + } + } +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/LongEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/LongEvaluator.java new file mode 100644 index 00000000000..4547d8cdf83 --- 
/dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/LongEvaluator.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.client.solrj.io.eval; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; +import java.util.stream.Collectors; + +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; + +public class LongEvaluator extends RecursiveObjectEvaluator implements OneValueWorker { + protected static final long serialVersionUID = 1L; + + public LongEvaluator(StreamExpression expression, StreamFactory factory) throws IOException{ + super(expression, factory); + + if(1 != containedEvaluators.size()){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting exactly 1 value but found %d",expression,containedEvaluators.size())); + } + } + + @Override + public Object doWork(Object value){ + if(null == value){ + return null; + } + else if(value instanceof List){ + return ((List)value).stream().map(innerValue -> doWork(innerValue)).collect(Collectors.toList()); + } + else{ + return Long.valueOf(value.toString()); + } + } +} diff 
--git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CsvStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CsvStream.java new file mode 100644 index 00000000000..b8c479e9aba --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CsvStream.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.solr.client.solrj.io.stream; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; + +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; + +public class CsvStream extends TupleStream implements Expressible { + + private static final long serialVersionUID = 1; + private String[] headers; + private String currentFile; + private int lineNumber; + + protected TupleStream originalStream; + + public CsvStream(StreamExpression expression,StreamFactory factory) throws IOException { + // grab all parameters out + List streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class); + + // validate expression contains only what we want. 
+ if(expression.getParameters().size() != streamExpressions.size()){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression)); + } + + if(1 != streamExpressions.size()){ + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size())); + } + + init(factory.constructStream(streamExpressions.get(0))); + } + + private void init(TupleStream stream) throws IOException{ + this.originalStream = stream; + } + + @Override + public StreamExpression toExpression(StreamFactory factory) throws IOException{ + return toExpression(factory, true); + } + + private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException { + // function name + StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); + + if(includeStreams){ + // streams + if(originalStream instanceof Expressible){ + expression.addParameter(((Expressible)originalStream).toExpression(factory)); + } + else{ + throw new IOException("This CsvStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } + } + else{ + expression.addParameter(""); + } + + return expression; + } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + return new StreamExplanation(getStreamNodeId().toString()) + .withChildren(new Explanation[] { + originalStream.toExplanation(factory) + // we're not including that this is wrapped with a ReducerStream stream because that's just an implementation detail + }) + .withFunctionName(factory.getFunctionName(this.getClass())) + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_DECORATOR) + .withExpression(toExpression(factory, false).toString()); + } + + public void setStreamContext(StreamContext context) { + this.originalStream.setStreamContext(context); + } + + public 
List children() { + List l = new ArrayList(); + l.add(originalStream); + return l; + } + + public void open() throws IOException { + originalStream.open(); + } + + public void close() throws IOException { + originalStream.close(); + } + + public Tuple read() throws IOException { + Tuple tuple = originalStream.read(); + ++lineNumber; + if(tuple.EOF) { + return tuple; + } else { + String file = tuple.getString("file"); + String line = tuple.getString("line"); + if (file.equals(currentFile)) { + String[] fields = split(line); + if(fields.length != headers.length) { + throw new IOException("Headers and lines must have the same number of fields [file:"+file+" line number:"+lineNumber+"]"); + } + Tuple out = new Tuple(new HashMap()); + for(int i=0; i 0) { + out.put(headers[i], fields[i]); + } + } + return out; + } else { + this.currentFile = file; + this.headers = split(line); + this.lineNumber = 1; //New file so reset the lineNumber + return read(); + } + } + } + + protected String[] split(String line) { + String[] fields = line.split(",(?=([^\"]|\"[^\"]*\")*$)",-1); + for(int i=0; i"); + } + + return expression; + } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + return new StreamExplanation(getStreamNodeId().toString()) + .withChildren(new Explanation[] { + originalStream.toExplanation(factory) + // we're not including that this is wrapped with a ReducerStream stream because that's just an implementation detail + }) + .withFunctionName(factory.getFunctionName(this.getClass())) + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_DECORATOR) + .withExpression(toExpression(factory, false).toString()); + } + + /** + * Splits a tab-separated line into its fields. The negative limit makes String.split keep trailing empty fields. + * + * @param line the raw line to split + * @return the fields of the line, in order, including empty ones + */ + protected String[] split(String line) { + return line.split("\\t", -1); + } + +} diff --git 
a/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java index 60ac6ec1a4b..f2e6a42abc1 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/TestLang.java @@ -76,7 +76,7 @@ public class TestLang extends SolrTestCase { "getAmplitude", "getPhase", "getAngularFrequency", "enclosingDisk", "getCenter", "getRadius", "getSupportPoints", "pairSort", "log10", "plist", "recip", "pivot", "ltrim", "rtrim", "export", "zplot", "natural", "repeat", "movingMAD", "hashRollup", "noop", "var", "stddev", "recNum", "isNull", - "notNull", "matches", "projectToBorder"}; + "notNull", "matches", "projectToBorder", "double", "long", "parseCSV", "parseTSV", "dateTime"}; @Test public void testLang() { diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java index 504b74b8c07..65815e10cfa 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java @@ -3151,6 +3151,175 @@ public class StreamDecoratorTest extends SolrCloudTestCase { } } + + @Test + public void testParseCSV() throws Exception { + String expr = "parseCSV(list(tuple(file=\"file1\", line=\"a,b,c\"), " + + " tuple(file=\"file1\", line=\"1,2,3\")," + + " tuple(file=\"file1\", line=\"\\\"hello, world\\\",9000,20\")," + + " tuple(file=\"file2\", line=\"field_1,field_2,field_3\"), "+ + " tuple(file=\"file2\", line=\"8,9,\")))"; + ModifiableSolrParams paramsLoc = new ModifiableSolrParams(); + paramsLoc.set("expr", expr); + paramsLoc.set("qt", "/stream"); + + String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS; + TupleStream solrStream = new SolrStream(url, paramsLoc); + + 
StreamContext context = new StreamContext(); + solrStream.setStreamContext(context); + List tuples = getTuples(solrStream); + assertEquals(tuples.size(), 3); + assertEquals(tuples.get(0).getString("a"), "1"); + assertEquals(tuples.get(0).getString("b"), "2"); + assertEquals(tuples.get(0).getString("c"), "3"); + + assertEquals(tuples.get(1).getString("a"), "hello, world"); + assertEquals(tuples.get(1).getString("b"), "9000"); + assertEquals(tuples.get(1).getString("c"), "20"); + + assertEquals(tuples.get(2).getString("field_1"), "8"); + assertEquals(tuples.get(2).getString("field_2"), "9"); + assertNull(tuples.get(2).get("field_3")); + } + + + @Test + public void testParseTSV() throws Exception { + String expr = "parseTSV(list(tuple(file=\"file1\", line=\"a\tb\tc\"), " + + " tuple(file=\"file1\", line=\"1\t2\t3\")," + + " tuple(file=\"file1\", line=\"hello, world\t9000\t20\")," + + " tuple(file=\"file2\", line=\"field_1\tfield_2\tfield_3\"), "+ + " tuple(file=\"file2\", line=\"8\t\t9\")))"; + ModifiableSolrParams paramsLoc = new ModifiableSolrParams(); + paramsLoc.set("expr", expr); + paramsLoc.set("qt", "/stream"); + + String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS; + TupleStream solrStream = new SolrStream(url, paramsLoc); + + StreamContext context = new StreamContext(); + solrStream.setStreamContext(context); + List tuples = getTuples(solrStream); + assertEquals(tuples.size(), 3); + assertEquals(tuples.get(0).getString("a"), "1"); + assertEquals(tuples.get(0).getString("b"), "2"); + assertEquals(tuples.get(0).getString("c"), "3"); + + assertEquals(tuples.get(1).getString("a"), "hello, world"); + assertEquals(tuples.get(1).getString("b"), "9000"); + assertEquals(tuples.get(1).getString("c"), "20"); + + assertEquals(tuples.get(2).getString("field_1"), "8"); + assertNull(tuples.get(2).get("field_2")); + assertEquals(tuples.get(2).getString("field_3"), "9"); + + } + + + @Test + public void testDateTime() throws 
Exception { + String expr = "select(list(tuple(a=20001011:10:11:01), tuple(a=20071011:14:30:20)), dateTime(a, yyyyMMdd:kk:mm:ss) as date)"; + ModifiableSolrParams paramsLoc = new ModifiableSolrParams(); + paramsLoc.set("expr", expr); + paramsLoc.set("qt", "/stream"); + + String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS; + TupleStream solrStream = new SolrStream(url, paramsLoc); + + StreamContext context = new StreamContext(); + solrStream.setStreamContext(context); + List tuples = getTuples(solrStream); + String date = (String)tuples.get(0).get("date"); + assertEquals(date, "2000-10-11T10:11:01Z"); + date = (String)tuples.get(1).get("date"); + assertEquals(date, "2007-10-11T14:30:20Z"); + } + + @Test + public void testDateTimeTZ() throws Exception { + String expr = "select(list(tuple(a=20001011), tuple(a=20071011)), dateTime(a, yyyyMMdd, UTC) as date, dateTime(a, yyyyMMdd, EST) as date1, dateTime(a, yyyyMMdd) as date2)"; + ModifiableSolrParams paramsLoc = new ModifiableSolrParams(); + paramsLoc.set("expr", expr); + paramsLoc.set("qt", "/stream"); + + String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS; + TupleStream solrStream = new SolrStream(url, paramsLoc); + + StreamContext context = new StreamContext(); + solrStream.setStreamContext(context); + List tuples = getTuples(solrStream); + String date = (String)tuples.get(0).get("date"); + String date1 = (String)tuples.get(0).get("date1"); + String date2 = (String)tuples.get(0).get("date2"); + + assertEquals(date, "2000-10-11T00:00:00Z"); + assertEquals(date1, "2000-10-11T05:00:00Z"); + assertEquals(date2, "2000-10-11T00:00:00Z"); + + + date = (String)tuples.get(1).get("date"); + date1 = (String)tuples.get(1).get("date1"); + date2 = (String)tuples.get(1).get("date2"); + + assertEquals(date, "2007-10-11T00:00:00Z"); + assertEquals(date1, "2007-10-11T05:00:00Z"); + assertEquals(date2, "2007-10-11T00:00:00Z"); + } + + + + 
@Test + public void testDoubleLong() throws Exception { + String expr = "select(tuple(d=\"1.1\", l=\"5000\"), double(d) as d, long(l) as l)"; + ModifiableSolrParams paramsLoc = new ModifiableSolrParams(); + paramsLoc.set("expr", expr); + paramsLoc.set("qt", "/stream"); + + String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS; + TupleStream solrStream = new SolrStream(url, paramsLoc); + + StreamContext context = new StreamContext(); + solrStream.setStreamContext(context); + List tuples = getTuples(solrStream); + assertEquals(tuples.size(), 1); + assertTrue(tuples.get(0).get("d") instanceof Double); + assertTrue(tuples.get(0).get("l") instanceof Long); + + assertEquals(tuples.get(0).getDouble("d"), 1.1D, 0); + assertEquals(tuples.get(0).getLong("l").longValue(), 5000L); + + } + + + public void testDoubleLongArray() throws Exception { + String expr = "let(a=list(tuple(d=\"1.1\", l=\"5000\"), tuple(d=\"1.3\", l=\"7000\"))," + + " b=col(a, d)," + + " c=col(a, l)," + + " tuple(doubles=double(b)," + + " longs=long(c)))"; + ModifiableSolrParams paramsLoc = new ModifiableSolrParams(); + paramsLoc.set("expr", expr); + paramsLoc.set("qt", "/stream"); + + String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS; + TupleStream solrStream = new SolrStream(url, paramsLoc); + + StreamContext context = new StreamContext(); + solrStream.setStreamContext(context); + List tuples = getTuples(solrStream); + assertEquals(tuples.size(), 1); + + List doubles = (List)tuples.get(0).get("doubles"); + List longs = (List)tuples.get(0).get("longs"); + assertEquals(doubles.get(0), 1.1, 0); + assertEquals(doubles.get(1), 1.3, 0); + + assertEquals(longs.get(0).longValue(), 5000L); + assertEquals(longs.get(1).longValue(), 7000L); + } + + @Test public void testCommitStream() throws Exception {