Migrate remaining calc aggs to NamedWriteable
Once all of these are migrated we'll be able to remove aggregation's custom "streams" which function that same as NamedWriteable. It also allows us to make most of the fields on aggregations final which is rather nice.
This commit is contained in:
parent
3343ceeae4
commit
c02de9227c
|
@ -626,7 +626,6 @@
|
||||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]percentiles[/\\]tdigest[/\\]AbstractTDigestPercentilesAggregator.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]percentiles[/\\]tdigest[/\\]AbstractTDigestPercentilesAggregator.java" checks="LineLength" />
|
||||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]percentiles[/\\]tdigest[/\\]TDigestPercentileRanksAggregator.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]percentiles[/\\]tdigest[/\\]TDigestPercentileRanksAggregator.java" checks="LineLength" />
|
||||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]percentiles[/\\]tdigest[/\\]TDigestPercentilesAggregator.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]percentiles[/\\]tdigest[/\\]TDigestPercentilesAggregator.java" checks="LineLength" />
|
||||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]scripted[/\\]InternalScriptedMetric.java" checks="LineLength" />
|
|
||||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]scripted[/\\]ScriptedMetricAggregator.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]scripted[/\\]ScriptedMetricAggregator.java" checks="LineLength" />
|
||||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]stats[/\\]extended[/\\]ExtendedStatsAggregator.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]stats[/\\]extended[/\\]ExtendedStatsAggregator.java" checks="LineLength" />
|
||||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]tophits[/\\]TopHitsAggregator.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]tophits[/\\]TopHitsAggregator.java" checks="LineLength" />
|
||||||
|
|
|
@ -429,7 +429,7 @@ public class SearchModule extends AbstractModule {
|
||||||
namedWriteableRegistry.register(AggregationBuilder.class, spec.aggregationName.getPreferredName(), spec.builderReader);
|
namedWriteableRegistry.register(AggregationBuilder.class, spec.aggregationName.getPreferredName(), spec.builderReader);
|
||||||
aggregationParserRegistry.register(spec.aggregationParser, spec.aggregationName);
|
aggregationParserRegistry.register(spec.aggregationParser, spec.aggregationName);
|
||||||
}
|
}
|
||||||
for (Map.Entry<String, Writeable.Reader<? extends InternalAggregation>> t : spec.internalReaders.entrySet()) {
|
for (Map.Entry<String, Writeable.Reader<? extends InternalAggregation>> t : spec.resultReaders.entrySet()) {
|
||||||
String writeableName = t.getKey();
|
String writeableName = t.getKey();
|
||||||
Writeable.Reader<? extends InternalAggregation> internalReader = t.getValue();
|
Writeable.Reader<? extends InternalAggregation> internalReader = t.getValue();
|
||||||
namedWriteableRegistry.register(InternalAggregation.class, writeableName, internalReader);
|
namedWriteableRegistry.register(InternalAggregation.class, writeableName, internalReader);
|
||||||
|
@ -437,7 +437,7 @@ public class SearchModule extends AbstractModule {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class AggregationSpec {
|
public static class AggregationSpec {
|
||||||
private final Map<String, Writeable.Reader<? extends InternalAggregation>> internalReaders = new TreeMap<>();
|
private final Map<String, Writeable.Reader<? extends InternalAggregation>> resultReaders = new TreeMap<>();
|
||||||
private final Writeable.Reader<? extends AggregationBuilder> builderReader;
|
private final Writeable.Reader<? extends AggregationBuilder> builderReader;
|
||||||
private final Aggregator.Parser aggregationParser;
|
private final Aggregator.Parser aggregationParser;
|
||||||
private final ParseField aggregationName;
|
private final ParseField aggregationName;
|
||||||
|
@ -468,7 +468,7 @@ public class SearchModule extends AbstractModule {
|
||||||
* Add a reader for the shard level results of the aggregation.
|
* Add a reader for the shard level results of the aggregation.
|
||||||
*/
|
*/
|
||||||
public AggregationSpec addResultReader(String writeableName, Writeable.Reader<? extends InternalAggregation> resultReader) {
|
public AggregationSpec addResultReader(String writeableName, Writeable.Reader<? extends InternalAggregation> resultReader) {
|
||||||
internalReaders.put(writeableName, resultReader);
|
resultReaders.put(writeableName, resultReader);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -547,8 +547,8 @@ public class SearchModule extends AbstractModule {
|
||||||
PercentileRanksAggregationBuilder.AGGREGATION_NAME_FIELD)
|
PercentileRanksAggregationBuilder.AGGREGATION_NAME_FIELD)
|
||||||
.addResultReader(InternalTDigestPercentileRanks.NAME, InternalTDigestPercentileRanks::new)
|
.addResultReader(InternalTDigestPercentileRanks.NAME, InternalTDigestPercentileRanks::new)
|
||||||
.addResultReader(InternalHDRPercentileRanks.NAME, InternalHDRPercentileRanks::new));
|
.addResultReader(InternalHDRPercentileRanks.NAME, InternalHDRPercentileRanks::new));
|
||||||
registerAggregation(CardinalityAggregationBuilder::new, new CardinalityParser(),
|
registerAggregation(new AggregationSpec(CardinalityAggregationBuilder::new, new CardinalityParser(),
|
||||||
CardinalityAggregationBuilder.AGGREGATION_NAME_FIELD);
|
CardinalityAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalCardinality::new));
|
||||||
registerAggregation(GlobalAggregationBuilder::new, GlobalAggregationBuilder::parse,
|
registerAggregation(GlobalAggregationBuilder::new, GlobalAggregationBuilder::parse,
|
||||||
GlobalAggregationBuilder.AGGREGATION_NAME_FIELD);
|
GlobalAggregationBuilder.AGGREGATION_NAME_FIELD);
|
||||||
registerAggregation(MissingAggregationBuilder::new, new MissingParser(), MissingAggregationBuilder.AGGREGATION_NAME_FIELD);
|
registerAggregation(MissingAggregationBuilder::new, new MissingParser(), MissingAggregationBuilder.AGGREGATION_NAME_FIELD);
|
||||||
|
@ -580,10 +580,10 @@ public class SearchModule extends AbstractModule {
|
||||||
registerAggregation(TopHitsAggregationBuilder::new, TopHitsAggregationBuilder::parse,
|
registerAggregation(TopHitsAggregationBuilder::new, TopHitsAggregationBuilder::parse,
|
||||||
TopHitsAggregationBuilder.AGGREGATION_NAME_FIELD);
|
TopHitsAggregationBuilder.AGGREGATION_NAME_FIELD);
|
||||||
registerAggregation(GeoBoundsAggregationBuilder::new, new GeoBoundsParser(), GeoBoundsAggregationBuilder.AGGREGATION_NAME_FIED);
|
registerAggregation(GeoBoundsAggregationBuilder::new, new GeoBoundsParser(), GeoBoundsAggregationBuilder.AGGREGATION_NAME_FIED);
|
||||||
registerAggregation(GeoCentroidAggregationBuilder::new, new GeoCentroidParser(),
|
registerAggregation(new AggregationSpec(GeoCentroidAggregationBuilder::new, new GeoCentroidParser(),
|
||||||
GeoCentroidAggregationBuilder.AGGREGATION_NAME_FIELD);
|
GeoCentroidAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalGeoCentroid::new));
|
||||||
registerAggregation(ScriptedMetricAggregationBuilder::new, ScriptedMetricAggregationBuilder::parse,
|
registerAggregation(new AggregationSpec(ScriptedMetricAggregationBuilder::new, ScriptedMetricAggregationBuilder::parse,
|
||||||
ScriptedMetricAggregationBuilder.AGGREGATION_NAME_FIELD);
|
ScriptedMetricAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalScriptedMetric::new));
|
||||||
registerAggregation(ChildrenAggregationBuilder::new, ChildrenAggregationBuilder::parse,
|
registerAggregation(ChildrenAggregationBuilder::new, ChildrenAggregationBuilder::parse,
|
||||||
ChildrenAggregationBuilder.AGGREGATION_NAME_FIELD);
|
ChildrenAggregationBuilder.AGGREGATION_NAME_FIELD);
|
||||||
|
|
||||||
|
@ -766,11 +766,6 @@ public class SearchModule extends AbstractModule {
|
||||||
}
|
}
|
||||||
|
|
||||||
static {
|
static {
|
||||||
// calcs
|
|
||||||
InternalCardinality.registerStreams();
|
|
||||||
InternalScriptedMetric.registerStreams();
|
|
||||||
InternalGeoCentroid.registerStreams();
|
|
||||||
|
|
||||||
// buckets
|
// buckets
|
||||||
InternalGlobal.registerStreams();
|
InternalGlobal.registerStreams();
|
||||||
InternalFilter.registerStreams();
|
InternalFilter.registerStreams();
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
|
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
|
||||||
|
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
|
||||||
import org.elasticsearch.search.aggregations.AggregatorFactory;
|
import org.elasticsearch.search.aggregations.AggregatorFactory;
|
||||||
import org.elasticsearch.search.aggregations.support.AggregationContext;
|
import org.elasticsearch.search.aggregations.support.AggregationContext;
|
||||||
import org.elasticsearch.search.aggregations.support.ValueType;
|
import org.elasticsearch.search.aggregations.support.ValueType;
|
||||||
|
@ -38,7 +39,8 @@ import java.util.Objects;
|
||||||
public final class CardinalityAggregationBuilder
|
public final class CardinalityAggregationBuilder
|
||||||
extends ValuesSourceAggregationBuilder.LeafOnly<ValuesSource, CardinalityAggregationBuilder> {
|
extends ValuesSourceAggregationBuilder.LeafOnly<ValuesSource, CardinalityAggregationBuilder> {
|
||||||
|
|
||||||
public static final String NAME = InternalCardinality.TYPE.name();
|
public static final String NAME = "cardinality";
|
||||||
|
private static final Type TYPE = new Type(NAME);
|
||||||
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
|
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
|
||||||
|
|
||||||
public static final ParseField PRECISION_THRESHOLD_FIELD = new ParseField("precision_threshold");
|
public static final ParseField PRECISION_THRESHOLD_FIELD = new ParseField("precision_threshold");
|
||||||
|
@ -46,14 +48,14 @@ public final class CardinalityAggregationBuilder
|
||||||
private Long precisionThreshold = null;
|
private Long precisionThreshold = null;
|
||||||
|
|
||||||
public CardinalityAggregationBuilder(String name, ValueType targetValueType) {
|
public CardinalityAggregationBuilder(String name, ValueType targetValueType) {
|
||||||
super(name, InternalCardinality.TYPE, ValuesSourceType.ANY, targetValueType);
|
super(name, TYPE, ValuesSourceType.ANY, targetValueType);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Read from a stream.
|
* Read from a stream.
|
||||||
*/
|
*/
|
||||||
public CardinalityAggregationBuilder(StreamInput in) throws IOException {
|
public CardinalityAggregationBuilder(StreamInput in) throws IOException {
|
||||||
super(in, InternalCardinality.TYPE, ValuesSourceType.ANY);
|
super(in, TYPE, ValuesSourceType.ANY);
|
||||||
if (in.readBoolean()) {
|
if (in.readBoolean()) {
|
||||||
precisionThreshold = in.readLong();
|
precisionThreshold = in.readLong();
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,23 +34,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
public final class InternalCardinality extends InternalNumericMetricsAggregation.SingleValue implements Cardinality {
|
public final class InternalCardinality extends InternalNumericMetricsAggregation.SingleValue implements Cardinality {
|
||||||
|
private final HyperLogLogPlusPlus counts;
|
||||||
public static final Type TYPE = new Type("cardinality");
|
|
||||||
|
|
||||||
public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
|
|
||||||
@Override
|
|
||||||
public InternalCardinality readResult(StreamInput in) throws IOException {
|
|
||||||
InternalCardinality result = new InternalCardinality();
|
|
||||||
result.readFrom(in);
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
public static void registerStreams() {
|
|
||||||
AggregationStreams.registerStream(STREAM, TYPE.stream());
|
|
||||||
}
|
|
||||||
|
|
||||||
private HyperLogLogPlusPlus counts;
|
|
||||||
|
|
||||||
InternalCardinality(String name, HyperLogLogPlusPlus counts, List<PipelineAggregator> pipelineAggregators,
|
InternalCardinality(String name, HyperLogLogPlusPlus counts, List<PipelineAggregator> pipelineAggregators,
|
||||||
Map<String, Object> metaData) {
|
Map<String, Object> metaData) {
|
||||||
|
@ -58,26 +42,11 @@ public final class InternalCardinality extends InternalNumericMetricsAggregation
|
||||||
this.counts = counts;
|
this.counts = counts;
|
||||||
}
|
}
|
||||||
|
|
||||||
private InternalCardinality() {
|
/**
|
||||||
}
|
* Read from a stream.
|
||||||
|
*/
|
||||||
@Override
|
public InternalCardinality(StreamInput in) throws IOException {
|
||||||
public double value() {
|
super(in);
|
||||||
return getValue();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getValue() {
|
|
||||||
return counts == null ? 0 : counts.cardinality(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Type type() {
|
|
||||||
return TYPE;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void doReadFrom(StreamInput in) throws IOException {
|
|
||||||
format = in.readNamedWriteable(DocValueFormat.class);
|
format = in.readNamedWriteable(DocValueFormat.class);
|
||||||
if (in.readBoolean()) {
|
if (in.readBoolean()) {
|
||||||
counts = HyperLogLogPlusPlus.readFrom(in, BigArrays.NON_RECYCLING_INSTANCE);
|
counts = HyperLogLogPlusPlus.readFrom(in, BigArrays.NON_RECYCLING_INSTANCE);
|
||||||
|
@ -97,6 +66,21 @@ public final class InternalCardinality extends InternalNumericMetricsAggregation
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getWriteableName() {
|
||||||
|
return CardinalityAggregationBuilder.NAME;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public double value() {
|
||||||
|
return getValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getValue() {
|
||||||
|
return counts == null ? 0 : counts.cardinality(0);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
|
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
|
||||||
InternalCardinality reduced = null;
|
InternalCardinality reduced = null;
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
|
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
|
||||||
|
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
|
||||||
import org.elasticsearch.search.aggregations.AggregatorFactory;
|
import org.elasticsearch.search.aggregations.AggregatorFactory;
|
||||||
import org.elasticsearch.search.aggregations.support.AggregationContext;
|
import org.elasticsearch.search.aggregations.support.AggregationContext;
|
||||||
import org.elasticsearch.search.aggregations.support.ValueType;
|
import org.elasticsearch.search.aggregations.support.ValueType;
|
||||||
|
@ -36,18 +37,19 @@ import java.io.IOException;
|
||||||
|
|
||||||
public class GeoCentroidAggregationBuilder
|
public class GeoCentroidAggregationBuilder
|
||||||
extends ValuesSourceAggregationBuilder.LeafOnly<ValuesSource.GeoPoint, GeoCentroidAggregationBuilder> {
|
extends ValuesSourceAggregationBuilder.LeafOnly<ValuesSource.GeoPoint, GeoCentroidAggregationBuilder> {
|
||||||
public static final String NAME = InternalGeoCentroid.TYPE.name();
|
public static final String NAME = "geo_centroid";
|
||||||
|
public static final Type TYPE = new Type(NAME);
|
||||||
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
|
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
|
||||||
|
|
||||||
public GeoCentroidAggregationBuilder(String name) {
|
public GeoCentroidAggregationBuilder(String name) {
|
||||||
super(name, InternalGeoCentroid.TYPE, ValuesSourceType.GEOPOINT, ValueType.GEOPOINT);
|
super(name, TYPE, ValuesSourceType.GEOPOINT, ValueType.GEOPOINT);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Read from a stream.
|
* Read from a stream.
|
||||||
*/
|
*/
|
||||||
public GeoCentroidAggregationBuilder(StreamInput in) throws IOException {
|
public GeoCentroidAggregationBuilder(StreamInput in) throws IOException {
|
||||||
super(in, InternalGeoCentroid.TYPE, ValuesSourceType.GEOPOINT, ValueType.GEOPOINT);
|
super(in, TYPE, ValuesSourceType.GEOPOINT, ValueType.GEOPOINT);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -24,7 +24,6 @@ import org.elasticsearch.common.geo.GeoPoint;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.search.aggregations.AggregationStreams;
|
|
||||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||||
import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation;
|
import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation;
|
||||||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
||||||
|
@ -34,29 +33,11 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Serialization and merge logic for {@link org.elasticsearch.search.aggregations.metrics.geocentroid.GeoCentroidAggregator}
|
* Serialization and merge logic for {@link GeoCentroidAggregator}.
|
||||||
*/
|
*/
|
||||||
public class InternalGeoCentroid extends InternalMetricsAggregation implements GeoCentroid {
|
public class InternalGeoCentroid extends InternalMetricsAggregation implements GeoCentroid {
|
||||||
|
protected final GeoPoint centroid;
|
||||||
public static final Type TYPE = new Type("geo_centroid");
|
protected final long count;
|
||||||
public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
|
|
||||||
@Override
|
|
||||||
public InternalGeoCentroid readResult(StreamInput in) throws IOException {
|
|
||||||
InternalGeoCentroid result = new InternalGeoCentroid();
|
|
||||||
result.readFrom(in);
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
public static void registerStreams() {
|
|
||||||
AggregationStreams.registerStream(STREAM, TYPE.stream());
|
|
||||||
}
|
|
||||||
|
|
||||||
protected GeoPoint centroid;
|
|
||||||
protected long count;
|
|
||||||
|
|
||||||
protected InternalGeoCentroid() {
|
|
||||||
}
|
|
||||||
|
|
||||||
public InternalGeoCentroid(String name, GeoPoint centroid, long count, List<PipelineAggregator>
|
public InternalGeoCentroid(String name, GeoPoint centroid, long count, List<PipelineAggregator>
|
||||||
pipelineAggregators, Map<String, Object> metaData) {
|
pipelineAggregators, Map<String, Object> metaData) {
|
||||||
|
@ -67,6 +48,37 @@ public class InternalGeoCentroid extends InternalMetricsAggregation implements G
|
||||||
this.count = count;
|
this.count = count;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read from a stream.
|
||||||
|
*/
|
||||||
|
public InternalGeoCentroid(StreamInput in) throws IOException {
|
||||||
|
super(in);
|
||||||
|
count = in.readVLong();
|
||||||
|
if (in.readBoolean()) {
|
||||||
|
final long hash = in.readLong();
|
||||||
|
centroid = new GeoPoint(GeoPointField.decodeLatitude(hash), GeoPointField.decodeLongitude(hash));
|
||||||
|
} else {
|
||||||
|
centroid = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doWriteTo(StreamOutput out) throws IOException {
|
||||||
|
out.writeVLong(count);
|
||||||
|
if (centroid != null) {
|
||||||
|
out.writeBoolean(true);
|
||||||
|
// should we just write lat and lon separately?
|
||||||
|
out.writeLong(GeoPointField.encodeLatLon(centroid.lat(), centroid.lon()));
|
||||||
|
} else {
|
||||||
|
out.writeBoolean(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getWriteableName() {
|
||||||
|
return GeoCentroidAggregationBuilder.NAME;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public GeoPoint centroid() {
|
public GeoPoint centroid() {
|
||||||
return centroid;
|
return centroid;
|
||||||
|
@ -77,11 +89,6 @@ public class InternalGeoCentroid extends InternalMetricsAggregation implements G
|
||||||
return count;
|
return count;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public Type type() {
|
|
||||||
return TYPE;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public InternalGeoCentroid doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
|
public InternalGeoCentroid doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
|
||||||
double lonSum = Double.NaN;
|
double lonSum = Double.NaN;
|
||||||
|
@ -125,29 +132,6 @@ public class InternalGeoCentroid extends InternalMetricsAggregation implements G
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void doReadFrom(StreamInput in) throws IOException {
|
|
||||||
count = in.readVLong();
|
|
||||||
if (in.readBoolean()) {
|
|
||||||
final long hash = in.readLong();
|
|
||||||
centroid = new GeoPoint(GeoPointField.decodeLatitude(hash), GeoPointField.decodeLongitude(hash));
|
|
||||||
} else {
|
|
||||||
centroid = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void doWriteTo(StreamOutput out) throws IOException {
|
|
||||||
out.writeVLong(count);
|
|
||||||
if (centroid != null) {
|
|
||||||
out.writeBoolean(true);
|
|
||||||
// should we just write lat and lon separately?
|
|
||||||
out.writeLong(GeoPointField.encodeLatLon(centroid.lat(), centroid.lon()));
|
|
||||||
} else {
|
|
||||||
out.writeBoolean(false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static class Fields {
|
static class Fields {
|
||||||
public static final String CENTROID = "location";
|
public static final String CENTROID = "location";
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,6 @@ import org.elasticsearch.script.CompiledScript;
|
||||||
import org.elasticsearch.script.ExecutableScript;
|
import org.elasticsearch.script.ExecutableScript;
|
||||||
import org.elasticsearch.script.Script;
|
import org.elasticsearch.script.Script;
|
||||||
import org.elasticsearch.script.ScriptContext;
|
import org.elasticsearch.script.ScriptContext;
|
||||||
import org.elasticsearch.search.aggregations.AggregationStreams;
|
|
||||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||||
import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation;
|
import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation;
|
||||||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
||||||
|
@ -39,38 +38,36 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
public class InternalScriptedMetric extends InternalMetricsAggregation implements ScriptedMetric {
|
public class InternalScriptedMetric extends InternalMetricsAggregation implements ScriptedMetric {
|
||||||
|
private final Script reduceScript;
|
||||||
|
private final Object aggregation;
|
||||||
|
|
||||||
public static final Type TYPE = new Type("scripted_metric");
|
public InternalScriptedMetric(String name, Object aggregation, Script reduceScript, List<PipelineAggregator> pipelineAggregators,
|
||||||
|
Map<String, Object> metaData) {
|
||||||
public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
|
|
||||||
@Override
|
|
||||||
public InternalScriptedMetric readResult(StreamInput in) throws IOException {
|
|
||||||
InternalScriptedMetric result = new InternalScriptedMetric();
|
|
||||||
result.readFrom(in);
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
public static void registerStreams() {
|
|
||||||
AggregationStreams.registerStream(STREAM, TYPE.stream());
|
|
||||||
}
|
|
||||||
|
|
||||||
private Script reduceScript;
|
|
||||||
private Object aggregation;
|
|
||||||
|
|
||||||
private InternalScriptedMetric() {
|
|
||||||
}
|
|
||||||
|
|
||||||
private InternalScriptedMetric(String name, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
|
|
||||||
super(name, pipelineAggregators, metaData);
|
super(name, pipelineAggregators, metaData);
|
||||||
}
|
|
||||||
|
|
||||||
public InternalScriptedMetric(String name, Object aggregation, Script reduceScript, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
|
|
||||||
this(name, pipelineAggregators, metaData);
|
|
||||||
this.aggregation = aggregation;
|
this.aggregation = aggregation;
|
||||||
this.reduceScript = reduceScript;
|
this.reduceScript = reduceScript;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read from a stream.
|
||||||
|
*/
|
||||||
|
public InternalScriptedMetric(StreamInput in) throws IOException {
|
||||||
|
super(in);
|
||||||
|
reduceScript = in.readOptionalWriteable(Script::new);
|
||||||
|
aggregation = in.readGenericValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doWriteTo(StreamOutput out) throws IOException {
|
||||||
|
out.writeOptionalWriteable(reduceScript);
|
||||||
|
out.writeGenericValue(aggregation);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getWriteableName() {
|
||||||
|
return ScriptedMetricAggregationBuilder.NAME;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Object aggregation() {
|
public Object aggregation() {
|
||||||
return aggregation;
|
return aggregation;
|
||||||
|
@ -98,15 +95,11 @@ public class InternalScriptedMetric extends InternalMetricsAggregation implement
|
||||||
} else {
|
} else {
|
||||||
aggregation = aggregationObjects;
|
aggregation = aggregationObjects;
|
||||||
}
|
}
|
||||||
return new InternalScriptedMetric(firstAggregation.getName(), aggregation, firstAggregation.reduceScript, pipelineAggregators(), getMetaData());
|
return new InternalScriptedMetric(firstAggregation.getName(), aggregation, firstAggregation.reduceScript, pipelineAggregators(),
|
||||||
|
getMetaData());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public Type type() {
|
|
||||||
return TYPE;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Object getProperty(List<String> path) {
|
public Object getProperty(List<String> path) {
|
||||||
if (path.isEmpty()) {
|
if (path.isEmpty()) {
|
||||||
|
@ -118,24 +111,6 @@ public class InternalScriptedMetric extends InternalMetricsAggregation implement
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void doReadFrom(StreamInput in) throws IOException {
|
|
||||||
if (in.readBoolean()) {
|
|
||||||
reduceScript = new Script(in);
|
|
||||||
}
|
|
||||||
aggregation = in.readGenericValue();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void doWriteTo(StreamOutput out) throws IOException {
|
|
||||||
boolean hasScript = reduceScript != null;
|
|
||||||
out.writeBoolean(hasScript);
|
|
||||||
if (hasScript) {
|
|
||||||
reduceScript.writeTo(out);
|
|
||||||
}
|
|
||||||
out.writeGenericValue(aggregation);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
|
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
|
||||||
return builder.field("value", aggregation);
|
return builder.field("value", aggregation);
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.elasticsearch.script.ScriptParameterParser;
|
||||||
import org.elasticsearch.script.ScriptParameterParser.ScriptParameterValue;
|
import org.elasticsearch.script.ScriptParameterParser.ScriptParameterValue;
|
||||||
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
|
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
|
||||||
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
|
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
|
||||||
|
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
|
||||||
import org.elasticsearch.search.aggregations.AggregatorFactory;
|
import org.elasticsearch.search.aggregations.AggregatorFactory;
|
||||||
import org.elasticsearch.search.aggregations.support.AggregationContext;
|
import org.elasticsearch.search.aggregations.support.AggregationContext;
|
||||||
|
|
||||||
|
@ -42,7 +43,8 @@ import java.util.Set;
|
||||||
|
|
||||||
public class ScriptedMetricAggregationBuilder extends AbstractAggregationBuilder<ScriptedMetricAggregationBuilder> {
|
public class ScriptedMetricAggregationBuilder extends AbstractAggregationBuilder<ScriptedMetricAggregationBuilder> {
|
||||||
|
|
||||||
public static final String NAME = InternalScriptedMetric.TYPE.name();
|
public static final String NAME = "scripted_metric";
|
||||||
|
private static final Type TYPE = new Type(NAME);
|
||||||
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
|
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
|
||||||
|
|
||||||
private static final ParseField INIT_SCRIPT_FIELD = new ParseField("init_script");
|
private static final ParseField INIT_SCRIPT_FIELD = new ParseField("init_script");
|
||||||
|
@ -59,14 +61,14 @@ public class ScriptedMetricAggregationBuilder extends AbstractAggregationBuilder
|
||||||
private Map<String, Object> params;
|
private Map<String, Object> params;
|
||||||
|
|
||||||
public ScriptedMetricAggregationBuilder(String name) {
|
public ScriptedMetricAggregationBuilder(String name) {
|
||||||
super(name, InternalScriptedMetric.TYPE);
|
super(name, TYPE);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Read from a stream.
|
* Read from a stream.
|
||||||
*/
|
*/
|
||||||
public ScriptedMetricAggregationBuilder(StreamInput in) throws IOException {
|
public ScriptedMetricAggregationBuilder(StreamInput in) throws IOException {
|
||||||
super(in, InternalScriptedMetric.TYPE);
|
super(in, TYPE);
|
||||||
initScript = in.readOptionalWriteable(Script::new);
|
initScript = in.readOptionalWriteable(Script::new);
|
||||||
mapScript = in.readOptionalWriteable(Script::new);
|
mapScript = in.readOptionalWriteable(Script::new);
|
||||||
combineScript = in.readOptionalWriteable(Script::new);
|
combineScript = in.readOptionalWriteable(Script::new);
|
||||||
|
|
Loading…
Reference in New Issue