Migrate range, date_range, and geo_distance aggregations to NamedWriteable

This commit is contained in:
Nik Everett 2016-07-07 21:33:00 -04:00
parent 4b171b84cb
commit 89614586e9
9 changed files with 73 additions and 114 deletions

View File

@ -575,10 +575,7 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]nested[/\\]InternalReverseNested.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]nested[/\\]NestedAggregator.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]nested[/\\]ReverseNestedAggregator.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]range[/\\]InternalRange.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]range[/\\]RangeAggregator.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]range[/\\]date[/\\]InternalDateRange.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]range[/\\]ipv4[/\\]InternalIPv4Range.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]sampler[/\\]DiversifiedBytesHashSamplerAggregator.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]sampler[/\\]DiversifiedMapSamplerAggregator.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]sampler[/\\]DiversifiedNumericSamplerAggregator.java" checks="LineLength" />

View File

@ -572,14 +572,17 @@ public class SearchModule extends AbstractModule {
registerAggregation(SignificantTermsAggregationBuilder::new,
new SignificantTermsParser(significanceHeuristicParserRegistry, queryParserRegistry),
SignificantTermsAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(RangeAggregationBuilder::new, new RangeParser(), RangeAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(DateRangeAggregationBuilder::new, new DateRangeParser(), DateRangeAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(
new AggregationSpec(RangeAggregationBuilder::new, new RangeParser(), RangeAggregationBuilder.AGGREGATION_NAME_FIELD)
.addResultReader(InternalRange::new));
registerAggregation(new AggregationSpec(DateRangeAggregationBuilder::new, new DateRangeParser(),
DateRangeAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalDateRange::new));
registerAggregation(IpRangeAggregationBuilder::new, new IpRangeParser(), IpRangeAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(HistogramAggregationBuilder::new, new HistogramParser(), HistogramAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(DateHistogramAggregationBuilder::new, new DateHistogramParser(),
DateHistogramAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(GeoDistanceAggregationBuilder::new, new GeoDistanceParser(),
GeoDistanceAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(new AggregationSpec(GeoDistanceAggregationBuilder::new, new GeoDistanceParser(),
GeoDistanceAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalGeoDistance::new));
registerAggregation(GeoGridAggregationBuilder::new, new GeoHashGridParser(), GeoGridAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(NestedAggregationBuilder::new, NestedAggregationBuilder::parse,
NestedAggregationBuilder.AGGREGATION_FIELD_NAME);
@ -779,11 +782,8 @@ public class SearchModule extends AbstractModule {
SignificantLongTerms.registerStreams();
UnmappedSignificantTerms.registerStreams();
InternalGeoHashGrid.registerStreams();
InternalRange.registerStream();
InternalDateRange.registerStream();
InternalBinaryRange.registerStream();
InternalHistogram.registerStream();
InternalGeoDistance.registerStream();
InternalNested.registerStream();
InternalReverseNested.registerStream();
InternalTopHits.registerStreams();

View File

@ -22,7 +22,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
@ -41,24 +40,8 @@ import java.util.Map;
*/
public class InternalRange<B extends InternalRange.Bucket, R extends InternalRange<B, R>> extends InternalMultiBucketAggregation<R, B>
implements Range {
static final Factory FACTORY = new Factory();
public static final Type TYPE = new Type("range");
private static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@Override
public InternalRange readResult(StreamInput in) throws IOException {
InternalRange ranges = new InternalRange();
ranges.readFrom(in);
return ranges;
}
};
public static void registerStream() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
public static class Bucket extends InternalMultiBucketAggregation.InternalBucket implements Range.Bucket {
protected final transient boolean keyed;
@ -201,9 +184,8 @@ public class InternalRange<B extends InternalRange.Bucket, R extends InternalRan
}
public static class Factory<B extends Bucket, R extends InternalRange<B, R>> {
public Type type() {
return TYPE;
return RangeAggregationBuilder.TYPE;
}
public ValuesSourceType getValueSourceType() {
@ -243,8 +225,6 @@ public class InternalRange<B extends InternalRange.Bucket, R extends InternalRan
protected DocValueFormat format;
protected boolean keyed;
public InternalRange() {} // for serialization
public InternalRange(String name, List<B> ranges, DocValueFormat format, boolean keyed,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) {
@ -254,9 +234,40 @@ public class InternalRange<B extends InternalRange.Bucket, R extends InternalRan
this.keyed = keyed;
}
/**
* Read from a stream.
*/
public InternalRange(StreamInput in) throws IOException {
super(in);
format = in.readNamedWriteable(DocValueFormat.class);
keyed = in.readBoolean();
int size = in.readVInt();
List<B> ranges = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
String key = in.readOptionalString();
ranges.add(getFactory().createBucket(key, in.readDouble(), in.readDouble(), in.readVLong(),
InternalAggregations.readAggregations(in), keyed, format));
}
this.ranges = ranges;
}
@Override
public Type type() {
return TYPE;
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(format);
out.writeBoolean(keyed);
out.writeVInt(ranges.size());
for (B bucket : ranges) {
out.writeOptionalString(((Bucket) bucket).key);
out.writeDouble(((Bucket) bucket).from);
out.writeDouble(((Bucket) bucket).to);
out.writeVLong(((Bucket) bucket).docCount);
bucket.aggregations.writeTo(out);
}
}
@Override
public String getWriteableName() {
return RangeAggregationBuilder.NAME;
}
@Override
@ -301,33 +312,6 @@ public class InternalRange<B extends InternalRange.Bucket, R extends InternalRan
return getFactory().create(name, ranges, format, keyed, pipelineAggregators(), getMetaData());
}
@Override
protected void doReadFrom(StreamInput in) throws IOException {
format = in.readNamedWriteable(DocValueFormat.class);
keyed = in.readBoolean();
int size = in.readVInt();
List<B> ranges = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
String key = in.readOptionalString();
ranges.add(getFactory().createBucket(key, in.readDouble(), in.readDouble(), in.readVLong(), InternalAggregations.readAggregations(in), keyed, format));
}
this.ranges = ranges;
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(format);
out.writeBoolean(keyed);
out.writeVInt(ranges.size());
for (B bucket : ranges) {
out.writeOptionalString(((Bucket) bucket).key);
out.writeDouble(((Bucket) bucket).from);
out.writeDouble(((Bucket) bucket).to);
out.writeVLong(((Bucket) bucket).docCount);
bucket.aggregations.writeTo(out);
}
}
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
if (keyed) {

View File

@ -23,6 +23,7 @@ import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.bucket.range.RangeAggregator.Range;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
@ -31,7 +32,8 @@ import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import java.io.IOException;
public class RangeAggregationBuilder extends AbstractRangeBuilder<RangeAggregationBuilder, Range> {
public static final String NAME = InternalRange.TYPE.name();
public static final String NAME = "range";
static final Type TYPE = new Type(NAME);
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
public RangeAggregationBuilder(String name) {

View File

@ -23,6 +23,7 @@ import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.bucket.range.AbstractRangeBuilder;
import org.elasticsearch.search.aggregations.bucket.range.RangeAggregator;
import org.elasticsearch.search.aggregations.bucket.range.RangeAggregator.Range;
@ -34,7 +35,8 @@ import org.joda.time.DateTime;
import java.io.IOException;
public class DateRangeAggregationBuilder extends AbstractRangeBuilder<DateRangeAggregationBuilder, RangeAggregator.Range> {
public static final String NAME = InternalDateRange.TYPE.name();
public static final String NAME = "date_range";
static final Type TYPE = new Type(NAME);
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
public DateRangeAggregationBuilder(String name) {

View File

@ -20,7 +20,6 @@ package org.elasticsearch.search.aggregations.bucket.range.date;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.range.InternalRange;
@ -37,22 +36,6 @@ import java.util.Map;
*
*/
public class InternalDateRange extends InternalRange<InternalDateRange.Bucket, InternalDateRange> {
public static final Type TYPE = new Type("date_range", "drange");
private static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@Override
public InternalDateRange readResult(StreamInput in) throws IOException {
InternalDateRange ranges = new InternalDateRange();
ranges.readFrom(in);
return ranges;
}
};
public static void registerStream() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
public static final Factory FACTORY = new Factory();
public static class Bucket extends InternalRange.Bucket {
@ -61,11 +44,13 @@ public class InternalDateRange extends InternalRange<InternalDateRange.Bucket, I
super(keyed, formatter);
}
public Bucket(String key, double from, double to, long docCount, List<InternalAggregation> aggregations, boolean keyed, DocValueFormat formatter) {
public Bucket(String key, double from, double to, long docCount, List<InternalAggregation> aggregations, boolean keyed,
DocValueFormat formatter) {
super(key, from, to, docCount, new InternalAggregations(aggregations), keyed, formatter);
}
public Bucket(String key, double from, double to, long docCount, InternalAggregations aggregations, boolean keyed, DocValueFormat formatter) {
public Bucket(String key, double from, double to, long docCount, InternalAggregations aggregations, boolean keyed,
DocValueFormat formatter) {
super(key, from, to, docCount, aggregations, keyed, formatter);
}
@ -94,10 +79,9 @@ public class InternalDateRange extends InternalRange<InternalDateRange.Bucket, I
}
public static class Factory extends InternalRange.Factory<InternalDateRange.Bucket, InternalDateRange> {
@Override
public Type type() {
return TYPE;
return DateRangeAggregationBuilder.TYPE;
}
@Override
@ -118,7 +102,8 @@ public class InternalDateRange extends InternalRange<InternalDateRange.Bucket, I
}
@Override
public Bucket createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, boolean keyed, DocValueFormat formatter) {
public Bucket createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, boolean keyed,
DocValueFormat formatter) {
return new Bucket(key, from, to, docCount, aggregations, keyed, formatter);
}
@ -129,16 +114,21 @@ public class InternalDateRange extends InternalRange<InternalDateRange.Bucket, I
}
}
InternalDateRange() {} // for serialization
InternalDateRange(String name, List<InternalDateRange.Bucket> ranges, DocValueFormat formatter, boolean keyed,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, ranges, formatter, keyed, pipelineAggregators, metaData);
}
/**
* Read from a stream.
*/
public InternalDateRange(StreamInput in) throws IOException {
super(in);
}
@Override
public Type type() {
return TYPE;
public String getWriteableName() {
return DateRangeAggregationBuilder.NAME;
}
@Override

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.bucket.range.InternalRange;
import org.elasticsearch.search.aggregations.bucket.range.RangeAggregator;
@ -43,7 +44,8 @@ import java.util.List;
import java.util.Objects;
public class GeoDistanceAggregationBuilder extends ValuesSourceAggregationBuilder<ValuesSource.GeoPoint, GeoDistanceAggregationBuilder> {
public static final String NAME = InternalGeoDistance.TYPE.name();
public static final String NAME = "geo_distance";
public static final Type TYPE = new Type(NAME);
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
private final GeoPoint origin;

View File

@ -47,7 +47,7 @@ public class GeoDistanceParser extends GeoPointValuesSourceParser {
static final ParseField UNIT_FIELD = new ParseField("unit");
static final ParseField DISTANCE_TYPE_FIELD = new ParseField("distance_type");
private GeoPointParser geoPointParser = new GeoPointParser(InternalGeoDistance.TYPE, ORIGIN_FIELD);
private GeoPointParser geoPointParser = new GeoPointParser(GeoDistanceAggregationBuilder.TYPE, ORIGIN_FIELD);
public GeoDistanceParser() {
super(true, false);

View File

@ -20,7 +20,6 @@ package org.elasticsearch.search.aggregations.bucket.range.geodistance;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.range.InternalRange;
@ -36,22 +35,6 @@ import java.util.Map;
*
*/
public class InternalGeoDistance extends InternalRange<InternalGeoDistance.Bucket, InternalGeoDistance> {
public static final Type TYPE = new Type("geo_distance", "gdist");
public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@Override
public InternalGeoDistance readResult(StreamInput in) throws IOException {
InternalGeoDistance geoDistance = new InternalGeoDistance();
geoDistance.readFrom(in);
return geoDistance;
}
};
public static void registerStream() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
public static final Factory FACTORY = new Factory();
static class Bucket extends InternalRange.Bucket {
@ -79,10 +62,9 @@ public class InternalGeoDistance extends InternalRange<InternalGeoDistance.Bucke
}
public static class Factory extends InternalRange.Factory<InternalGeoDistance.Bucket, InternalGeoDistance> {
@Override
public Type type() {
return TYPE;
return GeoDistanceAggregationBuilder.TYPE;
}
@Override
@ -120,17 +102,17 @@ public class InternalGeoDistance extends InternalRange<InternalGeoDistance.Bucke
}
}
InternalGeoDistance() {} // for serialization
public InternalGeoDistance(String name, List<Bucket> ranges, boolean keyed,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) {
super(name, ranges, DocValueFormat.RAW, keyed, pipelineAggregators, metaData);
}
@Override
public Type type() {
return TYPE;
/**
* Read from a stream.
*/
public InternalGeoDistance(StreamInput in) throws IOException {
super(in);
}
@Override