refactor inner geogrid classes to own class files (#37596)
To make further refactoring of GeoGrid aggregations easier (related: #30320), splitting out these inner class dependencies into their own files makes it easier to map the relationship between classes
This commit is contained in:
parent
88b9810567
commit
106f900dfb
|
@ -0,0 +1,95 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.search.aggregations.bucket.geogrid;
|
||||||
|
|
||||||
|
import org.apache.lucene.index.LeafReaderContext;
|
||||||
|
import org.apache.lucene.index.SortedNumericDocValues;
|
||||||
|
import org.elasticsearch.common.geo.GeoHashUtils;
|
||||||
|
import org.elasticsearch.index.fielddata.AbstractSortingNumericDocValues;
|
||||||
|
import org.elasticsearch.index.fielddata.MultiGeoPointValues;
|
||||||
|
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
|
||||||
|
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
|
||||||
|
import org.elasticsearch.search.aggregations.support.ValuesSource;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrapper class to help convert {@link MultiGeoPointValues}
|
||||||
|
* to numeric long values for bucketing.
|
||||||
|
*/
|
||||||
|
class CellIdSource extends ValuesSource.Numeric {
|
||||||
|
private final ValuesSource.GeoPoint valuesSource;
|
||||||
|
private final int precision;
|
||||||
|
|
||||||
|
CellIdSource(GeoPoint valuesSource, int precision) {
|
||||||
|
this.valuesSource = valuesSource;
|
||||||
|
//different GeoPoints could map to the same or different geohash cells.
|
||||||
|
this.precision = precision;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int precision() {
|
||||||
|
return precision;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isFloatingPoint() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SortedNumericDocValues longValues(LeafReaderContext ctx) {
|
||||||
|
return new CellValues(valuesSource.geoPointValues(ctx), precision);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SortedNumericDoubleValues doubleValues(LeafReaderContext ctx) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SortedBinaryDocValues bytesValues(LeafReaderContext ctx) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class CellValues extends AbstractSortingNumericDocValues {
|
||||||
|
private MultiGeoPointValues geoValues;
|
||||||
|
private int precision;
|
||||||
|
|
||||||
|
protected CellValues(MultiGeoPointValues geoValues, int precision) {
|
||||||
|
this.geoValues = geoValues;
|
||||||
|
this.precision = precision;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean advanceExact(int docId) throws IOException {
|
||||||
|
if (geoValues.advanceExact(docId)) {
|
||||||
|
resize(geoValues.docValueCount());
|
||||||
|
for (int i = 0; i < docValueCount(); ++i) {
|
||||||
|
org.elasticsearch.common.geo.GeoPoint target = geoValues.nextValue();
|
||||||
|
values[i] = GeoHashUtils.longEncode(target.getLon(), target.getLat(),
|
||||||
|
precision);
|
||||||
|
}
|
||||||
|
sort();
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -19,21 +19,13 @@
|
||||||
|
|
||||||
package org.elasticsearch.search.aggregations.bucket.geogrid;
|
package org.elasticsearch.search.aggregations.bucket.geogrid;
|
||||||
|
|
||||||
import org.apache.lucene.index.LeafReaderContext;
|
|
||||||
import org.apache.lucene.index.SortedNumericDocValues;
|
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.common.geo.GeoHashUtils;
|
|
||||||
import org.elasticsearch.common.geo.GeoPoint;
|
|
||||||
import org.elasticsearch.common.geo.GeoUtils;
|
import org.elasticsearch.common.geo.GeoUtils;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.xcontent.ObjectParser;
|
import org.elasticsearch.common.xcontent.ObjectParser;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.common.xcontent.XContentParser;
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
import org.elasticsearch.index.fielddata.AbstractSortingNumericDocValues;
|
|
||||||
import org.elasticsearch.index.fielddata.MultiGeoPointValues;
|
|
||||||
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
|
|
||||||
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
|
|
||||||
import org.elasticsearch.search.aggregations.AggregationBuilder;
|
import org.elasticsearch.search.aggregations.AggregationBuilder;
|
||||||
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
|
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
|
||||||
import org.elasticsearch.search.aggregations.AggregatorFactory;
|
import org.elasticsearch.search.aggregations.AggregatorFactory;
|
||||||
|
@ -207,65 +199,5 @@ public class GeoGridAggregationBuilder extends ValuesSourceAggregationBuilder<Va
|
||||||
return NAME;
|
return NAME;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class CellValues extends AbstractSortingNumericDocValues {
|
|
||||||
private MultiGeoPointValues geoValues;
|
|
||||||
private int precision;
|
|
||||||
|
|
||||||
protected CellValues(MultiGeoPointValues geoValues, int precision) {
|
|
||||||
this.geoValues = geoValues;
|
|
||||||
this.precision = precision;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean advanceExact(int docId) throws IOException {
|
|
||||||
if (geoValues.advanceExact(docId)) {
|
|
||||||
resize(geoValues.docValueCount());
|
|
||||||
for (int i = 0; i < docValueCount(); ++i) {
|
|
||||||
GeoPoint target = geoValues.nextValue();
|
|
||||||
values[i] = GeoHashUtils.longEncode(target.getLon(), target.getLat(),
|
|
||||||
precision);
|
|
||||||
}
|
|
||||||
sort();
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static class CellIdSource extends ValuesSource.Numeric {
|
|
||||||
private final ValuesSource.GeoPoint valuesSource;
|
|
||||||
private final int precision;
|
|
||||||
|
|
||||||
CellIdSource(ValuesSource.GeoPoint valuesSource, int precision) {
|
|
||||||
this.valuesSource = valuesSource;
|
|
||||||
//different GeoPoints could map to the same or different geohash cells.
|
|
||||||
this.precision = precision;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int precision() {
|
|
||||||
return precision;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isFloatingPoint() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public SortedNumericDocValues longValues(LeafReaderContext ctx) {
|
|
||||||
return new CellValues(valuesSource.geoPointValues(ctx), precision);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public SortedNumericDoubleValues doubleValues(LeafReaderContext ctx) {
|
|
||||||
throw new UnsupportedOperationException();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public SortedBinaryDocValues bytesValues(LeafReaderContext ctx) {
|
|
||||||
throw new UnsupportedOperationException();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,132 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.search.aggregations.bucket.geogrid;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.geo.GeoHashUtils;
|
||||||
|
import org.elasticsearch.common.geo.GeoPoint;
|
||||||
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
|
import org.elasticsearch.search.aggregations.Aggregation;
|
||||||
|
import org.elasticsearch.search.aggregations.Aggregations;
|
||||||
|
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||||
|
import org.elasticsearch.search.aggregations.InternalAggregations;
|
||||||
|
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
class GeoGridBucket extends InternalMultiBucketAggregation.InternalBucket implements GeoHashGrid.Bucket, Comparable<GeoGridBucket> {
|
||||||
|
|
||||||
|
protected long geohashAsLong;
|
||||||
|
protected long docCount;
|
||||||
|
protected InternalAggregations aggregations;
|
||||||
|
|
||||||
|
GeoGridBucket(long geohashAsLong, long docCount, InternalAggregations aggregations) {
|
||||||
|
this.docCount = docCount;
|
||||||
|
this.aggregations = aggregations;
|
||||||
|
this.geohashAsLong = geohashAsLong;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read from a stream.
|
||||||
|
*/
|
||||||
|
GeoGridBucket(StreamInput in) throws IOException {
|
||||||
|
geohashAsLong = in.readLong();
|
||||||
|
docCount = in.readVLong();
|
||||||
|
aggregations = InternalAggregations.readAggregations(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeTo(StreamOutput out) throws IOException {
|
||||||
|
out.writeLong(geohashAsLong);
|
||||||
|
out.writeVLong(docCount);
|
||||||
|
aggregations.writeTo(out);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getKeyAsString() {
|
||||||
|
return GeoHashUtils.stringEncode(geohashAsLong);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GeoPoint getKey() {
|
||||||
|
return GeoPoint.fromGeohash(geohashAsLong);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getDocCount() {
|
||||||
|
return docCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Aggregations getAggregations() {
|
||||||
|
return aggregations;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compareTo(GeoGridBucket other) {
|
||||||
|
if (this.geohashAsLong > other.geohashAsLong) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
if (this.geohashAsLong < other.geohashAsLong) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public GeoGridBucket reduce(List<? extends GeoGridBucket> buckets, InternalAggregation.ReduceContext context) {
|
||||||
|
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
|
||||||
|
long docCount = 0;
|
||||||
|
for (GeoGridBucket bucket : buckets) {
|
||||||
|
docCount += bucket.docCount;
|
||||||
|
aggregationsList.add(bucket.aggregations);
|
||||||
|
}
|
||||||
|
final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context);
|
||||||
|
return new GeoGridBucket(geohashAsLong, docCount, aggs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||||
|
builder.startObject();
|
||||||
|
builder.field(Aggregation.CommonFields.KEY.getPreferredName(), getKeyAsString());
|
||||||
|
builder.field(Aggregation.CommonFields.DOC_COUNT.getPreferredName(), docCount);
|
||||||
|
aggregations.toXContentInternal(builder, params);
|
||||||
|
builder.endObject();
|
||||||
|
return builder;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) return true;
|
||||||
|
if (o == null || getClass() != o.getClass()) return false;
|
||||||
|
GeoGridBucket bucket = (GeoGridBucket) o;
|
||||||
|
return geohashAsLong == bucket.geohashAsLong &&
|
||||||
|
docCount == bucket.docCount &&
|
||||||
|
Objects.equals(aggregations, bucket.aggregations);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hash(geohashAsLong, docCount, aggregations);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -44,10 +44,10 @@ public class GeoHashGridAggregator extends BucketsAggregator {
|
||||||
|
|
||||||
private final int requiredSize;
|
private final int requiredSize;
|
||||||
private final int shardSize;
|
private final int shardSize;
|
||||||
private final GeoGridAggregationBuilder.CellIdSource valuesSource;
|
private final CellIdSource valuesSource;
|
||||||
private final LongHash bucketOrds;
|
private final LongHash bucketOrds;
|
||||||
|
|
||||||
GeoHashGridAggregator(String name, AggregatorFactories factories, GeoGridAggregationBuilder.CellIdSource valuesSource,
|
GeoHashGridAggregator(String name, AggregatorFactories factories, CellIdSource valuesSource,
|
||||||
int requiredSize, int shardSize, SearchContext aggregationContext, Aggregator parent,
|
int requiredSize, int shardSize, SearchContext aggregationContext, Aggregator parent,
|
||||||
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
|
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
|
||||||
super(name, factories, aggregationContext, parent, pipelineAggregators, metaData);
|
super(name, factories, aggregationContext, parent, pipelineAggregators, metaData);
|
||||||
|
@ -96,7 +96,7 @@ public class GeoHashGridAggregator extends BucketsAggregator {
|
||||||
}
|
}
|
||||||
|
|
||||||
// private impl that stores a bucket ord. This allows for computing the aggregations lazily.
|
// private impl that stores a bucket ord. This allows for computing the aggregations lazily.
|
||||||
static class OrdinalBucket extends InternalGeoHashGrid.Bucket {
|
static class OrdinalBucket extends GeoGridBucket {
|
||||||
|
|
||||||
long bucketOrd;
|
long bucketOrd;
|
||||||
|
|
||||||
|
@ -125,7 +125,7 @@ public class GeoHashGridAggregator extends BucketsAggregator {
|
||||||
spare = (OrdinalBucket) ordered.insertWithOverflow(spare);
|
spare = (OrdinalBucket) ordered.insertWithOverflow(spare);
|
||||||
}
|
}
|
||||||
|
|
||||||
final InternalGeoHashGrid.Bucket[] list = new InternalGeoHashGrid.Bucket[ordered.size()];
|
final GeoGridBucket[] list = new GeoGridBucket[ordered.size()];
|
||||||
for (int i = ordered.size() - 1; i >= 0; --i) {
|
for (int i = ordered.size() - 1; i >= 0; --i) {
|
||||||
final OrdinalBucket bucket = (OrdinalBucket) ordered.pop();
|
final OrdinalBucket bucket = (OrdinalBucket) ordered.pop();
|
||||||
bucket.aggregations = bucketAggregations(bucket.bucketOrd);
|
bucket.aggregations = bucketAggregations(bucket.bucketOrd);
|
||||||
|
|
|
@ -24,7 +24,6 @@ import org.elasticsearch.search.aggregations.AggregatorFactories;
|
||||||
import org.elasticsearch.search.aggregations.AggregatorFactory;
|
import org.elasticsearch.search.aggregations.AggregatorFactory;
|
||||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||||
import org.elasticsearch.search.aggregations.NonCollectingAggregator;
|
import org.elasticsearch.search.aggregations.NonCollectingAggregator;
|
||||||
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoGridAggregationBuilder.CellIdSource;
|
|
||||||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
||||||
import org.elasticsearch.search.aggregations.support.ValuesSource;
|
import org.elasticsearch.search.aggregations.support.ValuesSource;
|
||||||
import org.elasticsearch.search.aggregations.support.ValuesSource.GeoPoint;
|
import org.elasticsearch.search.aggregations.support.ValuesSource.GeoPoint;
|
||||||
|
@ -56,7 +55,7 @@ public class GeoHashGridAggregatorFactory extends ValuesSourceAggregatorFactory<
|
||||||
protected Aggregator createUnmapped(Aggregator parent, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
|
protected Aggregator createUnmapped(Aggregator parent, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
final InternalAggregation aggregation = new InternalGeoHashGrid(name, requiredSize,
|
final InternalAggregation aggregation = new InternalGeoHashGrid(name, requiredSize,
|
||||||
Collections.<InternalGeoHashGrid.Bucket> emptyList(), pipelineAggregators, metaData);
|
Collections.emptyList(), pipelineAggregators, metaData);
|
||||||
return new NonCollectingAggregator(name, context, parent, pipelineAggregators, metaData) {
|
return new NonCollectingAggregator(name, context, parent, pipelineAggregators, metaData) {
|
||||||
@Override
|
@Override
|
||||||
public InternalAggregation buildEmptyAggregation() {
|
public InternalAggregation buildEmptyAggregation() {
|
||||||
|
|
|
@ -19,13 +19,10 @@
|
||||||
package org.elasticsearch.search.aggregations.bucket.geogrid;
|
package org.elasticsearch.search.aggregations.bucket.geogrid;
|
||||||
|
|
||||||
import org.apache.lucene.util.PriorityQueue;
|
import org.apache.lucene.util.PriorityQueue;
|
||||||
import org.elasticsearch.common.geo.GeoHashUtils;
|
|
||||||
import org.elasticsearch.common.geo.GeoPoint;
|
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.util.LongObjectPagedHashMap;
|
import org.elasticsearch.common.util.LongObjectPagedHashMap;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.search.aggregations.Aggregations;
|
|
||||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||||
import org.elasticsearch.search.aggregations.InternalAggregations;
|
import org.elasticsearch.search.aggregations.InternalAggregations;
|
||||||
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
|
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
|
||||||
|
@ -45,110 +42,14 @@ import static java.util.Collections.unmodifiableList;
|
||||||
* All geohashes in a grid are of the same precision and held internally as a single long
|
* All geohashes in a grid are of the same precision and held internally as a single long
|
||||||
* for efficiency's sake.
|
* for efficiency's sake.
|
||||||
*/
|
*/
|
||||||
public class InternalGeoHashGrid extends InternalMultiBucketAggregation<InternalGeoHashGrid, InternalGeoHashGrid.Bucket> implements
|
public class InternalGeoHashGrid extends InternalMultiBucketAggregation<InternalGeoHashGrid, GeoGridBucket> implements
|
||||||
GeoHashGrid {
|
GeoHashGrid {
|
||||||
static class Bucket extends InternalMultiBucketAggregation.InternalBucket implements GeoHashGrid.Bucket, Comparable<Bucket> {
|
|
||||||
|
|
||||||
protected long geohashAsLong;
|
|
||||||
protected long docCount;
|
|
||||||
protected InternalAggregations aggregations;
|
|
||||||
|
|
||||||
Bucket(long geohashAsLong, long docCount, InternalAggregations aggregations) {
|
|
||||||
this.docCount = docCount;
|
|
||||||
this.aggregations = aggregations;
|
|
||||||
this.geohashAsLong = geohashAsLong;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Read from a stream.
|
|
||||||
*/
|
|
||||||
private Bucket(StreamInput in) throws IOException {
|
|
||||||
geohashAsLong = in.readLong();
|
|
||||||
docCount = in.readVLong();
|
|
||||||
aggregations = InternalAggregations.readAggregations(in);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void writeTo(StreamOutput out) throws IOException {
|
|
||||||
out.writeLong(geohashAsLong);
|
|
||||||
out.writeVLong(docCount);
|
|
||||||
aggregations.writeTo(out);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getKeyAsString() {
|
|
||||||
return GeoHashUtils.stringEncode(geohashAsLong);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public GeoPoint getKey() {
|
|
||||||
return GeoPoint.fromGeohash(geohashAsLong);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getDocCount() {
|
|
||||||
return docCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Aggregations getAggregations() {
|
|
||||||
return aggregations;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int compareTo(Bucket other) {
|
|
||||||
if (this.geohashAsLong > other.geohashAsLong) {
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
if (this.geohashAsLong < other.geohashAsLong) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Bucket reduce(List<? extends Bucket> buckets, ReduceContext context) {
|
|
||||||
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
|
|
||||||
long docCount = 0;
|
|
||||||
for (Bucket bucket : buckets) {
|
|
||||||
docCount += bucket.docCount;
|
|
||||||
aggregationsList.add(bucket.aggregations);
|
|
||||||
}
|
|
||||||
final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context);
|
|
||||||
return new Bucket(geohashAsLong, docCount, aggs);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
|
||||||
builder.startObject();
|
|
||||||
builder.field(CommonFields.KEY.getPreferredName(), getKeyAsString());
|
|
||||||
builder.field(CommonFields.DOC_COUNT.getPreferredName(), docCount);
|
|
||||||
aggregations.toXContentInternal(builder, params);
|
|
||||||
builder.endObject();
|
|
||||||
return builder;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean equals(Object o) {
|
|
||||||
if (this == o) return true;
|
|
||||||
if (o == null || getClass() != o.getClass()) return false;
|
|
||||||
Bucket bucket = (Bucket) o;
|
|
||||||
return geohashAsLong == bucket.geohashAsLong &&
|
|
||||||
docCount == bucket.docCount &&
|
|
||||||
Objects.equals(aggregations, bucket.aggregations);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int hashCode() {
|
|
||||||
return Objects.hash(geohashAsLong, docCount, aggregations);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
private final int requiredSize;
|
private final int requiredSize;
|
||||||
private final List<Bucket> buckets;
|
private final List<GeoGridBucket> buckets;
|
||||||
|
|
||||||
InternalGeoHashGrid(String name, int requiredSize, List<Bucket> buckets, List<PipelineAggregator> pipelineAggregators,
|
InternalGeoHashGrid(String name, int requiredSize, List<GeoGridBucket> buckets, List<PipelineAggregator> pipelineAggregators,
|
||||||
Map<String, Object> metaData) {
|
Map<String, Object> metaData) {
|
||||||
super(name, pipelineAggregators, metaData);
|
super(name, pipelineAggregators, metaData);
|
||||||
this.requiredSize = requiredSize;
|
this.requiredSize = requiredSize;
|
||||||
this.buckets = buckets;
|
this.buckets = buckets;
|
||||||
|
@ -160,7 +61,7 @@ public class InternalGeoHashGrid extends InternalMultiBucketAggregation<Internal
|
||||||
public InternalGeoHashGrid(StreamInput in) throws IOException {
|
public InternalGeoHashGrid(StreamInput in) throws IOException {
|
||||||
super(in);
|
super(in);
|
||||||
requiredSize = readSize(in);
|
requiredSize = readSize(in);
|
||||||
buckets = in.readList(Bucket::new);
|
buckets = in.readList(GeoGridBucket::new);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -175,30 +76,30 @@ public class InternalGeoHashGrid extends InternalMultiBucketAggregation<Internal
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public InternalGeoHashGrid create(List<Bucket> buckets) {
|
public InternalGeoHashGrid create(List<GeoGridBucket> buckets) {
|
||||||
return new InternalGeoHashGrid(this.name, this.requiredSize, buckets, this.pipelineAggregators(), this.metaData);
|
return new InternalGeoHashGrid(this.name, this.requiredSize, buckets, this.pipelineAggregators(), this.metaData);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Bucket createBucket(InternalAggregations aggregations, Bucket prototype) {
|
public GeoGridBucket createBucket(InternalAggregations aggregations, GeoGridBucket prototype) {
|
||||||
return new Bucket(prototype.geohashAsLong, prototype.docCount, aggregations);
|
return new GeoGridBucket(prototype.geohashAsLong, prototype.docCount, aggregations);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<InternalGeoHashGrid.Bucket> getBuckets() {
|
public List<GeoGridBucket> getBuckets() {
|
||||||
return unmodifiableList(buckets);
|
return unmodifiableList(buckets);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public InternalGeoHashGrid doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
|
public InternalGeoHashGrid doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
|
||||||
LongObjectPagedHashMap<List<Bucket>> buckets = null;
|
LongObjectPagedHashMap<List<GeoGridBucket>> buckets = null;
|
||||||
for (InternalAggregation aggregation : aggregations) {
|
for (InternalAggregation aggregation : aggregations) {
|
||||||
InternalGeoHashGrid grid = (InternalGeoHashGrid) aggregation;
|
InternalGeoHashGrid grid = (InternalGeoHashGrid) aggregation;
|
||||||
if (buckets == null) {
|
if (buckets == null) {
|
||||||
buckets = new LongObjectPagedHashMap<>(grid.buckets.size(), reduceContext.bigArrays());
|
buckets = new LongObjectPagedHashMap<>(grid.buckets.size(), reduceContext.bigArrays());
|
||||||
}
|
}
|
||||||
for (Bucket bucket : grid.buckets) {
|
for (GeoGridBucket bucket : grid.buckets) {
|
||||||
List<Bucket> existingBuckets = buckets.get(bucket.geohashAsLong);
|
List<GeoGridBucket> existingBuckets = buckets.get(bucket.geohashAsLong);
|
||||||
if (existingBuckets == null) {
|
if (existingBuckets == null) {
|
||||||
existingBuckets = new ArrayList<>(aggregations.size());
|
existingBuckets = new ArrayList<>(aggregations.size());
|
||||||
buckets.put(bucket.geohashAsLong, existingBuckets);
|
buckets.put(bucket.geohashAsLong, existingBuckets);
|
||||||
|
@ -209,9 +110,9 @@ public class InternalGeoHashGrid extends InternalMultiBucketAggregation<Internal
|
||||||
|
|
||||||
final int size = Math.toIntExact(reduceContext.isFinalReduce() == false ? buckets.size() : Math.min(requiredSize, buckets.size()));
|
final int size = Math.toIntExact(reduceContext.isFinalReduce() == false ? buckets.size() : Math.min(requiredSize, buckets.size()));
|
||||||
BucketPriorityQueue ordered = new BucketPriorityQueue(size);
|
BucketPriorityQueue ordered = new BucketPriorityQueue(size);
|
||||||
for (LongObjectPagedHashMap.Cursor<List<Bucket>> cursor : buckets) {
|
for (LongObjectPagedHashMap.Cursor<List<GeoGridBucket>> cursor : buckets) {
|
||||||
List<Bucket> sameCellBuckets = cursor.value;
|
List<GeoGridBucket> sameCellBuckets = cursor.value;
|
||||||
Bucket removed = ordered.insertWithOverflow(sameCellBuckets.get(0).reduce(sameCellBuckets, reduceContext));
|
GeoGridBucket removed = ordered.insertWithOverflow(sameCellBuckets.get(0).reduce(sameCellBuckets, reduceContext));
|
||||||
if (removed != null) {
|
if (removed != null) {
|
||||||
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
|
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
|
||||||
} else {
|
} else {
|
||||||
|
@ -219,7 +120,7 @@ public class InternalGeoHashGrid extends InternalMultiBucketAggregation<Internal
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
buckets.close();
|
buckets.close();
|
||||||
Bucket[] list = new Bucket[ordered.size()];
|
GeoGridBucket[] list = new GeoGridBucket[ordered.size()];
|
||||||
for (int i = ordered.size() - 1; i >= 0; i--) {
|
for (int i = ordered.size() - 1; i >= 0; i--) {
|
||||||
list[i] = ordered.pop();
|
list[i] = ordered.pop();
|
||||||
}
|
}
|
||||||
|
@ -229,7 +130,7 @@ public class InternalGeoHashGrid extends InternalMultiBucketAggregation<Internal
|
||||||
@Override
|
@Override
|
||||||
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
|
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
|
||||||
builder.startArray(CommonFields.BUCKETS.getPreferredName());
|
builder.startArray(CommonFields.BUCKETS.getPreferredName());
|
||||||
for (Bucket bucket : buckets) {
|
for (GeoGridBucket bucket : buckets) {
|
||||||
bucket.toXContent(builder, params);
|
bucket.toXContent(builder, params);
|
||||||
}
|
}
|
||||||
builder.endArray();
|
builder.endArray();
|
||||||
|
@ -253,14 +154,14 @@ public class InternalGeoHashGrid extends InternalMultiBucketAggregation<Internal
|
||||||
Objects.equals(buckets, other.buckets);
|
Objects.equals(buckets, other.buckets);
|
||||||
}
|
}
|
||||||
|
|
||||||
static class BucketPriorityQueue extends PriorityQueue<Bucket> {
|
static class BucketPriorityQueue extends PriorityQueue<GeoGridBucket> {
|
||||||
|
|
||||||
BucketPriorityQueue(int size) {
|
BucketPriorityQueue(int size) {
|
||||||
super(size);
|
super(size);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean lessThan(Bucket o1, Bucket o2) {
|
protected boolean lessThan(GeoGridBucket o1, GeoGridBucket o2) {
|
||||||
int cmp = Long.compare(o2.getDocCount(), o1.getDocCount());
|
int cmp = Long.compare(o2.getDocCount(), o1.getDocCount());
|
||||||
if (cmp == 0) {
|
if (cmp == 0) {
|
||||||
cmp = o2.compareTo(o1);
|
cmp = o2.compareTo(o1);
|
||||||
|
|
|
@ -24,7 +24,6 @@ import org.elasticsearch.common.io.stream.Writeable;
|
||||||
import org.elasticsearch.search.aggregations.InternalAggregations;
|
import org.elasticsearch.search.aggregations.InternalAggregations;
|
||||||
import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
|
import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
|
||||||
import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
|
import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
|
||||||
import org.elasticsearch.search.aggregations.bucket.geogrid.InternalGeoHashGrid.Bucket;
|
|
||||||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -50,13 +49,13 @@ public class InternalGeoHashGridTests extends InternalMultiBucketAggregationTest
|
||||||
Map<String, Object> metaData,
|
Map<String, Object> metaData,
|
||||||
InternalAggregations aggregations) {
|
InternalAggregations aggregations) {
|
||||||
int size = randomNumberOfBuckets();
|
int size = randomNumberOfBuckets();
|
||||||
List<InternalGeoHashGrid.Bucket> buckets = new ArrayList<>(size);
|
List<GeoGridBucket> buckets = new ArrayList<>(size);
|
||||||
for (int i = 0; i < size; i++) {
|
for (int i = 0; i < size; i++) {
|
||||||
double latitude = randomDoubleBetween(-90.0, 90.0, false);
|
double latitude = randomDoubleBetween(-90.0, 90.0, false);
|
||||||
double longitude = randomDoubleBetween(-180.0, 180.0, false);
|
double longitude = randomDoubleBetween(-180.0, 180.0, false);
|
||||||
|
|
||||||
long geoHashAsLong = GeoHashUtils.longEncode(longitude, latitude, 4);
|
long geoHashAsLong = GeoHashUtils.longEncode(longitude, latitude, 4);
|
||||||
buckets.add(new InternalGeoHashGrid.Bucket(geoHashAsLong, randomInt(IndexWriter.MAX_DOCS), aggregations));
|
buckets.add(new GeoGridBucket(geoHashAsLong, randomInt(IndexWriter.MAX_DOCS), aggregations));
|
||||||
}
|
}
|
||||||
return new InternalGeoHashGrid(name, size, buckets, pipelineAggregators, metaData);
|
return new InternalGeoHashGrid(name, size, buckets, pipelineAggregators, metaData);
|
||||||
}
|
}
|
||||||
|
@ -68,24 +67,24 @@ public class InternalGeoHashGridTests extends InternalMultiBucketAggregationTest
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void assertReduced(InternalGeoHashGrid reduced, List<InternalGeoHashGrid> inputs) {
|
protected void assertReduced(InternalGeoHashGrid reduced, List<InternalGeoHashGrid> inputs) {
|
||||||
Map<Long, List<InternalGeoHashGrid.Bucket>> map = new HashMap<>();
|
Map<Long, List<GeoGridBucket>> map = new HashMap<>();
|
||||||
for (InternalGeoHashGrid input : inputs) {
|
for (InternalGeoHashGrid input : inputs) {
|
||||||
for (GeoHashGrid.Bucket bucket : input.getBuckets()) {
|
for (GeoHashGrid.Bucket bucket : input.getBuckets()) {
|
||||||
InternalGeoHashGrid.Bucket internalBucket = (InternalGeoHashGrid.Bucket) bucket;
|
GeoGridBucket internalBucket = (GeoGridBucket) bucket;
|
||||||
List<InternalGeoHashGrid.Bucket> buckets = map.get(internalBucket.geohashAsLong);
|
List<GeoGridBucket> buckets = map.get(internalBucket.geohashAsLong);
|
||||||
if (buckets == null) {
|
if (buckets == null) {
|
||||||
map.put(internalBucket.geohashAsLong, buckets = new ArrayList<>());
|
map.put(internalBucket.geohashAsLong, buckets = new ArrayList<>());
|
||||||
}
|
}
|
||||||
buckets.add(internalBucket);
|
buckets.add(internalBucket);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
List<InternalGeoHashGrid.Bucket> expectedBuckets = new ArrayList<>();
|
List<GeoGridBucket> expectedBuckets = new ArrayList<>();
|
||||||
for (Map.Entry<Long, List<InternalGeoHashGrid.Bucket>> entry : map.entrySet()) {
|
for (Map.Entry<Long, List<GeoGridBucket>> entry : map.entrySet()) {
|
||||||
long docCount = 0;
|
long docCount = 0;
|
||||||
for (InternalGeoHashGrid.Bucket bucket : entry.getValue()) {
|
for (GeoGridBucket bucket : entry.getValue()) {
|
||||||
docCount += bucket.docCount;
|
docCount += bucket.docCount;
|
||||||
}
|
}
|
||||||
expectedBuckets.add(new InternalGeoHashGrid.Bucket(entry.getKey(), docCount, InternalAggregations.EMPTY));
|
expectedBuckets.add(new GeoGridBucket(entry.getKey(), docCount, InternalAggregations.EMPTY));
|
||||||
}
|
}
|
||||||
expectedBuckets.sort((first, second) -> {
|
expectedBuckets.sort((first, second) -> {
|
||||||
int cmp = Long.compare(second.docCount, first.docCount);
|
int cmp = Long.compare(second.docCount, first.docCount);
|
||||||
|
@ -114,7 +113,7 @@ public class InternalGeoHashGridTests extends InternalMultiBucketAggregationTest
|
||||||
protected InternalGeoHashGrid mutateInstance(InternalGeoHashGrid instance) {
|
protected InternalGeoHashGrid mutateInstance(InternalGeoHashGrid instance) {
|
||||||
String name = instance.getName();
|
String name = instance.getName();
|
||||||
int size = instance.getRequiredSize();
|
int size = instance.getRequiredSize();
|
||||||
List<Bucket> buckets = instance.getBuckets();
|
List<GeoGridBucket> buckets = instance.getBuckets();
|
||||||
List<PipelineAggregator> pipelineAggregators = instance.pipelineAggregators();
|
List<PipelineAggregator> pipelineAggregators = instance.pipelineAggregators();
|
||||||
Map<String, Object> metaData = instance.getMetaData();
|
Map<String, Object> metaData = instance.getMetaData();
|
||||||
switch (between(0, 3)) {
|
switch (between(0, 3)) {
|
||||||
|
@ -124,7 +123,7 @@ public class InternalGeoHashGridTests extends InternalMultiBucketAggregationTest
|
||||||
case 1:
|
case 1:
|
||||||
buckets = new ArrayList<>(buckets);
|
buckets = new ArrayList<>(buckets);
|
||||||
buckets.add(
|
buckets.add(
|
||||||
new InternalGeoHashGrid.Bucket(randomNonNegativeLong(), randomInt(IndexWriter.MAX_DOCS), InternalAggregations.EMPTY));
|
new GeoGridBucket(randomNonNegativeLong(), randomInt(IndexWriter.MAX_DOCS), InternalAggregations.EMPTY));
|
||||||
break;
|
break;
|
||||||
case 2:
|
case 2:
|
||||||
size = size + between(1, 10);
|
size = size + between(1, 10);
|
||||||
|
|
Loading…
Reference in New Issue