Migrate derivative pipeline aggregation to NamedWriteable

This is another step in the effort to remove AggregationStreams and
instead use NamedWriteableRegistry like the rest of the code base.
This commit is contained in:
Nik Everett 2016-07-12 21:26:19 -04:00
parent 29fd0f1bd8
commit 88d3527178
9 changed files with 77 additions and 117 deletions

View File

@ -555,7 +555,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]MultiValueMode.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]SearchService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]AggregatorFactories.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]InternalAggregation.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]InternalMultiBucketAggregation.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]ValuesSourceAggregationBuilder.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]BucketsAggregator.java" checks="LineLength" />
@ -611,7 +610,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]stats[/\\]extended[/\\]ExtendedStatsAggregator.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]tophits[/\\]TopHitsAggregator.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]pipeline[/\\]bucketscript[/\\]BucketScriptPipelineAggregator.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]pipeline[/\\]derivative[/\\]InternalDerivative.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]support[/\\]AggregationPath.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]support[/\\]ValuesSourceParser.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]support[/\\]format[/\\]ValueFormat.java" checks="LineLength" />

View File

@ -442,7 +442,7 @@ public class SearchModule extends AbstractModule {
*
* @param reader reads the aggregation builder from a stream
* @param internalReader reads the {@link PipelineAggregator} from a stream
* @param internalReader reads the {@link InternalAggregation} that represents a bucket in this aggregation from a stream
* @param bucketReader reads the {@link InternalAggregation} that represents a bucket in this aggregation from a stream
* @param aggregationParser reads the aggregation builder from XContent
* @param aggregationName names by which the aggregation may be parsed. The first name is special because it is the name that the reader
* is registered under.
@ -562,8 +562,8 @@ public class SearchModule extends AbstractModule {
registerAggregation(new AggregationSpec(ChildrenAggregationBuilder::new, ChildrenAggregationBuilder::parse,
ChildrenAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalChildren::new));
registerPipelineAggregation(DerivativePipelineAggregationBuilder::new, DerivativePipelineAggregationBuilder::parse,
DerivativePipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(DerivativePipelineAggregationBuilder::new, DerivativePipelineAggregator::new, InternalDerivative::new,
DerivativePipelineAggregationBuilder::parse, DerivativePipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(MaxBucketPipelineAggregationBuilder::new, MaxBucketPipelineAggregationBuilder.PARSER,
MaxBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(MinBucketPipelineAggregationBuilder::new, MinBucketPipelineAggregationBuilder.PARSER,
@ -825,8 +825,6 @@ public class SearchModule extends AbstractModule {
static {
// Pipeline Aggregations
DerivativePipelineAggregator.registerStreams();
InternalDerivative.registerStreams();
InternalSimpleValue.registerStreams();
InternalBucketMetricValue.registerStreams();
MaxBucketPipelineAggregator.registerStreams();

View File

@ -81,7 +81,7 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
/**
* @return The name of the stream type (used for registering the aggregation stream
* (see {@link AggregationStreams#registerStream(AggregationStreams.Stream, org.elasticsearch.common.bytes.BytesReference...)}).
* (see {@link AggregationStreams#registerStream(AggregationStreams.Stream, BytesReference...)}).
*/
public BytesReference stream() {
return stream;
@ -196,7 +196,7 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
@Override
public final void writeTo(StreamOutput out) throws IOException {
out.writeString(name); // NORELEASE remote writing the name? it is automatically handled with writeNamedWriteable
out.writeString(name);
out.writeGenericValue(metaData);
out.writeVInt(pipelineAggregators.size());
for (PipelineAggregator pipelineAggregator : pipelineAggregators) {

View File

@ -29,7 +29,7 @@ import java.util.Map;
public abstract class InternalMetricsAggregation extends InternalAggregation {
protected InternalMetricsAggregation() {} // for serialization
protected InternalMetricsAggregation() {} // NORELEASE remove when we remove streamable
protected InternalMetricsAggregation(String name, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);

View File

@ -37,7 +37,7 @@ public abstract class InternalNumericMetricsAggregation extends InternalMetricsA
public abstract static class SingleValue extends InternalNumericMetricsAggregation implements NumericMetricsAggregation.SingleValue {
protected SingleValue() {}
protected SingleValue() {} // NORELEASE remove when we remove Streamable
protected SingleValue(String name, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);
@ -101,7 +101,7 @@ public abstract class InternalNumericMetricsAggregation extends InternalMetricsA
}
}
private InternalNumericMetricsAggregation() {} // for serialization
private InternalNumericMetricsAggregation() {} // NORELEASE remove when we remove Streamable
private InternalNumericMetricsAggregation(String name, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);

View File

@ -51,7 +51,7 @@ public class InternalSimpleValue extends InternalNumericMetricsAggregation.Singl
private double value;
protected InternalSimpleValue() {
protected InternalSimpleValue() { // NORELEASE remove and make value final if possible
} // for serialization
public InternalSimpleValue(String name, double value, DocValueFormat formatter, List<PipelineAggregator> pipelineAggregators,
@ -61,6 +61,27 @@ public class InternalSimpleValue extends InternalNumericMetricsAggregation.Singl
this.value = value;
}
/**
* Read from a stream.
*/
public InternalSimpleValue(StreamInput in) throws IOException {
super(in);
format = in.readNamedWriteable(DocValueFormat.class);
value = in.readDouble();
}
@Override
protected void doReadFrom(StreamInput in) throws IOException {
format = in.readNamedWriteable(DocValueFormat.class);
value = in.readDouble();
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(format);
out.writeDouble(value);
}
@Override
public double value() {
return value;
@ -80,18 +101,6 @@ public class InternalSimpleValue extends InternalNumericMetricsAggregation.Singl
throw new UnsupportedOperationException("Not supported");
}
@Override
protected void doReadFrom(StreamInput in) throws IOException {
format = in.readNamedWriteable(DocValueFormat.class);
value = in.readDouble();
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(format);
out.writeDouble(value);
}
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
boolean hasValue = !(Double.isInfinite(value) || Double.isNaN(value));

View File

@ -46,7 +46,7 @@ import java.util.Map;
import java.util.Objects;
public class DerivativePipelineAggregationBuilder extends AbstractPipelineAggregationBuilder<DerivativePipelineAggregationBuilder> {
public static final String NAME = DerivativePipelineAggregator.TYPE.name();
public static final String NAME = "derivative";
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
private static final ParseField FORMAT_FIELD = new ParseField("format");
@ -62,14 +62,14 @@ public class DerivativePipelineAggregationBuilder extends AbstractPipelineAggreg
}
private DerivativePipelineAggregationBuilder(String name, String[] bucketsPaths) {
super(name, DerivativePipelineAggregator.TYPE.name(), bucketsPaths);
super(name, NAME, bucketsPaths);
}
/**
* Read from a stream.
*/
public DerivativePipelineAggregationBuilder(StreamInput in) throws IOException {
super(in, DerivativePipelineAggregator.TYPE.name());
super(in, NAME);
format = in.readOptionalString();
if (in.readBoolean()) {
gapPolicy = GapPolicy.readFrom(in);

View File

@ -25,12 +25,10 @@ import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
import org.joda.time.DateTime;
import java.io.IOException;
@ -43,28 +41,9 @@ import java.util.stream.StreamSupport;
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
public class DerivativePipelineAggregator extends PipelineAggregator {
public static final Type TYPE = new Type("derivative");
public static final PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() {
@Override
public DerivativePipelineAggregator readResult(StreamInput in) throws IOException {
DerivativePipelineAggregator result = new DerivativePipelineAggregator();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
}
private DocValueFormat formatter;
private GapPolicy gapPolicy;
private Double xAxisUnits;
public DerivativePipelineAggregator() {
}
private final DocValueFormat formatter;
private final GapPolicy gapPolicy;
private final Double xAxisUnits;
public DerivativePipelineAggregator(String name, String[] bucketsPaths, DocValueFormat formatter, GapPolicy gapPolicy, Long xAxisUnits,
Map<String, Object> metadata) {
@ -74,9 +53,26 @@ public class DerivativePipelineAggregator extends PipelineAggregator {
this.xAxisUnits = xAxisUnits == null ? null : (double) xAxisUnits;
}
/**
* Read from a stream.
*/
public DerivativePipelineAggregator(StreamInput in) throws IOException {
super(in);
formatter = in.readNamedWriteable(DocValueFormat.class);
gapPolicy = GapPolicy.readFrom(in);
xAxisUnits = in.readOptionalDouble();
}
@Override
public Type type() {
return TYPE;
public void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(formatter);
gapPolicy.writeTo(out);
out.writeOptionalDouble(xAxisUnits);
}
@Override
public String getWriteableName() {
return DerivativePipelineAggregationBuilder.NAME;
}
@Override
@ -124,27 +120,4 @@ public class DerivativePipelineAggregator extends PipelineAggregator {
+ ". Found bucket with key " + key);
}
}
@Override
public void doReadFrom(StreamInput in) throws IOException {
formatter = in.readNamedWriteable(DocValueFormat.class);
gapPolicy = GapPolicy.readFrom(in);
if (in.readBoolean()) {
xAxisUnits = in.readDouble();
} else {
xAxisUnits = null;
}
}
@Override
public void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(formatter);
gapPolicy.writeTo(out);
boolean hasXAxisUnitsValue = xAxisUnits != null;
out.writeBoolean(hasXAxisUnitsValue);
if (hasXAxisUnitsValue) {
out.writeDouble(xAxisUnits);
}
}
}

View File

@ -23,7 +23,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@ -32,43 +31,38 @@ import java.util.List;
import java.util.Map;
public class InternalDerivative extends InternalSimpleValue implements Derivative {
private final double normalizationFactor;
public static final Type TYPE = new Type("derivative");
public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@Override
public InternalDerivative readResult(StreamInput in) throws IOException {
InternalDerivative result = new InternalDerivative();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
private double normalizationFactor;
InternalDerivative() {
}
public InternalDerivative(String name, double value, double normalizationFactor, DocValueFormat formatter, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) {
public InternalDerivative(String name, double value, double normalizationFactor, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, value, formatter, pipelineAggregators, metaData);
this.normalizationFactor = normalizationFactor;
}
/**
* Read from a stream.
*/
public InternalDerivative(StreamInput in) throws IOException {
super(in);
normalizationFactor = in.readDouble();
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
super.doWriteTo(out);
out.writeDouble(normalizationFactor);
}
@Override
public String getWriteableName() {
return DerivativePipelineAggregationBuilder.NAME;
}
@Override
public double normalizedValue() {
return normalizationFactor > 0 ? (value() / normalizationFactor) : value();
}
@Override
public Type type() {
return TYPE;
}
@Override
public Object getProperty(List<String> path) {
if (path.isEmpty()) {
@ -82,18 +76,6 @@ public class InternalDerivative extends InternalSimpleValue implements Derivativ
}
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
super.doWriteTo(out);
out.writeDouble(normalizationFactor);
}
@Override
protected void doReadFrom(StreamInput in) throws IOException {
super.doReadFrom(in);
normalizationFactor = in.readDouble();
}
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
super.doXContentBody(builder, params);