Aggregations - support for partitioning the set of terms used in aggregations so that multiple requests can be made without trying to compute everything in one request.

Closes #21487
markharwood 2016-11-17 15:32:59 +00:00
parent 0871073f9b
commit aa60e5cc07
6 changed files with 348 additions and 6 deletions
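
For orientation, here is a minimal sketch of the client-side usage this change enables, modelled on the integration tests added below. The index and field names are placeholders and the 10,000 bucket size is only an illustrative cap; each request processes one partition of the term set, and the union over all partitions visits every term exactly once.

[source,java]
--------------------------------------------------
import java.util.HashSet;
import java.util.Set;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;

public class PartitionedTermsExample {
    /** Collect every unique term of a field across several requests, one partition per request. */
    public static Set<String> collectAllTerms(Client client, String index, String field, int numPartitions) {
        Set<String> allTerms = new HashSet<>();
        for (int partition = 0; partition < numPartitions; partition++) {
            SearchResponse response = client.prepareSearch(index)
                    .setSize(0) // only the aggregation result is needed, not hits
                    .addAggregation(AggregationBuilders.terms("terms")
                            .field(field)
                            .size(10_000) // per-partition bucket cap; tune together with numPartitions
                            .includeExclude(new IncludeExclude(partition, numPartitions)))
                    .get();
            Terms terms = response.getAggregations().get("terms");
            for (Terms.Bucket bucket : terms.getBuckets()) {
                allTerms.add(bucket.getKeyAsString()); // each term lands in exactly one partition
            }
        }
        return allTerms;
    }
}
--------------------------------------------------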


@@ -131,7 +131,10 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory<Values
// to be unbounded and most instances may only aggregate few
// documents, so use hashed based
// global ordinals to keep the bucket ords dense.
// Additionally, if using partitioned terms the regular global
// ordinals would be sparse so we opt for hash
if (Aggregator.descendsFromBucketAggregator(parent) ||
(includeExclude != null && includeExclude.isPartitionBased())) {
execution = ExecutionMode.GLOBAL_ORDINALS_HASH;
} else {
if (factories == AggregatorFactories.EMPTY) {


@@ -18,6 +18,7 @@
*/
package org.elasticsearch.search.aggregations.bucket.terms.support;
import com.carrotsearch.hppc.BitMixer;
import com.carrotsearch.hppc.LongHashSet;
import com.carrotsearch.hppc.LongSet;
@@ -35,6 +36,7 @@ import org.apache.lucene.util.automaton.CompiledAutomaton;
import org.apache.lucene.util.automaton.Operations;
import org.apache.lucene.util.automaton.RegExp;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -46,6 +48,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.DocValueFormat;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
@@ -61,15 +64,34 @@ public class IncludeExclude implements Writeable, ToXContent {
private static final ParseField INCLUDE_FIELD = new ParseField("include");
private static final ParseField EXCLUDE_FIELD = new ParseField("exclude");
private static final ParseField PATTERN_FIELD = new ParseField("pattern");
private static final ParseField PARTITION_FIELD = new ParseField("partition");
private static final ParseField NUM_PARTITIONS_FIELD = new ParseField("num_partitions");
// The includeValue and excludeValue ByteRefs which are the result of the parsing
// process are converted into a LongFilter when used on numeric fields
// in the index.
public abstract static class LongFilter {
public abstract boolean accept(long value);
}
public class PartitionedLongFilter extends LongFilter {
private final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
@Override
public boolean accept(long value) {
// hash the value to keep even distributions
final long hashCode = BitMixer.mix64(value);
return Math.floorMod(hashCode, incNumPartitions) == incZeroBasedPartition;
}
}
public static class SetBackedLongFilter extends LongFilter {
private LongSet valids;
private LongSet invalids;
private SetBackedLongFilter(int numValids, int numInvalids) {
if (numValids > 0) {
valids = new LongHashSet(numValids);
}
@@ -96,6 +118,13 @@ public class IncludeExclude implements Writeable, ToXContent {
public abstract boolean accept(BytesRef value);
}
class PartitionedStringFilter extends StringFilter {
@Override
public boolean accept(BytesRef value) {
return Math.floorMod(value.hashCode(), incNumPartitions) == incZeroBasedPartition;
}
}
static class AutomatonBackedStringFilter extends StringFilter {
private final ByteRunAutomaton runAutomaton;
@@ -138,6 +167,25 @@ public class IncludeExclude implements Writeable, ToXContent {
}
class PartitionedOrdinalsFilter extends OrdinalsFilter {
@Override
public LongBitSet acceptedGlobalOrdinals(RandomAccessOrds globalOrdinals) throws IOException {
final long numOrds = globalOrdinals.getValueCount();
final LongBitSet acceptedGlobalOrdinals = new LongBitSet(numOrds);
final TermsEnum termEnum = globalOrdinals.termsEnum();
BytesRef term = termEnum.next();
while (term != null) {
if (Math.floorMod(term.hashCode(), incNumPartitions) == incZeroBasedPartition) {
acceptedGlobalOrdinals.set(termEnum.ord());
}
term = termEnum.next();
}
return acceptedGlobalOrdinals;
}
}
static class AutomatonBackedOrdinalsFilter extends OrdinalsFilter {
private final CompiledAutomaton compiled;
@@ -205,6 +253,8 @@ public class IncludeExclude implements Writeable, ToXContent {
private final RegExp include, exclude;
private final SortedSet<BytesRef> includeValues, excludeValues;
private final int incZeroBasedPartition;
private final int incNumPartitions;
/**
* @param include The regular expression pattern for the terms to be included
@@ -218,6 +268,8 @@ public class IncludeExclude implements Writeable, ToXContent {
this.exclude = exclude;
this.includeValues = null;
this.excludeValues = null;
this.incZeroBasedPartition = 0;
this.incNumPartitions = 0;
}
public IncludeExclude(String include, String exclude) {
@@ -234,6 +286,8 @@ public class IncludeExclude implements Writeable, ToXContent {
}
this.include = null;
this.exclude = null;
this.incZeroBasedPartition = 0;
this.incNumPartitions = 0;
this.includeValues = includeValues;
this.excludeValues = excludeValues;
}
@@ -250,6 +304,21 @@ public class IncludeExclude implements Writeable, ToXContent {
this(convertToBytesRefSet(includeValues), convertToBytesRefSet(excludeValues));
}
public IncludeExclude(int partition, int numPartitions) {
if (partition < 0 || partition >= numPartitions) {
throw new IllegalArgumentException("Partition must be >=0 and < numPartition which is "+numPartitions);
}
this.incZeroBasedPartition = partition;
this.incNumPartitions = numPartitions;
this.include = null;
this.exclude = null;
this.includeValues = null;
this.excludeValues = null;
}
/**
* Read from a stream.
*/
@@ -257,6 +326,8 @@ public class IncludeExclude implements Writeable, ToXContent {
if (in.readBoolean()) {
includeValues = null;
excludeValues = null;
incZeroBasedPartition = 0;
incNumPartitions = 0;
String includeString = in.readOptionalString();
include = includeString == null ? null : new RegExp(includeString);
String excludeString = in.readOptionalString();
@@ -283,6 +354,13 @@ public class IncludeExclude implements Writeable, ToXContent {
} else {
excludeValues = null;
}
if (in.getVersion().onOrAfter(Version.V_5_2_0_UNRELEASED)) {
incNumPartitions = in.readVInt();
incZeroBasedPartition = in.readVInt();
} else {
incNumPartitions = 0;
incZeroBasedPartition = 0;
}
}
@Override
@@ -309,6 +387,10 @@ public class IncludeExclude implements Writeable, ToXContent {
out.writeBytesRef(value);
}
}
if (out.getVersion().onOrAfter(Version.V_5_2_0_UNRELEASED)) {
out.writeVInt(incNumPartitions);
out.writeVInt(incZeroBasedPartition);
}
}
}
@@ -436,11 +518,26 @@ public class IncludeExclude implements Writeable, ToXContent {
if (token == XContentParser.Token.START_OBJECT) {
if (parseFieldMatcher.match(currentFieldName, INCLUDE_FIELD)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
// This "include":{"pattern":"foo.*"} syntax is undocumented since 2.0
// Regexes should be "include":"foo.*"
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.VALUE_STRING) {
if (parseFieldMatcher.match(currentFieldName, PATTERN_FIELD)) {
otherOptions.put(INCLUDE_FIELD, parser.text());
} else {
throw new ElasticsearchParseException(
"Unknown string parameter in Include/Exclude clause: " + currentFieldName);
}
} else if (token == XContentParser.Token.VALUE_NUMBER) {
if (parseFieldMatcher.match(currentFieldName, NUM_PARTITIONS_FIELD)) {
otherOptions.put(NUM_PARTITIONS_FIELD, parser.intValue());
} else if (parseFieldMatcher.match(currentFieldName, PARTITION_FIELD)) {
otherOptions.put(INCLUDE_FIELD, parser.intValue());
} else {
throw new ElasticsearchParseException(
"Unknown numeric parameter in Include/Exclude clause: " + currentFieldName);
}
}
}
@@ -480,15 +577,43 @@ public class IncludeExclude implements Writeable, ToXContent {
public IncludeExclude createIncludeExclude(Map<ParseField, Object> otherOptions) {
Object includeObject = otherOptions.get(INCLUDE_FIELD);
String include = null;
int partition = -1;
int numPartitions = -1;
SortedSet<BytesRef> includeValues = null;
if (includeObject != null) {
if (includeObject instanceof String) {
include = (String) includeObject;
} else if (includeObject instanceof SortedSet) {
includeValues = (SortedSet<BytesRef>) includeObject;
} else if (includeObject instanceof Integer) {
partition = (Integer) includeObject;
Object numPartitionsObject = otherOptions.get(NUM_PARTITIONS_FIELD);
if (numPartitionsObject instanceof Integer) {
numPartitions = (Integer) numPartitionsObject;
if (numPartitions < 2) {
throw new IllegalArgumentException(NUM_PARTITIONS_FIELD.getPreferredName() + " must be >1");
}
if (partition < 0 || partition >= numPartitions) {
throw new IllegalArgumentException(
PARTITION_FIELD.getPreferredName() + " must be >=0 and <" + numPartitions);
}
} else {
if (numPartitionsObject == null) {
throw new IllegalArgumentException(NUM_PARTITIONS_FIELD.getPreferredName() + " parameter is missing");
}
throw new IllegalArgumentException(NUM_PARTITIONS_FIELD.getPreferredName() + " value must be an integer");
}
}
}
Object excludeObject = otherOptions.get(EXCLUDE_FIELD);
if (numPartitions >0 ){
if(excludeObject!=null){
throw new IllegalArgumentException("Partitioned Include cannot be used in combination with excludes");
}
return new IncludeExclude(partition, numPartitions);
}
String exclude = null;
SortedSet<BytesRef> excludeValues = null;
if (excludeObject != null) {
@@ -517,6 +642,10 @@ public class IncludeExclude implements Writeable, ToXContent {
return include != null || exclude != null;
}
public boolean isPartitionBased() {
return incNumPartitions > 0;
}
private Automaton toAutomaton() {
Automaton a = null;
if (include != null) {
@@ -538,6 +667,9 @@ public class IncludeExclude implements Writeable, ToXContent {
if (isRegexBased()) {
return new AutomatonBackedStringFilter(toAutomaton());
}
if (isPartitionBased()){
return new PartitionedStringFilter();
}
return new TermListBackedStringFilter(parseForDocValues(includeValues, format), parseForDocValues(excludeValues, format));
}
@@ -559,13 +691,22 @@ public class IncludeExclude implements Writeable, ToXContent {
if (isRegexBased()) {
return new AutomatonBackedOrdinalsFilter(toAutomaton());
}
if (isPartitionBased()){
return new PartitionedOrdinalsFilter();
}
return new TermListBackedOrdinalsFilter(parseForDocValues(includeValues, format), parseForDocValues(excludeValues, format));
}
public LongFilter convertToLongFilter(DocValueFormat format) {
if(isPartitionBased()){
return new PartitionedLongFilter();
}
int numValids = includeValues == null ? 0 : includeValues.size();
int numInvalids = excludeValues == null ? 0 : excludeValues.size();
SetBackedLongFilter result = new SetBackedLongFilter(numValids, numInvalids);
if (includeValues != null) {
for (BytesRef val : includeValues) {
result.addAccept(format.parseLong(val.utf8ToString(), false, null));
@@ -580,9 +721,13 @@ public class IncludeExclude implements Writeable, ToXContent {
}
public LongFilter convertToDoubleFilter() {
if(isPartitionBased()){
return new PartitionedLongFilter();
}
int numValids = includeValues == null ? 0 : includeValues.size();
int numInvalids = excludeValues == null ? 0 : excludeValues.size();
SetBackedLongFilter result = new SetBackedLongFilter(numValids, numInvalids);
if (includeValues != null) {
for (BytesRef val : includeValues) {
double dval = Double.parseDouble(val.utf8ToString());

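The routing rule shared by the new PartitionedLongFilter, PartitionedStringFilter and PartitionedOrdinalsFilter above is simply a modulo of a term's hash. Below is a standalone sketch of the numeric variant, using the same BitMixer.mix64 and Math.floorMod calls as the diff; the account id and partition count are illustrative values, not part of the commit.

[source,java]
--------------------------------------------------
import com.carrotsearch.hppc.BitMixer;

public class PartitionRuleExample {
    /** Mirrors PartitionedLongFilter.accept: each value maps to exactly one of numPartitions partitions. */
    static int partitionOf(long value, int numPartitions) {
        long hash = BitMixer.mix64(value);                       // mix the bits so partitions fill evenly
        return (int) Math.floorMod(hash, (long) numPartitions);  // floorMod keeps the result non-negative
    }

    public static void main(String[] args) {
        int numPartitions = 20;
        long accountId = 12345L;
        // The same value always maps to the same partition, so issuing one request per
        // partition (0..19) sees every term exactly once and no term twice.
        System.out.println("account " + accountId + " -> partition " + partitionOf(accountId, numPartitions));
    }
}
--------------------------------------------------

The string and ordinals filters apply the same modulo to the term's BytesRef hash code instead of the mixed long value.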

@@ -33,6 +33,7 @@ import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
import org.elasticsearch.search.aggregations.bucket.filter.Filter;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
import org.elasticsearch.search.aggregations.metrics.avg.Avg;
import org.elasticsearch.search.aggregations.metrics.max.Max;
@@ -48,10 +49,12 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -360,6 +363,43 @@ public class DoubleTermsIT extends AbstractTermsTestCase {
}
}
public void testSingleValueFieldWithPartitionedFiltering() throws Exception {
runTestFieldWithPartitionedFiltering(SINGLE_VALUED_FIELD_NAME);
}
public void testMultiValueFieldWithPartitionedFiltering() throws Exception {
runTestFieldWithPartitionedFiltering(MULTI_VALUED_FIELD_NAME);
}
private void runTestFieldWithPartitionedFiltering(String field) throws Exception {
// Find total number of unique terms
SearchResponse allResponse = client().prepareSearch("idx").setTypes("type")
.addAggregation(terms("terms").field(field).size(10000).collectMode(randomFrom(SubAggCollectionMode.values())))
.execute().actionGet();
assertSearchResponse(allResponse);
Terms terms = allResponse.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
int expectedCardinality = terms.getBuckets().size();
// Gather terms using partitioned aggregations
final int numPartitions = randomIntBetween(2, 4);
Set<Number> foundTerms = new HashSet<>();
for (int partition = 0; partition < numPartitions; partition++) {
SearchResponse response = client().prepareSearch("idx").setTypes("type").addAggregation(terms("terms").field(field)
.includeExclude(new IncludeExclude(partition, numPartitions)).collectMode(randomFrom(SubAggCollectionMode.values())))
.execute().actionGet();
assertSearchResponse(response);
terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
for (Bucket bucket : terms.getBuckets()) {
assertTrue(foundTerms.add(bucket.getKeyAsNumber()));
}
}
assertEquals(expectedCardinality, foundTerms.size());
}
public void testSingleValueFieldOrderedByTermAsc() throws Exception {
SearchResponse response = client().prepareSearch("idx").setTypes("type")
.addAggregation(terms("terms")


@@ -32,6 +32,7 @@ import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
import org.elasticsearch.search.aggregations.bucket.filter.Filter;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
import org.elasticsearch.search.aggregations.metrics.avg.Avg;
import org.elasticsearch.search.aggregations.metrics.max.Max;
@@ -47,10 +48,12 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -327,6 +330,48 @@ public class LongTermsIT extends AbstractTermsTestCase {
}
}
public void testSingleValueFieldWithPartitionedFiltering() throws Exception {
runTestFieldWithPartitionedFiltering(SINGLE_VALUED_FIELD_NAME);
}
public void testMultiValueFieldWithPartitionedFiltering() throws Exception {
runTestFieldWithPartitionedFiltering(MULTI_VALUED_FIELD_NAME);
}
private void runTestFieldWithPartitionedFiltering(String field) throws Exception {
// Find total number of unique terms
SearchResponse allResponse = client().prepareSearch("idx").setTypes("type")
.addAggregation(terms("terms").field(field).collectMode(randomFrom(SubAggCollectionMode.values()))).execute().actionGet();
assertSearchResponse(allResponse);
Terms terms = allResponse.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
int expectedCardinality = terms.getBuckets().size();
// Gather terms using partitioned aggregations
final int numPartitions = randomIntBetween(2, 4);
Set<Number> foundTerms = new HashSet<>();
for (int partition = 0; partition < numPartitions; partition++) {
SearchResponse response = client().prepareSearch("idx").setTypes("type")
.addAggregation(
terms("terms").field(field).includeExclude(new IncludeExclude(partition, numPartitions))
.collectMode(randomFrom(SubAggCollectionMode.values())))
.execute().actionGet();
assertSearchResponse(response);
terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
for (Bucket bucket : terms.getBuckets()) {
assertFalse(foundTerms.contains(bucket.getKeyAsNumber()));
foundTerms.add(bucket.getKeyAsNumber());
}
}
assertEquals(expectedCardinality, foundTerms.size());
}
public void testSingleValueFieldWithMaxSize() throws Exception {
SearchResponse response = client().prepareSearch("idx").setTypes("high_card_type")
.addAggregation(terms("terms")


@@ -18,6 +18,8 @@
*/
package org.elasticsearch.search.aggregations.bucket;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.automaton.RegExp;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.index.IndexRequestBuilder;
@@ -37,6 +39,7 @@ import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
import org.elasticsearch.search.aggregations.bucket.filter.Filter;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory.ExecutionMode;
import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
import org.elasticsearch.search.aggregations.metrics.avg.Avg;
@@ -54,10 +57,12 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -456,6 +461,44 @@ public class StringTermsIT extends AbstractTermsTestCase {
}
public void testSingleValueFieldWithPartitionedFiltering() throws Exception {
runTestFieldWithPartitionedFiltering(SINGLE_VALUED_FIELD_NAME);
}
public void testMultiValueFieldWithPartitionedFiltering() throws Exception {
runTestFieldWithPartitionedFiltering(MULTI_VALUED_FIELD_NAME);
}
private void runTestFieldWithPartitionedFiltering(String field) throws Exception {
// Find total number of unique terms
SearchResponse allResponse = client().prepareSearch("idx").setTypes("type")
.addAggregation(terms("terms").field(field).size(10000).collectMode(randomFrom(SubAggCollectionMode.values())))
.execute().actionGet();
assertSearchResponse(allResponse);
Terms terms = allResponse.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
int expectedCardinality = terms.getBuckets().size();
// Gather terms using partitioned aggregations
final int numPartitions = randomIntBetween(2, 4);
Set<String> foundTerms = new HashSet<>();
for (int partition = 0; partition < numPartitions; partition++) {
SearchResponse response = client().prepareSearch("idx").setTypes("type").addAggregation(terms("terms").field(field)
.includeExclude(new IncludeExclude(partition, numPartitions)).collectMode(randomFrom(SubAggCollectionMode.values())))
.execute().actionGet();
assertSearchResponse(response);
terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
for (Bucket bucket : terms.getBuckets()) {
assertTrue(foundTerms.add(bucket.getKeyAsString()));
}
}
assertEquals(expectedCardinality, foundTerms.size());
}
public void testSingleValueFieldWithMaxSize() throws Exception {
SearchResponse response = client()
.prepareSearch("idx")


@@ -514,7 +514,10 @@ TIP: for indexed scripts replace the `file` parameter with an `id` parameter.
==== Filtering Values
It is possible to filter the values for which buckets will be created. This can be done using the `include` and
`exclude` parameters which are based on regular expression strings or arrays of exact values. Additionally,
`include` clauses can filter using `partition` expressions.
===== Filtering Values with regular expressions
[source,js]
--------------------------------------------------
@@ -538,6 +541,8 @@ both are defined, the `exclude` has precedence, meaning, the `include` is evalua
The syntax is the same as <<regexp-syntax,regexp queries>>.
===== Filtering Values with exact values
For matching based on exact values the `include` and `exclude` parameters can simply take an array of
strings that represent the terms as they are found in the index:
@@ -561,6 +566,67 @@ strings that represent the terms as they are found in the index:
}
--------------------------------------------------
===== Filtering Values with partitions
Sometimes there are too many unique terms to process in a single request/response pair so
it can be useful to break the analysis up into multiple requests.
This can be achieved by grouping the field's values into a number of partitions at query-time and processing
only one partition in each request.
Consider this request which is looking for accounts that have not logged any access recently:
[source,js]
--------------------------------------------------
{
"size": 0,
"aggs": {
"expired_sessions": {
"terms": {
"field": "account_id",
"include": {
"partition": 0,
"num_partitions": 20
},
"size": 10000,
"order": {
"last_access": "asc"
}
},
"aggs": {
"last_access": {
"max": {
"field": "access_date"
}
}
}
}
}
}
--------------------------------------------------
This request finds the last logged access date for a subset of customer accounts because we
might want to expire some customer accounts who haven't been seen for a long while.
The `num_partitions` setting has requested that the unique account_ids are organized evenly into twenty
partitions (0 to 19), and the `partition` setting in this request filters to only consider account_ids falling
into partition 0. Subsequent requests should ask for partitions 1 then 2 etc to complete the expired-account analysis.
Note that the `size` setting for the number of results returned needs to be tuned with the `num_partitions`.
For this particular account-expiration example the process for balancing values for `size` and `num_partitions` would be as follows:
1. Use the `cardinality` aggregation to estimate the total number of unique account_id values (see the sketch after this list)
2. Pick a value for `num_partitions` to break the number from 1) up into more manageable chunks
3. Pick a `size` value for the number of responses we want from each partition
4. Run a test request
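
For step 1, the following is a hedged sketch of the cardinality estimate using the Java API; the index name is a placeholder and the returned count is approximate by nature of the `cardinality` aggregation.

[source,java]
--------------------------------------------------
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.metrics.cardinality.Cardinality;

public class EstimateCardinalityExample {
    /** Rough count of unique account_id values, used to choose num_partitions and size. */
    public static long estimateUniqueAccounts(Client client) {
        SearchResponse response = client.prepareSearch("accounts") // placeholder index name
                .setSize(0)                                        // only the aggregation result is needed
                .addAggregation(AggregationBuilders.cardinality("unique_accounts").field("account_id"))
                .get();
        Cardinality cardinality = response.getAggregations().get("unique_accounts");
        return cardinality.getValue(); // approximate number of distinct account_id values
    }
}
--------------------------------------------------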
If we have a circuit-breaker error we are trying to do too much in one request and must increase `num_partitions`.
If the request was successful but the last account ID in the date-sorted test response was still an account we might want to
expire, then we may be missing accounts of interest and have set our numbers too low. We must either
* increase the `size` parameter to return more results per partition (could be heavy on memory) or
* increase the `num_partitions` to consider fewer accounts per request (could increase overall processing time as we need to make more requests)
Ultimately this is a balancing act between managing the elasticsearch resources required to process a single request and the volume
of requests that the client application must issue to complete a task.
==== Multi-field terms aggregation
The `terms` aggregation does not support collecting terms from multiple fields