From aa60e5cc07ef3d87fd7fc9adc3fc481785256924 Mon Sep 17 00:00:00 2001
From: markharwood
Date: Thu, 17 Nov 2016 15:32:59 +0000
Subject: [PATCH] Aggregations - support for partitioning the set of terms used in
 aggregations so that multiple requests can be made without trying to compute
 everything in one request.

Closes #21487
---
 .../bucket/terms/TermsAggregatorFactory.java  |   5 +-
 .../bucket/terms/support/IncludeExclude.java  | 153 +++++++++++++++++-
 .../aggregations/bucket/DoubleTermsIT.java    |  40 +++++
 .../aggregations/bucket/LongTermsIT.java      |  45 ++++++
 .../aggregations/bucket/StringTermsIT.java    |  43 +++++
 .../bucket/terms-aggregation.asciidoc         |  68 +++++++-
 6 files changed, 348 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java
index c01377d9761..3a7053d26d2 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java
@@ -131,7 +131,10 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory<ValuesSource, TermsAggregatorFactory> {
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java
--- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java
+            if (numValids > 0) {
+                valids = new LongHashSet(numValids);
+            }
@@ -96,6 +118,13 @@ public class IncludeExclude implements Writeable, ToXContent {
         public abstract boolean accept(BytesRef value);
     }
 
+    class PartitionedStringFilter extends StringFilter {
+        @Override
+        public boolean accept(BytesRef value) {
+            return Math.floorMod(value.hashCode(), incNumPartitions) == incZeroBasedPartition;
+        }
+    }
+
     static class AutomatonBackedStringFilter extends StringFilter {
 
         private final ByteRunAutomaton runAutomaton;
@@ -138,6 +167,25 @@ public class IncludeExclude implements Writeable, ToXContent {
 
     }
 
+    class PartitionedOrdinalsFilter extends OrdinalsFilter {
+
+        @Override
+        public LongBitSet acceptedGlobalOrdinals(RandomAccessOrds globalOrdinals) throws IOException {
+            final long numOrds = globalOrdinals.getValueCount();
+            final LongBitSet acceptedGlobalOrdinals = new LongBitSet(numOrds);
+            final TermsEnum termEnum = globalOrdinals.termsEnum();
+
+            BytesRef term = termEnum.next();
+            while (term != null) {
+                if (Math.floorMod(term.hashCode(), incNumPartitions) == incZeroBasedPartition) {
+                    acceptedGlobalOrdinals.set(termEnum.ord());
+                }
+                term = termEnum.next();
+            }
+            return acceptedGlobalOrdinals;
+        }
+    }
+
     static class AutomatonBackedOrdinalsFilter extends OrdinalsFilter {
 
         private final CompiledAutomaton compiled;
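Both filters above route a term to a partition with `Math.floorMod(hash, incNumPartitions) == incZeroBasedPartition`. A minimal standalone sketch of that routing rule (not Elasticsearch code; `String.hashCode()` stands in for the BytesRef hash the real filters use):

[source,java]
--------------------------------------------------
import java.util.Arrays;
import java.util.List;

// Sketch: every term maps to exactly one of numPartitions buckets, decided
// only by its hash, so separate requests for partitions 0..n-1 visit all terms.
public final class PartitionRoutingSketch {

    // floorMod (rather than %) keeps the result in [0, numPartitions)
    // even when the hash code is negative.
    static int partitionOf(String term, int numPartitions) {
        return Math.floorMod(term.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        List<String> terms = Arrays.asList("alice", "bob", "carol", "dave");
        int numPartitions = 3;
        for (String term : terms) {
            System.out.println(term + " -> partition " + partitionOf(term, numPartitions));
        }
    }
}
--------------------------------------------------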
@@ -205,6 +253,8 @@ public class IncludeExclude implements Writeable, ToXContent {
 
     private final RegExp include, exclude;
     private final SortedSet<BytesRef> includeValues, excludeValues;
+    private final int incZeroBasedPartition;
+    private final int incNumPartitions;
 
     /**
      * @param include The regular expression pattern for the terms to be included
@@ -218,6 +268,8 @@ public class IncludeExclude implements Writeable, ToXContent {
         this.exclude = exclude;
         this.includeValues = null;
         this.excludeValues = null;
+        this.incZeroBasedPartition = 0;
+        this.incNumPartitions = 0;
     }
 
     public IncludeExclude(String include, String exclude) {
@@ -234,6 +286,8 @@ public class IncludeExclude implements Writeable, ToXContent {
         }
         this.include = null;
         this.exclude = null;
+        this.incZeroBasedPartition = 0;
+        this.incNumPartitions = 0;
         this.includeValues = includeValues;
         this.excludeValues = excludeValues;
     }
@@ -250,6 +304,21 @@ public class IncludeExclude implements Writeable, ToXContent {
         this(convertToBytesRefSet(includeValues), convertToBytesRefSet(excludeValues));
     }
 
+    public IncludeExclude(int partition, int numPartitions) {
+        if (partition < 0 || partition >= numPartitions) {
+            throw new IllegalArgumentException("Partition must be >= 0 and < numPartitions, which is " + numPartitions);
+        }
+        this.incZeroBasedPartition = partition;
+        this.incNumPartitions = numPartitions;
+        this.include = null;
+        this.exclude = null;
+        this.includeValues = null;
+        this.excludeValues = null;
+    }
+
     /**
      * Read from a stream.
      */
@@ -257,6 +326,8 @@ public class IncludeExclude implements Writeable, ToXContent {
         if (in.readBoolean()) {
             includeValues = null;
             excludeValues = null;
+            incZeroBasedPartition = 0;
+            incNumPartitions = 0;
             String includeString = in.readOptionalString();
             include = includeString == null ? null : new RegExp(includeString);
             String excludeString = in.readOptionalString();
@@ -283,6 +354,13 @@ public class IncludeExclude implements Writeable, ToXContent {
         } else {
             excludeValues = null;
         }
+        if (in.getVersion().onOrAfter(Version.V_5_2_0_UNRELEASED)) {
+            incNumPartitions = in.readVInt();
+            incZeroBasedPartition = in.readVInt();
+        } else {
+            incNumPartitions = 0;
+            incZeroBasedPartition = 0;
+        }
     }
 
     @Override
@@ -309,6 +387,10 @@ public class IncludeExclude implements Writeable, ToXContent {
                 out.writeBytesRef(value);
             }
         }
+        if (out.getVersion().onOrAfter(Version.V_5_2_0_UNRELEASED)) {
+            out.writeVInt(incNumPartitions);
+            out.writeVInt(incZeroBasedPartition);
+        }
         }
     }
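The stream read/write changes above only transfer the two new ints when the other node is on a version that knows about them, and readers default both to 0 (partitioning off) when talking to an older sender. A simplified standalone sketch of that pattern, with a boolean standing in for the `Version.onOrAfter` check and plain `writeInt` standing in for the variable-length ints the patch uses:

[source,java]
--------------------------------------------------
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch of backward-compatible serialization: writers skip fields the peer
// does not understand, readers substitute defaults when the sender was older.
final class VersionGatedIoSketch {

    static void write(DataOutputStream out, boolean peerHasPartitions,
                      int numPartitions, int partition) throws IOException {
        if (peerHasPartitions) { // the patch checks out.getVersion().onOrAfter(V_5_2_0)
            out.writeInt(numPartitions);
            out.writeInt(partition);
        }
    }

    static int[] read(DataInputStream in, boolean peerHasPartitions) throws IOException {
        if (peerHasPartitions) {
            return new int[] { in.readInt(), in.readInt() };
        }
        return new int[] { 0, 0 }; // older sender: partitioning disabled
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        write(new DataOutputStream(bytes), true, 20, 0);
        int[] read = read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())), true);
        System.out.println(read[0] + " partitions, reading partition " + read[1]);
    }
}
--------------------------------------------------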
@@ -436,11 +518,26 @@ public class IncludeExclude implements Writeable, ToXContent {
         if (token == XContentParser.Token.START_OBJECT) {
             if (parseFieldMatcher.match(currentFieldName, INCLUDE_FIELD)) {
                 while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
+
+                    // This "include":{"pattern":"foo.*"} syntax is undocumented since 2.0
+                    // Regexes should be "include":"foo.*"
                     if (token == XContentParser.Token.FIELD_NAME) {
                         currentFieldName = parser.currentName();
                     } else if (token == XContentParser.Token.VALUE_STRING) {
                         if (parseFieldMatcher.match(currentFieldName, PATTERN_FIELD)) {
                             otherOptions.put(INCLUDE_FIELD, parser.text());
+                        } else {
+                            throw new ElasticsearchParseException(
+                                    "Unknown string parameter in Include/Exclude clause: " + currentFieldName);
+                        }
+                    } else if (token == XContentParser.Token.VALUE_NUMBER) {
+                        if (parseFieldMatcher.match(currentFieldName, NUM_PARTITIONS_FIELD)) {
+                            otherOptions.put(NUM_PARTITIONS_FIELD, parser.intValue());
+                        } else if (parseFieldMatcher.match(currentFieldName, PARTITION_FIELD)) {
+                            // The partition number is stashed under INCLUDE_FIELD so that
+                            // createIncludeExclude() can recognise the partitioned form of "include"
+                            otherOptions.put(INCLUDE_FIELD, parser.intValue());
+                        } else {
+                            throw new ElasticsearchParseException(
+                                    "Unknown numeric parameter in Include/Exclude clause: " + currentFieldName);
+                        }
                     }
                 }
@@ -480,15 +577,43 @@ public class IncludeExclude implements Writeable, ToXContent {
     public IncludeExclude createIncludeExclude(Map<ParseField, Object> otherOptions) {
         Object includeObject = otherOptions.get(INCLUDE_FIELD);
         String include = null;
+        int partition = -1;
+        int numPartitions = -1;
         SortedSet<BytesRef> includeValues = null;
         if (includeObject != null) {
             if (includeObject instanceof String) {
                 include = (String) includeObject;
             } else if (includeObject instanceof SortedSet) {
                 includeValues = (SortedSet<BytesRef>) includeObject;
+            } else if (includeObject instanceof Integer) {
+                partition = (Integer) includeObject;
+                Object numPartitionsObject = otherOptions.get(NUM_PARTITIONS_FIELD);
+                if (numPartitionsObject instanceof Integer) {
+                    numPartitions = (Integer) numPartitionsObject;
+                    if (numPartitions < 2) {
+                        throw new IllegalArgumentException(NUM_PARTITIONS_FIELD.getPreferredName() + " must be > 1");
+                    }
+                    if (partition < 0 || partition >= numPartitions) {
+                        throw new IllegalArgumentException(
+                                PARTITION_FIELD.getPreferredName() + " must be >= 0 and < " + numPartitions);
+                    }
+                } else {
+                    if (numPartitionsObject == null) {
+                        throw new IllegalArgumentException(NUM_PARTITIONS_FIELD.getPreferredName() + " parameter is missing");
+                    }
+                    throw new IllegalArgumentException(NUM_PARTITIONS_FIELD.getPreferredName() + " value must be an integer");
+                }
             }
         }
         Object excludeObject = otherOptions.get(EXCLUDE_FIELD);
+        if (numPartitions > 0) {
+            if (excludeObject != null) {
+                throw new IllegalArgumentException("Partitioned include cannot be used in combination with excludes");
+            }
+            return new IncludeExclude(partition, numPartitions);
+        }
+
         String exclude = null;
         SortedSet<BytesRef> excludeValues = null;
         if (excludeObject != null) {
@@ -517,6 +642,10 @@ public class IncludeExclude implements Writeable, ToXContent {
         return include != null || exclude != null;
     }
 
+    public boolean isPartitionBased() {
+        return incNumPartitions > 0;
+    }
+
     private Automaton toAutomaton() {
         Automaton a = null;
         if (include != null) {
@@ -538,6 +667,9 @@ public class IncludeExclude implements Writeable, ToXContent {
         if (isRegexBased()) {
             return new AutomatonBackedStringFilter(toAutomaton());
         }
+        if (isPartitionBased()) {
+            return new PartitionedStringFilter();
+        }
         return new TermListBackedStringFilter(parseForDocValues(includeValues, format), parseForDocValues(excludeValues, format));
     }
 
@@ -559,13 +691,22 @@ public class IncludeExclude implements Writeable, ToXContent {
         if (isRegexBased()) {
             return new AutomatonBackedOrdinalsFilter(toAutomaton());
         }
+        if (isPartitionBased()) {
+            return new PartitionedOrdinalsFilter();
+        }
+
         return new TermListBackedOrdinalsFilter(parseForDocValues(includeValues, format), parseForDocValues(excludeValues, format));
     }
 
     public LongFilter convertToLongFilter(DocValueFormat format) {
+        if (isPartitionBased()) {
+            return new PartitionedLongFilter();
+        }
+
         int numValids = includeValues == null ? 0 : includeValues.size();
         int numInvalids = excludeValues == null ? 0 : excludeValues.size();
-        LongFilter result = new LongFilter(numValids, numInvalids);
+        SetBackedLongFilter result = new SetBackedLongFilter(numValids, numInvalids);
         if (includeValues != null) {
             for (BytesRef val : includeValues) {
                 result.addAccept(format.parseLong(val.utf8ToString(), false, null));
             }
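`convertToDoubleFilter()` below reuses the same `PartitionedLongFilter`, which works because double values reach the filter as long representations. The exact encoding Elasticsearch applies is outside this patch; the standalone sketch below (an illustration under that assumption, not the real encoding) just shows the idea of partitioning a double through a stable long encoding:

[source,java]
--------------------------------------------------
// Sketch: a double can be partitioned deterministically via a stable long
// encoding of its bits. Double.doubleToLongBits is stable per distinct value.
public final class DoublePartitionSketch {

    static int partitionOf(double value, int numPartitions) {
        long bits = Double.doubleToLongBits(value);
        return Math.floorMod(Long.hashCode(bits), numPartitions);
    }

    public static void main(String[] args) {
        for (double d : new double[] { 1.5, -2.25, 3.0 }) {
            System.out.println(d + " -> partition " + partitionOf(d, 4));
        }
    }
}
--------------------------------------------------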
@@ -580,9 +721,13 @@ public class IncludeExclude implements Writeable, ToXContent {
     }
 
     public LongFilter convertToDoubleFilter() {
+        if (isPartitionBased()) {
+            return new PartitionedLongFilter();
+        }
+
         int numValids = includeValues == null ? 0 : includeValues.size();
         int numInvalids = excludeValues == null ? 0 : excludeValues.size();
-        LongFilter result = new LongFilter(numValids, numInvalids);
+        SetBackedLongFilter result = new SetBackedLongFilter(numValids, numInvalids);
         if (includeValues != null) {
             for (BytesRef val : includeValues) {
                 double dval = Double.parseDouble(val.utf8ToString());
diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/DoubleTermsIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/DoubleTermsIT.java
index 1dc9943e8a3..ef477553bac 100644
--- a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/DoubleTermsIT.java
+++ b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/DoubleTermsIT.java
@@ -33,6 +33,7 @@ import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
 import org.elasticsearch.search.aggregations.bucket.filter.Filter;
 import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
 import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
 import org.elasticsearch.search.aggregations.metrics.avg.Avg;
 import org.elasticsearch.search.aggregations.metrics.max.Max;
@@ -48,10 +49,12 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Function;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -359,6 +362,43 @@ public class DoubleTermsIT extends AbstractTermsTestCase {
             assertThat(bucket.getDocCount(), equalTo(1L));
         }
     }
+
+    public void testSingleValueFieldWithPartitionedFiltering() throws Exception {
+        runTestFieldWithPartitionedFiltering(SINGLE_VALUED_FIELD_NAME);
+    }
+
+    public void testMultiValueFieldWithPartitionedFiltering() throws Exception {
+        runTestFieldWithPartitionedFiltering(MULTI_VALUED_FIELD_NAME);
+    }
+
+    private void runTestFieldWithPartitionedFiltering(String field) throws Exception {
+        // Find total number of unique terms
+        SearchResponse allResponse = client().prepareSearch("idx").setTypes("type")
+                .addAggregation(terms("terms").field(field).size(10000).collectMode(randomFrom(SubAggCollectionMode.values())))
+                .execute().actionGet();
+        assertSearchResponse(allResponse);
+        Terms terms = allResponse.getAggregations().get("terms");
+        assertThat(terms, notNullValue());
+        assertThat(terms.getName(), equalTo("terms"));
+        int expectedCardinality = terms.getBuckets().size();
+
+        // Gather terms using partitioned aggregations
+        final int numPartitions = randomIntBetween(2, 4);
+        Set<Number> foundTerms = new HashSet<>();
+        for (int partition = 0; partition < numPartitions; partition++) {
+            SearchResponse response = client().prepareSearch("idx").setTypes("type").addAggregation(terms("terms").field(field)
+                    .includeExclude(new IncludeExclude(partition, numPartitions)).collectMode(randomFrom(SubAggCollectionMode.values())))
+                    .execute().actionGet();
+            assertSearchResponse(response);
+            terms = response.getAggregations().get("terms");
+            assertThat(terms, notNullValue());
+            assertThat(terms.getName(), equalTo("terms"));
+            for (Bucket bucket : terms.getBuckets()) {
+                assertTrue(foundTerms.add(bucket.getKeyAsNumber()));
+            }
+        }
+        assertEquals(expectedCardinality, foundTerms.size());
+    }
 
     public void testSingleValueFieldOrderedByTermAsc() throws Exception {
         SearchResponse response = client().prepareSearch("idx").setTypes("type")
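The integration test above asserts two properties of partitioning: no term appears in two partitions (`assertTrue(foundTerms.add(...))`) and the union of all partitions reaches the full cardinality. A self-contained sketch of that invariant, using plain `String.hashCode()` in place of the real term hash:

[source,java]
--------------------------------------------------
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch: hash-based partitions are disjoint and together cover every term.
public final class PartitionInvariantSketch {
    public static void main(String[] args) {
        List<String> allTerms = Arrays.asList("a", "b", "c", "d", "e", "f");
        int numPartitions = 3;
        Set<String> found = new HashSet<>();
        for (int partition = 0; partition < numPartitions; partition++) {
            for (String term : allTerms) {
                if (Math.floorMod(term.hashCode(), numPartitions) == partition) {
                    // add() returning false would mean the term was already
                    // seen in an earlier partition
                    if (!found.add(term)) {
                        throw new AssertionError("term in two partitions: " + term);
                    }
                }
            }
        }
        if (found.size() != allTerms.size()) {
            throw new AssertionError("some terms were never visited");
        }
        System.out.println("all " + found.size() + " terms seen exactly once");
    }
}
--------------------------------------------------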
SearchResponse response = client().prepareSearch("idx").setTypes("type") diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/LongTermsIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/LongTermsIT.java index 1739d09a054..35905f91a91 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/LongTermsIT.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/LongTermsIT.java @@ -32,6 +32,7 @@ import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode; import org.elasticsearch.search.aggregations.bucket.filter.Filter; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket; import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude; import org.elasticsearch.search.aggregations.metrics.avg.Avg; import org.elasticsearch.search.aggregations.metrics.max.Max; @@ -47,10 +48,12 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Set; import java.util.function.Function; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -326,6 +329,48 @@ public class LongTermsIT extends AbstractTermsTestCase { assertThat(bucket.getDocCount(), equalTo(1L)); } } + + + + public void testSingleValueFieldWithPartitionedFiltering() throws Exception { + runTestFieldWithPartitionedFiltering(SINGLE_VALUED_FIELD_NAME); + } + + public void testMultiValueFieldWithPartitionedFiltering() throws Exception { + runTestFieldWithPartitionedFiltering(MULTI_VALUED_FIELD_NAME); + } + + private void runTestFieldWithPartitionedFiltering(String field) throws Exception { + // Find total number of unique terms + SearchResponse allResponse = client().prepareSearch("idx").setTypes("type") + .addAggregation(terms("terms").field(field).collectMode(randomFrom(SubAggCollectionMode.values()))).execute().actionGet(); + assertSearchResponse(allResponse); + Terms terms = allResponse.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + int expectedCardinality = terms.getBuckets().size(); + + // Gather terms using partitioned aggregations + final int numPartitions = randomIntBetween(2, 4); + Set foundTerms = new HashSet<>(); + for (int partition = 0; partition < numPartitions; partition++) { + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .addAggregation( + terms("terms").field(field).includeExclude(new IncludeExclude(partition, numPartitions)) + .collectMode(randomFrom(SubAggCollectionMode.values()))) + .execute().actionGet(); + assertSearchResponse(response); + terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + for (Bucket bucket : terms.getBuckets()) { + assertFalse(foundTerms.contains(bucket.getKeyAsNumber())); + foundTerms.add(bucket.getKeyAsNumber()); + } + } + assertEquals(expectedCardinality, foundTerms.size()); + } + public void testSingleValueFieldWithMaxSize() throws Exception { SearchResponse response = client().prepareSearch("idx").setTypes("high_card_type") diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/StringTermsIT.java 
diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/StringTermsIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/StringTermsIT.java
index 3d5d13bf04a..46af395c476 100644
--- a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/StringTermsIT.java
+++ b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/StringTermsIT.java
@@ -18,6 +18,8 @@
  */
 package org.elasticsearch.search.aggregations.bucket;
 
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.StringHelper;
 import org.apache.lucene.util.automaton.RegExp;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.index.IndexRequestBuilder;
@@ -37,6 +39,7 @@ import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
 import org.elasticsearch.search.aggregations.bucket.filter.Filter;
 import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory.ExecutionMode;
 import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
 import org.elasticsearch.search.aggregations.metrics.avg.Avg;
@@ -54,10 +57,12 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Function;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -455,6 +460,44 @@ public class StringTermsIT extends AbstractTermsTestCase {
         }
     }
 
+
+    public void testSingleValueFieldWithPartitionedFiltering() throws Exception {
+        runTestFieldWithPartitionedFiltering(SINGLE_VALUED_FIELD_NAME);
+    }
+
+    public void testMultiValueFieldWithPartitionedFiltering() throws Exception {
+        runTestFieldWithPartitionedFiltering(MULTI_VALUED_FIELD_NAME);
+    }
+
+    private void runTestFieldWithPartitionedFiltering(String field) throws Exception {
+        // Find total number of unique terms
+        SearchResponse allResponse = client().prepareSearch("idx").setTypes("type")
+                .addAggregation(terms("terms").field(field).size(10000).collectMode(randomFrom(SubAggCollectionMode.values())))
+                .execute().actionGet();
+        assertSearchResponse(allResponse);
+        Terms terms = allResponse.getAggregations().get("terms");
+        assertThat(terms, notNullValue());
+        assertThat(terms.getName(), equalTo("terms"));
+        int expectedCardinality = terms.getBuckets().size();
+
+        // Gather terms using partitioned aggregations
+        final int numPartitions = randomIntBetween(2, 4);
+        Set<String> foundTerms = new HashSet<>();
+        for (int partition = 0; partition < numPartitions; partition++) {
+            SearchResponse response = client().prepareSearch("idx").setTypes("type").addAggregation(terms("terms").field(field)
+                    .includeExclude(new IncludeExclude(partition, numPartitions)).collectMode(randomFrom(SubAggCollectionMode.values())))
+                    .execute().actionGet();
+            assertSearchResponse(response);
+            terms = response.getAggregations().get("terms");
+            assertThat(terms, notNullValue());
+            assertThat(terms.getName(), equalTo("terms"));
+            for (Bucket bucket : terms.getBuckets()) {
+                assertTrue(foundTerms.add(bucket.getKeyAsString()));
+            }
+        }
+        assertEquals(expectedCardinality, foundTerms.size());
+    }
+
     public void testSingleValueFieldWithMaxSize() throws Exception {
         SearchResponse response = client()
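For string fields, `PartitionedStringFilter` hashes the term's `BytesRef` directly, so which partition a given term lands in can be previewed with the same call. A sketch (with the caveat that Lucene seeds `BytesRef.hashCode()` when the JVM starts, so the assignment is only stable within a single JVM run):

[source,java]
--------------------------------------------------
import org.apache.lucene.util.BytesRef;

// Sketch: preview the partition a string term falls into, mirroring the
// floorMod routing used by PartitionedStringFilter.
public final class WhichPartitionSketch {
    public static void main(String[] args) {
        int numPartitions = 4;
        for (String term : new String[] { "val000", "val001", "val002" }) {
            int partition = Math.floorMod(new BytesRef(term).hashCode(), numPartitions);
            System.out.println(term + " -> partition " + partition);
        }
    }
}
--------------------------------------------------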
diff --git a/docs/reference/aggregations/bucket/terms-aggregation.asciidoc b/docs/reference/aggregations/bucket/terms-aggregation.asciidoc
index fb3baca0967..180bcad1d0b 100644
--- a/docs/reference/aggregations/bucket/terms-aggregation.asciidoc
+++ b/docs/reference/aggregations/bucket/terms-aggregation.asciidoc
@@ -514,7 +514,10 @@ TIP: for indexed scripts replace the `file` parameter with an `id` parameter.
 ==== Filtering Values
 
 It is possible to filter the values for which buckets will be created. This can be done using the `include` and
-`exclude` parameters, which are based on regular expression strings or arrays of exact values.
+`exclude` parameters, which are based on regular expression strings or arrays of exact values. Additionally,
+`include` clauses can filter using `partition` expressions.
+
+===== Filtering Values with regular expressions
 
 [source,js]
 --------------------------------------------------
@@ -538,6 +541,8 @@ both are defined, the `exclude` has precedence, meaning, the `include` is evaluated first and only then the `exclude`.
 
 The syntax is the same as <<regexp-syntax,regexp queries>>.
 
+===== Filtering Values with exact values
+
 For matching based on exact values the `include` and `exclude` parameters can simply take an array of
 strings that represent the terms as they are found in the index:
 
@@ -561,6 +566,67 @@ strings that represent the terms as they are found in the index:
 }
 --------------------------------------------------
 
+===== Filtering Values with partitions
+
+Sometimes there are too many unique terms to process in a single request/response pair, so
+it can be useful to break the analysis up into multiple requests.
+This can be achieved by grouping the field's values into a number of partitions at query-time and processing
+only one partition in each request.
+Consider this request, which is looking for accounts that have not logged any access recently:
+
+[source,js]
+--------------------------------------------------
+{
+   "size": 0,
+   "aggs": {
+      "expired_sessions": {
+         "terms": {
+            "field": "account_id",
+            "include": {
+               "partition": 0,
+               "num_partitions": 20
+            },
+            "size": 10000,
+            "order": {
+               "last_access": "asc"
+            }
+         },
+         "aggs": {
+            "last_access": {
+               "max": {
+                  "field": "access_date"
+               }
+            }
+         }
+      }
+   }
+}
+--------------------------------------------------
+
+This request finds the last logged access date for a subset of customer accounts because we
+might want to expire some customer accounts that haven't been seen for a long while.
+The `num_partitions` setting requests that the unique account_ids be organized evenly into twenty
+partitions (0 to 19), and the `partition` setting in this request filters to only consider account_ids falling
+into partition 0. Subsequent requests should ask for partitions 1 then 2, and so on, to complete the expired-account analysis.
+
+Note that the `size` setting for the number of results returned needs to be tuned together with `num_partitions`.
+For this particular account-expiration example the process for balancing values for `size` and `num_partitions` would be as follows:
+
+1. Use the `cardinality` aggregation to estimate the total number of unique account_id values (an example request is shown below)
+2. Pick a value for `num_partitions` to break the number from step 1 up into more manageable chunks
+3. Pick a `size` value for the number of results we want from each partition
+4. Run a test request
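As a concrete sketch of step 1, a `cardinality` aggregation can estimate the number of unique account_id values before `num_partitions` and `size` are chosen (the aggregation name here is illustrative):

[source,js]
--------------------------------------------------
{
   "size": 0,
   "aggs": {
      "account_count": {
         "cardinality": {
            "field": "account_id"
         }
      }
   }
}
--------------------------------------------------

If this reported, say, 100,000 unique accounts, then `num_partitions: 20` would mean roughly 5,000 accounts per partition, so the `size` of 10000 used above leaves comfortable headroom even for unevenly balanced partitions.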
+
+If we have a circuit-breaker error we are trying to do too much in one request and must increase `num_partitions`.
+If the request was successful but the last account ID in the date-sorted test response was still an account we might want to
+expire, then we may be missing accounts of interest and have set our numbers too low. We must either
+
+* increase the `size` parameter to return more results per partition (could be heavy on memory) or
+* increase the `num_partitions` to consider fewer accounts per request (could increase overall processing time as we need to make more requests)
+
+Ultimately this is a balancing act between managing the Elasticsearch resources required to process a single request and the volume
+of requests that the client application must issue to complete a task.
+
 ==== Multi-field terms aggregation
 
 The `terms` aggregation does not support collecting terms from multiple fields