Migrate moving_avg pipeline aggregation to NamedWriteable

This is the first pipeline aggregation that doesn't have its own
bucket type that needs serializing. It uses InternalHistogram instead.
So that required reworking the new-style `registerAggregation` method
to not require bucket readers. So I built `PipelineAggregationSpec` to
mirror `AggregationSpec`. It allows registering any number of bucket
readers or result readers.
This commit is contained in:
Nik Everett 2016-07-13 12:21:51 -04:00
parent 8394544548
commit 16812cc032
3 changed files with 136 additions and 92 deletions

View File

@ -98,7 +98,6 @@ import org.elasticsearch.plugins.SearchPlugin.SearchPluginSpec;
import org.elasticsearch.search.action.SearchTransportService;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.Aggregator.Parser;
import org.elasticsearch.search.aggregations.AggregatorParsers;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
@ -381,8 +380,10 @@ public class SearchModule extends AbstractModule {
* Register an aggregation.
*/
public void registerAggregation(AggregationSpec spec) {
namedWriteableRegistry.register(AggregationBuilder.class, spec.aggregationName.getPreferredName(), spec.builderReader);
aggregationParserRegistry.register(spec.aggregationParser, spec.aggregationName);
if (false == transportClient) {
aggregationParserRegistry.register(spec.parser, spec.name);
}
namedWriteableRegistry.register(AggregationBuilder.class, spec.name.getPreferredName(), spec.builderReader);
for (Map.Entry<String, Writeable.Reader<? extends InternalAggregation>> t : spec.resultReaders.entrySet()) {
String writeableName = t.getKey();
Writeable.Reader<? extends InternalAggregation> internalReader = t.getValue();
@ -393,29 +394,30 @@ public class SearchModule extends AbstractModule {
public static class AggregationSpec {
private final Map<String, Writeable.Reader<? extends InternalAggregation>> resultReaders = new TreeMap<>();
private final Writeable.Reader<? extends AggregationBuilder> builderReader;
private final Aggregator.Parser aggregationParser;
private final ParseField aggregationName;
private final Aggregator.Parser parser;
private final ParseField name;
/**
* Register an aggregation.
*
* @param builderReader reads the {@link AggregationBuilder} from a stream
* @param aggregationParser reads the aggregation builder from XContent
* @param aggregationName names by which the aggregation may be parsed. The first name is special because it is the name that the
* reader is registered under.
* @param parser reads the aggregation builder from XContent
* @param name names by which the aggregation may be parsed. The first name is special because it is the name that the reader is
* registered under.
*/
public AggregationSpec(Reader<? extends AggregationBuilder> builderReader, Parser aggregationParser, ParseField aggregationName) {
public AggregationSpec(Reader<? extends AggregationBuilder> builderReader, Aggregator.Parser parser,
ParseField name) {
this.builderReader = builderReader;
this.aggregationParser = aggregationParser;
this.aggregationName = aggregationName;
this.parser = parser;
this.name = name;
}
/**
* Add a reader for the shard level results of the aggregation with {@linkplain #aggregationName}'s
* {@link ParseField#getPreferredName()} as the {@link NamedWriteable#getWriteableName()}.
* Add a reader for the shard level results of the aggregation with {@linkplain #name}'s {@link ParseField#getPreferredName()} as
* the {@link NamedWriteable#getWriteableName()}.
*/
public AggregationSpec addResultReader(Writeable.Reader<? extends InternalAggregation> resultReader) {
return addResultReader(aggregationName.getPreferredName(), resultReader);
return addResultReader(name.getPreferredName(), resultReader);
}
/**
@ -429,23 +431,73 @@ public class SearchModule extends AbstractModule {
/**
* Register a pipeline aggregation.
*
* @param reader reads the aggregation builder from a stream
* @param internalReader reads the {@link PipelineAggregator} from a stream
* @param bucketReader reads the {@link InternalAggregation} that represents a bucket in this aggregation from a stream
* @param aggregationParser reads the aggregation builder from XContent
* @param aggregationName names by which the aggregation may be parsed. The first name is special because it is the name that the reader
* is registered under.
*/
public void registerPipelineAggregation(Writeable.Reader<? extends PipelineAggregationBuilder> reader,
Writeable.Reader<? extends PipelineAggregator> internalReader, Writeable.Reader<? extends InternalAggregation> bucketReader,
PipelineAggregator.Parser aggregationParser, ParseField aggregationName) {
public void registerPipelineAggregation(PipelineAggregationSpec spec) {
if (false == transportClient) {
pipelineAggregationParserRegistry.register(aggregationParser, aggregationName);
pipelineAggregationParserRegistry.register(spec.parser, spec.name);
}
namedWriteableRegistry.register(PipelineAggregationBuilder.class, spec.name.getPreferredName(), spec.builderReader);
for (Map.Entry<String, Writeable.Reader<? extends PipelineAggregator>> resultReader : spec.resultReaders.entrySet()) {
namedWriteableRegistry.register(PipelineAggregator.class, resultReader.getKey(), resultReader.getValue());
}
for (Map.Entry<String, Writeable.Reader<? extends InternalAggregation>> bucketReaders : spec.bucketReaders.entrySet()) {
namedWriteableRegistry.register(InternalAggregation.class, bucketReaders.getKey(), bucketReaders.getValue());
}
}
public static class PipelineAggregationSpec {
private final Map<String, Writeable.Reader<? extends PipelineAggregator>> resultReaders = new TreeMap<>();
private final Map<String, Writeable.Reader<? extends InternalAggregation>> bucketReaders = new TreeMap<>();
private final Writeable.Reader<? extends PipelineAggregationBuilder> builderReader;
private final PipelineAggregator.Parser parser;
private final ParseField name;
/**
* Register a pipeline aggregation.
*
* @param builderReader reads the {@link PipelineAggregationBuilder} from a stream
* @param parser reads the aggregation builder from XContent
* @param name names by which the aggregation may be parsed. The first name is special because it is the name that the reader is
* registered under.
*/
public PipelineAggregationSpec(Reader<? extends PipelineAggregationBuilder> builderReader,
PipelineAggregator.Parser parser, ParseField name) {
this.builderReader = builderReader;
this.parser = parser;
this.name = name;
}
/**
* Add a reader for the shard level results of the aggregation with {@linkplain #name}'s {@link ParseField#getPreferredName()} as
* the {@link NamedWriteable#getWriteableName()}.
*/
public PipelineAggregationSpec addResultReader(Writeable.Reader<? extends PipelineAggregator> resultReader) {
return addResultReader(name.getPreferredName(), resultReader);
}
/**
* Add a reader for the shard level results of the aggregation.
*/
public PipelineAggregationSpec addResultReader(String writeableName, Writeable.Reader<? extends PipelineAggregator> resultReader) {
resultReaders.put(writeableName, resultReader);
return this;
}
/**
* Add a reader for the shard level bucket results of the aggregation with {@linkplain name}'s {@link ParseField#getPreferredName()}
* as the {@link NamedWriteable#getWriteableName()}.
*/
public PipelineAggregationSpec addBucketReader(Writeable.Reader<? extends InternalAggregation> resultReader) {
return addBucketReader(name.getPreferredName(), resultReader);
}
/**
* Add a reader for the shard level results of the aggregation.
*/
public PipelineAggregationSpec addBucketReader(String writeableName, Writeable.Reader<? extends InternalAggregation> resultReader) {
bucketReaders.put(writeableName, resultReader);
return this;
}
namedWriteableRegistry.register(PipelineAggregationBuilder.class, aggregationName.getPreferredName(), reader);
namedWriteableRegistry.register(PipelineAggregator.class, aggregationName.getPreferredName(), internalReader);
namedWriteableRegistry.register(InternalAggregation.class, aggregationName.getPreferredName(), bucketReader);
}
public void registerPipelineAggregation(Writeable.Reader<? extends PipelineAggregationBuilder> reader,
@ -552,8 +604,12 @@ public class SearchModule extends AbstractModule {
registerAggregation(new AggregationSpec(ChildrenAggregationBuilder::new, ChildrenAggregationBuilder::parse,
ChildrenAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalChildren::new));
registerPipelineAggregation(DerivativePipelineAggregationBuilder::new, DerivativePipelineAggregator::new, InternalDerivative::new,
DerivativePipelineAggregationBuilder::parse, DerivativePipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(new PipelineAggregationSpec(
DerivativePipelineAggregationBuilder::new,
DerivativePipelineAggregationBuilder::parse,
DerivativePipelineAggregationBuilder.AGGREGATION_NAME_FIELD)
.addResultReader(DerivativePipelineAggregator::new)
.addBucketReader(InternalDerivative::new));
registerPipelineAggregation(MaxBucketPipelineAggregationBuilder::new, MaxBucketPipelineAggregationBuilder.PARSER,
MaxBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(MinBucketPipelineAggregationBuilder::new, MinBucketPipelineAggregationBuilder.PARSER,
@ -562,17 +618,26 @@ public class SearchModule extends AbstractModule {
AvgBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(SumBucketPipelineAggregationBuilder::new, SumBucketPipelineAggregationBuilder.PARSER,
SumBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(StatsBucketPipelineAggregationBuilder::new, StatsBucketPipelineAggregator::new,
InternalStatsBucket::new, StatsBucketPipelineAggregationBuilder.PARSER,
StatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(ExtendedStatsBucketPipelineAggregationBuilder::new, ExtendedStatsBucketPipelineAggregator::new,
InternalExtendedStatsBucket::new, new ExtendedStatsBucketParser(),
ExtendedStatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(new PipelineAggregationSpec(
StatsBucketPipelineAggregationBuilder::new,
StatsBucketPipelineAggregationBuilder.PARSER,
StatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD)
.addResultReader(StatsBucketPipelineAggregator::new)
.addBucketReader(InternalStatsBucket::new));
registerPipelineAggregation(new PipelineAggregationSpec(
ExtendedStatsBucketPipelineAggregationBuilder::new,
new ExtendedStatsBucketParser(),
ExtendedStatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD)
.addResultReader(ExtendedStatsBucketPipelineAggregator::new)
.addBucketReader(InternalExtendedStatsBucket::new));
registerPipelineAggregation(PercentilesBucketPipelineAggregationBuilder::new, PercentilesBucketPipelineAggregationBuilder.PARSER,
PercentilesBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(MovAvgPipelineAggregationBuilder::new,
registerPipelineAggregation(new PipelineAggregationSpec(
MovAvgPipelineAggregationBuilder::new,
(n, c) -> MovAvgPipelineAggregationBuilder.parse(movingAverageModelParserRegistry, n, c),
MovAvgPipelineAggregationBuilder.AGGREGATION_FIELD_NAME);
MovAvgPipelineAggregationBuilder.AGGREGATION_FIELD_NAME)
.addResultReader(MovAvgPipelineAggregator::new)
/* Uses InternalHistogram for buckets */);
registerPipelineAggregation(CumulativeSumPipelineAggregationBuilder::new, CumulativeSumPipelineAggregationBuilder::parse,
CumulativeSumPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(BucketScriptPipelineAggregationBuilder::new, BucketScriptPipelineAggregationBuilder::parse,
@ -821,7 +886,6 @@ public class SearchModule extends AbstractModule {
AvgBucketPipelineAggregator.registerStreams();
SumBucketPipelineAggregator.registerStreams();
PercentilesBucketPipelineAggregator.registerStreams();
MovAvgPipelineAggregator.registerStreams();
CumulativeSumPipelineAggregator.registerStreams();
BucketScriptPipelineAggregator.registerStreams();
BucketSelectorPipelineAggregator.registerStreams();

View File

@ -50,7 +50,7 @@ import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.GAP_POLICY;
public class MovAvgPipelineAggregationBuilder extends AbstractPipelineAggregationBuilder<MovAvgPipelineAggregationBuilder> {
public static final String NAME = MovAvgPipelineAggregator.TYPE.name();
public static final String NAME = "moving_avg";
public static final ParseField AGGREGATION_FIELD_NAME = new ParseField(NAME);
public static final ParseField MODEL = new ParseField("model");
@ -67,14 +67,14 @@ public class MovAvgPipelineAggregationBuilder extends AbstractPipelineAggregatio
private Boolean minimize;
public MovAvgPipelineAggregationBuilder(String name, String bucketsPath) {
super(name, MovAvgPipelineAggregator.TYPE.name(), new String[] { bucketsPath });
super(name, NAME, new String[] { bucketsPath });
}
/**
* Read from a stream.
*/
public MovAvgPipelineAggregationBuilder(StreamInput in) throws IOException {
super(in, MovAvgPipelineAggregator.TYPE.name());
super(in, NAME);
format = in.readOptionalString();
gapPolicy = GapPolicy.readFrom(in);
window = in.readVInt();

View File

@ -26,13 +26,11 @@ import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel;
import org.joda.time.DateTime;
@ -47,31 +45,12 @@ import java.util.stream.StreamSupport;
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
public class MovAvgPipelineAggregator extends PipelineAggregator {
public static final Type TYPE = new Type("moving_avg");
public static final PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() {
@Override
public MovAvgPipelineAggregator readResult(StreamInput in) throws IOException {
MovAvgPipelineAggregator result = new MovAvgPipelineAggregator();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
}
private DocValueFormat formatter;
private GapPolicy gapPolicy;
private int window;
private final DocValueFormat formatter;
private final GapPolicy gapPolicy;
private final int window;
private MovAvgModel model;
private int predict;
private boolean minimize;
public MovAvgPipelineAggregator() {
}
private final int predict;
private final boolean minimize;
public MovAvgPipelineAggregator(String name, String[] bucketsPaths, DocValueFormat formatter, GapPolicy gapPolicy,
int window, int predict, MovAvgModel model, boolean minimize, Map<String, Object> metadata) {
@ -84,9 +63,32 @@ public class MovAvgPipelineAggregator extends PipelineAggregator {
this.minimize = minimize;
}
/**
* Read from a stream.
*/
public MovAvgPipelineAggregator(StreamInput in) throws IOException {
super(in);
formatter = in.readNamedWriteable(DocValueFormat.class);
gapPolicy = GapPolicy.readFrom(in);
window = in.readVInt();
predict = in.readVInt();
model = in.readNamedWriteable(MovAvgModel.class);
minimize = in.readBoolean();
}
@Override
public Type type() {
return TYPE;
public void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(formatter);
gapPolicy.writeTo(out);
out.writeVInt(window);
out.writeVInt(predict);
out.writeNamedWriteable(model);
out.writeBoolean(minimize);
}
@Override
public String getWriteableName() {
return MovAvgPipelineAggregationBuilder.NAME;
}
@Override
@ -246,26 +248,4 @@ public class MovAvgPipelineAggregator extends PipelineAggregator {
return SimulatedAnealingMinimizer.minimize(model, values, test);
}
@Override
public void doReadFrom(StreamInput in) throws IOException {
formatter = in.readNamedWriteable(DocValueFormat.class);
gapPolicy = GapPolicy.readFrom(in);
window = in.readVInt();
predict = in.readVInt();
model = in.readNamedWriteable(MovAvgModel.class);
minimize = in.readBoolean();
}
@Override
public void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(formatter);
gapPolicy.writeTo(out);
out.writeVInt(window);
out.writeVInt(predict);
out.writeNamedWriteable(model);
out.writeBoolean(minimize);
}
}