SOLR-14478: Allow the diff Stream Evaluator to operate on the rows of a matrix

This commit is contained in:
Joel Bernstein 2020-05-13 13:53:51 -04:00
parent fe2135963c
commit f1db56afaf
2 changed files with 113 additions and 23 deletions

View File

@ -19,6 +19,7 @@ package org.apache.solr.client.solrj.io.eval;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.ArrayList;
import java.util.Locale; import java.util.Locale;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
@ -26,7 +27,7 @@ import java.util.stream.IntStream;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class TimeDifferencingEvaluator extends RecursiveNumericEvaluator implements ManyValueWorker{ public class TimeDifferencingEvaluator extends RecursiveObjectEvaluator implements ManyValueWorker{
protected static final long serialVersionUID = 1L; protected static final long serialVersionUID = 1L;
@ -38,32 +39,85 @@ public class TimeDifferencingEvaluator extends RecursiveNumericEvaluator impleme
} }
@Override @Override
public Object doWork(Object... values) throws IOException { public Object doWork(Object... values) throws IOException {
if (!(1 == values.length || values.length == 2)){ if (!(1 == values.length || values.length == 2)) {
throw new IOException(String.format(Locale.ROOT,"%s(...) only works with 1 or 2 values but %d were provided", constructingFactory.getFunctionName(getClass()), values.length)); throw new IOException(String.format(Locale.ROOT, "%s(...) only works with 1 or 2 values but %d were provided", constructingFactory.getFunctionName(getClass()), values.length));
} }
List<Number> timeseriesValues = (List<Number> )values[0]; if (values[0] instanceof List) {
Number lagValue = 1; List<Number> timeseriesValues = (List<Number>) values[0];
Number lagValue = 1;
if(1 == values.length) { if (1 == values.length) {
if (!(timeseriesValues instanceof List<?>)) { if (!(timeseriesValues instanceof List<?>)) {
throw new IOException(String.format(Locale.ROOT, "Invalid expression %s - found type %s for the first value, expecting a List", toExpression(constructingFactory), values[0].getClass().getSimpleName())); throw new IOException(String.format(Locale.ROOT, "Invalid expression %s - found type %s for the first value, expecting a List", toExpression(constructingFactory), values[0].getClass().getSimpleName()));
}
if (!(timeseriesValues.size() > 1)) {
throw new IOException(String.format(Locale.ROOT, "Invalid expression %s - found list size of %s for the first value, expecting a List of size > 0.", toExpression(constructingFactory), timeseriesValues.size()));
}
} }
if (!(timeseriesValues.size() > 1)) { if (2 == values.length) {
throw new IOException(String.format(Locale.ROOT, "Invalid expression %s - found list size of %s for the first value, expecting a List of size > 0.", toExpression(constructingFactory), timeseriesValues.size())); lagValue = (Number) values[1];
if (!(lagValue instanceof Number)) {
throw new IOException(String.format(Locale.ROOT, "Invalid expression %s - found type %s for the second value, expecting a Number", toExpression(constructingFactory), values[1].getClass().getSimpleName()));
}
if (lagValue.intValue() > timeseriesValues.size()) {
throw new IOException(String.format(Locale.ROOT, "Invalid expression %s - found a lag size of %s for the second value, the first value has a List size of %s, expecting a lag value less than the List size", toExpression(constructingFactory), lagValue.intValue(), timeseriesValues.size()));
}
} }
final int lag = lagValue.intValue();
return IntStream.range(lag, timeseriesValues.size())
.mapToObj(n -> (timeseriesValues.get(n).doubleValue() - timeseriesValues.get(n - lag).doubleValue()))
.collect(Collectors.toList());
} else if(values[0] instanceof Matrix) {
//Diff each row of the matrix
Matrix matrix = (Matrix)values[0];
double[][] data = matrix.getData();
double[][] diffedData = new double[data.length][];
Number lagValue = 1;
if (2 == values.length) {
lagValue = (Number) values[1];
if (!(lagValue instanceof Number)) {
throw new IOException(String.format(Locale.ROOT, "Invalid expression %s - found type %s for the second value, expecting a Number", toExpression(constructingFactory), values[1].getClass().getSimpleName()));
}
}
int lag = lagValue.intValue();
for(int i=0; i<data.length; i++) {
double[] row = data[i];
List<Double> timeseriesValues = new ArrayList(row.length);
for(double d : row) {
timeseriesValues.add(d);
}
List<Number> diffedList = IntStream.range(lag, timeseriesValues.size())
.mapToObj(n -> (timeseriesValues.get(n).doubleValue() - timeseriesValues.get(n - lag).doubleValue()))
.collect(Collectors.toList());
double[] diffedRow = new double[diffedList.size()];
for(int r=0; r<diffedList.size(); r++) {
diffedRow[r] = diffedList.get(r).doubleValue();
}
diffedData[i] = diffedRow;
}
Matrix diffedMatrix = new Matrix(diffedData);
diffedMatrix.setRowLabels(matrix.getRowLabels());
List<String> columns = matrix.getColumnLabels();
if(columns != null) {
List<String> newColumns = new ArrayList(columns.size() - lag);
for (int i = lag; i < columns.size(); i++) {
newColumns.add(columns.get(i));
}
diffedMatrix.setColumnLabels(newColumns);
}
return diffedMatrix;
} else {
throw new IOException(String.format(Locale.ROOT, "Invalid expression %s - first parameter must be list of matrix", toExpression(constructingFactory)));
} }
if(2 == values.length) {
lagValue = (Number) values[1];
if(!(lagValue instanceof Number)){
throw new IOException(String.format(Locale.ROOT, "Invalid expression %s - found type %s for the second value, expecting a Number", toExpression(constructingFactory), values[1].getClass().getSimpleName()));
}
if (lagValue.intValue() > timeseriesValues.size()) {
throw new IOException(String.format(Locale.ROOT, "Invalid expression %s - found a lag size of %s for the second value, the first value has a List size of %s, expecting a lag value less than the List size", toExpression(constructingFactory), lagValue.intValue(), timeseriesValues.size()));
}
}
final int lag = lagValue.intValue();
return IntStream.range(lag, timeseriesValues.size())
.mapToObj(n -> (timeseriesValues.get(n).doubleValue()-timeseriesValues.get(n-lag).doubleValue()))
.collect(Collectors.toList());
} }
} }

