Adding MultiValuesSource support classes and documentation to matrix stats agg module

This commit is contained in:
Nicholas Knize 2016-05-13 13:37:05 -05:00
parent f449666c59
commit 90b8f5d0d8
27 changed files with 1283 additions and 398 deletions

View File

@ -107,4 +107,12 @@ public class ParseField {
public String[] getDeprecatedNames() {
return deprecatedNames;
}
/**
 * Shared {@link ParseField} constants for commonly used field names
 * ({@code field}, {@code fields}, {@code format}, {@code missing}, {@code time_zone}).
 * The class is not final, so other constant holders can extend it and add their own fields.
 */
public static class CommonFields {
public static final ParseField FIELD = new ParseField("field");
public static final ParseField FIELDS = new ParseField("fields");
public static final ParseField FORMAT = new ParseField("format");
public static final ParseField MISSING = new ParseField("missing");
public static final ParseField TIME_ZONE = new ParseField("time_zone");
}
}

View File

@ -20,6 +20,7 @@ package org.elasticsearch.search.aggregations;
import org.elasticsearch.action.support.ToXContentToBytes;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
@ -84,4 +85,8 @@ public abstract class AggregationBuilder
*/
protected abstract AggregationBuilder subAggregations(AggregatorFactories.Builder subFactories);
/**
 * Common xcontent fields shared among aggregator builders. Inherits the constants declared in
 * {@link ParseField.CommonFields} and adds {@code value_type}.
 */
public static final class CommonFields extends ParseField.CommonFields {
public static final ParseField VALUE_TYPE = new ParseField("value_type");
}
}

View File

