diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketStreamContext.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketStreamContext.java new file mode 100644 index 00000000000..3b3ddda3dc3 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketStreamContext.java @@ -0,0 +1,80 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.search.aggregations.support.format.ValueFormatter; +import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams; + +import java.io.IOException; +import java.util.Map; + +public class BucketStreamContext implements Streamable { + + @Nullable + private ValueFormatter formatter; + private boolean keyed; + private Map attributes; + + public BucketStreamContext() { + } + + public void formatter(@Nullable ValueFormatter formatter) { + this.formatter = formatter; + } + + public ValueFormatter formatter() { + return formatter; + } + + public void keyed(boolean keyed) { + this.keyed = keyed; + } + + public boolean keyed() { + return keyed; + } + + public void attributes(Map attributes) { + this.attributes = attributes; + } + + public Map attributes() { + return attributes; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + formatter = ValueFormatterStreams.readOptional(in); + keyed = in.readBoolean(); + attributes = in.readMap(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + ValueFormatterStreams.writeOptional(formatter, out); + out.writeBoolean(keyed); + out.writeMap(attributes); + } + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketStreams.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketStreams.java new file mode 100644 index 00000000000..aa489e064ed --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketStreams.java @@ -0,0 +1,65 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket; + +import com.google.common.collect.ImmutableMap; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.io.stream.StreamInput; + +import java.io.IOException; + +public class BucketStreams { + + private static ImmutableMap STREAMS = ImmutableMap.of(); + + /** + * A stream that knows how to read a bucket from the input. + */ + public static interface Stream { + B readResult(StreamInput in, BucketStreamContext context) throws IOException; + BucketStreamContext getBucketStreamContext(B bucket); + } + + /** + * Registers the given stream and associate it with the given types. + * + * @param stream The streams to register + * @param types The types associated with the streams + */ + public static synchronized void registerStream(Stream stream, BytesReference... types) { + MapBuilder uStreams = MapBuilder.newMapBuilder(STREAMS); + for (BytesReference type : types) { + uStreams.put(type, stream); + } + STREAMS = uStreams.immutableMap(); + } + + /** + * Returns the stream that is registered for the given type + * + * @param type The given type + * @return The associated stream + */ + public static Stream stream(BytesReference type) { + return STREAMS.get(type); + } + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/MultiBucketsAggregation.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/MultiBucketsAggregation.java index d1eebcfcb79..5572719cfdd 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/MultiBucketsAggregation.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/MultiBucketsAggregation.java @@ -19,14 +19,17 @@ package org.elasticsearch.search.aggregations.bucket; +import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.text.Text; import org.elasticsearch.common.util.Comparators; +import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.HasAggregations; import org.elasticsearch.search.aggregations.support.OrderPath; import java.util.Collection; +import java.util.List; /** * An aggregation that returns multiple buckets @@ -38,7 +41,7 @@ public interface MultiBucketsAggregation extends Aggregation { * A bucket represents a criteria to which all documents that fall in it adhere to. It is also uniquely identified * by a key, and can potentially hold sub-aggregations computed over all documents in it. */ - public interface Bucket extends HasAggregations { + public interface Bucket extends HasAggregations, ToXContent, Streamable { /** * @return The key associated with the bucket as a string @@ -90,7 +93,7 @@ public interface MultiBucketsAggregation extends Aggregation { /** * @return The buckets of this aggregation. */ - Collection getBuckets(); + List getBuckets(); /** * The bucket that is associated with the given key. diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/Filters.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/Filters.java index f13cd650c71..a5515e7538f 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/Filters.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/Filters.java @@ -22,6 +22,7 @@ package org.elasticsearch.search.aggregations.bucket.filters; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; import java.util.Collection; +import java.util.List; /** * A multi bucket aggregation where the buckets are defined by a set of filters (a bucket per filter). Each bucket @@ -38,7 +39,7 @@ public interface Filters extends MultiBucketsAggregation { /** * The buckets created by this aggregation. */ - Collection getBuckets(); + List getBuckets(); @Override Bucket getBucketByKey(String key); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/FiltersAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/FiltersAggregator.java index e56b95159a1..efbccd0d181 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/FiltersAggregator.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/FiltersAggregator.java @@ -91,7 +91,7 @@ public class FiltersAggregator extends BucketsAggregator { for (int i = 0; i < filters.length; i++) { KeyedFilter filter = filters[i]; long bucketOrd = bucketOrd(owningBucketOrdinal, i); - InternalFilters.Bucket bucket = new InternalFilters.Bucket(filter.key, bucketDocCount(bucketOrd), bucketAggregations(bucketOrd)); + InternalFilters.Bucket bucket = new InternalFilters.Bucket(filter.key, bucketDocCount(bucketOrd), bucketAggregations(bucketOrd), keyed); buckets.add(bucket); } return new InternalFilters(name, buckets, keyed); @@ -102,7 +102,7 @@ public class FiltersAggregator extends BucketsAggregator { InternalAggregations subAggs = buildEmptySubAggregations(); List buckets = Lists.newArrayListWithCapacity(filters.length); for (int i = 0; i < filters.length; i++) { - InternalFilters.Bucket bucket = new InternalFilters.Bucket(filters[i].key, 0, subAggs); + InternalFilters.Bucket bucket = new InternalFilters.Bucket(filters[i].key, 0, subAggs, keyed); buckets.add(bucket); } return new InternalFilters(name, buckets, keyed); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/InternalFilters.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/InternalFilters.java index ac058d8fabd..879dcb982ae 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/InternalFilters.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/InternalFilters.java @@ -29,9 +29,14 @@ import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.bucket.BucketStreamContext; +import org.elasticsearch.search.aggregations.bucket.BucketStreams; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** * @@ -49,20 +54,44 @@ public class InternalFilters extends InternalAggregation implements Filters { } }; + private final static BucketStreams.Stream BUCKET_STREAM = new BucketStreams.Stream() { + @Override + public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException { + Bucket filters = new Bucket(context.keyed()); + filters.readFrom(in); + return filters; + } + + @Override + public BucketStreamContext getBucketStreamContext(Bucket bucket) { + BucketStreamContext context = new BucketStreamContext(); + context.keyed(bucket.keyed); + return context; + } + }; + public static void registerStream() { AggregationStreams.registerStream(STREAM, TYPE.stream()); + BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream()); } public static class Bucket implements Filters.Bucket { + private final boolean keyed; private String key; private long docCount; InternalAggregations aggregations; - public Bucket(String key, long docCount, InternalAggregations aggregations) { + private Bucket(boolean keyed) { + // for serialization + this.keyed = keyed; + } + + public Bucket(String key, long docCount, InternalAggregations aggregations, boolean keyed) { this.key = key; this.docCount = docCount; this.aggregations = aggregations; + this.keyed = keyed; } public String getKey() { @@ -89,7 +118,7 @@ public class InternalFilters extends InternalAggregation implements Filters { List aggregationsList = Lists.newArrayListWithCapacity(buckets.size()); for (Bucket bucket : buckets) { if (reduced == null) { - reduced = new Bucket(bucket.key, bucket.docCount, bucket.aggregations); + reduced = new Bucket(bucket.key, bucket.docCount, bucket.aggregations, bucket.keyed); } else { reduced.docCount += bucket.docCount; } @@ -99,7 +128,8 @@ public class InternalFilters extends InternalAggregation implements Filters { return reduced; } - void toXContent(XContentBuilder builder, Params params, boolean keyed) throws IOException { + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { if (keyed) { builder.startObject(key); } else { @@ -108,6 +138,21 @@ public class InternalFilters extends InternalAggregation implements Filters { builder.field(CommonFields.DOC_COUNT, docCount); aggregations.toXContentInternal(builder, params); builder.endObject(); + return builder; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + key = in.readOptionalString(); + docCount = in.readVLong(); + aggregations = InternalAggregations.readAggregations(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalString(key); + out.writeVLong(docCount); + aggregations.writeTo(out); } } @@ -129,7 +174,7 @@ public class InternalFilters extends InternalAggregation implements Filters { } @Override - public Collection getBuckets() { + public List getBuckets() { return buckets; } @@ -179,8 +224,9 @@ public class InternalFilters extends InternalAggregation implements Filters { int size = in.readVInt(); List buckets = Lists.newArrayListWithCapacity(size); for (int i = 0; i < size; i++) { - String key = in.readOptionalString(); - buckets.add(new Bucket(key, in.readVLong(), InternalAggregations.readAggregations(in))); + Bucket bucket = new Bucket(keyed); + bucket.readFrom(in); + buckets.add(bucket); } this.buckets = buckets; this.bucketMap = null; @@ -192,9 +238,7 @@ public class InternalFilters extends InternalAggregation implements Filters { out.writeBoolean(keyed); out.writeVInt(buckets.size()); for (Bucket bucket : buckets) { - out.writeOptionalString(bucket.key); - out.writeVLong(bucket.docCount); - bucket.aggregations.writeTo(out); + bucket.writeTo(out); } } @@ -206,7 +250,7 @@ public class InternalFilters extends InternalAggregation implements Filters { builder.startArray(CommonFields.BUCKETS); } for (Bucket bucket : buckets) { - bucket.toXContent(builder, params, keyed); + bucket.toXContent(builder, params); } if (keyed) { builder.endObject(); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGrid.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGrid.java index c73d24efecf..3fbc5ead187 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGrid.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGrid.java @@ -23,6 +23,7 @@ import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; import java.util.Collection; +import java.util.List; /** * A {@code geohash_grid} aggregation. Defines multiple buckets, each representing a cell in a geo-grid of a specific @@ -51,7 +52,7 @@ public interface GeoHashGrid extends MultiBucketsAggregation { * @return The buckets of this aggregation (each bucket representing a geohash grid cell) */ @Override - Collection getBuckets(); + List getBuckets(); @Override Bucket getBucketByKey(String key); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java index c9eca6c8e66..a678f31e34a 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java @@ -31,6 +31,8 @@ import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.bucket.BucketStreamContext; +import org.elasticsearch.search.aggregations.bucket.BucketStreams; import java.io.IOException; import java.util.*; @@ -53,8 +55,25 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG } }; + + public static final BucketStreams.Stream BUCKET_STREAM = new BucketStreams.Stream() { + @Override + public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException { + Bucket bucket = new Bucket(); + bucket.readFrom(in); + return bucket; + } + + @Override + public BucketStreamContext getBucketStreamContext(Bucket bucket) { + BucketStreamContext context = new BucketStreamContext(); + return context; + } + }; + public static void registerStreams() { AggregationStreams.registerStream(STREAM, TYPE.stream()); + BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream()); } @@ -64,6 +83,10 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG protected long docCount; protected InternalAggregations aggregations; + public Bucket() { + // For Serialization only + } + public Bucket(long geohashAsLong, long docCount, InternalAggregations aggregations) { this.docCount = docCount; this.aggregations = aggregations; @@ -120,6 +143,29 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG return geohashAsLong; } + @Override + public void readFrom(StreamInput in) throws IOException { + geohashAsLong = in.readLong(); + docCount = in.readVLong(); + aggregations = InternalAggregations.readAggregations(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(geohashAsLong); + out.writeVLong(docCount); + aggregations.writeTo(out); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(CommonFields.KEY, getKeyAsText()); + builder.field(CommonFields.DOC_COUNT, docCount); + aggregations.toXContentInternal(builder, params); + builder.endObject(); + return builder; + } } private int requiredSize; @@ -141,9 +187,9 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG } @Override - public Collection getBuckets() { + public List getBuckets() { Object o = buckets; - return (Collection) o; + return (List) o; } @Override @@ -208,7 +254,9 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG int size = in.readVInt(); List buckets = new ArrayList<>(size); for (int i = 0; i < size; i++) { - buckets.add(new Bucket(in.readLong(), in.readVLong(), InternalAggregations.readAggregations(in))); + Bucket bucket = new Bucket(); + bucket.readFrom(in); + buckets.add(bucket); } this.buckets = buckets; this.bucketMap = null; @@ -220,9 +268,7 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG writeSize(requiredSize, out); out.writeVInt(buckets.size()); for (Bucket bucket : buckets) { - out.writeLong(bucket.geohashAsLong); - out.writeVLong(bucket.getDocCount()); - ((InternalAggregations) bucket.getAggregations()).writeTo(out); + bucket.writeTo(out); } } @@ -230,11 +276,7 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { builder.startArray(CommonFields.BUCKETS); for (Bucket bucket : buckets) { - builder.startObject(); - builder.field(CommonFields.KEY, bucket.getKeyAsText()); - builder.field(CommonFields.DOC_COUNT, bucket.getDocCount()); - ((InternalAggregations) bucket.getAggregations()).toXContentInternal(builder, params); - builder.endObject(); + bucket.toXContent(builder, params); } builder.endArray(); return builder; diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramAggregator.java index 879da94cfc7..fe93725c363 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramAggregator.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramAggregator.java @@ -114,7 +114,7 @@ public class HistogramAggregator extends BucketsAggregator { assert owningBucketOrdinal == 0; List buckets = new ArrayList<>((int) bucketOrds.size()); for (long i = 0; i < bucketOrds.size(); i++) { - buckets.add(histogramFactory.createBucket(rounding.valueForKey(bucketOrds.get(i)), bucketDocCount(i), bucketAggregations(i), formatter)); + buckets.add(histogramFactory.createBucket(rounding.valueForKey(bucketOrds.get(i)), bucketDocCount(i), bucketAggregations(i), keyed, formatter)); } CollectionUtil.introSort(buckets, order.comparator()); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java index 2eca156a6f9..c6679b4a8c8 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java @@ -23,6 +23,8 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.bucket.BucketStreamContext; +import org.elasticsearch.search.aggregations.bucket.BucketStreams; import org.elasticsearch.search.aggregations.support.format.ValueFormatter; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -47,14 +49,35 @@ public class InternalDateHistogram extends InternalHistogram BUCKET_STREAM = new BucketStreams.Stream() { + @Override + public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException { + Bucket buckets = new Bucket(context.keyed(), context.formatter()); + buckets.readFrom(in); + return buckets; + } + + @Override + public BucketStreamContext getBucketStreamContext(Bucket bucket) { + BucketStreamContext context = new BucketStreamContext(); + context.formatter(bucket.formatter); + return context; + } + }; + public static void registerStream() { AggregationStreams.registerStream(STREAM, TYPE.stream()); + BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream()); } static class Bucket extends InternalHistogram.Bucket implements DateHistogram.Bucket { - Bucket(long key, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter) { - super(key, docCount, formatter, aggregations); + Bucket(boolean keyed, @Nullable ValueFormatter formatter) { + super(keyed, formatter); + } + + Bucket(long key, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter) { + super(key, docCount, keyed, formatter, aggregations); } @Override @@ -95,8 +118,8 @@ public class InternalDateHistogram extends InternalHistogram extends Inter } }; + private final static BucketStreams.Stream BUCKET_STREAM = new BucketStreams.Stream() { + @Override + public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException { + Bucket histogram = new Bucket(context.keyed(), context.formatter()); + histogram.readFrom(in); + return histogram; + } + + @Override + public BucketStreamContext getBucketStreamContext(Bucket bucket) { + BucketStreamContext context = new BucketStreamContext(); + context.formatter(bucket.formatter); + context.keyed(bucket.keyed); + return context; + } + }; + public static void registerStream() { + AggregationStreams.registerStream(STREAM, TYPE.stream()); + BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream()); } public static class Bucket implements Histogram.Bucket { - final long key; - final long docCount; + long key; + long docCount; + InternalAggregations aggregations; + private transient final boolean keyed; protected transient final @Nullable ValueFormatter formatter; - final InternalAggregations aggregations; - public Bucket(long key, long docCount, @Nullable ValueFormatter formatter, InternalAggregations aggregations) { + public Bucket(boolean keyed, @Nullable ValueFormatter formatter) { + this.formatter = formatter; + this.keyed = keyed; + } + + public Bucket(long key, long docCount, boolean keyed, @Nullable ValueFormatter formatter, InternalAggregations aggregations) { + this(keyed, formatter); this.key = key; this.docCount = docCount; - this.formatter = formatter; this.aggregations = aggregations; } @@ -114,10 +141,11 @@ public class InternalHistogram extends Inter aggregations.add((InternalAggregations) bucket.getAggregations()); } InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); - return (B) getFactory().createBucket(key, docCount, aggs, formatter); + return (B) getFactory().createBucket(key, docCount, aggs, keyed, formatter); } - void toXContent(XContentBuilder builder, Params params, boolean keyed, @Nullable ValueFormatter formatter) throws IOException { + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { if (formatter != null && formatter != ValueFormatter.RAW) { Text keyTxt = new StringText(formatter.format(key)); if (keyed) { @@ -137,6 +165,21 @@ public class InternalHistogram extends Inter builder.field(CommonFields.DOC_COUNT, docCount); aggregations.toXContentInternal(builder, params); builder.endObject(); + return builder; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + key = in.readLong(); + docCount = in.readVLong(); + aggregations = InternalAggregations.readAggregations(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(key); + out.writeVLong(docCount); + aggregations.writeTo(out); } } @@ -194,8 +237,8 @@ public class InternalHistogram extends Inter return new InternalHistogram<>(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed); } - public B createBucket(long key, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter) { - return (B) new Bucket(key, docCount, formatter, aggregations); + public B createBucket(long key, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter) { + return (B) new Bucket(key, docCount, keyed, formatter, aggregations); } } @@ -295,7 +338,7 @@ public class InternalHistogram extends Inter long key = bounds.min; long max = bounds.max; while (key <= max) { - iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, formatter)); + iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter)); key = emptyBucketInfo.rounding.nextRoundingValue(key); } } @@ -304,7 +347,7 @@ public class InternalHistogram extends Inter long key = bounds.min; if (key < firstBucket.key) { while (key < firstBucket.key) { - iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, formatter)); + iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter)); key = emptyBucketInfo.rounding.nextRoundingValue(key); } } @@ -319,7 +362,7 @@ public class InternalHistogram extends Inter if (lastBucket != null) { long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key); while (key < nextBucket.key) { - iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, formatter)); + iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter)); key = emptyBucketInfo.rounding.nextRoundingValue(key); } assert key == nextBucket.key; @@ -332,7 +375,7 @@ public class InternalHistogram extends Inter long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key); long max = bounds.max; while (key <= max) { - iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, formatter)); + iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter)); key = emptyBucketInfo.rounding.nextRoundingValue(key); } } @@ -348,8 +391,12 @@ public class InternalHistogram extends Inter return getFactory().create(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo, formatter, keyed); } - protected B createBucket(long key, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter) { - return (B) new InternalHistogram.Bucket(key, docCount, formatter, aggregations); + protected B createBucket(long key, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter) { + return (B) new InternalHistogram.Bucket(key, docCount, keyed, formatter, aggregations); + } + + protected B createEmptyBucket(boolean keyed, @Nullable ValueFormatter formatter) { + return (B) new InternalHistogram.Bucket(keyed, formatter); } @Override @@ -365,7 +412,9 @@ public class InternalHistogram extends Inter int size = in.readVInt(); List buckets = new ArrayList<>(size); for (int i = 0; i < size; i++) { - buckets.add(createBucket(in.readLong(), in.readVLong(), InternalAggregations.readAggregations(in), formatter)); + B bucket = createEmptyBucket(keyed, formatter); + bucket.readFrom(in); + buckets.add(bucket); } this.buckets = buckets; this.bucketsMap = null; @@ -383,9 +432,7 @@ public class InternalHistogram extends Inter out.writeBoolean(keyed); out.writeVInt(buckets.size()); for (B bucket : buckets) { - out.writeLong(bucket.key); - out.writeVLong(bucket.docCount); - bucket.aggregations.writeTo(out); + bucket.writeTo(out); } } @@ -397,7 +444,7 @@ public class InternalHistogram extends Inter builder.startArray(CommonFields.BUCKETS); } for (B bucket : buckets) { - bucket.toXContent(builder, params, keyed, formatter); + bucket.toXContent(builder, params); } if (keyed) { builder.endObject(); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java index faa77beef60..57d03ed6a3e 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java @@ -29,11 +29,16 @@ import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.bucket.BucketStreamContext; +import org.elasticsearch.search.aggregations.bucket.BucketStreams; import org.elasticsearch.search.aggregations.support.format.ValueFormatter; import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** * @@ -53,26 +58,50 @@ public class InternalRange extends InternalAggre } }; + private final static BucketStreams.Stream BUCKET_STREAM = new BucketStreams.Stream() { + @Override + public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException { + Bucket buckets = new Bucket(context.keyed(), context.formatter()); + buckets.readFrom(in); + return buckets; + } + + @Override + public BucketStreamContext getBucketStreamContext(Bucket bucket) { + BucketStreamContext context = new BucketStreamContext(); + context.formatter(bucket.formatter); + context.keyed(bucket.keyed); + return context; + } + }; + public static void registerStream() { AggregationStreams.registerStream(STREAM, TYPE.stream()); + BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream()); } public static class Bucket implements Range.Bucket { - private final ValueFormatter formatter; - private final double from; - private final double to; - private final long docCount; - final InternalAggregations aggregations; - private final String key; + protected transient final boolean keyed; + protected transient final ValueFormatter formatter; + private double from; + private double to; + private long docCount; + InternalAggregations aggregations; + private String key; - public Bucket(String key, double from, double to, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter) { + public Bucket(boolean keyed, @Nullable ValueFormatter formatter) { + this.keyed = keyed; + this.formatter = formatter; + } + + public Bucket(String key, double from, double to, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter) { + this(keyed, formatter); this.key = key != null ? key : generateKey(from, to, formatter); this.from = from; this.to = to; this.docCount = docCount; this.aggregations = aggregations; - this.formatter = formatter; } public String getKey() { @@ -116,10 +145,11 @@ public class InternalRange extends InternalAggre aggregationsList.add(range.aggregations); } final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context); - return getFactory().createBucket(key, from, to, docCount, aggs, formatter); + return getFactory().createBucket(key, from, to, docCount, aggs, keyed, formatter); } - void toXContent(XContentBuilder builder, Params params, @Nullable ValueFormatter formatter, boolean keyed) throws IOException { + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { if (keyed) { builder.startObject(key); } else { @@ -141,6 +171,7 @@ public class InternalRange extends InternalAggre builder.field(CommonFields.DOC_COUNT, docCount); aggregations.toXContentInternal(builder, params); builder.endObject(); + return builder; } protected String generateKey(double from, double to, @Nullable ValueFormatter formatter) { @@ -151,6 +182,15 @@ public class InternalRange extends InternalAggre return sb.toString(); } + @Override + public void readFrom(StreamInput in) throws IOException { + + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + + } } public static class Factory> { @@ -164,8 +204,8 @@ public class InternalRange extends InternalAggre } - public B createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter) { - return (B) new Bucket(key, from, to, docCount, aggregations, formatter); + public B createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter) { + return (B) new Bucket(key, from, to, docCount, aggregations, keyed, formatter); } } @@ -189,7 +229,7 @@ public class InternalRange extends InternalAggre } @Override - public Collection getBuckets() { + public List getBuckets() { return ranges; } @@ -240,7 +280,7 @@ public class InternalRange extends InternalAggre List ranges = Lists.newArrayListWithCapacity(size); for (int i = 0; i < size; i++) { String key = in.readOptionalString(); - ranges.add(getFactory().createBucket(key, in.readDouble(), in.readDouble(), in.readVLong(), InternalAggregations.readAggregations(in), formatter)); + ranges.add(getFactory().createBucket(key, in.readDouble(), in.readDouble(), in.readVLong(), InternalAggregations.readAggregations(in), keyed, formatter)); } this.ranges = ranges; this.rangeMap = null; @@ -269,7 +309,7 @@ public class InternalRange extends InternalAggre builder.startArray(CommonFields.BUCKETS); } for (B range : ranges) { - range.toXContent(builder, params, formatter, keyed); + range.toXContent(builder, params); } if (keyed) { builder.endObject(); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/Range.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/Range.java index fee28b68d6a..40585d3955a 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/Range.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/Range.java @@ -21,6 +21,7 @@ package org.elasticsearch.search.aggregations.bucket.range; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; import java.util.Collection; +import java.util.List; /** * A {@code range} aggregation. Defines multiple buckets, each associated with a pre-defined value range of a field, @@ -47,7 +48,7 @@ public interface Range extends MultiBucketsAggregation { /** * Return the buckets of this range aggregation. */ - Collection getBuckets(); + List getBuckets(); @Override Bucket getBucketByKey(String key); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java index a97f4442b45..283362581e0 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java @@ -200,7 +200,7 @@ public class RangeAggregator extends BucketsAggregator { Range range = ranges[i]; final long bucketOrd = subBucketOrdinal(owningBucketOrdinal, i); org.elasticsearch.search.aggregations.bucket.range.Range.Bucket bucket = - rangeFactory.createBucket(range.key, range.from, range.to, bucketDocCount(bucketOrd),bucketAggregations(bucketOrd), formatter); + rangeFactory.createBucket(range.key, range.from, range.to, bucketDocCount(bucketOrd),bucketAggregations(bucketOrd), keyed, formatter); buckets.add(bucket); } // value source can be null in the case of unmapped fields @@ -214,7 +214,7 @@ public class RangeAggregator extends BucketsAggregator { for (int i = 0; i < ranges.length; i++) { Range range = ranges[i]; org.elasticsearch.search.aggregations.bucket.range.Range.Bucket bucket = - rangeFactory.createBucket(range.key, range.from, range.to, 0, subAggs, formatter); + rangeFactory.createBucket(range.key, range.from, range.to, 0, subAggs, keyed, formatter); buckets.add(bucket); } // value source can be null in the case of unmapped fields @@ -273,7 +273,7 @@ public class RangeAggregator extends BucketsAggregator { InternalAggregations subAggs = buildEmptySubAggregations(); List buckets = new ArrayList<>(ranges.size()); for (RangeAggregator.Range range : ranges) { - buckets.add(factory.createBucket(range.key, range.from, range.to, 0, subAggs, formatter)); + buckets.add(factory.createBucket(range.key, range.from, range.to, 0, subAggs, keyed, formatter)); } return factory.create(name, buckets, formatter, keyed); } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/date/DateRange.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/date/DateRange.java index 4ce44297e4a..aaec2ee84bb 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/date/DateRange.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/date/DateRange.java @@ -22,6 +22,7 @@ import org.elasticsearch.search.aggregations.bucket.range.Range; import org.joda.time.DateTime; import java.util.Collection; +import java.util.List; /** * A range aggregation on date values. @@ -36,7 +37,7 @@ public interface DateRange extends Range { } @Override - Collection getBuckets(); + List getBuckets(); @Override DateRange.Bucket getBucketByKey(String key); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/date/InternalDateRange.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/date/InternalDateRange.java index 68e5c86da80..c987580eadb 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/date/InternalDateRange.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/date/InternalDateRange.java @@ -23,6 +23,8 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.bucket.BucketStreamContext; +import org.elasticsearch.search.aggregations.bucket.BucketStreams; import org.elasticsearch.search.aggregations.bucket.range.InternalRange; import org.elasticsearch.search.aggregations.support.format.ValueFormatter; import org.joda.time.DateTime; @@ -47,20 +49,42 @@ public class InternalDateRange extends InternalRange i } }; + private final static BucketStreams.Stream BUCKET_STREAM = new BucketStreams.Stream() { + @Override + public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException { + Bucket buckets = new Bucket(context.keyed(), context.formatter()); + buckets.readFrom(in); + return buckets; + } + + @Override + public BucketStreamContext getBucketStreamContext(Bucket bucket) { + BucketStreamContext context = new BucketStreamContext(); + context.formatter(bucket.formatter()); + context.keyed(bucket.keyed()); + return context; + } + }; + public static void registerStream() { AggregationStreams.registerStream(STREAM, TYPE.stream()); + BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream()); } public static final Factory FACTORY = new Factory(); public static class Bucket extends InternalRange.Bucket implements DateRange.Bucket { - public Bucket(String key, double from, double to, long docCount, List aggregations, ValueFormatter formatter) { - super(key, from, to, docCount, new InternalAggregations(aggregations), formatter); + public Bucket(boolean keyed, @Nullable ValueFormatter formatter) { + super(keyed, formatter); } - public Bucket(String key, double from, double to, long docCount, InternalAggregations aggregations, ValueFormatter formatter) { - super(key, from, to, docCount, aggregations, formatter); + public Bucket(String key, double from, double to, long docCount, List aggregations, boolean keyed, ValueFormatter formatter) { + super(key, from, to, docCount, new InternalAggregations(aggregations), keyed, formatter); + } + + public Bucket(String key, double from, double to, long docCount, InternalAggregations aggregations, boolean keyed, ValueFormatter formatter) { + super(key, from, to, docCount, aggregations, keyed, formatter); } @Override @@ -77,6 +101,14 @@ public class InternalDateRange extends InternalRange i protected InternalRange.Factory getFactory() { return FACTORY; } + + boolean keyed() { + return keyed; + } + + ValueFormatter formatter() { + return formatter; + } } private static class Factory extends InternalRange.Factory { @@ -92,8 +124,8 @@ public class InternalDateRange extends InternalRange i } @Override - public Bucket createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, ValueFormatter formatter) { - return new Bucket(key, from, to, docCount, aggregations, formatter); + public Bucket createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, boolean keyed, ValueFormatter formatter) { + return new Bucket(key, from, to, docCount, aggregations, keyed, formatter); } } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/geodistance/GeoDistance.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/geodistance/GeoDistance.java index ce941fcad81..05a53ead3ba 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/geodistance/GeoDistance.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/geodistance/GeoDistance.java @@ -21,6 +21,7 @@ package org.elasticsearch.search.aggregations.bucket.range.geodistance; import org.elasticsearch.search.aggregations.bucket.range.Range; import java.util.Collection; +import java.util.List; /** * An aggregation that computes ranges of geo distances. @@ -34,7 +35,7 @@ public interface GeoDistance extends Range { } @Override - Collection getBuckets(); + List getBuckets(); @Override GeoDistance.Bucket getBucketByKey(String key); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/geodistance/InternalGeoDistance.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/geodistance/InternalGeoDistance.java index 5eefc6ae66f..d9c16b86378 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/geodistance/InternalGeoDistance.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/geodistance/InternalGeoDistance.java @@ -23,6 +23,8 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.bucket.BucketStreamContext; +import org.elasticsearch.search.aggregations.bucket.BucketStreams; import org.elasticsearch.search.aggregations.bucket.range.InternalRange; import org.elasticsearch.search.aggregations.support.format.ValueFormatter; @@ -45,26 +47,56 @@ public class InternalGeoDistance extends InternalRange BUCKET_STREAM = new BucketStreams.Stream() { + @Override + public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException { + Bucket buckets = new Bucket(context.keyed(), context.formatter()); + buckets.readFrom(in); + return buckets; + } + + @Override + public BucketStreamContext getBucketStreamContext(Bucket bucket) { + BucketStreamContext context = new BucketStreamContext(); + context.formatter(bucket.formatter()); + context.keyed(bucket.keyed()); + return context; + } + }; + public static void registerStream() { AggregationStreams.registerStream(STREAM, TYPE.stream()); + BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream()); } public static final Factory FACTORY = new Factory(); static class Bucket extends InternalRange.Bucket implements GeoDistance.Bucket { - Bucket(String key, double from, double to, long docCount, List aggregations, ValueFormatter formatter) { - this(key, from, to, docCount, new InternalAggregations(aggregations), formatter); + Bucket(boolean keyed, @Nullable ValueFormatter formatter) { + super(keyed, formatter); } - Bucket(String key, double from, double to, long docCount, InternalAggregations aggregations, ValueFormatter formatter) { - super(key, from, to, docCount, aggregations, formatter); + Bucket(String key, double from, double to, long docCount, List aggregations, boolean keyed, @Nullable ValueFormatter formatter) { + this(key, from, to, docCount, new InternalAggregations(aggregations), keyed, formatter); + } + + Bucket(String key, double from, double to, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter) { + super(key, from, to, docCount, aggregations, keyed, formatter); } @Override protected InternalRange.Factory getFactory() { return FACTORY; } + + boolean keyed() { + return keyed; + } + + ValueFormatter formatter() { + return formatter; + } } private static class Factory extends InternalRange.Factory { @@ -80,8 +112,8 @@ public class InternalGeoDistance extends InternalRange getBuckets(); + List getBuckets(); @Override IPv4Range.Bucket getBucketByKey(String key); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/ipv4/InternalIPv4Range.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/ipv4/InternalIPv4Range.java index c34766775dc..eb7e08770f2 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/ipv4/InternalIPv4Range.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/ipv4/InternalIPv4Range.java @@ -23,6 +23,8 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.bucket.BucketStreamContext; +import org.elasticsearch.search.aggregations.bucket.BucketStreams; import org.elasticsearch.search.aggregations.bucket.range.InternalRange; import org.elasticsearch.search.aggregations.support.format.ValueFormatter; @@ -47,20 +49,41 @@ public class InternalIPv4Range extends InternalRange i } }; + private final static BucketStreams.Stream BUCKET_STREAM = new BucketStreams.Stream() { + @Override + public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException { + Bucket buckets = new Bucket(context.keyed()); + buckets.readFrom(in); + return buckets; + } + + @Override + public BucketStreamContext getBucketStreamContext(Bucket bucket) { + BucketStreamContext context = new BucketStreamContext(); + context.keyed(bucket.keyed()); + return context; + } + }; + public static void registerStream() { AggregationStreams.registerStream(STREAM, TYPE.stream()); + BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream()); } public static final Factory FACTORY = new Factory(); public static class Bucket extends InternalRange.Bucket implements IPv4Range.Bucket { - public Bucket(String key, double from, double to, long docCount, List aggregations) { - super(key, from, to, docCount, new InternalAggregations(aggregations), ValueFormatter.IPv4); + public Bucket(boolean keyed) { + super(keyed, ValueFormatter.IPv4); } - public Bucket(String key, double from, double to, long docCount, InternalAggregations aggregations) { - super(key, from, to, docCount, aggregations, ValueFormatter.IPv4); + public Bucket(String key, double from, double to, long docCount, List aggregations, boolean keyed) { + super(key, from, to, docCount, new InternalAggregations(aggregations), keyed, ValueFormatter.IPv4); + } + + public Bucket(String key, double from, double to, long docCount, InternalAggregations aggregations, boolean keyed) { + super(key, from, to, docCount, aggregations, keyed, ValueFormatter.IPv4); } @Override @@ -79,6 +102,10 @@ public class InternalIPv4Range extends InternalRange i protected InternalRange.Factory getFactory() { return FACTORY; } + + boolean keyed() { + return keyed; + } } private static class Factory extends InternalRange.Factory { @@ -94,8 +121,8 @@ public class InternalIPv4Range extends InternalRange i } @Override - public Bucket createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter) { - return new Bucket(key, from, to, docCount, aggregations); + public Bucket createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter) { + return new Bucket(key, from, to, docCount, aggregations, keyed); } } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java index 3e0eef8173a..965146cdf91 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java @@ -36,7 +36,7 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple protected SignificanceHeuristic significanceHeuristic; protected int requiredSize; protected long minDocCount; - protected Collection buckets; + protected List buckets; protected Map bucketMap; protected long subsetSize; protected long supersetSize; @@ -50,6 +50,11 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple protected InternalAggregations aggregations; double score; + protected Bucket(long subsetSize, long supersetSize) { + // for serialization + super(subsetSize, supersetSize); + } + protected Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations) { super(subsetDf, subsetSize, supersetDf, supersetSize); this.aggregations = aggregations; @@ -110,7 +115,7 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple } } - protected InternalSignificantTerms(long subsetSize, long supersetSize, String name, int requiredSize, long minDocCount, SignificanceHeuristic significanceHeuristic, Collection buckets) { + protected InternalSignificantTerms(long subsetSize, long supersetSize, String name, int requiredSize, long minDocCount, SignificanceHeuristic significanceHeuristic, List buckets) { super(name); this.requiredSize = requiredSize; this.minDocCount = minDocCount; @@ -127,9 +132,9 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple } @Override - public Collection getBuckets() { + public List getBuckets() { Object o = buckets; - return (Collection) o; + return (List) o; } @Override diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java index 1c265cf4d44..ca35abd14d9 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java @@ -27,6 +27,8 @@ import org.elasticsearch.common.text.Text; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.bucket.BucketStreamContext; +import org.elasticsearch.search.aggregations.bucket.BucketStreams; import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic; import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristicStreams; import org.elasticsearch.search.aggregations.support.format.ValueFormatter; @@ -34,8 +36,9 @@ import org.elasticsearch.search.aggregations.support.format.ValueFormatterStream import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * @@ -53,16 +56,46 @@ public class SignificantLongTerms extends InternalSignificantTerms { } }; + private final static BucketStreams.Stream BUCKET_STREAM = new BucketStreams.Stream() { + @Override + public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException { + Bucket buckets = new Bucket((long) context.attributes().get("subsetSize"), (long) context.attributes().get("supersetSize"), + context.formatter()); + buckets.readFrom(in); + return buckets; + } + + @Override + public BucketStreamContext getBucketStreamContext(Bucket bucket) { + BucketStreamContext context = new BucketStreamContext(); + Map attributes = new HashMap<>(); + attributes.put("subsetSize", bucket.subsetSize); + attributes.put("supersetSize", bucket.supersetSize); + context.attributes(attributes); + context.formatter(bucket.formatter); + return context; + } + }; + public static void registerStreams() { AggregationStreams.registerStream(STREAM, TYPE.stream()); + BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream()); } static class Bucket extends InternalSignificantTerms.Bucket { long term; + private transient final ValueFormatter formatter; - public Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, long term, InternalAggregations aggregations) { + public Bucket(long subsetSize, long supersetSize, @Nullable ValueFormatter formatter) { + super(subsetSize, supersetSize); + this.formatter = formatter; + // for serialization + } + + public Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, long term, InternalAggregations aggregations, @Nullable ValueFormatter formatter) { super(subsetDf, subsetSize, supersetDf, supersetSize, aggregations); + this.formatter = formatter; this.term = term; } @@ -88,9 +121,39 @@ public class SignificantLongTerms extends InternalSignificantTerms { @Override Bucket newBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations) { - return new Bucket(subsetDf, subsetSize, supersetDf, supersetSize, term, aggregations); + return new Bucket(subsetDf, subsetSize, supersetDf, supersetSize, term, aggregations, formatter); } + @Override + public void readFrom(StreamInput in) throws IOException { + subsetDf = in.readVLong(); + supersetDf = in.readVLong(); + term = in.readLong(); + aggregations = InternalAggregations.readAggregations(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(subsetDf); + out.writeVLong(supersetDf); + out.writeLong(term); + aggregations.writeTo(out); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(CommonFields.KEY, term); + if (formatter != null) { + builder.field(CommonFields.KEY_AS_STRING, formatter.format(term)); + } + builder.field(CommonFields.DOC_COUNT, getDocCount()); + builder.field("score", score); + builder.field("bg_count", supersetDf); + aggregations.toXContentInternal(builder, params); + builder.endObject(); + return builder; + } } private ValueFormatter formatter; @@ -99,7 +162,7 @@ public class SignificantLongTerms extends InternalSignificantTerms { } // for serialization public SignificantLongTerms(long subsetSize, long supersetSize, String name, @Nullable ValueFormatter formatter, - int requiredSize, long minDocCount, SignificanceHeuristic significanceHeuristic, Collection buckets) { + int requiredSize, long minDocCount, SignificanceHeuristic significanceHeuristic, List buckets) { super(subsetSize, supersetSize, name, requiredSize, minDocCount, significanceHeuristic, buckets); this.formatter = formatter; @@ -129,12 +192,10 @@ public class SignificantLongTerms extends InternalSignificantTerms { int size = in.readVInt(); List buckets = new ArrayList<>(size); for (int i = 0; i < size; i++) { - long subsetDf = in.readVLong(); - long supersetDf = in.readVLong(); - long term = in.readLong(); - Bucket readBucket = new Bucket(subsetDf, subsetSize, supersetDf,supersetSize, term, InternalAggregations.readAggregations(in)); - readBucket.updateScore(significanceHeuristic); - buckets.add(readBucket); + Bucket bucket = new Bucket(subsetSize, supersetSize, formatter); + bucket.readFrom(in); + bucket.updateScore(significanceHeuristic); + buckets.add(bucket); } this.buckets = buckets; this.bucketMap = null; @@ -153,10 +214,7 @@ public class SignificantLongTerms extends InternalSignificantTerms { } out.writeVInt(buckets.size()); for (InternalSignificantTerms.Bucket bucket : buckets) { - out.writeVLong(((Bucket) bucket).subsetDf); - out.writeVLong(((Bucket) bucket).supersetDf); - out.writeLong(((Bucket) bucket).term); - ((InternalAggregations) bucket.getAggregations()).writeTo(out); + bucket.writeTo(out); } } @@ -165,16 +223,7 @@ public class SignificantLongTerms extends InternalSignificantTerms { builder.field("doc_count", subsetSize); builder.startArray(CommonFields.BUCKETS); for (InternalSignificantTerms.Bucket bucket : buckets) { - builder.startObject(); - builder.field(CommonFields.KEY, ((Bucket) bucket).term); - if (formatter != null) { - builder.field(CommonFields.KEY_AS_STRING, formatter.format(((Bucket) bucket).term)); - } - builder.field(CommonFields.DOC_COUNT, bucket.getDocCount()); - builder.field("score", bucket.score); - builder.field("bg_count", bucket.supersetDf); - ((InternalAggregations) bucket.getAggregations()).toXContentInternal(builder, params); - builder.endObject(); + bucket.toXContent(builder, params); } builder.endArray(); return builder; diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTermsAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTermsAggregator.java index be4367caf4a..ff3b2e272d0 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTermsAggregator.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTermsAggregator.java @@ -69,7 +69,7 @@ public class SignificantLongTermsAggregator extends LongTermsAggregator { SignificantLongTerms.Bucket spare = null; for (long i = 0; i < bucketOrds.size(); i++) { if (spare == null) { - spare = new SignificantLongTerms.Bucket(0, 0, 0, 0, 0, null); + spare = new SignificantLongTerms.Bucket(0, 0, 0, 0, 0, null, formatter); } spare.term = bucketOrds.get(i); spare.subsetDf = bucketDocCount(i); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTerms.java index be813fa5a71..2184feba279 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTerms.java @@ -29,13 +29,16 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.bucket.BucketStreamContext; +import org.elasticsearch.search.aggregations.bucket.BucketStreams; import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic; import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristicStreams; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * @@ -53,6 +56,30 @@ public class SignificantStringTerms extends InternalSignificantTerms { } }; + private final static BucketStreams.Stream BUCKET_STREAM = new BucketStreams.Stream() { + @Override + public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException { + Bucket buckets = new Bucket((long) context.attributes().get("subsetSize"), (long) context.attributes().get("supersetSize")); + buckets.readFrom(in); + return buckets; + } + + @Override + public BucketStreamContext getBucketStreamContext(Bucket bucket) { + BucketStreamContext context = new BucketStreamContext(); + Map attributes = new HashMap<>(); + attributes.put("subsetSize", bucket.subsetSize); + attributes.put("supersetSize", bucket.supersetSize); + context.attributes(attributes); + return context; + } + }; + + public static void registerStream() { + AggregationStreams.registerStream(STREAM, TYPE.stream()); + BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream()); + } + public static void registerStreams() { AggregationStreams.registerStream(STREAM, TYPE.stream()); } @@ -61,6 +88,10 @@ public class SignificantStringTerms extends InternalSignificantTerms { BytesRef termBytes; + public Bucket(long subsetSize, long supersetSize) { + // for serialization + super(subsetSize, supersetSize); + } public Bucket(BytesRef term, long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations) { super(subsetDf, subsetSize, supersetDf, supersetSize, aggregations); @@ -92,12 +123,40 @@ public class SignificantStringTerms extends InternalSignificantTerms { Bucket newBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations) { return new Bucket(termBytes, subsetDf, subsetSize, supersetDf, supersetSize, aggregations); } + + @Override + public void readFrom(StreamInput in) throws IOException { + termBytes = in.readBytesRef(); + subsetDf = in.readVLong(); + supersetDf = in.readVLong(); + aggregations = InternalAggregations.readAggregations(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBytesRef(termBytes); + out.writeVLong(subsetDf); + out.writeVLong(supersetDf); + aggregations.writeTo(out); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.utf8Field(CommonFields.KEY, termBytes); + builder.field(CommonFields.DOC_COUNT, getDocCount()); + builder.field("score", score); + builder.field("bg_count", supersetDf); + aggregations.toXContentInternal(builder, params); + builder.endObject(); + return builder; + } } SignificantStringTerms() {} // for serialization public SignificantStringTerms(long subsetSize, long supersetSize, String name, int requiredSize, - long minDocCount, SignificanceHeuristic significanceHeuristic, Collection buckets) { + long minDocCount, SignificanceHeuristic significanceHeuristic, List buckets) { super(subsetSize, supersetSize, name, requiredSize, minDocCount, significanceHeuristic, buckets); } @@ -123,12 +182,10 @@ public class SignificantStringTerms extends InternalSignificantTerms { int size = in.readVInt(); List buckets = new ArrayList<>(size); for (int i = 0; i < size; i++) { - BytesRef term = in.readBytesRef(); - long subsetDf = in.readVLong(); - long supersetDf = in.readVLong(); - Bucket readBucket = new Bucket(term, subsetDf, subsetSize, supersetDf, supersetSize, InternalAggregations.readAggregations(in)); - readBucket.updateScore(significanceHeuristic); - buckets.add(readBucket); + Bucket bucket = new Bucket(subsetSize, supersetSize); + bucket.readFrom(in); + bucket.updateScore(significanceHeuristic); + buckets.add(bucket); } this.buckets = buckets; this.bucketMap = null; @@ -146,10 +203,7 @@ public class SignificantStringTerms extends InternalSignificantTerms { } out.writeVInt(buckets.size()); for (InternalSignificantTerms.Bucket bucket : buckets) { - out.writeBytesRef(((Bucket) bucket).termBytes); - out.writeVLong(((Bucket) bucket).subsetDf); - out.writeVLong(((Bucket) bucket).supersetDf); - ((InternalAggregations) bucket.getAggregations()).writeTo(out); + bucket.writeTo(out); } } @@ -161,13 +215,7 @@ public class SignificantStringTerms extends InternalSignificantTerms { //There is a condition (presumably when only one shard has a bucket?) where reduce is not called // and I end up with buckets that contravene the user's min_doc_count criteria in my reducer if (bucket.subsetDf >= minDocCount) { - builder.startObject(); - builder.utf8Field(CommonFields.KEY, ((Bucket) bucket).termBytes); - builder.field(CommonFields.DOC_COUNT, bucket.getDocCount()); - builder.field("score", bucket.score); - builder.field("bg_count", bucket.supersetDf); - ((InternalAggregations) bucket.getAggregations()).toXContentInternal(builder, params); - builder.endObject(); + bucket.toXContent(builder, params); } } builder.endArray(); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTerms.java index 424a32375eb..f80f7574b18 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTerms.java @@ -20,7 +20,7 @@ package org.elasticsearch.search.aggregations.bucket.significant; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; -import java.util.Collection; +import java.util.List; /** * An aggregation that collects significant terms in comparison to a background set. @@ -35,14 +35,18 @@ public interface SignificantTerms extends MultiBucketsAggregation, Iterable getBuckets(); + List getBuckets(); @Override Bucket getBucketByKey(String key); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java index 4f80abf8c13..782ffdf601f 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java @@ -38,7 +38,7 @@ public class UnmappedSignificantTerms extends InternalSignificantTerms { public static final Type TYPE = new Type("significant_terms", "umsigterms"); - private static final Collection BUCKETS = Collections.emptyList(); + private static final List BUCKETS = Collections.emptyList(); private static final Map BUCKETS_MAP = Collections.emptyMap(); public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java index 78fa778589b..c647102aee1 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java @@ -27,12 +27,16 @@ import org.elasticsearch.common.text.Text; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.bucket.BucketStreamContext; +import org.elasticsearch.search.aggregations.bucket.BucketStreams; import org.elasticsearch.search.aggregations.support.format.ValueFormatter; import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * @@ -50,16 +54,40 @@ public class DoubleTerms extends InternalTerms { } }; + private final static BucketStreams.Stream BUCKET_STREAM = new BucketStreams.Stream() { + @Override + public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException { + Bucket buckets = new Bucket(context.formatter(), (boolean) context.attributes().get("showDocCountError")); + buckets.readFrom(in); + return buckets; + } + + @Override + public BucketStreamContext getBucketStreamContext(Bucket bucket) { + BucketStreamContext context = new BucketStreamContext(); + Map attributes = new HashMap<>(); + attributes.put("showDocCountError", bucket.showDocCountError); + context.attributes(attributes); + context.formatter(bucket.formatter); + return context; + } + }; + public static void registerStreams() { AggregationStreams.registerStream(STREAM, TYPE.stream()); + BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream()); } static class Bucket extends InternalTerms.Bucket { double term; - public Bucket(double term, long docCount, InternalAggregations aggregations, boolean showDocCountError, long docCountError) { - super(docCount, aggregations, showDocCountError, docCountError); + public Bucket(@Nullable ValueFormatter formatter, boolean showDocCountError) { + super(formatter, showDocCountError); + } + + public Bucket(double term, long docCount, InternalAggregations aggregations, boolean showDocCountError, long docCountError, @Nullable ValueFormatter formatter) { + super(docCount, aggregations, showDocCountError, docCountError, formatter); this.term = term; } @@ -90,7 +118,44 @@ public class DoubleTerms extends InternalTerms { @Override Bucket newBucket(long docCount, InternalAggregations aggs, long docCountError) { - return new Bucket(term, docCount, aggs, showDocCountError, docCountError); + return new Bucket(term, docCount, aggs, showDocCountError, docCountError, formatter); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + term = in.readDouble(); + docCount = in.readVLong(); + docCountError = -1; + if (in.getVersion().onOrAfter(Version.V_1_4_0_Beta1) && showDocCountError) { + docCountError = in.readLong(); + } + aggregations = InternalAggregations.readAggregations(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeDouble(term); + out.writeVLong(getDocCount()); + if (out.getVersion().onOrAfter(Version.V_1_4_0_Beta1) && showDocCountError) { + out.writeLong(docCountError); + } + aggregations.writeTo(out); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(CommonFields.KEY, term); + if (formatter != null && formatter != ValueFormatter.RAW) { + builder.field(CommonFields.KEY_AS_STRING, formatter.format(term)); + } + builder.field(CommonFields.DOC_COUNT, getDocCount()); + if (showDocCountError) { + builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, getDocCountError()); + } + aggregations.toXContentInternal(builder, params); + builder.endObject(); + return builder; } } @@ -135,14 +200,9 @@ public class DoubleTerms extends InternalTerms { int size = in.readVInt(); List buckets = new ArrayList<>(size); for (int i = 0; i < size; i++) { - double term = in.readDouble(); - long docCount = in.readVLong(); - long bucketDocCountError = -1; - if (in.getVersion().onOrAfter(Version.V_1_4_0_Beta1) && showTermDocCountError) { - bucketDocCountError = in.readLong(); - } - InternalAggregations aggregations = InternalAggregations.readAggregations(in); - buckets.add(new Bucket(term, docCount, aggregations, showTermDocCountError, bucketDocCountError)); + Bucket bucket = new Bucket(formatter, showTermDocCountError); + bucket.readFrom(in); + buckets.add(bucket); } this.buckets = buckets; this.bucketMap = null; @@ -164,12 +224,7 @@ public class DoubleTerms extends InternalTerms { out.writeVLong(minDocCount); out.writeVInt(buckets.size()); for (InternalTerms.Bucket bucket : buckets) { - out.writeDouble(((Bucket) bucket).term); - out.writeVLong(bucket.getDocCount()); - if (out.getVersion().onOrAfter(Version.V_1_4_0_Beta1) && showTermDocCountError) { - out.writeLong(bucket.docCountError); - } - ((InternalAggregations) bucket.getAggregations()).writeTo(out); + bucket.writeTo(out); } } @@ -178,17 +233,7 @@ public class DoubleTerms extends InternalTerms { builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, docCountError); builder.startArray(CommonFields.BUCKETS); for (InternalTerms.Bucket bucket : buckets) { - builder.startObject(); - builder.field(CommonFields.KEY, ((Bucket) bucket).term); - if (formatter != null && formatter != ValueFormatter.RAW) { - builder.field(CommonFields.KEY_AS_STRING, formatter.format(((Bucket) bucket).term)); - } - builder.field(CommonFields.DOC_COUNT, bucket.getDocCount()); - if (showTermDocCountError) { - builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, bucket.getDocCountError()); - } - ((InternalAggregations) bucket.getAggregations()).toXContentInternal(builder, params); - builder.endObject(); + bucket.toXContent(builder, params); } builder.endArray(); return builder; diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTermsAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTermsAggregator.java index 426102b2f22..6aa4dece6d3 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTermsAggregator.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTermsAggregator.java @@ -62,7 +62,7 @@ public class DoubleTermsAggregator extends LongTermsAggregator { private static DoubleTerms.Bucket convertToDouble(InternalTerms.Bucket bucket) { final long term = bucket.getKeyAsNumber().longValue(); final double value = NumericUtils.sortableLongToDouble(term); - return new DoubleTerms.Bucket(value, bucket.docCount, bucket.aggregations, bucket.showDocCountError, bucket.docCountError); + return new DoubleTerms.Bucket(value, bucket.docCount, bucket.aggregations, bucket.showDocCountError, bucket.docCountError, bucket.formatter); } private static DoubleTerms convertToDouble(LongTerms terms) { diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java index fb05c4f2bcf..30048184918 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java @@ -28,10 +28,14 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.LongBitSet; import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.text.Text; import org.elasticsearch.common.util.IntArray; import org.elasticsearch.common.util.LongHash; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.fielddata.AbstractRandomAccessOrds; import org.elasticsearch.index.fielddata.ordinals.GlobalOrdinalMapping; import org.elasticsearch.search.aggregations.Aggregator; @@ -201,7 +205,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr long globalOrd; OrdBucket(long globalOrd, long docCount, InternalAggregations aggregations, boolean showDocCountError, long docCountError) { - super(docCount, aggregations, showDocCountError, docCountError); + super(docCount, aggregations, showDocCountError, docCountError, null); this.globalOrd = globalOrd; } @@ -234,6 +238,21 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr public Number getKeyAsNumber() { throw new UnsupportedOperationException(); } + + @Override + public void readFrom(StreamInput in) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + throw new UnsupportedOperationException(); + } } private static interface Collector { diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java index c2cfc21c885..fd0db757bd1 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java @@ -22,12 +22,14 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import org.elasticsearch.ElasticsearchIllegalStateException; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.bucket.terms.support.BucketPriorityQueue; +import org.elasticsearch.search.aggregations.support.format.ValueFormatter; import java.util.*; @@ -46,12 +48,18 @@ public abstract class InternalTerms extends InternalAggregation implements Terms protected long docCountError; protected InternalAggregations aggregations; protected boolean showDocCountError; + transient final ValueFormatter formatter; + protected Bucket(@Nullable ValueFormatter formatter, boolean showDocCountError) { + // for serialization + this.showDocCountError = showDocCountError; + this.formatter = formatter; + } - protected Bucket(long docCount, InternalAggregations aggregations, boolean showDocCountError, long docCountError) { + protected Bucket(long docCount, InternalAggregations aggregations, boolean showDocCountError, long docCountError, @Nullable ValueFormatter formatter) { + this(formatter, showDocCountError); this.docCount = docCount; this.aggregations = aggregations; - this.showDocCountError = showDocCountError; this.docCountError = docCountError; } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java index f7ec921e66a..24cb842c964 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java @@ -27,12 +27,16 @@ import org.elasticsearch.common.text.Text; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.bucket.BucketStreamContext; +import org.elasticsearch.search.aggregations.bucket.BucketStreams; import org.elasticsearch.search.aggregations.support.format.ValueFormatter; import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * @@ -50,17 +54,40 @@ public class LongTerms extends InternalTerms { } }; + private final static BucketStreams.Stream BUCKET_STREAM = new BucketStreams.Stream() { + @Override + public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException { + Bucket buckets = new Bucket(context.formatter(), (boolean) context.attributes().get("showDocCountError")); + buckets.readFrom(in); + return buckets; + } + + @Override + public BucketStreamContext getBucketStreamContext(Bucket bucket) { + BucketStreamContext context = new BucketStreamContext(); + Map attributes = new HashMap<>(); + attributes.put("showDocCountError", bucket.showDocCountError); + context.attributes(attributes); + context.formatter(bucket.formatter); + return context; + } + }; + public static void registerStreams() { AggregationStreams.registerStream(STREAM, TYPE.stream()); + BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream()); } - static class Bucket extends InternalTerms.Bucket { long term; - public Bucket(long term, long docCount, InternalAggregations aggregations, boolean showDocCountError, long docCountError) { - super(docCount, aggregations, showDocCountError, docCountError); + public Bucket(@Nullable ValueFormatter formatter, boolean showDocCountError) { + super(formatter, showDocCountError); + } + + public Bucket(long term, long docCount, InternalAggregations aggregations, boolean showDocCountError, long docCountError, @Nullable ValueFormatter formatter) { + super(docCount, aggregations, showDocCountError, docCountError, formatter); this.term = term; } @@ -91,7 +118,44 @@ public class LongTerms extends InternalTerms { @Override Bucket newBucket(long docCount, InternalAggregations aggs, long docCountError) { - return new Bucket(term, docCount, aggs, showDocCountError, docCountError); + return new Bucket(term, docCount, aggs, showDocCountError, docCountError, formatter); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + term = in.readLong(); + docCount = in.readVLong(); + docCountError = -1; + if (in.getVersion().onOrAfter(Version.V_1_4_0_Beta1) && showDocCountError) { + docCountError = in.readLong(); + } + aggregations = InternalAggregations.readAggregations(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(term); + out.writeVLong(getDocCount()); + if (out.getVersion().onOrAfter(Version.V_1_4_0_Beta1) && showDocCountError) { + out.writeLong(docCountError); + } + aggregations.writeTo(out); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(CommonFields.KEY, term); + if (formatter != null && formatter != ValueFormatter.RAW) { + builder.field(CommonFields.KEY_AS_STRING, formatter.format(term)); + } + builder.field(CommonFields.DOC_COUNT, getDocCount()); + if (showDocCountError) { + builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, getDocCountError()); + } + aggregations.toXContentInternal(builder, params); + builder.endObject(); + return builder; } } @@ -136,14 +200,9 @@ public class LongTerms extends InternalTerms { int size = in.readVInt(); List buckets = new ArrayList<>(size); for (int i = 0; i < size; i++) { - long term = in.readLong(); - long docCount = in.readVLong(); - long bucketDocCountError = -1; - if (in.getVersion().onOrAfter(Version.V_1_4_0_Beta1) && showTermDocCountError) { - bucketDocCountError = in.readLong(); - } - InternalAggregations aggregations = InternalAggregations.readAggregations(in); - buckets.add(new Bucket(term, docCount, aggregations, showTermDocCountError, bucketDocCountError)); + Bucket bucket = new Bucket(formatter, showTermDocCountError); + bucket.readFrom(in); + buckets.add(bucket); } this.buckets = buckets; this.bucketMap = null; @@ -165,12 +224,7 @@ public class LongTerms extends InternalTerms { out.writeVLong(minDocCount); out.writeVInt(buckets.size()); for (InternalTerms.Bucket bucket : buckets) { - out.writeLong(((Bucket) bucket).term); - out.writeVLong(bucket.getDocCount()); - if (out.getVersion().onOrAfter(Version.V_1_4_0_Beta1) && showTermDocCountError) { - out.writeLong(bucket.docCountError); - } - ((InternalAggregations) bucket.getAggregations()).writeTo(out); + bucket.writeTo(out); } } @@ -179,17 +233,7 @@ public class LongTerms extends InternalTerms { builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, docCountError); builder.startArray(CommonFields.BUCKETS); for (InternalTerms.Bucket bucket : buckets) { - builder.startObject(); - builder.field(CommonFields.KEY, ((Bucket) bucket).term); - if (formatter != null && formatter != ValueFormatter.RAW) { - builder.field(CommonFields.KEY_AS_STRING, formatter.format(((Bucket) bucket).term)); - } - builder.field(CommonFields.DOC_COUNT, bucket.getDocCount()); - if (showTermDocCountError) { - builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, bucket.getDocCountError()); - } - ((InternalAggregations) bucket.getAggregations()).toXContentInternal(builder, params); - builder.endObject(); + bucket.toXContent(builder, params); } builder.endArray(); return builder; diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsAggregator.java index 5c5668addf1..52a1339de30 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsAggregator.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsAggregator.java @@ -126,7 +126,7 @@ public class LongTermsAggregator extends TermsAggregator { LongTerms.Bucket spare = null; for (long i = 0; i < bucketOrds.size(); i++) { if (spare == null) { - spare = new LongTerms.Bucket(0, 0, null, showTermDocCountError, 0); + spare = new LongTerms.Bucket(0, 0, null, showTermDocCountError, 0, formatter); } spare.term = bucketOrds.get(i); spare.docCount = bucketDocCount(i); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java index 93e385565b6..eb48681a3c1 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java @@ -29,10 +29,14 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.bucket.BucketStreamContext; +import org.elasticsearch.search.aggregations.bucket.BucketStreams; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * @@ -50,8 +54,27 @@ public class StringTerms extends InternalTerms { } }; + private final static BucketStreams.Stream BUCKET_STREAM = new BucketStreams.Stream() { + @Override + public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException { + Bucket buckets = new Bucket((boolean) context.attributes().get("showDocCountError")); + buckets.readFrom(in); + return buckets; + } + + @Override + public BucketStreamContext getBucketStreamContext(Bucket bucket) { + BucketStreamContext context = new BucketStreamContext(); + Map attributes = new HashMap<>(); + attributes.put("showDocCountError", bucket.showDocCountError); + context.attributes(attributes); + return context; + } + }; + public static void registerStreams() { AggregationStreams.registerStream(STREAM, TYPE.stream()); + BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream()); } @@ -59,8 +82,12 @@ public class StringTerms extends InternalTerms { BytesRef termBytes; + public Bucket(boolean showDocCountError) { + super(null, showDocCountError); + } + public Bucket(BytesRef term, long docCount, InternalAggregations aggregations, boolean showDocCountError, long docCountError) { - super(docCount, aggregations, showDocCountError, docCountError); + super(docCount, aggregations, showDocCountError, docCountError, null); this.termBytes = term; } @@ -94,6 +121,40 @@ public class StringTerms extends InternalTerms { Bucket newBucket(long docCount, InternalAggregations aggs, long docCountError) { return new Bucket(termBytes, docCount, aggs, showDocCountError, docCountError); } + + @Override + public void readFrom(StreamInput in) throws IOException { + termBytes = in.readBytesRef(); + docCount = in.readVLong(); + docCountError = -1; + if (in.getVersion().onOrAfter(Version.V_1_4_0_Beta1) && showDocCountError) { + docCountError = in.readLong(); + } + aggregations = InternalAggregations.readAggregations(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBytesRef(termBytes); + out.writeVLong(getDocCount()); + if (out.getVersion().onOrAfter(Version.V_1_4_0_Beta1) && showDocCountError) { + out.writeLong(docCountError); + } + aggregations.writeTo(out); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.utf8Field(CommonFields.KEY, termBytes); + builder.field(CommonFields.DOC_COUNT, getDocCount()); + if (showDocCountError) { + builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, getDocCountError()); + } + aggregations.toXContentInternal(builder, params); + builder.endObject(); + return builder; + } } StringTerms() {} // for serialization @@ -133,14 +194,9 @@ public class StringTerms extends InternalTerms { int size = in.readVInt(); List buckets = new ArrayList<>(size); for (int i = 0; i < size; i++) { - BytesRef termBytes = in.readBytesRef(); - long docCount = in.readVLong(); - long bucketDocCountError = -1; - if (in.getVersion().onOrAfter(Version.V_1_4_0_Beta1) && showTermDocCountError) { - bucketDocCountError = in.readLong(); - } - InternalAggregations aggregations = InternalAggregations.readAggregations(in); - buckets.add(new Bucket(termBytes, docCount, aggregations, showTermDocCountError, bucketDocCountError)); + Bucket bucket = new Bucket(showTermDocCountError); + bucket.readFrom(in); + buckets.add(bucket); } this.buckets = buckets; this.bucketMap = null; @@ -161,12 +217,7 @@ public class StringTerms extends InternalTerms { out.writeVLong(minDocCount); out.writeVInt(buckets.size()); for (InternalTerms.Bucket bucket : buckets) { - out.writeBytesRef(((Bucket) bucket).termBytes); - out.writeVLong(bucket.getDocCount()); - if (out.getVersion().onOrAfter(Version.V_1_4_0_Beta1) && showTermDocCountError) { - out.writeLong(bucket.docCountError); - } - ((InternalAggregations) bucket.getAggregations()).writeTo(out); + bucket.writeTo(out); } } @@ -175,14 +226,7 @@ public class StringTerms extends InternalTerms { builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, docCountError); builder.startArray(CommonFields.BUCKETS); for (InternalTerms.Bucket bucket : buckets) { - builder.startObject(); - builder.utf8Field(CommonFields.KEY, ((Bucket) bucket).termBytes); - builder.field(CommonFields.DOC_COUNT, bucket.getDocCount()); - if (showTermDocCountError) { - builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, bucket.getDocCountError()); - } - ((InternalAggregations) bucket.getAggregations()).toXContentInternal(builder, params); - builder.endObject(); + bucket.toXContent(builder, params); } builder.endArray(); return builder; diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificanceHeuristicTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificanceHeuristicTests.java index dd096340d83..638a1115c0f 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificanceHeuristicTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificanceHeuristicTests.java @@ -99,7 +99,7 @@ public class SignificanceHeuristicTests extends ElasticsearchTestCase { ArrayList buckets = new ArrayList<>(); if (randomBoolean()) { BytesRef term = new BytesRef("123.0"); - buckets.add(new SignificantLongTerms.Bucket(1, 2, 3, 4, 123, InternalAggregations.EMPTY)); + buckets.add(new SignificantLongTerms.Bucket(1, 2, 3, 4, 123, InternalAggregations.EMPTY, null)); sTerms[0] = new SignificantLongTerms(10, 20, "some_name", null, 1, 1, heuristic, buckets); sTerms[1] = new SignificantLongTerms(); } else {