mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-25 14:26:27 +00:00
Migrate geohash_grid and geo_bounds to NamedWriteable
Just another small step in removing Aggregation's custom streams implementation in favor of NamedWriteable.
This commit is contained in:
parent
f479219ca7
commit
06bd896ce0
@ -604,7 +604,6 @@
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]cardinality[/\\]CardinalityAggregator.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]cardinality[/\\]HyperLogLogPlusPlus.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]geobounds[/\\]GeoBoundsAggregator.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]geobounds[/\\]InternalGeoBounds.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]percentiles[/\\]tdigest[/\\]AbstractTDigestPercentilesAggregator.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]percentiles[/\\]tdigest[/\\]TDigestPercentileRanksAggregator.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]percentiles[/\\]tdigest[/\\]TDigestPercentilesAggregator.java" checks="LineLength" />
|
||||
|
@ -542,14 +542,16 @@ public class SearchModule extends AbstractModule {
|
||||
DateHistogramAggregationBuilder.AGGREGATION_NAME_FIELD);
|
||||
registerAggregation(new AggregationSpec(GeoDistanceAggregationBuilder::new, new GeoDistanceParser(),
|
||||
GeoDistanceAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalGeoDistance::new));
|
||||
registerAggregation(GeoGridAggregationBuilder::new, new GeoHashGridParser(), GeoGridAggregationBuilder.AGGREGATION_NAME_FIELD);
|
||||
registerAggregation(new AggregationSpec(GeoGridAggregationBuilder::new, new GeoHashGridParser(),
|
||||
GeoGridAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalGeoHashGrid::new));
|
||||
registerAggregation(NestedAggregationBuilder::new, NestedAggregationBuilder::parse,
|
||||
NestedAggregationBuilder.AGGREGATION_FIELD_NAME);
|
||||
registerAggregation(ReverseNestedAggregationBuilder::new, ReverseNestedAggregationBuilder::parse,
|
||||
ReverseNestedAggregationBuilder.AGGREGATION_NAME_FIELD);
|
||||
registerAggregation(TopHitsAggregationBuilder::new, TopHitsAggregationBuilder::parse,
|
||||
TopHitsAggregationBuilder.AGGREGATION_NAME_FIELD);
|
||||
registerAggregation(GeoBoundsAggregationBuilder::new, new GeoBoundsParser(), GeoBoundsAggregationBuilder.AGGREGATION_NAME_FIED);
|
||||
registerAggregation(new AggregationSpec(GeoBoundsAggregationBuilder::new, new GeoBoundsParser(),
|
||||
GeoBoundsAggregationBuilder.AGGREGATION_NAME_FIED).addResultReader(InternalGeoBounds::new));
|
||||
registerAggregation(new AggregationSpec(GeoCentroidAggregationBuilder::new, new GeoCentroidParser(),
|
||||
GeoCentroidAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalGeoCentroid::new));
|
||||
registerAggregation(new AggregationSpec(ScriptedMetricAggregationBuilder::new, ScriptedMetricAggregationBuilder::parse,
|
||||
@ -820,13 +822,11 @@ public class SearchModule extends AbstractModule {
|
||||
|
||||
static {
|
||||
// buckets
|
||||
InternalGeoHashGrid.registerStreams();
|
||||
InternalBinaryRange.registerStream();
|
||||
InternalHistogram.registerStream();
|
||||
InternalNested.registerStream();
|
||||
InternalReverseNested.registerStream();
|
||||
InternalTopHits.registerStreams();
|
||||
InternalGeoBounds.registerStream();
|
||||
InternalChildren.registerStream();
|
||||
|
||||
// Pipeline Aggregations
|
||||
|
@ -33,6 +33,7 @@ import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
|
||||
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
|
||||
import org.elasticsearch.index.fielddata.SortingNumericDocValues;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactory;
|
||||
import org.elasticsearch.search.aggregations.bucket.BucketUtils;
|
||||
import org.elasticsearch.search.aggregations.support.AggregationContext;
|
||||
@ -47,7 +48,8 @@ import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
public class GeoGridAggregationBuilder extends ValuesSourceAggregationBuilder<ValuesSource.GeoPoint, GeoGridAggregationBuilder> {
|
||||
public static final String NAME = InternalGeoHashGrid.TYPE.name();
|
||||
public static final String NAME = "geohash_grid";
|
||||
private static final Type TYPE = new Type(NAME);
|
||||
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
|
||||
|
||||
private int precision = GeoHashGridParser.DEFAULT_PRECISION;
|
||||
@ -55,14 +57,14 @@ public class GeoGridAggregationBuilder extends ValuesSourceAggregationBuilder<Va
|
||||
private int shardSize = -1;
|
||||
|
||||
public GeoGridAggregationBuilder(String name) {
|
||||
super(name, InternalGeoHashGrid.TYPE, ValuesSourceType.GEOPOINT, ValueType.GEOPOINT);
|
||||
super(name, TYPE, ValuesSourceType.GEOPOINT, ValueType.GEOPOINT);
|
||||
}
|
||||
|
||||
/**
|
||||
* Read from a stream.
|
||||
*/
|
||||
public GeoGridAggregationBuilder(StreamInput in) throws IOException {
|
||||
super(in, InternalGeoHashGrid.TYPE, ValuesSourceType.GEOPOINT, ValueType.GEOPOINT);
|
||||
super(in, TYPE, ValuesSourceType.GEOPOINT, ValueType.GEOPOINT);
|
||||
precision = in.readVInt();
|
||||
requiredSize = in.readVInt();
|
||||
shardSize = in.readVInt();
|
||||
|
@ -25,7 +25,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.util.LongObjectPagedHashMap;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.search.aggregations.AggregationStreams;
|
||||
import org.elasticsearch.search.aggregations.Aggregations;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregations;
|
||||
@ -35,10 +34,11 @@ import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.Collections.unmodifiableList;
|
||||
|
||||
/**
|
||||
* Represents a grid of cells where each cell's location is determined by a geohash.
|
||||
* All geohashes in a grid are of the same precision and held internally as a single long
|
||||
@ -46,39 +46,35 @@ import java.util.Map;
|
||||
*/
|
||||
public class InternalGeoHashGrid extends InternalMultiBucketAggregation<InternalGeoHashGrid, InternalGeoHashGrid.Bucket> implements
|
||||
GeoHashGrid {
|
||||
|
||||
public static final Type TYPE = new Type("geohash_grid", "ghcells");
|
||||
|
||||
public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
|
||||
@Override
|
||||
public InternalGeoHashGrid readResult(StreamInput in) throws IOException {
|
||||
InternalGeoHashGrid buckets = new InternalGeoHashGrid();
|
||||
buckets.readFrom(in);
|
||||
return buckets;
|
||||
}
|
||||
};
|
||||
|
||||
public static void registerStreams() {
|
||||
AggregationStreams.registerStream(STREAM, TYPE.stream());
|
||||
}
|
||||
|
||||
|
||||
static class Bucket extends InternalMultiBucketAggregation.InternalBucket implements GeoHashGrid.Bucket, Comparable<Bucket> {
|
||||
|
||||
protected long geohashAsLong;
|
||||
protected long docCount;
|
||||
protected InternalAggregations aggregations;
|
||||
|
||||
public Bucket() {
|
||||
// For Serialization only
|
||||
}
|
||||
|
||||
public Bucket(long geohashAsLong, long docCount, InternalAggregations aggregations) {
|
||||
this.docCount = docCount;
|
||||
this.aggregations = aggregations;
|
||||
this.geohashAsLong = geohashAsLong;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read from a stream.
|
||||
*/
|
||||
private Bucket(StreamInput in) throws IOException {
|
||||
geohashAsLong = in.readLong();
|
||||
docCount = in.readVLong();
|
||||
aggregations = InternalAggregations.readAggregations(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeLong(geohashAsLong);
|
||||
out.writeVLong(docCount);
|
||||
aggregations.writeTo(out);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String getKeyAsString() {
|
||||
return GeoHashUtils.stringEncode(geohashAsLong);
|
||||
@ -121,20 +117,6 @@ public class InternalGeoHashGrid extends InternalMultiBucketAggregation<Internal
|
||||
return new Bucket(geohashAsLong, docCount, aggs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
geohashAsLong = in.readLong();
|
||||
docCount = in.readVLong();
|
||||
aggregations = InternalAggregations.readAggregations(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeLong(geohashAsLong);
|
||||
out.writeVLong(docCount);
|
||||
aggregations.writeTo(out);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
@ -145,23 +127,35 @@ public class InternalGeoHashGrid extends InternalMultiBucketAggregation<Internal
|
||||
return builder;
|
||||
}
|
||||
}
|
||||
private int requiredSize;
|
||||
private Collection<Bucket> buckets;
|
||||
protected Map<String, Bucket> bucketMap;
|
||||
|
||||
InternalGeoHashGrid() {
|
||||
} // for serialization
|
||||
private final int requiredSize;
|
||||
private final List<Bucket> buckets;
|
||||
|
||||
public InternalGeoHashGrid(String name, int requiredSize, Collection<Bucket> buckets, List<PipelineAggregator> pipelineAggregators,
|
||||
public InternalGeoHashGrid(String name, int requiredSize, List<Bucket> buckets, List<PipelineAggregator> pipelineAggregators,
|
||||
Map<String, Object> metaData) {
|
||||
super(name, pipelineAggregators, metaData);
|
||||
this.requiredSize = requiredSize;
|
||||
this.buckets = buckets;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read from a stream.
|
||||
*/
|
||||
public InternalGeoHashGrid(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
requiredSize = readSize(in);
|
||||
buckets = in.readList(Bucket::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Type type() {
|
||||
return TYPE;
|
||||
protected void doWriteTo(StreamOutput out) throws IOException {
|
||||
writeSize(requiredSize, out);
|
||||
out.writeList(buckets);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return GeoGridAggregationBuilder.NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -174,11 +168,9 @@ public class InternalGeoHashGrid extends InternalMultiBucketAggregation<Internal
|
||||
return new Bucket(prototype.geohashAsLong, prototype.docCount, aggregations);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public List<GeoHashGrid.Bucket> getBuckets() {
|
||||
Object o = buckets;
|
||||
return (List<GeoHashGrid.Bucket>) o;
|
||||
return unmodifiableList(buckets);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -214,29 +206,6 @@ public class InternalGeoHashGrid extends InternalMultiBucketAggregation<Internal
|
||||
return new InternalGeoHashGrid(getName(), requiredSize, Arrays.asList(list), pipelineAggregators(), getMetaData());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doReadFrom(StreamInput in) throws IOException {
|
||||
this.requiredSize = readSize(in);
|
||||
int size = in.readVInt();
|
||||
List<Bucket> buckets = new ArrayList<>(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
Bucket bucket = new Bucket();
|
||||
bucket.readFrom(in);
|
||||
buckets.add(bucket);
|
||||
}
|
||||
this.buckets = buckets;
|
||||
this.bucketMap = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doWriteTo(StreamOutput out) throws IOException {
|
||||
writeSize(requiredSize, out);
|
||||
out.writeVInt(buckets.size());
|
||||
for (Bucket bucket : buckets) {
|
||||
bucket.writeTo(out);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startArray(CommonFields.BUCKETS);
|
||||
|
@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactory;
|
||||
import org.elasticsearch.search.aggregations.support.AggregationContext;
|
||||
import org.elasticsearch.search.aggregations.support.ValueType;
|
||||
@ -36,20 +37,21 @@ import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
public class GeoBoundsAggregationBuilder extends ValuesSourceAggregationBuilder<ValuesSource.GeoPoint, GeoBoundsAggregationBuilder> {
|
||||
public static final String NAME = InternalGeoBounds.TYPE.name();
|
||||
public static final String NAME = "geo_bounds";
|
||||
private static final Type TYPE = new Type(NAME);
|
||||
public static final ParseField AGGREGATION_NAME_FIED = new ParseField(NAME);
|
||||
|
||||
private boolean wrapLongitude = true;
|
||||
|
||||
public GeoBoundsAggregationBuilder(String name) {
|
||||
super(name, InternalGeoBounds.TYPE, ValuesSourceType.GEOPOINT, ValueType.GEOPOINT);
|
||||
super(name, TYPE, ValuesSourceType.GEOPOINT, ValueType.GEOPOINT);
|
||||
}
|
||||
|
||||
/**
|
||||
* Read from a stream.
|
||||
*/
|
||||
public GeoBoundsAggregationBuilder(StreamInput in) throws IOException {
|
||||
super(in, InternalGeoBounds.TYPE, ValuesSourceType.GEOPOINT, ValueType.GEOPOINT);
|
||||
super(in, TYPE, ValuesSourceType.GEOPOINT, ValueType.GEOPOINT);
|
||||
wrapLongitude = in.readBoolean();
|
||||
}
|
||||
|
||||
|
@ -23,7 +23,6 @@ import org.elasticsearch.common.geo.GeoPoint;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.search.aggregations.AggregationStreams;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||
import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation;
|
||||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
||||
@ -33,28 +32,14 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class InternalGeoBounds extends InternalMetricsAggregation implements GeoBounds {
|
||||
private final double top;
|
||||
private final double bottom;
|
||||
private final double posLeft;
|
||||
private final double posRight;
|
||||
private final double negLeft;
|
||||
private final double negRight;
|
||||
private final boolean wrapLongitude;
|
||||
|
||||
public static final Type TYPE = new Type("geo_bounds");
|
||||
public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
|
||||
@Override
|
||||
public InternalGeoBounds readResult(StreamInput in) throws IOException {
|
||||
InternalGeoBounds result = new InternalGeoBounds();
|
||||
result.readFrom(in);
|
||||
return result;
|
||||
}
|
||||
};
|
||||
|
||||
private double top;
|
||||
private double bottom;
|
||||
private double posLeft;
|
||||
private double posRight;
|
||||
private double negLeft;
|
||||
private double negRight;
|
||||
private boolean wrapLongitude;
|
||||
|
||||
InternalGeoBounds() {
|
||||
}
|
||||
|
||||
InternalGeoBounds(String name, double top, double bottom, double posLeft, double posRight,
|
||||
double negLeft, double negRight, boolean wrapLongitude,
|
||||
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
|
||||
@ -68,9 +53,34 @@ public class InternalGeoBounds extends InternalMetricsAggregation implements Geo
|
||||
this.wrapLongitude = wrapLongitude;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read from a stream.
|
||||
*/
|
||||
public InternalGeoBounds(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
top = in.readDouble();
|
||||
bottom = in.readDouble();
|
||||
posLeft = in.readDouble();
|
||||
posRight = in.readDouble();
|
||||
negLeft = in.readDouble();
|
||||
negRight = in.readDouble();
|
||||
wrapLongitude = in.readBoolean();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Type type() {
|
||||
return TYPE;
|
||||
protected void doWriteTo(StreamOutput out) throws IOException {
|
||||
out.writeDouble(top);
|
||||
out.writeDouble(bottom);
|
||||
out.writeDouble(posLeft);
|
||||
out.writeDouble(posRight);
|
||||
out.writeDouble(negLeft);
|
||||
out.writeDouble(negRight);
|
||||
out.writeBoolean(wrapLongitude);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return GeoBoundsAggregationBuilder.NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -104,7 +114,8 @@ public class InternalGeoBounds extends InternalMetricsAggregation implements Geo
|
||||
negRight = bounds.negRight;
|
||||
}
|
||||
}
|
||||
return new InternalGeoBounds(name, top, bottom, posLeft, posRight, negLeft, negRight, wrapLongitude, pipelineAggregators(), getMetaData());
|
||||
return new InternalGeoBounds(name, top, bottom, posLeft, posRight, negLeft, negRight, wrapLongitude, pipelineAggregators(),
|
||||
getMetaData());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -173,32 +184,6 @@ public class InternalGeoBounds extends InternalMetricsAggregation implements Geo
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doReadFrom(StreamInput in) throws IOException {
|
||||
top = in.readDouble();
|
||||
bottom = in.readDouble();
|
||||
posLeft = in.readDouble();
|
||||
posRight = in.readDouble();
|
||||
negLeft = in.readDouble();
|
||||
negRight = in.readDouble();
|
||||
wrapLongitude = in.readBoolean();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doWriteTo(StreamOutput out) throws IOException {
|
||||
out.writeDouble(top);
|
||||
out.writeDouble(bottom);
|
||||
out.writeDouble(posLeft);
|
||||
out.writeDouble(posRight);
|
||||
out.writeDouble(negLeft);
|
||||
out.writeDouble(negRight);
|
||||
out.writeBoolean(wrapLongitude);
|
||||
}
|
||||
|
||||
public static void registerStream() {
|
||||
AggregationStreams.registerStream(STREAM, TYPE.stream());
|
||||
}
|
||||
|
||||
private static class BoundingBox {
|
||||
private final GeoPoint topLeft;
|
||||
private final GeoPoint bottomRight;
|
||||
|
@ -44,9 +44,6 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@ESIntegTestCase.SuiteScopeTestCase
|
||||
public class GeoBoundsIT extends AbstractGeoTestCase {
|
||||
private static final String aggName = "geoBounds";
|
||||
|
Loading…
x
Reference in New Issue
Block a user