From 55b82db34638fa15470da2bbf71b4e98861d1203 Mon Sep 17 00:00:00 2001 From: Colin Goodheart-Smithe Date: Thu, 12 Feb 2015 12:32:54 +0000 Subject: [PATCH] Reducers are now wired end-to-end into the agg framework --- .../aggregations/AggregationModule.java | 2 + .../aggregations/InternalAggregation.java | 20 ++++ .../aggregations/InternalAggregations.java | 3 +- .../bucket/histogram/InternalHistogram.java | 12 +- .../significant/UnmappedSignificantTerms.java | 2 +- .../bucket/terms/UnmappedTerms.java | 2 +- .../metrics/tophits/InternalTopHits.java | 20 ++-- .../metrics/tophits/TopHitsAggregator.java | 6 +- .../reducers/InternalSimpleValue.java | 103 ++++++++++++++++++ .../search/aggregations/reducers/Reducer.java | 45 +++++++- .../aggregations/reducers/ReducerFactory.java | 3 +- .../aggregations/reducers/ReducerStreams.java | 68 ++++++++++++ .../aggregations/reducers/SimpleValue.java | 26 +++++ 13 files changed, 294 insertions(+), 18 deletions(-) create mode 100644 src/main/java/org/elasticsearch/search/aggregations/reducers/InternalSimpleValue.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/reducers/ReducerStreams.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/reducers/SimpleValue.java diff --git a/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java b/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java index 3910f096246..cb4bef6ca34 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java +++ b/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java @@ -98,6 +98,8 @@ public class AggregationModule extends AbstractModule implements SpawnModules{ aggParsers.add(GeoBoundsParser.class); aggParsers.add(ScriptedMetricParser.class); aggParsers.add(ChildrenParser.class); + + // NOCOMMIT reducerParsers.add(FooParser.class); } /** diff --git a/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java b/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java index 828a1a7ee0f..fb621ea5103 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java +++ b/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java @@ -18,6 +18,9 @@ */ package org.elasticsearch.search.aggregations; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; @@ -29,6 +32,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.aggregations.reducers.Reducer; +import org.elasticsearch.search.aggregations.reducers.ReducerStreams; import org.elasticsearch.search.aggregations.support.AggregationPath; import java.io.IOException; @@ -209,6 +213,11 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St public final void writeTo(StreamOutput out) throws IOException { out.writeString(name); out.writeGenericValue(metaData); + out.writeVInt(reducers.size()); + for (Reducer reducer : reducers) { + out.writeBytesReference(reducer.type().stream()); + reducer.writeTo(out); + } doWriteTo(out); } @@ -217,6 +226,17 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St public final void readFrom(StreamInput in) throws IOException { name = in.readString(); metaData = in.readMap(); + int size = in.readVInt(); + if (size == 0) { + reducers = ImmutableList.of(); + } else { + reducers = Lists.newArrayListWithCapacity(size); + for (int i = 0; i < size; i++) { + BytesReference type = in.readBytesReference(); + Reducer reducer = ReducerStreams.stream(type).readResult(in); + reducers.add(reducer); + } + } doReadFrom(in); } diff --git a/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java b/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java index ec4625e2387..c41e8a4ff77 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java +++ b/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java @@ -165,7 +165,8 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl for (Map.Entry> entry : aggByName.entrySet()) { List aggregations = entry.getValue(); InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand - reducedAggregations.add(first.doReduce(new InternalAggregation.ReduceContext(aggregations, context.bigArrays(), context.scriptService()))); + reducedAggregations.add(first.reduce(new InternalAggregation.ReduceContext(aggregations, context.bigArrays(), context + .scriptService()))); } return new InternalAggregations(reducedAggregations); } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java index a33cdb49b3c..5c945afddf0 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -186,6 +186,14 @@ public class InternalHistogram extends Inter out.writeVLong(docCount); aggregations.writeTo(out); } + + public ValueFormatter getFormatter() { + return formatter; + } + + public boolean getKeyed() { + return keyed; + } } static class EmptyBucketInfo { @@ -224,7 +232,7 @@ public class InternalHistogram extends Inter } - static class Factory { + public static class Factory { protected Factory() { } @@ -283,7 +291,7 @@ public class InternalHistogram extends Inter return buckets; } - protected Factory getFactory() { + public Factory getFactory() { return factory; } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java index f382237dacf..04099009272 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java @@ -71,7 +71,7 @@ public class UnmappedSignificantTerms extends InternalSignificantTerms { public InternalAggregation doReduce(ReduceContext reduceContext) { for (InternalAggregation aggregation : reduceContext.aggregations()) { if (!(aggregation instanceof UnmappedSignificantTerms)) { - return aggregation.doReduce(reduceContext); + return aggregation.reduce(reduceContext); } } return this; diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java index 82c850bcac7..89134a394ec 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java @@ -86,7 +86,7 @@ public class UnmappedTerms extends InternalTerms { public InternalAggregation doReduce(ReduceContext reduceContext) { for (InternalAggregation agg : reduceContext.aggregations()) { if (!(agg instanceof UnmappedTerms)) { - return agg.doReduce(reduceContext); + return agg.reduce(reduceContext); } } return this; diff --git a/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/InternalTopHits.java b/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/InternalTopHits.java index 8c5eafa2961..b3e4c5cf4c9 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/InternalTopHits.java +++ b/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/InternalTopHits.java @@ -18,9 +18,6 @@ */ package org.elasticsearch.search.aggregations.metrics.tophits; -import java.io.IOException; -import java.util.List; - import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.Sort; import org.apache.lucene.search.TopDocs; @@ -35,9 +32,14 @@ import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation; +import org.elasticsearch.search.aggregations.reducers.Reducer; import org.elasticsearch.search.internal.InternalSearchHit; import org.elasticsearch.search.internal.InternalSearchHits; +import java.io.IOException; +import java.util.List; +import java.util.Map; + /** */ public class InternalTopHits extends InternalMetricsAggregation implements TopHits { @@ -65,16 +67,17 @@ public class InternalTopHits extends InternalMetricsAggregation implements TopHi InternalTopHits() { } - public InternalTopHits(String name, int from, int size, TopDocs topDocs, InternalSearchHits searchHits) { - this.name = name; + public InternalTopHits(String name, int from, int size, TopDocs topDocs, InternalSearchHits searchHits, List reducers, + Map metaData) { + super(name, reducers, metaData); this.from = from; this.size = size; this.topDocs = topDocs; this.searchHits = searchHits; } - public InternalTopHits(String name, InternalSearchHits searchHits) { - this.name = name; + public InternalTopHits(String name, InternalSearchHits searchHits, List reducers, Map metaData) { + super(name, reducers, metaData); this.searchHits = searchHits; this.topDocs = Lucene.EMPTY_TOP_DOCS; } @@ -123,7 +126,8 @@ public class InternalTopHits extends InternalMetricsAggregation implements TopHi } while (shardDocs[scoreDoc.shardIndex].scoreDocs[position] != scoreDoc); hits[i] = (InternalSearchHit) shardHits[scoreDoc.shardIndex].getAt(position); } - return new InternalTopHits(name, new InternalSearchHits(hits, reducedTopDocs.totalHits, reducedTopDocs.getMaxScore())); + return new InternalTopHits(name, new InternalSearchHits(hits, reducedTopDocs.totalHits, reducedTopDocs.getMaxScore()), + reducers(), getMetaData()); } catch (IOException e) { throw ExceptionsHelper.convertToElastic(e); } diff --git a/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/TopHitsAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/TopHitsAggregator.java index 20aeaae2f5a..6abf1917c17 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/TopHitsAggregator.java +++ b/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/TopHitsAggregator.java @@ -158,13 +158,15 @@ public class TopHitsAggregator extends MetricsAggregator { searchHitFields.sortValues(fieldDoc.fields); } } - return new InternalTopHits(name, subSearchContext.from(), subSearchContext.size(), topDocs, fetchResult.hits()); + return new InternalTopHits(name, subSearchContext.from(), subSearchContext.size(), topDocs, fetchResult.hits(), reducers(), + metaData()); } } @Override public InternalAggregation buildEmptyAggregation() { - return new InternalTopHits(name, subSearchContext.from(), subSearchContext.size(), Lucene.EMPTY_TOP_DOCS, InternalSearchHits.empty()); + return new InternalTopHits(name, subSearchContext.from(), subSearchContext.size(), Lucene.EMPTY_TOP_DOCS, + InternalSearchHits.empty(), reducers(), metaData()); } @Override diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/InternalSimpleValue.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/InternalSimpleValue.java new file mode 100644 index 00000000000..7d204c007c6 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/InternalSimpleValue.java @@ -0,0 +1,103 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.reducers; + +import org.elasticsearch.common.inject.internal.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.aggregations.AggregationStreams; +import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation; +import org.elasticsearch.search.aggregations.metrics.max.InternalMax; +import org.elasticsearch.search.aggregations.support.format.ValueFormatter; +import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class InternalSimpleValue extends InternalNumericMetricsAggregation.SingleValue implements SimpleValue { + + public final static Type TYPE = new Type("simple_value"); + + public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { + @Override + public InternalSimpleValue readResult(StreamInput in) throws IOException { + InternalSimpleValue result = new InternalSimpleValue(); + result.readFrom(in); + return result; + } + }; + + public static void registerStreams() { + AggregationStreams.registerStream(STREAM, TYPE.stream()); + } + + private double value; + + InternalSimpleValue() {} // for serialization + + public InternalSimpleValue(String name, double value, @Nullable ValueFormatter formatter, List reducers, Map metaData) { + super(name, reducers, metaData); + this.valueFormatter = formatter; + this.value = value; + } + + @Override + public double value() { + return value; + } + + public double getValue() { + return value; + } + + @Override + public Type type() { + return TYPE; + } + + @Override + public InternalMax doReduce(ReduceContext reduceContext) { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + protected void doReadFrom(StreamInput in) throws IOException { + valueFormatter = ValueFormatterStreams.readOptional(in); + value = in.readDouble(); + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + ValueFormatterStreams.writeOptional(valueFormatter, out); + out.writeDouble(value); + } + + @Override + public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { + boolean hasValue = !Double.isInfinite(value); + builder.field(CommonFields.VALUE, hasValue ? value : null); + if (hasValue && valueFormatter != null) { + builder.field(CommonFields.VALUE_AS_STRING, valueFormatter.format(value)); + } + return builder; + } +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/Reducer.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/Reducer.java index c74f6f0b0f2..d87d9fa72e1 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/reducers/Reducer.java +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/Reducer.java @@ -19,14 +19,19 @@ package org.elasticsearch.search.aggregations.reducers; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; +import org.elasticsearch.search.aggregations.InternalAggregation.Type; import org.elasticsearch.search.internal.SearchContext; import java.io.IOException; +import java.util.Map; -public abstract class Reducer { +public abstract class Reducer implements Streamable { /** * Parses the reducer request and creates the appropriate reducer factory @@ -58,6 +63,44 @@ public abstract class Reducer { } + protected String name; + protected Map metaData; + + protected Reducer() { // for Serialisation + } + + protected Reducer(String name, Map metaData) { + this.name = name; + this.metaData = metaData; + } + + public String name() { + return name; + } + + public Map metaData() { + return metaData; + } + + public abstract Type type(); + public abstract InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext); + @Override + public final void writeTo(StreamOutput out) throws IOException { + out.writeString(name); + out.writeMap(metaData); + doWriteTo(out); + } + + protected abstract void doWriteTo(StreamOutput out) throws IOException; + + @Override + public final void readFrom(StreamInput in) throws IOException { + name = in.readString(); + metaData = in.readMap(); + doReadFrom(in); + } + + protected abstract void doReadFrom(StreamInput in) throws IOException; } diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/ReducerFactory.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/ReducerFactory.java index 64e7d1c7baf..4249cde2dc3 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/reducers/ReducerFactory.java +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/ReducerFactory.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.search.aggregations.reducers; -import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.support.AggregationContext; @@ -28,7 +27,7 @@ import java.util.Map; /** * A factory that knows how to create an {@link Aggregator} of a specific type. */ -public abstract class ReducerFactory implements Streamable { +public abstract class ReducerFactory { protected String name; protected String type; diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/ReducerStreams.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/ReducerStreams.java new file mode 100644 index 00000000000..7a4319e0a2b --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/ReducerStreams.java @@ -0,0 +1,68 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search.aggregations.reducers; + +import com.google.common.collect.ImmutableMap; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.io.stream.StreamInput; + +import java.io.IOException; + +/** + * A registry for all the dedicated streams in the aggregation module. This is to support dynamic addAggregation that + * know how to stream themselves. + */ +public class ReducerStreams { + + private static ImmutableMap streams = ImmutableMap.of(); + + /** + * A stream that knows how to read an aggregation from the input. + */ + public static interface Stream { + Reducer readResult(StreamInput in) throws IOException; + } + + /** + * Registers the given stream and associate it with the given types. + * + * @param stream The streams to register + * @param types The types associated with the streams + */ + public static synchronized void registerStream(Stream stream, BytesReference... types) { + MapBuilder uStreams = MapBuilder.newMapBuilder(streams); + for (BytesReference type : types) { + uStreams.put(type, stream); + } + streams = uStreams.immutableMap(); + } + + /** + * Returns the stream that is registered for the given type + * + * @param type The given type + * @return The associated stream + */ + public static Stream stream(BytesReference type) { + return streams.get(type); + } + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/SimpleValue.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/SimpleValue.java new file mode 100644 index 00000000000..e1c510e1a29 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/SimpleValue.java @@ -0,0 +1,26 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.reducers; + +import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation; + +public interface SimpleValue extends NumericMetricsAggregation.SingleValue { + +}