diff --git a/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java b/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java index 52793f9bbfd..ce09d1e5c69 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java +++ b/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java @@ -19,6 +19,7 @@ package org.elasticsearch.search.aggregations; import com.google.common.collect.ImmutableList; + import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.SpawnModules; @@ -27,7 +28,6 @@ import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter; import org.elasticsearch.search.aggregations.bucket.filters.InternalFilters; import org.elasticsearch.search.aggregations.bucket.geogrid.InternalGeoHashGrid; import org.elasticsearch.search.aggregations.bucket.global.InternalGlobal; -import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram; import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram; import org.elasticsearch.search.aggregations.bucket.missing.InternalMissing; import org.elasticsearch.search.aggregations.bucket.nested.InternalNested; @@ -96,7 +96,6 @@ public class TransportAggregationModule extends AbstractModule implements SpawnM InternalDateRange.registerStream(); InternalIPv4Range.registerStream(); InternalHistogram.registerStream(); - InternalDateHistogram.registerStream(); InternalGeoDistance.registerStream(); InternalNested.registerStream(); InternalReverseNested.registerStream(); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramParser.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramParser.java index 5335f997a7d..bfe26d2e04f 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramParser.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramParser.java @@ -19,9 +19,9 @@ package org.elasticsearch.search.aggregations.bucket.histogram; import com.google.common.collect.ImmutableMap; + import org.elasticsearch.common.ParseField; import org.elasticsearch.common.collect.MapBuilder; -import org.elasticsearch.common.joda.DateMathParser; import org.elasticsearch.common.rounding.DateTimeUnit; import org.elasticsearch.common.rounding.Rounding; import org.elasticsearch.common.rounding.TimeZoneRounding; @@ -199,7 +199,8 @@ public class DateHistogramParser implements Aggregator.Parser { .preOffset(preOffset).postOffset(postOffset) .build(); - return new HistogramAggregator.Factory(aggregationName, vsParser.config(), rounding, order, keyed, minDocCount, extendedBounds, InternalDateHistogram.FACTORY); + return new HistogramAggregator.Factory(aggregationName, vsParser.config(), rounding, order, keyed, minDocCount, extendedBounds, + new InternalDateHistogram.Factory()); } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramParser.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramParser.java index 8dc2abbd900..b253d4fba7b 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramParser.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramParser.java @@ -132,7 +132,8 @@ public class HistogramParser implements Aggregator.Parser { extendedBounds.processAndValidate(aggregationName, context, ValueParser.RAW); } - return new HistogramAggregator.Factory(aggregationName, vsParser.config(), rounding, order, keyed, minDocCount, extendedBounds, InternalHistogram.FACTORY); + return new HistogramAggregator.Factory(aggregationName, vsParser.config(), rounding, order, keyed, minDocCount, extendedBounds, + new InternalHistogram.Factory()); } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java index 338127e0c7d..63cab59ad6b 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java @@ -18,73 +18,33 @@ */ package org.elasticsearch.search.aggregations.bucket.histogram; -import com.carrotsearch.hppc.ObjectObjectOpenHashMap; - import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.search.aggregations.AggregationStreams; +import org.elasticsearch.search.aggregations.InternalAggregation.Type; import org.elasticsearch.search.aggregations.InternalAggregations; -import org.elasticsearch.search.aggregations.bucket.BucketStreamContext; -import org.elasticsearch.search.aggregations.bucket.BucketStreams; +import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram.EmptyBucketInfo; import org.elasticsearch.search.aggregations.support.format.ValueFormatter; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; -import java.io.IOException; import java.util.List; import java.util.Map; /** * */ -public class InternalDateHistogram extends InternalHistogram { +public class InternalDateHistogram { final static Type TYPE = new Type("date_histogram", "dhisto"); - final static Factory FACTORY = new Factory(); - - private final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { - @Override - public InternalDateHistogram readResult(StreamInput in) throws IOException { - InternalDateHistogram histogram = new InternalDateHistogram(); - histogram.readFrom(in); - return histogram; - } - }; - - private final static BucketStreams.Stream BUCKET_STREAM = new BucketStreams.Stream() { - @Override - public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException { - Bucket buckets = new Bucket(context.keyed(), context.formatter()); - buckets.readFrom(in); - return buckets; - } - - @Override - public BucketStreamContext getBucketStreamContext(Bucket bucket) { - BucketStreamContext context = new BucketStreamContext(); - context.formatter(bucket.formatter); - return context; - } - }; - - public static void registerStream() { - AggregationStreams.registerStream(STREAM, TYPE.stream()); - BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream()); - } static class Bucket extends InternalHistogram.Bucket { - Bucket(boolean keyed, @Nullable ValueFormatter formatter) { - super(keyed, formatter); + Bucket(boolean keyed, @Nullable ValueFormatter formatter, InternalHistogram.Factory factory) { + super(keyed, formatter, factory); } - Bucket(long key, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter) { - super(key, docCount, keyed, formatter, aggregations); - } - - @Override - protected InternalHistogram.Factory getFactory() { - return FACTORY; + Bucket(long key, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter, + InternalHistogram.Factory factory) { + super(key, docCount, keyed, formatter, factory, aggregations); } @Override @@ -105,7 +65,7 @@ public class InternalDateHistogram extends InternalHistogram { - private Factory() { + Factory() { } @Override @@ -114,50 +74,21 @@ public class InternalDateHistogram extends InternalHistogram buckets, InternalOrder order, + public InternalHistogram create(String name, List buckets, InternalOrder order, long minDocCount, EmptyBucketInfo emptyBucketInfo, @Nullable ValueFormatter formatter, boolean keyed, Map metaData) { - return new InternalDateHistogram(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed, metaData); + return new InternalHistogram(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed, this, metaData); } @Override public InternalDateHistogram.Bucket createBucket(long key, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter) { - return new Bucket(key, docCount, aggregations, keyed, formatter); + return new Bucket(key, docCount, aggregations, keyed, formatter, this); + } + + @Override + protected InternalDateHistogram.Bucket createEmptyBucket(boolean keyed, @Nullable ValueFormatter formatter) { + return new Bucket(keyed, formatter, this); } } - private ObjectObjectOpenHashMap bucketsMap; - - InternalDateHistogram() {} // for serialization - - InternalDateHistogram(String name, List buckets, InternalOrder order, long minDocCount, - EmptyBucketInfo emptyBucketInfo, @Nullable ValueFormatter formatter, boolean keyed, Map metaData) { - super(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed, metaData); - } - - @Override - public Type type() { - return TYPE; - } - - @Override - protected InternalHistogram.Factory getFactory() { - return FACTORY; - } - - @Override - protected InternalDateHistogram.Bucket createBucket(long key, long docCount, InternalAggregations aggregations, boolean keyed, ValueFormatter formatter) { - return new Bucket(key, docCount, aggregations, keyed, formatter); - } - - @Override - protected Bucket createEmptyBucket(boolean keyed, ValueFormatter formatter) { - return new Bucket(keyed, formatter); - } - - @Override - protected void doReadFrom(StreamInput in) throws IOException { - super.doReadFrom(in); - bucketsMap = null; // we need to reset this on read (as it's lazily created on demand) - } - + private InternalDateHistogram() {} } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java index 2b49b403d2f..c7909442016 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -18,11 +18,11 @@ */ package org.elasticsearch.search.aggregations.bucket.histogram; -import com.carrotsearch.hppc.LongObjectOpenHashMap; import com.google.common.collect.Lists; import org.apache.lucene.util.CollectionUtil; import org.apache.lucene.util.PriorityQueue; +import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -53,7 +53,6 @@ import java.util.Map; public class InternalHistogram extends InternalMultiBucketAggregation implements Histogram { final static Type TYPE = new Type("histogram", "histo"); - final static Factory FACTORY = new Factory(); private final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { @Override @@ -67,7 +66,11 @@ public class InternalHistogram extends Inter private final static BucketStreams.Stream BUCKET_STREAM = new BucketStreams.Stream() { @Override public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException { - Bucket histogram = new Bucket(context.keyed(), context.formatter()); + Factory factory = (Factory) context.attributes().get("factory"); + if (factory == null) { + throw new ElasticsearchIllegalStateException("No factory found for histogram buckets"); + } + Bucket histogram = new Bucket(context.keyed(), context.formatter(), factory); histogram.readFrom(in); return histogram; } @@ -94,21 +97,24 @@ public class InternalHistogram extends Inter InternalAggregations aggregations; private transient final boolean keyed; protected transient final @Nullable ValueFormatter formatter; + private Factory factory; - public Bucket(boolean keyed, @Nullable ValueFormatter formatter) { + public Bucket(boolean keyed, @Nullable ValueFormatter formatter, Factory factory) { this.formatter = formatter; this.keyed = keyed; + this.factory = factory; } - public Bucket(long key, long docCount, boolean keyed, @Nullable ValueFormatter formatter, InternalAggregations aggregations) { - this(keyed, formatter); + public Bucket(long key, long docCount, boolean keyed, @Nullable ValueFormatter formatter, Factory factory, + InternalAggregations aggregations) { + this(keyed, formatter, factory); this.key = key; this.docCount = docCount; this.aggregations = aggregations; } protected Factory getFactory() { - return FACTORY; + return factory; } @Override @@ -228,27 +234,32 @@ public class InternalHistogram extends Inter public InternalHistogram create(String name, List buckets, InternalOrder order, long minDocCount, EmptyBucketInfo emptyBucketInfo, @Nullable ValueFormatter formatter, boolean keyed, Map metaData) { - return new InternalHistogram<>(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed, metaData); + return new InternalHistogram<>(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed, this, metaData); } public B createBucket(long key, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter) { - return (B) new Bucket(key, docCount, keyed, formatter, aggregations); + return (B) new Bucket(key, docCount, keyed, formatter, this, aggregations); + } + + protected B createEmptyBucket(boolean keyed, @Nullable ValueFormatter formatter) { + return (B) new Bucket(keyed, formatter, this); } } protected List buckets; - private LongObjectOpenHashMap bucketsMap; private InternalOrder order; private @Nullable ValueFormatter formatter; private boolean keyed; private long minDocCount; private EmptyBucketInfo emptyBucketInfo; + protected Factory factory; InternalHistogram() {} // for serialization InternalHistogram(String name, List buckets, InternalOrder order, long minDocCount, - EmptyBucketInfo emptyBucketInfo, @Nullable ValueFormatter formatter, boolean keyed, Map metaData) { + EmptyBucketInfo emptyBucketInfo, + @Nullable ValueFormatter formatter, boolean keyed, Factory factory, Map metaData) { super(name, metaData); this.buckets = buckets; this.order = order; @@ -257,6 +268,7 @@ public class InternalHistogram extends Inter this.emptyBucketInfo = emptyBucketInfo; this.formatter = formatter; this.keyed = keyed; + this.factory = factory; } @Override @@ -270,7 +282,7 @@ public class InternalHistogram extends Inter } protected Factory getFactory() { - return FACTORY; + return factory; } private static class IteratorAndCurrent { @@ -356,7 +368,7 @@ public class InternalHistogram extends Inter long key = bounds.min; long max = bounds.max; while (key <= max) { - iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter)); + iter.add(getFactory().createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter)); key = emptyBucketInfo.rounding.nextRoundingValue(key); } } @@ -365,7 +377,7 @@ public class InternalHistogram extends Inter long key = bounds.min; if (key < firstBucket.key) { while (key < firstBucket.key) { - iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter)); + iter.add(getFactory().createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter)); key = emptyBucketInfo.rounding.nextRoundingValue(key); } } @@ -380,7 +392,7 @@ public class InternalHistogram extends Inter if (lastBucket != null) { long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key); while (key < nextBucket.key) { - iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter)); + iter.add(getFactory().createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter)); key = emptyBucketInfo.rounding.nextRoundingValue(key); } assert key == nextBucket.key; @@ -393,7 +405,7 @@ public class InternalHistogram extends Inter long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key); long max = bounds.max; while (key <= max) { - iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter)); + iter.add(getFactory().createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter)); key = emptyBucketInfo.rounding.nextRoundingValue(key); } } @@ -423,16 +435,9 @@ public class InternalHistogram extends Inter return getFactory().create(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo, formatter, keyed, getMetaData()); } - protected B createBucket(long key, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter) { - return (B) new InternalHistogram.Bucket(key, docCount, keyed, formatter, aggregations); - } - - protected B createEmptyBucket(boolean keyed, @Nullable ValueFormatter formatter) { - return (B) new InternalHistogram.Bucket(keyed, formatter); - } - @Override protected void doReadFrom(StreamInput in) throws IOException { + this.factory = resolveFactory(in.readString()); order = InternalOrder.Streams.readOrder(in); minDocCount = in.readVLong(); if (minDocCount == 0) { @@ -443,16 +448,27 @@ public class InternalHistogram extends Inter int size = in.readVInt(); List buckets = new ArrayList<>(size); for (int i = 0; i < size; i++) { - B bucket = createEmptyBucket(keyed, formatter); + B bucket = getFactory().createEmptyBucket(keyed, formatter); bucket.readFrom(in); buckets.add(bucket); } this.buckets = buckets; - this.bucketsMap = null; + } + + @SuppressWarnings("unchecked") + private static Factory resolveFactory(String factoryType) { + if (factoryType.equals(InternalDateHistogram.TYPE.name())) { + return (Factory) new InternalDateHistogram.Factory(); + } else if (factoryType.equals(TYPE.name())) { + return new Factory<>(); + } else { + throw new ElasticsearchIllegalStateException("Invalid histogram factory type [" + factoryType + "]"); + } } @Override protected void doWriteTo(StreamOutput out) throws IOException { + out.writeString(factory.type()); InternalOrder.Streams.writeOrder(order, out); out.writeVLong(minDocCount); if (minDocCount == 0) {