From 9cfa6c6af7141bb662de64e979aeb79d385f4a69 Mon Sep 17 00:00:00 2001 From: Colin Goodheart-Smithe Date: Thu, 12 Feb 2015 13:22:22 +0000 Subject: [PATCH] Basic derivative reducer --- .../aggregations/AggregationModule.java | 3 +- .../TransportAggregationModule.java | 6 +- .../reducers/derivative/DerivativeParser.java | 69 +++++++++ .../derivative/DerivativeReducer.java | 138 ++++++++++++++++++ 4 files changed, 214 insertions(+), 2 deletions(-) create mode 100644 src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeParser.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeReducer.java diff --git a/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java b/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java index cb4bef6ca34..d1cb6d96800 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java +++ b/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java @@ -56,6 +56,7 @@ import org.elasticsearch.search.aggregations.metrics.sum.SumParser; import org.elasticsearch.search.aggregations.metrics.tophits.TopHitsParser; import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountParser; import org.elasticsearch.search.aggregations.reducers.Reducer; +import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeParser; import java.util.List; @@ -99,7 +100,7 @@ public class AggregationModule extends AbstractModule implements SpawnModules{ aggParsers.add(ScriptedMetricParser.class); aggParsers.add(ChildrenParser.class); - // NOCOMMIT reducerParsers.add(FooParser.class); + reducerParsers.add(DerivativeParser.class); } /** diff --git a/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java b/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java index ce09d1e5c69..c99f885462c 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java +++ b/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java @@ -57,6 +57,7 @@ import org.elasticsearch.search.aggregations.metrics.stats.extended.InternalExte import org.elasticsearch.search.aggregations.metrics.sum.InternalSum; import org.elasticsearch.search.aggregations.metrics.tophits.InternalTopHits; import org.elasticsearch.search.aggregations.metrics.valuecount.InternalValueCount; +import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeReducer; /** * A module that registers all the transport streams for the addAggregation @@ -89,7 +90,7 @@ public class TransportAggregationModule extends AbstractModule implements SpawnM SignificantStringTerms.registerStreams(); SignificantLongTerms.registerStreams(); UnmappedSignificantTerms.registerStreams(); - InternalGeoHashGrid.registerStreams(); + InternalGeoHashGrid.registerStreams(); DoubleTerms.registerStreams(); UnmappedTerms.registerStreams(); InternalRange.registerStream(); @@ -102,6 +103,9 @@ public class TransportAggregationModule extends AbstractModule implements SpawnM InternalTopHits.registerStreams(); InternalGeoBounds.registerStream(); InternalChildren.registerStream(); + + // Reducers + DerivativeReducer.registerStreams(); } @Override diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeParser.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeParser.java new file mode 100644 index 00000000000..0e9b1f7f41f --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeParser.java @@ -0,0 +1,69 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.reducers.derivative; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.SearchParseException; +import org.elasticsearch.search.aggregations.reducers.Reducer; +import org.elasticsearch.search.aggregations.reducers.ReducerFactory; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; + +public class DerivativeParser implements Reducer.Parser { + + public static final ParseField BUCKETS_PATH = new ParseField("bucketsPath"); + + @Override + public String type() { + return DerivativeReducer.TYPE.name(); + } + + @Override + public ReducerFactory parse(String reducerName, XContentParser parser, SearchContext context) throws IOException { + XContentParser.Token token; + String currentFieldName = null; + String bucketsPath = null; + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_STRING) { + if (BUCKETS_PATH.match(currentFieldName)) { + bucketsPath = parser.text(); + } else { + throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: [" + + currentFieldName + "]."); + } + } else { + throw new SearchParseException(context, "Unexpected token " + token + " in [" + reducerName + "]."); + } + } + + if (bucketsPath == null) { + throw new SearchParseException(context, "Missing required field [" + BUCKETS_PATH.getPreferredName() + + "] for derivative aggregation [" + reducerName + "]"); + } + + return new DerivativeReducer.Factory(reducerName, bucketsPath); + } + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeReducer.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeReducer.java new file mode 100644 index 00000000000..d2cfae6784b --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeReducer.java @@ -0,0 +1,138 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.reducers.derivative; + +import com.google.common.base.Function; +import com.google.common.collect.Lists; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; +import org.elasticsearch.search.aggregations.InternalAggregation.Type; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram; +import org.elasticsearch.search.aggregations.reducers.InternalSimpleValue; +import org.elasticsearch.search.aggregations.reducers.Reducer; +import org.elasticsearch.search.aggregations.reducers.ReducerFactory; +import org.elasticsearch.search.aggregations.reducers.ReducerStreams; +import org.elasticsearch.search.aggregations.support.AggregationContext; +import org.elasticsearch.search.aggregations.support.AggregationPath; +import org.joda.time.DateTime; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class DerivativeReducer extends Reducer { + + public final static Type TYPE = new Type("derivative"); + + public final static ReducerStreams.Stream STREAM = new ReducerStreams.Stream() { + @Override + public DerivativeReducer readResult(StreamInput in) throws IOException { + DerivativeReducer result = new DerivativeReducer(); + result.readFrom(in); + return result; + } + }; + + public static void registerStreams() { + ReducerStreams.registerStream(STREAM, TYPE.stream()); + } + + private String bucketsPath; + private static final Function FUNCTION = new Function() { + @Override + public InternalAggregation apply(Aggregation input) { + return (InternalAggregation) input; + } + }; + + public DerivativeReducer() { + } + + public DerivativeReducer(String name, String bucketsPath, Map metadata) { + super(name, metadata); + this.bucketsPath = bucketsPath; + } + + @Override + public Type type() { + return TYPE; + } + + @Override + public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) { + InternalHistogram histo = (InternalHistogram) aggregation; + List buckets = histo.getBuckets(); + InternalHistogram.Factory factory = histo.getFactory(); + List newBuckets = new ArrayList<>(); + Double lastBucketValue = null; + for (InternalHistogram.Bucket bucket : buckets) { + double thisBucketValue = (double) bucket.getProperty(histo.getName(), AggregationPath.parse(bucketsPath) + .getPathElementsAsStringList()); + if (lastBucketValue != null) { + double diff = thisBucketValue - lastBucketValue; + + List aggs = new ArrayList<>(Lists.transform(bucket.getAggregations().asList(), FUNCTION)); + aggs.add(new InternalSimpleValue(bucketsPath, diff, null, new ArrayList(), metaData())); // NOCOMMIT implement formatter for derivative reducer + InternalHistogram.Bucket newBucket = factory.createBucket(((DateTime) bucket.getKey()).getMillis(), bucket.getDocCount(), + new InternalAggregations(aggs), bucket.getKeyed(), bucket.getFormatter()); // NOCOMMIT fix key resolution for dates + newBuckets.add(newBucket); + } else { + newBuckets.add(bucket); + } + lastBucketValue = thisBucketValue; + } + return factory.create(histo.getName(), newBuckets, null, 1, null, null, false, new ArrayList(), histo.getMetaData()); // NOCOMMIT get order, minDocCount, emptyBucketInfo etc. from histo + } + + @Override + public void doReadFrom(StreamInput in) throws IOException { + bucketsPath = in.readString(); + + } + + @Override + public void doWriteTo(StreamOutput out) throws IOException { + out.writeString(bucketsPath); + } + + public static class Factory extends ReducerFactory { + + private String bucketsPath; + + public Factory(String name, String field) { + super(name, TYPE.name()); + this.bucketsPath = field; + } + + @Override + protected Reducer createInternal(AggregationContext context, Aggregator parent, boolean collectsFromSingleBucket, + Map metaData) throws IOException { + return new DerivativeReducer(name, bucketsPath, metaData); + } + + } +}