Reducers are now wired end-to-end into the agg framework

This commit is contained in:
Colin Goodheart-Smithe 2015-02-12 12:32:54 +00:00
parent 1e947c8d17
commit 55b82db346
13 changed files with 294 additions and 18 deletions

View File

@ -98,6 +98,8 @@ public class AggregationModule extends AbstractModule implements SpawnModules{
aggParsers.add(GeoBoundsParser.class); aggParsers.add(GeoBoundsParser.class);
aggParsers.add(ScriptedMetricParser.class); aggParsers.add(ScriptedMetricParser.class);
aggParsers.add(ChildrenParser.class); aggParsers.add(ChildrenParser.class);
// NOCOMMIT reducerParsers.add(FooParser.class);
} }
/** /**

View File

@ -18,6 +18,9 @@
*/ */
package org.elasticsearch.search.aggregations; package org.elasticsearch.search.aggregations;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -29,6 +32,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.reducers.Reducer; import org.elasticsearch.search.aggregations.reducers.Reducer;
import org.elasticsearch.search.aggregations.reducers.ReducerStreams;
import org.elasticsearch.search.aggregations.support.AggregationPath; import org.elasticsearch.search.aggregations.support.AggregationPath;
import java.io.IOException; import java.io.IOException;
@ -209,6 +213,11 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
public final void writeTo(StreamOutput out) throws IOException { public final void writeTo(StreamOutput out) throws IOException {
out.writeString(name); out.writeString(name);
out.writeGenericValue(metaData); out.writeGenericValue(metaData);
out.writeVInt(reducers.size());
for (Reducer reducer : reducers) {
out.writeBytesReference(reducer.type().stream());
reducer.writeTo(out);
}
doWriteTo(out); doWriteTo(out);
} }
@ -217,6 +226,17 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
public final void readFrom(StreamInput in) throws IOException { public final void readFrom(StreamInput in) throws IOException {
name = in.readString(); name = in.readString();
metaData = in.readMap(); metaData = in.readMap();
int size = in.readVInt();
if (size == 0) {
reducers = ImmutableList.of();
} else {
reducers = Lists.newArrayListWithCapacity(size);
for (int i = 0; i < size; i++) {
BytesReference type = in.readBytesReference();
Reducer reducer = ReducerStreams.stream(type).readResult(in);
reducers.add(reducer);
}
}
doReadFrom(in); doReadFrom(in);
} }

View File

@ -165,7 +165,8 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl
for (Map.Entry<String, List<InternalAggregation>> entry : aggByName.entrySet()) { for (Map.Entry<String, List<InternalAggregation>> entry : aggByName.entrySet()) {
List<InternalAggregation> aggregations = entry.getValue(); List<InternalAggregation> aggregations = entry.getValue();
InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand
reducedAggregations.add(first.doReduce(new InternalAggregation.ReduceContext(aggregations, context.bigArrays(), context.scriptService()))); reducedAggregations.add(first.reduce(new InternalAggregation.ReduceContext(aggregations, context.bigArrays(), context
.scriptService())));
} }
return new InternalAggregations(reducedAggregations); return new InternalAggregations(reducedAggregations);
} }

View File

@ -186,6 +186,14 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
out.writeVLong(docCount); out.writeVLong(docCount);
aggregations.writeTo(out); aggregations.writeTo(out);
} }
public ValueFormatter getFormatter() {
return formatter;
}
public boolean getKeyed() {
return keyed;
}
} }
static class EmptyBucketInfo { static class EmptyBucketInfo {
@ -224,7 +232,7 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
} }
static class Factory<B extends InternalHistogram.Bucket> { public static class Factory<B extends InternalHistogram.Bucket> {
protected Factory() { protected Factory() {
} }
@ -283,7 +291,7 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
return buckets; return buckets;
} }
protected Factory<B> getFactory() { public Factory<B> getFactory() {
return factory; return factory;
} }

View File

