Migrate serial_diff aggregation to NamedWriteable

This is the last migration before AggregationStreams and
PipelineAggregatorStreams can be removed to remove redundant
code.
This commit is contained in:
Nik Everett 2016-07-18 13:00:06 -04:00
parent 3bb6a4dea6
commit 7861548786
3 changed files with 27 additions and 40 deletions

View File

@ -644,8 +644,11 @@ public class SearchModule extends AbstractModule {
BucketScriptPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); BucketScriptPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(BucketSelectorPipelineAggregationBuilder::new, BucketSelectorPipelineAggregationBuilder::parse, registerPipelineAggregation(BucketSelectorPipelineAggregationBuilder::new, BucketSelectorPipelineAggregationBuilder::parse,
BucketSelectorPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); BucketSelectorPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(SerialDiffPipelineAggregationBuilder::new, SerialDiffPipelineAggregationBuilder::parse, registerPipelineAggregation(new PipelineAggregationSpec(
SerialDiffPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); SerialDiffPipelineAggregationBuilder::new,
SerialDiffPipelineAggregationBuilder::parse,
SerialDiffPipelineAggregationBuilder.AGGREGATION_NAME_FIELD)
.addResultReader(SerialDiffPipelineAggregator::new));
} }
protected void configureSearch() { protected void configureSearch() {
@ -889,6 +892,5 @@ public class SearchModule extends AbstractModule {
CumulativeSumPipelineAggregator.registerStreams(); CumulativeSumPipelineAggregator.registerStreams();
BucketScriptPipelineAggregator.registerStreams(); BucketScriptPipelineAggregator.registerStreams();
BucketSelectorPipelineAggregator.registerStreams(); BucketSelectorPipelineAggregator.registerStreams();
SerialDiffPipelineAggregator.registerStreams();
} }
} }

View File

@ -41,7 +41,7 @@ import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.FORMAT; import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.FORMAT;
public class SerialDiffPipelineAggregationBuilder extends AbstractPipelineAggregationBuilder<SerialDiffPipelineAggregationBuilder> { public class SerialDiffPipelineAggregationBuilder extends AbstractPipelineAggregationBuilder<SerialDiffPipelineAggregationBuilder> {
public static final String NAME = SerialDiffPipelineAggregator.TYPE.name(); public static final String NAME = "serial_diff";
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME); public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
private static final ParseField GAP_POLICY = new ParseField("gap_policy"); private static final ParseField GAP_POLICY = new ParseField("gap_policy");
@ -52,14 +52,14 @@ public class SerialDiffPipelineAggregationBuilder extends AbstractPipelineAggreg
private int lag = 1; private int lag = 1;
public SerialDiffPipelineAggregationBuilder(String name, String bucketsPath) { public SerialDiffPipelineAggregationBuilder(String name, String bucketsPath) {
super(name, SerialDiffPipelineAggregator.TYPE.name(), new String[] { bucketsPath }); super(name, NAME, new String[] { bucketsPath });
} }
/** /**
* Read from a stream. * Read from a stream.
*/ */
public SerialDiffPipelineAggregationBuilder(StreamInput in) throws IOException { public SerialDiffPipelineAggregationBuilder(StreamInput in) throws IOException {
super(in, SerialDiffPipelineAggregator.TYPE.name()); super(in, NAME);
format = in.readOptionalString(); format = in.readOptionalString();
gapPolicy = GapPolicy.readFrom(in); gapPolicy = GapPolicy.readFrom(in);
lag = in.readVInt(); lag = in.readVInt();

View File

@ -26,13 +26,11 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram; import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -44,26 +42,10 @@ import java.util.stream.StreamSupport;
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue; import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
public class SerialDiffPipelineAggregator extends PipelineAggregator { public class SerialDiffPipelineAggregator extends PipelineAggregator {
public static final Type TYPE = new Type("serial_diff");
public static final PipelineAggregatorStreams.Stream STREAM = in -> {
SerialDiffPipelineAggregator result = new SerialDiffPipelineAggregator();
result.readFrom(in);
return result;
};
public static void registerStreams() {
PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
}
private DocValueFormat formatter; private DocValueFormat formatter;
private GapPolicy gapPolicy; private GapPolicy gapPolicy;
private int lag; private int lag;
public SerialDiffPipelineAggregator() {
}
public SerialDiffPipelineAggregator(String name, String[] bucketsPaths, @Nullable DocValueFormat formatter, GapPolicy gapPolicy, public SerialDiffPipelineAggregator(String name, String[] bucketsPaths, @Nullable DocValueFormat formatter, GapPolicy gapPolicy,
int lag, Map<String, Object> metadata) { int lag, Map<String, Object> metadata) {
super(name, bucketsPaths, metadata); super(name, bucketsPaths, metadata);
@ -72,9 +54,26 @@ public class SerialDiffPipelineAggregator extends PipelineAggregator {
this.lag = lag; this.lag = lag;
} }
/**
* Read from a stream.
*/
public SerialDiffPipelineAggregator(StreamInput in) throws IOException {
super(in);
formatter = in.readNamedWriteable(DocValueFormat.class);
gapPolicy = GapPolicy.readFrom(in);
lag = in.readVInt();
}
@Override @Override
public Type type() { public void doWriteTo(StreamOutput out) throws IOException {
return TYPE; out.writeNamedWriteable(formatter);
gapPolicy.writeTo(out);
out.writeVInt(lag);
}
@Override
public String getWriteableName() {
return SerialDiffPipelineAggregationBuilder.NAME;
} }
@Override @Override
@ -125,18 +124,4 @@ public class SerialDiffPipelineAggregator extends PipelineAggregator {
} }
return factory.create(newBuckets, histo); return factory.create(newBuckets, histo);
} }
@Override
public void doReadFrom(StreamInput in) throws IOException {
formatter = in.readNamedWriteable(DocValueFormat.class);
gapPolicy = GapPolicy.readFrom(in);
lag = in.readVInt();
}
@Override
public void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(formatter);
gapPolicy.writeTo(out);
out.writeVInt(lag);
}
} }