From 90b8f5d0d8ab316a88ae17a60f04aa12187cd671 Mon Sep 17 00:00:00 2001 From: Nicholas Knize Date: Fri, 13 May 2016 13:37:05 -0500 Subject: [PATCH] Adding MultiValuesSource support classes and documentation to matrix stats agg module --- .../org/elasticsearch/common/ParseField.java | 8 + .../aggregations/AggregationBuilder.java | 5 + .../aggregations/InternalAggregation.java | 4 +- .../support/ValuesSourceConfig.java | 15 + docs/reference/modules.asciidoc | 6 + .../modules/aggregations-matrix.asciidoc | 9 + .../aggregations/matrix/stats.asciidoc | 112 ++++++ modules/aggs-matrix-stats/build.gradle | 12 - .../MatrixStatsAggregationBuilders.java | 6 +- .../matrix/MatrixAggregationPlugin.java | 10 +- .../matrix/stats/InternalMatrixStats.java | 148 ++++--- .../matrix/stats/MatrixStats.java | 19 +- ...ava => MatrixStatsAggregationBuilder.java} | 27 +- .../matrix/stats/MatrixStatsAggregator.java | 72 ++-- .../stats/MatrixStatsAggregatorFactory.java | 14 +- .../matrix/stats/MatrixStatsParser.java | 22 +- .../matrix/stats/MatrixStatsResults.java | 111 +++--- .../matrix/stats/RunningStats.java | 208 ++++------ .../support/MultiValuesSource.java | 90 +++++ .../MultiValuesSourceAggregationBuilder.java | 366 ++++++++++++++++++ .../MultiValuesSourceAggregatorFactory.java | 68 ++++ .../support/MultiValuesSourceParser.java | 239 ++++++++++++ ...Case.java => BaseMatrixStatsTestCase.java} | 2 +- .../matrix/stats/RunningStatsTests.java | 16 +- .../test/stats/20_empty_bucket.yaml | 2 +- .../test/stats/30_single_value_field.yaml | 37 +- .../test/stats/40_multi_value_field.yaml | 53 ++- 27 files changed, 1283 insertions(+), 398 deletions(-) create mode 100644 docs/reference/modules/aggregations-matrix.asciidoc create mode 100644 docs/reference/modules/aggregations/matrix/stats.asciidoc rename modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/{MatrixStatsAggregatorBuilder.java => MatrixStatsAggregationBuilder.java} (75%) create mode 100644 modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/support/MultiValuesSource.java create mode 100644 modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/support/MultiValuesSourceAggregationBuilder.java create mode 100644 modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/support/MultiValuesSourceAggregatorFactory.java create mode 100644 modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/support/MultiValuesSourceParser.java rename modules/aggs-matrix-stats/src/test/java/org/elasticsearch/search/aggregations/matrix/stats/{MatrixStatsTestCase.java => BaseMatrixStatsTestCase.java} (99%) diff --git a/core/src/main/java/org/elasticsearch/common/ParseField.java b/core/src/main/java/org/elasticsearch/common/ParseField.java index 6516f7952a7..c04bcb14dcb 100644 --- a/core/src/main/java/org/elasticsearch/common/ParseField.java +++ b/core/src/main/java/org/elasticsearch/common/ParseField.java @@ -107,4 +107,12 @@ public class ParseField { public String[] getDeprecatedNames() { return deprecatedNames; } + + public static class CommonFields { + public static final ParseField FIELD = new ParseField("field"); + public static final ParseField FIELDS = new ParseField("fields"); + public static final ParseField FORMAT = new ParseField("format"); + public static final ParseField MISSING = new ParseField("missing"); + public static final ParseField TIME_ZONE = new ParseField("time_zone"); + } } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java index 1a16f653159..d718227aae8 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java @@ -20,6 +20,7 @@ package org.elasticsearch.search.aggregations; import org.elasticsearch.action.support.ToXContentToBytes; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.search.aggregations.InternalAggregation.Type; @@ -84,4 +85,8 @@ public abstract class AggregationBuilder */ protected abstract AggregationBuilder subAggregations(AggregatorFactories.Builder subFactories); + /** Common xcontent fields shared among aggregator builders */ + public static final class CommonFields extends ParseField.CommonFields { + public static final ParseField VALUE_TYPE = new ParseField("value_type"); + } } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java b/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java index 5ae78417f6f..b4fab7093f8 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java @@ -20,6 +20,7 @@ package org.elasticsearch.search.aggregations; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; @@ -250,7 +251,8 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St /** * Common xcontent fields that are shared among addAggregation */ - public static final class CommonFields { + public static final class CommonFields extends ParseField.CommonFields { + // todo convert these to ParseField public static final String META = "meta"; public static final String BUCKETS = "buckets"; public static final String VALUE = "value"; diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/support/ValuesSourceConfig.java b/core/src/main/java/org/elasticsearch/search/aggregations/support/ValuesSourceConfig.java index 2c5e6f2f335..acc2b063ac2 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/support/ValuesSourceConfig.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/support/ValuesSourceConfig.java @@ -76,6 +76,21 @@ public class ValuesSourceConfig { return this; } + public ValuesSourceConfig format(final DocValueFormat format) { + this.format = format; + return this; + } + + public ValuesSourceConfig missing(final Object missing) { + this.missing = missing; + return this; + } + + public ValuesSourceConfig timezone(final DateTimeZone timeZone) { + this.timeZone= timeZone; + return this; + } + public DocValueFormat format() { return format; } diff --git a/docs/reference/modules.asciidoc b/docs/reference/modules.asciidoc index 5a39cdfd790..01b0edd0676 100644 --- a/docs/reference/modules.asciidoc +++ b/docs/reference/modules.asciidoc @@ -18,6 +18,10 @@ These settings can be dynamically updated on a live cluster with the The modules in this section are: +<>:: + + A family of aggregations that operate on multiple document fields and produce a matrix as output. + <>:: Settings to control where, when, and how shards are allocated to nodes. @@ -80,6 +84,8 @@ The modules in this section are: -- +include::modules/aggregations-matrix.asciidoc[] + include::modules/cluster.asciidoc[] include::modules/discovery.asciidoc[] diff --git a/docs/reference/modules/aggregations-matrix.asciidoc b/docs/reference/modules/aggregations-matrix.asciidoc new file mode 100644 index 00000000000..e8f741e7df6 --- /dev/null +++ b/docs/reference/modules/aggregations-matrix.asciidoc @@ -0,0 +1,9 @@ +[[modules-aggregations-matrix]] +== Matrix Aggregations + +experimental[] + +The aggregations in this family operate on multiple fields and produce a matrix result based on the values extracted from +the requested document fields. Unlike metric and bucket aggregations, this aggregation family does not yet support scripting. + +include::aggregations/matrix/stats.asciidoc[] diff --git a/docs/reference/modules/aggregations/matrix/stats.asciidoc b/docs/reference/modules/aggregations/matrix/stats.asciidoc new file mode 100644 index 00000000000..2649ecd5fe7 --- /dev/null +++ b/docs/reference/modules/aggregations/matrix/stats.asciidoc @@ -0,0 +1,112 @@ +[[modules-matrix-aggregations-stats]] +=== Matrix Stats + +The `matrix_stats` aggregation is a numeric aggregation that computes the following statistics over a set of document fields: + +[horizontal] +`count`:: Number of per field samples included in the calculation. +`mean`:: The average value for each field. +`variance`:: Per field Measurement for how spread out the samples are from the mean. +`skewness`:: Per field measurement quantifying the asymmetric distribution around the mean. +`kurtosis`:: Per field measurement quantifying the shape of the distribution. +`covariance`:: A matrix that quantitatively describes how changes in one field are associated with another. +`correlation`:: The covariance matrix scaled to a range of -1 to 1, inclusive. Describes the relationship between field + distributions. + +The following example demonstrates the use of matrix stats to describe the relationship between income and poverty. + +[source,js] +-------------------------------------------------- +{ + "aggs": { + "matrixstats": { + "matrix_stats": { + "fields": ["poverty", "income"] + } + } + } +} +-------------------------------------------------- + +The aggregation type is `matrix_stats` and the `fields` setting defines the set of fields (as an array) for computing +the statistics. The above request returns the following response: + +[source,js] +-------------------------------------------------- +{ + ... + "aggregations": { + "matrixstats": { + "fields": [{ + "name": "income", + "count": 50, + "mean": 51985.1, + "variance": 7.383377037755103E7, + "skewness": 0.5595114003506483, + "kurtosis": 2.5692365287787124, + "covariance": { + "income": 7.383377037755103E7, + "poverty": -21093.65836734694 + }, + "correlation": { + "income": 1.0, + "poverty": -0.8352655256272504 + } + }, { + "name": "poverty", + "count": 50, + "mean": 12.732000000000001, + "variance": 8.637730612244896, + "skewness": 0.4516049811903419, + "kurtosis": 2.8615929677997767, + "covariance": { + "income": -21093.65836734694, + "poverty": 8.637730612244896 + }, + "correlation": { + "income": -0.8352655256272504, + "poverty": 1.0 + } + }] + } + } +} +-------------------------------------------------- + +==== Multi Value Fields + +The `matrix_stats` aggregation treats each document field as an independent sample. The `mode` parameter controls what +array value the aggregation will use for array or multi-valued fields. This parameter can take one of the following: + +[horizontal] +`avg`:: (default) Use the average of all values. +`min`:: Pick the lowest value. +`max`:: Pick the highest value. +`sum`:: Use the sum of all values. +`median`:: Use the median of all values. + +==== Missing Values + +The `missing` parameter defines how documents that are missing a value should be treated. +By default they will be ignored but it is also possible to treat them as if they had a value. +This is done by adding a set of fieldname : value mappings to specify default values per field. + +[source,js] +-------------------------------------------------- +{ + "aggs": { + "matrixstats": { + "matrix_stats": { + "fields": ["poverty", "income"], + "missing": {"income" : 50000} <1> + } + } + } +} +-------------------------------------------------- + +<1> Documents without a value in the `income` field will have the default value `50000`. + +==== Script + +This aggregation family does not yet support scripting. diff --git a/modules/aggs-matrix-stats/build.gradle b/modules/aggs-matrix-stats/build.gradle index 78938837806..b3060fa1786 100644 --- a/modules/aggs-matrix-stats/build.gradle +++ b/modules/aggs-matrix-stats/build.gradle @@ -21,15 +21,3 @@ esplugin { description 'Adds aggregations whose input are a list of numeric fields and output includes a matrix.' classname 'org.elasticsearch.search.aggregations.matrix.MatrixAggregationPlugin' } - -dependencies { - testCompile project(path: ':plugins:lang-javascript', configuration: 'runtime') -} - -integTest { - cluster { - plugin 'lang-javascript', project(':plugins:lang-javascript') - setting 'script.inline', 'true' - setting 'script.stored', 'true' - } -} \ No newline at end of file diff --git a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/MatrixStatsAggregationBuilders.java b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/MatrixStatsAggregationBuilders.java index 321f4fbceae..10758979ed9 100644 --- a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/MatrixStatsAggregationBuilders.java +++ b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/MatrixStatsAggregationBuilders.java @@ -19,7 +19,7 @@ package org.elasticsearch.search.aggregations; import org.elasticsearch.search.aggregations.matrix.stats.MatrixStats; -import org.elasticsearch.search.aggregations.matrix.stats.MatrixStatsAggregatorBuilder; +import org.elasticsearch.search.aggregations.matrix.stats.MatrixStatsAggregationBuilder; /** */ @@ -27,7 +27,7 @@ public class MatrixStatsAggregationBuilders { /** * Create a new {@link MatrixStats} aggregation with the given name. */ - public static MatrixStatsAggregatorBuilder matrixStats(String name) { - return new MatrixStatsAggregatorBuilder(name); + public static MatrixStatsAggregationBuilder matrixStats(String name) { + return new MatrixStatsAggregationBuilder(name); } } diff --git a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/MatrixAggregationPlugin.java b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/MatrixAggregationPlugin.java index c1397fa410e..156ff05ad65 100644 --- a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/MatrixAggregationPlugin.java +++ b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/MatrixAggregationPlugin.java @@ -22,15 +22,12 @@ package org.elasticsearch.search.aggregations.matrix; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.aggregations.matrix.stats.InternalMatrixStats; -import org.elasticsearch.search.aggregations.matrix.stats.MatrixStatsAggregatorBuilder; +import org.elasticsearch.search.aggregations.matrix.stats.MatrixStatsAggregationBuilder; import org.elasticsearch.search.aggregations.matrix.stats.MatrixStatsParser; import java.io.IOException; public class MatrixAggregationPlugin extends Plugin { - static { - InternalMatrixStats.registerStreams(); - } public MatrixAggregationPlugin() throws IOException { } @@ -46,7 +43,8 @@ public class MatrixAggregationPlugin extends Plugin { } public void onModule(SearchModule searchModule) { - searchModule.registerAggregation(MatrixStatsAggregatorBuilder::new, new MatrixStatsParser(), - MatrixStatsAggregatorBuilder.AGGREGATION_NAME_FIELD); + InternalMatrixStats.registerStreams(); + searchModule.registerAggregation(MatrixStatsAggregationBuilder::new, new MatrixStatsParser(), + MatrixStatsAggregationBuilder.AGGREGATION_NAME_FIELD); } } diff --git a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStats.java b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStats.java index da7683b4b3d..edef75389c8 100644 --- a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStats.java +++ b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStats.java @@ -27,7 +27,9 @@ import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** * Computes distribution statistics over multiple fields @@ -70,11 +72,13 @@ public class InternalMatrixStats extends InternalMetricsAggregation implements M return TYPE; } + /** get the number of documents */ @Override public long getDocCount() { return stats.docCount; } + /** get the number of samples for the given field. == docCount - numMissing */ @Override public long getFieldCount(String field) { if (results == null) { @@ -83,65 +87,63 @@ public class InternalMatrixStats extends InternalMetricsAggregation implements M return results.getFieldCount(field); } + /** get the mean for the given field */ @Override - public Double getMean(String field) { + public double getMean(String field) { if (results == null) { - return null; + return Double.NaN; } return results.getMean(field); } + /** get the variance for the given field */ @Override - public Double getVariance(String field) { + public double getVariance(String field) { if (results == null) { - return null; + return Double.NaN; } return results.getVariance(field); } + /** get the distribution skewness for the given field */ @Override - public Double getSkewness(String field) { + public double getSkewness(String field) { if (results == null) { - return null; + return Double.NaN; } return results.getSkewness(field); } + /** get the distribution shape for the given field */ @Override - public Double getKurtosis(String field) { + public double getKurtosis(String field) { if (results == null) { - return null; + return Double.NaN; } return results.getKurtosis(field); } + /** get the covariance between the two fields */ @Override - public Double getCovariance(String fieldX, String fieldY) { + public double getCovariance(String fieldX, String fieldY) { if (results == null) { - return null; + return Double.NaN; } return results.getCovariance(fieldX, fieldY); } + /** get the correlation between the two fields */ @Override - public Map> getCovariance() { - return results.getCovariances(); - } - - @Override - public Double getCorrelation(String fieldX, String fieldY) { + public double getCorrelation(String fieldX, String fieldY) { if (results == null) { - return null; + return Double.NaN; } return results.getCorrelation(fieldX, fieldY); } - @Override - public Map> getCorrelation() { - return results.getCorrelations(); - } - static class Fields { + public static final String FIELDS = "fields"; + public static final String NAME = "name"; public static final String COUNT = "count"; public static final String MEAN = "mean"; public static final String VARIANCE = "variance"; @@ -153,30 +155,38 @@ public class InternalMatrixStats extends InternalMetricsAggregation implements M @Override public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { - if (results != null) { - Set fieldNames = results.getFieldCounts().keySet(); - builder.field("field", fieldNames); - builder.field(Fields.COUNT, results.getFieldCounts().values()); - builder.field(Fields.MEAN, results.getMeans().values()); - builder.field(Fields.VARIANCE, results.getVariances().values()); - builder.field(Fields.SKEWNESS, results.getSkewness().values()); - builder.field(Fields.KURTOSIS, results.getKurtosis().values()); - ArrayList> cov = new ArrayList<>(fieldNames.size()); - ArrayList> cor = new ArrayList<>(fieldNames.size()); - for (String y : fieldNames) { - ArrayList covRow = new ArrayList<>(fieldNames.size()); - ArrayList corRow = new ArrayList<>(fieldNames.size()); - for (String x : fieldNames) { - covRow.add(results.getCovariance(x, y)); - corRow.add(results.getCorrelation(x, y)); + if (results != null && results.getFieldCounts().keySet().isEmpty() == false) { + builder.startArray(Fields.FIELDS); + for (String fieldName : results.getFieldCounts().keySet()) { + builder.startObject(); + // name + builder.field(Fields.NAME, fieldName); + // count + builder.field(Fields.COUNT, results.getFieldCount(fieldName)); + // mean + builder.field(Fields.MEAN, results.getMean(fieldName)); + // variance + builder.field(Fields.VARIANCE, results.getVariance(fieldName)); + // skewness + builder.field(Fields.SKEWNESS, results.getSkewness(fieldName)); + // kurtosis + builder.field(Fields.KURTOSIS, results.getKurtosis(fieldName)); + // covariance + builder.startObject(Fields.COVARIANCE); + for (String fieldB : results.getFieldCounts().keySet()) { + builder.field(fieldB, results.getCovariance(fieldName, fieldB)); } - cov.add(covRow); - cor.add(corRow); + builder.endObject(); + // correlation + builder.startObject(Fields.CORRELATION); + for (String fieldB : results.getFieldCounts().keySet()) { + builder.field(fieldB, results.getCorrelation(fieldName, fieldB)); + } + builder.endObject(); + builder.endObject(); } - builder.field(Fields.COVARIANCE, cov); - builder.field(Fields.CORRELATION, cor); + builder.endArray(); } - return builder; } @@ -185,11 +195,11 @@ public class InternalMatrixStats extends InternalMetricsAggregation implements M if (path.isEmpty()) { return this; } else if (path.size() == 1) { - String coordinate = path.get(0); + String element = path.get(0); if (results == null) { - results = MatrixStatsResults.EMPTY(); + results = new MatrixStatsResults(); } - switch (coordinate) { + switch (element) { case "counts": return results.getFieldCounts(); case "means": @@ -205,7 +215,7 @@ public class InternalMatrixStats extends InternalMetricsAggregation implements M case "correlation": return results.getCorrelations(); default: - throw new IllegalArgumentException("Found unknown path element [" + coordinate + "] in [" + getName() + "]"); + throw new IllegalArgumentException("Found unknown path element [" + element + "] in [" + getName() + "]"); } } else { throw new IllegalArgumentException("path not supported for [" + getName() + "]: " + path); @@ -215,53 +225,35 @@ public class InternalMatrixStats extends InternalMetricsAggregation implements M @Override protected void doWriteTo(StreamOutput out) throws IOException { // write running stats - if (stats == null || stats.docCount == 0) { - out.writeVLong(0); - } else { - out.writeVLong(stats.docCount); - stats.writeTo(out); - } - + out.writeOptionalWriteable(stats); // write results - if (results == null || results.getDocCount() == 0) { - out.writeVLong(0); - } else { - out.writeVLong(results.getDocCount()); - results.writeTo(out); - } + out.writeOptionalWriteable(results); } @Override protected void doReadFrom(StreamInput in) throws IOException { // read stats count - final long statsCount = in.readVLong(); - if (statsCount > 0) { - stats = new RunningStats(in); - stats.docCount = statsCount; - } - + stats = in.readOptionalWriteable(RunningStats::new); // read count - final long count = in.readVLong(); - if (count > 0) { - results = new MatrixStatsResults(in); - } + results = in.readOptionalWriteable(MatrixStatsResults::new); } @Override public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { // merge stats across all shards - aggregations.removeIf(p -> ((InternalMatrixStats)p).stats == null); + List aggs = new ArrayList<>(aggregations); + aggs.removeIf(p -> ((InternalMatrixStats)p).stats == null); // return empty result iff all stats are null - if (aggregations.isEmpty()) { - return new InternalMatrixStats(name, 0, null, MatrixStatsResults.EMPTY(), pipelineAggregators(), getMetaData()); + if (aggs.isEmpty()) { + return new InternalMatrixStats(name, 0, null, new MatrixStatsResults(), pipelineAggregators(), getMetaData()); } - RunningStats runningStats = ((InternalMatrixStats) aggregations.get(0)).stats; - for (int i=1; i < aggregations.size(); ++i) { - runningStats.merge(((InternalMatrixStats) aggregations.get(i)).stats); + RunningStats runningStats = new RunningStats(); + for (int i=0; i < aggs.size(); ++i) { + runningStats.merge(((InternalMatrixStats) aggs.get(i)).stats); } - MatrixStatsResults results = new MatrixStatsResults(stats); + MatrixStatsResults results = new MatrixStatsResults(runningStats); return new InternalMatrixStats(name, results.getDocCount(), runningStats, results, pipelineAggregators(), getMetaData()); } diff --git a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStats.java b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStats.java index 1de20faae93..f7604ff18ec 100644 --- a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStats.java +++ b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStats.java @@ -20,9 +20,6 @@ package org.elasticsearch.search.aggregations.matrix.stats; import org.elasticsearch.search.aggregations.Aggregation; -import java.util.HashMap; -import java.util.Map; - /** * Interface for MatrixStats Metric Aggregation */ @@ -32,19 +29,15 @@ public interface MatrixStats extends Aggregation { /** return total field count (differs from docCount if there are missing values) */ long getFieldCount(String field); /** return the field mean */ - Double getMean(String field); + double getMean(String field); /** return the field variance */ - Double getVariance(String field); + double getVariance(String field); /** return the skewness of the distribution */ - Double getSkewness(String field); + double getSkewness(String field); /** return the kurtosis of the distribution */ - Double getKurtosis(String field); - /** return the upper triangle of the covariance matrix */ - Map> getCovariance(); + double getKurtosis(String field); /** return the covariance between field x and field y */ - Double getCovariance(String fieldX, String fieldY); - /** return the upper triangle of the pearson product-moment correlation matrix */ - Map> getCorrelation(); + double getCovariance(String fieldX, String fieldY); /** return the correlation coefficient of field x and field y */ - Double getCorrelation(String fieldX, String fieldY); + double getCorrelation(String fieldX, String fieldY); } diff --git a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsAggregatorBuilder.java b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsAggregationBuilder.java similarity index 75% rename from modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsAggregatorBuilder.java rename to modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsAggregationBuilder.java index ea20a3c32ad..5c6c3767177 100644 --- a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsAggregatorBuilder.java +++ b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsAggregationBuilder.java @@ -23,10 +23,11 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.MultiValueMode; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.AggregatorFactory; import org.elasticsearch.search.aggregations.support.AggregationContext; -import org.elasticsearch.search.aggregations.support.MultiValuesSourceAggregatorBuilder; +import org.elasticsearch.search.aggregations.support.MultiValuesSourceAggregationBuilder; import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; import org.elasticsearch.search.aggregations.support.ValuesSourceType; @@ -38,19 +39,21 @@ import java.util.Map; /** */ -public class MatrixStatsAggregatorBuilder - extends MultiValuesSourceAggregatorBuilder.LeafOnly { +public class MatrixStatsAggregationBuilder + extends MultiValuesSourceAggregationBuilder.LeafOnly { public static final String NAME = InternalMatrixStats.TYPE.name(); public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME); - public MatrixStatsAggregatorBuilder(String name) { + private MultiValueMode multiValueMode = MultiValueMode.AVG; + + public MatrixStatsAggregationBuilder(String name) { super(name, InternalMatrixStats.TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC); } /** * Read from a stream. */ - public MatrixStatsAggregatorBuilder(StreamInput in) throws IOException { + public MatrixStatsAggregationBuilder(StreamInput in) throws IOException { super(in, InternalMatrixStats.TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC); } @@ -59,14 +62,24 @@ public class MatrixStatsAggregatorBuilder // Do nothing, no extra state to write to stream } + public MatrixStatsAggregationBuilder multiValueMode(MultiValueMode multiValueMode) { + this.multiValueMode = multiValueMode; + return this; + } + + public MultiValueMode multiValueMode() { + return this.multiValueMode; + } + @Override protected MatrixStatsAggregatorFactory innerBuild(AggregationContext context, Map> configs, - AggregatorFactory parent, AggregatorFactories.Builder subFactoriesBuilder) throws IOException { - return new MatrixStatsAggregatorFactory(name, type, configs, context, parent, subFactoriesBuilder, metaData); + AggregatorFactory parent, AggregatorFactories.Builder subFactoriesBuilder) throws IOException { + return new MatrixStatsAggregatorFactory(name, type, configs, multiValueMode, context, parent, subFactoriesBuilder, metaData); } @Override public XContentBuilder doXContentBody(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.field(MULTIVALUE_MODE_FIELD.getPreferredName(), multiValueMode); return builder; } diff --git a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsAggregator.java b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsAggregator.java index 275163bf103..75f0c37d89f 100644 --- a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsAggregator.java +++ b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsAggregator.java @@ -22,7 +22,8 @@ import org.apache.lucene.index.LeafReaderContext; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.ObjectArray; -import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; +import org.elasticsearch.index.fielddata.NumericDoubleValues; +import org.elasticsearch.search.MultiValueMode; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; @@ -30,11 +31,10 @@ import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; import org.elasticsearch.search.aggregations.metrics.MetricsAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.support.AggregationContext; +import org.elasticsearch.search.aggregations.support.MultiValuesSource.NumericMultiValuesSource; import org.elasticsearch.search.aggregations.support.ValuesSource; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -43,89 +43,75 @@ import java.util.Map; **/ public class MatrixStatsAggregator extends MetricsAggregator { /** Multiple ValuesSource with field names */ - final Map valuesSources; + final NumericMultiValuesSource valuesSources; /** array of descriptive stats, per shard, needed to compute the correlation */ ObjectArray stats; public MatrixStatsAggregator(String name, Map valuesSources, AggregationContext context, - Aggregator parent, List pipelineAggregators, + Aggregator parent, MultiValueMode multiValueMode, List pipelineAggregators, Map metaData) throws IOException { super(name, context, parent, pipelineAggregators, metaData); - this.valuesSources = valuesSources; if (valuesSources != null && !valuesSources.isEmpty()) { + this.valuesSources = new NumericMultiValuesSource(valuesSources, multiValueMode); stats = context.bigArrays().newObjectArray(1); + } else { + this.valuesSources = null; } } @Override public boolean needsScores() { - boolean needsScores = false; - if (valuesSources != null) { - for (Map.Entry valueSource : valuesSources.entrySet()) { - needsScores |= valueSource.getValue().needsScores(); - } - } - return needsScores; + return (valuesSources == null) ? false : valuesSources.needsScores(); } @Override public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException { - if (valuesSources == null || valuesSources.isEmpty()) { + if (valuesSources == null) { return LeafBucketCollector.NO_OP_COLLECTOR; } final BigArrays bigArrays = context.bigArrays(); - final HashMap values = new HashMap<>(valuesSources.size()); - for (Map.Entry valuesSource : valuesSources.entrySet()) { - values.put(valuesSource.getKey(), valuesSource.getValue().doubleValues(ctx)); + final NumericDoubleValues[] values = new NumericDoubleValues[valuesSources.fieldNames().length]; + for (int i = 0; i < values.length; ++i) { + values[i] = valuesSources.getField(i, ctx); } return new LeafBucketCollectorBase(sub, values) { + final String[] fieldNames = valuesSources.fieldNames(); + final double[] fieldVals = new double[fieldNames.length]; + @Override public void collect(int doc, long bucket) throws IOException { // get fields - Map fields = getFields(doc); - if (fields != null) { + if (includeDocument(doc) == true) { stats = bigArrays.grow(stats, bucket + 1); RunningStats stat = stats.get(bucket); // add document fields to correlation stats if (stat == null) { - stat = new RunningStats(fields); + stat = new RunningStats(fieldNames, fieldVals); + stats.set(bucket, stat); } else { - stat.add(fields); + stat.add(fieldNames, fieldVals); } - stats.set(bucket, stat); } } /** * return a map of field names and data */ - private Map getFields(int doc) { - // get fieldNames to use as hash keys - ArrayList fieldNames = new ArrayList<>(values.keySet()); - HashMap fields = new HashMap<>(fieldNames.size()); - + private boolean includeDocument(int doc) { // loop over fields - for (String fieldName : fieldNames) { - final SortedNumericDoubleValues doubleValues = values.get(fieldName); - doubleValues.setDocument(doc); - final int valuesCount = doubleValues.count(); - // if document contains an empty field we omit the doc from the correlation - if (valuesCount <= 0) { - return null; + for (int i = 0; i < fieldVals.length; ++i) { + final NumericDoubleValues doubleValues = values[i]; + final double value = doubleValues.get(doc); + // skip if value is missing + if (value == Double.NEGATIVE_INFINITY) { + return false; } - // get the field value (multi-value is the average of all the values) - double fieldValue = 0; - for (int i = 0; i < valuesCount; ++i) { - if (Double.isNaN(doubleValues.valueAt(i)) == false) { - fieldValue += doubleValues.valueAt(i); - } - } - fields.put(fieldName, fieldValue / valuesCount); + fieldVals[i] = value; } - return fields; + return true; } }; } diff --git a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsAggregatorFactory.java b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsAggregatorFactory.java index 30389c05836..26e7910dcac 100644 --- a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsAggregatorFactory.java +++ b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsAggregatorFactory.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.search.aggregations.matrix.stats; +import org.elasticsearch.search.MultiValueMode; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.AggregatorFactory; @@ -37,23 +38,26 @@ import java.util.Map; public class MatrixStatsAggregatorFactory extends MultiValuesSourceAggregatorFactory { + private final MultiValueMode multiValueMode; + public MatrixStatsAggregatorFactory(String name, InternalAggregation.Type type, - Map> configs, AggregationContext context, - AggregatorFactory parent, AggregatorFactories.Builder subFactoriesBuilder, - Map metaData) throws IOException { + Map> configs, MultiValueMode multiValueMode, + AggregationContext context, AggregatorFactory parent, AggregatorFactories.Builder subFactoriesBuilder, + Map metaData) throws IOException { super(name, type, configs, context, parent, subFactoriesBuilder, metaData); + this.multiValueMode = multiValueMode; } @Override protected Aggregator createUnmapped(Aggregator parent, List pipelineAggregators, Map metaData) throws IOException { - return new MatrixStatsAggregator(name, null, context, parent, pipelineAggregators, metaData); + return new MatrixStatsAggregator(name, null, context, parent, multiValueMode, pipelineAggregators, metaData); } @Override protected Aggregator doCreateInternal(Map valuesSources, Aggregator parent, boolean collectsFromSingleBucket, List pipelineAggregators, Map metaData) throws IOException { - return new MatrixStatsAggregator(name, valuesSources, context, parent, pipelineAggregators, metaData); + return new MatrixStatsAggregator(name, valuesSources, context, parent, multiValueMode, pipelineAggregators, metaData); } } diff --git a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsParser.java b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsParser.java index 950bd078086..ea383b642c2 100644 --- a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsParser.java +++ b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsParser.java @@ -21,6 +21,7 @@ package org.elasticsearch.search.aggregations.matrix.stats; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.MultiValueMode; import org.elasticsearch.search.aggregations.support.MultiValuesSourceParser.NumericValuesSourceParser; import org.elasticsearch.search.aggregations.support.ValueType; import org.elasticsearch.search.aggregations.support.ValuesSourceType; @@ -28,23 +29,36 @@ import org.elasticsearch.search.aggregations.support.ValuesSourceType; import java.io.IOException; import java.util.Map; +import static org.elasticsearch.search.aggregations.support.MultiValuesSourceAggregationBuilder.MULTIVALUE_MODE_FIELD; + /** */ public class MatrixStatsParser extends NumericValuesSourceParser { public MatrixStatsParser() { - super(true, true, false); + super(true); } @Override protected boolean token(String aggregationName, String currentFieldName, XContentParser.Token token, XContentParser parser, ParseFieldMatcher parseFieldMatcher, Map otherOptions) throws IOException { + if (parseFieldMatcher.match(currentFieldName, MULTIVALUE_MODE_FIELD)) { + if (token == XContentParser.Token.VALUE_STRING) { + otherOptions.put(MULTIVALUE_MODE_FIELD, parser.text()); + return true; + } + } return false; } @Override - protected MatrixStatsAggregatorBuilder createFactory(String aggregationName, ValuesSourceType valuesSourceType, - ValueType targetValueType, Map otherOptions) { - return new MatrixStatsAggregatorBuilder(aggregationName); + protected MatrixStatsAggregationBuilder createFactory(String aggregationName, ValuesSourceType valuesSourceType, + ValueType targetValueType, Map otherOptions) { + MatrixStatsAggregationBuilder builder = new MatrixStatsAggregationBuilder(aggregationName); + String mode = (String)otherOptions.get(MULTIVALUE_MODE_FIELD); + if (mode != null) { + builder.multiValueMode(MultiValueMode.fromString(mode)); + } + return builder; } } diff --git a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsResults.java b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsResults.java index 65cedbc2676..2652b523bde 100644 --- a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsResults.java +++ b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsResults.java @@ -21,7 +21,7 @@ package org.elasticsearch.search.aggregations.matrix.stats; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import java.io.IOException; import java.util.Collections; @@ -34,42 +34,43 @@ import java.util.Map; * * @internal */ -class MatrixStatsResults implements Streamable { +class MatrixStatsResults implements Writeable { /** object holding results - computes results in place */ - final RunningStats results; + final protected RunningStats results; /** pearson product correlation coefficients */ - protected HashMap> correlation; + final protected Map> correlation; /** Base ctor */ - private MatrixStatsResults() { - results = RunningStats.EMPTY(); + public MatrixStatsResults() { + results = new RunningStats(); this.correlation = new HashMap<>(); } /** creates and computes result from provided stats */ public MatrixStatsResults(RunningStats stats) { - try { - this.results = stats.clone(); - this.correlation = new HashMap<>(); - } catch (CloneNotSupportedException e) { - throw new ElasticsearchException("Error trying to create multifield_stats results", e); - } + this.results = stats.clone(); + this.correlation = new HashMap<>(); this.compute(); } /** creates a results object from the given stream */ + @SuppressWarnings("unchecked") protected MatrixStatsResults(StreamInput in) { try { results = new RunningStats(in); - this.readFrom(in); + correlation = (Map>) in.readGenericValue(); } catch (IOException e) { throw new ElasticsearchException("Error trying to create multifield_stats results from stream input", e); } } - /** create an empty results object **/ - protected static MatrixStatsResults EMPTY() { - return new MatrixStatsResults(); + /** Marshalls MatrixStatsResults */ + @Override + public void writeTo(StreamOutput out) throws IOException { + // marshall results + results.writeTo(out); + // marshall correlation + out.writeGenericValue(correlation); } /** return document count */ @@ -77,8 +78,8 @@ class MatrixStatsResults implements Streamable { return results.docCount; } - /** return the field counts */ - public Map getFieldCounts() { + /** return the field counts - not public, used for getProperty() */ + protected Map getFieldCounts() { return Collections.unmodifiableMap(results.counts); } @@ -90,60 +91,66 @@ class MatrixStatsResults implements Streamable { return results.counts.get(field); } - /** return the means */ - public Map getMeans() { + /** return the means - not public, used for getProperty() */ + protected Map getMeans() { return Collections.unmodifiableMap(results.means); } /** return the mean for the requested field */ - public Double getMean(String field) { + public double getMean(String field) { + checkField(field, results.means); return results.means.get(field); } - /** return the variances */ - public Map getVariances() { + /** return the variances - not public, used for getProperty() */ + protected Map getVariances() { return Collections.unmodifiableMap(results.variances); } /** return the variance for the requested field */ - public Double getVariance(String field) { + public double getVariance(String field) { + checkField(field, results.variances); return results.variances.get(field); } - /** return the skewness */ - public Map getSkewness() { + /** return the skewness - not public, used for getProperty() */ + protected Map getSkewness() { return Collections.unmodifiableMap(results.skewness); } /** return the skewness for the requested field */ - public Double getSkewness(String field) { + public double getSkewness(String field) { + checkField(field, results.skewness); return results.skewness.get(field); } /** return the kurtosis */ - public Map getKurtosis() { + protected Map getKurtosis() { return Collections.unmodifiableMap(results.kurtosis); } /** return the kurtosis for the requested field */ - public Double getKurtosis(String field) { + public double getKurtosis(String field) { + checkField(field, results.kurtosis); return results.kurtosis.get(field); } - /** return the covariances */ - public Map> getCovariances() { + /** return the covariances as a map - not public, used for getProperty() */ + protected Map> getCovariances() { return Collections.unmodifiableMap(results.covariances); } /** return the covariance between two fields */ - public Double getCovariance(String fieldX, String fieldY) { + public double getCovariance(String fieldX, String fieldY) { if (fieldX.equals(fieldY)) { + checkField(fieldX, results.variances); return results.variances.get(fieldX); } return getValFromUpperTriangularMatrix(results.covariances, fieldX, fieldY); } - public Map> getCorrelations() { + /** return the correlations as a map - not public, used for getProperty() */ + protected Map> getCorrelations() { return Collections.unmodifiableMap(correlation); } @@ -156,10 +163,10 @@ class MatrixStatsResults implements Streamable { } /** return the value for two fields in an upper triangular matrix, regardless of row col location. */ - private Double getValFromUpperTriangularMatrix(HashMap> map, String fieldX, String fieldY) { + private double getValFromUpperTriangularMatrix(Map> map, String fieldX, String fieldY) { // for the co-value to exist, one of the two (or both) fields has to be a row key if (map.containsKey(fieldX) == false && map.containsKey(fieldY) == false) { - return null; + throw new IllegalArgumentException("neither field " + fieldX + " nor " + fieldY + " exist"); } else if (map.containsKey(fieldX)) { // fieldX exists as a row key if (map.get(fieldX).containsKey(fieldY)) { @@ -176,6 +183,15 @@ class MatrixStatsResults implements Streamable { throw new IllegalArgumentException("Coefficient not computed between fields: " + fieldX + " and " + fieldY); } + private void checkField(String field, Map map) { + if (field == null) { + throw new IllegalArgumentException("field name cannot be null"); + } + if (map.containsKey(field) == false) { + throw new IllegalArgumentException("field " + field + " does not exist"); + } + } + /** Computes final covariance, variance, and correlation */ private void compute() { final double nM1 = results.docCount - 1D; @@ -214,29 +230,4 @@ class MatrixStatsResults implements Streamable { correlation.put(rowName, corRow); } } - - /** Unmarshalls MatrixStatsResults */ - @Override - @SuppressWarnings("unchecked") - public void readFrom(StreamInput in) throws IOException { - if (in.readBoolean()) { - correlation = (HashMap>) (in.readGenericValue()); - } else { - correlation = null; - } - } - - /** Marshalls MatrixStatsResults */ - @Override - public void writeTo(StreamOutput out) throws IOException { - // marshall results - results.writeTo(out); - // marshall correlation - if (correlation != null) { - out.writeBoolean(true); - out.writeGenericValue(correlation); - } else { - out.writeBoolean(false); - } - } } diff --git a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/RunningStats.java b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/RunningStats.java index 57a8a8948fe..1aa36e38bd6 100644 --- a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/RunningStats.java +++ b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/RunningStats.java @@ -18,12 +18,14 @@ */ package org.elasticsearch.search.aggregations.matrix.stats; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -35,7 +37,7 @@ import java.util.Map; * * @internal */ -public class RunningStats implements Streamable, Cloneable { +public class RunningStats implements Writeable, Cloneable { /** count of observations (same number of observations per field) */ protected long docCount = 0; /** per field sum of observations */ @@ -53,20 +55,14 @@ public class RunningStats implements Streamable, Cloneable { /** covariance values */ protected HashMap> covariances; - private RunningStats() { + public RunningStats() { init(); } - /** Ctor to create an instance of running statistics */ - public RunningStats(StreamInput in) throws IOException { - this(); - this.readFrom(in); - } - - public RunningStats(Map doc) { - if (doc != null && doc.isEmpty() == false) { + public RunningStats(final String[] fieldNames, final double[] fieldVals) { + if (fieldVals != null && fieldVals.length > 0) { init(); - this.add(doc); + this.add(fieldNames, fieldVals); } } @@ -80,15 +76,52 @@ public class RunningStats implements Streamable, Cloneable { variances = new HashMap<>(); } - /** create an empty instance */ - protected static RunningStats EMPTY() { - return new RunningStats(); + /** Ctor to create an instance of running statistics */ + @SuppressWarnings("unchecked") + public RunningStats(StreamInput in) throws IOException { + this(); + // read fieldSum + fieldSum = (HashMap)in.readGenericValue(); + // counts + counts = (HashMap)in.readGenericValue(); + // means + means = (HashMap)in.readGenericValue(); + // variances + variances = (HashMap)in.readGenericValue(); + // skewness + skewness = (HashMap)in.readGenericValue(); + // kurtosis + kurtosis = (HashMap)in.readGenericValue(); + // read covariances + covariances = (HashMap>)in.readGenericValue(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + // marshall fieldSum + out.writeGenericValue(fieldSum); + // counts + out.writeGenericValue(counts); + // mean + out.writeGenericValue(means); + // variances + out.writeGenericValue(variances); + // skewness + out.writeGenericValue(skewness); + // kurtosis + out.writeGenericValue(kurtosis); + // covariances + out.writeGenericValue(covariances); } /** updates running statistics with a documents field values **/ - public void add(Map doc) { - if (doc == null || doc.isEmpty()) { - return; + public void add(final String[] fieldNames, final double[] fieldVals) { + if (fieldNames == null) { + throw new IllegalArgumentException("Cannot add statistics without field names."); + } else if (fieldVals == null) { + throw new IllegalArgumentException("Cannot add statistics without field values."); + } else if (fieldNames.length != fieldVals.length) { + throw new IllegalArgumentException("Number of field values do not match number of field names."); } // update total, mean, and variance @@ -98,9 +131,9 @@ public class RunningStats implements Streamable, Cloneable { double m1, m2, m3, m4; // moments double d, dn, dn2, t1; final HashMap deltas = new HashMap<>(); - for (Map.Entry field : doc.entrySet()) { - fieldName = field.getKey(); - fieldValue = field.getValue(); + for (int i = 0; i < fieldNames.length; ++i) { + fieldName = fieldNames[i]; + fieldValue = fieldVals[i]; // update counts counts.put(fieldName, 1 + (counts.containsKey(fieldName) ? counts.get(fieldName) : 0)); @@ -133,17 +166,17 @@ public class RunningStats implements Streamable, Cloneable { } } - this.updateCovariance(doc, deltas); + this.updateCovariance(fieldNames, deltas); } /** Update covariance matrix */ - private void updateCovariance(final Map doc, final Map deltas) { + private void updateCovariance(final String[] fieldNames, final Map deltas) { // deep copy of hash keys (field names) - ArrayList cFieldNames = new ArrayList<>(doc.keySet()); + ArrayList cFieldNames = new ArrayList<>(Arrays.asList(fieldNames)); String fieldName; double dR, newVal; - for (Map.Entry field : doc.entrySet()) { - fieldName = field.getKey(); + for (int i = 0; i < fieldNames.length; ++i) { + fieldName = fieldNames[i]; cFieldNames.remove(fieldName); // update running covariances dR = deltas.get(fieldName); @@ -170,6 +203,21 @@ public class RunningStats implements Streamable, Cloneable { public void merge(final RunningStats other) { if (other == null) { return; + } else if (this.docCount == 0) { + for (Map.Entry fs : other.means.entrySet()) { + final String fieldName = fs.getKey(); + this.means.put(fieldName, fs.getValue().doubleValue()); + this.counts.put(fieldName, other.counts.get(fieldName).longValue()); + this.fieldSum.put(fieldName, other.fieldSum.get(fieldName).doubleValue()); + this.variances.put(fieldName, other.variances.get(fieldName).doubleValue()); + this.skewness.put(fieldName , other.skewness.get(fieldName).doubleValue()); + this.kurtosis.put(fieldName, other.kurtosis.get(fieldName).doubleValue()); + if (other.covariances.containsKey(fieldName) == true) { + this.covariances.put(fieldName, other.covariances.get(fieldName)); + } + this.docCount = other.docCount; + } + return; } final double nA = docCount; final double nB = other.docCount; @@ -226,7 +274,7 @@ public class RunningStats implements Streamable, Cloneable { } /** Merges two covariance matrices */ - private void mergeCovariance(final RunningStats other, final HashMap deltas) { + private void mergeCovariance(final RunningStats other, final Map deltas) { final double countA = docCount - other.docCount; double f, dR, newVal; for (Map.Entry fs : other.means.entrySet()) { @@ -252,107 +300,11 @@ public class RunningStats implements Streamable, Cloneable { } @Override - public void writeTo(StreamOutput out) throws IOException { - // marshall fieldSum - if (fieldSum != null) { - out.writeBoolean(true); - out.writeGenericValue(fieldSum); - } else { - out.writeBoolean(false); + public RunningStats clone() { + try { + return (RunningStats) super.clone(); + } catch (CloneNotSupportedException e) { + throw new ElasticsearchException("Error trying to create a copy of RunningStats"); } - // counts - if (counts != null) { - out.writeBoolean(true); - out.writeGenericValue(counts); - } else { - out.writeBoolean(false); - } - // mean - if (means != null) { - out.writeBoolean(true); - out.writeGenericValue(means); - } else { - out.writeBoolean(false); - } - // variances - if (variances != null) { - out.writeBoolean(true); - out.writeGenericValue(variances); - } else { - out.writeBoolean(false); - } - // skewness - if (skewness != null) { - out.writeBoolean(true); - out.writeGenericValue(skewness); - } else { - out.writeBoolean(false); - } - // kurtosis - if (kurtosis != null) { - out.writeBoolean(true); - out.writeGenericValue(kurtosis); - } else { - out.writeBoolean(false); - } - // covariances - if (covariances != null) { - out.writeBoolean(true); - out.writeGenericValue(covariances); - } else { - out.writeBoolean(false); - } - } - - @Override - @SuppressWarnings("unchecked") - public void readFrom(StreamInput in) throws IOException { - // read fieldSum - if (in.readBoolean()) { - fieldSum = (HashMap)(in.readGenericValue()); - } else { - fieldSum = null; - } - // counts - if (in.readBoolean()) { - counts = (HashMap)(in.readGenericValue()); - } else { - counts = null; - } - // means - if (in.readBoolean()) { - means = (HashMap)(in.readGenericValue()); - } else { - means = null; - } - // variances - if (in.readBoolean()) { - variances = (HashMap)(in.readGenericValue()); - } else { - variances = null; - } - // skewness - if (in.readBoolean()) { - skewness = (HashMap)(in.readGenericValue()); - } else { - skewness = null; - } - // kurtosis - if (in.readBoolean()) { - kurtosis = (HashMap)(in.readGenericValue()); - } else { - kurtosis = null; - } - // read covariances - if (in.readBoolean()) { - covariances = (HashMap>) (in.readGenericValue()); - } else { - covariances = null; - } - } - - @Override - public RunningStats clone() throws CloneNotSupportedException { - return (RunningStats)super.clone(); } } diff --git a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/support/MultiValuesSource.java b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/support/MultiValuesSource.java new file mode 100644 index 00000000000..0274c1748dd --- /dev/null +++ b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/support/MultiValuesSource.java @@ -0,0 +1,90 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search.aggregations.support; + +import org.apache.lucene.index.LeafReaderContext; +import org.elasticsearch.index.fielddata.NumericDoubleValues; +import org.elasticsearch.search.MultiValueMode; + +import java.io.IOException; +import java.util.Map; + +/** + * Class to encapsulate a set of ValuesSource objects labeled by field name + */ +public abstract class MultiValuesSource { + protected MultiValueMode multiValueMode; + protected String[] names; + protected VS[] values; + + public static class NumericMultiValuesSource extends MultiValuesSource { + public NumericMultiValuesSource(Map valuesSources, MultiValueMode multiValueMode) { + super(valuesSources, multiValueMode); + if (valuesSources != null) { + this.values = valuesSources.values().toArray(new ValuesSource.Numeric[0]); + } else { + this.values = new ValuesSource.Numeric[0]; + } + } + + public NumericDoubleValues getField(final int ordinal, LeafReaderContext ctx) throws IOException { + if (ordinal > names.length) { + throw new IndexOutOfBoundsException("ValuesSource array index " + ordinal + " out of bounds"); + } + return multiValueMode.select(values[ordinal].doubleValues(ctx), Double.NEGATIVE_INFINITY); + } + } + + public static class BytesMultiValuesSource extends MultiValuesSource { + public BytesMultiValuesSource(Map valuesSources, MultiValueMode multiValueMode) { + super(valuesSources, multiValueMode); + this.values = valuesSources.values().toArray(new ValuesSource.Bytes[0]); + } + + public Object getField(final int ordinal, LeafReaderContext ctx) throws IOException { + return values[ordinal].bytesValues(ctx); + } + } + + public static class GeoPointValuesSource extends MultiValuesSource { + public GeoPointValuesSource(Map valuesSources, MultiValueMode multiValueMode) { + super(valuesSources, multiValueMode); + this.values = valuesSources.values().toArray(new ValuesSource.GeoPoint[0]); + } + } + + private MultiValuesSource(Map valuesSources, MultiValueMode multiValueMode) { + if (valuesSources != null) { + this.names = valuesSources.keySet().toArray(new String[0]); + } + this.multiValueMode = multiValueMode; + } + + public boolean needsScores() { + boolean needsScores = false; + for (ValuesSource value : values) { + needsScores |= value.needsScores(); + } + return needsScores; + } + + public String[] fieldNames() { + return this.names; + } +} diff --git a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/support/MultiValuesSourceAggregationBuilder.java b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/support/MultiValuesSourceAggregationBuilder.java new file mode 100644 index 00000000000..e4ea8df0497 --- /dev/null +++ b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/support/MultiValuesSourceAggregationBuilder.java @@ -0,0 +1,366 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search.aggregations.support; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.fielddata.IndexFieldData; +import org.elasticsearch.index.fielddata.IndexGeoPointFieldData; +import org.elasticsearch.index.fielddata.IndexNumericFieldData; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.script.Script; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.AggregationInitializationException; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.AggregatorFactories.Builder; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.InternalAggregation.Type; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * + */ +public abstract class MultiValuesSourceAggregationBuilder> + extends AggregationBuilder { + + public static final ParseField MULTIVALUE_MODE_FIELD = new ParseField("mode"); + + public static abstract class LeafOnly> + extends MultiValuesSourceAggregationBuilder { + + protected LeafOnly(String name, Type type, ValuesSourceType valuesSourceType, ValueType targetValueType) { + super(name, type, valuesSourceType, targetValueType); + } + + /** + * Read from a stream that does not serialize its targetValueType. This should be used by most subclasses. + */ + protected LeafOnly(StreamInput in, Type type, ValuesSourceType valuesSourceType, ValueType targetValueType) throws IOException { + super(in, type, valuesSourceType, targetValueType); + } + + /** + * Read an aggregation from a stream that serializes its targetValueType. This should only be used by subclasses that override + * {@link #serializeTargetValueType()} to return true. + */ + protected LeafOnly(StreamInput in, Type type, ValuesSourceType valuesSourceType) throws IOException { + super(in, type, valuesSourceType); + } + + @Override + public AB subAggregations(Builder subFactories) { + throw new AggregationInitializationException("Aggregator [" + name + "] of type [" + + type + "] cannot accept sub-aggregations"); + } + } + + private final ValuesSourceType valuesSourceType; + private final ValueType targetValueType; + private List fields = Collections.emptyList(); + private ValueType valueType = null; + private String format = null; + private Object missing = null; + private Map missingMap = Collections.emptyMap(); + + protected MultiValuesSourceAggregationBuilder(String name, Type type, ValuesSourceType valuesSourceType, ValueType targetValueType) { + super(name, type); + if (valuesSourceType == null) { + throw new IllegalArgumentException("[valuesSourceType] must not be null: [" + name + "]"); + } + this.valuesSourceType = valuesSourceType; + this.targetValueType = targetValueType; + } + + protected MultiValuesSourceAggregationBuilder(StreamInput in, Type type, ValuesSourceType valuesSourceType, ValueType targetValueType) + throws IOException { + super(in, type); + assert false == serializeTargetValueType() : "Wrong read constructor called for subclass that provides its targetValueType"; + this.valuesSourceType = valuesSourceType; + this.targetValueType = targetValueType; + read(in); + } + + protected MultiValuesSourceAggregationBuilder(StreamInput in, Type type, ValuesSourceType valuesSourceType) throws IOException { + super(in, type); + assert serializeTargetValueType() : "Wrong read constructor called for subclass that serializes its targetValueType"; + this.valuesSourceType = valuesSourceType; + this.targetValueType = in.readOptionalWriteable(ValueType::readFromStream); + read(in); + } + + /** + * Read from a stream. + */ + @SuppressWarnings("unchecked") + private void read(StreamInput in) throws IOException { + fields = (ArrayList)in.readGenericValue(); + valueType = in.readOptionalWriteable(ValueType::readFromStream); + format = in.readOptionalString(); + missingMap = in.readMap(); + } + + @Override + protected final void doWriteTo(StreamOutput out) throws IOException { + if (serializeTargetValueType()) { + out.writeOptionalWriteable(targetValueType); + } + out.writeGenericValue(fields); + out.writeOptionalWriteable(valueType); + out.writeOptionalString(format); + out.writeMap(missingMap); + innerWriteTo(out); + } + + /** + * Write subclass' state to the stream + */ + protected abstract void innerWriteTo(StreamOutput out) throws IOException; + + /** + * Sets the field to use for this aggregation. + */ + @SuppressWarnings("unchecked") + public AB fields(List fields) { + if (fields == null) { + throw new IllegalArgumentException("[field] must not be null: [" + name + "]"); + } + this.fields = fields; + return (AB) this; + } + + /** + * Gets the field to use for this aggregation. + */ + public List fields() { + return fields; + } + + /** + * Sets the {@link ValueType} for the value produced by this aggregation + */ + @SuppressWarnings("unchecked") + public AB valueType(ValueType valueType) { + if (valueType == null) { + throw new IllegalArgumentException("[valueType] must not be null: [" + name + "]"); + } + this.valueType = valueType; + return (AB) this; + } + + /** + * Gets the {@link ValueType} for the value produced by this aggregation + */ + public ValueType valueType() { + return valueType; + } + + /** + * Sets the format to use for the output of the aggregation. + */ + @SuppressWarnings("unchecked") + public AB format(String format) { + if (format == null) { + throw new IllegalArgumentException("[format] must not be null: [" + name + "]"); + } + this.format = format; + return (AB) this; + } + + /** + * Gets the format to use for the output of the aggregation. + */ + public String format() { + return format; + } + + /** + * Sets the value to use when the aggregation finds a missing value in a + * document + */ + @SuppressWarnings("unchecked") + public AB missingMap(Map missingMap) { + if (missingMap == null) { + throw new IllegalArgumentException("[missing] must not be null: [" + name + "]"); + } + this.missingMap = missingMap; + return (AB) this; + } + + /** + * Gets the value to use when the aggregation finds a missing value in a + * document + */ + public Map missingMap() { + return missingMap; + } + + @Override + protected final MultiValuesSourceAggregatorFactory doBuild(AggregationContext context, AggregatorFactory parent, + AggregatorFactories.Builder subFactoriesBuilder) throws IOException { + Map> configs = resolveConfig(context); + MultiValuesSourceAggregatorFactory factory = innerBuild(context, configs, parent, subFactoriesBuilder); + return factory; + } + + protected Map> resolveConfig(AggregationContext context) { + HashMap> configs = new HashMap<>(); + for (String field : fields) { + ValuesSourceConfig config = config(context, field, null); + configs.put(field, config); + } + return configs; + } + + protected abstract MultiValuesSourceAggregatorFactory innerBuild(AggregationContext context, + Map> configs, AggregatorFactory parent, + AggregatorFactories.Builder subFactoriesBuilder) throws IOException; + + public ValuesSourceConfig config(AggregationContext context, String field, Script script) { + + ValueType valueType = this.valueType != null ? this.valueType : targetValueType; + + if (field == null) { + if (script == null) { + @SuppressWarnings("unchecked") + ValuesSourceConfig config = new ValuesSourceConfig<>(ValuesSourceType.ANY); + return config.format(resolveFormat(null, valueType)); + } + ValuesSourceType valuesSourceType = valueType != null ? valueType.getValuesSourceType() : this.valuesSourceType; + if (valuesSourceType == null || valuesSourceType == ValuesSourceType.ANY) { + // the specific value source type is undefined, but for scripts, + // we need to have a specific value source + // type to know how to handle the script values, so we fallback + // on Bytes + valuesSourceType = ValuesSourceType.BYTES; + } + ValuesSourceConfig config = new ValuesSourceConfig<>(valuesSourceType); + config.missing(missingMap.get(field)); + return config.format(resolveFormat(format, valueType)); + } + + MappedFieldType fieldType = context.searchContext().smartNameFieldType(field); + if (fieldType == null) { + ValuesSourceType valuesSourceType = valueType != null ? valueType.getValuesSourceType() : this.valuesSourceType; + ValuesSourceConfig config = new ValuesSourceConfig<>(valuesSourceType); + config.missing(missingMap.get(field)); + config.format(resolveFormat(format, valueType)); + return config.unmapped(true); + } + + IndexFieldData indexFieldData = context.searchContext().fieldData().getForField(fieldType); + + ValuesSourceConfig config; + if (valuesSourceType == ValuesSourceType.ANY) { + if (indexFieldData instanceof IndexNumericFieldData) { + config = new ValuesSourceConfig<>(ValuesSourceType.NUMERIC); + } else if (indexFieldData instanceof IndexGeoPointFieldData) { + config = new ValuesSourceConfig<>(ValuesSourceType.GEOPOINT); + } else { + config = new ValuesSourceConfig<>(ValuesSourceType.BYTES); + } + } else { + config = new ValuesSourceConfig<>(valuesSourceType); + } + + config.fieldContext(new FieldContext(field, indexFieldData, fieldType)); + config.missing(missingMap.get(field)); + return config.format(fieldType.docValueFormat(format, null)); + } + + private static DocValueFormat resolveFormat(@Nullable String format, @Nullable ValueType valueType) { + if (valueType == null) { + return DocValueFormat.RAW; // we can't figure it out + } + DocValueFormat valueFormat = valueType.defaultFormat(); + if (valueFormat instanceof DocValueFormat.Decimal && format != null) { + valueFormat = new DocValueFormat.Decimal(format); + } + return valueFormat; + } + + /** + * Should this builder serialize its targetValueType? Defaults to false. All subclasses that override this to true + * should use the three argument read constructor rather than the four argument version. + */ + protected boolean serializeTargetValueType() { + return false; + } + + @Override + public final XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + // todo add ParseField support to XContentBuilder + if (fields != null) { + builder.field(CommonFields.FIELDS.getPreferredName(), fields); + } + if (missing != null) { + builder.field(CommonFields.MISSING.getPreferredName(), missing); + } + if (format != null) { + builder.field(CommonFields.FORMAT.getPreferredName(), format); + } + if (valueType != null) { + builder.field(CommonFields.VALUE_TYPE.getPreferredName(), valueType.getPreferredName()); + } + doXContentBody(builder, params); + builder.endObject(); + return builder; + } + + protected abstract XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException; + + @Override + protected final int doHashCode() { + return Objects.hash(fields, format, missing, targetValueType, valueType, valuesSourceType, + innerHashCode()); + } + + protected abstract int innerHashCode(); + + @Override + protected final boolean doEquals(Object obj) { + MultiValuesSourceAggregationBuilder other = (MultiValuesSourceAggregationBuilder) obj; + if (!Objects.equals(fields, other.fields)) + return false; + if (!Objects.equals(format, other.format)) + return false; + if (!Objects.equals(missing, other.missing)) + return false; + if (!Objects.equals(targetValueType, other.targetValueType)) + return false; + if (!Objects.equals(valueType, other.valueType)) + return false; + if (!Objects.equals(valuesSourceType, other.valuesSourceType)) + return false; + return innerEquals(obj); + } + + protected abstract boolean innerEquals(Object obj); +} diff --git a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/support/MultiValuesSourceAggregatorFactory.java b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/support/MultiValuesSourceAggregatorFactory.java new file mode 100644 index 00000000000..956f953da10 --- /dev/null +++ b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/support/MultiValuesSourceAggregatorFactory.java @@ -0,0 +1,68 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.support; + +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.InternalAggregation.Type; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public abstract class MultiValuesSourceAggregatorFactory> + extends AggregatorFactory { + + protected Map> configs; + + public MultiValuesSourceAggregatorFactory(String name, Type type, Map> configs, + AggregationContext context, AggregatorFactory parent, AggregatorFactories.Builder subFactoriesBuilder, + Map metaData) throws IOException { + super(name, type, context, parent, subFactoriesBuilder, metaData); + this.configs = configs; + } + + @Override + public Aggregator createInternal(Aggregator parent, boolean collectsFromSingleBucket, List pipelineAggregators, + Map metaData) throws IOException { + HashMap valuesSources = new HashMap<>(); + + for (Map.Entry> config : configs.entrySet()) { + VS vs = context.valuesSource(config.getValue(), context.searchContext()); + if (vs != null) { + valuesSources.put(config.getKey(), vs); + } + } + if (valuesSources.isEmpty()) { + return createUnmapped(parent, pipelineAggregators, metaData); + } + return doCreateInternal(valuesSources, parent, collectsFromSingleBucket, pipelineAggregators, metaData); + } + + protected abstract Aggregator createUnmapped(Aggregator parent, List pipelineAggregators, + Map metaData) throws IOException; + + protected abstract Aggregator doCreateInternal(Map valuesSources, Aggregator parent, boolean collectsFromSingleBucket, + List pipelineAggregators, Map metaData) throws IOException; + +} diff --git a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/support/MultiValuesSourceParser.java b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/support/MultiValuesSourceParser.java new file mode 100644 index 00000000000..dd2b69696f3 --- /dev/null +++ b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/support/MultiValuesSourceParser.java @@ -0,0 +1,239 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.support; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.ParseFieldMatcher; +import org.elasticsearch.common.ParsingException; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.query.QueryParseContext; +import org.elasticsearch.script.Script.ScriptField; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregationBuilder.CommonFields; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * + */ +public abstract class MultiValuesSourceParser implements Aggregator.Parser { + + public abstract static class AnyValuesSourceParser extends MultiValuesSourceParser { + + protected AnyValuesSourceParser(boolean formattable) { + super(formattable, ValuesSourceType.ANY, null); + } + } + + public abstract static class NumericValuesSourceParser extends MultiValuesSourceParser { + + protected NumericValuesSourceParser(boolean formattable) { + super(formattable, ValuesSourceType.NUMERIC, ValueType.NUMERIC); + } + } + + public abstract static class BytesValuesSourceParser extends MultiValuesSourceParser { + + protected BytesValuesSourceParser(boolean formattable) { + super(formattable, ValuesSourceType.BYTES, ValueType.STRING); + } + } + + public abstract static class GeoPointValuesSourceParser extends MultiValuesSourceParser { + + protected GeoPointValuesSourceParser(boolean formattable) { + super(formattable, ValuesSourceType.GEOPOINT, ValueType.GEOPOINT); + } + } + + private boolean formattable = false; + private ValuesSourceType valuesSourceType = null; + private ValueType targetValueType = null; + + private MultiValuesSourceParser(boolean formattable, ValuesSourceType valuesSourceType, ValueType targetValueType) { + this.valuesSourceType = valuesSourceType; + this.targetValueType = targetValueType; + this.formattable = formattable; + } + + @Override + public final MultiValuesSourceAggregationBuilder parse(String aggregationName, QueryParseContext context) + throws IOException { + + XContentParser parser = context.parser(); + List fields = null; + ValueType valueType = null; + String format = null; + Map missingMap = null; + Map otherOptions = new HashMap<>(); + final ParseFieldMatcher parseFieldMatcher = context.getParseFieldMatcher(); + + XContentParser.Token token; + String currentFieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_STRING) { + if (parseFieldMatcher.match(currentFieldName, CommonFields.FIELDS)) { + fields = Collections.singletonList(parser.text()); + } else if (formattable && parseFieldMatcher.match(currentFieldName, CommonFields.FORMAT)) { + format = parser.text(); + } else if (parseFieldMatcher.match(currentFieldName, CommonFields.VALUE_TYPE)) { + throw new ParsingException(parser.getTokenLocation(), + "Unexpected token " + token + " [" + currentFieldName + "] in [" + aggregationName + "]. " + + "Multi-field aggregations do not support scripts."); + } else if (!token(aggregationName, currentFieldName, token, parser, context.getParseFieldMatcher(), otherOptions)) { + throw new ParsingException(parser.getTokenLocation(), + "Unexpected token " + token + " [" + currentFieldName + "] in [" + aggregationName + "]."); + } + } else if (token == XContentParser.Token.START_OBJECT) { + if (parseFieldMatcher.match(currentFieldName, CommonFields.MISSING)) { + missingMap = new HashMap<>(); + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + parseMissingAndAdd(aggregationName, currentFieldName, parser, missingMap); + } + } else if (context.getParseFieldMatcher().match(currentFieldName, ScriptField.SCRIPT)) { + throw new ParsingException(parser.getTokenLocation(), + "Unexpected token " + token + " [" + currentFieldName + "] in [" + aggregationName + "]. " + + "Multi-field aggregations do not support scripts."); + + } else if (!token(aggregationName, currentFieldName, token, parser, context.getParseFieldMatcher(), otherOptions)) { + throw new ParsingException(parser.getTokenLocation(), + "Unexpected token " + token + " [" + currentFieldName + "] in [" + aggregationName + "]."); + } + } else if (token == XContentParser.Token.START_ARRAY) { + if (context.getParseFieldMatcher().match(currentFieldName, ScriptField.SCRIPT)) { + throw new ParsingException(parser.getTokenLocation(), + "Unexpected token " + token + " [" + currentFieldName + "] in [" + aggregationName + "]. " + + "Multi-field aggregations do not support scripts."); + } else if (parseFieldMatcher.match(currentFieldName, CommonFields.FIELDS)) { + fields = new ArrayList<>(); + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + if (token == XContentParser.Token.VALUE_STRING) { + fields.add(parser.text()); + } else { + throw new ParsingException(parser.getTokenLocation(), + "Unexpected token " + token + " [" + currentFieldName + "] in [" + aggregationName + "]."); + } + } + } else if (!token(aggregationName, currentFieldName, token, parser, context.getParseFieldMatcher(), otherOptions)) { + throw new ParsingException(parser.getTokenLocation(), + "Unexpected token " + token + " [" + currentFieldName + "] in [" + aggregationName + "]."); + } + } else if (!token(aggregationName, currentFieldName, token, parser, context.getParseFieldMatcher(), otherOptions)) { + throw new ParsingException(parser.getTokenLocation(), + "Unexpected token " + token + " [" + currentFieldName + "] in [" + aggregationName + "]."); + } + } + + MultiValuesSourceAggregationBuilder factory = createFactory(aggregationName, this.valuesSourceType, this.targetValueType, + otherOptions); + if (fields != null) { + factory.fields(fields); + } + if (valueType != null) { + factory.valueType(valueType); + } + if (format != null) { + factory.format(format); + } + if (missingMap != null) { + factory.missingMap(missingMap); + } + return factory; + } + + private final void parseMissingAndAdd(final String aggregationName, final String currentFieldName, + XContentParser parser, final Map missing) throws IOException { + XContentParser.Token token = parser.currentToken(); + if (token == null) { + token = parser.nextToken(); + } + + if (token == XContentParser.Token.FIELD_NAME) { + final String fieldName = parser.currentName(); + if (missing.containsKey(fieldName)) { + throw new ParsingException(parser.getTokenLocation(), + "Missing field [" + fieldName + "] already defined as [" + missing.get(fieldName) + + "] in [" + aggregationName + "]."); + } + parser.nextToken(); + missing.put(fieldName, parser.objectText()); + } else { + throw new ParsingException(parser.getTokenLocation(), + "Unexpected token " + token + " [" + currentFieldName + "] in [" + aggregationName + "]"); + } + } + + /** + * Creates a {@link ValuesSourceAggregationBuilder} from the information + * gathered by the subclass. Options parsed in + * {@link MultiValuesSourceParser} itself will be added to the factory + * after it has been returned by this method. + * + * @param aggregationName + * the name of the aggregation + * @param valuesSourceType + * the type of the {@link ValuesSource} + * @param targetValueType + * the target type of the final value output by the aggregation + * @param otherOptions + * a {@link Map} containing the extra options parsed by the + * {@link #token(String, String, org.elasticsearch.common.xcontent.XContentParser.Token, + * XContentParser, ParseFieldMatcher, Map)} + * method + * @return the created factory + */ + protected abstract MultiValuesSourceAggregationBuilder createFactory(String aggregationName, ValuesSourceType valuesSourceType, + ValueType targetValueType, Map otherOptions); + + /** + * Allows subclasses of {@link MultiValuesSourceParser} to parse extra + * parameters and store them in a {@link Map} which will later be passed to + * {@link #createFactory(String, ValuesSourceType, ValueType, Map)}. + * + * @param aggregationName + * the name of the aggregation + * @param currentFieldName + * the name of the current field being parsed + * @param token + * the current token for the parser + * @param parser + * the parser + * @param parseFieldMatcher + * the {@link ParseFieldMatcher} to use to match field names + * @param otherOptions + * a {@link Map} of options to be populated by successive calls + * to this method which will then be passed to the + * {@link #createFactory(String, ValuesSourceType, ValueType, Map)} + * method + * @return true if the current token was correctly parsed, + * false otherwise + * @throws IOException + * if an error occurs whilst parsing + */ + protected abstract boolean token(String aggregationName, String currentFieldName, XContentParser.Token token, XContentParser parser, + ParseFieldMatcher parseFieldMatcher, Map otherOptions) throws IOException; +} diff --git a/modules/aggs-matrix-stats/src/test/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsTestCase.java b/modules/aggs-matrix-stats/src/test/java/org/elasticsearch/search/aggregations/matrix/stats/BaseMatrixStatsTestCase.java similarity index 99% rename from modules/aggs-matrix-stats/src/test/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsTestCase.java rename to modules/aggs-matrix-stats/src/test/java/org/elasticsearch/search/aggregations/matrix/stats/BaseMatrixStatsTestCase.java index 84b032b82b6..b1296bb1146 100644 --- a/modules/aggs-matrix-stats/src/test/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsTestCase.java +++ b/modules/aggs-matrix-stats/src/test/java/org/elasticsearch/search/aggregations/matrix/stats/BaseMatrixStatsTestCase.java @@ -29,7 +29,7 @@ import static org.hamcrest.Matchers.equalTo; /** * */ -public class MatrixStatsTestCase extends ESTestCase { +public abstract class BaseMatrixStatsTestCase extends ESTestCase { protected final int numObs = atLeast(10000); protected final ArrayList fieldA = new ArrayList<>(numObs); protected final ArrayList fieldB = new ArrayList<>(numObs); diff --git a/modules/aggs-matrix-stats/src/test/java/org/elasticsearch/search/aggregations/matrix/stats/RunningStatsTests.java b/modules/aggs-matrix-stats/src/test/java/org/elasticsearch/search/aggregations/matrix/stats/RunningStatsTests.java index 2992fd57a7e..f90f00d2a79 100644 --- a/modules/aggs-matrix-stats/src/test/java/org/elasticsearch/search/aggregations/matrix/stats/RunningStatsTests.java +++ b/modules/aggs-matrix-stats/src/test/java/org/elasticsearch/search/aggregations/matrix/stats/RunningStatsTests.java @@ -18,13 +18,12 @@ */ package org.elasticsearch.search.aggregations.matrix.stats; -import java.util.HashMap; import java.util.List; /** * */ -public class RunningStatsTests extends MatrixStatsTestCase { +public class RunningStatsTests extends BaseMatrixStatsTestCase { /** test running stats */ public void testRunningStats() throws Exception { @@ -56,15 +55,18 @@ public class RunningStatsTests extends MatrixStatsTestCase { } private RunningStats createRunningStats(List fieldAObs, List fieldBObs) { - RunningStats stats = RunningStats.EMPTY(); + RunningStats stats = new RunningStats(); // create a document with two numeric fields - final HashMap doc = new HashMap<>(2); + final String[] fieldNames = new String[2]; + fieldNames[0] = fieldAKey; + fieldNames[1] = fieldBKey; + final double[] fieldVals = new double[2]; // running stats computation for (int n = 0; n < fieldAObs.size(); ++n) { - doc.put(fieldAKey, fieldAObs.get(n)); - doc.put(fieldBKey, fieldBObs.get(n)); - stats.add(doc); + fieldVals[0] = fieldAObs.get(n); + fieldVals[1] = fieldBObs.get(n); + stats.add(fieldNames, fieldVals); } return stats; } diff --git a/modules/aggs-matrix-stats/src/test/resources/rest-api-spec/test/stats/20_empty_bucket.yaml b/modules/aggs-matrix-stats/src/test/resources/rest-api-spec/test/stats/20_empty_bucket.yaml index d91c53e9e42..b6fade2cf18 100644 --- a/modules/aggs-matrix-stats/src/test/resources/rest-api-spec/test/stats/20_empty_bucket.yaml +++ b/modules/aggs-matrix-stats/src/test/resources/rest-api-spec/test/stats/20_empty_bucket.yaml @@ -43,6 +43,6 @@ search: index: empty_bucket_idx type: test - body: {"aggs": {"histo": {"histogram": {"field": "val1", "interval": 1, "min_doc_count": 0}, "aggs": { "mfs" : { "matrix_stats": {"field": ["value", "val1"]} } } } } } + body: {"aggs": {"histo": {"histogram": {"field": "val1", "interval": 1, "min_doc_count": 0}, "aggs": { "mfs" : { "matrix_stats": {"fields": ["value", "val1"]} } } } } } - match: {hits.total: 2} diff --git a/modules/aggs-matrix-stats/src/test/resources/rest-api-spec/test/stats/30_single_value_field.yaml b/modules/aggs-matrix-stats/src/test/resources/rest-api-spec/test/stats/30_single_value_field.yaml index 0fe3590af48..6c2098f181e 100644 --- a/modules/aggs-matrix-stats/src/test/resources/rest-api-spec/test/stats/30_single_value_field.yaml +++ b/modules/aggs-matrix-stats/src/test/resources/rest-api-spec/test/stats/30_single_value_field.yaml @@ -130,7 +130,7 @@ setup: search: index: unmapped type: test - body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["val1", "val2", "val3"]} } } } + body: {"aggs": { "mfs" : { "matrix_stats": {"fields": ["val1", "val2", "val3"]} } } } - match: {hits.total: 0} @@ -141,10 +141,10 @@ setup: search: index: test type: test - body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["val1", "val3"]} } } } + body: {"aggs": { "mfs" : { "matrix_stats": {"fields": ["val1", "val3"]} } } } - match: {hits.total: 15} - - match: {aggregations.mfs.count.0: 15} + - match: {aggregations.mfs.fields.0.count: 15} --- "Partially unmapped": @@ -153,32 +153,41 @@ setup: search: index: [test, unmapped] type: test - body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["val1", "val2", "val3"]} } } } + body: {"aggs": { "mfs" : { "matrix_stats": {"fields": ["val1", "val2", "val3"]} } } } - match: {hits.total: 15} - - match: {aggregations.mfs.count.0: 14} + - match: {aggregations.mfs.fields.0.count: 14} + - match: {aggregations.mfs.fields.2.correlation.val2: 0.9569513137793205} + +--- +"Partially unmapped with missing default": + + - do: + search: + index: [test, unmapped] + type: test + body: {"aggs": { "mfs" : { "matrix_stats": {"fields": ["val1", "val2", "val3"], "missing" : {"val2" : 10} } } } } + + - match: {hits.total: 15} + - match: {aggregations.mfs.fields.0.count: 15} + - match: {aggregations.mfs.fields.2.correlation.val2: 0.9567970467908384} --- "With script": - do: + catch: /parsing_exception/ search: index: test type: test - body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["val1", "val2"], "script" : { "my_script" : {"inline" : "1 + doc['val1'].value", "lang" : "js"} } } } } } - - - match: {hits.total: 15} - - match: {aggregations.mfs.count.0: 14} + body: {"aggs": { "mfs" : { "matrix_stats": {"fields": ["val1", "val2"], "script" : { "my_script" : {"inline" : "1 + doc['val1'].value", "lang" : "js"} } } } } } --- "With script params": - do: + catch: /parsing_exception/ search: index: test type: test - body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["val1", "val2", "val3"], "script" : { "my_script" : {"inline" : "my_var + doc['val1'].value", "params" : { "my_var" : 1 }, "lang" : "js" } } } } } } - - - match: {hits.total: 15} - - match: {aggregations.mfs.count.0: 14} - - match: {aggregations.mfs.correlation.1.2: 0.9569513137793205} + body: {"aggs": { "mfs" : { "matrix_stats": {"fields": ["val1", "val2", "val3"], "script" : { "my_script" : {"inline" : "my_var + doc['val1'].value", "params" : { "my_var" : 1 }, "lang" : "js" } } } } } } diff --git a/modules/aggs-matrix-stats/src/test/resources/rest-api-spec/test/stats/40_multi_value_field.yaml b/modules/aggs-matrix-stats/src/test/resources/rest-api-spec/test/stats/40_multi_value_field.yaml index 87b5dea19fe..10b722555d8 100644 --- a/modules/aggs-matrix-stats/src/test/resources/rest-api-spec/test/stats/40_multi_value_field.yaml +++ b/modules/aggs-matrix-stats/src/test/resources/rest-api-spec/test/stats/40_multi_value_field.yaml @@ -130,21 +130,35 @@ setup: search: index: unmapped type: test - body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["val1", "val2", "vals"]} } } } + body: {"aggs": { "mfs" : { "matrix_stats": {"fields": ["val1", "val2", "vals"]} } } } - match: {hits.total: 0} --- -"Multi value field": +"Multi value field Max": - do: search: index: test type: test - body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["val1", "val3"]} } } } + body: {"aggs": { "mfs" : { "matrix_stats": {"fields": ["val1", "vals"], "mode" : "max"} } } } - match: {hits.total: 15} - - match: {aggregations.mfs.count.0: 15} + - match: {aggregations.mfs.fields.0.count: 14} + - match: {aggregations.mfs.fields.0.correlation.val1: 0.06838646533369998} + +--- +"Multi value field Min": + + - do: + search: + index: test + type: test + body: {"aggs": { "mfs" : { "matrix_stats": {"fields": ["val1", "vals"], "mode" : "min"} } } } + + - match: {hits.total: 15} + - match: {aggregations.mfs.fields.0.count: 14} + - match: {aggregations.mfs.fields.0.correlation.val1: -0.09777682707831963} --- "Partially unmapped": @@ -153,32 +167,41 @@ setup: search: index: [test, unmapped] type: test - body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["val1", "val2", "vals"]} } } } + body: {"aggs": { "mfs" : { "matrix_stats": {"fields": ["val1", "val2", "vals"]} } } } - match: {hits.total: 15} - - match: {aggregations.mfs.count.0: 13} + - match: {aggregations.mfs.fields.0.count: 13} + - match: {aggregations.mfs.fields.0.correlation.val1: -0.044997535185684244} + +--- +"Partially unmapped with missing defaults": + + - do: + search: + index: [test, unmapped] + type: test + body: {"aggs": { "mfs" : { "matrix_stats": {"fields": ["val1", "val2", "vals"], "missing" : {"val2" : 10, "vals" : 5 } } } } } + + - match: {hits.total: 15} + - match: {aggregations.mfs.fields.0.count: 15} + - match: {aggregations.mfs.fields.0.correlation.val2: 0.04028024709708195} --- "With script": - do: + catch: /parsing_exception/ search: index: test type: test - body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["vals", "val3"], "script" : { "my_script" : {"inline" : "1 + doc['val1'].value", "lang" : "js"} } } } } } - - - match: {hits.total: 15} - - match: {aggregations.mfs.count.0: 14} + body: {"aggs": { "mfs" : { "matrix_stats": {"fields": ["vals", "val3"], "script" : { "my_script" : {"inline" : "1 + doc['val1'].value", "lang" : "js"} } } } } } --- "With script params": - do: + catch: /parsing_exception/ search: index: test type: test - body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["val1", "val3", "vals"], "script" : { "my_script" : {"inline" : "my_var + doc['val1'].value", "params" : { "my_var" : 1 }, "lang" : "js" } } } } } } - - - match: {hits.total: 15} - - match: {aggregations.mfs.count.0: 14} - - match: {aggregations.mfs.correlation.1.2: -0.055971032866899535} + body: {"aggs": { "mfs" : { "matrix_stats": {"fields": ["val1", "val3", "vals"], "script" : { "my_script" : {"inline" : "my_var + doc['val1'].value", "params" : { "my_var" : 1 }, "lang" : "js" } } } } } }