@ -71,7 +71,7 @@ public class UnmappedSignificantTerms extends InternalSignificantTerms {
public InternalAggregation doReduce(ReduceContext reduceContext) { public InternalAggregation doReduce(ReduceContext reduceContext) {
for (InternalAggregation aggregation : reduceContext.aggregations()) { for (InternalAggregation aggregation : reduceContext.aggregations()) {
if (!(aggregation instanceof UnmappedSignificantTerms)) { if (!(aggregation instanceof UnmappedSignificantTerms)) {
return aggregation.doReduce(reduceContext); return aggregation.reduce(reduceContext);
} }
} }
return this; return this;

View File

@ -86,7 +86,7 @@ public class UnmappedTerms extends InternalTerms {
public InternalAggregation doReduce(ReduceContext reduceContext) { public InternalAggregation doReduce(ReduceContext reduceContext) {
for (InternalAggregation agg : reduceContext.aggregations()) { for (InternalAggregation agg : reduceContext.aggregations()) {
if (!(agg instanceof UnmappedTerms)) { if (!(agg instanceof UnmappedTerms)) {
return agg.doReduce(reduceContext); return agg.reduce(reduceContext);
} }
} }
return this; return this;

View File

@ -18,9 +18,6 @@
*/ */
package org.elasticsearch.search.aggregations.metrics.tophits; package org.elasticsearch.search.aggregations.metrics.tophits;
import java.io.IOException;
import java.util.List;
import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort; import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopDocs;
@ -35,9 +32,14 @@ import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation; import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation;
import org.elasticsearch.search.aggregations.reducers.Reducer;
import org.elasticsearch.search.internal.InternalSearchHit; import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.search.internal.InternalSearchHits;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/** /**
*/ */
public class InternalTopHits extends InternalMetricsAggregation implements TopHits { public class InternalTopHits extends InternalMetricsAggregation implements TopHits {
@ -65,16 +67,17 @@ public class InternalTopHits extends InternalMetricsAggregation implements TopHi
InternalTopHits() { InternalTopHits() {
} }
public InternalTopHits(String name, int from, int size, TopDocs topDocs, InternalSearchHits searchHits) { public InternalTopHits(String name, int from, int size, TopDocs topDocs, InternalSearchHits searchHits, List<Reducer> reducers,
this.name = name; Map<String, Object> metaData) {
super(name, reducers, metaData);
this.from = from; this.from = from;
this.size = size; this.size = size;
this.topDocs = topDocs; this.topDocs = topDocs;
this.searchHits = searchHits; this.searchHits = searchHits;
} }
public InternalTopHits(String name, InternalSearchHits searchHits) { public InternalTopHits(String name, InternalSearchHits searchHits, List<Reducer> reducers, Map<String, Object> metaData) {
this.name = name; super(name, reducers, metaData);
this.searchHits = searchHits; this.searchHits = searchHits;
this.topDocs = Lucene.EMPTY_TOP_DOCS; this.topDocs = Lucene.EMPTY_TOP_DOCS;
} }
@ -123,7 +126,8 @@ public class InternalTopHits extends InternalMetricsAggregation implements TopHi
} while (shardDocs[scoreDoc.shardIndex].scoreDocs[position] != scoreDoc); } while (shardDocs[scoreDoc.shardIndex].scoreDocs[position] != scoreDoc);
hits[i] = (InternalSearchHit) shardHits[scoreDoc.shardIndex].getAt(position); hits[i] = (InternalSearchHit) shardHits[scoreDoc.shardIndex].getAt(position);
} }
return new InternalTopHits(name, new InternalSearchHits(hits, reducedTopDocs.totalHits, reducedTopDocs.getMaxScore())); return new InternalTopHits(name, new InternalSearchHits(hits, reducedTopDocs.totalHits, reducedTopDocs.getMaxScore()),
reducers(), getMetaData());
} catch (IOException e) { } catch (IOException e) {
throw ExceptionsHelper.convertToElastic(e); throw ExceptionsHelper.convertToElastic(e);
} }

View File

@ -158,13 +158,15 @@ public class TopHitsAggregator extends MetricsAggregator {
searchHitFields.sortValues(fieldDoc.fields); searchHitFields.sortValues(fieldDoc.fields);
} }
} }
return new InternalTopHits(name, subSearchContext.from(), subSearchContext.size(), topDocs, fetchResult.hits()); return new InternalTopHits(name, subSearchContext.from(), subSearchContext.size(), topDocs, fetchResult.hits(), reducers(),
metaData());
} }
} }
@Override @Override
public InternalAggregation buildEmptyAggregation() { public InternalAggregation buildEmptyAggregation() {
return new InternalTopHits(name, subSearchContext.from(), subSearchContext.size(), Lucene.EMPTY_TOP_DOCS, InternalSearchHits.empty()); return new InternalTopHits(name, subSearchContext.from(), subSearchContext.size(), Lucene.EMPTY_TOP_DOCS,
InternalSearchHits.empty(), reducers(), metaData());
} }
@Override @Override

