Migrate global, filter, and filters aggs to NamedWriteable

Once all of these are migrated we'll be able to remove aggregation's
custom "streams" which function that same as NamedWriteable. It also
allows us to make most of the fields on aggregations final which is
rather nice.

Also starts to migrate MultiBucketAggregation.Bucket to Writeable,
allowing the buckets to have immutable parts.
This commit is contained in:
Nik Everett 2016-07-01 15:58:24 -04:00
parent c02de9227c
commit a728c4bb5c
10 changed files with 127 additions and 167 deletions

View File

@ -549,13 +549,13 @@ public class SearchModule extends AbstractModule {
.addResultReader(InternalHDRPercentileRanks.NAME, InternalHDRPercentileRanks::new));
registerAggregation(new AggregationSpec(CardinalityAggregationBuilder::new, new CardinalityParser(),
CardinalityAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalCardinality::new));
registerAggregation(GlobalAggregationBuilder::new, GlobalAggregationBuilder::parse,
GlobalAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(new AggregationSpec(GlobalAggregationBuilder::new, GlobalAggregationBuilder::parse,
GlobalAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalGlobal::new));
registerAggregation(MissingAggregationBuilder::new, new MissingParser(), MissingAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(FilterAggregationBuilder::new, FilterAggregationBuilder::parse,
FilterAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(FiltersAggregationBuilder::new, FiltersAggregationBuilder::parse,
FiltersAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(new AggregationSpec(FilterAggregationBuilder::new, FilterAggregationBuilder::parse,
FilterAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalFilter::new));
registerAggregation(new AggregationSpec(FiltersAggregationBuilder::new, FiltersAggregationBuilder::parse,
FiltersAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalFilters::new));
registerAggregation(SamplerAggregationBuilder::new, SamplerAggregationBuilder::parse,
SamplerAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(DiversifiedAggregationBuilder::new, new DiversifiedSamplerParser(),
@ -767,9 +767,6 @@ public class SearchModule extends AbstractModule {
static {
// buckets
InternalGlobal.registerStreams();
InternalFilter.registerStreams();
InternalFilters.registerStream();
InternalSampler.registerStreams();
UnmappedSampler.registerStreams();
InternalMissing.registerStreams();

View File

@ -19,9 +19,11 @@
package org.elasticsearch.search.aggregations;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@ -35,6 +37,13 @@ public abstract class InternalMultiBucketAggregation<A extends InternalMultiBuck
super(name, pipelineAggregators, metaData);
}
/**
* Read from a stream.
*/
protected InternalMultiBucketAggregation(StreamInput in) throws IOException {
super(in);
}
/**
* Create a new copy of this {@link Aggregation} with the same settings as
* this {@link Aggregation} and contains the provided buckets.

View File

@ -54,6 +54,27 @@ public abstract class InternalSingleBucketAggregation extends InternalAggregatio
this.aggregations = aggregations;
}
/**
* Read from a stream.
*/
protected InternalSingleBucketAggregation(StreamInput in) throws IOException {
super(in);
docCount = in.readVLong();
aggregations = InternalAggregations.readAggregations(in);
}
@Override
protected void doReadFrom(StreamInput in) throws IOException {
docCount = in.readVLong();
aggregations = InternalAggregations.readAggregations(in);
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeVLong(docCount);
aggregations.writeTo(out);
}
@Override
public long getDocCount() {
return docCount;
@ -114,18 +135,6 @@ public abstract class InternalSingleBucketAggregation extends InternalAggregatio
}
}
@Override
protected void doReadFrom(StreamInput in) throws IOException {
docCount = in.readVLong();
aggregations = InternalAggregations.readAggregations(in);
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeVLong(docCount);
aggregations.writeTo(out);
}
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(CommonFields.DOC_COUNT, docCount);

View File

@ -19,7 +19,9 @@
package org.elasticsearch.search.aggregations.bucket;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.Comparators;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.search.aggregations.Aggregation;
@ -27,19 +29,19 @@ import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.HasAggregations;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import java.io.IOException;
import java.util.List;
/**
* An aggregation that returns multiple buckets
*/
public interface MultiBucketsAggregation extends Aggregation {
/**
* A bucket represents a criteria to which all documents that fall in it adhere to. It is also uniquely identified
* by a key, and can potentially hold sub-aggregations computed over all documents in it.
*/
public interface Bucket extends HasAggregations, ToXContent, Streamable {
public interface Bucket extends HasAggregations, ToXContent, Streamable, Writeable {
// NORELEASE remove Streamable
/**
* @return The key associated with the bucket
@ -64,6 +66,12 @@ public interface MultiBucketsAggregation extends Aggregation {
Object getProperty(String containingAggName, List<String> path);
@Override
default void readFrom(StreamInput in) throws IOException {
// NORELEASE remove me when no Buckets override it
throw new UnsupportedOperationException("Prefer the Writeable interface");
}
static class SubAggregationComparator<B extends Bucket> implements java.util.Comparator<B> {
private final AggregationPath path;

View File

@ -29,13 +29,15 @@ import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import java.io.IOException;
import java.util.Objects;
public class FilterAggregationBuilder extends AbstractAggregationBuilder<FilterAggregationBuilder> {
public static final String NAME = InternalFilter.TYPE.name();
public static final String NAME = "filter";
private static final Type TYPE = new Type(NAME);
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
private final QueryBuilder filter;
@ -49,7 +51,7 @@ public class FilterAggregationBuilder extends AbstractAggregationBuilder<FilterA
* {@link Filter} aggregation.
*/
public FilterAggregationBuilder(String name, QueryBuilder filter) {
super(name, InternalFilter.TYPE);
super(name, TYPE);
if (filter == null) {
throw new IllegalArgumentException("[filter] must not be null: [" + name + "]");
}
@ -60,7 +62,7 @@ public class FilterAggregationBuilder extends AbstractAggregationBuilder<FilterA
* Read from a stream.
*/
public FilterAggregationBuilder(StreamInput in) throws IOException {
super(in, InternalFilter.TYPE);
super(in, TYPE);
filter = in.readNamedWriteable(QueryBuilder.class);
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.search.aggregations.bucket.filter;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@ -28,35 +27,21 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
*
*/
public class InternalFilter extends InternalSingleBucketAggregation implements Filter {
public static final Type TYPE = new Type("filter");
public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@Override
public InternalFilter readResult(StreamInput in) throws IOException {
InternalFilter result = new InternalFilter();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
InternalFilter() {} // for serialization
InternalFilter(String name, long docCount, InternalAggregations subAggregations, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, docCount, subAggregations, pipelineAggregators, metaData);
}
/**
* Stream from a stream.
*/
public InternalFilter(StreamInput in) throws IOException {
super(in);
}
@Override
public Type type() {
return TYPE;
public String getWriteableName() {
return FilterAggregationBuilder.NAME;
}
@Override

View File

@ -29,6 +29,7 @@ import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.bucket.filters.FiltersAggregator.KeyedFilter;
import org.elasticsearch.search.aggregations.support.AggregationContext;
@ -43,7 +44,8 @@ import java.util.Objects;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
public class FiltersAggregationBuilder extends AbstractAggregationBuilder<FiltersAggregationBuilder> {
public static final String NAME = InternalFilters.TYPE.name();
public static final String NAME = "filters";
private static final Type TYPE = new Type(NAME);
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
private static final ParseField FILTERS_FIELD = new ParseField("filters");
@ -66,7 +68,7 @@ public class FiltersAggregationBuilder extends AbstractAggregationBuilder<Filter
}
private FiltersAggregationBuilder(String name, List<KeyedFilter> filters) {
super(name, InternalFilters.TYPE);
super(name, TYPE);
// internally we want to have a fixed order of filters, regardless of the order of the filters in the request
this.filters = new ArrayList<>(filters);
Collections.sort(this.filters, (KeyedFilter kf1, KeyedFilter kf2) -> kf1.key().compareTo(kf2.key()));
@ -80,7 +82,7 @@ public class FiltersAggregationBuilder extends AbstractAggregationBuilder<Filter
* the filters to use with this aggregation
*/
public FiltersAggregationBuilder(String name, QueryBuilder... filters) {
super(name, InternalFilters.TYPE);
super(name, TYPE);
List<KeyedFilter> keyedFilters = new ArrayList<>(filters.length);
for (int i = 0; i < filters.length; i++) {
keyedFilters.add(new KeyedFilter(String.valueOf(i), filters[i]));
@ -93,7 +95,7 @@ public class FiltersAggregationBuilder extends AbstractAggregationBuilder<Filter
* Read from a stream.
*/
public FiltersAggregationBuilder(StreamInput in) throws IOException {
super(in, InternalFilters.TYPE);
super(in, TYPE);
keyed = in.readBoolean();
int filtersSize = in.readVInt();
filters = new ArrayList<>(filtersSize);

View File

@ -22,13 +22,10 @@ package org.elasticsearch.search.aggregations.bucket.filters;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.BucketStreamContext;
import org.elasticsearch.search.aggregations.bucket.BucketStreams;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import java.io.IOException;
@ -37,55 +34,14 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
*
*/
public class InternalFilters extends InternalMultiBucketAggregation<InternalFilters, InternalFilters.InternalBucket> implements Filters {
public static final Type TYPE = new Type("filters");
private static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@Override
public InternalFilters readResult(StreamInput in) throws IOException {
InternalFilters filters = new InternalFilters();
filters.readFrom(in);
return filters;
}
};
private static final BucketStreams.Stream<InternalBucket> BUCKET_STREAM = new BucketStreams.Stream<InternalBucket>() {
@Override
public InternalBucket readResult(StreamInput in, BucketStreamContext context) throws IOException {
InternalBucket filters = new InternalBucket(context.keyed());
filters.readFrom(in);
return filters;
}
@Override
public BucketStreamContext getBucketStreamContext(InternalBucket bucket) {
BucketStreamContext context = new BucketStreamContext();
context.keyed(bucket.keyed);
return context;
}
};
public static void registerStream() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream());
}
public static class InternalBucket extends InternalMultiBucketAggregation.InternalBucket implements Filters.Bucket {
private final boolean keyed;
private String key;
private final String key;
private long docCount;
InternalAggregations aggregations;
private InternalBucket(boolean keyed) {
// for serialization
this.keyed = keyed;
}
public InternalBucket(String key, long docCount, InternalAggregations aggregations, boolean keyed) {
this.key = key;
this.docCount = docCount;
@ -93,6 +49,23 @@ public class InternalFilters extends InternalMultiBucketAggregation<InternalFilt
this.keyed = keyed;
}
/**
* Read from a stream.
*/
public InternalBucket(StreamInput in, boolean keyed) throws IOException {
this.keyed = keyed;
key = in.readOptionalString();
docCount = in.readVLong();
aggregations = InternalAggregations.readAggregations(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(key);
out.writeVLong(docCount);
aggregations.writeTo(out);
}
@Override
public String getKey() {
return key;
@ -140,27 +113,11 @@ public class InternalFilters extends InternalMultiBucketAggregation<InternalFilt
builder.endObject();
return builder;
}
@Override
public void readFrom(StreamInput in) throws IOException {
key = in.readOptionalString();
docCount = in.readVLong();
aggregations = InternalAggregations.readAggregations(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(key);
out.writeVLong(docCount);
aggregations.writeTo(out);
}
}
private List<InternalBucket> buckets;
private final List<InternalBucket> buckets;
private final boolean keyed;
private Map<String, InternalBucket> bucketMap;
private boolean keyed;
public InternalFilters() {} // for serialization
public InternalFilters(String name, List<InternalBucket> buckets, boolean keyed, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);
@ -168,9 +125,33 @@ public class InternalFilters extends InternalMultiBucketAggregation<InternalFilt
this.keyed = keyed;
}
/**
* Read from a stream.
*/
public InternalFilters(StreamInput in) throws IOException {
super(in);
keyed = in.readBoolean();
int size = in.readVInt();
List<InternalBucket> buckets = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
buckets.add(new InternalBucket(in, keyed));
}
this.buckets = buckets;
this.bucketMap = null;
}
@Override
public Type type() {
return TYPE;
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeBoolean(keyed);
out.writeVInt(buckets.size());
for (InternalBucket bucket : buckets) {
bucket.writeTo(out);
}
}
@Override
public String getWriteableName() {
return FiltersAggregationBuilder.NAME;
}
@Override
@ -226,29 +207,6 @@ public class InternalFilters extends InternalMultiBucketAggregation<InternalFilt
return reduced;
}
@Override
protected void doReadFrom(StreamInput in) throws IOException {
keyed = in.readBoolean();
int size = in.readVInt();
List<InternalBucket> buckets = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
InternalBucket bucket = new InternalBucket(keyed);
bucket.readFrom(in);
buckets.add(bucket);
}
this.buckets = buckets;
this.bucketMap = null;
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeBoolean(keyed);
out.writeVInt(buckets.size());
for (InternalBucket bucket : buckets) {
bucket.writeTo(out);
}
}
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
if (keyed) {

View File

@ -26,24 +26,26 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import java.io.IOException;
public class GlobalAggregationBuilder extends AbstractAggregationBuilder<GlobalAggregationBuilder> {
public static final String NAME = InternalGlobal.TYPE.name();
public static final String NAME = "global";
private static final Type TYPE = new Type(NAME);
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
public GlobalAggregationBuilder(String name) {
super(name, InternalGlobal.TYPE);
super(name, TYPE);
}
/**
* Read from a stream.
*/
public GlobalAggregationBuilder(StreamInput in) throws IOException {
super(in, InternalGlobal.TYPE);
super(in, TYPE);
}
@Override

View File

@ -19,7 +19,6 @@
package org.elasticsearch.search.aggregations.bucket.global;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@ -33,31 +32,20 @@ import java.util.Map;
* regardless the query.
*/
public class InternalGlobal extends InternalSingleBucketAggregation implements Global {
public static final Type TYPE = new Type("global");
public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@Override
public InternalGlobal readResult(StreamInput in) throws IOException {
InternalGlobal result = new InternalGlobal();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
InternalGlobal() {} // for serialization
InternalGlobal(String name, long docCount, InternalAggregations aggregations, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, docCount, aggregations, pipelineAggregators, metaData);
}
/**
* Read from a stream.
*/
public InternalGlobal(StreamInput in) throws IOException {
super(in);
}
@Override
public Type type() {
return TYPE;
public String getWriteableName() {
return GlobalAggregationBuilder.NAME;
}
@Override