From 602de046922a42e7f85d0da060cfb784eb863d68 Mon Sep 17 00:00:00 2001 From: markharwood Date: Fri, 29 Nov 2013 14:59:09 +0000 Subject: [PATCH] A GeoHashGrid aggregation that buckets GeoPoints into cells whose dimensions are determined by a choice of GeoHash resolution. Added a long-based representation of GeoHashes to GeoHashUtils for fast evaluation in aggregations. The new BucketUtils provides a common heuristic for determining the number of results to obtain from each shard in "top N" type requests. --- .../search/aggregations/bucket.asciidoc | 4 +- .../bucket/geohashgrid-aggregation.asciidoc | 116 +++++++ .../common/geo/GeoHashUtils.java | 142 +++++++++ .../aggregations/AggregationBuilders.java | 15 +- .../aggregations/AggregationModule.java | 12 +- .../TransportAggregationModule.java | 12 +- .../aggregations/bucket/BucketUtils.java | 49 +++ .../bucket/geogrid/GeoHashGrid.java | 42 +++ .../bucket/geogrid/GeoHashGridAggregator.java | 164 ++++++++++ .../bucket/geogrid/GeoHashGridBuilder.java | 85 +++++ .../bucket/geogrid/GeoHashGridParser.java | 208 +++++++++++++ .../bucket/geogrid/InternalGeoHashGrid.java | 292 ++++++++++++++++++ .../common/geo/GeoHashTests.java | 60 ++++ .../aggregations/bucket/GeoHashGridTests.java | 247 +++++++++++++++ 14 files changed, 1432 insertions(+), 16 deletions(-) create mode 100644 docs/reference/search/aggregations/bucket/geohashgrid-aggregation.asciidoc create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/BucketUtils.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGrid.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridAggregator.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridBuilder.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridParser.java create mode 100644 
src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java create mode 100644 src/test/java/org/elasticsearch/common/geo/GeoHashTests.java create mode 100644 src/test/java/org/elasticsearch/search/aggregations/bucket/GeoHashGridTests.java diff --git a/docs/reference/search/aggregations/bucket.asciidoc b/docs/reference/search/aggregations/bucket.asciidoc index 4e646a49706..6bdd5042abc 100644 --- a/docs/reference/search/aggregations/bucket.asciidoc +++ b/docs/reference/search/aggregations/bucket.asciidoc @@ -20,4 +20,6 @@ include::bucket/histogram-aggregation.asciidoc[] include::bucket/datehistogram-aggregation.asciidoc[] -include::bucket/geodistance-aggregation.asciidoc[] \ No newline at end of file +include::bucket/geodistance-aggregation.asciidoc[] + +include::bucket/geohashgrid-aggregation.asciidoc[] \ No newline at end of file diff --git a/docs/reference/search/aggregations/bucket/geohashgrid-aggregation.asciidoc b/docs/reference/search/aggregations/bucket/geohashgrid-aggregation.asciidoc new file mode 100644 index 00000000000..2d940861422 --- /dev/null +++ b/docs/reference/search/aggregations/bucket/geohashgrid-aggregation.asciidoc @@ -0,0 +1,116 @@ +[[search-aggregations-bucket-geohashgrid-aggregation]] +=== GeoHash grid + +A multi-bucket aggregation that works on `geo_point` fields and groups points into buckets that represent cells in a grid. +The resulting grid can be sparse and only contains cells that have matching data. Each cell is labeled using a http://en.wikipedia.org/wiki/Geohash[geohash] which is of user-definable precision. + +* High precision geohashes have a long string length and represent cells that cover only a small area. +* Low precision geohashes have a short string length and represent cells that each cover a large area. + +Geohashes used in this aggregation can have a choice of precision between 1 and 12. 
+ +WARNING: The highest-precision geohash of length 12 produces cells that cover less than a square metre of land and so high-precision requests can be very costly in terms of RAM and result sizes. +Please see the example below on how to first filter the aggregation to a smaller geographic area before requesting high levels of detail. + +The specified field must be of type `geo_point` (which can only be set explicitly in the mappings) and it can also hold an array of `geo_point` fields, in which case all points will be taken into account during aggregation. + + +==== Simple low-precision request + +[source,js] +-------------------------------------------------- +{ + "aggregations" : { + "myLarge-GrainGeoHashGrid" : { + "geohashgrid" : { + "field" : "location", + "precision" : 3 + } + } + } +} +-------------------------------------------------- + +Response: + +[source,js] +-------------------------------------------------- +{ + "aggregations": { + "myLarge-GrainGeoHashGrid": { + "buckets": [ + { + "key": "svz", + "doc_count": 10964 + }, + { + "key": "sv8", + "doc_count": 3198 + } + ] + } + } +} +-------------------------------------------------- + + + +==== High-precision requests + +When requesting detailed buckets (typically for displaying a "zoomed in" map) a filter like <<query-dsl-geo-bounding-box-filter,geo_bounding_box>> should be applied to narrow the subject area, otherwise potentially millions of buckets will be created and returned. + +[source,js] +-------------------------------------------------- +{ + "aggregations" : { + "zoomedInView" : { + "filter" : { + "geo_bounding_box" : { + "location" : { + "top_left" : "51.73, 0.9", + "bottom_right" : "51.55, 1.1" + } + } + }, + "aggregations":{ + "zoom1":{ + "geohashgrid" : { + "field":"location", + "precision":8 + } + } + } + } + } + } +-------------------------------------------------- + +=== Cell dimensions at the equator +The table below shows the metric dimensions for cells covered by various string lengths of geohash. 
+Cell dimensions vary with latitude and so the table is for the worst-case scenario at the equator. +[horizontal] +*GeoHash length*:: *Area width x height* +1:: 5,009.4km x 4,992.6km +2:: 1,252.3km x 624.1km +3:: 156.5km x 156km +4:: 39.1km x 19.5km +5:: 4.9km x 4.9km +6:: 1.2km x 609.4m +7:: 152.9m x 152.4m +8:: 38.2m x 19m +9:: 4.8m x 4.8m +10:: 1.2m x 59.5cm +11:: 14.9cm x 14.9cm +12:: 3.7cm x 1.9cm + + + +=== Options + +[horizontal] +field:: Mandatory. The name of the field indexed with GeoPoints. +precision:: Optional. The string length of the geohashes used to define cells/buckets in the results. Defaults to 5. +size:: Optional. The maximum number of geohash buckets to return (defaults to 10,000). When results are trimmed, buckets are prioritised based on the volumes of documents they contain. +shard_size:: Optional. To allow for more accurate counting of the top cells returned in the final result the aggregation defaults to returning max(10,(size x min(10,number-of-shards))) buckets from each shard, or exactly `size` buckets when only a single shard is searched. If this heuristic is undesirable, the number considered from each shard can be overridden using this parameter. + + diff --git a/src/main/java/org/elasticsearch/common/geo/GeoHashUtils.java b/src/main/java/org/elasticsearch/common/geo/GeoHashUtils.java index 4b3ac1b4a4e..dfcb7e1cd4a 100644 --- a/src/main/java/org/elasticsearch/common/geo/GeoHashUtils.java +++ b/src/main/java/org/elasticsearch/common/geo/GeoHashUtils.java @@ -338,4 +338,146 @@ public class GeoHashUtils { } return interval; } + + //========== long-based encodings for geohashes ======================================== + + + /** + * Encodes latitude and longitude information into a single long with variable precision. + * Up to 12 levels of precision are supported which should offer sub-metre resolution. 
+ * + * @param latitude + * @param longitude + * @param precision The required precision between 1 and 12 + * @return A single long where 4 bits are used for holding the precision and the remaining + * 60 bits are reserved for 5 bit cell identifiers giving up to 12 layers. + */ + public static long encodeAsLong(double latitude, double longitude, int precision) { + if((precision>12)||(precision<1)) + { + throw new ElasticsearchIllegalArgumentException("Illegal precision length of "+precision+ + ". Long-based geohashes only support precisions between 1 and 12"); + } + double latInterval0 = -90.0; + double latInterval1 = 90.0; + double lngInterval0 = -180.0; + double lngInterval1 = 180.0; + + long geohash = 0l; + boolean isEven = true; + + int bit = 0; + int ch = 0; + + int geohashLength=0; + while (geohashLength < precision) { + double mid = 0.0; + if (isEven) { + mid = (lngInterval0 + lngInterval1) / 2D; + if (longitude > mid) { + ch |= BITS[bit]; + lngInterval0 = mid; + } else { + lngInterval1 = mid; + } + } else { + mid = (latInterval0 + latInterval1) / 2D; + if (latitude > mid) { + ch |= BITS[bit]; + latInterval0 = mid; + } else { + latInterval1 = mid; + } + } + + isEven = !isEven; + + if (bit < 4) { + bit++; + } else { + geohashLength++; + geohash|=ch; + if(geohashLength>=4; + for (int i = precision-1; i >=0 ; i--) { + chars[i]= BASE_32[(int) (geohashAsLong&31)]; + geohashAsLong>>=5; + } + return new String(chars); + } + + + + public static GeoPoint decode(long geohash) { + GeoPoint point = new GeoPoint(); + decode(geohash, point); + return point; + } + + /** + * Decodes the given long-format geohash into a latitude and longitude + * + * @param geohash long format Geohash to decode + * @param ret The Geopoint into which the latitude and longitude will be stored + */ + public static void decode(long geohash, GeoPoint ret) { + double[] interval = decodeCell(geohash); + ret.reset((interval[0] + interval[1]) / 2D, (interval[2] + interval[3]) / 2D); + + } + + private 
static double[] decodeCell(long geohash) { + double[] interval = {-90.0, 90.0, -180.0, 180.0}; + boolean isEven = true; + + int precision= (int) (geohash&15); + geohash>>=4; + int[]cds=new int[precision]; + for (int i = precision-1; i >=0 ; i--) { + cds[i] = (int) (geohash&31); + geohash>>=5; + } + + for (int i = 0; i = 1; + if (numberOfShards == 1) { + return finalSize; + } + //Cap the multiplier used for shards to avoid excessive data transfer + final int shardSampleSize = finalSize * Math.min(10, numberOfShards); + // When finalSize is very small e.g. 1 and there is a low number of + // shards then we need to ensure we still gather a reasonable sample of statistics from each + // shard (at low cost) to improve the chances of the final result being accurate. + return Math.max(10, shardSampleSize); + } + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGrid.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGrid.java new file mode 100644 index 00000000000..acd937ed870 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGrid.java @@ -0,0 +1,42 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.bucket.geogrid; + +import org.elasticsearch.common.geo.GeoPoint; +import org.elasticsearch.search.aggregations.Aggregation; + +/** + * Represents the results of a GeoHashGrid aggregation: an iterable set of buckets, one per geohash cell + */ +public interface GeoHashGrid extends Iterable, Aggregation { + + public static interface Bucket extends org.elasticsearch.search.aggregations.bucket.Bucket { + /** The geohash of this bucket's cell in its string form. */ + String getGeoHash(); + /** The point obtained by decoding this bucket's geohash. */ + GeoPoint getGeoPoint(); + /** The long-encoded geohash used internally as this bucket's key. */ + long getInternalKey(); + + } + /** The number of buckets (cells) held by this grid. */ + int getNumberOfBuckets(); + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridAggregator.java new file mode 100644 index 00000000000..f03c57909f3 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridAggregator.java @@ -0,0 +1,164 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.bucket.geogrid; + +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.index.fielddata.LongValues; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; +import org.elasticsearch.search.aggregations.bucket.LongHash; +import org.elasticsearch.search.aggregations.support.AggregationContext; +import org.elasticsearch.search.aggregations.support.numeric.NumericValuesSource; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; + +/** + * Aggregates data expressed as GeoHash longs (for efficiency's sake) but formats results as Geohash strings. + * + */ + +public class GeoHashGridAggregator extends BucketsAggregator { + + private static final int INITIAL_CAPACITY = 50; // TODO sizing + + private final int requiredSize; + private final int shardSize; + private final NumericValuesSource valuesSource; + private final LongHash bucketOrds; + + public GeoHashGridAggregator(String name, AggregatorFactories factories, NumericValuesSource valuesSource, + int requiredSize, int shardSize, AggregationContext aggregationContext, Aggregator parent) { + super(name, BucketAggregationMode.PER_BUCKET, factories, INITIAL_CAPACITY, aggregationContext, parent); + this.valuesSource = valuesSource; + this.requiredSize = requiredSize; + this.shardSize = shardSize; + bucketOrds = new LongHash(INITIAL_CAPACITY,aggregationContext.pageCacheRecycler()); + } + + @Override + public boolean shouldCollect() { + return true; + } + + @Override + public void collect(int doc, long owningBucketOrdinal) throws IOException { + assert owningBucketOrdinal == 0; + final LongValues values = valuesSource.longValues(); + final int valuesCount = values.setDocument(doc); + + for (int i = 0; i < 
valuesCount; ++i) { + final long val = values.nextValue(); + long bucketOrdinal = bucketOrds.add(val); + if (bucketOrdinal < 0) { // already seen + bucketOrdinal = - 1 - bucketOrdinal; + } + collectBucket(doc, bucketOrdinal); + } + } + + // private impl that stores a bucket ord. This allows for computing the aggregations lazily. + static class OrdinalBucket extends InternalGeoHashGrid.Bucket { + + long bucketOrd; + + public OrdinalBucket() { + super(0, 0, (InternalAggregations) null); + } + + } + + + + @Override + public InternalGeoHashGrid buildAggregation(long owningBucketOrdinal) { + assert owningBucketOrdinal == 0; + final int size = (int) Math.min(bucketOrds.size(), shardSize); + + InternalGeoHashGrid.BucketPriorityQueue ordered = new InternalGeoHashGrid.BucketPriorityQueue(size); + OrdinalBucket spare = null; + for (long i = 0; i < bucketOrds.capacity(); ++i) { + final long ord = bucketOrds.id(i); + if (ord < 0) { + // slot is not allocated + continue; + } + + if (spare == null) { + spare = new OrdinalBucket(); + } + spare.geohashAsLong = bucketOrds.key(i); + spare.docCount = bucketDocCount(ord); + spare.bucketOrd = ord; + spare = (OrdinalBucket) ordered.insertWithOverflow(spare); + } + + final InternalGeoHashGrid.Bucket[] list = new InternalGeoHashGrid.Bucket[ordered.size()]; + for (int i = ordered.size() - 1; i >= 0; --i) { + final OrdinalBucket bucket = (OrdinalBucket) ordered.pop(); + bucket.aggregations = bucketAggregations(bucket.bucketOrd); + list[i] = bucket; + } + return new InternalGeoHashGrid(name, requiredSize, Arrays.asList(list)); + } + + @Override + public InternalGeoHashGrid buildEmptyAggregation() { + return new InternalGeoHashGrid(name, requiredSize, Collections.emptyList()); + } + + + @Override + public void doRelease() { + Releasables.release(bucketOrds); + } + + public static class Unmapped extends Aggregator { + private int requiredSize; + public Unmapped(String name, int requiredSize, AggregationContext aggregationContext, Aggregator 
parent) { + + super(name, BucketAggregationMode.PER_BUCKET, AggregatorFactories.EMPTY, 0, aggregationContext, parent); + this.requiredSize=requiredSize; + } + + @Override + public boolean shouldCollect() { + return false; + } + + @Override + public void collect(int doc, long owningBucketOrdinal) throws IOException { + } + + @Override + public InternalGeoHashGrid buildAggregation(long owningBucketOrdinal) { + return (InternalGeoHashGrid) buildEmptyAggregation(); + } + + @Override + public InternalGeoHashGrid buildEmptyAggregation() { + return new InternalGeoHashGrid(name, requiredSize, Collections.emptyList()); + } + } + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridBuilder.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridBuilder.java new file mode 100644 index 00000000000..2eefd84fe81 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridBuilder.java @@ -0,0 +1,85 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.search.aggregations.bucket.geogrid; + +import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.aggregations.AggregationBuilder; + +import java.io.IOException; + +/** + * Creates an aggregation based on bucketing points into GeoHashes. + * Settings that are left at their default values are omitted from the generated request. + */ +public class GeoHashGridBuilder extends AggregationBuilder { + + + private String field; + private int precision=GeoHashGridParser.DEFAULT_PRECISION; + private int requiredSize=GeoHashGridParser.DEFAULT_MAX_NUM_CELLS; + private int shardSize=0; + + public GeoHashGridBuilder(String name) { + super(name, InternalGeoHashGrid.TYPE.name()); + } + /** Sets the name of the field whose points are bucketed; emitted as "field". */ + public GeoHashGridBuilder field(String field) { + this.field = field; + return this; + } + /** Sets the geohash precision; throws ElasticsearchIllegalArgumentException unless it is between 1 and 12 inclusive. */ + public GeoHashGridBuilder precision(int precision) { + if((precision<1)||(precision>12)) + { + throw new ElasticsearchIllegalArgumentException("Invalid geohash aggregation precision of "+precision + +". It must be between 1 and 12"); + } + this.precision = precision; + return this; + } + public GeoHashGridBuilder size(int requiredSize) { /* emitted as "size" */ + this.requiredSize = requiredSize; + return this; + } + public GeoHashGridBuilder shardSize(int shardSize) { /* emitted as "shard_size"; 0 means not set */ + this.shardSize = shardSize; + return this; + } + /** Writes the request body, including only the settings that differ from their defaults. */ + @Override + protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (field != null) { + builder.field("field", field); + } + if (precision != GeoHashGridParser.DEFAULT_PRECISION) { + builder.field("precision", precision); + } + if (requiredSize != GeoHashGridParser.DEFAULT_MAX_NUM_CELLS) { + builder.field("size", requiredSize); + } + if (shardSize != 0) { + builder.field("shard_size", shardSize); + } + + return builder.endObject(); + } + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridParser.java 
b/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridParser.java new file mode 100644 index 00000000000..00da1712bb7 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridParser.java @@ -0,0 +1,208 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.bucket.geogrid; + +import org.elasticsearch.common.geo.GeoHashUtils; +import org.elasticsearch.common.geo.GeoPoint; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.fielddata.*; +import org.elasticsearch.index.mapper.FieldMapper; +import org.elasticsearch.index.query.GeoBoundingBoxFilterBuilder; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.bucket.BucketUtils; +import org.elasticsearch.search.aggregations.support.*; +import org.elasticsearch.search.aggregations.support.geopoints.GeoPointValuesSource; +import org.elasticsearch.search.aggregations.support.numeric.NumericValuesSource; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; + +/** + * Aggregates Geo information into cells determined by geohashes of a given precision. + * WARNING - for high-precision geohashes it may prove necessary to use a {@link GeoBoundingBoxFilterBuilder} + * aggregation to focus in on a smaller area to avoid generating too many buckets and using too much RAM + * + */ +public class GeoHashGridParser implements Aggregator.Parser { + + @Override + public String type() { + return InternalGeoHashGrid.TYPE.name(); + } + + + public static final int DEFAULT_PRECISION=5; + public static final int DEFAULT_MAX_NUM_CELLS=10000; + + + @Override + public AggregatorFactory parse(String aggregationName, XContentParser parser, SearchContext context) throws IOException { + + String field = null; + int precision=DEFAULT_PRECISION; + int requiredSize=DEFAULT_MAX_NUM_CELLS; + int shardSize=0; + + + XContentParser.Token token; + String currentFieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == 
XContentParser.Token.VALUE_STRING) { + if ("field".equals(currentFieldName)) { + field = parser.text(); + } + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if ("precision".equals(currentFieldName)) { + precision = parser.intValue(); + }else if ("size".equals(currentFieldName)) { + requiredSize = parser.intValue(); + }else if ("shard_size".equals(currentFieldName)) { + shardSize = parser.intValue(); + }else if ("shardSize".equals(currentFieldName)) { + shardSize = parser.intValue(); + } + + } + } + if(shardSize==0) + { + //Use default heuristic to avoid any wrong-ranking caused by distributed counting + shardSize=BucketUtils.suggestShardSideQueueSize(requiredSize,context.numberOfShards()); + } + + ValuesSourceConfig config = new ValuesSourceConfig(GeoPointValuesSource.class); + if (field == null) { + return new GeoGridFactory(aggregationName, config,precision,requiredSize,shardSize); + } + + FieldMapper mapper = context.smartNameFieldMapper(field); + if (mapper == null) { + config.unmapped(true); + return new GeoGridFactory(aggregationName, config,precision,requiredSize,shardSize); + } + + IndexFieldData indexFieldData = context.fieldData().getForField(mapper); + config.fieldContext(new FieldContext(field, indexFieldData)); + return new GeoGridFactory(aggregationName, config,precision,requiredSize,shardSize); + } + + + + + private static class GeoGridFactory extends ValueSourceAggregatorFactory { + + private int precision; + private int requiredSize; + private int shardSize; + + public GeoGridFactory(String name, ValuesSourceConfig valueSourceConfig, + int precision,int requiredSize,int shardSize) { + super(name, InternalGeoHashGrid.TYPE.name(), valueSourceConfig); + this.precision=precision; + this.requiredSize=requiredSize; + this.shardSize=shardSize; + } + + @Override + protected Aggregator createUnmapped(AggregationContext aggregationContext, Aggregator parent) { + return new GeoHashGridAggregator.Unmapped(name, requiredSize, aggregationContext, 
parent); + } + + @Override + protected Aggregator create(final GeoPointValuesSource valuesSource, long expectedBucketsCount, AggregationContext aggregationContext, Aggregator parent) { + final CellValues cellIdValues = new CellValues(valuesSource,precision); + FieldDataSource.Numeric cellIdSource = new CellIdSource(cellIdValues, valuesSource.metaData()); + if (cellIdSource.metaData().multiValued()) { + // we need to wrap to ensure uniqueness + cellIdSource = new FieldDataSource.Numeric.SortedAndUnique(cellIdSource); + } + final NumericValuesSource geohashIdSource = new NumericValuesSource(cellIdSource,null,null); + return new GeoHashGridAggregator(name, factories, geohashIdSource, requiredSize, + shardSize, aggregationContext, parent); + + } + + private static class CellValues extends LongValues { + + private GeoPointValuesSource geoPointValues; + private GeoPointValues geoValues; + private int precision; + + protected CellValues(GeoPointValuesSource geoPointValues, int precision ) { + super(true); + this.geoPointValues = geoPointValues; + this.precision=precision; + } + @Override + public int setDocument(int docId) { + geoValues = geoPointValues.values(); + return geoValues.setDocument(docId); + } + + @Override + public long nextValue() { + GeoPoint target = geoValues.nextValue(); + return GeoHashUtils.encodeAsLong(target.getLat(), target.getLon(), precision); + } + + } + + private static class CellIdSource extends FieldDataSource.Numeric { + private final LongValues values; + private MetaData metaData; + + public CellIdSource(LongValues values, MetaData delegate) { + this.values = values; + //different GeoPoints could map to the same or different geohash cells. 
+ this.metaData = MetaData.builder(delegate).uniqueness(MetaData.Uniqueness.UNKNOWN).build(); + } + + @Override + public boolean isFloatingPoint() { + return false; + } + + @Override + public LongValues longValues() { + return values; + } + + @Override + public DoubleValues doubleValues() { + throw new UnsupportedOperationException(); + } + + @Override + public BytesValues bytesValues() { + throw new UnsupportedOperationException(); + } + + @Override + public MetaData metaData() { + return metaData; + } + + } + } + +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java new file mode 100644 index 00000000000..4032b105f1a --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java @@ -0,0 +1,292 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.bucket.geogrid; + +import com.carrotsearch.hppc.LongObjectOpenHashMap; +import org.apache.lucene.util.PriorityQueue; +import org.elasticsearch.cache.recycler.CacheRecycler; +import org.elasticsearch.common.geo.GeoHashUtils; +import org.elasticsearch.common.geo.GeoPoint; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.recycler.Recycler; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.aggregations.AggregationStreams; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregations; + +import java.io.IOException; +import java.util.*; + +/** + * Represents a grid of cells where each cell's location is determined by a geohash. + * All geohashes in a grid are of the same precision and held internally as a single long + * for efficiency's sake. 
+ */ +public class InternalGeoHashGrid extends InternalAggregation implements GeoHashGrid{ + + public static final Type TYPE = new Type("geohashgrid", "ghcells"); + protected Map bucketMap; + + public static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { + @Override + public InternalGeoHashGrid readResult(StreamInput in) throws IOException { + InternalGeoHashGrid buckets = new InternalGeoHashGrid(); + buckets.readFrom(in); + return buckets; + } + }; + + public static void registerStreams() { + AggregationStreams.registerStream(STREAM, TYPE.stream()); + } + + + static class Bucket implements GeoHashGrid.Bucket , Comparable { + + protected long geohashAsLong; + protected long docCount; + protected InternalAggregations aggregations; + + public Bucket(long geohashAsLong, long docCount, InternalAggregations aggregations) { + this.docCount=docCount; + this.aggregations=aggregations; + this.geohashAsLong = geohashAsLong; + } + + public GeoPoint getGeoPoint() { + return GeoHashUtils.decode(geohashAsLong); + } + + + public String getGeoHash() { + return GeoHashUtils.toString(geohashAsLong); + } + + @Override + public long getDocCount() { + return docCount; + } + + @Override + public Aggregations getAggregations() { + return aggregations; + } + + @Override + public int compareTo(Bucket other) { + if (this.geohashAsLong > other.geohashAsLong) { + return 1; + } + if (this.geohashAsLong < other.geohashAsLong) { + return -1; + } + return 0; + } + public Bucket reduce(List buckets, CacheRecycler cacheRecycler) { + if (buckets.size() == 1) { + return buckets.get(0); + } + Bucket reduced = null; + List aggregationsList = new ArrayList(buckets.size()); + for (Bucket bucket : buckets) { + if (reduced == null) { + reduced = bucket; + } else { + reduced.docCount += bucket.docCount; + } + aggregationsList.add(bucket.aggregations); + } + reduced.aggregations = InternalAggregations.reduce(aggregationsList, cacheRecycler); + return reduced; + } + + @Override + public 
long getInternalKey() { + return geohashAsLong; + } + + + } + + private int requiredSize; + private Collection buckets; + InternalGeoHashGrid() {} // for serialization + + public InternalGeoHashGrid(String name, int requiredSize, Collection buckets) { + super(name); + this.requiredSize=requiredSize; + this.buckets=buckets; + + } + + @Override + public Type type() { + return TYPE; + } + + static class BucketPriorityQueue extends PriorityQueue { + + public BucketPriorityQueue(int size) { + super(size); + } + + @Override + protected boolean lessThan(Bucket o1, Bucket o2) { + long i = o2.getDocCount() - o1.getDocCount(); + if (i == 0) { + i = o2.compareTo(o1); + if (i == 0) { + i = System.identityHashCode(o2) - System.identityHashCode(o1); + } + } + return i > 0 ? true : false; + } + } + + @Override + public InternalGeoHashGrid reduce(ReduceContext reduceContext) { + List aggregations = reduceContext.aggregations(); + if (aggregations.size() == 1) { + InternalGeoHashGrid grid = (InternalGeoHashGrid) aggregations.get(0); + grid.trimExcessEntries(); + return grid; + } + InternalGeoHashGrid reduced = null; + + Recycler.V>> buckets = null; + for (InternalAggregation aggregation : aggregations) { + InternalGeoHashGrid grid = (InternalGeoHashGrid) aggregation; + if (reduced == null) { + reduced = grid; + } + if (buckets == null) { + buckets = reduceContext.cacheRecycler().longObjectMap(grid.buckets.size()); + } + for (Bucket bucket : grid.buckets) { + List existingBuckets = buckets.v().get(bucket.geohashAsLong); + if (existingBuckets == null) { + existingBuckets = new ArrayList(aggregations.size()); + buckets.v().put(bucket.geohashAsLong, existingBuckets); + } + existingBuckets.add(bucket); + } + } + + if (reduced == null) { + // there are only unmapped terms, so we just return the first one (no need to reduce) + return (InternalGeoHashGrid) aggregations.get(0); + } + + // TODO: would it be better to sort the backing array buffer of the hppc map directly instead of using a 
PQ? + final int size = Math.min(requiredSize, buckets.v().size()); + BucketPriorityQueue ordered = new BucketPriorityQueue(size); + Object[] internalBuckets = buckets.v().values; + boolean[] states = buckets.v().allocated; + for (int i = 0; i < states.length; i++) { + if (states[i]) { + List sameCellBuckets = (List) internalBuckets[i]; + ordered.insertWithOverflow(sameCellBuckets.get(0).reduce(sameCellBuckets, reduceContext.cacheRecycler())); + } + } + buckets.release(); + Bucket[] list = new Bucket[ordered.size()]; + for (int i = ordered.size() - 1; i >= 0; i--) { + list[i] = (Bucket) ordered.pop(); + } + reduced.buckets = Arrays.asList(list); + return reduced; + } + + @Override + public Iterator iterator() { + Object o = buckets.iterator(); + return (Iterator) o; + } + + @Override + public int getNumberOfBuckets() { + return buckets.size(); + } + + + protected void trimExcessEntries() { + if (requiredSize >= buckets.size()) { + return; + } + + if (buckets instanceof List) { + buckets = ((List) buckets).subList(0, requiredSize); + return; + } + + int i = 0; + for (Iterator iter = buckets.iterator(); iter.hasNext();) { + iter.next(); + if (i++ >= requiredSize) { + iter.remove(); + } + } + } + + @Override + public void readFrom(StreamInput in) throws IOException { + this.name = in.readString(); + this.requiredSize = in.readVInt(); + int size = in.readVInt(); + List buckets = new ArrayList(size); + for (int i = 0; i < size; i++) { + buckets.add(new Bucket(in.readLong(), in.readVLong(), InternalAggregations.readAggregations(in))); + } + this.buckets = buckets; + this.bucketMap = null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(name); + out.writeVInt(requiredSize); + out.writeVInt(buckets.size()); + for (Bucket bucket : buckets) { + out.writeLong(((Bucket) bucket).geohashAsLong); + out.writeVLong(bucket.getDocCount()); + ((InternalAggregations) bucket.getAggregations()).writeTo(out); + } + } + + @Override + public 
XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(name); + builder.startArray(CommonFields.BUCKETS); + for (Bucket bucket : buckets) { + builder.startObject(); + builder.field(CommonFields.KEY, ((Bucket) bucket).getGeoHash()); + builder.field(CommonFields.DOC_COUNT, bucket.getDocCount()); + ((InternalAggregations) bucket.getAggregations()).toXContentInternal(builder, params); + builder.endObject(); + } + builder.endArray(); + builder.endObject(); + return builder; + } + + +} diff --git a/src/test/java/org/elasticsearch/common/geo/GeoHashTests.java b/src/test/java/org/elasticsearch/common/geo/GeoHashTests.java new file mode 100644 index 00000000000..b57d06b9118 --- /dev/null +++ b/src/test/java/org/elasticsearch/common/geo/GeoHashTests.java @@ -0,0 +1,60 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.common.geo; + +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Test; + + + +/** + * Tests for {@link GeoHashUtils} + */ +public class GeoHashTests extends ElasticsearchTestCase { + + + @Test + public void testGeohashAsLongRoutines() { + + //Ensure that for all points at all supported levels of precision + // that the long encoding of a geohash is compatible with its + // String based counterpart + for (double lat=-90;lat<90;lat++) + { + for (double lng=-180;lng<180;lng++) + { + for(int p=1;p<=12;p++) + { + long geoAsLong = GeoHashUtils.encodeAsLong(lat,lng,p); + String geohash = GeoHashUtils.encode(lat,lng,p); + + String geohashFromLong=GeoHashUtils.toString(geoAsLong); + assertEquals(geohash, geohashFromLong); + GeoPoint pos=GeoHashUtils.decode(geohash); + GeoPoint pos2=GeoHashUtils.decode(geoAsLong); + assertEquals(pos, pos2); + } + } + + } + } + + +} diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/GeoHashGridTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/GeoHashGridTests.java new file mode 100644 index 00000000000..5383dbf6317 --- /dev/null +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/GeoHashGridTests.java @@ -0,0 +1,247 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket; + +import com.carrotsearch.hppc.ObjectIntMap; +import com.carrotsearch.hppc.ObjectIntOpenHashMap; +import com.carrotsearch.hppc.cursors.ObjectIntCursor; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.geo.GeoHashUtils; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.query.GeoBoundingBoxFilterBuilder; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.bucket.filter.Filter; +import org.elasticsearch.search.aggregations.bucket.geogrid.GeoHashGrid; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.search.aggregations.AggregationBuilders.geohashGrid; +import static org.hamcrest.Matchers.equalTo; + +/** + * + */ +public class GeoHashGridTests extends ElasticsearchIntegrationTest { + + @Override + public Settings indexSettings() { + return ImmutableSettings.builder() + .put("index.number_of_shards", between(1, 5)) + .put("index.number_of_replicas", between(0, 1)) + .build(); + } + + private IndexRequestBuilder indexCity(String name, String latLon) throws Exception { + XContentBuilder source = jsonBuilder().startObject().field("city", name); + if (latLon != null) { + source = source.field("location", latLon); + } + source = source.endObject(); + return client().prepareIndex("idx", "type").setSource(source); + } + + + 
ObjectIntMapexpectedDocCountsForGeoHash=null; + int highestPrecisionGeohash=12; + int numRandomPoints=100; + + String smallestGeoHash=null; + + @Before + public void init() throws Exception { + prepareCreate("idx") + .addMapping("type", "location", "type=geo_point", "city", "type=string,index=not_analyzed") + .execute().actionGet(); + + createIndex("idx_unmapped"); + + List cities = new ArrayList(); + Random random = getRandom(); + expectedDocCountsForGeoHash=new ObjectIntOpenHashMap(numRandomPoints*2); + for (int i = 0; i < numRandomPoints; i++) { + //generate random point + double lat=(180d*random.nextDouble())-90d; + double lng=(360d*random.nextDouble())-180d; + String randomGeoHash=GeoHashUtils.encode(lat, lng,highestPrecisionGeohash); + //Index at the highest resolution + cities.add(indexCity(randomGeoHash, lat+", "+lng)); + expectedDocCountsForGeoHash.put(randomGeoHash, expectedDocCountsForGeoHash.getOrDefault(randomGeoHash, 0)+1); + //Update expected doc counts for all resolutions.. + for (int precision = highestPrecisionGeohash-1; precision >0; precision--) { + String hash=GeoHashUtils.encode(lat, lng,precision); + if((smallestGeoHash==null)||(hash.length() cursor : expectedDocCountsForGeoHash) { + if(cursor.key.length()==precision) + { + expectedBucketCount=Math.max(expectedBucketCount, cursor.value); + } + } + assertNotSame(bucketCount, 0); + assertEquals("Geohash "+geohash+" has wrong doc count ", + expectedBucketCount,bucketCount); + } + } + } + +}