From 3a777545de9df45d688171c942c2234660c8a9b7 Mon Sep 17 00:00:00 2001 From: Colin Goodheart-Smithe Date: Thu, 12 Feb 2015 15:36:02 +0000 Subject: [PATCH] derivative reducer now works with both date_histogram and histogram --- .../bucket/histogram/InternalDateHistogram.java | 12 ++++++++++-- .../bucket/histogram/InternalHistogram.java | 10 ++++++++-- .../search/aggregations/reducers/ReducerFactory.java | 2 +- .../reducers/derivative/DerivativeReducer.java | 7 ++++--- 4 files changed, 23 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java index 9f9ad81c953..0457ad9e92c 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java @@ -19,6 +19,7 @@ package org.elasticsearch.search.aggregations.bucket.histogram; import org.elasticsearch.common.Nullable; +import org.elasticsearch.search.aggregations.AggregationExecutionException; import org.elasticsearch.search.aggregations.InternalAggregation.Type; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram.EmptyBucketInfo; @@ -83,8 +84,15 @@ public class InternalDateHistogram { } @Override - public InternalDateHistogram.Bucket createBucket(long key, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter) { - return new Bucket(key, docCount, aggregations, keyed, formatter, this); + public InternalDateHistogram.Bucket createBucket(Object key, long docCount, InternalAggregations aggregations, boolean keyed, + @Nullable ValueFormatter formatter) { + if (key instanceof Number) { + return new Bucket(((Number) key).longValue(), docCount, aggregations, keyed, formatter, this); + } else if (key instanceof DateTime) { + return new Bucket(((DateTime) key).getMillis(), docCount, aggregations, keyed, formatter, this); + } else { + throw new AggregationExecutionException("Expected key of type Number or DateTime but got [" + key + "]"); + } } @Override diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java index 5c945afddf0..d5b3a1384f1 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.rounding.Rounding; import org.elasticsearch.common.text.StringText; import org.elasticsearch.common.text.Text; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.aggregations.AggregationExecutionException; import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.InternalAggregation; @@ -247,8 +248,13 @@ public class InternalHistogram extends Inter return new InternalHistogram<>(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed, this, reducers, metaData); } - public B createBucket(long key, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter) { - return (B) new Bucket(key, docCount, keyed, formatter, this, aggregations); + public B createBucket(Object key, long docCount, InternalAggregations aggregations, boolean keyed, + @Nullable ValueFormatter formatter) { + if (key instanceof Number) { + return (B) new Bucket(((Number) key).longValue(), docCount, keyed, formatter, this, aggregations); + } else { + throw new AggregationExecutionException("Expected key of type Number but got [" + key + "]"); + } } protected B createEmptyBucket(boolean keyed, @Nullable ValueFormatter formatter) { diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/ReducerFactory.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/ReducerFactory.java index 4249cde2dc3..c4c6b304ba8 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/reducers/ReducerFactory.java +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/ReducerFactory.java @@ -49,7 +49,7 @@ public abstract class ReducerFactory { /** * Validates the state of this factory (makes sure the factory is properly configured) */ - public final void validate() { + public final void validate() { // NOCOMMIT hook in validation doValidate(); } diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeReducer.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeReducer.java index 975ad809adf..76d3bd9b3ac 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeReducer.java +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeReducer.java @@ -37,7 +37,6 @@ import org.elasticsearch.search.aggregations.reducers.ReducerFactory; import org.elasticsearch.search.aggregations.reducers.ReducerStreams; import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.aggregations.support.AggregationPath; -import org.joda.time.DateTime; import java.io.IOException; import java.util.ArrayList; @@ -89,6 +88,7 @@ public class DerivativeReducer extends Reducer { InternalHistogram.Factory factory = histo.getFactory(); List newBuckets = new ArrayList<>(); Double lastBucketValue = null; + // NOCOMMIT this needs to be improved so that the aggs are cloned correctly to ensure aggs are fully immutable. for (InternalHistogram.Bucket bucket : buckets) { double thisBucketValue = (double) bucket.getProperty(histo.getName(), AggregationPath.parse(bucketsPath) .getPathElementsAsStringList()); @@ -97,8 +97,9 @@ public class DerivativeReducer extends Reducer { List aggs = new ArrayList<>(Lists.transform(bucket.getAggregations().asList(), FUNCTION)); aggs.add(new InternalSimpleValue(name(), diff, null, new ArrayList(), metaData())); // NOCOMMIT implement formatter for derivative reducer - InternalHistogram.Bucket newBucket = factory.createBucket(((DateTime) bucket.getKey()).getMillis(), bucket.getDocCount(), - new InternalAggregations(aggs), bucket.getKeyed(), bucket.getFormatter()); // NOCOMMIT fix key resolution to deal with numbers and dates + InternalHistogram.Bucket newBucket = factory.createBucket(bucket.getKey(), bucket.getDocCount(), + new InternalAggregations( + aggs), bucket.getKeyed(), bucket.getFormatter()); newBuckets.add(newBucket); } else { newBuckets.add(bucket);