Aggregations: Buckets can now be serialized outside of an Aggregation

This change means that buckets can now be serialized to JSON and serialized and deserialized over the transport API outside of the aggregation that contains them. This is a required change for #8110 (Reducers framework) but should make sense on its own, since objects should really take care of their own serialization rather than relying on their parent object.
Colin Goodheart-Smithe 2014-09-15 15:56:07 +01:00
parent b0a14f0b63
commit 4723c2a2ee
34 changed files with 949 additions and 237 deletions
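Before the per-file diffs, a minimal sketch of the round-trip this change enables, using InternalFilters from the diff below. The in-memory stream plumbing (BytesStreamOutput/BytesStreamInput), the empty sub-aggregations, and the standalone registration call are illustrative assumptions; on a real node, registration happens once at module setup and the bytes travel over the transport layer.

import java.util.ArrayList;

import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.BucketStreamContext;
import org.elasticsearch.search.aggregations.bucket.BucketStreams;
import org.elasticsearch.search.aggregations.bucket.filters.InternalFilters;

public class BucketRoundTripSketch {
    public static void main(String[] args) throws Exception {
        // Normally done once at node startup by the transport module.
        InternalFilters.registerStream();

        InternalFilters.Bucket bucket = new InternalFilters.Bucket(
                "errors", 42, new InternalAggregations(new ArrayList<InternalAggregation>()), true);

        // Sender: look up the stream registered for the aggregation type, let it
        // capture the bucket's out-of-band state, then write context, then bucket.
        BucketStreams.Stream stream = BucketStreams.stream(InternalFilters.TYPE.stream());
        BucketStreamContext context = stream.getBucketStreamContext(bucket);
        BytesStreamOutput out = new BytesStreamOutput();
        context.writeTo(out);
        bucket.writeTo(out);

        // Receiver: read the context first, then let the stream rebuild the bucket.
        StreamInput in = new BytesStreamInput(out.bytes());
        BucketStreamContext readContext = new BucketStreamContext();
        readContext.readFrom(in);
        InternalFilters.Bucket read = (InternalFilters.Bucket) stream.readResult(in, readContext);
    }
}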

View File

@ -0,0 +1,80 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
import java.io.IOException;
import java.util.Map;
public class BucketStreamContext implements Streamable {
@Nullable
private ValueFormatter formatter;
private boolean keyed;
private Map<String, Object> attributes;
public BucketStreamContext() {
}
public void formatter(@Nullable ValueFormatter formatter) {
this.formatter = formatter;
}
public ValueFormatter formatter() {
return formatter;
}
public void keyed(boolean keyed) {
this.keyed = keyed;
}
public boolean keyed() {
return keyed;
}
public void attributes(Map<String, Object> attributes) {
this.attributes = attributes;
}
public Map<String, Object> attributes() {
return attributes;
}
@Override
public void readFrom(StreamInput in) throws IOException {
formatter = ValueFormatterStreams.readOptional(in);
keyed = in.readBoolean();
attributes = in.readMap();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
ValueFormatterStreams.writeOptional(formatter, out);
out.writeBoolean(keyed);
out.writeMap(attributes);
}
}
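The context itself is a tiny Streamable: an optional formatter, the keyed flag, and a free-form attributes map. It round-trips independently of any bucket; a sanity-check sketch, reusing the in-memory streams (and their imports) from the sketch above:

BucketStreamContext context = new BucketStreamContext();
context.keyed(true);
Map<String, Object> attributes = new HashMap<>();
attributes.put("subsetSize", 100L);
context.attributes(attributes);   // formatter stays null and is written as absent

BytesStreamOutput out = new BytesStreamOutput();
context.writeTo(out);

BucketStreamContext read = new BucketStreamContext();
read.readFrom(new BytesStreamInput(out.bytes()));
assert read.keyed();
assert Long.valueOf(100L).equals(read.attributes().get("subsetSize"));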

View File

@ -0,0 +1,65 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
import java.io.IOException;
public class BucketStreams {
private static ImmutableMap<BytesReference, Stream> STREAMS = ImmutableMap.of();
/**
* A stream that knows how to read a bucket from the input.
*/
public static interface Stream<B extends MultiBucketsAggregation.Bucket> {
B readResult(StreamInput in, BucketStreamContext context) throws IOException;
BucketStreamContext getBucketStreamContext(B bucket);
}
/**
* Registers the given stream and associates it with the given types.
*
* @param stream The stream to register
* @param types The types to associate with the stream
*/
public static synchronized void registerStream(Stream stream, BytesReference... types) {
MapBuilder<BytesReference, Stream> uStreams = MapBuilder.newMapBuilder(STREAMS);
for (BytesReference type : types) {
uStreams.put(type, stream);
}
STREAMS = uStreams.immutableMap();
}
/**
* Returns the stream that is registered for the given type
*
* @param type The given type
* @return The associated stream
*/
public static Stream stream(BytesReference type) {
return STREAMS.get(type);
}
}
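Registration mirrors AggregationStreams: each InternalXxx class registers its bucket stream once at startup under the same type name it already uses for its aggregation stream, and registerStream swaps in a fresh ImmutableMap under the class lock, so lookups stay lock-free reads of an immutable snapshot. The varargs signature also lets one stream serve several names; a hedged sketch where BUCKET_STREAM and TYPE stand for any concrete aggregation's fields and the alias is invented:

// Startup-time registration, typically from a static registerStream() method.
BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream(), new BytesArray("my_alias"));

// Wire-time lookup when a bucket of that type needs decoding.
BucketStreams.Stream stream = BucketStreams.stream(TYPE.stream());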

View File

@ -19,14 +19,17 @@
package org.elasticsearch.search.aggregations.bucket;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.util.Comparators;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.HasAggregations;
import org.elasticsearch.search.aggregations.support.OrderPath;
import java.util.Collection;
import java.util.List;
/**
* An aggregation that returns multiple buckets
@ -38,7 +41,7 @@ public interface MultiBucketsAggregation extends Aggregation {
* A bucket represents a criterion that all documents falling in it adhere to. It is also uniquely identified
* by a key, and can potentially hold sub-aggregations computed over all documents in it.
*/
public interface Bucket extends HasAggregations {
public interface Bucket extends HasAggregations, ToXContent, Streamable {
/**
* @return The key associated with the bucket as a string
@ -90,7 +93,7 @@ public interface MultiBucketsAggregation extends Aggregation {
/**
* @return The buckets of this aggregation.
*/
Collection<? extends Bucket> getBuckets();
List<? extends Bucket> getBuckets();
/**
* The bucket that is associated with the given key.

View File

@ -22,6 +22,7 @@ package org.elasticsearch.search.aggregations.bucket.filters;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import java.util.Collection;
import java.util.List;
/**
* A multi bucket aggregation where the buckets are defined by a set of filters (a bucket per filter). Each bucket
@ -38,7 +39,7 @@ public interface Filters extends MultiBucketsAggregation {
/**
* The buckets created by this aggregation.
*/
Collection<? extends Bucket> getBuckets();
List<? extends Bucket> getBuckets();
@Override
Bucket getBucketByKey(String key);

View File

@ -91,7 +91,7 @@ public class FiltersAggregator extends BucketsAggregator {
for (int i = 0; i < filters.length; i++) {
KeyedFilter filter = filters[i];
long bucketOrd = bucketOrd(owningBucketOrdinal, i);
InternalFilters.Bucket bucket = new InternalFilters.Bucket(filter.key, bucketDocCount(bucketOrd), bucketAggregations(bucketOrd));
InternalFilters.Bucket bucket = new InternalFilters.Bucket(filter.key, bucketDocCount(bucketOrd), bucketAggregations(bucketOrd), keyed);
buckets.add(bucket);
}
return new InternalFilters(name, buckets, keyed);
@ -102,7 +102,7 @@ public class FiltersAggregator extends BucketsAggregator {
InternalAggregations subAggs = buildEmptySubAggregations();
List<InternalFilters.Bucket> buckets = Lists.newArrayListWithCapacity(filters.length);
for (int i = 0; i < filters.length; i++) {
InternalFilters.Bucket bucket = new InternalFilters.Bucket(filters[i].key, 0, subAggs);
InternalFilters.Bucket bucket = new InternalFilters.Bucket(filters[i].key, 0, subAggs, keyed);
buckets.add(bucket);
}
return new InternalFilters(name, buckets, keyed);

View File

@ -29,9 +29,14 @@ import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.BucketStreamContext;
import org.elasticsearch.search.aggregations.bucket.BucketStreams;
import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
*
@ -49,20 +54,44 @@ public class InternalFilters extends InternalAggregation implements Filters {
}
};
private final static BucketStreams.Stream<Bucket> BUCKET_STREAM = new BucketStreams.Stream<Bucket>() {
@Override
public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException {
Bucket filters = new Bucket(context.keyed());
filters.readFrom(in);
return filters;
}
@Override
public BucketStreamContext getBucketStreamContext(Bucket bucket) {
BucketStreamContext context = new BucketStreamContext();
context.keyed(bucket.keyed);
return context;
}
};
public static void registerStream() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream());
}
public static class Bucket implements Filters.Bucket {
private final boolean keyed;
private String key;
private long docCount;
InternalAggregations aggregations;
public Bucket(String key, long docCount, InternalAggregations aggregations) {
private Bucket(boolean keyed) {
// for serialization
this.keyed = keyed;
}
public Bucket(String key, long docCount, InternalAggregations aggregations, boolean keyed) {
this.key = key;
this.docCount = docCount;
this.aggregations = aggregations;
this.keyed = keyed;
}
public String getKey() {
@ -89,7 +118,7 @@ public class InternalFilters extends InternalAggregation implements Filters {
List<InternalAggregations> aggregationsList = Lists.newArrayListWithCapacity(buckets.size());
for (Bucket bucket : buckets) {
if (reduced == null) {
reduced = new Bucket(bucket.key, bucket.docCount, bucket.aggregations);
reduced = new Bucket(bucket.key, bucket.docCount, bucket.aggregations, bucket.keyed);
} else {
reduced.docCount += bucket.docCount;
}
@ -99,7 +128,8 @@ public class InternalFilters extends InternalAggregation implements Filters {
return reduced;
}
void toXContent(XContentBuilder builder, Params params, boolean keyed) throws IOException {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (keyed) {
builder.startObject(key);
} else {
@ -108,6 +138,21 @@ public class InternalFilters extends InternalAggregation implements Filters {
builder.field(CommonFields.DOC_COUNT, docCount);
aggregations.toXContentInternal(builder, params);
builder.endObject();
return builder;
}
@Override
public void readFrom(StreamInput in) throws IOException {
key = in.readOptionalString();
docCount = in.readVLong();
aggregations = InternalAggregations.readAggregations(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(key);
out.writeVLong(docCount);
aggregations.writeTo(out);
}
}
@ -129,7 +174,7 @@ public class InternalFilters extends InternalAggregation implements Filters {
}
@Override
public Collection<Bucket> getBuckets() {
public List<Bucket> getBuckets() {
return buckets;
}
@ -179,8 +224,9 @@ public class InternalFilters extends InternalAggregation implements Filters {
int size = in.readVInt();
List<Bucket> buckets = Lists.newArrayListWithCapacity(size);
for (int i = 0; i < size; i++) {
String key = in.readOptionalString();
buckets.add(new Bucket(key, in.readVLong(), InternalAggregations.readAggregations(in)));
Bucket bucket = new Bucket(keyed);
bucket.readFrom(in);
buckets.add(bucket);
}
this.buckets = buckets;
this.bucketMap = null;
@ -192,9 +238,7 @@ public class InternalFilters extends InternalAggregation implements Filters {
out.writeBoolean(keyed);
out.writeVInt(buckets.size());
for (Bucket bucket : buckets) {
out.writeOptionalString(bucket.key);
out.writeVLong(bucket.docCount);
bucket.aggregations.writeTo(out);
bucket.writeTo(out);
}
}
@ -206,7 +250,7 @@ public class InternalFilters extends InternalAggregation implements Filters {
builder.startArray(CommonFields.BUCKETS);
}
for (Bucket bucket : buckets) {
bucket.toXContent(builder, params, keyed);
bucket.toXContent(builder, params);
}
if (keyed) {
builder.endObject();
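Because Bucket now implements ToXContent, a single filters bucket can also render itself to JSON without going through the parent InternalFilters. A sketch, assuming the keyed bucket from the first sketch and the org.elasticsearch.common.xcontent imports:

XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();                               // keyed buckets emit "key": { ... }
bucket.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
String json = builder.string();                      // {"errors":{"doc_count":42}}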

View File

@ -23,6 +23,7 @@ import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import java.util.Collection;
import java.util.List;
/**
* A {@code geohash_grid} aggregation. Defines multiple buckets, each representing a cell in a geo-grid of a specific
@ -51,7 +52,7 @@ public interface GeoHashGrid extends MultiBucketsAggregation {
* @return The buckets of this aggregation (each bucket representing a geohash grid cell)
*/
@Override
Collection<Bucket> getBuckets();
List<Bucket> getBuckets();
@Override
Bucket getBucketByKey(String key);

View File

@ -31,6 +31,8 @@ import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.BucketStreamContext;
import org.elasticsearch.search.aggregations.bucket.BucketStreams;
import java.io.IOException;
import java.util.*;
@ -53,8 +55,25 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
}
};
public static final BucketStreams.Stream<Bucket> BUCKET_STREAM = new BucketStreams.Stream<Bucket>() {
@Override
public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException {
Bucket bucket = new Bucket();
bucket.readFrom(in);
return bucket;
}
@Override
public BucketStreamContext getBucketStreamContext(Bucket bucket) {
BucketStreamContext context = new BucketStreamContext();
return context;
}
};
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream());
}
@ -64,6 +83,10 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
protected long docCount;
protected InternalAggregations aggregations;
public Bucket() {
// For Serialization only
}
public Bucket(long geohashAsLong, long docCount, InternalAggregations aggregations) {
this.docCount = docCount;
this.aggregations = aggregations;
@ -120,6 +143,29 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
return geohashAsLong;
}
@Override
public void readFrom(StreamInput in) throws IOException {
geohashAsLong = in.readLong();
docCount = in.readVLong();
aggregations = InternalAggregations.readAggregations(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(geohashAsLong);
out.writeVLong(docCount);
aggregations.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(CommonFields.KEY, getKeyAsText());
builder.field(CommonFields.DOC_COUNT, docCount);
aggregations.toXContentInternal(builder, params);
builder.endObject();
return builder;
}
}
private int requiredSize;
@ -141,9 +187,9 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
}
@Override
public Collection<GeoHashGrid.Bucket> getBuckets() {
public List<GeoHashGrid.Bucket> getBuckets() {
Object o = buckets;
return (Collection<GeoHashGrid.Bucket>) o;
return (List<GeoHashGrid.Bucket>) o;
}
@Override
@ -208,7 +254,9 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
int size = in.readVInt();
List<Bucket> buckets = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
buckets.add(new Bucket(in.readLong(), in.readVLong(), InternalAggregations.readAggregations(in)));
Bucket bucket = new Bucket();
bucket.readFrom(in);
buckets.add(bucket);
}
this.buckets = buckets;
this.bucketMap = null;
@ -220,9 +268,7 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
writeSize(requiredSize, out);
out.writeVInt(buckets.size());
for (Bucket bucket : buckets) {
out.writeLong(bucket.geohashAsLong);
out.writeVLong(bucket.getDocCount());
((InternalAggregations) bucket.getAggregations()).writeTo(out);
bucket.writeTo(out);
}
}
@ -230,11 +276,7 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.startArray(CommonFields.BUCKETS);
for (Bucket bucket : buckets) {
builder.startObject();
builder.field(CommonFields.KEY, bucket.getKeyAsText());
builder.field(CommonFields.DOC_COUNT, bucket.getDocCount());
((InternalAggregations) bucket.getAggregations()).toXContentInternal(builder, params);
builder.endObject();
bucket.toXContent(builder, params);
}
builder.endArray();
return builder;

View File

@ -114,7 +114,7 @@ public class HistogramAggregator extends BucketsAggregator {
assert owningBucketOrdinal == 0;
List<InternalHistogram.Bucket> buckets = new ArrayList<>((int) bucketOrds.size());
for (long i = 0; i < bucketOrds.size(); i++) {
buckets.add(histogramFactory.createBucket(rounding.valueForKey(bucketOrds.get(i)), bucketDocCount(i), bucketAggregations(i), formatter));
buckets.add(histogramFactory.createBucket(rounding.valueForKey(bucketOrds.get(i)), bucketDocCount(i), bucketAggregations(i), keyed, formatter));
}
CollectionUtil.introSort(buckets, order.comparator());

View File

@ -23,6 +23,8 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.BucketStreamContext;
import org.elasticsearch.search.aggregations.bucket.BucketStreams;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@ -47,14 +49,35 @@ public class InternalDateHistogram extends InternalHistogram<InternalDateHistogr
}
};
private final static BucketStreams.Stream<Bucket> BUCKET_STREAM = new BucketStreams.Stream<Bucket>() {
@Override
public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException {
Bucket buckets = new Bucket(context.keyed(), context.formatter());
buckets.readFrom(in);
return buckets;
}
@Override
public BucketStreamContext getBucketStreamContext(Bucket bucket) {
BucketStreamContext context = new BucketStreamContext();
context.formatter(bucket.formatter);
return context;
}
};
public static void registerStream() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream());
}
static class Bucket extends InternalHistogram.Bucket implements DateHistogram.Bucket {
Bucket(long key, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter) {
super(key, docCount, formatter, aggregations);
Bucket(boolean keyed, @Nullable ValueFormatter formatter) {
super(keyed, formatter);
}
Bucket(long key, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter) {
super(key, docCount, keyed, formatter, aggregations);
}
@Override
@ -95,8 +118,8 @@ public class InternalDateHistogram extends InternalHistogram<InternalDateHistogr
}
@Override
public InternalDateHistogram.Bucket createBucket(long key, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter) {
return new Bucket(key, docCount, aggregations, formatter);
public InternalDateHistogram.Bucket createBucket(long key, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter) {
return new Bucket(key, docCount, aggregations, keyed, formatter);
}
}
@ -142,8 +165,13 @@ public class InternalDateHistogram extends InternalHistogram<InternalDateHistogr
}
@Override
protected InternalDateHistogram.Bucket createBucket(long key, long docCount, InternalAggregations aggregations, ValueFormatter formatter) {
return new Bucket(key, docCount, aggregations, formatter);
protected InternalDateHistogram.Bucket createBucket(long key, long docCount, InternalAggregations aggregations, boolean keyed, ValueFormatter formatter) {
return new Bucket(key, docCount, aggregations, keyed, formatter);
}
@Override
protected Bucket createEmptyBucket(boolean keyed, ValueFormatter formatter) {
return new Bucket(keyed, formatter);
}
@Override

View File

@ -34,6 +34,8 @@ import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.BucketStreamContext;
import org.elasticsearch.search.aggregations.bucket.BucketStreams;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
@ -59,21 +61,46 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
}
};
private final static BucketStreams.Stream<Bucket> BUCKET_STREAM = new BucketStreams.Stream<Bucket>() {
@Override
public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException {
Bucket histogram = new Bucket(context.keyed(), context.formatter());
histogram.readFrom(in);
return histogram;
}
@Override
public BucketStreamContext getBucketStreamContext(Bucket bucket) {
BucketStreamContext context = new BucketStreamContext();
context.formatter(bucket.formatter);
context.keyed(bucket.keyed);
return context;
}
};
public static void registerStream() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream());
}
public static class Bucket implements Histogram.Bucket {
final long key;
final long docCount;
long key;
long docCount;
InternalAggregations aggregations;
private transient final boolean keyed;
protected transient final @Nullable ValueFormatter formatter;
final InternalAggregations aggregations;
public Bucket(long key, long docCount, @Nullable ValueFormatter formatter, InternalAggregations aggregations) {
public Bucket(boolean keyed, @Nullable ValueFormatter formatter) {
this.formatter = formatter;
this.keyed = keyed;
}
public Bucket(long key, long docCount, boolean keyed, @Nullable ValueFormatter formatter, InternalAggregations aggregations) {
this(keyed, formatter);
this.key = key;
this.docCount = docCount;
this.formatter = formatter;
this.aggregations = aggregations;
}
@ -114,10 +141,11 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
aggregations.add((InternalAggregations) bucket.getAggregations());
}
InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
return (B) getFactory().createBucket(key, docCount, aggs, formatter);
return (B) getFactory().createBucket(key, docCount, aggs, keyed, formatter);
}
void toXContent(XContentBuilder builder, Params params, boolean keyed, @Nullable ValueFormatter formatter) throws IOException {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (formatter != null && formatter != ValueFormatter.RAW) {
Text keyTxt = new StringText(formatter.format(key));
if (keyed) {
@ -137,6 +165,21 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
builder.field(CommonFields.DOC_COUNT, docCount);
aggregations.toXContentInternal(builder, params);
builder.endObject();
return builder;
}
@Override
public void readFrom(StreamInput in) throws IOException {
key = in.readLong();
docCount = in.readVLong();
aggregations = InternalAggregations.readAggregations(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(key);
out.writeVLong(docCount);
aggregations.writeTo(out);
}
}
@ -194,8 +237,8 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
return new InternalHistogram<>(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed);
}
public B createBucket(long key, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter) {
return (B) new Bucket(key, docCount, formatter, aggregations);
public B createBucket(long key, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter) {
return (B) new Bucket(key, docCount, keyed, formatter, aggregations);
}
}
@ -295,7 +338,7 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
long key = bounds.min;
long max = bounds.max;
while (key <= max) {
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, formatter));
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter));
key = emptyBucketInfo.rounding.nextRoundingValue(key);
}
}
@ -304,7 +347,7 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
long key = bounds.min;
if (key < firstBucket.key) {
while (key < firstBucket.key) {
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, formatter));
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter));
key = emptyBucketInfo.rounding.nextRoundingValue(key);
}
}
@ -319,7 +362,7 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
if (lastBucket != null) {
long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key);
while (key < nextBucket.key) {
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, formatter));
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter));
key = emptyBucketInfo.rounding.nextRoundingValue(key);
}
assert key == nextBucket.key;
@ -332,7 +375,7 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key);
long max = bounds.max;
while (key <= max) {
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, formatter));
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter));
key = emptyBucketInfo.rounding.nextRoundingValue(key);
}
}
@ -348,8 +391,12 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
return getFactory().create(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo, formatter, keyed);
}
protected B createBucket(long key, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter) {
return (B) new InternalHistogram.Bucket(key, docCount, formatter, aggregations);
protected B createBucket(long key, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter) {
return (B) new InternalHistogram.Bucket(key, docCount, keyed, formatter, aggregations);
}
protected B createEmptyBucket(boolean keyed, @Nullable ValueFormatter formatter) {
return (B) new InternalHistogram.Bucket(keyed, formatter);
}
@Override
@ -365,7 +412,9 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
int size = in.readVInt();
List<B> buckets = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
buckets.add(createBucket(in.readLong(), in.readVLong(), InternalAggregations.readAggregations(in), formatter));
B bucket = createEmptyBucket(keyed, formatter);
bucket.readFrom(in);
buckets.add(bucket);
}
this.buckets = buckets;
this.bucketsMap = null;
@ -383,9 +432,7 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
out.writeBoolean(keyed);
out.writeVInt(buckets.size());
for (B bucket : buckets) {
out.writeLong(bucket.key);
out.writeVLong(bucket.docCount);
bucket.aggregations.writeTo(out);
bucket.writeTo(out);
}
}
@ -397,7 +444,7 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
builder.startArray(CommonFields.BUCKETS);
}
for (B bucket : buckets) {
bucket.toXContent(builder, params, keyed, formatter);
bucket.toXContent(builder, params);
}
if (keyed) {
builder.endObject();
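Note the shape of the histogram bucket: keyed and formatter are transient finals that never cross the wire with the bucket. They are injected at construction time, by createEmptyBucket(keyed, formatter) when the enclosing aggregation deserializes, or by the BucketStreamContext when a bucket travels alone. A receiver-side sketch, with the StreamInput in and stream registration assumed as in the first sketch:

// The keyed flag and optional formatter arrive once, in the context...
BucketStreamContext context = new BucketStreamContext();
context.readFrom(in);

// ...and readResult hands them to the Bucket(keyed, formatter) constructor
// before reading the per-bucket key, docCount and sub-aggregations.
BucketStreams.Stream stream = BucketStreams.stream(InternalHistogram.TYPE.stream());
InternalHistogram.Bucket bucket = (InternalHistogram.Bucket) stream.readResult(in, context);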

View File

@ -29,11 +29,16 @@ import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.BucketStreamContext;
import org.elasticsearch.search.aggregations.bucket.BucketStreams;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
*
@ -53,26 +58,50 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
}
};
private final static BucketStreams.Stream<Bucket> BUCKET_STREAM = new BucketStreams.Stream<Bucket>() {
@Override
public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException {
Bucket buckets = new Bucket(context.keyed(), context.formatter());
buckets.readFrom(in);
return buckets;
}
@Override
public BucketStreamContext getBucketStreamContext(Bucket bucket) {
BucketStreamContext context = new BucketStreamContext();
context.formatter(bucket.formatter);
context.keyed(bucket.keyed);
return context;
}
};
public static void registerStream() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream());
}
public static class Bucket implements Range.Bucket {
private final ValueFormatter formatter;
private final double from;
private final double to;
private final long docCount;
final InternalAggregations aggregations;
private final String key;
protected transient final boolean keyed;
protected transient final ValueFormatter formatter;
private double from;
private double to;
private long docCount;
InternalAggregations aggregations;
private String key;
public Bucket(String key, double from, double to, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter) {
public Bucket(boolean keyed, @Nullable ValueFormatter formatter) {
this.keyed = keyed;
this.formatter = formatter;
}
public Bucket(String key, double from, double to, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter) {
this(keyed, formatter);
this.key = key != null ? key : generateKey(from, to, formatter);
this.from = from;
this.to = to;
this.docCount = docCount;
this.aggregations = aggregations;
this.formatter = formatter;
}
public String getKey() {
@ -116,10 +145,11 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
aggregationsList.add(range.aggregations);
}
final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context);
return getFactory().createBucket(key, from, to, docCount, aggs, formatter);
return getFactory().createBucket(key, from, to, docCount, aggs, keyed, formatter);
}
void toXContent(XContentBuilder builder, Params params, @Nullable ValueFormatter formatter, boolean keyed) throws IOException {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (keyed) {
builder.startObject(key);
} else {
@ -141,6 +171,7 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
builder.field(CommonFields.DOC_COUNT, docCount);
aggregations.toXContentInternal(builder, params);
builder.endObject();
return builder;
}
protected String generateKey(double from, double to, @Nullable ValueFormatter formatter) {
@ -151,6 +182,15 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
return sb.toString();
}
@Override
public void readFrom(StreamInput in) throws IOException {
key = in.readOptionalString();
from = in.readDouble();
to = in.readDouble();
docCount = in.readVLong();
aggregations = InternalAggregations.readAggregations(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(key);
out.writeDouble(from);
out.writeDouble(to);
out.writeVLong(docCount);
aggregations.writeTo(out);
}
}
public static class Factory<B extends Bucket, R extends InternalRange<B>> {
@ -164,8 +204,8 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
}
public B createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter) {
return (B) new Bucket(key, from, to, docCount, aggregations, formatter);
public B createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter) {
return (B) new Bucket(key, from, to, docCount, aggregations, keyed, formatter);
}
}
@ -189,7 +229,7 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
}
@Override
public Collection<B> getBuckets() {
public List<B> getBuckets() {
return ranges;
}
@ -240,7 +280,7 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
List<B> ranges = Lists.newArrayListWithCapacity(size);
for (int i = 0; i < size; i++) {
String key = in.readOptionalString();
ranges.add(getFactory().createBucket(key, in.readDouble(), in.readDouble(), in.readVLong(), InternalAggregations.readAggregations(in), formatter));
ranges.add(getFactory().createBucket(key, in.readDouble(), in.readDouble(), in.readVLong(), InternalAggregations.readAggregations(in), keyed, formatter));
}
this.ranges = ranges;
this.rangeMap = null;
@ -269,7 +309,7 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
builder.startArray(CommonFields.BUCKETS);
}
for (B range : ranges) {
range.toXContent(builder, params, formatter, keyed);
range.toXContent(builder, params);
}
if (keyed) {
builder.endObject();

View File

@ -21,6 +21,7 @@ package org.elasticsearch.search.aggregations.bucket.range;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import java.util.Collection;
import java.util.List;
/**
* A {@code range} aggregation. Defines multiple buckets, each associated with a pre-defined value range of a field,
@ -47,7 +48,7 @@ public interface Range extends MultiBucketsAggregation {
/**
* Return the buckets of this range aggregation.
*/
Collection<? extends Bucket> getBuckets();
List<? extends Bucket> getBuckets();
@Override
Bucket getBucketByKey(String key);

View File

@ -200,7 +200,7 @@ public class RangeAggregator extends BucketsAggregator {
Range range = ranges[i];
final long bucketOrd = subBucketOrdinal(owningBucketOrdinal, i);
org.elasticsearch.search.aggregations.bucket.range.Range.Bucket bucket =
rangeFactory.createBucket(range.key, range.from, range.to, bucketDocCount(bucketOrd),bucketAggregations(bucketOrd), formatter);
rangeFactory.createBucket(range.key, range.from, range.to, bucketDocCount(bucketOrd),bucketAggregations(bucketOrd), keyed, formatter);
buckets.add(bucket);
}
// value source can be null in the case of unmapped fields
@ -214,7 +214,7 @@ public class RangeAggregator extends BucketsAggregator {
for (int i = 0; i < ranges.length; i++) {
Range range = ranges[i];
org.elasticsearch.search.aggregations.bucket.range.Range.Bucket bucket =
rangeFactory.createBucket(range.key, range.from, range.to, 0, subAggs, formatter);
rangeFactory.createBucket(range.key, range.from, range.to, 0, subAggs, keyed, formatter);
buckets.add(bucket);
}
// value source can be null in the case of unmapped fields
@ -273,7 +273,7 @@ public class RangeAggregator extends BucketsAggregator {
InternalAggregations subAggs = buildEmptySubAggregations();
List<org.elasticsearch.search.aggregations.bucket.range.Range.Bucket> buckets = new ArrayList<>(ranges.size());
for (RangeAggregator.Range range : ranges) {
buckets.add(factory.createBucket(range.key, range.from, range.to, 0, subAggs, formatter));
buckets.add(factory.createBucket(range.key, range.from, range.to, 0, subAggs, keyed, formatter));
}
return factory.create(name, buckets, formatter, keyed);
}

View File

@ -22,6 +22,7 @@ import org.elasticsearch.search.aggregations.bucket.range.Range;
import org.joda.time.DateTime;
import java.util.Collection;
import java.util.List;
/**
* A range aggregation on date values.
@ -36,7 +37,7 @@ public interface DateRange extends Range {
}
@Override
Collection<? extends DateRange.Bucket> getBuckets();
List<? extends Bucket> getBuckets();
@Override
DateRange.Bucket getBucketByKey(String key);

View File

@ -23,6 +23,8 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.BucketStreamContext;
import org.elasticsearch.search.aggregations.bucket.BucketStreams;
import org.elasticsearch.search.aggregations.bucket.range.InternalRange;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.joda.time.DateTime;
@ -47,20 +49,42 @@ public class InternalDateRange extends InternalRange<InternalDateRange.Bucket> i
}
};
private final static BucketStreams.Stream<Bucket> BUCKET_STREAM = new BucketStreams.Stream<Bucket>() {
@Override
public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException {
Bucket buckets = new Bucket(context.keyed(), context.formatter());
buckets.readFrom(in);
return buckets;
}
@Override
public BucketStreamContext getBucketStreamContext(Bucket bucket) {
BucketStreamContext context = new BucketStreamContext();
context.formatter(bucket.formatter());
context.keyed(bucket.keyed());
return context;
}
};
public static void registerStream() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream());
}
public static final Factory FACTORY = new Factory();
public static class Bucket extends InternalRange.Bucket implements DateRange.Bucket {
public Bucket(String key, double from, double to, long docCount, List<InternalAggregation> aggregations, ValueFormatter formatter) {
super(key, from, to, docCount, new InternalAggregations(aggregations), formatter);
public Bucket(boolean keyed, @Nullable ValueFormatter formatter) {
super(keyed, formatter);
}
public Bucket(String key, double from, double to, long docCount, InternalAggregations aggregations, ValueFormatter formatter) {
super(key, from, to, docCount, aggregations, formatter);
public Bucket(String key, double from, double to, long docCount, List<InternalAggregation> aggregations, boolean keyed, ValueFormatter formatter) {
super(key, from, to, docCount, new InternalAggregations(aggregations), keyed, formatter);
}
public Bucket(String key, double from, double to, long docCount, InternalAggregations aggregations, boolean keyed, ValueFormatter formatter) {
super(key, from, to, docCount, aggregations, keyed, formatter);
}
@Override
@ -77,6 +101,14 @@ public class InternalDateRange extends InternalRange<InternalDateRange.Bucket> i
protected InternalRange.Factory<Bucket, ?> getFactory() {
return FACTORY;
}
boolean keyed() {
return keyed;
}
ValueFormatter formatter() {
return formatter;
}
}
private static class Factory extends InternalRange.Factory<InternalDateRange.Bucket, InternalDateRange> {
@ -92,8 +124,8 @@ public class InternalDateRange extends InternalRange<InternalDateRange.Bucket> i
}
@Override
public Bucket createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, ValueFormatter formatter) {
return new Bucket(key, from, to, docCount, aggregations, formatter);
public Bucket createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, boolean keyed, ValueFormatter formatter) {
return new Bucket(key, from, to, docCount, aggregations, keyed, formatter);
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.search.aggregations.bucket.range.geodistance;
import org.elasticsearch.search.aggregations.bucket.range.Range;
import java.util.Collection;
import java.util.List;
/**
* An aggregation that computes ranges of geo distances.
@ -34,7 +35,7 @@ public interface GeoDistance extends Range {
}
@Override
Collection<? extends GeoDistance.Bucket> getBuckets();
List<? extends Bucket> getBuckets();
@Override
GeoDistance.Bucket getBucketByKey(String key);

View File

@ -23,6 +23,8 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.BucketStreamContext;
import org.elasticsearch.search.aggregations.bucket.BucketStreams;
import org.elasticsearch.search.aggregations.bucket.range.InternalRange;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
@ -45,26 +47,56 @@ public class InternalGeoDistance extends InternalRange<InternalGeoDistance.Bucke
}
};
private final static BucketStreams.Stream<Bucket> BUCKET_STREAM = new BucketStreams.Stream<Bucket>() {
@Override
public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException {
Bucket buckets = new Bucket(context.keyed(), context.formatter());
buckets.readFrom(in);
return buckets;
}
@Override
public BucketStreamContext getBucketStreamContext(Bucket bucket) {
BucketStreamContext context = new BucketStreamContext();
context.formatter(bucket.formatter());
context.keyed(bucket.keyed());
return context;
}
};
public static void registerStream() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream());
}
public static final Factory FACTORY = new Factory();
static class Bucket extends InternalRange.Bucket implements GeoDistance.Bucket {
Bucket(String key, double from, double to, long docCount, List<InternalAggregation> aggregations, ValueFormatter formatter) {
this(key, from, to, docCount, new InternalAggregations(aggregations), formatter);
Bucket(boolean keyed, @Nullable ValueFormatter formatter) {
super(keyed, formatter);
}
Bucket(String key, double from, double to, long docCount, InternalAggregations aggregations, ValueFormatter formatter) {
super(key, from, to, docCount, aggregations, formatter);
Bucket(String key, double from, double to, long docCount, List<InternalAggregation> aggregations, boolean keyed, @Nullable ValueFormatter formatter) {
this(key, from, to, docCount, new InternalAggregations(aggregations), keyed, formatter);
}
Bucket(String key, double from, double to, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter) {
super(key, from, to, docCount, aggregations, keyed, formatter);
}
@Override
protected InternalRange.Factory<Bucket, ?> getFactory() {
return FACTORY;
}
boolean keyed() {
return keyed;
}
ValueFormatter formatter() {
return formatter;
}
}
private static class Factory extends InternalRange.Factory<InternalGeoDistance.Bucket, InternalGeoDistance> {
@ -80,8 +112,8 @@ public class InternalGeoDistance extends InternalRange<InternalGeoDistance.Bucke
}
@Override
public Bucket createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter) {
return new Bucket(key, from, to, docCount, aggregations, formatter);
public Bucket createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter) {
return new Bucket(key, from, to, docCount, aggregations, keyed, formatter);
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.search.aggregations.bucket.range.ipv4;
import org.elasticsearch.search.aggregations.bucket.range.Range;
import java.util.Collection;
import java.util.List;
/**
* A range aggregation on ipv4 values.
@ -36,7 +37,7 @@ public interface IPv4Range extends Range {
}
@Override
Collection<? extends IPv4Range.Bucket> getBuckets();
List<? extends Bucket> getBuckets();
@Override
IPv4Range.Bucket getBucketByKey(String key);

View File

@ -23,6 +23,8 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.BucketStreamContext;
import org.elasticsearch.search.aggregations.bucket.BucketStreams;
import org.elasticsearch.search.aggregations.bucket.range.InternalRange;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
@ -47,20 +49,41 @@ public class InternalIPv4Range extends InternalRange<InternalIPv4Range.Bucket> i
}
};
private final static BucketStreams.Stream<Bucket> BUCKET_STREAM = new BucketStreams.Stream<Bucket>() {
@Override
public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException {
Bucket buckets = new Bucket(context.keyed());
buckets.readFrom(in);
return buckets;
}
@Override
public BucketStreamContext getBucketStreamContext(Bucket bucket) {
BucketStreamContext context = new BucketStreamContext();
context.keyed(bucket.keyed());
return context;
}
};
public static void registerStream() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream());
}
public static final Factory FACTORY = new Factory();
public static class Bucket extends InternalRange.Bucket implements IPv4Range.Bucket {
public Bucket(String key, double from, double to, long docCount, List<InternalAggregation> aggregations) {
super(key, from, to, docCount, new InternalAggregations(aggregations), ValueFormatter.IPv4);
public Bucket(boolean keyed) {
super(keyed, ValueFormatter.IPv4);
}
public Bucket(String key, double from, double to, long docCount, InternalAggregations aggregations) {
super(key, from, to, docCount, aggregations, ValueFormatter.IPv4);
public Bucket(String key, double from, double to, long docCount, List<InternalAggregation> aggregations, boolean keyed) {
super(key, from, to, docCount, new InternalAggregations(aggregations), keyed, ValueFormatter.IPv4);
}
public Bucket(String key, double from, double to, long docCount, InternalAggregations aggregations, boolean keyed) {
super(key, from, to, docCount, aggregations, keyed, ValueFormatter.IPv4);
}
@Override
@ -79,6 +102,10 @@ public class InternalIPv4Range extends InternalRange<InternalIPv4Range.Bucket> i
protected InternalRange.Factory<Bucket, ?> getFactory() {
return FACTORY;
}
boolean keyed() {
return keyed;
}
}
private static class Factory extends InternalRange.Factory<InternalIPv4Range.Bucket, InternalIPv4Range> {
@ -94,8 +121,8 @@ public class InternalIPv4Range extends InternalRange<InternalIPv4Range.Bucket> i
}
@Override
public Bucket createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter) {
return new Bucket(key, from, to, docCount, aggregations);
public Bucket createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter) {
return new Bucket(key, from, to, docCount, aggregations, keyed);
}
}

View File

@ -36,7 +36,7 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple
protected SignificanceHeuristic significanceHeuristic;
protected int requiredSize;
protected long minDocCount;
protected Collection<Bucket> buckets;
protected List<Bucket> buckets;
protected Map<String, Bucket> bucketMap;
protected long subsetSize;
protected long supersetSize;
@ -50,6 +50,11 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple
protected InternalAggregations aggregations;
double score;
protected Bucket(long subsetSize, long supersetSize) {
// for serialization
super(subsetSize, supersetSize);
}
protected Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations) {
super(subsetDf, subsetSize, supersetDf, supersetSize);
this.aggregations = aggregations;
@ -110,7 +115,7 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple
}
}
protected InternalSignificantTerms(long subsetSize, long supersetSize, String name, int requiredSize, long minDocCount, SignificanceHeuristic significanceHeuristic, Collection<Bucket> buckets) {
protected InternalSignificantTerms(long subsetSize, long supersetSize, String name, int requiredSize, long minDocCount, SignificanceHeuristic significanceHeuristic, List<Bucket> buckets) {
super(name);
this.requiredSize = requiredSize;
this.minDocCount = minDocCount;
@ -127,9 +132,9 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple
}
@Override
public Collection<SignificantTerms.Bucket> getBuckets() {
public List<SignificantTerms.Bucket> getBuckets() {
Object o = buckets;
return (Collection<SignificantTerms.Bucket>) o;
return (List<SignificantTerms.Bucket>) o;
}
@Override

View File

@ -27,6 +27,8 @@ import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.BucketStreamContext;
import org.elasticsearch.search.aggregations.bucket.BucketStreams;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristicStreams;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
@ -34,8 +36,9 @@ import org.elasticsearch.search.aggregations.support.format.ValueFormatterStream
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
*
@ -53,16 +56,46 @@ public class SignificantLongTerms extends InternalSignificantTerms {
}
};
private final static BucketStreams.Stream<Bucket> BUCKET_STREAM = new BucketStreams.Stream<Bucket>() {
@Override
public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException {
Bucket buckets = new Bucket((long) context.attributes().get("subsetSize"), (long) context.attributes().get("supersetSize"),
context.formatter());
buckets.readFrom(in);
return buckets;
}
@Override
public BucketStreamContext getBucketStreamContext(Bucket bucket) {
BucketStreamContext context = new BucketStreamContext();
Map<String, Object> attributes = new HashMap<>();
attributes.put("subsetSize", bucket.subsetSize);
attributes.put("supersetSize", bucket.supersetSize);
context.attributes(attributes);
context.formatter(bucket.formatter);
return context;
}
};
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream());
}
static class Bucket extends InternalSignificantTerms.Bucket {
long term;
private transient final ValueFormatter formatter;
public Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, long term, InternalAggregations aggregations) {
public Bucket(long subsetSize, long supersetSize, @Nullable ValueFormatter formatter) {
super(subsetSize, supersetSize);
this.formatter = formatter;
// for serialization
}
public Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, long term, InternalAggregations aggregations, @Nullable ValueFormatter formatter) {
super(subsetDf, subsetSize, supersetDf, supersetSize, aggregations);
this.formatter = formatter;
this.term = term;
}
@ -88,9 +121,39 @@ public class SignificantLongTerms extends InternalSignificantTerms {
@Override
Bucket newBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations) {
return new Bucket(subsetDf, subsetSize, supersetDf, supersetSize, term, aggregations);
return new Bucket(subsetDf, subsetSize, supersetDf, supersetSize, term, aggregations, formatter);
}
@Override
public void readFrom(StreamInput in) throws IOException {
subsetDf = in.readVLong();
supersetDf = in.readVLong();
term = in.readLong();
aggregations = InternalAggregations.readAggregations(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(subsetDf);
out.writeVLong(supersetDf);
out.writeLong(term);
aggregations.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(CommonFields.KEY, term);
if (formatter != null) {
builder.field(CommonFields.KEY_AS_STRING, formatter.format(term));
}
builder.field(CommonFields.DOC_COUNT, getDocCount());
builder.field("score", score);
builder.field("bg_count", supersetDf);
aggregations.toXContentInternal(builder, params);
builder.endObject();
return builder;
}
}
private ValueFormatter formatter;
@ -99,7 +162,7 @@ public class SignificantLongTerms extends InternalSignificantTerms {
} // for serialization
public SignificantLongTerms(long subsetSize, long supersetSize, String name, @Nullable ValueFormatter formatter,
int requiredSize, long minDocCount, SignificanceHeuristic significanceHeuristic, Collection<InternalSignificantTerms.Bucket> buckets) {
int requiredSize, long minDocCount, SignificanceHeuristic significanceHeuristic, List<InternalSignificantTerms.Bucket> buckets) {
super(subsetSize, supersetSize, name, requiredSize, minDocCount, significanceHeuristic, buckets);
this.formatter = formatter;
@ -129,12 +192,10 @@ public class SignificantLongTerms extends InternalSignificantTerms {
int size = in.readVInt();
List<InternalSignificantTerms.Bucket> buckets = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
long subsetDf = in.readVLong();
long supersetDf = in.readVLong();
long term = in.readLong();
Bucket readBucket = new Bucket(subsetDf, subsetSize, supersetDf,supersetSize, term, InternalAggregations.readAggregations(in));
readBucket.updateScore(significanceHeuristic);
buckets.add(readBucket);
Bucket bucket = new Bucket(subsetSize, supersetSize, formatter);
bucket.readFrom(in);
bucket.updateScore(significanceHeuristic);
buckets.add(bucket);
}
this.buckets = buckets;
this.bucketMap = null;
@ -153,10 +214,7 @@ public class SignificantLongTerms extends InternalSignificantTerms {
}
out.writeVInt(buckets.size());
for (InternalSignificantTerms.Bucket bucket : buckets) {
out.writeVLong(((Bucket) bucket).subsetDf);
out.writeVLong(((Bucket) bucket).supersetDf);
out.writeLong(((Bucket) bucket).term);
((InternalAggregations) bucket.getAggregations()).writeTo(out);
bucket.writeTo(out);
}
}
@ -165,16 +223,7 @@ public class SignificantLongTerms extends InternalSignificantTerms {
builder.field("doc_count", subsetSize);
builder.startArray(CommonFields.BUCKETS);
for (InternalSignificantTerms.Bucket bucket : buckets) {
builder.startObject();
builder.field(CommonFields.KEY, ((Bucket) bucket).term);
if (formatter != null) {
builder.field(CommonFields.KEY_AS_STRING, formatter.format(((Bucket) bucket).term));
}
builder.field(CommonFields.DOC_COUNT, bucket.getDocCount());
builder.field("score", bucket.score);
builder.field("bg_count", bucket.supersetDf);
((InternalAggregations) bucket.getAggregations()).toXContentInternal(builder, params);
builder.endObject();
bucket.toXContent(builder, params);
}
builder.endArray();
return builder;
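SignificantLongTerms is where the context's attributes map earns its keep: subsetSize and supersetSize are shared by every bucket of one aggregation, so the stream ships them once in the context instead of once per bucket, and the score is recomputed on arrival rather than serialized. A sketch of both sides, with stream, out, in and significanceHeuristic assumed from the surrounding plumbing:

// Sender: the context stows the shared sizes; writeTo then sends only the
// per-bucket fields (subsetDf, supersetDf, term, sub-aggregations).
BucketStreamContext context = stream.getBucketStreamContext(bucket);
context.writeTo(out);
bucket.writeTo(out);

// Receiver: read the context back, rebuild the bucket, then re-derive the
// score, just as the aggregation-level readFrom above does.
BucketStreamContext readContext = new BucketStreamContext();
readContext.readFrom(in);
InternalSignificantTerms.Bucket read =
        (InternalSignificantTerms.Bucket) stream.readResult(in, readContext);
read.updateScore(significanceHeuristic);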

View File

@ -69,7 +69,7 @@ public class SignificantLongTermsAggregator extends LongTermsAggregator {
SignificantLongTerms.Bucket spare = null;
for (long i = 0; i < bucketOrds.size(); i++) {
if (spare == null) {
spare = new SignificantLongTerms.Bucket(0, 0, 0, 0, 0, null);
spare = new SignificantLongTerms.Bucket(0, 0, 0, 0, 0, null, formatter);
}
spare.term = bucketOrds.get(i);
spare.subsetDf = bucketDocCount(i);

View File

@ -29,13 +29,16 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.BucketStreamContext;
import org.elasticsearch.search.aggregations.bucket.BucketStreams;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristicStreams;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
*
@@ -53,6 +56,30 @@ public class SignificantStringTerms extends InternalSignificantTerms {
}
};
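// The bucket stream mirrors the aggregation stream above: each Bucket serializes its own
// fields via readFrom/writeTo, while per-aggregation state that is not written with the
// bucket (here subsetSize and supersetSize) travels in the BucketStreamContext attributes.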
private final static BucketStreams.Stream<Bucket> BUCKET_STREAM = new BucketStreams.Stream<Bucket>() {
@Override
public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException {
Bucket bucket = new Bucket((long) context.attributes().get("subsetSize"), (long) context.attributes().get("supersetSize"));
bucket.readFrom(in);
return bucket;
}
@Override
public BucketStreamContext getBucketStreamContext(Bucket bucket) {
BucketStreamContext context = new BucketStreamContext();
Map<String, Object> attributes = new HashMap<>();
attributes.put("subsetSize", bucket.subsetSize);
attributes.put("supersetSize", bucket.supersetSize);
context.attributes(attributes);
return context;
}
};
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream());
}
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
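// A minimal round-trip sketch (hypothetical caller; stream setup elided). Only calls
// visible in this change are used:
//
//   BucketStreamContext ctx = BUCKET_STREAM.getBucketStreamContext(bucket); // capture sizes
//   bucket.writeTo(out);                                                    // bucket-owned write
//   Bucket copy = BUCKET_STREAM.readResult(in, ctx);                        // bucket-owned read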
@@ -61,6 +88,10 @@ public class SignificantStringTerms extends InternalSignificantTerms {
BytesRef termBytes;
public Bucket(long subsetSize, long supersetSize) {
// for serialization
super(subsetSize, supersetSize);
}
public Bucket(BytesRef term, long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations) {
super(subsetDf, subsetSize, supersetDf, supersetSize, aggregations);
@@ -92,12 +123,40 @@ public class SignificantStringTerms extends InternalSignificantTerms {
Bucket newBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations) {
return new Bucket(termBytes, subsetDf, subsetSize, supersetDf, supersetSize, aggregations);
}
@Override
public void readFrom(StreamInput in) throws IOException {
termBytes = in.readBytesRef();
subsetDf = in.readVLong();
supersetDf = in.readVLong();
aggregations = InternalAggregations.readAggregations(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBytesRef(termBytes);
out.writeVLong(subsetDf);
out.writeVLong(supersetDf);
aggregations.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.utf8Field(CommonFields.KEY, termBytes);
builder.field(CommonFields.DOC_COUNT, getDocCount());
builder.field("score", score);
builder.field("bg_count", supersetDf);
aggregations.toXContentInternal(builder, params);
builder.endObject();
return builder;
}
}
SignificantStringTerms() {} // for serialization
public SignificantStringTerms(long subsetSize, long supersetSize, String name, int requiredSize,
long minDocCount, SignificanceHeuristic significanceHeuristic, Collection<InternalSignificantTerms.Bucket> buckets) {
long minDocCount, SignificanceHeuristic significanceHeuristic, List<InternalSignificantTerms.Bucket> buckets) {
super(subsetSize, supersetSize, name, requiredSize, minDocCount, significanceHeuristic, buckets);
}
@@ -123,12 +182,10 @@ public class SignificantStringTerms extends InternalSignificantTerms {
int size = in.readVInt();
List<InternalSignificantTerms.Bucket> buckets = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
BytesRef term = in.readBytesRef();
long subsetDf = in.readVLong();
long supersetDf = in.readVLong();
Bucket readBucket = new Bucket(term, subsetDf, subsetSize, supersetDf, supersetSize, InternalAggregations.readAggregations(in));
readBucket.updateScore(significanceHeuristic);
buckets.add(readBucket);
Bucket bucket = new Bucket(subsetSize, supersetSize);
bucket.readFrom(in);
bucket.updateScore(significanceHeuristic);
buckets.add(bucket);
}
this.buckets = buckets;
this.bucketMap = null;
@@ -146,10 +203,7 @@ public class SignificantStringTerms extends InternalSignificantTerms {
}
out.writeVInt(buckets.size());
for (InternalSignificantTerms.Bucket bucket : buckets) {
out.writeBytesRef(((Bucket) bucket).termBytes);
out.writeVLong(((Bucket) bucket).subsetDf);
out.writeVLong(((Bucket) bucket).supersetDf);
((InternalAggregations) bucket.getAggregations()).writeTo(out);
bucket.writeTo(out);
}
}
@@ -161,13 +215,7 @@ public class SignificantStringTerms extends InternalSignificantTerms {
//There is a condition (presumably when only one shard has a bucket?) where reduce is not called
// and I end up with buckets that contravene the user's min_doc_count criteria in my reducer
if (bucket.subsetDf >= minDocCount) {
builder.startObject();
builder.utf8Field(CommonFields.KEY, ((Bucket) bucket).termBytes);
builder.field(CommonFields.DOC_COUNT, bucket.getDocCount());
builder.field("score", bucket.score);
builder.field("bg_count", bucket.supersetDf);
((InternalAggregations) bucket.getAggregations()).toXContentInternal(builder, params);
builder.endObject();
bucket.toXContent(builder, params);
}
}
builder.endArray();


@@ -20,7 +20,7 @@ package org.elasticsearch.search.aggregations.bucket.significant;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import java.util.Collection;
import java.util.List;
/**
* An aggregation that collects significant terms in comparison to a background set.
@@ -35,14 +35,18 @@ public interface SignificantTerms extends MultiBucketsAggregation, Iterable<SignificantTerms.Bucket> {
long supersetDf;
long supersetSize;
Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize) {
super();
this.subsetDf = subsetDf;
protected Bucket(long subsetSize, long supersetSize) {
// for serialization
this.subsetSize = subsetSize;
this.supersetDf = supersetDf;
this.supersetSize = supersetSize;
}
Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize) {
this(subsetSize, supersetSize);
this.subsetDf = subsetDf;
this.supersetDf = supersetDf;
}
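// The full constructor chains through the serialization constructor above, so both
// construction paths initialize subsetSize/supersetSize the same way.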
public abstract Number getKeyAsNumber();
abstract int compareTerm(SignificantTerms.Bucket other);
@@ -68,7 +72,7 @@ public interface SignificantTerms extends MultiBucketsAggregation, Iterable<SignificantTerms.Bucket> {
}
@Override
Collection<Bucket> getBuckets();
List<Bucket> getBuckets();
@Override
Bucket getBucketByKey(String key);


@@ -38,7 +38,7 @@ public class UnmappedSignificantTerms extends InternalSignificantTerms {
public static final Type TYPE = new Type("significant_terms", "umsigterms");
private static final Collection<Bucket> BUCKETS = Collections.emptyList();
private static final List<Bucket> BUCKETS = Collections.emptyList();
private static final Map<String, Bucket> BUCKETS_MAP = Collections.emptyMap();
public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {


@@ -27,12 +27,16 @@ import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.BucketStreamContext;
import org.elasticsearch.search.aggregations.bucket.BucketStreams;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
*
@@ -50,16 +54,40 @@ public class DoubleTerms extends InternalTerms {
}
};
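// Numeric terms never write their ValueFormatter with each bucket (it is transient on
// InternalTerms.Bucket); the formatter rides in the BucketStreamContext instead and is
// reattached when the bucket is deserialized.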
private final static BucketStreams.Stream<Bucket> BUCKET_STREAM = new BucketStreams.Stream<Bucket>() {
@Override
public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException {
Bucket bucket = new Bucket(context.formatter(), (boolean) context.attributes().get("showDocCountError"));
bucket.readFrom(in);
return bucket;
}
@Override
public BucketStreamContext getBucketStreamContext(Bucket bucket) {
BucketStreamContext context = new BucketStreamContext();
Map<String, Object> attributes = new HashMap<>();
attributes.put("showDocCountError", bucket.showDocCountError);
context.attributes(attributes);
context.formatter(bucket.formatter);
return context;
}
};
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream());
}
static class Bucket extends InternalTerms.Bucket {
double term;
public Bucket(double term, long docCount, InternalAggregations aggregations, boolean showDocCountError, long docCountError) {
super(docCount, aggregations, showDocCountError, docCountError);
public Bucket(@Nullable ValueFormatter formatter, boolean showDocCountError) {
super(formatter, showDocCountError);
}
public Bucket(double term, long docCount, InternalAggregations aggregations, boolean showDocCountError, long docCountError, @Nullable ValueFormatter formatter) {
super(docCount, aggregations, showDocCountError, docCountError, formatter);
this.term = term;
}
@@ -90,7 +118,44 @@ public class DoubleTerms extends InternalTerms {
@Override
Bucket newBucket(long docCount, InternalAggregations aggs, long docCountError) {
return new Bucket(term, docCount, aggs, showDocCountError, docCountError);
return new Bucket(term, docCount, aggs, showDocCountError, docCountError, formatter);
}
@Override
public void readFrom(StreamInput in) throws IOException {
term = in.readDouble();
docCount = in.readVLong();
docCountError = -1;
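// docCountError is only on the wire for 1.4.0.Beta1+ nodes, and only when the request
// asked for it; -1 signals "unknown" to the reading side otherwise.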
if (in.getVersion().onOrAfter(Version.V_1_4_0_Beta1) && showDocCountError) {
docCountError = in.readLong();
}
aggregations = InternalAggregations.readAggregations(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeDouble(term);
out.writeVLong(getDocCount());
if (out.getVersion().onOrAfter(Version.V_1_4_0_Beta1) && showDocCountError) {
out.writeLong(docCountError);
}
aggregations.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(CommonFields.KEY, term);
if (formatter != null && formatter != ValueFormatter.RAW) {
builder.field(CommonFields.KEY_AS_STRING, formatter.format(term));
}
builder.field(CommonFields.DOC_COUNT, getDocCount());
if (showDocCountError) {
builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, getDocCountError());
}
aggregations.toXContentInternal(builder, params);
builder.endObject();
return builder;
}
}
@@ -135,14 +200,9 @@ public class DoubleTerms extends InternalTerms {
int size = in.readVInt();
List<InternalTerms.Bucket> buckets = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
double term = in.readDouble();
long docCount = in.readVLong();
long bucketDocCountError = -1;
if (in.getVersion().onOrAfter(Version.V_1_4_0_Beta1) && showTermDocCountError) {
bucketDocCountError = in.readLong();
}
InternalAggregations aggregations = InternalAggregations.readAggregations(in);
buckets.add(new Bucket(term, docCount, aggregations, showTermDocCountError, bucketDocCountError));
Bucket bucket = new Bucket(formatter, showTermDocCountError);
bucket.readFrom(in);
buckets.add(bucket);
}
this.buckets = buckets;
this.bucketMap = null;
@@ -164,12 +224,7 @@ public class DoubleTerms extends InternalTerms {
out.writeVLong(minDocCount);
out.writeVInt(buckets.size());
for (InternalTerms.Bucket bucket : buckets) {
out.writeDouble(((Bucket) bucket).term);
out.writeVLong(bucket.getDocCount());
if (out.getVersion().onOrAfter(Version.V_1_4_0_Beta1) && showTermDocCountError) {
out.writeLong(bucket.docCountError);
}
((InternalAggregations) bucket.getAggregations()).writeTo(out);
bucket.writeTo(out);
}
}
@@ -178,17 +233,7 @@ public class DoubleTerms extends InternalTerms {
builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, docCountError);
builder.startArray(CommonFields.BUCKETS);
for (InternalTerms.Bucket bucket : buckets) {
builder.startObject();
builder.field(CommonFields.KEY, ((Bucket) bucket).term);
if (formatter != null && formatter != ValueFormatter.RAW) {
builder.field(CommonFields.KEY_AS_STRING, formatter.format(((Bucket) bucket).term));
}
builder.field(CommonFields.DOC_COUNT, bucket.getDocCount());
if (showTermDocCountError) {
builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, bucket.getDocCountError());
}
((InternalAggregations) bucket.getAggregations()).toXContentInternal(builder, params);
builder.endObject();
bucket.toXContent(builder, params);
}
builder.endArray();
return builder;


@@ -62,7 +62,7 @@ public class DoubleTermsAggregator extends LongTermsAggregator {
private static DoubleTerms.Bucket convertToDouble(InternalTerms.Bucket bucket) {
final long term = bucket.getKeyAsNumber().longValue();
final double value = NumericUtils.sortableLongToDouble(term);
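// The long-based aggregator stores doubles as sortable long bits, so decode them back
// to a double before re-wrapping; the bucket's formatter is now carried across too.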
return new DoubleTerms.Bucket(value, bucket.docCount, bucket.aggregations, bucket.showDocCountError, bucket.docCountError);
return new DoubleTerms.Bucket(value, bucket.docCount, bucket.aggregations, bucket.showDocCountError, bucket.docCountError, bucket.formatter);
}
private static DoubleTerms convertToDouble(LongTerms terms) {


@@ -28,10 +28,14 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.LongBitSet;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.util.IntArray;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.fielddata.AbstractRandomAccessOrds;
import org.elasticsearch.index.fielddata.ordinals.GlobalOrdinalMapping;
import org.elasticsearch.search.aggregations.Aggregator;
@@ -201,7 +205,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggregator {
long globalOrd;
OrdBucket(long globalOrd, long docCount, InternalAggregations aggregations, boolean showDocCountError, long docCountError) {
super(docCount, aggregations, showDocCountError, docCountError);
super(docCount, aggregations, showDocCountError, docCountError, null);
this.globalOrd = globalOrd;
}
@@ -234,6 +238,21 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggregator {
public Number getKeyAsNumber() {
throw new UnsupportedOperationException();
}
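// OrdBucket is a reduce-time scratch object keyed by global ordinal; it is never sent
// over the wire or rendered to XContent, hence the unsupported Streamable/ToXContent
// hooks below.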
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
throw new UnsupportedOperationException();
}
}
private static interface Collector {


@@ -22,12 +22,14 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.terms.support.BucketPriorityQueue;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import java.util.*;
@@ -46,12 +48,18 @@ public abstract class InternalTerms extends InternalAggregation implements Terms
protected long docCountError;
protected InternalAggregations aggregations;
protected boolean showDocCountError;
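// transient: the formatter is not part of the bucket's wire format; subclasses that need
// one re-supply it through the BucketStreamContext on the reading side.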
transient final ValueFormatter formatter;
protected Bucket(@Nullable ValueFormatter formatter, boolean showDocCountError) {
// for serialization
this.showDocCountError = showDocCountError;
this.formatter = formatter;
}
protected Bucket(long docCount, InternalAggregations aggregations, boolean showDocCountError, long docCountError) {
protected Bucket(long docCount, InternalAggregations aggregations, boolean showDocCountError, long docCountError, @Nullable ValueFormatter formatter) {
this(formatter, showDocCountError);
this.docCount = docCount;
this.aggregations = aggregations;
this.showDocCountError = showDocCountError;
this.docCountError = docCountError;
}


@@ -27,12 +27,16 @@ import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.BucketStreamContext;
import org.elasticsearch.search.aggregations.bucket.BucketStreams;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
*
@@ -50,17 +54,40 @@ public class LongTerms extends InternalTerms {
}
};
private final static BucketStreams.Stream<Bucket> BUCKET_STREAM = new BucketStreams.Stream<Bucket>() {
@Override
public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException {
Bucket bucket = new Bucket(context.formatter(), (boolean) context.attributes().get("showDocCountError"));
bucket.readFrom(in);
return bucket;
}
@Override
public BucketStreamContext getBucketStreamContext(Bucket bucket) {
BucketStreamContext context = new BucketStreamContext();
Map<String, Object> attributes = new HashMap<>();
attributes.put("showDocCountError", bucket.showDocCountError);
context.attributes(attributes);
context.formatter(bucket.formatter);
return context;
}
};
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream());
}
static class Bucket extends InternalTerms.Bucket {
long term;
public Bucket(long term, long docCount, InternalAggregations aggregations, boolean showDocCountError, long docCountError) {
super(docCount, aggregations, showDocCountError, docCountError);
public Bucket(@Nullable ValueFormatter formatter, boolean showDocCountError) {
super(formatter, showDocCountError);
}
public Bucket(long term, long docCount, InternalAggregations aggregations, boolean showDocCountError, long docCountError, @Nullable ValueFormatter formatter) {
super(docCount, aggregations, showDocCountError, docCountError, formatter);
this.term = term;
}
@@ -91,7 +118,44 @@ public class LongTerms extends InternalTerms {
@Override
Bucket newBucket(long docCount, InternalAggregations aggs, long docCountError) {
return new Bucket(term, docCount, aggs, showDocCountError, docCountError);
return new Bucket(term, docCount, aggs, showDocCountError, docCountError, formatter);
}
@Override
public void readFrom(StreamInput in) throws IOException {
term = in.readLong();
docCount = in.readVLong();
docCountError = -1;
if (in.getVersion().onOrAfter(Version.V_1_4_0_Beta1) && showDocCountError) {
docCountError = in.readLong();
}
aggregations = InternalAggregations.readAggregations(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(term);
out.writeVLong(getDocCount());
if (out.getVersion().onOrAfter(Version.V_1_4_0_Beta1) && showDocCountError) {
out.writeLong(docCountError);
}
aggregations.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(CommonFields.KEY, term);
if (formatter != null && formatter != ValueFormatter.RAW) {
builder.field(CommonFields.KEY_AS_STRING, formatter.format(term));
}
builder.field(CommonFields.DOC_COUNT, getDocCount());
if (showDocCountError) {
builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, getDocCountError());
}
aggregations.toXContentInternal(builder, params);
builder.endObject();
return builder;
}
}
@@ -136,14 +200,9 @@ public class LongTerms extends InternalTerms {
int size = in.readVInt();
List<InternalTerms.Bucket> buckets = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
long term = in.readLong();
long docCount = in.readVLong();
long bucketDocCountError = -1;
if (in.getVersion().onOrAfter(Version.V_1_4_0_Beta1) && showTermDocCountError) {
bucketDocCountError = in.readLong();
}
InternalAggregations aggregations = InternalAggregations.readAggregations(in);
buckets.add(new Bucket(term, docCount, aggregations, showTermDocCountError, bucketDocCountError));
Bucket bucket = new Bucket(formatter, showTermDocCountError);
bucket.readFrom(in);
buckets.add(bucket);
}
this.buckets = buckets;
this.bucketMap = null;
@@ -165,12 +224,7 @@ public class LongTerms extends InternalTerms {
out.writeVLong(minDocCount);
out.writeVInt(buckets.size());
for (InternalTerms.Bucket bucket : buckets) {
out.writeLong(((Bucket) bucket).term);
out.writeVLong(bucket.getDocCount());
if (out.getVersion().onOrAfter(Version.V_1_4_0_Beta1) && showTermDocCountError) {
out.writeLong(bucket.docCountError);
}
((InternalAggregations) bucket.getAggregations()).writeTo(out);
bucket.writeTo(out);
}
}
@@ -179,17 +233,7 @@ public class LongTerms extends InternalTerms {
builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, docCountError);
builder.startArray(CommonFields.BUCKETS);
for (InternalTerms.Bucket bucket : buckets) {
builder.startObject();
builder.field(CommonFields.KEY, ((Bucket) bucket).term);
if (formatter != null && formatter != ValueFormatter.RAW) {
builder.field(CommonFields.KEY_AS_STRING, formatter.format(((Bucket) bucket).term));
}
builder.field(CommonFields.DOC_COUNT, bucket.getDocCount());
if (showTermDocCountError) {
builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, bucket.getDocCountError());
}
((InternalAggregations) bucket.getAggregations()).toXContentInternal(builder, params);
builder.endObject();
bucket.toXContent(builder, params);
}
builder.endArray();
return builder;


@@ -126,7 +126,7 @@ public class LongTermsAggregator extends TermsAggregator {
LongTerms.Bucket spare = null;
for (long i = 0; i < bucketOrds.size(); i++) {
if (spare == null) {
spare = new LongTerms.Bucket(0, 0, null, showTermDocCountError, 0);
spare = new LongTerms.Bucket(0, 0, null, showTermDocCountError, 0, formatter);
}
spare.term = bucketOrds.get(i);
spare.docCount = bucketDocCount(i);


@@ -29,10 +29,14 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.BucketStreamContext;
import org.elasticsearch.search.aggregations.bucket.BucketStreams;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
*
@@ -50,8 +54,27 @@ public class StringTerms extends InternalTerms {
}
};
private final static BucketStreams.Stream<Bucket> BUCKET_STREAM = new BucketStreams.Stream<Bucket>() {
@Override
public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException {
Bucket bucket = new Bucket((boolean) context.attributes().get("showDocCountError"));
bucket.readFrom(in);
return bucket;
}
@Override
public BucketStreamContext getBucketStreamContext(Bucket bucket) {
BucketStreamContext context = new BucketStreamContext();
Map<String, Object> attributes = new HashMap<>();
attributes.put("showDocCountError", bucket.showDocCountError);
context.attributes(attributes);
return context;
}
};
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream());
}
@@ -59,8 +82,12 @@ public class StringTerms extends InternalTerms {
BytesRef termBytes;
public Bucket(boolean showDocCountError) {
super(null, showDocCountError);
}
public Bucket(BytesRef term, long docCount, InternalAggregations aggregations, boolean showDocCountError, long docCountError) {
super(docCount, aggregations, showDocCountError, docCountError);
super(docCount, aggregations, showDocCountError, docCountError, null);
this.termBytes = term;
}
@@ -94,6 +121,40 @@ public class StringTerms extends InternalTerms {
Bucket newBucket(long docCount, InternalAggregations aggs, long docCountError) {
return new Bucket(termBytes, docCount, aggs, showDocCountError, docCountError);
}
@Override
public void readFrom(StreamInput in) throws IOException {
termBytes = in.readBytesRef();
docCount = in.readVLong();
docCountError = -1;
if (in.getVersion().onOrAfter(Version.V_1_4_0_Beta1) && showDocCountError) {
docCountError = in.readLong();
}
aggregations = InternalAggregations.readAggregations(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBytesRef(termBytes);
out.writeVLong(getDocCount());
if (out.getVersion().onOrAfter(Version.V_1_4_0_Beta1) && showDocCountError) {
out.writeLong(docCountError);
}
aggregations.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.utf8Field(CommonFields.KEY, termBytes);
builder.field(CommonFields.DOC_COUNT, getDocCount());
if (showDocCountError) {
builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, getDocCountError());
}
aggregations.toXContentInternal(builder, params);
builder.endObject();
return builder;
}
}
StringTerms() {} // for serialization
@@ -133,14 +194,9 @@ public class StringTerms extends InternalTerms {
int size = in.readVInt();
List<InternalTerms.Bucket> buckets = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
BytesRef termBytes = in.readBytesRef();
long docCount = in.readVLong();
long bucketDocCountError = -1;
if (in.getVersion().onOrAfter(Version.V_1_4_0_Beta1) && showTermDocCountError) {
bucketDocCountError = in.readLong();
}
InternalAggregations aggregations = InternalAggregations.readAggregations(in);
buckets.add(new Bucket(termBytes, docCount, aggregations, showTermDocCountError, bucketDocCountError));
Bucket bucket = new Bucket(showTermDocCountError);
bucket.readFrom(in);
buckets.add(bucket);
}
this.buckets = buckets;
this.bucketMap = null;
@@ -161,12 +217,7 @@ public class StringTerms extends InternalTerms {
out.writeVLong(minDocCount);
out.writeVInt(buckets.size());
for (InternalTerms.Bucket bucket : buckets) {
out.writeBytesRef(((Bucket) bucket).termBytes);
out.writeVLong(bucket.getDocCount());
if (out.getVersion().onOrAfter(Version.V_1_4_0_Beta1) && showTermDocCountError) {
out.writeLong(bucket.docCountError);
}
((InternalAggregations) bucket.getAggregations()).writeTo(out);
bucket.writeTo(out);
}
}
@@ -175,14 +226,7 @@ public class StringTerms extends InternalTerms {
builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, docCountError);
builder.startArray(CommonFields.BUCKETS);
for (InternalTerms.Bucket bucket : buckets) {
builder.startObject();
builder.utf8Field(CommonFields.KEY, ((Bucket) bucket).termBytes);
builder.field(CommonFields.DOC_COUNT, bucket.getDocCount());
if (showTermDocCountError) {
builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, bucket.getDocCountError());
}
((InternalAggregations) bucket.getAggregations()).toXContentInternal(builder, params);
builder.endObject();
bucket.toXContent(builder, params);
}
builder.endArray();
return builder;


@@ -99,7 +99,7 @@ public class SignificanceHeuristicTests extends ElasticsearchTestCase {
ArrayList<InternalSignificantTerms.Bucket> buckets = new ArrayList<>();
if (randomBoolean()) {
BytesRef term = new BytesRef("123.0");
buckets.add(new SignificantLongTerms.Bucket(1, 2, 3, 4, 123, InternalAggregations.EMPTY));
buckets.add(new SignificantLongTerms.Bucket(1, 2, 3, 4, 123, InternalAggregations.EMPTY, null));
sTerms[0] = new SignificantLongTerms(10, 20, "some_name", null, 1, 1, heuristic, buckets);
sTerms[1] = new SignificantLongTerms();
} else {