diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml index 2a361980a8e..6f65428c2bd 100644 --- a/buildSrc/src/main/resources/checkstyle_suppressions.xml +++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml @@ -626,7 +626,6 @@ - diff --git a/core/src/main/java/org/elasticsearch/search/SearchModule.java b/core/src/main/java/org/elasticsearch/search/SearchModule.java index ea3348e87fa..764a87589cc 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/core/src/main/java/org/elasticsearch/search/SearchModule.java @@ -429,7 +429,7 @@ public class SearchModule extends AbstractModule { namedWriteableRegistry.register(AggregationBuilder.class, spec.aggregationName.getPreferredName(), spec.builderReader); aggregationParserRegistry.register(spec.aggregationParser, spec.aggregationName); } - for (Map.Entry> t : spec.internalReaders.entrySet()) { + for (Map.Entry> t : spec.resultReaders.entrySet()) { String writeableName = t.getKey(); Writeable.Reader internalReader = t.getValue(); namedWriteableRegistry.register(InternalAggregation.class, writeableName, internalReader); @@ -437,7 +437,7 @@ public class SearchModule extends AbstractModule { } public static class AggregationSpec { - private final Map> internalReaders = new TreeMap<>(); + private final Map> resultReaders = new TreeMap<>(); private final Writeable.Reader builderReader; private final Aggregator.Parser aggregationParser; private final ParseField aggregationName; @@ -468,7 +468,7 @@ public class SearchModule extends AbstractModule { * Add a reader for the shard level results of the aggregation. */ public AggregationSpec addResultReader(String writeableName, Writeable.Reader resultReader) { - internalReaders.put(writeableName, resultReader); + resultReaders.put(writeableName, resultReader); return this; } } @@ -547,8 +547,8 @@ public class SearchModule extends AbstractModule { PercentileRanksAggregationBuilder.AGGREGATION_NAME_FIELD) .addResultReader(InternalTDigestPercentileRanks.NAME, InternalTDigestPercentileRanks::new) .addResultReader(InternalHDRPercentileRanks.NAME, InternalHDRPercentileRanks::new)); - registerAggregation(CardinalityAggregationBuilder::new, new CardinalityParser(), - CardinalityAggregationBuilder.AGGREGATION_NAME_FIELD); + registerAggregation(new AggregationSpec(CardinalityAggregationBuilder::new, new CardinalityParser(), + CardinalityAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalCardinality::new)); registerAggregation(GlobalAggregationBuilder::new, GlobalAggregationBuilder::parse, GlobalAggregationBuilder.AGGREGATION_NAME_FIELD); registerAggregation(MissingAggregationBuilder::new, new MissingParser(), MissingAggregationBuilder.AGGREGATION_NAME_FIELD); @@ -580,10 +580,10 @@ public class SearchModule extends AbstractModule { registerAggregation(TopHitsAggregationBuilder::new, TopHitsAggregationBuilder::parse, TopHitsAggregationBuilder.AGGREGATION_NAME_FIELD); registerAggregation(GeoBoundsAggregationBuilder::new, new GeoBoundsParser(), GeoBoundsAggregationBuilder.AGGREGATION_NAME_FIED); - registerAggregation(GeoCentroidAggregationBuilder::new, new GeoCentroidParser(), - GeoCentroidAggregationBuilder.AGGREGATION_NAME_FIELD); - registerAggregation(ScriptedMetricAggregationBuilder::new, ScriptedMetricAggregationBuilder::parse, - ScriptedMetricAggregationBuilder.AGGREGATION_NAME_FIELD); + registerAggregation(new AggregationSpec(GeoCentroidAggregationBuilder::new, new GeoCentroidParser(), + GeoCentroidAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalGeoCentroid::new)); + registerAggregation(new AggregationSpec(ScriptedMetricAggregationBuilder::new, ScriptedMetricAggregationBuilder::parse, + ScriptedMetricAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalScriptedMetric::new)); registerAggregation(ChildrenAggregationBuilder::new, ChildrenAggregationBuilder::parse, ChildrenAggregationBuilder.AGGREGATION_NAME_FIELD); @@ -766,11 +766,6 @@ public class SearchModule extends AbstractModule { } static { - // calcs - InternalCardinality.registerStreams(); - InternalScriptedMetric.registerStreams(); - InternalGeoCentroid.registerStreams(); - // buckets InternalGlobal.registerStreams(); InternalFilter.registerStreams(); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/cardinality/CardinalityAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/cardinality/CardinalityAggregationBuilder.java index a7850c23475..2d573c48e8e 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/cardinality/CardinalityAggregationBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/cardinality/CardinalityAggregationBuilder.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.AggregatorFactories.Builder; +import org.elasticsearch.search.aggregations.InternalAggregation.Type; import org.elasticsearch.search.aggregations.AggregatorFactory; import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.aggregations.support.ValueType; @@ -38,7 +39,8 @@ import java.util.Objects; public final class CardinalityAggregationBuilder extends ValuesSourceAggregationBuilder.LeafOnly { - public static final String NAME = InternalCardinality.TYPE.name(); + public static final String NAME = "cardinality"; + private static final Type TYPE = new Type(NAME); public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME); public static final ParseField PRECISION_THRESHOLD_FIELD = new ParseField("precision_threshold"); @@ -46,14 +48,14 @@ public final class CardinalityAggregationBuilder private Long precisionThreshold = null; public CardinalityAggregationBuilder(String name, ValueType targetValueType) { - super(name, InternalCardinality.TYPE, ValuesSourceType.ANY, targetValueType); + super(name, TYPE, ValuesSourceType.ANY, targetValueType); } /** * Read from a stream. */ public CardinalityAggregationBuilder(StreamInput in) throws IOException { - super(in, InternalCardinality.TYPE, ValuesSourceType.ANY); + super(in, TYPE, ValuesSourceType.ANY); if (in.readBoolean()) { precisionThreshold = in.readLong(); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/cardinality/InternalCardinality.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/cardinality/InternalCardinality.java index 02fe800adf3..7bfd54eb640 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/cardinality/InternalCardinality.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/cardinality/InternalCardinality.java @@ -34,23 +34,7 @@ import java.util.List; import java.util.Map; public final class InternalCardinality extends InternalNumericMetricsAggregation.SingleValue implements Cardinality { - - public static final Type TYPE = new Type("cardinality"); - - public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { - @Override - public InternalCardinality readResult(StreamInput in) throws IOException { - InternalCardinality result = new InternalCardinality(); - result.readFrom(in); - return result; - } - }; - - public static void registerStreams() { - AggregationStreams.registerStream(STREAM, TYPE.stream()); - } - - private HyperLogLogPlusPlus counts; + private final HyperLogLogPlusPlus counts; InternalCardinality(String name, HyperLogLogPlusPlus counts, List pipelineAggregators, Map metaData) { @@ -58,26 +42,11 @@ public final class InternalCardinality extends InternalNumericMetricsAggregation this.counts = counts; } - private InternalCardinality() { - } - - @Override - public double value() { - return getValue(); - } - - @Override - public long getValue() { - return counts == null ? 0 : counts.cardinality(0); - } - - @Override - public Type type() { - return TYPE; - } - - @Override - protected void doReadFrom(StreamInput in) throws IOException { + /** + * Read from a stream. + */ + public InternalCardinality(StreamInput in) throws IOException { + super(in); format = in.readNamedWriteable(DocValueFormat.class); if (in.readBoolean()) { counts = HyperLogLogPlusPlus.readFrom(in, BigArrays.NON_RECYCLING_INSTANCE); @@ -97,6 +66,21 @@ public final class InternalCardinality extends InternalNumericMetricsAggregation } } + @Override + public String getWriteableName() { + return CardinalityAggregationBuilder.NAME; + } + + @Override + public double value() { + return getValue(); + } + + @Override + public long getValue() { + return counts == null ? 0 : counts.cardinality(0); + } + @Override public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { InternalCardinality reduced = null; diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidAggregationBuilder.java index f9bf2e0a346..3e61f34eef6 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidAggregationBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/GeoCentroidAggregationBuilder.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.AggregatorFactories.Builder; +import org.elasticsearch.search.aggregations.InternalAggregation.Type; import org.elasticsearch.search.aggregations.AggregatorFactory; import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.aggregations.support.ValueType; @@ -36,18 +37,19 @@ import java.io.IOException; public class GeoCentroidAggregationBuilder extends ValuesSourceAggregationBuilder.LeafOnly { - public static final String NAME = InternalGeoCentroid.TYPE.name(); + public static final String NAME = "geo_centroid"; + public static final Type TYPE = new Type(NAME); public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME); public GeoCentroidAggregationBuilder(String name) { - super(name, InternalGeoCentroid.TYPE, ValuesSourceType.GEOPOINT, ValueType.GEOPOINT); + super(name, TYPE, ValuesSourceType.GEOPOINT, ValueType.GEOPOINT); } /** * Read from a stream. */ public GeoCentroidAggregationBuilder(StreamInput in) throws IOException { - super(in, InternalGeoCentroid.TYPE, ValuesSourceType.GEOPOINT, ValueType.GEOPOINT); + super(in, TYPE, ValuesSourceType.GEOPOINT, ValueType.GEOPOINT); } @Override diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/InternalGeoCentroid.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/InternalGeoCentroid.java index 6b15aab8142..06d9d369029 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/InternalGeoCentroid.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/geocentroid/InternalGeoCentroid.java @@ -24,7 +24,6 @@ import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; @@ -34,29 +33,11 @@ import java.util.List; import java.util.Map; /** - * Serialization and merge logic for {@link org.elasticsearch.search.aggregations.metrics.geocentroid.GeoCentroidAggregator} + * Serialization and merge logic for {@link GeoCentroidAggregator}. */ public class InternalGeoCentroid extends InternalMetricsAggregation implements GeoCentroid { - - public static final Type TYPE = new Type("geo_centroid"); - public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { - @Override - public InternalGeoCentroid readResult(StreamInput in) throws IOException { - InternalGeoCentroid result = new InternalGeoCentroid(); - result.readFrom(in); - return result; - } - }; - - public static void registerStreams() { - AggregationStreams.registerStream(STREAM, TYPE.stream()); - } - - protected GeoPoint centroid; - protected long count; - - protected InternalGeoCentroid() { - } + protected final GeoPoint centroid; + protected final long count; public InternalGeoCentroid(String name, GeoPoint centroid, long count, List pipelineAggregators, Map metaData) { @@ -67,6 +48,37 @@ public class InternalGeoCentroid extends InternalMetricsAggregation implements G this.count = count; } + /** + * Read from a stream. + */ + public InternalGeoCentroid(StreamInput in) throws IOException { + super(in); + count = in.readVLong(); + if (in.readBoolean()) { + final long hash = in.readLong(); + centroid = new GeoPoint(GeoPointField.decodeLatitude(hash), GeoPointField.decodeLongitude(hash)); + } else { + centroid = null; + } + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + out.writeVLong(count); + if (centroid != null) { + out.writeBoolean(true); + // should we just write lat and lon separately? + out.writeLong(GeoPointField.encodeLatLon(centroid.lat(), centroid.lon())); + } else { + out.writeBoolean(false); + } + } + + @Override + public String getWriteableName() { + return GeoCentroidAggregationBuilder.NAME; + } + @Override public GeoPoint centroid() { return centroid; @@ -77,11 +89,6 @@ public class InternalGeoCentroid extends InternalMetricsAggregation implements G return count; } - @Override - public Type type() { - return TYPE; - } - @Override public InternalGeoCentroid doReduce(List aggregations, ReduceContext reduceContext) { double lonSum = Double.NaN; @@ -125,29 +132,6 @@ public class InternalGeoCentroid extends InternalMetricsAggregation implements G } } - @Override - protected void doReadFrom(StreamInput in) throws IOException { - count = in.readVLong(); - if (in.readBoolean()) { - final long hash = in.readLong(); - centroid = new GeoPoint(GeoPointField.decodeLatitude(hash), GeoPointField.decodeLongitude(hash)); - } else { - centroid = null; - } - } - - @Override - protected void doWriteTo(StreamOutput out) throws IOException { - out.writeVLong(count); - if (centroid != null) { - out.writeBoolean(true); - // should we just write lat and lon separately? - out.writeLong(GeoPointField.encodeLatLon(centroid.lat(), centroid.lon())); - } else { - out.writeBoolean(false); - } - } - static class Fields { public static final String CENTROID = "location"; } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/scripted/InternalScriptedMetric.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/scripted/InternalScriptedMetric.java index 3bcbc0cf845..c5704d9f2b6 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/scripted/InternalScriptedMetric.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/scripted/InternalScriptedMetric.java @@ -26,7 +26,6 @@ import org.elasticsearch.script.CompiledScript; import org.elasticsearch.script.ExecutableScript; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptContext; -import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; @@ -39,38 +38,36 @@ import java.util.List; import java.util.Map; public class InternalScriptedMetric extends InternalMetricsAggregation implements ScriptedMetric { + private final Script reduceScript; + private final Object aggregation; - public static final Type TYPE = new Type("scripted_metric"); - - public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { - @Override - public InternalScriptedMetric readResult(StreamInput in) throws IOException { - InternalScriptedMetric result = new InternalScriptedMetric(); - result.readFrom(in); - return result; - } - }; - - public static void registerStreams() { - AggregationStreams.registerStream(STREAM, TYPE.stream()); - } - - private Script reduceScript; - private Object aggregation; - - private InternalScriptedMetric() { - } - - private InternalScriptedMetric(String name, List pipelineAggregators, Map metaData) { + public InternalScriptedMetric(String name, Object aggregation, Script reduceScript, List pipelineAggregators, + Map metaData) { super(name, pipelineAggregators, metaData); - } - - public InternalScriptedMetric(String name, Object aggregation, Script reduceScript, List pipelineAggregators, Map metaData) { - this(name, pipelineAggregators, metaData); this.aggregation = aggregation; this.reduceScript = reduceScript; } + /** + * Read from a stream. + */ + public InternalScriptedMetric(StreamInput in) throws IOException { + super(in); + reduceScript = in.readOptionalWriteable(Script::new); + aggregation = in.readGenericValue(); + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + out.writeOptionalWriteable(reduceScript); + out.writeGenericValue(aggregation); + } + + @Override + public String getWriteableName() { + return ScriptedMetricAggregationBuilder.NAME; + } + @Override public Object aggregation() { return aggregation; @@ -98,15 +95,11 @@ public class InternalScriptedMetric extends InternalMetricsAggregation implement } else { aggregation = aggregationObjects; } - return new InternalScriptedMetric(firstAggregation.getName(), aggregation, firstAggregation.reduceScript, pipelineAggregators(), getMetaData()); + return new InternalScriptedMetric(firstAggregation.getName(), aggregation, firstAggregation.reduceScript, pipelineAggregators(), + getMetaData()); } - @Override - public Type type() { - return TYPE; - } - @Override public Object getProperty(List path) { if (path.isEmpty()) { @@ -118,24 +111,6 @@ public class InternalScriptedMetric extends InternalMetricsAggregation implement } } - @Override - protected void doReadFrom(StreamInput in) throws IOException { - if (in.readBoolean()) { - reduceScript = new Script(in); - } - aggregation = in.readGenericValue(); - } - - @Override - protected void doWriteTo(StreamOutput out) throws IOException { - boolean hasScript = reduceScript != null; - out.writeBoolean(hasScript); - if (hasScript) { - reduceScript.writeTo(out); - } - out.writeGenericValue(aggregation); - } - @Override public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { return builder.field("value", aggregation); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetricAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetricAggregationBuilder.java index 5a5f9894f33..cfd50ce9fbd 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetricAggregationBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetricAggregationBuilder.java @@ -31,6 +31,7 @@ import org.elasticsearch.script.ScriptParameterParser; import org.elasticsearch.script.ScriptParameterParser.ScriptParameterValue; import org.elasticsearch.search.aggregations.AbstractAggregationBuilder; import org.elasticsearch.search.aggregations.AggregatorFactories.Builder; +import org.elasticsearch.search.aggregations.InternalAggregation.Type; import org.elasticsearch.search.aggregations.AggregatorFactory; import org.elasticsearch.search.aggregations.support.AggregationContext; @@ -42,7 +43,8 @@ import java.util.Set; public class ScriptedMetricAggregationBuilder extends AbstractAggregationBuilder { - public static final String NAME = InternalScriptedMetric.TYPE.name(); + public static final String NAME = "scripted_metric"; + private static final Type TYPE = new Type(NAME); public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME); private static final ParseField INIT_SCRIPT_FIELD = new ParseField("init_script"); @@ -59,14 +61,14 @@ public class ScriptedMetricAggregationBuilder extends AbstractAggregationBuilder private Map params; public ScriptedMetricAggregationBuilder(String name) { - super(name, InternalScriptedMetric.TYPE); + super(name, TYPE); } /** * Read from a stream. */ public ScriptedMetricAggregationBuilder(StreamInput in) throws IOException { - super(in, InternalScriptedMetric.TYPE); + super(in, TYPE); initScript = in.readOptionalWriteable(Script::new); mapScript = in.readOptionalWriteable(Script::new); combineScript = in.readOptionalWriteable(Script::new);