View File

@ -0,0 +1,103 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers;
import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public class InternalSimpleValue extends InternalNumericMetricsAggregation.SingleValue implements SimpleValue {
public final static Type TYPE = new Type("simple_value");
public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@Override
public InternalSimpleValue readResult(StreamInput in) throws IOException {
InternalSimpleValue result = new InternalSimpleValue();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
private double value;
InternalSimpleValue() {} // for serialization
public InternalSimpleValue(String name, double value, @Nullable ValueFormatter formatter, List<Reducer> reducers, Map<String, Object> metaData) {
super(name, reducers, metaData);
this.valueFormatter = formatter;
this.value = value;
}
@Override
public double value() {
return value;
}
public double getValue() {
return value;
}
@Override
public Type type() {
return TYPE;
}
@Override
public InternalMax doReduce(ReduceContext reduceContext) {
throw new UnsupportedOperationException("Not supported");
}
@Override
protected void doReadFrom(StreamInput in) throws IOException {
valueFormatter = ValueFormatterStreams.readOptional(in);
value = in.readDouble();
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
ValueFormatterStreams.writeOptional(valueFormatter, out);
out.writeDouble(value);
}
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
boolean hasValue = !Double.isInfinite(value);
builder.field(CommonFields.VALUE, hasValue ? value : null);
if (hasValue && valueFormatter != null) {
builder.field(CommonFields.VALUE_AS_STRING, valueFormatter.format(value));
}
return builder;
}
}

View File

@ -19,14 +19,19 @@
package org.elasticsearch.search.aggregations.reducers; package org.elasticsearch.search.aggregations.reducers;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
public abstract class Reducer { public abstract class Reducer implements Streamable {
/** /**
* Parses the reducer request and creates the appropriate reducer factory * Parses the reducer request and creates the appropriate reducer factory
@ -58,6 +63,44 @@ public abstract class Reducer {
} }
protected String name;
protected Map<String, Object> metaData;
protected Reducer() { // for Serialisation
}
protected Reducer(String name, Map<String, Object> metaData) {
this.name = name;
this.metaData = metaData;
}
public String name() {
return name;
}
public Map<String, Object> metaData() {
return metaData;
}
public abstract Type type();
public abstract InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext); public abstract InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext);
@Override
public final void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeMap(metaData);
doWriteTo(out);
}
protected abstract void doWriteTo(StreamOutput out) throws IOException;
@Override
public final void readFrom(StreamInput in) throws IOException {
name = in.readString();
metaData = in.readMap();
doReadFrom(in);
}
protected abstract void doReadFrom(StreamInput in) throws IOException;
} }

View File

@ -18,7 +18,6 @@
*/ */
package org.elasticsearch.search.aggregations.reducers; package org.elasticsearch.search.aggregations.reducers;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.aggregations.support.AggregationContext;
@ -28,7 +27,7 @@ import java.util.Map;
/** /**
* A factory that knows how to create an {@link Aggregator} of a specific type. * A factory that knows how to create an {@link Aggregator} of a specific type.
*/ */
public abstract class ReducerFactory implements Streamable { public abstract class ReducerFactory {
protected String name; protected String name;
protected String type; protected String type;

View File

@ -0,0 +1,68 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
import java.io.IOException;
/**
* A registry for all the dedicated streams in the aggregation module. This is to support dynamic addAggregation that
* know how to stream themselves.
*/
public class ReducerStreams {
private static ImmutableMap<BytesReference, Stream> streams = ImmutableMap.of();
/**
* A stream that knows how to read an aggregation from the input.
*/
public static interface Stream {
Reducer readResult(StreamInput in) throws IOException;
}
/**
* Registers the given stream and associate it with the given types.
*
* @param stream The streams to register
* @param types The types associated with the streams
*/
public static synchronized void registerStream(Stream stream, BytesReference... types) {
MapBuilder<BytesReference, Stream> uStreams = MapBuilder.newMapBuilder(streams);
for (BytesReference type : types) {
uStreams.put(type, stream);
}
streams = uStreams.immutableMap();
}
/**
* Returns the stream that is registered for the given type
*
* @param type The given type
* @return The associated stream
*/
public static Stream stream(BytesReference type) {
return streams.get(type);
}
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
public interface SimpleValue extends NumericMetricsAggregation.SingleValue {
}