From d9f0245b108806fb9265250ff07fad8e04e272db Mon Sep 17 00:00:00 2001 From: Christos Soulios <1561376+csoulios@users.noreply.github.com> Date: Fri, 15 Nov 2019 14:36:21 +0200 Subject: [PATCH] [7.x] Implement stats aggregation for string terms (#49097) Backport of #47468 to 7.x This PR adds a new metric aggregation called string_stats that operates on string terms of a document and returns the following: min_length: The length of the shortest term max_length: The length of the longest term avg_length: The average length of all terms distribution: The probability distribution of all characters appearing in all terms entropy: The total Shannon entropy value calculated for all terms This aggregation has been implemented as an analytics plugin. --- docs/build.gradle | 2 +- docs/reference/aggregations/metrics.asciidoc | 2 + .../metrics/string-stats-aggregation.asciidoc | 217 +++++++++++++ .../AnalyticsAggregationBuilders.java | 7 +- .../xpack/analytics/AnalyticsPlugin.java | 28 +- .../stringstats/InternalStringStats.java | 287 ++++++++++++++++++ .../StringStatsAggregationBuilder.java | 126 ++++++++ .../stringstats/StringStatsAggregator.java | 175 +++++++++++ .../StringStatsAggregatorFactory.java | 56 ++++ .../StringStatsAggregatorTests.java | 262 ++++++++++++++++ 10 files changed, 1153 insertions(+), 9 deletions(-) create mode 100644 docs/reference/aggregations/metrics/string-stats-aggregation.asciidoc create mode 100644 x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/stringstats/InternalStringStats.java create mode 100644 x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/stringstats/StringStatsAggregationBuilder.java create mode 100644 x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/stringstats/StringStatsAggregator.java create mode 100644 x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/stringstats/StringStatsAggregatorFactory.java create mode 100644 
x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/stringstats/StringStatsAggregatorTests.java diff --git a/docs/build.gradle b/docs/build.gradle index 597e304cc54..f88d68d65f1 100644 --- a/docs/build.gradle +++ b/docs/build.gradle @@ -177,7 +177,7 @@ buildRestTests.setups['ledger'] = ''' {"index":{}} {"date": "2015/01/01 00:00:00", "amount": 200, "type": "sale", "description": "something"} {"index":{}} - {"date": "2015/01/01 00:00:00", "amount": 10, "type": "expense", "decription": "another thing"} + {"date": "2015/01/01 00:00:00", "amount": 10, "type": "expense", "description": "another thing"} {"index":{}} {"date": "2015/01/01 00:00:00", "amount": 150, "type": "sale", "description": "blah"} {"index":{}} diff --git a/docs/reference/aggregations/metrics.asciidoc b/docs/reference/aggregations/metrics.asciidoc index 1d3daa2ceca..5bcc96d9ae8 100644 --- a/docs/reference/aggregations/metrics.asciidoc +++ b/docs/reference/aggregations/metrics.asciidoc @@ -35,6 +35,8 @@ include::metrics/scripted-metric-aggregation.asciidoc[] include::metrics/stats-aggregation.asciidoc[] +include::metrics/string-stats-aggregation.asciidoc[] + include::metrics/sum-aggregation.asciidoc[] include::metrics/tophits-aggregation.asciidoc[] diff --git a/docs/reference/aggregations/metrics/string-stats-aggregation.asciidoc b/docs/reference/aggregations/metrics/string-stats-aggregation.asciidoc new file mode 100644 index 00000000000..be3a5a3b867 --- /dev/null +++ b/docs/reference/aggregations/metrics/string-stats-aggregation.asciidoc @@ -0,0 +1,217 @@ +[role="xpack"] +[testenv="basic"] +[[search-aggregations-metrics-string-stats-aggregation]] +=== String Stats Aggregation + +A `multi-value` metrics aggregation that computes statistics over string values extracted from the aggregated documents. +These values can be retrieved either from specific `keyword` fields in the documents or can be generated by a provided script. 
+ +The string stats aggregation returns the following results: + +* `count` - The number of non-empty fields counted. +* `min_length` - The length of the shortest term. +* `max_length` - The length of the longest term. +* `avg_length` - The average length computed over all terms. +* `entropy` - The https://en.wikipedia.org/wiki/Entropy_(information_theory)[Shannon Entropy] value computed over all terms collected by +the aggregation. Shannon entropy quantifies the amount of information contained in the field. It is a very useful metric for +measuring a wide range of properties of a data set, such as diversity, similarity, randomness etc. + +Assuming the data consists of twitter messages: + +[source,console] +-------------------------------------------------- +POST /twitter/_search?size=0 +{ + "aggs" : { + "message_stats" : { "string_stats" : { "field" : "message.keyword" } } + } +} +-------------------------------------------------- +// TEST[setup:twitter] + +The above aggregation computes the string statistics for the `message` field in all documents. The aggregation type +is `string_stats` and the `field` parameter defines the field of the documents the stats will be computed on. +The above will return the following: + +[source,console-result] +-------------------------------------------------- +{ + ... + + "aggregations": { + "message_stats" : { + "count" : 5, + "min_length" : 24, + "max_length" : 30, + "avg_length" : 28.8, + "entropy" : 3.94617750050791 + } + } +} +-------------------------------------------------- +// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/] + +The name of the aggregation (`message_stats` above) also serves as the key by which the aggregation result can be retrieved from +the returned response. + +==== Character distribution + +The computation of the Shannon Entropy value is based on the probability of each character appearing in all terms collected +by the aggregation. 
To view the probability distribution for all characters, we can add the `show_distribution` (default: `false`) parameter. + +[source,console] +-------------------------------------------------- +POST /twitter/_search?size=0 +{ + "aggs" : { + "message_stats" : { + "string_stats" : { + "field" : "message.keyword", + "show_distribution": true <1> + } + } + } +} +-------------------------------------------------- +// TEST[setup:twitter] + +<1> Set the `show_distribution` parameter to `true`, so that probability distribution for all characters is returned in the results. + +[source,console-result] +-------------------------------------------------- +{ + ... + + "aggregations": { + "message_stats" : { + "count" : 5, + "min_length" : 24, + "max_length" : 30, + "avg_length" : 28.8, + "entropy" : 3.94617750050791, + "distribution" : { + " " : 0.1527777777777778, + "e" : 0.14583333333333334, + "s" : 0.09722222222222222, + "m" : 0.08333333333333333, + "t" : 0.0763888888888889, + "h" : 0.0625, + "a" : 0.041666666666666664, + "i" : 0.041666666666666664, + "r" : 0.041666666666666664, + "g" : 0.034722222222222224, + "n" : 0.034722222222222224, + "o" : 0.034722222222222224, + "u" : 0.034722222222222224, + "b" : 0.027777777777777776, + "w" : 0.027777777777777776, + "c" : 0.013888888888888888, + "E" : 0.006944444444444444, + "l" : 0.006944444444444444, + "1" : 0.006944444444444444, + "2" : 0.006944444444444444, + "3" : 0.006944444444444444, + "4" : 0.006944444444444444, + "y" : 0.006944444444444444 + } + } + } +} +-------------------------------------------------- +// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/] + +The `distribution` object shows the probability of each character appearing in all terms. The characters are sorted by descending probability. 
+ +==== Script + +Computing the message string stats based on a script: + +[source,console] +-------------------------------------------------- +POST /twitter/_search?size=0 +{ + "aggs" : { + "message_stats" : { + "string_stats" : { + "script" : { + "lang": "painless", + "source": "doc['message.keyword'].value" + } + } + } + } +} +-------------------------------------------------- +// TEST[setup:twitter] + +This will interpret the `script` parameter as an `inline` script with the `painless` script language and no script parameters. +To use a stored script, use the following syntax: + +[source,console] +-------------------------------------------------- +POST /twitter/_search?size=0 +{ + "aggs" : { + "message_stats" : { + "string_stats" : { + "script" : { + "id": "my_script", + "params" : { + "field" : "message.keyword" + } + } + } + } + } +} +-------------------------------------------------- +// TEST[setup:twitter,stored_example_script] + +===== Value Script + +We can use a value script to modify the message (e.g. we can add a prefix) and compute the new stats: + +[source,console] +-------------------------------------------------- +POST /twitter/_search?size=0 +{ + "aggs" : { + "message_stats" : { + "string_stats" : { + "field" : "message.keyword", + "script" : { + "lang": "painless", + "source": "params.prefix + _value", + "params" : { + "prefix" : "Message: " + } + } + } + } + } +} +-------------------------------------------------- +// TEST[setup:twitter] + +==== Missing value + +The `missing` parameter defines how documents that are missing a value should be treated. +By default, they will be ignored, but it is also possible to treat them as if they had a value. 
+ +[source,console] +-------------------------------------------------- +POST /twitter/_search?size=0 +{ + "aggs" : { + "message_stats" : { + "string_stats" : { + "field" : "message.keyword", + "missing": "[empty message]" <1> + } + } + } +} +-------------------------------------------------- +// TEST[setup:twitter] + +<1> Documents without a value in the `message` field will be treated as documents that have the value `[empty message]`. diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsAggregationBuilders.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsAggregationBuilders.java index e6a7b8cb9c2..ede8d965741 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsAggregationBuilders.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsAggregationBuilders.java @@ -6,10 +6,15 @@ package org.elasticsearch.xpack.analytics; import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder; +import org.elasticsearch.xpack.analytics.stringstats.StringStatsAggregationBuilder; public class AnalyticsAggregationBuilders { - public static CumulativeCardinalityPipelineAggregationBuilder cumulativeCaardinality(String name, String bucketsPath) { + public static CumulativeCardinalityPipelineAggregationBuilder cumulativeCardinality(String name, String bucketsPath) { return new CumulativeCardinalityPipelineAggregationBuilder(name, bucketsPath); } + + public static StringStatsAggregationBuilder stringStats(String name) { + return new StringStatsAggregationBuilder(name); + } } diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java index 6dbe0ff05a6..3beed3712cd 100644 --- 
a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java @@ -13,11 +13,13 @@ import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.SearchPlugin; -import org.elasticsearch.xpack.core.XPackPlugin; -import org.elasticsearch.xpack.core.analytics.action.AnalyticsStatsAction; import org.elasticsearch.xpack.analytics.action.TransportAnalyticsStatsAction; import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder; import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregator; +import org.elasticsearch.xpack.analytics.stringstats.InternalStringStats; +import org.elasticsearch.xpack.analytics.stringstats.StringStatsAggregationBuilder; +import org.elasticsearch.xpack.core.XPackPlugin; +import org.elasticsearch.xpack.core.analytics.action.AnalyticsStatsAction; import java.util.ArrayList; import java.util.Collection; @@ -40,11 +42,23 @@ public class AnalyticsPlugin extends Plugin implements SearchPlugin, ActionPlugi @Override public List getPipelineAggregations() { - return singletonList(new PipelineAggregationSpec( - CumulativeCardinalityPipelineAggregationBuilder.NAME, - CumulativeCardinalityPipelineAggregationBuilder::new, - CumulativeCardinalityPipelineAggregator::new, - CumulativeCardinalityPipelineAggregationBuilder::parse)); + return singletonList( + new PipelineAggregationSpec( + CumulativeCardinalityPipelineAggregationBuilder.NAME, + CumulativeCardinalityPipelineAggregationBuilder::new, + CumulativeCardinalityPipelineAggregator::new, + CumulativeCardinalityPipelineAggregationBuilder::parse) + ); + } + + @Override + public List getAggregations() { + return singletonList( + new AggregationSpec( + 
StringStatsAggregationBuilder.NAME, + StringStatsAggregationBuilder::new, + StringStatsAggregationBuilder::parse).addResultReader(InternalStringStats::new) + ); } @Override diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/stringstats/InternalStringStats.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/stringstats/InternalStringStats.java new file mode 100644 index 00000000000..88cb505615c --- /dev/null +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/stringstats/InternalStringStats.java @@ -0,0 +1,287 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.analytics.stringstats; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.metrics.CompensatedSum; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; + +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +public class InternalStringStats extends InternalAggregation { + + enum Metrics { + count { + Object getFieldValue(InternalStringStats stats) { + return stats.getCount(); + } + }, + min_length { + Object getFieldValue(InternalStringStats stats) { + return stats.getMinLength(); + } + }, max_length { + Object getFieldValue(InternalStringStats stats) { + return stats.getMaxLength(); + } + }, + avg_length { 
+ Object getFieldValue(InternalStringStats stats) { + return stats.getAvgLength(); + } + }, + entropy { + Object getFieldValue(InternalStringStats stats) { + return stats.getEntropy(); + } + }; + + abstract Object getFieldValue(InternalStringStats stats); + } + + private final DocValueFormat format; + private final boolean showDistribution; + private final long count; + private final long totalLength; + private final int minLength; + private final int maxLength; + private final Map charOccurrences; + + public InternalStringStats(String name, long count, long totalLength, int minLength, int maxLength, + Map charOccurences, boolean showDistribution, + DocValueFormat formatter, + List pipelineAggregators, + Map metaData) { + super(name, pipelineAggregators, metaData); + this.format = formatter; + this.showDistribution = showDistribution; + this.count = count; + this.totalLength = totalLength; + this.minLength = minLength; + this.maxLength = maxLength; + this.charOccurrences = charOccurences; + } + + /** Read from a stream. 
*/ + public InternalStringStats(StreamInput in) throws IOException { + super(in); + format = in.readNamedWriteable(DocValueFormat.class); + showDistribution = in.readBoolean(); + count = in.readVLong(); + totalLength = in.readVLong(); + minLength = in.readVInt(); + maxLength = in.readVInt(); + charOccurrences = in.readMap(StreamInput::readString, StreamInput::readLong); + } + + @Override + protected final void doWriteTo(StreamOutput out) throws IOException { + out.writeNamedWriteable(format); + out.writeBoolean(showDistribution); + out.writeVLong(count); + out.writeVLong(totalLength); + out.writeVInt(minLength); + out.writeVInt(maxLength); + out.writeMap(charOccurrences, StreamOutput::writeString, StreamOutput::writeLong); + } + + public String getWriteableName() { + return StringStatsAggregationBuilder.NAME; + } + + public long getCount() { + return count; + } + + public int getMinLength() { + return minLength; + } + + public int getMaxLength() { + return maxLength; + } + + public double getAvgLength() { + return (double) totalLength / count; + } + + public double getEntropy() { + // Compute the sum of double values with Kahan summation algorithm which is more + // accurate than naive summation. + CompensatedSum kahanSummation = new CompensatedSum(0, 0); + + for (double p : getDistribution().values()) { + if (p > 0) { + double value = p * log2(p); + kahanSummation.add(value); + } + } + return -kahanSummation.value(); + } + + /** + * Convert the character occurrences map to character frequencies. + * + * @return A map with the character as key and the probability of + * this character to occur as value. The map is ordered by frequency descending. 
+ */ + Map getDistribution() { + return charOccurrences.entrySet().stream() + .sorted((e1, e2) -> e2.getValue().compareTo(e1.getValue())) + .collect( + Collectors.toMap(e -> e.getKey(), e -> (double) e.getValue() / totalLength, + (e1, e2) -> e2, LinkedHashMap::new) + ); + } + + /** Calculate base 2 logarithm */ + static double log2(double d) { + return Math.log(d) / Math.log(2.0); + } + + public String getCountAsString() { + return format.format(getCount()).toString(); + } + + public String getMinLengthAsString() { + return format.format(getMinLength()).toString(); + } + + public String getMaxLengthAsString() { + return format.format(getMaxLength()).toString(); + } + + public String getAvgLengthAsString() { + return format.format(getAvgLength()).toString(); + } + + public String getEntropyAsString() { + return format.format(getEntropy()).toString(); + } + + public Object value(String name) { + try { + return Metrics.valueOf(name).getFieldValue(this); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Unknown value [" + name + "] in string stats aggregation"); + } + } + + @Override + public InternalStringStats doReduce(List aggregations, ReduceContext reduceContext) { + long count = 0; + long totalLength = 0; + int minLength = Integer.MAX_VALUE; + int maxLength = Integer.MIN_VALUE; + Map occurs = new HashMap<>(); + + for (InternalAggregation aggregation : aggregations) { + InternalStringStats stats = (InternalStringStats) aggregation; + count += stats.getCount(); + minLength = Math.min(minLength, stats.getMinLength()); + maxLength = Math.max(maxLength, stats.getMaxLength()); + totalLength += stats.totalLength; + stats.charOccurrences.forEach((k, v) -> + occurs.merge(k, v, (oldValue, newValue) -> oldValue + newValue) + ); + } + + return new InternalStringStats(name, count, totalLength, minLength, maxLength, occurs, + showDistribution, format, pipelineAggregators(), getMetaData()); + } + + @Override + public Object getProperty(List path) { 
+ if (path.isEmpty()) { + return this; + } else if (path.size() == 1) { + return value(path.get(0)); + } else { + throw new IllegalArgumentException("path not supported for [" + getName() + "]: " + path); + } + } + + static class Fields { + public static final ParseField COUNT = new ParseField("count"); + public static final ParseField MIN_LENGTH = new ParseField("min_length"); + public static final ParseField MIN_LENGTH_AS_STRING = new ParseField("min_length_as_string"); + public static final ParseField MAX_LENGTH = new ParseField("max_length"); + public static final ParseField MAX_LENGTH_AS_STRING = new ParseField("max_as_string"); + public static final ParseField AVG_LENGTH = new ParseField("avg_length"); + public static final ParseField AVG_LENGTH_AS_STRING = new ParseField("avg_length_as_string"); + public static final ParseField ENTROPY = new ParseField("entropy"); + public static final ParseField ENTROPY_AS_STRING = new ParseField("entropy_string"); + public static final ParseField DISTRIBUTION = new ParseField("distribution"); + public static final ParseField DISTRIBUTION_AS_STRING = new ParseField("distribution_string"); + } + + @Override + public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { + builder.field(Fields.COUNT.getPreferredName(), count); + if (count > 0) { + builder.field(Fields.MIN_LENGTH.getPreferredName(), minLength); + builder.field(Fields.MAX_LENGTH.getPreferredName(), maxLength); + builder.field(Fields.AVG_LENGTH.getPreferredName(), getAvgLength()); + builder.field(Fields.ENTROPY.getPreferredName(), getEntropy()); + if (showDistribution == true) { + builder.field(Fields.DISTRIBUTION.getPreferredName(), getDistribution()); + } + if (format != DocValueFormat.RAW) { + builder.field(Fields.MIN_LENGTH_AS_STRING.getPreferredName(), format.format(getMinLength())); + builder.field(Fields.MAX_LENGTH_AS_STRING.getPreferredName(), format.format(getMaxLength())); + 
builder.field(Fields.AVG_LENGTH_AS_STRING.getPreferredName(), format.format(getAvgLength())); + builder.field(Fields.ENTROPY_AS_STRING.getPreferredName(), format.format(getEntropy())); + if (showDistribution == true) { + builder.startObject(Fields.DISTRIBUTION_AS_STRING.getPreferredName()); + for (Map.Entry e: getDistribution().entrySet()) { + builder.field(e.getKey(), format.format(e.getValue()).toString()); + } + builder.endObject(); + } + } + } else { + builder.nullField(Fields.MIN_LENGTH.getPreferredName()); + builder.nullField(Fields.MAX_LENGTH.getPreferredName()); + builder.nullField(Fields.AVG_LENGTH.getPreferredName()); + builder.field(Fields.ENTROPY.getPreferredName(), 0.0); + + if (showDistribution == true) { + builder.nullField(Fields.DISTRIBUTION.getPreferredName()); + } + } + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), count, minLength, maxLength, totalLength, charOccurrences, showDistribution); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + if (super.equals(obj) == false) return false; + + InternalStringStats other = (InternalStringStats) obj; // compare every field that hashCode() hashes + return count == other.count && + minLength == other.minLength && + maxLength == other.maxLength && + totalLength == other.totalLength && + showDistribution == other.showDistribution && Objects.equals(charOccurrences, other.charOccurrences); + } +} diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/stringstats/StringStatsAggregationBuilder.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/stringstats/StringStatsAggregationBuilder.java new file mode 100644 index 00000000000..59097a69e16 --- /dev/null +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/stringstats/StringStatsAggregationBuilder.java @@ -0,0 +1,126 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. 
under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.analytics.stringstats; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.query.QueryShardContext; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.support.ValueType; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.aggregations.support.ValuesSourceParserHelper; +import org.elasticsearch.search.aggregations.support.ValuesSourceType; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +public class StringStatsAggregationBuilder extends ValuesSourceAggregationBuilder { + + public static final String NAME = "string_stats"; + private boolean showDistribution = false; + + private static final ObjectParser PARSER; + private static final ParseField SHOW_DISTRIBUTION_FIELD = new ParseField("show_distribution"); + + static { + PARSER = new ObjectParser<>(StringStatsAggregationBuilder.NAME); + ValuesSourceParserHelper.declareBytesFields(PARSER, true, true); + + PARSER.declareBoolean(StringStatsAggregationBuilder::showDistribution, StringStatsAggregationBuilder.SHOW_DISTRIBUTION_FIELD); + } + + public static StringStatsAggregationBuilder 
parse(String aggregationName, XContentParser parser) throws IOException { + return PARSER.parse(parser, new StringStatsAggregationBuilder(aggregationName), null); + } + + public StringStatsAggregationBuilder(String name) { + super(name, ValuesSourceType.BYTES, ValueType.STRING); + } + + public StringStatsAggregationBuilder(StringStatsAggregationBuilder clone, + AggregatorFactories.Builder factoriesBuilder, + Map metaData) { + super(clone, factoriesBuilder, metaData); + this.showDistribution = clone.showDistribution(); + } + + /** Read from a stream. */ + public StringStatsAggregationBuilder(StreamInput in) throws IOException { + super(in, ValuesSourceType.BYTES, ValueType.STRING); + this.showDistribution = in.readBoolean(); + } + + @Override + protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map metaData) { + return new StringStatsAggregationBuilder(this, factoriesBuilder, metaData); + } + + @Override + protected void innerWriteTo(StreamOutput out) throws IOException { + out.writeBoolean(showDistribution); + } + + @Override + protected StringStatsAggregatorFactory innerBuild(QueryShardContext queryShardContext, + ValuesSourceConfig config, + AggregatorFactory parent, + AggregatorFactories.Builder subFactoriesBuilder) throws IOException { + return new StringStatsAggregatorFactory(name, config, showDistribution, queryShardContext, parent, subFactoriesBuilder, metaData); + } + + @Override + public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { + builder.field(StringStatsAggregationBuilder.SHOW_DISTRIBUTION_FIELD.getPreferredName(), showDistribution); + + return builder; + } + + @Override + public String getType() { + return NAME; + } + + /** + * Return whether to include the probability distribution of each character in the results. + * {@code showDistribution} is true, distribution will be included. 
+ */ + public boolean showDistribution() { + return showDistribution; + } + + /** + * Set whether to include the probability distribution of each character in the results. + * + * @return the builder so that calls can be chained + */ + public StringStatsAggregationBuilder showDistribution(boolean showDistribution) { + this.showDistribution = showDistribution; + return this; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), showDistribution); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + if (super.equals(obj) == false) return false; + StringStatsAggregationBuilder other = (StringStatsAggregationBuilder) obj; + return showDistribution == other.showDistribution; + } +} diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/stringstats/StringStatsAggregator.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/stringstats/StringStatsAggregator.java new file mode 100644 index 00000000000..91f40de16ea --- /dev/null +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/stringstats/StringStatsAggregator.java @@ -0,0 +1,175 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.analytics.stringstats; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.IntArray; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.index.fielddata.SortedBinaryDocValues; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.metrics.MetricsAggregator; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Metric aggregator that operates over strings and returns statistics such as + * minimum length, maximum length, average length, the total Shannon entropy and + * probability distribution for each character appearing in all terms. + */ +public class StringStatsAggregator extends MetricsAggregator { + + final ValuesSource.Bytes valuesSource; + final DocValueFormat format; + + /** Option to show the probability distribution for each character appearing in all terms. */ + private final boolean showDistribution; + + LongArray count; + IntArray minLength; + IntArray maxLength; + /** Accumulates the total length of all fields. Used to calculate average length and char frequencies. */ + LongArray totalLength; + /** Map that stores the number of occurrences for each character. 
*/ + Map charOccurrences; + + StringStatsAggregator(String name, boolean showDistribution, ValuesSource.Bytes valuesSource, DocValueFormat format, + SearchContext context, Aggregator parent, + List pipelineAggregators, Map metaData) throws IOException { + super(name, context, parent, pipelineAggregators, metaData); + this.showDistribution = showDistribution; + this.valuesSource = valuesSource; + if (valuesSource != null) { + final BigArrays bigArrays = context.bigArrays(); + count = bigArrays.newLongArray(1, true); + totalLength = bigArrays.newLongArray(1, true); + minLength = bigArrays.newIntArray(1, false); + minLength.fill(0, minLength.size(), Integer.MAX_VALUE); + maxLength = bigArrays.newIntArray(1, false); + maxLength.fill(0, maxLength.size(), Integer.MIN_VALUE); + charOccurrences = new HashMap<>(); + } + this.format = format; + } + + @Override + public ScoreMode scoreMode() { + return (valuesSource != null && valuesSource.needsScores()) ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES; + } + + @Override + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, + final LeafBucketCollector sub) throws IOException { + if (valuesSource == null) { + return LeafBucketCollector.NO_OP_COLLECTOR; + } + final BigArrays bigArrays = context.bigArrays(); + final SortedBinaryDocValues values = valuesSource.bytesValues(ctx); + + return new LeafBucketCollectorBase(sub, values) { + @Override + public void collect(int doc, long bucket) throws IOException { + final long overSize = BigArrays.overSize(bucket + 1); + if (bucket >= count.size()) { + final long from = count.size(); + count = bigArrays.resize(count, overSize); + totalLength = bigArrays.resize(totalLength, overSize); + minLength = bigArrays.resize(minLength, overSize); + maxLength = bigArrays.resize(maxLength, overSize); + minLength.fill(from, overSize, Integer.MAX_VALUE); + maxLength.fill(from, overSize, Integer.MIN_VALUE); + } + + if (values.advanceExact(doc)) { + final int valuesCount = 
values.docValueCount(); + count.increment(bucket, valuesCount); + + for (int i = 0; i < valuesCount; i++) { + BytesRef value = values.nextValue(); + if (value.length > 0) { + String valueStr = value.utf8ToString(); + int length = valueStr.length(); + totalLength.increment(bucket, length); + + // Update min/max length for string + int min = Math.min(minLength.get(bucket), length); + int max = Math.max(maxLength.get(bucket), length); + minLength.set(bucket, min); + maxLength.set(bucket, max); + + // Parse string chars and count occurrences + for (Character c : valueStr.toCharArray()) { + LongArray occ = charOccurrences.get(c); + if (occ == null) { + occ = bigArrays.newLongArray(overSize, true); + } else { + if (bucket >= occ.size()) { + occ = bigArrays.resize(occ, overSize); + } + } + occ.increment(bucket, 1); + charOccurrences.put(c, occ); + } + } + } + } + } + }; + } + + @Override + public InternalAggregation buildAggregation(long bucket) { + if (valuesSource == null || bucket >= count.size()) { + return buildEmptyAggregation(); + } + + // Convert Map entries: Character -> String and LongArray -> Long + // Include only characters that have at least one occurrence + Map occurrences = new HashMap<>(charOccurrences.size()); + for (Map.Entry e : charOccurrences.entrySet()) { + if (e.getValue().size() > bucket) { + long occ = e.getValue().get(bucket); + if (occ > 0) { + occurrences.put(e.getKey().toString(), occ); + } + } + } + + return new InternalStringStats(name, count.get(bucket), totalLength.get(bucket), + minLength.get(bucket), maxLength.get(bucket), occurrences, showDistribution, + format, pipelineAggregators(), metaData()); + } + + @Override + public InternalAggregation buildEmptyAggregation() { + return new InternalStringStats(name, + 0, 0, Integer.MAX_VALUE, Integer.MIN_VALUE, + Collections.emptyMap(), showDistribution, format, + pipelineAggregators(), metaData()); + } + + @Override + public void doClose() { + Releasables.close(maxLength, minLength, count, 
totalLength); + if (charOccurrences != null) { + Releasables.close(charOccurrences.values()); + } + } +} diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/stringstats/StringStatsAggregatorFactory.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/stringstats/StringStatsAggregatorFactory.java new file mode 100644 index 00000000000..5a80bc0f589 --- /dev/null +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/stringstats/StringStatsAggregatorFactory.java @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.analytics.stringstats; + +import org.elasticsearch.index.query.QueryShardContext; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +class StringStatsAggregatorFactory extends ValuesSourceAggregatorFactory { + + private final boolean showDistribution; + + StringStatsAggregatorFactory(String name, ValuesSourceConfig config, + Boolean showDistribution, + QueryShardContext queryShardContext, + AggregatorFactory parent, AggregatorFactories.Builder subFactoriesBuilder, Map metaData) + throws IOException { + super(name, config, queryShardContext, parent, subFactoriesBuilder, 
metaData); + this.showDistribution = showDistribution; + } + + @Override + protected Aggregator createUnmapped(SearchContext searchContext, + Aggregator parent, + List pipelineAggregators, + Map metaData) throws IOException { + return new StringStatsAggregator(name, showDistribution,null, config.format(), searchContext, parent, + pipelineAggregators, metaData); + } + + @Override + protected Aggregator doCreateInternal(ValuesSource.Bytes valuesSource, + SearchContext searchContext, + Aggregator parent, + boolean collectsFromSingleBucket, + List pipelineAggregators, + Map metaData) throws IOException { + return new StringStatsAggregator(name, showDistribution, valuesSource, config.format(), searchContext, parent, + pipelineAggregators, metaData); + } + +} diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/stringstats/StringStatsAggregatorTests.java b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/stringstats/StringStatsAggregatorTests.java new file mode 100644 index 00000000000..5e565cc833c --- /dev/null +++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/stringstats/StringStatsAggregatorTests.java @@ -0,0 +1,262 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.analytics.stringstats; + +import org.apache.lucene.document.Field; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.TextField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TermInSetQuery; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.index.mapper.TextFieldMapper; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator; +import org.elasticsearch.search.aggregations.support.ValueType; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.function.Consumer; + +import static java.util.Collections.singleton; + +public class StringStatsAggregatorTests extends AggregatorTestCase { + + private void testCase(Query query, + CheckedConsumer buildIndex, + Consumer verify) throws IOException { + TextFieldMapper.TextFieldType fieldType = new TextFieldMapper.TextFieldType(); + fieldType.setName("text"); + fieldType.setFielddata(true); + + AggregationBuilder aggregationBuilder = new StringStatsAggregationBuilder("_name").field("text"); + testCase(aggregationBuilder, query, buildIndex, verify, fieldType); + } + + private void testCase(AggregationBuilder 
aggregationBuilder, Query query, + CheckedConsumer buildIndex, + Consumer verify, MappedFieldType fieldType) throws IOException { + Directory directory = newDirectory(); + RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory); + buildIndex.accept(indexWriter); + indexWriter.close(); + + IndexReader indexReader = DirectoryReader.open(directory); + IndexSearcher indexSearcher = newSearcher(indexReader, true, true); + + StringStatsAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType); + aggregator.preCollection(); + indexSearcher.search(query, aggregator); + aggregator.postCollection(); + verify.accept((InternalStringStats) aggregator.buildAggregation(0L)); + + indexReader.close(); + directory.close(); + } + + public void testNoDocs() throws IOException { + this.testCase(new MatchAllDocsQuery(), iw -> { + // Intentionally not writing any docs + }, stats -> { + assertEquals(0, stats.getCount()); + assertEquals(Integer.MIN_VALUE, stats.getMaxLength()); + assertEquals(Integer.MAX_VALUE, stats.getMinLength()); + assertEquals(Double.NaN, stats.getAvgLength(), 0); + assertTrue(stats.getDistribution().isEmpty()); + assertEquals(0.0, stats.getEntropy(), 0); + }); + } + + public void testUnmappedField() throws IOException { + StringStatsAggregationBuilder aggregationBuilder = new StringStatsAggregationBuilder("_name").field("text"); + testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + for(int i = 0; i < 10; i++) { + iw.addDocument(singleton(new TextField("text", "test" + i, Field.Store.NO))); + } + }, stats -> { + assertEquals(0, stats.getCount()); + assertEquals(Integer.MIN_VALUE, stats.getMaxLength()); + assertEquals(Integer.MAX_VALUE, stats.getMinLength()); + assertEquals(Double.NaN, stats.getAvgLength(), 0); + assertTrue(stats.getDistribution().isEmpty()); + assertEquals(0.0, stats.getEntropy(), 0); + + }, null); + } + + public void testUnmappedWithMissingField() throws IOException { + 
StringStatsAggregationBuilder aggregationBuilder = new StringStatsAggregationBuilder("_name") + .field("text") + .missing("abca"); + testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + for(int i=0; i < 10; i++) { + iw.addDocument(singleton(new TextField("text", "test" + i, Field.Store.NO))); + } + }, stats -> { + assertEquals(10, stats.getCount()); + assertEquals(4, stats.getMaxLength()); + assertEquals(4, stats.getMinLength()); + assertEquals(4.0, stats.getAvgLength(), 0); + assertEquals(3, stats.getDistribution().size()); + assertEquals(0.50, stats.getDistribution().get("a"), 0); + assertEquals(0.25, stats.getDistribution().get("b"), 0); + assertEquals(0.25, stats.getDistribution().get("c"), 0); + assertEquals(1.5, stats.getEntropy(), 0); + }, null); + } + + public void testSingleValuedField() throws IOException { + testCase(new MatchAllDocsQuery(), iw -> { + for(int i=0; i < 10; i++) { + iw.addDocument(singleton(new TextField("text", "test" + i, Field.Store.NO))); + } + }, stats -> { + assertEquals(10, stats.getCount()); + assertEquals(5, stats.getMaxLength()); + assertEquals(5, stats.getMinLength()); + assertEquals(5.0, stats.getAvgLength(), 0); + assertEquals(13, stats.getDistribution().size()); + assertEquals(0.4, stats.getDistribution().get("t"), 0); + assertEquals(0.2, stats.getDistribution().get("e"), 0); + assertEquals(0.02, stats.getDistribution().get("0"), 0); + assertEquals(2.58631, stats.getEntropy(), 0.00001); + }); + } + + public void testNoMatchingField() throws IOException { + testCase(new MatchAllDocsQuery(), iw -> { + for(int i=0; i < 10; i++) { + iw.addDocument(singleton(new TextField("wrong_field", "test" + i, Field.Store.NO))); + } + }, stats -> { + assertEquals(0, stats.getCount()); + assertEquals(Integer.MIN_VALUE, stats.getMaxLength()); + assertEquals(Integer.MAX_VALUE, stats.getMinLength()); + assertEquals(Double.NaN, stats.getAvgLength(), 0); + assertTrue(stats.getDistribution().isEmpty()); + assertEquals(0.0, 
stats.getEntropy(), 0); + }); + } + + public void testQueryFiltering() throws IOException { + testCase(new TermInSetQuery("text", new BytesRef("test0"), new BytesRef("test1")), iw -> { + for(int i=0; i < 10; i++) { + iw.addDocument(singleton(new TextField("text", "test" + i, Field.Store.NO))); + } + }, stats -> { + assertEquals(2, stats.getCount()); + assertEquals(5, stats.getMaxLength()); + assertEquals(5, stats.getMinLength()); + assertEquals(5.0, stats.getAvgLength(), 0); + assertEquals(5, stats.getDistribution().size()); + assertEquals(0.4, stats.getDistribution().get("t"), 0); + assertEquals(0.2, stats.getDistribution().get("e"), 0); + assertEquals(0.1, stats.getDistribution().get("0"), 0); + assertEquals(2.12193, stats.getEntropy(), 0.00001); + }); + } + + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/47469") + public void testSingleValuedFieldWithFormatter() throws IOException { + TextFieldMapper.TextFieldType fieldType = new TextFieldMapper.TextFieldType(); + fieldType.setName("text"); + fieldType.setFielddata(true); + + StringStatsAggregationBuilder aggregationBuilder = new StringStatsAggregationBuilder("_name") + .field("text") + .format("0000.00") + .showDistribution(true); + + testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + for(int i=0; i < 10; i++) { + iw.addDocument(singleton(new TextField("text", "test" + i, Field.Store.NO))); + } + }, stats -> { + assertEquals("0010.00", stats.getCountAsString()); + assertEquals("0005.00", stats.getMaxLengthAsString()); + assertEquals("0005.00", stats.getMinLengthAsString()); + assertEquals("0005.00", stats.getAvgLengthAsString()); + assertEquals("0002.58", stats.getEntropyAsString()); + }, fieldType); + } + + /** + * Test a string_stats aggregation as a subaggregation of a terms aggregation + */ + public void testNestedAggregation() throws IOException { + MappedFieldType numericFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.INTEGER); + 
numericFieldType.setName("value"); + numericFieldType.setHasDocValues(true); + + TextFieldMapper.TextFieldType textFieldType = new TextFieldMapper.TextFieldType(); + textFieldType.setName("text"); + textFieldType.setFielddata(true); + + TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("terms", ValueType.NUMERIC) + .field("value") + .subAggregation(new StringStatsAggregationBuilder("text_stats").field("text").valueType(ValueType.STRING)); + + Directory directory = newDirectory(); + RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory); + final int numDocs = 10; + for (int i = 0; i < numDocs; i++) { + for (int j = 0; j < 4; j++) + indexWriter.addDocument(Arrays.asList( + new NumericDocValuesField("value", i + 1), + new TextField("text", "test" + j, Field.Store.NO)) + ); + } + indexWriter.close(); + + IndexReader indexReader = DirectoryReader.open(directory); + IndexSearcher indexSearcher = newSearcher(indexReader, true, true); + + TermsAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, numericFieldType, textFieldType); + aggregator.preCollection(); + indexSearcher.search(new MatchAllDocsQuery(), aggregator); + aggregator.postCollection(); + + Terms terms = (Terms) aggregator.buildAggregation(0L); + assertNotNull(terms); + List buckets = terms.getBuckets(); + assertNotNull(buckets); + assertEquals(10, buckets.size()); + + for (int i = 0; i < 10; i++) { + Terms.Bucket bucket = buckets.get(i); + assertNotNull(bucket); + assertEquals((long) i + 1, bucket.getKeyAsNumber()); + assertEquals(4L, bucket.getDocCount()); + + InternalStringStats stats = bucket.getAggregations().get("text_stats"); + assertNotNull(stats); + assertEquals(4L, stats.getCount()); + assertEquals(5, stats.getMaxLength()); + assertEquals(5, stats.getMinLength()); + assertEquals(5.0, stats.getAvgLength(), 0); + assertEquals(7, stats.getDistribution().size()); + assertEquals(0.4, stats.getDistribution().get("t"), 0); + assertEquals(0.2, 
stats.getDistribution().get("e"), 0); + assertEquals(2.32193, stats.getEntropy(), 0.00001); + } + + indexReader.close(); + directory.close(); + } + +}