SOLR-11570: Add support for correlation matrices to the corr Stream Evaluator

This commit is contained in:
Joel Bernstein 2017-11-13 19:44:30 -05:00
parent c3513e9281
commit 59360b4617
7 changed files with 280 additions and 138 deletions

View File

@ -204,8 +204,6 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
.withFunctionName("copyOf", CopyOfEvaluator.class)
.withFunctionName("cov", CovarianceEvaluator.class)
.withFunctionName("corr", CorrelationEvaluator.class)
.withFunctionName("kendallsCorr", KendallsCorrelationEvaluator.class)
.withFunctionName("spearmansCorr", SpearmansCorrelationEvaluator.class)
.withFunctionName("describe", DescribeEvaluator.class)
.withFunctionName("distance", EuclideanDistanceEvaluator.class)
.withFunctionName("manhattanDistance", ManhattanDistanceEvaluator.class)
@ -283,6 +281,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
.withFunctionName("sumRows", SumRowsEvaluator.class)
.withFunctionName("sumColumns", SumColumnsEvaluator.class)
.withFunctionName("diff", TimeDifferencingEvaluator.class)
.withFunctionName("corrPValues", CorrelationSignificanceEvaluator.class)
// Boolean Stream Evaluators

View File

@ -21,37 +21,118 @@ import java.math.BigDecimal;
import java.util.List;
import java.util.Locale;
import org.apache.commons.math3.linear.Array2DRowRealMatrix;
import org.apache.commons.math3.linear.RealMatrix;
import org.apache.commons.math3.stat.correlation.PearsonsCorrelation;
import org.apache.commons.math3.stat.correlation.KendallsCorrelation;
import org.apache.commons.math3.stat.correlation.SpearmansCorrelation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class CorrelationEvaluator extends RecursiveNumericEvaluator implements TwoValueWorker {
public class CorrelationEvaluator extends RecursiveObjectEvaluator implements ManyValueWorker {
protected static final long serialVersionUID = 1L;
public enum CorrelationType {pearsons, kendalls, spearmans}
private CorrelationType type;
public CorrelationEvaluator(StreamExpression expression, StreamFactory factory) throws IOException{
super(expression, factory);
List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
if(namedParams.size() > 0) {
if (namedParams.size() > 1) {
throw new IOException("corr function expects only one named parameter 'type'.");
}
StreamExpressionNamedParameter namedParameter = namedParams.get(0);
String name = namedParameter.getName();
if (!name.equalsIgnoreCase("type")) {
throw new IOException("corr function expects only one named parameter 'type'.");
}
String typeParam = namedParameter.getParameter().toString().trim();
this.type= CorrelationType.valueOf(typeParam);
} else {
this.type = CorrelationType.pearsons;
}
}
@Override
public Object doWork(Object first, Object second) throws IOException{
if(null == first){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - null found for the first value",toExpression(constructingFactory)));
public Object doWork(Object ... values) throws IOException{
if(values.length == 2) {
Object first = values[0];
Object second = values[1];
if (null == first) {
throw new IOException(String.format(Locale.ROOT, "Invalid expression %s - null found for the first value", toExpression(constructingFactory)));
}
if(null == second){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - null found for the second value",toExpression(constructingFactory)));
if (null == second) {
throw new IOException(String.format(Locale.ROOT, "Invalid expression %s - null found for the second value", toExpression(constructingFactory)));
}
if(!(first instanceof List<?>)){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - found type %s for the first value, expecting a list of numbers",toExpression(constructingFactory), first.getClass().getSimpleName()));
if (!(first instanceof List<?>)) {
throw new IOException(String.format(Locale.ROOT, "Invalid expression %s - found type %s for the first value, expecting a list of numbers", toExpression(constructingFactory), first.getClass().getSimpleName()));
}
if(!(second instanceof List<?>)){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - found type %s for the second value, expecting a list of numbers",toExpression(constructingFactory), first.getClass().getSimpleName()));
if (!(second instanceof List<?>)) {
throw new IOException(String.format(Locale.ROOT, "Invalid expression %s - found type %s for the second value, expecting a list of numbers", toExpression(constructingFactory), first.getClass().getSimpleName()));
}
if (type.equals(CorrelationType.pearsons)) {
PearsonsCorrelation pearsonsCorrelation = new PearsonsCorrelation();
return pearsonsCorrelation.correlation(
((List)first).stream().mapToDouble(value -> ((BigDecimal)value).doubleValue()).toArray(),
((List)second).stream().mapToDouble(value -> ((BigDecimal)value).doubleValue()).toArray()
((List) first).stream().mapToDouble(value -> ((BigDecimal) value).doubleValue()).toArray(),
((List) second).stream().mapToDouble(value -> ((BigDecimal) value).doubleValue()).toArray()
);
} else if (type.equals(CorrelationType.kendalls)) {
KendallsCorrelation kendallsCorrelation = new KendallsCorrelation();
return kendallsCorrelation.correlation(
((List) first).stream().mapToDouble(value -> ((BigDecimal) value).doubleValue()).toArray(),
((List) second).stream().mapToDouble(value -> ((BigDecimal) value).doubleValue()).toArray()
);
} else if (type.equals(CorrelationType.spearmans)) {
SpearmansCorrelation spearmansCorrelation = new SpearmansCorrelation();
return spearmansCorrelation.correlation(
((List) first).stream().mapToDouble(value -> ((BigDecimal) value).doubleValue()).toArray(),
((List) second).stream().mapToDouble(value -> ((BigDecimal) value).doubleValue()).toArray()
);
} else {
return null;
}
} else if(values.length == 1) {
if(values[0] instanceof Matrix) {
Matrix matrix = (Matrix)values[0];
double[][] data = matrix.getData();
if (type.equals(CorrelationType.pearsons)) {
PearsonsCorrelation pearsonsCorrelation = new PearsonsCorrelation(data);
RealMatrix corrMatrix = pearsonsCorrelation.getCorrelationMatrix();
double[][] corrMatrixData = corrMatrix.getData();
Matrix realMatrix = new Matrix(corrMatrixData);
realMatrix.addToContext("corr", pearsonsCorrelation);
return realMatrix;
} else if (type.equals(CorrelationType.kendalls)) {
KendallsCorrelation kendallsCorrelation = new KendallsCorrelation(data);
RealMatrix corrMatrix = kendallsCorrelation.getCorrelationMatrix();
double[][] corrMatrixData = corrMatrix.getData();
Matrix realMatrix = new Matrix(corrMatrixData);
realMatrix.addToContext("corr", kendallsCorrelation);
return realMatrix;
} else if (type.equals(CorrelationType.spearmans)) {
SpearmansCorrelation spearmansCorrelation = new SpearmansCorrelation(new Array2DRowRealMatrix(data));
RealMatrix corrMatrix = spearmansCorrelation.getCorrelationMatrix();
double[][] corrMatrixData = corrMatrix.getData();
Matrix realMatrix = new Matrix(corrMatrixData);
realMatrix.addToContext("corr", spearmansCorrelation.getRankCorrelation());
return realMatrix;
} else {
return null;
}
} else {
throw new IOException("corr function operates on either two numeric arrays or a single matrix as parameters.");
}
} else {
throw new IOException("corr function operates on either two numeric arrays or a single matrix as parameters.");
}
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.eval;
import java.io.IOException;
import java.util.Locale;
import org.apache.commons.math3.linear.RealMatrix;
import org.apache.commons.math3.stat.correlation.PearsonsCorrelation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class CorrelationSignificanceEvaluator extends RecursiveObjectEvaluator implements OneValueWorker {
protected static final long serialVersionUID = 1L;
public CorrelationSignificanceEvaluator(StreamExpression expression, StreamFactory factory) throws IOException{
super(expression, factory);
if(1 != containedEvaluators.size()){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting exactly 1 value but found %d",expression,containedEvaluators.size()));
}
}
@Override
public Object doWork(Object value) throws IOException{
if(null == value){
return null;
} else if(value instanceof Matrix) {
Matrix matrix = (Matrix) value;
Object corr = matrix.getContextValue("corr");
if(corr instanceof PearsonsCorrelation) {
PearsonsCorrelation pcorr = (PearsonsCorrelation)corr;
RealMatrix realMatrix = pcorr.getCorrelationPValues();
return new Matrix(realMatrix.getData());
} else {
throw new IOException("Correlation pvalues are only available for Pearsons and Spearmans correlations");
}
} else {
throw new IOException("matrix parameter expected for transpose function");
}
}
}

View File

@ -1,57 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.eval;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
import java.util.Locale;
import org.apache.commons.math3.stat.correlation.KendallsCorrelation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class KendallsCorrelationEvaluator extends RecursiveNumericEvaluator implements TwoValueWorker {
protected static final long serialVersionUID = 1L;
public KendallsCorrelationEvaluator(StreamExpression expression, StreamFactory factory) throws IOException{
super(expression, factory);
}
@Override
public Object doWork(Object first, Object second) throws IOException{
if(null == first){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - null found for the first value",toExpression(constructingFactory)));
}
if(null == second){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - null found for the second value",toExpression(constructingFactory)));
}
if(!(first instanceof List<?>)){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - found type %s for the first value, expecting a list of numbers",toExpression(constructingFactory), first.getClass().getSimpleName()));
}
if(!(second instanceof List<?>)){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - found type %s for the second value, expecting a list of numbers",toExpression(constructingFactory), first.getClass().getSimpleName()));
}
KendallsCorrelation kendallsCorrelation = new KendallsCorrelation();
return kendallsCorrelation.correlation(
((List)first).stream().mapToDouble(value -> ((BigDecimal)value).doubleValue()).toArray(),
((List)second).stream().mapToDouble(value -> ((BigDecimal)value).doubleValue()).toArray()
);
}
}

View File

@ -17,6 +17,8 @@
package org.apache.solr.client.solrj.io.eval;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.Iterator;
@ -24,11 +26,24 @@ import java.util.Iterator;
public class Matrix implements Iterable {
private double[][] data;
private Map context = new HashMap();
public Matrix(double[][] data) {
this.data = data;
}
public Map getContext() {
return this.context;
}
public void addToContext(Object key, Object value) {
this.context.put(key, value);
}
public Object getContextValue(Object key) {
return this.context.get(key);
}
public double[][] getData() {
return this.data;
}

View File

@ -1,57 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.eval;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
import java.util.Locale;
import org.apache.commons.math3.stat.correlation.SpearmansCorrelation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class SpearmansCorrelationEvaluator extends RecursiveNumericEvaluator implements TwoValueWorker {
protected static final long serialVersionUID = 1L;
public SpearmansCorrelationEvaluator(StreamExpression expression, StreamFactory factory) throws IOException{
super(expression, factory);
}
@Override
public Object doWork(Object first, Object second) throws IOException{
if(null == first){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - null found for the first value",toExpression(constructingFactory)));
}
if(null == second){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - null found for the second value",toExpression(constructingFactory)));
}
if(!(first instanceof List<?>)){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - found type %s for the first value, expecting a list of numbers",toExpression(constructingFactory), first.getClass().getSimpleName()));
}
if(!(second instanceof List<?>)){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - found type %s for the second value, expecting a list of numbers",toExpression(constructingFactory), first.getClass().getSimpleName()));
}
SpearmansCorrelation spearmansCorrelation = new SpearmansCorrelation();
return spearmansCorrelation.correlation(
((List)first).stream().mapToDouble(value -> ((BigDecimal)value).doubleValue()).toArray(),
((List)second).stream().mapToDouble(value -> ((BigDecimal)value).doubleValue()).toArray()
);
}
}

View File

@ -5525,7 +5525,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
"count(*), sum(price_f), max(price_f), min(price_f))";
String cexpr = "let(a="+expr+", b=select("+expr+",mult(-1, count(*)) as nvalue), c=col(a, count(*)), d=col(b, nvalue), " +
"tuple(corr=corr(c,d), scorr=spearmansCorr(array(500, 50, 50, 50),d), kcorr=kendallsCorr(array(500, 50, 50, 50),d), d=d))";
"tuple(corr=corr(c,d), scorr=corr(array(500, 50, 50, 50),d, type=spearmans), kcorr=corr(array(500, 50, 50, 50),d, type=kendalls), d=d))";
ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", cexpr);
@ -5544,6 +5544,8 @@ public class StreamExpressionTest extends SolrCloudTestCase {
}
@Test
public void testCovariance() throws Exception {
UpdateRequest updateRequest = new UpdateRequest();
@ -7345,6 +7347,108 @@ public class StreamExpressionTest extends SolrCloudTestCase {
assertEquals(row3.get(2).longValue(), 16);
}
@Test
public void testCorrMatrix() throws Exception {
String cexpr = "let(echo=true," +
"a=array(1,2,3), " +
"b=array(2,4,6), " +
"c=array(4, 8, 52), " +
"d=transpose(matrix(a, b, c)), " +
"f=corr(d), " +
"g=corr(d, type=kendalls), " +
"h=corr(d, type=spearmans)," +
"i=corrPValues(f))";
ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", cexpr);
paramsLoc.set("qt", "/stream");
String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS;
TupleStream solrStream = new SolrStream(url, paramsLoc);
StreamContext context = new StreamContext();
solrStream.setStreamContext(context);
List<Tuple> tuples = getTuples(solrStream);
assertTrue(tuples.size() == 1);
List<List<Number>> cm = (List<List<Number>>)tuples.get(0).get("f");
assertEquals(cm.size(), 3);
List<Number> row1 = cm.get(0);
assertEquals(row1.size(), 3);
assertEquals(row1.get(0).doubleValue(), 1, 0);
assertEquals(row1.get(1).doubleValue(), 1, 0);
assertEquals(row1.get(2).doubleValue(), 0.901127113779166, 0);
List<Number> row2 = cm.get(1);
assertEquals(row2.size(), 3);
assertEquals(row2.get(0).doubleValue(), 1, 0);
assertEquals(row2.get(1).doubleValue(), 1, 0);
assertEquals(row2.get(2).doubleValue(), 0.901127113779166, 0);
List<Number> row3 = cm.get(2);
assertEquals(row3.size(), 3);
assertEquals(row3.get(0).doubleValue(), 0.901127113779166, 0);
assertEquals(row3.get(1).doubleValue(), 0.901127113779166, 0);
assertEquals(row3.get(2).doubleValue(), 1, 0);
cm = (List<List<Number>>)tuples.get(0).get("g");
assertEquals(cm.size(), 3);
row1 = cm.get(0);
assertEquals(row1.size(), 3);
assertEquals(row1.get(0).doubleValue(), 1, 0);
assertEquals(row1.get(1).doubleValue(), 1, 0);
assertEquals(row1.get(2).doubleValue(), 1, 0);
row2 = cm.get(1);
assertEquals(row2.size(), 3);
assertEquals(row2.get(0).doubleValue(), 1, 0);
assertEquals(row2.get(1).doubleValue(), 1, 0);
assertEquals(row2.get(2).doubleValue(), 1, 0);
row3 = cm.get(2);
assertEquals(row3.size(), 3);
assertEquals(row3.get(0).doubleValue(), 1, 0);
assertEquals(row3.get(1).doubleValue(), 1, 0);
assertEquals(row3.get(2).doubleValue(), 1, 0);
cm = (List<List<Number>>)tuples.get(0).get("h");
assertEquals(cm.size(), 3);
row1 = cm.get(0);
assertEquals(row1.size(), 3);
assertEquals(row1.get(0).doubleValue(), 1, 0);
assertEquals(row1.get(1).doubleValue(), 1, 0);
assertEquals(row1.get(2).doubleValue(), 1, 0);
row2 = cm.get(1);
assertEquals(row2.size(), 3);
assertEquals(row2.get(0).doubleValue(), 1, 0);
assertEquals(row2.get(1).doubleValue(), 1, 0);
assertEquals(row2.get(2).doubleValue(), 1, 0);
row3 = cm.get(2);
assertEquals(row3.size(), 3);
assertEquals(row3.get(0).doubleValue(), 1, 0);
assertEquals(row3.get(1).doubleValue(), 1, 0);
assertEquals(row3.get(2).doubleValue(), 1, 0);
cm = (List<List<Number>>)tuples.get(0).get("i");
assertEquals(cm.size(), 3);
row1 = cm.get(0);
assertEquals(row1.size(), 3);
assertEquals(row1.get(0).doubleValue(), 0, 0);
assertEquals(row1.get(1).doubleValue(), 0, 0);
assertEquals(row1.get(2).doubleValue(), 0.28548201004998375, 0);
row2 = cm.get(1);
assertEquals(row2.size(), 3);
assertEquals(row2.get(0).doubleValue(), 0, 0);
assertEquals(row2.get(1).doubleValue(), 0, 0);
assertEquals(row2.get(2).doubleValue(), 0.28548201004998375, 0);
row3 = cm.get(2);
assertEquals(row3.size(), 3);
assertEquals(row3.get(0).doubleValue(), 0.28548201004998375, 0);
assertEquals(row3.get(1).doubleValue(), 0.28548201004998375, 0);
assertEquals(row3.get(2).doubleValue(), 0, 0);
}
@Test
public void testPrecision() throws Exception {
String cexpr = "let(echo=true, a=precision(array(1.44445, 1, 2.00006), 4), b=precision(1.44445, 4))";