diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml
index c3ae18705e2..fe399828b7b 100644
--- a/buildSrc/src/main/resources/checkstyle_suppressions.xml
+++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml
@@ -555,7 +555,6 @@
-
@@ -611,7 +610,6 @@
-
diff --git a/core/src/main/java/org/elasticsearch/search/SearchModule.java b/core/src/main/java/org/elasticsearch/search/SearchModule.java
index f2103f21b96..e39b945a200 100644
--- a/core/src/main/java/org/elasticsearch/search/SearchModule.java
+++ b/core/src/main/java/org/elasticsearch/search/SearchModule.java
@@ -442,7 +442,7 @@ public class SearchModule extends AbstractModule {
*
* @param reader reads the aggregation builder from a stream
* @param internalReader reads the {@link PipelineAggregator} from a stream
- * @param internalReader reads the {@link InternalAggregation} that represents a bucket in this aggregation from a stream
+ * @param bucketReader reads the {@link InternalAggregation} that represents a bucket in this aggregation from a stream
* @param aggregationParser reads the aggregation builder from XContent
* @param aggregationName names by which the aggregation may be parsed. The first name is special because it is the name that the reader
* is registered under.
@@ -562,8 +562,8 @@ public class SearchModule extends AbstractModule {
registerAggregation(new AggregationSpec(ChildrenAggregationBuilder::new, ChildrenAggregationBuilder::parse,
ChildrenAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalChildren::new));
- registerPipelineAggregation(DerivativePipelineAggregationBuilder::new, DerivativePipelineAggregationBuilder::parse,
- DerivativePipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
+ registerPipelineAggregation(DerivativePipelineAggregationBuilder::new, DerivativePipelineAggregator::new, InternalDerivative::new,
+ DerivativePipelineAggregationBuilder::parse, DerivativePipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(MaxBucketPipelineAggregationBuilder::new, MaxBucketPipelineAggregationBuilder.PARSER,
MaxBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(MinBucketPipelineAggregationBuilder::new, MinBucketPipelineAggregationBuilder.PARSER,
@@ -825,8 +825,6 @@ public class SearchModule extends AbstractModule {
static {
// Pipeline Aggregations
- DerivativePipelineAggregator.registerStreams();
- InternalDerivative.registerStreams();
InternalSimpleValue.registerStreams();
InternalBucketMetricValue.registerStreams();
MaxBucketPipelineAggregator.registerStreams();
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java b/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java
index 458f6edee7c..52a0ad1b17d 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java
@@ -81,7 +81,7 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
/**
* @return The name of the stream type (used for registering the aggregation stream
- * (see {@link AggregationStreams#registerStream(AggregationStreams.Stream, org.elasticsearch.common.bytes.BytesReference...)}).
+ * (see {@link AggregationStreams#registerStream(AggregationStreams.Stream, BytesReference...)}).
*/
public BytesReference stream() {
return stream;
@@ -196,7 +196,7 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
@Override
public final void writeTo(StreamOutput out) throws IOException {
- out.writeString(name); // NORELEASE remote writing the name? it is automatically handled with writeNamedWriteable
+ out.writeString(name);
out.writeGenericValue(metaData);
out.writeVInt(pipelineAggregators.size());
for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMetricsAggregation.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMetricsAggregation.java
index 5c2b1c46f5e..a5c64c5cde7 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMetricsAggregation.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMetricsAggregation.java
@@ -29,7 +29,7 @@ import java.util.Map;
public abstract class InternalMetricsAggregation extends InternalAggregation {
- protected InternalMetricsAggregation() {} // for serialization
+ protected InternalMetricsAggregation() {} // NORELEASE remove when we remove streamable
protected InternalMetricsAggregation(String name, List pipelineAggregators, Map metaData) {
super(name, pipelineAggregators, metaData);
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalNumericMetricsAggregation.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalNumericMetricsAggregation.java
index ffa41a9ed8c..36771500972 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalNumericMetricsAggregation.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalNumericMetricsAggregation.java
@@ -37,7 +37,7 @@ public abstract class InternalNumericMetricsAggregation extends InternalMetricsA
public abstract static class SingleValue extends InternalNumericMetricsAggregation implements NumericMetricsAggregation.SingleValue {
- protected SingleValue() {}
+ protected SingleValue() {} // NORELEASE remove when we remove Streamable
protected SingleValue(String name, List pipelineAggregators, Map metaData) {
super(name, pipelineAggregators, metaData);
@@ -101,7 +101,7 @@ public abstract class InternalNumericMetricsAggregation extends InternalMetricsA
}
}
- private InternalNumericMetricsAggregation() {} // for serialization
+ private InternalNumericMetricsAggregation() {} // NORELEASE remove when we remove Streamable
private InternalNumericMetricsAggregation(String name, List pipelineAggregators, Map metaData) {
super(name, pipelineAggregators, metaData);
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalSimpleValue.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalSimpleValue.java
index 24f2b88b952..01ac5778f5c 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalSimpleValue.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalSimpleValue.java
@@ -51,7 +51,7 @@ public class InternalSimpleValue extends InternalNumericMetricsAggregation.Singl
private double value;
- protected InternalSimpleValue() {
+ protected InternalSimpleValue() { // NORELEASE remove and make value final if possible
} // for serialization
public InternalSimpleValue(String name, double value, DocValueFormat formatter, List pipelineAggregators,
@@ -61,6 +61,27 @@ public class InternalSimpleValue extends InternalNumericMetricsAggregation.Singl
this.value = value;
}
+ /**
+ * Read from a stream.
+ */
+ public InternalSimpleValue(StreamInput in) throws IOException {
+ super(in);
+ format = in.readNamedWriteable(DocValueFormat.class);
+ value = in.readDouble();
+ }
+
+ @Override
+ protected void doReadFrom(StreamInput in) throws IOException {
+ format = in.readNamedWriteable(DocValueFormat.class);
+ value = in.readDouble();
+ }
+
+ @Override
+ protected void doWriteTo(StreamOutput out) throws IOException {
+ out.writeNamedWriteable(format);
+ out.writeDouble(value);
+ }
+
@Override
public double value() {
return value;
@@ -80,18 +101,6 @@ public class InternalSimpleValue extends InternalNumericMetricsAggregation.Singl
throw new UnsupportedOperationException("Not supported");
}
- @Override
- protected void doReadFrom(StreamInput in) throws IOException {
- format = in.readNamedWriteable(DocValueFormat.class);
- value = in.readDouble();
- }
-
- @Override
- protected void doWriteTo(StreamOutput out) throws IOException {
- out.writeNamedWriteable(format);
- out.writeDouble(value);
- }
-
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
boolean hasValue = !(Double.isInfinite(value) || Double.isNaN(value));
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/derivative/DerivativePipelineAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/derivative/DerivativePipelineAggregationBuilder.java
index 5ad2f88b198..fd4cb6139b0 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/derivative/DerivativePipelineAggregationBuilder.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/derivative/DerivativePipelineAggregationBuilder.java
@@ -46,7 +46,7 @@ import java.util.Map;
import java.util.Objects;
public class DerivativePipelineAggregationBuilder extends AbstractPipelineAggregationBuilder {
- public static final String NAME = DerivativePipelineAggregator.TYPE.name();
+ public static final String NAME = "derivative";
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
private static final ParseField FORMAT_FIELD = new ParseField("format");
@@ -62,14 +62,14 @@ public class DerivativePipelineAggregationBuilder extends AbstractPipelineAggreg
}
private DerivativePipelineAggregationBuilder(String name, String[] bucketsPaths) {
- super(name, DerivativePipelineAggregator.TYPE.name(), bucketsPaths);
+ super(name, NAME, bucketsPaths);
}
/**
* Read from a stream.
*/
public DerivativePipelineAggregationBuilder(StreamInput in) throws IOException {
- super(in, DerivativePipelineAggregator.TYPE.name());
+ super(in, NAME);
format = in.readOptionalString();
if (in.readBoolean()) {
gapPolicy = GapPolicy.readFrom(in);
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/derivative/DerivativePipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/derivative/DerivativePipelineAggregator.java
index b40666128cb..7dc2ba76704 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/derivative/DerivativePipelineAggregator.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/derivative/DerivativePipelineAggregator.java
@@ -25,12 +25,10 @@ import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
-import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
-import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
import org.joda.time.DateTime;
import java.io.IOException;
@@ -43,28 +41,9 @@ import java.util.stream.StreamSupport;
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
public class DerivativePipelineAggregator extends PipelineAggregator {
-
- public static final Type TYPE = new Type("derivative");
-
- public static final PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() {
- @Override
- public DerivativePipelineAggregator readResult(StreamInput in) throws IOException {
- DerivativePipelineAggregator result = new DerivativePipelineAggregator();
- result.readFrom(in);
- return result;
- }
- };
-
- public static void registerStreams() {
- PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
- }
-
- private DocValueFormat formatter;
- private GapPolicy gapPolicy;
- private Double xAxisUnits;
-
- public DerivativePipelineAggregator() {
- }
+ private final DocValueFormat formatter;
+ private final GapPolicy gapPolicy;
+ private final Double xAxisUnits;
public DerivativePipelineAggregator(String name, String[] bucketsPaths, DocValueFormat formatter, GapPolicy gapPolicy, Long xAxisUnits,
Map metadata) {
@@ -74,9 +53,26 @@ public class DerivativePipelineAggregator extends PipelineAggregator {
this.xAxisUnits = xAxisUnits == null ? null : (double) xAxisUnits;
}
+ /**
+ * Read from a stream.
+ */
+ public DerivativePipelineAggregator(StreamInput in) throws IOException {
+ super(in);
+ formatter = in.readNamedWriteable(DocValueFormat.class);
+ gapPolicy = GapPolicy.readFrom(in);
+ xAxisUnits = in.readOptionalDouble();
+ }
+
@Override
- public Type type() {
- return TYPE;
+ public void doWriteTo(StreamOutput out) throws IOException {
+ out.writeNamedWriteable(formatter);
+ gapPolicy.writeTo(out);
+ out.writeOptionalDouble(xAxisUnits);
+ }
+
+ @Override
+ public String getWriteableName() {
+ return DerivativePipelineAggregationBuilder.NAME;
}
@Override
@@ -124,27 +120,4 @@ public class DerivativePipelineAggregator extends PipelineAggregator {
+ ". Found bucket with key " + key);
}
}
-
- @Override
- public void doReadFrom(StreamInput in) throws IOException {
- formatter = in.readNamedWriteable(DocValueFormat.class);
- gapPolicy = GapPolicy.readFrom(in);
- if (in.readBoolean()) {
- xAxisUnits = in.readDouble();
- } else {
- xAxisUnits = null;
-
- }
- }
-
- @Override
- public void doWriteTo(StreamOutput out) throws IOException {
- out.writeNamedWriteable(formatter);
- gapPolicy.writeTo(out);
- boolean hasXAxisUnitsValue = xAxisUnits != null;
- out.writeBoolean(hasXAxisUnitsValue);
- if (hasXAxisUnitsValue) {
- out.writeDouble(xAxisUnits);
- }
- }
}
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/derivative/InternalDerivative.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/derivative/InternalDerivative.java
index dea181ff25c..e18c0d81eeb 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/derivative/InternalDerivative.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/derivative/InternalDerivative.java
@@ -23,7 +23,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
-import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@@ -32,43 +31,38 @@ import java.util.List;
import java.util.Map;
public class InternalDerivative extends InternalSimpleValue implements Derivative {
+ private final double normalizationFactor;
- public static final Type TYPE = new Type("derivative");
-
- public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
- @Override
- public InternalDerivative readResult(StreamInput in) throws IOException {
- InternalDerivative result = new InternalDerivative();
- result.readFrom(in);
- return result;
- }
- };
-
- public static void registerStreams() {
- AggregationStreams.registerStream(STREAM, TYPE.stream());
- }
-
- private double normalizationFactor;
-
- InternalDerivative() {
- }
-
- public InternalDerivative(String name, double value, double normalizationFactor, DocValueFormat formatter, List pipelineAggregators,
- Map metaData) {
+ public InternalDerivative(String name, double value, double normalizationFactor, DocValueFormat formatter,
+ List pipelineAggregators, Map metaData) {
super(name, value, formatter, pipelineAggregators, metaData);
this.normalizationFactor = normalizationFactor;
}
+ /**
+ * Read from a stream.
+ */
+ public InternalDerivative(StreamInput in) throws IOException {
+ super(in);
+ normalizationFactor = in.readDouble();
+ }
+
+ @Override
+ protected void doWriteTo(StreamOutput out) throws IOException {
+ super.doWriteTo(out);
+ out.writeDouble(normalizationFactor);
+ }
+
+ @Override
+ public String getWriteableName() {
+ return DerivativePipelineAggregationBuilder.NAME;
+ }
+
@Override
public double normalizedValue() {
return normalizationFactor > 0 ? (value() / normalizationFactor) : value();
}
- @Override
- public Type type() {
- return TYPE;
- }
-
@Override
public Object getProperty(List path) {
if (path.isEmpty()) {
@@ -82,18 +76,6 @@ public class InternalDerivative extends InternalSimpleValue implements Derivativ
}
}
- @Override
- protected void doWriteTo(StreamOutput out) throws IOException {
- super.doWriteTo(out);
- out.writeDouble(normalizationFactor);
- }
-
- @Override
- protected void doReadFrom(StreamInput in) throws IOException {
- super.doReadFrom(in);
- normalizationFactor = in.readDouble();
- }
-
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
super.doXContentBody(builder, params);