@ -20,6 +20,7 @@ package org.elasticsearch.search.aggregations;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
@ -250,7 +251,8 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
/**
* Common xcontent fields that are shared among aggregations
*/
public static final class CommonFields {
public static final class CommonFields extends ParseField.CommonFields {
// todo convert these to ParseField
public static final String META = "meta";
public static final String BUCKETS = "buckets";
public static final String VALUE = "value";

View File

@ -76,6 +76,21 @@ public class ValuesSourceConfig<VS extends ValuesSource> {
return this;
}
/**
 * Sets the {@link DocValueFormat} held by this config.
 *
 * @param format the format to store
 * @return this config, so calls can be chained
 */
public ValuesSourceConfig<VS> format(final DocValueFormat format) {
this.format = format;
return this;
}
/**
 * Sets the {@code missing} value held by this config.
 * NOTE(review): presumably the value substituted for documents that lack the field — confirm against the consumer.
 *
 * @param missing the value to store
 * @return this config, so calls can be chained
 */
public ValuesSourceConfig<VS> missing(final Object missing) {
this.missing = missing;
return this;
}
/**
 * Sets the {@link DateTimeZone} held by this config.
 *
 * @param timeZone the time zone to store
 * @return this config, so calls can be chained
 */
public ValuesSourceConfig<VS> timezone(final DateTimeZone timeZone) {
this.timeZone= timeZone;
return this;
}
/**
 * Returns the {@link DocValueFormat} currently held by this config.
 *
 * @return the stored format
 */
public DocValueFormat format() {
return format;
}

View File

@ -18,6 +18,10 @@ These settings can be dynamically updated on a live cluster with the
The modules in this section are:
<<modules-aggregations-matrix,Matrix Aggregations>>::
A family of aggregations that operate on multiple document fields and produce a matrix as output.
<<modules-cluster,Cluster-level routing and shard allocation>>::
Settings to control where, when, and how shards are allocated to nodes.
@ -80,6 +84,8 @@ The modules in this section are:
--
include::modules/aggregations-matrix.asciidoc[]
include::modules/cluster.asciidoc[]
include::modules/discovery.asciidoc[]

View File

@ -0,0 +1,9 @@
[[modules-aggregations-matrix]]
== Matrix Aggregations
experimental[]
The aggregations in this family operate on multiple fields and produce a matrix result based on the values extracted from
the requested document fields. Unlike metric and bucket aggregations, this aggregation family does not yet support scripting.
include::aggregations/matrix/stats.asciidoc[]

View File

@ -0,0 +1,112 @@
[[modules-matrix-aggregations-stats]]
=== Matrix Stats
The `matrix_stats` aggregation is a numeric aggregation that computes the following statistics over a set of document fields:
[horizontal]
`count`:: Number of per field samples included in the calculation.
`mean`:: The average value for each field.
`variance`:: Per field measurement of how spread out the samples are from the mean.
`skewness`:: Per field measurement quantifying the asymmetric distribution around the mean.
`kurtosis`:: Per field measurement quantifying the shape of the distribution.
`covariance`:: A matrix that quantitatively describes how changes in one field are associated with another.
`correlation`:: The covariance matrix scaled to a range of -1 to 1, inclusive. Describes the relationship between field
distributions.
The following example demonstrates the use of matrix stats to describe the relationship between income and poverty.
[source,js]
--------------------------------------------------
{
"aggs": {
"matrixstats": {
"matrix_stats": {
"fields": ["poverty", "income"]
}
}
}
}
--------------------------------------------------
The aggregation type is `matrix_stats` and the `fields` setting defines the set of fields (as an array) for computing
the statistics. The above request returns the following response:
[source,js]
--------------------------------------------------
{
...
"aggregations": {
"matrixstats": {
"fields": [{
"name": "income",
"count": 50,
"mean": 51985.1,
"variance": 7.383377037755103E7,
"skewness": 0.5595114003506483,
"kurtosis": 2.5692365287787124,
"covariance": {
"income": 7.383377037755103E7,
"poverty": -21093.65836734694
},
"correlation": {
"income": 1.0,
"poverty": -0.8352655256272504
}
}, {
"name": "poverty",
"count": 50,
"mean": 12.732000000000001,
"variance": 8.637730612244896,
"skewness": 0.4516049811903419,
"kurtosis": 2.8615929677997767,
"covariance": {
"income": -21093.65836734694,
"poverty": 8.637730612244896
},
"correlation": {
"income": -0.8352655256272504,
"poverty": 1.0
}
}]
}
}
}
--------------------------------------------------
==== Multi Value Fields
The `matrix_stats` aggregation treats each document field as an independent sample. The `mode` parameter controls what
array value the aggregation will use for array or multi-valued fields. This parameter can take one of the following:
[horizontal]
`avg`:: (default) Use the average of all values.
`min`:: Pick the lowest value.
`max`:: Pick the highest value.
`sum`:: Use the sum of all values.
`median`:: Use the median of all values.
==== Missing Values
The `missing` parameter defines how documents that are missing a value should be treated.
By default they will be ignored but it is also possible to treat them as if they had a value.
This is done by adding a set of fieldname : value mappings to specify default values per field.
[source,js]
--------------------------------------------------
{
"aggs": {
"matrixstats": {
"matrix_stats": {
"fields": ["poverty", "income"],
"missing": {"income" : 50000} <1>
}
}
}
}
--------------------------------------------------
<1> Documents without a value in the `income` field will have the default value `50000`.
==== Script
This aggregation family does not yet support scripting.

View File

@ -21,15 +21,3 @@ esplugin {
description 'Adds aggregations whose input are a list of numeric fields and output includes a matrix.'
classname 'org.elasticsearch.search.aggregations.matrix.MatrixAggregationPlugin'
}
dependencies {
testCompile project(path: ':plugins:lang-javascript', configuration: 'runtime')
}
integTest {
cluster {
plugin 'lang-javascript', project(':plugins:lang-javascript')
setting 'script.inline', 'true'
setting 'script.stored', 'true'
}
}

View File

@ -19,7 +19,7 @@
package org.elasticsearch.search.aggregations;
import org.elasticsearch.search.aggregations.matrix.stats.MatrixStats;
import org.elasticsearch.search.aggregations.matrix.stats.MatrixStatsAggregatorBuilder;
import org.elasticsearch.search.aggregations.matrix.stats.MatrixStatsAggregationBuilder;
/**
*/
@ -27,7 +27,7 @@ public class MatrixStatsAggregationBuilders {
/**
* Create a new {@link MatrixStats} aggregation with the given name.
*/
public static MatrixStatsAggregatorBuilder matrixStats(String name) {
return new MatrixStatsAggregatorBuilder(name);
public static MatrixStatsAggregationBuilder matrixStats(String name) {
return new MatrixStatsAggregationBuilder(name);
}
}

View File

@ -22,15 +22,12 @@ package org.elasticsearch.search.aggregations.matrix;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.matrix.stats.InternalMatrixStats;
import org.elasticsearch.search.aggregations.matrix.stats.MatrixStatsAggregatorBuilder;
import org.elasticsearch.search.aggregations.matrix.stats.MatrixStatsAggregationBuilder;
import org.elasticsearch.search.aggregations.matrix.stats.MatrixStatsParser;
import java.io.IOException;
public class MatrixAggregationPlugin extends Plugin {
static {
InternalMatrixStats.registerStreams();
}
public MatrixAggregationPlugin() throws IOException {
}
@ -46,7 +43,8 @@ public class MatrixAggregationPlugin extends Plugin {
}
public void onModule(SearchModule searchModule) {
searchModule.registerAggregation(MatrixStatsAggregatorBuilder::new, new MatrixStatsParser(),
MatrixStatsAggregatorBuilder.AGGREGATION_NAME_FIELD);
InternalMatrixStats.registerStreams();
searchModule.registerAggregation(MatrixStatsAggregationBuilder::new, new MatrixStatsParser(),
MatrixStatsAggregationBuilder.AGGREGATION_NAME_FIELD);
}
}

View File

@ -27,7 +27,9 @@ import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* Computes distribution statistics over multiple fields
@ -70,11 +72,13 @@ public class InternalMatrixStats extends InternalMetricsAggregation implements M
return TYPE;
}
/** get the number of documents */
@Override
public long getDocCount() {
return stats.docCount;
}
/** get the number of samples for the given field. == docCount - numMissing */
@Override
public long getFieldCount(String field) {
if (results == null) {
@ -83,65 +87,63 @@ public class InternalMatrixStats extends InternalMetricsAggregation implements M
return results.getFieldCount(field);
}
/** get the mean for the given field */
@Override
public Double getMean(String field) {
public double getMean(String field) {
if (results == null) {
return null;
return Double.NaN;
}
return results.getMean(field);
}
/** get the variance for the given field */
@Override
public Double getVariance(String field) {
public double getVariance(String field) {
if (results == null) {
return null;
return Double.NaN;
}
return results.getVariance(field);
}
/** get the distribution skewness for the given field */
@Override
public Double getSkewness(String field) {
public double getSkewness(String field) {
if (results == null) {
return null;
return Double.NaN;
}
return results.getSkewness(field);
}
/** get the distribution shape for the given field */
@Override
public Double getKurtosis(String field) {
public double getKurtosis(String field) {
if (results == null) {
return null;
return Double.NaN;
}
return results.getKurtosis(field);
}
/** get the covariance between the two fields */
@Override
public Double getCovariance(String fieldX, String fieldY) {
public double getCovariance(String fieldX, String fieldY) {
if (results == null) {
return null;
return Double.NaN;
}
return results.getCovariance(fieldX, fieldY);
}
/** get the correlation between the two fields */
@Override
public Map<String, HashMap<String, Double>> getCovariance() {
return results.getCovariances();
}
@Override
public Double getCorrelation(String fieldX, String fieldY) {
public double getCorrelation(String fieldX, String fieldY) {
if (results == null) {
return null;
return Double.NaN;
}
return results.getCorrelation(fieldX, fieldY);
}
@Override
public Map<String, HashMap<String, Double>> getCorrelation() {
return results.getCorrelations();
}
static class Fields {
public static final String FIELDS = "fields";
public static final String NAME = "name";
public static final String COUNT = "count";
public static final String MEAN = "mean";
public static final String VARIANCE = "variance";
@ -153,30 +155,38 @@ public class InternalMatrixStats extends InternalMetricsAggregation implements M
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
if (results != null) {
Set<String> fieldNames = results.getFieldCounts().keySet();
builder.field("field", fieldNames);
builder.field(Fields.COUNT, results.getFieldCounts().values());
builder.field(Fields.MEAN, results.getMeans().values());
builder.field(Fields.VARIANCE, results.getVariances().values());
builder.field(Fields.SKEWNESS, results.getSkewness().values());
builder.field(Fields.KURTOSIS, results.getKurtosis().values());
ArrayList<ArrayList<Double>> cov = new ArrayList<>(fieldNames.size());
ArrayList<ArrayList<Double>> cor = new ArrayList<>(fieldNames.size());
for (String y : fieldNames) {
ArrayList<Double> covRow = new ArrayList<>(fieldNames.size());
ArrayList<Double> corRow = new ArrayList<>(fieldNames.size());
for (String x : fieldNames) {
covRow.add(results.getCovariance(x, y));
corRow.add(results.getCorrelation(x, y));
if (results != null && results.getFieldCounts().keySet().isEmpty() == false) {
builder.startArray(Fields.FIELDS);
for (String fieldName : results.getFieldCounts().keySet()) {
builder.startObject();
// name
builder.field(Fields.NAME, fieldName);
// count
builder.field(Fields.COUNT, results.getFieldCount(fieldName));
// mean
builder.field(Fields.MEAN, results.getMean(fieldName));
// variance
builder.field(Fields.VARIANCE, results.getVariance(fieldName));
// skewness
builder.field(Fields.SKEWNESS, results.getSkewness(fieldName));
// kurtosis
builder.field(Fields.KURTOSIS, results.getKurtosis(fieldName));
// covariance
builder.startObject(Fields.COVARIANCE);
for (String fieldB : results.getFieldCounts().keySet()) {
builder.field(fieldB, results.getCovariance(fieldName, fieldB));
}
cov.add(covRow);
cor.add(corRow);
builder.endObject();
// correlation
builder.startObject(Fields.CORRELATION);
for (String fieldB : results.getFieldCounts().keySet()) {
builder.field(fieldB, results.getCorrelation(fieldName, fieldB));
}
builder.endObject();
builder.endObject();
}
builder.field(Fields.COVARIANCE, cov);
builder.field(Fields.CORRELATION, cor);
builder.endArray();
}
return builder;
}
@ -185,11 +195,11 @@ public class InternalMatrixStats extends InternalMetricsAggregation implements M
if (path.isEmpty()) {
return this;
} else if (path.size() == 1) {
String coordinate = path.get(0);
String element = path.get(0);
if (results == null) {
results = MatrixStatsResults.EMPTY();
results = new MatrixStatsResults();
}
switch (coordinate) {
switch (element) {
case "counts":
return results.getFieldCounts();
case "means":
@ -205,7 +215,7 @@ public class InternalMatrixStats extends InternalMetricsAggregation implements M
case "correlation":
return results.getCorrelations();
default:
throw new IllegalArgumentException("Found unknown path element [" + coordinate + "] in [" + getName() + "]");
throw new IllegalArgumentException("Found unknown path element [" + element + "] in [" + getName() + "]");
}
} else {
throw new IllegalArgumentException("path not supported for [" + getName() + "]: " + path);
@ -215,53 +225,35 @@ public class InternalMatrixStats extends InternalMetricsAggregation implements M
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
// write running stats
if (stats == null || stats.docCount == 0) {
out.writeVLong(0);
} else {
out.writeVLong(stats.docCount);
stats.writeTo(out);
}
out.writeOptionalWriteable(stats);
// write results
if (results == null || results.getDocCount() == 0) {
out.writeVLong(0);
} else {
out.writeVLong(results.getDocCount());
results.writeTo(out);
}
out.writeOptionalWriteable(results);
}
@Override
protected void doReadFrom(StreamInput in) throws IOException {
// read stats count
final long statsCount = in.readVLong();
if (statsCount > 0) {
stats = new RunningStats(in);
stats.docCount = statsCount;
}
stats = in.readOptionalWriteable(RunningStats::new);
// read count
final long count = in.readVLong();
if (count > 0) {
results = new MatrixStatsResults(in);
}
results = in.readOptionalWriteable(MatrixStatsResults::new);
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
// merge stats across all shards
aggregations.removeIf(p -> ((InternalMatrixStats)p).stats == null);
List<InternalAggregation> aggs = new ArrayList<>(aggregations);
aggs.removeIf(p -> ((InternalMatrixStats)p).stats == null);
// return empty result iff all stats are null
if (aggregations.isEmpty()) {
return new InternalMatrixStats(name, 0, null, MatrixStatsResults.EMPTY(), pipelineAggregators(), getMetaData());
if (aggs.isEmpty()) {
return new InternalMatrixStats(name, 0, null, new MatrixStatsResults(), pipelineAggregators(), getMetaData());
}
RunningStats runningStats = ((InternalMatrixStats) aggregations.get(0)).stats;
for (int i=1; i < aggregations.size(); ++i) {
runningStats.merge(((InternalMatrixStats) aggregations.get(i)).stats);
RunningStats runningStats = new RunningStats();
for (int i=0; i < aggs.size(); ++i) {
runningStats.merge(((InternalMatrixStats) aggs.get(i)).stats);
}
MatrixStatsResults results = new MatrixStatsResults(stats);
MatrixStatsResults results = new MatrixStatsResults(runningStats);
return new InternalMatrixStats(name, results.getDocCount(), runningStats, results, pipelineAggregators(), getMetaData());
}

View File

@ -20,9 +20,6 @@ package org.elasticsearch.search.aggregations.matrix.stats;
import org.elasticsearch.search.aggregations.Aggregation;
import java.util.HashMap;
import java.util.Map;
/**
* Interface for MatrixStats Metric Aggregation
*/
@ -32,19 +29,15 @@ public interface MatrixStats extends Aggregation {
/** return total field count (differs from docCount if there are missing values) */
long getFieldCount(String field);
/** return the field mean */
Double getMean(String field);
double getMean(String field);
/** return the field variance */
Double getVariance(String field);
double getVariance(String field);
/** return the skewness of the distribution */
Double getSkewness(String field);
double getSkewness(String field);
/** return the kurtosis of the distribution */
Double getKurtosis(String field);
/** return the upper triangle of the covariance matrix */
Map<String, HashMap<String, Double>> getCovariance();
double getKurtosis(String field);
/** return the covariance between field x and field y */
Double getCovariance(String fieldX, String fieldY);
/** return the upper triangle of the pearson product-moment correlation matrix */
Map<String, HashMap<String, Double>> getCorrelation();
double getCovariance(String fieldX, String fieldY);
/** return the correlation coefficient of field x and field y */
Double getCorrelation(String fieldX, String fieldY);
double getCorrelation(String fieldX, String fieldY);
}

View File

@ -23,10 +23,11 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.MultiValueMode;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.MultiValuesSourceAggregatorBuilder;
import org.elasticsearch.search.aggregations.support.MultiValuesSourceAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
@ -38,19 +39,21 @@ import java.util.Map;
/**
*/
public class MatrixStatsAggregatorBuilder
extends MultiValuesSourceAggregatorBuilder.LeafOnly<ValuesSource.Numeric, MatrixStatsAggregatorBuilder> {
public class MatrixStatsAggregationBuilder
extends MultiValuesSourceAggregationBuilder.LeafOnly<ValuesSource.Numeric, MatrixStatsAggregationBuilder> {
public static final String NAME = InternalMatrixStats.TYPE.name();
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
public MatrixStatsAggregatorBuilder(String name) {
private MultiValueMode multiValueMode = MultiValueMode.AVG;
public MatrixStatsAggregationBuilder(String name) {
super(name, InternalMatrixStats.TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
}
/**
* Read from a stream.
*/
public MatrixStatsAggregatorBuilder(StreamInput in) throws IOException {
public MatrixStatsAggregationBuilder(StreamInput in) throws IOException {
super(in, InternalMatrixStats.TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
}
@ -59,14 +62,24 @@ public class MatrixStatsAggregatorBuilder
// Do nothing, no extra state to write to stream
}
/**
 * Sets the {@link MultiValueMode} stored on this builder; the stored mode is later handed to the
 * aggregator factory in {@code innerBuild}.
 *
 * @param multiValueMode the mode to store
 * @return this builder, so calls can be chained
 */
public MatrixStatsAggregationBuilder multiValueMode(MultiValueMode multiValueMode) {
this.multiValueMode = multiValueMode;
return this;
}
/**
 * Returns the {@link MultiValueMode} currently stored on this builder.
 *
 * @return the stored multi-value mode
 */
public MultiValueMode multiValueMode() {
return this.multiValueMode;
}
@Override
protected MatrixStatsAggregatorFactory innerBuild(AggregationContext context, Map<String, ValuesSourceConfig<Numeric>> configs,
AggregatorFactory<?> parent, AggregatorFactories.Builder subFactoriesBuilder) throws IOException {
return new MatrixStatsAggregatorFactory(name, type, configs, context, parent, subFactoriesBuilder, metaData);
AggregatorFactory<?> parent, AggregatorFactories.Builder subFactoriesBuilder) throws IOException {
return new MatrixStatsAggregatorFactory(name, type, configs, multiValueMode, context, parent, subFactoriesBuilder, metaData);
}
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.field(MULTIVALUE_MODE_FIELD.getPreferredName(), multiValueMode);
return builder;
}

View File

@ -22,7 +22,8 @@ import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.index.fielddata.NumericDoubleValues;
import org.elasticsearch.search.MultiValueMode;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
@ -30,11 +31,10 @@ import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.metrics.MetricsAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.MultiValuesSource.NumericMultiValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -43,89 +43,75 @@ import java.util.Map;
**/
public class MatrixStatsAggregator extends MetricsAggregator {
/** Multiple ValuesSource with field names */
final Map<String, ValuesSource.Numeric> valuesSources;
final NumericMultiValuesSource valuesSources;
/** array of descriptive stats, per shard, needed to compute the correlation */
ObjectArray<RunningStats> stats;
public MatrixStatsAggregator(String name, Map<String, ValuesSource.Numeric> valuesSources, AggregationContext context,
Aggregator parent, List<PipelineAggregator> pipelineAggregators,
Aggregator parent, MultiValueMode multiValueMode, List<PipelineAggregator> pipelineAggregators,
Map<String,Object> metaData) throws IOException {
super(name, context, parent, pipelineAggregators, metaData);
this.valuesSources = valuesSources;
if (valuesSources != null && !valuesSources.isEmpty()) {
this.valuesSources = new NumericMultiValuesSource(valuesSources, multiValueMode);
stats = context.bigArrays().newObjectArray(1);
} else {
this.valuesSources = null;
}
}
@Override
public boolean needsScores() {
boolean needsScores = false;
if (valuesSources != null) {
for (Map.Entry<String, ValuesSource.Numeric> valueSource : valuesSources.entrySet()) {
needsScores |= valueSource.getValue().needsScores();
}
}
return needsScores;
return (valuesSources == null) ? false : valuesSources.needsScores();
}
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
if (valuesSources == null || valuesSources.isEmpty()) {
if (valuesSources == null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
}
final BigArrays bigArrays = context.bigArrays();
final HashMap<String, SortedNumericDoubleValues> values = new HashMap<>(valuesSources.size());
for (Map.Entry<String, ValuesSource.Numeric> valuesSource : valuesSources.entrySet()) {
values.put(valuesSource.getKey(), valuesSource.getValue().doubleValues(ctx));
final NumericDoubleValues[] values = new NumericDoubleValues[valuesSources.fieldNames().length];
for (int i = 0; i < values.length; ++i) {
values[i] = valuesSources.getField(i, ctx);
}
return new LeafBucketCollectorBase(sub, values) {
final String[] fieldNames = valuesSources.fieldNames();
final double[] fieldVals = new double[fieldNames.length];
@Override
public void collect(int doc, long bucket) throws IOException {
// get fields
Map<String, Double> fields = getFields(doc);
if (fields != null) {
if (includeDocument(doc) == true) {
stats = bigArrays.grow(stats, bucket + 1);
RunningStats stat = stats.get(bucket);
// add document fields to correlation stats
if (stat == null) {
stat = new RunningStats(fields);
stat = new RunningStats(fieldNames, fieldVals);
stats.set(bucket, stat);
} else {
stat.add(fields);
stat.add(fieldNames, fieldVals);
}
stats.set(bucket, stat);
}
}
/**
* return a map of field names and data
*/
private Map<String, Double> getFields(int doc) {
// get fieldNames to use as hash keys
ArrayList<String> fieldNames = new ArrayList<>(values.keySet());
HashMap<String, Double> fields = new HashMap<>(fieldNames.size());
private boolean includeDocument(int doc) {
// loop over fields
for (String fieldName : fieldNames) {
final SortedNumericDoubleValues doubleValues = values.get(fieldName);
doubleValues.setDocument(doc);
final int valuesCount = doubleValues.count();
// if document contains an empty field we omit the doc from the correlation
if (valuesCount <= 0) {
return null;
for (int i = 0; i < fieldVals.length; ++i) {
final NumericDoubleValues doubleValues = values[i];
final double value = doubleValues.get(doc);
// skip if value is missing
if (value == Double.NEGATIVE_INFINITY) {
return false;
}
// get the field value (multi-value is the average of all the values)
double fieldValue = 0;
for (int i = 0; i < valuesCount; ++i) {
if (Double.isNaN(doubleValues.valueAt(i)) == false) {
fieldValue += doubleValues.valueAt(i);
}
}
fields.put(fieldName, fieldValue / valuesCount);
fieldVals[i] = value;
}
return fields;
return true;
}
};
}

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.search.aggregations.matrix.stats;
import org.elasticsearch.search.MultiValueMode;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
@ -37,23 +38,26 @@ import java.util.Map;
public class MatrixStatsAggregatorFactory
extends MultiValuesSourceAggregatorFactory<ValuesSource.Numeric, MatrixStatsAggregatorFactory> {
private final MultiValueMode multiValueMode;
public MatrixStatsAggregatorFactory(String name, InternalAggregation.Type type,
Map<String, ValuesSourceConfig<ValuesSource.Numeric>> configs, AggregationContext context,
AggregatorFactory<?> parent, AggregatorFactories.Builder subFactoriesBuilder,
Map<String, Object> metaData) throws IOException {
Map<String, ValuesSourceConfig<ValuesSource.Numeric>> configs, MultiValueMode multiValueMode,
AggregationContext context, AggregatorFactory<?> parent, AggregatorFactories.Builder subFactoriesBuilder,
Map<String, Object> metaData) throws IOException {
super(name, type, configs, context, parent, subFactoriesBuilder, metaData);
this.multiValueMode = multiValueMode;
}
@Override
protected Aggregator createUnmapped(Aggregator parent, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
throws IOException {
return new MatrixStatsAggregator(name, null, context, parent, pipelineAggregators, metaData);
return new MatrixStatsAggregator(name, null, context, parent, multiValueMode, pipelineAggregators, metaData);
}
@Override
protected Aggregator doCreateInternal(Map<String, ValuesSource.Numeric> valuesSources, Aggregator parent,
boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
return new MatrixStatsAggregator(name, valuesSources, context, parent, pipelineAggregators, metaData);
return new MatrixStatsAggregator(name, valuesSources, context, parent, multiValueMode, pipelineAggregators, metaData);
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.search.aggregations.matrix.stats;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.MultiValueMode;
import org.elasticsearch.search.aggregations.support.MultiValuesSourceParser.NumericValuesSourceParser;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
@ -28,23 +29,36 @@ import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import java.io.IOException;
import java.util.Map;
import static org.elasticsearch.search.aggregations.support.MultiValuesSourceAggregationBuilder.MULTIVALUE_MODE_FIELD;
/**
*/
public class MatrixStatsParser extends NumericValuesSourceParser {
public MatrixStatsParser() {
super(true, true, false);
super(true);
}
/**
 * Handles the aggregation-specific multi-value mode option while parsing.
 * When the current field name matches {@code MULTIVALUE_MODE_FIELD} and the token is a string value,
 * the raw text is stored in {@code otherOptions} under that key and {@code true} is returned.
 * In every other case nothing is stored and {@code false} is returned.
 *
 * @param aggregationName   name of the aggregation being parsed (not used here)
 * @param currentFieldName  name of the field the parser is currently positioned on
 * @param token             the current parser token
 * @param parser            the parser, read for the string value
 * @param parseFieldMatcher matcher used to compare the field name against {@code MULTIVALUE_MODE_FIELD}
 * @param otherOptions      map that receives the mode text, keyed by {@code MULTIVALUE_MODE_FIELD}
 * @return {@code true} if the mode text was stored, {@code false} otherwise
 * @throws IOException if reading the text from the parser fails
 */
@Override
protected boolean token(String aggregationName, String currentFieldName, XContentParser.Token token, XContentParser parser,
ParseFieldMatcher parseFieldMatcher, Map<ParseField, Object> otherOptions) throws IOException {
if (parseFieldMatcher.match(currentFieldName, MULTIVALUE_MODE_FIELD)) {
// only a string value is accepted; any other token type falls through to "return false"
if (token == XContentParser.Token.VALUE_STRING) {
// keep the raw text; it is converted to a MultiValueMode later, in createFactory
otherOptions.put(MULTIVALUE_MODE_FIELD, parser.text());
return true;
}
}
return false;
}
@Override
protected MatrixStatsAggregatorBuilder createFactory(String aggregationName, ValuesSourceType valuesSourceType,
ValueType targetValueType, Map<ParseField, Object> otherOptions) {
return new MatrixStatsAggregatorBuilder(aggregationName);
protected MatrixStatsAggregationBuilder createFactory(String aggregationName, ValuesSourceType valuesSourceType,
ValueType targetValueType, Map<ParseField, Object> otherOptions) {
MatrixStatsAggregationBuilder builder = new MatrixStatsAggregationBuilder(aggregationName);
String mode = (String)otherOptions.get(MULTIVALUE_MODE_FIELD);
if (mode != null) {
builder.multiValueMode(MultiValueMode.fromString(mode));
}
return builder;
}
}

View File

@ -21,7 +21,7 @@ package org.elasticsearch.search.aggregations.matrix.stats;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
import java.util.Collections;
@ -34,42 +34,43 @@ import java.util.Map;
*
* @internal
*/
class MatrixStatsResults implements Streamable {
class MatrixStatsResults implements Writeable {
/** object holding results - computes results in place */
final RunningStats results;
final protected RunningStats results;
/** pearson product correlation coefficients */
protected HashMap<String, HashMap<String, Double>> correlation;
final protected Map<String, HashMap<String, Double>> correlation;
/** Base ctor */
private MatrixStatsResults() {
results = RunningStats.EMPTY();
public MatrixStatsResults() {
results = new RunningStats();
this.correlation = new HashMap<>();
}
/** creates and computes result from provided stats */
public MatrixStatsResults(RunningStats stats) {
try {
this.results = stats.clone();
this.correlation = new HashMap<>();
} catch (CloneNotSupportedException e) {
throw new ElasticsearchException("Error trying to create multifield_stats results", e);
}
this.results = stats.clone();
this.correlation = new HashMap<>();
this.compute();
}
/** creates a results object from the given stream */
@SuppressWarnings("unchecked")
protected MatrixStatsResults(StreamInput in) {
try {
results = new RunningStats(in);
this.readFrom(in);
correlation = (Map<String, HashMap<String, Double>>) in.readGenericValue();
} catch (IOException e) {
throw new ElasticsearchException("Error trying to create multifield_stats results from stream input", e);
}
}
/** create an empty results object **/
protected static MatrixStatsResults EMPTY() {
return new MatrixStatsResults();
/** Marshalls MatrixStatsResults */
@Override
public void writeTo(StreamOutput out) throws IOException {
// marshall results
results.writeTo(out);
// marshall correlation
out.writeGenericValue(correlation);
}
/** return document count */
@ -77,8 +78,8 @@ class MatrixStatsResults implements Streamable {
return results.docCount;
}
/** return the field counts */
public Map<String, Long> getFieldCounts() {
/** return the field counts - not public, used for getProperty() */
protected Map<String, Long> getFieldCounts() {
return Collections.unmodifiableMap(results.counts);
}
@ -90,60 +91,66 @@ class MatrixStatsResults implements Streamable {
return results.counts.get(field);
}
/** return the means */
public Map<String, Double> getMeans() {
/** return the means - not public, used for getProperty() */
protected Map<String, Double> getMeans() {
return Collections.unmodifiableMap(results.means);
}
/** return the mean for the requested field */
public Double getMean(String field) {
public double getMean(String field) {
checkField(field, results.means);
return results.means.get(field);
}
/** return the variances */
public Map<String, Double> getVariances() {
/** return the variances - not public, used for getProperty() */
protected Map<String, Double> getVariances() {
return Collections.unmodifiableMap(results.variances);
}
/** return the variance for the requested field */
public Double getVariance(String field) {
public double getVariance(String field) {
checkField(field, results.variances);
return results.variances.get(field);
}
/** return the skewness */
public Map<String, Double> getSkewness() {
/** return the skewness - not public, used for getProperty() */
protected Map<String, Double> getSkewness() {
return Collections.unmodifiableMap(results.skewness);
}
/** return the skewness for the requested field */
public Double getSkewness(String field) {
public double getSkewness(String field) {
checkField(field, results.skewness);
return results.skewness.get(field);
}
/** return the kurtosis */
public Map<String, Double> getKurtosis() {
protected Map<String, Double> getKurtosis() {
return Collections.unmodifiableMap(results.kurtosis);
}
/** return the kurtosis for the requested field */
public Double getKurtosis(String field) {
public double getKurtosis(String field) {
checkField(field, results.kurtosis);
return results.kurtosis.get(field);
}
/** return the covariances */
public Map<String, HashMap<String, Double>> getCovariances() {
/** return the covariances as a map - not public, used for getProperty() */
protected Map<String, HashMap<String, Double>> getCovariances() {
return Collections.unmodifiableMap(results.covariances);
}
/** return the covariance between two fields */
public Double getCovariance(String fieldX, String fieldY) {
public double getCovariance(String fieldX, String fieldY) {
if (fieldX.equals(fieldY)) {
checkField(fieldX, results.variances);
return results.variances.get(fieldX);
}
return getValFromUpperTriangularMatrix(results.covariances, fieldX, fieldY);
}
public Map<String, HashMap<String, Double>> getCorrelations() {
/** return the correlations as a map - not public, used for getProperty() */
protected Map<String, HashMap<String, Double>> getCorrelations() {
return Collections.unmodifiableMap(correlation);
}
@ -156,10 +163,10 @@ class MatrixStatsResults implements Streamable {
}
/** return the value for two fields in an upper triangular matrix, regardless of row col location. */
private Double getValFromUpperTriangularMatrix(HashMap<String, HashMap<String, Double>> map, String fieldX, String fieldY) {
private double getValFromUpperTriangularMatrix(Map<String, HashMap<String, Double>> map, String fieldX, String fieldY) {
// for the co-value to exist, one of the two (or both) fields has to be a row key
if (map.containsKey(fieldX) == false && map.containsKey(fieldY) == false) {
return null;
throw new IllegalArgumentException("neither field " + fieldX + " nor " + fieldY + " exist");
} else if (map.containsKey(fieldX)) {
// fieldX exists as a row key
if (map.get(fieldX).containsKey(fieldY)) {
@ -176,6 +183,15 @@ class MatrixStatsResults implements Streamable {
throw new IllegalArgumentException("Coefficient not computed between fields: " + fieldX + " and " + fieldY);
}
private void checkField(String field, Map<String, ?> map) {
if (field == null) {
throw new IllegalArgumentException("field name cannot be null");
}
if (map.containsKey(field) == false) {
throw new IllegalArgumentException("field " + field + " does not exist");
}
}
/** Computes final covariance, variance, and correlation */
private void compute() {
final double nM1 = results.docCount - 1D;
@ -214,29 +230,4 @@ class MatrixStatsResults implements Streamable {
correlation.put(rowName, corRow);
}
}
/** Unmarshalls MatrixStatsResults */
@Override
@SuppressWarnings("unchecked")
public void readFrom(StreamInput in) throws IOException {
if (in.readBoolean()) {
correlation = (HashMap<String, HashMap<String, Double>>) (in.readGenericValue());
} else {
correlation = null;
}
}
/** Marshalls MatrixStatsResults */
@Override
public void writeTo(StreamOutput out) throws IOException {
// marshall results
results.writeTo(out);
// marshall correlation
if (correlation != null) {
out.writeBoolean(true);
out.writeGenericValue(correlation);
} else {
out.writeBoolean(false);
}
}
}

View File

@ -18,12 +18,14 @@
*/
package org.elasticsearch.search.aggregations.matrix.stats;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@ -35,7 +37,7 @@ import java.util.Map;
*
* @internal
*/
public class RunningStats implements Streamable, Cloneable {
public class RunningStats implements Writeable, Cloneable {
/** count of observations (same number of observations per field) */
protected long docCount = 0;
/** per field sum of observations */
@ -53,20 +55,14 @@ public class RunningStats implements Streamable, Cloneable {
/** covariance values */
protected HashMap<String, HashMap<String, Double>> covariances;
private RunningStats() {
public RunningStats() {
init();
}
/** Ctor to create an instance of running statistics */
public RunningStats(StreamInput in) throws IOException {
this();
this.readFrom(in);
}
public RunningStats(Map<String, Double> doc) {
if (doc != null && doc.isEmpty() == false) {
public RunningStats(final String[] fieldNames, final double[] fieldVals) {
if (fieldVals != null && fieldVals.length > 0) {
init();
this.add(doc);
this.add(fieldNames, fieldVals);
}
}
@ -80,15 +76,52 @@ public class RunningStats implements Streamable, Cloneable {
variances = new HashMap<>();
}
/** create an empty instance */
protected static RunningStats EMPTY() {
return new RunningStats();
/** Ctor to create an instance of running statistics */
@SuppressWarnings("unchecked")
public RunningStats(StreamInput in) throws IOException {
this();
// read fieldSum
fieldSum = (HashMap<String, Double>)in.readGenericValue();
// counts
counts = (HashMap<String, Long>)in.readGenericValue();
// means
means = (HashMap<String, Double>)in.readGenericValue();
// variances
variances = (HashMap<String, Double>)in.readGenericValue();
// skewness
skewness = (HashMap<String, Double>)in.readGenericValue();
// kurtosis
kurtosis = (HashMap<String, Double>)in.readGenericValue();
// read covariances
covariances = (HashMap<String, HashMap<String, Double>>)in.readGenericValue();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
// marshall fieldSum
out.writeGenericValue(fieldSum);
// counts
out.writeGenericValue(counts);
// mean
out.writeGenericValue(means);
// variances
out.writeGenericValue(variances);
// skewness
out.writeGenericValue(skewness);
// kurtosis
out.writeGenericValue(kurtosis);
// covariances
out.writeGenericValue(covariances);
}
/** updates running statistics with a documents field values **/
public void add(Map<String, Double> doc) {
if (doc == null || doc.isEmpty()) {
return;
public void add(final String[] fieldNames, final double[] fieldVals) {
if (fieldNames == null) {
throw new IllegalArgumentException("Cannot add statistics without field names.");
} else if (fieldVals == null) {
throw new IllegalArgumentException("Cannot add statistics without field values.");
} else if (fieldNames.length != fieldVals.length) {
throw new IllegalArgumentException("Number of field values do not match number of field names.");
}
// update total, mean, and variance
@ -98,9 +131,9 @@ public class RunningStats implements Streamable, Cloneable {
double m1, m2, m3, m4; // moments
double d, dn, dn2, t1;
final HashMap<String, Double> deltas = new HashMap<>();
for (Map.Entry<String, Double> field : doc.entrySet()) {
fieldName = field.getKey();
fieldValue = field.getValue();
for (int i = 0; i < fieldNames.length; ++i) {
fieldName = fieldNames[i];
fieldValue = fieldVals[i];
// update counts
counts.put(fieldName, 1 + (counts.containsKey(fieldName) ? counts.get(fieldName) : 0));
@ -133,17 +166,17 @@ public class RunningStats implements Streamable, Cloneable {
}
}
this.updateCovariance(doc, deltas);
this.updateCovariance(fieldNames, deltas);
}
/** Update covariance matrix */
private void updateCovariance(final Map<String, Double> doc, final Map<String, Double> deltas) {
private void updateCovariance(final String[] fieldNames, final Map<String, Double> deltas) {
// deep copy of hash keys (field names)
ArrayList<String> cFieldNames = new ArrayList<>(doc.keySet());
ArrayList<String> cFieldNames = new ArrayList<>(Arrays.asList(fieldNames));
String fieldName;
double dR, newVal;
for (Map.Entry<String, Double> field : doc.entrySet()) {
fieldName = field.getKey();
for (int i = 0; i < fieldNames.length; ++i) {
fieldName = fieldNames[i];
cFieldNames.remove(fieldName);
// update running covariances
dR = deltas.get(fieldName);
@ -170,6 +203,21 @@ public class RunningStats implements Streamable, Cloneable {
public void merge(final RunningStats other) {
if (other == null) {
return;
} else if (this.docCount == 0) {
for (Map.Entry<String, Double> fs : other.means.entrySet()) {
final String fieldName = fs.getKey();
this.means.put(fieldName, fs.getValue().doubleValue());
this.counts.put(fieldName, other.counts.get(fieldName).longValue());
this.fieldSum.put(fieldName, other.fieldSum.get(fieldName).doubleValue());
this.variances.put(fieldName, other.variances.get(fieldName).doubleValue());
this.skewness.put(fieldName , other.skewness.get(fieldName).doubleValue());
this.kurtosis.put(fieldName, other.kurtosis.get(fieldName).doubleValue());
if (other.covariances.containsKey(fieldName) == true) {
this.covariances.put(fieldName, other.covariances.get(fieldName));
}
this.docCount = other.docCount;
}
return;
}
final double nA = docCount;
final double nB = other.docCount;
@ -226,7 +274,7 @@ public class RunningStats implements Streamable, Cloneable {
}
/** Merges two covariance matrices */
private void mergeCovariance(final RunningStats other, final HashMap<String, Double> deltas) {
private void mergeCovariance(final RunningStats other, final Map<String, Double> deltas) {
final double countA = docCount - other.docCount;
double f, dR, newVal;
for (Map.Entry<String, Double> fs : other.means.entrySet()) {
@ -252,107 +300,11 @@ public class RunningStats implements Streamable, Cloneable {
}
@Override
public void writeTo(StreamOutput out) throws IOException {
// marshall fieldSum
if (fieldSum != null) {
out.writeBoolean(true);
out.writeGenericValue(fieldSum);
} else {
out.writeBoolean(false);
public RunningStats clone() {
try {
return (RunningStats) super.clone();
} catch (CloneNotSupportedException e) {
throw new ElasticsearchException("Error trying to create a copy of RunningStats");
}
// counts
if (counts != null) {
out.writeBoolean(true);
out.writeGenericValue(counts);
} else {
out.writeBoolean(false);
}
// mean
if (means != null) {
out.writeBoolean(true);
out.writeGenericValue(means);
} else {
out.writeBoolean(false);
}
// variances
if (variances != null) {
out.writeBoolean(true);
out.writeGenericValue(variances);
} else {
out.writeBoolean(false);
}
// skewness
if (skewness != null) {
out.writeBoolean(true);
out.writeGenericValue(skewness);
} else {
out.writeBoolean(false);
}
// kurtosis
if (kurtosis != null) {
out.writeBoolean(true);
out.writeGenericValue(kurtosis);
} else {
out.writeBoolean(false);
}
// covariances
if (covariances != null) {
out.writeBoolean(true);
out.writeGenericValue(covariances);
} else {
out.writeBoolean(false);
}
}
@Override
@SuppressWarnings("unchecked")
public void readFrom(StreamInput in) throws IOException {
// read fieldSum
if (in.readBoolean()) {
fieldSum = (HashMap<String, Double>)(in.readGenericValue());
} else {
fieldSum = null;
}
// counts
if (in.readBoolean()) {
counts = (HashMap<String, Long>)(in.readGenericValue());
} else {
counts = null;
}
// means
if (in.readBoolean()) {
means = (HashMap<String, Double>)(in.readGenericValue());
} else {
means = null;
}
// variances
if (in.readBoolean()) {
variances = (HashMap<String, Double>)(in.readGenericValue());
} else {
variances = null;
}
// skewness
if (in.readBoolean()) {
skewness = (HashMap<String, Double>)(in.readGenericValue());
} else {
skewness = null;
}
// kurtosis
if (in.readBoolean()) {
kurtosis = (HashMap<String, Double>)(in.readGenericValue());
} else {
kurtosis = null;
}
// read covariances
if (in.readBoolean()) {
covariances = (HashMap<String, HashMap<String, Double>>) (in.readGenericValue());
} else {
covariances = null;
}
}
@Override
public RunningStats clone() throws CloneNotSupportedException {
return (RunningStats)super.clone();
}
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.support;
import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.index.fielddata.NumericDoubleValues;
import org.elasticsearch.search.MultiValueMode;
import java.io.IOException;
import java.util.Map;
/**
 * Class to encapsulate a set of ValuesSource objects labeled by field name.
 *
 * Field names and values sources are kept in two parallel arrays that are both
 * derived from the same map, so {@code names[i]} labels {@code values[i]}.
 */
public abstract class MultiValuesSource <VS extends ValuesSource> {
    /** mode handed to {@code MultiValueMode#select} when a field's values are read */
    protected MultiValueMode multiValueMode;
    /** field names, never null (empty when no values sources were supplied) */
    protected String[] names;
    /** values sources, assigned by the concrete subclass constructor */
    protected VS[] values;

    /** A set of numeric values sources. */
    public static class NumericMultiValuesSource extends MultiValuesSource<ValuesSource.Numeric> {
        public NumericMultiValuesSource(Map<String, ValuesSource.Numeric> valuesSources, MultiValueMode multiValueMode) {
            super(valuesSources, multiValueMode);
            if (valuesSources != null) {
                this.values = valuesSources.values().toArray(new ValuesSource.Numeric[0]);
            } else {
                this.values = new ValuesSource.Numeric[0];
            }
        }

        /**
         * Returns the values of the field at the given ordinal for the given leaf.
         *
         * @param ordinal index into {@link #fieldNames()}
         * @param ctx     leaf reader context to read values from
         * @throws IndexOutOfBoundsException if {@code ordinal} is negative or not smaller than the number of fields
         */
        public NumericDoubleValues getField(final int ordinal, LeafReaderContext ctx) throws IOException {
            // valid ordinals are 0 .. length - 1; the previous "ordinal > names.length" check let ordinal == length through
            checkOrdinal(ordinal, values.length);
            // NOTE(review): NEGATIVE_INFINITY is presumably the placeholder select() reports for documents without a
            // value - confirm against MultiValueMode
            return multiValueMode.select(values[ordinal].doubleValues(ctx), Double.NEGATIVE_INFINITY);
        }
    }

    /** A set of bytes values sources. */
    public static class BytesMultiValuesSource extends MultiValuesSource<ValuesSource.Bytes> {
        public BytesMultiValuesSource(Map<String, ValuesSource.Bytes> valuesSources, MultiValueMode multiValueMode) {
            super(valuesSources, multiValueMode);
            // tolerate a null map the same way NumericMultiValuesSource does
            if (valuesSources != null) {
                this.values = valuesSources.values().toArray(new ValuesSource.Bytes[0]);
            } else {
                this.values = new ValuesSource.Bytes[0];
            }
        }

        /**
         * Returns the bytes values of the field at the given ordinal for the given leaf.
         *
         * @throws IndexOutOfBoundsException if {@code ordinal} is negative or not smaller than the number of fields
         */
        public Object getField(final int ordinal, LeafReaderContext ctx) throws IOException {
            checkOrdinal(ordinal, values.length);
            return values[ordinal].bytesValues(ctx);
        }
    }

    /** A set of geo point values sources. */
    public static class GeoPointValuesSource extends MultiValuesSource<ValuesSource.GeoPoint> {
        public GeoPointValuesSource(Map<String, ValuesSource.GeoPoint> valuesSources, MultiValueMode multiValueMode) {
            super(valuesSources, multiValueMode);
            // tolerate a null map the same way NumericMultiValuesSource does
            if (valuesSources != null) {
                this.values = valuesSources.values().toArray(new ValuesSource.GeoPoint[0]);
            } else {
                this.values = new ValuesSource.GeoPoint[0];
            }
        }
    }

    private MultiValuesSource(Map<String, ?> valuesSources, MultiValueMode multiValueMode) {
        // always initialise names so fieldNames() and the ordinal checks never see null
        if (valuesSources != null) {
            this.names = valuesSources.keySet().toArray(new String[0]);
        } else {
            this.names = new String[0];
        }
        this.multiValueMode = multiValueMode;
    }

    /** Rejects ordinals outside of {@code [0, size)}. */
    private static void checkOrdinal(int ordinal, int size) {
        if (ordinal < 0 || ordinal >= size) {
            throw new IndexOutOfBoundsException("ValuesSource array index " + ordinal + " out of bounds");
        }
    }

    /** Returns true if at least one of the wrapped values sources needs scores. */
    public boolean needsScores() {
        boolean needsScores = false;
        for (ValuesSource value : values) {
            needsScores |= value.needsScores();
        }
        return needsScores;
    }

    /** Returns the field names, in the same order as the values sources. */
    public String[] fieldNames() {
        return this.names;
    }
}

View File

@ -0,0 +1,366 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.support;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexGeoPointFieldData;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationInitializationException;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
 * Base class for aggregation builders that read their values from a list of fields
 * rather than from a single field. It holds the settings shared by such builders
 * (fields, value type, format and per-field missing values), takes care of their
 * stream and xcontent serialization, and resolves one {@link ValuesSourceConfig}
 * per field when the aggregation is built.
 */
public abstract class MultiValuesSourceAggregationBuilder<VS extends ValuesSource, AB extends MultiValuesSourceAggregationBuilder<VS, AB>>
        extends AggregationBuilder<AB> {

    public static final ParseField MULTIVALUE_MODE_FIELD = new ParseField("mode");

    /** Variant for aggregations that do not accept sub-aggregations. */
    public static abstract class LeafOnly<VS extends ValuesSource, AB extends MultiValuesSourceAggregationBuilder<VS, AB>>
            extends MultiValuesSourceAggregationBuilder<VS, AB> {

        protected LeafOnly(String name, Type type, ValuesSourceType valuesSourceType, ValueType targetValueType) {
            super(name, type, valuesSourceType, targetValueType);
        }

        /**
         * Read from a stream that does not serialize its targetValueType. This should be used by most subclasses.
         */
        protected LeafOnly(StreamInput in, Type type, ValuesSourceType valuesSourceType, ValueType targetValueType) throws IOException {
            super(in, type, valuesSourceType, targetValueType);
        }

        /**
         * Read an aggregation from a stream that serializes its targetValueType. This should only be used by subclasses that override
         * {@link #serializeTargetValueType()} to return true.
         */
        protected LeafOnly(StreamInput in, Type type, ValuesSourceType valuesSourceType) throws IOException {
            super(in, type, valuesSourceType);
        }

        @Override
        public AB subAggregations(Builder subFactories) {
            throw new AggregationInitializationException("Aggregator [" + name + "] of type [" +
                    type + "] cannot accept sub-aggregations");
        }
    }

    private final ValuesSourceType valuesSourceType;
    private final ValueType targetValueType;
    private List<String> fields = Collections.emptyList();
    private ValueType valueType = null;
    private String format = null;
    /** per-field replacement values, keyed by field name */
    private Map<String, Object> missingMap = Collections.emptyMap();

    protected MultiValuesSourceAggregationBuilder(String name, Type type, ValuesSourceType valuesSourceType, ValueType targetValueType) {
        super(name, type);
        if (valuesSourceType == null) {
            throw new IllegalArgumentException("[valuesSourceType] must not be null: [" + name + "]");
        }
        this.valuesSourceType = valuesSourceType;
        this.targetValueType = targetValueType;
    }

    /**
     * Read from a stream that does not serialize its targetValueType.
     */
    protected MultiValuesSourceAggregationBuilder(StreamInput in, Type type, ValuesSourceType valuesSourceType, ValueType targetValueType)
            throws IOException {
        super(in, type);
        assert false == serializeTargetValueType() : "Wrong read constructor called for subclass that provides its targetValueType";
        this.valuesSourceType = valuesSourceType;
        this.targetValueType = targetValueType;
        read(in);
    }

    /**
     * Read from a stream that serializes its targetValueType.
     */
    protected MultiValuesSourceAggregationBuilder(StreamInput in, Type type, ValuesSourceType valuesSourceType) throws IOException {
        super(in, type);
        assert serializeTargetValueType() : "Wrong read constructor called for subclass that serializes its targetValueType";
        this.valuesSourceType = valuesSourceType;
        this.targetValueType = in.readOptionalWriteable(ValueType::readFromStream);
        read(in);
    }

    /**
     * Read the shared state from a stream, in the order {@link #doWriteTo(StreamOutput)} wrote it.
     */
    @SuppressWarnings("unchecked")
    private void read(StreamInput in) throws IOException {
        fields = (ArrayList<String>)in.readGenericValue();
        valueType = in.readOptionalWriteable(ValueType::readFromStream);
        format = in.readOptionalString();
        missingMap = in.readMap();
    }

    @Override
    protected final void doWriteTo(StreamOutput out) throws IOException {
        if (serializeTargetValueType()) {
            out.writeOptionalWriteable(targetValueType);
        }
        out.writeGenericValue(fields);
        out.writeOptionalWriteable(valueType);
        out.writeOptionalString(format);
        out.writeMap(missingMap);
        innerWriteTo(out);
    }

    /**
     * Write subclass' state to the stream
     */
    protected abstract void innerWriteTo(StreamOutput out) throws IOException;

    /**
     * Sets the fields to use for this aggregation.
     */
    @SuppressWarnings("unchecked")
    public AB fields(List<String> fields) {
        if (fields == null) {
            throw new IllegalArgumentException("[fields] must not be null: [" + name + "]");
        }
        this.fields = fields;
        return (AB) this;
    }

    /**
     * Gets the fields to use for this aggregation.
     */
    public List<String> fields() {
        return fields;
    }

    /**
     * Sets the {@link ValueType} for the value produced by this aggregation
     */
    @SuppressWarnings("unchecked")
    public AB valueType(ValueType valueType) {
        if (valueType == null) {
            throw new IllegalArgumentException("[valueType] must not be null: [" + name + "]");
        }
        this.valueType = valueType;
        return (AB) this;
    }

    /**
     * Gets the {@link ValueType} for the value produced by this aggregation
     */
    public ValueType valueType() {
        return valueType;
    }

    /**
     * Sets the format to use for the output of the aggregation.
     */
    @SuppressWarnings("unchecked")
    public AB format(String format) {
        if (format == null) {
            throw new IllegalArgumentException("[format] must not be null: [" + name + "]");
        }
        this.format = format;
        return (AB) this;
    }

    /**
     * Gets the format to use for the output of the aggregation.
     */
    public String format() {
        return format;
    }

    /**
     * Sets the values to use when the aggregation finds a missing value in a
     * document, keyed by field name
     */
    @SuppressWarnings("unchecked")
    public AB missingMap(Map<String, Object> missingMap) {
        if (missingMap == null) {
            throw new IllegalArgumentException("[missing] must not be null: [" + name + "]");
        }
        this.missingMap = missingMap;
        return (AB) this;
    }

    /**
     * Gets the values to use when the aggregation finds a missing value in a
     * document, keyed by field name
     */
    public Map<String, Object> missingMap() {
        return missingMap;
    }

    @Override
    protected final MultiValuesSourceAggregatorFactory<VS, ?> doBuild(AggregationContext context, AggregatorFactory<?> parent,
            AggregatorFactories.Builder subFactoriesBuilder) throws IOException {
        Map<String, ValuesSourceConfig<VS>> configs = resolveConfig(context);
        MultiValuesSourceAggregatorFactory<VS, ?> factory = innerBuild(context, configs, parent, subFactoriesBuilder);
        return factory;
    }

    /**
     * Resolves one {@link ValuesSourceConfig} per configured field, keyed by field name.
     */
    protected Map<String, ValuesSourceConfig<VS>> resolveConfig(AggregationContext context) {
        HashMap<String, ValuesSourceConfig<VS>> configs = new HashMap<>();
        for (String field : fields) {
            ValuesSourceConfig<VS> config = config(context, field, null);
            configs.put(field, config);
        }
        return configs;
    }

    protected abstract MultiValuesSourceAggregatorFactory<VS, ?> innerBuild(AggregationContext context,
            Map<String, ValuesSourceConfig<VS>> configs, AggregatorFactory<?> parent,
            AggregatorFactories.Builder subFactoriesBuilder) throws IOException;

    /**
     * Builds the {@link ValuesSourceConfig} for a single field (or script when no field is given).
     *
     * @param context aggregation context used to look up the field mapping and field data
     * @param field   name of the field, may be null
     * @param script  script to use when no field is given, may be null
     */
    public ValuesSourceConfig<VS> config(AggregationContext context, String field, Script script) {
        // an explicitly requested value type wins over the builder's target value type
        ValueType valueType = this.valueType != null ? this.valueType : targetValueType;

        if (field == null) {
            if (script == null) {
                // neither field nor script: nothing to read values from
                ValuesSourceConfig<VS> config = new ValuesSourceConfig<>(ValuesSourceType.ANY);
                return config.format(resolveFormat(null, valueType));
            }
            ValuesSourceType valuesSourceType = valueType != null ? valueType.getValuesSourceType() : this.valuesSourceType;
            if (valuesSourceType == null || valuesSourceType == ValuesSourceType.ANY) {
                // the specific value source type is undefined, but for scripts,
                // we need to have a specific value source
                // type to know how to handle the script values, so we fallback
                // on Bytes
                valuesSourceType = ValuesSourceType.BYTES;
            }
            ValuesSourceConfig<VS> config = new ValuesSourceConfig<>(valuesSourceType);
            config.missing(missingMap.get(field));
            return config.format(resolveFormat(format, valueType));
        }

        MappedFieldType fieldType = context.searchContext().smartNameFieldType(field);
        if (fieldType == null) {
            // no mapping found for the field: flag the config as unmapped
            ValuesSourceType valuesSourceType = valueType != null ? valueType.getValuesSourceType() : this.valuesSourceType;
            ValuesSourceConfig<VS> config = new ValuesSourceConfig<>(valuesSourceType);
            config.missing(missingMap.get(field));
            config.format(resolveFormat(format, valueType));
            return config.unmapped(true);
        }

        IndexFieldData<?> indexFieldData = context.searchContext().fieldData().getForField(fieldType);

        ValuesSourceConfig<VS> config;
        if (valuesSourceType == ValuesSourceType.ANY) {
            // derive the concrete values source type from the kind of field data
            if (indexFieldData instanceof IndexNumericFieldData) {
                config = new ValuesSourceConfig<>(ValuesSourceType.NUMERIC);
            } else if (indexFieldData instanceof IndexGeoPointFieldData) {
                config = new ValuesSourceConfig<>(ValuesSourceType.GEOPOINT);
            } else {
                config = new ValuesSourceConfig<>(ValuesSourceType.BYTES);
            }
        } else {
            config = new ValuesSourceConfig<>(valuesSourceType);
        }

        config.fieldContext(new FieldContext(field, indexFieldData, fieldType));
        config.missing(missingMap.get(field));
        return config.format(fieldType.docValueFormat(format, null));
    }

    /**
     * Picks the doc value format for the given value type; the format pattern is only applied to decimal formats.
     */
    private static DocValueFormat resolveFormat(@Nullable String format, @Nullable ValueType valueType) {
        if (valueType == null) {
            return DocValueFormat.RAW; // we can't figure it out
        }
        DocValueFormat valueFormat = valueType.defaultFormat();
        if (valueFormat instanceof DocValueFormat.Decimal && format != null) {
            valueFormat = new DocValueFormat.Decimal(format);
        }
        return valueFormat;
    }

    /**
     * Should this builder serialize its targetValueType? Defaults to false. All subclasses that override this to true
     * should use the three argument read constructor rather than the four argument version.
     */
    protected boolean serializeTargetValueType() {
        return false;
    }

    @Override
    public final XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();
        // todo add ParseField support to XContentBuilder
        if (fields != null) {
            builder.field(CommonFields.FIELDS.getPreferredName(), fields);
        }
        // render the per-field missing values; previously an always-null placeholder field was checked here,
        // so configured missing values were never written out
        if (missingMap.isEmpty() == false) {
            builder.field(CommonFields.MISSING.getPreferredName(), missingMap);
        }
        if (format != null) {
            builder.field(CommonFields.FORMAT.getPreferredName(), format);
        }
        if (valueType != null) {
            builder.field(CommonFields.VALUE_TYPE.getPreferredName(), valueType.getPreferredName());
        }
        doXContentBody(builder, params);
        builder.endObject();
        return builder;
    }

    /**
     * Write subclass' settings into the already opened xcontent object
     */
    protected abstract XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException;

    @Override
    protected final int doHashCode() {
        // missingMap takes part in hashing and equality so builders that differ only in their missing values differ
        return Objects.hash(fields, format, missingMap, targetValueType, valueType, valuesSourceType,
                innerHashCode());
    }

    protected abstract int innerHashCode();

    @Override
    protected final boolean doEquals(Object obj) {
        MultiValuesSourceAggregationBuilder<?, ?> other = (MultiValuesSourceAggregationBuilder<?, ?>) obj;
        if (!Objects.equals(fields, other.fields))
            return false;
        if (!Objects.equals(format, other.format))
            return false;
        if (!Objects.equals(missingMap, other.missingMap))
            return false;
        if (!Objects.equals(targetValueType, other.targetValueType))
            return false;
        if (!Objects.equals(valueType, other.valueType))
            return false;
        if (!Objects.equals(valuesSourceType, other.valuesSourceType))
            return false;
        return innerEquals(obj);
    }

    protected abstract boolean innerEquals(Object obj);
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.support;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Base {@link AggregatorFactory} for aggregations that read from several named
 * values sources at once.
 * <p>
 * When an aggregator is requested, every configured {@link ValuesSourceConfig}
 * is resolved through the aggregation context. Configurations that resolve to
 * <code>null</code> are left out. If nothing resolves, the unmapped aggregator
 * is created instead of the regular one.
 *
 * @param <VS>
 *            the kind of {@link ValuesSource} the aggregation consumes
 * @param <AF>
 *            the concrete factory type
 */
public abstract class MultiValuesSourceAggregatorFactory<VS extends ValuesSource, AF extends MultiValuesSourceAggregatorFactory<VS, AF>>
        extends AggregatorFactory<AF> {

    /** Values source configurations; each key is reused as the key of the resolved source. */
    protected Map<String, ValuesSourceConfig<VS>> configs;

    public MultiValuesSourceAggregatorFactory(String name, Type type, Map<String, ValuesSourceConfig<VS>> configs,
            AggregationContext context, AggregatorFactory<?> parent, AggregatorFactories.Builder subFactoriesBuilder,
            Map<String, Object> metaData) throws IOException {
        super(name, type, context, parent, subFactoriesBuilder, metaData);
        this.configs = configs;
    }

    /**
     * Resolves all configured values sources and creates the aggregator.
     * Delegates to {@link #createUnmapped(Aggregator, List, Map)} when no
     * configuration yields a values source, and to
     * {@link #doCreateInternal(Map, Aggregator, boolean, List, Map)} otherwise.
     */
    @Override
    public Aggregator createInternal(Aggregator parent, boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators,
            Map<String, Object> metaData) throws IOException {
        final Map<String, VS> resolved = new HashMap<>();
        for (Map.Entry<String, ValuesSourceConfig<VS>> entry : configs.entrySet()) {
            final VS source = context.valuesSource(entry.getValue(), context.searchContext());
            if (source == null) {
                // Nothing could be resolved for this configuration; leave it out.
                continue;
            }
            resolved.put(entry.getKey(), source);
        }
        return resolved.isEmpty()
            ? createUnmapped(parent, pipelineAggregators, metaData)
            : doCreateInternal(resolved, parent, collectsFromSingleBucket, pipelineAggregators, metaData);
    }

    /**
     * Creates the aggregator used when none of the configured values sources
     * could be resolved.
     */
    protected abstract Aggregator createUnmapped(Aggregator parent, List<PipelineAggregator> pipelineAggregators,
            Map<String, Object> metaData) throws IOException;

    /**
     * Creates the aggregator for the resolved values sources.
     *
     * @param valuesSources
     *            the non-null values sources, keyed like the configurations
     *            they were resolved from; never empty
     */
    protected abstract Aggregator doCreateInternal(Map<String, VS> valuesSources, Aggregator parent, boolean collectsFromSingleBucket,
            List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException;
}

View File

@ -0,0 +1,239 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.support;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.script.Script.ScriptField;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregationBuilder.CommonFields;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Base parser for aggregations that operate on several fields at once.
 * <p>
 * The parser itself handles the <code>fields</code> option (a single string or
 * an array of strings), the <code>format</code> option (only when the parser
 * was created as formattable) and the <code>missing</code> option (an object
 * mapping a field name to its default value). Scripts and
 * <code>value_type</code> are rejected with a {@link ParsingException}. Every
 * other token is offered to the subclass through
 * {@link #token(String, String, org.elasticsearch.common.xcontent.XContentParser.Token,
 * XContentParser, ParseFieldMatcher, Map)}.
 *
 * @param <VS>
 *            the kind of {@link ValuesSource} the parsed aggregation consumes
 */
public abstract class MultiValuesSourceParser<VS extends ValuesSource> implements Aggregator.Parser {

    /** Parser base for aggregations that accept any kind of values source; no target value type is set. */
    public abstract static class AnyValuesSourceParser extends MultiValuesSourceParser<ValuesSource> {
        protected AnyValuesSourceParser(boolean formattable) {
            super(formattable, ValuesSourceType.ANY, null);
        }
    }

    /** Parser base for aggregations over numeric values sources. */
    public abstract static class NumericValuesSourceParser extends MultiValuesSourceParser<ValuesSource.Numeric> {
        protected NumericValuesSourceParser(boolean formattable) {
            super(formattable, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
        }
    }

    /** Parser base for aggregations over bytes (string) values sources. */
    public abstract static class BytesValuesSourceParser extends MultiValuesSourceParser<ValuesSource.Bytes> {
        protected BytesValuesSourceParser(boolean formattable) {
            super(formattable, ValuesSourceType.BYTES, ValueType.STRING);
        }
    }

    /** Parser base for aggregations over geo point values sources. */
    public abstract static class GeoPointValuesSourceParser extends MultiValuesSourceParser<ValuesSource.GeoPoint> {
        protected GeoPointValuesSourceParser(boolean formattable) {
            super(formattable, ValuesSourceType.GEOPOINT, ValueType.GEOPOINT);
        }
    }

    /** Whether the <code>format</code> option is handled by this parser. */
    private final boolean formattable;
    private final ValuesSourceType valuesSourceType;
    private final ValueType targetValueType;

    private MultiValuesSourceParser(boolean formattable, ValuesSourceType valuesSourceType, ValueType targetValueType) {
        this.valuesSourceType = valuesSourceType;
        this.targetValueType = targetValueType;
        this.formattable = formattable;
    }

    /**
     * Parses the body of a multi-field aggregation.
     *
     * @param aggregationName
     *            the name of the aggregation, used in error messages and
     *            passed to the created factory
     * @param context
     *            supplies the parser and the {@link ParseFieldMatcher}
     * @return the factory created by
     *         {@link #createFactory(String, ValuesSourceType, ValueType, Map)},
     *         populated with the fields, format and missing values that were
     *         present in the request
     * @throws ParsingException
     *             if a script or <code>value_type</code> is specified, or if a
     *             token is recognised neither here nor by the subclass
     * @throws IOException
     *             if an error occurs whilst parsing
     */
    @Override
    public final MultiValuesSourceAggregationBuilder<VS, ?> parse(String aggregationName, QueryParseContext context)
            throws IOException {
        XContentParser parser = context.parser();
        final ParseFieldMatcher parseFieldMatcher = context.getParseFieldMatcher();

        List<String> fields = null;
        String format = null;
        Map<String, Object> missingMap = null;
        Map<ParseField, Object> otherOptions = new HashMap<>();

        XContentParser.Token token;
        String currentFieldName = null;
        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
            if (token == XContentParser.Token.FIELD_NAME) {
                currentFieldName = parser.currentName();
            } else if (token == XContentParser.Token.VALUE_STRING) {
                if (parseFieldMatcher.match(currentFieldName, CommonFields.FIELDS)) {
                    // A single string is accepted as shorthand for a one-element array.
                    fields = Collections.singletonList(parser.text());
                } else if (formattable && parseFieldMatcher.match(currentFieldName, CommonFields.FORMAT)) {
                    format = parser.text();
                } else if (parseFieldMatcher.match(currentFieldName, CommonFields.VALUE_TYPE)) {
                    throw new ParsingException(parser.getTokenLocation(),
                        "Unexpected token " + token + " [" + currentFieldName + "] in [" + aggregationName + "]. " +
                            "Multi-field aggregations do not support [" + CommonFields.VALUE_TYPE.getPreferredName() + "].");
                } else if (!token(aggregationName, currentFieldName, token, parser, parseFieldMatcher, otherOptions)) {
                    throw unexpectedToken(parser, token, currentFieldName, aggregationName);
                }
            } else if (token == XContentParser.Token.START_OBJECT) {
                if (parseFieldMatcher.match(currentFieldName, CommonFields.MISSING)) {
                    missingMap = new HashMap<>();
                    while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
                        parseMissingAndAdd(aggregationName, currentFieldName, parser, missingMap);
                    }
                } else if (parseFieldMatcher.match(currentFieldName, ScriptField.SCRIPT)) {
                    throw scriptsNotSupported(parser, token, currentFieldName, aggregationName);
                } else if (!token(aggregationName, currentFieldName, token, parser, parseFieldMatcher, otherOptions)) {
                    throw unexpectedToken(parser, token, currentFieldName, aggregationName);
                }
            } else if (token == XContentParser.Token.START_ARRAY) {
                if (parseFieldMatcher.match(currentFieldName, ScriptField.SCRIPT)) {
                    throw scriptsNotSupported(parser, token, currentFieldName, aggregationName);
                } else if (parseFieldMatcher.match(currentFieldName, CommonFields.FIELDS)) {
                    fields = parseFieldNames(parser, currentFieldName, aggregationName);
                } else if (!token(aggregationName, currentFieldName, token, parser, parseFieldMatcher, otherOptions)) {
                    throw unexpectedToken(parser, token, currentFieldName, aggregationName);
                }
            } else if (!token(aggregationName, currentFieldName, token, parser, parseFieldMatcher, otherOptions)) {
                throw unexpectedToken(parser, token, currentFieldName, aggregationName);
            }
        }

        MultiValuesSourceAggregationBuilder<VS, ?> factory = createFactory(aggregationName, this.valuesSourceType, this.targetValueType,
            otherOptions);
        // Only options that were present in the request are applied to the factory.
        if (fields != null) {
            factory.fields(fields);
        }
        if (format != null) {
            factory.format(format);
        }
        if (missingMap != null) {
            factory.missingMap(missingMap);
        }
        return factory;
    }

    /**
     * Reads the elements of the <code>fields</code> array. The parser must be
     * positioned on the array's start token; on return it is positioned on the
     * matching end token. Every element has to be a string.
     */
    private static List<String> parseFieldNames(XContentParser parser, String currentFieldName, String aggregationName)
            throws IOException {
        final List<String> names = new ArrayList<>();
        XContentParser.Token element;
        while ((element = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
            if (element != XContentParser.Token.VALUE_STRING) {
                throw unexpectedToken(parser, element, currentFieldName, aggregationName);
            }
            names.add(parser.text());
        }
        return names;
    }

    /**
     * Reads one <code>"field" : value</code> entry of the <code>missing</code>
     * object and stores it in the given map. The parser is expected to be
     * positioned on the entry's field name.
     *
     * @throws ParsingException
     *             if the field already has a missing value or the current
     *             token is not a field name
     */
    private void parseMissingAndAdd(final String aggregationName, final String currentFieldName,
            XContentParser parser, final Map<String, Object> missing) throws IOException {
        XContentParser.Token token = parser.currentToken();
        if (token == null) {
            token = parser.nextToken();
        }

        if (token == XContentParser.Token.FIELD_NAME) {
            final String fieldName = parser.currentName();
            if (missing.containsKey(fieldName)) {
                throw new ParsingException(parser.getTokenLocation(),
                    "Missing field [" + fieldName + "] already defined as [" + missing.get(fieldName)
                        + "] in [" + aggregationName + "].");
            }
            // Advance from the field name to its value before reading it.
            parser.nextToken();
            missing.put(fieldName, parser.objectText());
        } else {
            throw new ParsingException(parser.getTokenLocation(),
                "Unexpected token " + token + " [" + currentFieldName + "] in [" + aggregationName + "]");
        }
    }

    /** Builds the exception for a token that neither this parser nor the subclass recognises. */
    private static ParsingException unexpectedToken(XContentParser parser, XContentParser.Token token, String currentFieldName,
            String aggregationName) {
        return new ParsingException(parser.getTokenLocation(),
            "Unexpected token " + token + " [" + currentFieldName + "] in [" + aggregationName + "].");
    }

    /** Builds the exception thrown when the request specifies a script. */
    private static ParsingException scriptsNotSupported(XContentParser parser, XContentParser.Token token, String currentFieldName,
            String aggregationName) {
        return new ParsingException(parser.getTokenLocation(),
            "Unexpected token " + token + " [" + currentFieldName + "] in [" + aggregationName + "]. " +
                "Multi-field aggregations do not support scripts.");
    }

    /**
     * Creates a {@link MultiValuesSourceAggregationBuilder} from the information
     * gathered by the subclass. Options parsed in
     * {@link MultiValuesSourceParser} itself will be added to the factory
     * after it has been returned by this method.
     *
     * @param aggregationName
     *            the name of the aggregation
     * @param valuesSourceType
     *            the type of the {@link ValuesSource}
     * @param targetValueType
     *            the target type of the final value output by the aggregation
     * @param otherOptions
     *            a {@link Map} containing the extra options parsed by the
     *            {@link #token(String, String, org.elasticsearch.common.xcontent.XContentParser.Token,
     *            XContentParser, ParseFieldMatcher, Map)}
     *            method
     * @return the created factory
     */
    protected abstract MultiValuesSourceAggregationBuilder<VS, ?> createFactory(String aggregationName, ValuesSourceType valuesSourceType,
            ValueType targetValueType, Map<ParseField, Object> otherOptions);

    /**
     * Allows subclasses of {@link MultiValuesSourceParser} to parse extra
     * parameters and store them in a {@link Map} which will later be passed to
     * {@link #createFactory(String, ValuesSourceType, ValueType, Map)}.
     *
     * @param aggregationName
     *            the name of the aggregation
     * @param currentFieldName
     *            the name of the current field being parsed
     * @param token
     *            the current token for the parser
     * @param parser
     *            the parser
     * @param parseFieldMatcher
     *            the {@link ParseFieldMatcher} to use to match field names
     * @param otherOptions
     *            a {@link Map} of options to be populated by successive calls
     *            to this method which will then be passed to the
     *            {@link #createFactory(String, ValuesSourceType, ValueType, Map)}
     *            method
     * @return <code>true</code> if the current token was correctly parsed,
     *         <code>false</code> otherwise
     * @throws IOException
     *             if an error occurs whilst parsing
     */
    protected abstract boolean token(String aggregationName, String currentFieldName, XContentParser.Token token, XContentParser parser,
            ParseFieldMatcher parseFieldMatcher, Map<ParseField, Object> otherOptions) throws IOException;
}

View File

@ -29,7 +29,7 @@ import static org.hamcrest.Matchers.equalTo;
/**
*
*/
public class MatrixStatsTestCase extends ESTestCase {
public abstract class BaseMatrixStatsTestCase extends ESTestCase {
protected final int numObs = atLeast(10000);
protected final ArrayList<Double> fieldA = new ArrayList<>(numObs);
protected final ArrayList<Double> fieldB = new ArrayList<>(numObs);

View File

@ -18,13 +18,12 @@
*/
package org.elasticsearch.search.aggregations.matrix.stats;
import java.util.HashMap;
import java.util.List;
/**
*
*/
public class RunningStatsTests extends MatrixStatsTestCase {
public class RunningStatsTests extends BaseMatrixStatsTestCase {
/** test running stats */
public void testRunningStats() throws Exception {
@ -56,15 +55,18 @@ public class RunningStatsTests extends MatrixStatsTestCase {
}
private RunningStats createRunningStats(List<Double> fieldAObs, List<Double> fieldBObs) {
RunningStats stats = RunningStats.EMPTY();
RunningStats stats = new RunningStats();
// create a document with two numeric fields
final HashMap<String, Double> doc = new HashMap<>(2);
final String[] fieldNames = new String[2];
fieldNames[0] = fieldAKey;
fieldNames[1] = fieldBKey;
final double[] fieldVals = new double[2];
// running stats computation
for (int n = 0; n < fieldAObs.size(); ++n) {
doc.put(fieldAKey, fieldAObs.get(n));
doc.put(fieldBKey, fieldBObs.get(n));
stats.add(doc);
fieldVals[0] = fieldAObs.get(n);
fieldVals[1] = fieldBObs.get(n);
stats.add(fieldNames, fieldVals);
}
return stats;
}

View File

@ -43,6 +43,6 @@
search:
index: empty_bucket_idx
type: test
body: {"aggs": {"histo": {"histogram": {"field": "val1", "interval": 1, "min_doc_count": 0}, "aggs": { "mfs" : { "matrix_stats": {"field": ["value", "val1"]} } } } } }
body: {"aggs": {"histo": {"histogram": {"field": "val1", "interval": 1, "min_doc_count": 0}, "aggs": { "mfs" : { "matrix_stats": {"fields": ["value", "val1"]} } } } } }
- match: {hits.total: 2}

View File

@ -130,7 +130,7 @@ setup:
search:
index: unmapped
type: test
body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["val1", "val2", "val3"]} } } }
body: {"aggs": { "mfs" : { "matrix_stats": {"fields": ["val1", "val2", "val3"]} } } }
- match: {hits.total: 0}
@ -141,10 +141,10 @@ setup:
search:
index: test
type: test
body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["val1", "val3"]} } } }
body: {"aggs": { "mfs" : { "matrix_stats": {"fields": ["val1", "val3"]} } } }
- match: {hits.total: 15}
- match: {aggregations.mfs.count.0: 15}
- match: {aggregations.mfs.fields.0.count: 15}
---
"Partially unmapped":
@ -153,32 +153,41 @@ setup:
search:
index: [test, unmapped]
type: test
body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["val1", "val2", "val3"]} } } }
body: {"aggs": { "mfs" : { "matrix_stats": {"fields": ["val1", "val2", "val3"]} } } }
- match: {hits.total: 15}
- match: {aggregations.mfs.count.0: 14}
- match: {aggregations.mfs.fields.0.count: 14}
- match: {aggregations.mfs.fields.2.correlation.val2: 0.9569513137793205}
---
"Partially unmapped with missing default":
- do:
search:
index: [test, unmapped]
type: test
body: {"aggs": { "mfs" : { "matrix_stats": {"fields": ["val1", "val2", "val3"], "missing" : {"val2" : 10} } } } }
- match: {hits.total: 15}
- match: {aggregations.mfs.fields.0.count: 15}
- match: {aggregations.mfs.fields.2.correlation.val2: 0.9567970467908384}
---
"With script":
- do:
catch: /parsing_exception/
search:
index: test
type: test
body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["val1", "val2"], "script" : { "my_script" : {"inline" : "1 + doc['val1'].value", "lang" : "js"} } } } } }
- match: {hits.total: 15}
- match: {aggregations.mfs.count.0: 14}
body: {"aggs": { "mfs" : { "matrix_stats": {"fields": ["val1", "val2"], "script" : { "my_script" : {"inline" : "1 + doc['val1'].value", "lang" : "js"} } } } } }
---
"With script params":
- do:
catch: /parsing_exception/
search:
index: test
type: test
body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["val1", "val2", "val3"], "script" : { "my_script" : {"inline" : "my_var + doc['val1'].value", "params" : { "my_var" : 1 }, "lang" : "js" } } } } } }
- match: {hits.total: 15}
- match: {aggregations.mfs.count.0: 14}
- match: {aggregations.mfs.correlation.1.2: 0.9569513137793205}
body: {"aggs": { "mfs" : { "matrix_stats": {"fields": ["val1", "val2", "val3"], "script" : { "my_script" : {"inline" : "my_var + doc['val1'].value", "params" : { "my_var" : 1 }, "lang" : "js" } } } } } }

View File

@ -130,21 +130,35 @@ setup:
search:
index: unmapped
type: test
body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["val1", "val2", "vals"]} } } }
body: {"aggs": { "mfs" : { "matrix_stats": {"fields": ["val1", "val2", "vals"]} } } }
- match: {hits.total: 0}
---
"Multi value field":
"Multi value field Max":
- do:
search:
index: test
type: test
body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["val1", "val3"]} } } }
body: {"aggs": { "mfs" : { "matrix_stats": {"fields": ["val1", "vals"], "mode" : "max"} } } }
- match: {hits.total: 15}
- match: {aggregations.mfs.count.0: 15}
- match: {aggregations.mfs.fields.0.count: 14}
- match: {aggregations.mfs.fields.0.correlation.val1: 0.06838646533369998}
---
"Multi value field Min":
- do:
search:
index: test
type: test
body: {"aggs": { "mfs" : { "matrix_stats": {"fields": ["val1", "vals"], "mode" : "min"} } } }
- match: {hits.total: 15}
- match: {aggregations.mfs.fields.0.count: 14}
- match: {aggregations.mfs.fields.0.correlation.val1: -0.09777682707831963}
---
"Partially unmapped":
@ -153,32 +167,41 @@ setup:
search:
index: [test, unmapped]
type: test
body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["val1", "val2", "vals"]} } } }
body: {"aggs": { "mfs" : { "matrix_stats": {"fields": ["val1", "val2", "vals"]} } } }
- match: {hits.total: 15}
- match: {aggregations.mfs.count.0: 13}
- match: {aggregations.mfs.fields.0.count: 13}
- match: {aggregations.mfs.fields.0.correlation.val1: -0.044997535185684244}
---
"Partially unmapped with missing defaults":
- do:
search:
index: [test, unmapped]
type: test
body: {"aggs": { "mfs" : { "matrix_stats": {"fields": ["val1", "val2", "vals"], "missing" : {"val2" : 10, "vals" : 5 } } } } }
- match: {hits.total: 15}
- match: {aggregations.mfs.fields.0.count: 15}
- match: {aggregations.mfs.fields.0.correlation.val2: 0.04028024709708195}
---
"With script":
- do:
catch: /parsing_exception/
search:
index: test
type: test
body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["vals", "val3"], "script" : { "my_script" : {"inline" : "1 + doc['val1'].value", "lang" : "js"} } } } } }
- match: {hits.total: 15}
- match: {aggregations.mfs.count.0: 14}
body: {"aggs": { "mfs" : { "matrix_stats": {"fields": ["vals", "val3"], "script" : { "my_script" : {"inline" : "1 + doc['val1'].value", "lang" : "js"} } } } } }
---
"With script params":
- do:
catch: /parsing_exception/
search:
index: test
type: test
body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["val1", "val3", "vals"], "script" : { "my_script" : {"inline" : "my_var + doc['val1'].value", "params" : { "my_var" : 1 }, "lang" : "js" } } } } } }
- match: {hits.total: 15}
- match: {aggregations.mfs.count.0: 14}
- match: {aggregations.mfs.correlation.1.2: -0.055971032866899535}
body: {"aggs": { "mfs" : { "matrix_stats": {"fields": ["val1", "val3", "vals"], "script" : { "my_script" : {"inline" : "my_var + doc['val1'].value", "params" : { "my_var" : 1 }, "lang" : "js" } } } } } }