Make size=0 return all buckets for the geohash_grid aggregation.

Close #4875
Adrien Grand 2014-02-05 09:33:24 +01:00
parent e1c1120949
commit 9cb17408cb
7 changed files with 64 additions and 21 deletions

geohash_grid aggregation docs

@@ -117,11 +117,15 @@ precision:: Optional. The string length of the geohashes used to define

 size::       Optional. The maximum number of geohash buckets to return
              (defaults to 10,000). When results are trimmed, buckets are
              prioritised based on the volumes of documents they contain.
+             added[1.1.0] A value of `0` will return all buckets that
+             contain a hit; use with caution, as this could use a lot of CPU
+             and network bandwidth if there are many buckets.

 shard_size:: Optional. To allow for more accurate counting of the top cells
              returned in the final result the aggregation defaults to
              returning `max(10,(size x number-of-shards))` buckets from each
              shard. If this heuristic is undesirable, the number considered
              from each shard can be overridden using this parameter.
+             added[1.1.0] A value of `0` makes the shard size unlimited.
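For reference, a minimal usage sketch of the new behaviour through the Java API. The index name "idx" and field "location" are assumptions for illustration (they mirror the integration test at the end of this commit), and the builder calls are the ones that test exercises:

    import org.elasticsearch.action.search.SearchResponse;
    import static org.elasticsearch.search.aggregations.AggregationBuilders.geohashGrid;

    // size(0) asks for every bucket that contains a hit; shardSize(0) lifts
    // the per-shard limit that the default heuristic would otherwise apply.
    SearchResponse response = client().prepareSearch("idx")
            .addAggregation(geohashGrid("grid")
                    .field("location")
                    .precision(3)
                    .size(0)
                    .shardSize(0))
            .execute().actionGet();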

InternalAggregation.java

@@ -21,10 +21,13 @@ package org.elasticsearch.search.aggregations;

 import org.elasticsearch.cache.recycler.CacheRecycler;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Streamable;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilderString;

+import java.io.IOException;
 import java.util.List;

 /**
@@ -125,6 +128,23 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
      */
     public abstract InternalAggregation reduce(ReduceContext reduceContext);

+    /**
+     * Read a size under the assumption that a value of 0 means unlimited.
+     */
+    protected static int readSize(StreamInput in) throws IOException {
+        final int size = in.readVInt();
+        return size == 0 ? Integer.MAX_VALUE : size;
+    }
+
+    /**
+     * Write a size under the assumption that a value of 0 means unlimited.
+     */
+    protected static void writeSize(int size, StreamOutput out) throws IOException {
+        if (size == Integer.MAX_VALUE) {
+            size = 0;
+        }
+        out.writeVInt(size);
+    }
+
     /**
      * Common xcontent fields that are shared among addAggregation
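These two helpers give the size a symmetric wire encoding: an in-memory Integer.MAX_VALUE ("unlimited") is sent as 0, which also keeps it to a single vInt byte, and is restored to Integer.MAX_VALUE on read. A standalone sketch of the invariant, with hypothetical helper names standing in for the stream-based methods above:

    // Hypothetical standalone demo; the real methods read/write through
    // StreamInput/StreamOutput rather than returning the encoded value.
    public final class SizeWireFormat {
        static int encodeSize(int size) {
            return size == Integer.MAX_VALUE ? 0 : size; // unlimited -> 0
        }

        static int decodeSize(int wire) {
            return wire == 0 ? Integer.MAX_VALUE : wire; // 0 -> unlimited
        }

        public static void main(String[] args) {
            assert decodeSize(encodeSize(Integer.MAX_VALUE)) == Integer.MAX_VALUE;
            assert decodeSize(encodeSize(10000)) == 10000; // normal sizes round-trip unchanged
        }
    }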

BucketUtils.java

@@ -39,11 +39,11 @@ public class BucketUtils {
             return finalSize;
         }

         //Cap the multiplier used for shards to avoid excessive data transfer
-        final int shardSampleSize = finalSize * Math.min(10, numberOfShards);
+        final long shardSampleSize = (long) finalSize * Math.min(10, numberOfShards);

         // When finalSize is very small e.g. 1 and there is a low number of
         // shards then we need to ensure we still gather a reasonable sample of statistics from each
         // shard (at low cost) to improve the chances of the final result being accurate.
-        return Math.max(10, shardSampleSize);
+        return (int) Math.min(Integer.MAX_VALUE, Math.max(10, shardSampleSize));
     }
}
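The widening cast matters because a size of 0 now maps to Integer.MAX_VALUE before it reaches this heuristic, so the old int multiplication would overflow and go negative. A hypothetical standalone demo:

    // finalSize * 10 wraps around in int arithmetic once finalSize is
    // Integer.MAX_VALUE; doing the multiplication in long and capping the
    // result keeps the shard size at Integer.MAX_VALUE instead.
    public final class OverflowDemo {
        public static void main(String[] args) {
            int finalSize = Integer.MAX_VALUE;     // size=0 mapped to "unlimited"
            int overflowed = finalSize * 10;       // wraps to -10
            long widened = (long) finalSize * 10;  // 21474836470, as intended
            int capped = (int) Math.min(Integer.MAX_VALUE, Math.max(10, widened));
            System.out.println(overflowed + " " + widened + " " + capped);
            // prints: -10 21474836470 2147483647
        }
    }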

GeoHashGridParser.java

@ -55,7 +55,7 @@ public class GeoHashGridParser implements Aggregator.Parser {
String field = null; String field = null;
int precision = DEFAULT_PRECISION; int precision = DEFAULT_PRECISION;
int requiredSize = DEFAULT_MAX_NUM_CELLS; int requiredSize = DEFAULT_MAX_NUM_CELLS;
int shardSize = 0; int shardSize = -1;
XContentParser.Token token; XContentParser.Token token;
@@ -78,11 +78,24 @@ public class GeoHashGridParser implements Aggregator.Parser {
             }
         }

         if (shardSize == 0) {
+            shardSize = Integer.MAX_VALUE;
+        }
+
+        if (requiredSize == 0) {
+            requiredSize = Integer.MAX_VALUE;
+        }
+
+        if (shardSize < 0) {
             //Use default heuristic to avoid any wrong-ranking caused by distributed counting
             shardSize = BucketUtils.suggestShardSideQueueSize(requiredSize, context.numberOfShards());
         }

+        if (shardSize < requiredSize) {
+            shardSize = requiredSize;
+        }
+
         ValuesSourceConfig<GeoPointValuesSource> config = new ValuesSourceConfig<GeoPointValuesSource>(GeoPointValuesSource.class);
         if (field == null) {
             return new GeoGridFactory(aggregationName, config, precision, requiredSize, shardSize);
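Taken together, the parser now normalizes both user-facing parameters before building the factory: -1 means "shard_size not set", 0 means unlimited, and the effective shard size never drops below the requested size. A condensed, hypothetical restatement of the logic above (not part of the commit):

    static int effectiveShardSize(int requiredSize, int shardSize, int numberOfShards) {
        if (shardSize == 0) shardSize = Integer.MAX_VALUE;        // 0 == unlimited
        if (requiredSize == 0) requiredSize = Integer.MAX_VALUE;  // 0 == all buckets
        if (shardSize < 0) {                                      // unset: apply heuristic
            shardSize = BucketUtils.suggestShardSideQueueSize(requiredSize, numberOfShards);
        }
        return Math.max(shardSize, requiredSize);                 // never below size
    }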

InternalGeoHashGrid.java

@@ -256,7 +256,7 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
     @Override
     public void readFrom(StreamInput in) throws IOException {
         this.name = in.readString();
-        this.requiredSize = in.readVInt();
+        this.requiredSize = readSize(in);
         int size = in.readVInt();
         List<Bucket> buckets = new ArrayList<Bucket>(size);
         for (int i = 0; i < size; i++) {
@@ -269,7 +269,7 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeString(name);
-        out.writeVInt(requiredSize);
+        writeSize(requiredSize, out);
         out.writeVInt(buckets.size());
         for (Bucket bucket : buckets) {
             out.writeLong(bucket.geohashAsLong);

InternalTerms.java

@@ -21,8 +21,6 @@ package org.elasticsearch.search.aggregations.bucket.terms;

 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.elasticsearch.cache.recycler.CacheRecycler;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Streamable;
 import org.elasticsearch.common.text.Text;
 import org.elasticsearch.common.xcontent.ToXContent;

@@ -31,7 +29,6 @@ import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.bucket.terms.support.BucketPriorityQueue;

-import java.io.IOException;
 import java.util.*;

 /**
@@ -184,17 +181,4 @@ public abstract class InternalTerms extends InternalAggregation implements Terms
         buckets = newBuckets;
     }

-    // 0 actually means unlimited
-    protected static int readSize(StreamInput in) throws IOException {
-        final int size = in.readVInt();
-        return size == 0 ? Integer.MAX_VALUE : size;
-    }
-
-    protected static void writeSize(int size, StreamOutput out) throws IOException {
-        if (size == Integer.MAX_VALUE) {
-            size = 0;
-        }
-        out.writeVInt(size);
-    }
 }

GeoHashGridTests.java

@@ -42,6 +42,7 @@ import java.util.Random;

 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.search.aggregations.AggregationBuilders.geohashGrid;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;

 /**
  *
@@ -241,4 +242,25 @@ public class GeoHashGridTests extends ElasticsearchIntegrationTest {
         }
     }

+    @Test
+    // making sure this doesn't run into an OOME
+    public void sizeIsZero() {
+        for (int precision = 1; precision <= highestPrecisionGeohash; precision++) {
+            final int size = randomBoolean() ? 0 : randomIntBetween(1, Integer.MAX_VALUE);
+            final int shardSize = randomBoolean() ? -1 : 0;
+            SearchResponse response = client().prepareSearch("idx")
+                    .addAggregation(geohashGrid("geohashgrid")
+                            .field("location")
+                            .size(size)
+                            .shardSize(shardSize)
+                            .precision(precision)
+                    )
+                    .execute().actionGet();
+            assertThat(response.getFailedShards(), equalTo(0));
+            GeoHashGrid geoGrid = response.getAggregations().get("geohashgrid");
+            assertThat(geoGrid.getBuckets().size(), greaterThanOrEqualTo(1));
+        }
+    }
 }