Migrate top_hits, histogram, and ip_range aggregations to NamedWriteable

This is just another step towards removing AggregationStreams
in favor of NamedWriteable.
This commit is contained in:
Nik Everett 2016-07-11 17:57:03 -04:00
parent f2978f41b9
commit d14e06ce51
9 changed files with 222 additions and 274 deletions

View File

@ -536,8 +536,11 @@ public class SearchModule extends AbstractModule {
RangeAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalRange::new));
registerAggregation(new AggregationSpec(DateRangeAggregationBuilder::new, new DateRangeParser(),
DateRangeAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalDateRange::new));
registerAggregation(IpRangeAggregationBuilder::new, new IpRangeParser(), IpRangeAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(HistogramAggregationBuilder::new, new HistogramParser(), HistogramAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(
new AggregationSpec(IpRangeAggregationBuilder::new, new IpRangeParser(), IpRangeAggregationBuilder.AGGREGATION_NAME_FIELD)
.addResultReader(InternalBinaryRange::new));
registerAggregation(new AggregationSpec(HistogramAggregationBuilder::new, new HistogramParser(),
HistogramAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalHistogram::new));
registerAggregation(DateHistogramAggregationBuilder::new, new DateHistogramParser(),
DateHistogramAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(new AggregationSpec(GeoDistanceAggregationBuilder::new, new GeoDistanceParser(),
@ -548,8 +551,8 @@ public class SearchModule extends AbstractModule {
NestedAggregationBuilder.AGGREGATION_FIELD_NAME).addResultReader(InternalNested::new));
registerAggregation(new AggregationSpec(ReverseNestedAggregationBuilder::new, ReverseNestedAggregationBuilder::parse,
ReverseNestedAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalReverseNested::new));
registerAggregation(TopHitsAggregationBuilder::new, TopHitsAggregationBuilder::parse,
TopHitsAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(new AggregationSpec(TopHitsAggregationBuilder::new, TopHitsAggregationBuilder::parse,
TopHitsAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalTopHits::new));
registerAggregation(new AggregationSpec(GeoBoundsAggregationBuilder::new, new GeoBoundsParser(),
GeoBoundsAggregationBuilder.AGGREGATION_NAME_FIED).addResultReader(InternalGeoBounds::new));
registerAggregation(new AggregationSpec(GeoCentroidAggregationBuilder::new, new GeoCentroidParser(),
@ -821,11 +824,6 @@ public class SearchModule extends AbstractModule {
}
static {
// buckets
InternalBinaryRange.registerStream();
InternalHistogram.registerStream();
InternalTopHits.registerStreams();
// Pipeline Aggregations
DerivativePipelineAggregator.registerStreams();
InternalDerivative.registerStreams();

View File

@ -126,7 +126,7 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
private List<PipelineAggregator> pipelineAggregators;
/** Constructs an un initialized addAggregation (used for serialization) **/
protected InternalAggregation() {}
protected InternalAggregation() {} // NORELEASE remove when removing Streamable
/**
* Constructs an get with a given name.

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.search.aggregations.bucket.histogram;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
@ -26,8 +27,10 @@ import org.elasticsearch.search.aggregations.support.ValueType;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.io.IOException;
/**
*
* Results of a date_historgram aggregation.
*/
public class InternalDateHistogram {
@ -35,16 +38,18 @@ public class InternalDateHistogram {
static final Type TYPE = new Type("date_histogram", "dhisto");
static class Bucket extends InternalHistogram.Bucket {
Bucket(boolean keyed, DocValueFormat formatter, InternalHistogram.Factory<Bucket> factory) {
super(keyed, formatter, factory);
}
Bucket(long key, long docCount, InternalAggregations aggregations, boolean keyed, DocValueFormat formatter,
InternalHistogram.Factory<Bucket> factory) {
super(key, docCount, keyed, formatter, factory, aggregations);
}
/**
* Read from a stream.
*/
Bucket(StreamInput in, boolean keyed, DocValueFormat formatter, InternalHistogram.Factory<Bucket> factory) throws IOException {
super(in, keyed, formatter, factory);
}
@Override
public String getKeyAsString() {
return format.format(key);
@ -94,8 +99,8 @@ public class InternalDateHistogram {
}
@Override
protected InternalDateHistogram.Bucket createEmptyBucket(boolean keyed, DocValueFormat formatter) {
return new Bucket(keyed, formatter, this);
protected Bucket readBucket(StreamInput in, boolean keyed, DocValueFormat format) throws IOException {
return new Bucket(in, keyed, format, this);
}
}

View File

@ -26,7 +26,6 @@ import org.elasticsearch.common.rounding.Rounding;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
@ -46,50 +45,49 @@ import java.util.Map;
* TODO should be renamed to InternalNumericHistogram (see comment on {@link Histogram})?
*/
public class InternalHistogram<B extends InternalHistogram.Bucket> extends InternalMultiBucketAggregation<InternalHistogram<B>, B>
implements
Histogram {
implements Histogram {
public static final Factory<Bucket> HISTOGRAM_FACTORY = new Factory<Bucket>();
static final Type TYPE = new Type("histogram", "histo");
private static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@SuppressWarnings("rawtypes")
@Override
public InternalHistogram readResult(StreamInput in) throws IOException {
InternalHistogram histogram = new InternalHistogram();
histogram.readFrom(in);
return histogram;
}
};
public static void registerStream() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
static final Type TYPE = new Type("histogram");
public static class Bucket extends InternalMultiBucketAggregation.InternalBucket implements Histogram.Bucket {
long key;
long docCount;
InternalAggregations aggregations;
final long key;
final long docCount;
final InternalAggregations aggregations;
private final transient boolean keyed;
protected final transient DocValueFormat format;
private Factory<?> factory;
private final Factory<?> factory;
public Bucket(boolean keyed, DocValueFormat formatter, Factory<?> factory) {
this.format = formatter;
public Bucket(long key, long docCount, boolean keyed, DocValueFormat format, Factory<?> factory,
InternalAggregations aggregations) {
this.format = format;
this.keyed = keyed;
this.factory = factory;
}
public Bucket(long key, long docCount, boolean keyed, DocValueFormat formatter, Factory<?> factory,
InternalAggregations aggregations) {
this(keyed, formatter, factory);
this.key = key;
this.docCount = docCount;
this.aggregations = aggregations;
}
/**
* Read from a stream.
*/
public Bucket(StreamInput in, boolean keyed, DocValueFormat format, Factory<?> factory) throws IOException {
this.format = format;
this.keyed = keyed;
this.factory = factory;
key = in.readLong();
docCount = in.readVLong();
aggregations = InternalAggregations.readAggregations(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(key);
out.writeVLong(docCount);
aggregations.writeTo(out);
}
protected Factory<?> getFactory() {
return factory;
}
@ -144,20 +142,6 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
return builder;
}
@Override
public void readFrom(StreamInput in) throws IOException {
key = in.readLong();
docCount = in.readVLong();
aggregations = InternalAggregations.readAggregations(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(key);
out.writeVLong(docCount);
aggregations.writeTo(out);
}
public DocValueFormat getFormatter() {
return format;
}
@ -244,21 +228,18 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
}
@SuppressWarnings("unchecked")
protected B createEmptyBucket(boolean keyed, DocValueFormat formatter) {
return (B) new Bucket(keyed, formatter, this);
protected B readBucket(StreamInput in, boolean keyed, DocValueFormat format) throws IOException {
return (B) new Bucket(in, keyed, format, this);
}
}
protected List<B> buckets;
private InternalOrder order;
private DocValueFormat format;
private boolean keyed;
private long minDocCount;
private EmptyBucketInfo emptyBucketInfo;
protected Factory<B> factory;
InternalHistogram() {} // for serialization
private final List<B> buckets;
private final InternalOrder order;
private final DocValueFormat format;
private final boolean keyed;
private final long minDocCount;
private final EmptyBucketInfo emptyBucketInfo;
private final Factory<B> factory;
InternalHistogram(String name, List<B> buckets, InternalOrder order, long minDocCount, EmptyBucketInfo emptyBucketInfo,
DocValueFormat formatter, boolean keyed, Factory<B> factory, List<PipelineAggregator> pipelineAggregators,
@ -274,6 +255,53 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
this.factory = factory;
}
/**
* Stream from a stream.
*/
public InternalHistogram(StreamInput in) throws IOException {
super(in);
factory = resolveFactory(in.readString());
order = InternalOrder.Streams.readOrder(in);
minDocCount = in.readVLong();
if (minDocCount == 0) {
emptyBucketInfo = EmptyBucketInfo.readFrom(in);
} else {
emptyBucketInfo = null;
}
format = in.readNamedWriteable(DocValueFormat.class);
keyed = in.readBoolean();
buckets = in.readList(stream -> factory.readBucket(stream, keyed, format));
}
@SuppressWarnings("unchecked")
protected static <B extends InternalHistogram.Bucket> Factory<B> resolveFactory(String factoryType) {
if (factoryType.equals(InternalDateHistogram.TYPE.name())) {
return (Factory<B>) new InternalDateHistogram.Factory();
} else if (factoryType.equals(TYPE.name())) {
return new Factory<>();
} else {
throw new IllegalStateException("Invalid histogram factory type [" + factoryType + "]");
}
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeString(factory.type().name());
InternalOrder.Streams.writeOrder(order, out);
out.writeVLong(minDocCount);
if (minDocCount == 0) {
EmptyBucketInfo.writeTo(emptyBucketInfo, out);
}
out.writeNamedWriteable(format);
out.writeBoolean(keyed);
out.writeList(buckets);
}
@Override
public String getWriteableName() {
return HistogramAggregationBuilder.NAME;
}
@Override
public Type type() {
return TYPE;
@ -465,53 +493,6 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
getMetaData());
}
@Override
protected void doReadFrom(StreamInput in) throws IOException {
this.factory = resolveFactory(in.readString());
order = InternalOrder.Streams.readOrder(in);
minDocCount = in.readVLong();
if (minDocCount == 0) {
emptyBucketInfo = EmptyBucketInfo.readFrom(in);
}
format = in.readNamedWriteable(DocValueFormat.class);
keyed = in.readBoolean();
int size = in.readVInt();
List<B> buckets = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
B bucket = getFactory().createEmptyBucket(keyed, format);
bucket.readFrom(in);
buckets.add(bucket);
}
this.buckets = buckets;
}
@SuppressWarnings("unchecked")
protected static <B extends InternalHistogram.Bucket> Factory<B> resolveFactory(String factoryType) {
if (factoryType.equals(InternalDateHistogram.TYPE.name())) {
return (Factory<B>) new InternalDateHistogram.Factory();
} else if (factoryType.equals(TYPE.name())) {
return new Factory<>();
} else {
throw new IllegalStateException("Invalid histogram factory type [" + factoryType + "]");
}
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeString(factory.type().name());
InternalOrder.Streams.writeOrder(order, out);
out.writeVLong(minDocCount);
if (minDocCount == 0) {
EmptyBucketInfo.writeTo(emptyBucketInfo, out);
}
out.writeNamedWriteable(format);
out.writeBoolean(keyed);
out.writeVInt(buckets.size());
for (B bucket : buckets) {
bucket.writeTo(out);
}
}
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
if (keyed) {

View File

@ -19,11 +19,14 @@
package org.elasticsearch.search.aggregations.bucket.range;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import static java.util.Collections.emptyList;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.util.BytesRef;
@ -318,19 +321,18 @@ public final class BinaryRangeAggregator extends BucketsAggregator {
@Override
public InternalAggregation buildAggregation(long bucket) throws IOException {
InternalBinaryRange.Bucket[] buckets = new InternalBinaryRange.Bucket[ranges.length];
for (int i = 0; i < buckets.length; ++i) {
List<InternalBinaryRange.Bucket> buckets = new ArrayList<>(ranges.length);
for (int i = 0; i < ranges.length; ++i) {
long bucketOrd = bucket * ranges.length + i;
buckets[i] = new InternalBinaryRange.Bucket(format, keyed,
buckets.add(new InternalBinaryRange.Bucket(format, keyed,
ranges[i].key, ranges[i].from, ranges[i].to,
bucketDocCount(bucketOrd), bucketAggregations(bucketOrd));
bucketDocCount(bucketOrd), bucketAggregations(bucketOrd)));
}
return new InternalBinaryRange(name, format, keyed, buckets, pipelineAggregators(), metaData());
}
@Override
public InternalAggregation buildEmptyAggregation() {
return new InternalBinaryRange(name, format, keyed, new InternalBinaryRange.Bucket[0], pipelineAggregators(), metaData());
return new InternalBinaryRange(name, format, keyed, emptyList(), pipelineAggregators(), metaData());
}
}

View File

@ -23,50 +23,38 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.range.ip.IpRangeAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static java.util.Collections.unmodifiableList;
/** A range aggregation for data that is encoded in doc values using a binary representation. */
public final class InternalBinaryRange
extends InternalMultiBucketAggregation<InternalBinaryRange, InternalBinaryRange.Bucket>
implements Range {
public static final Type TYPE = new Type("binary_range");
private static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@Override
public InternalBinaryRange readResult(StreamInput in) throws IOException {
InternalBinaryRange range = new InternalBinaryRange();
range.readFrom(in);
return range;
}
};
public static void registerStream() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
public static class Bucket extends InternalMultiBucketAggregation.InternalBucket implements Range.Bucket {
private final transient DocValueFormat format;
private final transient boolean keyed;
private String key;
private BytesRef from, to;
private long docCount;
private InternalAggregations aggregations;
private final String key;
private final BytesRef from, to;
private final long docCount;
private final InternalAggregations aggregations;
public Bucket(DocValueFormat format, boolean keyed, String key, BytesRef from, BytesRef to,
long docCount, InternalAggregations aggregations) {
this(format, keyed);
this.format = format;
this.keyed = keyed;
this.key = key;
this.from = from;
this.to = to;
@ -75,9 +63,37 @@ public final class InternalBinaryRange
}
// for serialization
private Bucket(DocValueFormat format, boolean keyed) {
private Bucket(StreamInput in, DocValueFormat format, boolean keyed) throws IOException {
this.format = format;
this.keyed = keyed;
key = in.readOptionalString();
if (in.readBoolean()) {
from = in.readBytesRef();
} else {
from = null;
}
if (in.readBoolean()) {
to = in.readBytesRef();
} else {
to = null;
}
docCount = in.readLong();
aggregations = InternalAggregations.readAggregations(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(key);
out.writeBoolean(from != null);
if (from != null) {
out.writeBytesRef(from);
}
out.writeBoolean(to != null);
if (to != null) {
out.writeBytesRef(to);
}
out.writeLong(docCount);
aggregations.writeTo(out);
}
@Override
@ -130,38 +146,6 @@ public final class InternalBinaryRange
return builder;
}
@Override
public void readFrom(StreamInput in) throws IOException {
key = in.readOptionalString();
if (in.readBoolean()) {
from = in.readBytesRef();
} else {
from = null;
}
if (in.readBoolean()) {
to = in.readBytesRef();
} else {
to = null;
}
docCount = in.readLong();
aggregations = InternalAggregations.readAggregations(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(key);
out.writeBoolean(from != null);
if (from != null) {
out.writeBytesRef(from);
}
out.writeBoolean(to != null);
if (to != null) {
out.writeBytesRef(to);
}
out.writeLong(docCount);
aggregations.writeTo(out);
}
@Override
public Object getFrom() {
return getFromAsString();
@ -184,11 +168,11 @@ public final class InternalBinaryRange
}
private DocValueFormat format;
private boolean keyed;
private Bucket[] buckets;
private final DocValueFormat format;
private final boolean keyed;
private final List<Bucket> buckets;
public InternalBinaryRange(String name, DocValueFormat format, boolean keyed, Bucket[] buckets,
public InternalBinaryRange(String name, DocValueFormat format, boolean keyed, List<Bucket> buckets,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);
this.format = format;
@ -196,17 +180,37 @@ public final class InternalBinaryRange
this.buckets = buckets;
}
private InternalBinaryRange() {} // for serialization
/**
* Read from a stream.
*/
public InternalBinaryRange(StreamInput in) throws IOException {
super(in);
format = in.readNamedWriteable(DocValueFormat.class);
keyed = in.readBoolean();
buckets = in.readList(stream -> new Bucket(stream, format, keyed));
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(format);
out.writeBoolean(keyed);
out.writeList(buckets);
}
@Override
public String getWriteableName() {
return IpRangeAggregationBuilder.NAME;
}
@Override
public List<Range.Bucket> getBuckets() {
return Arrays.asList(buckets);
return unmodifiableList(buckets);
}
@Override
public InternalBinaryRange create(List<Bucket> buckets) {
return new InternalBinaryRange(name, format, keyed, buckets.toArray(new Bucket[0]),
pipelineAggregators(), metaData);
return new InternalBinaryRange(name, format, keyed, buckets, pipelineAggregators(), metaData);
}
@Override
@ -214,34 +218,29 @@ public final class InternalBinaryRange
return new Bucket(format, keyed, prototype.key, prototype.from, prototype.to, prototype.docCount, aggregations);
}
@Override
public Type type() {
return TYPE;
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
long[] docCounts = new long[buckets.length];
InternalAggregations[][] aggs = new InternalAggregations[buckets.length][];
long[] docCounts = new long[buckets.size()];
InternalAggregations[][] aggs = new InternalAggregations[buckets.size()][];
for (int i = 0; i < aggs.length; ++i) {
aggs[i] = new InternalAggregations[aggregations.size()];
}
for (int i = 0; i < aggregations.size(); ++i) {
InternalBinaryRange range = (InternalBinaryRange) aggregations.get(i);
if (range.buckets.length != buckets.length) {
throw new IllegalStateException("Expected " + buckets.length + " buckets, but got " + range.buckets.length);
if (range.buckets.size() != buckets.size()) {
throw new IllegalStateException("Expected [" + buckets.size() + "] buckets, but got [" + range.buckets.size() + "]");
}
for (int j = 0; j < buckets.length; ++j) {
Bucket bucket = range.buckets[j];
for (int j = 0; j < buckets.size(); ++j) {
Bucket bucket = range.buckets.get(j);
docCounts[j] += bucket.docCount;
aggs[j][i] = bucket.aggregations;
}
}
Bucket[] buckets = new Bucket[this.buckets.length];
for (int i = 0; i < buckets.length; ++i) {
Bucket b = this.buckets[i];
buckets[i] = new Bucket(format, keyed, b.key, b.from, b.to, docCounts[i],
InternalAggregations.reduce(Arrays.asList(aggs[i]), reduceContext));
List<Bucket> buckets = new ArrayList<>(this.buckets.size());
for (int i = 0; i < this.buckets.size(); ++i) {
Bucket b = this.buckets.get(i);
buckets.add(new Bucket(format, keyed, b.key, b.from, b.to, docCounts[i],
InternalAggregations.reduce(Arrays.asList(aggs[i]), reduceContext)));
}
return new InternalBinaryRange(name, format, keyed, buckets, pipelineAggregators(), metaData);
}
@ -264,28 +263,4 @@ public final class InternalBinaryRange
}
return builder;
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(format);
out.writeBoolean(keyed);
out.writeVInt(buckets.length);
for (Bucket bucket : buckets) {
bucket.writeTo(out);
}
}
@Override
protected void doReadFrom(StreamInput in) throws IOException {
format = in.readNamedWriteable(DocValueFormat.class);
keyed = in.readBoolean();
Bucket[] buckets = new Bucket[in.readVInt()];
for (int i = 0; i < buckets.length; ++i) {
Bucket bucket = new Bucket(format, keyed);
bucket.readFrom(in);
buckets[i] = bucket;
}
this.buckets = buckets;
}
}

View File

@ -52,7 +52,7 @@ import org.elasticsearch.search.aggregations.support.ValuesSourceType;
public final class IpRangeAggregationBuilder
extends ValuesSourceAggregationBuilder<ValuesSource.Bytes, IpRangeAggregationBuilder> {
private static final String NAME = "ip_range";
public static final String NAME = "ip_range";
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
private static final InternalAggregation.Type TYPE = new InternalAggregation.Type(NAME);

View File

@ -28,7 +28,6 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@ -40,32 +39,14 @@ import java.util.List;
import java.util.Map;
/**
* Results of the {@link TopHitsAggregator}.
*/
public class InternalTopHits extends InternalMetricsAggregation implements TopHits {
public static final InternalAggregation.Type TYPE = new Type("top_hits");
public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@Override
public InternalTopHits readResult(StreamInput in) throws IOException {
InternalTopHits buckets = new InternalTopHits();
buckets.readFrom(in);
return buckets;
}
};
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
private int from;
private int size;
private TopDocs topDocs;
private InternalSearchHits searchHits;
InternalTopHits() {
}
public InternalTopHits(String name, int from, int size, TopDocs topDocs, InternalSearchHits searchHits,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);
@ -75,9 +56,29 @@ public class InternalTopHits extends InternalMetricsAggregation implements TopHi
this.searchHits = searchHits;
}
/**
* Read from a stream.
*/
public InternalTopHits(StreamInput in) throws IOException {
super(in);
from = in.readVInt();
size = in.readVInt();
topDocs = Lucene.readTopDocs(in);
assert topDocs != null;
searchHits = InternalSearchHits.readSearchHits(in);
}
@Override
public Type type() {
return TYPE;
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeVInt(from);
out.writeVInt(size);
Lucene.writeTopDocs(out, topDocs);
searchHits.writeTo(out);
}
@Override
public String getWriteableName() {
return TopHitsAggregationBuilder.NAME;
}
@Override
@ -139,23 +140,6 @@ public class InternalTopHits extends InternalMetricsAggregation implements TopHi
}
}
@Override
protected void doReadFrom(StreamInput in) throws IOException {
from = in.readVInt();
size = in.readVInt();
topDocs = Lucene.readTopDocs(in);
assert topDocs != null;
searchHits = InternalSearchHits.readSearchHits(in);
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeVInt(from);
out.writeVInt(size);
Lucene.writeTopDocs(out, topDocs);
searchHits.writeTo(out);
}
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
searchHits.toXContent(builder, params);

View File

@ -32,7 +32,9 @@ import org.elasticsearch.script.Script;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationInitializationException;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder.ScriptField;
@ -52,7 +54,8 @@ import java.util.Objects;
import java.util.Set;
public class TopHitsAggregationBuilder extends AbstractAggregationBuilder<TopHitsAggregationBuilder> {
public static final String NAME = InternalTopHits.TYPE.name();
public static final String NAME = "top_hits";
private static final InternalAggregation.Type TYPE = new Type(NAME);
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
private int from = 0;
@ -68,14 +71,14 @@ public class TopHitsAggregationBuilder extends AbstractAggregationBuilder<TopHit
private FetchSourceContext fetchSourceContext;
public TopHitsAggregationBuilder(String name) {
super(name, InternalTopHits.TYPE);
super(name, TYPE);
}
/**
* Read from a stream.
*/
public TopHitsAggregationBuilder(StreamInput in) throws IOException {
super(in, InternalTopHits.TYPE);
super(in, TYPE);
explain = in.readBoolean();
fetchSourceContext = in.readOptionalStreamable(FetchSourceContext::new);
if (in.readBoolean()) {