Migrate value_count, percentiles, and percentile_ranks aggregations to NamedWriteable

These are the first aggregations with multiple `InternalAggregation`s
backing the same `AggregationBuilder`. This required a change in the
register method's signature.
This commit is contained in:
Nik Everett 2016-06-30 15:54:09 -04:00
parent f30a70c51f
commit de71a9abb3
11 changed files with 214 additions and 250 deletions

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ParseFieldRegistry;
@ -93,6 +94,7 @@ import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.search.action.SearchTransportService;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.Aggregator.Parser;
import org.elasticsearch.search.aggregations.AggregatorParsers;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
@ -259,8 +261,10 @@ import org.elasticsearch.search.suggest.Suggester;
import org.elasticsearch.search.suggest.Suggesters;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
/**
*
@ -417,23 +421,56 @@ public class SearchModule extends AbstractModule {
return movingAverageModelParserRegistry;
}
/**
* Register an aggregation.
*/
public void registerAggregation(AggregationSpec spec) {
if (false == transportClient) {
namedWriteableRegistry.register(AggregationBuilder.class, spec.aggregationName.getPreferredName(), spec.builderReader);
aggregationParserRegistry.register(spec.aggregationParser, spec.aggregationName);
}
for (Map.Entry<String, Writeable.Reader<? extends InternalAggregation>> t : spec.internalReaders.entrySet()) {
String writeableName = t.getKey();
Writeable.Reader<? extends InternalAggregation> internalReader = t.getValue();
namedWriteableRegistry.register(InternalAggregation.class, writeableName, internalReader);
}
}
public static class AggregationSpec {
private final Map<String, Writeable.Reader<? extends InternalAggregation>> internalReaders = new TreeMap<>();
private final Writeable.Reader<? extends AggregationBuilder> builderReader;
private final Aggregator.Parser aggregationParser;
private final ParseField aggregationName;
/**
* Register an aggregation.
*
* @param builderReader reads the {@link AggregationBuilder} from a stream
* @param internalReader reads the {@link InternalAggregation} from a stream
* @param aggregationParser reads the aggregation builder from XContent
* @param aggregationName names by which the aggregation may be parsed. The first name is special because it is the name that the reader
* is registered under.
* @param aggregationName names by which the aggregation may be parsed. The first name is special because it is the name that the
* reader is registered under.
*/
public void registerAggregation(Writeable.Reader<? extends AggregationBuilder> builderReader,
Writeable.Reader<? extends InternalAggregation> internalReader, Aggregator.Parser aggregationParser,
ParseField aggregationName) {
if (false == transportClient) {
namedWriteableRegistry.register(AggregationBuilder.class, aggregationName.getPreferredName(), builderReader);
aggregationParserRegistry.register(aggregationParser, aggregationName);
public AggregationSpec(Reader<? extends AggregationBuilder> builderReader, Parser aggregationParser, ParseField aggregationName) {
this.builderReader = builderReader;
this.aggregationParser = aggregationParser;
this.aggregationName = aggregationName;
}
/**
* Add a reader for the shard level results of the aggregation with {@linkplain aggregationName}'s
* {@link ParseField#getPreferredName()} as the {@link NamedWriteable#getWriteableName()}.
*/
public AggregationSpec addResultReader(Writeable.Reader<? extends InternalAggregation> resultReader) {
return addResultReader(aggregationName.getPreferredName(), resultReader);
}
/**
* Add a reader for the shard level results of the aggregation.
*/
public AggregationSpec addResultReader(String writeableName, Writeable.Reader<? extends InternalAggregation> resultReader) {
internalReaders.put(writeableName, resultReader);
return this;
}
namedWriteableRegistry.register(InternalAggregation.class, aggregationName.getPreferredName(), internalReader);
}
public void registerAggregation(Writeable.Reader<? extends AggregationBuilder> builderReader, Aggregator.Parser aggregationParser,
@ -488,19 +525,28 @@ public class SearchModule extends AbstractModule {
}
private void registerBuiltinAggregations() {
registerAggregation(AvgAggregationBuilder::new, InternalAvg::new, new AvgParser(), AvgAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(SumAggregationBuilder::new, InternalSum::new, new SumParser(), SumAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(MinAggregationBuilder::new, InternalMin::new, new MinParser(), MinAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(MaxAggregationBuilder::new, InternalMax::new, new MaxParser(), MaxAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(StatsAggregationBuilder::new, InternalStats::new, new StatsParser(),
StatsAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(ExtendedStatsAggregationBuilder::new, InternalExtendedStats::new, new ExtendedStatsParser(),
ExtendedStatsAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(ValueCountAggregationBuilder::new, new ValueCountParser(), ValueCountAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(PercentilesAggregationBuilder::new, new PercentilesParser(),
PercentilesAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(PercentileRanksAggregationBuilder::new, new PercentileRanksParser(),
PercentileRanksAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(new AggregationSpec(AvgAggregationBuilder::new, new AvgParser(), AvgAggregationBuilder.AGGREGATION_NAME_FIELD)
.addResultReader(InternalAvg::new));
registerAggregation(new AggregationSpec(SumAggregationBuilder::new, new SumParser(), SumAggregationBuilder.AGGREGATION_NAME_FIELD)
.addResultReader(InternalSum::new));
registerAggregation(new AggregationSpec(MinAggregationBuilder::new, new MinParser(), MinAggregationBuilder.AGGREGATION_NAME_FIELD)
.addResultReader(InternalMin::new));
registerAggregation(new AggregationSpec(MaxAggregationBuilder::new, new MaxParser(), MaxAggregationBuilder.AGGREGATION_NAME_FIELD)
.addResultReader(InternalMax::new));
registerAggregation(new AggregationSpec(StatsAggregationBuilder::new, new StatsParser(),
StatsAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalStats::new));
registerAggregation(new AggregationSpec(ExtendedStatsAggregationBuilder::new, new ExtendedStatsParser(),
ExtendedStatsAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalExtendedStats::new));
registerAggregation(new AggregationSpec(ValueCountAggregationBuilder::new, new ValueCountParser(),
ValueCountAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalValueCount::new));
registerAggregation(new AggregationSpec(PercentilesAggregationBuilder::new, new PercentilesParser(),
PercentilesAggregationBuilder.AGGREGATION_NAME_FIELD)
.addResultReader(InternalTDigestPercentiles.NAME, InternalTDigestPercentiles::new)
.addResultReader(InternalHDRPercentiles.NAME, InternalHDRPercentiles::new));
registerAggregation(new AggregationSpec(PercentileRanksAggregationBuilder::new, new PercentileRanksParser(),
PercentileRanksAggregationBuilder.AGGREGATION_NAME_FIELD)
.addResultReader(InternalTDigestPercentileRanks.NAME, InternalTDigestPercentileRanks::new)
.addResultReader(InternalHDRPercentileRanks.NAME, InternalHDRPercentileRanks::new));
registerAggregation(CardinalityAggregationBuilder::new, new CardinalityParser(),
CardinalityAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(GlobalAggregationBuilder::new, GlobalAggregationBuilder::parse,
@ -721,11 +767,6 @@ public class SearchModule extends AbstractModule {
static {
// calcs
InternalValueCount.registerStreams();
InternalTDigestPercentiles.registerStreams();
InternalTDigestPercentileRanks.registerStreams();
InternalHDRPercentiles.registerStreams();
InternalHDRPercentileRanks.registerStreams();
InternalCardinality.registerStreams();
InternalScriptedMetric.registerStreams();
InternalGeoCentroid.registerStreams();

View File

@ -25,8 +25,8 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.metrics.percentiles.hdr.HDRPercentileRanksAggregatorFactory;
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.InternalTDigestPercentileRanks;
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.TDigestPercentileRanksAggregatorFactory;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValueType;
@ -42,7 +42,8 @@ import java.util.Arrays;
import java.util.Objects;
public class PercentileRanksAggregationBuilder extends LeafOnly<ValuesSource.Numeric, PercentileRanksAggregationBuilder> {
public static final String NAME = InternalTDigestPercentileRanks.TYPE.name();
public static final String NAME = PercentileRanks.TYPE_NAME;
public static final Type TYPE = new Type(NAME);
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
private double[] values;
@ -52,14 +53,14 @@ public class PercentileRanksAggregationBuilder extends LeafOnly<ValuesSource.Num
private boolean keyed = true;
public PercentileRanksAggregationBuilder(String name) {
super(name, InternalTDigestPercentileRanks.TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
super(name, TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
}
/**
* Read from a stream.
*/
public PercentileRanksAggregationBuilder(StreamInput in) throws IOException {
super(in, InternalTDigestPercentileRanks.TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
super(in, TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
values = in.readDoubleArray();
keyed = in.readBoolean();
numberOfSignificantValueDigits = in.readVInt();

View File

@ -25,8 +25,8 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.metrics.percentiles.hdr.HDRPercentilesAggregatorFactory;
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.InternalTDigestPercentiles;
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.TDigestPercentilesAggregatorFactory;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValueType;
@ -42,7 +42,8 @@ import java.util.Arrays;
import java.util.Objects;
public class PercentilesAggregationBuilder extends LeafOnly<ValuesSource.Numeric, PercentilesAggregationBuilder> {
public static final String NAME = InternalTDigestPercentiles.TYPE.name();
public static final String NAME = Percentiles.TYPE_NAME;
public static final Type TYPE = new Type(NAME);
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
private double[] percents = PercentilesParser.DEFAULT_PERCENTS;
@ -52,14 +53,14 @@ public class PercentilesAggregationBuilder extends LeafOnly<ValuesSource.Numeric
private boolean keyed = true;
public PercentilesAggregationBuilder(String name) {
super(name, InternalTDigestPercentiles.TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
super(name, TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
}
/**
* Read from a stream.
*/
public PercentilesAggregationBuilder(StreamInput in) throws IOException {
super(in, InternalTDigestPercentiles.TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
super(in, TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
percents = in.readDoubleArray();
keyed = in.readBoolean();
numberOfSignificantValueDigits = in.readVInt();

View File

@ -36,11 +36,9 @@ import java.util.zip.DataFormatException;
abstract class AbstractInternalHDRPercentiles extends InternalNumericMetricsAggregation.MultiValue {
protected double[] keys;
protected DoubleHistogram state;
private boolean keyed;
AbstractInternalHDRPercentiles() {} // for serialization
protected final double[] keys;
protected final DoubleHistogram state;
private final boolean keyed;
public AbstractInternalHDRPercentiles(String name, double[] keys, DoubleHistogram state, boolean keyed, DocValueFormat format,
List<PipelineAggregator> pipelineAggregators,
@ -52,6 +50,38 @@ abstract class AbstractInternalHDRPercentiles extends InternalNumericMetricsAggr
this.format = format;
}
/**
* Read from a stream.
*/
protected AbstractInternalHDRPercentiles(StreamInput in) throws IOException {
super(in);
format = in.readNamedWriteable(DocValueFormat.class);
keys = in.readDoubleArray();
long minBarForHighestToLowestValueRatio = in.readLong();
final int serializedLen = in.readVInt();
byte[] bytes = new byte[serializedLen];
in.readBytes(bytes, 0, serializedLen);
ByteBuffer stateBuffer = ByteBuffer.wrap(bytes);
try {
state = DoubleHistogram.decodeFromCompressedByteBuffer(stateBuffer, minBarForHighestToLowestValueRatio);
} catch (DataFormatException e) {
throw new IOException("Failed to decode DoubleHistogram for aggregation [" + name + "]", e);
}
keyed = in.readBoolean();
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(format);
out.writeDoubleArray(keys);
out.writeLong(state.getHighestToLowestValueRatio());
ByteBuffer stateBuffer = ByteBuffer.allocate(state.getNeededByteBufferCapacity());
final int serializedLen = state.encodeIntoCompressedByteBuffer(stateBuffer);
out.writeVInt(serializedLen);
out.writeBytes(stateBuffer.array(), 0, serializedLen);
out.writeBoolean(keyed);
}
@Override
public double value(String name) {
return value(Double.parseDouble(name));
@ -80,41 +110,6 @@ abstract class AbstractInternalHDRPercentiles extends InternalNumericMetricsAggr
protected abstract AbstractInternalHDRPercentiles createReduced(String name, double[] keys, DoubleHistogram merged, boolean keyed,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData);
@Override
protected void doReadFrom(StreamInput in) throws IOException {
format = in.readNamedWriteable(DocValueFormat.class);
keys = new double[in.readInt()];
for (int i = 0; i < keys.length; ++i) {
keys[i] = in.readDouble();
}
long minBarForHighestToLowestValueRatio = in.readLong();
final int serializedLen = in.readVInt();
byte[] bytes = new byte[serializedLen];
in.readBytes(bytes, 0, serializedLen);
ByteBuffer stateBuffer = ByteBuffer.wrap(bytes);
try {
state = DoubleHistogram.decodeFromCompressedByteBuffer(stateBuffer, minBarForHighestToLowestValueRatio);
} catch (DataFormatException e) {
throw new IOException("Failed to decode DoubleHistogram for aggregation [" + name + "]", e);
}
keyed = in.readBoolean();
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(format);
out.writeInt(keys.length);
for (int i = 0 ; i < keys.length; ++i) {
out.writeDouble(keys[i]);
}
out.writeLong(state.getHighestToLowestValueRatio());
ByteBuffer stateBuffer = ByteBuffer.allocate(state.getNeededByteBufferCapacity());
final int serializedLen = state.encodeIntoCompressedByteBuffer(stateBuffer);
out.writeVInt(serializedLen);
out.writeBytes(stateBuffer.array(), 0, serializedLen);
out.writeBoolean(keyed);
}
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
if (keyed) {

View File

@ -21,7 +21,6 @@ package org.elasticsearch.search.aggregations.metrics.percentiles.hdr;
import org.HdrHistogram.DoubleHistogram;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.metrics.percentiles.InternalPercentile;
import org.elasticsearch.search.aggregations.metrics.percentiles.Percentile;
import org.elasticsearch.search.aggregations.metrics.percentiles.PercentileRanks;
@ -32,34 +31,26 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
*
*/
public class InternalHDRPercentileRanks extends AbstractInternalHDRPercentiles implements PercentileRanks {
public static final Type TYPE = new Type(PercentileRanks.TYPE_NAME, "hdr_percentile_ranks");
public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@Override
public InternalHDRPercentileRanks readResult(StreamInput in) throws IOException {
InternalHDRPercentileRanks result = new InternalHDRPercentileRanks();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
InternalHDRPercentileRanks() {
} // for serialization
public static final String NAME = "hdr_percentile_ranks";
public InternalHDRPercentileRanks(String name, double[] cdfValues, DoubleHistogram state, boolean keyed, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, cdfValues, state, keyed, formatter, pipelineAggregators, metaData);
}
/**
* Read from a stream.
*/
public InternalHDRPercentileRanks(StreamInput in) throws IOException {
super(in);
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public Iterator<Percentile> iterator() {
return new Iter(keys, state);
@ -86,11 +77,6 @@ public class InternalHDRPercentileRanks extends AbstractInternalHDRPercentiles i
return new InternalHDRPercentileRanks(name, keys, merged, keyed, format, pipelineAggregators, metaData);
}
@Override
public Type type() {
return TYPE;
}
static double percentileRank(DoubleHistogram state, double value) {
if (state.getTotalCount() == 0) {
return Double.NaN;

View File

@ -21,7 +21,6 @@ package org.elasticsearch.search.aggregations.metrics.percentiles.hdr;
import org.HdrHistogram.DoubleHistogram;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.metrics.percentiles.InternalPercentile;
import org.elasticsearch.search.aggregations.metrics.percentiles.Percentile;
import org.elasticsearch.search.aggregations.metrics.percentiles.Percentiles;
@ -32,34 +31,26 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
*
*/
public class InternalHDRPercentiles extends AbstractInternalHDRPercentiles implements Percentiles {
public static final Type TYPE = new Type(Percentiles.TYPE_NAME, "hdr_percentiles");
public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@Override
public InternalHDRPercentiles readResult(StreamInput in) throws IOException {
InternalHDRPercentiles result = new InternalHDRPercentiles();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
InternalHDRPercentiles() {
} // for serialization
public static final String NAME = "hdr_percentiles";
public InternalHDRPercentiles(String name, double[] percents, DoubleHistogram state, boolean keyed, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, percents, state, keyed, formatter, pipelineAggregators, metaData);
}
/**
* Read from a stream.
*/
public InternalHDRPercentiles(StreamInput in) throws IOException {
super(in);
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public Iterator<Percentile> iterator() {
return new Iter(keys, state);
@ -89,11 +80,6 @@ public class InternalHDRPercentiles extends AbstractInternalHDRPercentiles imple
return new InternalHDRPercentiles(name, keys, merged, keyed, format, pipelineAggregators, metaData);
}
@Override
public Type type() {
return TYPE;
}
public static class Iter implements Iterator<Percentile> {
private final double[] percents;

View File

@ -33,11 +33,9 @@ import java.util.Map;
abstract class AbstractInternalTDigestPercentiles extends InternalNumericMetricsAggregation.MultiValue {
protected double[] keys;
protected TDigestState state;
private boolean keyed;
AbstractInternalTDigestPercentiles() {} // for serialization
protected final double[] keys;
protected final TDigestState state;
private final boolean keyed;
public AbstractInternalTDigestPercentiles(String name, double[] keys, TDigestState state, boolean keyed, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators,
@ -49,6 +47,25 @@ abstract class AbstractInternalTDigestPercentiles extends InternalNumericMetrics
this.format = formatter;
}
/**
* Read from a stream.
*/
protected AbstractInternalTDigestPercentiles(StreamInput in) throws IOException {
super(in);
format = in.readNamedWriteable(DocValueFormat.class);
keys = in.readDoubleArray();
state = TDigestState.read(in);
keyed = in.readBoolean();
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(format);
out.writeDoubleArray(keys);
TDigestState.write(state, out);
out.writeBoolean(keyed);
}
@Override
public double value(String name) {
return value(Double.parseDouble(name));
@ -76,28 +93,6 @@ abstract class AbstractInternalTDigestPercentiles extends InternalNumericMetrics
protected abstract AbstractInternalTDigestPercentiles createReduced(String name, double[] keys, TDigestState merged, boolean keyed,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData);
@Override
protected void doReadFrom(StreamInput in) throws IOException {
format = in.readNamedWriteable(DocValueFormat.class);
keys = new double[in.readInt()];
for (int i = 0; i < keys.length; ++i) {
keys[i] = in.readDouble();
}
state = TDigestState.read(in);
keyed = in.readBoolean();
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(format);
out.writeInt(keys.length);
for (int i = 0 ; i < keys.length; ++i) {
out.writeDouble(keys[i]);
}
TDigestState.write(state, out);
out.writeBoolean(keyed);
}
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
if (keyed) {

View File

@ -20,7 +20,6 @@ package org.elasticsearch.search.aggregations.metrics.percentiles.tdigest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.metrics.percentiles.InternalPercentile;
import org.elasticsearch.search.aggregations.metrics.percentiles.Percentile;
import org.elasticsearch.search.aggregations.metrics.percentiles.PercentileRanks;
@ -31,33 +30,26 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
*
*/
public class InternalTDigestPercentileRanks extends AbstractInternalTDigestPercentiles implements PercentileRanks {
public static final Type TYPE = new Type(PercentileRanks.TYPE_NAME, "t_digest_percentile_ranks");
public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@Override
public InternalTDigestPercentileRanks readResult(StreamInput in) throws IOException {
InternalTDigestPercentileRanks result = new InternalTDigestPercentileRanks();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
InternalTDigestPercentileRanks() {} // for serialization
public static final String NAME = "tdigest_percentile_ranks";
public InternalTDigestPercentileRanks(String name, double[] cdfValues, TDigestState state, boolean keyed, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, cdfValues, state, keyed, formatter, pipelineAggregators, metaData);
}
/**
* Read from a stream.
*/
public InternalTDigestPercentileRanks(StreamInput in) throws IOException {
super(in);
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public Iterator<Percentile> iterator() {
return new Iter(keys, state);
@ -84,11 +76,6 @@ public class InternalTDigestPercentileRanks extends AbstractInternalTDigestPerce
return new InternalTDigestPercentileRanks(name, keys, merged, keyed, format, pipelineAggregators, metaData);
}
@Override
public Type type() {
return TYPE;
}
static double percentileRank(TDigestState state, double value) {
double percentileRank = state.cdf(value);
if (percentileRank < 0) {

View File

@ -20,7 +20,6 @@ package org.elasticsearch.search.aggregations.metrics.percentiles.tdigest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.metrics.percentiles.InternalPercentile;
import org.elasticsearch.search.aggregations.metrics.percentiles.Percentile;
import org.elasticsearch.search.aggregations.metrics.percentiles.Percentiles;
@ -31,34 +30,26 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
*
*/
public class InternalTDigestPercentiles extends AbstractInternalTDigestPercentiles implements Percentiles {
public static final Type TYPE = new Type(Percentiles.TYPE_NAME, "t_digest_percentiles");
public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@Override
public InternalTDigestPercentiles readResult(StreamInput in) throws IOException {
InternalTDigestPercentiles result = new InternalTDigestPercentiles();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
InternalTDigestPercentiles() {
} // for serialization
public static final String NAME = "tdigest_percentiles";
public InternalTDigestPercentiles(String name, double[] percents, TDigestState state, boolean keyed, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, percents, state, keyed, formatter, pipelineAggregators, metaData);
}
/**
* Read from a stream.
*/
public InternalTDigestPercentiles(StreamInput in) throws IOException {
super(in);
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public Iterator<Percentile> iterator() {
return new Iter(keys, state);
@ -85,11 +76,6 @@ public class InternalTDigestPercentiles extends AbstractInternalTDigestPercentil
return new InternalTDigestPercentiles(name, keys, merged, keyed, format, pipelineAggregators, metaData);
}
@Override
public Type type() {
return TYPE;
}
public static class Iter implements Iterator<Percentile> {
private final double[] percents;

View File

@ -21,7 +21,6 @@ package org.elasticsearch.search.aggregations.metrics.valuecount;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@ -34,25 +33,7 @@ import java.util.Map;
* An internal implementation of {@link ValueCount}.
*/
public class InternalValueCount extends InternalNumericMetricsAggregation.SingleValue implements ValueCount {
public static final Type TYPE = new Type("value_count", "vcount");
private static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@Override
public InternalValueCount readResult(StreamInput in) throws IOException {
InternalValueCount count = new InternalValueCount();
count.readFrom(in);
return count;
}
};
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
private long value;
InternalValueCount() {} // for serialization
private final long value;
public InternalValueCount(String name, long value, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) {
@ -60,6 +41,24 @@ public class InternalValueCount extends InternalNumericMetricsAggregation.Single
this.value = value;
}
/**
* Read from a stream.
*/
public InternalValueCount(StreamInput in) throws IOException {
super(in);
value = in.readVLong();
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeVLong(value);
}
@Override
public String getWriteableName() {
return ValueCountAggregationBuilder.NAME;
}
@Override
public long getValue() {
return value;
@ -70,11 +69,6 @@ public class InternalValueCount extends InternalNumericMetricsAggregation.Single
return value;
}
@Override
public Type type() {
return TYPE;
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
long valueCount = 0;
@ -84,16 +78,6 @@ public class InternalValueCount extends InternalNumericMetricsAggregation.Single
return new InternalValueCount(name, valueCount, pipelineAggregators(), getMetaData());
}
@Override
protected void doReadFrom(StreamInput in) throws IOException {
value = in.readVLong();
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeVLong(value);
}
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(CommonFields.VALUE, value);

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
@ -35,18 +36,19 @@ import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import java.io.IOException;
public class ValueCountAggregationBuilder extends ValuesSourceAggregationBuilder.LeafOnly<ValuesSource, ValueCountAggregationBuilder> {
public static final String NAME = InternalValueCount.TYPE.name();
public static final String NAME = "value_count";
public static final Type TYPE = new Type(NAME);
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
public ValueCountAggregationBuilder(String name, ValueType targetValueType) {
super(name, InternalValueCount.TYPE, ValuesSourceType.ANY, targetValueType);
super(name, TYPE, ValuesSourceType.ANY, targetValueType);
}
/**
* Read from a stream.
*/
public ValueCountAggregationBuilder(StreamInput in) throws IOException {
super(in, InternalValueCount.TYPE, ValuesSourceType.ANY);
super(in, TYPE, ValuesSourceType.ANY);
}
@Override