diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/CellIdSource.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/CellIdSource.java new file mode 100644 index 00000000000..268a27b4669 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/CellIdSource.java @@ -0,0 +1,95 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search.aggregations.bucket.geogrid; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SortedNumericDocValues; +import org.elasticsearch.common.geo.GeoHashUtils; +import org.elasticsearch.index.fielddata.AbstractSortingNumericDocValues; +import org.elasticsearch.index.fielddata.MultiGeoPointValues; +import org.elasticsearch.index.fielddata.SortedBinaryDocValues; +import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; +import org.elasticsearch.search.aggregations.support.ValuesSource; + +import java.io.IOException; + +/** + * Wrapper class to help convert {@link MultiGeoPointValues} + * to numeric long values for bucketing. + */ +class CellIdSource extends ValuesSource.Numeric { + private final ValuesSource.GeoPoint valuesSource; + private final int precision; + + CellIdSource(GeoPoint valuesSource, int precision) { + this.valuesSource = valuesSource; + //different GeoPoints could map to the same or different geohash cells. + this.precision = precision; + } + + public int precision() { + return precision; + } + + @Override + public boolean isFloatingPoint() { + return false; + } + + @Override + public SortedNumericDocValues longValues(LeafReaderContext ctx) { + return new CellValues(valuesSource.geoPointValues(ctx), precision); + } + + @Override + public SortedNumericDoubleValues doubleValues(LeafReaderContext ctx) { + throw new UnsupportedOperationException(); + } + + @Override + public SortedBinaryDocValues bytesValues(LeafReaderContext ctx) { + throw new UnsupportedOperationException(); + } + + private static class CellValues extends AbstractSortingNumericDocValues { + private MultiGeoPointValues geoValues; + private int precision; + + protected CellValues(MultiGeoPointValues geoValues, int precision) { + this.geoValues = geoValues; + this.precision = precision; + } + + @Override + public boolean advanceExact(int docId) throws IOException { + if (geoValues.advanceExact(docId)) { + resize(geoValues.docValueCount()); + for (int i = 0; i < docValueCount(); ++i) { + org.elasticsearch.common.geo.GeoPoint target = geoValues.nextValue(); + values[i] = GeoHashUtils.longEncode(target.getLon(), target.getLat(), + precision); + } + sort(); + return true; + } else { + return false; + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregationBuilder.java index 38469ff8753..85e4c8b228e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregationBuilder.java @@ -19,21 +19,13 @@ package org.elasticsearch.search.aggregations.bucket.geogrid; -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.SortedNumericDocValues; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.geo.GeoHashUtils; -import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.geo.GeoUtils; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.index.fielddata.AbstractSortingNumericDocValues; -import org.elasticsearch.index.fielddata.MultiGeoPointValues; -import org.elasticsearch.index.fielddata.SortedBinaryDocValues; -import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregatorFactories.Builder; import org.elasticsearch.search.aggregations.AggregatorFactory; @@ -207,65 +199,5 @@ public class GeoGridAggregationBuilder extends ValuesSourceAggregationBuilder { + + protected long geohashAsLong; + protected long docCount; + protected InternalAggregations aggregations; + + GeoGridBucket(long geohashAsLong, long docCount, InternalAggregations aggregations) { + this.docCount = docCount; + this.aggregations = aggregations; + this.geohashAsLong = geohashAsLong; + } + + /** + * Read from a stream. + */ + GeoGridBucket(StreamInput in) throws IOException { + geohashAsLong = in.readLong(); + docCount = in.readVLong(); + aggregations = InternalAggregations.readAggregations(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(geohashAsLong); + out.writeVLong(docCount); + aggregations.writeTo(out); + } + + @Override + public String getKeyAsString() { + return GeoHashUtils.stringEncode(geohashAsLong); + } + + @Override + public GeoPoint getKey() { + return GeoPoint.fromGeohash(geohashAsLong); + } + + @Override + public long getDocCount() { + return docCount; + } + + @Override + public Aggregations getAggregations() { + return aggregations; + } + + @Override + public int compareTo(GeoGridBucket other) { + if (this.geohashAsLong > other.geohashAsLong) { + return 1; + } + if (this.geohashAsLong < other.geohashAsLong) { + return -1; + } + return 0; + } + + public GeoGridBucket reduce(List buckets, InternalAggregation.ReduceContext context) { + List aggregationsList = new ArrayList<>(buckets.size()); + long docCount = 0; + for (GeoGridBucket bucket : buckets) { + docCount += bucket.docCount; + aggregationsList.add(bucket.aggregations); + } + final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context); + return new GeoGridBucket(geohashAsLong, docCount, aggs); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(Aggregation.CommonFields.KEY.getPreferredName(), getKeyAsString()); + builder.field(Aggregation.CommonFields.DOC_COUNT.getPreferredName(), docCount); + aggregations.toXContentInternal(builder, params); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + GeoGridBucket bucket = (GeoGridBucket) o; + return geohashAsLong == bucket.geohashAsLong && + docCount == bucket.docCount && + Objects.equals(aggregations, bucket.aggregations); + } + + @Override + public int hashCode() { + return Objects.hash(geohashAsLong, docCount, aggregations); + } + +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridAggregator.java index f43cfae61ba..1ead747bb93 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridAggregator.java @@ -44,10 +44,10 @@ public class GeoHashGridAggregator extends BucketsAggregator { private final int requiredSize; private final int shardSize; - private final GeoGridAggregationBuilder.CellIdSource valuesSource; + private final CellIdSource valuesSource; private final LongHash bucketOrds; - GeoHashGridAggregator(String name, AggregatorFactories factories, GeoGridAggregationBuilder.CellIdSource valuesSource, + GeoHashGridAggregator(String name, AggregatorFactories factories, CellIdSource valuesSource, int requiredSize, int shardSize, SearchContext aggregationContext, Aggregator parent, List pipelineAggregators, Map metaData) throws IOException { super(name, factories, aggregationContext, parent, pipelineAggregators, metaData); @@ -96,7 +96,7 @@ public class GeoHashGridAggregator extends BucketsAggregator { } // private impl that stores a bucket ord. This allows for computing the aggregations lazily. - static class OrdinalBucket extends InternalGeoHashGrid.Bucket { + static class OrdinalBucket extends GeoGridBucket { long bucketOrd; @@ -125,7 +125,7 @@ public class GeoHashGridAggregator extends BucketsAggregator { spare = (OrdinalBucket) ordered.insertWithOverflow(spare); } - final InternalGeoHashGrid.Bucket[] list = new InternalGeoHashGrid.Bucket[ordered.size()]; + final GeoGridBucket[] list = new GeoGridBucket[ordered.size()]; for (int i = ordered.size() - 1; i >= 0; --i) { final OrdinalBucket bucket = (OrdinalBucket) ordered.pop(); bucket.aggregations = bucketAggregations(bucket.bucketOrd); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridAggregatorFactory.java index 13b48501564..b7cb50b5f44 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridAggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridAggregatorFactory.java @@ -24,7 +24,6 @@ import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.AggregatorFactory; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.NonCollectingAggregator; -import org.elasticsearch.search.aggregations.bucket.geogrid.GeoGridAggregationBuilder.CellIdSource; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.aggregations.support.ValuesSource.GeoPoint; @@ -56,7 +55,7 @@ public class GeoHashGridAggregatorFactory extends ValuesSourceAggregatorFactory< protected Aggregator createUnmapped(Aggregator parent, List pipelineAggregators, Map metaData) throws IOException { final InternalAggregation aggregation = new InternalGeoHashGrid(name, requiredSize, - Collections. emptyList(), pipelineAggregators, metaData); + Collections.emptyList(), pipelineAggregators, metaData); return new NonCollectingAggregator(name, context, parent, pipelineAggregators, metaData) { @Override public InternalAggregation buildEmptyAggregation() { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java index bc60f5945eb..6f887e644b3 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java @@ -19,13 +19,10 @@ package org.elasticsearch.search.aggregations.bucket.geogrid; import org.apache.lucene.util.PriorityQueue; -import org.elasticsearch.common.geo.GeoHashUtils; -import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.LongObjectPagedHashMap; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; @@ -45,110 +42,14 @@ import static java.util.Collections.unmodifiableList; * All geohashes in a grid are of the same precision and held internally as a single long * for efficiency's sake. */ -public class InternalGeoHashGrid extends InternalMultiBucketAggregation implements +public class InternalGeoHashGrid extends InternalMultiBucketAggregation implements GeoHashGrid { - static class Bucket extends InternalMultiBucketAggregation.InternalBucket implements GeoHashGrid.Bucket, Comparable { - - protected long geohashAsLong; - protected long docCount; - protected InternalAggregations aggregations; - - Bucket(long geohashAsLong, long docCount, InternalAggregations aggregations) { - this.docCount = docCount; - this.aggregations = aggregations; - this.geohashAsLong = geohashAsLong; - } - - /** - * Read from a stream. - */ - private Bucket(StreamInput in) throws IOException { - geohashAsLong = in.readLong(); - docCount = in.readVLong(); - aggregations = InternalAggregations.readAggregations(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeLong(geohashAsLong); - out.writeVLong(docCount); - aggregations.writeTo(out); - } - - @Override - public String getKeyAsString() { - return GeoHashUtils.stringEncode(geohashAsLong); - } - - @Override - public GeoPoint getKey() { - return GeoPoint.fromGeohash(geohashAsLong); - } - - @Override - public long getDocCount() { - return docCount; - } - - @Override - public Aggregations getAggregations() { - return aggregations; - } - - @Override - public int compareTo(Bucket other) { - if (this.geohashAsLong > other.geohashAsLong) { - return 1; - } - if (this.geohashAsLong < other.geohashAsLong) { - return -1; - } - return 0; - } - - public Bucket reduce(List buckets, ReduceContext context) { - List aggregationsList = new ArrayList<>(buckets.size()); - long docCount = 0; - for (Bucket bucket : buckets) { - docCount += bucket.docCount; - aggregationsList.add(bucket.aggregations); - } - final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context); - return new Bucket(geohashAsLong, docCount, aggs); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.field(CommonFields.KEY.getPreferredName(), getKeyAsString()); - builder.field(CommonFields.DOC_COUNT.getPreferredName(), docCount); - aggregations.toXContentInternal(builder, params); - builder.endObject(); - return builder; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Bucket bucket = (Bucket) o; - return geohashAsLong == bucket.geohashAsLong && - docCount == bucket.docCount && - Objects.equals(aggregations, bucket.aggregations); - } - - @Override - public int hashCode() { - return Objects.hash(geohashAsLong, docCount, aggregations); - } - - } private final int requiredSize; - private final List buckets; + private final List buckets; - InternalGeoHashGrid(String name, int requiredSize, List buckets, List pipelineAggregators, - Map metaData) { + InternalGeoHashGrid(String name, int requiredSize, List buckets, List pipelineAggregators, + Map metaData) { super(name, pipelineAggregators, metaData); this.requiredSize = requiredSize; this.buckets = buckets; @@ -160,7 +61,7 @@ public class InternalGeoHashGrid extends InternalMultiBucketAggregation buckets) { + public InternalGeoHashGrid create(List buckets) { return new InternalGeoHashGrid(this.name, this.requiredSize, buckets, this.pipelineAggregators(), this.metaData); } @Override - public Bucket createBucket(InternalAggregations aggregations, Bucket prototype) { - return new Bucket(prototype.geohashAsLong, prototype.docCount, aggregations); + public GeoGridBucket createBucket(InternalAggregations aggregations, GeoGridBucket prototype) { + return new GeoGridBucket(prototype.geohashAsLong, prototype.docCount, aggregations); } @Override - public List getBuckets() { + public List getBuckets() { return unmodifiableList(buckets); } @Override public InternalGeoHashGrid doReduce(List aggregations, ReduceContext reduceContext) { - LongObjectPagedHashMap> buckets = null; + LongObjectPagedHashMap> buckets = null; for (InternalAggregation aggregation : aggregations) { InternalGeoHashGrid grid = (InternalGeoHashGrid) aggregation; if (buckets == null) { buckets = new LongObjectPagedHashMap<>(grid.buckets.size(), reduceContext.bigArrays()); } - for (Bucket bucket : grid.buckets) { - List existingBuckets = buckets.get(bucket.geohashAsLong); + for (GeoGridBucket bucket : grid.buckets) { + List existingBuckets = buckets.get(bucket.geohashAsLong); if (existingBuckets == null) { existingBuckets = new ArrayList<>(aggregations.size()); buckets.put(bucket.geohashAsLong, existingBuckets); @@ -209,9 +110,9 @@ public class InternalGeoHashGrid extends InternalMultiBucketAggregation> cursor : buckets) { - List sameCellBuckets = cursor.value; - Bucket removed = ordered.insertWithOverflow(sameCellBuckets.get(0).reduce(sameCellBuckets, reduceContext)); + for (LongObjectPagedHashMap.Cursor> cursor : buckets) { + List sameCellBuckets = cursor.value; + GeoGridBucket removed = ordered.insertWithOverflow(sameCellBuckets.get(0).reduce(sameCellBuckets, reduceContext)); if (removed != null) { reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed)); } else { @@ -219,7 +120,7 @@ public class InternalGeoHashGrid extends InternalMultiBucketAggregation= 0; i--) { list[i] = ordered.pop(); } @@ -229,7 +130,7 @@ public class InternalGeoHashGrid extends InternalMultiBucketAggregation { + static class BucketPriorityQueue extends PriorityQueue { BucketPriorityQueue(int size) { super(size); } @Override - protected boolean lessThan(Bucket o1, Bucket o2) { + protected boolean lessThan(GeoGridBucket o1, GeoGridBucket o2) { int cmp = Long.compare(o2.getDocCount(), o1.getDocCount()); if (cmp == 0) { cmp = o2.compareTo(o1); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGridTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGridTests.java index 822e05ffa65..78016833dbc 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGridTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGridTests.java @@ -24,7 +24,6 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.test.InternalMultiBucketAggregationTestCase; import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation; -import org.elasticsearch.search.aggregations.bucket.geogrid.InternalGeoHashGrid.Bucket; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import java.util.ArrayList; @@ -50,13 +49,13 @@ public class InternalGeoHashGridTests extends InternalMultiBucketAggregationTest Map metaData, InternalAggregations aggregations) { int size = randomNumberOfBuckets(); - List buckets = new ArrayList<>(size); + List buckets = new ArrayList<>(size); for (int i = 0; i < size; i++) { double latitude = randomDoubleBetween(-90.0, 90.0, false); double longitude = randomDoubleBetween(-180.0, 180.0, false); long geoHashAsLong = GeoHashUtils.longEncode(longitude, latitude, 4); - buckets.add(new InternalGeoHashGrid.Bucket(geoHashAsLong, randomInt(IndexWriter.MAX_DOCS), aggregations)); + buckets.add(new GeoGridBucket(geoHashAsLong, randomInt(IndexWriter.MAX_DOCS), aggregations)); } return new InternalGeoHashGrid(name, size, buckets, pipelineAggregators, metaData); } @@ -68,24 +67,24 @@ public class InternalGeoHashGridTests extends InternalMultiBucketAggregationTest @Override protected void assertReduced(InternalGeoHashGrid reduced, List inputs) { - Map> map = new HashMap<>(); + Map> map = new HashMap<>(); for (InternalGeoHashGrid input : inputs) { for (GeoHashGrid.Bucket bucket : input.getBuckets()) { - InternalGeoHashGrid.Bucket internalBucket = (InternalGeoHashGrid.Bucket) bucket; - List buckets = map.get(internalBucket.geohashAsLong); + GeoGridBucket internalBucket = (GeoGridBucket) bucket; + List buckets = map.get(internalBucket.geohashAsLong); if (buckets == null) { map.put(internalBucket.geohashAsLong, buckets = new ArrayList<>()); } buckets.add(internalBucket); } } - List expectedBuckets = new ArrayList<>(); - for (Map.Entry> entry : map.entrySet()) { + List expectedBuckets = new ArrayList<>(); + for (Map.Entry> entry : map.entrySet()) { long docCount = 0; - for (InternalGeoHashGrid.Bucket bucket : entry.getValue()) { + for (GeoGridBucket bucket : entry.getValue()) { docCount += bucket.docCount; } - expectedBuckets.add(new InternalGeoHashGrid.Bucket(entry.getKey(), docCount, InternalAggregations.EMPTY)); + expectedBuckets.add(new GeoGridBucket(entry.getKey(), docCount, InternalAggregations.EMPTY)); } expectedBuckets.sort((first, second) -> { int cmp = Long.compare(second.docCount, first.docCount); @@ -114,7 +113,7 @@ public class InternalGeoHashGridTests extends InternalMultiBucketAggregationTest protected InternalGeoHashGrid mutateInstance(InternalGeoHashGrid instance) { String name = instance.getName(); int size = instance.getRequiredSize(); - List buckets = instance.getBuckets(); + List buckets = instance.getBuckets(); List pipelineAggregators = instance.pipelineAggregators(); Map metaData = instance.getMetaData(); switch (between(0, 3)) { @@ -124,7 +123,7 @@ public class InternalGeoHashGridTests extends InternalMultiBucketAggregationTest case 1: buckets = new ArrayList<>(buckets); buckets.add( - new InternalGeoHashGrid.Bucket(randomNonNegativeLong(), randomInt(IndexWriter.MAX_DOCS), InternalAggregations.EMPTY)); + new GeoGridBucket(randomNonNegativeLong(), randomInt(IndexWriter.MAX_DOCS), InternalAggregations.EMPTY)); break; case 2: size = size + between(1, 10);