Aggregations: Unify histogram implementations
This change makes InternalHistogram the only InternalAggregation used by the Histogram Aggregator. There is still a separate Bucket implementation and Factory implementation. All buckets are created through the factory passed into the InternalHistogram meaning and the correct factory implementation is serialised as part of the aggregation to make sure the correct bucket types are always generate. This is needed by the Transformers (namely the derivative transformer) to allow it to generate buckets of the right type without having to know what the underlying bucket implementation is.
This commit is contained in:
parent
1695f76f68
commit
29c24d75e7
|
@ -19,6 +19,7 @@
|
|||
package org.elasticsearch.search.aggregations;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.inject.Module;
|
||||
import org.elasticsearch.common.inject.SpawnModules;
|
||||
|
@ -27,7 +28,6 @@ import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
|
|||
import org.elasticsearch.search.aggregations.bucket.filters.InternalFilters;
|
||||
import org.elasticsearch.search.aggregations.bucket.geogrid.InternalGeoHashGrid;
|
||||
import org.elasticsearch.search.aggregations.bucket.global.InternalGlobal;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
|
||||
import org.elasticsearch.search.aggregations.bucket.missing.InternalMissing;
|
||||
import org.elasticsearch.search.aggregations.bucket.nested.InternalNested;
|
||||
|
@ -96,7 +96,6 @@ public class TransportAggregationModule extends AbstractModule implements SpawnM
|
|||
InternalDateRange.registerStream();
|
||||
InternalIPv4Range.registerStream();
|
||||
InternalHistogram.registerStream();
|
||||
InternalDateHistogram.registerStream();
|
||||
InternalGeoDistance.registerStream();
|
||||
InternalNested.registerStream();
|
||||
InternalReverseNested.registerStream();
|
||||
|
|
|
@ -19,9 +19,9 @@
|
|||
package org.elasticsearch.search.aggregations.bucket.histogram;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.joda.DateMathParser;
|
||||
import org.elasticsearch.common.rounding.DateTimeUnit;
|
||||
import org.elasticsearch.common.rounding.Rounding;
|
||||
import org.elasticsearch.common.rounding.TimeZoneRounding;
|
||||
|
@ -199,7 +199,8 @@ public class DateHistogramParser implements Aggregator.Parser {
|
|||
.preOffset(preOffset).postOffset(postOffset)
|
||||
.build();
|
||||
|
||||
return new HistogramAggregator.Factory(aggregationName, vsParser.config(), rounding, order, keyed, minDocCount, extendedBounds, InternalDateHistogram.FACTORY);
|
||||
return new HistogramAggregator.Factory(aggregationName, vsParser.config(), rounding, order, keyed, minDocCount, extendedBounds,
|
||||
new InternalDateHistogram.Factory());
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -132,7 +132,8 @@ public class HistogramParser implements Aggregator.Parser {
|
|||
extendedBounds.processAndValidate(aggregationName, context, ValueParser.RAW);
|
||||
}
|
||||
|
||||
return new HistogramAggregator.Factory(aggregationName, vsParser.config(), rounding, order, keyed, minDocCount, extendedBounds, InternalHistogram.FACTORY);
|
||||
return new HistogramAggregator.Factory(aggregationName, vsParser.config(), rounding, order, keyed, minDocCount, extendedBounds,
|
||||
new InternalHistogram.Factory());
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -18,73 +18,33 @@
|
|||
*/
|
||||
package org.elasticsearch.search.aggregations.bucket.histogram;
|
||||
|
||||
import com.carrotsearch.hppc.ObjectObjectOpenHashMap;
|
||||
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.search.aggregations.AggregationStreams;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregations;
|
||||
import org.elasticsearch.search.aggregations.bucket.BucketStreamContext;
|
||||
import org.elasticsearch.search.aggregations.bucket.BucketStreams;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram.EmptyBucketInfo;
|
||||
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.DateTimeZone;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class InternalDateHistogram extends InternalHistogram<InternalDateHistogram.Bucket> {
|
||||
public class InternalDateHistogram {
|
||||
|
||||
final static Type TYPE = new Type("date_histogram", "dhisto");
|
||||
final static Factory FACTORY = new Factory();
|
||||
|
||||
private final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
|
||||
@Override
|
||||
public InternalDateHistogram readResult(StreamInput in) throws IOException {
|
||||
InternalDateHistogram histogram = new InternalDateHistogram();
|
||||
histogram.readFrom(in);
|
||||
return histogram;
|
||||
}
|
||||
};
|
||||
|
||||
private final static BucketStreams.Stream<Bucket> BUCKET_STREAM = new BucketStreams.Stream<Bucket>() {
|
||||
@Override
|
||||
public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException {
|
||||
Bucket buckets = new Bucket(context.keyed(), context.formatter());
|
||||
buckets.readFrom(in);
|
||||
return buckets;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BucketStreamContext getBucketStreamContext(Bucket bucket) {
|
||||
BucketStreamContext context = new BucketStreamContext();
|
||||
context.formatter(bucket.formatter);
|
||||
return context;
|
||||
}
|
||||
};
|
||||
|
||||
public static void registerStream() {
|
||||
AggregationStreams.registerStream(STREAM, TYPE.stream());
|
||||
BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream());
|
||||
}
|
||||
|
||||
static class Bucket extends InternalHistogram.Bucket {
|
||||
|
||||
Bucket(boolean keyed, @Nullable ValueFormatter formatter) {
|
||||
super(keyed, formatter);
|
||||
Bucket(boolean keyed, @Nullable ValueFormatter formatter, InternalHistogram.Factory<Bucket> factory) {
|
||||
super(keyed, formatter, factory);
|
||||
}
|
||||
|
||||
Bucket(long key, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter) {
|
||||
super(key, docCount, keyed, formatter, aggregations);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InternalHistogram.Factory<Bucket> getFactory() {
|
||||
return FACTORY;
|
||||
Bucket(long key, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter,
|
||||
InternalHistogram.Factory<Bucket> factory) {
|
||||
super(key, docCount, keyed, formatter, factory, aggregations);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -105,7 +65,7 @@ public class InternalDateHistogram extends InternalHistogram<InternalDateHistogr
|
|||
|
||||
static class Factory extends InternalHistogram.Factory<InternalDateHistogram.Bucket> {
|
||||
|
||||
private Factory() {
|
||||
Factory() {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -114,50 +74,21 @@ public class InternalDateHistogram extends InternalHistogram<InternalDateHistogr
|
|||
}
|
||||
|
||||
@Override
|
||||
public InternalDateHistogram create(String name, List<InternalDateHistogram.Bucket> buckets, InternalOrder order,
|
||||
public InternalHistogram create(String name, List<InternalDateHistogram.Bucket> buckets, InternalOrder order,
|
||||
long minDocCount, EmptyBucketInfo emptyBucketInfo, @Nullable ValueFormatter formatter, boolean keyed, Map<String, Object> metaData) {
|
||||
return new InternalDateHistogram(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed, metaData);
|
||||
return new InternalHistogram(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed, this, metaData);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InternalDateHistogram.Bucket createBucket(long key, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter) {
|
||||
return new Bucket(key, docCount, aggregations, keyed, formatter);
|
||||
return new Bucket(key, docCount, aggregations, keyed, formatter, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InternalDateHistogram.Bucket createEmptyBucket(boolean keyed, @Nullable ValueFormatter formatter) {
|
||||
return new Bucket(keyed, formatter, this);
|
||||
}
|
||||
}
|
||||
|
||||
private ObjectObjectOpenHashMap<String, InternalDateHistogram.Bucket> bucketsMap;
|
||||
|
||||
InternalDateHistogram() {} // for serialization
|
||||
|
||||
InternalDateHistogram(String name, List<InternalDateHistogram.Bucket> buckets, InternalOrder order, long minDocCount,
|
||||
EmptyBucketInfo emptyBucketInfo, @Nullable ValueFormatter formatter, boolean keyed, Map<String, Object> metaData) {
|
||||
super(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed, metaData);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Type type() {
|
||||
return TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InternalHistogram.Factory<Bucket> getFactory() {
|
||||
return FACTORY;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InternalDateHistogram.Bucket createBucket(long key, long docCount, InternalAggregations aggregations, boolean keyed, ValueFormatter formatter) {
|
||||
return new Bucket(key, docCount, aggregations, keyed, formatter);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Bucket createEmptyBucket(boolean keyed, ValueFormatter formatter) {
|
||||
return new Bucket(keyed, formatter);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doReadFrom(StreamInput in) throws IOException {
|
||||
super.doReadFrom(in);
|
||||
bucketsMap = null; // we need to reset this on read (as it's lazily created on demand)
|
||||
}
|
||||
|
||||
private InternalDateHistogram() {}
|
||||
}
|
||||
|
|
|
@ -18,11 +18,11 @@
|
|||
*/
|
||||
package org.elasticsearch.search.aggregations.bucket.histogram;
|
||||
|
||||
import com.carrotsearch.hppc.LongObjectOpenHashMap;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.apache.lucene.util.CollectionUtil;
|
||||
import org.apache.lucene.util.PriorityQueue;
|
||||
import org.elasticsearch.ElasticsearchIllegalStateException;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
@ -53,7 +53,6 @@ import java.util.Map;
|
|||
public class InternalHistogram<B extends InternalHistogram.Bucket> extends InternalMultiBucketAggregation implements Histogram {
|
||||
|
||||
final static Type TYPE = new Type("histogram", "histo");
|
||||
final static Factory FACTORY = new Factory();
|
||||
|
||||
private final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
|
||||
@Override
|
||||
|
@ -67,7 +66,11 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
|
|||
private final static BucketStreams.Stream<Bucket> BUCKET_STREAM = new BucketStreams.Stream<Bucket>() {
|
||||
@Override
|
||||
public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException {
|
||||
Bucket histogram = new Bucket(context.keyed(), context.formatter());
|
||||
Factory<?> factory = (Factory<?>) context.attributes().get("factory");
|
||||
if (factory == null) {
|
||||
throw new ElasticsearchIllegalStateException("No factory found for histogram buckets");
|
||||
}
|
||||
Bucket histogram = new Bucket(context.keyed(), context.formatter(), factory);
|
||||
histogram.readFrom(in);
|
||||
return histogram;
|
||||
}
|
||||
|
@ -94,21 +97,24 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
|
|||
InternalAggregations aggregations;
|
||||
private transient final boolean keyed;
|
||||
protected transient final @Nullable ValueFormatter formatter;
|
||||
private Factory<?> factory;
|
||||
|
||||
public Bucket(boolean keyed, @Nullable ValueFormatter formatter) {
|
||||
public Bucket(boolean keyed, @Nullable ValueFormatter formatter, Factory<?> factory) {
|
||||
this.formatter = formatter;
|
||||
this.keyed = keyed;
|
||||
this.factory = factory;
|
||||
}
|
||||
|
||||
public Bucket(long key, long docCount, boolean keyed, @Nullable ValueFormatter formatter, InternalAggregations aggregations) {
|
||||
this(keyed, formatter);
|
||||
public Bucket(long key, long docCount, boolean keyed, @Nullable ValueFormatter formatter, Factory factory,
|
||||
InternalAggregations aggregations) {
|
||||
this(keyed, formatter, factory);
|
||||
this.key = key;
|
||||
this.docCount = docCount;
|
||||
this.aggregations = aggregations;
|
||||
}
|
||||
|
||||
protected Factory<?> getFactory() {
|
||||
return FACTORY;
|
||||
return factory;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -228,27 +234,32 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
|
|||
|
||||
public InternalHistogram<B> create(String name, List<B> buckets, InternalOrder order, long minDocCount,
|
||||
EmptyBucketInfo emptyBucketInfo, @Nullable ValueFormatter formatter, boolean keyed, Map<String, Object> metaData) {
|
||||
return new InternalHistogram<>(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed, metaData);
|
||||
return new InternalHistogram<>(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed, this, metaData);
|
||||
}
|
||||
|
||||
public B createBucket(long key, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter) {
|
||||
return (B) new Bucket(key, docCount, keyed, formatter, aggregations);
|
||||
return (B) new Bucket(key, docCount, keyed, formatter, this, aggregations);
|
||||
}
|
||||
|
||||
protected B createEmptyBucket(boolean keyed, @Nullable ValueFormatter formatter) {
|
||||
return (B) new Bucket(keyed, formatter, this);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected List<B> buckets;
|
||||
private LongObjectOpenHashMap<B> bucketsMap;
|
||||
private InternalOrder order;
|
||||
private @Nullable ValueFormatter formatter;
|
||||
private boolean keyed;
|
||||
private long minDocCount;
|
||||
private EmptyBucketInfo emptyBucketInfo;
|
||||
protected Factory<B> factory;
|
||||
|
||||
InternalHistogram() {} // for serialization
|
||||
|
||||
InternalHistogram(String name, List<B> buckets, InternalOrder order, long minDocCount,
|
||||
EmptyBucketInfo emptyBucketInfo, @Nullable ValueFormatter formatter, boolean keyed, Map<String, Object> metaData) {
|
||||
EmptyBucketInfo emptyBucketInfo,
|
||||
@Nullable ValueFormatter formatter, boolean keyed, Factory<B> factory, Map<String, Object> metaData) {
|
||||
super(name, metaData);
|
||||
this.buckets = buckets;
|
||||
this.order = order;
|
||||
|
@ -257,6 +268,7 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
|
|||
this.emptyBucketInfo = emptyBucketInfo;
|
||||
this.formatter = formatter;
|
||||
this.keyed = keyed;
|
||||
this.factory = factory;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -270,7 +282,7 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
|
|||
}
|
||||
|
||||
protected Factory<B> getFactory() {
|
||||
return FACTORY;
|
||||
return factory;
|
||||
}
|
||||
|
||||
private static class IteratorAndCurrent<B> {
|
||||
|
@ -356,7 +368,7 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
|
|||
long key = bounds.min;
|
||||
long max = bounds.max;
|
||||
while (key <= max) {
|
||||
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter));
|
||||
iter.add(getFactory().createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter));
|
||||
key = emptyBucketInfo.rounding.nextRoundingValue(key);
|
||||
}
|
||||
}
|
||||
|
@ -365,7 +377,7 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
|
|||
long key = bounds.min;
|
||||
if (key < firstBucket.key) {
|
||||
while (key < firstBucket.key) {
|
||||
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter));
|
||||
iter.add(getFactory().createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter));
|
||||
key = emptyBucketInfo.rounding.nextRoundingValue(key);
|
||||
}
|
||||
}
|
||||
|
@ -380,7 +392,7 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
|
|||
if (lastBucket != null) {
|
||||
long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key);
|
||||
while (key < nextBucket.key) {
|
||||
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter));
|
||||
iter.add(getFactory().createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter));
|
||||
key = emptyBucketInfo.rounding.nextRoundingValue(key);
|
||||
}
|
||||
assert key == nextBucket.key;
|
||||
|
@ -393,7 +405,7 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
|
|||
long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key);
|
||||
long max = bounds.max;
|
||||
while (key <= max) {
|
||||
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter));
|
||||
iter.add(getFactory().createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter));
|
||||
key = emptyBucketInfo.rounding.nextRoundingValue(key);
|
||||
}
|
||||
}
|
||||
|
@ -423,16 +435,9 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
|
|||
return getFactory().create(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo, formatter, keyed, getMetaData());
|
||||
}
|
||||
|
||||
protected B createBucket(long key, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter) {
|
||||
return (B) new InternalHistogram.Bucket(key, docCount, keyed, formatter, aggregations);
|
||||
}
|
||||
|
||||
protected B createEmptyBucket(boolean keyed, @Nullable ValueFormatter formatter) {
|
||||
return (B) new InternalHistogram.Bucket(keyed, formatter);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doReadFrom(StreamInput in) throws IOException {
|
||||
this.factory = resolveFactory(in.readString());
|
||||
order = InternalOrder.Streams.readOrder(in);
|
||||
minDocCount = in.readVLong();
|
||||
if (minDocCount == 0) {
|
||||
|
@ -443,16 +448,27 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
|
|||
int size = in.readVInt();
|
||||
List<B> buckets = new ArrayList<>(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
B bucket = createEmptyBucket(keyed, formatter);
|
||||
B bucket = getFactory().createEmptyBucket(keyed, formatter);
|
||||
bucket.readFrom(in);
|
||||
buckets.add(bucket);
|
||||
}
|
||||
this.buckets = buckets;
|
||||
this.bucketsMap = null;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static <B extends InternalHistogram.Bucket> Factory<B> resolveFactory(String factoryType) {
|
||||
if (factoryType.equals(InternalDateHistogram.TYPE.name())) {
|
||||
return (Factory<B>) new InternalDateHistogram.Factory();
|
||||
} else if (factoryType.equals(TYPE.name())) {
|
||||
return new Factory<>();
|
||||
} else {
|
||||
throw new ElasticsearchIllegalStateException("Invalid histogram factory type [" + factoryType + "]");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doWriteTo(StreamOutput out) throws IOException {
|
||||
out.writeString(factory.type());
|
||||
InternalOrder.Streams.writeOrder(order, out);
|
||||
out.writeVLong(minDocCount);
|
||||
if (minDocCount == 0) {
|
||||
|
|
Loading…
Reference in New Issue