Terms aggregations: make size=0 return all terms.

Terms aggregations return up to `size` terms, so up to now, the way to get all
matching terms back was to set `size` to an arbitrary high number that would be
larger than the number of unique terms.

Terms aggregators already made sure to not allocate memory based on the `size`
parameter so this commit mostly consists in making `0` an alias for the
maximum integer value in the TermsParser.

Close #4837
This commit is contained in:
Adrien Grand 2014-01-21 23:00:19 +01:00
parent 75778d082b
commit 9282ae4ffd
10 changed files with 97 additions and 12 deletions

View File

@ -65,6 +65,8 @@ the client.
NOTE: `shard_size` cannot be smaller than `size` (as it doesn't make much sense). When it is, elasticsearch will
override it and reset it to be equal to `size`.
added[1.1.0] It is possible to not limit the number of terms that are returned by setting `size` to `0`. Don't use this
on high-cardinality fields as this will kill both your CPU since terms need to be return sorted, and your network.
==== Order

View File

@ -167,7 +167,7 @@ public class DoubleTerms extends InternalTerms {
this.name = in.readString();
this.order = InternalOrder.Streams.readOrder(in);
this.valueFormatter = ValueFormatterStreams.readOptional(in);
this.requiredSize = in.readVInt();
this.requiredSize = readSize(in);
this.minDocCount = in.readVLong();
int size = in.readVInt();
List<InternalTerms.Bucket> buckets = new ArrayList<InternalTerms.Bucket>(size);
@ -183,7 +183,7 @@ public class DoubleTerms extends InternalTerms {
out.writeString(name);
InternalOrder.Streams.writeOrder(order, out);
ValueFormatterStreams.writeOptional(valueFormatter, out);
out.writeVInt(requiredSize);
writeSize(requiredSize, out);
out.writeVLong(minDocCount);
out.writeVInt(buckets.size());
for (InternalTerms.Bucket bucket : buckets) {

View File

@ -21,6 +21,8 @@ package org.elasticsearch.search.aggregations.bucket.terms;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.ToXContent;
@ -29,6 +31,7 @@ import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.terms.support.BucketPriorityQueue;
import java.io.IOException;
import java.util.*;
/**
@ -185,4 +188,17 @@ public abstract class InternalTerms extends InternalAggregation implements Terms
buckets = newBuckets;
}
// 0 actually means unlimited
protected static int readSize(StreamInput in) throws IOException {
final int size = in.readVInt();
return size == 0 ? Integer.MAX_VALUE : size;
}
protected static void writeSize(int size, StreamOutput out) throws IOException {
if (size == Integer.MAX_VALUE) {
size = 0;
}
out.writeVInt(size);
}
}

View File

@ -164,7 +164,7 @@ public class LongTerms extends InternalTerms {
this.name = in.readString();
this.order = InternalOrder.Streams.readOrder(in);
this.valueFormatter = ValueFormatterStreams.readOptional(in);
this.requiredSize = in.readVInt();
this.requiredSize = readSize(in);
this.minDocCount = in.readVLong();
int size = in.readVInt();
List<InternalTerms.Bucket> buckets = new ArrayList<InternalTerms.Bucket>(size);
@ -180,7 +180,7 @@ public class LongTerms extends InternalTerms {
out.writeString(name);
InternalOrder.Streams.writeOrder(order, out);
ValueFormatterStreams.writeOptional(valueFormatter, out);
out.writeVInt(requiredSize);
writeSize(requiredSize, out);
out.writeVLong(minDocCount);
out.writeVInt(buckets.size());
for (InternalTerms.Bucket bucket : buckets) {

View File

@ -96,7 +96,7 @@ public class StringTerms extends InternalTerms {
public void readFrom(StreamInput in) throws IOException {
this.name = in.readString();
this.order = InternalOrder.Streams.readOrder(in);
this.requiredSize = in.readVInt();
this.requiredSize = readSize(in);
this.minDocCount = in.readVLong();
int size = in.readVInt();
List<InternalTerms.Bucket> buckets = new ArrayList<InternalTerms.Bucket>(size);
@ -111,7 +111,7 @@ public class StringTerms extends InternalTerms {
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
InternalOrder.Streams.writeOrder(order, out);
out.writeVInt(requiredSize);
writeSize(requiredSize, out);
out.writeVLong(minDocCount);
out.writeVInt(buckets.size());
for (InternalTerms.Bucket bucket : buckets) {

View File

@ -176,6 +176,14 @@ public class TermsParser implements Aggregator.Parser {
}
}
if (shardSize == 0) {
shardSize = Integer.MAX_VALUE;
}
if (requiredSize == 0) {
requiredSize = Integer.MAX_VALUE;
}
// shard_size cannot be smaller than size as we need to at least fetch <size> entries from every shards in order to return <size>
if (shardSize < requiredSize) {
shardSize = requiredSize;

View File

@ -66,7 +66,7 @@ public class UnmappedTerms extends InternalTerms {
public void readFrom(StreamInput in) throws IOException {
this.name = in.readString();
this.order = InternalOrder.Streams.readOrder(in);
this.requiredSize = in.readVInt();
this.requiredSize = readSize(in);
this.minDocCount = in.readVLong();
this.buckets = BUCKETS;
this.bucketMap = BUCKETS_MAP;
@ -76,7 +76,7 @@ public class UnmappedTerms extends InternalTerms {
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
InternalOrder.Streams.writeOrder(order, out);
out.writeVInt(requiredSize);
writeSize(requiredSize, out);
out.writeVLong(minDocCount);
}

View File

@ -64,7 +64,7 @@ public class DoubleTermsTests extends ElasticsearchIntegrationTest {
@Before
public void init() throws Exception {
createIndex("idx");
IndexRequestBuilder[] lowcardBuilders = new IndexRequestBuilder[NUM_DOCS];
for (int i = 0; i < lowcardBuilders.length; i++) {
lowcardBuilders[i] = client().prepareIndex("idx", "type").setSource(jsonBuilder()
@ -72,7 +72,7 @@ public class DoubleTermsTests extends ElasticsearchIntegrationTest {
.field("value", (double) i)
.startArray("values").value((double)i).value(i + 1d).endArray()
.endObject());
}
indexRandom(randomBoolean(), lowcardBuilders);
IndexRequestBuilder[] highCardBuilders = new IndexRequestBuilder[100]; // TODO: randomize the size?
@ -89,6 +89,24 @@ public class DoubleTermsTests extends ElasticsearchIntegrationTest {
ensureSearchable();
}
@Test
// the main purpose of this test is to make sure we're not allocating 2GB of memory per shard
public void sizeIsZero() {
SearchResponse response = client().prepareSearch("idx").setTypes("high_card_type")
.addAggregation(terms("terms")
.field("value")
.minDocCount(randomInt(1))
.size(0))
.execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
assertThat(terms.buckets().size(), equalTo(100));
}
@Test
public void singleValueField() throws Exception {
SearchResponse response = client().prepareSearch("idx").setTypes("type")
@ -550,7 +568,8 @@ public class DoubleTermsTests extends ElasticsearchIntegrationTest {
public void unmapped() throws Exception {
SearchResponse response = client().prepareSearch("idx_unmapped").setTypes("type")
.addAggregation(terms("terms")
.field("value"))
.field("value")
.size(randomInt(5)))
.execute().actionGet();
assertSearchResponse(response);

View File

@ -87,6 +87,24 @@ public class LongTermsTests extends ElasticsearchIntegrationTest {
ensureSearchable();
}
@Test
// the main purpose of this test is to make sure we're not allocating 2GB of memory per shard
public void sizeIsZero() {
SearchResponse response = client().prepareSearch("idx").setTypes("high_card_type")
.addAggregation(terms("terms")
.field("value")
.minDocCount(randomInt(1))
.size(0))
.execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
assertThat(terms.buckets().size(), equalTo(100));
}
@Test
public void singleValueField() throws Exception {
SearchResponse response = client().prepareSearch("idx").setTypes("type")
@ -544,7 +562,8 @@ public class LongTermsTests extends ElasticsearchIntegrationTest {
public void unmapped() throws Exception {
SearchResponse response = client().prepareSearch("idx_unmapped").setTypes("type")
.addAggregation(terms("terms")
.field("value"))
.field("value")
.size(randomInt(5)))
.execute().actionGet();
assertSearchResponse(response);

View File

@ -97,6 +97,26 @@ public class StringTermsTests extends ElasticsearchIntegrationTest {
ensureSearchable();
}
@Test
// the main purpose of this test is to make sure we're not allocating 2GB of memory per shard
public void sizeIsZero() {
final int minDocCount = randomInt(1);
SearchResponse response = client().prepareSearch("idx").setTypes("high_card_type")
.addAggregation(terms("terms")
.executionHint(randomExecutionHint())
.field("value")
.minDocCount(minDocCount)
.size(0))
.execute().actionGet();
assertSearchResponse(response);System.out.println(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
assertThat(terms.buckets().size(), equalTo(minDocCount == 0 ? 105 : 100)); // 105 because of the other type
}
@Test
public void singleValueField() throws Exception {
SearchResponse response = client().prepareSearch("idx").setTypes("type")
@ -686,6 +706,7 @@ public class StringTermsTests extends ElasticsearchIntegrationTest {
SearchResponse response = client().prepareSearch("idx_unmapped").setTypes("type")
.addAggregation(terms("terms")
.executionHint(randomExecutionHint())
.size(randomInt(5))
.field("value"))
.execute().actionGet();