[7.x] Implement stats aggregation for string terms (#49097)

Backport of #47468 to 7.x

This PR adds a new metric aggregation called `string_stats` that operates on string terms of a document and returns the following:

* `min_length`: The length of the shortest term
* `max_length`: The length of the longest term
* `avg_length`: The average length of all terms
* `distribution`: The probability distribution of all characters appearing in all terms
* `entropy`: The total Shannon entropy value calculated for all terms

This aggregation has been implemented as an analytics plugin.
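
For example, a minimal request (mirroring the documentation added in this PR):

[source,console]
--------------------------------------------------
POST /twitter/_search?size=0
{
  "aggs" : {
    "message_stats" : { "string_stats" : { "field" : "message.keyword" } }
  }
}
--------------------------------------------------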
Christos Soulios authored 2019-11-15 14:36:21 +02:00 (committed via GitHub)
parent 085d08cfd1
commit d9f0245b10
10 changed files with 1153 additions and 9 deletions

docs/build.gradle

@@ -177,7 +177,7 @@ buildRestTests.setups['ledger'] = '''
 {"index":{}}
 {"date": "2015/01/01 00:00:00", "amount": 200, "type": "sale", "description": "something"}
 {"index":{}}
-{"date": "2015/01/01 00:00:00", "amount": 10, "type": "expense", "decription": "another thing"}
+{"date": "2015/01/01 00:00:00", "amount": 10, "type": "expense", "description": "another thing"}
 {"index":{}}
 {"date": "2015/01/01 00:00:00", "amount": 150, "type": "sale", "description": "blah"}
 {"index":{}}

docs/reference/aggregations/metrics.asciidoc

@@ -35,6 +35,8 @@ include::metrics/scripted-metric-aggregation.asciidoc[]
 include::metrics/stats-aggregation.asciidoc[]
+include::metrics/string-stats-aggregation.asciidoc[]
 include::metrics/sum-aggregation.asciidoc[]
 include::metrics/tophits-aggregation.asciidoc[]

docs/reference/aggregations/metrics/string-stats-aggregation.asciidoc

@@ -0,0 +1,217 @@
[role="xpack"]
[testenv="basic"]
[[search-aggregations-metrics-string-stats-aggregation]]
=== String Stats Aggregation
A `multi-value` metrics aggregation that computes statistics over string values extracted from the aggregated documents.
These values can be retrieved either from specific `keyword` fields in the documents or can be generated by a provided script.
The string stats aggregation returns the following results:
* `count` - The number of non-empty fields counted.
* `min_length` - The length of the shortest term.
* `max_length` - The length of the longest term.
* `avg_length` - The average length computed over all terms.
* `entropy` - The https://en.wikipedia.org/wiki/Entropy_(information_theory)[Shannon Entropy] value computed over all terms collected by
the aggregation. Shannon entropy quantifies the amount of information contained in the field. It is a very useful metric for
measuring a wide range of properties of a data set, such as diversity, similarity, randomness, etc. (a short worked example follows this list).
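Concretely, `entropy = -Σ p(c) × log2(p(c))`, where the sum runs over every character `c` collected by the aggregation and `p(c)` is that character's probability. For example, a collection consisting of the single term `aab` has p(a) = 2/3 and p(b) = 1/3, giving -(2/3 × log2(2/3) + 1/3 × log2(1/3)) ≈ 0.918 bits.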
Assuming the data consists of twitter messages:
[source,console]
--------------------------------------------------
POST /twitter/_search?size=0
{
  "aggs" : {
    "message_stats" : { "string_stats" : { "field" : "message.keyword" } }
  }
}
--------------------------------------------------
// TEST[setup:twitter]
The above aggregation computes the string statistics for the `message` field in all documents. The aggregation type
is `string_stats` and the `field` parameter defines the field of the documents the stats will be computed on.
The above will return the following:
[source,console-result]
--------------------------------------------------
{
  ...
  "aggregations": {
    "message_stats" : {
      "count" : 5,
      "min_length" : 24,
      "max_length" : 30,
      "avg_length" : 28.8,
      "entropy" : 3.94617750050791
    }
  }
}
--------------------------------------------------
// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]
The name of the aggregation (`message_stats` above) also serves as the key by which the aggregation result can be retrieved from
the returned response.
==== Character distribution
The computation of the Shannon Entropy value is based on the probability of each character appearing in all terms collected
by the aggregation. To view the probability distribution for all characters, we can add the `show_distribution` (default: `false`) parameter.
[source,console]
--------------------------------------------------
POST /twitter/_search?size=0
{
  "aggs" : {
    "message_stats" : {
      "string_stats" : {
        "field" : "message.keyword",
        "show_distribution": true <1>
      }
    }
  }
}
--------------------------------------------------
// TEST[setup:twitter]
<1> Set the `show_distribution` parameter to `true`, so that the probability distribution for all characters is returned in the results.
[source,console-result]
--------------------------------------------------
{
  ...
  "aggregations": {
    "message_stats" : {
      "count" : 5,
      "min_length" : 24,
      "max_length" : 30,
      "avg_length" : 28.8,
      "entropy" : 3.94617750050791,
      "distribution" : {
        " " : 0.1527777777777778,
        "e" : 0.14583333333333334,
        "s" : 0.09722222222222222,
        "m" : 0.08333333333333333,
        "t" : 0.0763888888888889,
        "h" : 0.0625,
        "a" : 0.041666666666666664,
        "i" : 0.041666666666666664,
        "r" : 0.041666666666666664,
        "g" : 0.034722222222222224,
        "n" : 0.034722222222222224,
        "o" : 0.034722222222222224,
        "u" : 0.034722222222222224,
        "b" : 0.027777777777777776,
        "w" : 0.027777777777777776,
        "c" : 0.013888888888888888,
        "E" : 0.006944444444444444,
        "l" : 0.006944444444444444,
        "1" : 0.006944444444444444,
        "2" : 0.006944444444444444,
        "3" : 0.006944444444444444,
        "4" : 0.006944444444444444,
        "y" : 0.006944444444444444
      }
    }
  }
}
--------------------------------------------------
// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]
The `distribution` object shows the probability of each character appearing in all terms. The characters are sorted by descending probability.
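Each probability is the number of occurrences of a character divided by the combined length of all collected terms. In the example above, 144 characters were collected in total (5 terms with an average length of 28.8), so the value for the space character corresponds to 22 occurrences: 22 / 144 ≈ 0.1528.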
==== Script
Computing the message string stats based on a script:
[source,console]
--------------------------------------------------
POST /twitter/_search?size=0
{
  "aggs" : {
    "message_stats" : {
      "string_stats" : {
        "script" : {
          "lang": "painless",
          "source": "doc['message.keyword'].value"
        }
      }
    }
  }
}
--------------------------------------------------
// TEST[setup:twitter]
This will interpret the `script` parameter as an `inline` script with the `painless` script language and no script parameters.
To use a stored script, use the following syntax:
[source,console]
--------------------------------------------------
POST /twitter/_search?size=0
{
  "aggs" : {
    "message_stats" : {
      "string_stats" : {
        "script" : {
          "id": "my_script",
          "params" : {
            "field" : "message.keyword"
          }
        }
      }
    }
  }
}
--------------------------------------------------
// TEST[setup:twitter,stored_example_script]
===== Value Script
We can use a value script to modify the message (e.g. add a prefix) and compute the new stats:
[source,console]
--------------------------------------------------
POST /twitter/_search?size=0
{
  "aggs" : {
    "message_stats" : {
      "string_stats" : {
        "field" : "message.keyword",
        "script" : {
          "lang": "painless",
          "source": "params.prefix + _value",
          "params" : {
            "prefix" : "Message: "
          }
        }
      }
    }
  }
}
--------------------------------------------------
// TEST[setup:twitter]
==== Missing value
The `missing` parameter defines how documents that are missing a value should be treated.
By default they will be ignored but it is also possible to treat them as if they had a value.
[source,console]
--------------------------------------------------
POST /twitter/_search?size=0
{
  "aggs" : {
    "message_stats" : {
      "string_stats" : {
        "field" : "message.keyword",
        "missing": "[empty message]" <1>
      }
    }
  }
}
--------------------------------------------------
// TEST[setup:twitter]
<1> Documents without a value in the `message` field will be treated as documents that have the value `[empty message]`.

org/elasticsearch/xpack/analytics/AnalyticsAggregationBuilders.java

@@ -6,10 +6,15 @@
 package org.elasticsearch.xpack.analytics;
 import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder;
+import org.elasticsearch.xpack.analytics.stringstats.StringStatsAggregationBuilder;
 public class AnalyticsAggregationBuilders {
-public static CumulativeCardinalityPipelineAggregationBuilder cumulativeCaardinality(String name, String bucketsPath) {
+public static CumulativeCardinalityPipelineAggregationBuilder cumulativeCardinality(String name, String bucketsPath) {
 return new CumulativeCardinalityPipelineAggregationBuilder(name, bucketsPath);
 }
+public static StringStatsAggregationBuilder stringStats(String name) {
+return new StringStatsAggregationBuilder(name);
+}
 }
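
For illustration, a minimal sketch of how a client could use the new factory method; the `SearchSourceBuilder` wiring is assumed boilerplate, not part of this commit:

[source,java]
--------------------------------------------------
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.analytics.AnalyticsAggregationBuilders;
import org.elasticsearch.xpack.analytics.stringstats.StringStatsAggregationBuilder;

// Build a string_stats aggregation over a keyword field.
StringStatsAggregationBuilder stats = AnalyticsAggregationBuilders.stringStats("message_stats")
    .showDistribution(true);       // also return the per-character distribution
stats.field("message.keyword");    // the field the stats are computed on

// Attach it to a search that returns aggregation results only.
SearchSourceBuilder source = new SearchSourceBuilder()
    .size(0)
    .aggregation(stats);
--------------------------------------------------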

org/elasticsearch/xpack/analytics/AnalyticsPlugin.java

@@ -13,11 +13,13 @@ import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.plugins.ActionPlugin;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.plugins.SearchPlugin;
-import org.elasticsearch.xpack.core.XPackPlugin;
-import org.elasticsearch.xpack.core.analytics.action.AnalyticsStatsAction;
 import org.elasticsearch.xpack.analytics.action.TransportAnalyticsStatsAction;
 import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder;
 import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregator;
+import org.elasticsearch.xpack.analytics.stringstats.InternalStringStats;
+import org.elasticsearch.xpack.analytics.stringstats.StringStatsAggregationBuilder;
+import org.elasticsearch.xpack.core.XPackPlugin;
+import org.elasticsearch.xpack.core.analytics.action.AnalyticsStatsAction;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -40,11 +42,23 @@ public class AnalyticsPlugin extends Plugin implements SearchPlugin, ActionPlugin
 @Override
 public List<PipelineAggregationSpec> getPipelineAggregations() {
-return singletonList(new PipelineAggregationSpec(
-CumulativeCardinalityPipelineAggregationBuilder.NAME,
-CumulativeCardinalityPipelineAggregationBuilder::new,
-CumulativeCardinalityPipelineAggregator::new,
-CumulativeCardinalityPipelineAggregationBuilder::parse));
+return singletonList(
+new PipelineAggregationSpec(
+CumulativeCardinalityPipelineAggregationBuilder.NAME,
+CumulativeCardinalityPipelineAggregationBuilder::new,
+CumulativeCardinalityPipelineAggregator::new,
+CumulativeCardinalityPipelineAggregationBuilder::parse)
+);
 }
+@Override
+public List<AggregationSpec> getAggregations() {
+return singletonList(
+new AggregationSpec(
+StringStatsAggregationBuilder.NAME,
+StringStatsAggregationBuilder::new,
+StringStatsAggregationBuilder::parse).addResultReader(InternalStringStats::new)
+);
+}
 @Override

org/elasticsearch/xpack/analytics/stringstats/InternalStringStats.java

@@ -0,0 +1,287 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.stringstats;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
public class InternalStringStats extends InternalAggregation {
enum Metrics {
count {
Object getFieldValue(InternalStringStats stats) {
return stats.getCount();
}
},
min_length {
Object getFieldValue(InternalStringStats stats) {
return stats.getMinLength();
}
}, max_length {
Object getFieldValue(InternalStringStats stats) {
return stats.getMaxLength();
}
},
avg_length {
Object getFieldValue(InternalStringStats stats) {
return stats.getAvgLength();
}
},
entropy {
Object getFieldValue(InternalStringStats stats) {
return stats.getEntropy();
}
};
abstract Object getFieldValue(InternalStringStats stats);
}
private final DocValueFormat format;
private final boolean showDistribution;
private final long count;
private final long totalLength;
private final int minLength;
private final int maxLength;
private final Map<String, Long> charOccurrences;
public InternalStringStats(String name, long count, long totalLength, int minLength, int maxLength,
Map<String, Long> charOccurrences, boolean showDistribution,
DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);
this.format = formatter;
this.showDistribution = showDistribution;
this.count = count;
this.totalLength = totalLength;
this.minLength = minLength;
this.maxLength = maxLength;
this.charOccurrences = charOccurrences;
}
/** Read from a stream. */
public InternalStringStats(StreamInput in) throws IOException {
super(in);
format = in.readNamedWriteable(DocValueFormat.class);
showDistribution = in.readBoolean();
count = in.readVLong();
totalLength = in.readVLong();
minLength = in.readVInt();
maxLength = in.readVInt();
charOccurrences = in.readMap(StreamInput::readString, StreamInput::readLong);
}
@Override
protected final void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(format);
out.writeBoolean(showDistribution);
out.writeVLong(count);
out.writeVLong(totalLength);
out.writeVInt(minLength);
out.writeVInt(maxLength);
out.writeMap(charOccurrences, StreamOutput::writeString, StreamOutput::writeLong);
}
public String getWriteableName() {
return StringStatsAggregationBuilder.NAME;
}
public long getCount() {
return count;
}
public int getMinLength() {
return minLength;
}
public int getMaxLength() {
return maxLength;
}
public double getAvgLength() {
return (double) totalLength / count;
}
public double getEntropy() {
// Compute the sum of double values with Kahan summation algorithm which is more
// accurate than naive summation.
CompensatedSum kahanSummation = new CompensatedSum(0, 0);
for (double p : getDistribution().values()) {
if (p > 0) {
double value = p * log2(p);
kahanSummation.add(value);
}
}
return -kahanSummation.value();
}
/**
* Convert the character occurrences map to character frequencies.
*
* @return A map with the character as key and the probability that
* the character occurs as value. The map is ordered by descending frequency.
*/
Map<String, Double> getDistribution() {
return charOccurrences.entrySet().stream()
.sorted((e1, e2) -> e2.getValue().compareTo(e1.getValue()))
.collect(
Collectors.toMap(e -> e.getKey(), e -> (double) e.getValue() / totalLength,
(e1, e2) -> e2, LinkedHashMap::new)
);
}
/** Calculate base 2 logarithm */
static double log2(double d) {
return Math.log(d) / Math.log(2.0);
}
public String getCountAsString() {
return format.format(getCount()).toString();
}
public String getMinLengthAsString() {
return format.format(getMinLength()).toString();
}
public String getMaxLengthAsString() {
return format.format(getMaxLength()).toString();
}
public String getAvgLengthAsString() {
return format.format(getAvgLength()).toString();
}
public String getEntropyAsString() {
return format.format(getEntropy()).toString();
}
public Object value(String name) {
try {
return Metrics.valueOf(name).getFieldValue(this);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Unknown value [" + name + "] in string stats aggregation");
}
}
@Override
public InternalStringStats doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
long count = 0;
long totalLength = 0;
int minLength = Integer.MAX_VALUE;
int maxLength = Integer.MIN_VALUE;
Map<String, Long> occurs = new HashMap<>();
for (InternalAggregation aggregation : aggregations) {
InternalStringStats stats = (InternalStringStats) aggregation;
count += stats.getCount();
minLength = Math.min(minLength, stats.getMinLength());
maxLength = Math.max(maxLength, stats.getMaxLength());
totalLength += stats.totalLength;
stats.charOccurrences.forEach((k, v) ->
occurs.merge(k, v, (oldValue, newValue) -> oldValue + newValue)
);
}
return new InternalStringStats(name, count, totalLength, minLength, maxLength, occurs,
showDistribution, format, pipelineAggregators(), getMetaData());
}
@Override
public Object getProperty(List<String> path) {
if (path.isEmpty()) {
return this;
} else if (path.size() == 1) {
return value(path.get(0));
} else {
throw new IllegalArgumentException("path not supported for [" + getName() + "]: " + path);
}
}
static class Fields {
public static final ParseField COUNT = new ParseField("count");
public static final ParseField MIN_LENGTH = new ParseField("min_length");
public static final ParseField MIN_LENGTH_AS_STRING = new ParseField("min_length_as_string");
public static final ParseField MAX_LENGTH = new ParseField("max_length");
public static final ParseField MAX_LENGTH_AS_STRING = new ParseField("max_as_string");
public static final ParseField AVG_LENGTH = new ParseField("avg_length");
public static final ParseField AVG_LENGTH_AS_STRING = new ParseField("avg_length_as_string");
public static final ParseField ENTROPY = new ParseField("entropy");
public static final ParseField ENTROPY_AS_STRING = new ParseField("entropy_string");
public static final ParseField DISTRIBUTION = new ParseField("distribution");
public static final ParseField DISTRIBUTION_AS_STRING = new ParseField("distribution_string");
}
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.COUNT.getPreferredName(), count);
if (count > 0) {
builder.field(Fields.MIN_LENGTH.getPreferredName(), minLength);
builder.field(Fields.MAX_LENGTH.getPreferredName(), maxLength);
builder.field(Fields.AVG_LENGTH.getPreferredName(), getAvgLength());
builder.field(Fields.ENTROPY.getPreferredName(), getEntropy());
if (showDistribution == true) {
builder.field(Fields.DISTRIBUTION.getPreferredName(), getDistribution());
}
if (format != DocValueFormat.RAW) {
builder.field(Fields.MIN_LENGTH_AS_STRING.getPreferredName(), format.format(getMinLength()));
builder.field(Fields.MAX_LENGTH_AS_STRING.getPreferredName(), format.format(getMaxLength()));
builder.field(Fields.AVG_LENGTH_AS_STRING.getPreferredName(), format.format(getAvgLength()));
builder.field(Fields.ENTROPY_AS_STRING.getPreferredName(), format.format(getEntropy()));
if (showDistribution == true) {
builder.startObject(Fields.DISTRIBUTION_AS_STRING.getPreferredName());
for (Map.Entry<String, Double> e: getDistribution().entrySet()) {
builder.field(e.getKey(), format.format(e.getValue()).toString());
}
builder.endObject();
}
}
} else {
builder.nullField(Fields.MIN_LENGTH.getPreferredName());
builder.nullField(Fields.MAX_LENGTH.getPreferredName());
builder.nullField(Fields.AVG_LENGTH.getPreferredName());
builder.field(Fields.ENTROPY.getPreferredName(), 0.0);
if (showDistribution == true) {
builder.nullField(Fields.DISTRIBUTION.getPreferredName());
}
}
return builder;
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), count, minLength, maxLength, totalLength, charOccurrences, showDistribution);
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
if (super.equals(obj) == false) return false;
InternalStringStats other = (InternalStringStats) obj;
return count == other.count &&
minLength == other.minLength &&
maxLength == other.maxLength &&
totalLength == other.totalLength &&
showDistribution == other.showDistribution;
}
}
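
A note on `doReduce` above: entropy values from individual shards cannot be combined directly, so the shard-level character occurrence maps are merged and the entropy is recomputed from the merged distribution. A minimal standalone sketch of that merge, using hypothetical occurrence maps:

[source,java]
--------------------------------------------------
import java.util.HashMap;
import java.util.Map;

Map<String, Long> shardA = new HashMap<>();
shardA.put("a", 2L);
shardA.put("b", 1L);
Map<String, Long> shardB = new HashMap<>();
shardB.put("a", 1L);
shardB.put("c", 4L);

// Same merge as doReduce: per-character occurrences are summed across shards.
Map<String, Long> merged = new HashMap<>(shardA);
shardB.forEach((k, v) -> merged.merge(k, v, Long::sum));
// merged == {a=3, b=1, c=4}; entropy is then computed from these counts.
--------------------------------------------------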

org/elasticsearch/xpack/analytics/stringstats/StringStatsAggregationBuilder.java

@@ -0,0 +1,126 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.stringstats;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.ValuesSourceParserHelper;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
public class StringStatsAggregationBuilder extends ValuesSourceAggregationBuilder<ValuesSource.Bytes, StringStatsAggregationBuilder> {
public static final String NAME = "string_stats";
private boolean showDistribution = false;
private static final ObjectParser<StringStatsAggregationBuilder, Void> PARSER;
private static final ParseField SHOW_DISTRIBUTION_FIELD = new ParseField("show_distribution");
static {
PARSER = new ObjectParser<>(StringStatsAggregationBuilder.NAME);
ValuesSourceParserHelper.declareBytesFields(PARSER, true, true);
PARSER.declareBoolean(StringStatsAggregationBuilder::showDistribution, StringStatsAggregationBuilder.SHOW_DISTRIBUTION_FIELD);
}
public static StringStatsAggregationBuilder parse(String aggregationName, XContentParser parser) throws IOException {
return PARSER.parse(parser, new StringStatsAggregationBuilder(aggregationName), null);
}
public StringStatsAggregationBuilder(String name) {
super(name, ValuesSourceType.BYTES, ValueType.STRING);
}
public StringStatsAggregationBuilder(StringStatsAggregationBuilder clone,
AggregatorFactories.Builder factoriesBuilder,
Map<String, Object> metaData) {
super(clone, factoriesBuilder, metaData);
this.showDistribution = clone.showDistribution();
}
/** Read from a stream. */
public StringStatsAggregationBuilder(StreamInput in) throws IOException {
super(in, ValuesSourceType.BYTES, ValueType.STRING);
this.showDistribution = in.readBoolean();
}
@Override
protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map<String, Object> metaData) {
return new StringStatsAggregationBuilder(this, factoriesBuilder, metaData);
}
@Override
protected void innerWriteTo(StreamOutput out) throws IOException {
out.writeBoolean(showDistribution);
}
@Override
protected StringStatsAggregatorFactory innerBuild(QueryShardContext queryShardContext,
ValuesSourceConfig<ValuesSource.Bytes> config,
AggregatorFactory parent,
AggregatorFactories.Builder subFactoriesBuilder) throws IOException {
return new StringStatsAggregatorFactory(name, config, showDistribution, queryShardContext, parent, subFactoriesBuilder, metaData);
}
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(StringStatsAggregationBuilder.SHOW_DISTRIBUTION_FIELD.getPreferredName(), showDistribution);
return builder;
}
@Override
public String getType() {
return NAME;
}
/**
* Return whether to include the probability distribution of each character in the results.
* If {@code showDistribution} is true, the distribution will be included.
*/
public boolean showDistribution() {
return showDistribution;
}
/**
* Set whether to include the probability distribution of each character in the results.
*
* @return the builder so that calls can be chained
*/
public StringStatsAggregationBuilder showDistribution(boolean showDistribution) {
this.showDistribution = showDistribution;
return this;
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), showDistribution);
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
if (super.equals(obj) == false) return false;
StringStatsAggregationBuilder other = (StringStatsAggregationBuilder) obj;
return showDistribution == other.showDistribution;
}
}

org/elasticsearch/xpack/analytics/stringstats/StringStatsAggregator.java

@@ -0,0 +1,175 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.stringstats;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.IntArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.metrics.MetricsAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Metric aggregator that operates over strings and returns statistics such as
* minimum length, maximum length, average length, the total Shannon entropy and
* probability distribution for each character appearing in all terms.
*/
public class StringStatsAggregator extends MetricsAggregator {
final ValuesSource.Bytes valuesSource;
final DocValueFormat format;
/** Option to show the probability distribution for each character appearing in all terms. */
private final boolean showDistribution;
LongArray count;
IntArray minLength;
IntArray maxLength;
/** Accumulates the total length of all fields. Used to calculate the average length and char frequencies. */
LongArray totalLength;
/** Map that stores the number of occurrences for each character. */
Map<Character, LongArray> charOccurrences;
StringStatsAggregator(String name, boolean showDistribution, ValuesSource.Bytes valuesSource, DocValueFormat format,
SearchContext context, Aggregator parent,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
super(name, context, parent, pipelineAggregators, metaData);
this.showDistribution = showDistribution;
this.valuesSource = valuesSource;
if (valuesSource != null) {
final BigArrays bigArrays = context.bigArrays();
count = bigArrays.newLongArray(1, true);
totalLength = bigArrays.newLongArray(1, true);
minLength = bigArrays.newIntArray(1, false);
minLength.fill(0, minLength.size(), Integer.MAX_VALUE);
maxLength = bigArrays.newIntArray(1, false);
maxLength.fill(0, maxLength.size(), Integer.MIN_VALUE);
charOccurrences = new HashMap<>();
}
this.format = format;
}
@Override
public ScoreMode scoreMode() {
return (valuesSource != null && valuesSource.needsScores()) ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES;
}
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
if (valuesSource == null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
}
final BigArrays bigArrays = context.bigArrays();
final SortedBinaryDocValues values = valuesSource.bytesValues(ctx);
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
final long overSize = BigArrays.overSize(bucket + 1);
if (bucket >= count.size()) {
final long from = count.size();
count = bigArrays.resize(count, overSize);
totalLength = bigArrays.resize(totalLength, overSize);
minLength = bigArrays.resize(minLength, overSize);
maxLength = bigArrays.resize(maxLength, overSize);
minLength.fill(from, overSize, Integer.MAX_VALUE);
maxLength.fill(from, overSize, Integer.MIN_VALUE);
}
if (values.advanceExact(doc)) {
final int valuesCount = values.docValueCount();
count.increment(bucket, valuesCount);
for (int i = 0; i < valuesCount; i++) {
BytesRef value = values.nextValue();
if (value.length > 0) {
String valueStr = value.utf8ToString();
int length = valueStr.length();
totalLength.increment(bucket, length);
// Update min/max length for string
int min = Math.min(minLength.get(bucket), length);
int max = Math.max(maxLength.get(bucket), length);
minLength.set(bucket, min);
maxLength.set(bucket, max);
// Parse string chars and count occurrences
for (Character c : valueStr.toCharArray()) {
LongArray occ = charOccurrences.get(c);
if (occ == null) {
occ = bigArrays.newLongArray(overSize, true);
} else {
if (bucket >= occ.size()) {
occ = bigArrays.resize(occ, overSize);
}
}
occ.increment(bucket, 1);
charOccurrences.put(c, occ);
}
}
}
}
}
};
}
@Override
public InternalAggregation buildAggregation(long bucket) {
if (valuesSource == null || bucket >= count.size()) {
return buildEmptyAggregation();
}
// Convert Map entries: Character -> String and LongArray -> Long
// Include only characters that have at least one occurrence
Map<String, Long> occurrences = new HashMap<>(charOccurrences.size());
for (Map.Entry<Character, LongArray> e : charOccurrences.entrySet()) {
if (e.getValue().size() > bucket) {
long occ = e.getValue().get(bucket);
if (occ > 0) {
occurrences.put(e.getKey().toString(), occ);
}
}
}
return new InternalStringStats(name, count.get(bucket), totalLength.get(bucket),
minLength.get(bucket), maxLength.get(bucket), occurrences, showDistribution,
format, pipelineAggregators(), metaData());
}
@Override
public InternalAggregation buildEmptyAggregation() {
return new InternalStringStats(name,
0, 0, Integer.MAX_VALUE, Integer.MIN_VALUE,
Collections.emptyMap(), showDistribution, format,
pipelineAggregators(), metaData());
}
@Override
public void doClose() {
Releasables.close(maxLength, minLength, count, totalLength);
if (charOccurrences != null) {
Releasables.close(charOccurrences.values());
}
}
}

org/elasticsearch/xpack/analytics/stringstats/StringStatsAggregatorFactory.java

@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.stringstats;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.List;
import java.util.Map;
class StringStatsAggregatorFactory extends ValuesSourceAggregatorFactory<ValuesSource.Bytes> {
private final boolean showDistribution;
StringStatsAggregatorFactory(String name, ValuesSourceConfig<ValuesSource.Bytes> config,
Boolean showDistribution,
QueryShardContext queryShardContext,
AggregatorFactory parent, AggregatorFactories.Builder subFactoriesBuilder, Map<String, Object> metaData)
throws IOException {
super(name, config, queryShardContext, parent, subFactoriesBuilder, metaData);
this.showDistribution = showDistribution;
}
@Override
protected Aggregator createUnmapped(SearchContext searchContext,
Aggregator parent,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
return new StringStatsAggregator(name, showDistribution, null, config.format(), searchContext, parent,
pipelineAggregators, metaData);
}
@Override
protected Aggregator doCreateInternal(ValuesSource.Bytes valuesSource,
SearchContext searchContext,
Aggregator parent,
boolean collectsFromSingleBucket,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
return new StringStatsAggregator(name, showDistribution, valuesSource, config.format(), searchContext, parent,
pipelineAggregators, metaData);
}
}

org/elasticsearch/xpack/analytics/stringstats/StringStatsAggregatorTests.java

@@ -0,0 +1,262 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.stringstats;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermInSetQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.mapper.TextFieldMapper;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator;
import org.elasticsearch.search.aggregations.support.ValueType;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import static java.util.Collections.singleton;
public class StringStatsAggregatorTests extends AggregatorTestCase {
private void testCase(Query query,
CheckedConsumer<RandomIndexWriter, IOException> buildIndex,
Consumer<InternalStringStats> verify) throws IOException {
TextFieldMapper.TextFieldType fieldType = new TextFieldMapper.TextFieldType();
fieldType.setName("text");
fieldType.setFielddata(true);
AggregationBuilder aggregationBuilder = new StringStatsAggregationBuilder("_name").field("text");
testCase(aggregationBuilder, query, buildIndex, verify, fieldType);
}
private void testCase(AggregationBuilder aggregationBuilder, Query query,
CheckedConsumer<RandomIndexWriter, IOException> buildIndex,
Consumer<InternalStringStats> verify, MappedFieldType fieldType) throws IOException {
Directory directory = newDirectory();
RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
buildIndex.accept(indexWriter);
indexWriter.close();
IndexReader indexReader = DirectoryReader.open(directory);
IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
StringStatsAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType);
aggregator.preCollection();
indexSearcher.search(query, aggregator);
aggregator.postCollection();
verify.accept((InternalStringStats) aggregator.buildAggregation(0L));
indexReader.close();
directory.close();
}
public void testNoDocs() throws IOException {
this.<InternalStringStats>testCase(new MatchAllDocsQuery(), iw -> {
// Intentionally not writing any docs
}, stats -> {
assertEquals(0, stats.getCount());
assertEquals(Integer.MIN_VALUE, stats.getMaxLength());
assertEquals(Integer.MAX_VALUE, stats.getMinLength());
assertEquals(Double.NaN, stats.getAvgLength(), 0);
assertTrue(stats.getDistribution().isEmpty());
assertEquals(0.0, stats.getEntropy(), 0);
});
}
public void testUnmappedField() throws IOException {
StringStatsAggregationBuilder aggregationBuilder = new StringStatsAggregationBuilder("_name").field("text");
testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> {
for(int i = 0; i < 10; i++) {
iw.addDocument(singleton(new TextField("text", "test" + i, Field.Store.NO)));
}
}, stats -> {
assertEquals(0, stats.getCount());
assertEquals(Integer.MIN_VALUE, stats.getMaxLength());
assertEquals(Integer.MAX_VALUE, stats.getMinLength());
assertEquals(Double.NaN, stats.getAvgLength(), 0);
assertTrue(stats.getDistribution().isEmpty());
assertEquals(0.0, stats.getEntropy(), 0);
}, null);
}
public void testUnmappedWithMissingField() throws IOException {
StringStatsAggregationBuilder aggregationBuilder = new StringStatsAggregationBuilder("_name")
.field("text")
.missing("abca");
testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> {
for(int i=0; i < 10; i++) {
iw.addDocument(singleton(new TextField("text", "test" + i, Field.Store.NO)));
}
}, stats -> {
assertEquals(10, stats.getCount());
assertEquals(4, stats.getMaxLength());
assertEquals(4, stats.getMinLength());
assertEquals(4.0, stats.getAvgLength(), 0);
assertEquals(3, stats.getDistribution().size());
assertEquals(0.50, stats.getDistribution().get("a"), 0);
assertEquals(0.25, stats.getDistribution().get("b"), 0);
assertEquals(0.25, stats.getDistribution().get("c"), 0);
assertEquals(1.5, stats.getEntropy(), 0);
}, null);
}
public void testSingleValuedField() throws IOException {
testCase(new MatchAllDocsQuery(), iw -> {
for(int i=0; i < 10; i++) {
iw.addDocument(singleton(new TextField("text", "test" + i, Field.Store.NO)));
}
}, stats -> {
assertEquals(10, stats.getCount());
assertEquals(5, stats.getMaxLength());
assertEquals(5, stats.getMinLength());
assertEquals(5.0, stats.getAvgLength(), 0);
assertEquals(13, stats.getDistribution().size());
assertEquals(0.4, stats.getDistribution().get("t"), 0);
assertEquals(0.2, stats.getDistribution().get("e"), 0);
assertEquals(0.02, stats.getDistribution().get("0"), 0);
assertEquals(2.58631, stats.getEntropy(), 0.00001);
});
}
public void testNoMatchingField() throws IOException {
testCase(new MatchAllDocsQuery(), iw -> {
for(int i=0; i < 10; i++) {
iw.addDocument(singleton(new TextField("wrong_field", "test" + i, Field.Store.NO)));
}
}, stats -> {
assertEquals(0, stats.getCount());
assertEquals(Integer.MIN_VALUE, stats.getMaxLength());
assertEquals(Integer.MAX_VALUE, stats.getMinLength());
assertEquals(Double.NaN, stats.getAvgLength(), 0);
assertTrue(stats.getDistribution().isEmpty());
assertEquals(0.0, stats.getEntropy(), 0);
});
}
public void testQueryFiltering() throws IOException {
testCase(new TermInSetQuery("text", new BytesRef("test0"), new BytesRef("test1")), iw -> {
for(int i=0; i < 10; i++) {
iw.addDocument(singleton(new TextField("text", "test" + i, Field.Store.NO)));
}
}, stats -> {
assertEquals(2, stats.getCount());
assertEquals(5, stats.getMaxLength());
assertEquals(5, stats.getMinLength());
assertEquals(5.0, stats.getAvgLength(), 0);
assertEquals(5, stats.getDistribution().size());
assertEquals(0.4, stats.getDistribution().get("t"), 0);
assertEquals(0.2, stats.getDistribution().get("e"), 0);
assertEquals(0.1, stats.getDistribution().get("0"), 0);
assertEquals(2.12193, stats.getEntropy(), 0.00001);
});
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/47469")
public void testSingleValuedFieldWithFormatter() throws IOException {
TextFieldMapper.TextFieldType fieldType = new TextFieldMapper.TextFieldType();
fieldType.setName("text");
fieldType.setFielddata(true);
StringStatsAggregationBuilder aggregationBuilder = new StringStatsAggregationBuilder("_name")
.field("text")
.format("0000.00")
.showDistribution(true);
testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> {
for(int i=0; i < 10; i++) {
iw.addDocument(singleton(new TextField("text", "test" + i, Field.Store.NO)));
}
}, stats -> {
assertEquals("0010.00", stats.getCountAsString());
assertEquals("0005.00", stats.getMaxLengthAsString());
assertEquals("0005.00", stats.getMinLengthAsString());
assertEquals("0005.00", stats.getAvgLengthAsString());
assertEquals("0002.58", stats.getEntropyAsString());
}, fieldType);
}
/**
* Test a string_stats aggregation as a subaggregation of a terms aggregation
*/
public void testNestedAggregation() throws IOException {
MappedFieldType numericFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.INTEGER);
numericFieldType.setName("value");
numericFieldType.setHasDocValues(true);
TextFieldMapper.TextFieldType textFieldType = new TextFieldMapper.TextFieldType();
textFieldType.setName("text");
textFieldType.setFielddata(true);
TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("terms", ValueType.NUMERIC)
.field("value")
.subAggregation(new StringStatsAggregationBuilder("text_stats").field("text").valueType(ValueType.STRING));
Directory directory = newDirectory();
RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
final int numDocs = 10;
for (int i = 0; i < numDocs; i++) {
for (int j = 0; j < 4; j++)
indexWriter.addDocument(Arrays.asList(
new NumericDocValuesField("value", i + 1),
new TextField("text", "test" + j, Field.Store.NO))
);
}
indexWriter.close();
IndexReader indexReader = DirectoryReader.open(directory);
IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
TermsAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, numericFieldType, textFieldType);
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
Terms terms = (Terms) aggregator.buildAggregation(0L);
assertNotNull(terms);
List<? extends Terms.Bucket> buckets = terms.getBuckets();
assertNotNull(buckets);
assertEquals(10, buckets.size());
for (int i = 0; i < 10; i++) {
Terms.Bucket bucket = buckets.get(i);
assertNotNull(bucket);
assertEquals((long) i + 1, bucket.getKeyAsNumber());
assertEquals(4L, bucket.getDocCount());
InternalStringStats stats = bucket.getAggregations().get("text_stats");
assertNotNull(stats);
assertEquals(4L, stats.getCount());
assertEquals(5, stats.getMaxLength());
assertEquals(5, stats.getMinLength());
assertEquals(5.0, stats.getAvgLength(), 0);
assertEquals(7, stats.getDistribution().size());
assertEquals(0.4, stats.getDistribution().get("t"), 0);
assertEquals(0.2, stats.getDistribution().get("e"), 0);
assertEquals(2.32193, stats.getEntropy(), 0.00001);
}
indexReader.close();
directory.close();
}
}