A GeoHashGrid aggregation that buckets GeoPoints into cells whose dimensions are determined by a choice of GeoHash resolution.
Added a long-based representation of GeoHashes to GeoHashUtils for fast evaluation in aggregations. The new BucketUtils provides a common heuristic for determining the number of results to obtain from each shard in "top N" type requests.
This commit is contained in:
parent
abf68c472e
commit
602de04692
|
@ -21,3 +21,5 @@ include::bucket/histogram-aggregation.asciidoc[]
|
|||
include::bucket/datehistogram-aggregation.asciidoc[]
|
||||
|
||||
include::bucket/geodistance-aggregation.asciidoc[]
|
||||
|
||||
include::bucket/geohashgrid-aggregation.asciidoc[]
|
|
@ -0,0 +1,116 @@
|
|||
[[search-aggregations-bucket-geohashgrid-aggregation]]
|
||||
=== GeoHash grid
|
||||
|
||||
A multi-bucket aggregation that works on `geo_point` fields and groups points into buckets that represent cells in a grid.
|
||||
The resulting grid can be sparse and only contains cells that have matching data. Each cell is labeled using a http://en.wikipedia.org/wiki/Geohash[geohash] which is of user-definable precision.
|
||||
|
||||
* High precision geohashes have a long string length and represent cells that cover only a small area.
|
||||
* Low precision geohashes have a short string length and represent cells that each cover a large area.
|
||||
|
||||
Geohashes used in this aggregation can have a choice of precision between 1 and 12.
|
||||
|
||||
WARNING: The highest-precision geohash of length 12 produces cells that cover less than a square metre of land and so high-precision requests can be very costly in terms of RAM and result sizes.
|
||||
Please see the example below on how to first filter the aggregation to a smaller geographic area before requesting high-levels of detail.
|
||||
|
||||
The specified field must be of type `geo_point` (which can only be set explicitly in the mappings) and it can also hold an array of `geo_point` fields, in which case all points will be taken into account during aggregation.
|
||||
|
||||
|
||||
==== Simple low-precision request
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"aggregations" : {
|
||||
"myLarge-GrainGeoHashGrid" : {
|
||||
"geohashgrid" : {
|
||||
"field" : "location",
|
||||
"precision" : 3
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
Response:
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"aggregations": {
|
||||
"myLarge-GrainGeoHashGrid": {
|
||||
"buckets": [
|
||||
{
|
||||
"key": "svz",
|
||||
"doc_count": 10964
|
||||
},
|
||||
{
|
||||
"key": "sv8",
|
||||
"doc_count": 3198
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
|
||||
|
||||
==== High-precision requests
|
||||
|
||||
When requesting detailed buckets (typically for displaying a "zoomed in" map) a filter like <<query-dsl-geo-bounding-box-filter,geo_bounding_box>> should be applied to narrow the subject area otherwise potentially millions of buckets will be created and returned.
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"aggregations" : {
|
||||
"zoomedInView" : {
|
||||
"filter" : {
|
||||
"geo_bounding_box" : {
|
||||
"location" : {
|
||||
"top_left" : "51.73, 0.9",
|
||||
"bottom_right" : "51.55, 1.1"
|
||||
}
|
||||
}
|
||||
},
|
||||
"aggregations":{
|
||||
"zoom1":{
|
||||
"geohashgrid" : {
|
||||
"field":"location",
|
||||
"precision":8,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
=== Cell dimensions at the equator
|
||||
The table below shows the metric dimensions for cells covered by various string lengths of geohash.
|
||||
Cell dimensions vary with latitude and so the table is for the worst-case scenario at the equator.
|
||||
[horizontal]
|
||||
*GeoHash length*:: *Area width x height*
|
||||
1:: 5,009.4km x 4,992.6km
|
||||
2:: 1,252.3km x 624.1km
|
||||
3:: 156.5km x 156km
|
||||
4:: 39.1km x 19.5km
|
||||
5:: 4.9km x 4.9km
|
||||
6:: 1.2km x 609.4m
|
||||
7:: 152.9m x 152.4m
|
||||
8:: 38.2m x 19m
|
||||
9:: 4.8m x 4.8m
|
||||
10:: 1.2m x 59.5cm
|
||||
11:: 14.9cm x 14.9cm
|
||||
12:: 3.7cm x 1.9cm
|
||||
|
||||
|
||||
|
||||
=== Options
|
||||
|
||||
[horizontal]
|
||||
field:: Mandatory. The name of the field indexed with GeoPoints.
|
||||
precision:: Optional. The string length of the geohashes used to define cells/buckets in the results. Defaults to 5.
|
||||
size:: Optional. The maximum number of geohash buckets to return (defaults to 10,000). When results are trimmed, buckets are prioritised based on the volumes of documents they contain.
|
||||
shard_size:: Optional. To allow for more accurate counting of the top cells returned in the final result the aggregation defaults to returning max(10,(size x number-of-shards)) buckets from each shard. If this heuristic is undesirable, the number considered from each shard can be over-ridden using this parameter.
|
||||
|
||||
|
|
@ -338,4 +338,146 @@ public class GeoHashUtils {
|
|||
}
|
||||
return interval;
|
||||
}
|
||||
|
||||
//========== long-based encodings for geohashes ========================================
|
||||
|
||||
|
||||
/**
|
||||
* Encodes latitude and longitude information into a single long with variable precision.
|
||||
* Up to 12 levels of precision are supported which should offer sub-metre resolution.
|
||||
*
|
||||
* @param latitude
|
||||
* @param longitude
|
||||
* @param precision The required precision between 1 and 12
|
||||
* @return A single long where 4 bits are used for holding the precision and the remaining
|
||||
* 60 bits are reserved for 5 bit cell identifiers giving up to 12 layers.
|
||||
*/
|
||||
public static long encodeAsLong(double latitude, double longitude, int precision) {
|
||||
if((precision>12)||(precision<1))
|
||||
{
|
||||
throw new ElasticsearchIllegalArgumentException("Illegal precision length of "+precision+
|
||||
". Long-based geohashes only support precisions between 1 and 12");
|
||||
}
|
||||
double latInterval0 = -90.0;
|
||||
double latInterval1 = 90.0;
|
||||
double lngInterval0 = -180.0;
|
||||
double lngInterval1 = 180.0;
|
||||
|
||||
long geohash = 0l;
|
||||
boolean isEven = true;
|
||||
|
||||
int bit = 0;
|
||||
int ch = 0;
|
||||
|
||||
int geohashLength=0;
|
||||
while (geohashLength < precision) {
|
||||
double mid = 0.0;
|
||||
if (isEven) {
|
||||
mid = (lngInterval0 + lngInterval1) / 2D;
|
||||
if (longitude > mid) {
|
||||
ch |= BITS[bit];
|
||||
lngInterval0 = mid;
|
||||
} else {
|
||||
lngInterval1 = mid;
|
||||
}
|
||||
} else {
|
||||
mid = (latInterval0 + latInterval1) / 2D;
|
||||
if (latitude > mid) {
|
||||
ch |= BITS[bit];
|
||||
latInterval0 = mid;
|
||||
} else {
|
||||
latInterval1 = mid;
|
||||
}
|
||||
}
|
||||
|
||||
isEven = !isEven;
|
||||
|
||||
if (bit < 4) {
|
||||
bit++;
|
||||
} else {
|
||||
geohashLength++;
|
||||
geohash|=ch;
|
||||
if(geohashLength<precision){
|
||||
geohash<<=5;
|
||||
}
|
||||
bit = 0;
|
||||
ch = 0;
|
||||
}
|
||||
}
|
||||
geohash<<=4;
|
||||
geohash|=precision;
|
||||
return geohash;
|
||||
}
|
||||
|
||||
/**
|
||||
* Formats a geohash held as a long as a more conventional
|
||||
* String-based geohash
|
||||
* @param geohashAsLong a geohash encoded as a long
|
||||
* @return A traditional base32-based String representation of a geohash
|
||||
*/
|
||||
public static String toString(long geohashAsLong)
|
||||
{
|
||||
int precision= (int) (geohashAsLong&15);
|
||||
char[] chars=new char[precision];
|
||||
geohashAsLong>>=4;
|
||||
for (int i = precision-1; i >=0 ; i--) {
|
||||
chars[i]= BASE_32[(int) (geohashAsLong&31)];
|
||||
geohashAsLong>>=5;
|
||||
}
|
||||
return new String(chars);
|
||||
}
|
||||
|
||||
|
||||
|
||||
public static GeoPoint decode(long geohash) {
|
||||
GeoPoint point = new GeoPoint();
|
||||
decode(geohash, point);
|
||||
return point;
|
||||
}
|
||||
|
||||
/**
|
||||
* Decodes the given long-format geohash into a latitude and longitude
|
||||
*
|
||||
* @param geohash long format Geohash to decode
|
||||
* @param ret The Geopoint into which the latitude and longitude will be stored
|
||||
*/
|
||||
public static void decode(long geohash, GeoPoint ret) {
|
||||
double[] interval = decodeCell(geohash);
|
||||
ret.reset((interval[0] + interval[1]) / 2D, (interval[2] + interval[3]) / 2D);
|
||||
|
||||
}
|
||||
|
||||
private static double[] decodeCell(long geohash) {
|
||||
double[] interval = {-90.0, 90.0, -180.0, 180.0};
|
||||
boolean isEven = true;
|
||||
|
||||
int precision= (int) (geohash&15);
|
||||
geohash>>=4;
|
||||
int[]cds=new int[precision];
|
||||
for (int i = precision-1; i >=0 ; i--) {
|
||||
cds[i] = (int) (geohash&31);
|
||||
geohash>>=5;
|
||||
}
|
||||
|
||||
for (int i = 0; i <cds.length ; i++) {
|
||||
final int cd = cds[i];
|
||||
for (int mask : BITS) {
|
||||
if (isEven) {
|
||||
if ((cd & mask) != 0) {
|
||||
interval[2] = (interval[2] + interval[3]) / 2D;
|
||||
} else {
|
||||
interval[3] = (interval[2] + interval[3]) / 2D;
|
||||
}
|
||||
} else {
|
||||
if ((cd & mask) != 0) {
|
||||
interval[0] = (interval[0] + interval[1]) / 2D;
|
||||
} else {
|
||||
interval[1] = (interval[0] + interval[1]) / 2D;
|
||||
}
|
||||
}
|
||||
isEven = !isEven;
|
||||
}
|
||||
}
|
||||
return interval;
|
||||
}
|
||||
}
|
|
@ -18,24 +18,25 @@
|
|||
*/
|
||||
package org.elasticsearch.search.aggregations;
|
||||
|
||||
import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoHashGridBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.global.GlobalBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.missing.MissingBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.nested.NestedBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.range.RangeBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.range.date.DateRangeBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.range.geodistance.GeoDistanceBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.range.ipv4.IPv4RangeBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.global.GlobalBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.missing.MissingBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.nested.NestedBuilder;
|
||||
import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountBuilder;
|
||||
import org.elasticsearch.search.aggregations.metrics.avg.AvgBuilder;
|
||||
import org.elasticsearch.search.aggregations.metrics.max.MaxBuilder;
|
||||
import org.elasticsearch.search.aggregations.metrics.min.MinBuilder;
|
||||
import org.elasticsearch.search.aggregations.metrics.stats.StatsBuilder;
|
||||
import org.elasticsearch.search.aggregations.metrics.stats.extended.ExtendedStatsBuilder;
|
||||
import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder;
|
||||
import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountBuilder;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -97,6 +98,10 @@ public class AggregationBuilders {
|
|||
return new HistogramBuilder(name);
|
||||
}
|
||||
|
||||
public static GeoHashGridBuilder geohashGrid(String name) {
|
||||
return new GeoHashGridBuilder(name);
|
||||
}
|
||||
|
||||
public static DateHistogramBuilder dateHistogram(String name) {
|
||||
return new DateHistogramBuilder(name);
|
||||
}
|
||||
|
|
|
@ -22,24 +22,25 @@ package org.elasticsearch.search.aggregations;
|
|||
import com.google.common.collect.Lists;
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.inject.multibindings.Multibinder;
|
||||
import org.elasticsearch.search.aggregations.bucket.filter.FilterParser;
|
||||
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoHashGridParser;
|
||||
import org.elasticsearch.search.aggregations.bucket.global.GlobalParser;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramParser;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramParser;
|
||||
import org.elasticsearch.search.aggregations.bucket.missing.MissingParser;
|
||||
import org.elasticsearch.search.aggregations.bucket.nested.NestedParser;
|
||||
import org.elasticsearch.search.aggregations.bucket.range.RangeParser;
|
||||
import org.elasticsearch.search.aggregations.bucket.range.date.DateRangeParser;
|
||||
import org.elasticsearch.search.aggregations.bucket.range.geodistance.GeoDistanceParser;
|
||||
import org.elasticsearch.search.aggregations.bucket.range.ipv4.IpRangeParser;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.TermsParser;
|
||||
import org.elasticsearch.search.aggregations.bucket.filter.FilterParser;
|
||||
import org.elasticsearch.search.aggregations.bucket.global.GlobalParser;
|
||||
import org.elasticsearch.search.aggregations.bucket.missing.MissingParser;
|
||||
import org.elasticsearch.search.aggregations.bucket.nested.NestedParser;
|
||||
import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountParser;
|
||||
import org.elasticsearch.search.aggregations.metrics.avg.AvgParser;
|
||||
import org.elasticsearch.search.aggregations.metrics.max.MaxParser;
|
||||
import org.elasticsearch.search.aggregations.metrics.min.MinParser;
|
||||
import org.elasticsearch.search.aggregations.metrics.stats.StatsParser;
|
||||
import org.elasticsearch.search.aggregations.metrics.stats.extended.ExtendedStatsParser;
|
||||
import org.elasticsearch.search.aggregations.metrics.sum.SumParser;
|
||||
import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountParser;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
|
@ -69,6 +70,7 @@ public class AggregationModule extends AbstractModule {
|
|||
parsers.add(HistogramParser.class);
|
||||
parsers.add(DateHistogramParser.class);
|
||||
parsers.add(GeoDistanceParser.class);
|
||||
parsers.add(GeoHashGridParser.class);
|
||||
parsers.add(NestedParser.class);
|
||||
}
|
||||
|
||||
|
|
|
@ -20,8 +20,13 @@
|
|||
package org.elasticsearch.search.aggregations;
|
||||
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
|
||||
import org.elasticsearch.search.aggregations.bucket.geogrid.InternalGeoHashGrid;
|
||||
import org.elasticsearch.search.aggregations.bucket.global.InternalGlobal;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
|
||||
import org.elasticsearch.search.aggregations.bucket.missing.InternalMissing;
|
||||
import org.elasticsearch.search.aggregations.bucket.nested.InternalNested;
|
||||
import org.elasticsearch.search.aggregations.bucket.range.InternalRange;
|
||||
import org.elasticsearch.search.aggregations.bucket.range.date.InternalDateRange;
|
||||
import org.elasticsearch.search.aggregations.bucket.range.geodistance.InternalGeoDistance;
|
||||
|
@ -30,17 +35,13 @@ import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms;
|
|||
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.UnmappedTerms;
|
||||
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
|
||||
import org.elasticsearch.search.aggregations.bucket.global.InternalGlobal;
|
||||
import org.elasticsearch.search.aggregations.bucket.missing.InternalMissing;
|
||||
import org.elasticsearch.search.aggregations.bucket.nested.InternalNested;
|
||||
import org.elasticsearch.search.aggregations.metrics.valuecount.InternalValueCount;
|
||||
import org.elasticsearch.search.aggregations.metrics.avg.InternalAvg;
|
||||
import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
|
||||
import org.elasticsearch.search.aggregations.metrics.min.InternalMin;
|
||||
import org.elasticsearch.search.aggregations.metrics.stats.InternalStats;
|
||||
import org.elasticsearch.search.aggregations.metrics.stats.extended.InternalExtendedStats;
|
||||
import org.elasticsearch.search.aggregations.metrics.sum.InternalSum;
|
||||
import org.elasticsearch.search.aggregations.metrics.valuecount.InternalValueCount;
|
||||
|
||||
/**
|
||||
* A module that registers all the transport streams for the addAggregation
|
||||
|
@ -65,6 +66,7 @@ public class TransportAggregationModule extends AbstractModule {
|
|||
InternalMissing.registerStreams();
|
||||
StringTerms.registerStreams();
|
||||
LongTerms.registerStreams();
|
||||
InternalGeoHashGrid.registerStreams();
|
||||
DoubleTerms.registerStreams();
|
||||
UnmappedTerms.registerStreams();
|
||||
InternalRange.registerStream();
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.search.aggregations.bucket;
|
||||
|
||||
/**
|
||||
* Helper functions for common Bucketing functions
|
||||
*/
|
||||
public class BucketUtils {
|
||||
|
||||
/**
|
||||
* Heuristic used to determine the size of shard-side PriorityQueues when
|
||||
* selecting the top N terms from a distributed index.
|
||||
*
|
||||
* @param finalSize
|
||||
* The number of terms required in the final reduce phase.
|
||||
* @param numberOfShards
|
||||
* The number of shards being queried.
|
||||
* @return A suggested default for the size of any shard-side PriorityQueues
|
||||
*/
|
||||
public static int suggestShardSideQueueSize(int finalSize, int numberOfShards) {
|
||||
assert numberOfShards >= 1;
|
||||
if (numberOfShards == 1) {
|
||||
return finalSize;
|
||||
}
|
||||
//Cap the multiplier used for shards to avoid excessive data transfer
|
||||
final int shardSampleSize = finalSize * Math.min(10, numberOfShards);
|
||||
// When finalSize is very small e.g. 1 and there is a low number of
|
||||
// shards then we need to ensure we still gather a reasonable sample of statistics from each
|
||||
// shard (at low cost) to improve the chances of the final result being accurate.
|
||||
return Math.max(10, shardSampleSize);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.aggregations.bucket.geogrid;
|
||||
|
||||
import org.elasticsearch.common.geo.GeoPoint;
|
||||
import org.elasticsearch.search.aggregations.Aggregation;
|
||||
|
||||
/**
|
||||
* Represents the results of a GeoHashGrid aggregation
|
||||
*/
|
||||
public interface GeoHashGrid extends Iterable<GeoHashGrid.Bucket>, Aggregation {
|
||||
|
||||
public static interface Bucket extends org.elasticsearch.search.aggregations.bucket.Bucket {
|
||||
|
||||
String getGeoHash();
|
||||
|
||||
GeoPoint getGeoPoint();
|
||||
|
||||
long getInternalKey();
|
||||
|
||||
}
|
||||
|
||||
int getNumberOfBuckets();
|
||||
|
||||
}
|
|
@ -0,0 +1,164 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.aggregations.bucket.geogrid;
|
||||
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.index.fielddata.LongValues;
|
||||
import org.elasticsearch.search.aggregations.Aggregator;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactories;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregations;
|
||||
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
|
||||
import org.elasticsearch.search.aggregations.bucket.LongHash;
|
||||
import org.elasticsearch.search.aggregations.support.AggregationContext;
|
||||
import org.elasticsearch.search.aggregations.support.numeric.NumericValuesSource;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
||||
/**
|
||||
* Aggregates data expressed as GeoHash longs (for efficiency's sake) but formats results as Geohash strings.
|
||||
*
|
||||
*/
|
||||
|
||||
public class GeoHashGridAggregator extends BucketsAggregator {
|
||||
|
||||
private static final int INITIAL_CAPACITY = 50; // TODO sizing
|
||||
|
||||
private final int requiredSize;
|
||||
private final int shardSize;
|
||||
private final NumericValuesSource valuesSource;
|
||||
private final LongHash bucketOrds;
|
||||
|
||||
public GeoHashGridAggregator(String name, AggregatorFactories factories, NumericValuesSource valuesSource,
|
||||
int requiredSize, int shardSize, AggregationContext aggregationContext, Aggregator parent) {
|
||||
super(name, BucketAggregationMode.PER_BUCKET, factories, INITIAL_CAPACITY, aggregationContext, parent);
|
||||
this.valuesSource = valuesSource;
|
||||
this.requiredSize = requiredSize;
|
||||
this.shardSize = shardSize;
|
||||
bucketOrds = new LongHash(INITIAL_CAPACITY,aggregationContext.pageCacheRecycler());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldCollect() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void collect(int doc, long owningBucketOrdinal) throws IOException {
|
||||
assert owningBucketOrdinal == 0;
|
||||
final LongValues values = valuesSource.longValues();
|
||||
final int valuesCount = values.setDocument(doc);
|
||||
|
||||
for (int i = 0; i < valuesCount; ++i) {
|
||||
final long val = values.nextValue();
|
||||
long bucketOrdinal = bucketOrds.add(val);
|
||||
if (bucketOrdinal < 0) { // already seen
|
||||
bucketOrdinal = - 1 - bucketOrdinal;
|
||||
}
|
||||
collectBucket(doc, bucketOrdinal);
|
||||
}
|
||||
}
|
||||
|
||||
// private impl that stores a bucket ord. This allows for computing the aggregations lazily.
|
||||
static class OrdinalBucket extends InternalGeoHashGrid.Bucket {
|
||||
|
||||
long bucketOrd;
|
||||
|
||||
public OrdinalBucket() {
|
||||
super(0, 0, (InternalAggregations) null);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public InternalGeoHashGrid buildAggregation(long owningBucketOrdinal) {
|
||||
assert owningBucketOrdinal == 0;
|
||||
final int size = (int) Math.min(bucketOrds.size(), shardSize);
|
||||
|
||||
InternalGeoHashGrid.BucketPriorityQueue ordered = new InternalGeoHashGrid.BucketPriorityQueue(size);
|
||||
OrdinalBucket spare = null;
|
||||
for (long i = 0; i < bucketOrds.capacity(); ++i) {
|
||||
final long ord = bucketOrds.id(i);
|
||||
if (ord < 0) {
|
||||
// slot is not allocated
|
||||
continue;
|
||||
}
|
||||
|
||||
if (spare == null) {
|
||||
spare = new OrdinalBucket();
|
||||
}
|
||||
spare.geohashAsLong = bucketOrds.key(i);
|
||||
spare.docCount = bucketDocCount(ord);
|
||||
spare.bucketOrd = ord;
|
||||
spare = (OrdinalBucket) ordered.insertWithOverflow(spare);
|
||||
}
|
||||
|
||||
final InternalGeoHashGrid.Bucket[] list = new InternalGeoHashGrid.Bucket[ordered.size()];
|
||||
for (int i = ordered.size() - 1; i >= 0; --i) {
|
||||
final OrdinalBucket bucket = (OrdinalBucket) ordered.pop();
|
||||
bucket.aggregations = bucketAggregations(bucket.bucketOrd);
|
||||
list[i] = bucket;
|
||||
}
|
||||
return new InternalGeoHashGrid(name, requiredSize, Arrays.asList(list));
|
||||
}
|
||||
|
||||
@Override
|
||||
public InternalGeoHashGrid buildEmptyAggregation() {
|
||||
return new InternalGeoHashGrid(name, requiredSize, Collections.<InternalGeoHashGrid.Bucket>emptyList());
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void doRelease() {
|
||||
Releasables.release(bucketOrds);
|
||||
}
|
||||
|
||||
public static class Unmapped extends Aggregator {
|
||||
private int requiredSize;
|
||||
public Unmapped(String name, int requiredSize, AggregationContext aggregationContext, Aggregator parent) {
|
||||
|
||||
super(name, BucketAggregationMode.PER_BUCKET, AggregatorFactories.EMPTY, 0, aggregationContext, parent);
|
||||
this.requiredSize=requiredSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldCollect() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void collect(int doc, long owningBucketOrdinal) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public InternalGeoHashGrid buildAggregation(long owningBucketOrdinal) {
|
||||
return (InternalGeoHashGrid) buildEmptyAggregation();
|
||||
}
|
||||
|
||||
@Override
|
||||
public InternalGeoHashGrid buildEmptyAggregation() {
|
||||
return new InternalGeoHashGrid(name, requiredSize, Collections.<InternalGeoHashGrid.Bucket>emptyList());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,85 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.search.aggregations.bucket.geogrid;
|
||||
|
||||
import org.elasticsearch.ElasticsearchIllegalArgumentException;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.search.aggregations.AggregationBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Creates an aggregation based on bucketing points into GeoHashes
|
||||
*
|
||||
*/
|
||||
public class GeoHashGridBuilder extends AggregationBuilder<GeoHashGridBuilder> {
|
||||
|
||||
|
||||
private String field;
|
||||
private int precision=GeoHashGridParser.DEFAULT_PRECISION;
|
||||
private int requiredSize=GeoHashGridParser.DEFAULT_MAX_NUM_CELLS;
|
||||
private int shardSize=0;
|
||||
|
||||
public GeoHashGridBuilder(String name) {
|
||||
super(name, InternalGeoHashGrid.TYPE.name());
|
||||
}
|
||||
|
||||
public GeoHashGridBuilder field(String field) {
|
||||
this.field = field;
|
||||
return this;
|
||||
}
|
||||
|
||||
public GeoHashGridBuilder precision(int precision) {
|
||||
if((precision<1)||(precision>12))
|
||||
{
|
||||
throw new ElasticsearchIllegalArgumentException("Invalid geohash aggregation precision of "+precision
|
||||
+"must be between 1 and 12");
|
||||
}
|
||||
this.precision = precision;
|
||||
return this;
|
||||
}
|
||||
public GeoHashGridBuilder size(int requiredSize) {
|
||||
this.requiredSize = requiredSize;
|
||||
return this;
|
||||
}
|
||||
public GeoHashGridBuilder shardSize(int shardSize) {
|
||||
this.shardSize = shardSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
if (field != null) {
|
||||
builder.field("field", field);
|
||||
}
|
||||
if (precision != GeoHashGridParser.DEFAULT_PRECISION) {
|
||||
builder.field("precision", precision);
|
||||
}
|
||||
if (requiredSize != GeoHashGridParser.DEFAULT_MAX_NUM_CELLS) {
|
||||
builder.field("size", requiredSize);
|
||||
}
|
||||
if (shardSize != 0) {
|
||||
builder.field("shard_size", shardSize);
|
||||
}
|
||||
|
||||
return builder.endObject();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,208 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.aggregations.bucket.geogrid;
|
||||
|
||||
import org.elasticsearch.common.geo.GeoHashUtils;
|
||||
import org.elasticsearch.common.geo.GeoPoint;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.index.fielddata.*;
|
||||
import org.elasticsearch.index.mapper.FieldMapper;
|
||||
import org.elasticsearch.index.query.GeoBoundingBoxFilterBuilder;
|
||||
import org.elasticsearch.search.aggregations.Aggregator;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactory;
|
||||
import org.elasticsearch.search.aggregations.bucket.BucketUtils;
|
||||
import org.elasticsearch.search.aggregations.support.*;
|
||||
import org.elasticsearch.search.aggregations.support.geopoints.GeoPointValuesSource;
|
||||
import org.elasticsearch.search.aggregations.support.numeric.NumericValuesSource;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Aggregates Geo information into cells determined by geohashes of a given precision.
|
||||
* WARNING - for high-precision geohashes it may prove necessary to use a {@link GeoBoundingBoxFilterBuilder}
|
||||
* aggregation to focus in on a smaller area to avoid generating too many buckets and using too much RAM
|
||||
*
|
||||
*/
|
||||
public class GeoHashGridParser implements Aggregator.Parser {
|
||||
|
||||
@Override
|
||||
public String type() {
|
||||
return InternalGeoHashGrid.TYPE.name();
|
||||
}
|
||||
|
||||
|
||||
public static final int DEFAULT_PRECISION=5;
|
||||
public static final int DEFAULT_MAX_NUM_CELLS=10000;
|
||||
|
||||
|
||||
@Override
|
||||
public AggregatorFactory parse(String aggregationName, XContentParser parser, SearchContext context) throws IOException {
|
||||
|
||||
String field = null;
|
||||
int precision=DEFAULT_PRECISION;
|
||||
int requiredSize=DEFAULT_MAX_NUM_CELLS;
|
||||
int shardSize=0;
|
||||
|
||||
|
||||
XContentParser.Token token;
|
||||
String currentFieldName = null;
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
currentFieldName = parser.currentName();
|
||||
} else if (token == XContentParser.Token.VALUE_STRING) {
|
||||
if ("field".equals(currentFieldName)) {
|
||||
field = parser.text();
|
||||
}
|
||||
} else if (token == XContentParser.Token.VALUE_NUMBER) {
|
||||
if ("precision".equals(currentFieldName)) {
|
||||
precision = parser.intValue();
|
||||
}else if ("size".equals(currentFieldName)) {
|
||||
requiredSize = parser.intValue();
|
||||
}else if ("shard_size".equals(currentFieldName)) {
|
||||
shardSize = parser.intValue();
|
||||
}else if ("shardSize".equals(currentFieldName)) {
|
||||
shardSize = parser.intValue();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
if(shardSize==0)
|
||||
{
|
||||
//Use default heuristic to avoid any wrong-ranking caused by distributed counting
|
||||
shardSize=BucketUtils.suggestShardSideQueueSize(requiredSize,context.numberOfShards());
|
||||
}
|
||||
|
||||
ValuesSourceConfig<GeoPointValuesSource> config = new ValuesSourceConfig<GeoPointValuesSource>(GeoPointValuesSource.class);
|
||||
if (field == null) {
|
||||
return new GeoGridFactory(aggregationName, config,precision,requiredSize,shardSize);
|
||||
}
|
||||
|
||||
FieldMapper<?> mapper = context.smartNameFieldMapper(field);
|
||||
if (mapper == null) {
|
||||
config.unmapped(true);
|
||||
return new GeoGridFactory(aggregationName, config,precision,requiredSize,shardSize);
|
||||
}
|
||||
|
||||
IndexFieldData<?> indexFieldData = context.fieldData().getForField(mapper);
|
||||
config.fieldContext(new FieldContext(field, indexFieldData));
|
||||
return new GeoGridFactory(aggregationName, config,precision,requiredSize,shardSize);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
private static class GeoGridFactory extends ValueSourceAggregatorFactory<GeoPointValuesSource> {
|
||||
|
||||
private int precision;
|
||||
private int requiredSize;
|
||||
private int shardSize;
|
||||
|
||||
public GeoGridFactory(String name, ValuesSourceConfig<GeoPointValuesSource> valueSourceConfig,
|
||||
int precision,int requiredSize,int shardSize) {
|
||||
super(name, InternalGeoHashGrid.TYPE.name(), valueSourceConfig);
|
||||
this.precision=precision;
|
||||
this.requiredSize=requiredSize;
|
||||
this.shardSize=shardSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Aggregator createUnmapped(AggregationContext aggregationContext, Aggregator parent) {
|
||||
return new GeoHashGridAggregator.Unmapped(name, requiredSize, aggregationContext, parent);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Aggregator create(final GeoPointValuesSource valuesSource, long expectedBucketsCount, AggregationContext aggregationContext, Aggregator parent) {
|
||||
final CellValues cellIdValues = new CellValues(valuesSource,precision);
|
||||
FieldDataSource.Numeric cellIdSource = new CellIdSource(cellIdValues, valuesSource.metaData());
|
||||
if (cellIdSource.metaData().multiValued()) {
|
||||
// we need to wrap to ensure uniqueness
|
||||
cellIdSource = new FieldDataSource.Numeric.SortedAndUnique(cellIdSource);
|
||||
}
|
||||
final NumericValuesSource geohashIdSource = new NumericValuesSource(cellIdSource,null,null);
|
||||
return new GeoHashGridAggregator(name, factories, geohashIdSource, requiredSize,
|
||||
shardSize, aggregationContext, parent);
|
||||
|
||||
}
|
||||
|
||||
private static class CellValues extends LongValues {
|
||||
|
||||
private GeoPointValuesSource geoPointValues;
|
||||
private GeoPointValues geoValues;
|
||||
private int precision;
|
||||
|
||||
protected CellValues(GeoPointValuesSource geoPointValues, int precision ) {
|
||||
super(true);
|
||||
this.geoPointValues = geoPointValues;
|
||||
this.precision=precision;
|
||||
}
|
||||
@Override
|
||||
public int setDocument(int docId) {
|
||||
geoValues = geoPointValues.values();
|
||||
return geoValues.setDocument(docId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long nextValue() {
|
||||
GeoPoint target = geoValues.nextValue();
|
||||
return GeoHashUtils.encodeAsLong(target.getLat(), target.getLon(), precision);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static class CellIdSource extends FieldDataSource.Numeric {
|
||||
private final LongValues values;
|
||||
private MetaData metaData;
|
||||
|
||||
public CellIdSource(LongValues values, MetaData delegate) {
|
||||
this.values = values;
|
||||
//different GeoPoints could map to the same or different geohash cells.
|
||||
this.metaData = MetaData.builder(delegate).uniqueness(MetaData.Uniqueness.UNKNOWN).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFloatingPoint() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongValues longValues() {
|
||||
return values;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DoubleValues doubleValues() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public BytesValues bytesValues() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetaData metaData() {
|
||||
return metaData;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,292 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.aggregations.bucket.geogrid;
|
||||
|
||||
import com.carrotsearch.hppc.LongObjectOpenHashMap;
|
||||
import org.apache.lucene.util.PriorityQueue;
|
||||
import org.elasticsearch.cache.recycler.CacheRecycler;
|
||||
import org.elasticsearch.common.geo.GeoHashUtils;
|
||||
import org.elasticsearch.common.geo.GeoPoint;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.recycler.Recycler;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.search.aggregations.AggregationStreams;
|
||||
import org.elasticsearch.search.aggregations.Aggregations;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregations;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* Represents a grid of cells where each cell's location is determined by a geohash.
|
||||
* All geohashes in a grid are of the same precision and held internally as a single long
|
||||
* for efficiency's sake.
|
||||
*/
|
||||
public class InternalGeoHashGrid extends InternalAggregation implements GeoHashGrid{
|
||||
|
||||
public static final Type TYPE = new Type("geohashgrid", "ghcells");
|
||||
protected Map<String, Bucket> bucketMap;
|
||||
|
||||
public static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
|
||||
@Override
|
||||
public InternalGeoHashGrid readResult(StreamInput in) throws IOException {
|
||||
InternalGeoHashGrid buckets = new InternalGeoHashGrid();
|
||||
buckets.readFrom(in);
|
||||
return buckets;
|
||||
}
|
||||
};
|
||||
|
||||
public static void registerStreams() {
|
||||
AggregationStreams.registerStream(STREAM, TYPE.stream());
|
||||
}
|
||||
|
||||
|
||||
static class Bucket implements GeoHashGrid.Bucket , Comparable<Bucket> {
|
||||
|
||||
protected long geohashAsLong;
|
||||
protected long docCount;
|
||||
protected InternalAggregations aggregations;
|
||||
|
||||
public Bucket(long geohashAsLong, long docCount, InternalAggregations aggregations) {
|
||||
this.docCount=docCount;
|
||||
this.aggregations=aggregations;
|
||||
this.geohashAsLong = geohashAsLong;
|
||||
}
|
||||
|
||||
public GeoPoint getGeoPoint() {
|
||||
return GeoHashUtils.decode(geohashAsLong);
|
||||
}
|
||||
|
||||
|
||||
public String getGeoHash() {
|
||||
return GeoHashUtils.toString(geohashAsLong);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDocCount() {
|
||||
return docCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregations getAggregations() {
|
||||
return aggregations;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Bucket other) {
|
||||
if (this.geohashAsLong > other.geohashAsLong) {
|
||||
return 1;
|
||||
}
|
||||
if (this.geohashAsLong < other.geohashAsLong) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
public Bucket reduce(List<? extends Bucket> buckets, CacheRecycler cacheRecycler) {
|
||||
if (buckets.size() == 1) {
|
||||
return buckets.get(0);
|
||||
}
|
||||
Bucket reduced = null;
|
||||
List<InternalAggregations> aggregationsList = new ArrayList<InternalAggregations>(buckets.size());
|
||||
for (Bucket bucket : buckets) {
|
||||
if (reduced == null) {
|
||||
reduced = bucket;
|
||||
} else {
|
||||
reduced.docCount += bucket.docCount;
|
||||
}
|
||||
aggregationsList.add(bucket.aggregations);
|
||||
}
|
||||
reduced.aggregations = InternalAggregations.reduce(aggregationsList, cacheRecycler);
|
||||
return reduced;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getInternalKey() {
|
||||
return geohashAsLong;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
private int requiredSize;
|
||||
private Collection<Bucket> buckets;
|
||||
InternalGeoHashGrid() {} // for serialization
|
||||
|
||||
public InternalGeoHashGrid(String name, int requiredSize, Collection<Bucket> buckets) {
|
||||
super(name);
|
||||
this.requiredSize=requiredSize;
|
||||
this.buckets=buckets;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Type type() {
|
||||
return TYPE;
|
||||
}
|
||||
|
||||
static class BucketPriorityQueue extends PriorityQueue<Bucket> {
|
||||
|
||||
public BucketPriorityQueue(int size) {
|
||||
super(size);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean lessThan(Bucket o1, Bucket o2) {
|
||||
long i = o2.getDocCount() - o1.getDocCount();
|
||||
if (i == 0) {
|
||||
i = o2.compareTo(o1);
|
||||
if (i == 0) {
|
||||
i = System.identityHashCode(o2) - System.identityHashCode(o1);
|
||||
}
|
||||
}
|
||||
return i > 0 ? true : false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public InternalGeoHashGrid reduce(ReduceContext reduceContext) {
|
||||
List<InternalAggregation> aggregations = reduceContext.aggregations();
|
||||
if (aggregations.size() == 1) {
|
||||
InternalGeoHashGrid grid = (InternalGeoHashGrid) aggregations.get(0);
|
||||
grid.trimExcessEntries();
|
||||
return grid;
|
||||
}
|
||||
InternalGeoHashGrid reduced = null;
|
||||
|
||||
Recycler.V<LongObjectOpenHashMap<List<Bucket>>> buckets = null;
|
||||
for (InternalAggregation aggregation : aggregations) {
|
||||
InternalGeoHashGrid grid = (InternalGeoHashGrid) aggregation;
|
||||
if (reduced == null) {
|
||||
reduced = grid;
|
||||
}
|
||||
if (buckets == null) {
|
||||
buckets = reduceContext.cacheRecycler().longObjectMap(grid.buckets.size());
|
||||
}
|
||||
for (Bucket bucket : grid.buckets) {
|
||||
List<Bucket> existingBuckets = buckets.v().get(bucket.geohashAsLong);
|
||||
if (existingBuckets == null) {
|
||||
existingBuckets = new ArrayList<Bucket>(aggregations.size());
|
||||
buckets.v().put(bucket.geohashAsLong, existingBuckets);
|
||||
}
|
||||
existingBuckets.add(bucket);
|
||||
}
|
||||
}
|
||||
|
||||
if (reduced == null) {
|
||||
// there are only unmapped terms, so we just return the first one (no need to reduce)
|
||||
return (InternalGeoHashGrid) aggregations.get(0);
|
||||
}
|
||||
|
||||
// TODO: would it be better to sort the backing array buffer of the hppc map directly instead of using a PQ?
|
||||
final int size = Math.min(requiredSize, buckets.v().size());
|
||||
BucketPriorityQueue ordered = new BucketPriorityQueue(size);
|
||||
Object[] internalBuckets = buckets.v().values;
|
||||
boolean[] states = buckets.v().allocated;
|
||||
for (int i = 0; i < states.length; i++) {
|
||||
if (states[i]) {
|
||||
List<Bucket> sameCellBuckets = (List<Bucket>) internalBuckets[i];
|
||||
ordered.insertWithOverflow(sameCellBuckets.get(0).reduce(sameCellBuckets, reduceContext.cacheRecycler()));
|
||||
}
|
||||
}
|
||||
buckets.release();
|
||||
Bucket[] list = new Bucket[ordered.size()];
|
||||
for (int i = ordered.size() - 1; i >= 0; i--) {
|
||||
list[i] = (Bucket) ordered.pop();
|
||||
}
|
||||
reduced.buckets = Arrays.asList(list);
|
||||
return reduced;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<GeoHashGrid.Bucket> iterator() {
|
||||
Object o = buckets.iterator();
|
||||
return (Iterator<GeoHashGrid.Bucket>) o;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumberOfBuckets() {
|
||||
return buckets.size();
|
||||
}
|
||||
|
||||
|
||||
protected void trimExcessEntries() {
|
||||
if (requiredSize >= buckets.size()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (buckets instanceof List) {
|
||||
buckets = ((List) buckets).subList(0, requiredSize);
|
||||
return;
|
||||
}
|
||||
|
||||
int i = 0;
|
||||
for (Iterator<Bucket> iter = buckets.iterator(); iter.hasNext();) {
|
||||
iter.next();
|
||||
if (i++ >= requiredSize) {
|
||||
iter.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
this.name = in.readString();
|
||||
this.requiredSize = in.readVInt();
|
||||
int size = in.readVInt();
|
||||
List<Bucket> buckets = new ArrayList<Bucket>(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
buckets.add(new Bucket(in.readLong(), in.readVLong(), InternalAggregations.readAggregations(in)));
|
||||
}
|
||||
this.buckets = buckets;
|
||||
this.bucketMap = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(name);
|
||||
out.writeVInt(requiredSize);
|
||||
out.writeVInt(buckets.size());
|
||||
for (Bucket bucket : buckets) {
|
||||
out.writeLong(((Bucket) bucket).geohashAsLong);
|
||||
out.writeVLong(bucket.getDocCount());
|
||||
((InternalAggregations) bucket.getAggregations()).writeTo(out);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject(name);
|
||||
builder.startArray(CommonFields.BUCKETS);
|
||||
for (Bucket bucket : buckets) {
|
||||
builder.startObject();
|
||||
builder.field(CommonFields.KEY, ((Bucket) bucket).getGeoHash());
|
||||
builder.field(CommonFields.DOC_COUNT, bucket.getDocCount());
|
||||
((InternalAggregations) bucket.getAggregations()).toXContentInternal(builder, params);
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endArray();
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,60 @@
|
|||
/*
|
||||
* Licensed to ElasticSearch and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. ElasticSearch licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.geo;
|
||||
|
||||
import org.elasticsearch.test.ElasticsearchTestCase;
|
||||
import org.junit.Test;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Tests for {@link GeoHashUtils}
|
||||
*/
|
||||
public class GeoHashTests extends ElasticsearchTestCase {
|
||||
|
||||
|
||||
@Test
|
||||
public void testGeohashAsLongRoutines() {
|
||||
|
||||
//Ensure that for all points at all supported levels of precision
|
||||
// that the long encoding of a geohash is compatible with its
|
||||
// String based counterpart
|
||||
for (double lat=-90;lat<90;lat++)
|
||||
{
|
||||
for (double lng=-180;lng<180;lng++)
|
||||
{
|
||||
for(int p=1;p<=12;p++)
|
||||
{
|
||||
long geoAsLong = GeoHashUtils.encodeAsLong(lat,lng,p);
|
||||
String geohash = GeoHashUtils.encode(lat,lng,p);
|
||||
|
||||
String geohashFromLong=GeoHashUtils.toString(geoAsLong);
|
||||
assertEquals(geohash, geohashFromLong);
|
||||
GeoPoint pos=GeoHashUtils.decode(geohash);
|
||||
GeoPoint pos2=GeoHashUtils.decode(geoAsLong);
|
||||
assertEquals(pos, pos2);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,247 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.aggregations.bucket;
|
||||
|
||||
import com.carrotsearch.hppc.ObjectIntMap;
|
||||
import com.carrotsearch.hppc.ObjectIntOpenHashMap;
|
||||
import com.carrotsearch.hppc.cursors.ObjectIntCursor;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.common.geo.GeoHashUtils;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.query.GeoBoundingBoxFilterBuilder;
|
||||
import org.elasticsearch.search.aggregations.AggregationBuilders;
|
||||
import org.elasticsearch.search.aggregations.bucket.filter.Filter;
|
||||
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoHashGrid;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.search.aggregations.AggregationBuilders.geohashGrid;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class GeoHashGridTests extends ElasticsearchIntegrationTest {
|
||||
|
||||
@Override
|
||||
public Settings indexSettings() {
|
||||
return ImmutableSettings.builder()
|
||||
.put("index.number_of_shards", between(1, 5))
|
||||
.put("index.number_of_replicas", between(0, 1))
|
||||
.build();
|
||||
}
|
||||
|
||||
private IndexRequestBuilder indexCity(String name, String latLon) throws Exception {
|
||||
XContentBuilder source = jsonBuilder().startObject().field("city", name);
|
||||
if (latLon != null) {
|
||||
source = source.field("location", latLon);
|
||||
}
|
||||
source = source.endObject();
|
||||
return client().prepareIndex("idx", "type").setSource(source);
|
||||
}
|
||||
|
||||
|
||||
ObjectIntMap<String>expectedDocCountsForGeoHash=null;
|
||||
int highestPrecisionGeohash=12;
|
||||
int numRandomPoints=100;
|
||||
|
||||
String smallestGeoHash=null;
|
||||
|
||||
@Before
|
||||
public void init() throws Exception {
|
||||
prepareCreate("idx")
|
||||
.addMapping("type", "location", "type=geo_point", "city", "type=string,index=not_analyzed")
|
||||
.execute().actionGet();
|
||||
|
||||
createIndex("idx_unmapped");
|
||||
|
||||
List<IndexRequestBuilder> cities = new ArrayList<IndexRequestBuilder>();
|
||||
Random random = getRandom();
|
||||
expectedDocCountsForGeoHash=new ObjectIntOpenHashMap<String>(numRandomPoints*2);
|
||||
for (int i = 0; i < numRandomPoints; i++) {
|
||||
//generate random point
|
||||
double lat=(180d*random.nextDouble())-90d;
|
||||
double lng=(360d*random.nextDouble())-180d;
|
||||
String randomGeoHash=GeoHashUtils.encode(lat, lng,highestPrecisionGeohash);
|
||||
//Index at the highest resolution
|
||||
cities.add(indexCity(randomGeoHash, lat+", "+lng));
|
||||
expectedDocCountsForGeoHash.put(randomGeoHash, expectedDocCountsForGeoHash.getOrDefault(randomGeoHash, 0)+1);
|
||||
//Update expected doc counts for all resolutions..
|
||||
for (int precision = highestPrecisionGeohash-1; precision >0; precision--) {
|
||||
String hash=GeoHashUtils.encode(lat, lng,precision);
|
||||
if((smallestGeoHash==null)||(hash.length()<smallestGeoHash.length())) {
|
||||
smallestGeoHash=hash;
|
||||
}
|
||||
expectedDocCountsForGeoHash.put(hash, expectedDocCountsForGeoHash.getOrDefault(hash, 0)+1);
|
||||
}
|
||||
}
|
||||
indexRandom(true, cities);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void simple() throws Exception {
|
||||
for (int precision = 1; precision <= highestPrecisionGeohash; precision++) {
|
||||
SearchResponse response = client().prepareSearch("idx")
|
||||
.addAggregation(geohashGrid("geohashgrid")
|
||||
.field("location")
|
||||
.precision(precision)
|
||||
)
|
||||
.execute().actionGet();
|
||||
|
||||
assertThat(response.getFailedShards(), equalTo(0));
|
||||
|
||||
GeoHashGrid geoGrid = response.getAggregations().get("geohashgrid");
|
||||
for (GeoHashGrid.Bucket cell : geoGrid ){
|
||||
String geohash=cell.getGeoHash();
|
||||
|
||||
long bucketCount=cell.getDocCount();
|
||||
int expectedBucketCount=expectedDocCountsForGeoHash.get(geohash);
|
||||
assertNotSame(bucketCount, 0);
|
||||
assertEquals("Geohash "+geohash+" has wrong doc count ",
|
||||
expectedBucketCount,bucketCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
@Test
|
||||
public void filtered() throws Exception {
|
||||
GeoBoundingBoxFilterBuilder bbox = new GeoBoundingBoxFilterBuilder("location");
|
||||
bbox.topLeft(smallestGeoHash).bottomRight(smallestGeoHash).filterName("bbox");
|
||||
for (int precision = 1; precision <= highestPrecisionGeohash; precision++) {
|
||||
SearchResponse response = client().prepareSearch("idx")
|
||||
.addAggregation(
|
||||
AggregationBuilders.filter("filtered").filter(bbox)
|
||||
.subAggregation(
|
||||
geohashGrid("geohashgrid")
|
||||
.field("location")
|
||||
.precision(precision)
|
||||
)
|
||||
)
|
||||
.execute().actionGet();
|
||||
|
||||
assertThat(response.getFailedShards(), equalTo(0));
|
||||
|
||||
|
||||
Filter filter =response.getAggregations().get("filtered");
|
||||
|
||||
GeoHashGrid geoGrid = filter.getAggregations().get("geohashgrid");
|
||||
for (GeoHashGrid.Bucket cell : geoGrid ){
|
||||
String geohash = cell.getGeoHash();
|
||||
long bucketCount=cell.getDocCount();
|
||||
int expectedBucketCount=expectedDocCountsForGeoHash.get(geohash);
|
||||
assertNotSame(bucketCount, 0);
|
||||
assertTrue("Buckets must be filtered", geohash.startsWith(smallestGeoHash));
|
||||
assertEquals("Geohash "+geohash+" has wrong doc count ",
|
||||
expectedBucketCount,bucketCount);
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void unmapped() throws Exception {
|
||||
client().admin().cluster().prepareHealth("idx_unmapped").setWaitForYellowStatus().execute().actionGet();
|
||||
|
||||
|
||||
for (int precision = 1; precision <= highestPrecisionGeohash; precision++) {
|
||||
SearchResponse response = client().prepareSearch("idx_unmapped")
|
||||
.addAggregation(geohashGrid("geohashgrid")
|
||||
.field("location")
|
||||
.precision(precision)
|
||||
)
|
||||
.execute().actionGet();
|
||||
|
||||
assertThat(response.getFailedShards(), equalTo(0));
|
||||
|
||||
GeoHashGrid geoGrid = response.getAggregations().get("geohashgrid");
|
||||
assertThat(geoGrid.getNumberOfBuckets(), equalTo(0));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void partiallyUnmapped() throws Exception {
|
||||
for (int precision = 1; precision <= highestPrecisionGeohash; precision++) {
|
||||
SearchResponse response = client().prepareSearch("idx","idx_unmapped")
|
||||
.addAggregation(geohashGrid("geohashgrid")
|
||||
.field("location")
|
||||
.precision(precision)
|
||||
)
|
||||
.execute().actionGet();
|
||||
|
||||
assertThat(response.getFailedShards(), equalTo(0));
|
||||
|
||||
GeoHashGrid geoGrid = response.getAggregations().get("geohashgrid");
|
||||
for (GeoHashGrid.Bucket cell : geoGrid ){
|
||||
String geohash=cell.getGeoHash();
|
||||
|
||||
long bucketCount=cell.getDocCount();
|
||||
int expectedBucketCount=expectedDocCountsForGeoHash.get(geohash);
|
||||
assertNotSame(bucketCount, 0);
|
||||
assertEquals("Geohash "+geohash+" has wrong doc count ",
|
||||
expectedBucketCount,bucketCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTopMatch() throws Exception {
|
||||
for (int precision = 1; precision <= highestPrecisionGeohash; precision++) {
|
||||
SearchResponse response = client().prepareSearch("idx")
|
||||
.addAggregation(geohashGrid("geohashgrid")
|
||||
.field("location")
|
||||
.size(1)
|
||||
.shardSize(100)
|
||||
.precision(precision)
|
||||
)
|
||||
.execute().actionGet();
|
||||
|
||||
assertThat(response.getFailedShards(), equalTo(0));
|
||||
|
||||
GeoHashGrid geoGrid = response.getAggregations().get("geohashgrid");
|
||||
//Check we only have one bucket with the best match for that resolution
|
||||
assertThat(geoGrid.getNumberOfBuckets(), equalTo(1));
|
||||
for (GeoHashGrid.Bucket cell : geoGrid ){
|
||||
String geohash=cell.getGeoHash();
|
||||
long bucketCount=cell.getDocCount();
|
||||
int expectedBucketCount=0;
|
||||
for (ObjectIntCursor<String> cursor : expectedDocCountsForGeoHash) {
|
||||
if(cursor.key.length()==precision)
|
||||
{
|
||||
expectedBucketCount=Math.max(expectedBucketCount, cursor.value);
|
||||
}
|
||||
}
|
||||
assertNotSame(bucketCount, 0);
|
||||
assertEquals("Geohash "+geohash+" has wrong doc count ",
|
||||
expectedBucketCount,bucketCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue