diff --git a/docs/reference/search/aggregations/bucket/terms-aggregation.asciidoc b/docs/reference/search/aggregations/bucket/terms-aggregation.asciidoc index 0e08fcff5fc..0cc3582030c 100644 --- a/docs/reference/search/aggregations/bucket/terms-aggregation.asciidoc +++ b/docs/reference/search/aggregations/bucket/terms-aggregation.asciidoc @@ -65,6 +65,8 @@ the client. NOTE: `shard_size` cannot be smaller than `size` (as it doesn't make much sense). When it is, elasticsearch will override it and reset it to be equal to `size`. +added[1.1.0] It is possible to not limit the number of terms that are returned by setting `size` to `0`. Don't use this +on high-cardinality fields as this will kill both your CPU since terms need to be returned sorted, and your network. ==== Order diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java index de379f6ac6d..b30dac7a91e 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java @@ -167,7 +167,7 @@ public class DoubleTerms extends InternalTerms { this.name = in.readString(); this.order = InternalOrder.Streams.readOrder(in); this.valueFormatter = ValueFormatterStreams.readOptional(in); - this.requiredSize = in.readVInt(); + this.requiredSize = readSize(in); this.minDocCount = in.readVLong(); int size = in.readVInt(); List buckets = new ArrayList(size); @@ -183,7 +183,7 @@ public class DoubleTerms extends InternalTerms { out.writeString(name); InternalOrder.Streams.writeOrder(order, out); ValueFormatterStreams.writeOptional(valueFormatter, out); - out.writeVInt(requiredSize); + writeSize(requiredSize, out); out.writeVLong(minDocCount); out.writeVInt(buckets.size()); for (InternalTerms.Bucket bucket : buckets) { diff --git 
a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java index f4d0f5a650a..c2717dfac9b 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java @@ -21,6 +21,8 @@ package org.elasticsearch.search.aggregations.bucket.terms; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.elasticsearch.cache.recycler.CacheRecycler; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.text.Text; import org.elasticsearch.common.xcontent.ToXContent; @@ -29,6 +31,7 @@ import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.bucket.terms.support.BucketPriorityQueue; +import java.io.IOException; import java.util.*; /** @@ -185,4 +188,17 @@ public abstract class InternalTerms extends InternalAggregation implements Terms buckets = newBuckets; } + // On the wire a size of 0 means "unlimited": readSize maps 0 to Integer.MAX_VALUE and writeSize maps Integer.MAX_VALUE back to 0 + protected static int readSize(StreamInput in) throws IOException { + final int size = in.readVInt(); + return size == 0 ? 
Integer.MAX_VALUE : size; + } + + protected static void writeSize(int size, StreamOutput out) throws IOException { + if (size == Integer.MAX_VALUE) { + size = 0; + } + out.writeVInt(size); + } + } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java index 37600f81cde..0bdd68ab750 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java @@ -164,7 +164,7 @@ public class LongTerms extends InternalTerms { this.name = in.readString(); this.order = InternalOrder.Streams.readOrder(in); this.valueFormatter = ValueFormatterStreams.readOptional(in); - this.requiredSize = in.readVInt(); + this.requiredSize = readSize(in); this.minDocCount = in.readVLong(); int size = in.readVInt(); List buckets = new ArrayList(size); @@ -180,7 +180,7 @@ public class LongTerms extends InternalTerms { out.writeString(name); InternalOrder.Streams.writeOrder(order, out); ValueFormatterStreams.writeOptional(valueFormatter, out); - out.writeVInt(requiredSize); + writeSize(requiredSize, out); out.writeVLong(minDocCount); out.writeVInt(buckets.size()); for (InternalTerms.Bucket bucket : buckets) { diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java index f68cbe02df4..4d50e5c4a78 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java @@ -96,7 +96,7 @@ public class StringTerms extends InternalTerms { public void readFrom(StreamInput in) throws IOException { this.name = in.readString(); this.order = InternalOrder.Streams.readOrder(in); - this.requiredSize = in.readVInt(); + this.requiredSize = readSize(in); 
this.minDocCount = in.readVLong(); int size = in.readVInt(); List buckets = new ArrayList(size); @@ -111,7 +111,7 @@ public class StringTerms extends InternalTerms { public void writeTo(StreamOutput out) throws IOException { out.writeString(name); InternalOrder.Streams.writeOrder(order, out); - out.writeVInt(requiredSize); + writeSize(requiredSize, out); out.writeVLong(minDocCount); out.writeVInt(buckets.size()); for (InternalTerms.Bucket bucket : buckets) { diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsParser.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsParser.java index 3634feb7e46..1bc08876dc4 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsParser.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsParser.java @@ -176,6 +176,14 @@ public class TermsParser implements Aggregator.Parser { } } + if (shardSize == 0) { + shardSize = Integer.MAX_VALUE; + } + + if (requiredSize == 0) { + requiredSize = Integer.MAX_VALUE; + } + // shard_size cannot be smaller than size as we need to at least fetch entries from every shards in order to return if (shardSize < requiredSize) { shardSize = requiredSize; diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java index bf04f3724a6..d86c7baf446 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java @@ -66,7 +66,7 @@ public class UnmappedTerms extends InternalTerms { public void readFrom(StreamInput in) throws IOException { this.name = in.readString(); this.order = InternalOrder.Streams.readOrder(in); - this.requiredSize = in.readVInt(); + this.requiredSize = readSize(in); this.minDocCount = in.readVLong(); this.buckets = BUCKETS; this.bucketMap = 
BUCKETS_MAP; @@ -76,7 +76,7 @@ public class UnmappedTerms extends InternalTerms { public void writeTo(StreamOutput out) throws IOException { out.writeString(name); InternalOrder.Streams.writeOrder(order, out); - out.writeVInt(requiredSize); + writeSize(requiredSize, out); out.writeVLong(minDocCount); } diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/DoubleTermsTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/DoubleTermsTests.java index 54b0baa8392..c894d6aaf5f 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/bucket/DoubleTermsTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/DoubleTermsTests.java @@ -64,7 +64,7 @@ public class DoubleTermsTests extends ElasticsearchIntegrationTest { @Before public void init() throws Exception { createIndex("idx"); - + IndexRequestBuilder[] lowcardBuilders = new IndexRequestBuilder[NUM_DOCS]; for (int i = 0; i < lowcardBuilders.length; i++) { lowcardBuilders[i] = client().prepareIndex("idx", "type").setSource(jsonBuilder() @@ -72,7 +72,7 @@ public class DoubleTermsTests extends ElasticsearchIntegrationTest { .field("value", (double) i) .startArray("values").value((double)i).value(i + 1d).endArray() .endObject()); - + } indexRandom(randomBoolean(), lowcardBuilders); IndexRequestBuilder[] highCardBuilders = new IndexRequestBuilder[100]; // TODO: randomize the size? 
@@ -89,6 +89,24 @@ public class DoubleTermsTests extends ElasticsearchIntegrationTest { ensureSearchable(); } + @Test + // the main purpose of this test is to make sure we're not allocating 2GB of memory per shard + public void sizeIsZero() { + SearchResponse response = client().prepareSearch("idx").setTypes("high_card_type") + .addAggregation(terms("terms") + .field("value") + .minDocCount(randomInt(1)) + .size(0)) + .execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + assertThat(terms.buckets().size(), equalTo(100)); + } + @Test public void singleValueField() throws Exception { SearchResponse response = client().prepareSearch("idx").setTypes("type") @@ -550,7 +568,8 @@ public class DoubleTermsTests extends ElasticsearchIntegrationTest { public void unmapped() throws Exception { SearchResponse response = client().prepareSearch("idx_unmapped").setTypes("type") .addAggregation(terms("terms") - .field("value")) + .field("value") + .size(randomInt(5))) .execute().actionGet(); assertSearchResponse(response); diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/LongTermsTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/LongTermsTests.java index c480a553171..0c5db790b8a 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/bucket/LongTermsTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/LongTermsTests.java @@ -87,6 +87,24 @@ public class LongTermsTests extends ElasticsearchIntegrationTest { ensureSearchable(); } + @Test + // the main purpose of this test is to make sure we're not allocating 2GB of memory per shard + public void sizeIsZero() { + SearchResponse response = client().prepareSearch("idx").setTypes("high_card_type") + .addAggregation(terms("terms") + .field("value") + .minDocCount(randomInt(1)) + .size(0)) + .execute().actionGet(); 
+ + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + assertThat(terms.buckets().size(), equalTo(100)); + } + @Test public void singleValueField() throws Exception { SearchResponse response = client().prepareSearch("idx").setTypes("type") @@ -544,7 +562,8 @@ public class LongTermsTests extends ElasticsearchIntegrationTest { public void unmapped() throws Exception { SearchResponse response = client().prepareSearch("idx_unmapped").setTypes("type") .addAggregation(terms("terms") - .field("value")) + .field("value") + .size(randomInt(5))) .execute().actionGet(); assertSearchResponse(response); diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/StringTermsTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/StringTermsTests.java index 3adb08ffef0..e78b15bdfc2 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/bucket/StringTermsTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/StringTermsTests.java @@ -97,6 +97,26 @@ public class StringTermsTests extends ElasticsearchIntegrationTest { ensureSearchable(); } + @Test + // the main purpose of this test is to make sure we're not allocating 2GB of memory per shard + public void sizeIsZero() { + final int minDocCount = randomInt(1); + SearchResponse response = client().prepareSearch("idx").setTypes("high_card_type") + .addAggregation(terms("terms") + .executionHint(randomExecutionHint()) + .field("value") + .minDocCount(minDocCount) + .size(0)) + .execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + assertThat(terms.buckets().size(), equalTo(minDocCount == 0 ? 
105 : 100)); // 105 because of the other type + } + @Test public void singleValueField() throws Exception { SearchResponse response = client().prepareSearch("idx").setTypes("type") @@ -686,6 +706,7 @@ public class StringTermsTests extends ElasticsearchIntegrationTest { SearchResponse response = client().prepareSearch("idx_unmapped").setTypes("type") .addAggregation(terms("terms") .executionHint(randomExecutionHint()) + .size(randomInt(5)) .field("value")) .execute().actionGet();