View File

@ -4062,6 +4062,42 @@ public class MathExpressionTest extends SolrCloudTestCase {
assertEquals((double)out.get(20), 22.92, 0.009); assertEquals((double)out.get(20), 22.92, 0.009);
} }
@Test
public void testTimeDifferencingMatrix() throws Exception {
String cexpr = "let(echo=\"c, d\",\n" +
" a=matrix(array(1,2,3,4,5),array(7.5,9,11,15.5,50.2)),\n" +
" b=setColumnLabels(a, array(\"a\",\"b\",\"c\",\"d\",\"e\")),\n" +
" c=diff(b, 2),\n" +
" d=getColumnLabels(c))";
ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", cexpr);
paramsLoc.set("qt", "/stream");
String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS;
TupleStream solrStream = new SolrStream(url, paramsLoc);
StreamContext context = new StreamContext();
solrStream.setStreamContext(context);
List<Tuple> tuples = getTuples(solrStream);
assertTrue(tuples.size() == 1);
List<List<Number>> matrix = (List<List<Number>>)tuples.get(0).get("c");
List<String> columnsLabels = (List<String>)tuples.get(0).get("d");
assertEquals(columnsLabels.size(), 3);
assertEquals(columnsLabels.get(0), "c");
assertEquals(columnsLabels.get(1), "d");
assertEquals(columnsLabels.get(2), "e");
assertEquals(matrix.size(), 2);
List<Number> row1 = matrix.get(0);
List<Number> row2 = matrix.get(1);
assertEquals(row1.size(), 3);
assertEquals(row1.get(0).doubleValue(), 2.0, 0);
assertEquals(row1.get(1).doubleValue(), 2.0, 0);
assertEquals(row1.get(2).doubleValue(), 2.0, 0);
assertEquals(row2.size(), 3 );
assertEquals(row2.get(0).doubleValue(), 3.5, 0);
assertEquals(row2.get(1).doubleValue(), 6.5, 0);
assertEquals(row2.get(2).doubleValue(), 39.2, 0);
}
@Test @Test
public void testTimeDifferencingDefaultLag() throws Exception { public void testTimeDifferencingDefaultLag() throws Exception {
String cexpr = "diff(array(1709.0, 1621.0, 1973.0, 1812.0, 1975.0, 1862.0, 1940.0, 2013.0, 1596.0, 1725.0, 1676.0, 1814.0, 1615.0, 1557.0, 1891.0, 1956.0, 1885.0, 1623.0, 1903.0, 1997.0))"; String cexpr = "diff(array(1709.0, 1621.0, 1973.0, 1812.0, 1975.0, 1862.0, 1940.0, 2013.0, 1596.0, 1725.0, 1676.0, 1814.0, 1615.0, 1557.0, 1891.0, 1956.0, 1885.0, 1623.0, 1903.0, 1997.0))";