Migrate stats and extended stats to NamedWriteable

Migrates the `stats` and `extended_stats` aggregations, and the corresponding pipeline
aggregations, from the special-purpose aggregation streams to
`NamedWriteable`. These are the first pipeline aggregations to be migrated,
so this commit also adds the infrastructure to support both streams and
`NamedWriteable`s for pipeline aggregations during the transition.
Nik Everett 2016-06-30 15:19:58 -04:00
parent 1297a707da
commit 91b66e3cf4
17 changed files with 281 additions and 298 deletions
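
The transition works by prefixing each serialized pipeline aggregator with a boolean flag: migrated aggregators go through `writeNamedWriteable`/`readNamedWriteable`, while unmigrated ones still use the legacy `type()`-keyed streams. Below is a condensed sketch of that dispatch as a hypothetical standalone helper; in the commit itself the logic lives inline in `InternalAggregation.writeTo` and `QuerySearchResult` (see those hunks further down):

import java.io.IOException;

import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

// Hypothetical helper condensing the transitional write path added in this commit.
// Migrated aggregators are written as NamedWriteables behind a boolean flag;
// unmigrated ones fall back to the legacy type()-keyed stream format.
class TransitionalPipelineAggregatorWriter {
    static void write(StreamOutput out, PipelineAggregator pipelineAggregator) throws IOException {
        try {
            // Throws UnsupportedOperationException until the aggregator has been migrated.
            pipelineAggregator.getWriteableName();
            out.writeBoolean(true);
            out.writeNamedWriteable(pipelineAggregator);
        } catch (UnsupportedOperationException e) {
            out.writeBoolean(false);
            out.writeBytesReference(pipelineAggregator.type().stream());
            pipelineAggregator.writeTo(out);
        }
    }
}

The read side mirrors this: `readBoolean()` selects between `readNamedWriteable(PipelineAggregator.class)` and the old `PipelineAggregatorStreams` lookup.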

View File

@ -91,8 +91,8 @@ import org.elasticsearch.index.query.functionscore.ScriptScoreFunctionBuilder;
import org.elasticsearch.index.query.functionscore.WeightBuilder;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.search.action.SearchTransportService;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorParsers;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
@ -119,6 +119,7 @@ import org.elasticsearch.search.aggregations.bucket.nested.InternalNested;
import org.elasticsearch.search.aggregations.bucket.nested.InternalReverseNested;
import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.nested.ReverseNestedAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.range.InternalBinaryRange;
import org.elasticsearch.search.aggregations.bucket.range.InternalRange;
import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.range.RangeParser;
@ -129,7 +130,6 @@ import org.elasticsearch.search.aggregations.bucket.range.geodistance.GeoDistanc
import org.elasticsearch.search.aggregations.bucket.range.geodistance.GeoDistanceParser;
import org.elasticsearch.search.aggregations.bucket.range.geodistance.InternalGeoDistance;
import org.elasticsearch.search.aggregations.bucket.range.ip.IpRangeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.range.InternalBinaryRange;
import org.elasticsearch.search.aggregations.bucket.range.ip.IpRangeParser;
import org.elasticsearch.search.aggregations.bucket.sampler.DiversifiedAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.sampler.DiversifiedSamplerParser;
@ -200,40 +200,42 @@ import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountParser
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.InternalBucketMetricValue;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.avg.AvgBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.avg.AvgBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.avg.AvgBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.StatsBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.InternalStatsBucket;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.StatsBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.StatsBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.ExtendedStatsBucketParser;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.ExtendedStatsBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.ExtendedStatsBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.ExtendedStatsBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.InternalExtendedStatsBucket;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.derivative.InternalDerivative;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.EwmaModel;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.HoltLinearModel;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.HoltWintersModel;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.LinearModel;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.SimpleModel;
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregator;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.search.fetch.FetchSubPhase;
@ -445,16 +447,31 @@ public class SearchModule extends AbstractModule {
* Register a pipeline aggregation.
*
* @param reader reads the aggregation builder from a stream
* @param internalReader reads the {@link PipelineAggregator} from a stream
* @param bucketReader reads the {@link InternalAggregation} that represents a bucket in this aggregation from a stream
* @param aggregationParser reads the aggregation builder from XContent
* @param aggregationName names by which the aggregation may be parsed. The first name is special because it is the name that the reader
* is registered under.
*/
public void registerPipelineAggregation(Writeable.Reader<? extends PipelineAggregationBuilder> reader,
Writeable.Reader<? extends PipelineAggregator> internalReader, Writeable.Reader<? extends InternalAggregation> bucketReader,
PipelineAggregator.Parser aggregationParser, ParseField aggregationName) {
if (false == transportClient) {
namedWriteableRegistry.register(PipelineAggregationBuilder.class, aggregationName.getPreferredName(), reader);
pipelineAggregationParserRegistry.register(aggregationParser, aggregationName);
}
namedWriteableRegistry.register(PipelineAggregator.class, aggregationName.getPreferredName(), internalReader);
namedWriteableRegistry.register(InternalAggregation.class, aggregationName.getPreferredName(), bucketReader);
}
public void registerPipelineAggregation(Writeable.Reader<? extends PipelineAggregationBuilder> reader,
PipelineAggregator.Parser aggregationParser, ParseField aggregationName) {
// NORELEASE remove me in favor of the above method
pipelineAggregationParserRegistry.register(aggregationParser, aggregationName);
namedWriteableRegistry.register(PipelineAggregationBuilder.class, aggregationName.getPreferredName(), reader);
}
@Override
protected void configure() {
if (false == transportClient) {
@ -475,9 +492,10 @@ public class SearchModule extends AbstractModule {
registerAggregation(SumAggregationBuilder::new, new SumParser(), SumAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(MinAggregationBuilder::new, new MinParser(), MinAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(MaxAggregationBuilder::new, new MaxParser(), MaxAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(StatsAggregationBuilder::new, new StatsParser(), StatsAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(ExtendedStatsAggregationBuilder::new, new ExtendedStatsParser(),
ExtendedStatsAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(StatsAggregationBuilder::new, InternalStats::new, new StatsParser(),
StatsAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(ExtendedStatsAggregationBuilder::new, InternalExtendedStats::new, new ExtendedStatsParser(),
ExtendedStatsAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(ValueCountAggregationBuilder::new, new ValueCountParser(), ValueCountAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(PercentilesAggregationBuilder::new, new PercentilesParser(),
PercentilesAggregationBuilder.AGGREGATION_NAME_FIELD);
@ -522,6 +540,7 @@ public class SearchModule extends AbstractModule {
ScriptedMetricAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(ChildrenAggregationBuilder::new, ChildrenAggregationBuilder::parse,
ChildrenAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(DerivativePipelineAggregationBuilder::new, DerivativePipelineAggregationBuilder::parse,
DerivativePipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(MaxBucketPipelineAggregationBuilder::new, MaxBucketPipelineAggregationBuilder.PARSER,
@ -532,9 +551,11 @@ public class SearchModule extends AbstractModule {
AvgBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(SumBucketPipelineAggregationBuilder::new, SumBucketPipelineAggregationBuilder.PARSER,
SumBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(StatsBucketPipelineAggregationBuilder::new, StatsBucketPipelineAggregationBuilder.PARSER,
registerPipelineAggregation(StatsBucketPipelineAggregationBuilder::new, StatsBucketPipelineAggregator::new,
InternalStatsBucket::new, StatsBucketPipelineAggregationBuilder.PARSER,
StatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(ExtendedStatsBucketPipelineAggregationBuilder::new, new ExtendedStatsBucketParser(),
registerPipelineAggregation(ExtendedStatsBucketPipelineAggregationBuilder::new, ExtendedStatsBucketPipelineAggregator::new,
InternalExtendedStatsBucket::new, new ExtendedStatsBucketParser(),
ExtendedStatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(PercentilesBucketPipelineAggregationBuilder::new, PercentilesBucketPipelineAggregationBuilder.PARSER,
PercentilesBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
@ -703,8 +724,6 @@ public class SearchModule extends AbstractModule {
InternalSum.registerStreams();
InternalMin.registerStreams();
InternalMax.registerStreams();
InternalStats.registerStreams();
InternalExtendedStats.registerStreams();
InternalValueCount.registerStreams();
InternalTDigestPercentiles.registerStreams();
InternalTDigestPercentileRanks.registerStreams();
@ -749,8 +768,6 @@ public class SearchModule extends AbstractModule {
MinBucketPipelineAggregator.registerStreams();
AvgBucketPipelineAggregator.registerStreams();
SumBucketPipelineAggregator.registerStreams();
StatsBucketPipelineAggregator.registerStreams();
ExtendedStatsBucketPipelineAggregator.registerStreams();
PercentilesBucketPipelineAggregator.registerStreams();
MovAvgPipelineAggregator.registerStreams();
CumulativeSumPipelineAggregator.registerStreams();
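
For illustration, a hypothetical annotated form of the `stats_bucket` registration shown above, mapping each argument onto the new `registerPipelineAggregation` javadoc; only the trailing comments are explanatory:

// Arguments map one-to-one onto the registerPipelineAggregation javadoc above.
registerPipelineAggregation(
        StatsBucketPipelineAggregationBuilder::new,                     // reader: builder from a stream
        StatsBucketPipelineAggregator::new,                             // internalReader: PipelineAggregator from a stream
        InternalStatsBucket::new,                                       // bucketReader: InternalAggregation result from a stream
        StatsBucketPipelineAggregationBuilder.PARSER,                   // aggregationParser: builder from XContent
        StatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);  // aggregationName: parse name, also the NamedWriteable key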

View File

@ -151,9 +151,13 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
} else {
pipelineAggregators = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
BytesReference type = in.readBytesReference();
PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in);
pipelineAggregators.add(pipelineAggregator);
if (in.readBoolean()) {
pipelineAggregators.add(in.readNamedWriteable(PipelineAggregator.class));
} else {
BytesReference type = in.readBytesReference();
PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in);
pipelineAggregators.add(pipelineAggregator);
}
}
}
}
@ -174,9 +178,13 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
} else {
pipelineAggregators = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
BytesReference type = in.readBytesReference();
PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in);
pipelineAggregators.add(pipelineAggregator);
if (in.readBoolean()) {
pipelineAggregators.add(in.readNamedWriteable(PipelineAggregator.class));
} else {
BytesReference type = in.readBytesReference();
PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in);
pipelineAggregators.add(pipelineAggregator);
}
}
}
doReadFrom(in);
@ -188,34 +196,43 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
@Override
public final void writeTo(StreamOutput out) throws IOException {
out.writeString(name); // NORELEASE remote writing the name - it is automatically handled with writeNamedWriteable
out.writeString(name); // NORELEASE remote writing the name? it is automatically handled with writeNamedWriteable
out.writeGenericValue(metaData);
out.writeVInt(pipelineAggregators.size());
for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
out.writeBytesReference(pipelineAggregator.type().stream());
pipelineAggregator.writeTo(out);
// NORELEASE temporary hack to support old style streams and new style NamedWriteable
try {
pipelineAggregator.getWriteableName(); // Throws UnsupportedOperationException if we should use old style streams.
out.writeBoolean(true);
out.writeNamedWriteable(pipelineAggregator);
} catch (UnsupportedOperationException e) {
out.writeBoolean(false);
out.writeBytesReference(pipelineAggregator.type().stream());
pipelineAggregator.writeTo(out);
}
}
doWriteTo(out);
}
protected abstract void doWriteTo(StreamOutput out) throws IOException;
@Override
public String getName() {
return name;
}
@Override
public String getWriteableName() {
// NORELEASE remove me when all InternalAggregations override it
throw new UnsupportedOperationException("Override on every class");
}
@Override
public String getName() {
return name;
}
/**
* @return The {@link Type} of this aggregation
*/
public Type type() {
throw new UnsupportedOperationException("Use getWriteableName instead"); // NORELEASE remove me
// NORELEASE remove this method
throw new UnsupportedOperationException(getClass().getName() + " used type but should Use getWriteableName instead");
}
/**

View File

@ -22,7 +22,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@ -31,26 +30,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
*
*/
public class InternalStats extends InternalNumericMetricsAggregation.MultiValue implements Stats {
public final static Type TYPE = new Type("stats");
public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@Override
public InternalStats readResult(StreamInput in) throws IOException {
InternalStats result = new InternalStats();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
enum Metrics {
count, sum, min, max, avg;
@ -60,12 +40,10 @@ public class InternalStats extends InternalNumericMetricsAggregation.MultiValue
}
}
protected long count;
protected double min;
protected double max;
protected double sum;
protected InternalStats() {} // for serialization
protected final long count;
protected final double min;
protected final double max;
protected final double sum;
public InternalStats(String name, long count, double sum, double min, double max, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators,
@ -78,6 +56,36 @@ public class InternalStats extends InternalNumericMetricsAggregation.MultiValue
this.format = formatter;
}
/**
* Read from a stream.
*/
public InternalStats(StreamInput in) throws IOException {
super(in);
format = in.readNamedWriteable(DocValueFormat.class);
count = in.readVLong();
min = in.readDouble();
max = in.readDouble();
sum = in.readDouble();
}
@Override
protected final void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(format);
out.writeVLong(count);
out.writeDouble(min);
out.writeDouble(max);
out.writeDouble(sum);
writeOtherStatsTo(out);
}
protected void writeOtherStatsTo(StreamOutput out) throws IOException {
}
@Override
public String getWriteableName() {
return StatsAggregationBuilder.NAME;
}
@Override
public long getCount() {
return count;
@ -128,11 +136,6 @@ public class InternalStats extends InternalNumericMetricsAggregation.MultiValue
return valueAsString(Metrics.sum.name());
}
@Override
public Type type() {
return TYPE;
}
@Override
public double value(String name) {
Metrics metrics = Metrics.valueOf(name);
@ -163,32 +166,6 @@ public class InternalStats extends InternalNumericMetricsAggregation.MultiValue
return new InternalStats(name, count, sum, min, max, format, pipelineAggregators(), getMetaData());
}
@Override
protected void doReadFrom(StreamInput in) throws IOException {
format = in.readNamedWriteable(DocValueFormat.class);
count = in.readVLong();
min = in.readDouble();
max = in.readDouble();
sum = in.readDouble();
readOtherStatsFrom(in);
}
public void readOtherStatsFrom(StreamInput in) throws IOException {
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(format);
out.writeVLong(count);
out.writeDouble(min);
out.writeDouble(max);
out.writeDouble(sum);
writeOtherStatsTo(out);
}
protected void writeOtherStatsTo(StreamOutput out) throws IOException {
}
static class Fields {
public static final String COUNT = "count";
public static final String MIN = "min";

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValueType;
@ -36,18 +37,19 @@ import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import java.io.IOException;
public class StatsAggregationBuilder extends ValuesSourceAggregationBuilder.LeafOnly<ValuesSource.Numeric, StatsAggregationBuilder> {
public static final String NAME = InternalStats.TYPE.name();
public static final String NAME = "stats";
private final static Type TYPE = new Type(NAME);
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
public StatsAggregationBuilder(String name) {
super(name, InternalStats.TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
super(name, TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
}
/**
* Read from a stream.
*/
public StatsAggregationBuilder(StreamInput in) throws IOException {
super(in, InternalStats.TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
super(in, TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
}
@Override

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValueType;
@ -38,20 +39,21 @@ import java.util.Objects;
public class ExtendedStatsAggregationBuilder
extends ValuesSourceAggregationBuilder.LeafOnly<ValuesSource.Numeric, ExtendedStatsAggregationBuilder> {
public static final String NAME = InternalExtendedStats.TYPE.name();
public static final String NAME = "extended_stats";
public final static Type TYPE = new Type(NAME);
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
private double sigma = 2.0;
public ExtendedStatsAggregationBuilder(String name) {
super(name, InternalExtendedStats.TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
super(name, TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
}
/**
* Read from a stream.
*/
public ExtendedStatsAggregationBuilder(StreamInput in) throws IOException {
super(in, InternalExtendedStats.TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
super(in, TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
sigma = in.readDouble();
}

View File

@ -18,12 +18,10 @@
*/
package org.elasticsearch.search.aggregations.metrics.stats.extended;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.stats.InternalStats;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@ -32,26 +30,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
*
*/
public class InternalExtendedStats extends InternalStats implements ExtendedStats {
public final static Type TYPE = new Type("extended_stats", "estats");
public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@Override
public InternalExtendedStats readResult(StreamInput in) throws IOException {
InternalExtendedStats result = new InternalExtendedStats();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
enum Metrics {
count, sum, min, max, avg, sum_of_squares, variance, std_deviation, std_upper, std_lower;
@ -61,10 +40,8 @@ public class InternalExtendedStats extends InternalStats implements ExtendedStat
}
}
private double sumOfSqrs;
private double sigma;
protected InternalExtendedStats() {} // for serialization
private final double sumOfSqrs;
private final double sigma;
public InternalExtendedStats(String name, long count, double sum, double min, double max, double sumOfSqrs, double sigma,
DocValueFormat formatter, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
@ -73,9 +50,24 @@ public class InternalExtendedStats extends InternalStats implements ExtendedStat
this.sigma = sigma;
}
/**
* Read from a stream.
*/
public InternalExtendedStats(StreamInput in) throws IOException {
super(in);
sumOfSqrs = in.readDouble();
sigma = in.readDouble();
}
@Override
public Type type() {
return TYPE;
protected void writeOtherStatsTo(StreamOutput out) throws IOException {
out.writeDouble(sumOfSqrs);
out.writeDouble(sigma);
}
@Override
public String getWriteableName() {
return ExtendedStatsAggregationBuilder.NAME;
}
@Override
@ -157,19 +149,6 @@ public class InternalExtendedStats extends InternalStats implements ExtendedStat
format, pipelineAggregators(), getMetaData());
}
@Override
public void readOtherStatsFrom(StreamInput in) throws IOException {
sumOfSqrs = in.readDouble();
sigma = in.readDouble();
}
@Override
protected void writeOtherStatsTo(StreamOutput out) throws IOException {
out.writeDouble(sumOfSqrs);
out.writeDouble(sigma);
}
static class Fields {
public static final String SUM_OF_SQRS = "sum_of_squares";
public static final String SUM_OF_SQRS_AS_STRING = "sum_of_squares_as_string";
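
Taken together, the two files above split the serialization between parent and subclass: `InternalStats` owns the shared fields and exposes `writeOtherStatsTo` as an extension hook, and `InternalExtendedStats` appends its extra state and mirrors the order in its reading constructor. A condensed sketch assuming the names from the hunks above, with unrelated members omitted:

// InternalStats: write the shared stats, then let subclasses append their state.
protected final void doWriteTo(StreamOutput out) throws IOException {
    out.writeNamedWriteable(format);
    out.writeVLong(count);
    out.writeDouble(min);
    out.writeDouble(max);
    out.writeDouble(sum);
    writeOtherStatsTo(out); // no-op in the base class
}

// InternalExtendedStats: the reading constructor consumes the stream in the same order...
public InternalExtendedStats(StreamInput in) throws IOException {
    super(in);              // shared stats fields read by InternalStats(StreamInput)
    sumOfSqrs = in.readDouble();
    sigma = in.readDouble();
}

// ...and the matching hook writes the extra fields.
@Override
protected void writeOtherStatsTo(StreamOutput out) throws IOException {
    out.writeDouble(sumOfSqrs);
    out.writeDouble(sigma);
}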

View File

@ -21,6 +21,7 @@ package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@ -33,7 +34,8 @@ import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import java.io.IOException;
import java.util.Map;
public abstract class PipelineAggregator implements Streamable {
public abstract class PipelineAggregator implements Streamable, NamedWriteable {
// NORELEASE remove Streamable
/**
* Parse the {@link PipelineAggregationBuilder} from a {@link QueryParseContext}.
@ -73,6 +75,49 @@ public abstract class PipelineAggregator implements Streamable {
this.metaData = metaData;
}
/**
* Read from a stream.
*/
protected PipelineAggregator(StreamInput in) throws IOException {
name = in.readString();
bucketsPaths = in.readStringArray();
metaData = in.readMap();
}
@Override
public final void readFrom(StreamInput in) throws IOException {
try {
getWriteableName(); // Throws UnsupportedOperationException if this aggregation should be read using old style Streams
assert false : "Used reading constructor instead";
} catch (UnsupportedOperationException e) {
// OK
}
name = in.readString();
bucketsPaths = in.readStringArray();
metaData = in.readMap();
doReadFrom(in);
}
protected void doReadFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("Use reading constructor instead"); // NORELEASE remove when we remove Streamable
}
@Override
public final void writeTo(StreamOutput out) throws IOException {
out.writeString(name); // NORELEASE remote writing the name - it is automatically handled with writeNamedWriteable
out.writeStringArray(bucketsPaths);
out.writeMap(metaData);
doWriteTo(out);
}
protected abstract void doWriteTo(StreamOutput out) throws IOException;
@Override
public String getWriteableName() {
// NORELEASE remove me when all InternalAggregations override it
throw new UnsupportedOperationException("Override on every class");
}
public String name() {
return name;
}
@ -85,27 +130,10 @@ public abstract class PipelineAggregator implements Streamable {
return metaData;
}
public abstract Type type();
public Type type() {
// NORELEASE remove this method
throw new UnsupportedOperationException(getClass().getName() + " used type but should Use getWriteableName instead");
}
public abstract InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext);
@Override
public final void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeStringArray(bucketsPaths);
out.writeMap(metaData);
doWriteTo(out);
}
protected abstract void doWriteTo(StreamOutput out) throws IOException;
@Override
public final void readFrom(StreamInput in) throws IOException {
name = in.readString();
bucketsPaths = in.readStringArray();
metaData = in.readMap();
doReadFrom(in);
}
protected abstract void doReadFrom(StreamInput in) throws IOException;
}
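
Since no subclass in this file shows the finished migration, here is a hypothetical minimal `PipelineAggregator` subclass (not part of this commit) illustrating the intended end state: a reading constructor plus `getWriteableName`, with no Streamable-style `doReadFrom` override left:

import java.io.IOException;
import java.util.Map;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

// Hypothetical example, not part of this commit: the post-migration shape of a
// pipeline aggregator. State is read in the constructor and written in doWriteTo;
// getWriteableName keys the NamedWriteable registration.
public class ExamplePipelineAggregator extends PipelineAggregator {
    public static final String NAME = "example";

    public ExamplePipelineAggregator(String name, String[] bucketsPaths, Map<String, Object> metaData) {
        super(name, bucketsPaths, metaData);
    }

    /**
     * Read from a stream.
     */
    public ExamplePipelineAggregator(StreamInput in) throws IOException {
        super(in); // reads name, bucketsPaths and metaData
    }

    @Override
    protected void doWriteTo(StreamOutput out) throws IOException {
        // no additional state to serialize in this example
    }

    @Override
    public String getWriteableName() {
        return NAME;
    }

    @Override
    public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
        // placeholder: a real pipeline aggregator computes a new aggregation here
        return aggregation;
    }
}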

View File

@ -19,6 +19,7 @@
package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
@ -27,6 +28,7 @@ import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -43,6 +45,13 @@ public abstract class SiblingPipelineAggregator extends PipelineAggregator {
super(name, bucketsPaths, metaData);
}
/**
* Read from a stream.
*/
protected SiblingPipelineAggregator(StreamInput in) throws IOException {
super(in);
}
@SuppressWarnings("unchecked")
@Override
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {

View File

@ -59,6 +59,35 @@ public abstract class BucketMetricsPipelineAggregator extends SiblingPipelineAgg
this.format = format;
}
/**
* Read from a stream.
*/
protected BucketMetricsPipelineAggregator(StreamInput in) throws IOException {
super(in);
format = in.readNamedWriteable(DocValueFormat.class);
gapPolicy = GapPolicy.readFrom(in);
}
@Override
public final void doReadFrom(StreamInput in) throws IOException {
format = in.readNamedWriteable(DocValueFormat.class);
gapPolicy = GapPolicy.readFrom(in);
innerReadFrom(in);
}
protected void innerReadFrom(StreamInput in) throws IOException {
}
@Override
public final void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(format);
gapPolicy.writeTo(out);
innerWriteTo(out);
}
protected void innerWriteTo(StreamOutput out) throws IOException {
}
@Override
public final InternalAggregation doReduce(Aggregations aggregations, ReduceContext context) {
preCollection();
@ -109,25 +138,4 @@ public abstract class BucketMetricsPipelineAggregator extends SiblingPipelineAgg
* for this bucket
*/
protected abstract void collectBucketValue(String bucketKey, Double bucketValue);
@Override
public final void doReadFrom(StreamInput in) throws IOException {
format = in.readNamedWriteable(DocValueFormat.class);
gapPolicy = GapPolicy.readFrom(in);
innerReadFrom(in);
}
protected void innerReadFrom(StreamInput in) throws IOException {
}
@Override
public final void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(format);
gapPolicy.writeTo(out);
innerWriteTo(out);
}
protected void innerWriteTo(StreamOutput out) throws IOException {
}
}

View File

@ -21,7 +21,6 @@ package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.stats.InternalStats;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@ -31,34 +30,21 @@ import java.util.List;
import java.util.Map;
public class InternalStatsBucket extends InternalStats implements StatsBucket {
public final static Type TYPE = new Type("stats_bucket");
public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@Override
public InternalStatsBucket readResult(StreamInput in) throws IOException {
InternalStatsBucket result = new InternalStatsBucket();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
public InternalStatsBucket(String name, long count, double sum, double min, double max, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, count, sum, min, max, formatter, pipelineAggregators, metaData);
}
InternalStatsBucket() {
// For serialization
/**
* Read from a stream.
*/
public InternalStatsBucket(StreamInput in) throws IOException {
super(in);
}
@Override
public Type type() {
return TYPE;
public String getWriteableName() {
return StatsBucketPipelineAggregationBuilder.NAME;
}
@Override

View File

@ -35,11 +35,11 @@ import java.util.List;
import java.util.Map;
public class StatsBucketPipelineAggregationBuilder extends BucketMetricsPipelineAggregationBuilder<StatsBucketPipelineAggregationBuilder> {
public static final String NAME = StatsBucketPipelineAggregator.TYPE.name();
public static final String NAME = "stats_bucket";
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
public StatsBucketPipelineAggregationBuilder(String name, String bucketsPath) {
super(name, StatsBucketPipelineAggregator.TYPE.name(), new String[] { bucketsPath });
super(name, NAME, new String[] { bucketsPath });
}
/**
@ -47,7 +47,7 @@ public class StatsBucketPipelineAggregationBuilder extends BucketMetricsPipeline
*/
public StatsBucketPipelineAggregationBuilder(StreamInput in)
throws IOException {
super(in, StatsBucketPipelineAggregator.TYPE.name());
super(in, NAME);
}
@Override

View File

@ -22,10 +22,8 @@ package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator;
import java.io.IOException;
@ -33,23 +31,6 @@ import java.util.List;
import java.util.Map;
public class StatsBucketPipelineAggregator extends BucketMetricsPipelineAggregator {
public final static Type TYPE = new Type("stats_bucket");
public final static PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() {
@Override
public StatsBucketPipelineAggregator readResult(StreamInput in) throws IOException {
StatsBucketPipelineAggregator result = new StatsBucketPipelineAggregator();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
InternalStatsBucket.registerStreams();
}
private double sum = 0;
private long count = 0;
private double min = Double.POSITIVE_INFINITY;
@ -60,13 +41,13 @@ public class StatsBucketPipelineAggregator extends BucketMetricsPipelineAggregat
super(name, bucketsPaths, gapPolicy, formatter, metaData);
}
StatsBucketPipelineAggregator() {
// For serialization
public StatsBucketPipelineAggregator(StreamInput in) throws IOException {
super(in);
}
@Override
public Type type() {
return TYPE;
public String getWriteableName() {
return StatsBucketPipelineAggregationBuilder.NAME;
}
@Override
@ -89,5 +70,4 @@ public class StatsBucketPipelineAggregator extends BucketMetricsPipelineAggregat
protected InternalAggregation buildAggregation(List<PipelineAggregator> pipelineAggregators, Map<String, Object> metadata) {
return new InternalStatsBucket(name(), count, sum, min, max, format, pipelineAggregators, metadata);
}
}

View File

@ -36,20 +36,20 @@ import java.util.Objects;
public class ExtendedStatsBucketPipelineAggregationBuilder
extends BucketMetricsPipelineAggregationBuilder<ExtendedStatsBucketPipelineAggregationBuilder> {
public static final String NAME = ExtendedStatsBucketPipelineAggregator.TYPE.name();
public static final String NAME = "extended_stats_bucket";
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
private double sigma = 2.0;
public ExtendedStatsBucketPipelineAggregationBuilder(String name, String bucketsPath) {
super(name, ExtendedStatsBucketPipelineAggregator.TYPE.name(), new String[] { bucketsPath });
super(name, NAME, new String[] { bucketsPath });
}
/**
* Read from a stream.
*/
public ExtendedStatsBucketPipelineAggregationBuilder(StreamInput in) throws IOException {
super(in, ExtendedStatsBucketPipelineAggregator.TYPE.name());
super(in, NAME);
sigma = in.readDouble();
}

View File

@ -23,10 +23,8 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator;
import java.io.IOException;
@ -34,29 +32,12 @@ import java.util.List;
import java.util.Map;
public class ExtendedStatsBucketPipelineAggregator extends BucketMetricsPipelineAggregator {
public final static Type TYPE = new Type("extended_stats_bucket");
public final static PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() {
@Override
public ExtendedStatsBucketPipelineAggregator readResult(StreamInput in) throws IOException {
ExtendedStatsBucketPipelineAggregator result = new ExtendedStatsBucketPipelineAggregator();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
InternalExtendedStatsBucket.registerStreams();
}
private final double sigma;
private double sum = 0;
private long count = 0;
private double min = Double.POSITIVE_INFINITY;
private double max = Double.NEGATIVE_INFINITY;
private double sumOfSqrs = 1;
private double sigma;
protected ExtendedStatsBucketPipelineAggregator(String name, String[] bucketsPaths, double sigma, GapPolicy gapPolicy,
DocValueFormat formatter, Map<String, Object> metaData) {
@ -64,13 +45,22 @@ public class ExtendedStatsBucketPipelineAggregator extends BucketMetricsPipeline
this.sigma = sigma;
}
ExtendedStatsBucketPipelineAggregator() {
// For Serialization
/**
* Read from a stream.
*/
public ExtendedStatsBucketPipelineAggregator(StreamInput in) throws IOException {
super(in);
sigma = in.readDouble();
}
@Override
public Type type() {
return TYPE;
protected void innerWriteTo(StreamOutput out) throws IOException {
out.writeDouble(sigma);
}
@Override
public String getWriteableName() {
return ExtendedStatsBucketPipelineAggregationBuilder.NAME;
}
@Override
@ -95,14 +85,4 @@ public class ExtendedStatsBucketPipelineAggregator extends BucketMetricsPipeline
protected InternalAggregation buildAggregation(List<PipelineAggregator> pipelineAggregators, Map<String, Object> metadata) {
return new InternalExtendedStatsBucket(name(), count, sum, min, max, sumOfSqrs, sigma, format, pipelineAggregators, metadata);
}
@Override
protected void innerReadFrom(StreamInput in) throws IOException {
sigma = in.readDouble();
}
@Override
protected void innerWriteTo(StreamOutput out) throws IOException {
out.writeDouble(sigma);
}
}

View File

@ -21,7 +21,6 @@ package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.exten
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.stats.extended.InternalExtendedStats;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@ -31,35 +30,22 @@ import java.util.List;
import java.util.Map;
public class InternalExtendedStatsBucket extends InternalExtendedStats implements ExtendedStatsBucket {
public final static Type TYPE = new Type("extended_stats_bucket");
public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@Override
public InternalExtendedStatsBucket readResult(StreamInput in) throws IOException {
InternalExtendedStatsBucket result = new InternalExtendedStatsBucket();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
InternalExtendedStatsBucket(String name, long count, double sum, double min, double max, double sumOfSqrs, double sigma,
DocValueFormat formatter, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) {
super(name, count, sum, min, max, sumOfSqrs, sigma, formatter, pipelineAggregators, metaData);
}
InternalExtendedStatsBucket() {
// for serialization
/**
* Read from a stream.
*/
public InternalExtendedStatsBucket(StreamInput in) throws IOException {
super(in);
}
@Override
public Type type() {
return TYPE;
public String getWriteableName() {
return ExtendedStatsBucketPipelineAggregationBuilder.NAME;
}
@Override

View File

@ -224,9 +224,14 @@ public class QuerySearchResult extends QuerySearchResultProvider {
int size = in.readVInt();
List<SiblingPipelineAggregator> pipelineAggregators = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
BytesReference type = in.readBytesReference();
PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in);
pipelineAggregators.add((SiblingPipelineAggregator) pipelineAggregator);
// NORELEASE temporary hack to support old style streams and new style NamedWriteable at the same time
if (in.readBoolean()) {
pipelineAggregators.add((SiblingPipelineAggregator) in.readNamedWriteable(PipelineAggregator.class));
} else {
BytesReference type = in.readBytesReference();
PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in);
pipelineAggregators.add((SiblingPipelineAggregator) pipelineAggregator);
}
}
this.pipelineAggregators = pipelineAggregators;
}
@ -273,8 +278,16 @@ public class QuerySearchResult extends QuerySearchResultProvider {
out.writeBoolean(true);
out.writeVInt(pipelineAggregators.size());
for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
out.writeBytesReference(pipelineAggregator.type().stream());
pipelineAggregator.writeTo(out);
// NORELEASE temporary hack to support old style streams and new style NamedWriteable
try {
pipelineAggregator.getWriteableName(); // Throws UnsupportedOperationException if we should use old style streams.
out.writeBoolean(true);
out.writeNamedWriteable(pipelineAggregator);
} catch (UnsupportedOperationException e) {
out.writeBoolean(false);
out.writeBytesReference(pipelineAggregator.type().stream());
pipelineAggregator.writeTo(out);
}
}
}
if (suggest == null) {

View File

@ -22,7 +22,6 @@ package org.elasticsearch.search.aggregations.pipeline.bucketmetrics;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.ExtendedStatsBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extended.ExtendedStatsBucketPipelineAggregationBuilder;
import static org.hamcrest.Matchers.equalTo;
@ -51,7 +50,7 @@ public class ExtendedStatsBucketTests extends AbstractBucketMetricsTestCase<Exte
parser.nextToken(); // skip object start
ExtendedStatsBucketPipelineAggregationBuilder builder = (ExtendedStatsBucketPipelineAggregationBuilder) aggParsers
.pipelineParser(ExtendedStatsBucketPipelineAggregator.TYPE.name(), parseFieldMatcher)
.pipelineParser(ExtendedStatsBucketPipelineAggregationBuilder.NAME, parseFieldMatcher)
.parse("test", parseContext);
assertThat(builder.sigma(), equalTo(5.0));